summaryrefslogtreecommitdiffstats
path: root/Lib/asyncio/selector_events.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/asyncio/selector_events.py')
-rw-r--r--Lib/asyncio/selector_events.py61
1 files changed, 38 insertions, 23 deletions
diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py
index 5b26631..ed2b4d7 100644
--- a/Lib/asyncio/selector_events.py
+++ b/Lib/asyncio/selector_events.py
@@ -196,7 +196,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
transport = None
try:
protocol = protocol_factory()
- waiter = futures.Future(loop=self)
+ waiter = self.create_future()
if sslcontext:
transport = self._make_ssl_transport(
conn, protocol, sslcontext, waiter=waiter,
@@ -314,7 +314,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
"""
if self._debug and sock.gettimeout() != 0:
raise ValueError("the socket must be non-blocking")
- fut = futures.Future(loop=self)
+ fut = self.create_future()
self._sock_recv(fut, False, sock, n)
return fut
@@ -352,7 +352,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
"""
if self._debug and sock.gettimeout() != 0:
raise ValueError("the socket must be non-blocking")
- fut = futures.Future(loop=self)
+ fut = self.create_future()
if data:
self._sock_sendall(fut, False, sock, data)
else:
@@ -385,24 +385,29 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
def sock_connect(self, sock, address):
"""Connect to a remote socket at address.
- The address must be already resolved to avoid the trap of hanging the
- entire event loop when the address requires doing a DNS lookup. For
- example, it must be an IP address, not an hostname, for AF_INET and
- AF_INET6 address families. Use getaddrinfo() to resolve the hostname
- asynchronously.
-
This method is a coroutine.
"""
if self._debug and sock.gettimeout() != 0:
raise ValueError("the socket must be non-blocking")
- fut = futures.Future(loop=self)
+
+ fut = self.create_future()
+ if hasattr(socket, 'AF_UNIX') and sock.family == socket.AF_UNIX:
+ self._sock_connect(fut, sock, address)
+ else:
+ resolved = base_events._ensure_resolved(
+ address, family=sock.family, proto=sock.proto, loop=self)
+ resolved.add_done_callback(
+ lambda resolved: self._on_resolved(fut, sock, resolved))
+
+ return fut
+
+ def _on_resolved(self, fut, sock, resolved):
try:
- base_events._check_resolved_address(sock, address)
- except ValueError as err:
- fut.set_exception(err)
+ _, _, _, _, address = resolved.result()[0]
+ except Exception as exc:
+ fut.set_exception(exc)
else:
self._sock_connect(fut, sock, address)
- return fut
def _sock_connect(self, fut, sock, address):
fd = sock.fileno()
@@ -453,7 +458,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
"""
if self._debug and sock.gettimeout() != 0:
raise ValueError("the socket must be non-blocking")
- fut = futures.Future(loop=self)
+ fut = self.create_future()
self._sock_accept(fut, False, sock)
return fut
@@ -565,6 +570,7 @@ class _SelectorTransport(transports._FlowControlMixin,
self._loop.remove_reader(self._sock_fd)
if not self._buffer:
self._conn_lost += 1
+ self._loop.remove_writer(self._sock_fd)
self._loop.call_soon(self._call_connection_lost, None)
# On Python 3.3 and older, objects with a destructor part of a reference
@@ -578,8 +584,7 @@ class _SelectorTransport(transports._FlowControlMixin,
def _fatal_error(self, exc, message='Fatal error on transport'):
# Should be called from exception handler only.
- if isinstance(exc, (BrokenPipeError,
- ConnectionResetError, ConnectionAbortedError)):
+ if isinstance(exc, base_events._FATAL_ERROR_IGNORE):
if self._loop.get_debug():
logger.debug("%r: %s", self, message, exc_info=True)
else:
@@ -659,6 +664,8 @@ class _SelectorSocketTransport(_SelectorTransport):
logger.debug("%r resumes reading", self)
def _read_ready(self):
+ if self._conn_lost:
+ return
try:
data = self._sock.recv(self.max_size)
except (BlockingIOError, InterruptedError):
@@ -682,8 +689,8 @@ class _SelectorSocketTransport(_SelectorTransport):
def write(self, data):
if not isinstance(data, (bytes, bytearray, memoryview)):
- raise TypeError('data argument must be byte-ish (%r)',
- type(data))
+ raise TypeError('data argument must be a bytes-like object, '
+ 'not %r' % type(data).__name__)
if self._eof:
raise RuntimeError('Cannot call write() after write_eof()')
if not data:
@@ -718,6 +725,8 @@ class _SelectorSocketTransport(_SelectorTransport):
def _write_ready(self):
assert self._buffer, 'Data should not be empty'
+ if self._conn_lost:
+ return
try:
n = self._sock.send(self._buffer)
except (BlockingIOError, InterruptedError):
@@ -888,6 +897,8 @@ class _SelectorSslTransport(_SelectorTransport):
logger.debug("%r resumes reading", self)
def _read_ready(self):
+ if self._conn_lost:
+ return
if self._write_wants_read:
self._write_wants_read = False
self._write_ready()
@@ -920,6 +931,8 @@ class _SelectorSslTransport(_SelectorTransport):
self.close()
def _write_ready(self):
+ if self._conn_lost:
+ return
if self._read_wants_write:
self._read_wants_write = False
self._read_ready()
@@ -954,8 +967,8 @@ class _SelectorSslTransport(_SelectorTransport):
def write(self, data):
if not isinstance(data, (bytes, bytearray, memoryview)):
- raise TypeError('data argument must be byte-ish (%r)',
- type(data))
+ raise TypeError('data argument must be a bytes-like object, '
+ 'not %r' % type(data).__name__)
if not data:
return
@@ -997,6 +1010,8 @@ class _SelectorDatagramTransport(_SelectorTransport):
return sum(len(data) for data, _ in self._buffer)
def _read_ready(self):
+ if self._conn_lost:
+ return
try:
data, addr = self._sock.recvfrom(self.max_size)
except (BlockingIOError, InterruptedError):
@@ -1010,8 +1025,8 @@ class _SelectorDatagramTransport(_SelectorTransport):
def sendto(self, data, addr=None):
if not isinstance(data, (bytes, bytearray, memoryview)):
- raise TypeError('data argument must be byte-ish (%r)',
- type(data))
+ raise TypeError('data argument must be a bytes-like object, '
+ 'not %r' % type(data).__name__)
if not data:
return