summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Lib/concurrent/futures/process.py44
-rw-r--r--Lib/test/test_concurrent_futures.py12
-rw-r--r--Misc/NEWS.d/next/Library/2022-04-15-22-07-36.gh-issue-90622.0C6l8h.rst4
3 files changed, 49 insertions, 11 deletions
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
index 821034d..7e2f5fa 100644
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -652,6 +652,10 @@ class ProcessPoolExecutor(_base.Executor):
mp_context = mp.get_context()
self._mp_context = mp_context
+ # https://github.com/python/cpython/issues/90622
+ self._safe_to_dynamically_spawn_children = (
+ self._mp_context.get_start_method(allow_none=False) != "fork")
+
if initializer is not None and not callable(initializer):
raise TypeError("initializer must be a callable")
self._initializer = initializer
@@ -714,6 +718,8 @@ class ProcessPoolExecutor(_base.Executor):
def _start_executor_manager_thread(self):
if self._executor_manager_thread is None:
# Start the processes so that their sentinels are known.
+ if not self._safe_to_dynamically_spawn_children: # ie, using fork.
+ self._launch_processes()
self._executor_manager_thread = _ExecutorManagerThread(self)
self._executor_manager_thread.start()
_threads_wakeups[self._executor_manager_thread] = \
@@ -726,15 +732,32 @@ class ProcessPoolExecutor(_base.Executor):
process_count = len(self._processes)
if process_count < self._max_workers:
- p = self._mp_context.Process(
- target=_process_worker,
- args=(self._call_queue,
- self._result_queue,
- self._initializer,
- self._initargs,
- self._max_tasks_per_child))
- p.start()
- self._processes[p.pid] = p
+ # Assertion disabled as this codepath is also used to replace a
+ # worker that unexpectedly dies, even when using the 'fork' start
+ # method. That means there is still a potential deadlock bug. If a
+ # 'fork' mp_context worker dies, we'll be forking a new one when
+ # we know a thread is running (self._executor_manager_thread).
+ #assert self._safe_to_dynamically_spawn_children or not self._executor_manager_thread, 'https://github.com/python/cpython/issues/90622'
+ self._spawn_process()
+
+ def _launch_processes(self):
+ # https://github.com/python/cpython/issues/90622
+ assert not self._executor_manager_thread, (
+ 'Processes cannot be fork()ed after the thread has started, '
+ 'deadlock in the child processes could result.')
+ for _ in range(len(self._processes), self._max_workers):
+ self._spawn_process()
+
+ def _spawn_process(self):
+ p = self._mp_context.Process(
+ target=_process_worker,
+ args=(self._call_queue,
+ self._result_queue,
+ self._initializer,
+ self._initargs,
+ self._max_tasks_per_child))
+ p.start()
+ self._processes[p.pid] = p
def submit(self, fn, /, *args, **kwargs):
with self._shutdown_lock:
@@ -755,7 +778,8 @@ class ProcessPoolExecutor(_base.Executor):
# Wake up queue management thread
self._executor_manager_thread_wakeup.wakeup()
- self._adjust_process_count()
+ if self._safe_to_dynamically_spawn_children:
+ self._adjust_process_count()
self._start_executor_manager_thread()
return f
submit.__doc__ = _base.Executor.submit.__doc__
diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py
index 4363e90..6f3b460 100644
--- a/Lib/test/test_concurrent_futures.py
+++ b/Lib/test/test_concurrent_futures.py
@@ -497,10 +497,16 @@ class ProcessPoolShutdownTest(ExecutorShutdownTest):
lock.acquire()
mp_context = self.get_context()
+ if mp_context.get_start_method(allow_none=False) == "fork":
+ # fork pre-spawns, not on demand.
+ expected_num_processes = self.worker_count
+ else:
+ expected_num_processes = 3
+
sem = mp_context.Semaphore(0)
for _ in range(3):
self.executor.submit(acquire_lock, sem)
- self.assertEqual(len(self.executor._processes), 3)
+ self.assertEqual(len(self.executor._processes), expected_num_processes)
for _ in range(3):
sem.release()
processes = self.executor._processes
@@ -1021,6 +1027,8 @@ class ProcessPoolExecutorTest(ExecutorTest):
def test_idle_process_reuse_one(self):
executor = self.executor
assert executor._max_workers >= 4
+ if self.get_context().get_start_method(allow_none=False) == "fork":
+ raise unittest.SkipTest("Incompatible with the fork start method.")
executor.submit(mul, 21, 2).result()
executor.submit(mul, 6, 7).result()
executor.submit(mul, 3, 14).result()
@@ -1029,6 +1037,8 @@ class ProcessPoolExecutorTest(ExecutorTest):
def test_idle_process_reuse_multiple(self):
executor = self.executor
assert executor._max_workers <= 5
+ if self.get_context().get_start_method(allow_none=False) == "fork":
+ raise unittest.SkipTest("Incompatible with the fork start method.")
executor.submit(mul, 12, 7).result()
executor.submit(mul, 33, 25)
executor.submit(mul, 25, 26).result()
diff --git a/Misc/NEWS.d/next/Library/2022-04-15-22-07-36.gh-issue-90622.0C6l8h.rst b/Misc/NEWS.d/next/Library/2022-04-15-22-07-36.gh-issue-90622.0C6l8h.rst
new file mode 100644
index 0000000..5db0a1b
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2022-04-15-22-07-36.gh-issue-90622.0C6l8h.rst
@@ -0,0 +1,4 @@
+Worker processes for :class:`concurrent.futures.ProcessPoolExecutor` are no
+longer spawned on demand (a feature added in 3.9) when the multiprocessing
+context start method is ``"fork"`` as that can lead to deadlocks in the
+child processes due to a fork happening while threads are running.