From c48e0ef7dd13457799e7586f64682da672663c70 Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Thu, 2 May 2024 09:23:25 +0100 Subject: [3.12] GH-117881: fix athrow().throw()/asend().throw() concurrent access (GH-117882) (#118458) GH-117881: fix athrow().throw()/asend().throw() concurrent access (GH-117882) (cherry picked from commit fc7e1aa3c001bbce25973261fba457035719a559) --- Lib/test/test_asyncgen.py | 148 +++++++++++++++++++++ .../2024-04-15-07-37-09.gh-issue-117881.07H0wI.rst | 1 + Objects/genobject.c | 37 ++++++ 3 files changed, 186 insertions(+) create mode 100644 Misc/NEWS.d/next/Core and Builtins/2024-04-15-07-37-09.gh-issue-117881.07H0wI.rst diff --git a/Lib/test/test_asyncgen.py b/Lib/test/test_asyncgen.py index ee7e396..9e19980 100644 --- a/Lib/test/test_asyncgen.py +++ b/Lib/test/test_asyncgen.py @@ -390,6 +390,151 @@ class AsyncGenTest(unittest.TestCase): r'anext\(\): asynchronous generator is already running'): an.__next__() + with self.assertRaisesRegex(RuntimeError, + r"cannot reuse already awaited __anext__\(\)/asend\(\)"): + an.send(None) + + def test_async_gen_asend_throw_concurrent_with_send(self): + import types + + @types.coroutine + def _async_yield(v): + return (yield v) + + class MyExc(Exception): + pass + + async def agenfn(): + while True: + try: + await _async_yield(None) + except MyExc: + pass + return + yield + + + agen = agenfn() + gen = agen.asend(None) + gen.send(None) + gen2 = agen.asend(None) + + with self.assertRaisesRegex(RuntimeError, + r'anext\(\): asynchronous generator is already running'): + gen2.throw(MyExc) + + with self.assertRaisesRegex(RuntimeError, + r"cannot reuse already awaited __anext__\(\)/asend\(\)"): + gen2.send(None) + + def test_async_gen_athrow_throw_concurrent_with_send(self): + import types + + @types.coroutine + def _async_yield(v): + return (yield v) + + class MyExc(Exception): + pass + + async def agenfn(): + while True: + try: + await _async_yield(None) + except MyExc: + pass + return + yield + + + agen = agenfn() + gen = agen.asend(None) + gen.send(None) + gen2 = agen.athrow(MyExc) + + with self.assertRaisesRegex(RuntimeError, + r'athrow\(\): asynchronous generator is already running'): + gen2.throw(MyExc) + + with self.assertRaisesRegex(RuntimeError, + r"cannot reuse already awaited aclose\(\)/athrow\(\)"): + gen2.send(None) + + def test_async_gen_asend_throw_concurrent_with_throw(self): + import types + + @types.coroutine + def _async_yield(v): + return (yield v) + + class MyExc(Exception): + pass + + async def agenfn(): + try: + yield + except MyExc: + pass + while True: + try: + await _async_yield(None) + except MyExc: + pass + + + agen = agenfn() + with self.assertRaises(StopIteration): + agen.asend(None).send(None) + + gen = agen.athrow(MyExc) + gen.throw(MyExc) + gen2 = agen.asend(MyExc) + + with self.assertRaisesRegex(RuntimeError, + r'anext\(\): asynchronous generator is already running'): + gen2.throw(MyExc) + + with self.assertRaisesRegex(RuntimeError, + r"cannot reuse already awaited __anext__\(\)/asend\(\)"): + gen2.send(None) + + def test_async_gen_athrow_throw_concurrent_with_throw(self): + import types + + @types.coroutine + def _async_yield(v): + return (yield v) + + class MyExc(Exception): + pass + + async def agenfn(): + try: + yield + except MyExc: + pass + while True: + try: + await _async_yield(None) + except MyExc: + pass + + agen = agenfn() + with self.assertRaises(StopIteration): + agen.asend(None).send(None) + + gen = agen.athrow(MyExc) + gen.throw(MyExc) + gen2 = agen.athrow(None) + + with self.assertRaisesRegex(RuntimeError, + r'athrow\(\): asynchronous generator is already running'): + gen2.throw(MyExc) + + with self.assertRaisesRegex(RuntimeError, + r"cannot reuse already awaited aclose\(\)/athrow\(\)"): + gen2.send(None) + def test_async_gen_3_arg_deprecation_warning(self): async def gen(): yield 123 @@ -1567,6 +1712,8 @@ class AsyncGenAsyncioTest(unittest.TestCase): self.assertIsInstance(message['exception'], ZeroDivisionError) self.assertIn('unhandled exception during asyncio.run() shutdown', message['message']) + del message, messages + gc_collect() def test_async_gen_expression_01(self): async def arange(n): @@ -1620,6 +1767,7 @@ class AsyncGenAsyncioTest(unittest.TestCase): asyncio.run(main()) self.assertEqual([], messages) + gc_collect() def test_async_gen_await_same_anext_coro_twice(self): async def async_iterate(): diff --git a/Misc/NEWS.d/next/Core and Builtins/2024-04-15-07-37-09.gh-issue-117881.07H0wI.rst b/Misc/NEWS.d/next/Core and Builtins/2024-04-15-07-37-09.gh-issue-117881.07H0wI.rst new file mode 100644 index 0000000..75b3426 --- /dev/null +++ b/Misc/NEWS.d/next/Core and Builtins/2024-04-15-07-37-09.gh-issue-117881.07H0wI.rst @@ -0,0 +1 @@ +prevent concurrent access to an async generator via athrow().throw() or asend().throw() diff --git a/Objects/genobject.c b/Objects/genobject.c index 3fc2ac0..119a71f 100644 --- a/Objects/genobject.c +++ b/Objects/genobject.c @@ -1786,6 +1786,7 @@ async_gen_asend_send(PyAsyncGenASend *o, PyObject *arg) if (o->ags_state == AWAITABLE_STATE_INIT) { if (o->ags_gen->ag_running_async) { + o->ags_state = AWAITABLE_STATE_CLOSED; PyErr_SetString( PyExc_RuntimeError, "anext(): asynchronous generator is already running"); @@ -1829,10 +1830,24 @@ async_gen_asend_throw(PyAsyncGenASend *o, PyObject *const *args, Py_ssize_t narg return NULL; } + if (o->ags_state == AWAITABLE_STATE_INIT) { + if (o->ags_gen->ag_running_async) { + o->ags_state = AWAITABLE_STATE_CLOSED; + PyErr_SetString( + PyExc_RuntimeError, + "anext(): asynchronous generator is already running"); + return NULL; + } + + o->ags_state = AWAITABLE_STATE_ITER; + o->ags_gen->ag_running_async = 1; + } + result = gen_throw((PyGenObject*)o->ags_gen, args, nargs); result = async_gen_unwrap_value(o->ags_gen, result); if (result == NULL) { + o->ags_gen->ag_running_async = 0; o->ags_state = AWAITABLE_STATE_CLOSED; } @@ -2221,10 +2236,31 @@ async_gen_athrow_throw(PyAsyncGenAThrow *o, PyObject *const *args, Py_ssize_t na return NULL; } + if (o->agt_state == AWAITABLE_STATE_INIT) { + if (o->agt_gen->ag_running_async) { + o->agt_state = AWAITABLE_STATE_CLOSED; + if (o->agt_args == NULL) { + PyErr_SetString( + PyExc_RuntimeError, + "aclose(): asynchronous generator is already running"); + } + else { + PyErr_SetString( + PyExc_RuntimeError, + "athrow(): asynchronous generator is already running"); + } + return NULL; + } + + o->agt_state = AWAITABLE_STATE_ITER; + o->agt_gen->ag_running_async = 1; + } + retval = gen_throw((PyGenObject*)o->agt_gen, args, nargs); if (o->agt_args) { retval = async_gen_unwrap_value(o->agt_gen, retval); if (retval == NULL) { + o->agt_gen->ag_running_async = 0; o->agt_state = AWAITABLE_STATE_CLOSED; } return retval; @@ -2238,6 +2274,7 @@ async_gen_athrow_throw(PyAsyncGenAThrow *o, PyObject *const *args, Py_ssize_t na return NULL; } if (retval == NULL) { + o->agt_gen->ag_running_async = 0; o->agt_state = AWAITABLE_STATE_CLOSED; } if (PyErr_ExceptionMatches(PyExc_StopAsyncIteration) || -- cgit v0.12