diff options
author | R David Murray <rdmurray@bitdance.com> | 2012-05-26 18:33:59 (GMT) |
---|---|---|
committer | R David Murray <rdmurray@bitdance.com> | 2012-05-26 18:33:59 (GMT) |
commit | d1a30c939cc6378423dd3cc22382a9abe2a7d882 (patch) | |
tree | d083d5bab633c09a9e9857987f8d2b960a00c396 /Lib/test | |
parent | 032eed3c4a42ca29de2c07fba2e0555eaff1700c (diff) | |
download | cpython-d1a30c939cc6378423dd3cc22382a9abe2a7d882.zip cpython-d1a30c939cc6378423dd3cc22382a9abe2a7d882.tar.gz cpython-d1a30c939cc6378423dd3cc22382a9abe2a7d882.tar.bz2 |
#8739: upgrade smtpd to RFC 5321 and 1870.
smtpd now handles EHLO and has infrastructure for extended smtp command mode.
The SIZE extension is also implemented. In order to support parameters on
MAIL FROM, the RFC 5322 parser from the email package is used to parse the
address "token".
Logging subclasses things and overrides __init__, so it was necessary to
update those __init__ functions in the logging tests to make the logging tests
pass.
The original suggestion and patch were by Alberto Trevino. Juhana Jauhiainen
added the --size argument and SIZE parameter support. Michele OrrĂ¹ improved
the patch and added more tests. Dan Boswell conditionalized various bits of
code on whether or not we are in HELO or EHLO mode, as well as some other
improvements and tests. I finalized the patch and added the address parsing.
Diffstat (limited to 'Lib/test')
-rw-r--r-- | Lib/test/test_logging.py | 3 | ||||
-rw-r--r-- | Lib/test/test_smtpd.py | 280 | ||||
-rw-r--r-- | Lib/test/test_smtplib.py | 19 |
3 files changed, 266 insertions, 36 deletions
diff --git a/Lib/test/test_logging.py b/Lib/test/test_logging.py index 3adeaec..26baf11 100644 --- a/Lib/test/test_logging.py +++ b/Lib/test/test_logging.py @@ -663,6 +663,7 @@ if threading: self.smtp_server = server self.conn = conn self.addr = addr + self.data_size_limit = None self.received_lines = [] self.smtp_state = self.COMMAND self.seen_greeting = '' @@ -682,6 +683,7 @@ if threading: return self.push('220 %s %s' % (self.fqdn, smtpd.__version__)) self.set_terminator(b'\r\n') + self.extended_smtp = False class TestSMTPServer(smtpd.SMTPServer): @@ -709,6 +711,7 @@ if threading: def __init__(self, addr, handler, poll_interval, sockmap): self._localaddr = addr self._remoteaddr = None + self.data_size_limit = None self.sockmap = sockmap asyncore.dispatcher.__init__(self, map=sockmap) try: diff --git a/Lib/test/test_smtpd.py b/Lib/test/test_smtpd.py index a7dc5f6..dda1941 100644 --- a/Lib/test/test_smtpd.py +++ b/Lib/test/test_smtpd.py @@ -1,4 +1,4 @@ -from unittest import TestCase +import unittest from test import support, mock_socket import socket import io @@ -26,7 +26,7 @@ class BrokenDummyServer(DummyServer): raise DummyDispatcherBroken() -class SMTPDServerTest(TestCase): +class SMTPDServerTest(unittest.TestCase): def setUp(self): smtpd.socket = asyncore.socket = mock_socket @@ -39,7 +39,7 @@ class SMTPDServerTest(TestCase): channel.socket.queue_recv(line) channel.handle_read() - write_line(b'HELO test.example') + write_line(b'HELO example') write_line(b'MAIL From:eggs@example') write_line(b'RCPT To:spam@example') write_line(b'DATA') @@ -50,7 +50,7 @@ class SMTPDServerTest(TestCase): asyncore.socket = smtpd.socket = socket -class SMTPDChannelTest(TestCase): +class SMTPDChannelTest(unittest.TestCase): def setUp(self): smtpd.socket = asyncore.socket = mock_socket self.old_debugstream = smtpd.DEBUGSTREAM @@ -79,36 +79,94 @@ class SMTPDChannelTest(TestCase): self.assertEqual(self.channel.socket.last, b'500 Error: bad syntax\r\n') - def test_EHLO_not_implemented(self): - self.write_line(b'EHLO test.example') + def test_EHLO(self): + self.write_line(b'EHLO example') + self.assertEqual(self.channel.socket.last, b'250 HELP\r\n') + + def test_EHLO_bad_syntax(self): + self.write_line(b'EHLO') + self.assertEqual(self.channel.socket.last, + b'501 Syntax: EHLO hostname\r\n') + + def test_EHLO_duplicate(self): + self.write_line(b'EHLO example') + self.write_line(b'EHLO example') + self.assertEqual(self.channel.socket.last, + b'503 Duplicate HELO/EHLO\r\n') + + def test_EHLO_HELO_duplicate(self): + self.write_line(b'EHLO example') + self.write_line(b'HELO example') self.assertEqual(self.channel.socket.last, - b'502 Error: command "EHLO" not implemented\r\n') + b'503 Duplicate HELO/EHLO\r\n') def test_HELO(self): name = smtpd.socket.getfqdn() - self.write_line(b'HELO test.example') + self.write_line(b'HELO example') self.assertEqual(self.channel.socket.last, '250 {}\r\n'.format(name).encode('ascii')) + def test_HELO_EHLO_duplicate(self): + self.write_line(b'HELO example') + self.write_line(b'EHLO example') + self.assertEqual(self.channel.socket.last, + b'503 Duplicate HELO/EHLO\r\n') + + def test_HELP(self): + self.write_line(b'HELP') + self.assertEqual(self.channel.socket.last, + b'250 Supported commands: EHLO HELO MAIL RCPT ' + \ + b'DATA RSET NOOP QUIT VRFY\r\n') + + def test_HELP_command(self): + self.write_line(b'HELP MAIL') + self.assertEqual(self.channel.socket.last, + b'250 Syntax: MAIL FROM: <address>\r\n') + + def test_HELP_command_unknown(self): + self.write_line(b'HELP SPAM') + self.assertEqual(self.channel.socket.last, + b'501 Supported commands: EHLO HELO MAIL RCPT ' + \ + b'DATA RSET NOOP QUIT VRFY\r\n') + def test_HELO_bad_syntax(self): self.write_line(b'HELO') self.assertEqual(self.channel.socket.last, b'501 Syntax: HELO hostname\r\n') def test_HELO_duplicate(self): - self.write_line(b'HELO test.example') - self.write_line(b'HELO test.example') + self.write_line(b'HELO example') + self.write_line(b'HELO example') self.assertEqual(self.channel.socket.last, b'503 Duplicate HELO/EHLO\r\n') + def test_HELO_parameter_rejected_when_extensions_not_enabled(self): + self.extended_smtp = False + self.write_line(b'HELO example') + self.write_line(b'MAIL from:<foo@example.com> SIZE=1234') + self.assertEqual(self.channel.socket.last, + b'501 Syntax: MAIL FROM: <address>\r\n') + + def test_MAIL_allows_space_after_colon(self): + self.write_line(b'HELO example') + self.write_line(b'MAIL from: <foo@example.com>') + self.assertEqual(self.channel.socket.last, + b'250 OK\r\n') + + def test_extended_MAIL_allows_space_after_colon(self): + self.write_line(b'EHLO example') + self.write_line(b'MAIL from: <foo@example.com> size=20') + self.assertEqual(self.channel.socket.last, + b'250 OK\r\n') + def test_NOOP(self): self.write_line(b'NOOP') - self.assertEqual(self.channel.socket.last, b'250 Ok\r\n') + self.assertEqual(self.channel.socket.last, b'250 OK\r\n') def test_HELO_NOOP(self): self.write_line(b'HELO example') self.write_line(b'NOOP') - self.assertEqual(self.channel.socket.last, b'250 Ok\r\n') + self.assertEqual(self.channel.socket.last, b'250 OK\r\n') def test_NOOP_bad_syntax(self): self.write_line(b'NOOP hi') @@ -136,15 +194,29 @@ class SMTPDChannelTest(TestCase): def test_command_too_long(self): self.write_line(b'HELO example') - self.write_line(b'MAIL from ' + + self.write_line(b'MAIL from: ' + b'a' * self.channel.command_size_limit + b'@example') self.assertEqual(self.channel.socket.last, b'500 Error: line too long\r\n') - def test_data_too_long(self): - # Small hack. Setting limit to 2K octets here will save us some time. - self.channel.data_size_limit = 2048 + def test_MAIL_command_limit_extended_with_SIZE(self): + self.write_line(b'EHLO example') + fill_len = self.channel.command_size_limit - len('MAIL from:<@example>') + self.write_line(b'MAIL from:<' + + b'a' * fill_len + + b'@example> SIZE=1234') + self.assertEqual(self.channel.socket.last, b'250 OK\r\n') + + self.write_line(b'MAIL from:<' + + b'a' * (fill_len + 26) + + b'@example> SIZE=1234') + self.assertEqual(self.channel.socket.last, + b'500 Error: line too long\r\n') + + def test_data_longer_than_default_data_size_limit(self): + # Hack the default so we don't have to generate so much data. + self.channel.data_size_limit = 1048 self.write_line(b'HELO example') self.write_line(b'MAIL From:eggs@example') self.write_line(b'RCPT To:spam@example') @@ -154,28 +226,93 @@ class SMTPDChannelTest(TestCase): self.assertEqual(self.channel.socket.last, b'552 Error: Too much mail data\r\n') + def test_MAIL_size_parameter(self): + self.write_line(b'EHLO example') + self.write_line(b'MAIL FROM:<eggs@example> SIZE=512') + self.assertEqual(self.channel.socket.last, + b'250 OK\r\n') + + def test_MAIL_invalid_size_parameter(self): + self.write_line(b'EHLO example') + self.write_line(b'MAIL FROM:<eggs@example> SIZE=invalid') + self.assertEqual(self.channel.socket.last, + b'501 Syntax: MAIL FROM: <address> [SP <mail-parameters>]\r\n') + + def test_MAIL_RCPT_unknown_parameters(self): + self.write_line(b'EHLO example') + self.write_line(b'MAIL FROM:<eggs@example> ham=green') + self.assertEqual(self.channel.socket.last, + b'555 MAIL FROM parameters not recognized or not implemented\r\n') + + self.write_line(b'MAIL FROM:<eggs@example>') + self.write_line(b'RCPT TO:<eggs@example> ham=green') + self.assertEqual(self.channel.socket.last, + b'555 RCPT TO parameters not recognized or not implemented\r\n') + + def test_MAIL_size_parameter_larger_than_default_data_size_limit(self): + self.channel.data_size_limit = 1048 + self.write_line(b'EHLO example') + self.write_line(b'MAIL FROM:<eggs@example> SIZE=2096') + self.assertEqual(self.channel.socket.last, + b'552 Error: message size exceeds fixed maximum message size\r\n') + def test_need_MAIL(self): self.write_line(b'HELO example') self.write_line(b'RCPT to:spam@example') self.assertEqual(self.channel.socket.last, b'503 Error: need MAIL command\r\n') - def test_MAIL_syntax(self): + def test_MAIL_syntax_HELO(self): self.write_line(b'HELO example') self.write_line(b'MAIL from eggs@example') self.assertEqual(self.channel.socket.last, - b'501 Syntax: MAIL FROM:<address>\r\n') + b'501 Syntax: MAIL FROM: <address>\r\n') - def test_MAIL_missing_from(self): + def test_MAIL_syntax_EHLO(self): + self.write_line(b'EHLO example') + self.write_line(b'MAIL from eggs@example') + self.assertEqual(self.channel.socket.last, + b'501 Syntax: MAIL FROM: <address> [SP <mail-parameters>]\r\n') + + def test_MAIL_missing_address(self): self.write_line(b'HELO example') self.write_line(b'MAIL from:') self.assertEqual(self.channel.socket.last, - b'501 Syntax: MAIL FROM:<address>\r\n') + b'501 Syntax: MAIL FROM: <address>\r\n') def test_MAIL_chevrons(self): self.write_line(b'HELO example') self.write_line(b'MAIL from:<eggs@example>') - self.assertEqual(self.channel.socket.last, b'250 Ok\r\n') + self.assertEqual(self.channel.socket.last, b'250 OK\r\n') + + def test_MAIL_empty_chevrons(self): + self.write_line(b'EHLO example') + self.write_line(b'MAIL from:<>') + self.assertEqual(self.channel.socket.last, b'250 OK\r\n') + + def test_MAIL_quoted_localpart(self): + self.write_line(b'EHLO example') + self.write_line(b'MAIL from: <"Fred Blogs"@example.com>') + self.assertEqual(self.channel.socket.last, b'250 OK\r\n') + self.assertEqual(self.channel.mailfrom, '"Fred Blogs"@example.com') + + def test_MAIL_quoted_localpart_no_angles(self): + self.write_line(b'EHLO example') + self.write_line(b'MAIL from: "Fred Blogs"@example.com') + self.assertEqual(self.channel.socket.last, b'250 OK\r\n') + self.assertEqual(self.channel.mailfrom, '"Fred Blogs"@example.com') + + def test_MAIL_quoted_localpart_with_size(self): + self.write_line(b'EHLO example') + self.write_line(b'MAIL from: <"Fred Blogs"@example.com> SIZE=1000') + self.assertEqual(self.channel.socket.last, b'250 OK\r\n') + self.assertEqual(self.channel.mailfrom, '"Fred Blogs"@example.com') + + def test_MAIL_quoted_localpart_with_size_no_angles(self): + self.write_line(b'EHLO example') + self.write_line(b'MAIL from: "Fred Blogs"@example.com SIZE=1000') + self.assertEqual(self.channel.socket.last, b'250 OK\r\n') + self.assertEqual(self.channel.mailfrom, '"Fred Blogs"@example.com') def test_nested_MAIL(self): self.write_line(b'HELO example') @@ -184,6 +321,22 @@ class SMTPDChannelTest(TestCase): self.assertEqual(self.channel.socket.last, b'503 Error: nested MAIL command\r\n') + def test_VRFY(self): + self.write_line(b'VRFY eggs@example') + self.assertEqual(self.channel.socket.last, + b'252 Cannot VRFY user, but will accept message and attempt ' + \ + b'delivery\r\n') + + def test_VRFY_syntax(self): + self.write_line(b'VRFY') + self.assertEqual(self.channel.socket.last, + b'501 Syntax: VRFY <address>\r\n') + + def test_EXPN_not_implemented(self): + self.write_line(b'EXPN') + self.assertEqual(self.channel.socket.last, + b'502 EXPN not implemented\r\n') + def test_no_HELO_MAIL(self): self.write_line(b'MAIL from:<foo@example.com>') self.assertEqual(self.channel.socket.last, @@ -196,13 +349,26 @@ class SMTPDChannelTest(TestCase): self.assertEqual(self.channel.socket.last, b'503 Error: need RCPT command\r\n') - def test_RCPT_syntax(self): + def test_RCPT_syntax_HELO(self): self.write_line(b'HELO example') - self.write_line(b'MAIL From:eggs@example') + self.write_line(b'MAIL From: eggs@example') self.write_line(b'RCPT to eggs@example') self.assertEqual(self.channel.socket.last, b'501 Syntax: RCPT TO: <address>\r\n') + def test_RCPT_syntax_EHLO(self): + self.write_line(b'EHLO example') + self.write_line(b'MAIL From: eggs@example') + self.write_line(b'RCPT to eggs@example') + self.assertEqual(self.channel.socket.last, + b'501 Syntax: RCPT TO: <address> [SP <mail-parameters>]\r\n') + + def test_RCPT_lowercase_to_OK(self): + self.write_line(b'HELO example') + self.write_line(b'MAIL From: eggs@example') + self.write_line(b'RCPT to: <eggs@example>') + self.assertEqual(self.channel.socket.last, b'250 OK\r\n') + def test_no_HELO_RCPT(self): self.write_line(b'RCPT to eggs@example') self.assertEqual(self.channel.socket.last, @@ -211,15 +377,15 @@ class SMTPDChannelTest(TestCase): def test_data_dialog(self): self.write_line(b'HELO example') self.write_line(b'MAIL From:eggs@example') - self.assertEqual(self.channel.socket.last, b'250 Ok\r\n') + self.assertEqual(self.channel.socket.last, b'250 OK\r\n') self.write_line(b'RCPT To:spam@example') - self.assertEqual(self.channel.socket.last, b'250 Ok\r\n') + self.assertEqual(self.channel.socket.last, b'250 OK\r\n') self.write_line(b'DATA') self.assertEqual(self.channel.socket.last, b'354 End data with <CR><LF>.<CR><LF>\r\n') self.write_line(b'data\r\nmore\r\n.') - self.assertEqual(self.channel.socket.last, b'250 Ok\r\n') + self.assertEqual(self.channel.socket.last, b'250 OK\r\n') self.assertEqual(self.server.messages, [('peer', 'eggs@example', ['spam@example'], 'data\nmore')]) @@ -267,7 +433,7 @@ class SMTPDChannelTest(TestCase): self.write_line(b'MAIL From:eggs@example') self.write_line(b'RCPT To:spam@example') self.write_line(b'RSET') - self.assertEqual(self.channel.socket.last, b'250 Ok\r\n') + self.assertEqual(self.channel.socket.last, b'250 OK\r\n') self.write_line(b'MAIL From:foo@example') self.write_line(b'RCPT To:eggs@example') self.write_line(b'DATA') @@ -278,12 +444,18 @@ class SMTPDChannelTest(TestCase): def test_HELO_RSET(self): self.write_line(b'HELO example') self.write_line(b'RSET') - self.assertEqual(self.channel.socket.last, b'250 Ok\r\n') + self.assertEqual(self.channel.socket.last, b'250 OK\r\n') def test_RSET_syntax(self): self.write_line(b'RSET hi') self.assertEqual(self.channel.socket.last, b'501 Syntax: RSET\r\n') + def test_unknown_command(self): + self.write_line(b'UNKNOWN_CMD') + self.assertEqual(self.channel.socket.last, + b'500 Error: command "UNKNOWN_CMD" not ' + \ + b'recognized\r\n') + def test_attribute_deprecations(self): with support.check_warnings(('', DeprecationWarning)): spam = self.channel._SMTPChannel__server @@ -330,8 +502,54 @@ class SMTPDChannelTest(TestCase): with support.check_warnings(('', DeprecationWarning)): self.channel._SMTPChannel__addr = 'spam' -def test_main(): - support.run_unittest(SMTPDServerTest, SMTPDChannelTest) + +class SMTPDChannelWithDataSizeLimitTest(unittest.TestCase): + + def setUp(self): + smtpd.socket = asyncore.socket = mock_socket + self.debug = smtpd.DEBUGSTREAM = io.StringIO() + self.server = DummyServer('a', 'b') + conn, addr = self.server.accept() + # Set DATA size limit to 32 bytes for easy testing + self.channel = smtpd.SMTPChannel(self.server, conn, addr, 32) + + def tearDown(self): + asyncore.close_all() + asyncore.socket = smtpd.socket = socket + + def write_line(self, line): + self.channel.socket.queue_recv(line) + self.channel.handle_read() + + def test_data_limit_dialog(self): + self.write_line(b'HELO example') + self.write_line(b'MAIL From:eggs@example') + self.assertEqual(self.channel.socket.last, b'250 OK\r\n') + self.write_line(b'RCPT To:spam@example') + self.assertEqual(self.channel.socket.last, b'250 OK\r\n') + + self.write_line(b'DATA') + self.assertEqual(self.channel.socket.last, + b'354 End data with <CR><LF>.<CR><LF>\r\n') + self.write_line(b'data\r\nmore\r\n.') + self.assertEqual(self.channel.socket.last, b'250 OK\r\n') + self.assertEqual(self.server.messages, + [('peer', 'eggs@example', ['spam@example'], 'data\nmore')]) + + def test_data_limit_dialog_too_much_data(self): + self.write_line(b'HELO example') + self.write_line(b'MAIL From:eggs@example') + self.assertEqual(self.channel.socket.last, b'250 OK\r\n') + self.write_line(b'RCPT To:spam@example') + self.assertEqual(self.channel.socket.last, b'250 OK\r\n') + + self.write_line(b'DATA') + self.assertEqual(self.channel.socket.last, + b'354 End data with <CR><LF>.<CR><LF>\r\n') + self.write_line(b'This message is longer than 32 bytes\r\n.') + self.assertEqual(self.channel.socket.last, + b'552 Error: Too much mail data\r\n') + if __name__ == "__main__": - test_main() + unittest.main() diff --git a/Lib/test/test_smtplib.py b/Lib/test/test_smtplib.py index 18dde2f..befc49e 100644 --- a/Lib/test/test_smtplib.py +++ b/Lib/test/test_smtplib.py @@ -229,13 +229,13 @@ class DebuggingServerTests(unittest.TestCase): def testNOOP(self): smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) - expected = (250, b'Ok') + expected = (250, b'OK') self.assertEqual(smtp.noop(), expected) smtp.quit() def testRSET(self): smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) - expected = (250, b'Ok') + expected = (250, b'OK') self.assertEqual(smtp.rset(), expected) smtp.quit() @@ -246,10 +246,18 @@ class DebuggingServerTests(unittest.TestCase): self.assertEqual(smtp.ehlo(), expected) smtp.quit() + def testNotImplemented(self): + # EXPN isn't implemented in DebuggingServer + smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) + expected = (502, b'EXPN not implemented') + smtp.putcmd('EXPN') + self.assertEqual(smtp.getreply(), expected) + smtp.quit() + def testVRFY(self): - # VRFY isn't implemented in DebuggingServer smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) - expected = (502, b'Error: command "VRFY" not implemented') + expected = (252, b'Cannot VRFY user, but will accept message ' + \ + b'and attempt delivery') self.assertEqual(smtp.vrfy('nobody@nowhere.com'), expected) self.assertEqual(smtp.verify('nobody@nowhere.com'), expected) smtp.quit() @@ -265,7 +273,8 @@ class DebuggingServerTests(unittest.TestCase): def testHELP(self): smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) - self.assertEqual(smtp.help(), b'Error: command "HELP" not implemented') + self.assertEqual(smtp.help(), b'Supported commands: EHLO HELO MAIL ' + \ + b'RCPT DATA RSET NOOP QUIT VRFY') smtp.quit() def testSend(self): |