From 5438ed1572f5e379cac8b85ed2226101a1bfcacc Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Tue, 24 Apr 2012 22:56:57 +0200 Subject: Issue #4892: multiprocessing Connections can now be transferred over multiprocessing Connections. Patch by Richard Oudkerk (sbt). --- Doc/library/multiprocessing.rst | 4 + Lib/multiprocessing/__init__.py | 4 +- Lib/multiprocessing/connection.py | 24 ++- Lib/multiprocessing/forking.py | 19 --- Lib/multiprocessing/reduction.py | 315 +++++++++++++++++++++----------------- Lib/test/test_multiprocessing.py | 103 +++++++++---- Misc/NEWS | 3 + Modules/_winapi.c | 4 + 8 files changed, 283 insertions(+), 193 deletions(-) diff --git a/Doc/library/multiprocessing.rst b/Doc/library/multiprocessing.rst index b9dfd19..bdc07f1 100644 --- a/Doc/library/multiprocessing.rst +++ b/Doc/library/multiprocessing.rst @@ -832,6 +832,10 @@ Connection objects are usually created using :func:`Pipe` -- see also raised and the complete message is available as ``e.args[0]`` where ``e`` is the exception instance. + .. versionchanged:: 3.3 + Connection objects themselves can now be transferred between processes + using :meth:`Connection.send` and :meth:`Connection.recv`. + For example: diff --git a/Lib/multiprocessing/__init__.py b/Lib/multiprocessing/__init__.py index e012440..28380e5 100644 --- a/Lib/multiprocessing/__init__.py +++ b/Lib/multiprocessing/__init__.py @@ -161,7 +161,9 @@ def allow_connection_pickling(): ''' Install support for sending connections and sockets between processes ''' - from multiprocessing import reduction + # This is undocumented. In previous versions of multiprocessing + # its only effect was to make socket objects inheritable on Windows. + import multiprocessing.connection # # Definitions depending on native semaphores diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py index 3a61e5e..64d71bc 100644 --- a/Lib/multiprocessing/connection.py +++ b/Lib/multiprocessing/connection.py @@ -50,6 +50,7 @@ import _multiprocessing from multiprocessing import current_process, AuthenticationError, BufferTooShort from multiprocessing.util import ( get_temp_dir, Finalize, sub_debug, debug, _eintr_retry) +from multiprocessing.forking import ForkingPickler try: import _winapi from _winapi import WAIT_OBJECT_0, WAIT_TIMEOUT, INFINITE @@ -227,8 +228,9 @@ class _ConnectionBase: """Send a (picklable) object""" self._check_closed() self._check_writable() - buf = pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL) - self._send_bytes(memoryview(buf)) + buf = io.BytesIO() + ForkingPickler(buf, pickle.HIGHEST_PROTOCOL).dump(obj) + self._send_bytes(buf.getbuffer()) def recv_bytes(self, maxlength=None): """ @@ -880,3 +882,21 @@ else: raise if timeout is not None: timeout = deadline - time.time() + +# +# Make connection and socket objects sharable if possible +# + +if sys.platform == 'win32': + from . import reduction + ForkingPickler.register(socket.socket, reduction.reduce_socket) + ForkingPickler.register(Connection, reduction.reduce_connection) + ForkingPickler.register(PipeConnection, reduction.reduce_pipe_connection) +else: + try: + from . import reduction + except ImportError: + pass + else: + ForkingPickler.register(socket.socket, reduction.reduce_socket) + ForkingPickler.register(Connection, reduction.reduce_connection) diff --git a/Lib/multiprocessing/forking.py b/Lib/multiprocessing/forking.py index 0cbb741..15fdb0e 100644 --- a/Lib/multiprocessing/forking.py +++ b/Lib/multiprocessing/forking.py @@ -407,25 +407,6 @@ else: return d - # - # Make (Pipe)Connection picklable - # - - # Late import because of circular import - from .connection import Connection, PipeConnection - - def reduce_connection(conn): - if not Popen.thread_is_spawning(): - raise RuntimeError( - 'By default %s objects can only be shared between processes\n' - 'using inheritance' % type(conn).__name__ - ) - return type(conn), (Popen.duplicate_for_child(conn.fileno()), - conn.readable, conn.writable) - - ForkingPickler.register(Connection, reduce_connection) - ForkingPickler.register(PipeConnection, reduce_connection) - # # Prepare current process # diff --git a/Lib/multiprocessing/reduction.py b/Lib/multiprocessing/reduction.py index c80de59..ce38fe3 100644 --- a/Lib/multiprocessing/reduction.py +++ b/Lib/multiprocessing/reduction.py @@ -33,7 +33,7 @@ # SUCH DAMAGE. # -__all__ = [] +__all__ = ['reduce_socket', 'reduce_connection', 'send_handle', 'recv_handle'] import os import sys @@ -42,9 +42,8 @@ import threading import struct from multiprocessing import current_process -from multiprocessing.forking import Popen, duplicate, close, ForkingPickler from multiprocessing.util import register_after_fork, debug, sub_debug -from multiprocessing.connection import Client, Listener, Connection +from multiprocessing.util import is_exiting, sub_warning # @@ -60,22 +59,91 @@ if not(sys.platform == 'win32' or (hasattr(socket, 'CMSG_LEN') and # if sys.platform == 'win32': + # Windows + __all__ += ['reduce_pipe_connection'] import _winapi def send_handle(conn, handle, destination_pid): - process_handle = _winapi.OpenProcess( - _winapi.PROCESS_ALL_ACCESS, False, destination_pid - ) - try: - new_handle = duplicate(handle, process_handle) - conn.send(new_handle) - finally: - close(process_handle) + dh = DupHandle(handle, _winapi.DUPLICATE_SAME_ACCESS, destination_pid) + conn.send(dh) def recv_handle(conn): - return conn.recv() + return conn.recv().detach() + + class DupHandle(object): + def __init__(self, handle, access, pid=None): + # duplicate handle for process with given pid + if pid is None: + pid = os.getpid() + proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, pid) + try: + self._handle = _winapi.DuplicateHandle( + _winapi.GetCurrentProcess(), + handle, proc, access, False, 0) + finally: + _winapi.CloseHandle(proc) + self._access = access + self._pid = pid + + def detach(self): + # retrieve handle from process which currently owns it + if self._pid == os.getpid(): + return self._handle + proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, + self._pid) + try: + return _winapi.DuplicateHandle( + proc, self._handle, _winapi.GetCurrentProcess(), + self._access, False, _winapi.DUPLICATE_CLOSE_SOURCE) + finally: + _winapi.CloseHandle(proc) + + class DupSocket(object): + def __init__(self, sock): + new_sock = sock.dup() + def send(conn, pid): + share = new_sock.share(pid) + conn.send_bytes(share) + self._id = resource_sharer.register(send, new_sock.close) + + def detach(self): + conn = resource_sharer.get_connection(self._id) + try: + share = conn.recv_bytes() + return socket.fromshare(share) + finally: + conn.close() + + def reduce_socket(s): + return rebuild_socket, (DupSocket(s),) + + def rebuild_socket(ds): + return ds.detach() + + def reduce_connection(conn): + handle = conn.fileno() + with socket.fromfd(handle, socket.AF_INET, socket.SOCK_STREAM) as s: + ds = DupSocket(s) + return rebuild_connection, (ds, conn.readable, conn.writable) + + def rebuild_connection(ds, readable, writable): + from .connection import Connection + sock = ds.detach() + return Connection(sock.detach(), readable, writable) + + def reduce_pipe_connection(conn): + access = ((_winapi.FILE_GENERIC_READ if conn.readable else 0) | + (_winapi.FILE_GENERIC_WRITE if conn.writable else 0)) + dh = DupHandle(conn.fileno(), access) + return rebuild_pipe_connection, (dh, conn.readable, conn.writable) + + def rebuild_pipe_connection(dh, readable, writable): + from .connection import PipeConnection + handle = dh.detach() + return PipeConnection(handle, readable, writable) else: + # Unix def send_handle(conn, handle, destination_pid): with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s: s.sendmsg([b'x'], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, @@ -94,136 +162,109 @@ else: pass raise RuntimeError('Invalid data received') + class DupFd(object): + def __init__(self, fd): + new_fd = os.dup(fd) + def send(conn, pid): + send_handle(conn, new_fd, pid) + def close(): + os.close(new_fd) + self._id = resource_sharer.register(send, close) + + def detach(self): + conn = resource_sharer.get_connection(self._id) + try: + return recv_handle(conn) + finally: + conn.close() -# -# Support for a per-process server thread which caches pickled handles -# - -_cache = set() - -def _reset(obj): - global _lock, _listener, _cache - for h in _cache: - close(h) - _cache.clear() - _lock = threading.Lock() - _listener = None - -_reset(None) -register_after_fork(_reset, _reset) - -def _get_listener(): - global _listener - - if _listener is None: - _lock.acquire() - try: - if _listener is None: - debug('starting listener and thread for sending handles') - _listener = Listener(authkey=current_process().authkey) - t = threading.Thread(target=_serve) - t.daemon = True - t.start() - finally: - _lock.release() - - return _listener - -def _serve(): - from .util import is_exiting, sub_warning - - while 1: - try: - conn = _listener.accept() - handle_wanted, destination_pid = conn.recv() - _cache.remove(handle_wanted) - send_handle(conn, handle_wanted, destination_pid) - close(handle_wanted) - conn.close() - except: - if not is_exiting(): - import traceback - sub_warning( - 'thread for sharing handles raised exception :\n' + - '-'*79 + '\n' + traceback.format_exc() + '-'*79 - ) - -# -# Functions to be used for pickling/unpickling objects with handles -# - -def reduce_handle(handle): - if Popen.thread_is_spawning(): - return (None, Popen.duplicate_for_child(handle), True) - dup_handle = duplicate(handle) - _cache.add(dup_handle) - sub_debug('reducing handle %d', handle) - return (_get_listener().address, dup_handle, False) - -def rebuild_handle(pickled_data): - address, handle, inherited = pickled_data - if inherited: - return handle - sub_debug('rebuilding handle %d', handle) - conn = Client(address, authkey=current_process().authkey) - conn.send((handle, os.getpid())) - new_handle = recv_handle(conn) - conn.close() - return new_handle - -# -# Register `Connection` with `ForkingPickler` -# - -def reduce_connection(conn): - rh = reduce_handle(conn.fileno()) - return rebuild_connection, (rh, conn.readable, conn.writable) - -def rebuild_connection(reduced_handle, readable, writable): - handle = rebuild_handle(reduced_handle) - return Connection( - handle, readable=readable, writable=writable - ) - -ForkingPickler.register(Connection, reduce_connection) - -# -# Register `socket.socket` with `ForkingPickler` -# - -def fromfd(fd, family, type_, proto=0): - s = socket.fromfd(fd, family, type_, proto) - if s.__class__ is not socket.socket: - s = socket.socket(_sock=s) - return s + def reduce_socket(s): + df = DupFd(s.fileno()) + return rebuild_socket, (df, s.family, s.type, s.proto) -def reduce_socket(s): - reduced_handle = reduce_handle(s.fileno()) - return rebuild_socket, (reduced_handle, s.family, s.type, s.proto) + def rebuild_socket(df, family, type, proto): + fd = df.detach() + s = socket.fromfd(fd, family, type, proto) + os.close(fd) + return s -def rebuild_socket(reduced_handle, family, type_, proto): - fd = rebuild_handle(reduced_handle) - _sock = fromfd(fd, family, type_, proto) - close(fd) - return _sock + def reduce_connection(conn): + df = DupFd(conn.fileno()) + return rebuild_connection, (df, conn.readable, conn.writable) -ForkingPickler.register(socket.socket, reduce_socket) + def rebuild_connection(df, readable, writable): + from .connection import Connection + fd = df.detach() + return Connection(fd, readable, writable) # -# Register `_multiprocessing.PipeConnection` with `ForkingPickler` +# Server which shares registered resources with clients # -if sys.platform == 'win32': - from multiprocessing.connection import PipeConnection - - def reduce_pipe_connection(conn): - rh = reduce_handle(conn.fileno()) - return rebuild_pipe_connection, (rh, conn.readable, conn.writable) - - def rebuild_pipe_connection(reduced_handle, readable, writable): - handle = rebuild_handle(reduced_handle) - return PipeConnection( - handle, readable=readable, writable=writable - ) - - ForkingPickler.register(PipeConnection, reduce_pipe_connection) +class ResourceSharer(object): + def __init__(self): + self._key = 0 + self._cache = {} + self._old_locks = [] + self._lock = threading.Lock() + self._listener = None + self._address = None + register_after_fork(self, ResourceSharer._afterfork) + + def register(self, send, close): + with self._lock: + if self._address is None: + self._start() + self._key += 1 + self._cache[self._key] = (send, close) + return (self._address, self._key) + + @staticmethod + def get_connection(ident): + from .connection import Client + address, key = ident + c = Client(address, authkey=current_process().authkey) + c.send((key, os.getpid())) + return c + + def _afterfork(self): + for key, (send, close) in self._cache.items(): + close() + self._cache.clear() + # If self._lock was locked at the time of the fork, it may be broken + # -- see issue 6721. Replace it without letting it be gc'ed. + self._old_locks.append(self._lock) + self._lock = threading.Lock() + if self._listener is not None: + self._listener.close() + self._listener = None + self._address = None + + def _start(self): + from .connection import Listener + assert self._listener is None + debug('starting listener and thread for sending handles') + self._listener = Listener(authkey=current_process().authkey) + self._address = self._listener.address + t = threading.Thread(target=self._serve) + t.daemon = True + t.start() + + def _serve(self): + while 1: + try: + conn = self._listener.accept() + key, destination_pid = conn.recv() + send, close = self._cache.pop(key) + send(conn, destination_pid) + close() + conn.close() + except: + if not is_exiting(): + import traceback + sub_warning( + 'thread for sharing handles raised exception :\n' + + '-'*79 + '\n' + traceback.format_exc() + '-'*79 + ) + +resource_sharer = ResourceSharer() diff --git a/Lib/test/test_multiprocessing.py b/Lib/test/test_multiprocessing.py index f9d58c7..4c22aef 100644 --- a/Lib/test/test_multiprocessing.py +++ b/Lib/test/test_multiprocessing.py @@ -1959,49 +1959,49 @@ class _TestPoll(unittest.TestCase): # # Test of sending connection and socket objects between processes # -""" + +@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction") class _TestPicklingConnections(BaseTestCase): ALLOWED_TYPES = ('processes',) - def _listener(self, conn, families): + @classmethod + def _listener(cls, conn, families): for fam in families: - l = self.connection.Listener(family=fam) + l = cls.connection.Listener(family=fam) conn.send(l.address) new_conn = l.accept() conn.send(new_conn) + new_conn.close() + l.close() - if self.TYPE == 'processes': - l = socket.socket() - l.bind(('localhost', 0)) - conn.send(l.getsockname()) - l.listen(1) - new_conn, addr = l.accept() - conn.send(new_conn) + l = socket.socket() + l.bind(('localhost', 0)) + conn.send(l.getsockname()) + l.listen(1) + new_conn, addr = l.accept() + conn.send(new_conn) + new_conn.close() + l.close() conn.recv() - def _remote(self, conn): + @classmethod + def _remote(cls, conn): for (address, msg) in iter(conn.recv, None): - client = self.connection.Client(address) + client = cls.connection.Client(address) client.send(msg.upper()) client.close() - if self.TYPE == 'processes': - address, msg = conn.recv() - client = socket.socket() - client.connect(address) - client.sendall(msg.upper()) - client.close() + address, msg = conn.recv() + client = socket.socket() + client.connect(address) + client.sendall(msg.upper()) + client.close() conn.close() def test_pickling(self): - try: - multiprocessing.allow_connection_pickling() - except ImportError: - return - families = self.connection.families lconn, lconn0 = self.Pipe() @@ -2025,16 +2025,12 @@ class _TestPicklingConnections(BaseTestCase): rconn.send(None) - if self.TYPE == 'processes': - msg = latin('This connection uses a normal socket') - address = lconn.recv() - rconn.send((address, msg)) - if hasattr(socket, 'fromfd'): - new_conn = lconn.recv() - self.assertEqual(new_conn.recv(100), msg.upper()) - else: - # XXX On Windows with Py2.6 need to backport fromfd() - discard = lconn.recv_bytes() + msg = latin('This connection uses a normal socket') + address = lconn.recv() + rconn.send((address, msg)) + new_conn = lconn.recv() + self.assertEqual(new_conn.recv(100), msg.upper()) + new_conn.close() lconn.send(None) @@ -2043,7 +2039,46 @@ class _TestPicklingConnections(BaseTestCase): lp.join() rp.join() -""" + + @classmethod + def child_access(cls, conn): + w = conn.recv() + w.send('all is well') + w.close() + + r = conn.recv() + msg = r.recv() + conn.send(msg*2) + + conn.close() + + def test_access(self): + # On Windows, if we do not specify a destination pid when + # using DupHandle then we need to be careful to use the + # correct access flags for DuplicateHandle(), or else + # DupHandle.detach() will raise PermissionError. For example, + # for a read only pipe handle we should use + # access=FILE_GENERIC_READ. (Unfortunately + # DUPLICATE_SAME_ACCESS does not work.) + conn, child_conn = self.Pipe() + p = self.Process(target=self.child_access, args=(child_conn,)) + p.daemon = True + p.start() + child_conn.close() + + r, w = self.Pipe(duplex=False) + conn.send(w) + w.close() + self.assertEqual(r.recv(), 'all is well') + r.close() + + r, w = self.Pipe(duplex=False) + conn.send(r) + r.close() + w.send('foobar') + w.close() + self.assertEqual(conn.recv(), 'foobar'*2) + # # # diff --git a/Misc/NEWS b/Misc/NEWS index c2e7cc3..b8e28c1 100644 --- a/Misc/NEWS +++ b/Misc/NEWS @@ -71,6 +71,9 @@ Core and Builtins Library ------- +- Issue #4892: multiprocessing Connections can now be transferred over + multiprocessing Connections. Patch by Richard Oudkerk (sbt). + - Issue #14160: TarFile.extractfile() failed to resolve symbolic links when the links were not located in an archive subdirectory. diff --git a/Modules/_winapi.c b/Modules/_winapi.c index 4b9455e..6c99394 100644 --- a/Modules/_winapi.c +++ b/Modules/_winapi.c @@ -1280,6 +1280,7 @@ PyInit__winapi(void) WINAPI_CONSTANT(F_DWORD, CREATE_NEW_CONSOLE); WINAPI_CONSTANT(F_DWORD, CREATE_NEW_PROCESS_GROUP); WINAPI_CONSTANT(F_DWORD, DUPLICATE_SAME_ACCESS); + WINAPI_CONSTANT(F_DWORD, DUPLICATE_CLOSE_SOURCE); WINAPI_CONSTANT(F_DWORD, ERROR_ALREADY_EXISTS); WINAPI_CONSTANT(F_DWORD, ERROR_BROKEN_PIPE); WINAPI_CONSTANT(F_DWORD, ERROR_IO_PENDING); @@ -1298,6 +1299,8 @@ PyInit__winapi(void) WINAPI_CONSTANT(F_DWORD, ERROR_SEM_TIMEOUT); WINAPI_CONSTANT(F_DWORD, FILE_FLAG_FIRST_PIPE_INSTANCE); WINAPI_CONSTANT(F_DWORD, FILE_FLAG_OVERLAPPED); + WINAPI_CONSTANT(F_DWORD, FILE_GENERIC_READ); + WINAPI_CONSTANT(F_DWORD, FILE_GENERIC_WRITE); WINAPI_CONSTANT(F_DWORD, GENERIC_READ); WINAPI_CONSTANT(F_DWORD, GENERIC_WRITE); WINAPI_CONSTANT(F_DWORD, INFINITE); @@ -1310,6 +1313,7 @@ PyInit__winapi(void) WINAPI_CONSTANT(F_DWORD, PIPE_UNLIMITED_INSTANCES); WINAPI_CONSTANT(F_DWORD, PIPE_WAIT); WINAPI_CONSTANT(F_DWORD, PROCESS_ALL_ACCESS); + WINAPI_CONSTANT(F_DWORD, PROCESS_DUP_HANDLE); WINAPI_CONSTANT(F_DWORD, STARTF_USESHOWWINDOW); WINAPI_CONSTANT(F_DWORD, STARTF_USESTDHANDLES); WINAPI_CONSTANT(F_DWORD, STD_INPUT_HANDLE); -- cgit v0.12