summaryrefslogtreecommitdiffstats
path: root/Lib/concurrent
diff options
context:
space:
mode:
authorVictor Stinner <vstinner@python.org>2023-09-29 19:31:19 (GMT)
committerGitHub <noreply@github.com>2023-09-29 19:31:19 (GMT)
commit635184212179b0511768ea1cd57256e134ba2d75 (patch)
tree7755f29d577b5db379d9027cffd8363d1926fad6 /Lib/concurrent
parentf3df8fa669158f89af69b5661e98314d98fb916f (diff)
downloadcpython-635184212179b0511768ea1cd57256e134ba2d75.zip
cpython-635184212179b0511768ea1cd57256e134ba2d75.tar.gz
cpython-635184212179b0511768ea1cd57256e134ba2d75.tar.bz2
gh-109047: concurrent.futures catches PythonFinalizationError (#109810)
concurrent.futures: The *executor manager thread* now catches exceptions when adding an item to the *call queue*. During Python finalization, creating a new thread can now raise RuntimeError. Catch the exception and call terminate_broken() in this case. Add test_python_finalization_error() to test_concurrent_futures. concurrent.futures._ExecutorManagerThread changes: * terminate_broken() no longer calls shutdown_workers() since the call queue is no longer working anymore (read and write ends of the queue pipe are closed). * terminate_broken() now terminates child processes, not only wait until they complete. * _ExecutorManagerThread.terminate_broken() now holds shutdown_lock to prevent race conditons with ProcessPoolExecutor.submit(). multiprocessing.Queue changes: * Add _terminate_broken() method. * _start_thread() sets _thread to None on exception to prevent leaking "dangling threads" even if the thread was not started yet.
Diffstat (limited to 'Lib/concurrent')
-rw-r--r--Lib/concurrent/futures/process.py47
1 files changed, 30 insertions, 17 deletions
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
index 73bdcbe..3990e6b 100644
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -341,7 +341,14 @@ class _ExecutorManagerThread(threading.Thread):
# Main loop for the executor manager thread.
while True:
- self.add_call_item_to_queue()
+ # gh-109047: During Python finalization, self.call_queue.put()
+ # creation of a thread can fail with RuntimeError.
+ try:
+ self.add_call_item_to_queue()
+ except BaseException as exc:
+ cause = format_exception(exc)
+ self.terminate_broken(cause)
+ return
result_item, is_broken, cause = self.wait_result_broken_or_wakeup()
@@ -425,8 +432,8 @@ class _ExecutorManagerThread(threading.Thread):
try:
result_item = result_reader.recv()
is_broken = False
- except BaseException as e:
- cause = format_exception(type(e), e, e.__traceback__)
+ except BaseException as exc:
+ cause = format_exception(exc)
elif wakeup_reader in ready:
is_broken = False
@@ -463,7 +470,7 @@ class _ExecutorManagerThread(threading.Thread):
return (_global_shutdown or executor is None
or executor._shutdown_thread)
- def terminate_broken(self, cause):
+ def _terminate_broken(self, cause):
# Terminate the executor because it is in a broken state. The cause
# argument can be used to display more information on the error that
# lead the executor into becoming broken.
@@ -490,7 +497,7 @@ class _ExecutorManagerThread(threading.Thread):
for work_id, work_item in self.pending_work_items.items():
try:
work_item.future.set_exception(bpe)
- except _base.InvalidStateError as exc:
+ except _base.InvalidStateError:
# set_exception() fails if the future is cancelled: ignore it.
# Trying to check if the future is cancelled before calling
# set_exception() would leave a race condition if the future is
@@ -505,17 +512,14 @@ class _ExecutorManagerThread(threading.Thread):
for p in self.processes.values():
p.terminate()
- # Prevent queue writing to a pipe which is no longer read.
- # https://github.com/python/cpython/issues/94777
- self.call_queue._reader.close()
-
- # gh-107219: Close the connection writer which can unblock
- # Queue._feed() if it was stuck in send_bytes().
- if sys.platform == 'win32':
- self.call_queue._writer.close()
+ self.call_queue._terminate_broken()
# clean up resources
- self.join_executor_internals()
+ self._join_executor_internals(broken=True)
+
+ def terminate_broken(self, cause):
+ with self.shutdown_lock:
+ self._terminate_broken(cause)
def flag_executor_shutting_down(self):
# Flag the executor as shutting down and cancel remaining tasks if
@@ -558,15 +562,24 @@ class _ExecutorManagerThread(threading.Thread):
break
def join_executor_internals(self):
- self.shutdown_workers()
+ with self.shutdown_lock:
+ self._join_executor_internals()
+
+ def _join_executor_internals(self, broken=False):
+ # If broken, call_queue was closed and so can no longer be used.
+ if not broken:
+ self.shutdown_workers()
+
# Release the queue's resources as soon as possible.
self.call_queue.close()
self.call_queue.join_thread()
- with self.shutdown_lock:
- self.thread_wakeup.close()
+ self.thread_wakeup.close()
+
# If .join() is not called on the created processes then
# some ctx.Queue methods may deadlock on Mac OS X.
for p in self.processes.values():
+ if broken:
+ p.terminate()
p.join()
def get_n_children_alive(self):