diff options
-rw-r--r-- | Lib/concurrent/futures/process.py | 11 | ||||
-rw-r--r-- | Lib/multiprocessing/queues.py | 9 | ||||
-rw-r--r-- | Misc/NEWS | 3 |
3 files changed, 17 insertions, 6 deletions
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 7c22a62..9b2e0f3 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -205,12 +205,12 @@ def _queue_management_worker(executor_reference, nb_children_alive = sum(p.is_alive() for p in processes.values()) for i in range(0, nb_children_alive): call_queue.put_nowait(None) + # Release the queue's resources as soon as possible. + call_queue.close() # If .join() is not called on the created processes then # some multiprocessing.Queue methods may deadlock on Mac OS X. for p in processes.values(): p.join() - # Release resources held by the queue - call_queue.close() while True: _add_call_item_to_queue(pending_work_items, @@ -241,8 +241,7 @@ def _queue_management_worker(executor_reference, # locks may be in a dirty state and block forever. for p in processes.values(): p.terminate() - for p in processes.values(): - p.join() + shutdown_worker() return if isinstance(result_item, int): # Clean shutdown of a worker using its PID @@ -337,6 +336,10 @@ class ProcessPoolExecutor(_base.Executor): # because futures in the call queue cannot be cancelled. self._call_queue = multiprocessing.Queue(self._max_workers + EXTRA_QUEUED_CALLS) + # Killed worker processes can produce spurious "broken pipe" + # tracebacks in the queue's own worker thread. But we detect killed + # processes anyway, so silence the tracebacks. + self._call_queue._ignore_epipe = True self._result_queue = SimpleQueue() self._work_ids = queue.Queue() self._queue_management_thread = None diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py index 3324363..4696ccc 100644 --- a/Lib/multiprocessing/queues.py +++ b/Lib/multiprocessing/queues.py @@ -41,6 +41,7 @@ import collections import time import atexit import weakref +import errno from queue import Empty, Full import _multiprocessing @@ -67,6 +68,8 @@ class Queue(object): else: self._wlock = Lock() self._sem = BoundedSemaphore(maxsize) + # For use by concurrent.futures + self._ignore_epipe = False self._after_fork() @@ -178,7 +181,7 @@ class Queue(object): self._thread = threading.Thread( target=Queue._feed, args=(self._buffer, self._notempty, self._send, - self._wlock, self._writer.close), + self._wlock, self._writer.close, self._ignore_epipe), name='QueueFeederThread' ) self._thread.daemon = True @@ -229,7 +232,7 @@ class Queue(object): notempty.release() @staticmethod - def _feed(buffer, notempty, send, writelock, close): + def _feed(buffer, notempty, send, writelock, close, ignore_epipe): debug('starting thread to feed data to pipe') from .util import is_exiting @@ -271,6 +274,8 @@ class Queue(object): except IndexError: pass except Exception as e: + if ignore_epipe and getattr(e, 'errno', 0) == errno.EPIPE: + return # Since this runs in a daemon thread the resources it uses # may be become unusable while the process is cleaning up. # We ignore errors which happen after the process has @@ -228,6 +228,9 @@ Core and Builtins Library ------- +- Silence spurious "broken pipe" tracebacks when shutting down a + ProcessPoolExecutor. + - Fix potential resource leaks in concurrent.futures.ProcessPoolExecutor by joining all queues and processes when shutdown() is called. |