summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_asyncio/test_streams.py
diff options
context:
space:
mode:
authorSerhiy Storchaka <storchaka@gmail.com>2021-04-25 10:40:44 (GMT)
committerGitHub <noreply@github.com>2021-04-25 10:40:44 (GMT)
commit172c0f2752d8708b6dda7b42e6c5a3519420a4e8 (patch)
tree35a076c6baad7ca053a62b9f505af3762a867b79 /Lib/test/test_asyncio/test_streams.py
parentface87c94e67ad9c72b9a3724f112fd76c1002b9 (diff)
downloadcpython-172c0f2752d8708b6dda7b42e6c5a3519420a4e8.zip
cpython-172c0f2752d8708b6dda7b42e6c5a3519420a4e8.tar.gz
cpython-172c0f2752d8708b6dda7b42e6c5a3519420a4e8.tar.bz2
bpo-39529: Deprecate creating new event loop in asyncio.get_event_loop() (GH-23554)
asyncio.get_event_loop() emits now a deprecation warning when it creates a new event loop. In future releases it will became an alias of asyncio.get_running_loop().
Diffstat (limited to 'Lib/test/test_asyncio/test_streams.py')
-rw-r--r--Lib/test/test_asyncio/test_streams.py53
1 files changed, 42 insertions, 11 deletions
diff --git a/Lib/test/test_asyncio/test_streams.py b/Lib/test/test_asyncio/test_streams.py
index a075358..6eaa289 100644
--- a/Lib/test/test_asyncio/test_streams.py
+++ b/Lib/test/test_asyncio/test_streams.py
@@ -40,11 +40,6 @@ class StreamTests(test_utils.TestCase):
gc.collect()
super().tearDown()
- @mock.patch('asyncio.streams.events')
- def test_ctor_global_loop(self, m_events):
- stream = asyncio.StreamReader()
- self.assertIs(stream._loop, m_events.get_event_loop.return_value)
-
def _basetest_open_connection(self, open_connection_fut):
messages = []
self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))
@@ -751,23 +746,59 @@ os.close(fd)
data = self.loop.run_until_complete(reader.read(-1))
self.assertEqual(data, b'data')
- def test_streamreader_constructor(self):
- self.addCleanup(asyncio.set_event_loop, None)
- asyncio.set_event_loop(self.loop)
+ def test_streamreader_constructor_without_loop(self):
+ with self.assertWarns(DeprecationWarning) as cm:
+ with self.assertRaisesRegex(RuntimeError, 'There is no current event loop'):
+ asyncio.StreamReader()
+ self.assertEqual(cm.warnings[0].filename, __file__)
+ def test_streamreader_constructor_use_running_loop(self):
# asyncio issue #184: Ensure that StreamReaderProtocol constructor
# retrieves the current loop if the loop parameter is not set
- reader = asyncio.StreamReader()
+ async def test():
+ return asyncio.StreamReader()
+
+ reader = self.loop.run_until_complete(test())
self.assertIs(reader._loop, self.loop)
- def test_streamreaderprotocol_constructor(self):
+ def test_streamreader_constructor_use_global_loop(self):
+ # asyncio issue #184: Ensure that StreamReaderProtocol constructor
+ # retrieves the current loop if the loop parameter is not set
+ # Deprecated in 3.10
self.addCleanup(asyncio.set_event_loop, None)
asyncio.set_event_loop(self.loop)
+ with self.assertWarns(DeprecationWarning) as cm:
+ reader = asyncio.StreamReader()
+ self.assertEqual(cm.warnings[0].filename, __file__)
+ self.assertIs(reader._loop, self.loop)
+
+ def test_streamreaderprotocol_constructor_without_loop(self):
+ reader = mock.Mock()
+ with self.assertWarns(DeprecationWarning) as cm:
+ with self.assertRaisesRegex(RuntimeError, 'There is no current event loop'):
+ asyncio.StreamReaderProtocol(reader)
+ self.assertEqual(cm.warnings[0].filename, __file__)
+
+ def test_streamreaderprotocol_constructor_use_running_loop(self):
+ # asyncio issue #184: Ensure that StreamReaderProtocol constructor
+ # retrieves the current loop if the loop parameter is not set
+ reader = mock.Mock()
+ async def test():
+ return asyncio.StreamReaderProtocol(reader)
+ protocol = self.loop.run_until_complete(test())
+ self.assertIs(protocol._loop, self.loop)
+
+ def test_streamreaderprotocol_constructor_use_global_loop(self):
# asyncio issue #184: Ensure that StreamReaderProtocol constructor
# retrieves the current loop if the loop parameter is not set
+ # Deprecated in 3.10
+ self.addCleanup(asyncio.set_event_loop, None)
+ asyncio.set_event_loop(self.loop)
reader = mock.Mock()
- protocol = asyncio.StreamReaderProtocol(reader)
+ with self.assertWarns(DeprecationWarning) as cm:
+ protocol = asyncio.StreamReaderProtocol(reader)
+ self.assertEqual(cm.warnings[0].filename, __file__)
self.assertIs(protocol._loop, self.loop)
def test_drain_raises(self):