diff options
author | Serhiy Storchaka <storchaka@gmail.com> | 2021-04-25 10:40:44 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-04-25 10:40:44 (GMT) |
commit | 172c0f2752d8708b6dda7b42e6c5a3519420a4e8 (patch) | |
tree | 35a076c6baad7ca053a62b9f505af3762a867b79 /Lib/test/test_asyncio/test_streams.py | |
parent | face87c94e67ad9c72b9a3724f112fd76c1002b9 (diff) | |
download | cpython-172c0f2752d8708b6dda7b42e6c5a3519420a4e8.zip cpython-172c0f2752d8708b6dda7b42e6c5a3519420a4e8.tar.gz cpython-172c0f2752d8708b6dda7b42e6c5a3519420a4e8.tar.bz2 |
bpo-39529: Deprecate creating new event loop in asyncio.get_event_loop() (GH-23554)
asyncio.get_event_loop() emits now a deprecation warning when it creates a new event loop.
In future releases it will became an alias of asyncio.get_running_loop().
Diffstat (limited to 'Lib/test/test_asyncio/test_streams.py')
-rw-r--r-- | Lib/test/test_asyncio/test_streams.py | 53 |
1 files changed, 42 insertions, 11 deletions
diff --git a/Lib/test/test_asyncio/test_streams.py b/Lib/test/test_asyncio/test_streams.py index a075358..6eaa289 100644 --- a/Lib/test/test_asyncio/test_streams.py +++ b/Lib/test/test_asyncio/test_streams.py @@ -40,11 +40,6 @@ class StreamTests(test_utils.TestCase): gc.collect() super().tearDown() - @mock.patch('asyncio.streams.events') - def test_ctor_global_loop(self, m_events): - stream = asyncio.StreamReader() - self.assertIs(stream._loop, m_events.get_event_loop.return_value) - def _basetest_open_connection(self, open_connection_fut): messages = [] self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx)) @@ -751,23 +746,59 @@ os.close(fd) data = self.loop.run_until_complete(reader.read(-1)) self.assertEqual(data, b'data') - def test_streamreader_constructor(self): - self.addCleanup(asyncio.set_event_loop, None) - asyncio.set_event_loop(self.loop) + def test_streamreader_constructor_without_loop(self): + with self.assertWarns(DeprecationWarning) as cm: + with self.assertRaisesRegex(RuntimeError, 'There is no current event loop'): + asyncio.StreamReader() + self.assertEqual(cm.warnings[0].filename, __file__) + def test_streamreader_constructor_use_running_loop(self): # asyncio issue #184: Ensure that StreamReaderProtocol constructor # retrieves the current loop if the loop parameter is not set - reader = asyncio.StreamReader() + async def test(): + return asyncio.StreamReader() + + reader = self.loop.run_until_complete(test()) self.assertIs(reader._loop, self.loop) - def test_streamreaderprotocol_constructor(self): + def test_streamreader_constructor_use_global_loop(self): + # asyncio issue #184: Ensure that StreamReaderProtocol constructor + # retrieves the current loop if the loop parameter is not set + # Deprecated in 3.10 self.addCleanup(asyncio.set_event_loop, None) asyncio.set_event_loop(self.loop) + with self.assertWarns(DeprecationWarning) as cm: + reader = asyncio.StreamReader() + self.assertEqual(cm.warnings[0].filename, __file__) + self.assertIs(reader._loop, self.loop) + + def test_streamreaderprotocol_constructor_without_loop(self): + reader = mock.Mock() + with self.assertWarns(DeprecationWarning) as cm: + with self.assertRaisesRegex(RuntimeError, 'There is no current event loop'): + asyncio.StreamReaderProtocol(reader) + self.assertEqual(cm.warnings[0].filename, __file__) + + def test_streamreaderprotocol_constructor_use_running_loop(self): + # asyncio issue #184: Ensure that StreamReaderProtocol constructor + # retrieves the current loop if the loop parameter is not set + reader = mock.Mock() + async def test(): + return asyncio.StreamReaderProtocol(reader) + protocol = self.loop.run_until_complete(test()) + self.assertIs(protocol._loop, self.loop) + + def test_streamreaderprotocol_constructor_use_global_loop(self): # asyncio issue #184: Ensure that StreamReaderProtocol constructor # retrieves the current loop if the loop parameter is not set + # Deprecated in 3.10 + self.addCleanup(asyncio.set_event_loop, None) + asyncio.set_event_loop(self.loop) reader = mock.Mock() - protocol = asyncio.StreamReaderProtocol(reader) + with self.assertWarns(DeprecationWarning) as cm: + protocol = asyncio.StreamReaderProtocol(reader) + self.assertEqual(cm.warnings[0].filename, __file__) self.assertIs(protocol._loop, self.loop) def test_drain_raises(self): |