summaryrefslogtreecommitdiffstats
path: root/Lib/concurrent/futures
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/concurrent/futures')
-rw-r--r--Lib/concurrent/futures/process.py69
-rw-r--r--Lib/concurrent/futures/thread.py12
2 files changed, 29 insertions, 52 deletions
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
index a899d5f..36cd411 100644
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -104,7 +104,7 @@ class _CallItem(object):
self.args = args
self.kwargs = kwargs
-def _process_worker(call_queue, result_queue, shutdown):
+def _process_worker(call_queue, result_queue):
"""Evaluates calls from call_queue and places the results in result_queue.
This worker is run in a separate process.
@@ -118,24 +118,19 @@ def _process_worker(call_queue, result_queue, shutdown):
worker that it should exit when call_queue is empty.
"""
while True:
+ call_item = call_queue.get(block=True)
+ if call_item is None:
+ # Wake up queue management thread
+ result_queue.put(None)
+ return
try:
- call_item = call_queue.get(block=True)
- except queue.Empty:
- if shutdown.is_set():
- return
+ r = call_item.fn(*call_item.args, **call_item.kwargs)
+ except BaseException as e:
+ result_queue.put(_ResultItem(call_item.work_id,
+ exception=e))
else:
- if call_item is None:
- # Wake up queue management thread
- result_queue.put(None)
- return
- try:
- r = call_item.fn(*call_item.args, **call_item.kwargs)
- except BaseException as e:
- result_queue.put(_ResultItem(call_item.work_id,
- exception=e))
- else:
- result_queue.put(_ResultItem(call_item.work_id,
- result=r))
+ result_queue.put(_ResultItem(call_item.work_id,
+ result=r))
def _add_call_item_to_queue(pending_work_items,
work_ids,
@@ -179,8 +174,7 @@ def _queue_manangement_worker(executor_reference,
pending_work_items,
work_ids_queue,
call_queue,
- result_queue,
- shutdown_process_event):
+ result_queue):
"""Manages the communication between this process and the worker processes.
This function is run in a local thread.
@@ -198,9 +192,6 @@ def _queue_manangement_worker(executor_reference,
derived from _WorkItems for processing by the process workers.
result_queue: A multiprocessing.Queue of _ResultItems generated by the
process workers.
- shutdown_process_event: A multiprocessing.Event used to signal the
- process workers that they should exit when their work queue is
- empty.
"""
nb_shutdown_processes = 0
def shutdown_one_process():
@@ -213,20 +204,16 @@ def _queue_manangement_worker(executor_reference,
work_ids_queue,
call_queue)
- try:
- result_item = result_queue.get(block=True)
- except queue.Empty:
- pass
- else:
- if result_item is not None:
- work_item = pending_work_items[result_item.work_id]
- del pending_work_items[result_item.work_id]
-
- if result_item.exception:
- work_item.future.set_exception(result_item.exception)
- else:
- work_item.future.set_result(result_item.result)
- continue
+ result_item = result_queue.get(block=True)
+ if result_item is not None:
+ work_item = pending_work_items[result_item.work_id]
+ del pending_work_items[result_item.work_id]
+
+ if result_item.exception:
+ work_item.future.set_exception(result_item.exception)
+ else:
+ work_item.future.set_result(result_item.result)
+ continue
# If we come here, we either got a timeout or were explicitly woken up.
# In either case, check whether we should start shutting down.
executor = executor_reference()
@@ -238,8 +225,6 @@ def _queue_manangement_worker(executor_reference,
# Since no new work items can be added, it is safe to shutdown
# this thread if there are no pending work items.
if not pending_work_items:
- shutdown_process_event.set()
-
while nb_shutdown_processes < len(processes):
shutdown_one_process()
# If .join() is not called on the created processes then
@@ -306,7 +291,6 @@ class ProcessPoolExecutor(_base.Executor):
# Shutdown is a two-step process.
self._shutdown_thread = False
- self._shutdown_process_event = multiprocessing.Event()
self._shutdown_lock = threading.Lock()
self._queue_count = 0
self._pending_work_items = {}
@@ -324,8 +308,7 @@ class ProcessPoolExecutor(_base.Executor):
self._pending_work_items,
self._work_ids,
self._call_queue,
- self._result_queue,
- self._shutdown_process_event))
+ self._result_queue))
self._queue_management_thread.daemon = True
self._queue_management_thread.start()
_threads_queues[self._queue_management_thread] = self._result_queue
@@ -335,8 +318,7 @@ class ProcessPoolExecutor(_base.Executor):
p = multiprocessing.Process(
target=_process_worker,
args=(self._call_queue,
- self._result_queue,
- self._shutdown_process_event))
+ self._result_queue))
p.start()
self._processes.add(p)
@@ -372,7 +354,6 @@ class ProcessPoolExecutor(_base.Executor):
self._queue_management_thread = None
self._call_queue = None
self._result_queue = None
- self._shutdown_process_event = None
self._processes = None
shutdown.__doc__ = _base.Executor.shutdown.__doc__
diff --git a/Lib/concurrent/futures/thread.py b/Lib/concurrent/futures/thread.py
index 93b495f..fbac088 100644
--- a/Lib/concurrent/futures/thread.py
+++ b/Lib/concurrent/futures/thread.py
@@ -60,14 +60,10 @@ class _WorkItem(object):
def _worker(executor_reference, work_queue):
try:
while True:
- try:
- work_item = work_queue.get(block=True)
- except queue.Empty:
- pass
- else:
- if work_item is not None:
- work_item.run()
- continue
+ work_item = work_queue.get(block=True)
+ if work_item is not None:
+ work_item.run()
+ continue
executor = executor_reference()
# Exit if:
# - The interpreter is shutting down OR