summaryrefslogtreecommitdiffstats
path: root/Lib/asyncio
diff options
context:
space:
mode:
authorItamar Ostricher <itamarost@gmail.com>2023-05-01 21:10:13 (GMT)
committerGitHub <noreply@github.com>2023-05-01 21:10:13 (GMT)
commita474e04388c2ef6aca75c26cb70a1b6200235feb (patch)
tree43520d5ad16016620f149dc1e84d4d57e45051d5 /Lib/asyncio
parent59bc36aacddd5a3acd32c80c0dfd0726135a7817 (diff)
downloadcpython-a474e04388c2ef6aca75c26cb70a1b6200235feb.zip
cpython-a474e04388c2ef6aca75c26cb70a1b6200235feb.tar.gz
cpython-a474e04388c2ef6aca75c26cb70a1b6200235feb.tar.bz2
gh-97696: asyncio eager tasks factory (#102853)
Co-authored-by: Jacob Bower <jbower@meta.com> Co-authored-by: Carol Willing <carolcode@willingconsulting.com>
Diffstat (limited to 'Lib/asyncio')
-rw-r--r--Lib/asyncio/base_tasks.py10
-rw-r--r--Lib/asyncio/tasks.py122
2 files changed, 106 insertions, 26 deletions
diff --git a/Lib/asyncio/base_tasks.py b/Lib/asyncio/base_tasks.py
index 26298e6..c907b68 100644
--- a/Lib/asyncio/base_tasks.py
+++ b/Lib/asyncio/base_tasks.py
@@ -15,11 +15,13 @@ def _task_repr_info(task):
info.insert(1, 'name=%r' % task.get_name())
- coro = coroutines._format_coroutine(task._coro)
- info.insert(2, f'coro=<{coro}>')
-
if task._fut_waiter is not None:
- info.insert(3, f'wait_for={task._fut_waiter!r}')
+ info.insert(2, f'wait_for={task._fut_waiter!r}')
+
+ if task._coro:
+ coro = coroutines._format_coroutine(task._coro)
+ info.insert(2, f'coro=<{coro}>')
+
return info
diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py
index c90d32c..aa5269a 100644
--- a/Lib/asyncio/tasks.py
+++ b/Lib/asyncio/tasks.py
@@ -6,6 +6,7 @@ __all__ = (
'wait', 'wait_for', 'as_completed', 'sleep',
'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
'current_task', 'all_tasks',
+ 'create_eager_task_factory', 'eager_task_factory',
'_register_task', '_unregister_task', '_enter_task', '_leave_task',
)
@@ -43,22 +44,26 @@ def all_tasks(loop=None):
"""Return a set of all tasks for the loop."""
if loop is None:
loop = events.get_running_loop()
- # Looping over a WeakSet (_all_tasks) isn't safe as it can be updated from another
- # thread while we do so. Therefore we cast it to list prior to filtering. The list
- # cast itself requires iteration, so we repeat it several times ignoring
- # RuntimeErrors (which are not very likely to occur). See issues 34970 and 36607 for
- # details.
+ # capturing the set of eager tasks first, so if an eager task "graduates"
+ # to a regular task in another thread, we don't risk missing it.
+ eager_tasks = list(_eager_tasks)
+ # Looping over the WeakSet isn't safe as it can be updated from another
+ # thread, therefore we cast it to list prior to filtering. The list cast
+ # itself requires iteration, so we repeat it several times ignoring
+ # RuntimeErrors (which are not very likely to occur).
+ # See issues 34970 and 36607 for details.
+ scheduled_tasks = None
i = 0
while True:
try:
- tasks = list(_all_tasks)
+ scheduled_tasks = list(_scheduled_tasks)
except RuntimeError:
i += 1
if i >= 1000:
raise
else:
break
- return {t for t in tasks
+ return {t for t in itertools.chain(scheduled_tasks, eager_tasks)
if futures._get_loop(t) is loop and not t.done()}
@@ -93,7 +98,8 @@ class Task(futures._PyFuture): # Inherit Python Task implementation
# status is still pending
_log_destroy_pending = True
- def __init__(self, coro, *, loop=None, name=None, context=None):
+ def __init__(self, coro, *, loop=None, name=None, context=None,
+ eager_start=False):
super().__init__(loop=loop)
if self._source_traceback:
del self._source_traceback[-1]
@@ -117,8 +123,11 @@ class Task(futures._PyFuture): # Inherit Python Task implementation
else:
self._context = context
- self._loop.call_soon(self.__step, context=self._context)
- _register_task(self)
+ if eager_start and self._loop.is_running():
+ self.__eager_start()
+ else:
+ self._loop.call_soon(self.__step, context=self._context)
+ _register_task(self)
def __del__(self):
if self._state == futures._PENDING and self._log_destroy_pending:
@@ -250,6 +259,25 @@ class Task(futures._PyFuture): # Inherit Python Task implementation
self._num_cancels_requested -= 1
return self._num_cancels_requested
+ def __eager_start(self):
+ prev_task = _swap_current_task(self._loop, self)
+ try:
+ _register_eager_task(self)
+ try:
+ self._context.run(self.__step_run_and_handle_result, None)
+ finally:
+ _unregister_eager_task(self)
+ finally:
+ try:
+ curtask = _swap_current_task(self._loop, prev_task)
+ assert curtask is self
+ finally:
+ if self.done():
+ self._coro = None
+ self = None # Needed to break cycles when an exception occurs.
+ else:
+ _register_task(self)
+
def __step(self, exc=None):
if self.done():
raise exceptions.InvalidStateError(
@@ -258,11 +286,17 @@ class Task(futures._PyFuture): # Inherit Python Task implementation
if not isinstance(exc, exceptions.CancelledError):
exc = self._make_cancelled_error()
self._must_cancel = False
- coro = self._coro
self._fut_waiter = None
_enter_task(self._loop, self)
- # Call either coro.throw(exc) or coro.send(None).
+ try:
+ self.__step_run_and_handle_result(exc)
+ finally:
+ _leave_task(self._loop, self)
+ self = None # Needed to break cycles when an exception occurs.
+
+ def __step_run_and_handle_result(self, exc):
+ coro = self._coro
try:
if exc is None:
# We use the `send` method directly, because coroutines
@@ -334,7 +368,6 @@ class Task(futures._PyFuture): # Inherit Python Task implementation
self._loop.call_soon(
self.__step, new_exc, context=self._context)
finally:
- _leave_task(self._loop, self)
self = None # Needed to break cycles when an exception occurs.
def __wakeup(self, future):
@@ -897,8 +930,27 @@ def run_coroutine_threadsafe(coro, loop):
return future
-# WeakSet containing all alive tasks.
-_all_tasks = weakref.WeakSet()
+def create_eager_task_factory(custom_task_constructor):
+
+ if "eager_start" not in inspect.signature(custom_task_constructor).parameters:
+ raise TypeError(
+ "Provided constructor does not support eager task execution")
+
+ def factory(loop, coro, *, name=None, context=None):
+ return custom_task_constructor(
+ coro, loop=loop, name=name, context=context, eager_start=True)
+
+
+ return factory
+
+eager_task_factory = create_eager_task_factory(Task)
+
+
+# Collectively these two sets hold references to the complete set of active
+# tasks. Eagerly executed tasks use a faster regular set as an optimization
+# but may graduate to a WeakSet if the task blocks on IO.
+_scheduled_tasks = weakref.WeakSet()
+_eager_tasks = set()
# Dictionary containing tasks that are currently active in
# all running event loops. {EventLoop: Task}
@@ -906,8 +958,13 @@ _current_tasks = {}
def _register_task(task):
- """Register a new task in asyncio as executed by loop."""
- _all_tasks.add(task)
+ """Register an asyncio Task scheduled to run on an event loop."""
+ _scheduled_tasks.add(task)
+
+
+def _register_eager_task(task):
+ """Register an asyncio Task about to be eagerly executed."""
+ _eager_tasks.add(task)
def _enter_task(loop, task):
@@ -926,28 +983,49 @@ def _leave_task(loop, task):
del _current_tasks[loop]
+def _swap_current_task(loop, task):
+ prev_task = _current_tasks.get(loop)
+ if task is None:
+ del _current_tasks[loop]
+ else:
+ _current_tasks[loop] = task
+ return prev_task
+
+
def _unregister_task(task):
- """Unregister a task."""
- _all_tasks.discard(task)
+ """Unregister a completed, scheduled Task."""
+ _scheduled_tasks.discard(task)
+
+
+def _unregister_eager_task(task):
+ """Unregister a task which finished its first eager step."""
+ _eager_tasks.discard(task)
_py_current_task = current_task
_py_register_task = _register_task
+_py_register_eager_task = _register_eager_task
_py_unregister_task = _unregister_task
+_py_unregister_eager_task = _unregister_eager_task
_py_enter_task = _enter_task
_py_leave_task = _leave_task
+_py_swap_current_task = _swap_current_task
try:
- from _asyncio import (_register_task, _unregister_task,
- _enter_task, _leave_task,
- _all_tasks, _current_tasks,
+ from _asyncio import (_register_task, _register_eager_task,
+ _unregister_task, _unregister_eager_task,
+ _enter_task, _leave_task, _swap_current_task,
+ _scheduled_tasks, _eager_tasks, _current_tasks,
current_task)
except ImportError:
pass
else:
_c_current_task = current_task
_c_register_task = _register_task
+ _c_register_eager_task = _register_eager_task
_c_unregister_task = _unregister_task
+ _c_unregister_eager_task = _unregister_eager_task
_c_enter_task = _enter_task
_c_leave_task = _leave_task
+ _c_swap_current_task = _swap_current_task