diff options
author | Logan Jones <loganasherjones@gmail.com> | 2021-11-20 20:19:41 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-11-20 20:19:41 (GMT) |
commit | fdc0e09c3316098b038996c428e88931f0a4fcdb (patch) | |
tree | 92967b2752a10aa9d28cd96aab2a9c9a22dd081c /Lib/concurrent/futures | |
parent | 123a3527ddd7774e8db325c778927e49172e01d4 (diff) | |
download | cpython-fdc0e09c3316098b038996c428e88931f0a4fcdb.zip cpython-fdc0e09c3316098b038996c428e88931f0a4fcdb.tar.gz cpython-fdc0e09c3316098b038996c428e88931f0a4fcdb.tar.bz2 |
bpo-44733: Add max_tasks_per_child to ProcessPoolExecutor (GH-27373)
Co-authored-by: Antoine Pitrou <antoine@python.org>
Diffstat (limited to 'Lib/concurrent/futures')
-rw-r--r-- | Lib/concurrent/futures/process.py | 69 |
1 files changed, 55 insertions, 14 deletions
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 9904db7..19e93a6 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -141,10 +141,11 @@ class _WorkItem(object): self.kwargs = kwargs class _ResultItem(object): - def __init__(self, work_id, exception=None, result=None): + def __init__(self, work_id, exception=None, result=None, exit_pid=None): self.work_id = work_id self.exception = exception self.result = result + self.exit_pid = exit_pid class _CallItem(object): def __init__(self, work_id, fn, args, kwargs): @@ -201,17 +202,19 @@ def _process_chunk(fn, chunk): return [fn(*args) for args in chunk] -def _sendback_result(result_queue, work_id, result=None, exception=None): +def _sendback_result(result_queue, work_id, result=None, exception=None, + exit_pid=None): """Safely send back the given result or exception""" try: result_queue.put(_ResultItem(work_id, result=result, - exception=exception)) + exception=exception, exit_pid=exit_pid)) except BaseException as e: exc = _ExceptionWithTraceback(e, e.__traceback__) - result_queue.put(_ResultItem(work_id, exception=exc)) + result_queue.put(_ResultItem(work_id, exception=exc, + exit_pid=exit_pid)) -def _process_worker(call_queue, result_queue, initializer, initargs): +def _process_worker(call_queue, result_queue, initializer, initargs, max_tasks=None): """Evaluates calls from call_queue and places the results in result_queue. This worker is run in a separate process. @@ -232,25 +235,38 @@ def _process_worker(call_queue, result_queue, initializer, initargs): # The parent will notice that the process stopped and # mark the pool broken return + num_tasks = 0 + exit_pid = None while True: call_item = call_queue.get(block=True) if call_item is None: # Wake up queue management thread result_queue.put(os.getpid()) return + + if max_tasks is not None: + num_tasks += 1 + if num_tasks >= max_tasks: + exit_pid = os.getpid() + try: r = call_item.fn(*call_item.args, **call_item.kwargs) except BaseException as e: exc = _ExceptionWithTraceback(e, e.__traceback__) - _sendback_result(result_queue, call_item.work_id, exception=exc) + _sendback_result(result_queue, call_item.work_id, exception=exc, + exit_pid=exit_pid) else: - _sendback_result(result_queue, call_item.work_id, result=r) + _sendback_result(result_queue, call_item.work_id, result=r, + exit_pid=exit_pid) del r # Liberate the resource as soon as possible, to avoid holding onto # open files or shared memory that is not needed anymore del call_item + if exit_pid is not None: + return + class _ExecutorManagerThread(threading.Thread): """Manages the communication between this process and the worker processes. @@ -301,6 +317,10 @@ class _ExecutorManagerThread(threading.Thread): # A queue.Queue of work ids e.g. Queue([5, 6, ...]). self.work_ids_queue = executor._work_ids + # Maximum number of tasks a worker process can execute before + # exiting safely + self.max_tasks_per_child = executor._max_tasks_per_child + # A dict mapping work ids to _WorkItems e.g. # {5: <_WorkItem...>, 6: <_WorkItem...>, ...} self.pending_work_items = executor._pending_work_items @@ -320,15 +340,23 @@ class _ExecutorManagerThread(threading.Thread): return if result_item is not None: self.process_result_item(result_item) + + process_exited = result_item.exit_pid is not None + if process_exited: + p = self.processes.pop(result_item.exit_pid) + p.join() + # Delete reference to result_item to avoid keeping references # while waiting on new results. del result_item - # attempt to increment idle process count - executor = self.executor_reference() - if executor is not None: - executor._idle_worker_semaphore.release() - del executor + if executor := self.executor_reference(): + if process_exited: + with self.shutdown_lock: + executor._adjust_process_count() + else: + executor._idle_worker_semaphore.release() + del executor if self.is_shutting_down(): self.flag_executor_shutting_down() @@ -578,7 +606,7 @@ class BrokenProcessPool(_base.BrokenExecutor): class ProcessPoolExecutor(_base.Executor): def __init__(self, max_workers=None, mp_context=None, - initializer=None, initargs=()): + initializer=None, initargs=(), *, max_tasks_per_child=None): """Initializes a new ProcessPoolExecutor instance. Args: @@ -589,6 +617,11 @@ class ProcessPoolExecutor(_base.Executor): object should provide SimpleQueue, Queue and Process. initializer: A callable used to initialize worker processes. initargs: A tuple of arguments to pass to the initializer. + max_tasks_per_child: The maximum number of tasks a worker process can + complete before it will exit and be replaced with a fresh + worker process, to enable unused resources to be freed. The + default value is None, which means worker process will live + as long as the executor will live. """ _check_system_limits() @@ -616,6 +649,13 @@ class ProcessPoolExecutor(_base.Executor): self._initializer = initializer self._initargs = initargs + if max_tasks_per_child is not None: + if not isinstance(max_tasks_per_child, int): + raise TypeError("max_tasks_per_child must be an integer") + elif max_tasks_per_child <= 0: + raise ValueError("max_tasks_per_child must be >= 1") + self._max_tasks_per_child = max_tasks_per_child + # Management thread self._executor_manager_thread = None @@ -678,7 +718,8 @@ class ProcessPoolExecutor(_base.Executor): args=(self._call_queue, self._result_queue, self._initializer, - self._initargs)) + self._initargs, + self._max_tasks_per_child)) p.start() self._processes[p.pid] = p |