diff options
author | Antoine Pitrou <solipsis@pitrou.net> | 2011-06-08 15:21:55 (GMT) |
---|---|---|
committer | Antoine Pitrou <solipsis@pitrou.net> | 2011-06-08 15:21:55 (GMT) |
commit | dd696496605883a44da983ad81e55a01e996a004 (patch) | |
tree | 505cc588dc520311ff2935b8804c04663b26c054 /Lib/multiprocessing/connection.py | |
parent | 4a5e5de03f47b7076fc0eabc9817a59efd20049d (diff) | |
download | cpython-dd696496605883a44da983ad81e55a01e996a004.zip cpython-dd696496605883a44da983ad81e55a01e996a004.tar.gz cpython-dd696496605883a44da983ad81e55a01e996a004.tar.bz2 |
Issue #9205: concurrent.futures.ProcessPoolExecutor now detects killed
children and raises BrokenProcessPool in such a situation. Previously it
would reliably freeze/deadlock.
Diffstat (limited to 'Lib/multiprocessing/connection.py')
-rw-r--r-- | Lib/multiprocessing/connection.py | 149 |
1 files changed, 97 insertions, 52 deletions
diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py index 415e210..ede2908 100644 --- a/Lib/multiprocessing/connection.py +++ b/Lib/multiprocessing/connection.py @@ -48,14 +48,18 @@ import itertools import _multiprocessing from multiprocessing import current_process, AuthenticationError, BufferTooShort -from multiprocessing.util import get_temp_dir, Finalize, sub_debug, debug +from multiprocessing.util import ( + get_temp_dir, Finalize, sub_debug, debug, _eintr_retry) try: from _multiprocessing import win32 + from _subprocess import WAIT_OBJECT_0, WAIT_TIMEOUT, INFINITE except ImportError: if sys.platform == 'win32': raise win32 = None +_select = _eintr_retry(select.select) + # # # @@ -118,6 +122,15 @@ def address_type(address): else: raise ValueError('address type of %r unrecognized' % address) + +class SentinelReady(Exception): + """ + Raised when a sentinel is ready when polling. + """ + def __init__(self, *args): + Exception.__init__(self, *args) + self.sentinels = args[0] + # # Connection classes # @@ -253,19 +266,17 @@ class _ConnectionBase: (offset + size) // itemsize]) return size - def recv(self): + def recv(self, sentinels=None): """Receive a (picklable) object""" self._check_closed() self._check_readable() - buf = self._recv_bytes() + buf = self._recv_bytes(sentinels=sentinels) return pickle.loads(buf.getbuffer()) def poll(self, timeout=0.0): """Whether there is any input available to be read""" self._check_closed() self._check_readable() - if timeout < 0.0: - timeout = None return self._poll(timeout) @@ -274,61 +285,88 @@ if win32: class PipeConnection(_ConnectionBase): """ Connection class based on a Windows named pipe. + Overlapped I/O is used, so the handles must have been created + with FILE_FLAG_OVERLAPPED. """ + _buffered = b'' def _close(self): win32.CloseHandle(self._handle) def _send_bytes(self, buf): - nwritten = win32.WriteFile(self._handle, buf) + overlapped = win32.WriteFile(self._handle, buf, overlapped=True) + nwritten, complete = overlapped.GetOverlappedResult(True) + assert complete assert nwritten == len(buf) - def _recv_bytes(self, maxsize=None): + def _recv_bytes(self, maxsize=None, sentinels=()): + if sentinels: + self._poll(-1.0, sentinels) buf = io.BytesIO() - bufsize = 512 - if maxsize is not None: - bufsize = min(bufsize, maxsize) - try: - firstchunk, complete = win32.ReadFile(self._handle, bufsize) - except IOError as e: - if e.errno == win32.ERROR_BROKEN_PIPE: - raise EOFError - raise - lenfirstchunk = len(firstchunk) - buf.write(firstchunk) - if complete: - return buf + firstchunk = self._buffered + if firstchunk: + lenfirstchunk = len(firstchunk) + buf.write(firstchunk) + self._buffered = b'' + else: + # A reasonable size for the first chunk transfer + bufsize = 128 + if maxsize is not None and maxsize < bufsize: + bufsize = maxsize + try: + overlapped = win32.ReadFile(self._handle, bufsize, overlapped=True) + lenfirstchunk, complete = overlapped.GetOverlappedResult(True) + firstchunk = overlapped.getbuffer() + assert lenfirstchunk == len(firstchunk) + except IOError as e: + if e.errno == win32.ERROR_BROKEN_PIPE: + raise EOFError + raise + buf.write(firstchunk) + if complete: + return buf navail, nleft = win32.PeekNamedPipe(self._handle) if maxsize is not None and lenfirstchunk + nleft > maxsize: return None - lastchunk, complete = win32.ReadFile(self._handle, nleft) - assert complete - buf.write(lastchunk) + if nleft > 0: + overlapped = win32.ReadFile(self._handle, nleft, overlapped=True) + res, complete = overlapped.GetOverlappedResult(True) + assert res == nleft + assert complete + buf.write(overlapped.getbuffer()) return buf - def _poll(self, timeout): + def _poll(self, timeout, sentinels=()): + # Fast non-blocking path navail, nleft = win32.PeekNamedPipe(self._handle) if navail > 0: return True elif timeout == 0.0: return False - # Setup a polling loop (translated straight from old - # pipe_connection.c) + # Blocking: use overlapped I/O if timeout < 0.0: - deadline = None + timeout = INFINITE else: - deadline = time.time() + timeout - delay = 0.001 - max_delay = 0.02 - while True: - time.sleep(delay) - navail, nleft = win32.PeekNamedPipe(self._handle) - if navail > 0: - return True - if deadline and time.time() > deadline: - return False - if delay < max_delay: - delay += 0.001 + timeout = int(timeout * 1000 + 0.5) + overlapped = win32.ReadFile(self._handle, 1, overlapped=True) + try: + handles = [overlapped.event] + handles += sentinels + res = win32.WaitForMultipleObjects(handles, False, timeout) + finally: + # Always cancel overlapped I/O in the same thread + # (because CancelIoEx() appears only in Vista) + overlapped.cancel() + if res == WAIT_TIMEOUT: + return False + idx = res - WAIT_OBJECT_0 + if idx == 0: + # I/O was successful, store received data + overlapped.GetOverlappedResult(True) + self._buffered += overlapped.getbuffer() + return True + assert 0 < idx < len(handles) + raise SentinelReady([handles[idx]]) class Connection(_ConnectionBase): @@ -357,11 +395,18 @@ class Connection(_ConnectionBase): break buf = buf[n:] - def _recv(self, size, read=_read): + def _recv(self, size, sentinels=(), read=_read): buf = io.BytesIO() + handle = self._handle + if sentinels: + handles = [handle] + sentinels remaining = size while remaining > 0: - chunk = read(self._handle, remaining) + if sentinels: + r = _select(handles, [], [])[0] + if handle not in r: + raise SentinelReady(r) + chunk = read(handle, remaining) n = len(chunk) if n == 0: if remaining == size: @@ -381,15 +426,17 @@ class Connection(_ConnectionBase): if n > 0: self._send(buf) - def _recv_bytes(self, maxsize=None): - buf = self._recv(4) + def _recv_bytes(self, maxsize=None, sentinels=()): + buf = self._recv(4, sentinels) size, = struct.unpack("=i", buf.getvalue()) if maxsize is not None and size > maxsize: return None - return self._recv(size) + return self._recv(size, sentinels) def _poll(self, timeout): - r = select.select([self._handle], [], [], timeout)[0] + if timeout < 0.0: + timeout = None + r = _select([self._handle], [], [], timeout)[0] return bool(r) @@ -495,23 +542,21 @@ else: obsize, ibsize = 0, BUFSIZE h1 = win32.CreateNamedPipe( - address, openmode, + address, openmode | win32.FILE_FLAG_OVERLAPPED, win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE | win32.PIPE_WAIT, 1, obsize, ibsize, win32.NMPWAIT_WAIT_FOREVER, win32.NULL ) h2 = win32.CreateFile( - address, access, 0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL + address, access, 0, win32.NULL, win32.OPEN_EXISTING, + win32.FILE_FLAG_OVERLAPPED, win32.NULL ) win32.SetNamedPipeHandleState( h2, win32.PIPE_READMODE_MESSAGE, None, None ) - try: - win32.ConnectNamedPipe(h1, win32.NULL) - except WindowsError as e: - if e.args[0] != win32.ERROR_PIPE_CONNECTED: - raise + overlapped = win32.ConnectNamedPipe(h1, overlapped=True) + overlapped.GetOverlappedResult(True) c1 = PipeConnection(h1, writable=duplex) c2 = PipeConnection(h2, readable=duplex) |