diff options
Diffstat (limited to 'Lib/asyncio/tasks.py')
-rw-r--r-- | Lib/asyncio/tasks.py | 103 |
1 files changed, 61 insertions, 42 deletions
diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py index 275141c..ff8a486 100644 --- a/Lib/asyncio/tasks.py +++ b/Lib/asyncio/tasks.py @@ -575,8 +575,7 @@ class _GatheringFuture(futures.Future): def gather(*coros_or_futures, loop=None, return_exceptions=False): - """Return a future aggregating results from the given coroutines - or futures. + """Return a future aggregating results from the given coroutines/futures. Coroutines will be wrapped in a future and scheduled in the event loop. They will not necessarily be scheduled in the same order as @@ -605,56 +604,76 @@ def gather(*coros_or_futures, loop=None, return_exceptions=False): outer.set_result([]) return outer - arg_to_fut = {} - for arg in set(coros_or_futures): - if not futures.isfuture(arg): - fut = ensure_future(arg, loop=loop) - if loop is None: - loop = fut._loop - # The caller cannot control this future, the "destroy pending task" - # warning should not be emitted. - fut._log_destroy_pending = False - else: - fut = arg - if loop is None: - loop = fut._loop - elif fut._loop is not loop: - raise ValueError("futures are tied to different event loops") - arg_to_fut[arg] = fut - - children = [arg_to_fut[arg] for arg in coros_or_futures] - nchildren = len(children) - outer = _GatheringFuture(children, loop=loop) - nfinished = 0 - results = [None] * nchildren - - def _done_callback(i, fut): + def _done_callback(fut): nonlocal nfinished + nfinished += 1 + if outer.done(): if not fut.cancelled(): # Mark exception retrieved. fut.exception() return - if fut.cancelled(): - res = futures.CancelledError() - if not return_exceptions: - outer.set_exception(res) - return - elif fut._exception is not None: - res = fut.exception() # Mark exception retrieved. - if not return_exceptions: - outer.set_exception(res) + if not return_exceptions: + if fut.cancelled(): + # Check if 'fut' is cancelled first, as + # 'fut.exception()' will *raise* a CancelledError + # instead of returning it. + exc = futures.CancelledError() + outer.set_exception(exc) return - else: - res = fut._result - results[i] = res - nfinished += 1 - if nfinished == nchildren: + else: + exc = fut.exception() + if exc is not None: + outer.set_exception(exc) + return + + if nfinished == nfuts: + # All futures are done; create a list of results + # and set it to the 'outer' future. + results = [] + + for fut in children: + if fut.cancelled(): + # Check if 'fut' is cancelled first, as + # 'fut.exception()' will *raise* a CancelledError + # instead of returning it. + res = futures.CancelledError() + else: + res = fut.exception() + if res is None: + res = fut.result() + results.append(res) + outer.set_result(results) - for i, fut in enumerate(children): - fut.add_done_callback(functools.partial(_done_callback, i)) + arg_to_fut = {} + children = [] + nfuts = 0 + nfinished = 0 + for arg in coros_or_futures: + if arg not in arg_to_fut: + fut = ensure_future(arg, loop=loop) + if loop is None: + loop = fut._loop + if fut is not arg: + # 'arg' was not a Future, therefore, 'fut' is a new + # Future created specifically for 'arg'. Since the caller + # can't control it, disable the "destroy pending task" + # warning. + fut._log_destroy_pending = False + + nfuts += 1 + arg_to_fut[arg] = fut + fut.add_done_callback(_done_callback) + + else: + # There's a duplicate Future object in coros_or_futures. + fut = arg_to_fut[arg] + + children.append(fut) + + outer = _GatheringFuture(children, loop=loop) return outer |