summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGuido van Rossum <guido@python.org>2014-05-12 17:04:37 (GMT)
committerGuido van Rossum <guido@python.org>2014-05-12 17:04:37 (GMT)
commitbf88ffba5edf780e12a64db9cb929216c19f6cfa (patch)
tree8ee2815214e6848e7e3e909151a43ea552189157
parenta869fd3dc0117d2f02fb7e4146b6c446a68eaeb4 (diff)
downloadcpython-bf88ffba5edf780e12a64db9cb929216c19f6cfa.zip
cpython-bf88ffba5edf780e12a64db9cb929216c19f6cfa.tar.gz
cpython-bf88ffba5edf780e12a64db9cb929216c19f6cfa.tar.bz2
asyncio: Fix upstream issue 168: StreamReader.read(-1) from pipe may hang if data exceeds buffer limit.
-rw-r--r--Lib/asyncio/streams.py17
-rw-r--r--Lib/test/test_asyncio/test_streams.py36
2 files changed, 47 insertions, 6 deletions
diff --git a/Lib/asyncio/streams.py b/Lib/asyncio/streams.py
index 27d595f..e239248 100644
--- a/Lib/asyncio/streams.py
+++ b/Lib/asyncio/streams.py
@@ -419,12 +419,17 @@ class StreamReader:
return b''
if n < 0:
- while not self._eof:
- self._waiter = self._create_waiter('read')
- try:
- yield from self._waiter
- finally:
- self._waiter = None
+ # This used to just loop creating a new waiter hoping to
+ # collect everything in self._buffer, but that would
+ # deadlock if the subprocess sends more than self.limit
+ # bytes. So just call self.read(self._limit) until EOF.
+ blocks = []
+ while True:
+ block = yield from self.read(self._limit)
+ if not block:
+ break
+ blocks.append(block)
+ return b''.join(blocks)
else:
if not self._buffer and not self._eof:
self._waiter = self._create_waiter('read')
diff --git a/Lib/test/test_asyncio/test_streams.py b/Lib/test/test_asyncio/test_streams.py
index 031499e..23012b7 100644
--- a/Lib/test/test_asyncio/test_streams.py
+++ b/Lib/test/test_asyncio/test_streams.py
@@ -1,7 +1,9 @@
"""Tests for streams.py."""
import gc
+import os
import socket
+import sys
import unittest
from unittest import mock
try:
@@ -583,6 +585,40 @@ class StreamReaderTests(unittest.TestCase):
server.stop()
self.assertEqual(msg, b"hello world!\n")
+ @unittest.skipIf(sys.platform == 'win32', "Don't have pipes")
+ def test_read_all_from_pipe_reader(self):
+ # See Tulip issue 168. This test is derived from the example
+ # subprocess_attach_read_pipe.py, but we configure the
+ # StreamReader's limit so that twice it is less than the size
+ # of the data writter. Also we must explicitly attach a child
+ # watcher to the event loop.
+
+ watcher = asyncio.get_child_watcher()
+ watcher.attach_loop(self.loop)
+
+ code = """\
+import os, sys
+fd = int(sys.argv[1])
+os.write(fd, b'data')
+os.close(fd)
+"""
+ rfd, wfd = os.pipe()
+ args = [sys.executable, '-c', code, str(wfd)]
+
+ pipe = open(rfd, 'rb', 0)
+ reader = asyncio.StreamReader(loop=self.loop, limit=1)
+ protocol = asyncio.StreamReaderProtocol(reader, loop=self.loop)
+ transport, _ = self.loop.run_until_complete(
+ self.loop.connect_read_pipe(lambda: protocol, pipe))
+
+ proc = self.loop.run_until_complete(
+ asyncio.create_subprocess_exec(*args, pass_fds={wfd}, loop=self.loop))
+ self.loop.run_until_complete(proc.wait())
+
+ os.close(wfd)
+ data = self.loop.run_until_complete(reader.read(-1))
+ self.assertEqual(data, b'data')
+
if __name__ == '__main__':
unittest.main()