diff options
author | Victor Stinner <vstinner@python.org> | 2021-12-07 11:31:04 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-12-07 11:31:04 (GMT) |
commit | cf7eaa4617295747ee5646c4e2b7e7a16d7c64ab (patch) | |
tree | b09b7bcc7b2cdfcae0b8291f6ec867dbb8337246 /Lib | |
parent | 2bf551757e0a7e3cc6ce2ebed2178b82438ac6b5 (diff) | |
download | cpython-cf7eaa4617295747ee5646c4e2b7e7a16d7c64ab.zip cpython-cf7eaa4617295747ee5646c4e2b7e7a16d7c64ab.tar.gz cpython-cf7eaa4617295747ee5646c4e2b7e7a16d7c64ab.tar.bz2 |
Revert "bpo-28533: Remove asyncore, asynchat, smtpd modules (GH-29521)" (GH-29951)
This reverts commit 9bf2cbc4c498812e14f20d86acb61c53928a5a57.
Diffstat (limited to 'Lib')
-rw-r--r-- | Lib/asynchat.py (renamed from Lib/test/support/_asynchat.py) | 10 | ||||
-rw-r--r-- | Lib/asyncore.py (renamed from Lib/test/support/_asyncore.py) | 6 | ||||
-rwxr-xr-x | Lib/smtpd.py (renamed from Lib/test/support/_smtpd.py) | 137 | ||||
-rw-r--r-- | Lib/test/libregrtest/save_env.py | 12 | ||||
-rw-r--r-- | Lib/test/mock_socket.py | 4 | ||||
-rw-r--r-- | Lib/test/test_asynchat.py | 292 | ||||
-rw-r--r-- | Lib/test/test_asyncore.py | 841 | ||||
-rw-r--r-- | Lib/test/test_ftplib.py | 10 | ||||
-rw-r--r-- | Lib/test/test_logging.py | 7 | ||||
-rw-r--r-- | Lib/test/test_os.py | 7 | ||||
-rw-r--r-- | Lib/test/test_poplib.py | 8 | ||||
-rw-r--r-- | Lib/test/test_smtpd.py | 1018 | ||||
-rw-r--r-- | Lib/test/test_smtplib.py | 8 | ||||
-rw-r--r-- | Lib/test/test_ssl.py | 5 |
14 files changed, 2346 insertions, 19 deletions
diff --git a/Lib/test/support/_asynchat.py b/Lib/asynchat.py index 941cc1d..de26ffa 100644 --- a/Lib/test/support/_asynchat.py +++ b/Lib/asynchat.py @@ -45,9 +45,17 @@ command will be accumulated (using your own 'collect_incoming_data' method) up to the terminator, and then control will be returned to you - by calling your self.found_terminator() method. """ -from test.support import _asyncore as asyncore +import asyncore from collections import deque +from warnings import warn +warn( + 'The asynchat module is deprecated. ' + 'The recommended replacement is asyncio', + DeprecationWarning, + stacklevel=2) + + class async_chat(asyncore.dispatcher): """This is an abstract class. You must derive from this class, and add diff --git a/Lib/test/support/_asyncore.py b/Lib/asyncore.py index 7863efa..b1eea4b 100644 --- a/Lib/test/support/_asyncore.py +++ b/Lib/asyncore.py @@ -57,6 +57,12 @@ from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, EINVAL, \ ENOTCONN, ESHUTDOWN, EISCONN, EBADF, ECONNABORTED, EPIPE, EAGAIN, \ errorcode +warnings.warn( + 'The asyncore module is deprecated. ' + 'The recommended replacement is asyncio', + DeprecationWarning, + stacklevel=2) + _DISCONNECTED = frozenset({ECONNRESET, ENOTCONN, ESHUTDOWN, ECONNABORTED, EPIPE, EBADF}) diff --git a/Lib/test/support/_smtpd.py b/Lib/smtpd.py index 0e37d08..1cd004f 100755 --- a/Lib/test/support/_smtpd.py +++ b/Lib/smtpd.py @@ -80,13 +80,22 @@ import collections from warnings import warn from email._header_value_parser import get_addr_spec, get_angle_addr -from test.support import _asyncore as asyncore -from test.support import _asynchat as asynchat - __all__ = [ "SMTPChannel", "SMTPServer", "DebuggingServer", "PureProxy", ] +warn( + 'The smtpd module is deprecated and unmaintained. Please see aiosmtpd ' + '(https://aiosmtpd.readthedocs.io/) for the recommended replacement.', + DeprecationWarning, + stacklevel=2) + + +# These are imported after the above warning so that users get the correct +# deprecation warning. +import asyncore +import asynchat + program = sys.argv[0] __version__ = 'Python SMTP proxy version 0.3' @@ -179,6 +188,128 @@ class SMTPChannel(asynchat.async_chat): self.received_lines = [] + # properties for backwards-compatibility + @property + def __server(self): + warn("Access to __server attribute on SMTPChannel is deprecated, " + "use 'smtp_server' instead", DeprecationWarning, 2) + return self.smtp_server + @__server.setter + def __server(self, value): + warn("Setting __server attribute on SMTPChannel is deprecated, " + "set 'smtp_server' instead", DeprecationWarning, 2) + self.smtp_server = value + + @property + def __line(self): + warn("Access to __line attribute on SMTPChannel is deprecated, " + "use 'received_lines' instead", DeprecationWarning, 2) + return self.received_lines + @__line.setter + def __line(self, value): + warn("Setting __line attribute on SMTPChannel is deprecated, " + "set 'received_lines' instead", DeprecationWarning, 2) + self.received_lines = value + + @property + def __state(self): + warn("Access to __state attribute on SMTPChannel is deprecated, " + "use 'smtp_state' instead", DeprecationWarning, 2) + return self.smtp_state + @__state.setter + def __state(self, value): + warn("Setting __state attribute on SMTPChannel is deprecated, " + "set 'smtp_state' instead", DeprecationWarning, 2) + self.smtp_state = value + + @property + def __greeting(self): + warn("Access to __greeting attribute on SMTPChannel is deprecated, " + "use 'seen_greeting' instead", DeprecationWarning, 2) + return self.seen_greeting + @__greeting.setter + def __greeting(self, value): + warn("Setting __greeting attribute on SMTPChannel is deprecated, " + "set 'seen_greeting' instead", DeprecationWarning, 2) + self.seen_greeting = value + + @property + def __mailfrom(self): + warn("Access to __mailfrom attribute on SMTPChannel is deprecated, " + "use 'mailfrom' instead", DeprecationWarning, 2) + return self.mailfrom + @__mailfrom.setter + def __mailfrom(self, value): + warn("Setting __mailfrom attribute on SMTPChannel is deprecated, " + "set 'mailfrom' instead", DeprecationWarning, 2) + self.mailfrom = value + + @property + def __rcpttos(self): + warn("Access to __rcpttos attribute on SMTPChannel is deprecated, " + "use 'rcpttos' instead", DeprecationWarning, 2) + return self.rcpttos + @__rcpttos.setter + def __rcpttos(self, value): + warn("Setting __rcpttos attribute on SMTPChannel is deprecated, " + "set 'rcpttos' instead", DeprecationWarning, 2) + self.rcpttos = value + + @property + def __data(self): + warn("Access to __data attribute on SMTPChannel is deprecated, " + "use 'received_data' instead", DeprecationWarning, 2) + return self.received_data + @__data.setter + def __data(self, value): + warn("Setting __data attribute on SMTPChannel is deprecated, " + "set 'received_data' instead", DeprecationWarning, 2) + self.received_data = value + + @property + def __fqdn(self): + warn("Access to __fqdn attribute on SMTPChannel is deprecated, " + "use 'fqdn' instead", DeprecationWarning, 2) + return self.fqdn + @__fqdn.setter + def __fqdn(self, value): + warn("Setting __fqdn attribute on SMTPChannel is deprecated, " + "set 'fqdn' instead", DeprecationWarning, 2) + self.fqdn = value + + @property + def __peer(self): + warn("Access to __peer attribute on SMTPChannel is deprecated, " + "use 'peer' instead", DeprecationWarning, 2) + return self.peer + @__peer.setter + def __peer(self, value): + warn("Setting __peer attribute on SMTPChannel is deprecated, " + "set 'peer' instead", DeprecationWarning, 2) + self.peer = value + + @property + def __conn(self): + warn("Access to __conn attribute on SMTPChannel is deprecated, " + "use 'conn' instead", DeprecationWarning, 2) + return self.conn + @__conn.setter + def __conn(self, value): + warn("Setting __conn attribute on SMTPChannel is deprecated, " + "set 'conn' instead", DeprecationWarning, 2) + self.conn = value + + @property + def __addr(self): + warn("Access to __addr attribute on SMTPChannel is deprecated, " + "use 'addr' instead", DeprecationWarning, 2) + return self.addr + @__addr.setter + def __addr(self, value): + warn("Setting __addr attribute on SMTPChannel is deprecated, " + "set 'addr' instead", DeprecationWarning, 2) + self.addr = value + # Overrides base class for convenience. def push(self, msg): asynchat.async_chat.push(self, bytes( diff --git a/Lib/test/libregrtest/save_env.py b/Lib/test/libregrtest/save_env.py index 17dda99..60c9be2 100644 --- a/Lib/test/libregrtest/save_env.py +++ b/Lib/test/libregrtest/save_env.py @@ -52,7 +52,7 @@ class saved_test_environment: resources = ('sys.argv', 'cwd', 'sys.stdin', 'sys.stdout', 'sys.stderr', 'os.environ', 'sys.path', 'sys.path_hooks', '__import__', - 'warnings.filters', + 'warnings.filters', 'asyncore.socket_map', 'logging._handlers', 'logging._handlerList', 'sys.gettrace', 'sys.warnoptions', # multiprocessing.process._cleanup() may release ref @@ -160,6 +160,16 @@ class saved_test_environment: warnings.filters = saved_filters[1] warnings.filters[:] = saved_filters[2] + def get_asyncore_socket_map(self): + asyncore = sys.modules.get('asyncore') + # XXX Making a copy keeps objects alive until __exit__ gets called. + return asyncore and asyncore.socket_map.copy() or {} + def restore_asyncore_socket_map(self, saved_map): + asyncore = sys.modules.get('asyncore') + if asyncore is not None: + asyncore.close_all(ignore_all=True) + asyncore.socket_map.update(saved_map) + def get_shutil_archive_formats(self): shutil = self.try_get_module('shutil') # we could call get_archives_formats() but that only returns the diff --git a/Lib/test/mock_socket.py b/Lib/test/mock_socket.py index 9788d58..c7abddc 100644 --- a/Lib/test/mock_socket.py +++ b/Lib/test/mock_socket.py @@ -1,4 +1,4 @@ -"""Mock socket module used by the smtplib tests. +"""Mock socket module used by the smtpd and smtplib tests. """ # imported for _GLOBAL_DEFAULT_TIMEOUT @@ -33,7 +33,7 @@ class MockFile: class MockSocket: - """Mock socket object used by smtplib tests. + """Mock socket object used by smtpd and smtplib tests. """ def __init__(self, family=None): global _reply_data diff --git a/Lib/test/test_asynchat.py b/Lib/test/test_asynchat.py new file mode 100644 index 0000000..973ac1f --- /dev/null +++ b/Lib/test/test_asynchat.py @@ -0,0 +1,292 @@ +# test asynchat + +from test import support +from test.support import socket_helper +from test.support import threading_helper + +import errno +import socket +import sys +import threading +import time +import unittest +import unittest.mock + +import warnings +with warnings.catch_warnings(): + warnings.simplefilter('ignore', DeprecationWarning) + import asynchat + import asyncore + +HOST = socket_helper.HOST +SERVER_QUIT = b'QUIT\n' + + +class echo_server(threading.Thread): + # parameter to determine the number of bytes passed back to the + # client each send + chunk_size = 1 + + def __init__(self, event): + threading.Thread.__init__(self) + self.event = event + self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.port = socket_helper.bind_port(self.sock) + # This will be set if the client wants us to wait before echoing + # data back. + self.start_resend_event = None + + def run(self): + self.sock.listen() + self.event.set() + conn, client = self.sock.accept() + self.buffer = b"" + # collect data until quit message is seen + while SERVER_QUIT not in self.buffer: + data = conn.recv(1) + if not data: + break + self.buffer = self.buffer + data + + # remove the SERVER_QUIT message + self.buffer = self.buffer.replace(SERVER_QUIT, b'') + + if self.start_resend_event: + self.start_resend_event.wait() + + # re-send entire set of collected data + try: + # this may fail on some tests, such as test_close_when_done, + # since the client closes the channel when it's done sending + while self.buffer: + n = conn.send(self.buffer[:self.chunk_size]) + time.sleep(0.001) + self.buffer = self.buffer[n:] + except: + pass + + conn.close() + self.sock.close() + +class echo_client(asynchat.async_chat): + + def __init__(self, terminator, server_port): + asynchat.async_chat.__init__(self) + self.contents = [] + self.create_socket(socket.AF_INET, socket.SOCK_STREAM) + self.connect((HOST, server_port)) + self.set_terminator(terminator) + self.buffer = b"" + + def handle_connect(self): + pass + + if sys.platform == 'darwin': + # select.poll returns a select.POLLHUP at the end of the tests + # on darwin, so just ignore it + def handle_expt(self): + pass + + def collect_incoming_data(self, data): + self.buffer += data + + def found_terminator(self): + self.contents.append(self.buffer) + self.buffer = b"" + +def start_echo_server(): + event = threading.Event() + s = echo_server(event) + s.start() + event.wait() + event.clear() + time.sleep(0.01) # Give server time to start accepting. + return s, event + + +class TestAsynchat(unittest.TestCase): + usepoll = False + + def setUp(self): + self._threads = threading_helper.threading_setup() + + def tearDown(self): + threading_helper.threading_cleanup(*self._threads) + + def line_terminator_check(self, term, server_chunk): + event = threading.Event() + s = echo_server(event) + s.chunk_size = server_chunk + s.start() + event.wait() + event.clear() + time.sleep(0.01) # Give server time to start accepting. + c = echo_client(term, s.port) + c.push(b"hello ") + c.push(b"world" + term) + c.push(b"I'm not dead yet!" + term) + c.push(SERVER_QUIT) + asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01) + threading_helper.join_thread(s) + + self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"]) + + # the line terminator tests below check receiving variously-sized + # chunks back from the server in order to exercise all branches of + # async_chat.handle_read + + def test_line_terminator1(self): + # test one-character terminator + for l in (1, 2, 3): + self.line_terminator_check(b'\n', l) + + def test_line_terminator2(self): + # test two-character terminator + for l in (1, 2, 3): + self.line_terminator_check(b'\r\n', l) + + def test_line_terminator3(self): + # test three-character terminator + for l in (1, 2, 3): + self.line_terminator_check(b'qqq', l) + + def numeric_terminator_check(self, termlen): + # Try reading a fixed number of bytes + s, event = start_echo_server() + c = echo_client(termlen, s.port) + data = b"hello world, I'm not dead yet!\n" + c.push(data) + c.push(SERVER_QUIT) + asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01) + threading_helper.join_thread(s) + + self.assertEqual(c.contents, [data[:termlen]]) + + def test_numeric_terminator1(self): + # check that ints & longs both work (since type is + # explicitly checked in async_chat.handle_read) + self.numeric_terminator_check(1) + + def test_numeric_terminator2(self): + self.numeric_terminator_check(6) + + def test_none_terminator(self): + # Try reading a fixed number of bytes + s, event = start_echo_server() + c = echo_client(None, s.port) + data = b"hello world, I'm not dead yet!\n" + c.push(data) + c.push(SERVER_QUIT) + asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01) + threading_helper.join_thread(s) + + self.assertEqual(c.contents, []) + self.assertEqual(c.buffer, data) + + def test_simple_producer(self): + s, event = start_echo_server() + c = echo_client(b'\n', s.port) + data = b"hello world\nI'm not dead yet!\n" + p = asynchat.simple_producer(data+SERVER_QUIT, buffer_size=8) + c.push_with_producer(p) + asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01) + threading_helper.join_thread(s) + + self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"]) + + def test_string_producer(self): + s, event = start_echo_server() + c = echo_client(b'\n', s.port) + data = b"hello world\nI'm not dead yet!\n" + c.push_with_producer(data+SERVER_QUIT) + asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01) + threading_helper.join_thread(s) + + self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"]) + + def test_empty_line(self): + # checks that empty lines are handled correctly + s, event = start_echo_server() + c = echo_client(b'\n', s.port) + c.push(b"hello world\n\nI'm not dead yet!\n") + c.push(SERVER_QUIT) + asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01) + threading_helper.join_thread(s) + + self.assertEqual(c.contents, + [b"hello world", b"", b"I'm not dead yet!"]) + + def test_close_when_done(self): + s, event = start_echo_server() + s.start_resend_event = threading.Event() + c = echo_client(b'\n', s.port) + c.push(b"hello world\nI'm not dead yet!\n") + c.push(SERVER_QUIT) + c.close_when_done() + asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01) + + # Only allow the server to start echoing data back to the client after + # the client has closed its connection. This prevents a race condition + # where the server echoes all of its data before we can check that it + # got any down below. + s.start_resend_event.set() + threading_helper.join_thread(s) + + self.assertEqual(c.contents, []) + # the server might have been able to send a byte or two back, but this + # at least checks that it received something and didn't just fail + # (which could still result in the client not having received anything) + self.assertGreater(len(s.buffer), 0) + + def test_push(self): + # Issue #12523: push() should raise a TypeError if it doesn't get + # a bytes string + s, event = start_echo_server() + c = echo_client(b'\n', s.port) + data = b'bytes\n' + c.push(data) + c.push(bytearray(data)) + c.push(memoryview(data)) + self.assertRaises(TypeError, c.push, 10) + self.assertRaises(TypeError, c.push, 'unicode') + c.push(SERVER_QUIT) + asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01) + threading_helper.join_thread(s) + self.assertEqual(c.contents, [b'bytes', b'bytes', b'bytes']) + + +class TestAsynchat_WithPoll(TestAsynchat): + usepoll = True + + +class TestAsynchatMocked(unittest.TestCase): + def test_blockingioerror(self): + # Issue #16133: handle_read() must ignore BlockingIOError + sock = unittest.mock.Mock() + sock.recv.side_effect = BlockingIOError(errno.EAGAIN) + + dispatcher = asynchat.async_chat() + dispatcher.set_socket(sock) + self.addCleanup(dispatcher.del_channel) + + with unittest.mock.patch.object(dispatcher, 'handle_error') as error: + dispatcher.handle_read() + self.assertFalse(error.called) + + +class TestHelperFunctions(unittest.TestCase): + def test_find_prefix_at_end(self): + self.assertEqual(asynchat.find_prefix_at_end("qwerty\r", "\r\n"), 1) + self.assertEqual(asynchat.find_prefix_at_end("qwertydkjf", "\r\n"), 0) + + +class TestNotConnected(unittest.TestCase): + def test_disallow_negative_terminator(self): + # Issue #11259 + client = asynchat.async_chat() + self.assertRaises(ValueError, client.set_terminator, -1) + + + +if __name__ == "__main__": + unittest.main() diff --git a/Lib/test/test_asyncore.py b/Lib/test/test_asyncore.py new file mode 100644 index 0000000..ecd1e12 --- /dev/null +++ b/Lib/test/test_asyncore.py @@ -0,0 +1,841 @@ +import unittest +import select +import os +import socket +import sys +import time +import errno +import struct +import threading + +from test import support +from test.support import os_helper +from test.support import socket_helper +from test.support import threading_helper +from test.support import warnings_helper +from io import BytesIO + +if support.PGO: + raise unittest.SkipTest("test is not helpful for PGO") + +import warnings +with warnings.catch_warnings(): + warnings.simplefilter('ignore', DeprecationWarning) + import asyncore + + +HAS_UNIX_SOCKETS = hasattr(socket, 'AF_UNIX') + +class dummysocket: + def __init__(self): + self.closed = False + + def close(self): + self.closed = True + + def fileno(self): + return 42 + +class dummychannel: + def __init__(self): + self.socket = dummysocket() + + def close(self): + self.socket.close() + +class exitingdummy: + def __init__(self): + pass + + def handle_read_event(self): + raise asyncore.ExitNow() + + handle_write_event = handle_read_event + handle_close = handle_read_event + handle_expt_event = handle_read_event + +class crashingdummy: + def __init__(self): + self.error_handled = False + + def handle_read_event(self): + raise Exception() + + handle_write_event = handle_read_event + handle_close = handle_read_event + handle_expt_event = handle_read_event + + def handle_error(self): + self.error_handled = True + +# used when testing senders; just collects what it gets until newline is sent +def capture_server(evt, buf, serv): + try: + serv.listen() + conn, addr = serv.accept() + except TimeoutError: + pass + else: + n = 200 + start = time.monotonic() + while n > 0 and time.monotonic() - start < 3.0: + r, w, e = select.select([conn], [], [], 0.1) + if r: + n -= 1 + data = conn.recv(10) + # keep everything except for the newline terminator + buf.write(data.replace(b'\n', b'')) + if b'\n' in data: + break + time.sleep(0.01) + + conn.close() + finally: + serv.close() + evt.set() + +def bind_af_aware(sock, addr): + """Helper function to bind a socket according to its family.""" + if HAS_UNIX_SOCKETS and sock.family == socket.AF_UNIX: + # Make sure the path doesn't exist. + os_helper.unlink(addr) + socket_helper.bind_unix_socket(sock, addr) + else: + sock.bind(addr) + + +class HelperFunctionTests(unittest.TestCase): + def test_readwriteexc(self): + # Check exception handling behavior of read, write and _exception + + # check that ExitNow exceptions in the object handler method + # bubbles all the way up through asyncore read/write/_exception calls + tr1 = exitingdummy() + self.assertRaises(asyncore.ExitNow, asyncore.read, tr1) + self.assertRaises(asyncore.ExitNow, asyncore.write, tr1) + self.assertRaises(asyncore.ExitNow, asyncore._exception, tr1) + + # check that an exception other than ExitNow in the object handler + # method causes the handle_error method to get called + tr2 = crashingdummy() + asyncore.read(tr2) + self.assertEqual(tr2.error_handled, True) + + tr2 = crashingdummy() + asyncore.write(tr2) + self.assertEqual(tr2.error_handled, True) + + tr2 = crashingdummy() + asyncore._exception(tr2) + self.assertEqual(tr2.error_handled, True) + + # asyncore.readwrite uses constants in the select module that + # are not present in Windows systems (see this thread: + # http://mail.python.org/pipermail/python-list/2001-October/109973.html) + # These constants should be present as long as poll is available + + @unittest.skipUnless(hasattr(select, 'poll'), 'select.poll required') + def test_readwrite(self): + # Check that correct methods are called by readwrite() + + attributes = ('read', 'expt', 'write', 'closed', 'error_handled') + + expected = ( + (select.POLLIN, 'read'), + (select.POLLPRI, 'expt'), + (select.POLLOUT, 'write'), + (select.POLLERR, 'closed'), + (select.POLLHUP, 'closed'), + (select.POLLNVAL, 'closed'), + ) + + class testobj: + def __init__(self): + self.read = False + self.write = False + self.closed = False + self.expt = False + self.error_handled = False + + def handle_read_event(self): + self.read = True + + def handle_write_event(self): + self.write = True + + def handle_close(self): + self.closed = True + + def handle_expt_event(self): + self.expt = True + + def handle_error(self): + self.error_handled = True + + for flag, expectedattr in expected: + tobj = testobj() + self.assertEqual(getattr(tobj, expectedattr), False) + asyncore.readwrite(tobj, flag) + + # Only the attribute modified by the routine we expect to be + # called should be True. + for attr in attributes: + self.assertEqual(getattr(tobj, attr), attr==expectedattr) + + # check that ExitNow exceptions in the object handler method + # bubbles all the way up through asyncore readwrite call + tr1 = exitingdummy() + self.assertRaises(asyncore.ExitNow, asyncore.readwrite, tr1, flag) + + # check that an exception other than ExitNow in the object handler + # method causes the handle_error method to get called + tr2 = crashingdummy() + self.assertEqual(tr2.error_handled, False) + asyncore.readwrite(tr2, flag) + self.assertEqual(tr2.error_handled, True) + + def test_closeall(self): + self.closeall_check(False) + + def test_closeall_default(self): + self.closeall_check(True) + + def closeall_check(self, usedefault): + # Check that close_all() closes everything in a given map + + l = [] + testmap = {} + for i in range(10): + c = dummychannel() + l.append(c) + self.assertEqual(c.socket.closed, False) + testmap[i] = c + + if usedefault: + socketmap = asyncore.socket_map + try: + asyncore.socket_map = testmap + asyncore.close_all() + finally: + testmap, asyncore.socket_map = asyncore.socket_map, socketmap + else: + asyncore.close_all(testmap) + + self.assertEqual(len(testmap), 0) + + for c in l: + self.assertEqual(c.socket.closed, True) + + def test_compact_traceback(self): + try: + raise Exception("I don't like spam!") + except: + real_t, real_v, real_tb = sys.exc_info() + r = asyncore.compact_traceback() + else: + self.fail("Expected exception") + + (f, function, line), t, v, info = r + self.assertEqual(os.path.split(f)[-1], 'test_asyncore.py') + self.assertEqual(function, 'test_compact_traceback') + self.assertEqual(t, real_t) + self.assertEqual(v, real_v) + self.assertEqual(info, '[%s|%s|%s]' % (f, function, line)) + + +class DispatcherTests(unittest.TestCase): + def setUp(self): + pass + + def tearDown(self): + asyncore.close_all() + + def test_basic(self): + d = asyncore.dispatcher() + self.assertEqual(d.readable(), True) + self.assertEqual(d.writable(), True) + + def test_repr(self): + d = asyncore.dispatcher() + self.assertEqual(repr(d), '<asyncore.dispatcher at %#x>' % id(d)) + + def test_log(self): + d = asyncore.dispatcher() + + # capture output of dispatcher.log() (to stderr) + l1 = "Lovely spam! Wonderful spam!" + l2 = "I don't like spam!" + with support.captured_stderr() as stderr: + d.log(l1) + d.log(l2) + + lines = stderr.getvalue().splitlines() + self.assertEqual(lines, ['log: %s' % l1, 'log: %s' % l2]) + + def test_log_info(self): + d = asyncore.dispatcher() + + # capture output of dispatcher.log_info() (to stdout via print) + l1 = "Have you got anything without spam?" + l2 = "Why can't she have egg bacon spam and sausage?" + l3 = "THAT'S got spam in it!" + with support.captured_stdout() as stdout: + d.log_info(l1, 'EGGS') + d.log_info(l2) + d.log_info(l3, 'SPAM') + + lines = stdout.getvalue().splitlines() + expected = ['EGGS: %s' % l1, 'info: %s' % l2, 'SPAM: %s' % l3] + self.assertEqual(lines, expected) + + def test_unhandled(self): + d = asyncore.dispatcher() + d.ignore_log_types = () + + # capture output of dispatcher.log_info() (to stdout via print) + with support.captured_stdout() as stdout: + d.handle_expt() + d.handle_read() + d.handle_write() + d.handle_connect() + + lines = stdout.getvalue().splitlines() + expected = ['warning: unhandled incoming priority event', + 'warning: unhandled read event', + 'warning: unhandled write event', + 'warning: unhandled connect event'] + self.assertEqual(lines, expected) + + def test_strerror(self): + # refers to bug #8573 + err = asyncore._strerror(errno.EPERM) + if hasattr(os, 'strerror'): + self.assertEqual(err, os.strerror(errno.EPERM)) + err = asyncore._strerror(-1) + self.assertTrue(err != "") + + +class dispatcherwithsend_noread(asyncore.dispatcher_with_send): + def readable(self): + return False + + def handle_connect(self): + pass + + +class DispatcherWithSendTests(unittest.TestCase): + def setUp(self): + pass + + def tearDown(self): + asyncore.close_all() + + @threading_helper.reap_threads + def test_send(self): + evt = threading.Event() + sock = socket.socket() + sock.settimeout(3) + port = socket_helper.bind_port(sock) + + cap = BytesIO() + args = (evt, cap, sock) + t = threading.Thread(target=capture_server, args=args) + t.start() + try: + # wait a little longer for the server to initialize (it sometimes + # refuses connections on slow machines without this wait) + time.sleep(0.2) + + data = b"Suppose there isn't a 16-ton weight?" + d = dispatcherwithsend_noread() + d.create_socket() + d.connect((socket_helper.HOST, port)) + + # give time for socket to connect + time.sleep(0.1) + + d.send(data) + d.send(data) + d.send(b'\n') + + n = 1000 + while d.out_buffer and n > 0: + asyncore.poll() + n -= 1 + + evt.wait() + + self.assertEqual(cap.getvalue(), data*2) + finally: + threading_helper.join_thread(t) + + +@unittest.skipUnless(hasattr(asyncore, 'file_wrapper'), + 'asyncore.file_wrapper required') +class FileWrapperTest(unittest.TestCase): + def setUp(self): + self.d = b"It's not dead, it's sleeping!" + with open(os_helper.TESTFN, 'wb') as file: + file.write(self.d) + + def tearDown(self): + os_helper.unlink(os_helper.TESTFN) + + def test_recv(self): + fd = os.open(os_helper.TESTFN, os.O_RDONLY) + w = asyncore.file_wrapper(fd) + os.close(fd) + + self.assertNotEqual(w.fd, fd) + self.assertNotEqual(w.fileno(), fd) + self.assertEqual(w.recv(13), b"It's not dead") + self.assertEqual(w.read(6), b", it's") + w.close() + self.assertRaises(OSError, w.read, 1) + + def test_send(self): + d1 = b"Come again?" + d2 = b"I want to buy some cheese." + fd = os.open(os_helper.TESTFN, os.O_WRONLY | os.O_APPEND) + w = asyncore.file_wrapper(fd) + os.close(fd) + + w.write(d1) + w.send(d2) + w.close() + with open(os_helper.TESTFN, 'rb') as file: + self.assertEqual(file.read(), self.d + d1 + d2) + + @unittest.skipUnless(hasattr(asyncore, 'file_dispatcher'), + 'asyncore.file_dispatcher required') + def test_dispatcher(self): + fd = os.open(os_helper.TESTFN, os.O_RDONLY) + data = [] + class FileDispatcher(asyncore.file_dispatcher): + def handle_read(self): + data.append(self.recv(29)) + s = FileDispatcher(fd) + os.close(fd) + asyncore.loop(timeout=0.01, use_poll=True, count=2) + self.assertEqual(b"".join(data), self.d) + + def test_resource_warning(self): + # Issue #11453 + fd = os.open(os_helper.TESTFN, os.O_RDONLY) + f = asyncore.file_wrapper(fd) + + os.close(fd) + with warnings_helper.check_warnings(('', ResourceWarning)): + f = None + support.gc_collect() + + def test_close_twice(self): + fd = os.open(os_helper.TESTFN, os.O_RDONLY) + f = asyncore.file_wrapper(fd) + os.close(fd) + + os.close(f.fd) # file_wrapper dupped fd + with self.assertRaises(OSError): + f.close() + + self.assertEqual(f.fd, -1) + # calling close twice should not fail + f.close() + + +class BaseTestHandler(asyncore.dispatcher): + + def __init__(self, sock=None): + asyncore.dispatcher.__init__(self, sock) + self.flag = False + + def handle_accept(self): + raise Exception("handle_accept not supposed to be called") + + def handle_accepted(self): + raise Exception("handle_accepted not supposed to be called") + + def handle_connect(self): + raise Exception("handle_connect not supposed to be called") + + def handle_expt(self): + raise Exception("handle_expt not supposed to be called") + + def handle_close(self): + raise Exception("handle_close not supposed to be called") + + def handle_error(self): + raise + + +class BaseServer(asyncore.dispatcher): + """A server which listens on an address and dispatches the + connection to a handler. + """ + + def __init__(self, family, addr, handler=BaseTestHandler): + asyncore.dispatcher.__init__(self) + self.create_socket(family) + self.set_reuse_addr() + bind_af_aware(self.socket, addr) + self.listen(5) + self.handler = handler + + @property + def address(self): + return self.socket.getsockname() + + def handle_accepted(self, sock, addr): + self.handler(sock) + + def handle_error(self): + raise + + +class BaseClient(BaseTestHandler): + + def __init__(self, family, address): + BaseTestHandler.__init__(self) + self.create_socket(family) + self.connect(address) + + def handle_connect(self): + pass + + +class BaseTestAPI: + + def tearDown(self): + asyncore.close_all(ignore_all=True) + + def loop_waiting_for_flag(self, instance, timeout=5): + timeout = float(timeout) / 100 + count = 100 + while asyncore.socket_map and count > 0: + asyncore.loop(timeout=0.01, count=1, use_poll=self.use_poll) + if instance.flag: + return + count -= 1 + time.sleep(timeout) + self.fail("flag not set") + + def test_handle_connect(self): + # make sure handle_connect is called on connect() + + class TestClient(BaseClient): + def handle_connect(self): + self.flag = True + + server = BaseServer(self.family, self.addr) + client = TestClient(self.family, server.address) + self.loop_waiting_for_flag(client) + + def test_handle_accept(self): + # make sure handle_accept() is called when a client connects + + class TestListener(BaseTestHandler): + + def __init__(self, family, addr): + BaseTestHandler.__init__(self) + self.create_socket(family) + bind_af_aware(self.socket, addr) + self.listen(5) + self.address = self.socket.getsockname() + + def handle_accept(self): + self.flag = True + + server = TestListener(self.family, self.addr) + client = BaseClient(self.family, server.address) + self.loop_waiting_for_flag(server) + + def test_handle_accepted(self): + # make sure handle_accepted() is called when a client connects + + class TestListener(BaseTestHandler): + + def __init__(self, family, addr): + BaseTestHandler.__init__(self) + self.create_socket(family) + bind_af_aware(self.socket, addr) + self.listen(5) + self.address = self.socket.getsockname() + + def handle_accept(self): + asyncore.dispatcher.handle_accept(self) + + def handle_accepted(self, sock, addr): + sock.close() + self.flag = True + + server = TestListener(self.family, self.addr) + client = BaseClient(self.family, server.address) + self.loop_waiting_for_flag(server) + + + def test_handle_read(self): + # make sure handle_read is called on data received + + class TestClient(BaseClient): + def handle_read(self): + self.flag = True + + class TestHandler(BaseTestHandler): + def __init__(self, conn): + BaseTestHandler.__init__(self, conn) + self.send(b'x' * 1024) + + server = BaseServer(self.family, self.addr, TestHandler) + client = TestClient(self.family, server.address) + self.loop_waiting_for_flag(client) + + def test_handle_write(self): + # make sure handle_write is called + + class TestClient(BaseClient): + def handle_write(self): + self.flag = True + + server = BaseServer(self.family, self.addr) + client = TestClient(self.family, server.address) + self.loop_waiting_for_flag(client) + + def test_handle_close(self): + # make sure handle_close is called when the other end closes + # the connection + + class TestClient(BaseClient): + + def handle_read(self): + # in order to make handle_close be called we are supposed + # to make at least one recv() call + self.recv(1024) + + def handle_close(self): + self.flag = True + self.close() + + class TestHandler(BaseTestHandler): + def __init__(self, conn): + BaseTestHandler.__init__(self, conn) + self.close() + + server = BaseServer(self.family, self.addr, TestHandler) + client = TestClient(self.family, server.address) + self.loop_waiting_for_flag(client) + + def test_handle_close_after_conn_broken(self): + # Check that ECONNRESET/EPIPE is correctly handled (issues #5661 and + # #11265). + + data = b'\0' * 128 + + class TestClient(BaseClient): + + def handle_write(self): + self.send(data) + + def handle_close(self): + self.flag = True + self.close() + + def handle_expt(self): + self.flag = True + self.close() + + class TestHandler(BaseTestHandler): + + def handle_read(self): + self.recv(len(data)) + self.close() + + def writable(self): + return False + + server = BaseServer(self.family, self.addr, TestHandler) + client = TestClient(self.family, server.address) + self.loop_waiting_for_flag(client) + + @unittest.skipIf(sys.platform.startswith("sunos"), + "OOB support is broken on Solaris") + def test_handle_expt(self): + # Make sure handle_expt is called on OOB data received. + # Note: this might fail on some platforms as OOB data is + # tenuously supported and rarely used. + if HAS_UNIX_SOCKETS and self.family == socket.AF_UNIX: + self.skipTest("Not applicable to AF_UNIX sockets.") + + if sys.platform == "darwin" and self.use_poll: + self.skipTest("poll may fail on macOS; see issue #28087") + + class TestClient(BaseClient): + def handle_expt(self): + self.socket.recv(1024, socket.MSG_OOB) + self.flag = True + + class TestHandler(BaseTestHandler): + def __init__(self, conn): + BaseTestHandler.__init__(self, conn) + self.socket.send(bytes(chr(244), 'latin-1'), socket.MSG_OOB) + + server = BaseServer(self.family, self.addr, TestHandler) + client = TestClient(self.family, server.address) + self.loop_waiting_for_flag(client) + + def test_handle_error(self): + + class TestClient(BaseClient): + def handle_write(self): + 1.0 / 0 + def handle_error(self): + self.flag = True + try: + raise + except ZeroDivisionError: + pass + else: + raise Exception("exception not raised") + + server = BaseServer(self.family, self.addr) + client = TestClient(self.family, server.address) + self.loop_waiting_for_flag(client) + + def test_connection_attributes(self): + server = BaseServer(self.family, self.addr) + client = BaseClient(self.family, server.address) + + # we start disconnected + self.assertFalse(server.connected) + self.assertTrue(server.accepting) + # this can't be taken for granted across all platforms + #self.assertFalse(client.connected) + self.assertFalse(client.accepting) + + # execute some loops so that client connects to server + asyncore.loop(timeout=0.01, use_poll=self.use_poll, count=100) + self.assertFalse(server.connected) + self.assertTrue(server.accepting) + self.assertTrue(client.connected) + self.assertFalse(client.accepting) + + # disconnect the client + client.close() + self.assertFalse(server.connected) + self.assertTrue(server.accepting) + self.assertFalse(client.connected) + self.assertFalse(client.accepting) + + # stop serving + server.close() + self.assertFalse(server.connected) + self.assertFalse(server.accepting) + + def test_create_socket(self): + s = asyncore.dispatcher() + s.create_socket(self.family) + self.assertEqual(s.socket.type, socket.SOCK_STREAM) + self.assertEqual(s.socket.family, self.family) + self.assertEqual(s.socket.gettimeout(), 0) + self.assertFalse(s.socket.get_inheritable()) + + def test_bind(self): + if HAS_UNIX_SOCKETS and self.family == socket.AF_UNIX: + self.skipTest("Not applicable to AF_UNIX sockets.") + s1 = asyncore.dispatcher() + s1.create_socket(self.family) + s1.bind(self.addr) + s1.listen(5) + port = s1.socket.getsockname()[1] + + s2 = asyncore.dispatcher() + s2.create_socket(self.family) + # EADDRINUSE indicates the socket was correctly bound + self.assertRaises(OSError, s2.bind, (self.addr[0], port)) + + def test_set_reuse_addr(self): + if HAS_UNIX_SOCKETS and self.family == socket.AF_UNIX: + self.skipTest("Not applicable to AF_UNIX sockets.") + + with socket.socket(self.family) as sock: + try: + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + except OSError: + unittest.skip("SO_REUSEADDR not supported on this platform") + else: + # if SO_REUSEADDR succeeded for sock we expect asyncore + # to do the same + s = asyncore.dispatcher(socket.socket(self.family)) + self.assertFalse(s.socket.getsockopt(socket.SOL_SOCKET, + socket.SO_REUSEADDR)) + s.socket.close() + s.create_socket(self.family) + s.set_reuse_addr() + self.assertTrue(s.socket.getsockopt(socket.SOL_SOCKET, + socket.SO_REUSEADDR)) + + @threading_helper.reap_threads + def test_quick_connect(self): + # see: http://bugs.python.org/issue10340 + if self.family not in (socket.AF_INET, getattr(socket, "AF_INET6", object())): + self.skipTest("test specific to AF_INET and AF_INET6") + + server = BaseServer(self.family, self.addr) + # run the thread 500 ms: the socket should be connected in 200 ms + t = threading.Thread(target=lambda: asyncore.loop(timeout=0.1, + count=5)) + t.start() + try: + with socket.socket(self.family, socket.SOCK_STREAM) as s: + s.settimeout(.2) + s.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, + struct.pack('ii', 1, 0)) + + try: + s.connect(server.address) + except OSError: + pass + finally: + threading_helper.join_thread(t) + +class TestAPI_UseIPv4Sockets(BaseTestAPI): + family = socket.AF_INET + addr = (socket_helper.HOST, 0) + +@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 support required') +class TestAPI_UseIPv6Sockets(BaseTestAPI): + family = socket.AF_INET6 + addr = (socket_helper.HOSTv6, 0) + +@unittest.skipUnless(HAS_UNIX_SOCKETS, 'Unix sockets required') +class TestAPI_UseUnixSockets(BaseTestAPI): + if HAS_UNIX_SOCKETS: + family = socket.AF_UNIX + addr = os_helper.TESTFN + + def tearDown(self): + os_helper.unlink(self.addr) + BaseTestAPI.tearDown(self) + +class TestAPI_UseIPv4Select(TestAPI_UseIPv4Sockets, unittest.TestCase): + use_poll = False + +@unittest.skipUnless(hasattr(select, 'poll'), 'select.poll required') +class TestAPI_UseIPv4Poll(TestAPI_UseIPv4Sockets, unittest.TestCase): + use_poll = True + +class TestAPI_UseIPv6Select(TestAPI_UseIPv6Sockets, unittest.TestCase): + use_poll = False + +@unittest.skipUnless(hasattr(select, 'poll'), 'select.poll required') +class TestAPI_UseIPv6Poll(TestAPI_UseIPv6Sockets, unittest.TestCase): + use_poll = True + +class TestAPI_UseUnixSocketsSelect(TestAPI_UseUnixSockets, unittest.TestCase): + use_poll = False + +@unittest.skipUnless(hasattr(select, 'poll'), 'select.poll required') +class TestAPI_UseUnixSocketsPoll(TestAPI_UseUnixSockets, unittest.TestCase): + use_poll = True + +if __name__ == "__main__": + unittest.main() diff --git a/Lib/test/test_ftplib.py b/Lib/test/test_ftplib.py index c9edfac..56e3d8a 100644 --- a/Lib/test/test_ftplib.py +++ b/Lib/test/test_ftplib.py @@ -18,13 +18,17 @@ except ImportError: from unittest import TestCase, skipUnless from test import support -from test.support import _asynchat as asynchat -from test.support import _asyncore as asyncore -from test.support import socket_helper from test.support import threading_helper +from test.support import socket_helper from test.support import warnings_helper from test.support.socket_helper import HOST, HOSTv6 +import warnings +with warnings.catch_warnings(): + warnings.simplefilter('ignore', DeprecationWarning) + import asyncore + import asynchat + TIMEOUT = support.LOOPBACK_TIMEOUT DEFAULT_ENCODING = 'utf-8' diff --git a/Lib/test/test_logging.py b/Lib/test/test_logging.py index e709ea3..85b6e5f 100644 --- a/Lib/test/test_logging.py +++ b/Lib/test/test_logging.py @@ -42,8 +42,6 @@ import sys import tempfile from test.support.script_helper import assert_python_ok, assert_python_failure from test import support -from test.support import _asyncore as asyncore -from test.support import _smtpd as smtpd from test.support import os_helper from test.support import socket_helper from test.support import threading_helper @@ -61,6 +59,11 @@ from urllib.parse import urlparse, parse_qs from socketserver import (ThreadingUDPServer, DatagramRequestHandler, ThreadingTCPServer, StreamRequestHandler) +with warnings.catch_warnings(): + warnings.simplefilter('ignore', DeprecationWarning) + import asyncore + import smtpd + try: import win32evtlog, win32evtlogutil, pywintypes except ImportError: diff --git a/Lib/test/test_os.py b/Lib/test/test_os.py index 5e15340..8da0aa3 100644 --- a/Lib/test/test_os.py +++ b/Lib/test/test_os.py @@ -30,8 +30,6 @@ import unittest import uuid import warnings from test import support -from test.support import _asynchat as asynchat -from test.support import _asyncore as asyncore from test.support import import_helper from test.support import os_helper from test.support import socket_helper @@ -39,6 +37,11 @@ from test.support import threading_helper from test.support import warnings_helper from platform import win32_is_iot +with warnings.catch_warnings(): + warnings.simplefilter('ignore', DeprecationWarning) + import asynchat + import asyncore + try: import resource except ImportError: diff --git a/Lib/test/test_poplib.py b/Lib/test/test_poplib.py index 23c6be3..44cf523 100644 --- a/Lib/test/test_poplib.py +++ b/Lib/test/test_poplib.py @@ -12,12 +12,16 @@ import threading import unittest from unittest import TestCase, skipUnless from test import support as test_support -from test.support import _asynchat as asynchat -from test.support import _asyncore as asyncore from test.support import hashlib_helper from test.support import socket_helper from test.support import threading_helper +import warnings +with warnings.catch_warnings(): + warnings.simplefilter('ignore', DeprecationWarning) + import asynchat + import asyncore + HOST = socket_helper.HOST PORT = 0 diff --git a/Lib/test/test_smtpd.py b/Lib/test/test_smtpd.py new file mode 100644 index 0000000..d2e150d --- /dev/null +++ b/Lib/test/test_smtpd.py @@ -0,0 +1,1018 @@ +import unittest +import textwrap +from test import support, mock_socket +from test.support import socket_helper +from test.support import warnings_helper +import socket +import io + +import warnings +with warnings.catch_warnings(): + warnings.simplefilter('ignore', DeprecationWarning) + import smtpd + import asyncore + + +class DummyServer(smtpd.SMTPServer): + def __init__(self, *args, **kwargs): + smtpd.SMTPServer.__init__(self, *args, **kwargs) + self.messages = [] + if self._decode_data: + self.return_status = 'return status' + else: + self.return_status = b'return status' + + def process_message(self, peer, mailfrom, rcpttos, data, **kw): + self.messages.append((peer, mailfrom, rcpttos, data)) + if data == self.return_status: + return '250 Okish' + if 'mail_options' in kw and 'SMTPUTF8' in kw['mail_options']: + return '250 SMTPUTF8 message okish' + + +class DummyDispatcherBroken(Exception): + pass + + +class BrokenDummyServer(DummyServer): + def listen(self, num): + raise DummyDispatcherBroken() + + +class SMTPDServerTest(unittest.TestCase): + def setUp(self): + smtpd.socket = asyncore.socket = mock_socket + + def test_process_message_unimplemented(self): + server = smtpd.SMTPServer((socket_helper.HOST, 0), ('b', 0), + decode_data=True) + conn, addr = server.accept() + channel = smtpd.SMTPChannel(server, conn, addr, decode_data=True) + + def write_line(line): + channel.socket.queue_recv(line) + channel.handle_read() + + write_line(b'HELO example') + write_line(b'MAIL From:eggs@example') + write_line(b'RCPT To:spam@example') + write_line(b'DATA') + self.assertRaises(NotImplementedError, write_line, b'spam\r\n.\r\n') + + def test_decode_data_and_enable_SMTPUTF8_raises(self): + self.assertRaises( + ValueError, + smtpd.SMTPServer, + (socket_helper.HOST, 0), + ('b', 0), + enable_SMTPUTF8=True, + decode_data=True) + + def tearDown(self): + asyncore.close_all() + asyncore.socket = smtpd.socket = socket + + +class DebuggingServerTest(unittest.TestCase): + + def setUp(self): + smtpd.socket = asyncore.socket = mock_socket + + def send_data(self, channel, data, enable_SMTPUTF8=False): + def write_line(line): + channel.socket.queue_recv(line) + channel.handle_read() + write_line(b'EHLO example') + if enable_SMTPUTF8: + write_line(b'MAIL From:eggs@example BODY=8BITMIME SMTPUTF8') + else: + write_line(b'MAIL From:eggs@example') + write_line(b'RCPT To:spam@example') + write_line(b'DATA') + write_line(data) + write_line(b'.') + + def test_process_message_with_decode_data_true(self): + server = smtpd.DebuggingServer((socket_helper.HOST, 0), ('b', 0), + decode_data=True) + conn, addr = server.accept() + channel = smtpd.SMTPChannel(server, conn, addr, decode_data=True) + with support.captured_stdout() as s: + self.send_data(channel, b'From: test\n\nhello\n') + stdout = s.getvalue() + self.assertEqual(stdout, textwrap.dedent("""\ + ---------- MESSAGE FOLLOWS ---------- + From: test + X-Peer: peer-address + + hello + ------------ END MESSAGE ------------ + """)) + + def test_process_message_with_decode_data_false(self): + server = smtpd.DebuggingServer((socket_helper.HOST, 0), ('b', 0)) + conn, addr = server.accept() + channel = smtpd.SMTPChannel(server, conn, addr) + with support.captured_stdout() as s: + self.send_data(channel, b'From: test\n\nh\xc3\xa9llo\xff\n') + stdout = s.getvalue() + self.assertEqual(stdout, textwrap.dedent("""\ + ---------- MESSAGE FOLLOWS ---------- + b'From: test' + b'X-Peer: peer-address' + b'' + b'h\\xc3\\xa9llo\\xff' + ------------ END MESSAGE ------------ + """)) + + def test_process_message_with_enable_SMTPUTF8_true(self): + server = smtpd.DebuggingServer((socket_helper.HOST, 0), ('b', 0), + enable_SMTPUTF8=True) + conn, addr = server.accept() + channel = smtpd.SMTPChannel(server, conn, addr, enable_SMTPUTF8=True) + with support.captured_stdout() as s: + self.send_data(channel, b'From: test\n\nh\xc3\xa9llo\xff\n') + stdout = s.getvalue() + self.assertEqual(stdout, textwrap.dedent("""\ + ---------- MESSAGE FOLLOWS ---------- + b'From: test' + b'X-Peer: peer-address' + b'' + b'h\\xc3\\xa9llo\\xff' + ------------ END MESSAGE ------------ + """)) + + def test_process_SMTPUTF8_message_with_enable_SMTPUTF8_true(self): + server = smtpd.DebuggingServer((socket_helper.HOST, 0), ('b', 0), + enable_SMTPUTF8=True) + conn, addr = server.accept() + channel = smtpd.SMTPChannel(server, conn, addr, enable_SMTPUTF8=True) + with support.captured_stdout() as s: + self.send_data(channel, b'From: test\n\nh\xc3\xa9llo\xff\n', + enable_SMTPUTF8=True) + stdout = s.getvalue() + self.assertEqual(stdout, textwrap.dedent("""\ + ---------- MESSAGE FOLLOWS ---------- + mail options: ['BODY=8BITMIME', 'SMTPUTF8'] + b'From: test' + b'X-Peer: peer-address' + b'' + b'h\\xc3\\xa9llo\\xff' + ------------ END MESSAGE ------------ + """)) + + def tearDown(self): + asyncore.close_all() + asyncore.socket = smtpd.socket = socket + + +class TestFamilyDetection(unittest.TestCase): + def setUp(self): + smtpd.socket = asyncore.socket = mock_socket + + def tearDown(self): + asyncore.close_all() + asyncore.socket = smtpd.socket = socket + + @unittest.skipUnless(socket_helper.IPV6_ENABLED, "IPv6 not enabled") + def test_socket_uses_IPv6(self): + server = smtpd.SMTPServer((socket_helper.HOSTv6, 0), (socket_helper.HOSTv4, 0)) + self.assertEqual(server.socket.family, socket.AF_INET6) + + def test_socket_uses_IPv4(self): + server = smtpd.SMTPServer((socket_helper.HOSTv4, 0), (socket_helper.HOSTv6, 0)) + self.assertEqual(server.socket.family, socket.AF_INET) + + +class TestRcptOptionParsing(unittest.TestCase): + error_response = (b'555 RCPT TO parameters not recognized or not ' + b'implemented\r\n') + + def setUp(self): + smtpd.socket = asyncore.socket = mock_socket + self.old_debugstream = smtpd.DEBUGSTREAM + self.debug = smtpd.DEBUGSTREAM = io.StringIO() + + def tearDown(self): + asyncore.close_all() + asyncore.socket = smtpd.socket = socket + smtpd.DEBUGSTREAM = self.old_debugstream + + def write_line(self, channel, line): + channel.socket.queue_recv(line) + channel.handle_read() + + def test_params_rejected(self): + server = DummyServer((socket_helper.HOST, 0), ('b', 0)) + conn, addr = server.accept() + channel = smtpd.SMTPChannel(server, conn, addr) + self.write_line(channel, b'EHLO example') + self.write_line(channel, b'MAIL from: <foo@example.com> size=20') + self.write_line(channel, b'RCPT to: <foo@example.com> foo=bar') + self.assertEqual(channel.socket.last, self.error_response) + + def test_nothing_accepted(self): + server = DummyServer((socket_helper.HOST, 0), ('b', 0)) + conn, addr = server.accept() + channel = smtpd.SMTPChannel(server, conn, addr) + self.write_line(channel, b'EHLO example') + self.write_line(channel, b'MAIL from: <foo@example.com> size=20') + self.write_line(channel, b'RCPT to: <foo@example.com>') + self.assertEqual(channel.socket.last, b'250 OK\r\n') + + +class TestMailOptionParsing(unittest.TestCase): + error_response = (b'555 MAIL FROM parameters not recognized or not ' + b'implemented\r\n') + + def setUp(self): + smtpd.socket = asyncore.socket = mock_socket + self.old_debugstream = smtpd.DEBUGSTREAM + self.debug = smtpd.DEBUGSTREAM = io.StringIO() + + def tearDown(self): + asyncore.close_all() + asyncore.socket = smtpd.socket = socket + smtpd.DEBUGSTREAM = self.old_debugstream + + def write_line(self, channel, line): + channel.socket.queue_recv(line) + channel.handle_read() + + def test_with_decode_data_true(self): + server = DummyServer((socket_helper.HOST, 0), ('b', 0), decode_data=True) + conn, addr = server.accept() + channel = smtpd.SMTPChannel(server, conn, addr, decode_data=True) + self.write_line(channel, b'EHLO example') + for line in [ + b'MAIL from: <foo@example.com> size=20 SMTPUTF8', + b'MAIL from: <foo@example.com> size=20 SMTPUTF8 BODY=8BITMIME', + b'MAIL from: <foo@example.com> size=20 BODY=UNKNOWN', + b'MAIL from: <foo@example.com> size=20 body=8bitmime', + ]: + self.write_line(channel, line) + self.assertEqual(channel.socket.last, self.error_response) + self.write_line(channel, b'MAIL from: <foo@example.com> size=20') + self.assertEqual(channel.socket.last, b'250 OK\r\n') + + def test_with_decode_data_false(self): + server = DummyServer((socket_helper.HOST, 0), ('b', 0)) + conn, addr = server.accept() + channel = smtpd.SMTPChannel(server, conn, addr) + self.write_line(channel, b'EHLO example') + for line in [ + b'MAIL from: <foo@example.com> size=20 SMTPUTF8', + b'MAIL from: <foo@example.com> size=20 SMTPUTF8 BODY=8BITMIME', + ]: + self.write_line(channel, line) + self.assertEqual(channel.socket.last, self.error_response) + self.write_line( + channel, + b'MAIL from: <foo@example.com> size=20 SMTPUTF8 BODY=UNKNOWN') + self.assertEqual( + channel.socket.last, + b'501 Error: BODY can only be one of 7BIT, 8BITMIME\r\n') + self.write_line( + channel, b'MAIL from: <foo@example.com> size=20 body=8bitmime') + self.assertEqual(channel.socket.last, b'250 OK\r\n') + + def test_with_enable_smtputf8_true(self): + server = DummyServer((socket_helper.HOST, 0), ('b', 0), enable_SMTPUTF8=True) + conn, addr = server.accept() + channel = smtpd.SMTPChannel(server, conn, addr, enable_SMTPUTF8=True) + self.write_line(channel, b'EHLO example') + self.write_line( + channel, + b'MAIL from: <foo@example.com> size=20 body=8bitmime smtputf8') + self.assertEqual(channel.socket.last, b'250 OK\r\n') + + +class SMTPDChannelTest(unittest.TestCase): + def setUp(self): + smtpd.socket = asyncore.socket = mock_socket + self.old_debugstream = smtpd.DEBUGSTREAM + self.debug = smtpd.DEBUGSTREAM = io.StringIO() + self.server = DummyServer((socket_helper.HOST, 0), ('b', 0), + decode_data=True) + conn, addr = self.server.accept() + self.channel = smtpd.SMTPChannel(self.server, conn, addr, + decode_data=True) + + def tearDown(self): + asyncore.close_all() + asyncore.socket = smtpd.socket = socket + smtpd.DEBUGSTREAM = self.old_debugstream + + def write_line(self, line): + self.channel.socket.queue_recv(line) + self.channel.handle_read() + + def test_broken_connect(self): + self.assertRaises( + DummyDispatcherBroken, BrokenDummyServer, + (socket_helper.HOST, 0), ('b', 0), decode_data=True) + + def test_decode_data_and_enable_SMTPUTF8_raises(self): + self.assertRaises( + ValueError, smtpd.SMTPChannel, + self.server, self.channel.conn, self.channel.addr, + enable_SMTPUTF8=True, decode_data=True) + + def test_server_accept(self): + self.server.handle_accept() + + def test_missing_data(self): + self.write_line(b'') + self.assertEqual(self.channel.socket.last, + b'500 Error: bad syntax\r\n') + + def test_EHLO(self): + self.write_line(b'EHLO example') + self.assertEqual(self.channel.socket.last, b'250 HELP\r\n') + + def test_EHLO_bad_syntax(self): + self.write_line(b'EHLO') + self.assertEqual(self.channel.socket.last, + b'501 Syntax: EHLO hostname\r\n') + + def test_EHLO_duplicate(self): + self.write_line(b'EHLO example') + self.write_line(b'EHLO example') + self.assertEqual(self.channel.socket.last, + b'503 Duplicate HELO/EHLO\r\n') + + def test_EHLO_HELO_duplicate(self): + self.write_line(b'EHLO example') + self.write_line(b'HELO example') + self.assertEqual(self.channel.socket.last, + b'503 Duplicate HELO/EHLO\r\n') + + def test_HELO(self): + name = smtpd.socket.getfqdn() + self.write_line(b'HELO example') + self.assertEqual(self.channel.socket.last, + '250 {}\r\n'.format(name).encode('ascii')) + + def test_HELO_EHLO_duplicate(self): + self.write_line(b'HELO example') + self.write_line(b'EHLO example') + self.assertEqual(self.channel.socket.last, + b'503 Duplicate HELO/EHLO\r\n') + + def test_HELP(self): + self.write_line(b'HELP') + self.assertEqual(self.channel.socket.last, + b'250 Supported commands: EHLO HELO MAIL RCPT ' + \ + b'DATA RSET NOOP QUIT VRFY\r\n') + + def test_HELP_command(self): + self.write_line(b'HELP MAIL') + self.assertEqual(self.channel.socket.last, + b'250 Syntax: MAIL FROM: <address>\r\n') + + def test_HELP_command_unknown(self): + self.write_line(b'HELP SPAM') + self.assertEqual(self.channel.socket.last, + b'501 Supported commands: EHLO HELO MAIL RCPT ' + \ + b'DATA RSET NOOP QUIT VRFY\r\n') + + def test_HELO_bad_syntax(self): + self.write_line(b'HELO') + self.assertEqual(self.channel.socket.last, + b'501 Syntax: HELO hostname\r\n') + + def test_HELO_duplicate(self): + self.write_line(b'HELO example') + self.write_line(b'HELO example') + self.assertEqual(self.channel.socket.last, + b'503 Duplicate HELO/EHLO\r\n') + + def test_HELO_parameter_rejected_when_extensions_not_enabled(self): + self.extended_smtp = False + self.write_line(b'HELO example') + self.write_line(b'MAIL from:<foo@example.com> SIZE=1234') + self.assertEqual(self.channel.socket.last, + b'501 Syntax: MAIL FROM: <address>\r\n') + + def test_MAIL_allows_space_after_colon(self): + self.write_line(b'HELO example') + self.write_line(b'MAIL from: <foo@example.com>') + self.assertEqual(self.channel.socket.last, + b'250 OK\r\n') + + def test_extended_MAIL_allows_space_after_colon(self): + self.write_line(b'EHLO example') + self.write_line(b'MAIL from: <foo@example.com> size=20') + self.assertEqual(self.channel.socket.last, + b'250 OK\r\n') + + def test_NOOP(self): + self.write_line(b'NOOP') + self.assertEqual(self.channel.socket.last, b'250 OK\r\n') + + def test_HELO_NOOP(self): + self.write_line(b'HELO example') + self.write_line(b'NOOP') + self.assertEqual(self.channel.socket.last, b'250 OK\r\n') + + def test_NOOP_bad_syntax(self): + self.write_line(b'NOOP hi') + self.assertEqual(self.channel.socket.last, + b'501 Syntax: NOOP\r\n') + + def test_QUIT(self): + self.write_line(b'QUIT') + self.assertEqual(self.channel.socket.last, b'221 Bye\r\n') + + def test_HELO_QUIT(self): + self.write_line(b'HELO example') + self.write_line(b'QUIT') + self.assertEqual(self.channel.socket.last, b'221 Bye\r\n') + + def test_QUIT_arg_ignored(self): + self.write_line(b'QUIT bye bye') + self.assertEqual(self.channel.socket.last, b'221 Bye\r\n') + + def test_bad_state(self): + self.channel.smtp_state = 'BAD STATE' + self.write_line(b'HELO example') + self.assertEqual(self.channel.socket.last, + b'451 Internal confusion\r\n') + + def test_command_too_long(self): + self.write_line(b'HELO example') + self.write_line(b'MAIL from: ' + + b'a' * self.channel.command_size_limit + + b'@example') + self.assertEqual(self.channel.socket.last, + b'500 Error: line too long\r\n') + + def test_MAIL_command_limit_extended_with_SIZE(self): + self.write_line(b'EHLO example') + fill_len = self.channel.command_size_limit - len('MAIL from:<@example>') + self.write_line(b'MAIL from:<' + + b'a' * fill_len + + b'@example> SIZE=1234') + self.assertEqual(self.channel.socket.last, b'250 OK\r\n') + + self.write_line(b'MAIL from:<' + + b'a' * (fill_len + 26) + + b'@example> SIZE=1234') + self.assertEqual(self.channel.socket.last, + b'500 Error: line too long\r\n') + + def test_MAIL_command_rejects_SMTPUTF8_by_default(self): + self.write_line(b'EHLO example') + self.write_line( + b'MAIL from: <naive@example.com> BODY=8BITMIME SMTPUTF8') + self.assertEqual(self.channel.socket.last[0:1], b'5') + + def test_data_longer_than_default_data_size_limit(self): + # Hack the default so we don't have to generate so much data. + self.channel.data_size_limit = 1048 + self.write_line(b'HELO example') + self.write_line(b'MAIL From:eggs@example') + self.write_line(b'RCPT To:spam@example') + self.write_line(b'DATA') + self.write_line(b'A' * self.channel.data_size_limit + + b'A\r\n.') + self.assertEqual(self.channel.socket.last, + b'552 Error: Too much mail data\r\n') + + def test_MAIL_size_parameter(self): + self.write_line(b'EHLO example') + self.write_line(b'MAIL FROM:<eggs@example> SIZE=512') + self.assertEqual(self.channel.socket.last, + b'250 OK\r\n') + + def test_MAIL_invalid_size_parameter(self): + self.write_line(b'EHLO example') + self.write_line(b'MAIL FROM:<eggs@example> SIZE=invalid') + self.assertEqual(self.channel.socket.last, + b'501 Syntax: MAIL FROM: <address> [SP <mail-parameters>]\r\n') + + def test_MAIL_RCPT_unknown_parameters(self): + self.write_line(b'EHLO example') + self.write_line(b'MAIL FROM:<eggs@example> ham=green') + self.assertEqual(self.channel.socket.last, + b'555 MAIL FROM parameters not recognized or not implemented\r\n') + + self.write_line(b'MAIL FROM:<eggs@example>') + self.write_line(b'RCPT TO:<eggs@example> ham=green') + self.assertEqual(self.channel.socket.last, + b'555 RCPT TO parameters not recognized or not implemented\r\n') + + def test_MAIL_size_parameter_larger_than_default_data_size_limit(self): + self.channel.data_size_limit = 1048 + self.write_line(b'EHLO example') + self.write_line(b'MAIL FROM:<eggs@example> SIZE=2096') + self.assertEqual(self.channel.socket.last, + b'552 Error: message size exceeds fixed maximum message size\r\n') + + def test_need_MAIL(self): + self.write_line(b'HELO example') + self.write_line(b'RCPT to:spam@example') + self.assertEqual(self.channel.socket.last, + b'503 Error: need MAIL command\r\n') + + def test_MAIL_syntax_HELO(self): + self.write_line(b'HELO example') + self.write_line(b'MAIL from eggs@example') + self.assertEqual(self.channel.socket.last, + b'501 Syntax: MAIL FROM: <address>\r\n') + + def test_MAIL_syntax_EHLO(self): + self.write_line(b'EHLO example') + self.write_line(b'MAIL from eggs@example') + self.assertEqual(self.channel.socket.last, + b'501 Syntax: MAIL FROM: <address> [SP <mail-parameters>]\r\n') + + def test_MAIL_missing_address(self): + self.write_line(b'HELO example') + self.write_line(b'MAIL from:') + self.assertEqual(self.channel.socket.last, + b'501 Syntax: MAIL FROM: <address>\r\n') + + def test_MAIL_chevrons(self): + self.write_line(b'HELO example') + self.write_line(b'MAIL from:<eggs@example>') + self.assertEqual(self.channel.socket.last, b'250 OK\r\n') + + def test_MAIL_empty_chevrons(self): + self.write_line(b'EHLO example') + self.write_line(b'MAIL from:<>') + self.assertEqual(self.channel.socket.last, b'250 OK\r\n') + + def test_MAIL_quoted_localpart(self): + self.write_line(b'EHLO example') + self.write_line(b'MAIL from: <"Fred Blogs"@example.com>') + self.assertEqual(self.channel.socket.last, b'250 OK\r\n') + self.assertEqual(self.channel.mailfrom, '"Fred Blogs"@example.com') + + def test_MAIL_quoted_localpart_no_angles(self): + self.write_line(b'EHLO example') + self.write_line(b'MAIL from: "Fred Blogs"@example.com') + self.assertEqual(self.channel.socket.last, b'250 OK\r\n') + self.assertEqual(self.channel.mailfrom, '"Fred Blogs"@example.com') + + def test_MAIL_quoted_localpart_with_size(self): + self.write_line(b'EHLO example') + self.write_line(b'MAIL from: <"Fred Blogs"@example.com> SIZE=1000') + self.assertEqual(self.channel.socket.last, b'250 OK\r\n') + self.assertEqual(self.channel.mailfrom, '"Fred Blogs"@example.com') + + def test_MAIL_quoted_localpart_with_size_no_angles(self): + self.write_line(b'EHLO example') + self.write_line(b'MAIL from: "Fred Blogs"@example.com SIZE=1000') + self.assertEqual(self.channel.socket.last, b'250 OK\r\n') + self.assertEqual(self.channel.mailfrom, '"Fred Blogs"@example.com') + + def test_nested_MAIL(self): + self.write_line(b'HELO example') + self.write_line(b'MAIL from:eggs@example') + self.write_line(b'MAIL from:spam@example') + self.assertEqual(self.channel.socket.last, + b'503 Error: nested MAIL command\r\n') + + def test_VRFY(self): + self.write_line(b'VRFY eggs@example') + self.assertEqual(self.channel.socket.last, + b'252 Cannot VRFY user, but will accept message and attempt ' + \ + b'delivery\r\n') + + def test_VRFY_syntax(self): + self.write_line(b'VRFY') + self.assertEqual(self.channel.socket.last, + b'501 Syntax: VRFY <address>\r\n') + + def test_EXPN_not_implemented(self): + self.write_line(b'EXPN') + self.assertEqual(self.channel.socket.last, + b'502 EXPN not implemented\r\n') + + def test_no_HELO_MAIL(self): + self.write_line(b'MAIL from:<foo@example.com>') + self.assertEqual(self.channel.socket.last, + b'503 Error: send HELO first\r\n') + + def test_need_RCPT(self): + self.write_line(b'HELO example') + self.write_line(b'MAIL From:eggs@example') + self.write_line(b'DATA') + self.assertEqual(self.channel.socket.last, + b'503 Error: need RCPT command\r\n') + + def test_RCPT_syntax_HELO(self): + self.write_line(b'HELO example') + self.write_line(b'MAIL From: eggs@example') + self.write_line(b'RCPT to eggs@example') + self.assertEqual(self.channel.socket.last, + b'501 Syntax: RCPT TO: <address>\r\n') + + def test_RCPT_syntax_EHLO(self): + self.write_line(b'EHLO example') + self.write_line(b'MAIL From: eggs@example') + self.write_line(b'RCPT to eggs@example') + self.assertEqual(self.channel.socket.last, + b'501 Syntax: RCPT TO: <address> [SP <mail-parameters>]\r\n') + + def test_RCPT_lowercase_to_OK(self): + self.write_line(b'HELO example') + self.write_line(b'MAIL From: eggs@example') + self.write_line(b'RCPT to: <eggs@example>') + self.assertEqual(self.channel.socket.last, b'250 OK\r\n') + + def test_no_HELO_RCPT(self): + self.write_line(b'RCPT to eggs@example') + self.assertEqual(self.channel.socket.last, + b'503 Error: send HELO first\r\n') + + def test_data_dialog(self): + self.write_line(b'HELO example') + self.write_line(b'MAIL From:eggs@example') + self.assertEqual(self.channel.socket.last, b'250 OK\r\n') + self.write_line(b'RCPT To:spam@example') + self.assertEqual(self.channel.socket.last, b'250 OK\r\n') + + self.write_line(b'DATA') + self.assertEqual(self.channel.socket.last, + b'354 End data with <CR><LF>.<CR><LF>\r\n') + self.write_line(b'data\r\nmore\r\n.') + self.assertEqual(self.channel.socket.last, b'250 OK\r\n') + self.assertEqual(self.server.messages, + [(('peer-address', 'peer-port'), + 'eggs@example', + ['spam@example'], + 'data\nmore')]) + + def test_DATA_syntax(self): + self.write_line(b'HELO example') + self.write_line(b'MAIL From:eggs@example') + self.write_line(b'RCPT To:spam@example') + self.write_line(b'DATA spam') + self.assertEqual(self.channel.socket.last, b'501 Syntax: DATA\r\n') + + def test_no_HELO_DATA(self): + self.write_line(b'DATA spam') + self.assertEqual(self.channel.socket.last, + b'503 Error: send HELO first\r\n') + + def test_data_transparency_section_4_5_2(self): + self.write_line(b'HELO example') + self.write_line(b'MAIL From:eggs@example') + self.write_line(b'RCPT To:spam@example') + self.write_line(b'DATA') + self.write_line(b'..\r\n.\r\n') + self.assertEqual(self.channel.received_data, '.') + + def test_multiple_RCPT(self): + self.write_line(b'HELO example') + self.write_line(b'MAIL From:eggs@example') + self.write_line(b'RCPT To:spam@example') + self.write_line(b'RCPT To:ham@example') + self.write_line(b'DATA') + self.write_line(b'data\r\n.') + self.assertEqual(self.server.messages, + [(('peer-address', 'peer-port'), + 'eggs@example', + ['spam@example','ham@example'], + 'data')]) + + def test_manual_status(self): + # checks that the Channel is able to return a custom status message + self.write_line(b'HELO example') + self.write_line(b'MAIL From:eggs@example') + self.write_line(b'RCPT To:spam@example') + self.write_line(b'DATA') + self.write_line(b'return status\r\n.') + self.assertEqual(self.channel.socket.last, b'250 Okish\r\n') + + def test_RSET(self): + self.write_line(b'HELO example') + self.write_line(b'MAIL From:eggs@example') + self.write_line(b'RCPT To:spam@example') + self.write_line(b'RSET') + self.assertEqual(self.channel.socket.last, b'250 OK\r\n') + self.write_line(b'MAIL From:foo@example') + self.write_line(b'RCPT To:eggs@example') + self.write_line(b'DATA') + self.write_line(b'data\r\n.') + self.assertEqual(self.server.messages, + [(('peer-address', 'peer-port'), + 'foo@example', + ['eggs@example'], + 'data')]) + + def test_HELO_RSET(self): + self.write_line(b'HELO example') + self.write_line(b'RSET') + self.assertEqual(self.channel.socket.last, b'250 OK\r\n') + + def test_RSET_syntax(self): + self.write_line(b'RSET hi') + self.assertEqual(self.channel.socket.last, b'501 Syntax: RSET\r\n') + + def test_unknown_command(self): + self.write_line(b'UNKNOWN_CMD') + self.assertEqual(self.channel.socket.last, + b'500 Error: command "UNKNOWN_CMD" not ' + \ + b'recognized\r\n') + + def test_attribute_deprecations(self): + with warnings_helper.check_warnings(('', DeprecationWarning)): + spam = self.channel._SMTPChannel__server + with warnings_helper.check_warnings(('', DeprecationWarning)): + self.channel._SMTPChannel__server = 'spam' + with warnings_helper.check_warnings(('', DeprecationWarning)): + spam = self.channel._SMTPChannel__line + with warnings_helper.check_warnings(('', DeprecationWarning)): + self.channel._SMTPChannel__line = 'spam' + with warnings_helper.check_warnings(('', DeprecationWarning)): + spam = self.channel._SMTPChannel__state + with warnings_helper.check_warnings(('', DeprecationWarning)): + self.channel._SMTPChannel__state = 'spam' + with warnings_helper.check_warnings(('', DeprecationWarning)): + spam = self.channel._SMTPChannel__greeting + with warnings_helper.check_warnings(('', DeprecationWarning)): + self.channel._SMTPChannel__greeting = 'spam' + with warnings_helper.check_warnings(('', DeprecationWarning)): + spam = self.channel._SMTPChannel__mailfrom + with warnings_helper.check_warnings(('', DeprecationWarning)): + self.channel._SMTPChannel__mailfrom = 'spam' + with warnings_helper.check_warnings(('', DeprecationWarning)): + spam = self.channel._SMTPChannel__rcpttos + with warnings_helper.check_warnings(('', DeprecationWarning)): + self.channel._SMTPChannel__rcpttos = 'spam' + with warnings_helper.check_warnings(('', DeprecationWarning)): + spam = self.channel._SMTPChannel__data + with warnings_helper.check_warnings(('', DeprecationWarning)): + self.channel._SMTPChannel__data = 'spam' + with warnings_helper.check_warnings(('', DeprecationWarning)): + spam = self.channel._SMTPChannel__fqdn + with warnings_helper.check_warnings(('', DeprecationWarning)): + self.channel._SMTPChannel__fqdn = 'spam' + with warnings_helper.check_warnings(('', DeprecationWarning)): + spam = self.channel._SMTPChannel__peer + with warnings_helper.check_warnings(('', DeprecationWarning)): + self.channel._SMTPChannel__peer = 'spam' + with warnings_helper.check_warnings(('', DeprecationWarning)): + spam = self.channel._SMTPChannel__conn + with warnings_helper.check_warnings(('', DeprecationWarning)): + self.channel._SMTPChannel__conn = 'spam' + with warnings_helper.check_warnings(('', DeprecationWarning)): + spam = self.channel._SMTPChannel__addr + with warnings_helper.check_warnings(('', DeprecationWarning)): + self.channel._SMTPChannel__addr = 'spam' + +@unittest.skipUnless(socket_helper.IPV6_ENABLED, "IPv6 not enabled") +class SMTPDChannelIPv6Test(SMTPDChannelTest): + def setUp(self): + smtpd.socket = asyncore.socket = mock_socket + self.old_debugstream = smtpd.DEBUGSTREAM + self.debug = smtpd.DEBUGSTREAM = io.StringIO() + self.server = DummyServer((socket_helper.HOSTv6, 0), ('b', 0), + decode_data=True) + conn, addr = self.server.accept() + self.channel = smtpd.SMTPChannel(self.server, conn, addr, + decode_data=True) + +class SMTPDChannelWithDataSizeLimitTest(unittest.TestCase): + + def setUp(self): + smtpd.socket = asyncore.socket = mock_socket + self.old_debugstream = smtpd.DEBUGSTREAM + self.debug = smtpd.DEBUGSTREAM = io.StringIO() + self.server = DummyServer((socket_helper.HOST, 0), ('b', 0), + decode_data=True) + conn, addr = self.server.accept() + # Set DATA size limit to 32 bytes for easy testing + self.channel = smtpd.SMTPChannel(self.server, conn, addr, 32, + decode_data=True) + + def tearDown(self): + asyncore.close_all() + asyncore.socket = smtpd.socket = socket + smtpd.DEBUGSTREAM = self.old_debugstream + + def write_line(self, line): + self.channel.socket.queue_recv(line) + self.channel.handle_read() + + def test_data_limit_dialog(self): + self.write_line(b'HELO example') + self.write_line(b'MAIL From:eggs@example') + self.assertEqual(self.channel.socket.last, b'250 OK\r\n') + self.write_line(b'RCPT To:spam@example') + self.assertEqual(self.channel.socket.last, b'250 OK\r\n') + + self.write_line(b'DATA') + self.assertEqual(self.channel.socket.last, + b'354 End data with <CR><LF>.<CR><LF>\r\n') + self.write_line(b'data\r\nmore\r\n.') + self.assertEqual(self.channel.socket.last, b'250 OK\r\n') + self.assertEqual(self.server.messages, + [(('peer-address', 'peer-port'), + 'eggs@example', + ['spam@example'], + 'data\nmore')]) + + def test_data_limit_dialog_too_much_data(self): + self.write_line(b'HELO example') + self.write_line(b'MAIL From:eggs@example') + self.assertEqual(self.channel.socket.last, b'250 OK\r\n') + self.write_line(b'RCPT To:spam@example') + self.assertEqual(self.channel.socket.last, b'250 OK\r\n') + + self.write_line(b'DATA') + self.assertEqual(self.channel.socket.last, + b'354 End data with <CR><LF>.<CR><LF>\r\n') + self.write_line(b'This message is longer than 32 bytes\r\n.') + self.assertEqual(self.channel.socket.last, + b'552 Error: Too much mail data\r\n') + + +class SMTPDChannelWithDecodeDataFalse(unittest.TestCase): + + def setUp(self): + smtpd.socket = asyncore.socket = mock_socket + self.old_debugstream = smtpd.DEBUGSTREAM + self.debug = smtpd.DEBUGSTREAM = io.StringIO() + self.server = DummyServer((socket_helper.HOST, 0), ('b', 0)) + conn, addr = self.server.accept() + self.channel = smtpd.SMTPChannel(self.server, conn, addr) + + def tearDown(self): + asyncore.close_all() + asyncore.socket = smtpd.socket = socket + smtpd.DEBUGSTREAM = self.old_debugstream + + def write_line(self, line): + self.channel.socket.queue_recv(line) + self.channel.handle_read() + + def test_ascii_data(self): + self.write_line(b'HELO example') + self.write_line(b'MAIL From:eggs@example') + self.write_line(b'RCPT To:spam@example') + self.write_line(b'DATA') + self.write_line(b'plain ascii text') + self.write_line(b'.') + self.assertEqual(self.channel.received_data, b'plain ascii text') + + def test_utf8_data(self): + self.write_line(b'HELO example') + self.write_line(b'MAIL From:eggs@example') + self.write_line(b'RCPT To:spam@example') + self.write_line(b'DATA') + self.write_line(b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87') + self.write_line(b'and some plain ascii') + self.write_line(b'.') + self.assertEqual( + self.channel.received_data, + b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87\n' + b'and some plain ascii') + + +class SMTPDChannelWithDecodeDataTrue(unittest.TestCase): + + def setUp(self): + smtpd.socket = asyncore.socket = mock_socket + self.old_debugstream = smtpd.DEBUGSTREAM + self.debug = smtpd.DEBUGSTREAM = io.StringIO() + self.server = DummyServer((socket_helper.HOST, 0), ('b', 0), + decode_data=True) + conn, addr = self.server.accept() + # Set decode_data to True + self.channel = smtpd.SMTPChannel(self.server, conn, addr, + decode_data=True) + + def tearDown(self): + asyncore.close_all() + asyncore.socket = smtpd.socket = socket + smtpd.DEBUGSTREAM = self.old_debugstream + + def write_line(self, line): + self.channel.socket.queue_recv(line) + self.channel.handle_read() + + def test_ascii_data(self): + self.write_line(b'HELO example') + self.write_line(b'MAIL From:eggs@example') + self.write_line(b'RCPT To:spam@example') + self.write_line(b'DATA') + self.write_line(b'plain ascii text') + self.write_line(b'.') + self.assertEqual(self.channel.received_data, 'plain ascii text') + + def test_utf8_data(self): + self.write_line(b'HELO example') + self.write_line(b'MAIL From:eggs@example') + self.write_line(b'RCPT To:spam@example') + self.write_line(b'DATA') + self.write_line(b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87') + self.write_line(b'and some plain ascii') + self.write_line(b'.') + self.assertEqual( + self.channel.received_data, + 'utf8 enriched text: żźć\nand some plain ascii') + + +class SMTPDChannelTestWithEnableSMTPUTF8True(unittest.TestCase): + def setUp(self): + smtpd.socket = asyncore.socket = mock_socket + self.old_debugstream = smtpd.DEBUGSTREAM + self.debug = smtpd.DEBUGSTREAM = io.StringIO() + self.server = DummyServer((socket_helper.HOST, 0), ('b', 0), + enable_SMTPUTF8=True) + conn, addr = self.server.accept() + self.channel = smtpd.SMTPChannel(self.server, conn, addr, + enable_SMTPUTF8=True) + + def tearDown(self): + asyncore.close_all() + asyncore.socket = smtpd.socket = socket + smtpd.DEBUGSTREAM = self.old_debugstream + + def write_line(self, line): + self.channel.socket.queue_recv(line) + self.channel.handle_read() + + def test_MAIL_command_accepts_SMTPUTF8_when_announced(self): + self.write_line(b'EHLO example') + self.write_line( + 'MAIL from: <naïve@example.com> BODY=8BITMIME SMTPUTF8'.encode( + 'utf-8') + ) + self.assertEqual(self.channel.socket.last, b'250 OK\r\n') + + def test_process_smtputf8_message(self): + self.write_line(b'EHLO example') + for mail_parameters in [b'', b'BODY=8BITMIME SMTPUTF8']: + self.write_line(b'MAIL from: <a@example> ' + mail_parameters) + self.assertEqual(self.channel.socket.last[0:3], b'250') + self.write_line(b'rcpt to:<b@example.com>') + self.assertEqual(self.channel.socket.last[0:3], b'250') + self.write_line(b'data') + self.assertEqual(self.channel.socket.last[0:3], b'354') + self.write_line(b'c\r\n.') + if mail_parameters == b'': + self.assertEqual(self.channel.socket.last, b'250 OK\r\n') + else: + self.assertEqual(self.channel.socket.last, + b'250 SMTPUTF8 message okish\r\n') + + def test_utf8_data(self): + self.write_line(b'EHLO example') + self.write_line( + 'MAIL From: naïve@examplé BODY=8BITMIME SMTPUTF8'.encode('utf-8')) + self.assertEqual(self.channel.socket.last[0:3], b'250') + self.write_line('RCPT To:späm@examplé'.encode('utf-8')) + self.assertEqual(self.channel.socket.last[0:3], b'250') + self.write_line(b'DATA') + self.assertEqual(self.channel.socket.last[0:3], b'354') + self.write_line(b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87') + self.write_line(b'.') + self.assertEqual( + self.channel.received_data, + b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87') + + def test_MAIL_command_limit_extended_with_SIZE_and_SMTPUTF8(self): + self.write_line(b'ehlo example') + fill_len = (512 + 26 + 10) - len('mail from:<@example>') + self.write_line(b'MAIL from:<' + + b'a' * (fill_len + 1) + + b'@example>') + self.assertEqual(self.channel.socket.last, + b'500 Error: line too long\r\n') + self.write_line(b'MAIL from:<' + + b'a' * fill_len + + b'@example>') + self.assertEqual(self.channel.socket.last, b'250 OK\r\n') + + def test_multiple_emails_with_extended_command_length(self): + self.write_line(b'ehlo example') + fill_len = (512 + 26 + 10) - len('mail from:<@example>') + for char in [b'a', b'b', b'c']: + self.write_line(b'MAIL from:<' + char * fill_len + b'a@example>') + self.assertEqual(self.channel.socket.last[0:3], b'500') + self.write_line(b'MAIL from:<' + char * fill_len + b'@example>') + self.assertEqual(self.channel.socket.last[0:3], b'250') + self.write_line(b'rcpt to:<hans@example.com>') + self.assertEqual(self.channel.socket.last[0:3], b'250') + self.write_line(b'data') + self.assertEqual(self.channel.socket.last[0:3], b'354') + self.write_line(b'test\r\n.') + self.assertEqual(self.channel.socket.last[0:3], b'250') + + +class MiscTestCase(unittest.TestCase): + def test__all__(self): + not_exported = { + "program", "Devnull", "DEBUGSTREAM", "NEWLINE", "COMMASPACE", + "DATA_SIZE_DEFAULT", "usage", "Options", "parseargs", + } + support.check__all__(self, smtpd, not_exported=not_exported) + + +if __name__ == "__main__": + unittest.main() diff --git a/Lib/test/test_smtplib.py b/Lib/test/test_smtplib.py index bdce3e3..9761a37 100644 --- a/Lib/test/test_smtplib.py +++ b/Lib/test/test_smtplib.py @@ -18,13 +18,17 @@ import threading import unittest from test import support, mock_socket -from test.support import _asyncore as asyncore -from test.support import _smtpd as smtpd from test.support import hashlib_helper from test.support import socket_helper from test.support import threading_helper from unittest.mock import Mock +import warnings +with warnings.catch_warnings(): + warnings.simplefilter('ignore', DeprecationWarning) + import asyncore + import smtpd + HOST = socket_helper.HOST if sys.platform == 'darwin': diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py index 88eeb07..981e2fe 100644 --- a/Lib/test/test_ssl.py +++ b/Lib/test/test_ssl.py @@ -4,7 +4,6 @@ import sys import unittest import unittest.mock from test import support -from test.support import _asyncore as asyncore from test.support import import_helper from test.support import os_helper from test.support import socket_helper @@ -31,6 +30,10 @@ try: except ImportError: ctypes = None +import warnings +with warnings.catch_warnings(): + warnings.simplefilter('ignore', DeprecationWarning) + import asyncore ssl = import_helper.import_module("ssl") import _ssl |