Diffstat (limited to 'Lib/multiprocessing')
-rw-r--r--  Lib/multiprocessing/connection.py      | 388
-rw-r--r--  Lib/multiprocessing/dummy/__init__.py  |  13
-rw-r--r--  Lib/multiprocessing/forking.py         |  40
-rw-r--r--  Lib/multiprocessing/process.py         |  20
-rw-r--r--  Lib/multiprocessing/queues.py          |  19
-rw-r--r--  Lib/multiprocessing/reduction.py       |  34
-rw-r--r--  Lib/multiprocessing/util.py            |  23
7 files changed, 462 insertions(+), 75 deletions(-)
diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py
index d6c23fb..0c96958 100644
--- a/Lib/multiprocessing/connection.py
+++ b/Lib/multiprocessing/connection.py
@@ -34,19 +34,31 @@
__all__ = [ 'Client', 'Listener', 'Pipe' ]
+import io
import os
import sys
+import pickle
+import select
import socket
+import struct
import errno
import time
import tempfile
import itertools
import _multiprocessing
-from multiprocessing import current_process, AuthenticationError
-from multiprocessing.util import get_temp_dir, Finalize, sub_debug, debug
-from multiprocessing.forking import duplicate, close
+from multiprocessing import current_process, AuthenticationError, BufferTooShort
+from multiprocessing.util import (
+ get_temp_dir, Finalize, sub_debug, debug, _eintr_retry)
+try:
+ from _multiprocessing import win32
+ from _subprocess import WAIT_OBJECT_0, WAIT_TIMEOUT, INFINITE
+except ImportError:
+ if sys.platform == 'win32':
+ raise
+ win32 = None
+_select = _eintr_retry(select.select)
#
#
@@ -110,6 +122,326 @@ def address_type(address):
else:
raise ValueError('address type of %r unrecognized' % address)
+
+class SentinelReady(Exception):
+ """
+ Raised when a sentinel becomes ready while polling.
+ """
+ def __init__(self, *args):
+ Exception.__init__(self, *args)
+ self.sentinels = args[0]
+
+#
+# Connection classes
+#
+
+class _ConnectionBase:
+ _handle = None
+
+ def __init__(self, handle, readable=True, writable=True):
+ handle = handle.__index__()
+ if handle < 0:
+ raise ValueError("invalid handle")
+ if not readable and not writable:
+ raise ValueError(
+ "at least one of `readable` and `writable` must be True")
+ self._handle = handle
+ self._readable = readable
+ self._writable = writable
+
+ # XXX should we use util.Finalize instead of a __del__?
+
+ def __del__(self):
+ if self._handle is not None:
+ self._close()
+
+ def _check_closed(self):
+ if self._handle is None:
+ raise IOError("handle is closed")
+
+ def _check_readable(self):
+ if not self._readable:
+ raise IOError("connection is write-only")
+
+ def _check_writable(self):
+ if not self._writable:
+ raise IOError("connection is read-only")
+
+ def _bad_message_length(self):
+ if self._writable:
+ self._readable = False
+ else:
+ self.close()
+ raise IOError("bad message length")
+
+ @property
+ def closed(self):
+ """True if the connection is closed"""
+ return self._handle is None
+
+ @property
+ def readable(self):
+ """True if the connection is readable"""
+ return self._readable
+
+ @property
+ def writable(self):
+ """True if the connection is writable"""
+ return self._writable
+
+ def fileno(self):
+ """File descriptor or handle of the connection"""
+ self._check_closed()
+ return self._handle
+
+ def close(self):
+ """Close the connection"""
+ if self._handle is not None:
+ try:
+ self._close()
+ finally:
+ self._handle = None
+
+ def send_bytes(self, buf, offset=0, size=None):
+ """Send the bytes data from a bytes-like object"""
+ self._check_closed()
+ self._check_writable()
+ m = memoryview(buf)
+ # HACK for byte-indexing of non-bytewise buffers (e.g. array.array)
+ if m.itemsize > 1:
+ m = memoryview(bytes(m))
+ n = len(m)
+ if offset < 0:
+ raise ValueError("offset is negative")
+ if n < offset:
+ raise ValueError("buffer length < offset")
+ if size is None:
+ size = n - offset
+ elif size < 0:
+ raise ValueError("size is negative")
+ elif offset + size > n:
+ raise ValueError("buffer length < offset + size")
+ self._send_bytes(m[offset:offset + size])
+
+ def send(self, obj):
+ """Send a (picklable) object"""
+ self._check_closed()
+ self._check_writable()
+ buf = pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)
+ self._send_bytes(memoryview(buf))
+
+ def recv_bytes(self, maxlength=None):
+ """
+ Receive bytes data as a bytes object.
+ """
+ self._check_closed()
+ self._check_readable()
+ if maxlength is not None and maxlength < 0:
+ raise ValueError("negative maxlength")
+ buf = self._recv_bytes(maxlength)
+ if buf is None:
+ self._bad_message_length()
+ return buf.getvalue()
+
+ def recv_bytes_into(self, buf, offset=0):
+ """
+ Receive bytes data into a writeable buffer-like object.
+ Return the number of bytes read.
+ """
+ self._check_closed()
+ self._check_readable()
+ with memoryview(buf) as m:
+ # Get bytesize of arbitrary buffer
+ itemsize = m.itemsize
+ bytesize = itemsize * len(m)
+ if offset < 0:
+ raise ValueError("negative offset")
+ elif offset > bytesize:
+ raise ValueError("offset too large")
+ result = self._recv_bytes()
+ size = result.tell()
+ if bytesize < offset + size:
+ raise BufferTooShort(result.getvalue())
+ # Message can fit in dest
+ result.seek(0)
+ result.readinto(m[offset // itemsize :
+ (offset + size) // itemsize])
+ return size
+
+ def recv(self, sentinels=None):
+ """Receive a (picklable) object"""
+ self._check_closed()
+ self._check_readable()
+ buf = self._recv_bytes(sentinels=sentinels)
+ return pickle.loads(buf.getbuffer())
+
+ def poll(self, timeout=0.0):
+ """Whether there is any input available to be read"""
+ self._check_closed()
+ self._check_readable()
+ return self._poll(timeout)
+
+
+if win32:
+
+ class PipeConnection(_ConnectionBase):
+ """
+ Connection class based on a Windows named pipe.
+ Overlapped I/O is used, so the handles must have been created
+ with FILE_FLAG_OVERLAPPED.
+ """
+ _buffered = b''
+
+ def _close(self, _CloseHandle=win32.CloseHandle):
+ _CloseHandle(self._handle)
+
+ def _send_bytes(self, buf):
+ overlapped = win32.WriteFile(self._handle, buf, overlapped=True)
+ nwritten, complete = overlapped.GetOverlappedResult(True)
+ assert complete
+ assert nwritten == len(buf)
+
+ def _recv_bytes(self, maxsize=None, sentinels=()):
+ if sentinels:
+ self._poll(-1.0, sentinels)
+ buf = io.BytesIO()
+ firstchunk = self._buffered
+ if firstchunk:
+ lenfirstchunk = len(firstchunk)
+ buf.write(firstchunk)
+ self._buffered = b''
+ else:
+ # A reasonable size for the first chunk transfer
+ bufsize = 128
+ if maxsize is not None and maxsize < bufsize:
+ bufsize = maxsize
+ try:
+ overlapped = win32.ReadFile(self._handle, bufsize, overlapped=True)
+ lenfirstchunk, complete = overlapped.GetOverlappedResult(True)
+ firstchunk = overlapped.getbuffer()
+ assert lenfirstchunk == len(firstchunk)
+ except IOError as e:
+ if e.winerror == win32.ERROR_BROKEN_PIPE:
+ raise EOFError
+ raise
+ buf.write(firstchunk)
+ if complete:
+ return buf
+ navail, nleft = win32.PeekNamedPipe(self._handle)
+ if maxsize is not None and lenfirstchunk + nleft > maxsize:
+ return None
+ if nleft > 0:
+ overlapped = win32.ReadFile(self._handle, nleft, overlapped=True)
+ res, complete = overlapped.GetOverlappedResult(True)
+ assert res == nleft
+ assert complete
+ buf.write(overlapped.getbuffer())
+ return buf
+
+ def _poll(self, timeout, sentinels=()):
+ # Fast non-blocking path
+ navail, nleft = win32.PeekNamedPipe(self._handle)
+ if navail > 0:
+ return True
+ elif timeout == 0.0:
+ return False
+ # Blocking: use overlapped I/O
+ if timeout < 0.0:
+ timeout = INFINITE
+ else:
+ timeout = int(timeout * 1000 + 0.5)
+ overlapped = win32.ReadFile(self._handle, 1, overlapped=True)
+ try:
+ handles = [overlapped.event]
+ handles += sentinels
+ res = win32.WaitForMultipleObjects(handles, False, timeout)
+ finally:
+ # Always cancel overlapped I/O in the same thread
+ # (because CancelIoEx() appears only in Vista)
+ overlapped.cancel()
+ if res == WAIT_TIMEOUT:
+ return False
+ idx = res - WAIT_OBJECT_0
+ if idx == 0:
+ # I/O was successful, store received data
+ overlapped.GetOverlappedResult(True)
+ self._buffered += overlapped.getbuffer()
+ return True
+ assert 0 < idx < len(handles)
+ raise SentinelReady([handles[idx]])
+
+
+class Connection(_ConnectionBase):
+ """
+ Connection class based on an arbitrary file descriptor (Unix only), or
+ a socket handle (Windows).
+ """
+
+ if win32:
+ def _close(self, _close=win32.closesocket):
+ _close(self._handle)
+ _write = win32.send
+ _read = win32.recv
+ else:
+ def _close(self, _close=os.close):
+ _close(self._handle)
+ _write = os.write
+ _read = os.read
+
+ def _send(self, buf, write=_write):
+ remaining = len(buf)
+ while True:
+ n = write(self._handle, buf)
+ remaining -= n
+ if remaining == 0:
+ break
+ buf = buf[n:]
+
+ def _recv(self, size, sentinels=(), read=_read):
+ buf = io.BytesIO()
+ handle = self._handle
+ if sentinels:
+ handles = [handle] + sentinels
+ remaining = size
+ while remaining > 0:
+ if sentinels:
+ r = _select(handles, [], [])[0]
+ if handle not in r:
+ raise SentinelReady(r)
+ chunk = read(handle, remaining)
+ n = len(chunk)
+ if n == 0:
+ if remaining == size:
+ raise EOFError
+ else:
+ raise IOError("got end of file during message")
+ buf.write(chunk)
+ remaining -= n
+ return buf
+
+ def _send_bytes(self, buf):
+ # For wire compatibility with 3.2 and lower
+ n = len(buf)
+ self._send(struct.pack("!i", n))
+ # The condition is necessary to avoid "broken pipe" errors
+ # when sending a 0-length buffer if the other end closed the pipe.
+ if n > 0:
+ self._send(buf)
+
+ def _recv_bytes(self, maxsize=None, sentinels=()):
+ buf = self._recv(4, sentinels)
+ size, = struct.unpack("!i", buf.getvalue())
+ if maxsize is not None and size > maxsize:
+ return None
+ return self._recv(size, sentinels)
+
+ def _poll(self, timeout):
+ if timeout < 0.0:
+ timeout = None
+ r = _select([self._handle], [], [], timeout)[0]
+ return bool(r)
+
+
#
# Public functions
#
@@ -186,21 +518,17 @@ if sys.platform != 'win32':
'''
if duplex:
s1, s2 = socket.socketpair()
- c1 = _multiprocessing.Connection(os.dup(s1.fileno()))
- c2 = _multiprocessing.Connection(os.dup(s2.fileno()))
- s1.close()
- s2.close()
+ c1 = Connection(s1.detach())
+ c2 = Connection(s2.detach())
else:
fd1, fd2 = os.pipe()
- c1 = _multiprocessing.Connection(fd1, writable=False)
- c2 = _multiprocessing.Connection(fd2, readable=False)
+ c1 = Connection(fd1, writable=False)
+ c2 = Connection(fd2, readable=False)
return c1, c2
else:
- from _multiprocessing import win32
-
def Pipe(duplex=True):
'''
Returns pair of connection objects at either end of a pipe
@@ -216,26 +544,24 @@ else:
obsize, ibsize = 0, BUFSIZE
h1 = win32.CreateNamedPipe(
- address, openmode,
+ address, openmode | win32.FILE_FLAG_OVERLAPPED,
win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
win32.PIPE_WAIT,
1, obsize, ibsize, win32.NMPWAIT_WAIT_FOREVER, win32.NULL
)
h2 = win32.CreateFile(
- address, access, 0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL
+ address, access, 0, win32.NULL, win32.OPEN_EXISTING,
+ win32.FILE_FLAG_OVERLAPPED, win32.NULL
)
win32.SetNamedPipeHandleState(
h2, win32.PIPE_READMODE_MESSAGE, None, None
)
- try:
- win32.ConnectNamedPipe(h1, win32.NULL)
- except WindowsError as e:
- if e.args[0] != win32.ERROR_PIPE_CONNECTED:
- raise
+ overlapped = win32.ConnectNamedPipe(h1, overlapped=True)
+ overlapped.GetOverlappedResult(True)
- c1 = _multiprocessing.PipeConnection(h1, writable=duplex)
- c2 = _multiprocessing.PipeConnection(h2, readable=duplex)
+ c1 = PipeConnection(h1, writable=duplex)
+ c2 = PipeConnection(h2, readable=duplex)
return c1, c2
@@ -266,7 +592,7 @@ class SocketListener(object):
def accept(self):
s, self._last_accepted = self._socket.accept()
fd = duplicate(s.fileno())
- conn = _multiprocessing.Connection(fd)
+ conn = Connection(fd)
s.close()
return conn
@@ -298,7 +624,7 @@ def SocketClient(address):
raise
fd = duplicate(s.fileno())
- conn = _multiprocessing.Connection(fd)
+ conn = Connection(fd)
return conn
#
@@ -343,9 +669,9 @@ if sys.platform == 'win32':
try:
win32.ConnectNamedPipe(handle, win32.NULL)
except WindowsError as e:
- if e.args[0] != win32.ERROR_PIPE_CONNECTED:
+ if e.winerror != win32.ERROR_PIPE_CONNECTED:
raise
- return _multiprocessing.PipeConnection(handle)
+ return PipeConnection(handle)
@staticmethod
def _finalize_pipe_listener(queue, address):
@@ -366,8 +692,8 @@ if sys.platform == 'win32':
0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL
)
except WindowsError as e:
- if e.args[0] not in (win32.ERROR_SEM_TIMEOUT,
- win32.ERROR_PIPE_BUSY) or _check_timeout(t):
+ if e.winerror not in (win32.ERROR_SEM_TIMEOUT,
+ win32.ERROR_PIPE_BUSY) or _check_timeout(t):
raise
else:
break
@@ -377,7 +703,7 @@ if sys.platform == 'win32':
win32.SetNamedPipeHandleState(
h, win32.PIPE_READMODE_MESSAGE, None, None
)
- return _multiprocessing.PipeConnection(h)
+ return PipeConnection(h)
#
# Authentication stuff
@@ -434,10 +760,10 @@ class ConnectionWrapper(object):
return self._loads(s)
def _xml_dumps(obj):
- return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf8')
+ return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf-8')
def _xml_loads(s):
- (obj,), method = xmlrpclib.loads(s.decode('utf8'))
+ (obj,), method = xmlrpclib.loads(s.decode('utf-8'))
return obj
class XmlListener(Listener):
@@ -451,3 +777,7 @@ def XmlClient(*args, **kwds):
global xmlrpclib
import xmlrpc.client as xmlrpclib
return ConnectionWrapper(Client(*args, **kwds), _xml_dumps, _xml_loads)
+
+
+# Late import because of circular import
+from multiprocessing.forking import duplicate, close
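
A quick, hedged exercise of the pure-Python Connection class added above. The public surface (send/recv, send_bytes/recv_bytes_into, BufferTooShort) is the same one the old _multiprocessing C type exposed, so this sketch only illustrates expected behaviour and is not part of the patch:

    import array
    from multiprocessing import Pipe, BufferTooShort

    if __name__ == "__main__":
        c1, c2 = Pipe()                          # duplex by default
        c1.send({"answer": 42})                  # pickled via _send_bytes()
        print(c2.recv())                         # {'answer': 42}

        c1.send_bytes(b"abcdef", offset=1, size=3)
        buf = array.array('b', bytearray(10))
        n = c2.recv_bytes_into(buf)              # copies b'bcd' into buf
        print(n, bytes(buf[:n]))                 # 3 b'bcd'

        c1.send_bytes(b"far too long for a 4-byte buffer")
        try:
            c2.recv_bytes_into(array.array('b', bytearray(4)))
        except BufferTooShort as e:
            print("BufferTooShort, payload:", e.args[0])

        c1.close()
        c2.close()
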
diff --git a/Lib/multiprocessing/dummy/__init__.py b/Lib/multiprocessing/dummy/__init__.py
index c4933d9..60add92 100644
--- a/Lib/multiprocessing/dummy/__init__.py
+++ b/Lib/multiprocessing/dummy/__init__.py
@@ -51,7 +51,7 @@ import itertools
from multiprocessing import TimeoutError, cpu_count
from multiprocessing.dummy.connection import Pipe
from threading import Lock, RLock, Semaphore, BoundedSemaphore
-from threading import Event
+from threading import Event, Condition
from queue import Queue
#
@@ -84,17 +84,6 @@ class DummyProcess(threading.Thread):
#
#
-class Condition(threading._Condition):
- # XXX
- if sys.version_info < (3, 0):
- notify_all = threading._Condition.notify_all.__func__
- else:
- notify_all = threading._Condition.notify_all
-
-#
-#
-#
-
Process = DummyProcess
current_process = threading.current_thread
current_process()._children = weakref.WeakKeyDictionary()
diff --git a/Lib/multiprocessing/forking.py b/Lib/multiprocessing/forking.py
index cc7c326..a2c61ef 100644
--- a/Lib/multiprocessing/forking.py
+++ b/Lib/multiprocessing/forking.py
@@ -35,6 +35,7 @@
import os
import sys
import signal
+import select
from multiprocessing import util, process
@@ -101,10 +102,12 @@ else:
if sys.platform != 'win32':
import time
+ import select
exit = os._exit
duplicate = os.dup
close = os.close
+ _select = util._eintr_retry(select.select)
#
# We define a Popen class similar to the one from subprocess, but
@@ -118,8 +121,12 @@ if sys.platform != 'win32':
sys.stderr.flush()
self.returncode = None
+ r, w = os.pipe()
+ self.sentinel = r
+
self.pid = os.fork()
if self.pid == 0:
+ os.close(r)
if 'random' in sys.modules:
import random
random.seed()
@@ -128,6 +135,11 @@ if sys.platform != 'win32':
sys.stderr.flush()
os._exit(code)
+ # `w` will be closed when the child exits, at which point `r`
+ # will become ready for reading (using e.g. select()).
+ os.close(w)
+ util.Finalize(self, os.close, (r,))
+
def poll(self, flag=os.WNOHANG):
if self.returncode is None:
try:
@@ -145,20 +157,14 @@ if sys.platform != 'win32':
return self.returncode
def wait(self, timeout=None):
- if timeout is None:
- return self.poll(0)
- deadline = time.time() + timeout
- delay = 0.0005
- while 1:
- res = self.poll()
- if res is not None:
- break
- remaining = deadline - time.time()
- if remaining <= 0:
- break
- delay = min(delay * 2, remaining, 0.05)
- time.sleep(delay)
- return res
+ if self.returncode is None:
+ if timeout is not None:
+ r = _select([self.sentinel], [], [], timeout)[0]
+ if not r:
+ return None
+ # This shouldn't block if select() returned successfully.
+ return self.poll(os.WNOHANG if timeout == 0.0 else 0)
+ return self.returncode
def terminate(self):
if self.returncode is None:
@@ -183,7 +189,7 @@ else:
import time
from pickle import dump, load, HIGHEST_PROTOCOL
- from _multiprocessing import win32, Connection, PipeConnection
+ from _multiprocessing import win32
from .util import Finalize
def dump(obj, file, protocol=None):
@@ -258,6 +264,7 @@ else:
self.pid = pid
self.returncode = None
self._handle = hp
+ self.sentinel = int(hp)
# send information to child
prep_data = get_preparation_data(process_obj._name)
@@ -411,6 +418,9 @@ else:
# Make (Pipe)Connection picklable
#
+ # Late import because of circular import
+ from .connection import Connection, PipeConnection
+
def reduce_connection(conn):
if not Popen.thread_is_spawning():
raise RuntimeError(
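
The new Popen.wait() above relies on a sentinel pipe: the child never writes to it, so its read end only becomes readable (EOF) once the child has exited and the kernel has closed the last copy of the write end. A minimal Unix-only sketch of that trick, independent of multiprocessing:

    import os, select, time

    if __name__ == "__main__":
        r, w = os.pipe()
        pid = os.fork()
        if pid == 0:                              # child
            os.close(r)
            time.sleep(1.5)
            os._exit(0)                           # exiting closes its copy of w
        os.close(w)                               # parent keeps only the read end
        ready, _, _ = select.select([r], [], [], 0.5)
        print("after 0.5s:", bool(ready))         # False: child still holds w open
        ready, _, _ = select.select([r], [], [], 5)
        print("after exit:", bool(ready))         # True: EOF makes r ready
        os.waitpid(pid, 0)
        os.close(r)
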
diff --git a/Lib/multiprocessing/process.py b/Lib/multiprocessing/process.py
index 5987af9..98ce0da 100644
--- a/Lib/multiprocessing/process.py
+++ b/Lib/multiprocessing/process.py
@@ -92,12 +92,16 @@ class Process(object):
'''
_Popen = None
- def __init__(self, group=None, target=None, name=None, args=(), kwargs={}):
+ def __init__(self, group=None, target=None, name=None, args=(), kwargs={},
+ *, daemon=None):
assert group is None, 'group argument must be None for now'
count = next(_current_process._counter)
self._identity = _current_process._identity + (count,)
self._authkey = _current_process._authkey
- self._daemonic = _current_process._daemonic
+ if daemon is not None:
+ self._daemonic = daemon
+ else:
+ self._daemonic = _current_process._daemonic
self._tempdir = _current_process._tempdir
self._parent_pid = os.getpid()
self._popen = None
@@ -130,6 +134,7 @@ class Process(object):
else:
from .forking import Popen
self._popen = Popen(self)
+ self._sentinel = self._popen.sentinel
_current_process._children.add(self)
def terminate(self):
@@ -216,6 +221,17 @@ class Process(object):
pid = ident
+ @property
+ def sentinel(self):
+ '''
+ Return a file descriptor (Unix) or handle (Windows) suitable for
+ waiting for process termination.
+ '''
+ try:
+ return self._sentinel
+ except AttributeError:
+ raise ValueError("process not started")
+
def __repr__(self):
if self is _current_process:
status = 'started'
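
With the sentinel exposed as a property, a parent can wait on several children at once. A hedged sketch (Unix only as written: there the sentinels are file descriptors usable with select(); on Windows they are process handles meant for WaitForMultipleObjects(), so select() would not apply):

    import select, time
    from multiprocessing import Process

    def work(t):
        time.sleep(t)

    if __name__ == "__main__":
        procs = [Process(target=work, args=(t,), daemon=False)   # new daemon kwarg
                 for t in (2.0, 0.5, 1.0)]
        for p in procs:
            p.start()
        pending = {p.sentinel: p for p in procs}
        while pending:
            ready, _, _ = select.select(list(pending), [], [])
            for fd in ready:
                p = pending.pop(fd)
                p.join()
                print("finished:", p.name)        # children report as they exit
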
diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py
index 3280a25..bb4c7d8 100644
--- a/Lib/multiprocessing/queues.py
+++ b/Lib/multiprocessing/queues.py
@@ -41,10 +41,11 @@ import collections
import time
import atexit
import weakref
+import errno
from queue import Empty, Full
import _multiprocessing
-from multiprocessing import Pipe
+from multiprocessing.connection import Pipe, SentinelReady
from multiprocessing.synchronize import Lock, BoundedSemaphore, Semaphore, Condition
from multiprocessing.util import debug, info, Finalize, register_after_fork
from multiprocessing.forking import assert_spawning
@@ -67,6 +68,8 @@ class Queue(object):
else:
self._wlock = Lock()
self._sem = BoundedSemaphore(maxsize)
+ # For use by concurrent.futures
+ self._ignore_epipe = False
self._after_fork()
@@ -75,11 +78,11 @@ class Queue(object):
def __getstate__(self):
assert_spawning(self)
- return (self._maxsize, self._reader, self._writer,
+ return (self._ignore_epipe, self._maxsize, self._reader, self._writer,
self._rlock, self._wlock, self._sem, self._opid)
def __setstate__(self, state):
- (self._maxsize, self._reader, self._writer,
+ (self._ignore_epipe, self._maxsize, self._reader, self._writer,
self._rlock, self._wlock, self._sem, self._opid) = state
self._after_fork()
@@ -178,7 +181,7 @@ class Queue(object):
self._thread = threading.Thread(
target=Queue._feed,
args=(self._buffer, self._notempty, self._send,
- self._wlock, self._writer.close),
+ self._wlock, self._writer.close, self._ignore_epipe),
name='QueueFeederThread'
)
self._thread.daemon = True
@@ -229,7 +232,7 @@ class Queue(object):
notempty.release()
@staticmethod
- def _feed(buffer, notempty, send, writelock, close):
+ def _feed(buffer, notempty, send, writelock, close, ignore_epipe):
debug('starting thread to feed data to pipe')
from .util import is_exiting
@@ -271,6 +274,8 @@ class Queue(object):
except IndexError:
pass
except Exception as e:
+ if ignore_epipe and getattr(e, 'errno', 0) == errno.EPIPE:
+ return
# Since this runs in a daemon thread the resources it uses
# may become unusable while the process is cleaning up.
# We ignore errors which happen after the process has
@@ -372,10 +377,10 @@ class SimpleQueue(object):
def _make_methods(self):
recv = self._reader.recv
racquire, rrelease = self._rlock.acquire, self._rlock.release
- def get():
+ def get(*, sentinels=None):
racquire()
try:
- return recv()
+ return recv(sentinels)
finally:
rrelease()
self.get = get
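
Threading the sentinels keyword through SimpleQueue.get() lets a blocking read be abandoned when the producing process dies (which is what concurrent.futures needs it for). A hedged sketch of the intended behaviour, using only names introduced by this patch:

    import os
    from multiprocessing import Process
    from multiprocessing.connection import SentinelReady
    from multiprocessing.queues import SimpleQueue

    def worker(q):
        os._exit(1)                   # die without ever putting a result

    if __name__ == "__main__":
        q = SimpleQueue()
        p = Process(target=worker, args=(q,))
        p.start()
        try:
            # Without sentinels this recv() would block forever; with them,
            # the dead worker's sentinel raises SentinelReady instead.
            item = q.get(sentinels=[p.sentinel])
            print("got", item)
        except SentinelReady:
            print("worker died before producing a result")
        p.join()
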
diff --git a/Lib/multiprocessing/reduction.py b/Lib/multiprocessing/reduction.py
index 6e5e5bc..042a136 100644
--- a/Lib/multiprocessing/reduction.py
+++ b/Lib/multiprocessing/reduction.py
@@ -39,19 +39,21 @@ import os
import sys
import socket
import threading
+import struct
import _multiprocessing
from multiprocessing import current_process
from multiprocessing.forking import Popen, duplicate, close, ForkingPickler
from multiprocessing.util import register_after_fork, debug, sub_debug
-from multiprocessing.connection import Client, Listener
+from multiprocessing.connection import Client, Listener, Connection
#
#
#
-if not(sys.platform == 'win32' or hasattr(_multiprocessing, 'recvfd')):
+if not(sys.platform == 'win32' or (hasattr(socket, 'CMSG_LEN') and
+ hasattr(socket, 'SCM_RIGHTS'))):
raise ImportError('pickling of connections not supported')
#
@@ -77,10 +79,23 @@ if sys.platform == 'win32':
else:
def send_handle(conn, handle, destination_pid):
- _multiprocessing.sendfd(conn.fileno(), handle)
+ with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s:
+ s.sendmsg([b'x'], [(socket.SOL_SOCKET, socket.SCM_RIGHTS,
+ struct.pack("@i", handle))])
def recv_handle(conn):
- return _multiprocessing.recvfd(conn.fileno())
+ size = struct.calcsize("@i")
+ with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s:
+ msg, ancdata, flags, addr = s.recvmsg(1, socket.CMSG_LEN(size))
+ try:
+ cmsg_level, cmsg_type, cmsg_data = ancdata[0]
+ if (cmsg_level == socket.SOL_SOCKET and
+ cmsg_type == socket.SCM_RIGHTS):
+ return struct.unpack("@i", cmsg_data[:size])[0]
+ except (ValueError, IndexError, struct.error):
+ pass
+ raise RuntimeError('Invalid data received')
+
#
# Support for a per-process server thread which caches pickled handles
@@ -159,7 +174,7 @@ def rebuild_handle(pickled_data):
return new_handle
#
-# Register `_multiprocessing.Connection` with `ForkingPickler`
+# Register `Connection` with `ForkingPickler`
#
def reduce_connection(conn):
@@ -168,11 +183,11 @@ def reduce_connection(conn):
def rebuild_connection(reduced_handle, readable, writable):
handle = rebuild_handle(reduced_handle)
- return _multiprocessing.Connection(
+ return Connection(
handle, readable=readable, writable=writable
)
-ForkingPickler.register(_multiprocessing.Connection, reduce_connection)
+ForkingPickler.register(Connection, reduce_connection)
#
# Register `socket.socket` with `ForkingPickler`
@@ -201,6 +216,7 @@ ForkingPickler.register(socket.socket, reduce_socket)
#
if sys.platform == 'win32':
+ from multiprocessing.connection import PipeConnection
def reduce_pipe_connection(conn):
rh = reduce_handle(conn.fileno())
@@ -208,8 +224,8 @@ if sys.platform == 'win32':
def rebuild_pipe_connection(reduced_handle, readable, writable):
handle = rebuild_handle(reduced_handle)
- return _multiprocessing.PipeConnection(
+ return PipeConnection(
handle, readable=readable, writable=writable
)
- ForkingPickler.register(_multiprocessing.PipeConnection, reduce_pipe_connection)
+ ForkingPickler.register(PipeConnection, reduce_pipe_connection)
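
The reduction changes drop the old _multiprocessing.sendfd()/recvfd() C helpers in favour of socket.sendmsg()/recvmsg() with SCM_RIGHTS ancillary data. A self-contained Unix-only sketch of that mechanism, without any multiprocessing machinery:

    import os
    import socket
    import struct

    def send_fd(sock, fd):
        # One byte of real payload plus the descriptor as ancillary data.
        sock.sendmsg([b'x'], [(socket.SOL_SOCKET, socket.SCM_RIGHTS,
                               struct.pack("@i", fd))])

    def recv_fd(sock):
        size = struct.calcsize("@i")
        msg, ancdata, flags, addr = sock.recvmsg(1, socket.CMSG_LEN(size))
        cmsg_level, cmsg_type, cmsg_data = ancdata[0]
        assert cmsg_level == socket.SOL_SOCKET and cmsg_type == socket.SCM_RIGHTS
        return struct.unpack("@i", cmsg_data[:size])[0]

    if __name__ == "__main__":
        left, right = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
        r, w = os.pipe()
        send_fd(left, r)                  # hand the pipe's read end across
        duplicated = recv_fd(right)       # a new fd for the same pipe
        os.write(w, b"hello")
        print(os.read(duplicated, 5))     # b'hello'
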
diff --git a/Lib/multiprocessing/util.py b/Lib/multiprocessing/util.py
index 30b7a85..5c26683 100644
--- a/Lib/multiprocessing/util.py
+++ b/Lib/multiprocessing/util.py
@@ -32,9 +32,11 @@
# SUCH DAMAGE.
#
+import functools
import itertools
import weakref
import atexit
+import select
import threading # we want threading to install its
# cleanup function before multiprocessing does
@@ -186,7 +188,11 @@ class Finalize(object):
_finalizer_registry[self._key] = self
- def __call__(self, wr=None):
+ def __call__(self, wr=None,
+ # Need to bind these locally because the globals can have
+ # been cleared at shutdown
+ _finalizer_registry=_finalizer_registry,
+ sub_debug=sub_debug):
'''
Run the callback unless it has already been called or cancelled
'''
@@ -315,3 +321,18 @@ class ForkAwareLocal(threading.local):
register_after_fork(self, lambda obj : obj.__dict__.clear())
def __reduce__(self):
return type(self), ()
+
+
+#
+# Automatic retry after EINTR
+#
+
+def _eintr_retry(func):
+ @functools.wraps(func)
+ def wrapped(*args, **kwargs):
+ while True:
+ try:
+ return func(*args, **kwargs)
+ except InterruptedError:
+ continue
+ return wrapped
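
The _eintr_retry() helper added above simply restarts a call that a signal interrupted. A standalone sketch of the same pattern (from Python 3.5 on, PEP 475 performs this retry inside the interpreter, so the wrapper is mainly relevant to the 3.3-era code being patched here):

    import functools
    import os
    import select
    import signal

    def eintr_retry(func):                # same shape as util._eintr_retry
        @functools.wraps(func)
        def wrapped(*args, **kwargs):
            while True:
                try:
                    return func(*args, **kwargs)
                except InterruptedError:  # EINTR under its PEP 3151 name
                    continue
        return wrapped

    _select = eintr_retry(select.select)

    if __name__ == "__main__":
        r, w = os.pipe()
        signal.signal(signal.SIGALRM, lambda *args: None)  # harmless handler
        signal.alarm(1)                   # interrupts the pending select() once
        ready, _, _ = _select([r], [], [], 2)
        print(ready)                      # []: the call survived the signal

Note that this simple retry restarts the call with its original timeout rather than the remaining time, which is an accepted trade-off for the internal uses in this patch.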