summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVictor Stinner <vstinner@python.org>2023-10-02 15:47:10 (GMT)
committerGitHub <noreply@github.com>2023-10-02 15:47:10 (GMT)
commit356de021d7dc02d4803627c0807c0950f7453754 (patch)
tree23123500d6b299c5aa722e2684533e910b46b748
parent41eb0c728653d2dfc0f9b4529557288069b6047a (diff)
downloadcpython-356de021d7dc02d4803627c0807c0950f7453754.zip
cpython-356de021d7dc02d4803627c0807c0950f7453754.tar.gz
cpython-356de021d7dc02d4803627c0807c0950f7453754.tar.bz2
[3.12] gh-109047: concurrent.futures catches RuntimeError (#109810) (#110126)
gh-109047: concurrent.futures catches PythonFinalizationError (#109810) concurrent.futures: The *executor manager thread* now catches exceptions when adding an item to the *call queue*. During Python finalization, creating a new thread can now raise RuntimeError. Catch the exception and call terminate_broken() in this case. Add test_python_finalization_error() to test_concurrent_futures. concurrent.futures._ExecutorManagerThread changes: * terminate_broken() no longer calls shutdown_workers() since the call queue is no longer working anymore (read and write ends of the queue pipe are closed). * terminate_broken() now terminates child processes, not only wait until they complete. * _ExecutorManagerThread.terminate_broken() now holds shutdown_lock to prevent race conditons with ProcessPoolExecutor.submit(). multiprocessing.Queue changes: * Add _terminate_broken() method. * _start_thread() sets _thread to None on exception to prevent leaking "dangling threads" even if the thread was not started yet. (cherry picked from commit 635184212179b0511768ea1cd57256e134ba2d75)
-rw-r--r--Lib/concurrent/futures/process.py49
-rw-r--r--Lib/multiprocessing/queues.py25
-rw-r--r--Lib/test/test_concurrent_futures/test_process_pool.py29
-rw-r--r--Misc/NEWS.d/next/Library/2023-09-25-02-11-14.gh-issue-109047.b1TrqG.rst4
4 files changed, 90 insertions, 17 deletions
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
index 33df65a..8359a4f 100644
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -341,7 +341,14 @@ class _ExecutorManagerThread(threading.Thread):
# Main loop for the executor manager thread.
while True:
- self.add_call_item_to_queue()
+ # gh-109047: During Python finalization, self.call_queue.put()
+ # creation of a thread can fail with RuntimeError.
+ try:
+ self.add_call_item_to_queue()
+ except BaseException as exc:
+ cause = format_exception(exc)
+ self.terminate_broken(cause)
+ return
result_item, is_broken, cause = self.wait_result_broken_or_wakeup()
@@ -425,8 +432,8 @@ class _ExecutorManagerThread(threading.Thread):
try:
result_item = result_reader.recv()
is_broken = False
- except BaseException as e:
- cause = format_exception(type(e), e, e.__traceback__)
+ except BaseException as exc:
+ cause = format_exception(exc)
elif wakeup_reader in ready:
is_broken = False
@@ -473,7 +480,7 @@ class _ExecutorManagerThread(threading.Thread):
return (_global_shutdown or executor is None
or executor._shutdown_thread)
- def terminate_broken(self, cause):
+ def _terminate_broken(self, cause):
# Terminate the executor because it is in a broken state. The cause
# argument can be used to display more information on the error that
# lead the executor into becoming broken.
@@ -498,7 +505,14 @@ class _ExecutorManagerThread(threading.Thread):
# Mark pending tasks as failed.
for work_id, work_item in self.pending_work_items.items():
- work_item.future.set_exception(bpe)
+ try:
+ work_item.future.set_exception(bpe)
+ except _base.InvalidStateError:
+ # set_exception() fails if the future is cancelled: ignore it.
+ # Trying to check if the future is cancelled before calling
+ # set_exception() would leave a race condition if the future is
+ # cancelled between the check and set_exception().
+ pass
# Delete references to object. See issue16284
del work_item
self.pending_work_items.clear()
@@ -508,16 +522,18 @@ class _ExecutorManagerThread(threading.Thread):
for p in self.processes.values():
p.terminate()
- # Prevent queue writing to a pipe which is no longer read.
- # https://github.com/python/cpython/issues/94777
- self.call_queue._reader.close()
+ self.call_queue._terminate_broken()
# gh-107219: Close the connection writer which can unblock
# Queue._feed() if it was stuck in send_bytes().
self.call_queue._writer.close()
# clean up resources
- self.join_executor_internals()
+ self._join_executor_internals(broken=True)
+
+ def terminate_broken(self, cause):
+ with self.shutdown_lock:
+ self._terminate_broken(cause)
def flag_executor_shutting_down(self):
# Flag the executor as shutting down and cancel remaining tasks if
@@ -560,15 +576,24 @@ class _ExecutorManagerThread(threading.Thread):
break
def join_executor_internals(self):
- self.shutdown_workers()
+ with self.shutdown_lock:
+ self._join_executor_internals()
+
+ def _join_executor_internals(self, broken=False):
+ # If broken, call_queue was closed and so can no longer be used.
+ if not broken:
+ self.shutdown_workers()
+
# Release the queue's resources as soon as possible.
self.call_queue.close()
self.call_queue.join_thread()
- with self.shutdown_lock:
- self.thread_wakeup.close()
+ self.thread_wakeup.close()
+
# If .join() is not called on the created processes then
# some ctx.Queue methods may deadlock on Mac OS X.
for p in self.processes.values():
+ if broken:
+ p.terminate()
p.join()
def get_n_children_alive(self):
diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py
index daf9ee9..d36de75 100644
--- a/Lib/multiprocessing/queues.py
+++ b/Lib/multiprocessing/queues.py
@@ -158,6 +158,15 @@ class Queue(object):
except AttributeError:
pass
+ def _terminate_broken(self):
+ # Close a Queue on error.
+
+ # gh-94777: Prevent queue writing to a pipe which is no longer read.
+ self._reader.close()
+
+ self.close()
+ self.join_thread()
+
def _start_thread(self):
debug('Queue._start_thread()')
@@ -169,13 +178,19 @@ class Queue(object):
self._wlock, self._reader.close, self._writer.close,
self._ignore_epipe, self._on_queue_feeder_error,
self._sem),
- name='QueueFeederThread'
+ name='QueueFeederThread',
+ daemon=True,
)
- self._thread.daemon = True
- debug('doing self._thread.start()')
- self._thread.start()
- debug('... done self._thread.start()')
+ try:
+ debug('doing self._thread.start()')
+ self._thread.start()
+ debug('... done self._thread.start()')
+ except:
+ # gh-109047: During Python finalization, creating a thread
+ # can fail with RuntimeError.
+ self._thread = None
+ raise
if not self._joincancelled:
self._jointhread = Finalize(
diff --git a/Lib/test/test_concurrent_futures/test_process_pool.py b/Lib/test/test_concurrent_futures/test_process_pool.py
index 7763a49..c73c2da 100644
--- a/Lib/test/test_concurrent_futures/test_process_pool.py
+++ b/Lib/test/test_concurrent_futures/test_process_pool.py
@@ -1,5 +1,6 @@
import os
import sys
+import threading
import time
import unittest
from concurrent import futures
@@ -187,6 +188,34 @@ class ProcessPoolExecutorTest(ExecutorTest):
for i, future in enumerate(futures):
self.assertEqual(future.result(), mul(i, i))
+ def test_python_finalization_error(self):
+ # gh-109047: Catch RuntimeError on thread creation
+ # during Python finalization.
+
+ context = self.get_context()
+
+ # gh-109047: Mock the threading.start_new_thread() function to inject
+ # RuntimeError: simulate the error raised during Python finalization.
+ # Block the second creation: create _ExecutorManagerThread, but block
+ # QueueFeederThread.
+ orig_start_new_thread = threading._start_new_thread
+ nthread = 0
+ def mock_start_new_thread(func, *args):
+ nonlocal nthread
+ if nthread >= 1:
+ raise RuntimeError("can't create new thread at "
+ "interpreter shutdown")
+ nthread += 1
+ return orig_start_new_thread(func, *args)
+
+ with support.swap_attr(threading, '_start_new_thread',
+ mock_start_new_thread):
+ executor = self.executor_type(max_workers=2, mp_context=context)
+ with executor:
+ with self.assertRaises(BrokenProcessPool):
+ list(executor.map(mul, [(2, 3)] * 10))
+ executor.shutdown()
+
create_executor_tests(globals(), ProcessPoolExecutorTest,
executor_mixins=(ProcessPoolForkMixin,
diff --git a/Misc/NEWS.d/next/Library/2023-09-25-02-11-14.gh-issue-109047.b1TrqG.rst b/Misc/NEWS.d/next/Library/2023-09-25-02-11-14.gh-issue-109047.b1TrqG.rst
new file mode 100644
index 0000000..71cb5a8
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2023-09-25-02-11-14.gh-issue-109047.b1TrqG.rst
@@ -0,0 +1,4 @@
+:mod:`concurrent.futures`: The *executor manager thread* now catches exceptions
+when adding an item to the *call queue*. During Python finalization, creating a
+new thread can now raise :exc:`RuntimeError`. Catch the exception and call
+``terminate_broken()`` in this case. Patch by Victor Stinner.