diff options
author | Antoine Pitrou <solipsis@pitrou.net> | 2014-10-04 18:20:10 (GMT) |
---|---|---|
committer | Antoine Pitrou <solipsis@pitrou.net> | 2014-10-04 18:20:10 (GMT) |
commit | 4aae276eca9a9213ad45158d30ae2f15843dd463 (patch) | |
tree | e3ec365b19f7eb88212595c3ea47f3362bfe75e3 /Lib/concurrent/futures | |
parent | e4f47088af0040d73449d0cb0fe1e6d863f3ad07 (diff) | |
download | cpython-4aae276eca9a9213ad45158d30ae2f15843dd463.zip cpython-4aae276eca9a9213ad45158d30ae2f15843dd463.tar.gz cpython-4aae276eca9a9213ad45158d30ae2f15843dd463.tar.bz2 |
Issue #11271: concurrent.futures.Executor.map() now takes a *chunksize*
argument to allow batching of tasks in child processes and improve
performance of ProcessPoolExecutor. Patch by Dan O'Reilly.
Diffstat (limited to 'Lib/concurrent/futures')
-rw-r--r-- | Lib/concurrent/futures/_base.py | 6 | ||||
-rw-r--r-- | Lib/concurrent/futures/process.py | 51 |
2 files changed, 56 insertions, 1 deletions
diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py index c13b3b6..9e44713 100644 --- a/Lib/concurrent/futures/_base.py +++ b/Lib/concurrent/futures/_base.py @@ -520,7 +520,7 @@ class Executor(object): """ raise NotImplementedError() - def map(self, fn, *iterables, timeout=None): + def map(self, fn, *iterables, timeout=None, chunksize=1): """Returns a iterator equivalent to map(fn, iter). Args: @@ -528,6 +528,10 @@ class Executor(object): passed iterables. timeout: The maximum number of seconds to wait. If None, then there is no limit on the wait time. + chunksize: The size of the chunks the iterable will be broken into + before being passed to a child process. This argument is only + used by ProcessPoolExecutor; it is ignored by + ThreadPoolExecutor. Returns: An iterator equivalent to: map(func, *iterables) but the calls may diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 1299390..fc64dbe 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -55,6 +55,8 @@ from multiprocessing import SimpleQueue from multiprocessing.connection import wait import threading import weakref +from functools import partial +import itertools # Workers are created as daemon threads and processes. This is done to allow the # interpreter to exit when there are still idle processes in a @@ -108,6 +110,26 @@ class _CallItem(object): self.args = args self.kwargs = kwargs +def _get_chunks(*iterables, chunksize): + """ Iterates over zip()ed iterables in chunks. """ + it = zip(*iterables) + while True: + chunk = tuple(itertools.islice(it, chunksize)) + if not chunk: + return + yield chunk + +def _process_chunk(fn, chunk): + """ Processes a chunk of an iterable passed to map. + + Runs the function passed to map() on a chunk of the + iterable passed to map. + + This function is run in a separate process. + + """ + return [fn(*args) for args in chunk] + def _process_worker(call_queue, result_queue): """Evaluates calls from call_queue and places the results in result_queue. @@ -411,6 +433,35 @@ class ProcessPoolExecutor(_base.Executor): return f submit.__doc__ = _base.Executor.submit.__doc__ + def map(self, fn, *iterables, timeout=None, chunksize=1): + """Returns a iterator equivalent to map(fn, iter). + + Args: + fn: A callable that will take as many arguments as there are + passed iterables. + timeout: The maximum number of seconds to wait. If None, then there + is no limit on the wait time. + chunksize: If greater than one, the iterables will be chopped into + chunks of size chunksize and submitted to the process pool. + If set to one, the items in the list will be sent one at a time. + + Returns: + An iterator equivalent to: map(func, *iterables) but the calls may + be evaluated out-of-order. + + Raises: + TimeoutError: If the entire result iterator could not be generated + before the given timeout. + Exception: If fn(*args) raises for any values. + """ + if chunksize < 1: + raise ValueError("chunksize must be >= 1.") + + results = super().map(partial(_process_chunk, fn), + _get_chunks(*iterables, chunksize=chunksize), + timeout=timeout) + return itertools.chain.from_iterable(results) + def shutdown(self, wait=True): with self._shutdown_lock: self._shutdown_thread = True |