summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorThomas Grainger <tagrain@gmail.com>2022-07-28 09:20:10 (GMT)
committerGitHub <noreply@github.com>2022-07-28 09:20:10 (GMT)
commite16d4ed59072839b49bda4b447f260201aae7e39 (patch)
tree57a11cdeb219ecb0e85b4548cf5aa1025ad1a479
parentb8b2990fb3218cffedfe7bc92e9e7ae2275b3c98 (diff)
downloadcpython-e16d4ed59072839b49bda4b447f260201aae7e39.zip
cpython-e16d4ed59072839b49bda4b447f260201aae7e39.tar.gz
cpython-e16d4ed59072839b49bda4b447f260201aae7e39.tar.bz2
gh-95166: cancel map waited on future on timeout (GH-95169)
Co-authored-by: Kumar Aditya <59607654+kumaraditya303@users.noreply.github.com>
-rw-r--r--Lib/concurrent/futures/_base.py16
-rw-r--r--Lib/test/test_concurrent_futures.py27
-rw-r--r--Misc/NEWS.d/next/Library/2022-07-23-10-42-05.gh-issue-95166.xw6p3C.rst1
3 files changed, 42 insertions, 2 deletions
diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py
index d7e7e41..6742a07 100644
--- a/Lib/concurrent/futures/_base.py
+++ b/Lib/concurrent/futures/_base.py
@@ -310,6 +310,18 @@ def wait(fs, timeout=None, return_when=ALL_COMPLETED):
done.update(waiter.finished_futures)
return DoneAndNotDoneFutures(done, fs - done)
+
+def _result_or_cancel(fut, timeout=None):
+ try:
+ try:
+ return fut.result(timeout)
+ finally:
+ fut.cancel()
+ finally:
+ # Break a reference cycle with the exception in self._exception
+ del fut
+
+
class Future(object):
"""Represents the result of an asynchronous computation."""
@@ -604,9 +616,9 @@ class Executor(object):
while fs:
# Careful not to keep a reference to the popped future
if timeout is None:
- yield fs.pop().result()
+ yield _result_or_cancel(fs.pop())
else:
- yield fs.pop().result(end_time - time.monotonic())
+ yield _result_or_cancel(fs.pop(), end_time - time.monotonic())
finally:
for future in fs:
future.cancel()
diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py
index e294bd3..fe9fdc4 100644
--- a/Lib/test/test_concurrent_futures.py
+++ b/Lib/test/test_concurrent_futures.py
@@ -932,6 +932,33 @@ class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest, BaseTestCase):
with futures.ProcessPoolExecutor(1, mp_context=mp.get_context('fork')) as workers:
workers.submit(tuple)
+ def test_executor_map_current_future_cancel(self):
+ stop_event = threading.Event()
+ log = []
+
+ def log_n_wait(ident):
+ log.append(f"{ident=} started")
+ try:
+ stop_event.wait()
+ finally:
+ log.append(f"{ident=} stopped")
+
+ with self.executor_type(max_workers=1) as pool:
+ # submit work to saturate the pool
+ fut = pool.submit(log_n_wait, ident="first")
+ try:
+ with contextlib.closing(
+ pool.map(log_n_wait, ["second", "third"], timeout=0)
+ ) as gen:
+ with self.assertRaises(TimeoutError):
+ next(gen)
+ finally:
+ stop_event.set()
+ fut.result()
+ # ident='second' is cancelled as a result of raising a TimeoutError
+ # ident='third' is cancelled because it remained in the collection of futures
+ self.assertListEqual(log, ["ident='first' started", "ident='first' stopped"])
+
class ProcessPoolExecutorTest(ExecutorTest):
diff --git a/Misc/NEWS.d/next/Library/2022-07-23-10-42-05.gh-issue-95166.xw6p3C.rst b/Misc/NEWS.d/next/Library/2022-07-23-10-42-05.gh-issue-95166.xw6p3C.rst
new file mode 100644
index 0000000..34b0170
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2022-07-23-10-42-05.gh-issue-95166.xw6p3C.rst
@@ -0,0 +1 @@
+Fix :meth:`concurrent.futures.Executor.map` to cancel the currently waiting on future on an error - e.g. TimeoutError or KeyboardInterrupt.