summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGrzegorz Grzywacz <grzgrzgrz3@gmail.com>2017-09-01 16:54:00 (GMT)
committerAntoine Pitrou <pitrou@free.fr>2017-09-01 16:54:00 (GMT)
commit97e1b1c81458d2109b2ffed32ffa1eb643a6c3b9 (patch)
treee87029349aa808857ea5e2d1582f0f9f66f0cfad
parent16432beadb8eba079c9786cc0c0eaacfd9fd2f7b (diff)
downloadcpython-97e1b1c81458d2109b2ffed32ffa1eb643a6c3b9.zip
cpython-97e1b1c81458d2109b2ffed32ffa1eb643a6c3b9.tar.gz
cpython-97e1b1c81458d2109b2ffed32ffa1eb643a6c3b9.tar.bz2
bpo-27144: concurrent.futures as_complete and map iterators do not keep reference to returned object (#1560)
* bpo-27144: concurrent.futures as_complie and map iterators do not keep reference to returned object * Some nits. Improve wordings in docstrings and comments, and avoid relying on sys.getrefcount() in tests.
-rw-r--r--Lib/concurrent/futures/_base.py37
-rw-r--r--Lib/concurrent/futures/process.py14
-rw-r--r--Lib/test/test_concurrent_futures.py48
-rw-r--r--Misc/NEWS.d/next/Library/2017-08-30-11-26-14.bpo-27144.PEDJsE.rst2
4 files changed, 91 insertions, 10 deletions
diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py
index 295489c..88521ae 100644
--- a/Lib/concurrent/futures/_base.py
+++ b/Lib/concurrent/futures/_base.py
@@ -170,6 +170,20 @@ def _create_and_install_waiters(fs, return_when):
return waiter
+
+def _yield_and_decref(fs, ref_collect):
+ """
+ Iterate on the list *fs*, yielding objects one by one in reverse order.
+ Before yielding an object, it is removed from each set in
+ the collection of sets *ref_collect*.
+ """
+ while fs:
+ for futures_set in ref_collect:
+ futures_set.remove(fs[-1])
+ # Careful not to keep a reference to the popped value
+ yield fs.pop()
+
+
def as_completed(fs, timeout=None):
"""An iterator over the given futures that yields each as it completes.
@@ -191,6 +205,8 @@ def as_completed(fs, timeout=None):
if timeout is not None:
end_time = timeout + time.time()
+ total_futures = len(fs)
+
fs = set(fs)
with _AcquireFutures(fs):
finished = set(
@@ -198,9 +214,9 @@ def as_completed(fs, timeout=None):
if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
pending = fs - finished
waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
-
+ finished = list(finished)
try:
- yield from finished
+ yield from _yield_and_decref(finished, ref_collect=(fs,))
while pending:
if timeout is None:
@@ -210,7 +226,7 @@ def as_completed(fs, timeout=None):
if wait_timeout < 0:
raise TimeoutError(
'%d (of %d) futures unfinished' % (
- len(pending), len(fs)))
+ len(pending), total_futures))
waiter.event.wait(wait_timeout)
@@ -219,9 +235,9 @@ def as_completed(fs, timeout=None):
waiter.finished_futures = []
waiter.event.clear()
- for future in finished:
- yield future
- pending.remove(future)
+ # reverse to keep finishing order
+ finished.reverse()
+ yield from _yield_and_decref(finished, ref_collect=(fs, pending))
finally:
for f in fs:
@@ -551,11 +567,14 @@ class Executor(object):
# before the first iterator value is required.
def result_iterator():
try:
- for future in fs:
+ # reverse to keep finishing order
+ fs.reverse()
+ while fs:
+ # Careful not to keep a reference to the popped future
if timeout is None:
- yield future.result()
+ yield fs.pop().result()
else:
- yield future.result(end_time - time.time())
+ yield fs.pop().result(end_time - time.time())
finally:
for future in fs:
future.cancel()
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
index 8f1d714..03b28ab 100644
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -357,6 +357,18 @@ def _check_system_limits():
raise NotImplementedError(_system_limited)
+def _chain_from_iterable_of_lists(iterable):
+ """
+ Specialized implementation of itertools.chain.from_iterable.
+ Each item in *iterable* should be a list. This function is
+ careful not to keep references to yielded objects.
+ """
+ for element in iterable:
+ element.reverse()
+ while element:
+ yield element.pop()
+
+
class BrokenProcessPool(RuntimeError):
"""
Raised when a process in a ProcessPoolExecutor terminated abruptly
@@ -482,7 +494,7 @@ class ProcessPoolExecutor(_base.Executor):
results = super().map(partial(_process_chunk, fn),
_get_chunks(*iterables, chunksize=chunksize),
timeout=timeout)
- return itertools.chain.from_iterable(results)
+ return _chain_from_iterable_of_lists(results)
def shutdown(self, wait=True):
with self._shutdown_lock:
diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py
index ebc30a4..f1226fe 100644
--- a/Lib/test/test_concurrent_futures.py
+++ b/Lib/test/test_concurrent_futures.py
@@ -59,6 +59,10 @@ class MyObject(object):
pass
+def make_dummy_object(_):
+ return MyObject()
+
+
class BaseTestCase(unittest.TestCase):
def setUp(self):
self._thread_key = test.support.threading_setup()
@@ -396,6 +400,38 @@ class AsCompletedTests:
completed = [f for f in futures.as_completed([future1,future1])]
self.assertEqual(len(completed), 1)
+ def test_free_reference_yielded_future(self):
+ # Issue #14406: Generator should not keep references
+ # to finished futures.
+ futures_list = [Future() for _ in range(8)]
+ futures_list.append(create_future(state=CANCELLED_AND_NOTIFIED))
+ futures_list.append(create_future(state=SUCCESSFUL_FUTURE))
+
+ with self.assertRaises(futures.TimeoutError):
+ for future in futures.as_completed(futures_list, timeout=0):
+ futures_list.remove(future)
+ wr = weakref.ref(future)
+ del future
+ self.assertIsNone(wr())
+
+ futures_list[0].set_result("test")
+ for future in futures.as_completed(futures_list):
+ futures_list.remove(future)
+ wr = weakref.ref(future)
+ del future
+ self.assertIsNone(wr())
+ if futures_list:
+ futures_list[0].set_result("test")
+
+ def test_correct_timeout_exception_msg(self):
+ futures_list = [CANCELLED_AND_NOTIFIED_FUTURE, PENDING_FUTURE,
+ RUNNING_FUTURE, SUCCESSFUL_FUTURE]
+
+ with self.assertRaises(futures.TimeoutError) as cm:
+ list(futures.as_completed(futures_list, timeout=0))
+
+ self.assertEqual(str(cm.exception), '2 (of 4) futures unfinished')
+
class ThreadPoolAsCompletedTests(ThreadPoolMixin, AsCompletedTests, BaseTestCase):
pass
@@ -421,6 +457,10 @@ class ExecutorTest:
list(self.executor.map(pow, range(10), range(10))),
list(map(pow, range(10), range(10))))
+ self.assertEqual(
+ list(self.executor.map(pow, range(10), range(10), chunksize=3)),
+ list(map(pow, range(10), range(10))))
+
def test_map_exception(self):
i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5])
self.assertEqual(i.__next__(), (0, 1))
@@ -471,6 +511,14 @@ class ExecutorTest:
"than 0"):
self.executor_type(max_workers=number)
+ def test_free_reference(self):
+ # Issue #14406: Result iterator should not keep an internal
+ # reference to result objects.
+ for obj in self.executor.map(make_dummy_object, range(10)):
+ wr = weakref.ref(obj)
+ del obj
+ self.assertIsNone(wr())
+
class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest, BaseTestCase):
def test_map_submits_without_iteration(self):
diff --git a/Misc/NEWS.d/next/Library/2017-08-30-11-26-14.bpo-27144.PEDJsE.rst b/Misc/NEWS.d/next/Library/2017-08-30-11-26-14.bpo-27144.PEDJsE.rst
new file mode 100644
index 0000000..857fad0
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2017-08-30-11-26-14.bpo-27144.PEDJsE.rst
@@ -0,0 +1,2 @@
+The ``map()`` and ``as_completed()`` iterators in ``concurrent.futures``
+now avoid keeping a reference to yielded objects. \ No newline at end of file