diff options
Diffstat (limited to 'Lib/test/test_smtplib.py')
| -rw-r--r-- | Lib/test/test_smtplib.py | 211 |
1 files changed, 171 insertions, 40 deletions
diff --git a/Lib/test/test_smtplib.py b/Lib/test/test_smtplib.py index 727ef83..884529f 100644 --- a/Lib/test/test_smtplib.py +++ b/Lib/test/test_smtplib.py @@ -1,16 +1,22 @@ import asyncore +import email.mime.text import email.utils import socket -import threading import smtpd import smtplib import io +import re import sys import time import select -from unittest import TestCase -from test import support +import unittest +from test import support, mock_socket + +try: + import threading +except ImportError: + threading = None HOST = support.HOST @@ -44,48 +50,55 @@ def server(evt, buf, serv): serv.close() evt.set() -class GeneralTests(TestCase): +class GeneralTests(unittest.TestCase): def setUp(self): - self.evt = threading.Event() - self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.sock.settimeout(15) - self.port = support.bind_port(self.sock) - servargs = (self.evt, b"220 Hola mundo\n", self.sock) - threading.Thread(target=server, args=servargs).start() - self.evt.wait() - self.evt.clear() + smtplib.socket = mock_socket + self.port = 25 def tearDown(self): - self.evt.wait() + smtplib.socket = socket + + # This method is no longer used but is retained for backward compatibility, + # so test to make sure it still works. + def testQuoteData(self): + teststr = "abc\n.jkl\rfoo\r\n..blue" + expected = "abc\r\n..jkl\r\nfoo\r\n...blue" + self.assertEqual(expected, smtplib.quotedata(teststr)) def testBasic1(self): + mock_socket.reply_with(b"220 Hola mundo") # connects smtp = smtplib.SMTP(HOST, self.port) smtp.close() def testBasic2(self): + mock_socket.reply_with(b"220 Hola mundo") # connects, include port in host name smtp = smtplib.SMTP("%s:%s" % (HOST, self.port)) smtp.close() def testLocalHostName(self): + mock_socket.reply_with(b"220 Hola mundo") # check that supplied local_hostname is used smtp = smtplib.SMTP(HOST, self.port, local_hostname="testhost") self.assertEqual(smtp.local_hostname, "testhost") smtp.close() def testTimeoutDefault(self): - self.assertTrue(socket.getdefaulttimeout() is None) - socket.setdefaulttimeout(30) + mock_socket.reply_with(b"220 Hola mundo") + self.assertTrue(mock_socket.getdefaulttimeout() is None) + mock_socket.setdefaulttimeout(30) + self.assertEqual(mock_socket.getdefaulttimeout(), 30) try: smtp = smtplib.SMTP(HOST, self.port) finally: - socket.setdefaulttimeout(None) + mock_socket.setdefaulttimeout(None) self.assertEqual(smtp.sock.gettimeout(), 30) smtp.close() def testTimeoutNone(self): + mock_socket.reply_with(b"220 Hola mundo") self.assertTrue(socket.getdefaulttimeout() is None) socket.setdefaulttimeout(30) try: @@ -96,6 +109,7 @@ class GeneralTests(TestCase): smtp.close() def testTimeoutValue(self): + mock_socket.reply_with(b"220 Hola mundo") smtp = smtplib.SMTP(HOST, self.port, timeout=30) self.assertEqual(smtp.sock.gettimeout(), 30) smtp.close() @@ -142,34 +156,50 @@ MSG_END = '------------ END MESSAGE ------------\n' # test server times out, causing the test to fail. # Test behavior of smtpd.DebuggingServer -class DebuggingServerTests(TestCase): +@unittest.skipUnless(threading, 'Threading required for this test.') +class DebuggingServerTests(unittest.TestCase): + + maxDiff = None def setUp(self): + self.real_getfqdn = socket.getfqdn + socket.getfqdn = mock_socket.getfqdn # temporarily replace sys.stdout to capture DebuggingServer output self.old_stdout = sys.stdout self.output = io.StringIO() sys.stdout = self.output + self._threads = support.threading_setup() self.serv_evt = threading.Event() self.client_evt = threading.Event() + # Capture SMTPChannel debug output + self.old_DEBUGSTREAM = smtpd.DEBUGSTREAM + smtpd.DEBUGSTREAM = io.StringIO() # Pick a random unused port by passing 0 for the port number self.serv = smtpd.DebuggingServer((HOST, 0), ('nowhere', -1)) # Keep a note of what port was assigned self.port = self.serv.socket.getsockname()[1] serv_args = (self.serv, self.serv_evt, self.client_evt) - threading.Thread(target=debugging_server, args=serv_args).start() + self.thread = threading.Thread(target=debugging_server, args=serv_args) + self.thread.start() # wait until server thread has assigned a port number self.serv_evt.wait() self.serv_evt.clear() def tearDown(self): + socket.getfqdn = self.real_getfqdn # indicate that the client is finished self.client_evt.set() # wait for the server thread to terminate self.serv_evt.wait() + self.thread.join() + support.threading_cleanup(*self._threads) # restore sys.stdout sys.stdout = self.old_stdout + # restore DEBUGSTREAM + smtpd.DEBUGSTREAM.close() + smtpd.DEBUGSTREAM = self.old_DEBUGSTREAM def testBasic(self): # connect @@ -234,8 +264,100 @@ class DebuggingServerTests(TestCase): mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END) self.assertEqual(self.output.getvalue(), mexpect) + def testSendBinary(self): + m = b'A test message' + smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) + smtp.sendmail('John', 'Sally', m) + # XXX (see comment in testSend) + time.sleep(0.01) + smtp.quit() + + self.client_evt.set() + self.serv_evt.wait() + self.output.flush() + mexpect = '%s%s\n%s' % (MSG_BEGIN, m.decode('ascii'), MSG_END) + self.assertEqual(self.output.getvalue(), mexpect) + + def testSendMessage(self): + m = email.mime.text.MIMEText('A test message') + smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) + smtp.send_message(m, from_addr='John', to_addrs='Sally') + # XXX (see comment in testSend) + time.sleep(0.01) + smtp.quit() + + self.client_evt.set() + self.serv_evt.wait() + self.output.flush() + # Add the X-Peer header that DebuggingServer adds + m['X-Peer'] = socket.gethostbyname('localhost') + mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END) + self.assertEqual(self.output.getvalue(), mexpect) + + def testSendMessageWithAddresses(self): + m = email.mime.text.MIMEText('A test message') + m['From'] = 'foo@bar.com' + m['To'] = 'John' + m['CC'] = 'Sally, Fred' + m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>' + smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) + smtp.send_message(m) + # XXX (see comment in testSend) + time.sleep(0.01) + smtp.quit() + + self.client_evt.set() + self.serv_evt.wait() + self.output.flush() + # Add the X-Peer header that DebuggingServer adds + m['X-Peer'] = socket.gethostbyname('localhost') + # The Bcc header is deleted before serialization. + del m['Bcc'] + mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END) + self.assertEqual(self.output.getvalue(), mexpect) + debugout = smtpd.DEBUGSTREAM.getvalue() + sender = re.compile("^sender: foo@bar.com$", re.MULTILINE) + self.assertRegex(debugout, sender) + for addr in ('John', 'Sally', 'Fred', 'root@localhost', + 'warped@silly.walks.com'): + to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr), + re.MULTILINE) + self.assertRegex(debugout, to_addr) + + def testSendMessageWithSomeAddresses(self): + # Make sure nothing breaks if not all of the three 'to' headers exist + m = email.mime.text.MIMEText('A test message') + m['From'] = 'foo@bar.com' + m['To'] = 'John, Dinsdale' + smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3) + smtp.send_message(m) + # XXX (see comment in testSend) + time.sleep(0.01) + smtp.quit() + + self.client_evt.set() + self.serv_evt.wait() + self.output.flush() + # Add the X-Peer header that DebuggingServer adds + m['X-Peer'] = socket.gethostbyname('localhost') + mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END) + self.assertEqual(self.output.getvalue(), mexpect) + debugout = smtpd.DEBUGSTREAM.getvalue() + sender = re.compile("^sender: foo@bar.com$", re.MULTILINE) + self.assertRegex(debugout, sender) + for addr in ('John', 'Dinsdale'): + to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr), + re.MULTILINE) + self.assertRegex(debugout, to_addr) + + +class NonConnectingTests(unittest.TestCase): -class NonConnectingTests(TestCase): + def setUp(self): + smtplib.socket = mock_socket + + def tearDown(self): + smtplib.socket = socket def testNotConnected(self): # Test various operations on an unconnected SMTP object that @@ -249,31 +371,26 @@ class NonConnectingTests(TestCase): def testNonnumericPort(self): # check that non-numeric port raises socket.error - self.assertRaises(socket.error, smtplib.SMTP, + self.assertRaises(mock_socket.error, smtplib.SMTP, "localhost", "bogus") - self.assertRaises(socket.error, smtplib.SMTP, + self.assertRaises(mock_socket.error, smtplib.SMTP, "localhost:bogus") # test response of client to a non-successful HELO message -class BadHELOServerTests(TestCase): +@unittest.skipUnless(threading, 'Threading required for this test.') +class BadHELOServerTests(unittest.TestCase): def setUp(self): + smtplib.socket = mock_socket + mock_socket.reply_with(b"199 no hello for you!") self.old_stdout = sys.stdout self.output = io.StringIO() sys.stdout = self.output - - self.evt = threading.Event() - self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.sock.settimeout(15) - self.port = support.bind_port(self.sock) - servargs = (self.evt, b"199 no hello for you!\n", self.sock) - threading.Thread(target=server, args=servargs).start() - self.evt.wait() - self.evt.clear() + self.port = 25 def tearDown(self): - self.evt.wait() + smtplib.socket = socket sys.stdout = self.old_stdout def testFailingHELO(self): @@ -355,6 +472,9 @@ class SimSMTPChannel(smtpd.SMTPChannel): else: self.push('550 No access for you!') + def handle_error(self): + raise + class SimSMTPServer(smtpd.SMTPServer): @@ -362,8 +482,7 @@ class SimSMTPServer(smtpd.SMTPServer): self._extra_features = [] smtpd.SMTPServer.__init__(self, *args, **kw) - def handle_accept(self): - conn, addr = self.accept() + def handle_accepted(self, conn, addr): self._SMTPchannel = SimSMTPChannel(self._extra_features, self, conn, addr) @@ -373,12 +492,19 @@ class SimSMTPServer(smtpd.SMTPServer): def add_feature(self, feature): self._extra_features.append(feature) + def handle_error(self): + raise + # Test various SMTP & ESMTP commands/behaviors that require a simulated server # (i.e., something with more features than DebuggingServer) -class SMTPSimTests(TestCase): +@unittest.skipUnless(threading, 'Threading required for this test.') +class SMTPSimTests(unittest.TestCase): def setUp(self): + self.real_getfqdn = socket.getfqdn + socket.getfqdn = mock_socket.getfqdn + self._threads = support.threading_setup() self.serv_evt = threading.Event() self.client_evt = threading.Event() # Pick a random unused port by passing 0 for the port number @@ -386,17 +512,21 @@ class SMTPSimTests(TestCase): # Keep a note of what port was assigned self.port = self.serv.socket.getsockname()[1] serv_args = (self.serv, self.serv_evt, self.client_evt) - threading.Thread(target=debugging_server, args=serv_args).start() + self.thread = threading.Thread(target=debugging_server, args=serv_args) + self.thread.start() # wait until server thread has assigned a port number self.serv_evt.wait() self.serv_evt.clear() def tearDown(self): + socket.getfqdn = self.real_getfqdn # indicate that the client is finished self.client_evt.set() # wait for the server thread to terminate self.serv_evt.wait() + self.thread.join() + support.threading_cleanup(*self._threads) def testBasic(self): # smoke test @@ -460,6 +590,7 @@ class SMTPSimTests(TestCase): expected_auth_ok = (235, b'plain auth ok') self.assertEqual(smtp.login(sim_auth[0], sim_auth[1]), expected_auth_ok) + smtp.close() # SimSMTPChannel doesn't fully support LOGIN or CRAM-MD5 auth because they # require a synchronous read to obtain the credentials...so instead smtpd @@ -474,8 +605,8 @@ class SMTPSimTests(TestCase): smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) try: smtp.login(sim_auth[0], sim_auth[1]) except smtplib.SMTPAuthenticationError as err: - if sim_auth_login_password not in str(err): - raise "expected encoded password not found in error message" + self.assertIn(sim_auth_login_password, str(err)) + smtp.close() def testAUTH_CRAM_MD5(self): self.serv.add_feature("AUTH CRAM-MD5") @@ -483,8 +614,8 @@ class SMTPSimTests(TestCase): try: smtp.login(sim_auth[0], sim_auth[1]) except smtplib.SMTPAuthenticationError as err: - if sim_auth_credentials['cram-md5'] not in str(err): - raise "expected encoded credentials not found in error message" + self.assertIn(sim_auth_credentials['cram-md5'], str(err)) + smtp.close() #TODO: add tests for correct AUTH method fallback now that the #test infrastructure can support it. |
