summaryrefslogtreecommitdiffstats
path: root/Modules/_multiprocessing/connection.h
diff options
context:
space:
mode:
Diffstat (limited to 'Modules/_multiprocessing/connection.h')
-rw-r--r--Modules/_multiprocessing/connection.h515
1 files changed, 515 insertions, 0 deletions
diff --git a/Modules/_multiprocessing/connection.h b/Modules/_multiprocessing/connection.h
new file mode 100644
index 0000000..e95d4c4
--- /dev/null
+++ b/Modules/_multiprocessing/connection.h
@@ -0,0 +1,515 @@
+/*
+ * Definition of a `Connection` type.
+ * Used by `socket_connection.c` and `pipe_connection.c`.
+ *
+ * connection.h
+ *
+ * Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
+ */
+
+#ifndef CONNECTION_H
+#define CONNECTION_H
+
+/*
+ * Read/write flags
+ */
+
+#define READABLE 1
+#define WRITABLE 2
+
+#define CHECK_READABLE(self) \
+ if (!(self->flags & READABLE)) { \
+ PyErr_SetString(PyExc_IOError, "connection is write-only"); \
+ return NULL; \
+ }
+
+#define CHECK_WRITABLE(self) \
+ if (!(self->flags & WRITABLE)) { \
+ PyErr_SetString(PyExc_IOError, "connection is read-only"); \
+ return NULL; \
+ }
+
+/*
+ * Allocation and deallocation
+ */
+
+static PyObject *
+connection_new(PyTypeObject *type, PyObject *args, PyObject *kwds)
+{
+ ConnectionObject *self;
+ HANDLE handle;
+ BOOL readable = TRUE, writable = TRUE;
+
+ static char *kwlist[] = {"handle", "readable", "writable", NULL};
+
+ if (!PyArg_ParseTupleAndKeywords(args, kwds, F_HANDLE "|ii", kwlist,
+ &handle, &readable, &writable))
+ return NULL;
+
+ if (handle == INVALID_HANDLE_VALUE || (Py_ssize_t)handle < 0) {
+ PyErr_Format(PyExc_IOError, "invalid handle %"
+ PY_FORMAT_SIZE_T "d", (Py_ssize_t)handle);
+ return NULL;
+ }
+
+ if (!readable && !writable) {
+ PyErr_SetString(PyExc_ValueError,
+ "either readable or writable must be true");
+ return NULL;
+ }
+
+ self = PyObject_New(ConnectionObject, type);
+ if (self == NULL)
+ return NULL;
+
+ self->weakreflist = NULL;
+ self->handle = handle;
+ self->flags = 0;
+
+ if (readable)
+ self->flags |= READABLE;
+ if (writable)
+ self->flags |= WRITABLE;
+ assert(self->flags >= 1 && self->flags <= 3);
+
+ return (PyObject*)self;
+}
+
+static void
+connection_dealloc(ConnectionObject* self)
+{
+ if (self->weakreflist != NULL)
+ PyObject_ClearWeakRefs((PyObject*)self);
+
+ if (self->handle != INVALID_HANDLE_VALUE) {
+ Py_BEGIN_ALLOW_THREADS
+ CLOSE(self->handle);
+ Py_END_ALLOW_THREADS
+ }
+ PyObject_Del(self);
+}
+
+/*
+ * Functions for transferring buffers
+ */
+
+static PyObject *
+connection_sendbytes(ConnectionObject *self, PyObject *args)
+{
+ char *buffer;
+ Py_ssize_t length, offset=0, size=PY_SSIZE_T_MIN;
+ int res;
+
+ if (!PyArg_ParseTuple(args, F_RBUFFER "#|" F_PY_SSIZE_T F_PY_SSIZE_T,
+ &buffer, &length, &offset, &size))
+ return NULL;
+
+ CHECK_WRITABLE(self);
+
+ if (offset < 0) {
+ PyErr_SetString(PyExc_ValueError, "offset is negative");
+ return NULL;
+ }
+ if (length < offset) {
+ PyErr_SetString(PyExc_ValueError, "buffer length < offset");
+ return NULL;
+ }
+
+ if (size == PY_SSIZE_T_MIN) {
+ size = length - offset;
+ } else {
+ if (size < 0) {
+ PyErr_SetString(PyExc_ValueError, "size is negative");
+ return NULL;
+ }
+ if (offset + size > length) {
+ PyErr_SetString(PyExc_ValueError,
+ "buffer length < offset + size");
+ return NULL;
+ }
+ }
+
+ Py_BEGIN_ALLOW_THREADS
+ res = conn_send_string(self, buffer + offset, size);
+ Py_END_ALLOW_THREADS
+
+ if (res < 0)
+ return mp_SetError(PyExc_IOError, res);
+
+ Py_RETURN_NONE;
+}
+
+static PyObject *
+connection_recvbytes(ConnectionObject *self, PyObject *args)
+{
+ char *freeme = NULL;
+ Py_ssize_t res, maxlength = PY_SSIZE_T_MAX;
+ PyObject *result = NULL;
+
+ if (!PyArg_ParseTuple(args, "|" F_PY_SSIZE_T, &maxlength))
+ return NULL;
+
+ CHECK_READABLE(self);
+
+ if (maxlength < 0) {
+ PyErr_SetString(PyExc_ValueError, "maxlength < 0");
+ return NULL;
+ }
+
+ Py_BEGIN_ALLOW_THREADS
+ res = conn_recv_string(self, self->buffer, CONNECTION_BUFFER_SIZE,
+ &freeme, maxlength);
+ Py_END_ALLOW_THREADS
+
+ if (res < 0) {
+ if (res == MP_BAD_MESSAGE_LENGTH) {
+ if ((self->flags & WRITABLE) == 0) {
+ Py_BEGIN_ALLOW_THREADS
+ CLOSE(self->handle);
+ Py_END_ALLOW_THREADS
+ self->handle = INVALID_HANDLE_VALUE;
+ } else {
+ self->flags = WRITABLE;
+ }
+ }
+ mp_SetError(PyExc_IOError, res);
+ } else {
+ if (freeme == NULL) {
+ result = PyBytes_FromStringAndSize(self->buffer, res);
+ } else {
+ result = PyBytes_FromStringAndSize(freeme, res);
+ PyMem_Free(freeme);
+ }
+ }
+
+ return result;
+}
+
+static PyObject *
+connection_recvbytes_into(ConnectionObject *self, PyObject *args)
+{
+ char *freeme = NULL, *buffer = NULL;
+ Py_ssize_t res, length, offset = 0;
+ PyObject *result = NULL;
+
+ if (!PyArg_ParseTuple(args, "w#|" F_PY_SSIZE_T,
+ &buffer, &length, &offset))
+ return NULL;
+
+ CHECK_READABLE(self);
+
+ if (offset < 0) {
+ PyErr_SetString(PyExc_ValueError, "negative offset");
+ return NULL;
+ }
+
+ if (offset > length) {
+ PyErr_SetString(PyExc_ValueError, "offset too large");
+ return NULL;
+ }
+
+ Py_BEGIN_ALLOW_THREADS
+ res = conn_recv_string(self, buffer+offset, length-offset,
+ &freeme, PY_SSIZE_T_MAX);
+ Py_END_ALLOW_THREADS
+
+ if (res < 0) {
+ if (res == MP_BAD_MESSAGE_LENGTH) {
+ if ((self->flags & WRITABLE) == 0) {
+ Py_BEGIN_ALLOW_THREADS
+ CLOSE(self->handle);
+ Py_END_ALLOW_THREADS
+ self->handle = INVALID_HANDLE_VALUE;
+ } else {
+ self->flags = WRITABLE;
+ }
+ }
+ mp_SetError(PyExc_IOError, res);
+ } else {
+ if (freeme == NULL) {
+ result = PyInt_FromSsize_t(res);
+ } else {
+ result = PyObject_CallFunction(BufferTooShort,
+ F_RBUFFER "#",
+ freeme, res);
+ PyMem_Free(freeme);
+ if (result) {
+ PyErr_SetObject(BufferTooShort, result);
+ Py_DECREF(result);
+ }
+ return NULL;
+ }
+ }
+
+ return result;
+}
+
+/*
+ * Functions for transferring objects
+ */
+
+static PyObject *
+connection_send_obj(ConnectionObject *self, PyObject *obj)
+{
+ char *buffer;
+ int res;
+ Py_ssize_t length;
+ PyObject *pickled_string = NULL;
+
+ CHECK_WRITABLE(self);
+
+ pickled_string = PyObject_CallFunctionObjArgs(pickle_dumps, obj,
+ pickle_protocol, NULL);
+ if (!pickled_string)
+ goto failure;
+
+ if (PyBytes_AsStringAndSize(pickled_string, &buffer, &length) < 0)
+ goto failure;
+
+ Py_BEGIN_ALLOW_THREADS
+ res = conn_send_string(self, buffer, (int)length);
+ Py_END_ALLOW_THREADS
+
+ if (res < 0) {
+ mp_SetError(PyExc_IOError, res);
+ goto failure;
+ }
+
+ Py_XDECREF(pickled_string);
+ Py_RETURN_NONE;
+
+ failure:
+ Py_XDECREF(pickled_string);
+ return NULL;
+}
+
+static PyObject *
+connection_recv_obj(ConnectionObject *self)
+{
+ char *freeme = NULL;
+ Py_ssize_t res;
+ PyObject *temp = NULL, *result = NULL;
+
+ CHECK_READABLE(self);
+
+ Py_BEGIN_ALLOW_THREADS
+ res = conn_recv_string(self, self->buffer, CONNECTION_BUFFER_SIZE,
+ &freeme, PY_SSIZE_T_MAX);
+ Py_END_ALLOW_THREADS
+
+ if (res < 0) {
+ if (res == MP_BAD_MESSAGE_LENGTH) {
+ if ((self->flags & WRITABLE) == 0) {
+ Py_BEGIN_ALLOW_THREADS
+ CLOSE(self->handle);
+ Py_END_ALLOW_THREADS
+ self->handle = INVALID_HANDLE_VALUE;
+ } else {
+ self->flags = WRITABLE;
+ }
+ }
+ mp_SetError(PyExc_IOError, res);
+ } else {
+ if (freeme == NULL) {
+ temp = PyBytes_FromStringAndSize(self->buffer, res);
+ } else {
+ temp = PyBytes_FromStringAndSize(freeme, res);
+ PyMem_Free(freeme);
+ }
+ }
+
+ if (temp)
+ result = PyObject_CallFunctionObjArgs(pickle_loads,
+ temp, NULL);
+ Py_XDECREF(temp);
+ return result;
+}
+
+/*
+ * Other functions
+ */
+
+static PyObject *
+connection_poll(ConnectionObject *self, PyObject *args)
+{
+ PyObject *timeout_obj = NULL;
+ double timeout = 0.0;
+ int res;
+
+ CHECK_READABLE(self);
+
+ if (!PyArg_ParseTuple(args, "|O", &timeout_obj))
+ return NULL;
+
+ if (timeout_obj == NULL) {
+ timeout = 0.0;
+ } else if (timeout_obj == Py_None) {
+ timeout = -1.0; /* block forever */
+ } else {
+ timeout = PyFloat_AsDouble(timeout_obj);
+ if (PyErr_Occurred())
+ return NULL;
+ if (timeout < 0.0)
+ timeout = 0.0;
+ }
+
+ Py_BEGIN_ALLOW_THREADS
+ res = conn_poll(self, timeout);
+ Py_END_ALLOW_THREADS
+
+ switch (res) {
+ case TRUE:
+ Py_RETURN_TRUE;
+ case FALSE:
+ Py_RETURN_FALSE;
+ default:
+ return mp_SetError(PyExc_IOError, res);
+ }
+}
+
+static PyObject *
+connection_fileno(ConnectionObject* self)
+{
+ if (self->handle == INVALID_HANDLE_VALUE) {
+ PyErr_SetString(PyExc_IOError, "handle is invalid");
+ return NULL;
+ }
+ return PyInt_FromLong((long)self->handle);
+}
+
+static PyObject *
+connection_close(ConnectionObject *self)
+{
+ if (self->handle != INVALID_HANDLE_VALUE) {
+ Py_BEGIN_ALLOW_THREADS
+ CLOSE(self->handle);
+ Py_END_ALLOW_THREADS
+ self->handle = INVALID_HANDLE_VALUE;
+ }
+
+ Py_RETURN_NONE;
+}
+
+static PyObject *
+connection_repr(ConnectionObject *self)
+{
+ static char *conn_type[] = {"read-only", "write-only", "read-write"};
+
+ assert(self->flags >= 1 && self->flags <= 3);
+ return FROM_FORMAT("<%s %s, handle %" PY_FORMAT_SIZE_T "d>",
+ conn_type[self->flags - 1],
+ CONNECTION_NAME, (Py_ssize_t)self->handle);
+}
+
+/*
+ * Getters and setters
+ */
+
+static PyObject *
+connection_closed(ConnectionObject *self, void *closure)
+{
+ return PyBool_FromLong((long)(self->handle == INVALID_HANDLE_VALUE));
+}
+
+static PyObject *
+connection_readable(ConnectionObject *self, void *closure)
+{
+ return PyBool_FromLong((long)(self->flags & READABLE));
+}
+
+static PyObject *
+connection_writable(ConnectionObject *self, void *closure)
+{
+ return PyBool_FromLong((long)(self->flags & WRITABLE));
+}
+
+/*
+ * Tables
+ */
+
+static PyMethodDef connection_methods[] = {
+ {"send_bytes", (PyCFunction)connection_sendbytes, METH_VARARGS,
+ "send the byte data from a readable buffer-like object"},
+ {"recv_bytes", (PyCFunction)connection_recvbytes, METH_VARARGS,
+ "receive byte data as a string"},
+ {"recv_bytes_into",(PyCFunction)connection_recvbytes_into,METH_VARARGS,
+ "receive byte data into a writeable buffer-like object\n"
+ "returns the number of bytes read"},
+
+ {"send", (PyCFunction)connection_send_obj, METH_O,
+ "send a (picklable) object"},
+ {"recv", (PyCFunction)connection_recv_obj, METH_NOARGS,
+ "receive a (picklable) object"},
+
+ {"poll", (PyCFunction)connection_poll, METH_VARARGS,
+ "whether there is any input available to be read"},
+ {"fileno", (PyCFunction)connection_fileno, METH_NOARGS,
+ "file descriptor or handle of the connection"},
+ {"close", (PyCFunction)connection_close, METH_NOARGS,
+ "close the connection"},
+
+ {NULL} /* Sentinel */
+};
+
+static PyGetSetDef connection_getset[] = {
+ {"closed", (getter)connection_closed, NULL,
+ "True if the connection is closed", NULL},
+ {"readable", (getter)connection_readable, NULL,
+ "True if the connection is readable", NULL},
+ {"writable", (getter)connection_writable, NULL,
+ "True if the connection is writable", NULL},
+ {NULL}
+};
+
+/*
+ * Connection type
+ */
+
+PyDoc_STRVAR(connection_doc,
+ "Connection type whose constructor signature is\n\n"
+ " Connection(handle, readable=True, writable=True).\n\n"
+ "The constructor does *not* duplicate the handle.");
+
+PyTypeObject CONNECTION_TYPE = {
+ PyVarObject_HEAD_INIT(NULL, 0)
+ /* tp_name */ "_multiprocessing." CONNECTION_NAME,
+ /* tp_basicsize */ sizeof(ConnectionObject),
+ /* tp_itemsize */ 0,
+ /* tp_dealloc */ (destructor)connection_dealloc,
+ /* tp_print */ 0,
+ /* tp_getattr */ 0,
+ /* tp_setattr */ 0,
+ /* tp_compare */ 0,
+ /* tp_repr */ (reprfunc)connection_repr,
+ /* tp_as_number */ 0,
+ /* tp_as_sequence */ 0,
+ /* tp_as_mapping */ 0,
+ /* tp_hash */ 0,
+ /* tp_call */ 0,
+ /* tp_str */ 0,
+ /* tp_getattro */ 0,
+ /* tp_setattro */ 0,
+ /* tp_as_buffer */ 0,
+ /* tp_flags */ Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE |
+ Py_TPFLAGS_HAVE_WEAKREFS,
+ /* tp_doc */ connection_doc,
+ /* tp_traverse */ 0,
+ /* tp_clear */ 0,
+ /* tp_richcompare */ 0,
+ /* tp_weaklistoffset */ offsetof(ConnectionObject, weakreflist),
+ /* tp_iter */ 0,
+ /* tp_iternext */ 0,
+ /* tp_methods */ connection_methods,
+ /* tp_members */ 0,
+ /* tp_getset */ connection_getset,
+ /* tp_base */ 0,
+ /* tp_dict */ 0,
+ /* tp_descr_get */ 0,
+ /* tp_descr_set */ 0,
+ /* tp_dictoffset */ 0,
+ /* tp_init */ 0,
+ /* tp_alloc */ 0,
+ /* tp_new */ connection_new,
+};
+
+#endif /* CONNECTION_H */