summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorItamar Ostricher <itamarost@gmail.com>2023-05-01 21:10:13 (GMT)
committerGitHub <noreply@github.com>2023-05-01 21:10:13 (GMT)
commita474e04388c2ef6aca75c26cb70a1b6200235feb (patch)
tree43520d5ad16016620f149dc1e84d4d57e45051d5
parent59bc36aacddd5a3acd32c80c0dfd0726135a7817 (diff)
downloadcpython-a474e04388c2ef6aca75c26cb70a1b6200235feb.zip
cpython-a474e04388c2ef6aca75c26cb70a1b6200235feb.tar.gz
cpython-a474e04388c2ef6aca75c26cb70a1b6200235feb.tar.bz2
gh-97696: asyncio eager tasks factory (#102853)
Co-authored-by: Jacob Bower <jbower@meta.com> Co-authored-by: Carol Willing <carolcode@willingconsulting.com>
-rw-r--r--Doc/library/asyncio-task.rst36
-rw-r--r--Doc/whatsnew/3.12.rst5
-rw-r--r--Include/internal/pycore_global_objects_fini_generated.h2
-rw-r--r--Include/internal/pycore_global_strings.h2
-rw-r--r--Include/internal/pycore_runtime_init_generated.h2
-rw-r--r--Include/internal/pycore_unicodeobject_generated.h6
-rw-r--r--Lib/asyncio/base_tasks.py10
-rw-r--r--Lib/asyncio/tasks.py122
-rw-r--r--Lib/test/test_asyncio/test_eager_task_factory.py344
-rw-r--r--Misc/NEWS.d/next/Library/2023-03-15-12-18-07.gh-issue-97696.DtnpIC.rst6
-rw-r--r--Modules/_asynciomodule.c254
-rw-r--r--Modules/clinic/_asynciomodule.c.h203
12 files changed, 945 insertions, 47 deletions
diff --git a/Doc/library/asyncio-task.rst b/Doc/library/asyncio-task.rst
index ba0f909..f8727b9 100644
--- a/Doc/library/asyncio-task.rst
+++ b/Doc/library/asyncio-task.rst
@@ -527,6 +527,42 @@ Running Tasks Concurrently
and there is no running event loop.
+Eager Task Factory
+==================
+
+.. function:: eager_task_factory(loop, coro, *, name=None, context=None)
+
+ A task factory for eager task execution.
+
+ When using this factory (via :meth:`loop.set_task_factory(asyncio.eager_task_factory) <loop.set_task_factory>`),
+ coroutines begin execution synchronously during :class:`Task` construction.
+ Tasks are only scheduled on the event loop if they block.
+ This can be a performance improvement as the overhead of loop scheduling
+ is avoided for coroutines that complete synchronously.
+
+ A common example where this is beneficial is coroutines which employ
+ caching or memoization to avoid actual I/O when possible.
+
+ .. note::
+
+ Immediate execution of the coroutine is a semantic change.
+ If the coroutine returns or raises, the task is never scheduled
+ to the event loop. If the coroutine execution blocks, the task is
+ scheduled to the event loop. This change may introduce behavior
+ changes to existing applications. For example,
+ the application's task execution order is likely to change.
+
+ .. versionadded:: 3.12
+
+.. function:: create_eager_task_factory(custom_task_constructor)
+
+ Create an eager task factory, similar to :func:`eager_task_factory`,
+ using the provided *custom_task_constructor* when creating a new task instead
+ of the default :class:`Task`.
+
+ .. versionadded:: 3.12
+
+
Shielding From Cancellation
===========================
diff --git a/Doc/whatsnew/3.12.rst b/Doc/whatsnew/3.12.rst
index f4ee30b..a3fce7c 100644
--- a/Doc/whatsnew/3.12.rst
+++ b/Doc/whatsnew/3.12.rst
@@ -613,6 +613,11 @@ Optimizations
* Speed up :class:`asyncio.Task` creation by deferring expensive string formatting.
(Contributed by Itamar O in :gh:`103793`.)
+* Added :func:`asyncio.eager_task_factory` and :func:`asyncio.create_eager_task_factory`
+ functions to allow opting an event loop in to eager task execution,
+ speeding up some use-cases by up to 50%.
+ (Contributed by Jacob Bower & Itamar O in :gh:`102853`)
+
CPython bytecode changes
========================
diff --git a/Include/internal/pycore_global_objects_fini_generated.h b/Include/internal/pycore_global_objects_fini_generated.h
index 4fa15d7..5e8a8d7 100644
--- a/Include/internal/pycore_global_objects_fini_generated.h
+++ b/Include/internal/pycore_global_objects_fini_generated.h
@@ -882,6 +882,7 @@ _PyStaticObjects_CheckRefcnt(PyInterpreterState *interp) {
_PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(dst_dir_fd));
_PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(duration));
_PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(e));
+ _PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(eager_start));
_PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(effective_ids));
_PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(element_factory));
_PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(encode));
@@ -972,6 +973,7 @@ _PyStaticObjects_CheckRefcnt(PyInterpreterState *interp) {
_PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(instructions));
_PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(intern));
_PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(intersection));
+ _PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(is_running));
_PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(isatty));
_PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(isinstance));
_PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(isoformat));
diff --git a/Include/internal/pycore_global_strings.h b/Include/internal/pycore_global_strings.h
index e19d8ff..28e8220 100644
--- a/Include/internal/pycore_global_strings.h
+++ b/Include/internal/pycore_global_strings.h
@@ -370,6 +370,7 @@ struct _Py_global_strings {
STRUCT_FOR_ID(dst_dir_fd)
STRUCT_FOR_ID(duration)
STRUCT_FOR_ID(e)
+ STRUCT_FOR_ID(eager_start)
STRUCT_FOR_ID(effective_ids)
STRUCT_FOR_ID(element_factory)
STRUCT_FOR_ID(encode)
@@ -460,6 +461,7 @@ struct _Py_global_strings {
STRUCT_FOR_ID(instructions)
STRUCT_FOR_ID(intern)
STRUCT_FOR_ID(intersection)
+ STRUCT_FOR_ID(is_running)
STRUCT_FOR_ID(isatty)
STRUCT_FOR_ID(isinstance)
STRUCT_FOR_ID(isoformat)
diff --git a/Include/internal/pycore_runtime_init_generated.h b/Include/internal/pycore_runtime_init_generated.h
index 42c4874..dd44711 100644
--- a/Include/internal/pycore_runtime_init_generated.h
+++ b/Include/internal/pycore_runtime_init_generated.h
@@ -876,6 +876,7 @@ extern "C" {
INIT_ID(dst_dir_fd), \
INIT_ID(duration), \
INIT_ID(e), \
+ INIT_ID(eager_start), \
INIT_ID(effective_ids), \
INIT_ID(element_factory), \
INIT_ID(encode), \
@@ -966,6 +967,7 @@ extern "C" {
INIT_ID(instructions), \
INIT_ID(intern), \
INIT_ID(intersection), \
+ INIT_ID(is_running), \
INIT_ID(isatty), \
INIT_ID(isinstance), \
INIT_ID(isoformat), \
diff --git a/Include/internal/pycore_unicodeobject_generated.h b/Include/internal/pycore_unicodeobject_generated.h
index 6d9cd24..1a8338b 100644
--- a/Include/internal/pycore_unicodeobject_generated.h
+++ b/Include/internal/pycore_unicodeobject_generated.h
@@ -963,6 +963,9 @@ _PyUnicode_InitStaticStrings(PyInterpreterState *interp) {
string = &_Py_ID(e);
assert(_PyUnicode_CheckConsistency(string, 1));
_PyUnicode_InternInPlace(interp, &string);
+ string = &_Py_ID(eager_start);
+ assert(_PyUnicode_CheckConsistency(string, 1));
+ _PyUnicode_InternInPlace(interp, &string);
string = &_Py_ID(effective_ids);
assert(_PyUnicode_CheckConsistency(string, 1));
_PyUnicode_InternInPlace(interp, &string);
@@ -1233,6 +1236,9 @@ _PyUnicode_InitStaticStrings(PyInterpreterState *interp) {
string = &_Py_ID(intersection);
assert(_PyUnicode_CheckConsistency(string, 1));
_PyUnicode_InternInPlace(interp, &string);
+ string = &_Py_ID(is_running);
+ assert(_PyUnicode_CheckConsistency(string, 1));
+ _PyUnicode_InternInPlace(interp, &string);
string = &_Py_ID(isatty);
assert(_PyUnicode_CheckConsistency(string, 1));
_PyUnicode_InternInPlace(interp, &string);
diff --git a/Lib/asyncio/base_tasks.py b/Lib/asyncio/base_tasks.py
index 26298e6..c907b68 100644
--- a/Lib/asyncio/base_tasks.py
+++ b/Lib/asyncio/base_tasks.py
@@ -15,11 +15,13 @@ def _task_repr_info(task):
info.insert(1, 'name=%r' % task.get_name())
- coro = coroutines._format_coroutine(task._coro)
- info.insert(2, f'coro=<{coro}>')
-
if task._fut_waiter is not None:
- info.insert(3, f'wait_for={task._fut_waiter!r}')
+ info.insert(2, f'wait_for={task._fut_waiter!r}')
+
+ if task._coro:
+ coro = coroutines._format_coroutine(task._coro)
+ info.insert(2, f'coro=<{coro}>')
+
return info
diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py
index c90d32c..aa5269a 100644
--- a/Lib/asyncio/tasks.py
+++ b/Lib/asyncio/tasks.py
@@ -6,6 +6,7 @@ __all__ = (
'wait', 'wait_for', 'as_completed', 'sleep',
'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
'current_task', 'all_tasks',
+ 'create_eager_task_factory', 'eager_task_factory',
'_register_task', '_unregister_task', '_enter_task', '_leave_task',
)
@@ -43,22 +44,26 @@ def all_tasks(loop=None):
"""Return a set of all tasks for the loop."""
if loop is None:
loop = events.get_running_loop()
- # Looping over a WeakSet (_all_tasks) isn't safe as it can be updated from another
- # thread while we do so. Therefore we cast it to list prior to filtering. The list
- # cast itself requires iteration, so we repeat it several times ignoring
- # RuntimeErrors (which are not very likely to occur). See issues 34970 and 36607 for
- # details.
+ # capturing the set of eager tasks first, so if an eager task "graduates"
+ # to a regular task in another thread, we don't risk missing it.
+ eager_tasks = list(_eager_tasks)
+ # Looping over the WeakSet isn't safe as it can be updated from another
+ # thread, therefore we cast it to list prior to filtering. The list cast
+ # itself requires iteration, so we repeat it several times ignoring
+ # RuntimeErrors (which are not very likely to occur).
+ # See issues 34970 and 36607 for details.
+ scheduled_tasks = None
i = 0
while True:
try:
- tasks = list(_all_tasks)
+ scheduled_tasks = list(_scheduled_tasks)
except RuntimeError:
i += 1
if i >= 1000:
raise
else:
break
- return {t for t in tasks
+ return {t for t in itertools.chain(scheduled_tasks, eager_tasks)
if futures._get_loop(t) is loop and not t.done()}
@@ -93,7 +98,8 @@ class Task(futures._PyFuture): # Inherit Python Task implementation
# status is still pending
_log_destroy_pending = True
- def __init__(self, coro, *, loop=None, name=None, context=None):
+ def __init__(self, coro, *, loop=None, name=None, context=None,
+ eager_start=False):
super().__init__(loop=loop)
if self._source_traceback:
del self._source_traceback[-1]
@@ -117,8 +123,11 @@ class Task(futures._PyFuture): # Inherit Python Task implementation
else:
self._context = context
- self._loop.call_soon(self.__step, context=self._context)
- _register_task(self)
+ if eager_start and self._loop.is_running():
+ self.__eager_start()
+ else:
+ self._loop.call_soon(self.__step, context=self._context)
+ _register_task(self)
def __del__(self):
if self._state == futures._PENDING and self._log_destroy_pending:
@@ -250,6 +259,25 @@ class Task(futures._PyFuture): # Inherit Python Task implementation
self._num_cancels_requested -= 1
return self._num_cancels_requested
+ def __eager_start(self):
+ prev_task = _swap_current_task(self._loop, self)
+ try:
+ _register_eager_task(self)
+ try:
+ self._context.run(self.__step_run_and_handle_result, None)
+ finally:
+ _unregister_eager_task(self)
+ finally:
+ try:
+ curtask = _swap_current_task(self._loop, prev_task)
+ assert curtask is self
+ finally:
+ if self.done():
+ self._coro = None
+ self = None # Needed to break cycles when an exception occurs.
+ else:
+ _register_task(self)
+
def __step(self, exc=None):
if self.done():
raise exceptions.InvalidStateError(
@@ -258,11 +286,17 @@ class Task(futures._PyFuture): # Inherit Python Task implementation
if not isinstance(exc, exceptions.CancelledError):
exc = self._make_cancelled_error()
self._must_cancel = False
- coro = self._coro
self._fut_waiter = None
_enter_task(self._loop, self)
- # Call either coro.throw(exc) or coro.send(None).
+ try:
+ self.__step_run_and_handle_result(exc)
+ finally:
+ _leave_task(self._loop, self)
+ self = None # Needed to break cycles when an exception occurs.
+
+ def __step_run_and_handle_result(self, exc):
+ coro = self._coro
try:
if exc is None:
# We use the `send` method directly, because coroutines
@@ -334,7 +368,6 @@ class Task(futures._PyFuture): # Inherit Python Task implementation
self._loop.call_soon(
self.__step, new_exc, context=self._context)
finally:
- _leave_task(self._loop, self)
self = None # Needed to break cycles when an exception occurs.
def __wakeup(self, future):
@@ -897,8 +930,27 @@ def run_coroutine_threadsafe(coro, loop):
return future
-# WeakSet containing all alive tasks.
-_all_tasks = weakref.WeakSet()
+def create_eager_task_factory(custom_task_constructor):
+
+ if "eager_start" not in inspect.signature(custom_task_constructor).parameters:
+ raise TypeError(
+ "Provided constructor does not support eager task execution")
+
+ def factory(loop, coro, *, name=None, context=None):
+ return custom_task_constructor(
+ coro, loop=loop, name=name, context=context, eager_start=True)
+
+
+ return factory
+
+eager_task_factory = create_eager_task_factory(Task)
+
+
+# Collectively these two sets hold references to the complete set of active
+# tasks. Eagerly executed tasks use a faster regular set as an optimization
+# but may graduate to a WeakSet if the task blocks on IO.
+_scheduled_tasks = weakref.WeakSet()
+_eager_tasks = set()
# Dictionary containing tasks that are currently active in
# all running event loops. {EventLoop: Task}
@@ -906,8 +958,13 @@ _current_tasks = {}
def _register_task(task):
- """Register a new task in asyncio as executed by loop."""
- _all_tasks.add(task)
+ """Register an asyncio Task scheduled to run on an event loop."""
+ _scheduled_tasks.add(task)
+
+
+def _register_eager_task(task):
+ """Register an asyncio Task about to be eagerly executed."""
+ _eager_tasks.add(task)
def _enter_task(loop, task):
@@ -926,28 +983,49 @@ def _leave_task(loop, task):
del _current_tasks[loop]
+def _swap_current_task(loop, task):
+ prev_task = _current_tasks.get(loop)
+ if task is None:
+ del _current_tasks[loop]
+ else:
+ _current_tasks[loop] = task
+ return prev_task
+
+
def _unregister_task(task):
- """Unregister a task."""
- _all_tasks.discard(task)
+ """Unregister a completed, scheduled Task."""
+ _scheduled_tasks.discard(task)
+
+
+def _unregister_eager_task(task):
+ """Unregister a task which finished its first eager step."""
+ _eager_tasks.discard(task)
_py_current_task = current_task
_py_register_task = _register_task
+_py_register_eager_task = _register_eager_task
_py_unregister_task = _unregister_task
+_py_unregister_eager_task = _unregister_eager_task
_py_enter_task = _enter_task
_py_leave_task = _leave_task
+_py_swap_current_task = _swap_current_task
try:
- from _asyncio import (_register_task, _unregister_task,
- _enter_task, _leave_task,
- _all_tasks, _current_tasks,
+ from _asyncio import (_register_task, _register_eager_task,
+ _unregister_task, _unregister_eager_task,
+ _enter_task, _leave_task, _swap_current_task,
+ _scheduled_tasks, _eager_tasks, _current_tasks,
current_task)
except ImportError:
pass
else:
_c_current_task = current_task
_c_register_task = _register_task
+ _c_register_eager_task = _register_eager_task
_c_unregister_task = _unregister_task
+ _c_unregister_eager_task = _unregister_eager_task
_c_enter_task = _enter_task
_c_leave_task = _leave_task
+ _c_swap_current_task = _swap_current_task
diff --git a/Lib/test/test_asyncio/test_eager_task_factory.py b/Lib/test/test_asyncio/test_eager_task_factory.py
new file mode 100644
index 0000000..fe69093
--- /dev/null
+++ b/Lib/test/test_asyncio/test_eager_task_factory.py
@@ -0,0 +1,344 @@
+"""Tests for base_events.py"""
+
+import asyncio
+import contextvars
+import gc
+import time
+import unittest
+
+from types import GenericAlias
+from unittest import mock
+from asyncio import base_events
+from asyncio import tasks
+from test.test_asyncio import utils as test_utils
+from test.test_asyncio.test_tasks import get_innermost_context
+from test import support
+
+MOCK_ANY = mock.ANY
+
+
+def tearDownModule():
+ asyncio.set_event_loop_policy(None)
+
+
+class EagerTaskFactoryLoopTests:
+
+ Task = None
+
+ def run_coro(self, coro):
+ """
+ Helper method to run the `coro` coroutine in the test event loop.
+ It helps with making sure the event loop is running before starting
+ to execute `coro`. This is important for testing the eager step
+ functionality, since an eager step is taken only if the event loop
+ is already running.
+ """
+
+ async def coro_runner():
+ self.assertTrue(asyncio.get_event_loop().is_running())
+ return await coro
+
+ return self.loop.run_until_complete(coro)
+
+ def setUp(self):
+ super().setUp()
+ self.loop = asyncio.new_event_loop()
+ self.eager_task_factory = asyncio.create_eager_task_factory(self.Task)
+ self.loop.set_task_factory(self.eager_task_factory)
+ self.set_event_loop(self.loop)
+
+ def test_eager_task_factory_set(self):
+ self.assertIsNotNone(self.eager_task_factory)
+ self.assertIs(self.loop.get_task_factory(), self.eager_task_factory)
+
+ async def noop(): pass
+
+ async def run():
+ t = self.loop.create_task(noop())
+ self.assertIsInstance(t, self.Task)
+ await t
+
+ self.run_coro(run())
+
+ def test_await_future_during_eager_step(self):
+
+ async def set_result(fut, val):
+ fut.set_result(val)
+
+ async def run():
+ fut = self.loop.create_future()
+ t = self.loop.create_task(set_result(fut, 'my message'))
+ # assert the eager step completed the task
+ self.assertTrue(t.done())
+ return await fut
+
+ self.assertEqual(self.run_coro(run()), 'my message')
+
+ def test_eager_completion(self):
+
+ async def coro():
+ return 'hello'
+
+ async def run():
+ t = self.loop.create_task(coro())
+ # assert the eager step completed the task
+ self.assertTrue(t.done())
+ return await t
+
+ self.assertEqual(self.run_coro(run()), 'hello')
+
+ def test_block_after_eager_step(self):
+
+ async def coro():
+ await asyncio.sleep(0.1)
+ return 'finished after blocking'
+
+ async def run():
+ t = self.loop.create_task(coro())
+ self.assertFalse(t.done())
+ result = await t
+ self.assertTrue(t.done())
+ return result
+
+ self.assertEqual(self.run_coro(run()), 'finished after blocking')
+
+ def test_cancellation_after_eager_completion(self):
+
+ async def coro():
+ return 'finished without blocking'
+
+ async def run():
+ t = self.loop.create_task(coro())
+ t.cancel()
+ result = await t
+ # finished task can't be cancelled
+ self.assertFalse(t.cancelled())
+ return result
+
+ self.assertEqual(self.run_coro(run()), 'finished without blocking')
+
+ def test_cancellation_after_eager_step_blocks(self):
+
+ async def coro():
+ await asyncio.sleep(0.1)
+ return 'finished after blocking'
+
+ async def run():
+ t = self.loop.create_task(coro())
+ t.cancel('cancellation message')
+ self.assertGreater(t.cancelling(), 0)
+ result = await t
+
+ with self.assertRaises(asyncio.CancelledError) as cm:
+ self.run_coro(run())
+
+ self.assertEqual('cancellation message', cm.exception.args[0])
+
+ def test_current_task(self):
+ captured_current_task = None
+
+ async def coro():
+ nonlocal captured_current_task
+ captured_current_task = asyncio.current_task()
+ # verify the task before and after blocking is identical
+ await asyncio.sleep(0.1)
+ self.assertIs(asyncio.current_task(), captured_current_task)
+
+ async def run():
+ t = self.loop.create_task(coro())
+ self.assertIs(captured_current_task, t)
+ await t
+
+ self.run_coro(run())
+ captured_current_task = None
+
+ def test_all_tasks_with_eager_completion(self):
+ captured_all_tasks = None
+
+ async def coro():
+ nonlocal captured_all_tasks
+ captured_all_tasks = asyncio.all_tasks()
+
+ async def run():
+ t = self.loop.create_task(coro())
+ self.assertIn(t, captured_all_tasks)
+ self.assertNotIn(t, asyncio.all_tasks())
+
+ self.run_coro(run())
+
+ def test_all_tasks_with_blocking(self):
+ captured_eager_all_tasks = None
+
+ async def coro(fut1, fut2):
+ nonlocal captured_eager_all_tasks
+ captured_eager_all_tasks = asyncio.all_tasks()
+ await fut1
+ fut2.set_result(None)
+
+ async def run():
+ fut1 = self.loop.create_future()
+ fut2 = self.loop.create_future()
+ t = self.loop.create_task(coro(fut1, fut2))
+ self.assertIn(t, captured_eager_all_tasks)
+ self.assertIn(t, asyncio.all_tasks())
+ fut1.set_result(None)
+ await fut2
+ self.assertNotIn(t, asyncio.all_tasks())
+
+ self.run_coro(run())
+
+ def test_context_vars(self):
+ cv = contextvars.ContextVar('cv', default=0)
+
+ coro_first_step_ran = False
+ coro_second_step_ran = False
+
+ async def coro():
+ nonlocal coro_first_step_ran
+ nonlocal coro_second_step_ran
+ self.assertEqual(cv.get(), 1)
+ cv.set(2)
+ self.assertEqual(cv.get(), 2)
+ coro_first_step_ran = True
+ await asyncio.sleep(0.1)
+ self.assertEqual(cv.get(), 2)
+ cv.set(3)
+ self.assertEqual(cv.get(), 3)
+ coro_second_step_ran = True
+
+ async def run():
+ cv.set(1)
+ t = self.loop.create_task(coro())
+ self.assertTrue(coro_first_step_ran)
+ self.assertFalse(coro_second_step_ran)
+ self.assertEqual(cv.get(), 1)
+ await t
+ self.assertTrue(coro_second_step_ran)
+ self.assertEqual(cv.get(), 1)
+
+ self.run_coro(run())
+
+
+class PyEagerTaskFactoryLoopTests(EagerTaskFactoryLoopTests, test_utils.TestCase):
+ Task = tasks._PyTask
+
+
+@unittest.skipUnless(hasattr(tasks, '_CTask'),
+ 'requires the C _asyncio module')
+class CEagerTaskFactoryLoopTests(EagerTaskFactoryLoopTests, test_utils.TestCase):
+ Task = getattr(tasks, '_CTask', None)
+
+
+class AsyncTaskCounter:
+ def __init__(self, loop, *, task_class, eager):
+ self.suspense_count = 0
+ self.task_count = 0
+
+ def CountingTask(*args, eager_start=False, **kwargs):
+ if not eager_start:
+ self.task_count += 1
+ kwargs["eager_start"] = eager_start
+ return task_class(*args, **kwargs)
+
+ if eager:
+ factory = asyncio.create_eager_task_factory(CountingTask)
+ else:
+ def factory(loop, coro, **kwargs):
+ return CountingTask(coro, loop=loop, **kwargs)
+ loop.set_task_factory(factory)
+
+ def get(self):
+ return self.task_count
+
+
+async def awaitable_chain(depth):
+ if depth == 0:
+ return 0
+ return 1 + await awaitable_chain(depth - 1)
+
+
+async def recursive_taskgroups(width, depth):
+ if depth == 0:
+ return
+
+ async with asyncio.TaskGroup() as tg:
+ futures = [
+ tg.create_task(recursive_taskgroups(width, depth - 1))
+ for _ in range(width)
+ ]
+
+
+async def recursive_gather(width, depth):
+ if depth == 0:
+ return
+
+ await asyncio.gather(
+ *[recursive_gather(width, depth - 1) for _ in range(width)]
+ )
+
+
+class BaseTaskCountingTests:
+
+ Task = None
+ eager = None
+ expected_task_count = None
+
+ def setUp(self):
+ super().setUp()
+ self.loop = asyncio.new_event_loop()
+ self.counter = AsyncTaskCounter(self.loop, task_class=self.Task, eager=self.eager)
+ self.set_event_loop(self.loop)
+
+ def test_awaitables_chain(self):
+ observed_depth = self.loop.run_until_complete(awaitable_chain(100))
+ self.assertEqual(observed_depth, 100)
+ self.assertEqual(self.counter.get(), 0 if self.eager else 1)
+
+ def test_recursive_taskgroups(self):
+ num_tasks = self.loop.run_until_complete(recursive_taskgroups(5, 4))
+ self.assertEqual(self.counter.get(), self.expected_task_count)
+
+ def test_recursive_gather(self):
+ self.loop.run_until_complete(recursive_gather(5, 4))
+ self.assertEqual(self.counter.get(), self.expected_task_count)
+
+
+class BaseNonEagerTaskFactoryTests(BaseTaskCountingTests):
+ eager = False
+ expected_task_count = 781 # 1 + 5 + 5^2 + 5^3 + 5^4
+
+
+class BaseEagerTaskFactoryTests(BaseTaskCountingTests):
+ eager = True
+ expected_task_count = 0
+
+
+class NonEagerTests(BaseNonEagerTaskFactoryTests, test_utils.TestCase):
+ Task = asyncio.Task
+
+
+class EagerTests(BaseEagerTaskFactoryTests, test_utils.TestCase):
+ Task = asyncio.Task
+
+
+class NonEagerPyTaskTests(BaseNonEagerTaskFactoryTests, test_utils.TestCase):
+ Task = tasks._PyTask
+
+
+class EagerPyTaskTests(BaseEagerTaskFactoryTests, test_utils.TestCase):
+ Task = tasks._PyTask
+
+
+@unittest.skipUnless(hasattr(tasks, '_CTask'),
+ 'requires the C _asyncio module')
+class NonEagerCTaskTests(BaseNonEagerTaskFactoryTests, test_utils.TestCase):
+ Task = getattr(tasks, '_CTask', None)
+
+
+@unittest.skipUnless(hasattr(tasks, '_CTask'),
+ 'requires the C _asyncio module')
+class EagerCTaskTests(BaseEagerTaskFactoryTests, test_utils.TestCase):
+ Task = getattr(tasks, '_CTask', None)
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/Misc/NEWS.d/next/Library/2023-03-15-12-18-07.gh-issue-97696.DtnpIC.rst b/Misc/NEWS.d/next/Library/2023-03-15-12-18-07.gh-issue-97696.DtnpIC.rst
new file mode 100644
index 0000000..0b3854d
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2023-03-15-12-18-07.gh-issue-97696.DtnpIC.rst
@@ -0,0 +1,6 @@
+Implemented an eager task factory in asyncio.
+When used as a task factory on an event loop, it performs eager execution of
+coroutines. Coroutines that are able to complete synchronously (e.g. return or
+raise without blocking) are returned immediately as a finished task, and the
+task is never scheduled to the event loop. If the coroutine blocks, the
+(pending) task is scheduled and returned.
diff --git a/Modules/_asynciomodule.c b/Modules/_asynciomodule.c
index 82dbc08..8b1a29b 100644
--- a/Modules/_asynciomodule.c
+++ b/Modules/_asynciomodule.c
@@ -8,6 +8,7 @@
#include "pycore_runtime_init.h" // _Py_ID()
#include "pycore_moduleobject.h" // _PyModule_GetState()
#include "structmember.h" // PyMemberDef
+#include "cpython/context.h"
#include <stddef.h> // offsetof()
@@ -31,8 +32,11 @@ typedef struct {
all running event loops. {EventLoop: Task} */
PyObject *current_tasks;
- /* WeakSet containing all alive tasks. */
- PyObject *all_tasks;
+ /* WeakSet containing all tasks scheduled to run on event loops. */
+ PyObject *scheduled_tasks;
+
+ /* Set containing all eagerly executing tasks. */
+ PyObject *eager_tasks;
/* An isinstance type cache for the 'is_coroutine()' function. */
PyObject *iscoroutine_typecache;
@@ -156,6 +160,9 @@ class _asyncio.Future "FutureObj *" "&Future_Type"
/* Get FutureIter from Future */
static PyObject * future_new_iter(PyObject *);
+static PyObject *
+task_step_handle_result_impl(asyncio_state *state, TaskObj *task, PyObject *result);
+
static int
_is_coroutine(asyncio_state *state, PyObject *coro)
@@ -1830,6 +1837,7 @@ class _asyncio.Task "TaskObj *" "&Task_Type"
static int task_call_step_soon(asyncio_state *state, TaskObj *, PyObject *);
static PyObject * task_wakeup(TaskObj *, PyObject *);
static PyObject * task_step(asyncio_state *, TaskObj *, PyObject *);
+static int task_eager_start(asyncio_state *state, TaskObj *task);
/* ----- Task._step wrapper */
@@ -1940,7 +1948,7 @@ static PyMethodDef TaskWakeupDef = {
static int
register_task(asyncio_state *state, PyObject *task)
{
- PyObject *res = PyObject_CallMethodOneArg(state->all_tasks,
+ PyObject *res = PyObject_CallMethodOneArg(state->scheduled_tasks,
&_Py_ID(add), task);
if (res == NULL) {
return -1;
@@ -1949,11 +1957,16 @@ register_task(asyncio_state *state, PyObject *task)
return 0;
}
+static int
+register_eager_task(asyncio_state *state, PyObject *task)
+{
+ return PySet_Add(state->eager_tasks, task);
+}
static int
unregister_task(asyncio_state *state, PyObject *task)
{
- PyObject *res = PyObject_CallMethodOneArg(state->all_tasks,
+ PyObject *res = PyObject_CallMethodOneArg(state->scheduled_tasks,
&_Py_ID(discard), task);
if (res == NULL) {
return -1;
@@ -1962,6 +1975,11 @@ unregister_task(asyncio_state *state, PyObject *task)
return 0;
}
+static int
+unregister_eager_task(asyncio_state *state, PyObject *task)
+{
+ return PySet_Discard(state->eager_tasks, task);
+}
static int
enter_task(asyncio_state *state, PyObject *loop, PyObject *task)
@@ -2015,6 +2033,54 @@ leave_task(asyncio_state *state, PyObject *loop, PyObject *task)
return _PyDict_DelItem_KnownHash(state->current_tasks, loop, hash);
}
+static PyObject *
+swap_current_task(asyncio_state *state, PyObject *loop, PyObject *task)
+{
+ PyObject *prev_task;
+ Py_hash_t hash;
+ hash = PyObject_Hash(loop);
+ if (hash == -1) {
+ return NULL;
+ }
+
+ prev_task = _PyDict_GetItem_KnownHash(state->current_tasks, loop, hash);
+ if (prev_task == NULL) {
+ if (PyErr_Occurred()) {
+ return NULL;
+ }
+ prev_task = Py_None;
+ }
+
+ if (task == Py_None) {
+ if (_PyDict_DelItem_KnownHash(state->current_tasks, loop, hash) == -1) {
+ return NULL;
+ }
+ } else {
+ if (_PyDict_SetItem_KnownHash(state->current_tasks, loop, task, hash) == -1) {
+ return NULL;
+ }
+ }
+
+ Py_INCREF(prev_task);
+
+ return prev_task;
+}
+
+static int
+is_loop_running(PyObject *loop)
+{
+ PyObject *func = PyObject_GetAttr(loop, &_Py_ID(is_running));
+ if (func == NULL) {
+ PyErr_Format(PyExc_TypeError, "Loop missing is_running()");
+ return -1;
+ }
+ PyObject *res = PyObject_CallNoArgs(func);
+ int retval = Py_IsTrue(res);
+ Py_DECREF(func);
+ Py_DECREF(res);
+ return !!retval;
+}
+
/* ----- Task */
/*[clinic input]
@@ -2025,15 +2091,16 @@ _asyncio.Task.__init__
loop: object = None
name: object = None
context: object = None
+ eager_start: bool = False
A coroutine wrapped in a Future.
[clinic start generated code]*/
static int
_asyncio_Task___init___impl(TaskObj *self, PyObject *coro, PyObject *loop,
- PyObject *name, PyObject *context)
-/*[clinic end generated code: output=49ac96fe33d0e5c7 input=924522490c8ce825]*/
-
+ PyObject *name, PyObject *context,
+ int eager_start)
+/*[clinic end generated code: output=7aced2d27836f1a1 input=18e3f113a51b829d]*/
{
if (future_init((FutureObj*)self, loop)) {
return -1;
@@ -2083,6 +2150,19 @@ _asyncio_Task___init___impl(TaskObj *self, PyObject *coro, PyObject *loop,
return -1;
}
+ if (eager_start) {
+ int loop_running = is_loop_running(self->task_loop);
+ if (loop_running == -1) {
+ return -1;
+ }
+ if (loop_running) {
+ if (task_eager_start(state, self)) {
+ return -1;
+ }
+ return 0;
+ }
+ }
+
if (task_call_step_soon(state, self, NULL)) {
return -1;
}
@@ -2831,6 +2911,20 @@ task_step_impl(asyncio_state *state, TaskObj *task, PyObject *exc)
Py_RETURN_NONE;
}
+ PyObject *ret = task_step_handle_result_impl(state, task, result);
+ return ret;
+
+fail:
+ return NULL;
+}
+
+
+static PyObject *
+task_step_handle_result_impl(asyncio_state *state, TaskObj *task, PyObject *result)
+{
+ int res;
+ PyObject *o;
+
if (result == (PyObject*)task) {
/* We have a task that wants to await on itself */
goto self_await;
@@ -3062,6 +3156,65 @@ task_step(asyncio_state *state, TaskObj *task, PyObject *exc)
}
}
+static int
+task_eager_start(asyncio_state *state, TaskObj *task)
+{
+ assert(task != NULL);
+ PyObject *prevtask = swap_current_task(state, task->task_loop, (PyObject *)task);
+ if (prevtask == NULL) {
+ return -1;
+ }
+
+ if (register_eager_task(state, (PyObject *)task) == -1) {
+ Py_DECREF(prevtask);
+ return -1;
+ }
+
+ if (PyContext_Enter(task->task_context) == -1) {
+ Py_DECREF(prevtask);
+ return -1;
+ }
+
+ int retval = 0;
+
+ PyObject *stepres = task_step_impl(state, task, NULL);
+ if (stepres == NULL) {
+ PyObject *exc = PyErr_GetRaisedException();
+ _PyErr_ChainExceptions1(exc);
+ retval = -1;
+ } else {
+ Py_DECREF(stepres);
+ }
+
+ PyObject *curtask = swap_current_task(state, task->task_loop, prevtask);
+ Py_DECREF(prevtask);
+ if (curtask == NULL) {
+ retval = -1;
+ } else {
+ assert(curtask == (PyObject *)task);
+ Py_DECREF(curtask);
+ }
+
+ if (unregister_eager_task(state, (PyObject *)task) == -1) {
+ retval = -1;
+ }
+
+ if (PyContext_Exit(task->task_context) == -1) {
+ retval = -1;
+ }
+
+ if (task->task_state == STATE_PENDING) {
+ if (register_task(state, (PyObject *)task) == -1) {
+ retval = -1;
+ }
+ } else {
+ // This seems to really help performance on pyperformance benchmarks
+ Py_CLEAR(task->task_coro);
+ }
+
+ return retval;
+}
+
static PyObject *
task_wakeup(TaskObj *task, PyObject *o)
{
@@ -3225,6 +3378,27 @@ _asyncio__register_task_impl(PyObject *module, PyObject *task)
Py_RETURN_NONE;
}
+/*[clinic input]
+_asyncio._register_eager_task
+
+ task: object
+
+Register a new task in asyncio as executed by loop.
+
+Returns None.
+[clinic start generated code]*/
+
+static PyObject *
+_asyncio__register_eager_task_impl(PyObject *module, PyObject *task)
+/*[clinic end generated code: output=dfe1d45367c73f1a input=237f684683398c51]*/
+{
+ asyncio_state *state = get_asyncio_state(module);
+ if (register_eager_task(state, task) < 0) {
+ return NULL;
+ }
+ Py_RETURN_NONE;
+}
+
/*[clinic input]
_asyncio._unregister_task
@@ -3247,6 +3421,27 @@ _asyncio__unregister_task_impl(PyObject *module, PyObject *task)
Py_RETURN_NONE;
}
+/*[clinic input]
+_asyncio._unregister_eager_task
+
+ task: object
+
+Unregister a task.
+
+Returns None.
+[clinic start generated code]*/
+
+static PyObject *
+_asyncio__unregister_eager_task_impl(PyObject *module, PyObject *task)
+/*[clinic end generated code: output=a426922bd07f23d1 input=9d07401ef14ee048]*/
+{
+ asyncio_state *state = get_asyncio_state(module);
+ if (unregister_eager_task(state, task) < 0) {
+ return NULL;
+ }
+ Py_RETURN_NONE;
+}
+
/*[clinic input]
_asyncio._enter_task
@@ -3299,6 +3494,27 @@ _asyncio__leave_task_impl(PyObject *module, PyObject *loop, PyObject *task)
/*[clinic input]
+_asyncio._swap_current_task
+
+ loop: object
+ task: object
+
+Temporarily swap in the supplied task and return the original one (or None).
+
+This is intended for use during eager coroutine execution.
+
+[clinic start generated code]*/
+
+static PyObject *
+_asyncio__swap_current_task_impl(PyObject *module, PyObject *loop,
+ PyObject *task)
+/*[clinic end generated code: output=9f88de958df74c7e input=c9c72208d3d38b6c]*/
+{
+ return swap_current_task(get_asyncio_state(module), loop, task);
+}
+
+
+/*[clinic input]
_asyncio.current_task
loop: object = None
@@ -3379,7 +3595,8 @@ module_traverse(PyObject *mod, visitproc visit, void *arg)
Py_VISIT(state->asyncio_InvalidStateError);
Py_VISIT(state->asyncio_CancelledError);
- Py_VISIT(state->all_tasks);
+ Py_VISIT(state->scheduled_tasks);
+ Py_VISIT(state->eager_tasks);
Py_VISIT(state->current_tasks);
Py_VISIT(state->iscoroutine_typecache);
@@ -3416,7 +3633,8 @@ module_clear(PyObject *mod)
Py_CLEAR(state->asyncio_InvalidStateError);
Py_CLEAR(state->asyncio_CancelledError);
- Py_CLEAR(state->all_tasks);
+ Py_CLEAR(state->scheduled_tasks);
+ Py_CLEAR(state->eager_tasks);
Py_CLEAR(state->current_tasks);
Py_CLEAR(state->iscoroutine_typecache);
@@ -3496,9 +3714,14 @@ module_init(asyncio_state *state)
PyObject *weak_set;
WITH_MOD("weakref")
GET_MOD_ATTR(weak_set, "WeakSet");
- state->all_tasks = PyObject_CallNoArgs(weak_set);
+ state->scheduled_tasks = PyObject_CallNoArgs(weak_set);
Py_CLEAR(weak_set);
- if (state->all_tasks == NULL) {
+ if (state->scheduled_tasks == NULL) {
+ goto fail;
+ }
+
+ state->eager_tasks = PySet_New(NULL);
+ if (state->eager_tasks == NULL) {
goto fail;
}
@@ -3522,9 +3745,12 @@ static PyMethodDef asyncio_methods[] = {
_ASYNCIO__GET_RUNNING_LOOP_METHODDEF
_ASYNCIO__SET_RUNNING_LOOP_METHODDEF
_ASYNCIO__REGISTER_TASK_METHODDEF
+ _ASYNCIO__REGISTER_EAGER_TASK_METHODDEF
_ASYNCIO__UNREGISTER_TASK_METHODDEF
+ _ASYNCIO__UNREGISTER_EAGER_TASK_METHODDEF
_ASYNCIO__ENTER_TASK_METHODDEF
_ASYNCIO__LEAVE_TASK_METHODDEF
+ _ASYNCIO__SWAP_CURRENT_TASK_METHODDEF
{NULL, NULL}
};
@@ -3561,7 +3787,11 @@ module_exec(PyObject *mod)
return -1;
}
- if (PyModule_AddObjectRef(mod, "_all_tasks", state->all_tasks) < 0) {
+ if (PyModule_AddObjectRef(mod, "_scheduled_tasks", state->scheduled_tasks) < 0) {
+ return -1;
+ }
+
+ if (PyModule_AddObjectRef(mod, "_eager_tasks", state->eager_tasks) < 0) {
return -1;
}
diff --git a/Modules/clinic/_asynciomodule.c.h b/Modules/clinic/_asynciomodule.c.h
index 43c5d77..6a780a8 100644
--- a/Modules/clinic/_asynciomodule.c.h
+++ b/Modules/clinic/_asynciomodule.c.h
@@ -482,14 +482,15 @@ _asyncio_Future__make_cancelled_error(FutureObj *self, PyObject *Py_UNUSED(ignor
}
PyDoc_STRVAR(_asyncio_Task___init____doc__,
-"Task(coro, *, loop=None, name=None, context=None)\n"
+"Task(coro, *, loop=None, name=None, context=None, eager_start=False)\n"
"--\n"
"\n"
"A coroutine wrapped in a Future.");
static int
_asyncio_Task___init___impl(TaskObj *self, PyObject *coro, PyObject *loop,
- PyObject *name, PyObject *context);
+ PyObject *name, PyObject *context,
+ int eager_start);
static int
_asyncio_Task___init__(PyObject *self, PyObject *args, PyObject *kwargs)
@@ -497,14 +498,14 @@ _asyncio_Task___init__(PyObject *self, PyObject *args, PyObject *kwargs)
int return_value = -1;
#if defined(Py_BUILD_CORE) && !defined(Py_BUILD_CORE_MODULE)
- #define NUM_KEYWORDS 4
+ #define NUM_KEYWORDS 5
static struct {
PyGC_Head _this_is_not_used;
PyObject_VAR_HEAD
PyObject *ob_item[NUM_KEYWORDS];
} _kwtuple = {
.ob_base = PyVarObject_HEAD_INIT(&PyTuple_Type, NUM_KEYWORDS)
- .ob_item = { &_Py_ID(coro), &_Py_ID(loop), &_Py_ID(name), &_Py_ID(context), },
+ .ob_item = { &_Py_ID(coro), &_Py_ID(loop), &_Py_ID(name), &_Py_ID(context), &_Py_ID(eager_start), },
};
#undef NUM_KEYWORDS
#define KWTUPLE (&_kwtuple.ob_base.ob_base)
@@ -513,14 +514,14 @@ _asyncio_Task___init__(PyObject *self, PyObject *args, PyObject *kwargs)
# define KWTUPLE NULL
#endif // !Py_BUILD_CORE
- static const char * const _keywords[] = {"coro", "loop", "name", "context", NULL};
+ static const char * const _keywords[] = {"coro", "loop", "name", "context", "eager_start", NULL};
static _PyArg_Parser _parser = {
.keywords = _keywords,
.fname = "Task",
.kwtuple = KWTUPLE,
};
#undef KWTUPLE
- PyObject *argsbuf[4];
+ PyObject *argsbuf[5];
PyObject * const *fastargs;
Py_ssize_t nargs = PyTuple_GET_SIZE(args);
Py_ssize_t noptargs = nargs + (kwargs ? PyDict_GET_SIZE(kwargs) : 0) - 1;
@@ -528,6 +529,7 @@ _asyncio_Task___init__(PyObject *self, PyObject *args, PyObject *kwargs)
PyObject *loop = Py_None;
PyObject *name = Py_None;
PyObject *context = Py_None;
+ int eager_start = 0;
fastargs = _PyArg_UnpackKeywords(_PyTuple_CAST(args)->ob_item, nargs, kwargs, NULL, &_parser, 1, 1, 0, argsbuf);
if (!fastargs) {
@@ -549,9 +551,18 @@ _asyncio_Task___init__(PyObject *self, PyObject *args, PyObject *kwargs)
goto skip_optional_kwonly;
}
}
- context = fastargs[3];
+ if (fastargs[3]) {
+ context = fastargs[3];
+ if (!--noptargs) {
+ goto skip_optional_kwonly;
+ }
+ }
+ eager_start = PyObject_IsTrue(fastargs[4]);
+ if (eager_start < 0) {
+ goto exit;
+ }
skip_optional_kwonly:
- return_value = _asyncio_Task___init___impl((TaskObj *)self, coro, loop, name, context);
+ return_value = _asyncio_Task___init___impl((TaskObj *)self, coro, loop, name, context, eager_start);
exit:
return return_value;
@@ -1064,6 +1075,63 @@ exit:
return return_value;
}
+PyDoc_STRVAR(_asyncio__register_eager_task__doc__,
+"_register_eager_task($module, /, task)\n"
+"--\n"
+"\n"
+"Register a new task in asyncio as executed by loop.\n"
+"\n"
+"Returns None.");
+
+#define _ASYNCIO__REGISTER_EAGER_TASK_METHODDEF \
+ {"_register_eager_task", _PyCFunction_CAST(_asyncio__register_eager_task), METH_FASTCALL|METH_KEYWORDS, _asyncio__register_eager_task__doc__},
+
+static PyObject *
+_asyncio__register_eager_task_impl(PyObject *module, PyObject *task);
+
+static PyObject *
+_asyncio__register_eager_task(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames)
+{
+ PyObject *return_value = NULL;
+ #if defined(Py_BUILD_CORE) && !defined(Py_BUILD_CORE_MODULE)
+
+ #define NUM_KEYWORDS 1
+ static struct {
+ PyGC_Head _this_is_not_used;
+ PyObject_VAR_HEAD
+ PyObject *ob_item[NUM_KEYWORDS];
+ } _kwtuple = {
+ .ob_base = PyVarObject_HEAD_INIT(&PyTuple_Type, NUM_KEYWORDS)
+ .ob_item = { &_Py_ID(task), },
+ };
+ #undef NUM_KEYWORDS
+ #define KWTUPLE (&_kwtuple.ob_base.ob_base)
+
+ #else // !Py_BUILD_CORE
+ # define KWTUPLE NULL
+ #endif // !Py_BUILD_CORE
+
+ static const char * const _keywords[] = {"task", NULL};
+ static _PyArg_Parser _parser = {
+ .keywords = _keywords,
+ .fname = "_register_eager_task",
+ .kwtuple = KWTUPLE,
+ };
+ #undef KWTUPLE
+ PyObject *argsbuf[1];
+ PyObject *task;
+
+ args = _PyArg_UnpackKeywords(args, nargs, NULL, kwnames, &_parser, 1, 1, 0, argsbuf);
+ if (!args) {
+ goto exit;
+ }
+ task = args[0];
+ return_value = _asyncio__register_eager_task_impl(module, task);
+
+exit:
+ return return_value;
+}
+
PyDoc_STRVAR(_asyncio__unregister_task__doc__,
"_unregister_task($module, /, task)\n"
"--\n"
@@ -1121,6 +1189,63 @@ exit:
return return_value;
}
+PyDoc_STRVAR(_asyncio__unregister_eager_task__doc__,
+"_unregister_eager_task($module, /, task)\n"
+"--\n"
+"\n"
+"Unregister a task.\n"
+"\n"
+"Returns None.");
+
+#define _ASYNCIO__UNREGISTER_EAGER_TASK_METHODDEF \
+ {"_unregister_eager_task", _PyCFunction_CAST(_asyncio__unregister_eager_task), METH_FASTCALL|METH_KEYWORDS, _asyncio__unregister_eager_task__doc__},
+
+static PyObject *
+_asyncio__unregister_eager_task_impl(PyObject *module, PyObject *task);
+
+static PyObject *
+_asyncio__unregister_eager_task(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames)
+{
+ PyObject *return_value = NULL;
+ #if defined(Py_BUILD_CORE) && !defined(Py_BUILD_CORE_MODULE)
+
+ #define NUM_KEYWORDS 1
+ static struct {
+ PyGC_Head _this_is_not_used;
+ PyObject_VAR_HEAD
+ PyObject *ob_item[NUM_KEYWORDS];
+ } _kwtuple = {
+ .ob_base = PyVarObject_HEAD_INIT(&PyTuple_Type, NUM_KEYWORDS)
+ .ob_item = { &_Py_ID(task), },
+ };
+ #undef NUM_KEYWORDS
+ #define KWTUPLE (&_kwtuple.ob_base.ob_base)
+
+ #else // !Py_BUILD_CORE
+ # define KWTUPLE NULL
+ #endif // !Py_BUILD_CORE
+
+ static const char * const _keywords[] = {"task", NULL};
+ static _PyArg_Parser _parser = {
+ .keywords = _keywords,
+ .fname = "_unregister_eager_task",
+ .kwtuple = KWTUPLE,
+ };
+ #undef KWTUPLE
+ PyObject *argsbuf[1];
+ PyObject *task;
+
+ args = _PyArg_UnpackKeywords(args, nargs, NULL, kwnames, &_parser, 1, 1, 0, argsbuf);
+ if (!args) {
+ goto exit;
+ }
+ task = args[0];
+ return_value = _asyncio__unregister_eager_task_impl(module, task);
+
+exit:
+ return return_value;
+}
+
PyDoc_STRVAR(_asyncio__enter_task__doc__,
"_enter_task($module, /, loop, task)\n"
"--\n"
@@ -1243,6 +1368,66 @@ exit:
return return_value;
}
+PyDoc_STRVAR(_asyncio__swap_current_task__doc__,
+"_swap_current_task($module, /, loop, task)\n"
+"--\n"
+"\n"
+"Temporarily swap in the supplied task and return the original one (or None).\n"
+"\n"
+"This is intended for use during eager coroutine execution.");
+
+#define _ASYNCIO__SWAP_CURRENT_TASK_METHODDEF \
+ {"_swap_current_task", _PyCFunction_CAST(_asyncio__swap_current_task), METH_FASTCALL|METH_KEYWORDS, _asyncio__swap_current_task__doc__},
+
+static PyObject *
+_asyncio__swap_current_task_impl(PyObject *module, PyObject *loop,
+ PyObject *task);
+
+static PyObject *
+_asyncio__swap_current_task(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames)
+{
+ PyObject *return_value = NULL;
+ #if defined(Py_BUILD_CORE) && !defined(Py_BUILD_CORE_MODULE)
+
+ #define NUM_KEYWORDS 2
+ static struct {
+ PyGC_Head _this_is_not_used;
+ PyObject_VAR_HEAD
+ PyObject *ob_item[NUM_KEYWORDS];
+ } _kwtuple = {
+ .ob_base = PyVarObject_HEAD_INIT(&PyTuple_Type, NUM_KEYWORDS)
+ .ob_item = { &_Py_ID(loop), &_Py_ID(task), },
+ };
+ #undef NUM_KEYWORDS
+ #define KWTUPLE (&_kwtuple.ob_base.ob_base)
+
+ #else // !Py_BUILD_CORE
+ # define KWTUPLE NULL
+ #endif // !Py_BUILD_CORE
+
+ static const char * const _keywords[] = {"loop", "task", NULL};
+ static _PyArg_Parser _parser = {
+ .keywords = _keywords,
+ .fname = "_swap_current_task",
+ .kwtuple = KWTUPLE,
+ };
+ #undef KWTUPLE
+ PyObject *argsbuf[2];
+ PyObject *loop;
+ PyObject *task;
+
+ args = _PyArg_UnpackKeywords(args, nargs, NULL, kwnames, &_parser, 2, 2, 0, argsbuf);
+ if (!args) {
+ goto exit;
+ }
+ loop = args[0];
+ task = args[1];
+ return_value = _asyncio__swap_current_task_impl(module, loop, task);
+
+exit:
+ return return_value;
+}
+
PyDoc_STRVAR(_asyncio_current_task__doc__,
"current_task($module, /, loop=None)\n"
"--\n"
@@ -1302,4 +1487,4 @@ skip_optional_pos:
exit:
return return_value;
}
-/*[clinic end generated code: output=00f494214f2fd008 input=a9049054013a1b77]*/
+/*[clinic end generated code: output=6b0e283177b07639 input=a9049054013a1b77]*/