summaryrefslogtreecommitdiffstats
path: root/Modules/_asynciomodule.c
diff options
context:
space:
mode:
authorYury Selivanov <yury@magic.io>2016-10-28 16:52:37 (GMT)
committerYury Selivanov <yury@magic.io>2016-10-28 16:52:37 (GMT)
commita0c1ba608eb89b4e10155f7652c50a3ac0b709af (patch)
tree90e811ae976793876c5c04de7d91cbb7f1518fd9 /Modules/_asynciomodule.c
parentbbcb79920b0e220c2e1b0e77db5aca2f3a2a52d4 (diff)
downloadcpython-a0c1ba608eb89b4e10155f7652c50a3ac0b709af.zip
cpython-a0c1ba608eb89b4e10155f7652c50a3ac0b709af.tar.gz
cpython-a0c1ba608eb89b4e10155f7652c50a3ac0b709af.tar.bz2
Issue #28544: Implement asyncio.Task in C.
This implementation provides additional 10-20% speed boost for asyncio programs. The patch also fixes _asynciomodule.c to use Arguments Clinic, and makes '_schedule_callbacks' an overridable method (as it was in 3.5).
Diffstat (limited to 'Modules/_asynciomodule.c')
-rw-r--r--Modules/_asynciomodule.c1976
1 files changed, 1699 insertions, 277 deletions
diff --git a/Modules/_asynciomodule.c b/Modules/_asynciomodule.c
index a3c96c8..d9419df 100644
--- a/Modules/_asynciomodule.c
+++ b/Modules/_asynciomodule.c
@@ -2,20 +2,35 @@
#include "structmember.h"
+/*[clinic input]
+module _asyncio
+[clinic start generated code]*/
+/*[clinic end generated code: output=da39a3ee5e6b4b0d input=8fd17862aa989c69]*/
+
+
/* identifiers used from some functions */
+_Py_IDENTIFIER(add_done_callback);
_Py_IDENTIFIER(call_soon);
+_Py_IDENTIFIER(cancel);
+_Py_IDENTIFIER(send);
+_Py_IDENTIFIER(throw);
+_Py_IDENTIFIER(_step);
+_Py_IDENTIFIER(_schedule_callbacks);
+_Py_IDENTIFIER(_wakeup);
/* State of the _asyncio module */
+static PyObject *all_tasks;
+static PyDictObject *current_tasks;
static PyObject *traceback_extract_stack;
static PyObject *asyncio_get_event_loop;
-static PyObject *asyncio_repr_info_func;
+static PyObject *asyncio_future_repr_info_func;
+static PyObject *asyncio_task_repr_info_func;
+static PyObject *asyncio_task_get_stack_func;
+static PyObject *asyncio_task_print_stack_func;
static PyObject *asyncio_InvalidStateError;
static PyObject *asyncio_CancelledError;
-
-
-/* Get FutureIter from Future */
-static PyObject* new_future_iter(PyObject *fut);
+static PyObject *inspect_isgenerator;
typedef enum {
@@ -24,24 +39,57 @@ typedef enum {
STATE_FINISHED
} fut_state;
+#define FutureObj_HEAD(prefix) \
+ PyObject_HEAD \
+ PyObject *prefix##_loop; \
+ PyObject *prefix##_callbacks; \
+ PyObject *prefix##_exception; \
+ PyObject *prefix##_result; \
+ PyObject *prefix##_source_tb; \
+ fut_state prefix##_state; \
+ int prefix##_log_tb; \
+ int prefix##_blocking; \
+ PyObject *dict; \
+ PyObject *prefix##_weakreflist;
typedef struct {
- PyObject_HEAD
- PyObject *fut_loop;
- PyObject *fut_callbacks;
- PyObject *fut_exception;
- PyObject *fut_result;
- PyObject *fut_source_tb;
- fut_state fut_state;
- int fut_log_tb;
- int fut_blocking;
- PyObject *dict;
- PyObject *fut_weakreflist;
+ FutureObj_HEAD(fut)
} FutureObj;
+typedef struct {
+ FutureObj_HEAD(task)
+ PyObject *task_fut_waiter;
+ PyObject *task_coro;
+ int task_must_cancel;
+ int task_log_destroy_pending;
+} TaskObj;
+
+typedef struct {
+ PyObject_HEAD
+ TaskObj *sw_task;
+ PyObject *sw_arg;
+} TaskSendMethWrapper;
+
+typedef struct {
+ PyObject_HEAD
+ TaskObj *ww_task;
+} TaskWakeupMethWrapper;
+
+
+#include "clinic/_asynciomodule.c.h"
+
+
+/*[clinic input]
+class _asyncio.Future "FutureObj *" "&Future_Type"
+[clinic start generated code]*/
+/*[clinic end generated code: output=da39a3ee5e6b4b0d input=00d3e4abca711e0f]*/
+
+/* Get FutureIter from Future */
+static PyObject* future_new_iter(PyObject *);
+static inline int future_call_schedule_callbacks(FutureObj *);
static int
-_schedule_callbacks(FutureObj *fut)
+future_schedule_callbacks(FutureObj *fut)
{
Py_ssize_t len;
PyObject* iters;
@@ -87,16 +135,11 @@ _schedule_callbacks(FutureObj *fut)
}
static int
-FutureObj_init(FutureObj *fut, PyObject *args, PyObject *kwds)
+future_init(FutureObj *fut, PyObject *loop)
{
- static char *kwlist[] = {"loop", NULL};
- PyObject *loop = NULL;
PyObject *res = NULL;
_Py_IDENTIFIER(get_debug);
- if (!PyArg_ParseTupleAndKeywords(args, kwds, "|$O", kwlist, &loop)) {
- return -1;
- }
if (loop == NULL || loop == Py_None) {
loop = PyObject_CallObject(asyncio_get_event_loop, NULL);
if (loop == NULL) {
@@ -128,106 +171,12 @@ FutureObj_init(FutureObj *fut, PyObject *args, PyObject *kwds)
if (fut->fut_callbacks == NULL) {
return -1;
}
- return 0;
-}
-
-static int
-FutureObj_clear(FutureObj *fut)
-{
- Py_CLEAR(fut->fut_loop);
- Py_CLEAR(fut->fut_callbacks);
- Py_CLEAR(fut->fut_result);
- Py_CLEAR(fut->fut_exception);
- Py_CLEAR(fut->fut_source_tb);
- Py_CLEAR(fut->dict);
- return 0;
-}
-static int
-FutureObj_traverse(FutureObj *fut, visitproc visit, void *arg)
-{
- Py_VISIT(fut->fut_loop);
- Py_VISIT(fut->fut_callbacks);
- Py_VISIT(fut->fut_result);
- Py_VISIT(fut->fut_exception);
- Py_VISIT(fut->fut_source_tb);
- Py_VISIT(fut->dict);
return 0;
}
-PyDoc_STRVAR(pydoc_result,
- "Return the result this future represents.\n"
- "\n"
- "If the future has been cancelled, raises CancelledError. If the\n"
- "future's result isn't yet available, raises InvalidStateError. If\n"
- "the future is done and has an exception set, this exception is raised."
-);
-
-static PyObject *
-FutureObj_result(FutureObj *fut, PyObject *arg)
-{
- if (fut->fut_state == STATE_CANCELLED) {
- PyErr_SetString(asyncio_CancelledError, "");
- return NULL;
- }
-
- if (fut->fut_state != STATE_FINISHED) {
- PyErr_SetString(asyncio_InvalidStateError, "Result is not ready.");
- return NULL;
- }
-
- fut->fut_log_tb = 0;
- if (fut->fut_exception != NULL) {
- PyObject *type = NULL;
- type = PyExceptionInstance_Class(fut->fut_exception);
- PyErr_SetObject(type, fut->fut_exception);
- return NULL;
- }
-
- Py_INCREF(fut->fut_result);
- return fut->fut_result;
-}
-
-PyDoc_STRVAR(pydoc_exception,
- "Return the exception that was set on this future.\n"
- "\n"
- "The exception (or None if no exception was set) is returned only if\n"
- "the future is done. If the future has been cancelled, raises\n"
- "CancelledError. If the future isn't done yet, raises\n"
- "InvalidStateError."
-);
-
-static PyObject *
-FutureObj_exception(FutureObj *fut, PyObject *arg)
-{
- if (fut->fut_state == STATE_CANCELLED) {
- PyErr_SetString(asyncio_CancelledError, "");
- return NULL;
- }
-
- if (fut->fut_state != STATE_FINISHED) {
- PyErr_SetString(asyncio_InvalidStateError, "Result is not ready.");
- return NULL;
- }
-
- if (fut->fut_exception != NULL) {
- fut->fut_log_tb = 0;
- Py_INCREF(fut->fut_exception);
- return fut->fut_exception;
- }
-
- Py_RETURN_NONE;
-}
-
-PyDoc_STRVAR(pydoc_set_result,
- "Mark the future done and set its result.\n"
- "\n"
- "If the future is already done when this method is called, raises\n"
- "InvalidStateError."
-);
-
static PyObject *
-FutureObj_set_result(FutureObj *fut, PyObject *res)
+future_set_result(FutureObj *fut, PyObject *res)
{
if (fut->fut_state != STATE_PENDING) {
PyErr_SetString(asyncio_InvalidStateError, "invalid state");
@@ -238,21 +187,14 @@ FutureObj_set_result(FutureObj *fut, PyObject *res)
fut->fut_result = res;
fut->fut_state = STATE_FINISHED;
- if (_schedule_callbacks(fut) == -1) {
+ if (future_call_schedule_callbacks(fut) == -1) {
return NULL;
}
Py_RETURN_NONE;
}
-PyDoc_STRVAR(pydoc_set_exception,
- "Mark the future done and set an exception.\n"
- "\n"
- "If the future is already done when this method is called, raises\n"
- "InvalidStateError."
-);
-
static PyObject *
-FutureObj_set_exception(FutureObj *fut, PyObject *exc)
+future_set_exception(FutureObj *fut, PyObject *exc)
{
PyObject *exc_val = NULL;
@@ -287,7 +229,7 @@ FutureObj_set_exception(FutureObj *fut, PyObject *exc)
fut->fut_exception = exc_val;
fut->fut_state = STATE_FINISHED;
- if (_schedule_callbacks(fut) == -1) {
+ if (future_call_schedule_callbacks(fut) == -1) {
return NULL;
}
@@ -295,16 +237,50 @@ FutureObj_set_exception(FutureObj *fut, PyObject *exc)
Py_RETURN_NONE;
}
-PyDoc_STRVAR(pydoc_add_done_callback,
- "Add a callback to be run when the future becomes done.\n"
- "\n"
- "The callback is called with a single argument - the future object. If\n"
- "the future is already done when this is called, the callback is\n"
- "scheduled with call_soon.";
-);
+static int
+future_get_result(FutureObj *fut, PyObject **result)
+{
+ PyObject *exc;
+
+ if (fut->fut_state == STATE_CANCELLED) {
+ exc = _PyObject_CallNoArg(asyncio_CancelledError);
+ if (exc == NULL) {
+ return -1;
+ }
+ *result = exc;
+ return 1;
+ }
+
+ if (fut->fut_state != STATE_FINISHED) {
+ PyObject *msg = PyUnicode_FromString("Result is not ready.");
+ if (msg == NULL) {
+ return -1;
+ }
+
+ exc = _PyObject_CallArg1(asyncio_InvalidStateError, msg);
+ Py_DECREF(msg);
+ if (exc == NULL) {
+ return -1;
+ }
+
+ *result = exc;
+ return 1;
+ }
+
+ fut->fut_log_tb = 0;
+ if (fut->fut_exception != NULL) {
+ Py_INCREF(fut->fut_exception);
+ *result = fut->fut_exception;
+ return 1;
+ }
+
+ Py_INCREF(fut->fut_result);
+ *result = fut->fut_result;
+ return 0;
+}
static PyObject *
-FutureObj_add_done_callback(FutureObj *fut, PyObject *arg)
+future_add_done_callback(FutureObj *fut, PyObject *arg)
{
if (fut->fut_state != STATE_PENDING) {
PyObject *handle = _PyObject_CallMethodId(
@@ -326,19 +302,216 @@ FutureObj_add_done_callback(FutureObj *fut, PyObject *arg)
Py_RETURN_NONE;
}
-PyDoc_STRVAR(pydoc_remove_done_callback,
- "Remove all instances of a callback from the \"call when done\" list.\n"
- "\n"
- "Returns the number of callbacks removed."
-);
+static PyObject *
+future_cancel(FutureObj *fut)
+{
+ if (fut->fut_state != STATE_PENDING) {
+ Py_RETURN_FALSE;
+ }
+ fut->fut_state = STATE_CANCELLED;
+
+ if (future_call_schedule_callbacks(fut) == -1) {
+ return NULL;
+ }
+
+ Py_RETURN_TRUE;
+}
+
+/*[clinic input]
+_asyncio.Future.__init__
+
+ *
+ loop: 'O' = NULL
+
+This class is *almost* compatible with concurrent.futures.Future.
+
+ Differences:
+
+ - result() and exception() do not take a timeout argument and
+ raise an exception when the future isn't done yet.
+
+ - Callbacks registered with add_done_callback() are always called
+ via the event loop's call_soon_threadsafe().
+
+ - This class is not compatible with the wait() and as_completed()
+ methods in the concurrent.futures package.
+[clinic start generated code]*/
+
+static int
+_asyncio_Future___init___impl(FutureObj *self, PyObject *loop)
+/*[clinic end generated code: output=9ed75799eaccb5d6 input=8e1681f23605be2d]*/
+
+{
+ return future_init(self, loop);
+}
+
+static int
+FutureObj_clear(FutureObj *fut)
+{
+ Py_CLEAR(fut->fut_loop);
+ Py_CLEAR(fut->fut_callbacks);
+ Py_CLEAR(fut->fut_result);
+ Py_CLEAR(fut->fut_exception);
+ Py_CLEAR(fut->fut_source_tb);
+ Py_CLEAR(fut->dict);
+ return 0;
+}
+
+static int
+FutureObj_traverse(FutureObj *fut, visitproc visit, void *arg)
+{
+ Py_VISIT(fut->fut_loop);
+ Py_VISIT(fut->fut_callbacks);
+ Py_VISIT(fut->fut_result);
+ Py_VISIT(fut->fut_exception);
+ Py_VISIT(fut->fut_source_tb);
+ Py_VISIT(fut->dict);
+ return 0;
+}
+
+/*[clinic input]
+_asyncio.Future.result
+
+Return the result this future represents.
+
+If the future has been cancelled, raises CancelledError. If the
+future's result isn't yet available, raises InvalidStateError. If
+the future is done and has an exception set, this exception is raised.
+[clinic start generated code]*/
+
+static PyObject *
+_asyncio_Future_result_impl(FutureObj *self)
+/*[clinic end generated code: output=f35f940936a4b1e5 input=49ecf9cf5ec50dc5]*/
+{
+ PyObject *result;
+ int res = future_get_result(self, &result);
+
+ if (res == -1) {
+ return NULL;
+ }
+
+ if (res == 0) {
+ return result;
+ }
+
+ assert(res == 1);
+
+ PyErr_SetObject(PyExceptionInstance_Class(result), result);
+ Py_DECREF(result);
+ return NULL;
+}
+
+/*[clinic input]
+_asyncio.Future.exception
+
+Return the exception that was set on this future.
+
+The exception (or None if no exception was set) is returned only if
+the future is done. If the future has been cancelled, raises
+CancelledError. If the future isn't done yet, raises
+InvalidStateError.
+[clinic start generated code]*/
+
+static PyObject *
+_asyncio_Future_exception_impl(FutureObj *self)
+/*[clinic end generated code: output=88b20d4f855e0710 input=733547a70c841c68]*/
+{
+ if (self->fut_state == STATE_CANCELLED) {
+ PyErr_SetString(asyncio_CancelledError, "");
+ return NULL;
+ }
+
+ if (self->fut_state != STATE_FINISHED) {
+ PyErr_SetString(asyncio_InvalidStateError, "Result is not ready.");
+ return NULL;
+ }
+
+ if (self->fut_exception != NULL) {
+ self->fut_log_tb = 0;
+ Py_INCREF(self->fut_exception);
+ return self->fut_exception;
+ }
+
+ Py_RETURN_NONE;
+}
+
+/*[clinic input]
+_asyncio.Future.set_result
+
+ res: 'O'
+ /
+
+Mark the future done and set its result.
+
+If the future is already done when this method is called, raises
+InvalidStateError.
+[clinic start generated code]*/
+
+static PyObject *
+_asyncio_Future_set_result(FutureObj *self, PyObject *res)
+/*[clinic end generated code: output=a620abfc2796bfb6 input=8619565e0503357e]*/
+{
+ return future_set_result(self, res);
+}
+
+/*[clinic input]
+_asyncio.Future.set_exception
+
+ exception: 'O'
+ /
+
+Mark the future done and set an exception.
+
+If the future is already done when this method is called, raises
+InvalidStateError.
+[clinic start generated code]*/
static PyObject *
-FutureObj_remove_done_callback(FutureObj *fut, PyObject *arg)
+_asyncio_Future_set_exception(FutureObj *self, PyObject *exception)
+/*[clinic end generated code: output=f1c1b0cd321be360 input=1377dbe15e6ea186]*/
+{
+ return future_set_exception(self, exception);
+}
+
+/*[clinic input]
+_asyncio.Future.add_done_callback
+
+ fn: 'O'
+ /
+
+Add a callback to be run when the future becomes done.
+
+The callback is called with a single argument - the future object. If
+the future is already done when this is called, the callback is
+scheduled with call_soon.
+[clinic start generated code]*/
+
+static PyObject *
+_asyncio_Future_add_done_callback(FutureObj *self, PyObject *fn)
+/*[clinic end generated code: output=819e09629b2ec2b5 input=8cce187e32cec6a8]*/
+{
+ return future_add_done_callback(self, fn);
+}
+
+/*[clinic input]
+_asyncio.Future.remove_done_callback
+
+ fn: 'O'
+ /
+
+Remove all instances of a callback from the "call when done" list.
+
+Returns the number of callbacks removed.
+[clinic start generated code]*/
+
+static PyObject *
+_asyncio_Future_remove_done_callback(FutureObj *self, PyObject *fn)
+/*[clinic end generated code: output=5ab1fb52b24ef31f input=3fedb73e1409c31c]*/
{
PyObject *newlist;
Py_ssize_t len, i, j=0;
- len = PyList_GET_SIZE(fut->fut_callbacks);
+ len = PyList_GET_SIZE(self->fut_callbacks);
if (len == 0) {
return PyLong_FromSsize_t(0);
}
@@ -350,9 +523,9 @@ FutureObj_remove_done_callback(FutureObj *fut, PyObject *arg)
for (i = 0; i < len; i++) {
int ret;
- PyObject *item = PyList_GET_ITEM(fut->fut_callbacks, i);
+ PyObject *item = PyList_GET_ITEM(self->fut_callbacks, i);
- if ((ret = PyObject_RichCompareBool(arg, item, Py_EQ)) < 0) {
+ if ((ret = PyObject_RichCompareBool(fn, item, Py_EQ)) < 0) {
goto fail;
}
if (ret == 0) {
@@ -365,7 +538,7 @@ FutureObj_remove_done_callback(FutureObj *fut, PyObject *arg)
if (PyList_SetSlice(newlist, j, len, NULL) < 0) {
goto fail;
}
- if (PyList_SetSlice(fut->fut_callbacks, 0, len, newlist) < 0) {
+ if (PyList_SetSlice(self->fut_callbacks, 0, len, newlist) < 0) {
goto fail;
}
Py_DECREF(newlist);
@@ -376,35 +549,34 @@ fail:
return NULL;
}
-PyDoc_STRVAR(pydoc_cancel,
- "Cancel the future and schedule callbacks.\n"
- "\n"
- "If the future is already done or cancelled, return False. Otherwise,\n"
- "change the future's state to cancelled, schedule the callbacks and\n"
- "return True."
-);
+/*[clinic input]
+_asyncio.Future.cancel
-static PyObject *
-FutureObj_cancel(FutureObj *fut, PyObject *arg)
-{
- if (fut->fut_state != STATE_PENDING) {
- Py_RETURN_FALSE;
- }
- fut->fut_state = STATE_CANCELLED;
+Cancel the future and schedule callbacks.
- if (_schedule_callbacks(fut) == -1) {
- return NULL;
- }
+If the future is already done or cancelled, return False. Otherwise,
+change the future's state to cancelled, schedule the callbacks and
+return True.
+[clinic start generated code]*/
- Py_RETURN_TRUE;
+static PyObject *
+_asyncio_Future_cancel_impl(FutureObj *self)
+/*[clinic end generated code: output=e45b932ba8bd68a1 input=515709a127995109]*/
+{
+ return future_cancel(self);
}
-PyDoc_STRVAR(pydoc_cancelled, "Return True if the future was cancelled.");
+/*[clinic input]
+_asyncio.Future.cancelled
+
+Return True if the future was cancelled.
+[clinic start generated code]*/
static PyObject *
-FutureObj_cancelled(FutureObj *fut, PyObject *arg)
+_asyncio_Future_cancelled_impl(FutureObj *self)
+/*[clinic end generated code: output=145197ced586357d input=943ab8b7b7b17e45]*/
{
- if (fut->fut_state == STATE_CANCELLED) {
+ if (self->fut_state == STATE_CANCELLED) {
Py_RETURN_TRUE;
}
else {
@@ -412,17 +584,20 @@ FutureObj_cancelled(FutureObj *fut, PyObject *arg)
}
}
-PyDoc_STRVAR(pydoc_done,
- "Return True if the future is done.\n"
- "\n"
- "Done means either that a result / exception are available, or that the\n"
- "future was cancelled."
-);
+/*[clinic input]
+_asyncio.Future.done
+
+Return True if the future is done.
+
+Done means either that a result / exception are available, or that the
+future was cancelled.
+[clinic start generated code]*/
static PyObject *
-FutureObj_done(FutureObj *fut, PyObject *arg)
+_asyncio_Future_done_impl(FutureObj *self)
+/*[clinic end generated code: output=244c5ac351145096 input=28d7b23fdb65d2ac]*/
{
- if (fut->fut_state == STATE_PENDING) {
+ if (self->fut_state == STATE_PENDING) {
Py_RETURN_FALSE;
}
else {
@@ -538,13 +713,31 @@ FutureObj_get_state(FutureObj *fut)
return ret;
}
-static PyObject*
-FutureObj__repr_info(FutureObj *fut)
+/*[clinic input]
+_asyncio.Future._repr_info
+[clinic start generated code]*/
+
+static PyObject *
+_asyncio_Future__repr_info_impl(FutureObj *self)
+/*[clinic end generated code: output=fa69e901bd176cfb input=f21504d8e2ae1ca2]*/
+{
+ return PyObject_CallFunctionObjArgs(
+ asyncio_future_repr_info_func, self, NULL);
+}
+
+/*[clinic input]
+_asyncio.Future._schedule_callbacks
+[clinic start generated code]*/
+
+static PyObject *
+_asyncio_Future__schedule_callbacks_impl(FutureObj *self)
+/*[clinic end generated code: output=5e8958d89ea1c5dc input=4f5f295f263f4a88]*/
{
- if (asyncio_repr_info_func == NULL) {
- return PyList_New(0);
+ int ret = future_schedule_callbacks(self);
+ if (ret == -1) {
+ return NULL;
}
- return PyObject_CallFunctionObjArgs(asyncio_repr_info_func, fut, NULL);
+ Py_RETURN_NONE;
}
static PyObject *
@@ -661,43 +854,39 @@ finally:
static PyAsyncMethods FutureType_as_async = {
- (unaryfunc)new_future_iter, /* am_await */
+ (unaryfunc)future_new_iter, /* am_await */
0, /* am_aiter */
0 /* am_anext */
};
static PyMethodDef FutureType_methods[] = {
- {"_repr_info", (PyCFunction)FutureObj__repr_info, METH_NOARGS, NULL},
- {"add_done_callback",
- (PyCFunction)FutureObj_add_done_callback,
- METH_O, pydoc_add_done_callback},
- {"remove_done_callback",
- (PyCFunction)FutureObj_remove_done_callback,
- METH_O, pydoc_remove_done_callback},
- {"set_result",
- (PyCFunction)FutureObj_set_result, METH_O, pydoc_set_result},
- {"set_exception",
- (PyCFunction)FutureObj_set_exception, METH_O, pydoc_set_exception},
- {"cancel", (PyCFunction)FutureObj_cancel, METH_NOARGS, pydoc_cancel},
- {"cancelled",
- (PyCFunction)FutureObj_cancelled, METH_NOARGS, pydoc_cancelled},
- {"done", (PyCFunction)FutureObj_done, METH_NOARGS, pydoc_done},
- {"result", (PyCFunction)FutureObj_result, METH_NOARGS, pydoc_result},
- {"exception",
- (PyCFunction)FutureObj_exception, METH_NOARGS, pydoc_exception},
+ _ASYNCIO_FUTURE_RESULT_METHODDEF
+ _ASYNCIO_FUTURE_EXCEPTION_METHODDEF
+ _ASYNCIO_FUTURE_SET_RESULT_METHODDEF
+ _ASYNCIO_FUTURE_SET_EXCEPTION_METHODDEF
+ _ASYNCIO_FUTURE_ADD_DONE_CALLBACK_METHODDEF
+ _ASYNCIO_FUTURE_REMOVE_DONE_CALLBACK_METHODDEF
+ _ASYNCIO_FUTURE_CANCEL_METHODDEF
+ _ASYNCIO_FUTURE_CANCELLED_METHODDEF
+ _ASYNCIO_FUTURE_DONE_METHODDEF
+ _ASYNCIO_FUTURE__REPR_INFO_METHODDEF
+ _ASYNCIO_FUTURE__SCHEDULE_CALLBACKS_METHODDEF
{NULL, NULL} /* Sentinel */
};
-static PyGetSetDef FutureType_getsetlist[] = {
- {"_state", (getter)FutureObj_get_state, NULL, NULL},
- {"_asyncio_future_blocking", (getter)FutureObj_get_blocking,
- (setter)FutureObj_set_blocking, NULL},
- {"_loop", (getter)FutureObj_get_loop, NULL, NULL},
- {"_callbacks", (getter)FutureObj_get_callbacks, NULL, NULL},
- {"_result", (getter)FutureObj_get_result, NULL, NULL},
- {"_exception", (getter)FutureObj_get_exception, NULL, NULL},
- {"_log_traceback", (getter)FutureObj_get_log_traceback, NULL, NULL},
+#define FUTURE_COMMON_GETSETLIST \
+ {"_state", (getter)FutureObj_get_state, NULL, NULL}, \
+ {"_asyncio_future_blocking", (getter)FutureObj_get_blocking, \
+ (setter)FutureObj_set_blocking, NULL}, \
+ {"_loop", (getter)FutureObj_get_loop, NULL, NULL}, \
+ {"_callbacks", (getter)FutureObj_get_callbacks, NULL, NULL}, \
+ {"_result", (getter)FutureObj_get_result, NULL, NULL}, \
+ {"_exception", (getter)FutureObj_get_exception, NULL, NULL}, \
+ {"_log_traceback", (getter)FutureObj_get_log_traceback, NULL, NULL}, \
{"_source_traceback", (getter)FutureObj_get_source_traceback, NULL, NULL},
+
+static PyGetSetDef FutureType_getsetlist[] = {
+ FUTURE_COMMON_GETSETLIST
{NULL} /* Sentinel */
};
@@ -712,25 +901,46 @@ static PyTypeObject FutureType = {
.tp_repr = (reprfunc)FutureObj_repr,
.tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC | Py_TPFLAGS_BASETYPE
| Py_TPFLAGS_HAVE_FINALIZE,
- .tp_doc = "Fast asyncio.Future implementation.",
+ .tp_doc = _asyncio_Future___init____doc__,
.tp_traverse = (traverseproc)FutureObj_traverse,
.tp_clear = (inquiry)FutureObj_clear,
.tp_weaklistoffset = offsetof(FutureObj, fut_weakreflist),
- .tp_iter = (getiterfunc)new_future_iter,
+ .tp_iter = (getiterfunc)future_new_iter,
.tp_methods = FutureType_methods,
.tp_getset = FutureType_getsetlist,
.tp_dictoffset = offsetof(FutureObj, dict),
- .tp_init = (initproc)FutureObj_init,
+ .tp_init = (initproc)_asyncio_Future___init__,
.tp_new = PyType_GenericNew,
.tp_finalize = (destructor)FutureObj_finalize,
};
+#define Future_CheckExact(obj) (Py_TYPE(obj) == &FutureType)
+
+static inline int
+future_call_schedule_callbacks(FutureObj *fut)
+{
+ if (Future_CheckExact(fut)) {
+ return future_schedule_callbacks(fut);
+ }
+ else {
+ /* `fut` is a subclass of Future */
+ PyObject *ret = _PyObject_CallMethodId(
+ (PyObject*)fut, &PyId__schedule_callbacks, NULL);
+ if (ret == NULL) {
+ return -1;
+ }
+
+ Py_DECREF(ret);
+ return 0;
+ }
+}
+
static void
FutureObj_dealloc(PyObject *self)
{
FutureObj *fut = (FutureObj *)self;
- if (Py_TYPE(fut) == &FutureType) {
+ if (Future_CheckExact(fut)) {
/* When fut is subclass of Future, finalizer is called from
* subtype_dealloc.
*/
@@ -744,7 +954,7 @@ FutureObj_dealloc(PyObject *self)
PyObject_ClearWeakRefs(self);
}
- FutureObj_clear(fut);
+ (void)FutureObj_clear(fut);
Py_TYPE(fut)->tp_free(fut);
}
@@ -759,7 +969,7 @@ typedef struct {
static void
FutureIter_dealloc(futureiterobject *it)
{
- _PyObject_GC_UNTRACK(it);
+ PyObject_GC_UnTrack(it);
Py_XDECREF(it->future);
PyObject_GC_Del(it);
}
@@ -785,7 +995,7 @@ FutureIter_iternext(futureiterobject *it)
return NULL;
}
- res = FutureObj_result(fut, NULL);
+ res = _asyncio_Future_result_impl(fut);
if (res != NULL) {
/* The result of the Future is not an exception.
@@ -884,37 +1094,19 @@ static PyMethodDef FutureIter_methods[] = {
static PyTypeObject FutureIterType = {
PyVarObject_HEAD_INIT(0, 0)
"_asyncio.FutureIter",
- sizeof(futureiterobject), /* tp_basicsize */
- 0, /* tp_itemsize */
- (destructor)FutureIter_dealloc, /* tp_dealloc */
- 0, /* tp_print */
- 0, /* tp_getattr */
- 0, /* tp_setattr */
- 0, /* tp_as_async */
- 0, /* tp_repr */
- 0, /* tp_as_number */
- 0, /* tp_as_sequence */
- 0, /* tp_as_mapping */
- 0, /* tp_hash */
- 0, /* tp_call */
- 0, /* tp_str */
- PyObject_GenericGetAttr, /* tp_getattro */
- 0, /* tp_setattro */
- 0, /* tp_as_buffer */
- Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC, /* tp_flags */
- 0, /* tp_doc */
- (traverseproc)FutureIter_traverse, /* tp_traverse */
- 0, /* tp_clear */
- 0, /* tp_richcompare */
- 0, /* tp_weaklistoffset */
- PyObject_SelfIter, /* tp_iter */
- (iternextfunc)FutureIter_iternext, /* tp_iternext */
- FutureIter_methods, /* tp_methods */
- 0, /* tp_members */
+ .tp_basicsize = sizeof(futureiterobject),
+ .tp_itemsize = 0,
+ .tp_dealloc = (destructor)FutureIter_dealloc,
+ .tp_getattro = PyObject_GenericGetAttr,
+ .tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC,
+ .tp_traverse = (traverseproc)FutureIter_traverse,
+ .tp_iter = PyObject_SelfIter,
+ .tp_iternext = (iternextfunc)FutureIter_iternext,
+ .tp_methods = FutureIter_methods,
};
static PyObject *
-new_future_iter(PyObject *fut)
+future_new_iter(PyObject *fut)
{
futureiterobject *it;
@@ -932,68 +1124,1283 @@ new_future_iter(PyObject *fut)
return (PyObject*)it;
}
-/*********************** Module **************************/
+
+/*********************** Task **************************/
+
+
+/*[clinic input]
+class _asyncio.Task "TaskObj *" "&Task_Type"
+[clinic start generated code]*/
+/*[clinic end generated code: output=da39a3ee5e6b4b0d input=719dcef0fcc03b37]*/
+
+static int task_call_step_soon(TaskObj *, PyObject *);
+static inline PyObject * task_call_wakeup(TaskObj *, PyObject *);
+static inline PyObject * task_call_step(TaskObj *, PyObject *);
+static PyObject * task_wakeup(TaskObj *, PyObject *);
+static PyObject * task_step(TaskObj *, PyObject *);
+
+/* ----- Task._step wrapper */
static int
-init_module(void)
+TaskSendMethWrapper_clear(TaskSendMethWrapper *o)
{
- PyObject *module = NULL;
+ Py_CLEAR(o->sw_task);
+ Py_CLEAR(o->sw_arg);
+ return 0;
+}
+
+static void
+TaskSendMethWrapper_dealloc(TaskSendMethWrapper *o)
+{
+ PyObject_GC_UnTrack(o);
+ (void)TaskSendMethWrapper_clear(o);
+ Py_TYPE(o)->tp_free(o);
+}
+
+static PyObject *
+TaskSendMethWrapper_call(TaskSendMethWrapper *o,
+ PyObject *args, PyObject *kwds)
+{
+ return task_call_step(o->sw_task, o->sw_arg);
+}
+
+static int
+TaskSendMethWrapper_traverse(TaskSendMethWrapper *o,
+ visitproc visit, void *arg)
+{
+ Py_VISIT(o->sw_task);
+ Py_VISIT(o->sw_arg);
+ return 0;
+}
+
+static PyObject *
+TaskSendMethWrapper_get___self__(TaskSendMethWrapper *o)
+{
+ if (o->sw_task) {
+ Py_INCREF(o->sw_task);
+ return (PyObject*)o->sw_task;
+ }
+ Py_RETURN_NONE;
+}
+
+static PyGetSetDef TaskSendMethWrapper_getsetlist[] = {
+ {"__self__", (getter)TaskSendMethWrapper_get___self__, NULL, NULL},
+ {NULL} /* Sentinel */
+};
+
+PyTypeObject TaskSendMethWrapper_Type = {
+ PyVarObject_HEAD_INIT(&PyType_Type, 0)
+ "TaskSendMethWrapper",
+ .tp_basicsize = sizeof(TaskSendMethWrapper),
+ .tp_itemsize = 0,
+ .tp_getset = TaskSendMethWrapper_getsetlist,
+ .tp_dealloc = (destructor)TaskSendMethWrapper_dealloc,
+ .tp_call = (ternaryfunc)TaskSendMethWrapper_call,
+ .tp_getattro = PyObject_GenericGetAttr,
+ .tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC,
+ .tp_traverse = (traverseproc)TaskSendMethWrapper_traverse,
+ .tp_clear = (inquiry)TaskSendMethWrapper_clear,
+};
+
+static PyObject *
+TaskSendMethWrapper_new(TaskObj *task, PyObject *arg)
+{
+ TaskSendMethWrapper *o;
+ o = PyObject_GC_New(TaskSendMethWrapper, &TaskSendMethWrapper_Type);
+ if (o == NULL) {
+ return NULL;
+ }
- module = PyImport_ImportModule("traceback");
- if (module == NULL) {
+ Py_INCREF(task);
+ o->sw_task = task;
+
+ Py_XINCREF(arg);
+ o->sw_arg = arg;
+
+ PyObject_GC_Track(o);
+ return (PyObject*) o;
+}
+
+/* ----- Task._wakeup wrapper */
+
+static PyObject *
+TaskWakeupMethWrapper_call(TaskWakeupMethWrapper *o,
+ PyObject *args, PyObject *kwds)
+{
+ PyObject *fut;
+
+ if (!PyArg_ParseTuple(args, "O|", &fut)) {
+ return NULL;
+ }
+
+ return task_call_wakeup(o->ww_task, fut);
+}
+
+static int
+TaskWakeupMethWrapper_clear(TaskWakeupMethWrapper *o)
+{
+ Py_CLEAR(o->ww_task);
+ return 0;
+}
+
+static int
+TaskWakeupMethWrapper_traverse(TaskWakeupMethWrapper *o,
+ visitproc visit, void *arg)
+{
+ Py_VISIT(o->ww_task);
+ return 0;
+}
+
+static void
+TaskWakeupMethWrapper_dealloc(TaskWakeupMethWrapper *o)
+{
+ PyObject_GC_UnTrack(o);
+ (void)TaskWakeupMethWrapper_clear(o);
+ Py_TYPE(o)->tp_free(o);
+}
+
+PyTypeObject TaskWakeupMethWrapper_Type = {
+ PyVarObject_HEAD_INIT(&PyType_Type, 0)
+ "TaskWakeupMethWrapper",
+ .tp_basicsize = sizeof(TaskWakeupMethWrapper),
+ .tp_itemsize = 0,
+ .tp_dealloc = (destructor)TaskWakeupMethWrapper_dealloc,
+ .tp_call = (ternaryfunc)TaskWakeupMethWrapper_call,
+ .tp_getattro = PyObject_GenericGetAttr,
+ .tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC,
+ .tp_traverse = (traverseproc)TaskWakeupMethWrapper_traverse,
+ .tp_clear = (inquiry)TaskWakeupMethWrapper_clear,
+};
+
+static PyObject *
+TaskWakeupMethWrapper_new(TaskObj *task)
+{
+ TaskWakeupMethWrapper *o;
+ o = PyObject_GC_New(TaskWakeupMethWrapper, &TaskWakeupMethWrapper_Type);
+ if (o == NULL) {
+ return NULL;
+ }
+
+ Py_INCREF(task);
+ o->ww_task = task;
+
+ PyObject_GC_Track(o);
+ return (PyObject*) o;
+}
+
+/* ----- Task */
+
+/*[clinic input]
+_asyncio.Task.__init__
+
+ coro: 'O'
+ *
+ loop: 'O' = NULL
+
+A coroutine wrapped in a Future.
+[clinic start generated code]*/
+
+static int
+_asyncio_Task___init___impl(TaskObj *self, PyObject *coro, PyObject *loop)
+/*[clinic end generated code: output=9f24774c2287fc2f input=71d8d28c201a18cd]*/
+{
+ PyObject *res;
+ _Py_IDENTIFIER(add);
+
+ if (future_init((FutureObj*)self, loop)) {
return -1;
}
- // new reference
- traceback_extract_stack = PyObject_GetAttrString(module, "extract_stack");
- if (traceback_extract_stack == NULL) {
- goto fail;
+
+ self->task_fut_waiter = NULL;
+ self->task_must_cancel = 0;
+ self->task_log_destroy_pending = 1;
+
+ Py_INCREF(coro);
+ self->task_coro = coro;
+
+ if (task_call_step_soon(self, NULL)) {
+ return -1;
+ }
+
+ res = _PyObject_CallMethodId(all_tasks, &PyId_add, "O", self, NULL);
+ if (res == NULL) {
+ return -1;
+ }
+ Py_DECREF(res);
+
+ return 0;
+}
+
+static int
+TaskObj_clear(TaskObj *task)
+{
+ (void)FutureObj_clear((FutureObj*) task);
+ Py_CLEAR(task->task_coro);
+ Py_CLEAR(task->task_fut_waiter);
+ return 0;
+}
+
+static int
+TaskObj_traverse(TaskObj *task, visitproc visit, void *arg)
+{
+ Py_VISIT(task->task_coro);
+ Py_VISIT(task->task_fut_waiter);
+ (void)FutureObj_traverse((FutureObj*) task, visit, arg);
+ return 0;
+}
+
+static PyObject *
+TaskObj_get_log_destroy_pending(TaskObj *task)
+{
+ if (task->task_log_destroy_pending) {
+ Py_RETURN_TRUE;
+ }
+ else {
+ Py_RETURN_FALSE;
+ }
+}
+
+static int
+TaskObj_set_log_destroy_pending(TaskObj *task, PyObject *val)
+{
+ int is_true = PyObject_IsTrue(val);
+ if (is_true < 0) {
+ return -1;
+ }
+ task->task_log_destroy_pending = is_true;
+ return 0;
+}
+
+static PyObject *
+TaskObj_get_must_cancel(TaskObj *task)
+{
+ if (task->task_must_cancel) {
+ Py_RETURN_TRUE;
+ }
+ else {
+ Py_RETURN_FALSE;
+ }
+}
+
+static PyObject *
+TaskObj_get_coro(TaskObj *task)
+{
+ if (task->task_coro) {
+ Py_INCREF(task->task_coro);
+ return task->task_coro;
+ }
+
+ Py_RETURN_NONE;
+}
+
+static PyObject *
+TaskObj_get_fut_waiter(TaskObj *task)
+{
+ if (task->task_fut_waiter) {
+ Py_INCREF(task->task_fut_waiter);
+ return task->task_fut_waiter;
+ }
+
+ Py_RETURN_NONE;
+}
+
+/*[clinic input]
+@classmethod
+_asyncio.Task.current_task
+
+ loop: 'O' = NULL
+
+Return the currently running task in an event loop or None.
+
+By default the current task for the current event loop is returned.
+
+None is returned when called not in the context of a Task.
+[clinic start generated code]*/
+
+static PyObject *
+_asyncio_Task_current_task_impl(PyTypeObject *type, PyObject *loop)
+/*[clinic end generated code: output=99fbe7332c516e03 input=cd784537f02cf833]*/
+{
+ PyObject *res;
+
+ if (loop == NULL) {
+ loop = PyObject_CallObject(asyncio_get_event_loop, NULL);
+ if (loop == NULL) {
+ return NULL;
+ }
+
+ res = PyDict_GetItem((PyObject*)current_tasks, loop);
+ Py_DECREF(loop);
+ }
+ else {
+ res = PyDict_GetItem((PyObject*)current_tasks, loop);
}
- Py_DECREF(module);
- module = PyImport_ImportModule("asyncio.events");
- if (module == NULL) {
+ if (res == NULL) {
+ Py_RETURN_NONE;
+ }
+ else {
+ Py_INCREF(res);
+ return res;
+ }
+}
+
+static PyObject *
+task_all_tasks(PyObject *loop)
+{
+ PyObject *task;
+ PyObject *task_loop;
+ PyObject *set;
+ PyObject *iter;
+
+ assert(loop != NULL);
+
+ set = PySet_New(NULL);
+ if (set == NULL) {
+ return NULL;
+ }
+
+ iter = PyObject_GetIter(all_tasks);
+ if (iter == NULL) {
goto fail;
}
- asyncio_get_event_loop = PyObject_GetAttrString(module, "get_event_loop");
- if (asyncio_get_event_loop == NULL) {
+
+ while ((task = PyIter_Next(iter))) {
+ task_loop = PyObject_GetAttrString(task, "_loop");
+ if (task_loop == NULL) {
+ Py_DECREF(task);
+ goto fail;
+ }
+ if (task_loop == loop) {
+ if (PySet_Add(set, task) == -1) {
+ Py_DECREF(task_loop);
+ Py_DECREF(task);
+ goto fail;
+ }
+ }
+ Py_DECREF(task_loop);
+ Py_DECREF(task);
+ }
+
+ Py_DECREF(iter);
+ return set;
+
+fail:
+ Py_XDECREF(set);
+ Py_XDECREF(iter);
+ return NULL;
+}
+
+/*[clinic input]
+@classmethod
+_asyncio.Task.all_tasks
+
+ loop: 'O' = NULL
+
+Return a set of all tasks for an event loop.
+
+By default all tasks for the current event loop are returned.
+[clinic start generated code]*/
+
+static PyObject *
+_asyncio_Task_all_tasks_impl(PyTypeObject *type, PyObject *loop)
+/*[clinic end generated code: output=11f9b20749ccca5d input=cd64aa5f88bd5c49]*/
+{
+ PyObject *res;
+
+ if (loop == NULL) {
+ loop = PyObject_CallObject(asyncio_get_event_loop, NULL);
+ if (loop == NULL) {
+ return NULL;
+ }
+
+ res = task_all_tasks(loop);
+ Py_DECREF(loop);
+ }
+ else {
+ res = task_all_tasks(loop);
+ }
+
+ return res;
+}
+
+/*[clinic input]
+_asyncio.Task._repr_info
+[clinic start generated code]*/
+
+static PyObject *
+_asyncio_Task__repr_info_impl(TaskObj *self)
+/*[clinic end generated code: output=6a490eb66d5ba34b input=3c6d051ed3ddec8b]*/
+{
+ return PyObject_CallFunctionObjArgs(
+ asyncio_task_repr_info_func, self, NULL);
+}
+
+/*[clinic input]
+_asyncio.Task.cancel
+
+Request that this task cancel itself.
+
+This arranges for a CancelledError to be thrown into the
+wrapped coroutine on the next cycle through the event loop.
+The coroutine then has a chance to clean up or even deny
+the request using try/except/finally.
+
+Unlike Future.cancel, this does not guarantee that the
+task will be cancelled: the exception might be caught and
+acted upon, delaying cancellation of the task or preventing
+cancellation completely. The task may also return a value or
+raise a different exception.
+
+Immediately after this method is called, Task.cancelled() will
+not return True (unless the task was already cancelled). A
+task will be marked as cancelled when the wrapped coroutine
+terminates with a CancelledError exception (even if cancel()
+was not called).
+[clinic start generated code]*/
+
+static PyObject *
+_asyncio_Task_cancel_impl(TaskObj *self)
+/*[clinic end generated code: output=6bfc0479da9d5757 input=13f9bf496695cb52]*/
+{
+ if (self->task_state != STATE_PENDING) {
+ Py_RETURN_FALSE;
+ }
+
+ if (self->task_fut_waiter) {
+ PyObject *res;
+ int is_true;
+
+ res = _PyObject_CallMethodId(
+ self->task_fut_waiter, &PyId_cancel, NULL);
+ if (res == NULL) {
+ return NULL;
+ }
+
+ is_true = PyObject_IsTrue(res);
+ Py_DECREF(res);
+ if (is_true < 0) {
+ return NULL;
+ }
+
+ if (is_true) {
+ Py_RETURN_TRUE;
+ }
+ }
+
+ self->task_must_cancel = 1;
+ Py_RETURN_TRUE;
+}
+
+/*[clinic input]
+_asyncio.Task.get_stack
+
+ *
+ limit: 'O' = None
+
+Return the list of stack frames for this task's coroutine.
+
+If the coroutine is not done, this returns the stack where it is
+suspended. If the coroutine has completed successfully or was
+cancelled, this returns an empty list. If the coroutine was
+terminated by an exception, this returns the list of traceback
+frames.
+
+The frames are always ordered from oldest to newest.
+
+The optional limit gives the maximum number of frames to
+return; by default all available frames are returned. Its
+meaning differs depending on whether a stack or a traceback is
+returned: the newest frames of a stack are returned, but the
+oldest frames of a traceback are returned. (This matches the
+behavior of the traceback module.)
+
+For reasons beyond our control, only one stack frame is
+returned for a suspended coroutine.
+[clinic start generated code]*/
+
+static PyObject *
+_asyncio_Task_get_stack_impl(TaskObj *self, PyObject *limit)
+/*[clinic end generated code: output=c9aeeeebd1e18118 input=b1920230a766d17a]*/
+{
+ return PyObject_CallFunctionObjArgs(
+ asyncio_task_get_stack_func, self, limit, NULL);
+}
+
+/*[clinic input]
+_asyncio.Task.print_stack
+
+ *
+ limit: 'O' = None
+ file: 'O' = None
+
+Print the stack or traceback for this task's coroutine.
+
+This produces output similar to that of the traceback module,
+for the frames retrieved by get_stack(). The limit argument
+is passed to get_stack(). The file argument is an I/O stream
+to which the output is written; by default output is written
+to sys.stderr.
+[clinic start generated code]*/
+
+static PyObject *
+_asyncio_Task_print_stack_impl(TaskObj *self, PyObject *limit,
+ PyObject *file)
+/*[clinic end generated code: output=7339e10314cd3f4d input=19f1e99ab5400bc3]*/
+{
+ return PyObject_CallFunctionObjArgs(
+ asyncio_task_print_stack_func, self, limit, file, NULL);
+}
+
+/*[clinic input]
+_asyncio.Task._step
+
+ exc: 'O' = NULL
+[clinic start generated code]*/
+
+static PyObject *
+_asyncio_Task__step_impl(TaskObj *self, PyObject *exc)
+/*[clinic end generated code: output=7ed23f0cefd5ae42 input=ada4b2324e5370af]*/
+{
+ return task_step(self, exc == Py_None ? NULL : exc);
+}
+
+/*[clinic input]
+_asyncio.Task._wakeup
+
+ fut: 'O'
+[clinic start generated code]*/
+
+static PyObject *
+_asyncio_Task__wakeup_impl(TaskObj *self, PyObject *fut)
+/*[clinic end generated code: output=75cb341c760fd071 input=11ee4918a5bdbf21]*/
+{
+ return task_wakeup(self, fut);
+}
+
+static void
+TaskObj_finalize(TaskObj *task)
+{
+ _Py_IDENTIFIER(call_exception_handler);
+ _Py_IDENTIFIER(task);
+ _Py_IDENTIFIER(message);
+ _Py_IDENTIFIER(source_traceback);
+
+ PyObject *message = NULL;
+ PyObject *context = NULL;
+ PyObject *func = NULL;
+ PyObject *res = NULL;
+
+ PyObject *error_type, *error_value, *error_traceback;
+
+ if (task->task_state != STATE_PENDING || !task->task_log_destroy_pending) {
+ goto done;
+ }
+
+ /* Save the current exception, if any. */
+ PyErr_Fetch(&error_type, &error_value, &error_traceback);
+
+ context = PyDict_New();
+ if (context == NULL) {
+ goto finally;
+ }
+
+ message = PyUnicode_FromString("Task was destroyed but it is pending!");
+ if (message == NULL) {
+ goto finally;
+ }
+
+ if (_PyDict_SetItemId(context, &PyId_message, message) < 0 ||
+ _PyDict_SetItemId(context, &PyId_task, (PyObject*)task) < 0)
+ {
+ goto finally;
+ }
+
+ if (task->task_source_tb != NULL) {
+ if (_PyDict_SetItemId(context, &PyId_source_traceback,
+ task->task_source_tb) < 0)
+ {
+ goto finally;
+ }
+ }
+
+ func = _PyObject_GetAttrId(task->task_loop, &PyId_call_exception_handler);
+ if (func != NULL) {
+ res = _PyObject_CallArg1(func, context);
+ if (res == NULL) {
+ PyErr_WriteUnraisable(func);
+ }
+ }
+
+finally:
+ Py_CLEAR(context);
+ Py_CLEAR(message);
+ Py_CLEAR(func);
+ Py_CLEAR(res);
+
+ /* Restore the saved exception. */
+ PyErr_Restore(error_type, error_value, error_traceback);
+
+done:
+ FutureObj_finalize((FutureObj*)task);
+}
+
+static void TaskObj_dealloc(PyObject *); /* Needs Task_CheckExact */
+
+static PyMethodDef TaskType_methods[] = {
+ _ASYNCIO_FUTURE_RESULT_METHODDEF
+ _ASYNCIO_FUTURE_EXCEPTION_METHODDEF
+ _ASYNCIO_FUTURE_SET_RESULT_METHODDEF
+ _ASYNCIO_FUTURE_SET_EXCEPTION_METHODDEF
+ _ASYNCIO_FUTURE_ADD_DONE_CALLBACK_METHODDEF
+ _ASYNCIO_FUTURE_REMOVE_DONE_CALLBACK_METHODDEF
+ _ASYNCIO_FUTURE_CANCELLED_METHODDEF
+ _ASYNCIO_FUTURE_DONE_METHODDEF
+ _ASYNCIO_TASK_CURRENT_TASK_METHODDEF
+ _ASYNCIO_TASK_ALL_TASKS_METHODDEF
+ _ASYNCIO_TASK_CANCEL_METHODDEF
+ _ASYNCIO_TASK_GET_STACK_METHODDEF
+ _ASYNCIO_TASK_PRINT_STACK_METHODDEF
+ _ASYNCIO_TASK__WAKEUP_METHODDEF
+ _ASYNCIO_TASK__STEP_METHODDEF
+ _ASYNCIO_TASK__REPR_INFO_METHODDEF
+ {NULL, NULL} /* Sentinel */
+};
+
+static PyGetSetDef TaskType_getsetlist[] = {
+ FUTURE_COMMON_GETSETLIST
+ {"_log_destroy_pending", (getter)TaskObj_get_log_destroy_pending,
+ (setter)TaskObj_set_log_destroy_pending, NULL},
+ {"_must_cancel", (getter)TaskObj_get_must_cancel, NULL, NULL},
+ {"_coro", (getter)TaskObj_get_coro, NULL, NULL},
+ {"_fut_waiter", (getter)TaskObj_get_fut_waiter, NULL, NULL},
+ {NULL} /* Sentinel */
+};
+
+static PyTypeObject TaskType = {
+ PyVarObject_HEAD_INIT(0, 0)
+ "_asyncio.Task",
+ sizeof(TaskObj), /* tp_basicsize */
+ .tp_base = &FutureType,
+ .tp_dealloc = TaskObj_dealloc,
+ .tp_as_async = &FutureType_as_async,
+ .tp_repr = (reprfunc)FutureObj_repr,
+ .tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC | Py_TPFLAGS_BASETYPE
+ | Py_TPFLAGS_HAVE_FINALIZE,
+ .tp_doc = _asyncio_Task___init____doc__,
+ .tp_traverse = (traverseproc)TaskObj_traverse,
+ .tp_clear = (inquiry)TaskObj_clear,
+ .tp_weaklistoffset = offsetof(TaskObj, task_weakreflist),
+ .tp_iter = (getiterfunc)future_new_iter,
+ .tp_methods = TaskType_methods,
+ .tp_getset = TaskType_getsetlist,
+ .tp_dictoffset = offsetof(TaskObj, dict),
+ .tp_init = (initproc)_asyncio_Task___init__,
+ .tp_new = PyType_GenericNew,
+ .tp_finalize = (destructor)TaskObj_finalize,
+};
+
+#define Task_CheckExact(obj) (Py_TYPE(obj) == &TaskType)
+
+static void
+TaskObj_dealloc(PyObject *self)
+{
+ TaskObj *task = (TaskObj *)self;
+
+ if (Task_CheckExact(self)) {
+ /* When fut is subclass of Task, finalizer is called from
+ * subtype_dealloc.
+ */
+ if (PyObject_CallFinalizerFromDealloc(self) < 0) {
+ // resurrected.
+ return;
+ }
+ }
+
+ if (task->task_weakreflist != NULL) {
+ PyObject_ClearWeakRefs(self);
+ }
+
+ (void)TaskObj_clear(task);
+ Py_TYPE(task)->tp_free(task);
+}
+
+static inline PyObject *
+task_call_wakeup(TaskObj *task, PyObject *fut)
+{
+ if (Task_CheckExact(task)) {
+ return task_wakeup(task, fut);
+ }
+ else {
+ /* `task` is a subclass of Task */
+ return _PyObject_CallMethodId(
+ (PyObject*)task, &PyId__wakeup, "O", fut, NULL);
+ }
+}
+
+static inline PyObject *
+task_call_step(TaskObj *task, PyObject *arg)
+{
+ if (Task_CheckExact(task)) {
+ return task_step(task, arg);
+ }
+ else {
+ /* `task` is a subclass of Task */
+ if (arg == NULL) {
+ arg = Py_None;
+ }
+ return _PyObject_CallMethodId(
+ (PyObject*)task, &PyId__step, "O", arg, NULL);
+ }
+}
+
+static int
+task_call_step_soon(TaskObj *task, PyObject *arg)
+{
+ PyObject *handle;
+
+ PyObject *cb = TaskSendMethWrapper_new(task, arg);
+ if (cb == NULL) {
+ return -1;
+ }
+
+ handle = _PyObject_CallMethodId(
+ task->task_loop, &PyId_call_soon, "O", cb, NULL);
+ Py_DECREF(cb);
+ if (handle == NULL) {
+ return -1;
+ }
+
+ Py_DECREF(handle);
+ return 0;
+}
+
+static PyObject *
+task_set_error_soon(TaskObj *task, PyObject *et, const char *format, ...)
+{
+ PyObject* msg;
+
+ va_list vargs;
+#ifdef HAVE_STDARG_PROTOTYPES
+ va_start(vargs, format);
+#else
+ va_start(vargs);
+#endif
+ msg = PyUnicode_FromFormatV(format, vargs);
+ va_end(vargs);
+
+ if (msg == NULL) {
+ return NULL;
+ }
+
+ PyObject *e = PyObject_CallFunctionObjArgs(et, msg, NULL);
+ Py_DECREF(msg);
+ if (e == NULL) {
+ return NULL;
+ }
+
+ if (task_call_step_soon(task, e) == -1) {
+ Py_DECREF(e);
+ return NULL;
+ }
+
+ Py_DECREF(e);
+ Py_RETURN_NONE;
+}
+
+static PyObject *
+task_step_impl(TaskObj *task, PyObject *exc)
+{
+ int res;
+ int clear_exc = 0;
+ PyObject *result = NULL;
+ PyObject *coro = task->task_coro;
+ PyObject *o;
+
+ if (task->task_state != STATE_PENDING) {
+ PyErr_Format(PyExc_AssertionError,
+ "_step(): already done: %R %R",
+ task,
+ exc ? exc : Py_None);
goto fail;
}
- Py_DECREF(module);
- module = PyImport_ImportModule("asyncio.futures");
- if (module == NULL) {
+ if (task->task_must_cancel) {
+ assert(exc != Py_None);
+
+ if (exc) {
+ /* Check if exc is a CancelledError */
+ res = PyObject_IsInstance(exc, asyncio_CancelledError);
+ if (res == -1) {
+ /* An error occurred, abort */
+ goto fail;
+ }
+ if (res == 0) {
+ /* exc is not CancelledError; reset it to NULL */
+ exc = NULL;
+ }
+ }
+
+ if (!exc) {
+ /* exc was not a CancelledError */
+ exc = PyObject_CallFunctionObjArgs(asyncio_CancelledError, NULL);
+ if (!exc) {
+ goto fail;
+ }
+ clear_exc = 1;
+ }
+
+ task->task_must_cancel = 0;
+ }
+
+ Py_CLEAR(task->task_fut_waiter);
+
+ if (exc == NULL) {
+ if (PyGen_CheckExact(coro) || PyCoro_CheckExact(coro)) {
+ result = _PyGen_Send((PyGenObject*)coro, Py_None);
+ }
+ else {
+ result = _PyObject_CallMethodIdObjArgs(
+ coro, &PyId_send, Py_None, NULL);
+ }
+ }
+ else {
+ result = _PyObject_CallMethodIdObjArgs(
+ coro, &PyId_throw, exc, NULL);
+ if (clear_exc) {
+ /* We created 'exc' during this call */
+ Py_CLEAR(exc);
+ }
+ }
+
+ if (result == NULL) {
+ PyObject *et, *ev, *tb;
+
+ if (_PyGen_FetchStopIterationValue(&o) == 0) {
+ /* The error is StopIteration and that means that
+ the underlying coroutine has resolved */
+ PyObject *res = future_set_result((FutureObj*)task, o);
+ Py_DECREF(o);
+ if (res == NULL) {
+ return NULL;
+ }
+ Py_DECREF(res);
+ Py_RETURN_NONE;
+ }
+
+ if (PyErr_ExceptionMatches(asyncio_CancelledError)) {
+ /* CancelledError */
+ PyErr_Clear();
+ return future_cancel((FutureObj*)task);
+ }
+
+ /* Some other exception; pop it and call Task.set_exception() */
+ PyErr_Fetch(&et, &ev, &tb);
+ assert(et);
+ if (!ev || !PyObject_TypeCheck(ev, (PyTypeObject *) et)) {
+ PyErr_NormalizeException(&et, &ev, &tb);
+ }
+ o = future_set_exception((FutureObj*)task, ev);
+ if (!o) {
+ /* An exception in Task.set_exception() */
+ Py_XDECREF(et);
+ Py_XDECREF(tb);
+ Py_XDECREF(ev);
+ goto fail;
+ }
+ assert(o == Py_None);
+ Py_CLEAR(o);
+
+ if (!PyErr_GivenExceptionMatches(et, PyExc_Exception)) {
+ /* We've got a BaseException; re-raise it */
+ PyErr_Restore(et, ev, tb);
+ goto fail;
+ }
+
+ Py_XDECREF(et);
+ Py_XDECREF(tb);
+ Py_XDECREF(ev);
+
+ Py_RETURN_NONE;
+ }
+
+ if (result == (PyObject*)task) {
+ /* We have a task that wants to await on itself */
+ goto self_await;
+ }
+
+ /* Check if `result` is FutureObj or TaskObj (and not a subclass) */
+ if (Future_CheckExact(result) || Task_CheckExact(result)) {
+ PyObject *wrapper;
+ PyObject *res;
+ FutureObj *fut = (FutureObj*)result;
+
+ /* Check if `result` future is attached to a different loop */
+ if (fut->fut_loop != task->task_loop) {
+ goto different_loop;
+ }
+
+ if (fut->fut_blocking) {
+ fut->fut_blocking = 0;
+
+ /* result.add_done_callback(task._wakeup) */
+ wrapper = TaskWakeupMethWrapper_new(task);
+ if (wrapper == NULL) {
+ goto fail;
+ }
+ res = future_add_done_callback((FutureObj*)result, wrapper);
+ Py_DECREF(wrapper);
+ if (res == NULL) {
+ goto fail;
+ }
+ Py_DECREF(res);
+
+ /* task._fut_waiter = result */
+ task->task_fut_waiter = result; /* no incref is necessary */
+
+ if (task->task_must_cancel) {
+ PyObject *r;
+ r = future_cancel(fut);
+ if (r == NULL) {
+ return NULL;
+ }
+ if (r == Py_True) {
+ task->task_must_cancel = 0;
+ }
+ Py_DECREF(r);
+ }
+
+ Py_RETURN_NONE;
+ }
+ else {
+ goto yield_insteadof_yf;
+ }
+ }
+
+ /* Check if `result` is a Future-compatible object */
+ o = PyObject_GetAttrString(result, "_asyncio_future_blocking");
+ if (o == NULL) {
+ if (PyErr_ExceptionMatches(PyExc_AttributeError)) {
+ PyErr_Clear();
+ }
+ else {
+ goto fail;
+ }
+ }
+ else {
+ if (o == Py_None) {
+ Py_CLEAR(o);
+ }
+ else {
+ /* `result` is a Future-compatible object */
+ PyObject *wrapper;
+ PyObject *res;
+
+ int blocking = PyObject_IsTrue(o);
+ Py_CLEAR(o);
+ if (blocking < 0) {
+ goto fail;
+ }
+
+ /* Check if `result` future is attached to a different loop */
+ PyObject *oloop = PyObject_GetAttrString(result, "_loop");
+ if (oloop == NULL) {
+ goto fail;
+ }
+ if (oloop != task->task_loop) {
+ Py_DECREF(oloop);
+ goto different_loop;
+ }
+ else {
+ Py_DECREF(oloop);
+ }
+
+ if (blocking) {
+ /* result._asyncio_future_blocking = False */
+ if (PyObject_SetAttrString(
+ result, "_asyncio_future_blocking", Py_False) == -1) {
+ goto fail;
+ }
+
+ /* result.add_done_callback(task._wakeup) */
+ wrapper = TaskWakeupMethWrapper_new(task);
+ if (wrapper == NULL) {
+ goto fail;
+ }
+ res = _PyObject_CallMethodId(
+ result, &PyId_add_done_callback, "O", wrapper, NULL);
+ Py_DECREF(wrapper);
+ if (res == NULL) {
+ goto fail;
+ }
+ Py_DECREF(res);
+
+ /* task._fut_waiter = result */
+ task->task_fut_waiter = result; /* no incref is necessary */
+
+ if (task->task_must_cancel) {
+ PyObject *r;
+ int is_true;
+ r = _PyObject_CallMethodId(result, &PyId_cancel, NULL);
+ if (r == NULL) {
+ return NULL;
+ }
+ is_true = PyObject_IsTrue(r);
+ Py_DECREF(r);
+ if (is_true < 0) {
+ return NULL;
+ }
+ else if (is_true) {
+ task->task_must_cancel = 0;
+ }
+ }
+
+ Py_RETURN_NONE;
+ }
+ else {
+ goto yield_insteadof_yf;
+ }
+ }
+ }
+
+ /* Check if `result` is None */
+ if (result == Py_None) {
+ /* Bare yield relinquishes control for one event loop iteration. */
+ if (task_call_step_soon(task, NULL)) {
+ goto fail;
+ }
+ return result;
+ }
+
+ /* Check if `result` is a generator */
+ o = PyObject_CallFunctionObjArgs(inspect_isgenerator, result, NULL);
+ if (o == NULL) {
+ /* An exception in inspect.isgenerator */
goto fail;
}
- asyncio_repr_info_func = PyObject_GetAttrString(module,
- "_future_repr_info");
- if (asyncio_repr_info_func == NULL) {
+ res = PyObject_IsTrue(o);
+ Py_CLEAR(o);
+ if (res == -1) {
+ /* An exception while checking if 'val' is True */
goto fail;
}
+ if (res == 1) {
+ /* `result` is a generator */
+ PyObject *ret;
+ ret = task_set_error_soon(
+ task, PyExc_RuntimeError,
+ "yield was used instead of yield from for "
+ "generator in task %R with %S", task, result);
+ Py_DECREF(result);
+ return ret;
+ }
+
+ /* The `result` is none of the above */
+ Py_DECREF(result);
+ return task_set_error_soon(
+ task, PyExc_RuntimeError, "Task got bad yield: %R", result);
+
+self_await:
+ o = task_set_error_soon(
+ task, PyExc_RuntimeError,
+ "Task cannot await on itself: %R", task);
+ Py_DECREF(result);
+ return o;
+
+yield_insteadof_yf:
+ o = task_set_error_soon(
+ task, PyExc_RuntimeError,
+ "yield was used instead of yield from "
+ "in task %R with %R",
+ task, result);
+ Py_DECREF(result);
+ return o;
+
+different_loop:
+ o = task_set_error_soon(
+ task, PyExc_RuntimeError,
+ "Task %R got Future %R attached to a different loop",
+ task, result);
+ Py_DECREF(result);
+ return o;
- asyncio_InvalidStateError = PyObject_GetAttrString(module,
- "InvalidStateError");
- if (asyncio_InvalidStateError == NULL) {
- goto fail;
+fail:
+ Py_XDECREF(result);
+ return NULL;
+}
+
+static PyObject *
+task_step(TaskObj *task, PyObject *exc)
+{
+ PyObject *res;
+ PyObject *ot;
+
+ if (PyDict_SetItem((PyObject *)current_tasks,
+ task->task_loop, (PyObject*)task) == -1)
+ {
+ return NULL;
}
- asyncio_CancelledError = PyObject_GetAttrString(module, "CancelledError");
- if (asyncio_CancelledError == NULL) {
- goto fail;
+ res = task_step_impl(task, exc);
+
+ if (res == NULL) {
+ PyObject *et, *ev, *tb;
+ PyErr_Fetch(&et, &ev, &tb);
+ ot = _PyDict_Pop(current_tasks, task->task_loop, NULL);
+ if (ot == NULL) {
+ Py_XDECREF(et);
+ Py_XDECREF(tb);
+ Py_XDECREF(ev);
+ return NULL;
+ }
+ Py_DECREF(ot);
+ PyErr_Restore(et, ev, tb);
+ return NULL;
+ }
+ else {
+ ot = _PyDict_Pop(current_tasks, task->task_loop, NULL);
+ if (ot == NULL) {
+ Py_DECREF(res);
+ return NULL;
+ }
+ else {
+ Py_DECREF(ot);
+ return res;
+ }
}
+}
- Py_DECREF(module);
- return 0;
+static PyObject *
+task_wakeup(TaskObj *task, PyObject *o)
+{
+ assert(o);
-fail:
+ if (Future_CheckExact(o) || Task_CheckExact(o)) {
+ PyObject *fut_result = NULL;
+ int res = future_get_result((FutureObj*)o, &fut_result);
+ PyObject *result;
+
+ switch(res) {
+ case -1:
+ assert(fut_result == NULL);
+ return NULL;
+ case 0:
+ Py_DECREF(fut_result);
+ return task_call_step(task, NULL);
+ default:
+ assert(res == 1);
+ result = task_call_step(task, fut_result);
+ Py_DECREF(fut_result);
+ return result;
+ }
+ }
+
+ PyObject *fut_result = PyObject_CallMethod(o, "result", NULL);
+ if (fut_result == NULL) {
+ PyObject *et, *ev, *tb;
+ PyObject *res;
+
+ PyErr_Fetch(&et, &ev, &tb);
+ if (!ev || !PyObject_TypeCheck(ev, (PyTypeObject *) et)) {
+ PyErr_NormalizeException(&et, &ev, &tb);
+ }
+
+ res = task_call_step(task, ev);
+
+ Py_XDECREF(et);
+ Py_XDECREF(tb);
+ Py_XDECREF(ev);
+
+ return res;
+ }
+ else {
+ Py_DECREF(fut_result);
+ return task_call_step(task, NULL);
+ }
+}
+
+
+/*********************** Module **************************/
+
+
+static void
+module_free(void *m)
+{
+ Py_CLEAR(current_tasks);
+ Py_CLEAR(all_tasks);
Py_CLEAR(traceback_extract_stack);
Py_CLEAR(asyncio_get_event_loop);
- Py_CLEAR(asyncio_repr_info_func);
+ Py_CLEAR(asyncio_future_repr_info_func);
+ Py_CLEAR(asyncio_task_repr_info_func);
+ Py_CLEAR(asyncio_task_get_stack_func);
+ Py_CLEAR(asyncio_task_print_stack_func);
Py_CLEAR(asyncio_InvalidStateError);
Py_CLEAR(asyncio_CancelledError);
+ Py_CLEAR(inspect_isgenerator);
+}
+
+static int
+module_init(void)
+{
+ PyObject *module = NULL;
+ PyObject *cls;
+
+#define WITH_MOD(NAME) \
+ Py_CLEAR(module); \
+ module = PyImport_ImportModule(NAME); \
+ if (module == NULL) { \
+ return -1; \
+ }
+
+#define GET_MOD_ATTR(VAR, NAME) \
+ VAR = PyObject_GetAttrString(module, NAME); \
+ if (VAR == NULL) { \
+ goto fail; \
+ }
+
+ WITH_MOD("asyncio.events")
+ GET_MOD_ATTR(asyncio_get_event_loop, "get_event_loop")
+
+ WITH_MOD("asyncio.base_futures")
+ GET_MOD_ATTR(asyncio_future_repr_info_func, "_future_repr_info")
+ GET_MOD_ATTR(asyncio_InvalidStateError, "InvalidStateError")
+ GET_MOD_ATTR(asyncio_CancelledError, "CancelledError")
+
+ WITH_MOD("asyncio.base_tasks")
+ GET_MOD_ATTR(asyncio_task_repr_info_func, "_task_repr_info")
+ GET_MOD_ATTR(asyncio_task_get_stack_func, "_task_get_stack")
+ GET_MOD_ATTR(asyncio_task_print_stack_func, "_task_print_stack")
+
+ WITH_MOD("inspect")
+ GET_MOD_ATTR(inspect_isgenerator, "isgenerator")
+
+ WITH_MOD("traceback")
+ GET_MOD_ATTR(traceback_extract_stack, "extract_stack")
+
+ WITH_MOD("weakref")
+ GET_MOD_ATTR(cls, "WeakSet")
+ all_tasks = PyObject_CallObject(cls, NULL);
+ Py_CLEAR(cls);
+ if (all_tasks == NULL) {
+ goto fail;
+ }
+
+ current_tasks = (PyDictObject *)PyDict_New();
+ if (current_tasks == NULL) {
+ goto fail;
+ }
+
Py_CLEAR(module);
+ return 0;
+
+fail:
+ Py_CLEAR(module);
+ module_free(NULL);
return -1;
-}
+#undef WITH_MOD
+#undef GET_MOD_ATTR
+}
PyDoc_STRVAR(module_doc, "Accelerator module for asyncio");
@@ -1006,14 +2413,14 @@ static struct PyModuleDef _asynciomodule = {
NULL, /* m_slots */
NULL, /* m_traverse */
NULL, /* m_clear */
- NULL, /* m_free */
+ (freefunc)module_free /* m_free */
};
PyMODINIT_FUNC
PyInit__asyncio(void)
{
- if (init_module() < 0) {
+ if (module_init() < 0) {
return NULL;
}
if (PyType_Ready(&FutureType) < 0) {
@@ -1022,6 +2429,15 @@ PyInit__asyncio(void)
if (PyType_Ready(&FutureIterType) < 0) {
return NULL;
}
+ if (PyType_Ready(&TaskSendMethWrapper_Type) < 0) {
+ return NULL;
+ }
+ if(PyType_Ready(&TaskWakeupMethWrapper_Type) < 0) {
+ return NULL;
+ }
+ if (PyType_Ready(&TaskType) < 0) {
+ return NULL;
+ }
PyObject *m = PyModule_Create(&_asynciomodule);
if (m == NULL) {
@@ -1034,5 +2450,11 @@ PyInit__asyncio(void)
return NULL;
}
+ Py_INCREF(&TaskType);
+ if (PyModule_AddObject(m, "Task", (PyObject *)&TaskType) < 0) {
+ Py_DECREF(&TaskType);
+ return NULL;
+ }
+
return m;
}