diff options
author | Antoine Pitrou <solipsis@pitrou.net> | 2012-03-05 18:28:37 (GMT) |
---|---|---|
committer | Antoine Pitrou <solipsis@pitrou.net> | 2012-03-05 18:28:37 (GMT) |
commit | bdb1cf1ca56db25b33fb15dd91eef2cc32cd8973 (patch) | |
tree | 54137f9699833726def7c803cff7c995af22cfa5 /Lib/multiprocessing | |
parent | 1e88f3faa61dbaa9ea0d2404aa8563c1eeceba54 (diff) | |
download | cpython-bdb1cf1ca56db25b33fb15dd91eef2cc32cd8973.zip cpython-bdb1cf1ca56db25b33fb15dd91eef2cc32cd8973.tar.gz cpython-bdb1cf1ca56db25b33fb15dd91eef2cc32cd8973.tar.bz2 |
Issue #12328: Fix multiprocessing's use of overlapped I/O on Windows.
Also, add a multiprocessing.connection.wait(rlist, timeout=None) function
for polling multiple objects at once. Patch by sbt.
Complete changelist from sbt's patch:
* Adds a wait(rlist, timeout=None) function for polling multiple
objects at once. On Unix this is just a wrapper for
select(rlist, [], [], timeout=None).
* Removes use of the SentinelReady exception and the sentinels argument
to certain methods. concurrent.futures.process has been changed to
use wait() instead of SentinelReady.
* Fixes bugs concerning PipeConnection.poll() and messages of zero
length.
* Fixes PipeListener.accept() to call ConnectNamedPipe() with
overlapped=True.
* Fixes Queue.empty() and SimpleQueue.empty() so that they are
threadsafe on Windows.
* Now PipeConnection.poll() and wait() will not modify the pipe except
possibly by consuming a zero length message. (Previously poll()
could consume a partial message.)
* All of multiprocesing's pipe related blocking functions/methods are
now interruptible by SIGINT on Windows.
Diffstat (limited to 'Lib/multiprocessing')
-rw-r--r-- | Lib/multiprocessing/connection.py | 324 | ||||
-rw-r--r-- | Lib/multiprocessing/queues.py | 9 |
2 files changed, 217 insertions, 116 deletions
diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py index 8807618..ca0c973 100644 --- a/Lib/multiprocessing/connection.py +++ b/Lib/multiprocessing/connection.py @@ -32,7 +32,7 @@ # SUCH DAMAGE. # -__all__ = [ 'Client', 'Listener', 'Pipe' ] +__all__ = [ 'Client', 'Listener', 'Pipe', 'wait' ] import io import os @@ -58,8 +58,6 @@ except ImportError: raise win32 = None -_select = _eintr_retry(select.select) - # # # @@ -122,15 +120,6 @@ def address_type(address): else: raise ValueError('address type of %r unrecognized' % address) - -class SentinelReady(Exception): - """ - Raised when a sentinel is ready when polling. - """ - def __init__(self, *args): - Exception.__init__(self, *args) - self.sentinels = args[0] - # # Connection classes # @@ -268,11 +257,11 @@ class _ConnectionBase: (offset + size) // itemsize]) return size - def recv(self, sentinels=None): + def recv(self): """Receive a (picklable) object""" self._check_closed() self._check_readable() - buf = self._recv_bytes(sentinels=sentinels) + buf = self._recv_bytes() return pickle.loads(buf.getbuffer()) def poll(self, timeout=0.0): @@ -290,85 +279,80 @@ if win32: Overlapped I/O is used, so the handles must have been created with FILE_FLAG_OVERLAPPED. """ - _buffered = b'' + _got_empty_message = False def _close(self, _CloseHandle=win32.CloseHandle): _CloseHandle(self._handle) def _send_bytes(self, buf): - overlapped = win32.WriteFile(self._handle, buf, overlapped=True) - nwritten, complete = overlapped.GetOverlappedResult(True) - assert complete + ov, err = win32.WriteFile(self._handle, buf, overlapped=True) + try: + if err == win32.ERROR_IO_PENDING: + waitres = win32.WaitForMultipleObjects( + [ov.event], False, INFINITE) + assert waitres == WAIT_OBJECT_0 + except: + ov.cancel() + raise + finally: + nwritten, err = ov.GetOverlappedResult(True) + assert err == 0 assert nwritten == len(buf) - def _recv_bytes(self, maxsize=None, sentinels=()): - if sentinels: - self._poll(-1.0, sentinels) - buf = io.BytesIO() - firstchunk = self._buffered - if firstchunk: - lenfirstchunk = len(firstchunk) - buf.write(firstchunk) - self._buffered = b'' + def _recv_bytes(self, maxsize=None): + if self._got_empty_message: + self._got_empty_message = False + return io.BytesIO() else: - # A reasonable size for the first chunk transfer - bufsize = 128 - if maxsize is not None and maxsize < bufsize: - bufsize = maxsize + bsize = 128 if maxsize is None else min(maxsize, 128) try: - overlapped = win32.ReadFile(self._handle, bufsize, overlapped=True) - lenfirstchunk, complete = overlapped.GetOverlappedResult(True) - firstchunk = overlapped.getbuffer() - assert lenfirstchunk == len(firstchunk) + ov, err = win32.ReadFile(self._handle, bsize, + overlapped=True) + try: + if err == win32.ERROR_IO_PENDING: + waitres = win32.WaitForMultipleObjects( + [ov.event], False, INFINITE) + assert waitres == WAIT_OBJECT_0 + except: + ov.cancel() + raise + finally: + nread, err = ov.GetOverlappedResult(True) + if err == 0: + f = io.BytesIO() + f.write(ov.getbuffer()) + return f + elif err == win32.ERROR_MORE_DATA: + return self._get_more_data(ov, maxsize) except IOError as e: if e.winerror == win32.ERROR_BROKEN_PIPE: raise EOFError - raise - buf.write(firstchunk) - if complete: - return buf - navail, nleft = win32.PeekNamedPipe(self._handle) - if maxsize is not None and lenfirstchunk + nleft > maxsize: - return None - if nleft > 0: - overlapped = win32.ReadFile(self._handle, nleft, overlapped=True) - res, complete = overlapped.GetOverlappedResult(True) - assert res == nleft - assert complete - buf.write(overlapped.getbuffer()) - return buf - - def _poll(self, timeout, sentinels=()): - # Fast non-blocking path - navail, nleft = win32.PeekNamedPipe(self._handle) - if navail > 0: - return True - elif timeout == 0.0: - return False - # Blocking: use overlapped I/O - if timeout < 0.0: - timeout = INFINITE - else: - timeout = int(timeout * 1000 + 0.5) - overlapped = win32.ReadFile(self._handle, 1, overlapped=True) - try: - handles = [overlapped.event] - handles += sentinels - res = win32.WaitForMultipleObjects(handles, False, timeout) - finally: - # Always cancel overlapped I/O in the same thread - # (because CancelIoEx() appears only in Vista) - overlapped.cancel() - if res == WAIT_TIMEOUT: - return False - idx = res - WAIT_OBJECT_0 - if idx == 0: - # I/O was successful, store received data - overlapped.GetOverlappedResult(True) - self._buffered += overlapped.getbuffer() + else: + raise + raise RuntimeError("shouldn't get here; expected KeyboardInterrupt") + + def _poll(self, timeout): + if (self._got_empty_message or + win32.PeekNamedPipe(self._handle)[0] != 0): return True - assert 0 < idx < len(handles) - raise SentinelReady([handles[idx]]) + if timeout < 0: + timeout = None + return bool(wait([self], timeout)) + + def _get_more_data(self, ov, maxsize): + buf = ov.getbuffer() + f = io.BytesIO() + f.write(buf) + left = win32.PeekNamedPipe(self._handle)[1] + assert left > 0 + if maxsize is not None and len(buf) + left > maxsize: + self._bad_message_length() + ov, err = win32.ReadFile(self._handle, left, overlapped=True) + rbytes, err = ov.GetOverlappedResult(True) + assert err == 0 + assert rbytes == left + f.write(ov.getbuffer()) + return f class Connection(_ConnectionBase): @@ -397,17 +381,11 @@ class Connection(_ConnectionBase): break buf = buf[n:] - def _recv(self, size, sentinels=(), read=_read): + def _recv(self, size, read=_read): buf = io.BytesIO() handle = self._handle - if sentinels: - handles = [handle] + sentinels remaining = size while remaining > 0: - if sentinels: - r = _select(handles, [], [])[0] - if handle not in r: - raise SentinelReady(r) chunk = read(handle, remaining) n = len(chunk) if n == 0: @@ -428,17 +406,17 @@ class Connection(_ConnectionBase): if n > 0: self._send(buf) - def _recv_bytes(self, maxsize=None, sentinels=()): - buf = self._recv(4, sentinels) + def _recv_bytes(self, maxsize=None): + buf = self._recv(4) size, = struct.unpack("!i", buf.getvalue()) if maxsize is not None and size > maxsize: return None - return self._recv(size, sentinels) + return self._recv(size) def _poll(self, timeout): if timeout < 0.0: timeout = None - r = _select([self._handle], [], [], timeout)[0] + r = wait([self._handle], timeout) return bool(r) @@ -559,7 +537,8 @@ else: ) overlapped = win32.ConnectNamedPipe(h1, overlapped=True) - overlapped.GetOverlappedResult(True) + _, err = overlapped.GetOverlappedResult(True) + assert err == 0 c1 = PipeConnection(h1, writable=duplex) c2 = PipeConnection(h2, readable=duplex) @@ -633,39 +612,40 @@ if sys.platform == 'win32': ''' def __init__(self, address, backlog=None): self._address = address - handle = win32.CreateNamedPipe( - address, win32.PIPE_ACCESS_DUPLEX | - win32.FILE_FLAG_FIRST_PIPE_INSTANCE, - win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE | - win32.PIPE_WAIT, - win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE, - win32.NMPWAIT_WAIT_FOREVER, win32.NULL - ) - self._handle_queue = [handle] - self._last_accepted = None + self._handle_queue = [self._new_handle(first=True)] + self._last_accepted = None sub_debug('listener created with address=%r', self._address) - self.close = Finalize( self, PipeListener._finalize_pipe_listener, args=(self._handle_queue, self._address), exitpriority=0 ) - def accept(self): - newhandle = win32.CreateNamedPipe( - self._address, win32.PIPE_ACCESS_DUPLEX, + def _new_handle(self, first=False): + flags = win32.PIPE_ACCESS_DUPLEX | win32.FILE_FLAG_OVERLAPPED + if first: + flags |= win32.FILE_FLAG_FIRST_PIPE_INSTANCE + return win32.CreateNamedPipe( + self._address, flags, win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE | win32.PIPE_WAIT, win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE, win32.NMPWAIT_WAIT_FOREVER, win32.NULL ) - self._handle_queue.append(newhandle) + + def accept(self): + self._handle_queue.append(self._new_handle()) handle = self._handle_queue.pop(0) + ov = win32.ConnectNamedPipe(handle, overlapped=True) try: - win32.ConnectNamedPipe(handle, win32.NULL) - except WindowsError as e: - if e.winerror != win32.ERROR_PIPE_CONNECTED: - raise + res = win32.WaitForMultipleObjects([ov.event], False, INFINITE) + except: + ov.cancel() + win32.CloseHandle(handle) + raise + finally: + _, err = ov.GetOverlappedResult(True) + assert err == 0 return PipeConnection(handle) @staticmethod @@ -684,7 +664,8 @@ if sys.platform == 'win32': win32.WaitNamedPipe(address, 1000) h = win32.CreateFile( address, win32.GENERIC_READ | win32.GENERIC_WRITE, - 0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL + 0, win32.NULL, win32.OPEN_EXISTING, + win32.FILE_FLAG_OVERLAPPED, win32.NULL ) except WindowsError as e: if e.winerror not in (win32.ERROR_SEM_TIMEOUT, @@ -773,6 +754,125 @@ def XmlClient(*args, **kwds): import xmlrpc.client as xmlrpclib return ConnectionWrapper(Client(*args, **kwds), _xml_dumps, _xml_loads) +# +# Wait +# + +if sys.platform == 'win32': + + def _exhaustive_wait(handles, timeout): + # Return ALL handles which are currently signalled. (Only + # returning the first signalled might create starvation issues.) + L = list(handles) + ready = [] + while L: + res = win32.WaitForMultipleObjects(L, False, timeout) + if res == WAIT_TIMEOUT: + break + elif WAIT_OBJECT_0 <= res < WAIT_OBJECT_0 + len(L): + res -= WAIT_OBJECT_0 + elif WAIT_ABANDONED_0 <= res < WAIT_ABANDONED_0 + len(L): + res -= WAIT_ABANDONED_0 + else: + raise RuntimeError('Should not get here') + ready.append(L[res]) + L = L[res+1:] + timeout = 0 + return ready + + _ready_errors = {win32.ERROR_BROKEN_PIPE, win32.ERROR_NETNAME_DELETED} + + def wait(object_list, timeout=None): + ''' + Wait till an object in object_list is ready/readable. + + Returns list of those objects in object_list which are ready/readable. + ''' + if timeout is None: + timeout = INFINITE + elif timeout < 0: + timeout = 0 + else: + timeout = int(timeout * 1000 + 0.5) + + object_list = list(object_list) + waithandle_to_obj = {} + ov_list = [] + ready_objects = set() + ready_handles = set() + + try: + for o in object_list: + try: + fileno = getattr(o, 'fileno') + except AttributeError: + waithandle_to_obj[o.__index__()] = o + else: + # start an overlapped read of length zero + try: + ov, err = win32.ReadFile(fileno(), 0, True) + except OSError as e: + err = e.winerror + if err not in _ready_errors: + raise + if err == win32.ERROR_IO_PENDING: + ov_list.append(ov) + waithandle_to_obj[ov.event] = o + else: + # If o.fileno() is an overlapped pipe handle and + # err == 0 then there is a zero length message + # in the pipe, but it HAS NOT been consumed. + ready_objects.add(o) + timeout = 0 + + ready_handles = _exhaustive_wait(waithandle_to_obj.keys(), timeout) + finally: + # request that overlapped reads stop + for ov in ov_list: + ov.cancel() + + # wait for all overlapped reads to stop + for ov in ov_list: + try: + _, err = ov.GetOverlappedResult(True) + except OSError as e: + err = e.winerror + if err not in _ready_errors: + raise + if err != win32.ERROR_OPERATION_ABORTED: + o = waithandle_to_obj[ov.event] + ready_objects.add(o) + if err == 0: + # If o.fileno() is an overlapped pipe handle then + # a zero length message HAS been consumed. + if hasattr(o, '_got_empty_message'): + o._got_empty_message = True + + ready_objects.update(waithandle_to_obj[h] for h in ready_handles) + return [o for o in object_list if o in ready_objects] + +else: + + def wait(object_list, timeout=None): + ''' + Wait till an object in object_list is ready/readable. + + Returns list of those objects in object_list which are ready/readable. + ''' + if timeout is not None: + if timeout <= 0: + return select.select(object_list, [], [], 0)[0] + else: + deadline = time.time() + timeout + while True: + try: + return select.select(object_list, [], [], timeout)[0] + except OSError as e: + if e.errno != errno.EINTR: + raise + if timeout is not None: + timeout = deadline - time.time() + # Late import because of circular import from multiprocessing.forking import duplicate, close diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py index c4f9cda..262fd85 100644 --- a/Lib/multiprocessing/queues.py +++ b/Lib/multiprocessing/queues.py @@ -44,7 +44,7 @@ import errno from queue import Empty, Full import _multiprocessing -from multiprocessing.connection import Pipe, SentinelReady +from multiprocessing.connection import Pipe from multiprocessing.synchronize import Lock, BoundedSemaphore, Semaphore, Condition from multiprocessing.util import debug, info, Finalize, register_after_fork from multiprocessing.forking import assert_spawning @@ -360,6 +360,7 @@ class SimpleQueue(object): def __init__(self): self._reader, self._writer = Pipe(duplex=False) self._rlock = Lock() + self._poll = self._reader.poll if sys.platform == 'win32': self._wlock = None else: @@ -367,7 +368,7 @@ class SimpleQueue(object): self._make_methods() def empty(self): - return not self._reader.poll() + return not self._poll() def __getstate__(self): assert_spawning(self) @@ -380,10 +381,10 @@ class SimpleQueue(object): def _make_methods(self): recv = self._reader.recv racquire, rrelease = self._rlock.acquire, self._rlock.release - def get(*, sentinels=None): + def get(): racquire() try: - return recv(sentinels) + return recv() finally: rrelease() self.get = get |