diff options
Diffstat (limited to 'Lib/concurrent/futures/process.py')
-rw-r--r-- | Lib/concurrent/futures/process.py | 71 |
1 files changed, 71 insertions, 0 deletions
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 42eee72..d79d6b9 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -626,6 +626,14 @@ class BrokenProcessPool(_base.BrokenExecutor): while a future was in the running state. """ +_TERMINATE = "terminate" +_KILL = "kill" + +_SHUTDOWN_CALLBACK_OPERATION = { + _TERMINATE, + _KILL +} + class ProcessPoolExecutor(_base.Executor): def __init__(self, max_workers=None, mp_context=None, @@ -855,3 +863,66 @@ class ProcessPoolExecutor(_base.Executor): self._executor_manager_thread_wakeup = None shutdown.__doc__ = _base.Executor.shutdown.__doc__ + + def _force_shutdown(self, operation): + """Attempts to terminate or kill the executor's workers based off the + given operation. Iterates through all of the current processes and + performs the relevant task if the process is still alive. + + After terminating workers, the pool will be in a broken state + and no longer usable (for instance, new tasks should not be + submitted). + """ + if operation not in _SHUTDOWN_CALLBACK_OPERATION: + raise ValueError(f"Unsupported operation: {operation!r}") + + processes = {} + if self._processes: + processes = self._processes.copy() + + # shutdown will invalidate ._processes, so we copy it right before + # calling. If we waited here, we would deadlock if a process decides not + # to exit. + self.shutdown(wait=False, cancel_futures=True) + + if not processes: + return + + for proc in processes.values(): + try: + if not proc.is_alive(): + continue + except ValueError: + # The process is already exited/closed out. + continue + + try: + if operation == _TERMINATE: + proc.terminate() + elif operation == _KILL: + proc.kill() + except ProcessLookupError: + # The process just ended before our signal + continue + + def terminate_workers(self): + """Attempts to terminate the executor's workers. + Iterates through all of the current worker processes and terminates + each one that is still alive. + + After terminating workers, the pool will be in a broken state + and no longer usable (for instance, new tasks should not be + submitted). + """ + return self._force_shutdown(operation=_TERMINATE) + + def kill_workers(self): + """Attempts to kill the executor's workers. + Iterates through all of the current worker processes and kills + each one that is still alive. + + After killing workers, the pool will be in a broken state + and no longer usable (for instance, new tasks should not be + submitted). + """ + return self._force_shutdown(operation=_KILL) |