diff options
author | Yury Selivanov <yury@magic.io> | 2016-10-28 16:52:37 (GMT) |
---|---|---|
committer | Yury Selivanov <yury@magic.io> | 2016-10-28 16:52:37 (GMT) |
commit | a0c1ba608eb89b4e10155f7652c50a3ac0b709af (patch) | |
tree | 90e811ae976793876c5c04de7d91cbb7f1518fd9 /Modules/_asynciomodule.c | |
parent | bbcb79920b0e220c2e1b0e77db5aca2f3a2a52d4 (diff) | |
download | cpython-a0c1ba608eb89b4e10155f7652c50a3ac0b709af.zip cpython-a0c1ba608eb89b4e10155f7652c50a3ac0b709af.tar.gz cpython-a0c1ba608eb89b4e10155f7652c50a3ac0b709af.tar.bz2 |
Issue #28544: Implement asyncio.Task in C.
This implementation provides additional 10-20% speed boost for
asyncio programs.
The patch also fixes _asynciomodule.c to use Arguments Clinic, and
makes '_schedule_callbacks' an overridable method (as it was in 3.5).
Diffstat (limited to 'Modules/_asynciomodule.c')
-rw-r--r-- | Modules/_asynciomodule.c | 1976 |
1 files changed, 1699 insertions, 277 deletions
diff --git a/Modules/_asynciomodule.c b/Modules/_asynciomodule.c index a3c96c8..d9419df 100644 --- a/Modules/_asynciomodule.c +++ b/Modules/_asynciomodule.c @@ -2,20 +2,35 @@ #include "structmember.h" +/*[clinic input] +module _asyncio +[clinic start generated code]*/ +/*[clinic end generated code: output=da39a3ee5e6b4b0d input=8fd17862aa989c69]*/ + + /* identifiers used from some functions */ +_Py_IDENTIFIER(add_done_callback); _Py_IDENTIFIER(call_soon); +_Py_IDENTIFIER(cancel); +_Py_IDENTIFIER(send); +_Py_IDENTIFIER(throw); +_Py_IDENTIFIER(_step); +_Py_IDENTIFIER(_schedule_callbacks); +_Py_IDENTIFIER(_wakeup); /* State of the _asyncio module */ +static PyObject *all_tasks; +static PyDictObject *current_tasks; static PyObject *traceback_extract_stack; static PyObject *asyncio_get_event_loop; -static PyObject *asyncio_repr_info_func; +static PyObject *asyncio_future_repr_info_func; +static PyObject *asyncio_task_repr_info_func; +static PyObject *asyncio_task_get_stack_func; +static PyObject *asyncio_task_print_stack_func; static PyObject *asyncio_InvalidStateError; static PyObject *asyncio_CancelledError; - - -/* Get FutureIter from Future */ -static PyObject* new_future_iter(PyObject *fut); +static PyObject *inspect_isgenerator; typedef enum { @@ -24,24 +39,57 @@ typedef enum { STATE_FINISHED } fut_state; +#define FutureObj_HEAD(prefix) \ + PyObject_HEAD \ + PyObject *prefix##_loop; \ + PyObject *prefix##_callbacks; \ + PyObject *prefix##_exception; \ + PyObject *prefix##_result; \ + PyObject *prefix##_source_tb; \ + fut_state prefix##_state; \ + int prefix##_log_tb; \ + int prefix##_blocking; \ + PyObject *dict; \ + PyObject *prefix##_weakreflist; typedef struct { - PyObject_HEAD - PyObject *fut_loop; - PyObject *fut_callbacks; - PyObject *fut_exception; - PyObject *fut_result; - PyObject *fut_source_tb; - fut_state fut_state; - int fut_log_tb; - int fut_blocking; - PyObject *dict; - PyObject *fut_weakreflist; + FutureObj_HEAD(fut) } FutureObj; +typedef struct { + FutureObj_HEAD(task) + PyObject *task_fut_waiter; + PyObject *task_coro; + int task_must_cancel; + int task_log_destroy_pending; +} TaskObj; + +typedef struct { + PyObject_HEAD + TaskObj *sw_task; + PyObject *sw_arg; +} TaskSendMethWrapper; + +typedef struct { + PyObject_HEAD + TaskObj *ww_task; +} TaskWakeupMethWrapper; + + +#include "clinic/_asynciomodule.c.h" + + +/*[clinic input] +class _asyncio.Future "FutureObj *" "&Future_Type" +[clinic start generated code]*/ +/*[clinic end generated code: output=da39a3ee5e6b4b0d input=00d3e4abca711e0f]*/ + +/* Get FutureIter from Future */ +static PyObject* future_new_iter(PyObject *); +static inline int future_call_schedule_callbacks(FutureObj *); static int -_schedule_callbacks(FutureObj *fut) +future_schedule_callbacks(FutureObj *fut) { Py_ssize_t len; PyObject* iters; @@ -87,16 +135,11 @@ _schedule_callbacks(FutureObj *fut) } static int -FutureObj_init(FutureObj *fut, PyObject *args, PyObject *kwds) +future_init(FutureObj *fut, PyObject *loop) { - static char *kwlist[] = {"loop", NULL}; - PyObject *loop = NULL; PyObject *res = NULL; _Py_IDENTIFIER(get_debug); - if (!PyArg_ParseTupleAndKeywords(args, kwds, "|$O", kwlist, &loop)) { - return -1; - } if (loop == NULL || loop == Py_None) { loop = PyObject_CallObject(asyncio_get_event_loop, NULL); if (loop == NULL) { @@ -128,106 +171,12 @@ FutureObj_init(FutureObj *fut, PyObject *args, PyObject *kwds) if (fut->fut_callbacks == NULL) { return -1; } - return 0; -} - -static int -FutureObj_clear(FutureObj *fut) -{ - Py_CLEAR(fut->fut_loop); - Py_CLEAR(fut->fut_callbacks); - Py_CLEAR(fut->fut_result); - Py_CLEAR(fut->fut_exception); - Py_CLEAR(fut->fut_source_tb); - Py_CLEAR(fut->dict); - return 0; -} -static int -FutureObj_traverse(FutureObj *fut, visitproc visit, void *arg) -{ - Py_VISIT(fut->fut_loop); - Py_VISIT(fut->fut_callbacks); - Py_VISIT(fut->fut_result); - Py_VISIT(fut->fut_exception); - Py_VISIT(fut->fut_source_tb); - Py_VISIT(fut->dict); return 0; } -PyDoc_STRVAR(pydoc_result, - "Return the result this future represents.\n" - "\n" - "If the future has been cancelled, raises CancelledError. If the\n" - "future's result isn't yet available, raises InvalidStateError. If\n" - "the future is done and has an exception set, this exception is raised." -); - -static PyObject * -FutureObj_result(FutureObj *fut, PyObject *arg) -{ - if (fut->fut_state == STATE_CANCELLED) { - PyErr_SetString(asyncio_CancelledError, ""); - return NULL; - } - - if (fut->fut_state != STATE_FINISHED) { - PyErr_SetString(asyncio_InvalidStateError, "Result is not ready."); - return NULL; - } - - fut->fut_log_tb = 0; - if (fut->fut_exception != NULL) { - PyObject *type = NULL; - type = PyExceptionInstance_Class(fut->fut_exception); - PyErr_SetObject(type, fut->fut_exception); - return NULL; - } - - Py_INCREF(fut->fut_result); - return fut->fut_result; -} - -PyDoc_STRVAR(pydoc_exception, - "Return the exception that was set on this future.\n" - "\n" - "The exception (or None if no exception was set) is returned only if\n" - "the future is done. If the future has been cancelled, raises\n" - "CancelledError. If the future isn't done yet, raises\n" - "InvalidStateError." -); - -static PyObject * -FutureObj_exception(FutureObj *fut, PyObject *arg) -{ - if (fut->fut_state == STATE_CANCELLED) { - PyErr_SetString(asyncio_CancelledError, ""); - return NULL; - } - - if (fut->fut_state != STATE_FINISHED) { - PyErr_SetString(asyncio_InvalidStateError, "Result is not ready."); - return NULL; - } - - if (fut->fut_exception != NULL) { - fut->fut_log_tb = 0; - Py_INCREF(fut->fut_exception); - return fut->fut_exception; - } - - Py_RETURN_NONE; -} - -PyDoc_STRVAR(pydoc_set_result, - "Mark the future done and set its result.\n" - "\n" - "If the future is already done when this method is called, raises\n" - "InvalidStateError." -); - static PyObject * -FutureObj_set_result(FutureObj *fut, PyObject *res) +future_set_result(FutureObj *fut, PyObject *res) { if (fut->fut_state != STATE_PENDING) { PyErr_SetString(asyncio_InvalidStateError, "invalid state"); @@ -238,21 +187,14 @@ FutureObj_set_result(FutureObj *fut, PyObject *res) fut->fut_result = res; fut->fut_state = STATE_FINISHED; - if (_schedule_callbacks(fut) == -1) { + if (future_call_schedule_callbacks(fut) == -1) { return NULL; } Py_RETURN_NONE; } -PyDoc_STRVAR(pydoc_set_exception, - "Mark the future done and set an exception.\n" - "\n" - "If the future is already done when this method is called, raises\n" - "InvalidStateError." -); - static PyObject * -FutureObj_set_exception(FutureObj *fut, PyObject *exc) +future_set_exception(FutureObj *fut, PyObject *exc) { PyObject *exc_val = NULL; @@ -287,7 +229,7 @@ FutureObj_set_exception(FutureObj *fut, PyObject *exc) fut->fut_exception = exc_val; fut->fut_state = STATE_FINISHED; - if (_schedule_callbacks(fut) == -1) { + if (future_call_schedule_callbacks(fut) == -1) { return NULL; } @@ -295,16 +237,50 @@ FutureObj_set_exception(FutureObj *fut, PyObject *exc) Py_RETURN_NONE; } -PyDoc_STRVAR(pydoc_add_done_callback, - "Add a callback to be run when the future becomes done.\n" - "\n" - "The callback is called with a single argument - the future object. If\n" - "the future is already done when this is called, the callback is\n" - "scheduled with call_soon."; -); +static int +future_get_result(FutureObj *fut, PyObject **result) +{ + PyObject *exc; + + if (fut->fut_state == STATE_CANCELLED) { + exc = _PyObject_CallNoArg(asyncio_CancelledError); + if (exc == NULL) { + return -1; + } + *result = exc; + return 1; + } + + if (fut->fut_state != STATE_FINISHED) { + PyObject *msg = PyUnicode_FromString("Result is not ready."); + if (msg == NULL) { + return -1; + } + + exc = _PyObject_CallArg1(asyncio_InvalidStateError, msg); + Py_DECREF(msg); + if (exc == NULL) { + return -1; + } + + *result = exc; + return 1; + } + + fut->fut_log_tb = 0; + if (fut->fut_exception != NULL) { + Py_INCREF(fut->fut_exception); + *result = fut->fut_exception; + return 1; + } + + Py_INCREF(fut->fut_result); + *result = fut->fut_result; + return 0; +} static PyObject * -FutureObj_add_done_callback(FutureObj *fut, PyObject *arg) +future_add_done_callback(FutureObj *fut, PyObject *arg) { if (fut->fut_state != STATE_PENDING) { PyObject *handle = _PyObject_CallMethodId( @@ -326,19 +302,216 @@ FutureObj_add_done_callback(FutureObj *fut, PyObject *arg) Py_RETURN_NONE; } -PyDoc_STRVAR(pydoc_remove_done_callback, - "Remove all instances of a callback from the \"call when done\" list.\n" - "\n" - "Returns the number of callbacks removed." -); +static PyObject * +future_cancel(FutureObj *fut) +{ + if (fut->fut_state != STATE_PENDING) { + Py_RETURN_FALSE; + } + fut->fut_state = STATE_CANCELLED; + + if (future_call_schedule_callbacks(fut) == -1) { + return NULL; + } + + Py_RETURN_TRUE; +} + +/*[clinic input] +_asyncio.Future.__init__ + + * + loop: 'O' = NULL + +This class is *almost* compatible with concurrent.futures.Future. + + Differences: + + - result() and exception() do not take a timeout argument and + raise an exception when the future isn't done yet. + + - Callbacks registered with add_done_callback() are always called + via the event loop's call_soon_threadsafe(). + + - This class is not compatible with the wait() and as_completed() + methods in the concurrent.futures package. +[clinic start generated code]*/ + +static int +_asyncio_Future___init___impl(FutureObj *self, PyObject *loop) +/*[clinic end generated code: output=9ed75799eaccb5d6 input=8e1681f23605be2d]*/ + +{ + return future_init(self, loop); +} + +static int +FutureObj_clear(FutureObj *fut) +{ + Py_CLEAR(fut->fut_loop); + Py_CLEAR(fut->fut_callbacks); + Py_CLEAR(fut->fut_result); + Py_CLEAR(fut->fut_exception); + Py_CLEAR(fut->fut_source_tb); + Py_CLEAR(fut->dict); + return 0; +} + +static int +FutureObj_traverse(FutureObj *fut, visitproc visit, void *arg) +{ + Py_VISIT(fut->fut_loop); + Py_VISIT(fut->fut_callbacks); + Py_VISIT(fut->fut_result); + Py_VISIT(fut->fut_exception); + Py_VISIT(fut->fut_source_tb); + Py_VISIT(fut->dict); + return 0; +} + +/*[clinic input] +_asyncio.Future.result + +Return the result this future represents. + +If the future has been cancelled, raises CancelledError. If the +future's result isn't yet available, raises InvalidStateError. If +the future is done and has an exception set, this exception is raised. +[clinic start generated code]*/ + +static PyObject * +_asyncio_Future_result_impl(FutureObj *self) +/*[clinic end generated code: output=f35f940936a4b1e5 input=49ecf9cf5ec50dc5]*/ +{ + PyObject *result; + int res = future_get_result(self, &result); + + if (res == -1) { + return NULL; + } + + if (res == 0) { + return result; + } + + assert(res == 1); + + PyErr_SetObject(PyExceptionInstance_Class(result), result); + Py_DECREF(result); + return NULL; +} + +/*[clinic input] +_asyncio.Future.exception + +Return the exception that was set on this future. + +The exception (or None if no exception was set) is returned only if +the future is done. If the future has been cancelled, raises +CancelledError. If the future isn't done yet, raises +InvalidStateError. +[clinic start generated code]*/ + +static PyObject * +_asyncio_Future_exception_impl(FutureObj *self) +/*[clinic end generated code: output=88b20d4f855e0710 input=733547a70c841c68]*/ +{ + if (self->fut_state == STATE_CANCELLED) { + PyErr_SetString(asyncio_CancelledError, ""); + return NULL; + } + + if (self->fut_state != STATE_FINISHED) { + PyErr_SetString(asyncio_InvalidStateError, "Result is not ready."); + return NULL; + } + + if (self->fut_exception != NULL) { + self->fut_log_tb = 0; + Py_INCREF(self->fut_exception); + return self->fut_exception; + } + + Py_RETURN_NONE; +} + +/*[clinic input] +_asyncio.Future.set_result + + res: 'O' + / + +Mark the future done and set its result. + +If the future is already done when this method is called, raises +InvalidStateError. +[clinic start generated code]*/ + +static PyObject * +_asyncio_Future_set_result(FutureObj *self, PyObject *res) +/*[clinic end generated code: output=a620abfc2796bfb6 input=8619565e0503357e]*/ +{ + return future_set_result(self, res); +} + +/*[clinic input] +_asyncio.Future.set_exception + + exception: 'O' + / + +Mark the future done and set an exception. + +If the future is already done when this method is called, raises +InvalidStateError. +[clinic start generated code]*/ static PyObject * -FutureObj_remove_done_callback(FutureObj *fut, PyObject *arg) +_asyncio_Future_set_exception(FutureObj *self, PyObject *exception) +/*[clinic end generated code: output=f1c1b0cd321be360 input=1377dbe15e6ea186]*/ +{ + return future_set_exception(self, exception); +} + +/*[clinic input] +_asyncio.Future.add_done_callback + + fn: 'O' + / + +Add a callback to be run when the future becomes done. + +The callback is called with a single argument - the future object. If +the future is already done when this is called, the callback is +scheduled with call_soon. +[clinic start generated code]*/ + +static PyObject * +_asyncio_Future_add_done_callback(FutureObj *self, PyObject *fn) +/*[clinic end generated code: output=819e09629b2ec2b5 input=8cce187e32cec6a8]*/ +{ + return future_add_done_callback(self, fn); +} + +/*[clinic input] +_asyncio.Future.remove_done_callback + + fn: 'O' + / + +Remove all instances of a callback from the "call when done" list. + +Returns the number of callbacks removed. +[clinic start generated code]*/ + +static PyObject * +_asyncio_Future_remove_done_callback(FutureObj *self, PyObject *fn) +/*[clinic end generated code: output=5ab1fb52b24ef31f input=3fedb73e1409c31c]*/ { PyObject *newlist; Py_ssize_t len, i, j=0; - len = PyList_GET_SIZE(fut->fut_callbacks); + len = PyList_GET_SIZE(self->fut_callbacks); if (len == 0) { return PyLong_FromSsize_t(0); } @@ -350,9 +523,9 @@ FutureObj_remove_done_callback(FutureObj *fut, PyObject *arg) for (i = 0; i < len; i++) { int ret; - PyObject *item = PyList_GET_ITEM(fut->fut_callbacks, i); + PyObject *item = PyList_GET_ITEM(self->fut_callbacks, i); - if ((ret = PyObject_RichCompareBool(arg, item, Py_EQ)) < 0) { + if ((ret = PyObject_RichCompareBool(fn, item, Py_EQ)) < 0) { goto fail; } if (ret == 0) { @@ -365,7 +538,7 @@ FutureObj_remove_done_callback(FutureObj *fut, PyObject *arg) if (PyList_SetSlice(newlist, j, len, NULL) < 0) { goto fail; } - if (PyList_SetSlice(fut->fut_callbacks, 0, len, newlist) < 0) { + if (PyList_SetSlice(self->fut_callbacks, 0, len, newlist) < 0) { goto fail; } Py_DECREF(newlist); @@ -376,35 +549,34 @@ fail: return NULL; } -PyDoc_STRVAR(pydoc_cancel, - "Cancel the future and schedule callbacks.\n" - "\n" - "If the future is already done or cancelled, return False. Otherwise,\n" - "change the future's state to cancelled, schedule the callbacks and\n" - "return True." -); +/*[clinic input] +_asyncio.Future.cancel -static PyObject * -FutureObj_cancel(FutureObj *fut, PyObject *arg) -{ - if (fut->fut_state != STATE_PENDING) { - Py_RETURN_FALSE; - } - fut->fut_state = STATE_CANCELLED; +Cancel the future and schedule callbacks. - if (_schedule_callbacks(fut) == -1) { - return NULL; - } +If the future is already done or cancelled, return False. Otherwise, +change the future's state to cancelled, schedule the callbacks and +return True. +[clinic start generated code]*/ - Py_RETURN_TRUE; +static PyObject * +_asyncio_Future_cancel_impl(FutureObj *self) +/*[clinic end generated code: output=e45b932ba8bd68a1 input=515709a127995109]*/ +{ + return future_cancel(self); } -PyDoc_STRVAR(pydoc_cancelled, "Return True if the future was cancelled."); +/*[clinic input] +_asyncio.Future.cancelled + +Return True if the future was cancelled. +[clinic start generated code]*/ static PyObject * -FutureObj_cancelled(FutureObj *fut, PyObject *arg) +_asyncio_Future_cancelled_impl(FutureObj *self) +/*[clinic end generated code: output=145197ced586357d input=943ab8b7b7b17e45]*/ { - if (fut->fut_state == STATE_CANCELLED) { + if (self->fut_state == STATE_CANCELLED) { Py_RETURN_TRUE; } else { @@ -412,17 +584,20 @@ FutureObj_cancelled(FutureObj *fut, PyObject *arg) } } -PyDoc_STRVAR(pydoc_done, - "Return True if the future is done.\n" - "\n" - "Done means either that a result / exception are available, or that the\n" - "future was cancelled." -); +/*[clinic input] +_asyncio.Future.done + +Return True if the future is done. + +Done means either that a result / exception are available, or that the +future was cancelled. +[clinic start generated code]*/ static PyObject * -FutureObj_done(FutureObj *fut, PyObject *arg) +_asyncio_Future_done_impl(FutureObj *self) +/*[clinic end generated code: output=244c5ac351145096 input=28d7b23fdb65d2ac]*/ { - if (fut->fut_state == STATE_PENDING) { + if (self->fut_state == STATE_PENDING) { Py_RETURN_FALSE; } else { @@ -538,13 +713,31 @@ FutureObj_get_state(FutureObj *fut) return ret; } -static PyObject* -FutureObj__repr_info(FutureObj *fut) +/*[clinic input] +_asyncio.Future._repr_info +[clinic start generated code]*/ + +static PyObject * +_asyncio_Future__repr_info_impl(FutureObj *self) +/*[clinic end generated code: output=fa69e901bd176cfb input=f21504d8e2ae1ca2]*/ +{ + return PyObject_CallFunctionObjArgs( + asyncio_future_repr_info_func, self, NULL); +} + +/*[clinic input] +_asyncio.Future._schedule_callbacks +[clinic start generated code]*/ + +static PyObject * +_asyncio_Future__schedule_callbacks_impl(FutureObj *self) +/*[clinic end generated code: output=5e8958d89ea1c5dc input=4f5f295f263f4a88]*/ { - if (asyncio_repr_info_func == NULL) { - return PyList_New(0); + int ret = future_schedule_callbacks(self); + if (ret == -1) { + return NULL; } - return PyObject_CallFunctionObjArgs(asyncio_repr_info_func, fut, NULL); + Py_RETURN_NONE; } static PyObject * @@ -661,43 +854,39 @@ finally: static PyAsyncMethods FutureType_as_async = { - (unaryfunc)new_future_iter, /* am_await */ + (unaryfunc)future_new_iter, /* am_await */ 0, /* am_aiter */ 0 /* am_anext */ }; static PyMethodDef FutureType_methods[] = { - {"_repr_info", (PyCFunction)FutureObj__repr_info, METH_NOARGS, NULL}, - {"add_done_callback", - (PyCFunction)FutureObj_add_done_callback, - METH_O, pydoc_add_done_callback}, - {"remove_done_callback", - (PyCFunction)FutureObj_remove_done_callback, - METH_O, pydoc_remove_done_callback}, - {"set_result", - (PyCFunction)FutureObj_set_result, METH_O, pydoc_set_result}, - {"set_exception", - (PyCFunction)FutureObj_set_exception, METH_O, pydoc_set_exception}, - {"cancel", (PyCFunction)FutureObj_cancel, METH_NOARGS, pydoc_cancel}, - {"cancelled", - (PyCFunction)FutureObj_cancelled, METH_NOARGS, pydoc_cancelled}, - {"done", (PyCFunction)FutureObj_done, METH_NOARGS, pydoc_done}, - {"result", (PyCFunction)FutureObj_result, METH_NOARGS, pydoc_result}, - {"exception", - (PyCFunction)FutureObj_exception, METH_NOARGS, pydoc_exception}, + _ASYNCIO_FUTURE_RESULT_METHODDEF + _ASYNCIO_FUTURE_EXCEPTION_METHODDEF + _ASYNCIO_FUTURE_SET_RESULT_METHODDEF + _ASYNCIO_FUTURE_SET_EXCEPTION_METHODDEF + _ASYNCIO_FUTURE_ADD_DONE_CALLBACK_METHODDEF + _ASYNCIO_FUTURE_REMOVE_DONE_CALLBACK_METHODDEF + _ASYNCIO_FUTURE_CANCEL_METHODDEF + _ASYNCIO_FUTURE_CANCELLED_METHODDEF + _ASYNCIO_FUTURE_DONE_METHODDEF + _ASYNCIO_FUTURE__REPR_INFO_METHODDEF + _ASYNCIO_FUTURE__SCHEDULE_CALLBACKS_METHODDEF {NULL, NULL} /* Sentinel */ }; -static PyGetSetDef FutureType_getsetlist[] = { - {"_state", (getter)FutureObj_get_state, NULL, NULL}, - {"_asyncio_future_blocking", (getter)FutureObj_get_blocking, - (setter)FutureObj_set_blocking, NULL}, - {"_loop", (getter)FutureObj_get_loop, NULL, NULL}, - {"_callbacks", (getter)FutureObj_get_callbacks, NULL, NULL}, - {"_result", (getter)FutureObj_get_result, NULL, NULL}, - {"_exception", (getter)FutureObj_get_exception, NULL, NULL}, - {"_log_traceback", (getter)FutureObj_get_log_traceback, NULL, NULL}, +#define FUTURE_COMMON_GETSETLIST \ + {"_state", (getter)FutureObj_get_state, NULL, NULL}, \ + {"_asyncio_future_blocking", (getter)FutureObj_get_blocking, \ + (setter)FutureObj_set_blocking, NULL}, \ + {"_loop", (getter)FutureObj_get_loop, NULL, NULL}, \ + {"_callbacks", (getter)FutureObj_get_callbacks, NULL, NULL}, \ + {"_result", (getter)FutureObj_get_result, NULL, NULL}, \ + {"_exception", (getter)FutureObj_get_exception, NULL, NULL}, \ + {"_log_traceback", (getter)FutureObj_get_log_traceback, NULL, NULL}, \ {"_source_traceback", (getter)FutureObj_get_source_traceback, NULL, NULL}, + +static PyGetSetDef FutureType_getsetlist[] = { + FUTURE_COMMON_GETSETLIST {NULL} /* Sentinel */ }; @@ -712,25 +901,46 @@ static PyTypeObject FutureType = { .tp_repr = (reprfunc)FutureObj_repr, .tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC | Py_TPFLAGS_BASETYPE | Py_TPFLAGS_HAVE_FINALIZE, - .tp_doc = "Fast asyncio.Future implementation.", + .tp_doc = _asyncio_Future___init____doc__, .tp_traverse = (traverseproc)FutureObj_traverse, .tp_clear = (inquiry)FutureObj_clear, .tp_weaklistoffset = offsetof(FutureObj, fut_weakreflist), - .tp_iter = (getiterfunc)new_future_iter, + .tp_iter = (getiterfunc)future_new_iter, .tp_methods = FutureType_methods, .tp_getset = FutureType_getsetlist, .tp_dictoffset = offsetof(FutureObj, dict), - .tp_init = (initproc)FutureObj_init, + .tp_init = (initproc)_asyncio_Future___init__, .tp_new = PyType_GenericNew, .tp_finalize = (destructor)FutureObj_finalize, }; +#define Future_CheckExact(obj) (Py_TYPE(obj) == &FutureType) + +static inline int +future_call_schedule_callbacks(FutureObj *fut) +{ + if (Future_CheckExact(fut)) { + return future_schedule_callbacks(fut); + } + else { + /* `fut` is a subclass of Future */ + PyObject *ret = _PyObject_CallMethodId( + (PyObject*)fut, &PyId__schedule_callbacks, NULL); + if (ret == NULL) { + return -1; + } + + Py_DECREF(ret); + return 0; + } +} + static void FutureObj_dealloc(PyObject *self) { FutureObj *fut = (FutureObj *)self; - if (Py_TYPE(fut) == &FutureType) { + if (Future_CheckExact(fut)) { /* When fut is subclass of Future, finalizer is called from * subtype_dealloc. */ @@ -744,7 +954,7 @@ FutureObj_dealloc(PyObject *self) PyObject_ClearWeakRefs(self); } - FutureObj_clear(fut); + (void)FutureObj_clear(fut); Py_TYPE(fut)->tp_free(fut); } @@ -759,7 +969,7 @@ typedef struct { static void FutureIter_dealloc(futureiterobject *it) { - _PyObject_GC_UNTRACK(it); + PyObject_GC_UnTrack(it); Py_XDECREF(it->future); PyObject_GC_Del(it); } @@ -785,7 +995,7 @@ FutureIter_iternext(futureiterobject *it) return NULL; } - res = FutureObj_result(fut, NULL); + res = _asyncio_Future_result_impl(fut); if (res != NULL) { /* The result of the Future is not an exception. @@ -884,37 +1094,19 @@ static PyMethodDef FutureIter_methods[] = { static PyTypeObject FutureIterType = { PyVarObject_HEAD_INIT(0, 0) "_asyncio.FutureIter", - sizeof(futureiterobject), /* tp_basicsize */ - 0, /* tp_itemsize */ - (destructor)FutureIter_dealloc, /* tp_dealloc */ - 0, /* tp_print */ - 0, /* tp_getattr */ - 0, /* tp_setattr */ - 0, /* tp_as_async */ - 0, /* tp_repr */ - 0, /* tp_as_number */ - 0, /* tp_as_sequence */ - 0, /* tp_as_mapping */ - 0, /* tp_hash */ - 0, /* tp_call */ - 0, /* tp_str */ - PyObject_GenericGetAttr, /* tp_getattro */ - 0, /* tp_setattro */ - 0, /* tp_as_buffer */ - Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC, /* tp_flags */ - 0, /* tp_doc */ - (traverseproc)FutureIter_traverse, /* tp_traverse */ - 0, /* tp_clear */ - 0, /* tp_richcompare */ - 0, /* tp_weaklistoffset */ - PyObject_SelfIter, /* tp_iter */ - (iternextfunc)FutureIter_iternext, /* tp_iternext */ - FutureIter_methods, /* tp_methods */ - 0, /* tp_members */ + .tp_basicsize = sizeof(futureiterobject), + .tp_itemsize = 0, + .tp_dealloc = (destructor)FutureIter_dealloc, + .tp_getattro = PyObject_GenericGetAttr, + .tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC, + .tp_traverse = (traverseproc)FutureIter_traverse, + .tp_iter = PyObject_SelfIter, + .tp_iternext = (iternextfunc)FutureIter_iternext, + .tp_methods = FutureIter_methods, }; static PyObject * -new_future_iter(PyObject *fut) +future_new_iter(PyObject *fut) { futureiterobject *it; @@ -932,68 +1124,1283 @@ new_future_iter(PyObject *fut) return (PyObject*)it; } -/*********************** Module **************************/ + +/*********************** Task **************************/ + + +/*[clinic input] +class _asyncio.Task "TaskObj *" "&Task_Type" +[clinic start generated code]*/ +/*[clinic end generated code: output=da39a3ee5e6b4b0d input=719dcef0fcc03b37]*/ + +static int task_call_step_soon(TaskObj *, PyObject *); +static inline PyObject * task_call_wakeup(TaskObj *, PyObject *); +static inline PyObject * task_call_step(TaskObj *, PyObject *); +static PyObject * task_wakeup(TaskObj *, PyObject *); +static PyObject * task_step(TaskObj *, PyObject *); + +/* ----- Task._step wrapper */ static int -init_module(void) +TaskSendMethWrapper_clear(TaskSendMethWrapper *o) { - PyObject *module = NULL; + Py_CLEAR(o->sw_task); + Py_CLEAR(o->sw_arg); + return 0; +} + +static void +TaskSendMethWrapper_dealloc(TaskSendMethWrapper *o) +{ + PyObject_GC_UnTrack(o); + (void)TaskSendMethWrapper_clear(o); + Py_TYPE(o)->tp_free(o); +} + +static PyObject * +TaskSendMethWrapper_call(TaskSendMethWrapper *o, + PyObject *args, PyObject *kwds) +{ + return task_call_step(o->sw_task, o->sw_arg); +} + +static int +TaskSendMethWrapper_traverse(TaskSendMethWrapper *o, + visitproc visit, void *arg) +{ + Py_VISIT(o->sw_task); + Py_VISIT(o->sw_arg); + return 0; +} + +static PyObject * +TaskSendMethWrapper_get___self__(TaskSendMethWrapper *o) +{ + if (o->sw_task) { + Py_INCREF(o->sw_task); + return (PyObject*)o->sw_task; + } + Py_RETURN_NONE; +} + +static PyGetSetDef TaskSendMethWrapper_getsetlist[] = { + {"__self__", (getter)TaskSendMethWrapper_get___self__, NULL, NULL}, + {NULL} /* Sentinel */ +}; + +PyTypeObject TaskSendMethWrapper_Type = { + PyVarObject_HEAD_INIT(&PyType_Type, 0) + "TaskSendMethWrapper", + .tp_basicsize = sizeof(TaskSendMethWrapper), + .tp_itemsize = 0, + .tp_getset = TaskSendMethWrapper_getsetlist, + .tp_dealloc = (destructor)TaskSendMethWrapper_dealloc, + .tp_call = (ternaryfunc)TaskSendMethWrapper_call, + .tp_getattro = PyObject_GenericGetAttr, + .tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC, + .tp_traverse = (traverseproc)TaskSendMethWrapper_traverse, + .tp_clear = (inquiry)TaskSendMethWrapper_clear, +}; + +static PyObject * +TaskSendMethWrapper_new(TaskObj *task, PyObject *arg) +{ + TaskSendMethWrapper *o; + o = PyObject_GC_New(TaskSendMethWrapper, &TaskSendMethWrapper_Type); + if (o == NULL) { + return NULL; + } - module = PyImport_ImportModule("traceback"); - if (module == NULL) { + Py_INCREF(task); + o->sw_task = task; + + Py_XINCREF(arg); + o->sw_arg = arg; + + PyObject_GC_Track(o); + return (PyObject*) o; +} + +/* ----- Task._wakeup wrapper */ + +static PyObject * +TaskWakeupMethWrapper_call(TaskWakeupMethWrapper *o, + PyObject *args, PyObject *kwds) +{ + PyObject *fut; + + if (!PyArg_ParseTuple(args, "O|", &fut)) { + return NULL; + } + + return task_call_wakeup(o->ww_task, fut); +} + +static int +TaskWakeupMethWrapper_clear(TaskWakeupMethWrapper *o) +{ + Py_CLEAR(o->ww_task); + return 0; +} + +static int +TaskWakeupMethWrapper_traverse(TaskWakeupMethWrapper *o, + visitproc visit, void *arg) +{ + Py_VISIT(o->ww_task); + return 0; +} + +static void +TaskWakeupMethWrapper_dealloc(TaskWakeupMethWrapper *o) +{ + PyObject_GC_UnTrack(o); + (void)TaskWakeupMethWrapper_clear(o); + Py_TYPE(o)->tp_free(o); +} + +PyTypeObject TaskWakeupMethWrapper_Type = { + PyVarObject_HEAD_INIT(&PyType_Type, 0) + "TaskWakeupMethWrapper", + .tp_basicsize = sizeof(TaskWakeupMethWrapper), + .tp_itemsize = 0, + .tp_dealloc = (destructor)TaskWakeupMethWrapper_dealloc, + .tp_call = (ternaryfunc)TaskWakeupMethWrapper_call, + .tp_getattro = PyObject_GenericGetAttr, + .tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC, + .tp_traverse = (traverseproc)TaskWakeupMethWrapper_traverse, + .tp_clear = (inquiry)TaskWakeupMethWrapper_clear, +}; + +static PyObject * +TaskWakeupMethWrapper_new(TaskObj *task) +{ + TaskWakeupMethWrapper *o; + o = PyObject_GC_New(TaskWakeupMethWrapper, &TaskWakeupMethWrapper_Type); + if (o == NULL) { + return NULL; + } + + Py_INCREF(task); + o->ww_task = task; + + PyObject_GC_Track(o); + return (PyObject*) o; +} + +/* ----- Task */ + +/*[clinic input] +_asyncio.Task.__init__ + + coro: 'O' + * + loop: 'O' = NULL + +A coroutine wrapped in a Future. +[clinic start generated code]*/ + +static int +_asyncio_Task___init___impl(TaskObj *self, PyObject *coro, PyObject *loop) +/*[clinic end generated code: output=9f24774c2287fc2f input=71d8d28c201a18cd]*/ +{ + PyObject *res; + _Py_IDENTIFIER(add); + + if (future_init((FutureObj*)self, loop)) { return -1; } - // new reference - traceback_extract_stack = PyObject_GetAttrString(module, "extract_stack"); - if (traceback_extract_stack == NULL) { - goto fail; + + self->task_fut_waiter = NULL; + self->task_must_cancel = 0; + self->task_log_destroy_pending = 1; + + Py_INCREF(coro); + self->task_coro = coro; + + if (task_call_step_soon(self, NULL)) { + return -1; + } + + res = _PyObject_CallMethodId(all_tasks, &PyId_add, "O", self, NULL); + if (res == NULL) { + return -1; + } + Py_DECREF(res); + + return 0; +} + +static int +TaskObj_clear(TaskObj *task) +{ + (void)FutureObj_clear((FutureObj*) task); + Py_CLEAR(task->task_coro); + Py_CLEAR(task->task_fut_waiter); + return 0; +} + +static int +TaskObj_traverse(TaskObj *task, visitproc visit, void *arg) +{ + Py_VISIT(task->task_coro); + Py_VISIT(task->task_fut_waiter); + (void)FutureObj_traverse((FutureObj*) task, visit, arg); + return 0; +} + +static PyObject * +TaskObj_get_log_destroy_pending(TaskObj *task) +{ + if (task->task_log_destroy_pending) { + Py_RETURN_TRUE; + } + else { + Py_RETURN_FALSE; + } +} + +static int +TaskObj_set_log_destroy_pending(TaskObj *task, PyObject *val) +{ + int is_true = PyObject_IsTrue(val); + if (is_true < 0) { + return -1; + } + task->task_log_destroy_pending = is_true; + return 0; +} + +static PyObject * +TaskObj_get_must_cancel(TaskObj *task) +{ + if (task->task_must_cancel) { + Py_RETURN_TRUE; + } + else { + Py_RETURN_FALSE; + } +} + +static PyObject * +TaskObj_get_coro(TaskObj *task) +{ + if (task->task_coro) { + Py_INCREF(task->task_coro); + return task->task_coro; + } + + Py_RETURN_NONE; +} + +static PyObject * +TaskObj_get_fut_waiter(TaskObj *task) +{ + if (task->task_fut_waiter) { + Py_INCREF(task->task_fut_waiter); + return task->task_fut_waiter; + } + + Py_RETURN_NONE; +} + +/*[clinic input] +@classmethod +_asyncio.Task.current_task + + loop: 'O' = NULL + +Return the currently running task in an event loop or None. + +By default the current task for the current event loop is returned. + +None is returned when called not in the context of a Task. +[clinic start generated code]*/ + +static PyObject * +_asyncio_Task_current_task_impl(PyTypeObject *type, PyObject *loop) +/*[clinic end generated code: output=99fbe7332c516e03 input=cd784537f02cf833]*/ +{ + PyObject *res; + + if (loop == NULL) { + loop = PyObject_CallObject(asyncio_get_event_loop, NULL); + if (loop == NULL) { + return NULL; + } + + res = PyDict_GetItem((PyObject*)current_tasks, loop); + Py_DECREF(loop); + } + else { + res = PyDict_GetItem((PyObject*)current_tasks, loop); } - Py_DECREF(module); - module = PyImport_ImportModule("asyncio.events"); - if (module == NULL) { + if (res == NULL) { + Py_RETURN_NONE; + } + else { + Py_INCREF(res); + return res; + } +} + +static PyObject * +task_all_tasks(PyObject *loop) +{ + PyObject *task; + PyObject *task_loop; + PyObject *set; + PyObject *iter; + + assert(loop != NULL); + + set = PySet_New(NULL); + if (set == NULL) { + return NULL; + } + + iter = PyObject_GetIter(all_tasks); + if (iter == NULL) { goto fail; } - asyncio_get_event_loop = PyObject_GetAttrString(module, "get_event_loop"); - if (asyncio_get_event_loop == NULL) { + + while ((task = PyIter_Next(iter))) { + task_loop = PyObject_GetAttrString(task, "_loop"); + if (task_loop == NULL) { + Py_DECREF(task); + goto fail; + } + if (task_loop == loop) { + if (PySet_Add(set, task) == -1) { + Py_DECREF(task_loop); + Py_DECREF(task); + goto fail; + } + } + Py_DECREF(task_loop); + Py_DECREF(task); + } + + Py_DECREF(iter); + return set; + +fail: + Py_XDECREF(set); + Py_XDECREF(iter); + return NULL; +} + +/*[clinic input] +@classmethod +_asyncio.Task.all_tasks + + loop: 'O' = NULL + +Return a set of all tasks for an event loop. + +By default all tasks for the current event loop are returned. +[clinic start generated code]*/ + +static PyObject * +_asyncio_Task_all_tasks_impl(PyTypeObject *type, PyObject *loop) +/*[clinic end generated code: output=11f9b20749ccca5d input=cd64aa5f88bd5c49]*/ +{ + PyObject *res; + + if (loop == NULL) { + loop = PyObject_CallObject(asyncio_get_event_loop, NULL); + if (loop == NULL) { + return NULL; + } + + res = task_all_tasks(loop); + Py_DECREF(loop); + } + else { + res = task_all_tasks(loop); + } + + return res; +} + +/*[clinic input] +_asyncio.Task._repr_info +[clinic start generated code]*/ + +static PyObject * +_asyncio_Task__repr_info_impl(TaskObj *self) +/*[clinic end generated code: output=6a490eb66d5ba34b input=3c6d051ed3ddec8b]*/ +{ + return PyObject_CallFunctionObjArgs( + asyncio_task_repr_info_func, self, NULL); +} + +/*[clinic input] +_asyncio.Task.cancel + +Request that this task cancel itself. + +This arranges for a CancelledError to be thrown into the +wrapped coroutine on the next cycle through the event loop. +The coroutine then has a chance to clean up or even deny +the request using try/except/finally. + +Unlike Future.cancel, this does not guarantee that the +task will be cancelled: the exception might be caught and +acted upon, delaying cancellation of the task or preventing +cancellation completely. The task may also return a value or +raise a different exception. + +Immediately after this method is called, Task.cancelled() will +not return True (unless the task was already cancelled). A +task will be marked as cancelled when the wrapped coroutine +terminates with a CancelledError exception (even if cancel() +was not called). +[clinic start generated code]*/ + +static PyObject * +_asyncio_Task_cancel_impl(TaskObj *self) +/*[clinic end generated code: output=6bfc0479da9d5757 input=13f9bf496695cb52]*/ +{ + if (self->task_state != STATE_PENDING) { + Py_RETURN_FALSE; + } + + if (self->task_fut_waiter) { + PyObject *res; + int is_true; + + res = _PyObject_CallMethodId( + self->task_fut_waiter, &PyId_cancel, NULL); + if (res == NULL) { + return NULL; + } + + is_true = PyObject_IsTrue(res); + Py_DECREF(res); + if (is_true < 0) { + return NULL; + } + + if (is_true) { + Py_RETURN_TRUE; + } + } + + self->task_must_cancel = 1; + Py_RETURN_TRUE; +} + +/*[clinic input] +_asyncio.Task.get_stack + + * + limit: 'O' = None + +Return the list of stack frames for this task's coroutine. + +If the coroutine is not done, this returns the stack where it is +suspended. If the coroutine has completed successfully or was +cancelled, this returns an empty list. If the coroutine was +terminated by an exception, this returns the list of traceback +frames. + +The frames are always ordered from oldest to newest. + +The optional limit gives the maximum number of frames to +return; by default all available frames are returned. Its +meaning differs depending on whether a stack or a traceback is +returned: the newest frames of a stack are returned, but the +oldest frames of a traceback are returned. (This matches the +behavior of the traceback module.) + +For reasons beyond our control, only one stack frame is +returned for a suspended coroutine. +[clinic start generated code]*/ + +static PyObject * +_asyncio_Task_get_stack_impl(TaskObj *self, PyObject *limit) +/*[clinic end generated code: output=c9aeeeebd1e18118 input=b1920230a766d17a]*/ +{ + return PyObject_CallFunctionObjArgs( + asyncio_task_get_stack_func, self, limit, NULL); +} + +/*[clinic input] +_asyncio.Task.print_stack + + * + limit: 'O' = None + file: 'O' = None + +Print the stack or traceback for this task's coroutine. + +This produces output similar to that of the traceback module, +for the frames retrieved by get_stack(). The limit argument +is passed to get_stack(). The file argument is an I/O stream +to which the output is written; by default output is written +to sys.stderr. +[clinic start generated code]*/ + +static PyObject * +_asyncio_Task_print_stack_impl(TaskObj *self, PyObject *limit, + PyObject *file) +/*[clinic end generated code: output=7339e10314cd3f4d input=19f1e99ab5400bc3]*/ +{ + return PyObject_CallFunctionObjArgs( + asyncio_task_print_stack_func, self, limit, file, NULL); +} + +/*[clinic input] +_asyncio.Task._step + + exc: 'O' = NULL +[clinic start generated code]*/ + +static PyObject * +_asyncio_Task__step_impl(TaskObj *self, PyObject *exc) +/*[clinic end generated code: output=7ed23f0cefd5ae42 input=ada4b2324e5370af]*/ +{ + return task_step(self, exc == Py_None ? NULL : exc); +} + +/*[clinic input] +_asyncio.Task._wakeup + + fut: 'O' +[clinic start generated code]*/ + +static PyObject * +_asyncio_Task__wakeup_impl(TaskObj *self, PyObject *fut) +/*[clinic end generated code: output=75cb341c760fd071 input=11ee4918a5bdbf21]*/ +{ + return task_wakeup(self, fut); +} + +static void +TaskObj_finalize(TaskObj *task) +{ + _Py_IDENTIFIER(call_exception_handler); + _Py_IDENTIFIER(task); + _Py_IDENTIFIER(message); + _Py_IDENTIFIER(source_traceback); + + PyObject *message = NULL; + PyObject *context = NULL; + PyObject *func = NULL; + PyObject *res = NULL; + + PyObject *error_type, *error_value, *error_traceback; + + if (task->task_state != STATE_PENDING || !task->task_log_destroy_pending) { + goto done; + } + + /* Save the current exception, if any. */ + PyErr_Fetch(&error_type, &error_value, &error_traceback); + + context = PyDict_New(); + if (context == NULL) { + goto finally; + } + + message = PyUnicode_FromString("Task was destroyed but it is pending!"); + if (message == NULL) { + goto finally; + } + + if (_PyDict_SetItemId(context, &PyId_message, message) < 0 || + _PyDict_SetItemId(context, &PyId_task, (PyObject*)task) < 0) + { + goto finally; + } + + if (task->task_source_tb != NULL) { + if (_PyDict_SetItemId(context, &PyId_source_traceback, + task->task_source_tb) < 0) + { + goto finally; + } + } + + func = _PyObject_GetAttrId(task->task_loop, &PyId_call_exception_handler); + if (func != NULL) { + res = _PyObject_CallArg1(func, context); + if (res == NULL) { + PyErr_WriteUnraisable(func); + } + } + +finally: + Py_CLEAR(context); + Py_CLEAR(message); + Py_CLEAR(func); + Py_CLEAR(res); + + /* Restore the saved exception. */ + PyErr_Restore(error_type, error_value, error_traceback); + +done: + FutureObj_finalize((FutureObj*)task); +} + +static void TaskObj_dealloc(PyObject *); /* Needs Task_CheckExact */ + +static PyMethodDef TaskType_methods[] = { + _ASYNCIO_FUTURE_RESULT_METHODDEF + _ASYNCIO_FUTURE_EXCEPTION_METHODDEF + _ASYNCIO_FUTURE_SET_RESULT_METHODDEF + _ASYNCIO_FUTURE_SET_EXCEPTION_METHODDEF + _ASYNCIO_FUTURE_ADD_DONE_CALLBACK_METHODDEF + _ASYNCIO_FUTURE_REMOVE_DONE_CALLBACK_METHODDEF + _ASYNCIO_FUTURE_CANCELLED_METHODDEF + _ASYNCIO_FUTURE_DONE_METHODDEF + _ASYNCIO_TASK_CURRENT_TASK_METHODDEF + _ASYNCIO_TASK_ALL_TASKS_METHODDEF + _ASYNCIO_TASK_CANCEL_METHODDEF + _ASYNCIO_TASK_GET_STACK_METHODDEF + _ASYNCIO_TASK_PRINT_STACK_METHODDEF + _ASYNCIO_TASK__WAKEUP_METHODDEF + _ASYNCIO_TASK__STEP_METHODDEF + _ASYNCIO_TASK__REPR_INFO_METHODDEF + {NULL, NULL} /* Sentinel */ +}; + +static PyGetSetDef TaskType_getsetlist[] = { + FUTURE_COMMON_GETSETLIST + {"_log_destroy_pending", (getter)TaskObj_get_log_destroy_pending, + (setter)TaskObj_set_log_destroy_pending, NULL}, + {"_must_cancel", (getter)TaskObj_get_must_cancel, NULL, NULL}, + {"_coro", (getter)TaskObj_get_coro, NULL, NULL}, + {"_fut_waiter", (getter)TaskObj_get_fut_waiter, NULL, NULL}, + {NULL} /* Sentinel */ +}; + +static PyTypeObject TaskType = { + PyVarObject_HEAD_INIT(0, 0) + "_asyncio.Task", + sizeof(TaskObj), /* tp_basicsize */ + .tp_base = &FutureType, + .tp_dealloc = TaskObj_dealloc, + .tp_as_async = &FutureType_as_async, + .tp_repr = (reprfunc)FutureObj_repr, + .tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC | Py_TPFLAGS_BASETYPE + | Py_TPFLAGS_HAVE_FINALIZE, + .tp_doc = _asyncio_Task___init____doc__, + .tp_traverse = (traverseproc)TaskObj_traverse, + .tp_clear = (inquiry)TaskObj_clear, + .tp_weaklistoffset = offsetof(TaskObj, task_weakreflist), + .tp_iter = (getiterfunc)future_new_iter, + .tp_methods = TaskType_methods, + .tp_getset = TaskType_getsetlist, + .tp_dictoffset = offsetof(TaskObj, dict), + .tp_init = (initproc)_asyncio_Task___init__, + .tp_new = PyType_GenericNew, + .tp_finalize = (destructor)TaskObj_finalize, +}; + +#define Task_CheckExact(obj) (Py_TYPE(obj) == &TaskType) + +static void +TaskObj_dealloc(PyObject *self) +{ + TaskObj *task = (TaskObj *)self; + + if (Task_CheckExact(self)) { + /* When fut is subclass of Task, finalizer is called from + * subtype_dealloc. + */ + if (PyObject_CallFinalizerFromDealloc(self) < 0) { + // resurrected. + return; + } + } + + if (task->task_weakreflist != NULL) { + PyObject_ClearWeakRefs(self); + } + + (void)TaskObj_clear(task); + Py_TYPE(task)->tp_free(task); +} + +static inline PyObject * +task_call_wakeup(TaskObj *task, PyObject *fut) +{ + if (Task_CheckExact(task)) { + return task_wakeup(task, fut); + } + else { + /* `task` is a subclass of Task */ + return _PyObject_CallMethodId( + (PyObject*)task, &PyId__wakeup, "O", fut, NULL); + } +} + +static inline PyObject * +task_call_step(TaskObj *task, PyObject *arg) +{ + if (Task_CheckExact(task)) { + return task_step(task, arg); + } + else { + /* `task` is a subclass of Task */ + if (arg == NULL) { + arg = Py_None; + } + return _PyObject_CallMethodId( + (PyObject*)task, &PyId__step, "O", arg, NULL); + } +} + +static int +task_call_step_soon(TaskObj *task, PyObject *arg) +{ + PyObject *handle; + + PyObject *cb = TaskSendMethWrapper_new(task, arg); + if (cb == NULL) { + return -1; + } + + handle = _PyObject_CallMethodId( + task->task_loop, &PyId_call_soon, "O", cb, NULL); + Py_DECREF(cb); + if (handle == NULL) { + return -1; + } + + Py_DECREF(handle); + return 0; +} + +static PyObject * +task_set_error_soon(TaskObj *task, PyObject *et, const char *format, ...) +{ + PyObject* msg; + + va_list vargs; +#ifdef HAVE_STDARG_PROTOTYPES + va_start(vargs, format); +#else + va_start(vargs); +#endif + msg = PyUnicode_FromFormatV(format, vargs); + va_end(vargs); + + if (msg == NULL) { + return NULL; + } + + PyObject *e = PyObject_CallFunctionObjArgs(et, msg, NULL); + Py_DECREF(msg); + if (e == NULL) { + return NULL; + } + + if (task_call_step_soon(task, e) == -1) { + Py_DECREF(e); + return NULL; + } + + Py_DECREF(e); + Py_RETURN_NONE; +} + +static PyObject * +task_step_impl(TaskObj *task, PyObject *exc) +{ + int res; + int clear_exc = 0; + PyObject *result = NULL; + PyObject *coro = task->task_coro; + PyObject *o; + + if (task->task_state != STATE_PENDING) { + PyErr_Format(PyExc_AssertionError, + "_step(): already done: %R %R", + task, + exc ? exc : Py_None); goto fail; } - Py_DECREF(module); - module = PyImport_ImportModule("asyncio.futures"); - if (module == NULL) { + if (task->task_must_cancel) { + assert(exc != Py_None); + + if (exc) { + /* Check if exc is a CancelledError */ + res = PyObject_IsInstance(exc, asyncio_CancelledError); + if (res == -1) { + /* An error occurred, abort */ + goto fail; + } + if (res == 0) { + /* exc is not CancelledError; reset it to NULL */ + exc = NULL; + } + } + + if (!exc) { + /* exc was not a CancelledError */ + exc = PyObject_CallFunctionObjArgs(asyncio_CancelledError, NULL); + if (!exc) { + goto fail; + } + clear_exc = 1; + } + + task->task_must_cancel = 0; + } + + Py_CLEAR(task->task_fut_waiter); + + if (exc == NULL) { + if (PyGen_CheckExact(coro) || PyCoro_CheckExact(coro)) { + result = _PyGen_Send((PyGenObject*)coro, Py_None); + } + else { + result = _PyObject_CallMethodIdObjArgs( + coro, &PyId_send, Py_None, NULL); + } + } + else { + result = _PyObject_CallMethodIdObjArgs( + coro, &PyId_throw, exc, NULL); + if (clear_exc) { + /* We created 'exc' during this call */ + Py_CLEAR(exc); + } + } + + if (result == NULL) { + PyObject *et, *ev, *tb; + + if (_PyGen_FetchStopIterationValue(&o) == 0) { + /* The error is StopIteration and that means that + the underlying coroutine has resolved */ + PyObject *res = future_set_result((FutureObj*)task, o); + Py_DECREF(o); + if (res == NULL) { + return NULL; + } + Py_DECREF(res); + Py_RETURN_NONE; + } + + if (PyErr_ExceptionMatches(asyncio_CancelledError)) { + /* CancelledError */ + PyErr_Clear(); + return future_cancel((FutureObj*)task); + } + + /* Some other exception; pop it and call Task.set_exception() */ + PyErr_Fetch(&et, &ev, &tb); + assert(et); + if (!ev || !PyObject_TypeCheck(ev, (PyTypeObject *) et)) { + PyErr_NormalizeException(&et, &ev, &tb); + } + o = future_set_exception((FutureObj*)task, ev); + if (!o) { + /* An exception in Task.set_exception() */ + Py_XDECREF(et); + Py_XDECREF(tb); + Py_XDECREF(ev); + goto fail; + } + assert(o == Py_None); + Py_CLEAR(o); + + if (!PyErr_GivenExceptionMatches(et, PyExc_Exception)) { + /* We've got a BaseException; re-raise it */ + PyErr_Restore(et, ev, tb); + goto fail; + } + + Py_XDECREF(et); + Py_XDECREF(tb); + Py_XDECREF(ev); + + Py_RETURN_NONE; + } + + if (result == (PyObject*)task) { + /* We have a task that wants to await on itself */ + goto self_await; + } + + /* Check if `result` is FutureObj or TaskObj (and not a subclass) */ + if (Future_CheckExact(result) || Task_CheckExact(result)) { + PyObject *wrapper; + PyObject *res; + FutureObj *fut = (FutureObj*)result; + + /* Check if `result` future is attached to a different loop */ + if (fut->fut_loop != task->task_loop) { + goto different_loop; + } + + if (fut->fut_blocking) { + fut->fut_blocking = 0; + + /* result.add_done_callback(task._wakeup) */ + wrapper = TaskWakeupMethWrapper_new(task); + if (wrapper == NULL) { + goto fail; + } + res = future_add_done_callback((FutureObj*)result, wrapper); + Py_DECREF(wrapper); + if (res == NULL) { + goto fail; + } + Py_DECREF(res); + + /* task._fut_waiter = result */ + task->task_fut_waiter = result; /* no incref is necessary */ + + if (task->task_must_cancel) { + PyObject *r; + r = future_cancel(fut); + if (r == NULL) { + return NULL; + } + if (r == Py_True) { + task->task_must_cancel = 0; + } + Py_DECREF(r); + } + + Py_RETURN_NONE; + } + else { + goto yield_insteadof_yf; + } + } + + /* Check if `result` is a Future-compatible object */ + o = PyObject_GetAttrString(result, "_asyncio_future_blocking"); + if (o == NULL) { + if (PyErr_ExceptionMatches(PyExc_AttributeError)) { + PyErr_Clear(); + } + else { + goto fail; + } + } + else { + if (o == Py_None) { + Py_CLEAR(o); + } + else { + /* `result` is a Future-compatible object */ + PyObject *wrapper; + PyObject *res; + + int blocking = PyObject_IsTrue(o); + Py_CLEAR(o); + if (blocking < 0) { + goto fail; + } + + /* Check if `result` future is attached to a different loop */ + PyObject *oloop = PyObject_GetAttrString(result, "_loop"); + if (oloop == NULL) { + goto fail; + } + if (oloop != task->task_loop) { + Py_DECREF(oloop); + goto different_loop; + } + else { + Py_DECREF(oloop); + } + + if (blocking) { + /* result._asyncio_future_blocking = False */ + if (PyObject_SetAttrString( + result, "_asyncio_future_blocking", Py_False) == -1) { + goto fail; + } + + /* result.add_done_callback(task._wakeup) */ + wrapper = TaskWakeupMethWrapper_new(task); + if (wrapper == NULL) { + goto fail; + } + res = _PyObject_CallMethodId( + result, &PyId_add_done_callback, "O", wrapper, NULL); + Py_DECREF(wrapper); + if (res == NULL) { + goto fail; + } + Py_DECREF(res); + + /* task._fut_waiter = result */ + task->task_fut_waiter = result; /* no incref is necessary */ + + if (task->task_must_cancel) { + PyObject *r; + int is_true; + r = _PyObject_CallMethodId(result, &PyId_cancel, NULL); + if (r == NULL) { + return NULL; + } + is_true = PyObject_IsTrue(r); + Py_DECREF(r); + if (is_true < 0) { + return NULL; + } + else if (is_true) { + task->task_must_cancel = 0; + } + } + + Py_RETURN_NONE; + } + else { + goto yield_insteadof_yf; + } + } + } + + /* Check if `result` is None */ + if (result == Py_None) { + /* Bare yield relinquishes control for one event loop iteration. */ + if (task_call_step_soon(task, NULL)) { + goto fail; + } + return result; + } + + /* Check if `result` is a generator */ + o = PyObject_CallFunctionObjArgs(inspect_isgenerator, result, NULL); + if (o == NULL) { + /* An exception in inspect.isgenerator */ goto fail; } - asyncio_repr_info_func = PyObject_GetAttrString(module, - "_future_repr_info"); - if (asyncio_repr_info_func == NULL) { + res = PyObject_IsTrue(o); + Py_CLEAR(o); + if (res == -1) { + /* An exception while checking if 'val' is True */ goto fail; } + if (res == 1) { + /* `result` is a generator */ + PyObject *ret; + ret = task_set_error_soon( + task, PyExc_RuntimeError, + "yield was used instead of yield from for " + "generator in task %R with %S", task, result); + Py_DECREF(result); + return ret; + } + + /* The `result` is none of the above */ + Py_DECREF(result); + return task_set_error_soon( + task, PyExc_RuntimeError, "Task got bad yield: %R", result); + +self_await: + o = task_set_error_soon( + task, PyExc_RuntimeError, + "Task cannot await on itself: %R", task); + Py_DECREF(result); + return o; + +yield_insteadof_yf: + o = task_set_error_soon( + task, PyExc_RuntimeError, + "yield was used instead of yield from " + "in task %R with %R", + task, result); + Py_DECREF(result); + return o; + +different_loop: + o = task_set_error_soon( + task, PyExc_RuntimeError, + "Task %R got Future %R attached to a different loop", + task, result); + Py_DECREF(result); + return o; - asyncio_InvalidStateError = PyObject_GetAttrString(module, - "InvalidStateError"); - if (asyncio_InvalidStateError == NULL) { - goto fail; +fail: + Py_XDECREF(result); + return NULL; +} + +static PyObject * +task_step(TaskObj *task, PyObject *exc) +{ + PyObject *res; + PyObject *ot; + + if (PyDict_SetItem((PyObject *)current_tasks, + task->task_loop, (PyObject*)task) == -1) + { + return NULL; } - asyncio_CancelledError = PyObject_GetAttrString(module, "CancelledError"); - if (asyncio_CancelledError == NULL) { - goto fail; + res = task_step_impl(task, exc); + + if (res == NULL) { + PyObject *et, *ev, *tb; + PyErr_Fetch(&et, &ev, &tb); + ot = _PyDict_Pop(current_tasks, task->task_loop, NULL); + if (ot == NULL) { + Py_XDECREF(et); + Py_XDECREF(tb); + Py_XDECREF(ev); + return NULL; + } + Py_DECREF(ot); + PyErr_Restore(et, ev, tb); + return NULL; + } + else { + ot = _PyDict_Pop(current_tasks, task->task_loop, NULL); + if (ot == NULL) { + Py_DECREF(res); + return NULL; + } + else { + Py_DECREF(ot); + return res; + } } +} - Py_DECREF(module); - return 0; +static PyObject * +task_wakeup(TaskObj *task, PyObject *o) +{ + assert(o); -fail: + if (Future_CheckExact(o) || Task_CheckExact(o)) { + PyObject *fut_result = NULL; + int res = future_get_result((FutureObj*)o, &fut_result); + PyObject *result; + + switch(res) { + case -1: + assert(fut_result == NULL); + return NULL; + case 0: + Py_DECREF(fut_result); + return task_call_step(task, NULL); + default: + assert(res == 1); + result = task_call_step(task, fut_result); + Py_DECREF(fut_result); + return result; + } + } + + PyObject *fut_result = PyObject_CallMethod(o, "result", NULL); + if (fut_result == NULL) { + PyObject *et, *ev, *tb; + PyObject *res; + + PyErr_Fetch(&et, &ev, &tb); + if (!ev || !PyObject_TypeCheck(ev, (PyTypeObject *) et)) { + PyErr_NormalizeException(&et, &ev, &tb); + } + + res = task_call_step(task, ev); + + Py_XDECREF(et); + Py_XDECREF(tb); + Py_XDECREF(ev); + + return res; + } + else { + Py_DECREF(fut_result); + return task_call_step(task, NULL); + } +} + + +/*********************** Module **************************/ + + +static void +module_free(void *m) +{ + Py_CLEAR(current_tasks); + Py_CLEAR(all_tasks); Py_CLEAR(traceback_extract_stack); Py_CLEAR(asyncio_get_event_loop); - Py_CLEAR(asyncio_repr_info_func); + Py_CLEAR(asyncio_future_repr_info_func); + Py_CLEAR(asyncio_task_repr_info_func); + Py_CLEAR(asyncio_task_get_stack_func); + Py_CLEAR(asyncio_task_print_stack_func); Py_CLEAR(asyncio_InvalidStateError); Py_CLEAR(asyncio_CancelledError); + Py_CLEAR(inspect_isgenerator); +} + +static int +module_init(void) +{ + PyObject *module = NULL; + PyObject *cls; + +#define WITH_MOD(NAME) \ + Py_CLEAR(module); \ + module = PyImport_ImportModule(NAME); \ + if (module == NULL) { \ + return -1; \ + } + +#define GET_MOD_ATTR(VAR, NAME) \ + VAR = PyObject_GetAttrString(module, NAME); \ + if (VAR == NULL) { \ + goto fail; \ + } + + WITH_MOD("asyncio.events") + GET_MOD_ATTR(asyncio_get_event_loop, "get_event_loop") + + WITH_MOD("asyncio.base_futures") + GET_MOD_ATTR(asyncio_future_repr_info_func, "_future_repr_info") + GET_MOD_ATTR(asyncio_InvalidStateError, "InvalidStateError") + GET_MOD_ATTR(asyncio_CancelledError, "CancelledError") + + WITH_MOD("asyncio.base_tasks") + GET_MOD_ATTR(asyncio_task_repr_info_func, "_task_repr_info") + GET_MOD_ATTR(asyncio_task_get_stack_func, "_task_get_stack") + GET_MOD_ATTR(asyncio_task_print_stack_func, "_task_print_stack") + + WITH_MOD("inspect") + GET_MOD_ATTR(inspect_isgenerator, "isgenerator") + + WITH_MOD("traceback") + GET_MOD_ATTR(traceback_extract_stack, "extract_stack") + + WITH_MOD("weakref") + GET_MOD_ATTR(cls, "WeakSet") + all_tasks = PyObject_CallObject(cls, NULL); + Py_CLEAR(cls); + if (all_tasks == NULL) { + goto fail; + } + + current_tasks = (PyDictObject *)PyDict_New(); + if (current_tasks == NULL) { + goto fail; + } + Py_CLEAR(module); + return 0; + +fail: + Py_CLEAR(module); + module_free(NULL); return -1; -} +#undef WITH_MOD +#undef GET_MOD_ATTR +} PyDoc_STRVAR(module_doc, "Accelerator module for asyncio"); @@ -1006,14 +2413,14 @@ static struct PyModuleDef _asynciomodule = { NULL, /* m_slots */ NULL, /* m_traverse */ NULL, /* m_clear */ - NULL, /* m_free */ + (freefunc)module_free /* m_free */ }; PyMODINIT_FUNC PyInit__asyncio(void) { - if (init_module() < 0) { + if (module_init() < 0) { return NULL; } if (PyType_Ready(&FutureType) < 0) { @@ -1022,6 +2429,15 @@ PyInit__asyncio(void) if (PyType_Ready(&FutureIterType) < 0) { return NULL; } + if (PyType_Ready(&TaskSendMethWrapper_Type) < 0) { + return NULL; + } + if(PyType_Ready(&TaskWakeupMethWrapper_Type) < 0) { + return NULL; + } + if (PyType_Ready(&TaskType) < 0) { + return NULL; + } PyObject *m = PyModule_Create(&_asynciomodule); if (m == NULL) { @@ -1034,5 +2450,11 @@ PyInit__asyncio(void) return NULL; } + Py_INCREF(&TaskType); + if (PyModule_AddObject(m, "Task", (PyObject *)&TaskType) < 0) { + Py_DECREF(&TaskType); + return NULL; + } + return m; } |