summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorThomas Moreau <thomas.moreau.2010@gmail.com>2020-03-01 20:49:14 (GMT)
committerGitHub <noreply@github.com>2020-03-01 20:49:14 (GMT)
commit0e89076247580ba0e570c4816f0e5628a7e36e83 (patch)
tree1d1d8ceede408eec2b138be7e2f13ca2beaf1f74
parent397b96f6d7a89f778ebc0591e32216a8183fe667 (diff)
downloadcpython-0e89076247580ba0e570c4816f0e5628a7e36e83.zip
cpython-0e89076247580ba0e570c4816f0e5628a7e36e83.tar.gz
cpython-0e89076247580ba0e570c4816f0e5628a7e36e83.tar.bz2
bpo-39678: refactor queue manager thread (GH-18551)
-rw-r--r--Lib/concurrent/futures/process.py436
-rw-r--r--Lib/test/test_concurrent_futures.py16
-rw-r--r--Misc/NEWS.d/next/Library/2020-02-28-12-59-30.bpo-39678.3idfxM.rst2
3 files changed, 239 insertions, 215 deletions
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
index d773228..39fadcc 100644
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -49,7 +49,6 @@ import atexit
import os
from concurrent.futures import _base
import queue
-from queue import Full
import multiprocessing as mp
import multiprocessing.connection
from multiprocessing.queues import Queue
@@ -176,8 +175,9 @@ class _SafeQueue(Queue):
e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb)))
work_item = self.pending_work_items.pop(obj.work_id, None)
self.thread_wakeup.wakeup()
- # work_item can be None if another process terminated. In this case,
- # the queue_manager_thread fails all work_items with BrokenProcessPool
+ # work_item can be None if another process terminated. In this
+ # case, the executor_manager_thread fails all work_items
+ # with BrokenProcessPool
if work_item is not None:
work_item.future.set_exception(e)
else:
@@ -193,6 +193,7 @@ def _get_chunks(*iterables, chunksize):
return
yield chunk
+
def _process_chunk(fn, chunk):
""" Processes a chunk of an iterable passed to map.
@@ -256,122 +257,123 @@ def _process_worker(call_queue, result_queue, initializer, initargs):
del call_item
-def _add_call_item_to_queue(pending_work_items,
- work_ids,
- call_queue):
- """Fills call_queue with _WorkItems from pending_work_items.
+class _ExecutorManagerThread(threading.Thread):
+ """Manages the communication between this process and the worker processes.
- This function never blocks.
+ The manager is run in a local thread.
Args:
- pending_work_items: A dict mapping work ids to _WorkItems e.g.
- {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
- work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Work ids
- are consumed and the corresponding _WorkItems from
- pending_work_items are transformed into _CallItems and put in
- call_queue.
- call_queue: A multiprocessing.Queue that will be filled with _CallItems
- derived from _WorkItems.
+ executor: A reference to the ProcessPoolExecutor that owns
+ this thread. A weakref will be owned by the manager as well as
+ references to internal objects used to introspect the state of
+ the executor.
"""
- while True:
- if call_queue.full():
- return
- try:
- work_id = work_ids.get(block=False)
- except queue.Empty:
- return
- else:
- work_item = pending_work_items[work_id]
-
- if work_item.future.set_running_or_notify_cancel():
- call_queue.put(_CallItem(work_id,
- work_item.fn,
- work_item.args,
- work_item.kwargs),
- block=True)
- else:
- del pending_work_items[work_id]
- continue
+ def __init__(self, executor):
+ # Store references to necessary internals of the executor.
-def _queue_management_worker(executor_reference,
- processes,
- pending_work_items,
- work_ids_queue,
- call_queue,
- result_queue,
- thread_wakeup):
- """Manages the communication between this process and the worker processes.
+ # A _ThreadWakeup to wake up the executor_manager_thread from the
+ # main Thread and avoid deadlocks caused by permanently locked queues.
+ self.thread_wakeup = executor._executor_manager_thread_wakeup
- This function is run in a local thread.
+ # A weakref.ref to the ProcessPoolExecutor that owns this thread. Used
+ # to determine if the ProcessPoolExecutor has been garbage collected
+ # and that the manager can exit.
+ # When the executor gets garbage collected, the weakref callback
+ # will wake up the queue management thread so that it can terminate
+ # if there is no pending work item.
+ def weakref_cb(_, thread_wakeup=self.thread_wakeup):
+ mp.util.debug('Executor collected: triggering callback for'
+ ' QueueManager wakeup')
+ thread_wakeup.wakeup()
- Args:
- executor_reference: A weakref.ref to the ProcessPoolExecutor that owns
- this thread. Used to determine if the ProcessPoolExecutor has been
- garbage collected and that this function can exit.
- process: A list of the ctx.Process instances used as
- workers.
- pending_work_items: A dict mapping work ids to _WorkItems e.g.
- {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
- work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]).
- call_queue: A ctx.Queue that will be filled with _CallItems
- derived from _WorkItems for processing by the process workers.
- result_queue: A ctx.SimpleQueue of _ResultItems generated by the
- process workers.
- thread_wakeup: A _ThreadWakeup to allow waking up the
- queue_manager_thread from the main Thread and avoid deadlocks
- caused by permanently locked queues.
- """
- executor = None
+ self.executor_reference = weakref.ref(executor, weakref_cb)
- def shutting_down():
- return (_global_shutdown or executor is None
- or executor._shutdown_thread)
+ # A list of the ctx.Process instances used as workers.
+ self.processes = executor._processes
- def shutdown_worker():
- # This is an upper bound on the number of children alive.
- n_children_alive = sum(p.is_alive() for p in processes.values())
- n_children_to_stop = n_children_alive
- n_sentinels_sent = 0
- # Send the right number of sentinels, to make sure all children are
- # properly terminated.
- while n_sentinels_sent < n_children_to_stop and n_children_alive > 0:
- for i in range(n_children_to_stop - n_sentinels_sent):
- try:
- call_queue.put_nowait(None)
- n_sentinels_sent += 1
- except Full:
- break
- n_children_alive = sum(p.is_alive() for p in processes.values())
+ # A ctx.Queue that will be filled with _CallItems derived from
+ # _WorkItems for processing by the process workers.
+ self.call_queue = executor._call_queue
- # Release the queue's resources as soon as possible.
- call_queue.close()
- call_queue.join_thread()
- thread_wakeup.close()
- # If .join() is not called on the created processes then
- # some ctx.Queue methods may deadlock on Mac OS X.
- for p in processes.values():
- p.join()
+ # A ctx.SimpleQueue of _ResultItems generated by the process workers.
+ self.result_queue = executor._result_queue
- result_reader = result_queue._reader
- wakeup_reader = thread_wakeup._reader
- readers = [result_reader, wakeup_reader]
+ # A queue.Queue of work ids e.g. Queue([5, 6, ...]).
+ self.work_ids_queue = executor._work_ids
- while True:
- _add_call_item_to_queue(pending_work_items,
- work_ids_queue,
- call_queue)
+ # A dict mapping work ids to _WorkItems e.g.
+ # {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
+ self.pending_work_items = executor._pending_work_items
+
+ # Set this thread to be daemonized
+ super().__init__()
+ self.daemon = True
+ def run(self):
+ # Main loop for the executor manager thread.
+
+ while True:
+ self.add_call_item_to_queue()
+
+ result_item, is_broken, cause = self.wait_result_broken_or_wakeup()
+
+ if is_broken:
+ self.terminate_broken(cause)
+ return
+ if result_item is not None:
+ self.process_result_item(result_item)
+ # Delete reference to result_item to avoid keeping references
+ # while waiting on new results.
+ del result_item
+
+ if self.is_shutting_down():
+ self.flag_executor_shutting_down()
+
+ # Since no new work items can be added, it is safe to shutdown
+ # this thread if there are no pending work items.
+ if not self.pending_work_items:
+ self.join_executor_internals()
+ return
+
+ def add_call_item_to_queue(self):
+ # Fills call_queue with _WorkItems from pending_work_items.
+ # This function never blocks.
+ while True:
+ if self.call_queue.full():
+ return
+ try:
+ work_id = self.work_ids_queue.get(block=False)
+ except queue.Empty:
+ return
+ else:
+ work_item = self.pending_work_items[work_id]
+
+ if work_item.future.set_running_or_notify_cancel():
+ self.call_queue.put(_CallItem(work_id,
+ work_item.fn,
+ work_item.args,
+ work_item.kwargs),
+ block=True)
+ else:
+ del self.pending_work_items[work_id]
+ continue
+
+ def wait_result_broken_or_wakeup(self):
# Wait for a result to be ready in the result_queue while checking
# that all worker processes are still running, or for a wake up
# signal send. The wake up signals come either from new tasks being
# submitted, from the executor being shutdown/gc-ed, or from the
# shutdown of the python interpreter.
- worker_sentinels = [p.sentinel for p in processes.values()]
+ result_reader = self.result_queue._reader
+ wakeup_reader = self.thread_wakeup._reader
+ readers = [result_reader, wakeup_reader]
+ worker_sentinels = [p.sentinel for p in self.processes.values()]
ready = mp.connection.wait(readers + worker_sentinels)
cause = None
is_broken = True
+ result_item = None
if result_reader in ready:
try:
result_item = result_reader.recv()
@@ -381,97 +383,135 @@ def _queue_management_worker(executor_reference,
elif wakeup_reader in ready:
is_broken = False
- result_item = None
- thread_wakeup.clear()
- if is_broken:
- # Mark the process pool broken so that submits fail right now.
- executor = executor_reference()
- if executor is not None:
- executor._broken = ('A child process terminated '
- 'abruptly, the process pool is not '
- 'usable anymore')
- executor._shutdown_thread = True
- executor = None
- bpe = BrokenProcessPool("A process in the process pool was "
- "terminated abruptly while the future was "
- "running or pending.")
- if cause is not None:
- bpe.__cause__ = _RemoteTraceback(
- f"\n'''\n{''.join(cause)}'''")
- # All futures in flight must be marked failed
- for work_id, work_item in pending_work_items.items():
- work_item.future.set_exception(bpe)
- # Delete references to object. See issue16284
- del work_item
- pending_work_items.clear()
- # Terminate remaining workers forcibly: the queues or their
- # locks may be in a dirty state and block forever.
- for p in processes.values():
- p.terminate()
- shutdown_worker()
- return
+ self.thread_wakeup.clear()
+
+ return result_item, is_broken, cause
+
+ def process_result_item(self, result_item):
+ # Process the received result_item. This can be either the PID of a
+ # worker that exited gracefully or a _ResultItem
+
if isinstance(result_item, int):
# Clean shutdown of a worker using its PID
# (avoids marking the executor broken)
- assert shutting_down()
- p = processes.pop(result_item)
+ assert self.is_shutting_down()
+ p = self.processes.pop(result_item)
p.join()
- if not processes:
- shutdown_worker()
+ if not self.processes:
+ self.join_executor_internals()
return
- elif result_item is not None:
- work_item = pending_work_items.pop(result_item.work_id, None)
+ else:
+ # Received a _ResultItem so mark the future as completed.
+ work_item = self.pending_work_items.pop(result_item.work_id, None)
# work_item can be None if another process terminated (see above)
if work_item is not None:
if result_item.exception:
work_item.future.set_exception(result_item.exception)
else:
work_item.future.set_result(result_item.result)
- # Delete references to object. See issue16284
- del work_item
- # Delete reference to result_item
- del result_item
- # Check whether we should start shutting down.
- executor = executor_reference()
+ def is_shutting_down(self):
+ # Check whether we should start shutting down the executor.
+ executor = self.executor_reference()
# No more work items can be added if:
# - The interpreter is shutting down OR
# - The executor that owns this worker has been collected OR
# - The executor that owns this worker has been shutdown.
- if shutting_down():
- try:
- # Flag the executor as shutting down as early as possible if it
- # is not gc-ed yet.
- if executor is not None:
- executor._shutdown_thread = True
- # Unless there are pending work items, we have nothing to cancel.
- if pending_work_items and executor._cancel_pending_futures:
- # Cancel all pending futures and update pending_work_items
- # to only have futures that are currently running.
- new_pending_work_items = {}
- for work_id, work_item in pending_work_items.items():
- if not work_item.future.cancel():
- new_pending_work_items[work_id] = work_item
-
- pending_work_items = new_pending_work_items
- # Drain work_ids_queue since we no longer need to
- # add items to the call queue.
- while True:
- try:
- work_ids_queue.get_nowait()
- except queue.Empty:
- break
+ return (_global_shutdown or executor is None
+ or executor._shutdown_thread)
- # Since no new work items can be added, it is safe to shutdown
- # this thread if there are no pending work items.
- if not pending_work_items:
- shutdown_worker()
- return
- except Full:
- # This is not a problem: we will eventually be woken up (in
- # result_queue.get()) and be able to send a sentinel again.
- pass
- executor = None
+ def terminate_broken(self, cause):
+ # Terminate the executor because it is in a broken state. The cause
+ # argument can be used to display more information on the error that
+ # led the executor into becoming broken.
+
+ # Mark the process pool broken so that submits fail right now.
+ executor = self.executor_reference()
+ if executor is not None:
+ executor._broken = ('A child process terminated '
+ 'abruptly, the process pool is not '
+ 'usable anymore')
+ executor._shutdown_thread = True
+ executor = None
+
+ # All pending tasks are to be marked failed with the following
+ # BrokenProcessPool error
+ bpe = BrokenProcessPool("A process in the process pool was "
+ "terminated abruptly while the future was "
+ "running or pending.")
+ if cause is not None:
+ bpe.__cause__ = _RemoteTraceback(
+ f"\n'''\n{''.join(cause)}'''")
+
+ # Mark pending tasks as failed.
+ for work_id, work_item in self.pending_work_items.items():
+ work_item.future.set_exception(bpe)
+ # Delete references to object. See issue16284
+ del work_item
+ self.pending_work_items.clear()
+
+ # Terminate remaining workers forcibly: the queues or their
+ # locks may be in a dirty state and block forever.
+ for p in self.processes.values():
+ p.terminate()
+
+ # clean up resources
+ self.join_executor_internals()
+
+ def flag_executor_shutting_down(self):
+ # Flag the executor as shutting down and cancel remaining tasks if
+ # requested as early as possible if it is not gc-ed yet.
+ executor = self.executor_reference()
+ if executor is not None:
+ executor._shutdown_thread = True
+ # Cancel pending work items if requested.
+ if executor._cancel_pending_futures:
+ # Cancel all pending futures and update pending_work_items
+ # to only have futures that are currently running.
+ new_pending_work_items = {}
+ for work_id, work_item in self.pending_work_items.items():
+ if not work_item.future.cancel():
+ new_pending_work_items[work_id] = work_item
+ self.pending_work_items = new_pending_work_items
+ # Drain work_ids_queue since we no longer need to
+ # add items to the call queue.
+ while True:
+ try:
+ self.work_ids_queue.get_nowait()
+ except queue.Empty:
+ break
+ # Make sure we do this only once to not waste time looping
+ # on running processes over and over.
+ executor._cancel_pending_futures = False
+
+ def shutdown_workers(self):
+ n_children_to_stop = self.get_n_children_alive()
+ n_sentinels_sent = 0
+ # Send the right number of sentinels, to make sure all children are
+ # properly terminated.
+ while (n_sentinels_sent < n_children_to_stop
+ and self.get_n_children_alive() > 0):
+ for i in range(n_children_to_stop - n_sentinels_sent):
+ try:
+ self.call_queue.put_nowait(None)
+ n_sentinels_sent += 1
+ except queue.Full:
+ break
+
+ def join_executor_internals(self):
+ self.shutdown_workers()
+ # Release the queue's resources as soon as possible.
+ self.call_queue.close()
+ self.call_queue.join_thread()
+ self.thread_wakeup.close()
+ # If .join() is not called on the created processes then
+ # some ctx.Queue methods may deadlock on Mac OS X.
+ for p in self.processes.values():
+ p.join()
+
+ def get_n_children_alive(self):
+ # This is an upper bound on the number of children alive.
+ return sum(p.is_alive() for p in self.processes.values())
_system_limits_checked = False
@@ -562,7 +602,7 @@ class ProcessPoolExecutor(_base.Executor):
self._initargs = initargs
# Management thread
- self._queue_management_thread = None
+ self._executor_manager_thread = None
# Map of pids to processes
self._processes = {}
@@ -576,12 +616,12 @@ class ProcessPoolExecutor(_base.Executor):
self._cancel_pending_futures = False
# _ThreadWakeup is a communication channel used to interrupt the wait
- # of the main loop of queue_manager_thread from another thread (e.g.
+ # of the main loop of executor_manager_thread from another thread (e.g.
# when calling executor.submit or executor.shutdown). We do not use the
- # _result_queue to send the wakeup signal to the queue_manager_thread
+ # _result_queue to send wakeup signals to the executor_manager_thread
# as it could result in a deadlock if a worker process dies with the
# _result_queue write lock still acquired.
- self._queue_management_thread_wakeup = _ThreadWakeup()
+ self._executor_manager_thread_wakeup = _ThreadWakeup()
# Create communication channels for the executor
# Make the call queue slightly larger than the number of processes to
@@ -591,7 +631,7 @@ class ProcessPoolExecutor(_base.Executor):
self._call_queue = _SafeQueue(
max_size=queue_size, ctx=self._mp_context,
pending_work_items=self._pending_work_items,
- thread_wakeup=self._queue_management_thread_wakeup)
+ thread_wakeup=self._executor_manager_thread_wakeup)
# Killed worker processes can produce spurious "broken pipe"
# tracebacks in the queue's own worker thread. But we detect killed
# processes anyway, so silence the tracebacks.
@@ -599,32 +639,14 @@ class ProcessPoolExecutor(_base.Executor):
self._result_queue = mp_context.SimpleQueue()
self._work_ids = queue.Queue()
- def _start_queue_management_thread(self):
- if self._queue_management_thread is None:
- # When the executor gets garbarge collected, the weakref callback
- # will wake up the queue management thread so that it can terminate
- # if there is no pending work item.
- def weakref_cb(_,
- thread_wakeup=self._queue_management_thread_wakeup):
- mp.util.debug('Executor collected: triggering callback for'
- ' QueueManager wakeup')
- thread_wakeup.wakeup()
+ def _start_executor_manager_thread(self):
+ if self._executor_manager_thread is None:
# Start the processes so that their sentinels are known.
self._adjust_process_count()
- self._queue_management_thread = threading.Thread(
- target=_queue_management_worker,
- args=(weakref.ref(self, weakref_cb),
- self._processes,
- self._pending_work_items,
- self._work_ids,
- self._call_queue,
- self._result_queue,
- self._queue_management_thread_wakeup),
- name="QueueManagerThread")
- self._queue_management_thread.daemon = True
- self._queue_management_thread.start()
- _threads_wakeups[self._queue_management_thread] = \
- self._queue_management_thread_wakeup
+ self._executor_manager_thread = _ExecutorManagerThread(self)
+ self._executor_manager_thread.start()
+ _threads_wakeups[self._executor_manager_thread] = \
+ self._executor_manager_thread_wakeup
def _adjust_process_count(self):
for _ in range(len(self._processes), self._max_workers):
@@ -654,9 +676,9 @@ class ProcessPoolExecutor(_base.Executor):
self._work_ids.put(self._queue_count)
self._queue_count += 1
# Wake up queue management thread
- self._queue_management_thread_wakeup.wakeup()
+ self._executor_manager_thread_wakeup.wakeup()
- self._start_queue_management_thread()
+ self._start_executor_manager_thread()
return f
submit.__doc__ = _base.Executor.submit.__doc__
@@ -694,20 +716,20 @@ class ProcessPoolExecutor(_base.Executor):
self._cancel_pending_futures = cancel_futures
self._shutdown_thread = True
- if self._queue_management_thread:
+ if self._executor_manager_thread:
# Wake up queue management thread
- self._queue_management_thread_wakeup.wakeup()
+ self._executor_manager_thread_wakeup.wakeup()
if wait:
- self._queue_management_thread.join()
+ self._executor_manager_thread.join()
# To reduce the risk of opening too many files, remove references to
# objects that use file descriptors.
- self._queue_management_thread = None
+ self._executor_manager_thread = None
self._call_queue = None
self._result_queue = None
self._processes = None
- if self._queue_management_thread_wakeup:
- self._queue_management_thread_wakeup = None
+ if self._executor_manager_thread_wakeup:
+ self._executor_manager_thread_wakeup = None
shutdown.__doc__ = _base.Executor.shutdown.__doc__
diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py
index a7381f9..868415a 100644
--- a/Lib/test/test_concurrent_futures.py
+++ b/Lib/test/test_concurrent_futures.py
@@ -508,15 +508,15 @@ class ProcessPoolShutdownTest(ExecutorShutdownTest):
def test_del_shutdown(self):
executor = futures.ProcessPoolExecutor(max_workers=5)
res = executor.map(abs, range(-5, 5))
- queue_management_thread = executor._queue_management_thread
+ executor_manager_thread = executor._executor_manager_thread
processes = executor._processes
call_queue = executor._call_queue
- queue_management_thread = executor._queue_management_thread
+ executor_manager_thread = executor._executor_manager_thread
del executor
# Make sure that all the executor resources were properly cleaned by
# the shutdown process
- queue_management_thread.join()
+ executor_manager_thread.join()
for p in processes.values():
p.join()
call_queue.join_thread()
@@ -532,12 +532,12 @@ class ProcessPoolShutdownTest(ExecutorShutdownTest):
res = executor.map(abs, range(-5, 5))
processes = executor._processes
call_queue = executor._call_queue
- queue_management_thread = executor._queue_management_thread
+ executor_manager_thread = executor._executor_manager_thread
executor.shutdown(wait=False)
# Make sure that all the executor resources were properly cleaned by
# the shutdown process
- queue_management_thread.join()
+ executor_manager_thread.join()
for p in processes.values():
p.join()
call_queue.join_thread()
@@ -1139,11 +1139,11 @@ class ExecutorDeadlockTest:
mp_context=get_context(self.ctx)) as executor:
self.executor = executor # Allow clean up in fail_on_deadlock
- # Start the executor and get the queue_management_thread to collect
+ # Start the executor and get the executor_manager_thread to collect
# the threads and avoid dangling thread that should be cleaned up
# asynchronously.
executor.submit(id, 42).result()
- queue_manager = executor._queue_management_thread
+ executor_manager = executor._executor_manager_thread
# Submit a task that fails at pickle and shutdown the executor
# without waiting
@@ -1154,7 +1154,7 @@ class ExecutorDeadlockTest:
# Make sure the executor is eventually shutdown and do not leave
# dangling threads
- queue_manager.join()
+ executor_manager.join()
create_executor_tests(ExecutorDeadlockTest,
diff --git a/Misc/NEWS.d/next/Library/2020-02-28-12-59-30.bpo-39678.3idfxM.rst b/Misc/NEWS.d/next/Library/2020-02-28-12-59-30.bpo-39678.3idfxM.rst
new file mode 100644
index 0000000..8b18e22
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2020-02-28-12-59-30.bpo-39678.3idfxM.rst
@@ -0,0 +1,2 @@
+Refactor queue_manager in :class:`concurrent.futures.ProcessPoolExecutor` to
+make it easier to maintain.
\ No newline at end of file