diff options
author | Gregory P. Smith <greg@krypto.org> | 2022-05-06 07:04:53 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-05-06 07:04:53 (GMT) |
commit | fa4f0a134e7911b2494ea9866c8a49ff446f9d6c (patch) | |
tree | 4890fee63ed702f2b7d05f554d56c8757f6c0c4e | |
parent | 2b563f1ad31af0bb0a9947e6f1f76e58dbf170f0 (diff) | |
download | cpython-fa4f0a134e7911b2494ea9866c8a49ff446f9d6c.zip cpython-fa4f0a134e7911b2494ea9866c8a49ff446f9d6c.tar.gz cpython-fa4f0a134e7911b2494ea9866c8a49ff446f9d6c.tar.bz2 |
gh-90622: Prevent max_tasks_per_child use with a fork mp_context. (#91587)
Prevent `max_tasks_per_child` use with a "fork" mp_context to avoid deadlocks.
Also defaults to "spawn" when no mp_context is supplied for safe convenience.
-rw-r--r-- | Doc/library/concurrent.futures.rst | 7 | ||||
-rw-r--r-- | Lib/concurrent/futures/process.py | 24 | ||||
-rw-r--r-- | Lib/test/test_concurrent_futures.py | 18 | ||||
-rw-r--r-- | Misc/NEWS.d/next/Library/2022-04-15-18-32-38.gh-issue-90622.WQjFDe.rst | 5 |
4 files changed, 43 insertions, 11 deletions
diff --git a/Doc/library/concurrent.futures.rst b/Doc/library/concurrent.futures.rst index 9592808..99703ff 100644 --- a/Doc/library/concurrent.futures.rst +++ b/Doc/library/concurrent.futures.rst @@ -254,8 +254,11 @@ to a :class:`ProcessPoolExecutor` will result in deadlock. *max_tasks_per_child* is an optional argument that specifies the maximum number of tasks a single process can execute before it will exit and be - replaced with a fresh worker process. The default *max_tasks_per_child* is - ``None`` which means worker processes will live as long as the pool. + replaced with a fresh worker process. By default *max_tasks_per_child* is + ``None`` which means worker processes will live as long as the pool. When + a max is specified, the "spawn" multiprocessing start method will be used by + default in absense of a *mp_context* parameter. This feature is incompatible + with the "fork" start method. .. versionchanged:: 3.3 When one of the worker processes terminates abruptly, a diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 0d49379c..821034d 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -617,14 +617,16 @@ class ProcessPoolExecutor(_base.Executor): execute the given calls. If None or not given then as many worker processes will be created as the machine has processors. mp_context: A multiprocessing context to launch the workers. This - object should provide SimpleQueue, Queue and Process. + object should provide SimpleQueue, Queue and Process. Useful + to allow specific multiprocessing start methods. initializer: A callable used to initialize worker processes. initargs: A tuple of arguments to pass to the initializer. - max_tasks_per_child: The maximum number of tasks a worker process can - complete before it will exit and be replaced with a fresh - worker process, to enable unused resources to be freed. The - default value is None, which means worker process will live - as long as the executor will live. + max_tasks_per_child: The maximum number of tasks a worker process + can complete before it will exit and be replaced with a fresh + worker process. The default of None means worker process will + live as long as the executor. Requires a non-'fork' mp_context + start method. When given, we default to using 'spawn' if no + mp_context is supplied. """ _check_system_limits() @@ -644,7 +646,10 @@ class ProcessPoolExecutor(_base.Executor): self._max_workers = max_workers if mp_context is None: - mp_context = mp.get_context() + if max_tasks_per_child is not None: + mp_context = mp.get_context("spawn") + else: + mp_context = mp.get_context() self._mp_context = mp_context if initializer is not None and not callable(initializer): @@ -657,6 +662,11 @@ class ProcessPoolExecutor(_base.Executor): raise TypeError("max_tasks_per_child must be an integer") elif max_tasks_per_child <= 0: raise ValueError("max_tasks_per_child must be >= 1") + if self._mp_context.get_start_method(allow_none=False) == "fork": + # https://github.com/python/cpython/issues/90622 + raise ValueError("max_tasks_per_child is incompatible with" + " the 'fork' multiprocessing start method;" + " supply a different mp_context.") self._max_tasks_per_child = max_tasks_per_child # Management thread diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index 978a748..4363e90 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -1039,10 +1039,15 @@ class ProcessPoolExecutorTest(ExecutorTest): executor.shutdown() def test_max_tasks_per_child(self): + context = self.get_context() + if context.get_start_method(allow_none=False) == "fork": + with self.assertRaises(ValueError): + self.executor_type(1, mp_context=context, max_tasks_per_child=3) + return # not using self.executor as we need to control construction. # arguably this could go in another class w/o that mixin. executor = self.executor_type( - 1, mp_context=self.get_context(), max_tasks_per_child=3) + 1, mp_context=context, max_tasks_per_child=3) f1 = executor.submit(os.getpid) original_pid = f1.result() # The worker pid remains the same as the worker could be reused @@ -1061,11 +1066,20 @@ class ProcessPoolExecutorTest(ExecutorTest): executor.shutdown() + def test_max_tasks_per_child_defaults_to_spawn_context(self): + # not using self.executor as we need to control construction. + # arguably this could go in another class w/o that mixin. + executor = self.executor_type(1, max_tasks_per_child=3) + self.assertEqual(executor._mp_context.get_start_method(), "spawn") + def test_max_tasks_early_shutdown(self): + context = self.get_context() + if context.get_start_method(allow_none=False) == "fork": + raise unittest.SkipTest("Incompatible with the fork start method.") # not using self.executor as we need to control construction. # arguably this could go in another class w/o that mixin. executor = self.executor_type( - 3, mp_context=self.get_context(), max_tasks_per_child=1) + 3, mp_context=context, max_tasks_per_child=1) futures = [] for i in range(6): futures.append(executor.submit(mul, i, i)) diff --git a/Misc/NEWS.d/next/Library/2022-04-15-18-32-38.gh-issue-90622.WQjFDe.rst b/Misc/NEWS.d/next/Library/2022-04-15-18-32-38.gh-issue-90622.WQjFDe.rst new file mode 100644 index 0000000..4144e4c --- /dev/null +++ b/Misc/NEWS.d/next/Library/2022-04-15-18-32-38.gh-issue-90622.WQjFDe.rst @@ -0,0 +1,5 @@ +In ``concurrent.futures.process.ProcessPoolExecutor`` disallow the "fork" +multiprocessing start method when the new ``max_tasks_per_child`` feature is +used as the mix of threads+fork can hang the child processes. Default to +using the safe "spawn" start method in that circumstance if no +``mp_context`` was supplied. |