summaryrefslogtreecommitdiffstats
path: root/Modules/_multiprocessing/pipe_connection.c
diff options
context:
space:
mode:
Diffstat (limited to 'Modules/_multiprocessing/pipe_connection.c')
-rw-r--r--Modules/_multiprocessing/pipe_connection.c136
1 files changed, 136 insertions, 0 deletions
diff --git a/Modules/_multiprocessing/pipe_connection.c b/Modules/_multiprocessing/pipe_connection.c
new file mode 100644
index 0000000..a96338f
--- /dev/null
+++ b/Modules/_multiprocessing/pipe_connection.c
@@ -0,0 +1,136 @@
+/*
+ * A type which wraps a pipe handle in message oriented mode
+ *
+ * pipe_connection.c
+ *
+ * Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
+ */
+
+#include "multiprocessing.h"
+
+#define CLOSE(h) CloseHandle(h)
+
+/*
+ * Send string to the pipe; assumes in message oriented mode
+ */
+
+static Py_ssize_t
+conn_send_string(ConnectionObject *conn, char *string, size_t length)
+{
+ DWORD amount_written;
+
+ return WriteFile(conn->handle, string, length, &amount_written, NULL)
+ ? MP_SUCCESS : MP_STANDARD_ERROR;
+}
+
+/*
+ * Attempts to read into buffer, or if buffer too small into *newbuffer.
+ *
+ * Returns number of bytes read. Assumes in message oriented mode.
+ */
+
+static Py_ssize_t
+conn_recv_string(ConnectionObject *conn, char *buffer,
+ size_t buflength, char **newbuffer, size_t maxlength)
+{
+ DWORD left, length, full_length, err;
+
+ *newbuffer = NULL;
+
+ if (ReadFile(conn->handle, buffer, MIN(buflength, maxlength),
+ &length, NULL))
+ return length;
+
+ err = GetLastError();
+ if (err != ERROR_MORE_DATA) {
+ if (err == ERROR_BROKEN_PIPE)
+ return MP_END_OF_FILE;
+ return MP_STANDARD_ERROR;
+ }
+
+ if (!PeekNamedPipe(conn->handle, NULL, 0, NULL, NULL, &left))
+ return MP_STANDARD_ERROR;
+
+ full_length = length + left;
+ if (full_length > maxlength)
+ return MP_BAD_MESSAGE_LENGTH;
+
+ *newbuffer = PyMem_Malloc(full_length);
+ if (*newbuffer == NULL)
+ return MP_MEMORY_ERROR;
+
+ memcpy(*newbuffer, buffer, length);
+
+ if (ReadFile(conn->handle, *newbuffer+length, left, &length, NULL)) {
+ assert(length == left);
+ return full_length;
+ } else {
+ PyMem_Free(*newbuffer);
+ return MP_STANDARD_ERROR;
+ }
+}
+
+/*
+ * Check whether any data is available for reading
+ */
+
+#define conn_poll(conn, timeout) conn_poll_save(conn, timeout, _save)
+
+static int
+conn_poll_save(ConnectionObject *conn, double timeout, PyThreadState *_save)
+{
+ DWORD bytes, deadline, delay;
+ int difference, res;
+ BOOL block = FALSE;
+
+ if (!PeekNamedPipe(conn->handle, NULL, 0, NULL, &bytes, NULL))
+ return MP_STANDARD_ERROR;
+
+ if (timeout == 0.0)
+ return bytes > 0;
+
+ if (timeout < 0.0)
+ block = TRUE;
+ else
+ /* XXX does not check for overflow */
+ deadline = GetTickCount() + (DWORD)(1000 * timeout + 0.5);
+
+ Sleep(0);
+
+ for (delay = 1 ; ; delay += 1) {
+ if (!PeekNamedPipe(conn->handle, NULL, 0, NULL, &bytes, NULL))
+ return MP_STANDARD_ERROR;
+ else if (bytes > 0)
+ return TRUE;
+
+ if (!block) {
+ difference = deadline - GetTickCount();
+ if (difference < 0)
+ return FALSE;
+ if ((int)delay > difference)
+ delay = difference;
+ }
+
+ if (delay > 20)
+ delay = 20;
+
+ Sleep(delay);
+
+ /* check for signals */
+ Py_BLOCK_THREADS
+ res = PyErr_CheckSignals();
+ Py_UNBLOCK_THREADS
+
+ if (res)
+ return MP_EXCEPTION_HAS_BEEN_SET;
+ }
+}
+
+/*
+ * "connection.h" defines the PipeConnection type using the definitions above
+ */
+
+#define CONNECTION_NAME "PipeConnection"
+#define CONNECTION_TYPE PipeConnectionType
+
+#include "connection.h"