diff options
author | Antoine Pitrou <solipsis@pitrou.net> | 2011-07-02 19:20:25 (GMT) |
---|---|---|
committer | Antoine Pitrou <solipsis@pitrou.net> | 2011-07-02 19:20:25 (GMT) |
commit | 020436b0d4ae271638ed5d0881c1fa7f7c0a1b09 (patch) | |
tree | ad620d4691d253b267ff7c3d03c7bc3811bceca9 /Lib/concurrent/futures | |
parent | aac0f75b3b28513c45116534720e66b0262e2e72 (diff) | |
download | cpython-020436b0d4ae271638ed5d0881c1fa7f7c0a1b09.zip cpython-020436b0d4ae271638ed5d0881c1fa7f7c0a1b09.tar.gz cpython-020436b0d4ae271638ed5d0881c1fa7f7c0a1b09.tar.bz2 |
Issue #12456: fix a possible hang on shutdown of a concurrent.futures.ProcessPoolExecutor.
Diffstat (limited to 'Lib/concurrent/futures')
-rw-r--r-- | Lib/concurrent/futures/process.py | 30 |
1 files changed, 21 insertions, 9 deletions
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index c2331e7..689f9ba 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -50,7 +50,7 @@ import os from concurrent.futures import _base import queue import multiprocessing -from multiprocessing.queues import SimpleQueue, SentinelReady +from multiprocessing.queues import SimpleQueue, SentinelReady, Full import threading import weakref @@ -195,6 +195,10 @@ def _queue_management_worker(executor_reference, result_queue: A multiprocessing.Queue of _ResultItems generated by the process workers. """ + executor = None + + def shutting_down(): + return _shutdown or executor is None or executor._shutdown_thread def shutdown_worker(): # This is an upper bound @@ -202,8 +206,7 @@ def _queue_management_worker(executor_reference, for i in range(0, nb_children_alive): call_queue.put(None) # If .join() is not called on the created processes then - # some multiprocessing.Queue methods may deadlock on Mac OS - # X. + # some multiprocessing.Queue methods may deadlock on Mac OS X. for p in processes.values(): p.join() @@ -222,7 +225,7 @@ def _queue_management_worker(executor_reference, if executor is not None: executor._broken = True executor._shutdown_thread = True - del executor + executor = None # All futures in flight must be marked failed for work_id, work_item in pending_work_items.items(): work_item.future.set_exception( @@ -242,7 +245,11 @@ def _queue_management_worker(executor_reference, if isinstance(result_item, int): # Clean shutdown of a worker using its PID # (avoids marking the executor broken) + assert shutting_down() del processes[result_item] + if not processes: + shutdown_worker() + return elif result_item is not None: work_item = pending_work_items.pop(result_item.work_id, None) # work_item can be None if another process terminated (see above) @@ -257,16 +264,21 @@ def _queue_management_worker(executor_reference, # - The interpreter is shutting down OR # - The executor that owns this worker has been collected OR # - The executor that owns this worker has been shutdown. - if _shutdown or executor is None or executor._shutdown_thread: + if shutting_down(): # Since no new work items can be added, it is safe to shutdown # this thread if there are no pending work items. - if not pending_work_items: + if not pending_work_items and call_queue.qsize() == 0: shutdown_worker() return - else: + try: # Start shutting down by telling a process it can exit. - call_queue.put(None) - del executor + call_queue.put_nowait(None) + except Full: + # This is not a problem: we will eventually be woken up (in + # result_queue.get()) and be able to send a sentinel again, + # if necessary. + pass + executor = None _system_limits_checked = False _system_limited = None |