diff options
Diffstat (limited to 'Lib/test/test_smtplib.py')
| -rw-r--r-- | Lib/test/test_smtplib.py | 452 |
1 files changed, 396 insertions, 56 deletions
diff --git a/Lib/test/test_smtplib.py b/Lib/test/test_smtplib.py index 95a9dbe..28539f3 100644 --- a/Lib/test/test_smtplib.py +++ b/Lib/test/test_smtplib.py @@ -1,6 +1,10 @@ import asyncore +import base64 import email.mime.text +from email.message import EmailMessage +from email.base64mime import body_encode as encode_base64 import email.utils +import hmac import socket import smtpd import smtplib @@ -10,6 +14,7 @@ import sys import time import select import errno +import textwrap import unittest from test import support, mock_socket @@ -30,7 +35,7 @@ if sys.platform == 'darwin': def server(evt, buf, serv): - serv.listen(5) + serv.listen() evt.set() try: conn, addr = serv.accept() @@ -123,6 +128,27 @@ class GeneralTests(unittest.TestCase): self.assertEqual(smtp.sock.gettimeout(), 30) smtp.close() + def test_debuglevel(self): + mock_socket.reply_with(b"220 Hello world") + smtp = smtplib.SMTP() + smtp.set_debuglevel(1) + with support.captured_stderr() as stderr: + smtp.connect(HOST, self.port) + smtp.close() + expected = re.compile(r"^connect:", re.MULTILINE) + self.assertRegex(stderr.getvalue(), expected) + + def test_debuglevel_2(self): + mock_socket.reply_with(b"220 Hello world") + smtp = smtplib.SMTP() + smtp.set_debuglevel(2) + with support.captured_stderr() as stderr: + smtp.connect(HOST, self.port) + smtp.close() + expected = re.compile(r"^\d{2}:\d{2}:\d{2}\.\d{6} connect: ", + re.MULTILINE) + self.assertRegex(stderr.getvalue(), expected) + # Test server thread using the specified SMTP server class def debugging_server(serv, serv_evt, client_evt): @@ -184,7 +210,8 @@ class DebuggingServerTests(unittest.TestCase): self.old_DEBUGSTREAM = smtpd.DEBUGSTREAM smtpd.DEBUGSTREAM = io.StringIO() # Pick a random unused port by passing 0 for the port number - self.serv = smtpd.DebuggingServer((HOST, 0), ('nowhere', -1)) + self.serv = smtpd.DebuggingServer((HOST, 0), ('nowhere', -1), + decode_data=True) # Keep a note of what port was assigned self.port = self.serv.socket.getsockname()[1] serv_args = (self.serv, self.serv_evt, self.client_evt) @@ -598,19 +625,12 @@ sim_users = {'Mr.A@somewhere.com':'John A', sim_auth = ('Mr.A@somewhere.com', 'somepassword') sim_cram_md5_challenge = ('PENCeUxFREJoU0NnbmhNWitOMjNGNn' 'dAZWx3b29kLmlubm9zb2Z0LmNvbT4=') -sim_auth_credentials = { - 'login': 'TXIuQUBzb21ld2hlcmUuY29t', - 'plain': 'AE1yLkFAc29tZXdoZXJlLmNvbQBzb21lcGFzc3dvcmQ=', - 'cram-md5': ('TXIUQUBZB21LD2HLCMUUY29TIDG4OWQ0MJ' - 'KWZGQ4ODNMNDA4NTGXMDRLZWMYZJDMODG1'), - } -sim_auth_login_password = 'C29TZXBHC3N3B3JK' - sim_lists = {'list-1':['Mr.A@somewhere.com','Mrs.C@somewhereesle.com'], 'list-2':['Ms.B@xn--fo-fka.com',], } # Simulated SMTP channel & server +class ResponseException(Exception): pass class SimSMTPChannel(smtpd.SMTPChannel): quit_response = None @@ -620,12 +640,109 @@ class SimSMTPChannel(smtpd.SMTPChannel): rcpt_count = 0 rset_count = 0 disconnect = 0 + AUTH = 99 # Add protocol state to enable auth testing. + authenticated_user = None def __init__(self, extra_features, *args, **kw): self._extrafeatures = ''.join( [ "250-{0}\r\n".format(x) for x in extra_features ]) super(SimSMTPChannel, self).__init__(*args, **kw) + # AUTH related stuff. It would be nice if support for this were in smtpd. + def found_terminator(self): + if self.smtp_state == self.AUTH: + line = self._emptystring.join(self.received_lines) + print('Data:', repr(line), file=smtpd.DEBUGSTREAM) + self.received_lines = [] + try: + self.auth_object(line) + except ResponseException as e: + self.smtp_state = self.COMMAND + self.push('%s %s' % (e.smtp_code, e.smtp_error)) + return + super().found_terminator() + + + def smtp_AUTH(self, arg): + if not self.seen_greeting: + self.push('503 Error: send EHLO first') + return + if not self.extended_smtp or 'AUTH' not in self._extrafeatures: + self.push('500 Error: command "AUTH" not recognized') + return + if self.authenticated_user is not None: + self.push( + '503 Bad sequence of commands: already authenticated') + return + args = arg.split() + if len(args) not in [1, 2]: + self.push('501 Syntax: AUTH <mechanism> [initial-response]') + return + auth_object_name = '_auth_%s' % args[0].lower().replace('-', '_') + try: + self.auth_object = getattr(self, auth_object_name) + except AttributeError: + self.push('504 Command parameter not implemented: unsupported ' + ' authentication mechanism {!r}'.format(auth_object_name)) + return + self.smtp_state = self.AUTH + self.auth_object(args[1] if len(args) == 2 else None) + + def _authenticated(self, user, valid): + if valid: + self.authenticated_user = user + self.push('235 Authentication Succeeded') + else: + self.push('535 Authentication credentials invalid') + self.smtp_state = self.COMMAND + + def _decode_base64(self, string): + return base64.decodebytes(string.encode('ascii')).decode('utf-8') + + def _auth_plain(self, arg=None): + if arg is None: + self.push('334 ') + else: + logpass = self._decode_base64(arg) + try: + *_, user, password = logpass.split('\0') + except ValueError as e: + self.push('535 Splitting response {!r} into user and password' + ' failed: {}'.format(logpass, e)) + return + self._authenticated(user, password == sim_auth[1]) + + def _auth_login(self, arg=None): + if arg is None: + # base64 encoded 'Username:' + self.push('334 VXNlcm5hbWU6') + elif not hasattr(self, '_auth_login_user'): + self._auth_login_user = self._decode_base64(arg) + # base64 encoded 'Password:' + self.push('334 UGFzc3dvcmQ6') + else: + password = self._decode_base64(arg) + self._authenticated(self._auth_login_user, password == sim_auth[1]) + del self._auth_login_user + + def _auth_cram_md5(self, arg=None): + if arg is None: + self.push('334 {}'.format(sim_cram_md5_challenge)) + else: + logpass = self._decode_base64(arg) + try: + user, hashed_pass = logpass.split() + except ValueError as e: + self.push('535 Splitting response {!r} into user and password' + 'failed: {}'.format(logpass, e)) + return False + valid_hashed_pass = hmac.HMAC( + sim_auth[1].encode('ascii'), + self._decode_base64(sim_cram_md5_challenge).encode('ascii'), + 'md5').hexdigest() + self._authenticated(user, hashed_pass == valid_hashed_pass) + # end AUTH related stuff. + def smtp_EHLO(self, arg): resp = ('250-testhost\r\n' '250-EXPN\r\n' @@ -657,22 +774,6 @@ class SimSMTPChannel(smtpd.SMTPChannel): else: self.push('550 No access for you!') - def smtp_AUTH(self, arg): - if arg.strip().lower()=='cram-md5': - self.push('334 {}'.format(sim_cram_md5_challenge)) - return - mech, auth = arg.split() - mech = mech.lower() - if mech not in sim_auth_credentials: - self.push('504 auth type unimplemented') - return - if mech == 'plain' and auth==sim_auth_credentials['plain']: - self.push('235 plain auth ok') - elif mech=='login' and auth==sim_auth_credentials['login']: - self.push('334 Password:') - else: - self.push('550 No access for you!') - def smtp_QUIT(self, arg): if self.quit_response is None: super(SimSMTPChannel, self).smtp_QUIT(arg) @@ -719,7 +820,8 @@ class SimSMTPServer(smtpd.SMTPServer): def handle_accepted(self, conn, addr): self._SMTPchannel = self.channel_class( - self._extra_features, self, conn, addr) + self._extra_features, self, conn, addr, + decode_data=self._decode_data) def process_message(self, peer, mailfrom, rcpttos, data): pass @@ -742,7 +844,7 @@ class SMTPSimTests(unittest.TestCase): self.serv_evt = threading.Event() self.client_evt = threading.Event() # Pick a random unused port by passing 0 for the port number - self.serv = SimSMTPServer((HOST, 0), ('nowhere', -1)) + self.serv = SimSMTPServer((HOST, 0), ('nowhere', -1), decode_data=True) # Keep a note of what port was assigned self.port = self.serv.socket.getsockname()[1] serv_args = (self.serv, self.serv_evt, self.client_evt) @@ -790,11 +892,11 @@ class SMTPSimTests(unittest.TestCase): def testVRFY(self): smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) - for email, name in sim_users.items(): + for addr_spec, name in sim_users.items(): expected_known = (250, bytes('%s %s' % - (name, smtplib.quoteaddr(email)), + (name, smtplib.quoteaddr(addr_spec)), "ascii")) - self.assertEqual(smtp.vrfy(email), expected_known) + self.assertEqual(smtp.vrfy(addr_spec), expected_known) u = 'nobody@nowhere.com' expected_unknown = (550, ('No such user: %s' % u).encode('ascii')) @@ -819,45 +921,47 @@ class SMTPSimTests(unittest.TestCase): def testAUTH_PLAIN(self): self.serv.add_feature("AUTH PLAIN") smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) - - expected_auth_ok = (235, b'plain auth ok') - self.assertEqual(smtp.login(sim_auth[0], sim_auth[1]), expected_auth_ok) + resp = smtp.login(sim_auth[0], sim_auth[1]) + self.assertEqual(resp, (235, b'Authentication Succeeded')) smtp.close() - # SimSMTPChannel doesn't fully support LOGIN or CRAM-MD5 auth because they - # require a synchronous read to obtain the credentials...so instead smtpd - # sees the credential sent by smtplib's login method as an unknown command, - # which results in smtplib raising an auth error. Fortunately the error - # message contains the encoded credential, so we can partially check that it - # was generated correctly (partially, because the 'word' is uppercased in - # the error message). - def testAUTH_LOGIN(self): self.serv.add_feature("AUTH LOGIN") smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) - try: smtp.login(sim_auth[0], sim_auth[1]) - except smtplib.SMTPAuthenticationError as err: - self.assertIn(sim_auth_login_password, str(err)) + resp = smtp.login(sim_auth[0], sim_auth[1]) + self.assertEqual(resp, (235, b'Authentication Succeeded')) smtp.close() def testAUTH_CRAM_MD5(self): self.serv.add_feature("AUTH CRAM-MD5") smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) - - try: smtp.login(sim_auth[0], sim_auth[1]) - except smtplib.SMTPAuthenticationError as err: - self.assertIn(sim_auth_credentials['cram-md5'], str(err)) + resp = smtp.login(sim_auth[0], sim_auth[1]) + self.assertEqual(resp, (235, b'Authentication Succeeded')) smtp.close() def testAUTH_multiple(self): # Test that multiple authentication methods are tried. self.serv.add_feature("AUTH BOGUS PLAIN LOGIN CRAM-MD5") smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) - try: smtp.login(sim_auth[0], sim_auth[1]) - except smtplib.SMTPAuthenticationError as err: - self.assertIn(sim_auth_login_password, str(err)) + resp = smtp.login(sim_auth[0], sim_auth[1]) + self.assertEqual(resp, (235, b'Authentication Succeeded')) smtp.close() + def test_auth_function(self): + supported = {'CRAM-MD5', 'PLAIN', 'LOGIN'} + for mechanism in supported: + self.serv.add_feature("AUTH {}".format(mechanism)) + for mechanism in supported: + with self.subTest(mechanism=mechanism): + smtp = smtplib.SMTP(HOST, self.port, + local_hostname='localhost', timeout=15) + smtp.ehlo('foo') + smtp.user, smtp.password = sim_auth[0], sim_auth[1] + method = 'auth_' + mechanism.lower().replace('-', '_') + resp = smtp.auth(mechanism, getattr(smtp, method)) + self.assertEqual(resp, (235, b'Authentication Succeeded')) + smtp.close() + def test_quit_resets_greeting(self): smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', @@ -938,13 +1042,249 @@ class SMTPSimTests(unittest.TestCase): self.assertIsNone(smtp.sock) self.assertEqual(self.serv._SMTPchannel.rcpt_count, 0) + def test_smtputf8_NotSupportedError_if_no_server_support(self): + smtp = smtplib.SMTP( + HOST, self.port, local_hostname='localhost', timeout=3) + self.addCleanup(smtp.close) + smtp.ehlo() + self.assertTrue(smtp.does_esmtp) + self.assertFalse(smtp.has_extn('smtputf8')) + self.assertRaises( + smtplib.SMTPNotSupportedError, + smtp.sendmail, + 'John', 'Sally', '', mail_options=['BODY=8BITMIME', 'SMTPUTF8']) + self.assertRaises( + smtplib.SMTPNotSupportedError, + smtp.mail, 'John', options=['BODY=8BITMIME', 'SMTPUTF8']) + + def test_send_unicode_without_SMTPUTF8(self): + smtp = smtplib.SMTP( + HOST, self.port, local_hostname='localhost', timeout=3) + self.addCleanup(smtp.close) + self.assertRaises(UnicodeEncodeError, smtp.sendmail, 'Alice', 'Böb', '') + self.assertRaises(UnicodeEncodeError, smtp.mail, 'Älice') + + +class SimSMTPUTF8Server(SimSMTPServer): + + def __init__(self, *args, **kw): + # The base SMTP server turns these on automatically, but our test + # server is set up to munge the EHLO response, so we need to provide + # them as well. And yes, the call is to SMTPServer not SimSMTPServer. + self._extra_features = ['SMTPUTF8', '8BITMIME'] + smtpd.SMTPServer.__init__(self, *args, **kw) + + def handle_accepted(self, conn, addr): + self._SMTPchannel = self.channel_class( + self._extra_features, self, conn, addr, + decode_data=self._decode_data, + enable_SMTPUTF8=self.enable_SMTPUTF8, + ) + + def process_message(self, peer, mailfrom, rcpttos, data, mail_options=None, + rcpt_options=None): + self.last_peer = peer + self.last_mailfrom = mailfrom + self.last_rcpttos = rcpttos + self.last_message = data + self.last_mail_options = mail_options + self.last_rcpt_options = rcpt_options + + +@unittest.skipUnless(threading, 'Threading required for this test.') +class SMTPUTF8SimTests(unittest.TestCase): + + maxDiff = None + + def setUp(self): + self.real_getfqdn = socket.getfqdn + socket.getfqdn = mock_socket.getfqdn + self.serv_evt = threading.Event() + self.client_evt = threading.Event() + # Pick a random unused port by passing 0 for the port number + self.serv = SimSMTPUTF8Server((HOST, 0), ('nowhere', -1), + decode_data=False, + enable_SMTPUTF8=True) + # Keep a note of what port was assigned + self.port = self.serv.socket.getsockname()[1] + serv_args = (self.serv, self.serv_evt, self.client_evt) + self.thread = threading.Thread(target=debugging_server, args=serv_args) + self.thread.start() + + # wait until server thread has assigned a port number + self.serv_evt.wait() + self.serv_evt.clear() + + def tearDown(self): + socket.getfqdn = self.real_getfqdn + # indicate that the client is finished + self.client_evt.set() + # wait for the server thread to terminate + self.serv_evt.wait() + self.thread.join() + + def test_test_server_supports_extensions(self): + smtp = smtplib.SMTP( + HOST, self.port, local_hostname='localhost', timeout=3) + self.addCleanup(smtp.close) + smtp.ehlo() + self.assertTrue(smtp.does_esmtp) + self.assertTrue(smtp.has_extn('smtputf8')) + + def test_send_unicode_with_SMTPUTF8_via_sendmail(self): + m = '¡a test message containing unicode!'.encode('utf-8') + smtp = smtplib.SMTP( + HOST, self.port, local_hostname='localhost', timeout=3) + self.addCleanup(smtp.close) + smtp.sendmail('Jőhn', 'Sálly', m, + mail_options=['BODY=8BITMIME', 'SMTPUTF8']) + self.assertEqual(self.serv.last_mailfrom, 'Jőhn') + self.assertEqual(self.serv.last_rcpttos, ['Sálly']) + self.assertEqual(self.serv.last_message, m) + self.assertIn('BODY=8BITMIME', self.serv.last_mail_options) + self.assertIn('SMTPUTF8', self.serv.last_mail_options) + self.assertEqual(self.serv.last_rcpt_options, []) + + def test_send_unicode_with_SMTPUTF8_via_low_level_API(self): + m = '¡a test message containing unicode!'.encode('utf-8') + smtp = smtplib.SMTP( + HOST, self.port, local_hostname='localhost', timeout=3) + self.addCleanup(smtp.close) + smtp.ehlo() + self.assertEqual( + smtp.mail('Jő', options=['BODY=8BITMIME', 'SMTPUTF8']), + (250, b'OK')) + self.assertEqual(smtp.rcpt('János'), (250, b'OK')) + self.assertEqual(smtp.data(m), (250, b'OK')) + self.assertEqual(self.serv.last_mailfrom, 'Jő') + self.assertEqual(self.serv.last_rcpttos, ['János']) + self.assertEqual(self.serv.last_message, m) + self.assertIn('BODY=8BITMIME', self.serv.last_mail_options) + self.assertIn('SMTPUTF8', self.serv.last_mail_options) + self.assertEqual(self.serv.last_rcpt_options, []) + + def test_send_message_uses_smtputf8_if_addrs_non_ascii(self): + msg = EmailMessage() + msg['From'] = "Páolo <főo@bar.com>" + msg['To'] = 'Dinsdale' + msg['Subject'] = 'Nudge nudge, wink, wink \u1F609' + # XXX I don't know why I need two \n's here, but this is an existing + # bug (if it is one) and not a problem with the new functionality. + msg.set_content("oh là là, know what I mean, know what I mean?\n\n") + # XXX smtpd converts received /r/n to /n, so we can't easily test that + # we are successfully sending /r/n :(. + expected = textwrap.dedent("""\ + From: Páolo <főo@bar.com> + To: Dinsdale + Subject: Nudge nudge, wink, wink \u1F609 + Content-Type: text/plain; charset="utf-8" + Content-Transfer-Encoding: 8bit + MIME-Version: 1.0 + + oh là là, know what I mean, know what I mean? + """) + smtp = smtplib.SMTP( + HOST, self.port, local_hostname='localhost', timeout=3) + self.addCleanup(smtp.close) + self.assertEqual(smtp.send_message(msg), {}) + self.assertEqual(self.serv.last_mailfrom, 'főo@bar.com') + self.assertEqual(self.serv.last_rcpttos, ['Dinsdale']) + self.assertEqual(self.serv.last_message.decode(), expected) + self.assertIn('BODY=8BITMIME', self.serv.last_mail_options) + self.assertIn('SMTPUTF8', self.serv.last_mail_options) + self.assertEqual(self.serv.last_rcpt_options, []) + + def test_send_message_error_on_non_ascii_addrs_if_no_smtputf8(self): + msg = EmailMessage() + msg['From'] = "Páolo <főo@bar.com>" + msg['To'] = 'Dinsdale' + msg['Subject'] = 'Nudge nudge, wink, wink \u1F609' + smtp = smtplib.SMTP( + HOST, self.port, local_hostname='localhost', timeout=3) + self.addCleanup(smtp.close) + self.assertRaises(smtplib.SMTPNotSupportedError, + smtp.send_message(msg)) + + +EXPECTED_RESPONSE = encode_base64(b'\0psu\0doesnotexist', eol='') + +class SimSMTPAUTHInitialResponseChannel(SimSMTPChannel): + def smtp_AUTH(self, arg): + # RFC 4954's AUTH command allows for an optional initial-response. + # Not all AUTH methods support this; some require a challenge. AUTH + # PLAIN does those, so test that here. See issue #15014. + args = arg.split() + if args[0].lower() == 'plain': + if len(args) == 2: + # AUTH PLAIN <initial-response> with the response base 64 + # encoded. Hard code the expected response for the test. + if args[1] == EXPECTED_RESPONSE: + self.push('235 Ok') + return + self.push('571 Bad authentication') + +class SimSMTPAUTHInitialResponseServer(SimSMTPServer): + channel_class = SimSMTPAUTHInitialResponseChannel + + +@unittest.skipUnless(threading, 'Threading required for this test.') +class SMTPAUTHInitialResponseSimTests(unittest.TestCase): + def setUp(self): + self.real_getfqdn = socket.getfqdn + socket.getfqdn = mock_socket.getfqdn + self.serv_evt = threading.Event() + self.client_evt = threading.Event() + # Pick a random unused port by passing 0 for the port number + self.serv = SimSMTPAUTHInitialResponseServer( + (HOST, 0), ('nowhere', -1), decode_data=True) + # Keep a note of what port was assigned + self.port = self.serv.socket.getsockname()[1] + serv_args = (self.serv, self.serv_evt, self.client_evt) + self.thread = threading.Thread(target=debugging_server, args=serv_args) + self.thread.start() + + # wait until server thread has assigned a port number + self.serv_evt.wait() + self.serv_evt.clear() + + def tearDown(self): + socket.getfqdn = self.real_getfqdn + # indicate that the client is finished + self.client_evt.set() + # wait for the server thread to terminate + self.serv_evt.wait() + self.thread.join() + + def testAUTH_PLAIN_initial_response_login(self): + self.serv.add_feature('AUTH PLAIN') + smtp = smtplib.SMTP(HOST, self.port, + local_hostname='localhost', timeout=15) + smtp.login('psu', 'doesnotexist') + smtp.close() + + def testAUTH_PLAIN_initial_response_auth(self): + self.serv.add_feature('AUTH PLAIN') + smtp = smtplib.SMTP(HOST, self.port, + local_hostname='localhost', timeout=15) + smtp.user = 'psu' + smtp.password = 'doesnotexist' + code, response = smtp.auth('plain', smtp.auth_plain) + smtp.close() + self.assertEqual(code, 235) + @support.reap_threads def test_main(verbose=None): - support.run_unittest(GeneralTests, DebuggingServerTests, - NonConnectingTests, - BadHELOServerTests, SMTPSimTests, - TooLongLineTests) + support.run_unittest( + BadHELOServerTests, + DebuggingServerTests, + GeneralTests, + NonConnectingTests, + SMTPAUTHInitialResponseSimTests, + SMTPSimTests, + TooLongLineTests, + ) + if __name__ == '__main__': test_main() |
