diff options
author | Antoine Pitrou <solipsis@pitrou.net> | 2011-07-15 23:51:58 (GMT) |
---|---|---|
committer | Antoine Pitrou <solipsis@pitrou.net> | 2011-07-15 23:51:58 (GMT) |
commit | dc19c24832fac20402f6cf4d2396c299a73766bb (patch) | |
tree | 310ee12d3a4ccaf1ceb12cda042ec2bb54271673 /Lib/concurrent/futures/process.py | |
parent | d06a065a441896477f8dc4f5543654f6ba20bb51 (diff) | |
download | cpython-dc19c24832fac20402f6cf4d2396c299a73766bb.zip cpython-dc19c24832fac20402f6cf4d2396c299a73766bb.tar.gz cpython-dc19c24832fac20402f6cf4d2396c299a73766bb.tar.bz2 |
Silence spurious "broken pipe" tracebacks when shutting down a ProcessPoolExecutor.
Diffstat (limited to 'Lib/concurrent/futures/process.py')
-rw-r--r-- | Lib/concurrent/futures/process.py | 11 |
1 files changed, 7 insertions, 4 deletions
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 7c22a62..9b2e0f3 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -205,12 +205,12 @@ def _queue_management_worker(executor_reference, nb_children_alive = sum(p.is_alive() for p in processes.values()) for i in range(0, nb_children_alive): call_queue.put_nowait(None) + # Release the queue's resources as soon as possible. + call_queue.close() # If .join() is not called on the created processes then # some multiprocessing.Queue methods may deadlock on Mac OS X. for p in processes.values(): p.join() - # Release resources held by the queue - call_queue.close() while True: _add_call_item_to_queue(pending_work_items, @@ -241,8 +241,7 @@ def _queue_management_worker(executor_reference, # locks may be in a dirty state and block forever. for p in processes.values(): p.terminate() - for p in processes.values(): - p.join() + shutdown_worker() return if isinstance(result_item, int): # Clean shutdown of a worker using its PID @@ -337,6 +336,10 @@ class ProcessPoolExecutor(_base.Executor): # because futures in the call queue cannot be cancelled. self._call_queue = multiprocessing.Queue(self._max_workers + EXTRA_QUEUED_CALLS) + # Killed worker processes can produce spurious "broken pipe" + # tracebacks in the queue's own worker thread. But we detect killed + # processes anyway, so silence the tracebacks. + self._call_queue._ignore_epipe = True self._result_queue = SimpleQueue() self._work_ids = queue.Queue() self._queue_management_thread = None |