diff options
author | R David Murray <rdmurray@bitdance.com> | 2014-08-09 20:40:49 (GMT) |
---|---|---|
committer | R David Murray <rdmurray@bitdance.com> | 2014-08-09 20:40:49 (GMT) |
commit | 2539e6744b1405404c9e2c02af33381bd349106e (patch) | |
tree | c0d7b49152de3788e15df46c9560124c6208b6c9 /Lib/test/test_smtpd.py | |
parent | ae04ba1952fe7610fb93cc127b9ac6fb3782cef4 (diff) | |
download | cpython-2539e6744b1405404c9e2c02af33381bd349106e.zip cpython-2539e6744b1405404c9e2c02af33381bd349106e.tar.gz cpython-2539e6744b1405404c9e2c02af33381bd349106e.tar.bz2 |
#21725: Add RFC 6531 (SMTPUTF8) support to smtpd.
Patch by Milan Oberkirch, developed as part of his 2014 GSOC project.
Note that this also fixes a bug in mock_socket ('getpeername' was returning a
simple string instead of the tuple required for IPvX protocols), a bug in
DebugServer with respect to handling binary data (should have been fixed when
decode_data was introduced, but wasn't found until this patch was written),
and a long-standing bug in DebugServer (it was printing an extra blank line at
the end of the displayed message text).
Diffstat (limited to 'Lib/test/test_smtpd.py')
-rw-r--r-- | Lib/test/test_smtpd.py | 256 |
1 files changed, 243 insertions, 13 deletions
diff --git a/Lib/test/test_smtpd.py b/Lib/test/test_smtpd.py index caeb797..6eb47f1 100644 --- a/Lib/test/test_smtpd.py +++ b/Lib/test/test_smtpd.py @@ -1,4 +1,5 @@ import unittest +import textwrap from test import support, mock_socket import socket import io @@ -7,11 +8,10 @@ import asyncore class DummyServer(smtpd.SMTPServer): - def __init__(self, localaddr, remoteaddr, decode_data=True): - smtpd.SMTPServer.__init__(self, localaddr, remoteaddr, - decode_data=decode_data) + def __init__(self, *args, **kwargs): + smtpd.SMTPServer.__init__(self, *args, **kwargs) self.messages = [] - if decode_data: + if self._decode_data: self.return_status = 'return status' else: self.return_status = b'return status' @@ -21,6 +21,9 @@ class DummyServer(smtpd.SMTPServer): if data == self.return_status: return '250 Okish' + def process_smtputf8_message(self, *args, **kwargs): + return '250 SMTPUTF8 message okish' + class DummyDispatcherBroken(Exception): pass @@ -51,10 +54,128 @@ class SMTPDServerTest(unittest.TestCase): write_line(b'DATA') self.assertRaises(NotImplementedError, write_line, b'spam\r\n.\r\n') + def test_process_smtputf8_message_unimplemented(self): + server = smtpd.SMTPServer((support.HOST, 0), ('b', 0), + enable_SMTPUTF8=True) + conn, addr = server.accept() + channel = smtpd.SMTPChannel(server, conn, addr, enable_SMTPUTF8=True) + + def write_line(line): + channel.socket.queue_recv(line) + channel.handle_read() + + write_line(b'EHLO example') + write_line(b'MAIL From: <eggs@example> BODY=8BITMIME SMTPUTF8') + write_line(b'RCPT To: <spam@example>') + write_line(b'DATA') + self.assertRaises(NotImplementedError, write_line, b'spam\r\n.\r\n') + def test_decode_data_default_warns(self): with self.assertWarns(DeprecationWarning): smtpd.SMTPServer((support.HOST, 0), ('b', 0)) + def test_decode_data_and_enable_SMTPUTF8_raises(self): + self.assertRaises( + ValueError, + smtpd.SMTPServer, + (support.HOST, 0), + ('b', 0), + enable_SMTPUTF8=True, + decode_data=True) + + def tearDown(self): + asyncore.close_all() + asyncore.socket = smtpd.socket = socket + + +class DebuggingServerTest(unittest.TestCase): + + def setUp(self): + smtpd.socket = asyncore.socket = mock_socket + + def send_data(self, channel, data, enable_SMTPUTF8=False): + def write_line(line): + channel.socket.queue_recv(line) + channel.handle_read() + write_line(b'EHLO example') + if enable_SMTPUTF8: + write_line(b'MAIL From:eggs@example BODY=8BITMIME SMTPUTF8') + else: + write_line(b'MAIL From:eggs@example') + write_line(b'RCPT To:spam@example') + write_line(b'DATA') + write_line(data) + write_line(b'.') + + def test_process_message_with_decode_data_true(self): + server = smtpd.DebuggingServer((support.HOST, 0), ('b', 0), + decode_data=True) + conn, addr = server.accept() + channel = smtpd.SMTPChannel(server, conn, addr, decode_data=True) + with support.captured_stdout() as s: + self.send_data(channel, b'From: test\n\nhello\n') + stdout = s.getvalue() + self.assertEqual(stdout, textwrap.dedent("""\ + ---------- MESSAGE FOLLOWS ---------- + From: test + X-Peer: peer-address + + hello + ------------ END MESSAGE ------------ + """)) + + def test_process_message_with_decode_data_false(self): + server = smtpd.DebuggingServer((support.HOST, 0), ('b', 0), + decode_data=False) + conn, addr = server.accept() + channel = smtpd.SMTPChannel(server, conn, addr, decode_data=False) + with support.captured_stdout() as s: + self.send_data(channel, b'From: test\n\nh\xc3\xa9llo\xff\n') + stdout = s.getvalue() + self.assertEqual(stdout, textwrap.dedent("""\ + ---------- MESSAGE FOLLOWS ---------- + b'From: test' + b'X-Peer: peer-address' + b'' + b'h\\xc3\\xa9llo\\xff' + ------------ END MESSAGE ------------ + """)) + + def test_process_message_with_enable_SMTPUTF8_true(self): + server = smtpd.DebuggingServer((support.HOST, 0), ('b', 0), + enable_SMTPUTF8=True) + conn, addr = server.accept() + channel = smtpd.SMTPChannel(server, conn, addr, enable_SMTPUTF8=True) + with support.captured_stdout() as s: + self.send_data(channel, b'From: test\n\nh\xc3\xa9llo\xff\n') + stdout = s.getvalue() + self.assertEqual(stdout, textwrap.dedent("""\ + ---------- MESSAGE FOLLOWS ---------- + b'From: test' + b'X-Peer: peer-address' + b'' + b'h\\xc3\\xa9llo\\xff' + ------------ END MESSAGE ------------ + """)) + + def test_process_SMTPUTF8_message_with_enable_SMTPUTF8_true(self): + server = smtpd.DebuggingServer((support.HOST, 0), ('b', 0), + enable_SMTPUTF8=True) + conn, addr = server.accept() + channel = smtpd.SMTPChannel(server, conn, addr, enable_SMTPUTF8=True) + with support.captured_stdout() as s: + self.send_data(channel, b'From: test\n\nh\xc3\xa9llo\xff\n', + enable_SMTPUTF8=True) + stdout = s.getvalue() + self.assertEqual(stdout, textwrap.dedent("""\ + ----- SMTPUTF8 MESSAGE FOLLOWS ------ + b'From: test' + b'X-Peer: peer-address' + b'' + b'h\\xc3\\xa9llo\\xff' + ------------ END MESSAGE ------------ + """)) + def tearDown(self): asyncore.close_all() asyncore.socket = smtpd.socket = socket @@ -85,7 +206,8 @@ class SMTPDChannelTest(unittest.TestCase): smtpd.socket = asyncore.socket = mock_socket self.old_debugstream = smtpd.DEBUGSTREAM self.debug = smtpd.DEBUGSTREAM = io.StringIO() - self.server = DummyServer((support.HOST, 0), ('b', 0)) + self.server = DummyServer((support.HOST, 0), ('b', 0), + decode_data=True) conn, addr = self.server.accept() self.channel = smtpd.SMTPChannel(self.server, conn, addr, decode_data=True) @@ -102,7 +224,7 @@ class SMTPDChannelTest(unittest.TestCase): def test_broken_connect(self): self.assertRaises( DummyDispatcherBroken, BrokenDummyServer, - (support.HOST, 0), ('b', 0)) + (support.HOST, 0), ('b', 0), decode_data=True) def test_server_accept(self): self.server.handle_accept() @@ -247,6 +369,12 @@ class SMTPDChannelTest(unittest.TestCase): self.assertEqual(self.channel.socket.last, b'500 Error: line too long\r\n') + def test_MAIL_command_rejects_SMTPUTF8_by_default(self): + self.write_line(b'EHLO example') + self.write_line( + b'MAIL from: <naive@example.com> BODY=8BITMIME SMTPUTF8') + self.assertEqual(self.channel.socket.last[0:1], b'5') + def test_data_longer_than_default_data_size_limit(self): # Hack the default so we don't have to generate so much data. self.channel.data_size_limit = 1048 @@ -420,7 +548,10 @@ class SMTPDChannelTest(unittest.TestCase): self.write_line(b'data\r\nmore\r\n.') self.assertEqual(self.channel.socket.last, b'250 OK\r\n') self.assertEqual(self.server.messages, - [('peer', 'eggs@example', ['spam@example'], 'data\nmore')]) + [(('peer-address', 'peer-port'), + 'eggs@example', + ['spam@example'], + 'data\nmore')]) def test_DATA_syntax(self): self.write_line(b'HELO example') @@ -450,7 +581,10 @@ class SMTPDChannelTest(unittest.TestCase): self.write_line(b'DATA') self.write_line(b'data\r\n.') self.assertEqual(self.server.messages, - [('peer', 'eggs@example', ['spam@example','ham@example'], 'data')]) + [(('peer-address', 'peer-port'), + 'eggs@example', + ['spam@example','ham@example'], + 'data')]) def test_manual_status(self): # checks that the Channel is able to return a custom status message @@ -472,7 +606,10 @@ class SMTPDChannelTest(unittest.TestCase): self.write_line(b'DATA') self.write_line(b'data\r\n.') self.assertEqual(self.server.messages, - [('peer', 'foo@example', ['eggs@example'], 'data')]) + [(('peer-address', 'peer-port'), + 'foo@example', + ['eggs@example'], + 'data')]) def test_HELO_RSET(self): self.write_line(b'HELO example') @@ -536,7 +673,8 @@ class SMTPDChannelTest(unittest.TestCase): self.channel._SMTPChannel__addr = 'spam' def test_decode_data_default_warning(self): - server = DummyServer((support.HOST, 0), ('b', 0)) + with self.assertWarns(DeprecationWarning): + server = DummyServer((support.HOST, 0), ('b', 0)) conn, addr = self.server.accept() with self.assertWarns(DeprecationWarning): smtpd.SMTPChannel(server, conn, addr) @@ -547,7 +685,8 @@ class SMTPDChannelIPv6Test(SMTPDChannelTest): smtpd.socket = asyncore.socket = mock_socket self.old_debugstream = smtpd.DEBUGSTREAM self.debug = smtpd.DEBUGSTREAM = io.StringIO() - self.server = DummyServer((support.HOSTv6, 0), ('b', 0)) + self.server = DummyServer((support.HOSTv6, 0), ('b', 0), + decode_data=True) conn, addr = self.server.accept() self.channel = smtpd.SMTPChannel(self.server, conn, addr, decode_data=True) @@ -558,7 +697,8 @@ class SMTPDChannelWithDataSizeLimitTest(unittest.TestCase): smtpd.socket = asyncore.socket = mock_socket self.old_debugstream = smtpd.DEBUGSTREAM self.debug = smtpd.DEBUGSTREAM = io.StringIO() - self.server = DummyServer((support.HOST, 0), ('b', 0)) + self.server = DummyServer((support.HOST, 0), ('b', 0), + decode_data=True) conn, addr = self.server.accept() # Set DATA size limit to 32 bytes for easy testing self.channel = smtpd.SMTPChannel(self.server, conn, addr, 32, @@ -586,7 +726,10 @@ class SMTPDChannelWithDataSizeLimitTest(unittest.TestCase): self.write_line(b'data\r\nmore\r\n.') self.assertEqual(self.channel.socket.last, b'250 OK\r\n') self.assertEqual(self.server.messages, - [('peer', 'eggs@example', ['spam@example'], 'data\nmore')]) + [(('peer-address', 'peer-port'), + 'eggs@example', + ['spam@example'], + 'data\nmore')]) def test_data_limit_dialog_too_much_data(self): self.write_line(b'HELO example') @@ -692,5 +835,92 @@ class SMTPDChannelWithDecodeDataTrue(unittest.TestCase): 'utf8 enriched text: żźć\nand some plain ascii') +class SMTPDChannelTestWithEnableSMTPUTF8True(unittest.TestCase): + def setUp(self): + smtpd.socket = asyncore.socket = mock_socket + self.old_debugstream = smtpd.DEBUGSTREAM + self.debug = smtpd.DEBUGSTREAM = io.StringIO() + self.server = DummyServer((support.HOST, 0), ('b', 0), + enable_SMTPUTF8=True) + conn, addr = self.server.accept() + self.channel = smtpd.SMTPChannel(self.server, conn, addr, + enable_SMTPUTF8=True) + + def tearDown(self): + asyncore.close_all() + asyncore.socket = smtpd.socket = socket + smtpd.DEBUGSTREAM = self.old_debugstream + + def write_line(self, line): + self.channel.socket.queue_recv(line) + self.channel.handle_read() + + def test_MAIL_command_accepts_SMTPUTF8_when_announced(self): + self.write_line(b'EHLO example') + self.write_line( + 'MAIL from: <naïve@example.com> BODY=8BITMIME SMTPUTF8'.encode( + 'utf-8') + ) + self.assertEqual(self.channel.socket.last, b'250 OK\r\n') + + def test_process_smtputf8_message(self): + self.write_line(b'EHLO example') + for mail_parameters in [b'', b'BODY=8BITMIME SMTPUTF8']: + self.write_line(b'MAIL from: <a@example> ' + mail_parameters) + self.assertEqual(self.channel.socket.last[0:3], b'250') + self.write_line(b'rcpt to:<b@example.com>') + self.assertEqual(self.channel.socket.last[0:3], b'250') + self.write_line(b'data') + self.assertEqual(self.channel.socket.last[0:3], b'354') + self.write_line(b'c\r\n.') + if mail_parameters == b'': + self.assertEqual(self.channel.socket.last, b'250 OK\r\n') + else: + self.assertEqual(self.channel.socket.last, + b'250 SMTPUTF8 message okish\r\n') + + def test_utf8_data(self): + self.write_line(b'EHLO example') + self.write_line( + 'MAIL From: naïve@examplé BODY=8BITMIME SMTPUTF8'.encode('utf-8')) + self.assertEqual(self.channel.socket.last[0:3], b'250') + self.write_line('RCPT To:späm@examplé'.encode('utf-8')) + self.assertEqual(self.channel.socket.last[0:3], b'250') + self.write_line(b'DATA') + self.assertEqual(self.channel.socket.last[0:3], b'354') + self.write_line(b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87') + self.write_line(b'.') + self.assertEqual( + self.channel.received_data, + b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87') + + def test_MAIL_command_limit_extended_with_SIZE_and_SMTPUTF8(self): + self.write_line(b'ehlo example') + fill_len = (512 + 26 + 10) - len('mail from:<@example>') + self.write_line(b'MAIL from:<' + + b'a' * (fill_len + 1) + + b'@example>') + self.assertEqual(self.channel.socket.last, + b'500 Error: line too long\r\n') + self.write_line(b'MAIL from:<' + + b'a' * fill_len + + b'@example>') + self.assertEqual(self.channel.socket.last, b'250 OK\r\n') + + def test_multiple_emails_with_extended_command_length(self): + self.write_line(b'ehlo example') + fill_len = (512 + 26 + 10) - len('mail from:<@example>') + for char in [b'a', b'b', b'c']: + self.write_line(b'MAIL from:<' + char * fill_len + b'a@example>') + self.assertEqual(self.channel.socket.last[0:3], b'500') + self.write_line(b'MAIL from:<' + char * fill_len + b'@example>') + self.assertEqual(self.channel.socket.last[0:3], b'250') + self.write_line(b'rcpt to:<hans@example.com>') + self.assertEqual(self.channel.socket.last[0:3], b'250') + self.write_line(b'data') + self.assertEqual(self.channel.socket.last[0:3], b'354') + self.write_line(b'test\r\n.') + self.assertEqual(self.channel.socket.last[0:3], b'250') + if __name__ == "__main__": unittest.main() |