summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorelfstrom <elfstrom@users.noreply.github.com>2023-10-02 15:24:19 (GMT)
committerGitHub <noreply@github.com>2023-10-02 15:24:19 (GMT)
commitc2cadb0ec2d192c78d9e01c3c0c1cae12ea57392 (patch)
tree26ef8d8315d7d24bd62705158a7c8da5b8bbfe6f
parent5e6e99646e46d485e018429f6000661609e4f1b5 (diff)
downloadcpython-c2cadb0ec2d192c78d9e01c3c0c1cae12ea57392.zip
cpython-c2cadb0ec2d192c78d9e01c3c0c1cae12ea57392.tar.gz
cpython-c2cadb0ec2d192c78d9e01c3c0c1cae12ea57392.tar.bz2
[3.12] gh-105829: Fix concurrent.futures.ProcessPoolExecutor deadlock (GH-108513) (#109784)
This fixes issue GH-105829, https://github.com/python/cpython/issues/105829 (cherry picked from commit 405b06375a8a4cdb08ff53afade09a8b66ec23d5)
-rw-r--r--Lib/concurrent/futures/process.py18
-rw-r--r--Lib/test/test_concurrent_futures/test_deadlock.py72
-rw-r--r--Misc/NEWS.d/next/Library/2023-08-26-12-35-39.gh-issue-105829.kyYhWI.rst1
3 files changed, 87 insertions, 4 deletions
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
index a8dab13..33df65a 100644
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -71,6 +71,11 @@ class _ThreadWakeup:
self._reader, self._writer = mp.Pipe(duplex=False)
def close(self):
+ # Please note that we do not take the shutdown lock when
+ # calling clear() (to avoid deadlocking) so this method can
+ # only be called safely from the same thread as all calls to
+ # clear() even if you hold the shutdown lock. Otherwise we
+ # might try to read from the closed pipe.
if not self._closed:
self._closed = True
self._writer.close()
@@ -426,8 +431,12 @@ class _ExecutorManagerThread(threading.Thread):
elif wakeup_reader in ready:
is_broken = False
- with self.shutdown_lock:
- self.thread_wakeup.clear()
+ # No need to hold the _shutdown_lock here because:
+ # 1. we're the only thread to use the wakeup reader
+ # 2. we're also the only thread to call thread_wakeup.close()
+ # 3. we want to avoid a possible deadlock when both reader and writer
+ # would block (gh-105829)
+ self.thread_wakeup.clear()
return result_item, is_broken, cause
@@ -710,7 +719,10 @@ class ProcessPoolExecutor(_base.Executor):
# as it could result in a deadlock if a worker process dies with the
# _result_queue write lock still acquired.
#
- # _shutdown_lock must be locked to access _ThreadWakeup.
+ # _shutdown_lock must be locked to access _ThreadWakeup.close() and
+ # .wakeup(). Care must also be taken to not call clear or close from
+ # more than one thread since _ThreadWakeup.clear() is not protected by
+ # the _shutdown_lock
self._executor_manager_thread_wakeup = _ThreadWakeup()
# Create communication channels for the executor
diff --git a/Lib/test/test_concurrent_futures/test_deadlock.py b/Lib/test/test_concurrent_futures/test_deadlock.py
index 6b78b36..6be1677 100644
--- a/Lib/test/test_concurrent_futures/test_deadlock.py
+++ b/Lib/test/test_concurrent_futures/test_deadlock.py
@@ -1,10 +1,13 @@
import contextlib
+import queue
+import signal
import sys
import time
import unittest
+import unittest.mock
from pickle import PicklingError
from concurrent import futures
-from concurrent.futures.process import BrokenProcessPool
+from concurrent.futures.process import BrokenProcessPool, _ThreadWakeup
from test import support
@@ -239,6 +242,73 @@ class ExecutorDeadlockTest:
with self.assertRaises(BrokenProcessPool):
list(executor.map(_crash_with_data, [data] * 10))
+ def test_gh105829_should_not_deadlock_if_wakeup_pipe_full(self):
+ # Issue #105829: The _ExecutorManagerThread wakeup pipe could
+ # fill up and block. See: https://github.com/python/cpython/issues/105829
+
+ # Lots of cargo culting while writing this test, apologies if
+ # something is really stupid...
+
+ self.executor.shutdown(wait=True)
+
+ if not hasattr(signal, 'alarm'):
+ raise unittest.SkipTest(
+ "Tested platform does not support the alarm signal")
+
+ def timeout(_signum, _frame):
+ import faulthandler
+ faulthandler.dump_traceback()
+
+ raise RuntimeError("timed out while submitting jobs?")
+
+ thread_run = futures.process._ExecutorManagerThread.run
+ def mock_run(self):
+ # Delay thread startup so the wakeup pipe can fill up and block
+ time.sleep(3)
+ thread_run(self)
+
+ class MockWakeup(_ThreadWakeup):
+ """Mock wakeup object to force the wakeup to block"""
+ def __init__(self):
+ super().__init__()
+ self._dummy_queue = queue.Queue(maxsize=1)
+
+ def wakeup(self):
+ self._dummy_queue.put(None, block=True)
+ super().wakeup()
+
+ def clear(self):
+ try:
+ while True:
+ self._dummy_queue.get_nowait()
+ except queue.Empty:
+ super().clear()
+
+ with (unittest.mock.patch.object(futures.process._ExecutorManagerThread,
+ 'run', mock_run),
+ unittest.mock.patch('concurrent.futures.process._ThreadWakeup',
+ MockWakeup)):
+ with self.executor_type(max_workers=2,
+ mp_context=self.get_context()) as executor:
+ self.executor = executor # Allow clean up in fail_on_deadlock
+
+ job_num = 100
+ job_data = range(job_num)
+
+ # Need to use sigalarm for timeout detection because
+ # Executor.submit is not guarded by any timeout (both
+ # self._work_ids.put(self._queue_count) and
+ # self._executor_manager_thread_wakeup.wakeup() might
+ # timeout, maybe more?). In this specific case it was
+ # the wakeup call that deadlocked on a blocking pipe.
+ old_handler = signal.signal(signal.SIGALRM, timeout)
+ try:
+ signal.alarm(int(self.TIMEOUT))
+ self.assertEqual(job_num, len(list(executor.map(int, job_data))))
+ finally:
+ signal.alarm(0)
+ signal.signal(signal.SIGALRM, old_handler)
+
create_executor_tests(globals(), ExecutorDeadlockTest,
executor_mixins=(ProcessPoolForkMixin,
diff --git a/Misc/NEWS.d/next/Library/2023-08-26-12-35-39.gh-issue-105829.kyYhWI.rst b/Misc/NEWS.d/next/Library/2023-08-26-12-35-39.gh-issue-105829.kyYhWI.rst
new file mode 100644
index 0000000..eaa2a5a
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2023-08-26-12-35-39.gh-issue-105829.kyYhWI.rst
@@ -0,0 +1 @@
+Fix concurrent.futures.ProcessPoolExecutor deadlock