From bf88ffba5edf780e12a64db9cb929216c19f6cfa Mon Sep 17 00:00:00 2001 From: Guido van Rossum Date: Mon, 12 May 2014 10:04:37 -0700 Subject: asyncio: Fix upstream issue 168: StreamReader.read(-1) from pipe may hang if data exceeds buffer limit. --- Lib/asyncio/streams.py | 17 +++++++++++------ Lib/test/test_asyncio/test_streams.py | 36 +++++++++++++++++++++++++++++++++++ 2 files changed, 47 insertions(+), 6 deletions(-) diff --git a/Lib/asyncio/streams.py b/Lib/asyncio/streams.py index 27d595f..e239248 100644 --- a/Lib/asyncio/streams.py +++ b/Lib/asyncio/streams.py @@ -419,12 +419,17 @@ class StreamReader: return b'' if n < 0: - while not self._eof: - self._waiter = self._create_waiter('read') - try: - yield from self._waiter - finally: - self._waiter = None + # This used to just loop creating a new waiter hoping to + # collect everything in self._buffer, but that would + # deadlock if the subprocess sends more than self.limit + # bytes. So just call self.read(self._limit) until EOF. + blocks = [] + while True: + block = yield from self.read(self._limit) + if not block: + break + blocks.append(block) + return b''.join(blocks) else: if not self._buffer and not self._eof: self._waiter = self._create_waiter('read') diff --git a/Lib/test/test_asyncio/test_streams.py b/Lib/test/test_asyncio/test_streams.py index 031499e..23012b7 100644 --- a/Lib/test/test_asyncio/test_streams.py +++ b/Lib/test/test_asyncio/test_streams.py @@ -1,7 +1,9 @@ """Tests for streams.py.""" import gc +import os import socket +import sys import unittest from unittest import mock try: @@ -583,6 +585,40 @@ class StreamReaderTests(unittest.TestCase): server.stop() self.assertEqual(msg, b"hello world!\n") + @unittest.skipIf(sys.platform == 'win32', "Don't have pipes") + def test_read_all_from_pipe_reader(self): + # See Tulip issue 168. This test is derived from the example + # subprocess_attach_read_pipe.py, but we configure the + # StreamReader's limit so that twice it is less than the size + # of the data writter. Also we must explicitly attach a child + # watcher to the event loop. + + watcher = asyncio.get_child_watcher() + watcher.attach_loop(self.loop) + + code = """\ +import os, sys +fd = int(sys.argv[1]) +os.write(fd, b'data') +os.close(fd) +""" + rfd, wfd = os.pipe() + args = [sys.executable, '-c', code, str(wfd)] + + pipe = open(rfd, 'rb', 0) + reader = asyncio.StreamReader(loop=self.loop, limit=1) + protocol = asyncio.StreamReaderProtocol(reader, loop=self.loop) + transport, _ = self.loop.run_until_complete( + self.loop.connect_read_pipe(lambda: protocol, pipe)) + + proc = self.loop.run_until_complete( + asyncio.create_subprocess_exec(*args, pass_fds={wfd}, loop=self.loop)) + self.loop.run_until_complete(proc.wait()) + + os.close(wfd) + data = self.loop.run_until_complete(reader.read(-1)) + self.assertEqual(data, b'data') + if __name__ == '__main__': unittest.main() -- cgit v0.12