summaryrefslogtreecommitdiffstats
path: root/Lib/multiprocessing/connection.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/multiprocessing/connection.py')
-rw-r--r--Lib/multiprocessing/connection.py553
1 files changed, 487 insertions, 66 deletions
diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py
index fa6f7b3..954e901 100644
--- a/Lib/multiprocessing/connection.py
+++ b/Lib/multiprocessing/connection.py
@@ -32,21 +32,31 @@
# SUCH DAMAGE.
#
-__all__ = [ 'Client', 'Listener', 'Pipe' ]
+__all__ = [ 'Client', 'Listener', 'Pipe', 'wait' ]
+import io
import os
import sys
+import pickle
+import select
import socket
+import struct
import errno
import time
import tempfile
import itertools
import _multiprocessing
-from multiprocessing import current_process, AuthenticationError
-from multiprocessing.util import get_temp_dir, Finalize, sub_debug, debug
-from multiprocessing.forking import duplicate, close
-
+from multiprocessing import current_process, AuthenticationError, BufferTooShort
+from multiprocessing.util import (
+ get_temp_dir, Finalize, sub_debug, debug, _eintr_retry)
+try:
+ from _multiprocessing import win32
+ from _subprocess import WAIT_OBJECT_0, WAIT_TIMEOUT, INFINITE
+except ImportError:
+ if sys.platform == 'win32':
+ raise
+ win32 = None
#
#
@@ -118,6 +128,306 @@ def address_type(address):
raise ValueError('address type of %r unrecognized' % address)
#
+# Connection classes
+#
+
+class _ConnectionBase:
+ _handle = None
+
+ def __init__(self, handle, readable=True, writable=True):
+ handle = handle.__index__()
+ if handle < 0:
+ raise ValueError("invalid handle")
+ if not readable and not writable:
+ raise ValueError(
+ "at least one of `readable` and `writable` must be True")
+ self._handle = handle
+ self._readable = readable
+ self._writable = writable
+
+ # XXX should we use util.Finalize instead of a __del__?
+
+ def __del__(self):
+ if self._handle is not None:
+ self._close()
+
+ def _check_closed(self):
+ if self._handle is None:
+ raise IOError("handle is closed")
+
+ def _check_readable(self):
+ if not self._readable:
+ raise IOError("connection is write-only")
+
+ def _check_writable(self):
+ if not self._writable:
+ raise IOError("connection is read-only")
+
+ def _bad_message_length(self):
+ if self._writable:
+ self._readable = False
+ else:
+ self.close()
+ raise IOError("bad message length")
+
+ @property
+ def closed(self):
+ """True if the connection is closed"""
+ return self._handle is None
+
+ @property
+ def readable(self):
+ """True if the connection is readable"""
+ return self._readable
+
+ @property
+ def writable(self):
+ """True if the connection is writable"""
+ return self._writable
+
+ def fileno(self):
+ """File descriptor or handle of the connection"""
+ self._check_closed()
+ return self._handle
+
+ def close(self):
+ """Close the connection"""
+ if self._handle is not None:
+ try:
+ self._close()
+ finally:
+ self._handle = None
+
+ def send_bytes(self, buf, offset=0, size=None):
+ """Send the bytes data from a bytes-like object"""
+ self._check_closed()
+ self._check_writable()
+ m = memoryview(buf)
+ # HACK for byte-indexing of non-bytewise buffers (e.g. array.array)
+ if m.itemsize > 1:
+ m = memoryview(bytes(m))
+ n = len(m)
+ if offset < 0:
+ raise ValueError("offset is negative")
+ if n < offset:
+ raise ValueError("buffer length < offset")
+ if size is None:
+ size = n - offset
+ elif size < 0:
+ raise ValueError("size is negative")
+ elif offset + size > n:
+ raise ValueError("buffer length < offset + size")
+ self._send_bytes(m[offset:offset + size])
+
+ def send(self, obj):
+ """Send a (picklable) object"""
+ self._check_closed()
+ self._check_writable()
+ buf = pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)
+ self._send_bytes(memoryview(buf))
+
+ def recv_bytes(self, maxlength=None):
+ """
+ Receive bytes data as a bytes object.
+ """
+ self._check_closed()
+ self._check_readable()
+ if maxlength is not None and maxlength < 0:
+ raise ValueError("negative maxlength")
+ buf = self._recv_bytes(maxlength)
+ if buf is None:
+ self._bad_message_length()
+ return buf.getvalue()
+
+ def recv_bytes_into(self, buf, offset=0):
+ """
+ Receive bytes data into a writeable buffer-like object.
+ Return the number of bytes read.
+ """
+ self._check_closed()
+ self._check_readable()
+ with memoryview(buf) as m:
+ # Get bytesize of arbitrary buffer
+ itemsize = m.itemsize
+ bytesize = itemsize * len(m)
+ if offset < 0:
+ raise ValueError("negative offset")
+ elif offset > bytesize:
+ raise ValueError("offset too large")
+ result = self._recv_bytes()
+ size = result.tell()
+ if bytesize < offset + size:
+ raise BufferTooShort(result.getvalue())
+ # Message can fit in dest
+ result.seek(0)
+ result.readinto(m[offset // itemsize :
+ (offset + size) // itemsize])
+ return size
+
+ def recv(self):
+ """Receive a (picklable) object"""
+ self._check_closed()
+ self._check_readable()
+ buf = self._recv_bytes()
+ return pickle.loads(buf.getbuffer())
+
+ def poll(self, timeout=0.0):
+ """Whether there is any input available to be read"""
+ self._check_closed()
+ self._check_readable()
+ return self._poll(timeout)
+
+
+if win32:
+
+ class PipeConnection(_ConnectionBase):
+ """
+ Connection class based on a Windows named pipe.
+ Overlapped I/O is used, so the handles must have been created
+ with FILE_FLAG_OVERLAPPED.
+ """
+ _got_empty_message = False
+
+ def _close(self, _CloseHandle=win32.CloseHandle):
+ _CloseHandle(self._handle)
+
+ def _send_bytes(self, buf):
+ ov, err = win32.WriteFile(self._handle, buf, overlapped=True)
+ try:
+ if err == win32.ERROR_IO_PENDING:
+ waitres = win32.WaitForMultipleObjects(
+ [ov.event], False, INFINITE)
+ assert waitres == WAIT_OBJECT_0
+ except:
+ ov.cancel()
+ raise
+ finally:
+ nwritten, err = ov.GetOverlappedResult(True)
+ assert err == 0
+ assert nwritten == len(buf)
+
+ def _recv_bytes(self, maxsize=None):
+ if self._got_empty_message:
+ self._got_empty_message = False
+ return io.BytesIO()
+ else:
+ bsize = 128 if maxsize is None else min(maxsize, 128)
+ try:
+ ov, err = win32.ReadFile(self._handle, bsize,
+ overlapped=True)
+ try:
+ if err == win32.ERROR_IO_PENDING:
+ waitres = win32.WaitForMultipleObjects(
+ [ov.event], False, INFINITE)
+ assert waitres == WAIT_OBJECT_0
+ except:
+ ov.cancel()
+ raise
+ finally:
+ nread, err = ov.GetOverlappedResult(True)
+ if err == 0:
+ f = io.BytesIO()
+ f.write(ov.getbuffer())
+ return f
+ elif err == win32.ERROR_MORE_DATA:
+ return self._get_more_data(ov, maxsize)
+ except IOError as e:
+ if e.winerror == win32.ERROR_BROKEN_PIPE:
+ raise EOFError
+ else:
+ raise
+ raise RuntimeError("shouldn't get here; expected KeyboardInterrupt")
+
+ def _poll(self, timeout):
+ if (self._got_empty_message or
+ win32.PeekNamedPipe(self._handle)[0] != 0):
+ return True
+ if timeout < 0:
+ timeout = None
+ return bool(wait([self], timeout))
+
+ def _get_more_data(self, ov, maxsize):
+ buf = ov.getbuffer()
+ f = io.BytesIO()
+ f.write(buf)
+ left = win32.PeekNamedPipe(self._handle)[1]
+ assert left > 0
+ if maxsize is not None and len(buf) + left > maxsize:
+ self._bad_message_length()
+ ov, err = win32.ReadFile(self._handle, left, overlapped=True)
+ rbytes, err = ov.GetOverlappedResult(True)
+ assert err == 0
+ assert rbytes == left
+ f.write(ov.getbuffer())
+ return f
+
+
+class Connection(_ConnectionBase):
+ """
+ Connection class based on an arbitrary file descriptor (Unix only), or
+ a socket handle (Windows).
+ """
+
+ if win32:
+ def _close(self, _close=win32.closesocket):
+ _close(self._handle)
+ _write = win32.send
+ _read = win32.recv
+ else:
+ def _close(self, _close=os.close):
+ _close(self._handle)
+ _write = os.write
+ _read = os.read
+
+ def _send(self, buf, write=_write):
+ remaining = len(buf)
+ while True:
+ n = write(self._handle, buf)
+ remaining -= n
+ if remaining == 0:
+ break
+ buf = buf[n:]
+
+ def _recv(self, size, read=_read):
+ buf = io.BytesIO()
+ handle = self._handle
+ remaining = size
+ while remaining > 0:
+ chunk = read(handle, remaining)
+ n = len(chunk)
+ if n == 0:
+ if remaining == size:
+ raise EOFError
+ else:
+ raise IOError("got end of file during message")
+ buf.write(chunk)
+ remaining -= n
+ return buf
+
+ def _send_bytes(self, buf):
+ # For wire compatibility with 3.2 and lower
+ n = len(buf)
+ self._send(struct.pack("!i", n))
+ # The condition is necessary to avoid "broken pipe" errors
+ # when sending a 0-length buffer if the other end closed the pipe.
+ if n > 0:
+ self._send(buf)
+
+ def _recv_bytes(self, maxsize=None):
+ buf = self._recv(4)
+ size, = struct.unpack("!i", buf.getvalue())
+ if maxsize is not None and size > maxsize:
+ return None
+ return self._recv(size)
+
+ def _poll(self, timeout):
+ if timeout < 0.0:
+ timeout = None
+ r = wait([self._handle], timeout)
+ return bool(r)
+
+
+#
# Public functions
#
@@ -195,21 +505,17 @@ if sys.platform != 'win32':
'''
if duplex:
s1, s2 = socket.socketpair()
- c1 = _multiprocessing.Connection(os.dup(s1.fileno()))
- c2 = _multiprocessing.Connection(os.dup(s2.fileno()))
- s1.close()
- s2.close()
+ c1 = Connection(s1.detach())
+ c2 = Connection(s2.detach())
else:
fd1, fd2 = os.pipe()
- c1 = _multiprocessing.Connection(fd1, writable=False)
- c2 = _multiprocessing.Connection(fd2, readable=False)
+ c1 = Connection(fd1, writable=False)
+ c2 = Connection(fd2, readable=False)
return c1, c2
else:
- from _multiprocessing import win32
-
def Pipe(duplex=True):
'''
Returns pair of connection objects at either end of a pipe
@@ -225,26 +531,26 @@ else:
obsize, ibsize = 0, BUFSIZE
h1 = win32.CreateNamedPipe(
- address, openmode,
+ address, openmode | win32.FILE_FLAG_OVERLAPPED |
+ win32.FILE_FLAG_FIRST_PIPE_INSTANCE,
win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
win32.PIPE_WAIT,
1, obsize, ibsize, win32.NMPWAIT_WAIT_FOREVER, win32.NULL
)
h2 = win32.CreateFile(
- address, access, 0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL
+ address, access, 0, win32.NULL, win32.OPEN_EXISTING,
+ win32.FILE_FLAG_OVERLAPPED, win32.NULL
)
win32.SetNamedPipeHandleState(
h2, win32.PIPE_READMODE_MESSAGE, None, None
)
- try:
- win32.ConnectNamedPipe(h1, win32.NULL)
- except WindowsError as e:
- if e.args[0] != win32.ERROR_PIPE_CONNECTED:
- raise
+ overlapped = win32.ConnectNamedPipe(h1, overlapped=True)
+ _, err = overlapped.GetOverlappedResult(True)
+ assert err == 0
- c1 = _multiprocessing.PipeConnection(h1, writable=duplex)
- c2 = _multiprocessing.PipeConnection(h2, readable=duplex)
+ c1 = PipeConnection(h1, writable=duplex)
+ c2 = PipeConnection(h2, readable=duplex)
return c1, c2
@@ -259,11 +565,14 @@ class SocketListener(object):
def __init__(self, address, family, backlog=1):
self._socket = socket.socket(getattr(socket, family))
try:
- self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ # SO_REUSEADDR has different semantics on Windows (issue #2550).
+ if os.name == 'posix':
+ self._socket.setsockopt(socket.SOL_SOCKET,
+ socket.SO_REUSEADDR, 1)
self._socket.bind(address)
self._socket.listen(backlog)
self._address = self._socket.getsockname()
- except socket.error:
+ except OSError:
self._socket.close()
raise
self._family = family
@@ -279,7 +588,7 @@ class SocketListener(object):
def accept(self):
s, self._last_accepted = self._socket.accept()
fd = duplicate(s.fileno())
- conn = _multiprocessing.Connection(fd)
+ conn = Connection(fd)
s.close()
return conn
@@ -295,23 +604,9 @@ def SocketClient(address):
'''
family = address_type(address)
with socket.socket( getattr(socket, family) ) as s:
- t = _init_timeout()
-
- while 1:
- try:
- s.connect(address)
- except socket.error as e:
- if e.args[0] != errno.ECONNREFUSED or _check_timeout(t):
- debug('failed to connect to address %s', address)
- raise
- time.sleep(0.01)
- else:
- break
- else:
- raise
-
+ s.connect(address)
fd = duplicate(s.fileno())
- conn = _multiprocessing.Connection(fd)
+ conn = Connection(fd)
return conn
#
@@ -326,39 +621,41 @@ if sys.platform == 'win32':
'''
def __init__(self, address, backlog=None):
self._address = address
- handle = win32.CreateNamedPipe(
- address, win32.PIPE_ACCESS_DUPLEX,
- win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
- win32.PIPE_WAIT,
- win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
- win32.NMPWAIT_WAIT_FOREVER, win32.NULL
- )
- self._handle_queue = [handle]
- self._last_accepted = None
+ self._handle_queue = [self._new_handle(first=True)]
+ self._last_accepted = None
sub_debug('listener created with address=%r', self._address)
-
self.close = Finalize(
self, PipeListener._finalize_pipe_listener,
args=(self._handle_queue, self._address), exitpriority=0
)
- def accept(self):
- newhandle = win32.CreateNamedPipe(
- self._address, win32.PIPE_ACCESS_DUPLEX,
+ def _new_handle(self, first=False):
+ flags = win32.PIPE_ACCESS_DUPLEX | win32.FILE_FLAG_OVERLAPPED
+ if first:
+ flags |= win32.FILE_FLAG_FIRST_PIPE_INSTANCE
+ return win32.CreateNamedPipe(
+ self._address, flags,
win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
win32.PIPE_WAIT,
win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
win32.NMPWAIT_WAIT_FOREVER, win32.NULL
)
- self._handle_queue.append(newhandle)
+
+ def accept(self):
+ self._handle_queue.append(self._new_handle())
handle = self._handle_queue.pop(0)
+ ov = win32.ConnectNamedPipe(handle, overlapped=True)
try:
- win32.ConnectNamedPipe(handle, win32.NULL)
- except WindowsError as e:
- if e.args[0] != win32.ERROR_PIPE_CONNECTED:
- raise
- return _multiprocessing.PipeConnection(handle)
+ res = win32.WaitForMultipleObjects([ov.event], False, INFINITE)
+ except:
+ ov.cancel()
+ win32.CloseHandle(handle)
+ raise
+ finally:
+ _, err = ov.GetOverlappedResult(True)
+ assert err == 0
+ return PipeConnection(handle)
@staticmethod
def _finalize_pipe_listener(queue, address):
@@ -376,11 +673,12 @@ if sys.platform == 'win32':
win32.WaitNamedPipe(address, 1000)
h = win32.CreateFile(
address, win32.GENERIC_READ | win32.GENERIC_WRITE,
- 0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL
+ 0, win32.NULL, win32.OPEN_EXISTING,
+ win32.FILE_FLAG_OVERLAPPED, win32.NULL
)
except WindowsError as e:
- if e.args[0] not in (win32.ERROR_SEM_TIMEOUT,
- win32.ERROR_PIPE_BUSY) or _check_timeout(t):
+ if e.winerror not in (win32.ERROR_SEM_TIMEOUT,
+ win32.ERROR_PIPE_BUSY) or _check_timeout(t):
raise
else:
break
@@ -390,7 +688,7 @@ if sys.platform == 'win32':
win32.SetNamedPipeHandleState(
h, win32.PIPE_READMODE_MESSAGE, None, None
)
- return _multiprocessing.PipeConnection(h)
+ return PipeConnection(h)
#
# Authentication stuff
@@ -447,10 +745,10 @@ class ConnectionWrapper(object):
return self._loads(s)
def _xml_dumps(obj):
- return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf8')
+ return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf-8')
def _xml_loads(s):
- (obj,), method = xmlrpclib.loads(s.decode('utf8'))
+ (obj,), method = xmlrpclib.loads(s.decode('utf-8'))
return obj
class XmlListener(Listener):
@@ -464,3 +762,126 @@ def XmlClient(*args, **kwds):
global xmlrpclib
import xmlrpc.client as xmlrpclib
return ConnectionWrapper(Client(*args, **kwds), _xml_dumps, _xml_loads)
+
+#
+# Wait
+#
+
+if sys.platform == 'win32':
+
+ def _exhaustive_wait(handles, timeout):
+ # Return ALL handles which are currently signalled. (Only
+ # returning the first signalled might create starvation issues.)
+ L = list(handles)
+ ready = []
+ while L:
+ res = win32.WaitForMultipleObjects(L, False, timeout)
+ if res == WAIT_TIMEOUT:
+ break
+ elif WAIT_OBJECT_0 <= res < WAIT_OBJECT_0 + len(L):
+ res -= WAIT_OBJECT_0
+ elif WAIT_ABANDONED_0 <= res < WAIT_ABANDONED_0 + len(L):
+ res -= WAIT_ABANDONED_0
+ else:
+ raise RuntimeError('Should not get here')
+ ready.append(L[res])
+ L = L[res+1:]
+ timeout = 0
+ return ready
+
+ _ready_errors = {win32.ERROR_BROKEN_PIPE, win32.ERROR_NETNAME_DELETED}
+
+ def wait(object_list, timeout=None):
+ '''
+ Wait till an object in object_list is ready/readable.
+
+ Returns list of those objects in object_list which are ready/readable.
+ '''
+ if timeout is None:
+ timeout = INFINITE
+ elif timeout < 0:
+ timeout = 0
+ else:
+ timeout = int(timeout * 1000 + 0.5)
+
+ object_list = list(object_list)
+ waithandle_to_obj = {}
+ ov_list = []
+ ready_objects = set()
+ ready_handles = set()
+
+ try:
+ for o in object_list:
+ try:
+ fileno = getattr(o, 'fileno')
+ except AttributeError:
+ waithandle_to_obj[o.__index__()] = o
+ else:
+ # start an overlapped read of length zero
+ try:
+ ov, err = win32.ReadFile(fileno(), 0, True)
+ except OSError as e:
+ err = e.winerror
+ if err not in _ready_errors:
+ raise
+ if err == win32.ERROR_IO_PENDING:
+ ov_list.append(ov)
+ waithandle_to_obj[ov.event] = o
+ else:
+ # If o.fileno() is an overlapped pipe handle and
+ # err == 0 then there is a zero length message
+ # in the pipe, but it HAS NOT been consumed.
+ ready_objects.add(o)
+ timeout = 0
+
+ ready_handles = _exhaustive_wait(waithandle_to_obj.keys(), timeout)
+ finally:
+ # request that overlapped reads stop
+ for ov in ov_list:
+ ov.cancel()
+
+ # wait for all overlapped reads to stop
+ for ov in ov_list:
+ try:
+ _, err = ov.GetOverlappedResult(True)
+ except OSError as e:
+ err = e.winerror
+ if err not in _ready_errors:
+ raise
+ if err != win32.ERROR_OPERATION_ABORTED:
+ o = waithandle_to_obj[ov.event]
+ ready_objects.add(o)
+ if err == 0:
+ # If o.fileno() is an overlapped pipe handle then
+ # a zero length message HAS been consumed.
+ if hasattr(o, '_got_empty_message'):
+ o._got_empty_message = True
+
+ ready_objects.update(waithandle_to_obj[h] for h in ready_handles)
+ return [o for o in object_list if o in ready_objects]
+
+else:
+
+ def wait(object_list, timeout=None):
+ '''
+ Wait till an object in object_list is ready/readable.
+
+ Returns list of those objects in object_list which are ready/readable.
+ '''
+ if timeout is not None:
+ if timeout <= 0:
+ return select.select(object_list, [], [], 0)[0]
+ else:
+ deadline = time.time() + timeout
+ while True:
+ try:
+ return select.select(object_list, [], [], timeout)[0]
+ except OSError as e:
+ if e.errno != errno.EINTR:
+ raise
+ if timeout is not None:
+ timeout = deadline - time.time()
+
+
+# Late import because of circular import
+from multiprocessing.forking import duplicate, close