path: root/Lib/test/test_concurrent_futures.py
author     Thomas Moreau <thomas.moreau.2010@gmail.com>  2018-01-05 10:15:54 (GMT)
committer  Antoine Pitrou <pitrou@free.fr>  2018-01-05 10:15:54 (GMT)
commit     94459fd7dc25ce19096f2080eb7339497d319eb0 (patch)
tree       7623769fafc2025884ac9a8b1a41e2f0ba5f13db /Lib/test/test_concurrent_futures.py
parent     65f2a6dcc2bc28a8566b74c8e9273f982331ec48 (diff)
bpo-31699 Deadlocks in `concurrent.futures.ProcessPoolExecutor` with pickling error (#3895)
Fix deadlocks in :class:`concurrent.futures.ProcessPoolExecutor` when task arguments or results cause pickling or unpickling errors. This should make sure that calls to the :class:`ProcessPoolExecutor` API always eventually return.
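For context, the failure mode addressed here can be reproduced in a few lines. The sketch below is illustrative only and not part of the commit: the `Unpicklable` class is a hypothetical helper, and with this fix applied the pickling error is expected to surface through `Future.result()` instead of leaving the pool hung.

    import pickle
    from concurrent.futures import ProcessPoolExecutor

    class Unpicklable:
        def __reduce__(self):
            # Raising here makes serialization of the task argument fail.
            raise pickle.PicklingError("cannot pickle task argument")

    if __name__ == "__main__":
        with ProcessPoolExecutor(max_workers=1) as executor:
            future = executor.submit(id, Unpicklable())
            try:
                future.result(timeout=15)
            except pickle.PicklingError as exc:
                # Before the fix this call could block forever; now the
                # error propagates to the caller.
                print(f"task failed cleanly: {exc!r}")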
Diffstat (limited to 'Lib/test/test_concurrent_futures.py')
-rw-r--r--  Lib/test/test_concurrent_futures.py  172
1 file changed, 170 insertions(+), 2 deletions(-)
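The new tests inject failures through pickle's `__reduce__` hook: raising inside `__reduce__` fails at pickling time, while returning a `(callable, args)` pair defers the failure to unpickling time in the receiving process. A minimal standalone illustration of that mechanism (the names below are illustrative, not from the commit):

    import pickle

    def _boom():
        raise ValueError("fails when the object is rebuilt")

    class FailsAtPickle:
        def __reduce__(self):
            # Raising here aborts serialization itself.
            raise pickle.PicklingError("fails at pickling time")

    class FailsAtUnpickle:
        def __reduce__(self):
            # pickle records this (callable, args) pair and calls it on
            # load, so the error fires only in the deserializing process.
            return _boom, ()

    try:
        pickle.dumps(FailsAtPickle())
    except pickle.PicklingError as exc:
        print("dump failed:", exc)

    data = pickle.dumps(FailsAtUnpickle())  # serialization succeeds
    try:
        pickle.loads(data)                  # reconstruction raises
    except ValueError as exc:
        print("load failed:", exc)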
diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py
index 7687899..675cd7a 100644
--- a/Lib/test/test_concurrent_futures.py
+++ b/Lib/test/test_concurrent_futures.py
@@ -18,6 +18,7 @@ import threading
import time
import unittest
import weakref
+from pickle import PicklingError
from concurrent import futures
from concurrent.futures._base import (
@@ -394,16 +395,17 @@ class ProcessPoolShutdownTest(ExecutorShutdownTest):
queue_management_thread = executor._queue_management_thread
processes = executor._processes
call_queue = executor._call_queue
del executor
+ # Make sure that all the executor resources were properly cleaned up by
+ # the shutdown process
queue_management_thread.join()
for p in processes.values():
p.join()
- call_queue.close()
call_queue.join_thread()
-
create_executor_tests(ProcessPoolShutdownTest,
executor_mixins=(ProcessPoolForkMixin,
ProcessPoolForkserverMixin,
@@ -784,6 +786,172 @@ create_executor_tests(ProcessPoolExecutorTest,
ProcessPoolForkserverMixin,
ProcessPoolSpawnMixin))
+def hide_process_stderr():
+ import io
+ sys.stderr = io.StringIO()
+
+
+def _crash(delay=None):
+ """Induces a segfault."""
+ if delay:
+ time.sleep(delay)
+ import faulthandler
+ faulthandler.disable()
+ faulthandler._sigsegv()
+
+
+def _exit():
+ """Induces a sys exit with exitcode 1."""
+ sys.exit(1)
+
+
+def _raise_error(Err):
+ """Function that raises an Exception in process."""
+ hide_process_stderr()
+ raise Err()
+
+
+def _return_instance(cls):
+ """Function that returns a instance of cls."""
+ hide_process_stderr()
+ return cls()
+
+
+class CrashAtPickle(object):
+ """Bad object that triggers a segfault at pickling time."""
+ def __reduce__(self):
+ _crash()
+
+
+class CrashAtUnpickle(object):
+ """Bad object that triggers a segfault at unpickling time."""
+ def __reduce__(self):
+ return _crash, ()
+
+
+class ExitAtPickle(object):
+ """Bad object that triggers a process exit at pickling time."""
+ def __reduce__(self):
+ _exit()
+
+
+class ExitAtUnpickle(object):
+ """Bad object that triggers a process exit at unpickling time."""
+ def __reduce__(self):
+ return _exit, ()
+
+
+class ErrorAtPickle(object):
+ """Bad object that triggers an error at pickling time."""
+ def __reduce__(self):
+ from pickle import PicklingError
+ raise PicklingError("Error in pickle")
+
+
+class ErrorAtUnpickle(object):
+ """Bad object that triggers an error at unpickling time."""
+ def __reduce__(self):
+ from pickle import UnpicklingError
+ return _raise_error, (UnpicklingError, )
+
+
+class ExecutorDeadlockTest:
+ TIMEOUT = 15
+
+ @classmethod
+ def _sleep_id(cls, x, delay):
+ time.sleep(delay)
+ return x
+
+ def _fail_on_deadlock(self, executor):
+ # If we did not recover before TIMEOUT seconds, consider that the
+ # executor is in a deadlock state and forcefully clean up all its
+ # components.
+ import faulthandler
+ from tempfile import TemporaryFile
+ with TemporaryFile(mode="w+") as f:
+ faulthandler.dump_traceback(file=f)
+ f.seek(0)
+ tb = f.read()
+ for p in executor._processes.values():
+ p.terminate()
+ # It should be safe to call executor.shutdown() here, as all possible
+ # deadlocks have been broken.
+ executor.shutdown(wait=True)
+ print(f"\nTraceback:\n {tb}", file=sys.__stderr__)
+ self.fail(f"Executor deadlock:\n\n{tb}")
+
+
+ def test_crash(self):
+ # Extensive testing of deadlocks caused by crashes in a pool.
+ self.executor.shutdown(wait=True)
+ crash_cases = [
+ # Check problems occurring while pickling a task in
+ # the task_handler thread
+ (id, (ErrorAtPickle(),), PicklingError, "error at task pickle"),
+ # Check problems occurring while unpickling a task on workers
+ (id, (ExitAtUnpickle(),), BrokenProcessPool,
+ "exit at task unpickle"),
+ (id, (ErrorAtUnpickle(),), BrokenProcessPool,
+ "error at task unpickle"),
+ (id, (CrashAtUnpickle(),), BrokenProcessPool,
+ "crash at task unpickle"),
+ # Check problems occurring during func execution on workers
+ (_crash, (), BrokenProcessPool,
+ "crash during func execution on worker"),
+ (_exit, (), SystemExit,
+ "exit during func execution on worker"),
+ (_raise_error, (RuntimeError, ), RuntimeError,
+ "error during func execution on worker"),
+ # Check problems occurring while pickling a task result
+ # on workers
+ (_return_instance, (CrashAtPickle,), BrokenProcessPool,
+ "crash during result pickle on worker"),
+ (_return_instance, (ExitAtPickle,), SystemExit,
+ "exit during result pickle on worker"),
+ (_return_instance, (ErrorAtPickle,), PicklingError,
+ "error during result pickle on worker"),
+ # Check problems occurring while unpickling a task in
+ # the result_handler thread
+ (_return_instance, (ErrorAtUnpickle,), BrokenProcessPool,
+ "error during result unpickle in result_handler"),
+ (_return_instance, (ExitAtUnpickle,), BrokenProcessPool,
+ "exit during result unpickle in result_handler")
+ ]
+ for func, args, error, name in crash_cases:
+ with self.subTest(name):
+ # Using captured_stderr() reduces the noise in the test report
+ with test.support.captured_stderr():
+ executor = self.executor_type(
+ max_workers=2, mp_context=get_context(self.ctx))
+ res = executor.submit(func, *args)
+ with self.assertRaises(error):
+ try:
+ res.result(timeout=self.TIMEOUT)
+ except futures.TimeoutError:
+ # If we did not recover before TIMEOUT seconds,
+ # consider that the executor is in a deadlock state
+ self._fail_on_deadlock(executor)
+ executor.shutdown(wait=True)
+
+ def test_shutdown_deadlock(self):
+ # Test that calling shutdown on the pool does not deadlock
+ # if a worker fails after the shutdown call.
+ self.executor.shutdown(wait=True)
+ with self.executor_type(max_workers=2,
+ mp_context=get_context(self.ctx)) as executor:
+ self.executor = executor # Allow cleanup in _fail_on_deadlock
+ f = executor.submit(_crash, delay=.1)
+ executor.shutdown(wait=True)
+ with self.assertRaises(BrokenProcessPool):
+ f.result()
+
+
+create_executor_tests(ExecutorDeadlockTest,
+ executor_mixins=(ProcessPoolForkMixin,
+ ProcessPoolForkserverMixin,
+ ProcessPoolSpawnMixin))
+
class FutureTests(BaseTestCase):
def test_done_callback_with_result(self):