diff options
Diffstat (limited to 'Lib/concurrent/futures/process.py')
-rw-r--r-- | Lib/concurrent/futures/process.py | 41 |
1 files changed, 27 insertions, 14 deletions
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 8e9b69a..a76e2c9 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -90,6 +90,7 @@ def _python_exit(): _global_shutdown = True items = list(_threads_wakeups.items()) for _, thread_wakeup in items: + # call not protected by ProcessPoolExecutor._shutdown_lock thread_wakeup.wakeup() for t, _ in items: t.join() @@ -157,8 +158,10 @@ class _CallItem(object): class _SafeQueue(Queue): """Safe Queue set exception to the future object linked to a job""" - def __init__(self, max_size=0, *, ctx, pending_work_items, thread_wakeup): + def __init__(self, max_size=0, *, ctx, pending_work_items, shutdown_lock, + thread_wakeup): self.pending_work_items = pending_work_items + self.shutdown_lock = shutdown_lock self.thread_wakeup = thread_wakeup super().__init__(max_size, ctx=ctx) @@ -167,7 +170,8 @@ class _SafeQueue(Queue): tb = traceback.format_exception(type(e), e, e.__traceback__) e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb))) work_item = self.pending_work_items.pop(obj.work_id, None) - self.thread_wakeup.wakeup() + with self.shutdown_lock: + self.thread_wakeup.wakeup() # work_item can be None if another process terminated. In this # case, the executor_manager_thread fails all work_items # with BrokenProcessPool @@ -268,6 +272,7 @@ class _ExecutorManagerThread(threading.Thread): # A _ThreadWakeup to allow waking up the queue_manager_thread from the # main Thread and avoid deadlocks caused by permanently locked queues. self.thread_wakeup = executor._executor_manager_thread_wakeup + self.shutdown_lock = executor._shutdown_lock # A weakref.ref to the ProcessPoolExecutor that owns this thread. Used # to determine if the ProcessPoolExecutor has been garbage collected @@ -275,10 +280,13 @@ class _ExecutorManagerThread(threading.Thread): # When the executor gets garbage collected, the weakref callback # will wake up the queue management thread so that it can terminate # if there is no pending work item. - def weakref_cb(_, thread_wakeup=self.thread_wakeup): + def weakref_cb(_, + thread_wakeup=self.thread_wakeup, + shutdown_lock=self.shutdown_lock): mp.util.debug('Executor collected: triggering callback for' ' QueueManager wakeup') - thread_wakeup.wakeup() + with shutdown_lock: + thread_wakeup.wakeup() self.executor_reference = weakref.ref(executor, weakref_cb) @@ -363,6 +371,7 @@ class _ExecutorManagerThread(threading.Thread): # submitted, from the executor being shutdown/gc-ed, or from the # shutdown of the python interpreter. result_reader = self.result_queue._reader + assert not self.thread_wakeup._closed wakeup_reader = self.thread_wakeup._reader readers = [result_reader, wakeup_reader] worker_sentinels = [p.sentinel for p in self.processes.values()] @@ -380,7 +389,9 @@ class _ExecutorManagerThread(threading.Thread): elif wakeup_reader in ready: is_broken = False - self.thread_wakeup.clear() + + with self.shutdown_lock: + self.thread_wakeup.clear() return result_item, is_broken, cause @@ -500,7 +511,8 @@ class _ExecutorManagerThread(threading.Thread): # Release the queue's resources as soon as possible. self.call_queue.close() self.call_queue.join_thread() - self.thread_wakeup.close() + with self.shutdown_lock: + self.thread_wakeup.close() # If .join() is not called on the created processes then # some ctx.Queue methods may deadlock on Mac OS X. for p in self.processes.values(): @@ -619,6 +631,8 @@ class ProcessPoolExecutor(_base.Executor): # _result_queue to send wakeup signals to the executor_manager_thread # as it could result in a deadlock if a worker process dies with the # _result_queue write lock still acquired. + # + # _shutdown_lock must be locked to access _ThreadWakeup. self._executor_manager_thread_wakeup = _ThreadWakeup() # Create communication channels for the executor @@ -629,6 +643,7 @@ class ProcessPoolExecutor(_base.Executor): self._call_queue = _SafeQueue( max_size=queue_size, ctx=self._mp_context, pending_work_items=self._pending_work_items, + shutdown_lock=self._shutdown_lock, thread_wakeup=self._executor_manager_thread_wakeup) # Killed worker processes can produce spurious "broken pipe" # tracebacks in the queue's own worker thread. But we detect killed @@ -718,12 +733,12 @@ class ProcessPoolExecutor(_base.Executor): with self._shutdown_lock: self._cancel_pending_futures = cancel_futures self._shutdown_thread = True + if self._executor_manager_thread_wakeup is not None: + # Wake up queue management thread + self._executor_manager_thread_wakeup.wakeup() - if self._executor_manager_thread: - # Wake up queue management thread - self._executor_manager_thread_wakeup.wakeup() - if wait: - self._executor_manager_thread.join() + if self._executor_manager_thread is not None and wait: + self._executor_manager_thread.join() # To reduce the risk of opening too many files, remove references to # objects that use file descriptors. self._executor_manager_thread = None @@ -732,8 +747,6 @@ class ProcessPoolExecutor(_base.Executor): self._result_queue.close() self._result_queue = None self._processes = None - - if self._executor_manager_thread_wakeup: - self._executor_manager_thread_wakeup = None + self._executor_manager_thread_wakeup = None shutdown.__doc__ = _base.Executor.shutdown.__doc__ |