diff options
author | Guido van Rossum <guido@dropbox.com> | 2013-10-18 17:10:36 (GMT) |
---|---|---|
committer | Guido van Rossum <guido@dropbox.com> | 2013-10-18 17:10:36 (GMT) |
commit | 2546a177650264205e8a52b6836bc5b8c48bf085 (patch) | |
tree | 493debb64fc96b13970c2ee06afe357ebe126860 /Lib | |
parent | 559ae0fb1c4b071fb919ddc10144dd3bf80db07e (diff) | |
download | cpython-2546a177650264205e8a52b6836bc5b8c48bf085.zip cpython-2546a177650264205e8a52b6836bc5b8c48bf085.tar.gz cpython-2546a177650264205e8a52b6836bc5b8c48bf085.tar.bz2 |
Important race condition fix for Tulip.
Diffstat (limited to 'Lib')
-rw-r--r-- | Lib/asyncio/selector_events.py | 51 |
1 files changed, 18 insertions, 33 deletions
diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py index 2edac65..084d9be 100644 --- a/Lib/asyncio/selector_events.py +++ b/Lib/asyncio/selector_events.py @@ -344,7 +344,7 @@ class _SelectorTransport(transports.Transport): self._protocol = protocol self._server = server self._buffer = collections.deque() - self._conn_lost = 0 + self._conn_lost = 0 # Set when call to connection_lost scheduled. self._closing = False # Set when close() called. if server is not None: server.attach(self) @@ -356,27 +356,27 @@ class _SelectorTransport(transports.Transport): if self._closing: return self._closing = True - self._conn_lost += 1 self._loop.remove_reader(self._sock_fd) if not self._buffer: + self._conn_lost += 1 self._loop.call_soon(self._call_connection_lost, None) def _fatal_error(self, exc): - # should be called from exception handler only - logger.exception('Fatal error for %s', self) + # Should be called from exception handler only. + if not isinstance(exc, (BrokenPipeError, ConnectionResetError)): + logger.exception('Fatal error for %s', self) self._force_close(exc) def _force_close(self, exc): + if self._conn_lost: + return if self._buffer: self._buffer.clear() self._loop.remove_writer(self._sock_fd) - - if self._closing: - return - - self._closing = True + if not self._closing: + self._closing = True + self._loop.remove_reader(self._sock_fd) self._conn_lost += 1 - self._loop.remove_reader(self._sock_fd) self._loop.call_soon(self._call_connection_lost, exc) def _call_connection_lost(self, exc): @@ -424,8 +424,6 @@ class _SelectorSocketTransport(_SelectorTransport): data = self._sock.recv(self.max_size) except (BlockingIOError, InterruptedError): pass - except ConnectionResetError as exc: - self._force_close(exc) except Exception as exc: self._fatal_error(exc) else: @@ -453,17 +451,15 @@ class _SelectorSocketTransport(_SelectorTransport): try: n = self._sock.send(data) except (BlockingIOError, InterruptedError): - n = 0 - except (BrokenPipeError, ConnectionResetError) as exc: - self._force_close(exc) - return - except OSError as exc: + pass + except Exception as exc: self._fatal_error(exc) return else: data = data[n:] if not data: return + # Start async I/O. self._loop.add_writer(self._sock_fd, self._write_ready) @@ -478,9 +474,6 @@ class _SelectorSocketTransport(_SelectorTransport): n = self._sock.send(data) except (BlockingIOError, InterruptedError): self._buffer.append(data) - except (BrokenPipeError, ConnectionResetError) as exc: - self._loop.remove_writer(self._sock_fd) - self._force_close(exc) except Exception as exc: self._loop.remove_writer(self._sock_fd) self._fatal_error(exc) @@ -493,7 +486,6 @@ class _SelectorSocketTransport(_SelectorTransport): elif self._eof: self._sock.shutdown(socket.SHUT_WR) return - self._buffer.append(data) # Try again later. def write_eof(self): @@ -622,8 +614,6 @@ class _SelectorSslTransport(_SelectorTransport): except (BlockingIOError, InterruptedError, ssl.SSLWantReadError, ssl.SSLWantWriteError): pass - except ConnectionResetError as exc: - self._force_close(exc) except Exception as exc: self._fatal_error(exc) else: @@ -644,10 +634,6 @@ class _SelectorSslTransport(_SelectorTransport): except (BlockingIOError, InterruptedError, ssl.SSLWantReadError, ssl.SSLWantWriteError): n = 0 - except (BrokenPipeError, ConnectionResetError) as exc: - self._loop.remove_writer(self._sock_fd) - self._force_close(exc) - return except Exception as exc: self._loop.remove_writer(self._sock_fd) self._fatal_error(exc) @@ -726,12 +712,12 @@ class _SelectorDatagramTransport(_SelectorTransport): else: self._sock.sendto(data, addr) return + except (BlockingIOError, InterruptedError): + self._loop.add_writer(self._sock_fd, self._sendto_ready) except ConnectionRefusedError as exc: if self._address: self._fatal_error(exc) return - except (BlockingIOError, InterruptedError): - self._loop.add_writer(self._sock_fd, self._sendto_ready) except Exception as exc: self._fatal_error(exc) return @@ -746,13 +732,13 @@ class _SelectorDatagramTransport(_SelectorTransport): self._sock.send(data) else: self._sock.sendto(data, addr) + except (BlockingIOError, InterruptedError): + self._buffer.appendleft((data, addr)) # Try again later. + break except ConnectionRefusedError as exc: if self._address: self._fatal_error(exc) return - except (BlockingIOError, InterruptedError): - self._buffer.appendleft((data, addr)) # Try again later. - break except Exception as exc: self._fatal_error(exc) return @@ -765,5 +751,4 @@ class _SelectorDatagramTransport(_SelectorTransport): def _force_close(self, exc): if self._address and isinstance(exc, ConnectionRefusedError): self._protocol.connection_refused(exc) - super()._force_close(exc) |