summaryrefslogtreecommitdiffstats
path: root/Lib/multiprocessing/queues.py
diff options
context:
space:
mode:
authorThomas Moreau <thomas.moreau.2010@gmail.com>2018-01-05 10:15:54 (GMT)
committerAntoine Pitrou <pitrou@free.fr>2018-01-05 10:15:54 (GMT)
commit94459fd7dc25ce19096f2080eb7339497d319eb0 (patch)
tree7623769fafc2025884ac9a8b1a41e2f0ba5f13db /Lib/multiprocessing/queues.py
parent65f2a6dcc2bc28a8566b74c8e9273f982331ec48 (diff)
downloadcpython-94459fd7dc25ce19096f2080eb7339497d319eb0.zip
cpython-94459fd7dc25ce19096f2080eb7339497d319eb0.tar.gz
cpython-94459fd7dc25ce19096f2080eb7339497d319eb0.tar.bz2
bpo-31699 Deadlocks in `concurrent.futures.ProcessPoolExecutor` with pickling error (#3895)
Fix deadlocks in :class:`concurrent.futures.ProcessPoolExecutor` when task arguments or results cause pickling or unpickling errors. This should make sure that calls to the :class:`ProcessPoolExecutor` API always eventually return.
Diffstat (limited to 'Lib/multiprocessing/queues.py')
-rw-r--r--Lib/multiprocessing/queues.py21
1 files changed, 16 insertions, 5 deletions
diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py
index 328efbd..d66d37a 100644
--- a/Lib/multiprocessing/queues.py
+++ b/Lib/multiprocessing/queues.py
@@ -160,9 +160,10 @@ class Queue(object):
self._thread = threading.Thread(
target=Queue._feed,
args=(self._buffer, self._notempty, self._send_bytes,
- self._wlock, self._writer.close, self._ignore_epipe),
+ self._wlock, self._writer.close, self._ignore_epipe,
+ self._on_queue_feeder_error),
name='QueueFeederThread'
- )
+ )
self._thread.daemon = True
debug('doing self._thread.start()')
@@ -201,7 +202,8 @@ class Queue(object):
notempty.notify()
@staticmethod
- def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe):
+ def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe,
+ onerror):
debug('starting thread to feed data to pipe')
nacquire = notempty.acquire
nrelease = notempty.release
@@ -253,8 +255,17 @@ class Queue(object):
info('error in queue thread: %s', e)
return
else:
- import traceback
- traceback.print_exc()
+ onerror(e, obj)
+
+ @staticmethod
+ def _on_queue_feeder_error(e, obj):
+ """
+ Private API hook called when feeding data in the background thread
+ raises an exception. For overriding by concurrent.futures.
+ """
+ import traceback
+ traceback.print_exc()
+
_sentinel = object()