diff options
author | Andrew Svetlov <andrew.svetlov@gmail.com> | 2017-12-16 19:58:38 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-12-16 19:58:38 (GMT) |
commit | 44d1a5912ea629aa20fdc377a5ab69d9ccf75d61 (patch) | |
tree | 8634c0010adf1de08980dd9d47043a40eb904120 /Modules/_asynciomodule.c | |
parent | 950840261c349e100ec5d7381fcd742c017e242d (diff) | |
download | cpython-44d1a5912ea629aa20fdc377a5ab69d9ccf75d61.zip cpython-44d1a5912ea629aa20fdc377a5ab69d9ccf75d61.tar.gz cpython-44d1a5912ea629aa20fdc377a5ab69d9ccf75d61.tar.bz2 |
bpo-32250: Implement asyncio.current_task() and asyncio.all_tasks() (#4799)
Diffstat (limited to 'Modules/_asynciomodule.c')
-rw-r--r-- | Modules/_asynciomodule.c | 370 |
1 files changed, 260 insertions, 110 deletions
diff --git a/Modules/_asynciomodule.c b/Modules/_asynciomodule.c index 9ac1c44..378bd08 100644 --- a/Modules/_asynciomodule.c +++ b/Modules/_asynciomodule.c @@ -11,9 +11,12 @@ module _asyncio /* identifiers used from some functions */ _Py_IDENTIFIER(__asyncio_running_event_loop__); _Py_IDENTIFIER(add_done_callback); +_Py_IDENTIFIER(all_tasks); _Py_IDENTIFIER(call_soon); _Py_IDENTIFIER(cancel); +_Py_IDENTIFIER(current_task); _Py_IDENTIFIER(get_event_loop); +_Py_IDENTIFIER(pop); _Py_IDENTIFIER(send); _Py_IDENTIFIER(throw); _Py_IDENTIFIER(_step); @@ -22,19 +25,29 @@ _Py_IDENTIFIER(_wakeup); /* State of the _asyncio module */ -static PyObject *all_tasks; -static PyObject *current_tasks; +static PyObject *asyncio_mod; +static PyObject *inspect_isgenerator; +static PyObject *os_getpid; static PyObject *traceback_extract_stack; static PyObject *asyncio_get_event_loop_policy; -static PyObject *asyncio_iscoroutine_func; static PyObject *asyncio_future_repr_info_func; -static PyObject *asyncio_task_repr_info_func; +static PyObject *asyncio_iscoroutine_func; static PyObject *asyncio_task_get_stack_func; static PyObject *asyncio_task_print_stack_func; +static PyObject *asyncio_task_repr_info_func; static PyObject *asyncio_InvalidStateError; static PyObject *asyncio_CancelledError; -static PyObject *inspect_isgenerator; -static PyObject *os_getpid; + + +/* WeakKeyDictionary of {Task: EventLoop} containing all tasks alive. + Task should be a weak reference to remove entry on task garbage + collection, EventLoop is required + to not access to private task._loop attribute. */ +static PyObject *current_tasks; + +/* Dictionary containing tasks that are currently active in + all running event loops. {EventLoop: Task} */ +static PyObject *all_tasks; typedef enum { @@ -1445,6 +1458,80 @@ TaskWakeupMethWrapper_new(TaskObj *task) return (PyObject*) o; } +/* ----- Task introspection helpers */ + +static int +register_task(PyObject *loop, PyObject *task) +{ + return PyObject_SetItem(all_tasks, task, loop); +} + + +static int +unregister_task(PyObject *loop, PyObject *task) +{ + PyObject *res; + + res = _PyObject_CallMethodIdObjArgs(all_tasks, &PyId_pop, + task, Py_None, NULL); + if (res == NULL) { + return -1; + } + Py_DECREF(res); + return 0; +} + + +static int +enter_task(PyObject *loop, PyObject *task) +{ + PyObject *item; + Py_hash_t hash; + hash = PyObject_Hash(loop); + if (hash == -1) { + return -1; + } + item = _PyDict_GetItem_KnownHash(current_tasks, loop, hash); + if (item != NULL) { + PyErr_Format( + PyExc_RuntimeError, + "Cannot enter into task %R while another " \ + "task %R is being executed.", + task, item, NULL); + return -1; + } + if (_PyDict_SetItem_KnownHash(current_tasks, loop, task, hash) < 0) { + return -1; + } + return 0; +} + + +static int +leave_task(PyObject *loop, PyObject *task) +/*[clinic end generated code: output=0ebf6db4b858fb41 input=51296a46313d1ad8]*/ +{ + PyObject *item; + Py_hash_t hash; + hash = PyObject_Hash(loop); + if (hash == -1) { + return -1; + } + item = _PyDict_GetItem_KnownHash(current_tasks, loop, hash); + if (item != task) { + if (item == NULL) { + /* Not entered, replace with None */ + item = Py_None; + } + PyErr_Format( + PyExc_RuntimeError, + "Leaving task %R does not match the current task %R.", + task, item, NULL); + return -1; + } + return _PyDict_DelItem_KnownHash(current_tasks, loop, hash); +} + /* ----- Task */ /*[clinic input] @@ -1463,8 +1550,6 @@ _asyncio_Task___init___impl(TaskObj *self, PyObject *coro, PyObject *loop) { PyObject *res; int tmp; - _Py_IDENTIFIER(add); - if (future_init((FutureObj*)self, loop)) { return -1; } @@ -1500,14 +1585,7 @@ _asyncio_Task___init___impl(TaskObj *self, PyObject *coro, PyObject *loop) if (task_call_step_soon(self, NULL)) { return -1; } - - res = _PyObject_CallMethodIdObjArgs(all_tasks, &PyId_add, self, NULL); - if (res == NULL) { - return -1; - } - Py_DECREF(res); - - return 0; + return register_task(self->task_loop, (PyObject*)self); } static int @@ -1600,76 +1678,36 @@ static PyObject * _asyncio_Task_current_task_impl(PyTypeObject *type, PyObject *loop) /*[clinic end generated code: output=99fbe7332c516e03 input=cd14770c5b79c7eb]*/ { - PyObject *res; + PyObject *ret; + PyObject *current_task_func; + + if (PyErr_WarnEx(PyExc_PendingDeprecationWarning, + "Task.current_task() is deprecated, " \ + "use asyncio.current_task() instead", + 1) < 0) { + return NULL; + } + + current_task_func = _PyObject_GetAttrId(asyncio_mod, &PyId_current_task); + if (current_task_func == NULL) { + return NULL; + } if (loop == Py_None) { loop = get_event_loop(); if (loop == NULL) { return NULL; } - - res = PyDict_GetItem(current_tasks, loop); + ret = PyObject_CallFunctionObjArgs(current_task_func, loop, NULL); + Py_DECREF(current_task_func); Py_DECREF(loop); + return ret; } else { - res = PyDict_GetItem(current_tasks, loop); - } - - if (res == NULL) { - Py_RETURN_NONE; - } - else { - Py_INCREF(res); - return res; - } -} - -static PyObject * -task_all_tasks(PyObject *loop) -{ - PyObject *task; - PyObject *task_loop; - PyObject *set; - PyObject *iter; - - assert(loop != NULL); - - set = PySet_New(NULL); - if (set == NULL) { - return NULL; - } - - iter = PyObject_GetIter(all_tasks); - if (iter == NULL) { - goto fail; - } - - while ((task = PyIter_Next(iter))) { - task_loop = PyObject_GetAttrString(task, "_loop"); - if (task_loop == NULL) { - Py_DECREF(task); - goto fail; - } - if (task_loop == loop) { - if (PySet_Add(set, task) == -1) { - Py_DECREF(task_loop); - Py_DECREF(task); - goto fail; - } - } - Py_DECREF(task_loop); - Py_DECREF(task); - } - if (PyErr_Occurred()) { - goto fail; + ret = PyObject_CallFunctionObjArgs(current_task_func, loop, NULL); + Py_DECREF(current_task_func); + return ret; } - Py_DECREF(iter); - return set; - -fail: - Py_DECREF(set); - Py_XDECREF(iter); - return NULL; } /*[clinic input] @@ -1688,20 +1726,22 @@ _asyncio_Task_all_tasks_impl(PyTypeObject *type, PyObject *loop) /*[clinic end generated code: output=11f9b20749ccca5d input=497f80bc9ce726b5]*/ { PyObject *res; + PyObject *all_tasks_func; - if (loop == Py_None) { - loop = get_event_loop(); - if (loop == NULL) { - return NULL; - } - - res = task_all_tasks(loop); - Py_DECREF(loop); + all_tasks_func = _PyObject_GetAttrId(asyncio_mod, &PyId_all_tasks); + if (all_tasks_func == NULL) { + return NULL; } - else { - res = task_all_tasks(loop); + + if (PyErr_WarnEx(PyExc_PendingDeprecationWarning, + "Task.all_tasks() is deprecated, " \ + "use asyncio.all_tasks() instead", + 1) < 0) { + return NULL; } + res = PyObject_CallFunctionObjArgs(all_tasks_func, loop, NULL); + Py_DECREF(all_tasks_func); return res; } @@ -2437,11 +2477,8 @@ static PyObject * task_step(TaskObj *task, PyObject *exc) { PyObject *res; - PyObject *ot; - if (PyDict_SetItem(current_tasks, - task->task_loop, (PyObject*)task) == -1) - { + if (enter_task(task->task_loop, (PyObject*)task) < 0) { return NULL; } @@ -2450,19 +2487,16 @@ task_step(TaskObj *task, PyObject *exc) if (res == NULL) { PyObject *et, *ev, *tb; PyErr_Fetch(&et, &ev, &tb); - ot = _PyDict_Pop(current_tasks, task->task_loop, NULL); - Py_XDECREF(ot); + leave_task(task->task_loop, (PyObject*)task); _PyErr_ChainExceptions(et, ev, tb); return NULL; } else { - ot = _PyDict_Pop(current_tasks, task->task_loop, NULL); - if (ot == NULL) { + if(leave_task(task->task_loop, (PyObject*)task) < 0) { Py_DECREF(res); return NULL; } else { - Py_DECREF(ot); return res; } } @@ -2615,6 +2649,99 @@ _asyncio_get_running_loop_impl(PyObject *module) return loop; } +/*[clinic input] +_asyncio._register_task + + loop: object + task: object + +Register a new task in asyncio as executed by loop. + +Returns None. +[clinic start generated code]*/ + +static PyObject * +_asyncio__register_task_impl(PyObject *module, PyObject *loop, + PyObject *task) +/*[clinic end generated code: output=54c5cb733dbe0f38 input=9b5fee38fcb2c288]*/ +{ + if (register_task(loop, task) < 0) { + return NULL; + } + Py_RETURN_NONE; +} + + +/*[clinic input] +_asyncio._unregister_task + + loop: object + task: object + +Unregister a task. + +Returns None. +[clinic start generated code]*/ + +static PyObject * +_asyncio__unregister_task_impl(PyObject *module, PyObject *loop, + PyObject *task) +/*[clinic end generated code: output=f634743a76b84ebc input=51fa1820634ef331]*/ +{ + if (unregister_task(loop, task) < 0) { + return NULL; + } + Py_RETURN_NONE; +} + + +/*[clinic input] +_asyncio._enter_task + + loop: object + task: object + +Enter into task execution or resume suspended task. + +Task belongs to loop. + +Returns None. +[clinic start generated code]*/ + +static PyObject * +_asyncio__enter_task_impl(PyObject *module, PyObject *loop, PyObject *task) +/*[clinic end generated code: output=a22611c858035b73 input=de1b06dca70d8737]*/ +{ + if (enter_task(loop, task) < 0) { + return NULL; + } + Py_RETURN_NONE; +} + + +/*[clinic input] +_asyncio._leave_task + + loop: object + task: object + +Leave task execution or suspend a task. + +Task belongs to loop. + +Returns None. +[clinic start generated code]*/ + +static PyObject * +_asyncio__leave_task_impl(PyObject *module, PyObject *loop, PyObject *task) +/*[clinic end generated code: output=0ebf6db4b858fb41 input=51296a46313d1ad8]*/ +{ + if (leave_task(loop, task) < 0) { + return NULL; + } + Py_RETURN_NONE; +} + /*********************** Module **************************/ @@ -2622,26 +2749,37 @@ _asyncio_get_running_loop_impl(PyObject *module) static void module_free(void *m) { - Py_CLEAR(current_tasks); - Py_CLEAR(all_tasks); + Py_CLEAR(asyncio_mod); + Py_CLEAR(inspect_isgenerator); + Py_CLEAR(os_getpid); Py_CLEAR(traceback_extract_stack); - Py_CLEAR(asyncio_get_event_loop_policy); Py_CLEAR(asyncio_future_repr_info_func); + Py_CLEAR(asyncio_get_event_loop_policy); Py_CLEAR(asyncio_iscoroutine_func); - Py_CLEAR(asyncio_task_repr_info_func); Py_CLEAR(asyncio_task_get_stack_func); Py_CLEAR(asyncio_task_print_stack_func); + Py_CLEAR(asyncio_task_repr_info_func); Py_CLEAR(asyncio_InvalidStateError); Py_CLEAR(asyncio_CancelledError); - Py_CLEAR(inspect_isgenerator); - Py_CLEAR(os_getpid); + + Py_CLEAR(current_tasks); + Py_CLEAR(all_tasks); } static int module_init(void) { PyObject *module = NULL; - PyObject *cls; + + asyncio_mod = PyImport_ImportModule("asyncio"); + if (asyncio_mod == NULL) { + goto fail; + } + + current_tasks = PyDict_New(); + if (current_tasks == NULL) { + goto fail; + } #define WITH_MOD(NAME) \ Py_CLEAR(module); \ @@ -2681,19 +2819,15 @@ module_init(void) WITH_MOD("traceback") GET_MOD_ATTR(traceback_extract_stack, "extract_stack") + PyObject *weak_key_dict; WITH_MOD("weakref") - GET_MOD_ATTR(cls, "WeakSet") - all_tasks = _PyObject_CallNoArg(cls); - Py_DECREF(cls); + GET_MOD_ATTR(weak_key_dict, "WeakKeyDictionary"); + all_tasks = _PyObject_CallNoArg(weak_key_dict); + Py_CLEAR(weak_key_dict); if (all_tasks == NULL) { goto fail; } - current_tasks = PyDict_New(); - if (current_tasks == NULL) { - goto fail; - } - Py_DECREF(module); return 0; @@ -2713,6 +2847,10 @@ static PyMethodDef asyncio_methods[] = { _ASYNCIO_GET_RUNNING_LOOP_METHODDEF _ASYNCIO__GET_RUNNING_LOOP_METHODDEF _ASYNCIO__SET_RUNNING_LOOP_METHODDEF + _ASYNCIO__REGISTER_TASK_METHODDEF + _ASYNCIO__UNREGISTER_TASK_METHODDEF + _ASYNCIO__ENTER_TASK_METHODDEF + _ASYNCIO__LEAVE_TASK_METHODDEF {NULL, NULL} }; @@ -2768,5 +2906,17 @@ PyInit__asyncio(void) return NULL; } + Py_INCREF(all_tasks); + if (PyModule_AddObject(m, "_all_tasks", all_tasks) < 0) { + Py_DECREF(all_tasks); + return NULL; + } + + Py_INCREF(current_tasks); + if (PyModule_AddObject(m, "_current_tasks", current_tasks) < 0) { + Py_DECREF(current_tasks); + return NULL; + } + return m; } |