From fccbfc40b546630fa7ee404c0949d52ab2921a90 Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Sun, 26 Jan 2025 15:44:16 +0000 Subject: gh-129195: use `future_add_to_awaited_by/future_discard_from_awaited_by` in `asyncio.staggered.staggered_race` (#129253) Co-authored-by: Kumar Aditya --- Lib/asyncio/staggered.py | 7 ++- Lib/test/test_external_inspection.py | 63 ++++++++++++++++++++++ .../2025-01-24-10-48-32.gh-issue-129195.89d5NU.rst | 1 + 3 files changed, 70 insertions(+), 1 deletion(-) create mode 100644 Misc/NEWS.d/next/Library/2025-01-24-10-48-32.gh-issue-129195.89d5NU.rst diff --git a/Lib/asyncio/staggered.py b/Lib/asyncio/staggered.py index 0afed64..2ad65d8 100644 --- a/Lib/asyncio/staggered.py +++ b/Lib/asyncio/staggered.py @@ -8,6 +8,7 @@ from . import events from . import exceptions as exceptions_mod from . import locks from . import tasks +from . import futures async def staggered_race(coro_fns, delay, *, loop=None): @@ -63,6 +64,7 @@ async def staggered_race(coro_fns, delay, *, loop=None): """ # TODO: when we have aiter() and anext(), allow async iterables in coro_fns. loop = loop or events.get_running_loop() + parent_task = tasks.current_task(loop) enum_coro_fns = enumerate(coro_fns) winner_result = None winner_index = None @@ -73,6 +75,7 @@ async def staggered_race(coro_fns, delay, *, loop=None): def task_done(task): running_tasks.discard(task) + futures.future_discard_from_awaited_by(task, parent_task) if ( on_completed_fut is not None and not on_completed_fut.done() @@ -110,6 +113,7 @@ async def staggered_race(coro_fns, delay, *, loop=None): this_failed = locks.Event() next_ok_to_start = locks.Event() next_task = loop.create_task(run_one_coro(next_ok_to_start, this_failed)) + futures.future_add_to_awaited_by(next_task, parent_task) running_tasks.add(next_task) next_task.add_done_callback(task_done) # next_task has been appended to running_tasks so next_task is ok to @@ -148,6 +152,7 @@ async def staggered_race(coro_fns, delay, *, loop=None): try: ok_to_start = locks.Event() first_task = loop.create_task(run_one_coro(ok_to_start, None)) + futures.future_add_to_awaited_by(first_task, parent_task) running_tasks.add(first_task) first_task.add_done_callback(task_done) # first_task has been appended to running_tasks so first_task is ok to start. @@ -171,4 +176,4 @@ async def staggered_race(coro_fns, delay, *, loop=None): raise propagate_cancellation_error return winner_result, winner_index, exceptions finally: - del exceptions, propagate_cancellation_error, unhandled_exceptions + del exceptions, propagate_cancellation_error, unhandled_exceptions, parent_task diff --git a/Lib/test/test_external_inspection.py b/Lib/test/test_external_inspection.py index eceae53..2ab48a4 100644 --- a/Lib/test/test_external_inspection.py +++ b/Lib/test/test_external_inspection.py @@ -290,6 +290,69 @@ class TestGetStackTrace(unittest.TestCase): "Test only runs on Linux and MacOS") @unittest.skipIf(sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED, "Test only runs on Linux with process_vm_readv support") + def test_async_staggered_race_remote_stack_trace(self): + # Spawn a process with some realistic Python code + script = textwrap.dedent("""\ + import asyncio.staggered + import time + import sys + + async def deep(): + await asyncio.sleep(0) + fifo_path = sys.argv[1] + with open(fifo_path, "w") as fifo: + fifo.write("ready") + time.sleep(10000) + + async def c1(): + await asyncio.sleep(0) + await deep() + + async def c2(): + await asyncio.sleep(10000) + + async def main(): + await asyncio.staggered.staggered_race( + [c1, c2], + delay=None, + ) + + asyncio.run(main()) + """) + stack_trace = None + with os_helper.temp_dir() as work_dir: + script_dir = os.path.join(work_dir, "script_pkg") + os.mkdir(script_dir) + fifo = f"{work_dir}/the_fifo" + os.mkfifo(fifo) + script_name = _make_test_script(script_dir, 'script', script) + try: + p = subprocess.Popen([sys.executable, script_name, str(fifo)]) + with open(fifo, "r") as fifo_file: + response = fifo_file.read() + self.assertEqual(response, "ready") + stack_trace = get_async_stack_trace(p.pid) + except PermissionError: + self.skipTest( + "Insufficient permissions to read the stack trace") + finally: + os.remove(fifo) + p.kill() + p.terminate() + p.wait(timeout=SHORT_TIMEOUT) + + # sets are unordered, so we want to sort "awaited_by"s + stack_trace[2].sort(key=lambda x: x[1]) + + expected_stack_trace = [ + ['deep', 'c1', 'run_one_coro'], 'Task-2', [[['main'], 'Task-1', []]] + ] + self.assertEqual(stack_trace, expected_stack_trace) + + @unittest.skipIf(sys.platform != "darwin" and sys.platform != "linux", + "Test only runs on Linux and MacOS") + @unittest.skipIf(sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED, + "Test only runs on Linux with process_vm_readv support") def test_self_trace(self): stack_trace = get_stack_trace(os.getpid()) self.assertEqual(stack_trace[0], "test_self_trace") diff --git a/Misc/NEWS.d/next/Library/2025-01-24-10-48-32.gh-issue-129195.89d5NU.rst b/Misc/NEWS.d/next/Library/2025-01-24-10-48-32.gh-issue-129195.89d5NU.rst new file mode 100644 index 0000000..daf7297 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2025-01-24-10-48-32.gh-issue-129195.89d5NU.rst @@ -0,0 +1 @@ +Support reporting call graph information from :func:`!asyncio.staggered.staggered_race`. -- cgit v0.12