diff options
author | Thomas Moreau <thomas.moreau.2010@gmail.com> | 2018-01-05 10:15:54 (GMT) |
---|---|---|
committer | Antoine Pitrou <pitrou@free.fr> | 2018-01-05 10:15:54 (GMT) |
commit | 94459fd7dc25ce19096f2080eb7339497d319eb0 (patch) | |
tree | 7623769fafc2025884ac9a8b1a41e2f0ba5f13db /Lib/test/test_concurrent_futures.py | |
parent | 65f2a6dcc2bc28a8566b74c8e9273f982331ec48 (diff) | |
download | cpython-94459fd7dc25ce19096f2080eb7339497d319eb0.zip cpython-94459fd7dc25ce19096f2080eb7339497d319eb0.tar.gz cpython-94459fd7dc25ce19096f2080eb7339497d319eb0.tar.bz2 |
bpo-31699 Deadlocks in `concurrent.futures.ProcessPoolExecutor` with pickling error (#3895)
Fix deadlocks in :class:`concurrent.futures.ProcessPoolExecutor` when task arguments or results cause pickling or unpickling errors.
This should make sure that calls to the :class:`ProcessPoolExecutor` API always eventually return.
Diffstat (limited to 'Lib/test/test_concurrent_futures.py')
-rw-r--r-- | Lib/test/test_concurrent_futures.py | 172 |
1 files changed, 170 insertions, 2 deletions
diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index 7687899..675cd7a 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -18,6 +18,7 @@ import threading import time import unittest import weakref +from pickle import PicklingError from concurrent import futures from concurrent.futures._base import ( @@ -394,16 +395,17 @@ class ProcessPoolShutdownTest(ExecutorShutdownTest): queue_management_thread = executor._queue_management_thread processes = executor._processes call_queue = executor._call_queue + queue_management_thread = executor._queue_management_thread del executor + # Make sure that all the executor ressources were properly cleaned by + # the shutdown process queue_management_thread.join() for p in processes.values(): p.join() - call_queue.close() call_queue.join_thread() - create_executor_tests(ProcessPoolShutdownTest, executor_mixins=(ProcessPoolForkMixin, ProcessPoolForkserverMixin, @@ -784,6 +786,172 @@ create_executor_tests(ProcessPoolExecutorTest, ProcessPoolForkserverMixin, ProcessPoolSpawnMixin)) +def hide_process_stderr(): + import io + sys.stderr = io.StringIO() + + +def _crash(delay=None): + """Induces a segfault.""" + if delay: + time.sleep(delay) + import faulthandler + faulthandler.disable() + faulthandler._sigsegv() + + +def _exit(): + """Induces a sys exit with exitcode 1.""" + sys.exit(1) + + +def _raise_error(Err): + """Function that raises an Exception in process.""" + hide_process_stderr() + raise Err() + + +def _return_instance(cls): + """Function that returns a instance of cls.""" + hide_process_stderr() + return cls() + + +class CrashAtPickle(object): + """Bad object that triggers a segfault at pickling time.""" + def __reduce__(self): + _crash() + + +class CrashAtUnpickle(object): + """Bad object that triggers a segfault at unpickling time.""" + def __reduce__(self): + return _crash, () + + +class ExitAtPickle(object): + """Bad object that triggers a process exit at pickling time.""" + def __reduce__(self): + _exit() + + +class ExitAtUnpickle(object): + """Bad object that triggers a process exit at unpickling time.""" + def __reduce__(self): + return _exit, () + + +class ErrorAtPickle(object): + """Bad object that triggers an error at pickling time.""" + def __reduce__(self): + from pickle import PicklingError + raise PicklingError("Error in pickle") + + +class ErrorAtUnpickle(object): + """Bad object that triggers an error at unpickling time.""" + def __reduce__(self): + from pickle import UnpicklingError + return _raise_error, (UnpicklingError, ) + + +class ExecutorDeadlockTest: + TIMEOUT = 15 + + @classmethod + def _sleep_id(cls, x, delay): + time.sleep(delay) + return x + + def _fail_on_deadlock(self, executor): + # If we did not recover before TIMEOUT seconds, consider that the + # executor is in a deadlock state and forcefully clean all its + # composants. + import faulthandler + from tempfile import TemporaryFile + with TemporaryFile(mode="w+") as f: + faulthandler.dump_traceback(file=f) + f.seek(0) + tb = f.read() + for p in executor._processes.values(): + p.terminate() + # This should be safe to call executor.shutdown here as all possible + # deadlocks should have been broken. + executor.shutdown(wait=True) + print(f"\nTraceback:\n {tb}", file=sys.__stderr__) + self.fail(f"Executor deadlock:\n\n{tb}") + + + def test_crash(self): + # extensive testing for deadlock caused by crashes in a pool. + self.executor.shutdown(wait=True) + crash_cases = [ + # Check problem occuring while pickling a task in + # the task_handler thread + (id, (ErrorAtPickle(),), PicklingError, "error at task pickle"), + # Check problem occuring while unpickling a task on workers + (id, (ExitAtUnpickle(),), BrokenProcessPool, + "exit at task unpickle"), + (id, (ErrorAtUnpickle(),), BrokenProcessPool, + "error at task unpickle"), + (id, (CrashAtUnpickle(),), BrokenProcessPool, + "crash at task unpickle"), + # Check problem occuring during func execution on workers + (_crash, (), BrokenProcessPool, + "crash during func execution on worker"), + (_exit, (), SystemExit, + "exit during func execution on worker"), + (_raise_error, (RuntimeError, ), RuntimeError, + "error during func execution on worker"), + # Check problem occuring while pickling a task result + # on workers + (_return_instance, (CrashAtPickle,), BrokenProcessPool, + "crash during result pickle on worker"), + (_return_instance, (ExitAtPickle,), SystemExit, + "exit during result pickle on worker"), + (_return_instance, (ErrorAtPickle,), PicklingError, + "error during result pickle on worker"), + # Check problem occuring while unpickling a task in + # the result_handler thread + (_return_instance, (ErrorAtUnpickle,), BrokenProcessPool, + "error during result unpickle in result_handler"), + (_return_instance, (ExitAtUnpickle,), BrokenProcessPool, + "exit during result unpickle in result_handler") + ] + for func, args, error, name in crash_cases: + with self.subTest(name): + # The captured_stderr reduces the noise in the test report + with test.support.captured_stderr(): + executor = self.executor_type( + max_workers=2, mp_context=get_context(self.ctx)) + res = executor.submit(func, *args) + with self.assertRaises(error): + try: + res.result(timeout=self.TIMEOUT) + except futures.TimeoutError: + # If we did not recover before TIMEOUT seconds, + # consider that the executor is in a deadlock state + self._fail_on_deadlock(executor) + executor.shutdown(wait=True) + + def test_shutdown_deadlock(self): + # Test that the pool calling shutdown do not cause deadlock + # if a worker fails after the shutdown call. + self.executor.shutdown(wait=True) + with self.executor_type(max_workers=2, + mp_context=get_context(self.ctx)) as executor: + self.executor = executor # Allow clean up in fail_on_deadlock + f = executor.submit(_crash, delay=.1) + executor.shutdown(wait=True) + with self.assertRaises(BrokenProcessPool): + f.result() + + +create_executor_tests(ExecutorDeadlockTest, + executor_mixins=(ProcessPoolForkMixin, + ProcessPoolForkserverMixin, + ProcessPoolSpawnMixin)) + class FutureTests(BaseTestCase): def test_done_callback_with_result(self): |