summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_smtpd.py
diff options
context:
space:
mode:
authorR David Murray <rdmurray@bitdance.com>2014-08-09 20:40:49 (GMT)
committerR David Murray <rdmurray@bitdance.com>2014-08-09 20:40:49 (GMT)
commit2539e6744b1405404c9e2c02af33381bd349106e (patch)
treec0d7b49152de3788e15df46c9560124c6208b6c9 /Lib/test/test_smtpd.py
parentae04ba1952fe7610fb93cc127b9ac6fb3782cef4 (diff)
downloadcpython-2539e6744b1405404c9e2c02af33381bd349106e.zip
cpython-2539e6744b1405404c9e2c02af33381bd349106e.tar.gz
cpython-2539e6744b1405404c9e2c02af33381bd349106e.tar.bz2
#21725: Add RFC 6531 (SMTPUTF8) support to smtpd.
Patch by Milan Oberkirch, developed as part of his 2014 GSOC project. Note that this also fixes a bug in mock_socket ('getpeername' was returning a simple string instead of the tuple required for IPvX protocols), a bug in DebugServer with respect to handling binary data (should have been fixed when decode_data was introduced, but wasn't found until this patch was written), and a long-standing bug in DebugServer (it was printing an extra blank line at the end of the displayed message text).
Diffstat (limited to 'Lib/test/test_smtpd.py')
-rw-r--r--Lib/test/test_smtpd.py256
1 files changed, 243 insertions, 13 deletions
diff --git a/Lib/test/test_smtpd.py b/Lib/test/test_smtpd.py
index caeb797..6eb47f1 100644
--- a/Lib/test/test_smtpd.py
+++ b/Lib/test/test_smtpd.py
@@ -1,4 +1,5 @@
import unittest
+import textwrap
from test import support, mock_socket
import socket
import io
@@ -7,11 +8,10 @@ import asyncore
class DummyServer(smtpd.SMTPServer):
- def __init__(self, localaddr, remoteaddr, decode_data=True):
- smtpd.SMTPServer.__init__(self, localaddr, remoteaddr,
- decode_data=decode_data)
+ def __init__(self, *args, **kwargs):
+ smtpd.SMTPServer.__init__(self, *args, **kwargs)
self.messages = []
- if decode_data:
+ if self._decode_data:
self.return_status = 'return status'
else:
self.return_status = b'return status'
@@ -21,6 +21,9 @@ class DummyServer(smtpd.SMTPServer):
if data == self.return_status:
return '250 Okish'
+ def process_smtputf8_message(self, *args, **kwargs):
+ return '250 SMTPUTF8 message okish'
+
class DummyDispatcherBroken(Exception):
pass
@@ -51,10 +54,128 @@ class SMTPDServerTest(unittest.TestCase):
write_line(b'DATA')
self.assertRaises(NotImplementedError, write_line, b'spam\r\n.\r\n')
+ def test_process_smtputf8_message_unimplemented(self):
+ server = smtpd.SMTPServer((support.HOST, 0), ('b', 0),
+ enable_SMTPUTF8=True)
+ conn, addr = server.accept()
+ channel = smtpd.SMTPChannel(server, conn, addr, enable_SMTPUTF8=True)
+
+ def write_line(line):
+ channel.socket.queue_recv(line)
+ channel.handle_read()
+
+ write_line(b'EHLO example')
+ write_line(b'MAIL From: <eggs@example> BODY=8BITMIME SMTPUTF8')
+ write_line(b'RCPT To: <spam@example>')
+ write_line(b'DATA')
+ self.assertRaises(NotImplementedError, write_line, b'spam\r\n.\r\n')
+
def test_decode_data_default_warns(self):
with self.assertWarns(DeprecationWarning):
smtpd.SMTPServer((support.HOST, 0), ('b', 0))
+ def test_decode_data_and_enable_SMTPUTF8_raises(self):
+ self.assertRaises(
+ ValueError,
+ smtpd.SMTPServer,
+ (support.HOST, 0),
+ ('b', 0),
+ enable_SMTPUTF8=True,
+ decode_data=True)
+
+ def tearDown(self):
+ asyncore.close_all()
+ asyncore.socket = smtpd.socket = socket
+
+
+class DebuggingServerTest(unittest.TestCase):
+
+ def setUp(self):
+ smtpd.socket = asyncore.socket = mock_socket
+
+ def send_data(self, channel, data, enable_SMTPUTF8=False):
+ def write_line(line):
+ channel.socket.queue_recv(line)
+ channel.handle_read()
+ write_line(b'EHLO example')
+ if enable_SMTPUTF8:
+ write_line(b'MAIL From:eggs@example BODY=8BITMIME SMTPUTF8')
+ else:
+ write_line(b'MAIL From:eggs@example')
+ write_line(b'RCPT To:spam@example')
+ write_line(b'DATA')
+ write_line(data)
+ write_line(b'.')
+
+ def test_process_message_with_decode_data_true(self):
+ server = smtpd.DebuggingServer((support.HOST, 0), ('b', 0),
+ decode_data=True)
+ conn, addr = server.accept()
+ channel = smtpd.SMTPChannel(server, conn, addr, decode_data=True)
+ with support.captured_stdout() as s:
+ self.send_data(channel, b'From: test\n\nhello\n')
+ stdout = s.getvalue()
+ self.assertEqual(stdout, textwrap.dedent("""\
+ ---------- MESSAGE FOLLOWS ----------
+ From: test
+ X-Peer: peer-address
+
+ hello
+ ------------ END MESSAGE ------------
+ """))
+
+ def test_process_message_with_decode_data_false(self):
+ server = smtpd.DebuggingServer((support.HOST, 0), ('b', 0),
+ decode_data=False)
+ conn, addr = server.accept()
+ channel = smtpd.SMTPChannel(server, conn, addr, decode_data=False)
+ with support.captured_stdout() as s:
+ self.send_data(channel, b'From: test\n\nh\xc3\xa9llo\xff\n')
+ stdout = s.getvalue()
+ self.assertEqual(stdout, textwrap.dedent("""\
+ ---------- MESSAGE FOLLOWS ----------
+ b'From: test'
+ b'X-Peer: peer-address'
+ b''
+ b'h\\xc3\\xa9llo\\xff'
+ ------------ END MESSAGE ------------
+ """))
+
+ def test_process_message_with_enable_SMTPUTF8_true(self):
+ server = smtpd.DebuggingServer((support.HOST, 0), ('b', 0),
+ enable_SMTPUTF8=True)
+ conn, addr = server.accept()
+ channel = smtpd.SMTPChannel(server, conn, addr, enable_SMTPUTF8=True)
+ with support.captured_stdout() as s:
+ self.send_data(channel, b'From: test\n\nh\xc3\xa9llo\xff\n')
+ stdout = s.getvalue()
+ self.assertEqual(stdout, textwrap.dedent("""\
+ ---------- MESSAGE FOLLOWS ----------
+ b'From: test'
+ b'X-Peer: peer-address'
+ b''
+ b'h\\xc3\\xa9llo\\xff'
+ ------------ END MESSAGE ------------
+ """))
+
+ def test_process_SMTPUTF8_message_with_enable_SMTPUTF8_true(self):
+ server = smtpd.DebuggingServer((support.HOST, 0), ('b', 0),
+ enable_SMTPUTF8=True)
+ conn, addr = server.accept()
+ channel = smtpd.SMTPChannel(server, conn, addr, enable_SMTPUTF8=True)
+ with support.captured_stdout() as s:
+ self.send_data(channel, b'From: test\n\nh\xc3\xa9llo\xff\n',
+ enable_SMTPUTF8=True)
+ stdout = s.getvalue()
+ self.assertEqual(stdout, textwrap.dedent("""\
+ ----- SMTPUTF8 MESSAGE FOLLOWS ------
+ b'From: test'
+ b'X-Peer: peer-address'
+ b''
+ b'h\\xc3\\xa9llo\\xff'
+ ------------ END MESSAGE ------------
+ """))
+
def tearDown(self):
asyncore.close_all()
asyncore.socket = smtpd.socket = socket
@@ -85,7 +206,8 @@ class SMTPDChannelTest(unittest.TestCase):
smtpd.socket = asyncore.socket = mock_socket
self.old_debugstream = smtpd.DEBUGSTREAM
self.debug = smtpd.DEBUGSTREAM = io.StringIO()
- self.server = DummyServer((support.HOST, 0), ('b', 0))
+ self.server = DummyServer((support.HOST, 0), ('b', 0),
+ decode_data=True)
conn, addr = self.server.accept()
self.channel = smtpd.SMTPChannel(self.server, conn, addr,
decode_data=True)
@@ -102,7 +224,7 @@ class SMTPDChannelTest(unittest.TestCase):
def test_broken_connect(self):
self.assertRaises(
DummyDispatcherBroken, BrokenDummyServer,
- (support.HOST, 0), ('b', 0))
+ (support.HOST, 0), ('b', 0), decode_data=True)
def test_server_accept(self):
self.server.handle_accept()
@@ -247,6 +369,12 @@ class SMTPDChannelTest(unittest.TestCase):
self.assertEqual(self.channel.socket.last,
b'500 Error: line too long\r\n')
+ def test_MAIL_command_rejects_SMTPUTF8_by_default(self):
+ self.write_line(b'EHLO example')
+ self.write_line(
+ b'MAIL from: <naive@example.com> BODY=8BITMIME SMTPUTF8')
+ self.assertEqual(self.channel.socket.last[0:1], b'5')
+
def test_data_longer_than_default_data_size_limit(self):
# Hack the default so we don't have to generate so much data.
self.channel.data_size_limit = 1048
@@ -420,7 +548,10 @@ class SMTPDChannelTest(unittest.TestCase):
self.write_line(b'data\r\nmore\r\n.')
self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
self.assertEqual(self.server.messages,
- [('peer', 'eggs@example', ['spam@example'], 'data\nmore')])
+ [(('peer-address', 'peer-port'),
+ 'eggs@example',
+ ['spam@example'],
+ 'data\nmore')])
def test_DATA_syntax(self):
self.write_line(b'HELO example')
@@ -450,7 +581,10 @@ class SMTPDChannelTest(unittest.TestCase):
self.write_line(b'DATA')
self.write_line(b'data\r\n.')
self.assertEqual(self.server.messages,
- [('peer', 'eggs@example', ['spam@example','ham@example'], 'data')])
+ [(('peer-address', 'peer-port'),
+ 'eggs@example',
+ ['spam@example','ham@example'],
+ 'data')])
def test_manual_status(self):
# checks that the Channel is able to return a custom status message
@@ -472,7 +606,10 @@ class SMTPDChannelTest(unittest.TestCase):
self.write_line(b'DATA')
self.write_line(b'data\r\n.')
self.assertEqual(self.server.messages,
- [('peer', 'foo@example', ['eggs@example'], 'data')])
+ [(('peer-address', 'peer-port'),
+ 'foo@example',
+ ['eggs@example'],
+ 'data')])
def test_HELO_RSET(self):
self.write_line(b'HELO example')
@@ -536,7 +673,8 @@ class SMTPDChannelTest(unittest.TestCase):
self.channel._SMTPChannel__addr = 'spam'
def test_decode_data_default_warning(self):
- server = DummyServer((support.HOST, 0), ('b', 0))
+ with self.assertWarns(DeprecationWarning):
+ server = DummyServer((support.HOST, 0), ('b', 0))
conn, addr = self.server.accept()
with self.assertWarns(DeprecationWarning):
smtpd.SMTPChannel(server, conn, addr)
@@ -547,7 +685,8 @@ class SMTPDChannelIPv6Test(SMTPDChannelTest):
smtpd.socket = asyncore.socket = mock_socket
self.old_debugstream = smtpd.DEBUGSTREAM
self.debug = smtpd.DEBUGSTREAM = io.StringIO()
- self.server = DummyServer((support.HOSTv6, 0), ('b', 0))
+ self.server = DummyServer((support.HOSTv6, 0), ('b', 0),
+ decode_data=True)
conn, addr = self.server.accept()
self.channel = smtpd.SMTPChannel(self.server, conn, addr,
decode_data=True)
@@ -558,7 +697,8 @@ class SMTPDChannelWithDataSizeLimitTest(unittest.TestCase):
smtpd.socket = asyncore.socket = mock_socket
self.old_debugstream = smtpd.DEBUGSTREAM
self.debug = smtpd.DEBUGSTREAM = io.StringIO()
- self.server = DummyServer((support.HOST, 0), ('b', 0))
+ self.server = DummyServer((support.HOST, 0), ('b', 0),
+ decode_data=True)
conn, addr = self.server.accept()
# Set DATA size limit to 32 bytes for easy testing
self.channel = smtpd.SMTPChannel(self.server, conn, addr, 32,
@@ -586,7 +726,10 @@ class SMTPDChannelWithDataSizeLimitTest(unittest.TestCase):
self.write_line(b'data\r\nmore\r\n.')
self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
self.assertEqual(self.server.messages,
- [('peer', 'eggs@example', ['spam@example'], 'data\nmore')])
+ [(('peer-address', 'peer-port'),
+ 'eggs@example',
+ ['spam@example'],
+ 'data\nmore')])
def test_data_limit_dialog_too_much_data(self):
self.write_line(b'HELO example')
@@ -692,5 +835,92 @@ class SMTPDChannelWithDecodeDataTrue(unittest.TestCase):
'utf8 enriched text: żźć\nand some plain ascii')
+class SMTPDChannelTestWithEnableSMTPUTF8True(unittest.TestCase):
+ def setUp(self):
+ smtpd.socket = asyncore.socket = mock_socket
+ self.old_debugstream = smtpd.DEBUGSTREAM
+ self.debug = smtpd.DEBUGSTREAM = io.StringIO()
+ self.server = DummyServer((support.HOST, 0), ('b', 0),
+ enable_SMTPUTF8=True)
+ conn, addr = self.server.accept()
+ self.channel = smtpd.SMTPChannel(self.server, conn, addr,
+ enable_SMTPUTF8=True)
+
+ def tearDown(self):
+ asyncore.close_all()
+ asyncore.socket = smtpd.socket = socket
+ smtpd.DEBUGSTREAM = self.old_debugstream
+
+ def write_line(self, line):
+ self.channel.socket.queue_recv(line)
+ self.channel.handle_read()
+
+ def test_MAIL_command_accepts_SMTPUTF8_when_announced(self):
+ self.write_line(b'EHLO example')
+ self.write_line(
+ 'MAIL from: <naïve@example.com> BODY=8BITMIME SMTPUTF8'.encode(
+ 'utf-8')
+ )
+ self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
+
+ def test_process_smtputf8_message(self):
+ self.write_line(b'EHLO example')
+ for mail_parameters in [b'', b'BODY=8BITMIME SMTPUTF8']:
+ self.write_line(b'MAIL from: <a@example> ' + mail_parameters)
+ self.assertEqual(self.channel.socket.last[0:3], b'250')
+ self.write_line(b'rcpt to:<b@example.com>')
+ self.assertEqual(self.channel.socket.last[0:3], b'250')
+ self.write_line(b'data')
+ self.assertEqual(self.channel.socket.last[0:3], b'354')
+ self.write_line(b'c\r\n.')
+ if mail_parameters == b'':
+ self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
+ else:
+ self.assertEqual(self.channel.socket.last,
+ b'250 SMTPUTF8 message okish\r\n')
+
+ def test_utf8_data(self):
+ self.write_line(b'EHLO example')
+ self.write_line(
+ 'MAIL From: naïve@examplé BODY=8BITMIME SMTPUTF8'.encode('utf-8'))
+ self.assertEqual(self.channel.socket.last[0:3], b'250')
+ self.write_line('RCPT To:späm@examplé'.encode('utf-8'))
+ self.assertEqual(self.channel.socket.last[0:3], b'250')
+ self.write_line(b'DATA')
+ self.assertEqual(self.channel.socket.last[0:3], b'354')
+ self.write_line(b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87')
+ self.write_line(b'.')
+ self.assertEqual(
+ self.channel.received_data,
+ b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87')
+
+ def test_MAIL_command_limit_extended_with_SIZE_and_SMTPUTF8(self):
+ self.write_line(b'ehlo example')
+ fill_len = (512 + 26 + 10) - len('mail from:<@example>')
+ self.write_line(b'MAIL from:<' +
+ b'a' * (fill_len + 1) +
+ b'@example>')
+ self.assertEqual(self.channel.socket.last,
+ b'500 Error: line too long\r\n')
+ self.write_line(b'MAIL from:<' +
+ b'a' * fill_len +
+ b'@example>')
+ self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
+
+ def test_multiple_emails_with_extended_command_length(self):
+ self.write_line(b'ehlo example')
+ fill_len = (512 + 26 + 10) - len('mail from:<@example>')
+ for char in [b'a', b'b', b'c']:
+ self.write_line(b'MAIL from:<' + char * fill_len + b'a@example>')
+ self.assertEqual(self.channel.socket.last[0:3], b'500')
+ self.write_line(b'MAIL from:<' + char * fill_len + b'@example>')
+ self.assertEqual(self.channel.socket.last[0:3], b'250')
+ self.write_line(b'rcpt to:<hans@example.com>')
+ self.assertEqual(self.channel.socket.last[0:3], b'250')
+ self.write_line(b'data')
+ self.assertEqual(self.channel.socket.last[0:3], b'354')
+ self.write_line(b'test\r\n.')
+ self.assertEqual(self.channel.socket.last[0:3], b'250')
+
if __name__ == "__main__":
unittest.main()