diff options
author | Guido van Rossum <guido@python.org> | 2014-02-13 01:58:19 (GMT) |
---|---|---|
committer | Guido van Rossum <guido@python.org> | 2014-02-13 01:58:19 (GMT) |
commit | b58f053e48923cba2708410bd215371a7d1d5250 (patch) | |
tree | bdb0dabc76f4d412fd3bb425b95c07ab5aeb38e5 /Lib/asyncio/tasks.py | |
parent | ee6dc425c895755c9fe27dee10e52e762b420c7b (diff) | |
download | cpython-b58f053e48923cba2708410bd215371a7d1d5250.zip cpython-b58f053e48923cba2708410bd215371a7d1d5250.tar.gz cpython-b58f053e48923cba2708410bd215371a7d1d5250.tar.bz2 |
asyncio: Change as_completed() to use a Queue, to avoid O(N**2) behavior. Fixes issue #20566.
Diffstat (limited to 'Lib/asyncio/tasks.py')
-rw-r--r-- | Lib/asyncio/tasks.py | 53 |
1 files changed, 33 insertions, 20 deletions
```diff
diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py
index 81a125f..b7ee758 100644
--- a/Lib/asyncio/tasks.py
+++ b/Lib/asyncio/tasks.py
@@ -463,7 +463,11 @@ def _wait(fs, timeout, return_when, loop):
 
 # This is *not* a @coroutine! It is just an iterator (yielding Futures).
 def as_completed(fs, *, loop=None, timeout=None):
-    """Return an iterator whose values, when waited for, are Futures.
+    """Return an iterator whose values are coroutines.
+
+    When waiting for the yielded coroutines you'll get the results (or
+    exceptions!) of the original Futures (or coroutines), in the order
+    in which and as soon as they complete.
 
     This differs from PEP 3148; the proper way to use this is:
 
@@ -471,8 +475,8 @@ def as_completed(fs, *, loop=None, timeout=None):
             result = yield from f  # The 'yield from' may raise.
             # Use result.
 
-    Raises TimeoutError if the timeout occurs before all Futures are
-    done.
+    If a timeout is specified, the 'yield from' will raise
+    TimeoutError when the timeout occurs before all Futures are done.
 
     Note: The futures 'f' are not necessarily members of fs.
     """
@@ -481,27 +485,36 @@ def as_completed(fs, *, loop=None, timeout=None):
     loop = loop if loop is not None else events.get_event_loop()
     deadline = None if timeout is None else loop.time() + timeout
     todo = {async(f, loop=loop) for f in set(fs)}
-    completed = collections.deque()
+    from .queues import Queue  # Import here to avoid circular import problem.
+    done = Queue(loop=loop)
+    timeout_handle = None
+
+    def _on_timeout():
+        for f in todo:
+            f.remove_done_callback(_on_completion)
+            done.put_nowait(None)  # Queue a dummy value for _wait_for_one().
+        todo.clear()  # Can't do todo.remove(f) in the loop.
+
+    def _on_completion(f):
+        if not todo:
+            return  # _on_timeout() was here first.
+        todo.remove(f)
+        done.put_nowait(f)
+        if not todo and timeout_handle is not None:
+            timeout_handle.cancel()
 
     @coroutine
     def _wait_for_one():
-        while not completed:
-            timeout = None
-            if deadline is not None:
-                timeout = deadline - loop.time()
-                if timeout < 0:
-                    raise futures.TimeoutError()
-            done, pending = yield from _wait(
-                todo, timeout, FIRST_COMPLETED, loop)
-            # Multiple callers might be waiting for the same events
-            # and getting the same outcome. Dedupe by updating todo.
-            for f in done:
-                if f in todo:
-                    todo.remove(f)
-                    completed.append(f)
-        f = completed.popleft()
-        return f.result()  # May raise.
+        f = yield from done.get()
+        if f is None:
+            # Dummy value from _on_timeout().
+            raise futures.TimeoutError
+        return f.result()  # May raise f.exception().
 
+    for f in todo:
+        f.add_done_callback(_on_completion)
+    if todo and timeout is not None:
+        timeout_handle = loop.call_later(timeout, _on_timeout)
     for _ in range(len(todo)):
         yield _wait_for_one()
 
```