diff options
Diffstat (limited to 'Lib/asyncio')
| -rw-r--r-- | Lib/asyncio/base_events.py | 195 | ||||
| -rw-r--r-- | Lib/asyncio/base_subprocess.py | 6 | ||||
| -rw-r--r-- | Lib/asyncio/compat.py | 1 | ||||
| -rw-r--r-- | Lib/asyncio/coroutines.py | 3 | ||||
| -rw-r--r-- | Lib/asyncio/events.py | 6 | ||||
| -rw-r--r-- | Lib/asyncio/futures.py | 9 | ||||
| -rw-r--r-- | Lib/asyncio/locks.py | 18 | ||||
| -rw-r--r-- | Lib/asyncio/proactor_events.py | 11 | ||||
| -rw-r--r-- | Lib/asyncio/queues.py | 4 | ||||
| -rw-r--r-- | Lib/asyncio/selector_events.py | 61 | ||||
| -rw-r--r-- | Lib/asyncio/sslproto.py | 5 | ||||
| -rw-r--r-- | Lib/asyncio/streams.py | 95 | ||||
| -rw-r--r-- | Lib/asyncio/subprocess.py | 2 | ||||
| -rw-r--r-- | Lib/asyncio/tasks.py | 67 | ||||
| -rw-r--r-- | Lib/asyncio/unix_events.py | 18 | ||||
| -rw-r--r-- | Lib/asyncio/windows_events.py | 6 |
16 files changed, 276 insertions, 231 deletions
diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index 4505732..0174375 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -16,10 +16,8 @@ to modify the meaning of the API call itself. import collections import concurrent.futures -import functools import heapq import inspect -import ipaddress import itertools import logging import os @@ -54,6 +52,12 @@ _MIN_SCHEDULED_TIMER_HANDLES = 100 # before cleanup of cancelled handles is performed. _MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5 +# Exceptions which must not call the exception handler in fatal error +# methods (_fatal_error()) +_FATAL_ERROR_IGNORE = (BrokenPipeError, + ConnectionResetError, ConnectionAbortedError) + + def _format_handle(handle): cb = handle._callback if inspect.ismethod(cb) and isinstance(cb.__self__, tasks.Task): @@ -80,12 +84,14 @@ if hasattr(socket, 'SOCK_CLOEXEC'): _SOCKET_TYPE_MASK |= socket.SOCK_CLOEXEC -@functools.lru_cache(maxsize=1024) def _ipaddr_info(host, port, family, type, proto): - # Try to skip getaddrinfo if "host" is already an IP. Since getaddrinfo - # blocks on an exclusive lock on some platforms, users might handle name - # resolution in their own code and pass in resolved IPs. - if proto not in {0, socket.IPPROTO_TCP, socket.IPPROTO_UDP} or host is None: + # Try to skip getaddrinfo if "host" is already an IP. Users might have + # handled name resolution in their own code and pass in resolved IPs. + if not hasattr(socket, 'inet_pton'): + return + + if proto not in {0, socket.IPPROTO_TCP, socket.IPPROTO_UDP} or \ + host is None: return None type &= ~_SOCKET_TYPE_MASK @@ -96,59 +102,63 @@ def _ipaddr_info(host, port, family, type, proto): else: return None - if hasattr(socket, 'inet_pton'): - if family == socket.AF_UNSPEC: - afs = [socket.AF_INET, socket.AF_INET6] + if port is None: + port = 0 + elif isinstance(port, bytes): + if port == b'': + port = 0 else: - afs = [family] - - for af in afs: - # Linux's inet_pton doesn't accept an IPv6 zone index after host, - # like '::1%lo0', so strip it. If we happen to make an invalid - # address look valid, we fail later in sock.connect or sock.bind. try: - if af == socket.AF_INET6: - socket.inet_pton(af, host.partition('%')[0]) - else: - socket.inet_pton(af, host) - return af, type, proto, '', (host, port) - except OSError: - pass - - # "host" is not an IP address. - return None + port = int(port) + except ValueError: + # Might be a service name like b"http". + port = socket.getservbyname(port.decode('ascii')) + elif isinstance(port, str): + if port == '': + port = 0 + else: + try: + port = int(port) + except ValueError: + # Might be a service name like "http". + port = socket.getservbyname(port) - # No inet_pton. (On Windows it's only available since Python 3.4.) - # Even though getaddrinfo with AI_NUMERICHOST would be non-blocking, it - # still requires a lock on some platforms, and waiting for that lock could - # block the event loop. Use ipaddress instead, it's just text parsing. - try: - addr = ipaddress.IPv4Address(host) - except ValueError: - try: - addr = ipaddress.IPv6Address(host.partition('%')[0]) - except ValueError: - return None + if family == socket.AF_UNSPEC: + afs = [socket.AF_INET, socket.AF_INET6] + else: + afs = [family] - af = socket.AF_INET if addr.version == 4 else socket.AF_INET6 - if family not in (socket.AF_UNSPEC, af): - # "host" is wrong IP version for "family". + if isinstance(host, bytes): + host = host.decode('idna') + if '%' in host: + # Linux's inet_pton doesn't accept an IPv6 zone index after host, + # like '::1%lo0'. return None - return af, type, proto, '', (host, port) + for af in afs: + try: + socket.inet_pton(af, host) + # The host has already been resolved. + return af, type, proto, '', (host, port) + except OSError: + pass + # "host" is not an IP address. + return None -def _check_resolved_address(sock, address): - # Ensure that the address is already resolved to avoid the trap of hanging - # the entire event loop when the address requires doing a DNS lookup. - - if hasattr(socket, 'AF_UNIX') and sock.family == socket.AF_UNIX: - return +def _ensure_resolved(address, *, family=0, type=socket.SOCK_STREAM, proto=0, + flags=0, loop): host, port = address[:2] - if _ipaddr_info(host, port, sock.family, sock.type, sock.proto) is None: - raise ValueError("address must be resolved (IP address)," - " got host %r" % host) + info = _ipaddr_info(host, port, family, type, proto) + if info is not None: + # "host" is already a resolved IP. + fut = loop.create_future() + fut.set_result([info]) + return fut + else: + return loop.getaddrinfo(host, port, family=family, type=type, + proto=proto, flags=flags) def _run_until_complete_cb(fut): @@ -203,7 +213,7 @@ class Server(events.AbstractServer): def wait_closed(self): if self.sockets is None or self._waiters is None: return - waiter = futures.Future(loop=self._loop) + waiter = self._loop.create_future() self._waiters.append(waiter) yield from waiter @@ -237,6 +247,10 @@ class BaseEventLoop(events.AbstractEventLoop): % (self.__class__.__name__, self.is_running(), self.is_closed(), self.get_debug())) + def create_future(self): + """Create a Future object attached to the loop.""" + return futures.Future(loop=self) + def create_task(self, coro): """Schedule a coroutine object. @@ -530,7 +544,7 @@ class BaseEventLoop(events.AbstractEventLoop): assert not args assert not isinstance(func, events.TimerHandle) if func._cancelled: - f = futures.Future(loop=self) + f = self.create_future() f.set_result(None) return f func, args = func._callback, func._args @@ -571,12 +585,7 @@ class BaseEventLoop(events.AbstractEventLoop): def getaddrinfo(self, host, port, *, family=0, type=0, proto=0, flags=0): - info = _ipaddr_info(host, port, family, type, proto) - if info is not None: - fut = futures.Future(loop=self) - fut.set_result([info]) - return fut - elif self._debug: + if self._debug: return self.run_in_executor(None, self._getaddrinfo_debug, host, port, family, type, proto, flags) else: @@ -625,14 +634,14 @@ class BaseEventLoop(events.AbstractEventLoop): raise ValueError( 'host/port and sock can not be specified at the same time') - f1 = self.getaddrinfo( - host, port, family=family, - type=socket.SOCK_STREAM, proto=proto, flags=flags) + f1 = _ensure_resolved((host, port), family=family, + type=socket.SOCK_STREAM, proto=proto, + flags=flags, loop=self) fs = [f1] if local_addr is not None: - f2 = self.getaddrinfo( - *local_addr, family=family, - type=socket.SOCK_STREAM, proto=proto, flags=flags) + f2 = _ensure_resolved(local_addr, family=family, + type=socket.SOCK_STREAM, proto=proto, + flags=flags, loop=self) fs.append(f2) else: f2 = None @@ -698,8 +707,6 @@ class BaseEventLoop(events.AbstractEventLoop): raise ValueError( 'host and port was not specified and no sock specified') - sock.setblocking(False) - transport, protocol = yield from self._create_connection_transport( sock, protocol_factory, ssl, server_hostname) if self._debug: @@ -712,14 +719,17 @@ class BaseEventLoop(events.AbstractEventLoop): @coroutine def _create_connection_transport(self, sock, protocol_factory, ssl, - server_hostname): + server_hostname, server_side=False): + + sock.setblocking(False) + protocol = protocol_factory() - waiter = futures.Future(loop=self) + waiter = self.create_future() if ssl: sslcontext = None if isinstance(ssl, bool) else ssl transport = self._make_ssl_transport( sock, protocol, sslcontext, waiter, - server_side=False, server_hostname=server_hostname) + server_side=server_side, server_hostname=server_hostname) else: transport = self._make_socket_transport(sock, protocol, waiter) @@ -767,9 +777,9 @@ class BaseEventLoop(events.AbstractEventLoop): assert isinstance(addr, tuple) and len(addr) == 2, ( '2-tuple is expected') - infos = yield from self.getaddrinfo( - *addr, family=family, type=socket.SOCK_DGRAM, - proto=proto, flags=flags) + infos = yield from _ensure_resolved( + addr, family=family, type=socket.SOCK_DGRAM, + proto=proto, flags=flags, loop=self) if not infos: raise OSError('getaddrinfo() returned empty list') @@ -834,7 +844,7 @@ class BaseEventLoop(events.AbstractEventLoop): raise exceptions[0] protocol = protocol_factory() - waiter = futures.Future(loop=self) + waiter = self.create_future() transport = self._make_datagram_transport( sock, protocol, r_addr, waiter) if self._debug: @@ -857,9 +867,9 @@ class BaseEventLoop(events.AbstractEventLoop): @coroutine def _create_server_getaddrinfo(self, host, port, family, flags): - infos = yield from self.getaddrinfo(host, port, family=family, + infos = yield from _ensure_resolved((host, port), family=family, type=socket.SOCK_STREAM, - flags=flags) + flags=flags, loop=self) if not infos: raise OSError('getaddrinfo({!r}) returned empty list'.format(host)) return infos @@ -880,7 +890,10 @@ class BaseEventLoop(events.AbstractEventLoop): to host and port. The host parameter can also be a sequence of strings and in that case - the TCP server is bound to all hosts of the sequence. + the TCP server is bound to all hosts of the sequence. If a host + appears multiple times (possibly indirectly e.g. when hostnames + resolve to the same IP address), the server is only bound once to that + host. Return a Server object which can be used to stop the service. @@ -909,7 +922,7 @@ class BaseEventLoop(events.AbstractEventLoop): flags=flags) for host in hosts] infos = yield from tasks.gather(*fs, loop=self) - infos = itertools.chain.from_iterable(infos) + infos = set(itertools.chain.from_iterable(infos)) completed = False try: @@ -968,9 +981,28 @@ class BaseEventLoop(events.AbstractEventLoop): return server @coroutine + def connect_accepted_socket(self, protocol_factory, sock, *, ssl=None): + """Handle an accepted connection. + + This is used by servers that accept connections outside of + asyncio but that use asyncio to handle connections. + + This method is a coroutine. When completed, the coroutine + returns a (transport, protocol) pair. + """ + transport, protocol = yield from self._create_connection_transport( + sock, protocol_factory, ssl, '', server_side=True) + if self._debug: + # Get the socket from the transport because SSL transport closes + # the old socket and creates a new SSL socket + sock = transport.get_extra_info('socket') + logger.debug("%r handled: (%r, %r)", sock, transport, protocol) + return transport, protocol + + @coroutine def connect_read_pipe(self, protocol_factory, pipe): protocol = protocol_factory() - waiter = futures.Future(loop=self) + waiter = self.create_future() transport = self._make_read_pipe_transport(pipe, protocol, waiter) try: @@ -987,7 +1019,7 @@ class BaseEventLoop(events.AbstractEventLoop): @coroutine def connect_write_pipe(self, protocol_factory, pipe): protocol = protocol_factory() - waiter = futures.Future(loop=self) + waiter = self.create_future() transport = self._make_write_pipe_transport(pipe, protocol, waiter) try: @@ -1069,6 +1101,11 @@ class BaseEventLoop(events.AbstractEventLoop): logger.info('%s: %r' % (debug_log, transport)) return transport, protocol + def get_exception_handler(self): + """Return an exception handler, or None if the default one is in use. + """ + return self._exception_handler + def set_exception_handler(self, handler): """Set handler as the new event loop exception handler. diff --git a/Lib/asyncio/base_subprocess.py b/Lib/asyncio/base_subprocess.py index 73425d9..8fc253c 100644 --- a/Lib/asyncio/base_subprocess.py +++ b/Lib/asyncio/base_subprocess.py @@ -210,6 +210,10 @@ class BaseSubprocessTransport(transports.SubprocessTransport): logger.info('%r exited with return code %r', self, returncode) self._returncode = returncode + if self._proc.returncode is None: + # asyncio uses a child watcher: copy the status into the Popen + # object. On Python 3.6, it is required to avoid a ResourceWarning. + self._proc.returncode = returncode self._call(self._protocol.process_exited) self._try_finish() @@ -227,7 +231,7 @@ class BaseSubprocessTransport(transports.SubprocessTransport): if self._returncode is not None: return self._returncode - waiter = futures.Future(loop=self._loop) + waiter = self._loop.create_future() self._exit_waiters.append(waiter) return (yield from waiter) diff --git a/Lib/asyncio/compat.py b/Lib/asyncio/compat.py index 660b7e7..4790bb4 100644 --- a/Lib/asyncio/compat.py +++ b/Lib/asyncio/compat.py @@ -4,6 +4,7 @@ import sys PY34 = sys.version_info >= (3, 4) PY35 = sys.version_info >= (3, 5) +PY352 = sys.version_info >= (3, 5, 2) def flatten_list_bytes(list_of_data): diff --git a/Lib/asyncio/coroutines.py b/Lib/asyncio/coroutines.py index 27ab42a..71bc6fb 100644 --- a/Lib/asyncio/coroutines.py +++ b/Lib/asyncio/coroutines.py @@ -204,7 +204,8 @@ def coroutine(func): @functools.wraps(func) def coro(*args, **kw): res = func(*args, **kw) - if isinstance(res, futures.Future) or inspect.isgenerator(res): + if isinstance(res, futures.Future) or inspect.isgenerator(res) or \ + isinstance(res, CoroWrapper): res = yield from res elif _AwaitableABC is not None: # If 'func' returns an Awaitable (new in 3.5) we diff --git a/Lib/asyncio/events.py b/Lib/asyncio/events.py index 176a846..c48c5be 100644 --- a/Lib/asyncio/events.py +++ b/Lib/asyncio/events.py @@ -266,6 +266,9 @@ class AbstractEventLoop: def time(self): raise NotImplementedError + def create_future(self): + raise NotImplementedError + # Method scheduling a coroutine object: create a task. def create_task(self, coro): @@ -484,6 +487,9 @@ class AbstractEventLoop: # Error handlers. + def get_exception_handler(self): + raise NotImplementedError + def set_exception_handler(self, handler): raise NotImplementedError diff --git a/Lib/asyncio/futures.py b/Lib/asyncio/futures.py index 4dcb654..1feba4d 100644 --- a/Lib/asyncio/futures.py +++ b/Lib/asyncio/futures.py @@ -142,7 +142,7 @@ class Future: def __init__(self, *, loop=None): """Initialize the future. - The optional event_loop argument allows to explicitly set the event + The optional event_loop argument allows explicitly setting the event loop object used by the future. If it's not provided, the future uses the default event loop. """ @@ -341,6 +341,9 @@ class Future: raise InvalidStateError('{}: {!r}'.format(self._state, self)) if isinstance(exception, type): exception = exception() + if type(exception) is StopIteration: + raise TypeError("StopIteration interacts badly with generators " + "and cannot be raised into a Future") self._exception = exception self._state = _FINISHED self._schedule_callbacks() @@ -448,6 +451,8 @@ def wrap_future(future, *, loop=None): return future assert isinstance(future, concurrent.futures.Future), \ 'concurrent.futures.Future is expected, got {!r}'.format(future) - new_future = Future(loop=loop) + if loop is None: + loop = events.get_event_loop() + new_future = loop.create_future() _chain_future(future, new_future) return new_future diff --git a/Lib/asyncio/locks.py b/Lib/asyncio/locks.py index 34f6bc1..741aaf2 100644 --- a/Lib/asyncio/locks.py +++ b/Lib/asyncio/locks.py @@ -111,7 +111,7 @@ class Lock(_ContextManagerMixin): acquire() is a coroutine and should be called with 'yield from'. Locks also support the context management protocol. '(yield from lock)' - should be used as context manager expression. + should be used as the context manager expression. Usage: @@ -170,7 +170,7 @@ class Lock(_ContextManagerMixin): self._locked = True return True - fut = futures.Future(loop=self._loop) + fut = self._loop.create_future() self._waiters.append(fut) try: yield from fut @@ -258,7 +258,7 @@ class Event: if self._value: return True - fut = futures.Future(loop=self._loop) + fut = self._loop.create_future() self._waiters.append(fut) try: yield from fut @@ -320,7 +320,7 @@ class Condition(_ContextManagerMixin): self.release() try: - fut = futures.Future(loop=self._loop) + fut = self._loop.create_future() self._waiters.append(fut) try: yield from fut @@ -329,7 +329,13 @@ class Condition(_ContextManagerMixin): self._waiters.remove(fut) finally: - yield from self.acquire() + # Must reacquire lock even if wait is cancelled + while True: + try: + yield from self.acquire() + break + except futures.CancelledError: + pass @coroutine def wait_for(self, predicate): @@ -433,7 +439,7 @@ class Semaphore(_ContextManagerMixin): True. """ while self._value <= 0: - fut = futures.Future(loop=self._loop) + fut = self._loop.create_future() self._waiters.append(fut) try: yield from fut diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py index 14c0659..3ac314c 100644 --- a/Lib/asyncio/proactor_events.py +++ b/Lib/asyncio/proactor_events.py @@ -90,7 +90,7 @@ class _ProactorBasePipeTransport(transports._FlowControlMixin, self.close() def _fatal_error(self, exc, message='Fatal error on pipe transport'): - if isinstance(exc, (BrokenPipeError, ConnectionResetError)): + if isinstance(exc, base_events._FATAL_ERROR_IGNORE): if self._loop.get_debug(): logger.debug("%r: %s", self, message, exc_info=True) else: @@ -440,14 +440,7 @@ class BaseProactorEventLoop(base_events.BaseEventLoop): return self._proactor.send(sock, data) def sock_connect(self, sock, address): - try: - base_events._check_resolved_address(sock, address) - except ValueError as err: - fut = futures.Future(loop=self) - fut.set_exception(err) - return fut - else: - return self._proactor.connect(sock, address) + return self._proactor.connect(sock, address) def sock_accept(self, sock): return self._proactor.accept(sock) diff --git a/Lib/asyncio/queues.py b/Lib/asyncio/queues.py index e3a1d5e..c453f02 100644 --- a/Lib/asyncio/queues.py +++ b/Lib/asyncio/queues.py @@ -128,7 +128,7 @@ class Queue: This method is a coroutine. """ while self.full(): - putter = futures.Future(loop=self._loop) + putter = self._loop.create_future() self._putters.append(putter) try: yield from putter @@ -162,7 +162,7 @@ class Queue: This method is a coroutine. """ while self.empty(): - getter = futures.Future(loop=self._loop) + getter = self._loop.create_future() self._getters.append(getter) try: yield from getter diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py index 5b26631..ed2b4d7 100644 --- a/Lib/asyncio/selector_events.py +++ b/Lib/asyncio/selector_events.py @@ -196,7 +196,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): transport = None try: protocol = protocol_factory() - waiter = futures.Future(loop=self) + waiter = self.create_future() if sslcontext: transport = self._make_ssl_transport( conn, protocol, sslcontext, waiter=waiter, @@ -314,7 +314,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): """ if self._debug and sock.gettimeout() != 0: raise ValueError("the socket must be non-blocking") - fut = futures.Future(loop=self) + fut = self.create_future() self._sock_recv(fut, False, sock, n) return fut @@ -352,7 +352,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): """ if self._debug and sock.gettimeout() != 0: raise ValueError("the socket must be non-blocking") - fut = futures.Future(loop=self) + fut = self.create_future() if data: self._sock_sendall(fut, False, sock, data) else: @@ -385,24 +385,29 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): def sock_connect(self, sock, address): """Connect to a remote socket at address. - The address must be already resolved to avoid the trap of hanging the - entire event loop when the address requires doing a DNS lookup. For - example, it must be an IP address, not an hostname, for AF_INET and - AF_INET6 address families. Use getaddrinfo() to resolve the hostname - asynchronously. - This method is a coroutine. """ if self._debug and sock.gettimeout() != 0: raise ValueError("the socket must be non-blocking") - fut = futures.Future(loop=self) + + fut = self.create_future() + if hasattr(socket, 'AF_UNIX') and sock.family == socket.AF_UNIX: + self._sock_connect(fut, sock, address) + else: + resolved = base_events._ensure_resolved( + address, family=sock.family, proto=sock.proto, loop=self) + resolved.add_done_callback( + lambda resolved: self._on_resolved(fut, sock, resolved)) + + return fut + + def _on_resolved(self, fut, sock, resolved): try: - base_events._check_resolved_address(sock, address) - except ValueError as err: - fut.set_exception(err) + _, _, _, _, address = resolved.result()[0] + except Exception as exc: + fut.set_exception(exc) else: self._sock_connect(fut, sock, address) - return fut def _sock_connect(self, fut, sock, address): fd = sock.fileno() @@ -453,7 +458,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): """ if self._debug and sock.gettimeout() != 0: raise ValueError("the socket must be non-blocking") - fut = futures.Future(loop=self) + fut = self.create_future() self._sock_accept(fut, False, sock) return fut @@ -565,6 +570,7 @@ class _SelectorTransport(transports._FlowControlMixin, self._loop.remove_reader(self._sock_fd) if not self._buffer: self._conn_lost += 1 + self._loop.remove_writer(self._sock_fd) self._loop.call_soon(self._call_connection_lost, None) # On Python 3.3 and older, objects with a destructor part of a reference @@ -578,8 +584,7 @@ class _SelectorTransport(transports._FlowControlMixin, def _fatal_error(self, exc, message='Fatal error on transport'): # Should be called from exception handler only. - if isinstance(exc, (BrokenPipeError, - ConnectionResetError, ConnectionAbortedError)): + if isinstance(exc, base_events._FATAL_ERROR_IGNORE): if self._loop.get_debug(): logger.debug("%r: %s", self, message, exc_info=True) else: @@ -659,6 +664,8 @@ class _SelectorSocketTransport(_SelectorTransport): logger.debug("%r resumes reading", self) def _read_ready(self): + if self._conn_lost: + return try: data = self._sock.recv(self.max_size) except (BlockingIOError, InterruptedError): @@ -682,8 +689,8 @@ class _SelectorSocketTransport(_SelectorTransport): def write(self, data): if not isinstance(data, (bytes, bytearray, memoryview)): - raise TypeError('data argument must be byte-ish (%r)', - type(data)) + raise TypeError('data argument must be a bytes-like object, ' + 'not %r' % type(data).__name__) if self._eof: raise RuntimeError('Cannot call write() after write_eof()') if not data: @@ -718,6 +725,8 @@ class _SelectorSocketTransport(_SelectorTransport): def _write_ready(self): assert self._buffer, 'Data should not be empty' + if self._conn_lost: + return try: n = self._sock.send(self._buffer) except (BlockingIOError, InterruptedError): @@ -888,6 +897,8 @@ class _SelectorSslTransport(_SelectorTransport): logger.debug("%r resumes reading", self) def _read_ready(self): + if self._conn_lost: + return if self._write_wants_read: self._write_wants_read = False self._write_ready() @@ -920,6 +931,8 @@ class _SelectorSslTransport(_SelectorTransport): self.close() def _write_ready(self): + if self._conn_lost: + return if self._read_wants_write: self._read_wants_write = False self._read_ready() @@ -954,8 +967,8 @@ class _SelectorSslTransport(_SelectorTransport): def write(self, data): if not isinstance(data, (bytes, bytearray, memoryview)): - raise TypeError('data argument must be byte-ish (%r)', - type(data)) + raise TypeError('data argument must be a bytes-like object, ' + 'not %r' % type(data).__name__) if not data: return @@ -997,6 +1010,8 @@ class _SelectorDatagramTransport(_SelectorTransport): return sum(len(data) for data, _ in self._buffer) def _read_ready(self): + if self._conn_lost: + return try: data, addr = self._sock.recvfrom(self.max_size) except (BlockingIOError, InterruptedError): @@ -1010,8 +1025,8 @@ class _SelectorDatagramTransport(_SelectorTransport): def sendto(self, data, addr=None): if not isinstance(data, (bytes, bytearray, memoryview)): - raise TypeError('data argument must be byte-ish (%r)', - type(data)) + raise TypeError('data argument must be a bytes-like object, ' + 'not %r' % type(data).__name__) if not data: return diff --git a/Lib/asyncio/sslproto.py b/Lib/asyncio/sslproto.py index dde980b..33d5de2 100644 --- a/Lib/asyncio/sslproto.py +++ b/Lib/asyncio/sslproto.py @@ -5,6 +5,7 @@ try: except ImportError: # pragma: no cover ssl = None +from . import base_events from . import compat from . import protocols from . import transports @@ -603,7 +604,7 @@ class SSLProtocol(protocols.Protocol): self._wakeup_waiter() self._session_established = True # In case transport.write() was already called. Don't call - # immediatly _process_write_backlog(), but schedule it: + # immediately _process_write_backlog(), but schedule it: # _on_handshake_complete() can be called indirectly from # _process_write_backlog(), and _process_write_backlog() is not # reentrant. @@ -655,7 +656,7 @@ class SSLProtocol(protocols.Protocol): def _fatal_error(self, exc, message='Fatal error on transport'): # Should be called from exception handler only. - if isinstance(exc, (BrokenPipeError, ConnectionResetError)): + if isinstance(exc, base_events._FATAL_ERROR_IGNORE): if self._loop.get_debug(): logger.debug("%r: %s", self, message, exc_info=True) else: diff --git a/Lib/asyncio/streams.py b/Lib/asyncio/streams.py index 0008d51..c88a87c 100644 --- a/Lib/asyncio/streams.py +++ b/Lib/asyncio/streams.py @@ -14,13 +14,12 @@ if hasattr(socket, 'AF_UNIX'): from . import coroutines from . import compat from . import events -from . import futures from . import protocols from .coroutines import coroutine from .log import logger -_DEFAULT_LIMIT = 2**16 +_DEFAULT_LIMIT = 2 ** 16 class IncompleteReadError(EOFError): @@ -38,15 +37,13 @@ class IncompleteReadError(EOFError): class LimitOverrunError(Exception): - """Reached buffer limit while looking for the separator. + """Reached the buffer limit while looking for a separator. Attributes: - - message: error message - - consumed: total number of bytes that should be consumed + - consumed: total number of to be consumed bytes. """ def __init__(self, message, consumed): super().__init__(message) - self.message = message self.consumed = consumed @@ -132,7 +129,6 @@ if hasattr(socket, 'AF_UNIX'): writer = StreamWriter(transport, protocol, reader, loop) return reader, writer - @coroutine def start_unix_server(client_connected_cb, path=None, *, loop=None, limit=_DEFAULT_LIMIT, **kwds): @@ -210,7 +206,7 @@ class FlowControlMixin(protocols.Protocol): return waiter = self._drain_waiter assert waiter is None or waiter.cancelled() - waiter = futures.Future(loop=self._loop) + waiter = self._loop.create_future() self._drain_waiter = waiter yield from waiter @@ -229,9 +225,11 @@ class StreamReaderProtocol(FlowControlMixin, protocols.Protocol): self._stream_reader = stream_reader self._stream_writer = None self._client_connected_cb = client_connected_cb + self._over_ssl = False def connection_made(self, transport): self._stream_reader.set_transport(transport) + self._over_ssl = transport.get_extra_info('sslcontext') is not None if self._client_connected_cb is not None: self._stream_writer = StreamWriter(transport, self, self._stream_reader, @@ -242,17 +240,25 @@ class StreamReaderProtocol(FlowControlMixin, protocols.Protocol): self._loop.create_task(res) def connection_lost(self, exc): - if exc is None: - self._stream_reader.feed_eof() - else: - self._stream_reader.set_exception(exc) + if self._stream_reader is not None: + if exc is None: + self._stream_reader.feed_eof() + else: + self._stream_reader.set_exception(exc) super().connection_lost(exc) + self._stream_reader = None + self._stream_writer = None def data_received(self, data): self._stream_reader.feed_data(data) def eof_received(self): self._stream_reader.feed_eof() + if self._over_ssl: + # Prevent a warning in SSLProtocol.eof_received: + # "returning true from eof_received() + # has no effect when using ssl" + return False return True @@ -413,8 +419,8 @@ class StreamReader: self._wakeup_waiter() if (self._transport is not None and - not self._paused and - len(self._buffer) > 2*self._limit): + not self._paused and + len(self._buffer) > 2 * self._limit): try: self._transport.pause_reading() except NotImplementedError: @@ -446,7 +452,7 @@ class StreamReader: self._paused = False self._transport.resume_reading() - self._waiter = futures.Future(loop=self._loop) + self._waiter = self._loop.create_future() try: yield from self._waiter finally: @@ -486,24 +492,24 @@ class StreamReader: @coroutine def readuntil(self, separator=b'\n'): - """Read chunk of data from the stream until `separator` is found. - - On success, chunk and its separator will be removed from internal buffer - (i.e. consumed). Returned chunk will include separator at the end. + """Read data from the stream until ``separator`` is found. - Configured stream limit is used to check result. Limit means maximal - length of chunk that can be returned, not counting the separator. + On success, the data and separator will be removed from the + internal buffer (consumed). Returned data will include the + separator at the end. - If EOF occurs and complete separator still not found, - IncompleteReadError(<partial data>, None) will be raised and internal - buffer becomes empty. This partial data may contain a partial separator. + Configured stream limit is used to check result. Limit sets the + maximal length of data that can be returned, not counting the + separator. - If chunk cannot be read due to overlimit, LimitOverrunError will be raised - and data will be left in internal buffer, so it can be read again, in - some different way. + If an EOF occurs and the complete separator is still not found, + an IncompleteReadError exception will be raised, and the internal + buffer will be reset. The IncompleteReadError.partial attribute + may contain the separator partially. - If stream was paused, this function will automatically resume it if - needed. + If the data cannot be read because of over limit, a + LimitOverrunError exception will be raised, and the data + will be left in the internal buffer, so it can be read again. """ seplen = len(separator) if seplen == 0: @@ -529,8 +535,8 @@ class StreamReader: # performance problems. Even when reading MIME-encoded # messages :) - # `offset` is the number of bytes from the beginning of the buffer where - # is no occurrence of `separator`. + # `offset` is the number of bytes from the beginning of the buffer + # where there is no occurrence of `separator`. offset = 0 # Loop until we find `separator` in the buffer, exceed the buffer size, @@ -544,14 +550,16 @@ class StreamReader: isep = self._buffer.find(separator, offset) if isep != -1: - # `separator` is in the buffer. `isep` will be used later to - # retrieve the data. + # `separator` is in the buffer. `isep` will be used later + # to retrieve the data. break # see upper comment for explanation. offset = buflen + 1 - seplen if offset > self._limit: - raise LimitOverrunError('Separator is not found, and chunk exceed the limit', offset) + raise LimitOverrunError( + 'Separator is not found, and chunk exceed the limit', + offset) # Complete message (with full separator) may be present in buffer # even when EOF flag is set. This may happen when the last chunk @@ -566,7 +574,8 @@ class StreamReader: yield from self._wait_for_data('readuntil') if isep > self._limit: - raise LimitOverrunError('Separator is found, but chunk is longer than limit', isep) + raise LimitOverrunError( + 'Separator is found, but chunk is longer than limit', isep) chunk = self._buffer[:isep + seplen] del self._buffer[:isep + seplen] @@ -588,7 +597,8 @@ class StreamReader: received before any byte is read, this function returns empty byte object. - Returned value is not limited with limit, configured at stream creation. + Returned value is not limited with limit, configured at stream + creation. If stream was paused, this function will automatically resume it if needed. @@ -627,13 +637,14 @@ class StreamReader: def readexactly(self, n): """Read exactly `n` bytes. - Raise an `IncompleteReadError` if EOF is reached before `n` bytes can be - read. The `IncompleteReadError.partial` attribute of the exception will + Raise an IncompleteReadError if EOF is reached before `n` bytes can be + read. The IncompleteReadError.partial attribute of the exception will contain the partial read bytes. if n is zero, return empty bytes object. - Returned value is not limited with limit, configured at stream creation. + Returned value is not limited with limit, configured at stream + creation. If stream was paused, this function will automatically resume it if needed. @@ -678,3 +689,9 @@ class StreamReader: if val == b'': raise StopAsyncIteration return val + + if compat.PY352: + # In Python 3.5.2 and greater, __aiter__ should return + # the asynchronous iterator directly. + def __aiter__(self): + return self diff --git a/Lib/asyncio/subprocess.py b/Lib/asyncio/subprocess.py index ead4039..b2f5304 100644 --- a/Lib/asyncio/subprocess.py +++ b/Lib/asyncio/subprocess.py @@ -166,7 +166,7 @@ class Process: @coroutine def communicate(self, input=None): - if input: + if input is not None: stdin = self._feed_stdin(input) else: stdin = self._noop() diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py index c37aa41..0cca8e3 100644 --- a/Lib/asyncio/tasks.py +++ b/Lib/asyncio/tasks.py @@ -4,7 +4,6 @@ __all__ = ['Task', 'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED', 'wait', 'wait_for', 'as_completed', 'sleep', 'async', 'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe', - 'timeout', ] import concurrent.futures @@ -373,7 +372,7 @@ def wait_for(fut, timeout, *, loop=None): if timeout is None: return (yield from fut) - waiter = futures.Future(loop=loop) + waiter = loop.create_future() timeout_handle = loop.call_later(timeout, _release_waiter, waiter) cb = functools.partial(_release_waiter, waiter) @@ -401,12 +400,12 @@ def wait_for(fut, timeout, *, loop=None): @coroutine def _wait(fs, timeout, return_when, loop): - """Internal helper for wait() and _wait_for(). + """Internal helper for wait() and wait_for(). The fs argument must be a collection of Futures. """ assert fs, 'Set of Futures is empty.' - waiter = futures.Future(loop=loop) + waiter = loop.create_future() timeout_handle = None if timeout is not None: timeout_handle = loop.call_later(timeout, _release_waiter, waiter) @@ -507,7 +506,9 @@ def sleep(delay, result=None, *, loop=None): yield return result - future = futures.Future(loop=loop) + if loop is None: + loop = events.get_event_loop() + future = loop.create_future() h = future._loop.call_later(delay, futures._set_result_unless_cancelled, future, result) @@ -604,7 +605,9 @@ def gather(*coros_or_futures, loop=None, return_exceptions=False): be cancelled.) """ if not coros_or_futures: - outer = futures.Future(loop=loop) + if loop is None: + loop = events.get_event_loop() + outer = loop.create_future() outer.set_result([]) return outer @@ -692,7 +695,7 @@ def shield(arg, *, loop=None): # Shortcut. return inner loop = inner._loop - outer = futures.Future(loop=loop) + outer = loop.create_future() def _done_callback(inner): if outer.cancelled(): @@ -733,53 +736,3 @@ def run_coroutine_threadsafe(coro, loop): loop.call_soon_threadsafe(callback) return future - - -def timeout(timeout, *, loop=None): - """A factory which produce a context manager with timeout. - - Useful in cases when you want to apply timeout logic around block - of code or in cases when asyncio.wait_for is not suitable. - - For example: - - >>> with asyncio.timeout(0.001): - ... yield from coro() - - - timeout: timeout value in seconds - loop: asyncio compatible event loop - """ - if loop is None: - loop = events.get_event_loop() - return _Timeout(timeout, loop=loop) - - -class _Timeout: - def __init__(self, timeout, *, loop): - self._timeout = timeout - self._loop = loop - self._task = None - self._cancelled = False - self._cancel_handler = None - - def __enter__(self): - self._task = Task.current_task(loop=self._loop) - if self._task is None: - raise RuntimeError('Timeout context manager should be used ' - 'inside a task') - self._cancel_handler = self._loop.call_later( - self._timeout, self._cancel_task) - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - if exc_type is futures.CancelledError and self._cancelled: - self._cancel_handler = None - self._task = None - raise futures.TimeoutError - self._cancel_handler.cancel() - self._cancel_handler = None - self._task = None - - def _cancel_task(self): - self._cancelled = self._task.cancel() diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py index 7747ff4..d712749 100644 --- a/Lib/asyncio/unix_events.py +++ b/Lib/asyncio/unix_events.py @@ -177,7 +177,7 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop): stdin, stdout, stderr, bufsize, extra=None, **kwargs): with events.get_child_watcher() as watcher: - waiter = futures.Future(loop=self) + waiter = self.create_future() transp = _UnixSubprocessTransport(self, protocol, args, shell, stdin, stdout, stderr, bufsize, waiter=waiter, extra=extra, @@ -329,14 +329,17 @@ class _UnixReadPipeTransport(transports.ReadTransport): elif self._closing: info.append('closing') info.append('fd=%s' % self._fileno) - if self._pipe is not None: + selector = getattr(self._loop, '_selector', None) + if self._pipe is not None and selector is not None: polling = selector_events._test_selector_event( - self._loop._selector, + selector, self._fileno, selectors.EVENT_READ) if polling: info.append('polling') else: info.append('idle') + elif self._pipe is not None: + info.append('open') else: info.append('closed') return '<%s>' % ' '.join(info) @@ -453,9 +456,10 @@ class _UnixWritePipeTransport(transports._FlowControlMixin, elif self._closing: info.append('closing') info.append('fd=%s' % self._fileno) - if self._pipe is not None: + selector = getattr(self._loop, '_selector', None) + if self._pipe is not None and selector is not None: polling = selector_events._test_selector_event( - self._loop._selector, + selector, self._fileno, selectors.EVENT_WRITE) if polling: info.append('polling') @@ -464,6 +468,8 @@ class _UnixWritePipeTransport(transports._FlowControlMixin, bufsize = self.get_write_buffer_size() info.append('bufsize=%s' % bufsize) + elif self._pipe is not None: + info.append('open') else: info.append('closed') return '<%s>' % ' '.join(info) @@ -575,7 +581,7 @@ class _UnixWritePipeTransport(transports._FlowControlMixin, def _fatal_error(self, exc, message='Fatal error on pipe transport'): # should be called by exception handler only - if isinstance(exc, (BrokenPipeError, ConnectionResetError)): + if isinstance(exc, base_events._FATAL_ERROR_IGNORE): if self._loop.get_debug(): logger.debug("%r: %s", self, message, exc_info=True) else: diff --git a/Lib/asyncio/windows_events.py b/Lib/asyncio/windows_events.py index 922594f..668fe14 100644 --- a/Lib/asyncio/windows_events.py +++ b/Lib/asyncio/windows_events.py @@ -197,7 +197,7 @@ class _WaitHandleFuture(_BaseWaitHandleFuture): # # If the IocpProactor already received the event, it's safe to call # _unregister() because we kept a reference to the Overlapped object - # which is used as an unique key. + # which is used as a unique key. self._proactor._unregister(self._ov) self._proactor = None @@ -366,7 +366,7 @@ class ProactorEventLoop(proactor_events.BaseProactorEventLoop): def _make_subprocess_transport(self, protocol, args, shell, stdin, stdout, stderr, bufsize, extra=None, **kwargs): - waiter = futures.Future(loop=self) + waiter = self.create_future() transp = _WindowsSubprocessTransport(self, protocol, args, shell, stdin, stdout, stderr, bufsize, waiter=waiter, extra=extra, @@ -417,7 +417,7 @@ class IocpProactor: return tmp def _result(self, value): - fut = futures.Future(loop=self._loop) + fut = self._loop.create_future() fut.set_result(value) return fut |
