diff options
author | Victor Stinner <vstinner@redhat.com> | 2018-06-07 22:25:52 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-06-07 22:25:52 (GMT) |
commit | 79790bc35fe722a49977b52647f9b5fe1deda2b7 (patch) | |
tree | b1b3da290ccfb7055c94e937cd1edc0282bf7a38 /Lib/asyncio | |
parent | d3ed67d14ed401dfe2b5d07b6941adc3ecacb268 (diff) | |
download | cpython-79790bc35fe722a49977b52647f9b5fe1deda2b7.zip cpython-79790bc35fe722a49977b52647f9b5fe1deda2b7.tar.gz cpython-79790bc35fe722a49977b52647f9b5fe1deda2b7.tar.bz2 |
bpo-33694: Fix race condition in asyncio proactor (GH-7498)
The cancellation of an overlapped WSARecv() has a race condition
which causes data loss because of the current implementation of
proactor in asyncio.
No longer cancel overlapped WSARecv() in _ProactorReadPipeTransport
to work around the race condition.
Remove the optimized recv_into() implementation to get simple
implementation of pause_reading() using the single _pending_data
attribute.
Move _feed_data_to_bufferred_proto() to protocols.py.
Remove set_protocol() method which became useless.
Diffstat (limited to 'Lib/asyncio')
-rw-r--r-- | Lib/asyncio/proactor_events.py | 178 | ||||
-rw-r--r-- | Lib/asyncio/protocols.py | 19 | ||||
-rw-r--r-- | Lib/asyncio/sslproto.py | 21 |
3 files changed, 73 insertions, 145 deletions
diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py index 337ed0f..d9cfdff 100644 --- a/Lib/asyncio/proactor_events.py +++ b/Lib/asyncio/proactor_events.py @@ -159,27 +159,13 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport, def __init__(self, loop, sock, protocol, waiter=None, extra=None, server=None): - self._loop_reading_cb = None + self._pending_data = None self._paused = True super().__init__(loop, sock, protocol, waiter, extra, server) - self._reschedule_on_resume = False self._loop.call_soon(self._loop_reading) self._paused = False - def set_protocol(self, protocol): - if isinstance(protocol, protocols.BufferedProtocol): - self._loop_reading_cb = self._loop_reading__get_buffer - else: - self._loop_reading_cb = self._loop_reading__data_received - - super().set_protocol(protocol) - - if self.is_reading(): - # reset reading callback / buffers / self._read_fut - self.pause_reading() - self.resume_reading() - def is_reading(self): return not self._paused and not self._closing @@ -188,17 +174,16 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport, return self._paused = True - if self._read_fut is not None and not self._read_fut.done(): - # TODO: This is an ugly hack to cancel the current read future - # *and* avoid potential race conditions, as read cancellation - # goes through `future.cancel()` and `loop.call_soon()`. - # We then use this special attribute in the reader callback to - # exit *immediately* without doing any cleanup/rescheduling. - self._read_fut.__asyncio_cancelled_on_pause__ = True - - self._read_fut.cancel() - self._read_fut = None - self._reschedule_on_resume = True + # bpo-33694: Don't cancel self._read_fut because cancelling an + # overlapped WSASend() loss silently data with the current proactor + # implementation. + # + # If CancelIoEx() fails with ERROR_NOT_FOUND, it means that WSASend() + # completed (even if HasOverlappedIoCompleted() returns 0), but + # Overlapped.cancel() currently silently ignores the ERROR_NOT_FOUND + # error. Once the overlapped is ignored, the IOCP loop will ignores the + # completion I/O event and so not read the result of the overlapped + # WSARecv(). if self._loop.get_debug(): logger.debug("%r pauses reading", self) @@ -206,14 +191,22 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport, def resume_reading(self): if self._closing or not self._paused: return + self._paused = False - if self._reschedule_on_resume: - self._loop.call_soon(self._loop_reading, self._read_fut) - self._reschedule_on_resume = False + if self._read_fut is None: + self._loop.call_soon(self._loop_reading, None) + + data = self._pending_data + self._pending_data = None + if data is not None: + # Call the protocol methode after calling _loop_reading(), + # since the protocol can decide to pause reading again. + self._loop.call_soon(self._data_received, data) + if self._loop.get_debug(): logger.debug("%r resumes reading", self) - def _loop_reading__on_eof(self): + def _eof_received(self): if self._loop.get_debug(): logger.debug("%r received EOF", self) @@ -227,18 +220,30 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport, if not keep_open: self.close() - def _loop_reading(self, fut=None): - self._loop_reading_cb(fut) - - def _loop_reading__data_received(self, fut): - if (fut is not None and - getattr(fut, '__asyncio_cancelled_on_pause__', False)): + def _data_received(self, data): + if self._paused: + # Don't call any protocol method while reading is paused. + # The protocol will be called on resume_reading(). + assert self._pending_data is None + self._pending_data = data return - if self._paused: - self._reschedule_on_resume = True + if not data: + self._eof_received() return + if isinstance(self._protocol, protocols.BufferedProtocol): + try: + protocols._feed_data_to_bufferred_proto(self._protocol, data) + except Exception as exc: + self._fatal_error(exc, + 'Fatal error: protocol.buffer_updated() ' + 'call failed.') + return + else: + self._protocol.data_received(data) + + def _loop_reading(self, fut=None): data = None try: if fut is not None: @@ -261,8 +266,12 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport, # we got end-of-file so no need to reschedule a new read return - # reschedule a new read - self._read_fut = self._loop._proactor.recv(self._sock, 32768) + # bpo-33694: buffer_updated() has currently no fast path because of + # a data loss issue caused by overlapped WSASend() cancellation. + + if not self._paused: + # reschedule a new read + self._read_fut = self._loop._proactor.recv(self._sock, 32768) except ConnectionAbortedError as exc: if not self._closing: self._fatal_error(exc, 'Fatal read error on pipe transport') @@ -277,92 +286,11 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport, if not self._closing: raise else: - self._read_fut.add_done_callback(self._loop_reading__data_received) + if not self._paused: + self._read_fut.add_done_callback(self._loop_reading) finally: - if data: - self._protocol.data_received(data) - elif data == b'': - self._loop_reading__on_eof() - - def _loop_reading__get_buffer(self, fut): - if (fut is not None and - getattr(fut, '__asyncio_cancelled_on_pause__', False)): - return - - if self._paused: - self._reschedule_on_resume = True - return - - nbytes = None - if fut is not None: - assert self._read_fut is fut or (self._read_fut is None and - self._closing) - self._read_fut = None - try: - if fut.done(): - nbytes = fut.result() - else: - # the future will be replaced by next proactor.recv call - fut.cancel() - except ConnectionAbortedError as exc: - if not self._closing: - self._fatal_error( - exc, 'Fatal read error on pipe transport') - elif self._loop.get_debug(): - logger.debug("Read error on pipe transport while closing", - exc_info=True) - except ConnectionResetError as exc: - self._force_close(exc) - except OSError as exc: - self._fatal_error(exc, 'Fatal read error on pipe transport') - except futures.CancelledError: - if not self._closing: - raise - - if nbytes is not None: - if nbytes == 0: - # we got end-of-file so no need to reschedule a new read - self._loop_reading__on_eof() - else: - try: - self._protocol.buffer_updated(nbytes) - except Exception as exc: - self._fatal_error( - exc, - 'Fatal error: ' - 'protocol.buffer_updated() call failed.') - return - - if self._closing or nbytes == 0: - # since close() has been called we ignore any read data - return - - try: - buf = self._protocol.get_buffer(-1) - if not len(buf): - raise RuntimeError('get_buffer() returned an empty buffer') - except Exception as exc: - self._fatal_error( - exc, 'Fatal error: protocol.get_buffer() call failed.') - return - - try: - # schedule a new read - self._read_fut = self._loop._proactor.recv_into(self._sock, buf) - self._read_fut.add_done_callback(self._loop_reading__get_buffer) - except ConnectionAbortedError as exc: - if not self._closing: - self._fatal_error(exc, 'Fatal read error on pipe transport') - elif self._loop.get_debug(): - logger.debug("Read error on pipe transport while closing", - exc_info=True) - except ConnectionResetError as exc: - self._force_close(exc) - except OSError as exc: - self._fatal_error(exc, 'Fatal read error on pipe transport') - except futures.CancelledError: - if not self._closing: - raise + if data is not None: + self._data_received(data) class _ProactorBaseWritePipeTransport(_ProactorBasePipeTransport, diff --git a/Lib/asyncio/protocols.py b/Lib/asyncio/protocols.py index b8d2e6b..4d47da3 100644 --- a/Lib/asyncio/protocols.py +++ b/Lib/asyncio/protocols.py @@ -189,3 +189,22 @@ class SubprocessProtocol(BaseProtocol): def process_exited(self): """Called when subprocess has exited.""" + + +def _feed_data_to_bufferred_proto(proto, data): + data_len = len(data) + while data_len: + buf = proto.get_buffer(data_len) + buf_len = len(buf) + if not buf_len: + raise RuntimeError('get_buffer() returned an empty buffer') + + if buf_len >= data_len: + buf[:data_len] = data + proto.buffer_updated(data_len) + return + else: + buf[:buf_len] = data[:buf_len] + proto.buffer_updated(buf_len) + data = data[buf_len:] + data_len = len(data) diff --git a/Lib/asyncio/sslproto.py b/Lib/asyncio/sslproto.py index fac2ae7..5578c6f 100644 --- a/Lib/asyncio/sslproto.py +++ b/Lib/asyncio/sslproto.py @@ -535,7 +535,7 @@ class SSLProtocol(protocols.Protocol): if chunk: try: if self._app_protocol_is_buffer: - _feed_data_to_bufferred_proto( + protocols._feed_data_to_bufferred_proto( self._app_protocol, chunk) else: self._app_protocol.data_received(chunk) @@ -721,22 +721,3 @@ class SSLProtocol(protocols.Protocol): self._transport.abort() finally: self._finalize() - - -def _feed_data_to_bufferred_proto(proto, data): - data_len = len(data) - while data_len: - buf = proto.get_buffer(data_len) - buf_len = len(buf) - if not buf_len: - raise RuntimeError('get_buffer() returned an empty buffer') - - if buf_len >= data_len: - buf[:data_len] = data - proto.buffer_updated(data_len) - return - else: - buf[:buf_len] = data[:buf_len] - proto.buffer_updated(buf_len) - data = data[buf_len:] - data_len = len(data) |