Diffstat (limited to 'Lib/asyncio')
-rw-r--r--  Lib/asyncio/base_events.py      195
-rw-r--r--  Lib/asyncio/base_subprocess.py    6
-rw-r--r--  Lib/asyncio/compat.py             1
-rw-r--r--  Lib/asyncio/coroutines.py         3
-rw-r--r--  Lib/asyncio/events.py             6
-rw-r--r--  Lib/asyncio/futures.py            9
-rw-r--r--  Lib/asyncio/locks.py             18
-rw-r--r--  Lib/asyncio/proactor_events.py   11
-rw-r--r--  Lib/asyncio/queues.py             4
-rw-r--r--  Lib/asyncio/selector_events.py   61
-rw-r--r--  Lib/asyncio/sslproto.py           5
-rw-r--r--  Lib/asyncio/streams.py           95
-rw-r--r--  Lib/asyncio/subprocess.py         2
-rw-r--r--  Lib/asyncio/tasks.py             67
-rw-r--r--  Lib/asyncio/unix_events.py       18
-rw-r--r--  Lib/asyncio/windows_events.py     6
16 files changed, 276 insertions(+), 231 deletions(-)
diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py
index 4505732..0174375 100644
--- a/Lib/asyncio/base_events.py
+++ b/Lib/asyncio/base_events.py
@@ -16,10 +16,8 @@ to modify the meaning of the API call itself.
import collections
import concurrent.futures
-import functools
import heapq
import inspect
-import ipaddress
import itertools
import logging
import os
@@ -54,6 +52,12 @@ _MIN_SCHEDULED_TIMER_HANDLES = 100
# before cleanup of cancelled handles is performed.
_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5
+# Exceptions which must not call the exception handler in fatal error
+# methods (_fatal_error())
+_FATAL_ERROR_IGNORE = (BrokenPipeError,
+ ConnectionResetError, ConnectionAbortedError)
+
+
def _format_handle(handle):
cb = handle._callback
if inspect.ismethod(cb) and isinstance(cb.__self__, tasks.Task):
@@ -80,12 +84,14 @@ if hasattr(socket, 'SOCK_CLOEXEC'):
_SOCKET_TYPE_MASK |= socket.SOCK_CLOEXEC
-@functools.lru_cache(maxsize=1024)
def _ipaddr_info(host, port, family, type, proto):
- # Try to skip getaddrinfo if "host" is already an IP. Since getaddrinfo
- # blocks on an exclusive lock on some platforms, users might handle name
- # resolution in their own code and pass in resolved IPs.
- if proto not in {0, socket.IPPROTO_TCP, socket.IPPROTO_UDP} or host is None:
+ # Try to skip getaddrinfo if "host" is already an IP. Users might have
+ # handled name resolution in their own code and pass in resolved IPs.
+ if not hasattr(socket, 'inet_pton'):
+ return
+
+ if proto not in {0, socket.IPPROTO_TCP, socket.IPPROTO_UDP} or \
+ host is None:
return None
type &= ~_SOCKET_TYPE_MASK
@@ -96,59 +102,63 @@ def _ipaddr_info(host, port, family, type, proto):
else:
return None
- if hasattr(socket, 'inet_pton'):
- if family == socket.AF_UNSPEC:
- afs = [socket.AF_INET, socket.AF_INET6]
+ if port is None:
+ port = 0
+ elif isinstance(port, bytes):
+ if port == b'':
+ port = 0
else:
- afs = [family]
-
- for af in afs:
- # Linux's inet_pton doesn't accept an IPv6 zone index after host,
- # like '::1%lo0', so strip it. If we happen to make an invalid
- # address look valid, we fail later in sock.connect or sock.bind.
try:
- if af == socket.AF_INET6:
- socket.inet_pton(af, host.partition('%')[0])
- else:
- socket.inet_pton(af, host)
- return af, type, proto, '', (host, port)
- except OSError:
- pass
-
- # "host" is not an IP address.
- return None
+ port = int(port)
+ except ValueError:
+ # Might be a service name like b"http".
+ port = socket.getservbyname(port.decode('ascii'))
+ elif isinstance(port, str):
+ if port == '':
+ port = 0
+ else:
+ try:
+ port = int(port)
+ except ValueError:
+ # Might be a service name like "http".
+ port = socket.getservbyname(port)
- # No inet_pton. (On Windows it's only available since Python 3.4.)
- # Even though getaddrinfo with AI_NUMERICHOST would be non-blocking, it
- # still requires a lock on some platforms, and waiting for that lock could
- # block the event loop. Use ipaddress instead, it's just text parsing.
- try:
- addr = ipaddress.IPv4Address(host)
- except ValueError:
- try:
- addr = ipaddress.IPv6Address(host.partition('%')[0])
- except ValueError:
- return None
+ if family == socket.AF_UNSPEC:
+ afs = [socket.AF_INET, socket.AF_INET6]
+ else:
+ afs = [family]
- af = socket.AF_INET if addr.version == 4 else socket.AF_INET6
- if family not in (socket.AF_UNSPEC, af):
- # "host" is wrong IP version for "family".
+ if isinstance(host, bytes):
+ host = host.decode('idna')
+ if '%' in host:
+ # Linux's inet_pton doesn't accept an IPv6 zone index after host,
+ # like '::1%lo0'.
return None
- return af, type, proto, '', (host, port)
+ for af in afs:
+ try:
+ socket.inet_pton(af, host)
+ # The host has already been resolved.
+ return af, type, proto, '', (host, port)
+ except OSError:
+ pass
+ # "host" is not an IP address.
+ return None
-def _check_resolved_address(sock, address):
- # Ensure that the address is already resolved to avoid the trap of hanging
- # the entire event loop when the address requires doing a DNS lookup.
-
- if hasattr(socket, 'AF_UNIX') and sock.family == socket.AF_UNIX:
- return
+def _ensure_resolved(address, *, family=0, type=socket.SOCK_STREAM, proto=0,
+ flags=0, loop):
host, port = address[:2]
- if _ipaddr_info(host, port, sock.family, sock.type, sock.proto) is None:
- raise ValueError("address must be resolved (IP address),"
- " got host %r" % host)
+ info = _ipaddr_info(host, port, family, type, proto)
+ if info is not None:
+ # "host" is already a resolved IP.
+ fut = loop.create_future()
+ fut.set_result([info])
+ return fut
+ else:
+ return loop.getaddrinfo(host, port, family=family, type=type,
+ proto=proto, flags=flags)
def _run_until_complete_cb(fut):
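The rewritten _ipaddr_info() above relies on socket.inet_pton() alone: it is pure text parsing, so unlike getaddrinfo() it can never block the event loop on a resolver lock. A minimal sketch of that fast path; the helper name looks_like_ip is illustrative, not part of the patch:

    import socket

    def looks_like_ip(host, family=socket.AF_UNSPEC):
        # inet_pton() only parses text; it cannot block the event loop
        # the way a getaddrinfo() call might.
        afs = ([socket.AF_INET, socket.AF_INET6]
               if family == socket.AF_UNSPEC else [family])
        for af in afs:
            try:
                socket.inet_pton(af, host)
                return af
            except OSError:
                pass
        return None  # not a literal IP: fall back to getaddrinfo()

    print(looks_like_ip('127.0.0.1'))    # AddressFamily.AF_INET
    print(looks_like_ip('example.com'))  # None
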
@@ -203,7 +213,7 @@ class Server(events.AbstractServer):
def wait_closed(self):
if self.sockets is None or self._waiters is None:
return
- waiter = futures.Future(loop=self._loop)
+ waiter = self._loop.create_future()
self._waiters.append(waiter)
yield from waiter
@@ -237,6 +247,10 @@ class BaseEventLoop(events.AbstractEventLoop):
% (self.__class__.__name__, self.is_running(),
self.is_closed(), self.get_debug()))
+ def create_future(self):
+ """Create a Future object attached to the loop."""
+ return futures.Future(loop=self)
+
def create_task(self, coro):
"""Schedule a coroutine object.
@@ -530,7 +544,7 @@ class BaseEventLoop(events.AbstractEventLoop):
assert not args
assert not isinstance(func, events.TimerHandle)
if func._cancelled:
- f = futures.Future(loop=self)
+ f = self.create_future()
f.set_result(None)
return f
func, args = func._callback, func._args
@@ -571,12 +585,7 @@ class BaseEventLoop(events.AbstractEventLoop):
def getaddrinfo(self, host, port, *,
family=0, type=0, proto=0, flags=0):
- info = _ipaddr_info(host, port, family, type, proto)
- if info is not None:
- fut = futures.Future(loop=self)
- fut.set_result([info])
- return fut
- elif self._debug:
+ if self._debug:
return self.run_in_executor(None, self._getaddrinfo_debug,
host, port, family, type, proto, flags)
else:
@@ -625,14 +634,14 @@ class BaseEventLoop(events.AbstractEventLoop):
raise ValueError(
'host/port and sock can not be specified at the same time')
- f1 = self.getaddrinfo(
- host, port, family=family,
- type=socket.SOCK_STREAM, proto=proto, flags=flags)
+ f1 = _ensure_resolved((host, port), family=family,
+ type=socket.SOCK_STREAM, proto=proto,
+ flags=flags, loop=self)
fs = [f1]
if local_addr is not None:
- f2 = self.getaddrinfo(
- *local_addr, family=family,
- type=socket.SOCK_STREAM, proto=proto, flags=flags)
+ f2 = _ensure_resolved(local_addr, family=family,
+ type=socket.SOCK_STREAM, proto=proto,
+ flags=flags, loop=self)
fs.append(f2)
else:
f2 = None
@@ -698,8 +707,6 @@ class BaseEventLoop(events.AbstractEventLoop):
raise ValueError(
'host and port was not specified and no sock specified')
- sock.setblocking(False)
-
transport, protocol = yield from self._create_connection_transport(
sock, protocol_factory, ssl, server_hostname)
if self._debug:
@@ -712,14 +719,17 @@ class BaseEventLoop(events.AbstractEventLoop):
@coroutine
def _create_connection_transport(self, sock, protocol_factory, ssl,
- server_hostname):
+ server_hostname, server_side=False):
+
+ sock.setblocking(False)
+
protocol = protocol_factory()
- waiter = futures.Future(loop=self)
+ waiter = self.create_future()
if ssl:
sslcontext = None if isinstance(ssl, bool) else ssl
transport = self._make_ssl_transport(
sock, protocol, sslcontext, waiter,
- server_side=False, server_hostname=server_hostname)
+ server_side=server_side, server_hostname=server_hostname)
else:
transport = self._make_socket_transport(sock, protocol, waiter)
@@ -767,9 +777,9 @@ class BaseEventLoop(events.AbstractEventLoop):
assert isinstance(addr, tuple) and len(addr) == 2, (
'2-tuple is expected')
- infos = yield from self.getaddrinfo(
- *addr, family=family, type=socket.SOCK_DGRAM,
- proto=proto, flags=flags)
+ infos = yield from _ensure_resolved(
+ addr, family=family, type=socket.SOCK_DGRAM,
+ proto=proto, flags=flags, loop=self)
if not infos:
raise OSError('getaddrinfo() returned empty list')
@@ -834,7 +844,7 @@ class BaseEventLoop(events.AbstractEventLoop):
raise exceptions[0]
protocol = protocol_factory()
- waiter = futures.Future(loop=self)
+ waiter = self.create_future()
transport = self._make_datagram_transport(
sock, protocol, r_addr, waiter)
if self._debug:
@@ -857,9 +867,9 @@ class BaseEventLoop(events.AbstractEventLoop):
@coroutine
def _create_server_getaddrinfo(self, host, port, family, flags):
- infos = yield from self.getaddrinfo(host, port, family=family,
+ infos = yield from _ensure_resolved((host, port), family=family,
type=socket.SOCK_STREAM,
- flags=flags)
+ flags=flags, loop=self)
if not infos:
raise OSError('getaddrinfo({!r}) returned empty list'.format(host))
return infos
@@ -880,7 +890,10 @@ class BaseEventLoop(events.AbstractEventLoop):
to host and port.
The host parameter can also be a sequence of strings and in that case
- the TCP server is bound to all hosts of the sequence.
+ the TCP server is bound to all hosts of the sequence. If a host
+ appears multiple times (possibly indirectly e.g. when hostnames
+ resolve to the same IP address), the server is only bound once to that
+ host.
Return a Server object which can be used to stop the service.
@@ -909,7 +922,7 @@ class BaseEventLoop(events.AbstractEventLoop):
flags=flags)
for host in hosts]
infos = yield from tasks.gather(*fs, loop=self)
- infos = itertools.chain.from_iterable(infos)
+ infos = set(itertools.chain.from_iterable(infos))
completed = False
try:
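The switch to a set above deduplicates getaddrinfo() results before binding, which is what makes the new docstring guarantee hold when several hostnames resolve to one address. Roughly, with illustrative values:

    import itertools

    # Two hosts that resolve to the same address:
    infos_per_host = [
        [(2, 1, 6, '', ('127.0.0.1', 80))],  # from 'localhost'
        [(2, 1, 6, '', ('127.0.0.1', 80))],  # from '127.0.0.1'
    ]
    infos = set(itertools.chain.from_iterable(infos_per_host))
    print(infos)  # one entry, so the server binds that address once
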
@@ -968,9 +981,28 @@ class BaseEventLoop(events.AbstractEventLoop):
return server
@coroutine
+ def connect_accepted_socket(self, protocol_factory, sock, *, ssl=None):
+ """Handle an accepted connection.
+
+ This is used by servers that accept connections outside of
+ asyncio but that use asyncio to handle connections.
+
+ This method is a coroutine. When completed, the coroutine
+ returns a (transport, protocol) pair.
+ """
+ transport, protocol = yield from self._create_connection_transport(
+ sock, protocol_factory, ssl, '', server_side=True)
+ if self._debug:
+ # Get the socket from the transport because SSL transport closes
+ # the old socket and creates a new SSL socket
+ sock = transport.get_extra_info('socket')
+ logger.debug("%r handled: (%r, %r)", sock, transport, protocol)
+ return transport, protocol
+
+ @coroutine
def connect_read_pipe(self, protocol_factory, pipe):
protocol = protocol_factory()
- waiter = futures.Future(loop=self)
+ waiter = self.create_future()
transport = self._make_read_pipe_transport(pipe, protocol, waiter)
try:
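connect_accepted_socket() above is the new public entry point for sockets accepted outside of asyncio's Server machinery (for example, from an inherited listening socket). A hedged sketch of how a caller might use it; Echo and accept_loop are illustrative names, not part of the patch:

    import asyncio

    class Echo(asyncio.Protocol):
        def connection_made(self, transport):
            self.transport = transport

        def data_received(self, data):
            self.transport.write(data)

    @asyncio.coroutine
    def accept_loop(loop, server_sock):
        # server_sock: a non-blocking, already-listening socket.
        while True:
            conn, _ = yield from loop.sock_accept(server_sock)
            # Hand the accepted socket to asyncio, which wraps it in a
            # (transport, protocol) pair (server_side=True internally).
            yield from loop.connect_accepted_socket(Echo, conn)
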
@@ -987,7 +1019,7 @@ class BaseEventLoop(events.AbstractEventLoop):
@coroutine
def connect_write_pipe(self, protocol_factory, pipe):
protocol = protocol_factory()
- waiter = futures.Future(loop=self)
+ waiter = self.create_future()
transport = self._make_write_pipe_transport(pipe, protocol, waiter)
try:
@@ -1069,6 +1101,11 @@ class BaseEventLoop(events.AbstractEventLoop):
logger.info('%s: %r' % (debug_log, transport))
return transport, protocol
+ def get_exception_handler(self):
+ """Return an exception handler, or None if the default one is in use.
+ """
+ return self._exception_handler
+
def set_exception_handler(self, handler):
"""Set handler as the new event loop exception handler.
diff --git a/Lib/asyncio/base_subprocess.py b/Lib/asyncio/base_subprocess.py
index 73425d9..8fc253c 100644
--- a/Lib/asyncio/base_subprocess.py
+++ b/Lib/asyncio/base_subprocess.py
@@ -210,6 +210,10 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
logger.info('%r exited with return code %r',
self, returncode)
self._returncode = returncode
+ if self._proc.returncode is None:
+ # asyncio uses a child watcher: copy the status into the Popen
+ # object. On Python 3.6, it is required to avoid a ResourceWarning.
+ self._proc.returncode = returncode
self._call(self._protocol.process_exited)
self._try_finish()
@@ -227,7 +231,7 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
if self._returncode is not None:
return self._returncode
- waiter = futures.Future(loop=self._loop)
+ waiter = self._loop.create_future()
self._exit_waiters.append(waiter)
return (yield from waiter)
diff --git a/Lib/asyncio/compat.py b/Lib/asyncio/compat.py
index 660b7e7..4790bb4 100644
--- a/Lib/asyncio/compat.py
+++ b/Lib/asyncio/compat.py
@@ -4,6 +4,7 @@ import sys
PY34 = sys.version_info >= (3, 4)
PY35 = sys.version_info >= (3, 5)
+PY352 = sys.version_info >= (3, 5, 2)
def flatten_list_bytes(list_of_data):
diff --git a/Lib/asyncio/coroutines.py b/Lib/asyncio/coroutines.py
index 27ab42a..71bc6fb 100644
--- a/Lib/asyncio/coroutines.py
+++ b/Lib/asyncio/coroutines.py
@@ -204,7 +204,8 @@ def coroutine(func):
@functools.wraps(func)
def coro(*args, **kw):
res = func(*args, **kw)
- if isinstance(res, futures.Future) or inspect.isgenerator(res):
+ if isinstance(res, futures.Future) or inspect.isgenerator(res) or \
+ isinstance(res, CoroWrapper):
res = yield from res
elif _AwaitableABC is not None:
# If 'func' returns an Awaitable (new in 3.5) we
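The added CoroWrapper check matters in debug mode, where calling a decorated coroutine returns a CoroWrapper rather than a plain generator; without it, coro() would hand the wrapper back unawaited. For instance, assuming the fix, this prints 42 in debug mode too:

    import asyncio

    @asyncio.coroutine
    def inner():
        yield from asyncio.sleep(0)
        return 42

    @asyncio.coroutine
    def outer():
        # Returning (not yielding from) another decorated coroutine;
        # in debug mode inner() is a CoroWrapper.
        return inner()

    loop = asyncio.get_event_loop()
    loop.set_debug(True)
    print(loop.run_until_complete(outer()))  # 42
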
diff --git a/Lib/asyncio/events.py b/Lib/asyncio/events.py
index 176a846..c48c5be 100644
--- a/Lib/asyncio/events.py
+++ b/Lib/asyncio/events.py
@@ -266,6 +266,9 @@ class AbstractEventLoop:
def time(self):
raise NotImplementedError
+ def create_future(self):
+ raise NotImplementedError
+
# Method scheduling a coroutine object: create a task.
def create_task(self, coro):
@@ -484,6 +487,9 @@ class AbstractEventLoop:
# Error handlers.
+ def get_exception_handler(self):
+ raise NotImplementedError
+
def set_exception_handler(self, handler):
raise NotImplementedError
diff --git a/Lib/asyncio/futures.py b/Lib/asyncio/futures.py
index 4dcb654..1feba4d 100644
--- a/Lib/asyncio/futures.py
+++ b/Lib/asyncio/futures.py
@@ -142,7 +142,7 @@ class Future:
def __init__(self, *, loop=None):
"""Initialize the future.
- The optional event_loop argument allows to explicitly set the event
+ The optional event_loop argument allows explicitly setting the event
loop object used by the future. If it's not provided, the future uses
the default event loop.
"""
@@ -341,6 +341,9 @@ class Future:
raise InvalidStateError('{}: {!r}'.format(self._state, self))
if isinstance(exception, type):
exception = exception()
+ if type(exception) is StopIteration:
+ raise TypeError("StopIteration interacts badly with generators "
+ "and cannot be raised into a Future")
self._exception = exception
self._state = _FINISHED
self._schedule_callbacks()
@@ -448,6 +451,8 @@ def wrap_future(future, *, loop=None):
return future
assert isinstance(future, concurrent.futures.Future), \
'concurrent.futures.Future is expected, got {!r}'.format(future)
- new_future = Future(loop=loop)
+ if loop is None:
+ loop = events.get_event_loop()
+ new_future = loop.create_future()
_chain_future(future, new_future)
return new_future
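The StopIteration guard above prevents a confusing failure mode: a StopIteration set on a Future would be re-raised inside the yield from/await machinery and silently terminate the consuming generator. With the patch, the mistake fails loudly at the call site:

    import asyncio

    loop = asyncio.get_event_loop()
    fut = loop.create_future()
    try:
        fut.set_exception(StopIteration())
    except TypeError as exc:
        print(exc)
    # StopIteration interacts badly with generators and cannot be
    # raised into a Future
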
diff --git a/Lib/asyncio/locks.py b/Lib/asyncio/locks.py
index 34f6bc1..741aaf2 100644
--- a/Lib/asyncio/locks.py
+++ b/Lib/asyncio/locks.py
@@ -111,7 +111,7 @@ class Lock(_ContextManagerMixin):
acquire() is a coroutine and should be called with 'yield from'.
Locks also support the context management protocol. '(yield from lock)'
- should be used as context manager expression.
+ should be used as the context manager expression.
Usage:
@@ -170,7 +170,7 @@ class Lock(_ContextManagerMixin):
self._locked = True
return True
- fut = futures.Future(loop=self._loop)
+ fut = self._loop.create_future()
self._waiters.append(fut)
try:
yield from fut
@@ -258,7 +258,7 @@ class Event:
if self._value:
return True
- fut = futures.Future(loop=self._loop)
+ fut = self._loop.create_future()
self._waiters.append(fut)
try:
yield from fut
@@ -320,7 +320,7 @@ class Condition(_ContextManagerMixin):
self.release()
try:
- fut = futures.Future(loop=self._loop)
+ fut = self._loop.create_future()
self._waiters.append(fut)
try:
yield from fut
@@ -329,7 +329,13 @@ class Condition(_ContextManagerMixin):
self._waiters.remove(fut)
finally:
- yield from self.acquire()
+ # Must reacquire lock even if wait is cancelled
+ while True:
+ try:
+ yield from self.acquire()
+ break
+ except futures.CancelledError:
+ pass
@coroutine
def wait_for(self, predicate):
@@ -433,7 +439,7 @@ class Semaphore(_ContextManagerMixin):
True.
"""
while self._value <= 0:
- fut = futures.Future(loop=self._loop)
+ fut = self._loop.create_future()
self._waiters.append(fut)
try:
yield from fut
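The Condition.wait() change guarantees the lock is reacquired even when the wait itself is cancelled; otherwise the context manager's release on exit would fail with RuntimeError. A sketch of the pattern it protects (illustrative, not from the patch):

    import asyncio

    @asyncio.coroutine
    def consumer(cond):
        with (yield from cond):
            # If this task is cancelled while waiting, the fixed wait()
            # still reacquires the lock before CancelledError propagates,
            # so the 'with' exit can release it safely.
            yield from cond.wait()
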
diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py
index 14c0659..3ac314c 100644
--- a/Lib/asyncio/proactor_events.py
+++ b/Lib/asyncio/proactor_events.py
@@ -90,7 +90,7 @@ class _ProactorBasePipeTransport(transports._FlowControlMixin,
self.close()
def _fatal_error(self, exc, message='Fatal error on pipe transport'):
- if isinstance(exc, (BrokenPipeError, ConnectionResetError)):
+ if isinstance(exc, base_events._FATAL_ERROR_IGNORE):
if self._loop.get_debug():
logger.debug("%r: %s", self, message, exc_info=True)
else:
@@ -440,14 +440,7 @@ class BaseProactorEventLoop(base_events.BaseEventLoop):
return self._proactor.send(sock, data)
def sock_connect(self, sock, address):
- try:
- base_events._check_resolved_address(sock, address)
- except ValueError as err:
- fut = futures.Future(loop=self)
- fut.set_exception(err)
- return fut
- else:
- return self._proactor.connect(sock, address)
+ return self._proactor.connect(sock, address)
def sock_accept(self, sock):
return self._proactor.accept(sock)
diff --git a/Lib/asyncio/queues.py b/Lib/asyncio/queues.py
index e3a1d5e..c453f02 100644
--- a/Lib/asyncio/queues.py
+++ b/Lib/asyncio/queues.py
@@ -128,7 +128,7 @@ class Queue:
This method is a coroutine.
"""
while self.full():
- putter = futures.Future(loop=self._loop)
+ putter = self._loop.create_future()
self._putters.append(putter)
try:
yield from putter
@@ -162,7 +162,7 @@ class Queue:
This method is a coroutine.
"""
while self.empty():
- getter = futures.Future(loop=self._loop)
+ getter = self._loop.create_future()
self._getters.append(getter)
try:
yield from getter
diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py
index 5b26631..ed2b4d7 100644
--- a/Lib/asyncio/selector_events.py
+++ b/Lib/asyncio/selector_events.py
@@ -196,7 +196,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
transport = None
try:
protocol = protocol_factory()
- waiter = futures.Future(loop=self)
+ waiter = self.create_future()
if sslcontext:
transport = self._make_ssl_transport(
conn, protocol, sslcontext, waiter=waiter,
@@ -314,7 +314,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
"""
if self._debug and sock.gettimeout() != 0:
raise ValueError("the socket must be non-blocking")
- fut = futures.Future(loop=self)
+ fut = self.create_future()
self._sock_recv(fut, False, sock, n)
return fut
@@ -352,7 +352,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
"""
if self._debug and sock.gettimeout() != 0:
raise ValueError("the socket must be non-blocking")
- fut = futures.Future(loop=self)
+ fut = self.create_future()
if data:
self._sock_sendall(fut, False, sock, data)
else:
@@ -385,24 +385,29 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
def sock_connect(self, sock, address):
"""Connect to a remote socket at address.
- The address must be already resolved to avoid the trap of hanging the
- entire event loop when the address requires doing a DNS lookup. For
- example, it must be an IP address, not an hostname, for AF_INET and
- AF_INET6 address families. Use getaddrinfo() to resolve the hostname
- asynchronously.
-
This method is a coroutine.
"""
if self._debug and sock.gettimeout() != 0:
raise ValueError("the socket must be non-blocking")
- fut = futures.Future(loop=self)
+
+ fut = self.create_future()
+ if hasattr(socket, 'AF_UNIX') and sock.family == socket.AF_UNIX:
+ self._sock_connect(fut, sock, address)
+ else:
+ resolved = base_events._ensure_resolved(
+ address, family=sock.family, proto=sock.proto, loop=self)
+ resolved.add_done_callback(
+ lambda resolved: self._on_resolved(fut, sock, resolved))
+
+ return fut
+
+ def _on_resolved(self, fut, sock, resolved):
try:
- base_events._check_resolved_address(sock, address)
- except ValueError as err:
- fut.set_exception(err)
+ _, _, _, _, address = resolved.result()[0]
+ except Exception as exc:
+ fut.set_exception(exc)
else:
self._sock_connect(fut, sock, address)
- return fut
def _sock_connect(self, fut, sock, address):
fd = sock.fileno()
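With resolution folded into sock_connect() itself, passing a hostname no longer raises ValueError; the loop resolves it via _ensure_resolved() first and only then starts the non-blocking connect. For example (sketch):

    import asyncio
    import socket

    @asyncio.coroutine
    def open_conn(loop):
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setblocking(False)
        # A hostname is now accepted here; previously the address had
        # to be a pre-resolved IP.
        yield from loop.sock_connect(sock, ('example.com', 80))
        return sock
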
@@ -453,7 +458,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
"""
if self._debug and sock.gettimeout() != 0:
raise ValueError("the socket must be non-blocking")
- fut = futures.Future(loop=self)
+ fut = self.create_future()
self._sock_accept(fut, False, sock)
return fut
@@ -565,6 +570,7 @@ class _SelectorTransport(transports._FlowControlMixin,
self._loop.remove_reader(self._sock_fd)
if not self._buffer:
self._conn_lost += 1
+ self._loop.remove_writer(self._sock_fd)
self._loop.call_soon(self._call_connection_lost, None)
# On Python 3.3 and older, objects with a destructor part of a reference
@@ -578,8 +584,7 @@ class _SelectorTransport(transports._FlowControlMixin,
def _fatal_error(self, exc, message='Fatal error on transport'):
# Should be called from exception handler only.
- if isinstance(exc, (BrokenPipeError,
- ConnectionResetError, ConnectionAbortedError)):
+ if isinstance(exc, base_events._FATAL_ERROR_IGNORE):
if self._loop.get_debug():
logger.debug("%r: %s", self, message, exc_info=True)
else:
@@ -659,6 +664,8 @@ class _SelectorSocketTransport(_SelectorTransport):
logger.debug("%r resumes reading", self)
def _read_ready(self):
+ if self._conn_lost:
+ return
try:
data = self._sock.recv(self.max_size)
except (BlockingIOError, InterruptedError):
@@ -682,8 +689,8 @@ class _SelectorSocketTransport(_SelectorTransport):
def write(self, data):
if not isinstance(data, (bytes, bytearray, memoryview)):
- raise TypeError('data argument must be byte-ish (%r)',
- type(data))
+ raise TypeError('data argument must be a bytes-like object, '
+ 'not %r' % type(data).__name__)
if self._eof:
raise RuntimeError('Cannot call write() after write_eof()')
if not data:
@@ -718,6 +725,8 @@ class _SelectorSocketTransport(_SelectorTransport):
def _write_ready(self):
assert self._buffer, 'Data should not be empty'
+ if self._conn_lost:
+ return
try:
n = self._sock.send(self._buffer)
except (BlockingIOError, InterruptedError):
@@ -888,6 +897,8 @@ class _SelectorSslTransport(_SelectorTransport):
logger.debug("%r resumes reading", self)
def _read_ready(self):
+ if self._conn_lost:
+ return
if self._write_wants_read:
self._write_wants_read = False
self._write_ready()
@@ -920,6 +931,8 @@ class _SelectorSslTransport(_SelectorTransport):
self.close()
def _write_ready(self):
+ if self._conn_lost:
+ return
if self._read_wants_write:
self._read_wants_write = False
self._read_ready()
@@ -954,8 +967,8 @@ class _SelectorSslTransport(_SelectorTransport):
def write(self, data):
if not isinstance(data, (bytes, bytearray, memoryview)):
- raise TypeError('data argument must be byte-ish (%r)',
- type(data))
+ raise TypeError('data argument must be a bytes-like object, '
+ 'not %r' % type(data).__name__)
if not data:
return
@@ -997,6 +1010,8 @@ class _SelectorDatagramTransport(_SelectorTransport):
return sum(len(data) for data, _ in self._buffer)
def _read_ready(self):
+ if self._conn_lost:
+ return
try:
data, addr = self._sock.recvfrom(self.max_size)
except (BlockingIOError, InterruptedError):
@@ -1010,8 +1025,8 @@ class _SelectorDatagramTransport(_SelectorTransport):
def sendto(self, data, addr=None):
if not isinstance(data, (bytes, bytearray, memoryview)):
- raise TypeError('data argument must be byte-ish (%r)',
- type(data))
+ raise TypeError('data argument must be a bytes-like object, '
+ 'not %r' % type(data).__name__)
if not data:
return
diff --git a/Lib/asyncio/sslproto.py b/Lib/asyncio/sslproto.py
index dde980b..33d5de2 100644
--- a/Lib/asyncio/sslproto.py
+++ b/Lib/asyncio/sslproto.py
@@ -5,6 +5,7 @@ try:
except ImportError: # pragma: no cover
ssl = None
+from . import base_events
from . import compat
from . import protocols
from . import transports
@@ -603,7 +604,7 @@ class SSLProtocol(protocols.Protocol):
self._wakeup_waiter()
self._session_established = True
# In case transport.write() was already called. Don't call
- # immediatly _process_write_backlog(), but schedule it:
+ # immediately _process_write_backlog(), but schedule it:
# _on_handshake_complete() can be called indirectly from
# _process_write_backlog(), and _process_write_backlog() is not
# reentrant.
@@ -655,7 +656,7 @@ class SSLProtocol(protocols.Protocol):
def _fatal_error(self, exc, message='Fatal error on transport'):
# Should be called from exception handler only.
- if isinstance(exc, (BrokenPipeError, ConnectionResetError)):
+ if isinstance(exc, base_events._FATAL_ERROR_IGNORE):
if self._loop.get_debug():
logger.debug("%r: %s", self, message, exc_info=True)
else:
diff --git a/Lib/asyncio/streams.py b/Lib/asyncio/streams.py
index 0008d51..c88a87c 100644
--- a/Lib/asyncio/streams.py
+++ b/Lib/asyncio/streams.py
@@ -14,13 +14,12 @@ if hasattr(socket, 'AF_UNIX'):
from . import coroutines
from . import compat
from . import events
-from . import futures
from . import protocols
from .coroutines import coroutine
from .log import logger
-_DEFAULT_LIMIT = 2**16
+_DEFAULT_LIMIT = 2 ** 16
class IncompleteReadError(EOFError):
@@ -38,15 +37,13 @@ class IncompleteReadError(EOFError):
class LimitOverrunError(Exception):
- """Reached buffer limit while looking for the separator.
+ """Reached the buffer limit while looking for a separator.
Attributes:
- - message: error message
- - consumed: total number of bytes that should be consumed
+ - consumed: total number of to be consumed bytes.
"""
def __init__(self, message, consumed):
super().__init__(message)
- self.message = message
self.consumed = consumed
@@ -132,7 +129,6 @@ if hasattr(socket, 'AF_UNIX'):
writer = StreamWriter(transport, protocol, reader, loop)
return reader, writer
-
@coroutine
def start_unix_server(client_connected_cb, path=None, *,
loop=None, limit=_DEFAULT_LIMIT, **kwds):
@@ -210,7 +206,7 @@ class FlowControlMixin(protocols.Protocol):
return
waiter = self._drain_waiter
assert waiter is None or waiter.cancelled()
- waiter = futures.Future(loop=self._loop)
+ waiter = self._loop.create_future()
self._drain_waiter = waiter
yield from waiter
@@ -229,9 +225,11 @@ class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
self._stream_reader = stream_reader
self._stream_writer = None
self._client_connected_cb = client_connected_cb
+ self._over_ssl = False
def connection_made(self, transport):
self._stream_reader.set_transport(transport)
+ self._over_ssl = transport.get_extra_info('sslcontext') is not None
if self._client_connected_cb is not None:
self._stream_writer = StreamWriter(transport, self,
self._stream_reader,
@@ -242,17 +240,25 @@ class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
self._loop.create_task(res)
def connection_lost(self, exc):
- if exc is None:
- self._stream_reader.feed_eof()
- else:
- self._stream_reader.set_exception(exc)
+ if self._stream_reader is not None:
+ if exc is None:
+ self._stream_reader.feed_eof()
+ else:
+ self._stream_reader.set_exception(exc)
super().connection_lost(exc)
+ self._stream_reader = None
+ self._stream_writer = None
def data_received(self, data):
self._stream_reader.feed_data(data)
def eof_received(self):
self._stream_reader.feed_eof()
+ if self._over_ssl:
+ # Prevent a warning in SSLProtocol.eof_received:
+ # "returning true from eof_received()
+ # has no effect when using ssl"
+ return False
return True
@@ -413,8 +419,8 @@ class StreamReader:
self._wakeup_waiter()
if (self._transport is not None and
- not self._paused and
- len(self._buffer) > 2*self._limit):
+ not self._paused and
+ len(self._buffer) > 2 * self._limit):
try:
self._transport.pause_reading()
except NotImplementedError:
@@ -446,7 +452,7 @@ class StreamReader:
self._paused = False
self._transport.resume_reading()
- self._waiter = futures.Future(loop=self._loop)
+ self._waiter = self._loop.create_future()
try:
yield from self._waiter
finally:
@@ -486,24 +492,24 @@ class StreamReader:
@coroutine
def readuntil(self, separator=b'\n'):
- """Read chunk of data from the stream until `separator` is found.
-
- On success, chunk and its separator will be removed from internal buffer
- (i.e. consumed). Returned chunk will include separator at the end.
+ """Read data from the stream until ``separator`` is found.
- Configured stream limit is used to check result. Limit means maximal
- length of chunk that can be returned, not counting the separator.
+ On success, the data and separator will be removed from the
+ internal buffer (consumed). Returned data will include the
+ separator at the end.
- If EOF occurs and complete separator still not found,
- IncompleteReadError(<partial data>, None) will be raised and internal
- buffer becomes empty. This partial data may contain a partial separator.
+ Configured stream limit is used to check result. Limit sets the
+ maximal length of data that can be returned, not counting the
+ separator.
- If chunk cannot be read due to overlimit, LimitOverrunError will be raised
- and data will be left in internal buffer, so it can be read again, in
- some different way.
+ If an EOF occurs and the complete separator is still not found,
+ an IncompleteReadError exception will be raised, and the internal
+ buffer will be reset. The IncompleteReadError.partial attribute
+ may contain the separator partially.
- If stream was paused, this function will automatically resume it if
- needed.
+ If the data cannot be read because of over limit, a
+ LimitOverrunError exception will be raised, and the data
+ will be left in the internal buffer, so it can be read again.
"""
seplen = len(separator)
if seplen == 0:
@@ -529,8 +535,8 @@ class StreamReader:
# performance problems. Even when reading MIME-encoded
# messages :)
- # `offset` is the number of bytes from the beginning of the buffer where
- # is no occurrence of `separator`.
+ # `offset` is the number of bytes from the beginning of the buffer
+ # where there is no occurrence of `separator`.
offset = 0
# Loop until we find `separator` in the buffer, exceed the buffer size,
@@ -544,14 +550,16 @@ class StreamReader:
isep = self._buffer.find(separator, offset)
if isep != -1:
- # `separator` is in the buffer. `isep` will be used later to
- # retrieve the data.
+ # `separator` is in the buffer. `isep` will be used later
+ # to retrieve the data.
break
# see upper comment for explanation.
offset = buflen + 1 - seplen
if offset > self._limit:
- raise LimitOverrunError('Separator is not found, and chunk exceed the limit', offset)
+ raise LimitOverrunError(
+ 'Separator is not found, and chunk exceed the limit',
+ offset)
# Complete message (with full separator) may be present in buffer
# even when EOF flag is set. This may happen when the last chunk
@@ -566,7 +574,8 @@ class StreamReader:
yield from self._wait_for_data('readuntil')
if isep > self._limit:
- raise LimitOverrunError('Separator is found, but chunk is longer than limit', isep)
+ raise LimitOverrunError(
+ 'Separator is found, but chunk is longer than limit', isep)
chunk = self._buffer[:isep + seplen]
del self._buffer[:isep + seplen]
@@ -588,7 +597,8 @@ class StreamReader:
received before any byte is read, this function returns empty byte
object.
- Returned value is not limited with limit, configured at stream creation.
+ Returned value is not limited with limit, configured at stream
+ creation.
If stream was paused, this function will automatically resume it if
needed.
@@ -627,13 +637,14 @@ class StreamReader:
def readexactly(self, n):
"""Read exactly `n` bytes.
- Raise an `IncompleteReadError` if EOF is reached before `n` bytes can be
- read. The `IncompleteReadError.partial` attribute of the exception will
+ Raise an IncompleteReadError if EOF is reached before `n` bytes can be
+ read. The IncompleteReadError.partial attribute of the exception will
contain the partial read bytes.
if n is zero, return empty bytes object.
- Returned value is not limited with limit, configured at stream creation.
+ Returned value is not limited with limit, configured at stream
+ creation.
If stream was paused, this function will automatically resume it if
needed.
@@ -678,3 +689,9 @@ class StreamReader:
if val == b'':
raise StopAsyncIteration
return val
+
+ if compat.PY352:
+ # In Python 3.5.2 and greater, __aiter__ should return
+ # the asynchronous iterator directly.
+ def __aiter__(self):
+ return self
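With the PY352 branch above, StreamReader.__aiter__ returns the reader itself on 3.5.2 and later, matching the tightened async-iterator protocol (3.5.2 deprecates __aiter__ methods that return an awaitable). Consumers can simply write:

    import asyncio

    async def print_lines(reader):
        # No awaiting of __aiter__() needed on 3.5.2+:
        async for line in reader:
            print(line.decode().rstrip())
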
diff --git a/Lib/asyncio/subprocess.py b/Lib/asyncio/subprocess.py
index ead4039..b2f5304 100644
--- a/Lib/asyncio/subprocess.py
+++ b/Lib/asyncio/subprocess.py
@@ -166,7 +166,7 @@ class Process:
@coroutine
def communicate(self, input=None):
- if input:
+ if input is not None:
stdin = self._feed_stdin(input)
else:
stdin = self._noop()
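The "input is not None" test makes communicate(b'') behave like the blocking subprocess API: an empty payload is still written to stdin and the pipe is closed, so the child sees EOF instead of hanging. Sketch:

    import asyncio
    from asyncio import subprocess

    @asyncio.coroutine
    def run_cat():
        proc = yield from asyncio.create_subprocess_exec(
            'cat', stdin=subprocess.PIPE, stdout=subprocess.PIPE)
        # b'' now goes through _feed_stdin(), which closes the pipe:
        stdout, _ = yield from proc.communicate(input=b'')
        return stdout
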
diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py
index c37aa41..0cca8e3 100644
--- a/Lib/asyncio/tasks.py
+++ b/Lib/asyncio/tasks.py
@@ -4,7 +4,6 @@ __all__ = ['Task',
'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
'wait', 'wait_for', 'as_completed', 'sleep', 'async',
'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
- 'timeout',
]
import concurrent.futures
@@ -373,7 +372,7 @@ def wait_for(fut, timeout, *, loop=None):
if timeout is None:
return (yield from fut)
- waiter = futures.Future(loop=loop)
+ waiter = loop.create_future()
timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
cb = functools.partial(_release_waiter, waiter)
@@ -401,12 +400,12 @@ def wait_for(fut, timeout, *, loop=None):
@coroutine
def _wait(fs, timeout, return_when, loop):
- """Internal helper for wait() and _wait_for().
+ """Internal helper for wait() and wait_for().
The fs argument must be a collection of Futures.
"""
assert fs, 'Set of Futures is empty.'
- waiter = futures.Future(loop=loop)
+ waiter = loop.create_future()
timeout_handle = None
if timeout is not None:
timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
@@ -507,7 +506,9 @@ def sleep(delay, result=None, *, loop=None):
yield
return result
- future = futures.Future(loop=loop)
+ if loop is None:
+ loop = events.get_event_loop()
+ future = loop.create_future()
h = future._loop.call_later(delay,
futures._set_result_unless_cancelled,
future, result)
@@ -604,7 +605,9 @@ def gather(*coros_or_futures, loop=None, return_exceptions=False):
be cancelled.)
"""
if not coros_or_futures:
- outer = futures.Future(loop=loop)
+ if loop is None:
+ loop = events.get_event_loop()
+ outer = loop.create_future()
outer.set_result([])
return outer
@@ -692,7 +695,7 @@ def shield(arg, *, loop=None):
# Shortcut.
return inner
loop = inner._loop
- outer = futures.Future(loop=loop)
+ outer = loop.create_future()
def _done_callback(inner):
if outer.cancelled():
@@ -733,53 +736,3 @@ def run_coroutine_threadsafe(coro, loop):
loop.call_soon_threadsafe(callback)
return future
-
-
-def timeout(timeout, *, loop=None):
- """A factory which produce a context manager with timeout.
-
- Useful in cases when you want to apply timeout logic around block
- of code or in cases when asyncio.wait_for is not suitable.
-
- For example:
-
- >>> with asyncio.timeout(0.001):
- ... yield from coro()
-
-
- timeout: timeout value in seconds
- loop: asyncio compatible event loop
- """
- if loop is None:
- loop = events.get_event_loop()
- return _Timeout(timeout, loop=loop)
-
-
-class _Timeout:
- def __init__(self, timeout, *, loop):
- self._timeout = timeout
- self._loop = loop
- self._task = None
- self._cancelled = False
- self._cancel_handler = None
-
- def __enter__(self):
- self._task = Task.current_task(loop=self._loop)
- if self._task is None:
- raise RuntimeError('Timeout context manager should be used '
- 'inside a task')
- self._cancel_handler = self._loop.call_later(
- self._timeout, self._cancel_task)
- return self
-
- def __exit__(self, exc_type, exc_val, exc_tb):
- if exc_type is futures.CancelledError and self._cancelled:
- self._cancel_handler = None
- self._task = None
- raise futures.TimeoutError
- self._cancel_handler.cancel()
- self._cancel_handler = None
- self._task = None
-
- def _cancel_task(self):
- self._cancelled = self._task.cancel()
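The timeout() context manager is backed out here before it shipped in a CPython release (the idea later continued as a third-party package); asyncio.wait_for() remains the supported way to bound a coroutine's running time. Equivalent usage, as a sketch:

    import asyncio

    @asyncio.coroutine
    def bounded(coro, loop=None):
        try:
            return (yield from asyncio.wait_for(coro, 0.001, loop=loop))
        except asyncio.TimeoutError:
            return None
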
diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py
index 7747ff4..d712749 100644
--- a/Lib/asyncio/unix_events.py
+++ b/Lib/asyncio/unix_events.py
@@ -177,7 +177,7 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
stdin, stdout, stderr, bufsize,
extra=None, **kwargs):
with events.get_child_watcher() as watcher:
- waiter = futures.Future(loop=self)
+ waiter = self.create_future()
transp = _UnixSubprocessTransport(self, protocol, args, shell,
stdin, stdout, stderr, bufsize,
waiter=waiter, extra=extra,
@@ -329,14 +329,17 @@ class _UnixReadPipeTransport(transports.ReadTransport):
elif self._closing:
info.append('closing')
info.append('fd=%s' % self._fileno)
- if self._pipe is not None:
+ selector = getattr(self._loop, '_selector', None)
+ if self._pipe is not None and selector is not None:
polling = selector_events._test_selector_event(
- self._loop._selector,
+ selector,
self._fileno, selectors.EVENT_READ)
if polling:
info.append('polling')
else:
info.append('idle')
+ elif self._pipe is not None:
+ info.append('open')
else:
info.append('closed')
return '<%s>' % ' '.join(info)
@@ -453,9 +456,10 @@ class _UnixWritePipeTransport(transports._FlowControlMixin,
elif self._closing:
info.append('closing')
info.append('fd=%s' % self._fileno)
- if self._pipe is not None:
+ selector = getattr(self._loop, '_selector', None)
+ if self._pipe is not None and selector is not None:
polling = selector_events._test_selector_event(
- self._loop._selector,
+ selector,
self._fileno, selectors.EVENT_WRITE)
if polling:
info.append('polling')
@@ -464,6 +468,8 @@ class _UnixWritePipeTransport(transports._FlowControlMixin,
bufsize = self.get_write_buffer_size()
info.append('bufsize=%s' % bufsize)
+ elif self._pipe is not None:
+ info.append('open')
else:
info.append('closed')
return '<%s>' % ' '.join(info)
@@ -575,7 +581,7 @@ class _UnixWritePipeTransport(transports._FlowControlMixin,
def _fatal_error(self, exc, message='Fatal error on pipe transport'):
# should be called by exception handler only
- if isinstance(exc, (BrokenPipeError, ConnectionResetError)):
+ if isinstance(exc, base_events._FATAL_ERROR_IGNORE):
if self._loop.get_debug():
logger.debug("%r: %s", self, message, exc_info=True)
else:
diff --git a/Lib/asyncio/windows_events.py b/Lib/asyncio/windows_events.py
index 922594f..668fe14 100644
--- a/Lib/asyncio/windows_events.py
+++ b/Lib/asyncio/windows_events.py
@@ -197,7 +197,7 @@ class _WaitHandleFuture(_BaseWaitHandleFuture):
#
# If the IocpProactor already received the event, it's safe to call
# _unregister() because we kept a reference to the Overlapped object
- # which is used as an unique key.
+ # which is used as a unique key.
self._proactor._unregister(self._ov)
self._proactor = None
@@ -366,7 +366,7 @@ class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
def _make_subprocess_transport(self, protocol, args, shell,
stdin, stdout, stderr, bufsize,
extra=None, **kwargs):
- waiter = futures.Future(loop=self)
+ waiter = self.create_future()
transp = _WindowsSubprocessTransport(self, protocol, args, shell,
stdin, stdout, stderr, bufsize,
waiter=waiter, extra=extra,
@@ -417,7 +417,7 @@ class IocpProactor:
return tmp
def _result(self, value):
- fut = futures.Future(loop=self._loop)
+ fut = self._loop.create_future()
fut.set_result(value)
return fut