From 3e9751819ad13a965e8be13c1e5bc5a6811fe6b8 Mon Sep 17 00:00:00 2001 From: Yury Selivanov Date: Mon, 11 Dec 2017 10:04:40 -0500 Subject: bpo-32273: Move asyncio.test_utils to test.test_asyncio (#4785) --- Lib/asyncio/test_utils.py | 502 --------------------- Lib/test/test_asyncio/test_base_events.py | 15 +- Lib/test/test_asyncio/test_events.py | 4 +- Lib/test/test_asyncio/test_futures.py | 7 +- Lib/test/test_asyncio/test_locks.py | 2 +- Lib/test/test_asyncio/test_pep492.py | 7 +- Lib/test/test_asyncio/test_proactor_events.py | 2 +- Lib/test/test_asyncio/test_queues.py | 2 +- Lib/test/test_asyncio/test_selector_events.py | 2 +- Lib/test/test_asyncio/test_sslproto.py | 2 +- Lib/test/test_asyncio/test_streams.py | 2 +- Lib/test/test_asyncio/test_subprocess.py | 8 +- Lib/test/test_asyncio/test_tasks.py | 15 +- Lib/test/test_asyncio/test_unix_events.py | 2 +- Lib/test/test_asyncio/test_windows_events.py | 2 +- Lib/test/test_asyncio/test_windows_utils.py | 7 +- Lib/test/test_asyncio/utils.py | 502 +++++++++++++++++++++ .../2017-12-10-19-14-55.bpo-32273.5KKlCv.rst | 1 + 18 files changed, 526 insertions(+), 558 deletions(-) delete mode 100644 Lib/asyncio/test_utils.py create mode 100644 Lib/test/test_asyncio/utils.py create mode 100644 Misc/NEWS.d/next/Library/2017-12-10-19-14-55.bpo-32273.5KKlCv.rst diff --git a/Lib/asyncio/test_utils.py b/Lib/asyncio/test_utils.py deleted file mode 100644 index 2319169..0000000 --- a/Lib/asyncio/test_utils.py +++ /dev/null @@ -1,502 +0,0 @@ -"""Utilities shared by tests.""" - -import collections -import contextlib -import io -import logging -import os -import re -import selectors -import socket -import socketserver -import sys -import tempfile -import threading -import time -import unittest -import weakref - -from unittest import mock - -from http.server import HTTPServer -from wsgiref.simple_server import WSGIRequestHandler, WSGIServer - -try: - import ssl -except ImportError: # pragma: no cover - ssl = None - -from . import base_events -from . import events -from . import futures -from . import tasks -from .log import logger -from test import support - - -def dummy_ssl_context(): - if ssl is None: - return None - else: - return ssl.SSLContext(ssl.PROTOCOL_TLS) - - -def run_briefly(loop): - async def once(): - pass - gen = once() - t = loop.create_task(gen) - # Don't log a warning if the task is not done after run_until_complete(). - # It occurs if the loop is stopped or if a task raises a BaseException. - t._log_destroy_pending = False - try: - loop.run_until_complete(t) - finally: - gen.close() - - -def run_until(loop, pred, timeout=30): - deadline = time.time() + timeout - while not pred(): - if timeout is not None: - timeout = deadline - time.time() - if timeout <= 0: - raise futures.TimeoutError() - loop.run_until_complete(tasks.sleep(0.001, loop=loop)) - - -def run_once(loop): - """Legacy API to run once through the event loop. - - This is the recommended pattern for test code. It will poll the - selector once and run all callbacks scheduled in response to I/O - events. - """ - loop.call_soon(loop.stop) - loop.run_forever() - - -class SilentWSGIRequestHandler(WSGIRequestHandler): - - def get_stderr(self): - return io.StringIO() - - def log_message(self, format, *args): - pass - - -class SilentWSGIServer(WSGIServer): - - request_timeout = 2 - - def get_request(self): - request, client_addr = super().get_request() - request.settimeout(self.request_timeout) - return request, client_addr - - def handle_error(self, request, client_address): - pass - - -class SSLWSGIServerMixin: - - def finish_request(self, request, client_address): - # The relative location of our test directory (which - # contains the ssl key and certificate files) differs - # between the stdlib and stand-alone asyncio. - # Prefer our own if we can find it. - here = os.path.join(os.path.dirname(__file__), '..', 'tests') - if not os.path.isdir(here): - here = os.path.join(os.path.dirname(os.__file__), - 'test', 'test_asyncio') - keyfile = os.path.join(here, 'ssl_key.pem') - certfile = os.path.join(here, 'ssl_cert.pem') - context = ssl.SSLContext() - context.load_cert_chain(certfile, keyfile) - - ssock = context.wrap_socket(request, server_side=True) - try: - self.RequestHandlerClass(ssock, client_address, self) - ssock.close() - except OSError: - # maybe socket has been closed by peer - pass - - -class SSLWSGIServer(SSLWSGIServerMixin, SilentWSGIServer): - pass - - -def _run_test_server(*, address, use_ssl=False, server_cls, server_ssl_cls): - - def app(environ, start_response): - status = '200 OK' - headers = [('Content-type', 'text/plain')] - start_response(status, headers) - return [b'Test message'] - - # Run the test WSGI server in a separate thread in order not to - # interfere with event handling in the main thread - server_class = server_ssl_cls if use_ssl else server_cls - httpd = server_class(address, SilentWSGIRequestHandler) - httpd.set_app(app) - httpd.address = httpd.server_address - server_thread = threading.Thread( - target=lambda: httpd.serve_forever(poll_interval=0.05)) - server_thread.start() - try: - yield httpd - finally: - httpd.shutdown() - httpd.server_close() - server_thread.join() - - -if hasattr(socket, 'AF_UNIX'): - - class UnixHTTPServer(socketserver.UnixStreamServer, HTTPServer): - - def server_bind(self): - socketserver.UnixStreamServer.server_bind(self) - self.server_name = '127.0.0.1' - self.server_port = 80 - - - class UnixWSGIServer(UnixHTTPServer, WSGIServer): - - request_timeout = 2 - - def server_bind(self): - UnixHTTPServer.server_bind(self) - self.setup_environ() - - def get_request(self): - request, client_addr = super().get_request() - request.settimeout(self.request_timeout) - # Code in the stdlib expects that get_request - # will return a socket and a tuple (host, port). - # However, this isn't true for UNIX sockets, - # as the second return value will be a path; - # hence we return some fake data sufficient - # to get the tests going - return request, ('127.0.0.1', '') - - - class SilentUnixWSGIServer(UnixWSGIServer): - - def handle_error(self, request, client_address): - pass - - - class UnixSSLWSGIServer(SSLWSGIServerMixin, SilentUnixWSGIServer): - pass - - - def gen_unix_socket_path(): - with tempfile.NamedTemporaryFile() as file: - return file.name - - - @contextlib.contextmanager - def unix_socket_path(): - path = gen_unix_socket_path() - try: - yield path - finally: - try: - os.unlink(path) - except OSError: - pass - - - @contextlib.contextmanager - def run_test_unix_server(*, use_ssl=False): - with unix_socket_path() as path: - yield from _run_test_server(address=path, use_ssl=use_ssl, - server_cls=SilentUnixWSGIServer, - server_ssl_cls=UnixSSLWSGIServer) - - -@contextlib.contextmanager -def run_test_server(*, host='127.0.0.1', port=0, use_ssl=False): - yield from _run_test_server(address=(host, port), use_ssl=use_ssl, - server_cls=SilentWSGIServer, - server_ssl_cls=SSLWSGIServer) - - -def make_test_protocol(base): - dct = {} - for name in dir(base): - if name.startswith('__') and name.endswith('__'): - # skip magic names - continue - dct[name] = MockCallback(return_value=None) - return type('TestProtocol', (base,) + base.__bases__, dct)() - - -class TestSelector(selectors.BaseSelector): - - def __init__(self): - self.keys = {} - - def register(self, fileobj, events, data=None): - key = selectors.SelectorKey(fileobj, 0, events, data) - self.keys[fileobj] = key - return key - - def unregister(self, fileobj): - return self.keys.pop(fileobj) - - def select(self, timeout): - return [] - - def get_map(self): - return self.keys - - -class TestLoop(base_events.BaseEventLoop): - """Loop for unittests. - - It manages self time directly. - If something scheduled to be executed later then - on next loop iteration after all ready handlers done - generator passed to __init__ is calling. - - Generator should be like this: - - def gen(): - ... - when = yield ... - ... = yield time_advance - - Value returned by yield is absolute time of next scheduled handler. - Value passed to yield is time advance to move loop's time forward. - """ - - def __init__(self, gen=None): - super().__init__() - - if gen is None: - def gen(): - yield - self._check_on_close = False - else: - self._check_on_close = True - - self._gen = gen() - next(self._gen) - self._time = 0 - self._clock_resolution = 1e-9 - self._timers = [] - self._selector = TestSelector() - - self.readers = {} - self.writers = {} - self.reset_counters() - - self._transports = weakref.WeakValueDictionary() - - def time(self): - return self._time - - def advance_time(self, advance): - """Move test time forward.""" - if advance: - self._time += advance - - def close(self): - super().close() - if self._check_on_close: - try: - self._gen.send(0) - except StopIteration: - pass - else: # pragma: no cover - raise AssertionError("Time generator is not finished") - - def _add_reader(self, fd, callback, *args): - self.readers[fd] = events.Handle(callback, args, self) - - def _remove_reader(self, fd): - self.remove_reader_count[fd] += 1 - if fd in self.readers: - del self.readers[fd] - return True - else: - return False - - def assert_reader(self, fd, callback, *args): - assert fd in self.readers, 'fd {} is not registered'.format(fd) - handle = self.readers[fd] - assert handle._callback == callback, '{!r} != {!r}'.format( - handle._callback, callback) - assert handle._args == args, '{!r} != {!r}'.format( - handle._args, args) - - def _add_writer(self, fd, callback, *args): - self.writers[fd] = events.Handle(callback, args, self) - - def _remove_writer(self, fd): - self.remove_writer_count[fd] += 1 - if fd in self.writers: - del self.writers[fd] - return True - else: - return False - - def assert_writer(self, fd, callback, *args): - assert fd in self.writers, 'fd {} is not registered'.format(fd) - handle = self.writers[fd] - assert handle._callback == callback, '{!r} != {!r}'.format( - handle._callback, callback) - assert handle._args == args, '{!r} != {!r}'.format( - handle._args, args) - - def _ensure_fd_no_transport(self, fd): - if not isinstance(fd, int): - try: - fd = int(fd.fileno()) - except (AttributeError, TypeError, ValueError): - # This code matches selectors._fileobj_to_fd function. - raise ValueError("Invalid file object: " - "{!r}".format(fd)) from None - try: - transport = self._transports[fd] - except KeyError: - pass - else: - raise RuntimeError( - 'File descriptor {!r} is used by transport {!r}'.format( - fd, transport)) - - def add_reader(self, fd, callback, *args): - """Add a reader callback.""" - self._ensure_fd_no_transport(fd) - return self._add_reader(fd, callback, *args) - - def remove_reader(self, fd): - """Remove a reader callback.""" - self._ensure_fd_no_transport(fd) - return self._remove_reader(fd) - - def add_writer(self, fd, callback, *args): - """Add a writer callback..""" - self._ensure_fd_no_transport(fd) - return self._add_writer(fd, callback, *args) - - def remove_writer(self, fd): - """Remove a writer callback.""" - self._ensure_fd_no_transport(fd) - return self._remove_writer(fd) - - def reset_counters(self): - self.remove_reader_count = collections.defaultdict(int) - self.remove_writer_count = collections.defaultdict(int) - - def _run_once(self): - super()._run_once() - for when in self._timers: - advance = self._gen.send(when) - self.advance_time(advance) - self._timers = [] - - def call_at(self, when, callback, *args): - self._timers.append(when) - return super().call_at(when, callback, *args) - - def _process_events(self, event_list): - return - - def _write_to_self(self): - pass - - -def MockCallback(**kwargs): - return mock.Mock(spec=['__call__'], **kwargs) - - -class MockPattern(str): - """A regex based str with a fuzzy __eq__. - - Use this helper with 'mock.assert_called_with', or anywhere - where a regex comparison between strings is needed. - - For instance: - mock_call.assert_called_with(MockPattern('spam.*ham')) - """ - def __eq__(self, other): - return bool(re.search(str(self), other, re.S)) - - -def get_function_source(func): - source = events._get_function_source(func) - if source is None: - raise ValueError("unable to get the source of %r" % (func,)) - return source - - -class TestCase(unittest.TestCase): - @staticmethod - def close_loop(loop): - executor = loop._default_executor - if executor is not None: - executor.shutdown(wait=True) - loop.close() - - def set_event_loop(self, loop, *, cleanup=True): - assert loop is not None - # ensure that the event loop is passed explicitly in asyncio - events.set_event_loop(None) - if cleanup: - self.addCleanup(self.close_loop, loop) - - def new_test_loop(self, gen=None): - loop = TestLoop(gen) - self.set_event_loop(loop) - return loop - - def unpatch_get_running_loop(self): - events._get_running_loop = self._get_running_loop - - def setUp(self): - self._get_running_loop = events._get_running_loop - events._get_running_loop = lambda: None - self._thread_cleanup = support.threading_setup() - - def tearDown(self): - self.unpatch_get_running_loop() - - events.set_event_loop(None) - - # Detect CPython bug #23353: ensure that yield/yield-from is not used - # in an except block of a generator - self.assertEqual(sys.exc_info(), (None, None, None)) - - self.doCleanups() - support.threading_cleanup(*self._thread_cleanup) - support.reap_children() - - -@contextlib.contextmanager -def disable_logger(): - """Context manager to disable asyncio logger. - - For example, it can be used to ignore warnings in debug mode. - """ - old_level = logger.level - try: - logger.setLevel(logging.CRITICAL+1) - yield - finally: - logger.setLevel(old_level) - - -def mock_nonblocking_socket(proto=socket.IPPROTO_TCP, type=socket.SOCK_STREAM, - family=socket.AF_INET): - """Create a mock of a non-blocking socket.""" - sock = mock.MagicMock(socket.socket) - sock.proto = proto - sock.type = type - sock.family = family - sock.gettimeout.return_value = 0.0 - return sock diff --git a/Lib/test/test_asyncio/test_base_events.py b/Lib/test/test_asyncio/test_base_events.py index 6561a9d..8b08ed1 100644 --- a/Lib/test/test_asyncio/test_base_events.py +++ b/Lib/test/test_asyncio/test_base_events.py @@ -14,18 +14,9 @@ from unittest import mock import asyncio from asyncio import base_events from asyncio import constants -from asyncio import test_utils -try: - from test import support -except ImportError: - from asyncio import test_support as support -try: - from test.support.script_helper import assert_python_ok -except ImportError: - try: - from test.script_helper import assert_python_ok - except ImportError: - from asyncio.test_support import assert_python_ok +from test.test_asyncio import utils as test_utils +from test import support +from test.support.script_helper import assert_python_ok MOCK_ANY = mock.ANY diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py index 0130024..39d5bb5 100644 --- a/Lib/test/test_asyncio/test_events.py +++ b/Lib/test/test_asyncio/test_events.py @@ -3,7 +3,6 @@ import collections.abc import concurrent.futures import functools -import gc import io import os import platform @@ -30,8 +29,7 @@ import asyncio from asyncio import coroutines from asyncio import proactor_events from asyncio import selector_events -from asyncio import sslproto -from asyncio import test_utils +from test.test_asyncio import utils as test_utils try: from test import support except ImportError: diff --git a/Lib/test/test_asyncio/test_futures.py b/Lib/test/test_asyncio/test_futures.py index 4320a90..444d1df 100644 --- a/Lib/test/test_asyncio/test_futures.py +++ b/Lib/test/test_asyncio/test_futures.py @@ -9,12 +9,9 @@ import unittest from unittest import mock import asyncio -from asyncio import test_utils from asyncio import futures -try: - from test import support -except ImportError: - from asyncio import test_support as support +from test.test_asyncio import utils as test_utils +from test import support def _fakefunc(f): diff --git a/Lib/test/test_asyncio/test_locks.py b/Lib/test/test_asyncio/test_locks.py index 78d80ec..3c50697 100644 --- a/Lib/test/test_asyncio/test_locks.py +++ b/Lib/test/test_asyncio/test_locks.py @@ -5,7 +5,7 @@ from unittest import mock import re import asyncio -from asyncio import test_utils +from test.test_asyncio import utils as test_utils STR_RGX_REPR = ( r'^<(?P.*?) object at (?P
.*?)' diff --git a/Lib/test/test_asyncio/test_pep492.py b/Lib/test/test_asyncio/test_pep492.py index 4425770..8dd0c25 100644 --- a/Lib/test/test_asyncio/test_pep492.py +++ b/Lib/test/test_asyncio/test_pep492.py @@ -3,14 +3,11 @@ import types import unittest -try: - from test import support -except ImportError: - from asyncio import test_support as support +from test import support from unittest import mock import asyncio -from asyncio import test_utils +from test.test_asyncio import utils as test_utils class BaseTest(test_utils.TestCase): diff --git a/Lib/test/test_asyncio/test_proactor_events.py b/Lib/test/test_asyncio/test_proactor_events.py index def08b9..7ccc681 100644 --- a/Lib/test/test_asyncio/test_proactor_events.py +++ b/Lib/test/test_asyncio/test_proactor_events.py @@ -9,7 +9,7 @@ from asyncio.proactor_events import BaseProactorEventLoop from asyncio.proactor_events import _ProactorSocketTransport from asyncio.proactor_events import _ProactorWritePipeTransport from asyncio.proactor_events import _ProactorDuplexPipeTransport -from asyncio import test_utils +from test.test_asyncio import utils as test_utils def close_transport(transport): diff --git a/Lib/test/test_asyncio/test_queues.py b/Lib/test/test_asyncio/test_queues.py index 3b66d61..8d78546 100644 --- a/Lib/test/test_asyncio/test_queues.py +++ b/Lib/test/test_asyncio/test_queues.py @@ -4,7 +4,7 @@ import unittest from unittest import mock import asyncio -from asyncio import test_utils +from test.test_asyncio import utils as test_utils class _QueueTestBase(test_utils.TestCase): diff --git a/Lib/test/test_asyncio/test_selector_events.py b/Lib/test/test_asyncio/test_selector_events.py index 616eb6f..24feb30 100644 --- a/Lib/test/test_asyncio/test_selector_events.py +++ b/Lib/test/test_asyncio/test_selector_events.py @@ -11,11 +11,11 @@ except ImportError: ssl = None import asyncio -from asyncio import test_utils from asyncio.selector_events import BaseSelectorEventLoop from asyncio.selector_events import _SelectorTransport from asyncio.selector_events import _SelectorSocketTransport from asyncio.selector_events import _SelectorDatagramTransport +from test.test_asyncio import utils as test_utils MOCK_ANY = mock.ANY diff --git a/Lib/test/test_asyncio/test_sslproto.py b/Lib/test/test_asyncio/test_sslproto.py index f573ae8..7650fe6 100644 --- a/Lib/test/test_asyncio/test_sslproto.py +++ b/Lib/test/test_asyncio/test_sslproto.py @@ -11,7 +11,7 @@ except ImportError: import asyncio from asyncio import log from asyncio import sslproto -from asyncio import test_utils +from test.test_asyncio import utils as test_utils @unittest.skipIf(ssl is None, 'No ssl module') diff --git a/Lib/test/test_asyncio/test_streams.py b/Lib/test/test_asyncio/test_streams.py index 1927d73..7a0762c 100644 --- a/Lib/test/test_asyncio/test_streams.py +++ b/Lib/test/test_asyncio/test_streams.py @@ -16,7 +16,7 @@ except ImportError: ssl = None import asyncio -from asyncio import test_utils +from test.test_asyncio import utils as test_utils class StreamReaderTests(test_utils.TestCase): diff --git a/Lib/test/test_asyncio/test_subprocess.py b/Lib/test/test_asyncio/test_subprocess.py index ad4bb14..81b08d6 100644 --- a/Lib/test/test_asyncio/test_subprocess.py +++ b/Lib/test/test_asyncio/test_subprocess.py @@ -7,11 +7,9 @@ from unittest import mock import asyncio from asyncio import base_subprocess from asyncio import subprocess -from asyncio import test_utils -try: - from test import support -except ImportError: - from asyncio import test_support as support +from test.test_asyncio import utils as test_utils +from test import support + if sys.platform != 'win32': from asyncio import unix_events diff --git a/Lib/test/test_asyncio/test_tasks.py b/Lib/test/test_asyncio/test_tasks.py index cdc882a..071481d 100644 --- a/Lib/test/test_asyncio/test_tasks.py +++ b/Lib/test/test_asyncio/test_tasks.py @@ -17,18 +17,9 @@ import asyncio from asyncio import coroutines from asyncio import futures from asyncio import tasks -from asyncio import test_utils -try: - from test import support -except ImportError: - from asyncio import test_support as support -try: - from test.support.script_helper import assert_python_ok -except ImportError: - try: - from test.script_helper import assert_python_ok - except ImportError: - from asyncio.test_support import assert_python_ok +from test.test_asyncio import utils as test_utils +from test import support +from test.support.script_helper import assert_python_ok @asyncio.coroutine diff --git a/Lib/test/test_asyncio/test_unix_events.py b/Lib/test/test_asyncio/test_unix_events.py index 6746b34..e13cc37 100644 --- a/Lib/test/test_asyncio/test_unix_events.py +++ b/Lib/test/test_asyncio/test_unix_events.py @@ -21,8 +21,8 @@ if sys.platform == 'win32': import asyncio from asyncio import log -from asyncio import test_utils from asyncio import unix_events +from test.test_asyncio import utils as test_utils MOCK_ANY = mock.ANY diff --git a/Lib/test/test_asyncio/test_windows_events.py b/Lib/test/test_asyncio/test_windows_events.py index de6fe12..fdba636 100644 --- a/Lib/test/test_asyncio/test_windows_events.py +++ b/Lib/test/test_asyncio/test_windows_events.py @@ -11,8 +11,8 @@ import _overlapped import _winapi import asyncio -from asyncio import test_utils from asyncio import windows_events +from test.test_asyncio import utils as test_utils class UpperProto(asyncio.Protocol): diff --git a/Lib/test/test_asyncio/test_windows_utils.py b/Lib/test/test_asyncio/test_windows_utils.py index 952f95e..9fc3858 100644 --- a/Lib/test/test_asyncio/test_windows_utils.py +++ b/Lib/test/test_asyncio/test_windows_utils.py @@ -1,10 +1,8 @@ """Tests for window_utils""" -import socket import sys import unittest import warnings -from unittest import mock if sys.platform != 'win32': raise unittest.SkipTest('Windows only') @@ -13,10 +11,7 @@ import _overlapped import _winapi from asyncio import windows_utils -try: - from test import support -except ImportError: - from asyncio import test_support as support +from test import support class PipeTests(unittest.TestCase): diff --git a/Lib/test/test_asyncio/utils.py b/Lib/test/test_asyncio/utils.py new file mode 100644 index 0000000..560db9f --- /dev/null +++ b/Lib/test/test_asyncio/utils.py @@ -0,0 +1,502 @@ +"""Utilities shared by tests.""" + +import collections +import contextlib +import io +import logging +import os +import re +import selectors +import socket +import socketserver +import sys +import tempfile +import threading +import time +import unittest +import weakref + +from unittest import mock + +from http.server import HTTPServer +from wsgiref.simple_server import WSGIRequestHandler, WSGIServer + +try: + import ssl +except ImportError: # pragma: no cover + ssl = None + +from asyncio import base_events +from asyncio import events +from asyncio import futures +from asyncio import tasks +from asyncio.log import logger +from test import support + + +def dummy_ssl_context(): + if ssl is None: + return None + else: + return ssl.SSLContext(ssl.PROTOCOL_TLS) + + +def run_briefly(loop): + async def once(): + pass + gen = once() + t = loop.create_task(gen) + # Don't log a warning if the task is not done after run_until_complete(). + # It occurs if the loop is stopped or if a task raises a BaseException. + t._log_destroy_pending = False + try: + loop.run_until_complete(t) + finally: + gen.close() + + +def run_until(loop, pred, timeout=30): + deadline = time.time() + timeout + while not pred(): + if timeout is not None: + timeout = deadline - time.time() + if timeout <= 0: + raise futures.TimeoutError() + loop.run_until_complete(tasks.sleep(0.001, loop=loop)) + + +def run_once(loop): + """Legacy API to run once through the event loop. + + This is the recommended pattern for test code. It will poll the + selector once and run all callbacks scheduled in response to I/O + events. + """ + loop.call_soon(loop.stop) + loop.run_forever() + + +class SilentWSGIRequestHandler(WSGIRequestHandler): + + def get_stderr(self): + return io.StringIO() + + def log_message(self, format, *args): + pass + + +class SilentWSGIServer(WSGIServer): + + request_timeout = 2 + + def get_request(self): + request, client_addr = super().get_request() + request.settimeout(self.request_timeout) + return request, client_addr + + def handle_error(self, request, client_address): + pass + + +class SSLWSGIServerMixin: + + def finish_request(self, request, client_address): + # The relative location of our test directory (which + # contains the ssl key and certificate files) differs + # between the stdlib and stand-alone asyncio. + # Prefer our own if we can find it. + here = os.path.join(os.path.dirname(__file__), '..', 'tests') + if not os.path.isdir(here): + here = os.path.join(os.path.dirname(os.__file__), + 'test', 'test_asyncio') + keyfile = os.path.join(here, 'ssl_key.pem') + certfile = os.path.join(here, 'ssl_cert.pem') + context = ssl.SSLContext() + context.load_cert_chain(certfile, keyfile) + + ssock = context.wrap_socket(request, server_side=True) + try: + self.RequestHandlerClass(ssock, client_address, self) + ssock.close() + except OSError: + # maybe socket has been closed by peer + pass + + +class SSLWSGIServer(SSLWSGIServerMixin, SilentWSGIServer): + pass + + +def _run_test_server(*, address, use_ssl=False, server_cls, server_ssl_cls): + + def app(environ, start_response): + status = '200 OK' + headers = [('Content-type', 'text/plain')] + start_response(status, headers) + return [b'Test message'] + + # Run the test WSGI server in a separate thread in order not to + # interfere with event handling in the main thread + server_class = server_ssl_cls if use_ssl else server_cls + httpd = server_class(address, SilentWSGIRequestHandler) + httpd.set_app(app) + httpd.address = httpd.server_address + server_thread = threading.Thread( + target=lambda: httpd.serve_forever(poll_interval=0.05)) + server_thread.start() + try: + yield httpd + finally: + httpd.shutdown() + httpd.server_close() + server_thread.join() + + +if hasattr(socket, 'AF_UNIX'): + + class UnixHTTPServer(socketserver.UnixStreamServer, HTTPServer): + + def server_bind(self): + socketserver.UnixStreamServer.server_bind(self) + self.server_name = '127.0.0.1' + self.server_port = 80 + + + class UnixWSGIServer(UnixHTTPServer, WSGIServer): + + request_timeout = 2 + + def server_bind(self): + UnixHTTPServer.server_bind(self) + self.setup_environ() + + def get_request(self): + request, client_addr = super().get_request() + request.settimeout(self.request_timeout) + # Code in the stdlib expects that get_request + # will return a socket and a tuple (host, port). + # However, this isn't true for UNIX sockets, + # as the second return value will be a path; + # hence we return some fake data sufficient + # to get the tests going + return request, ('127.0.0.1', '') + + + class SilentUnixWSGIServer(UnixWSGIServer): + + def handle_error(self, request, client_address): + pass + + + class UnixSSLWSGIServer(SSLWSGIServerMixin, SilentUnixWSGIServer): + pass + + + def gen_unix_socket_path(): + with tempfile.NamedTemporaryFile() as file: + return file.name + + + @contextlib.contextmanager + def unix_socket_path(): + path = gen_unix_socket_path() + try: + yield path + finally: + try: + os.unlink(path) + except OSError: + pass + + + @contextlib.contextmanager + def run_test_unix_server(*, use_ssl=False): + with unix_socket_path() as path: + yield from _run_test_server(address=path, use_ssl=use_ssl, + server_cls=SilentUnixWSGIServer, + server_ssl_cls=UnixSSLWSGIServer) + + +@contextlib.contextmanager +def run_test_server(*, host='127.0.0.1', port=0, use_ssl=False): + yield from _run_test_server(address=(host, port), use_ssl=use_ssl, + server_cls=SilentWSGIServer, + server_ssl_cls=SSLWSGIServer) + + +def make_test_protocol(base): + dct = {} + for name in dir(base): + if name.startswith('__') and name.endswith('__'): + # skip magic names + continue + dct[name] = MockCallback(return_value=None) + return type('TestProtocol', (base,) + base.__bases__, dct)() + + +class TestSelector(selectors.BaseSelector): + + def __init__(self): + self.keys = {} + + def register(self, fileobj, events, data=None): + key = selectors.SelectorKey(fileobj, 0, events, data) + self.keys[fileobj] = key + return key + + def unregister(self, fileobj): + return self.keys.pop(fileobj) + + def select(self, timeout): + return [] + + def get_map(self): + return self.keys + + +class TestLoop(base_events.BaseEventLoop): + """Loop for unittests. + + It manages self time directly. + If something scheduled to be executed later then + on next loop iteration after all ready handlers done + generator passed to __init__ is calling. + + Generator should be like this: + + def gen(): + ... + when = yield ... + ... = yield time_advance + + Value returned by yield is absolute time of next scheduled handler. + Value passed to yield is time advance to move loop's time forward. + """ + + def __init__(self, gen=None): + super().__init__() + + if gen is None: + def gen(): + yield + self._check_on_close = False + else: + self._check_on_close = True + + self._gen = gen() + next(self._gen) + self._time = 0 + self._clock_resolution = 1e-9 + self._timers = [] + self._selector = TestSelector() + + self.readers = {} + self.writers = {} + self.reset_counters() + + self._transports = weakref.WeakValueDictionary() + + def time(self): + return self._time + + def advance_time(self, advance): + """Move test time forward.""" + if advance: + self._time += advance + + def close(self): + super().close() + if self._check_on_close: + try: + self._gen.send(0) + except StopIteration: + pass + else: # pragma: no cover + raise AssertionError("Time generator is not finished") + + def _add_reader(self, fd, callback, *args): + self.readers[fd] = events.Handle(callback, args, self) + + def _remove_reader(self, fd): + self.remove_reader_count[fd] += 1 + if fd in self.readers: + del self.readers[fd] + return True + else: + return False + + def assert_reader(self, fd, callback, *args): + assert fd in self.readers, 'fd {} is not registered'.format(fd) + handle = self.readers[fd] + assert handle._callback == callback, '{!r} != {!r}'.format( + handle._callback, callback) + assert handle._args == args, '{!r} != {!r}'.format( + handle._args, args) + + def _add_writer(self, fd, callback, *args): + self.writers[fd] = events.Handle(callback, args, self) + + def _remove_writer(self, fd): + self.remove_writer_count[fd] += 1 + if fd in self.writers: + del self.writers[fd] + return True + else: + return False + + def assert_writer(self, fd, callback, *args): + assert fd in self.writers, 'fd {} is not registered'.format(fd) + handle = self.writers[fd] + assert handle._callback == callback, '{!r} != {!r}'.format( + handle._callback, callback) + assert handle._args == args, '{!r} != {!r}'.format( + handle._args, args) + + def _ensure_fd_no_transport(self, fd): + if not isinstance(fd, int): + try: + fd = int(fd.fileno()) + except (AttributeError, TypeError, ValueError): + # This code matches selectors._fileobj_to_fd function. + raise ValueError("Invalid file object: " + "{!r}".format(fd)) from None + try: + transport = self._transports[fd] + except KeyError: + pass + else: + raise RuntimeError( + 'File descriptor {!r} is used by transport {!r}'.format( + fd, transport)) + + def add_reader(self, fd, callback, *args): + """Add a reader callback.""" + self._ensure_fd_no_transport(fd) + return self._add_reader(fd, callback, *args) + + def remove_reader(self, fd): + """Remove a reader callback.""" + self._ensure_fd_no_transport(fd) + return self._remove_reader(fd) + + def add_writer(self, fd, callback, *args): + """Add a writer callback..""" + self._ensure_fd_no_transport(fd) + return self._add_writer(fd, callback, *args) + + def remove_writer(self, fd): + """Remove a writer callback.""" + self._ensure_fd_no_transport(fd) + return self._remove_writer(fd) + + def reset_counters(self): + self.remove_reader_count = collections.defaultdict(int) + self.remove_writer_count = collections.defaultdict(int) + + def _run_once(self): + super()._run_once() + for when in self._timers: + advance = self._gen.send(when) + self.advance_time(advance) + self._timers = [] + + def call_at(self, when, callback, *args): + self._timers.append(when) + return super().call_at(when, callback, *args) + + def _process_events(self, event_list): + return + + def _write_to_self(self): + pass + + +def MockCallback(**kwargs): + return mock.Mock(spec=['__call__'], **kwargs) + + +class MockPattern(str): + """A regex based str with a fuzzy __eq__. + + Use this helper with 'mock.assert_called_with', or anywhere + where a regex comparison between strings is needed. + + For instance: + mock_call.assert_called_with(MockPattern('spam.*ham')) + """ + def __eq__(self, other): + return bool(re.search(str(self), other, re.S)) + + +def get_function_source(func): + source = events._get_function_source(func) + if source is None: + raise ValueError("unable to get the source of %r" % (func,)) + return source + + +class TestCase(unittest.TestCase): + @staticmethod + def close_loop(loop): + executor = loop._default_executor + if executor is not None: + executor.shutdown(wait=True) + loop.close() + + def set_event_loop(self, loop, *, cleanup=True): + assert loop is not None + # ensure that the event loop is passed explicitly in asyncio + events.set_event_loop(None) + if cleanup: + self.addCleanup(self.close_loop, loop) + + def new_test_loop(self, gen=None): + loop = TestLoop(gen) + self.set_event_loop(loop) + return loop + + def unpatch_get_running_loop(self): + events._get_running_loop = self._get_running_loop + + def setUp(self): + self._get_running_loop = events._get_running_loop + events._get_running_loop = lambda: None + self._thread_cleanup = support.threading_setup() + + def tearDown(self): + self.unpatch_get_running_loop() + + events.set_event_loop(None) + + # Detect CPython bug #23353: ensure that yield/yield-from is not used + # in an except block of a generator + self.assertEqual(sys.exc_info(), (None, None, None)) + + self.doCleanups() + support.threading_cleanup(*self._thread_cleanup) + support.reap_children() + + +@contextlib.contextmanager +def disable_logger(): + """Context manager to disable asyncio logger. + + For example, it can be used to ignore warnings in debug mode. + """ + old_level = logger.level + try: + logger.setLevel(logging.CRITICAL+1) + yield + finally: + logger.setLevel(old_level) + + +def mock_nonblocking_socket(proto=socket.IPPROTO_TCP, type=socket.SOCK_STREAM, + family=socket.AF_INET): + """Create a mock of a non-blocking socket.""" + sock = mock.MagicMock(socket.socket) + sock.proto = proto + sock.type = type + sock.family = family + sock.gettimeout.return_value = 0.0 + return sock diff --git a/Misc/NEWS.d/next/Library/2017-12-10-19-14-55.bpo-32273.5KKlCv.rst b/Misc/NEWS.d/next/Library/2017-12-10-19-14-55.bpo-32273.5KKlCv.rst new file mode 100644 index 0000000..3bbec8f --- /dev/null +++ b/Misc/NEWS.d/next/Library/2017-12-10-19-14-55.bpo-32273.5KKlCv.rst @@ -0,0 +1 @@ +Move asyncio.test_utils to test.test_asyncio. -- cgit v0.12