summaryrefslogtreecommitdiffstats
path: root/Modules/_multiprocessing
diff options
context:
space:
mode:
Diffstat (limited to 'Modules/_multiprocessing')
-rw-r--r--Modules/_multiprocessing/clinic/posixshmem.c.h133
-rw-r--r--Modules/_multiprocessing/connection.h519
-rw-r--r--Modules/_multiprocessing/multiprocessing.c311
-rw-r--r--Modules/_multiprocessing/multiprocessing.h108
-rw-r--r--Modules/_multiprocessing/pipe_connection.c149
-rw-r--r--Modules/_multiprocessing/posixshmem.c131
-rw-r--r--Modules/_multiprocessing/semaphore.c227
-rw-r--r--Modules/_multiprocessing/socket_connection.c277
-rw-r--r--Modules/_multiprocessing/win32_functions.c267
9 files changed, 1625 insertions, 497 deletions
diff --git a/Modules/_multiprocessing/clinic/posixshmem.c.h b/Modules/_multiprocessing/clinic/posixshmem.c.h
deleted file mode 100644
index a99f0d2..0000000
--- a/Modules/_multiprocessing/clinic/posixshmem.c.h
+++ /dev/null
@@ -1,133 +0,0 @@
-/*[clinic input]
-preserve
-[clinic start generated code]*/
-
-#if defined(HAVE_SHM_OPEN)
-
-PyDoc_STRVAR(_posixshmem_shm_open__doc__,
-"shm_open($module, /, path, flags, mode=511)\n"
-"--\n"
-"\n"
-"Open a shared memory object. Returns a file descriptor (integer).");
-
-#define _POSIXSHMEM_SHM_OPEN_METHODDEF \
- {"shm_open", (PyCFunction)(void(*)(void))_posixshmem_shm_open, METH_FASTCALL|METH_KEYWORDS, _posixshmem_shm_open__doc__},
-
-static int
-_posixshmem_shm_open_impl(PyObject *module, PyObject *path, int flags,
- int mode);
-
-static PyObject *
-_posixshmem_shm_open(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames)
-{
- PyObject *return_value = NULL;
- static const char * const _keywords[] = {"path", "flags", "mode", NULL};
- static _PyArg_Parser _parser = {NULL, _keywords, "shm_open", 0};
- PyObject *argsbuf[3];
- Py_ssize_t noptargs = nargs + (kwnames ? PyTuple_GET_SIZE(kwnames) : 0) - 2;
- PyObject *path;
- int flags;
- int mode = 511;
- int _return_value;
-
- args = _PyArg_UnpackKeywords(args, nargs, NULL, kwnames, &_parser, 2, 3, 0, argsbuf);
- if (!args) {
- goto exit;
- }
- if (!PyUnicode_Check(args[0])) {
- _PyArg_BadArgument("shm_open", "argument 'path'", "str", args[0]);
- goto exit;
- }
- if (PyUnicode_READY(args[0]) == -1) {
- goto exit;
- }
- path = args[0];
- if (PyFloat_Check(args[1])) {
- PyErr_SetString(PyExc_TypeError,
- "integer argument expected, got float" );
- goto exit;
- }
- flags = _PyLong_AsInt(args[1]);
- if (flags == -1 && PyErr_Occurred()) {
- goto exit;
- }
- if (!noptargs) {
- goto skip_optional_pos;
- }
- if (PyFloat_Check(args[2])) {
- PyErr_SetString(PyExc_TypeError,
- "integer argument expected, got float" );
- goto exit;
- }
- mode = _PyLong_AsInt(args[2]);
- if (mode == -1 && PyErr_Occurred()) {
- goto exit;
- }
-skip_optional_pos:
- _return_value = _posixshmem_shm_open_impl(module, path, flags, mode);
- if ((_return_value == -1) && PyErr_Occurred()) {
- goto exit;
- }
- return_value = PyLong_FromLong((long)_return_value);
-
-exit:
- return return_value;
-}
-
-#endif /* defined(HAVE_SHM_OPEN) */
-
-#if defined(HAVE_SHM_UNLINK)
-
-PyDoc_STRVAR(_posixshmem_shm_unlink__doc__,
-"shm_unlink($module, /, path)\n"
-"--\n"
-"\n"
-"Remove a shared memory object (similar to unlink()).\n"
-"\n"
-"Remove a shared memory object name, and, once all processes have unmapped\n"
-"the object, de-allocates and destroys the contents of the associated memory\n"
-"region.");
-
-#define _POSIXSHMEM_SHM_UNLINK_METHODDEF \
- {"shm_unlink", (PyCFunction)(void(*)(void))_posixshmem_shm_unlink, METH_FASTCALL|METH_KEYWORDS, _posixshmem_shm_unlink__doc__},
-
-static PyObject *
-_posixshmem_shm_unlink_impl(PyObject *module, PyObject *path);
-
-static PyObject *
-_posixshmem_shm_unlink(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames)
-{
- PyObject *return_value = NULL;
- static const char * const _keywords[] = {"path", NULL};
- static _PyArg_Parser _parser = {NULL, _keywords, "shm_unlink", 0};
- PyObject *argsbuf[1];
- PyObject *path;
-
- args = _PyArg_UnpackKeywords(args, nargs, NULL, kwnames, &_parser, 1, 1, 0, argsbuf);
- if (!args) {
- goto exit;
- }
- if (!PyUnicode_Check(args[0])) {
- _PyArg_BadArgument("shm_unlink", "argument 'path'", "str", args[0]);
- goto exit;
- }
- if (PyUnicode_READY(args[0]) == -1) {
- goto exit;
- }
- path = args[0];
- return_value = _posixshmem_shm_unlink_impl(module, path);
-
-exit:
- return return_value;
-}
-
-#endif /* defined(HAVE_SHM_UNLINK) */
-
-#ifndef _POSIXSHMEM_SHM_OPEN_METHODDEF
- #define _POSIXSHMEM_SHM_OPEN_METHODDEF
-#endif /* !defined(_POSIXSHMEM_SHM_OPEN_METHODDEF) */
-
-#ifndef _POSIXSHMEM_SHM_UNLINK_METHODDEF
- #define _POSIXSHMEM_SHM_UNLINK_METHODDEF
-#endif /* !defined(_POSIXSHMEM_SHM_UNLINK_METHODDEF) */
-/*[clinic end generated code: output=9132861c61d8c2d8 input=a9049054013a1b77]*/
diff --git a/Modules/_multiprocessing/connection.h b/Modules/_multiprocessing/connection.h
new file mode 100644
index 0000000..12e6eee
--- /dev/null
+++ b/Modules/_multiprocessing/connection.h
@@ -0,0 +1,519 @@
+/*
+ * Definition of a `Connection` type.
+ * Used by `socket_connection.c` and `pipe_connection.c`.
+ *
+ * connection.h
+ *
+ * Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
+ */
+
+#ifndef CONNECTION_H
+#define CONNECTION_H
+
+/*
+ * Read/write flags
+ */
+
+#define READABLE 1
+#define WRITABLE 2
+
+#define CHECK_READABLE(self) \
+ if (!(self->flags & READABLE)) { \
+ PyErr_SetString(PyExc_IOError, "connection is write-only"); \
+ return NULL; \
+ }
+
+#define CHECK_WRITABLE(self) \
+ if (!(self->flags & WRITABLE)) { \
+ PyErr_SetString(PyExc_IOError, "connection is read-only"); \
+ return NULL; \
+ }
+
+/*
+ * Allocation and deallocation
+ */
+
+static PyObject *
+connection_new(PyTypeObject *type, PyObject *args, PyObject *kwds)
+{
+ ConnectionObject *self;
+ HANDLE handle;
+ BOOL readable = TRUE, writable = TRUE;
+
+ static char *kwlist[] = {"handle", "readable", "writable", NULL};
+
+ if (!PyArg_ParseTupleAndKeywords(args, kwds, F_HANDLE "|ii", kwlist,
+ &handle, &readable, &writable))
+ return NULL;
+
+ if (handle == INVALID_HANDLE_VALUE || (Py_ssize_t)handle < 0) {
+ PyErr_Format(PyExc_IOError, "invalid handle %zd",
+ (Py_ssize_t)handle);
+ return NULL;
+ }
+
+ if (!readable && !writable) {
+ PyErr_SetString(PyExc_ValueError,
+ "either readable or writable must be true");
+ return NULL;
+ }
+
+ self = PyObject_New(ConnectionObject, type);
+ if (self == NULL)
+ return NULL;
+
+ self->weakreflist = NULL;
+ self->handle = handle;
+ self->flags = 0;
+
+ if (readable)
+ self->flags |= READABLE;
+ if (writable)
+ self->flags |= WRITABLE;
+ assert(self->flags >= 1 && self->flags <= 3);
+
+ return (PyObject*)self;
+}
+
+static void
+connection_dealloc(ConnectionObject* self)
+{
+ if (self->weakreflist != NULL)
+ PyObject_ClearWeakRefs((PyObject*)self);
+
+ if (self->handle != INVALID_HANDLE_VALUE) {
+ Py_BEGIN_ALLOW_THREADS
+ CLOSE(self->handle);
+ Py_END_ALLOW_THREADS
+ }
+ PyObject_Del(self);
+}
+
+/*
+ * Functions for transferring buffers
+ */
+
+static PyObject *
+connection_sendbytes(ConnectionObject *self, PyObject *args)
+{
+ char *buffer;
+ Py_ssize_t length, offset=0, size=PY_SSIZE_T_MIN;
+ int res;
+
+ if (!PyArg_ParseTuple(args, F_RBUFFER "#|" F_PY_SSIZE_T F_PY_SSIZE_T,
+ &buffer, &length, &offset, &size))
+ return NULL;
+
+ CHECK_WRITABLE(self);
+
+ if (offset < 0) {
+ PyErr_SetString(PyExc_ValueError, "offset is negative");
+ return NULL;
+ }
+ if (length < offset) {
+ PyErr_SetString(PyExc_ValueError, "buffer length < offset");
+ return NULL;
+ }
+
+ if (size == PY_SSIZE_T_MIN) {
+ size = length - offset;
+ } else {
+ if (size < 0) {
+ PyErr_SetString(PyExc_ValueError, "size is negative");
+ return NULL;
+ }
+ if (offset + size > length) {
+ PyErr_SetString(PyExc_ValueError,
+ "buffer length < offset + size");
+ return NULL;
+ }
+ }
+
+ res = conn_send_string(self, buffer + offset, size);
+
+ if (res < 0) {
+ if (PyErr_Occurred())
+ return NULL;
+ else
+ return mp_SetError(PyExc_IOError, res);
+ }
+
+ Py_RETURN_NONE;
+}
+
+static PyObject *
+connection_recvbytes(ConnectionObject *self, PyObject *args)
+{
+ char *freeme = NULL;
+ Py_ssize_t res, maxlength = PY_SSIZE_T_MAX;
+ PyObject *result = NULL;
+
+ if (!PyArg_ParseTuple(args, "|" F_PY_SSIZE_T, &maxlength))
+ return NULL;
+
+ CHECK_READABLE(self);
+
+ if (maxlength < 0) {
+ PyErr_SetString(PyExc_ValueError, "maxlength < 0");
+ return NULL;
+ }
+
+ res = conn_recv_string(self, self->buffer, CONNECTION_BUFFER_SIZE,
+ &freeme, maxlength);
+
+ if (res < 0) {
+ if (res == MP_BAD_MESSAGE_LENGTH) {
+ if ((self->flags & WRITABLE) == 0) {
+ Py_BEGIN_ALLOW_THREADS
+ CLOSE(self->handle);
+ Py_END_ALLOW_THREADS
+ self->handle = INVALID_HANDLE_VALUE;
+ } else {
+ self->flags = WRITABLE;
+ }
+ }
+ mp_SetError(PyExc_IOError, res);
+ } else {
+ if (freeme == NULL) {
+ result = PyString_FromStringAndSize(self->buffer, res);
+ } else {
+ result = PyString_FromStringAndSize(freeme, res);
+ PyMem_Free(freeme);
+ }
+ }
+
+ return result;
+}
+
+static PyObject *
+connection_recvbytes_into(ConnectionObject *self, PyObject *args)
+{
+ char *freeme = NULL, *buffer = NULL;
+ Py_ssize_t res, length, offset = 0;
+ PyObject *result = NULL;
+ Py_buffer pbuf;
+
+ CHECK_READABLE(self);
+
+ if (!PyArg_ParseTuple(args, "w*|" F_PY_SSIZE_T,
+ &pbuf, &offset))
+ return NULL;
+
+ buffer = pbuf.buf;
+ length = pbuf.len;
+
+ if (offset < 0) {
+ PyErr_SetString(PyExc_ValueError, "negative offset");
+ goto _error;
+ }
+
+ if (offset > length) {
+ PyErr_SetString(PyExc_ValueError, "offset too large");
+ goto _error;
+ }
+
+ res = conn_recv_string(self, buffer+offset, length-offset,
+ &freeme, PY_SSIZE_T_MAX);
+
+ if (res < 0) {
+ if (res == MP_BAD_MESSAGE_LENGTH) {
+ if ((self->flags & WRITABLE) == 0) {
+ Py_BEGIN_ALLOW_THREADS
+ CLOSE(self->handle);
+ Py_END_ALLOW_THREADS
+ self->handle = INVALID_HANDLE_VALUE;
+ } else {
+ self->flags = WRITABLE;
+ }
+ }
+ mp_SetError(PyExc_IOError, res);
+ } else {
+ if (freeme == NULL) {
+ result = PyInt_FromSsize_t(res);
+ } else {
+ result = PyObject_CallFunction(BufferTooShort,
+ F_RBUFFER "#",
+ freeme, res);
+ PyMem_Free(freeme);
+ if (result) {
+ PyErr_SetObject(BufferTooShort, result);
+ Py_DECREF(result);
+ }
+ goto _error;
+ }
+ }
+
+_cleanup:
+ PyBuffer_Release(&pbuf);
+ return result;
+
+_error:
+ result = NULL;
+ goto _cleanup;
+}
+
+/*
+ * Functions for transferring objects
+ */
+
+static PyObject *
+connection_send_obj(ConnectionObject *self, PyObject *obj)
+{
+ char *buffer;
+ int res;
+ Py_ssize_t length;
+ PyObject *pickled_string = NULL;
+
+ CHECK_WRITABLE(self);
+
+ pickled_string = PyObject_CallFunctionObjArgs(pickle_dumps, obj,
+ pickle_protocol, NULL);
+ if (!pickled_string)
+ goto failure;
+
+ if (PyString_AsStringAndSize(pickled_string, &buffer, &length) < 0)
+ goto failure;
+
+ res = conn_send_string(self, buffer, (int)length);
+
+ if (res < 0) {
+ mp_SetError(PyExc_IOError, res);
+ goto failure;
+ }
+
+ Py_XDECREF(pickled_string);
+ Py_RETURN_NONE;
+
+ failure:
+ Py_XDECREF(pickled_string);
+ return NULL;
+}
+
+static PyObject *
+connection_recv_obj(ConnectionObject *self)
+{
+ char *freeme = NULL;
+ Py_ssize_t res;
+ PyObject *temp = NULL, *result = NULL;
+
+ CHECK_READABLE(self);
+
+ res = conn_recv_string(self, self->buffer, CONNECTION_BUFFER_SIZE,
+ &freeme, PY_SSIZE_T_MAX);
+
+ if (res < 0) {
+ if (res == MP_BAD_MESSAGE_LENGTH) {
+ if ((self->flags & WRITABLE) == 0) {
+ Py_BEGIN_ALLOW_THREADS
+ CLOSE(self->handle);
+ Py_END_ALLOW_THREADS
+ self->handle = INVALID_HANDLE_VALUE;
+ } else {
+ self->flags = WRITABLE;
+ }
+ }
+ mp_SetError(PyExc_IOError, res);
+ } else {
+ if (freeme == NULL) {
+ temp = PyString_FromStringAndSize(self->buffer, res);
+ } else {
+ temp = PyString_FromStringAndSize(freeme, res);
+ PyMem_Free(freeme);
+ }
+ }
+
+ if (temp)
+ result = PyObject_CallFunctionObjArgs(pickle_loads,
+ temp, NULL);
+ Py_XDECREF(temp);
+ return result;
+}
+
+/*
+ * Other functions
+ */
+
+static PyObject *
+connection_poll(ConnectionObject *self, PyObject *args)
+{
+ PyObject *timeout_obj = NULL;
+ double timeout = 0.0;
+ int res;
+
+ CHECK_READABLE(self);
+
+ if (!PyArg_ParseTuple(args, "|O", &timeout_obj))
+ return NULL;
+
+ if (timeout_obj == NULL) {
+ timeout = 0.0;
+ } else if (timeout_obj == Py_None) {
+ timeout = -1.0; /* block forever */
+ } else {
+ timeout = PyFloat_AsDouble(timeout_obj);
+ if (PyErr_Occurred())
+ return NULL;
+ if (timeout < 0.0)
+ timeout = 0.0;
+ }
+
+ Py_BEGIN_ALLOW_THREADS
+ res = conn_poll(self, timeout, _save);
+ Py_END_ALLOW_THREADS
+
+ switch (res) {
+ case TRUE:
+ Py_RETURN_TRUE;
+ case FALSE:
+ Py_RETURN_FALSE;
+ default:
+ return mp_SetError(PyExc_IOError, res);
+ }
+}
+
+static PyObject *
+connection_fileno(ConnectionObject* self)
+{
+ if (self->handle == INVALID_HANDLE_VALUE) {
+ PyErr_SetString(PyExc_IOError, "handle is invalid");
+ return NULL;
+ }
+ return PyInt_FromLong((long)self->handle);
+}
+
+static PyObject *
+connection_close(ConnectionObject *self)
+{
+ if (self->handle != INVALID_HANDLE_VALUE) {
+ Py_BEGIN_ALLOW_THREADS
+ CLOSE(self->handle);
+ Py_END_ALLOW_THREADS
+ self->handle = INVALID_HANDLE_VALUE;
+ }
+
+ Py_RETURN_NONE;
+}
+
+static PyObject *
+connection_repr(ConnectionObject *self)
+{
+ static char *conn_type[] = {"read-only", "write-only", "read-write"};
+
+ assert(self->flags >= 1 && self->flags <= 3);
+ return FROM_FORMAT("<%s %s, handle %zd>",
+ conn_type[self->flags - 1],
+ CONNECTION_NAME, (Py_ssize_t)self->handle);
+}
+
+/*
+ * Getters and setters
+ */
+
+static PyObject *
+connection_closed(ConnectionObject *self, void *closure)
+{
+ return PyBool_FromLong((long)(self->handle == INVALID_HANDLE_VALUE));
+}
+
+static PyObject *
+connection_readable(ConnectionObject *self, void *closure)
+{
+ return PyBool_FromLong((long)(self->flags & READABLE));
+}
+
+static PyObject *
+connection_writable(ConnectionObject *self, void *closure)
+{
+ return PyBool_FromLong((long)(self->flags & WRITABLE));
+}
+
+/*
+ * Tables
+ */
+
+static PyMethodDef connection_methods[] = {
+ {"send_bytes", (PyCFunction)connection_sendbytes, METH_VARARGS,
+ "send the byte data from a readable buffer-like object"},
+ {"recv_bytes", (PyCFunction)connection_recvbytes, METH_VARARGS,
+ "receive byte data as a string"},
+ {"recv_bytes_into",(PyCFunction)connection_recvbytes_into,METH_VARARGS,
+ "receive byte data into a writeable buffer-like object\n"
+ "returns the number of bytes read"},
+
+ {"send", (PyCFunction)connection_send_obj, METH_O,
+ "send a (picklable) object"},
+ {"recv", (PyCFunction)connection_recv_obj, METH_NOARGS,
+ "receive a (picklable) object"},
+
+ {"poll", (PyCFunction)connection_poll, METH_VARARGS,
+ "whether there is any input available to be read"},
+ {"fileno", (PyCFunction)connection_fileno, METH_NOARGS,
+ "file descriptor or handle of the connection"},
+ {"close", (PyCFunction)connection_close, METH_NOARGS,
+ "close the connection"},
+
+ {NULL} /* Sentinel */
+};
+
+static PyGetSetDef connection_getset[] = {
+ {"closed", (getter)connection_closed, NULL,
+ "True if the connection is closed", NULL},
+ {"readable", (getter)connection_readable, NULL,
+ "True if the connection is readable", NULL},
+ {"writable", (getter)connection_writable, NULL,
+ "True if the connection is writable", NULL},
+ {NULL}
+};
+
+/*
+ * Connection type
+ */
+
+PyDoc_STRVAR(connection_doc,
+ "Connection type whose constructor signature is\n\n"
+ " Connection(handle, readable=True, writable=True).\n\n"
+ "The constructor does *not* duplicate the handle.");
+
+PyTypeObject CONNECTION_TYPE = {
+ PyVarObject_HEAD_INIT(NULL, 0)
+ /* tp_name */ "_multiprocessing." CONNECTION_NAME,
+ /* tp_basicsize */ sizeof(ConnectionObject),
+ /* tp_itemsize */ 0,
+ /* tp_dealloc */ (destructor)connection_dealloc,
+ /* tp_print */ 0,
+ /* tp_getattr */ 0,
+ /* tp_setattr */ 0,
+ /* tp_compare */ 0,
+ /* tp_repr */ (reprfunc)connection_repr,
+ /* tp_as_number */ 0,
+ /* tp_as_sequence */ 0,
+ /* tp_as_mapping */ 0,
+ /* tp_hash */ 0,
+ /* tp_call */ 0,
+ /* tp_str */ 0,
+ /* tp_getattro */ 0,
+ /* tp_setattro */ 0,
+ /* tp_as_buffer */ 0,
+ /* tp_flags */ Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE |
+ Py_TPFLAGS_HAVE_WEAKREFS,
+ /* tp_doc */ connection_doc,
+ /* tp_traverse */ 0,
+ /* tp_clear */ 0,
+ /* tp_richcompare */ 0,
+ /* tp_weaklistoffset */ offsetof(ConnectionObject, weakreflist),
+ /* tp_iter */ 0,
+ /* tp_iternext */ 0,
+ /* tp_methods */ connection_methods,
+ /* tp_members */ 0,
+ /* tp_getset */ connection_getset,
+ /* tp_base */ 0,
+ /* tp_dict */ 0,
+ /* tp_descr_get */ 0,
+ /* tp_descr_set */ 0,
+ /* tp_dictoffset */ 0,
+ /* tp_init */ 0,
+ /* tp_alloc */ 0,
+ /* tp_new */ connection_new,
+};
+
+#endif /* CONNECTION_H */
diff --git a/Modules/_multiprocessing/multiprocessing.c b/Modules/_multiprocessing/multiprocessing.c
index 806e638..eecace8 100644
--- a/Modules/_multiprocessing/multiprocessing.c
+++ b/Modules/_multiprocessing/multiprocessing.c
@@ -3,30 +3,39 @@
*
* multiprocessing.c
*
- * Copyright (c) 2006-2008, R Oudkerk
- * Licensed to PSF under a Contributor Agreement.
+ * Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
*/
#include "multiprocessing.h"
+#if (defined(CMSG_LEN) && defined(SCM_RIGHTS))
+ #define HAVE_FD_TRANSFER 1
+#else
+ #define HAVE_FD_TRANSFER 0
+#endif
+
+PyObject *create_win32_namespace(void);
+
+PyObject *pickle_dumps, *pickle_loads, *pickle_protocol;
+PyObject *ProcessError, *BufferTooShort;
/*
* Function which raises exceptions based on error codes
*/
PyObject *
-_PyMp_SetError(PyObject *Type, int num)
+mp_SetError(PyObject *Type, int num)
{
switch (num) {
#ifdef MS_WINDOWS
case MP_STANDARD_ERROR:
if (Type == NULL)
- Type = PyExc_OSError;
+ Type = PyExc_WindowsError;
PyErr_SetExcFromWindowsErr(Type, 0);
break;
case MP_SOCKET_ERROR:
if (Type == NULL)
- Type = PyExc_OSError;
+ Type = PyExc_WindowsError;
PyErr_SetExcFromWindowsErr(Type, WSAGetLastError());
break;
#else /* !MS_WINDOWS */
@@ -40,6 +49,16 @@ _PyMp_SetError(PyObject *Type, int num)
case MP_MEMORY_ERROR:
PyErr_NoMemory();
break;
+ case MP_END_OF_FILE:
+ PyErr_SetNone(PyExc_EOFError);
+ break;
+ case MP_EARLY_END_OF_FILE:
+ PyErr_SetString(PyExc_IOError,
+ "got end of file during message");
+ break;
+ case MP_BAD_MESSAGE_LENGTH:
+ PyErr_SetString(PyExc_IOError, "bad message length");
+ break;
case MP_EXCEPTION_HAS_BEEN_SET:
break;
default:
@@ -49,87 +68,170 @@ _PyMp_SetError(PyObject *Type, int num)
return NULL;
}
+
+/*
+ * Windows only
+ */
+
#ifdef MS_WINDOWS
-static PyObject *
-multiprocessing_closesocket(PyObject *self, PyObject *args)
+
+/* On Windows we set an event to signal Ctrl-C; compare with timemodule.c */
+
+HANDLE sigint_event = NULL;
+
+static BOOL WINAPI
+ProcessingCtrlHandler(DWORD dwCtrlType)
{
- HANDLE handle;
- int ret;
+ SetEvent(sigint_event);
+ return FALSE;
+}
+
+/*
+ * Unix only
+ */
+
+#else /* !MS_WINDOWS */
+
+#if HAVE_FD_TRANSFER
- if (!PyArg_ParseTuple(args, F_HANDLE ":closesocket" , &handle))
+/* Functions for transferring file descriptors between processes.
+ Reimplements some of the functionality of the fdcred
+ module at http://www.mca-ltd.com/resources/fdcred_1.tgz. */
+/* Based in http://resin.csoft.net/cgi-bin/man.cgi?section=3&topic=CMSG_DATA */
+
+static PyObject *
+multiprocessing_sendfd(PyObject *self, PyObject *args)
+{
+ int conn, fd, res;
+ struct iovec dummy_iov;
+ char dummy_char;
+ struct msghdr msg;
+ struct cmsghdr *cmsg;
+ union {
+ struct cmsghdr hdr;
+ unsigned char buf[CMSG_SPACE(sizeof(int))];
+ } cmsgbuf;
+
+ if (!PyArg_ParseTuple(args, "ii", &conn, &fd))
return NULL;
+ dummy_iov.iov_base = &dummy_char;
+ dummy_iov.iov_len = 1;
+
+ memset(&msg, 0, sizeof(msg));
+ msg.msg_control = &cmsgbuf.buf;
+ msg.msg_controllen = sizeof(cmsgbuf.buf);
+ msg.msg_iov = &dummy_iov;
+ msg.msg_iovlen = 1;
+
+ cmsg = CMSG_FIRSTHDR(&msg);
+ cmsg->cmsg_len = CMSG_LEN(sizeof(int));
+ cmsg->cmsg_level = SOL_SOCKET;
+ cmsg->cmsg_type = SCM_RIGHTS;
+ * (int *) CMSG_DATA(cmsg) = fd;
+
Py_BEGIN_ALLOW_THREADS
- ret = closesocket((SOCKET) handle);
+ res = sendmsg(conn, &msg, 0);
Py_END_ALLOW_THREADS
- if (ret)
- return PyErr_SetExcFromWindowsErr(PyExc_OSError, WSAGetLastError());
+ if (res < 0)
+ return PyErr_SetFromErrno(PyExc_OSError);
Py_RETURN_NONE;
}
static PyObject *
-multiprocessing_recv(PyObject *self, PyObject *args)
+multiprocessing_recvfd(PyObject *self, PyObject *args)
{
- HANDLE handle;
- int size, nread;
- PyObject *buf;
-
- if (!PyArg_ParseTuple(args, F_HANDLE "i:recv" , &handle, &size))
+ int conn, fd, res;
+ char dummy_char;
+ struct iovec dummy_iov;
+ struct msghdr msg = {0};
+ struct cmsghdr *cmsg;
+ union {
+ struct cmsghdr hdr;
+ unsigned char buf[CMSG_SPACE(sizeof(int))];
+ } cmsgbuf;
+
+ if (!PyArg_ParseTuple(args, "i", &conn))
return NULL;
- buf = PyBytes_FromStringAndSize(NULL, size);
- if (!buf)
- return NULL;
+ dummy_iov.iov_base = &dummy_char;
+ dummy_iov.iov_len = 1;
+
+ memset(&msg, 0, sizeof(msg));
+ msg.msg_control = &cmsgbuf.buf;
+ msg.msg_controllen = sizeof(cmsgbuf.buf);
+ msg.msg_iov = &dummy_iov;
+ msg.msg_iovlen = 1;
+
+ cmsg = CMSG_FIRSTHDR(&msg);
+ cmsg->cmsg_level = SOL_SOCKET;
+ cmsg->cmsg_type = SCM_RIGHTS;
+ cmsg->cmsg_len = CMSG_SPACE(sizeof(int));
+ msg.msg_controllen = cmsg->cmsg_len;
Py_BEGIN_ALLOW_THREADS
- nread = recv((SOCKET) handle, PyBytes_AS_STRING(buf), size, 0);
+ res = recvmsg(conn, &msg, 0);
Py_END_ALLOW_THREADS
- if (nread < 0) {
- Py_DECREF(buf);
- return PyErr_SetExcFromWindowsErr(PyExc_OSError, WSAGetLastError());
+ if (res < 0)
+ return PyErr_SetFromErrno(PyExc_OSError);
+
+ if (msg.msg_controllen < CMSG_LEN(sizeof(int)) ||
+ (cmsg = CMSG_FIRSTHDR(&msg)) == NULL ||
+ cmsg->cmsg_level != SOL_SOCKET ||
+ cmsg->cmsg_type != SCM_RIGHTS ||
+ cmsg->cmsg_len < CMSG_LEN(sizeof(int))) {
+ /* If at least one control message is present, there should be
+ no room for any further data in the buffer. */
+ PyErr_SetString(PyExc_RuntimeError, "No file descriptor received");
+ return NULL;
}
- _PyBytes_Resize(&buf, nread);
- return buf;
+
+ fd = * (int *) CMSG_DATA(cmsg);
+ return Py_BuildValue("i", fd);
}
-static PyObject *
-multiprocessing_send(PyObject *self, PyObject *args)
-{
- HANDLE handle;
- Py_buffer buf;
- int ret, length;
+#endif /* HAVE_FD_TRANSFER */
- if (!PyArg_ParseTuple(args, F_HANDLE "y*:send" , &handle, &buf))
- return NULL;
+#endif /* !MS_WINDOWS */
- length = (int)Py_MIN(buf.len, INT_MAX);
- Py_BEGIN_ALLOW_THREADS
- ret = send((SOCKET) handle, buf.buf, length, 0);
- Py_END_ALLOW_THREADS
+/*
+ * All platforms
+ */
+
+static PyObject*
+multiprocessing_address_of_buffer(PyObject *self, PyObject *obj)
+{
+ void *buffer;
+ Py_ssize_t buffer_len;
- PyBuffer_Release(&buf);
- if (ret < 0)
- return PyErr_SetExcFromWindowsErr(PyExc_OSError, WSAGetLastError());
- return PyLong_FromLong(ret);
+ if (PyObject_AsWriteBuffer(obj, &buffer, &buffer_len) < 0)
+ return NULL;
+
+ return Py_BuildValue("N" F_PY_SSIZE_T,
+ PyLong_FromVoidPtr(buffer), buffer_len);
}
-#endif
/*
* Function table
*/
static PyMethodDef module_methods[] = {
-#ifdef MS_WINDOWS
- {"closesocket", multiprocessing_closesocket, METH_VARARGS, ""},
- {"recv", multiprocessing_recv, METH_VARARGS, ""},
- {"send", multiprocessing_send, METH_VARARGS, ""},
-#endif
-#if !defined(POSIX_SEMAPHORES_NOT_ENABLED) && !defined(__ANDROID__)
- {"sem_unlink", _PyMp_sem_unlink, METH_VARARGS, ""},
+ {"address_of_buffer", multiprocessing_address_of_buffer, METH_O,
+ "address_of_buffer(obj) -> int\n"
+ "Return address of obj assuming obj supports buffer inteface"},
+#if HAVE_FD_TRANSFER
+ {"sendfd", multiprocessing_sendfd, METH_VARARGS,
+ "sendfd(sockfd, fd) -> None\n"
+ "Send file descriptor given by fd over the unix domain socket\n"
+ "whose file decriptor is sockfd"},
+ {"recvfd", multiprocessing_recvfd, METH_VARARGS,
+ "recvfd(sockfd) -> fd\n"
+ "Receive a file descriptor over a unix domain socket\n"
+ "whose file decriptor is sockfd"},
#endif
{NULL}
};
@@ -139,64 +241,95 @@ static PyMethodDef module_methods[] = {
* Initialize
*/
-static struct PyModuleDef multiprocessing_module = {
- PyModuleDef_HEAD_INIT,
- "_multiprocessing",
- NULL,
- -1,
- module_methods,
- NULL,
- NULL,
- NULL,
- NULL
-};
-
-
PyMODINIT_FUNC
-PyInit__multiprocessing(void)
+init_multiprocessing(void)
{
- PyObject *module, *temp, *value = NULL;
+ PyObject *module, *temp, *value;
/* Initialize module */
- module = PyModule_Create(&multiprocessing_module);
+ module = Py_InitModule("_multiprocessing", module_methods);
if (!module)
- return NULL;
+ return;
+
+ /* Get copy of objects from pickle */
+ temp = PyImport_ImportModule(PICKLE_MODULE);
+ if (!temp)
+ return;
+ pickle_dumps = PyObject_GetAttrString(temp, "dumps");
+ pickle_loads = PyObject_GetAttrString(temp, "loads");
+ pickle_protocol = PyObject_GetAttrString(temp, "HIGHEST_PROTOCOL");
+ Py_XDECREF(temp);
+
+ /* Get copy of BufferTooShort */
+ temp = PyImport_ImportModule("multiprocessing");
+ if (!temp)
+ return;
+ BufferTooShort = PyObject_GetAttrString(temp, "BufferTooShort");
+ Py_XDECREF(temp);
+
+ /* Add connection type to module */
+ if (PyType_Ready(&ConnectionType) < 0)
+ return;
+ Py_INCREF(&ConnectionType);
+ PyModule_AddObject(module, "Connection", (PyObject*)&ConnectionType);
#if defined(MS_WINDOWS) || \
(defined(HAVE_SEM_OPEN) && !defined(POSIX_SEMAPHORES_NOT_ENABLED))
- /* Add _PyMp_SemLock type to module */
- if (PyType_Ready(&_PyMp_SemLockType) < 0)
- return NULL;
- Py_INCREF(&_PyMp_SemLockType);
+ /* Add SemLock type to module */
+ if (PyType_Ready(&SemLockType) < 0)
+ return;
+ Py_INCREF(&SemLockType);
{
PyObject *py_sem_value_max;
/* Some systems define SEM_VALUE_MAX as an unsigned value that
- * causes it to be negative when used as an int (NetBSD).
- *
- * Issue #28152: Use (0) instead of 0 to fix a warning on dead code
- * when using clang -Wunreachable-code. */
- if ((int)(SEM_VALUE_MAX) < (0))
+ * causes it to be negative when used as an int (NetBSD). */
+ if ((int)(SEM_VALUE_MAX) < 0)
py_sem_value_max = PyLong_FromLong(INT_MAX);
else
py_sem_value_max = PyLong_FromLong(SEM_VALUE_MAX);
if (py_sem_value_max == NULL)
- return NULL;
- PyDict_SetItemString(_PyMp_SemLockType.tp_dict, "SEM_VALUE_MAX",
+ return;
+ PyDict_SetItemString(SemLockType.tp_dict, "SEM_VALUE_MAX",
py_sem_value_max);
}
- PyModule_AddObject(module, "SemLock", (PyObject*)&_PyMp_SemLockType);
+ PyModule_AddObject(module, "SemLock", (PyObject*)&SemLockType);
+#endif
+
+#ifdef MS_WINDOWS
+ /* Add PipeConnection to module */
+ if (PyType_Ready(&PipeConnectionType) < 0)
+ return;
+ Py_INCREF(&PipeConnectionType);
+ PyModule_AddObject(module, "PipeConnection",
+ (PyObject*)&PipeConnectionType);
+
+ /* Initialize win32 class and add to multiprocessing */
+ temp = create_win32_namespace();
+ if (!temp)
+ return;
+ PyModule_AddObject(module, "win32", temp);
+
+ /* Initialize the event handle used to signal Ctrl-C */
+ sigint_event = CreateEvent(NULL, TRUE, FALSE, NULL);
+ if (!sigint_event) {
+ PyErr_SetFromWindowsErr(0);
+ return;
+ }
+ if (!SetConsoleCtrlHandler(ProcessingCtrlHandler, TRUE)) {
+ PyErr_SetFromWindowsErr(0);
+ return;
+ }
#endif
/* Add configuration macros */
temp = PyDict_New();
if (!temp)
- return NULL;
-
+ return;
#define ADD_FLAG(name) \
value = Py_BuildValue("i", name); \
- if (value == NULL) { Py_DECREF(temp); return NULL; } \
+ if (value == NULL) { Py_DECREF(temp); return; } \
if (PyDict_SetItemString(temp, #name, value) < 0) { \
- Py_DECREF(temp); Py_DECREF(value); return NULL; } \
+ Py_DECREF(temp); Py_DECREF(value); return; } \
Py_DECREF(value)
#if defined(HAVE_SEM_OPEN) && !defined(POSIX_SEMAPHORES_NOT_ENABLED)
@@ -205,15 +338,15 @@ PyInit__multiprocessing(void)
#ifdef HAVE_SEM_TIMEDWAIT
ADD_FLAG(HAVE_SEM_TIMEDWAIT);
#endif
+#ifdef HAVE_FD_TRANSFER
+ ADD_FLAG(HAVE_FD_TRANSFER);
+#endif
#ifdef HAVE_BROKEN_SEM_GETVALUE
ADD_FLAG(HAVE_BROKEN_SEM_GETVALUE);
#endif
#ifdef HAVE_BROKEN_SEM_UNLINK
ADD_FLAG(HAVE_BROKEN_SEM_UNLINK);
#endif
-
if (PyModule_AddObject(module, "flags", temp) < 0)
- return NULL;
-
- return module;
+ return;
}
diff --git a/Modules/_multiprocessing/multiprocessing.h b/Modules/_multiprocessing/multiprocessing.h
index fe78135..14425de 100644
--- a/Modules/_multiprocessing/multiprocessing.h
+++ b/Modules/_multiprocessing/multiprocessing.h
@@ -3,6 +3,12 @@
#define PY_SSIZE_T_CLEAN
+#ifdef __sun
+/* The control message API is only available on Solaris
+ if XPG 4.2 or later is requested. */
+#define _XOPEN_SOURCE 500
+#endif
+
#include "Python.h"
#include "structmember.h"
#include "pythread.h"
@@ -23,10 +29,22 @@
# define SEM_VALUE_MAX LONG_MAX
#else
# include <fcntl.h> /* O_CREAT and O_EXCL */
+# include <netinet/in.h>
+# include <sys/socket.h>
+# include <sys/uio.h>
+# include <arpa/inet.h> /* htonl() and ntohl() */
# if defined(HAVE_SEM_OPEN) && !defined(POSIX_SEMAPHORES_NOT_ENABLED)
# include <semaphore.h>
typedef sem_t *SEM_HANDLE;
# endif
+# define HANDLE int
+# define SOCKET int
+# define BOOL int
+# define UINT32 uint32_t
+# define INT32 int32_t
+# define TRUE 1
+# define FALSE 0
+# define INVALID_HANDLE_VALUE (-1)
#endif
/*
@@ -46,13 +64,27 @@
/*
+ * Make sure Py_ssize_t available
+ */
+
+#if PY_VERSION_HEX < 0x02050000 && !defined(PY_SSIZE_T_MIN)
+ typedef int Py_ssize_t;
+# define PY_SSIZE_T_MAX INT_MAX
+# define PY_SSIZE_T_MIN INT_MIN
+# define F_PY_SSIZE_T "i"
+# define PyInt_FromSsize_t(n) PyInt_FromLong((long)n)
+#else
+# define F_PY_SSIZE_T "n"
+#endif
+
+/*
* Format codes
*/
#if SIZEOF_VOID_P == SIZEOF_LONG
# define F_POINTER "k"
# define T_POINTER T_ULONG
-#elif SIZEOF_VOID_P == SIZEOF_LONG_LONG
+#elif defined(HAVE_LONG_LONG) && (SIZEOF_VOID_P == SIZEOF_LONG_LONG)
# define F_POINTER "K"
# define T_POINTER T_ULONGLONG
#else
@@ -64,6 +96,8 @@
# define T_HANDLE T_POINTER
# define F_SEM_HANDLE F_HANDLE
# define T_SEM_HANDLE T_HANDLE
+# define F_DWORD "k"
+# define T_DWORD T_ULONG
#else
# define F_HANDLE "i"
# define T_HANDLE T_INT
@@ -71,6 +105,12 @@
# define T_SEM_HANDLE T_POINTER
#endif
+#if PY_VERSION_HEX >= 0x03000000
+# define F_RBUFFER "y"
+#else
+# define F_RBUFFER "s"
+#endif
+
/*
* Error codes which can be returned by functions called without GIL
*/
@@ -78,16 +118,72 @@
#define MP_SUCCESS (0)
#define MP_STANDARD_ERROR (-1)
#define MP_MEMORY_ERROR (-1001)
-#define MP_SOCKET_ERROR (-1002)
-#define MP_EXCEPTION_HAS_BEEN_SET (-1003)
+#define MP_END_OF_FILE (-1002)
+#define MP_EARLY_END_OF_FILE (-1003)
+#define MP_BAD_MESSAGE_LENGTH (-1004)
+#define MP_SOCKET_ERROR (-1005)
+#define MP_EXCEPTION_HAS_BEEN_SET (-1006)
-PyObject *_PyMp_SetError(PyObject *Type, int num);
+PyObject *mp_SetError(PyObject *Type, int num);
/*
* Externs - not all will really exist on all platforms
*/
-extern PyTypeObject _PyMp_SemLockType;
-extern PyObject *_PyMp_sem_unlink(PyObject *ignore, PyObject *args);
+extern PyObject *pickle_dumps;
+extern PyObject *pickle_loads;
+extern PyObject *pickle_protocol;
+extern PyObject *BufferTooShort;
+extern PyTypeObject SemLockType;
+extern PyTypeObject ConnectionType;
+extern PyTypeObject PipeConnectionType;
+extern HANDLE sigint_event;
+
+/*
+ * Py3k compatibility
+ */
+
+#if PY_VERSION_HEX >= 0x03000000
+# define PICKLE_MODULE "pickle"
+# define FROM_FORMAT PyUnicode_FromFormat
+# define PyInt_FromLong PyLong_FromLong
+# define PyInt_FromSsize_t PyLong_FromSsize_t
+#else
+# define PICKLE_MODULE "cPickle"
+# define FROM_FORMAT PyString_FromFormat
+#endif
+
+#ifndef PyVarObject_HEAD_INIT
+# define PyVarObject_HEAD_INIT(type, size) PyObject_HEAD_INIT(type) size,
+#endif
+
+#ifndef Py_TPFLAGS_HAVE_WEAKREFS
+# define Py_TPFLAGS_HAVE_WEAKREFS 0
+#endif
+
+/*
+ * Connection definition
+ */
+
+#define CONNECTION_BUFFER_SIZE 1024
+
+typedef struct {
+ PyObject_HEAD
+ HANDLE handle;
+ int flags;
+ PyObject *weakreflist;
+ char buffer[CONNECTION_BUFFER_SIZE];
+} ConnectionObject;
+
+/*
+ * Miscellaneous
+ */
+
+#define MAX_MESSAGE_LENGTH 0x7fffffff
+
+#ifndef MIN
+# define MIN(x, y) ((x) < (y) ? x : y)
+# define MAX(x, y) ((x) > (y) ? x : y)
+#endif
#endif /* MULTIPROCESSING_H */
diff --git a/Modules/_multiprocessing/pipe_connection.c b/Modules/_multiprocessing/pipe_connection.c
new file mode 100644
index 0000000..05dde0c
--- /dev/null
+++ b/Modules/_multiprocessing/pipe_connection.c
@@ -0,0 +1,149 @@
+/*
+ * A type which wraps a pipe handle in message oriented mode
+ *
+ * pipe_connection.c
+ *
+ * Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
+ */
+
+#include "multiprocessing.h"
+
+#define CLOSE(h) CloseHandle(h)
+
+/*
+ * Send string to the pipe; assumes in message oriented mode
+ */
+
+static Py_ssize_t
+conn_send_string(ConnectionObject *conn, char *string, size_t length)
+{
+ DWORD amount_written;
+ BOOL ret;
+
+ Py_BEGIN_ALLOW_THREADS
+ ret = WriteFile(conn->handle, string, length, &amount_written, NULL);
+ Py_END_ALLOW_THREADS
+
+ if (ret == 0 && GetLastError() == ERROR_NO_SYSTEM_RESOURCES) {
+ PyErr_Format(PyExc_ValueError, "Cannnot send %" PY_FORMAT_SIZE_T "d bytes over connection", length);
+ return MP_STANDARD_ERROR;
+ }
+
+ return ret ? MP_SUCCESS : MP_STANDARD_ERROR;
+}
+
+/*
+ * Attempts to read into buffer, or if buffer too small into *newbuffer.
+ *
+ * Returns number of bytes read. Assumes in message oriented mode.
+ */
+
+static Py_ssize_t
+conn_recv_string(ConnectionObject *conn, char *buffer,
+ size_t buflength, char **newbuffer, size_t maxlength)
+{
+ DWORD left, length, full_length, err;
+ BOOL ret;
+ *newbuffer = NULL;
+
+ Py_BEGIN_ALLOW_THREADS
+ ret = ReadFile(conn->handle, buffer, MIN(buflength, maxlength),
+ &length, NULL);
+ Py_END_ALLOW_THREADS
+ if (ret)
+ return length;
+
+ err = GetLastError();
+ if (err != ERROR_MORE_DATA) {
+ if (err == ERROR_BROKEN_PIPE)
+ return MP_END_OF_FILE;
+ return MP_STANDARD_ERROR;
+ }
+
+ if (!PeekNamedPipe(conn->handle, NULL, 0, NULL, NULL, &left))
+ return MP_STANDARD_ERROR;
+
+ full_length = length + left;
+ if (full_length > maxlength)
+ return MP_BAD_MESSAGE_LENGTH;
+
+ *newbuffer = PyMem_Malloc(full_length);
+ if (*newbuffer == NULL)
+ return MP_MEMORY_ERROR;
+
+ memcpy(*newbuffer, buffer, length);
+
+ Py_BEGIN_ALLOW_THREADS
+ ret = ReadFile(conn->handle, *newbuffer+length, left, &length, NULL);
+ Py_END_ALLOW_THREADS
+ if (ret) {
+ assert(length == left);
+ return full_length;
+ } else {
+ PyMem_Free(*newbuffer);
+ return MP_STANDARD_ERROR;
+ }
+}
+
+/*
+ * Check whether any data is available for reading
+ */
+
+static int
+conn_poll(ConnectionObject *conn, double timeout, PyThreadState *_save)
+{
+ DWORD bytes, deadline, delay;
+ int difference, res;
+ BOOL block = FALSE;
+
+ if (!PeekNamedPipe(conn->handle, NULL, 0, NULL, &bytes, NULL))
+ return MP_STANDARD_ERROR;
+
+ if (timeout == 0.0)
+ return bytes > 0;
+
+ if (timeout < 0.0)
+ block = TRUE;
+ else
+ /* XXX does not check for overflow */
+ deadline = GetTickCount() + (DWORD)(1000 * timeout + 0.5);
+
+ Sleep(0);
+
+ for (delay = 1 ; ; delay += 1) {
+ if (!PeekNamedPipe(conn->handle, NULL, 0, NULL, &bytes, NULL))
+ return MP_STANDARD_ERROR;
+ else if (bytes > 0)
+ return TRUE;
+
+ if (!block) {
+ difference = deadline - GetTickCount();
+ if (difference < 0)
+ return FALSE;
+ if ((int)delay > difference)
+ delay = difference;
+ }
+
+ if (delay > 20)
+ delay = 20;
+
+ Sleep(delay);
+
+ /* check for signals */
+ Py_BLOCK_THREADS
+ res = PyErr_CheckSignals();
+ Py_UNBLOCK_THREADS
+
+ if (res)
+ return MP_EXCEPTION_HAS_BEEN_SET;
+ }
+}
+
+/*
+ * "connection.h" defines the PipeConnection type using the definitions above
+ */
+
+#define CONNECTION_NAME "PipeConnection"
+#define CONNECTION_TYPE PipeConnectionType
+
+#include "connection.h"
diff --git a/Modules/_multiprocessing/posixshmem.c b/Modules/_multiprocessing/posixshmem.c
deleted file mode 100644
index 2049dbb..0000000
--- a/Modules/_multiprocessing/posixshmem.c
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
-posixshmem - A Python extension that provides shm_open() and shm_unlink()
-*/
-
-#define PY_SSIZE_T_CLEAN
-
-#include <Python.h>
-#include "structmember.h"
-
-// for shm_open() and shm_unlink()
-#ifdef HAVE_SYS_MMAN_H
-#include <sys/mman.h>
-#endif
-
-/*[clinic input]
-module _posixshmem
-[clinic start generated code]*/
-/*[clinic end generated code: output=da39a3ee5e6b4b0d input=a416734e49164bf8]*/
-
-/*
- *
- * Module-level functions & meta stuff
- *
- */
-
-#ifdef HAVE_SHM_OPEN
-/*[clinic input]
-_posixshmem.shm_open -> int
- path: unicode
- flags: int
- mode: int = 0o777
-
-# "shm_open(path, flags, mode=0o777)\n\n\
-
-Open a shared memory object. Returns a file descriptor (integer).
-
-[clinic start generated code]*/
-
-static int
-_posixshmem_shm_open_impl(PyObject *module, PyObject *path, int flags,
- int mode)
-/*[clinic end generated code: output=8d110171a4fa20df input=e83b58fa802fac25]*/
-{
- int fd;
- int async_err = 0;
- const char *name = PyUnicode_AsUTF8(path);
- if (name == NULL) {
- return -1;
- }
- do {
- Py_BEGIN_ALLOW_THREADS
- fd = shm_open(name, flags, mode);
- Py_END_ALLOW_THREADS
- } while (fd < 0 && errno == EINTR && !(async_err = PyErr_CheckSignals()));
-
- if (fd < 0) {
- if (!async_err)
- PyErr_SetFromErrnoWithFilenameObject(PyExc_OSError, path);
- return -1;
- }
-
- return fd;
-}
-#endif /* HAVE_SHM_OPEN */
-
-#ifdef HAVE_SHM_UNLINK
-/*[clinic input]
-_posixshmem.shm_unlink
- path: unicode
-
-Remove a shared memory object (similar to unlink()).
-
-Remove a shared memory object name, and, once all processes have unmapped
-the object, de-allocates and destroys the contents of the associated memory
-region.
-
-[clinic start generated code]*/
-
-static PyObject *
-_posixshmem_shm_unlink_impl(PyObject *module, PyObject *path)
-/*[clinic end generated code: output=42f8b23d134b9ff5 input=8dc0f87143e3b300]*/
-{
- int rv;
- int async_err = 0;
- const char *name = PyUnicode_AsUTF8(path);
- if (name == NULL) {
- return NULL;
- }
- do {
- Py_BEGIN_ALLOW_THREADS
- rv = shm_unlink(name);
- Py_END_ALLOW_THREADS
- } while (rv < 0 && errno == EINTR && !(async_err = PyErr_CheckSignals()));
-
- if (rv < 0) {
- if (!async_err)
- PyErr_SetFromErrnoWithFilenameObject(PyExc_OSError, path);
- return NULL;
- }
-
- Py_RETURN_NONE;
-}
-#endif /* HAVE_SHM_UNLINK */
-
-#include "clinic/posixshmem.c.h"
-
-static PyMethodDef module_methods[ ] = {
- _POSIXSHMEM_SHM_OPEN_METHODDEF
- _POSIXSHMEM_SHM_UNLINK_METHODDEF
- {NULL} /* Sentinel */
-};
-
-
-static struct PyModuleDef this_module = {
- PyModuleDef_HEAD_INIT, // m_base
- "_posixshmem", // m_name
- "POSIX shared memory module", // m_doc
- -1, // m_size (space allocated for module globals)
- module_methods, // m_methods
-};
-
-/* Module init function */
-PyMODINIT_FUNC
-PyInit__posixshmem(void) {
- PyObject *module;
- module = PyModule_Create(&this_module);
- if (!module) {
- return NULL;
- }
- return module;
-}
diff --git a/Modules/_multiprocessing/semaphore.c b/Modules/_multiprocessing/semaphore.c
index 4be2dea..fc42754 100644
--- a/Modules/_multiprocessing/semaphore.c
+++ b/Modules/_multiprocessing/semaphore.c
@@ -3,8 +3,7 @@
*
* semaphore.c
*
- * Copyright (c) 2006-2008, R Oudkerk
- * Licensed to PSF under a Contributor Agreement.
+ * Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
*/
#include "multiprocessing.h"
@@ -14,11 +13,10 @@ enum { RECURSIVE_MUTEX, SEMAPHORE };
typedef struct {
PyObject_HEAD
SEM_HANDLE handle;
- unsigned long last_tid;
+ long last_tid;
int count;
int maxvalue;
int kind;
- char *name;
} SemLockObject;
#define ISMINE(o) (o->count > 0 && PyThread_get_thread_ident() == o->last_tid)
@@ -44,7 +42,7 @@ _GetSemaphoreValue(HANDLE handle, long *value)
{
long previous;
- switch (WaitForSingleObjectEx(handle, 0, FALSE)) {
+ switch (WaitForSingleObject(handle, 0)) {
case WAIT_OBJECT_0:
if (!ReleaseSemaphore(handle, 1, &previous))
return MP_STANDARD_ERROR;
@@ -64,8 +62,7 @@ semlock_acquire(SemLockObject *self, PyObject *args, PyObject *kwds)
int blocking = 1;
double timeout;
PyObject *timeout_obj = Py_None;
- DWORD res, full_msecs, nhandles;
- HANDLE handles[2], sigint_event;
+ DWORD res, full_msecs, msecs, start, ticks;
static char *kwlist[] = {"block", "timeout", NULL};
@@ -99,49 +96,59 @@ semlock_acquire(SemLockObject *self, PyObject *args, PyObject *kwds)
Py_RETURN_TRUE;
}
- /* check whether we can acquire without releasing the GIL and blocking */
- if (WaitForSingleObjectEx(self->handle, 0, FALSE) == WAIT_OBJECT_0) {
+ /* check whether we can acquire without blocking */
+ if (WaitForSingleObject(self->handle, 0) == WAIT_OBJECT_0) {
self->last_tid = GetCurrentThreadId();
++self->count;
Py_RETURN_TRUE;
}
- /* prepare list of handles */
- nhandles = 0;
- handles[nhandles++] = self->handle;
- if (_PyOS_IsMainThread()) {
- sigint_event = _PyOS_SigintEvent();
- assert(sigint_event != NULL);
- handles[nhandles++] = sigint_event;
- }
- else {
- sigint_event = NULL;
- }
+ msecs = full_msecs;
+ start = GetTickCount();
+
+ for ( ; ; ) {
+ HANDLE handles[2] = {self->handle, sigint_event};
- /* do the wait */
- Py_BEGIN_ALLOW_THREADS
- if (sigint_event != NULL)
+ /* do the wait */
+ Py_BEGIN_ALLOW_THREADS
ResetEvent(sigint_event);
- res = WaitForMultipleObjectsEx(nhandles, handles, FALSE, full_msecs, FALSE);
- Py_END_ALLOW_THREADS
+ res = WaitForMultipleObjects(2, handles, FALSE, msecs);
+ Py_END_ALLOW_THREADS
+
+ /* handle result */
+ if (res != WAIT_OBJECT_0 + 1)
+ break;
+
+ /* got SIGINT so give signal handler a chance to run */
+ Sleep(1);
+
+ /* if this is main thread let KeyboardInterrupt be raised */
+ if (PyErr_CheckSignals())
+ return NULL;
+
+ /* recalculate timeout */
+ if (msecs != INFINITE) {
+ ticks = GetTickCount();
+ if ((DWORD)(ticks - start) >= full_msecs)
+ Py_RETURN_FALSE;
+ msecs = full_msecs - (ticks - start);
+ }
+ }
/* handle result */
switch (res) {
case WAIT_TIMEOUT:
Py_RETURN_FALSE;
- case WAIT_OBJECT_0 + 0:
+ case WAIT_OBJECT_0:
self->last_tid = GetCurrentThreadId();
++self->count;
Py_RETURN_TRUE;
- case WAIT_OBJECT_0 + 1:
- errno = EINTR;
- return PyErr_SetFromErrno(PyExc_OSError);
case WAIT_FAILED:
return PyErr_SetFromWindowsErr(0);
default:
PyErr_Format(PyExc_RuntimeError, "WaitForSingleObject() or "
"WaitForMultipleObjects() gave unrecognized "
- "value %u", res);
+ "value %d", res);
return NULL;
}
}
@@ -204,7 +211,7 @@ semlock_release(SemLockObject *self, PyObject *args)
#ifndef HAVE_SEM_TIMEDWAIT
# define sem_timedwait(sem,deadline) sem_timedwait_save(sem,deadline,_save)
-static int
+int
sem_timedwait_save(sem_t *sem, struct timespec *deadline, PyThreadState *_save)
{
int res;
@@ -267,7 +274,7 @@ sem_timedwait_save(sem_t *sem, struct timespec *deadline, PyThreadState *_save)
static PyObject *
semlock_acquire(SemLockObject *self, PyObject *args, PyObject *kwds)
{
- int blocking = 1, res, err = 0;
+ int blocking = 1, res;
double timeout;
PyObject *timeout_obj = Py_None;
struct timespec deadline = {0};
@@ -304,32 +311,20 @@ semlock_acquire(SemLockObject *self, PyObject *args, PyObject *kwds)
deadline.tv_nsec %= 1000000000;
}
- /* Check whether we can acquire without releasing the GIL and blocking */
do {
- res = sem_trywait(self->handle);
- err = errno;
+ Py_BEGIN_ALLOW_THREADS
+ if (blocking && timeout_obj == Py_None)
+ res = sem_wait(self->handle);
+ else if (!blocking)
+ res = sem_trywait(self->handle);
+ else
+ res = sem_timedwait(self->handle, &deadline);
+ Py_END_ALLOW_THREADS
+ if (res == MP_EXCEPTION_HAS_BEEN_SET)
+ break;
} while (res < 0 && errno == EINTR && !PyErr_CheckSignals());
- errno = err;
-
- if (res < 0 && errno == EAGAIN && blocking) {
- /* Couldn't acquire immediately, need to block */
- do {
- Py_BEGIN_ALLOW_THREADS
- if (timeout_obj == Py_None) {
- res = sem_wait(self->handle);
- }
- else {
- res = sem_timedwait(self->handle, &deadline);
- }
- Py_END_ALLOW_THREADS
- err = errno;
- if (res == MP_EXCEPTION_HAS_BEEN_SET)
- break;
- } while (res < 0 && errno == EINTR && !PyErr_CheckSignals());
- }
if (res < 0) {
- errno = err;
if (errno == EAGAIN || errno == ETIMEDOUT)
Py_RETURN_FALSE;
else if (errno == EINTR)
@@ -411,8 +406,7 @@ semlock_release(SemLockObject *self, PyObject *args)
*/
static PyObject *
-newsemlockobject(PyTypeObject *type, SEM_HANDLE handle, int kind, int maxvalue,
- char *name)
+newsemlockobject(PyTypeObject *type, SEM_HANDLE handle, int kind, int maxvalue)
{
SemLockObject *self;
@@ -424,22 +418,21 @@ newsemlockobject(PyTypeObject *type, SEM_HANDLE handle, int kind, int maxvalue,
self->count = 0;
self->last_tid = 0;
self->maxvalue = maxvalue;
- self->name = name;
return (PyObject*)self;
}
static PyObject *
semlock_new(PyTypeObject *type, PyObject *args, PyObject *kwds)
{
+ char buffer[256];
SEM_HANDLE handle = SEM_FAILED;
- int kind, maxvalue, value, unlink;
+ int kind, maxvalue, value;
PyObject *result;
- char *name, *name_copy = NULL;
- static char *kwlist[] = {"kind", "value", "maxvalue", "name", "unlink",
- NULL};
+ static char *kwlist[] = {"kind", "value", "maxvalue", NULL};
+ int try = 0;
- if (!PyArg_ParseTupleAndKeywords(args, kwds, "iiisi", kwlist,
- &kind, &value, &maxvalue, &name, &unlink))
+ if (!PyArg_ParseTupleAndKeywords(args, kwds, "iii", kwlist,
+ &kind, &value, &maxvalue))
return NULL;
if (kind != RECURSIVE_MUTEX && kind != SEMAPHORE) {
@@ -447,24 +440,26 @@ semlock_new(PyTypeObject *type, PyObject *args, PyObject *kwds)
return NULL;
}
- if (!unlink) {
- name_copy = PyMem_Malloc(strlen(name) + 1);
- if (name_copy == NULL) {
- return PyErr_NoMemory();
- }
- strcpy(name_copy, name);
- }
+ /* Create a semaphore with a unique name. The bytes returned by
+ * _PyOS_URandom() are treated as unsigned long to ensure that the filename
+ * is valid (no special characters). */
+ do {
+ unsigned long suffix;
+ _PyOS_URandom((char *)&suffix, sizeof(suffix));
+ PyOS_snprintf(buffer, sizeof(buffer), "/mp%ld-%lu", (long)getpid(),
+ suffix);
+ SEM_CLEAR_ERROR();
+ handle = SEM_CREATE(buffer, value, maxvalue);
+ } while ((handle == SEM_FAILED) && (errno == EEXIST) && (++try < 100));
- SEM_CLEAR_ERROR();
- handle = SEM_CREATE(name, value, maxvalue);
/* On Windows we should fail if GetLastError()==ERROR_ALREADY_EXISTS */
if (handle == SEM_FAILED || SEM_GET_LAST_ERROR() != 0)
goto failure;
- if (unlink && SEM_UNLINK(name) < 0)
+ if (SEM_UNLINK(buffer) < 0)
goto failure;
- result = newsemlockobject(type, handle, kind, maxvalue, name_copy);
+ result = newsemlockobject(type, handle, kind, maxvalue);
if (!result)
goto failure;
@@ -473,10 +468,7 @@ semlock_new(PyTypeObject *type, PyObject *args, PyObject *kwds)
failure:
if (handle != SEM_FAILED)
SEM_CLOSE(handle);
- PyMem_Free(name_copy);
- if (!PyErr_Occurred()) {
- _PyMp_SetError(NULL, MP_STANDARD_ERROR);
- }
+ mp_SetError(NULL, MP_STANDARD_ERROR);
return NULL;
}
@@ -485,30 +477,12 @@ semlock_rebuild(PyTypeObject *type, PyObject *args)
{
SEM_HANDLE handle;
int kind, maxvalue;
- char *name, *name_copy = NULL;
- if (!PyArg_ParseTuple(args, F_SEM_HANDLE "iiz",
- &handle, &kind, &maxvalue, &name))
+ if (!PyArg_ParseTuple(args, F_SEM_HANDLE "ii",
+ &handle, &kind, &maxvalue))
return NULL;
- if (name != NULL) {
- name_copy = PyMem_Malloc(strlen(name) + 1);
- if (name_copy == NULL)
- return PyErr_NoMemory();
- strcpy(name_copy, name);
- }
-
-#ifndef MS_WINDOWS
- if (name != NULL) {
- handle = sem_open(name, 0);
- if (handle == SEM_FAILED) {
- PyMem_Free(name_copy);
- return PyErr_SetFromErrno(PyExc_OSError);
- }
- }
-#endif
-
- return newsemlockobject(type, handle, kind, maxvalue, name_copy);
+ return newsemlockobject(type, handle, kind, maxvalue);
}
static void
@@ -516,25 +490,24 @@ semlock_dealloc(SemLockObject* self)
{
if (self->handle != SEM_FAILED)
SEM_CLOSE(self->handle);
- PyMem_Free(self->name);
PyObject_Del(self);
}
static PyObject *
-semlock_count(SemLockObject *self, PyObject *Py_UNUSED(ignored))
+semlock_count(SemLockObject *self)
{
- return PyLong_FromLong((long)self->count);
+ return PyInt_FromLong((long)self->count);
}
static PyObject *
-semlock_ismine(SemLockObject *self, PyObject *Py_UNUSED(ignored))
+semlock_ismine(SemLockObject *self)
{
/* only makes sense for a lock */
return PyBool_FromLong(ISMINE(self));
}
static PyObject *
-semlock_getvalue(SemLockObject *self, PyObject *Py_UNUSED(ignored))
+semlock_getvalue(SemLockObject *self)
{
#ifdef HAVE_BROKEN_SEM_GETVALUE
PyErr_SetNone(PyExc_NotImplementedError);
@@ -542,38 +515,38 @@ semlock_getvalue(SemLockObject *self, PyObject *Py_UNUSED(ignored))
#else
int sval;
if (SEM_GETVALUE(self->handle, &sval) < 0)
- return _PyMp_SetError(NULL, MP_STANDARD_ERROR);
+ return mp_SetError(NULL, MP_STANDARD_ERROR);
/* some posix implementations use negative numbers to indicate
the number of waiting threads */
if (sval < 0)
sval = 0;
- return PyLong_FromLong((long)sval);
+ return PyInt_FromLong((long)sval);
#endif
}
static PyObject *
-semlock_iszero(SemLockObject *self, PyObject *Py_UNUSED(ignored))
+semlock_iszero(SemLockObject *self)
{
#ifdef HAVE_BROKEN_SEM_GETVALUE
if (sem_trywait(self->handle) < 0) {
if (errno == EAGAIN)
Py_RETURN_TRUE;
- return _PyMp_SetError(NULL, MP_STANDARD_ERROR);
+ return mp_SetError(NULL, MP_STANDARD_ERROR);
} else {
if (sem_post(self->handle) < 0)
- return _PyMp_SetError(NULL, MP_STANDARD_ERROR);
+ return mp_SetError(NULL, MP_STANDARD_ERROR);
Py_RETURN_FALSE;
}
#else
int sval;
if (SEM_GETVALUE(self->handle, &sval) < 0)
- return _PyMp_SetError(NULL, MP_STANDARD_ERROR);
+ return mp_SetError(NULL, MP_STANDARD_ERROR);
return PyBool_FromLong((long)sval == 0);
#endif
}
static PyObject *
-semlock_afterfork(SemLockObject *self, PyObject *Py_UNUSED(ignored))
+semlock_afterfork(SemLockObject *self)
{
self->count = 0;
Py_RETURN_NONE;
@@ -584,11 +557,11 @@ semlock_afterfork(SemLockObject *self, PyObject *Py_UNUSED(ignored))
*/
static PyMethodDef semlock_methods[] = {
- {"acquire", (PyCFunction)(void(*)(void))semlock_acquire, METH_VARARGS | METH_KEYWORDS,
+ {"acquire", (PyCFunction)semlock_acquire, METH_VARARGS | METH_KEYWORDS,
"acquire the semaphore/lock"},
{"release", (PyCFunction)semlock_release, METH_NOARGS,
"release the semaphore/lock"},
- {"__enter__", (PyCFunction)(void(*)(void))semlock_acquire, METH_VARARGS | METH_KEYWORDS,
+ {"__enter__", (PyCFunction)semlock_acquire, METH_VARARGS | METH_KEYWORDS,
"enter the semaphore/lock"},
{"__exit__", (PyCFunction)semlock_release, METH_VARARGS,
"exit the semaphore/lock"},
@@ -618,8 +591,6 @@ static PyMemberDef semlock_members[] = {
""},
{"maxvalue", T_INT, offsetof(SemLockObject, maxvalue), READONLY,
""},
- {"name", T_STRING, offsetof(SemLockObject, name), READONLY,
- ""},
{NULL}
};
@@ -627,16 +598,16 @@ static PyMemberDef semlock_members[] = {
* Semaphore type
*/
-PyTypeObject _PyMp_SemLockType = {
+PyTypeObject SemLockType = {
PyVarObject_HEAD_INIT(NULL, 0)
/* tp_name */ "_multiprocessing.SemLock",
/* tp_basicsize */ sizeof(SemLockObject),
/* tp_itemsize */ 0,
/* tp_dealloc */ (destructor)semlock_dealloc,
- /* tp_vectorcall_offset */ 0,
+ /* tp_print */ 0,
/* tp_getattr */ 0,
/* tp_setattr */ 0,
- /* tp_as_async */ 0,
+ /* tp_compare */ 0,
/* tp_repr */ 0,
/* tp_as_number */ 0,
/* tp_as_sequence */ 0,
@@ -667,23 +638,3 @@ PyTypeObject _PyMp_SemLockType = {
/* tp_alloc */ 0,
/* tp_new */ semlock_new,
};
-
-/*
- * Function to unlink semaphore names
- */
-
-PyObject *
-_PyMp_sem_unlink(PyObject *ignore, PyObject *args)
-{
- char *name;
-
- if (!PyArg_ParseTuple(args, "s", &name))
- return NULL;
-
- if (SEM_UNLINK(name) < 0) {
- _PyMp_SetError(NULL, MP_STANDARD_ERROR);
- return NULL;
- }
-
- Py_RETURN_NONE;
-}
diff --git a/Modules/_multiprocessing/socket_connection.c b/Modules/_multiprocessing/socket_connection.c
new file mode 100644
index 0000000..bdb0a32
--- /dev/null
+++ b/Modules/_multiprocessing/socket_connection.c
@@ -0,0 +1,277 @@
+/*
+ * A type which wraps a socket
+ *
+ * socket_connection.c
+ *
+ * Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
+ */
+
+#include "multiprocessing.h"
+
+#if defined(HAVE_POLL) && !defined(HAVE_BROKEN_POLL)
+# include "poll.h"
+#endif
+
+#ifdef MS_WINDOWS
+# define WRITE(h, buffer, length) send((SOCKET)h, buffer, length, 0)
+# define READ(h, buffer, length) recv((SOCKET)h, buffer, length, 0)
+# define CLOSE(h) closesocket((SOCKET)h)
+#else
+# define WRITE(h, buffer, length) write(h, buffer, length)
+# define READ(h, buffer, length) read(h, buffer, length)
+# define CLOSE(h) close(h)
+#endif
+
+/*
+ * Wrapper for PyErr_CheckSignals() which can be called without the GIL
+ */
+
+static int
+check_signals(void)
+{
+ PyGILState_STATE state;
+ int res;
+ state = PyGILState_Ensure();
+ res = PyErr_CheckSignals();
+ PyGILState_Release(state);
+ return res;
+}
+
+/*
+ * Send string to file descriptor
+ */
+
+static Py_ssize_t
+_conn_sendall(HANDLE h, char *string, size_t length)
+{
+ char *p = string;
+ Py_ssize_t res;
+
+ while (length > 0) {
+ res = WRITE(h, p, length);
+ if (res < 0) {
+ if (errno == EINTR) {
+ if (check_signals() < 0)
+ return MP_EXCEPTION_HAS_BEEN_SET;
+ continue;
+ }
+ return MP_SOCKET_ERROR;
+ }
+ length -= res;
+ p += res;
+ }
+
+ return MP_SUCCESS;
+}
+
+/*
+ * Receive string of exact length from file descriptor
+ */
+
+static Py_ssize_t
+_conn_recvall(HANDLE h, char *buffer, size_t length)
+{
+ size_t remaining = length;
+ Py_ssize_t temp;
+ char *p = buffer;
+
+ while (remaining > 0) {
+ temp = READ(h, p, remaining);
+ if (temp < 0) {
+ if (errno == EINTR) {
+ if (check_signals() < 0)
+ return MP_EXCEPTION_HAS_BEEN_SET;
+ continue;
+ }
+ return temp;
+ }
+ else if (temp == 0) {
+ return remaining == length ? MP_END_OF_FILE : MP_EARLY_END_OF_FILE;
+ }
+ remaining -= temp;
+ p += temp;
+ }
+
+ return MP_SUCCESS;
+}
+
+/*
+ * Send a string prepended by the string length in network byte order
+ */
+
+static Py_ssize_t
+conn_send_string(ConnectionObject *conn, char *string, size_t length)
+{
+ Py_ssize_t res;
+ /* The "header" of the message is a 32 bit unsigned number (in
+ network order) which specifies the length of the "body". If
+ the message is shorter than about 16kb then it is quicker to
+ combine the "header" and the "body" of the message and send
+ them at once. */
+ if (length < (16*1024)) {
+ char *message;
+
+ message = PyMem_Malloc(length+4);
+ if (message == NULL)
+ return MP_MEMORY_ERROR;
+
+ *(UINT32*)message = htonl((UINT32)length);
+ memcpy(message+4, string, length);
+ Py_BEGIN_ALLOW_THREADS
+ res = _conn_sendall(conn->handle, message, length+4);
+ Py_END_ALLOW_THREADS
+ PyMem_Free(message);
+ } else {
+ UINT32 lenbuff;
+
+ if (length > MAX_MESSAGE_LENGTH)
+ return MP_BAD_MESSAGE_LENGTH;
+
+ lenbuff = htonl((UINT32)length);
+ Py_BEGIN_ALLOW_THREADS
+ res = _conn_sendall(conn->handle, (char*)&lenbuff, 4) ||
+ _conn_sendall(conn->handle, string, length);
+ Py_END_ALLOW_THREADS
+ }
+ return res;
+}
+
+/*
+ * Attempts to read into buffer, or failing that into *newbuffer
+ *
+ * Returns number of bytes read.
+ */
+
+static Py_ssize_t
+conn_recv_string(ConnectionObject *conn, char *buffer,
+ size_t buflength, char **newbuffer, size_t maxlength)
+{
+ Py_ssize_t res;
+ UINT32 ulength;
+
+ *newbuffer = NULL;
+
+ Py_BEGIN_ALLOW_THREADS
+ res = _conn_recvall(conn->handle, (char*)&ulength, 4);
+ Py_END_ALLOW_THREADS
+ if (res < 0)
+ return res;
+
+ ulength = ntohl(ulength);
+ if (ulength > maxlength)
+ return MP_BAD_MESSAGE_LENGTH;
+
+ if (ulength > buflength) {
+ *newbuffer = buffer = PyMem_Malloc((size_t)ulength);
+ if (buffer == NULL)
+ return MP_MEMORY_ERROR;
+ }
+
+ Py_BEGIN_ALLOW_THREADS
+ res = _conn_recvall(conn->handle, buffer, (size_t)ulength);
+ Py_END_ALLOW_THREADS
+
+ if (res >= 0) {
+ res = (Py_ssize_t)ulength;
+ } else if (*newbuffer != NULL) {
+ PyMem_Free(*newbuffer);
+ *newbuffer = NULL;
+ }
+ return res;
+}
+
+/*
+ * Check whether any data is available for reading -- neg timeout blocks
+ */
+
+static int
+conn_poll(ConnectionObject *conn, double timeout, PyThreadState *_save)
+{
+#if defined(HAVE_POLL) && !defined(HAVE_BROKEN_POLL)
+ int res;
+ struct pollfd p;
+
+ p.fd = (int)conn->handle;
+ p.events = POLLIN | POLLPRI;
+ p.revents = 0;
+
+ if (timeout < 0) {
+ do {
+ res = poll(&p, 1, -1);
+ } while (res < 0 && errno == EINTR);
+ } else {
+ res = poll(&p, 1, (int)(timeout * 1000 + 0.5));
+ if (res < 0 && errno == EINTR) {
+ /* We were interrupted by a signal. Just indicate a
+ timeout even though we are early. */
+ return FALSE;
+ }
+ }
+
+ if (res < 0) {
+ return MP_SOCKET_ERROR;
+ } else if (p.revents & (POLLNVAL|POLLERR)) {
+ Py_BLOCK_THREADS
+ PyErr_SetString(PyExc_IOError, "poll() gave POLLNVAL or POLLERR");
+ Py_UNBLOCK_THREADS
+ return MP_EXCEPTION_HAS_BEEN_SET;
+ } else if (p.revents != 0) {
+ return TRUE;
+ } else {
+ assert(res == 0);
+ return FALSE;
+ }
+#else
+ int res;
+ fd_set rfds;
+
+ /*
+ * Verify the handle, issue 3321. Not required for windows.
+ */
+ #ifndef MS_WINDOWS
+ if (((int)conn->handle) < 0 || ((int)conn->handle) >= FD_SETSIZE) {
+ Py_BLOCK_THREADS
+ PyErr_SetString(PyExc_IOError, "handle out of range in select()");
+ Py_UNBLOCK_THREADS
+ return MP_EXCEPTION_HAS_BEEN_SET;
+ }
+ #endif
+
+ FD_ZERO(&rfds);
+ FD_SET((SOCKET)conn->handle, &rfds);
+
+ if (timeout < 0.0) {
+ do {
+ res = select((int)conn->handle+1, &rfds, NULL, NULL, NULL);
+ } while (res < 0 && errno == EINTR);
+ } else {
+ struct timeval tv;
+ tv.tv_sec = (long)timeout;
+ tv.tv_usec = (long)((timeout - tv.tv_sec) * 1e6 + 0.5);
+ res = select((int)conn->handle+1, &rfds, NULL, NULL, &tv);
+ if (res < 0 && errno == EINTR) {
+ /* We were interrupted by a signal. Just indicate a
+ timeout even though we are early. */
+ return FALSE;
+ }
+ }
+
+ if (res < 0) {
+ return MP_SOCKET_ERROR;
+ } else if (FD_ISSET(conn->handle, &rfds)) {
+ return TRUE;
+ } else {
+ assert(res == 0);
+ return FALSE;
+ }
+#endif
+}
+
+/*
+ * "connection.h" defines the Connection type using defs above
+ */
+
+#define CONNECTION_NAME "Connection"
+#define CONNECTION_TYPE ConnectionType
+
+#include "connection.h"
diff --git a/Modules/_multiprocessing/win32_functions.c b/Modules/_multiprocessing/win32_functions.c
new file mode 100644
index 0000000..9425929
--- /dev/null
+++ b/Modules/_multiprocessing/win32_functions.c
@@ -0,0 +1,267 @@
+/*
+ * Win32 functions used by multiprocessing package
+ *
+ * win32_functions.c
+ *
+ * Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
+ */
+
+#include "multiprocessing.h"
+
+
+#define WIN32_FUNCTION(func) \
+ {#func, (PyCFunction)win32_ ## func, METH_VARARGS | METH_STATIC, ""}
+
+#define WIN32_CONSTANT(fmt, con) \
+ PyDict_SetItemString(Win32Type.tp_dict, #con, Py_BuildValue(fmt, con))
+
+
+static PyObject *
+win32_CloseHandle(PyObject *self, PyObject *args)
+{
+ HANDLE hObject;
+ BOOL success;
+
+ if (!PyArg_ParseTuple(args, F_HANDLE, &hObject))
+ return NULL;
+
+ Py_BEGIN_ALLOW_THREADS
+ success = CloseHandle(hObject);
+ Py_END_ALLOW_THREADS
+
+ if (!success)
+ return PyErr_SetFromWindowsErr(0);
+
+ Py_RETURN_NONE;
+}
+
+static PyObject *
+win32_ConnectNamedPipe(PyObject *self, PyObject *args)
+{
+ HANDLE hNamedPipe;
+ LPOVERLAPPED lpOverlapped;
+ BOOL success;
+
+ if (!PyArg_ParseTuple(args, F_HANDLE F_POINTER,
+ &hNamedPipe, &lpOverlapped))
+ return NULL;
+
+ Py_BEGIN_ALLOW_THREADS
+ success = ConnectNamedPipe(hNamedPipe, lpOverlapped);
+ Py_END_ALLOW_THREADS
+
+ if (!success)
+ return PyErr_SetFromWindowsErr(0);
+
+ Py_RETURN_NONE;
+}
+
+static PyObject *
+win32_CreateFile(PyObject *self, PyObject *args)
+{
+ LPCTSTR lpFileName;
+ DWORD dwDesiredAccess;
+ DWORD dwShareMode;
+ LPSECURITY_ATTRIBUTES lpSecurityAttributes;
+ DWORD dwCreationDisposition;
+ DWORD dwFlagsAndAttributes;
+ HANDLE hTemplateFile;
+ HANDLE handle;
+
+ if (!PyArg_ParseTuple(args, "s" F_DWORD F_DWORD F_POINTER
+ F_DWORD F_DWORD F_HANDLE,
+ &lpFileName, &dwDesiredAccess, &dwShareMode,
+ &lpSecurityAttributes, &dwCreationDisposition,
+ &dwFlagsAndAttributes, &hTemplateFile))
+ return NULL;
+
+ Py_BEGIN_ALLOW_THREADS
+ handle = CreateFile(lpFileName, dwDesiredAccess,
+ dwShareMode, lpSecurityAttributes,
+ dwCreationDisposition,
+ dwFlagsAndAttributes, hTemplateFile);
+ Py_END_ALLOW_THREADS
+
+ if (handle == INVALID_HANDLE_VALUE)
+ return PyErr_SetFromWindowsErr(0);
+
+ return Py_BuildValue(F_HANDLE, handle);
+}
+
+static PyObject *
+win32_CreateNamedPipe(PyObject *self, PyObject *args)
+{
+ LPCTSTR lpName;
+ DWORD dwOpenMode;
+ DWORD dwPipeMode;
+ DWORD nMaxInstances;
+ DWORD nOutBufferSize;
+ DWORD nInBufferSize;
+ DWORD nDefaultTimeOut;
+ LPSECURITY_ATTRIBUTES lpSecurityAttributes;
+ HANDLE handle;
+
+ if (!PyArg_ParseTuple(args, "s" F_DWORD F_DWORD F_DWORD
+ F_DWORD F_DWORD F_DWORD F_POINTER,
+ &lpName, &dwOpenMode, &dwPipeMode,
+ &nMaxInstances, &nOutBufferSize,
+ &nInBufferSize, &nDefaultTimeOut,
+ &lpSecurityAttributes))
+ return NULL;
+
+ Py_BEGIN_ALLOW_THREADS
+ handle = CreateNamedPipe(lpName, dwOpenMode, dwPipeMode,
+ nMaxInstances, nOutBufferSize,
+ nInBufferSize, nDefaultTimeOut,
+ lpSecurityAttributes);
+ Py_END_ALLOW_THREADS
+
+ if (handle == INVALID_HANDLE_VALUE)
+ return PyErr_SetFromWindowsErr(0);
+
+ return Py_BuildValue(F_HANDLE, handle);
+}
+
+static PyObject *
+win32_ExitProcess(PyObject *self, PyObject *args)
+{
+ UINT uExitCode;
+
+ if (!PyArg_ParseTuple(args, "I", &uExitCode))
+ return NULL;
+
+ #if defined(Py_DEBUG)
+ SetErrorMode(SEM_FAILCRITICALERRORS|SEM_NOALIGNMENTFAULTEXCEPT|SEM_NOGPFAULTERRORBOX|SEM_NOOPENFILEERRORBOX);
+ _CrtSetReportMode(_CRT_ASSERT, _CRTDBG_MODE_DEBUG);
+ #endif
+
+
+ ExitProcess(uExitCode);
+
+ return NULL;
+}
+
+static PyObject *
+win32_GetLastError(PyObject *self, PyObject *args)
+{
+ return Py_BuildValue(F_DWORD, GetLastError());
+}
+
+static PyObject *
+win32_OpenProcess(PyObject *self, PyObject *args)
+{
+ DWORD dwDesiredAccess;
+ BOOL bInheritHandle;
+ DWORD dwProcessId;
+ HANDLE handle;
+
+ if (!PyArg_ParseTuple(args, F_DWORD "i" F_DWORD,
+ &dwDesiredAccess, &bInheritHandle, &dwProcessId))
+ return NULL;
+
+ handle = OpenProcess(dwDesiredAccess, bInheritHandle, dwProcessId);
+ if (handle == NULL)
+ return PyErr_SetFromWindowsErr(0);
+
+ return Py_BuildValue(F_HANDLE, handle);
+}
+
+static PyObject *
+win32_SetNamedPipeHandleState(PyObject *self, PyObject *args)
+{
+ HANDLE hNamedPipe;
+ PyObject *oArgs[3];
+ DWORD dwArgs[3], *pArgs[3] = {NULL, NULL, NULL};
+ int i;
+
+ if (!PyArg_ParseTuple(args, F_HANDLE "OOO",
+ &hNamedPipe, &oArgs[0], &oArgs[1], &oArgs[2]))
+ return NULL;
+
+ PyErr_Clear();
+
+ for (i = 0 ; i < 3 ; i++) {
+ if (oArgs[i] != Py_None) {
+ dwArgs[i] = PyInt_AsUnsignedLongMask(oArgs[i]);
+ if (PyErr_Occurred())
+ return NULL;
+ pArgs[i] = &dwArgs[i];
+ }
+ }
+
+ if (!SetNamedPipeHandleState(hNamedPipe, pArgs[0], pArgs[1], pArgs[2]))
+ return PyErr_SetFromWindowsErr(0);
+
+ Py_RETURN_NONE;
+}
+
+static PyObject *
+win32_WaitNamedPipe(PyObject *self, PyObject *args)
+{
+ LPCTSTR lpNamedPipeName;
+ DWORD nTimeOut;
+ BOOL success;
+
+ if (!PyArg_ParseTuple(args, "s" F_DWORD, &lpNamedPipeName, &nTimeOut))
+ return NULL;
+
+ Py_BEGIN_ALLOW_THREADS
+ success = WaitNamedPipe(lpNamedPipeName, nTimeOut);
+ Py_END_ALLOW_THREADS
+
+ if (!success)
+ return PyErr_SetFromWindowsErr(0);
+
+ Py_RETURN_NONE;
+}
+
+static PyMethodDef win32_methods[] = {
+ WIN32_FUNCTION(CloseHandle),
+ WIN32_FUNCTION(GetLastError),
+ WIN32_FUNCTION(OpenProcess),
+ WIN32_FUNCTION(ExitProcess),
+ WIN32_FUNCTION(ConnectNamedPipe),
+ WIN32_FUNCTION(CreateFile),
+ WIN32_FUNCTION(CreateNamedPipe),
+ WIN32_FUNCTION(SetNamedPipeHandleState),
+ WIN32_FUNCTION(WaitNamedPipe),
+ {NULL}
+};
+
+
+PyTypeObject Win32Type = {
+ PyVarObject_HEAD_INIT(NULL, 0)
+};
+
+
+PyObject *
+create_win32_namespace(void)
+{
+ Win32Type.tp_name = "_multiprocessing.win32";
+ Win32Type.tp_methods = win32_methods;
+ if (PyType_Ready(&Win32Type) < 0)
+ return NULL;
+ Py_INCREF(&Win32Type);
+
+ WIN32_CONSTANT(F_DWORD, ERROR_ALREADY_EXISTS);
+ WIN32_CONSTANT(F_DWORD, ERROR_NO_DATA);
+ WIN32_CONSTANT(F_DWORD, ERROR_PIPE_BUSY);
+ WIN32_CONSTANT(F_DWORD, ERROR_PIPE_CONNECTED);
+ WIN32_CONSTANT(F_DWORD, ERROR_SEM_TIMEOUT);
+ WIN32_CONSTANT(F_DWORD, GENERIC_READ);
+ WIN32_CONSTANT(F_DWORD, GENERIC_WRITE);
+ WIN32_CONSTANT(F_DWORD, INFINITE);
+ WIN32_CONSTANT(F_DWORD, NMPWAIT_WAIT_FOREVER);
+ WIN32_CONSTANT(F_DWORD, OPEN_EXISTING);
+ WIN32_CONSTANT(F_DWORD, PIPE_ACCESS_DUPLEX);
+ WIN32_CONSTANT(F_DWORD, PIPE_ACCESS_INBOUND);
+ WIN32_CONSTANT(F_DWORD, PIPE_READMODE_MESSAGE);
+ WIN32_CONSTANT(F_DWORD, PIPE_TYPE_MESSAGE);
+ WIN32_CONSTANT(F_DWORD, PIPE_UNLIMITED_INSTANCES);
+ WIN32_CONSTANT(F_DWORD, PIPE_WAIT);
+ WIN32_CONSTANT(F_DWORD, PROCESS_ALL_ACCESS);
+
+ WIN32_CONSTANT("i", NULL);
+
+ return (PyObject*)&Win32Type;
+}