summaryrefslogtreecommitdiffstats
path: root/Lib/concurrent/futures/process.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/concurrent/futures/process.py')
-rw-r--r--Lib/concurrent/futures/process.py41
1 files changed, 27 insertions, 14 deletions
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
index 8e9b69a..a76e2c9 100644
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -90,6 +90,7 @@ def _python_exit():
_global_shutdown = True
items = list(_threads_wakeups.items())
for _, thread_wakeup in items:
+ # call not protected by ProcessPoolExecutor._shutdown_lock
thread_wakeup.wakeup()
for t, _ in items:
t.join()
@@ -157,8 +158,10 @@ class _CallItem(object):
class _SafeQueue(Queue):
"""Safe Queue set exception to the future object linked to a job"""
- def __init__(self, max_size=0, *, ctx, pending_work_items, thread_wakeup):
+ def __init__(self, max_size=0, *, ctx, pending_work_items, shutdown_lock,
+ thread_wakeup):
self.pending_work_items = pending_work_items
+ self.shutdown_lock = shutdown_lock
self.thread_wakeup = thread_wakeup
super().__init__(max_size, ctx=ctx)
@@ -167,7 +170,8 @@ class _SafeQueue(Queue):
tb = traceback.format_exception(type(e), e, e.__traceback__)
e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb)))
work_item = self.pending_work_items.pop(obj.work_id, None)
- self.thread_wakeup.wakeup()
+ with self.shutdown_lock:
+ self.thread_wakeup.wakeup()
# work_item can be None if another process terminated. In this
# case, the executor_manager_thread fails all work_items
# with BrokenProcessPool
@@ -268,6 +272,7 @@ class _ExecutorManagerThread(threading.Thread):
# A _ThreadWakeup to allow waking up the queue_manager_thread from the
# main Thread and avoid deadlocks caused by permanently locked queues.
self.thread_wakeup = executor._executor_manager_thread_wakeup
+ self.shutdown_lock = executor._shutdown_lock
# A weakref.ref to the ProcessPoolExecutor that owns this thread. Used
# to determine if the ProcessPoolExecutor has been garbage collected
@@ -275,10 +280,13 @@ class _ExecutorManagerThread(threading.Thread):
# When the executor gets garbage collected, the weakref callback
# will wake up the queue management thread so that it can terminate
# if there is no pending work item.
- def weakref_cb(_, thread_wakeup=self.thread_wakeup):
+ def weakref_cb(_,
+ thread_wakeup=self.thread_wakeup,
+ shutdown_lock=self.shutdown_lock):
mp.util.debug('Executor collected: triggering callback for'
' QueueManager wakeup')
- thread_wakeup.wakeup()
+ with shutdown_lock:
+ thread_wakeup.wakeup()
self.executor_reference = weakref.ref(executor, weakref_cb)
@@ -363,6 +371,7 @@ class _ExecutorManagerThread(threading.Thread):
# submitted, from the executor being shutdown/gc-ed, or from the
# shutdown of the python interpreter.
result_reader = self.result_queue._reader
+ assert not self.thread_wakeup._closed
wakeup_reader = self.thread_wakeup._reader
readers = [result_reader, wakeup_reader]
worker_sentinels = [p.sentinel for p in self.processes.values()]
@@ -380,7 +389,9 @@ class _ExecutorManagerThread(threading.Thread):
elif wakeup_reader in ready:
is_broken = False
- self.thread_wakeup.clear()
+
+ with self.shutdown_lock:
+ self.thread_wakeup.clear()
return result_item, is_broken, cause
@@ -500,7 +511,8 @@ class _ExecutorManagerThread(threading.Thread):
# Release the queue's resources as soon as possible.
self.call_queue.close()
self.call_queue.join_thread()
- self.thread_wakeup.close()
+ with self.shutdown_lock:
+ self.thread_wakeup.close()
# If .join() is not called on the created processes then
# some ctx.Queue methods may deadlock on Mac OS X.
for p in self.processes.values():
@@ -619,6 +631,8 @@ class ProcessPoolExecutor(_base.Executor):
# _result_queue to send wakeup signals to the executor_manager_thread
# as it could result in a deadlock if a worker process dies with the
# _result_queue write lock still acquired.
+ #
+ # _shutdown_lock must be locked to access _ThreadWakeup.
self._executor_manager_thread_wakeup = _ThreadWakeup()
# Create communication channels for the executor
@@ -629,6 +643,7 @@ class ProcessPoolExecutor(_base.Executor):
self._call_queue = _SafeQueue(
max_size=queue_size, ctx=self._mp_context,
pending_work_items=self._pending_work_items,
+ shutdown_lock=self._shutdown_lock,
thread_wakeup=self._executor_manager_thread_wakeup)
# Killed worker processes can produce spurious "broken pipe"
# tracebacks in the queue's own worker thread. But we detect killed
@@ -718,12 +733,12 @@ class ProcessPoolExecutor(_base.Executor):
with self._shutdown_lock:
self._cancel_pending_futures = cancel_futures
self._shutdown_thread = True
+ if self._executor_manager_thread_wakeup is not None:
+ # Wake up queue management thread
+ self._executor_manager_thread_wakeup.wakeup()
- if self._executor_manager_thread:
- # Wake up queue management thread
- self._executor_manager_thread_wakeup.wakeup()
- if wait:
- self._executor_manager_thread.join()
+ if self._executor_manager_thread is not None and wait:
+ self._executor_manager_thread.join()
# To reduce the risk of opening too many files, remove references to
# objects that use file descriptors.
self._executor_manager_thread = None
@@ -732,8 +747,6 @@ class ProcessPoolExecutor(_base.Executor):
self._result_queue.close()
self._result_queue = None
self._processes = None
-
- if self._executor_manager_thread_wakeup:
- self._executor_manager_thread_wakeup = None
+ self._executor_manager_thread_wakeup = None
shutdown.__doc__ = _base.Executor.shutdown.__doc__