diff options
Diffstat (limited to 'Lib/concurrent/futures/process.py')
-rw-r--r-- | Lib/concurrent/futures/process.py | 11 |
1 files changed, 7 insertions, 4 deletions
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 7c22a62..9b2e0f3 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -205,12 +205,12 @@ def _queue_management_worker(executor_reference, nb_children_alive = sum(p.is_alive() for p in processes.values()) for i in range(0, nb_children_alive): call_queue.put_nowait(None) + # Release the queue's resources as soon as possible. + call_queue.close() # If .join() is not called on the created processes then # some multiprocessing.Queue methods may deadlock on Mac OS X. for p in processes.values(): p.join() - # Release resources held by the queue - call_queue.close() while True: _add_call_item_to_queue(pending_work_items, @@ -241,8 +241,7 @@ def _queue_management_worker(executor_reference, # locks may be in a dirty state and block forever. for p in processes.values(): p.terminate() - for p in processes.values(): - p.join() + shutdown_worker() return if isinstance(result_item, int): # Clean shutdown of a worker using its PID @@ -337,6 +336,10 @@ class ProcessPoolExecutor(_base.Executor): # because futures in the call queue cannot be cancelled. self._call_queue = multiprocessing.Queue(self._max_workers + EXTRA_QUEUED_CALLS) + # Killed worker processes can produce spurious "broken pipe" + # tracebacks in the queue's own worker thread. But we detect killed + # processes anyway, so silence the tracebacks. + self._call_queue._ignore_epipe = True self._result_queue = SimpleQueue() self._work_ids = queue.Queue() self._queue_management_thread = None |