summaryrefslogtreecommitdiffstats
path: root/Lib/asyncio/selector_events.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/asyncio/selector_events.py')
-rw-r--r--Lib/asyncio/selector_events.py280
1 files changed, 174 insertions, 106 deletions
diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py
index 5b26631..12d357b 100644
--- a/Lib/asyncio/selector_events.py
+++ b/Lib/asyncio/selector_events.py
@@ -11,6 +11,7 @@ import errno
import functools
import socket
import warnings
+import weakref
try:
import ssl
except ImportError: # pragma: no cover
@@ -39,6 +40,17 @@ def _test_selector_event(selector, fd, event):
return bool(key.events & event)
+if hasattr(socket, 'TCP_NODELAY'):
+ def _set_nodelay(sock):
+ if (sock.family in {socket.AF_INET, socket.AF_INET6} and
+ sock.type == socket.SOCK_STREAM and
+ sock.proto == socket.IPPROTO_TCP):
+ sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
+else:
+ def _set_nodelay(sock):
+ pass
+
+
class BaseSelectorEventLoop(base_events.BaseEventLoop):
"""Selector event loop.
@@ -53,6 +65,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
logger.debug('Using selector: %s', selector.__class__.__name__)
self._selector = selector
self._make_self_pipe()
+ self._transports = weakref.WeakValueDictionary()
def _make_socket_transport(self, sock, protocol, waiter=None, *,
extra=None, server=None):
@@ -104,7 +117,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
raise NotImplementedError
def _close_self_pipe(self):
- self.remove_reader(self._ssock.fileno())
+ self._remove_reader(self._ssock.fileno())
self._ssock.close()
self._ssock = None
self._csock.close()
@@ -117,7 +130,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
self._ssock.setblocking(False)
self._csock.setblocking(False)
self._internal_fds += 1
- self.add_reader(self._ssock.fileno(), self._read_from_self)
+ self._add_reader(self._ssock.fileno(), self._read_from_self)
def _process_self_data(self, data):
pass
@@ -151,43 +164,50 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
exc_info=True)
def _start_serving(self, protocol_factory, sock,
- sslcontext=None, server=None):
- self.add_reader(sock.fileno(), self._accept_connection,
- protocol_factory, sock, sslcontext, server)
+ sslcontext=None, server=None, backlog=100):
+ self._add_reader(sock.fileno(), self._accept_connection,
+ protocol_factory, sock, sslcontext, server, backlog)
def _accept_connection(self, protocol_factory, sock,
- sslcontext=None, server=None):
- try:
- conn, addr = sock.accept()
- if self._debug:
- logger.debug("%r got a new connection from %r: %r",
- server, addr, conn)
- conn.setblocking(False)
- except (BlockingIOError, InterruptedError, ConnectionAbortedError):
- pass # False alarm.
- except OSError as exc:
- # There's nowhere to send the error, so just log it.
- if exc.errno in (errno.EMFILE, errno.ENFILE,
- errno.ENOBUFS, errno.ENOMEM):
- # Some platforms (e.g. Linux keep reporting the FD as
- # ready, so we remove the read handler temporarily.
- # We'll try again in a while.
- self.call_exception_handler({
- 'message': 'socket.accept() out of system resource',
- 'exception': exc,
- 'socket': sock,
- })
- self.remove_reader(sock.fileno())
- self.call_later(constants.ACCEPT_RETRY_DELAY,
- self._start_serving,
- protocol_factory, sock, sslcontext, server)
+ sslcontext=None, server=None, backlog=100):
+ # This method is only called once for each event loop tick where the
+ # listening socket has triggered an EVENT_READ. There may be multiple
+ # connections waiting for an .accept() so it is called in a loop.
+ # See https://bugs.python.org/issue27906 for more details.
+ for _ in range(backlog):
+ try:
+ conn, addr = sock.accept()
+ if self._debug:
+ logger.debug("%r got a new connection from %r: %r",
+ server, addr, conn)
+ conn.setblocking(False)
+ except (BlockingIOError, InterruptedError, ConnectionAbortedError):
+ # Early exit because the socket accept buffer is empty.
+ return None
+ except OSError as exc:
+ # There's nowhere to send the error, so just log it.
+ if exc.errno in (errno.EMFILE, errno.ENFILE,
+ errno.ENOBUFS, errno.ENOMEM):
+ # Some platforms (e.g. Linux keep reporting the FD as
+ # ready, so we remove the read handler temporarily.
+ # We'll try again in a while.
+ self.call_exception_handler({
+ 'message': 'socket.accept() out of system resource',
+ 'exception': exc,
+ 'socket': sock,
+ })
+ self._remove_reader(sock.fileno())
+ self.call_later(constants.ACCEPT_RETRY_DELAY,
+ self._start_serving,
+ protocol_factory, sock, sslcontext, server,
+ backlog)
+ else:
+ raise # The event loop will catch, log and ignore it.
else:
- raise # The event loop will catch, log and ignore it.
- else:
- extra = {'peername': addr}
- accept = self._accept_connection2(protocol_factory, conn, extra,
- sslcontext, server)
- self.create_task(accept)
+ extra = {'peername': addr}
+ accept = self._accept_connection2(protocol_factory, conn, extra,
+ sslcontext, server)
+ self.create_task(accept)
@coroutine
def _accept_connection2(self, protocol_factory, conn, extra,
@@ -196,7 +216,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
transport = None
try:
protocol = protocol_factory()
- waiter = futures.Future(loop=self)
+ waiter = self.create_future()
if sslcontext:
transport = self._make_ssl_transport(
conn, protocol, sslcontext, waiter=waiter,
@@ -226,8 +246,18 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
context['transport'] = transport
self.call_exception_handler(context)
- def add_reader(self, fd, callback, *args):
- """Add a reader callback."""
+ def _ensure_fd_no_transport(self, fd):
+ try:
+ transport = self._transports[fd]
+ except KeyError:
+ pass
+ else:
+ if not transport.is_closing():
+ raise RuntimeError(
+ 'File descriptor {!r} is used by transport {!r}'.format(
+ fd, transport))
+
+ def _add_reader(self, fd, callback, *args):
self._check_closed()
handle = events.Handle(callback, args, self)
try:
@@ -242,8 +272,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
if reader is not None:
reader.cancel()
- def remove_reader(self, fd):
- """Remove a reader callback."""
+ def _remove_reader(self, fd):
if self.is_closed():
return False
try:
@@ -264,8 +293,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
else:
return False
- def add_writer(self, fd, callback, *args):
- """Add a writer callback.."""
+ def _add_writer(self, fd, callback, *args):
self._check_closed()
handle = events.Handle(callback, args, self)
try:
@@ -280,7 +308,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
if writer is not None:
writer.cancel()
- def remove_writer(self, fd):
+ def _remove_writer(self, fd):
"""Remove a writer callback."""
if self.is_closed():
return False
@@ -303,6 +331,26 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
else:
return False
+ def add_reader(self, fd, callback, *args):
+ """Add a reader callback."""
+ self._ensure_fd_no_transport(fd)
+ return self._add_reader(fd, callback, *args)
+
+ def remove_reader(self, fd):
+ """Remove a reader callback."""
+ self._ensure_fd_no_transport(fd)
+ return self._remove_reader(fd)
+
+ def add_writer(self, fd, callback, *args):
+ """Add a writer callback.."""
+ self._ensure_fd_no_transport(fd)
+ return self._add_writer(fd, callback, *args)
+
+ def remove_writer(self, fd):
+ """Remove a writer callback."""
+ self._ensure_fd_no_transport(fd)
+ return self._remove_writer(fd)
+
def sock_recv(self, sock, n):
"""Receive data from the socket.
@@ -314,7 +362,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
"""
if self._debug and sock.gettimeout() != 0:
raise ValueError("the socket must be non-blocking")
- fut = futures.Future(loop=self)
+ fut = self.create_future()
self._sock_recv(fut, False, sock, n)
return fut
@@ -352,7 +400,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
"""
if self._debug and sock.gettimeout() != 0:
raise ValueError("the socket must be non-blocking")
- fut = futures.Future(loop=self)
+ fut = self.create_future()
if data:
self._sock_sendall(fut, False, sock, data)
else:
@@ -382,27 +430,25 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
data = data[n:]
self.add_writer(fd, self._sock_sendall, fut, True, sock, data)
+ @coroutine
def sock_connect(self, sock, address):
"""Connect to a remote socket at address.
- The address must be already resolved to avoid the trap of hanging the
- entire event loop when the address requires doing a DNS lookup. For
- example, it must be an IP address, not an hostname, for AF_INET and
- AF_INET6 address families. Use getaddrinfo() to resolve the hostname
- asynchronously.
-
This method is a coroutine.
"""
if self._debug and sock.gettimeout() != 0:
raise ValueError("the socket must be non-blocking")
- fut = futures.Future(loop=self)
- try:
- base_events._check_resolved_address(sock, address)
- except ValueError as err:
- fut.set_exception(err)
- else:
- self._sock_connect(fut, sock, address)
- return fut
+
+ if not hasattr(socket, 'AF_UNIX') or sock.family != socket.AF_UNIX:
+ resolved = base_events._ensure_resolved(
+ address, family=sock.family, proto=sock.proto, loop=self)
+ if not resolved.done():
+ yield from resolved
+ _, _, _, _, address = resolved.result()[0]
+
+ fut = self.create_future()
+ self._sock_connect(fut, sock, address)
+ return (yield from fut)
def _sock_connect(self, fut, sock, address):
fd = sock.fileno()
@@ -413,8 +459,8 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
# connection runs in background. We have to wait until the socket
# becomes writable to be notified when the connection succeed or
# fails.
- fut.add_done_callback(functools.partial(self._sock_connect_done,
- fd))
+ fut.add_done_callback(
+ functools.partial(self._sock_connect_done, fd))
self.add_writer(fd, self._sock_connect_cb, fut, sock, address)
except Exception as exc:
fut.set_exception(exc)
@@ -453,7 +499,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
"""
if self._debug and sock.gettimeout() != 0:
raise ValueError("the socket must be non-blocking")
- fut = futures.Future(loop=self)
+ fut = self.create_future()
self._sock_accept(fut, False, sock)
return fut
@@ -478,17 +524,17 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
fileobj, (reader, writer) = key.fileobj, key.data
if mask & selectors.EVENT_READ and reader is not None:
if reader._cancelled:
- self.remove_reader(fileobj)
+ self._remove_reader(fileobj)
else:
self._add_callback(reader)
if mask & selectors.EVENT_WRITE and writer is not None:
if writer._cancelled:
- self.remove_writer(fileobj)
+ self._remove_writer(fileobj)
else:
self._add_callback(writer)
def _stop_serving(self, sock):
- self.remove_reader(sock.fileno())
+ self._remove_reader(sock.fileno())
sock.close()
@@ -523,6 +569,7 @@ class _SelectorTransport(transports._FlowControlMixin,
self._closing = False # Set when close() called.
if self._server is not None:
self._server._attach()
+ loop._transports[self._sock_fd] = self
def __repr__(self):
info = [self.__class__.__name__]
@@ -555,6 +602,12 @@ class _SelectorTransport(transports._FlowControlMixin,
def abort(self):
self._force_close(None)
+ def set_protocol(self, protocol):
+ self._protocol = protocol
+
+ def get_protocol(self):
+ return self._protocol
+
def is_closing(self):
return self._closing
@@ -562,9 +615,10 @@ class _SelectorTransport(transports._FlowControlMixin,
if self._closing:
return
self._closing = True
- self._loop.remove_reader(self._sock_fd)
+ self._loop._remove_reader(self._sock_fd)
if not self._buffer:
self._conn_lost += 1
+ self._loop._remove_writer(self._sock_fd)
self._loop.call_soon(self._call_connection_lost, None)
# On Python 3.3 and older, objects with a destructor part of a reference
@@ -578,8 +632,7 @@ class _SelectorTransport(transports._FlowControlMixin,
def _fatal_error(self, exc, message='Fatal error on transport'):
# Should be called from exception handler only.
- if isinstance(exc, (BrokenPipeError,
- ConnectionResetError, ConnectionAbortedError)):
+ if isinstance(exc, base_events._FATAL_ERROR_IGNORE):
if self._loop.get_debug():
logger.debug("%r: %s", self, message, exc_info=True)
else:
@@ -596,10 +649,10 @@ class _SelectorTransport(transports._FlowControlMixin,
return
if self._buffer:
self._buffer.clear()
- self._loop.remove_writer(self._sock_fd)
+ self._loop._remove_writer(self._sock_fd)
if not self._closing:
self._closing = True
- self._loop.remove_reader(self._sock_fd)
+ self._loop._remove_reader(self._sock_fd)
self._conn_lost += 1
self._loop.call_soon(self._call_connection_lost, exc)
@@ -629,9 +682,14 @@ class _SelectorSocketTransport(_SelectorTransport):
self._eof = False
self._paused = False
+ # Disable the Nagle algorithm -- small writes will be
+ # sent without waiting for the TCP ACK. This generally
+ # decreases the latency (in some cases significantly.)
+ _set_nodelay(self._sock)
+
self._loop.call_soon(self._protocol.connection_made, self)
# only start reading when connection_made() has been called
- self._loop.call_soon(self._loop.add_reader,
+ self._loop.call_soon(self._loop._add_reader,
self._sock_fd, self._read_ready)
if waiter is not None:
# only wake up the waiter when connection_made() has been called
@@ -644,7 +702,7 @@ class _SelectorSocketTransport(_SelectorTransport):
if self._paused:
raise RuntimeError('Already paused')
self._paused = True
- self._loop.remove_reader(self._sock_fd)
+ self._loop._remove_reader(self._sock_fd)
if self._loop.get_debug():
logger.debug("%r pauses reading", self)
@@ -654,11 +712,13 @@ class _SelectorSocketTransport(_SelectorTransport):
self._paused = False
if self._closing:
return
- self._loop.add_reader(self._sock_fd, self._read_ready)
+ self._loop._add_reader(self._sock_fd, self._read_ready)
if self._loop.get_debug():
logger.debug("%r resumes reading", self)
def _read_ready(self):
+ if self._conn_lost:
+ return
try:
data = self._sock.recv(self.max_size)
except (BlockingIOError, InterruptedError):
@@ -676,14 +736,14 @@ class _SelectorSocketTransport(_SelectorTransport):
# We're keeping the connection open so the
# protocol can write more, but we still can't
# receive more, so remove the reader callback.
- self._loop.remove_reader(self._sock_fd)
+ self._loop._remove_reader(self._sock_fd)
else:
self.close()
def write(self, data):
if not isinstance(data, (bytes, bytearray, memoryview)):
- raise TypeError('data argument must be byte-ish (%r)',
- type(data))
+ raise TypeError('data argument must be a bytes-like object, '
+ 'not %r' % type(data).__name__)
if self._eof:
raise RuntimeError('Cannot call write() after write_eof()')
if not data:
@@ -709,7 +769,7 @@ class _SelectorSocketTransport(_SelectorTransport):
if not data:
return
# Not all was written; register write handler.
- self._loop.add_writer(self._sock_fd, self._write_ready)
+ self._loop._add_writer(self._sock_fd, self._write_ready)
# Add it to the buffer.
self._buffer.extend(data)
@@ -718,12 +778,14 @@ class _SelectorSocketTransport(_SelectorTransport):
def _write_ready(self):
assert self._buffer, 'Data should not be empty'
+ if self._conn_lost:
+ return
try:
n = self._sock.send(self._buffer)
except (BlockingIOError, InterruptedError):
pass
except Exception as exc:
- self._loop.remove_writer(self._sock_fd)
+ self._loop._remove_writer(self._sock_fd)
self._buffer.clear()
self._fatal_error(exc, 'Fatal write error on socket transport')
else:
@@ -731,7 +793,7 @@ class _SelectorSocketTransport(_SelectorTransport):
del self._buffer[:n]
self._maybe_resume_protocol() # May append to buffer.
if not self._buffer:
- self._loop.remove_writer(self._sock_fd)
+ self._loop._remove_writer(self._sock_fd)
if self._closing:
self._call_connection_lost(None)
elif self._eof:
@@ -802,19 +864,19 @@ class _SelectorSslTransport(_SelectorTransport):
try:
self._sock.do_handshake()
except ssl.SSLWantReadError:
- self._loop.add_reader(self._sock_fd,
- self._on_handshake, start_time)
+ self._loop._add_reader(self._sock_fd,
+ self._on_handshake, start_time)
return
except ssl.SSLWantWriteError:
- self._loop.add_writer(self._sock_fd,
- self._on_handshake, start_time)
+ self._loop._add_writer(self._sock_fd,
+ self._on_handshake, start_time)
return
except BaseException as exc:
if self._loop.get_debug():
logger.warning("%r: SSL handshake failed",
self, exc_info=True)
- self._loop.remove_reader(self._sock_fd)
- self._loop.remove_writer(self._sock_fd)
+ self._loop._remove_reader(self._sock_fd)
+ self._loop._remove_writer(self._sock_fd)
self._sock.close()
self._wakeup_waiter(exc)
if isinstance(exc, Exception):
@@ -822,8 +884,8 @@ class _SelectorSslTransport(_SelectorTransport):
else:
raise
- self._loop.remove_reader(self._sock_fd)
- self._loop.remove_writer(self._sock_fd)
+ self._loop._remove_reader(self._sock_fd)
+ self._loop._remove_writer(self._sock_fd)
peercert = self._sock.getpeercert()
if not hasattr(self._sslcontext, 'check_hostname'):
@@ -851,7 +913,7 @@ class _SelectorSslTransport(_SelectorTransport):
self._read_wants_write = False
self._write_wants_read = False
- self._loop.add_reader(self._sock_fd, self._read_ready)
+ self._loop._add_reader(self._sock_fd, self._read_ready)
self._protocol_connected = True
self._loop.call_soon(self._protocol.connection_made, self)
# only wake up the waiter when connection_made() has been called
@@ -873,7 +935,7 @@ class _SelectorSslTransport(_SelectorTransport):
if self._paused:
raise RuntimeError('Already paused')
self._paused = True
- self._loop.remove_reader(self._sock_fd)
+ self._loop._remove_reader(self._sock_fd)
if self._loop.get_debug():
logger.debug("%r pauses reading", self)
@@ -883,17 +945,19 @@ class _SelectorSslTransport(_SelectorTransport):
self._paused = False
if self._closing:
return
- self._loop.add_reader(self._sock_fd, self._read_ready)
+ self._loop._add_reader(self._sock_fd, self._read_ready)
if self._loop.get_debug():
logger.debug("%r resumes reading", self)
def _read_ready(self):
+ if self._conn_lost:
+ return
if self._write_wants_read:
self._write_wants_read = False
self._write_ready()
if self._buffer:
- self._loop.add_writer(self._sock_fd, self._write_ready)
+ self._loop._add_writer(self._sock_fd, self._write_ready)
try:
data = self._sock.recv(self.max_size)
@@ -901,8 +965,8 @@ class _SelectorSslTransport(_SelectorTransport):
pass
except ssl.SSLWantWriteError:
self._read_wants_write = True
- self._loop.remove_reader(self._sock_fd)
- self._loop.add_writer(self._sock_fd, self._write_ready)
+ self._loop._remove_reader(self._sock_fd)
+ self._loop._add_writer(self._sock_fd, self._write_ready)
except Exception as exc:
self._fatal_error(exc, 'Fatal read error on SSL transport')
else:
@@ -920,12 +984,14 @@ class _SelectorSslTransport(_SelectorTransport):
self.close()
def _write_ready(self):
+ if self._conn_lost:
+ return
if self._read_wants_write:
self._read_wants_write = False
self._read_ready()
if not (self._paused or self._closing):
- self._loop.add_reader(self._sock_fd, self._read_ready)
+ self._loop._add_reader(self._sock_fd, self._read_ready)
if self._buffer:
try:
@@ -934,10 +1000,10 @@ class _SelectorSslTransport(_SelectorTransport):
n = 0
except ssl.SSLWantReadError:
n = 0
- self._loop.remove_writer(self._sock_fd)
+ self._loop._remove_writer(self._sock_fd)
self._write_wants_read = True
except Exception as exc:
- self._loop.remove_writer(self._sock_fd)
+ self._loop._remove_writer(self._sock_fd)
self._buffer.clear()
self._fatal_error(exc, 'Fatal write error on SSL transport')
return
@@ -948,14 +1014,14 @@ class _SelectorSslTransport(_SelectorTransport):
self._maybe_resume_protocol() # May append to buffer.
if not self._buffer:
- self._loop.remove_writer(self._sock_fd)
+ self._loop._remove_writer(self._sock_fd)
if self._closing:
self._call_connection_lost(None)
def write(self, data):
if not isinstance(data, (bytes, bytearray, memoryview)):
- raise TypeError('data argument must be byte-ish (%r)',
- type(data))
+ raise TypeError('data argument must be a bytes-like object, '
+ 'not %r' % type(data).__name__)
if not data:
return
@@ -966,7 +1032,7 @@ class _SelectorSslTransport(_SelectorTransport):
return
if not self._buffer:
- self._loop.add_writer(self._sock_fd, self._write_ready)
+ self._loop._add_writer(self._sock_fd, self._write_ready)
# Add it to the buffer.
self._buffer.extend(data)
@@ -986,7 +1052,7 @@ class _SelectorDatagramTransport(_SelectorTransport):
self._address = address
self._loop.call_soon(self._protocol.connection_made, self)
# only start reading when connection_made() has been called
- self._loop.call_soon(self._loop.add_reader,
+ self._loop.call_soon(self._loop._add_reader,
self._sock_fd, self._read_ready)
if waiter is not None:
# only wake up the waiter when connection_made() has been called
@@ -997,6 +1063,8 @@ class _SelectorDatagramTransport(_SelectorTransport):
return sum(len(data) for data, _ in self._buffer)
def _read_ready(self):
+ if self._conn_lost:
+ return
try:
data, addr = self._sock.recvfrom(self.max_size)
except (BlockingIOError, InterruptedError):
@@ -1010,8 +1078,8 @@ class _SelectorDatagramTransport(_SelectorTransport):
def sendto(self, data, addr=None):
if not isinstance(data, (bytes, bytearray, memoryview)):
- raise TypeError('data argument must be byte-ish (%r)',
- type(data))
+ raise TypeError('data argument must be a bytes-like object, '
+ 'not %r' % type(data).__name__)
if not data:
return
@@ -1034,7 +1102,7 @@ class _SelectorDatagramTransport(_SelectorTransport):
self._sock.sendto(data, addr)
return
except (BlockingIOError, InterruptedError):
- self._loop.add_writer(self._sock_fd, self._sendto_ready)
+ self._loop._add_writer(self._sock_fd, self._sendto_ready)
except OSError as exc:
self._protocol.error_received(exc)
return
@@ -1068,6 +1136,6 @@ class _SelectorDatagramTransport(_SelectorTransport):
self._maybe_resume_protocol() # May append to buffer.
if not self._buffer:
- self._loop.remove_writer(self._sock_fd)
+ self._loop._remove_writer(self._sock_fd)
if self._closing:
self._call_connection_lost(None)