diff options
author | Victor Stinner <vstinner@python.org> | 2023-09-29 19:31:19 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-09-29 19:31:19 (GMT) |
commit | 635184212179b0511768ea1cd57256e134ba2d75 (patch) | |
tree | 7755f29d577b5db379d9027cffd8363d1926fad6 /Lib/concurrent | |
parent | f3df8fa669158f89af69b5661e98314d98fb916f (diff) | |
download | cpython-635184212179b0511768ea1cd57256e134ba2d75.zip cpython-635184212179b0511768ea1cd57256e134ba2d75.tar.gz cpython-635184212179b0511768ea1cd57256e134ba2d75.tar.bz2 |
gh-109047: concurrent.futures catches PythonFinalizationError (#109810)
concurrent.futures: The *executor manager thread* now catches
exceptions when adding an item to the *call queue*. During Python
finalization, creating a new thread can now raise RuntimeError. Catch
the exception and call terminate_broken() in this case.
Add test_python_finalization_error() to test_concurrent_futures.
concurrent.futures._ExecutorManagerThread changes:
* terminate_broken() no longer calls shutdown_workers() since the
call queue is no longer working anymore (read and write ends of
the queue pipe are closed).
* terminate_broken() now terminates child processes, not only
wait until they complete.
* _ExecutorManagerThread.terminate_broken() now holds shutdown_lock
to prevent race conditons with ProcessPoolExecutor.submit().
multiprocessing.Queue changes:
* Add _terminate_broken() method.
* _start_thread() sets _thread to None on exception to prevent
leaking "dangling threads" even if the thread was not started
yet.
Diffstat (limited to 'Lib/concurrent')
-rw-r--r-- | Lib/concurrent/futures/process.py | 47 |
1 files changed, 30 insertions, 17 deletions
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 73bdcbe..3990e6b 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -341,7 +341,14 @@ class _ExecutorManagerThread(threading.Thread): # Main loop for the executor manager thread. while True: - self.add_call_item_to_queue() + # gh-109047: During Python finalization, self.call_queue.put() + # creation of a thread can fail with RuntimeError. + try: + self.add_call_item_to_queue() + except BaseException as exc: + cause = format_exception(exc) + self.terminate_broken(cause) + return result_item, is_broken, cause = self.wait_result_broken_or_wakeup() @@ -425,8 +432,8 @@ class _ExecutorManagerThread(threading.Thread): try: result_item = result_reader.recv() is_broken = False - except BaseException as e: - cause = format_exception(type(e), e, e.__traceback__) + except BaseException as exc: + cause = format_exception(exc) elif wakeup_reader in ready: is_broken = False @@ -463,7 +470,7 @@ class _ExecutorManagerThread(threading.Thread): return (_global_shutdown or executor is None or executor._shutdown_thread) - def terminate_broken(self, cause): + def _terminate_broken(self, cause): # Terminate the executor because it is in a broken state. The cause # argument can be used to display more information on the error that # lead the executor into becoming broken. @@ -490,7 +497,7 @@ class _ExecutorManagerThread(threading.Thread): for work_id, work_item in self.pending_work_items.items(): try: work_item.future.set_exception(bpe) - except _base.InvalidStateError as exc: + except _base.InvalidStateError: # set_exception() fails if the future is cancelled: ignore it. # Trying to check if the future is cancelled before calling # set_exception() would leave a race condition if the future is @@ -505,17 +512,14 @@ class _ExecutorManagerThread(threading.Thread): for p in self.processes.values(): p.terminate() - # Prevent queue writing to a pipe which is no longer read. - # https://github.com/python/cpython/issues/94777 - self.call_queue._reader.close() - - # gh-107219: Close the connection writer which can unblock - # Queue._feed() if it was stuck in send_bytes(). - if sys.platform == 'win32': - self.call_queue._writer.close() + self.call_queue._terminate_broken() # clean up resources - self.join_executor_internals() + self._join_executor_internals(broken=True) + + def terminate_broken(self, cause): + with self.shutdown_lock: + self._terminate_broken(cause) def flag_executor_shutting_down(self): # Flag the executor as shutting down and cancel remaining tasks if @@ -558,15 +562,24 @@ class _ExecutorManagerThread(threading.Thread): break def join_executor_internals(self): - self.shutdown_workers() + with self.shutdown_lock: + self._join_executor_internals() + + def _join_executor_internals(self, broken=False): + # If broken, call_queue was closed and so can no longer be used. + if not broken: + self.shutdown_workers() + # Release the queue's resources as soon as possible. self.call_queue.close() self.call_queue.join_thread() - with self.shutdown_lock: - self.thread_wakeup.close() + self.thread_wakeup.close() + # If .join() is not called on the created processes then # some ctx.Queue methods may deadlock on Mac OS X. for p in self.processes.values(): + if broken: + p.terminate() p.join() def get_n_children_alive(self): |