summaryrefslogtreecommitdiffstats
path: root/Lib
diff options
context:
space:
mode:
authorVictor Stinner <vstinner@python.org>2021-12-07 11:31:04 (GMT)
committerGitHub <noreply@github.com>2021-12-07 11:31:04 (GMT)
commitcf7eaa4617295747ee5646c4e2b7e7a16d7c64ab (patch)
treeb09b7bcc7b2cdfcae0b8291f6ec867dbb8337246 /Lib
parent2bf551757e0a7e3cc6ce2ebed2178b82438ac6b5 (diff)
downloadcpython-cf7eaa4617295747ee5646c4e2b7e7a16d7c64ab.zip
cpython-cf7eaa4617295747ee5646c4e2b7e7a16d7c64ab.tar.gz
cpython-cf7eaa4617295747ee5646c4e2b7e7a16d7c64ab.tar.bz2
Revert "bpo-28533: Remove asyncore, asynchat, smtpd modules (GH-29521)" (GH-29951)
This reverts commit 9bf2cbc4c498812e14f20d86acb61c53928a5a57.
Diffstat (limited to 'Lib')
-rw-r--r--Lib/asynchat.py (renamed from Lib/test/support/_asynchat.py)10
-rw-r--r--Lib/asyncore.py (renamed from Lib/test/support/_asyncore.py)6
-rwxr-xr-xLib/smtpd.py (renamed from Lib/test/support/_smtpd.py)137
-rw-r--r--Lib/test/libregrtest/save_env.py12
-rw-r--r--Lib/test/mock_socket.py4
-rw-r--r--Lib/test/test_asynchat.py292
-rw-r--r--Lib/test/test_asyncore.py841
-rw-r--r--Lib/test/test_ftplib.py10
-rw-r--r--Lib/test/test_logging.py7
-rw-r--r--Lib/test/test_os.py7
-rw-r--r--Lib/test/test_poplib.py8
-rw-r--r--Lib/test/test_smtpd.py1018
-rw-r--r--Lib/test/test_smtplib.py8
-rw-r--r--Lib/test/test_ssl.py5
14 files changed, 2346 insertions, 19 deletions
diff --git a/Lib/test/support/_asynchat.py b/Lib/asynchat.py
index 941cc1d..de26ffa 100644
--- a/Lib/test/support/_asynchat.py
+++ b/Lib/asynchat.py
@@ -45,9 +45,17 @@ command will be accumulated (using your own 'collect_incoming_data'
method) up to the terminator, and then control will be returned to
you - by calling your self.found_terminator() method.
"""
-from test.support import _asyncore as asyncore
+import asyncore
from collections import deque
+from warnings import warn
+warn(
+ 'The asynchat module is deprecated. '
+ 'The recommended replacement is asyncio',
+ DeprecationWarning,
+ stacklevel=2)
+
+
class async_chat(asyncore.dispatcher):
"""This is an abstract class. You must derive from this class, and add
diff --git a/Lib/test/support/_asyncore.py b/Lib/asyncore.py
index 7863efa..b1eea4b 100644
--- a/Lib/test/support/_asyncore.py
+++ b/Lib/asyncore.py
@@ -57,6 +57,12 @@ from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, EINVAL, \
ENOTCONN, ESHUTDOWN, EISCONN, EBADF, ECONNABORTED, EPIPE, EAGAIN, \
errorcode
+warnings.warn(
+ 'The asyncore module is deprecated. '
+ 'The recommended replacement is asyncio',
+ DeprecationWarning,
+ stacklevel=2)
+
_DISCONNECTED = frozenset({ECONNRESET, ENOTCONN, ESHUTDOWN, ECONNABORTED, EPIPE,
EBADF})
diff --git a/Lib/test/support/_smtpd.py b/Lib/smtpd.py
index 0e37d08..1cd004f 100755
--- a/Lib/test/support/_smtpd.py
+++ b/Lib/smtpd.py
@@ -80,13 +80,22 @@ import collections
from warnings import warn
from email._header_value_parser import get_addr_spec, get_angle_addr
-from test.support import _asyncore as asyncore
-from test.support import _asynchat as asynchat
-
__all__ = [
"SMTPChannel", "SMTPServer", "DebuggingServer", "PureProxy",
]
+warn(
+ 'The smtpd module is deprecated and unmaintained. Please see aiosmtpd '
+ '(https://aiosmtpd.readthedocs.io/) for the recommended replacement.',
+ DeprecationWarning,
+ stacklevel=2)
+
+
+# These are imported after the above warning so that users get the correct
+# deprecation warning.
+import asyncore
+import asynchat
+
program = sys.argv[0]
__version__ = 'Python SMTP proxy version 0.3'
@@ -179,6 +188,128 @@ class SMTPChannel(asynchat.async_chat):
self.received_lines = []
+ # properties for backwards-compatibility
+ @property
+ def __server(self):
+ warn("Access to __server attribute on SMTPChannel is deprecated, "
+ "use 'smtp_server' instead", DeprecationWarning, 2)
+ return self.smtp_server
+ @__server.setter
+ def __server(self, value):
+ warn("Setting __server attribute on SMTPChannel is deprecated, "
+ "set 'smtp_server' instead", DeprecationWarning, 2)
+ self.smtp_server = value
+
+ @property
+ def __line(self):
+ warn("Access to __line attribute on SMTPChannel is deprecated, "
+ "use 'received_lines' instead", DeprecationWarning, 2)
+ return self.received_lines
+ @__line.setter
+ def __line(self, value):
+ warn("Setting __line attribute on SMTPChannel is deprecated, "
+ "set 'received_lines' instead", DeprecationWarning, 2)
+ self.received_lines = value
+
+ @property
+ def __state(self):
+ warn("Access to __state attribute on SMTPChannel is deprecated, "
+ "use 'smtp_state' instead", DeprecationWarning, 2)
+ return self.smtp_state
+ @__state.setter
+ def __state(self, value):
+ warn("Setting __state attribute on SMTPChannel is deprecated, "
+ "set 'smtp_state' instead", DeprecationWarning, 2)
+ self.smtp_state = value
+
+ @property
+ def __greeting(self):
+ warn("Access to __greeting attribute on SMTPChannel is deprecated, "
+ "use 'seen_greeting' instead", DeprecationWarning, 2)
+ return self.seen_greeting
+ @__greeting.setter
+ def __greeting(self, value):
+ warn("Setting __greeting attribute on SMTPChannel is deprecated, "
+ "set 'seen_greeting' instead", DeprecationWarning, 2)
+ self.seen_greeting = value
+
+ @property
+ def __mailfrom(self):
+ warn("Access to __mailfrom attribute on SMTPChannel is deprecated, "
+ "use 'mailfrom' instead", DeprecationWarning, 2)
+ return self.mailfrom
+ @__mailfrom.setter
+ def __mailfrom(self, value):
+ warn("Setting __mailfrom attribute on SMTPChannel is deprecated, "
+ "set 'mailfrom' instead", DeprecationWarning, 2)
+ self.mailfrom = value
+
+ @property
+ def __rcpttos(self):
+ warn("Access to __rcpttos attribute on SMTPChannel is deprecated, "
+ "use 'rcpttos' instead", DeprecationWarning, 2)
+ return self.rcpttos
+ @__rcpttos.setter
+ def __rcpttos(self, value):
+ warn("Setting __rcpttos attribute on SMTPChannel is deprecated, "
+ "set 'rcpttos' instead", DeprecationWarning, 2)
+ self.rcpttos = value
+
+ @property
+ def __data(self):
+ warn("Access to __data attribute on SMTPChannel is deprecated, "
+ "use 'received_data' instead", DeprecationWarning, 2)
+ return self.received_data
+ @__data.setter
+ def __data(self, value):
+ warn("Setting __data attribute on SMTPChannel is deprecated, "
+ "set 'received_data' instead", DeprecationWarning, 2)
+ self.received_data = value
+
+ @property
+ def __fqdn(self):
+ warn("Access to __fqdn attribute on SMTPChannel is deprecated, "
+ "use 'fqdn' instead", DeprecationWarning, 2)
+ return self.fqdn
+ @__fqdn.setter
+ def __fqdn(self, value):
+ warn("Setting __fqdn attribute on SMTPChannel is deprecated, "
+ "set 'fqdn' instead", DeprecationWarning, 2)
+ self.fqdn = value
+
+ @property
+ def __peer(self):
+ warn("Access to __peer attribute on SMTPChannel is deprecated, "
+ "use 'peer' instead", DeprecationWarning, 2)
+ return self.peer
+ @__peer.setter
+ def __peer(self, value):
+ warn("Setting __peer attribute on SMTPChannel is deprecated, "
+ "set 'peer' instead", DeprecationWarning, 2)
+ self.peer = value
+
+ @property
+ def __conn(self):
+ warn("Access to __conn attribute on SMTPChannel is deprecated, "
+ "use 'conn' instead", DeprecationWarning, 2)
+ return self.conn
+ @__conn.setter
+ def __conn(self, value):
+ warn("Setting __conn attribute on SMTPChannel is deprecated, "
+ "set 'conn' instead", DeprecationWarning, 2)
+ self.conn = value
+
+ @property
+ def __addr(self):
+ warn("Access to __addr attribute on SMTPChannel is deprecated, "
+ "use 'addr' instead", DeprecationWarning, 2)
+ return self.addr
+ @__addr.setter
+ def __addr(self, value):
+ warn("Setting __addr attribute on SMTPChannel is deprecated, "
+ "set 'addr' instead", DeprecationWarning, 2)
+ self.addr = value
+
# Overrides base class for convenience.
def push(self, msg):
asynchat.async_chat.push(self, bytes(
diff --git a/Lib/test/libregrtest/save_env.py b/Lib/test/libregrtest/save_env.py
index 17dda99..60c9be2 100644
--- a/Lib/test/libregrtest/save_env.py
+++ b/Lib/test/libregrtest/save_env.py
@@ -52,7 +52,7 @@ class saved_test_environment:
resources = ('sys.argv', 'cwd', 'sys.stdin', 'sys.stdout', 'sys.stderr',
'os.environ', 'sys.path', 'sys.path_hooks', '__import__',
- 'warnings.filters',
+ 'warnings.filters', 'asyncore.socket_map',
'logging._handlers', 'logging._handlerList', 'sys.gettrace',
'sys.warnoptions',
# multiprocessing.process._cleanup() may release ref
@@ -160,6 +160,16 @@ class saved_test_environment:
warnings.filters = saved_filters[1]
warnings.filters[:] = saved_filters[2]
+ def get_asyncore_socket_map(self):
+ asyncore = sys.modules.get('asyncore')
+ # XXX Making a copy keeps objects alive until __exit__ gets called.
+ return asyncore and asyncore.socket_map.copy() or {}
+ def restore_asyncore_socket_map(self, saved_map):
+ asyncore = sys.modules.get('asyncore')
+ if asyncore is not None:
+ asyncore.close_all(ignore_all=True)
+ asyncore.socket_map.update(saved_map)
+
def get_shutil_archive_formats(self):
shutil = self.try_get_module('shutil')
# we could call get_archives_formats() but that only returns the
diff --git a/Lib/test/mock_socket.py b/Lib/test/mock_socket.py
index 9788d58..c7abddc 100644
--- a/Lib/test/mock_socket.py
+++ b/Lib/test/mock_socket.py
@@ -1,4 +1,4 @@
-"""Mock socket module used by the smtplib tests.
+"""Mock socket module used by the smtpd and smtplib tests.
"""
# imported for _GLOBAL_DEFAULT_TIMEOUT
@@ -33,7 +33,7 @@ class MockFile:
class MockSocket:
- """Mock socket object used by smtplib tests.
+ """Mock socket object used by smtpd and smtplib tests.
"""
def __init__(self, family=None):
global _reply_data
diff --git a/Lib/test/test_asynchat.py b/Lib/test/test_asynchat.py
new file mode 100644
index 0000000..973ac1f
--- /dev/null
+++ b/Lib/test/test_asynchat.py
@@ -0,0 +1,292 @@
+# test asynchat
+
+from test import support
+from test.support import socket_helper
+from test.support import threading_helper
+
+import errno
+import socket
+import sys
+import threading
+import time
+import unittest
+import unittest.mock
+
+import warnings
+with warnings.catch_warnings():
+ warnings.simplefilter('ignore', DeprecationWarning)
+ import asynchat
+ import asyncore
+
+HOST = socket_helper.HOST
+SERVER_QUIT = b'QUIT\n'
+
+
+class echo_server(threading.Thread):
+ # parameter to determine the number of bytes passed back to the
+ # client each send
+ chunk_size = 1
+
+ def __init__(self, event):
+ threading.Thread.__init__(self)
+ self.event = event
+ self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.port = socket_helper.bind_port(self.sock)
+ # This will be set if the client wants us to wait before echoing
+ # data back.
+ self.start_resend_event = None
+
+ def run(self):
+ self.sock.listen()
+ self.event.set()
+ conn, client = self.sock.accept()
+ self.buffer = b""
+ # collect data until quit message is seen
+ while SERVER_QUIT not in self.buffer:
+ data = conn.recv(1)
+ if not data:
+ break
+ self.buffer = self.buffer + data
+
+ # remove the SERVER_QUIT message
+ self.buffer = self.buffer.replace(SERVER_QUIT, b'')
+
+ if self.start_resend_event:
+ self.start_resend_event.wait()
+
+ # re-send entire set of collected data
+ try:
+ # this may fail on some tests, such as test_close_when_done,
+ # since the client closes the channel when it's done sending
+ while self.buffer:
+ n = conn.send(self.buffer[:self.chunk_size])
+ time.sleep(0.001)
+ self.buffer = self.buffer[n:]
+ except:
+ pass
+
+ conn.close()
+ self.sock.close()
+
+class echo_client(asynchat.async_chat):
+
+ def __init__(self, terminator, server_port):
+ asynchat.async_chat.__init__(self)
+ self.contents = []
+ self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.connect((HOST, server_port))
+ self.set_terminator(terminator)
+ self.buffer = b""
+
+ def handle_connect(self):
+ pass
+
+ if sys.platform == 'darwin':
+ # select.poll returns a select.POLLHUP at the end of the tests
+ # on darwin, so just ignore it
+ def handle_expt(self):
+ pass
+
+ def collect_incoming_data(self, data):
+ self.buffer += data
+
+ def found_terminator(self):
+ self.contents.append(self.buffer)
+ self.buffer = b""
+
+def start_echo_server():
+ event = threading.Event()
+ s = echo_server(event)
+ s.start()
+ event.wait()
+ event.clear()
+ time.sleep(0.01) # Give server time to start accepting.
+ return s, event
+
+
+class TestAsynchat(unittest.TestCase):
+ usepoll = False
+
+ def setUp(self):
+ self._threads = threading_helper.threading_setup()
+
+ def tearDown(self):
+ threading_helper.threading_cleanup(*self._threads)
+
+ def line_terminator_check(self, term, server_chunk):
+ event = threading.Event()
+ s = echo_server(event)
+ s.chunk_size = server_chunk
+ s.start()
+ event.wait()
+ event.clear()
+ time.sleep(0.01) # Give server time to start accepting.
+ c = echo_client(term, s.port)
+ c.push(b"hello ")
+ c.push(b"world" + term)
+ c.push(b"I'm not dead yet!" + term)
+ c.push(SERVER_QUIT)
+ asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
+ threading_helper.join_thread(s)
+
+ self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"])
+
+ # the line terminator tests below check receiving variously-sized
+ # chunks back from the server in order to exercise all branches of
+ # async_chat.handle_read
+
+ def test_line_terminator1(self):
+ # test one-character terminator
+ for l in (1, 2, 3):
+ self.line_terminator_check(b'\n', l)
+
+ def test_line_terminator2(self):
+ # test two-character terminator
+ for l in (1, 2, 3):
+ self.line_terminator_check(b'\r\n', l)
+
+ def test_line_terminator3(self):
+ # test three-character terminator
+ for l in (1, 2, 3):
+ self.line_terminator_check(b'qqq', l)
+
+ def numeric_terminator_check(self, termlen):
+ # Try reading a fixed number of bytes
+ s, event = start_echo_server()
+ c = echo_client(termlen, s.port)
+ data = b"hello world, I'm not dead yet!\n"
+ c.push(data)
+ c.push(SERVER_QUIT)
+ asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
+ threading_helper.join_thread(s)
+
+ self.assertEqual(c.contents, [data[:termlen]])
+
+ def test_numeric_terminator1(self):
+ # check that ints & longs both work (since type is
+ # explicitly checked in async_chat.handle_read)
+ self.numeric_terminator_check(1)
+
+ def test_numeric_terminator2(self):
+ self.numeric_terminator_check(6)
+
+ def test_none_terminator(self):
+ # Try reading a fixed number of bytes
+ s, event = start_echo_server()
+ c = echo_client(None, s.port)
+ data = b"hello world, I'm not dead yet!\n"
+ c.push(data)
+ c.push(SERVER_QUIT)
+ asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
+ threading_helper.join_thread(s)
+
+ self.assertEqual(c.contents, [])
+ self.assertEqual(c.buffer, data)
+
+ def test_simple_producer(self):
+ s, event = start_echo_server()
+ c = echo_client(b'\n', s.port)
+ data = b"hello world\nI'm not dead yet!\n"
+ p = asynchat.simple_producer(data+SERVER_QUIT, buffer_size=8)
+ c.push_with_producer(p)
+ asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
+ threading_helper.join_thread(s)
+
+ self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"])
+
+ def test_string_producer(self):
+ s, event = start_echo_server()
+ c = echo_client(b'\n', s.port)
+ data = b"hello world\nI'm not dead yet!\n"
+ c.push_with_producer(data+SERVER_QUIT)
+ asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
+ threading_helper.join_thread(s)
+
+ self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"])
+
+ def test_empty_line(self):
+ # checks that empty lines are handled correctly
+ s, event = start_echo_server()
+ c = echo_client(b'\n', s.port)
+ c.push(b"hello world\n\nI'm not dead yet!\n")
+ c.push(SERVER_QUIT)
+ asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
+ threading_helper.join_thread(s)
+
+ self.assertEqual(c.contents,
+ [b"hello world", b"", b"I'm not dead yet!"])
+
+ def test_close_when_done(self):
+ s, event = start_echo_server()
+ s.start_resend_event = threading.Event()
+ c = echo_client(b'\n', s.port)
+ c.push(b"hello world\nI'm not dead yet!\n")
+ c.push(SERVER_QUIT)
+ c.close_when_done()
+ asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
+
+ # Only allow the server to start echoing data back to the client after
+ # the client has closed its connection. This prevents a race condition
+ # where the server echoes all of its data before we can check that it
+ # got any down below.
+ s.start_resend_event.set()
+ threading_helper.join_thread(s)
+
+ self.assertEqual(c.contents, [])
+ # the server might have been able to send a byte or two back, but this
+ # at least checks that it received something and didn't just fail
+ # (which could still result in the client not having received anything)
+ self.assertGreater(len(s.buffer), 0)
+
+ def test_push(self):
+ # Issue #12523: push() should raise a TypeError if it doesn't get
+ # a bytes string
+ s, event = start_echo_server()
+ c = echo_client(b'\n', s.port)
+ data = b'bytes\n'
+ c.push(data)
+ c.push(bytearray(data))
+ c.push(memoryview(data))
+ self.assertRaises(TypeError, c.push, 10)
+ self.assertRaises(TypeError, c.push, 'unicode')
+ c.push(SERVER_QUIT)
+ asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
+ threading_helper.join_thread(s)
+ self.assertEqual(c.contents, [b'bytes', b'bytes', b'bytes'])
+
+
+class TestAsynchat_WithPoll(TestAsynchat):
+ usepoll = True
+
+
+class TestAsynchatMocked(unittest.TestCase):
+ def test_blockingioerror(self):
+ # Issue #16133: handle_read() must ignore BlockingIOError
+ sock = unittest.mock.Mock()
+ sock.recv.side_effect = BlockingIOError(errno.EAGAIN)
+
+ dispatcher = asynchat.async_chat()
+ dispatcher.set_socket(sock)
+ self.addCleanup(dispatcher.del_channel)
+
+ with unittest.mock.patch.object(dispatcher, 'handle_error') as error:
+ dispatcher.handle_read()
+ self.assertFalse(error.called)
+
+
+class TestHelperFunctions(unittest.TestCase):
+ def test_find_prefix_at_end(self):
+ self.assertEqual(asynchat.find_prefix_at_end("qwerty\r", "\r\n"), 1)
+ self.assertEqual(asynchat.find_prefix_at_end("qwertydkjf", "\r\n"), 0)
+
+
+class TestNotConnected(unittest.TestCase):
+ def test_disallow_negative_terminator(self):
+ # Issue #11259
+ client = asynchat.async_chat()
+ self.assertRaises(ValueError, client.set_terminator, -1)
+
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/Lib/test/test_asyncore.py b/Lib/test/test_asyncore.py
new file mode 100644
index 0000000..ecd1e12
--- /dev/null
+++ b/Lib/test/test_asyncore.py
@@ -0,0 +1,841 @@
+import unittest
+import select
+import os
+import socket
+import sys
+import time
+import errno
+import struct
+import threading
+
+from test import support
+from test.support import os_helper
+from test.support import socket_helper
+from test.support import threading_helper
+from test.support import warnings_helper
+from io import BytesIO
+
+if support.PGO:
+ raise unittest.SkipTest("test is not helpful for PGO")
+
+import warnings
+with warnings.catch_warnings():
+ warnings.simplefilter('ignore', DeprecationWarning)
+ import asyncore
+
+
+HAS_UNIX_SOCKETS = hasattr(socket, 'AF_UNIX')
+
+class dummysocket:
+ def __init__(self):
+ self.closed = False
+
+ def close(self):
+ self.closed = True
+
+ def fileno(self):
+ return 42
+
+class dummychannel:
+ def __init__(self):
+ self.socket = dummysocket()
+
+ def close(self):
+ self.socket.close()
+
+class exitingdummy:
+ def __init__(self):
+ pass
+
+ def handle_read_event(self):
+ raise asyncore.ExitNow()
+
+ handle_write_event = handle_read_event
+ handle_close = handle_read_event
+ handle_expt_event = handle_read_event
+
+class crashingdummy:
+ def __init__(self):
+ self.error_handled = False
+
+ def handle_read_event(self):
+ raise Exception()
+
+ handle_write_event = handle_read_event
+ handle_close = handle_read_event
+ handle_expt_event = handle_read_event
+
+ def handle_error(self):
+ self.error_handled = True
+
+# used when testing senders; just collects what it gets until newline is sent
+def capture_server(evt, buf, serv):
+ try:
+ serv.listen()
+ conn, addr = serv.accept()
+ except TimeoutError:
+ pass
+ else:
+ n = 200
+ start = time.monotonic()
+ while n > 0 and time.monotonic() - start < 3.0:
+ r, w, e = select.select([conn], [], [], 0.1)
+ if r:
+ n -= 1
+ data = conn.recv(10)
+ # keep everything except for the newline terminator
+ buf.write(data.replace(b'\n', b''))
+ if b'\n' in data:
+ break
+ time.sleep(0.01)
+
+ conn.close()
+ finally:
+ serv.close()
+ evt.set()
+
+def bind_af_aware(sock, addr):
+ """Helper function to bind a socket according to its family."""
+ if HAS_UNIX_SOCKETS and sock.family == socket.AF_UNIX:
+ # Make sure the path doesn't exist.
+ os_helper.unlink(addr)
+ socket_helper.bind_unix_socket(sock, addr)
+ else:
+ sock.bind(addr)
+
+
+class HelperFunctionTests(unittest.TestCase):
+ def test_readwriteexc(self):
+ # Check exception handling behavior of read, write and _exception
+
+ # check that ExitNow exceptions in the object handler method
+ # bubbles all the way up through asyncore read/write/_exception calls
+ tr1 = exitingdummy()
+ self.assertRaises(asyncore.ExitNow, asyncore.read, tr1)
+ self.assertRaises(asyncore.ExitNow, asyncore.write, tr1)
+ self.assertRaises(asyncore.ExitNow, asyncore._exception, tr1)
+
+ # check that an exception other than ExitNow in the object handler
+ # method causes the handle_error method to get called
+ tr2 = crashingdummy()
+ asyncore.read(tr2)
+ self.assertEqual(tr2.error_handled, True)
+
+ tr2 = crashingdummy()
+ asyncore.write(tr2)
+ self.assertEqual(tr2.error_handled, True)
+
+ tr2 = crashingdummy()
+ asyncore._exception(tr2)
+ self.assertEqual(tr2.error_handled, True)
+
+ # asyncore.readwrite uses constants in the select module that
+ # are not present in Windows systems (see this thread:
+ # http://mail.python.org/pipermail/python-list/2001-October/109973.html)
+ # These constants should be present as long as poll is available
+
+ @unittest.skipUnless(hasattr(select, 'poll'), 'select.poll required')
+ def test_readwrite(self):
+ # Check that correct methods are called by readwrite()
+
+ attributes = ('read', 'expt', 'write', 'closed', 'error_handled')
+
+ expected = (
+ (select.POLLIN, 'read'),
+ (select.POLLPRI, 'expt'),
+ (select.POLLOUT, 'write'),
+ (select.POLLERR, 'closed'),
+ (select.POLLHUP, 'closed'),
+ (select.POLLNVAL, 'closed'),
+ )
+
+ class testobj:
+ def __init__(self):
+ self.read = False
+ self.write = False
+ self.closed = False
+ self.expt = False
+ self.error_handled = False
+
+ def handle_read_event(self):
+ self.read = True
+
+ def handle_write_event(self):
+ self.write = True
+
+ def handle_close(self):
+ self.closed = True
+
+ def handle_expt_event(self):
+ self.expt = True
+
+ def handle_error(self):
+ self.error_handled = True
+
+ for flag, expectedattr in expected:
+ tobj = testobj()
+ self.assertEqual(getattr(tobj, expectedattr), False)
+ asyncore.readwrite(tobj, flag)
+
+ # Only the attribute modified by the routine we expect to be
+ # called should be True.
+ for attr in attributes:
+ self.assertEqual(getattr(tobj, attr), attr==expectedattr)
+
+ # check that ExitNow exceptions in the object handler method
+ # bubbles all the way up through asyncore readwrite call
+ tr1 = exitingdummy()
+ self.assertRaises(asyncore.ExitNow, asyncore.readwrite, tr1, flag)
+
+ # check that an exception other than ExitNow in the object handler
+ # method causes the handle_error method to get called
+ tr2 = crashingdummy()
+ self.assertEqual(tr2.error_handled, False)
+ asyncore.readwrite(tr2, flag)
+ self.assertEqual(tr2.error_handled, True)
+
+ def test_closeall(self):
+ self.closeall_check(False)
+
+ def test_closeall_default(self):
+ self.closeall_check(True)
+
+ def closeall_check(self, usedefault):
+ # Check that close_all() closes everything in a given map
+
+ l = []
+ testmap = {}
+ for i in range(10):
+ c = dummychannel()
+ l.append(c)
+ self.assertEqual(c.socket.closed, False)
+ testmap[i] = c
+
+ if usedefault:
+ socketmap = asyncore.socket_map
+ try:
+ asyncore.socket_map = testmap
+ asyncore.close_all()
+ finally:
+ testmap, asyncore.socket_map = asyncore.socket_map, socketmap
+ else:
+ asyncore.close_all(testmap)
+
+ self.assertEqual(len(testmap), 0)
+
+ for c in l:
+ self.assertEqual(c.socket.closed, True)
+
+ def test_compact_traceback(self):
+ try:
+ raise Exception("I don't like spam!")
+ except:
+ real_t, real_v, real_tb = sys.exc_info()
+ r = asyncore.compact_traceback()
+ else:
+ self.fail("Expected exception")
+
+ (f, function, line), t, v, info = r
+ self.assertEqual(os.path.split(f)[-1], 'test_asyncore.py')
+ self.assertEqual(function, 'test_compact_traceback')
+ self.assertEqual(t, real_t)
+ self.assertEqual(v, real_v)
+ self.assertEqual(info, '[%s|%s|%s]' % (f, function, line))
+
+
+class DispatcherTests(unittest.TestCase):
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ asyncore.close_all()
+
+ def test_basic(self):
+ d = asyncore.dispatcher()
+ self.assertEqual(d.readable(), True)
+ self.assertEqual(d.writable(), True)
+
+ def test_repr(self):
+ d = asyncore.dispatcher()
+ self.assertEqual(repr(d), '<asyncore.dispatcher at %#x>' % id(d))
+
+ def test_log(self):
+ d = asyncore.dispatcher()
+
+ # capture output of dispatcher.log() (to stderr)
+ l1 = "Lovely spam! Wonderful spam!"
+ l2 = "I don't like spam!"
+ with support.captured_stderr() as stderr:
+ d.log(l1)
+ d.log(l2)
+
+ lines = stderr.getvalue().splitlines()
+ self.assertEqual(lines, ['log: %s' % l1, 'log: %s' % l2])
+
+ def test_log_info(self):
+ d = asyncore.dispatcher()
+
+ # capture output of dispatcher.log_info() (to stdout via print)
+ l1 = "Have you got anything without spam?"
+ l2 = "Why can't she have egg bacon spam and sausage?"
+ l3 = "THAT'S got spam in it!"
+ with support.captured_stdout() as stdout:
+ d.log_info(l1, 'EGGS')
+ d.log_info(l2)
+ d.log_info(l3, 'SPAM')
+
+ lines = stdout.getvalue().splitlines()
+ expected = ['EGGS: %s' % l1, 'info: %s' % l2, 'SPAM: %s' % l3]
+ self.assertEqual(lines, expected)
+
+ def test_unhandled(self):
+ d = asyncore.dispatcher()
+ d.ignore_log_types = ()
+
+ # capture output of dispatcher.log_info() (to stdout via print)
+ with support.captured_stdout() as stdout:
+ d.handle_expt()
+ d.handle_read()
+ d.handle_write()
+ d.handle_connect()
+
+ lines = stdout.getvalue().splitlines()
+ expected = ['warning: unhandled incoming priority event',
+ 'warning: unhandled read event',
+ 'warning: unhandled write event',
+ 'warning: unhandled connect event']
+ self.assertEqual(lines, expected)
+
+ def test_strerror(self):
+ # refers to bug #8573
+ err = asyncore._strerror(errno.EPERM)
+ if hasattr(os, 'strerror'):
+ self.assertEqual(err, os.strerror(errno.EPERM))
+ err = asyncore._strerror(-1)
+ self.assertTrue(err != "")
+
+
+class dispatcherwithsend_noread(asyncore.dispatcher_with_send):
+ def readable(self):
+ return False
+
+ def handle_connect(self):
+ pass
+
+
+class DispatcherWithSendTests(unittest.TestCase):
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ asyncore.close_all()
+
+ @threading_helper.reap_threads
+ def test_send(self):
+ evt = threading.Event()
+ sock = socket.socket()
+ sock.settimeout(3)
+ port = socket_helper.bind_port(sock)
+
+ cap = BytesIO()
+ args = (evt, cap, sock)
+ t = threading.Thread(target=capture_server, args=args)
+ t.start()
+ try:
+ # wait a little longer for the server to initialize (it sometimes
+ # refuses connections on slow machines without this wait)
+ time.sleep(0.2)
+
+ data = b"Suppose there isn't a 16-ton weight?"
+ d = dispatcherwithsend_noread()
+ d.create_socket()
+ d.connect((socket_helper.HOST, port))
+
+ # give time for socket to connect
+ time.sleep(0.1)
+
+ d.send(data)
+ d.send(data)
+ d.send(b'\n')
+
+ n = 1000
+ while d.out_buffer and n > 0:
+ asyncore.poll()
+ n -= 1
+
+ evt.wait()
+
+ self.assertEqual(cap.getvalue(), data*2)
+ finally:
+ threading_helper.join_thread(t)
+
+
+@unittest.skipUnless(hasattr(asyncore, 'file_wrapper'),
+ 'asyncore.file_wrapper required')
+class FileWrapperTest(unittest.TestCase):
+ def setUp(self):
+ self.d = b"It's not dead, it's sleeping!"
+ with open(os_helper.TESTFN, 'wb') as file:
+ file.write(self.d)
+
+ def tearDown(self):
+ os_helper.unlink(os_helper.TESTFN)
+
+ def test_recv(self):
+ fd = os.open(os_helper.TESTFN, os.O_RDONLY)
+ w = asyncore.file_wrapper(fd)
+ os.close(fd)
+
+ self.assertNotEqual(w.fd, fd)
+ self.assertNotEqual(w.fileno(), fd)
+ self.assertEqual(w.recv(13), b"It's not dead")
+ self.assertEqual(w.read(6), b", it's")
+ w.close()
+ self.assertRaises(OSError, w.read, 1)
+
+ def test_send(self):
+ d1 = b"Come again?"
+ d2 = b"I want to buy some cheese."
+ fd = os.open(os_helper.TESTFN, os.O_WRONLY | os.O_APPEND)
+ w = asyncore.file_wrapper(fd)
+ os.close(fd)
+
+ w.write(d1)
+ w.send(d2)
+ w.close()
+ with open(os_helper.TESTFN, 'rb') as file:
+ self.assertEqual(file.read(), self.d + d1 + d2)
+
+ @unittest.skipUnless(hasattr(asyncore, 'file_dispatcher'),
+ 'asyncore.file_dispatcher required')
+ def test_dispatcher(self):
+ fd = os.open(os_helper.TESTFN, os.O_RDONLY)
+ data = []
+ class FileDispatcher(asyncore.file_dispatcher):
+ def handle_read(self):
+ data.append(self.recv(29))
+ s = FileDispatcher(fd)
+ os.close(fd)
+ asyncore.loop(timeout=0.01, use_poll=True, count=2)
+ self.assertEqual(b"".join(data), self.d)
+
+ def test_resource_warning(self):
+ # Issue #11453
+ fd = os.open(os_helper.TESTFN, os.O_RDONLY)
+ f = asyncore.file_wrapper(fd)
+
+ os.close(fd)
+ with warnings_helper.check_warnings(('', ResourceWarning)):
+ f = None
+ support.gc_collect()
+
+ def test_close_twice(self):
+ fd = os.open(os_helper.TESTFN, os.O_RDONLY)
+ f = asyncore.file_wrapper(fd)
+ os.close(fd)
+
+ os.close(f.fd) # file_wrapper dupped fd
+ with self.assertRaises(OSError):
+ f.close()
+
+ self.assertEqual(f.fd, -1)
+ # calling close twice should not fail
+ f.close()
+
+
+class BaseTestHandler(asyncore.dispatcher):
+
+ def __init__(self, sock=None):
+ asyncore.dispatcher.__init__(self, sock)
+ self.flag = False
+
+ def handle_accept(self):
+ raise Exception("handle_accept not supposed to be called")
+
+ def handle_accepted(self):
+ raise Exception("handle_accepted not supposed to be called")
+
+ def handle_connect(self):
+ raise Exception("handle_connect not supposed to be called")
+
+ def handle_expt(self):
+ raise Exception("handle_expt not supposed to be called")
+
+ def handle_close(self):
+ raise Exception("handle_close not supposed to be called")
+
+ def handle_error(self):
+ raise
+
+
+class BaseServer(asyncore.dispatcher):
+ """A server which listens on an address and dispatches the
+ connection to a handler.
+ """
+
+ def __init__(self, family, addr, handler=BaseTestHandler):
+ asyncore.dispatcher.__init__(self)
+ self.create_socket(family)
+ self.set_reuse_addr()
+ bind_af_aware(self.socket, addr)
+ self.listen(5)
+ self.handler = handler
+
+ @property
+ def address(self):
+ return self.socket.getsockname()
+
+ def handle_accepted(self, sock, addr):
+ self.handler(sock)
+
+ def handle_error(self):
+ raise
+
+
+class BaseClient(BaseTestHandler):
+
+ def __init__(self, family, address):
+ BaseTestHandler.__init__(self)
+ self.create_socket(family)
+ self.connect(address)
+
+ def handle_connect(self):
+ pass
+
+
+class BaseTestAPI:
+
+ def tearDown(self):
+ asyncore.close_all(ignore_all=True)
+
+ def loop_waiting_for_flag(self, instance, timeout=5):
+ timeout = float(timeout) / 100
+ count = 100
+ while asyncore.socket_map and count > 0:
+ asyncore.loop(timeout=0.01, count=1, use_poll=self.use_poll)
+ if instance.flag:
+ return
+ count -= 1
+ time.sleep(timeout)
+ self.fail("flag not set")
+
+ def test_handle_connect(self):
+ # make sure handle_connect is called on connect()
+
+ class TestClient(BaseClient):
+ def handle_connect(self):
+ self.flag = True
+
+ server = BaseServer(self.family, self.addr)
+ client = TestClient(self.family, server.address)
+ self.loop_waiting_for_flag(client)
+
+ def test_handle_accept(self):
+ # make sure handle_accept() is called when a client connects
+
+ class TestListener(BaseTestHandler):
+
+ def __init__(self, family, addr):
+ BaseTestHandler.__init__(self)
+ self.create_socket(family)
+ bind_af_aware(self.socket, addr)
+ self.listen(5)
+ self.address = self.socket.getsockname()
+
+ def handle_accept(self):
+ self.flag = True
+
+ server = TestListener(self.family, self.addr)
+ client = BaseClient(self.family, server.address)
+ self.loop_waiting_for_flag(server)
+
+ def test_handle_accepted(self):
+ # make sure handle_accepted() is called when a client connects
+
+ class TestListener(BaseTestHandler):
+
+ def __init__(self, family, addr):
+ BaseTestHandler.__init__(self)
+ self.create_socket(family)
+ bind_af_aware(self.socket, addr)
+ self.listen(5)
+ self.address = self.socket.getsockname()
+
+ def handle_accept(self):
+ asyncore.dispatcher.handle_accept(self)
+
+ def handle_accepted(self, sock, addr):
+ sock.close()
+ self.flag = True
+
+ server = TestListener(self.family, self.addr)
+ client = BaseClient(self.family, server.address)
+ self.loop_waiting_for_flag(server)
+
+
+ def test_handle_read(self):
+ # make sure handle_read is called on data received
+
+ class TestClient(BaseClient):
+ def handle_read(self):
+ self.flag = True
+
+ class TestHandler(BaseTestHandler):
+ def __init__(self, conn):
+ BaseTestHandler.__init__(self, conn)
+ self.send(b'x' * 1024)
+
+ server = BaseServer(self.family, self.addr, TestHandler)
+ client = TestClient(self.family, server.address)
+ self.loop_waiting_for_flag(client)
+
+ def test_handle_write(self):
+ # make sure handle_write is called
+
+ class TestClient(BaseClient):
+ def handle_write(self):
+ self.flag = True
+
+ server = BaseServer(self.family, self.addr)
+ client = TestClient(self.family, server.address)
+ self.loop_waiting_for_flag(client)
+
+ def test_handle_close(self):
+ # make sure handle_close is called when the other end closes
+ # the connection
+
+ class TestClient(BaseClient):
+
+ def handle_read(self):
+ # in order to make handle_close be called we are supposed
+ # to make at least one recv() call
+ self.recv(1024)
+
+ def handle_close(self):
+ self.flag = True
+ self.close()
+
+ class TestHandler(BaseTestHandler):
+ def __init__(self, conn):
+ BaseTestHandler.__init__(self, conn)
+ self.close()
+
+ server = BaseServer(self.family, self.addr, TestHandler)
+ client = TestClient(self.family, server.address)
+ self.loop_waiting_for_flag(client)
+
+ def test_handle_close_after_conn_broken(self):
+ # Check that ECONNRESET/EPIPE is correctly handled (issues #5661 and
+ # #11265).
+
+ data = b'\0' * 128
+
+ class TestClient(BaseClient):
+
+ def handle_write(self):
+ self.send(data)
+
+ def handle_close(self):
+ self.flag = True
+ self.close()
+
+ def handle_expt(self):
+ self.flag = True
+ self.close()
+
+ class TestHandler(BaseTestHandler):
+
+ def handle_read(self):
+ self.recv(len(data))
+ self.close()
+
+ def writable(self):
+ return False
+
+ server = BaseServer(self.family, self.addr, TestHandler)
+ client = TestClient(self.family, server.address)
+ self.loop_waiting_for_flag(client)
+
+ @unittest.skipIf(sys.platform.startswith("sunos"),
+ "OOB support is broken on Solaris")
+ def test_handle_expt(self):
+ # Make sure handle_expt is called on OOB data received.
+ # Note: this might fail on some platforms as OOB data is
+ # tenuously supported and rarely used.
+ if HAS_UNIX_SOCKETS and self.family == socket.AF_UNIX:
+ self.skipTest("Not applicable to AF_UNIX sockets.")
+
+ if sys.platform == "darwin" and self.use_poll:
+ self.skipTest("poll may fail on macOS; see issue #28087")
+
+ class TestClient(BaseClient):
+ def handle_expt(self):
+ self.socket.recv(1024, socket.MSG_OOB)
+ self.flag = True
+
+ class TestHandler(BaseTestHandler):
+ def __init__(self, conn):
+ BaseTestHandler.__init__(self, conn)
+ self.socket.send(bytes(chr(244), 'latin-1'), socket.MSG_OOB)
+
+ server = BaseServer(self.family, self.addr, TestHandler)
+ client = TestClient(self.family, server.address)
+ self.loop_waiting_for_flag(client)
+
+ def test_handle_error(self):
+
+ class TestClient(BaseClient):
+ def handle_write(self):
+ 1.0 / 0
+ def handle_error(self):
+ self.flag = True
+ try:
+ raise
+ except ZeroDivisionError:
+ pass
+ else:
+ raise Exception("exception not raised")
+
+ server = BaseServer(self.family, self.addr)
+ client = TestClient(self.family, server.address)
+ self.loop_waiting_for_flag(client)
+
+ def test_connection_attributes(self):
+ server = BaseServer(self.family, self.addr)
+ client = BaseClient(self.family, server.address)
+
+ # we start disconnected
+ self.assertFalse(server.connected)
+ self.assertTrue(server.accepting)
+ # this can't be taken for granted across all platforms
+ #self.assertFalse(client.connected)
+ self.assertFalse(client.accepting)
+
+ # execute some loops so that client connects to server
+ asyncore.loop(timeout=0.01, use_poll=self.use_poll, count=100)
+ self.assertFalse(server.connected)
+ self.assertTrue(server.accepting)
+ self.assertTrue(client.connected)
+ self.assertFalse(client.accepting)
+
+ # disconnect the client
+ client.close()
+ self.assertFalse(server.connected)
+ self.assertTrue(server.accepting)
+ self.assertFalse(client.connected)
+ self.assertFalse(client.accepting)
+
+ # stop serving
+ server.close()
+ self.assertFalse(server.connected)
+ self.assertFalse(server.accepting)
+
+ def test_create_socket(self):
+ s = asyncore.dispatcher()
+ s.create_socket(self.family)
+ self.assertEqual(s.socket.type, socket.SOCK_STREAM)
+ self.assertEqual(s.socket.family, self.family)
+ self.assertEqual(s.socket.gettimeout(), 0)
+ self.assertFalse(s.socket.get_inheritable())
+
+ def test_bind(self):
+ if HAS_UNIX_SOCKETS and self.family == socket.AF_UNIX:
+ self.skipTest("Not applicable to AF_UNIX sockets.")
+ s1 = asyncore.dispatcher()
+ s1.create_socket(self.family)
+ s1.bind(self.addr)
+ s1.listen(5)
+ port = s1.socket.getsockname()[1]
+
+ s2 = asyncore.dispatcher()
+ s2.create_socket(self.family)
+ # EADDRINUSE indicates the socket was correctly bound
+ self.assertRaises(OSError, s2.bind, (self.addr[0], port))
+
+ def test_set_reuse_addr(self):
+ if HAS_UNIX_SOCKETS and self.family == socket.AF_UNIX:
+ self.skipTest("Not applicable to AF_UNIX sockets.")
+
+ with socket.socket(self.family) as sock:
+ try:
+ sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ except OSError:
+ unittest.skip("SO_REUSEADDR not supported on this platform")
+ else:
+ # if SO_REUSEADDR succeeded for sock we expect asyncore
+ # to do the same
+ s = asyncore.dispatcher(socket.socket(self.family))
+ self.assertFalse(s.socket.getsockopt(socket.SOL_SOCKET,
+ socket.SO_REUSEADDR))
+ s.socket.close()
+ s.create_socket(self.family)
+ s.set_reuse_addr()
+ self.assertTrue(s.socket.getsockopt(socket.SOL_SOCKET,
+ socket.SO_REUSEADDR))
+
+ @threading_helper.reap_threads
+ def test_quick_connect(self):
+ # see: http://bugs.python.org/issue10340
+ if self.family not in (socket.AF_INET, getattr(socket, "AF_INET6", object())):
+ self.skipTest("test specific to AF_INET and AF_INET6")
+
+ server = BaseServer(self.family, self.addr)
+ # run the thread 500 ms: the socket should be connected in 200 ms
+ t = threading.Thread(target=lambda: asyncore.loop(timeout=0.1,
+ count=5))
+ t.start()
+ try:
+ with socket.socket(self.family, socket.SOCK_STREAM) as s:
+ s.settimeout(.2)
+ s.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
+ struct.pack('ii', 1, 0))
+
+ try:
+ s.connect(server.address)
+ except OSError:
+ pass
+ finally:
+ threading_helper.join_thread(t)
+
+class TestAPI_UseIPv4Sockets(BaseTestAPI):
+ family = socket.AF_INET
+ addr = (socket_helper.HOST, 0)
+
+@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 support required')
+class TestAPI_UseIPv6Sockets(BaseTestAPI):
+ family = socket.AF_INET6
+ addr = (socket_helper.HOSTv6, 0)
+
+@unittest.skipUnless(HAS_UNIX_SOCKETS, 'Unix sockets required')
+class TestAPI_UseUnixSockets(BaseTestAPI):
+ if HAS_UNIX_SOCKETS:
+ family = socket.AF_UNIX
+ addr = os_helper.TESTFN
+
+ def tearDown(self):
+ os_helper.unlink(self.addr)
+ BaseTestAPI.tearDown(self)
+
+class TestAPI_UseIPv4Select(TestAPI_UseIPv4Sockets, unittest.TestCase):
+ use_poll = False
+
+@unittest.skipUnless(hasattr(select, 'poll'), 'select.poll required')
+class TestAPI_UseIPv4Poll(TestAPI_UseIPv4Sockets, unittest.TestCase):
+ use_poll = True
+
+class TestAPI_UseIPv6Select(TestAPI_UseIPv6Sockets, unittest.TestCase):
+ use_poll = False
+
+@unittest.skipUnless(hasattr(select, 'poll'), 'select.poll required')
+class TestAPI_UseIPv6Poll(TestAPI_UseIPv6Sockets, unittest.TestCase):
+ use_poll = True
+
+class TestAPI_UseUnixSocketsSelect(TestAPI_UseUnixSockets, unittest.TestCase):
+ use_poll = False
+
+@unittest.skipUnless(hasattr(select, 'poll'), 'select.poll required')
+class TestAPI_UseUnixSocketsPoll(TestAPI_UseUnixSockets, unittest.TestCase):
+ use_poll = True
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/Lib/test/test_ftplib.py b/Lib/test/test_ftplib.py
index c9edfac..56e3d8a 100644
--- a/Lib/test/test_ftplib.py
+++ b/Lib/test/test_ftplib.py
@@ -18,13 +18,17 @@ except ImportError:
from unittest import TestCase, skipUnless
from test import support
-from test.support import _asynchat as asynchat
-from test.support import _asyncore as asyncore
-from test.support import socket_helper
from test.support import threading_helper
+from test.support import socket_helper
from test.support import warnings_helper
from test.support.socket_helper import HOST, HOSTv6
+import warnings
+with warnings.catch_warnings():
+ warnings.simplefilter('ignore', DeprecationWarning)
+ import asyncore
+ import asynchat
+
TIMEOUT = support.LOOPBACK_TIMEOUT
DEFAULT_ENCODING = 'utf-8'
diff --git a/Lib/test/test_logging.py b/Lib/test/test_logging.py
index e709ea3..85b6e5f 100644
--- a/Lib/test/test_logging.py
+++ b/Lib/test/test_logging.py
@@ -42,8 +42,6 @@ import sys
import tempfile
from test.support.script_helper import assert_python_ok, assert_python_failure
from test import support
-from test.support import _asyncore as asyncore
-from test.support import _smtpd as smtpd
from test.support import os_helper
from test.support import socket_helper
from test.support import threading_helper
@@ -61,6 +59,11 @@ from urllib.parse import urlparse, parse_qs
from socketserver import (ThreadingUDPServer, DatagramRequestHandler,
ThreadingTCPServer, StreamRequestHandler)
+with warnings.catch_warnings():
+ warnings.simplefilter('ignore', DeprecationWarning)
+ import asyncore
+ import smtpd
+
try:
import win32evtlog, win32evtlogutil, pywintypes
except ImportError:
diff --git a/Lib/test/test_os.py b/Lib/test/test_os.py
index 5e15340..8da0aa3 100644
--- a/Lib/test/test_os.py
+++ b/Lib/test/test_os.py
@@ -30,8 +30,6 @@ import unittest
import uuid
import warnings
from test import support
-from test.support import _asynchat as asynchat
-from test.support import _asyncore as asyncore
from test.support import import_helper
from test.support import os_helper
from test.support import socket_helper
@@ -39,6 +37,11 @@ from test.support import threading_helper
from test.support import warnings_helper
from platform import win32_is_iot
+with warnings.catch_warnings():
+ warnings.simplefilter('ignore', DeprecationWarning)
+ import asynchat
+ import asyncore
+
try:
import resource
except ImportError:
diff --git a/Lib/test/test_poplib.py b/Lib/test/test_poplib.py
index 23c6be3..44cf523 100644
--- a/Lib/test/test_poplib.py
+++ b/Lib/test/test_poplib.py
@@ -12,12 +12,16 @@ import threading
import unittest
from unittest import TestCase, skipUnless
from test import support as test_support
-from test.support import _asynchat as asynchat
-from test.support import _asyncore as asyncore
from test.support import hashlib_helper
from test.support import socket_helper
from test.support import threading_helper
+import warnings
+with warnings.catch_warnings():
+ warnings.simplefilter('ignore', DeprecationWarning)
+ import asynchat
+ import asyncore
+
HOST = socket_helper.HOST
PORT = 0
diff --git a/Lib/test/test_smtpd.py b/Lib/test/test_smtpd.py
new file mode 100644
index 0000000..d2e150d
--- /dev/null
+++ b/Lib/test/test_smtpd.py
@@ -0,0 +1,1018 @@
+import unittest
+import textwrap
+from test import support, mock_socket
+from test.support import socket_helper
+from test.support import warnings_helper
+import socket
+import io
+
+import warnings
+with warnings.catch_warnings():
+ warnings.simplefilter('ignore', DeprecationWarning)
+ import smtpd
+ import asyncore
+
+
+class DummyServer(smtpd.SMTPServer):
+ def __init__(self, *args, **kwargs):
+ smtpd.SMTPServer.__init__(self, *args, **kwargs)
+ self.messages = []
+ if self._decode_data:
+ self.return_status = 'return status'
+ else:
+ self.return_status = b'return status'
+
+ def process_message(self, peer, mailfrom, rcpttos, data, **kw):
+ self.messages.append((peer, mailfrom, rcpttos, data))
+ if data == self.return_status:
+ return '250 Okish'
+ if 'mail_options' in kw and 'SMTPUTF8' in kw['mail_options']:
+ return '250 SMTPUTF8 message okish'
+
+
+class DummyDispatcherBroken(Exception):
+ pass
+
+
+class BrokenDummyServer(DummyServer):
+ def listen(self, num):
+ raise DummyDispatcherBroken()
+
+
+class SMTPDServerTest(unittest.TestCase):
+ def setUp(self):
+ smtpd.socket = asyncore.socket = mock_socket
+
+ def test_process_message_unimplemented(self):
+ server = smtpd.SMTPServer((socket_helper.HOST, 0), ('b', 0),
+ decode_data=True)
+ conn, addr = server.accept()
+ channel = smtpd.SMTPChannel(server, conn, addr, decode_data=True)
+
+ def write_line(line):
+ channel.socket.queue_recv(line)
+ channel.handle_read()
+
+ write_line(b'HELO example')
+ write_line(b'MAIL From:eggs@example')
+ write_line(b'RCPT To:spam@example')
+ write_line(b'DATA')
+ self.assertRaises(NotImplementedError, write_line, b'spam\r\n.\r\n')
+
+ def test_decode_data_and_enable_SMTPUTF8_raises(self):
+ self.assertRaises(
+ ValueError,
+ smtpd.SMTPServer,
+ (socket_helper.HOST, 0),
+ ('b', 0),
+ enable_SMTPUTF8=True,
+ decode_data=True)
+
+ def tearDown(self):
+ asyncore.close_all()
+ asyncore.socket = smtpd.socket = socket
+
+
+class DebuggingServerTest(unittest.TestCase):
+
+ def setUp(self):
+ smtpd.socket = asyncore.socket = mock_socket
+
+ def send_data(self, channel, data, enable_SMTPUTF8=False):
+ def write_line(line):
+ channel.socket.queue_recv(line)
+ channel.handle_read()
+ write_line(b'EHLO example')
+ if enable_SMTPUTF8:
+ write_line(b'MAIL From:eggs@example BODY=8BITMIME SMTPUTF8')
+ else:
+ write_line(b'MAIL From:eggs@example')
+ write_line(b'RCPT To:spam@example')
+ write_line(b'DATA')
+ write_line(data)
+ write_line(b'.')
+
+ def test_process_message_with_decode_data_true(self):
+ server = smtpd.DebuggingServer((socket_helper.HOST, 0), ('b', 0),
+ decode_data=True)
+ conn, addr = server.accept()
+ channel = smtpd.SMTPChannel(server, conn, addr, decode_data=True)
+ with support.captured_stdout() as s:
+ self.send_data(channel, b'From: test\n\nhello\n')
+ stdout = s.getvalue()
+ self.assertEqual(stdout, textwrap.dedent("""\
+ ---------- MESSAGE FOLLOWS ----------
+ From: test
+ X-Peer: peer-address
+
+ hello
+ ------------ END MESSAGE ------------
+ """))
+
+ def test_process_message_with_decode_data_false(self):
+ server = smtpd.DebuggingServer((socket_helper.HOST, 0), ('b', 0))
+ conn, addr = server.accept()
+ channel = smtpd.SMTPChannel(server, conn, addr)
+ with support.captured_stdout() as s:
+ self.send_data(channel, b'From: test\n\nh\xc3\xa9llo\xff\n')
+ stdout = s.getvalue()
+ self.assertEqual(stdout, textwrap.dedent("""\
+ ---------- MESSAGE FOLLOWS ----------
+ b'From: test'
+ b'X-Peer: peer-address'
+ b''
+ b'h\\xc3\\xa9llo\\xff'
+ ------------ END MESSAGE ------------
+ """))
+
+ def test_process_message_with_enable_SMTPUTF8_true(self):
+ server = smtpd.DebuggingServer((socket_helper.HOST, 0), ('b', 0),
+ enable_SMTPUTF8=True)
+ conn, addr = server.accept()
+ channel = smtpd.SMTPChannel(server, conn, addr, enable_SMTPUTF8=True)
+ with support.captured_stdout() as s:
+ self.send_data(channel, b'From: test\n\nh\xc3\xa9llo\xff\n')
+ stdout = s.getvalue()
+ self.assertEqual(stdout, textwrap.dedent("""\
+ ---------- MESSAGE FOLLOWS ----------
+ b'From: test'
+ b'X-Peer: peer-address'
+ b''
+ b'h\\xc3\\xa9llo\\xff'
+ ------------ END MESSAGE ------------
+ """))
+
+ def test_process_SMTPUTF8_message_with_enable_SMTPUTF8_true(self):
+ server = smtpd.DebuggingServer((socket_helper.HOST, 0), ('b', 0),
+ enable_SMTPUTF8=True)
+ conn, addr = server.accept()
+ channel = smtpd.SMTPChannel(server, conn, addr, enable_SMTPUTF8=True)
+ with support.captured_stdout() as s:
+ self.send_data(channel, b'From: test\n\nh\xc3\xa9llo\xff\n',
+ enable_SMTPUTF8=True)
+ stdout = s.getvalue()
+ self.assertEqual(stdout, textwrap.dedent("""\
+ ---------- MESSAGE FOLLOWS ----------
+ mail options: ['BODY=8BITMIME', 'SMTPUTF8']
+ b'From: test'
+ b'X-Peer: peer-address'
+ b''
+ b'h\\xc3\\xa9llo\\xff'
+ ------------ END MESSAGE ------------
+ """))
+
+ def tearDown(self):
+ asyncore.close_all()
+ asyncore.socket = smtpd.socket = socket
+
+
+class TestFamilyDetection(unittest.TestCase):
+ def setUp(self):
+ smtpd.socket = asyncore.socket = mock_socket
+
+ def tearDown(self):
+ asyncore.close_all()
+ asyncore.socket = smtpd.socket = socket
+
+ @unittest.skipUnless(socket_helper.IPV6_ENABLED, "IPv6 not enabled")
+ def test_socket_uses_IPv6(self):
+ server = smtpd.SMTPServer((socket_helper.HOSTv6, 0), (socket_helper.HOSTv4, 0))
+ self.assertEqual(server.socket.family, socket.AF_INET6)
+
+ def test_socket_uses_IPv4(self):
+ server = smtpd.SMTPServer((socket_helper.HOSTv4, 0), (socket_helper.HOSTv6, 0))
+ self.assertEqual(server.socket.family, socket.AF_INET)
+
+
+class TestRcptOptionParsing(unittest.TestCase):
+ error_response = (b'555 RCPT TO parameters not recognized or not '
+ b'implemented\r\n')
+
+ def setUp(self):
+ smtpd.socket = asyncore.socket = mock_socket
+ self.old_debugstream = smtpd.DEBUGSTREAM
+ self.debug = smtpd.DEBUGSTREAM = io.StringIO()
+
+ def tearDown(self):
+ asyncore.close_all()
+ asyncore.socket = smtpd.socket = socket
+ smtpd.DEBUGSTREAM = self.old_debugstream
+
+ def write_line(self, channel, line):
+ channel.socket.queue_recv(line)
+ channel.handle_read()
+
+ def test_params_rejected(self):
+ server = DummyServer((socket_helper.HOST, 0), ('b', 0))
+ conn, addr = server.accept()
+ channel = smtpd.SMTPChannel(server, conn, addr)
+ self.write_line(channel, b'EHLO example')
+ self.write_line(channel, b'MAIL from: <foo@example.com> size=20')
+ self.write_line(channel, b'RCPT to: <foo@example.com> foo=bar')
+ self.assertEqual(channel.socket.last, self.error_response)
+
+ def test_nothing_accepted(self):
+ server = DummyServer((socket_helper.HOST, 0), ('b', 0))
+ conn, addr = server.accept()
+ channel = smtpd.SMTPChannel(server, conn, addr)
+ self.write_line(channel, b'EHLO example')
+ self.write_line(channel, b'MAIL from: <foo@example.com> size=20')
+ self.write_line(channel, b'RCPT to: <foo@example.com>')
+ self.assertEqual(channel.socket.last, b'250 OK\r\n')
+
+
+class TestMailOptionParsing(unittest.TestCase):
+ error_response = (b'555 MAIL FROM parameters not recognized or not '
+ b'implemented\r\n')
+
+ def setUp(self):
+ smtpd.socket = asyncore.socket = mock_socket
+ self.old_debugstream = smtpd.DEBUGSTREAM
+ self.debug = smtpd.DEBUGSTREAM = io.StringIO()
+
+ def tearDown(self):
+ asyncore.close_all()
+ asyncore.socket = smtpd.socket = socket
+ smtpd.DEBUGSTREAM = self.old_debugstream
+
+ def write_line(self, channel, line):
+ channel.socket.queue_recv(line)
+ channel.handle_read()
+
+ def test_with_decode_data_true(self):
+ server = DummyServer((socket_helper.HOST, 0), ('b', 0), decode_data=True)
+ conn, addr = server.accept()
+ channel = smtpd.SMTPChannel(server, conn, addr, decode_data=True)
+ self.write_line(channel, b'EHLO example')
+ for line in [
+ b'MAIL from: <foo@example.com> size=20 SMTPUTF8',
+ b'MAIL from: <foo@example.com> size=20 SMTPUTF8 BODY=8BITMIME',
+ b'MAIL from: <foo@example.com> size=20 BODY=UNKNOWN',
+ b'MAIL from: <foo@example.com> size=20 body=8bitmime',
+ ]:
+ self.write_line(channel, line)
+ self.assertEqual(channel.socket.last, self.error_response)
+ self.write_line(channel, b'MAIL from: <foo@example.com> size=20')
+ self.assertEqual(channel.socket.last, b'250 OK\r\n')
+
+ def test_with_decode_data_false(self):
+ server = DummyServer((socket_helper.HOST, 0), ('b', 0))
+ conn, addr = server.accept()
+ channel = smtpd.SMTPChannel(server, conn, addr)
+ self.write_line(channel, b'EHLO example')
+ for line in [
+ b'MAIL from: <foo@example.com> size=20 SMTPUTF8',
+ b'MAIL from: <foo@example.com> size=20 SMTPUTF8 BODY=8BITMIME',
+ ]:
+ self.write_line(channel, line)
+ self.assertEqual(channel.socket.last, self.error_response)
+ self.write_line(
+ channel,
+ b'MAIL from: <foo@example.com> size=20 SMTPUTF8 BODY=UNKNOWN')
+ self.assertEqual(
+ channel.socket.last,
+ b'501 Error: BODY can only be one of 7BIT, 8BITMIME\r\n')
+ self.write_line(
+ channel, b'MAIL from: <foo@example.com> size=20 body=8bitmime')
+ self.assertEqual(channel.socket.last, b'250 OK\r\n')
+
+ def test_with_enable_smtputf8_true(self):
+ server = DummyServer((socket_helper.HOST, 0), ('b', 0), enable_SMTPUTF8=True)
+ conn, addr = server.accept()
+ channel = smtpd.SMTPChannel(server, conn, addr, enable_SMTPUTF8=True)
+ self.write_line(channel, b'EHLO example')
+ self.write_line(
+ channel,
+ b'MAIL from: <foo@example.com> size=20 body=8bitmime smtputf8')
+ self.assertEqual(channel.socket.last, b'250 OK\r\n')
+
+
+class SMTPDChannelTest(unittest.TestCase):
+ def setUp(self):
+ smtpd.socket = asyncore.socket = mock_socket
+ self.old_debugstream = smtpd.DEBUGSTREAM
+ self.debug = smtpd.DEBUGSTREAM = io.StringIO()
+ self.server = DummyServer((socket_helper.HOST, 0), ('b', 0),
+ decode_data=True)
+ conn, addr = self.server.accept()
+ self.channel = smtpd.SMTPChannel(self.server, conn, addr,
+ decode_data=True)
+
+ def tearDown(self):
+ asyncore.close_all()
+ asyncore.socket = smtpd.socket = socket
+ smtpd.DEBUGSTREAM = self.old_debugstream
+
+ def write_line(self, line):
+ self.channel.socket.queue_recv(line)
+ self.channel.handle_read()
+
+ def test_broken_connect(self):
+ self.assertRaises(
+ DummyDispatcherBroken, BrokenDummyServer,
+ (socket_helper.HOST, 0), ('b', 0), decode_data=True)
+
+ def test_decode_data_and_enable_SMTPUTF8_raises(self):
+ self.assertRaises(
+ ValueError, smtpd.SMTPChannel,
+ self.server, self.channel.conn, self.channel.addr,
+ enable_SMTPUTF8=True, decode_data=True)
+
+ def test_server_accept(self):
+ self.server.handle_accept()
+
+ def test_missing_data(self):
+ self.write_line(b'')
+ self.assertEqual(self.channel.socket.last,
+ b'500 Error: bad syntax\r\n')
+
+ def test_EHLO(self):
+ self.write_line(b'EHLO example')
+ self.assertEqual(self.channel.socket.last, b'250 HELP\r\n')
+
+ def test_EHLO_bad_syntax(self):
+ self.write_line(b'EHLO')
+ self.assertEqual(self.channel.socket.last,
+ b'501 Syntax: EHLO hostname\r\n')
+
+ def test_EHLO_duplicate(self):
+ self.write_line(b'EHLO example')
+ self.write_line(b'EHLO example')
+ self.assertEqual(self.channel.socket.last,
+ b'503 Duplicate HELO/EHLO\r\n')
+
+ def test_EHLO_HELO_duplicate(self):
+ self.write_line(b'EHLO example')
+ self.write_line(b'HELO example')
+ self.assertEqual(self.channel.socket.last,
+ b'503 Duplicate HELO/EHLO\r\n')
+
+ def test_HELO(self):
+ name = smtpd.socket.getfqdn()
+ self.write_line(b'HELO example')
+ self.assertEqual(self.channel.socket.last,
+ '250 {}\r\n'.format(name).encode('ascii'))
+
+ def test_HELO_EHLO_duplicate(self):
+ self.write_line(b'HELO example')
+ self.write_line(b'EHLO example')
+ self.assertEqual(self.channel.socket.last,
+ b'503 Duplicate HELO/EHLO\r\n')
+
+ def test_HELP(self):
+ self.write_line(b'HELP')
+ self.assertEqual(self.channel.socket.last,
+ b'250 Supported commands: EHLO HELO MAIL RCPT ' + \
+ b'DATA RSET NOOP QUIT VRFY\r\n')
+
+ def test_HELP_command(self):
+ self.write_line(b'HELP MAIL')
+ self.assertEqual(self.channel.socket.last,
+ b'250 Syntax: MAIL FROM: <address>\r\n')
+
+ def test_HELP_command_unknown(self):
+ self.write_line(b'HELP SPAM')
+ self.assertEqual(self.channel.socket.last,
+ b'501 Supported commands: EHLO HELO MAIL RCPT ' + \
+ b'DATA RSET NOOP QUIT VRFY\r\n')
+
+ def test_HELO_bad_syntax(self):
+ self.write_line(b'HELO')
+ self.assertEqual(self.channel.socket.last,
+ b'501 Syntax: HELO hostname\r\n')
+
+ def test_HELO_duplicate(self):
+ self.write_line(b'HELO example')
+ self.write_line(b'HELO example')
+ self.assertEqual(self.channel.socket.last,
+ b'503 Duplicate HELO/EHLO\r\n')
+
+ def test_HELO_parameter_rejected_when_extensions_not_enabled(self):
+ self.extended_smtp = False
+ self.write_line(b'HELO example')
+ self.write_line(b'MAIL from:<foo@example.com> SIZE=1234')
+ self.assertEqual(self.channel.socket.last,
+ b'501 Syntax: MAIL FROM: <address>\r\n')
+
+ def test_MAIL_allows_space_after_colon(self):
+ self.write_line(b'HELO example')
+ self.write_line(b'MAIL from: <foo@example.com>')
+ self.assertEqual(self.channel.socket.last,
+ b'250 OK\r\n')
+
+ def test_extended_MAIL_allows_space_after_colon(self):
+ self.write_line(b'EHLO example')
+ self.write_line(b'MAIL from: <foo@example.com> size=20')
+ self.assertEqual(self.channel.socket.last,
+ b'250 OK\r\n')
+
+ def test_NOOP(self):
+ self.write_line(b'NOOP')
+ self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
+
+ def test_HELO_NOOP(self):
+ self.write_line(b'HELO example')
+ self.write_line(b'NOOP')
+ self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
+
+ def test_NOOP_bad_syntax(self):
+ self.write_line(b'NOOP hi')
+ self.assertEqual(self.channel.socket.last,
+ b'501 Syntax: NOOP\r\n')
+
+ def test_QUIT(self):
+ self.write_line(b'QUIT')
+ self.assertEqual(self.channel.socket.last, b'221 Bye\r\n')
+
+ def test_HELO_QUIT(self):
+ self.write_line(b'HELO example')
+ self.write_line(b'QUIT')
+ self.assertEqual(self.channel.socket.last, b'221 Bye\r\n')
+
+ def test_QUIT_arg_ignored(self):
+ self.write_line(b'QUIT bye bye')
+ self.assertEqual(self.channel.socket.last, b'221 Bye\r\n')
+
+ def test_bad_state(self):
+ self.channel.smtp_state = 'BAD STATE'
+ self.write_line(b'HELO example')
+ self.assertEqual(self.channel.socket.last,
+ b'451 Internal confusion\r\n')
+
+ def test_command_too_long(self):
+ self.write_line(b'HELO example')
+ self.write_line(b'MAIL from: ' +
+ b'a' * self.channel.command_size_limit +
+ b'@example')
+ self.assertEqual(self.channel.socket.last,
+ b'500 Error: line too long\r\n')
+
+ def test_MAIL_command_limit_extended_with_SIZE(self):
+ self.write_line(b'EHLO example')
+ fill_len = self.channel.command_size_limit - len('MAIL from:<@example>')
+ self.write_line(b'MAIL from:<' +
+ b'a' * fill_len +
+ b'@example> SIZE=1234')
+ self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
+
+ self.write_line(b'MAIL from:<' +
+ b'a' * (fill_len + 26) +
+ b'@example> SIZE=1234')
+ self.assertEqual(self.channel.socket.last,
+ b'500 Error: line too long\r\n')
+
+ def test_MAIL_command_rejects_SMTPUTF8_by_default(self):
+ self.write_line(b'EHLO example')
+ self.write_line(
+ b'MAIL from: <naive@example.com> BODY=8BITMIME SMTPUTF8')
+ self.assertEqual(self.channel.socket.last[0:1], b'5')
+
+ def test_data_longer_than_default_data_size_limit(self):
+ # Hack the default so we don't have to generate so much data.
+ self.channel.data_size_limit = 1048
+ self.write_line(b'HELO example')
+ self.write_line(b'MAIL From:eggs@example')
+ self.write_line(b'RCPT To:spam@example')
+ self.write_line(b'DATA')
+ self.write_line(b'A' * self.channel.data_size_limit +
+ b'A\r\n.')
+ self.assertEqual(self.channel.socket.last,
+ b'552 Error: Too much mail data\r\n')
+
+ def test_MAIL_size_parameter(self):
+ self.write_line(b'EHLO example')
+ self.write_line(b'MAIL FROM:<eggs@example> SIZE=512')
+ self.assertEqual(self.channel.socket.last,
+ b'250 OK\r\n')
+
+ def test_MAIL_invalid_size_parameter(self):
+ self.write_line(b'EHLO example')
+ self.write_line(b'MAIL FROM:<eggs@example> SIZE=invalid')
+ self.assertEqual(self.channel.socket.last,
+ b'501 Syntax: MAIL FROM: <address> [SP <mail-parameters>]\r\n')
+
+ def test_MAIL_RCPT_unknown_parameters(self):
+ self.write_line(b'EHLO example')
+ self.write_line(b'MAIL FROM:<eggs@example> ham=green')
+ self.assertEqual(self.channel.socket.last,
+ b'555 MAIL FROM parameters not recognized or not implemented\r\n')
+
+ self.write_line(b'MAIL FROM:<eggs@example>')
+ self.write_line(b'RCPT TO:<eggs@example> ham=green')
+ self.assertEqual(self.channel.socket.last,
+ b'555 RCPT TO parameters not recognized or not implemented\r\n')
+
+ def test_MAIL_size_parameter_larger_than_default_data_size_limit(self):
+ self.channel.data_size_limit = 1048
+ self.write_line(b'EHLO example')
+ self.write_line(b'MAIL FROM:<eggs@example> SIZE=2096')
+ self.assertEqual(self.channel.socket.last,
+ b'552 Error: message size exceeds fixed maximum message size\r\n')
+
+ def test_need_MAIL(self):
+ self.write_line(b'HELO example')
+ self.write_line(b'RCPT to:spam@example')
+ self.assertEqual(self.channel.socket.last,
+ b'503 Error: need MAIL command\r\n')
+
+ def test_MAIL_syntax_HELO(self):
+ self.write_line(b'HELO example')
+ self.write_line(b'MAIL from eggs@example')
+ self.assertEqual(self.channel.socket.last,
+ b'501 Syntax: MAIL FROM: <address>\r\n')
+
+ def test_MAIL_syntax_EHLO(self):
+ self.write_line(b'EHLO example')
+ self.write_line(b'MAIL from eggs@example')
+ self.assertEqual(self.channel.socket.last,
+ b'501 Syntax: MAIL FROM: <address> [SP <mail-parameters>]\r\n')
+
+ def test_MAIL_missing_address(self):
+ self.write_line(b'HELO example')
+ self.write_line(b'MAIL from:')
+ self.assertEqual(self.channel.socket.last,
+ b'501 Syntax: MAIL FROM: <address>\r\n')
+
+ def test_MAIL_chevrons(self):
+ self.write_line(b'HELO example')
+ self.write_line(b'MAIL from:<eggs@example>')
+ self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
+
+ def test_MAIL_empty_chevrons(self):
+ self.write_line(b'EHLO example')
+ self.write_line(b'MAIL from:<>')
+ self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
+
+ def test_MAIL_quoted_localpart(self):
+ self.write_line(b'EHLO example')
+ self.write_line(b'MAIL from: <"Fred Blogs"@example.com>')
+ self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
+ self.assertEqual(self.channel.mailfrom, '"Fred Blogs"@example.com')
+
+ def test_MAIL_quoted_localpart_no_angles(self):
+ self.write_line(b'EHLO example')
+ self.write_line(b'MAIL from: "Fred Blogs"@example.com')
+ self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
+ self.assertEqual(self.channel.mailfrom, '"Fred Blogs"@example.com')
+
+ def test_MAIL_quoted_localpart_with_size(self):
+ self.write_line(b'EHLO example')
+ self.write_line(b'MAIL from: <"Fred Blogs"@example.com> SIZE=1000')
+ self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
+ self.assertEqual(self.channel.mailfrom, '"Fred Blogs"@example.com')
+
+ def test_MAIL_quoted_localpart_with_size_no_angles(self):
+ self.write_line(b'EHLO example')
+ self.write_line(b'MAIL from: "Fred Blogs"@example.com SIZE=1000')
+ self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
+ self.assertEqual(self.channel.mailfrom, '"Fred Blogs"@example.com')
+
+ def test_nested_MAIL(self):
+ self.write_line(b'HELO example')
+ self.write_line(b'MAIL from:eggs@example')
+ self.write_line(b'MAIL from:spam@example')
+ self.assertEqual(self.channel.socket.last,
+ b'503 Error: nested MAIL command\r\n')
+
+ def test_VRFY(self):
+ self.write_line(b'VRFY eggs@example')
+ self.assertEqual(self.channel.socket.last,
+ b'252 Cannot VRFY user, but will accept message and attempt ' + \
+ b'delivery\r\n')
+
+ def test_VRFY_syntax(self):
+ self.write_line(b'VRFY')
+ self.assertEqual(self.channel.socket.last,
+ b'501 Syntax: VRFY <address>\r\n')
+
+ def test_EXPN_not_implemented(self):
+ self.write_line(b'EXPN')
+ self.assertEqual(self.channel.socket.last,
+ b'502 EXPN not implemented\r\n')
+
+ def test_no_HELO_MAIL(self):
+ self.write_line(b'MAIL from:<foo@example.com>')
+ self.assertEqual(self.channel.socket.last,
+ b'503 Error: send HELO first\r\n')
+
+ def test_need_RCPT(self):
+ self.write_line(b'HELO example')
+ self.write_line(b'MAIL From:eggs@example')
+ self.write_line(b'DATA')
+ self.assertEqual(self.channel.socket.last,
+ b'503 Error: need RCPT command\r\n')
+
+ def test_RCPT_syntax_HELO(self):
+ self.write_line(b'HELO example')
+ self.write_line(b'MAIL From: eggs@example')
+ self.write_line(b'RCPT to eggs@example')
+ self.assertEqual(self.channel.socket.last,
+ b'501 Syntax: RCPT TO: <address>\r\n')
+
+ def test_RCPT_syntax_EHLO(self):
+ self.write_line(b'EHLO example')
+ self.write_line(b'MAIL From: eggs@example')
+ self.write_line(b'RCPT to eggs@example')
+ self.assertEqual(self.channel.socket.last,
+ b'501 Syntax: RCPT TO: <address> [SP <mail-parameters>]\r\n')
+
+ def test_RCPT_lowercase_to_OK(self):
+ self.write_line(b'HELO example')
+ self.write_line(b'MAIL From: eggs@example')
+ self.write_line(b'RCPT to: <eggs@example>')
+ self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
+
+ def test_no_HELO_RCPT(self):
+ self.write_line(b'RCPT to eggs@example')
+ self.assertEqual(self.channel.socket.last,
+ b'503 Error: send HELO first\r\n')
+
+ def test_data_dialog(self):
+ self.write_line(b'HELO example')
+ self.write_line(b'MAIL From:eggs@example')
+ self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
+ self.write_line(b'RCPT To:spam@example')
+ self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
+
+ self.write_line(b'DATA')
+ self.assertEqual(self.channel.socket.last,
+ b'354 End data with <CR><LF>.<CR><LF>\r\n')
+ self.write_line(b'data\r\nmore\r\n.')
+ self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
+ self.assertEqual(self.server.messages,
+ [(('peer-address', 'peer-port'),
+ 'eggs@example',
+ ['spam@example'],
+ 'data\nmore')])
+
+ def test_DATA_syntax(self):
+ self.write_line(b'HELO example')
+ self.write_line(b'MAIL From:eggs@example')
+ self.write_line(b'RCPT To:spam@example')
+ self.write_line(b'DATA spam')
+ self.assertEqual(self.channel.socket.last, b'501 Syntax: DATA\r\n')
+
+ def test_no_HELO_DATA(self):
+ self.write_line(b'DATA spam')
+ self.assertEqual(self.channel.socket.last,
+ b'503 Error: send HELO first\r\n')
+
+ def test_data_transparency_section_4_5_2(self):
+ self.write_line(b'HELO example')
+ self.write_line(b'MAIL From:eggs@example')
+ self.write_line(b'RCPT To:spam@example')
+ self.write_line(b'DATA')
+ self.write_line(b'..\r\n.\r\n')
+ self.assertEqual(self.channel.received_data, '.')
+
+ def test_multiple_RCPT(self):
+ self.write_line(b'HELO example')
+ self.write_line(b'MAIL From:eggs@example')
+ self.write_line(b'RCPT To:spam@example')
+ self.write_line(b'RCPT To:ham@example')
+ self.write_line(b'DATA')
+ self.write_line(b'data\r\n.')
+ self.assertEqual(self.server.messages,
+ [(('peer-address', 'peer-port'),
+ 'eggs@example',
+ ['spam@example','ham@example'],
+ 'data')])
+
+ def test_manual_status(self):
+ # checks that the Channel is able to return a custom status message
+ self.write_line(b'HELO example')
+ self.write_line(b'MAIL From:eggs@example')
+ self.write_line(b'RCPT To:spam@example')
+ self.write_line(b'DATA')
+ self.write_line(b'return status\r\n.')
+ self.assertEqual(self.channel.socket.last, b'250 Okish\r\n')
+
+ def test_RSET(self):
+ self.write_line(b'HELO example')
+ self.write_line(b'MAIL From:eggs@example')
+ self.write_line(b'RCPT To:spam@example')
+ self.write_line(b'RSET')
+ self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
+ self.write_line(b'MAIL From:foo@example')
+ self.write_line(b'RCPT To:eggs@example')
+ self.write_line(b'DATA')
+ self.write_line(b'data\r\n.')
+ self.assertEqual(self.server.messages,
+ [(('peer-address', 'peer-port'),
+ 'foo@example',
+ ['eggs@example'],
+ 'data')])
+
+ def test_HELO_RSET(self):
+ self.write_line(b'HELO example')
+ self.write_line(b'RSET')
+ self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
+
+ def test_RSET_syntax(self):
+ self.write_line(b'RSET hi')
+ self.assertEqual(self.channel.socket.last, b'501 Syntax: RSET\r\n')
+
+ def test_unknown_command(self):
+ self.write_line(b'UNKNOWN_CMD')
+ self.assertEqual(self.channel.socket.last,
+ b'500 Error: command "UNKNOWN_CMD" not ' + \
+ b'recognized\r\n')
+
+ def test_attribute_deprecations(self):
+ with warnings_helper.check_warnings(('', DeprecationWarning)):
+ spam = self.channel._SMTPChannel__server
+ with warnings_helper.check_warnings(('', DeprecationWarning)):
+ self.channel._SMTPChannel__server = 'spam'
+ with warnings_helper.check_warnings(('', DeprecationWarning)):
+ spam = self.channel._SMTPChannel__line
+ with warnings_helper.check_warnings(('', DeprecationWarning)):
+ self.channel._SMTPChannel__line = 'spam'
+ with warnings_helper.check_warnings(('', DeprecationWarning)):
+ spam = self.channel._SMTPChannel__state
+ with warnings_helper.check_warnings(('', DeprecationWarning)):
+ self.channel._SMTPChannel__state = 'spam'
+ with warnings_helper.check_warnings(('', DeprecationWarning)):
+ spam = self.channel._SMTPChannel__greeting
+ with warnings_helper.check_warnings(('', DeprecationWarning)):
+ self.channel._SMTPChannel__greeting = 'spam'
+ with warnings_helper.check_warnings(('', DeprecationWarning)):
+ spam = self.channel._SMTPChannel__mailfrom
+ with warnings_helper.check_warnings(('', DeprecationWarning)):
+ self.channel._SMTPChannel__mailfrom = 'spam'
+ with warnings_helper.check_warnings(('', DeprecationWarning)):
+ spam = self.channel._SMTPChannel__rcpttos
+ with warnings_helper.check_warnings(('', DeprecationWarning)):
+ self.channel._SMTPChannel__rcpttos = 'spam'
+ with warnings_helper.check_warnings(('', DeprecationWarning)):
+ spam = self.channel._SMTPChannel__data
+ with warnings_helper.check_warnings(('', DeprecationWarning)):
+ self.channel._SMTPChannel__data = 'spam'
+ with warnings_helper.check_warnings(('', DeprecationWarning)):
+ spam = self.channel._SMTPChannel__fqdn
+ with warnings_helper.check_warnings(('', DeprecationWarning)):
+ self.channel._SMTPChannel__fqdn = 'spam'
+ with warnings_helper.check_warnings(('', DeprecationWarning)):
+ spam = self.channel._SMTPChannel__peer
+ with warnings_helper.check_warnings(('', DeprecationWarning)):
+ self.channel._SMTPChannel__peer = 'spam'
+ with warnings_helper.check_warnings(('', DeprecationWarning)):
+ spam = self.channel._SMTPChannel__conn
+ with warnings_helper.check_warnings(('', DeprecationWarning)):
+ self.channel._SMTPChannel__conn = 'spam'
+ with warnings_helper.check_warnings(('', DeprecationWarning)):
+ spam = self.channel._SMTPChannel__addr
+ with warnings_helper.check_warnings(('', DeprecationWarning)):
+ self.channel._SMTPChannel__addr = 'spam'
+
+@unittest.skipUnless(socket_helper.IPV6_ENABLED, "IPv6 not enabled")
+class SMTPDChannelIPv6Test(SMTPDChannelTest):
+ def setUp(self):
+ smtpd.socket = asyncore.socket = mock_socket
+ self.old_debugstream = smtpd.DEBUGSTREAM
+ self.debug = smtpd.DEBUGSTREAM = io.StringIO()
+ self.server = DummyServer((socket_helper.HOSTv6, 0), ('b', 0),
+ decode_data=True)
+ conn, addr = self.server.accept()
+ self.channel = smtpd.SMTPChannel(self.server, conn, addr,
+ decode_data=True)
+
+class SMTPDChannelWithDataSizeLimitTest(unittest.TestCase):
+
+ def setUp(self):
+ smtpd.socket = asyncore.socket = mock_socket
+ self.old_debugstream = smtpd.DEBUGSTREAM
+ self.debug = smtpd.DEBUGSTREAM = io.StringIO()
+ self.server = DummyServer((socket_helper.HOST, 0), ('b', 0),
+ decode_data=True)
+ conn, addr = self.server.accept()
+ # Set DATA size limit to 32 bytes for easy testing
+ self.channel = smtpd.SMTPChannel(self.server, conn, addr, 32,
+ decode_data=True)
+
+ def tearDown(self):
+ asyncore.close_all()
+ asyncore.socket = smtpd.socket = socket
+ smtpd.DEBUGSTREAM = self.old_debugstream
+
+ def write_line(self, line):
+ self.channel.socket.queue_recv(line)
+ self.channel.handle_read()
+
+ def test_data_limit_dialog(self):
+ self.write_line(b'HELO example')
+ self.write_line(b'MAIL From:eggs@example')
+ self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
+ self.write_line(b'RCPT To:spam@example')
+ self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
+
+ self.write_line(b'DATA')
+ self.assertEqual(self.channel.socket.last,
+ b'354 End data with <CR><LF>.<CR><LF>\r\n')
+ self.write_line(b'data\r\nmore\r\n.')
+ self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
+ self.assertEqual(self.server.messages,
+ [(('peer-address', 'peer-port'),
+ 'eggs@example',
+ ['spam@example'],
+ 'data\nmore')])
+
+ def test_data_limit_dialog_too_much_data(self):
+ self.write_line(b'HELO example')
+ self.write_line(b'MAIL From:eggs@example')
+ self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
+ self.write_line(b'RCPT To:spam@example')
+ self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
+
+ self.write_line(b'DATA')
+ self.assertEqual(self.channel.socket.last,
+ b'354 End data with <CR><LF>.<CR><LF>\r\n')
+ self.write_line(b'This message is longer than 32 bytes\r\n.')
+ self.assertEqual(self.channel.socket.last,
+ b'552 Error: Too much mail data\r\n')
+
+
+class SMTPDChannelWithDecodeDataFalse(unittest.TestCase):
+
+ def setUp(self):
+ smtpd.socket = asyncore.socket = mock_socket
+ self.old_debugstream = smtpd.DEBUGSTREAM
+ self.debug = smtpd.DEBUGSTREAM = io.StringIO()
+ self.server = DummyServer((socket_helper.HOST, 0), ('b', 0))
+ conn, addr = self.server.accept()
+ self.channel = smtpd.SMTPChannel(self.server, conn, addr)
+
+ def tearDown(self):
+ asyncore.close_all()
+ asyncore.socket = smtpd.socket = socket
+ smtpd.DEBUGSTREAM = self.old_debugstream
+
+ def write_line(self, line):
+ self.channel.socket.queue_recv(line)
+ self.channel.handle_read()
+
+ def test_ascii_data(self):
+ self.write_line(b'HELO example')
+ self.write_line(b'MAIL From:eggs@example')
+ self.write_line(b'RCPT To:spam@example')
+ self.write_line(b'DATA')
+ self.write_line(b'plain ascii text')
+ self.write_line(b'.')
+ self.assertEqual(self.channel.received_data, b'plain ascii text')
+
+ def test_utf8_data(self):
+ self.write_line(b'HELO example')
+ self.write_line(b'MAIL From:eggs@example')
+ self.write_line(b'RCPT To:spam@example')
+ self.write_line(b'DATA')
+ self.write_line(b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87')
+ self.write_line(b'and some plain ascii')
+ self.write_line(b'.')
+ self.assertEqual(
+ self.channel.received_data,
+ b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87\n'
+ b'and some plain ascii')
+
+
+class SMTPDChannelWithDecodeDataTrue(unittest.TestCase):
+
+ def setUp(self):
+ smtpd.socket = asyncore.socket = mock_socket
+ self.old_debugstream = smtpd.DEBUGSTREAM
+ self.debug = smtpd.DEBUGSTREAM = io.StringIO()
+ self.server = DummyServer((socket_helper.HOST, 0), ('b', 0),
+ decode_data=True)
+ conn, addr = self.server.accept()
+ # Set decode_data to True
+ self.channel = smtpd.SMTPChannel(self.server, conn, addr,
+ decode_data=True)
+
+ def tearDown(self):
+ asyncore.close_all()
+ asyncore.socket = smtpd.socket = socket
+ smtpd.DEBUGSTREAM = self.old_debugstream
+
+ def write_line(self, line):
+ self.channel.socket.queue_recv(line)
+ self.channel.handle_read()
+
+ def test_ascii_data(self):
+ self.write_line(b'HELO example')
+ self.write_line(b'MAIL From:eggs@example')
+ self.write_line(b'RCPT To:spam@example')
+ self.write_line(b'DATA')
+ self.write_line(b'plain ascii text')
+ self.write_line(b'.')
+ self.assertEqual(self.channel.received_data, 'plain ascii text')
+
+ def test_utf8_data(self):
+ self.write_line(b'HELO example')
+ self.write_line(b'MAIL From:eggs@example')
+ self.write_line(b'RCPT To:spam@example')
+ self.write_line(b'DATA')
+ self.write_line(b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87')
+ self.write_line(b'and some plain ascii')
+ self.write_line(b'.')
+ self.assertEqual(
+ self.channel.received_data,
+ 'utf8 enriched text: żźć\nand some plain ascii')
+
+
+class SMTPDChannelTestWithEnableSMTPUTF8True(unittest.TestCase):
+ def setUp(self):
+ smtpd.socket = asyncore.socket = mock_socket
+ self.old_debugstream = smtpd.DEBUGSTREAM
+ self.debug = smtpd.DEBUGSTREAM = io.StringIO()
+ self.server = DummyServer((socket_helper.HOST, 0), ('b', 0),
+ enable_SMTPUTF8=True)
+ conn, addr = self.server.accept()
+ self.channel = smtpd.SMTPChannel(self.server, conn, addr,
+ enable_SMTPUTF8=True)
+
+ def tearDown(self):
+ asyncore.close_all()
+ asyncore.socket = smtpd.socket = socket
+ smtpd.DEBUGSTREAM = self.old_debugstream
+
+ def write_line(self, line):
+ self.channel.socket.queue_recv(line)
+ self.channel.handle_read()
+
+ def test_MAIL_command_accepts_SMTPUTF8_when_announced(self):
+ self.write_line(b'EHLO example')
+ self.write_line(
+ 'MAIL from: <naïve@example.com> BODY=8BITMIME SMTPUTF8'.encode(
+ 'utf-8')
+ )
+ self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
+
+ def test_process_smtputf8_message(self):
+ self.write_line(b'EHLO example')
+ for mail_parameters in [b'', b'BODY=8BITMIME SMTPUTF8']:
+ self.write_line(b'MAIL from: <a@example> ' + mail_parameters)
+ self.assertEqual(self.channel.socket.last[0:3], b'250')
+ self.write_line(b'rcpt to:<b@example.com>')
+ self.assertEqual(self.channel.socket.last[0:3], b'250')
+ self.write_line(b'data')
+ self.assertEqual(self.channel.socket.last[0:3], b'354')
+ self.write_line(b'c\r\n.')
+ if mail_parameters == b'':
+ self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
+ else:
+ self.assertEqual(self.channel.socket.last,
+ b'250 SMTPUTF8 message okish\r\n')
+
+ def test_utf8_data(self):
+ self.write_line(b'EHLO example')
+ self.write_line(
+ 'MAIL From: naïve@examplé BODY=8BITMIME SMTPUTF8'.encode('utf-8'))
+ self.assertEqual(self.channel.socket.last[0:3], b'250')
+ self.write_line('RCPT To:späm@examplé'.encode('utf-8'))
+ self.assertEqual(self.channel.socket.last[0:3], b'250')
+ self.write_line(b'DATA')
+ self.assertEqual(self.channel.socket.last[0:3], b'354')
+ self.write_line(b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87')
+ self.write_line(b'.')
+ self.assertEqual(
+ self.channel.received_data,
+ b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87')
+
+ def test_MAIL_command_limit_extended_with_SIZE_and_SMTPUTF8(self):
+ self.write_line(b'ehlo example')
+ fill_len = (512 + 26 + 10) - len('mail from:<@example>')
+ self.write_line(b'MAIL from:<' +
+ b'a' * (fill_len + 1) +
+ b'@example>')
+ self.assertEqual(self.channel.socket.last,
+ b'500 Error: line too long\r\n')
+ self.write_line(b'MAIL from:<' +
+ b'a' * fill_len +
+ b'@example>')
+ self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
+
+ def test_multiple_emails_with_extended_command_length(self):
+ self.write_line(b'ehlo example')
+ fill_len = (512 + 26 + 10) - len('mail from:<@example>')
+ for char in [b'a', b'b', b'c']:
+ self.write_line(b'MAIL from:<' + char * fill_len + b'a@example>')
+ self.assertEqual(self.channel.socket.last[0:3], b'500')
+ self.write_line(b'MAIL from:<' + char * fill_len + b'@example>')
+ self.assertEqual(self.channel.socket.last[0:3], b'250')
+ self.write_line(b'rcpt to:<hans@example.com>')
+ self.assertEqual(self.channel.socket.last[0:3], b'250')
+ self.write_line(b'data')
+ self.assertEqual(self.channel.socket.last[0:3], b'354')
+ self.write_line(b'test\r\n.')
+ self.assertEqual(self.channel.socket.last[0:3], b'250')
+
+
+class MiscTestCase(unittest.TestCase):
+ def test__all__(self):
+ not_exported = {
+ "program", "Devnull", "DEBUGSTREAM", "NEWLINE", "COMMASPACE",
+ "DATA_SIZE_DEFAULT", "usage", "Options", "parseargs",
+ }
+ support.check__all__(self, smtpd, not_exported=not_exported)
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/Lib/test/test_smtplib.py b/Lib/test/test_smtplib.py
index bdce3e3..9761a37 100644
--- a/Lib/test/test_smtplib.py
+++ b/Lib/test/test_smtplib.py
@@ -18,13 +18,17 @@ import threading
import unittest
from test import support, mock_socket
-from test.support import _asyncore as asyncore
-from test.support import _smtpd as smtpd
from test.support import hashlib_helper
from test.support import socket_helper
from test.support import threading_helper
from unittest.mock import Mock
+import warnings
+with warnings.catch_warnings():
+ warnings.simplefilter('ignore', DeprecationWarning)
+ import asyncore
+ import smtpd
+
HOST = socket_helper.HOST
if sys.platform == 'darwin':
diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py
index 88eeb07..981e2fe 100644
--- a/Lib/test/test_ssl.py
+++ b/Lib/test/test_ssl.py
@@ -4,7 +4,6 @@ import sys
import unittest
import unittest.mock
from test import support
-from test.support import _asyncore as asyncore
from test.support import import_helper
from test.support import os_helper
from test.support import socket_helper
@@ -31,6 +30,10 @@ try:
except ImportError:
ctypes = None
+import warnings
+with warnings.catch_warnings():
+ warnings.simplefilter('ignore', DeprecationWarning)
+ import asyncore
ssl = import_helper.import_module("ssl")
import _ssl