diff options
author | Victor Stinner <vstinner@python.org> | 2023-09-29 19:31:19 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-09-29 19:31:19 (GMT) |
commit | 635184212179b0511768ea1cd57256e134ba2d75 (patch) | |
tree | 7755f29d577b5db379d9027cffd8363d1926fad6 /Lib/multiprocessing | |
parent | f3df8fa669158f89af69b5661e98314d98fb916f (diff) | |
download | cpython-635184212179b0511768ea1cd57256e134ba2d75.zip cpython-635184212179b0511768ea1cd57256e134ba2d75.tar.gz cpython-635184212179b0511768ea1cd57256e134ba2d75.tar.bz2 |
gh-109047: concurrent.futures catches PythonFinalizationError (#109810)
concurrent.futures: The *executor manager thread* now catches
exceptions when adding an item to the *call queue*. During Python
finalization, creating a new thread can now raise RuntimeError. Catch
the exception and call terminate_broken() in this case.
Add test_python_finalization_error() to test_concurrent_futures.
concurrent.futures._ExecutorManagerThread changes:
* terminate_broken() no longer calls shutdown_workers() since the
call queue is no longer working anymore (read and write ends of
the queue pipe are closed).
* terminate_broken() now terminates child processes, not only
wait until they complete.
* _ExecutorManagerThread.terminate_broken() now holds shutdown_lock
to prevent race conditons with ProcessPoolExecutor.submit().
multiprocessing.Queue changes:
* Add _terminate_broken() method.
* _start_thread() sets _thread to None on exception to prevent
leaking "dangling threads" even if the thread was not started
yet.
Diffstat (limited to 'Lib/multiprocessing')
-rw-r--r-- | Lib/multiprocessing/queues.py | 30 |
1 files changed, 25 insertions, 5 deletions
diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py index daf9ee9..852ae87 100644 --- a/Lib/multiprocessing/queues.py +++ b/Lib/multiprocessing/queues.py @@ -158,6 +158,20 @@ class Queue(object): except AttributeError: pass + def _terminate_broken(self): + # Close a Queue on error. + + # gh-94777: Prevent queue writing to a pipe which is no longer read. + self._reader.close() + + # gh-107219: Close the connection writer which can unblock + # Queue._feed() if it was stuck in send_bytes(). + if sys.platform == 'win32': + self._writer.close() + + self.close() + self.join_thread() + def _start_thread(self): debug('Queue._start_thread()') @@ -169,13 +183,19 @@ class Queue(object): self._wlock, self._reader.close, self._writer.close, self._ignore_epipe, self._on_queue_feeder_error, self._sem), - name='QueueFeederThread' + name='QueueFeederThread', + daemon=True, ) - self._thread.daemon = True - debug('doing self._thread.start()') - self._thread.start() - debug('... done self._thread.start()') + try: + debug('doing self._thread.start()') + self._thread.start() + debug('... done self._thread.start()') + except: + # gh-109047: During Python finalization, creating a thread + # can fail with RuntimeError. + self._thread = None + raise if not self._joincancelled: self._jointhread = Finalize( |