diff options
author | Itamar Ostricher <itamarost@gmail.com> | 2023-05-01 21:10:13 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-05-01 21:10:13 (GMT) |
commit | a474e04388c2ef6aca75c26cb70a1b6200235feb (patch) | |
tree | 43520d5ad16016620f149dc1e84d4d57e45051d5 /Modules | |
parent | 59bc36aacddd5a3acd32c80c0dfd0726135a7817 (diff) | |
download | cpython-a474e04388c2ef6aca75c26cb70a1b6200235feb.zip cpython-a474e04388c2ef6aca75c26cb70a1b6200235feb.tar.gz cpython-a474e04388c2ef6aca75c26cb70a1b6200235feb.tar.bz2 |
gh-97696: asyncio eager tasks factory (#102853)
Co-authored-by: Jacob Bower <jbower@meta.com>
Co-authored-by: Carol Willing <carolcode@willingconsulting.com>
Diffstat (limited to 'Modules')
-rw-r--r-- | Modules/_asynciomodule.c | 254 | ||||
-rw-r--r-- | Modules/clinic/_asynciomodule.c.h | 203 |
2 files changed, 436 insertions, 21 deletions
diff --git a/Modules/_asynciomodule.c b/Modules/_asynciomodule.c index 82dbc08..8b1a29b 100644 --- a/Modules/_asynciomodule.c +++ b/Modules/_asynciomodule.c @@ -8,6 +8,7 @@ #include "pycore_runtime_init.h" // _Py_ID() #include "pycore_moduleobject.h" // _PyModule_GetState() #include "structmember.h" // PyMemberDef +#include "cpython/context.h" #include <stddef.h> // offsetof() @@ -31,8 +32,11 @@ typedef struct { all running event loops. {EventLoop: Task} */ PyObject *current_tasks; - /* WeakSet containing all alive tasks. */ - PyObject *all_tasks; + /* WeakSet containing all tasks scheduled to run on event loops. */ + PyObject *scheduled_tasks; + + /* Set containing all eagerly executing tasks. */ + PyObject *eager_tasks; /* An isinstance type cache for the 'is_coroutine()' function. */ PyObject *iscoroutine_typecache; @@ -156,6 +160,9 @@ class _asyncio.Future "FutureObj *" "&Future_Type" /* Get FutureIter from Future */ static PyObject * future_new_iter(PyObject *); +static PyObject * +task_step_handle_result_impl(asyncio_state *state, TaskObj *task, PyObject *result); + static int _is_coroutine(asyncio_state *state, PyObject *coro) @@ -1830,6 +1837,7 @@ class _asyncio.Task "TaskObj *" "&Task_Type" static int task_call_step_soon(asyncio_state *state, TaskObj *, PyObject *); static PyObject * task_wakeup(TaskObj *, PyObject *); static PyObject * task_step(asyncio_state *, TaskObj *, PyObject *); +static int task_eager_start(asyncio_state *state, TaskObj *task); /* ----- Task._step wrapper */ @@ -1940,7 +1948,7 @@ static PyMethodDef TaskWakeupDef = { static int register_task(asyncio_state *state, PyObject *task) { - PyObject *res = PyObject_CallMethodOneArg(state->all_tasks, + PyObject *res = PyObject_CallMethodOneArg(state->scheduled_tasks, &_Py_ID(add), task); if (res == NULL) { return -1; @@ -1949,11 +1957,16 @@ register_task(asyncio_state *state, PyObject *task) return 0; } +static int +register_eager_task(asyncio_state *state, PyObject *task) +{ + return PySet_Add(state->eager_tasks, task); +} static int unregister_task(asyncio_state *state, PyObject *task) { - PyObject *res = PyObject_CallMethodOneArg(state->all_tasks, + PyObject *res = PyObject_CallMethodOneArg(state->scheduled_tasks, &_Py_ID(discard), task); if (res == NULL) { return -1; @@ -1962,6 +1975,11 @@ unregister_task(asyncio_state *state, PyObject *task) return 0; } +static int +unregister_eager_task(asyncio_state *state, PyObject *task) +{ + return PySet_Discard(state->eager_tasks, task); +} static int enter_task(asyncio_state *state, PyObject *loop, PyObject *task) @@ -2015,6 +2033,54 @@ leave_task(asyncio_state *state, PyObject *loop, PyObject *task) return _PyDict_DelItem_KnownHash(state->current_tasks, loop, hash); } +static PyObject * +swap_current_task(asyncio_state *state, PyObject *loop, PyObject *task) +{ + PyObject *prev_task; + Py_hash_t hash; + hash = PyObject_Hash(loop); + if (hash == -1) { + return NULL; + } + + prev_task = _PyDict_GetItem_KnownHash(state->current_tasks, loop, hash); + if (prev_task == NULL) { + if (PyErr_Occurred()) { + return NULL; + } + prev_task = Py_None; + } + + if (task == Py_None) { + if (_PyDict_DelItem_KnownHash(state->current_tasks, loop, hash) == -1) { + return NULL; + } + } else { + if (_PyDict_SetItem_KnownHash(state->current_tasks, loop, task, hash) == -1) { + return NULL; + } + } + + Py_INCREF(prev_task); + + return prev_task; +} + +static int +is_loop_running(PyObject *loop) +{ + PyObject *func = PyObject_GetAttr(loop, &_Py_ID(is_running)); + if (func == NULL) { + PyErr_Format(PyExc_TypeError, "Loop missing is_running()"); + return -1; + } + PyObject *res = PyObject_CallNoArgs(func); + int retval = Py_IsTrue(res); + Py_DECREF(func); + Py_DECREF(res); + return !!retval; +} + /* ----- Task */ /*[clinic input] @@ -2025,15 +2091,16 @@ _asyncio.Task.__init__ loop: object = None name: object = None context: object = None + eager_start: bool = False A coroutine wrapped in a Future. [clinic start generated code]*/ static int _asyncio_Task___init___impl(TaskObj *self, PyObject *coro, PyObject *loop, - PyObject *name, PyObject *context) -/*[clinic end generated code: output=49ac96fe33d0e5c7 input=924522490c8ce825]*/ - + PyObject *name, PyObject *context, + int eager_start) +/*[clinic end generated code: output=7aced2d27836f1a1 input=18e3f113a51b829d]*/ { if (future_init((FutureObj*)self, loop)) { return -1; @@ -2083,6 +2150,19 @@ _asyncio_Task___init___impl(TaskObj *self, PyObject *coro, PyObject *loop, return -1; } + if (eager_start) { + int loop_running = is_loop_running(self->task_loop); + if (loop_running == -1) { + return -1; + } + if (loop_running) { + if (task_eager_start(state, self)) { + return -1; + } + return 0; + } + } + if (task_call_step_soon(state, self, NULL)) { return -1; } @@ -2831,6 +2911,20 @@ task_step_impl(asyncio_state *state, TaskObj *task, PyObject *exc) Py_RETURN_NONE; } + PyObject *ret = task_step_handle_result_impl(state, task, result); + return ret; + +fail: + return NULL; +} + + +static PyObject * +task_step_handle_result_impl(asyncio_state *state, TaskObj *task, PyObject *result) +{ + int res; + PyObject *o; + if (result == (PyObject*)task) { /* We have a task that wants to await on itself */ goto self_await; @@ -3062,6 +3156,65 @@ task_step(asyncio_state *state, TaskObj *task, PyObject *exc) } } +static int +task_eager_start(asyncio_state *state, TaskObj *task) +{ + assert(task != NULL); + PyObject *prevtask = swap_current_task(state, task->task_loop, (PyObject *)task); + if (prevtask == NULL) { + return -1; + } + + if (register_eager_task(state, (PyObject *)task) == -1) { + Py_DECREF(prevtask); + return -1; + } + + if (PyContext_Enter(task->task_context) == -1) { + Py_DECREF(prevtask); + return -1; + } + + int retval = 0; + + PyObject *stepres = task_step_impl(state, task, NULL); + if (stepres == NULL) { + PyObject *exc = PyErr_GetRaisedException(); + _PyErr_ChainExceptions1(exc); + retval = -1; + } else { + Py_DECREF(stepres); + } + + PyObject *curtask = swap_current_task(state, task->task_loop, prevtask); + Py_DECREF(prevtask); + if (curtask == NULL) { + retval = -1; + } else { + assert(curtask == (PyObject *)task); + Py_DECREF(curtask); + } + + if (unregister_eager_task(state, (PyObject *)task) == -1) { + retval = -1; + } + + if (PyContext_Exit(task->task_context) == -1) { + retval = -1; + } + + if (task->task_state == STATE_PENDING) { + if (register_task(state, (PyObject *)task) == -1) { + retval = -1; + } + } else { + // This seems to really help performance on pyperformance benchmarks + Py_CLEAR(task->task_coro); + } + + return retval; +} + static PyObject * task_wakeup(TaskObj *task, PyObject *o) { @@ -3225,6 +3378,27 @@ _asyncio__register_task_impl(PyObject *module, PyObject *task) Py_RETURN_NONE; } +/*[clinic input] +_asyncio._register_eager_task + + task: object + +Register a new task in asyncio as executed by loop. + +Returns None. +[clinic start generated code]*/ + +static PyObject * +_asyncio__register_eager_task_impl(PyObject *module, PyObject *task) +/*[clinic end generated code: output=dfe1d45367c73f1a input=237f684683398c51]*/ +{ + asyncio_state *state = get_asyncio_state(module); + if (register_eager_task(state, task) < 0) { + return NULL; + } + Py_RETURN_NONE; +} + /*[clinic input] _asyncio._unregister_task @@ -3247,6 +3421,27 @@ _asyncio__unregister_task_impl(PyObject *module, PyObject *task) Py_RETURN_NONE; } +/*[clinic input] +_asyncio._unregister_eager_task + + task: object + +Unregister a task. + +Returns None. +[clinic start generated code]*/ + +static PyObject * +_asyncio__unregister_eager_task_impl(PyObject *module, PyObject *task) +/*[clinic end generated code: output=a426922bd07f23d1 input=9d07401ef14ee048]*/ +{ + asyncio_state *state = get_asyncio_state(module); + if (unregister_eager_task(state, task) < 0) { + return NULL; + } + Py_RETURN_NONE; +} + /*[clinic input] _asyncio._enter_task @@ -3299,6 +3494,27 @@ _asyncio__leave_task_impl(PyObject *module, PyObject *loop, PyObject *task) /*[clinic input] +_asyncio._swap_current_task + + loop: object + task: object + +Temporarily swap in the supplied task and return the original one (or None). + +This is intended for use during eager coroutine execution. + +[clinic start generated code]*/ + +static PyObject * +_asyncio__swap_current_task_impl(PyObject *module, PyObject *loop, + PyObject *task) +/*[clinic end generated code: output=9f88de958df74c7e input=c9c72208d3d38b6c]*/ +{ + return swap_current_task(get_asyncio_state(module), loop, task); +} + + +/*[clinic input] _asyncio.current_task loop: object = None @@ -3379,7 +3595,8 @@ module_traverse(PyObject *mod, visitproc visit, void *arg) Py_VISIT(state->asyncio_InvalidStateError); Py_VISIT(state->asyncio_CancelledError); - Py_VISIT(state->all_tasks); + Py_VISIT(state->scheduled_tasks); + Py_VISIT(state->eager_tasks); Py_VISIT(state->current_tasks); Py_VISIT(state->iscoroutine_typecache); @@ -3416,7 +3633,8 @@ module_clear(PyObject *mod) Py_CLEAR(state->asyncio_InvalidStateError); Py_CLEAR(state->asyncio_CancelledError); - Py_CLEAR(state->all_tasks); + Py_CLEAR(state->scheduled_tasks); + Py_CLEAR(state->eager_tasks); Py_CLEAR(state->current_tasks); Py_CLEAR(state->iscoroutine_typecache); @@ -3496,9 +3714,14 @@ module_init(asyncio_state *state) PyObject *weak_set; WITH_MOD("weakref") GET_MOD_ATTR(weak_set, "WeakSet"); - state->all_tasks = PyObject_CallNoArgs(weak_set); + state->scheduled_tasks = PyObject_CallNoArgs(weak_set); Py_CLEAR(weak_set); - if (state->all_tasks == NULL) { + if (state->scheduled_tasks == NULL) { + goto fail; + } + + state->eager_tasks = PySet_New(NULL); + if (state->eager_tasks == NULL) { goto fail; } @@ -3522,9 +3745,12 @@ static PyMethodDef asyncio_methods[] = { _ASYNCIO__GET_RUNNING_LOOP_METHODDEF _ASYNCIO__SET_RUNNING_LOOP_METHODDEF _ASYNCIO__REGISTER_TASK_METHODDEF + _ASYNCIO__REGISTER_EAGER_TASK_METHODDEF _ASYNCIO__UNREGISTER_TASK_METHODDEF + _ASYNCIO__UNREGISTER_EAGER_TASK_METHODDEF _ASYNCIO__ENTER_TASK_METHODDEF _ASYNCIO__LEAVE_TASK_METHODDEF + _ASYNCIO__SWAP_CURRENT_TASK_METHODDEF {NULL, NULL} }; @@ -3561,7 +3787,11 @@ module_exec(PyObject *mod) return -1; } - if (PyModule_AddObjectRef(mod, "_all_tasks", state->all_tasks) < 0) { + if (PyModule_AddObjectRef(mod, "_scheduled_tasks", state->scheduled_tasks) < 0) { + return -1; + } + + if (PyModule_AddObjectRef(mod, "_eager_tasks", state->eager_tasks) < 0) { return -1; } diff --git a/Modules/clinic/_asynciomodule.c.h b/Modules/clinic/_asynciomodule.c.h index 43c5d77..6a780a8 100644 --- a/Modules/clinic/_asynciomodule.c.h +++ b/Modules/clinic/_asynciomodule.c.h @@ -482,14 +482,15 @@ _asyncio_Future__make_cancelled_error(FutureObj *self, PyObject *Py_UNUSED(ignor } PyDoc_STRVAR(_asyncio_Task___init____doc__, -"Task(coro, *, loop=None, name=None, context=None)\n" +"Task(coro, *, loop=None, name=None, context=None, eager_start=False)\n" "--\n" "\n" "A coroutine wrapped in a Future."); static int _asyncio_Task___init___impl(TaskObj *self, PyObject *coro, PyObject *loop, - PyObject *name, PyObject *context); + PyObject *name, PyObject *context, + int eager_start); static int _asyncio_Task___init__(PyObject *self, PyObject *args, PyObject *kwargs) @@ -497,14 +498,14 @@ _asyncio_Task___init__(PyObject *self, PyObject *args, PyObject *kwargs) int return_value = -1; #if defined(Py_BUILD_CORE) && !defined(Py_BUILD_CORE_MODULE) - #define NUM_KEYWORDS 4 + #define NUM_KEYWORDS 5 static struct { PyGC_Head _this_is_not_used; PyObject_VAR_HEAD PyObject *ob_item[NUM_KEYWORDS]; } _kwtuple = { .ob_base = PyVarObject_HEAD_INIT(&PyTuple_Type, NUM_KEYWORDS) - .ob_item = { &_Py_ID(coro), &_Py_ID(loop), &_Py_ID(name), &_Py_ID(context), }, + .ob_item = { &_Py_ID(coro), &_Py_ID(loop), &_Py_ID(name), &_Py_ID(context), &_Py_ID(eager_start), }, }; #undef NUM_KEYWORDS #define KWTUPLE (&_kwtuple.ob_base.ob_base) @@ -513,14 +514,14 @@ _asyncio_Task___init__(PyObject *self, PyObject *args, PyObject *kwargs) # define KWTUPLE NULL #endif // !Py_BUILD_CORE - static const char * const _keywords[] = {"coro", "loop", "name", "context", NULL}; + static const char * const _keywords[] = {"coro", "loop", "name", "context", "eager_start", NULL}; static _PyArg_Parser _parser = { .keywords = _keywords, .fname = "Task", .kwtuple = KWTUPLE, }; #undef KWTUPLE - PyObject *argsbuf[4]; + PyObject *argsbuf[5]; PyObject * const *fastargs; Py_ssize_t nargs = PyTuple_GET_SIZE(args); Py_ssize_t noptargs = nargs + (kwargs ? PyDict_GET_SIZE(kwargs) : 0) - 1; @@ -528,6 +529,7 @@ _asyncio_Task___init__(PyObject *self, PyObject *args, PyObject *kwargs) PyObject *loop = Py_None; PyObject *name = Py_None; PyObject *context = Py_None; + int eager_start = 0; fastargs = _PyArg_UnpackKeywords(_PyTuple_CAST(args)->ob_item, nargs, kwargs, NULL, &_parser, 1, 1, 0, argsbuf); if (!fastargs) { @@ -549,9 +551,18 @@ _asyncio_Task___init__(PyObject *self, PyObject *args, PyObject *kwargs) goto skip_optional_kwonly; } } - context = fastargs[3]; + if (fastargs[3]) { + context = fastargs[3]; + if (!--noptargs) { + goto skip_optional_kwonly; + } + } + eager_start = PyObject_IsTrue(fastargs[4]); + if (eager_start < 0) { + goto exit; + } skip_optional_kwonly: - return_value = _asyncio_Task___init___impl((TaskObj *)self, coro, loop, name, context); + return_value = _asyncio_Task___init___impl((TaskObj *)self, coro, loop, name, context, eager_start); exit: return return_value; @@ -1064,6 +1075,63 @@ exit: return return_value; } +PyDoc_STRVAR(_asyncio__register_eager_task__doc__, +"_register_eager_task($module, /, task)\n" +"--\n" +"\n" +"Register a new task in asyncio as executed by loop.\n" +"\n" +"Returns None."); + +#define _ASYNCIO__REGISTER_EAGER_TASK_METHODDEF \ + {"_register_eager_task", _PyCFunction_CAST(_asyncio__register_eager_task), METH_FASTCALL|METH_KEYWORDS, _asyncio__register_eager_task__doc__}, + +static PyObject * +_asyncio__register_eager_task_impl(PyObject *module, PyObject *task); + +static PyObject * +_asyncio__register_eager_task(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames) +{ + PyObject *return_value = NULL; + #if defined(Py_BUILD_CORE) && !defined(Py_BUILD_CORE_MODULE) + + #define NUM_KEYWORDS 1 + static struct { + PyGC_Head _this_is_not_used; + PyObject_VAR_HEAD + PyObject *ob_item[NUM_KEYWORDS]; + } _kwtuple = { + .ob_base = PyVarObject_HEAD_INIT(&PyTuple_Type, NUM_KEYWORDS) + .ob_item = { &_Py_ID(task), }, + }; + #undef NUM_KEYWORDS + #define KWTUPLE (&_kwtuple.ob_base.ob_base) + + #else // !Py_BUILD_CORE + # define KWTUPLE NULL + #endif // !Py_BUILD_CORE + + static const char * const _keywords[] = {"task", NULL}; + static _PyArg_Parser _parser = { + .keywords = _keywords, + .fname = "_register_eager_task", + .kwtuple = KWTUPLE, + }; + #undef KWTUPLE + PyObject *argsbuf[1]; + PyObject *task; + + args = _PyArg_UnpackKeywords(args, nargs, NULL, kwnames, &_parser, 1, 1, 0, argsbuf); + if (!args) { + goto exit; + } + task = args[0]; + return_value = _asyncio__register_eager_task_impl(module, task); + +exit: + return return_value; +} + PyDoc_STRVAR(_asyncio__unregister_task__doc__, "_unregister_task($module, /, task)\n" "--\n" @@ -1121,6 +1189,63 @@ exit: return return_value; } +PyDoc_STRVAR(_asyncio__unregister_eager_task__doc__, +"_unregister_eager_task($module, /, task)\n" +"--\n" +"\n" +"Unregister a task.\n" +"\n" +"Returns None."); + +#define _ASYNCIO__UNREGISTER_EAGER_TASK_METHODDEF \ + {"_unregister_eager_task", _PyCFunction_CAST(_asyncio__unregister_eager_task), METH_FASTCALL|METH_KEYWORDS, _asyncio__unregister_eager_task__doc__}, + +static PyObject * +_asyncio__unregister_eager_task_impl(PyObject *module, PyObject *task); + +static PyObject * +_asyncio__unregister_eager_task(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames) +{ + PyObject *return_value = NULL; + #if defined(Py_BUILD_CORE) && !defined(Py_BUILD_CORE_MODULE) + + #define NUM_KEYWORDS 1 + static struct { + PyGC_Head _this_is_not_used; + PyObject_VAR_HEAD + PyObject *ob_item[NUM_KEYWORDS]; + } _kwtuple = { + .ob_base = PyVarObject_HEAD_INIT(&PyTuple_Type, NUM_KEYWORDS) + .ob_item = { &_Py_ID(task), }, + }; + #undef NUM_KEYWORDS + #define KWTUPLE (&_kwtuple.ob_base.ob_base) + + #else // !Py_BUILD_CORE + # define KWTUPLE NULL + #endif // !Py_BUILD_CORE + + static const char * const _keywords[] = {"task", NULL}; + static _PyArg_Parser _parser = { + .keywords = _keywords, + .fname = "_unregister_eager_task", + .kwtuple = KWTUPLE, + }; + #undef KWTUPLE + PyObject *argsbuf[1]; + PyObject *task; + + args = _PyArg_UnpackKeywords(args, nargs, NULL, kwnames, &_parser, 1, 1, 0, argsbuf); + if (!args) { + goto exit; + } + task = args[0]; + return_value = _asyncio__unregister_eager_task_impl(module, task); + +exit: + return return_value; +} + PyDoc_STRVAR(_asyncio__enter_task__doc__, "_enter_task($module, /, loop, task)\n" "--\n" @@ -1243,6 +1368,66 @@ exit: return return_value; } +PyDoc_STRVAR(_asyncio__swap_current_task__doc__, +"_swap_current_task($module, /, loop, task)\n" +"--\n" +"\n" +"Temporarily swap in the supplied task and return the original one (or None).\n" +"\n" +"This is intended for use during eager coroutine execution."); + +#define _ASYNCIO__SWAP_CURRENT_TASK_METHODDEF \ + {"_swap_current_task", _PyCFunction_CAST(_asyncio__swap_current_task), METH_FASTCALL|METH_KEYWORDS, _asyncio__swap_current_task__doc__}, + +static PyObject * +_asyncio__swap_current_task_impl(PyObject *module, PyObject *loop, + PyObject *task); + +static PyObject * +_asyncio__swap_current_task(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames) +{ + PyObject *return_value = NULL; + #if defined(Py_BUILD_CORE) && !defined(Py_BUILD_CORE_MODULE) + + #define NUM_KEYWORDS 2 + static struct { + PyGC_Head _this_is_not_used; + PyObject_VAR_HEAD + PyObject *ob_item[NUM_KEYWORDS]; + } _kwtuple = { + .ob_base = PyVarObject_HEAD_INIT(&PyTuple_Type, NUM_KEYWORDS) + .ob_item = { &_Py_ID(loop), &_Py_ID(task), }, + }; + #undef NUM_KEYWORDS + #define KWTUPLE (&_kwtuple.ob_base.ob_base) + + #else // !Py_BUILD_CORE + # define KWTUPLE NULL + #endif // !Py_BUILD_CORE + + static const char * const _keywords[] = {"loop", "task", NULL}; + static _PyArg_Parser _parser = { + .keywords = _keywords, + .fname = "_swap_current_task", + .kwtuple = KWTUPLE, + }; + #undef KWTUPLE + PyObject *argsbuf[2]; + PyObject *loop; + PyObject *task; + + args = _PyArg_UnpackKeywords(args, nargs, NULL, kwnames, &_parser, 2, 2, 0, argsbuf); + if (!args) { + goto exit; + } + loop = args[0]; + task = args[1]; + return_value = _asyncio__swap_current_task_impl(module, loop, task); + +exit: + return return_value; +} + PyDoc_STRVAR(_asyncio_current_task__doc__, "current_task($module, /, loop=None)\n" "--\n" @@ -1302,4 +1487,4 @@ skip_optional_pos: exit: return return_value; } -/*[clinic end generated code: output=00f494214f2fd008 input=a9049054013a1b77]*/ +/*[clinic end generated code: output=6b0e283177b07639 input=a9049054013a1b77]*/ |