summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorKyle Stanley <aeros167@gmail.com>2020-02-02 12:49:00 (GMT)
committerGitHub <noreply@github.com>2020-02-02 12:49:00 (GMT)
commit339fd46cb764277cbbdc3e78dcc5b45b156bb6ae (patch)
tree2366d3abf217d3017a50e2b024d67be731a49347
parentbe8147bdc6111a225ec284a4514277304726c3d0 (diff)
downloadcpython-339fd46cb764277cbbdc3e78dcc5b45b156bb6ae.zip
cpython-339fd46cb764277cbbdc3e78dcc5b45b156bb6ae.tar.gz
cpython-339fd46cb764277cbbdc3e78dcc5b45b156bb6ae.tar.bz2
bpo-39349: Add *cancel_futures* to Executor.shutdown() (GH-18057)
-rw-r--r--Doc/library/concurrent.futures.rst14
-rw-r--r--Doc/whatsnew/3.9.rst9
-rw-r--r--Lib/concurrent/futures/process.py23
-rw-r--r--Lib/concurrent/futures/thread.py15
-rw-r--r--Lib/test/test_concurrent_futures.py39
-rw-r--r--Misc/NEWS.d/next/Library/2020-01-19-04-12-34.bpo-39349.7CV-LC.rst4
6 files changed, 101 insertions, 3 deletions
diff --git a/Doc/library/concurrent.futures.rst b/Doc/library/concurrent.futures.rst
index d71f2d8..b21d559 100644
--- a/Doc/library/concurrent.futures.rst
+++ b/Doc/library/concurrent.futures.rst
@@ -67,7 +67,7 @@ Executor Objects
.. versionchanged:: 3.5
Added the *chunksize* argument.
- .. method:: shutdown(wait=True)
+ .. method:: shutdown(wait=True, \*, cancel_futures=False)
Signal the executor that it should free any resources that it is using
when the currently pending futures are done executing. Calls to
@@ -82,6 +82,15 @@ Executor Objects
value of *wait*, the entire Python program will not exit until all
pending futures are done executing.
+ If *cancel_futures* is ``True``, this method will cancel all pending
+ futures that the executor has not started running. Any futures that
+ are completed or running won't be cancelled, regardless of the value
+ of *cancel_futures*.
+
+ If both *cancel_futures* and *wait* are ``True``, all futures that the
+ executor has started running will be completed prior to this method
+ returning. The remaining futures are cancelled.
+
You can avoid having to call this method explicitly if you use the
:keyword:`with` statement, which will shutdown the :class:`Executor`
(waiting as if :meth:`Executor.shutdown` were called with *wait* set to
@@ -94,6 +103,9 @@ Executor Objects
e.submit(shutil.copy, 'src3.txt', 'dest3.txt')
e.submit(shutil.copy, 'src4.txt', 'dest4.txt')
+ .. versionchanged:: 3.9
+ Added *cancel_futures*.
+
ThreadPoolExecutor
------------------
diff --git a/Doc/whatsnew/3.9.rst b/Doc/whatsnew/3.9.rst
index ee37b5a..931f8bf 100644
--- a/Doc/whatsnew/3.9.rst
+++ b/Doc/whatsnew/3.9.rst
@@ -146,6 +146,15 @@ that schedules a shutdown for the default executor that waits on the
Added :class:`asyncio.PidfdChildWatcher`, a Linux-specific child watcher
implementation that polls process file descriptors. (:issue:`38692`)
+concurrent.futures
+------------------
+
+Added a new *cancel_futures* parameter to
+:meth:`concurrent.futures.Executor.shutdown` that cancels all pending futures
+which have not started running, instead of waiting for them to complete before
+shutting down the executor.
+(Contributed by Kyle Stanley in :issue:`39349`.)
+
curses
------
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
index 9e2ab9d..fd9f572 100644
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -435,6 +435,24 @@ def _queue_management_worker(executor_reference,
# is not gc-ed yet.
if executor is not None:
executor._shutdown_thread = True
+ # Unless there are pending work items, we have nothing to cancel.
+ if pending_work_items and executor._cancel_pending_futures:
+ # Cancel all pending futures and update pending_work_items
+ # to only have futures that are currently running.
+ new_pending_work_items = {}
+ for work_id, work_item in pending_work_items.items():
+ if not work_item.future.cancel():
+ new_pending_work_items[work_id] = work_item
+
+ pending_work_items = new_pending_work_items
+ # Drain work_ids_queue since we no longer need to
+ # add items to the call queue.
+ while True:
+ try:
+ work_ids_queue.get_nowait()
+ except queue.Empty:
+ break
+
# Since no new work items can be added, it is safe to shutdown
# this thread if there are no pending work items.
if not pending_work_items:
@@ -546,6 +564,7 @@ class ProcessPoolExecutor(_base.Executor):
self._broken = False
self._queue_count = 0
self._pending_work_items = {}
+ self._cancel_pending_futures = False
# Create communication channels for the executor
# Make the call queue slightly larger than the number of processes to
@@ -660,9 +679,11 @@ class ProcessPoolExecutor(_base.Executor):
timeout=timeout)
return _chain_from_iterable_of_lists(results)
- def shutdown(self, wait=True):
+ def shutdown(self, wait=True, *, cancel_futures=False):
with self._shutdown_lock:
+ self._cancel_pending_futures = cancel_futures
self._shutdown_thread = True
+
if self._queue_management_thread:
# Wake up queue management thread
self._queue_management_thread_wakeup.wakeup()
diff --git a/Lib/concurrent/futures/thread.py b/Lib/concurrent/futures/thread.py
index b89f8f2..be79161 100644
--- a/Lib/concurrent/futures/thread.py
+++ b/Lib/concurrent/futures/thread.py
@@ -215,9 +215,22 @@ class ThreadPoolExecutor(_base.Executor):
if work_item is not None:
work_item.future.set_exception(BrokenThreadPool(self._broken))
- def shutdown(self, wait=True):
+ def shutdown(self, wait=True, *, cancel_futures=False):
with self._shutdown_lock:
self._shutdown = True
+ if cancel_futures:
+ # Drain all work items from the queue, and then cancel their
+ # associated futures.
+ while True:
+ try:
+ work_item = self._work_queue.get_nowait()
+ except queue.Empty:
+ break
+ if work_item is not None:
+ work_item.future.cancel()
+
+ # Send a wake-up to prevent threads calling
+ # _work_queue.get(block=True) from permanently blocking.
self._work_queue.put(None)
if wait:
for t in self._threads:
diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py
index c8fa35e..af77f81 100644
--- a/Lib/test/test_concurrent_futures.py
+++ b/Lib/test/test_concurrent_futures.py
@@ -342,6 +342,29 @@ class ExecutorShutdownTest:
for f in fs:
f.result()
+ def test_cancel_futures(self):
+ executor = self.executor_type(max_workers=3)
+ fs = [executor.submit(time.sleep, .1) for _ in range(50)]
+ executor.shutdown(cancel_futures=True)
+ # We can't guarantee the exact number of cancellations, but we can
+ # guarantee that *some* were cancelled. With setting max_workers to 3,
+ # most of the submitted futures should have been cancelled.
+ cancelled = [fut for fut in fs if fut.cancelled()]
+ self.assertTrue(len(cancelled) >= 35, msg=f"{len(cancelled)=}")
+
+ # Ensure the other futures were able to finish.
+ # Use "not fut.cancelled()" instead of "fut.done()" to include futures
+ # that may have been left in a pending state.
+ others = [fut for fut in fs if not fut.cancelled()]
+ for fut in others:
+ self.assertTrue(fut.done(), msg=f"{fut._state=}")
+ self.assertIsNone(fut.exception())
+
+ # Similar to the number of cancelled futures, we can't guarantee the
+ # exact number that completed. But, we can guarantee that at least
+ # one finished.
+ self.assertTrue(len(others) > 0, msg=f"{len(others)=}")
+
def test_hang_issue39205(self):
"""shutdown(wait=False) doesn't hang at exit with running futures.
@@ -422,6 +445,22 @@ class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest, BaseTestCase
self.assertRegex(t.name, r'ThreadPoolExecutor-\d+_[0-4]$')
t.join()
+ def test_cancel_futures_wait_false(self):
+ # Can only be reliably tested for TPE, since PPE often hangs with
+ # `wait=False` (even without *cancel_futures*).
+ rc, out, err = assert_python_ok('-c', """if True:
+ from concurrent.futures import ThreadPoolExecutor
+ from test.test_concurrent_futures import sleep_and_print
+ if __name__ == "__main__":
+ t = ThreadPoolExecutor()
+ t.submit(sleep_and_print, .1, "apple")
+ t.shutdown(wait=False, cancel_futures=True)
+ """.format(executor_type=self.executor_type.__name__))
+ # Errors in atexit hooks don't change the process exit code, check
+ # stderr manually.
+ self.assertFalse(err)
+ self.assertEqual(out.strip(), b"apple")
+
class ProcessPoolShutdownTest(ExecutorShutdownTest):
def _prime_executor(self):
diff --git a/Misc/NEWS.d/next/Library/2020-01-19-04-12-34.bpo-39349.7CV-LC.rst b/Misc/NEWS.d/next/Library/2020-01-19-04-12-34.bpo-39349.7CV-LC.rst
new file mode 100644
index 0000000..cc52700
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2020-01-19-04-12-34.bpo-39349.7CV-LC.rst
@@ -0,0 +1,4 @@
+Added a new *cancel_futures* parameter to
+:meth:`concurrent.futures.Executor.shutdown` that cancels all pending futures
+which have not started running, instead of waiting for them to complete before
+shutting down the executor. \ No newline at end of file