diff options
Diffstat (limited to 'Lib/test/test_smtpd.py')
-rw-r--r-- | Lib/test/test_smtpd.py | 118 |
1 files changed, 111 insertions, 7 deletions
diff --git a/Lib/test/test_smtpd.py b/Lib/test/test_smtpd.py index 93f14c4..db1f52b 100644 --- a/Lib/test/test_smtpd.py +++ b/Lib/test/test_smtpd.py @@ -7,13 +7,18 @@ import asyncore class DummyServer(smtpd.SMTPServer): - def __init__(self, localaddr, remoteaddr): - smtpd.SMTPServer.__init__(self, localaddr, remoteaddr) + def __init__(self, localaddr, remoteaddr, decode_data=True): + smtpd.SMTPServer.__init__(self, localaddr, remoteaddr, + decode_data=decode_data) self.messages = [] + if decode_data: + self.return_status = 'return status' + else: + self.return_status = b'return status' def process_message(self, peer, mailfrom, rcpttos, data): self.messages.append((peer, mailfrom, rcpttos, data)) - if data == 'return status': + if data == self.return_status: return '250 Okish' @@ -31,9 +36,9 @@ class SMTPDServerTest(unittest.TestCase): smtpd.socket = asyncore.socket = mock_socket def test_process_message_unimplemented(self): - server = smtpd.SMTPServer('a', 'b') + server = smtpd.SMTPServer('a', 'b', decode_data=True) conn, addr = server.accept() - channel = smtpd.SMTPChannel(server, conn, addr) + channel = smtpd.SMTPChannel(server, conn, addr, decode_data=True) def write_line(line): channel.socket.queue_recv(line) @@ -45,6 +50,10 @@ class SMTPDServerTest(unittest.TestCase): write_line(b'DATA') self.assertRaises(NotImplementedError, write_line, b'spam\r\n.\r\n') + def test_decode_data_default_warns(self): + with self.assertWarns(DeprecationWarning): + smtpd.SMTPServer('a', 'b') + def tearDown(self): asyncore.close_all() asyncore.socket = smtpd.socket = socket @@ -57,7 +66,8 @@ class SMTPDChannelTest(unittest.TestCase): self.debug = smtpd.DEBUGSTREAM = io.StringIO() self.server = DummyServer('a', 'b') conn, addr = self.server.accept() - self.channel = smtpd.SMTPChannel(self.server, conn, addr) + self.channel = smtpd.SMTPChannel(self.server, conn, addr, + decode_data=True) def tearDown(self): asyncore.close_all() @@ -502,6 +512,12 @@ class SMTPDChannelTest(unittest.TestCase): with support.check_warnings(('', DeprecationWarning)): self.channel._SMTPChannel__addr = 'spam' + def test_decode_data_default_warning(self): + server = DummyServer('a', 'b') + conn, addr = self.server.accept() + with self.assertWarns(DeprecationWarning): + smtpd.SMTPChannel(server, conn, addr) + class SMTPDChannelWithDataSizeLimitTest(unittest.TestCase): @@ -512,7 +528,8 @@ class SMTPDChannelWithDataSizeLimitTest(unittest.TestCase): self.server = DummyServer('a', 'b') conn, addr = self.server.accept() # Set DATA size limit to 32 bytes for easy testing - self.channel = smtpd.SMTPChannel(self.server, conn, addr, 32) + self.channel = smtpd.SMTPChannel(self.server, conn, addr, 32, + decode_data=True) def tearDown(self): asyncore.close_all() @@ -553,5 +570,92 @@ class SMTPDChannelWithDataSizeLimitTest(unittest.TestCase): b'552 Error: Too much mail data\r\n') +class SMTPDChannelWithDecodeDataFalse(unittest.TestCase): + + def setUp(self): + smtpd.socket = asyncore.socket = mock_socket + self.old_debugstream = smtpd.DEBUGSTREAM + self.debug = smtpd.DEBUGSTREAM = io.StringIO() + self.server = DummyServer('a', 'b', decode_data=False) + conn, addr = self.server.accept() + # Set decode_data to False + self.channel = smtpd.SMTPChannel(self.server, conn, addr, + decode_data=False) + + def tearDown(self): + asyncore.close_all() + asyncore.socket = smtpd.socket = socket + smtpd.DEBUGSTREAM = self.old_debugstream + + def write_line(self, line): + self.channel.socket.queue_recv(line) + self.channel.handle_read() + + def test_ascii_data(self): + self.write_line(b'HELO example') + self.write_line(b'MAIL From:eggs@example') + self.write_line(b'RCPT To:spam@example') + self.write_line(b'DATA') + self.write_line(b'plain ascii text') + self.write_line(b'.') + self.assertEqual(self.channel.received_data, b'plain ascii text') + + def test_utf8_data(self): + self.write_line(b'HELO example') + self.write_line(b'MAIL From:eggs@example') + self.write_line(b'RCPT To:spam@example') + self.write_line(b'DATA') + self.write_line(b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87') + self.write_line(b'and some plain ascii') + self.write_line(b'.') + self.assertEqual( + self.channel.received_data, + b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87\n' + b'and some plain ascii') + + +class SMTPDChannelWithDecodeDataTrue(unittest.TestCase): + + def setUp(self): + smtpd.socket = asyncore.socket = mock_socket + self.old_debugstream = smtpd.DEBUGSTREAM + self.debug = smtpd.DEBUGSTREAM = io.StringIO() + self.server = DummyServer('a', 'b') + conn, addr = self.server.accept() + # Set decode_data to True + self.channel = smtpd.SMTPChannel(self.server, conn, addr, + decode_data=True) + + def tearDown(self): + asyncore.close_all() + asyncore.socket = smtpd.socket = socket + smtpd.DEBUGSTREAM = self.old_debugstream + + def write_line(self, line): + self.channel.socket.queue_recv(line) + self.channel.handle_read() + + def test_ascii_data(self): + self.write_line(b'HELO example') + self.write_line(b'MAIL From:eggs@example') + self.write_line(b'RCPT To:spam@example') + self.write_line(b'DATA') + self.write_line(b'plain ascii text') + self.write_line(b'.') + self.assertEqual(self.channel.received_data, 'plain ascii text') + + def test_utf8_data(self): + self.write_line(b'HELO example') + self.write_line(b'MAIL From:eggs@example') + self.write_line(b'RCPT To:spam@example') + self.write_line(b'DATA') + self.write_line(b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87') + self.write_line(b'and some plain ascii') + self.write_line(b'.') + self.assertEqual( + self.channel.received_data, + 'utf8 enriched text: żźć\nand some plain ascii') + + if __name__ == "__main__": unittest.main() |