summaryrefslogtreecommitdiffstats
path: root/Lib/asyncio/proactor_events.py
diff options
context:
space:
mode:
authorVictor Stinner <victor.stinner@gmail.com>2014-02-01 21:49:59 (GMT)
committerVictor Stinner <victor.stinner@gmail.com>2014-02-01 21:49:59 (GMT)
commit915bcb01110c7db65f8be9139bf887c749fbde75 (patch)
treefa24b947b19c1479ed581dc817c2e696386f3fb0 /Lib/asyncio/proactor_events.py
parent153d97b24e7253f344860094eb2c98ed93657720 (diff)
downloadcpython-915bcb01110c7db65f8be9139bf887c749fbde75.zip
cpython-915bcb01110c7db65f8be9139bf887c749fbde75.tar.gz
cpython-915bcb01110c7db65f8be9139bf887c749fbde75.tar.bz2
Issue #20400: Merge Tulip into Python: add the new asyncio.subprocess module
* Add a new asyncio.subprocess module * Add new create_subprocess_exec() and create_subprocess_shell() functions * The new asyncio.subprocess.SubprocessStreamProtocol creates stream readers for stdout and stderr and a stream writer for stdin. * The new asyncio.subprocess.Process class offers an API close to the subprocess.Popen class: - pid, returncode, stdin, stdout and stderr attributes - communicate(), wait(), send_signal(), terminate() and kill() methods * Remove STDIN (0), STDOUT (1) and STDERR (2) constants from base_subprocess and unix_events, to not be confused with the symbols with the same name of subprocess and asyncio.subprocess modules * _ProactorBasePipeTransport.get_write_buffer_size() now counts also the size of the pending write * _ProactorBaseWritePipeTransport._loop_writing() may now pause the protocol if the write buffer size is greater than the high water mark (64 KB by default)
Diffstat (limited to 'Lib/asyncio/proactor_events.py')
-rw-r--r--Lib/asyncio/proactor_events.py34
1 files changed, 21 insertions, 13 deletions
diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py
index b6b3be2..fb67155 100644
--- a/Lib/asyncio/proactor_events.py
+++ b/Lib/asyncio/proactor_events.py
@@ -29,6 +29,7 @@ class _ProactorBasePipeTransport(transports.BaseTransport):
self._buffer = None # None or bytearray.
self._read_fut = None
self._write_fut = None
+ self._pending_write = 0
self._conn_lost = 0
self._closing = False # Set when close() called.
self._eof_written = False
@@ -68,6 +69,7 @@ class _ProactorBasePipeTransport(transports.BaseTransport):
if self._read_fut:
self._read_fut.cancel()
self._write_fut = self._read_fut = None
+ self._pending_write = 0
self._buffer = None
self._loop.call_soon(self._call_connection_lost, exc)
@@ -128,11 +130,10 @@ class _ProactorBasePipeTransport(transports.BaseTransport):
self._low_water = low
def get_write_buffer_size(self):
- # NOTE: This doesn't take into account data already passed to
- # send() even if send() hasn't finished yet.
- if not self._buffer:
- return 0
- return len(self._buffer)
+ size = self._pending_write
+ if self._buffer is not None:
+ size += len(self._buffer)
+ return size
class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
@@ -206,7 +207,7 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
class _ProactorBaseWritePipeTransport(_ProactorBasePipeTransport,
- transports.WriteTransport):
+ transports.WriteTransport):
"""Transport for write pipes."""
def write(self, data):
@@ -252,6 +253,7 @@ class _ProactorBaseWritePipeTransport(_ProactorBasePipeTransport,
try:
assert f is self._write_fut
self._write_fut = None
+ self._pending_write = 0
if f:
f.result()
if data is None:
@@ -262,15 +264,21 @@ class _ProactorBaseWritePipeTransport(_ProactorBasePipeTransport,
self._loop.call_soon(self._call_connection_lost, None)
if self._eof_written:
self._sock.shutdown(socket.SHUT_WR)
+ # Now that we've reduced the buffer size, tell the
+ # protocol to resume writing if it was paused. Note that
+ # we do this last since the callback is called immediately
+ # and it may add more data to the buffer (even causing the
+ # protocol to be paused again).
+ self._maybe_resume_protocol()
else:
self._write_fut = self._loop._proactor.send(self._sock, data)
- self._write_fut.add_done_callback(self._loop_writing)
- # Now that we've reduced the buffer size, tell the
- # protocol to resume writing if it was paused. Note that
- # we do this last since the callback is called immediately
- # and it may add more data to the buffer (even causing the
- # protocol to be paused again).
- self._maybe_resume_protocol()
+ if not self._write_fut.done():
+ assert self._pending_write == 0
+ self._pending_write = len(data)
+ self._write_fut.add_done_callback(self._loop_writing)
+ self._maybe_pause_protocol()
+ else:
+ self._write_fut.add_done_callback(self._loop_writing)
except ConnectionResetError as exc:
self._force_close(exc)
except OSError as exc: