summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGuido van Rossum <guido@python.org>2014-02-13 01:58:19 (GMT)
committerGuido van Rossum <guido@python.org>2014-02-13 01:58:19 (GMT)
commitb58f053e48923cba2708410bd215371a7d1d5250 (patch)
treebdb0dabc76f4d412fd3bb425b95c07ab5aeb38e5
parentee6dc425c895755c9fe27dee10e52e762b420c7b (diff)
downloadcpython-b58f053e48923cba2708410bd215371a7d1d5250.zip
cpython-b58f053e48923cba2708410bd215371a7d1d5250.tar.gz
cpython-b58f053e48923cba2708410bd215371a7d1d5250.tar.bz2
asyncio: Change as_completed() to use a Queue, to avoid O(N**2) behavior. Fixes issue #20566.
-rw-r--r--Lib/asyncio/tasks.py53
-rw-r--r--Lib/test/test_asyncio/test_tasks.py23
2 files changed, 55 insertions, 21 deletions
diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py
index 81a125f..b7ee758 100644
--- a/Lib/asyncio/tasks.py
+++ b/Lib/asyncio/tasks.py
@@ -463,7 +463,11 @@ def _wait(fs, timeout, return_when, loop):
# This is *not* a @coroutine! It is just an iterator (yielding Futures).
def as_completed(fs, *, loop=None, timeout=None):
- """Return an iterator whose values, when waited for, are Futures.
+ """Return an iterator whose values are coroutines.
+
+ When waiting for the yielded coroutines you'll get the results (or
+ exceptions!) of the original Futures (or coroutines), in the order
+ in which and as soon as they complete.
This differs from PEP 3148; the proper way to use this is:
@@ -471,8 +475,8 @@ def as_completed(fs, *, loop=None, timeout=None):
result = yield from f # The 'yield from' may raise.
# Use result.
- Raises TimeoutError if the timeout occurs before all Futures are
- done.
+ If a timeout is specified, the 'yield from' will raise
+ TimeoutError when the timeout occurs before all Futures are done.
Note: The futures 'f' are not necessarily members of fs.
"""
@@ -481,27 +485,36 @@ def as_completed(fs, *, loop=None, timeout=None):
loop = loop if loop is not None else events.get_event_loop()
deadline = None if timeout is None else loop.time() + timeout
todo = {async(f, loop=loop) for f in set(fs)}
- completed = collections.deque()
+ from .queues import Queue # Import here to avoid circular import problem.
+ done = Queue(loop=loop)
+ timeout_handle = None
+
+ def _on_timeout():
+ for f in todo:
+ f.remove_done_callback(_on_completion)
+ done.put_nowait(None) # Queue a dummy value for _wait_for_one().
+ todo.clear() # Can't do todo.remove(f) in the loop.
+
+ def _on_completion(f):
+ if not todo:
+ return # _on_timeout() was here first.
+ todo.remove(f)
+ done.put_nowait(f)
+ if not todo and timeout_handle is not None:
+ timeout_handle.cancel()
@coroutine
def _wait_for_one():
- while not completed:
- timeout = None
- if deadline is not None:
- timeout = deadline - loop.time()
- if timeout < 0:
- raise futures.TimeoutError()
- done, pending = yield from _wait(
- todo, timeout, FIRST_COMPLETED, loop)
- # Multiple callers might be waiting for the same events
- # and getting the same outcome. Dedupe by updating todo.
- for f in done:
- if f in todo:
- todo.remove(f)
- completed.append(f)
- f = completed.popleft()
- return f.result() # May raise.
+ f = yield from done.get()
+ if f is None:
+ # Dummy value from _on_timeout().
+ raise futures.TimeoutError
+ return f.result() # May raise f.exception().
+ for f in todo:
+ f.add_done_callback(_on_completion)
+ if todo and timeout is not None:
+ timeout_handle = loop.call_later(timeout, _on_timeout)
for _ in range(len(todo)):
yield _wait_for_one()
diff --git a/Lib/test/test_asyncio/test_tasks.py b/Lib/test/test_asyncio/test_tasks.py
index 6847de0..024dd2e 100644
--- a/Lib/test/test_asyncio/test_tasks.py
+++ b/Lib/test/test_asyncio/test_tasks.py
@@ -779,7 +779,6 @@ class TaskTests(unittest.TestCase):
yield 0
yield 0
yield 0.1
- yield 0.02
loop = test_utils.TestLoop(gen)
self.addCleanup(loop.close)
@@ -791,6 +790,8 @@ class TaskTests(unittest.TestCase):
def foo():
values = []
for f in asyncio.as_completed([a, b], timeout=0.12, loop=loop):
+ if values:
+ loop.advance_time(0.02)
try:
v = yield from f
values.append((1, v))
@@ -809,6 +810,26 @@ class TaskTests(unittest.TestCase):
loop.advance_time(10)
loop.run_until_complete(asyncio.wait([a, b], loop=loop))
+ def test_as_completed_with_unused_timeout(self):
+
+ def gen():
+ yield
+ yield 0
+ yield 0.01
+
+ loop = test_utils.TestLoop(gen)
+ self.addCleanup(loop.close)
+
+ a = asyncio.sleep(0.01, 'a', loop=loop)
+
+ @asyncio.coroutine
+ def foo():
+ for f in asyncio.as_completed([a], timeout=1, loop=loop):
+ v = yield from f
+ self.assertEqual(v, 'a')
+
+ res = loop.run_until_complete(asyncio.Task(foo(), loop=loop))
+
def test_as_completed_reverse_wait(self):
def gen():