summaryrefslogtreecommitdiffstats
path: root/Lib/concurrent
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/concurrent')
-rw-r--r--Lib/concurrent/futures/_base.py22
-rw-r--r--Lib/concurrent/futures/process.py5
2 files changed, 16 insertions, 11 deletions
diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py
index 79b91d4..6cfded3 100644
--- a/Lib/concurrent/futures/_base.py
+++ b/Lib/concurrent/futures/_base.py
@@ -536,15 +536,19 @@ class Executor(object):
fs = [self.submit(fn, *args) for args in zip(*iterables)]
- try:
- for future in fs:
- if timeout is None:
- yield future.result()
- else:
- yield future.result(end_time - time.time())
- finally:
- for future in fs:
- future.cancel()
+ # Yield must be hidden in closure so that the futures are submitted
+ # before the first iterator value is required.
+ def result_iterator():
+ try:
+ for future in fs:
+ if timeout is None:
+ yield future.result()
+ else:
+ yield future.result(end_time - time.time())
+ finally:
+ for future in fs:
+ future.cancel()
+ return result_iterator()
def shutdown(self, wait=True):
"""Clean-up the resources associated with the Executor.
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
index ff566d3..f0bf6d5 100644
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -49,6 +49,7 @@ import atexit
from concurrent.futures import _base
import queue
import multiprocessing
+from multiprocessing.queues import SimpleQueue
import threading
import weakref
@@ -204,7 +205,7 @@ def _queue_management_worker(executor_reference,
work_ids_queue,
call_queue)
- result_item = result_queue.get(block=True)
+ result_item = result_queue.get()
if result_item is not None:
work_item = pending_work_items[result_item.work_id]
del pending_work_items[result_item.work_id]
@@ -284,7 +285,7 @@ class ProcessPoolExecutor(_base.Executor):
# because futures in the call queue cannot be cancelled.
self._call_queue = multiprocessing.Queue(self._max_workers +
EXTRA_QUEUED_CALLS)
- self._result_queue = multiprocessing.Queue()
+ self._result_queue = SimpleQueue()
self._work_ids = queue.Queue()
self._queue_management_thread = None
self._processes = set()