author    Thomas Moreau <thomas.moreau.2010@gmail.com>    2018-01-05 10:15:54 (GMT)
committer Antoine Pitrou <pitrou@free.fr>                 2018-01-05 10:15:54 (GMT)
commit    94459fd7dc25ce19096f2080eb7339497d319eb0 (patch)
tree      7623769fafc2025884ac9a8b1a41e2f0ba5f13db
parent    65f2a6dcc2bc28a8566b74c8e9273f982331ec48 (diff)
bpo-31699 Deadlocks in `concurrent.futures.ProcessPoolExecutor` with pickling error (#3895)
Fix deadlocks in :class:`concurrent.futures.ProcessPoolExecutor` when task arguments or results cause pickling or unpickling errors. This should make sure that calls to the :class:`ProcessPoolExecutor` API always eventually return.
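As a minimal illustration of the failure mode this patch addresses (a hypothetical reproducer, not part of the patch; the Unpicklable class is invented for the example): before this change, submitting an argument that cannot be pickled could leave the executor blocked forever, whereas afterwards the future completes with the serialization error.

    # Hypothetical reproducer for the bpo-31699 class of deadlocks.
    # Before this patch the result() call below could hang; with it,
    # the future fails with the pickling error instead.
    import concurrent.futures

    class Unpicklable:
        """Object whose serialization always fails (illustrative only)."""
        def __reduce__(self):
            raise TypeError("cannot pickle this object")

    if __name__ == "__main__":
        with concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor:
            future = executor.submit(id, Unpicklable())
            try:
                future.result(timeout=30)
            except Exception as exc:
                print("future failed instead of deadlocking:", exc)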
-rw-r--r--  Lib/concurrent/futures/process.py                                  208
-rw-r--r--  Lib/multiprocessing/queues.py                                       21
-rw-r--r--  Lib/test/_test_multiprocessing.py                                   37
-rw-r--r--  Lib/test/test_concurrent_futures.py                                172
-rw-r--r--  Misc/NEWS.d/next/Library/2017-10-05-11-06-32.bpo-31699.MF47Y6.rst    4
5 files changed, 386 insertions(+), 56 deletions(-)
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
index 35af65d..aaa5151 100644
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -8,10 +8,10 @@ The following diagram and text describe the data-flow through the system:
|======================= In-process =====================|== Out-of-process ==|
+----------+ +----------+ +--------+ +-----------+ +---------+
-| | => | Work Ids | => | | => | Call Q | => | |
-| | +----------+ | | +-----------+ | |
-| | | ... | | | | ... | | |
-| | | 6 | | | | 5, call() | | |
+| | => | Work Ids | | | | Call Q | | Process |
+| | +----------+ | | +-----------+ | Pool |
+| | | ... | | | | ... | +---------+
+| | | 6 | => | | => | 5, call() | => | |
| | | 7 | | | | ... | | |
| Process | | ... | | Local | +-----------+ | Process |
| Pool | +----------+ | Worker | | #1..n |
@@ -52,6 +52,7 @@ import queue
from queue import Full
import multiprocessing as mp
from multiprocessing.connection import wait
+from multiprocessing.queues import Queue
import threading
import weakref
from functools import partial
@@ -72,16 +73,31 @@ import traceback
# workers to exit when their work queues are empty and then waits until the
# threads/processes finish.
-_threads_queues = weakref.WeakKeyDictionary()
+_threads_wakeups = weakref.WeakKeyDictionary()
_global_shutdown = False
+
+class _ThreadWakeup:
+ __slots__ = ["_reader", "_writer"]
+
+ def __init__(self):
+ self._reader, self._writer = mp.Pipe(duplex=False)
+
+ def wakeup(self):
+ self._writer.send_bytes(b"")
+
+ def clear(self):
+ while self._reader.poll():
+ self._reader.recv_bytes()
+
+
def _python_exit():
global _global_shutdown
_global_shutdown = True
- items = list(_threads_queues.items())
- for t, q in items:
- q.put(None)
- for t, q in items:
+ items = list(_threads_wakeups.items())
+ for _, thread_wakeup in items:
+ thread_wakeup.wakeup()
+ for t, _ in items:
t.join()
# Controls how many more calls than processes will be queued in the call queue.
@@ -90,6 +106,7 @@ def _python_exit():
# (Futures in the call queue cannot be cancelled).
EXTRA_QUEUED_CALLS = 1
+
# Hack to embed stringification of remote traceback in local traceback
class _RemoteTraceback(Exception):
@@ -132,6 +149,25 @@ class _CallItem(object):
self.kwargs = kwargs
+class _SafeQueue(Queue):
+ """Safe Queue set exception to the future object linked to a job"""
+ def __init__(self, max_size=0, *, ctx, pending_work_items):
+ self.pending_work_items = pending_work_items
+ super().__init__(max_size, ctx=ctx)
+
+ def _on_queue_feeder_error(self, e, obj):
+ if isinstance(obj, _CallItem):
+ tb = traceback.format_exception(type(e), e, e.__traceback__)
+ e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb)))
+ work_item = self.pending_work_items.pop(obj.work_id, None)
+ # work_item can be None if another process terminated. In this case,
+ # the queue_manager_thread fails all work_items with BrokenProcessPool
+ if work_item is not None:
+ work_item.future.set_exception(e)
+ else:
+ super()._on_queue_feeder_error(e, obj)
+
+
def _get_chunks(*iterables, chunksize):
""" Iterates over zip()ed iterables in chunks. """
it = zip(*iterables)
@@ -152,6 +188,17 @@ def _process_chunk(fn, chunk):
"""
return [fn(*args) for args in chunk]
+
+def _sendback_result(result_queue, work_id, result=None, exception=None):
+ """Safely send back the given result or exception"""
+ try:
+ result_queue.put(_ResultItem(work_id, result=result,
+ exception=exception))
+ except BaseException as e:
+ exc = _ExceptionWithTraceback(e, e.__traceback__)
+ result_queue.put(_ResultItem(work_id, exception=exc))
+
+
def _process_worker(call_queue, result_queue, initializer, initargs):
"""Evaluates calls from call_queue and places the results in result_queue.
@@ -183,10 +230,9 @@ def _process_worker(call_queue, result_queue, initializer, initargs):
r = call_item.fn(*call_item.args, **call_item.kwargs)
except BaseException as e:
exc = _ExceptionWithTraceback(e, e.__traceback__)
- result_queue.put(_ResultItem(call_item.work_id, exception=exc))
+ _sendback_result(result_queue, call_item.work_id, exception=exc)
else:
- result_queue.put(_ResultItem(call_item.work_id,
- result=r))
+ _sendback_result(result_queue, call_item.work_id, result=r)
# Liberate the resource as soon as possible, to avoid holding onto
# open files or shared memory that is not needed anymore
@@ -230,12 +276,14 @@ def _add_call_item_to_queue(pending_work_items,
del pending_work_items[work_id]
continue
+
def _queue_management_worker(executor_reference,
processes,
pending_work_items,
work_ids_queue,
call_queue,
- result_queue):
+ result_queue,
+ thread_wakeup):
"""Manages the communication between this process and the worker processes.
This function is run in a local thread.
@@ -253,6 +301,9 @@ def _queue_management_worker(executor_reference,
derived from _WorkItems for processing by the process workers.
result_queue: A ctx.SimpleQueue of _ResultItems generated by the
process workers.
+ thread_wakeup: A _ThreadWakeup to allow waking up the
+ queue_manager_thread from the main thread, avoiding deadlocks
+ caused by permanently locked queues.
"""
executor = None
@@ -261,10 +312,21 @@ def _queue_management_worker(executor_reference,
or executor._shutdown_thread)
def shutdown_worker():
- # This is an upper bound
- nb_children_alive = sum(p.is_alive() for p in processes.values())
- for i in range(0, nb_children_alive):
- call_queue.put_nowait(None)
+ # This is an upper bound on the number of children alive.
+ n_children_alive = sum(p.is_alive() for p in processes.values())
+ n_children_to_stop = n_children_alive
+ n_sentinels_sent = 0
+ # Send the right number of sentinels, to make sure all children are
+ # properly terminated.
+ while n_sentinels_sent < n_children_to_stop and n_children_alive > 0:
+ for i in range(n_children_to_stop - n_sentinels_sent):
+ try:
+ call_queue.put_nowait(None)
+ n_sentinels_sent += 1
+ except Full:
+ break
+ n_children_alive = sum(p.is_alive() for p in processes.values())
+
# Release the queue's resources as soon as possible.
call_queue.close()
# If .join() is not called on the created processes then
@@ -272,19 +334,37 @@ def _queue_management_worker(executor_reference,
for p in processes.values():
p.join()
- reader = result_queue._reader
+ result_reader = result_queue._reader
+ wakeup_reader = thread_wakeup._reader
+ readers = [result_reader, wakeup_reader]
while True:
_add_call_item_to_queue(pending_work_items,
work_ids_queue,
call_queue)
- sentinels = [p.sentinel for p in processes.values()]
- assert sentinels
- ready = wait([reader] + sentinels)
- if reader in ready:
- result_item = reader.recv()
- else:
+ # Wait for a result to be ready in the result_queue while checking
+ # that all worker processes are still running, or for a wake-up
+ # signal to be sent. The wake-up signals come either from new tasks
+ # being submitted, from the executor being shutdown/gc-ed, or from
+ # the shutdown of the Python interpreter.
+ worker_sentinels = [p.sentinel for p in processes.values()]
+ ready = wait(readers + worker_sentinels)
+
+ cause = None
+ is_broken = True
+ if result_reader in ready:
+ try:
+ result_item = result_reader.recv()
+ is_broken = False
+ except BaseException as e:
+ cause = traceback.format_exception(type(e), e, e.__traceback__)
+
+ elif wakeup_reader in ready:
+ is_broken = False
+ result_item = None
+ thread_wakeup.clear()
+ if is_broken:
# Mark the process pool broken so that submits fail right now.
executor = executor_reference()
if executor is not None:
@@ -293,14 +373,15 @@ def _queue_management_worker(executor_reference,
'usable anymore')
executor._shutdown_thread = True
executor = None
+ bpe = BrokenProcessPool("A process in the process pool was "
+ "terminated abruptly while the future was "
+ "running or pending.")
+ if cause is not None:
+ bpe.__cause__ = _RemoteTraceback(
+ f"\n'''\n{''.join(cause)}'''")
# All futures in flight must be marked failed
for work_id, work_item in pending_work_items.items():
- work_item.future.set_exception(
- BrokenProcessPool(
- "A process in the process pool was "
- "terminated abruptly while the future was "
- "running or pending."
- ))
+ work_item.future.set_exception(bpe)
# Delete references to object. See issue16284
del work_item
pending_work_items.clear()
@@ -329,6 +410,9 @@ def _queue_management_worker(executor_reference,
work_item.future.set_result(result_item.result)
# Delete references to object. See issue16284
del work_item
+ # Delete reference to result_item
+ del result_item
+
# Check whether we should start shutting down.
executor = executor_reference()
# No more work items can be added if:
@@ -348,8 +432,11 @@ def _queue_management_worker(executor_reference,
pass
executor = None
+
_system_limits_checked = False
_system_limited = None
+
+
def _check_system_limits():
global _system_limits_checked, _system_limited
if _system_limits_checked:
@@ -369,7 +456,8 @@ def _check_system_limits():
# minimum number of semaphores available
# according to POSIX
return
- _system_limited = "system provides too few semaphores (%d available, 256 necessary)" % nsems_max
+ _system_limited = ("system provides too few semaphores (%d"
+ " available, 256 necessary)" % nsems_max)
raise NotImplementedError(_system_limited)
@@ -415,6 +503,7 @@ class ProcessPoolExecutor(_base.Executor):
raise ValueError("max_workers must be greater than 0")
self._max_workers = max_workers
+
if mp_context is None:
mp_context = mp.get_context()
self._mp_context = mp_context
@@ -424,34 +513,52 @@ class ProcessPoolExecutor(_base.Executor):
self._initializer = initializer
self._initargs = initargs
+ # Management thread
+ self._queue_management_thread = None
+
+ # Map of pids to processes
+ self._processes = {}
+
+ # Shutdown is a two-step process.
+ self._shutdown_thread = False
+ self._shutdown_lock = threading.Lock()
+ self._broken = False
+ self._queue_count = 0
+ self._pending_work_items = {}
+
+ # Create communication channels for the executor
# Make the call queue slightly larger than the number of processes to
# prevent the worker processes from idling. But don't make it too big
# because futures in the call queue cannot be cancelled.
queue_size = self._max_workers + EXTRA_QUEUED_CALLS
- self._call_queue = mp_context.Queue(queue_size)
+ self._call_queue = _SafeQueue(
+ max_size=queue_size, ctx=self._mp_context,
+ pending_work_items=self._pending_work_items)
# Killed worker processes can produce spurious "broken pipe"
# tracebacks in the queue's own worker thread. But we detect killed
# processes anyway, so silence the tracebacks.
self._call_queue._ignore_epipe = True
self._result_queue = mp_context.SimpleQueue()
self._work_ids = queue.Queue()
- self._queue_management_thread = None
- # Map of pids to processes
- self._processes = {}
- # Shutdown is a two-step process.
- self._shutdown_thread = False
- self._shutdown_lock = threading.Lock()
- self._broken = False
- self._queue_count = 0
- self._pending_work_items = {}
+ # _ThreadWakeup is a communication channel used to interrupt the wait
+ # of the main loop of queue_manager_thread from another thread (e.g.
+ # when calling executor.submit or executor.shutdown). We do not use the
+ # _result_queue to send the wakeup signal to the queue_manager_thread
+ # as it could result in a deadlock if a worker process dies with the
+ # _result_queue write lock still acquired.
+ self._queue_management_thread_wakeup = _ThreadWakeup()
def _start_queue_management_thread(self):
- # When the executor gets lost, the weakref callback will wake up
- # the queue management thread.
- def weakref_cb(_, q=self._result_queue):
- q.put(None)
if self._queue_management_thread is None:
+ # When the executor gets garbage collected, the weakref callback
+ # will wake up the queue management thread so that it can terminate
+ # if there is no pending work item.
+ def weakref_cb(_,
+ thread_wakeup=self._queue_management_thread_wakeup):
+ mp.util.debug('Executor collected: triggering callback for'
+ ' QueueManager wakeup')
+ thread_wakeup.wakeup()
# Start the processes so that their sentinels are known.
self._adjust_process_count()
self._queue_management_thread = threading.Thread(
@@ -461,10 +568,13 @@ class ProcessPoolExecutor(_base.Executor):
self._pending_work_items,
self._work_ids,
self._call_queue,
- self._result_queue))
+ self._result_queue,
+ self._queue_management_thread_wakeup),
+ name="QueueManagerThread")
self._queue_management_thread.daemon = True
self._queue_management_thread.start()
- _threads_queues[self._queue_management_thread] = self._result_queue
+ _threads_wakeups[self._queue_management_thread] = \
+ self._queue_management_thread_wakeup
def _adjust_process_count(self):
for _ in range(len(self._processes), self._max_workers):
@@ -491,7 +601,7 @@ class ProcessPoolExecutor(_base.Executor):
self._work_ids.put(self._queue_count)
self._queue_count += 1
# Wake up queue management thread
- self._result_queue.put(None)
+ self._queue_management_thread_wakeup.wakeup()
self._start_queue_management_thread()
return f
@@ -531,7 +641,7 @@ class ProcessPoolExecutor(_base.Executor):
self._shutdown_thread = True
if self._queue_management_thread:
# Wake up queue management thread
- self._result_queue.put(None)
+ self._queue_management_thread_wakeup.wakeup()
if wait:
self._queue_management_thread.join()
# To reduce the risk of opening too many files, remove references to
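For readers unfamiliar with the _ThreadWakeup mechanism the diff above introduces, here is a standalone sketch of the underlying pattern (names are illustrative, not the patch's own API): the management thread blocks in multiprocessing.connection.wait on both the result pipe and a dedicated wakeup pipe, so any other thread can interrupt the wait by writing a byte, without ever touching the result queue's lock.

    # Sketch of the pipe-based wakeup pattern (assumed names): wait()
    # multiplexes over several Connection objects, and a write on the
    # wakeup pipe interrupts the blocking wait.
    import multiprocessing as mp
    import threading
    import time
    from multiprocessing.connection import wait

    def manager_loop(result_reader, wakeup_reader):
        # Block until a result arrives or another thread wakes us up.
        ready = wait([result_reader, wakeup_reader])
        if wakeup_reader in ready:
            wakeup_reader.recv_bytes()  # drain, like _ThreadWakeup.clear()
            print("woken up without a result")
        if result_reader in ready:
            print("got result:", result_reader.recv())

    if __name__ == "__main__":
        result_reader, _result_writer = mp.Pipe(duplex=False)
        wakeup_reader, wakeup_writer = mp.Pipe(duplex=False)
        t = threading.Thread(target=manager_loop,
                             args=(result_reader, wakeup_reader))
        t.start()
        time.sleep(0.1)
        wakeup_writer.send_bytes(b"")  # what _ThreadWakeup.wakeup() does
        t.join()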
diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py
index 328efbd..d66d37a 100644
--- a/Lib/multiprocessing/queues.py
+++ b/Lib/multiprocessing/queues.py
@@ -160,9 +160,10 @@ class Queue(object):
self._thread = threading.Thread(
target=Queue._feed,
args=(self._buffer, self._notempty, self._send_bytes,
- self._wlock, self._writer.close, self._ignore_epipe),
+ self._wlock, self._writer.close, self._ignore_epipe,
+ self._on_queue_feeder_error),
name='QueueFeederThread'
- )
+ )
self._thread.daemon = True
debug('doing self._thread.start()')
@@ -201,7 +202,8 @@ class Queue(object):
notempty.notify()
@staticmethod
- def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe):
+ def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe,
+ onerror):
debug('starting thread to feed data to pipe')
nacquire = notempty.acquire
nrelease = notempty.release
@@ -253,8 +255,17 @@ class Queue(object):
info('error in queue thread: %s', e)
return
else:
- import traceback
- traceback.print_exc()
+ onerror(e, obj)
+
+ @staticmethod
+ def _on_queue_feeder_error(e, obj):
+ """
+ Private API hook called when feeding data in the background thread
+ raises an exception. For overriding by concurrent.futures.
+ """
+ import traceback
+ traceback.print_exc()
+
_sentinel = object()
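The _on_queue_feeder_error hook added above is what makes _SafeQueue in process.py possible. As a sketch of how a subclass can use it (the RecordingQueue name and behavior are invented for illustration; the hook is private API):

    # Sketch: a Queue subclass that records feeder-thread serialization
    # failures instead of only printing a traceback. Illustrative only.
    import multiprocessing
    import multiprocessing.queues

    class RecordingQueue(multiprocessing.queues.Queue):
        def __init__(self, maxsize=0):
            super().__init__(maxsize, ctx=multiprocessing.get_context())
            self.errors = []  # only visible in the process owning the queue

        def _on_queue_feeder_error(self, e, obj):
            # Called in the feeder thread when pickling `obj` raised `e`.
            self.errors.append((type(e).__name__, repr(obj)))

This mirrors what _SafeQueue does, except that _SafeQueue looks up the pending work item for the failed _CallItem and sets the exception on its future.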
diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py
index 7575c5d..05166b9 100644
--- a/Lib/test/_test_multiprocessing.py
+++ b/Lib/test/_test_multiprocessing.py
@@ -1029,6 +1029,43 @@ class _TestQueue(BaseTestCase):
self.assertTrue(q.get(timeout=1.0))
close_queue(q)
+ def test_queue_feeder_on_queue_feeder_error(self):
+ # bpo-30006: verify feeder handles exceptions using the
+ # _on_queue_feeder_error hook.
+ if self.TYPE != 'processes':
+ self.skipTest('test not appropriate for {}'.format(self.TYPE))
+
+ class NotSerializable(object):
+ """Mock unserializable object"""
+ def __init__(self):
+ self.reduce_was_called = False
+ self.on_queue_feeder_error_was_called = False
+
+ def __reduce__(self):
+ self.reduce_was_called = True
+ raise AttributeError
+
+ class SafeQueue(multiprocessing.queues.Queue):
+ """Queue with overloaded _on_queue_feeder_error hook"""
+ @staticmethod
+ def _on_queue_feeder_error(e, obj):
+ if (isinstance(e, AttributeError) and
+ isinstance(obj, NotSerializable)):
+ obj.on_queue_feeder_error_was_called = True
+
+ not_serializable_obj = NotSerializable()
+ # The captured_stderr reduces the noise in the test report
+ with test.support.captured_stderr():
+ q = SafeQueue(ctx=multiprocessing.get_context())
+ q.put(not_serializable_obj)
+
+ # Verify that q is still functioning correctly
+ q.put(True)
+ self.assertTrue(q.get(timeout=1.0))
+
+ # Assert that the serialization and the hook have been called correctly
+ self.assertTrue(not_serializable_obj.reduce_was_called)
+ self.assertTrue(not_serializable_obj.on_queue_feeder_error_was_called)
#
#
#
diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py
index 7687899..675cd7a 100644
--- a/Lib/test/test_concurrent_futures.py
+++ b/Lib/test/test_concurrent_futures.py
@@ -18,6 +18,7 @@ import threading
import time
import unittest
import weakref
+from pickle import PicklingError
from concurrent import futures
from concurrent.futures._base import (
@@ -394,16 +395,17 @@ class ProcessPoolShutdownTest(ExecutorShutdownTest):
queue_management_thread = executor._queue_management_thread
processes = executor._processes
call_queue = executor._call_queue
+ queue_management_thread = executor._queue_management_thread
del executor
+ # Make sure that all the executor resources were properly cleaned up
+ # by the shutdown process
queue_management_thread.join()
for p in processes.values():
p.join()
- call_queue.close()
call_queue.join_thread()
-
create_executor_tests(ProcessPoolShutdownTest,
executor_mixins=(ProcessPoolForkMixin,
ProcessPoolForkserverMixin,
@@ -784,6 +786,172 @@ create_executor_tests(ProcessPoolExecutorTest,
ProcessPoolForkserverMixin,
ProcessPoolSpawnMixin))
+def hide_process_stderr():
+ import io
+ sys.stderr = io.StringIO()
+
+
+def _crash(delay=None):
+ """Induces a segfault."""
+ if delay:
+ time.sleep(delay)
+ import faulthandler
+ faulthandler.disable()
+ faulthandler._sigsegv()
+
+
+def _exit():
+ """Induces a sys exit with exitcode 1."""
+ sys.exit(1)
+
+
+def _raise_error(Err):
+ """Function that raises an Exception in process."""
+ hide_process_stderr()
+ raise Err()
+
+
+def _return_instance(cls):
+ """Function that returns a instance of cls."""
+ hide_process_stderr()
+ return cls()
+
+
+class CrashAtPickle(object):
+ """Bad object that triggers a segfault at pickling time."""
+ def __reduce__(self):
+ _crash()
+
+
+class CrashAtUnpickle(object):
+ """Bad object that triggers a segfault at unpickling time."""
+ def __reduce__(self):
+ return _crash, ()
+
+
+class ExitAtPickle(object):
+ """Bad object that triggers a process exit at pickling time."""
+ def __reduce__(self):
+ _exit()
+
+
+class ExitAtUnpickle(object):
+ """Bad object that triggers a process exit at unpickling time."""
+ def __reduce__(self):
+ return _exit, ()
+
+
+class ErrorAtPickle(object):
+ """Bad object that triggers an error at pickling time."""
+ def __reduce__(self):
+ from pickle import PicklingError
+ raise PicklingError("Error in pickle")
+
+
+class ErrorAtUnpickle(object):
+ """Bad object that triggers an error at unpickling time."""
+ def __reduce__(self):
+ from pickle import UnpicklingError
+ return _raise_error, (UnpicklingError, )
+
+
+class ExecutorDeadlockTest:
+ TIMEOUT = 15
+
+ @classmethod
+ def _sleep_id(cls, x, delay):
+ time.sleep(delay)
+ return x
+
+ def _fail_on_deadlock(self, executor):
+ # If we did not recover before TIMEOUT seconds, consider that the
+ # executor is in a deadlock state and forcefully clean up all its
+ # components.
+ import faulthandler
+ from tempfile import TemporaryFile
+ with TemporaryFile(mode="w+") as f:
+ faulthandler.dump_traceback(file=f)
+ f.seek(0)
+ tb = f.read()
+ for p in executor._processes.values():
+ p.terminate()
+ # It should be safe to call executor.shutdown here, as all possible
+ # deadlocks should have been broken.
+ executor.shutdown(wait=True)
+ print(f"\nTraceback:\n {tb}", file=sys.__stderr__)
+ self.fail(f"Executor deadlock:\n\n{tb}")
+
+
+ def test_crash(self):
+ # Extensive testing of deadlocks caused by crashes in a pool.
+ self.executor.shutdown(wait=True)
+ crash_cases = [
+ # Check problem occurring while pickling a task in
+ # the task_handler thread
+ (id, (ErrorAtPickle(),), PicklingError, "error at task pickle"),
+ # Check problem occurring while unpickling a task on workers
+ (id, (ExitAtUnpickle(),), BrokenProcessPool,
+ "exit at task unpickle"),
+ (id, (ErrorAtUnpickle(),), BrokenProcessPool,
+ "error at task unpickle"),
+ (id, (CrashAtUnpickle(),), BrokenProcessPool,
+ "crash at task unpickle"),
+ # Check problem occurring during func execution on workers
+ (_crash, (), BrokenProcessPool,
+ "crash during func execution on worker"),
+ (_exit, (), SystemExit,
+ "exit during func execution on worker"),
+ (_raise_error, (RuntimeError, ), RuntimeError,
+ "error during func execution on worker"),
+ # Check problem occurring while pickling a task result
+ # on workers
+ (_return_instance, (CrashAtPickle,), BrokenProcessPool,
+ "crash during result pickle on worker"),
+ (_return_instance, (ExitAtPickle,), SystemExit,
+ "exit during result pickle on worker"),
+ (_return_instance, (ErrorAtPickle,), PicklingError,
+ "error during result pickle on worker"),
+ # Check problem occurring while unpickling a task in
+ # the result_handler thread
+ (_return_instance, (ErrorAtUnpickle,), BrokenProcessPool,
+ "error during result unpickle in result_handler"),
+ (_return_instance, (ExitAtUnpickle,), BrokenProcessPool,
+ "exit during result unpickle in result_handler")
+ ]
+ for func, args, error, name in crash_cases:
+ with self.subTest(name):
+ # The captured_stderr reduces the noise in the test report
+ with test.support.captured_stderr():
+ executor = self.executor_type(
+ max_workers=2, mp_context=get_context(self.ctx))
+ res = executor.submit(func, *args)
+ with self.assertRaises(error):
+ try:
+ res.result(timeout=self.TIMEOUT)
+ except futures.TimeoutError:
+ # If we did not recover before TIMEOUT seconds,
+ # consider that the executor is in a deadlock state
+ self._fail_on_deadlock(executor)
+ executor.shutdown(wait=True)
+
+ def test_shutdown_deadlock(self):
+ # Test that calling shutdown on the pool does not cause a deadlock
+ # if a worker fails after the shutdown call.
+ self.executor.shutdown(wait=True)
+ with self.executor_type(max_workers=2,
+ mp_context=get_context(self.ctx)) as executor:
+ self.executor = executor # Allow cleanup in _fail_on_deadlock
+ f = executor.submit(_crash, delay=.1)
+ executor.shutdown(wait=True)
+ with self.assertRaises(BrokenProcessPool):
+ f.result()
+
+
+create_executor_tests(ExecutorDeadlockTest,
+ executor_mixins=(ProcessPoolForkMixin,
+ ProcessPoolForkserverMixin,
+ ProcessPoolSpawnMixin))
+
class FutureTests(BaseTestCase):
def test_done_callback_with_result(self):
diff --git a/Misc/NEWS.d/next/Library/2017-10-05-11-06-32.bpo-31699.MF47Y6.rst b/Misc/NEWS.d/next/Library/2017-10-05-11-06-32.bpo-31699.MF47Y6.rst
new file mode 100644
index 0000000..49cbbb3
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2017-10-05-11-06-32.bpo-31699.MF47Y6.rst
@@ -0,0 +1,4 @@
+Fix deadlocks in :class:`concurrent.futures.ProcessPoolExecutor` when
+task arguments or results cause pickling or unpickling errors.
+This should make sure that calls to the :class:`ProcessPoolExecutor` API
+always eventually return.