diff options
author | Andrew Svetlov <andrew.svetlov@gmail.com> | 2018-11-12 17:00:22 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-11-12 17:00:22 (GMT) |
commit | 74387926072abf338a4c1cec1bf0501fc65bbee5 (patch) | |
tree | a3328774e0b0ad8bbd256f0239b392dcd2952b3e /Lib/asyncio/selector_events.py | |
parent | 9404e7737bd09bc1df154e1216d721e5168e4c68 (diff) | |
download | cpython-74387926072abf338a4c1cec1bf0501fc65bbee5.zip cpython-74387926072abf338a4c1cec1bf0501fc65bbee5.tar.gz cpython-74387926072abf338a4c1cec1bf0501fc65bbee5.tar.bz2 |
bpo-30064: Refactor sock_* asyncio API (#10419)
Diffstat (limited to 'Lib/asyncio/selector_events.py')
-rw-r--r-- | Lib/asyncio/selector_events.py | 89 |
1 files changed, 49 insertions, 40 deletions
diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py index 116c08d..ad09300 100644 --- a/Lib/asyncio/selector_events.py +++ b/Lib/asyncio/selector_events.py @@ -358,26 +358,29 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): """ if self._debug and sock.gettimeout() != 0: raise ValueError("the socket must be non-blocking") + try: + return sock.recv(n) + except (BlockingIOError, InterruptedError): + pass fut = self.create_future() - self._sock_recv(fut, None, sock, n) + fd = sock.fileno() + self.add_reader(fd, self._sock_recv, fut, sock, n) + fut.add_done_callback( + functools.partial(self._sock_read_done, fd)) return await fut - def _sock_recv(self, fut, registered_fd, sock, n): + def _sock_read_done(self, fd, fut): + self.remove_reader(fd) + + def _sock_recv(self, fut, sock, n): # _sock_recv() can add itself as an I/O callback if the operation can't # be done immediately. Don't use it directly, call sock_recv(). - if registered_fd is not None: - # Remove the callback early. It should be rare that the - # selector says the fd is ready but the call still returns - # EAGAIN, and I am willing to take a hit in that case in - # order to simplify the common case. - self.remove_reader(registered_fd) - if fut.cancelled(): + if fut.done(): return try: data = sock.recv(n) except (BlockingIOError, InterruptedError): - fd = sock.fileno() - self.add_reader(fd, self._sock_recv, fut, fd, sock, n) + return # try again next time except Exception as exc: fut.set_exception(exc) else: @@ -391,27 +394,27 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): """ if self._debug and sock.gettimeout() != 0: raise ValueError("the socket must be non-blocking") + try: + return sock.recv_into(buf) + except (BlockingIOError, InterruptedError): + pass fut = self.create_future() - self._sock_recv_into(fut, None, sock, buf) + fd = sock.fileno() + self.add_reader(fd, self._sock_recv_into, fut, sock, buf) + fut.add_done_callback( + functools.partial(self._sock_read_done, fd)) return await fut - def _sock_recv_into(self, fut, registered_fd, sock, buf): + def _sock_recv_into(self, fut, sock, buf): # _sock_recv_into() can add itself as an I/O callback if the operation # can't be done immediately. Don't use it directly, call # sock_recv_into(). - if registered_fd is not None: - # Remove the callback early. It should be rare that the - # selector says the FD is ready but the call still returns - # EAGAIN, and I am willing to take a hit in that case in - # order to simplify the common case. - self.remove_reader(registered_fd) - if fut.cancelled(): + if fut.done(): return try: nbytes = sock.recv_into(buf) except (BlockingIOError, InterruptedError): - fd = sock.fileno() - self.add_reader(fd, self._sock_recv_into, fut, fd, sock, buf) + return # try again next time except Exception as exc: fut.set_exception(exc) else: @@ -428,23 +431,32 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): """ if self._debug and sock.gettimeout() != 0: raise ValueError("the socket must be non-blocking") - fut = self.create_future() - if data: - self._sock_sendall(fut, None, sock, data) + try: + n = sock.send(data) + except (BlockingIOError, InterruptedError): + n = 0 + + if n == len(data): + # all data sent + return else: - fut.set_result(None) + data = bytearray(memoryview(data)[n:]) + + fut = self.create_future() + fd = sock.fileno() + fut.add_done_callback( + functools.partial(self._sock_write_done, fd)) + self.add_writer(fd, self._sock_sendall, fut, sock, data) return await fut - def _sock_sendall(self, fut, registered_fd, sock, data): - if registered_fd is not None: - self.remove_writer(registered_fd) - if fut.cancelled(): + def _sock_sendall(self, fut, sock, data): + if fut.done(): + # Future cancellation can be scheduled on previous loop iteration return - try: n = sock.send(data) except (BlockingIOError, InterruptedError): - n = 0 + return except Exception as exc: fut.set_exception(exc) return @@ -452,10 +464,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): if n == len(data): fut.set_result(None) else: - if n: - data = data[n:] - fd = sock.fileno() - self.add_writer(fd, self._sock_sendall, fut, fd, sock, data) + del data[:n] async def sock_connect(self, sock, address): """Connect to a remote socket at address. @@ -484,18 +493,18 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): # becomes writable to be notified when the connection succeed or # fails. fut.add_done_callback( - functools.partial(self._sock_connect_done, fd)) + functools.partial(self._sock_write_done, fd)) self.add_writer(fd, self._sock_connect_cb, fut, sock, address) except Exception as exc: fut.set_exception(exc) else: fut.set_result(None) - def _sock_connect_done(self, fd, fut): + def _sock_write_done(self, fd, fut): self.remove_writer(fd) def _sock_connect_cb(self, fut, sock, address): - if fut.cancelled(): + if fut.done(): return try: @@ -529,7 +538,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): fd = sock.fileno() if registered: self.remove_reader(fd) - if fut.cancelled(): + if fut.done(): return try: conn, address = sock.accept() |