summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVictor Stinner <vstinner@python.org>2023-09-11 08:11:31 (GMT)
committerGitHub <noreply@github.com>2023-09-11 08:11:31 (GMT)
commita9b1f84790e977fb09f75b148c4c4f5924a6ef99 (patch)
tree836a2e8244e2bcafa81cddaeb88e8489f63231b0
parent3b2ecbc1275bd05534885cee9ac1389987238561 (diff)
downloadcpython-a9b1f84790e977fb09f75b148c4c4f5924a6ef99.zip
cpython-a9b1f84790e977fb09f75b148c4c4f5924a6ef99.tar.gz
cpython-a9b1f84790e977fb09f75b148c4c4f5924a6ef99.tar.bz2
gh-107219: Fix concurrent.futures terminate_broken() (#109244)
Fix a race condition in concurrent.futures. When a process in the process pool was terminated abruptly (while the future was running or pending), close the connection write end. If the call queue is blocked on sending bytes to a worker process, closing the connection write end interrupts the send, so the queue can be closed. Changes: * _ExecutorManagerThread.terminate_broken() now closes call_queue._writer. * multiprocessing PipeConnection.close() now interrupts WaitForMultipleObjects() in _send_bytes() by cancelling the overlapped operation.
-rw-r--r--Lib/concurrent/futures/process.py4
-rw-r--r--Lib/multiprocessing/connection.py18
-rw-r--r--Misc/NEWS.d/next/Library/2023-09-11-00-32-18.gh-issue-107219.3zqyFT.rst5
3 files changed, 27 insertions, 0 deletions
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
index 9933d3d..f4b5cd1 100644
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -510,6 +510,10 @@ class _ExecutorManagerThread(threading.Thread):
# https://github.com/python/cpython/issues/94777
self.call_queue._reader.close()
+ # gh-107219: Close the connection writer which can unblock
+ # Queue._feed() if it was stuck in send_bytes().
+ self.call_queue._writer.close()
+
# clean up resources
self.join_executor_internals()
diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py
index 04eaea8..7c425a2 100644
--- a/Lib/multiprocessing/connection.py
+++ b/Lib/multiprocessing/connection.py
@@ -9,6 +9,7 @@
__all__ = [ 'Client', 'Listener', 'Pipe', 'wait' ]
+import errno
import io
import os
import sys
@@ -41,6 +42,7 @@ except ImportError:
BUFSIZE = 8192
# A very generous timeout when it comes to local connections...
CONNECTION_TIMEOUT = 20.
+WSA_OPERATION_ABORTED = 995
_mmap_counter = itertools.count()
@@ -271,12 +273,22 @@ if _winapi:
with FILE_FLAG_OVERLAPPED.
"""
_got_empty_message = False
+ _send_ov = None
def _close(self, _CloseHandle=_winapi.CloseHandle):
+ ov = self._send_ov
+ if ov is not None:
+ # Interrupt WaitForMultipleObjects() in _send_bytes()
+ ov.cancel()
_CloseHandle(self._handle)
def _send_bytes(self, buf):
+ if self._send_ov is not None:
+ # A connection should only be used by a single thread
+ raise ValueError("concurrent send_bytes() calls "
+ "are not supported")
ov, err = _winapi.WriteFile(self._handle, buf, overlapped=True)
+ self._send_ov = ov
try:
if err == _winapi.ERROR_IO_PENDING:
waitres = _winapi.WaitForMultipleObjects(
@@ -286,7 +298,13 @@ if _winapi:
ov.cancel()
raise
finally:
+ self._send_ov = None
nwritten, err = ov.GetOverlappedResult(True)
+ if err == WSA_OPERATION_ABORTED:
+ # close() was called by another thread while
+ # WaitForMultipleObjects() was waiting for the overlapped
+ # operation.
+ raise OSError(errno.EPIPE, "handle is closed")
assert err == 0
assert nwritten == len(buf)
diff --git a/Misc/NEWS.d/next/Library/2023-09-11-00-32-18.gh-issue-107219.3zqyFT.rst b/Misc/NEWS.d/next/Library/2023-09-11-00-32-18.gh-issue-107219.3zqyFT.rst
new file mode 100644
index 0000000..10afbcf
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2023-09-11-00-32-18.gh-issue-107219.3zqyFT.rst
@@ -0,0 +1,5 @@
+Fix a race condition in ``concurrent.futures``. When a process in the
+process pool was terminated abruptly (while the future was running or
+pending), close the connection write end. If the call queue is blocked on
+sending bytes to a worker process, closing the connection write end interrupts
+the send, so the queue can be closed. Patch by Victor Stinner.