summaryrefslogtreecommitdiffstats
path: root/Lib/multiprocessing
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/multiprocessing')
-rw-r--r--Lib/multiprocessing/__init__.py9
-rw-r--r--Lib/multiprocessing/connection.py553
-rw-r--r--Lib/multiprocessing/dummy/__init__.py15
-rw-r--r--Lib/multiprocessing/forking.py64
-rw-r--r--Lib/multiprocessing/heap.py1
-rw-r--r--Lib/multiprocessing/managers.py9
-rw-r--r--Lib/multiprocessing/pool.py32
-rw-r--r--Lib/multiprocessing/process.py20
-rw-r--r--Lib/multiprocessing/queues.py19
-rw-r--r--Lib/multiprocessing/reduction.py36
-rw-r--r--Lib/multiprocessing/sharedctypes.py1
-rw-r--r--Lib/multiprocessing/synchronize.py5
-rw-r--r--Lib/multiprocessing/util.py24
13 files changed, 642 insertions, 146 deletions
diff --git a/Lib/multiprocessing/__init__.py b/Lib/multiprocessing/__init__.py
index e6e16c8..e012440 100644
--- a/Lib/multiprocessing/__init__.py
+++ b/Lib/multiprocessing/__init__.py
@@ -48,7 +48,7 @@ __all__ = [
'Manager', 'Pipe', 'cpu_count', 'log_to_stderr', 'get_logger',
'allow_connection_pickling', 'BufferTooShort', 'TimeoutError',
'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition',
- 'Event', 'Queue', 'JoinableQueue', 'Pool', 'Value', 'Array',
+ 'Event', 'Queue', 'SimpleQueue', 'JoinableQueue', 'Pool', 'Value', 'Array',
'RawValue', 'RawArray', 'SUBDEBUG', 'SUBWARNING',
]
@@ -223,6 +223,13 @@ def JoinableQueue(maxsize=0):
from multiprocessing.queues import JoinableQueue
return JoinableQueue(maxsize)
+def SimpleQueue():
+ '''
+ Returns a queue object
+ '''
+ from multiprocessing.queues import SimpleQueue
+ return SimpleQueue()
+
def Pool(processes=None, initializer=None, initargs=(), maxtasksperchild=None):
'''
Returns a process pool object
diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py
index fa6f7b3..954e901 100644
--- a/Lib/multiprocessing/connection.py
+++ b/Lib/multiprocessing/connection.py
@@ -32,21 +32,31 @@
# SUCH DAMAGE.
#
-__all__ = [ 'Client', 'Listener', 'Pipe' ]
+__all__ = [ 'Client', 'Listener', 'Pipe', 'wait' ]
+import io
import os
import sys
+import pickle
+import select
import socket
+import struct
import errno
import time
import tempfile
import itertools
import _multiprocessing
-from multiprocessing import current_process, AuthenticationError
-from multiprocessing.util import get_temp_dir, Finalize, sub_debug, debug
-from multiprocessing.forking import duplicate, close
-
+from multiprocessing import current_process, AuthenticationError, BufferTooShort
+from multiprocessing.util import (
+ get_temp_dir, Finalize, sub_debug, debug, _eintr_retry)
+try:
+ from _multiprocessing import win32
+ from _subprocess import WAIT_OBJECT_0, WAIT_TIMEOUT, INFINITE
+except ImportError:
+ if sys.platform == 'win32':
+ raise
+ win32 = None
#
#
@@ -118,6 +128,306 @@ def address_type(address):
raise ValueError('address type of %r unrecognized' % address)
#
+# Connection classes
+#
+
+class _ConnectionBase:
+ _handle = None
+
+ def __init__(self, handle, readable=True, writable=True):
+ handle = handle.__index__()
+ if handle < 0:
+ raise ValueError("invalid handle")
+ if not readable and not writable:
+ raise ValueError(
+ "at least one of `readable` and `writable` must be True")
+ self._handle = handle
+ self._readable = readable
+ self._writable = writable
+
+ # XXX should we use util.Finalize instead of a __del__?
+
+ def __del__(self):
+ if self._handle is not None:
+ self._close()
+
+ def _check_closed(self):
+ if self._handle is None:
+ raise IOError("handle is closed")
+
+ def _check_readable(self):
+ if not self._readable:
+ raise IOError("connection is write-only")
+
+ def _check_writable(self):
+ if not self._writable:
+ raise IOError("connection is read-only")
+
+ def _bad_message_length(self):
+ if self._writable:
+ self._readable = False
+ else:
+ self.close()
+ raise IOError("bad message length")
+
+ @property
+ def closed(self):
+ """True if the connection is closed"""
+ return self._handle is None
+
+ @property
+ def readable(self):
+ """True if the connection is readable"""
+ return self._readable
+
+ @property
+ def writable(self):
+ """True if the connection is writable"""
+ return self._writable
+
+ def fileno(self):
+ """File descriptor or handle of the connection"""
+ self._check_closed()
+ return self._handle
+
+ def close(self):
+ """Close the connection"""
+ if self._handle is not None:
+ try:
+ self._close()
+ finally:
+ self._handle = None
+
+ def send_bytes(self, buf, offset=0, size=None):
+ """Send the bytes data from a bytes-like object"""
+ self._check_closed()
+ self._check_writable()
+ m = memoryview(buf)
+ # HACK for byte-indexing of non-bytewise buffers (e.g. array.array)
+ if m.itemsize > 1:
+ m = memoryview(bytes(m))
+ n = len(m)
+ if offset < 0:
+ raise ValueError("offset is negative")
+ if n < offset:
+ raise ValueError("buffer length < offset")
+ if size is None:
+ size = n - offset
+ elif size < 0:
+ raise ValueError("size is negative")
+ elif offset + size > n:
+ raise ValueError("buffer length < offset + size")
+ self._send_bytes(m[offset:offset + size])
+
+ def send(self, obj):
+ """Send a (picklable) object"""
+ self._check_closed()
+ self._check_writable()
+ buf = pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)
+ self._send_bytes(memoryview(buf))
+
+ def recv_bytes(self, maxlength=None):
+ """
+ Receive bytes data as a bytes object.
+ """
+ self._check_closed()
+ self._check_readable()
+ if maxlength is not None and maxlength < 0:
+ raise ValueError("negative maxlength")
+ buf = self._recv_bytes(maxlength)
+ if buf is None:
+ self._bad_message_length()
+ return buf.getvalue()
+
+ def recv_bytes_into(self, buf, offset=0):
+ """
+ Receive bytes data into a writeable buffer-like object.
+ Return the number of bytes read.
+ """
+ self._check_closed()
+ self._check_readable()
+ with memoryview(buf) as m:
+ # Get bytesize of arbitrary buffer
+ itemsize = m.itemsize
+ bytesize = itemsize * len(m)
+ if offset < 0:
+ raise ValueError("negative offset")
+ elif offset > bytesize:
+ raise ValueError("offset too large")
+ result = self._recv_bytes()
+ size = result.tell()
+ if bytesize < offset + size:
+ raise BufferTooShort(result.getvalue())
+ # Message can fit in dest
+ result.seek(0)
+ result.readinto(m[offset // itemsize :
+ (offset + size) // itemsize])
+ return size
+
+ def recv(self):
+ """Receive a (picklable) object"""
+ self._check_closed()
+ self._check_readable()
+ buf = self._recv_bytes()
+ return pickle.loads(buf.getbuffer())
+
+ def poll(self, timeout=0.0):
+ """Whether there is any input available to be read"""
+ self._check_closed()
+ self._check_readable()
+ return self._poll(timeout)
+
+
+if win32:
+
+ class PipeConnection(_ConnectionBase):
+ """
+ Connection class based on a Windows named pipe.
+ Overlapped I/O is used, so the handles must have been created
+ with FILE_FLAG_OVERLAPPED.
+ """
+ _got_empty_message = False
+
+ def _close(self, _CloseHandle=win32.CloseHandle):
+ _CloseHandle(self._handle)
+
+ def _send_bytes(self, buf):
+ ov, err = win32.WriteFile(self._handle, buf, overlapped=True)
+ try:
+ if err == win32.ERROR_IO_PENDING:
+ waitres = win32.WaitForMultipleObjects(
+ [ov.event], False, INFINITE)
+ assert waitres == WAIT_OBJECT_0
+ except:
+ ov.cancel()
+ raise
+ finally:
+ nwritten, err = ov.GetOverlappedResult(True)
+ assert err == 0
+ assert nwritten == len(buf)
+
+ def _recv_bytes(self, maxsize=None):
+ if self._got_empty_message:
+ self._got_empty_message = False
+ return io.BytesIO()
+ else:
+ bsize = 128 if maxsize is None else min(maxsize, 128)
+ try:
+ ov, err = win32.ReadFile(self._handle, bsize,
+ overlapped=True)
+ try:
+ if err == win32.ERROR_IO_PENDING:
+ waitres = win32.WaitForMultipleObjects(
+ [ov.event], False, INFINITE)
+ assert waitres == WAIT_OBJECT_0
+ except:
+ ov.cancel()
+ raise
+ finally:
+ nread, err = ov.GetOverlappedResult(True)
+ if err == 0:
+ f = io.BytesIO()
+ f.write(ov.getbuffer())
+ return f
+ elif err == win32.ERROR_MORE_DATA:
+ return self._get_more_data(ov, maxsize)
+ except IOError as e:
+ if e.winerror == win32.ERROR_BROKEN_PIPE:
+ raise EOFError
+ else:
+ raise
+ raise RuntimeError("shouldn't get here; expected KeyboardInterrupt")
+
+ def _poll(self, timeout):
+ if (self._got_empty_message or
+ win32.PeekNamedPipe(self._handle)[0] != 0):
+ return True
+ if timeout < 0:
+ timeout = None
+ return bool(wait([self], timeout))
+
+ def _get_more_data(self, ov, maxsize):
+ buf = ov.getbuffer()
+ f = io.BytesIO()
+ f.write(buf)
+ left = win32.PeekNamedPipe(self._handle)[1]
+ assert left > 0
+ if maxsize is not None and len(buf) + left > maxsize:
+ self._bad_message_length()
+ ov, err = win32.ReadFile(self._handle, left, overlapped=True)
+ rbytes, err = ov.GetOverlappedResult(True)
+ assert err == 0
+ assert rbytes == left
+ f.write(ov.getbuffer())
+ return f
+
+
+class Connection(_ConnectionBase):
+ """
+ Connection class based on an arbitrary file descriptor (Unix only), or
+ a socket handle (Windows).
+ """
+
+ if win32:
+ def _close(self, _close=win32.closesocket):
+ _close(self._handle)
+ _write = win32.send
+ _read = win32.recv
+ else:
+ def _close(self, _close=os.close):
+ _close(self._handle)
+ _write = os.write
+ _read = os.read
+
+ def _send(self, buf, write=_write):
+ remaining = len(buf)
+ while True:
+ n = write(self._handle, buf)
+ remaining -= n
+ if remaining == 0:
+ break
+ buf = buf[n:]
+
+ def _recv(self, size, read=_read):
+ buf = io.BytesIO()
+ handle = self._handle
+ remaining = size
+ while remaining > 0:
+ chunk = read(handle, remaining)
+ n = len(chunk)
+ if n == 0:
+ if remaining == size:
+ raise EOFError
+ else:
+ raise IOError("got end of file during message")
+ buf.write(chunk)
+ remaining -= n
+ return buf
+
+ def _send_bytes(self, buf):
+ # For wire compatibility with 3.2 and lower
+ n = len(buf)
+ self._send(struct.pack("!i", n))
+ # The condition is necessary to avoid "broken pipe" errors
+ # when sending a 0-length buffer if the other end closed the pipe.
+ if n > 0:
+ self._send(buf)
+
+ def _recv_bytes(self, maxsize=None):
+ buf = self._recv(4)
+ size, = struct.unpack("!i", buf.getvalue())
+ if maxsize is not None and size > maxsize:
+ return None
+ return self._recv(size)
+
+ def _poll(self, timeout):
+ if timeout < 0.0:
+ timeout = None
+ r = wait([self._handle], timeout)
+ return bool(r)
+
+
+#
# Public functions
#
@@ -195,21 +505,17 @@ if sys.platform != 'win32':
'''
if duplex:
s1, s2 = socket.socketpair()
- c1 = _multiprocessing.Connection(os.dup(s1.fileno()))
- c2 = _multiprocessing.Connection(os.dup(s2.fileno()))
- s1.close()
- s2.close()
+ c1 = Connection(s1.detach())
+ c2 = Connection(s2.detach())
else:
fd1, fd2 = os.pipe()
- c1 = _multiprocessing.Connection(fd1, writable=False)
- c2 = _multiprocessing.Connection(fd2, readable=False)
+ c1 = Connection(fd1, writable=False)
+ c2 = Connection(fd2, readable=False)
return c1, c2
else:
- from _multiprocessing import win32
-
def Pipe(duplex=True):
'''
Returns pair of connection objects at either end of a pipe
@@ -225,26 +531,26 @@ else:
obsize, ibsize = 0, BUFSIZE
h1 = win32.CreateNamedPipe(
- address, openmode,
+ address, openmode | win32.FILE_FLAG_OVERLAPPED |
+ win32.FILE_FLAG_FIRST_PIPE_INSTANCE,
win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
win32.PIPE_WAIT,
1, obsize, ibsize, win32.NMPWAIT_WAIT_FOREVER, win32.NULL
)
h2 = win32.CreateFile(
- address, access, 0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL
+ address, access, 0, win32.NULL, win32.OPEN_EXISTING,
+ win32.FILE_FLAG_OVERLAPPED, win32.NULL
)
win32.SetNamedPipeHandleState(
h2, win32.PIPE_READMODE_MESSAGE, None, None
)
- try:
- win32.ConnectNamedPipe(h1, win32.NULL)
- except WindowsError as e:
- if e.args[0] != win32.ERROR_PIPE_CONNECTED:
- raise
+ overlapped = win32.ConnectNamedPipe(h1, overlapped=True)
+ _, err = overlapped.GetOverlappedResult(True)
+ assert err == 0
- c1 = _multiprocessing.PipeConnection(h1, writable=duplex)
- c2 = _multiprocessing.PipeConnection(h2, readable=duplex)
+ c1 = PipeConnection(h1, writable=duplex)
+ c2 = PipeConnection(h2, readable=duplex)
return c1, c2
@@ -259,11 +565,14 @@ class SocketListener(object):
def __init__(self, address, family, backlog=1):
self._socket = socket.socket(getattr(socket, family))
try:
- self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ # SO_REUSEADDR has different semantics on Windows (issue #2550).
+ if os.name == 'posix':
+ self._socket.setsockopt(socket.SOL_SOCKET,
+ socket.SO_REUSEADDR, 1)
self._socket.bind(address)
self._socket.listen(backlog)
self._address = self._socket.getsockname()
- except socket.error:
+ except OSError:
self._socket.close()
raise
self._family = family
@@ -279,7 +588,7 @@ class SocketListener(object):
def accept(self):
s, self._last_accepted = self._socket.accept()
fd = duplicate(s.fileno())
- conn = _multiprocessing.Connection(fd)
+ conn = Connection(fd)
s.close()
return conn
@@ -295,23 +604,9 @@ def SocketClient(address):
'''
family = address_type(address)
with socket.socket( getattr(socket, family) ) as s:
- t = _init_timeout()
-
- while 1:
- try:
- s.connect(address)
- except socket.error as e:
- if e.args[0] != errno.ECONNREFUSED or _check_timeout(t):
- debug('failed to connect to address %s', address)
- raise
- time.sleep(0.01)
- else:
- break
- else:
- raise
-
+ s.connect(address)
fd = duplicate(s.fileno())
- conn = _multiprocessing.Connection(fd)
+ conn = Connection(fd)
return conn
#
@@ -326,39 +621,41 @@ if sys.platform == 'win32':
'''
def __init__(self, address, backlog=None):
self._address = address
- handle = win32.CreateNamedPipe(
- address, win32.PIPE_ACCESS_DUPLEX,
- win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
- win32.PIPE_WAIT,
- win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
- win32.NMPWAIT_WAIT_FOREVER, win32.NULL
- )
- self._handle_queue = [handle]
- self._last_accepted = None
+ self._handle_queue = [self._new_handle(first=True)]
+ self._last_accepted = None
sub_debug('listener created with address=%r', self._address)
-
self.close = Finalize(
self, PipeListener._finalize_pipe_listener,
args=(self._handle_queue, self._address), exitpriority=0
)
- def accept(self):
- newhandle = win32.CreateNamedPipe(
- self._address, win32.PIPE_ACCESS_DUPLEX,
+ def _new_handle(self, first=False):
+ flags = win32.PIPE_ACCESS_DUPLEX | win32.FILE_FLAG_OVERLAPPED
+ if first:
+ flags |= win32.FILE_FLAG_FIRST_PIPE_INSTANCE
+ return win32.CreateNamedPipe(
+ self._address, flags,
win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
win32.PIPE_WAIT,
win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
win32.NMPWAIT_WAIT_FOREVER, win32.NULL
)
- self._handle_queue.append(newhandle)
+
+ def accept(self):
+ self._handle_queue.append(self._new_handle())
handle = self._handle_queue.pop(0)
+ ov = win32.ConnectNamedPipe(handle, overlapped=True)
try:
- win32.ConnectNamedPipe(handle, win32.NULL)
- except WindowsError as e:
- if e.args[0] != win32.ERROR_PIPE_CONNECTED:
- raise
- return _multiprocessing.PipeConnection(handle)
+ res = win32.WaitForMultipleObjects([ov.event], False, INFINITE)
+ except:
+ ov.cancel()
+ win32.CloseHandle(handle)
+ raise
+ finally:
+ _, err = ov.GetOverlappedResult(True)
+ assert err == 0
+ return PipeConnection(handle)
@staticmethod
def _finalize_pipe_listener(queue, address):
@@ -376,11 +673,12 @@ if sys.platform == 'win32':
win32.WaitNamedPipe(address, 1000)
h = win32.CreateFile(
address, win32.GENERIC_READ | win32.GENERIC_WRITE,
- 0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL
+ 0, win32.NULL, win32.OPEN_EXISTING,
+ win32.FILE_FLAG_OVERLAPPED, win32.NULL
)
except WindowsError as e:
- if e.args[0] not in (win32.ERROR_SEM_TIMEOUT,
- win32.ERROR_PIPE_BUSY) or _check_timeout(t):
+ if e.winerror not in (win32.ERROR_SEM_TIMEOUT,
+ win32.ERROR_PIPE_BUSY) or _check_timeout(t):
raise
else:
break
@@ -390,7 +688,7 @@ if sys.platform == 'win32':
win32.SetNamedPipeHandleState(
h, win32.PIPE_READMODE_MESSAGE, None, None
)
- return _multiprocessing.PipeConnection(h)
+ return PipeConnection(h)
#
# Authentication stuff
@@ -447,10 +745,10 @@ class ConnectionWrapper(object):
return self._loads(s)
def _xml_dumps(obj):
- return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf8')
+ return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf-8')
def _xml_loads(s):
- (obj,), method = xmlrpclib.loads(s.decode('utf8'))
+ (obj,), method = xmlrpclib.loads(s.decode('utf-8'))
return obj
class XmlListener(Listener):
@@ -464,3 +762,126 @@ def XmlClient(*args, **kwds):
global xmlrpclib
import xmlrpc.client as xmlrpclib
return ConnectionWrapper(Client(*args, **kwds), _xml_dumps, _xml_loads)
+
+#
+# Wait
+#
+
+if sys.platform == 'win32':
+
+ def _exhaustive_wait(handles, timeout):
+ # Return ALL handles which are currently signalled. (Only
+ # returning the first signalled might create starvation issues.)
+ L = list(handles)
+ ready = []
+ while L:
+ res = win32.WaitForMultipleObjects(L, False, timeout)
+ if res == WAIT_TIMEOUT:
+ break
+ elif WAIT_OBJECT_0 <= res < WAIT_OBJECT_0 + len(L):
+ res -= WAIT_OBJECT_0
+ elif WAIT_ABANDONED_0 <= res < WAIT_ABANDONED_0 + len(L):
+ res -= WAIT_ABANDONED_0
+ else:
+ raise RuntimeError('Should not get here')
+ ready.append(L[res])
+ L = L[res+1:]
+ timeout = 0
+ return ready
+
+ _ready_errors = {win32.ERROR_BROKEN_PIPE, win32.ERROR_NETNAME_DELETED}
+
+ def wait(object_list, timeout=None):
+ '''
+ Wait till an object in object_list is ready/readable.
+
+ Returns list of those objects in object_list which are ready/readable.
+ '''
+ if timeout is None:
+ timeout = INFINITE
+ elif timeout < 0:
+ timeout = 0
+ else:
+ timeout = int(timeout * 1000 + 0.5)
+
+ object_list = list(object_list)
+ waithandle_to_obj = {}
+ ov_list = []
+ ready_objects = set()
+ ready_handles = set()
+
+ try:
+ for o in object_list:
+ try:
+ fileno = getattr(o, 'fileno')
+ except AttributeError:
+ waithandle_to_obj[o.__index__()] = o
+ else:
+ # start an overlapped read of length zero
+ try:
+ ov, err = win32.ReadFile(fileno(), 0, True)
+ except OSError as e:
+ err = e.winerror
+ if err not in _ready_errors:
+ raise
+ if err == win32.ERROR_IO_PENDING:
+ ov_list.append(ov)
+ waithandle_to_obj[ov.event] = o
+ else:
+ # If o.fileno() is an overlapped pipe handle and
+ # err == 0 then there is a zero length message
+ # in the pipe, but it HAS NOT been consumed.
+ ready_objects.add(o)
+ timeout = 0
+
+ ready_handles = _exhaustive_wait(waithandle_to_obj.keys(), timeout)
+ finally:
+ # request that overlapped reads stop
+ for ov in ov_list:
+ ov.cancel()
+
+ # wait for all overlapped reads to stop
+ for ov in ov_list:
+ try:
+ _, err = ov.GetOverlappedResult(True)
+ except OSError as e:
+ err = e.winerror
+ if err not in _ready_errors:
+ raise
+ if err != win32.ERROR_OPERATION_ABORTED:
+ o = waithandle_to_obj[ov.event]
+ ready_objects.add(o)
+ if err == 0:
+ # If o.fileno() is an overlapped pipe handle then
+ # a zero length message HAS been consumed.
+ if hasattr(o, '_got_empty_message'):
+ o._got_empty_message = True
+
+ ready_objects.update(waithandle_to_obj[h] for h in ready_handles)
+ return [o for o in object_list if o in ready_objects]
+
+else:
+
+ def wait(object_list, timeout=None):
+ '''
+ Wait till an object in object_list is ready/readable.
+
+ Returns list of those objects in object_list which are ready/readable.
+ '''
+ if timeout is not None:
+ if timeout <= 0:
+ return select.select(object_list, [], [], 0)[0]
+ else:
+ deadline = time.time() + timeout
+ while True:
+ try:
+ return select.select(object_list, [], [], timeout)[0]
+ except OSError as e:
+ if e.errno != errno.EINTR:
+ raise
+ if timeout is not None:
+ timeout = deadline - time.time()
+
+
+# Late import because of circular import
+from multiprocessing.forking import duplicate, close
diff --git a/Lib/multiprocessing/dummy/__init__.py b/Lib/multiprocessing/dummy/__init__.py
index c4933d9..056acfc 100644
--- a/Lib/multiprocessing/dummy/__init__.py
+++ b/Lib/multiprocessing/dummy/__init__.py
@@ -46,12 +46,10 @@ import threading
import sys
import weakref
import array
-import itertools
-from multiprocessing import TimeoutError, cpu_count
from multiprocessing.dummy.connection import Pipe
from threading import Lock, RLock, Semaphore, BoundedSemaphore
-from threading import Event
+from threading import Event, Condition
from queue import Queue
#
@@ -84,17 +82,6 @@ class DummyProcess(threading.Thread):
#
#
-class Condition(threading._Condition):
- # XXX
- if sys.version_info < (3, 0):
- notify_all = threading._Condition.notify_all.__func__
- else:
- notify_all = threading._Condition.notify_all
-
-#
-#
-#
-
Process = DummyProcess
current_process = threading.current_thread
current_process()._children = weakref.WeakKeyDictionary()
diff --git a/Lib/multiprocessing/forking.py b/Lib/multiprocessing/forking.py
index 4e24d6a..020508a 100644
--- a/Lib/multiprocessing/forking.py
+++ b/Lib/multiprocessing/forking.py
@@ -55,18 +55,18 @@ def assert_spawning(self):
# Try making some callable types picklable
#
-from pickle import _Pickler as Pickler
+from pickle import Pickler
+from copyreg import dispatch_table
+
class ForkingPickler(Pickler):
- dispatch = Pickler.dispatch.copy()
+ _extra_reducers = {}
+ def __init__(self, *args):
+ Pickler.__init__(self, *args)
+ self.dispatch_table = dispatch_table.copy()
+ self.dispatch_table.update(self._extra_reducers)
@classmethod
def register(cls, type, reduce):
- def dispatcher(self, obj):
- rv = reduce(obj)
- if isinstance(rv, str):
- self.save_global(obj, rv)
- else:
- self.save_reduce(obj=obj, *rv)
- cls.dispatch[type] = dispatcher
+ cls._extra_reducers[type] = reduce
def _reduce_method(m):
if m.__self__ is None:
@@ -100,11 +100,12 @@ else:
#
if sys.platform != 'win32':
- import time
+ import select
exit = os._exit
duplicate = os.dup
close = os.close
+ _select = util._eintr_retry(select.select)
#
# We define a Popen class similar to the one from subprocess, but
@@ -118,14 +119,23 @@ if sys.platform != 'win32':
sys.stderr.flush()
self.returncode = None
+ r, w = os.pipe()
+ self.sentinel = r
+
self.pid = os.fork()
if self.pid == 0:
+ os.close(r)
if 'random' in sys.modules:
import random
random.seed()
code = process_obj._bootstrap()
os._exit(code)
+ # `w` will be closed when the child exits, at which point `r`
+ # will become ready for reading (using e.g. select()).
+ os.close(w)
+ util.Finalize(self, os.close, (r,))
+
def poll(self, flag=os.WNOHANG):
if self.returncode is None:
try:
@@ -143,26 +153,20 @@ if sys.platform != 'win32':
return self.returncode
def wait(self, timeout=None):
- if timeout is None:
- return self.poll(0)
- deadline = time.time() + timeout
- delay = 0.0005
- while 1:
- res = self.poll()
- if res is not None:
- break
- remaining = deadline - time.time()
- if remaining <= 0:
- break
- delay = min(delay * 2, remaining, 0.05)
- time.sleep(delay)
- return res
+ if self.returncode is None:
+ if timeout is not None:
+ r = _select([self.sentinel], [], [], timeout)[0]
+ if not r:
+ return None
+ # This shouldn't block if select() returned successfully.
+ return self.poll(os.WNOHANG if timeout == 0.0 else 0)
+ return self.returncode
def terminate(self):
if self.returncode is None:
try:
os.kill(self.pid, signal.SIGTERM)
- except OSError as e:
+ except OSError:
if self.wait(timeout=0.1) is None:
raise
@@ -178,11 +182,9 @@ else:
import _thread
import msvcrt
import _subprocess
- import time
- from pickle import dump, load, HIGHEST_PROTOCOL
- from _multiprocessing import win32, Connection, PipeConnection
- from .util import Finalize
+ from pickle import load, HIGHEST_PROTOCOL
+ from _multiprocessing import win32
def dump(obj, file, protocol=None):
ForkingPickler(file, protocol).dump(obj)
@@ -256,6 +258,7 @@ else:
self.pid = pid
self.returncode = None
self._handle = hp
+ self.sentinel = int(hp)
# send information to child
prep_data = get_preparation_data(process_obj._name)
@@ -409,6 +412,9 @@ else:
# Make (Pipe)Connection picklable
#
+ # Late import because of circular import
+ from .connection import Connection, PipeConnection
+
def reduce_connection(conn):
if not Popen.thread_is_spawning():
raise RuntimeError(
diff --git a/Lib/multiprocessing/heap.py b/Lib/multiprocessing/heap.py
index 0a25ef0..7366bd2 100644
--- a/Lib/multiprocessing/heap.py
+++ b/Lib/multiprocessing/heap.py
@@ -34,7 +34,6 @@
import bisect
import mmap
-import tempfile
import os
import sys
import threading
diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py
index 5588ead..eaf912c 100644
--- a/Lib/multiprocessing/managers.py
+++ b/Lib/multiprocessing/managers.py
@@ -39,19 +39,15 @@ __all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ]
# Imports
#
-import os
import sys
-import weakref
import threading
import array
import queue
from traceback import format_exc
-from pickle import PicklingError
from multiprocessing import Process, current_process, active_children, Pool, util, connection
from multiprocessing.process import AuthenticationString
-from multiprocessing.forking import exit, Popen, assert_spawning, ForkingPickler
-from multiprocessing.util import Finalize, info
+from multiprocessing.forking import exit, Popen, ForkingPickler
#
# Register some things for pickling
@@ -1070,11 +1066,12 @@ ArrayProxy = MakeProxyType('ArrayProxy', (
PoolProxy = MakeProxyType('PoolProxy', (
'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join',
- 'map', 'map_async', 'terminate'
+ 'map', 'map_async', 'starmap', 'starmap_async', 'terminate'
))
PoolProxy._method_to_typeid_ = {
'apply_async': 'AsyncResult',
'map_async': 'AsyncResult',
+ 'starmap_async': 'AsyncResult',
'imap': 'Iterator',
'imap_unordered': 'Iterator'
}
diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py
index 0c29e64..7039d16 100644
--- a/Lib/multiprocessing/pool.py
+++ b/Lib/multiprocessing/pool.py
@@ -64,6 +64,9 @@ job_counter = itertools.count()
def mapstar(args):
return list(map(*args))
+def starmapstar(args):
+ return list(itertools.starmap(args[0], args[1]))
+
#
# Code run by worker processes
#
@@ -248,7 +251,25 @@ class Pool(object):
in a list that is returned.
'''
assert self._state == RUN
- return self.map_async(func, iterable, chunksize).get()
+ return self._map_async(func, iterable, mapstar, chunksize).get()
+
+ def starmap(self, func, iterable, chunksize=None):
+ '''
+ Like `map()` method but the elements of the `iterable` are expected to
+ be iterables as well and will be unpacked as arguments. Hence
+ `func` and (a, b) becomes func(a, b).
+ '''
+ assert self._state == RUN
+ return self._map_async(func, iterable, starmapstar, chunksize).get()
+
+ def starmap_async(self, func, iterable, chunksize=None, callback=None,
+ error_callback=None):
+ '''
+ Asynchronous version of `starmap()` method.
+ '''
+ assert self._state == RUN
+ return self._map_async(func, iterable, starmapstar, chunksize,
+ callback, error_callback)
def imap(self, func, iterable, chunksize=1):
'''
@@ -302,6 +323,13 @@ class Pool(object):
Asynchronous version of `map()` method.
'''
assert self._state == RUN
+ return self._map_async(func, iterable, mapstar, chunksize)
+
+ def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
+ error_callback=None):
+ '''
+ Helper function to implement map, starmap and their async counterparts.
+ '''
if not hasattr(iterable, '__len__'):
iterable = list(iterable)
@@ -315,7 +343,7 @@ class Pool(object):
task_batches = Pool._get_tasks(func, iterable, chunksize)
result = MapResult(self._cache, chunksize, len(iterable), callback,
error_callback=error_callback)
- self._taskqueue.put((((result._job, i, mapstar, (x,), {})
+ self._taskqueue.put((((result._job, i, mapper, (x,), {})
for i, x in enumerate(task_batches)), None))
return result
diff --git a/Lib/multiprocessing/process.py b/Lib/multiprocessing/process.py
index 2b61ee9..b599f11 100644
--- a/Lib/multiprocessing/process.py
+++ b/Lib/multiprocessing/process.py
@@ -92,12 +92,16 @@ class Process(object):
'''
_Popen = None
- def __init__(self, group=None, target=None, name=None, args=(), kwargs={}):
+ def __init__(self, group=None, target=None, name=None, args=(), kwargs={},
+ *, daemon=None):
assert group is None, 'group argument must be None for now'
count = next(_current_process._counter)
self._identity = _current_process._identity + (count,)
self._authkey = _current_process._authkey
- self._daemonic = _current_process._daemonic
+ if daemon is not None:
+ self._daemonic = daemon
+ else:
+ self._daemonic = _current_process._daemonic
self._tempdir = _current_process._tempdir
self._parent_pid = os.getpid()
self._popen = None
@@ -130,6 +134,7 @@ class Process(object):
else:
from .forking import Popen
self._popen = Popen(self)
+ self._sentinel = self._popen.sentinel
_current_process._children.add(self)
def terminate(self):
@@ -216,6 +221,17 @@ class Process(object):
pid = ident
+ @property
+ def sentinel(self):
+ '''
+ Return a file descriptor (Unix) or handle (Windows) suitable for
+ waiting for process termination.
+ '''
+ try:
+ return self._sentinel
+ except AttributeError:
+ raise ValueError("process not started")
+
def __repr__(self):
if self is _current_process:
status = 'started'
diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py
index 51d9912..262fd85 100644
--- a/Lib/multiprocessing/queues.py
+++ b/Lib/multiprocessing/queues.py
@@ -39,12 +39,12 @@ import os
import threading
import collections
import time
-import atexit
import weakref
+import errno
from queue import Empty, Full
import _multiprocessing
-from multiprocessing import Pipe
+from multiprocessing.connection import Pipe
from multiprocessing.synchronize import Lock, BoundedSemaphore, Semaphore, Condition
from multiprocessing.util import debug, info, Finalize, register_after_fork
from multiprocessing.forking import assert_spawning
@@ -67,6 +67,8 @@ class Queue(object):
else:
self._wlock = Lock()
self._sem = BoundedSemaphore(maxsize)
+ # For use by concurrent.futures
+ self._ignore_epipe = False
self._after_fork()
@@ -75,11 +77,11 @@ class Queue(object):
def __getstate__(self):
assert_spawning(self)
- return (self._maxsize, self._reader, self._writer,
+ return (self._ignore_epipe, self._maxsize, self._reader, self._writer,
self._rlock, self._wlock, self._sem, self._opid)
def __setstate__(self, state):
- (self._maxsize, self._reader, self._writer,
+ (self._ignore_epipe, self._maxsize, self._reader, self._writer,
self._rlock, self._wlock, self._sem, self._opid) = state
self._after_fork()
@@ -182,7 +184,7 @@ class Queue(object):
self._thread = threading.Thread(
target=Queue._feed,
args=(self._buffer, self._notempty, self._send,
- self._wlock, self._writer.close),
+ self._wlock, self._writer.close, self._ignore_epipe),
name='QueueFeederThread'
)
self._thread.daemon = True
@@ -233,7 +235,7 @@ class Queue(object):
notempty.release()
@staticmethod
- def _feed(buffer, notempty, send, writelock, close):
+ def _feed(buffer, notempty, send, writelock, close, ignore_epipe):
debug('starting thread to feed data to pipe')
from .util import is_exiting
@@ -275,6 +277,8 @@ class Queue(object):
except IndexError:
pass
except Exception as e:
+ if ignore_epipe and getattr(e, 'errno', 0) == errno.EPIPE:
+ return
# Since this runs in a daemon thread the resources it uses
# may be become unusable while the process is cleaning up.
# We ignore errors which happen after the process has
@@ -356,6 +360,7 @@ class SimpleQueue(object):
def __init__(self):
self._reader, self._writer = Pipe(duplex=False)
self._rlock = Lock()
+ self._poll = self._reader.poll
if sys.platform == 'win32':
self._wlock = None
else:
@@ -363,7 +368,7 @@ class SimpleQueue(object):
self._make_methods()
def empty(self):
- return not self._reader.poll()
+ return not self._poll()
def __getstate__(self):
assert_spawning(self)
diff --git a/Lib/multiprocessing/reduction.py b/Lib/multiprocessing/reduction.py
index 6e5e5bc..dda4a41 100644
--- a/Lib/multiprocessing/reduction.py
+++ b/Lib/multiprocessing/reduction.py
@@ -39,19 +39,20 @@ import os
import sys
import socket
import threading
+import struct
-import _multiprocessing
from multiprocessing import current_process
from multiprocessing.forking import Popen, duplicate, close, ForkingPickler
from multiprocessing.util import register_after_fork, debug, sub_debug
-from multiprocessing.connection import Client, Listener
+from multiprocessing.connection import Client, Listener, Connection
#
#
#
-if not(sys.platform == 'win32' or hasattr(_multiprocessing, 'recvfd')):
+if not(sys.platform == 'win32' or (hasattr(socket, 'CMSG_LEN') and
+ hasattr(socket, 'SCM_RIGHTS'))):
raise ImportError('pickling of connections not supported')
#
@@ -59,7 +60,6 @@ if not(sys.platform == 'win32' or hasattr(_multiprocessing, 'recvfd')):
#
if sys.platform == 'win32':
- import _subprocess
from _multiprocessing import win32
def send_handle(conn, handle, destination_pid):
@@ -77,10 +77,23 @@ if sys.platform == 'win32':
else:
def send_handle(conn, handle, destination_pid):
- _multiprocessing.sendfd(conn.fileno(), handle)
+ with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s:
+ s.sendmsg([b'x'], [(socket.SOL_SOCKET, socket.SCM_RIGHTS,
+ struct.pack("@i", handle))])
def recv_handle(conn):
- return _multiprocessing.recvfd(conn.fileno())
+ size = struct.calcsize("@i")
+ with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s:
+ msg, ancdata, flags, addr = s.recvmsg(1, socket.CMSG_LEN(size))
+ try:
+ cmsg_level, cmsg_type, cmsg_data = ancdata[0]
+ if (cmsg_level == socket.SOL_SOCKET and
+ cmsg_type == socket.SCM_RIGHTS):
+ return struct.unpack("@i", cmsg_data[:size])[0]
+ except (ValueError, IndexError, struct.error):
+ pass
+ raise RuntimeError('Invalid data received')
+
#
# Support for a per-process server thread which caches pickled handles
@@ -159,7 +172,7 @@ def rebuild_handle(pickled_data):
return new_handle
#
-# Register `_multiprocessing.Connection` with `ForkingPickler`
+# Register `Connection` with `ForkingPickler`
#
def reduce_connection(conn):
@@ -168,11 +181,11 @@ def reduce_connection(conn):
def rebuild_connection(reduced_handle, readable, writable):
handle = rebuild_handle(reduced_handle)
- return _multiprocessing.Connection(
+ return Connection(
handle, readable=readable, writable=writable
)
-ForkingPickler.register(_multiprocessing.Connection, reduce_connection)
+ForkingPickler.register(Connection, reduce_connection)
#
# Register `socket.socket` with `ForkingPickler`
@@ -201,6 +214,7 @@ ForkingPickler.register(socket.socket, reduce_socket)
#
if sys.platform == 'win32':
+ from multiprocessing.connection import PipeConnection
def reduce_pipe_connection(conn):
rh = reduce_handle(conn.fileno())
@@ -208,8 +222,8 @@ if sys.platform == 'win32':
def rebuild_pipe_connection(reduced_handle, readable, writable):
handle = rebuild_handle(reduced_handle)
- return _multiprocessing.PipeConnection(
+ return PipeConnection(
handle, readable=readable, writable=writable
)
- ForkingPickler.register(_multiprocessing.PipeConnection, reduce_pipe_connection)
+ ForkingPickler.register(PipeConnection, reduce_pipe_connection)
diff --git a/Lib/multiprocessing/sharedctypes.py b/Lib/multiprocessing/sharedctypes.py
index 1e694da..5826379 100644
--- a/Lib/multiprocessing/sharedctypes.py
+++ b/Lib/multiprocessing/sharedctypes.py
@@ -32,7 +32,6 @@
# SUCH DAMAGE.
#
-import sys
import ctypes
import weakref
diff --git a/Lib/multiprocessing/synchronize.py b/Lib/multiprocessing/synchronize.py
index 70ae825..e35bbff 100644
--- a/Lib/multiprocessing/synchronize.py
+++ b/Lib/multiprocessing/synchronize.py
@@ -37,14 +37,11 @@ __all__ = [
]
import threading
-import os
import sys
-from time import time as _time, sleep as _sleep
-
import _multiprocessing
from multiprocessing.process import current_process
-from multiprocessing.util import Finalize, register_after_fork, debug
+from multiprocessing.util import register_after_fork, debug
from multiprocessing.forking import assert_spawning, Popen
# Try to import the mp.synchronize module cleanly, if it fails
diff --git a/Lib/multiprocessing/util.py b/Lib/multiprocessing/util.py
index 30b7a85..0bbb87e 100644
--- a/Lib/multiprocessing/util.py
+++ b/Lib/multiprocessing/util.py
@@ -32,6 +32,7 @@
# SUCH DAMAGE.
#
+import functools
import itertools
import weakref
import atexit
@@ -84,7 +85,7 @@ def get_logger():
Returns logger used by multiprocessing
'''
global _logger
- import logging, atexit
+ import logging
logging._acquireLock()
try:
@@ -186,7 +187,11 @@ class Finalize(object):
_finalizer_registry[self._key] = self
- def __call__(self, wr=None):
+ def __call__(self, wr=None,
+ # Need to bind these locally because the globals can have
+ # been cleared at shutdown
+ _finalizer_registry=_finalizer_registry,
+ sub_debug=sub_debug):
'''
Run the callback unless it has already been called or cancelled
'''
@@ -315,3 +320,18 @@ class ForkAwareLocal(threading.local):
register_after_fork(self, lambda obj : obj.__dict__.clear())
def __reduce__(self):
return type(self), ()
+
+
+#
+# Automatic retry after EINTR
+#
+
+def _eintr_retry(func):
+ @functools.wraps(func)
+ def wrapped(*args, **kwargs):
+ while True:
+ try:
+ return func(*args, **kwargs)
+ except InterruptedError:
+ continue
+ return wrapped