diff options
author | Yury Selivanov <yury@magic.io> | 2017-12-19 12:19:53 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-12-19 12:19:53 (GMT) |
commit | 36c2c044782997520df7fc5604742a615ccf6b17 (patch) | |
tree | e7ea59b96168eaad588a613943a9a47d3d2aa394 /Lib/asyncio | |
parent | a9d7e552c72b6e9515e76a1dd4b247da86da23de (diff) | |
download | cpython-36c2c044782997520df7fc5604742a615ccf6b17.zip cpython-36c2c044782997520df7fc5604742a615ccf6b17.tar.gz cpython-36c2c044782997520df7fc5604742a615ccf6b17.tar.bz2 |
bpo-32355: Optimize asyncio.gather() (#4913)
Diffstat (limited to 'Lib/asyncio')
-rw-r--r-- | Lib/asyncio/base_events.py | 11 | ||||
-rw-r--r-- | Lib/asyncio/tasks.py | 103 |
2 files changed, 67 insertions, 47 deletions
diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index bd5bb32..a7f8edd 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -139,11 +139,12 @@ def _ipaddr_info(host, port, family, type, proto): def _run_until_complete_cb(fut): - exc = fut._exception - if isinstance(exc, BaseException) and not isinstance(exc, Exception): - # Issue #22429: run_forever() already finished, no need to - # stop it. - return + if not fut.cancelled(): + exc = fut.exception() + if isinstance(exc, BaseException) and not isinstance(exc, Exception): + # Issue #22429: run_forever() already finished, no need to + # stop it. + return fut._loop.stop() diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py index 275141c..ff8a486 100644 --- a/Lib/asyncio/tasks.py +++ b/Lib/asyncio/tasks.py @@ -575,8 +575,7 @@ class _GatheringFuture(futures.Future): def gather(*coros_or_futures, loop=None, return_exceptions=False): - """Return a future aggregating results from the given coroutines - or futures. + """Return a future aggregating results from the given coroutines/futures. Coroutines will be wrapped in a future and scheduled in the event loop. They will not necessarily be scheduled in the same order as @@ -605,56 +604,76 @@ def gather(*coros_or_futures, loop=None, return_exceptions=False): outer.set_result([]) return outer - arg_to_fut = {} - for arg in set(coros_or_futures): - if not futures.isfuture(arg): - fut = ensure_future(arg, loop=loop) - if loop is None: - loop = fut._loop - # The caller cannot control this future, the "destroy pending task" - # warning should not be emitted. - fut._log_destroy_pending = False - else: - fut = arg - if loop is None: - loop = fut._loop - elif fut._loop is not loop: - raise ValueError("futures are tied to different event loops") - arg_to_fut[arg] = fut - - children = [arg_to_fut[arg] for arg in coros_or_futures] - nchildren = len(children) - outer = _GatheringFuture(children, loop=loop) - nfinished = 0 - results = [None] * nchildren - - def _done_callback(i, fut): + def _done_callback(fut): nonlocal nfinished + nfinished += 1 + if outer.done(): if not fut.cancelled(): # Mark exception retrieved. fut.exception() return - if fut.cancelled(): - res = futures.CancelledError() - if not return_exceptions: - outer.set_exception(res) - return - elif fut._exception is not None: - res = fut.exception() # Mark exception retrieved. - if not return_exceptions: - outer.set_exception(res) + if not return_exceptions: + if fut.cancelled(): + # Check if 'fut' is cancelled first, as + # 'fut.exception()' will *raise* a CancelledError + # instead of returning it. + exc = futures.CancelledError() + outer.set_exception(exc) return - else: - res = fut._result - results[i] = res - nfinished += 1 - if nfinished == nchildren: + else: + exc = fut.exception() + if exc is not None: + outer.set_exception(exc) + return + + if nfinished == nfuts: + # All futures are done; create a list of results + # and set it to the 'outer' future. + results = [] + + for fut in children: + if fut.cancelled(): + # Check if 'fut' is cancelled first, as + # 'fut.exception()' will *raise* a CancelledError + # instead of returning it. + res = futures.CancelledError() + else: + res = fut.exception() + if res is None: + res = fut.result() + results.append(res) + outer.set_result(results) - for i, fut in enumerate(children): - fut.add_done_callback(functools.partial(_done_callback, i)) + arg_to_fut = {} + children = [] + nfuts = 0 + nfinished = 0 + for arg in coros_or_futures: + if arg not in arg_to_fut: + fut = ensure_future(arg, loop=loop) + if loop is None: + loop = fut._loop + if fut is not arg: + # 'arg' was not a Future, therefore, 'fut' is a new + # Future created specifically for 'arg'. Since the caller + # can't control it, disable the "destroy pending task" + # warning. + fut._log_destroy_pending = False + + nfuts += 1 + arg_to_fut[arg] = fut + fut.add_done_callback(_done_callback) + + else: + # There's a duplicate Future object in coros_or_futures. + fut = arg_to_fut[arg] + + children.append(fut) + + outer = _GatheringFuture(children, loop=loop) return outer |