diff options
Diffstat (limited to 'Lib/concurrent/futures')
-rw-r--r-- | Lib/concurrent/futures/_base.py | 29 | ||||
-rw-r--r-- | Lib/concurrent/futures/process.py | 136 | ||||
-rw-r--r-- | Lib/concurrent/futures/thread.py | 2 |
3 files changed, 118 insertions, 49 deletions
diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py index 7929c3c..ca3aebd 100644 --- a/Lib/concurrent/futures/_base.py +++ b/Lib/concurrent/futures/_base.py @@ -4,7 +4,6 @@ __author__ = 'Brian Quinlan (brian@sweetapp.com)' import collections -import functools import logging import threading import time @@ -333,7 +332,7 @@ class Future(object): return True def cancelled(self): - """Return True if the future has cancelled.""" + """Return True if the future was cancelled.""" with self._condition: return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED] @@ -471,8 +470,8 @@ class Future(object): return True else: LOGGER.critical('Future %s in unexpected state: %s', - id(self.future), - self.future._state) + id(self), + self._state) raise RuntimeError('Future in unexpected state') def set_result(self, result): @@ -538,15 +537,19 @@ class Executor(object): fs = [self.submit(fn, *args) for args in zip(*iterables)] - try: - for future in fs: - if timeout is None: - yield future.result() - else: - yield future.result(end_time - time.time()) - finally: - for future in fs: - future.cancel() + # Yield must be hidden in closure so that the futures are submitted + # before the first iterator value is required. + def result_iterator(): + try: + for future in fs: + if timeout is None: + yield future.result() + else: + yield future.result(end_time - time.time()) + finally: + for future in fs: + future.cancel() + return result_iterator() def shutdown(self, wait=True): """Clean-up the resources associated with the Executor. diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index d3bbe2c..04238a7 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -46,9 +46,12 @@ Process #1..n: __author__ = 'Brian Quinlan (brian@sweetapp.com)' import atexit +import os from concurrent.futures import _base import queue import multiprocessing +from multiprocessing.queues import SimpleQueue, Full +from multiprocessing.connection import wait import threading import weakref @@ -121,7 +124,7 @@ def _process_worker(call_queue, result_queue): call_item = call_queue.get(block=True) if call_item is None: # Wake up queue management thread - result_queue.put(None) + result_queue.put(os.getpid()) return try: r = call_item.fn(*call_item.args, **call_item.kwargs) @@ -193,46 +196,92 @@ def _queue_management_worker(executor_reference, result_queue: A multiprocessing.Queue of _ResultItems generated by the process workers. """ - nb_shutdown_processes = 0 - def shutdown_one_process(): - """Tell a worker to terminate, which will in turn wake us again""" - nonlocal nb_shutdown_processes - call_queue.put(None) - nb_shutdown_processes += 1 + executor = None + + def shutting_down(): + return _shutdown or executor is None or executor._shutdown_thread + + def shutdown_worker(): + # This is an upper bound + nb_children_alive = sum(p.is_alive() for p in processes.values()) + for i in range(0, nb_children_alive): + call_queue.put_nowait(None) + # Release the queue's resources as soon as possible. + call_queue.close() + # If .join() is not called on the created processes then + # some multiprocessing.Queue methods may deadlock on Mac OS X. + for p in processes.values(): + p.join() + + reader = result_queue._reader + while True: _add_call_item_to_queue(pending_work_items, work_ids_queue, call_queue) - result_item = result_queue.get(block=True) - if result_item is not None: - work_item = pending_work_items[result_item.work_id] - del pending_work_items[result_item.work_id] - - if result_item.exception: - work_item.future.set_exception(result_item.exception) - else: - work_item.future.set_result(result_item.result) + sentinels = [p.sentinel for p in processes.values()] + assert sentinels + ready = wait([reader] + sentinels) + if reader in ready: + result_item = reader.recv() + else: + # Mark the process pool broken so that submits fail right now. + executor = executor_reference() + if executor is not None: + executor._broken = True + executor._shutdown_thread = True + executor = None + # All futures in flight must be marked failed + for work_id, work_item in pending_work_items.items(): + work_item.future.set_exception( + BrokenProcessPool( + "A process in the process pool was " + "terminated abruptly while the future was " + "running or pending." + )) + pending_work_items.clear() + # Terminate remaining workers forcibly: the queues or their + # locks may be in a dirty state and block forever. + for p in processes.values(): + p.terminate() + shutdown_worker() + return + if isinstance(result_item, int): + # Clean shutdown of a worker using its PID + # (avoids marking the executor broken) + assert shutting_down() + p = processes.pop(result_item) + p.join() + if not processes: + shutdown_worker() + return + elif result_item is not None: + work_item = pending_work_items.pop(result_item.work_id, None) + # work_item can be None if another process terminated (see above) + if work_item is not None: + if result_item.exception: + work_item.future.set_exception(result_item.exception) + else: + work_item.future.set_result(result_item.result) # Check whether we should start shutting down. executor = executor_reference() # No more work items can be added if: # - The interpreter is shutting down OR # - The executor that owns this worker has been collected OR # - The executor that owns this worker has been shutdown. - if _shutdown or executor is None or executor._shutdown_thread: - # Since no new work items can be added, it is safe to shutdown - # this thread if there are no pending work items. - if not pending_work_items: - while nb_shutdown_processes < len(processes): - shutdown_one_process() - # If .join() is not called on the created processes then - # some multiprocessing.Queue methods may deadlock on Mac OS - # X. - for p in processes: - p.join() - call_queue.close() - return - del executor + if shutting_down(): + try: + # Since no new work items can be added, it is safe to shutdown + # this thread if there are no pending work items. + if not pending_work_items: + shutdown_worker() + return + except Full: + # This is not a problem: we will eventually be woken up (in + # result_queue.get()) and be able to send a sentinel again. + pass + executor = None _system_limits_checked = False _system_limited = None @@ -243,7 +292,6 @@ def _check_system_limits(): raise NotImplementedError(_system_limited) _system_limits_checked = True try: - import os nsems_max = os.sysconf("SC_SEM_NSEMS_MAX") except (AttributeError, ValueError): # sysconf not available or setting not available @@ -259,6 +307,14 @@ def _check_system_limits(): _system_limited = "system provides too few semaphores (%d available, 256 necessary)" % nsems_max raise NotImplementedError(_system_limited) + +class BrokenProcessPool(RuntimeError): + """ + Raised when a process in a ProcessPoolExecutor terminated abruptly + while a future was in the running state. + """ + + class ProcessPoolExecutor(_base.Executor): def __init__(self, max_workers=None): """Initializes a new ProcessPoolExecutor instance. @@ -280,14 +336,20 @@ class ProcessPoolExecutor(_base.Executor): # because futures in the call queue cannot be cancelled. self._call_queue = multiprocessing.Queue(self._max_workers + EXTRA_QUEUED_CALLS) - self._result_queue = multiprocessing.Queue() + # Killed worker processes can produce spurious "broken pipe" + # tracebacks in the queue's own worker thread. But we detect killed + # processes anyway, so silence the tracebacks. + self._call_queue._ignore_epipe = True + self._result_queue = SimpleQueue() self._work_ids = queue.Queue() self._queue_management_thread = None - self._processes = set() + # Map of pids to processes + self._processes = {} # Shutdown is a two-step process. self._shutdown_thread = False self._shutdown_lock = threading.Lock() + self._broken = False self._queue_count = 0 self._pending_work_items = {} @@ -297,6 +359,8 @@ class ProcessPoolExecutor(_base.Executor): def weakref_cb(_, q=self._result_queue): q.put(None) if self._queue_management_thread is None: + # Start the processes so that their sentinels are known. + self._adjust_process_count() self._queue_management_thread = threading.Thread( target=_queue_management_worker, args=(weakref.ref(self, weakref_cb), @@ -316,10 +380,13 @@ class ProcessPoolExecutor(_base.Executor): args=(self._call_queue, self._result_queue)) p.start() - self._processes.add(p) + self._processes[p.pid] = p def submit(self, fn, *args, **kwargs): with self._shutdown_lock: + if self._broken: + raise BrokenProcessPool('A child process terminated ' + 'abruptly, the process pool is not usable anymore') if self._shutdown_thread: raise RuntimeError('cannot schedule new futures after shutdown') @@ -333,7 +400,6 @@ class ProcessPoolExecutor(_base.Executor): self._result_queue.put(None) self._start_queue_management_thread() - self._adjust_process_count() return f submit.__doc__ = _base.Executor.submit.__doc__ diff --git a/Lib/concurrent/futures/thread.py b/Lib/concurrent/futures/thread.py index fbac088..95bb682 100644 --- a/Lib/concurrent/futures/thread.py +++ b/Lib/concurrent/futures/thread.py @@ -74,7 +74,7 @@ def _worker(executor_reference, work_queue): work_queue.put(None) return del executor - except BaseException as e: + except BaseException: _base.LOGGER.critical('Exception in worker', exc_info=True) class ThreadPoolExecutor(_base.Executor): |