diff options
Diffstat (limited to 'Modules/_multiprocessing')
-rw-r--r-- | Modules/_multiprocessing/clinic/posixshmem.c.h | 133 | ||||
-rw-r--r-- | Modules/_multiprocessing/connection.h | 519 | ||||
-rw-r--r-- | Modules/_multiprocessing/multiprocessing.c | 311 | ||||
-rw-r--r-- | Modules/_multiprocessing/multiprocessing.h | 108 | ||||
-rw-r--r-- | Modules/_multiprocessing/pipe_connection.c | 149 | ||||
-rw-r--r-- | Modules/_multiprocessing/posixshmem.c | 131 | ||||
-rw-r--r-- | Modules/_multiprocessing/semaphore.c | 227 | ||||
-rw-r--r-- | Modules/_multiprocessing/socket_connection.c | 277 | ||||
-rw-r--r-- | Modules/_multiprocessing/win32_functions.c | 267 |
9 files changed, 1625 insertions, 497 deletions
diff --git a/Modules/_multiprocessing/clinic/posixshmem.c.h b/Modules/_multiprocessing/clinic/posixshmem.c.h deleted file mode 100644 index a99f0d2..0000000 --- a/Modules/_multiprocessing/clinic/posixshmem.c.h +++ /dev/null @@ -1,133 +0,0 @@ -/*[clinic input] -preserve -[clinic start generated code]*/ - -#if defined(HAVE_SHM_OPEN) - -PyDoc_STRVAR(_posixshmem_shm_open__doc__, -"shm_open($module, /, path, flags, mode=511)\n" -"--\n" -"\n" -"Open a shared memory object. Returns a file descriptor (integer)."); - -#define _POSIXSHMEM_SHM_OPEN_METHODDEF \ - {"shm_open", (PyCFunction)(void(*)(void))_posixshmem_shm_open, METH_FASTCALL|METH_KEYWORDS, _posixshmem_shm_open__doc__}, - -static int -_posixshmem_shm_open_impl(PyObject *module, PyObject *path, int flags, - int mode); - -static PyObject * -_posixshmem_shm_open(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames) -{ - PyObject *return_value = NULL; - static const char * const _keywords[] = {"path", "flags", "mode", NULL}; - static _PyArg_Parser _parser = {NULL, _keywords, "shm_open", 0}; - PyObject *argsbuf[3]; - Py_ssize_t noptargs = nargs + (kwnames ? PyTuple_GET_SIZE(kwnames) : 0) - 2; - PyObject *path; - int flags; - int mode = 511; - int _return_value; - - args = _PyArg_UnpackKeywords(args, nargs, NULL, kwnames, &_parser, 2, 3, 0, argsbuf); - if (!args) { - goto exit; - } - if (!PyUnicode_Check(args[0])) { - _PyArg_BadArgument("shm_open", "argument 'path'", "str", args[0]); - goto exit; - } - if (PyUnicode_READY(args[0]) == -1) { - goto exit; - } - path = args[0]; - if (PyFloat_Check(args[1])) { - PyErr_SetString(PyExc_TypeError, - "integer argument expected, got float" ); - goto exit; - } - flags = _PyLong_AsInt(args[1]); - if (flags == -1 && PyErr_Occurred()) { - goto exit; - } - if (!noptargs) { - goto skip_optional_pos; - } - if (PyFloat_Check(args[2])) { - PyErr_SetString(PyExc_TypeError, - "integer argument expected, got float" ); - goto exit; - } - mode = _PyLong_AsInt(args[2]); - if (mode == -1 && PyErr_Occurred()) { - goto exit; - } -skip_optional_pos: - _return_value = _posixshmem_shm_open_impl(module, path, flags, mode); - if ((_return_value == -1) && PyErr_Occurred()) { - goto exit; - } - return_value = PyLong_FromLong((long)_return_value); - -exit: - return return_value; -} - -#endif /* defined(HAVE_SHM_OPEN) */ - -#if defined(HAVE_SHM_UNLINK) - -PyDoc_STRVAR(_posixshmem_shm_unlink__doc__, -"shm_unlink($module, /, path)\n" -"--\n" -"\n" -"Remove a shared memory object (similar to unlink()).\n" -"\n" -"Remove a shared memory object name, and, once all processes have unmapped\n" -"the object, de-allocates and destroys the contents of the associated memory\n" -"region."); - -#define _POSIXSHMEM_SHM_UNLINK_METHODDEF \ - {"shm_unlink", (PyCFunction)(void(*)(void))_posixshmem_shm_unlink, METH_FASTCALL|METH_KEYWORDS, _posixshmem_shm_unlink__doc__}, - -static PyObject * -_posixshmem_shm_unlink_impl(PyObject *module, PyObject *path); - -static PyObject * -_posixshmem_shm_unlink(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames) -{ - PyObject *return_value = NULL; - static const char * const _keywords[] = {"path", NULL}; - static _PyArg_Parser _parser = {NULL, _keywords, "shm_unlink", 0}; - PyObject *argsbuf[1]; - PyObject *path; - - args = _PyArg_UnpackKeywords(args, nargs, NULL, kwnames, &_parser, 1, 1, 0, argsbuf); - if (!args) { - goto exit; - } - if (!PyUnicode_Check(args[0])) { - _PyArg_BadArgument("shm_unlink", "argument 'path'", "str", args[0]); - goto exit; - } - if (PyUnicode_READY(args[0]) == -1) { - goto exit; - } - path = args[0]; - return_value = _posixshmem_shm_unlink_impl(module, path); - -exit: - return return_value; -} - -#endif /* defined(HAVE_SHM_UNLINK) */ - -#ifndef _POSIXSHMEM_SHM_OPEN_METHODDEF - #define _POSIXSHMEM_SHM_OPEN_METHODDEF -#endif /* !defined(_POSIXSHMEM_SHM_OPEN_METHODDEF) */ - -#ifndef _POSIXSHMEM_SHM_UNLINK_METHODDEF - #define _POSIXSHMEM_SHM_UNLINK_METHODDEF -#endif /* !defined(_POSIXSHMEM_SHM_UNLINK_METHODDEF) */ -/*[clinic end generated code: output=9132861c61d8c2d8 input=a9049054013a1b77]*/ diff --git a/Modules/_multiprocessing/connection.h b/Modules/_multiprocessing/connection.h new file mode 100644 index 0000000..12e6eee --- /dev/null +++ b/Modules/_multiprocessing/connection.h @@ -0,0 +1,519 @@ +/* + * Definition of a `Connection` type. + * Used by `socket_connection.c` and `pipe_connection.c`. + * + * connection.h + * + * Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt + */ + +#ifndef CONNECTION_H +#define CONNECTION_H + +/* + * Read/write flags + */ + +#define READABLE 1 +#define WRITABLE 2 + +#define CHECK_READABLE(self) \ + if (!(self->flags & READABLE)) { \ + PyErr_SetString(PyExc_IOError, "connection is write-only"); \ + return NULL; \ + } + +#define CHECK_WRITABLE(self) \ + if (!(self->flags & WRITABLE)) { \ + PyErr_SetString(PyExc_IOError, "connection is read-only"); \ + return NULL; \ + } + +/* + * Allocation and deallocation + */ + +static PyObject * +connection_new(PyTypeObject *type, PyObject *args, PyObject *kwds) +{ + ConnectionObject *self; + HANDLE handle; + BOOL readable = TRUE, writable = TRUE; + + static char *kwlist[] = {"handle", "readable", "writable", NULL}; + + if (!PyArg_ParseTupleAndKeywords(args, kwds, F_HANDLE "|ii", kwlist, + &handle, &readable, &writable)) + return NULL; + + if (handle == INVALID_HANDLE_VALUE || (Py_ssize_t)handle < 0) { + PyErr_Format(PyExc_IOError, "invalid handle %zd", + (Py_ssize_t)handle); + return NULL; + } + + if (!readable && !writable) { + PyErr_SetString(PyExc_ValueError, + "either readable or writable must be true"); + return NULL; + } + + self = PyObject_New(ConnectionObject, type); + if (self == NULL) + return NULL; + + self->weakreflist = NULL; + self->handle = handle; + self->flags = 0; + + if (readable) + self->flags |= READABLE; + if (writable) + self->flags |= WRITABLE; + assert(self->flags >= 1 && self->flags <= 3); + + return (PyObject*)self; +} + +static void +connection_dealloc(ConnectionObject* self) +{ + if (self->weakreflist != NULL) + PyObject_ClearWeakRefs((PyObject*)self); + + if (self->handle != INVALID_HANDLE_VALUE) { + Py_BEGIN_ALLOW_THREADS + CLOSE(self->handle); + Py_END_ALLOW_THREADS + } + PyObject_Del(self); +} + +/* + * Functions for transferring buffers + */ + +static PyObject * +connection_sendbytes(ConnectionObject *self, PyObject *args) +{ + char *buffer; + Py_ssize_t length, offset=0, size=PY_SSIZE_T_MIN; + int res; + + if (!PyArg_ParseTuple(args, F_RBUFFER "#|" F_PY_SSIZE_T F_PY_SSIZE_T, + &buffer, &length, &offset, &size)) + return NULL; + + CHECK_WRITABLE(self); + + if (offset < 0) { + PyErr_SetString(PyExc_ValueError, "offset is negative"); + return NULL; + } + if (length < offset) { + PyErr_SetString(PyExc_ValueError, "buffer length < offset"); + return NULL; + } + + if (size == PY_SSIZE_T_MIN) { + size = length - offset; + } else { + if (size < 0) { + PyErr_SetString(PyExc_ValueError, "size is negative"); + return NULL; + } + if (offset + size > length) { + PyErr_SetString(PyExc_ValueError, + "buffer length < offset + size"); + return NULL; + } + } + + res = conn_send_string(self, buffer + offset, size); + + if (res < 0) { + if (PyErr_Occurred()) + return NULL; + else + return mp_SetError(PyExc_IOError, res); + } + + Py_RETURN_NONE; +} + +static PyObject * +connection_recvbytes(ConnectionObject *self, PyObject *args) +{ + char *freeme = NULL; + Py_ssize_t res, maxlength = PY_SSIZE_T_MAX; + PyObject *result = NULL; + + if (!PyArg_ParseTuple(args, "|" F_PY_SSIZE_T, &maxlength)) + return NULL; + + CHECK_READABLE(self); + + if (maxlength < 0) { + PyErr_SetString(PyExc_ValueError, "maxlength < 0"); + return NULL; + } + + res = conn_recv_string(self, self->buffer, CONNECTION_BUFFER_SIZE, + &freeme, maxlength); + + if (res < 0) { + if (res == MP_BAD_MESSAGE_LENGTH) { + if ((self->flags & WRITABLE) == 0) { + Py_BEGIN_ALLOW_THREADS + CLOSE(self->handle); + Py_END_ALLOW_THREADS + self->handle = INVALID_HANDLE_VALUE; + } else { + self->flags = WRITABLE; + } + } + mp_SetError(PyExc_IOError, res); + } else { + if (freeme == NULL) { + result = PyString_FromStringAndSize(self->buffer, res); + } else { + result = PyString_FromStringAndSize(freeme, res); + PyMem_Free(freeme); + } + } + + return result; +} + +static PyObject * +connection_recvbytes_into(ConnectionObject *self, PyObject *args) +{ + char *freeme = NULL, *buffer = NULL; + Py_ssize_t res, length, offset = 0; + PyObject *result = NULL; + Py_buffer pbuf; + + CHECK_READABLE(self); + + if (!PyArg_ParseTuple(args, "w*|" F_PY_SSIZE_T, + &pbuf, &offset)) + return NULL; + + buffer = pbuf.buf; + length = pbuf.len; + + if (offset < 0) { + PyErr_SetString(PyExc_ValueError, "negative offset"); + goto _error; + } + + if (offset > length) { + PyErr_SetString(PyExc_ValueError, "offset too large"); + goto _error; + } + + res = conn_recv_string(self, buffer+offset, length-offset, + &freeme, PY_SSIZE_T_MAX); + + if (res < 0) { + if (res == MP_BAD_MESSAGE_LENGTH) { + if ((self->flags & WRITABLE) == 0) { + Py_BEGIN_ALLOW_THREADS + CLOSE(self->handle); + Py_END_ALLOW_THREADS + self->handle = INVALID_HANDLE_VALUE; + } else { + self->flags = WRITABLE; + } + } + mp_SetError(PyExc_IOError, res); + } else { + if (freeme == NULL) { + result = PyInt_FromSsize_t(res); + } else { + result = PyObject_CallFunction(BufferTooShort, + F_RBUFFER "#", + freeme, res); + PyMem_Free(freeme); + if (result) { + PyErr_SetObject(BufferTooShort, result); + Py_DECREF(result); + } + goto _error; + } + } + +_cleanup: + PyBuffer_Release(&pbuf); + return result; + +_error: + result = NULL; + goto _cleanup; +} + +/* + * Functions for transferring objects + */ + +static PyObject * +connection_send_obj(ConnectionObject *self, PyObject *obj) +{ + char *buffer; + int res; + Py_ssize_t length; + PyObject *pickled_string = NULL; + + CHECK_WRITABLE(self); + + pickled_string = PyObject_CallFunctionObjArgs(pickle_dumps, obj, + pickle_protocol, NULL); + if (!pickled_string) + goto failure; + + if (PyString_AsStringAndSize(pickled_string, &buffer, &length) < 0) + goto failure; + + res = conn_send_string(self, buffer, (int)length); + + if (res < 0) { + mp_SetError(PyExc_IOError, res); + goto failure; + } + + Py_XDECREF(pickled_string); + Py_RETURN_NONE; + + failure: + Py_XDECREF(pickled_string); + return NULL; +} + +static PyObject * +connection_recv_obj(ConnectionObject *self) +{ + char *freeme = NULL; + Py_ssize_t res; + PyObject *temp = NULL, *result = NULL; + + CHECK_READABLE(self); + + res = conn_recv_string(self, self->buffer, CONNECTION_BUFFER_SIZE, + &freeme, PY_SSIZE_T_MAX); + + if (res < 0) { + if (res == MP_BAD_MESSAGE_LENGTH) { + if ((self->flags & WRITABLE) == 0) { + Py_BEGIN_ALLOW_THREADS + CLOSE(self->handle); + Py_END_ALLOW_THREADS + self->handle = INVALID_HANDLE_VALUE; + } else { + self->flags = WRITABLE; + } + } + mp_SetError(PyExc_IOError, res); + } else { + if (freeme == NULL) { + temp = PyString_FromStringAndSize(self->buffer, res); + } else { + temp = PyString_FromStringAndSize(freeme, res); + PyMem_Free(freeme); + } + } + + if (temp) + result = PyObject_CallFunctionObjArgs(pickle_loads, + temp, NULL); + Py_XDECREF(temp); + return result; +} + +/* + * Other functions + */ + +static PyObject * +connection_poll(ConnectionObject *self, PyObject *args) +{ + PyObject *timeout_obj = NULL; + double timeout = 0.0; + int res; + + CHECK_READABLE(self); + + if (!PyArg_ParseTuple(args, "|O", &timeout_obj)) + return NULL; + + if (timeout_obj == NULL) { + timeout = 0.0; + } else if (timeout_obj == Py_None) { + timeout = -1.0; /* block forever */ + } else { + timeout = PyFloat_AsDouble(timeout_obj); + if (PyErr_Occurred()) + return NULL; + if (timeout < 0.0) + timeout = 0.0; + } + + Py_BEGIN_ALLOW_THREADS + res = conn_poll(self, timeout, _save); + Py_END_ALLOW_THREADS + + switch (res) { + case TRUE: + Py_RETURN_TRUE; + case FALSE: + Py_RETURN_FALSE; + default: + return mp_SetError(PyExc_IOError, res); + } +} + +static PyObject * +connection_fileno(ConnectionObject* self) +{ + if (self->handle == INVALID_HANDLE_VALUE) { + PyErr_SetString(PyExc_IOError, "handle is invalid"); + return NULL; + } + return PyInt_FromLong((long)self->handle); +} + +static PyObject * +connection_close(ConnectionObject *self) +{ + if (self->handle != INVALID_HANDLE_VALUE) { + Py_BEGIN_ALLOW_THREADS + CLOSE(self->handle); + Py_END_ALLOW_THREADS + self->handle = INVALID_HANDLE_VALUE; + } + + Py_RETURN_NONE; +} + +static PyObject * +connection_repr(ConnectionObject *self) +{ + static char *conn_type[] = {"read-only", "write-only", "read-write"}; + + assert(self->flags >= 1 && self->flags <= 3); + return FROM_FORMAT("<%s %s, handle %zd>", + conn_type[self->flags - 1], + CONNECTION_NAME, (Py_ssize_t)self->handle); +} + +/* + * Getters and setters + */ + +static PyObject * +connection_closed(ConnectionObject *self, void *closure) +{ + return PyBool_FromLong((long)(self->handle == INVALID_HANDLE_VALUE)); +} + +static PyObject * +connection_readable(ConnectionObject *self, void *closure) +{ + return PyBool_FromLong((long)(self->flags & READABLE)); +} + +static PyObject * +connection_writable(ConnectionObject *self, void *closure) +{ + return PyBool_FromLong((long)(self->flags & WRITABLE)); +} + +/* + * Tables + */ + +static PyMethodDef connection_methods[] = { + {"send_bytes", (PyCFunction)connection_sendbytes, METH_VARARGS, + "send the byte data from a readable buffer-like object"}, + {"recv_bytes", (PyCFunction)connection_recvbytes, METH_VARARGS, + "receive byte data as a string"}, + {"recv_bytes_into",(PyCFunction)connection_recvbytes_into,METH_VARARGS, + "receive byte data into a writeable buffer-like object\n" + "returns the number of bytes read"}, + + {"send", (PyCFunction)connection_send_obj, METH_O, + "send a (picklable) object"}, + {"recv", (PyCFunction)connection_recv_obj, METH_NOARGS, + "receive a (picklable) object"}, + + {"poll", (PyCFunction)connection_poll, METH_VARARGS, + "whether there is any input available to be read"}, + {"fileno", (PyCFunction)connection_fileno, METH_NOARGS, + "file descriptor or handle of the connection"}, + {"close", (PyCFunction)connection_close, METH_NOARGS, + "close the connection"}, + + {NULL} /* Sentinel */ +}; + +static PyGetSetDef connection_getset[] = { + {"closed", (getter)connection_closed, NULL, + "True if the connection is closed", NULL}, + {"readable", (getter)connection_readable, NULL, + "True if the connection is readable", NULL}, + {"writable", (getter)connection_writable, NULL, + "True if the connection is writable", NULL}, + {NULL} +}; + +/* + * Connection type + */ + +PyDoc_STRVAR(connection_doc, + "Connection type whose constructor signature is\n\n" + " Connection(handle, readable=True, writable=True).\n\n" + "The constructor does *not* duplicate the handle."); + +PyTypeObject CONNECTION_TYPE = { + PyVarObject_HEAD_INIT(NULL, 0) + /* tp_name */ "_multiprocessing." CONNECTION_NAME, + /* tp_basicsize */ sizeof(ConnectionObject), + /* tp_itemsize */ 0, + /* tp_dealloc */ (destructor)connection_dealloc, + /* tp_print */ 0, + /* tp_getattr */ 0, + /* tp_setattr */ 0, + /* tp_compare */ 0, + /* tp_repr */ (reprfunc)connection_repr, + /* tp_as_number */ 0, + /* tp_as_sequence */ 0, + /* tp_as_mapping */ 0, + /* tp_hash */ 0, + /* tp_call */ 0, + /* tp_str */ 0, + /* tp_getattro */ 0, + /* tp_setattro */ 0, + /* tp_as_buffer */ 0, + /* tp_flags */ Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE | + Py_TPFLAGS_HAVE_WEAKREFS, + /* tp_doc */ connection_doc, + /* tp_traverse */ 0, + /* tp_clear */ 0, + /* tp_richcompare */ 0, + /* tp_weaklistoffset */ offsetof(ConnectionObject, weakreflist), + /* tp_iter */ 0, + /* tp_iternext */ 0, + /* tp_methods */ connection_methods, + /* tp_members */ 0, + /* tp_getset */ connection_getset, + /* tp_base */ 0, + /* tp_dict */ 0, + /* tp_descr_get */ 0, + /* tp_descr_set */ 0, + /* tp_dictoffset */ 0, + /* tp_init */ 0, + /* tp_alloc */ 0, + /* tp_new */ connection_new, +}; + +#endif /* CONNECTION_H */ diff --git a/Modules/_multiprocessing/multiprocessing.c b/Modules/_multiprocessing/multiprocessing.c index 806e638..eecace8 100644 --- a/Modules/_multiprocessing/multiprocessing.c +++ b/Modules/_multiprocessing/multiprocessing.c @@ -3,30 +3,39 @@ * * multiprocessing.c * - * Copyright (c) 2006-2008, R Oudkerk - * Licensed to PSF under a Contributor Agreement. + * Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt */ #include "multiprocessing.h" +#if (defined(CMSG_LEN) && defined(SCM_RIGHTS)) + #define HAVE_FD_TRANSFER 1 +#else + #define HAVE_FD_TRANSFER 0 +#endif + +PyObject *create_win32_namespace(void); + +PyObject *pickle_dumps, *pickle_loads, *pickle_protocol; +PyObject *ProcessError, *BufferTooShort; /* * Function which raises exceptions based on error codes */ PyObject * -_PyMp_SetError(PyObject *Type, int num) +mp_SetError(PyObject *Type, int num) { switch (num) { #ifdef MS_WINDOWS case MP_STANDARD_ERROR: if (Type == NULL) - Type = PyExc_OSError; + Type = PyExc_WindowsError; PyErr_SetExcFromWindowsErr(Type, 0); break; case MP_SOCKET_ERROR: if (Type == NULL) - Type = PyExc_OSError; + Type = PyExc_WindowsError; PyErr_SetExcFromWindowsErr(Type, WSAGetLastError()); break; #else /* !MS_WINDOWS */ @@ -40,6 +49,16 @@ _PyMp_SetError(PyObject *Type, int num) case MP_MEMORY_ERROR: PyErr_NoMemory(); break; + case MP_END_OF_FILE: + PyErr_SetNone(PyExc_EOFError); + break; + case MP_EARLY_END_OF_FILE: + PyErr_SetString(PyExc_IOError, + "got end of file during message"); + break; + case MP_BAD_MESSAGE_LENGTH: + PyErr_SetString(PyExc_IOError, "bad message length"); + break; case MP_EXCEPTION_HAS_BEEN_SET: break; default: @@ -49,87 +68,170 @@ _PyMp_SetError(PyObject *Type, int num) return NULL; } + +/* + * Windows only + */ + #ifdef MS_WINDOWS -static PyObject * -multiprocessing_closesocket(PyObject *self, PyObject *args) + +/* On Windows we set an event to signal Ctrl-C; compare with timemodule.c */ + +HANDLE sigint_event = NULL; + +static BOOL WINAPI +ProcessingCtrlHandler(DWORD dwCtrlType) { - HANDLE handle; - int ret; + SetEvent(sigint_event); + return FALSE; +} + +/* + * Unix only + */ + +#else /* !MS_WINDOWS */ + +#if HAVE_FD_TRANSFER - if (!PyArg_ParseTuple(args, F_HANDLE ":closesocket" , &handle)) +/* Functions for transferring file descriptors between processes. + Reimplements some of the functionality of the fdcred + module at http://www.mca-ltd.com/resources/fdcred_1.tgz. */ +/* Based in http://resin.csoft.net/cgi-bin/man.cgi?section=3&topic=CMSG_DATA */ + +static PyObject * +multiprocessing_sendfd(PyObject *self, PyObject *args) +{ + int conn, fd, res; + struct iovec dummy_iov; + char dummy_char; + struct msghdr msg; + struct cmsghdr *cmsg; + union { + struct cmsghdr hdr; + unsigned char buf[CMSG_SPACE(sizeof(int))]; + } cmsgbuf; + + if (!PyArg_ParseTuple(args, "ii", &conn, &fd)) return NULL; + dummy_iov.iov_base = &dummy_char; + dummy_iov.iov_len = 1; + + memset(&msg, 0, sizeof(msg)); + msg.msg_control = &cmsgbuf.buf; + msg.msg_controllen = sizeof(cmsgbuf.buf); + msg.msg_iov = &dummy_iov; + msg.msg_iovlen = 1; + + cmsg = CMSG_FIRSTHDR(&msg); + cmsg->cmsg_len = CMSG_LEN(sizeof(int)); + cmsg->cmsg_level = SOL_SOCKET; + cmsg->cmsg_type = SCM_RIGHTS; + * (int *) CMSG_DATA(cmsg) = fd; + Py_BEGIN_ALLOW_THREADS - ret = closesocket((SOCKET) handle); + res = sendmsg(conn, &msg, 0); Py_END_ALLOW_THREADS - if (ret) - return PyErr_SetExcFromWindowsErr(PyExc_OSError, WSAGetLastError()); + if (res < 0) + return PyErr_SetFromErrno(PyExc_OSError); Py_RETURN_NONE; } static PyObject * -multiprocessing_recv(PyObject *self, PyObject *args) +multiprocessing_recvfd(PyObject *self, PyObject *args) { - HANDLE handle; - int size, nread; - PyObject *buf; - - if (!PyArg_ParseTuple(args, F_HANDLE "i:recv" , &handle, &size)) + int conn, fd, res; + char dummy_char; + struct iovec dummy_iov; + struct msghdr msg = {0}; + struct cmsghdr *cmsg; + union { + struct cmsghdr hdr; + unsigned char buf[CMSG_SPACE(sizeof(int))]; + } cmsgbuf; + + if (!PyArg_ParseTuple(args, "i", &conn)) return NULL; - buf = PyBytes_FromStringAndSize(NULL, size); - if (!buf) - return NULL; + dummy_iov.iov_base = &dummy_char; + dummy_iov.iov_len = 1; + + memset(&msg, 0, sizeof(msg)); + msg.msg_control = &cmsgbuf.buf; + msg.msg_controllen = sizeof(cmsgbuf.buf); + msg.msg_iov = &dummy_iov; + msg.msg_iovlen = 1; + + cmsg = CMSG_FIRSTHDR(&msg); + cmsg->cmsg_level = SOL_SOCKET; + cmsg->cmsg_type = SCM_RIGHTS; + cmsg->cmsg_len = CMSG_SPACE(sizeof(int)); + msg.msg_controllen = cmsg->cmsg_len; Py_BEGIN_ALLOW_THREADS - nread = recv((SOCKET) handle, PyBytes_AS_STRING(buf), size, 0); + res = recvmsg(conn, &msg, 0); Py_END_ALLOW_THREADS - if (nread < 0) { - Py_DECREF(buf); - return PyErr_SetExcFromWindowsErr(PyExc_OSError, WSAGetLastError()); + if (res < 0) + return PyErr_SetFromErrno(PyExc_OSError); + + if (msg.msg_controllen < CMSG_LEN(sizeof(int)) || + (cmsg = CMSG_FIRSTHDR(&msg)) == NULL || + cmsg->cmsg_level != SOL_SOCKET || + cmsg->cmsg_type != SCM_RIGHTS || + cmsg->cmsg_len < CMSG_LEN(sizeof(int))) { + /* If at least one control message is present, there should be + no room for any further data in the buffer. */ + PyErr_SetString(PyExc_RuntimeError, "No file descriptor received"); + return NULL; } - _PyBytes_Resize(&buf, nread); - return buf; + + fd = * (int *) CMSG_DATA(cmsg); + return Py_BuildValue("i", fd); } -static PyObject * -multiprocessing_send(PyObject *self, PyObject *args) -{ - HANDLE handle; - Py_buffer buf; - int ret, length; +#endif /* HAVE_FD_TRANSFER */ - if (!PyArg_ParseTuple(args, F_HANDLE "y*:send" , &handle, &buf)) - return NULL; +#endif /* !MS_WINDOWS */ - length = (int)Py_MIN(buf.len, INT_MAX); - Py_BEGIN_ALLOW_THREADS - ret = send((SOCKET) handle, buf.buf, length, 0); - Py_END_ALLOW_THREADS +/* + * All platforms + */ + +static PyObject* +multiprocessing_address_of_buffer(PyObject *self, PyObject *obj) +{ + void *buffer; + Py_ssize_t buffer_len; - PyBuffer_Release(&buf); - if (ret < 0) - return PyErr_SetExcFromWindowsErr(PyExc_OSError, WSAGetLastError()); - return PyLong_FromLong(ret); + if (PyObject_AsWriteBuffer(obj, &buffer, &buffer_len) < 0) + return NULL; + + return Py_BuildValue("N" F_PY_SSIZE_T, + PyLong_FromVoidPtr(buffer), buffer_len); } -#endif /* * Function table */ static PyMethodDef module_methods[] = { -#ifdef MS_WINDOWS - {"closesocket", multiprocessing_closesocket, METH_VARARGS, ""}, - {"recv", multiprocessing_recv, METH_VARARGS, ""}, - {"send", multiprocessing_send, METH_VARARGS, ""}, -#endif -#if !defined(POSIX_SEMAPHORES_NOT_ENABLED) && !defined(__ANDROID__) - {"sem_unlink", _PyMp_sem_unlink, METH_VARARGS, ""}, + {"address_of_buffer", multiprocessing_address_of_buffer, METH_O, + "address_of_buffer(obj) -> int\n" + "Return address of obj assuming obj supports buffer inteface"}, +#if HAVE_FD_TRANSFER + {"sendfd", multiprocessing_sendfd, METH_VARARGS, + "sendfd(sockfd, fd) -> None\n" + "Send file descriptor given by fd over the unix domain socket\n" + "whose file decriptor is sockfd"}, + {"recvfd", multiprocessing_recvfd, METH_VARARGS, + "recvfd(sockfd) -> fd\n" + "Receive a file descriptor over a unix domain socket\n" + "whose file decriptor is sockfd"}, #endif {NULL} }; @@ -139,64 +241,95 @@ static PyMethodDef module_methods[] = { * Initialize */ -static struct PyModuleDef multiprocessing_module = { - PyModuleDef_HEAD_INIT, - "_multiprocessing", - NULL, - -1, - module_methods, - NULL, - NULL, - NULL, - NULL -}; - - PyMODINIT_FUNC -PyInit__multiprocessing(void) +init_multiprocessing(void) { - PyObject *module, *temp, *value = NULL; + PyObject *module, *temp, *value; /* Initialize module */ - module = PyModule_Create(&multiprocessing_module); + module = Py_InitModule("_multiprocessing", module_methods); if (!module) - return NULL; + return; + + /* Get copy of objects from pickle */ + temp = PyImport_ImportModule(PICKLE_MODULE); + if (!temp) + return; + pickle_dumps = PyObject_GetAttrString(temp, "dumps"); + pickle_loads = PyObject_GetAttrString(temp, "loads"); + pickle_protocol = PyObject_GetAttrString(temp, "HIGHEST_PROTOCOL"); + Py_XDECREF(temp); + + /* Get copy of BufferTooShort */ + temp = PyImport_ImportModule("multiprocessing"); + if (!temp) + return; + BufferTooShort = PyObject_GetAttrString(temp, "BufferTooShort"); + Py_XDECREF(temp); + + /* Add connection type to module */ + if (PyType_Ready(&ConnectionType) < 0) + return; + Py_INCREF(&ConnectionType); + PyModule_AddObject(module, "Connection", (PyObject*)&ConnectionType); #if defined(MS_WINDOWS) || \ (defined(HAVE_SEM_OPEN) && !defined(POSIX_SEMAPHORES_NOT_ENABLED)) - /* Add _PyMp_SemLock type to module */ - if (PyType_Ready(&_PyMp_SemLockType) < 0) - return NULL; - Py_INCREF(&_PyMp_SemLockType); + /* Add SemLock type to module */ + if (PyType_Ready(&SemLockType) < 0) + return; + Py_INCREF(&SemLockType); { PyObject *py_sem_value_max; /* Some systems define SEM_VALUE_MAX as an unsigned value that - * causes it to be negative when used as an int (NetBSD). - * - * Issue #28152: Use (0) instead of 0 to fix a warning on dead code - * when using clang -Wunreachable-code. */ - if ((int)(SEM_VALUE_MAX) < (0)) + * causes it to be negative when used as an int (NetBSD). */ + if ((int)(SEM_VALUE_MAX) < 0) py_sem_value_max = PyLong_FromLong(INT_MAX); else py_sem_value_max = PyLong_FromLong(SEM_VALUE_MAX); if (py_sem_value_max == NULL) - return NULL; - PyDict_SetItemString(_PyMp_SemLockType.tp_dict, "SEM_VALUE_MAX", + return; + PyDict_SetItemString(SemLockType.tp_dict, "SEM_VALUE_MAX", py_sem_value_max); } - PyModule_AddObject(module, "SemLock", (PyObject*)&_PyMp_SemLockType); + PyModule_AddObject(module, "SemLock", (PyObject*)&SemLockType); +#endif + +#ifdef MS_WINDOWS + /* Add PipeConnection to module */ + if (PyType_Ready(&PipeConnectionType) < 0) + return; + Py_INCREF(&PipeConnectionType); + PyModule_AddObject(module, "PipeConnection", + (PyObject*)&PipeConnectionType); + + /* Initialize win32 class and add to multiprocessing */ + temp = create_win32_namespace(); + if (!temp) + return; + PyModule_AddObject(module, "win32", temp); + + /* Initialize the event handle used to signal Ctrl-C */ + sigint_event = CreateEvent(NULL, TRUE, FALSE, NULL); + if (!sigint_event) { + PyErr_SetFromWindowsErr(0); + return; + } + if (!SetConsoleCtrlHandler(ProcessingCtrlHandler, TRUE)) { + PyErr_SetFromWindowsErr(0); + return; + } #endif /* Add configuration macros */ temp = PyDict_New(); if (!temp) - return NULL; - + return; #define ADD_FLAG(name) \ value = Py_BuildValue("i", name); \ - if (value == NULL) { Py_DECREF(temp); return NULL; } \ + if (value == NULL) { Py_DECREF(temp); return; } \ if (PyDict_SetItemString(temp, #name, value) < 0) { \ - Py_DECREF(temp); Py_DECREF(value); return NULL; } \ + Py_DECREF(temp); Py_DECREF(value); return; } \ Py_DECREF(value) #if defined(HAVE_SEM_OPEN) && !defined(POSIX_SEMAPHORES_NOT_ENABLED) @@ -205,15 +338,15 @@ PyInit__multiprocessing(void) #ifdef HAVE_SEM_TIMEDWAIT ADD_FLAG(HAVE_SEM_TIMEDWAIT); #endif +#ifdef HAVE_FD_TRANSFER + ADD_FLAG(HAVE_FD_TRANSFER); +#endif #ifdef HAVE_BROKEN_SEM_GETVALUE ADD_FLAG(HAVE_BROKEN_SEM_GETVALUE); #endif #ifdef HAVE_BROKEN_SEM_UNLINK ADD_FLAG(HAVE_BROKEN_SEM_UNLINK); #endif - if (PyModule_AddObject(module, "flags", temp) < 0) - return NULL; - - return module; + return; } diff --git a/Modules/_multiprocessing/multiprocessing.h b/Modules/_multiprocessing/multiprocessing.h index fe78135..14425de 100644 --- a/Modules/_multiprocessing/multiprocessing.h +++ b/Modules/_multiprocessing/multiprocessing.h @@ -3,6 +3,12 @@ #define PY_SSIZE_T_CLEAN +#ifdef __sun +/* The control message API is only available on Solaris + if XPG 4.2 or later is requested. */ +#define _XOPEN_SOURCE 500 +#endif + #include "Python.h" #include "structmember.h" #include "pythread.h" @@ -23,10 +29,22 @@ # define SEM_VALUE_MAX LONG_MAX #else # include <fcntl.h> /* O_CREAT and O_EXCL */ +# include <netinet/in.h> +# include <sys/socket.h> +# include <sys/uio.h> +# include <arpa/inet.h> /* htonl() and ntohl() */ # if defined(HAVE_SEM_OPEN) && !defined(POSIX_SEMAPHORES_NOT_ENABLED) # include <semaphore.h> typedef sem_t *SEM_HANDLE; # endif +# define HANDLE int +# define SOCKET int +# define BOOL int +# define UINT32 uint32_t +# define INT32 int32_t +# define TRUE 1 +# define FALSE 0 +# define INVALID_HANDLE_VALUE (-1) #endif /* @@ -46,13 +64,27 @@ /* + * Make sure Py_ssize_t available + */ + +#if PY_VERSION_HEX < 0x02050000 && !defined(PY_SSIZE_T_MIN) + typedef int Py_ssize_t; +# define PY_SSIZE_T_MAX INT_MAX +# define PY_SSIZE_T_MIN INT_MIN +# define F_PY_SSIZE_T "i" +# define PyInt_FromSsize_t(n) PyInt_FromLong((long)n) +#else +# define F_PY_SSIZE_T "n" +#endif + +/* * Format codes */ #if SIZEOF_VOID_P == SIZEOF_LONG # define F_POINTER "k" # define T_POINTER T_ULONG -#elif SIZEOF_VOID_P == SIZEOF_LONG_LONG +#elif defined(HAVE_LONG_LONG) && (SIZEOF_VOID_P == SIZEOF_LONG_LONG) # define F_POINTER "K" # define T_POINTER T_ULONGLONG #else @@ -64,6 +96,8 @@ # define T_HANDLE T_POINTER # define F_SEM_HANDLE F_HANDLE # define T_SEM_HANDLE T_HANDLE +# define F_DWORD "k" +# define T_DWORD T_ULONG #else # define F_HANDLE "i" # define T_HANDLE T_INT @@ -71,6 +105,12 @@ # define T_SEM_HANDLE T_POINTER #endif +#if PY_VERSION_HEX >= 0x03000000 +# define F_RBUFFER "y" +#else +# define F_RBUFFER "s" +#endif + /* * Error codes which can be returned by functions called without GIL */ @@ -78,16 +118,72 @@ #define MP_SUCCESS (0) #define MP_STANDARD_ERROR (-1) #define MP_MEMORY_ERROR (-1001) -#define MP_SOCKET_ERROR (-1002) -#define MP_EXCEPTION_HAS_BEEN_SET (-1003) +#define MP_END_OF_FILE (-1002) +#define MP_EARLY_END_OF_FILE (-1003) +#define MP_BAD_MESSAGE_LENGTH (-1004) +#define MP_SOCKET_ERROR (-1005) +#define MP_EXCEPTION_HAS_BEEN_SET (-1006) -PyObject *_PyMp_SetError(PyObject *Type, int num); +PyObject *mp_SetError(PyObject *Type, int num); /* * Externs - not all will really exist on all platforms */ -extern PyTypeObject _PyMp_SemLockType; -extern PyObject *_PyMp_sem_unlink(PyObject *ignore, PyObject *args); +extern PyObject *pickle_dumps; +extern PyObject *pickle_loads; +extern PyObject *pickle_protocol; +extern PyObject *BufferTooShort; +extern PyTypeObject SemLockType; +extern PyTypeObject ConnectionType; +extern PyTypeObject PipeConnectionType; +extern HANDLE sigint_event; + +/* + * Py3k compatibility + */ + +#if PY_VERSION_HEX >= 0x03000000 +# define PICKLE_MODULE "pickle" +# define FROM_FORMAT PyUnicode_FromFormat +# define PyInt_FromLong PyLong_FromLong +# define PyInt_FromSsize_t PyLong_FromSsize_t +#else +# define PICKLE_MODULE "cPickle" +# define FROM_FORMAT PyString_FromFormat +#endif + +#ifndef PyVarObject_HEAD_INIT +# define PyVarObject_HEAD_INIT(type, size) PyObject_HEAD_INIT(type) size, +#endif + +#ifndef Py_TPFLAGS_HAVE_WEAKREFS +# define Py_TPFLAGS_HAVE_WEAKREFS 0 +#endif + +/* + * Connection definition + */ + +#define CONNECTION_BUFFER_SIZE 1024 + +typedef struct { + PyObject_HEAD + HANDLE handle; + int flags; + PyObject *weakreflist; + char buffer[CONNECTION_BUFFER_SIZE]; +} ConnectionObject; + +/* + * Miscellaneous + */ + +#define MAX_MESSAGE_LENGTH 0x7fffffff + +#ifndef MIN +# define MIN(x, y) ((x) < (y) ? x : y) +# define MAX(x, y) ((x) > (y) ? x : y) +#endif #endif /* MULTIPROCESSING_H */ diff --git a/Modules/_multiprocessing/pipe_connection.c b/Modules/_multiprocessing/pipe_connection.c new file mode 100644 index 0000000..05dde0c --- /dev/null +++ b/Modules/_multiprocessing/pipe_connection.c @@ -0,0 +1,149 @@ +/* + * A type which wraps a pipe handle in message oriented mode + * + * pipe_connection.c + * + * Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt + */ + +#include "multiprocessing.h" + +#define CLOSE(h) CloseHandle(h) + +/* + * Send string to the pipe; assumes in message oriented mode + */ + +static Py_ssize_t +conn_send_string(ConnectionObject *conn, char *string, size_t length) +{ + DWORD amount_written; + BOOL ret; + + Py_BEGIN_ALLOW_THREADS + ret = WriteFile(conn->handle, string, length, &amount_written, NULL); + Py_END_ALLOW_THREADS + + if (ret == 0 && GetLastError() == ERROR_NO_SYSTEM_RESOURCES) { + PyErr_Format(PyExc_ValueError, "Cannnot send %" PY_FORMAT_SIZE_T "d bytes over connection", length); + return MP_STANDARD_ERROR; + } + + return ret ? MP_SUCCESS : MP_STANDARD_ERROR; +} + +/* + * Attempts to read into buffer, or if buffer too small into *newbuffer. + * + * Returns number of bytes read. Assumes in message oriented mode. + */ + +static Py_ssize_t +conn_recv_string(ConnectionObject *conn, char *buffer, + size_t buflength, char **newbuffer, size_t maxlength) +{ + DWORD left, length, full_length, err; + BOOL ret; + *newbuffer = NULL; + + Py_BEGIN_ALLOW_THREADS + ret = ReadFile(conn->handle, buffer, MIN(buflength, maxlength), + &length, NULL); + Py_END_ALLOW_THREADS + if (ret) + return length; + + err = GetLastError(); + if (err != ERROR_MORE_DATA) { + if (err == ERROR_BROKEN_PIPE) + return MP_END_OF_FILE; + return MP_STANDARD_ERROR; + } + + if (!PeekNamedPipe(conn->handle, NULL, 0, NULL, NULL, &left)) + return MP_STANDARD_ERROR; + + full_length = length + left; + if (full_length > maxlength) + return MP_BAD_MESSAGE_LENGTH; + + *newbuffer = PyMem_Malloc(full_length); + if (*newbuffer == NULL) + return MP_MEMORY_ERROR; + + memcpy(*newbuffer, buffer, length); + + Py_BEGIN_ALLOW_THREADS + ret = ReadFile(conn->handle, *newbuffer+length, left, &length, NULL); + Py_END_ALLOW_THREADS + if (ret) { + assert(length == left); + return full_length; + } else { + PyMem_Free(*newbuffer); + return MP_STANDARD_ERROR; + } +} + +/* + * Check whether any data is available for reading + */ + +static int +conn_poll(ConnectionObject *conn, double timeout, PyThreadState *_save) +{ + DWORD bytes, deadline, delay; + int difference, res; + BOOL block = FALSE; + + if (!PeekNamedPipe(conn->handle, NULL, 0, NULL, &bytes, NULL)) + return MP_STANDARD_ERROR; + + if (timeout == 0.0) + return bytes > 0; + + if (timeout < 0.0) + block = TRUE; + else + /* XXX does not check for overflow */ + deadline = GetTickCount() + (DWORD)(1000 * timeout + 0.5); + + Sleep(0); + + for (delay = 1 ; ; delay += 1) { + if (!PeekNamedPipe(conn->handle, NULL, 0, NULL, &bytes, NULL)) + return MP_STANDARD_ERROR; + else if (bytes > 0) + return TRUE; + + if (!block) { + difference = deadline - GetTickCount(); + if (difference < 0) + return FALSE; + if ((int)delay > difference) + delay = difference; + } + + if (delay > 20) + delay = 20; + + Sleep(delay); + + /* check for signals */ + Py_BLOCK_THREADS + res = PyErr_CheckSignals(); + Py_UNBLOCK_THREADS + + if (res) + return MP_EXCEPTION_HAS_BEEN_SET; + } +} + +/* + * "connection.h" defines the PipeConnection type using the definitions above + */ + +#define CONNECTION_NAME "PipeConnection" +#define CONNECTION_TYPE PipeConnectionType + +#include "connection.h" diff --git a/Modules/_multiprocessing/posixshmem.c b/Modules/_multiprocessing/posixshmem.c deleted file mode 100644 index 2049dbb..0000000 --- a/Modules/_multiprocessing/posixshmem.c +++ /dev/null @@ -1,131 +0,0 @@ -/* -posixshmem - A Python extension that provides shm_open() and shm_unlink() -*/ - -#define PY_SSIZE_T_CLEAN - -#include <Python.h> -#include "structmember.h" - -// for shm_open() and shm_unlink() -#ifdef HAVE_SYS_MMAN_H -#include <sys/mman.h> -#endif - -/*[clinic input] -module _posixshmem -[clinic start generated code]*/ -/*[clinic end generated code: output=da39a3ee5e6b4b0d input=a416734e49164bf8]*/ - -/* - * - * Module-level functions & meta stuff - * - */ - -#ifdef HAVE_SHM_OPEN -/*[clinic input] -_posixshmem.shm_open -> int - path: unicode - flags: int - mode: int = 0o777 - -# "shm_open(path, flags, mode=0o777)\n\n\ - -Open a shared memory object. Returns a file descriptor (integer). - -[clinic start generated code]*/ - -static int -_posixshmem_shm_open_impl(PyObject *module, PyObject *path, int flags, - int mode) -/*[clinic end generated code: output=8d110171a4fa20df input=e83b58fa802fac25]*/ -{ - int fd; - int async_err = 0; - const char *name = PyUnicode_AsUTF8(path); - if (name == NULL) { - return -1; - } - do { - Py_BEGIN_ALLOW_THREADS - fd = shm_open(name, flags, mode); - Py_END_ALLOW_THREADS - } while (fd < 0 && errno == EINTR && !(async_err = PyErr_CheckSignals())); - - if (fd < 0) { - if (!async_err) - PyErr_SetFromErrnoWithFilenameObject(PyExc_OSError, path); - return -1; - } - - return fd; -} -#endif /* HAVE_SHM_OPEN */ - -#ifdef HAVE_SHM_UNLINK -/*[clinic input] -_posixshmem.shm_unlink - path: unicode - -Remove a shared memory object (similar to unlink()). - -Remove a shared memory object name, and, once all processes have unmapped -the object, de-allocates and destroys the contents of the associated memory -region. - -[clinic start generated code]*/ - -static PyObject * -_posixshmem_shm_unlink_impl(PyObject *module, PyObject *path) -/*[clinic end generated code: output=42f8b23d134b9ff5 input=8dc0f87143e3b300]*/ -{ - int rv; - int async_err = 0; - const char *name = PyUnicode_AsUTF8(path); - if (name == NULL) { - return NULL; - } - do { - Py_BEGIN_ALLOW_THREADS - rv = shm_unlink(name); - Py_END_ALLOW_THREADS - } while (rv < 0 && errno == EINTR && !(async_err = PyErr_CheckSignals())); - - if (rv < 0) { - if (!async_err) - PyErr_SetFromErrnoWithFilenameObject(PyExc_OSError, path); - return NULL; - } - - Py_RETURN_NONE; -} -#endif /* HAVE_SHM_UNLINK */ - -#include "clinic/posixshmem.c.h" - -static PyMethodDef module_methods[ ] = { - _POSIXSHMEM_SHM_OPEN_METHODDEF - _POSIXSHMEM_SHM_UNLINK_METHODDEF - {NULL} /* Sentinel */ -}; - - -static struct PyModuleDef this_module = { - PyModuleDef_HEAD_INIT, // m_base - "_posixshmem", // m_name - "POSIX shared memory module", // m_doc - -1, // m_size (space allocated for module globals) - module_methods, // m_methods -}; - -/* Module init function */ -PyMODINIT_FUNC -PyInit__posixshmem(void) { - PyObject *module; - module = PyModule_Create(&this_module); - if (!module) { - return NULL; - } - return module; -} diff --git a/Modules/_multiprocessing/semaphore.c b/Modules/_multiprocessing/semaphore.c index 4be2dea..fc42754 100644 --- a/Modules/_multiprocessing/semaphore.c +++ b/Modules/_multiprocessing/semaphore.c @@ -3,8 +3,7 @@ * * semaphore.c * - * Copyright (c) 2006-2008, R Oudkerk - * Licensed to PSF under a Contributor Agreement. + * Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt */ #include "multiprocessing.h" @@ -14,11 +13,10 @@ enum { RECURSIVE_MUTEX, SEMAPHORE }; typedef struct { PyObject_HEAD SEM_HANDLE handle; - unsigned long last_tid; + long last_tid; int count; int maxvalue; int kind; - char *name; } SemLockObject; #define ISMINE(o) (o->count > 0 && PyThread_get_thread_ident() == o->last_tid) @@ -44,7 +42,7 @@ _GetSemaphoreValue(HANDLE handle, long *value) { long previous; - switch (WaitForSingleObjectEx(handle, 0, FALSE)) { + switch (WaitForSingleObject(handle, 0)) { case WAIT_OBJECT_0: if (!ReleaseSemaphore(handle, 1, &previous)) return MP_STANDARD_ERROR; @@ -64,8 +62,7 @@ semlock_acquire(SemLockObject *self, PyObject *args, PyObject *kwds) int blocking = 1; double timeout; PyObject *timeout_obj = Py_None; - DWORD res, full_msecs, nhandles; - HANDLE handles[2], sigint_event; + DWORD res, full_msecs, msecs, start, ticks; static char *kwlist[] = {"block", "timeout", NULL}; @@ -99,49 +96,59 @@ semlock_acquire(SemLockObject *self, PyObject *args, PyObject *kwds) Py_RETURN_TRUE; } - /* check whether we can acquire without releasing the GIL and blocking */ - if (WaitForSingleObjectEx(self->handle, 0, FALSE) == WAIT_OBJECT_0) { + /* check whether we can acquire without blocking */ + if (WaitForSingleObject(self->handle, 0) == WAIT_OBJECT_0) { self->last_tid = GetCurrentThreadId(); ++self->count; Py_RETURN_TRUE; } - /* prepare list of handles */ - nhandles = 0; - handles[nhandles++] = self->handle; - if (_PyOS_IsMainThread()) { - sigint_event = _PyOS_SigintEvent(); - assert(sigint_event != NULL); - handles[nhandles++] = sigint_event; - } - else { - sigint_event = NULL; - } + msecs = full_msecs; + start = GetTickCount(); + + for ( ; ; ) { + HANDLE handles[2] = {self->handle, sigint_event}; - /* do the wait */ - Py_BEGIN_ALLOW_THREADS - if (sigint_event != NULL) + /* do the wait */ + Py_BEGIN_ALLOW_THREADS ResetEvent(sigint_event); - res = WaitForMultipleObjectsEx(nhandles, handles, FALSE, full_msecs, FALSE); - Py_END_ALLOW_THREADS + res = WaitForMultipleObjects(2, handles, FALSE, msecs); + Py_END_ALLOW_THREADS + + /* handle result */ + if (res != WAIT_OBJECT_0 + 1) + break; + + /* got SIGINT so give signal handler a chance to run */ + Sleep(1); + + /* if this is main thread let KeyboardInterrupt be raised */ + if (PyErr_CheckSignals()) + return NULL; + + /* recalculate timeout */ + if (msecs != INFINITE) { + ticks = GetTickCount(); + if ((DWORD)(ticks - start) >= full_msecs) + Py_RETURN_FALSE; + msecs = full_msecs - (ticks - start); + } + } /* handle result */ switch (res) { case WAIT_TIMEOUT: Py_RETURN_FALSE; - case WAIT_OBJECT_0 + 0: + case WAIT_OBJECT_0: self->last_tid = GetCurrentThreadId(); ++self->count; Py_RETURN_TRUE; - case WAIT_OBJECT_0 + 1: - errno = EINTR; - return PyErr_SetFromErrno(PyExc_OSError); case WAIT_FAILED: return PyErr_SetFromWindowsErr(0); default: PyErr_Format(PyExc_RuntimeError, "WaitForSingleObject() or " "WaitForMultipleObjects() gave unrecognized " - "value %u", res); + "value %d", res); return NULL; } } @@ -204,7 +211,7 @@ semlock_release(SemLockObject *self, PyObject *args) #ifndef HAVE_SEM_TIMEDWAIT # define sem_timedwait(sem,deadline) sem_timedwait_save(sem,deadline,_save) -static int +int sem_timedwait_save(sem_t *sem, struct timespec *deadline, PyThreadState *_save) { int res; @@ -267,7 +274,7 @@ sem_timedwait_save(sem_t *sem, struct timespec *deadline, PyThreadState *_save) static PyObject * semlock_acquire(SemLockObject *self, PyObject *args, PyObject *kwds) { - int blocking = 1, res, err = 0; + int blocking = 1, res; double timeout; PyObject *timeout_obj = Py_None; struct timespec deadline = {0}; @@ -304,32 +311,20 @@ semlock_acquire(SemLockObject *self, PyObject *args, PyObject *kwds) deadline.tv_nsec %= 1000000000; } - /* Check whether we can acquire without releasing the GIL and blocking */ do { - res = sem_trywait(self->handle); - err = errno; + Py_BEGIN_ALLOW_THREADS + if (blocking && timeout_obj == Py_None) + res = sem_wait(self->handle); + else if (!blocking) + res = sem_trywait(self->handle); + else + res = sem_timedwait(self->handle, &deadline); + Py_END_ALLOW_THREADS + if (res == MP_EXCEPTION_HAS_BEEN_SET) + break; } while (res < 0 && errno == EINTR && !PyErr_CheckSignals()); - errno = err; - - if (res < 0 && errno == EAGAIN && blocking) { - /* Couldn't acquire immediately, need to block */ - do { - Py_BEGIN_ALLOW_THREADS - if (timeout_obj == Py_None) { - res = sem_wait(self->handle); - } - else { - res = sem_timedwait(self->handle, &deadline); - } - Py_END_ALLOW_THREADS - err = errno; - if (res == MP_EXCEPTION_HAS_BEEN_SET) - break; - } while (res < 0 && errno == EINTR && !PyErr_CheckSignals()); - } if (res < 0) { - errno = err; if (errno == EAGAIN || errno == ETIMEDOUT) Py_RETURN_FALSE; else if (errno == EINTR) @@ -411,8 +406,7 @@ semlock_release(SemLockObject *self, PyObject *args) */ static PyObject * -newsemlockobject(PyTypeObject *type, SEM_HANDLE handle, int kind, int maxvalue, - char *name) +newsemlockobject(PyTypeObject *type, SEM_HANDLE handle, int kind, int maxvalue) { SemLockObject *self; @@ -424,22 +418,21 @@ newsemlockobject(PyTypeObject *type, SEM_HANDLE handle, int kind, int maxvalue, self->count = 0; self->last_tid = 0; self->maxvalue = maxvalue; - self->name = name; return (PyObject*)self; } static PyObject * semlock_new(PyTypeObject *type, PyObject *args, PyObject *kwds) { + char buffer[256]; SEM_HANDLE handle = SEM_FAILED; - int kind, maxvalue, value, unlink; + int kind, maxvalue, value; PyObject *result; - char *name, *name_copy = NULL; - static char *kwlist[] = {"kind", "value", "maxvalue", "name", "unlink", - NULL}; + static char *kwlist[] = {"kind", "value", "maxvalue", NULL}; + int try = 0; - if (!PyArg_ParseTupleAndKeywords(args, kwds, "iiisi", kwlist, - &kind, &value, &maxvalue, &name, &unlink)) + if (!PyArg_ParseTupleAndKeywords(args, kwds, "iii", kwlist, + &kind, &value, &maxvalue)) return NULL; if (kind != RECURSIVE_MUTEX && kind != SEMAPHORE) { @@ -447,24 +440,26 @@ semlock_new(PyTypeObject *type, PyObject *args, PyObject *kwds) return NULL; } - if (!unlink) { - name_copy = PyMem_Malloc(strlen(name) + 1); - if (name_copy == NULL) { - return PyErr_NoMemory(); - } - strcpy(name_copy, name); - } + /* Create a semaphore with a unique name. The bytes returned by + * _PyOS_URandom() are treated as unsigned long to ensure that the filename + * is valid (no special characters). */ + do { + unsigned long suffix; + _PyOS_URandom((char *)&suffix, sizeof(suffix)); + PyOS_snprintf(buffer, sizeof(buffer), "/mp%ld-%lu", (long)getpid(), + suffix); + SEM_CLEAR_ERROR(); + handle = SEM_CREATE(buffer, value, maxvalue); + } while ((handle == SEM_FAILED) && (errno == EEXIST) && (++try < 100)); - SEM_CLEAR_ERROR(); - handle = SEM_CREATE(name, value, maxvalue); /* On Windows we should fail if GetLastError()==ERROR_ALREADY_EXISTS */ if (handle == SEM_FAILED || SEM_GET_LAST_ERROR() != 0) goto failure; - if (unlink && SEM_UNLINK(name) < 0) + if (SEM_UNLINK(buffer) < 0) goto failure; - result = newsemlockobject(type, handle, kind, maxvalue, name_copy); + result = newsemlockobject(type, handle, kind, maxvalue); if (!result) goto failure; @@ -473,10 +468,7 @@ semlock_new(PyTypeObject *type, PyObject *args, PyObject *kwds) failure: if (handle != SEM_FAILED) SEM_CLOSE(handle); - PyMem_Free(name_copy); - if (!PyErr_Occurred()) { - _PyMp_SetError(NULL, MP_STANDARD_ERROR); - } + mp_SetError(NULL, MP_STANDARD_ERROR); return NULL; } @@ -485,30 +477,12 @@ semlock_rebuild(PyTypeObject *type, PyObject *args) { SEM_HANDLE handle; int kind, maxvalue; - char *name, *name_copy = NULL; - if (!PyArg_ParseTuple(args, F_SEM_HANDLE "iiz", - &handle, &kind, &maxvalue, &name)) + if (!PyArg_ParseTuple(args, F_SEM_HANDLE "ii", + &handle, &kind, &maxvalue)) return NULL; - if (name != NULL) { - name_copy = PyMem_Malloc(strlen(name) + 1); - if (name_copy == NULL) - return PyErr_NoMemory(); - strcpy(name_copy, name); - } - -#ifndef MS_WINDOWS - if (name != NULL) { - handle = sem_open(name, 0); - if (handle == SEM_FAILED) { - PyMem_Free(name_copy); - return PyErr_SetFromErrno(PyExc_OSError); - } - } -#endif - - return newsemlockobject(type, handle, kind, maxvalue, name_copy); + return newsemlockobject(type, handle, kind, maxvalue); } static void @@ -516,25 +490,24 @@ semlock_dealloc(SemLockObject* self) { if (self->handle != SEM_FAILED) SEM_CLOSE(self->handle); - PyMem_Free(self->name); PyObject_Del(self); } static PyObject * -semlock_count(SemLockObject *self, PyObject *Py_UNUSED(ignored)) +semlock_count(SemLockObject *self) { - return PyLong_FromLong((long)self->count); + return PyInt_FromLong((long)self->count); } static PyObject * -semlock_ismine(SemLockObject *self, PyObject *Py_UNUSED(ignored)) +semlock_ismine(SemLockObject *self) { /* only makes sense for a lock */ return PyBool_FromLong(ISMINE(self)); } static PyObject * -semlock_getvalue(SemLockObject *self, PyObject *Py_UNUSED(ignored)) +semlock_getvalue(SemLockObject *self) { #ifdef HAVE_BROKEN_SEM_GETVALUE PyErr_SetNone(PyExc_NotImplementedError); @@ -542,38 +515,38 @@ semlock_getvalue(SemLockObject *self, PyObject *Py_UNUSED(ignored)) #else int sval; if (SEM_GETVALUE(self->handle, &sval) < 0) - return _PyMp_SetError(NULL, MP_STANDARD_ERROR); + return mp_SetError(NULL, MP_STANDARD_ERROR); /* some posix implementations use negative numbers to indicate the number of waiting threads */ if (sval < 0) sval = 0; - return PyLong_FromLong((long)sval); + return PyInt_FromLong((long)sval); #endif } static PyObject * -semlock_iszero(SemLockObject *self, PyObject *Py_UNUSED(ignored)) +semlock_iszero(SemLockObject *self) { #ifdef HAVE_BROKEN_SEM_GETVALUE if (sem_trywait(self->handle) < 0) { if (errno == EAGAIN) Py_RETURN_TRUE; - return _PyMp_SetError(NULL, MP_STANDARD_ERROR); + return mp_SetError(NULL, MP_STANDARD_ERROR); } else { if (sem_post(self->handle) < 0) - return _PyMp_SetError(NULL, MP_STANDARD_ERROR); + return mp_SetError(NULL, MP_STANDARD_ERROR); Py_RETURN_FALSE; } #else int sval; if (SEM_GETVALUE(self->handle, &sval) < 0) - return _PyMp_SetError(NULL, MP_STANDARD_ERROR); + return mp_SetError(NULL, MP_STANDARD_ERROR); return PyBool_FromLong((long)sval == 0); #endif } static PyObject * -semlock_afterfork(SemLockObject *self, PyObject *Py_UNUSED(ignored)) +semlock_afterfork(SemLockObject *self) { self->count = 0; Py_RETURN_NONE; @@ -584,11 +557,11 @@ semlock_afterfork(SemLockObject *self, PyObject *Py_UNUSED(ignored)) */ static PyMethodDef semlock_methods[] = { - {"acquire", (PyCFunction)(void(*)(void))semlock_acquire, METH_VARARGS | METH_KEYWORDS, + {"acquire", (PyCFunction)semlock_acquire, METH_VARARGS | METH_KEYWORDS, "acquire the semaphore/lock"}, {"release", (PyCFunction)semlock_release, METH_NOARGS, "release the semaphore/lock"}, - {"__enter__", (PyCFunction)(void(*)(void))semlock_acquire, METH_VARARGS | METH_KEYWORDS, + {"__enter__", (PyCFunction)semlock_acquire, METH_VARARGS | METH_KEYWORDS, "enter the semaphore/lock"}, {"__exit__", (PyCFunction)semlock_release, METH_VARARGS, "exit the semaphore/lock"}, @@ -618,8 +591,6 @@ static PyMemberDef semlock_members[] = { ""}, {"maxvalue", T_INT, offsetof(SemLockObject, maxvalue), READONLY, ""}, - {"name", T_STRING, offsetof(SemLockObject, name), READONLY, - ""}, {NULL} }; @@ -627,16 +598,16 @@ static PyMemberDef semlock_members[] = { * Semaphore type */ -PyTypeObject _PyMp_SemLockType = { +PyTypeObject SemLockType = { PyVarObject_HEAD_INIT(NULL, 0) /* tp_name */ "_multiprocessing.SemLock", /* tp_basicsize */ sizeof(SemLockObject), /* tp_itemsize */ 0, /* tp_dealloc */ (destructor)semlock_dealloc, - /* tp_vectorcall_offset */ 0, + /* tp_print */ 0, /* tp_getattr */ 0, /* tp_setattr */ 0, - /* tp_as_async */ 0, + /* tp_compare */ 0, /* tp_repr */ 0, /* tp_as_number */ 0, /* tp_as_sequence */ 0, @@ -667,23 +638,3 @@ PyTypeObject _PyMp_SemLockType = { /* tp_alloc */ 0, /* tp_new */ semlock_new, }; - -/* - * Function to unlink semaphore names - */ - -PyObject * -_PyMp_sem_unlink(PyObject *ignore, PyObject *args) -{ - char *name; - - if (!PyArg_ParseTuple(args, "s", &name)) - return NULL; - - if (SEM_UNLINK(name) < 0) { - _PyMp_SetError(NULL, MP_STANDARD_ERROR); - return NULL; - } - - Py_RETURN_NONE; -} diff --git a/Modules/_multiprocessing/socket_connection.c b/Modules/_multiprocessing/socket_connection.c new file mode 100644 index 0000000..bdb0a32 --- /dev/null +++ b/Modules/_multiprocessing/socket_connection.c @@ -0,0 +1,277 @@ +/* + * A type which wraps a socket + * + * socket_connection.c + * + * Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt + */ + +#include "multiprocessing.h" + +#if defined(HAVE_POLL) && !defined(HAVE_BROKEN_POLL) +# include "poll.h" +#endif + +#ifdef MS_WINDOWS +# define WRITE(h, buffer, length) send((SOCKET)h, buffer, length, 0) +# define READ(h, buffer, length) recv((SOCKET)h, buffer, length, 0) +# define CLOSE(h) closesocket((SOCKET)h) +#else +# define WRITE(h, buffer, length) write(h, buffer, length) +# define READ(h, buffer, length) read(h, buffer, length) +# define CLOSE(h) close(h) +#endif + +/* + * Wrapper for PyErr_CheckSignals() which can be called without the GIL + */ + +static int +check_signals(void) +{ + PyGILState_STATE state; + int res; + state = PyGILState_Ensure(); + res = PyErr_CheckSignals(); + PyGILState_Release(state); + return res; +} + +/* + * Send string to file descriptor + */ + +static Py_ssize_t +_conn_sendall(HANDLE h, char *string, size_t length) +{ + char *p = string; + Py_ssize_t res; + + while (length > 0) { + res = WRITE(h, p, length); + if (res < 0) { + if (errno == EINTR) { + if (check_signals() < 0) + return MP_EXCEPTION_HAS_BEEN_SET; + continue; + } + return MP_SOCKET_ERROR; + } + length -= res; + p += res; + } + + return MP_SUCCESS; +} + +/* + * Receive string of exact length from file descriptor + */ + +static Py_ssize_t +_conn_recvall(HANDLE h, char *buffer, size_t length) +{ + size_t remaining = length; + Py_ssize_t temp; + char *p = buffer; + + while (remaining > 0) { + temp = READ(h, p, remaining); + if (temp < 0) { + if (errno == EINTR) { + if (check_signals() < 0) + return MP_EXCEPTION_HAS_BEEN_SET; + continue; + } + return temp; + } + else if (temp == 0) { + return remaining == length ? MP_END_OF_FILE : MP_EARLY_END_OF_FILE; + } + remaining -= temp; + p += temp; + } + + return MP_SUCCESS; +} + +/* + * Send a string prepended by the string length in network byte order + */ + +static Py_ssize_t +conn_send_string(ConnectionObject *conn, char *string, size_t length) +{ + Py_ssize_t res; + /* The "header" of the message is a 32 bit unsigned number (in + network order) which specifies the length of the "body". If + the message is shorter than about 16kb then it is quicker to + combine the "header" and the "body" of the message and send + them at once. */ + if (length < (16*1024)) { + char *message; + + message = PyMem_Malloc(length+4); + if (message == NULL) + return MP_MEMORY_ERROR; + + *(UINT32*)message = htonl((UINT32)length); + memcpy(message+4, string, length); + Py_BEGIN_ALLOW_THREADS + res = _conn_sendall(conn->handle, message, length+4); + Py_END_ALLOW_THREADS + PyMem_Free(message); + } else { + UINT32 lenbuff; + + if (length > MAX_MESSAGE_LENGTH) + return MP_BAD_MESSAGE_LENGTH; + + lenbuff = htonl((UINT32)length); + Py_BEGIN_ALLOW_THREADS + res = _conn_sendall(conn->handle, (char*)&lenbuff, 4) || + _conn_sendall(conn->handle, string, length); + Py_END_ALLOW_THREADS + } + return res; +} + +/* + * Attempts to read into buffer, or failing that into *newbuffer + * + * Returns number of bytes read. + */ + +static Py_ssize_t +conn_recv_string(ConnectionObject *conn, char *buffer, + size_t buflength, char **newbuffer, size_t maxlength) +{ + Py_ssize_t res; + UINT32 ulength; + + *newbuffer = NULL; + + Py_BEGIN_ALLOW_THREADS + res = _conn_recvall(conn->handle, (char*)&ulength, 4); + Py_END_ALLOW_THREADS + if (res < 0) + return res; + + ulength = ntohl(ulength); + if (ulength > maxlength) + return MP_BAD_MESSAGE_LENGTH; + + if (ulength > buflength) { + *newbuffer = buffer = PyMem_Malloc((size_t)ulength); + if (buffer == NULL) + return MP_MEMORY_ERROR; + } + + Py_BEGIN_ALLOW_THREADS + res = _conn_recvall(conn->handle, buffer, (size_t)ulength); + Py_END_ALLOW_THREADS + + if (res >= 0) { + res = (Py_ssize_t)ulength; + } else if (*newbuffer != NULL) { + PyMem_Free(*newbuffer); + *newbuffer = NULL; + } + return res; +} + +/* + * Check whether any data is available for reading -- neg timeout blocks + */ + +static int +conn_poll(ConnectionObject *conn, double timeout, PyThreadState *_save) +{ +#if defined(HAVE_POLL) && !defined(HAVE_BROKEN_POLL) + int res; + struct pollfd p; + + p.fd = (int)conn->handle; + p.events = POLLIN | POLLPRI; + p.revents = 0; + + if (timeout < 0) { + do { + res = poll(&p, 1, -1); + } while (res < 0 && errno == EINTR); + } else { + res = poll(&p, 1, (int)(timeout * 1000 + 0.5)); + if (res < 0 && errno == EINTR) { + /* We were interrupted by a signal. Just indicate a + timeout even though we are early. */ + return FALSE; + } + } + + if (res < 0) { + return MP_SOCKET_ERROR; + } else if (p.revents & (POLLNVAL|POLLERR)) { + Py_BLOCK_THREADS + PyErr_SetString(PyExc_IOError, "poll() gave POLLNVAL or POLLERR"); + Py_UNBLOCK_THREADS + return MP_EXCEPTION_HAS_BEEN_SET; + } else if (p.revents != 0) { + return TRUE; + } else { + assert(res == 0); + return FALSE; + } +#else + int res; + fd_set rfds; + + /* + * Verify the handle, issue 3321. Not required for windows. + */ + #ifndef MS_WINDOWS + if (((int)conn->handle) < 0 || ((int)conn->handle) >= FD_SETSIZE) { + Py_BLOCK_THREADS + PyErr_SetString(PyExc_IOError, "handle out of range in select()"); + Py_UNBLOCK_THREADS + return MP_EXCEPTION_HAS_BEEN_SET; + } + #endif + + FD_ZERO(&rfds); + FD_SET((SOCKET)conn->handle, &rfds); + + if (timeout < 0.0) { + do { + res = select((int)conn->handle+1, &rfds, NULL, NULL, NULL); + } while (res < 0 && errno == EINTR); + } else { + struct timeval tv; + tv.tv_sec = (long)timeout; + tv.tv_usec = (long)((timeout - tv.tv_sec) * 1e6 + 0.5); + res = select((int)conn->handle+1, &rfds, NULL, NULL, &tv); + if (res < 0 && errno == EINTR) { + /* We were interrupted by a signal. Just indicate a + timeout even though we are early. */ + return FALSE; + } + } + + if (res < 0) { + return MP_SOCKET_ERROR; + } else if (FD_ISSET(conn->handle, &rfds)) { + return TRUE; + } else { + assert(res == 0); + return FALSE; + } +#endif +} + +/* + * "connection.h" defines the Connection type using defs above + */ + +#define CONNECTION_NAME "Connection" +#define CONNECTION_TYPE ConnectionType + +#include "connection.h" diff --git a/Modules/_multiprocessing/win32_functions.c b/Modules/_multiprocessing/win32_functions.c new file mode 100644 index 0000000..9425929 --- /dev/null +++ b/Modules/_multiprocessing/win32_functions.c @@ -0,0 +1,267 @@ +/* + * Win32 functions used by multiprocessing package + * + * win32_functions.c + * + * Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt + */ + +#include "multiprocessing.h" + + +#define WIN32_FUNCTION(func) \ + {#func, (PyCFunction)win32_ ## func, METH_VARARGS | METH_STATIC, ""} + +#define WIN32_CONSTANT(fmt, con) \ + PyDict_SetItemString(Win32Type.tp_dict, #con, Py_BuildValue(fmt, con)) + + +static PyObject * +win32_CloseHandle(PyObject *self, PyObject *args) +{ + HANDLE hObject; + BOOL success; + + if (!PyArg_ParseTuple(args, F_HANDLE, &hObject)) + return NULL; + + Py_BEGIN_ALLOW_THREADS + success = CloseHandle(hObject); + Py_END_ALLOW_THREADS + + if (!success) + return PyErr_SetFromWindowsErr(0); + + Py_RETURN_NONE; +} + +static PyObject * +win32_ConnectNamedPipe(PyObject *self, PyObject *args) +{ + HANDLE hNamedPipe; + LPOVERLAPPED lpOverlapped; + BOOL success; + + if (!PyArg_ParseTuple(args, F_HANDLE F_POINTER, + &hNamedPipe, &lpOverlapped)) + return NULL; + + Py_BEGIN_ALLOW_THREADS + success = ConnectNamedPipe(hNamedPipe, lpOverlapped); + Py_END_ALLOW_THREADS + + if (!success) + return PyErr_SetFromWindowsErr(0); + + Py_RETURN_NONE; +} + +static PyObject * +win32_CreateFile(PyObject *self, PyObject *args) +{ + LPCTSTR lpFileName; + DWORD dwDesiredAccess; + DWORD dwShareMode; + LPSECURITY_ATTRIBUTES lpSecurityAttributes; + DWORD dwCreationDisposition; + DWORD dwFlagsAndAttributes; + HANDLE hTemplateFile; + HANDLE handle; + + if (!PyArg_ParseTuple(args, "s" F_DWORD F_DWORD F_POINTER + F_DWORD F_DWORD F_HANDLE, + &lpFileName, &dwDesiredAccess, &dwShareMode, + &lpSecurityAttributes, &dwCreationDisposition, + &dwFlagsAndAttributes, &hTemplateFile)) + return NULL; + + Py_BEGIN_ALLOW_THREADS + handle = CreateFile(lpFileName, dwDesiredAccess, + dwShareMode, lpSecurityAttributes, + dwCreationDisposition, + dwFlagsAndAttributes, hTemplateFile); + Py_END_ALLOW_THREADS + + if (handle == INVALID_HANDLE_VALUE) + return PyErr_SetFromWindowsErr(0); + + return Py_BuildValue(F_HANDLE, handle); +} + +static PyObject * +win32_CreateNamedPipe(PyObject *self, PyObject *args) +{ + LPCTSTR lpName; + DWORD dwOpenMode; + DWORD dwPipeMode; + DWORD nMaxInstances; + DWORD nOutBufferSize; + DWORD nInBufferSize; + DWORD nDefaultTimeOut; + LPSECURITY_ATTRIBUTES lpSecurityAttributes; + HANDLE handle; + + if (!PyArg_ParseTuple(args, "s" F_DWORD F_DWORD F_DWORD + F_DWORD F_DWORD F_DWORD F_POINTER, + &lpName, &dwOpenMode, &dwPipeMode, + &nMaxInstances, &nOutBufferSize, + &nInBufferSize, &nDefaultTimeOut, + &lpSecurityAttributes)) + return NULL; + + Py_BEGIN_ALLOW_THREADS + handle = CreateNamedPipe(lpName, dwOpenMode, dwPipeMode, + nMaxInstances, nOutBufferSize, + nInBufferSize, nDefaultTimeOut, + lpSecurityAttributes); + Py_END_ALLOW_THREADS + + if (handle == INVALID_HANDLE_VALUE) + return PyErr_SetFromWindowsErr(0); + + return Py_BuildValue(F_HANDLE, handle); +} + +static PyObject * +win32_ExitProcess(PyObject *self, PyObject *args) +{ + UINT uExitCode; + + if (!PyArg_ParseTuple(args, "I", &uExitCode)) + return NULL; + + #if defined(Py_DEBUG) + SetErrorMode(SEM_FAILCRITICALERRORS|SEM_NOALIGNMENTFAULTEXCEPT|SEM_NOGPFAULTERRORBOX|SEM_NOOPENFILEERRORBOX); + _CrtSetReportMode(_CRT_ASSERT, _CRTDBG_MODE_DEBUG); + #endif + + + ExitProcess(uExitCode); + + return NULL; +} + +static PyObject * +win32_GetLastError(PyObject *self, PyObject *args) +{ + return Py_BuildValue(F_DWORD, GetLastError()); +} + +static PyObject * +win32_OpenProcess(PyObject *self, PyObject *args) +{ + DWORD dwDesiredAccess; + BOOL bInheritHandle; + DWORD dwProcessId; + HANDLE handle; + + if (!PyArg_ParseTuple(args, F_DWORD "i" F_DWORD, + &dwDesiredAccess, &bInheritHandle, &dwProcessId)) + return NULL; + + handle = OpenProcess(dwDesiredAccess, bInheritHandle, dwProcessId); + if (handle == NULL) + return PyErr_SetFromWindowsErr(0); + + return Py_BuildValue(F_HANDLE, handle); +} + +static PyObject * +win32_SetNamedPipeHandleState(PyObject *self, PyObject *args) +{ + HANDLE hNamedPipe; + PyObject *oArgs[3]; + DWORD dwArgs[3], *pArgs[3] = {NULL, NULL, NULL}; + int i; + + if (!PyArg_ParseTuple(args, F_HANDLE "OOO", + &hNamedPipe, &oArgs[0], &oArgs[1], &oArgs[2])) + return NULL; + + PyErr_Clear(); + + for (i = 0 ; i < 3 ; i++) { + if (oArgs[i] != Py_None) { + dwArgs[i] = PyInt_AsUnsignedLongMask(oArgs[i]); + if (PyErr_Occurred()) + return NULL; + pArgs[i] = &dwArgs[i]; + } + } + + if (!SetNamedPipeHandleState(hNamedPipe, pArgs[0], pArgs[1], pArgs[2])) + return PyErr_SetFromWindowsErr(0); + + Py_RETURN_NONE; +} + +static PyObject * +win32_WaitNamedPipe(PyObject *self, PyObject *args) +{ + LPCTSTR lpNamedPipeName; + DWORD nTimeOut; + BOOL success; + + if (!PyArg_ParseTuple(args, "s" F_DWORD, &lpNamedPipeName, &nTimeOut)) + return NULL; + + Py_BEGIN_ALLOW_THREADS + success = WaitNamedPipe(lpNamedPipeName, nTimeOut); + Py_END_ALLOW_THREADS + + if (!success) + return PyErr_SetFromWindowsErr(0); + + Py_RETURN_NONE; +} + +static PyMethodDef win32_methods[] = { + WIN32_FUNCTION(CloseHandle), + WIN32_FUNCTION(GetLastError), + WIN32_FUNCTION(OpenProcess), + WIN32_FUNCTION(ExitProcess), + WIN32_FUNCTION(ConnectNamedPipe), + WIN32_FUNCTION(CreateFile), + WIN32_FUNCTION(CreateNamedPipe), + WIN32_FUNCTION(SetNamedPipeHandleState), + WIN32_FUNCTION(WaitNamedPipe), + {NULL} +}; + + +PyTypeObject Win32Type = { + PyVarObject_HEAD_INIT(NULL, 0) +}; + + +PyObject * +create_win32_namespace(void) +{ + Win32Type.tp_name = "_multiprocessing.win32"; + Win32Type.tp_methods = win32_methods; + if (PyType_Ready(&Win32Type) < 0) + return NULL; + Py_INCREF(&Win32Type); + + WIN32_CONSTANT(F_DWORD, ERROR_ALREADY_EXISTS); + WIN32_CONSTANT(F_DWORD, ERROR_NO_DATA); + WIN32_CONSTANT(F_DWORD, ERROR_PIPE_BUSY); + WIN32_CONSTANT(F_DWORD, ERROR_PIPE_CONNECTED); + WIN32_CONSTANT(F_DWORD, ERROR_SEM_TIMEOUT); + WIN32_CONSTANT(F_DWORD, GENERIC_READ); + WIN32_CONSTANT(F_DWORD, GENERIC_WRITE); + WIN32_CONSTANT(F_DWORD, INFINITE); + WIN32_CONSTANT(F_DWORD, NMPWAIT_WAIT_FOREVER); + WIN32_CONSTANT(F_DWORD, OPEN_EXISTING); + WIN32_CONSTANT(F_DWORD, PIPE_ACCESS_DUPLEX); + WIN32_CONSTANT(F_DWORD, PIPE_ACCESS_INBOUND); + WIN32_CONSTANT(F_DWORD, PIPE_READMODE_MESSAGE); + WIN32_CONSTANT(F_DWORD, PIPE_TYPE_MESSAGE); + WIN32_CONSTANT(F_DWORD, PIPE_UNLIMITED_INSTANCES); + WIN32_CONSTANT(F_DWORD, PIPE_WAIT); + WIN32_CONSTANT(F_DWORD, PROCESS_ALL_ACCESS); + + WIN32_CONSTANT("i", NULL); + + return (PyObject*)&Win32Type; +} |