diff options
author | Guido van Rossum <guido@dropbox.com> | 2013-10-18 22:17:11 (GMT) |
---|---|---|
committer | Guido van Rossum <guido@dropbox.com> | 2013-10-18 22:17:11 (GMT) |
commit | 355491dc47ea4a2574ee8f9ea60a0d25fe3fba43 (patch) | |
tree | 2b9661e6f8c6d24704fae0c82467674802749d6d /Lib/asyncio/selector_events.py | |
parent | 051a33148813d045c33745ccd0e9e20e96b1bb6f (diff) | |
download | cpython-355491dc47ea4a2574ee8f9ea60a0d25fe3fba43.zip cpython-355491dc47ea4a2574ee8f9ea60a0d25fe3fba43.tar.gz cpython-355491dc47ea4a2574ee8f9ea60a0d25fe3fba43.tar.bz2 |
Write flow control for asyncio (includes asyncio.streams overhaul).
Diffstat (limited to 'Lib/asyncio/selector_events.py')
-rw-r--r-- | Lib/asyncio/selector_events.py | 78 |
1 files changed, 64 insertions, 14 deletions
diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py index 084d9be..63164f0 100644 --- a/Lib/asyncio/selector_events.py +++ b/Lib/asyncio/selector_events.py @@ -346,8 +346,10 @@ class _SelectorTransport(transports.Transport): self._buffer = collections.deque() self._conn_lost = 0 # Set when call to connection_lost scheduled. self._closing = False # Set when close() called. - if server is not None: - server.attach(self) + self._protocol_paused = False + self.set_write_buffer_limits() + if self._server is not None: + self._server.attach(self) def abort(self): self._force_close(None) @@ -392,6 +394,40 @@ class _SelectorTransport(transports.Transport): server.detach(self) self._server = None + def _maybe_pause_protocol(self): + size = self.get_write_buffer_size() + if size <= self._high_water: + return + if not self._protocol_paused: + self._protocol_paused = True + try: + self._protocol.pause_writing() + except Exception: + tulip_log.exception('pause_writing() failed') + + def _maybe_resume_protocol(self): + if self._protocol_paused and self.get_write_buffer_size() <= self._low_water: + self._protocol_paused = False + try: + self._protocol.resume_writing() + except Exception: + tulip_log.exception('resume_writing() failed') + + def set_write_buffer_limits(self, high=None, low=None): + if high is None: + if low is None: + high = 64*1024 + else: + high = 4*low + if low is None: + low = high // 4 + assert 0 <= low <= high, repr((low, high)) + self._high_water = high + self._low_water = low + + def get_write_buffer_size(self): + return sum(len(data) for data in self._buffer) + class _SelectorSocketTransport(_SelectorTransport): @@ -447,7 +483,7 @@ class _SelectorSocketTransport(_SelectorTransport): return if not self._buffer: - # Attempt to send it right away first. + # Optimization: try to send now. try: n = self._sock.send(data) except (BlockingIOError, InterruptedError): @@ -459,34 +495,36 @@ class _SelectorSocketTransport(_SelectorTransport): data = data[n:] if not data: return - - # Start async I/O. + # Not all was written; register write handler. self._loop.add_writer(self._sock_fd, self._write_ready) + # Add it to the buffer. self._buffer.append(data) + self._maybe_pause_protocol() def _write_ready(self): data = b''.join(self._buffer) assert data, 'Data should not be empty' - self._buffer.clear() + self._buffer.clear() # Optimistically; may have to put it back later. try: n = self._sock.send(data) except (BlockingIOError, InterruptedError): - self._buffer.append(data) + self._buffer.append(data) # Still need to write this. except Exception as exc: self._loop.remove_writer(self._sock_fd) self._fatal_error(exc) else: data = data[n:] - if not data: + if data: + self._buffer.append(data) # Still need to write this. + self._maybe_resume_protocol() # May append to buffer. + if not self._buffer: self._loop.remove_writer(self._sock_fd) if self._closing: self._call_connection_lost(None) elif self._eof: self._sock.shutdown(socket.SHUT_WR) - return - self._buffer.append(data) # Try again later. def write_eof(self): if self._eof: @@ -546,16 +584,23 @@ class _SelectorSslTransport(_SelectorTransport): self._loop.add_writer(self._sock_fd, self._on_handshake) return except Exception as exc: + self._loop.remove_reader(self._sock_fd) + self._loop.remove_writer(self._sock_fd) self._sock.close() if self._waiter is not None: self._waiter.set_exception(exc) return except BaseException as exc: + self._loop.remove_reader(self._sock_fd) + self._loop.remove_writer(self._sock_fd) self._sock.close() if self._waiter is not None: self._waiter.set_exception(exc) raise + self._loop.remove_reader(self._sock_fd) + self._loop.remove_writer(self._sock_fd) + # Verify hostname if requested. peercert = self._sock.getpeercert() if (self._server_hostname is not None and @@ -574,8 +619,6 @@ class _SelectorSslTransport(_SelectorTransport): compression=self._sock.compression(), ) - self._loop.remove_reader(self._sock_fd) - self._loop.remove_writer(self._sock_fd) self._loop.add_reader(self._sock_fd, self._on_ready) self._loop.add_writer(self._sock_fd, self._on_ready) self._loop.call_soon(self._protocol.connection_made, self) @@ -642,6 +685,8 @@ class _SelectorSslTransport(_SelectorTransport): if n < len(data): self._buffer.append(data[n:]) + self._maybe_resume_protocol() # May append to buffer. + if self._closing and not self._buffer: self._loop.remove_writer(self._sock_fd) self._call_connection_lost(None) @@ -657,8 +702,9 @@ class _SelectorSslTransport(_SelectorTransport): self._conn_lost += 1 return - self._buffer.append(data) # We could optimize, but the callback can do this for now. + self._buffer.append(data) + self._maybe_pause_protocol() def can_write_eof(self): return False @@ -675,11 +721,13 @@ class _SelectorDatagramTransport(_SelectorTransport): def __init__(self, loop, sock, protocol, address=None, extra=None): super().__init__(loop, sock, protocol, extra) - self._address = address self._loop.add_reader(self._sock_fd, self._read_ready) self._loop.call_soon(self._protocol.connection_made, self) + def get_write_buffer_size(self): + return sum(len(data) for data, _ in self._buffer) + def _read_ready(self): try: data, addr = self._sock.recvfrom(self.max_size) @@ -723,6 +771,7 @@ class _SelectorDatagramTransport(_SelectorTransport): return self._buffer.append((data, addr)) + self._maybe_pause_protocol() def _sendto_ready(self): while self._buffer: @@ -743,6 +792,7 @@ class _SelectorDatagramTransport(_SelectorTransport): self._fatal_error(exc) return + self._maybe_resume_protocol() # May append to buffer. if not self._buffer: self._loop.remove_writer(self._sock_fd) if self._closing: |