summaryrefslogtreecommitdiffstats
path: root/Lib/concurrent/futures
diff options
context:
space:
mode:
authorLogan Jones <loganasherjones@gmail.com>2021-11-20 20:19:41 (GMT)
committerGitHub <noreply@github.com>2021-11-20 20:19:41 (GMT)
commitfdc0e09c3316098b038996c428e88931f0a4fcdb (patch)
tree92967b2752a10aa9d28cd96aab2a9c9a22dd081c /Lib/concurrent/futures
parent123a3527ddd7774e8db325c778927e49172e01d4 (diff)
downloadcpython-fdc0e09c3316098b038996c428e88931f0a4fcdb.zip
cpython-fdc0e09c3316098b038996c428e88931f0a4fcdb.tar.gz
cpython-fdc0e09c3316098b038996c428e88931f0a4fcdb.tar.bz2
bpo-44733: Add max_tasks_per_child to ProcessPoolExecutor (GH-27373)
Co-authored-by: Antoine Pitrou <antoine@python.org>
Diffstat (limited to 'Lib/concurrent/futures')
-rw-r--r--Lib/concurrent/futures/process.py69
1 files changed, 55 insertions, 14 deletions
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
index 9904db7..19e93a6 100644
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -141,10 +141,11 @@ class _WorkItem(object):
self.kwargs = kwargs
class _ResultItem(object):
- def __init__(self, work_id, exception=None, result=None):
+ def __init__(self, work_id, exception=None, result=None, exit_pid=None):
self.work_id = work_id
self.exception = exception
self.result = result
+ self.exit_pid = exit_pid
class _CallItem(object):
def __init__(self, work_id, fn, args, kwargs):
@@ -201,17 +202,19 @@ def _process_chunk(fn, chunk):
return [fn(*args) for args in chunk]
-def _sendback_result(result_queue, work_id, result=None, exception=None):
+def _sendback_result(result_queue, work_id, result=None, exception=None,
+ exit_pid=None):
"""Safely send back the given result or exception"""
try:
result_queue.put(_ResultItem(work_id, result=result,
- exception=exception))
+ exception=exception, exit_pid=exit_pid))
except BaseException as e:
exc = _ExceptionWithTraceback(e, e.__traceback__)
- result_queue.put(_ResultItem(work_id, exception=exc))
+ result_queue.put(_ResultItem(work_id, exception=exc,
+ exit_pid=exit_pid))
-def _process_worker(call_queue, result_queue, initializer, initargs):
+def _process_worker(call_queue, result_queue, initializer, initargs, max_tasks=None):
"""Evaluates calls from call_queue and places the results in result_queue.
This worker is run in a separate process.
@@ -232,25 +235,38 @@ def _process_worker(call_queue, result_queue, initializer, initargs):
# The parent will notice that the process stopped and
# mark the pool broken
return
+ num_tasks = 0
+ exit_pid = None
while True:
call_item = call_queue.get(block=True)
if call_item is None:
# Wake up queue management thread
result_queue.put(os.getpid())
return
+
+ if max_tasks is not None:
+ num_tasks += 1
+ if num_tasks >= max_tasks:
+ exit_pid = os.getpid()
+
try:
r = call_item.fn(*call_item.args, **call_item.kwargs)
except BaseException as e:
exc = _ExceptionWithTraceback(e, e.__traceback__)
- _sendback_result(result_queue, call_item.work_id, exception=exc)
+ _sendback_result(result_queue, call_item.work_id, exception=exc,
+ exit_pid=exit_pid)
else:
- _sendback_result(result_queue, call_item.work_id, result=r)
+ _sendback_result(result_queue, call_item.work_id, result=r,
+ exit_pid=exit_pid)
del r
# Liberate the resource as soon as possible, to avoid holding onto
# open files or shared memory that is not needed anymore
del call_item
+ if exit_pid is not None:
+ return
+
class _ExecutorManagerThread(threading.Thread):
"""Manages the communication between this process and the worker processes.
@@ -301,6 +317,10 @@ class _ExecutorManagerThread(threading.Thread):
# A queue.Queue of work ids e.g. Queue([5, 6, ...]).
self.work_ids_queue = executor._work_ids
+ # Maximum number of tasks a worker process can execute before
+ # exiting safely
+ self.max_tasks_per_child = executor._max_tasks_per_child
+
# A dict mapping work ids to _WorkItems e.g.
# {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
self.pending_work_items = executor._pending_work_items
@@ -320,15 +340,23 @@ class _ExecutorManagerThread(threading.Thread):
return
if result_item is not None:
self.process_result_item(result_item)
+
+ process_exited = result_item.exit_pid is not None
+ if process_exited:
+ p = self.processes.pop(result_item.exit_pid)
+ p.join()
+
# Delete reference to result_item to avoid keeping references
# while waiting on new results.
del result_item
- # attempt to increment idle process count
- executor = self.executor_reference()
- if executor is not None:
- executor._idle_worker_semaphore.release()
- del executor
+ if executor := self.executor_reference():
+ if process_exited:
+ with self.shutdown_lock:
+ executor._adjust_process_count()
+ else:
+ executor._idle_worker_semaphore.release()
+ del executor
if self.is_shutting_down():
self.flag_executor_shutting_down()
@@ -578,7 +606,7 @@ class BrokenProcessPool(_base.BrokenExecutor):
class ProcessPoolExecutor(_base.Executor):
def __init__(self, max_workers=None, mp_context=None,
- initializer=None, initargs=()):
+ initializer=None, initargs=(), *, max_tasks_per_child=None):
"""Initializes a new ProcessPoolExecutor instance.
Args:
@@ -589,6 +617,11 @@ class ProcessPoolExecutor(_base.Executor):
object should provide SimpleQueue, Queue and Process.
initializer: A callable used to initialize worker processes.
initargs: A tuple of arguments to pass to the initializer.
+ max_tasks_per_child: The maximum number of tasks a worker process can
+ complete before it will exit and be replaced with a fresh
+ worker process, to enable unused resources to be freed. The
+ default value is None, which means worker process will live
+ as long as the executor will live.
"""
_check_system_limits()
@@ -616,6 +649,13 @@ class ProcessPoolExecutor(_base.Executor):
self._initializer = initializer
self._initargs = initargs
+ if max_tasks_per_child is not None:
+ if not isinstance(max_tasks_per_child, int):
+ raise TypeError("max_tasks_per_child must be an integer")
+ elif max_tasks_per_child <= 0:
+ raise ValueError("max_tasks_per_child must be >= 1")
+ self._max_tasks_per_child = max_tasks_per_child
+
# Management thread
self._executor_manager_thread = None
@@ -678,7 +718,8 @@ class ProcessPoolExecutor(_base.Executor):
args=(self._call_queue,
self._result_queue,
self._initializer,
- self._initargs))
+ self._initargs,
+ self._max_tasks_per_child))
p.start()
self._processes[p.pid] = p