diff options
author | Benjamin Peterson <benjamin@python.org> | 2008-06-11 02:40:25 (GMT) |
---|---|---|
committer | Benjamin Peterson <benjamin@python.org> | 2008-06-11 02:40:25 (GMT) |
commit | 190d56e00990eadce8ad1b7d1785016cc109db7b (patch) | |
tree | ef143cf28648baf9fe708640f67586f14273a0ff /Modules/_multiprocessing | |
parent | d5299866f95f1e74255312444176280494d2782a (diff) | |
download | cpython-190d56e00990eadce8ad1b7d1785016cc109db7b.zip cpython-190d56e00990eadce8ad1b7d1785016cc109db7b.tar.gz cpython-190d56e00990eadce8ad1b7d1785016cc109db7b.tar.bz2 |
add the multiprocessing package to fulfill PEP 371
Diffstat (limited to 'Modules/_multiprocessing')
-rw-r--r-- | Modules/_multiprocessing/connection.h | 515 | ||||
-rw-r--r-- | Modules/_multiprocessing/multiprocessing.c | 308 | ||||
-rw-r--r-- | Modules/_multiprocessing/multiprocessing.h | 163 | ||||
-rw-r--r-- | Modules/_multiprocessing/pipe_connection.c | 136 | ||||
-rw-r--r-- | Modules/_multiprocessing/semaphore.c | 625 | ||||
-rw-r--r-- | Modules/_multiprocessing/socket_connection.c | 180 | ||||
-rw-r--r-- | Modules/_multiprocessing/win32_functions.c | 260 |
7 files changed, 2187 insertions, 0 deletions
diff --git a/Modules/_multiprocessing/connection.h b/Modules/_multiprocessing/connection.h new file mode 100644 index 0000000..8e2f7c2 --- /dev/null +++ b/Modules/_multiprocessing/connection.h @@ -0,0 +1,515 @@ +/* + * Definition of a `Connection` type. + * Used by `socket_connection.c` and `pipe_connection.c`. + * + * connection.h + * + * Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt + */ + +#ifndef CONNECTION_H +#define CONNECTION_H + +/* + * Read/write flags + */ + +#define READABLE 1 +#define WRITABLE 2 + +#define CHECK_READABLE(self) \ + if (!(self->flags & READABLE)) { \ + PyErr_SetString(PyExc_IOError, "connection is write-only"); \ + return NULL; \ + } + +#define CHECK_WRITABLE(self) \ + if (!(self->flags & WRITABLE)) { \ + PyErr_SetString(PyExc_IOError, "connection is read-only"); \ + return NULL; \ + } + +/* + * Allocation and deallocation + */ + +static PyObject * +connection_new(PyTypeObject *type, PyObject *args, PyObject *kwds) +{ + ConnectionObject *self; + HANDLE handle; + BOOL readable = TRUE, writable = TRUE; + + static char *kwlist[] = {"handle", "readable", "writable", NULL}; + + if (!PyArg_ParseTupleAndKeywords(args, kwds, F_HANDLE "|ii", kwlist, + &handle, &readable, &writable)) + return NULL; + + if (handle == INVALID_HANDLE_VALUE || (Py_ssize_t)handle < 0) { + PyErr_Format(PyExc_IOError, "invalid handle %" + PY_FORMAT_SIZE_T "d", (Py_ssize_t)handle); + return NULL; + } + + if (!readable && !writable) { + PyErr_SetString(PyExc_ValueError, + "either readable or writable must be true"); + return NULL; + } + + self = PyObject_New(ConnectionObject, type); + if (self == NULL) + return NULL; + + self->weakreflist = NULL; + self->handle = handle; + self->flags = 0; + + if (readable) + self->flags |= READABLE; + if (writable) + self->flags |= WRITABLE; + assert(self->flags >= 1 && self->flags <= 3); + + return (PyObject*)self; +} + +static void +connection_dealloc(ConnectionObject* self) +{ + if (self->weakreflist != NULL) + PyObject_ClearWeakRefs((PyObject*)self); + + if (self->handle != INVALID_HANDLE_VALUE) { + Py_BEGIN_ALLOW_THREADS + CLOSE(self->handle); + Py_END_ALLOW_THREADS + } + PyObject_Del(self); +} + +/* + * Functions for transferring buffers + */ + +static PyObject * +connection_sendbytes(ConnectionObject *self, PyObject *args) +{ + char *buffer; + Py_ssize_t length, offset=0, size=PY_SSIZE_T_MIN; + int res; + + if (!PyArg_ParseTuple(args, F_RBUFFER "#|" F_PY_SSIZE_T F_PY_SSIZE_T, + &buffer, &length, &offset, &size)) + return NULL; + + CHECK_WRITABLE(self); + + if (offset < 0) { + PyErr_SetString(PyExc_ValueError, "offset is negative"); + return NULL; + } + if (length < offset) { + PyErr_SetString(PyExc_ValueError, "buffer length < offset"); + return NULL; + } + + if (size == PY_SSIZE_T_MIN) { + size = length - offset; + } else { + if (size < 0) { + PyErr_SetString(PyExc_ValueError, "size is negative"); + return NULL; + } + if (offset + size > length) { + PyErr_SetString(PyExc_ValueError, + "buffer length < offset + size"); + return NULL; + } + } + + Py_BEGIN_ALLOW_THREADS + res = conn_send_string(self, buffer + offset, size); + Py_END_ALLOW_THREADS + + if (res < 0) + return mp_SetError(PyExc_IOError, res); + + Py_RETURN_NONE; +} + +static PyObject * +connection_recvbytes(ConnectionObject *self, PyObject *args) +{ + char *freeme = NULL; + Py_ssize_t res, maxlength = PY_SSIZE_T_MAX; + PyObject *result = NULL; + + if (!PyArg_ParseTuple(args, "|" F_PY_SSIZE_T, &maxlength)) + return NULL; + + CHECK_READABLE(self); + + if (maxlength < 0) { + PyErr_SetString(PyExc_ValueError, "maxlength < 0"); + return NULL; + } + + Py_BEGIN_ALLOW_THREADS + res = conn_recv_string(self, self->buffer, CONNECTION_BUFFER_SIZE, + &freeme, maxlength); + Py_END_ALLOW_THREADS + + if (res < 0) { + if (res == MP_BAD_MESSAGE_LENGTH) { + if ((self->flags & WRITABLE) == 0) { + Py_BEGIN_ALLOW_THREADS + CLOSE(self->handle); + Py_END_ALLOW_THREADS + self->handle = INVALID_HANDLE_VALUE; + } else { + self->flags = WRITABLE; + } + } + mp_SetError(PyExc_IOError, res); + } else { + if (freeme == NULL) { + result = PyString_FromStringAndSize(self->buffer, res); + } else { + result = PyString_FromStringAndSize(freeme, res); + PyMem_Free(freeme); + } + } + + return result; +} + +static PyObject * +connection_recvbytes_into(ConnectionObject *self, PyObject *args) +{ + char *freeme = NULL, *buffer = NULL; + Py_ssize_t res, length, offset = 0; + PyObject *result = NULL; + + if (!PyArg_ParseTuple(args, "w#|" F_PY_SSIZE_T, + &buffer, &length, &offset)) + return NULL; + + CHECK_READABLE(self); + + if (offset < 0) { + PyErr_SetString(PyExc_ValueError, "negative offset"); + return NULL; + } + + if (offset > length) { + PyErr_SetString(PyExc_ValueError, "offset too large"); + return NULL; + } + + Py_BEGIN_ALLOW_THREADS + res = conn_recv_string(self, buffer+offset, length-offset, + &freeme, PY_SSIZE_T_MAX); + Py_END_ALLOW_THREADS + + if (res < 0) { + if (res == MP_BAD_MESSAGE_LENGTH) { + if ((self->flags & WRITABLE) == 0) { + Py_BEGIN_ALLOW_THREADS + CLOSE(self->handle); + Py_END_ALLOW_THREADS + self->handle = INVALID_HANDLE_VALUE; + } else { + self->flags = WRITABLE; + } + } + mp_SetError(PyExc_IOError, res); + } else { + if (freeme == NULL) { + result = PyInt_FromSsize_t(res); + } else { + result = PyObject_CallFunction(BufferTooShort, + F_RBUFFER "#", + freeme, res); + PyMem_Free(freeme); + if (result) { + PyErr_SetObject(BufferTooShort, result); + Py_DECREF(result); + } + return NULL; + } + } + + return result; +} + +/* + * Functions for transferring objects + */ + +static PyObject * +connection_send_obj(ConnectionObject *self, PyObject *obj) +{ + char *buffer; + int res; + Py_ssize_t length; + PyObject *pickled_string = NULL; + + CHECK_WRITABLE(self); + + pickled_string = PyObject_CallFunctionObjArgs(pickle_dumps, obj, + pickle_protocol, NULL); + if (!pickled_string) + goto failure; + + if (PyString_AsStringAndSize(pickled_string, &buffer, &length) < 0) + goto failure; + + Py_BEGIN_ALLOW_THREADS + res = conn_send_string(self, buffer, (int)length); + Py_END_ALLOW_THREADS + + if (res < 0) { + mp_SetError(PyExc_IOError, res); + goto failure; + } + + Py_XDECREF(pickled_string); + Py_RETURN_NONE; + + failure: + Py_XDECREF(pickled_string); + return NULL; +} + +static PyObject * +connection_recv_obj(ConnectionObject *self) +{ + char *freeme = NULL; + Py_ssize_t res; + PyObject *temp = NULL, *result = NULL; + + CHECK_READABLE(self); + + Py_BEGIN_ALLOW_THREADS + res = conn_recv_string(self, self->buffer, CONNECTION_BUFFER_SIZE, + &freeme, PY_SSIZE_T_MAX); + Py_END_ALLOW_THREADS + + if (res < 0) { + if (res == MP_BAD_MESSAGE_LENGTH) { + if ((self->flags & WRITABLE) == 0) { + Py_BEGIN_ALLOW_THREADS + CLOSE(self->handle); + Py_END_ALLOW_THREADS + self->handle = INVALID_HANDLE_VALUE; + } else { + self->flags = WRITABLE; + } + } + mp_SetError(PyExc_IOError, res); + } else { + if (freeme == NULL) { + temp = PyString_FromStringAndSize(self->buffer, res); + } else { + temp = PyString_FromStringAndSize(freeme, res); + PyMem_Free(freeme); + } + } + + if (temp) + result = PyObject_CallFunctionObjArgs(pickle_loads, + temp, NULL); + Py_XDECREF(temp); + return result; +} + +/* + * Other functions + */ + +static PyObject * +connection_poll(ConnectionObject *self, PyObject *args) +{ + PyObject *timeout_obj = NULL; + double timeout = 0.0; + int res; + + CHECK_READABLE(self); + + if (!PyArg_ParseTuple(args, "|O", &timeout_obj)) + return NULL; + + if (timeout_obj == NULL) { + timeout = 0.0; + } else if (timeout_obj == Py_None) { + timeout = -1.0; /* block forever */ + } else { + timeout = PyFloat_AsDouble(timeout_obj); + if (PyErr_Occurred()) + return NULL; + if (timeout < 0.0) + timeout = 0.0; + } + + Py_BEGIN_ALLOW_THREADS + res = conn_poll(self, timeout); + Py_END_ALLOW_THREADS + + switch (res) { + case TRUE: + Py_RETURN_TRUE; + case FALSE: + Py_RETURN_FALSE; + default: + return mp_SetError(PyExc_IOError, res); + } +} + +static PyObject * +connection_fileno(ConnectionObject* self) +{ + if (self->handle == INVALID_HANDLE_VALUE) { + PyErr_SetString(PyExc_IOError, "handle is invalid"); + return NULL; + } + return PyInt_FromLong((long)self->handle); +} + +static PyObject * +connection_close(ConnectionObject *self) +{ + if (self->handle != INVALID_HANDLE_VALUE) { + Py_BEGIN_ALLOW_THREADS + CLOSE(self->handle); + Py_END_ALLOW_THREADS + self->handle = INVALID_HANDLE_VALUE; + } + + Py_RETURN_NONE; +} + +static PyObject * +connection_repr(ConnectionObject *self) +{ + static char *conn_type[] = {"read-only", "write-only", "read-write"}; + + assert(self->flags >= 1 && self->flags <= 3); + return FROM_FORMAT("<%s %s, handle %" PY_FORMAT_SIZE_T "d>", + conn_type[self->flags - 1], + CONNECTION_NAME, (Py_ssize_t)self->handle); +} + +/* + * Getters and setters + */ + +static PyObject * +connection_closed(ConnectionObject *self, void *closure) +{ + return PyBool_FromLong((long)(self->handle == INVALID_HANDLE_VALUE)); +} + +static PyObject * +connection_readable(ConnectionObject *self, void *closure) +{ + return PyBool_FromLong((long)(self->flags & READABLE)); +} + +static PyObject * +connection_writable(ConnectionObject *self, void *closure) +{ + return PyBool_FromLong((long)(self->flags & WRITABLE)); +} + +/* + * Tables + */ + +static PyMethodDef connection_methods[] = { + {"send_bytes", (PyCFunction)connection_sendbytes, METH_VARARGS, + "send the byte data from a readable buffer-like object"}, + {"recv_bytes", (PyCFunction)connection_recvbytes, METH_VARARGS, + "receive byte data as a string"}, + {"recv_bytes_into",(PyCFunction)connection_recvbytes_into,METH_VARARGS, + "receive byte data into a writeable buffer-like object\n" + "returns the number of bytes read"}, + + {"send", (PyCFunction)connection_send_obj, METH_O, + "send a (picklable) object"}, + {"recv", (PyCFunction)connection_recv_obj, METH_NOARGS, + "receive a (picklable) object"}, + + {"poll", (PyCFunction)connection_poll, METH_VARARGS, + "whether there is any input available to be read"}, + {"fileno", (PyCFunction)connection_fileno, METH_NOARGS, + "file descriptor or handle of the connection"}, + {"close", (PyCFunction)connection_close, METH_NOARGS, + "close the connection"}, + + {NULL} /* Sentinel */ +}; + +static PyGetSetDef connection_getset[] = { + {"closed", (getter)connection_closed, NULL, + "True if the connection is closed", NULL}, + {"readable", (getter)connection_readable, NULL, + "True if the connection is readable", NULL}, + {"writable", (getter)connection_writable, NULL, + "True if the connection is writable", NULL}, + {NULL} +}; + +/* + * Connection type + */ + +PyDoc_STRVAR(connection_doc, + "Connection type whose constructor signature is\n\n" + " Connection(handle, readable=True, writable=True).\n\n" + "The constructor does *not* duplicate the handle."); + +PyTypeObject CONNECTION_TYPE = { + PyVarObject_HEAD_INIT(NULL, 0) + /* tp_name */ "_multiprocessing." CONNECTION_NAME, + /* tp_basicsize */ sizeof(ConnectionObject), + /* tp_itemsize */ 0, + /* tp_dealloc */ (destructor)connection_dealloc, + /* tp_print */ 0, + /* tp_getattr */ 0, + /* tp_setattr */ 0, + /* tp_compare */ 0, + /* tp_repr */ (reprfunc)connection_repr, + /* tp_as_number */ 0, + /* tp_as_sequence */ 0, + /* tp_as_mapping */ 0, + /* tp_hash */ 0, + /* tp_call */ 0, + /* tp_str */ 0, + /* tp_getattro */ 0, + /* tp_setattro */ 0, + /* tp_as_buffer */ 0, + /* tp_flags */ Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE | + Py_TPFLAGS_HAVE_WEAKREFS, + /* tp_doc */ connection_doc, + /* tp_traverse */ 0, + /* tp_clear */ 0, + /* tp_richcompare */ 0, + /* tp_weaklistoffset */ offsetof(ConnectionObject, weakreflist), + /* tp_iter */ 0, + /* tp_iternext */ 0, + /* tp_methods */ connection_methods, + /* tp_members */ 0, + /* tp_getset */ connection_getset, + /* tp_base */ 0, + /* tp_dict */ 0, + /* tp_descr_get */ 0, + /* tp_descr_set */ 0, + /* tp_dictoffset */ 0, + /* tp_init */ 0, + /* tp_alloc */ 0, + /* tp_new */ connection_new, +}; + +#endif /* CONNECTION_H */ diff --git a/Modules/_multiprocessing/multiprocessing.c b/Modules/_multiprocessing/multiprocessing.c new file mode 100644 index 0000000..1050e1f --- /dev/null +++ b/Modules/_multiprocessing/multiprocessing.c @@ -0,0 +1,308 @@ +/*
+ * Extension module used by mutliprocessing package
+ *
+ * multiprocessing.c
+ *
+ * Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
+ */
+
+#include "multiprocessing.h"
+
+PyObject *create_win32_namespace(void);
+
+PyObject *pickle_dumps, *pickle_loads, *pickle_protocol;
+PyObject *ProcessError, *BufferTooShort;
+
+/*
+ * Function which raises exceptions based on error codes
+ */
+
+PyObject *
+mp_SetError(PyObject *Type, int num)
+{
+ switch (num) {
+#ifdef MS_WINDOWS
+ case MP_STANDARD_ERROR:
+ if (Type == NULL)
+ Type = PyExc_WindowsError;
+ PyErr_SetExcFromWindowsErr(Type, 0);
+ break;
+ case MP_SOCKET_ERROR:
+ if (Type == NULL)
+ Type = PyExc_WindowsError;
+ PyErr_SetExcFromWindowsErr(Type, WSAGetLastError());
+ break;
+#else /* !MS_WINDOWS */
+ case MP_STANDARD_ERROR:
+ case MP_SOCKET_ERROR:
+ if (Type == NULL)
+ Type = PyExc_OSError;
+ PyErr_SetFromErrno(Type);
+ break;
+#endif /* !MS_WINDOWS */
+ case MP_MEMORY_ERROR:
+ PyErr_NoMemory();
+ break;
+ case MP_END_OF_FILE:
+ PyErr_SetNone(PyExc_EOFError);
+ break;
+ case MP_EARLY_END_OF_FILE:
+ PyErr_SetString(PyExc_IOError,
+ "got end of file during message");
+ break;
+ case MP_BAD_MESSAGE_LENGTH:
+ PyErr_SetString(PyExc_IOError, "bad message length");
+ break;
+ case MP_EXCEPTION_HAS_BEEN_SET:
+ break;
+ default:
+ PyErr_Format(PyExc_RuntimeError,
+ "unkown error number %d", num);
+ }
+ return NULL;
+}
+
+
+/*
+ * Windows only
+ */
+
+#ifdef MS_WINDOWS
+
+/* On Windows we set an event to signal Ctrl-C; compare with timemodule.c */
+
+HANDLE sigint_event = NULL;
+
+static BOOL WINAPI
+ProcessingCtrlHandler(DWORD dwCtrlType)
+{
+ SetEvent(sigint_event);
+ return FALSE;
+}
+
+/*
+ * Unix only
+ */
+
+#else /* !MS_WINDOWS */
+
+#if HAVE_FD_TRANSFER
+
+/* Functions for transferring file descriptors between processes.
+ Reimplements some of the functionality of the fdcred
+ module at http://www.mca-ltd.com/resources/fdcred_1.tgz. */
+
+static PyObject *
+multiprocessing_sendfd(PyObject *self, PyObject *args)
+{
+ int conn, fd, res;
+ char dummy_char;
+ char buf[CMSG_SPACE(sizeof(int))];
+ struct msghdr msg = {0};
+ struct iovec dummy_iov;
+ struct cmsghdr *cmsg;
+
+ if (!PyArg_ParseTuple(args, "ii", &conn, &fd))
+ return NULL;
+
+ dummy_iov.iov_base = &dummy_char;
+ dummy_iov.iov_len = 1;
+ msg.msg_control = buf;
+ msg.msg_controllen = sizeof(buf);
+ msg.msg_iov = &dummy_iov;
+ msg.msg_iovlen = 1;
+ cmsg = CMSG_FIRSTHDR(&msg);
+ cmsg->cmsg_level = SOL_SOCKET;
+ cmsg->cmsg_type = SCM_RIGHTS;
+ cmsg->cmsg_len = CMSG_LEN(sizeof(int));
+ msg.msg_controllen = cmsg->cmsg_len;
+ *(int*)CMSG_DATA(cmsg) = fd;
+
+ Py_BEGIN_ALLOW_THREADS
+ res = sendmsg(conn, &msg, 0);
+ Py_END_ALLOW_THREADS
+
+ if (res < 0)
+ return PyErr_SetFromErrno(PyExc_OSError);
+ Py_RETURN_NONE;
+}
+
+static PyObject *
+multiprocessing_recvfd(PyObject *self, PyObject *args)
+{
+ int conn, fd, res;
+ char dummy_char;
+ char buf[CMSG_SPACE(sizeof(int))];
+ struct msghdr msg = {0};
+ struct iovec dummy_iov;
+ struct cmsghdr *cmsg;
+
+ if (!PyArg_ParseTuple(args, "i", &conn))
+ return NULL;
+
+ dummy_iov.iov_base = &dummy_char;
+ dummy_iov.iov_len = 1;
+ msg.msg_control = buf;
+ msg.msg_controllen = sizeof(buf);
+ msg.msg_iov = &dummy_iov;
+ msg.msg_iovlen = 1;
+ cmsg = CMSG_FIRSTHDR(&msg);
+ cmsg->cmsg_level = SOL_SOCKET;
+ cmsg->cmsg_type = SCM_RIGHTS;
+ cmsg->cmsg_len = CMSG_LEN(sizeof(int));
+ msg.msg_controllen = cmsg->cmsg_len;
+
+ Py_BEGIN_ALLOW_THREADS
+ res = recvmsg(conn, &msg, 0);
+ Py_END_ALLOW_THREADS
+
+ if (res < 0)
+ return PyErr_SetFromErrno(PyExc_OSError);
+
+ fd = *(int*)CMSG_DATA(cmsg);
+ return Py_BuildValue("i", fd);
+}
+
+#endif /* HAVE_FD_TRANSFER */
+
+#endif /* !MS_WINDOWS */
+
+
+/*
+ * All platforms
+ */
+
+static PyObject*
+multiprocessing_address_of_buffer(PyObject *self, PyObject *obj)
+{
+ void *buffer;
+ Py_ssize_t buffer_len;
+
+ if (PyObject_AsWriteBuffer(obj, &buffer, &buffer_len) < 0)
+ return NULL;
+
+ return Py_BuildValue("N" F_PY_SSIZE_T,
+ PyLong_FromVoidPtr(buffer), buffer_len);
+}
+
+
+/*
+ * Function table
+ */
+
+static PyMethodDef module_methods[] = {
+ {"address_of_buffer", multiprocessing_address_of_buffer, METH_O,
+ "address_of_buffer(obj) -> int\n"
+ "Return address of obj assuming obj supports buffer inteface"},
+#if HAVE_FD_TRANSFER
+ {"sendfd", multiprocessing_sendfd, METH_VARARGS,
+ "sendfd(sockfd, fd) -> None\n"
+ "Send file descriptor given by fd over the unix domain socket\n"
+ "whose file decriptor is sockfd"},
+ {"recvfd", multiprocessing_recvfd, METH_VARARGS,
+ "recvfd(sockfd) -> fd\n"
+ "Receive a file descriptor over a unix domain socket\n"
+ "whose file decriptor is sockfd"},
+#endif
+ {NULL}
+};
+
+
+/*
+ * Initialize
+ */
+
+PyMODINIT_FUNC
+init_multiprocessing(void)
+{
+ PyObject *module, *temp;
+
+ /* Initialize module */
+ module = Py_InitModule("_multiprocessing", module_methods);
+ if (!module)
+ return;
+
+ /* Get copy of objects from pickle */
+ temp = PyImport_ImportModule(PICKLE_MODULE);
+ if (!temp)
+ return;
+ pickle_dumps = PyObject_GetAttrString(temp, "dumps");
+ pickle_loads = PyObject_GetAttrString(temp, "loads");
+ pickle_protocol = PyObject_GetAttrString(temp, "HIGHEST_PROTOCOL");
+ Py_XDECREF(temp);
+
+ /* Get copy of BufferTooShort */
+ temp = PyImport_ImportModule("multiprocessing");
+ if (!temp)
+ return;
+ BufferTooShort = PyObject_GetAttrString(temp, "BufferTooShort");
+ Py_XDECREF(temp);
+
+ /* Add connection type to module */
+ if (PyType_Ready(&ConnectionType) < 0)
+ return;
+ Py_INCREF(&ConnectionType);
+ PyModule_AddObject(module, "Connection", (PyObject*)&ConnectionType);
+
+#if defined(MS_WINDOWS) || HAVE_SEM_OPEN
+ /* Add SemLock type to module */
+ if (PyType_Ready(&SemLockType) < 0)
+ return;
+ Py_INCREF(&SemLockType);
+ PyDict_SetItemString(SemLockType.tp_dict, "SEM_VALUE_MAX",
+ Py_BuildValue("i", SEM_VALUE_MAX));
+ PyModule_AddObject(module, "SemLock", (PyObject*)&SemLockType);
+#endif
+
+#ifdef MS_WINDOWS
+ /* Add PipeConnection to module */
+ if (PyType_Ready(&PipeConnectionType) < 0)
+ return;
+ Py_INCREF(&PipeConnectionType);
+ PyModule_AddObject(module, "PipeConnection",
+ (PyObject*)&PipeConnectionType);
+
+ /* Initialize win32 class and add to multiprocessing */
+ temp = create_win32_namespace();
+ if (!temp)
+ return;
+ PyModule_AddObject(module, "win32", temp);
+
+ /* Initialize the event handle used to signal Ctrl-C */
+ sigint_event = CreateEvent(NULL, TRUE, FALSE, NULL);
+ if (!sigint_event) {
+ PyErr_SetFromWindowsErr(0);
+ return;
+ }
+ if (!SetConsoleCtrlHandler(ProcessingCtrlHandler, TRUE)) {
+ PyErr_SetFromWindowsErr(0);
+ return;
+ }
+#endif
+
+ /* Add configuration macros */
+ temp = PyDict_New();
+ if (!temp)
+ return;
+ if (PyModule_AddObject(module, "flags", temp) < 0)
+ return;
+
+#define ADD_FLAG(name) \
+ if (PyDict_SetItemString(temp, #name, Py_BuildValue("i", name)) < 0) return
+
+#ifdef HAVE_SEM_OPEN
+ ADD_FLAG(HAVE_SEM_OPEN);
+#endif
+#ifdef HAVE_SEM_TIMEDWAIT
+ ADD_FLAG(HAVE_SEM_TIMEDWAIT);
+#endif
+#ifdef HAVE_FD_TRANSFER
+ ADD_FLAG(HAVE_FD_TRANSFER);
+#endif
+#ifdef HAVE_BROKEN_SEM_GETVALUE
+ ADD_FLAG(HAVE_BROKEN_SEM_GETVALUE);
+#endif
+#ifdef HAVE_BROKEN_SEM_UNLINK
+ ADD_FLAG(HAVE_BROKEN_SEM_UNLINK);
+#endif
+}
diff --git a/Modules/_multiprocessing/multiprocessing.h b/Modules/_multiprocessing/multiprocessing.h new file mode 100644 index 0000000..40f2c08 --- /dev/null +++ b/Modules/_multiprocessing/multiprocessing.h @@ -0,0 +1,163 @@ +#ifndef MULTIPROCESSING_H
+#define MULTIPROCESSING_H
+
+#define PY_SSIZE_T_CLEAN
+
+#include "Python.h"
+#include "structmember.h"
+#include "pythread.h"
+
+/*
+ * Platform includes and definitions
+ */
+
+#ifdef MS_WINDOWS
+# define WIN32_LEAN_AND_MEAN
+# include <windows.h>
+# include <winsock2.h>
+# include <process.h> /* getpid() */
+# define SEM_HANDLE HANDLE
+# define SEM_VALUE_MAX LONG_MAX
+#else
+# include <fcntl.h> /* O_CREAT and O_EXCL */
+# include <sys/socket.h>
+# include <arpa/inet.h> /* htonl() and ntohl() */
+# if HAVE_SEM_OPEN
+# include <semaphore.h>
+ typedef sem_t *SEM_HANDLE;
+# endif
+# define HANDLE int
+# define SOCKET int
+# define BOOL int
+# define UINT32 uint32_t
+# define INT32 int32_t
+# define TRUE 1
+# define FALSE 0
+# define INVALID_HANDLE_VALUE (-1)
+#endif
+
+/*
+ * Make sure Py_ssize_t available
+ */
+
+#if PY_VERSION_HEX < 0x02050000 && !defined(PY_SSIZE_T_MIN)
+ typedef int Py_ssize_t;
+# define PY_SSIZE_T_MAX INT_MAX
+# define PY_SSIZE_T_MIN INT_MIN
+# define F_PY_SSIZE_T "i"
+# define PY_FORMAT_SIZE_T ""
+# define PyInt_FromSsize_t(n) PyInt_FromLong((long)n)
+#else
+# define F_PY_SSIZE_T "n"
+#endif
+
+/*
+ * Format codes
+ */
+
+#if SIZEOF_VOID_P == SIZEOF_LONG
+# define F_POINTER "k"
+# define T_POINTER T_ULONG
+#elif defined(HAVE_LONG_LONG) && (SIZEOF_VOID_P == SIZEOF_LONG_LONG)
+# define F_POINTER "K"
+# define T_POINTER T_ULONGLONG
+#else
+# error "can't find format code for unsigned integer of same size as void*"
+#endif
+
+#ifdef MS_WINDOWS
+# define F_HANDLE F_POINTER
+# define T_HANDLE T_POINTER
+# define F_SEM_HANDLE F_HANDLE
+# define T_SEM_HANDLE T_HANDLE
+# define F_DWORD "k"
+# define T_DWORD T_ULONG
+#else
+# define F_HANDLE "i"
+# define T_HANDLE T_INT
+# define F_SEM_HANDLE F_POINTER
+# define T_SEM_HANDLE T_POINTER
+#endif
+
+#if PY_VERSION_HEX >= 0x03000000
+# define F_RBUFFER "y"
+#else
+# define F_RBUFFER "s"
+#endif
+
+/*
+ * Error codes which can be returned by functions called without GIL
+ */
+
+#define MP_SUCCESS (0)
+#define MP_STANDARD_ERROR (-1)
+#define MP_MEMORY_ERROR (-1001)
+#define MP_END_OF_FILE (-1002)
+#define MP_EARLY_END_OF_FILE (-1003)
+#define MP_BAD_MESSAGE_LENGTH (-1004)
+#define MP_SOCKET_ERROR (-1005)
+#define MP_EXCEPTION_HAS_BEEN_SET (-1006)
+
+PyObject *mp_SetError(PyObject *Type, int num);
+
+/*
+ * Externs - not all will really exist on all platforms
+ */
+
+extern PyObject *pickle_dumps;
+extern PyObject *pickle_loads;
+extern PyObject *pickle_protocol;
+extern PyObject *BufferTooShort;
+extern PyTypeObject SemLockType;
+extern PyTypeObject ConnectionType;
+extern PyTypeObject PipeConnectionType;
+extern HANDLE sigint_event;
+
+/*
+ * Py3k compatibility
+ */
+
+#if PY_VERSION_HEX >= 0x03000000
+# define PICKLE_MODULE "pickle"
+# define FROM_FORMAT PyUnicode_FromFormat
+# define PyInt_FromLong PyLong_FromLong
+# define PyInt_FromSsize_t PyLong_FromSsize_t
+#else
+# define PICKLE_MODULE "cPickle"
+# define FROM_FORMAT PyString_FromFormat
+#endif
+
+#ifndef PyVarObject_HEAD_INIT
+# define PyVarObject_HEAD_INIT(type, size) PyObject_HEAD_INIT(type) size,
+#endif
+
+#ifndef Py_TPFLAGS_HAVE_WEAKREFS
+# define Py_TPFLAGS_HAVE_WEAKREFS 0
+#endif
+
+/*
+ * Connection definition
+ */
+
+#define CONNECTION_BUFFER_SIZE 1024
+
+typedef struct {
+ PyObject_HEAD
+ HANDLE handle;
+ int flags;
+ PyObject *weakreflist;
+ char buffer[CONNECTION_BUFFER_SIZE];
+} ConnectionObject;
+
+/*
+ * Miscellaneous
+ */
+
+#define MAX_MESSAGE_LENGTH 0x7fffffff
+
+#ifndef MIN
+# define MIN(x, y) ((x) < (y) ? x : y)
+# define MAX(x, y) ((x) > (y) ? x : y)
+#endif
+
+#endif /* MULTIPROCESSING_H */
diff --git a/Modules/_multiprocessing/pipe_connection.c b/Modules/_multiprocessing/pipe_connection.c new file mode 100644 index 0000000..a96338f --- /dev/null +++ b/Modules/_multiprocessing/pipe_connection.c @@ -0,0 +1,136 @@ +/*
+ * A type which wraps a pipe handle in message oriented mode
+ *
+ * pipe_connection.c
+ *
+ * Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
+ */
+
+#include "multiprocessing.h"
+
+#define CLOSE(h) CloseHandle(h)
+
+/*
+ * Send string to the pipe; assumes in message oriented mode
+ */
+
+static Py_ssize_t
+conn_send_string(ConnectionObject *conn, char *string, size_t length)
+{
+ DWORD amount_written;
+
+ return WriteFile(conn->handle, string, length, &amount_written, NULL)
+ ? MP_SUCCESS : MP_STANDARD_ERROR;
+}
+
+/*
+ * Attempts to read into buffer, or if buffer too small into *newbuffer.
+ *
+ * Returns number of bytes read. Assumes in message oriented mode.
+ */
+
+static Py_ssize_t
+conn_recv_string(ConnectionObject *conn, char *buffer,
+ size_t buflength, char **newbuffer, size_t maxlength)
+{
+ DWORD left, length, full_length, err;
+
+ *newbuffer = NULL;
+
+ if (ReadFile(conn->handle, buffer, MIN(buflength, maxlength),
+ &length, NULL))
+ return length;
+
+ err = GetLastError();
+ if (err != ERROR_MORE_DATA) {
+ if (err == ERROR_BROKEN_PIPE)
+ return MP_END_OF_FILE;
+ return MP_STANDARD_ERROR;
+ }
+
+ if (!PeekNamedPipe(conn->handle, NULL, 0, NULL, NULL, &left))
+ return MP_STANDARD_ERROR;
+
+ full_length = length + left;
+ if (full_length > maxlength)
+ return MP_BAD_MESSAGE_LENGTH;
+
+ *newbuffer = PyMem_Malloc(full_length);
+ if (*newbuffer == NULL)
+ return MP_MEMORY_ERROR;
+
+ memcpy(*newbuffer, buffer, length);
+
+ if (ReadFile(conn->handle, *newbuffer+length, left, &length, NULL)) {
+ assert(length == left);
+ return full_length;
+ } else {
+ PyMem_Free(*newbuffer);
+ return MP_STANDARD_ERROR;
+ }
+}
+
+/*
+ * Check whether any data is available for reading
+ */
+
+#define conn_poll(conn, timeout) conn_poll_save(conn, timeout, _save)
+
+static int
+conn_poll_save(ConnectionObject *conn, double timeout, PyThreadState *_save)
+{
+ DWORD bytes, deadline, delay;
+ int difference, res;
+ BOOL block = FALSE;
+
+ if (!PeekNamedPipe(conn->handle, NULL, 0, NULL, &bytes, NULL))
+ return MP_STANDARD_ERROR;
+
+ if (timeout == 0.0)
+ return bytes > 0;
+
+ if (timeout < 0.0)
+ block = TRUE;
+ else
+ /* XXX does not check for overflow */
+ deadline = GetTickCount() + (DWORD)(1000 * timeout + 0.5);
+
+ Sleep(0);
+
+ for (delay = 1 ; ; delay += 1) {
+ if (!PeekNamedPipe(conn->handle, NULL, 0, NULL, &bytes, NULL))
+ return MP_STANDARD_ERROR;
+ else if (bytes > 0)
+ return TRUE;
+
+ if (!block) {
+ difference = deadline - GetTickCount();
+ if (difference < 0)
+ return FALSE;
+ if ((int)delay > difference)
+ delay = difference;
+ }
+
+ if (delay > 20)
+ delay = 20;
+
+ Sleep(delay);
+
+ /* check for signals */
+ Py_BLOCK_THREADS
+ res = PyErr_CheckSignals();
+ Py_UNBLOCK_THREADS
+
+ if (res)
+ return MP_EXCEPTION_HAS_BEEN_SET;
+ }
+}
+
+/*
+ * "connection.h" defines the PipeConnection type using the definitions above
+ */
+
+#define CONNECTION_NAME "PipeConnection"
+#define CONNECTION_TYPE PipeConnectionType
+
+#include "connection.h"
diff --git a/Modules/_multiprocessing/semaphore.c b/Modules/_multiprocessing/semaphore.c new file mode 100644 index 0000000..a7ffd1e --- /dev/null +++ b/Modules/_multiprocessing/semaphore.c @@ -0,0 +1,625 @@ +/* + * A type which wraps a semaphore + * + * semaphore.c + * + * Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt + */ + +#include "multiprocessing.h" + +enum { RECURSIVE_MUTEX, SEMAPHORE }; + +typedef struct { + PyObject_HEAD + SEM_HANDLE handle; + long last_tid; + int count; + int maxvalue; + int kind; +} SemLockObject; + +#define ISMINE(o) (o->count > 0 && PyThread_get_thread_ident() == o->last_tid) + + +#ifdef MS_WINDOWS + +/* + * Windows definitions + */ + +#define SEM_FAILED NULL + +#define SEM_CLEAR_ERROR() SetLastError(0) +#define SEM_GET_LAST_ERROR() GetLastError() +#define SEM_CREATE(name, val, max) CreateSemaphore(NULL, val, max, NULL) +#define SEM_CLOSE(sem) (CloseHandle(sem) ? 0 : -1) +#define SEM_GETVALUE(sem, pval) _GetSemaphoreValue(sem, pval) +#define SEM_UNLINK(name) 0 + +static int +_GetSemaphoreValue(HANDLE handle, long *value) +{ + long previous; + + switch (WaitForSingleObject(handle, 0)) { + case WAIT_OBJECT_0: + if (!ReleaseSemaphore(handle, 1, &previous)) + return MP_STANDARD_ERROR; + *value = previous + 1; + return 0; + case WAIT_TIMEOUT: + *value = 0; + return 0; + default: + return MP_STANDARD_ERROR; + } +} + +static PyObject * +semlock_acquire(SemLockObject *self, PyObject *args, PyObject *kwds) +{ + int blocking = 1; + double timeout; + PyObject *timeout_obj = Py_None; + DWORD res, full_msecs, msecs, start, ticks; + + static char *kwlist[] = {"block", "timeout", NULL}; + + if (!PyArg_ParseTupleAndKeywords(args, kwds, "|iO", kwlist, + &blocking, &timeout_obj)) + return NULL; + + /* calculate timeout */ + if (!blocking) { + full_msecs = 0; + } else if (timeout_obj == Py_None) { + full_msecs = INFINITE; + } else { + timeout = PyFloat_AsDouble(timeout_obj); + if (PyErr_Occurred()) + return NULL; + timeout *= 1000.0; /* convert to millisecs */ + if (timeout < 0.0) { + timeout = 0.0; + } else if (timeout >= 0.5 * INFINITE) { /* 25 days */ + PyErr_SetString(PyExc_OverflowError, + "timeout is too large"); + return NULL; + } + full_msecs = (DWORD)(timeout + 0.5); + } + + /* check whether we already own the lock */ + if (self->kind == RECURSIVE_MUTEX && ISMINE(self)) { + ++self->count; + Py_RETURN_TRUE; + } + + /* check whether we can acquire without blocking */ + if (WaitForSingleObject(self->handle, 0) == WAIT_OBJECT_0) { + self->last_tid = GetCurrentThreadId(); + ++self->count; + Py_RETURN_TRUE; + } + + msecs = full_msecs; + start = GetTickCount(); + + for ( ; ; ) { + HANDLE handles[2] = {self->handle, sigint_event}; + + /* do the wait */ + Py_BEGIN_ALLOW_THREADS + ResetEvent(sigint_event); + res = WaitForMultipleObjects(2, handles, FALSE, msecs); + Py_END_ALLOW_THREADS + + /* handle result */ + if (res != WAIT_OBJECT_0 + 1) + break; + + /* got SIGINT so give signal handler a chance to run */ + Sleep(1); + + /* if this is main thread let KeyboardInterrupt be raised */ + if (PyErr_CheckSignals()) + return NULL; + + /* recalculate timeout */ + if (msecs != INFINITE) { + ticks = GetTickCount(); + if ((DWORD)(ticks - start) >= full_msecs) + Py_RETURN_FALSE; + msecs = full_msecs - (ticks - start); + } + } + + /* handle result */ + switch (res) { + case WAIT_TIMEOUT: + Py_RETURN_FALSE; + case WAIT_OBJECT_0: + self->last_tid = GetCurrentThreadId(); + ++self->count; + Py_RETURN_TRUE; + case WAIT_FAILED: + return PyErr_SetFromWindowsErr(0); + default: + PyErr_Format(PyExc_RuntimeError, "WaitForSingleObject() or " + "WaitForMultipleObjects() gave unrecognized " + "value %d", res); + return NULL; + } +} + +static PyObject * +semlock_release(SemLockObject *self, PyObject *args) +{ + if (self->kind == RECURSIVE_MUTEX) { + if (!ISMINE(self)) { + PyErr_SetString(PyExc_AssertionError, "attempt to " + "release recursive lock not owned " + "by thread"); + return NULL; + } + if (self->count > 1) { + --self->count; + Py_RETURN_NONE; + } + assert(self->count == 1); + } + + if (!ReleaseSemaphore(self->handle, 1, NULL)) { + if (GetLastError() == ERROR_TOO_MANY_POSTS) { + PyErr_SetString(PyExc_ValueError, "semaphore or lock " + "released too many times"); + return NULL; + } else { + return PyErr_SetFromWindowsErr(0); + } + } + + --self->count; + Py_RETURN_NONE; +} + +#else /* !MS_WINDOWS */ + +/* + * Unix definitions + */ + +#define SEM_CLEAR_ERROR() +#define SEM_GET_LAST_ERROR() 0 +#define SEM_CREATE(name, val, max) sem_open(name, O_CREAT | O_EXCL, 0600, val) +#define SEM_CLOSE(sem) sem_close(sem) +#define SEM_GETVALUE(sem, pval) sem_getvalue(sem, pval) +#define SEM_UNLINK(name) sem_unlink(name) + +#if HAVE_BROKEN_SEM_UNLINK +# define sem_unlink(name) 0 +#endif + +#if !HAVE_SEM_TIMEDWAIT +# define sem_timedwait(sem,deadline) sem_timedwait_save(sem,deadline,_save) + +int +sem_timedwait_save(sem_t *sem, struct timespec *deadline, PyThreadState *_save) +{ + int res; + unsigned long delay, difference; + struct timeval now, tvdeadline, tvdelay; + + errno = 0; + tvdeadline.tv_sec = deadline->tv_sec; + tvdeadline.tv_usec = deadline->tv_nsec / 1000; + + for (delay = 0 ; ; delay += 1000) { + /* poll */ + if (sem_trywait(sem) == 0) + return 0; + else if (errno != EAGAIN) + return MP_STANDARD_ERROR; + + /* get current time */ + if (gettimeofday(&now, NULL) < 0) + return MP_STANDARD_ERROR; + + /* check for timeout */ + if (tvdeadline.tv_sec < now.tv_sec || + (tvdeadline.tv_sec == now.tv_sec && + tvdeadline.tv_usec <= now.tv_usec)) { + errno = ETIMEDOUT; + return MP_STANDARD_ERROR; + } + + /* calculate how much time is left */ + difference = (tvdeadline.tv_sec - now.tv_sec) * 1000000 + + (tvdeadline.tv_usec - now.tv_usec); + + /* check delay not too long -- maximum is 20 msecs */ + if (delay > 20000) + delay = 20000; + if (delay > difference) + delay = difference; + + /* sleep */ + tvdelay.tv_sec = delay / 1000000; + tvdelay.tv_usec = delay % 1000000; + if (select(0, NULL, NULL, NULL, &tvdelay) < 0) + return MP_STANDARD_ERROR; + + /* check for signals */ + Py_BLOCK_THREADS + res = PyErr_CheckSignals(); + Py_UNBLOCK_THREADS + + if (res) { + errno = EINTR; + return MP_EXCEPTION_HAS_BEEN_SET; + } + } +} + +#endif /* !HAVE_SEM_TIMEDWAIT */ + +static PyObject * +semlock_acquire(SemLockObject *self, PyObject *args, PyObject *kwds) +{ + int blocking = 1, res; + double timeout; + PyObject *timeout_obj = Py_None; + struct timespec deadline = {0}; + struct timeval now; + long sec, nsec; + + static char *kwlist[] = {"block", "timeout", NULL}; + + if (!PyArg_ParseTupleAndKeywords(args, kwds, "|iO", kwlist, + &blocking, &timeout_obj)) + return NULL; + + if (self->kind == RECURSIVE_MUTEX && ISMINE(self)) { + ++self->count; + Py_RETURN_TRUE; + } + + if (timeout_obj != Py_None) { + timeout = PyFloat_AsDouble(timeout_obj); + if (PyErr_Occurred()) + return NULL; + if (timeout < 0.0) + timeout = 0.0; + + if (gettimeofday(&now, NULL) < 0) { + PyErr_SetFromErrno(PyExc_OSError); + return NULL; + } + sec = (long) timeout; + nsec = (long) (1e9 * (timeout - sec) + 0.5); + deadline.tv_sec = now.tv_sec + sec; + deadline.tv_nsec = now.tv_usec * 1000 + nsec; + deadline.tv_sec += (deadline.tv_nsec / 1000000000); + deadline.tv_nsec %= 1000000000; + } + + do { + Py_BEGIN_ALLOW_THREADS + if (blocking && timeout_obj == Py_None) + res = sem_wait(self->handle); + else if (!blocking) + res = sem_trywait(self->handle); + else + res = sem_timedwait(self->handle, &deadline); + Py_END_ALLOW_THREADS + if (res == MP_EXCEPTION_HAS_BEEN_SET) + break; + } while (res < 0 && errno == EINTR && !PyErr_CheckSignals()); + + if (res < 0) { + if (errno == EAGAIN || errno == ETIMEDOUT) + Py_RETURN_FALSE; + else if (errno == EINTR) + return NULL; + else + return PyErr_SetFromErrno(PyExc_OSError); + } + + ++self->count; + self->last_tid = PyThread_get_thread_ident(); + + Py_RETURN_TRUE; +} + +static PyObject * +semlock_release(SemLockObject *self, PyObject *args) +{ + if (self->kind == RECURSIVE_MUTEX) { + if (!ISMINE(self)) { + PyErr_SetString(PyExc_AssertionError, "attempt to " + "release recursive lock not owned " + "by thread"); + return NULL; + } + if (self->count > 1) { + --self->count; + Py_RETURN_NONE; + } + assert(self->count == 1); + } else { +#if HAVE_BROKEN_SEM_GETVALUE + /* We will only check properly the maxvalue == 1 case */ + if (self->maxvalue == 1) { + /* make sure that already locked */ + if (sem_trywait(self->handle) < 0) { + if (errno != EAGAIN) { + PyErr_SetFromErrno(PyExc_OSError); + return NULL; + } + /* it is already locked as expected */ + } else { + /* it was not locked so undo wait and raise */ + if (sem_post(self->handle) < 0) { + PyErr_SetFromErrno(PyExc_OSError); + return NULL; + } + PyErr_SetString(PyExc_ValueError, "semaphore " + "or lock released too many " + "times"); + return NULL; + } + } +#else + int sval; + + /* This check is not an absolute guarantee that the semaphore + does not rise above maxvalue. */ + if (sem_getvalue(self->handle, &sval) < 0) { + return PyErr_SetFromErrno(PyExc_OSError); + } else if (sval >= self->maxvalue) { + PyErr_SetString(PyExc_ValueError, "semaphore or lock " + "released too many times"); + return NULL; + } +#endif + } + + if (sem_post(self->handle) < 0) + return PyErr_SetFromErrno(PyExc_OSError); + + --self->count; + Py_RETURN_NONE; +} + +#endif /* !MS_WINDOWS */ + +/* + * All platforms + */ + +static PyObject * +newsemlockobject(PyTypeObject *type, SEM_HANDLE handle, int kind, int maxvalue) +{ + SemLockObject *self; + + self = PyObject_New(SemLockObject, type); + if (!self) + return NULL; + self->handle = handle; + self->kind = kind; + self->count = 0; + self->last_tid = 0; + self->maxvalue = maxvalue; + return (PyObject*)self; +} + +static PyObject * +semlock_new(PyTypeObject *type, PyObject *args, PyObject *kwds) +{ + char buffer[256]; + SEM_HANDLE handle = SEM_FAILED; + int kind, maxvalue, value; + PyObject *result; + static char *kwlist[] = {"kind", "value", "maxvalue", NULL}; + static int counter = 0; + + if (!PyArg_ParseTupleAndKeywords(args, kwds, "iii", kwlist, + &kind, &value, &maxvalue)) + return NULL; + + if (kind != RECURSIVE_MUTEX && kind != SEMAPHORE) { + PyErr_SetString(PyExc_ValueError, "unrecognized kind"); + return NULL; + } + + PyOS_snprintf(buffer, sizeof(buffer), "/mp%d-%d", getpid(), counter++); + + SEM_CLEAR_ERROR(); + handle = SEM_CREATE(buffer, value, maxvalue); + /* On Windows we should fail if GetLastError()==ERROR_ALREADY_EXISTS */ + if (handle == SEM_FAILED || SEM_GET_LAST_ERROR() != 0) + goto failure; + + if (SEM_UNLINK(buffer) < 0) + goto failure; + + result = newsemlockobject(type, handle, kind, maxvalue); + if (!result) + goto failure; + + return result; + + failure: + if (handle != SEM_FAILED) + SEM_CLOSE(handle); + mp_SetError(NULL, MP_STANDARD_ERROR); + return NULL; +} + +static PyObject * +semlock_rebuild(PyTypeObject *type, PyObject *args) +{ + SEM_HANDLE handle; + int kind, maxvalue; + + if (!PyArg_ParseTuple(args, F_SEM_HANDLE "ii", + &handle, &kind, &maxvalue)) + return NULL; + + return newsemlockobject(type, handle, kind, maxvalue); +} + +static void +semlock_dealloc(SemLockObject* self) +{ + if (self->handle != SEM_FAILED) + SEM_CLOSE(self->handle); + PyObject_Del(self); +} + +static PyObject * +semlock_count(SemLockObject *self) +{ + return PyInt_FromLong((long)self->count); +} + +static PyObject * +semlock_ismine(SemLockObject *self) +{ + /* only makes sense for a lock */ + return PyBool_FromLong(ISMINE(self)); +} + +static PyObject * +semlock_getvalue(SemLockObject *self) +{ +#if HAVE_BROKEN_SEM_GETVALUE + PyErr_SetNone(PyExc_NotImplementedError); + return NULL; +#else + int sval; + if (SEM_GETVALUE(self->handle, &sval) < 0) + return mp_SetError(NULL, MP_STANDARD_ERROR); + /* some posix implementations use negative numbers to indicate + the number of waiting threads */ + if (sval < 0) + sval = 0; + return PyInt_FromLong((long)sval); +#endif +} + +static PyObject * +semlock_iszero(SemLockObject *self) +{ + int sval; +#if HAVE_BROKEN_SEM_GETVALUE + if (sem_trywait(self->handle) < 0) { + if (errno == EAGAIN) + Py_RETURN_TRUE; + return mp_SetError(NULL, MP_STANDARD_ERROR); + } else { + if (sem_post(self->handle) < 0) + return mp_SetError(NULL, MP_STANDARD_ERROR); + Py_RETURN_FALSE; + } +#else + if (SEM_GETVALUE(self->handle, &sval) < 0) + return mp_SetError(NULL, MP_STANDARD_ERROR); + return PyBool_FromLong((long)sval == 0); +#endif +} + +static PyObject * +semlock_afterfork(SemLockObject *self) +{ + self->count = 0; + Py_RETURN_NONE; +} + +/* + * Semaphore methods + */ + +static PyMethodDef semlock_methods[] = { + {"acquire", (PyCFunction)semlock_acquire, METH_VARARGS | METH_KEYWORDS, + "acquire the semaphore/lock"}, + {"release", (PyCFunction)semlock_release, METH_NOARGS, + "release the semaphore/lock"}, + {"__enter__", (PyCFunction)semlock_acquire, METH_VARARGS, + "enter the semaphore/lock"}, + {"__exit__", (PyCFunction)semlock_release, METH_VARARGS, + "exit the semaphore/lock"}, + {"_count", (PyCFunction)semlock_count, METH_NOARGS, + "num of `acquire()`s minus num of `release()`s for this process"}, + {"_is_mine", (PyCFunction)semlock_ismine, METH_NOARGS, + "whether the lock is owned by this thread"}, + {"_get_value", (PyCFunction)semlock_getvalue, METH_NOARGS, + "get the value of the semaphore"}, + {"_is_zero", (PyCFunction)semlock_iszero, METH_NOARGS, + "returns whether semaphore has value zero"}, + {"_rebuild", (PyCFunction)semlock_rebuild, METH_VARARGS | METH_CLASS, + ""}, + {"_after_fork", (PyCFunction)semlock_afterfork, METH_NOARGS, + "rezero the net acquisition count after fork()"}, + {NULL} +}; + +/* + * Member table + */ + +static PyMemberDef semlock_members[] = { + {"handle", T_SEM_HANDLE, offsetof(SemLockObject, handle), READONLY, + ""}, + {"kind", T_INT, offsetof(SemLockObject, kind), READONLY, + ""}, + {"maxvalue", T_INT, offsetof(SemLockObject, maxvalue), READONLY, + ""}, + {NULL} +}; + +/* + * Semaphore type + */ + +PyTypeObject SemLockType = { + PyVarObject_HEAD_INIT(NULL, 0) + /* tp_name */ "_multiprocessing.SemLock", + /* tp_basicsize */ sizeof(SemLockObject), + /* tp_itemsize */ 0, + /* tp_dealloc */ (destructor)semlock_dealloc, + /* tp_print */ 0, + /* tp_getattr */ 0, + /* tp_setattr */ 0, + /* tp_compare */ 0, + /* tp_repr */ 0, + /* tp_as_number */ 0, + /* tp_as_sequence */ 0, + /* tp_as_mapping */ 0, + /* tp_hash */ 0, + /* tp_call */ 0, + /* tp_str */ 0, + /* tp_getattro */ 0, + /* tp_setattro */ 0, + /* tp_as_buffer */ 0, + /* tp_flags */ Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, + /* tp_doc */ "Semaphore/Mutex type", + /* tp_traverse */ 0, + /* tp_clear */ 0, + /* tp_richcompare */ 0, + /* tp_weaklistoffset */ 0, + /* tp_iter */ 0, + /* tp_iternext */ 0, + /* tp_methods */ semlock_methods, + /* tp_members */ semlock_members, + /* tp_getset */ 0, + /* tp_base */ 0, + /* tp_dict */ 0, + /* tp_descr_get */ 0, + /* tp_descr_set */ 0, + /* tp_dictoffset */ 0, + /* tp_init */ 0, + /* tp_alloc */ 0, + /* tp_new */ semlock_new, +}; diff --git a/Modules/_multiprocessing/socket_connection.c b/Modules/_multiprocessing/socket_connection.c new file mode 100644 index 0000000..a6ff9dd --- /dev/null +++ b/Modules/_multiprocessing/socket_connection.c @@ -0,0 +1,180 @@ +/* + * A type which wraps a socket + * + * socket_connection.c + * + * Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt + */ + +#include "multiprocessing.h" + +#ifdef MS_WINDOWS +# define WRITE(h, buffer, length) send((SOCKET)h, buffer, length, 0) +# define READ(h, buffer, length) recv((SOCKET)h, buffer, length, 0) +# define CLOSE(h) closesocket((SOCKET)h) +#else +# define WRITE(h, buffer, length) write(h, buffer, length) +# define READ(h, buffer, length) read(h, buffer, length) +# define CLOSE(h) close(h) +#endif + +/* + * Send string to file descriptor + */ + +static Py_ssize_t +_conn_sendall(HANDLE h, char *string, size_t length) +{ + char *p = string; + Py_ssize_t res; + + while (length > 0) { + res = WRITE(h, p, length); + if (res < 0) + return MP_SOCKET_ERROR; + length -= res; + p += res; + } + + return MP_SUCCESS; +} + +/* + * Receive string of exact length from file descriptor + */ + +static Py_ssize_t +_conn_recvall(HANDLE h, char *buffer, size_t length) +{ + size_t remaining = length; + Py_ssize_t temp; + char *p = buffer; + + while (remaining > 0) { + temp = READ(h, p, remaining); + if (temp <= 0) { + if (temp == 0) + return remaining == length ? + MP_END_OF_FILE : MP_EARLY_END_OF_FILE; + else + return temp; + } + remaining -= temp; + p += temp; + } + + return MP_SUCCESS; +} + +/* + * Send a string prepended by the string length in network byte order + */ + +static Py_ssize_t +conn_send_string(ConnectionObject *conn, char *string, size_t length) +{ + /* The "header" of the message is a 32 bit unsigned number (in + network order) which specifies the length of the "body". If + the message is shorter than about 16kb then it is quicker to + combine the "header" and the "body" of the message and send + them at once. */ + if (length < (16*1024)) { + char *message; + int res; + + message = PyMem_Malloc(length+4); + if (message == NULL) + return MP_MEMORY_ERROR; + + *(UINT32*)message = htonl((UINT32)length); + memcpy(message+4, string, length); + res = _conn_sendall(conn->handle, message, length+4); + PyMem_Free(message); + return res; + } else { + UINT32 lenbuff; + + if (length > MAX_MESSAGE_LENGTH) + return MP_BAD_MESSAGE_LENGTH; + + lenbuff = htonl((UINT32)length); + return _conn_sendall(conn->handle, (char*)&lenbuff, 4) || + _conn_sendall(conn->handle, string, length); + } +} + +/* + * Attempts to read into buffer, or failing that into *newbuffer + * + * Returns number of bytes read. + */ + +static Py_ssize_t +conn_recv_string(ConnectionObject *conn, char *buffer, + size_t buflength, char **newbuffer, size_t maxlength) +{ + int res; + UINT32 ulength; + + *newbuffer = NULL; + + res = _conn_recvall(conn->handle, (char*)&ulength, 4); + if (res < 0) + return res; + + ulength = ntohl(ulength); + if (ulength > maxlength) + return MP_BAD_MESSAGE_LENGTH; + + if (ulength <= buflength) { + res = _conn_recvall(conn->handle, buffer, (size_t)ulength); + return res < 0 ? res : ulength; + } else { + *newbuffer = PyMem_Malloc((size_t)ulength); + if (*newbuffer == NULL) + return MP_MEMORY_ERROR; + res = _conn_recvall(conn->handle, *newbuffer, (size_t)ulength); + return res < 0 ? (Py_ssize_t)res : (Py_ssize_t)ulength; + } +} + +/* + * Check whether any data is available for reading -- neg timeout blocks + */ + +static int +conn_poll(ConnectionObject *conn, double timeout) +{ + int res; + fd_set rfds; + + FD_ZERO(&rfds); + FD_SET((SOCKET)conn->handle, &rfds); + + if (timeout < 0.0) { + res = select((int)conn->handle+1, &rfds, NULL, NULL, NULL); + } else { + struct timeval tv; + tv.tv_sec = (long)timeout; + tv.tv_usec = (long)((timeout - tv.tv_sec) * 1e6 + 0.5); + res = select((int)conn->handle+1, &rfds, NULL, NULL, &tv); + } + + if (res < 0) { + return MP_SOCKET_ERROR; + } else if (FD_ISSET(conn->handle, &rfds)) { + return TRUE; + } else { + assert(res == 0); + return FALSE; + } +} + +/* + * "connection.h" defines the Connection type using defs above + */ + +#define CONNECTION_NAME "Connection" +#define CONNECTION_TYPE ConnectionType + +#include "connection.h" diff --git a/Modules/_multiprocessing/win32_functions.c b/Modules/_multiprocessing/win32_functions.c new file mode 100644 index 0000000..2bb1134 --- /dev/null +++ b/Modules/_multiprocessing/win32_functions.c @@ -0,0 +1,260 @@ +/*
+ * Win32 functions used by multiprocessing package
+ *
+ * win32_functions.c
+ *
+ * Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
+ */
+
+#include "multiprocessing.h"
+
+
+#define WIN32_FUNCTION(func) \
+ {#func, (PyCFunction)win32_ ## func, METH_VARARGS | METH_STATIC, ""}
+
+#define WIN32_CONSTANT(fmt, con) \
+ PyDict_SetItemString(Win32Type.tp_dict, #con, Py_BuildValue(fmt, con))
+
+
+static PyObject *
+win32_CloseHandle(PyObject *self, PyObject *args)
+{
+ HANDLE hObject;
+ BOOL success;
+
+ if (!PyArg_ParseTuple(args, F_HANDLE, &hObject))
+ return NULL;
+
+ Py_BEGIN_ALLOW_THREADS
+ success = CloseHandle(hObject);
+ Py_END_ALLOW_THREADS
+
+ if (!success)
+ return PyErr_SetFromWindowsErr(0);
+
+ Py_RETURN_NONE;
+}
+
+static PyObject *
+win32_ConnectNamedPipe(PyObject *self, PyObject *args)
+{
+ HANDLE hNamedPipe;
+ LPOVERLAPPED lpOverlapped;
+ BOOL success;
+
+ if (!PyArg_ParseTuple(args, F_HANDLE F_POINTER,
+ &hNamedPipe, &lpOverlapped))
+ return NULL;
+
+ Py_BEGIN_ALLOW_THREADS
+ success = ConnectNamedPipe(hNamedPipe, lpOverlapped);
+ Py_END_ALLOW_THREADS
+
+ if (!success)
+ return PyErr_SetFromWindowsErr(0);
+
+ Py_RETURN_NONE;
+}
+
+static PyObject *
+win32_CreateFile(PyObject *self, PyObject *args)
+{
+ LPCTSTR lpFileName;
+ DWORD dwDesiredAccess;
+ DWORD dwShareMode;
+ LPSECURITY_ATTRIBUTES lpSecurityAttributes;
+ DWORD dwCreationDisposition;
+ DWORD dwFlagsAndAttributes;
+ HANDLE hTemplateFile;
+ HANDLE handle;
+
+ if (!PyArg_ParseTuple(args, "s" F_DWORD F_DWORD F_POINTER
+ F_DWORD F_DWORD F_HANDLE,
+ &lpFileName, &dwDesiredAccess, &dwShareMode,
+ &lpSecurityAttributes, &dwCreationDisposition,
+ &dwFlagsAndAttributes, &hTemplateFile))
+ return NULL;
+
+ Py_BEGIN_ALLOW_THREADS
+ handle = CreateFile(lpFileName, dwDesiredAccess,
+ dwShareMode, lpSecurityAttributes,
+ dwCreationDisposition,
+ dwFlagsAndAttributes, hTemplateFile);
+ Py_END_ALLOW_THREADS
+
+ if (handle == INVALID_HANDLE_VALUE)
+ return PyErr_SetFromWindowsErr(0);
+
+ return Py_BuildValue(F_HANDLE, handle);
+}
+
+static PyObject *
+win32_CreateNamedPipe(PyObject *self, PyObject *args)
+{
+ LPCTSTR lpName;
+ DWORD dwOpenMode;
+ DWORD dwPipeMode;
+ DWORD nMaxInstances;
+ DWORD nOutBufferSize;
+ DWORD nInBufferSize;
+ DWORD nDefaultTimeOut;
+ LPSECURITY_ATTRIBUTES lpSecurityAttributes;
+ HANDLE handle;
+
+ if (!PyArg_ParseTuple(args, "s" F_DWORD F_DWORD F_DWORD
+ F_DWORD F_DWORD F_DWORD F_POINTER,
+ &lpName, &dwOpenMode, &dwPipeMode,
+ &nMaxInstances, &nOutBufferSize,
+ &nInBufferSize, &nDefaultTimeOut,
+ &lpSecurityAttributes))
+ return NULL;
+
+ Py_BEGIN_ALLOW_THREADS
+ handle = CreateNamedPipe(lpName, dwOpenMode, dwPipeMode,
+ nMaxInstances, nOutBufferSize,
+ nInBufferSize, nDefaultTimeOut,
+ lpSecurityAttributes);
+ Py_END_ALLOW_THREADS
+
+ if (handle == INVALID_HANDLE_VALUE)
+ return PyErr_SetFromWindowsErr(0);
+
+ return Py_BuildValue(F_HANDLE, handle);
+}
+
+static PyObject *
+win32_ExitProcess(PyObject *self, PyObject *args)
+{
+ UINT uExitCode;
+
+ if (!PyArg_ParseTuple(args, "I", &uExitCode))
+ return NULL;
+
+ ExitProcess(uExitCode);
+
+ return NULL;
+}
+
+static PyObject *
+win32_GetLastError(PyObject *self, PyObject *args)
+{
+ return Py_BuildValue(F_DWORD, GetLastError());
+}
+
+static PyObject *
+win32_OpenProcess(PyObject *self, PyObject *args)
+{
+ DWORD dwDesiredAccess;
+ BOOL bInheritHandle;
+ DWORD dwProcessId;
+ HANDLE handle;
+
+ if (!PyArg_ParseTuple(args, F_DWORD "i" F_DWORD,
+ &dwDesiredAccess, &bInheritHandle, &dwProcessId))
+ return NULL;
+
+ handle = OpenProcess(dwDesiredAccess, bInheritHandle, dwProcessId);
+ if (handle == NULL)
+ return PyErr_SetFromWindowsErr(0);
+
+ return Py_BuildValue(F_HANDLE, handle);
+}
+
+static PyObject *
+win32_SetNamedPipeHandleState(PyObject *self, PyObject *args)
+{
+ HANDLE hNamedPipe;
+ PyObject *oArgs[3];
+ DWORD dwArgs[3], *pArgs[3] = {NULL, NULL, NULL};
+ int i;
+
+ if (!PyArg_ParseTuple(args, F_HANDLE "OOO",
+ &hNamedPipe, &oArgs[0], &oArgs[1], &oArgs[2]))
+ return NULL;
+
+ PyErr_Clear();
+
+ for (i = 0 ; i < 3 ; i++) {
+ if (oArgs[i] != Py_None) {
+ dwArgs[i] = PyInt_AsUnsignedLongMask(oArgs[i]);
+ if (PyErr_Occurred())
+ return NULL;
+ pArgs[i] = &dwArgs[i];
+ }
+ }
+
+ if (!SetNamedPipeHandleState(hNamedPipe, pArgs[0], pArgs[1], pArgs[2]))
+ return PyErr_SetFromWindowsErr(0);
+
+ Py_RETURN_NONE;
+}
+
+static PyObject *
+win32_WaitNamedPipe(PyObject *self, PyObject *args)
+{
+ LPCTSTR lpNamedPipeName;
+ DWORD nTimeOut;
+ BOOL success;
+
+ if (!PyArg_ParseTuple(args, "s" F_DWORD, &lpNamedPipeName, &nTimeOut))
+ return NULL;
+
+ Py_BEGIN_ALLOW_THREADS
+ success = WaitNamedPipe(lpNamedPipeName, nTimeOut);
+ Py_END_ALLOW_THREADS
+
+ if (!success)
+ return PyErr_SetFromWindowsErr(0);
+
+ Py_RETURN_NONE;
+}
+
+static PyMethodDef win32_methods[] = {
+ WIN32_FUNCTION(CloseHandle),
+ WIN32_FUNCTION(GetLastError),
+ WIN32_FUNCTION(OpenProcess),
+ WIN32_FUNCTION(ExitProcess),
+ WIN32_FUNCTION(ConnectNamedPipe),
+ WIN32_FUNCTION(CreateFile),
+ WIN32_FUNCTION(CreateNamedPipe),
+ WIN32_FUNCTION(SetNamedPipeHandleState),
+ WIN32_FUNCTION(WaitNamedPipe),
+ {NULL}
+};
+
+
+PyTypeObject Win32Type = {
+ PyVarObject_HEAD_INIT(NULL, 0)
+};
+
+
+PyObject *
+create_win32_namespace(void)
+{
+ Win32Type.tp_name = "_multiprocessing.win32";
+ Win32Type.tp_methods = win32_methods;
+ if (PyType_Ready(&Win32Type) < 0)
+ return NULL;
+ Py_INCREF(&Win32Type);
+
+ WIN32_CONSTANT(F_DWORD, ERROR_ALREADY_EXISTS);
+ WIN32_CONSTANT(F_DWORD, ERROR_PIPE_BUSY);
+ WIN32_CONSTANT(F_DWORD, ERROR_PIPE_CONNECTED);
+ WIN32_CONSTANT(F_DWORD, ERROR_SEM_TIMEOUT);
+ WIN32_CONSTANT(F_DWORD, GENERIC_READ);
+ WIN32_CONSTANT(F_DWORD, GENERIC_WRITE);
+ WIN32_CONSTANT(F_DWORD, INFINITE);
+ WIN32_CONSTANT(F_DWORD, NMPWAIT_WAIT_FOREVER);
+ WIN32_CONSTANT(F_DWORD, OPEN_EXISTING);
+ WIN32_CONSTANT(F_DWORD, PIPE_ACCESS_DUPLEX);
+ WIN32_CONSTANT(F_DWORD, PIPE_ACCESS_INBOUND);
+ WIN32_CONSTANT(F_DWORD, PIPE_READMODE_MESSAGE);
+ WIN32_CONSTANT(F_DWORD, PIPE_TYPE_MESSAGE);
+ WIN32_CONSTANT(F_DWORD, PIPE_UNLIMITED_INSTANCES);
+ WIN32_CONSTANT(F_DWORD, PIPE_WAIT);
+ WIN32_CONSTANT(F_DWORD, PROCESS_ALL_ACCESS);
+
+ WIN32_CONSTANT("i", NULL);
+
+ return (PyObject*)&Win32Type;
+}
|