summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJustin Turner Arthur <justinarthur@gmail.com>2024-04-01 17:07:29 (GMT)
committerGitHub <noreply@github.com>2024-04-01 17:07:29 (GMT)
commitc741ad3537193c63fe697a8f0316aecd45eeb9ba (patch)
tree4dc0eaac7cde2150ee10811779f1f62177524dc3
parentddf814db744006e0f42328aa15ace97c9d8ad681 (diff)
downloadcpython-c741ad3537193c63fe697a8f0316aecd45eeb9ba.zip
cpython-c741ad3537193c63fe697a8f0316aecd45eeb9ba.tar.gz
cpython-c741ad3537193c63fe697a8f0316aecd45eeb9ba.tar.bz2
gh-77714: Provide an async iterator version of as_completed (GH-22491)
* as_completed returns object that is both iterator and async iterator * Existing tests adjusted to test both the old and new style * New test to ensure iterator can be resumed * New test to ensure async iterator yields any passed-in Futures as-is Co-authored-by: Serhiy Storchaka <storchaka@gmail.com> Co-authored-by: Guido van Rossum <gvanrossum@gmail.com>
-rw-r--r--Doc/library/asyncio-task.rst61
-rw-r--r--Doc/whatsnew/3.13.rst7
-rw-r--r--Lib/asyncio/tasks.py152
-rw-r--r--Lib/test/test_asyncio/test_tasks.py282
-rw-r--r--Misc/NEWS.d/next/Library/2020-10-02-17-35-19.bpo-33533.GLIhM5.rst5
5 files changed, 387 insertions, 120 deletions
diff --git a/Doc/library/asyncio-task.rst b/Doc/library/asyncio-task.rst
index 2aab62c..3b10a0d 100644
--- a/Doc/library/asyncio-task.rst
+++ b/Doc/library/asyncio-task.rst
@@ -867,19 +867,50 @@ Waiting Primitives
.. function:: as_completed(aws, *, timeout=None)
- Run :ref:`awaitable objects <asyncio-awaitables>` in the *aws*
- iterable concurrently. Return an iterator of coroutines.
- Each coroutine returned can be awaited to get the earliest next
- result from the iterable of the remaining awaitables.
-
- Raises :exc:`TimeoutError` if the timeout occurs before
- all Futures are done.
-
- Example::
-
- for coro in as_completed(aws):
- earliest_result = await coro
- # ...
+ Run :ref:`awaitable objects <asyncio-awaitables>` in the *aws* iterable
+ concurrently. The returned object can be iterated to obtain the results
+ of the awaitables as they finish.
+
+ The object returned by ``as_completed()`` can be iterated as an
+ :term:`asynchronous iterator` or a plain :term:`iterator`. When asynchronous
+ iteration is used, the originally-supplied awaitables are yielded if they
+ are tasks or futures. This makes it easy to correlate previously-scheduled
+ tasks with their results. Example::
+
+ ipv4_connect = create_task(open_connection("127.0.0.1", 80))
+ ipv6_connect = create_task(open_connection("::1", 80))
+ tasks = [ipv4_connect, ipv6_connect]
+
+ async for earliest_connect in as_completed(tasks):
+ # earliest_connect is done. The result can be obtained by
+ # awaiting it or calling earliest_connect.result()
+ reader, writer = await earliest_connect
+
+ if earliest_connect is ipv6_connect:
+ print("IPv6 connection established.")
+ else:
+ print("IPv4 connection established.")
+
+ During asynchronous iteration, implicitly-created tasks will be yielded for
+ supplied awaitables that aren't tasks or futures.
+
+ When used as a plain iterator, each iteration yields a new coroutine that
+ returns the result or raises the exception of the next completed awaitable.
+ This pattern is compatible with Python versions older than 3.13::
+
+ ipv4_connect = create_task(open_connection("127.0.0.1", 80))
+ ipv6_connect = create_task(open_connection("::1", 80))
+ tasks = [ipv4_connect, ipv6_connect]
+
+ for next_connect in as_completed(tasks):
+ # next_connect is not one of the original task objects. It must be
+ # awaited to obtain the result value or raise the exception of the
+ # awaitable that finishes next.
+ reader, writer = await next_connect
+
+ A :exc:`TimeoutError` is raised if the timeout occurs before all awaitables
+ are done. This is raised by the ``async for`` loop during asynchronous
+ iteration or by the coroutines yielded during plain iteration.
.. versionchanged:: 3.10
Removed the *loop* parameter.
@@ -891,6 +922,10 @@ Waiting Primitives
.. versionchanged:: 3.12
Added support for generators yielding tasks.
+ .. versionchanged:: 3.13
+ The result can now be used as either an :term:`asynchronous iterator`
+ or as a plain :term:`iterator` (previously it was only a plain iterator).
+
Running in Threads
==================
diff --git a/Doc/whatsnew/3.13.rst b/Doc/whatsnew/3.13.rst
index 45f7f50..97bee4d 100644
--- a/Doc/whatsnew/3.13.rst
+++ b/Doc/whatsnew/3.13.rst
@@ -289,6 +289,13 @@ asyncio
forcefully close an asyncio server.
(Contributed by Pierre Ossman in :gh:`113538`.)
+* :func:`asyncio.as_completed` now returns an object that is both an
+ :term:`asynchronous iterator` and a plain :term:`iterator` of awaitables.
+ The awaitables yielded by asynchronous iteration include original task or
+ future objects that were passed in, making it easier to associate results
+ with the tasks being completed.
+ (Contributed by Justin Arthur in :gh:`77714`.)
+
base64
------
diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py
index 48e31af..7fb697b9 100644
--- a/Lib/asyncio/tasks.py
+++ b/Lib/asyncio/tasks.py
@@ -25,6 +25,7 @@ from . import coroutines
from . import events
from . import exceptions
from . import futures
+from . import queues
from . import timeouts
# Helper to generate new task names
@@ -564,62 +565,125 @@ async def _cancel_and_wait(fut):
fut.remove_done_callback(cb)
-# This is *not* a @coroutine! It is just an iterator (yielding Futures).
+class _AsCompletedIterator:
+ """Iterator of awaitables representing tasks of asyncio.as_completed.
+
+ As an asynchronous iterator, iteration yields futures as they finish. As a
+ plain iterator, new coroutines are yielded that will return or raise the
+ result of the next underlying future to complete.
+ """
+ def __init__(self, aws, timeout):
+ self._done = queues.Queue()
+ self._timeout_handle = None
+
+ loop = events.get_event_loop()
+ todo = {ensure_future(aw, loop=loop) for aw in set(aws)}
+ for f in todo:
+ f.add_done_callback(self._handle_completion)
+ if todo and timeout is not None:
+ self._timeout_handle = (
+ loop.call_later(timeout, self._handle_timeout)
+ )
+ self._todo = todo
+ self._todo_left = len(todo)
+
+ def __aiter__(self):
+ return self
+
+ def __iter__(self):
+ return self
+
+ async def __anext__(self):
+ if not self._todo_left:
+ raise StopAsyncIteration
+ assert self._todo_left > 0
+ self._todo_left -= 1
+ return await self._wait_for_one()
+
+ def __next__(self):
+ if not self._todo_left:
+ raise StopIteration
+ assert self._todo_left > 0
+ self._todo_left -= 1
+ return self._wait_for_one(resolve=True)
+
+ def _handle_timeout(self):
+ for f in self._todo:
+ f.remove_done_callback(self._handle_completion)
+ self._done.put_nowait(None) # Sentinel for _wait_for_one().
+ self._todo.clear() # Can't do todo.remove(f) in the loop.
+
+ def _handle_completion(self, f):
+ if not self._todo:
+ return # _handle_timeout() was here first.
+ self._todo.remove(f)
+ self._done.put_nowait(f)
+ if not self._todo and self._timeout_handle is not None:
+ self._timeout_handle.cancel()
+
+ async def _wait_for_one(self, resolve=False):
+ # Wait for the next future to be done and return it unless resolve is
+ # set, in which case return either the result of the future or raise
+ # an exception.
+ f = await self._done.get()
+ if f is None:
+ # Dummy value from _handle_timeout().
+ raise exceptions.TimeoutError
+ return f.result() if resolve else f
+
+
def as_completed(fs, *, timeout=None):
- """Return an iterator whose values are coroutines.
+ """Create an iterator of awaitables or their results in completion order.
- When waiting for the yielded coroutines you'll get the results (or
- exceptions!) of the original Futures (or coroutines), in the order
- in which and as soon as they complete.
+ Run the supplied awaitables concurrently. The returned object can be
+ iterated to obtain the results of the awaitables as they finish.
- This differs from PEP 3148; the proper way to use this is:
+ The object returned can be iterated as an asynchronous iterator or a plain
+ iterator. When asynchronous iteration is used, the originally-supplied
+ awaitables are yielded if they are tasks or futures. This makes it easy to
+ correlate previously-scheduled tasks with their results:
- for f in as_completed(fs):
- result = await f # The 'await' may raise.
- # Use result.
+ ipv4_connect = create_task(open_connection("127.0.0.1", 80))
+ ipv6_connect = create_task(open_connection("::1", 80))
+ tasks = [ipv4_connect, ipv6_connect]
- If a timeout is specified, the 'await' will raise
- TimeoutError when the timeout occurs before all Futures are done.
+ async for earliest_connect in as_completed(tasks):
+ # earliest_connect is done. The result can be obtained by
+ # awaiting it or calling earliest_connect.result()
+ reader, writer = await earliest_connect
- Note: The futures 'f' are not necessarily members of fs.
- """
- if futures.isfuture(fs) or coroutines.iscoroutine(fs):
- raise TypeError(f"expect an iterable of futures, not {type(fs).__name__}")
+ if earliest_connect is ipv6_connect:
+ print("IPv6 connection established.")
+ else:
+ print("IPv4 connection established.")
- from .queues import Queue # Import here to avoid circular import problem.
- done = Queue()
+ During asynchronous iteration, implicitly-created tasks will be yielded for
+ supplied awaitables that aren't tasks or futures.
- loop = events.get_event_loop()
- todo = {ensure_future(f, loop=loop) for f in set(fs)}
- timeout_handle = None
+ When used as a plain iterator, each iteration yields a new coroutine that
+ returns the result or raises the exception of the next completed awaitable.
+ This pattern is compatible with Python versions older than 3.13:
- def _on_timeout():
- for f in todo:
- f.remove_done_callback(_on_completion)
- done.put_nowait(None) # Queue a dummy value for _wait_for_one().
- todo.clear() # Can't do todo.remove(f) in the loop.
+ ipv4_connect = create_task(open_connection("127.0.0.1", 80))
+ ipv6_connect = create_task(open_connection("::1", 80))
+ tasks = [ipv4_connect, ipv6_connect]
- def _on_completion(f):
- if not todo:
- return # _on_timeout() was here first.
- todo.remove(f)
- done.put_nowait(f)
- if not todo and timeout_handle is not None:
- timeout_handle.cancel()
+ for next_connect in as_completed(tasks):
+ # next_connect is not one of the original task objects. It must be
+ # awaited to obtain the result value or raise the exception of the
+ # awaitable that finishes next.
+ reader, writer = await next_connect
- async def _wait_for_one():
- f = await done.get()
- if f is None:
- # Dummy value from _on_timeout().
- raise exceptions.TimeoutError
- return f.result() # May raise f.exception().
+ A TimeoutError is raised if the timeout occurs before all awaitables are
+ done. This is raised by the async for loop during asynchronous iteration or
+ by the coroutines yielded during plain iteration.
+ """
+ if inspect.isawaitable(fs):
+ raise TypeError(
+ f"expects an iterable of awaitables, not {type(fs).__name__}"
+ )
- for f in todo:
- f.add_done_callback(_on_completion)
- if todo and timeout is not None:
- timeout_handle = loop.call_later(timeout, _on_timeout)
- for _ in range(len(todo)):
- yield _wait_for_one()
+ return _AsCompletedIterator(fs, timeout)
@types.coroutine
diff --git a/Lib/test/test_asyncio/test_tasks.py b/Lib/test/test_asyncio/test_tasks.py
index 4dfaff8..bc6d88e 100644
--- a/Lib/test/test_asyncio/test_tasks.py
+++ b/Lib/test/test_asyncio/test_tasks.py
@@ -1,6 +1,7 @@
"""Tests for tasks.py."""
import collections
+import contextlib
import contextvars
import gc
import io
@@ -1409,12 +1410,6 @@ class BaseTaskTests:
yield 0.01
yield 0
- loop = self.new_test_loop(gen)
- # disable "slow callback" warning
- loop.slow_callback_duration = 1.0
- completed = set()
- time_shifted = False
-
async def sleeper(dt, x):
nonlocal time_shifted
await asyncio.sleep(dt)
@@ -1424,21 +1419,78 @@ class BaseTaskTests:
loop.advance_time(0.14)
return x
- a = sleeper(0.01, 'a')
- b = sleeper(0.01, 'b')
- c = sleeper(0.15, 'c')
+ async def try_iterator(awaitables):
+ values = []
+ for f in asyncio.as_completed(awaitables):
+ values.append(await f)
+ return values
- async def foo():
+ async def try_async_iterator(awaitables):
values = []
- for f in asyncio.as_completed([b, c, a]):
+ async for f in asyncio.as_completed(awaitables):
values.append(await f)
return values
- res = loop.run_until_complete(self.new_task(loop, foo()))
- self.assertAlmostEqual(0.15, loop.time())
- self.assertTrue('a' in res[:2])
- self.assertTrue('b' in res[:2])
- self.assertEqual(res[2], 'c')
+ for foo in try_iterator, try_async_iterator:
+ with self.subTest(method=foo.__name__):
+ loop = self.new_test_loop(gen)
+ # disable "slow callback" warning
+ loop.slow_callback_duration = 1.0
+
+ completed = set()
+ time_shifted = False
+
+ a = sleeper(0.01, 'a')
+ b = sleeper(0.01, 'b')
+ c = sleeper(0.15, 'c')
+
+ res = loop.run_until_complete(self.new_task(loop, foo([b, c, a])))
+ self.assertAlmostEqual(0.15, loop.time())
+ self.assertTrue('a' in res[:2])
+ self.assertTrue('b' in res[:2])
+ self.assertEqual(res[2], 'c')
+
+ def test_as_completed_same_tasks_in_as_out(self):
+ # Ensures that asynchronously iterating as_completed's iterator
+ # yields awaitables are the same awaitables that were passed in when
+ # those awaitables are futures.
+ async def try_async_iterator(awaitables):
+ awaitables_out = set()
+ async for out_aw in asyncio.as_completed(awaitables):
+ awaitables_out.add(out_aw)
+ return awaitables_out
+
+ async def coro(i):
+ return i
+
+ with contextlib.closing(asyncio.new_event_loop()) as loop:
+ # Coroutines shouldn't be yielded back as finished coroutines
+ # can't be re-used.
+ awaitables_in = frozenset(
+ (coro(0), coro(1), coro(2), coro(3))
+ )
+ awaitables_out = loop.run_until_complete(
+ try_async_iterator(awaitables_in)
+ )
+ if awaitables_in - awaitables_out != awaitables_in:
+ raise self.failureException('Got original coroutines '
+ 'out of as_completed iterator.')
+
+ # Tasks should be yielded back.
+ coro_obj_a = coro('a')
+ task_b = loop.create_task(coro('b'))
+ coro_obj_c = coro('c')
+ task_d = loop.create_task(coro('d'))
+ awaitables_in = frozenset(
+ (coro_obj_a, task_b, coro_obj_c, task_d)
+ )
+ awaitables_out = loop.run_until_complete(
+ try_async_iterator(awaitables_in)
+ )
+ if awaitables_in & awaitables_out != {task_b, task_d}:
+ raise self.failureException('Only tasks should be yielded '
+ 'from as_completed iterator '
+ 'as-is.')
def test_as_completed_with_timeout(self):
@@ -1448,12 +1500,7 @@ class BaseTaskTests:
yield 0
yield 0.1
- loop = self.new_test_loop(gen)
-
- a = loop.create_task(asyncio.sleep(0.1, 'a'))
- b = loop.create_task(asyncio.sleep(0.15, 'b'))
-
- async def foo():
+ async def try_iterator():
values = []
for f in asyncio.as_completed([a, b], timeout=0.12):
if values:
@@ -1465,16 +1512,33 @@ class BaseTaskTests:
values.append((2, exc))
return values
- res = loop.run_until_complete(self.new_task(loop, foo()))
- self.assertEqual(len(res), 2, res)
- self.assertEqual(res[0], (1, 'a'))
- self.assertEqual(res[1][0], 2)
- self.assertIsInstance(res[1][1], asyncio.TimeoutError)
- self.assertAlmostEqual(0.12, loop.time())
+ async def try_async_iterator():
+ values = []
+ try:
+ async for f in asyncio.as_completed([a, b], timeout=0.12):
+ v = await f
+ values.append((1, v))
+ loop.advance_time(0.02)
+ except asyncio.TimeoutError as exc:
+ values.append((2, exc))
+ return values
- # move forward to close generator
- loop.advance_time(10)
- loop.run_until_complete(asyncio.wait([a, b]))
+ for foo in try_iterator, try_async_iterator:
+ with self.subTest(method=foo.__name__):
+ loop = self.new_test_loop(gen)
+ a = loop.create_task(asyncio.sleep(0.1, 'a'))
+ b = loop.create_task(asyncio.sleep(0.15, 'b'))
+
+ res = loop.run_until_complete(self.new_task(loop, foo()))
+ self.assertEqual(len(res), 2, res)
+ self.assertEqual(res[0], (1, 'a'))
+ self.assertEqual(res[1][0], 2)
+ self.assertIsInstance(res[1][1], asyncio.TimeoutError)
+ self.assertAlmostEqual(0.12, loop.time())
+
+ # move forward to close generator
+ loop.advance_time(10)
+ loop.run_until_complete(asyncio.wait([a, b]))
def test_as_completed_with_unused_timeout(self):
@@ -1483,19 +1547,75 @@ class BaseTaskTests:
yield 0
yield 0.01
- loop = self.new_test_loop(gen)
-
- a = asyncio.sleep(0.01, 'a')
-
- async def foo():
+ async def try_iterator():
for f in asyncio.as_completed([a], timeout=1):
v = await f
self.assertEqual(v, 'a')
- loop.run_until_complete(self.new_task(loop, foo()))
+ async def try_async_iterator():
+ async for f in asyncio.as_completed([a], timeout=1):
+ v = await f
+ self.assertEqual(v, 'a')
- def test_as_completed_reverse_wait(self):
+ for foo in try_iterator, try_async_iterator:
+ with self.subTest(method=foo.__name__):
+ a = asyncio.sleep(0.01, 'a')
+ loop = self.new_test_loop(gen)
+ loop.run_until_complete(self.new_task(loop, foo()))
+ loop.close()
+
+ def test_as_completed_resume_iterator(self):
+ # Test that as_completed returns an iterator that can be resumed
+ # the next time iteration is performed (i.e. if __iter__ is called
+ # again)
+ async def try_iterator(awaitables):
+ iterations = 0
+ iterator = asyncio.as_completed(awaitables)
+ collected = []
+ for f in iterator:
+ collected.append(await f)
+ iterations += 1
+ if iterations == 2:
+ break
+ self.assertEqual(len(collected), 2)
+
+ # Resume same iterator:
+ for f in iterator:
+ collected.append(await f)
+ return collected
+
+ async def try_async_iterator(awaitables):
+ iterations = 0
+ iterator = asyncio.as_completed(awaitables)
+ collected = []
+ async for f in iterator:
+ collected.append(await f)
+ iterations += 1
+ if iterations == 2:
+ break
+ self.assertEqual(len(collected), 2)
+
+ # Resume same iterator:
+ async for f in iterator:
+ collected.append(await f)
+ return collected
+
+ async def coro(i):
+ return i
+
+ with contextlib.closing(asyncio.new_event_loop()) as loop:
+ for foo in try_iterator, try_async_iterator:
+ with self.subTest(method=foo.__name__):
+ results = loop.run_until_complete(
+ foo((coro(0), coro(1), coro(2), coro(3)))
+ )
+ self.assertCountEqual(results, (0, 1, 2, 3))
+ def test_as_completed_reverse_wait(self):
+ # Tests the plain iterator style of as_completed iteration to
+ # ensure that the first future awaited resolves to the first
+ # completed awaitable from the set we passed in, even if it wasn't
+ # the first future generated by as_completed.
def gen():
yield 0
yield 0.05
@@ -1522,7 +1642,8 @@ class BaseTaskTests:
loop.run_until_complete(test())
def test_as_completed_concurrent(self):
-
+ # Ensure that more than one future or coroutine yielded from
+ # as_completed can be awaited concurrently.
def gen():
when = yield
self.assertAlmostEqual(0.05, when)
@@ -1530,38 +1651,55 @@ class BaseTaskTests:
self.assertAlmostEqual(0.05, when)
yield 0.05
- a = asyncio.sleep(0.05, 'a')
- b = asyncio.sleep(0.05, 'b')
- fs = {a, b}
+ async def try_iterator(fs):
+ return list(asyncio.as_completed(fs))
- async def test():
- futs = list(asyncio.as_completed(fs))
- self.assertEqual(len(futs), 2)
- done, pending = await asyncio.wait(
- [asyncio.ensure_future(fut) for fut in futs]
- )
- self.assertEqual(set(f.result() for f in done), {'a', 'b'})
+ async def try_async_iterator(fs):
+ return [f async for f in asyncio.as_completed(fs)]
- loop = self.new_test_loop(gen)
- loop.run_until_complete(test())
+ for runner in try_iterator, try_async_iterator:
+ with self.subTest(method=runner.__name__):
+ a = asyncio.sleep(0.05, 'a')
+ b = asyncio.sleep(0.05, 'b')
+ fs = {a, b}
+
+ async def test():
+ futs = await runner(fs)
+ self.assertEqual(len(futs), 2)
+ done, pending = await asyncio.wait(
+ [asyncio.ensure_future(fut) for fut in futs]
+ )
+ self.assertEqual(set(f.result() for f in done), {'a', 'b'})
+
+ loop = self.new_test_loop(gen)
+ loop.run_until_complete(test())
def test_as_completed_duplicate_coroutines(self):
async def coro(s):
return s
- async def runner():
+ async def try_iterator():
result = []
c = coro('ham')
for f in asyncio.as_completed([c, c, coro('spam')]):
result.append(await f)
return result
- fut = self.new_task(self.loop, runner())
- self.loop.run_until_complete(fut)
- result = fut.result()
- self.assertEqual(set(result), {'ham', 'spam'})
- self.assertEqual(len(result), 2)
+ async def try_async_iterator():
+ result = []
+ c = coro('ham')
+ async for f in asyncio.as_completed([c, c, coro('spam')]):
+ result.append(await f)
+ return result
+
+ for runner in try_iterator, try_async_iterator:
+ with self.subTest(method=runner.__name__):
+ fut = self.new_task(self.loop, runner())
+ self.loop.run_until_complete(fut)
+ result = fut.result()
+ self.assertEqual(set(result), {'ham', 'spam'})
+ self.assertEqual(len(result), 2)
def test_as_completed_coroutine_without_loop(self):
async def coro():
@@ -1570,8 +1708,8 @@ class BaseTaskTests:
a = coro()
self.addCleanup(a.close)
- futs = asyncio.as_completed([a])
with self.assertRaisesRegex(RuntimeError, 'no current event loop'):
+ futs = asyncio.as_completed([a])
list(futs)
def test_as_completed_coroutine_use_running_loop(self):
@@ -2044,14 +2182,32 @@ class BaseTaskTests:
self.assertEqual(res, 42)
def test_as_completed_invalid_args(self):
+ # as_completed() expects a list of futures, not a future instance
+ # TypeError should be raised either on iterator construction or first
+ # iteration
+
+ # Plain iterator
fut = self.new_future(self.loop)
+ with self.assertRaises(TypeError):
+ iterator = asyncio.as_completed(fut)
+ next(iterator)
+ coro = coroutine_function()
+ with self.assertRaises(TypeError):
+ iterator = asyncio.as_completed(coro)
+ next(iterator)
+ coro.close()
- # as_completed() expects a list of futures, not a future instance
- self.assertRaises(TypeError, self.loop.run_until_complete,
- asyncio.as_completed(fut))
+ # Async iterator
+ async def try_async_iterator(aw):
+ async for f in asyncio.as_completed(aw):
+ break
+
+ fut = self.new_future(self.loop)
+ with self.assertRaises(TypeError):
+ self.loop.run_until_complete(try_async_iterator(fut))
coro = coroutine_function()
- self.assertRaises(TypeError, self.loop.run_until_complete,
- asyncio.as_completed(coro))
+ with self.assertRaises(TypeError):
+ self.loop.run_until_complete(try_async_iterator(coro))
coro.close()
def test_wait_invalid_args(self):
diff --git a/Misc/NEWS.d/next/Library/2020-10-02-17-35-19.bpo-33533.GLIhM5.rst b/Misc/NEWS.d/next/Library/2020-10-02-17-35-19.bpo-33533.GLIhM5.rst
new file mode 100644
index 0000000..3ffd723
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2020-10-02-17-35-19.bpo-33533.GLIhM5.rst
@@ -0,0 +1,5 @@
+:func:`asyncio.as_completed` now returns an object that is both an asynchronous
+iterator and plain iterator. The new asynchronous iteration pattern allows for
+easier correlation between prior tasks and their completed results. This is
+a closer match to :func:`concurrent.futures.as_completed`'s iteration pattern.
+Patch by Justin Arthur.