diff options
author | R David Murray <rdmurray@bitdance.com> | 2015-05-16 17:58:14 (GMT) |
---|---|---|
committer | R David Murray <rdmurray@bitdance.com> | 2015-05-16 17:58:14 (GMT) |
commit | cee7cf60263210ca34f8221fc7dabd360c825533 (patch) | |
tree | 0c92234dbae918c8fa655681fdf41f0e3befc23e /Lib | |
parent | b907a513c8285410d93adc2895508985838fe1a7 (diff) | |
download | cpython-cee7cf60263210ca34f8221fc7dabd360c825533.zip cpython-cee7cf60263210ca34f8221fc7dabd360c825533.tar.gz cpython-cee7cf60263210ca34f8221fc7dabd360c825533.tar.bz2 |
#22027: Add RFC6531 support to smtplib.
Initial patch by Milan Oberkirch.
Diffstat (limited to 'Lib')
-rwxr-xr-x | Lib/smtplib.py | 42 | ||||
-rw-r--r-- | Lib/test/test_smtplib.py | 119 |
2 files changed, 155 insertions, 6 deletions
diff --git a/Lib/smtplib.py b/Lib/smtplib.py index 7f49f23..6895bed 100755 --- a/Lib/smtplib.py +++ b/Lib/smtplib.py @@ -71,6 +71,13 @@ OLDSTYLE_AUTH = re.compile(r"auth=(.*)", re.I) class SMTPException(OSError): """Base class for all exceptions raised by this module.""" +class SMTPNotSupportedError(SMTPException): + """The command or option is not supported by the SMTP server. + + This exception is raised when an attempt is made to run a command or a + command with an option which is not supported by the server. + """ + class SMTPServerDisconnected(SMTPException): """Not connected to any SMTP server. @@ -237,6 +244,7 @@ class SMTP: self._host = host self.timeout = timeout self.esmtp_features = {} + self.command_encoding = 'ascii' self.source_address = source_address if host: @@ -337,7 +345,10 @@ class SMTP: self._print_debug('send:', repr(s)) if hasattr(self, 'sock') and self.sock: if isinstance(s, str): - s = s.encode("ascii") + # send is used by the 'data' command, where command_encoding + # should not be used, but 'data' needs to convert the string to + # binary itself anyway, so that's not a problem. + s = s.encode(self.command_encoding) try: self.sock.sendall(s) except OSError: @@ -482,6 +493,7 @@ class SMTP: def rset(self): """SMTP 'rset' command -- resets session.""" + self.command_encoding = 'ascii' return self.docmd("rset") def _rset(self): @@ -501,9 +513,22 @@ class SMTP: return self.docmd("noop") def mail(self, sender, options=[]): - """SMTP 'mail' command -- begins mail xfer session.""" + """SMTP 'mail' command -- begins mail xfer session. + + This method may raise the following exceptions: + + SMTPNotSupportedError The options parameter includes 'SMTPUTF8' + but the SMTPUTF8 extension is not supported by + the server. + """ optionlist = '' if options and self.does_esmtp: + if any(x.lower()=='smtputf8' for x in options): + if self.has_extn('smtputf8'): + self.command_encoding = 'utf-8' + else: + raise SMTPNotSupportedError( + 'SMTPUTF8 not supported by server') optionlist = ' ' + ' '.join(options) self.putcmd("mail", "FROM:%s%s" % (quoteaddr(sender), optionlist)) return self.getreply() @@ -642,13 +667,16 @@ class SMTP: the helo greeting. SMTPAuthenticationError The server didn't accept the username/ password combination. + SMTPNotSupportedError The AUTH command is not supported by the + server. SMTPException No suitable authentication method was found. """ self.ehlo_or_helo_if_needed() if not self.has_extn("auth"): - raise SMTPException("SMTP AUTH extension not supported by server.") + raise SMTPNotSupportedError( + "SMTP AUTH extension not supported by server.") # Authentication methods the server claims to support advertised_authlist = self.esmtp_features["auth"].split() @@ -700,7 +728,8 @@ class SMTP: """ self.ehlo_or_helo_if_needed() if not self.has_extn("starttls"): - raise SMTPException("STARTTLS extension not supported by server.") + raise SMTPNotSupportedError( + "STARTTLS extension not supported by server.") (resp, reply) = self.docmd("STARTTLS") if resp == 220: if not _have_ssl: @@ -765,6 +794,9 @@ class SMTP: SMTPDataError The server replied with an unexpected error code (other than a refusal of a recipient). + SMTPNotSupportedError The mail_options parameter includes 'SMTPUTF8' + but the SMTPUTF8 extension is not supported by + the server. Note: the connection will be open even after an exception is raised. @@ -793,8 +825,6 @@ class SMTP: if isinstance(msg, str): msg = _fix_eols(msg).encode('ascii') if self.does_esmtp: - # Hmmm? what's this? -ddm - # self.esmtp_features['7bit']="" if self.has_extn('size'): esmtp_opts.append("size=%d" % len(msg)) for option in mail_options: diff --git a/Lib/test/test_smtplib.py b/Lib/test/test_smtplib.py index 9011042..e496371 100644 --- a/Lib/test/test_smtplib.py +++ b/Lib/test/test_smtplib.py @@ -977,6 +977,125 @@ class SMTPSimTests(unittest.TestCase): self.assertIsNone(smtp.sock) self.assertEqual(self.serv._SMTPchannel.rcpt_count, 0) + def test_smtputf8_NotSupportedError_if_no_server_support(self): + smtp = smtplib.SMTP( + HOST, self.port, local_hostname='localhost', timeout=3) + self.addCleanup(smtp.close) + smtp.ehlo() + self.assertTrue(smtp.does_esmtp) + self.assertFalse(smtp.has_extn('smtputf8')) + self.assertRaises( + smtplib.SMTPNotSupportedError, + smtp.sendmail, + 'John', 'Sally', '', mail_options=['BODY=8BITMIME', 'SMTPUTF8']) + self.assertRaises( + smtplib.SMTPNotSupportedError, + smtp.mail, 'John', options=['BODY=8BITMIME', 'SMTPUTF8']) + + def test_send_unicode_without_SMTPUTF8(self): + smtp = smtplib.SMTP( + HOST, self.port, local_hostname='localhost', timeout=3) + self.addCleanup(smtp.close) + self.assertRaises(UnicodeEncodeError, smtp.sendmail, 'Alice', 'Böb', '') + self.assertRaises(UnicodeEncodeError, smtp.mail, 'Älice') + + +class SimSMTPUTF8Server(SimSMTPServer): + + def __init__(self, *args, **kw): + # The base SMTP server turns these on automatically, but our test + # server is set up to munge the EHLO response, so we need to provide + # them as well. And yes, the call is to SMTPServer not SimSMTPServer. + self._extra_features = ['SMTPUTF8', '8BITMIME'] + smtpd.SMTPServer.__init__(self, *args, **kw) + + def handle_accepted(self, conn, addr): + self._SMTPchannel = self.channel_class( + self._extra_features, self, conn, addr, + decode_data=self._decode_data, + enable_SMTPUTF8=self.enable_SMTPUTF8, + ) + + def process_message(self, peer, mailfrom, rcpttos, data, mail_options=None, + rcpt_options=None): + self.last_peer = peer + self.last_mailfrom = mailfrom + self.last_rcpttos = rcpttos + self.last_message = data + self.last_mail_options = mail_options + self.last_rcpt_options = rcpt_options + + +@unittest.skipUnless(threading, 'Threading required for this test.') +class SMTPUTF8SimTests(unittest.TestCase): + + def setUp(self): + self.real_getfqdn = socket.getfqdn + socket.getfqdn = mock_socket.getfqdn + self.serv_evt = threading.Event() + self.client_evt = threading.Event() + # Pick a random unused port by passing 0 for the port number + self.serv = SimSMTPUTF8Server((HOST, 0), ('nowhere', -1), + decode_data=False, + enable_SMTPUTF8=True) + # Keep a note of what port was assigned + self.port = self.serv.socket.getsockname()[1] + serv_args = (self.serv, self.serv_evt, self.client_evt) + self.thread = threading.Thread(target=debugging_server, args=serv_args) + self.thread.start() + + # wait until server thread has assigned a port number + self.serv_evt.wait() + self.serv_evt.clear() + + def tearDown(self): + socket.getfqdn = self.real_getfqdn + # indicate that the client is finished + self.client_evt.set() + # wait for the server thread to terminate + self.serv_evt.wait() + self.thread.join() + + def test_test_server_supports_extensions(self): + smtp = smtplib.SMTP( + HOST, self.port, local_hostname='localhost', timeout=3) + self.addCleanup(smtp.close) + smtp.ehlo() + self.assertTrue(smtp.does_esmtp) + self.assertTrue(smtp.has_extn('smtputf8')) + + def test_send_unicode_with_SMTPUTF8_via_sendmail(self): + m = '¡a test message containing unicode!'.encode('utf-8') + smtp = smtplib.SMTP( + HOST, self.port, local_hostname='localhost', timeout=3) + self.addCleanup(smtp.close) + smtp.sendmail('Jőhn', 'Sálly', m, + mail_options=['BODY=8BITMIME', 'SMTPUTF8']) + self.assertEqual(self.serv.last_mailfrom, 'Jőhn') + self.assertEqual(self.serv.last_rcpttos, ['Sálly']) + self.assertEqual(self.serv.last_message, m) + self.assertIn('BODY=8BITMIME', self.serv.last_mail_options) + self.assertIn('SMTPUTF8', self.serv.last_mail_options) + self.assertEqual(self.serv.last_rcpt_options, []) + + def test_send_unicode_with_SMTPUTF8_via_low_level_API(self): + m = '¡a test message containing unicode!'.encode('utf-8') + smtp = smtplib.SMTP( + HOST, self.port, local_hostname='localhost', timeout=3) + self.addCleanup(smtp.close) + smtp.ehlo() + self.assertEqual( + smtp.mail('Jő', options=['BODY=8BITMIME', 'SMTPUTF8']), + (250, b'OK')) + self.assertEqual(smtp.rcpt('János'), (250, b'OK')) + self.assertEqual(smtp.data(m), (250, b'OK')) + self.assertEqual(self.serv.last_mailfrom, 'Jő') + self.assertEqual(self.serv.last_rcpttos, ['János']) + self.assertEqual(self.serv.last_message, m) + self.assertIn('BODY=8BITMIME', self.serv.last_mail_options) + self.assertIn('SMTPUTF8', self.serv.last_mail_options) + self.assertEqual(self.serv.last_rcpt_options, []) + @support.reap_threads def test_main(verbose=None): |