summaryrefslogtreecommitdiffstats
path: root/Lib/concurrent
diff options
context:
space:
mode:
authorKyle Stanley <aeros167@gmail.com>2020-02-02 12:49:00 (GMT)
committerGitHub <noreply@github.com>2020-02-02 12:49:00 (GMT)
commit339fd46cb764277cbbdc3e78dcc5b45b156bb6ae (patch)
tree2366d3abf217d3017a50e2b024d67be731a49347 /Lib/concurrent
parentbe8147bdc6111a225ec284a4514277304726c3d0 (diff)
downloadcpython-339fd46cb764277cbbdc3e78dcc5b45b156bb6ae.zip
cpython-339fd46cb764277cbbdc3e78dcc5b45b156bb6ae.tar.gz
cpython-339fd46cb764277cbbdc3e78dcc5b45b156bb6ae.tar.bz2
bpo-39349: Add *cancel_futures* to Executor.shutdown() (GH-18057)
Diffstat (limited to 'Lib/concurrent')
-rw-r--r--Lib/concurrent/futures/process.py23
-rw-r--r--Lib/concurrent/futures/thread.py15
2 files changed, 36 insertions, 2 deletions
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
index 9e2ab9d..fd9f572 100644
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -435,6 +435,24 @@ def _queue_management_worker(executor_reference,
# is not gc-ed yet.
if executor is not None:
executor._shutdown_thread = True
+ # Unless there are pending work items, we have nothing to cancel.
+ if pending_work_items and executor._cancel_pending_futures:
+ # Cancel all pending futures and update pending_work_items
+ # to only have futures that are currently running.
+ new_pending_work_items = {}
+ for work_id, work_item in pending_work_items.items():
+ if not work_item.future.cancel():
+ new_pending_work_items[work_id] = work_item
+
+ pending_work_items = new_pending_work_items
+ # Drain work_ids_queue since we no longer need to
+ # add items to the call queue.
+ while True:
+ try:
+ work_ids_queue.get_nowait()
+ except queue.Empty:
+ break
+
# Since no new work items can be added, it is safe to shutdown
# this thread if there are no pending work items.
if not pending_work_items:
@@ -546,6 +564,7 @@ class ProcessPoolExecutor(_base.Executor):
self._broken = False
self._queue_count = 0
self._pending_work_items = {}
+ self._cancel_pending_futures = False
# Create communication channels for the executor
# Make the call queue slightly larger than the number of processes to
@@ -660,9 +679,11 @@ class ProcessPoolExecutor(_base.Executor):
timeout=timeout)
return _chain_from_iterable_of_lists(results)
- def shutdown(self, wait=True):
+ def shutdown(self, wait=True, *, cancel_futures=False):
with self._shutdown_lock:
+ self._cancel_pending_futures = cancel_futures
self._shutdown_thread = True
+
if self._queue_management_thread:
# Wake up queue management thread
self._queue_management_thread_wakeup.wakeup()
diff --git a/Lib/concurrent/futures/thread.py b/Lib/concurrent/futures/thread.py
index b89f8f2..be79161 100644
--- a/Lib/concurrent/futures/thread.py
+++ b/Lib/concurrent/futures/thread.py
@@ -215,9 +215,22 @@ class ThreadPoolExecutor(_base.Executor):
if work_item is not None:
work_item.future.set_exception(BrokenThreadPool(self._broken))
- def shutdown(self, wait=True):
+ def shutdown(self, wait=True, *, cancel_futures=False):
with self._shutdown_lock:
self._shutdown = True
+ if cancel_futures:
+ # Drain all work items from the queue, and then cancel their
+ # associated futures.
+ while True:
+ try:
+ work_item = self._work_queue.get_nowait()
+ except queue.Empty:
+ break
+ if work_item is not None:
+ work_item.future.cancel()
+
+ # Send a wake-up to prevent threads calling
+ # _work_queue.get(block=True) from permanently blocking.
self._work_queue.put(None)
if wait:
for t in self._threads: