summaryrefslogtreecommitdiffstats
path: root/Lib/asyncio/tasks.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/asyncio/tasks.py')
-rw-r--r--Lib/asyncio/tasks.py103
1 files changed, 61 insertions, 42 deletions
diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py
index 275141c..ff8a486 100644
--- a/Lib/asyncio/tasks.py
+++ b/Lib/asyncio/tasks.py
@@ -575,8 +575,7 @@ class _GatheringFuture(futures.Future):
def gather(*coros_or_futures, loop=None, return_exceptions=False):
- """Return a future aggregating results from the given coroutines
- or futures.
+ """Return a future aggregating results from the given coroutines/futures.
Coroutines will be wrapped in a future and scheduled in the event
loop. They will not necessarily be scheduled in the same order as
@@ -605,56 +604,76 @@ def gather(*coros_or_futures, loop=None, return_exceptions=False):
outer.set_result([])
return outer
- arg_to_fut = {}
- for arg in set(coros_or_futures):
- if not futures.isfuture(arg):
- fut = ensure_future(arg, loop=loop)
- if loop is None:
- loop = fut._loop
- # The caller cannot control this future, the "destroy pending task"
- # warning should not be emitted.
- fut._log_destroy_pending = False
- else:
- fut = arg
- if loop is None:
- loop = fut._loop
- elif fut._loop is not loop:
- raise ValueError("futures are tied to different event loops")
- arg_to_fut[arg] = fut
-
- children = [arg_to_fut[arg] for arg in coros_or_futures]
- nchildren = len(children)
- outer = _GatheringFuture(children, loop=loop)
- nfinished = 0
- results = [None] * nchildren
-
- def _done_callback(i, fut):
+ def _done_callback(fut):
nonlocal nfinished
+ nfinished += 1
+
if outer.done():
if not fut.cancelled():
# Mark exception retrieved.
fut.exception()
return
- if fut.cancelled():
- res = futures.CancelledError()
- if not return_exceptions:
- outer.set_exception(res)
- return
- elif fut._exception is not None:
- res = fut.exception() # Mark exception retrieved.
- if not return_exceptions:
- outer.set_exception(res)
+ if not return_exceptions:
+ if fut.cancelled():
+ # Check if 'fut' is cancelled first, as
+ # 'fut.exception()' will *raise* a CancelledError
+ # instead of returning it.
+ exc = futures.CancelledError()
+ outer.set_exception(exc)
return
- else:
- res = fut._result
- results[i] = res
- nfinished += 1
- if nfinished == nchildren:
+ else:
+ exc = fut.exception()
+ if exc is not None:
+ outer.set_exception(exc)
+ return
+
+ if nfinished == nfuts:
+ # All futures are done; create a list of results
+ # and set it to the 'outer' future.
+ results = []
+
+ for fut in children:
+ if fut.cancelled():
+ # Check if 'fut' is cancelled first, as
+ # 'fut.exception()' will *raise* a CancelledError
+ # instead of returning it.
+ res = futures.CancelledError()
+ else:
+ res = fut.exception()
+ if res is None:
+ res = fut.result()
+ results.append(res)
+
outer.set_result(results)
- for i, fut in enumerate(children):
- fut.add_done_callback(functools.partial(_done_callback, i))
+ arg_to_fut = {}
+ children = []
+ nfuts = 0
+ nfinished = 0
+ for arg in coros_or_futures:
+ if arg not in arg_to_fut:
+ fut = ensure_future(arg, loop=loop)
+ if loop is None:
+ loop = fut._loop
+ if fut is not arg:
+ # 'arg' was not a Future, therefore, 'fut' is a new
+ # Future created specifically for 'arg'. Since the caller
+ # can't control it, disable the "destroy pending task"
+ # warning.
+ fut._log_destroy_pending = False
+
+ nfuts += 1
+ arg_to_fut[arg] = fut
+ fut.add_done_callback(_done_callback)
+
+ else:
+ # There's a duplicate Future object in coros_or_futures.
+ fut = arg_to_fut[arg]
+
+ children.append(fut)
+
+ outer = _GatheringFuture(children, loop=loop)
return outer