diff options
author | Fantix King <fantix.king@gmail.com> | 2020-05-27 19:47:30 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-05-27 19:47:30 (GMT) |
commit | 210a137396979d747c2602eeef46c34fc4955448 (patch) | |
tree | 4f667dafb62453bcea604a96e10a9bc1d2a21835 /Lib/asyncio/selector_events.py | |
parent | 526e23f1538134b728c21ac71ac977ae9e6a8de6 (diff) | |
download | cpython-210a137396979d747c2602eeef46c34fc4955448.zip cpython-210a137396979d747c2602eeef46c34fc4955448.tar.gz cpython-210a137396979d747c2602eeef46c34fc4955448.tar.bz2 |
bpo-30064: Fix asyncio loop.sock_* race condition issue (#20369)
Diffstat (limited to 'Lib/asyncio/selector_events.py')
-rw-r--r-- | Lib/asyncio/selector_events.py | 41 |
1 files changed, 25 insertions, 16 deletions
diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py index a05cbb6..884a58f 100644 --- a/Lib/asyncio/selector_events.py +++ b/Lib/asyncio/selector_events.py @@ -266,6 +266,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): (handle, writer)) if reader is not None: reader.cancel() + return handle def _remove_reader(self, fd): if self.is_closed(): @@ -302,6 +303,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): (reader, handle)) if writer is not None: writer.cancel() + return handle def _remove_writer(self, fd): """Remove a writer callback.""" @@ -329,7 +331,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): def add_reader(self, fd, callback, *args): """Add a reader callback.""" self._ensure_fd_no_transport(fd) - return self._add_reader(fd, callback, *args) + self._add_reader(fd, callback, *args) def remove_reader(self, fd): """Remove a reader callback.""" @@ -339,7 +341,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): def add_writer(self, fd, callback, *args): """Add a writer callback..""" self._ensure_fd_no_transport(fd) - return self._add_writer(fd, callback, *args) + self._add_writer(fd, callback, *args) def remove_writer(self, fd): """Remove a writer callback.""" @@ -362,13 +364,15 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): pass fut = self.create_future() fd = sock.fileno() - self.add_reader(fd, self._sock_recv, fut, sock, n) + self._ensure_fd_no_transport(fd) + handle = self._add_reader(fd, self._sock_recv, fut, sock, n) fut.add_done_callback( - functools.partial(self._sock_read_done, fd)) + functools.partial(self._sock_read_done, fd, handle=handle)) return await fut - def _sock_read_done(self, fd, fut): - self.remove_reader(fd) + def _sock_read_done(self, fd, fut, handle=None): + if handle is None or not handle.cancelled(): + self.remove_reader(fd) def _sock_recv(self, fut, sock, n): # _sock_recv() can add itself as an I/O callback if the operation can't @@ -401,9 +405,10 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): pass fut = self.create_future() fd = sock.fileno() - self.add_reader(fd, self._sock_recv_into, fut, sock, buf) + self._ensure_fd_no_transport(fd) + handle = self._add_reader(fd, self._sock_recv_into, fut, sock, buf) fut.add_done_callback( - functools.partial(self._sock_read_done, fd)) + functools.partial(self._sock_read_done, fd, handle=handle)) return await fut def _sock_recv_into(self, fut, sock, buf): @@ -446,11 +451,12 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): fut = self.create_future() fd = sock.fileno() - fut.add_done_callback( - functools.partial(self._sock_write_done, fd)) + self._ensure_fd_no_transport(fd) # use a trick with a list in closure to store a mutable state - self.add_writer(fd, self._sock_sendall, fut, sock, - memoryview(data), [n]) + handle = self._add_writer(fd, self._sock_sendall, fut, sock, + memoryview(data), [n]) + fut.add_done_callback( + functools.partial(self._sock_write_done, fd, handle=handle)) return await fut def _sock_sendall(self, fut, sock, view, pos): @@ -502,9 +508,11 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): # connection runs in background. We have to wait until the socket # becomes writable to be notified when the connection succeed or # fails. + self._ensure_fd_no_transport(fd) + handle = self._add_writer( + fd, self._sock_connect_cb, fut, sock, address) fut.add_done_callback( - functools.partial(self._sock_write_done, fd)) - self.add_writer(fd, self._sock_connect_cb, fut, sock, address) + functools.partial(self._sock_write_done, fd, handle=handle)) except (SystemExit, KeyboardInterrupt): raise except BaseException as exc: @@ -512,8 +520,9 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): else: fut.set_result(None) - def _sock_write_done(self, fd, fut): - self.remove_writer(fd) + def _sock_write_done(self, fd, fut, handle=None): + if handle is None or not handle.cancelled(): + self.remove_writer(fd) def _sock_connect_cb(self, fut, sock, address): if fut.done(): |