diff options
author | Kyle Stanley <aeros167@gmail.com> | 2020-02-02 12:49:00 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-02-02 12:49:00 (GMT) |
commit | 339fd46cb764277cbbdc3e78dcc5b45b156bb6ae (patch) | |
tree | 2366d3abf217d3017a50e2b024d67be731a49347 /Lib/concurrent | |
parent | be8147bdc6111a225ec284a4514277304726c3d0 (diff) | |
download | cpython-339fd46cb764277cbbdc3e78dcc5b45b156bb6ae.zip cpython-339fd46cb764277cbbdc3e78dcc5b45b156bb6ae.tar.gz cpython-339fd46cb764277cbbdc3e78dcc5b45b156bb6ae.tar.bz2 |
bpo-39349: Add *cancel_futures* to Executor.shutdown() (GH-18057)
Diffstat (limited to 'Lib/concurrent')
-rw-r--r-- | Lib/concurrent/futures/process.py | 23 | ||||
-rw-r--r-- | Lib/concurrent/futures/thread.py | 15 |
2 files changed, 36 insertions, 2 deletions
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 9e2ab9d..fd9f572 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -435,6 +435,24 @@ def _queue_management_worker(executor_reference, # is not gc-ed yet. if executor is not None: executor._shutdown_thread = True + # Unless there are pending work items, we have nothing to cancel. + if pending_work_items and executor._cancel_pending_futures: + # Cancel all pending futures and update pending_work_items + # to only have futures that are currently running. + new_pending_work_items = {} + for work_id, work_item in pending_work_items.items(): + if not work_item.future.cancel(): + new_pending_work_items[work_id] = work_item + + pending_work_items = new_pending_work_items + # Drain work_ids_queue since we no longer need to + # add items to the call queue. + while True: + try: + work_ids_queue.get_nowait() + except queue.Empty: + break + # Since no new work items can be added, it is safe to shutdown # this thread if there are no pending work items. if not pending_work_items: @@ -546,6 +564,7 @@ class ProcessPoolExecutor(_base.Executor): self._broken = False self._queue_count = 0 self._pending_work_items = {} + self._cancel_pending_futures = False # Create communication channels for the executor # Make the call queue slightly larger than the number of processes to @@ -660,9 +679,11 @@ class ProcessPoolExecutor(_base.Executor): timeout=timeout) return _chain_from_iterable_of_lists(results) - def shutdown(self, wait=True): + def shutdown(self, wait=True, *, cancel_futures=False): with self._shutdown_lock: + self._cancel_pending_futures = cancel_futures self._shutdown_thread = True + if self._queue_management_thread: # Wake up queue management thread self._queue_management_thread_wakeup.wakeup() diff --git a/Lib/concurrent/futures/thread.py b/Lib/concurrent/futures/thread.py index b89f8f2..be79161 100644 --- a/Lib/concurrent/futures/thread.py +++ b/Lib/concurrent/futures/thread.py @@ -215,9 +215,22 @@ class ThreadPoolExecutor(_base.Executor): if work_item is not None: work_item.future.set_exception(BrokenThreadPool(self._broken)) - def shutdown(self, wait=True): + def shutdown(self, wait=True, *, cancel_futures=False): with self._shutdown_lock: self._shutdown = True + if cancel_futures: + # Drain all work items from the queue, and then cancel their + # associated futures. + while True: + try: + work_item = self._work_queue.get_nowait() + except queue.Empty: + break + if work_item is not None: + work_item.future.cancel() + + # Send a wake-up to prevent threads calling + # _work_queue.get(block=True) from permanently blocking. self._work_queue.put(None) if wait: for t in self._threads: |