diff options
Diffstat (limited to 'Lib/asyncio/selector_events.py')
-rw-r--r-- | Lib/asyncio/selector_events.py | 61 |
1 files changed, 38 insertions, 23 deletions
diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py index 5b26631..ed2b4d7 100644 --- a/Lib/asyncio/selector_events.py +++ b/Lib/asyncio/selector_events.py @@ -196,7 +196,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): transport = None try: protocol = protocol_factory() - waiter = futures.Future(loop=self) + waiter = self.create_future() if sslcontext: transport = self._make_ssl_transport( conn, protocol, sslcontext, waiter=waiter, @@ -314,7 +314,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): """ if self._debug and sock.gettimeout() != 0: raise ValueError("the socket must be non-blocking") - fut = futures.Future(loop=self) + fut = self.create_future() self._sock_recv(fut, False, sock, n) return fut @@ -352,7 +352,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): """ if self._debug and sock.gettimeout() != 0: raise ValueError("the socket must be non-blocking") - fut = futures.Future(loop=self) + fut = self.create_future() if data: self._sock_sendall(fut, False, sock, data) else: @@ -385,24 +385,29 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): def sock_connect(self, sock, address): """Connect to a remote socket at address. - The address must be already resolved to avoid the trap of hanging the - entire event loop when the address requires doing a DNS lookup. For - example, it must be an IP address, not an hostname, for AF_INET and - AF_INET6 address families. Use getaddrinfo() to resolve the hostname - asynchronously. - This method is a coroutine. """ if self._debug and sock.gettimeout() != 0: raise ValueError("the socket must be non-blocking") - fut = futures.Future(loop=self) + + fut = self.create_future() + if hasattr(socket, 'AF_UNIX') and sock.family == socket.AF_UNIX: + self._sock_connect(fut, sock, address) + else: + resolved = base_events._ensure_resolved( + address, family=sock.family, proto=sock.proto, loop=self) + resolved.add_done_callback( + lambda resolved: self._on_resolved(fut, sock, resolved)) + + return fut + + def _on_resolved(self, fut, sock, resolved): try: - base_events._check_resolved_address(sock, address) - except ValueError as err: - fut.set_exception(err) + _, _, _, _, address = resolved.result()[0] + except Exception as exc: + fut.set_exception(exc) else: self._sock_connect(fut, sock, address) - return fut def _sock_connect(self, fut, sock, address): fd = sock.fileno() @@ -453,7 +458,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): """ if self._debug and sock.gettimeout() != 0: raise ValueError("the socket must be non-blocking") - fut = futures.Future(loop=self) + fut = self.create_future() self._sock_accept(fut, False, sock) return fut @@ -565,6 +570,7 @@ class _SelectorTransport(transports._FlowControlMixin, self._loop.remove_reader(self._sock_fd) if not self._buffer: self._conn_lost += 1 + self._loop.remove_writer(self._sock_fd) self._loop.call_soon(self._call_connection_lost, None) # On Python 3.3 and older, objects with a destructor part of a reference @@ -578,8 +584,7 @@ class _SelectorTransport(transports._FlowControlMixin, def _fatal_error(self, exc, message='Fatal error on transport'): # Should be called from exception handler only. - if isinstance(exc, (BrokenPipeError, - ConnectionResetError, ConnectionAbortedError)): + if isinstance(exc, base_events._FATAL_ERROR_IGNORE): if self._loop.get_debug(): logger.debug("%r: %s", self, message, exc_info=True) else: @@ -659,6 +664,8 @@ class _SelectorSocketTransport(_SelectorTransport): logger.debug("%r resumes reading", self) def _read_ready(self): + if self._conn_lost: + return try: data = self._sock.recv(self.max_size) except (BlockingIOError, InterruptedError): @@ -682,8 +689,8 @@ class _SelectorSocketTransport(_SelectorTransport): def write(self, data): if not isinstance(data, (bytes, bytearray, memoryview)): - raise TypeError('data argument must be byte-ish (%r)', - type(data)) + raise TypeError('data argument must be a bytes-like object, ' + 'not %r' % type(data).__name__) if self._eof: raise RuntimeError('Cannot call write() after write_eof()') if not data: @@ -718,6 +725,8 @@ class _SelectorSocketTransport(_SelectorTransport): def _write_ready(self): assert self._buffer, 'Data should not be empty' + if self._conn_lost: + return try: n = self._sock.send(self._buffer) except (BlockingIOError, InterruptedError): @@ -888,6 +897,8 @@ class _SelectorSslTransport(_SelectorTransport): logger.debug("%r resumes reading", self) def _read_ready(self): + if self._conn_lost: + return if self._write_wants_read: self._write_wants_read = False self._write_ready() @@ -920,6 +931,8 @@ class _SelectorSslTransport(_SelectorTransport): self.close() def _write_ready(self): + if self._conn_lost: + return if self._read_wants_write: self._read_wants_write = False self._read_ready() @@ -954,8 +967,8 @@ class _SelectorSslTransport(_SelectorTransport): def write(self, data): if not isinstance(data, (bytes, bytearray, memoryview)): - raise TypeError('data argument must be byte-ish (%r)', - type(data)) + raise TypeError('data argument must be a bytes-like object, ' + 'not %r' % type(data).__name__) if not data: return @@ -997,6 +1010,8 @@ class _SelectorDatagramTransport(_SelectorTransport): return sum(len(data) for data, _ in self._buffer) def _read_ready(self): + if self._conn_lost: + return try: data, addr = self._sock.recvfrom(self.max_size) except (BlockingIOError, InterruptedError): @@ -1010,8 +1025,8 @@ class _SelectorDatagramTransport(_SelectorTransport): def sendto(self, data, addr=None): if not isinstance(data, (bytes, bytearray, memoryview)): - raise TypeError('data argument must be byte-ish (%r)', - type(data)) + raise TypeError('data argument must be a bytes-like object, ' + 'not %r' % type(data).__name__) if not data: return |