diff options
author | Thomas Moreau <thomas.moreau.2010@gmail.com> | 2018-01-05 10:15:54 (GMT) |
---|---|---|
committer | Antoine Pitrou <pitrou@free.fr> | 2018-01-05 10:15:54 (GMT) |
commit | 94459fd7dc25ce19096f2080eb7339497d319eb0 (patch) | |
tree | 7623769fafc2025884ac9a8b1a41e2f0ba5f13db /Lib/multiprocessing | |
parent | 65f2a6dcc2bc28a8566b74c8e9273f982331ec48 (diff) | |
download | cpython-94459fd7dc25ce19096f2080eb7339497d319eb0.zip cpython-94459fd7dc25ce19096f2080eb7339497d319eb0.tar.gz cpython-94459fd7dc25ce19096f2080eb7339497d319eb0.tar.bz2 |
bpo-31699 Deadlocks in `concurrent.futures.ProcessPoolExecutor` with pickling error (#3895)
Fix deadlocks in :class:`concurrent.futures.ProcessPoolExecutor` when task arguments or results cause pickling or unpickling errors.
This should make sure that calls to the :class:`ProcessPoolExecutor` API always eventually return.
Diffstat (limited to 'Lib/multiprocessing')
-rw-r--r-- | Lib/multiprocessing/queues.py | 21 |
1 files changed, 16 insertions, 5 deletions
diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py index 328efbd..d66d37a 100644 --- a/Lib/multiprocessing/queues.py +++ b/Lib/multiprocessing/queues.py @@ -160,9 +160,10 @@ class Queue(object): self._thread = threading.Thread( target=Queue._feed, args=(self._buffer, self._notempty, self._send_bytes, - self._wlock, self._writer.close, self._ignore_epipe), + self._wlock, self._writer.close, self._ignore_epipe, + self._on_queue_feeder_error), name='QueueFeederThread' - ) + ) self._thread.daemon = True debug('doing self._thread.start()') @@ -201,7 +202,8 @@ class Queue(object): notempty.notify() @staticmethod - def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe): + def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe, + onerror): debug('starting thread to feed data to pipe') nacquire = notempty.acquire nrelease = notempty.release @@ -253,8 +255,17 @@ class Queue(object): info('error in queue thread: %s', e) return else: - import traceback - traceback.print_exc() + onerror(e, obj) + + @staticmethod + def _on_queue_feeder_error(e, obj): + """ + Private API hook called when feeding data in the background thread + raises an exception. For overriding by concurrent.futures. + """ + import traceback + traceback.print_exc() + _sentinel = object() |