diff options
author | Peter Bierma <zintensitydev@gmail.com> | 2024-10-01 01:37:27 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-10-01 01:37:27 (GMT) |
commit | 133e929a791d209b578b4822a7a07f4570b3803b (patch) | |
tree | de0a1d104737285e3a5709dd7e95dc603413ca11 /Lib/asyncio/staggered.py | |
parent | 7bdfabe2d1ec353ecdc75a5aec41cce83e572391 (diff) | |
download | cpython-133e929a791d209b578b4822a7a07f4570b3803b.zip cpython-133e929a791d209b578b4822a7a07f4570b3803b.tar.gz cpython-133e929a791d209b578b4822a7a07f4570b3803b.tar.bz2 |
gh-124309: Revert eager task factory fix to prevent breaking downstream (#124810)
* Revert "GH-124639: add back loop param to staggered_race (#124700)"
This reverts commit e0a41a5dd12cb6e9277b05abebac5c70be684dd7.
* Revert "gh-124309: Modernize the `staggered_race` implementation to support eager task factories (#124390)"
This reverts commit de929f353c413459834a2a37b2d9b0240673d874.
Diffstat (limited to 'Lib/asyncio/staggered.py')
-rw-r--r-- | Lib/asyncio/staggered.py | 83 |
1 files changed, 60 insertions, 23 deletions
diff --git a/Lib/asyncio/staggered.py b/Lib/asyncio/staggered.py index 6ccf5c3..c3a7441 100644 --- a/Lib/asyncio/staggered.py +++ b/Lib/asyncio/staggered.py @@ -4,12 +4,11 @@ __all__ = 'staggered_race', import contextlib +from . import events +from . import exceptions as exceptions_mod from . import locks from . import tasks -from . import taskgroups -class _Done(Exception): - pass async def staggered_race(coro_fns, delay, *, loop=None): """Run coroutines with staggered start times and take the first to finish. @@ -43,6 +42,8 @@ async def staggered_race(coro_fns, delay, *, loop=None): delay: amount of time, in seconds, between starting coroutines. If ``None``, the coroutines will run sequentially. + loop: the event loop to use. + Returns: tuple *(winner_result, winner_index, exceptions)* where @@ -61,11 +62,36 @@ async def staggered_race(coro_fns, delay, *, loop=None): """ # TODO: when we have aiter() and anext(), allow async iterables in coro_fns. + loop = loop or events.get_running_loop() + enum_coro_fns = enumerate(coro_fns) winner_result = None winner_index = None exceptions = [] + running_tasks = [] + + async def run_one_coro(previous_failed) -> None: + # Wait for the previous task to finish, or for delay seconds + if previous_failed is not None: + with contextlib.suppress(exceptions_mod.TimeoutError): + # Use asyncio.wait_for() instead of asyncio.wait() here, so + # that if we get cancelled at this point, Event.wait() is also + # cancelled, otherwise there will be a "Task destroyed but it is + # pending" later. + await tasks.wait_for(previous_failed.wait(), delay) + # Get the next coroutine to run + try: + this_index, coro_fn = next(enum_coro_fns) + except StopIteration: + return + # Start task that will run the next coroutine + this_failed = locks.Event() + next_task = loop.create_task(run_one_coro(this_failed)) + running_tasks.append(next_task) + assert len(running_tasks) == this_index + 2 + # Prepare place to put this coroutine's exceptions if not won + exceptions.append(None) + assert len(exceptions) == this_index + 1 - async def run_one_coro(this_index, coro_fn, this_failed): try: result = await coro_fn() except (SystemExit, KeyboardInterrupt): @@ -79,23 +105,34 @@ async def staggered_race(coro_fns, delay, *, loop=None): assert winner_index is None winner_index = this_index winner_result = result - raise _Done - + # Cancel all other tasks. We take care to not cancel the current + # task as well. If we do so, then since there is no `await` after + # here and CancelledError are usually thrown at one, we will + # encounter a curious corner case where the current task will end + # up as done() == True, cancelled() == False, exception() == + # asyncio.CancelledError. This behavior is specified in + # https://bugs.python.org/issue30048 + for i, t in enumerate(running_tasks): + if i != this_index: + t.cancel() + + first_task = loop.create_task(run_one_coro(None)) + running_tasks.append(first_task) try: - tg = taskgroups.TaskGroup() - # Intentionally override the loop in the TaskGroup to avoid - # using the running loop, preserving backwards compatibility - # TaskGroup only starts using `_loop` after `__aenter__` - # so overriding it here is safe. - tg._loop = loop - async with tg: - for this_index, coro_fn in enumerate(coro_fns): - this_failed = locks.Event() - exceptions.append(None) - tg.create_task(run_one_coro(this_index, coro_fn, this_failed)) - with contextlib.suppress(TimeoutError): - await tasks.wait_for(this_failed.wait(), delay) - except* _Done: - pass - - return winner_result, winner_index, exceptions + # Wait for a growing list of tasks to all finish: poor man's version of + # curio's TaskGroup or trio's nursery + done_count = 0 + while done_count != len(running_tasks): + done, _ = await tasks.wait(running_tasks) + done_count = len(done) + # If run_one_coro raises an unhandled exception, it's probably a + # programming error, and I want to see it. + if __debug__: + for d in done: + if d.done() and not d.cancelled() and d.exception(): + raise d.exception() + return winner_result, winner_index, exceptions + finally: + # Make sure no tasks are left running if we leave this function + for t in running_tasks: + t.cancel() |