diff options
Diffstat (limited to 'Lib/concurrent')
-rw-r--r-- | Lib/concurrent/futures/_base.py | 22 | ||||
-rw-r--r-- | Lib/concurrent/futures/process.py | 5 |
2 files changed, 16 insertions, 11 deletions
diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py index 79b91d4..6cfded3 100644 --- a/Lib/concurrent/futures/_base.py +++ b/Lib/concurrent/futures/_base.py @@ -536,15 +536,19 @@ class Executor(object): fs = [self.submit(fn, *args) for args in zip(*iterables)] - try: - for future in fs: - if timeout is None: - yield future.result() - else: - yield future.result(end_time - time.time()) - finally: - for future in fs: - future.cancel() + # Yield must be hidden in closure so that the futures are submitted + # before the first iterator value is required. + def result_iterator(): + try: + for future in fs: + if timeout is None: + yield future.result() + else: + yield future.result(end_time - time.time()) + finally: + for future in fs: + future.cancel() + return result_iterator() def shutdown(self, wait=True): """Clean-up the resources associated with the Executor. diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index ff566d3..f0bf6d5 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -49,6 +49,7 @@ import atexit from concurrent.futures import _base import queue import multiprocessing +from multiprocessing.queues import SimpleQueue import threading import weakref @@ -204,7 +205,7 @@ def _queue_management_worker(executor_reference, work_ids_queue, call_queue) - result_item = result_queue.get(block=True) + result_item = result_queue.get() if result_item is not None: work_item = pending_work_items[result_item.work_id] del pending_work_items[result_item.work_id] @@ -284,7 +285,7 @@ class ProcessPoolExecutor(_base.Executor): # because futures in the call queue cannot be cancelled. self._call_queue = multiprocessing.Queue(self._max_workers + EXTRA_QUEUED_CALLS) - self._result_queue = multiprocessing.Queue() + self._result_queue = SimpleQueue() self._work_ids = queue.Queue() self._queue_management_thread = None self._processes = set() |