summaryrefslogtreecommitdiffstats
path: root/Lib
diff options
context:
space:
mode:
authorAndrew Svetlov <andrew.svetlov@gmail.com>2022-03-24 19:51:16 (GMT)
committerGitHub <noreply@github.com>2022-03-24 19:51:16 (GMT)
commit4119d2d7c9e25acd4f16994fb92d656f8b7816d7 (patch)
tree13f5086fc5ad0247381d347c4271a8ca79a20fd7 /Lib
parent2f49b97cc5426087b46515254b9a97a22ee8c807 (diff)
downloadcpython-4119d2d7c9e25acd4f16994fb92d656f8b7816d7.zip
cpython-4119d2d7c9e25acd4f16994fb92d656f8b7816d7.tar.gz
cpython-4119d2d7c9e25acd4f16994fb92d656f8b7816d7.tar.bz2
bpo-47062: Implement asyncio.Runner context manager (GH-31799)
Co-authored-by: Zachary Ware <zach@python.org>
Diffstat (limited to 'Lib')
-rw-r--r--Lib/asyncio/runners.py124
-rw-r--r--Lib/test/test_asyncio/test_runners.py133
-rw-r--r--Lib/unittest/async_case.py70
3 files changed, 258 insertions, 69 deletions
diff --git a/Lib/asyncio/runners.py b/Lib/asyncio/runners.py
index 9a5e9a4..975509c 100644
--- a/Lib/asyncio/runners.py
+++ b/Lib/asyncio/runners.py
@@ -1,10 +1,112 @@
-__all__ = 'run',
+__all__ = ('Runner', 'run')
+import contextvars
+import enum
from . import coroutines
from . import events
from . import tasks
+class _State(enum.Enum):
+ CREATED = "created"
+ INITIALIZED = "initialized"
+ CLOSED = "closed"
+
+
+class Runner:
+ """A context manager that controls event loop life cycle.
+
+ The context manager always creates a new event loop,
+ allows to run async functions inside it,
+ and properly finalizes the loop at the context manager exit.
+
+ If debug is True, the event loop will be run in debug mode.
+ If factory is passed, it is used for new event loop creation.
+
+ asyncio.run(main(), debug=True)
+
+ is a shortcut for
+
+ with asyncio.Runner(debug=True) as runner:
+ runner.run(main())
+
+ The run() method can be called multiple times within the runner's context.
+
+ This can be useful for interactive console (e.g. IPython),
+ unittest runners, console tools, -- everywhere when async code
+ is called from existing sync framework and where the preferred single
+ asyncio.run() call doesn't work.
+
+ """
+
+ # Note: the class is final, it is not intended for inheritance.
+
+ def __init__(self, *, debug=None, factory=None):
+ self._state = _State.CREATED
+ self._debug = debug
+ self._factory = factory
+ self._loop = None
+ self._context = None
+
+ def __enter__(self):
+ self._lazy_init()
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self.close()
+
+ def close(self):
+ """Shutdown and close event loop."""
+ if self._state is not _State.INITIALIZED:
+ return
+ try:
+ loop = self._loop
+ _cancel_all_tasks(loop)
+ loop.run_until_complete(loop.shutdown_asyncgens())
+ loop.run_until_complete(loop.shutdown_default_executor())
+ finally:
+ loop.close()
+ self._loop = None
+ self._state = _State.CLOSED
+
+ def get_loop(self):
+ """Return embedded event loop."""
+ self._lazy_init()
+ return self._loop
+
+ def run(self, coro, *, context=None):
+ """Run a coroutine inside the embedded event loop."""
+ if not coroutines.iscoroutine(coro):
+ raise ValueError("a coroutine was expected, got {!r}".format(coro))
+
+ if events._get_running_loop() is not None:
+ # fail fast with short traceback
+ raise RuntimeError(
+ "Runner.run() cannot be called from a running event loop")
+
+ self._lazy_init()
+
+ if context is None:
+ context = self._context
+ task = self._loop.create_task(coro, context=context)
+ return self._loop.run_until_complete(task)
+
+ def _lazy_init(self):
+ if self._state is _State.CLOSED:
+ raise RuntimeError("Runner is closed")
+ if self._state is _State.INITIALIZED:
+ return
+ if self._factory is None:
+ self._loop = events.new_event_loop()
+ else:
+ self._loop = self._factory()
+ if self._debug is not None:
+ self._loop.set_debug(self._debug)
+ self._context = contextvars.copy_context()
+ self._state = _State.INITIALIZED
+
+
+
def run(main, *, debug=None):
"""Execute the coroutine and return the result.
@@ -30,26 +132,12 @@ def run(main, *, debug=None):
asyncio.run(main())
"""
if events._get_running_loop() is not None:
+ # fail fast with short traceback
raise RuntimeError(
"asyncio.run() cannot be called from a running event loop")
- if not coroutines.iscoroutine(main):
- raise ValueError("a coroutine was expected, got {!r}".format(main))
-
- loop = events.new_event_loop()
- try:
- events.set_event_loop(loop)
- if debug is not None:
- loop.set_debug(debug)
- return loop.run_until_complete(main)
- finally:
- try:
- _cancel_all_tasks(loop)
- loop.run_until_complete(loop.shutdown_asyncgens())
- loop.run_until_complete(loop.shutdown_default_executor())
- finally:
- events.set_event_loop(None)
- loop.close()
+ with Runner(debug=debug) as runner:
+ return runner.run(main)
def _cancel_all_tasks(loop):
diff --git a/Lib/test/test_asyncio/test_runners.py b/Lib/test/test_asyncio/test_runners.py
index 1122736..2919412 100644
--- a/Lib/test/test_asyncio/test_runners.py
+++ b/Lib/test/test_asyncio/test_runners.py
@@ -1,4 +1,7 @@
import asyncio
+import contextvars
+import gc
+import re
import unittest
from unittest import mock
@@ -186,5 +189,135 @@ class RunTests(BaseTest):
self.assertFalse(spinner.ag_running)
+class RunnerTests(BaseTest):
+
+ def test_non_debug(self):
+ with asyncio.Runner(debug=False) as runner:
+ self.assertFalse(runner.get_loop().get_debug())
+
+ def test_debug(self):
+ with asyncio.Runner(debug=True) as runner:
+ self.assertTrue(runner.get_loop().get_debug())
+
+ def test_custom_factory(self):
+ loop = mock.Mock()
+ with asyncio.Runner(factory=lambda: loop) as runner:
+ self.assertIs(runner.get_loop(), loop)
+
+ def test_run(self):
+ async def f():
+ await asyncio.sleep(0)
+ return 'done'
+
+ with asyncio.Runner() as runner:
+ self.assertEqual('done', runner.run(f()))
+ loop = runner.get_loop()
+
+ with self.assertRaisesRegex(
+ RuntimeError,
+ "Runner is closed"
+ ):
+ runner.get_loop()
+
+ self.assertTrue(loop.is_closed())
+
+ def test_run_non_coro(self):
+ with asyncio.Runner() as runner:
+ with self.assertRaisesRegex(
+ ValueError,
+ "a coroutine was expected"
+ ):
+ runner.run(123)
+
+ def test_run_future(self):
+ with asyncio.Runner() as runner:
+ with self.assertRaisesRegex(
+ ValueError,
+ "a coroutine was expected"
+ ):
+ fut = runner.get_loop().create_future()
+ runner.run(fut)
+
+ def test_explicit_close(self):
+ runner = asyncio.Runner()
+ loop = runner.get_loop()
+ runner.close()
+ with self.assertRaisesRegex(
+ RuntimeError,
+ "Runner is closed"
+ ):
+ runner.get_loop()
+
+ self.assertTrue(loop.is_closed())
+
+ def test_double_close(self):
+ runner = asyncio.Runner()
+ loop = runner.get_loop()
+
+ runner.close()
+ self.assertTrue(loop.is_closed())
+
+ # the second call is no-op
+ runner.close()
+ self.assertTrue(loop.is_closed())
+
+ def test_second_with_block_raises(self):
+ ret = []
+
+ async def f(arg):
+ ret.append(arg)
+
+ runner = asyncio.Runner()
+ with runner:
+ runner.run(f(1))
+
+ with self.assertRaisesRegex(
+ RuntimeError,
+ "Runner is closed"
+ ):
+ with runner:
+ runner.run(f(2))
+
+ self.assertEqual([1], ret)
+
+ def test_run_keeps_context(self):
+ cvar = contextvars.ContextVar("cvar", default=-1)
+
+ async def f(val):
+ old = cvar.get()
+ await asyncio.sleep(0)
+ cvar.set(val)
+ return old
+
+ async def get_context():
+ return contextvars.copy_context()
+
+ with asyncio.Runner() as runner:
+ self.assertEqual(-1, runner.run(f(1)))
+ self.assertEqual(1, runner.run(f(2)))
+
+ self.assertEqual({cvar: 2}, dict(runner.run(get_context())))
+
+ def test_recursine_run(self):
+ async def g():
+ pass
+
+ async def f():
+ runner.run(g())
+
+ with asyncio.Runner() as runner:
+ with self.assertWarnsRegex(
+ RuntimeWarning,
+ "coroutine .+ was never awaited",
+ ):
+ with self.assertRaisesRegex(
+ RuntimeError,
+ re.escape(
+ "Runner.run() cannot be called from a running event loop"
+ ),
+ ):
+ runner.run(f())
+
+
if __name__ == '__main__':
unittest.main()
diff --git a/Lib/unittest/async_case.py b/Lib/unittest/async_case.py
index 25adc3d..85b938f 100644
--- a/Lib/unittest/async_case.py
+++ b/Lib/unittest/async_case.py
@@ -34,7 +34,7 @@ class IsolatedAsyncioTestCase(TestCase):
def __init__(self, methodName='runTest'):
super().__init__(methodName)
- self._asyncioTestLoop = None
+ self._asyncioRunner = None
self._asyncioTestContext = contextvars.copy_context()
async def asyncSetUp(self):
@@ -75,76 +75,44 @@ class IsolatedAsyncioTestCase(TestCase):
self._callMaybeAsync(function, *args, **kwargs)
def _callAsync(self, func, /, *args, **kwargs):
- assert self._asyncioTestLoop is not None, 'asyncio test loop is not initialized'
+ assert self._asyncioRunner is not None, 'asyncio runner is not initialized'
assert inspect.iscoroutinefunction(func), f'{func!r} is not an async function'
- task = self._asyncioTestLoop.create_task(
+ return self._asyncioRunner.run(
func(*args, **kwargs),
- context=self._asyncioTestContext,
+ context=self._asyncioTestContext
)
- return self._asyncioTestLoop.run_until_complete(task)
def _callMaybeAsync(self, func, /, *args, **kwargs):
- assert self._asyncioTestLoop is not None, 'asyncio test loop is not initialized'
+ assert self._asyncioRunner is not None, 'asyncio runner is not initialized'
if inspect.iscoroutinefunction(func):
- task = self._asyncioTestLoop.create_task(
+ return self._asyncioRunner.run(
func(*args, **kwargs),
context=self._asyncioTestContext,
)
- return self._asyncioTestLoop.run_until_complete(task)
else:
return self._asyncioTestContext.run(func, *args, **kwargs)
- def _setupAsyncioLoop(self):
- assert self._asyncioTestLoop is None, 'asyncio test loop already initialized'
- loop = asyncio.new_event_loop()
- asyncio.set_event_loop(loop)
- loop.set_debug(True)
- self._asyncioTestLoop = loop
+ def _setupAsyncioRunner(self):
+ assert self._asyncioRunner is None, 'asyncio runner is already initialized'
+ runner = asyncio.Runner(debug=True)
+ self._asyncioRunner = runner
- def _tearDownAsyncioLoop(self):
- assert self._asyncioTestLoop is not None, 'asyncio test loop is not initialized'
- loop = self._asyncioTestLoop
- self._asyncioTestLoop = None
-
- try:
- # cancel all tasks
- to_cancel = asyncio.all_tasks(loop)
- if not to_cancel:
- return
-
- for task in to_cancel:
- task.cancel()
-
- loop.run_until_complete(
- asyncio.gather(*to_cancel, return_exceptions=True))
-
- for task in to_cancel:
- if task.cancelled():
- continue
- if task.exception() is not None:
- loop.call_exception_handler({
- 'message': 'unhandled exception during test shutdown',
- 'exception': task.exception(),
- 'task': task,
- })
- # shutdown asyncgens
- loop.run_until_complete(loop.shutdown_asyncgens())
- finally:
- asyncio.set_event_loop(None)
- loop.close()
+ def _tearDownAsyncioRunner(self):
+ runner = self._asyncioRunner
+ runner.close()
def run(self, result=None):
- self._setupAsyncioLoop()
+ self._setupAsyncioRunner()
try:
return super().run(result)
finally:
- self._tearDownAsyncioLoop()
+ self._tearDownAsyncioRunner()
def debug(self):
- self._setupAsyncioLoop()
+ self._setupAsyncioRunner()
super().debug()
- self._tearDownAsyncioLoop()
+ self._tearDownAsyncioRunner()
def __del__(self):
- if self._asyncioTestLoop is not None:
- self._tearDownAsyncioLoop()
+ if self._asyncioRunner is not None:
+ self._tearDownAsyncioRunner()