diff options
author | Itamar Ostricher <itamarost@gmail.com> | 2023-05-01 21:10:13 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-05-01 21:10:13 (GMT) |
commit | a474e04388c2ef6aca75c26cb70a1b6200235feb (patch) | |
tree | 43520d5ad16016620f149dc1e84d4d57e45051d5 /Lib/asyncio | |
parent | 59bc36aacddd5a3acd32c80c0dfd0726135a7817 (diff) | |
download | cpython-a474e04388c2ef6aca75c26cb70a1b6200235feb.zip cpython-a474e04388c2ef6aca75c26cb70a1b6200235feb.tar.gz cpython-a474e04388c2ef6aca75c26cb70a1b6200235feb.tar.bz2 |
gh-97696: asyncio eager tasks factory (#102853)
Co-authored-by: Jacob Bower <jbower@meta.com>
Co-authored-by: Carol Willing <carolcode@willingconsulting.com>
Diffstat (limited to 'Lib/asyncio')
-rw-r--r-- | Lib/asyncio/base_tasks.py | 10 | ||||
-rw-r--r-- | Lib/asyncio/tasks.py | 122 |
2 files changed, 106 insertions, 26 deletions
diff --git a/Lib/asyncio/base_tasks.py b/Lib/asyncio/base_tasks.py index 26298e6..c907b68 100644 --- a/Lib/asyncio/base_tasks.py +++ b/Lib/asyncio/base_tasks.py @@ -15,11 +15,13 @@ def _task_repr_info(task): info.insert(1, 'name=%r' % task.get_name()) - coro = coroutines._format_coroutine(task._coro) - info.insert(2, f'coro=<{coro}>') - if task._fut_waiter is not None: - info.insert(3, f'wait_for={task._fut_waiter!r}') + info.insert(2, f'wait_for={task._fut_waiter!r}') + + if task._coro: + coro = coroutines._format_coroutine(task._coro) + info.insert(2, f'coro=<{coro}>') + return info diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py index c90d32c..aa5269a 100644 --- a/Lib/asyncio/tasks.py +++ b/Lib/asyncio/tasks.py @@ -6,6 +6,7 @@ __all__ = ( 'wait', 'wait_for', 'as_completed', 'sleep', 'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe', 'current_task', 'all_tasks', + 'create_eager_task_factory', 'eager_task_factory', '_register_task', '_unregister_task', '_enter_task', '_leave_task', ) @@ -43,22 +44,26 @@ def all_tasks(loop=None): """Return a set of all tasks for the loop.""" if loop is None: loop = events.get_running_loop() - # Looping over a WeakSet (_all_tasks) isn't safe as it can be updated from another - # thread while we do so. Therefore we cast it to list prior to filtering. The list - # cast itself requires iteration, so we repeat it several times ignoring - # RuntimeErrors (which are not very likely to occur). See issues 34970 and 36607 for - # details. + # capturing the set of eager tasks first, so if an eager task "graduates" + # to a regular task in another thread, we don't risk missing it. + eager_tasks = list(_eager_tasks) + # Looping over the WeakSet isn't safe as it can be updated from another + # thread, therefore we cast it to list prior to filtering. The list cast + # itself requires iteration, so we repeat it several times ignoring + # RuntimeErrors (which are not very likely to occur). + # See issues 34970 and 36607 for details. + scheduled_tasks = None i = 0 while True: try: - tasks = list(_all_tasks) + scheduled_tasks = list(_scheduled_tasks) except RuntimeError: i += 1 if i >= 1000: raise else: break - return {t for t in tasks + return {t for t in itertools.chain(scheduled_tasks, eager_tasks) if futures._get_loop(t) is loop and not t.done()} @@ -93,7 +98,8 @@ class Task(futures._PyFuture): # Inherit Python Task implementation # status is still pending _log_destroy_pending = True - def __init__(self, coro, *, loop=None, name=None, context=None): + def __init__(self, coro, *, loop=None, name=None, context=None, + eager_start=False): super().__init__(loop=loop) if self._source_traceback: del self._source_traceback[-1] @@ -117,8 +123,11 @@ class Task(futures._PyFuture): # Inherit Python Task implementation else: self._context = context - self._loop.call_soon(self.__step, context=self._context) - _register_task(self) + if eager_start and self._loop.is_running(): + self.__eager_start() + else: + self._loop.call_soon(self.__step, context=self._context) + _register_task(self) def __del__(self): if self._state == futures._PENDING and self._log_destroy_pending: @@ -250,6 +259,25 @@ class Task(futures._PyFuture): # Inherit Python Task implementation self._num_cancels_requested -= 1 return self._num_cancels_requested + def __eager_start(self): + prev_task = _swap_current_task(self._loop, self) + try: + _register_eager_task(self) + try: + self._context.run(self.__step_run_and_handle_result, None) + finally: + _unregister_eager_task(self) + finally: + try: + curtask = _swap_current_task(self._loop, prev_task) + assert curtask is self + finally: + if self.done(): + self._coro = None + self = None # Needed to break cycles when an exception occurs. + else: + _register_task(self) + def __step(self, exc=None): if self.done(): raise exceptions.InvalidStateError( @@ -258,11 +286,17 @@ class Task(futures._PyFuture): # Inherit Python Task implementation if not isinstance(exc, exceptions.CancelledError): exc = self._make_cancelled_error() self._must_cancel = False - coro = self._coro self._fut_waiter = None _enter_task(self._loop, self) - # Call either coro.throw(exc) or coro.send(None). + try: + self.__step_run_and_handle_result(exc) + finally: + _leave_task(self._loop, self) + self = None # Needed to break cycles when an exception occurs. + + def __step_run_and_handle_result(self, exc): + coro = self._coro try: if exc is None: # We use the `send` method directly, because coroutines @@ -334,7 +368,6 @@ class Task(futures._PyFuture): # Inherit Python Task implementation self._loop.call_soon( self.__step, new_exc, context=self._context) finally: - _leave_task(self._loop, self) self = None # Needed to break cycles when an exception occurs. def __wakeup(self, future): @@ -897,8 +930,27 @@ def run_coroutine_threadsafe(coro, loop): return future -# WeakSet containing all alive tasks. -_all_tasks = weakref.WeakSet() +def create_eager_task_factory(custom_task_constructor): + + if "eager_start" not in inspect.signature(custom_task_constructor).parameters: + raise TypeError( + "Provided constructor does not support eager task execution") + + def factory(loop, coro, *, name=None, context=None): + return custom_task_constructor( + coro, loop=loop, name=name, context=context, eager_start=True) + + + return factory + +eager_task_factory = create_eager_task_factory(Task) + + +# Collectively these two sets hold references to the complete set of active +# tasks. Eagerly executed tasks use a faster regular set as an optimization +# but may graduate to a WeakSet if the task blocks on IO. +_scheduled_tasks = weakref.WeakSet() +_eager_tasks = set() # Dictionary containing tasks that are currently active in # all running event loops. {EventLoop: Task} @@ -906,8 +958,13 @@ _current_tasks = {} def _register_task(task): - """Register a new task in asyncio as executed by loop.""" - _all_tasks.add(task) + """Register an asyncio Task scheduled to run on an event loop.""" + _scheduled_tasks.add(task) + + +def _register_eager_task(task): + """Register an asyncio Task about to be eagerly executed.""" + _eager_tasks.add(task) def _enter_task(loop, task): @@ -926,28 +983,49 @@ def _leave_task(loop, task): del _current_tasks[loop] +def _swap_current_task(loop, task): + prev_task = _current_tasks.get(loop) + if task is None: + del _current_tasks[loop] + else: + _current_tasks[loop] = task + return prev_task + + def _unregister_task(task): - """Unregister a task.""" - _all_tasks.discard(task) + """Unregister a completed, scheduled Task.""" + _scheduled_tasks.discard(task) + + +def _unregister_eager_task(task): + """Unregister a task which finished its first eager step.""" + _eager_tasks.discard(task) _py_current_task = current_task _py_register_task = _register_task +_py_register_eager_task = _register_eager_task _py_unregister_task = _unregister_task +_py_unregister_eager_task = _unregister_eager_task _py_enter_task = _enter_task _py_leave_task = _leave_task +_py_swap_current_task = _swap_current_task try: - from _asyncio import (_register_task, _unregister_task, - _enter_task, _leave_task, - _all_tasks, _current_tasks, + from _asyncio import (_register_task, _register_eager_task, + _unregister_task, _unregister_eager_task, + _enter_task, _leave_task, _swap_current_task, + _scheduled_tasks, _eager_tasks, _current_tasks, current_task) except ImportError: pass else: _c_current_task = current_task _c_register_task = _register_task + _c_register_eager_task = _register_eager_task _c_unregister_task = _unregister_task + _c_unregister_eager_task = _unregister_eager_task _c_enter_task = _enter_task _c_leave_task = _leave_task + _c_swap_current_task = _swap_current_task |