summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_asyncio/test_streams.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_asyncio/test_streams.py')
-rw-r--r--Lib/test/test_asyncio/test_streams.py46
1 files changed, 0 insertions, 46 deletions
diff --git a/Lib/test/test_asyncio/test_streams.py b/Lib/test/test_asyncio/test_streams.py
index ae943f3..d32b7ff 100644
--- a/Lib/test/test_asyncio/test_streams.py
+++ b/Lib/test/test_asyncio/test_streams.py
@@ -822,52 +822,6 @@ class StreamTests(test_utils.TestCase):
self.assertEqual(msg1, b"hello world 1!\n")
self.assertEqual(msg2, b"hello world 2!\n")
- @unittest.skipIf(sys.platform == 'win32', "Don't have pipes")
- @requires_subprocess()
- def test_read_all_from_pipe_reader(self):
- # See asyncio issue 168. This test is derived from the example
- # subprocess_attach_read_pipe.py, but we configure the
- # StreamReader's limit so that twice it is less than the size
- # of the data writer. Also we must explicitly attach a child
- # watcher to the event loop.
-
- code = """\
-import os, sys
-fd = int(sys.argv[1])
-os.write(fd, b'data')
-os.close(fd)
-"""
- rfd, wfd = os.pipe()
- args = [sys.executable, '-c', code, str(wfd)]
-
- pipe = open(rfd, 'rb', 0)
- reader = asyncio.StreamReader(loop=self.loop, limit=1)
- protocol = asyncio.StreamReaderProtocol(reader, loop=self.loop)
- transport, _ = self.loop.run_until_complete(
- self.loop.connect_read_pipe(lambda: protocol, pipe))
- with warnings.catch_warnings():
- warnings.simplefilter('ignore', DeprecationWarning)
- watcher = asyncio.SafeChildWatcher()
- watcher.attach_loop(self.loop)
- try:
- with warnings.catch_warnings():
- warnings.simplefilter('ignore', DeprecationWarning)
- asyncio.set_child_watcher(watcher)
- create = asyncio.create_subprocess_exec(
- *args,
- pass_fds={wfd},
- )
- proc = self.loop.run_until_complete(create)
- self.loop.run_until_complete(proc.wait())
- finally:
- with warnings.catch_warnings():
- warnings.simplefilter('ignore', DeprecationWarning)
- asyncio.set_child_watcher(None)
-
- os.close(wfd)
- data = self.loop.run_until_complete(reader.read(-1))
- self.assertEqual(data, b'data')
-
def test_streamreader_constructor_without_loop(self):
with self.assertRaisesRegex(RuntimeError, 'no current event loop'):
asyncio.StreamReader()