summaryrefslogtreecommitdiffstats
path: root/Lib
diff options
context:
space:
mode:
authorR David Murray <rdmurray@bitdance.com>2015-05-16 17:58:14 (GMT)
committerR David Murray <rdmurray@bitdance.com>2015-05-16 17:58:14 (GMT)
commitcee7cf60263210ca34f8221fc7dabd360c825533 (patch)
tree0c92234dbae918c8fa655681fdf41f0e3befc23e /Lib
parentb907a513c8285410d93adc2895508985838fe1a7 (diff)
downloadcpython-cee7cf60263210ca34f8221fc7dabd360c825533.zip
cpython-cee7cf60263210ca34f8221fc7dabd360c825533.tar.gz
cpython-cee7cf60263210ca34f8221fc7dabd360c825533.tar.bz2
#22027: Add RFC6531 support to smtplib.
Initial patch by Milan Oberkirch.
Diffstat (limited to 'Lib')
-rwxr-xr-xLib/smtplib.py42
-rw-r--r--Lib/test/test_smtplib.py119
2 files changed, 155 insertions, 6 deletions
diff --git a/Lib/smtplib.py b/Lib/smtplib.py
index 7f49f23..6895bed 100755
--- a/Lib/smtplib.py
+++ b/Lib/smtplib.py
@@ -71,6 +71,13 @@ OLDSTYLE_AUTH = re.compile(r"auth=(.*)", re.I)
class SMTPException(OSError):
"""Base class for all exceptions raised by this module."""
+class SMTPNotSupportedError(SMTPException):
+ """The command or option is not supported by the SMTP server.
+
+ This exception is raised when an attempt is made to run a command or a
+ command with an option which is not supported by the server.
+ """
+
class SMTPServerDisconnected(SMTPException):
"""Not connected to any SMTP server.
@@ -237,6 +244,7 @@ class SMTP:
self._host = host
self.timeout = timeout
self.esmtp_features = {}
+ self.command_encoding = 'ascii'
self.source_address = source_address
if host:
@@ -337,7 +345,10 @@ class SMTP:
self._print_debug('send:', repr(s))
if hasattr(self, 'sock') and self.sock:
if isinstance(s, str):
- s = s.encode("ascii")
+ # send is used by the 'data' command, where command_encoding
+ # should not be used, but 'data' needs to convert the string to
+ # binary itself anyway, so that's not a problem.
+ s = s.encode(self.command_encoding)
try:
self.sock.sendall(s)
except OSError:
@@ -482,6 +493,7 @@ class SMTP:
def rset(self):
"""SMTP 'rset' command -- resets session."""
+ self.command_encoding = 'ascii'
return self.docmd("rset")
def _rset(self):
@@ -501,9 +513,22 @@ class SMTP:
return self.docmd("noop")
def mail(self, sender, options=[]):
- """SMTP 'mail' command -- begins mail xfer session."""
+ """SMTP 'mail' command -- begins mail xfer session.
+
+ This method may raise the following exceptions:
+
+ SMTPNotSupportedError The options parameter includes 'SMTPUTF8'
+ but the SMTPUTF8 extension is not supported by
+ the server.
+ """
optionlist = ''
if options and self.does_esmtp:
+ if any(x.lower()=='smtputf8' for x in options):
+ if self.has_extn('smtputf8'):
+ self.command_encoding = 'utf-8'
+ else:
+ raise SMTPNotSupportedError(
+ 'SMTPUTF8 not supported by server')
optionlist = ' ' + ' '.join(options)
self.putcmd("mail", "FROM:%s%s" % (quoteaddr(sender), optionlist))
return self.getreply()
@@ -642,13 +667,16 @@ class SMTP:
the helo greeting.
SMTPAuthenticationError The server didn't accept the username/
password combination.
+ SMTPNotSupportedError The AUTH command is not supported by the
+ server.
SMTPException No suitable authentication method was
found.
"""
self.ehlo_or_helo_if_needed()
if not self.has_extn("auth"):
- raise SMTPException("SMTP AUTH extension not supported by server.")
+ raise SMTPNotSupportedError(
+ "SMTP AUTH extension not supported by server.")
# Authentication methods the server claims to support
advertised_authlist = self.esmtp_features["auth"].split()
@@ -700,7 +728,8 @@ class SMTP:
"""
self.ehlo_or_helo_if_needed()
if not self.has_extn("starttls"):
- raise SMTPException("STARTTLS extension not supported by server.")
+ raise SMTPNotSupportedError(
+ "STARTTLS extension not supported by server.")
(resp, reply) = self.docmd("STARTTLS")
if resp == 220:
if not _have_ssl:
@@ -765,6 +794,9 @@ class SMTP:
SMTPDataError The server replied with an unexpected
error code (other than a refusal of
a recipient).
+ SMTPNotSupportedError The mail_options parameter includes 'SMTPUTF8'
+ but the SMTPUTF8 extension is not supported by
+ the server.
Note: the connection will be open even after an exception is raised.
@@ -793,8 +825,6 @@ class SMTP:
if isinstance(msg, str):
msg = _fix_eols(msg).encode('ascii')
if self.does_esmtp:
- # Hmmm? what's this? -ddm
- # self.esmtp_features['7bit']=""
if self.has_extn('size'):
esmtp_opts.append("size=%d" % len(msg))
for option in mail_options:
diff --git a/Lib/test/test_smtplib.py b/Lib/test/test_smtplib.py
index 9011042..e496371 100644
--- a/Lib/test/test_smtplib.py
+++ b/Lib/test/test_smtplib.py
@@ -977,6 +977,125 @@ class SMTPSimTests(unittest.TestCase):
self.assertIsNone(smtp.sock)
self.assertEqual(self.serv._SMTPchannel.rcpt_count, 0)
+ def test_smtputf8_NotSupportedError_if_no_server_support(self):
+ smtp = smtplib.SMTP(
+ HOST, self.port, local_hostname='localhost', timeout=3)
+ self.addCleanup(smtp.close)
+ smtp.ehlo()
+ self.assertTrue(smtp.does_esmtp)
+ self.assertFalse(smtp.has_extn('smtputf8'))
+ self.assertRaises(
+ smtplib.SMTPNotSupportedError,
+ smtp.sendmail,
+ 'John', 'Sally', '', mail_options=['BODY=8BITMIME', 'SMTPUTF8'])
+ self.assertRaises(
+ smtplib.SMTPNotSupportedError,
+ smtp.mail, 'John', options=['BODY=8BITMIME', 'SMTPUTF8'])
+
+ def test_send_unicode_without_SMTPUTF8(self):
+ smtp = smtplib.SMTP(
+ HOST, self.port, local_hostname='localhost', timeout=3)
+ self.addCleanup(smtp.close)
+ self.assertRaises(UnicodeEncodeError, smtp.sendmail, 'Alice', 'Böb', '')
+ self.assertRaises(UnicodeEncodeError, smtp.mail, 'Älice')
+
+
+class SimSMTPUTF8Server(SimSMTPServer):
+
+ def __init__(self, *args, **kw):
+ # The base SMTP server turns these on automatically, but our test
+ # server is set up to munge the EHLO response, so we need to provide
+ # them as well. And yes, the call is to SMTPServer not SimSMTPServer.
+ self._extra_features = ['SMTPUTF8', '8BITMIME']
+ smtpd.SMTPServer.__init__(self, *args, **kw)
+
+ def handle_accepted(self, conn, addr):
+ self._SMTPchannel = self.channel_class(
+ self._extra_features, self, conn, addr,
+ decode_data=self._decode_data,
+ enable_SMTPUTF8=self.enable_SMTPUTF8,
+ )
+
+ def process_message(self, peer, mailfrom, rcpttos, data, mail_options=None,
+ rcpt_options=None):
+ self.last_peer = peer
+ self.last_mailfrom = mailfrom
+ self.last_rcpttos = rcpttos
+ self.last_message = data
+ self.last_mail_options = mail_options
+ self.last_rcpt_options = rcpt_options
+
+
+@unittest.skipUnless(threading, 'Threading required for this test.')
+class SMTPUTF8SimTests(unittest.TestCase):
+
+ def setUp(self):
+ self.real_getfqdn = socket.getfqdn
+ socket.getfqdn = mock_socket.getfqdn
+ self.serv_evt = threading.Event()
+ self.client_evt = threading.Event()
+ # Pick a random unused port by passing 0 for the port number
+ self.serv = SimSMTPUTF8Server((HOST, 0), ('nowhere', -1),
+ decode_data=False,
+ enable_SMTPUTF8=True)
+ # Keep a note of what port was assigned
+ self.port = self.serv.socket.getsockname()[1]
+ serv_args = (self.serv, self.serv_evt, self.client_evt)
+ self.thread = threading.Thread(target=debugging_server, args=serv_args)
+ self.thread.start()
+
+ # wait until server thread has assigned a port number
+ self.serv_evt.wait()
+ self.serv_evt.clear()
+
+ def tearDown(self):
+ socket.getfqdn = self.real_getfqdn
+ # indicate that the client is finished
+ self.client_evt.set()
+ # wait for the server thread to terminate
+ self.serv_evt.wait()
+ self.thread.join()
+
+ def test_test_server_supports_extensions(self):
+ smtp = smtplib.SMTP(
+ HOST, self.port, local_hostname='localhost', timeout=3)
+ self.addCleanup(smtp.close)
+ smtp.ehlo()
+ self.assertTrue(smtp.does_esmtp)
+ self.assertTrue(smtp.has_extn('smtputf8'))
+
+ def test_send_unicode_with_SMTPUTF8_via_sendmail(self):
+ m = '¡a test message containing unicode!'.encode('utf-8')
+ smtp = smtplib.SMTP(
+ HOST, self.port, local_hostname='localhost', timeout=3)
+ self.addCleanup(smtp.close)
+ smtp.sendmail('Jőhn', 'Sálly', m,
+ mail_options=['BODY=8BITMIME', 'SMTPUTF8'])
+ self.assertEqual(self.serv.last_mailfrom, 'Jőhn')
+ self.assertEqual(self.serv.last_rcpttos, ['Sálly'])
+ self.assertEqual(self.serv.last_message, m)
+ self.assertIn('BODY=8BITMIME', self.serv.last_mail_options)
+ self.assertIn('SMTPUTF8', self.serv.last_mail_options)
+ self.assertEqual(self.serv.last_rcpt_options, [])
+
+ def test_send_unicode_with_SMTPUTF8_via_low_level_API(self):
+ m = '¡a test message containing unicode!'.encode('utf-8')
+ smtp = smtplib.SMTP(
+ HOST, self.port, local_hostname='localhost', timeout=3)
+ self.addCleanup(smtp.close)
+ smtp.ehlo()
+ self.assertEqual(
+ smtp.mail('Jő', options=['BODY=8BITMIME', 'SMTPUTF8']),
+ (250, b'OK'))
+ self.assertEqual(smtp.rcpt('János'), (250, b'OK'))
+ self.assertEqual(smtp.data(m), (250, b'OK'))
+ self.assertEqual(self.serv.last_mailfrom, 'Jő')
+ self.assertEqual(self.serv.last_rcpttos, ['János'])
+ self.assertEqual(self.serv.last_message, m)
+ self.assertIn('BODY=8BITMIME', self.serv.last_mail_options)
+ self.assertIn('SMTPUTF8', self.serv.last_mail_options)
+ self.assertEqual(self.serv.last_rcpt_options, [])
+
@support.reap_threads
def test_main(verbose=None):