diff options
author | elfstrom <elfstrom@users.noreply.github.com> | 2023-10-02 15:24:19 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-10-02 15:24:19 (GMT) |
commit | c2cadb0ec2d192c78d9e01c3c0c1cae12ea57392 (patch) | |
tree | 26ef8d8315d7d24bd62705158a7c8da5b8bbfe6f | |
parent | 5e6e99646e46d485e018429f6000661609e4f1b5 (diff) | |
download | cpython-c2cadb0ec2d192c78d9e01c3c0c1cae12ea57392.zip cpython-c2cadb0ec2d192c78d9e01c3c0c1cae12ea57392.tar.gz cpython-c2cadb0ec2d192c78d9e01c3c0c1cae12ea57392.tar.bz2 |
[3.12] gh-105829: Fix concurrent.futures.ProcessPoolExecutor deadlock (GH-108513) (#109784)
This fixes issue GH-105829, https://github.com/python/cpython/issues/105829
(cherry picked from commit 405b06375a8a4cdb08ff53afade09a8b66ec23d5)
-rw-r--r-- | Lib/concurrent/futures/process.py | 18 | ||||
-rw-r--r-- | Lib/test/test_concurrent_futures/test_deadlock.py | 72 | ||||
-rw-r--r-- | Misc/NEWS.d/next/Library/2023-08-26-12-35-39.gh-issue-105829.kyYhWI.rst | 1 |
3 files changed, 87 insertions, 4 deletions
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index a8dab13..33df65a 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -71,6 +71,11 @@ class _ThreadWakeup: self._reader, self._writer = mp.Pipe(duplex=False) def close(self): + # Please note that we do not take the shutdown lock when + # calling clear() (to avoid deadlocking) so this method can + # only be called safely from the same thread as all calls to + # clear() even if you hold the shutdown lock. Otherwise we + # might try to read from the closed pipe. if not self._closed: self._closed = True self._writer.close() @@ -426,8 +431,12 @@ class _ExecutorManagerThread(threading.Thread): elif wakeup_reader in ready: is_broken = False - with self.shutdown_lock: - self.thread_wakeup.clear() + # No need to hold the _shutdown_lock here because: + # 1. we're the only thread to use the wakeup reader + # 2. we're also the only thread to call thread_wakeup.close() + # 3. we want to avoid a possible deadlock when both reader and writer + # would block (gh-105829) + self.thread_wakeup.clear() return result_item, is_broken, cause @@ -710,7 +719,10 @@ class ProcessPoolExecutor(_base.Executor): # as it could result in a deadlock if a worker process dies with the # _result_queue write lock still acquired. # - # _shutdown_lock must be locked to access _ThreadWakeup. + # _shutdown_lock must be locked to access _ThreadWakeup.close() and + # .wakeup(). Care must also be taken to not call clear or close from + # more than one thread since _ThreadWakeup.clear() is not protected by + # the _shutdown_lock self._executor_manager_thread_wakeup = _ThreadWakeup() # Create communication channels for the executor diff --git a/Lib/test/test_concurrent_futures/test_deadlock.py b/Lib/test/test_concurrent_futures/test_deadlock.py index 6b78b36..6be1677 100644 --- a/Lib/test/test_concurrent_futures/test_deadlock.py +++ b/Lib/test/test_concurrent_futures/test_deadlock.py @@ -1,10 +1,13 @@ import contextlib +import queue +import signal import sys import time import unittest +import unittest.mock from pickle import PicklingError from concurrent import futures -from concurrent.futures.process import BrokenProcessPool +from concurrent.futures.process import BrokenProcessPool, _ThreadWakeup from test import support @@ -239,6 +242,73 @@ class ExecutorDeadlockTest: with self.assertRaises(BrokenProcessPool): list(executor.map(_crash_with_data, [data] * 10)) + def test_gh105829_should_not_deadlock_if_wakeup_pipe_full(self): + # Issue #105829: The _ExecutorManagerThread wakeup pipe could + # fill up and block. See: https://github.com/python/cpython/issues/105829 + + # Lots of cargo culting while writing this test, apologies if + # something is really stupid... + + self.executor.shutdown(wait=True) + + if not hasattr(signal, 'alarm'): + raise unittest.SkipTest( + "Tested platform does not support the alarm signal") + + def timeout(_signum, _frame): + import faulthandler + faulthandler.dump_traceback() + + raise RuntimeError("timed out while submitting jobs?") + + thread_run = futures.process._ExecutorManagerThread.run + def mock_run(self): + # Delay thread startup so the wakeup pipe can fill up and block + time.sleep(3) + thread_run(self) + + class MockWakeup(_ThreadWakeup): + """Mock wakeup object to force the wakeup to block""" + def __init__(self): + super().__init__() + self._dummy_queue = queue.Queue(maxsize=1) + + def wakeup(self): + self._dummy_queue.put(None, block=True) + super().wakeup() + + def clear(self): + try: + while True: + self._dummy_queue.get_nowait() + except queue.Empty: + super().clear() + + with (unittest.mock.patch.object(futures.process._ExecutorManagerThread, + 'run', mock_run), + unittest.mock.patch('concurrent.futures.process._ThreadWakeup', + MockWakeup)): + with self.executor_type(max_workers=2, + mp_context=self.get_context()) as executor: + self.executor = executor # Allow clean up in fail_on_deadlock + + job_num = 100 + job_data = range(job_num) + + # Need to use sigalarm for timeout detection because + # Executor.submit is not guarded by any timeout (both + # self._work_ids.put(self._queue_count) and + # self._executor_manager_thread_wakeup.wakeup() might + # timeout, maybe more?). In this specific case it was + # the wakeup call that deadlocked on a blocking pipe. + old_handler = signal.signal(signal.SIGALRM, timeout) + try: + signal.alarm(int(self.TIMEOUT)) + self.assertEqual(job_num, len(list(executor.map(int, job_data)))) + finally: + signal.alarm(0) + signal.signal(signal.SIGALRM, old_handler) + create_executor_tests(globals(), ExecutorDeadlockTest, executor_mixins=(ProcessPoolForkMixin, diff --git a/Misc/NEWS.d/next/Library/2023-08-26-12-35-39.gh-issue-105829.kyYhWI.rst b/Misc/NEWS.d/next/Library/2023-08-26-12-35-39.gh-issue-105829.kyYhWI.rst new file mode 100644 index 0000000..eaa2a5a --- /dev/null +++ b/Misc/NEWS.d/next/Library/2023-08-26-12-35-39.gh-issue-105829.kyYhWI.rst @@ -0,0 +1 @@ +Fix concurrent.futures.ProcessPoolExecutor deadlock |