summaryrefslogtreecommitdiffstats
path: root/Lib/asyncio/proactor_events.py
diff options
context:
space:
mode:
authorGuido van Rossum <guido@python.org>2013-12-04 20:12:07 (GMT)
committerGuido van Rossum <guido@python.org>2013-12-04 20:12:07 (GMT)
commitebb8e58f0ab66a9d124580e4a1d5b0d5499b0a4d (patch)
treef602167fd70427162ffa575fac0a63ad7ad1c036 /Lib/asyncio/proactor_events.py
parent638aebd58e16a686ea8641f94ca714c406df1792 (diff)
downloadcpython-ebb8e58f0ab66a9d124580e4a1d5b0d5499b0a4d.zip
cpython-ebb8e58f0ab66a9d124580e4a1d5b0d5499b0a4d.tar.gz
cpython-ebb8e58f0ab66a9d124580e4a1d5b0d5499b0a4d.tar.bz2
asyncio: Write flow control for proactor event loop.
Diffstat (limited to 'Lib/asyncio/proactor_events.py')
-rw-r--r--Lib/asyncio/proactor_events.py116
1 files changed, 99 insertions, 17 deletions
diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py
index ce226b9..979bc25 100644
--- a/Lib/asyncio/proactor_events.py
+++ b/Lib/asyncio/proactor_events.py
@@ -24,12 +24,14 @@ class _ProactorBasePipeTransport(transports.BaseTransport):
self._sock = sock
self._protocol = protocol
self._server = server
- self._buffer = []
+ self._buffer = None # None or bytearray.
self._read_fut = None
self._write_fut = None
self._conn_lost = 0
self._closing = False # Set when close() called.
self._eof_written = False
+ self._protocol_paused = False
+ self.set_write_buffer_limits()
if self._server is not None:
self._server.attach(self)
self._loop.call_soon(self._protocol.connection_made, self)
@@ -63,7 +65,7 @@ class _ProactorBasePipeTransport(transports.BaseTransport):
if self._read_fut:
self._read_fut.cancel()
self._write_fut = self._read_fut = None
- self._buffer = []
+ self._buffer = None
self._loop.call_soon(self._call_connection_lost, exc)
def _call_connection_lost(self, exc):
@@ -82,6 +84,53 @@ class _ProactorBasePipeTransport(transports.BaseTransport):
server.detach(self)
self._server = None
+ # XXX The next four methods are nearly identical to corresponding
+ # ones in _SelectorTransport. Maybe refactor buffer management to
+ # share the implementations? (Also these are really only needed
+ # by _ProactorWritePipeTransport but since _buffer is defined on
+ # the base class I am putting it here for now.)
+
+ def _maybe_pause_protocol(self):
+ size = self.get_write_buffer_size()
+ if size <= self._high_water:
+ return
+ if not self._protocol_paused:
+ self._protocol_paused = True
+ try:
+ self._protocol.pause_writing()
+ except Exception:
+ logger.exception('pause_writing() failed')
+
+ def _maybe_resume_protocol(self):
+ if (self._protocol_paused and
+ self.get_write_buffer_size() <= self._low_water):
+ self._protocol_paused = False
+ try:
+ self._protocol.resume_writing()
+ except Exception:
+ logger.exception('resume_writing() failed')
+
+ def set_write_buffer_limits(self, high=None, low=None):
+ if high is None:
+ if low is None:
+ high = 64*1024
+ else:
+ high = 4*low
+ if low is None:
+ low = high // 4
+ if not high >= low >= 0:
+ raise ValueError('high (%r) must be >= low (%r) must be >= 0' %
+ (high, low))
+ self._high_water = high
+ self._low_water = low
+
+ def get_write_buffer_size(self):
+ # NOTE: This doesn't take into account data already passed to
+ # send() even if send() hasn't finished yet.
+ if not self._buffer:
+ return 0
+ return len(self._buffer)
+
class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
transports.ReadTransport):
@@ -95,12 +144,15 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
self._loop.call_soon(self._loop_reading)
def pause_reading(self):
- assert not self._closing, 'Cannot pause_reading() when closing'
- assert not self._paused, 'Already paused'
+ if self._closing:
+ raise RuntimeError('Cannot pause_reading() when closing')
+ if self._paused:
+ raise RuntimeError('Already paused')
self._paused = True
def resume_reading(self):
- assert self._paused, 'Not paused'
+ if not self._paused:
+ raise RuntimeError('Not paused')
self._paused = False
if self._closing:
return
@@ -155,9 +207,11 @@ class _ProactorWritePipeTransport(_ProactorBasePipeTransport,
"""Transport for write pipes."""
def write(self, data):
- assert isinstance(data, bytes), repr(data)
+ if not isinstance(data, (bytes, bytearray, memoryview)):
+ raise TypeError('data argument must be byte-ish (%r)',
+ type(data))
if self._eof_written:
- raise IOError('write_eof() already called')
+ raise RuntimeError('write_eof() already called')
if not data:
return
@@ -167,26 +221,53 @@ class _ProactorWritePipeTransport(_ProactorBasePipeTransport,
logger.warning('socket.send() raised exception.')
self._conn_lost += 1
return
- self._buffer.append(data)
- if self._write_fut is None:
- self._loop_writing()
- def _loop_writing(self, f=None):
+ # Observable states:
+ # 1. IDLE: _write_fut and _buffer both None
+ # 2. WRITING: _write_fut set; _buffer None
+ # 3. BACKED UP: _write_fut set; _buffer a bytearray
+ # We always copy the data, so the caller can't modify it
+ # while we're still waiting for the I/O to happen.
+ if self._write_fut is None: # IDLE -> WRITING
+ assert self._buffer is None
+ # Pass a copy, except if it's already immutable.
+ self._loop_writing(data=bytes(data))
+ # XXX Should we pause the protocol at this point
+ # if len(data) > self._high_water? (That would
+ # require keeping track of the number of bytes passed
+ # to a send() that hasn't finished yet.)
+ elif not self._buffer: # WRITING -> BACKED UP
+ # Make a mutable copy which we can extend.
+ self._buffer = bytearray(data)
+ self._maybe_pause_protocol()
+ else: # BACKED UP
+ # Append to buffer (also copies).
+ self._buffer.extend(data)
+ self._maybe_pause_protocol()
+
+ def _loop_writing(self, f=None, data=None):
try:
assert f is self._write_fut
self._write_fut = None
if f:
f.result()
- data = b''.join(self._buffer)
- self._buffer = []
+ if data is None:
+ data = self._buffer
+ self._buffer = None
if not data:
if self._closing:
self._loop.call_soon(self._call_connection_lost, None)
if self._eof_written:
self._sock.shutdown(socket.SHUT_WR)
- return
- self._write_fut = self._loop._proactor.send(self._sock, data)
- self._write_fut.add_done_callback(self._loop_writing)
+ else:
+ self._write_fut = self._loop._proactor.send(self._sock, data)
+ self._write_fut.add_done_callback(self._loop_writing)
+ # Now that we've reduced the buffer size, tell the
+ # protocol to resume writing if it was paused. Note that
+ # we do this last since the callback is called immediately
+ # and it may add more data to the buffer (even causing the
+ # protocol to be paused again).
+ self._maybe_resume_protocol()
except ConnectionResetError as exc:
self._force_close(exc)
except OSError as exc:
@@ -330,7 +411,8 @@ class BaseProactorEventLoop(base_events.BaseEventLoop):
self._csock.send(b'x')
def _start_serving(self, protocol_factory, sock, ssl=None, server=None):
- assert not ssl, 'IocpEventLoop is incompatible with SSL.'
+ if ssl:
+ raise ValueError('IocpEventLoop is incompatible with SSL.')
def loop(f=None):
try: