diff options
author | R David Murray <rdmurray@bitdance.com> | 2015-05-11 16:11:40 (GMT) |
---|---|---|
committer | R David Murray <rdmurray@bitdance.com> | 2015-05-11 16:11:40 (GMT) |
commit | a33df31629f2f6ed85890baa9b4e71c30efa95a9 (patch) | |
tree | 00602e680a45fbb39ac9127995764b92ee096488 /Lib/test/test_smtpd.py | |
parent | 0d905d4fcdb39119f763afd4036cfaea78a2ae5b (diff) | |
download | cpython-a33df31629f2f6ed85890baa9b4e71c30efa95a9.zip cpython-a33df31629f2f6ed85890baa9b4e71c30efa95a9.tar.gz cpython-a33df31629f2f6ed85890baa9b4e71c30efa95a9.tar.bz2 |
#21795: advertise 8BITMIME if decode_data is False.
Patch by Milan Oberkirch, with a few updates. This changeset also
tweaks the smtpd and whatsnew docs for smtpd into what should be
the final form for the 3.5 release.
Diffstat (limited to 'Lib/test/test_smtpd.py')
-rw-r--r-- | Lib/test/test_smtpd.py | 129 |
1 files changed, 108 insertions, 21 deletions
diff --git a/Lib/test/test_smtpd.py b/Lib/test/test_smtpd.py index 6eb47f1..1aa55d2 100644 --- a/Lib/test/test_smtpd.py +++ b/Lib/test/test_smtpd.py @@ -16,13 +16,12 @@ class DummyServer(smtpd.SMTPServer): else: self.return_status = b'return status' - def process_message(self, peer, mailfrom, rcpttos, data): + def process_message(self, peer, mailfrom, rcpttos, data, **kw): self.messages.append((peer, mailfrom, rcpttos, data)) if data == self.return_status: return '250 Okish' - - def process_smtputf8_message(self, *args, **kwargs): - return '250 SMTPUTF8 message okish' + if 'mail_options' in kw and 'SMTPUTF8' in kw['mail_options']: + return '250 SMTPUTF8 message okish' class DummyDispatcherBroken(Exception): @@ -54,22 +53,6 @@ class SMTPDServerTest(unittest.TestCase): write_line(b'DATA') self.assertRaises(NotImplementedError, write_line, b'spam\r\n.\r\n') - def test_process_smtputf8_message_unimplemented(self): - server = smtpd.SMTPServer((support.HOST, 0), ('b', 0), - enable_SMTPUTF8=True) - conn, addr = server.accept() - channel = smtpd.SMTPChannel(server, conn, addr, enable_SMTPUTF8=True) - - def write_line(line): - channel.socket.queue_recv(line) - channel.handle_read() - - write_line(b'EHLO example') - write_line(b'MAIL From: <eggs@example> BODY=8BITMIME SMTPUTF8') - write_line(b'RCPT To: <spam@example>') - write_line(b'DATA') - self.assertRaises(NotImplementedError, write_line, b'spam\r\n.\r\n') - def test_decode_data_default_warns(self): with self.assertWarns(DeprecationWarning): smtpd.SMTPServer((support.HOST, 0), ('b', 0)) @@ -168,7 +151,8 @@ class DebuggingServerTest(unittest.TestCase): enable_SMTPUTF8=True) stdout = s.getvalue() self.assertEqual(stdout, textwrap.dedent("""\ - ----- SMTPUTF8 MESSAGE FOLLOWS ------ + ---------- MESSAGE FOLLOWS ---------- + mail options: ['BODY=8BITMIME', 'SMTPUTF8'] b'From: test' b'X-Peer: peer-address' b'' @@ -201,6 +185,109 @@ class TestFamilyDetection(unittest.TestCase): self.assertEqual(server.socket.family, socket.AF_INET) +class TestRcptOptionParsing(unittest.TestCase): + error_response = (b'555 RCPT TO parameters not recognized or not ' + b'implemented\r\n') + + def setUp(self): + smtpd.socket = asyncore.socket = mock_socket + self.old_debugstream = smtpd.DEBUGSTREAM + self.debug = smtpd.DEBUGSTREAM = io.StringIO() + + def tearDown(self): + asyncore.close_all() + asyncore.socket = smtpd.socket = socket + smtpd.DEBUGSTREAM = self.old_debugstream + + def write_line(self, channel, line): + channel.socket.queue_recv(line) + channel.handle_read() + + def test_params_rejected(self): + server = DummyServer((support.HOST, 0), ('b', 0), decode_data=False) + conn, addr = server.accept() + channel = smtpd.SMTPChannel(server, conn, addr, decode_data=False) + self.write_line(channel, b'EHLO example') + self.write_line(channel, b'MAIL from: <foo@example.com> size=20') + self.write_line(channel, b'RCPT to: <foo@example.com> foo=bar') + self.assertEqual(channel.socket.last, self.error_response) + + def test_nothing_accepted(self): + server = DummyServer((support.HOST, 0), ('b', 0), decode_data=False) + conn, addr = server.accept() + channel = smtpd.SMTPChannel(server, conn, addr, decode_data=False) + self.write_line(channel, b'EHLO example') + self.write_line(channel, b'MAIL from: <foo@example.com> size=20') + self.write_line(channel, b'RCPT to: <foo@example.com>') + self.assertEqual(channel.socket.last, b'250 OK\r\n') + + +class TestMailOptionParsing(unittest.TestCase): + error_response = (b'555 MAIL FROM parameters not recognized or not ' + b'implemented\r\n') + + def setUp(self): + smtpd.socket = asyncore.socket = mock_socket + self.old_debugstream = smtpd.DEBUGSTREAM + self.debug = smtpd.DEBUGSTREAM = io.StringIO() + + def tearDown(self): + asyncore.close_all() + asyncore.socket = smtpd.socket = socket + smtpd.DEBUGSTREAM = self.old_debugstream + + def write_line(self, channel, line): + channel.socket.queue_recv(line) + channel.handle_read() + + def test_with_decode_data_true(self): + server = DummyServer((support.HOST, 0), ('b', 0), decode_data=True) + conn, addr = server.accept() + channel = smtpd.SMTPChannel(server, conn, addr, decode_data=True) + self.write_line(channel, b'EHLO example') + for line in [ + b'MAIL from: <foo@example.com> size=20 SMTPUTF8', + b'MAIL from: <foo@example.com> size=20 SMTPUTF8 BODY=8BITMIME', + b'MAIL from: <foo@example.com> size=20 BODY=UNKNOWN', + b'MAIL from: <foo@example.com> size=20 body=8bitmime', + ]: + self.write_line(channel, line) + self.assertEqual(channel.socket.last, self.error_response) + self.write_line(channel, b'MAIL from: <foo@example.com> size=20') + self.assertEqual(channel.socket.last, b'250 OK\r\n') + + def test_with_decode_data_false(self): + server = DummyServer((support.HOST, 0), ('b', 0), decode_data=False) + conn, addr = server.accept() + channel = smtpd.SMTPChannel(server, conn, addr, decode_data=False) + self.write_line(channel, b'EHLO example') + for line in [ + b'MAIL from: <foo@example.com> size=20 SMTPUTF8', + b'MAIL from: <foo@example.com> size=20 SMTPUTF8 BODY=8BITMIME', + ]: + self.write_line(channel, line) + self.assertEqual(channel.socket.last, self.error_response) + self.write_line( + channel, + b'MAIL from: <foo@example.com> size=20 SMTPUTF8 BODY=UNKNOWN') + self.assertEqual( + channel.socket.last, + b'501 Error: BODY can only be one of 7BIT, 8BITMIME\r\n') + self.write_line( + channel, b'MAIL from: <foo@example.com> size=20 body=8bitmime') + self.assertEqual(channel.socket.last, b'250 OK\r\n') + + def test_with_enable_smtputf8_true(self): + server = DummyServer((support.HOST, 0), ('b', 0), enable_SMTPUTF8=True) + conn, addr = server.accept() + channel = smtpd.SMTPChannel(server, conn, addr, enable_SMTPUTF8=True) + self.write_line(channel, b'EHLO example') + self.write_line( + channel, + b'MAIL from: <foo@example.com> size=20 body=8bitmime smtputf8') + self.assertEqual(channel.socket.last, b'250 OK\r\n') + + class SMTPDChannelTest(unittest.TestCase): def setUp(self): smtpd.socket = asyncore.socket = mock_socket |