summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_smtpd.py
diff options
context:
space:
mode:
authorR David Murray <rdmurray@bitdance.com>2015-05-11 16:11:40 (GMT)
committerR David Murray <rdmurray@bitdance.com>2015-05-11 16:11:40 (GMT)
commita33df31629f2f6ed85890baa9b4e71c30efa95a9 (patch)
tree00602e680a45fbb39ac9127995764b92ee096488 /Lib/test/test_smtpd.py
parent0d905d4fcdb39119f763afd4036cfaea78a2ae5b (diff)
downloadcpython-a33df31629f2f6ed85890baa9b4e71c30efa95a9.zip
cpython-a33df31629f2f6ed85890baa9b4e71c30efa95a9.tar.gz
cpython-a33df31629f2f6ed85890baa9b4e71c30efa95a9.tar.bz2
#21795: advertise 8BITMIME if decode_data is False.
Patch by Milan Oberkirch, with a few updates. This changeset also tweaks the smtpd and whatsnew docs for smtpd into what should be the final form for the 3.5 release.
Diffstat (limited to 'Lib/test/test_smtpd.py')
-rw-r--r--Lib/test/test_smtpd.py129
1 files changed, 108 insertions, 21 deletions
diff --git a/Lib/test/test_smtpd.py b/Lib/test/test_smtpd.py
index 6eb47f1..1aa55d2 100644
--- a/Lib/test/test_smtpd.py
+++ b/Lib/test/test_smtpd.py
@@ -16,13 +16,12 @@ class DummyServer(smtpd.SMTPServer):
else:
self.return_status = b'return status'
- def process_message(self, peer, mailfrom, rcpttos, data):
+ def process_message(self, peer, mailfrom, rcpttos, data, **kw):
self.messages.append((peer, mailfrom, rcpttos, data))
if data == self.return_status:
return '250 Okish'
-
- def process_smtputf8_message(self, *args, **kwargs):
- return '250 SMTPUTF8 message okish'
+ if 'mail_options' in kw and 'SMTPUTF8' in kw['mail_options']:
+ return '250 SMTPUTF8 message okish'
class DummyDispatcherBroken(Exception):
@@ -54,22 +53,6 @@ class SMTPDServerTest(unittest.TestCase):
write_line(b'DATA')
self.assertRaises(NotImplementedError, write_line, b'spam\r\n.\r\n')
- def test_process_smtputf8_message_unimplemented(self):
- server = smtpd.SMTPServer((support.HOST, 0), ('b', 0),
- enable_SMTPUTF8=True)
- conn, addr = server.accept()
- channel = smtpd.SMTPChannel(server, conn, addr, enable_SMTPUTF8=True)
-
- def write_line(line):
- channel.socket.queue_recv(line)
- channel.handle_read()
-
- write_line(b'EHLO example')
- write_line(b'MAIL From: <eggs@example> BODY=8BITMIME SMTPUTF8')
- write_line(b'RCPT To: <spam@example>')
- write_line(b'DATA')
- self.assertRaises(NotImplementedError, write_line, b'spam\r\n.\r\n')
-
def test_decode_data_default_warns(self):
with self.assertWarns(DeprecationWarning):
smtpd.SMTPServer((support.HOST, 0), ('b', 0))
@@ -168,7 +151,8 @@ class DebuggingServerTest(unittest.TestCase):
enable_SMTPUTF8=True)
stdout = s.getvalue()
self.assertEqual(stdout, textwrap.dedent("""\
- ----- SMTPUTF8 MESSAGE FOLLOWS ------
+ ---------- MESSAGE FOLLOWS ----------
+ mail options: ['BODY=8BITMIME', 'SMTPUTF8']
b'From: test'
b'X-Peer: peer-address'
b''
@@ -201,6 +185,109 @@ class TestFamilyDetection(unittest.TestCase):
self.assertEqual(server.socket.family, socket.AF_INET)
+class TestRcptOptionParsing(unittest.TestCase):
+ error_response = (b'555 RCPT TO parameters not recognized or not '
+ b'implemented\r\n')
+
+ def setUp(self):
+ smtpd.socket = asyncore.socket = mock_socket
+ self.old_debugstream = smtpd.DEBUGSTREAM
+ self.debug = smtpd.DEBUGSTREAM = io.StringIO()
+
+ def tearDown(self):
+ asyncore.close_all()
+ asyncore.socket = smtpd.socket = socket
+ smtpd.DEBUGSTREAM = self.old_debugstream
+
+ def write_line(self, channel, line):
+ channel.socket.queue_recv(line)
+ channel.handle_read()
+
+ def test_params_rejected(self):
+ server = DummyServer((support.HOST, 0), ('b', 0), decode_data=False)
+ conn, addr = server.accept()
+ channel = smtpd.SMTPChannel(server, conn, addr, decode_data=False)
+ self.write_line(channel, b'EHLO example')
+ self.write_line(channel, b'MAIL from: <foo@example.com> size=20')
+ self.write_line(channel, b'RCPT to: <foo@example.com> foo=bar')
+ self.assertEqual(channel.socket.last, self.error_response)
+
+ def test_nothing_accepted(self):
+ server = DummyServer((support.HOST, 0), ('b', 0), decode_data=False)
+ conn, addr = server.accept()
+ channel = smtpd.SMTPChannel(server, conn, addr, decode_data=False)
+ self.write_line(channel, b'EHLO example')
+ self.write_line(channel, b'MAIL from: <foo@example.com> size=20')
+ self.write_line(channel, b'RCPT to: <foo@example.com>')
+ self.assertEqual(channel.socket.last, b'250 OK\r\n')
+
+
+class TestMailOptionParsing(unittest.TestCase):
+ error_response = (b'555 MAIL FROM parameters not recognized or not '
+ b'implemented\r\n')
+
+ def setUp(self):
+ smtpd.socket = asyncore.socket = mock_socket
+ self.old_debugstream = smtpd.DEBUGSTREAM
+ self.debug = smtpd.DEBUGSTREAM = io.StringIO()
+
+ def tearDown(self):
+ asyncore.close_all()
+ asyncore.socket = smtpd.socket = socket
+ smtpd.DEBUGSTREAM = self.old_debugstream
+
+ def write_line(self, channel, line):
+ channel.socket.queue_recv(line)
+ channel.handle_read()
+
+ def test_with_decode_data_true(self):
+ server = DummyServer((support.HOST, 0), ('b', 0), decode_data=True)
+ conn, addr = server.accept()
+ channel = smtpd.SMTPChannel(server, conn, addr, decode_data=True)
+ self.write_line(channel, b'EHLO example')
+ for line in [
+ b'MAIL from: <foo@example.com> size=20 SMTPUTF8',
+ b'MAIL from: <foo@example.com> size=20 SMTPUTF8 BODY=8BITMIME',
+ b'MAIL from: <foo@example.com> size=20 BODY=UNKNOWN',
+ b'MAIL from: <foo@example.com> size=20 body=8bitmime',
+ ]:
+ self.write_line(channel, line)
+ self.assertEqual(channel.socket.last, self.error_response)
+ self.write_line(channel, b'MAIL from: <foo@example.com> size=20')
+ self.assertEqual(channel.socket.last, b'250 OK\r\n')
+
+ def test_with_decode_data_false(self):
+ server = DummyServer((support.HOST, 0), ('b', 0), decode_data=False)
+ conn, addr = server.accept()
+ channel = smtpd.SMTPChannel(server, conn, addr, decode_data=False)
+ self.write_line(channel, b'EHLO example')
+ for line in [
+ b'MAIL from: <foo@example.com> size=20 SMTPUTF8',
+ b'MAIL from: <foo@example.com> size=20 SMTPUTF8 BODY=8BITMIME',
+ ]:
+ self.write_line(channel, line)
+ self.assertEqual(channel.socket.last, self.error_response)
+ self.write_line(
+ channel,
+ b'MAIL from: <foo@example.com> size=20 SMTPUTF8 BODY=UNKNOWN')
+ self.assertEqual(
+ channel.socket.last,
+ b'501 Error: BODY can only be one of 7BIT, 8BITMIME\r\n')
+ self.write_line(
+ channel, b'MAIL from: <foo@example.com> size=20 body=8bitmime')
+ self.assertEqual(channel.socket.last, b'250 OK\r\n')
+
+ def test_with_enable_smtputf8_true(self):
+ server = DummyServer((support.HOST, 0), ('b', 0), enable_SMTPUTF8=True)
+ conn, addr = server.accept()
+ channel = smtpd.SMTPChannel(server, conn, addr, enable_SMTPUTF8=True)
+ self.write_line(channel, b'EHLO example')
+ self.write_line(
+ channel,
+ b'MAIL from: <foo@example.com> size=20 body=8bitmime smtputf8')
+ self.assertEqual(channel.socket.last, b'250 OK\r\n')
+
+
class SMTPDChannelTest(unittest.TestCase):
def setUp(self):
smtpd.socket = asyncore.socket = mock_socket