diff options
author | Logan Jones <loganasherjones@gmail.com> | 2021-11-20 20:19:41 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-11-20 20:19:41 (GMT) |
commit | fdc0e09c3316098b038996c428e88931f0a4fcdb (patch) | |
tree | 92967b2752a10aa9d28cd96aab2a9c9a22dd081c | |
parent | 123a3527ddd7774e8db325c778927e49172e01d4 (diff) | |
download | cpython-fdc0e09c3316098b038996c428e88931f0a4fcdb.zip cpython-fdc0e09c3316098b038996c428e88931f0a4fcdb.tar.gz cpython-fdc0e09c3316098b038996c428e88931f0a4fcdb.tar.bz2 |
bpo-44733: Add max_tasks_per_child to ProcessPoolExecutor (GH-27373)
Co-authored-by: Antoine Pitrou <antoine@python.org>
-rw-r--r-- | Doc/library/concurrent.futures.rst | 11 | ||||
-rw-r--r-- | Lib/concurrent/futures/process.py | 69 | ||||
-rw-r--r-- | Lib/test/test_concurrent_futures.py | 31 | ||||
-rw-r--r-- | Misc/NEWS.d/next/Library/2021-07-26-13-33-37.bpo-44733.88LrP1.rst | 3 |
4 files changed, 98 insertions, 16 deletions
diff --git a/Doc/library/concurrent.futures.rst b/Doc/library/concurrent.futures.rst index 897efc2..b4213b4 100644 --- a/Doc/library/concurrent.futures.rst +++ b/Doc/library/concurrent.futures.rst @@ -231,7 +231,7 @@ that :class:`ProcessPoolExecutor` will not work in the interactive interpreter. Calling :class:`Executor` or :class:`Future` methods from a callable submitted to a :class:`ProcessPoolExecutor` will result in deadlock. -.. class:: ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=()) +.. class:: ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=(), max_tasks_per_child=None) An :class:`Executor` subclass that executes calls asynchronously using a pool of at most *max_workers* processes. If *max_workers* is ``None`` or not @@ -252,6 +252,11 @@ to a :class:`ProcessPoolExecutor` will result in deadlock. pending jobs will raise a :exc:`~concurrent.futures.process.BrokenProcessPool`, as well as any attempt to submit more jobs to the pool. + *max_tasks_per_child* is an optional argument that specifies the maximum + number of tasks a single process can execute before it will exit and be + replaced with a fresh worker process. The default *max_tasks_per_child* is + ``None`` which means worker processes will live as long as the pool. + .. versionchanged:: 3.3 When one of the worker processes terminates abruptly, a :exc:`BrokenProcessPool` error is now raised. Previously, behaviour @@ -264,6 +269,10 @@ to a :class:`ProcessPoolExecutor` will result in deadlock. Added the *initializer* and *initargs* arguments. + .. versionchanged:: 3.11 + The *max_tasks_per_child* argument was added to allow users to + control the lifetime of workers in the pool. + .. _processpoolexecutor-example: diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 9904db7..19e93a6 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -141,10 +141,11 @@ class _WorkItem(object): self.kwargs = kwargs class _ResultItem(object): - def __init__(self, work_id, exception=None, result=None): + def __init__(self, work_id, exception=None, result=None, exit_pid=None): self.work_id = work_id self.exception = exception self.result = result + self.exit_pid = exit_pid class _CallItem(object): def __init__(self, work_id, fn, args, kwargs): @@ -201,17 +202,19 @@ def _process_chunk(fn, chunk): return [fn(*args) for args in chunk] -def _sendback_result(result_queue, work_id, result=None, exception=None): +def _sendback_result(result_queue, work_id, result=None, exception=None, + exit_pid=None): """Safely send back the given result or exception""" try: result_queue.put(_ResultItem(work_id, result=result, - exception=exception)) + exception=exception, exit_pid=exit_pid)) except BaseException as e: exc = _ExceptionWithTraceback(e, e.__traceback__) - result_queue.put(_ResultItem(work_id, exception=exc)) + result_queue.put(_ResultItem(work_id, exception=exc, + exit_pid=exit_pid)) -def _process_worker(call_queue, result_queue, initializer, initargs): +def _process_worker(call_queue, result_queue, initializer, initargs, max_tasks=None): """Evaluates calls from call_queue and places the results in result_queue. This worker is run in a separate process. @@ -232,25 +235,38 @@ def _process_worker(call_queue, result_queue, initializer, initargs): # The parent will notice that the process stopped and # mark the pool broken return + num_tasks = 0 + exit_pid = None while True: call_item = call_queue.get(block=True) if call_item is None: # Wake up queue management thread result_queue.put(os.getpid()) return + + if max_tasks is not None: + num_tasks += 1 + if num_tasks >= max_tasks: + exit_pid = os.getpid() + try: r = call_item.fn(*call_item.args, **call_item.kwargs) except BaseException as e: exc = _ExceptionWithTraceback(e, e.__traceback__) - _sendback_result(result_queue, call_item.work_id, exception=exc) + _sendback_result(result_queue, call_item.work_id, exception=exc, + exit_pid=exit_pid) else: - _sendback_result(result_queue, call_item.work_id, result=r) + _sendback_result(result_queue, call_item.work_id, result=r, + exit_pid=exit_pid) del r # Liberate the resource as soon as possible, to avoid holding onto # open files or shared memory that is not needed anymore del call_item + if exit_pid is not None: + return + class _ExecutorManagerThread(threading.Thread): """Manages the communication between this process and the worker processes. @@ -301,6 +317,10 @@ class _ExecutorManagerThread(threading.Thread): # A queue.Queue of work ids e.g. Queue([5, 6, ...]). self.work_ids_queue = executor._work_ids + # Maximum number of tasks a worker process can execute before + # exiting safely + self.max_tasks_per_child = executor._max_tasks_per_child + # A dict mapping work ids to _WorkItems e.g. # {5: <_WorkItem...>, 6: <_WorkItem...>, ...} self.pending_work_items = executor._pending_work_items @@ -320,15 +340,23 @@ class _ExecutorManagerThread(threading.Thread): return if result_item is not None: self.process_result_item(result_item) + + process_exited = result_item.exit_pid is not None + if process_exited: + p = self.processes.pop(result_item.exit_pid) + p.join() + # Delete reference to result_item to avoid keeping references # while waiting on new results. del result_item - # attempt to increment idle process count - executor = self.executor_reference() - if executor is not None: - executor._idle_worker_semaphore.release() - del executor + if executor := self.executor_reference(): + if process_exited: + with self.shutdown_lock: + executor._adjust_process_count() + else: + executor._idle_worker_semaphore.release() + del executor if self.is_shutting_down(): self.flag_executor_shutting_down() @@ -578,7 +606,7 @@ class BrokenProcessPool(_base.BrokenExecutor): class ProcessPoolExecutor(_base.Executor): def __init__(self, max_workers=None, mp_context=None, - initializer=None, initargs=()): + initializer=None, initargs=(), *, max_tasks_per_child=None): """Initializes a new ProcessPoolExecutor instance. Args: @@ -589,6 +617,11 @@ class ProcessPoolExecutor(_base.Executor): object should provide SimpleQueue, Queue and Process. initializer: A callable used to initialize worker processes. initargs: A tuple of arguments to pass to the initializer. + max_tasks_per_child: The maximum number of tasks a worker process can + complete before it will exit and be replaced with a fresh + worker process, to enable unused resources to be freed. The + default value is None, which means worker process will live + as long as the executor will live. """ _check_system_limits() @@ -616,6 +649,13 @@ class ProcessPoolExecutor(_base.Executor): self._initializer = initializer self._initargs = initargs + if max_tasks_per_child is not None: + if not isinstance(max_tasks_per_child, int): + raise TypeError("max_tasks_per_child must be an integer") + elif max_tasks_per_child <= 0: + raise ValueError("max_tasks_per_child must be >= 1") + self._max_tasks_per_child = max_tasks_per_child + # Management thread self._executor_manager_thread = None @@ -678,7 +718,8 @@ class ProcessPoolExecutor(_base.Executor): args=(self._call_queue, self._result_queue, self._initializer, - self._initargs)) + self._initargs, + self._max_tasks_per_child)) p.start() self._processes[p.pid] = p diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index 84209ca..bbb6aa1 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -49,7 +49,6 @@ SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42) INITIALIZER_STATUS = 'uninitialized' - def mul(x, y): return x * y @@ -1038,6 +1037,36 @@ class ProcessPoolExecutorTest(ExecutorTest): self.assertLessEqual(len(executor._processes), 2) executor.shutdown() + def test_max_tasks_per_child(self): + executor = self.executor_type(1, max_tasks_per_child=3) + f1 = executor.submit(os.getpid) + original_pid = f1.result() + # The worker pid remains the same as the worker could be reused + f2 = executor.submit(os.getpid) + self.assertEqual(f2.result(), original_pid) + self.assertEqual(len(executor._processes), 1) + f3 = executor.submit(os.getpid) + self.assertEqual(f3.result(), original_pid) + + # A new worker is spawned, with a statistically different pid, + # while the previous was reaped. + f4 = executor.submit(os.getpid) + new_pid = f4.result() + self.assertNotEqual(original_pid, new_pid) + self.assertEqual(len(executor._processes), 1) + + executor.shutdown() + + def test_max_tasks_early_shutdown(self): + executor = self.executor_type(3, max_tasks_per_child=1) + futures = [] + for i in range(6): + futures.append(executor.submit(mul, i, i)) + executor.shutdown() + for i, future in enumerate(futures): + self.assertEqual(future.result(), mul(i, i)) + + create_executor_tests(ProcessPoolExecutorTest, executor_mixins=(ProcessPoolForkMixin, ProcessPoolForkserverMixin, diff --git a/Misc/NEWS.d/next/Library/2021-07-26-13-33-37.bpo-44733.88LrP1.rst b/Misc/NEWS.d/next/Library/2021-07-26-13-33-37.bpo-44733.88LrP1.rst new file mode 100644 index 0000000..666b5f7 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2021-07-26-13-33-37.bpo-44733.88LrP1.rst @@ -0,0 +1,3 @@ +Add ``max_tasks_per_child`` to :class:`concurrent.futures.ProcessPoolExecutor`. +This allows users to specify the maximum number of tasks a single process +should execute before the process needs to be restarted. |