author     Victor Stinner <vstinner@python.org>  2020-04-27 19:36:51 (GMT)
committer  GitHub <noreply@github.com>  2020-04-27 19:36:51 (GMT)
commit     5d1f32d33ba24d0aa87235ae40207bb57778388b (patch)
tree       ab6fcf35029922b68c4354f3da6af53bdd5c756f /Lib/test/test_concurrent_futures.py
parent     1a275013d1ecc2e3778d64fda86174b2f13d6969 (diff)
bpo-39995: Split test_concurrent_futures.test_crash() into sub-tests (GH-19739)
Now only test_error_during_result_unpickle_in_result_handler() captures and ignores sys.stderr in the test process.

Tools like test.bisect_cmd don't support subTest() and only operate at the granularity of a whole test method, so each crash case now gets its own test method.

Remove the unused ExecutorDeadlockTest._sleep_id() method.
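The refactoring below silences stderr only for the one test that asks for it: _check_crash() picks a context manager up front (support.captured_stderr() when ignore_stderr=True, contextlib.nullcontext() otherwise) and then enters it unconditionally. A minimal standalone sketch of that pattern, assuming nothing from the test file itself (the names noisy_call and run are made up, and contextlib.redirect_stderr() stands in for the test.support helper):

import contextlib
import io
import sys


def noisy_call():
    # Stand-in for a worker function that spews a scary but expected traceback.
    print("expected traceback noise", file=sys.stderr)
    return "result"


def run(func, ignore_stderr=False):
    # Same shape as _check_crash(): decide on the context manager first,
    # then wrap the noisy part in a single, unconditional with-block.
    if ignore_stderr:
        cm = contextlib.redirect_stderr(io.StringIO())  # discard the noise
    else:
        cm = contextlib.nullcontext()                   # no-op context manager
    with cm:
        return func()


run(noisy_call)                      # noise reaches the real stderr
run(noisy_call, ignore_stderr=True)  # noise is swallowed

Selecting the context manager ahead of time keeps one code path for both modes, which is what lets the per-test ignore_stderr flag replace the old module-level hide_process_stderr() helper.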
Diffstat (limited to 'Lib/test/test_concurrent_futures.py')
-rw-r--r--  Lib/test/test_concurrent_futures.py  148
1 file changed, 85 insertions, 63 deletions
diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py
index a8c5bb6..40597ff 100644
--- a/Lib/test/test_concurrent_futures.py
+++ b/Lib/test/test_concurrent_futures.py
@@ -1006,11 +1006,6 @@ create_executor_tests(ProcessPoolExecutorTest,
                                        ProcessPoolForkserverMixin,
                                        ProcessPoolSpawnMixin))

-def hide_process_stderr():
-    import io
-    sys.stderr = io.StringIO()
-
-
 def _crash(delay=None):
     """Induces a segfault."""
     if delay:
@@ -1027,13 +1022,18 @@ def _exit():

 def _raise_error(Err):
     """Function that raises an Exception in process."""
-    hide_process_stderr()
+    raise Err()
+
+
+def _raise_error_ignore_stderr(Err):
+    """Function that raises an Exception in process and ignores stderr."""
+    import io
+    sys.stderr = io.StringIO()
     raise Err()


 def _return_instance(cls):
     """Function that returns a instance of cls."""
-    hide_process_stderr()
     return cls()
@@ -1072,17 +1072,12 @@ class ErrorAtUnpickle(object):
     """Bad object that triggers an error at unpickling time."""
     def __reduce__(self):
         from pickle import UnpicklingError
-        return _raise_error, (UnpicklingError, )
+        return _raise_error_ignore_stderr, (UnpicklingError, )


 class ExecutorDeadlockTest:
     TIMEOUT = support.SHORT_TIMEOUT

-    @classmethod
-    def _sleep_id(cls, x, delay):
-        time.sleep(delay)
-        return x
-
     def _fail_on_deadlock(self, executor):
         # If we did not recover before TIMEOUT seconds, consider that the
         # executor is in a deadlock state and forcefully clean all its
@@ -1102,57 +1097,84 @@ class ExecutorDeadlockTest:
         self.fail(f"Executor deadlock:\n\n{tb}")

-    def test_crash(self):
-        # extensive testing for deadlock caused by crashes in a pool.
+    def _check_crash(self, error, func, *args, ignore_stderr=False):
+        # test for deadlock caused by crashes in a pool
         self.executor.shutdown(wait=True)
-        crash_cases = [
-            # Check problem occurring while pickling a task in
-            # the task_handler thread
-            (id, (ErrorAtPickle(),), PicklingError, "error at task pickle"),
-            # Check problem occurring while unpickling a task on workers
-            (id, (ExitAtUnpickle(),), BrokenProcessPool,
-             "exit at task unpickle"),
-            (id, (ErrorAtUnpickle(),), BrokenProcessPool,
-             "error at task unpickle"),
-            (id, (CrashAtUnpickle(),), BrokenProcessPool,
-             "crash at task unpickle"),
-            # Check problem occurring during func execution on workers
-            (_crash, (), BrokenProcessPool,
-             "crash during func execution on worker"),
-            (_exit, (), SystemExit,
-             "exit during func execution on worker"),
-            (_raise_error, (RuntimeError, ), RuntimeError,
-             "error during func execution on worker"),
-            # Check problem occurring while pickling a task result
-            # on workers
-            (_return_instance, (CrashAtPickle,), BrokenProcessPool,
-             "crash during result pickle on worker"),
-            (_return_instance, (ExitAtPickle,), SystemExit,
-             "exit during result pickle on worker"),
-            (_return_instance, (ErrorAtPickle,), PicklingError,
-             "error during result pickle on worker"),
-            # Check problem occurring while unpickling a task in
-            # the result_handler thread
-            (_return_instance, (ErrorAtUnpickle,), BrokenProcessPool,
-             "error during result unpickle in result_handler"),
-            (_return_instance, (ExitAtUnpickle,), BrokenProcessPool,
-             "exit during result unpickle in result_handler")
-        ]
-        for func, args, error, name in crash_cases:
-            with self.subTest(name):
-                # The captured_stderr reduces the noise in the test report
-                with support.captured_stderr():
-                    executor = self.executor_type(
-                        max_workers=2, mp_context=get_context(self.ctx))
-                    res = executor.submit(func, *args)
-                    with self.assertRaises(error):
-                        try:
-                            res.result(timeout=self.TIMEOUT)
-                        except futures.TimeoutError:
-                            # If we did not recover before TIMEOUT seconds,
-                            # consider that the executor is in a deadlock state
-                            self._fail_on_deadlock(executor)
-                    executor.shutdown(wait=True)
+
+        executor = self.executor_type(
+            max_workers=2, mp_context=get_context(self.ctx))
+        res = executor.submit(func, *args)
+
+        if ignore_stderr:
+            cm = support.captured_stderr()
+        else:
+            cm = contextlib.nullcontext()
+
+        try:
+            with self.assertRaises(error):
+                with cm:
+                    res.result(timeout=self.TIMEOUT)
+        except futures.TimeoutError:
+            # If we did not recover before TIMEOUT seconds,
+            # consider that the executor is in a deadlock state
+            self._fail_on_deadlock(executor)
+        executor.shutdown(wait=True)
+
+    def test_error_at_task_pickle(self):
+        # Check problem occurring while pickling a task in
+        # the task_handler thread
+        self._check_crash(PicklingError, id, ErrorAtPickle())
+
+    def test_exit_at_task_unpickle(self):
+        # Check problem occurring while unpickling a task on workers
+        self._check_crash(BrokenProcessPool, id, ExitAtUnpickle())
+
+    def test_error_at_task_unpickle(self):
+        # Check problem occurring while unpickling a task on workers
+        self._check_crash(BrokenProcessPool, id, ErrorAtUnpickle())
+
+    def test_crash_at_task_unpickle(self):
+        # Check problem occurring while unpickling a task on workers
+        self._check_crash(BrokenProcessPool, id, CrashAtUnpickle())
+
+    def test_crash_during_func_exec_on_worker(self):
+        # Check problem occurring during func execution on workers
+        self._check_crash(BrokenProcessPool, _crash)
+
+    def test_exit_during_func_exec_on_worker(self):
+        # Check problem occurring during func execution on workers
+        self._check_crash(SystemExit, _exit)
+
+    def test_error_during_func_exec_on_worker(self):
+        # Check problem occurring during func execution on workers
+        self._check_crash(RuntimeError, _raise_error, RuntimeError)
+
+    def test_crash_during_result_pickle_on_worker(self):
+        # Check problem occurring while pickling a task result
+        # on workers
+        self._check_crash(BrokenProcessPool, _return_instance, CrashAtPickle)
+
+    def test_exit_during_result_pickle_on_worker(self):
+        # Check problem occurring while pickling a task result
+        # on workers
+        self._check_crash(SystemExit, _return_instance, ExitAtPickle)
+
+    def test_error_during_result_pickle_on_worker(self):
+        # Check problem occurring while pickling a task result
+        # on workers
+        self._check_crash(PicklingError, _return_instance, ErrorAtPickle)
+
+    def test_error_during_result_unpickle_in_result_handler(self):
+        # Check problem occurring while unpickling a task in
+        # the result_handler thread
+        self._check_crash(BrokenProcessPool,
+                          _return_instance, ErrorAtUnpickle,
+                          ignore_stderr=True)
+
+    def test_exit_during_result_unpickle_in_result_handler(self):
+        # Check problem occurring while unpickling a task in
+        # the result_handler thread
+        self._check_crash(BrokenProcessPool, _return_instance, ExitAtUnpickle)

     def test_shutdown_deadlock(self):
         # Test that the pool calling shutdown do not cause deadlock
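For context on how the *_unpickle tests above inject failures: helpers such as ErrorAtUnpickle implement pickle's __reduce__ protocol so that the object pickles cleanly, but unpickling it calls a function that raises. The error therefore surfaces wherever the bytes are loaded (a worker process, or the result_handler thread), not where the object was created. A self-contained sketch of that mechanism, with illustrative names that are not the ones used in the test file:

import pickle


def _boom():
    # Runs on the unpickling side and raises there.
    raise pickle.UnpicklingError("triggered during unpickling")


class ExplodesOnUnpickle:
    # Toy stand-in for ErrorAtUnpickle: __reduce__ tells pickle to rebuild
    # the object by calling _boom(), so the failure happens wherever the
    # pickled bytes are loaded, not where the object was constructed.
    def __reduce__(self):
        return _boom, ()


data = pickle.dumps(ExplodesOnUnpickle())   # pickling succeeds
try:
    pickle.loads(data)                      # unpickling calls _boom()
except pickle.UnpicklingError as exc:
    print("caught on the unpickling side:", exc)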