diff options
Diffstat (limited to 'Lib/multiprocessing/connection.py')
| -rw-r--r-- | Lib/multiprocessing/connection.py | 659 | 
1 files changed, 541 insertions, 118 deletions
| diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py index 4fa6f70..fbbd5d9 100644 --- a/Lib/multiprocessing/connection.py +++ b/Lib/multiprocessing/connection.py @@ -4,49 +4,34 @@  # multiprocessing/connection.py  #  # Copyright (c) 2006-2008, R Oudkerk -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions -# are met: -# -# 1. Redistributions of source code must retain the above copyright -#    notice, this list of conditions and the following disclaimer. -# 2. Redistributions in binary form must reproduce the above copyright -#    notice, this list of conditions and the following disclaimer in the -#    documentation and/or other materials provided with the distribution. -# 3. Neither the name of author nor the names of any contributors may be -#    used to endorse or promote products derived from this software -#    without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND -# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE -# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS -# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) -# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT -# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY -# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF -# SUCH DAMAGE. +# Licensed to PSF under a Contributor Agreement.  # -__all__ = [ 'Client', 'Listener', 'Pipe' ] +__all__ = [ 'Client', 'Listener', 'Pipe', 'wait' ] +import io  import os  import sys +import pickle +import select  import socket +import struct  import errno  import time  import tempfile  import itertools  import _multiprocessing -from multiprocessing import current_process, AuthenticationError +from multiprocessing import current_process, AuthenticationError, BufferTooShort  from multiprocessing.util import get_temp_dir, Finalize, sub_debug, debug -from multiprocessing.forking import duplicate, close - +from multiprocessing.forking import ForkingPickler +try: +    import _winapi +    from _winapi import WAIT_OBJECT_0, WAIT_TIMEOUT, INFINITE +except ImportError: +    if sys.platform == 'win32': +        raise +    _winapi = None  #  # @@ -122,6 +107,309 @@ def address_type(address):          raise ValueError('address type of %r unrecognized' % address)  # +# Connection classes +# + +class _ConnectionBase: +    _handle = None + +    def __init__(self, handle, readable=True, writable=True): +        handle = handle.__index__() +        if handle < 0: +            raise ValueError("invalid handle") +        if not readable and not writable: +            raise ValueError( +                "at least one of `readable` and `writable` must be True") +        self._handle = handle +        self._readable = readable +        self._writable = writable + +    # XXX should we use util.Finalize instead of a __del__? + +    def __del__(self): +        if self._handle is not None: +            self._close() + +    def _check_closed(self): +        if self._handle is None: +            raise IOError("handle is closed") + +    def _check_readable(self): +        if not self._readable: +            raise IOError("connection is write-only") + +    def _check_writable(self): +        if not self._writable: +            raise IOError("connection is read-only") + +    def _bad_message_length(self): +        if self._writable: +            self._readable = False +        else: +            self.close() +        raise IOError("bad message length") + +    @property +    def closed(self): +        """True if the connection is closed""" +        return self._handle is None + +    @property +    def readable(self): +        """True if the connection is readable""" +        return self._readable + +    @property +    def writable(self): +        """True if the connection is writable""" +        return self._writable + +    def fileno(self): +        """File descriptor or handle of the connection""" +        self._check_closed() +        return self._handle + +    def close(self): +        """Close the connection""" +        if self._handle is not None: +            try: +                self._close() +            finally: +                self._handle = None + +    def send_bytes(self, buf, offset=0, size=None): +        """Send the bytes data from a bytes-like object""" +        self._check_closed() +        self._check_writable() +        m = memoryview(buf) +        # HACK for byte-indexing of non-bytewise buffers (e.g. array.array) +        if m.itemsize > 1: +            m = memoryview(bytes(m)) +        n = len(m) +        if offset < 0: +            raise ValueError("offset is negative") +        if n < offset: +            raise ValueError("buffer length < offset") +        if size is None: +            size = n - offset +        elif size < 0: +            raise ValueError("size is negative") +        elif offset + size > n: +            raise ValueError("buffer length < offset + size") +        self._send_bytes(m[offset:offset + size]) + +    def send(self, obj): +        """Send a (picklable) object""" +        self._check_closed() +        self._check_writable() +        buf = io.BytesIO() +        ForkingPickler(buf, pickle.HIGHEST_PROTOCOL).dump(obj) +        self._send_bytes(buf.getbuffer()) + +    def recv_bytes(self, maxlength=None): +        """ +        Receive bytes data as a bytes object. +        """ +        self._check_closed() +        self._check_readable() +        if maxlength is not None and maxlength < 0: +            raise ValueError("negative maxlength") +        buf = self._recv_bytes(maxlength) +        if buf is None: +            self._bad_message_length() +        return buf.getvalue() + +    def recv_bytes_into(self, buf, offset=0): +        """ +        Receive bytes data into a writeable buffer-like object. +        Return the number of bytes read. +        """ +        self._check_closed() +        self._check_readable() +        with memoryview(buf) as m: +            # Get bytesize of arbitrary buffer +            itemsize = m.itemsize +            bytesize = itemsize * len(m) +            if offset < 0: +                raise ValueError("negative offset") +            elif offset > bytesize: +                raise ValueError("offset too large") +            result = self._recv_bytes() +            size = result.tell() +            if bytesize < offset + size: +                raise BufferTooShort(result.getvalue()) +            # Message can fit in dest +            result.seek(0) +            result.readinto(m[offset // itemsize : +                              (offset + size) // itemsize]) +            return size + +    def recv(self): +        """Receive a (picklable) object""" +        self._check_closed() +        self._check_readable() +        buf = self._recv_bytes() +        return pickle.loads(buf.getbuffer()) + +    def poll(self, timeout=0.0): +        """Whether there is any input available to be read""" +        self._check_closed() +        self._check_readable() +        return self._poll(timeout) + +    def __enter__(self): +        return self + +    def __exit__(self, exc_type, exc_value, exc_tb): +        self.close() + + +if _winapi: + +    class PipeConnection(_ConnectionBase): +        """ +        Connection class based on a Windows named pipe. +        Overlapped I/O is used, so the handles must have been created +        with FILE_FLAG_OVERLAPPED. +        """ +        _got_empty_message = False + +        def _close(self, _CloseHandle=_winapi.CloseHandle): +            _CloseHandle(self._handle) + +        def _send_bytes(self, buf): +            ov, err = _winapi.WriteFile(self._handle, buf, overlapped=True) +            try: +                if err == _winapi.ERROR_IO_PENDING: +                    waitres = _winapi.WaitForMultipleObjects( +                        [ov.event], False, INFINITE) +                    assert waitres == WAIT_OBJECT_0 +            except: +                ov.cancel() +                raise +            finally: +                nwritten, err = ov.GetOverlappedResult(True) +            assert err == 0 +            assert nwritten == len(buf) + +        def _recv_bytes(self, maxsize=None): +            if self._got_empty_message: +                self._got_empty_message = False +                return io.BytesIO() +            else: +                bsize = 128 if maxsize is None else min(maxsize, 128) +                try: +                    ov, err = _winapi.ReadFile(self._handle, bsize, +                                                overlapped=True) +                    try: +                        if err == _winapi.ERROR_IO_PENDING: +                            waitres = _winapi.WaitForMultipleObjects( +                                [ov.event], False, INFINITE) +                            assert waitres == WAIT_OBJECT_0 +                    except: +                        ov.cancel() +                        raise +                    finally: +                        nread, err = ov.GetOverlappedResult(True) +                        if err == 0: +                            f = io.BytesIO() +                            f.write(ov.getbuffer()) +                            return f +                        elif err == _winapi.ERROR_MORE_DATA: +                            return self._get_more_data(ov, maxsize) +                except IOError as e: +                    if e.winerror == _winapi.ERROR_BROKEN_PIPE: +                        raise EOFError +                    else: +                        raise +            raise RuntimeError("shouldn't get here; expected KeyboardInterrupt") + +        def _poll(self, timeout): +            if (self._got_empty_message or +                        _winapi.PeekNamedPipe(self._handle)[0] != 0): +                return True +            return bool(wait([self], timeout)) + +        def _get_more_data(self, ov, maxsize): +            buf = ov.getbuffer() +            f = io.BytesIO() +            f.write(buf) +            left = _winapi.PeekNamedPipe(self._handle)[1] +            assert left > 0 +            if maxsize is not None and len(buf) + left > maxsize: +                self._bad_message_length() +            ov, err = _winapi.ReadFile(self._handle, left, overlapped=True) +            rbytes, err = ov.GetOverlappedResult(True) +            assert err == 0 +            assert rbytes == left +            f.write(ov.getbuffer()) +            return f + + +class Connection(_ConnectionBase): +    """ +    Connection class based on an arbitrary file descriptor (Unix only), or +    a socket handle (Windows). +    """ + +    if _winapi: +        def _close(self, _close=_multiprocessing.closesocket): +            _close(self._handle) +        _write = _multiprocessing.send +        _read = _multiprocessing.recv +    else: +        def _close(self, _close=os.close): +            _close(self._handle) +        _write = os.write +        _read = os.read + +    def _send(self, buf, write=_write): +        remaining = len(buf) +        while True: +            n = write(self._handle, buf) +            remaining -= n +            if remaining == 0: +                break +            buf = buf[n:] + +    def _recv(self, size, read=_read): +        buf = io.BytesIO() +        handle = self._handle +        remaining = size +        while remaining > 0: +            chunk = read(handle, remaining) +            n = len(chunk) +            if n == 0: +                if remaining == size: +                    raise EOFError +                else: +                    raise IOError("got end of file during message") +            buf.write(chunk) +            remaining -= n +        return buf + +    def _send_bytes(self, buf): +        # For wire compatibility with 3.2 and lower +        n = len(buf) +        self._send(struct.pack("!i", n)) +        # The condition is necessary to avoid "broken pipe" errors +        # when sending a 0-length buffer if the other end closed the pipe. +        if n > 0: +            self._send(buf) + +    def _recv_bytes(self, maxsize=None): +        buf = self._recv(4) +        size, = struct.unpack("!i", buf.getvalue()) +        if maxsize is not None and size > maxsize: +            return None +        return self._recv(size) + +    def _poll(self, timeout): +        r = wait([self._handle], timeout) +        return bool(r) + + +#  # Public functions  # @@ -154,6 +442,8 @@ class Listener(object):          Returns a `Connection` object.          ''' +        if self._listener is None: +            raise IOError('listener is closed')          c = self._listener.accept()          if self._authkey:              deliver_challenge(c, self._authkey) @@ -164,11 +454,19 @@ class Listener(object):          '''          Close the bound socket or named pipe of `self`.          ''' -        return self._listener.close() +        if self._listener is not None: +            self._listener.close() +            self._listener = None      address = property(lambda self: self._listener._address)      last_accepted = property(lambda self: self._listener._last_accepted) +    def __enter__(self): +        return self + +    def __exit__(self, exc_type, exc_value, exc_tb): +        self.close() +  def Client(address, family=None, authkey=None):      ''' @@ -201,56 +499,52 @@ if sys.platform != 'win32':              s1, s2 = socket.socketpair()              s1.setblocking(True)              s2.setblocking(True) -            c1 = _multiprocessing.Connection(os.dup(s1.fileno())) -            c2 = _multiprocessing.Connection(os.dup(s2.fileno())) -            s1.close() -            s2.close() +            c1 = Connection(s1.detach()) +            c2 = Connection(s2.detach())          else:              fd1, fd2 = os.pipe() -            c1 = _multiprocessing.Connection(fd1, writable=False) -            c2 = _multiprocessing.Connection(fd2, readable=False) +            c1 = Connection(fd1, writable=False) +            c2 = Connection(fd2, readable=False)          return c1, c2  else: -    from _multiprocessing import win32 -      def Pipe(duplex=True):          '''          Returns pair of connection objects at either end of a pipe          '''          address = arbitrary_address('AF_PIPE')          if duplex: -            openmode = win32.PIPE_ACCESS_DUPLEX -            access = win32.GENERIC_READ | win32.GENERIC_WRITE +            openmode = _winapi.PIPE_ACCESS_DUPLEX +            access = _winapi.GENERIC_READ | _winapi.GENERIC_WRITE              obsize, ibsize = BUFSIZE, BUFSIZE          else: -            openmode = win32.PIPE_ACCESS_INBOUND -            access = win32.GENERIC_WRITE +            openmode = _winapi.PIPE_ACCESS_INBOUND +            access = _winapi.GENERIC_WRITE              obsize, ibsize = 0, BUFSIZE -        h1 = win32.CreateNamedPipe( -            address, openmode, -            win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE | -            win32.PIPE_WAIT, -            1, obsize, ibsize, win32.NMPWAIT_WAIT_FOREVER, win32.NULL +        h1 = _winapi.CreateNamedPipe( +            address, openmode | _winapi.FILE_FLAG_OVERLAPPED | +            _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE, +            _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE | +            _winapi.PIPE_WAIT, +            1, obsize, ibsize, _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL              ) -        h2 = win32.CreateFile( -            address, access, 0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL +        h2 = _winapi.CreateFile( +            address, access, 0, _winapi.NULL, _winapi.OPEN_EXISTING, +            _winapi.FILE_FLAG_OVERLAPPED, _winapi.NULL              ) -        win32.SetNamedPipeHandleState( -            h2, win32.PIPE_READMODE_MESSAGE, None, None +        _winapi.SetNamedPipeHandleState( +            h2, _winapi.PIPE_READMODE_MESSAGE, None, None              ) -        try: -            win32.ConnectNamedPipe(h1, win32.NULL) -        except WindowsError as e: -            if e.args[0] != win32.ERROR_PIPE_CONNECTED: -                raise +        overlapped = _winapi.ConnectNamedPipe(h1, overlapped=True) +        _, err = overlapped.GetOverlappedResult(True) +        assert err == 0 -        c1 = _multiprocessing.PipeConnection(h1, writable=duplex) -        c2 = _multiprocessing.PipeConnection(h2, readable=duplex) +        c1 = PipeConnection(h1, writable=duplex) +        c2 = PipeConnection(h2, readable=duplex)          return c1, c2 @@ -265,12 +559,15 @@ class SocketListener(object):      def __init__(self, address, family, backlog=1):          self._socket = socket.socket(getattr(socket, family))          try: -            self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) +            # SO_REUSEADDR has different semantics on Windows (issue #2550). +            if os.name == 'posix': +                self._socket.setsockopt(socket.SOL_SOCKET, +                                        socket.SO_REUSEADDR, 1)              self._socket.setblocking(True)              self._socket.bind(address)              self._socket.listen(backlog)              self._address = self._socket.getsockname() -        except socket.error: +        except OSError:              self._socket.close()              raise          self._family = family @@ -286,10 +583,7 @@ class SocketListener(object):      def accept(self):          s, self._last_accepted = self._socket.accept()          s.setblocking(True) -        fd = duplicate(s.fileno()) -        conn = _multiprocessing.Connection(fd) -        s.close() -        return conn +        return Connection(s.detach())      def close(self):          self._socket.close() @@ -304,24 +598,8 @@ def SocketClient(address):      family = address_type(address)      with socket.socket( getattr(socket, family) ) as s:          s.setblocking(True) -        t = _init_timeout() - -        while 1: -            try: -                s.connect(address) -            except socket.error as e: -                if e.args[0] != errno.ECONNREFUSED or _check_timeout(t): -                    debug('failed to connect to address %s', address) -                    raise -                time.sleep(0.01) -            else: -                break -        else: -            raise - -        fd = duplicate(s.fileno()) -    conn = _multiprocessing.Connection(fd) -    return conn +        s.connect(address) +        return Connection(s.detach())  #  # Definitions for connections based on named pipes @@ -335,48 +613,55 @@ if sys.platform == 'win32':          '''          def __init__(self, address, backlog=None):              self._address = address -            handle = win32.CreateNamedPipe( -                address, win32.PIPE_ACCESS_DUPLEX, -                win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE | -                win32.PIPE_WAIT, -                win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE, -                win32.NMPWAIT_WAIT_FOREVER, win32.NULL -                ) -            self._handle_queue = [handle] -            self._last_accepted = None +            self._handle_queue = [self._new_handle(first=True)] +            self._last_accepted = None              sub_debug('listener created with address=%r', self._address) -              self.close = Finalize(                  self, PipeListener._finalize_pipe_listener,                  args=(self._handle_queue, self._address), exitpriority=0                  ) -        def accept(self): -            newhandle = win32.CreateNamedPipe( -                self._address, win32.PIPE_ACCESS_DUPLEX, -                win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE | -                win32.PIPE_WAIT, -                win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE, -                win32.NMPWAIT_WAIT_FOREVER, win32.NULL +        def _new_handle(self, first=False): +            flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED +            if first: +                flags |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE +            return _winapi.CreateNamedPipe( +                self._address, flags, +                _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE | +                _winapi.PIPE_WAIT, +                _winapi.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE, +                _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL                  ) -            self._handle_queue.append(newhandle) + +        def accept(self): +            self._handle_queue.append(self._new_handle())              handle = self._handle_queue.pop(0)              try: -                win32.ConnectNamedPipe(handle, win32.NULL) -            except WindowsError as e: +                ov = _winapi.ConnectNamedPipe(handle, overlapped=True) +            except OSError as e: +                if e.winerror != _winapi.ERROR_NO_DATA: +                    raise                  # ERROR_NO_DATA can occur if a client has already connected,                  # written data and then disconnected -- see Issue 14725. -                if e.args[0] not in (win32.ERROR_PIPE_CONNECTED, -                                     win32.ERROR_NO_DATA): +            else: +                try: +                    res = _winapi.WaitForMultipleObjects( +                        [ov.event], False, INFINITE) +                except: +                    ov.cancel() +                    _winapi.CloseHandle(handle)                      raise -            return _multiprocessing.PipeConnection(handle) +                finally: +                    _, err = ov.GetOverlappedResult(True) +                    assert err == 0 +            return PipeConnection(handle)          @staticmethod          def _finalize_pipe_listener(queue, address):              sub_debug('closing listener with address=%r', address)              for handle in queue: -                close(handle) +                _winapi.CloseHandle(handle)      def PipeClient(address):          ''' @@ -385,24 +670,25 @@ if sys.platform == 'win32':          t = _init_timeout()          while 1:              try: -                win32.WaitNamedPipe(address, 1000) -                h = win32.CreateFile( -                    address, win32.GENERIC_READ | win32.GENERIC_WRITE, -                    0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL +                _winapi.WaitNamedPipe(address, 1000) +                h = _winapi.CreateFile( +                    address, _winapi.GENERIC_READ | _winapi.GENERIC_WRITE, +                    0, _winapi.NULL, _winapi.OPEN_EXISTING, +                    _winapi.FILE_FLAG_OVERLAPPED, _winapi.NULL                      )              except WindowsError as e: -                if e.args[0] not in (win32.ERROR_SEM_TIMEOUT, -                                     win32.ERROR_PIPE_BUSY) or _check_timeout(t): +                if e.winerror not in (_winapi.ERROR_SEM_TIMEOUT, +                                      _winapi.ERROR_PIPE_BUSY) or _check_timeout(t):                      raise              else:                  break          else:              raise -        win32.SetNamedPipeHandleState( -            h, win32.PIPE_READMODE_MESSAGE, None, None +        _winapi.SetNamedPipeHandleState( +            h, _winapi.PIPE_READMODE_MESSAGE, None, None              ) -        return _multiprocessing.PipeConnection(h) +        return PipeConnection(h)  #  # Authentication stuff @@ -459,10 +745,10 @@ class ConnectionWrapper(object):          return self._loads(s)  def _xml_dumps(obj): -    return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf8') +    return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf-8')  def _xml_loads(s): -    (obj,), method = xmlrpclib.loads(s.decode('utf8')) +    (obj,), method = xmlrpclib.loads(s.decode('utf-8'))      return obj  class XmlListener(Listener): @@ -476,3 +762,140 @@ def XmlClient(*args, **kwds):      global xmlrpclib      import xmlrpc.client as xmlrpclib      return ConnectionWrapper(Client(*args, **kwds), _xml_dumps, _xml_loads) + +# +# Wait +# + +if sys.platform == 'win32': + +    def _exhaustive_wait(handles, timeout): +        # Return ALL handles which are currently signalled.  (Only +        # returning the first signalled might create starvation issues.) +        L = list(handles) +        ready = [] +        while L: +            res = _winapi.WaitForMultipleObjects(L, False, timeout) +            if res == WAIT_TIMEOUT: +                break +            elif WAIT_OBJECT_0 <= res < WAIT_OBJECT_0 + len(L): +                res -= WAIT_OBJECT_0 +            elif WAIT_ABANDONED_0 <= res < WAIT_ABANDONED_0 + len(L): +                res -= WAIT_ABANDONED_0 +            else: +                raise RuntimeError('Should not get here') +            ready.append(L[res]) +            L = L[res+1:] +            timeout = 0 +        return ready + +    _ready_errors = {_winapi.ERROR_BROKEN_PIPE, _winapi.ERROR_NETNAME_DELETED} + +    def wait(object_list, timeout=None): +        ''' +        Wait till an object in object_list is ready/readable. + +        Returns list of those objects in object_list which are ready/readable. +        ''' +        if timeout is None: +            timeout = INFINITE +        elif timeout < 0: +            timeout = 0 +        else: +            timeout = int(timeout * 1000 + 0.5) + +        object_list = list(object_list) +        waithandle_to_obj = {} +        ov_list = [] +        ready_objects = set() +        ready_handles = set() + +        try: +            for o in object_list: +                try: +                    fileno = getattr(o, 'fileno') +                except AttributeError: +                    waithandle_to_obj[o.__index__()] = o +                else: +                    # start an overlapped read of length zero +                    try: +                        ov, err = _winapi.ReadFile(fileno(), 0, True) +                    except OSError as e: +                        err = e.winerror +                        if err not in _ready_errors: +                            raise +                    if err == _winapi.ERROR_IO_PENDING: +                        ov_list.append(ov) +                        waithandle_to_obj[ov.event] = o +                    else: +                        # If o.fileno() is an overlapped pipe handle and +                        # err == 0 then there is a zero length message +                        # in the pipe, but it HAS NOT been consumed. +                        ready_objects.add(o) +                        timeout = 0 + +            ready_handles = _exhaustive_wait(waithandle_to_obj.keys(), timeout) +        finally: +            # request that overlapped reads stop +            for ov in ov_list: +                ov.cancel() + +            # wait for all overlapped reads to stop +            for ov in ov_list: +                try: +                    _, err = ov.GetOverlappedResult(True) +                except OSError as e: +                    err = e.winerror +                    if err not in _ready_errors: +                        raise +                if err != _winapi.ERROR_OPERATION_ABORTED: +                    o = waithandle_to_obj[ov.event] +                    ready_objects.add(o) +                    if err == 0: +                        # If o.fileno() is an overlapped pipe handle then +                        # a zero length message HAS been consumed. +                        if hasattr(o, '_got_empty_message'): +                            o._got_empty_message = True + +        ready_objects.update(waithandle_to_obj[h] for h in ready_handles) +        return [o for o in object_list if o in ready_objects] + +else: + +    def wait(object_list, timeout=None): +        ''' +        Wait till an object in object_list is ready/readable. + +        Returns list of those objects in object_list which are ready/readable. +        ''' +        if timeout is not None: +            if timeout <= 0: +                return select.select(object_list, [], [], 0)[0] +            else: +                deadline = time.time() + timeout +        while True: +            try: +                return select.select(object_list, [], [], timeout)[0] +            except OSError as e: +                if e.errno != errno.EINTR: +                    raise +            if timeout is not None: +                timeout = deadline - time.time() + +# +# Make connection and socket objects sharable if possible +# + +if sys.platform == 'win32': +    from . import reduction +    ForkingPickler.register(socket.socket, reduction.reduce_socket) +    ForkingPickler.register(Connection, reduction.reduce_connection) +    ForkingPickler.register(PipeConnection, reduction.reduce_pipe_connection) +else: +    try: +        from . import reduction +    except ImportError: +        pass +    else: +        ForkingPickler.register(socket.socket, reduction.reduce_socket) +        ForkingPickler.register(Connection, reduction.reduce_connection) | 
