summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_asyncio/functional.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_asyncio/functional.py')
-rw-r--r--Lib/test/test_asyncio/functional.py274
1 files changed, 0 insertions, 274 deletions
diff --git a/Lib/test/test_asyncio/functional.py b/Lib/test/test_asyncio/functional.py
deleted file mode 100644
index 5cd0659..0000000
--- a/Lib/test/test_asyncio/functional.py
+++ /dev/null
@@ -1,274 +0,0 @@
-import asyncio
-import asyncio.events
-import contextlib
-import os
-import pprint
-import select
-import socket
-import tempfile
-import threading
-from test import support
-
-
-class FunctionalTestCaseMixin:
-
- def new_loop(self):
- return asyncio.new_event_loop()
-
- def run_loop_briefly(self, *, delay=0.01):
- self.loop.run_until_complete(asyncio.sleep(delay))
-
- def loop_exception_handler(self, loop, context):
- self.__unhandled_exceptions.append(context)
- self.loop.default_exception_handler(context)
-
- def setUp(self):
- self.loop = self.new_loop()
- asyncio.set_event_loop(None)
-
- self.loop.set_exception_handler(self.loop_exception_handler)
- self.__unhandled_exceptions = []
-
- # Disable `_get_running_loop`.
- self._old_get_running_loop = asyncio.events._get_running_loop
- asyncio.events._get_running_loop = lambda: None
-
- def tearDown(self):
- try:
- self.loop.close()
-
- if self.__unhandled_exceptions:
- print('Unexpected calls to loop.call_exception_handler():')
- pprint.pprint(self.__unhandled_exceptions)
- self.fail('unexpected calls to loop.call_exception_handler()')
-
- finally:
- asyncio.events._get_running_loop = self._old_get_running_loop
- asyncio.set_event_loop(None)
- self.loop = None
-
- def tcp_server(self, server_prog, *,
- family=socket.AF_INET,
- addr=None,
- timeout=support.LOOPBACK_TIMEOUT,
- backlog=1,
- max_clients=10):
-
- if addr is None:
- if hasattr(socket, 'AF_UNIX') and family == socket.AF_UNIX:
- with tempfile.NamedTemporaryFile() as tmp:
- addr = tmp.name
- else:
- addr = ('127.0.0.1', 0)
-
- sock = socket.create_server(addr, family=family, backlog=backlog)
- if timeout is None:
- raise RuntimeError('timeout is required')
- if timeout <= 0:
- raise RuntimeError('only blocking sockets are supported')
- sock.settimeout(timeout)
-
- return TestThreadedServer(
- self, sock, server_prog, timeout, max_clients)
-
- def tcp_client(self, client_prog,
- family=socket.AF_INET,
- timeout=support.LOOPBACK_TIMEOUT):
-
- sock = socket.socket(family, socket.SOCK_STREAM)
-
- if timeout is None:
- raise RuntimeError('timeout is required')
- if timeout <= 0:
- raise RuntimeError('only blocking sockets are supported')
- sock.settimeout(timeout)
-
- return TestThreadedClient(
- self, sock, client_prog, timeout)
-
- def unix_server(self, *args, **kwargs):
- if not hasattr(socket, 'AF_UNIX'):
- raise NotImplementedError
- return self.tcp_server(*args, family=socket.AF_UNIX, **kwargs)
-
- def unix_client(self, *args, **kwargs):
- if not hasattr(socket, 'AF_UNIX'):
- raise NotImplementedError
- return self.tcp_client(*args, family=socket.AF_UNIX, **kwargs)
-
- @contextlib.contextmanager
- def unix_sock_name(self):
- with tempfile.TemporaryDirectory() as td:
- fn = os.path.join(td, 'sock')
- try:
- yield fn
- finally:
- try:
- os.unlink(fn)
- except OSError:
- pass
-
- def _abort_socket_test(self, ex):
- try:
- self.loop.stop()
- finally:
- self.fail(ex)
-
-
-##############################################################################
-# Socket Testing Utilities
-##############################################################################
-
-
-class TestSocketWrapper:
-
- def __init__(self, sock):
- self.__sock = sock
-
- def recv_all(self, n):
- buf = b''
- while len(buf) < n:
- data = self.recv(n - len(buf))
- if data == b'':
- raise ConnectionAbortedError
- buf += data
- return buf
-
- def start_tls(self, ssl_context, *,
- server_side=False,
- server_hostname=None):
-
- ssl_sock = ssl_context.wrap_socket(
- self.__sock, server_side=server_side,
- server_hostname=server_hostname,
- do_handshake_on_connect=False)
-
- try:
- ssl_sock.do_handshake()
- except:
- ssl_sock.close()
- raise
- finally:
- self.__sock.close()
-
- self.__sock = ssl_sock
-
- def __getattr__(self, name):
- return getattr(self.__sock, name)
-
- def __repr__(self):
- return '<{} {!r}>'.format(type(self).__name__, self.__sock)
-
-
-class SocketThread(threading.Thread):
-
- def stop(self):
- self._active = False
- self.join()
-
- def __enter__(self):
- self.start()
- return self
-
- def __exit__(self, *exc):
- self.stop()
-
-
-class TestThreadedClient(SocketThread):
-
- def __init__(self, test, sock, prog, timeout):
- threading.Thread.__init__(self, None, None, 'test-client')
- self.daemon = True
-
- self._timeout = timeout
- self._sock = sock
- self._active = True
- self._prog = prog
- self._test = test
-
- def run(self):
- try:
- self._prog(TestSocketWrapper(self._sock))
- except Exception as ex:
- self._test._abort_socket_test(ex)
-
-
-class TestThreadedServer(SocketThread):
-
- def __init__(self, test, sock, prog, timeout, max_clients):
- threading.Thread.__init__(self, None, None, 'test-server')
- self.daemon = True
-
- self._clients = 0
- self._finished_clients = 0
- self._max_clients = max_clients
- self._timeout = timeout
- self._sock = sock
- self._active = True
-
- self._prog = prog
-
- self._s1, self._s2 = socket.socketpair()
- self._s1.setblocking(False)
-
- self._test = test
-
- def stop(self):
- try:
- if self._s2 and self._s2.fileno() != -1:
- try:
- self._s2.send(b'stop')
- except OSError:
- pass
- finally:
- super().stop()
-
- def run(self):
- try:
- with self._sock:
- self._sock.setblocking(False)
- self._run()
- finally:
- self._s1.close()
- self._s2.close()
-
- def _run(self):
- while self._active:
- if self._clients >= self._max_clients:
- return
-
- r, w, x = select.select(
- [self._sock, self._s1], [], [], self._timeout)
-
- if self._s1 in r:
- return
-
- if self._sock in r:
- try:
- conn, addr = self._sock.accept()
- except BlockingIOError:
- continue
- except socket.timeout:
- if not self._active:
- return
- else:
- raise
- else:
- self._clients += 1
- conn.settimeout(self._timeout)
- try:
- with conn:
- self._handle_client(conn)
- except Exception as ex:
- self._active = False
- try:
- raise
- finally:
- self._test._abort_socket_test(ex)
-
- def _handle_client(self, sock):
- self._prog(TestSocketWrapper(sock))
-
- @property
- def addr(self):
- return self._sock.getsockname()