summaryrefslogtreecommitdiffstats
path: root/Lib
diff options
context:
space:
mode:
authorR. David Murray <rdmurray@bitdance.com>2010-11-08 17:15:13 (GMT)
committerR. David Murray <rdmurray@bitdance.com>2010-11-08 17:15:13 (GMT)
commit7dff9e08fbfb371b90431e5cbf380c4ea5d228f7 (patch)
tree417cf2edad370f4aa9bdb4215a80f5e79ec8c27f /Lib
parenta563392f92558d2c288f18924af38a1a9c58fad3 (diff)
downloadcpython-7dff9e08fbfb371b90431e5cbf380c4ea5d228f7.zip
cpython-7dff9e08fbfb371b90431e5cbf380c4ea5d228f7.tar.gz
cpython-7dff9e08fbfb371b90431e5cbf380c4ea5d228f7.tar.bz2
#10321: Add support for sending binary DATA and Message objects to smtplib
Diffstat (limited to 'Lib')
-rwxr-xr-xLib/smtplib.py59
-rw-r--r--Lib/test/test_smtplib.py106
2 files changed, 159 insertions, 6 deletions
diff --git a/Lib/smtplib.py b/Lib/smtplib.py
index 1c1a9d1..ccb2236 100755
--- a/Lib/smtplib.py
+++ b/Lib/smtplib.py
@@ -42,8 +42,11 @@ Example:
# This was modified from the Python 1.5 library HTTP lib.
import socket
+import io
import re
import email.utils
+import email.message
+import email.generator
import base64
import hmac
from email.base64mime import body_encode as encode_base64
@@ -57,6 +60,7 @@ __all__ = ["SMTPException","SMTPServerDisconnected","SMTPResponseException",
SMTP_PORT = 25
SMTP_SSL_PORT = 465
CRLF="\r\n"
+bCRLF=b"\r\n"
OLDSTYLE_AUTH = re.compile(r"auth=(.*)", re.I)
@@ -147,6 +151,7 @@ def quoteaddr(addr):
else:
return "<%s>" % m
+# Legacy method kept for backward compatibility.
def quotedata(data):
"""Quote data for email.
@@ -156,6 +161,12 @@ def quotedata(data):
return re.sub(r'(?m)^\.', '..',
re.sub(r'(?:\r\n|\n|\r(?!\n))', CRLF, data))
+def _quote_periods(bindata):
+ return re.sub(br'(?m)^\.', '..', bindata)
+
+def _fix_eols(data):
+ return re.sub(r'(?:\r\n|\n|\r(?!\n))', CRLF, data)
+
try:
import ssl
except ImportError:
@@ -469,7 +480,9 @@ class SMTP:
Automatically quotes lines beginning with a period per rfc821.
Raises SMTPDataError if there is an unexpected reply to the
DATA command; the return value from this method is the final
- response code received when the all data is sent.
+ response code received when the all data is sent. If msg
+ is a string, lone '\r' and '\n' characters are converted to
+ '\r\n' characters. If msg is bytes, it is transmitted as is.
"""
self.putcmd("data")
(code,repl)=self.getreply()
@@ -477,10 +490,12 @@ class SMTP:
if code != 354:
raise SMTPDataError(code,repl)
else:
- q = quotedata(msg)
- if q[-2:] != CRLF:
- q = q + CRLF
- q = q + "." + CRLF
+ if isinstance(msg, str):
+ msg = _fix_eols(msg).encode('ascii')
+ q = _quote_periods(msg)
+ if q[-2:] != bCRLF:
+ q = q + bCRLF
+ q = q + b"." + bCRLF
self.send(q)
(code,msg)=self.getreply()
if self.debuglevel >0 : print("data:", (code,msg), file=stderr)
@@ -648,6 +663,10 @@ class SMTP:
- rcpt_options : List of ESMTP options (such as DSN commands) for
all the rcpt commands.
+ msg may be a string containing characters in the ASCII range, or a byte
+ string. A string is encoded to bytes using the ascii codec, and lone
+ \r and \n characters are converted to \r\n characters.
+
If there has been no previous EHLO or HELO command this session, this
method tries ESMTP EHLO first. If the server does ESMTP, message size
and each of the specified options will be passed to it. If EHLO
@@ -693,6 +712,8 @@ class SMTP:
"""
self.ehlo_or_helo_if_needed()
esmtp_opts = []
+ if isinstance(msg, str):
+ msg = _fix_eols(msg).encode('ascii')
if self.does_esmtp:
# Hmmm? what's this? -ddm
# self.esmtp_features['7bit']=""
@@ -700,7 +721,6 @@ class SMTP:
esmtp_opts.append("size=%d" % len(msg))
for option in mail_options:
esmtp_opts.append(option)
-
(code,resp) = self.mail(from_addr, esmtp_opts)
if code != 250:
self.rset()
@@ -723,6 +743,33 @@ class SMTP:
#if we got here then somebody got our mail
return senderrs
+ def send_message(self, msg, from_addr=None, to_addrs=None,
+ mail_options=[], rcpt_options={}):
+ """Converts message to a bytestring and passes it to sendmail.
+
+ The arguments are as for sendmail, except that msg is an
+ email.message.Message object. If from_addr is None, the from_addr is
+ taken from the 'From' header of the Message. If to_addrs is None, its
+ value is composed from the addresses listed in the 'To', 'CC', and
+ 'Bcc' fields. Regardless of the values of from_addr and to_addr, any
+ Bcc field in the Message object is deleted. The Message object is then
+ serialized using email.generator.BytesGenerator and sendmail is called
+ to transmit the message.
+ """
+ if from_addr is None:
+ from_addr = msg['From']
+ if to_addrs is None:
+ addr_fields = [f for f in (msg['To'], msg['Bcc'], msg['CC'])
+ if f is not None]
+ to_addrs = [a[1] for a in email.utils.getaddresses(addr_fields)]
+ del msg['Bcc']
+ with io.BytesIO() as bytesmsg:
+ g = email.generator.BytesGenerator(bytesmsg)
+ g.flatten(msg, linesep='\r\n')
+ flatmsg = bytesmsg.getvalue()
+ return self.sendmail(from_addr, to_addrs, flatmsg, mail_options,
+ rcpt_options)
+
def close(self):
"""Close the connection to the SMTP server."""
diff --git a/Lib/test/test_smtplib.py b/Lib/test/test_smtplib.py
index 6390a86d..795586a 100644
--- a/Lib/test/test_smtplib.py
+++ b/Lib/test/test_smtplib.py
@@ -1,9 +1,11 @@
import asyncore
+import email.mime.text
import email.utils
import socket
import smtpd
import smtplib
import io
+import re
import sys
import time
import select
@@ -57,6 +59,13 @@ class GeneralTests(unittest.TestCase):
def tearDown(self):
smtplib.socket = socket
+ # This method is no longer used but is retained for backward compatibility,
+ # so test to make sure it still works.
+ def testQuoteData(self):
+ teststr = "abc\n.jkl\rfoo\r\n..blue"
+ expected = "abc\r\n..jkl\r\nfoo\r\n...blue"
+ self.assertEqual(expected, smtplib.quotedata(teststr))
+
def testBasic1(self):
mock_socket.reply_with(b"220 Hola mundo")
# connects
@@ -150,6 +159,8 @@ MSG_END = '------------ END MESSAGE ------------\n'
@unittest.skipUnless(threading, 'Threading required for this test.')
class DebuggingServerTests(unittest.TestCase):
+ maxDiff = None
+
def setUp(self):
self.real_getfqdn = socket.getfqdn
socket.getfqdn = mock_socket.getfqdn
@@ -161,6 +172,9 @@ class DebuggingServerTests(unittest.TestCase):
self._threads = support.threading_setup()
self.serv_evt = threading.Event()
self.client_evt = threading.Event()
+ # Capture SMTPChannel debug output
+ self.old_DEBUGSTREAM = smtpd.DEBUGSTREAM
+ smtpd.DEBUGSTREAM = io.StringIO()
# Pick a random unused port by passing 0 for the port number
self.serv = smtpd.DebuggingServer((HOST, 0), ('nowhere', -1))
# Keep a note of what port was assigned
@@ -183,6 +197,9 @@ class DebuggingServerTests(unittest.TestCase):
support.threading_cleanup(*self._threads)
# restore sys.stdout
sys.stdout = self.old_stdout
+ # restore DEBUGSTREAM
+ smtpd.DEBUGSTREAM.close()
+ smtpd.DEBUGSTREAM = self.old_DEBUGSTREAM
def testBasic(self):
# connect
@@ -247,6 +264,95 @@ class DebuggingServerTests(unittest.TestCase):
mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END)
self.assertEqual(self.output.getvalue(), mexpect)
+ def testSendBinary(self):
+ m = b'A test message'
+ smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
+ smtp.sendmail('John', 'Sally', m)
+ # XXX (see comment in testSend)
+ time.sleep(0.01)
+ smtp.quit()
+
+ self.client_evt.set()
+ self.serv_evt.wait()
+ self.output.flush()
+ mexpect = '%s%s\n%s' % (MSG_BEGIN, m.decode('ascii'), MSG_END)
+ self.assertEqual(self.output.getvalue(), mexpect)
+
+ def testSendMessage(self):
+ m = email.mime.text.MIMEText('A test message')
+ smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
+ smtp.send_message(m, from_addr='John', to_addrs='Sally')
+ # XXX (see comment in testSend)
+ time.sleep(0.01)
+ smtp.quit()
+
+ self.client_evt.set()
+ self.serv_evt.wait()
+ self.output.flush()
+ # Add the X-Peer header that DebuggingServer adds
+ # XXX: I'm not sure hardcoding this IP will work on linux-vserver.
+ m['X-Peer'] = '127.0.0.1'
+ mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
+ self.assertEqual(self.output.getvalue(), mexpect)
+
+ def testSendMessageWithAddresses(self):
+ m = email.mime.text.MIMEText('A test message')
+ m['From'] = 'foo@bar.com'
+ m['To'] = 'John'
+ m['CC'] = 'Sally, Fred'
+ m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>'
+ smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
+ smtp.send_message(m)
+ # XXX (see comment in testSend)
+ time.sleep(0.01)
+ smtp.quit()
+
+ self.client_evt.set()
+ self.serv_evt.wait()
+ self.output.flush()
+ # Add the X-Peer header that DebuggingServer adds
+ # XXX: I'm not sure hardcoding this IP will work on linux-vserver.
+ m['X-Peer'] = '127.0.0.1'
+ # The Bcc header is deleted before serialization.
+ del m['Bcc']
+ mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
+ self.assertEqual(self.output.getvalue(), mexpect)
+ debugout = smtpd.DEBUGSTREAM.getvalue()
+ sender = re.compile("^sender: foo@bar.com$", re.MULTILINE)
+ self.assertRegexpMatches(debugout, sender)
+ for addr in ('John', 'Sally', 'Fred', 'root@localhost',
+ 'warped@silly.walks.com'):
+ to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
+ re.MULTILINE)
+ self.assertRegexpMatches(debugout, to_addr)
+
+ def testSendMessageWithSomeAddresses(self):
+ # Make sure nothing breaks if not all of the three 'to' headers exist
+ m = email.mime.text.MIMEText('A test message')
+ m['From'] = 'foo@bar.com'
+ m['To'] = 'John, Dinsdale'
+ smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
+ smtp.send_message(m)
+ # XXX (see comment in testSend)
+ time.sleep(0.01)
+ smtp.quit()
+
+ self.client_evt.set()
+ self.serv_evt.wait()
+ self.output.flush()
+ # Add the X-Peer header that DebuggingServer adds
+ # XXX: I'm not sure hardcoding this IP will work on linux-vserver.
+ m['X-Peer'] = '127.0.0.1'
+ mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
+ self.assertEqual(self.output.getvalue(), mexpect)
+ debugout = smtpd.DEBUGSTREAM.getvalue()
+ sender = re.compile("^sender: foo@bar.com$", re.MULTILINE)
+ self.assertRegexpMatches(debugout, sender)
+ for addr in ('John', 'Dinsdale'):
+ to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
+ re.MULTILINE)
+ self.assertRegexpMatches(debugout, to_addr)
+
class NonConnectingTests(unittest.TestCase):