diff options
author | Gregory P. Smith <greg@krypto.org> | 2022-05-08 16:20:34 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-05-08 16:20:34 (GMT) |
commit | ebb37fc3fdcb03db4e206db017eeef7aaffbae84 (patch) | |
tree | 11824a40e84b9841d17040751a98ec4576fda4d9 /Lib/concurrent | |
parent | a84a56d80fa3d9a5909d074bbcd2efff7ef8f1b7 (diff) | |
download | cpython-ebb37fc3fdcb03db4e206db017eeef7aaffbae84.zip cpython-ebb37fc3fdcb03db4e206db017eeef7aaffbae84.tar.gz cpython-ebb37fc3fdcb03db4e206db017eeef7aaffbae84.tar.bz2 |
gh-90622: Do not spawn ProcessPool workers on demand via fork method. (#91598)
Do not spawn ProcessPool workers on demand when they spawn via fork.
This avoids potential deadlocks in the child processes due to forking from
a multithreaded process.
Diffstat (limited to 'Lib/concurrent')
-rw-r--r-- | Lib/concurrent/futures/process.py | 44 |
1 files changed, 34 insertions, 10 deletions
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 821034d..7e2f5fa 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -652,6 +652,10 @@ class ProcessPoolExecutor(_base.Executor): mp_context = mp.get_context() self._mp_context = mp_context + # https://github.com/python/cpython/issues/90622 + self._safe_to_dynamically_spawn_children = ( + self._mp_context.get_start_method(allow_none=False) != "fork") + if initializer is not None and not callable(initializer): raise TypeError("initializer must be a callable") self._initializer = initializer @@ -714,6 +718,8 @@ class ProcessPoolExecutor(_base.Executor): def _start_executor_manager_thread(self): if self._executor_manager_thread is None: # Start the processes so that their sentinels are known. + if not self._safe_to_dynamically_spawn_children: # ie, using fork. + self._launch_processes() self._executor_manager_thread = _ExecutorManagerThread(self) self._executor_manager_thread.start() _threads_wakeups[self._executor_manager_thread] = \ @@ -726,15 +732,32 @@ class ProcessPoolExecutor(_base.Executor): process_count = len(self._processes) if process_count < self._max_workers: - p = self._mp_context.Process( - target=_process_worker, - args=(self._call_queue, - self._result_queue, - self._initializer, - self._initargs, - self._max_tasks_per_child)) - p.start() - self._processes[p.pid] = p + # Assertion disabled as this codepath is also used to replace a + # worker that unexpectedly dies, even when using the 'fork' start + # method. That means there is still a potential deadlock bug. If a + # 'fork' mp_context worker dies, we'll be forking a new one when + # we know a thread is running (self._executor_manager_thread). + #assert self._safe_to_dynamically_spawn_children or not self._executor_manager_thread, 'https://github.com/python/cpython/issues/90622' + self._spawn_process() + + def _launch_processes(self): + # https://github.com/python/cpython/issues/90622 + assert not self._executor_manager_thread, ( + 'Processes cannot be fork()ed after the thread has started, ' + 'deadlock in the child processes could result.') + for _ in range(len(self._processes), self._max_workers): + self._spawn_process() + + def _spawn_process(self): + p = self._mp_context.Process( + target=_process_worker, + args=(self._call_queue, + self._result_queue, + self._initializer, + self._initargs, + self._max_tasks_per_child)) + p.start() + self._processes[p.pid] = p def submit(self, fn, /, *args, **kwargs): with self._shutdown_lock: @@ -755,7 +778,8 @@ class ProcessPoolExecutor(_base.Executor): # Wake up queue management thread self._executor_manager_thread_wakeup.wakeup() - self._adjust_process_count() + if self._safe_to_dynamically_spawn_children: + self._adjust_process_count() self._start_executor_manager_thread() return f submit.__doc__ = _base.Executor.submit.__doc__ |