diff options
Diffstat (limited to 'Lib/asyncio/selector_events.py')
-rw-r--r-- | Lib/asyncio/selector_events.py | 280 |
1 files changed, 174 insertions, 106 deletions
diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py index 5b26631..12d357b 100644 --- a/Lib/asyncio/selector_events.py +++ b/Lib/asyncio/selector_events.py @@ -11,6 +11,7 @@ import errno import functools import socket import warnings +import weakref try: import ssl except ImportError: # pragma: no cover @@ -39,6 +40,17 @@ def _test_selector_event(selector, fd, event): return bool(key.events & event) +if hasattr(socket, 'TCP_NODELAY'): + def _set_nodelay(sock): + if (sock.family in {socket.AF_INET, socket.AF_INET6} and + sock.type == socket.SOCK_STREAM and + sock.proto == socket.IPPROTO_TCP): + sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) +else: + def _set_nodelay(sock): + pass + + class BaseSelectorEventLoop(base_events.BaseEventLoop): """Selector event loop. @@ -53,6 +65,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): logger.debug('Using selector: %s', selector.__class__.__name__) self._selector = selector self._make_self_pipe() + self._transports = weakref.WeakValueDictionary() def _make_socket_transport(self, sock, protocol, waiter=None, *, extra=None, server=None): @@ -104,7 +117,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): raise NotImplementedError def _close_self_pipe(self): - self.remove_reader(self._ssock.fileno()) + self._remove_reader(self._ssock.fileno()) self._ssock.close() self._ssock = None self._csock.close() @@ -117,7 +130,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): self._ssock.setblocking(False) self._csock.setblocking(False) self._internal_fds += 1 - self.add_reader(self._ssock.fileno(), self._read_from_self) + self._add_reader(self._ssock.fileno(), self._read_from_self) def _process_self_data(self, data): pass @@ -151,43 +164,50 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): exc_info=True) def _start_serving(self, protocol_factory, sock, - sslcontext=None, server=None): - self.add_reader(sock.fileno(), self._accept_connection, - protocol_factory, sock, sslcontext, server) + sslcontext=None, server=None, backlog=100): + self._add_reader(sock.fileno(), self._accept_connection, + protocol_factory, sock, sslcontext, server, backlog) def _accept_connection(self, protocol_factory, sock, - sslcontext=None, server=None): - try: - conn, addr = sock.accept() - if self._debug: - logger.debug("%r got a new connection from %r: %r", - server, addr, conn) - conn.setblocking(False) - except (BlockingIOError, InterruptedError, ConnectionAbortedError): - pass # False alarm. - except OSError as exc: - # There's nowhere to send the error, so just log it. - if exc.errno in (errno.EMFILE, errno.ENFILE, - errno.ENOBUFS, errno.ENOMEM): - # Some platforms (e.g. Linux keep reporting the FD as - # ready, so we remove the read handler temporarily. - # We'll try again in a while. - self.call_exception_handler({ - 'message': 'socket.accept() out of system resource', - 'exception': exc, - 'socket': sock, - }) - self.remove_reader(sock.fileno()) - self.call_later(constants.ACCEPT_RETRY_DELAY, - self._start_serving, - protocol_factory, sock, sslcontext, server) + sslcontext=None, server=None, backlog=100): + # This method is only called once for each event loop tick where the + # listening socket has triggered an EVENT_READ. There may be multiple + # connections waiting for an .accept() so it is called in a loop. + # See https://bugs.python.org/issue27906 for more details. + for _ in range(backlog): + try: + conn, addr = sock.accept() + if self._debug: + logger.debug("%r got a new connection from %r: %r", + server, addr, conn) + conn.setblocking(False) + except (BlockingIOError, InterruptedError, ConnectionAbortedError): + # Early exit because the socket accept buffer is empty. + return None + except OSError as exc: + # There's nowhere to send the error, so just log it. + if exc.errno in (errno.EMFILE, errno.ENFILE, + errno.ENOBUFS, errno.ENOMEM): + # Some platforms (e.g. Linux keep reporting the FD as + # ready, so we remove the read handler temporarily. + # We'll try again in a while. + self.call_exception_handler({ + 'message': 'socket.accept() out of system resource', + 'exception': exc, + 'socket': sock, + }) + self._remove_reader(sock.fileno()) + self.call_later(constants.ACCEPT_RETRY_DELAY, + self._start_serving, + protocol_factory, sock, sslcontext, server, + backlog) + else: + raise # The event loop will catch, log and ignore it. else: - raise # The event loop will catch, log and ignore it. - else: - extra = {'peername': addr} - accept = self._accept_connection2(protocol_factory, conn, extra, - sslcontext, server) - self.create_task(accept) + extra = {'peername': addr} + accept = self._accept_connection2(protocol_factory, conn, extra, + sslcontext, server) + self.create_task(accept) @coroutine def _accept_connection2(self, protocol_factory, conn, extra, @@ -196,7 +216,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): transport = None try: protocol = protocol_factory() - waiter = futures.Future(loop=self) + waiter = self.create_future() if sslcontext: transport = self._make_ssl_transport( conn, protocol, sslcontext, waiter=waiter, @@ -226,8 +246,18 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): context['transport'] = transport self.call_exception_handler(context) - def add_reader(self, fd, callback, *args): - """Add a reader callback.""" + def _ensure_fd_no_transport(self, fd): + try: + transport = self._transports[fd] + except KeyError: + pass + else: + if not transport.is_closing(): + raise RuntimeError( + 'File descriptor {!r} is used by transport {!r}'.format( + fd, transport)) + + def _add_reader(self, fd, callback, *args): self._check_closed() handle = events.Handle(callback, args, self) try: @@ -242,8 +272,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): if reader is not None: reader.cancel() - def remove_reader(self, fd): - """Remove a reader callback.""" + def _remove_reader(self, fd): if self.is_closed(): return False try: @@ -264,8 +293,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): else: return False - def add_writer(self, fd, callback, *args): - """Add a writer callback..""" + def _add_writer(self, fd, callback, *args): self._check_closed() handle = events.Handle(callback, args, self) try: @@ -280,7 +308,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): if writer is not None: writer.cancel() - def remove_writer(self, fd): + def _remove_writer(self, fd): """Remove a writer callback.""" if self.is_closed(): return False @@ -303,6 +331,26 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): else: return False + def add_reader(self, fd, callback, *args): + """Add a reader callback.""" + self._ensure_fd_no_transport(fd) + return self._add_reader(fd, callback, *args) + + def remove_reader(self, fd): + """Remove a reader callback.""" + self._ensure_fd_no_transport(fd) + return self._remove_reader(fd) + + def add_writer(self, fd, callback, *args): + """Add a writer callback..""" + self._ensure_fd_no_transport(fd) + return self._add_writer(fd, callback, *args) + + def remove_writer(self, fd): + """Remove a writer callback.""" + self._ensure_fd_no_transport(fd) + return self._remove_writer(fd) + def sock_recv(self, sock, n): """Receive data from the socket. @@ -314,7 +362,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): """ if self._debug and sock.gettimeout() != 0: raise ValueError("the socket must be non-blocking") - fut = futures.Future(loop=self) + fut = self.create_future() self._sock_recv(fut, False, sock, n) return fut @@ -352,7 +400,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): """ if self._debug and sock.gettimeout() != 0: raise ValueError("the socket must be non-blocking") - fut = futures.Future(loop=self) + fut = self.create_future() if data: self._sock_sendall(fut, False, sock, data) else: @@ -382,27 +430,25 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): data = data[n:] self.add_writer(fd, self._sock_sendall, fut, True, sock, data) + @coroutine def sock_connect(self, sock, address): """Connect to a remote socket at address. - The address must be already resolved to avoid the trap of hanging the - entire event loop when the address requires doing a DNS lookup. For - example, it must be an IP address, not an hostname, for AF_INET and - AF_INET6 address families. Use getaddrinfo() to resolve the hostname - asynchronously. - This method is a coroutine. """ if self._debug and sock.gettimeout() != 0: raise ValueError("the socket must be non-blocking") - fut = futures.Future(loop=self) - try: - base_events._check_resolved_address(sock, address) - except ValueError as err: - fut.set_exception(err) - else: - self._sock_connect(fut, sock, address) - return fut + + if not hasattr(socket, 'AF_UNIX') or sock.family != socket.AF_UNIX: + resolved = base_events._ensure_resolved( + address, family=sock.family, proto=sock.proto, loop=self) + if not resolved.done(): + yield from resolved + _, _, _, _, address = resolved.result()[0] + + fut = self.create_future() + self._sock_connect(fut, sock, address) + return (yield from fut) def _sock_connect(self, fut, sock, address): fd = sock.fileno() @@ -413,8 +459,8 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): # connection runs in background. We have to wait until the socket # becomes writable to be notified when the connection succeed or # fails. - fut.add_done_callback(functools.partial(self._sock_connect_done, - fd)) + fut.add_done_callback( + functools.partial(self._sock_connect_done, fd)) self.add_writer(fd, self._sock_connect_cb, fut, sock, address) except Exception as exc: fut.set_exception(exc) @@ -453,7 +499,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): """ if self._debug and sock.gettimeout() != 0: raise ValueError("the socket must be non-blocking") - fut = futures.Future(loop=self) + fut = self.create_future() self._sock_accept(fut, False, sock) return fut @@ -478,17 +524,17 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): fileobj, (reader, writer) = key.fileobj, key.data if mask & selectors.EVENT_READ and reader is not None: if reader._cancelled: - self.remove_reader(fileobj) + self._remove_reader(fileobj) else: self._add_callback(reader) if mask & selectors.EVENT_WRITE and writer is not None: if writer._cancelled: - self.remove_writer(fileobj) + self._remove_writer(fileobj) else: self._add_callback(writer) def _stop_serving(self, sock): - self.remove_reader(sock.fileno()) + self._remove_reader(sock.fileno()) sock.close() @@ -523,6 +569,7 @@ class _SelectorTransport(transports._FlowControlMixin, self._closing = False # Set when close() called. if self._server is not None: self._server._attach() + loop._transports[self._sock_fd] = self def __repr__(self): info = [self.__class__.__name__] @@ -555,6 +602,12 @@ class _SelectorTransport(transports._FlowControlMixin, def abort(self): self._force_close(None) + def set_protocol(self, protocol): + self._protocol = protocol + + def get_protocol(self): + return self._protocol + def is_closing(self): return self._closing @@ -562,9 +615,10 @@ class _SelectorTransport(transports._FlowControlMixin, if self._closing: return self._closing = True - self._loop.remove_reader(self._sock_fd) + self._loop._remove_reader(self._sock_fd) if not self._buffer: self._conn_lost += 1 + self._loop._remove_writer(self._sock_fd) self._loop.call_soon(self._call_connection_lost, None) # On Python 3.3 and older, objects with a destructor part of a reference @@ -578,8 +632,7 @@ class _SelectorTransport(transports._FlowControlMixin, def _fatal_error(self, exc, message='Fatal error on transport'): # Should be called from exception handler only. - if isinstance(exc, (BrokenPipeError, - ConnectionResetError, ConnectionAbortedError)): + if isinstance(exc, base_events._FATAL_ERROR_IGNORE): if self._loop.get_debug(): logger.debug("%r: %s", self, message, exc_info=True) else: @@ -596,10 +649,10 @@ class _SelectorTransport(transports._FlowControlMixin, return if self._buffer: self._buffer.clear() - self._loop.remove_writer(self._sock_fd) + self._loop._remove_writer(self._sock_fd) if not self._closing: self._closing = True - self._loop.remove_reader(self._sock_fd) + self._loop._remove_reader(self._sock_fd) self._conn_lost += 1 self._loop.call_soon(self._call_connection_lost, exc) @@ -629,9 +682,14 @@ class _SelectorSocketTransport(_SelectorTransport): self._eof = False self._paused = False + # Disable the Nagle algorithm -- small writes will be + # sent without waiting for the TCP ACK. This generally + # decreases the latency (in some cases significantly.) + _set_nodelay(self._sock) + self._loop.call_soon(self._protocol.connection_made, self) # only start reading when connection_made() has been called - self._loop.call_soon(self._loop.add_reader, + self._loop.call_soon(self._loop._add_reader, self._sock_fd, self._read_ready) if waiter is not None: # only wake up the waiter when connection_made() has been called @@ -644,7 +702,7 @@ class _SelectorSocketTransport(_SelectorTransport): if self._paused: raise RuntimeError('Already paused') self._paused = True - self._loop.remove_reader(self._sock_fd) + self._loop._remove_reader(self._sock_fd) if self._loop.get_debug(): logger.debug("%r pauses reading", self) @@ -654,11 +712,13 @@ class _SelectorSocketTransport(_SelectorTransport): self._paused = False if self._closing: return - self._loop.add_reader(self._sock_fd, self._read_ready) + self._loop._add_reader(self._sock_fd, self._read_ready) if self._loop.get_debug(): logger.debug("%r resumes reading", self) def _read_ready(self): + if self._conn_lost: + return try: data = self._sock.recv(self.max_size) except (BlockingIOError, InterruptedError): @@ -676,14 +736,14 @@ class _SelectorSocketTransport(_SelectorTransport): # We're keeping the connection open so the # protocol can write more, but we still can't # receive more, so remove the reader callback. - self._loop.remove_reader(self._sock_fd) + self._loop._remove_reader(self._sock_fd) else: self.close() def write(self, data): if not isinstance(data, (bytes, bytearray, memoryview)): - raise TypeError('data argument must be byte-ish (%r)', - type(data)) + raise TypeError('data argument must be a bytes-like object, ' + 'not %r' % type(data).__name__) if self._eof: raise RuntimeError('Cannot call write() after write_eof()') if not data: @@ -709,7 +769,7 @@ class _SelectorSocketTransport(_SelectorTransport): if not data: return # Not all was written; register write handler. - self._loop.add_writer(self._sock_fd, self._write_ready) + self._loop._add_writer(self._sock_fd, self._write_ready) # Add it to the buffer. self._buffer.extend(data) @@ -718,12 +778,14 @@ class _SelectorSocketTransport(_SelectorTransport): def _write_ready(self): assert self._buffer, 'Data should not be empty' + if self._conn_lost: + return try: n = self._sock.send(self._buffer) except (BlockingIOError, InterruptedError): pass except Exception as exc: - self._loop.remove_writer(self._sock_fd) + self._loop._remove_writer(self._sock_fd) self._buffer.clear() self._fatal_error(exc, 'Fatal write error on socket transport') else: @@ -731,7 +793,7 @@ class _SelectorSocketTransport(_SelectorTransport): del self._buffer[:n] self._maybe_resume_protocol() # May append to buffer. if not self._buffer: - self._loop.remove_writer(self._sock_fd) + self._loop._remove_writer(self._sock_fd) if self._closing: self._call_connection_lost(None) elif self._eof: @@ -802,19 +864,19 @@ class _SelectorSslTransport(_SelectorTransport): try: self._sock.do_handshake() except ssl.SSLWantReadError: - self._loop.add_reader(self._sock_fd, - self._on_handshake, start_time) + self._loop._add_reader(self._sock_fd, + self._on_handshake, start_time) return except ssl.SSLWantWriteError: - self._loop.add_writer(self._sock_fd, - self._on_handshake, start_time) + self._loop._add_writer(self._sock_fd, + self._on_handshake, start_time) return except BaseException as exc: if self._loop.get_debug(): logger.warning("%r: SSL handshake failed", self, exc_info=True) - self._loop.remove_reader(self._sock_fd) - self._loop.remove_writer(self._sock_fd) + self._loop._remove_reader(self._sock_fd) + self._loop._remove_writer(self._sock_fd) self._sock.close() self._wakeup_waiter(exc) if isinstance(exc, Exception): @@ -822,8 +884,8 @@ class _SelectorSslTransport(_SelectorTransport): else: raise - self._loop.remove_reader(self._sock_fd) - self._loop.remove_writer(self._sock_fd) + self._loop._remove_reader(self._sock_fd) + self._loop._remove_writer(self._sock_fd) peercert = self._sock.getpeercert() if not hasattr(self._sslcontext, 'check_hostname'): @@ -851,7 +913,7 @@ class _SelectorSslTransport(_SelectorTransport): self._read_wants_write = False self._write_wants_read = False - self._loop.add_reader(self._sock_fd, self._read_ready) + self._loop._add_reader(self._sock_fd, self._read_ready) self._protocol_connected = True self._loop.call_soon(self._protocol.connection_made, self) # only wake up the waiter when connection_made() has been called @@ -873,7 +935,7 @@ class _SelectorSslTransport(_SelectorTransport): if self._paused: raise RuntimeError('Already paused') self._paused = True - self._loop.remove_reader(self._sock_fd) + self._loop._remove_reader(self._sock_fd) if self._loop.get_debug(): logger.debug("%r pauses reading", self) @@ -883,17 +945,19 @@ class _SelectorSslTransport(_SelectorTransport): self._paused = False if self._closing: return - self._loop.add_reader(self._sock_fd, self._read_ready) + self._loop._add_reader(self._sock_fd, self._read_ready) if self._loop.get_debug(): logger.debug("%r resumes reading", self) def _read_ready(self): + if self._conn_lost: + return if self._write_wants_read: self._write_wants_read = False self._write_ready() if self._buffer: - self._loop.add_writer(self._sock_fd, self._write_ready) + self._loop._add_writer(self._sock_fd, self._write_ready) try: data = self._sock.recv(self.max_size) @@ -901,8 +965,8 @@ class _SelectorSslTransport(_SelectorTransport): pass except ssl.SSLWantWriteError: self._read_wants_write = True - self._loop.remove_reader(self._sock_fd) - self._loop.add_writer(self._sock_fd, self._write_ready) + self._loop._remove_reader(self._sock_fd) + self._loop._add_writer(self._sock_fd, self._write_ready) except Exception as exc: self._fatal_error(exc, 'Fatal read error on SSL transport') else: @@ -920,12 +984,14 @@ class _SelectorSslTransport(_SelectorTransport): self.close() def _write_ready(self): + if self._conn_lost: + return if self._read_wants_write: self._read_wants_write = False self._read_ready() if not (self._paused or self._closing): - self._loop.add_reader(self._sock_fd, self._read_ready) + self._loop._add_reader(self._sock_fd, self._read_ready) if self._buffer: try: @@ -934,10 +1000,10 @@ class _SelectorSslTransport(_SelectorTransport): n = 0 except ssl.SSLWantReadError: n = 0 - self._loop.remove_writer(self._sock_fd) + self._loop._remove_writer(self._sock_fd) self._write_wants_read = True except Exception as exc: - self._loop.remove_writer(self._sock_fd) + self._loop._remove_writer(self._sock_fd) self._buffer.clear() self._fatal_error(exc, 'Fatal write error on SSL transport') return @@ -948,14 +1014,14 @@ class _SelectorSslTransport(_SelectorTransport): self._maybe_resume_protocol() # May append to buffer. if not self._buffer: - self._loop.remove_writer(self._sock_fd) + self._loop._remove_writer(self._sock_fd) if self._closing: self._call_connection_lost(None) def write(self, data): if not isinstance(data, (bytes, bytearray, memoryview)): - raise TypeError('data argument must be byte-ish (%r)', - type(data)) + raise TypeError('data argument must be a bytes-like object, ' + 'not %r' % type(data).__name__) if not data: return @@ -966,7 +1032,7 @@ class _SelectorSslTransport(_SelectorTransport): return if not self._buffer: - self._loop.add_writer(self._sock_fd, self._write_ready) + self._loop._add_writer(self._sock_fd, self._write_ready) # Add it to the buffer. self._buffer.extend(data) @@ -986,7 +1052,7 @@ class _SelectorDatagramTransport(_SelectorTransport): self._address = address self._loop.call_soon(self._protocol.connection_made, self) # only start reading when connection_made() has been called - self._loop.call_soon(self._loop.add_reader, + self._loop.call_soon(self._loop._add_reader, self._sock_fd, self._read_ready) if waiter is not None: # only wake up the waiter when connection_made() has been called @@ -997,6 +1063,8 @@ class _SelectorDatagramTransport(_SelectorTransport): return sum(len(data) for data, _ in self._buffer) def _read_ready(self): + if self._conn_lost: + return try: data, addr = self._sock.recvfrom(self.max_size) except (BlockingIOError, InterruptedError): @@ -1010,8 +1078,8 @@ class _SelectorDatagramTransport(_SelectorTransport): def sendto(self, data, addr=None): if not isinstance(data, (bytes, bytearray, memoryview)): - raise TypeError('data argument must be byte-ish (%r)', - type(data)) + raise TypeError('data argument must be a bytes-like object, ' + 'not %r' % type(data).__name__) if not data: return @@ -1034,7 +1102,7 @@ class _SelectorDatagramTransport(_SelectorTransport): self._sock.sendto(data, addr) return except (BlockingIOError, InterruptedError): - self._loop.add_writer(self._sock_fd, self._sendto_ready) + self._loop._add_writer(self._sock_fd, self._sendto_ready) except OSError as exc: self._protocol.error_received(exc) return @@ -1068,6 +1136,6 @@ class _SelectorDatagramTransport(_SelectorTransport): self._maybe_resume_protocol() # May append to buffer. if not self._buffer: - self._loop.remove_writer(self._sock_fd) + self._loop._remove_writer(self._sock_fd) if self._closing: self._call_connection_lost(None) |