summaryrefslogtreecommitdiffstats
path: root/Lib/concurrent/futures/_base.py
diff options
context:
space:
mode:
authorAntoine Pitrou <pitrou@free.fr>2017-09-01 17:16:46 (GMT)
committerGitHub <noreply@github.com>2017-09-01 17:16:46 (GMT)
commitea767915f7476c1fe97f7b1a53304d57f105bdd2 (patch)
treec9945795ded82474153e56aca0560d58cc30ce51 /Lib/concurrent/futures/_base.py
parent98c849a2f32f6727239b4cce38b8f0ff8adeef22 (diff)
downloadcpython-ea767915f7476c1fe97f7b1a53304d57f105bdd2.zip
cpython-ea767915f7476c1fe97f7b1a53304d57f105bdd2.tar.gz
cpython-ea767915f7476c1fe97f7b1a53304d57f105bdd2.tar.bz2
[3.6] bpo-27144: concurrent.futures as_complete and map iterators do not keep reference to returned object (GH-1560) (#3266)
bpo-27144: concurrent.futures as_complie and map iterators do not keep reference to returned object (cherry picked from commit 97e1b1c81458d2109b2ffed32ffa1eb643a6c3b9)
Diffstat (limited to 'Lib/concurrent/futures/_base.py')
-rw-r--r--Lib/concurrent/futures/_base.py37
1 files changed, 28 insertions, 9 deletions
diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py
index 295489c..88521ae 100644
--- a/Lib/concurrent/futures/_base.py
+++ b/Lib/concurrent/futures/_base.py
@@ -170,6 +170,20 @@ def _create_and_install_waiters(fs, return_when):
return waiter
+
+def _yield_and_decref(fs, ref_collect):
+ """
+ Iterate on the list *fs*, yielding objects one by one in reverse order.
+ Before yielding an object, it is removed from each set in
+ the collection of sets *ref_collect*.
+ """
+ while fs:
+ for futures_set in ref_collect:
+ futures_set.remove(fs[-1])
+ # Careful not to keep a reference to the popped value
+ yield fs.pop()
+
+
def as_completed(fs, timeout=None):
"""An iterator over the given futures that yields each as it completes.
@@ -191,6 +205,8 @@ def as_completed(fs, timeout=None):
if timeout is not None:
end_time = timeout + time.time()
+ total_futures = len(fs)
+
fs = set(fs)
with _AcquireFutures(fs):
finished = set(
@@ -198,9 +214,9 @@ def as_completed(fs, timeout=None):
if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
pending = fs - finished
waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
-
+ finished = list(finished)
try:
- yield from finished
+ yield from _yield_and_decref(finished, ref_collect=(fs,))
while pending:
if timeout is None:
@@ -210,7 +226,7 @@ def as_completed(fs, timeout=None):
if wait_timeout < 0:
raise TimeoutError(
'%d (of %d) futures unfinished' % (
- len(pending), len(fs)))
+ len(pending), total_futures))
waiter.event.wait(wait_timeout)
@@ -219,9 +235,9 @@ def as_completed(fs, timeout=None):
waiter.finished_futures = []
waiter.event.clear()
- for future in finished:
- yield future
- pending.remove(future)
+ # reverse to keep finishing order
+ finished.reverse()
+ yield from _yield_and_decref(finished, ref_collect=(fs, pending))
finally:
for f in fs:
@@ -551,11 +567,14 @@ class Executor(object):
# before the first iterator value is required.
def result_iterator():
try:
- for future in fs:
+ # reverse to keep finishing order
+ fs.reverse()
+ while fs:
+ # Careful not to keep a reference to the popped future
if timeout is None:
- yield future.result()
+ yield fs.pop().result()
else:
- yield future.result(end_time - time.time())
+ yield fs.pop().result(end_time - time.time())
finally:
for future in fs:
future.cancel()