summaryrefslogtreecommitdiffstats
path: root/Lib/asyncio/tasks.py
diff options
context:
space:
mode:
authorAndrew Svetlov <andrew.svetlov@gmail.com>2017-12-16 19:58:38 (GMT)
committerGitHub <noreply@github.com>2017-12-16 19:58:38 (GMT)
commit44d1a5912ea629aa20fdc377a5ab69d9ccf75d61 (patch)
tree8634c0010adf1de08980dd9d47043a40eb904120 /Lib/asyncio/tasks.py
parent950840261c349e100ec5d7381fcd742c017e242d (diff)
downloadcpython-44d1a5912ea629aa20fdc377a5ab69d9ccf75d61.zip
cpython-44d1a5912ea629aa20fdc377a5ab69d9ccf75d61.tar.gz
cpython-44d1a5912ea629aa20fdc377a5ab69d9ccf75d61.tar.bz2
bpo-32250: Implement asyncio.current_task() and asyncio.all_tasks() (#4799)
Diffstat (limited to 'Lib/asyncio/tasks.py')
-rw-r--r--Lib/asyncio/tasks.py101
1 files changed, 87 insertions, 14 deletions
diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py
index 172057e..cdb483a 100644
--- a/Lib/asyncio/tasks.py
+++ b/Lib/asyncio/tasks.py
@@ -5,6 +5,8 @@ __all__ = (
'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
'wait', 'wait_for', 'as_completed', 'sleep',
'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
+ 'current_task', 'all_tasks',
+ '_register_task', '_unregister_task', '_enter_task', '_leave_task',
)
import concurrent.futures
@@ -21,6 +23,20 @@ from . import futures
from .coroutines import coroutine
+def current_task(loop=None):
+ """Return a currently executed task."""
+ if loop is None:
+ loop = events.get_running_loop()
+ return _current_tasks.get(loop)
+
+
+def all_tasks(loop=None):
+ """Return a set of all tasks for the loop."""
+ if loop is None:
+ loop = events.get_event_loop()
+ return {t for t, l in _all_tasks.items() if l is loop}
+
+
class Task(futures.Future):
"""A coroutine wrapped in a Future."""
@@ -33,13 +49,6 @@ class Task(futures.Future):
# _wakeup(). When _fut_waiter is not None, one of its callbacks
# must be _wakeup().
- # Weak set containing all tasks alive.
- _all_tasks = weakref.WeakSet()
-
- # Dictionary containing tasks that are currently active in
- # all running event loops. {EventLoop: Task}
- _current_tasks = {}
-
# If False, don't log a message if the task is destroyed whereas its
# status is still pending
_log_destroy_pending = True
@@ -52,9 +61,13 @@ class Task(futures.Future):
None is returned when called not in the context of a Task.
"""
+ warnings.warn("Task.current_task() is deprecated, "
+ "use asyncio.current_task() instead",
+ PendingDeprecationWarning,
+ stacklevel=2)
if loop is None:
loop = events.get_event_loop()
- return cls._current_tasks.get(loop)
+ return current_task(loop)
@classmethod
def all_tasks(cls, loop=None):
@@ -62,9 +75,11 @@ class Task(futures.Future):
By default all tasks for the current event loop are returned.
"""
- if loop is None:
- loop = events.get_event_loop()
- return {t for t in cls._all_tasks if t._loop is loop}
+ warnings.warn("Task.all_tasks() is deprecated, "
+ "use asyncio.all_tasks() instead",
+ PendingDeprecationWarning,
+ stacklevel=2)
+ return all_tasks(loop)
def __init__(self, coro, *, loop=None):
super().__init__(loop=loop)
@@ -81,7 +96,7 @@ class Task(futures.Future):
self._coro = coro
self._loop.call_soon(self._step)
- self.__class__._all_tasks.add(self)
+ _register_task(self._loop, self)
def __del__(self):
if self._state == futures._PENDING and self._log_destroy_pending:
@@ -173,7 +188,7 @@ class Task(futures.Future):
coro = self._coro
self._fut_waiter = None
- self.__class__._current_tasks[self._loop] = self
+ _enter_task(self._loop, self)
# Call either coro.throw(exc) or coro.send(None).
try:
if exc is None:
@@ -237,7 +252,7 @@ class Task(futures.Future):
new_exc = RuntimeError(f'Task got bad yield: {result!r}')
self._loop.call_soon(self._step, new_exc)
finally:
- self.__class__._current_tasks.pop(self._loop)
+ _leave_task(self._loop, self)
self = None # Needed to break cycles when an exception occurs.
def _wakeup(self, future):
@@ -715,3 +730,61 @@ def run_coroutine_threadsafe(coro, loop):
loop.call_soon_threadsafe(callback)
return future
+
+
+# WeakKeyDictionary of {Task: EventLoop} containing all tasks alive.
+# Task should be a weak reference to remove entry on task garbage
+# collection, EventLoop is required
+# to not access to private task._loop attribute.
+_all_tasks = weakref.WeakKeyDictionary()
+
+# Dictionary containing tasks that are currently active in
+# all running event loops. {EventLoop: Task}
+_current_tasks = {}
+
+
+def _register_task(loop, task):
+ """Register a new task in asyncio as executed by loop.
+
+ Returns None.
+ """
+ _all_tasks[task] = loop
+
+
+def _enter_task(loop, task):
+ current_task = _current_tasks.get(loop)
+ if current_task is not None:
+ raise RuntimeError(f"Cannot enter into task {task!r} while another "
+ f"task {current_task!r} is being executed.")
+ _current_tasks[loop] = task
+
+
+def _leave_task(loop, task):
+ current_task = _current_tasks.get(loop)
+ if current_task is not task:
+ raise RuntimeError(f"Leaving task {task!r} does not match "
+ f"the current task {current_task!r}.")
+ del _current_tasks[loop]
+
+
+def _unregister_task(loop, task):
+ _all_tasks.pop(task, None)
+
+
+_py_register_task = _register_task
+_py_unregister_task = _unregister_task
+_py_enter_task = _enter_task
+_py_leave_task = _leave_task
+
+
+try:
+ from _asyncio import (_register_task, _unregister_task,
+ _enter_task, _leave_task,
+ _all_tasks, _current_tasks)
+except ImportError:
+ pass
+else:
+ _c_register_task = _register_task
+ _c_unregister_task = _unregister_task
+ _c_enter_task = _enter_task
+ _c_leave_task = _leave_task