summaryrefslogtreecommitdiffstats
path: root/Lib/multiprocessing/queues.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/multiprocessing/queues.py')
-rw-r--r--Lib/multiprocessing/queues.py21
1 files changed, 16 insertions, 5 deletions
diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py
index 328efbd..d66d37a 100644
--- a/Lib/multiprocessing/queues.py
+++ b/Lib/multiprocessing/queues.py
@@ -160,9 +160,10 @@ class Queue(object):
self._thread = threading.Thread(
target=Queue._feed,
args=(self._buffer, self._notempty, self._send_bytes,
- self._wlock, self._writer.close, self._ignore_epipe),
+ self._wlock, self._writer.close, self._ignore_epipe,
+ self._on_queue_feeder_error),
name='QueueFeederThread'
- )
+ )
self._thread.daemon = True
debug('doing self._thread.start()')
@@ -201,7 +202,8 @@ class Queue(object):
notempty.notify()
@staticmethod
- def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe):
+ def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe,
+ onerror):
debug('starting thread to feed data to pipe')
nacquire = notempty.acquire
nrelease = notempty.release
@@ -253,8 +255,17 @@ class Queue(object):
info('error in queue thread: %s', e)
return
else:
- import traceback
- traceback.print_exc()
+ onerror(e, obj)
+
+ @staticmethod
+ def _on_queue_feeder_error(e, obj):
+ """
+ Private API hook called when feeding data in the background thread
+ raises an exception. For overriding by concurrent.futures.
+ """
+ import traceback
+ traceback.print_exc()
+
_sentinel = object()