diff options
author | Charles-François Natali <neologix@free.fr> | 2011-09-24 18:04:29 (GMT) |
---|---|---|
committer | Charles-François Natali <neologix@free.fr> | 2011-09-24 18:04:29 (GMT) |
commit | dc863ddf7924ca6c4771a604b7041562604a85d9 (patch) | |
tree | 201f44689f03f091de3b3a9835cc57323b8e3638 /Lib/multiprocessing | |
parent | 57e683e53eed1455176b17304b3ac007ae7eb181 (diff) | |
download | cpython-dc863ddf7924ca6c4771a604b7041562604a85d9.zip cpython-dc863ddf7924ca6c4771a604b7041562604a85d9.tar.gz cpython-dc863ddf7924ca6c4771a604b7041562604a85d9.tar.bz2 |
Issue #12981: rewrite multiprocessing_{sendfd,recvfd} in Python.
Diffstat (limited to 'Lib/multiprocessing')
-rw-r--r-- | Lib/multiprocessing/reduction.py | 21 |
1 files changed, 18 insertions, 3 deletions
diff --git a/Lib/multiprocessing/reduction.py b/Lib/multiprocessing/reduction.py index b32c725..042a136 100644 --- a/Lib/multiprocessing/reduction.py +++ b/Lib/multiprocessing/reduction.py @@ -39,6 +39,7 @@ import os import sys import socket import threading +import struct import _multiprocessing from multiprocessing import current_process @@ -51,7 +52,8 @@ from multiprocessing.connection import Client, Listener, Connection # # -if not(sys.platform == 'win32' or hasattr(_multiprocessing, 'recvfd')): +if not(sys.platform == 'win32' or (hasattr(socket, 'CMSG_LEN') and + hasattr(socket, 'SCM_RIGHTS'))): raise ImportError('pickling of connections not supported') # @@ -77,10 +79,23 @@ if sys.platform == 'win32': else: def send_handle(conn, handle, destination_pid): - _multiprocessing.sendfd(conn.fileno(), handle) + with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s: + s.sendmsg([b'x'], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, + struct.pack("@i", handle))]) def recv_handle(conn): - return _multiprocessing.recvfd(conn.fileno()) + size = struct.calcsize("@i") + with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s: + msg, ancdata, flags, addr = s.recvmsg(1, socket.CMSG_LEN(size)) + try: + cmsg_level, cmsg_type, cmsg_data = ancdata[0] + if (cmsg_level == socket.SOL_SOCKET and + cmsg_type == socket.SCM_RIGHTS): + return struct.unpack("@i", cmsg_data[:size])[0] + except (ValueError, IndexError, struct.error): + pass + raise RuntimeError('Invalid data received') + # # Support for a per-process server thread which caches pickled handles |