summaryrefslogtreecommitdiffstats
path: root/Lib/concurrent/futures/process.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/concurrent/futures/process.py')
-rw-r--r--Lib/concurrent/futures/process.py71
1 files changed, 71 insertions, 0 deletions
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
index 42eee72..d79d6b9 100644
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -626,6 +626,14 @@ class BrokenProcessPool(_base.BrokenExecutor):
while a future was in the running state.
"""
+_TERMINATE = "terminate"
+_KILL = "kill"
+
+_SHUTDOWN_CALLBACK_OPERATION = {
+ _TERMINATE,
+ _KILL
+}
+
class ProcessPoolExecutor(_base.Executor):
def __init__(self, max_workers=None, mp_context=None,
@@ -855,3 +863,66 @@ class ProcessPoolExecutor(_base.Executor):
self._executor_manager_thread_wakeup = None
shutdown.__doc__ = _base.Executor.shutdown.__doc__
+
+ def _force_shutdown(self, operation):
+ """Attempts to terminate or kill the executor's workers based off the
+ given operation. Iterates through all of the current processes and
+ performs the relevant task if the process is still alive.
+
+ After terminating workers, the pool will be in a broken state
+ and no longer usable (for instance, new tasks should not be
+ submitted).
+ """
+ if operation not in _SHUTDOWN_CALLBACK_OPERATION:
+ raise ValueError(f"Unsupported operation: {operation!r}")
+
+ processes = {}
+ if self._processes:
+ processes = self._processes.copy()
+
+ # shutdown will invalidate ._processes, so we copy it right before
+ # calling. If we waited here, we would deadlock if a process decides not
+ # to exit.
+ self.shutdown(wait=False, cancel_futures=True)
+
+ if not processes:
+ return
+
+ for proc in processes.values():
+ try:
+ if not proc.is_alive():
+ continue
+ except ValueError:
+ # The process is already exited/closed out.
+ continue
+
+ try:
+ if operation == _TERMINATE:
+ proc.terminate()
+ elif operation == _KILL:
+ proc.kill()
+ except ProcessLookupError:
+ # The process just ended before our signal
+ continue
+
+ def terminate_workers(self):
+ """Attempts to terminate the executor's workers.
+ Iterates through all of the current worker processes and terminates
+ each one that is still alive.
+
+ After terminating workers, the pool will be in a broken state
+ and no longer usable (for instance, new tasks should not be
+ submitted).
+ """
+ return self._force_shutdown(operation=_TERMINATE)
+
+ def kill_workers(self):
+ """Attempts to kill the executor's workers.
+ Iterates through all of the current worker processes and kills
+ each one that is still alive.
+
+ After killing workers, the pool will be in a broken state
+ and no longer usable (for instance, new tasks should not be
+ submitted).
+ """
+ return self._force_shutdown(operation=_KILL)