summaryrefslogtreecommitdiffstats
path: root/Lib/concurrent/futures
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/concurrent/futures')
-rw-r--r--Lib/concurrent/futures/process.py105
1 files changed, 74 insertions, 31 deletions
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
index f0bf6d5..c2331e7 100644
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -46,10 +46,11 @@ Process #1..n:
__author__ = 'Brian Quinlan (brian@sweetapp.com)'
import atexit
+import os
from concurrent.futures import _base
import queue
import multiprocessing
-from multiprocessing.queues import SimpleQueue
+from multiprocessing.queues import SimpleQueue, SentinelReady
import threading
import weakref
@@ -122,7 +123,7 @@ def _process_worker(call_queue, result_queue):
call_item = call_queue.get(block=True)
if call_item is None:
# Wake up queue management thread
- result_queue.put(None)
+ result_queue.put(os.getpid())
return
try:
r = call_item.fn(*call_item.args, **call_item.kwargs)
@@ -194,29 +195,63 @@ def _queue_management_worker(executor_reference,
result_queue: A multiprocessing.Queue of _ResultItems generated by the
process workers.
"""
- nb_shutdown_processes = 0
- def shutdown_one_process():
- """Tell a worker to terminate, which will in turn wake us again"""
- nonlocal nb_shutdown_processes
- call_queue.put(None)
- nb_shutdown_processes += 1
+
+ def shutdown_worker():
+ # This is an upper bound
+ nb_children_alive = sum(p.is_alive() for p in processes.values())
+ for i in range(0, nb_children_alive):
+ call_queue.put(None)
+ # If .join() is not called on the created processes then
+ # some multiprocessing.Queue methods may deadlock on Mac OS
+ # X.
+ for p in processes.values():
+ p.join()
+
while True:
_add_call_item_to_queue(pending_work_items,
work_ids_queue,
call_queue)
- result_item = result_queue.get()
- if result_item is not None:
- work_item = pending_work_items[result_item.work_id]
- del pending_work_items[result_item.work_id]
-
- if result_item.exception:
- work_item.future.set_exception(result_item.exception)
- else:
- work_item.future.set_result(result_item.result)
- continue
- # If we come here, we either got a timeout or were explicitly woken up.
- # In either case, check whether we should start shutting down.
+ sentinels = [p.sentinel for p in processes.values()]
+ assert sentinels
+ try:
+ result_item = result_queue.get(sentinels=sentinels)
+ except SentinelReady as e:
+ # Mark the process pool broken so that submits fail right now.
+ executor = executor_reference()
+ if executor is not None:
+ executor._broken = True
+ executor._shutdown_thread = True
+ del executor
+ # All futures in flight must be marked failed
+ for work_id, work_item in pending_work_items.items():
+ work_item.future.set_exception(
+ BrokenProcessPool(
+ "A process in the process pool was "
+ "terminated abruptly while the future was "
+ "running or pending."
+ ))
+ pending_work_items.clear()
+ # Terminate remaining workers forcibly: the queues or their
+ # locks may be in a dirty state and block forever.
+ for p in processes.values():
+ p.terminate()
+ for p in processes.values():
+ p.join()
+ return
+ if isinstance(result_item, int):
+ # Clean shutdown of a worker using its PID
+ # (avoids marking the executor broken)
+ del processes[result_item]
+ elif result_item is not None:
+ work_item = pending_work_items.pop(result_item.work_id, None)
+ # work_item can be None if another process terminated (see above)
+ if work_item is not None:
+ if result_item.exception:
+ work_item.future.set_exception(result_item.exception)
+ else:
+ work_item.future.set_result(result_item.result)
+ # Check whether we should start shutting down.
executor = executor_reference()
# No more work items can be added if:
# - The interpreter is shutting down OR
@@ -226,17 +261,11 @@ def _queue_management_worker(executor_reference,
# Since no new work items can be added, it is safe to shutdown
# this thread if there are no pending work items.
if not pending_work_items:
- while nb_shutdown_processes < len(processes):
- shutdown_one_process()
- # If .join() is not called on the created processes then
- # some multiprocessing.Queue methods may deadlock on Mac OS
- # X.
- for p in processes:
- p.join()
+ shutdown_worker()
return
else:
# Start shutting down by telling a process it can exit.
- shutdown_one_process()
+ call_queue.put(None)
del executor
_system_limits_checked = False
@@ -264,6 +293,14 @@ def _check_system_limits():
_system_limited = "system provides too few semaphores (%d available, 256 necessary)" % nsems_max
raise NotImplementedError(_system_limited)
+
+class BrokenProcessPool(RuntimeError):
+ """
+ Raised when a process in a ProcessPoolExecutor terminated abruptly
+ while a future was in the running state.
+ """
+
+
class ProcessPoolExecutor(_base.Executor):
def __init__(self, max_workers=None):
"""Initializes a new ProcessPoolExecutor instance.
@@ -288,11 +325,13 @@ class ProcessPoolExecutor(_base.Executor):
self._result_queue = SimpleQueue()
self._work_ids = queue.Queue()
self._queue_management_thread = None
- self._processes = set()
+ # Map of pids to processes
+ self._processes = {}
# Shutdown is a two-step process.
self._shutdown_thread = False
self._shutdown_lock = threading.Lock()
+ self._broken = False
self._queue_count = 0
self._pending_work_items = {}
@@ -302,6 +341,8 @@ class ProcessPoolExecutor(_base.Executor):
def weakref_cb(_, q=self._result_queue):
q.put(None)
if self._queue_management_thread is None:
+ # Start the processes so that their sentinels are known.
+ self._adjust_process_count()
self._queue_management_thread = threading.Thread(
target=_queue_management_worker,
args=(weakref.ref(self, weakref_cb),
@@ -321,10 +362,13 @@ class ProcessPoolExecutor(_base.Executor):
args=(self._call_queue,
self._result_queue))
p.start()
- self._processes.add(p)
+ self._processes[p.pid] = p
def submit(self, fn, *args, **kwargs):
with self._shutdown_lock:
+ if self._broken:
+ raise BrokenProcessPool('A child process terminated '
+ 'abruptly, the process pool is not usable anymore')
if self._shutdown_thread:
raise RuntimeError('cannot schedule new futures after shutdown')
@@ -338,7 +382,6 @@ class ProcessPoolExecutor(_base.Executor):
self._result_queue.put(None)
self._start_queue_management_thread()
- self._adjust_process_count()
return f
submit.__doc__ = _base.Executor.submit.__doc__