summaryrefslogtreecommitdiffstats
path: root/Lib/asyncio/selector_events.py
diff options
context:
space:
mode:
authorAndrew Svetlov <andrew.svetlov@gmail.com>2018-11-12 17:00:22 (GMT)
committerGitHub <noreply@github.com>2018-11-12 17:00:22 (GMT)
commit74387926072abf338a4c1cec1bf0501fc65bbee5 (patch)
treea3328774e0b0ad8bbd256f0239b392dcd2952b3e /Lib/asyncio/selector_events.py
parent9404e7737bd09bc1df154e1216d721e5168e4c68 (diff)
downloadcpython-74387926072abf338a4c1cec1bf0501fc65bbee5.zip
cpython-74387926072abf338a4c1cec1bf0501fc65bbee5.tar.gz
cpython-74387926072abf338a4c1cec1bf0501fc65bbee5.tar.bz2
bpo-30064: Refactor sock_* asyncio API (#10419)
Diffstat (limited to 'Lib/asyncio/selector_events.py')
-rw-r--r--Lib/asyncio/selector_events.py89
1 files changed, 49 insertions, 40 deletions
diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py
index 116c08d..ad09300 100644
--- a/Lib/asyncio/selector_events.py
+++ b/Lib/asyncio/selector_events.py
@@ -358,26 +358,29 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
"""
if self._debug and sock.gettimeout() != 0:
raise ValueError("the socket must be non-blocking")
+ try:
+ return sock.recv(n)
+ except (BlockingIOError, InterruptedError):
+ pass
fut = self.create_future()
- self._sock_recv(fut, None, sock, n)
+ fd = sock.fileno()
+ self.add_reader(fd, self._sock_recv, fut, sock, n)
+ fut.add_done_callback(
+ functools.partial(self._sock_read_done, fd))
return await fut
- def _sock_recv(self, fut, registered_fd, sock, n):
+ def _sock_read_done(self, fd, fut):
+ self.remove_reader(fd)
+
+ def _sock_recv(self, fut, sock, n):
# _sock_recv() can add itself as an I/O callback if the operation can't
# be done immediately. Don't use it directly, call sock_recv().
- if registered_fd is not None:
- # Remove the callback early. It should be rare that the
- # selector says the fd is ready but the call still returns
- # EAGAIN, and I am willing to take a hit in that case in
- # order to simplify the common case.
- self.remove_reader(registered_fd)
- if fut.cancelled():
+ if fut.done():
return
try:
data = sock.recv(n)
except (BlockingIOError, InterruptedError):
- fd = sock.fileno()
- self.add_reader(fd, self._sock_recv, fut, fd, sock, n)
+ return # try again next time
except Exception as exc:
fut.set_exception(exc)
else:
@@ -391,27 +394,27 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
"""
if self._debug and sock.gettimeout() != 0:
raise ValueError("the socket must be non-blocking")
+ try:
+ return sock.recv_into(buf)
+ except (BlockingIOError, InterruptedError):
+ pass
fut = self.create_future()
- self._sock_recv_into(fut, None, sock, buf)
+ fd = sock.fileno()
+ self.add_reader(fd, self._sock_recv_into, fut, sock, buf)
+ fut.add_done_callback(
+ functools.partial(self._sock_read_done, fd))
return await fut
- def _sock_recv_into(self, fut, registered_fd, sock, buf):
+ def _sock_recv_into(self, fut, sock, buf):
# _sock_recv_into() can add itself as an I/O callback if the operation
# can't be done immediately. Don't use it directly, call
# sock_recv_into().
- if registered_fd is not None:
- # Remove the callback early. It should be rare that the
- # selector says the FD is ready but the call still returns
- # EAGAIN, and I am willing to take a hit in that case in
- # order to simplify the common case.
- self.remove_reader(registered_fd)
- if fut.cancelled():
+ if fut.done():
return
try:
nbytes = sock.recv_into(buf)
except (BlockingIOError, InterruptedError):
- fd = sock.fileno()
- self.add_reader(fd, self._sock_recv_into, fut, fd, sock, buf)
+ return # try again next time
except Exception as exc:
fut.set_exception(exc)
else:
@@ -428,23 +431,32 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
"""
if self._debug and sock.gettimeout() != 0:
raise ValueError("the socket must be non-blocking")
- fut = self.create_future()
- if data:
- self._sock_sendall(fut, None, sock, data)
+ try:
+ n = sock.send(data)
+ except (BlockingIOError, InterruptedError):
+ n = 0
+
+ if n == len(data):
+ # all data sent
+ return
else:
- fut.set_result(None)
+ data = bytearray(memoryview(data)[n:])
+
+ fut = self.create_future()
+ fd = sock.fileno()
+ fut.add_done_callback(
+ functools.partial(self._sock_write_done, fd))
+ self.add_writer(fd, self._sock_sendall, fut, sock, data)
return await fut
- def _sock_sendall(self, fut, registered_fd, sock, data):
- if registered_fd is not None:
- self.remove_writer(registered_fd)
- if fut.cancelled():
+ def _sock_sendall(self, fut, sock, data):
+ if fut.done():
+ # Future cancellation can be scheduled on previous loop iteration
return
-
try:
n = sock.send(data)
except (BlockingIOError, InterruptedError):
- n = 0
+ return
except Exception as exc:
fut.set_exception(exc)
return
@@ -452,10 +464,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
if n == len(data):
fut.set_result(None)
else:
- if n:
- data = data[n:]
- fd = sock.fileno()
- self.add_writer(fd, self._sock_sendall, fut, fd, sock, data)
+ del data[:n]
async def sock_connect(self, sock, address):
"""Connect to a remote socket at address.
@@ -484,18 +493,18 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
# becomes writable to be notified when the connection succeed or
# fails.
fut.add_done_callback(
- functools.partial(self._sock_connect_done, fd))
+ functools.partial(self._sock_write_done, fd))
self.add_writer(fd, self._sock_connect_cb, fut, sock, address)
except Exception as exc:
fut.set_exception(exc)
else:
fut.set_result(None)
- def _sock_connect_done(self, fd, fut):
+ def _sock_write_done(self, fd, fut):
self.remove_writer(fd)
def _sock_connect_cb(self, fut, sock, address):
- if fut.cancelled():
+ if fut.done():
return
try:
@@ -529,7 +538,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
fd = sock.fileno()
if registered:
self.remove_reader(fd)
- if fut.cancelled():
+ if fut.done():
return
try:
conn, address = sock.accept()