From 44d1a5912ea629aa20fdc377a5ab69d9ccf75d61 Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Sat, 16 Dec 2017 21:58:38 +0200 Subject: bpo-32250: Implement asyncio.current_task() and asyncio.all_tasks() (#4799) --- Doc/library/asyncio-task.rst | 22 ++ Lib/asyncio/tasks.py | 101 +++++- Lib/test/test_asyncio/test_tasks.py | 206 ++++++++++-- .../2017-12-12-16-58-20.bpo-32250.UljTa0.rst | 5 + Modules/_asynciomodule.c | 370 +++++++++++++++------ Modules/clinic/_asynciomodule.c.h | 140 +++++++- 6 files changed, 696 insertions(+), 148 deletions(-) create mode 100644 Misc/NEWS.d/next/Library/2017-12-12-16-58-20.bpo-32250.UljTa0.rst diff --git a/Doc/library/asyncio-task.rst b/Doc/library/asyncio-task.rst index 72fae5e..d85dddf 100644 --- a/Doc/library/asyncio-task.rst +++ b/Doc/library/asyncio-task.rst @@ -528,6 +528,28 @@ Task functions the event loop object used by the underlying task or coroutine. If it's not provided, the default event loop is used. + +.. function:: current_task(loop=None): + + Return the current running :class:`Task` instance or ``None``, if + no task is running. + + If *loop* is ``None`` :func:`get_running_loop` is used to get + the current loop. + + .. versionadded:: 3.7 + + +.. function:: all_tasks(loop=None): + + Return a set of :class:`Task` objects created for the loop. + + If *loop* is ``None`` :func:`get_event_loop` is used for getting + current loop. + + .. versionadded:: 3.7 + + .. function:: as_completed(fs, \*, loop=None, timeout=None) Return an iterator whose values, when waited for, are :class:`Future` diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py index 172057e..cdb483a 100644 --- a/Lib/asyncio/tasks.py +++ b/Lib/asyncio/tasks.py @@ -5,6 +5,8 @@ __all__ = ( 'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED', 'wait', 'wait_for', 'as_completed', 'sleep', 'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe', + 'current_task', 'all_tasks', + '_register_task', '_unregister_task', '_enter_task', '_leave_task', ) import concurrent.futures @@ -21,6 +23,20 @@ from . import futures from .coroutines import coroutine +def current_task(loop=None): + """Return a currently executed task.""" + if loop is None: + loop = events.get_running_loop() + return _current_tasks.get(loop) + + +def all_tasks(loop=None): + """Return a set of all tasks for the loop.""" + if loop is None: + loop = events.get_event_loop() + return {t for t, l in _all_tasks.items() if l is loop} + + class Task(futures.Future): """A coroutine wrapped in a Future.""" @@ -33,13 +49,6 @@ class Task(futures.Future): # _wakeup(). When _fut_waiter is not None, one of its callbacks # must be _wakeup(). - # Weak set containing all tasks alive. - _all_tasks = weakref.WeakSet() - - # Dictionary containing tasks that are currently active in - # all running event loops. {EventLoop: Task} - _current_tasks = {} - # If False, don't log a message if the task is destroyed whereas its # status is still pending _log_destroy_pending = True @@ -52,9 +61,13 @@ class Task(futures.Future): None is returned when called not in the context of a Task. """ + warnings.warn("Task.current_task() is deprecated, " + "use asyncio.current_task() instead", + PendingDeprecationWarning, + stacklevel=2) if loop is None: loop = events.get_event_loop() - return cls._current_tasks.get(loop) + return current_task(loop) @classmethod def all_tasks(cls, loop=None): @@ -62,9 +75,11 @@ class Task(futures.Future): By default all tasks for the current event loop are returned. """ - if loop is None: - loop = events.get_event_loop() - return {t for t in cls._all_tasks if t._loop is loop} + warnings.warn("Task.all_tasks() is deprecated, " + "use asyncio.all_tasks() instead", + PendingDeprecationWarning, + stacklevel=2) + return all_tasks(loop) def __init__(self, coro, *, loop=None): super().__init__(loop=loop) @@ -81,7 +96,7 @@ class Task(futures.Future): self._coro = coro self._loop.call_soon(self._step) - self.__class__._all_tasks.add(self) + _register_task(self._loop, self) def __del__(self): if self._state == futures._PENDING and self._log_destroy_pending: @@ -173,7 +188,7 @@ class Task(futures.Future): coro = self._coro self._fut_waiter = None - self.__class__._current_tasks[self._loop] = self + _enter_task(self._loop, self) # Call either coro.throw(exc) or coro.send(None). try: if exc is None: @@ -237,7 +252,7 @@ class Task(futures.Future): new_exc = RuntimeError(f'Task got bad yield: {result!r}') self._loop.call_soon(self._step, new_exc) finally: - self.__class__._current_tasks.pop(self._loop) + _leave_task(self._loop, self) self = None # Needed to break cycles when an exception occurs. def _wakeup(self, future): @@ -715,3 +730,61 @@ def run_coroutine_threadsafe(coro, loop): loop.call_soon_threadsafe(callback) return future + + +# WeakKeyDictionary of {Task: EventLoop} containing all tasks alive. +# Task should be a weak reference to remove entry on task garbage +# collection, EventLoop is required +# to not access to private task._loop attribute. +_all_tasks = weakref.WeakKeyDictionary() + +# Dictionary containing tasks that are currently active in +# all running event loops. {EventLoop: Task} +_current_tasks = {} + + +def _register_task(loop, task): + """Register a new task in asyncio as executed by loop. + + Returns None. + """ + _all_tasks[task] = loop + + +def _enter_task(loop, task): + current_task = _current_tasks.get(loop) + if current_task is not None: + raise RuntimeError(f"Cannot enter into task {task!r} while another " + f"task {current_task!r} is being executed.") + _current_tasks[loop] = task + + +def _leave_task(loop, task): + current_task = _current_tasks.get(loop) + if current_task is not task: + raise RuntimeError(f"Leaving task {task!r} does not match " + f"the current task {current_task!r}.") + del _current_tasks[loop] + + +def _unregister_task(loop, task): + _all_tasks.pop(task, None) + + +_py_register_task = _register_task +_py_unregister_task = _unregister_task +_py_enter_task = _enter_task +_py_leave_task = _leave_task + + +try: + from _asyncio import (_register_task, _unregister_task, + _enter_task, _leave_task, + _all_tasks, _current_tasks) +except ImportError: + pass +else: + _c_register_task = _register_task + _c_unregister_task = _unregister_task + _c_enter_task = _enter_task + _c_leave_task = _leave_task diff --git a/Lib/test/test_asyncio/test_tasks.py b/Lib/test/test_asyncio/test_tasks.py index a32dca1..5429facb 100644 --- a/Lib/test/test_asyncio/test_tasks.py +++ b/Lib/test/test_asyncio/test_tasks.py @@ -1493,53 +1493,69 @@ class BaseTaskTests: self.assertEqual(res, 'test') self.assertIsNone(t2.result()) - def test_current_task(self): + + def test_current_task_deprecated(self): Task = self.__class__.Task - self.assertIsNone(Task.current_task(loop=self.loop)) + with self.assertWarns(PendingDeprecationWarning): + self.assertIsNone(Task.current_task(loop=self.loop)) - @asyncio.coroutine - def coro(loop): - self.assertTrue(Task.current_task(loop=loop) is task) + async def coro(loop): + with self.assertWarns(PendingDeprecationWarning): + self.assertIs(Task.current_task(loop=loop), task) # See http://bugs.python.org/issue29271 for details: asyncio.set_event_loop(loop) try: - self.assertIs(Task.current_task(None), task) - self.assertIs(Task.current_task(), task) + with self.assertWarns(PendingDeprecationWarning): + self.assertIs(Task.current_task(None), task) + with self.assertWarns(PendingDeprecationWarning): + self.assertIs(Task.current_task(), task) finally: asyncio.set_event_loop(None) task = self.new_task(self.loop, coro(self.loop)) self.loop.run_until_complete(task) - self.assertIsNone(Task.current_task(loop=self.loop)) + with self.assertWarns(PendingDeprecationWarning): + self.assertIsNone(Task.current_task(loop=self.loop)) - def test_current_task_with_interleaving_tasks(self): - Task = self.__class__.Task + def test_current_task(self): + self.assertIsNone(asyncio.current_task(loop=self.loop)) - self.assertIsNone(Task.current_task(loop=self.loop)) + async def coro(loop): + self.assertIs(asyncio.current_task(loop=loop), task) + + self.assertIs(asyncio.current_task(None), task) + self.assertIs(asyncio.current_task(), task) + + task = self.new_task(self.loop, coro(self.loop)) + self.loop.run_until_complete(task) + self.assertIsNone(asyncio.current_task(loop=self.loop)) + + def test_current_task_with_interleaving_tasks(self): + self.assertIsNone(asyncio.current_task(loop=self.loop)) fut1 = self.new_future(self.loop) fut2 = self.new_future(self.loop) async def coro1(loop): - self.assertTrue(Task.current_task(loop=loop) is task1) + self.assertTrue(asyncio.current_task(loop=loop) is task1) await fut1 - self.assertTrue(Task.current_task(loop=loop) is task1) + self.assertTrue(asyncio.current_task(loop=loop) is task1) fut2.set_result(True) async def coro2(loop): - self.assertTrue(Task.current_task(loop=loop) is task2) + self.assertTrue(asyncio.current_task(loop=loop) is task2) fut1.set_result(True) await fut2 - self.assertTrue(Task.current_task(loop=loop) is task2) + self.assertTrue(asyncio.current_task(loop=loop) is task2) task1 = self.new_task(self.loop, coro1(self.loop)) task2 = self.new_task(self.loop, coro2(self.loop)) self.loop.run_until_complete(asyncio.wait((task1, task2), loop=self.loop)) - self.assertIsNone(Task.current_task(loop=self.loop)) + self.assertIsNone(asyncio.current_task(loop=self.loop)) # Some thorough tests for cancellation propagation through # coroutines, tasks and wait(). @@ -1826,6 +1842,16 @@ class BaseTaskTests: self.assertIsInstance(exception, Exception) self.assertEqual(exception.args, ("foo", )) + def test_all_tasks_deprecated(self): + Task = self.__class__.Task + + async def coro(): + with self.assertWarns(PendingDeprecationWarning): + assert Task.all_tasks(self.loop) == {t} + + t = self.new_task(self.loop, coro()) + self.loop.run_until_complete(t) + def test_log_destroyed_pending_task(self): Task = self.__class__.Task @@ -1845,13 +1871,13 @@ class BaseTaskTests: coro = kill_me(self.loop) task = asyncio.ensure_future(coro, loop=self.loop) - self.assertEqual(Task.all_tasks(loop=self.loop), {task}) + self.assertEqual(asyncio.all_tasks(loop=self.loop), {task}) # See http://bugs.python.org/issue29271 for details: asyncio.set_event_loop(self.loop) try: - self.assertEqual(Task.all_tasks(), {task}) - self.assertEqual(Task.all_tasks(None), {task}) + self.assertEqual(asyncio.all_tasks(), {task}) + self.assertEqual(asyncio.all_tasks(None), {task}) finally: asyncio.set_event_loop(None) @@ -1868,7 +1894,7 @@ class BaseTaskTests: # no more reference to kill_me() task: the task is destroyed by the GC support.gc_collect() - self.assertEqual(Task.all_tasks(loop=self.loop), set()) + self.assertEqual(asyncio.all_tasks(loop=self.loop), set()) mock_handler.assert_called_with(self.loop, { 'message': 'Task was destroyed but it is pending!', @@ -2052,7 +2078,7 @@ class BaseTaskTests: message = m_log.error.call_args[0][0] self.assertIn('Task was destroyed but it is pending', message) - self.assertEqual(self.Task.all_tasks(self.loop), set()) + self.assertEqual(asyncio.all_tasks(self.loop), set()) def test_create_task_with_noncoroutine(self): with self.assertRaisesRegex(TypeError, @@ -2201,6 +2227,140 @@ class PyTask_PyFuture_SubclassTests(BaseTaskTests, test_utils.TestCase): Future = futures._PyFuture +class BaseTaskIntrospectionTests: + _register_task = None + _unregister_task = None + _enter_task = None + _leave_task = None + + def test__register_task(self): + task = mock.Mock() + loop = mock.Mock() + self.assertEqual(asyncio.all_tasks(loop), set()) + self._register_task(loop, task) + self.assertEqual(asyncio.all_tasks(loop), {task}) + self._unregister_task(loop, task) + + def test__enter_task(self): + task = mock.Mock() + loop = mock.Mock() + self.assertIsNone(asyncio.current_task(loop)) + self._enter_task(loop, task) + self.assertIs(asyncio.current_task(loop), task) + self._leave_task(loop, task) + + def test__enter_task_failure(self): + task1 = mock.Mock() + task2 = mock.Mock() + loop = mock.Mock() + self._enter_task(loop, task1) + with self.assertRaises(RuntimeError): + self._enter_task(loop, task2) + self.assertIs(asyncio.current_task(loop), task1) + self._leave_task(loop, task1) + + def test__leave_task(self): + task = mock.Mock() + loop = mock.Mock() + self._enter_task(loop, task) + self._leave_task(loop, task) + self.assertIsNone(asyncio.current_task(loop)) + + def test__leave_task_failure1(self): + task1 = mock.Mock() + task2 = mock.Mock() + loop = mock.Mock() + self._enter_task(loop, task1) + with self.assertRaises(RuntimeError): + self._leave_task(loop, task2) + self.assertIs(asyncio.current_task(loop), task1) + self._leave_task(loop, task1) + + def test__leave_task_failure2(self): + task = mock.Mock() + loop = mock.Mock() + with self.assertRaises(RuntimeError): + self._leave_task(loop, task) + self.assertIsNone(asyncio.current_task(loop)) + + def test__unregister_task(self): + task = mock.Mock() + loop = mock.Mock() + self._register_task(loop, task) + self._unregister_task(loop, task) + self.assertEqual(asyncio.all_tasks(loop), set()) + + def test__unregister_task_not_registered(self): + task = mock.Mock() + loop = mock.Mock() + self._unregister_task(loop, task) + self.assertEqual(asyncio.all_tasks(loop), set()) + + +class PyIntrospectionTests(unittest.TestCase, BaseTaskIntrospectionTests): + _register_task = staticmethod(tasks._py_register_task) + _unregister_task = staticmethod(tasks._py_unregister_task) + _enter_task = staticmethod(tasks._py_enter_task) + _leave_task = staticmethod(tasks._py_leave_task) + + +@unittest.skipUnless(hasattr(tasks, '_c_register_task'), + 'requires the C _asyncio module') +class CIntrospectionTests(unittest.TestCase, BaseTaskIntrospectionTests): + _register_task = staticmethod(tasks._c_register_task) + _unregister_task = staticmethod(tasks._c_unregister_task) + _enter_task = staticmethod(tasks._c_enter_task) + _leave_task = staticmethod(tasks._c_leave_task) + + +class BaseCurrentLoopTests: + + def setUp(self): + super().setUp() + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) + + def tearDown(self): + self.loop.close() + asyncio.set_event_loop(None) + super().tearDown() + + def new_task(self, coro): + raise NotImplementedError + + def test_current_task_no_running_loop(self): + self.assertIsNone(asyncio.current_task(loop=self.loop)) + + def test_current_task_no_running_loop_implicit(self): + with self.assertRaises(RuntimeError): + asyncio.current_task() + + def test_current_task_with_implicit_loop(self): + async def coro(): + self.assertIs(asyncio.current_task(loop=self.loop), task) + + self.assertIs(asyncio.current_task(None), task) + self.assertIs(asyncio.current_task(), task) + + task = self.new_task(coro()) + self.loop.run_until_complete(task) + self.assertIsNone(asyncio.current_task(loop=self.loop)) + + +class PyCurrentLoopTests(BaseCurrentLoopTests, unittest.TestCase): + + def new_task(self, coro): + return tasks._PyTask(coro, loop=self.loop) + + +@unittest.skipUnless(hasattr(tasks, '_CTask'), + 'requires the C _asyncio module') +class CCurrentLoopTests(BaseCurrentLoopTests, unittest.TestCase): + + def new_task(self, coro): + return getattr(tasks, '_CTask')(coro, loop=self.loop) + + class GenericTaskTests(test_utils.TestCase): def test_future_subclass(self): @@ -2522,7 +2682,7 @@ class RunCoroutineThreadsafeTests(test_utils.TestCase): if fail: raise RuntimeError("Fail!") if cancel: - asyncio.tasks.Task.current_task(self.loop).cancel() + asyncio.current_task(self.loop).cancel() yield return a + b @@ -2568,7 +2728,7 @@ class RunCoroutineThreadsafeTests(test_utils.TestCase): self.loop.run_until_complete(future) test_utils.run_briefly(self.loop) # Check that there's no pending task (add has been cancelled) - for task in asyncio.Task.all_tasks(self.loop): + for task in asyncio.all_tasks(self.loop): self.assertTrue(task.done()) def test_run_coroutine_threadsafe_task_cancelled(self): diff --git a/Misc/NEWS.d/next/Library/2017-12-12-16-58-20.bpo-32250.UljTa0.rst b/Misc/NEWS.d/next/Library/2017-12-12-16-58-20.bpo-32250.UljTa0.rst new file mode 100644 index 0000000..f2d016d --- /dev/null +++ b/Misc/NEWS.d/next/Library/2017-12-12-16-58-20.bpo-32250.UljTa0.rst @@ -0,0 +1,5 @@ +Implement ``asyncio.current_task()`` and ``asyncio.all_tasks()``. Add +helpers intended to be used by alternative task implementations: +``asyncio._register_task``, ``asyncio._enter_task``, ``asyncio._leave_task`` +and ``asyncio._unregister_task``. Deprecate ``asyncio.Task.current_task()`` +and ``asyncio.Task.all_tasks()``. diff --git a/Modules/_asynciomodule.c b/Modules/_asynciomodule.c index 9ac1c44..378bd08 100644 --- a/Modules/_asynciomodule.c +++ b/Modules/_asynciomodule.c @@ -11,9 +11,12 @@ module _asyncio /* identifiers used from some functions */ _Py_IDENTIFIER(__asyncio_running_event_loop__); _Py_IDENTIFIER(add_done_callback); +_Py_IDENTIFIER(all_tasks); _Py_IDENTIFIER(call_soon); _Py_IDENTIFIER(cancel); +_Py_IDENTIFIER(current_task); _Py_IDENTIFIER(get_event_loop); +_Py_IDENTIFIER(pop); _Py_IDENTIFIER(send); _Py_IDENTIFIER(throw); _Py_IDENTIFIER(_step); @@ -22,19 +25,29 @@ _Py_IDENTIFIER(_wakeup); /* State of the _asyncio module */ -static PyObject *all_tasks; -static PyObject *current_tasks; +static PyObject *asyncio_mod; +static PyObject *inspect_isgenerator; +static PyObject *os_getpid; static PyObject *traceback_extract_stack; static PyObject *asyncio_get_event_loop_policy; -static PyObject *asyncio_iscoroutine_func; static PyObject *asyncio_future_repr_info_func; -static PyObject *asyncio_task_repr_info_func; +static PyObject *asyncio_iscoroutine_func; static PyObject *asyncio_task_get_stack_func; static PyObject *asyncio_task_print_stack_func; +static PyObject *asyncio_task_repr_info_func; static PyObject *asyncio_InvalidStateError; static PyObject *asyncio_CancelledError; -static PyObject *inspect_isgenerator; -static PyObject *os_getpid; + + +/* WeakKeyDictionary of {Task: EventLoop} containing all tasks alive. + Task should be a weak reference to remove entry on task garbage + collection, EventLoop is required + to not access to private task._loop attribute. */ +static PyObject *current_tasks; + +/* Dictionary containing tasks that are currently active in + all running event loops. {EventLoop: Task} */ +static PyObject *all_tasks; typedef enum { @@ -1445,6 +1458,80 @@ TaskWakeupMethWrapper_new(TaskObj *task) return (PyObject*) o; } +/* ----- Task introspection helpers */ + +static int +register_task(PyObject *loop, PyObject *task) +{ + return PyObject_SetItem(all_tasks, task, loop); +} + + +static int +unregister_task(PyObject *loop, PyObject *task) +{ + PyObject *res; + + res = _PyObject_CallMethodIdObjArgs(all_tasks, &PyId_pop, + task, Py_None, NULL); + if (res == NULL) { + return -1; + } + Py_DECREF(res); + return 0; +} + + +static int +enter_task(PyObject *loop, PyObject *task) +{ + PyObject *item; + Py_hash_t hash; + hash = PyObject_Hash(loop); + if (hash == -1) { + return -1; + } + item = _PyDict_GetItem_KnownHash(current_tasks, loop, hash); + if (item != NULL) { + PyErr_Format( + PyExc_RuntimeError, + "Cannot enter into task %R while another " \ + "task %R is being executed.", + task, item, NULL); + return -1; + } + if (_PyDict_SetItem_KnownHash(current_tasks, loop, task, hash) < 0) { + return -1; + } + return 0; +} + + +static int +leave_task(PyObject *loop, PyObject *task) +/*[clinic end generated code: output=0ebf6db4b858fb41 input=51296a46313d1ad8]*/ +{ + PyObject *item; + Py_hash_t hash; + hash = PyObject_Hash(loop); + if (hash == -1) { + return -1; + } + item = _PyDict_GetItem_KnownHash(current_tasks, loop, hash); + if (item != task) { + if (item == NULL) { + /* Not entered, replace with None */ + item = Py_None; + } + PyErr_Format( + PyExc_RuntimeError, + "Leaving task %R does not match the current task %R.", + task, item, NULL); + return -1; + } + return _PyDict_DelItem_KnownHash(current_tasks, loop, hash); +} + /* ----- Task */ /*[clinic input] @@ -1463,8 +1550,6 @@ _asyncio_Task___init___impl(TaskObj *self, PyObject *coro, PyObject *loop) { PyObject *res; int tmp; - _Py_IDENTIFIER(add); - if (future_init((FutureObj*)self, loop)) { return -1; } @@ -1500,14 +1585,7 @@ _asyncio_Task___init___impl(TaskObj *self, PyObject *coro, PyObject *loop) if (task_call_step_soon(self, NULL)) { return -1; } - - res = _PyObject_CallMethodIdObjArgs(all_tasks, &PyId_add, self, NULL); - if (res == NULL) { - return -1; - } - Py_DECREF(res); - - return 0; + return register_task(self->task_loop, (PyObject*)self); } static int @@ -1600,76 +1678,36 @@ static PyObject * _asyncio_Task_current_task_impl(PyTypeObject *type, PyObject *loop) /*[clinic end generated code: output=99fbe7332c516e03 input=cd14770c5b79c7eb]*/ { - PyObject *res; + PyObject *ret; + PyObject *current_task_func; + + if (PyErr_WarnEx(PyExc_PendingDeprecationWarning, + "Task.current_task() is deprecated, " \ + "use asyncio.current_task() instead", + 1) < 0) { + return NULL; + } + + current_task_func = _PyObject_GetAttrId(asyncio_mod, &PyId_current_task); + if (current_task_func == NULL) { + return NULL; + } if (loop == Py_None) { loop = get_event_loop(); if (loop == NULL) { return NULL; } - - res = PyDict_GetItem(current_tasks, loop); + ret = PyObject_CallFunctionObjArgs(current_task_func, loop, NULL); + Py_DECREF(current_task_func); Py_DECREF(loop); + return ret; } else { - res = PyDict_GetItem(current_tasks, loop); - } - - if (res == NULL) { - Py_RETURN_NONE; - } - else { - Py_INCREF(res); - return res; - } -} - -static PyObject * -task_all_tasks(PyObject *loop) -{ - PyObject *task; - PyObject *task_loop; - PyObject *set; - PyObject *iter; - - assert(loop != NULL); - - set = PySet_New(NULL); - if (set == NULL) { - return NULL; - } - - iter = PyObject_GetIter(all_tasks); - if (iter == NULL) { - goto fail; - } - - while ((task = PyIter_Next(iter))) { - task_loop = PyObject_GetAttrString(task, "_loop"); - if (task_loop == NULL) { - Py_DECREF(task); - goto fail; - } - if (task_loop == loop) { - if (PySet_Add(set, task) == -1) { - Py_DECREF(task_loop); - Py_DECREF(task); - goto fail; - } - } - Py_DECREF(task_loop); - Py_DECREF(task); - } - if (PyErr_Occurred()) { - goto fail; + ret = PyObject_CallFunctionObjArgs(current_task_func, loop, NULL); + Py_DECREF(current_task_func); + return ret; } - Py_DECREF(iter); - return set; - -fail: - Py_DECREF(set); - Py_XDECREF(iter); - return NULL; } /*[clinic input] @@ -1688,20 +1726,22 @@ _asyncio_Task_all_tasks_impl(PyTypeObject *type, PyObject *loop) /*[clinic end generated code: output=11f9b20749ccca5d input=497f80bc9ce726b5]*/ { PyObject *res; + PyObject *all_tasks_func; - if (loop == Py_None) { - loop = get_event_loop(); - if (loop == NULL) { - return NULL; - } - - res = task_all_tasks(loop); - Py_DECREF(loop); + all_tasks_func = _PyObject_GetAttrId(asyncio_mod, &PyId_all_tasks); + if (all_tasks_func == NULL) { + return NULL; } - else { - res = task_all_tasks(loop); + + if (PyErr_WarnEx(PyExc_PendingDeprecationWarning, + "Task.all_tasks() is deprecated, " \ + "use asyncio.all_tasks() instead", + 1) < 0) { + return NULL; } + res = PyObject_CallFunctionObjArgs(all_tasks_func, loop, NULL); + Py_DECREF(all_tasks_func); return res; } @@ -2437,11 +2477,8 @@ static PyObject * task_step(TaskObj *task, PyObject *exc) { PyObject *res; - PyObject *ot; - if (PyDict_SetItem(current_tasks, - task->task_loop, (PyObject*)task) == -1) - { + if (enter_task(task->task_loop, (PyObject*)task) < 0) { return NULL; } @@ -2450,19 +2487,16 @@ task_step(TaskObj *task, PyObject *exc) if (res == NULL) { PyObject *et, *ev, *tb; PyErr_Fetch(&et, &ev, &tb); - ot = _PyDict_Pop(current_tasks, task->task_loop, NULL); - Py_XDECREF(ot); + leave_task(task->task_loop, (PyObject*)task); _PyErr_ChainExceptions(et, ev, tb); return NULL; } else { - ot = _PyDict_Pop(current_tasks, task->task_loop, NULL); - if (ot == NULL) { + if(leave_task(task->task_loop, (PyObject*)task) < 0) { Py_DECREF(res); return NULL; } else { - Py_DECREF(ot); return res; } } @@ -2615,6 +2649,99 @@ _asyncio_get_running_loop_impl(PyObject *module) return loop; } +/*[clinic input] +_asyncio._register_task + + loop: object + task: object + +Register a new task in asyncio as executed by loop. + +Returns None. +[clinic start generated code]*/ + +static PyObject * +_asyncio__register_task_impl(PyObject *module, PyObject *loop, + PyObject *task) +/*[clinic end generated code: output=54c5cb733dbe0f38 input=9b5fee38fcb2c288]*/ +{ + if (register_task(loop, task) < 0) { + return NULL; + } + Py_RETURN_NONE; +} + + +/*[clinic input] +_asyncio._unregister_task + + loop: object + task: object + +Unregister a task. + +Returns None. +[clinic start generated code]*/ + +static PyObject * +_asyncio__unregister_task_impl(PyObject *module, PyObject *loop, + PyObject *task) +/*[clinic end generated code: output=f634743a76b84ebc input=51fa1820634ef331]*/ +{ + if (unregister_task(loop, task) < 0) { + return NULL; + } + Py_RETURN_NONE; +} + + +/*[clinic input] +_asyncio._enter_task + + loop: object + task: object + +Enter into task execution or resume suspended task. + +Task belongs to loop. + +Returns None. +[clinic start generated code]*/ + +static PyObject * +_asyncio__enter_task_impl(PyObject *module, PyObject *loop, PyObject *task) +/*[clinic end generated code: output=a22611c858035b73 input=de1b06dca70d8737]*/ +{ + if (enter_task(loop, task) < 0) { + return NULL; + } + Py_RETURN_NONE; +} + + +/*[clinic input] +_asyncio._leave_task + + loop: object + task: object + +Leave task execution or suspend a task. + +Task belongs to loop. + +Returns None. +[clinic start generated code]*/ + +static PyObject * +_asyncio__leave_task_impl(PyObject *module, PyObject *loop, PyObject *task) +/*[clinic end generated code: output=0ebf6db4b858fb41 input=51296a46313d1ad8]*/ +{ + if (leave_task(loop, task) < 0) { + return NULL; + } + Py_RETURN_NONE; +} + /*********************** Module **************************/ @@ -2622,26 +2749,37 @@ _asyncio_get_running_loop_impl(PyObject *module) static void module_free(void *m) { - Py_CLEAR(current_tasks); - Py_CLEAR(all_tasks); + Py_CLEAR(asyncio_mod); + Py_CLEAR(inspect_isgenerator); + Py_CLEAR(os_getpid); Py_CLEAR(traceback_extract_stack); - Py_CLEAR(asyncio_get_event_loop_policy); Py_CLEAR(asyncio_future_repr_info_func); + Py_CLEAR(asyncio_get_event_loop_policy); Py_CLEAR(asyncio_iscoroutine_func); - Py_CLEAR(asyncio_task_repr_info_func); Py_CLEAR(asyncio_task_get_stack_func); Py_CLEAR(asyncio_task_print_stack_func); + Py_CLEAR(asyncio_task_repr_info_func); Py_CLEAR(asyncio_InvalidStateError); Py_CLEAR(asyncio_CancelledError); - Py_CLEAR(inspect_isgenerator); - Py_CLEAR(os_getpid); + + Py_CLEAR(current_tasks); + Py_CLEAR(all_tasks); } static int module_init(void) { PyObject *module = NULL; - PyObject *cls; + + asyncio_mod = PyImport_ImportModule("asyncio"); + if (asyncio_mod == NULL) { + goto fail; + } + + current_tasks = PyDict_New(); + if (current_tasks == NULL) { + goto fail; + } #define WITH_MOD(NAME) \ Py_CLEAR(module); \ @@ -2681,19 +2819,15 @@ module_init(void) WITH_MOD("traceback") GET_MOD_ATTR(traceback_extract_stack, "extract_stack") + PyObject *weak_key_dict; WITH_MOD("weakref") - GET_MOD_ATTR(cls, "WeakSet") - all_tasks = _PyObject_CallNoArg(cls); - Py_DECREF(cls); + GET_MOD_ATTR(weak_key_dict, "WeakKeyDictionary"); + all_tasks = _PyObject_CallNoArg(weak_key_dict); + Py_CLEAR(weak_key_dict); if (all_tasks == NULL) { goto fail; } - current_tasks = PyDict_New(); - if (current_tasks == NULL) { - goto fail; - } - Py_DECREF(module); return 0; @@ -2713,6 +2847,10 @@ static PyMethodDef asyncio_methods[] = { _ASYNCIO_GET_RUNNING_LOOP_METHODDEF _ASYNCIO__GET_RUNNING_LOOP_METHODDEF _ASYNCIO__SET_RUNNING_LOOP_METHODDEF + _ASYNCIO__REGISTER_TASK_METHODDEF + _ASYNCIO__UNREGISTER_TASK_METHODDEF + _ASYNCIO__ENTER_TASK_METHODDEF + _ASYNCIO__LEAVE_TASK_METHODDEF {NULL, NULL} }; @@ -2768,5 +2906,17 @@ PyInit__asyncio(void) return NULL; } + Py_INCREF(all_tasks); + if (PyModule_AddObject(m, "_all_tasks", all_tasks) < 0) { + Py_DECREF(all_tasks); + return NULL; + } + + Py_INCREF(current_tasks); + if (PyModule_AddObject(m, "_current_tasks", current_tasks) < 0) { + Py_DECREF(current_tasks); + return NULL; + } + return m; } diff --git a/Modules/clinic/_asynciomodule.c.h b/Modules/clinic/_asynciomodule.c.h index 952316c..9d5dea5 100644 --- a/Modules/clinic/_asynciomodule.c.h +++ b/Modules/clinic/_asynciomodule.c.h @@ -595,4 +595,142 @@ _asyncio_get_running_loop(PyObject *module, PyObject *Py_UNUSED(ignored)) { return _asyncio_get_running_loop_impl(module); } -/*[clinic end generated code: output=21e5424c3a5572b0 input=a9049054013a1b77]*/ + +PyDoc_STRVAR(_asyncio__register_task__doc__, +"_register_task($module, /, loop, task)\n" +"--\n" +"\n" +"Register a new task in asyncio as executed by loop.\n" +"\n" +"Returns None."); + +#define _ASYNCIO__REGISTER_TASK_METHODDEF \ + {"_register_task", (PyCFunction)_asyncio__register_task, METH_FASTCALL|METH_KEYWORDS, _asyncio__register_task__doc__}, + +static PyObject * +_asyncio__register_task_impl(PyObject *module, PyObject *loop, + PyObject *task); + +static PyObject * +_asyncio__register_task(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames) +{ + PyObject *return_value = NULL; + static const char * const _keywords[] = {"loop", "task", NULL}; + static _PyArg_Parser _parser = {"OO:_register_task", _keywords, 0}; + PyObject *loop; + PyObject *task; + + if (!_PyArg_ParseStackAndKeywords(args, nargs, kwnames, &_parser, + &loop, &task)) { + goto exit; + } + return_value = _asyncio__register_task_impl(module, loop, task); + +exit: + return return_value; +} + +PyDoc_STRVAR(_asyncio__unregister_task__doc__, +"_unregister_task($module, /, loop, task)\n" +"--\n" +"\n" +"Unregister a task.\n" +"\n" +"Returns None."); + +#define _ASYNCIO__UNREGISTER_TASK_METHODDEF \ + {"_unregister_task", (PyCFunction)_asyncio__unregister_task, METH_FASTCALL|METH_KEYWORDS, _asyncio__unregister_task__doc__}, + +static PyObject * +_asyncio__unregister_task_impl(PyObject *module, PyObject *loop, + PyObject *task); + +static PyObject * +_asyncio__unregister_task(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames) +{ + PyObject *return_value = NULL; + static const char * const _keywords[] = {"loop", "task", NULL}; + static _PyArg_Parser _parser = {"OO:_unregister_task", _keywords, 0}; + PyObject *loop; + PyObject *task; + + if (!_PyArg_ParseStackAndKeywords(args, nargs, kwnames, &_parser, + &loop, &task)) { + goto exit; + } + return_value = _asyncio__unregister_task_impl(module, loop, task); + +exit: + return return_value; +} + +PyDoc_STRVAR(_asyncio__enter_task__doc__, +"_enter_task($module, /, loop, task)\n" +"--\n" +"\n" +"Enter into task execution or resume suspended task.\n" +"\n" +"Task belongs to loop.\n" +"\n" +"Returns None."); + +#define _ASYNCIO__ENTER_TASK_METHODDEF \ + {"_enter_task", (PyCFunction)_asyncio__enter_task, METH_FASTCALL|METH_KEYWORDS, _asyncio__enter_task__doc__}, + +static PyObject * +_asyncio__enter_task_impl(PyObject *module, PyObject *loop, PyObject *task); + +static PyObject * +_asyncio__enter_task(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames) +{ + PyObject *return_value = NULL; + static const char * const _keywords[] = {"loop", "task", NULL}; + static _PyArg_Parser _parser = {"OO:_enter_task", _keywords, 0}; + PyObject *loop; + PyObject *task; + + if (!_PyArg_ParseStackAndKeywords(args, nargs, kwnames, &_parser, + &loop, &task)) { + goto exit; + } + return_value = _asyncio__enter_task_impl(module, loop, task); + +exit: + return return_value; +} + +PyDoc_STRVAR(_asyncio__leave_task__doc__, +"_leave_task($module, /, loop, task)\n" +"--\n" +"\n" +"Leave task execution or suspend a task.\n" +"\n" +"Task belongs to loop.\n" +"\n" +"Returns None."); + +#define _ASYNCIO__LEAVE_TASK_METHODDEF \ + {"_leave_task", (PyCFunction)_asyncio__leave_task, METH_FASTCALL|METH_KEYWORDS, _asyncio__leave_task__doc__}, + +static PyObject * +_asyncio__leave_task_impl(PyObject *module, PyObject *loop, PyObject *task); + +static PyObject * +_asyncio__leave_task(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames) +{ + PyObject *return_value = NULL; + static const char * const _keywords[] = {"loop", "task", NULL}; + static _PyArg_Parser _parser = {"OO:_leave_task", _keywords, 0}; + PyObject *loop; + PyObject *task; + + if (!_PyArg_ParseStackAndKeywords(args, nargs, kwnames, &_parser, + &loop, &task)) { + goto exit; + } + return_value = _asyncio__leave_task_impl(module, loop, task); + +exit: + return return_value; +} +/*[clinic end generated code: output=0033af17965b51b4 input=a9049054013a1b77]*/ -- cgit v0.12