diff options
author | Serhiy Storchaka <storchaka@gmail.com> | 2020-04-25 07:06:29 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-04-25 07:06:29 (GMT) |
commit | 16994912c93e8e5db7365d48b75d67d3f70dd7b2 (patch) | |
tree | 248f177a93676406af6d6ae977bed868aa2d1a34 /Lib | |
parent | 3c8a5b459d68b4337776ada1e04f5b33f90a2275 (diff) | |
download | cpython-16994912c93e8e5db7365d48b75d67d3f70dd7b2.zip cpython-16994912c93e8e5db7365d48b75d67d3f70dd7b2.tar.gz cpython-16994912c93e8e5db7365d48b75d67d3f70dd7b2.tar.bz2 |
bpo-40275: Avoid importing socket in test.support (GH-19603)
* Move socket related functions from test.support to socket_helper.
* Import socket, nntplib and urllib.error lazily in transient_internet().
* Remove importing multiprocess.
Diffstat (limited to 'Lib')
36 files changed, 401 insertions, 370 deletions
diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 376f5e3..083ad53 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -26,6 +26,7 @@ import warnings import test.support import test.support.script_helper from test import support +from test.support import socket_helper # Skip tests if _multiprocessing wasn't built. @@ -2928,7 +2929,7 @@ class _TestRemoteManager(BaseTestCase): authkey = os.urandom(32) manager = QueueManager( - address=(test.support.HOST, 0), authkey=authkey, serializer=SERIALIZER + address=(socket_helper.HOST, 0), authkey=authkey, serializer=SERIALIZER ) manager.start() self.addCleanup(manager.shutdown) @@ -2965,7 +2966,7 @@ class _TestManagerRestart(BaseTestCase): def test_rapid_restart(self): authkey = os.urandom(32) manager = QueueManager( - address=(test.support.HOST, 0), authkey=authkey, serializer=SERIALIZER) + address=(socket_helper.HOST, 0), authkey=authkey, serializer=SERIALIZER) try: srvr = manager.get_server() addr = srvr.address @@ -3455,7 +3456,7 @@ class _TestPicklingConnections(BaseTestCase): new_conn.close() l.close() - l = socket.create_server((test.support.HOST, 0)) + l = socket.create_server((socket_helper.HOST, 0)) conn.send(l.getsockname()) new_conn, addr = l.accept() conn.send(new_conn) @@ -4593,7 +4594,7 @@ class TestWait(unittest.TestCase): def test_wait_socket(self, slow=False): from multiprocessing.connection import wait - l = socket.create_server((test.support.HOST, 0)) + l = socket.create_server((socket_helper.HOST, 0)) addr = l.getsockname() readers = [] procs = [] diff --git a/Lib/test/eintrdata/eintr_tester.py b/Lib/test/eintrdata/eintr_tester.py index 4e0e305..606f31b 100644 --- a/Lib/test/eintrdata/eintr_tester.py +++ b/Lib/test/eintrdata/eintr_tester.py @@ -22,6 +22,7 @@ import time import unittest from test import support +from test.support import socket_helper @contextlib.contextmanager def kill_on_error(proc): @@ -283,14 +284,14 @@ class SocketEINTRTest(EINTRBaseTest): self._test_send(lambda sock, data: sock.sendmsg([data])) def test_accept(self): - sock = socket.create_server((support.HOST, 0)) + sock = socket.create_server((socket_helper.HOST, 0)) self.addCleanup(sock.close) port = sock.getsockname()[1] code = '\n'.join(( 'import socket, time', '', - 'host = %r' % support.HOST, + 'host = %r' % socket_helper.HOST, 'port = %s' % port, 'sleep_time = %r' % self.sleep_time, '', diff --git a/Lib/test/ssl_servers.py b/Lib/test/ssl_servers.py index 2e7e235..a4bd745 100644 --- a/Lib/test/ssl_servers.py +++ b/Lib/test/ssl_servers.py @@ -9,10 +9,11 @@ from http.server import (HTTPServer as _HTTPServer, SimpleHTTPRequestHandler, BaseHTTPRequestHandler) from test import support +from test.support import socket_helper here = os.path.dirname(__file__) -HOST = support.HOST +HOST = socket_helper.HOST CERTFILE = os.path.join(here, 'keycert.pem') # This one's based on HTTPServer, which is based on socketserver diff --git a/Lib/test/support/__init__.py b/Lib/test/support/__init__.py index 2b3a414..15cf45d 100644 --- a/Lib/test/support/__init__.py +++ b/Lib/test/support/__init__.py @@ -16,12 +16,10 @@ import importlib import importlib.util import locale import logging.handlers -import nntplib import os import platform import re import shutil -import socket import stat import struct import subprocess @@ -33,17 +31,11 @@ import threading import time import types import unittest -import urllib.error import warnings from .testresult import get_test_runner try: - import multiprocessing.process -except ImportError: - multiprocessing = None - -try: import zlib except ImportError: zlib = None @@ -98,14 +90,13 @@ __all__ = [ "bigmemtest", "bigaddrspacetest", "cpython_only", "get_attribute", "requires_IEEE_754", "skip_unless_xattr", "requires_zlib", "anticipate_failure", "load_package_tests", "detect_api_mismatch", - "check__all__", "skip_unless_bind_unix_socket", "skip_if_buggy_ucrt_strfptime", + "check__all__", "skip_if_buggy_ucrt_strfptime", "ignore_warnings", # sys "is_jython", "is_android", "check_impl_detail", "unix_shell", "setswitchinterval", # network - "HOST", "IPV6_ENABLED", "find_unused_port", "bind_port", "open_urlresource", - "bind_unix_socket", + "open_urlresource", # processes 'temp_umask', "reap_children", # logging @@ -727,135 +718,6 @@ def requires_hashdigest(digestname, openssl=None, usedforsecurity=True): return decorator -HOST = "localhost" -HOSTv4 = "127.0.0.1" -HOSTv6 = "::1" - - -def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM): - """Returns an unused port that should be suitable for binding. This is - achieved by creating a temporary socket with the same family and type as - the 'sock' parameter (default is AF_INET, SOCK_STREAM), and binding it to - the specified host address (defaults to 0.0.0.0) with the port set to 0, - eliciting an unused ephemeral port from the OS. The temporary socket is - then closed and deleted, and the ephemeral port is returned. - - Either this method or bind_port() should be used for any tests where a - server socket needs to be bound to a particular port for the duration of - the test. Which one to use depends on whether the calling code is creating - a python socket, or if an unused port needs to be provided in a constructor - or passed to an external program (i.e. the -accept argument to openssl's - s_server mode). Always prefer bind_port() over find_unused_port() where - possible. Hard coded ports should *NEVER* be used. As soon as a server - socket is bound to a hard coded port, the ability to run multiple instances - of the test simultaneously on the same host is compromised, which makes the - test a ticking time bomb in a buildbot environment. On Unix buildbots, this - may simply manifest as a failed test, which can be recovered from without - intervention in most cases, but on Windows, the entire python process can - completely and utterly wedge, requiring someone to log in to the buildbot - and manually kill the affected process. - - (This is easy to reproduce on Windows, unfortunately, and can be traced to - the SO_REUSEADDR socket option having different semantics on Windows versus - Unix/Linux. On Unix, you can't have two AF_INET SOCK_STREAM sockets bind, - listen and then accept connections on identical host/ports. An EADDRINUSE - OSError will be raised at some point (depending on the platform and - the order bind and listen were called on each socket). - - However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE - will ever be raised when attempting to bind two identical host/ports. When - accept() is called on each socket, the second caller's process will steal - the port from the first caller, leaving them both in an awkwardly wedged - state where they'll no longer respond to any signals or graceful kills, and - must be forcibly killed via OpenProcess()/TerminateProcess(). - - The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option - instead of SO_REUSEADDR, which effectively affords the same semantics as - SO_REUSEADDR on Unix. Given the propensity of Unix developers in the Open - Source world compared to Windows ones, this is a common mistake. A quick - look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR when - openssl.exe is called with the 's_server' option, for example. See - http://bugs.python.org/issue2550 for more info. The following site also - has a very thorough description about the implications of both REUSEADDR - and EXCLUSIVEADDRUSE on Windows: - http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx) - - XXX: although this approach is a vast improvement on previous attempts to - elicit unused ports, it rests heavily on the assumption that the ephemeral - port returned to us by the OS won't immediately be dished back out to some - other process when we close and delete our temporary socket but before our - calling code has a chance to bind the returned port. We can deal with this - issue if/when we come across it. - """ - - with socket.socket(family, socktype) as tempsock: - port = bind_port(tempsock) - del tempsock - return port - -def bind_port(sock, host=HOST): - """Bind the socket to a free port and return the port number. Relies on - ephemeral ports in order to ensure we are using an unbound port. This is - important as many tests may be running simultaneously, especially in a - buildbot environment. This method raises an exception if the sock.family - is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR - or SO_REUSEPORT set on it. Tests should *never* set these socket options - for TCP/IP sockets. The only case for setting these options is testing - multicasting via multiple UDP sockets. - - Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e. - on Windows), it will be set on the socket. This will prevent anyone else - from bind()'ing to our host/port for the duration of the test. - """ - - if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM: - if hasattr(socket, 'SO_REUSEADDR'): - if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1: - raise TestFailed("tests should never set the SO_REUSEADDR " \ - "socket option on TCP/IP sockets!") - if hasattr(socket, 'SO_REUSEPORT'): - try: - if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1: - raise TestFailed("tests should never set the SO_REUSEPORT " \ - "socket option on TCP/IP sockets!") - except OSError: - # Python's socket module was compiled using modern headers - # thus defining SO_REUSEPORT but this process is running - # under an older kernel that does not support SO_REUSEPORT. - pass - if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'): - sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1) - - sock.bind((host, 0)) - port = sock.getsockname()[1] - return port - -def bind_unix_socket(sock, addr): - """Bind a unix socket, raising SkipTest if PermissionError is raised.""" - assert sock.family == socket.AF_UNIX - try: - sock.bind(addr) - except PermissionError: - sock.close() - raise unittest.SkipTest('cannot bind AF_UNIX sockets') - -def _is_ipv6_enabled(): - """Check whether IPv6 is enabled on this host.""" - if socket.has_ipv6: - sock = None - try: - sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) - sock.bind((HOSTv6, 0)) - return True - except OSError: - pass - finally: - if sock: - sock.close() - return False - -IPV6_ENABLED = _is_ipv6_enabled() - def system_must_validate_cert(f): """Skip the test on TLS certificate validation failures.""" @functools.wraps(f) @@ -1563,31 +1425,13 @@ socket_peer_reset = TransientResource(OSError, errno=errno.ECONNRESET) ioerror_peer_reset = TransientResource(OSError, errno=errno.ECONNRESET) -def get_socket_conn_refused_errs(): - """ - Get the different socket error numbers ('errno') which can be received - when a connection is refused. - """ - errors = [errno.ECONNREFUSED] - if hasattr(errno, 'ENETUNREACH'): - # On Solaris, ENETUNREACH is returned sometimes instead of ECONNREFUSED - errors.append(errno.ENETUNREACH) - if hasattr(errno, 'EADDRNOTAVAIL'): - # bpo-31910: socket.create_connection() fails randomly - # with EADDRNOTAVAIL on Travis CI - errors.append(errno.EADDRNOTAVAIL) - if hasattr(errno, 'EHOSTUNREACH'): - # bpo-37583: The destination host cannot be reached - errors.append(errno.EHOSTUNREACH) - if not IPV6_ENABLED: - errors.append(errno.EAFNOSUPPORT) - return errors - - @contextlib.contextmanager def transient_internet(resource_name, *, timeout=_NOT_SET, errnos=()): """Return a context manager that raises ResourceDenied when various issues with the Internet connection manifest themselves as exceptions.""" + import socket + import nntplib + import urllib.error if timeout is _NOT_SET: timeout = INTERNET_TIMEOUT @@ -2754,28 +2598,6 @@ def skip_if_pgo_task(test): msg = "Not run for (non-extended) PGO task" return test if ok else unittest.skip(msg)(test) -_bind_nix_socket_error = None -def skip_unless_bind_unix_socket(test): - """Decorator for tests requiring a functional bind() for unix sockets.""" - if not hasattr(socket, 'AF_UNIX'): - return unittest.skip('No UNIX Sockets')(test) - global _bind_nix_socket_error - if _bind_nix_socket_error is None: - path = TESTFN + "can_bind_unix_socket" - with socket.socket(socket.AF_UNIX) as sock: - try: - sock.bind(path) - _bind_nix_socket_error = False - except OSError as e: - _bind_nix_socket_error = e - finally: - unlink(path) - if _bind_nix_socket_error: - msg = 'Requires a functional unix bind(): %s' % _bind_nix_socket_error - return unittest.skip(msg)(test) - else: - return test - def fs_is_case_insensitive(directory): """Detects if the file system for the specified directory is case-insensitive.""" diff --git a/Lib/test/support/socket_helper.py b/Lib/test/support/socket_helper.py new file mode 100644 index 0000000..5f4a7f1 --- /dev/null +++ b/Lib/test/support/socket_helper.py @@ -0,0 +1,177 @@ +import errno +import socket +import unittest + +HOST = "localhost" +HOSTv4 = "127.0.0.1" +HOSTv6 = "::1" + + +def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM): + """Returns an unused port that should be suitable for binding. This is + achieved by creating a temporary socket with the same family and type as + the 'sock' parameter (default is AF_INET, SOCK_STREAM), and binding it to + the specified host address (defaults to 0.0.0.0) with the port set to 0, + eliciting an unused ephemeral port from the OS. The temporary socket is + then closed and deleted, and the ephemeral port is returned. + + Either this method or bind_port() should be used for any tests where a + server socket needs to be bound to a particular port for the duration of + the test. Which one to use depends on whether the calling code is creating + a python socket, or if an unused port needs to be provided in a constructor + or passed to an external program (i.e. the -accept argument to openssl's + s_server mode). Always prefer bind_port() over find_unused_port() where + possible. Hard coded ports should *NEVER* be used. As soon as a server + socket is bound to a hard coded port, the ability to run multiple instances + of the test simultaneously on the same host is compromised, which makes the + test a ticking time bomb in a buildbot environment. On Unix buildbots, this + may simply manifest as a failed test, which can be recovered from without + intervention in most cases, but on Windows, the entire python process can + completely and utterly wedge, requiring someone to log in to the buildbot + and manually kill the affected process. + + (This is easy to reproduce on Windows, unfortunately, and can be traced to + the SO_REUSEADDR socket option having different semantics on Windows versus + Unix/Linux. On Unix, you can't have two AF_INET SOCK_STREAM sockets bind, + listen and then accept connections on identical host/ports. An EADDRINUSE + OSError will be raised at some point (depending on the platform and + the order bind and listen were called on each socket). + + However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE + will ever be raised when attempting to bind two identical host/ports. When + accept() is called on each socket, the second caller's process will steal + the port from the first caller, leaving them both in an awkwardly wedged + state where they'll no longer respond to any signals or graceful kills, and + must be forcibly killed via OpenProcess()/TerminateProcess(). + + The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option + instead of SO_REUSEADDR, which effectively affords the same semantics as + SO_REUSEADDR on Unix. Given the propensity of Unix developers in the Open + Source world compared to Windows ones, this is a common mistake. A quick + look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR when + openssl.exe is called with the 's_server' option, for example. See + http://bugs.python.org/issue2550 for more info. The following site also + has a very thorough description about the implications of both REUSEADDR + and EXCLUSIVEADDRUSE on Windows: + http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx) + + XXX: although this approach is a vast improvement on previous attempts to + elicit unused ports, it rests heavily on the assumption that the ephemeral + port returned to us by the OS won't immediately be dished back out to some + other process when we close and delete our temporary socket but before our + calling code has a chance to bind the returned port. We can deal with this + issue if/when we come across it. + """ + + with socket.socket(family, socktype) as tempsock: + port = bind_port(tempsock) + del tempsock + return port + +def bind_port(sock, host=HOST): + """Bind the socket to a free port and return the port number. Relies on + ephemeral ports in order to ensure we are using an unbound port. This is + important as many tests may be running simultaneously, especially in a + buildbot environment. This method raises an exception if the sock.family + is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR + or SO_REUSEPORT set on it. Tests should *never* set these socket options + for TCP/IP sockets. The only case for setting these options is testing + multicasting via multiple UDP sockets. + + Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e. + on Windows), it will be set on the socket. This will prevent anyone else + from bind()'ing to our host/port for the duration of the test. + """ + + if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM: + if hasattr(socket, 'SO_REUSEADDR'): + if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1: + raise TestFailed("tests should never set the SO_REUSEADDR " \ + "socket option on TCP/IP sockets!") + if hasattr(socket, 'SO_REUSEPORT'): + try: + if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1: + raise TestFailed("tests should never set the SO_REUSEPORT " \ + "socket option on TCP/IP sockets!") + except OSError: + # Python's socket module was compiled using modern headers + # thus defining SO_REUSEPORT but this process is running + # under an older kernel that does not support SO_REUSEPORT. + pass + if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'): + sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1) + + sock.bind((host, 0)) + port = sock.getsockname()[1] + return port + +def bind_unix_socket(sock, addr): + """Bind a unix socket, raising SkipTest if PermissionError is raised.""" + assert sock.family == socket.AF_UNIX + try: + sock.bind(addr) + except PermissionError: + sock.close() + raise unittest.SkipTest('cannot bind AF_UNIX sockets') + +def _is_ipv6_enabled(): + """Check whether IPv6 is enabled on this host.""" + if socket.has_ipv6: + sock = None + try: + sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) + sock.bind((HOSTv6, 0)) + return True + except OSError: + pass + finally: + if sock: + sock.close() + return False + +IPV6_ENABLED = _is_ipv6_enabled() + + +_bind_nix_socket_error = None +def skip_unless_bind_unix_socket(test): + """Decorator for tests requiring a functional bind() for unix sockets.""" + if not hasattr(socket, 'AF_UNIX'): + return unittest.skip('No UNIX Sockets')(test) + global _bind_nix_socket_error + if _bind_nix_socket_error is None: + from test.support import TESTFN, unlink + path = TESTFN + "can_bind_unix_socket" + with socket.socket(socket.AF_UNIX) as sock: + try: + sock.bind(path) + _bind_nix_socket_error = False + except OSError as e: + _bind_nix_socket_error = e + finally: + unlink(path) + if _bind_nix_socket_error: + msg = 'Requires a functional unix bind(): %s' % _bind_nix_socket_error + return unittest.skip(msg)(test) + else: + return test + + +def get_socket_conn_refused_errs(): + """ + Get the different socket error numbers ('errno') which can be received + when a connection is refused. + """ + errors = [errno.ECONNREFUSED] + if hasattr(errno, 'ENETUNREACH'): + # On Solaris, ENETUNREACH is returned sometimes instead of ECONNREFUSED + errors.append(errno.ENETUNREACH) + if hasattr(errno, 'EADDRNOTAVAIL'): + # bpo-31910: socket.create_connection() fails randomly + # with EADDRNOTAVAIL on Travis CI + errors.append(errno.EADDRNOTAVAIL) + if hasattr(errno, 'EHOSTUNREACH'): + # bpo-37583: The destination host cannot be reached + errors.append(errno.EHOSTUNREACH) + if not IPV6_ENABLED: + errors.append(errno.EAFNOSUPPORT) + return errors diff --git a/Lib/test/test_asynchat.py b/Lib/test/test_asynchat.py index ce85057..004d368 100644 --- a/Lib/test/test_asynchat.py +++ b/Lib/test/test_asynchat.py @@ -1,6 +1,7 @@ # test asynchat from test import support +from test.support import socket_helper import asynchat import asyncore @@ -12,7 +13,7 @@ import time import unittest import unittest.mock -HOST = support.HOST +HOST = socket_helper.HOST SERVER_QUIT = b'QUIT\n' @@ -25,7 +26,7 @@ class echo_server(threading.Thread): threading.Thread.__init__(self) self.event = event self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.port = support.bind_port(self.sock) + self.port = socket_helper.bind_port(self.sock) # This will be set if the client wants us to wait before echoing # data back. self.start_resend_event = None diff --git a/Lib/test/test_asyncio/test_base_events.py b/Lib/test/test_asyncio/test_base_events.py index fc832d3..4adcbf4 100644 --- a/Lib/test/test_asyncio/test_base_events.py +++ b/Lib/test/test_asyncio/test_base_events.py @@ -17,6 +17,7 @@ from asyncio import constants from test.test_asyncio import utils as test_utils from test import support from test.support.script_helper import assert_python_ok +from test.support import socket_helper MOCK_ANY = mock.ANY @@ -91,7 +92,7 @@ class BaseEventTests(test_utils.TestCase): self.assertIsNone( base_events._ipaddr_info('1.2.3.4', 1, UNSPEC, 0, 0)) - if support.IPV6_ENABLED: + if socket_helper.IPV6_ENABLED: # IPv4 address with family IPv6. self.assertIsNone( base_events._ipaddr_info('1.2.3.4', 1, INET6, STREAM, TCP)) @@ -1156,7 +1157,7 @@ class BaseEventLoopWithSelectorTests(test_utils.TestCase): srv.close() self.loop.run_until_complete(srv.wait_closed()) - @unittest.skipUnless(support.IPV6_ENABLED, 'no IPv6 support') + @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'no IPv6 support') def test_create_server_ipv6(self): async def main(): with self.assertWarns(DeprecationWarning): @@ -1288,7 +1289,7 @@ class BaseEventLoopWithSelectorTests(test_utils.TestCase): t.close() test_utils.run_briefly(self.loop) # allow transport to close - if support.IPV6_ENABLED: + if socket_helper.IPV6_ENABLED: sock.family = socket.AF_INET6 coro = self.loop.create_connection(asyncio.Protocol, '::1', 80) t, p = self.loop.run_until_complete(coro) @@ -1307,7 +1308,7 @@ class BaseEventLoopWithSelectorTests(test_utils.TestCase): t.close() test_utils.run_briefly(self.loop) # allow transport to close - @unittest.skipUnless(support.IPV6_ENABLED, 'no IPv6 support') + @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'no IPv6 support') @unittest.skipIf(sys.platform.startswith('aix'), "bpo-25545: IPv6 scope id and getaddrinfo() behave differently on AIX") @patch_socket @@ -1639,7 +1640,7 @@ class BaseEventLoopWithSelectorTests(test_utils.TestCase): self.assertRaises( OSError, self.loop.run_until_complete, coro) - @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 not supported or enabled') + @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 not supported or enabled') def test_create_datagram_endpoint_no_matching_family(self): coro = self.loop.create_datagram_endpoint( asyncio.DatagramProtocol, @@ -1700,7 +1701,7 @@ class BaseEventLoopWithSelectorTests(test_utils.TestCase): self.loop.run_until_complete(protocol.done) self.assertEqual('CLOSED', protocol.state) - @support.skip_unless_bind_unix_socket + @socket_helper.skip_unless_bind_unix_socket def test_create_datagram_endpoint_existing_sock_unix(self): with test_utils.unix_socket_path() as path: sock = socket.socket(socket.AF_UNIX, type=socket.SOCK_DGRAM) @@ -2015,7 +2016,7 @@ class BaseLoopSockSendfileTests(test_utils.TestCase): sock = self.make_socket() proto = self.MyProto(self.loop) server = self.run_loop(self.loop.create_server( - lambda: proto, support.HOST, 0, family=socket.AF_INET)) + lambda: proto, socket_helper.HOST, 0, family=socket.AF_INET)) addr = server.sockets[0].getsockname() for _ in range(10): diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py index 1f71c1f..aa4daca 100644 --- a/Lib/test/test_asyncio/test_events.py +++ b/Lib/test/test_asyncio/test_events.py @@ -32,6 +32,7 @@ from asyncio import proactor_events from asyncio import selector_events from test.test_asyncio import utils as test_utils from test import support +from test.support import socket_helper from test.support import ALWAYS_EQ, LARGEST, SMALLEST @@ -517,7 +518,7 @@ class EventLoopTestsMixin: lambda: MyProto(loop=self.loop), *httpd.address) self._basetest_create_connection(conn_fut) - @support.skip_unless_bind_unix_socket + @socket_helper.skip_unless_bind_unix_socket def test_create_unix_connection(self): # Issue #20682: On Mac OS X Tiger, getsockname() returns a # zero-length address for UNIX socket. @@ -619,7 +620,7 @@ class EventLoopTestsMixin: self._test_create_ssl_connection(httpd, create_connection, peername=httpd.address) - @support.skip_unless_bind_unix_socket + @socket_helper.skip_unless_bind_unix_socket @unittest.skipIf(ssl is None, 'No ssl module') def test_create_ssl_unix_connection(self): # Issue #20682: On Mac OS X Tiger, getsockname() returns a @@ -638,7 +639,7 @@ class EventLoopTestsMixin: def test_create_connection_local_addr(self): with test_utils.run_test_server() as httpd: - port = support.find_unused_port() + port = socket_helper.find_unused_port() f = self.loop.create_connection( lambda: MyProto(loop=self.loop), *httpd.address, local_addr=(httpd.address[0], port)) @@ -843,7 +844,7 @@ class EventLoopTestsMixin: return server, path - @support.skip_unless_bind_unix_socket + @socket_helper.skip_unless_bind_unix_socket def test_create_unix_server(self): proto = MyProto(loop=self.loop) server, path = self._make_unix_server(lambda: proto) @@ -935,7 +936,7 @@ class EventLoopTestsMixin: # stop serving server.close() - @support.skip_unless_bind_unix_socket + @socket_helper.skip_unless_bind_unix_socket @unittest.skipIf(ssl is None, 'No ssl module') def test_create_unix_server_ssl(self): proto = MyProto(loop=self.loop) @@ -995,7 +996,7 @@ class EventLoopTestsMixin: self.assertIsNone(proto.transport) server.close() - @support.skip_unless_bind_unix_socket + @socket_helper.skip_unless_bind_unix_socket @unittest.skipIf(ssl is None, 'No ssl module') def test_create_unix_server_ssl_verify_failed(self): proto = MyProto(loop=self.loop) @@ -1055,7 +1056,7 @@ class EventLoopTestsMixin: self.assertIsNone(proto.transport) server.close() - @support.skip_unless_bind_unix_socket + @socket_helper.skip_unless_bind_unix_socket @unittest.skipIf(ssl is None, 'No ssl module') def test_create_unix_server_ssl_verified(self): proto = MyProto(loop=self.loop) @@ -1148,7 +1149,7 @@ class EventLoopTestsMixin: server.close() - @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 not supported or enabled') + @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 not supported or enabled') def test_create_server_dual_stack(self): f_proto = self.loop.create_future() @@ -1160,7 +1161,7 @@ class EventLoopTestsMixin: try_count = 0 while True: try: - port = support.find_unused_port() + port = socket_helper.find_unused_port() f = self.loop.create_server(TestMyProto, host=None, port=port) server = self.loop.run_until_complete(f) except OSError as ex: diff --git a/Lib/test/test_asyncio/test_proactor_events.py b/Lib/test/test_asyncio/test_proactor_events.py index 007039a..b5d1df9 100644 --- a/Lib/test/test_asyncio/test_proactor_events.py +++ b/Lib/test/test_asyncio/test_proactor_events.py @@ -13,6 +13,7 @@ from asyncio.proactor_events import _ProactorWritePipeTransport from asyncio.proactor_events import _ProactorDuplexPipeTransport from asyncio.proactor_events import _ProactorDatagramTransport from test import support +from test.support import socket_helper from test.test_asyncio import utils as test_utils @@ -950,7 +951,7 @@ class ProactorEventLoopUnixSockSendfileTests(test_utils.TestCase): def prepare(self): sock = self.make_socket() proto = self.MyProto(self.loop) - port = support.find_unused_port() + port = socket_helper.find_unused_port() srv_sock = self.make_socket(cleanup=False) srv_sock.bind(('127.0.0.1', port)) server = self.run_loop(self.loop.create_server( diff --git a/Lib/test/test_asyncio/test_sendfile.py b/Lib/test/test_asyncio/test_sendfile.py index 3b7f784..dbce199 100644 --- a/Lib/test/test_asyncio/test_sendfile.py +++ b/Lib/test/test_asyncio/test_sendfile.py @@ -10,6 +10,7 @@ from asyncio import base_events from asyncio import constants from unittest import mock from test import support +from test.support import socket_helper from test.test_asyncio import utils as test_utils try: @@ -163,9 +164,9 @@ class SockSendfileMixin(SendfileBase): def prepare_socksendfile(self): proto = MyProto(self.loop) - port = support.find_unused_port() + port = socket_helper.find_unused_port() srv_sock = self.make_socket(cleanup=False) - srv_sock.bind((support.HOST, port)) + srv_sock.bind((socket_helper.HOST, port)) server = self.run_loop(self.loop.create_server( lambda: proto, sock=srv_sock)) self.reduce_receive_buffer_size(srv_sock) @@ -240,7 +241,7 @@ class SendfileMixin(SendfileBase): # Note: sendfile via SSL transport is equal to sendfile fallback def prepare_sendfile(self, *, is_ssl=False, close_after=0): - port = support.find_unused_port() + port = socket_helper.find_unused_port() srv_proto = MySendfileProto(loop=self.loop, close_after=close_after) if is_ssl: @@ -252,17 +253,17 @@ class SendfileMixin(SendfileBase): srv_ctx = None cli_ctx = None srv_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - srv_sock.bind((support.HOST, port)) + srv_sock.bind((socket_helper.HOST, port)) server = self.run_loop(self.loop.create_server( lambda: srv_proto, sock=srv_sock, ssl=srv_ctx)) self.reduce_receive_buffer_size(srv_sock) if is_ssl: - server_hostname = support.HOST + server_hostname = socket_helper.HOST else: server_hostname = None cli_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - cli_sock.connect((support.HOST, port)) + cli_sock.connect((socket_helper.HOST, port)) cli_proto = MySendfileProto(loop=self.loop) tr, pr = self.run_loop(self.loop.create_connection( diff --git a/Lib/test/test_asyncio/test_server.py b/Lib/test/test_asyncio/test_server.py index d47ccc0..006ead2 100644 --- a/Lib/test/test_asyncio/test_server.py +++ b/Lib/test/test_asyncio/test_server.py @@ -4,6 +4,7 @@ import threading import unittest from test import support +from test.support import socket_helper from test.test_asyncio import utils as test_utils from test.test_asyncio import functional as func_tests @@ -47,7 +48,7 @@ class BaseStartServer(func_tests.FunctionalTestCaseMixin): with self.assertWarns(DeprecationWarning): srv = self.loop.run_until_complete(asyncio.start_server( - serve, support.HOSTv4, 0, loop=self.loop, start_serving=False)) + serve, socket_helper.HOSTv4, 0, loop=self.loop, start_serving=False)) self.assertFalse(srv.is_serving()) @@ -73,7 +74,7 @@ class SelectorStartServerTests(BaseStartServer, unittest.TestCase): def new_loop(self): return asyncio.SelectorEventLoop() - @support.skip_unless_bind_unix_socket + @socket_helper.skip_unless_bind_unix_socket def test_start_unix_server_1(self): HELLO_MSG = b'1' * 1024 * 5 + b'\n' started = threading.Event() diff --git a/Lib/test/test_asyncio/test_sock_lowlevel.py b/Lib/test/test_asyncio/test_sock_lowlevel.py index 89c2af9..2f2d5a4 100644 --- a/Lib/test/test_asyncio/test_sock_lowlevel.py +++ b/Lib/test/test_asyncio/test_sock_lowlevel.py @@ -5,6 +5,7 @@ from asyncio import proactor_events from itertools import cycle, islice from test.test_asyncio import utils as test_utils from test import support +from test.support import socket_helper class MyProto(asyncio.Protocol): @@ -225,7 +226,7 @@ class BaseSockTestsMixin: self.loop.run_until_complete( self._basetest_huge_content_recvinto(httpd.address)) - @support.skip_unless_bind_unix_socket + @socket_helper.skip_unless_bind_unix_socket def test_unix_sock_client_ops(self): with test_utils.run_test_unix_server() as httpd: sock = socket.socket(socket.AF_UNIX) diff --git a/Lib/test/test_asyncio/test_streams.py b/Lib/test/test_asyncio/test_streams.py index 12bd536..1e9d115 100644 --- a/Lib/test/test_asyncio/test_streams.py +++ b/Lib/test/test_asyncio/test_streams.py @@ -9,7 +9,7 @@ import sys import threading import unittest from unittest import mock -from test import support +from test.support import socket_helper try: import ssl except ImportError: @@ -66,7 +66,7 @@ class StreamTests(test_utils.TestCase): loop=self.loop) self._basetest_open_connection(conn_fut) - @support.skip_unless_bind_unix_socket + @socket_helper.skip_unless_bind_unix_socket def test_open_unix_connection(self): with test_utils.run_test_unix_server() as httpd: conn_fut = asyncio.open_unix_connection(httpd.address, @@ -99,7 +99,7 @@ class StreamTests(test_utils.TestCase): self._basetest_open_connection_no_loop_ssl(conn_fut) - @support.skip_unless_bind_unix_socket + @socket_helper.skip_unless_bind_unix_socket @unittest.skipIf(ssl is None, 'No ssl module') def test_open_unix_connection_no_loop_ssl(self): with test_utils.run_test_unix_server(use_ssl=True) as httpd: @@ -130,7 +130,7 @@ class StreamTests(test_utils.TestCase): loop=self.loop) self._basetest_open_connection_error(conn_fut) - @support.skip_unless_bind_unix_socket + @socket_helper.skip_unless_bind_unix_socket def test_open_unix_connection_error(self): with test_utils.run_test_unix_server() as httpd: conn_fut = asyncio.open_unix_connection(httpd.address, @@ -653,7 +653,7 @@ class StreamTests(test_utils.TestCase): self.assertEqual(messages, []) - @support.skip_unless_bind_unix_socket + @socket_helper.skip_unless_bind_unix_socket def test_start_unix_server(self): class MyServer: diff --git a/Lib/test/test_asyncio/test_unix_events.py b/Lib/test/test_asyncio/test_unix_events.py index 5487b7a..10bd46d 100644 --- a/Lib/test/test_asyncio/test_unix_events.py +++ b/Lib/test/test_asyncio/test_unix_events.py @@ -15,6 +15,7 @@ import threading import unittest from unittest import mock from test import support +from test.support import socket_helper if sys.platform == 'win32': raise unittest.SkipTest('UNIX only') @@ -273,7 +274,7 @@ class SelectorEventLoopUnixSocketTests(test_utils.TestCase): self.loop = asyncio.SelectorEventLoop() self.set_event_loop(self.loop) - @support.skip_unless_bind_unix_socket + @socket_helper.skip_unless_bind_unix_socket def test_create_unix_server_existing_path_sock(self): with test_utils.unix_socket_path() as path: sock = socket.socket(socket.AF_UNIX) @@ -286,7 +287,7 @@ class SelectorEventLoopUnixSocketTests(test_utils.TestCase): srv.close() self.loop.run_until_complete(srv.wait_closed()) - @support.skip_unless_bind_unix_socket + @socket_helper.skip_unless_bind_unix_socket def test_create_unix_server_pathlib(self): with test_utils.unix_socket_path() as path: path = pathlib.Path(path) @@ -344,7 +345,7 @@ class SelectorEventLoopUnixSocketTests(test_utils.TestCase): @unittest.skipUnless(hasattr(socket, 'SOCK_NONBLOCK'), 'no socket.SOCK_NONBLOCK (linux only)') - @support.skip_unless_bind_unix_socket + @socket_helper.skip_unless_bind_unix_socket def test_create_unix_server_path_stream_bittype(self): sock = socket.socket( socket.AF_UNIX, socket.SOCK_STREAM | socket.SOCK_NONBLOCK) @@ -497,12 +498,12 @@ class SelectorEventLoopUnixSockSendfileTests(test_utils.TestCase): def prepare(self): sock = self.make_socket() proto = self.MyProto(self.loop) - port = support.find_unused_port() + port = socket_helper.find_unused_port() srv_sock = self.make_socket(cleanup=False) - srv_sock.bind((support.HOST, port)) + srv_sock.bind((socket_helper.HOST, port)) server = self.run_loop(self.loop.create_server( lambda: proto, sock=srv_sock)) - self.run_loop(self.loop.sock_connect(sock, (support.HOST, port))) + self.run_loop(self.loop.sock_connect(sock, (socket_helper.HOST, port))) self.run_loop(proto._ready) def cleanup(): diff --git a/Lib/test/test_asyncore.py b/Lib/test/test_asyncore.py index 6c84ac4..3c3abe4 100644 --- a/Lib/test/test_asyncore.py +++ b/Lib/test/test_asyncore.py @@ -10,6 +10,7 @@ import struct import threading from test import support +from test.support import socket_helper from io import BytesIO if support.PGO: @@ -91,7 +92,7 @@ def bind_af_aware(sock, addr): if HAS_UNIX_SOCKETS and sock.family == socket.AF_UNIX: # Make sure the path doesn't exist. support.unlink(addr) - support.bind_unix_socket(sock, addr) + socket_helper.bind_unix_socket(sock, addr) else: sock.bind(addr) @@ -327,7 +328,7 @@ class DispatcherWithSendTests(unittest.TestCase): evt = threading.Event() sock = socket.socket() sock.settimeout(3) - port = support.bind_port(sock) + port = socket_helper.bind_port(sock) cap = BytesIO() args = (evt, cap, sock) @@ -341,7 +342,7 @@ class DispatcherWithSendTests(unittest.TestCase): data = b"Suppose there isn't a 16-ton weight?" d = dispatcherwithsend_noread() d.create_socket() - d.connect((support.HOST, port)) + d.connect((socket_helper.HOST, port)) # give time for socket to connect time.sleep(0.1) @@ -791,12 +792,12 @@ class BaseTestAPI: class TestAPI_UseIPv4Sockets(BaseTestAPI): family = socket.AF_INET - addr = (support.HOST, 0) + addr = (socket_helper.HOST, 0) -@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 support required') +@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 support required') class TestAPI_UseIPv6Sockets(BaseTestAPI): family = socket.AF_INET6 - addr = (support.HOSTv6, 0) + addr = (socket_helper.HOSTv6, 0) @unittest.skipUnless(HAS_UNIX_SOCKETS, 'Unix sockets required') class TestAPI_UseUnixSockets(BaseTestAPI): diff --git a/Lib/test/test_ftplib.py b/Lib/test/test_ftplib.py index cf30a3d..e424076 100644 --- a/Lib/test/test_ftplib.py +++ b/Lib/test/test_ftplib.py @@ -19,7 +19,8 @@ except ImportError: from unittest import TestCase, skipUnless from test import support -from test.support import HOST, HOSTv6 +from test.support import socket_helper +from test.support.socket_helper import HOST, HOSTv6 TIMEOUT = support.LOOPBACK_TIMEOUT DEFAULT_ENCODING = 'utf-8' @@ -751,7 +752,7 @@ class TestFTPClass(TestCase): def test_source_address(self): self.client.quit() - port = support.find_unused_port() + port = socket_helper.find_unused_port() try: self.client.connect(self.server.host, self.server.port, source_address=(HOST, port)) @@ -763,7 +764,7 @@ class TestFTPClass(TestCase): raise def test_source_address_passive_connection(self): - port = support.find_unused_port() + port = socket_helper.find_unused_port() self.client.source_address = (HOST, port) try: with self.client.transfercmd('list') as sock: @@ -816,7 +817,7 @@ class TestFTPClass(TestCase): self.assertEqual(DEFAULT_ENCODING, client.encoding) -@skipUnless(support.IPV6_ENABLED, "IPv6 not enabled") +@skipUnless(socket_helper.IPV6_ENABLED, "IPv6 not enabled") class TestIPv6Environment(TestCase): def setUp(self): @@ -1007,7 +1008,7 @@ class TestTimeouts(TestCase): self.evt = threading.Event() self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock.settimeout(20) - self.port = support.bind_port(self.sock) + self.port = socket_helper.bind_port(self.sock) self.server_thread = threading.Thread(target=self.server) self.server_thread.daemon = True self.server_thread.start() diff --git a/Lib/test/test_httplib.py b/Lib/test/test_httplib.py index 77d4335..6b7a9de 100644 --- a/Lib/test/test_httplib.py +++ b/Lib/test/test_httplib.py @@ -13,6 +13,7 @@ import unittest TestCase = unittest.TestCase from test import support +from test.support import socket_helper here = os.path.dirname(__file__) # Self-signed cert file for 'localhost' @@ -42,7 +43,7 @@ last_chunk_extended = "0" + chunk_extension + "\r\n" trailers = "X-Dummy: foo\r\nX-Dumm2: bar\r\n" chunked_end = "\r\n" -HOST = support.HOST +HOST = socket_helper.HOST class FakeSocket: def __init__(self, text, fileclass=io.BytesIO, host=None, port=None): @@ -1463,8 +1464,8 @@ class OfflineTest(TestCase): class SourceAddressTest(TestCase): def setUp(self): self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.port = support.bind_port(self.serv) - self.source_port = support.find_unused_port() + self.port = socket_helper.bind_port(self.serv) + self.source_port = socket_helper.find_unused_port() self.serv.listen() self.conn = None @@ -1496,7 +1497,7 @@ class TimeoutTest(TestCase): def setUp(self): self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - TimeoutTest.PORT = support.bind_port(self.serv) + TimeoutTest.PORT = socket_helper.bind_port(self.serv) self.serv.listen() def tearDown(self): diff --git a/Lib/test/test_imaplib.py b/Lib/test/test_imaplib.py index 91aa771..7645666 100644 --- a/Lib/test/test_imaplib.py +++ b/Lib/test/test_imaplib.py @@ -1,4 +1,5 @@ from test import support +from test.support import socket_helper from contextlib import contextmanager import imaplib @@ -82,7 +83,7 @@ class TestImaplib(unittest.TestCase): pass # This is the exception that should be raised. - expected_errnos = support.get_socket_conn_refused_errs() + expected_errnos = socket_helper.get_socket_conn_refused_errs() with self.assertRaises(OSError) as cm: imaplib.IMAP4() self.assertIn(cm.exception.errno, expected_errnos) @@ -210,7 +211,7 @@ class NewIMAPTestsMixin(): raise self.addCleanup(self._cleanup) - self.server = self.server_class((support.HOST, 0), imap_handler) + self.server = self.server_class((socket_helper.HOST, 0), imap_handler) self.thread = threading.Thread( name=self._testMethodName+'-server', target=self.server.serve_forever, @@ -600,7 +601,7 @@ class ThreadedNetworkedTests(unittest.TestCase): @contextmanager def reaped_server(self, hdlr): - server, thread = self.make_server((support.HOST, 0), hdlr) + server, thread = self.make_server((socket_helper.HOST, 0), hdlr) try: yield server finally: diff --git a/Lib/test/test_largefile.py b/Lib/test/test_largefile.py index c254b04..a99b4ba 100644 --- a/Lib/test/test_largefile.py +++ b/Lib/test/test_largefile.py @@ -8,8 +8,9 @@ import unittest import socket import shutil import threading -from test.support import TESTFN, requires, unlink, bigmemtest, find_unused_port +from test.support import TESTFN, requires, unlink, bigmemtest from test.support import SHORT_TIMEOUT +from test.support import socket_helper import io # C implementation of io import _pyio as pyio # Python implementation of io @@ -219,7 +220,7 @@ class TestSocketSendfile(LargeFileTest, unittest.TestCase): # bit more tolerance. @skip_no_disk_space(TESTFN, size * 2.5) def test_it(self): - port = find_unused_port() + port = socket_helper.find_unused_port() with socket.create_server(("", port)) as sock: self.tcp_server(sock) with socket.create_connection(("127.0.0.1", port)) as client: diff --git a/Lib/test/test_logging.py b/Lib/test/test_logging.py index 99e74eb..241ed2c 100644 --- a/Lib/test/test_logging.py +++ b/Lib/test/test_logging.py @@ -43,6 +43,7 @@ import sys import tempfile from test.support.script_helper import assert_python_ok, assert_python_failure from test import support +from test.support import socket_helper import textwrap import threading import time @@ -1053,10 +1054,10 @@ class SMTPHandlerTest(BaseTest): def test_basic(self): sockmap = {} - server = TestSMTPServer((support.HOST, 0), self.process_message, 0.001, + server = TestSMTPServer((socket_helper.HOST, 0), self.process_message, 0.001, sockmap) server.start() - addr = (support.HOST, server.port) + addr = (socket_helper.HOST, server.port) h = logging.handlers.SMTPHandler(addr, 'me', 'you', 'Log', timeout=self.TIMEOUT) self.assertEqual(h.toaddrs, ['you']) @@ -1921,7 +1922,7 @@ class UnixSysLogHandlerTest(SysLogHandlerTest): SysLogHandlerTest.tearDown(self) support.unlink(self.address) -@unittest.skipUnless(support.IPV6_ENABLED, +@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 support required for this test.') class IPv6SysLogHandlerTest(SysLogHandlerTest): diff --git a/Lib/test/test_nntplib.py b/Lib/test/test_nntplib.py index fdd76f9..2a5a0b9 100644 --- a/Lib/test/test_nntplib.py +++ b/Lib/test/test_nntplib.py @@ -10,6 +10,7 @@ import re import threading from test import support +from test.support import socket_helper from nntplib import NNTP, GroupInfo import nntplib from unittest.mock import patch @@ -1555,14 +1556,14 @@ class MockSslTests(MockSocketTests): class LocalServerTests(unittest.TestCase): def setUp(self): sock = socket.socket() - port = support.bind_port(sock) + port = socket_helper.bind_port(sock) sock.listen() self.background = threading.Thread( target=self.run_server, args=(sock,)) self.background.start() self.addCleanup(self.background.join) - self.nntp = NNTP(support.HOST, port, usenetrc=False).__enter__() + self.nntp = NNTP(socket_helper.HOST, port, usenetrc=False).__enter__() self.addCleanup(self.nntp.__exit__, None, None, None) def run_server(self, sock): diff --git a/Lib/test/test_os.py b/Lib/test/test_os.py index 74aef47..362ba9e 100644 --- a/Lib/test/test_os.py +++ b/Lib/test/test_os.py @@ -30,6 +30,7 @@ import unittest import uuid import warnings from test import support +from test.support import socket_helper from platform import win32_is_iot try: @@ -3171,7 +3172,7 @@ class TestSendfile(unittest.TestCase): support.unlink(support.TESTFN) def setUp(self): - self.server = SendfileTestServer((support.HOST, 0)) + self.server = SendfileTestServer((socket_helper.HOST, 0)) self.server.start() self.client = socket.socket() self.client.connect((self.server.host, self.server.port)) diff --git a/Lib/test/test_poplib.py b/Lib/test/test_poplib.py index 7f06d19..d4877b1 100644 --- a/Lib/test/test_poplib.py +++ b/Lib/test/test_poplib.py @@ -13,8 +13,9 @@ import threading from unittest import TestCase, skipUnless from test import support as test_support +from test.support import socket_helper -HOST = test_support.HOST +HOST = socket_helper.HOST PORT = 0 SUPPORTS_SSL = False @@ -480,7 +481,7 @@ class TestTimeouts(TestCase): self.evt = threading.Event() self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock.settimeout(60) # Safety net. Look issue 11812 - self.port = test_support.bind_port(self.sock) + self.port = socket_helper.bind_port(self.sock) self.thread = threading.Thread(target=self.server, args=(self.evt, self.sock)) self.thread.daemon = True self.thread.start() diff --git a/Lib/test/test_robotparser.py b/Lib/test/test_robotparser.py index f28d8be..9d4764e 100644 --- a/Lib/test/test_robotparser.py +++ b/Lib/test/test_robotparser.py @@ -4,6 +4,7 @@ import threading import unittest import urllib.robotparser from test import support +from test.support import socket_helper from http.server import BaseHTTPRequestHandler, HTTPServer @@ -312,7 +313,7 @@ class PasswordProtectedSiteTestCase(unittest.TestCase): # clear _opener global variable self.addCleanup(urllib.request.urlcleanup) - self.server = HTTPServer((support.HOST, 0), RobotHandler) + self.server = HTTPServer((socket_helper.HOST, 0), RobotHandler) self.t = threading.Thread( name='HTTPServer serving', @@ -332,7 +333,7 @@ class PasswordProtectedSiteTestCase(unittest.TestCase): @support.reap_threads def testPasswordProtectedSite(self): addr = self.server.server_address - url = 'http://' + support.HOST + ':' + str(addr[1]) + url = 'http://' + socket_helper.HOST + ':' + str(addr[1]) robots_url = url + "/robots.txt" parser = urllib.robotparser.RobotFileParser() parser.set_url(url) diff --git a/Lib/test/test_selectors.py b/Lib/test/test_selectors.py index c449155..2274c39 100644 --- a/Lib/test/test_selectors.py +++ b/Lib/test/test_selectors.py @@ -6,6 +6,7 @@ import signal import socket import sys from test import support +from test.support import socket_helper from time import sleep import unittest import unittest.mock @@ -22,7 +23,7 @@ if hasattr(socket, 'socketpair'): else: def socketpair(family=socket.AF_INET, type=socket.SOCK_STREAM, proto=0): with socket.socket(family, type, proto) as l: - l.bind((support.HOST, 0)) + l.bind((socket_helper.HOST, 0)) l.listen() c = socket.socket(family, type, proto) try: diff --git a/Lib/test/test_smtpd.py b/Lib/test/test_smtpd.py index a9f7d5a..3be7739 100644 --- a/Lib/test/test_smtpd.py +++ b/Lib/test/test_smtpd.py @@ -1,6 +1,7 @@ import unittest import textwrap from test import support, mock_socket +from test.support import socket_helper import socket import io import smtpd @@ -38,7 +39,7 @@ class SMTPDServerTest(unittest.TestCase): smtpd.socket = asyncore.socket = mock_socket def test_process_message_unimplemented(self): - server = smtpd.SMTPServer((support.HOST, 0), ('b', 0), + server = smtpd.SMTPServer((socket_helper.HOST, 0), ('b', 0), decode_data=True) conn, addr = server.accept() channel = smtpd.SMTPChannel(server, conn, addr, decode_data=True) @@ -57,7 +58,7 @@ class SMTPDServerTest(unittest.TestCase): self.assertRaises( ValueError, smtpd.SMTPServer, - (support.HOST, 0), + (socket_helper.HOST, 0), ('b', 0), enable_SMTPUTF8=True, decode_data=True) @@ -87,7 +88,7 @@ class DebuggingServerTest(unittest.TestCase): write_line(b'.') def test_process_message_with_decode_data_true(self): - server = smtpd.DebuggingServer((support.HOST, 0), ('b', 0), + server = smtpd.DebuggingServer((socket_helper.HOST, 0), ('b', 0), decode_data=True) conn, addr = server.accept() channel = smtpd.SMTPChannel(server, conn, addr, decode_data=True) @@ -104,7 +105,7 @@ class DebuggingServerTest(unittest.TestCase): """)) def test_process_message_with_decode_data_false(self): - server = smtpd.DebuggingServer((support.HOST, 0), ('b', 0)) + server = smtpd.DebuggingServer((socket_helper.HOST, 0), ('b', 0)) conn, addr = server.accept() channel = smtpd.SMTPChannel(server, conn, addr) with support.captured_stdout() as s: @@ -120,7 +121,7 @@ class DebuggingServerTest(unittest.TestCase): """)) def test_process_message_with_enable_SMTPUTF8_true(self): - server = smtpd.DebuggingServer((support.HOST, 0), ('b', 0), + server = smtpd.DebuggingServer((socket_helper.HOST, 0), ('b', 0), enable_SMTPUTF8=True) conn, addr = server.accept() channel = smtpd.SMTPChannel(server, conn, addr, enable_SMTPUTF8=True) @@ -137,7 +138,7 @@ class DebuggingServerTest(unittest.TestCase): """)) def test_process_SMTPUTF8_message_with_enable_SMTPUTF8_true(self): - server = smtpd.DebuggingServer((support.HOST, 0), ('b', 0), + server = smtpd.DebuggingServer((socket_helper.HOST, 0), ('b', 0), enable_SMTPUTF8=True) conn, addr = server.accept() channel = smtpd.SMTPChannel(server, conn, addr, enable_SMTPUTF8=True) @@ -168,13 +169,13 @@ class TestFamilyDetection(unittest.TestCase): asyncore.close_all() asyncore.socket = smtpd.socket = socket - @unittest.skipUnless(support.IPV6_ENABLED, "IPv6 not enabled") + @unittest.skipUnless(socket_helper.IPV6_ENABLED, "IPv6 not enabled") def test_socket_uses_IPv6(self): - server = smtpd.SMTPServer((support.HOSTv6, 0), (support.HOSTv4, 0)) + server = smtpd.SMTPServer((socket_helper.HOSTv6, 0), (socket_helper.HOSTv4, 0)) self.assertEqual(server.socket.family, socket.AF_INET6) def test_socket_uses_IPv4(self): - server = smtpd.SMTPServer((support.HOSTv4, 0), (support.HOSTv6, 0)) + server = smtpd.SMTPServer((socket_helper.HOSTv4, 0), (socket_helper.HOSTv6, 0)) self.assertEqual(server.socket.family, socket.AF_INET) @@ -197,7 +198,7 @@ class TestRcptOptionParsing(unittest.TestCase): channel.handle_read() def test_params_rejected(self): - server = DummyServer((support.HOST, 0), ('b', 0)) + server = DummyServer((socket_helper.HOST, 0), ('b', 0)) conn, addr = server.accept() channel = smtpd.SMTPChannel(server, conn, addr) self.write_line(channel, b'EHLO example') @@ -206,7 +207,7 @@ class TestRcptOptionParsing(unittest.TestCase): self.assertEqual(channel.socket.last, self.error_response) def test_nothing_accepted(self): - server = DummyServer((support.HOST, 0), ('b', 0)) + server = DummyServer((socket_helper.HOST, 0), ('b', 0)) conn, addr = server.accept() channel = smtpd.SMTPChannel(server, conn, addr) self.write_line(channel, b'EHLO example') @@ -234,7 +235,7 @@ class TestMailOptionParsing(unittest.TestCase): channel.handle_read() def test_with_decode_data_true(self): - server = DummyServer((support.HOST, 0), ('b', 0), decode_data=True) + server = DummyServer((socket_helper.HOST, 0), ('b', 0), decode_data=True) conn, addr = server.accept() channel = smtpd.SMTPChannel(server, conn, addr, decode_data=True) self.write_line(channel, b'EHLO example') @@ -250,7 +251,7 @@ class TestMailOptionParsing(unittest.TestCase): self.assertEqual(channel.socket.last, b'250 OK\r\n') def test_with_decode_data_false(self): - server = DummyServer((support.HOST, 0), ('b', 0)) + server = DummyServer((socket_helper.HOST, 0), ('b', 0)) conn, addr = server.accept() channel = smtpd.SMTPChannel(server, conn, addr) self.write_line(channel, b'EHLO example') @@ -271,7 +272,7 @@ class TestMailOptionParsing(unittest.TestCase): self.assertEqual(channel.socket.last, b'250 OK\r\n') def test_with_enable_smtputf8_true(self): - server = DummyServer((support.HOST, 0), ('b', 0), enable_SMTPUTF8=True) + server = DummyServer((socket_helper.HOST, 0), ('b', 0), enable_SMTPUTF8=True) conn, addr = server.accept() channel = smtpd.SMTPChannel(server, conn, addr, enable_SMTPUTF8=True) self.write_line(channel, b'EHLO example') @@ -286,7 +287,7 @@ class SMTPDChannelTest(unittest.TestCase): smtpd.socket = asyncore.socket = mock_socket self.old_debugstream = smtpd.DEBUGSTREAM self.debug = smtpd.DEBUGSTREAM = io.StringIO() - self.server = DummyServer((support.HOST, 0), ('b', 0), + self.server = DummyServer((socket_helper.HOST, 0), ('b', 0), decode_data=True) conn, addr = self.server.accept() self.channel = smtpd.SMTPChannel(self.server, conn, addr, @@ -304,7 +305,7 @@ class SMTPDChannelTest(unittest.TestCase): def test_broken_connect(self): self.assertRaises( DummyDispatcherBroken, BrokenDummyServer, - (support.HOST, 0), ('b', 0), decode_data=True) + (socket_helper.HOST, 0), ('b', 0), decode_data=True) def test_decode_data_and_enable_SMTPUTF8_raises(self): self.assertRaises( @@ -758,13 +759,13 @@ class SMTPDChannelTest(unittest.TestCase): with support.check_warnings(('', DeprecationWarning)): self.channel._SMTPChannel__addr = 'spam' -@unittest.skipUnless(support.IPV6_ENABLED, "IPv6 not enabled") +@unittest.skipUnless(socket_helper.IPV6_ENABLED, "IPv6 not enabled") class SMTPDChannelIPv6Test(SMTPDChannelTest): def setUp(self): smtpd.socket = asyncore.socket = mock_socket self.old_debugstream = smtpd.DEBUGSTREAM self.debug = smtpd.DEBUGSTREAM = io.StringIO() - self.server = DummyServer((support.HOSTv6, 0), ('b', 0), + self.server = DummyServer((socket_helper.HOSTv6, 0), ('b', 0), decode_data=True) conn, addr = self.server.accept() self.channel = smtpd.SMTPChannel(self.server, conn, addr, @@ -776,7 +777,7 @@ class SMTPDChannelWithDataSizeLimitTest(unittest.TestCase): smtpd.socket = asyncore.socket = mock_socket self.old_debugstream = smtpd.DEBUGSTREAM self.debug = smtpd.DEBUGSTREAM = io.StringIO() - self.server = DummyServer((support.HOST, 0), ('b', 0), + self.server = DummyServer((socket_helper.HOST, 0), ('b', 0), decode_data=True) conn, addr = self.server.accept() # Set DATA size limit to 32 bytes for easy testing @@ -831,7 +832,7 @@ class SMTPDChannelWithDecodeDataFalse(unittest.TestCase): smtpd.socket = asyncore.socket = mock_socket self.old_debugstream = smtpd.DEBUGSTREAM self.debug = smtpd.DEBUGSTREAM = io.StringIO() - self.server = DummyServer((support.HOST, 0), ('b', 0)) + self.server = DummyServer((socket_helper.HOST, 0), ('b', 0)) conn, addr = self.server.accept() self.channel = smtpd.SMTPChannel(self.server, conn, addr) @@ -873,7 +874,7 @@ class SMTPDChannelWithDecodeDataTrue(unittest.TestCase): smtpd.socket = asyncore.socket = mock_socket self.old_debugstream = smtpd.DEBUGSTREAM self.debug = smtpd.DEBUGSTREAM = io.StringIO() - self.server = DummyServer((support.HOST, 0), ('b', 0), + self.server = DummyServer((socket_helper.HOST, 0), ('b', 0), decode_data=True) conn, addr = self.server.accept() # Set decode_data to True @@ -916,7 +917,7 @@ class SMTPDChannelTestWithEnableSMTPUTF8True(unittest.TestCase): smtpd.socket = asyncore.socket = mock_socket self.old_debugstream = smtpd.DEBUGSTREAM self.debug = smtpd.DEBUGSTREAM = io.StringIO() - self.server = DummyServer((support.HOST, 0), ('b', 0), + self.server = DummyServer((socket_helper.HOST, 0), ('b', 0), enable_SMTPUTF8=True) conn, addr = self.server.accept() self.channel = smtpd.SMTPChannel(self.server, conn, addr, diff --git a/Lib/test/test_smtplib.py b/Lib/test/test_smtplib.py index 067c01c..d1ffb36 100644 --- a/Lib/test/test_smtplib.py +++ b/Lib/test/test_smtplib.py @@ -20,11 +20,12 @@ import threading import unittest from test import support, mock_socket -from test.support import HOST +from test.support import socket_helper from test.support import threading_setup, threading_cleanup, join_thread from test.support import requires_hashdigest from unittest.mock import Mock +HOST = socket_helper.HOST if sys.platform == 'darwin': # select.poll returns a select.POLLHUP at the end of the tests @@ -271,7 +272,7 @@ class DebuggingServerTests(unittest.TestCase): def testSourceAddress(self): # connect - src_port = support.find_unused_port() + src_port = socket_helper.find_unused_port() try: smtp = smtplib.SMTP(self.host, self.port, local_hostname='localhost', timeout=support.LOOPBACK_TIMEOUT, @@ -711,7 +712,7 @@ class TooLongLineTests(unittest.TestCase): self.evt = threading.Event() self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock.settimeout(15) - self.port = support.bind_port(self.sock) + self.port = socket_helper.bind_port(self.sock) servargs = (self.evt, self.respdata, self.sock) self.thread = threading.Thread(target=server, args=servargs) self.thread.start() diff --git a/Lib/test/test_socket.py b/Lib/test/test_socket.py index 6e4e4fe..a70e282 100755 --- a/Lib/test/test_socket.py +++ b/Lib/test/test_socket.py @@ -1,5 +1,6 @@ import unittest from test import support +from test.support import socket_helper import errno import io @@ -34,7 +35,7 @@ try: except ImportError: fcntl = None -HOST = support.HOST +HOST = socket_helper.HOST # test unicode string and carriage return MSG = 'Michael Gilfix was here\u1234\r\n'.encode('utf-8') @@ -161,7 +162,7 @@ class SocketTCPTest(unittest.TestCase): def setUp(self): self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.port = support.bind_port(self.serv) + self.port = socket_helper.bind_port(self.serv) self.serv.listen() def tearDown(self): @@ -172,7 +173,7 @@ class SocketUDPTest(unittest.TestCase): def setUp(self): self.serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - self.port = support.bind_port(self.serv) + self.port = socket_helper.bind_port(self.serv) def tearDown(self): self.serv.close() @@ -182,7 +183,7 @@ class SocketUDPLITETest(SocketUDPTest): def setUp(self): self.serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDPLITE) - self.port = support.bind_port(self.serv) + self.port = socket_helper.bind_port(self.serv) class ThreadSafeCleanupTestCase(unittest.TestCase): """Subclass of unittest.TestCase with thread-safe cleanup methods. @@ -265,7 +266,7 @@ class SocketRDSTest(unittest.TestCase): self.serv = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0) self.addCleanup(self.serv.close) try: - self.port = support.bind_port(self.serv) + self.port = socket_helper.bind_port(self.serv) except OSError: self.skipTest('unable to bind RDS socket') @@ -682,7 +683,7 @@ class UnixSocketTestBase(SocketTestBase): def bindSock(self, sock): path = tempfile.mktemp(dir=self.dir_path) - support.bind_unix_socket(sock, path) + socket_helper.bind_unix_socket(sock, path) self.addCleanup(support.unlink, path) class UnixStreamBase(UnixSocketTestBase): @@ -702,7 +703,7 @@ class InetTestBase(SocketTestBase): self.port = self.serv_addr[1] def bindSock(self, sock): - support.bind_port(sock, host=self.host) + socket_helper.bind_port(sock, host=self.host) class TCPTestBase(InetTestBase): """Base class for TCP-over-IPv4 tests.""" @@ -733,7 +734,7 @@ class SCTPStreamBase(InetTestBase): class Inet6TestBase(InetTestBase): """Base class for IPv6 socket tests.""" - host = support.HOSTv6 + host = socket_helper.HOSTv6 class UDP6TestBase(Inet6TestBase): """Base class for UDP-over-IPv6 tests.""" @@ -966,12 +967,12 @@ class GeneralModuleTests(unittest.TestCase): self.fail("Error testing host resolution mechanisms. (fqdn: %s, all: %s)" % (fqhn, repr(all_host_names))) def test_host_resolution(self): - for addr in [support.HOSTv4, '10.0.0.1', '255.255.255.255']: + for addr in [socket_helper.HOSTv4, '10.0.0.1', '255.255.255.255']: self.assertEqual(socket.gethostbyname(addr), addr) - # we don't test support.HOSTv6 because there's a chance it doesn't have + # we don't test socket_helper.HOSTv6 because there's a chance it doesn't have # a matching name entry (e.g. 'ip6-localhost') - for host in [support.HOSTv4]: + for host in [socket_helper.HOSTv4]: self.assertIn(host, socket.gethostbyaddr(host)[2]) def test_host_resolution_bad_address(self): @@ -1334,7 +1335,7 @@ class GeneralModuleTests(unittest.TestCase): def testSockName(self): # Testing getsockname() - port = support.find_unused_port() + port = socket_helper.find_unused_port() sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.addCleanup(sock.close) sock.bind(("0.0.0.0", port)) @@ -1400,7 +1401,7 @@ class GeneralModuleTests(unittest.TestCase): def test_getsockaddrarg(self): sock = socket.socket() self.addCleanup(sock.close) - port = support.find_unused_port() + port = socket_helper.find_unused_port() big_port = port + 65536 neg_port = port - 65536 self.assertRaises(OverflowError, sock.bind, (HOST, big_port)) @@ -1408,7 +1409,7 @@ class GeneralModuleTests(unittest.TestCase): # Since find_unused_port() is inherently subject to race conditions, we # call it a couple times if necessary. for i in itertools.count(): - port = support.find_unused_port() + port = socket_helper.find_unused_port() try: sock.bind((HOST, port)) except OSError as e: @@ -1461,7 +1462,7 @@ class GeneralModuleTests(unittest.TestCase): socket.getaddrinfo('localhost', 80) socket.getaddrinfo('127.0.0.1', 80) socket.getaddrinfo(None, 80) - if support.IPV6_ENABLED: + if socket_helper.IPV6_ENABLED: socket.getaddrinfo('::1', 80) # port can be a string service name such as "http", a numeric # port number or None @@ -1674,14 +1675,14 @@ class GeneralModuleTests(unittest.TestCase): srv.bind((HOST, 0)) self.assertRaises(OverflowError, srv.listen, _testcapi.INT_MAX + 1) - @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.') + @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.') def test_flowinfo(self): self.assertRaises(OverflowError, socket.getnameinfo, - (support.HOSTv6, 0, 0xffffffff), 0) + (socket_helper.HOSTv6, 0, 0xffffffff), 0) with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as s: - self.assertRaises(OverflowError, s.bind, (support.HOSTv6, 0, -10)) + self.assertRaises(OverflowError, s.bind, (socket_helper.HOSTv6, 0, -10)) - @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.') + @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.') def test_getaddrinfo_ipv6_basic(self): ((*_, sockaddr),) = socket.getaddrinfo( 'ff02::1de:c0:face:8D', # Note capital letter `D`. @@ -1691,7 +1692,7 @@ class GeneralModuleTests(unittest.TestCase): ) self.assertEqual(sockaddr, ('ff02::1de:c0:face:8d', 1234, 0, 0)) - @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.') + @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.') @unittest.skipIf(sys.platform == 'win32', 'does not work on Windows') @unittest.skipIf(AIX, 'Symbolic scope id does not work') def test_getaddrinfo_ipv6_scopeid_symbolic(self): @@ -1706,7 +1707,7 @@ class GeneralModuleTests(unittest.TestCase): # Note missing interface name part in IPv6 address self.assertEqual(sockaddr, ('ff02::1de:c0:face:8d', 1234, 0, ifindex)) - @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.') + @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.') @unittest.skipUnless( sys.platform == 'win32', 'Numeric scope id does not work or undocumented') @@ -1723,7 +1724,7 @@ class GeneralModuleTests(unittest.TestCase): # Note missing interface name part in IPv6 address self.assertEqual(sockaddr, ('ff02::1de:c0:face:8d', 1234, 0, ifindex)) - @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.') + @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.') @unittest.skipIf(sys.platform == 'win32', 'does not work on Windows') @unittest.skipIf(AIX, 'Symbolic scope id does not work') def test_getnameinfo_ipv6_scopeid_symbolic(self): @@ -1733,7 +1734,7 @@ class GeneralModuleTests(unittest.TestCase): nameinfo = socket.getnameinfo(sockaddr, socket.NI_NUMERICHOST | socket.NI_NUMERICSERV) self.assertEqual(nameinfo, ('ff02::1de:c0:face:8d%' + test_interface, '1234')) - @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.') + @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.') @unittest.skipUnless( sys.platform == 'win32', 'Numeric scope id does not work or undocumented') def test_getnameinfo_ipv6_scopeid_numeric(self): @@ -1826,19 +1827,19 @@ class GeneralModuleTests(unittest.TestCase): def test_socket_fileno(self): s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.addCleanup(s.close) - s.bind((support.HOST, 0)) + s.bind((socket_helper.HOST, 0)) self._test_socket_fileno(s, socket.AF_INET, socket.SOCK_STREAM) if hasattr(socket, "SOCK_DGRAM"): s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.addCleanup(s.close) - s.bind((support.HOST, 0)) + s.bind((socket_helper.HOST, 0)) self._test_socket_fileno(s, socket.AF_INET, socket.SOCK_DGRAM) - if support.IPV6_ENABLED: + if socket_helper.IPV6_ENABLED: s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) self.addCleanup(s.close) - s.bind((support.HOSTv6, 0, 0, 0)) + s.bind((socket_helper.HOSTv6, 0, 0, 0)) self._test_socket_fileno(s, socket.AF_INET6, socket.SOCK_STREAM) if hasattr(socket, "AF_UNIX"): @@ -2214,12 +2215,12 @@ class BasicQIPCRTRTest(unittest.TestCase): def testBindSock(self): with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM) as s: - support.bind_port(s, host=s.getsockname()[0]) + socket_helper.bind_port(s, host=s.getsockname()[0]) self.assertNotEqual(s.getsockname()[1], 0) def testInvalidBindSock(self): with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM) as s: - self.assertRaises(OSError, support.bind_port, s, host=-2) + self.assertRaises(OSError, socket_helper.bind_port, s, host=-2) def testAutoBindSock(self): with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM) as s: @@ -4094,25 +4095,25 @@ class SendrecvmsgUDP6TestBase(SendrecvmsgDgramFlagsBase, self.assertEqual(addr1[:-1], addr2[:-1]) @requireAttrs(socket.socket, "sendmsg") -@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.') +@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.') @requireSocket("AF_INET6", "SOCK_DGRAM") class SendmsgUDP6Test(SendmsgConnectionlessTests, SendrecvmsgUDP6TestBase): pass @requireAttrs(socket.socket, "recvmsg") -@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.') +@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.') @requireSocket("AF_INET6", "SOCK_DGRAM") class RecvmsgUDP6Test(RecvmsgTests, SendrecvmsgUDP6TestBase): pass @requireAttrs(socket.socket, "recvmsg_into") -@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.') +@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.') @requireSocket("AF_INET6", "SOCK_DGRAM") class RecvmsgIntoUDP6Test(RecvmsgIntoTests, SendrecvmsgUDP6TestBase): pass @requireAttrs(socket.socket, "recvmsg") -@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.') +@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.') @requireAttrs(socket, "IPPROTO_IPV6") @requireSocket("AF_INET6", "SOCK_DGRAM") class RecvmsgRFC3542AncillaryUDP6Test(RFC3542AncillaryTest, @@ -4120,7 +4121,7 @@ class RecvmsgRFC3542AncillaryUDP6Test(RFC3542AncillaryTest, pass @requireAttrs(socket.socket, "recvmsg_into") -@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.') +@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.') @requireAttrs(socket, "IPPROTO_IPV6") @requireSocket("AF_INET6", "SOCK_DGRAM") class RecvmsgIntoRFC3542AncillaryUDP6Test(RecvmsgIntoMixin, @@ -4167,7 +4168,7 @@ class SendrecvmsgUDPLITE6TestBase(SendrecvmsgDgramFlagsBase, self.assertEqual(addr1[:-1], addr2[:-1]) @requireAttrs(socket.socket, "sendmsg") -@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.') +@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.') @unittest.skipUnless(HAVE_SOCKET_UDPLITE, 'UDPLITE sockets required for this test.') @requireSocket("AF_INET6", "SOCK_DGRAM") @@ -4175,7 +4176,7 @@ class SendmsgUDPLITE6Test(SendmsgConnectionlessTests, SendrecvmsgUDPLITE6TestBas pass @requireAttrs(socket.socket, "recvmsg") -@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.') +@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.') @unittest.skipUnless(HAVE_SOCKET_UDPLITE, 'UDPLITE sockets required for this test.') @requireSocket("AF_INET6", "SOCK_DGRAM") @@ -4183,7 +4184,7 @@ class RecvmsgUDPLITE6Test(RecvmsgTests, SendrecvmsgUDPLITE6TestBase): pass @requireAttrs(socket.socket, "recvmsg_into") -@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.') +@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.') @unittest.skipUnless(HAVE_SOCKET_UDPLITE, 'UDPLITE sockets required for this test.') @requireSocket("AF_INET6", "SOCK_DGRAM") @@ -4191,7 +4192,7 @@ class RecvmsgIntoUDPLITE6Test(RecvmsgIntoTests, SendrecvmsgUDPLITE6TestBase): pass @requireAttrs(socket.socket, "recvmsg") -@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.') +@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.') @unittest.skipUnless(HAVE_SOCKET_UDPLITE, 'UDPLITE sockets required for this test.') @requireAttrs(socket, "IPPROTO_IPV6") @@ -4201,7 +4202,7 @@ class RecvmsgRFC3542AncillaryUDPLITE6Test(RFC3542AncillaryTest, pass @requireAttrs(socket.socket, "recvmsg_into") -@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.') +@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.') @unittest.skipUnless(HAVE_SOCKET_UDPLITE, 'UDPLITE sockets required for this test.') @requireAttrs(socket, "IPPROTO_IPV6") @@ -4999,7 +5000,7 @@ class NetworkConnectionNoServer(unittest.TestCase): socket.socket = old_socket def test_connect(self): - port = support.find_unused_port() + port = socket_helper.find_unused_port() cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.addCleanup(cli.close) with self.assertRaises(OSError) as cm: @@ -5009,7 +5010,7 @@ class NetworkConnectionNoServer(unittest.TestCase): def test_create_connection(self): # Issue #9792: errors raised by create_connection() should have # a proper errno attribute. - port = support.find_unused_port() + port = socket_helper.find_unused_port() with self.assertRaises(OSError) as cm: socket.create_connection((HOST, port)) @@ -5027,7 +5028,7 @@ class NetworkConnectionNoServer(unittest.TestCase): # On Solaris, ENETUNREACH is returned in this circumstance instead # of ECONNREFUSED. So, if that errno exists, add it to our list of # expected errnos. - expected_errnos = support.get_socket_conn_refused_errs() + expected_errnos = socket_helper.get_socket_conn_refused_errs() self.assertIn(cm.exception.errno, expected_errnos) def test_create_connection_timeout(self): @@ -5039,7 +5040,7 @@ class NetworkConnectionNoServer(unittest.TestCase): except socket.timeout: pass except OSError as exc: - if support.IPV6_ENABLED or exc.errno != errno.EAFNOSUPPORT: + if socket_helper.IPV6_ENABLED or exc.errno != errno.EAFNOSUPPORT: raise else: self.fail('socket.timeout not raised') @@ -5052,7 +5053,7 @@ class NetworkConnectionAttributesTest(SocketTCPTest, ThreadableTest): ThreadableTest.__init__(self) def clientSetUp(self): - self.source_port = support.find_unused_port() + self.source_port = socket_helper.find_unused_port() def clientTearDown(self): self.cli.close() @@ -5338,7 +5339,7 @@ class TestUnixDomain(unittest.TestCase): def bind(self, sock, path): # Bind the socket try: - support.bind_unix_socket(sock, path) + socket_helper.bind_unix_socket(sock, path) except OSError as e: if str(e) == "AF_UNIX path too long": self.skipTest( @@ -6328,11 +6329,11 @@ class TestMSWindowsTCPFlags(unittest.TestCase): class CreateServerTest(unittest.TestCase): def test_address(self): - port = support.find_unused_port() + port = socket_helper.find_unused_port() with socket.create_server(("127.0.0.1", port)) as sock: self.assertEqual(sock.getsockname()[0], "127.0.0.1") self.assertEqual(sock.getsockname()[1], port) - if support.IPV6_ENABLED: + if socket_helper.IPV6_ENABLED: with socket.create_server(("::1", port), family=socket.AF_INET6) as sock: self.assertEqual(sock.getsockname()[0], "::1") @@ -6342,7 +6343,7 @@ class CreateServerTest(unittest.TestCase): with socket.create_server(("127.0.0.1", 0)) as sock: self.assertEqual(sock.family, socket.AF_INET) self.assertEqual(sock.type, socket.SOCK_STREAM) - if support.IPV6_ENABLED: + if socket_helper.IPV6_ENABLED: with socket.create_server(("::1", 0), family=socket.AF_INET6) as s: self.assertEqual(s.family, socket.AF_INET6) self.assertEqual(sock.type, socket.SOCK_STREAM) @@ -6362,14 +6363,14 @@ class CreateServerTest(unittest.TestCase): @unittest.skipIf(not hasattr(_socket, 'IPPROTO_IPV6') or not hasattr(_socket, 'IPV6_V6ONLY'), "IPV6_V6ONLY option not supported") - @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test') + @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test') def test_ipv6_only_default(self): with socket.create_server(("::1", 0), family=socket.AF_INET6) as sock: assert sock.getsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY) @unittest.skipIf(not socket.has_dualstack_ipv6(), "dualstack_ipv6 not supported") - @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test') + @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test') def test_dualstack_ipv6_family(self): with socket.create_server(("::1", 0), family=socket.AF_INET6, dualstack_ipv6=True) as sock: @@ -6411,14 +6412,14 @@ class CreateServerFunctionalTest(unittest.TestCase): self.assertEqual(sock.recv(1024), b'foo') def test_tcp4(self): - port = support.find_unused_port() + port = socket_helper.find_unused_port() with socket.create_server(("", port)) as sock: self.echo_server(sock) self.echo_client(("127.0.0.1", port), socket.AF_INET) - @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test') + @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test') def test_tcp6(self): - port = support.find_unused_port() + port = socket_helper.find_unused_port() with socket.create_server(("", port), family=socket.AF_INET6) as sock: self.echo_server(sock) @@ -6428,9 +6429,9 @@ class CreateServerFunctionalTest(unittest.TestCase): @unittest.skipIf(not socket.has_dualstack_ipv6(), "dualstack_ipv6 not supported") - @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test') + @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test') def test_dual_stack_client_v4(self): - port = support.find_unused_port() + port = socket_helper.find_unused_port() with socket.create_server(("", port), family=socket.AF_INET6, dualstack_ipv6=True) as sock: self.echo_server(sock) @@ -6438,9 +6439,9 @@ class CreateServerFunctionalTest(unittest.TestCase): @unittest.skipIf(not socket.has_dualstack_ipv6(), "dualstack_ipv6 not supported") - @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test') + @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test') def test_dual_stack_client_v6(self): - port = support.find_unused_port() + port = socket_helper.find_unused_port() with socket.create_server(("", port), family=socket.AF_INET6, dualstack_ipv6=True) as sock: self.echo_server(sock) diff --git a/Lib/test/test_socketserver.py b/Lib/test/test_socketserver.py index f818df0..c663cc9 100644 --- a/Lib/test/test_socketserver.py +++ b/Lib/test/test_socketserver.py @@ -15,12 +15,13 @@ import socketserver import test.support from test.support import reap_children, reap_threads, verbose +from test.support import socket_helper test.support.requires("network") TEST_STR = b"hello world\n" -HOST = test.support.HOST +HOST = socket_helper.HOST HAVE_UNIX_SOCKETS = hasattr(socket, "AF_UNIX") requires_unix_sockets = unittest.skipUnless(HAVE_UNIX_SOCKETS, diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py index 4184665..dafdb6c 100644 --- a/Lib/test/test_ssl.py +++ b/Lib/test/test_ssl.py @@ -4,6 +4,7 @@ import sys import unittest import unittest.mock from test import support +from test.support import socket_helper import socket import select import time @@ -33,7 +34,7 @@ Py_DEBUG = hasattr(sys, 'gettotalrefcount') Py_DEBUG_WIN32 = Py_DEBUG and sys.platform == 'win32' PROTOCOLS = sorted(ssl._PROTOCOL_NAMES) -HOST = support.HOST +HOST = socket_helper.HOST IS_LIBRESSL = ssl.OPENSSL_VERSION.startswith('LibreSSL') IS_OPENSSL_1_1_0 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0) IS_OPENSSL_1_1_1 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 1) @@ -762,7 +763,7 @@ class BasicSocketTests(unittest.TestCase): fail(cert, 'example.net') # -- IPv6 matching -- - if support.IPV6_ENABLED: + if socket_helper.IPV6_ENABLED: cert = {'subject': ((('commonName', 'example.com'),),), 'subjectAltName': ( ('DNS', 'example.com'), @@ -845,7 +846,7 @@ class BasicSocketTests(unittest.TestCase): ssl._inet_paton(invalid) for ipaddr in ['127.0.0.1', '192.168.0.1']: self.assertTrue(ssl._inet_paton(ipaddr)) - if support.IPV6_ENABLED: + if socket_helper.IPV6_ENABLED: for ipaddr in ['::1', '2001:db8:85a3::8a2e:370:7334']: self.assertTrue(ssl._inet_paton(ipaddr)) @@ -1073,7 +1074,7 @@ class BasicSocketTests(unittest.TestCase): def test_connect_ex_error(self): server = socket.socket(socket.AF_INET) self.addCleanup(server.close) - port = support.bind_port(server) # Reserve port but don't listen + port = socket_helper.bind_port(server) # Reserve port but don't listen s = test_wrap_socket(socket.socket(socket.AF_INET), cert_reqs=ssl.CERT_REQUIRED) self.addCleanup(s.close) @@ -2256,7 +2257,7 @@ class NetworkedTests(unittest.TestCase): self.skipTest("REMOTE_HOST responded too quickly") self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK)) - @unittest.skipUnless(support.IPV6_ENABLED, 'Needs IPv6') + @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'Needs IPv6') def test_get_server_certificate_ipv6(self): with support.transient_internet('ipv6.google.com'): _test_get_server_certificate(self, 'ipv6.google.com', 443) @@ -2511,7 +2512,7 @@ class ThreadedEchoServer(threading.Thread): self.connectionchatty = connectionchatty self.starttls_server = starttls_server self.sock = socket.socket() - self.port = support.bind_port(self.sock) + self.port = socket_helper.bind_port(self.sock) self.flag = None self.active = False self.selected_npn_protocols = [] @@ -2624,7 +2625,7 @@ class AsyncoreEchoServer(threading.Thread): def __init__(self, certfile): self.certfile = certfile sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.port = support.bind_port(sock, '') + self.port = socket_helper.bind_port(sock, '') asyncore.dispatcher.__init__(self, sock) self.listen(5) @@ -3144,7 +3145,7 @@ class ThreadedTests(unittest.TestCase): listener_gone = threading.Event() s = socket.socket() - port = support.bind_port(s, HOST) + port = socket_helper.bind_port(s, HOST) # `listener` runs in a thread. It sits in an accept() until # the main thread connects. Then it rudely closes the socket, @@ -3640,7 +3641,7 @@ class ThreadedTests(unittest.TestCase): # Issue #5103: SSL handshake must respect the socket timeout server = socket.socket(socket.AF_INET) host = "127.0.0.1" - port = support.bind_port(server) + port = socket_helper.bind_port(server) started = threading.Event() finish = False @@ -3694,7 +3695,7 @@ class ThreadedTests(unittest.TestCase): context.load_cert_chain(SIGNED_CERTFILE) server = socket.socket(socket.AF_INET) host = "127.0.0.1" - port = support.bind_port(server) + port = socket_helper.bind_port(server) server = context.wrap_socket(server, server_side=True) self.assertTrue(server.server_side) diff --git a/Lib/test/test_stat.py b/Lib/test/test_stat.py index be01db2..d9e4ffd 100644 --- a/Lib/test/test_stat.py +++ b/Lib/test/test_stat.py @@ -2,8 +2,8 @@ import unittest import os import socket import sys -from test.support import (TESTFN, import_fresh_module, - skip_unless_bind_unix_socket) +from test.support import socket_helper +from test.support import TESTFN, import_fresh_module c_stat = import_fresh_module('stat', fresh=['_stat']) py_stat = import_fresh_module('stat', blocked=['_stat']) @@ -193,7 +193,7 @@ class TestFilemode: self.assertS_IS("BLK", st_mode) break - @skip_unless_bind_unix_socket + @socket_helper.skip_unless_bind_unix_socket def test_socket(self): with socket.socket(socket.AF_UNIX) as s: s.bind(TESTFN) diff --git a/Lib/test/test_support.py b/Lib/test/test_support.py index dee1db7..606e570 100644 --- a/Lib/test/test_support.py +++ b/Lib/test/test_support.py @@ -14,6 +14,7 @@ import time import unittest from test import support from test.support import script_helper +from test.support import socket_helper TESTFN = support.TESTFN @@ -91,17 +92,17 @@ class TestSupport(unittest.TestCase): support.rmtree('__pycache__') def test_HOST(self): - s = socket.create_server((support.HOST, 0)) + s = socket.create_server((socket_helper.HOST, 0)) s.close() def test_find_unused_port(self): - port = support.find_unused_port() - s = socket.create_server((support.HOST, port)) + port = socket_helper.find_unused_port() + s = socket.create_server((socket_helper.HOST, port)) s.close() def test_bind_port(self): s = socket.socket() - support.bind_port(s) + socket_helper.bind_port(s) s.listen() s.close() diff --git a/Lib/test/test_telnetlib.py b/Lib/test/test_telnetlib.py index 414c328..7633901 100644 --- a/Lib/test/test_telnetlib.py +++ b/Lib/test/test_telnetlib.py @@ -5,9 +5,10 @@ import threading import contextlib from test import support +from test.support import socket_helper import unittest -HOST = support.HOST +HOST = socket_helper.HOST def server(evt, serv): serv.listen() @@ -26,7 +27,7 @@ class GeneralTests(unittest.TestCase): self.evt = threading.Event() self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock.settimeout(60) # Safety net. Look issue 11812 - self.port = support.bind_port(self.sock) + self.port = socket_helper.bind_port(self.sock) self.thread = threading.Thread(target=server, args=(self.evt,self.sock)) self.thread.setDaemon(True) self.thread.start() diff --git a/Lib/test/test_timeout.py b/Lib/test/test_timeout.py index ff41b13..c0952c7 100644 --- a/Lib/test/test_timeout.py +++ b/Lib/test/test_timeout.py @@ -3,6 +3,7 @@ import functools import unittest from test import support +from test.support import socket_helper # This requires the 'network' resource as given on the regrtest command line. skip_expected = not support.is_resource_enabled('network') @@ -110,7 +111,7 @@ class TimeoutTestCase(unittest.TestCase): # solution. fuzz = 2.0 - localhost = support.HOST + localhost = socket_helper.HOST def setUp(self): raise NotImplementedError() @@ -240,14 +241,14 @@ class TCPTimeoutTestCase(TimeoutTestCase): def testAcceptTimeout(self): # Test accept() timeout - support.bind_port(self.sock, self.localhost) + socket_helper.bind_port(self.sock, self.localhost) self.sock.listen() self._sock_operation(1, 1.5, 'accept') def testSend(self): # Test send() timeout with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as serv: - support.bind_port(serv, self.localhost) + socket_helper.bind_port(serv, self.localhost) serv.listen() self.sock.connect(serv.getsockname()) # Send a lot of data in order to bypass buffering in the TCP stack. @@ -256,7 +257,7 @@ class TCPTimeoutTestCase(TimeoutTestCase): def testSendto(self): # Test sendto() timeout with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as serv: - support.bind_port(serv, self.localhost) + socket_helper.bind_port(serv, self.localhost) serv.listen() self.sock.connect(serv.getsockname()) # The address argument is ignored since we already connected. @@ -266,7 +267,7 @@ class TCPTimeoutTestCase(TimeoutTestCase): def testSendall(self): # Test sendall() timeout with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as serv: - support.bind_port(serv, self.localhost) + socket_helper.bind_port(serv, self.localhost) serv.listen() self.sock.connect(serv.getsockname()) # Send a lot of data in order to bypass buffering in the TCP stack. @@ -285,7 +286,7 @@ class UDPTimeoutTestCase(TimeoutTestCase): def testRecvfromTimeout(self): # Test recvfrom() timeout # Prevent "Address already in use" socket exceptions - support.bind_port(self.sock, self.localhost) + socket_helper.bind_port(self.sock, self.localhost) self._sock_operation(1, 1.5, 'recvfrom', 1024) diff --git a/Lib/test/test_wsgiref.py b/Lib/test/test_wsgiref.py index 6af4514..4bf5d39 100644 --- a/Lib/test/test_wsgiref.py +++ b/Lib/test/test_wsgiref.py @@ -1,5 +1,6 @@ from unittest import mock from test import support +from test.support import socket_helper from test.test_httpservers import NoLogRequestHandler from unittest import TestCase from wsgiref.util import setup_testing_defaults @@ -263,7 +264,7 @@ class IntegrationTests(TestCase): class WsgiHandler(NoLogRequestHandler, WSGIRequestHandler): pass - server = make_server(support.HOST, 0, app, handler_class=WsgiHandler) + server = make_server(socket_helper.HOST, 0, app, handler_class=WsgiHandler) self.addCleanup(server.server_close) interrupted = threading.Event() diff --git a/Lib/test/test_xmlrpc.py b/Lib/test/test_xmlrpc.py index e5c3496..f68af52 100644 --- a/Lib/test/test_xmlrpc.py +++ b/Lib/test/test_xmlrpc.py @@ -15,6 +15,7 @@ import re import io import contextlib from test import support +from test.support import socket_helper from test.support import ALWAYS_EQ, LARGEST, SMALLEST try: @@ -334,7 +335,7 @@ class XMLRPCTestCase(unittest.TestCase): server.handle_request() # First request and attempt at second server.handle_request() # Retried second request - server = http.server.HTTPServer((support.HOST, 0), RequestHandler) + server = http.server.HTTPServer((socket_helper.HOST, 0), RequestHandler) self.addCleanup(server.server_close) thread = threading.Thread(target=run_server) thread.start() |