diff options
author | Yury Selivanov <yury@edgedb.com> | 2019-09-30 04:59:55 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-09-30 04:59:55 (GMT) |
commit | 6758e6e12a71ef5530146161881f88df1fa43382 (patch) | |
tree | da1f89f35e54ddcfffc3706b87bb13f54907f7ea /Lib/test/test_asyncio | |
parent | 3667e1ee6c90e6d3b6a745cd590ece87118f81ad (diff) | |
download | cpython-6758e6e12a71ef5530146161881f88df1fa43382.zip cpython-6758e6e12a71ef5530146161881f88df1fa43382.tar.gz cpython-6758e6e12a71ef5530146161881f88df1fa43382.tar.bz2 |
bpo-38242: Revert "bpo-36889: Merge asyncio streams (GH-13251)" (#16482)
See https://bugs.python.org/issue38242 for more details
Diffstat (limited to 'Lib/test/test_asyncio')
-rw-r--r-- | Lib/test/test_asyncio/test_buffered_proto.py | 7 | ||||
-rw-r--r-- | Lib/test/test_asyncio/test_pep492.py | 8 | ||||
-rw-r--r-- | Lib/test/test_asyncio/test_streams.py | 1132 | ||||
-rw-r--r-- | Lib/test/test_asyncio/test_subprocess.py | 12 | ||||
-rw-r--r-- | Lib/test/test_asyncio/test_windows_events.py | 12 |
5 files changed, 192 insertions, 979 deletions
diff --git a/Lib/test/test_asyncio/test_buffered_proto.py b/Lib/test/test_asyncio/test_buffered_proto.py index b1531fb..f24e363 100644 --- a/Lib/test/test_asyncio/test_buffered_proto.py +++ b/Lib/test/test_asyncio/test_buffered_proto.py @@ -58,10 +58,9 @@ class BaseTestBufferedProtocol(func_tests.FunctionalTestCaseMixin): writer.close() await writer.wait_closed() - with self.assertWarns(DeprecationWarning): - srv = self.loop.run_until_complete( - asyncio.start_server( - on_server_client, '127.0.0.1', 0)) + srv = self.loop.run_until_complete( + asyncio.start_server( + on_server_client, '127.0.0.1', 0)) addr = srv.sockets[0].getsockname() self.loop.run_until_complete( diff --git a/Lib/test/test_asyncio/test_pep492.py b/Lib/test/test_asyncio/test_pep492.py index 58a6094..a1f27dd 100644 --- a/Lib/test/test_asyncio/test_pep492.py +++ b/Lib/test/test_asyncio/test_pep492.py @@ -95,11 +95,9 @@ class StreamReaderTests(BaseTest): def test_readline(self): DATA = b'line1\nline2\nline3' - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) - stream._feed_data(DATA) - stream._feed_eof() + stream = asyncio.StreamReader(loop=self.loop) + stream.feed_data(DATA) + stream.feed_eof() async def reader(): data = [] diff --git a/Lib/test/test_asyncio/test_streams.py b/Lib/test/test_asyncio/test_streams.py index 6325ee3..b9413ab 100644 --- a/Lib/test/test_asyncio/test_streams.py +++ b/Lib/test/test_asyncio/test_streams.py @@ -1,8 +1,6 @@ """Tests for streams.py.""" -import contextlib import gc -import io import os import queue import pickle @@ -18,7 +16,6 @@ except ImportError: ssl = None import asyncio -from asyncio.streams import _StreamProtocol, _ensure_can_read, _ensure_can_write from test.test_asyncio import utils as test_utils @@ -26,24 +23,6 @@ def tearDownModule(): asyncio.set_event_loop_policy(None) -class StreamModeTests(unittest.TestCase): - def test__ensure_can_read_ok(self): - self.assertIsNone(_ensure_can_read(asyncio.StreamMode.READ)) - self.assertIsNone(_ensure_can_read(asyncio.StreamMode.READWRITE)) - - def test__ensure_can_read_fail(self): - with self.assertRaisesRegex(RuntimeError, "The stream is write-only"): - _ensure_can_read(asyncio.StreamMode.WRITE) - - def test__ensure_can_write_ok(self): - self.assertIsNone(_ensure_can_write(asyncio.StreamMode.WRITE)) - self.assertIsNone(_ensure_can_write(asyncio.StreamMode.READWRITE)) - - def test__ensure_can_write_fail(self): - with self.assertRaisesRegex(RuntimeError, "The stream is read-only"): - _ensure_can_write(asyncio.StreamMode.READ) - - class StreamTests(test_utils.TestCase): DATA = b'line1\nline2\nline3\n' @@ -63,8 +42,7 @@ class StreamTests(test_utils.TestCase): @mock.patch('asyncio.streams.events') def test_ctor_global_loop(self, m_events): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - _asyncio_internal=True) + stream = asyncio.StreamReader() self.assertIs(stream._loop, m_events.get_event_loop.return_value) def _basetest_open_connection(self, open_connection_fut): @@ -100,8 +78,7 @@ class StreamTests(test_utils.TestCase): self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx)) try: with self.assertWarns(DeprecationWarning): - reader, writer = self.loop.run_until_complete( - open_connection_fut) + reader, writer = self.loop.run_until_complete(open_connection_fut) finally: asyncio.set_event_loop(None) writer.write(b'GET / HTTP/1.0\r\n\r\n') @@ -161,27 +138,21 @@ class StreamTests(test_utils.TestCase): self._basetest_open_connection_error(conn_fut) def test_feed_empty_data(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) + stream = asyncio.StreamReader(loop=self.loop) - stream._feed_data(b'') + stream.feed_data(b'') self.assertEqual(b'', stream._buffer) def test_feed_nonempty_data(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) + stream = asyncio.StreamReader(loop=self.loop) - stream._feed_data(self.DATA) + stream.feed_data(self.DATA) self.assertEqual(self.DATA, stream._buffer) def test_read_zero(self): # Read zero bytes. - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) - stream._feed_data(self.DATA) + stream = asyncio.StreamReader(loop=self.loop) + stream.feed_data(self.DATA) data = self.loop.run_until_complete(stream.read(0)) self.assertEqual(b'', data) @@ -189,13 +160,11 @@ class StreamTests(test_utils.TestCase): def test_read(self): # Read bytes. - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) + stream = asyncio.StreamReader(loop=self.loop) read_task = self.loop.create_task(stream.read(30)) def cb(): - stream._feed_data(self.DATA) + stream.feed_data(self.DATA) self.loop.call_soon(cb) data = self.loop.run_until_complete(read_task) @@ -204,11 +173,9 @@ class StreamTests(test_utils.TestCase): def test_read_line_breaks(self): # Read bytes without line breaks. - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) - stream._feed_data(b'line1') - stream._feed_data(b'line2') + stream = asyncio.StreamReader(loop=self.loop) + stream.feed_data(b'line1') + stream.feed_data(b'line2') data = self.loop.run_until_complete(stream.read(5)) @@ -217,13 +184,11 @@ class StreamTests(test_utils.TestCase): def test_read_eof(self): # Read bytes, stop at eof. - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) + stream = asyncio.StreamReader(loop=self.loop) read_task = self.loop.create_task(stream.read(1024)) def cb(): - stream._feed_eof() + stream.feed_eof() self.loop.call_soon(cb) data = self.loop.run_until_complete(read_task) @@ -232,15 +197,13 @@ class StreamTests(test_utils.TestCase): def test_read_until_eof(self): # Read all bytes until eof. - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) + stream = asyncio.StreamReader(loop=self.loop) read_task = self.loop.create_task(stream.read(-1)) def cb(): - stream._feed_data(b'chunk1\n') - stream._feed_data(b'chunk2') - stream._feed_eof() + stream.feed_data(b'chunk1\n') + stream.feed_data(b'chunk2') + stream.feed_eof() self.loop.call_soon(cb) data = self.loop.run_until_complete(read_task) @@ -249,34 +212,26 @@ class StreamTests(test_utils.TestCase): self.assertEqual(b'', stream._buffer) def test_read_exception(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) - stream._feed_data(b'line\n') + stream = asyncio.StreamReader(loop=self.loop) + stream.feed_data(b'line\n') data = self.loop.run_until_complete(stream.read(2)) self.assertEqual(b'li', data) - stream._set_exception(ValueError()) + stream.set_exception(ValueError()) self.assertRaises( ValueError, self.loop.run_until_complete, stream.read(2)) def test_invalid_limit(self): with self.assertRaisesRegex(ValueError, 'imit'): - asyncio.Stream(mode=asyncio.StreamMode.READ, - limit=0, loop=self.loop, - _asyncio_internal=True) + asyncio.StreamReader(limit=0, loop=self.loop) with self.assertRaisesRegex(ValueError, 'imit'): - asyncio.Stream(mode=asyncio.StreamMode.READ, - limit=-1, loop=self.loop, - _asyncio_internal=True) + asyncio.StreamReader(limit=-1, loop=self.loop) def test_read_limit(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - limit=3, loop=self.loop, - _asyncio_internal=True) - stream._feed_data(b'chunk') + stream = asyncio.StreamReader(limit=3, loop=self.loop) + stream.feed_data(b'chunk') data = self.loop.run_until_complete(stream.read(5)) self.assertEqual(b'chunk', data) self.assertEqual(b'', stream._buffer) @@ -284,16 +239,14 @@ class StreamTests(test_utils.TestCase): def test_readline(self): # Read one line. 'readline' will need to wait for the data # to come from 'cb' - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) - stream._feed_data(b'chunk1 ') + stream = asyncio.StreamReader(loop=self.loop) + stream.feed_data(b'chunk1 ') read_task = self.loop.create_task(stream.readline()) def cb(): - stream._feed_data(b'chunk2 ') - stream._feed_data(b'chunk3 ') - stream._feed_data(b'\n chunk4') + stream.feed_data(b'chunk2 ') + stream.feed_data(b'chunk3 ') + stream.feed_data(b'\n chunk4') self.loop.call_soon(cb) line = self.loop.run_until_complete(read_task) @@ -301,26 +254,22 @@ class StreamTests(test_utils.TestCase): self.assertEqual(b' chunk4', stream._buffer) def test_readline_limit_with_existing_data(self): - # Read one line. The data is in Stream's buffer + # Read one line. The data is in StreamReader's buffer # before the event loop is run. - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - limit=3, loop=self.loop, - _asyncio_internal=True) - stream._feed_data(b'li') - stream._feed_data(b'ne1\nline2\n') + stream = asyncio.StreamReader(limit=3, loop=self.loop) + stream.feed_data(b'li') + stream.feed_data(b'ne1\nline2\n') self.assertRaises( ValueError, self.loop.run_until_complete, stream.readline()) # The buffer should contain the remaining data after exception self.assertEqual(b'line2\n', stream._buffer) - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - limit=3, loop=self.loop, - _asyncio_internal=True) - stream._feed_data(b'li') - stream._feed_data(b'ne1') - stream._feed_data(b'li') + stream = asyncio.StreamReader(limit=3, loop=self.loop) + stream.feed_data(b'li') + stream.feed_data(b'ne1') + stream.feed_data(b'li') self.assertRaises( ValueError, self.loop.run_until_complete, stream.readline()) @@ -332,34 +281,30 @@ class StreamTests(test_utils.TestCase): self.assertEqual(b'', stream._buffer) def test_at_eof(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) + stream = asyncio.StreamReader(loop=self.loop) self.assertFalse(stream.at_eof()) - stream._feed_data(b'some data\n') + stream.feed_data(b'some data\n') self.assertFalse(stream.at_eof()) self.loop.run_until_complete(stream.readline()) self.assertFalse(stream.at_eof()) - stream._feed_data(b'some data\n') - stream._feed_eof() + stream.feed_data(b'some data\n') + stream.feed_eof() self.loop.run_until_complete(stream.readline()) self.assertTrue(stream.at_eof()) def test_readline_limit(self): - # Read one line. Streams are fed with data after + # Read one line. StreamReaders are fed with data after # their 'readline' methods are called. - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - limit=7, loop=self.loop, - _asyncio_internal=True) + stream = asyncio.StreamReader(limit=7, loop=self.loop) def cb(): - stream._feed_data(b'chunk1') - stream._feed_data(b'chunk2') - stream._feed_data(b'chunk3\n') - stream._feed_eof() + stream.feed_data(b'chunk1') + stream.feed_data(b'chunk2') + stream.feed_data(b'chunk3\n') + stream.feed_eof() self.loop.call_soon(cb) self.assertRaises( @@ -368,14 +313,12 @@ class StreamTests(test_utils.TestCase): # a ValueError it should be empty. self.assertEqual(b'', stream._buffer) - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - limit=7, loop=self.loop, - _asyncio_internal=True) + stream = asyncio.StreamReader(limit=7, loop=self.loop) def cb(): - stream._feed_data(b'chunk1') - stream._feed_data(b'chunk2\n') - stream._feed_data(b'chunk3\n') - stream._feed_eof() + stream.feed_data(b'chunk1') + stream.feed_data(b'chunk2\n') + stream.feed_data(b'chunk3\n') + stream.feed_eof() self.loop.call_soon(cb) self.assertRaises( @@ -383,20 +326,18 @@ class StreamTests(test_utils.TestCase): self.assertEqual(b'chunk3\n', stream._buffer) # check strictness of the limit - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - limit=7, loop=self.loop, - _asyncio_internal=True) - stream._feed_data(b'1234567\n') + stream = asyncio.StreamReader(limit=7, loop=self.loop) + stream.feed_data(b'1234567\n') line = self.loop.run_until_complete(stream.readline()) self.assertEqual(b'1234567\n', line) self.assertEqual(b'', stream._buffer) - stream._feed_data(b'12345678\n') + stream.feed_data(b'12345678\n') with self.assertRaises(ValueError) as cm: self.loop.run_until_complete(stream.readline()) self.assertEqual(b'', stream._buffer) - stream._feed_data(b'12345678') + stream.feed_data(b'12345678') with self.assertRaises(ValueError) as cm: self.loop.run_until_complete(stream.readline()) self.assertEqual(b'', stream._buffer) @@ -404,11 +345,9 @@ class StreamTests(test_utils.TestCase): def test_readline_nolimit_nowait(self): # All needed data for the first 'readline' call will be # in the buffer. - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) - stream._feed_data(self.DATA[:6]) - stream._feed_data(self.DATA[6:]) + stream = asyncio.StreamReader(loop=self.loop) + stream.feed_data(self.DATA[:6]) + stream.feed_data(self.DATA[6:]) line = self.loop.run_until_complete(stream.readline()) @@ -416,29 +355,23 @@ class StreamTests(test_utils.TestCase): self.assertEqual(b'line2\nline3\n', stream._buffer) def test_readline_eof(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) - stream._feed_data(b'some data') - stream._feed_eof() + stream = asyncio.StreamReader(loop=self.loop) + stream.feed_data(b'some data') + stream.feed_eof() line = self.loop.run_until_complete(stream.readline()) self.assertEqual(b'some data', line) def test_readline_empty_eof(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) - stream._feed_eof() + stream = asyncio.StreamReader(loop=self.loop) + stream.feed_eof() line = self.loop.run_until_complete(stream.readline()) self.assertEqual(b'', line) def test_readline_read_byte_count(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) - stream._feed_data(self.DATA) + stream = asyncio.StreamReader(loop=self.loop) + stream.feed_data(self.DATA) self.loop.run_until_complete(stream.readline()) @@ -448,89 +381,79 @@ class StreamTests(test_utils.TestCase): self.assertEqual(b'ine3\n', stream._buffer) def test_readline_exception(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) - stream._feed_data(b'line\n') + stream = asyncio.StreamReader(loop=self.loop) + stream.feed_data(b'line\n') data = self.loop.run_until_complete(stream.readline()) self.assertEqual(b'line\n', data) - stream._set_exception(ValueError()) + stream.set_exception(ValueError()) self.assertRaises( ValueError, self.loop.run_until_complete, stream.readline()) self.assertEqual(b'', stream._buffer) def test_readuntil_separator(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) + stream = asyncio.StreamReader(loop=self.loop) with self.assertRaisesRegex(ValueError, 'Separator should be'): self.loop.run_until_complete(stream.readuntil(separator=b'')) def test_readuntil_multi_chunks(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) + stream = asyncio.StreamReader(loop=self.loop) - stream._feed_data(b'lineAAA') + stream.feed_data(b'lineAAA') data = self.loop.run_until_complete(stream.readuntil(separator=b'AAA')) self.assertEqual(b'lineAAA', data) self.assertEqual(b'', stream._buffer) - stream._feed_data(b'lineAAA') + stream.feed_data(b'lineAAA') data = self.loop.run_until_complete(stream.readuntil(b'AAA')) self.assertEqual(b'lineAAA', data) self.assertEqual(b'', stream._buffer) - stream._feed_data(b'lineAAAxxx') + stream.feed_data(b'lineAAAxxx') data = self.loop.run_until_complete(stream.readuntil(b'AAA')) self.assertEqual(b'lineAAA', data) self.assertEqual(b'xxx', stream._buffer) def test_readuntil_multi_chunks_1(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) + stream = asyncio.StreamReader(loop=self.loop) - stream._feed_data(b'QWEaa') - stream._feed_data(b'XYaa') - stream._feed_data(b'a') + stream.feed_data(b'QWEaa') + stream.feed_data(b'XYaa') + stream.feed_data(b'a') data = self.loop.run_until_complete(stream.readuntil(b'aaa')) self.assertEqual(b'QWEaaXYaaa', data) self.assertEqual(b'', stream._buffer) - stream._feed_data(b'QWEaa') - stream._feed_data(b'XYa') - stream._feed_data(b'aa') + stream.feed_data(b'QWEaa') + stream.feed_data(b'XYa') + stream.feed_data(b'aa') data = self.loop.run_until_complete(stream.readuntil(b'aaa')) self.assertEqual(b'QWEaaXYaaa', data) self.assertEqual(b'', stream._buffer) - stream._feed_data(b'aaa') + stream.feed_data(b'aaa') data = self.loop.run_until_complete(stream.readuntil(b'aaa')) self.assertEqual(b'aaa', data) self.assertEqual(b'', stream._buffer) - stream._feed_data(b'Xaaa') + stream.feed_data(b'Xaaa') data = self.loop.run_until_complete(stream.readuntil(b'aaa')) self.assertEqual(b'Xaaa', data) self.assertEqual(b'', stream._buffer) - stream._feed_data(b'XXX') - stream._feed_data(b'a') - stream._feed_data(b'a') - stream._feed_data(b'a') + stream.feed_data(b'XXX') + stream.feed_data(b'a') + stream.feed_data(b'a') + stream.feed_data(b'a') data = self.loop.run_until_complete(stream.readuntil(b'aaa')) self.assertEqual(b'XXXaaa', data) self.assertEqual(b'', stream._buffer) def test_readuntil_eof(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) - stream._feed_data(b'some dataAA') - stream._feed_eof() + stream = asyncio.StreamReader(loop=self.loop) + stream.feed_data(b'some dataAA') + stream.feed_eof() with self.assertRaises(asyncio.IncompleteReadError) as cm: self.loop.run_until_complete(stream.readuntil(b'AAA')) @@ -539,18 +462,15 @@ class StreamTests(test_utils.TestCase): self.assertEqual(b'', stream._buffer) def test_readuntil_limit_found_sep(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, limit=3, - _asyncio_internal=True) - stream._feed_data(b'some dataAA') - + stream = asyncio.StreamReader(loop=self.loop, limit=3) + stream.feed_data(b'some dataAA') with self.assertRaisesRegex(asyncio.LimitOverrunError, 'not found') as cm: self.loop.run_until_complete(stream.readuntil(b'AAA')) self.assertEqual(b'some dataAA', stream._buffer) - stream._feed_data(b'A') + stream.feed_data(b'A') with self.assertRaisesRegex(asyncio.LimitOverrunError, 'is found') as cm: self.loop.run_until_complete(stream.readuntil(b'AAA')) @@ -559,10 +479,8 @@ class StreamTests(test_utils.TestCase): def test_readexactly_zero_or_less(self): # Read exact number of bytes (zero or less). - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) - stream._feed_data(self.DATA) + stream = asyncio.StreamReader(loop=self.loop) + stream.feed_data(self.DATA) data = self.loop.run_until_complete(stream.readexactly(0)) self.assertEqual(b'', data) @@ -574,17 +492,15 @@ class StreamTests(test_utils.TestCase): def test_readexactly(self): # Read exact number of bytes. - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) + stream = asyncio.StreamReader(loop=self.loop) n = 2 * len(self.DATA) read_task = self.loop.create_task(stream.readexactly(n)) def cb(): - stream._feed_data(self.DATA) - stream._feed_data(self.DATA) - stream._feed_data(self.DATA) + stream.feed_data(self.DATA) + stream.feed_data(self.DATA) + stream.feed_data(self.DATA) self.loop.call_soon(cb) data = self.loop.run_until_complete(read_task) @@ -592,25 +508,21 @@ class StreamTests(test_utils.TestCase): self.assertEqual(self.DATA, stream._buffer) def test_readexactly_limit(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - limit=3, loop=self.loop, - _asyncio_internal=True) - stream._feed_data(b'chunk') + stream = asyncio.StreamReader(limit=3, loop=self.loop) + stream.feed_data(b'chunk') data = self.loop.run_until_complete(stream.readexactly(5)) self.assertEqual(b'chunk', data) self.assertEqual(b'', stream._buffer) def test_readexactly_eof(self): # Read exact number of bytes (eof). - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) + stream = asyncio.StreamReader(loop=self.loop) n = 2 * len(self.DATA) read_task = self.loop.create_task(stream.readexactly(n)) def cb(): - stream._feed_data(self.DATA) - stream._feed_eof() + stream.feed_data(self.DATA) + stream.feed_eof() self.loop.call_soon(cb) with self.assertRaises(asyncio.IncompleteReadError) as cm: @@ -622,35 +534,29 @@ class StreamTests(test_utils.TestCase): self.assertEqual(b'', stream._buffer) def test_readexactly_exception(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) - stream._feed_data(b'line\n') + stream = asyncio.StreamReader(loop=self.loop) + stream.feed_data(b'line\n') data = self.loop.run_until_complete(stream.readexactly(2)) self.assertEqual(b'li', data) - stream._set_exception(ValueError()) + stream.set_exception(ValueError()) self.assertRaises( ValueError, self.loop.run_until_complete, stream.readexactly(2)) def test_exception(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) + stream = asyncio.StreamReader(loop=self.loop) self.assertIsNone(stream.exception()) exc = ValueError() - stream._set_exception(exc) + stream.set_exception(exc) self.assertIs(stream.exception(), exc) def test_exception_waiter(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) + stream = asyncio.StreamReader(loop=self.loop) async def set_err(): - stream._set_exception(ValueError()) + stream.set_exception(ValueError()) t1 = self.loop.create_task(stream.readline()) t2 = self.loop.create_task(set_err()) @@ -660,16 +566,14 @@ class StreamTests(test_utils.TestCase): self.assertRaises(ValueError, t1.result) def test_exception_cancel(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) + stream = asyncio.StreamReader(loop=self.loop) t = self.loop.create_task(stream.readline()) test_utils.run_briefly(self.loop) t.cancel() test_utils.run_briefly(self.loop) # The following line fails if set_exception() isn't careful. - stream._set_exception(RuntimeError('message')) + stream.set_exception(RuntimeError('message')) test_utils.run_briefly(self.loop) self.assertIs(stream._waiter, None) @@ -829,7 +733,7 @@ class StreamTests(test_utils.TestCase): def test_read_all_from_pipe_reader(self): # See asyncio issue 168. This test is derived from the example # subprocess_attach_read_pipe.py, but we configure the - # Stream's limit so that twice it is less than the size + # StreamReader's limit so that twice it is less than the size # of the data writter. Also we must explicitly attach a child # watcher to the event loop. @@ -843,11 +747,8 @@ os.close(fd) args = [sys.executable, '-c', code, str(wfd)] pipe = open(rfd, 'rb', 0) - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, limit=1, - _asyncio_internal=True) - protocol = _StreamProtocol(stream, loop=self.loop, - _asyncio_internal=True) + reader = asyncio.StreamReader(loop=self.loop, limit=1) + protocol = asyncio.StreamReaderProtocol(reader, loop=self.loop) transport, _ = self.loop.run_until_complete( self.loop.connect_read_pipe(lambda: protocol, pipe)) @@ -865,30 +766,29 @@ os.close(fd) asyncio.set_child_watcher(None) os.close(wfd) - data = self.loop.run_until_complete(stream.read(-1)) + data = self.loop.run_until_complete(reader.read(-1)) self.assertEqual(data, b'data') def test_streamreader_constructor(self): self.addCleanup(asyncio.set_event_loop, None) asyncio.set_event_loop(self.loop) - # asyncio issue #184: Ensure that _StreamProtocol constructor + # asyncio issue #184: Ensure that StreamReaderProtocol constructor # retrieves the current loop if the loop parameter is not set - reader = asyncio.Stream(mode=asyncio.StreamMode.READ, - _asyncio_internal=True) + reader = asyncio.StreamReader() self.assertIs(reader._loop, self.loop) def test_streamreaderprotocol_constructor(self): self.addCleanup(asyncio.set_event_loop, None) asyncio.set_event_loop(self.loop) - # asyncio issue #184: Ensure that _StreamProtocol constructor + # asyncio issue #184: Ensure that StreamReaderProtocol constructor # retrieves the current loop if the loop parameter is not set - stream = mock.Mock() - protocol = _StreamProtocol(stream, _asyncio_internal=True) + reader = mock.Mock() + protocol = asyncio.StreamReaderProtocol(reader) self.assertIs(protocol._loop, self.loop) - def test_drain_raises_deprecated(self): + def test_drain_raises(self): # See http://bugs.python.org/issue25441 # This test should not use asyncio for the mock server; the @@ -902,7 +802,7 @@ os.close(fd) def server(): # Runs in a separate thread. - with socket.create_server(('127.0.0.1', 0)) as sock: + with socket.create_server(('localhost', 0)) as sock: addr = sock.getsockname() q.put(addr) clt, _ = sock.accept() @@ -933,106 +833,48 @@ os.close(fd) thread.join() self.assertEqual([], messages) - def test_drain_raises(self): - # See http://bugs.python.org/issue25441 - - # This test should not use asyncio for the mock server; the - # whole point of the test is to test for a bug in drain() - # where it never gives up the event loop but the socket is - # closed on the server side. - - messages = [] - self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx)) - q = queue.Queue() - - def server(): - # Runs in a separate thread. - with socket.create_server(('localhost', 0)) as sock: - addr = sock.getsockname() - q.put(addr) - clt, _ = sock.accept() - clt.close() - - async def client(host, port): - stream = await asyncio.connect(host, port) - - while True: - stream.write(b"foo\n") - await stream.drain() - - # Start the server thread and wait for it to be listening. - thread = threading.Thread(target=server) - thread.setDaemon(True) - thread.start() - addr = q.get() - - # Should not be stuck in an infinite loop. - with self.assertRaises((ConnectionResetError, ConnectionAbortedError, - BrokenPipeError)): - self.loop.run_until_complete(client(*addr)) - - # Clean up the thread. (Only on success; on failure, it may - # be stuck in accept().) - thread.join() - self.assertEqual([], messages) - def test___repr__(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) - self.assertEqual("<Stream mode=StreamMode.READ>", repr(stream)) + stream = asyncio.StreamReader(loop=self.loop) + self.assertEqual("<StreamReader>", repr(stream)) def test___repr__nondefault_limit(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, limit=123, - _asyncio_internal=True) - self.assertEqual("<Stream mode=StreamMode.READ limit=123>", repr(stream)) + stream = asyncio.StreamReader(loop=self.loop, limit=123) + self.assertEqual("<StreamReader limit=123>", repr(stream)) def test___repr__eof(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) - stream._feed_eof() - self.assertEqual("<Stream mode=StreamMode.READ eof>", repr(stream)) + stream = asyncio.StreamReader(loop=self.loop) + stream.feed_eof() + self.assertEqual("<StreamReader eof>", repr(stream)) def test___repr__data(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) - stream._feed_data(b'data') - self.assertEqual("<Stream mode=StreamMode.READ 4 bytes>", repr(stream)) + stream = asyncio.StreamReader(loop=self.loop) + stream.feed_data(b'data') + self.assertEqual("<StreamReader 4 bytes>", repr(stream)) def test___repr__exception(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) + stream = asyncio.StreamReader(loop=self.loop) exc = RuntimeError() - stream._set_exception(exc) - self.assertEqual("<Stream mode=StreamMode.READ exception=RuntimeError()>", + stream.set_exception(exc) + self.assertEqual("<StreamReader exception=RuntimeError()>", repr(stream)) def test___repr__waiter(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) - stream._waiter = self.loop.create_future() + stream = asyncio.StreamReader(loop=self.loop) + stream._waiter = asyncio.Future(loop=self.loop) self.assertRegex( repr(stream), - r"<Stream .+ waiter=<Future pending[\S ]*>>") + r"<StreamReader waiter=<Future pending[\S ]*>>") stream._waiter.set_result(None) self.loop.run_until_complete(stream._waiter) stream._waiter = None - self.assertEqual("<Stream mode=StreamMode.READ>", repr(stream)) + self.assertEqual("<StreamReader>", repr(stream)) def test___repr__transport(self): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) + stream = asyncio.StreamReader(loop=self.loop) stream._transport = mock.Mock() stream._transport.__repr__ = mock.Mock() stream._transport.__repr__.return_value = "<Transport>" - self.assertEqual("<Stream mode=StreamMode.READ transport=<Transport>>", - repr(stream)) + self.assertEqual("<StreamReader transport=<Transport>>", repr(stream)) def test_IncompleteReadError_pickleable(self): e = asyncio.IncompleteReadError(b'abc', 10) @@ -1051,7 +893,7 @@ os.close(fd) self.assertEqual(str(e), str(e2)) self.assertEqual(e.consumed, e2.consumed) - def test_wait_closed_on_close_deprecated(self): + def test_wait_closed_on_close(self): with test_utils.run_test_server() as httpd: with self.assertWarns(DeprecationWarning): rd, wr = self.loop.run_until_complete( @@ -1069,24 +911,7 @@ os.close(fd) self.assertTrue(wr.is_closing()) self.loop.run_until_complete(wr.wait_closed()) - def test_wait_closed_on_close(self): - with test_utils.run_test_server() as httpd: - stream = self.loop.run_until_complete( - asyncio.connect(*httpd.address)) - - stream.write(b'GET / HTTP/1.0\r\n\r\n') - f = stream.readline() - data = self.loop.run_until_complete(f) - self.assertEqual(data, b'HTTP/1.0 200 OK\r\n') - f = stream.read() - data = self.loop.run_until_complete(f) - self.assertTrue(data.endswith(b'\r\n\r\nTest message')) - self.assertFalse(stream.is_closing()) - stream.close() - self.assertTrue(stream.is_closing()) - self.loop.run_until_complete(stream.wait_closed()) - - def test_wait_closed_on_close_with_unread_data_deprecated(self): + def test_wait_closed_on_close_with_unread_data(self): with test_utils.run_test_server() as httpd: with self.assertWarns(DeprecationWarning): rd, wr = self.loop.run_until_complete( @@ -1099,44 +924,33 @@ os.close(fd) wr.close() self.loop.run_until_complete(wr.wait_closed()) - def test_wait_closed_on_close_with_unread_data(self): - with test_utils.run_test_server() as httpd: - stream = self.loop.run_until_complete( - asyncio.connect(*httpd.address)) - - stream.write(b'GET / HTTP/1.0\r\n\r\n') - f = stream.readline() - data = self.loop.run_until_complete(f) - self.assertEqual(data, b'HTTP/1.0 200 OK\r\n') - stream.close() - self.loop.run_until_complete(stream.wait_closed()) - def test_del_stream_before_sock_closing(self): messages = [] self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx)) - async def test(): - - with test_utils.run_test_server() as httpd: - stream = await asyncio.connect(*httpd.address) - sock = stream.get_extra_info('socket') - self.assertNotEqual(sock.fileno(), -1) + with test_utils.run_test_server() as httpd: + with self.assertWarns(DeprecationWarning): + rd, wr = self.loop.run_until_complete( + asyncio.open_connection(*httpd.address, loop=self.loop)) + sock = wr.get_extra_info('socket') + self.assertNotEqual(sock.fileno(), -1) - await stream.write(b'GET / HTTP/1.0\r\n\r\n') - data = await stream.readline() - self.assertEqual(data, b'HTTP/1.0 200 OK\r\n') + wr.write(b'GET / HTTP/1.0\r\n\r\n') + f = rd.readline() + data = self.loop.run_until_complete(f) + self.assertEqual(data, b'HTTP/1.0 200 OK\r\n') - # drop refs to reader/writer - del stream - gc.collect() - # make a chance to close the socket - await asyncio.sleep(0) + # drop refs to reader/writer + del rd + del wr + gc.collect() + # make a chance to close the socket + test_utils.run_briefly(self.loop) - self.assertEqual(1, len(messages), messages) - self.assertEqual(sock.fileno(), -1) + self.assertEqual(1, len(messages)) + self.assertEqual(sock.fileno(), -1) - self.loop.run_until_complete(test()) - self.assertEqual(1, len(messages), messages) + self.assertEqual(1, len(messages)) self.assertEqual('An open stream object is being garbage ' 'collected; call "stream.close()" explicitly.', messages[0]['message']) @@ -1146,12 +960,9 @@ os.close(fd) self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx)) with test_utils.run_test_server() as httpd: - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, - _asyncio_internal=True) - pr = _StreamProtocol(stream, loop=self.loop, - _asyncio_internal=True) - del stream + rd = asyncio.StreamReader(loop=self.loop) + pr = asyncio.StreamReaderProtocol(rd, loop=self.loop) + del rd gc.collect() tr, _ = self.loop.run_until_complete( self.loop.create_connection( @@ -1168,14 +979,15 @@ os.close(fd) def test_async_writer_api(self): async def inner(httpd): - stream = await asyncio.connect(*httpd.address) + rd, wr = await asyncio.open_connection(*httpd.address) - await stream.write(b'GET / HTTP/1.0\r\n\r\n') - data = await stream.readline() + wr.write(b'GET / HTTP/1.0\r\n\r\n') + data = await rd.readline() self.assertEqual(data, b'HTTP/1.0 200 OK\r\n') - data = await stream.read() + data = await rd.read() self.assertTrue(data.endswith(b'\r\n\r\nTest message')) - await stream.close() + wr.close() + await wr.wait_closed() messages = [] self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx)) @@ -1187,16 +999,17 @@ os.close(fd) def test_async_writer_api_exception_after_close(self): async def inner(httpd): - stream = await asyncio.connect(*httpd.address) + rd, wr = await asyncio.open_connection(*httpd.address) - await stream.write(b'GET / HTTP/1.0\r\n\r\n') - data = await stream.readline() + wr.write(b'GET / HTTP/1.0\r\n\r\n') + data = await rd.readline() self.assertEqual(data, b'HTTP/1.0 200 OK\r\n') - data = await stream.read() + data = await rd.read() self.assertTrue(data.endswith(b'\r\n\r\nTest message')) - stream.close() + wr.close() with self.assertRaises(ConnectionResetError): - await stream.write(b'data') + wr.write(b'data') + await wr.drain() messages = [] self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx)) @@ -1227,587 +1040,6 @@ os.close(fd) self.assertEqual(messages, []) - def test_stream_reader_create_warning(self): - with contextlib.suppress(AttributeError): - del asyncio.StreamReader - with self.assertWarns(DeprecationWarning): - asyncio.StreamReader - - def test_stream_writer_create_warning(self): - with contextlib.suppress(AttributeError): - del asyncio.StreamWriter - with self.assertWarns(DeprecationWarning): - asyncio.StreamWriter - - def test_stream_reader_forbidden_ops(self): - async def inner(): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - _asyncio_internal=True) - with self.assertRaisesRegex(RuntimeError, "The stream is read-only"): - await stream.write(b'data') - with self.assertRaisesRegex(RuntimeError, "The stream is read-only"): - await stream.writelines([b'data', b'other']) - with self.assertRaisesRegex(RuntimeError, "The stream is read-only"): - stream.write_eof() - with self.assertRaisesRegex(RuntimeError, "The stream is read-only"): - await stream.drain() - - self.loop.run_until_complete(inner()) - - def test_stream_writer_forbidden_ops(self): - async def inner(): - stream = asyncio.Stream(mode=asyncio.StreamMode.WRITE, - _asyncio_internal=True) - with self.assertRaisesRegex(RuntimeError, "The stream is write-only"): - stream._feed_data(b'data') - with self.assertRaisesRegex(RuntimeError, "The stream is write-only"): - await stream.readline() - with self.assertRaisesRegex(RuntimeError, "The stream is write-only"): - await stream.readuntil() - with self.assertRaisesRegex(RuntimeError, "The stream is write-only"): - await stream.read() - with self.assertRaisesRegex(RuntimeError, "The stream is write-only"): - await stream.readexactly(10) - with self.assertRaisesRegex(RuntimeError, "The stream is write-only"): - async for chunk in stream: - pass - - self.loop.run_until_complete(inner()) - - def _basetest_connect(self, stream): - messages = [] - self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx)) - - stream.write(b'GET / HTTP/1.0\r\n\r\n') - f = stream.readline() - data = self.loop.run_until_complete(f) - self.assertEqual(data, b'HTTP/1.0 200 OK\r\n') - f = stream.read() - data = self.loop.run_until_complete(f) - self.assertTrue(data.endswith(b'\r\n\r\nTest message')) - stream.close() - self.loop.run_until_complete(stream.wait_closed()) - - self.assertEqual([], messages) - - def test_connect(self): - with test_utils.run_test_server() as httpd: - stream = self.loop.run_until_complete( - asyncio.connect(*httpd.address)) - self.assertFalse(stream.is_server_side()) - self._basetest_connect(stream) - - @support.skip_unless_bind_unix_socket - def test_connect_unix(self): - with test_utils.run_test_unix_server() as httpd: - stream = self.loop.run_until_complete( - asyncio.connect_unix(httpd.address)) - self._basetest_connect(stream) - - def test_stream_async_context_manager(self): - async def test(httpd): - stream = await asyncio.connect(*httpd.address) - async with stream: - await stream.write(b'GET / HTTP/1.0\r\n\r\n') - data = await stream.readline() - self.assertEqual(data, b'HTTP/1.0 200 OK\r\n') - data = await stream.read() - self.assertTrue(data.endswith(b'\r\n\r\nTest message')) - self.assertTrue(stream.is_closing()) - - with test_utils.run_test_server() as httpd: - self.loop.run_until_complete(test(httpd)) - - def test_connect_async_context_manager(self): - async def test(httpd): - async with asyncio.connect(*httpd.address) as stream: - await stream.write(b'GET / HTTP/1.0\r\n\r\n') - data = await stream.readline() - self.assertEqual(data, b'HTTP/1.0 200 OK\r\n') - data = await stream.read() - self.assertTrue(data.endswith(b'\r\n\r\nTest message')) - self.assertTrue(stream.is_closing()) - - with test_utils.run_test_server() as httpd: - self.loop.run_until_complete(test(httpd)) - - @support.skip_unless_bind_unix_socket - def test_connect_unix_async_context_manager(self): - async def test(httpd): - async with asyncio.connect_unix(httpd.address) as stream: - await stream.write(b'GET / HTTP/1.0\r\n\r\n') - data = await stream.readline() - self.assertEqual(data, b'HTTP/1.0 200 OK\r\n') - data = await stream.read() - self.assertTrue(data.endswith(b'\r\n\r\nTest message')) - self.assertTrue(stream.is_closing()) - - with test_utils.run_test_unix_server() as httpd: - self.loop.run_until_complete(test(httpd)) - - def test_stream_server(self): - - async def handle_client(stream): - self.assertTrue(stream.is_server_side()) - data = await stream.readline() - await stream.write(data) - await stream.close() - - async def client(srv): - addr = srv.sockets[0].getsockname() - stream = await asyncio.connect(*addr) - # send a line - await stream.write(b"hello world!\n") - # read it back - msgback = await stream.readline() - await stream.close() - self.assertEqual(msgback, b"hello world!\n") - await srv.close() - - async def test(): - async with asyncio.StreamServer(handle_client, '127.0.0.1', 0) as server: - await server.start_serving() - task = asyncio.create_task(client(server)) - with contextlib.suppress(asyncio.CancelledError): - await server.serve_forever() - await task - - messages = [] - self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx)) - self.loop.run_until_complete(test()) - self.assertEqual(messages, []) - - @support.skip_unless_bind_unix_socket - def test_unix_stream_server(self): - - async def handle_client(stream): - data = await stream.readline() - await stream.write(data) - await stream.close() - - async def client(srv): - addr = srv.sockets[0].getsockname() - stream = await asyncio.connect_unix(addr) - # send a line - await stream.write(b"hello world!\n") - # read it back - msgback = await stream.readline() - await stream.close() - self.assertEqual(msgback, b"hello world!\n") - await srv.close() - - async def test(): - with test_utils.unix_socket_path() as path: - async with asyncio.UnixStreamServer(handle_client, path) as server: - await server.start_serving() - task = asyncio.create_task(client(server)) - with contextlib.suppress(asyncio.CancelledError): - await server.serve_forever() - await task - - messages = [] - self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx)) - self.loop.run_until_complete(test()) - self.assertEqual(messages, []) - - def test_stream_server_inheritance_forbidden(self): - with self.assertRaises(TypeError): - class MyServer(asyncio.StreamServer): - pass - - @support.skip_unless_bind_unix_socket - def test_unix_stream_server_inheritance_forbidden(self): - with self.assertRaises(TypeError): - class MyServer(asyncio.UnixStreamServer): - pass - - def test_stream_server_bind(self): - async def handle_client(stream): - await stream.close() - - async def test(): - srv = asyncio.StreamServer(handle_client, '127.0.0.1', 0) - self.assertFalse(srv.is_bound()) - self.assertEqual(0, len(srv.sockets)) - await srv.bind() - self.assertTrue(srv.is_bound()) - self.assertEqual(1, len(srv.sockets)) - await srv.close() - self.assertFalse(srv.is_bound()) - self.assertEqual(0, len(srv.sockets)) - - messages = [] - self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx)) - self.loop.run_until_complete(test()) - self.assertEqual(messages, []) - - def test_stream_server_bind_async_with(self): - async def handle_client(stream): - await stream.close() - - async def test(): - async with asyncio.StreamServer(handle_client, '127.0.0.1', 0) as srv: - self.assertTrue(srv.is_bound()) - self.assertEqual(1, len(srv.sockets)) - - messages = [] - self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx)) - self.loop.run_until_complete(test()) - self.assertEqual(messages, []) - - def test_stream_server_start_serving(self): - async def handle_client(stream): - await stream.close() - - async def test(): - async with asyncio.StreamServer(handle_client, '127.0.0.1', 0) as srv: - self.assertFalse(srv.is_serving()) - await srv.start_serving() - self.assertTrue(srv.is_serving()) - await srv.close() - self.assertFalse(srv.is_serving()) - - messages = [] - self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx)) - self.loop.run_until_complete(test()) - self.assertEqual(messages, []) - - def test_stream_server_close(self): - server_stream_aborted = False - fut1 = self.loop.create_future() - fut2 = self.loop.create_future() - - async def handle_client(stream): - data = await stream.readexactly(4) - self.assertEqual(b'data', data) - fut1.set_result(None) - await fut2 - self.assertEqual(b'', await stream.readline()) - nonlocal server_stream_aborted - server_stream_aborted = True - - async def client(srv): - addr = srv.sockets[0].getsockname() - stream = await asyncio.connect(*addr) - await stream.write(b'data') - await fut2 - self.assertEqual(b'', await stream.readline()) - await stream.close() - - async def test(): - async with asyncio.StreamServer(handle_client, '127.0.0.1', 0) as server: - await server.start_serving() - task = asyncio.create_task(client(server)) - await fut1 - fut2.set_result(None) - await server.close() - await task - - messages = [] - self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx)) - self.loop.run_until_complete(asyncio.wait_for(test(), 60.0)) - self.assertEqual(messages, []) - self.assertTrue(fut1.done()) - self.assertTrue(fut2.done()) - self.assertTrue(server_stream_aborted) - - def test_stream_server_abort(self): - server_stream_aborted = False - fut1 = self.loop.create_future() - fut2 = self.loop.create_future() - - async def handle_client(stream): - data = await stream.readexactly(4) - self.assertEqual(b'data', data) - fut1.set_result(None) - await fut2 - self.assertEqual(b'', await stream.readline()) - nonlocal server_stream_aborted - server_stream_aborted = True - - async def client(srv): - addr = srv.sockets[0].getsockname() - stream = await asyncio.connect(*addr) - await stream.write(b'data') - await fut2 - self.assertEqual(b'', await stream.readline()) - await stream.close() - - async def test(): - async with asyncio.StreamServer(handle_client, '127.0.0.1', 0) as server: - await server.start_serving() - task = asyncio.create_task(client(server)) - await fut1 - fut2.set_result(None) - await server.abort() - await task - - messages = [] - self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx)) - self.loop.run_until_complete(asyncio.wait_for(test(), 60.0)) - self.assertEqual(messages, []) - self.assertTrue(fut1.done()) - self.assertTrue(fut2.done()) - self.assertTrue(server_stream_aborted) - - def test_stream_shutdown_hung_task(self): - fut1 = self.loop.create_future() - fut2 = self.loop.create_future() - cancelled = self.loop.create_future() - - async def handle_client(stream): - data = await stream.readexactly(4) - self.assertEqual(b'data', data) - fut1.set_result(None) - await fut2 - try: - while True: - await asyncio.sleep(0.01) - except asyncio.CancelledError: - cancelled.set_result(None) - raise - - async def client(srv): - addr = srv.sockets[0].getsockname() - stream = await asyncio.connect(*addr) - await stream.write(b'data') - await fut2 - self.assertEqual(b'', await stream.readline()) - await stream.close() - - async def test(): - async with asyncio.StreamServer(handle_client, - '127.0.0.1', - 0, - shutdown_timeout=0.3) as server: - await server.start_serving() - task = asyncio.create_task(client(server)) - await fut1 - fut2.set_result(None) - await server.close() - await task - await cancelled - - messages = [] - self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx)) - self.loop.run_until_complete(asyncio.wait_for(test(), 60.0)) - self.assertEqual(messages, []) - self.assertTrue(fut1.done()) - self.assertTrue(fut2.done()) - self.assertTrue(cancelled.done()) - - def test_stream_shutdown_hung_task_prevents_cancellation(self): - fut1 = self.loop.create_future() - fut2 = self.loop.create_future() - cancelled = self.loop.create_future() - do_handle_client = True - - async def handle_client(stream): - data = await stream.readexactly(4) - self.assertEqual(b'data', data) - fut1.set_result(None) - await fut2 - while do_handle_client: - with contextlib.suppress(asyncio.CancelledError): - await asyncio.sleep(0.01) - cancelled.set_result(None) - - async def client(srv): - addr = srv.sockets[0].getsockname() - stream = await asyncio.connect(*addr) - await stream.write(b'data') - await fut2 - self.assertEqual(b'', await stream.readline()) - await stream.close() - - async def test(): - async with asyncio.StreamServer(handle_client, - '127.0.0.1', - 0, - shutdown_timeout=0.3) as server: - await server.start_serving() - task = asyncio.create_task(client(server)) - await fut1 - fut2.set_result(None) - await server.close() - nonlocal do_handle_client - do_handle_client = False - await task - await cancelled - - messages = [] - self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx)) - self.loop.run_until_complete(asyncio.wait_for(test(), 60.0)) - self.assertEqual(1, len(messages)) - self.assertRegex(messages[0]['message'], - "<Task pending .+ ignored cancellation request") - self.assertTrue(fut1.done()) - self.assertTrue(fut2.done()) - self.assertTrue(cancelled.done()) - - def test_sendfile(self): - messages = [] - self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx)) - - with open(support.TESTFN, 'wb') as fp: - fp.write(b'data\n') - self.addCleanup(support.unlink, support.TESTFN) - - async def serve_callback(stream): - data = await stream.readline() - await stream.write(b'ack-' + data) - data = await stream.readline() - await stream.write(b'ack-' + data) - data = await stream.readline() - await stream.write(b'ack-' + data) - await stream.close() - - async def do_connect(host, port): - stream = await asyncio.connect(host, port) - await stream.write(b'begin\n') - data = await stream.readline() - self.assertEqual(b'ack-begin\n', data) - with open(support.TESTFN, 'rb') as fp: - await stream.sendfile(fp) - data = await stream.readline() - self.assertEqual(b'ack-data\n', data) - await stream.write(b'end\n') - data = await stream.readline() - self.assertEqual(data, b'ack-end\n') - await stream.close() - - async def test(): - async with asyncio.StreamServer(serve_callback, '127.0.0.1', 0) as srv: - await srv.start_serving() - await do_connect(*srv.sockets[0].getsockname()) - - self.loop.run_until_complete(test()) - - self.assertEqual([], messages) - - - @unittest.skipIf(ssl is None, 'No ssl module') - def test_connect_start_tls(self): - with test_utils.run_test_server(use_ssl=True) as httpd: - # connect without SSL but upgrade to TLS just after - # connection is established - stream = self.loop.run_until_complete( - asyncio.connect(*httpd.address)) - - self.loop.run_until_complete( - stream.start_tls( - sslcontext=test_utils.dummy_ssl_context())) - self._basetest_connect(stream) - - def test_repr_unbound(self): - async def serve(stream): - pass - - async def test(): - srv = asyncio.StreamServer(serve) - self.assertEqual('<StreamServer>', repr(srv)) - await srv.close() - - self.loop.run_until_complete(test()) - - def test_repr_bound(self): - async def serve(stream): - pass - - async def test(): - srv = asyncio.StreamServer(serve, '127.0.0.1', 0) - await srv.bind() - self.assertRegex(repr(srv), r'<StreamServer sockets=\(.+\)>') - await srv.close() - - self.loop.run_until_complete(test()) - - def test_repr_serving(self): - async def serve(stream): - pass - - async def test(): - srv = asyncio.StreamServer(serve, '127.0.0.1', 0) - await srv.start_serving() - self.assertRegex(repr(srv), r'<StreamServer serving sockets=\(.+\)>') - await srv.close() - - self.loop.run_until_complete(test()) - - - @unittest.skipUnless(sys.platform != 'win32', - "Don't support pipes for Windows") - def test_read_pipe(self): - async def test(): - rpipe, wpipe = os.pipe() - pipeobj = io.open(rpipe, 'rb', 1024) - - async with asyncio.connect_read_pipe(pipeobj) as stream: - self.assertEqual(stream.mode, asyncio.StreamMode.READ) - - os.write(wpipe, b'1') - data = await stream.readexactly(1) - self.assertEqual(data, b'1') - - os.write(wpipe, b'2345') - data = await stream.readexactly(4) - self.assertEqual(data, b'2345') - os.close(wpipe) - - self.loop.run_until_complete(test()) - - @unittest.skipUnless(sys.platform != 'win32', - "Don't support pipes for Windows") - def test_write_pipe(self): - async def test(): - rpipe, wpipe = os.pipe() - pipeobj = io.open(wpipe, 'wb', 1024) - - async with asyncio.connect_write_pipe(pipeobj) as stream: - self.assertEqual(stream.mode, asyncio.StreamMode.WRITE) - - await stream.write(b'1') - data = os.read(rpipe, 1024) - self.assertEqual(data, b'1') - - await stream.write(b'2345') - data = os.read(rpipe, 1024) - self.assertEqual(data, b'2345') - - os.close(rpipe) - - self.loop.run_until_complete(test()) - - def test_stream_ctor_forbidden(self): - with self.assertRaisesRegex(RuntimeError, - "should be instantiated " - "by asyncio internals only"): - asyncio.Stream(asyncio.StreamMode.READWRITE) - - def test_deprecated_methods(self): - async def f(): - return asyncio.Stream(mode=asyncio.StreamMode.READWRITE, - _asyncio_internal=True) - - stream = self.loop.run_until_complete(f()) - - tr = mock.Mock() - - with self.assertWarns(DeprecationWarning): - stream.set_transport(tr) - - with self.assertWarns(DeprecationWarning): - stream.transport is tr - - with self.assertWarns(DeprecationWarning): - stream.feed_data(b'data') - - with self.assertWarns(DeprecationWarning): - stream.feed_eof() - - with self.assertWarns(DeprecationWarning): - stream.set_exception(ConnectionResetError("test")) - if __name__ == '__main__': unittest.main() diff --git a/Lib/test/test_asyncio/test_subprocess.py b/Lib/test/test_asyncio/test_subprocess.py index 3ad18e5..fe8cfa6 100644 --- a/Lib/test/test_asyncio/test_subprocess.py +++ b/Lib/test/test_asyncio/test_subprocess.py @@ -582,18 +582,6 @@ class SubprocessMixin: self.loop.run_until_complete(execute()) - def test_subprocess_protocol_create_warning(self): - with self.assertWarns(DeprecationWarning): - subprocess.SubprocessStreamProtocol(limit=10, loop=self.loop) - - def test_process_create_warning(self): - proto = subprocess.SubprocessStreamProtocol(limit=10, loop=self.loop, - _asyncio_internal=True) - transp = mock.Mock() - - with self.assertWarns(DeprecationWarning): - subprocess.Process(transp, proto, loop=self.loop) - def test_create_subprocess_exec_text_mode_fails(self): async def execute(): with self.assertRaises(ValueError): diff --git a/Lib/test/test_asyncio/test_windows_events.py b/Lib/test/test_asyncio/test_windows_events.py index d0ba193..9ed10fc 100644 --- a/Lib/test/test_asyncio/test_windows_events.py +++ b/Lib/test/test_asyncio/test_windows_events.py @@ -15,7 +15,6 @@ import _winapi import asyncio from asyncio import windows_events -from asyncio.streams import _StreamProtocol from test.test_asyncio import utils as test_utils @@ -118,16 +117,14 @@ class ProactorTests(test_utils.TestCase): clients = [] for i in range(5): - stream = asyncio.Stream(mode=asyncio.StreamMode.READ, - loop=self.loop, _asyncio_internal=True) - protocol = _StreamProtocol(stream, - loop=self.loop, - _asyncio_internal=True) + stream_reader = asyncio.StreamReader(loop=self.loop) + protocol = asyncio.StreamReaderProtocol(stream_reader, + loop=self.loop) trans, proto = await self.loop.create_pipe_connection( lambda: protocol, ADDRESS) self.assertIsInstance(trans, asyncio.Transport) self.assertEqual(protocol, proto) - clients.append((stream, trans)) + clients.append((stream_reader, trans)) for i, (r, w) in enumerate(clients): w.write('lower-{}\n'.format(i).encode()) @@ -136,7 +133,6 @@ class ProactorTests(test_utils.TestCase): response = await r.readline() self.assertEqual(response, 'LOWER-{}\n'.format(i).encode()) w.close() - await r.close() server.close() |