diff options
author | Antoine Pitrou <pitrou@free.fr> | 2017-09-01 17:16:46 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-09-01 17:16:46 (GMT) |
commit | ea767915f7476c1fe97f7b1a53304d57f105bdd2 (patch) | |
tree | c9945795ded82474153e56aca0560d58cc30ce51 /Lib/concurrent/futures/_base.py | |
parent | 98c849a2f32f6727239b4cce38b8f0ff8adeef22 (diff) | |
download | cpython-ea767915f7476c1fe97f7b1a53304d57f105bdd2.zip cpython-ea767915f7476c1fe97f7b1a53304d57f105bdd2.tar.gz cpython-ea767915f7476c1fe97f7b1a53304d57f105bdd2.tar.bz2 |
[3.6] bpo-27144: concurrent.futures as_complete and map iterators do not keep reference to returned object (GH-1560) (#3266)
bpo-27144: concurrent.futures as_complie and map iterators do not keep
reference to returned object
(cherry picked from commit 97e1b1c81458d2109b2ffed32ffa1eb643a6c3b9)
Diffstat (limited to 'Lib/concurrent/futures/_base.py')
-rw-r--r-- | Lib/concurrent/futures/_base.py | 37 |
1 files changed, 28 insertions, 9 deletions
diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py index 295489c..88521ae 100644 --- a/Lib/concurrent/futures/_base.py +++ b/Lib/concurrent/futures/_base.py @@ -170,6 +170,20 @@ def _create_and_install_waiters(fs, return_when): return waiter + +def _yield_and_decref(fs, ref_collect): + """ + Iterate on the list *fs*, yielding objects one by one in reverse order. + Before yielding an object, it is removed from each set in + the collection of sets *ref_collect*. + """ + while fs: + for futures_set in ref_collect: + futures_set.remove(fs[-1]) + # Careful not to keep a reference to the popped value + yield fs.pop() + + def as_completed(fs, timeout=None): """An iterator over the given futures that yields each as it completes. @@ -191,6 +205,8 @@ def as_completed(fs, timeout=None): if timeout is not None: end_time = timeout + time.time() + total_futures = len(fs) + fs = set(fs) with _AcquireFutures(fs): finished = set( @@ -198,9 +214,9 @@ def as_completed(fs, timeout=None): if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]) pending = fs - finished waiter = _create_and_install_waiters(fs, _AS_COMPLETED) - + finished = list(finished) try: - yield from finished + yield from _yield_and_decref(finished, ref_collect=(fs,)) while pending: if timeout is None: @@ -210,7 +226,7 @@ def as_completed(fs, timeout=None): if wait_timeout < 0: raise TimeoutError( '%d (of %d) futures unfinished' % ( - len(pending), len(fs))) + len(pending), total_futures)) waiter.event.wait(wait_timeout) @@ -219,9 +235,9 @@ def as_completed(fs, timeout=None): waiter.finished_futures = [] waiter.event.clear() - for future in finished: - yield future - pending.remove(future) + # reverse to keep finishing order + finished.reverse() + yield from _yield_and_decref(finished, ref_collect=(fs, pending)) finally: for f in fs: @@ -551,11 +567,14 @@ class Executor(object): # before the first iterator value is required. def result_iterator(): try: - for future in fs: + # reverse to keep finishing order + fs.reverse() + while fs: + # Careful not to keep a reference to the popped future if timeout is None: - yield future.result() + yield fs.pop().result() else: - yield future.result(end_time - time.time()) + yield fs.pop().result(end_time - time.time()) finally: for future in fs: future.cancel() |