summaryrefslogtreecommitdiffstats
path: root/Lib/concurrent/futures/process.py
diff options
context:
space:
mode:
authorAntoine Pitrou <pitrou@free.fr>2017-11-04 10:05:49 (GMT)
committerGitHub <noreply@github.com>2017-11-04 10:05:49 (GMT)
commit63ff4131af86e8a48cbedb9fbba95bd65ca90061 (patch)
treee6b205d0bc509e1be7d03a1d755f328f650f5ea1 /Lib/concurrent/futures/process.py
parentb838cc3ff4e039af949c6a19bd896e98e944dcbe (diff)
downloadcpython-63ff4131af86e8a48cbedb9fbba95bd65ca90061.zip
cpython-63ff4131af86e8a48cbedb9fbba95bd65ca90061.tar.gz
cpython-63ff4131af86e8a48cbedb9fbba95bd65ca90061.tar.bz2
bpo-21423: Add an initializer argument to {Process,Thread}PoolExecutor (#4241)
* bpo-21423: Add an initializer argument to {Process,Thread}PoolExecutor * Fix docstring
Diffstat (limited to 'Lib/concurrent/futures/process.py')
-rw-r--r--Lib/concurrent/futures/process.py36
1 files changed, 29 insertions, 7 deletions
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
index 67ebbf5..35af65d 100644
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -131,6 +131,7 @@ class _CallItem(object):
self.args = args
self.kwargs = kwargs
+
def _get_chunks(*iterables, chunksize):
""" Iterates over zip()ed iterables in chunks. """
it = zip(*iterables)
@@ -151,7 +152,7 @@ def _process_chunk(fn, chunk):
"""
return [fn(*args) for args in chunk]
-def _process_worker(call_queue, result_queue):
+def _process_worker(call_queue, result_queue, initializer, initargs):
"""Evaluates calls from call_queue and places the results in result_queue.
This worker is run in a separate process.
@@ -161,7 +162,17 @@ def _process_worker(call_queue, result_queue):
evaluated by the worker.
result_queue: A ctx.Queue of _ResultItems that will written
to by the worker.
+ initializer: A callable initializer, or None
+ initargs: A tuple of args for the initializer
"""
+ if initializer is not None:
+ try:
+ initializer(*initargs)
+ except BaseException:
+ _base.LOGGER.critical('Exception in initializer:', exc_info=True)
+ # The parent will notice that the process stopped and
+ # mark the pool broken
+ return
while True:
call_item = call_queue.get(block=True)
if call_item is None:
@@ -277,7 +288,9 @@ def _queue_management_worker(executor_reference,
# Mark the process pool broken so that submits fail right now.
executor = executor_reference()
if executor is not None:
- executor._broken = True
+ executor._broken = ('A child process terminated '
+ 'abruptly, the process pool is not '
+ 'usable anymore')
executor._shutdown_thread = True
executor = None
# All futures in flight must be marked failed
@@ -372,7 +385,7 @@ def _chain_from_iterable_of_lists(iterable):
yield element.pop()
-class BrokenProcessPool(RuntimeError):
+class BrokenProcessPool(_base.BrokenExecutor):
"""
Raised when a process in a ProcessPoolExecutor terminated abruptly
while a future was in the running state.
@@ -380,7 +393,8 @@ class BrokenProcessPool(RuntimeError):
class ProcessPoolExecutor(_base.Executor):
- def __init__(self, max_workers=None, mp_context=None):
+ def __init__(self, max_workers=None, mp_context=None,
+ initializer=None, initargs=()):
"""Initializes a new ProcessPoolExecutor instance.
Args:
@@ -389,6 +403,8 @@ class ProcessPoolExecutor(_base.Executor):
worker processes will be created as the machine has processors.
mp_context: A multiprocessing context to launch the workers. This
object should provide SimpleQueue, Queue and Process.
+ initializer: An callable used to initialize worker processes.
+ initargs: A tuple of arguments to pass to the initializer.
"""
_check_system_limits()
@@ -403,6 +419,11 @@ class ProcessPoolExecutor(_base.Executor):
mp_context = mp.get_context()
self._mp_context = mp_context
+ if initializer is not None and not callable(initializer):
+ raise TypeError("initializer must be a callable")
+ self._initializer = initializer
+ self._initargs = initargs
+
# Make the call queue slightly larger than the number of processes to
# prevent the worker processes from idling. But don't make it too big
# because futures in the call queue cannot be cancelled.
@@ -450,15 +471,16 @@ class ProcessPoolExecutor(_base.Executor):
p = self._mp_context.Process(
target=_process_worker,
args=(self._call_queue,
- self._result_queue))
+ self._result_queue,
+ self._initializer,
+ self._initargs))
p.start()
self._processes[p.pid] = p
def submit(self, fn, *args, **kwargs):
with self._shutdown_lock:
if self._broken:
- raise BrokenProcessPool('A child process terminated '
- 'abruptly, the process pool is not usable anymore')
+ raise BrokenProcessPool(self._broken)
if self._shutdown_thread:
raise RuntimeError('cannot schedule new futures after shutdown')