summaryrefslogtreecommitdiffstats
path: root/Lib
diff options
context:
space:
mode:
authorAntoine Pitrou <solipsis@pitrou.net>2011-07-15 23:51:58 (GMT)
committerAntoine Pitrou <solipsis@pitrou.net>2011-07-15 23:51:58 (GMT)
commitdc19c24832fac20402f6cf4d2396c299a73766bb (patch)
tree310ee12d3a4ccaf1ceb12cda042ec2bb54271673 /Lib
parentd06a065a441896477f8dc4f5543654f6ba20bb51 (diff)
downloadcpython-dc19c24832fac20402f6cf4d2396c299a73766bb.zip
cpython-dc19c24832fac20402f6cf4d2396c299a73766bb.tar.gz
cpython-dc19c24832fac20402f6cf4d2396c299a73766bb.tar.bz2
Silence spurious "broken pipe" tracebacks when shutting down a ProcessPoolExecutor.
Diffstat (limited to 'Lib')
-rw-r--r--Lib/concurrent/futures/process.py11
-rw-r--r--Lib/multiprocessing/queues.py9
2 files changed, 14 insertions, 6 deletions
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
index 7c22a62..9b2e0f3 100644
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -205,12 +205,12 @@ def _queue_management_worker(executor_reference,
nb_children_alive = sum(p.is_alive() for p in processes.values())
for i in range(0, nb_children_alive):
call_queue.put_nowait(None)
+ # Release the queue's resources as soon as possible.
+ call_queue.close()
# If .join() is not called on the created processes then
# some multiprocessing.Queue methods may deadlock on Mac OS X.
for p in processes.values():
p.join()
- # Release resources held by the queue
- call_queue.close()
while True:
_add_call_item_to_queue(pending_work_items,
@@ -241,8 +241,7 @@ def _queue_management_worker(executor_reference,
# locks may be in a dirty state and block forever.
for p in processes.values():
p.terminate()
- for p in processes.values():
- p.join()
+ shutdown_worker()
return
if isinstance(result_item, int):
# Clean shutdown of a worker using its PID
@@ -337,6 +336,10 @@ class ProcessPoolExecutor(_base.Executor):
# because futures in the call queue cannot be cancelled.
self._call_queue = multiprocessing.Queue(self._max_workers +
EXTRA_QUEUED_CALLS)
+ # Killed worker processes can produce spurious "broken pipe"
+ # tracebacks in the queue's own worker thread. But we detect killed
+ # processes anyway, so silence the tracebacks.
+ self._call_queue._ignore_epipe = True
self._result_queue = SimpleQueue()
self._work_ids = queue.Queue()
self._queue_management_thread = None
diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py
index 3324363..4696ccc 100644
--- a/Lib/multiprocessing/queues.py
+++ b/Lib/multiprocessing/queues.py
@@ -41,6 +41,7 @@ import collections
import time
import atexit
import weakref
+import errno
from queue import Empty, Full
import _multiprocessing
@@ -67,6 +68,8 @@ class Queue(object):
else:
self._wlock = Lock()
self._sem = BoundedSemaphore(maxsize)
+ # For use by concurrent.futures
+ self._ignore_epipe = False
self._after_fork()
@@ -178,7 +181,7 @@ class Queue(object):
self._thread = threading.Thread(
target=Queue._feed,
args=(self._buffer, self._notempty, self._send,
- self._wlock, self._writer.close),
+ self._wlock, self._writer.close, self._ignore_epipe),
name='QueueFeederThread'
)
self._thread.daemon = True
@@ -229,7 +232,7 @@ class Queue(object):
notempty.release()
@staticmethod
- def _feed(buffer, notempty, send, writelock, close):
+ def _feed(buffer, notempty, send, writelock, close, ignore_epipe):
debug('starting thread to feed data to pipe')
from .util import is_exiting
@@ -271,6 +274,8 @@ class Queue(object):
except IndexError:
pass
except Exception as e:
+ if ignore_epipe and getattr(e, 'errno', 0) == errno.EPIPE:
+ return
# Since this runs in a daemon thread the resources it uses
# may be become unusable while the process is cleaning up.
# We ignore errors which happen after the process has