summaryrefslogtreecommitdiffstats
path: root/Lib/test
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test')
-rw-r--r--Lib/test/test_concurrent_futures/test_deadlock.py72
1 files changed, 71 insertions, 1 deletions
diff --git a/Lib/test/test_concurrent_futures/test_deadlock.py b/Lib/test/test_concurrent_futures/test_deadlock.py
index 6b78b36..6be1677 100644
--- a/Lib/test/test_concurrent_futures/test_deadlock.py
+++ b/Lib/test/test_concurrent_futures/test_deadlock.py
@@ -1,10 +1,13 @@
import contextlib
+import queue
+import signal
import sys
import time
import unittest
+import unittest.mock
from pickle import PicklingError
from concurrent import futures
-from concurrent.futures.process import BrokenProcessPool
+from concurrent.futures.process import BrokenProcessPool, _ThreadWakeup
from test import support
@@ -239,6 +242,73 @@ class ExecutorDeadlockTest:
with self.assertRaises(BrokenProcessPool):
list(executor.map(_crash_with_data, [data] * 10))
+ def test_gh105829_should_not_deadlock_if_wakeup_pipe_full(self):
+ # Issue #105829: The _ExecutorManagerThread wakeup pipe could
+ # fill up and block. See: https://github.com/python/cpython/issues/105829
+
+ # Lots of cargo culting while writing this test, apologies if
+ # something is really stupid...
+
+ self.executor.shutdown(wait=True)
+
+ if not hasattr(signal, 'alarm'):
+ raise unittest.SkipTest(
+ "Tested platform does not support the alarm signal")
+
+ def timeout(_signum, _frame):
+ import faulthandler
+ faulthandler.dump_traceback()
+
+ raise RuntimeError("timed out while submitting jobs?")
+
+ thread_run = futures.process._ExecutorManagerThread.run
+ def mock_run(self):
+ # Delay thread startup so the wakeup pipe can fill up and block
+ time.sleep(3)
+ thread_run(self)
+
+ class MockWakeup(_ThreadWakeup):
+ """Mock wakeup object to force the wakeup to block"""
+ def __init__(self):
+ super().__init__()
+ self._dummy_queue = queue.Queue(maxsize=1)
+
+ def wakeup(self):
+ self._dummy_queue.put(None, block=True)
+ super().wakeup()
+
+ def clear(self):
+ try:
+ while True:
+ self._dummy_queue.get_nowait()
+ except queue.Empty:
+ super().clear()
+
+ with (unittest.mock.patch.object(futures.process._ExecutorManagerThread,
+ 'run', mock_run),
+ unittest.mock.patch('concurrent.futures.process._ThreadWakeup',
+ MockWakeup)):
+ with self.executor_type(max_workers=2,
+ mp_context=self.get_context()) as executor:
+ self.executor = executor # Allow clean up in fail_on_deadlock
+
+ job_num = 100
+ job_data = range(job_num)
+
+ # Need to use sigalarm for timeout detection because
+ # Executor.submit is not guarded by any timeout (both
+ # self._work_ids.put(self._queue_count) and
+ # self._executor_manager_thread_wakeup.wakeup() might
+ # timeout, maybe more?). In this specific case it was
+ # the wakeup call that deadlocked on a blocking pipe.
+ old_handler = signal.signal(signal.SIGALRM, timeout)
+ try:
+ signal.alarm(int(self.TIMEOUT))
+ self.assertEqual(job_num, len(list(executor.map(int, job_data))))
+ finally:
+ signal.alarm(0)
+ signal.signal(signal.SIGALRM, old_handler)
+
create_executor_tests(globals(), ExecutorDeadlockTest,
executor_mixins=(ProcessPoolForkMixin,