summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_asyncio/test_runners.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_asyncio/test_runners.py')
-rw-r--r--Lib/test/test_asyncio/test_runners.py179
1 files changed, 0 insertions, 179 deletions
diff --git a/Lib/test/test_asyncio/test_runners.py b/Lib/test/test_asyncio/test_runners.py
deleted file mode 100644
index 3b58dde..0000000
--- a/Lib/test/test_asyncio/test_runners.py
+++ /dev/null
@@ -1,179 +0,0 @@
-import asyncio
-import unittest
-
-from unittest import mock
-from . import utils as test_utils
-
-
-class TestPolicy(asyncio.AbstractEventLoopPolicy):
-
- def __init__(self, loop_factory):
- self.loop_factory = loop_factory
- self.loop = None
-
- def get_event_loop(self):
- # shouldn't ever be called by asyncio.run()
- raise RuntimeError
-
- def new_event_loop(self):
- return self.loop_factory()
-
- def set_event_loop(self, loop):
- if loop is not None:
- # we want to check if the loop is closed
- # in BaseTest.tearDown
- self.loop = loop
-
-
-class BaseTest(unittest.TestCase):
-
- def new_loop(self):
- loop = asyncio.BaseEventLoop()
- loop._process_events = mock.Mock()
- loop._selector = mock.Mock()
- loop._selector.select.return_value = ()
- loop.shutdown_ag_run = False
-
- async def shutdown_asyncgens():
- loop.shutdown_ag_run = True
- loop.shutdown_asyncgens = shutdown_asyncgens
-
- return loop
-
- def setUp(self):
- super().setUp()
-
- policy = TestPolicy(self.new_loop)
- asyncio.set_event_loop_policy(policy)
-
- def tearDown(self):
- policy = asyncio.get_event_loop_policy()
- if policy.loop is not None:
- self.assertTrue(policy.loop.is_closed())
- self.assertTrue(policy.loop.shutdown_ag_run)
-
- asyncio.set_event_loop_policy(None)
- super().tearDown()
-
-
-class RunTests(BaseTest):
-
- def test_asyncio_run_return(self):
- async def main():
- await asyncio.sleep(0)
- return 42
-
- self.assertEqual(asyncio.run(main()), 42)
-
- def test_asyncio_run_raises(self):
- async def main():
- await asyncio.sleep(0)
- raise ValueError('spam')
-
- with self.assertRaisesRegex(ValueError, 'spam'):
- asyncio.run(main())
-
- def test_asyncio_run_only_coro(self):
- for o in {1, lambda: None}:
- with self.subTest(obj=o), \
- self.assertRaisesRegex(ValueError,
- 'a coroutine was expected'):
- asyncio.run(o)
-
- def test_asyncio_run_debug(self):
- async def main(expected):
- loop = asyncio.get_event_loop()
- self.assertIs(loop.get_debug(), expected)
-
- asyncio.run(main(False))
- asyncio.run(main(True), debug=True)
-
- def test_asyncio_run_from_running_loop(self):
- async def main():
- coro = main()
- try:
- asyncio.run(coro)
- finally:
- coro.close() # Suppress ResourceWarning
-
- with self.assertRaisesRegex(RuntimeError,
- 'cannot be called from a running'):
- asyncio.run(main())
-
- def test_asyncio_run_cancels_hanging_tasks(self):
- lo_task = None
-
- async def leftover():
- await asyncio.sleep(0.1)
-
- async def main():
- nonlocal lo_task
- lo_task = asyncio.create_task(leftover())
- return 123
-
- self.assertEqual(asyncio.run(main()), 123)
- self.assertTrue(lo_task.done())
-
- def test_asyncio_run_reports_hanging_tasks_errors(self):
- lo_task = None
- call_exc_handler_mock = mock.Mock()
-
- async def leftover():
- try:
- await asyncio.sleep(0.1)
- except asyncio.CancelledError:
- 1 / 0
-
- async def main():
- loop = asyncio.get_running_loop()
- loop.call_exception_handler = call_exc_handler_mock
-
- nonlocal lo_task
- lo_task = asyncio.create_task(leftover())
- return 123
-
- self.assertEqual(asyncio.run(main()), 123)
- self.assertTrue(lo_task.done())
-
- call_exc_handler_mock.assert_called_with({
- 'message': test_utils.MockPattern(r'asyncio.run.*shutdown'),
- 'task': lo_task,
- 'exception': test_utils.MockInstanceOf(ZeroDivisionError)
- })
-
- def test_asyncio_run_closes_gens_after_hanging_tasks_errors(self):
- spinner = None
- lazyboy = None
-
- class FancyExit(Exception):
- pass
-
- async def fidget():
- while True:
- yield 1
- await asyncio.sleep(1)
-
- async def spin():
- nonlocal spinner
- spinner = fidget()
- try:
- async for the_meaning_of_life in spinner: # NoQA
- pass
- except asyncio.CancelledError:
- 1 / 0
-
- async def main():
- loop = asyncio.get_running_loop()
- loop.call_exception_handler = mock.Mock()
-
- nonlocal lazyboy
- lazyboy = asyncio.create_task(spin())
- raise FancyExit
-
- with self.assertRaises(FancyExit):
- asyncio.run(main())
-
- self.assertTrue(lazyboy.done())
-
- self.assertIsNone(spinner.ag_frame)
- self.assertFalse(spinner.ag_running)