summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAntoine Pitrou <solipsis@pitrou.net>2012-04-24 20:56:57 (GMT)
committerAntoine Pitrou <solipsis@pitrou.net>2012-04-24 20:56:57 (GMT)
commit5438ed1572f5e379cac8b85ed2226101a1bfcacc (patch)
treebf6a9df1be40d5d3df6fbfcdee606d5a49fca273
parent9f478c021dc499b0d23ee418c0dcc6b5076524aa (diff)
downloadcpython-5438ed1572f5e379cac8b85ed2226101a1bfcacc.zip
cpython-5438ed1572f5e379cac8b85ed2226101a1bfcacc.tar.gz
cpython-5438ed1572f5e379cac8b85ed2226101a1bfcacc.tar.bz2
Issue #4892: multiprocessing Connections can now be transferred over multiprocessing Connections.
Patch by Richard Oudkerk (sbt).
-rw-r--r--Doc/library/multiprocessing.rst4
-rw-r--r--Lib/multiprocessing/__init__.py4
-rw-r--r--Lib/multiprocessing/connection.py24
-rw-r--r--Lib/multiprocessing/forking.py19
-rw-r--r--Lib/multiprocessing/reduction.py315
-rw-r--r--Lib/test/test_multiprocessing.py103
-rw-r--r--Misc/NEWS3
-rw-r--r--Modules/_winapi.c4
8 files changed, 283 insertions, 193 deletions
diff --git a/Doc/library/multiprocessing.rst b/Doc/library/multiprocessing.rst
index b9dfd19..bdc07f1 100644
--- a/Doc/library/multiprocessing.rst
+++ b/Doc/library/multiprocessing.rst
@@ -832,6 +832,10 @@ Connection objects are usually created using :func:`Pipe` -- see also
raised and the complete message is available as ``e.args[0]`` where ``e``
is the exception instance.
+ .. versionchanged:: 3.3
+ Connection objects themselves can now be transferred between processes
+ using :meth:`Connection.send` and :meth:`Connection.recv`.
+
For example:
diff --git a/Lib/multiprocessing/__init__.py b/Lib/multiprocessing/__init__.py
index e012440..28380e5 100644
--- a/Lib/multiprocessing/__init__.py
+++ b/Lib/multiprocessing/__init__.py
@@ -161,7 +161,9 @@ def allow_connection_pickling():
'''
Install support for sending connections and sockets between processes
'''
- from multiprocessing import reduction
+ # This is undocumented. In previous versions of multiprocessing
+ # its only effect was to make socket objects inheritable on Windows.
+ import multiprocessing.connection
#
# Definitions depending on native semaphores
diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py
index 3a61e5e..64d71bc 100644
--- a/Lib/multiprocessing/connection.py
+++ b/Lib/multiprocessing/connection.py
@@ -50,6 +50,7 @@ import _multiprocessing
from multiprocessing import current_process, AuthenticationError, BufferTooShort
from multiprocessing.util import (
get_temp_dir, Finalize, sub_debug, debug, _eintr_retry)
+from multiprocessing.forking import ForkingPickler
try:
import _winapi
from _winapi import WAIT_OBJECT_0, WAIT_TIMEOUT, INFINITE
@@ -227,8 +228,9 @@ class _ConnectionBase:
"""Send a (picklable) object"""
self._check_closed()
self._check_writable()
- buf = pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)
- self._send_bytes(memoryview(buf))
+ buf = io.BytesIO()
+ ForkingPickler(buf, pickle.HIGHEST_PROTOCOL).dump(obj)
+ self._send_bytes(buf.getbuffer())
def recv_bytes(self, maxlength=None):
"""
@@ -880,3 +882,21 @@ else:
raise
if timeout is not None:
timeout = deadline - time.time()
+
+#
+# Make connection and socket objects sharable if possible
+#
+
+if sys.platform == 'win32':
+ from . import reduction
+ ForkingPickler.register(socket.socket, reduction.reduce_socket)
+ ForkingPickler.register(Connection, reduction.reduce_connection)
+ ForkingPickler.register(PipeConnection, reduction.reduce_pipe_connection)
+else:
+ try:
+ from . import reduction
+ except ImportError:
+ pass
+ else:
+ ForkingPickler.register(socket.socket, reduction.reduce_socket)
+ ForkingPickler.register(Connection, reduction.reduce_connection)
diff --git a/Lib/multiprocessing/forking.py b/Lib/multiprocessing/forking.py
index 0cbb741..15fdb0e 100644
--- a/Lib/multiprocessing/forking.py
+++ b/Lib/multiprocessing/forking.py
@@ -407,25 +407,6 @@ else:
return d
- #
- # Make (Pipe)Connection picklable
- #
-
- # Late import because of circular import
- from .connection import Connection, PipeConnection
-
- def reduce_connection(conn):
- if not Popen.thread_is_spawning():
- raise RuntimeError(
- 'By default %s objects can only be shared between processes\n'
- 'using inheritance' % type(conn).__name__
- )
- return type(conn), (Popen.duplicate_for_child(conn.fileno()),
- conn.readable, conn.writable)
-
- ForkingPickler.register(Connection, reduce_connection)
- ForkingPickler.register(PipeConnection, reduce_connection)
-
#
# Prepare current process
#
diff --git a/Lib/multiprocessing/reduction.py b/Lib/multiprocessing/reduction.py
index c80de59..ce38fe3 100644
--- a/Lib/multiprocessing/reduction.py
+++ b/Lib/multiprocessing/reduction.py
@@ -33,7 +33,7 @@
# SUCH DAMAGE.
#
-__all__ = []
+__all__ = ['reduce_socket', 'reduce_connection', 'send_handle', 'recv_handle']
import os
import sys
@@ -42,9 +42,8 @@ import threading
import struct
from multiprocessing import current_process
-from multiprocessing.forking import Popen, duplicate, close, ForkingPickler
from multiprocessing.util import register_after_fork, debug, sub_debug
-from multiprocessing.connection import Client, Listener, Connection
+from multiprocessing.util import is_exiting, sub_warning
#
@@ -60,22 +59,91 @@ if not(sys.platform == 'win32' or (hasattr(socket, 'CMSG_LEN') and
#
if sys.platform == 'win32':
+ # Windows
+ __all__ += ['reduce_pipe_connection']
import _winapi
def send_handle(conn, handle, destination_pid):
- process_handle = _winapi.OpenProcess(
- _winapi.PROCESS_ALL_ACCESS, False, destination_pid
- )
- try:
- new_handle = duplicate(handle, process_handle)
- conn.send(new_handle)
- finally:
- close(process_handle)
+ dh = DupHandle(handle, _winapi.DUPLICATE_SAME_ACCESS, destination_pid)
+ conn.send(dh)
def recv_handle(conn):
- return conn.recv()
+ return conn.recv().detach()
+
+ class DupHandle(object):
+ def __init__(self, handle, access, pid=None):
+ # duplicate handle for process with given pid
+ if pid is None:
+ pid = os.getpid()
+ proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, pid)
+ try:
+ self._handle = _winapi.DuplicateHandle(
+ _winapi.GetCurrentProcess(),
+ handle, proc, access, False, 0)
+ finally:
+ _winapi.CloseHandle(proc)
+ self._access = access
+ self._pid = pid
+
+ def detach(self):
+ # retrieve handle from process which currently owns it
+ if self._pid == os.getpid():
+ return self._handle
+ proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False,
+ self._pid)
+ try:
+ return _winapi.DuplicateHandle(
+ proc, self._handle, _winapi.GetCurrentProcess(),
+ self._access, False, _winapi.DUPLICATE_CLOSE_SOURCE)
+ finally:
+ _winapi.CloseHandle(proc)
+
+ class DupSocket(object):
+ def __init__(self, sock):
+ new_sock = sock.dup()
+ def send(conn, pid):
+ share = new_sock.share(pid)
+ conn.send_bytes(share)
+ self._id = resource_sharer.register(send, new_sock.close)
+
+ def detach(self):
+ conn = resource_sharer.get_connection(self._id)
+ try:
+ share = conn.recv_bytes()
+ return socket.fromshare(share)
+ finally:
+ conn.close()
+
+ def reduce_socket(s):
+ return rebuild_socket, (DupSocket(s),)
+
+ def rebuild_socket(ds):
+ return ds.detach()
+
+ def reduce_connection(conn):
+ handle = conn.fileno()
+ with socket.fromfd(handle, socket.AF_INET, socket.SOCK_STREAM) as s:
+ ds = DupSocket(s)
+ return rebuild_connection, (ds, conn.readable, conn.writable)
+
+ def rebuild_connection(ds, readable, writable):
+ from .connection import Connection
+ sock = ds.detach()
+ return Connection(sock.detach(), readable, writable)
+
+ def reduce_pipe_connection(conn):
+ access = ((_winapi.FILE_GENERIC_READ if conn.readable else 0) |
+ (_winapi.FILE_GENERIC_WRITE if conn.writable else 0))
+ dh = DupHandle(conn.fileno(), access)
+ return rebuild_pipe_connection, (dh, conn.readable, conn.writable)
+
+ def rebuild_pipe_connection(dh, readable, writable):
+ from .connection import PipeConnection
+ handle = dh.detach()
+ return PipeConnection(handle, readable, writable)
else:
+ # Unix
def send_handle(conn, handle, destination_pid):
with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s:
s.sendmsg([b'x'], [(socket.SOL_SOCKET, socket.SCM_RIGHTS,
@@ -94,136 +162,109 @@ else:
pass
raise RuntimeError('Invalid data received')
+ class DupFd(object):
+ def __init__(self, fd):
+ new_fd = os.dup(fd)
+ def send(conn, pid):
+ send_handle(conn, new_fd, pid)
+ def close():
+ os.close(new_fd)
+ self._id = resource_sharer.register(send, close)
+
+ def detach(self):
+ conn = resource_sharer.get_connection(self._id)
+ try:
+ return recv_handle(conn)
+ finally:
+ conn.close()
-#
-# Support for a per-process server thread which caches pickled handles
-#
-
-_cache = set()
-
-def _reset(obj):
- global _lock, _listener, _cache
- for h in _cache:
- close(h)
- _cache.clear()
- _lock = threading.Lock()
- _listener = None
-
-_reset(None)
-register_after_fork(_reset, _reset)
-
-def _get_listener():
- global _listener
-
- if _listener is None:
- _lock.acquire()
- try:
- if _listener is None:
- debug('starting listener and thread for sending handles')
- _listener = Listener(authkey=current_process().authkey)
- t = threading.Thread(target=_serve)
- t.daemon = True
- t.start()
- finally:
- _lock.release()
-
- return _listener
-
-def _serve():
- from .util import is_exiting, sub_warning
-
- while 1:
- try:
- conn = _listener.accept()
- handle_wanted, destination_pid = conn.recv()
- _cache.remove(handle_wanted)
- send_handle(conn, handle_wanted, destination_pid)
- close(handle_wanted)
- conn.close()
- except:
- if not is_exiting():
- import traceback
- sub_warning(
- 'thread for sharing handles raised exception :\n' +
- '-'*79 + '\n' + traceback.format_exc() + '-'*79
- )
-
-#
-# Functions to be used for pickling/unpickling objects with handles
-#
-
-def reduce_handle(handle):
- if Popen.thread_is_spawning():
- return (None, Popen.duplicate_for_child(handle), True)
- dup_handle = duplicate(handle)
- _cache.add(dup_handle)
- sub_debug('reducing handle %d', handle)
- return (_get_listener().address, dup_handle, False)
-
-def rebuild_handle(pickled_data):
- address, handle, inherited = pickled_data
- if inherited:
- return handle
- sub_debug('rebuilding handle %d', handle)
- conn = Client(address, authkey=current_process().authkey)
- conn.send((handle, os.getpid()))
- new_handle = recv_handle(conn)
- conn.close()
- return new_handle
-
-#
-# Register `Connection` with `ForkingPickler`
-#
-
-def reduce_connection(conn):
- rh = reduce_handle(conn.fileno())
- return rebuild_connection, (rh, conn.readable, conn.writable)
-
-def rebuild_connection(reduced_handle, readable, writable):
- handle = rebuild_handle(reduced_handle)
- return Connection(
- handle, readable=readable, writable=writable
- )
-
-ForkingPickler.register(Connection, reduce_connection)
-
-#
-# Register `socket.socket` with `ForkingPickler`
-#
-
-def fromfd(fd, family, type_, proto=0):
- s = socket.fromfd(fd, family, type_, proto)
- if s.__class__ is not socket.socket:
- s = socket.socket(_sock=s)
- return s
+ def reduce_socket(s):
+ df = DupFd(s.fileno())
+ return rebuild_socket, (df, s.family, s.type, s.proto)
-def reduce_socket(s):
- reduced_handle = reduce_handle(s.fileno())
- return rebuild_socket, (reduced_handle, s.family, s.type, s.proto)
+ def rebuild_socket(df, family, type, proto):
+ fd = df.detach()
+ s = socket.fromfd(fd, family, type, proto)
+ os.close(fd)
+ return s
-def rebuild_socket(reduced_handle, family, type_, proto):
- fd = rebuild_handle(reduced_handle)
- _sock = fromfd(fd, family, type_, proto)
- close(fd)
- return _sock
+ def reduce_connection(conn):
+ df = DupFd(conn.fileno())
+ return rebuild_connection, (df, conn.readable, conn.writable)
-ForkingPickler.register(socket.socket, reduce_socket)
+ def rebuild_connection(df, readable, writable):
+ from .connection import Connection
+ fd = df.detach()
+ return Connection(fd, readable, writable)
#
-# Register `_multiprocessing.PipeConnection` with `ForkingPickler`
+# Server which shares registered resources with clients
#
-if sys.platform == 'win32':
- from multiprocessing.connection import PipeConnection
-
- def reduce_pipe_connection(conn):
- rh = reduce_handle(conn.fileno())
- return rebuild_pipe_connection, (rh, conn.readable, conn.writable)
-
- def rebuild_pipe_connection(reduced_handle, readable, writable):
- handle = rebuild_handle(reduced_handle)
- return PipeConnection(
- handle, readable=readable, writable=writable
- )
-
- ForkingPickler.register(PipeConnection, reduce_pipe_connection)
+class ResourceSharer(object):
+ def __init__(self):
+ self._key = 0
+ self._cache = {}
+ self._old_locks = []
+ self._lock = threading.Lock()
+ self._listener = None
+ self._address = None
+ register_after_fork(self, ResourceSharer._afterfork)
+
+ def register(self, send, close):
+ with self._lock:
+ if self._address is None:
+ self._start()
+ self._key += 1
+ self._cache[self._key] = (send, close)
+ return (self._address, self._key)
+
+ @staticmethod
+ def get_connection(ident):
+ from .connection import Client
+ address, key = ident
+ c = Client(address, authkey=current_process().authkey)
+ c.send((key, os.getpid()))
+ return c
+
+ def _afterfork(self):
+ for key, (send, close) in self._cache.items():
+ close()
+ self._cache.clear()
+ # If self._lock was locked at the time of the fork, it may be broken
+ # -- see issue 6721. Replace it without letting it be gc'ed.
+ self._old_locks.append(self._lock)
+ self._lock = threading.Lock()
+ if self._listener is not None:
+ self._listener.close()
+ self._listener = None
+ self._address = None
+
+ def _start(self):
+ from .connection import Listener
+ assert self._listener is None
+ debug('starting listener and thread for sending handles')
+ self._listener = Listener(authkey=current_process().authkey)
+ self._address = self._listener.address
+ t = threading.Thread(target=self._serve)
+ t.daemon = True
+ t.start()
+
+ def _serve(self):
+ while 1:
+ try:
+ conn = self._listener.accept()
+ key, destination_pid = conn.recv()
+ send, close = self._cache.pop(key)
+ send(conn, destination_pid)
+ close()
+ conn.close()
+ except:
+ if not is_exiting():
+ import traceback
+ sub_warning(
+ 'thread for sharing handles raised exception :\n' +
+ '-'*79 + '\n' + traceback.format_exc() + '-'*79
+ )
+
+resource_sharer = ResourceSharer()
diff --git a/Lib/test/test_multiprocessing.py b/Lib/test/test_multiprocessing.py
index f9d58c7..4c22aef 100644
--- a/Lib/test/test_multiprocessing.py
+++ b/Lib/test/test_multiprocessing.py
@@ -1959,49 +1959,49 @@ class _TestPoll(unittest.TestCase):
#
# Test of sending connection and socket objects between processes
#
-"""
+
+@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
class _TestPicklingConnections(BaseTestCase):
ALLOWED_TYPES = ('processes',)
- def _listener(self, conn, families):
+ @classmethod
+ def _listener(cls, conn, families):
for fam in families:
- l = self.connection.Listener(family=fam)
+ l = cls.connection.Listener(family=fam)
conn.send(l.address)
new_conn = l.accept()
conn.send(new_conn)
+ new_conn.close()
+ l.close()
- if self.TYPE == 'processes':
- l = socket.socket()
- l.bind(('localhost', 0))
- conn.send(l.getsockname())
- l.listen(1)
- new_conn, addr = l.accept()
- conn.send(new_conn)
+ l = socket.socket()
+ l.bind(('localhost', 0))
+ conn.send(l.getsockname())
+ l.listen(1)
+ new_conn, addr = l.accept()
+ conn.send(new_conn)
+ new_conn.close()
+ l.close()
conn.recv()
- def _remote(self, conn):
+ @classmethod
+ def _remote(cls, conn):
for (address, msg) in iter(conn.recv, None):
- client = self.connection.Client(address)
+ client = cls.connection.Client(address)
client.send(msg.upper())
client.close()
- if self.TYPE == 'processes':
- address, msg = conn.recv()
- client = socket.socket()
- client.connect(address)
- client.sendall(msg.upper())
- client.close()
+ address, msg = conn.recv()
+ client = socket.socket()
+ client.connect(address)
+ client.sendall(msg.upper())
+ client.close()
conn.close()
def test_pickling(self):
- try:
- multiprocessing.allow_connection_pickling()
- except ImportError:
- return
-
families = self.connection.families
lconn, lconn0 = self.Pipe()
@@ -2025,16 +2025,12 @@ class _TestPicklingConnections(BaseTestCase):
rconn.send(None)
- if self.TYPE == 'processes':
- msg = latin('This connection uses a normal socket')
- address = lconn.recv()
- rconn.send((address, msg))
- if hasattr(socket, 'fromfd'):
- new_conn = lconn.recv()
- self.assertEqual(new_conn.recv(100), msg.upper())
- else:
- # XXX On Windows with Py2.6 need to backport fromfd()
- discard = lconn.recv_bytes()
+ msg = latin('This connection uses a normal socket')
+ address = lconn.recv()
+ rconn.send((address, msg))
+ new_conn = lconn.recv()
+ self.assertEqual(new_conn.recv(100), msg.upper())
+ new_conn.close()
lconn.send(None)
@@ -2043,7 +2039,46 @@ class _TestPicklingConnections(BaseTestCase):
lp.join()
rp.join()
-"""
+
+ @classmethod
+ def child_access(cls, conn):
+ w = conn.recv()
+ w.send('all is well')
+ w.close()
+
+ r = conn.recv()
+ msg = r.recv()
+ conn.send(msg*2)
+
+ conn.close()
+
+ def test_access(self):
+ # On Windows, if we do not specify a destination pid when
+ # using DupHandle then we need to be careful to use the
+ # correct access flags for DuplicateHandle(), or else
+ # DupHandle.detach() will raise PermissionError. For example,
+ # for a read only pipe handle we should use
+ # access=FILE_GENERIC_READ. (Unfortunately
+ # DUPLICATE_SAME_ACCESS does not work.)
+ conn, child_conn = self.Pipe()
+ p = self.Process(target=self.child_access, args=(child_conn,))
+ p.daemon = True
+ p.start()
+ child_conn.close()
+
+ r, w = self.Pipe(duplex=False)
+ conn.send(w)
+ w.close()
+ self.assertEqual(r.recv(), 'all is well')
+ r.close()
+
+ r, w = self.Pipe(duplex=False)
+ conn.send(r)
+ r.close()
+ w.send('foobar')
+ w.close()
+ self.assertEqual(conn.recv(), 'foobar'*2)
+
#
#
#
diff --git a/Misc/NEWS b/Misc/NEWS
index c2e7cc3..b8e28c1 100644
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -71,6 +71,9 @@ Core and Builtins
Library
-------
+- Issue #4892: multiprocessing Connections can now be transferred over
+ multiprocessing Connections. Patch by Richard Oudkerk (sbt).
+
- Issue #14160: TarFile.extractfile() failed to resolve symbolic links when
the links were not located in an archive subdirectory.
diff --git a/Modules/_winapi.c b/Modules/_winapi.c
index 4b9455e..6c99394 100644
--- a/Modules/_winapi.c
+++ b/Modules/_winapi.c
@@ -1280,6 +1280,7 @@ PyInit__winapi(void)
WINAPI_CONSTANT(F_DWORD, CREATE_NEW_CONSOLE);
WINAPI_CONSTANT(F_DWORD, CREATE_NEW_PROCESS_GROUP);
WINAPI_CONSTANT(F_DWORD, DUPLICATE_SAME_ACCESS);
+ WINAPI_CONSTANT(F_DWORD, DUPLICATE_CLOSE_SOURCE);
WINAPI_CONSTANT(F_DWORD, ERROR_ALREADY_EXISTS);
WINAPI_CONSTANT(F_DWORD, ERROR_BROKEN_PIPE);
WINAPI_CONSTANT(F_DWORD, ERROR_IO_PENDING);
@@ -1298,6 +1299,8 @@ PyInit__winapi(void)
WINAPI_CONSTANT(F_DWORD, ERROR_SEM_TIMEOUT);
WINAPI_CONSTANT(F_DWORD, FILE_FLAG_FIRST_PIPE_INSTANCE);
WINAPI_CONSTANT(F_DWORD, FILE_FLAG_OVERLAPPED);
+ WINAPI_CONSTANT(F_DWORD, FILE_GENERIC_READ);
+ WINAPI_CONSTANT(F_DWORD, FILE_GENERIC_WRITE);
WINAPI_CONSTANT(F_DWORD, GENERIC_READ);
WINAPI_CONSTANT(F_DWORD, GENERIC_WRITE);
WINAPI_CONSTANT(F_DWORD, INFINITE);
@@ -1310,6 +1313,7 @@ PyInit__winapi(void)
WINAPI_CONSTANT(F_DWORD, PIPE_UNLIMITED_INSTANCES);
WINAPI_CONSTANT(F_DWORD, PIPE_WAIT);
WINAPI_CONSTANT(F_DWORD, PROCESS_ALL_ACCESS);
+ WINAPI_CONSTANT(F_DWORD, PROCESS_DUP_HANDLE);
WINAPI_CONSTANT(F_DWORD, STARTF_USESHOWWINDOW);
WINAPI_CONSTANT(F_DWORD, STARTF_USESTDHANDLES);
WINAPI_CONSTANT(F_DWORD, STD_INPUT_HANDLE);