From dc863ddf7924ca6c4771a604b7041562604a85d9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Charles-Fran=C3=A7ois=20Natali?= Date: Sat, 24 Sep 2011 20:04:29 +0200 Subject: Issue #12981: rewrite multiprocessing_{sendfd,recvfd} in Python. --- Lib/multiprocessing/reduction.py | 21 ++++- Modules/_multiprocessing/multiprocessing.c | 128 +---------------------------- Modules/_multiprocessing/multiprocessing.h | 10 --- 3 files changed, 19 insertions(+), 140 deletions(-) diff --git a/Lib/multiprocessing/reduction.py b/Lib/multiprocessing/reduction.py index b32c725..042a136 100644 --- a/Lib/multiprocessing/reduction.py +++ b/Lib/multiprocessing/reduction.py @@ -39,6 +39,7 @@ import os import sys import socket import threading +import struct import _multiprocessing from multiprocessing import current_process @@ -51,7 +52,8 @@ from multiprocessing.connection import Client, Listener, Connection # # -if not(sys.platform == 'win32' or hasattr(_multiprocessing, 'recvfd')): +if not(sys.platform == 'win32' or (hasattr(socket, 'CMSG_LEN') and + hasattr(socket, 'SCM_RIGHTS'))): raise ImportError('pickling of connections not supported') # @@ -77,10 +79,23 @@ if sys.platform == 'win32': else: def send_handle(conn, handle, destination_pid): - _multiprocessing.sendfd(conn.fileno(), handle) + with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s: + s.sendmsg([b'x'], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, + struct.pack("@i", handle))]) def recv_handle(conn): - return _multiprocessing.recvfd(conn.fileno()) + size = struct.calcsize("@i") + with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s: + msg, ancdata, flags, addr = s.recvmsg(1, socket.CMSG_LEN(size)) + try: + cmsg_level, cmsg_type, cmsg_data = ancdata[0] + if (cmsg_level == socket.SOL_SOCKET and + cmsg_type == socket.SCM_RIGHTS): + return struct.unpack("@i", cmsg_data[:size])[0] + except (ValueError, IndexError, struct.error): + pass + raise RuntimeError('Invalid data received') + # # Support for a per-process server thread which caches pickled handles diff --git a/Modules/_multiprocessing/multiprocessing.c b/Modules/_multiprocessing/multiprocessing.c index a1e6ed5..890b96d 100644 --- a/Modules/_multiprocessing/multiprocessing.c +++ b/Modules/_multiprocessing/multiprocessing.c @@ -8,11 +8,6 @@ #include "multiprocessing.h" -#ifdef SCM_RIGHTS - #define HAVE_FD_TRANSFER 1 -#else - #define HAVE_FD_TRANSFER 0 -#endif PyObject *create_win32_namespace(void); @@ -75,115 +70,7 @@ ProcessingCtrlHandler(DWORD dwCtrlType) return FALSE; } -/* - * Unix only - */ - -#else /* !MS_WINDOWS */ - -#if HAVE_FD_TRANSFER - -/* Functions for transferring file descriptors between processes. - Reimplements some of the functionality of the fdcred - module at http://www.mca-ltd.com/resources/fdcred_1.tgz. */ -/* Based in http://resin.csoft.net/cgi-bin/man.cgi?section=3&topic=CMSG_DATA */ - -static PyObject * -multiprocessing_sendfd(PyObject *self, PyObject *args) -{ - int conn, fd, res; - struct iovec dummy_iov; - char dummy_char; - struct msghdr msg; - struct cmsghdr *cmsg; - union { - struct cmsghdr hdr; - unsigned char buf[CMSG_SPACE(sizeof(int))]; - } cmsgbuf; - - if (!PyArg_ParseTuple(args, "ii", &conn, &fd)) - return NULL; - - dummy_iov.iov_base = &dummy_char; - dummy_iov.iov_len = 1; - - memset(&msg, 0, sizeof(msg)); - msg.msg_control = &cmsgbuf.buf; - msg.msg_controllen = sizeof(cmsgbuf.buf); - msg.msg_iov = &dummy_iov; - msg.msg_iovlen = 1; - - cmsg = CMSG_FIRSTHDR(&msg); - cmsg->cmsg_len = CMSG_LEN(sizeof(int)); - cmsg->cmsg_level = SOL_SOCKET; - cmsg->cmsg_type = SCM_RIGHTS; - * (int *) CMSG_DATA(cmsg) = fd; - - Py_BEGIN_ALLOW_THREADS - res = sendmsg(conn, &msg, 0); - Py_END_ALLOW_THREADS - - if (res < 0) - return PyErr_SetFromErrno(PyExc_OSError); - Py_RETURN_NONE; -} - -static PyObject * -multiprocessing_recvfd(PyObject *self, PyObject *args) -{ - int conn, fd, res; - char dummy_char; - struct iovec dummy_iov; - struct msghdr msg = {0}; - struct cmsghdr *cmsg; - union { - struct cmsghdr hdr; - unsigned char buf[CMSG_SPACE(sizeof(int))]; - } cmsgbuf; - - if (!PyArg_ParseTuple(args, "i", &conn)) - return NULL; - - dummy_iov.iov_base = &dummy_char; - dummy_iov.iov_len = 1; - - memset(&msg, 0, sizeof(msg)); - msg.msg_control = &cmsgbuf.buf; - msg.msg_controllen = sizeof(cmsgbuf.buf); - msg.msg_iov = &dummy_iov; - msg.msg_iovlen = 1; - - cmsg = CMSG_FIRSTHDR(&msg); - cmsg->cmsg_level = SOL_SOCKET; - cmsg->cmsg_type = SCM_RIGHTS; - cmsg->cmsg_len = CMSG_LEN(sizeof(int)); - msg.msg_controllen = cmsg->cmsg_len; - - Py_BEGIN_ALLOW_THREADS - res = recvmsg(conn, &msg, 0); - Py_END_ALLOW_THREADS - - if (res < 0) - return PyErr_SetFromErrno(PyExc_OSError); - - if (msg.msg_controllen < CMSG_LEN(sizeof(int)) || - (cmsg = CMSG_FIRSTHDR(&msg)) == NULL || - cmsg->cmsg_level != SOL_SOCKET || - cmsg->cmsg_type != SCM_RIGHTS || - cmsg->cmsg_len < CMSG_LEN(sizeof(int))) { - /* If at least one control message is present, there should be - no room for any further data in the buffer. */ - PyErr_SetString(PyExc_RuntimeError, "No file descriptor received"); - return NULL; - } - - fd = * (int *) CMSG_DATA(cmsg); - return Py_BuildValue("i", fd); -} - -#endif /* HAVE_FD_TRANSFER */ - -#endif /* !MS_WINDOWS */ +#endif /* MS_WINDOWS */ /* @@ -212,16 +99,6 @@ static PyMethodDef module_methods[] = { {"address_of_buffer", multiprocessing_address_of_buffer, METH_O, "address_of_buffer(obj) -> int\n" "Return address of obj assuming obj supports buffer inteface"}, -#if HAVE_FD_TRANSFER - {"sendfd", multiprocessing_sendfd, METH_VARARGS, - "sendfd(sockfd, fd) -> None\n" - "Send file descriptor given by fd over the unix domain socket\n" - "whose file decriptor is sockfd"}, - {"recvfd", multiprocessing_recvfd, METH_VARARGS, - "recvfd(sockfd) -> fd\n" - "Receive a file descriptor over a unix domain socket\n" - "whose file decriptor is sockfd"}, -#endif {NULL} }; @@ -319,9 +196,6 @@ PyInit__multiprocessing(void) #ifdef HAVE_SEM_TIMEDWAIT ADD_FLAG(HAVE_SEM_TIMEDWAIT); #endif -#ifdef HAVE_FD_TRANSFER - ADD_FLAG(HAVE_FD_TRANSFER); -#endif #ifdef HAVE_BROKEN_SEM_GETVALUE ADD_FLAG(HAVE_BROKEN_SEM_GETVALUE); #endif diff --git a/Modules/_multiprocessing/multiprocessing.h b/Modules/_multiprocessing/multiprocessing.h index c303447..ac0dfd7 100644 --- a/Modules/_multiprocessing/multiprocessing.h +++ b/Modules/_multiprocessing/multiprocessing.h @@ -3,12 +3,6 @@ #define PY_SSIZE_T_CLEAN -#ifdef __sun -/* The control message API is only available on Solaris - if XPG 4.2 or later is requested. */ -#define _XOPEN_SOURCE 500 -#endif - #include "Python.h" #include "structmember.h" #include "pythread.h" @@ -29,10 +23,6 @@ # define SEM_VALUE_MAX LONG_MAX #else # include /* O_CREAT and O_EXCL */ -# include -# include -# include -# include /* htonl() and ntohl() */ # if defined(HAVE_SEM_OPEN) && !defined(POSIX_SEMAPHORES_NOT_ENABLED) # include typedef sem_t *SEM_HANDLE; -- cgit v0.12