summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVictor Stinner <victor.stinner@gmail.com>2014-11-25 16:21:43 (GMT)
committerVictor Stinner <victor.stinner@gmail.com>2014-11-25 16:21:43 (GMT)
commit6a11e5e1aed1cb3566edb41d9f4a6b490abb2581 (patch)
treee9fca62df358be222bc35970b2e43bb0bad2d639
parent166ebc4e5dd09f005c6144b7568da83728b8b893 (diff)
parent5ef586f25a6d5128a15341e849d7dca4fe882d22 (diff)
downloadcpython-6a11e5e1aed1cb3566edb41d9f4a6b490abb2581.zip
cpython-6a11e5e1aed1cb3566edb41d9f4a6b490abb2581.tar.gz
cpython-6a11e5e1aed1cb3566edb41d9f4a6b490abb2581.tar.bz2
(Merge 3.4) Closes #22685, asyncio: Set the transport of stdout and stderr
StreamReader objects in the SubprocessStreamProtocol. It allows to pause the transport to not buffer too much stdout or stderr data.
-rw-r--r--Lib/asyncio/subprocess.py17
-rw-r--r--Lib/test/test_asyncio/test_subprocess.py32
2 files changed, 44 insertions, 5 deletions
diff --git a/Lib/asyncio/subprocess.py b/Lib/asyncio/subprocess.py
index e4c1499..f6d6a14 100644
--- a/Lib/asyncio/subprocess.py
+++ b/Lib/asyncio/subprocess.py
@@ -41,15 +41,22 @@ class SubprocessStreamProtocol(streams.FlowControlMixin,
def connection_made(self, transport):
self._transport = transport
- if transport.get_pipe_transport(1):
+
+ stdout_transport = transport.get_pipe_transport(1)
+ if stdout_transport is not None:
self.stdout = streams.StreamReader(limit=self._limit,
loop=self._loop)
- if transport.get_pipe_transport(2):
+ self.stdout.set_transport(stdout_transport)
+
+ stderr_transport = transport.get_pipe_transport(2)
+ if stderr_transport is not None:
self.stderr = streams.StreamReader(limit=self._limit,
loop=self._loop)
- stdin = transport.get_pipe_transport(0)
- if stdin is not None:
- self.stdin = streams.StreamWriter(stdin,
+ self.stderr.set_transport(stderr_transport)
+
+ stdin_transport = transport.get_pipe_transport(0)
+ if stdin_transport is not None:
+ self.stdin = streams.StreamWriter(stdin_transport,
protocol=self,
reader=None,
loop=self._loop)
diff --git a/Lib/test/test_asyncio/test_subprocess.py b/Lib/test/test_asyncio/test_subprocess.py
index 0e9e1ce..d0ab230 100644
--- a/Lib/test/test_asyncio/test_subprocess.py
+++ b/Lib/test/test_asyncio/test_subprocess.py
@@ -4,6 +4,7 @@ import asyncio
import signal
import sys
import unittest
+from unittest import mock
from test import support
if sys.platform != 'win32':
from asyncio import unix_events
@@ -161,6 +162,37 @@ class SubprocessMixin:
self.loop.run_until_complete(proc.communicate(large_data))
self.loop.run_until_complete(proc.wait())
+ def test_pause_reading(self):
+ @asyncio.coroutine
+ def test_pause_reading():
+ limit = 100
+
+ code = '\n'.join((
+ 'import sys',
+ 'sys.stdout.write("x" * %s)' % (limit * 2 + 1),
+ 'sys.stdout.flush()',
+ ))
+ proc = yield from asyncio.create_subprocess_exec(
+ sys.executable, '-c', code,
+ stdin=asyncio.subprocess.PIPE,
+ stdout=asyncio.subprocess.PIPE,
+ limit=limit,
+ loop=self.loop)
+ stdout_transport = proc._transport.get_pipe_transport(1)
+ stdout_transport.pause_reading = mock.Mock()
+
+ yield from proc.wait()
+
+ # The child process produced more than limit bytes of output,
+ # the stream reader transport should pause the protocol to not
+ # allocate too much memory.
+ return stdout_transport.pause_reading.called
+
+ # Issue #22685: Ensure that the stream reader pauses the protocol
+ # when the child process produces too much data
+ called = self.loop.run_until_complete(test_pause_reading())
+ self.assertTrue(called)
+
if sys.platform != 'win32':
# Unix