diff options
author | Andrew Svetlov <andrew.svetlov@gmail.com> | 2017-12-16 19:58:38 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-12-16 19:58:38 (GMT) |
commit | 44d1a5912ea629aa20fdc377a5ab69d9ccf75d61 (patch) | |
tree | 8634c0010adf1de08980dd9d47043a40eb904120 /Lib/asyncio/tasks.py | |
parent | 950840261c349e100ec5d7381fcd742c017e242d (diff) | |
download | cpython-44d1a5912ea629aa20fdc377a5ab69d9ccf75d61.zip cpython-44d1a5912ea629aa20fdc377a5ab69d9ccf75d61.tar.gz cpython-44d1a5912ea629aa20fdc377a5ab69d9ccf75d61.tar.bz2 |
bpo-32250: Implement asyncio.current_task() and asyncio.all_tasks() (#4799)
Diffstat (limited to 'Lib/asyncio/tasks.py')
-rw-r--r-- | Lib/asyncio/tasks.py | 101 |
1 files changed, 87 insertions, 14 deletions
diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py index 172057e..cdb483a 100644 --- a/Lib/asyncio/tasks.py +++ b/Lib/asyncio/tasks.py @@ -5,6 +5,8 @@ __all__ = ( 'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED', 'wait', 'wait_for', 'as_completed', 'sleep', 'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe', + 'current_task', 'all_tasks', + '_register_task', '_unregister_task', '_enter_task', '_leave_task', ) import concurrent.futures @@ -21,6 +23,20 @@ from . import futures from .coroutines import coroutine +def current_task(loop=None): + """Return a currently executed task.""" + if loop is None: + loop = events.get_running_loop() + return _current_tasks.get(loop) + + +def all_tasks(loop=None): + """Return a set of all tasks for the loop.""" + if loop is None: + loop = events.get_event_loop() + return {t for t, l in _all_tasks.items() if l is loop} + + class Task(futures.Future): """A coroutine wrapped in a Future.""" @@ -33,13 +49,6 @@ class Task(futures.Future): # _wakeup(). When _fut_waiter is not None, one of its callbacks # must be _wakeup(). - # Weak set containing all tasks alive. - _all_tasks = weakref.WeakSet() - - # Dictionary containing tasks that are currently active in - # all running event loops. {EventLoop: Task} - _current_tasks = {} - # If False, don't log a message if the task is destroyed whereas its # status is still pending _log_destroy_pending = True @@ -52,9 +61,13 @@ class Task(futures.Future): None is returned when called not in the context of a Task. """ + warnings.warn("Task.current_task() is deprecated, " + "use asyncio.current_task() instead", + PendingDeprecationWarning, + stacklevel=2) if loop is None: loop = events.get_event_loop() - return cls._current_tasks.get(loop) + return current_task(loop) @classmethod def all_tasks(cls, loop=None): @@ -62,9 +75,11 @@ class Task(futures.Future): By default all tasks for the current event loop are returned. """ - if loop is None: - loop = events.get_event_loop() - return {t for t in cls._all_tasks if t._loop is loop} + warnings.warn("Task.all_tasks() is deprecated, " + "use asyncio.all_tasks() instead", + PendingDeprecationWarning, + stacklevel=2) + return all_tasks(loop) def __init__(self, coro, *, loop=None): super().__init__(loop=loop) @@ -81,7 +96,7 @@ class Task(futures.Future): self._coro = coro self._loop.call_soon(self._step) - self.__class__._all_tasks.add(self) + _register_task(self._loop, self) def __del__(self): if self._state == futures._PENDING and self._log_destroy_pending: @@ -173,7 +188,7 @@ class Task(futures.Future): coro = self._coro self._fut_waiter = None - self.__class__._current_tasks[self._loop] = self + _enter_task(self._loop, self) # Call either coro.throw(exc) or coro.send(None). try: if exc is None: @@ -237,7 +252,7 @@ class Task(futures.Future): new_exc = RuntimeError(f'Task got bad yield: {result!r}') self._loop.call_soon(self._step, new_exc) finally: - self.__class__._current_tasks.pop(self._loop) + _leave_task(self._loop, self) self = None # Needed to break cycles when an exception occurs. def _wakeup(self, future): @@ -715,3 +730,61 @@ def run_coroutine_threadsafe(coro, loop): loop.call_soon_threadsafe(callback) return future + + +# WeakKeyDictionary of {Task: EventLoop} containing all tasks alive. +# Task should be a weak reference to remove entry on task garbage +# collection, EventLoop is required +# to not access to private task._loop attribute. +_all_tasks = weakref.WeakKeyDictionary() + +# Dictionary containing tasks that are currently active in +# all running event loops. {EventLoop: Task} +_current_tasks = {} + + +def _register_task(loop, task): + """Register a new task in asyncio as executed by loop. + + Returns None. + """ + _all_tasks[task] = loop + + +def _enter_task(loop, task): + current_task = _current_tasks.get(loop) + if current_task is not None: + raise RuntimeError(f"Cannot enter into task {task!r} while another " + f"task {current_task!r} is being executed.") + _current_tasks[loop] = task + + +def _leave_task(loop, task): + current_task = _current_tasks.get(loop) + if current_task is not task: + raise RuntimeError(f"Leaving task {task!r} does not match " + f"the current task {current_task!r}.") + del _current_tasks[loop] + + +def _unregister_task(loop, task): + _all_tasks.pop(task, None) + + +_py_register_task = _register_task +_py_unregister_task = _unregister_task +_py_enter_task = _enter_task +_py_leave_task = _leave_task + + +try: + from _asyncio import (_register_task, _unregister_task, + _enter_task, _leave_task, + _all_tasks, _current_tasks) +except ImportError: + pass +else: + _c_register_task = _register_task + _c_unregister_task = _unregister_task + _c_enter_task = _enter_task + _c_leave_task = _leave_task |