summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Lib/asyncio/events.py27
-rw-r--r--Lib/test/test_asyncio/test_events.py133
-rw-r--r--Misc/NEWS.d/next/Library/2017-12-12-18-01-01.bpo-32296.bwscHz.rst2
-rw-r--r--Modules/_asynciomodule.c251
-rw-r--r--Modules/clinic/_asynciomodule.c.h80
5 files changed, 465 insertions, 28 deletions
diff --git a/Lib/asyncio/events.py b/Lib/asyncio/events.py
index e425b06..a00f861 100644
--- a/Lib/asyncio/events.py
+++ b/Lib/asyncio/events.py
@@ -652,6 +652,7 @@ def get_running_loop():
This function is thread-specific.
"""
+ # NOTE: this function is implemented in C (see _asynciomodule.c)
loop = _get_running_loop()
if loop is None:
raise RuntimeError('no running event loop')
@@ -664,6 +665,7 @@ def _get_running_loop():
This is a low-level function intended to be used by event loops.
This function is thread-specific.
"""
+ # NOTE: this function is implemented in C (see _asynciomodule.c)
running_loop, pid = _running_loop.loop_pid
if running_loop is not None and pid == os.getpid():
return running_loop
@@ -675,6 +677,7 @@ def _set_running_loop(loop):
This is a low-level function intended to be used by event loops.
This function is thread-specific.
"""
+ # NOTE: this function is implemented in C (see _asynciomodule.c)
_running_loop.loop_pid = (loop, os.getpid())
@@ -711,6 +714,7 @@ def get_event_loop():
If there is no running event loop set, the function will return
the result of `get_event_loop_policy().get_event_loop()` call.
"""
+ # NOTE: this function is implemented in C (see _asynciomodule.c)
current_loop = _get_running_loop()
if current_loop is not None:
return current_loop
@@ -736,3 +740,26 @@ def set_child_watcher(watcher):
"""Equivalent to calling
get_event_loop_policy().set_child_watcher(watcher)."""
return get_event_loop_policy().set_child_watcher(watcher)
+
+
+# Alias pure-Python implementations for testing purposes.
+_py__get_running_loop = _get_running_loop
+_py__set_running_loop = _set_running_loop
+_py_get_running_loop = get_running_loop
+_py_get_event_loop = get_event_loop
+
+
+try:
+ # get_event_loop() is one of the most frequently called
+ # functions in asyncio. Pure Python implementation is
+ # about 4 times slower than C-accelerated.
+ from _asyncio import (_get_running_loop, _set_running_loop,
+ get_running_loop, get_event_loop)
+except ImportError:
+ pass
+else:
+ # Alias C implementations for testing purposes.
+ _c__get_running_loop = _get_running_loop
+ _c__set_running_loop = _set_running_loop
+ _c_get_running_loop = get_running_loop
+ _c_get_event_loop = get_event_loop
diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py
index 1315feb..144921a 100644
--- a/Lib/test/test_asyncio/test_events.py
+++ b/Lib/test/test_asyncio/test_events.py
@@ -27,6 +27,7 @@ if sys.platform != 'win32':
import asyncio
from asyncio import coroutines
+from asyncio import events
from asyncio import proactor_events
from asyncio import selector_events
from test.test_asyncio import utils as test_utils
@@ -2145,23 +2146,6 @@ else:
asyncio.set_child_watcher(None)
super().tearDown()
- def test_get_event_loop_new_process(self):
- # Issue bpo-32126: The multiprocessing module used by
- # ProcessPoolExecutor is not functional when the
- # multiprocessing.synchronize module cannot be imported.
- support.import_module('multiprocessing.synchronize')
- async def main():
- pool = concurrent.futures.ProcessPoolExecutor()
- result = await self.loop.run_in_executor(
- pool, _test_get_event_loop_new_process__sub_proc)
- pool.shutdown()
- return result
-
- self.unpatch_get_running_loop()
-
- self.assertEqual(
- self.loop.run_until_complete(main()),
- 'hello')
if hasattr(selectors, 'KqueueSelector'):
class KqueueEventLoopTests(UnixEventLoopTestsMixin,
@@ -2722,17 +2706,95 @@ class PolicyTests(unittest.TestCase):
self.assertIs(policy, asyncio.get_event_loop_policy())
self.assertIsNot(policy, old_policy)
+
+class GetEventLoopTestsMixin:
+
+ _get_running_loop_impl = None
+ _set_running_loop_impl = None
+ get_running_loop_impl = None
+ get_event_loop_impl = None
+
+ def setUp(self):
+ self._get_running_loop_saved = events._get_running_loop
+ self._set_running_loop_saved = events._set_running_loop
+ self.get_running_loop_saved = events.get_running_loop
+ self.get_event_loop_saved = events.get_event_loop
+
+ events._get_running_loop = type(self)._get_running_loop_impl
+ events._set_running_loop = type(self)._set_running_loop_impl
+ events.get_running_loop = type(self).get_running_loop_impl
+ events.get_event_loop = type(self).get_event_loop_impl
+
+ asyncio._get_running_loop = type(self)._get_running_loop_impl
+ asyncio._set_running_loop = type(self)._set_running_loop_impl
+ asyncio.get_running_loop = type(self).get_running_loop_impl
+ asyncio.get_event_loop = type(self).get_event_loop_impl
+
+ super().setUp()
+
+ self.loop = asyncio.new_event_loop()
+ asyncio.set_event_loop(self.loop)
+
+ watcher = asyncio.SafeChildWatcher()
+ watcher.attach_loop(self.loop)
+ asyncio.set_child_watcher(watcher)
+
+ def tearDown(self):
+ try:
+ asyncio.set_child_watcher(None)
+ super().tearDown()
+ finally:
+ self.loop.close()
+ asyncio.set_event_loop(None)
+
+ events._get_running_loop = self._get_running_loop_saved
+ events._set_running_loop = self._set_running_loop_saved
+ events.get_running_loop = self.get_running_loop_saved
+ events.get_event_loop = self.get_event_loop_saved
+
+ asyncio._get_running_loop = self._get_running_loop_saved
+ asyncio._set_running_loop = self._set_running_loop_saved
+ asyncio.get_running_loop = self.get_running_loop_saved
+ asyncio.get_event_loop = self.get_event_loop_saved
+
+ if sys.platform != 'win32':
+
+ def test_get_event_loop_new_process(self):
+ # Issue bpo-32126: The multiprocessing module used by
+ # ProcessPoolExecutor is not functional when the
+ # multiprocessing.synchronize module cannot be imported.
+ support.import_module('multiprocessing.synchronize')
+
+ async def main():
+ pool = concurrent.futures.ProcessPoolExecutor()
+ result = await self.loop.run_in_executor(
+ pool, _test_get_event_loop_new_process__sub_proc)
+ pool.shutdown()
+ return result
+
+ self.assertEqual(
+ self.loop.run_until_complete(main()),
+ 'hello')
+
def test_get_event_loop_returns_running_loop(self):
+ class TestError(Exception):
+ pass
+
class Policy(asyncio.DefaultEventLoopPolicy):
def get_event_loop(self):
- raise NotImplementedError
-
- loop = None
+ raise TestError
old_policy = asyncio.get_event_loop_policy()
try:
asyncio.set_event_loop_policy(Policy())
loop = asyncio.new_event_loop()
+
+ with self.assertRaises(TestError):
+ asyncio.get_event_loop()
+ asyncio.set_event_loop(None)
+ with self.assertRaises(TestError):
+ asyncio.get_event_loop()
+
with self.assertRaisesRegex(RuntimeError, 'no running'):
self.assertIs(asyncio.get_running_loop(), None)
self.assertIs(asyncio._get_running_loop(), None)
@@ -2743,6 +2805,15 @@ class PolicyTests(unittest.TestCase):
self.assertIs(asyncio._get_running_loop(), loop)
loop.run_until_complete(func())
+
+ asyncio.set_event_loop(loop)
+ with self.assertRaises(TestError):
+ asyncio.get_event_loop()
+
+ asyncio.set_event_loop(None)
+ with self.assertRaises(TestError):
+ asyncio.get_event_loop()
+
finally:
asyncio.set_event_loop_policy(old_policy)
if loop is not None:
@@ -2754,5 +2825,27 @@ class PolicyTests(unittest.TestCase):
self.assertIs(asyncio._get_running_loop(), None)
+class TestPyGetEventLoop(GetEventLoopTestsMixin, unittest.TestCase):
+
+ _get_running_loop_impl = events._py__get_running_loop
+ _set_running_loop_impl = events._py__set_running_loop
+ get_running_loop_impl = events._py_get_running_loop
+ get_event_loop_impl = events._py_get_event_loop
+
+
+try:
+ import _asyncio # NoQA
+except ImportError:
+ pass
+else:
+
+ class TestCGetEventLoop(GetEventLoopTestsMixin, unittest.TestCase):
+
+ _get_running_loop_impl = events._c__get_running_loop
+ _set_running_loop_impl = events._c__set_running_loop
+ get_running_loop_impl = events._c_get_running_loop
+ get_event_loop_impl = events._c_get_event_loop
+
+
if __name__ == '__main__':
unittest.main()
diff --git a/Misc/NEWS.d/next/Library/2017-12-12-18-01-01.bpo-32296.bwscHz.rst b/Misc/NEWS.d/next/Library/2017-12-12-18-01-01.bpo-32296.bwscHz.rst
new file mode 100644
index 0000000..4100d48
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2017-12-12-18-01-01.bpo-32296.bwscHz.rst
@@ -0,0 +1,2 @@
+Implement asyncio._get_running_loop() and get_event_loop() in C. This makes
+them 4x faster.
diff --git a/Modules/_asynciomodule.c b/Modules/_asynciomodule.c
index 2c64c55..01c38b8 100644
--- a/Modules/_asynciomodule.c
+++ b/Modules/_asynciomodule.c
@@ -9,9 +9,11 @@ module _asyncio
/* identifiers used from some functions */
+_Py_IDENTIFIER(__asyncio_running_event_loop__);
_Py_IDENTIFIER(add_done_callback);
_Py_IDENTIFIER(call_soon);
_Py_IDENTIFIER(cancel);
+_Py_IDENTIFIER(get_event_loop);
_Py_IDENTIFIER(send);
_Py_IDENTIFIER(throw);
_Py_IDENTIFIER(_step);
@@ -23,7 +25,7 @@ _Py_IDENTIFIER(_wakeup);
static PyObject *all_tasks;
static PyObject *current_tasks;
static PyObject *traceback_extract_stack;
-static PyObject *asyncio_get_event_loop;
+static PyObject *asyncio_get_event_loop_policy;
static PyObject *asyncio_future_repr_info_func;
static PyObject *asyncio_task_repr_info_func;
static PyObject *asyncio_task_get_stack_func;
@@ -31,6 +33,7 @@ static PyObject *asyncio_task_print_stack_func;
static PyObject *asyncio_InvalidStateError;
static PyObject *asyncio_CancelledError;
static PyObject *inspect_isgenerator;
+static PyObject *os_getpid;
typedef enum {
@@ -88,6 +91,134 @@ class _asyncio.Future "FutureObj *" "&Future_Type"
static PyObject* future_new_iter(PyObject *);
static inline int future_call_schedule_callbacks(FutureObj *);
+
+static int
+get_running_loop(PyObject **loop)
+{
+ PyObject *ts_dict;
+ PyObject *running_tuple;
+ PyObject *running_loop;
+ PyObject *running_loop_pid;
+ PyObject *current_pid;
+ int same_pid;
+
+ ts_dict = PyThreadState_GetDict(); // borrowed
+ if (ts_dict == NULL) {
+ PyErr_SetString(
+ PyExc_RuntimeError, "thread-local storage is not available");
+ goto error;
+ }
+
+ running_tuple = _PyDict_GetItemId(
+ ts_dict, &PyId___asyncio_running_event_loop__); // borrowed
+ if (running_tuple == NULL) {
+ /* _PyDict_GetItemId doesn't set an error if key is not found */
+ goto not_found;
+ }
+
+ assert(PyTuple_CheckExact(running_tuple));
+ assert(PyTuple_Size(running_tuple) == 2);
+ running_loop = PyTuple_GET_ITEM(running_tuple, 0); // borrowed
+ running_loop_pid = PyTuple_GET_ITEM(running_tuple, 1); // borrowed
+
+ if (running_loop == Py_None) {
+ goto not_found;
+ }
+
+ current_pid = _PyObject_CallNoArg(os_getpid);
+ if (current_pid == NULL) {
+ goto error;
+ }
+ same_pid = PyObject_RichCompareBool(current_pid, running_loop_pid, Py_EQ);
+ Py_DECREF(current_pid);
+ if (same_pid == -1) {
+ goto error;
+ }
+
+ if (same_pid) {
+ // current_pid == running_loop_pid
+ goto found;
+ }
+
+not_found:
+ *loop = NULL;
+ return 0;
+
+found:
+ Py_INCREF(running_loop);
+ *loop = running_loop;
+ return 0;
+
+error:
+ *loop = NULL;
+ return -1;
+}
+
+
+static int
+set_running_loop(PyObject *loop)
+{
+ PyObject *ts_dict;
+ PyObject *running_tuple;
+ PyObject *current_pid;
+
+ ts_dict = PyThreadState_GetDict(); // borrowed
+ if (ts_dict == NULL) {
+ PyErr_SetString(
+ PyExc_RuntimeError, "thread-local storage is not available");
+ return -1;
+ }
+
+ current_pid = _PyObject_CallNoArg(os_getpid);
+ if (current_pid == NULL) {
+ return -1;
+ }
+
+ running_tuple = PyTuple_New(2);
+ if (running_tuple == NULL) {
+ Py_DECREF(current_pid);
+ return -1;
+ }
+
+ Py_INCREF(loop);
+ PyTuple_SET_ITEM(running_tuple, 0, loop);
+ PyTuple_SET_ITEM(running_tuple, 1, current_pid); // borrowed
+
+ if (_PyDict_SetItemId(
+ ts_dict, &PyId___asyncio_running_event_loop__, running_tuple)) {
+ Py_DECREF(running_tuple); // will cleanup loop & current_pid
+ return -1;
+ }
+ Py_DECREF(running_tuple);
+
+ return 0;
+}
+
+
+static PyObject *
+get_event_loop(void)
+{
+ PyObject *loop;
+ PyObject *policy;
+
+ if (get_running_loop(&loop)) {
+ return NULL;
+ }
+ if (loop != NULL) {
+ return loop;
+ }
+
+ policy = _PyObject_CallNoArg(asyncio_get_event_loop_policy);
+ if (policy == NULL) {
+ return NULL;
+ }
+
+ loop = _PyObject_CallMethodId(policy, &PyId_get_event_loop, NULL);
+ Py_DECREF(policy);
+ return loop;
+}
+
+
static int
future_schedule_callbacks(FutureObj *fut)
{
@@ -140,7 +271,7 @@ future_init(FutureObj *fut, PyObject *loop)
_Py_IDENTIFIER(get_debug);
if (loop == Py_None) {
- loop = _PyObject_CallNoArg(asyncio_get_event_loop);
+ loop = get_event_loop();
if (loop == NULL) {
return -1;
}
@@ -1449,7 +1580,7 @@ _asyncio_Task_current_task_impl(PyTypeObject *type, PyObject *loop)
PyObject *res;
if (loop == Py_None) {
- loop = _PyObject_CallNoArg(asyncio_get_event_loop);
+ loop = get_event_loop();
if (loop == NULL) {
return NULL;
}
@@ -1536,7 +1667,7 @@ _asyncio_Task_all_tasks_impl(PyTypeObject *type, PyObject *loop)
PyObject *res;
if (loop == Py_None) {
- loop = _PyObject_CallNoArg(asyncio_get_event_loop);
+ loop = get_event_loop();
if (loop == NULL) {
return NULL;
}
@@ -2368,6 +2499,100 @@ task_wakeup(TaskObj *task, PyObject *o)
}
+/*********************** Functions **************************/
+
+
+/*[clinic input]
+_asyncio._get_running_loop
+
+Return the running event loop or None.
+
+This is a low-level function intended to be used by event loops.
+This function is thread-specific.
+
+[clinic start generated code]*/
+
+static PyObject *
+_asyncio__get_running_loop_impl(PyObject *module)
+/*[clinic end generated code: output=b4390af721411a0a input=0a21627e25a4bd43]*/
+{
+ PyObject *loop;
+ if (get_running_loop(&loop)) {
+ return NULL;
+ }
+ if (loop == NULL) {
+ /* There's no currently running event loop */
+ Py_RETURN_NONE;
+ }
+ return loop;
+}
+
+/*[clinic input]
+_asyncio._set_running_loop
+ loop: 'O'
+ /
+
+Set the running event loop.
+
+This is a low-level function intended to be used by event loops.
+This function is thread-specific.
+[clinic start generated code]*/
+
+static PyObject *
+_asyncio__set_running_loop(PyObject *module, PyObject *loop)
+/*[clinic end generated code: output=ae56bf7a28ca189a input=4c9720233d606604]*/
+{
+ if (set_running_loop(loop)) {
+ return NULL;
+ }
+ Py_RETURN_NONE;
+}
+
+/*[clinic input]
+_asyncio.get_event_loop
+
+Return an asyncio event loop.
+
+When called from a coroutine or a callback (e.g. scheduled with
+call_soon or similar API), this function will always return the
+running event loop.
+
+If there is no running event loop set, the function will return
+the result of `get_event_loop_policy().get_event_loop()` call.
+[clinic start generated code]*/
+
+static PyObject *
+_asyncio_get_event_loop_impl(PyObject *module)
+/*[clinic end generated code: output=2a2d8b2f824c648b input=9364bf2916c8655d]*/
+{
+ return get_event_loop();
+}
+
+/*[clinic input]
+_asyncio.get_running_loop
+
+Return the running event loop. Raise a RuntimeError if there is none.
+
+This function is thread-specific.
+[clinic start generated code]*/
+
+static PyObject *
+_asyncio_get_running_loop_impl(PyObject *module)
+/*[clinic end generated code: output=c247b5f9e529530e input=2a3bf02ba39f173d]*/
+{
+ PyObject *loop;
+ if (get_running_loop(&loop)) {
+ return NULL;
+ }
+ if (loop == NULL) {
+ /* There's no currently running event loop */
+ PyErr_SetString(
+ PyExc_RuntimeError, "no running event loop");
+ }
+ return loop;
+}
+
+
/*********************** Module **************************/
@@ -2377,7 +2602,7 @@ module_free(void *m)
Py_CLEAR(current_tasks);
Py_CLEAR(all_tasks);
Py_CLEAR(traceback_extract_stack);
- Py_CLEAR(asyncio_get_event_loop);
+ Py_CLEAR(asyncio_get_event_loop_policy);
Py_CLEAR(asyncio_future_repr_info_func);
Py_CLEAR(asyncio_task_repr_info_func);
Py_CLEAR(asyncio_task_get_stack_func);
@@ -2385,6 +2610,7 @@ module_free(void *m)
Py_CLEAR(asyncio_InvalidStateError);
Py_CLEAR(asyncio_CancelledError);
Py_CLEAR(inspect_isgenerator);
+ Py_CLEAR(os_getpid);
}
static int
@@ -2407,7 +2633,7 @@ module_init(void)
}
WITH_MOD("asyncio.events")
- GET_MOD_ATTR(asyncio_get_event_loop, "get_event_loop")
+ GET_MOD_ATTR(asyncio_get_event_loop_policy, "get_event_loop_policy")
WITH_MOD("asyncio.base_futures")
GET_MOD_ATTR(asyncio_future_repr_info_func, "_future_repr_info")
@@ -2422,6 +2648,9 @@ module_init(void)
WITH_MOD("inspect")
GET_MOD_ATTR(inspect_isgenerator, "isgenerator")
+ WITH_MOD("os")
+ GET_MOD_ATTR(os_getpid, "getpid")
+
WITH_MOD("traceback")
GET_MOD_ATTR(traceback_extract_stack, "extract_stack")
@@ -2452,12 +2681,20 @@ fail:
PyDoc_STRVAR(module_doc, "Accelerator module for asyncio");
+static PyMethodDef asyncio_methods[] = {
+ _ASYNCIO_GET_EVENT_LOOP_METHODDEF
+ _ASYNCIO_GET_RUNNING_LOOP_METHODDEF
+ _ASYNCIO__GET_RUNNING_LOOP_METHODDEF
+ _ASYNCIO__SET_RUNNING_LOOP_METHODDEF
+ {NULL, NULL}
+};
+
static struct PyModuleDef _asynciomodule = {
PyModuleDef_HEAD_INIT, /* m_base */
"_asyncio", /* m_name */
module_doc, /* m_doc */
-1, /* m_size */
- NULL, /* m_methods */
+ asyncio_methods, /* m_methods */
NULL, /* m_slots */
NULL, /* m_traverse */
NULL, /* m_clear */
diff --git a/Modules/clinic/_asynciomodule.c.h b/Modules/clinic/_asynciomodule.c.h
index 7627849..8022d1c 100644
--- a/Modules/clinic/_asynciomodule.c.h
+++ b/Modules/clinic/_asynciomodule.c.h
@@ -517,4 +517,82 @@ _asyncio_Task__wakeup(TaskObj *self, PyObject **args, Py_ssize_t nargs, PyObject
exit:
return return_value;
}
-/*[clinic end generated code: output=b92f9cd2b9fb37ef input=a9049054013a1b77]*/
+
+PyDoc_STRVAR(_asyncio__get_running_loop__doc__,
+"_get_running_loop($module, /)\n"
+"--\n"
+"\n"
+"Return the running event loop or None.\n"
+"\n"
+"This is a low-level function intended to be used by event loops.\n"
+"This function is thread-specific.");
+
+#define _ASYNCIO__GET_RUNNING_LOOP_METHODDEF \
+ {"_get_running_loop", (PyCFunction)_asyncio__get_running_loop, METH_NOARGS, _asyncio__get_running_loop__doc__},
+
+static PyObject *
+_asyncio__get_running_loop_impl(PyObject *module);
+
+static PyObject *
+_asyncio__get_running_loop(PyObject *module, PyObject *Py_UNUSED(ignored))
+{
+ return _asyncio__get_running_loop_impl(module);
+}
+
+PyDoc_STRVAR(_asyncio__set_running_loop__doc__,
+"_set_running_loop($module, loop, /)\n"
+"--\n"
+"\n"
+"Set the running event loop.\n"
+"\n"
+"This is a low-level function intended to be used by event loops.\n"
+"This function is thread-specific.");
+
+#define _ASYNCIO__SET_RUNNING_LOOP_METHODDEF \
+ {"_set_running_loop", (PyCFunction)_asyncio__set_running_loop, METH_O, _asyncio__set_running_loop__doc__},
+
+PyDoc_STRVAR(_asyncio_get_event_loop__doc__,
+"get_event_loop($module, /)\n"
+"--\n"
+"\n"
+"Return an asyncio event loop.\n"
+"\n"
+"When called from a coroutine or a callback (e.g. scheduled with\n"
+"call_soon or similar API), this function will always return the\n"
+"running event loop.\n"
+"\n"
+"If there is no running event loop set, the function will return\n"
+"the result of `get_event_loop_policy().get_event_loop()` call.");
+
+#define _ASYNCIO_GET_EVENT_LOOP_METHODDEF \
+ {"get_event_loop", (PyCFunction)_asyncio_get_event_loop, METH_NOARGS, _asyncio_get_event_loop__doc__},
+
+static PyObject *
+_asyncio_get_event_loop_impl(PyObject *module);
+
+static PyObject *
+_asyncio_get_event_loop(PyObject *module, PyObject *Py_UNUSED(ignored))
+{
+ return _asyncio_get_event_loop_impl(module);
+}
+
+PyDoc_STRVAR(_asyncio_get_running_loop__doc__,
+"get_running_loop($module, /)\n"
+"--\n"
+"\n"
+"Return the running event loop. Raise a RuntimeError if there is none.\n"
+"\n"
+"This function is thread-specific.");
+
+#define _ASYNCIO_GET_RUNNING_LOOP_METHODDEF \
+ {"get_running_loop", (PyCFunction)_asyncio_get_running_loop, METH_NOARGS, _asyncio_get_running_loop__doc__},
+
+static PyObject *
+_asyncio_get_running_loop_impl(PyObject *module);
+
+static PyObject *
+_asyncio_get_running_loop(PyObject *module, PyObject *Py_UNUSED(ignored))
+{
+ return _asyncio_get_running_loop_impl(module);
+}
+/*[clinic end generated code: output=d40b94e629571d48 input=a9049054013a1b77]*/