summaryrefslogtreecommitdiffstats
path: root/Lib
diff options
context:
space:
mode:
authorYury Selivanov <yury@magic.io>2017-12-19 12:19:53 (GMT)
committerGitHub <noreply@github.com>2017-12-19 12:19:53 (GMT)
commit36c2c044782997520df7fc5604742a615ccf6b17 (patch)
treee7ea59b96168eaad588a613943a9a47d3d2aa394 /Lib
parenta9d7e552c72b6e9515e76a1dd4b247da86da23de (diff)
downloadcpython-36c2c044782997520df7fc5604742a615ccf6b17.zip
cpython-36c2c044782997520df7fc5604742a615ccf6b17.tar.gz
cpython-36c2c044782997520df7fc5604742a615ccf6b17.tar.bz2
bpo-32355: Optimize asyncio.gather() (#4913)
Diffstat (limited to 'Lib')
-rw-r--r--Lib/asyncio/base_events.py11
-rw-r--r--Lib/asyncio/tasks.py103
2 files changed, 67 insertions, 47 deletions
diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py
index bd5bb32..a7f8edd 100644
--- a/Lib/asyncio/base_events.py
+++ b/Lib/asyncio/base_events.py
@@ -139,11 +139,12 @@ def _ipaddr_info(host, port, family, type, proto):
def _run_until_complete_cb(fut):
- exc = fut._exception
- if isinstance(exc, BaseException) and not isinstance(exc, Exception):
- # Issue #22429: run_forever() already finished, no need to
- # stop it.
- return
+ if not fut.cancelled():
+ exc = fut.exception()
+ if isinstance(exc, BaseException) and not isinstance(exc, Exception):
+ # Issue #22429: run_forever() already finished, no need to
+ # stop it.
+ return
fut._loop.stop()
diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py
index 275141c..ff8a486 100644
--- a/Lib/asyncio/tasks.py
+++ b/Lib/asyncio/tasks.py
@@ -575,8 +575,7 @@ class _GatheringFuture(futures.Future):
def gather(*coros_or_futures, loop=None, return_exceptions=False):
- """Return a future aggregating results from the given coroutines
- or futures.
+ """Return a future aggregating results from the given coroutines/futures.
Coroutines will be wrapped in a future and scheduled in the event
loop. They will not necessarily be scheduled in the same order as
@@ -605,56 +604,76 @@ def gather(*coros_or_futures, loop=None, return_exceptions=False):
outer.set_result([])
return outer
- arg_to_fut = {}
- for arg in set(coros_or_futures):
- if not futures.isfuture(arg):
- fut = ensure_future(arg, loop=loop)
- if loop is None:
- loop = fut._loop
- # The caller cannot control this future, the "destroy pending task"
- # warning should not be emitted.
- fut._log_destroy_pending = False
- else:
- fut = arg
- if loop is None:
- loop = fut._loop
- elif fut._loop is not loop:
- raise ValueError("futures are tied to different event loops")
- arg_to_fut[arg] = fut
-
- children = [arg_to_fut[arg] for arg in coros_or_futures]
- nchildren = len(children)
- outer = _GatheringFuture(children, loop=loop)
- nfinished = 0
- results = [None] * nchildren
-
- def _done_callback(i, fut):
+ def _done_callback(fut):
nonlocal nfinished
+ nfinished += 1
+
if outer.done():
if not fut.cancelled():
# Mark exception retrieved.
fut.exception()
return
- if fut.cancelled():
- res = futures.CancelledError()
- if not return_exceptions:
- outer.set_exception(res)
- return
- elif fut._exception is not None:
- res = fut.exception() # Mark exception retrieved.
- if not return_exceptions:
- outer.set_exception(res)
+ if not return_exceptions:
+ if fut.cancelled():
+ # Check if 'fut' is cancelled first, as
+ # 'fut.exception()' will *raise* a CancelledError
+ # instead of returning it.
+ exc = futures.CancelledError()
+ outer.set_exception(exc)
return
- else:
- res = fut._result
- results[i] = res
- nfinished += 1
- if nfinished == nchildren:
+ else:
+ exc = fut.exception()
+ if exc is not None:
+ outer.set_exception(exc)
+ return
+
+ if nfinished == nfuts:
+ # All futures are done; create a list of results
+ # and set it to the 'outer' future.
+ results = []
+
+ for fut in children:
+ if fut.cancelled():
+ # Check if 'fut' is cancelled first, as
+ # 'fut.exception()' will *raise* a CancelledError
+ # instead of returning it.
+ res = futures.CancelledError()
+ else:
+ res = fut.exception()
+ if res is None:
+ res = fut.result()
+ results.append(res)
+
outer.set_result(results)
- for i, fut in enumerate(children):
- fut.add_done_callback(functools.partial(_done_callback, i))
+ arg_to_fut = {}
+ children = []
+ nfuts = 0
+ nfinished = 0
+ for arg in coros_or_futures:
+ if arg not in arg_to_fut:
+ fut = ensure_future(arg, loop=loop)
+ if loop is None:
+ loop = fut._loop
+ if fut is not arg:
+ # 'arg' was not a Future, therefore, 'fut' is a new
+ # Future created specifically for 'arg'. Since the caller
+ # can't control it, disable the "destroy pending task"
+ # warning.
+ fut._log_destroy_pending = False
+
+ nfuts += 1
+ arg_to_fut[arg] = fut
+ fut.add_done_callback(_done_callback)
+
+ else:
+ # There's a duplicate Future object in coros_or_futures.
+ fut = arg_to_fut[arg]
+
+ children.append(fut)
+
+ outer = _GatheringFuture(children, loop=loop)
return outer