summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_asyncio/test_runners.py
diff options
context:
space:
mode:
authorYury Selivanov <yury@magic.io>2017-12-14 14:42:21 (GMT)
committerGitHub <noreply@github.com>2017-12-14 14:42:21 (GMT)
commit02a0a19206da6902c3855a1fa09e60b208474cfa (patch)
treedf9a24bf2a131693ef4f3dad55849c22a1567991 /Lib/test/test_asyncio/test_runners.py
parenteadad1b97f64619bfd246b9d3b60d25f456e0592 (diff)
downloadcpython-02a0a19206da6902c3855a1fa09e60b208474cfa.zip
cpython-02a0a19206da6902c3855a1fa09e60b208474cfa.tar.gz
cpython-02a0a19206da6902c3855a1fa09e60b208474cfa.tar.bz2
bpo-32314: Implement asyncio.run() (#4852)
Diffstat (limited to 'Lib/test/test_asyncio/test_runners.py')
-rw-r--r--Lib/test/test_asyncio/test_runners.py100
1 files changed, 100 insertions, 0 deletions
diff --git a/Lib/test/test_asyncio/test_runners.py b/Lib/test/test_asyncio/test_runners.py
new file mode 100644
index 0000000..c52bd94
--- /dev/null
+++ b/Lib/test/test_asyncio/test_runners.py
@@ -0,0 +1,100 @@
+import asyncio
+import unittest
+
+from unittest import mock
+
+
+class TestPolicy(asyncio.AbstractEventLoopPolicy):
+
+ def __init__(self, loop_factory):
+ self.loop_factory = loop_factory
+ self.loop = None
+
+ def get_event_loop(self):
+ # shouldn't ever be called by asyncio.run()
+ raise RuntimeError
+
+ def new_event_loop(self):
+ return self.loop_factory()
+
+ def set_event_loop(self, loop):
+ if loop is not None:
+ # we want to check if the loop is closed
+ # in BaseTest.tearDown
+ self.loop = loop
+
+
+class BaseTest(unittest.TestCase):
+
+ def new_loop(self):
+ loop = asyncio.BaseEventLoop()
+ loop._process_events = mock.Mock()
+ loop._selector = mock.Mock()
+ loop._selector.select.return_value = ()
+ loop.shutdown_ag_run = False
+
+ async def shutdown_asyncgens():
+ loop.shutdown_ag_run = True
+ loop.shutdown_asyncgens = shutdown_asyncgens
+
+ return loop
+
+ def setUp(self):
+ super().setUp()
+
+ policy = TestPolicy(self.new_loop)
+ asyncio.set_event_loop_policy(policy)
+
+ def tearDown(self):
+ policy = asyncio.get_event_loop_policy()
+ if policy.loop is not None:
+ self.assertTrue(policy.loop.is_closed())
+ self.assertTrue(policy.loop.shutdown_ag_run)
+
+ asyncio.set_event_loop_policy(None)
+ super().tearDown()
+
+
+class RunTests(BaseTest):
+
+ def test_asyncio_run_return(self):
+ async def main():
+ await asyncio.sleep(0)
+ return 42
+
+ self.assertEqual(asyncio.run(main()), 42)
+
+ def test_asyncio_run_raises(self):
+ async def main():
+ await asyncio.sleep(0)
+ raise ValueError('spam')
+
+ with self.assertRaisesRegex(ValueError, 'spam'):
+ asyncio.run(main())
+
+ def test_asyncio_run_only_coro(self):
+ for o in {1, lambda: None}:
+ with self.subTest(obj=o), \
+ self.assertRaisesRegex(ValueError,
+ 'a coroutine was expected'):
+ asyncio.run(o)
+
+ def test_asyncio_run_debug(self):
+ async def main(expected):
+ loop = asyncio.get_event_loop()
+ self.assertIs(loop.get_debug(), expected)
+
+ asyncio.run(main(False))
+ asyncio.run(main(True), debug=True)
+
+ def test_asyncio_run_from_running_loop(self):
+ async def main():
+ coro = main()
+ try:
+ asyncio.run(coro)
+ finally:
+ coro.close() # Suppress ResourceWarning
+
+ with self.assertRaisesRegex(RuntimeError,
+ 'cannot be called from a running'):
+ asyncio.run(main())