diff options
author | Antoine Pitrou <pitrou@free.fr> | 2017-11-04 10:05:49 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-11-04 10:05:49 (GMT) |
commit | 63ff4131af86e8a48cbedb9fbba95bd65ca90061 (patch) | |
tree | e6b205d0bc509e1be7d03a1d755f328f650f5ea1 /Lib/concurrent/futures/process.py | |
parent | b838cc3ff4e039af949c6a19bd896e98e944dcbe (diff) | |
download | cpython-63ff4131af86e8a48cbedb9fbba95bd65ca90061.zip cpython-63ff4131af86e8a48cbedb9fbba95bd65ca90061.tar.gz cpython-63ff4131af86e8a48cbedb9fbba95bd65ca90061.tar.bz2 |
bpo-21423: Add an initializer argument to {Process,Thread}PoolExecutor (#4241)
* bpo-21423: Add an initializer argument to {Process,Thread}PoolExecutor
* Fix docstring
Diffstat (limited to 'Lib/concurrent/futures/process.py')
-rw-r--r-- | Lib/concurrent/futures/process.py | 36 |
1 files changed, 29 insertions, 7 deletions
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 67ebbf5..35af65d 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -131,6 +131,7 @@ class _CallItem(object): self.args = args self.kwargs = kwargs + def _get_chunks(*iterables, chunksize): """ Iterates over zip()ed iterables in chunks. """ it = zip(*iterables) @@ -151,7 +152,7 @@ def _process_chunk(fn, chunk): """ return [fn(*args) for args in chunk] -def _process_worker(call_queue, result_queue): +def _process_worker(call_queue, result_queue, initializer, initargs): """Evaluates calls from call_queue and places the results in result_queue. This worker is run in a separate process. @@ -161,7 +162,17 @@ def _process_worker(call_queue, result_queue): evaluated by the worker. result_queue: A ctx.Queue of _ResultItems that will written to by the worker. + initializer: A callable initializer, or None + initargs: A tuple of args for the initializer """ + if initializer is not None: + try: + initializer(*initargs) + except BaseException: + _base.LOGGER.critical('Exception in initializer:', exc_info=True) + # The parent will notice that the process stopped and + # mark the pool broken + return while True: call_item = call_queue.get(block=True) if call_item is None: @@ -277,7 +288,9 @@ def _queue_management_worker(executor_reference, # Mark the process pool broken so that submits fail right now. executor = executor_reference() if executor is not None: - executor._broken = True + executor._broken = ('A child process terminated ' + 'abruptly, the process pool is not ' + 'usable anymore') executor._shutdown_thread = True executor = None # All futures in flight must be marked failed @@ -372,7 +385,7 @@ def _chain_from_iterable_of_lists(iterable): yield element.pop() -class BrokenProcessPool(RuntimeError): +class BrokenProcessPool(_base.BrokenExecutor): """ Raised when a process in a ProcessPoolExecutor terminated abruptly while a future was in the running state. @@ -380,7 +393,8 @@ class BrokenProcessPool(RuntimeError): class ProcessPoolExecutor(_base.Executor): - def __init__(self, max_workers=None, mp_context=None): + def __init__(self, max_workers=None, mp_context=None, + initializer=None, initargs=()): """Initializes a new ProcessPoolExecutor instance. Args: @@ -389,6 +403,8 @@ class ProcessPoolExecutor(_base.Executor): worker processes will be created as the machine has processors. mp_context: A multiprocessing context to launch the workers. This object should provide SimpleQueue, Queue and Process. + initializer: An callable used to initialize worker processes. + initargs: A tuple of arguments to pass to the initializer. """ _check_system_limits() @@ -403,6 +419,11 @@ class ProcessPoolExecutor(_base.Executor): mp_context = mp.get_context() self._mp_context = mp_context + if initializer is not None and not callable(initializer): + raise TypeError("initializer must be a callable") + self._initializer = initializer + self._initargs = initargs + # Make the call queue slightly larger than the number of processes to # prevent the worker processes from idling. But don't make it too big # because futures in the call queue cannot be cancelled. @@ -450,15 +471,16 @@ class ProcessPoolExecutor(_base.Executor): p = self._mp_context.Process( target=_process_worker, args=(self._call_queue, - self._result_queue)) + self._result_queue, + self._initializer, + self._initargs)) p.start() self._processes[p.pid] = p def submit(self, fn, *args, **kwargs): with self._shutdown_lock: if self._broken: - raise BrokenProcessPool('A child process terminated ' - 'abruptly, the process pool is not usable anymore') + raise BrokenProcessPool(self._broken) if self._shutdown_thread: raise RuntimeError('cannot schedule new futures after shutdown') |