diff options
Diffstat (limited to 'Lib/multiprocessing')
-rw-r--r-- | Lib/multiprocessing/connection.py | 388 | ||||
-rw-r--r-- | Lib/multiprocessing/dummy/__init__.py | 13 | ||||
-rw-r--r-- | Lib/multiprocessing/forking.py | 40 | ||||
-rw-r--r-- | Lib/multiprocessing/process.py | 20 | ||||
-rw-r--r-- | Lib/multiprocessing/queues.py | 19 | ||||
-rw-r--r-- | Lib/multiprocessing/reduction.py | 34 | ||||
-rw-r--r-- | Lib/multiprocessing/util.py | 23 |
7 files changed, 462 insertions, 75 deletions
diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py index d6c23fb..0c96958 100644 --- a/Lib/multiprocessing/connection.py +++ b/Lib/multiprocessing/connection.py @@ -34,19 +34,31 @@ __all__ = [ 'Client', 'Listener', 'Pipe' ] +import io import os import sys +import pickle +import select import socket +import struct import errno import time import tempfile import itertools import _multiprocessing -from multiprocessing import current_process, AuthenticationError -from multiprocessing.util import get_temp_dir, Finalize, sub_debug, debug -from multiprocessing.forking import duplicate, close +from multiprocessing import current_process, AuthenticationError, BufferTooShort +from multiprocessing.util import ( + get_temp_dir, Finalize, sub_debug, debug, _eintr_retry) +try: + from _multiprocessing import win32 + from _subprocess import WAIT_OBJECT_0, WAIT_TIMEOUT, INFINITE +except ImportError: + if sys.platform == 'win32': + raise + win32 = None +_select = _eintr_retry(select.select) # # @@ -110,6 +122,326 @@ def address_type(address): else: raise ValueError('address type of %r unrecognized' % address) + +class SentinelReady(Exception): + """ + Raised when a sentinel is ready when polling. + """ + def __init__(self, *args): + Exception.__init__(self, *args) + self.sentinels = args[0] + +# +# Connection classes +# + +class _ConnectionBase: + _handle = None + + def __init__(self, handle, readable=True, writable=True): + handle = handle.__index__() + if handle < 0: + raise ValueError("invalid handle") + if not readable and not writable: + raise ValueError( + "at least one of `readable` and `writable` must be True") + self._handle = handle + self._readable = readable + self._writable = writable + + # XXX should we use util.Finalize instead of a __del__? + + def __del__(self): + if self._handle is not None: + self._close() + + def _check_closed(self): + if self._handle is None: + raise IOError("handle is closed") + + def _check_readable(self): + if not self._readable: + raise IOError("connection is write-only") + + def _check_writable(self): + if not self._writable: + raise IOError("connection is read-only") + + def _bad_message_length(self): + if self._writable: + self._readable = False + else: + self.close() + raise IOError("bad message length") + + @property + def closed(self): + """True if the connection is closed""" + return self._handle is None + + @property + def readable(self): + """True if the connection is readable""" + return self._readable + + @property + def writable(self): + """True if the connection is writable""" + return self._writable + + def fileno(self): + """File descriptor or handle of the connection""" + self._check_closed() + return self._handle + + def close(self): + """Close the connection""" + if self._handle is not None: + try: + self._close() + finally: + self._handle = None + + def send_bytes(self, buf, offset=0, size=None): + """Send the bytes data from a bytes-like object""" + self._check_closed() + self._check_writable() + m = memoryview(buf) + # HACK for byte-indexing of non-bytewise buffers (e.g. array.array) + if m.itemsize > 1: + m = memoryview(bytes(m)) + n = len(m) + if offset < 0: + raise ValueError("offset is negative") + if n < offset: + raise ValueError("buffer length < offset") + if size is None: + size = n - offset + elif size < 0: + raise ValueError("size is negative") + elif offset + size > n: + raise ValueError("buffer length < offset + size") + self._send_bytes(m[offset:offset + size]) + + def send(self, obj): + """Send a (picklable) object""" + self._check_closed() + self._check_writable() + buf = pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL) + self._send_bytes(memoryview(buf)) + + def recv_bytes(self, maxlength=None): + """ + Receive bytes data as a bytes object. + """ + self._check_closed() + self._check_readable() + if maxlength is not None and maxlength < 0: + raise ValueError("negative maxlength") + buf = self._recv_bytes(maxlength) + if buf is None: + self._bad_message_length() + return buf.getvalue() + + def recv_bytes_into(self, buf, offset=0): + """ + Receive bytes data into a writeable buffer-like object. + Return the number of bytes read. + """ + self._check_closed() + self._check_readable() + with memoryview(buf) as m: + # Get bytesize of arbitrary buffer + itemsize = m.itemsize + bytesize = itemsize * len(m) + if offset < 0: + raise ValueError("negative offset") + elif offset > bytesize: + raise ValueError("offset too large") + result = self._recv_bytes() + size = result.tell() + if bytesize < offset + size: + raise BufferTooShort(result.getvalue()) + # Message can fit in dest + result.seek(0) + result.readinto(m[offset // itemsize : + (offset + size) // itemsize]) + return size + + def recv(self, sentinels=None): + """Receive a (picklable) object""" + self._check_closed() + self._check_readable() + buf = self._recv_bytes(sentinels=sentinels) + return pickle.loads(buf.getbuffer()) + + def poll(self, timeout=0.0): + """Whether there is any input available to be read""" + self._check_closed() + self._check_readable() + return self._poll(timeout) + + +if win32: + + class PipeConnection(_ConnectionBase): + """ + Connection class based on a Windows named pipe. + Overlapped I/O is used, so the handles must have been created + with FILE_FLAG_OVERLAPPED. + """ + _buffered = b'' + + def _close(self, _CloseHandle=win32.CloseHandle): + _CloseHandle(self._handle) + + def _send_bytes(self, buf): + overlapped = win32.WriteFile(self._handle, buf, overlapped=True) + nwritten, complete = overlapped.GetOverlappedResult(True) + assert complete + assert nwritten == len(buf) + + def _recv_bytes(self, maxsize=None, sentinels=()): + if sentinels: + self._poll(-1.0, sentinels) + buf = io.BytesIO() + firstchunk = self._buffered + if firstchunk: + lenfirstchunk = len(firstchunk) + buf.write(firstchunk) + self._buffered = b'' + else: + # A reasonable size for the first chunk transfer + bufsize = 128 + if maxsize is not None and maxsize < bufsize: + bufsize = maxsize + try: + overlapped = win32.ReadFile(self._handle, bufsize, overlapped=True) + lenfirstchunk, complete = overlapped.GetOverlappedResult(True) + firstchunk = overlapped.getbuffer() + assert lenfirstchunk == len(firstchunk) + except IOError as e: + if e.winerror == win32.ERROR_BROKEN_PIPE: + raise EOFError + raise + buf.write(firstchunk) + if complete: + return buf + navail, nleft = win32.PeekNamedPipe(self._handle) + if maxsize is not None and lenfirstchunk + nleft > maxsize: + return None + if nleft > 0: + overlapped = win32.ReadFile(self._handle, nleft, overlapped=True) + res, complete = overlapped.GetOverlappedResult(True) + assert res == nleft + assert complete + buf.write(overlapped.getbuffer()) + return buf + + def _poll(self, timeout, sentinels=()): + # Fast non-blocking path + navail, nleft = win32.PeekNamedPipe(self._handle) + if navail > 0: + return True + elif timeout == 0.0: + return False + # Blocking: use overlapped I/O + if timeout < 0.0: + timeout = INFINITE + else: + timeout = int(timeout * 1000 + 0.5) + overlapped = win32.ReadFile(self._handle, 1, overlapped=True) + try: + handles = [overlapped.event] + handles += sentinels + res = win32.WaitForMultipleObjects(handles, False, timeout) + finally: + # Always cancel overlapped I/O in the same thread + # (because CancelIoEx() appears only in Vista) + overlapped.cancel() + if res == WAIT_TIMEOUT: + return False + idx = res - WAIT_OBJECT_0 + if idx == 0: + # I/O was successful, store received data + overlapped.GetOverlappedResult(True) + self._buffered += overlapped.getbuffer() + return True + assert 0 < idx < len(handles) + raise SentinelReady([handles[idx]]) + + +class Connection(_ConnectionBase): + """ + Connection class based on an arbitrary file descriptor (Unix only), or + a socket handle (Windows). + """ + + if win32: + def _close(self, _close=win32.closesocket): + _close(self._handle) + _write = win32.send + _read = win32.recv + else: + def _close(self, _close=os.close): + _close(self._handle) + _write = os.write + _read = os.read + + def _send(self, buf, write=_write): + remaining = len(buf) + while True: + n = write(self._handle, buf) + remaining -= n + if remaining == 0: + break + buf = buf[n:] + + def _recv(self, size, sentinels=(), read=_read): + buf = io.BytesIO() + handle = self._handle + if sentinels: + handles = [handle] + sentinels + remaining = size + while remaining > 0: + if sentinels: + r = _select(handles, [], [])[0] + if handle not in r: + raise SentinelReady(r) + chunk = read(handle, remaining) + n = len(chunk) + if n == 0: + if remaining == size: + raise EOFError + else: + raise IOError("got end of file during message") + buf.write(chunk) + remaining -= n + return buf + + def _send_bytes(self, buf): + # For wire compatibility with 3.2 and lower + n = len(buf) + self._send(struct.pack("!i", n)) + # The condition is necessary to avoid "broken pipe" errors + # when sending a 0-length buffer if the other end closed the pipe. + if n > 0: + self._send(buf) + + def _recv_bytes(self, maxsize=None, sentinels=()): + buf = self._recv(4, sentinels) + size, = struct.unpack("!i", buf.getvalue()) + if maxsize is not None and size > maxsize: + return None + return self._recv(size, sentinels) + + def _poll(self, timeout): + if timeout < 0.0: + timeout = None + r = _select([self._handle], [], [], timeout)[0] + return bool(r) + + # # Public functions # @@ -186,21 +518,17 @@ if sys.platform != 'win32': ''' if duplex: s1, s2 = socket.socketpair() - c1 = _multiprocessing.Connection(os.dup(s1.fileno())) - c2 = _multiprocessing.Connection(os.dup(s2.fileno())) - s1.close() - s2.close() + c1 = Connection(s1.detach()) + c2 = Connection(s2.detach()) else: fd1, fd2 = os.pipe() - c1 = _multiprocessing.Connection(fd1, writable=False) - c2 = _multiprocessing.Connection(fd2, readable=False) + c1 = Connection(fd1, writable=False) + c2 = Connection(fd2, readable=False) return c1, c2 else: - from _multiprocessing import win32 - def Pipe(duplex=True): ''' Returns pair of connection objects at either end of a pipe @@ -216,26 +544,24 @@ else: obsize, ibsize = 0, BUFSIZE h1 = win32.CreateNamedPipe( - address, openmode, + address, openmode | win32.FILE_FLAG_OVERLAPPED, win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE | win32.PIPE_WAIT, 1, obsize, ibsize, win32.NMPWAIT_WAIT_FOREVER, win32.NULL ) h2 = win32.CreateFile( - address, access, 0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL + address, access, 0, win32.NULL, win32.OPEN_EXISTING, + win32.FILE_FLAG_OVERLAPPED, win32.NULL ) win32.SetNamedPipeHandleState( h2, win32.PIPE_READMODE_MESSAGE, None, None ) - try: - win32.ConnectNamedPipe(h1, win32.NULL) - except WindowsError as e: - if e.args[0] != win32.ERROR_PIPE_CONNECTED: - raise + overlapped = win32.ConnectNamedPipe(h1, overlapped=True) + overlapped.GetOverlappedResult(True) - c1 = _multiprocessing.PipeConnection(h1, writable=duplex) - c2 = _multiprocessing.PipeConnection(h2, readable=duplex) + c1 = PipeConnection(h1, writable=duplex) + c2 = PipeConnection(h2, readable=duplex) return c1, c2 @@ -266,7 +592,7 @@ class SocketListener(object): def accept(self): s, self._last_accepted = self._socket.accept() fd = duplicate(s.fileno()) - conn = _multiprocessing.Connection(fd) + conn = Connection(fd) s.close() return conn @@ -298,7 +624,7 @@ def SocketClient(address): raise fd = duplicate(s.fileno()) - conn = _multiprocessing.Connection(fd) + conn = Connection(fd) return conn # @@ -343,9 +669,9 @@ if sys.platform == 'win32': try: win32.ConnectNamedPipe(handle, win32.NULL) except WindowsError as e: - if e.args[0] != win32.ERROR_PIPE_CONNECTED: + if e.winerror != win32.ERROR_PIPE_CONNECTED: raise - return _multiprocessing.PipeConnection(handle) + return PipeConnection(handle) @staticmethod def _finalize_pipe_listener(queue, address): @@ -366,8 +692,8 @@ if sys.platform == 'win32': 0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL ) except WindowsError as e: - if e.args[0] not in (win32.ERROR_SEM_TIMEOUT, - win32.ERROR_PIPE_BUSY) or _check_timeout(t): + if e.winerror not in (win32.ERROR_SEM_TIMEOUT, + win32.ERROR_PIPE_BUSY) or _check_timeout(t): raise else: break @@ -377,7 +703,7 @@ if sys.platform == 'win32': win32.SetNamedPipeHandleState( h, win32.PIPE_READMODE_MESSAGE, None, None ) - return _multiprocessing.PipeConnection(h) + return PipeConnection(h) # # Authentication stuff @@ -434,10 +760,10 @@ class ConnectionWrapper(object): return self._loads(s) def _xml_dumps(obj): - return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf8') + return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf-8') def _xml_loads(s): - (obj,), method = xmlrpclib.loads(s.decode('utf8')) + (obj,), method = xmlrpclib.loads(s.decode('utf-8')) return obj class XmlListener(Listener): @@ -451,3 +777,7 @@ def XmlClient(*args, **kwds): global xmlrpclib import xmlrpc.client as xmlrpclib return ConnectionWrapper(Client(*args, **kwds), _xml_dumps, _xml_loads) + + +# Late import because of circular import +from multiprocessing.forking import duplicate, close diff --git a/Lib/multiprocessing/dummy/__init__.py b/Lib/multiprocessing/dummy/__init__.py index c4933d9..60add92 100644 --- a/Lib/multiprocessing/dummy/__init__.py +++ b/Lib/multiprocessing/dummy/__init__.py @@ -51,7 +51,7 @@ import itertools from multiprocessing import TimeoutError, cpu_count from multiprocessing.dummy.connection import Pipe from threading import Lock, RLock, Semaphore, BoundedSemaphore -from threading import Event +from threading import Event, Condition from queue import Queue # @@ -84,17 +84,6 @@ class DummyProcess(threading.Thread): # # -class Condition(threading._Condition): - # XXX - if sys.version_info < (3, 0): - notify_all = threading._Condition.notify_all.__func__ - else: - notify_all = threading._Condition.notify_all - -# -# -# - Process = DummyProcess current_process = threading.current_thread current_process()._children = weakref.WeakKeyDictionary() diff --git a/Lib/multiprocessing/forking.py b/Lib/multiprocessing/forking.py index cc7c326..a2c61ef 100644 --- a/Lib/multiprocessing/forking.py +++ b/Lib/multiprocessing/forking.py @@ -35,6 +35,7 @@ import os import sys import signal +import select from multiprocessing import util, process @@ -101,10 +102,12 @@ else: if sys.platform != 'win32': import time + import select exit = os._exit duplicate = os.dup close = os.close + _select = util._eintr_retry(select.select) # # We define a Popen class similar to the one from subprocess, but @@ -118,8 +121,12 @@ if sys.platform != 'win32': sys.stderr.flush() self.returncode = None + r, w = os.pipe() + self.sentinel = r + self.pid = os.fork() if self.pid == 0: + os.close(r) if 'random' in sys.modules: import random random.seed() @@ -128,6 +135,11 @@ if sys.platform != 'win32': sys.stderr.flush() os._exit(code) + # `w` will be closed when the child exits, at which point `r` + # will become ready for reading (using e.g. select()). + os.close(w) + util.Finalize(self, os.close, (r,)) + def poll(self, flag=os.WNOHANG): if self.returncode is None: try: @@ -145,20 +157,14 @@ if sys.platform != 'win32': return self.returncode def wait(self, timeout=None): - if timeout is None: - return self.poll(0) - deadline = time.time() + timeout - delay = 0.0005 - while 1: - res = self.poll() - if res is not None: - break - remaining = deadline - time.time() - if remaining <= 0: - break - delay = min(delay * 2, remaining, 0.05) - time.sleep(delay) - return res + if self.returncode is None: + if timeout is not None: + r = _select([self.sentinel], [], [], timeout)[0] + if not r: + return None + # This shouldn't block if select() returned successfully. + return self.poll(os.WNOHANG if timeout == 0.0 else 0) + return self.returncode def terminate(self): if self.returncode is None: @@ -183,7 +189,7 @@ else: import time from pickle import dump, load, HIGHEST_PROTOCOL - from _multiprocessing import win32, Connection, PipeConnection + from _multiprocessing import win32 from .util import Finalize def dump(obj, file, protocol=None): @@ -258,6 +264,7 @@ else: self.pid = pid self.returncode = None self._handle = hp + self.sentinel = int(hp) # send information to child prep_data = get_preparation_data(process_obj._name) @@ -411,6 +418,9 @@ else: # Make (Pipe)Connection picklable # + # Late import because of circular import + from .connection import Connection, PipeConnection + def reduce_connection(conn): if not Popen.thread_is_spawning(): raise RuntimeError( diff --git a/Lib/multiprocessing/process.py b/Lib/multiprocessing/process.py index 5987af9..98ce0da 100644 --- a/Lib/multiprocessing/process.py +++ b/Lib/multiprocessing/process.py @@ -92,12 +92,16 @@ class Process(object): ''' _Popen = None - def __init__(self, group=None, target=None, name=None, args=(), kwargs={}): + def __init__(self, group=None, target=None, name=None, args=(), kwargs={}, + *, daemon=None): assert group is None, 'group argument must be None for now' count = next(_current_process._counter) self._identity = _current_process._identity + (count,) self._authkey = _current_process._authkey - self._daemonic = _current_process._daemonic + if daemon is not None: + self._daemonic = daemon + else: + self._daemonic = _current_process._daemonic self._tempdir = _current_process._tempdir self._parent_pid = os.getpid() self._popen = None @@ -130,6 +134,7 @@ class Process(object): else: from .forking import Popen self._popen = Popen(self) + self._sentinel = self._popen.sentinel _current_process._children.add(self) def terminate(self): @@ -216,6 +221,17 @@ class Process(object): pid = ident + @property + def sentinel(self): + ''' + Return a file descriptor (Unix) or handle (Windows) suitable for + waiting for process termination. + ''' + try: + return self._sentinel + except AttributeError: + raise ValueError("process not started") + def __repr__(self): if self is _current_process: status = 'started' diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py index 3280a25..bb4c7d8 100644 --- a/Lib/multiprocessing/queues.py +++ b/Lib/multiprocessing/queues.py @@ -41,10 +41,11 @@ import collections import time import atexit import weakref +import errno from queue import Empty, Full import _multiprocessing -from multiprocessing import Pipe +from multiprocessing.connection import Pipe, SentinelReady from multiprocessing.synchronize import Lock, BoundedSemaphore, Semaphore, Condition from multiprocessing.util import debug, info, Finalize, register_after_fork from multiprocessing.forking import assert_spawning @@ -67,6 +68,8 @@ class Queue(object): else: self._wlock = Lock() self._sem = BoundedSemaphore(maxsize) + # For use by concurrent.futures + self._ignore_epipe = False self._after_fork() @@ -75,11 +78,11 @@ class Queue(object): def __getstate__(self): assert_spawning(self) - return (self._maxsize, self._reader, self._writer, + return (self._ignore_epipe, self._maxsize, self._reader, self._writer, self._rlock, self._wlock, self._sem, self._opid) def __setstate__(self, state): - (self._maxsize, self._reader, self._writer, + (self._ignore_epipe, self._maxsize, self._reader, self._writer, self._rlock, self._wlock, self._sem, self._opid) = state self._after_fork() @@ -178,7 +181,7 @@ class Queue(object): self._thread = threading.Thread( target=Queue._feed, args=(self._buffer, self._notempty, self._send, - self._wlock, self._writer.close), + self._wlock, self._writer.close, self._ignore_epipe), name='QueueFeederThread' ) self._thread.daemon = True @@ -229,7 +232,7 @@ class Queue(object): notempty.release() @staticmethod - def _feed(buffer, notempty, send, writelock, close): + def _feed(buffer, notempty, send, writelock, close, ignore_epipe): debug('starting thread to feed data to pipe') from .util import is_exiting @@ -271,6 +274,8 @@ class Queue(object): except IndexError: pass except Exception as e: + if ignore_epipe and getattr(e, 'errno', 0) == errno.EPIPE: + return # Since this runs in a daemon thread the resources it uses # may be become unusable while the process is cleaning up. # We ignore errors which happen after the process has @@ -372,10 +377,10 @@ class SimpleQueue(object): def _make_methods(self): recv = self._reader.recv racquire, rrelease = self._rlock.acquire, self._rlock.release - def get(): + def get(*, sentinels=None): racquire() try: - return recv() + return recv(sentinels) finally: rrelease() self.get = get diff --git a/Lib/multiprocessing/reduction.py b/Lib/multiprocessing/reduction.py index 6e5e5bc..042a136 100644 --- a/Lib/multiprocessing/reduction.py +++ b/Lib/multiprocessing/reduction.py @@ -39,19 +39,21 @@ import os import sys import socket import threading +import struct import _multiprocessing from multiprocessing import current_process from multiprocessing.forking import Popen, duplicate, close, ForkingPickler from multiprocessing.util import register_after_fork, debug, sub_debug -from multiprocessing.connection import Client, Listener +from multiprocessing.connection import Client, Listener, Connection # # # -if not(sys.platform == 'win32' or hasattr(_multiprocessing, 'recvfd')): +if not(sys.platform == 'win32' or (hasattr(socket, 'CMSG_LEN') and + hasattr(socket, 'SCM_RIGHTS'))): raise ImportError('pickling of connections not supported') # @@ -77,10 +79,23 @@ if sys.platform == 'win32': else: def send_handle(conn, handle, destination_pid): - _multiprocessing.sendfd(conn.fileno(), handle) + with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s: + s.sendmsg([b'x'], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, + struct.pack("@i", handle))]) def recv_handle(conn): - return _multiprocessing.recvfd(conn.fileno()) + size = struct.calcsize("@i") + with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s: + msg, ancdata, flags, addr = s.recvmsg(1, socket.CMSG_LEN(size)) + try: + cmsg_level, cmsg_type, cmsg_data = ancdata[0] + if (cmsg_level == socket.SOL_SOCKET and + cmsg_type == socket.SCM_RIGHTS): + return struct.unpack("@i", cmsg_data[:size])[0] + except (ValueError, IndexError, struct.error): + pass + raise RuntimeError('Invalid data received') + # # Support for a per-process server thread which caches pickled handles @@ -159,7 +174,7 @@ def rebuild_handle(pickled_data): return new_handle # -# Register `_multiprocessing.Connection` with `ForkingPickler` +# Register `Connection` with `ForkingPickler` # def reduce_connection(conn): @@ -168,11 +183,11 @@ def reduce_connection(conn): def rebuild_connection(reduced_handle, readable, writable): handle = rebuild_handle(reduced_handle) - return _multiprocessing.Connection( + return Connection( handle, readable=readable, writable=writable ) -ForkingPickler.register(_multiprocessing.Connection, reduce_connection) +ForkingPickler.register(Connection, reduce_connection) # # Register `socket.socket` with `ForkingPickler` @@ -201,6 +216,7 @@ ForkingPickler.register(socket.socket, reduce_socket) # if sys.platform == 'win32': + from multiprocessing.connection import PipeConnection def reduce_pipe_connection(conn): rh = reduce_handle(conn.fileno()) @@ -208,8 +224,8 @@ if sys.platform == 'win32': def rebuild_pipe_connection(reduced_handle, readable, writable): handle = rebuild_handle(reduced_handle) - return _multiprocessing.PipeConnection( + return PipeConnection( handle, readable=readable, writable=writable ) - ForkingPickler.register(_multiprocessing.PipeConnection, reduce_pipe_connection) + ForkingPickler.register(PipeConnection, reduce_pipe_connection) diff --git a/Lib/multiprocessing/util.py b/Lib/multiprocessing/util.py index 30b7a85..5c26683 100644 --- a/Lib/multiprocessing/util.py +++ b/Lib/multiprocessing/util.py @@ -32,9 +32,11 @@ # SUCH DAMAGE. # +import functools import itertools import weakref import atexit +import select import threading # we want threading to install it's # cleanup function before multiprocessing does @@ -186,7 +188,11 @@ class Finalize(object): _finalizer_registry[self._key] = self - def __call__(self, wr=None): + def __call__(self, wr=None, + # Need to bind these locally because the globals can have + # been cleared at shutdown + _finalizer_registry=_finalizer_registry, + sub_debug=sub_debug): ''' Run the callback unless it has already been called or cancelled ''' @@ -315,3 +321,18 @@ class ForkAwareLocal(threading.local): register_after_fork(self, lambda obj : obj.__dict__.clear()) def __reduce__(self): return type(self), () + + +# +# Automatic retry after EINTR +# + +def _eintr_retry(func): + @functools.wraps(func) + def wrapped(*args, **kwargs): + while True: + try: + return func(*args, **kwargs) + except InterruptedError: + continue + return wrapped |