summaryrefslogtreecommitdiffstats
path: root/Lib/asyncio/selector_events.py
diff options
context:
space:
mode:
authorGuido van Rossum <guido@dropbox.com>2013-10-18 22:17:11 (GMT)
committerGuido van Rossum <guido@dropbox.com>2013-10-18 22:17:11 (GMT)
commit355491dc47ea4a2574ee8f9ea60a0d25fe3fba43 (patch)
tree2b9661e6f8c6d24704fae0c82467674802749d6d /Lib/asyncio/selector_events.py
parent051a33148813d045c33745ccd0e9e20e96b1bb6f (diff)
downloadcpython-355491dc47ea4a2574ee8f9ea60a0d25fe3fba43.zip
cpython-355491dc47ea4a2574ee8f9ea60a0d25fe3fba43.tar.gz
cpython-355491dc47ea4a2574ee8f9ea60a0d25fe3fba43.tar.bz2
Write flow control for asyncio (includes asyncio.streams overhaul).
Diffstat (limited to 'Lib/asyncio/selector_events.py')
-rw-r--r--Lib/asyncio/selector_events.py78
1 files changed, 64 insertions, 14 deletions
diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py
index 084d9be..63164f0 100644
--- a/Lib/asyncio/selector_events.py
+++ b/Lib/asyncio/selector_events.py
@@ -346,8 +346,10 @@ class _SelectorTransport(transports.Transport):
self._buffer = collections.deque()
self._conn_lost = 0 # Set when call to connection_lost scheduled.
self._closing = False # Set when close() called.
- if server is not None:
- server.attach(self)
+ self._protocol_paused = False
+ self.set_write_buffer_limits()
+ if self._server is not None:
+ self._server.attach(self)
def abort(self):
self._force_close(None)
@@ -392,6 +394,40 @@ class _SelectorTransport(transports.Transport):
server.detach(self)
self._server = None
+ def _maybe_pause_protocol(self):
+ size = self.get_write_buffer_size()
+ if size <= self._high_water:
+ return
+ if not self._protocol_paused:
+ self._protocol_paused = True
+ try:
+ self._protocol.pause_writing()
+ except Exception:
+ tulip_log.exception('pause_writing() failed')
+
+ def _maybe_resume_protocol(self):
+ if self._protocol_paused and self.get_write_buffer_size() <= self._low_water:
+ self._protocol_paused = False
+ try:
+ self._protocol.resume_writing()
+ except Exception:
+ tulip_log.exception('resume_writing() failed')
+
+ def set_write_buffer_limits(self, high=None, low=None):
+ if high is None:
+ if low is None:
+ high = 64*1024
+ else:
+ high = 4*low
+ if low is None:
+ low = high // 4
+ assert 0 <= low <= high, repr((low, high))
+ self._high_water = high
+ self._low_water = low
+
+ def get_write_buffer_size(self):
+ return sum(len(data) for data in self._buffer)
+
class _SelectorSocketTransport(_SelectorTransport):
@@ -447,7 +483,7 @@ class _SelectorSocketTransport(_SelectorTransport):
return
if not self._buffer:
- # Attempt to send it right away first.
+ # Optimization: try to send now.
try:
n = self._sock.send(data)
except (BlockingIOError, InterruptedError):
@@ -459,34 +495,36 @@ class _SelectorSocketTransport(_SelectorTransport):
data = data[n:]
if not data:
return
-
- # Start async I/O.
+ # Not all was written; register write handler.
self._loop.add_writer(self._sock_fd, self._write_ready)
+ # Add it to the buffer.
self._buffer.append(data)
+ self._maybe_pause_protocol()
def _write_ready(self):
data = b''.join(self._buffer)
assert data, 'Data should not be empty'
- self._buffer.clear()
+ self._buffer.clear() # Optimistically; may have to put it back later.
try:
n = self._sock.send(data)
except (BlockingIOError, InterruptedError):
- self._buffer.append(data)
+ self._buffer.append(data) # Still need to write this.
except Exception as exc:
self._loop.remove_writer(self._sock_fd)
self._fatal_error(exc)
else:
data = data[n:]
- if not data:
+ if data:
+ self._buffer.append(data) # Still need to write this.
+ self._maybe_resume_protocol() # May append to buffer.
+ if not self._buffer:
self._loop.remove_writer(self._sock_fd)
if self._closing:
self._call_connection_lost(None)
elif self._eof:
self._sock.shutdown(socket.SHUT_WR)
- return
- self._buffer.append(data) # Try again later.
def write_eof(self):
if self._eof:
@@ -546,16 +584,23 @@ class _SelectorSslTransport(_SelectorTransport):
self._loop.add_writer(self._sock_fd, self._on_handshake)
return
except Exception as exc:
+ self._loop.remove_reader(self._sock_fd)
+ self._loop.remove_writer(self._sock_fd)
self._sock.close()
if self._waiter is not None:
self._waiter.set_exception(exc)
return
except BaseException as exc:
+ self._loop.remove_reader(self._sock_fd)
+ self._loop.remove_writer(self._sock_fd)
self._sock.close()
if self._waiter is not None:
self._waiter.set_exception(exc)
raise
+ self._loop.remove_reader(self._sock_fd)
+ self._loop.remove_writer(self._sock_fd)
+
# Verify hostname if requested.
peercert = self._sock.getpeercert()
if (self._server_hostname is not None and
@@ -574,8 +619,6 @@ class _SelectorSslTransport(_SelectorTransport):
compression=self._sock.compression(),
)
- self._loop.remove_reader(self._sock_fd)
- self._loop.remove_writer(self._sock_fd)
self._loop.add_reader(self._sock_fd, self._on_ready)
self._loop.add_writer(self._sock_fd, self._on_ready)
self._loop.call_soon(self._protocol.connection_made, self)
@@ -642,6 +685,8 @@ class _SelectorSslTransport(_SelectorTransport):
if n < len(data):
self._buffer.append(data[n:])
+ self._maybe_resume_protocol() # May append to buffer.
+
if self._closing and not self._buffer:
self._loop.remove_writer(self._sock_fd)
self._call_connection_lost(None)
@@ -657,8 +702,9 @@ class _SelectorSslTransport(_SelectorTransport):
self._conn_lost += 1
return
- self._buffer.append(data)
# We could optimize, but the callback can do this for now.
+ self._buffer.append(data)
+ self._maybe_pause_protocol()
def can_write_eof(self):
return False
@@ -675,11 +721,13 @@ class _SelectorDatagramTransport(_SelectorTransport):
def __init__(self, loop, sock, protocol, address=None, extra=None):
super().__init__(loop, sock, protocol, extra)
-
self._address = address
self._loop.add_reader(self._sock_fd, self._read_ready)
self._loop.call_soon(self._protocol.connection_made, self)
+ def get_write_buffer_size(self):
+ return sum(len(data) for data, _ in self._buffer)
+
def _read_ready(self):
try:
data, addr = self._sock.recvfrom(self.max_size)
@@ -723,6 +771,7 @@ class _SelectorDatagramTransport(_SelectorTransport):
return
self._buffer.append((data, addr))
+ self._maybe_pause_protocol()
def _sendto_ready(self):
while self._buffer:
@@ -743,6 +792,7 @@ class _SelectorDatagramTransport(_SelectorTransport):
self._fatal_error(exc)
return
+ self._maybe_resume_protocol() # May append to buffer.
if not self._buffer:
self._loop.remove_writer(self._sock_fd)
if self._closing: