summaryrefslogtreecommitdiffstats
path: root/Lib/multiprocessing
diff options
context:
space:
mode:
authorVictor Stinner <vstinner@python.org>2023-09-29 19:31:19 (GMT)
committerGitHub <noreply@github.com>2023-09-29 19:31:19 (GMT)
commit635184212179b0511768ea1cd57256e134ba2d75 (patch)
tree7755f29d577b5db379d9027cffd8363d1926fad6 /Lib/multiprocessing
parentf3df8fa669158f89af69b5661e98314d98fb916f (diff)
downloadcpython-635184212179b0511768ea1cd57256e134ba2d75.zip
cpython-635184212179b0511768ea1cd57256e134ba2d75.tar.gz
cpython-635184212179b0511768ea1cd57256e134ba2d75.tar.bz2
gh-109047: concurrent.futures catches PythonFinalizationError (#109810)
concurrent.futures: The *executor manager thread* now catches exceptions when adding an item to the *call queue*. During Python finalization, creating a new thread can now raise RuntimeError. Catch the exception and call terminate_broken() in this case. Add test_python_finalization_error() to test_concurrent_futures. concurrent.futures._ExecutorManagerThread changes: * terminate_broken() no longer calls shutdown_workers() since the call queue is no longer working anymore (read and write ends of the queue pipe are closed). * terminate_broken() now terminates child processes, not only wait until they complete. * _ExecutorManagerThread.terminate_broken() now holds shutdown_lock to prevent race conditons with ProcessPoolExecutor.submit(). multiprocessing.Queue changes: * Add _terminate_broken() method. * _start_thread() sets _thread to None on exception to prevent leaking "dangling threads" even if the thread was not started yet.
Diffstat (limited to 'Lib/multiprocessing')
-rw-r--r--Lib/multiprocessing/queues.py30
1 files changed, 25 insertions, 5 deletions
diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py
index daf9ee9..852ae87 100644
--- a/Lib/multiprocessing/queues.py
+++ b/Lib/multiprocessing/queues.py
@@ -158,6 +158,20 @@ class Queue(object):
except AttributeError:
pass
+ def _terminate_broken(self):
+ # Close a Queue on error.
+
+ # gh-94777: Prevent queue writing to a pipe which is no longer read.
+ self._reader.close()
+
+ # gh-107219: Close the connection writer which can unblock
+ # Queue._feed() if it was stuck in send_bytes().
+ if sys.platform == 'win32':
+ self._writer.close()
+
+ self.close()
+ self.join_thread()
+
def _start_thread(self):
debug('Queue._start_thread()')
@@ -169,13 +183,19 @@ class Queue(object):
self._wlock, self._reader.close, self._writer.close,
self._ignore_epipe, self._on_queue_feeder_error,
self._sem),
- name='QueueFeederThread'
+ name='QueueFeederThread',
+ daemon=True,
)
- self._thread.daemon = True
- debug('doing self._thread.start()')
- self._thread.start()
- debug('... done self._thread.start()')
+ try:
+ debug('doing self._thread.start()')
+ self._thread.start()
+ debug('... done self._thread.start()')
+ except:
+ # gh-109047: During Python finalization, creating a thread
+ # can fail with RuntimeError.
+ self._thread = None
+ raise
if not self._joincancelled:
self._jointhread = Finalize(