summaryrefslogtreecommitdiffstats
path: root/Modules/_asynciomodule.c
diff options
context:
space:
mode:
authorAndrew Svetlov <andrew.svetlov@gmail.com>2017-12-16 19:58:38 (GMT)
committerGitHub <noreply@github.com>2017-12-16 19:58:38 (GMT)
commit44d1a5912ea629aa20fdc377a5ab69d9ccf75d61 (patch)
tree8634c0010adf1de08980dd9d47043a40eb904120 /Modules/_asynciomodule.c
parent950840261c349e100ec5d7381fcd742c017e242d (diff)
downloadcpython-44d1a5912ea629aa20fdc377a5ab69d9ccf75d61.zip
cpython-44d1a5912ea629aa20fdc377a5ab69d9ccf75d61.tar.gz
cpython-44d1a5912ea629aa20fdc377a5ab69d9ccf75d61.tar.bz2
bpo-32250: Implement asyncio.current_task() and asyncio.all_tasks() (#4799)
Diffstat (limited to 'Modules/_asynciomodule.c')
-rw-r--r--Modules/_asynciomodule.c370
1 files changed, 260 insertions, 110 deletions
diff --git a/Modules/_asynciomodule.c b/Modules/_asynciomodule.c
index 9ac1c44..378bd08 100644
--- a/Modules/_asynciomodule.c
+++ b/Modules/_asynciomodule.c
@@ -11,9 +11,12 @@ module _asyncio
/* identifiers used from some functions */
_Py_IDENTIFIER(__asyncio_running_event_loop__);
_Py_IDENTIFIER(add_done_callback);
+_Py_IDENTIFIER(all_tasks);
_Py_IDENTIFIER(call_soon);
_Py_IDENTIFIER(cancel);
+_Py_IDENTIFIER(current_task);
_Py_IDENTIFIER(get_event_loop);
+_Py_IDENTIFIER(pop);
_Py_IDENTIFIER(send);
_Py_IDENTIFIER(throw);
_Py_IDENTIFIER(_step);
@@ -22,19 +25,29 @@ _Py_IDENTIFIER(_wakeup);
/* State of the _asyncio module */
-static PyObject *all_tasks;
-static PyObject *current_tasks;
+static PyObject *asyncio_mod;
+static PyObject *inspect_isgenerator;
+static PyObject *os_getpid;
static PyObject *traceback_extract_stack;
static PyObject *asyncio_get_event_loop_policy;
-static PyObject *asyncio_iscoroutine_func;
static PyObject *asyncio_future_repr_info_func;
-static PyObject *asyncio_task_repr_info_func;
+static PyObject *asyncio_iscoroutine_func;
static PyObject *asyncio_task_get_stack_func;
static PyObject *asyncio_task_print_stack_func;
+static PyObject *asyncio_task_repr_info_func;
static PyObject *asyncio_InvalidStateError;
static PyObject *asyncio_CancelledError;
-static PyObject *inspect_isgenerator;
-static PyObject *os_getpid;
+
+
+/* WeakKeyDictionary of {Task: EventLoop} containing all tasks alive.
+ Task should be a weak reference to remove entry on task garbage
+ collection, EventLoop is required
+ to not access to private task._loop attribute. */
+static PyObject *current_tasks;
+
+/* Dictionary containing tasks that are currently active in
+ all running event loops. {EventLoop: Task} */
+static PyObject *all_tasks;
typedef enum {
@@ -1445,6 +1458,80 @@ TaskWakeupMethWrapper_new(TaskObj *task)
return (PyObject*) o;
}
+/* ----- Task introspection helpers */
+
+static int
+register_task(PyObject *loop, PyObject *task)
+{
+ return PyObject_SetItem(all_tasks, task, loop);
+}
+
+
+static int
+unregister_task(PyObject *loop, PyObject *task)
+{
+ PyObject *res;
+
+ res = _PyObject_CallMethodIdObjArgs(all_tasks, &PyId_pop,
+ task, Py_None, NULL);
+ if (res == NULL) {
+ return -1;
+ }
+ Py_DECREF(res);
+ return 0;
+}
+
+
+static int
+enter_task(PyObject *loop, PyObject *task)
+{
+ PyObject *item;
+ Py_hash_t hash;
+ hash = PyObject_Hash(loop);
+ if (hash == -1) {
+ return -1;
+ }
+ item = _PyDict_GetItem_KnownHash(current_tasks, loop, hash);
+ if (item != NULL) {
+ PyErr_Format(
+ PyExc_RuntimeError,
+ "Cannot enter into task %R while another " \
+ "task %R is being executed.",
+ task, item, NULL);
+ return -1;
+ }
+ if (_PyDict_SetItem_KnownHash(current_tasks, loop, task, hash) < 0) {
+ return -1;
+ }
+ return 0;
+}
+
+
+static int
+leave_task(PyObject *loop, PyObject *task)
+/*[clinic end generated code: output=0ebf6db4b858fb41 input=51296a46313d1ad8]*/
+{
+ PyObject *item;
+ Py_hash_t hash;
+ hash = PyObject_Hash(loop);
+ if (hash == -1) {
+ return -1;
+ }
+ item = _PyDict_GetItem_KnownHash(current_tasks, loop, hash);
+ if (item != task) {
+ if (item == NULL) {
+ /* Not entered, replace with None */
+ item = Py_None;
+ }
+ PyErr_Format(
+ PyExc_RuntimeError,
+ "Leaving task %R does not match the current task %R.",
+ task, item, NULL);
+ return -1;
+ }
+ return _PyDict_DelItem_KnownHash(current_tasks, loop, hash);
+}
+
/* ----- Task */
/*[clinic input]
@@ -1463,8 +1550,6 @@ _asyncio_Task___init___impl(TaskObj *self, PyObject *coro, PyObject *loop)
{
PyObject *res;
int tmp;
- _Py_IDENTIFIER(add);
-
if (future_init((FutureObj*)self, loop)) {
return -1;
}
@@ -1500,14 +1585,7 @@ _asyncio_Task___init___impl(TaskObj *self, PyObject *coro, PyObject *loop)
if (task_call_step_soon(self, NULL)) {
return -1;
}
-
- res = _PyObject_CallMethodIdObjArgs(all_tasks, &PyId_add, self, NULL);
- if (res == NULL) {
- return -1;
- }
- Py_DECREF(res);
-
- return 0;
+ return register_task(self->task_loop, (PyObject*)self);
}
static int
@@ -1600,76 +1678,36 @@ static PyObject *
_asyncio_Task_current_task_impl(PyTypeObject *type, PyObject *loop)
/*[clinic end generated code: output=99fbe7332c516e03 input=cd14770c5b79c7eb]*/
{
- PyObject *res;
+ PyObject *ret;
+ PyObject *current_task_func;
+
+ if (PyErr_WarnEx(PyExc_PendingDeprecationWarning,
+ "Task.current_task() is deprecated, " \
+ "use asyncio.current_task() instead",
+ 1) < 0) {
+ return NULL;
+ }
+
+ current_task_func = _PyObject_GetAttrId(asyncio_mod, &PyId_current_task);
+ if (current_task_func == NULL) {
+ return NULL;
+ }
if (loop == Py_None) {
loop = get_event_loop();
if (loop == NULL) {
return NULL;
}
-
- res = PyDict_GetItem(current_tasks, loop);
+ ret = PyObject_CallFunctionObjArgs(current_task_func, loop, NULL);
+ Py_DECREF(current_task_func);
Py_DECREF(loop);
+ return ret;
}
else {
- res = PyDict_GetItem(current_tasks, loop);
- }
-
- if (res == NULL) {
- Py_RETURN_NONE;
- }
- else {
- Py_INCREF(res);
- return res;
- }
-}
-
-static PyObject *
-task_all_tasks(PyObject *loop)
-{
- PyObject *task;
- PyObject *task_loop;
- PyObject *set;
- PyObject *iter;
-
- assert(loop != NULL);
-
- set = PySet_New(NULL);
- if (set == NULL) {
- return NULL;
- }
-
- iter = PyObject_GetIter(all_tasks);
- if (iter == NULL) {
- goto fail;
- }
-
- while ((task = PyIter_Next(iter))) {
- task_loop = PyObject_GetAttrString(task, "_loop");
- if (task_loop == NULL) {
- Py_DECREF(task);
- goto fail;
- }
- if (task_loop == loop) {
- if (PySet_Add(set, task) == -1) {
- Py_DECREF(task_loop);
- Py_DECREF(task);
- goto fail;
- }
- }
- Py_DECREF(task_loop);
- Py_DECREF(task);
- }
- if (PyErr_Occurred()) {
- goto fail;
+ ret = PyObject_CallFunctionObjArgs(current_task_func, loop, NULL);
+ Py_DECREF(current_task_func);
+ return ret;
}
- Py_DECREF(iter);
- return set;
-
-fail:
- Py_DECREF(set);
- Py_XDECREF(iter);
- return NULL;
}
/*[clinic input]
@@ -1688,20 +1726,22 @@ _asyncio_Task_all_tasks_impl(PyTypeObject *type, PyObject *loop)
/*[clinic end generated code: output=11f9b20749ccca5d input=497f80bc9ce726b5]*/
{
PyObject *res;
+ PyObject *all_tasks_func;
- if (loop == Py_None) {
- loop = get_event_loop();
- if (loop == NULL) {
- return NULL;
- }
-
- res = task_all_tasks(loop);
- Py_DECREF(loop);
+ all_tasks_func = _PyObject_GetAttrId(asyncio_mod, &PyId_all_tasks);
+ if (all_tasks_func == NULL) {
+ return NULL;
}
- else {
- res = task_all_tasks(loop);
+
+ if (PyErr_WarnEx(PyExc_PendingDeprecationWarning,
+ "Task.all_tasks() is deprecated, " \
+ "use asyncio.all_tasks() instead",
+ 1) < 0) {
+ return NULL;
}
+ res = PyObject_CallFunctionObjArgs(all_tasks_func, loop, NULL);
+ Py_DECREF(all_tasks_func);
return res;
}
@@ -2437,11 +2477,8 @@ static PyObject *
task_step(TaskObj *task, PyObject *exc)
{
PyObject *res;
- PyObject *ot;
- if (PyDict_SetItem(current_tasks,
- task->task_loop, (PyObject*)task) == -1)
- {
+ if (enter_task(task->task_loop, (PyObject*)task) < 0) {
return NULL;
}
@@ -2450,19 +2487,16 @@ task_step(TaskObj *task, PyObject *exc)
if (res == NULL) {
PyObject *et, *ev, *tb;
PyErr_Fetch(&et, &ev, &tb);
- ot = _PyDict_Pop(current_tasks, task->task_loop, NULL);
- Py_XDECREF(ot);
+ leave_task(task->task_loop, (PyObject*)task);
_PyErr_ChainExceptions(et, ev, tb);
return NULL;
}
else {
- ot = _PyDict_Pop(current_tasks, task->task_loop, NULL);
- if (ot == NULL) {
+ if(leave_task(task->task_loop, (PyObject*)task) < 0) {
Py_DECREF(res);
return NULL;
}
else {
- Py_DECREF(ot);
return res;
}
}
@@ -2615,6 +2649,99 @@ _asyncio_get_running_loop_impl(PyObject *module)
return loop;
}
+/*[clinic input]
+_asyncio._register_task
+
+ loop: object
+ task: object
+
+Register a new task in asyncio as executed by loop.
+
+Returns None.
+[clinic start generated code]*/
+
+static PyObject *
+_asyncio__register_task_impl(PyObject *module, PyObject *loop,
+ PyObject *task)
+/*[clinic end generated code: output=54c5cb733dbe0f38 input=9b5fee38fcb2c288]*/
+{
+ if (register_task(loop, task) < 0) {
+ return NULL;
+ }
+ Py_RETURN_NONE;
+}
+
+
+/*[clinic input]
+_asyncio._unregister_task
+
+ loop: object
+ task: object
+
+Unregister a task.
+
+Returns None.
+[clinic start generated code]*/
+
+static PyObject *
+_asyncio__unregister_task_impl(PyObject *module, PyObject *loop,
+ PyObject *task)
+/*[clinic end generated code: output=f634743a76b84ebc input=51fa1820634ef331]*/
+{
+ if (unregister_task(loop, task) < 0) {
+ return NULL;
+ }
+ Py_RETURN_NONE;
+}
+
+
+/*[clinic input]
+_asyncio._enter_task
+
+ loop: object
+ task: object
+
+Enter into task execution or resume suspended task.
+
+Task belongs to loop.
+
+Returns None.
+[clinic start generated code]*/
+
+static PyObject *
+_asyncio__enter_task_impl(PyObject *module, PyObject *loop, PyObject *task)
+/*[clinic end generated code: output=a22611c858035b73 input=de1b06dca70d8737]*/
+{
+ if (enter_task(loop, task) < 0) {
+ return NULL;
+ }
+ Py_RETURN_NONE;
+}
+
+
+/*[clinic input]
+_asyncio._leave_task
+
+ loop: object
+ task: object
+
+Leave task execution or suspend a task.
+
+Task belongs to loop.
+
+Returns None.
+[clinic start generated code]*/
+
+static PyObject *
+_asyncio__leave_task_impl(PyObject *module, PyObject *loop, PyObject *task)
+/*[clinic end generated code: output=0ebf6db4b858fb41 input=51296a46313d1ad8]*/
+{
+ if (leave_task(loop, task) < 0) {
+ return NULL;
+ }
+ Py_RETURN_NONE;
+}
+
/*********************** Module **************************/
@@ -2622,26 +2749,37 @@ _asyncio_get_running_loop_impl(PyObject *module)
static void
module_free(void *m)
{
- Py_CLEAR(current_tasks);
- Py_CLEAR(all_tasks);
+ Py_CLEAR(asyncio_mod);
+ Py_CLEAR(inspect_isgenerator);
+ Py_CLEAR(os_getpid);
Py_CLEAR(traceback_extract_stack);
- Py_CLEAR(asyncio_get_event_loop_policy);
Py_CLEAR(asyncio_future_repr_info_func);
+ Py_CLEAR(asyncio_get_event_loop_policy);
Py_CLEAR(asyncio_iscoroutine_func);
- Py_CLEAR(asyncio_task_repr_info_func);
Py_CLEAR(asyncio_task_get_stack_func);
Py_CLEAR(asyncio_task_print_stack_func);
+ Py_CLEAR(asyncio_task_repr_info_func);
Py_CLEAR(asyncio_InvalidStateError);
Py_CLEAR(asyncio_CancelledError);
- Py_CLEAR(inspect_isgenerator);
- Py_CLEAR(os_getpid);
+
+ Py_CLEAR(current_tasks);
+ Py_CLEAR(all_tasks);
}
static int
module_init(void)
{
PyObject *module = NULL;
- PyObject *cls;
+
+ asyncio_mod = PyImport_ImportModule("asyncio");
+ if (asyncio_mod == NULL) {
+ goto fail;
+ }
+
+ current_tasks = PyDict_New();
+ if (current_tasks == NULL) {
+ goto fail;
+ }
#define WITH_MOD(NAME) \
Py_CLEAR(module); \
@@ -2681,19 +2819,15 @@ module_init(void)
WITH_MOD("traceback")
GET_MOD_ATTR(traceback_extract_stack, "extract_stack")
+ PyObject *weak_key_dict;
WITH_MOD("weakref")
- GET_MOD_ATTR(cls, "WeakSet")
- all_tasks = _PyObject_CallNoArg(cls);
- Py_DECREF(cls);
+ GET_MOD_ATTR(weak_key_dict, "WeakKeyDictionary");
+ all_tasks = _PyObject_CallNoArg(weak_key_dict);
+ Py_CLEAR(weak_key_dict);
if (all_tasks == NULL) {
goto fail;
}
- current_tasks = PyDict_New();
- if (current_tasks == NULL) {
- goto fail;
- }
-
Py_DECREF(module);
return 0;
@@ -2713,6 +2847,10 @@ static PyMethodDef asyncio_methods[] = {
_ASYNCIO_GET_RUNNING_LOOP_METHODDEF
_ASYNCIO__GET_RUNNING_LOOP_METHODDEF
_ASYNCIO__SET_RUNNING_LOOP_METHODDEF
+ _ASYNCIO__REGISTER_TASK_METHODDEF
+ _ASYNCIO__UNREGISTER_TASK_METHODDEF
+ _ASYNCIO__ENTER_TASK_METHODDEF
+ _ASYNCIO__LEAVE_TASK_METHODDEF
{NULL, NULL}
};
@@ -2768,5 +2906,17 @@ PyInit__asyncio(void)
return NULL;
}
+ Py_INCREF(all_tasks);
+ if (PyModule_AddObject(m, "_all_tasks", all_tasks) < 0) {
+ Py_DECREF(all_tasks);
+ return NULL;
+ }
+
+ Py_INCREF(current_tasks);
+ if (PyModule_AddObject(m, "_current_tasks", current_tasks) < 0) {
+ Py_DECREF(current_tasks);
+ return NULL;
+ }
+
return m;
}