From 339fd46cb764277cbbdc3e78dcc5b45b156bb6ae Mon Sep 17 00:00:00 2001 From: Kyle Stanley Date: Sun, 2 Feb 2020 07:49:00 -0500 Subject: bpo-39349: Add *cancel_futures* to Executor.shutdown() (GH-18057) --- Doc/library/concurrent.futures.rst | 14 +++++++- Doc/whatsnew/3.9.rst | 9 +++++ Lib/concurrent/futures/process.py | 23 ++++++++++++- Lib/concurrent/futures/thread.py | 15 ++++++++- Lib/test/test_concurrent_futures.py | 39 ++++++++++++++++++++++ .../2020-01-19-04-12-34.bpo-39349.7CV-LC.rst | 4 +++ 6 files changed, 101 insertions(+), 3 deletions(-) create mode 100644 Misc/NEWS.d/next/Library/2020-01-19-04-12-34.bpo-39349.7CV-LC.rst diff --git a/Doc/library/concurrent.futures.rst b/Doc/library/concurrent.futures.rst index d71f2d8..b21d559 100644 --- a/Doc/library/concurrent.futures.rst +++ b/Doc/library/concurrent.futures.rst @@ -67,7 +67,7 @@ Executor Objects .. versionchanged:: 3.5 Added the *chunksize* argument. - .. method:: shutdown(wait=True) + .. method:: shutdown(wait=True, \*, cancel_futures=False) Signal the executor that it should free any resources that it is using when the currently pending futures are done executing. Calls to @@ -82,6 +82,15 @@ Executor Objects value of *wait*, the entire Python program will not exit until all pending futures are done executing. + If *cancel_futures* is ``True``, this method will cancel all pending + futures that the executor has not started running. Any futures that + are completed or running won't be cancelled, regardless of the value + of *cancel_futures*. + + If both *cancel_futures* and *wait* are ``True``, all futures that the + executor has started running will be completed prior to this method + returning. The remaining futures are cancelled. + You can avoid having to call this method explicitly if you use the :keyword:`with` statement, which will shutdown the :class:`Executor` (waiting as if :meth:`Executor.shutdown` were called with *wait* set to @@ -94,6 +103,9 @@ Executor Objects e.submit(shutil.copy, 'src3.txt', 'dest3.txt') e.submit(shutil.copy, 'src4.txt', 'dest4.txt') + .. versionchanged:: 3.9 + Added *cancel_futures*. + ThreadPoolExecutor ------------------ diff --git a/Doc/whatsnew/3.9.rst b/Doc/whatsnew/3.9.rst index ee37b5a..931f8bf 100644 --- a/Doc/whatsnew/3.9.rst +++ b/Doc/whatsnew/3.9.rst @@ -146,6 +146,15 @@ that schedules a shutdown for the default executor that waits on the Added :class:`asyncio.PidfdChildWatcher`, a Linux-specific child watcher implementation that polls process file descriptors. (:issue:`38692`) +concurrent.futures +------------------ + +Added a new *cancel_futures* parameter to +:meth:`concurrent.futures.Executor.shutdown` that cancels all pending futures +which have not started running, instead of waiting for them to complete before +shutting down the executor. +(Contributed by Kyle Stanley in :issue:`39349`.) + curses ------ diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 9e2ab9d..fd9f572 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -435,6 +435,24 @@ def _queue_management_worker(executor_reference, # is not gc-ed yet. if executor is not None: executor._shutdown_thread = True + # Unless there are pending work items, we have nothing to cancel. + if pending_work_items and executor._cancel_pending_futures: + # Cancel all pending futures and update pending_work_items + # to only have futures that are currently running. + new_pending_work_items = {} + for work_id, work_item in pending_work_items.items(): + if not work_item.future.cancel(): + new_pending_work_items[work_id] = work_item + + pending_work_items = new_pending_work_items + # Drain work_ids_queue since we no longer need to + # add items to the call queue. + while True: + try: + work_ids_queue.get_nowait() + except queue.Empty: + break + # Since no new work items can be added, it is safe to shutdown # this thread if there are no pending work items. if not pending_work_items: @@ -546,6 +564,7 @@ class ProcessPoolExecutor(_base.Executor): self._broken = False self._queue_count = 0 self._pending_work_items = {} + self._cancel_pending_futures = False # Create communication channels for the executor # Make the call queue slightly larger than the number of processes to @@ -660,9 +679,11 @@ class ProcessPoolExecutor(_base.Executor): timeout=timeout) return _chain_from_iterable_of_lists(results) - def shutdown(self, wait=True): + def shutdown(self, wait=True, *, cancel_futures=False): with self._shutdown_lock: + self._cancel_pending_futures = cancel_futures self._shutdown_thread = True + if self._queue_management_thread: # Wake up queue management thread self._queue_management_thread_wakeup.wakeup() diff --git a/Lib/concurrent/futures/thread.py b/Lib/concurrent/futures/thread.py index b89f8f2..be79161 100644 --- a/Lib/concurrent/futures/thread.py +++ b/Lib/concurrent/futures/thread.py @@ -215,9 +215,22 @@ class ThreadPoolExecutor(_base.Executor): if work_item is not None: work_item.future.set_exception(BrokenThreadPool(self._broken)) - def shutdown(self, wait=True): + def shutdown(self, wait=True, *, cancel_futures=False): with self._shutdown_lock: self._shutdown = True + if cancel_futures: + # Drain all work items from the queue, and then cancel their + # associated futures. + while True: + try: + work_item = self._work_queue.get_nowait() + except queue.Empty: + break + if work_item is not None: + work_item.future.cancel() + + # Send a wake-up to prevent threads calling + # _work_queue.get(block=True) from permanently blocking. self._work_queue.put(None) if wait: for t in self._threads: diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index c8fa35e..af77f81 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -342,6 +342,29 @@ class ExecutorShutdownTest: for f in fs: f.result() + def test_cancel_futures(self): + executor = self.executor_type(max_workers=3) + fs = [executor.submit(time.sleep, .1) for _ in range(50)] + executor.shutdown(cancel_futures=True) + # We can't guarantee the exact number of cancellations, but we can + # guarantee that *some* were cancelled. With setting max_workers to 3, + # most of the submitted futures should have been cancelled. + cancelled = [fut for fut in fs if fut.cancelled()] + self.assertTrue(len(cancelled) >= 35, msg=f"{len(cancelled)=}") + + # Ensure the other futures were able to finish. + # Use "not fut.cancelled()" instead of "fut.done()" to include futures + # that may have been left in a pending state. + others = [fut for fut in fs if not fut.cancelled()] + for fut in others: + self.assertTrue(fut.done(), msg=f"{fut._state=}") + self.assertIsNone(fut.exception()) + + # Similar to the number of cancelled futures, we can't guarantee the + # exact number that completed. But, we can guarantee that at least + # one finished. + self.assertTrue(len(others) > 0, msg=f"{len(others)=}") + def test_hang_issue39205(self): """shutdown(wait=False) doesn't hang at exit with running futures. @@ -422,6 +445,22 @@ class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest, BaseTestCase self.assertRegex(t.name, r'ThreadPoolExecutor-\d+_[0-4]$') t.join() + def test_cancel_futures_wait_false(self): + # Can only be reliably tested for TPE, since PPE often hangs with + # `wait=False` (even without *cancel_futures*). + rc, out, err = assert_python_ok('-c', """if True: + from concurrent.futures import ThreadPoolExecutor + from test.test_concurrent_futures import sleep_and_print + if __name__ == "__main__": + t = ThreadPoolExecutor() + t.submit(sleep_and_print, .1, "apple") + t.shutdown(wait=False, cancel_futures=True) + """.format(executor_type=self.executor_type.__name__)) + # Errors in atexit hooks don't change the process exit code, check + # stderr manually. + self.assertFalse(err) + self.assertEqual(out.strip(), b"apple") + class ProcessPoolShutdownTest(ExecutorShutdownTest): def _prime_executor(self): diff --git a/Misc/NEWS.d/next/Library/2020-01-19-04-12-34.bpo-39349.7CV-LC.rst b/Misc/NEWS.d/next/Library/2020-01-19-04-12-34.bpo-39349.7CV-LC.rst new file mode 100644 index 0000000..cc52700 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2020-01-19-04-12-34.bpo-39349.7CV-LC.rst @@ -0,0 +1,4 @@ +Added a new *cancel_futures* parameter to +:meth:`concurrent.futures.Executor.shutdown` that cancels all pending futures +which have not started running, instead of waiting for them to complete before +shutting down the executor. \ No newline at end of file -- cgit v0.12