summaryrefslogtreecommitdiffstats
path: root/Lib/asyncio
diff options
context:
space:
mode:
authorGuido van Rossum <guido@dropbox.com>2013-10-17 20:40:50 (GMT)
committerGuido van Rossum <guido@dropbox.com>2013-10-17 20:40:50 (GMT)
commit27b7c7ebf1039e96cac41b6330cf16b5632d9e49 (patch)
tree814505b0f9d02a5cabdec733dcde70250b04ee28 /Lib/asyncio
parent5b37f97ea5ac9f6b33b0e0269c69539cbb478142 (diff)
downloadcpython-27b7c7ebf1039e96cac41b6330cf16b5632d9e49.zip
cpython-27b7c7ebf1039e96cac41b6330cf16b5632d9e49.tar.gz
cpython-27b7c7ebf1039e96cac41b6330cf16b5632d9e49.tar.bz2
Initial checkin of asyncio package (== Tulip, == PEP 3156).
Diffstat (limited to 'Lib/asyncio')
-rw-r--r--Lib/asyncio/__init__.py33
-rw-r--r--Lib/asyncio/base_events.py606
-rw-r--r--Lib/asyncio/constants.py4
-rw-r--r--Lib/asyncio/events.py395
-rw-r--r--Lib/asyncio/futures.py338
-rw-r--r--Lib/asyncio/locks.py401
-rw-r--r--Lib/asyncio/log.py6
-rw-r--r--Lib/asyncio/proactor_events.py352
-rw-r--r--Lib/asyncio/protocols.py98
-rw-r--r--Lib/asyncio/queues.py284
-rw-r--r--Lib/asyncio/selector_events.py769
-rw-r--r--Lib/asyncio/streams.py257
-rw-r--r--Lib/asyncio/tasks.py636
-rw-r--r--Lib/asyncio/test_utils.py246
-rw-r--r--Lib/asyncio/transports.py186
-rw-r--r--Lib/asyncio/unix_events.py541
-rw-r--r--Lib/asyncio/windows_events.py375
-rw-r--r--Lib/asyncio/windows_utils.py181
18 files changed, 5708 insertions, 0 deletions
diff --git a/Lib/asyncio/__init__.py b/Lib/asyncio/__init__.py
new file mode 100644
index 0000000..afc444d
--- /dev/null
+++ b/Lib/asyncio/__init__.py
@@ -0,0 +1,33 @@
+"""The asyncio package, tracking PEP 3156."""
+
+import sys
+
+# The selectors module is in the stdlib in Python 3.4 but not in 3.3.
+# Do this first, so the other submodules can use "from . import selectors".
+try:
+ import selectors # Will also be exported.
+except ImportError:
+ from . import selectors
+
+# This relies on each of the submodules having an __all__ variable.
+from .futures import *
+from .events import *
+from .locks import *
+from .transports import *
+from .protocols import *
+from .streams import *
+from .tasks import *
+
+if sys.platform == 'win32': # pragma: no cover
+ from .windows_events import *
+else:
+ from .unix_events import * # pragma: no cover
+
+
+__all__ = (futures.__all__ +
+ events.__all__ +
+ locks.__all__ +
+ transports.__all__ +
+ protocols.__all__ +
+ streams.__all__ +
+ tasks.__all__)
diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py
new file mode 100644
index 0000000..32457eb
--- /dev/null
+++ b/Lib/asyncio/base_events.py
@@ -0,0 +1,606 @@
+"""Base implementation of event loop.
+
+The event loop can be broken up into a multiplexer (the part
+responsible for notifying us of IO events) and the event loop proper,
+which wraps a multiplexer with functionality for scheduling callbacks,
+immediately or at a given time in the future.
+
+Whenever a public API takes a callback, subsequent positional
+arguments will be passed to the callback if/when it is called. This
+avoids the proliferation of trivial lambdas implementing closures.
+Keyword arguments for the callback are not supported; this is a
+conscious design decision, leaving the door open for keyword arguments
+to modify the meaning of the API call itself.
+"""
+
+
+import collections
+import concurrent.futures
+import heapq
+import logging
+import socket
+import subprocess
+import time
+import os
+import sys
+
+from . import events
+from . import futures
+from . import tasks
+from .log import asyncio_log
+
+
+__all__ = ['BaseEventLoop', 'Server']
+
+
+# Argument for default thread pool executor creation.
+_MAX_WORKERS = 5
+
+
+class _StopError(BaseException):
+ """Raised to stop the event loop."""
+
+
+def _raise_stop_error(*args):
+ raise _StopError
+
+
+class Server(events.AbstractServer):
+
+ def __init__(self, loop, sockets):
+ self.loop = loop
+ self.sockets = sockets
+ self.active_count = 0
+ self.waiters = []
+
+ def attach(self, transport):
+ assert self.sockets is not None
+ self.active_count += 1
+
+ def detach(self, transport):
+ assert self.active_count > 0
+ self.active_count -= 1
+ if self.active_count == 0 and self.sockets is None:
+ self._wakeup()
+
+ def close(self):
+ sockets = self.sockets
+ if sockets is not None:
+ self.sockets = None
+ for sock in sockets:
+ self.loop._stop_serving(sock)
+ if self.active_count == 0:
+ self._wakeup()
+
+ def _wakeup(self):
+ waiters = self.waiters
+ self.waiters = None
+ for waiter in waiters:
+ if not waiter.done():
+ waiter.set_result(waiter)
+
+ @tasks.coroutine
+ def wait_closed(self):
+ if self.sockets is None or self.waiters is None:
+ return
+ waiter = futures.Future(loop=self.loop)
+ self.waiters.append(waiter)
+ yield from waiter
+
+
+class BaseEventLoop(events.AbstractEventLoop):
+
+ def __init__(self):
+ self._ready = collections.deque()
+ self._scheduled = []
+ self._default_executor = None
+ self._internal_fds = 0
+ self._running = False
+
+ def _make_socket_transport(self, sock, protocol, waiter=None, *,
+ extra=None, server=None):
+ """Create socket transport."""
+ raise NotImplementedError
+
+ def _make_ssl_transport(self, rawsock, protocol, sslcontext, waiter, *,
+ server_side=False, server_hostname=None,
+ extra=None, server=None):
+ """Create SSL transport."""
+ raise NotImplementedError
+
+ def _make_datagram_transport(self, sock, protocol,
+ address=None, extra=None):
+ """Create datagram transport."""
+ raise NotImplementedError
+
+ def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
+ extra=None):
+ """Create read pipe transport."""
+ raise NotImplementedError
+
+ def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
+ extra=None):
+ """Create write pipe transport."""
+ raise NotImplementedError
+
+ @tasks.coroutine
+ def _make_subprocess_transport(self, protocol, args, shell,
+ stdin, stdout, stderr, bufsize,
+ extra=None, **kwargs):
+ """Create subprocess transport."""
+ raise NotImplementedError
+
+ def _read_from_self(self):
+ """XXX"""
+ raise NotImplementedError
+
+ def _write_to_self(self):
+ """XXX"""
+ raise NotImplementedError
+
+ def _process_events(self, event_list):
+ """Process selector events."""
+ raise NotImplementedError
+
+ def run_forever(self):
+ """Run until stop() is called."""
+ if self._running:
+ raise RuntimeError('Event loop is running.')
+ self._running = True
+ try:
+ while True:
+ try:
+ self._run_once()
+ except _StopError:
+ break
+ finally:
+ self._running = False
+
+ def run_until_complete(self, future):
+ """Run until the Future is done.
+
+ If the argument is a coroutine, it is wrapped in a Task.
+
+ XXX TBD: It would be disastrous to call run_until_complete()
+ with the same coroutine twice -- it would wrap it in two
+ different Tasks and that can't be good.
+
+ Return the Future's result, or raise its exception.
+ """
+ future = tasks.async(future, loop=self)
+ future.add_done_callback(_raise_stop_error)
+ self.run_forever()
+ future.remove_done_callback(_raise_stop_error)
+ if not future.done():
+ raise RuntimeError('Event loop stopped before Future completed.')
+
+ return future.result()
+
+ def stop(self):
+ """Stop running the event loop.
+
+ Every callback scheduled before stop() is called will run.
+ Callback scheduled after stop() is called won't. However,
+ those callbacks will run if run() is called again later.
+ """
+ self.call_soon(_raise_stop_error)
+
+ def is_running(self):
+ """Returns running status of event loop."""
+ return self._running
+
+ def time(self):
+ """Return the time according to the event loop's clock."""
+ return time.monotonic()
+
+ def call_later(self, delay, callback, *args):
+ """Arrange for a callback to be called at a given time.
+
+ Return a Handle: an opaque object with a cancel() method that
+ can be used to cancel the call.
+
+ The delay can be an int or float, expressed in seconds. It is
+ always a relative time.
+
+ Each callback will be called exactly once. If two callbacks
+ are scheduled for exactly the same time, it undefined which
+ will be called first.
+
+ Any positional arguments after the callback will be passed to
+ the callback when it is called.
+ """
+ return self.call_at(self.time() + delay, callback, *args)
+
+ def call_at(self, when, callback, *args):
+ """Like call_later(), but uses an absolute time."""
+ timer = events.TimerHandle(when, callback, args)
+ heapq.heappush(self._scheduled, timer)
+ return timer
+
+ def call_soon(self, callback, *args):
+ """Arrange for a callback to be called as soon as possible.
+
+ This operates as a FIFO queue, callbacks are called in the
+ order in which they are registered. Each callback will be
+ called exactly once.
+
+ Any positional arguments after the callback will be passed to
+ the callback when it is called.
+ """
+ handle = events.make_handle(callback, args)
+ self._ready.append(handle)
+ return handle
+
+ def call_soon_threadsafe(self, callback, *args):
+ """XXX"""
+ handle = self.call_soon(callback, *args)
+ self._write_to_self()
+ return handle
+
+ def run_in_executor(self, executor, callback, *args):
+ if isinstance(callback, events.Handle):
+ assert not args
+ assert not isinstance(callback, events.TimerHandle)
+ if callback._cancelled:
+ f = futures.Future(loop=self)
+ f.set_result(None)
+ return f
+ callback, args = callback._callback, callback._args
+ if executor is None:
+ executor = self._default_executor
+ if executor is None:
+ executor = concurrent.futures.ThreadPoolExecutor(_MAX_WORKERS)
+ self._default_executor = executor
+ return futures.wrap_future(executor.submit(callback, *args), loop=self)
+
+ def set_default_executor(self, executor):
+ self._default_executor = executor
+
+ def getaddrinfo(self, host, port, *,
+ family=0, type=0, proto=0, flags=0):
+ return self.run_in_executor(None, socket.getaddrinfo,
+ host, port, family, type, proto, flags)
+
+ def getnameinfo(self, sockaddr, flags=0):
+ return self.run_in_executor(None, socket.getnameinfo, sockaddr, flags)
+
+ @tasks.coroutine
+ def create_connection(self, protocol_factory, host=None, port=None, *,
+ ssl=None, family=0, proto=0, flags=0, sock=None,
+ local_addr=None):
+ """XXX"""
+ if host is not None or port is not None:
+ if sock is not None:
+ raise ValueError(
+ 'host/port and sock can not be specified at the same time')
+
+ f1 = self.getaddrinfo(
+ host, port, family=family,
+ type=socket.SOCK_STREAM, proto=proto, flags=flags)
+ fs = [f1]
+ if local_addr is not None:
+ f2 = self.getaddrinfo(
+ *local_addr, family=family,
+ type=socket.SOCK_STREAM, proto=proto, flags=flags)
+ fs.append(f2)
+ else:
+ f2 = None
+
+ yield from tasks.wait(fs, loop=self)
+
+ infos = f1.result()
+ if not infos:
+ raise OSError('getaddrinfo() returned empty list')
+ if f2 is not None:
+ laddr_infos = f2.result()
+ if not laddr_infos:
+ raise OSError('getaddrinfo() returned empty list')
+
+ exceptions = []
+ for family, type, proto, cname, address in infos:
+ try:
+ sock = socket.socket(family=family, type=type, proto=proto)
+ sock.setblocking(False)
+ if f2 is not None:
+ for _, _, _, _, laddr in laddr_infos:
+ try:
+ sock.bind(laddr)
+ break
+ except OSError as exc:
+ exc = OSError(
+ exc.errno, 'error while '
+ 'attempting to bind on address '
+ '{!r}: {}'.format(
+ laddr, exc.strerror.lower()))
+ exceptions.append(exc)
+ else:
+ sock.close()
+ sock = None
+ continue
+ yield from self.sock_connect(sock, address)
+ except OSError as exc:
+ if sock is not None:
+ sock.close()
+ exceptions.append(exc)
+ else:
+ break
+ else:
+ if len(exceptions) == 1:
+ raise exceptions[0]
+ else:
+ # If they all have the same str(), raise one.
+ model = str(exceptions[0])
+ if all(str(exc) == model for exc in exceptions):
+ raise exceptions[0]
+ # Raise a combined exception so the user can see all
+ # the various error messages.
+ raise OSError('Multiple exceptions: {}'.format(
+ ', '.join(str(exc) for exc in exceptions)))
+
+ elif sock is None:
+ raise ValueError(
+ 'host and port was not specified and no sock specified')
+
+ sock.setblocking(False)
+
+ protocol = protocol_factory()
+ waiter = futures.Future(loop=self)
+ if ssl:
+ sslcontext = None if isinstance(ssl, bool) else ssl
+ transport = self._make_ssl_transport(
+ sock, protocol, sslcontext, waiter,
+ server_side=False, server_hostname=host)
+ else:
+ transport = self._make_socket_transport(sock, protocol, waiter)
+
+ yield from waiter
+ return transport, protocol
+
+ @tasks.coroutine
+ def create_datagram_endpoint(self, protocol_factory,
+ local_addr=None, remote_addr=None, *,
+ family=0, proto=0, flags=0):
+ """Create datagram connection."""
+ if not (local_addr or remote_addr):
+ if family == 0:
+ raise ValueError('unexpected address family')
+ addr_pairs_info = (((family, proto), (None, None)),)
+ else:
+ # join addresss by (family, protocol)
+ addr_infos = collections.OrderedDict()
+ for idx, addr in ((0, local_addr), (1, remote_addr)):
+ if addr is not None:
+ assert isinstance(addr, tuple) and len(addr) == 2, (
+ '2-tuple is expected')
+
+ infos = yield from self.getaddrinfo(
+ *addr, family=family, type=socket.SOCK_DGRAM,
+ proto=proto, flags=flags)
+ if not infos:
+ raise OSError('getaddrinfo() returned empty list')
+
+ for fam, _, pro, _, address in infos:
+ key = (fam, pro)
+ if key not in addr_infos:
+ addr_infos[key] = [None, None]
+ addr_infos[key][idx] = address
+
+ # each addr has to have info for each (family, proto) pair
+ addr_pairs_info = [
+ (key, addr_pair) for key, addr_pair in addr_infos.items()
+ if not ((local_addr and addr_pair[0] is None) or
+ (remote_addr and addr_pair[1] is None))]
+
+ if not addr_pairs_info:
+ raise ValueError('can not get address information')
+
+ exceptions = []
+
+ for ((family, proto),
+ (local_address, remote_address)) in addr_pairs_info:
+ sock = None
+ r_addr = None
+ try:
+ sock = socket.socket(
+ family=family, type=socket.SOCK_DGRAM, proto=proto)
+ sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ sock.setblocking(False)
+
+ if local_addr:
+ sock.bind(local_address)
+ if remote_addr:
+ yield from self.sock_connect(sock, remote_address)
+ r_addr = remote_address
+ except OSError as exc:
+ if sock is not None:
+ sock.close()
+ exceptions.append(exc)
+ else:
+ break
+ else:
+ raise exceptions[0]
+
+ protocol = protocol_factory()
+ transport = self._make_datagram_transport(sock, protocol, r_addr)
+ return transport, protocol
+
+ @tasks.coroutine
+ def create_server(self, protocol_factory, host=None, port=None,
+ *,
+ family=socket.AF_UNSPEC,
+ flags=socket.AI_PASSIVE,
+ sock=None,
+ backlog=100,
+ ssl=None,
+ reuse_address=None):
+ """XXX"""
+ if host is not None or port is not None:
+ if sock is not None:
+ raise ValueError(
+ 'host/port and sock can not be specified at the same time')
+
+ AF_INET6 = getattr(socket, 'AF_INET6', 0)
+ if reuse_address is None:
+ reuse_address = os.name == 'posix' and sys.platform != 'cygwin'
+ sockets = []
+ if host == '':
+ host = None
+
+ infos = yield from self.getaddrinfo(
+ host, port, family=family,
+ type=socket.SOCK_STREAM, proto=0, flags=flags)
+ if not infos:
+ raise OSError('getaddrinfo() returned empty list')
+
+ completed = False
+ try:
+ for res in infos:
+ af, socktype, proto, canonname, sa = res
+ sock = socket.socket(af, socktype, proto)
+ sockets.append(sock)
+ if reuse_address:
+ sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,
+ True)
+ # Disable IPv4/IPv6 dual stack support (enabled by
+ # default on Linux) which makes a single socket
+ # listen on both address families.
+ if af == AF_INET6 and hasattr(socket, 'IPPROTO_IPV6'):
+ sock.setsockopt(socket.IPPROTO_IPV6,
+ socket.IPV6_V6ONLY,
+ True)
+ try:
+ sock.bind(sa)
+ except OSError as err:
+ raise OSError(err.errno, 'error while attempting '
+ 'to bind on address %r: %s'
+ % (sa, err.strerror.lower()))
+ completed = True
+ finally:
+ if not completed:
+ for sock in sockets:
+ sock.close()
+ else:
+ if sock is None:
+ raise ValueError(
+ 'host and port was not specified and no sock specified')
+ sockets = [sock]
+
+ server = Server(self, sockets)
+ for sock in sockets:
+ sock.listen(backlog)
+ sock.setblocking(False)
+ self._start_serving(protocol_factory, sock, ssl, server)
+ return server
+
+ @tasks.coroutine
+ def connect_read_pipe(self, protocol_factory, pipe):
+ protocol = protocol_factory()
+ waiter = futures.Future(loop=self)
+ transport = self._make_read_pipe_transport(pipe, protocol, waiter)
+ yield from waiter
+ return transport, protocol
+
+ @tasks.coroutine
+ def connect_write_pipe(self, protocol_factory, pipe):
+ protocol = protocol_factory()
+ waiter = futures.Future(loop=self)
+ transport = self._make_write_pipe_transport(pipe, protocol, waiter)
+ yield from waiter
+ return transport, protocol
+
+ @tasks.coroutine
+ def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+ universal_newlines=False, shell=True, bufsize=0,
+ **kwargs):
+ assert not universal_newlines, "universal_newlines must be False"
+ assert shell, "shell must be True"
+ assert isinstance(cmd, str), cmd
+ protocol = protocol_factory()
+ transport = yield from self._make_subprocess_transport(
+ protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
+ return transport, protocol
+
+ @tasks.coroutine
+ def subprocess_exec(self, protocol_factory, *args, stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+ universal_newlines=False, shell=False, bufsize=0,
+ **kwargs):
+ assert not universal_newlines, "universal_newlines must be False"
+ assert not shell, "shell must be False"
+ protocol = protocol_factory()
+ transport = yield from self._make_subprocess_transport(
+ protocol, args, False, stdin, stdout, stderr, bufsize, **kwargs)
+ return transport, protocol
+
+ def _add_callback(self, handle):
+ """Add a Handle to ready or scheduled."""
+ assert isinstance(handle, events.Handle), 'A Handle is required here'
+ if handle._cancelled:
+ return
+ if isinstance(handle, events.TimerHandle):
+ heapq.heappush(self._scheduled, handle)
+ else:
+ self._ready.append(handle)
+
+ def _add_callback_signalsafe(self, handle):
+ """Like _add_callback() but called from a signal handler."""
+ self._add_callback(handle)
+ self._write_to_self()
+
+ def _run_once(self):
+ """Run one full iteration of the event loop.
+
+ This calls all currently ready callbacks, polls for I/O,
+ schedules the resulting callbacks, and finally schedules
+ 'call_later' callbacks.
+ """
+ # Remove delayed calls that were cancelled from head of queue.
+ while self._scheduled and self._scheduled[0]._cancelled:
+ heapq.heappop(self._scheduled)
+
+ timeout = None
+ if self._ready:
+ timeout = 0
+ elif self._scheduled:
+ # Compute the desired timeout.
+ when = self._scheduled[0]._when
+ deadline = max(0, when - self.time())
+ if timeout is None:
+ timeout = deadline
+ else:
+ timeout = min(timeout, deadline)
+
+ # TODO: Instrumentation only in debug mode?
+ t0 = self.time()
+ event_list = self._selector.select(timeout)
+ t1 = self.time()
+ argstr = '' if timeout is None else '{:.3f}'.format(timeout)
+ if t1-t0 >= 1:
+ level = logging.INFO
+ else:
+ level = logging.DEBUG
+ asyncio_log.log(level, 'poll%s took %.3f seconds', argstr, t1-t0)
+ self._process_events(event_list)
+
+ # Handle 'later' callbacks that are ready.
+ now = self.time()
+ while self._scheduled:
+ handle = self._scheduled[0]
+ if handle._when > now:
+ break
+ handle = heapq.heappop(self._scheduled)
+ self._ready.append(handle)
+
+ # This is the only place where callbacks are actually *called*.
+ # All other places just add them to ready.
+ # Note: We run all currently scheduled callbacks, but not any
+ # callbacks scheduled by callbacks run this time around --
+ # they will be run the next time (after another I/O poll).
+ # Use an idiom that is threadsafe without using locks.
+ ntodo = len(self._ready)
+ for i in range(ntodo):
+ handle = self._ready.popleft()
+ if not handle._cancelled:
+ handle._run()
+ handle = None # Needed to break cycles when an exception occurs.
diff --git a/Lib/asyncio/constants.py b/Lib/asyncio/constants.py
new file mode 100644
index 0000000..79c3b93
--- /dev/null
+++ b/Lib/asyncio/constants.py
@@ -0,0 +1,4 @@
+"""Constants."""
+
+
+LOG_THRESHOLD_FOR_CONNLOST_WRITES = 5
diff --git a/Lib/asyncio/events.py b/Lib/asyncio/events.py
new file mode 100644
index 0000000..9724615
--- /dev/null
+++ b/Lib/asyncio/events.py
@@ -0,0 +1,395 @@
+"""Event loop and event loop policy."""
+
+__all__ = ['AbstractEventLoopPolicy', 'DefaultEventLoopPolicy',
+ 'AbstractEventLoop', 'AbstractServer',
+ 'Handle', 'TimerHandle',
+ 'get_event_loop_policy', 'set_event_loop_policy',
+ 'get_event_loop', 'set_event_loop', 'new_event_loop',
+ ]
+
+import subprocess
+import sys
+import threading
+import socket
+
+from .log import asyncio_log
+
+
+class Handle:
+ """Object returned by callback registration methods."""
+
+ def __init__(self, callback, args):
+ self._callback = callback
+ self._args = args
+ self._cancelled = False
+
+ def __repr__(self):
+ res = 'Handle({}, {})'.format(self._callback, self._args)
+ if self._cancelled:
+ res += '<cancelled>'
+ return res
+
+ def cancel(self):
+ self._cancelled = True
+
+ def _run(self):
+ try:
+ self._callback(*self._args)
+ except Exception:
+ asyncio_log.exception('Exception in callback %s %r',
+ self._callback, self._args)
+ self = None # Needed to break cycles when an exception occurs.
+
+
+def make_handle(callback, args):
+ # TODO: Inline this?
+ assert not isinstance(callback, Handle), 'A Handle is not a callback'
+ return Handle(callback, args)
+
+
+class TimerHandle(Handle):
+ """Object returned by timed callback registration methods."""
+
+ def __init__(self, when, callback, args):
+ assert when is not None
+ super().__init__(callback, args)
+
+ self._when = when
+
+ def __repr__(self):
+ res = 'TimerHandle({}, {}, {})'.format(self._when,
+ self._callback,
+ self._args)
+ if self._cancelled:
+ res += '<cancelled>'
+
+ return res
+
+ def __hash__(self):
+ return hash(self._when)
+
+ def __lt__(self, other):
+ return self._when < other._when
+
+ def __le__(self, other):
+ if self._when < other._when:
+ return True
+ return self.__eq__(other)
+
+ def __gt__(self, other):
+ return self._when > other._when
+
+ def __ge__(self, other):
+ if self._when > other._when:
+ return True
+ return self.__eq__(other)
+
+ def __eq__(self, other):
+ if isinstance(other, TimerHandle):
+ return (self._when == other._when and
+ self._callback == other._callback and
+ self._args == other._args and
+ self._cancelled == other._cancelled)
+ return NotImplemented
+
+ def __ne__(self, other):
+ equal = self.__eq__(other)
+ return NotImplemented if equal is NotImplemented else not equal
+
+
+class AbstractServer:
+ """Abstract server returned by create_service()."""
+
+ def close(self):
+ """Stop serving. This leaves existing connections open."""
+ return NotImplemented
+
+ def wait_closed(self):
+ """Coroutine to wait until service is closed."""
+ return NotImplemented
+
+
+class AbstractEventLoop:
+ """Abstract event loop."""
+
+ # Running and stopping the event loop.
+
+ def run_forever(self):
+ """Run the event loop until stop() is called."""
+ raise NotImplementedError
+
+ def run_until_complete(self, future):
+ """Run the event loop until a Future is done.
+
+ Return the Future's result, or raise its exception.
+ """
+ raise NotImplementedError
+
+ def stop(self):
+ """Stop the event loop as soon as reasonable.
+
+ Exactly how soon that is may depend on the implementation, but
+ no more I/O callbacks should be scheduled.
+ """
+ raise NotImplementedError
+
+ def is_running(self):
+ """Return whether the event loop is currently running."""
+ raise NotImplementedError
+
+ # Methods scheduling callbacks. All these return Handles.
+
+ def call_soon(self, callback, *args):
+ return self.call_later(0, callback, *args)
+
+ def call_later(self, delay, callback, *args):
+ raise NotImplementedError
+
+ def call_at(self, when, callback, *args):
+ raise NotImplementedError
+
+ def time(self):
+ raise NotImplementedError
+
+ # Methods for interacting with threads.
+
+ def call_soon_threadsafe(self, callback, *args):
+ raise NotImplementedError
+
+ def run_in_executor(self, executor, callback, *args):
+ raise NotImplementedError
+
+ def set_default_executor(self, executor):
+ raise NotImplementedError
+
+ # Network I/O methods returning Futures.
+
+ def getaddrinfo(self, host, port, *, family=0, type=0, proto=0, flags=0):
+ raise NotImplementedError
+
+ def getnameinfo(self, sockaddr, flags=0):
+ raise NotImplementedError
+
+ def create_connection(self, protocol_factory, host=None, port=None, *,
+ ssl=None, family=0, proto=0, flags=0, sock=None,
+ local_addr=None):
+ raise NotImplementedError
+
+ def create_server(self, protocol_factory, host=None, port=None, *,
+ family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE,
+ sock=None, backlog=100, ssl=None, reuse_address=None):
+ """A coroutine which creates a TCP server bound to host and port.
+
+ The return value is a Server object which can be used to stop
+ the service.
+
+ If host is an empty string or None all interfaces are assumed
+ and a list of multiple sockets will be returned (most likely
+ one for IPv4 and another one for IPv6).
+
+ family can be set to either AF_INET or AF_INET6 to force the
+ socket to use IPv4 or IPv6. If not set it will be determined
+ from host (defaults to AF_UNSPEC).
+
+ flags is a bitmask for getaddrinfo().
+
+ sock can optionally be specified in order to use a preexisting
+ socket object.
+
+ backlog is the maximum number of queued connections passed to
+ listen() (defaults to 100).
+
+ ssl can be set to an SSLContext to enable SSL over the
+ accepted connections.
+
+ reuse_address tells the kernel to reuse a local socket in
+ TIME_WAIT state, without waiting for its natural timeout to
+ expire. If not specified will automatically be set to True on
+ UNIX.
+ """
+ raise NotImplementedError
+
+ def create_datagram_endpoint(self, protocol_factory,
+ local_addr=None, remote_addr=None, *,
+ family=0, proto=0, flags=0):
+ raise NotImplementedError
+
+ def connect_read_pipe(self, protocol_factory, pipe):
+ """Register read pipe in eventloop.
+
+ protocol_factory should instantiate object with Protocol interface.
+ pipe is file-like object already switched to nonblocking.
+ Return pair (transport, protocol), where transport support
+ ReadTransport ABC"""
+ # The reason to accept file-like object instead of just file descriptor
+ # is: we need to own pipe and close it at transport finishing
+ # Can got complicated errors if pass f.fileno(),
+ # close fd in pipe transport then close f and vise versa.
+ raise NotImplementedError
+
+ def connect_write_pipe(self, protocol_factory, pipe):
+ """Register write pipe in eventloop.
+
+ protocol_factory should instantiate object with BaseProtocol interface.
+ Pipe is file-like object already switched to nonblocking.
+ Return pair (transport, protocol), where transport support
+ WriteTransport ABC"""
+ # The reason to accept file-like object instead of just file descriptor
+ # is: we need to own pipe and close it at transport finishing
+ # Can got complicated errors if pass f.fileno(),
+ # close fd in pipe transport then close f and vise versa.
+ raise NotImplementedError
+
+ def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+ **kwargs):
+ raise NotImplementedError
+
+ def subprocess_exec(self, protocol_factory, *args, stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+ **kwargs):
+ raise NotImplementedError
+
+ # Ready-based callback registration methods.
+ # The add_*() methods return None.
+ # The remove_*() methods return True if something was removed,
+ # False if there was nothing to delete.
+
+ def add_reader(self, fd, callback, *args):
+ raise NotImplementedError
+
+ def remove_reader(self, fd):
+ raise NotImplementedError
+
+ def add_writer(self, fd, callback, *args):
+ raise NotImplementedError
+
+ def remove_writer(self, fd):
+ raise NotImplementedError
+
+ # Completion based I/O methods returning Futures.
+
+ def sock_recv(self, sock, nbytes):
+ raise NotImplementedError
+
+ def sock_sendall(self, sock, data):
+ raise NotImplementedError
+
+ def sock_connect(self, sock, address):
+ raise NotImplementedError
+
+ def sock_accept(self, sock):
+ raise NotImplementedError
+
+ # Signal handling.
+
+ def add_signal_handler(self, sig, callback, *args):
+ raise NotImplementedError
+
+ def remove_signal_handler(self, sig):
+ raise NotImplementedError
+
+
+class AbstractEventLoopPolicy:
+ """Abstract policy for accessing the event loop."""
+
+ def get_event_loop(self):
+ """XXX"""
+ raise NotImplementedError
+
+ def set_event_loop(self, loop):
+ """XXX"""
+ raise NotImplementedError
+
+ def new_event_loop(self):
+ """XXX"""
+ raise NotImplementedError
+
+
+class DefaultEventLoopPolicy(threading.local, AbstractEventLoopPolicy):
+ """Default policy implementation for accessing the event loop.
+
+ In this policy, each thread has its own event loop. However, we
+ only automatically create an event loop by default for the main
+ thread; other threads by default have no event loop.
+
+ Other policies may have different rules (e.g. a single global
+ event loop, or automatically creating an event loop per thread, or
+ using some other notion of context to which an event loop is
+ associated).
+ """
+
+ _loop = None
+ _set_called = False
+
+ def get_event_loop(self):
+ """Get the event loop.
+
+ This may be None or an instance of EventLoop.
+ """
+ if (self._loop is None and
+ not self._set_called and
+ isinstance(threading.current_thread(), threading._MainThread)):
+ self._loop = self.new_event_loop()
+ assert self._loop is not None, \
+ ('There is no current event loop in thread %r.' %
+ threading.current_thread().name)
+ return self._loop
+
+ def set_event_loop(self, loop):
+ """Set the event loop."""
+ # TODO: The isinstance() test violates the PEP.
+ self._set_called = True
+ assert loop is None or isinstance(loop, AbstractEventLoop)
+ self._loop = loop
+
+ def new_event_loop(self):
+ """Create a new event loop.
+
+ You must call set_event_loop() to make this the current event
+ loop.
+ """
+ if sys.platform == 'win32': # pragma: no cover
+ from . import windows_events
+ return windows_events.SelectorEventLoop()
+ else: # pragma: no cover
+ from . import unix_events
+ return unix_events.SelectorEventLoop()
+
+
+# Event loop policy. The policy itself is always global, even if the
+# policy's rules say that there is an event loop per thread (or other
+# notion of context). The default policy is installed by the first
+# call to get_event_loop_policy().
+_event_loop_policy = None
+
+
+def get_event_loop_policy():
+ """XXX"""
+ global _event_loop_policy
+ if _event_loop_policy is None:
+ _event_loop_policy = DefaultEventLoopPolicy()
+ return _event_loop_policy
+
+
+def set_event_loop_policy(policy):
+ """XXX"""
+ global _event_loop_policy
+ # TODO: The isinstance() test violates the PEP.
+ assert policy is None or isinstance(policy, AbstractEventLoopPolicy)
+ _event_loop_policy = policy
+
+
+def get_event_loop():
+ """XXX"""
+ return get_event_loop_policy().get_event_loop()
+
+
+def set_event_loop(loop):
+ """XXX"""
+ get_event_loop_policy().set_event_loop(loop)
+
+
+def new_event_loop():
+ """XXX"""
+ return get_event_loop_policy().new_event_loop()
diff --git a/Lib/asyncio/futures.py b/Lib/asyncio/futures.py
new file mode 100644
index 0000000..99a043b
--- /dev/null
+++ b/Lib/asyncio/futures.py
@@ -0,0 +1,338 @@
+"""A Future class similar to the one in PEP 3148."""
+
+__all__ = ['CancelledError', 'TimeoutError',
+ 'InvalidStateError',
+ 'Future', 'wrap_future',
+ ]
+
+import concurrent.futures._base
+import logging
+import traceback
+
+from . import events
+from .log import asyncio_log
+
+# States for Future.
+_PENDING = 'PENDING'
+_CANCELLED = 'CANCELLED'
+_FINISHED = 'FINISHED'
+
+# TODO: Do we really want to depend on concurrent.futures internals?
+Error = concurrent.futures._base.Error
+CancelledError = concurrent.futures.CancelledError
+TimeoutError = concurrent.futures.TimeoutError
+
+STACK_DEBUG = logging.DEBUG - 1 # heavy-duty debugging
+
+
+class InvalidStateError(Error):
+ """The operation is not allowed in this state."""
+ # TODO: Show the future, its state, the method, and the required state.
+
+
+class _TracebackLogger:
+ """Helper to log a traceback upon destruction if not cleared.
+
+ This solves a nasty problem with Futures and Tasks that have an
+ exception set: if nobody asks for the exception, the exception is
+ never logged. This violates the Zen of Python: 'Errors should
+ never pass silently. Unless explicitly silenced.'
+
+ However, we don't want to log the exception as soon as
+ set_exception() is called: if the calling code is written
+ properly, it will get the exception and handle it properly. But
+ we *do* want to log it if result() or exception() was never called
+ -- otherwise developers waste a lot of time wondering why their
+ buggy code fails silently.
+
+ An earlier attempt added a __del__() method to the Future class
+ itself, but this backfired because the presence of __del__()
+ prevents garbage collection from breaking cycles. A way out of
+ this catch-22 is to avoid having a __del__() method on the Future
+ class itself, but instead to have a reference to a helper object
+ with a __del__() method that logs the traceback, where we ensure
+ that the helper object doesn't participate in cycles, and only the
+ Future has a reference to it.
+
+ The helper object is added when set_exception() is called. When
+ the Future is collected, and the helper is present, the helper
+ object is also collected, and its __del__() method will log the
+ traceback. When the Future's result() or exception() method is
+ called (and a helper object is present), it removes the the helper
+ object, after calling its clear() method to prevent it from
+ logging.
+
+ One downside is that we do a fair amount of work to extract the
+ traceback from the exception, even when it is never logged. It
+ would seem cheaper to just store the exception object, but that
+ references the traceback, which references stack frames, which may
+ reference the Future, which references the _TracebackLogger, and
+ then the _TracebackLogger would be included in a cycle, which is
+ what we're trying to avoid! As an optimization, we don't
+ immediately format the exception; we only do the work when
+ activate() is called, which call is delayed until after all the
+ Future's callbacks have run. Since usually a Future has at least
+ one callback (typically set by 'yield from') and usually that
+ callback extracts the callback, thereby removing the need to
+ format the exception.
+
+ PS. I don't claim credit for this solution. I first heard of it
+ in a discussion about closing files when they are collected.
+ """
+
+ __slots__ = ['exc', 'tb']
+
+ def __init__(self, exc):
+ self.exc = exc
+ self.tb = None
+
+ def activate(self):
+ exc = self.exc
+ if exc is not None:
+ self.exc = None
+ self.tb = traceback.format_exception(exc.__class__, exc,
+ exc.__traceback__)
+
+ def clear(self):
+ self.exc = None
+ self.tb = None
+
+ def __del__(self):
+ if self.tb:
+ asyncio_log.error('Future/Task exception was never retrieved:\n%s',
+ ''.join(self.tb))
+
+
+class Future:
+ """This class is *almost* compatible with concurrent.futures.Future.
+
+ Differences:
+
+ - result() and exception() do not take a timeout argument and
+ raise an exception when the future isn't done yet.
+
+ - Callbacks registered with add_done_callback() are always called
+ via the event loop's call_soon_threadsafe().
+
+ - This class is not compatible with the wait() and as_completed()
+ methods in the concurrent.futures package.
+
+ (In Python 3.4 or later we may be able to unify the implementations.)
+ """
+
+ # Class variables serving as defaults for instance variables.
+ _state = _PENDING
+ _result = None
+ _exception = None
+ _loop = None
+
+ _blocking = False # proper use of future (yield vs yield from)
+
+ _tb_logger = None
+
+ def __init__(self, *, loop=None):
+ """Initialize the future.
+
+ The optional event_loop argument allows to explicitly set the event
+ loop object used by the future. If it's not provided, the future uses
+ the default event loop.
+ """
+ if loop is None:
+ self._loop = events.get_event_loop()
+ else:
+ self._loop = loop
+ self._callbacks = []
+
+ def __repr__(self):
+ res = self.__class__.__name__
+ if self._state == _FINISHED:
+ if self._exception is not None:
+ res += '<exception={!r}>'.format(self._exception)
+ else:
+ res += '<result={!r}>'.format(self._result)
+ elif self._callbacks:
+ size = len(self._callbacks)
+ if size > 2:
+ res += '<{}, [{}, <{} more>, {}]>'.format(
+ self._state, self._callbacks[0],
+ size-2, self._callbacks[-1])
+ else:
+ res += '<{}, {}>'.format(self._state, self._callbacks)
+ else:
+ res += '<{}>'.format(self._state)
+ return res
+
+ def cancel(self):
+ """Cancel the future and schedule callbacks.
+
+ If the future is already done or cancelled, return False. Otherwise,
+ change the future's state to cancelled, schedule the callbacks and
+ return True.
+ """
+ if self._state != _PENDING:
+ return False
+ self._state = _CANCELLED
+ self._schedule_callbacks()
+ return True
+
+ def _schedule_callbacks(self):
+ """Internal: Ask the event loop to call all callbacks.
+
+ The callbacks are scheduled to be called as soon as possible. Also
+ clears the callback list.
+ """
+ callbacks = self._callbacks[:]
+ if not callbacks:
+ return
+
+ self._callbacks[:] = []
+ for callback in callbacks:
+ self._loop.call_soon(callback, self)
+
+ def cancelled(self):
+ """Return True if the future was cancelled."""
+ return self._state == _CANCELLED
+
+ # Don't implement running(); see http://bugs.python.org/issue18699
+
+ def done(self):
+ """Return True if the future is done.
+
+ Done means either that a result / exception are available, or that the
+ future was cancelled.
+ """
+ return self._state != _PENDING
+
+ def result(self):
+ """Return the result this future represents.
+
+ If the future has been cancelled, raises CancelledError. If the
+ future's result isn't yet available, raises InvalidStateError. If
+ the future is done and has an exception set, this exception is raised.
+ """
+ if self._state == _CANCELLED:
+ raise CancelledError
+ if self._state != _FINISHED:
+ raise InvalidStateError('Result is not ready.')
+ if self._tb_logger is not None:
+ self._tb_logger.clear()
+ self._tb_logger = None
+ if self._exception is not None:
+ raise self._exception
+ return self._result
+
+ def exception(self):
+ """Return the exception that was set on this future.
+
+ The exception (or None if no exception was set) is returned only if
+ the future is done. If the future has been cancelled, raises
+ CancelledError. If the future isn't done yet, raises
+ InvalidStateError.
+ """
+ if self._state == _CANCELLED:
+ raise CancelledError
+ if self._state != _FINISHED:
+ raise InvalidStateError('Exception is not set.')
+ if self._tb_logger is not None:
+ self._tb_logger.clear()
+ self._tb_logger = None
+ return self._exception
+
+ def add_done_callback(self, fn):
+ """Add a callback to be run when the future becomes done.
+
+ The callback is called with a single argument - the future object. If
+ the future is already done when this is called, the callback is
+ scheduled with call_soon.
+ """
+ if self._state != _PENDING:
+ self._loop.call_soon(fn, self)
+ else:
+ self._callbacks.append(fn)
+
+ # New method not in PEP 3148.
+
+ def remove_done_callback(self, fn):
+ """Remove all instances of a callback from the "call when done" list.
+
+ Returns the number of callbacks removed.
+ """
+ filtered_callbacks = [f for f in self._callbacks if f != fn]
+ removed_count = len(self._callbacks) - len(filtered_callbacks)
+ if removed_count:
+ self._callbacks[:] = filtered_callbacks
+ return removed_count
+
+ # So-called internal methods (note: no set_running_or_notify_cancel()).
+
+ def set_result(self, result):
+ """Mark the future done and set its result.
+
+ If the future is already done when this method is called, raises
+ InvalidStateError.
+ """
+ if self._state != _PENDING:
+ raise InvalidStateError('{}: {!r}'.format(self._state, self))
+ self._result = result
+ self._state = _FINISHED
+ self._schedule_callbacks()
+
+ def set_exception(self, exception):
+ """Mark the future done and set an exception.
+
+ If the future is already done when this method is called, raises
+ InvalidStateError.
+ """
+ if self._state != _PENDING:
+ raise InvalidStateError('{}: {!r}'.format(self._state, self))
+ self._exception = exception
+ self._tb_logger = _TracebackLogger(exception)
+ self._state = _FINISHED
+ self._schedule_callbacks()
+ # Arrange for the logger to be activated after all callbacks
+ # have had a chance to call result() or exception().
+ self._loop.call_soon(self._tb_logger.activate)
+
+ # Truly internal methods.
+
+ def _copy_state(self, other):
+ """Internal helper to copy state from another Future.
+
+ The other Future may be a concurrent.futures.Future.
+ """
+ assert other.done()
+ assert not self.done()
+ if other.cancelled():
+ self.cancel()
+ else:
+ exception = other.exception()
+ if exception is not None:
+ self.set_exception(exception)
+ else:
+ result = other.result()
+ self.set_result(result)
+
+ def __iter__(self):
+ if not self.done():
+ self._blocking = True
+ yield self # This tells Task to wait for completion.
+ assert self.done(), "yield from wasn't used with future"
+ return self.result() # May raise too.
+
+
+def wrap_future(fut, *, loop=None):
+ """Wrap concurrent.futures.Future object."""
+ if isinstance(fut, Future):
+ return fut
+
+ assert isinstance(fut, concurrent.futures.Future), \
+ 'concurrent.futures.Future is expected, got {!r}'.format(fut)
+
+ if loop is None:
+ loop = events.get_event_loop()
+
+ new_future = Future(loop=loop)
+ fut.add_done_callback(
+ lambda future: loop.call_soon_threadsafe(
+ new_future._copy_state, fut))
+ return new_future
diff --git a/Lib/asyncio/locks.py b/Lib/asyncio/locks.py
new file mode 100644
index 0000000..06edbbc
--- /dev/null
+++ b/Lib/asyncio/locks.py
@@ -0,0 +1,401 @@
+"""Synchronization primitives."""
+
+__all__ = ['Lock', 'Event', 'Condition', 'Semaphore']
+
+import collections
+
+from . import events
+from . import futures
+from . import tasks
+
+
+class Lock:
+ """Primitive lock objects.
+
+ A primitive lock is a synchronization primitive that is not owned
+ by a particular coroutine when locked. A primitive lock is in one
+ of two states, 'locked' or 'unlocked'.
+
+ It is created in the unlocked state. It has two basic methods,
+ acquire() and release(). When the state is unlocked, acquire()
+ changes the state to locked and returns immediately. When the
+ state is locked, acquire() blocks until a call to release() in
+ another coroutine changes it to unlocked, then the acquire() call
+ resets it to locked and returns. The release() method should only
+ be called in the locked state; it changes the state to unlocked
+ and returns immediately. If an attempt is made to release an
+ unlocked lock, a RuntimeError will be raised.
+
+ When more than one coroutine is blocked in acquire() waiting for
+ the state to turn to unlocked, only one coroutine proceeds when a
+ release() call resets the state to unlocked; first coroutine which
+ is blocked in acquire() is being processed.
+
+ acquire() is a coroutine and should be called with 'yield from'.
+
+ Locks also support the context manager protocol. '(yield from lock)'
+ should be used as context manager expression.
+
+ Usage:
+
+ lock = Lock()
+ ...
+ yield from lock
+ try:
+ ...
+ finally:
+ lock.release()
+
+ Context manager usage:
+
+ lock = Lock()
+ ...
+ with (yield from lock):
+ ...
+
+ Lock objects can be tested for locking state:
+
+ if not lock.locked():
+ yield from lock
+ else:
+ # lock is acquired
+ ...
+
+ """
+
+ def __init__(self, *, loop=None):
+ self._waiters = collections.deque()
+ self._locked = False
+ if loop is not None:
+ self._loop = loop
+ else:
+ self._loop = events.get_event_loop()
+
+ def __repr__(self):
+ res = super().__repr__()
+ extra = 'locked' if self._locked else 'unlocked'
+ if self._waiters:
+ extra = '{},waiters:{}'.format(extra, len(self._waiters))
+ return '<{} [{}]>'.format(res[1:-1], extra)
+
+ def locked(self):
+ """Return true if lock is acquired."""
+ return self._locked
+
+ @tasks.coroutine
+ def acquire(self):
+ """Acquire a lock.
+
+ This method blocks until the lock is unlocked, then sets it to
+ locked and returns True.
+ """
+ if not self._waiters and not self._locked:
+ self._locked = True
+ return True
+
+ fut = futures.Future(loop=self._loop)
+ self._waiters.append(fut)
+ try:
+ yield from fut
+ self._locked = True
+ return True
+ finally:
+ self._waiters.remove(fut)
+
+ def release(self):
+ """Release a lock.
+
+ When the lock is locked, reset it to unlocked, and return.
+ If any other coroutines are blocked waiting for the lock to become
+ unlocked, allow exactly one of them to proceed.
+
+ When invoked on an unlocked lock, a RuntimeError is raised.
+
+ There is no return value.
+ """
+ if self._locked:
+ self._locked = False
+ # Wake up the first waiter who isn't cancelled.
+ for fut in self._waiters:
+ if not fut.done():
+ fut.set_result(True)
+ break
+ else:
+ raise RuntimeError('Lock is not acquired.')
+
+ def __enter__(self):
+ if not self._locked:
+ raise RuntimeError(
+ '"yield from" should be used as context manager expression')
+ return True
+
+ def __exit__(self, *args):
+ self.release()
+
+ def __iter__(self):
+ yield from self.acquire()
+ return self
+
+
+class Event:
+ """An Event implementation, our equivalent to threading.Event.
+
+ Class implementing event objects. An event manages a flag that can be set
+ to true with the set() method and reset to false with the clear() method.
+ The wait() method blocks until the flag is true. The flag is initially
+ false.
+ """
+
+ def __init__(self, *, loop=None):
+ self._waiters = collections.deque()
+ self._value = False
+ if loop is not None:
+ self._loop = loop
+ else:
+ self._loop = events.get_event_loop()
+
+ def __repr__(self):
+ # TODO: add waiters:N if > 0.
+ res = super().__repr__()
+ return '<{} [{}]>'.format(res[1:-1], 'set' if self._value else 'unset')
+
+ def is_set(self):
+ """Return true if and only if the internal flag is true."""
+ return self._value
+
+ def set(self):
+ """Set the internal flag to true. All coroutines waiting for it to
+ become true are awakened. Coroutine that call wait() once the flag is
+ true will not block at all.
+ """
+ if not self._value:
+ self._value = True
+
+ for fut in self._waiters:
+ if not fut.done():
+ fut.set_result(True)
+
+ def clear(self):
+ """Reset the internal flag to false. Subsequently, coroutines calling
+ wait() will block until set() is called to set the internal flag
+ to true again."""
+ self._value = False
+
+ @tasks.coroutine
+ def wait(self):
+ """Block until the internal flag is true.
+
+ If the internal flag is true on entry, return True
+ immediately. Otherwise, block until another coroutine calls
+ set() to set the flag to true, then return True.
+ """
+ if self._value:
+ return True
+
+ fut = futures.Future(loop=self._loop)
+ self._waiters.append(fut)
+ try:
+ yield from fut
+ return True
+ finally:
+ self._waiters.remove(fut)
+
+
+# TODO: Why is this a Lock subclass? threading.Condition *has* a lock.
+class Condition(Lock):
+ """A Condition implementation.
+
+ This class implements condition variable objects. A condition variable
+ allows one or more coroutines to wait until they are notified by another
+ coroutine.
+ """
+
+ def __init__(self, *, loop=None):
+ super().__init__(loop=loop)
+ self._condition_waiters = collections.deque()
+
+ # TODO: Add __repr__() with len(_condition_waiters).
+
+ @tasks.coroutine
+ def wait(self):
+ """Wait until notified.
+
+ If the calling coroutine has not acquired the lock when this
+ method is called, a RuntimeError is raised.
+
+ This method releases the underlying lock, and then blocks
+ until it is awakened by a notify() or notify_all() call for
+ the same condition variable in another coroutine. Once
+ awakened, it re-acquires the lock and returns True.
+ """
+ if not self._locked:
+ raise RuntimeError('cannot wait on un-acquired lock')
+
+ keep_lock = True
+ self.release()
+ try:
+ fut = futures.Future(loop=self._loop)
+ self._condition_waiters.append(fut)
+ try:
+ yield from fut
+ return True
+ finally:
+ self._condition_waiters.remove(fut)
+
+ except GeneratorExit:
+ keep_lock = False # Prevent yield in finally clause.
+ raise
+ finally:
+ if keep_lock:
+ yield from self.acquire()
+
+ @tasks.coroutine
+ def wait_for(self, predicate):
+ """Wait until a predicate becomes true.
+
+ The predicate should be a callable which result will be
+ interpreted as a boolean value. The final predicate value is
+ the return value.
+ """
+ result = predicate()
+ while not result:
+ yield from self.wait()
+ result = predicate()
+ return result
+
+ def notify(self, n=1):
+ """By default, wake up one coroutine waiting on this condition, if any.
+ If the calling coroutine has not acquired the lock when this method
+ is called, a RuntimeError is raised.
+
+ This method wakes up at most n of the coroutines waiting for the
+ condition variable; it is a no-op if no coroutines are waiting.
+
+ Note: an awakened coroutine does not actually return from its
+ wait() call until it can reacquire the lock. Since notify() does
+ not release the lock, its caller should.
+ """
+ if not self._locked:
+ raise RuntimeError('cannot notify on un-acquired lock')
+
+ idx = 0
+ for fut in self._condition_waiters:
+ if idx >= n:
+ break
+
+ if not fut.done():
+ idx += 1
+ fut.set_result(False)
+
+ def notify_all(self):
+ """Wake up all threads waiting on this condition. This method acts
+ like notify(), but wakes up all waiting threads instead of one. If the
+ calling thread has not acquired the lock when this method is called,
+ a RuntimeError is raised.
+ """
+ self.notify(len(self._condition_waiters))
+
+
+class Semaphore:
+ """A Semaphore implementation.
+
+ A semaphore manages an internal counter which is decremented by each
+ acquire() call and incremented by each release() call. The counter
+ can never go below zero; when acquire() finds that it is zero, it blocks,
+ waiting until some other thread calls release().
+
+ Semaphores also support the context manager protocol.
+
+ The first optional argument gives the initial value for the internal
+ counter; it defaults to 1. If the value given is less than 0,
+ ValueError is raised.
+
+ The second optional argument determins can semophore be released more than
+ initial internal counter value; it defaults to False. If the value given
+ is True and number of release() is more than number of successfull
+ acquire() calls ValueError is raised.
+ """
+
+ def __init__(self, value=1, bound=False, *, loop=None):
+ if value < 0:
+ raise ValueError("Semaphore initial value must be > 0")
+ self._value = value
+ self._bound = bound
+ self._bound_value = value
+ self._waiters = collections.deque()
+ self._locked = False
+ if loop is not None:
+ self._loop = loop
+ else:
+ self._loop = events.get_event_loop()
+
+ def __repr__(self):
+ # TODO: add waiters:N if > 0.
+ res = super().__repr__()
+ return '<{} [{}]>'.format(
+ res[1:-1],
+ 'locked' if self._locked else 'unlocked,value:{}'.format(
+ self._value))
+
+ def locked(self):
+ """Returns True if semaphore can not be acquired immediately."""
+ return self._locked
+
+ @tasks.coroutine
+ def acquire(self):
+ """Acquire a semaphore.
+
+ If the internal counter is larger than zero on entry,
+ decrement it by one and return True immediately. If it is
+ zero on entry, block, waiting until some other coroutine has
+ called release() to make it larger than 0, and then return
+ True.
+ """
+ if not self._waiters and self._value > 0:
+ self._value -= 1
+ if self._value == 0:
+ self._locked = True
+ return True
+
+ fut = futures.Future(loop=self._loop)
+ self._waiters.append(fut)
+ try:
+ yield from fut
+ self._value -= 1
+ if self._value == 0:
+ self._locked = True
+ return True
+ finally:
+ self._waiters.remove(fut)
+
+ def release(self):
+ """Release a semaphore, incrementing the internal counter by one.
+ When it was zero on entry and another coroutine is waiting for it to
+ become larger than zero again, wake up that coroutine.
+
+ If Semaphore is create with "bound" paramter equals true, then
+ release() method checks to make sure its current value doesn't exceed
+ its initial value. If it does, ValueError is raised.
+ """
+ if self._bound and self._value >= self._bound_value:
+ raise ValueError('Semaphore released too many times')
+
+ self._value += 1
+ self._locked = False
+
+ for waiter in self._waiters:
+ if not waiter.done():
+ waiter.set_result(True)
+ break
+
+ def __enter__(self):
+ # TODO: This is questionable. How do we know the user actually
+ # wrote "with (yield from sema)" instead of "with sema"?
+ return True
+
+ def __exit__(self, *args):
+ self.release()
+
+ def __iter__(self):
+ yield from self.acquire()
+ return self
diff --git a/Lib/asyncio/log.py b/Lib/asyncio/log.py
new file mode 100644
index 0000000..54dc784
--- /dev/null
+++ b/Lib/asyncio/log.py
@@ -0,0 +1,6 @@
+"""Logging configuration."""
+
+import logging
+
+
+asyncio_log = logging.getLogger("asyncio")
diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py
new file mode 100644
index 0000000..348de03
--- /dev/null
+++ b/Lib/asyncio/proactor_events.py
@@ -0,0 +1,352 @@
+"""Event loop using a proactor and related classes.
+
+A proactor is a "notify-on-completion" multiplexer. Currently a
+proactor is only implemented on Windows with IOCP.
+"""
+
+import socket
+
+from . import base_events
+from . import constants
+from . import futures
+from . import transports
+from .log import asyncio_log
+
+
+class _ProactorBasePipeTransport(transports.BaseTransport):
+ """Base class for pipe and socket transports."""
+
+ def __init__(self, loop, sock, protocol, waiter=None,
+ extra=None, server=None):
+ super().__init__(extra)
+ self._set_extra(sock)
+ self._loop = loop
+ self._sock = sock
+ self._protocol = protocol
+ self._server = server
+ self._buffer = []
+ self._read_fut = None
+ self._write_fut = None
+ self._conn_lost = 0
+ self._closing = False # Set when close() called.
+ self._eof_written = False
+ if self._server is not None:
+ self._server.attach(self)
+ self._loop.call_soon(self._protocol.connection_made, self)
+ if waiter is not None:
+ self._loop.call_soon(waiter.set_result, None)
+
+ def _set_extra(self, sock):
+ self._extra['pipe'] = sock
+
+ def close(self):
+ if self._closing:
+ return
+ self._closing = True
+ self._conn_lost += 1
+ if not self._buffer and self._write_fut is None:
+ self._loop.call_soon(self._call_connection_lost, None)
+ if self._read_fut is not None:
+ self._read_fut.cancel()
+
+ def _fatal_error(self, exc):
+ asyncio_log.exception('Fatal error for %s', self)
+ self._force_close(exc)
+
+ def _force_close(self, exc):
+ if self._closing:
+ return
+ self._closing = True
+ self._conn_lost += 1
+ if self._write_fut:
+ self._write_fut.cancel()
+ if self._read_fut:
+ self._read_fut.cancel()
+ self._write_fut = self._read_fut = None
+ self._buffer = []
+ self._loop.call_soon(self._call_connection_lost, exc)
+
+ def _call_connection_lost(self, exc):
+ try:
+ self._protocol.connection_lost(exc)
+ finally:
+ # XXX If there is a pending overlapped read on the other
+ # end then it may fail with ERROR_NETNAME_DELETED if we
+ # just close our end. First calling shutdown() seems to
+ # cure it, but maybe using DisconnectEx() would be better.
+ if hasattr(self._sock, 'shutdown'):
+ self._sock.shutdown(socket.SHUT_RDWR)
+ self._sock.close()
+ server = self._server
+ if server is not None:
+ server.detach(self)
+ self._server = None
+
+
+class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
+ transports.ReadTransport):
+ """Transport for read pipes."""
+
+ def __init__(self, loop, sock, protocol, waiter=None,
+ extra=None, server=None):
+ super().__init__(loop, sock, protocol, waiter, extra, server)
+ self._read_fut = None
+ self._paused = False
+ self._loop.call_soon(self._loop_reading)
+
+ def pause(self):
+ assert not self._closing, 'Cannot pause() when closing'
+ assert not self._paused, 'Already paused'
+ self._paused = True
+
+ def resume(self):
+ assert self._paused, 'Not paused'
+ self._paused = False
+ if self._closing:
+ return
+ self._loop.call_soon(self._loop_reading, self._read_fut)
+
+ def _loop_reading(self, fut=None):
+ if self._paused:
+ return
+ data = None
+
+ try:
+ if fut is not None:
+ assert self._read_fut is fut or (self._read_fut is None and
+ self._closing)
+ self._read_fut = None
+ data = fut.result() # deliver data later in "finally" clause
+
+ if self._closing:
+ # since close() has been called we ignore any read data
+ data = None
+ return
+
+ if data == b'':
+ # we got end-of-file so no need to reschedule a new read
+ return
+
+ # reschedule a new read
+ self._read_fut = self._loop._proactor.recv(self._sock, 4096)
+ except ConnectionAbortedError as exc:
+ if not self._closing:
+ self._fatal_error(exc)
+ except ConnectionResetError as exc:
+ self._force_close(exc)
+ except OSError as exc:
+ self._fatal_error(exc)
+ except futures.CancelledError:
+ if not self._closing:
+ raise
+ else:
+ self._read_fut.add_done_callback(self._loop_reading)
+ finally:
+ if data:
+ self._protocol.data_received(data)
+ elif data is not None:
+ keep_open = self._protocol.eof_received()
+ if not keep_open:
+ self.close()
+
+
+class _ProactorWritePipeTransport(_ProactorBasePipeTransport,
+ transports.WriteTransport):
+ """Transport for write pipes."""
+
+ def write(self, data):
+ assert isinstance(data, bytes), repr(data)
+ if self._eof_written:
+ raise IOError('write_eof() already called')
+
+ if not data:
+ return
+
+ if self._conn_lost:
+ if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
+ asyncio_log.warning('socket.send() raised exception.')
+ self._conn_lost += 1
+ return
+ self._buffer.append(data)
+ if self._write_fut is None:
+ self._loop_writing()
+
+ def _loop_writing(self, f=None):
+ try:
+ assert f is self._write_fut
+ self._write_fut = None
+ if f:
+ f.result()
+ data = b''.join(self._buffer)
+ self._buffer = []
+ if not data:
+ if self._closing:
+ self._loop.call_soon(self._call_connection_lost, None)
+ if self._eof_written:
+ self._sock.shutdown(socket.SHUT_WR)
+ return
+ self._write_fut = self._loop._proactor.send(self._sock, data)
+ self._write_fut.add_done_callback(self._loop_writing)
+ except ConnectionResetError as exc:
+ self._force_close(exc)
+ except OSError as exc:
+ self._fatal_error(exc)
+
+ def can_write_eof(self):
+ return True
+
+ def write_eof(self):
+ self.close()
+
+ def abort(self):
+ self._force_close(None)
+
+
+class _ProactorDuplexPipeTransport(_ProactorReadPipeTransport,
+ _ProactorWritePipeTransport,
+ transports.Transport):
+ """Transport for duplex pipes."""
+
+ def can_write_eof(self):
+ return False
+
+ def write_eof(self):
+ raise NotImplementedError
+
+
+class _ProactorSocketTransport(_ProactorReadPipeTransport,
+ _ProactorWritePipeTransport,
+ transports.Transport):
+ """Transport for connected sockets."""
+
+ def _set_extra(self, sock):
+ self._extra['socket'] = sock
+ try:
+ self._extra['sockname'] = sock.getsockname()
+ except (socket.error, AttributeError):
+ pass
+ if 'peername' not in self._extra:
+ try:
+ self._extra['peername'] = sock.getpeername()
+ except (socket.error, AttributeError):
+ pass
+
+ def can_write_eof(self):
+ return True
+
+ def write_eof(self):
+ if self._closing or self._eof_written:
+ return
+ self._eof_written = True
+ if self._write_fut is None:
+ self._sock.shutdown(socket.SHUT_WR)
+
+
+class BaseProactorEventLoop(base_events.BaseEventLoop):
+
+ def __init__(self, proactor):
+ super().__init__()
+ asyncio_log.debug('Using proactor: %s', proactor.__class__.__name__)
+ self._proactor = proactor
+ self._selector = proactor # convenient alias
+ proactor.set_loop(self)
+ self._make_self_pipe()
+
+ def _make_socket_transport(self, sock, protocol, waiter=None,
+ extra=None, server=None):
+ return _ProactorSocketTransport(self, sock, protocol, waiter,
+ extra, server)
+
+ def _make_duplex_pipe_transport(self, sock, protocol, waiter=None,
+ extra=None):
+ return _ProactorDuplexPipeTransport(self,
+ sock, protocol, waiter, extra)
+
+ def _make_read_pipe_transport(self, sock, protocol, waiter=None,
+ extra=None):
+ return _ProactorReadPipeTransport(self, sock, protocol, waiter, extra)
+
+ def _make_write_pipe_transport(self, sock, protocol, waiter=None,
+ extra=None):
+ return _ProactorWritePipeTransport(self, sock, protocol, waiter, extra)
+
+ def close(self):
+ if self._proactor is not None:
+ self._close_self_pipe()
+ self._proactor.close()
+ self._proactor = None
+ self._selector = None
+
+ def sock_recv(self, sock, n):
+ return self._proactor.recv(sock, n)
+
+ def sock_sendall(self, sock, data):
+ return self._proactor.send(sock, data)
+
+ def sock_connect(self, sock, address):
+ return self._proactor.connect(sock, address)
+
+ def sock_accept(self, sock):
+ return self._proactor.accept(sock)
+
+ def _socketpair(self):
+ raise NotImplementedError
+
+ def _close_self_pipe(self):
+ self._ssock.close()
+ self._ssock = None
+ self._csock.close()
+ self._csock = None
+ self._internal_fds -= 1
+
+ def _make_self_pipe(self):
+ # A self-socket, really. :-)
+ self._ssock, self._csock = self._socketpair()
+ self._ssock.setblocking(False)
+ self._csock.setblocking(False)
+ self._internal_fds += 1
+ self.call_soon(self._loop_self_reading)
+
+ def _loop_self_reading(self, f=None):
+ try:
+ if f is not None:
+ f.result() # may raise
+ f = self._proactor.recv(self._ssock, 4096)
+ except:
+ self.close()
+ raise
+ else:
+ f.add_done_callback(self._loop_self_reading)
+
+ def _write_to_self(self):
+ self._csock.send(b'x')
+
+ def _start_serving(self, protocol_factory, sock, ssl=None, server=None):
+ assert not ssl, 'IocpEventLoop is incompatible with SSL.'
+
+ def loop(f=None):
+ try:
+ if f is not None:
+ conn, addr = f.result()
+ protocol = protocol_factory()
+ self._make_socket_transport(
+ conn, protocol,
+ extra={'peername': addr}, server=server)
+ f = self._proactor.accept(sock)
+ except OSError:
+ if sock.fileno() != -1:
+ asyncio_log.exception('Accept failed')
+ sock.close()
+ except futures.CancelledError:
+ sock.close()
+ else:
+ f.add_done_callback(loop)
+
+ self.call_soon(loop)
+
+ def _process_events(self, event_list):
+ pass # XXX hard work currently done in poll
+
+ def _stop_serving(self, sock):
+ self._proactor._stop_serving(sock)
+ sock.close()
diff --git a/Lib/asyncio/protocols.py b/Lib/asyncio/protocols.py
new file mode 100644
index 0000000..a94abbe
--- /dev/null
+++ b/Lib/asyncio/protocols.py
@@ -0,0 +1,98 @@
+"""Abstract Protocol class."""
+
+__all__ = ['Protocol', 'DatagramProtocol']
+
+
+class BaseProtocol:
+ """ABC for base protocol class.
+
+ Usually user implements protocols that derived from BaseProtocol
+ like Protocol or ProcessProtocol.
+
+ The only case when BaseProtocol should be implemented directly is
+ write-only transport like write pipe
+ """
+
+ def connection_made(self, transport):
+ """Called when a connection is made.
+
+ The argument is the transport representing the pipe connection.
+ To receive data, wait for data_received() calls.
+ When the connection is closed, connection_lost() is called.
+ """
+
+ def connection_lost(self, exc):
+ """Called when the connection is lost or closed.
+
+ The argument is an exception object or None (the latter
+ meaning a regular EOF is received or the connection was
+ aborted or closed).
+ """
+
+
+class Protocol(BaseProtocol):
+ """ABC representing a protocol.
+
+ The user should implement this interface. They can inherit from
+ this class but don't need to. The implementations here do
+ nothing (they don't raise exceptions).
+
+ When the user wants to requests a transport, they pass a protocol
+ factory to a utility function (e.g., EventLoop.create_connection()).
+
+ When the connection is made successfully, connection_made() is
+ called with a suitable transport object. Then data_received()
+ will be called 0 or more times with data (bytes) received from the
+ transport; finally, connection_lost() will be called exactly once
+ with either an exception object or None as an argument.
+
+ State machine of calls:
+
+ start -> CM [-> DR*] [-> ER?] -> CL -> end
+ """
+
+ def data_received(self, data):
+ """Called when some data is received.
+
+ The argument is a bytes object.
+ """
+
+ def eof_received(self):
+ """Called when the other end calls write_eof() or equivalent.
+
+ If this returns a false value (including None), the transport
+ will close itself. If it returns a true value, closing the
+ transport is up to the protocol.
+ """
+
+
+class DatagramProtocol(BaseProtocol):
+ """ABC representing a datagram protocol."""
+
+ def datagram_received(self, data, addr):
+ """Called when some datagram is received."""
+
+ def connection_refused(self, exc):
+ """Connection is refused."""
+
+
+class SubprocessProtocol(BaseProtocol):
+ """ABC representing a protocol for subprocess calls."""
+
+ def pipe_data_received(self, fd, data):
+ """Called when subprocess write a data into stdout/stderr pipes.
+
+ fd is int file dascriptor.
+ data is bytes object.
+ """
+
+ def pipe_connection_lost(self, fd, exc):
+ """Called when a file descriptor associated with the child process is
+ closed.
+
+ fd is the int file descriptor that was closed.
+ """
+
+ def process_exited(self):
+ """Called when subprocess has exited.
+ """
diff --git a/Lib/asyncio/queues.py b/Lib/asyncio/queues.py
new file mode 100644
index 0000000..536de1c
--- /dev/null
+++ b/Lib/asyncio/queues.py
@@ -0,0 +1,284 @@
+"""Queues"""
+
+__all__ = ['Queue', 'PriorityQueue', 'LifoQueue', 'JoinableQueue',
+ 'Full', 'Empty']
+
+import collections
+import heapq
+import queue
+
+from . import events
+from . import futures
+from . import locks
+from .tasks import coroutine
+
+
+# Re-export queue.Full and .Empty exceptions.
+Full = queue.Full
+Empty = queue.Empty
+
+
+class Queue:
+ """A queue, useful for coordinating producer and consumer coroutines.
+
+ If maxsize is less than or equal to zero, the queue size is infinite. If it
+ is an integer greater than 0, then "yield from put()" will block when the
+ queue reaches maxsize, until an item is removed by get().
+
+ Unlike the standard library Queue, you can reliably know this Queue's size
+ with qsize(), since your single-threaded Tulip application won't be
+ interrupted between calling qsize() and doing an operation on the Queue.
+ """
+
+ def __init__(self, maxsize=0, *, loop=None):
+ if loop is None:
+ self._loop = events.get_event_loop()
+ else:
+ self._loop = loop
+ self._maxsize = maxsize
+
+ # Futures.
+ self._getters = collections.deque()
+ # Pairs of (item, Future).
+ self._putters = collections.deque()
+ self._init(maxsize)
+
+ def _init(self, maxsize):
+ self._queue = collections.deque()
+
+ def _get(self):
+ return self._queue.popleft()
+
+ def _put(self, item):
+ self._queue.append(item)
+
+ def __repr__(self):
+ return '<{} at {:#x} {}>'.format(
+ type(self).__name__, id(self), self._format())
+
+ def __str__(self):
+ return '<{} {}>'.format(type(self).__name__, self._format())
+
+ def _format(self):
+ result = 'maxsize={!r}'.format(self._maxsize)
+ if getattr(self, '_queue', None):
+ result += ' _queue={!r}'.format(list(self._queue))
+ if self._getters:
+ result += ' _getters[{}]'.format(len(self._getters))
+ if self._putters:
+ result += ' _putters[{}]'.format(len(self._putters))
+ return result
+
+ def _consume_done_getters(self):
+ # Delete waiters at the head of the get() queue who've timed out.
+ while self._getters and self._getters[0].done():
+ self._getters.popleft()
+
+ def _consume_done_putters(self):
+ # Delete waiters at the head of the put() queue who've timed out.
+ while self._putters and self._putters[0][1].done():
+ self._putters.popleft()
+
+ def qsize(self):
+ """Number of items in the queue."""
+ return len(self._queue)
+
+ @property
+ def maxsize(self):
+ """Number of items allowed in the queue."""
+ return self._maxsize
+
+ def empty(self):
+ """Return True if the queue is empty, False otherwise."""
+ return not self._queue
+
+ def full(self):
+ """Return True if there are maxsize items in the queue.
+
+ Note: if the Queue was initialized with maxsize=0 (the default),
+ then full() is never True.
+ """
+ if self._maxsize <= 0:
+ return False
+ else:
+ return self.qsize() == self._maxsize
+
+ @coroutine
+ def put(self, item):
+ """Put an item into the queue.
+
+ If you yield from put(), wait until a free slot is available
+ before adding item.
+ """
+ self._consume_done_getters()
+ if self._getters:
+ assert not self._queue, (
+ 'queue non-empty, why are getters waiting?')
+
+ getter = self._getters.popleft()
+
+ # Use _put and _get instead of passing item straight to getter, in
+ # case a subclass has logic that must run (e.g. JoinableQueue).
+ self._put(item)
+ getter.set_result(self._get())
+
+ elif self._maxsize > 0 and self._maxsize == self.qsize():
+ waiter = futures.Future(loop=self._loop)
+
+ self._putters.append((item, waiter))
+ yield from waiter
+
+ else:
+ self._put(item)
+
+ def put_nowait(self, item):
+ """Put an item into the queue without blocking.
+
+ If no free slot is immediately available, raise Full.
+ """
+ self._consume_done_getters()
+ if self._getters:
+ assert not self._queue, (
+ 'queue non-empty, why are getters waiting?')
+
+ getter = self._getters.popleft()
+
+ # Use _put and _get instead of passing item straight to getter, in
+ # case a subclass has logic that must run (e.g. JoinableQueue).
+ self._put(item)
+ getter.set_result(self._get())
+
+ elif self._maxsize > 0 and self._maxsize == self.qsize():
+ raise Full
+ else:
+ self._put(item)
+
+ @coroutine
+ def get(self):
+ """Remove and return an item from the queue.
+
+ If you yield from get(), wait until a item is available.
+ """
+ self._consume_done_putters()
+ if self._putters:
+ assert self.full(), 'queue not full, why are putters waiting?'
+ item, putter = self._putters.popleft()
+ self._put(item)
+
+ # When a getter runs and frees up a slot so this putter can
+ # run, we need to defer the put for a tick to ensure that
+ # getters and putters alternate perfectly. See
+ # ChannelTest.test_wait.
+ self._loop.call_soon(putter.set_result, None)
+
+ return self._get()
+
+ elif self.qsize():
+ return self._get()
+ else:
+ waiter = futures.Future(loop=self._loop)
+
+ self._getters.append(waiter)
+ return (yield from waiter)
+
+ def get_nowait(self):
+ """Remove and return an item from the queue.
+
+ Return an item if one is immediately available, else raise Full.
+ """
+ self._consume_done_putters()
+ if self._putters:
+ assert self.full(), 'queue not full, why are putters waiting?'
+ item, putter = self._putters.popleft()
+ self._put(item)
+ # Wake putter on next tick.
+ putter.set_result(None)
+
+ return self._get()
+
+ elif self.qsize():
+ return self._get()
+ else:
+ raise Empty
+
+
+class PriorityQueue(Queue):
+ """A subclass of Queue; retrieves entries in priority order (lowest first).
+
+ Entries are typically tuples of the form: (priority number, data).
+ """
+
+ def _init(self, maxsize):
+ self._queue = []
+
+ def _put(self, item, heappush=heapq.heappush):
+ heappush(self._queue, item)
+
+ def _get(self, heappop=heapq.heappop):
+ return heappop(self._queue)
+
+
+class LifoQueue(Queue):
+ """A subclass of Queue that retrieves most recently added entries first."""
+
+ def _init(self, maxsize):
+ self._queue = []
+
+ def _put(self, item):
+ self._queue.append(item)
+
+ def _get(self):
+ return self._queue.pop()
+
+
+class JoinableQueue(Queue):
+ """A subclass of Queue with task_done() and join() methods."""
+
+ def __init__(self, maxsize=0, *, loop=None):
+ super().__init__(maxsize=maxsize, loop=loop)
+ self._unfinished_tasks = 0
+ self._finished = locks.Event(loop=self._loop)
+ self._finished.set()
+
+ def _format(self):
+ result = Queue._format(self)
+ if self._unfinished_tasks:
+ result += ' tasks={}'.format(self._unfinished_tasks)
+ return result
+
+ def _put(self, item):
+ super()._put(item)
+ self._unfinished_tasks += 1
+ self._finished.clear()
+
+ def task_done(self):
+ """Indicate that a formerly enqueued task is complete.
+
+ Used by queue consumers. For each get() used to fetch a task,
+ a subsequent call to task_done() tells the queue that the processing
+ on the task is complete.
+
+ If a join() is currently blocking, it will resume when all items have
+ been processed (meaning that a task_done() call was received for every
+ item that had been put() into the queue).
+
+ Raises ValueError if called more times than there were items placed in
+ the queue.
+ """
+ if self._unfinished_tasks <= 0:
+ raise ValueError('task_done() called too many times')
+ self._unfinished_tasks -= 1
+ if self._unfinished_tasks == 0:
+ self._finished.set()
+
+ @coroutine
+ def join(self):
+ """Block until all items in the queue have been gotten and processed.
+
+ The count of unfinished tasks goes up whenever an item is added to the
+ queue. The count goes down whenever a consumer thread calls task_done()
+ to indicate that the item was retrieved and all work on it is complete.
+ When the count of unfinished tasks drops to zero, join() unblocks.
+ """
+ if self._unfinished_tasks > 0:
+ yield from self._finished.wait()
diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py
new file mode 100644
index 0000000..bae9a49
--- /dev/null
+++ b/Lib/asyncio/selector_events.py
@@ -0,0 +1,769 @@
+"""Event loop using a selector and related classes.
+
+A selector is a "notify-when-ready" multiplexer. For a subclass which
+also includes support for signal handling, see the unix_events sub-module.
+"""
+
+import collections
+import socket
+try:
+ import ssl
+except ImportError: # pragma: no cover
+ ssl = None
+
+from . import base_events
+from . import constants
+from . import events
+from . import futures
+from . import selectors
+from . import transports
+from .log import asyncio_log
+
+
+class BaseSelectorEventLoop(base_events.BaseEventLoop):
+ """Selector event loop.
+
+ See events.EventLoop for API specification.
+ """
+
+ def __init__(self, selector=None):
+ super().__init__()
+
+ if selector is None:
+ selector = selectors.DefaultSelector()
+ asyncio_log.debug('Using selector: %s', selector.__class__.__name__)
+ self._selector = selector
+ self._make_self_pipe()
+
+ def _make_socket_transport(self, sock, protocol, waiter=None, *,
+ extra=None, server=None):
+ return _SelectorSocketTransport(self, sock, protocol, waiter,
+ extra, server)
+
+ def _make_ssl_transport(self, rawsock, protocol, sslcontext, waiter, *,
+ server_side=False, server_hostname=None,
+ extra=None, server=None):
+ return _SelectorSslTransport(
+ self, rawsock, protocol, sslcontext, waiter,
+ server_side, server_hostname, extra, server)
+
+ def _make_datagram_transport(self, sock, protocol,
+ address=None, extra=None):
+ return _SelectorDatagramTransport(self, sock, protocol, address, extra)
+
+ def close(self):
+ if self._selector is not None:
+ self._close_self_pipe()
+ self._selector.close()
+ self._selector = None
+
+ def _socketpair(self):
+ raise NotImplementedError
+
+ def _close_self_pipe(self):
+ self.remove_reader(self._ssock.fileno())
+ self._ssock.close()
+ self._ssock = None
+ self._csock.close()
+ self._csock = None
+ self._internal_fds -= 1
+
+ def _make_self_pipe(self):
+ # A self-socket, really. :-)
+ self._ssock, self._csock = self._socketpair()
+ self._ssock.setblocking(False)
+ self._csock.setblocking(False)
+ self._internal_fds += 1
+ self.add_reader(self._ssock.fileno(), self._read_from_self)
+
+ def _read_from_self(self):
+ try:
+ self._ssock.recv(1)
+ except (BlockingIOError, InterruptedError):
+ pass
+
+ def _write_to_self(self):
+ try:
+ self._csock.send(b'x')
+ except (BlockingIOError, InterruptedError):
+ pass
+
+ def _start_serving(self, protocol_factory, sock, ssl=None, server=None):
+ self.add_reader(sock.fileno(), self._accept_connection,
+ protocol_factory, sock, ssl, server)
+
+ def _accept_connection(self, protocol_factory, sock, ssl=None,
+ server=None):
+ try:
+ conn, addr = sock.accept()
+ conn.setblocking(False)
+ except (BlockingIOError, InterruptedError):
+ pass # False alarm.
+ except Exception:
+ # Bad error. Stop serving.
+ self.remove_reader(sock.fileno())
+ sock.close()
+ # There's nowhere to send the error, so just log it.
+ # TODO: Someone will want an error handler for this.
+ asyncio_log.exception('Accept failed')
+ else:
+ if ssl:
+ self._make_ssl_transport(
+ conn, protocol_factory(), ssl, None,
+ server_side=True, extra={'peername': addr}, server=server)
+ else:
+ self._make_socket_transport(
+ conn, protocol_factory(), extra={'peername': addr},
+ server=server)
+ # It's now up to the protocol to handle the connection.
+
+ def add_reader(self, fd, callback, *args):
+ """Add a reader callback."""
+ handle = events.make_handle(callback, args)
+ try:
+ key = self._selector.get_key(fd)
+ except KeyError:
+ self._selector.register(fd, selectors.EVENT_READ,
+ (handle, None))
+ else:
+ mask, (reader, writer) = key.events, key.data
+ self._selector.modify(fd, mask | selectors.EVENT_READ,
+ (handle, writer))
+ if reader is not None:
+ reader.cancel()
+
+ def remove_reader(self, fd):
+ """Remove a reader callback."""
+ try:
+ key = self._selector.get_key(fd)
+ except KeyError:
+ return False
+ else:
+ mask, (reader, writer) = key.events, key.data
+ mask &= ~selectors.EVENT_READ
+ if not mask:
+ self._selector.unregister(fd)
+ else:
+ self._selector.modify(fd, mask, (None, writer))
+
+ if reader is not None:
+ reader.cancel()
+ return True
+ else:
+ return False
+
+ def add_writer(self, fd, callback, *args):
+ """Add a writer callback.."""
+ handle = events.make_handle(callback, args)
+ try:
+ key = self._selector.get_key(fd)
+ except KeyError:
+ self._selector.register(fd, selectors.EVENT_WRITE,
+ (None, handle))
+ else:
+ mask, (reader, writer) = key.events, key.data
+ self._selector.modify(fd, mask | selectors.EVENT_WRITE,
+ (reader, handle))
+ if writer is not None:
+ writer.cancel()
+
+ def remove_writer(self, fd):
+ """Remove a writer callback."""
+ try:
+ key = self._selector.get_key(fd)
+ except KeyError:
+ return False
+ else:
+ mask, (reader, writer) = key.events, key.data
+ # Remove both writer and connector.
+ mask &= ~selectors.EVENT_WRITE
+ if not mask:
+ self._selector.unregister(fd)
+ else:
+ self._selector.modify(fd, mask, (reader, None))
+
+ if writer is not None:
+ writer.cancel()
+ return True
+ else:
+ return False
+
+ def sock_recv(self, sock, n):
+ """XXX"""
+ fut = futures.Future(loop=self)
+ self._sock_recv(fut, False, sock, n)
+ return fut
+
+ def _sock_recv(self, fut, registered, sock, n):
+ fd = sock.fileno()
+ if registered:
+ # Remove the callback early. It should be rare that the
+ # selector says the fd is ready but the call still returns
+ # EAGAIN, and I am willing to take a hit in that case in
+ # order to simplify the common case.
+ self.remove_reader(fd)
+ if fut.cancelled():
+ return
+ try:
+ data = sock.recv(n)
+ except (BlockingIOError, InterruptedError):
+ self.add_reader(fd, self._sock_recv, fut, True, sock, n)
+ except Exception as exc:
+ fut.set_exception(exc)
+ else:
+ fut.set_result(data)
+
+ def sock_sendall(self, sock, data):
+ """XXX"""
+ fut = futures.Future(loop=self)
+ if data:
+ self._sock_sendall(fut, False, sock, data)
+ else:
+ fut.set_result(None)
+ return fut
+
+ def _sock_sendall(self, fut, registered, sock, data):
+ fd = sock.fileno()
+
+ if registered:
+ self.remove_writer(fd)
+ if fut.cancelled():
+ return
+
+ try:
+ n = sock.send(data)
+ except (BlockingIOError, InterruptedError):
+ n = 0
+ except Exception as exc:
+ fut.set_exception(exc)
+ return
+
+ if n == len(data):
+ fut.set_result(None)
+ else:
+ if n:
+ data = data[n:]
+ self.add_writer(fd, self._sock_sendall, fut, True, sock, data)
+
+ def sock_connect(self, sock, address):
+ """XXX"""
+ # That address better not require a lookup! We're not calling
+ # self.getaddrinfo() for you here. But verifying this is
+ # complicated; the socket module doesn't have a pattern for
+ # IPv6 addresses (there are too many forms, apparently).
+ fut = futures.Future(loop=self)
+ self._sock_connect(fut, False, sock, address)
+ return fut
+
+ def _sock_connect(self, fut, registered, sock, address):
+ # TODO: Use getaddrinfo() to look up the address, to avoid the
+ # trap of hanging the entire event loop when the address
+ # requires doing a DNS lookup. (OTOH, the caller should
+ # already have done this, so it would be nice if we could
+ # easily tell whether the address needs looking up or not. I
+ # know how to do this for IPv4, but IPv6 addresses have many
+ # syntaxes.)
+ fd = sock.fileno()
+ if registered:
+ self.remove_writer(fd)
+ if fut.cancelled():
+ return
+ try:
+ if not registered:
+ # First time around.
+ sock.connect(address)
+ else:
+ err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
+ if err != 0:
+ # Jump to the except clause below.
+ raise OSError(err, 'Connect call failed')
+ except (BlockingIOError, InterruptedError):
+ self.add_writer(fd, self._sock_connect, fut, True, sock, address)
+ except Exception as exc:
+ fut.set_exception(exc)
+ else:
+ fut.set_result(None)
+
+ def sock_accept(self, sock):
+ """XXX"""
+ fut = futures.Future(loop=self)
+ self._sock_accept(fut, False, sock)
+ return fut
+
+ def _sock_accept(self, fut, registered, sock):
+ fd = sock.fileno()
+ if registered:
+ self.remove_reader(fd)
+ if fut.cancelled():
+ return
+ try:
+ conn, address = sock.accept()
+ conn.setblocking(False)
+ except (BlockingIOError, InterruptedError):
+ self.add_reader(fd, self._sock_accept, fut, True, sock)
+ except Exception as exc:
+ fut.set_exception(exc)
+ else:
+ fut.set_result((conn, address))
+
+ def _process_events(self, event_list):
+ for key, mask in event_list:
+ fileobj, (reader, writer) = key.fileobj, key.data
+ if mask & selectors.EVENT_READ and reader is not None:
+ if reader._cancelled:
+ self.remove_reader(fileobj)
+ else:
+ self._add_callback(reader)
+ if mask & selectors.EVENT_WRITE and writer is not None:
+ if writer._cancelled:
+ self.remove_writer(fileobj)
+ else:
+ self._add_callback(writer)
+
+ def _stop_serving(self, sock):
+ self.remove_reader(sock.fileno())
+ sock.close()
+
+
+class _SelectorTransport(transports.Transport):
+
+ max_size = 256 * 1024 # Buffer size passed to recv().
+
+ def __init__(self, loop, sock, protocol, extra, server=None):
+ super().__init__(extra)
+ self._extra['socket'] = sock
+ self._extra['sockname'] = sock.getsockname()
+ if 'peername' not in self._extra:
+ try:
+ self._extra['peername'] = sock.getpeername()
+ except socket.error:
+ self._extra['peername'] = None
+ self._loop = loop
+ self._sock = sock
+ self._sock_fd = sock.fileno()
+ self._protocol = protocol
+ self._server = server
+ self._buffer = collections.deque()
+ self._conn_lost = 0
+ self._closing = False # Set when close() called.
+ if server is not None:
+ server.attach(self)
+
+ def abort(self):
+ self._force_close(None)
+
+ def close(self):
+ if self._closing:
+ return
+ self._closing = True
+ self._conn_lost += 1
+ self._loop.remove_reader(self._sock_fd)
+ if not self._buffer:
+ self._loop.call_soon(self._call_connection_lost, None)
+
+ def _fatal_error(self, exc):
+ # should be called from exception handler only
+ asyncio_log.exception('Fatal error for %s', self)
+ self._force_close(exc)
+
+ def _force_close(self, exc):
+ if self._buffer:
+ self._buffer.clear()
+ self._loop.remove_writer(self._sock_fd)
+
+ if self._closing:
+ return
+
+ self._closing = True
+ self._conn_lost += 1
+ self._loop.remove_reader(self._sock_fd)
+ self._loop.call_soon(self._call_connection_lost, exc)
+
+ def _call_connection_lost(self, exc):
+ try:
+ self._protocol.connection_lost(exc)
+ finally:
+ self._sock.close()
+ self._sock = None
+ self._protocol = None
+ self._loop = None
+ server = self._server
+ if server is not None:
+ server.detach(self)
+ self._server = None
+
+
+class _SelectorSocketTransport(_SelectorTransport):
+
+ def __init__(self, loop, sock, protocol, waiter=None,
+ extra=None, server=None):
+ super().__init__(loop, sock, protocol, extra, server)
+ self._eof = False
+ self._paused = False
+
+ self._loop.add_reader(self._sock_fd, self._read_ready)
+ self._loop.call_soon(self._protocol.connection_made, self)
+ if waiter is not None:
+ self._loop.call_soon(waiter.set_result, None)
+
+ def pause(self):
+ assert not self._closing, 'Cannot pause() when closing'
+ assert not self._paused, 'Already paused'
+ self._paused = True
+ self._loop.remove_reader(self._sock_fd)
+
+ def resume(self):
+ assert self._paused, 'Not paused'
+ self._paused = False
+ if self._closing:
+ return
+ self._loop.add_reader(self._sock_fd, self._read_ready)
+
+ def _read_ready(self):
+ try:
+ data = self._sock.recv(self.max_size)
+ except (BlockingIOError, InterruptedError):
+ pass
+ except ConnectionResetError as exc:
+ self._force_close(exc)
+ except Exception as exc:
+ self._fatal_error(exc)
+ else:
+ if data:
+ self._protocol.data_received(data)
+ else:
+ keep_open = self._protocol.eof_received()
+ if not keep_open:
+ self.close()
+
+ def write(self, data):
+ assert isinstance(data, bytes), repr(type(data))
+ assert not self._eof, 'Cannot call write() after write_eof()'
+ if not data:
+ return
+
+ if self._conn_lost:
+ if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
+ asyncio_log.warning('socket.send() raised exception.')
+ self._conn_lost += 1
+ return
+
+ if not self._buffer:
+ # Attempt to send it right away first.
+ try:
+ n = self._sock.send(data)
+ except (BlockingIOError, InterruptedError):
+ n = 0
+ except (BrokenPipeError, ConnectionResetError) as exc:
+ self._force_close(exc)
+ return
+ except OSError as exc:
+ self._fatal_error(exc)
+ return
+ else:
+ data = data[n:]
+ if not data:
+ return
+ # Start async I/O.
+ self._loop.add_writer(self._sock_fd, self._write_ready)
+
+ self._buffer.append(data)
+
+ def _write_ready(self):
+ data = b''.join(self._buffer)
+ assert data, 'Data should not be empty'
+
+ self._buffer.clear()
+ try:
+ n = self._sock.send(data)
+ except (BlockingIOError, InterruptedError):
+ self._buffer.append(data)
+ except (BrokenPipeError, ConnectionResetError) as exc:
+ self._loop.remove_writer(self._sock_fd)
+ self._force_close(exc)
+ except Exception as exc:
+ self._loop.remove_writer(self._sock_fd)
+ self._fatal_error(exc)
+ else:
+ data = data[n:]
+ if not data:
+ self._loop.remove_writer(self._sock_fd)
+ if self._closing:
+ self._call_connection_lost(None)
+ elif self._eof:
+ self._sock.shutdown(socket.SHUT_WR)
+ return
+
+ self._buffer.append(data) # Try again later.
+
+ def write_eof(self):
+ if self._eof:
+ return
+ self._eof = True
+ if not self._buffer:
+ self._sock.shutdown(socket.SHUT_WR)
+
+ def can_write_eof(self):
+ return True
+
+
+class _SelectorSslTransport(_SelectorTransport):
+
+ def __init__(self, loop, rawsock, protocol, sslcontext, waiter=None,
+ server_side=False, server_hostname=None,
+ extra=None, server=None):
+ if server_side:
+ assert isinstance(
+ sslcontext, ssl.SSLContext), 'Must pass an SSLContext'
+ else:
+ # Client-side may pass ssl=True to use a default context.
+ # The default is the same as used by urllib.
+ if sslcontext is None:
+ sslcontext = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+ sslcontext.options |= ssl.OP_NO_SSLv2
+ sslcontext.set_default_verify_paths()
+ sslcontext.verify_mode = ssl.CERT_REQUIRED
+ wrap_kwargs = {
+ 'server_side': server_side,
+ 'do_handshake_on_connect': False,
+ }
+ if server_hostname is not None and not server_side and ssl.HAS_SNI:
+ wrap_kwargs['server_hostname'] = server_hostname
+ sslsock = sslcontext.wrap_socket(rawsock, **wrap_kwargs)
+
+ super().__init__(loop, sslsock, protocol, extra, server)
+
+ self._server_hostname = server_hostname
+ self._waiter = waiter
+ self._rawsock = rawsock
+ self._sslcontext = sslcontext
+ self._paused = False
+
+ # SSL-specific extra info. (peercert is set later)
+ self._extra.update(sslcontext=sslcontext)
+
+ self._on_handshake()
+
+ def _on_handshake(self):
+ try:
+ self._sock.do_handshake()
+ except ssl.SSLWantReadError:
+ self._loop.add_reader(self._sock_fd, self._on_handshake)
+ return
+ except ssl.SSLWantWriteError:
+ self._loop.add_writer(self._sock_fd, self._on_handshake)
+ return
+ except Exception as exc:
+ self._sock.close()
+ if self._waiter is not None:
+ self._waiter.set_exception(exc)
+ return
+ except BaseException as exc:
+ self._sock.close()
+ if self._waiter is not None:
+ self._waiter.set_exception(exc)
+ raise
+
+ # Verify hostname if requested.
+ peercert = self._sock.getpeercert()
+ if (self._server_hostname is not None and
+ self._sslcontext.verify_mode == ssl.CERT_REQUIRED):
+ try:
+ ssl.match_hostname(peercert, self._server_hostname)
+ except Exception as exc:
+ self._sock.close()
+ if self._waiter is not None:
+ self._waiter.set_exception(exc)
+ return
+
+ # Add extra info that becomes available after handshake.
+ self._extra.update(peercert=peercert,
+ cipher=self._sock.cipher(),
+ compression=self._sock.compression(),
+ )
+
+ self._loop.remove_reader(self._sock_fd)
+ self._loop.remove_writer(self._sock_fd)
+ self._loop.add_reader(self._sock_fd, self._on_ready)
+ self._loop.add_writer(self._sock_fd, self._on_ready)
+ self._loop.call_soon(self._protocol.connection_made, self)
+ if self._waiter is not None:
+ self._loop.call_soon(self._waiter.set_result, None)
+
+ def pause(self):
+ # XXX This is a bit icky, given the comment at the top of
+ # _on_ready(). Is it possible to evoke a deadlock? I don't
+ # know, although it doesn't look like it; write() will still
+ # accept more data for the buffer and eventually the app will
+ # call resume() again, and things will flow again.
+
+ assert not self._closing, 'Cannot pause() when closing'
+ assert not self._paused, 'Already paused'
+ self._paused = True
+ self._loop.remove_reader(self._sock_fd)
+
+ def resume(self):
+ assert self._paused, 'Not paused'
+ self._paused = False
+ if self._closing:
+ return
+ self._loop.add_reader(self._sock_fd, self._on_ready)
+
+ def _on_ready(self):
+ # Because of renegotiations (?), there's no difference between
+ # readable and writable. We just try both. XXX This may be
+ # incorrect; we probably need to keep state about what we
+ # should do next.
+
+ # First try reading.
+ if not self._closing and not self._paused:
+ try:
+ data = self._sock.recv(self.max_size)
+ except (BlockingIOError, InterruptedError,
+ ssl.SSLWantReadError, ssl.SSLWantWriteError):
+ pass
+ except ConnectionResetError as exc:
+ self._force_close(exc)
+ except Exception as exc:
+ self._fatal_error(exc)
+ else:
+ if data:
+ self._protocol.data_received(data)
+ else:
+ try:
+ self._protocol.eof_received()
+ finally:
+ self.close()
+
+ # Now try writing, if there's anything to write.
+ if self._buffer:
+ data = b''.join(self._buffer)
+ self._buffer.clear()
+ try:
+ n = self._sock.send(data)
+ except (BlockingIOError, InterruptedError,
+ ssl.SSLWantReadError, ssl.SSLWantWriteError):
+ n = 0
+ except (BrokenPipeError, ConnectionResetError) as exc:
+ self._loop.remove_writer(self._sock_fd)
+ self._force_close(exc)
+ return
+ except Exception as exc:
+ self._loop.remove_writer(self._sock_fd)
+ self._fatal_error(exc)
+ return
+
+ if n < len(data):
+ self._buffer.append(data[n:])
+
+ if self._closing and not self._buffer:
+ self._loop.remove_writer(self._sock_fd)
+ self._call_connection_lost(None)
+
+ def write(self, data):
+ assert isinstance(data, bytes), repr(type(data))
+ if not data:
+ return
+
+ if self._conn_lost:
+ if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
+ asyncio_log.warning('socket.send() raised exception.')
+ self._conn_lost += 1
+ return
+
+ self._buffer.append(data)
+ # We could optimize, but the callback can do this for now.
+
+ def can_write_eof(self):
+ return False
+
+ def close(self):
+ if self._closing:
+ return
+ self._closing = True
+ self._conn_lost += 1
+ self._loop.remove_reader(self._sock_fd)
+
+
+class _SelectorDatagramTransport(_SelectorTransport):
+
+ def __init__(self, loop, sock, protocol, address=None, extra=None):
+ super().__init__(loop, sock, protocol, extra)
+
+ self._address = address
+ self._loop.add_reader(self._sock_fd, self._read_ready)
+ self._loop.call_soon(self._protocol.connection_made, self)
+
+ def _read_ready(self):
+ try:
+ data, addr = self._sock.recvfrom(self.max_size)
+ except (BlockingIOError, InterruptedError):
+ pass
+ except Exception as exc:
+ self._fatal_error(exc)
+ else:
+ self._protocol.datagram_received(data, addr)
+
+ def sendto(self, data, addr=None):
+ assert isinstance(data, bytes), repr(type(data))
+ if not data:
+ return
+
+ if self._address:
+ assert addr in (None, self._address)
+
+ if self._conn_lost and self._address:
+ if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
+ asyncio_log.warning('socket.send() raised exception.')
+ self._conn_lost += 1
+ return
+
+ if not self._buffer:
+ # Attempt to send it right away first.
+ try:
+ if self._address:
+ self._sock.send(data)
+ else:
+ self._sock.sendto(data, addr)
+ return
+ except ConnectionRefusedError as exc:
+ if self._address:
+ self._fatal_error(exc)
+ return
+ except (BlockingIOError, InterruptedError):
+ self._loop.add_writer(self._sock_fd, self._sendto_ready)
+ except Exception as exc:
+ self._fatal_error(exc)
+ return
+
+ self._buffer.append((data, addr))
+
+ def _sendto_ready(self):
+ while self._buffer:
+ data, addr = self._buffer.popleft()
+ try:
+ if self._address:
+ self._sock.send(data)
+ else:
+ self._sock.sendto(data, addr)
+ except ConnectionRefusedError as exc:
+ if self._address:
+ self._fatal_error(exc)
+ return
+ except (BlockingIOError, InterruptedError):
+ self._buffer.appendleft((data, addr)) # Try again later.
+ break
+ except Exception as exc:
+ self._fatal_error(exc)
+ return
+
+ if not self._buffer:
+ self._loop.remove_writer(self._sock_fd)
+ if self._closing:
+ self._call_connection_lost(None)
+
+ def _force_close(self, exc):
+ if self._address and isinstance(exc, ConnectionRefusedError):
+ self._protocol.connection_refused(exc)
+
+ super()._force_close(exc)
diff --git a/Lib/asyncio/streams.py b/Lib/asyncio/streams.py
new file mode 100644
index 0000000..d0f12e8
--- /dev/null
+++ b/Lib/asyncio/streams.py
@@ -0,0 +1,257 @@
+"""Stream-related things."""
+
+__all__ = ['StreamReader', 'StreamReaderProtocol', 'open_connection']
+
+import collections
+
+from . import events
+from . import futures
+from . import protocols
+from . import tasks
+
+
+_DEFAULT_LIMIT = 2**16
+
+
+@tasks.coroutine
+def open_connection(host=None, port=None, *,
+ loop=None, limit=_DEFAULT_LIMIT, **kwds):
+ """A wrapper for create_connection() returning a (reader, writer) pair.
+
+ The reader returned is a StreamReader instance; the writer is a
+ Transport.
+
+ The arguments are all the usual arguments to create_connection()
+ except protocol_factory; most common are positional host and port,
+ with various optional keyword arguments following.
+
+ Additional optional keyword arguments are loop (to set the event loop
+ instance to use) and limit (to set the buffer limit passed to the
+ StreamReader).
+
+ (If you want to customize the StreamReader and/or
+ StreamReaderProtocol classes, just copy the code -- there's
+ really nothing special here except some convenience.)
+ """
+ if loop is None:
+ loop = events.get_event_loop()
+ reader = StreamReader(limit=limit, loop=loop)
+ protocol = StreamReaderProtocol(reader)
+ transport, _ = yield from loop.create_connection(
+ lambda: protocol, host, port, **kwds)
+ return reader, transport # (reader, writer)
+
+
+class StreamReaderProtocol(protocols.Protocol):
+ """Trivial helper class to adapt between Protocol and StreamReader.
+
+ (This is a helper class instead of making StreamReader itself a
+ Protocol subclass, because the StreamReader has other potential
+ uses, and to prevent the user of the StreamReader to accidentally
+ call inappropriate methods of the protocol.)
+ """
+
+ def __init__(self, stream_reader):
+ self.stream_reader = stream_reader
+
+ def connection_made(self, transport):
+ self.stream_reader.set_transport(transport)
+
+ def connection_lost(self, exc):
+ if exc is None:
+ self.stream_reader.feed_eof()
+ else:
+ self.stream_reader.set_exception(exc)
+
+ def data_received(self, data):
+ self.stream_reader.feed_data(data)
+
+ def eof_received(self):
+ self.stream_reader.feed_eof()
+
+
+class StreamReader:
+
+ def __init__(self, limit=_DEFAULT_LIMIT, loop=None):
+ # The line length limit is a security feature;
+ # it also doubles as half the buffer limit.
+ self.limit = limit
+ if loop is None:
+ loop = events.get_event_loop()
+ self.loop = loop
+ self.buffer = collections.deque() # Deque of bytes objects.
+ self.byte_count = 0 # Bytes in buffer.
+ self.eof = False # Whether we're done.
+ self.waiter = None # A future.
+ self._exception = None
+ self._transport = None
+ self._paused = False
+
+ def exception(self):
+ return self._exception
+
+ def set_exception(self, exc):
+ self._exception = exc
+
+ waiter = self.waiter
+ if waiter is not None:
+ self.waiter = None
+ if not waiter.cancelled():
+ waiter.set_exception(exc)
+
+ def set_transport(self, transport):
+ assert self._transport is None, 'Transport already set'
+ self._transport = transport
+
+ def _maybe_resume_transport(self):
+ if self._paused and self.byte_count <= self.limit:
+ self._paused = False
+ self._transport.resume()
+
+ def feed_eof(self):
+ self.eof = True
+ waiter = self.waiter
+ if waiter is not None:
+ self.waiter = None
+ if not waiter.cancelled():
+ waiter.set_result(True)
+
+ def feed_data(self, data):
+ if not data:
+ return
+
+ self.buffer.append(data)
+ self.byte_count += len(data)
+
+ waiter = self.waiter
+ if waiter is not None:
+ self.waiter = None
+ if not waiter.cancelled():
+ waiter.set_result(False)
+
+ if (self._transport is not None and
+ not self._paused and
+ self.byte_count > 2*self.limit):
+ try:
+ self._transport.pause()
+ except NotImplementedError:
+ # The transport can't be paused.
+ # We'll just have to buffer all data.
+ # Forget the transport so we don't keep trying.
+ self._transport = None
+ else:
+ self._paused = True
+
+ @tasks.coroutine
+ def readline(self):
+ if self._exception is not None:
+ raise self._exception
+
+ parts = []
+ parts_size = 0
+ not_enough = True
+
+ while not_enough:
+ while self.buffer and not_enough:
+ data = self.buffer.popleft()
+ ichar = data.find(b'\n')
+ if ichar < 0:
+ parts.append(data)
+ parts_size += len(data)
+ else:
+ ichar += 1
+ head, tail = data[:ichar], data[ichar:]
+ if tail:
+ self.buffer.appendleft(tail)
+ not_enough = False
+ parts.append(head)
+ parts_size += len(head)
+
+ if parts_size > self.limit:
+ self.byte_count -= parts_size
+ self._maybe_resume_transport()
+ raise ValueError('Line is too long')
+
+ if self.eof:
+ break
+
+ if not_enough:
+ assert self.waiter is None
+ self.waiter = futures.Future(loop=self.loop)
+ try:
+ yield from self.waiter
+ finally:
+ self.waiter = None
+
+ line = b''.join(parts)
+ self.byte_count -= parts_size
+ self._maybe_resume_transport()
+
+ return line
+
+ @tasks.coroutine
+ def read(self, n=-1):
+ if self._exception is not None:
+ raise self._exception
+
+ if not n:
+ return b''
+
+ if n < 0:
+ while not self.eof:
+ assert not self.waiter
+ self.waiter = futures.Future(loop=self.loop)
+ try:
+ yield from self.waiter
+ finally:
+ self.waiter = None
+ else:
+ if not self.byte_count and not self.eof:
+ assert not self.waiter
+ self.waiter = futures.Future(loop=self.loop)
+ try:
+ yield from self.waiter
+ finally:
+ self.waiter = None
+
+ if n < 0 or self.byte_count <= n:
+ data = b''.join(self.buffer)
+ self.buffer.clear()
+ self.byte_count = 0
+ self._maybe_resume_transport()
+ return data
+
+ parts = []
+ parts_bytes = 0
+ while self.buffer and parts_bytes < n:
+ data = self.buffer.popleft()
+ data_bytes = len(data)
+ if n < parts_bytes + data_bytes:
+ data_bytes = n - parts_bytes
+ data, rest = data[:data_bytes], data[data_bytes:]
+ self.buffer.appendleft(rest)
+
+ parts.append(data)
+ parts_bytes += data_bytes
+ self.byte_count -= data_bytes
+ self._maybe_resume_transport()
+
+ return b''.join(parts)
+
+ @tasks.coroutine
+ def readexactly(self, n):
+ if self._exception is not None:
+ raise self._exception
+
+ if n <= 0:
+ return b''
+
+ while self.byte_count < n and not self.eof:
+ assert not self.waiter
+ self.waiter = futures.Future(loop=self.loop)
+ try:
+ yield from self.waiter
+ finally:
+ self.waiter = None
+
+ return (yield from self.read(n))
diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py
new file mode 100644
index 0000000..2c8579f
--- /dev/null
+++ b/Lib/asyncio/tasks.py
@@ -0,0 +1,636 @@
+"""Support for tasks, coroutines and the scheduler."""
+
+__all__ = ['coroutine', 'Task',
+ 'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
+ 'wait', 'wait_for', 'as_completed', 'sleep', 'async',
+ 'gather',
+ ]
+
+import collections
+import concurrent.futures
+import functools
+import inspect
+import linecache
+import traceback
+import weakref
+
+from . import events
+from . import futures
+from .log import asyncio_log
+
+# If you set _DEBUG to true, @coroutine will wrap the resulting
+# generator objects in a CoroWrapper instance (defined below). That
+# instance will log a message when the generator is never iterated
+# over, which may happen when you forget to use "yield from" with a
+# coroutine call. Note that the value of the _DEBUG flag is taken
+# when the decorator is used, so to be of any use it must be set
+# before you define your coroutines. A downside of using this feature
+# is that tracebacks show entries for the CoroWrapper.__next__ method
+# when _DEBUG is true.
+_DEBUG = False
+
+
+class CoroWrapper:
+ """Wrapper for coroutine in _DEBUG mode."""
+
+ __slot__ = ['gen', 'func']
+
+ def __init__(self, gen, func):
+ assert inspect.isgenerator(gen), gen
+ self.gen = gen
+ self.func = func
+
+ def __iter__(self):
+ return self
+
+ def __next__(self):
+ return next(self.gen)
+
+ def send(self, value):
+ return self.gen.send(value)
+
+ def throw(self, exc):
+ return self.gen.throw(exc)
+
+ def close(self):
+ return self.gen.close()
+
+ def __del__(self):
+ frame = self.gen.gi_frame
+ if frame is not None and frame.f_lasti == -1:
+ func = self.func
+ code = func.__code__
+ filename = code.co_filename
+ lineno = code.co_firstlineno
+ asyncio_log.error('Coroutine %r defined at %s:%s was never yielded from',
+ func.__name__, filename, lineno)
+
+
+def coroutine(func):
+ """Decorator to mark coroutines.
+
+ If the coroutine is not yielded from before it is destroyed,
+ an error message is logged.
+ """
+ if inspect.isgeneratorfunction(func):
+ coro = func
+ else:
+ @functools.wraps(func)
+ def coro(*args, **kw):
+ res = func(*args, **kw)
+ if isinstance(res, futures.Future) or inspect.isgenerator(res):
+ res = yield from res
+ return res
+
+ if not _DEBUG:
+ wrapper = coro
+ else:
+ @functools.wraps(func)
+ def wrapper(*args, **kwds):
+ w = CoroWrapper(coro(*args, **kwds), func)
+ w.__name__ = coro.__name__
+ w.__doc__ = coro.__doc__
+ return w
+
+ wrapper._is_coroutine = True # For iscoroutinefunction().
+ return wrapper
+
+
+def iscoroutinefunction(func):
+ """Return True if func is a decorated coroutine function."""
+ return getattr(func, '_is_coroutine', False)
+
+
+def iscoroutine(obj):
+ """Return True if obj is a coroutine object."""
+ return isinstance(obj, CoroWrapper) or inspect.isgenerator(obj)
+
+
+class Task(futures.Future):
+ """A coroutine wrapped in a Future."""
+
+ # An important invariant maintained while a Task not done:
+ #
+ # - Either _fut_waiter is None, and _step() is scheduled;
+ # - or _fut_waiter is some Future, and _step() is *not* scheduled.
+ #
+ # The only transition from the latter to the former is through
+ # _wakeup(). When _fut_waiter is not None, one of its callbacks
+ # must be _wakeup().
+
+ # Weak set containing all tasks alive.
+ _all_tasks = weakref.WeakSet()
+
+ @classmethod
+ def all_tasks(cls, loop=None):
+ """Return a set of all tasks for an event loop.
+
+ By default all tasks for the current event loop are returned.
+ """
+ if loop is None:
+ loop = events.get_event_loop()
+ return {t for t in cls._all_tasks if t._loop is loop}
+
+ def __init__(self, coro, *, loop=None):
+ assert iscoroutine(coro), repr(coro) # Not a coroutine function!
+ super().__init__(loop=loop)
+ self._coro = iter(coro) # Use the iterator just in case.
+ self._fut_waiter = None
+ self._must_cancel = False
+ self._loop.call_soon(self._step)
+ self.__class__._all_tasks.add(self)
+
+ def __repr__(self):
+ res = super().__repr__()
+ if (self._must_cancel and
+ self._state == futures._PENDING and
+ '<PENDING' in res):
+ res = res.replace('<PENDING', '<CANCELLING', 1)
+ i = res.find('<')
+ if i < 0:
+ i = len(res)
+ res = res[:i] + '(<{}>)'.format(self._coro.__name__) + res[i:]
+ return res
+
+ def get_stack(self, *, limit=None):
+ """Return the list of stack frames for this task's coroutine.
+
+ If the coroutine is active, this returns the stack where it is
+ suspended. If the coroutine has completed successfully or was
+ cancelled, this returns an empty list. If the coroutine was
+ terminated by an exception, this returns the list of traceback
+ frames.
+
+ The frames are always ordered from oldest to newest.
+
+ The optional limit gives the maximum nummber of frames to
+ return; by default all available frames are returned. Its
+ meaning differs depending on whether a stack or a traceback is
+ returned: the newest frames of a stack are returned, but the
+ oldest frames of a traceback are returned. (This matches the
+ behavior of the traceback module.)
+
+ For reasons beyond our control, only one stack frame is
+ returned for a suspended coroutine.
+ """
+ frames = []
+ f = self._coro.gi_frame
+ if f is not None:
+ while f is not None:
+ if limit is not None:
+ if limit <= 0:
+ break
+ limit -= 1
+ frames.append(f)
+ f = f.f_back
+ frames.reverse()
+ elif self._exception is not None:
+ tb = self._exception.__traceback__
+ while tb is not None:
+ if limit is not None:
+ if limit <= 0:
+ break
+ limit -= 1
+ frames.append(tb.tb_frame)
+ tb = tb.tb_next
+ return frames
+
+ def print_stack(self, *, limit=None, file=None):
+ """Print the stack or traceback for this task's coroutine.
+
+ This produces output similar to that of the traceback module,
+ for the frames retrieved by get_stack(). The limit argument
+ is passed to get_stack(). The file argument is an I/O stream
+ to which the output goes; by default it goes to sys.stderr.
+ """
+ extracted_list = []
+ checked = set()
+ for f in self.get_stack(limit=limit):
+ lineno = f.f_lineno
+ co = f.f_code
+ filename = co.co_filename
+ name = co.co_name
+ if filename not in checked:
+ checked.add(filename)
+ linecache.checkcache(filename)
+ line = linecache.getline(filename, lineno, f.f_globals)
+ extracted_list.append((filename, lineno, name, line))
+ exc = self._exception
+ if not extracted_list:
+ print('No stack for %r' % self, file=file)
+ elif exc is not None:
+ print('Traceback for %r (most recent call last):' % self,
+ file=file)
+ else:
+ print('Stack for %r (most recent call last):' % self,
+ file=file)
+ traceback.print_list(extracted_list, file=file)
+ if exc is not None:
+ for line in traceback.format_exception_only(exc.__class__, exc):
+ print(line, file=file, end='')
+
+ def cancel(self):
+ if self.done():
+ return False
+ if self._fut_waiter is not None:
+ if self._fut_waiter.cancel():
+ # Leave self._fut_waiter; it may be a Task that
+ # catches and ignores the cancellation so we may have
+ # to cancel it again later.
+ return True
+ # It must be the case that self._step is already scheduled.
+ self._must_cancel = True
+ return True
+
+ def _step(self, value=None, exc=None):
+ assert not self.done(), \
+ '_step(): already done: {!r}, {!r}, {!r}'.format(self, value, exc)
+ if self._must_cancel:
+ if not isinstance(exc, futures.CancelledError):
+ exc = futures.CancelledError()
+ self._must_cancel = False
+ coro = self._coro
+ self._fut_waiter = None
+ # Call either coro.throw(exc) or coro.send(value).
+ try:
+ if exc is not None:
+ result = coro.throw(exc)
+ elif value is not None:
+ result = coro.send(value)
+ else:
+ result = next(coro)
+ except StopIteration as exc:
+ self.set_result(exc.value)
+ except futures.CancelledError as exc:
+ super().cancel() # I.e., Future.cancel(self).
+ except Exception as exc:
+ self.set_exception(exc)
+ except BaseException as exc:
+ self.set_exception(exc)
+ raise
+ else:
+ if isinstance(result, futures.Future):
+ # Yielded Future must come from Future.__iter__().
+ if result._blocking:
+ result._blocking = False
+ result.add_done_callback(self._wakeup)
+ self._fut_waiter = result
+ if self._must_cancel:
+ if self._fut_waiter.cancel():
+ self._must_cancel = False
+ else:
+ self._loop.call_soon(
+ self._step, None,
+ RuntimeError(
+ 'yield was used instead of yield from '
+ 'in task {!r} with {!r}'.format(self, result)))
+ elif result is None:
+ # Bare yield relinquishes control for one event loop iteration.
+ self._loop.call_soon(self._step)
+ elif inspect.isgenerator(result):
+ # Yielding a generator is just wrong.
+ self._loop.call_soon(
+ self._step, None,
+ RuntimeError(
+ 'yield was used instead of yield from for '
+ 'generator in task {!r} with {}'.format(
+ self, result)))
+ else:
+ # Yielding something else is an error.
+ self._loop.call_soon(
+ self._step, None,
+ RuntimeError(
+ 'Task got bad yield: {!r}'.format(result)))
+ self = None
+
+ def _wakeup(self, future):
+ try:
+ value = future.result()
+ except Exception as exc:
+ # This may also be a cancellation.
+ self._step(None, exc)
+ else:
+ self._step(value, None)
+ self = None # Needed to break cycles when an exception occurs.
+
+
+# wait() and as_completed() similar to those in PEP 3148.
+
+FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
+FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
+ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
+
+
+@coroutine
+def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
+ """Wait for the Futures and coroutines given by fs to complete.
+
+ Coroutines will be wrapped in Tasks.
+
+ Returns two sets of Future: (done, pending).
+
+ Usage:
+
+ done, pending = yield from asyncio.wait(fs)
+
+ Note: This does not raise TimeoutError! Futures that aren't done
+ when the timeout occurs are returned in the second set.
+ """
+ if not fs:
+ raise ValueError('Set of coroutines/Futures is empty.')
+
+ if loop is None:
+ loop = events.get_event_loop()
+
+ fs = set(async(f, loop=loop) for f in fs)
+
+ if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
+ raise ValueError('Invalid return_when value: {}'.format(return_when))
+ return (yield from _wait(fs, timeout, return_when, loop))
+
+
+def _release_waiter(waiter, value=True, *args):
+ if not waiter.done():
+ waiter.set_result(value)
+
+
+@coroutine
+def wait_for(fut, timeout, *, loop=None):
+ """Wait for the single Future or coroutine to complete, with timeout.
+
+ Coroutine will be wrapped in Task.
+
+ Returns result of the Future or coroutine. Raises TimeoutError when
+ timeout occurs.
+
+ Usage:
+
+ result = yield from asyncio.wait_for(fut, 10.0)
+
+ """
+ if loop is None:
+ loop = events.get_event_loop()
+
+ waiter = futures.Future(loop=loop)
+ timeout_handle = loop.call_later(timeout, _release_waiter, waiter, False)
+ cb = functools.partial(_release_waiter, waiter, True)
+
+ fut = async(fut, loop=loop)
+ fut.add_done_callback(cb)
+
+ try:
+ if (yield from waiter):
+ return fut.result()
+ else:
+ fut.remove_done_callback(cb)
+ raise futures.TimeoutError()
+ finally:
+ timeout_handle.cancel()
+
+
+@coroutine
+def _wait(fs, timeout, return_when, loop):
+ """Internal helper for wait() and _wait_for().
+
+ The fs argument must be a collection of Futures.
+ """
+ assert fs, 'Set of Futures is empty.'
+ waiter = futures.Future(loop=loop)
+ timeout_handle = None
+ if timeout is not None:
+ timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
+ counter = len(fs)
+
+ def _on_completion(f):
+ nonlocal counter
+ counter -= 1
+ if (counter <= 0 or
+ return_when == FIRST_COMPLETED or
+ return_when == FIRST_EXCEPTION and (not f.cancelled() and
+ f.exception() is not None)):
+ if timeout_handle is not None:
+ timeout_handle.cancel()
+ if not waiter.done():
+ waiter.set_result(False)
+
+ for f in fs:
+ f.add_done_callback(_on_completion)
+
+ try:
+ yield from waiter
+ finally:
+ if timeout_handle is not None:
+ timeout_handle.cancel()
+
+ done, pending = set(), set()
+ for f in fs:
+ f.remove_done_callback(_on_completion)
+ if f.done():
+ done.add(f)
+ else:
+ pending.add(f)
+ return done, pending
+
+
+# This is *not* a @coroutine! It is just an iterator (yielding Futures).
+def as_completed(fs, *, loop=None, timeout=None):
+ """Return an iterator whose values, when waited for, are Futures.
+
+ This differs from PEP 3148; the proper way to use this is:
+
+ for f in as_completed(fs):
+ result = yield from f # The 'yield from' may raise.
+ # Use result.
+
+ Raises TimeoutError if the timeout occurs before all Futures are
+ done.
+
+ Note: The futures 'f' are not necessarily members of fs.
+ """
+ loop = loop if loop is not None else events.get_event_loop()
+ deadline = None if timeout is None else loop.time() + timeout
+ todo = set(async(f, loop=loop) for f in fs)
+ completed = collections.deque()
+
+ @coroutine
+ def _wait_for_one():
+ while not completed:
+ timeout = None
+ if deadline is not None:
+ timeout = deadline - loop.time()
+ if timeout < 0:
+ raise futures.TimeoutError()
+ done, pending = yield from _wait(
+ todo, timeout, FIRST_COMPLETED, loop)
+ # Multiple callers might be waiting for the same events
+ # and getting the same outcome. Dedupe by updating todo.
+ for f in done:
+ if f in todo:
+ todo.remove(f)
+ completed.append(f)
+ f = completed.popleft()
+ return f.result() # May raise.
+
+ for _ in range(len(todo)):
+ yield _wait_for_one()
+
+
+@coroutine
+def sleep(delay, result=None, *, loop=None):
+ """Coroutine that completes after a given time (in seconds)."""
+ future = futures.Future(loop=loop)
+ h = future._loop.call_later(delay, future.set_result, result)
+ try:
+ return (yield from future)
+ finally:
+ h.cancel()
+
+
+def async(coro_or_future, *, loop=None):
+ """Wrap a coroutine in a future.
+
+ If the argument is a Future, it is returned directly.
+ """
+ if isinstance(coro_or_future, futures.Future):
+ if loop is not None and loop is not coro_or_future._loop:
+ raise ValueError('loop argument must agree with Future')
+ return coro_or_future
+ elif iscoroutine(coro_or_future):
+ return Task(coro_or_future, loop=loop)
+ else:
+ raise TypeError('A Future or coroutine is required')
+
+
+class _GatheringFuture(futures.Future):
+ """Helper for gather().
+
+ This overrides cancel() to cancel all the children and act more
+ like Task.cancel(), which doesn't immediately mark itself as
+ cancelled.
+ """
+
+ def __init__(self, children, *, loop=None):
+ super().__init__(loop=loop)
+ self._children = children
+
+ def cancel(self):
+ if self.done():
+ return False
+ for child in self._children:
+ child.cancel()
+ return True
+
+
+def gather(*coros_or_futures, loop=None, return_exceptions=False):
+ """Return a future aggregating results from the given coroutines
+ or futures.
+
+ All futures must share the same event loop. If all the tasks are
+ done successfully, the returned future's result is the list of
+ results (in the order of the original sequence, not necessarily
+ the order of results arrival). If *result_exception* is True,
+ exceptions in the tasks are treated the same as successful
+ results, and gathered in the result list; otherwise, the first
+ raised exception will be immediately propagated to the returned
+ future.
+
+ Cancellation: if the outer Future is cancelled, all children (that
+ have not completed yet) are also cancelled. If any child is
+ cancelled, this is treated as if it raised CancelledError --
+ the outer Future is *not* cancelled in this case. (This is to
+ prevent the cancellation of one child to cause other children to
+ be cancelled.)
+ """
+ children = [async(fut, loop=loop) for fut in coros_or_futures]
+ n = len(children)
+ if n == 0:
+ outer = futures.Future(loop=loop)
+ outer.set_result([])
+ return outer
+ if loop is None:
+ loop = children[0]._loop
+ for fut in children:
+ if fut._loop is not loop:
+ raise ValueError("futures are tied to different event loops")
+ outer = _GatheringFuture(children, loop=loop)
+ nfinished = 0
+ results = [None] * n
+
+ def _done_callback(i, fut):
+ nonlocal nfinished
+ if outer._state != futures._PENDING:
+ if fut._exception is not None:
+ # Mark exception retrieved.
+ fut.exception()
+ return
+ if fut._state == futures._CANCELLED:
+ res = futures.CancelledError()
+ if not return_exceptions:
+ outer.set_exception(res)
+ return
+ elif fut._exception is not None:
+ res = fut.exception() # Mark exception retrieved.
+ if not return_exceptions:
+ outer.set_exception(res)
+ return
+ else:
+ res = fut._result
+ results[i] = res
+ nfinished += 1
+ if nfinished == n:
+ outer.set_result(results)
+
+ for i, fut in enumerate(children):
+ fut.add_done_callback(functools.partial(_done_callback, i))
+ return outer
+
+
+def shield(arg, *, loop=None):
+ """Wait for a future, shielding it from cancellation.
+
+ The statement
+
+ res = yield from shield(something())
+
+ is exactly equivalent to the statement
+
+ res = yield from something()
+
+ *except* that if the coroutine containing it is cancelled, the
+ task running in something() is not cancelled. From the POV of
+ something(), the cancellation did not happen. But its caller is
+ still cancelled, so the yield-from expression still raises
+ CancelledError. Note: If something() is cancelled by other means
+ this will still cancel shield().
+
+ If you want to completely ignore cancellation (not recommended)
+ you can combine shield() with a try/except clause, as follows:
+
+ try:
+ res = yield from shield(something())
+ except CancelledError:
+ res = None
+ """
+ inner = async(arg, loop=loop)
+ if inner.done():
+ # Shortcut.
+ return inner
+ loop = inner._loop
+ outer = futures.Future(loop=loop)
+
+ def _done_callback(inner):
+ if outer.cancelled():
+ # Mark inner's result as retrieved.
+ inner.cancelled() or inner.exception()
+ return
+ if inner.cancelled():
+ outer.cancel()
+ else:
+ exc = inner.exception()
+ if exc is not None:
+ outer.set_exception(exc)
+ else:
+ outer.set_result(inner.result())
+
+ inner.add_done_callback(_done_callback)
+ return outer
diff --git a/Lib/asyncio/test_utils.py b/Lib/asyncio/test_utils.py
new file mode 100644
index 0000000..91bbedb
--- /dev/null
+++ b/Lib/asyncio/test_utils.py
@@ -0,0 +1,246 @@
+"""Utilities shared by tests."""
+
+import collections
+import contextlib
+import io
+import unittest.mock
+import os
+import sys
+import threading
+import unittest
+import unittest.mock
+from wsgiref.simple_server import make_server, WSGIRequestHandler, WSGIServer
+try:
+ import ssl
+except ImportError: # pragma: no cover
+ ssl = None
+
+from . import tasks
+from . import base_events
+from . import events
+from . import selectors
+
+
+if sys.platform == 'win32': # pragma: no cover
+ from .windows_utils import socketpair
+else:
+ from socket import socketpair # pragma: no cover
+
+
+def dummy_ssl_context():
+ if ssl is None:
+ return None
+ else:
+ return ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+
+
+def run_briefly(loop):
+ @tasks.coroutine
+ def once():
+ pass
+ gen = once()
+ t = tasks.Task(gen, loop=loop)
+ try:
+ loop.run_until_complete(t)
+ finally:
+ gen.close()
+
+
+def run_once(loop):
+ """loop.stop() schedules _raise_stop_error()
+ and run_forever() runs until _raise_stop_error() callback.
+ this wont work if test waits for some IO events, because
+ _raise_stop_error() runs before any of io events callbacks.
+ """
+ loop.stop()
+ loop.run_forever()
+
+
+@contextlib.contextmanager
+def run_test_server(*, host='127.0.0.1', port=0, use_ssl=False):
+
+ class SilentWSGIRequestHandler(WSGIRequestHandler):
+ def get_stderr(self):
+ return io.StringIO()
+
+ def log_message(self, format, *args):
+ pass
+
+ class SilentWSGIServer(WSGIServer):
+ def handle_error(self, request, client_address):
+ pass
+
+ class SSLWSGIServer(SilentWSGIServer):
+ def finish_request(self, request, client_address):
+ # The relative location of our test directory (which
+ # contains the sample key and certificate files) differs
+ # between the stdlib and stand-alone Tulip/asyncio.
+ # Prefer our own if we can find it.
+ here = os.path.join(os.path.dirname(__file__), '..', 'tests')
+ if not os.path.isdir(here):
+ here = os.path.join(os.path.dirname(os.__file__),
+ 'test', 'test_asyncio')
+ keyfile = os.path.join(here, 'sample.key')
+ certfile = os.path.join(here, 'sample.crt')
+ ssock = ssl.wrap_socket(request,
+ keyfile=keyfile,
+ certfile=certfile,
+ server_side=True)
+ try:
+ self.RequestHandlerClass(ssock, client_address, self)
+ ssock.close()
+ except OSError:
+ # maybe socket has been closed by peer
+ pass
+
+ def app(environ, start_response):
+ status = '200 OK'
+ headers = [('Content-type', 'text/plain')]
+ start_response(status, headers)
+ return [b'Test message']
+
+ # Run the test WSGI server in a separate thread in order not to
+ # interfere with event handling in the main thread
+ server_class = SSLWSGIServer if use_ssl else SilentWSGIServer
+ httpd = make_server(host, port, app,
+ server_class, SilentWSGIRequestHandler)
+ httpd.address = httpd.server_address
+ server_thread = threading.Thread(target=httpd.serve_forever)
+ server_thread.start()
+ try:
+ yield httpd
+ finally:
+ httpd.shutdown()
+ server_thread.join()
+
+
+def make_test_protocol(base):
+ dct = {}
+ for name in dir(base):
+ if name.startswith('__') and name.endswith('__'):
+ # skip magic names
+ continue
+ dct[name] = unittest.mock.Mock(return_value=None)
+ return type('TestProtocol', (base,) + base.__bases__, dct)()
+
+
+class TestSelector(selectors.BaseSelector):
+
+ def select(self, timeout):
+ return []
+
+
+class TestLoop(base_events.BaseEventLoop):
+ """Loop for unittests.
+
+ It manages self time directly.
+ If something scheduled to be executed later then
+ on next loop iteration after all ready handlers done
+ generator passed to __init__ is calling.
+
+ Generator should be like this:
+
+ def gen():
+ ...
+ when = yield ...
+ ... = yield time_advance
+
+ Value retuned by yield is absolute time of next scheduled handler.
+ Value passed to yield is time advance to move loop's time forward.
+ """
+
+ def __init__(self, gen=None):
+ super().__init__()
+
+ if gen is None:
+ def gen():
+ yield
+ self._check_on_close = False
+ else:
+ self._check_on_close = True
+
+ self._gen = gen()
+ next(self._gen)
+ self._time = 0
+ self._timers = []
+ self._selector = TestSelector()
+
+ self.readers = {}
+ self.writers = {}
+ self.reset_counters()
+
+ def time(self):
+ return self._time
+
+ def advance_time(self, advance):
+ """Move test time forward."""
+ if advance:
+ self._time += advance
+
+ def close(self):
+ if self._check_on_close:
+ try:
+ self._gen.send(0)
+ except StopIteration:
+ pass
+ else: # pragma: no cover
+ raise AssertionError("Time generator is not finished")
+
+ def add_reader(self, fd, callback, *args):
+ self.readers[fd] = events.make_handle(callback, args)
+
+ def remove_reader(self, fd):
+ self.remove_reader_count[fd] += 1
+ if fd in self.readers:
+ del self.readers[fd]
+ return True
+ else:
+ return False
+
+ def assert_reader(self, fd, callback, *args):
+ assert fd in self.readers, 'fd {} is not registered'.format(fd)
+ handle = self.readers[fd]
+ assert handle._callback == callback, '{!r} != {!r}'.format(
+ handle._callback, callback)
+ assert handle._args == args, '{!r} != {!r}'.format(
+ handle._args, args)
+
+ def add_writer(self, fd, callback, *args):
+ self.writers[fd] = events.make_handle(callback, args)
+
+ def remove_writer(self, fd):
+ self.remove_writer_count[fd] += 1
+ if fd in self.writers:
+ del self.writers[fd]
+ return True
+ else:
+ return False
+
+ def assert_writer(self, fd, callback, *args):
+ assert fd in self.writers, 'fd {} is not registered'.format(fd)
+ handle = self.writers[fd]
+ assert handle._callback == callback, '{!r} != {!r}'.format(
+ handle._callback, callback)
+ assert handle._args == args, '{!r} != {!r}'.format(
+ handle._args, args)
+
+ def reset_counters(self):
+ self.remove_reader_count = collections.defaultdict(int)
+ self.remove_writer_count = collections.defaultdict(int)
+
+ def _run_once(self):
+ super()._run_once()
+ for when in self._timers:
+ advance = self._gen.send(when)
+ self.advance_time(advance)
+ self._timers = []
+
+ def call_at(self, when, callback, *args):
+ self._timers.append(when)
+ return super().call_at(when, callback, *args)
+
+ def _process_events(self, event_list):
+ return
+
+ def _write_to_self(self):
+ pass
diff --git a/Lib/asyncio/transports.py b/Lib/asyncio/transports.py
new file mode 100644
index 0000000..bf3adee
--- /dev/null
+++ b/Lib/asyncio/transports.py
@@ -0,0 +1,186 @@
+"""Abstract Transport class."""
+
+__all__ = ['ReadTransport', 'WriteTransport', 'Transport']
+
+
+class BaseTransport:
+ """Base ABC for transports."""
+
+ def __init__(self, extra=None):
+ if extra is None:
+ extra = {}
+ self._extra = extra
+
+ def get_extra_info(self, name, default=None):
+ """Get optional transport information."""
+ return self._extra.get(name, default)
+
+ def close(self):
+ """Closes the transport.
+
+ Buffered data will be flushed asynchronously. No more data
+ will be received. After all buffered data is flushed, the
+ protocol's connection_lost() method will (eventually) called
+ with None as its argument.
+ """
+ raise NotImplementedError
+
+
+class ReadTransport(BaseTransport):
+ """ABC for read-only transports."""
+
+ def pause(self):
+ """Pause the receiving end.
+
+ No data will be passed to the protocol's data_received()
+ method until resume() is called.
+ """
+ raise NotImplementedError
+
+ def resume(self):
+ """Resume the receiving end.
+
+ Data received will once again be passed to the protocol's
+ data_received() method.
+ """
+ raise NotImplementedError
+
+
+class WriteTransport(BaseTransport):
+ """ABC for write-only transports."""
+
+ def write(self, data):
+ """Write some data bytes to the transport.
+
+ This does not block; it buffers the data and arranges for it
+ to be sent out asynchronously.
+ """
+ raise NotImplementedError
+
+ def writelines(self, list_of_data):
+ """Write a list (or any iterable) of data bytes to the transport.
+
+ The default implementation just calls write() for each item in
+ the list/iterable.
+ """
+ for data in list_of_data:
+ self.write(data)
+
+ def write_eof(self):
+ """Closes the write end after flushing buffered data.
+
+ (This is like typing ^D into a UNIX program reading from stdin.)
+
+ Data may still be received.
+ """
+ raise NotImplementedError
+
+ def can_write_eof(self):
+ """Return True if this protocol supports write_eof(), False if not."""
+ raise NotImplementedError
+
+ def abort(self):
+ """Closes the transport immediately.
+
+ Buffered data will be lost. No more data will be received.
+ The protocol's connection_lost() method will (eventually) be
+ called with None as its argument.
+ """
+ raise NotImplementedError
+
+
+class Transport(ReadTransport, WriteTransport):
+ """ABC representing a bidirectional transport.
+
+ There may be several implementations, but typically, the user does
+ not implement new transports; rather, the platform provides some
+ useful transports that are implemented using the platform's best
+ practices.
+
+ The user never instantiates a transport directly; they call a
+ utility function, passing it a protocol factory and other
+ information necessary to create the transport and protocol. (E.g.
+ EventLoop.create_connection() or EventLoop.create_server().)
+
+ The utility function will asynchronously create a transport and a
+ protocol and hook them up by calling the protocol's
+ connection_made() method, passing it the transport.
+
+ The implementation here raises NotImplemented for every method
+ except writelines(), which calls write() in a loop.
+ """
+
+
+class DatagramTransport(BaseTransport):
+ """ABC for datagram (UDP) transports."""
+
+ def sendto(self, data, addr=None):
+ """Send data to the transport.
+
+ This does not block; it buffers the data and arranges for it
+ to be sent out asynchronously.
+ addr is target socket address.
+ If addr is None use target address pointed on transport creation.
+ """
+ raise NotImplementedError
+
+ def abort(self):
+ """Closes the transport immediately.
+
+ Buffered data will be lost. No more data will be received.
+ The protocol's connection_lost() method will (eventually) be
+ called with None as its argument.
+ """
+ raise NotImplementedError
+
+
+class SubprocessTransport(BaseTransport):
+
+ def get_pid(self):
+ """Get subprocess id."""
+ raise NotImplementedError
+
+ def get_returncode(self):
+ """Get subprocess returncode.
+
+ See also
+ http://docs.python.org/3/library/subprocess#subprocess.Popen.returncode
+ """
+ raise NotImplementedError
+
+ def get_pipe_transport(self, fd):
+ """Get transport for pipe with number fd."""
+ raise NotImplementedError
+
+ def send_signal(self, signal):
+ """Send signal to subprocess.
+
+ See also:
+ docs.python.org/3/library/subprocess#subprocess.Popen.send_signal
+ """
+ raise NotImplementedError
+
+ def terminate(self):
+ """Stop the subprocess.
+
+ Alias for close() method.
+
+ On Posix OSs the method sends SIGTERM to the subprocess.
+ On Windows the Win32 API function TerminateProcess()
+ is called to stop the subprocess.
+
+ See also:
+ http://docs.python.org/3/library/subprocess#subprocess.Popen.terminate
+ """
+ raise NotImplementedError
+
+ def kill(self):
+ """Kill the subprocess.
+
+ On Posix OSs the function sends SIGKILL to the subprocess.
+ On Windows kill() is an alias for terminate().
+
+ See also:
+ http://docs.python.org/3/library/subprocess#subprocess.Popen.kill
+ """
+ raise NotImplementedError
diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py
new file mode 100644
index 0000000..a3a8e11
--- /dev/null
+++ b/Lib/asyncio/unix_events.py
@@ -0,0 +1,541 @@
+"""Selector eventloop for Unix with signal handling."""
+
+import collections
+import errno
+import fcntl
+import functools
+import os
+import signal
+import socket
+import stat
+import subprocess
+import sys
+
+
+from . import constants
+from . import events
+from . import protocols
+from . import selector_events
+from . import tasks
+from . import transports
+from .log import asyncio_log
+
+
+__all__ = ['SelectorEventLoop', 'STDIN', 'STDOUT', 'STDERR']
+
+STDIN = 0
+STDOUT = 1
+STDERR = 2
+
+
+if sys.platform == 'win32': # pragma: no cover
+ raise ImportError('Signals are not really supported on Windows')
+
+
+class SelectorEventLoop(selector_events.BaseSelectorEventLoop):
+ """Unix event loop
+
+ Adds signal handling to SelectorEventLoop
+ """
+
+ def __init__(self, selector=None):
+ super().__init__(selector)
+ self._signal_handlers = {}
+ self._subprocesses = {}
+
+ def _socketpair(self):
+ return socket.socketpair()
+
+ def close(self):
+ handler = self._signal_handlers.get(signal.SIGCHLD)
+ if handler is not None:
+ self.remove_signal_handler(signal.SIGCHLD)
+ super().close()
+
+ def add_signal_handler(self, sig, callback, *args):
+ """Add a handler for a signal. UNIX only.
+
+ Raise ValueError if the signal number is invalid or uncatchable.
+ Raise RuntimeError if there is a problem setting up the handler.
+ """
+ self._check_signal(sig)
+ try:
+ # set_wakeup_fd() raises ValueError if this is not the
+ # main thread. By calling it early we ensure that an
+ # event loop running in another thread cannot add a signal
+ # handler.
+ signal.set_wakeup_fd(self._csock.fileno())
+ except ValueError as exc:
+ raise RuntimeError(str(exc))
+
+ handle = events.make_handle(callback, args)
+ self._signal_handlers[sig] = handle
+
+ try:
+ signal.signal(sig, self._handle_signal)
+ except OSError as exc:
+ del self._signal_handlers[sig]
+ if not self._signal_handlers:
+ try:
+ signal.set_wakeup_fd(-1)
+ except ValueError as nexc:
+ asyncio_log.info('set_wakeup_fd(-1) failed: %s', nexc)
+
+ if exc.errno == errno.EINVAL:
+ raise RuntimeError('sig {} cannot be caught'.format(sig))
+ else:
+ raise
+
+ def _handle_signal(self, sig, arg):
+ """Internal helper that is the actual signal handler."""
+ handle = self._signal_handlers.get(sig)
+ if handle is None:
+ return # Assume it's some race condition.
+ if handle._cancelled:
+ self.remove_signal_handler(sig) # Remove it properly.
+ else:
+ self._add_callback_signalsafe(handle)
+
+ def remove_signal_handler(self, sig):
+ """Remove a handler for a signal. UNIX only.
+
+ Return True if a signal handler was removed, False if not.
+ """
+ self._check_signal(sig)
+ try:
+ del self._signal_handlers[sig]
+ except KeyError:
+ return False
+
+ if sig == signal.SIGINT:
+ handler = signal.default_int_handler
+ else:
+ handler = signal.SIG_DFL
+
+ try:
+ signal.signal(sig, handler)
+ except OSError as exc:
+ if exc.errno == errno.EINVAL:
+ raise RuntimeError('sig {} cannot be caught'.format(sig))
+ else:
+ raise
+
+ if not self._signal_handlers:
+ try:
+ signal.set_wakeup_fd(-1)
+ except ValueError as exc:
+ asyncio_log.info('set_wakeup_fd(-1) failed: %s', exc)
+
+ return True
+
+ def _check_signal(self, sig):
+ """Internal helper to validate a signal.
+
+ Raise ValueError if the signal number is invalid or uncatchable.
+ Raise RuntimeError if there is a problem setting up the handler.
+ """
+ if not isinstance(sig, int):
+ raise TypeError('sig must be an int, not {!r}'.format(sig))
+
+ if not (1 <= sig < signal.NSIG):
+ raise ValueError(
+ 'sig {} out of range(1, {})'.format(sig, signal.NSIG))
+
+ def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
+ extra=None):
+ return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)
+
+ def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
+ extra=None):
+ return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)
+
+ @tasks.coroutine
+ def _make_subprocess_transport(self, protocol, args, shell,
+ stdin, stdout, stderr, bufsize,
+ extra=None, **kwargs):
+ self._reg_sigchld()
+ transp = _UnixSubprocessTransport(self, protocol, args, shell,
+ stdin, stdout, stderr, bufsize,
+ extra=None, **kwargs)
+ self._subprocesses[transp.get_pid()] = transp
+ yield from transp._post_init()
+ return transp
+
+ def _reg_sigchld(self):
+ if signal.SIGCHLD not in self._signal_handlers:
+ self.add_signal_handler(signal.SIGCHLD, self._sig_chld)
+
+ def _sig_chld(self):
+ try:
+ try:
+ pid, status = os.waitpid(0, os.WNOHANG)
+ except ChildProcessError:
+ return
+ if pid == 0:
+ self.call_soon(self._sig_chld)
+ return
+ elif os.WIFSIGNALED(status):
+ returncode = -os.WTERMSIG(status)
+ elif os.WIFEXITED(status):
+ returncode = os.WEXITSTATUS(status)
+ else:
+ self.call_soon(self._sig_chld)
+ return
+ transp = self._subprocesses.get(pid)
+ if transp is not None:
+ transp._process_exited(returncode)
+ except Exception:
+ asyncio_log.exception('Unknown exception in SIGCHLD handler')
+
+ def _subprocess_closed(self, transport):
+ pid = transport.get_pid()
+ self._subprocesses.pop(pid, None)
+
+
+def _set_nonblocking(fd):
+ flags = fcntl.fcntl(fd, fcntl.F_GETFL)
+ flags = flags | os.O_NONBLOCK
+ fcntl.fcntl(fd, fcntl.F_SETFL, flags)
+
+
+class _UnixReadPipeTransport(transports.ReadTransport):
+
+ max_size = 256 * 1024 # max bytes we read in one eventloop iteration
+
+ def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
+ super().__init__(extra)
+ self._extra['pipe'] = pipe
+ self._loop = loop
+ self._pipe = pipe
+ self._fileno = pipe.fileno()
+ _set_nonblocking(self._fileno)
+ self._protocol = protocol
+ self._closing = False
+ self._loop.add_reader(self._fileno, self._read_ready)
+ self._loop.call_soon(self._protocol.connection_made, self)
+ if waiter is not None:
+ self._loop.call_soon(waiter.set_result, None)
+
+ def _read_ready(self):
+ try:
+ data = os.read(self._fileno, self.max_size)
+ except (BlockingIOError, InterruptedError):
+ pass
+ except OSError as exc:
+ self._fatal_error(exc)
+ else:
+ if data:
+ self._protocol.data_received(data)
+ else:
+ self._closing = True
+ self._loop.remove_reader(self._fileno)
+ self._loop.call_soon(self._protocol.eof_received)
+ self._loop.call_soon(self._call_connection_lost, None)
+
+ def pause(self):
+ self._loop.remove_reader(self._fileno)
+
+ def resume(self):
+ self._loop.add_reader(self._fileno, self._read_ready)
+
+ def close(self):
+ if not self._closing:
+ self._close(None)
+
+ def _fatal_error(self, exc):
+ # should be called by exception handler only
+ asyncio_log.exception('Fatal error for %s', self)
+ self._close(exc)
+
+ def _close(self, exc):
+ self._closing = True
+ self._loop.remove_reader(self._fileno)
+ self._loop.call_soon(self._call_connection_lost, exc)
+
+ def _call_connection_lost(self, exc):
+ try:
+ self._protocol.connection_lost(exc)
+ finally:
+ self._pipe.close()
+ self._pipe = None
+ self._protocol = None
+ self._loop = None
+
+
+class _UnixWritePipeTransport(transports.WriteTransport):
+
+ def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
+ super().__init__(extra)
+ self._extra['pipe'] = pipe
+ self._loop = loop
+ self._pipe = pipe
+ self._fileno = pipe.fileno()
+ if not stat.S_ISFIFO(os.fstat(self._fileno).st_mode):
+ raise ValueError("Pipe transport is for pipes only.")
+ _set_nonblocking(self._fileno)
+ self._protocol = protocol
+ self._buffer = []
+ self._conn_lost = 0
+ self._closing = False # Set when close() or write_eof() called.
+ self._loop.add_reader(self._fileno, self._read_ready)
+
+ self._loop.call_soon(self._protocol.connection_made, self)
+ if waiter is not None:
+ self._loop.call_soon(waiter.set_result, None)
+
+ def _read_ready(self):
+ # pipe was closed by peer
+ self._close()
+
+ def write(self, data):
+ assert isinstance(data, bytes), repr(data)
+ if not data:
+ return
+
+ if self._conn_lost or self._closing:
+ if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
+ asyncio_log.warning('pipe closed by peer or '
+ 'os.write(pipe, data) raised exception.')
+ self._conn_lost += 1
+ return
+
+ if not self._buffer:
+ # Attempt to send it right away first.
+ try:
+ n = os.write(self._fileno, data)
+ except (BlockingIOError, InterruptedError):
+ n = 0
+ except Exception as exc:
+ self._conn_lost += 1
+ self._fatal_error(exc)
+ return
+ if n == len(data):
+ return
+ elif n > 0:
+ data = data[n:]
+ self._loop.add_writer(self._fileno, self._write_ready)
+
+ self._buffer.append(data)
+
+ def _write_ready(self):
+ data = b''.join(self._buffer)
+ assert data, 'Data should not be empty'
+
+ self._buffer.clear()
+ try:
+ n = os.write(self._fileno, data)
+ except (BlockingIOError, InterruptedError):
+ self._buffer.append(data)
+ except Exception as exc:
+ self._conn_lost += 1
+ # Remove writer here, _fatal_error() doesn't it
+ # because _buffer is empty.
+ self._loop.remove_writer(self._fileno)
+ self._fatal_error(exc)
+ else:
+ if n == len(data):
+ self._loop.remove_writer(self._fileno)
+ if self._closing:
+ self._loop.remove_reader(self._fileno)
+ self._call_connection_lost(None)
+ return
+ elif n > 0:
+ data = data[n:]
+
+ self._buffer.append(data) # Try again later.
+
+ def can_write_eof(self):
+ return True
+
+ # TODO: Make the relationships between write_eof(), close(),
+ # abort(), _fatal_error() and _close() more straightforward.
+
+ def write_eof(self):
+ if self._closing:
+ return
+ assert self._pipe
+ self._closing = True
+ if not self._buffer:
+ self._loop.remove_reader(self._fileno)
+ self._loop.call_soon(self._call_connection_lost, None)
+
+ def close(self):
+ if not self._closing:
+ # write_eof is all what we needed to close the write pipe
+ self.write_eof()
+
+ def abort(self):
+ self._close(None)
+
+ def _fatal_error(self, exc):
+ # should be called by exception handler only
+ asyncio_log.exception('Fatal error for %s', self)
+ self._close(exc)
+
+ def _close(self, exc=None):
+ self._closing = True
+ if self._buffer:
+ self._loop.remove_writer(self._fileno)
+ self._buffer.clear()
+ self._loop.remove_reader(self._fileno)
+ self._loop.call_soon(self._call_connection_lost, exc)
+
+ def _call_connection_lost(self, exc):
+ try:
+ self._protocol.connection_lost(exc)
+ finally:
+ self._pipe.close()
+ self._pipe = None
+ self._protocol = None
+ self._loop = None
+
+
+class _UnixWriteSubprocessPipeProto(protocols.BaseProtocol):
+ pipe = None
+
+ def __init__(self, proc, fd):
+ self.proc = proc
+ self.fd = fd
+ self.connected = False
+ self.disconnected = False
+ proc._pipes[fd] = self
+
+ def connection_made(self, transport):
+ self.connected = True
+ self.pipe = transport
+ self.proc._try_connected()
+
+ def connection_lost(self, exc):
+ self.disconnected = True
+ self.proc._pipe_connection_lost(self.fd, exc)
+
+
+class _UnixReadSubprocessPipeProto(_UnixWriteSubprocessPipeProto,
+ protocols.Protocol):
+
+ def data_received(self, data):
+ self.proc._pipe_data_received(self.fd, data)
+
+ def eof_received(self):
+ pass
+
+
+class _UnixSubprocessTransport(transports.SubprocessTransport):
+
+ def __init__(self, loop, protocol, args, shell,
+ stdin, stdout, stderr, bufsize,
+ extra=None, **kwargs):
+ super().__init__(extra)
+ self._protocol = protocol
+ self._loop = loop
+
+ self._pipes = {}
+ if stdin == subprocess.PIPE:
+ self._pipes[STDIN] = None
+ if stdout == subprocess.PIPE:
+ self._pipes[STDOUT] = None
+ if stderr == subprocess.PIPE:
+ self._pipes[STDERR] = None
+ self._pending_calls = collections.deque()
+ self._finished = False
+ self._returncode = None
+
+ self._proc = subprocess.Popen(
+ args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
+ universal_newlines=False, bufsize=bufsize, **kwargs)
+ self._extra['subprocess'] = self._proc
+
+ def close(self):
+ for proto in self._pipes.values():
+ proto.pipe.close()
+ if self._returncode is None:
+ self.terminate()
+
+ def get_pid(self):
+ return self._proc.pid
+
+ def get_returncode(self):
+ return self._returncode
+
+ def get_pipe_transport(self, fd):
+ if fd in self._pipes:
+ return self._pipes[fd].pipe
+ else:
+ return None
+
+ def send_signal(self, signal):
+ self._proc.send_signal(signal)
+
+ def terminate(self):
+ self._proc.terminate()
+
+ def kill(self):
+ self._proc.kill()
+
+ @tasks.coroutine
+ def _post_init(self):
+ proc = self._proc
+ loop = self._loop
+ if proc.stdin is not None:
+ transp, proto = yield from loop.connect_write_pipe(
+ functools.partial(
+ _UnixWriteSubprocessPipeProto, self, STDIN),
+ proc.stdin)
+ if proc.stdout is not None:
+ transp, proto = yield from loop.connect_read_pipe(
+ functools.partial(
+ _UnixReadSubprocessPipeProto, self, STDOUT),
+ proc.stdout)
+ if proc.stderr is not None:
+ transp, proto = yield from loop.connect_read_pipe(
+ functools.partial(
+ _UnixReadSubprocessPipeProto, self, STDERR),
+ proc.stderr)
+ if not self._pipes:
+ self._try_connected()
+
+ def _call(self, cb, *data):
+ if self._pending_calls is not None:
+ self._pending_calls.append((cb, data))
+ else:
+ self._loop.call_soon(cb, *data)
+
+ def _try_connected(self):
+ assert self._pending_calls is not None
+ if all(p is not None and p.connected for p in self._pipes.values()):
+ self._loop.call_soon(self._protocol.connection_made, self)
+ for callback, data in self._pending_calls:
+ self._loop.call_soon(callback, *data)
+ self._pending_calls = None
+
+ def _pipe_connection_lost(self, fd, exc):
+ self._call(self._protocol.pipe_connection_lost, fd, exc)
+ self._try_finish()
+
+ def _pipe_data_received(self, fd, data):
+ self._call(self._protocol.pipe_data_received, fd, data)
+
+ def _process_exited(self, returncode):
+ assert returncode is not None, returncode
+ assert self._returncode is None, self._returncode
+ self._returncode = returncode
+ self._loop._subprocess_closed(self)
+ self._call(self._protocol.process_exited)
+ self._try_finish()
+
+ def _try_finish(self):
+ assert not self._finished
+ if self._returncode is None:
+ return
+ if all(p is not None and p.disconnected
+ for p in self._pipes.values()):
+ self._finished = True
+ self._loop.call_soon(self._call_connection_lost, None)
+
+ def _call_connection_lost(self, exc):
+ try:
+ self._protocol.connection_lost(exc)
+ finally:
+ self._proc = None
+ self._protocol = None
+ self._loop = None
diff --git a/Lib/asyncio/windows_events.py b/Lib/asyncio/windows_events.py
new file mode 100644
index 0000000..1d0ad26
--- /dev/null
+++ b/Lib/asyncio/windows_events.py
@@ -0,0 +1,375 @@
+"""Selector and proactor eventloops for Windows."""
+
+import errno
+import socket
+import weakref
+import struct
+import _winapi
+
+from . import futures
+from . import proactor_events
+from . import selector_events
+from . import tasks
+from . import windows_utils
+from .log import asyncio_log
+
+try:
+ import _overlapped
+except ImportError:
+ from . import _overlapped
+
+
+__all__ = ['SelectorEventLoop', 'ProactorEventLoop', 'IocpProactor']
+
+
+NULL = 0
+INFINITE = 0xffffffff
+ERROR_CONNECTION_REFUSED = 1225
+ERROR_CONNECTION_ABORTED = 1236
+
+
+class _OverlappedFuture(futures.Future):
+ """Subclass of Future which represents an overlapped operation.
+
+ Cancelling it will immediately cancel the overlapped operation.
+ """
+
+ def __init__(self, ov, *, loop=None):
+ super().__init__(loop=loop)
+ self.ov = ov
+
+ def cancel(self):
+ try:
+ self.ov.cancel()
+ except OSError:
+ pass
+ return super().cancel()
+
+
+class PipeServer(object):
+ """Class representing a pipe server.
+
+ This is much like a bound, listening socket.
+ """
+ def __init__(self, address):
+ self._address = address
+ self._free_instances = weakref.WeakSet()
+ self._pipe = self._server_pipe_handle(True)
+
+ def _get_unconnected_pipe(self):
+ # Create new instance and return previous one. This ensures
+ # that (until the server is closed) there is always at least
+ # one pipe handle for address. Therefore if a client attempt
+ # to connect it will not fail with FileNotFoundError.
+ tmp, self._pipe = self._pipe, self._server_pipe_handle(False)
+ return tmp
+
+ def _server_pipe_handle(self, first):
+ # Return a wrapper for a new pipe handle.
+ if self._address is None:
+ return None
+ flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
+ if first:
+ flags |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
+ h = _winapi.CreateNamedPipe(
+ self._address, flags,
+ _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
+ _winapi.PIPE_WAIT,
+ _winapi.PIPE_UNLIMITED_INSTANCES,
+ windows_utils.BUFSIZE, windows_utils.BUFSIZE,
+ _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL)
+ pipe = windows_utils.PipeHandle(h)
+ self._free_instances.add(pipe)
+ return pipe
+
+ def close(self):
+ # Close all instances which have not been connected to by a client.
+ if self._address is not None:
+ for pipe in self._free_instances:
+ pipe.close()
+ self._pipe = None
+ self._address = None
+ self._free_instances.clear()
+
+ __del__ = close
+
+
+class SelectorEventLoop(selector_events.BaseSelectorEventLoop):
+ """Windows version of selector event loop."""
+
+ def _socketpair(self):
+ return windows_utils.socketpair()
+
+
+class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
+ """Windows version of proactor event loop using IOCP."""
+
+ def __init__(self, proactor=None):
+ if proactor is None:
+ proactor = IocpProactor()
+ super().__init__(proactor)
+
+ def _socketpair(self):
+ return windows_utils.socketpair()
+
+ @tasks.coroutine
+ def create_pipe_connection(self, protocol_factory, address):
+ f = self._proactor.connect_pipe(address)
+ pipe = yield from f
+ protocol = protocol_factory()
+ trans = self._make_duplex_pipe_transport(pipe, protocol,
+ extra={'addr': address})
+ return trans, protocol
+
+ @tasks.coroutine
+ def start_serving_pipe(self, protocol_factory, address):
+ server = PipeServer(address)
+ def loop(f=None):
+ pipe = None
+ try:
+ if f:
+ pipe = f.result()
+ server._free_instances.discard(pipe)
+ protocol = protocol_factory()
+ self._make_duplex_pipe_transport(
+ pipe, protocol, extra={'addr': address})
+ pipe = server._get_unconnected_pipe()
+ if pipe is None:
+ return
+ f = self._proactor.accept_pipe(pipe)
+ except OSError:
+ if pipe and pipe.fileno() != -1:
+ asyncio_log.exception('Pipe accept failed')
+ pipe.close()
+ except futures.CancelledError:
+ if pipe:
+ pipe.close()
+ else:
+ f.add_done_callback(loop)
+ self.call_soon(loop)
+ return [server]
+
+ def _stop_serving(self, server):
+ server.close()
+
+
+class IocpProactor:
+ """Proactor implementation using IOCP."""
+
+ def __init__(self, concurrency=0xffffffff):
+ self._loop = None
+ self._results = []
+ self._iocp = _overlapped.CreateIoCompletionPort(
+ _overlapped.INVALID_HANDLE_VALUE, NULL, 0, concurrency)
+ self._cache = {}
+ self._registered = weakref.WeakSet()
+ self._stopped_serving = weakref.WeakSet()
+
+ def set_loop(self, loop):
+ self._loop = loop
+
+ def select(self, timeout=None):
+ if not self._results:
+ self._poll(timeout)
+ tmp = self._results
+ self._results = []
+ return tmp
+
+ def recv(self, conn, nbytes, flags=0):
+ self._register_with_iocp(conn)
+ ov = _overlapped.Overlapped(NULL)
+ if isinstance(conn, socket.socket):
+ ov.WSARecv(conn.fileno(), nbytes, flags)
+ else:
+ ov.ReadFile(conn.fileno(), nbytes)
+ def finish(trans, key, ov):
+ try:
+ return ov.getresult()
+ except OSError as exc:
+ if exc.winerror == _overlapped.ERROR_NETNAME_DELETED:
+ raise ConnectionResetError(*exc.args)
+ else:
+ raise
+ return self._register(ov, conn, finish)
+
+ def send(self, conn, buf, flags=0):
+ self._register_with_iocp(conn)
+ ov = _overlapped.Overlapped(NULL)
+ if isinstance(conn, socket.socket):
+ ov.WSASend(conn.fileno(), buf, flags)
+ else:
+ ov.WriteFile(conn.fileno(), buf)
+ def finish(trans, key, ov):
+ try:
+ return ov.getresult()
+ except OSError as exc:
+ if exc.winerror == _overlapped.ERROR_NETNAME_DELETED:
+ raise ConnectionResetError(*exc.args)
+ else:
+ raise
+ return self._register(ov, conn, finish)
+
+ def accept(self, listener):
+ self._register_with_iocp(listener)
+ conn = self._get_accept_socket(listener.family)
+ ov = _overlapped.Overlapped(NULL)
+ ov.AcceptEx(listener.fileno(), conn.fileno())
+ def finish_accept(trans, key, ov):
+ ov.getresult()
+ # Use SO_UPDATE_ACCEPT_CONTEXT so getsockname() etc work.
+ buf = struct.pack('@P', listener.fileno())
+ conn.setsockopt(socket.SOL_SOCKET,
+ _overlapped.SO_UPDATE_ACCEPT_CONTEXT, buf)
+ conn.settimeout(listener.gettimeout())
+ return conn, conn.getpeername()
+ return self._register(ov, listener, finish_accept)
+
+ def connect(self, conn, address):
+ self._register_with_iocp(conn)
+ # The socket needs to be locally bound before we call ConnectEx().
+ try:
+ _overlapped.BindLocal(conn.fileno(), conn.family)
+ except OSError as e:
+ if e.winerror != errno.WSAEINVAL:
+ raise
+ # Probably already locally bound; check using getsockname().
+ if conn.getsockname()[1] == 0:
+ raise
+ ov = _overlapped.Overlapped(NULL)
+ ov.ConnectEx(conn.fileno(), address)
+ def finish_connect(trans, key, ov):
+ ov.getresult()
+ # Use SO_UPDATE_CONNECT_CONTEXT so getsockname() etc work.
+ conn.setsockopt(socket.SOL_SOCKET,
+ _overlapped.SO_UPDATE_CONNECT_CONTEXT, 0)
+ return conn
+ return self._register(ov, conn, finish_connect)
+
+ def accept_pipe(self, pipe):
+ self._register_with_iocp(pipe)
+ ov = _overlapped.Overlapped(NULL)
+ ov.ConnectNamedPipe(pipe.fileno())
+ def finish(trans, key, ov):
+ ov.getresult()
+ return pipe
+ return self._register(ov, pipe, finish)
+
+ def connect_pipe(self, address):
+ ov = _overlapped.Overlapped(NULL)
+ ov.WaitNamedPipeAndConnect(address, self._iocp, ov.address)
+ def finish(err, handle, ov):
+ # err, handle were arguments passed to PostQueuedCompletionStatus()
+ # in a function run in a thread pool.
+ if err == _overlapped.ERROR_SEM_TIMEOUT:
+ # Connection did not succeed within time limit.
+ msg = _overlapped.FormatMessage(err)
+ raise ConnectionRefusedError(0, msg, None, err)
+ elif err != 0:
+ msg = _overlapped.FormatMessage(err)
+ raise OSError(0, msg, None, err)
+ else:
+ return windows_utils.PipeHandle(handle)
+ return self._register(ov, None, finish, wait_for_post=True)
+
+ def _register_with_iocp(self, obj):
+ # To get notifications of finished ops on this objects sent to the
+ # completion port, were must register the handle.
+ if obj not in self._registered:
+ self._registered.add(obj)
+ _overlapped.CreateIoCompletionPort(obj.fileno(), self._iocp, 0, 0)
+ # XXX We could also use SetFileCompletionNotificationModes()
+ # to avoid sending notifications to completion port of ops
+ # that succeed immediately.
+
+ def _register(self, ov, obj, callback, wait_for_post=False):
+ # Return a future which will be set with the result of the
+ # operation when it completes. The future's value is actually
+ # the value returned by callback().
+ f = _OverlappedFuture(ov, loop=self._loop)
+ if ov.pending or wait_for_post:
+ # Register the overlapped operation for later. Note that
+ # we only store obj to prevent it from being garbage
+ # collected too early.
+ self._cache[ov.address] = (f, ov, obj, callback)
+ else:
+ # The operation has completed, so no need to postpone the
+ # work. We cannot take this short cut if we need the
+ # NumberOfBytes, CompletionKey values returned by
+ # PostQueuedCompletionStatus().
+ try:
+ value = callback(None, None, ov)
+ except OSError as e:
+ f.set_exception(e)
+ else:
+ f.set_result(value)
+ return f
+
+ def _get_accept_socket(self, family):
+ s = socket.socket(family)
+ s.settimeout(0)
+ return s
+
+ def _poll(self, timeout=None):
+ if timeout is None:
+ ms = INFINITE
+ elif timeout < 0:
+ raise ValueError("negative timeout")
+ else:
+ ms = int(timeout * 1000 + 0.5)
+ if ms >= INFINITE:
+ raise ValueError("timeout too big")
+ while True:
+ status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
+ if status is None:
+ return
+ err, transferred, key, address = status
+ try:
+ f, ov, obj, callback = self._cache.pop(address)
+ except KeyError:
+ # key is either zero, or it is used to return a pipe
+ # handle which should be closed to avoid a leak.
+ if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
+ _winapi.CloseHandle(key)
+ ms = 0
+ continue
+ if obj in self._stopped_serving:
+ f.cancel()
+ elif not f.cancelled():
+ try:
+ value = callback(transferred, key, ov)
+ except OSError as e:
+ f.set_exception(e)
+ self._results.append(f)
+ else:
+ f.set_result(value)
+ self._results.append(f)
+ ms = 0
+
+ def _stop_serving(self, obj):
+ # obj is a socket or pipe handle. It will be closed in
+ # BaseProactorEventLoop._stop_serving() which will make any
+ # pending operations fail quickly.
+ self._stopped_serving.add(obj)
+
+ def close(self):
+ # Cancel remaining registered operations.
+ for address, (f, ov, obj, callback) in list(self._cache.items()):
+ if obj is None:
+ # The operation was started with connect_pipe() which
+ # queues a task to Windows' thread pool. This cannot
+ # be cancelled, so just forget it.
+ del self._cache[address]
+ else:
+ try:
+ ov.cancel()
+ except OSError:
+ pass
+
+ while self._cache:
+ if not self._poll(1):
+ asyncio_log.debug('taking long time to close proactor')
+
+ self._results = []
+ if self._iocp is not None:
+ _winapi.CloseHandle(self._iocp)
+ self._iocp = None
diff --git a/Lib/asyncio/windows_utils.py b/Lib/asyncio/windows_utils.py
new file mode 100644
index 0000000..04b43e9
--- /dev/null
+++ b/Lib/asyncio/windows_utils.py
@@ -0,0 +1,181 @@
+"""
+Various Windows specific bits and pieces
+"""
+
+import sys
+
+if sys.platform != 'win32': # pragma: no cover
+ raise ImportError('win32 only')
+
+import socket
+import itertools
+import msvcrt
+import os
+import subprocess
+import tempfile
+import _winapi
+
+
+__all__ = ['socketpair', 'pipe', 'Popen', 'PIPE', 'PipeHandle']
+
+#
+# Constants/globals
+#
+
+BUFSIZE = 8192
+PIPE = subprocess.PIPE
+_mmap_counter = itertools.count()
+
+#
+# Replacement for socket.socketpair()
+#
+
+def socketpair(family=socket.AF_INET, type=socket.SOCK_STREAM, proto=0):
+ """A socket pair usable as a self-pipe, for Windows.
+
+ Origin: https://gist.github.com/4325783, by Geert Jansen. Public domain.
+ """
+ # We create a connected TCP socket. Note the trick with setblocking(0)
+ # that prevents us from having to create a thread.
+ lsock = socket.socket(family, type, proto)
+ lsock.bind(('localhost', 0))
+ lsock.listen(1)
+ addr, port = lsock.getsockname()
+ csock = socket.socket(family, type, proto)
+ csock.setblocking(False)
+ try:
+ csock.connect((addr, port))
+ except (BlockingIOError, InterruptedError):
+ pass
+ except Exception:
+ lsock.close()
+ csock.close()
+ raise
+ ssock, _ = lsock.accept()
+ csock.setblocking(True)
+ lsock.close()
+ return (ssock, csock)
+
+#
+# Replacement for os.pipe() using handles instead of fds
+#
+
+def pipe(*, duplex=False, overlapped=(True, True), bufsize=BUFSIZE):
+ """Like os.pipe() but with overlapped support and using handles not fds."""
+ address = tempfile.mktemp(prefix=r'\\.\pipe\python-pipe-%d-%d-' %
+ (os.getpid(), next(_mmap_counter)))
+
+ if duplex:
+ openmode = _winapi.PIPE_ACCESS_DUPLEX
+ access = _winapi.GENERIC_READ | _winapi.GENERIC_WRITE
+ obsize, ibsize = bufsize, bufsize
+ else:
+ openmode = _winapi.PIPE_ACCESS_INBOUND
+ access = _winapi.GENERIC_WRITE
+ obsize, ibsize = 0, bufsize
+
+ openmode |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
+
+ if overlapped[0]:
+ openmode |= _winapi.FILE_FLAG_OVERLAPPED
+
+ if overlapped[1]:
+ flags_and_attribs = _winapi.FILE_FLAG_OVERLAPPED
+ else:
+ flags_and_attribs = 0
+
+ h1 = h2 = None
+ try:
+ h1 = _winapi.CreateNamedPipe(
+ address, openmode, _winapi.PIPE_WAIT,
+ 1, obsize, ibsize, _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL)
+
+ h2 = _winapi.CreateFile(
+ address, access, 0, _winapi.NULL, _winapi.OPEN_EXISTING,
+ flags_and_attribs, _winapi.NULL)
+
+ ov = _winapi.ConnectNamedPipe(h1, overlapped=True)
+ ov.GetOverlappedResult(True)
+ return h1, h2
+ except:
+ if h1 is not None:
+ _winapi.CloseHandle(h1)
+ if h2 is not None:
+ _winapi.CloseHandle(h2)
+ raise
+
+#
+# Wrapper for a pipe handle
+#
+
+class PipeHandle:
+ """Wrapper for an overlapped pipe handle which is vaguely file-object like.
+
+ The IOCP event loop can use these instead of socket objects.
+ """
+ def __init__(self, handle):
+ self._handle = handle
+
+ @property
+ def handle(self):
+ return self._handle
+
+ def fileno(self):
+ return self._handle
+
+ def close(self, *, CloseHandle=_winapi.CloseHandle):
+ if self._handle != -1:
+ CloseHandle(self._handle)
+ self._handle = -1
+
+ __del__ = close
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, t, v, tb):
+ self.close()
+
+#
+# Replacement for subprocess.Popen using overlapped pipe handles
+#
+
+class Popen(subprocess.Popen):
+ """Replacement for subprocess.Popen using overlapped pipe handles.
+
+ The stdin, stdout, stderr are None or instances of PipeHandle.
+ """
+ def __init__(self, args, stdin=None, stdout=None, stderr=None, **kwds):
+ stdin_rfd = stdout_wfd = stderr_wfd = None
+ stdin_wh = stdout_rh = stderr_rh = None
+ if stdin == PIPE:
+ stdin_rh, stdin_wh = pipe(overlapped=(False, True))
+ stdin_rfd = msvcrt.open_osfhandle(stdin_rh, os.O_RDONLY)
+ if stdout == PIPE:
+ stdout_rh, stdout_wh = pipe(overlapped=(True, False))
+ stdout_wfd = msvcrt.open_osfhandle(stdout_wh, 0)
+ if stderr == PIPE:
+ stderr_rh, stderr_wh = pipe(overlapped=(True, False))
+ stderr_wfd = msvcrt.open_osfhandle(stderr_wh, 0)
+ try:
+ super().__init__(args, bufsize=0, universal_newlines=False,
+ stdin=stdin_rfd, stdout=stdout_wfd,
+ stderr=stderr_wfd, **kwds)
+ except:
+ for h in (stdin_wh, stdout_rh, stderr_rh):
+ _winapi.CloseHandle(h)
+ raise
+ else:
+ if stdin_wh is not None:
+ self.stdin = PipeHandle(stdin_wh)
+ if stdout_rh is not None:
+ self.stdout = PipeHandle(stdout_rh)
+ if stderr_rh is not None:
+ self.stderr = PipeHandle(stderr_rh)
+ finally:
+ if stdin == PIPE:
+ os.close(stdin_rfd)
+ if stdout == PIPE:
+ os.close(stdout_wfd)
+ if stderr == PIPE:
+ os.close(stderr_wfd)