diff options
author | Guido van Rossum <guido@dropbox.com> | 2013-11-01 21:18:02 (GMT) |
---|---|---|
committer | Guido van Rossum <guido@dropbox.com> | 2013-11-01 21:18:02 (GMT) |
commit | 2b57016458bc8a4400c03e204b431e2723a0e579 (patch) | |
tree | a6bb3b08c6ea9307cd0a64659248c6d7ecdb9187 /Lib/asyncio | |
parent | 21c85a7124004bfbddf2c3db4085ec48842e90f6 (diff) | |
download | cpython-2b57016458bc8a4400c03e204b431e2723a0e579.zip cpython-2b57016458bc8a4400c03e204b431e2723a0e579.tar.gz cpython-2b57016458bc8a4400c03e204b431e2723a0e579.tar.bz2 |
asyncio: Refactor ssl transport ready loop (Nikolay Kim).
Diffstat (limited to 'Lib/asyncio')
-rw-r--r-- | Lib/asyncio/selector_events.py | 94 |
1 files changed, 53 insertions, 41 deletions
diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py index 44430b2..a975dbb 100644 --- a/Lib/asyncio/selector_events.py +++ b/Lib/asyncio/selector_events.py @@ -286,7 +286,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) if err != 0: # Jump to the except clause below. - raise OSError(err, 'Connect call failed') + raise OSError(err, 'Connect call failed %s' % (address,)) except (BlockingIOError, InterruptedError): self.add_writer(fd, self._sock_connect, fut, True, sock, address) except Exception as exc: @@ -413,7 +413,7 @@ class _SelectorTransport(transports.Transport): try: self._protocol.pause_writing() except Exception: - tulip_log.exception('pause_writing() failed') + logger.exception('pause_writing() failed') def _maybe_resume_protocol(self): if (self._protocol_paused and @@ -422,7 +422,7 @@ class _SelectorTransport(transports.Transport): try: self._protocol.resume_writing() except Exception: - tulip_log.exception('resume_writing() failed') + logger.exception('resume_writing() failed') def set_write_buffer_limits(self, high=None, low=None): if high is None: @@ -635,15 +635,16 @@ class _SelectorSslTransport(_SelectorTransport): compression=self._sock.compression(), ) - self._loop.add_reader(self._sock_fd, self._on_ready) - self._loop.add_writer(self._sock_fd, self._on_ready) + self._read_wants_write = False + self._write_wants_read = False + self._loop.add_reader(self._sock_fd, self._read_ready) self._loop.call_soon(self._protocol.connection_made, self) if self._waiter is not None: self._loop.call_soon(self._waiter.set_result, None) def pause_reading(self): # XXX This is a bit icky, given the comment at the top of - # _on_ready(). Is it possible to evoke a deadlock? I don't + # _read_ready(). Is it possible to evoke a deadlock? I don't # know, although it doesn't look like it; write() will still # accept more data for the buffer and eventually the app will # call resume_reading() again, and things will flow again. @@ -658,41 +659,55 @@ class _SelectorSslTransport(_SelectorTransport): self._paused = False if self._closing: return - self._loop.add_reader(self._sock_fd, self._on_ready) + self._loop.add_reader(self._sock_fd, self._read_ready) - def _on_ready(self): - # Because of renegotiations (?), there's no difference between - # readable and writable. We just try both. XXX This may be - # incorrect; we probably need to keep state about what we - # should do next. + def _read_ready(self): + if self._write_wants_read: + self._write_wants_read = False + self._write_ready() - # First try reading. - if not self._closing and not self._paused: - try: - data = self._sock.recv(self.max_size) - except (BlockingIOError, InterruptedError, - ssl.SSLWantReadError, ssl.SSLWantWriteError): - pass - except Exception as exc: - self._fatal_error(exc) + if self._buffer: + self._loop.add_writer(self._sock_fd, self._write_ready) + + try: + data = self._sock.recv(self.max_size) + except (BlockingIOError, InterruptedError, ssl.SSLWantReadError): + pass + except ssl.SSLWantWriteError: + self._read_wants_write = True + self._loop.remove_reader(self._sock_fd) + self._loop.add_writer(self._sock_fd, self._write_ready) + except Exception as exc: + self._fatal_error(exc) + else: + if data: + self._protocol.data_received(data) else: - if data: - self._protocol.data_received(data) - else: - try: - self._protocol.eof_received() - finally: - self.close() + try: + self._protocol.eof_received() + finally: + self.close() + + def _write_ready(self): + if self._read_wants_write: + self._read_wants_write = False + self._read_ready() + + if not (self._paused or self._closing): + self._loop.add_reader(self._sock_fd, self._read_ready) - # Now try writing, if there's anything to write. if self._buffer: data = b''.join(self._buffer) self._buffer.clear() try: n = self._sock.send(data) except (BlockingIOError, InterruptedError, - ssl.SSLWantReadError, ssl.SSLWantWriteError): + ssl.SSLWantWriteError): n = 0 + except ssl.SSLWantReadError: + n = 0 + self._loop.remove_writer(self._sock_fd) + self._write_wants_read = True except Exception as exc: self._loop.remove_writer(self._sock_fd) self._fatal_error(exc) @@ -701,11 +716,12 @@ class _SelectorSslTransport(_SelectorTransport): if n < len(data): self._buffer.append(data[n:]) - self._maybe_resume_protocol() # May append to buffer. + self._maybe_resume_protocol() # May append to buffer. - if self._closing and not self._buffer: + if not self._buffer: self._loop.remove_writer(self._sock_fd) - self._call_connection_lost(None) + if self._closing: + self._call_connection_lost(None) def write(self, data): assert isinstance(data, bytes), repr(type(data)) @@ -718,20 +734,16 @@ class _SelectorSslTransport(_SelectorTransport): self._conn_lost += 1 return - # We could optimize, but the callback can do this for now. + if not self._buffer: + self._loop.add_writer(self._sock_fd, self._write_ready) + + # Add it to the buffer. self._buffer.append(data) self._maybe_pause_protocol() def can_write_eof(self): return False - def close(self): - if self._closing: - return - self._closing = True - self._conn_lost += 1 - self._loop.remove_reader(self._sock_fd) - class _SelectorDatagramTransport(_SelectorTransport): |