diff options
author | Antoine Pitrou <solipsis@pitrou.net> | 2012-03-05 18:28:37 (GMT) |
---|---|---|
committer | Antoine Pitrou <solipsis@pitrou.net> | 2012-03-05 18:28:37 (GMT) |
commit | bdb1cf1ca56db25b33fb15dd91eef2cc32cd8973 (patch) | |
tree | 54137f9699833726def7c803cff7c995af22cfa5 /Modules/_multiprocessing | |
parent | 1e88f3faa61dbaa9ea0d2404aa8563c1eeceba54 (diff) | |
download | cpython-bdb1cf1ca56db25b33fb15dd91eef2cc32cd8973.zip cpython-bdb1cf1ca56db25b33fb15dd91eef2cc32cd8973.tar.gz cpython-bdb1cf1ca56db25b33fb15dd91eef2cc32cd8973.tar.bz2 |
Issue #12328: Fix multiprocessing's use of overlapped I/O on Windows.
Also, add a multiprocessing.connection.wait(rlist, timeout=None) function
for polling multiple objects at once. Patch by sbt.
Complete changelist from sbt's patch:
* Adds a wait(rlist, timeout=None) function for polling multiple
objects at once. On Unix this is just a wrapper for
select(rlist, [], [], timeout=None).
* Removes use of the SentinelReady exception and the sentinels argument
to certain methods. concurrent.futures.process has been changed to
use wait() instead of SentinelReady.
* Fixes bugs concerning PipeConnection.poll() and messages of zero
length.
* Fixes PipeListener.accept() to call ConnectNamedPipe() with
overlapped=True.
* Fixes Queue.empty() and SimpleQueue.empty() so that they are
threadsafe on Windows.
* Now PipeConnection.poll() and wait() will not modify the pipe except
possibly by consuming a zero length message. (Previously poll()
could consume a partial message.)
* All of multiprocesing's pipe related blocking functions/methods are
now interruptible by SIGINT on Windows.
Diffstat (limited to 'Modules/_multiprocessing')
-rw-r--r-- | Modules/_multiprocessing/win32_functions.c | 69 |
1 files changed, 42 insertions, 27 deletions
diff --git a/Modules/_multiprocessing/win32_functions.c b/Modules/_multiprocessing/win32_functions.c index 15aeeda..93c8fc9 100644 --- a/Modules/_multiprocessing/win32_functions.c +++ b/Modules/_multiprocessing/win32_functions.c @@ -60,16 +60,18 @@ typedef struct { static void overlapped_dealloc(OverlappedObject *self) { + DWORD bytes; int err = GetLastError(); if (self->pending) { - if (check_CancelIoEx()) - Py_CancelIoEx(self->handle, &self->overlapped); - else { - PyErr_SetString(PyExc_RuntimeError, - "I/O operations still in flight while destroying " - "Overlapped object, the process may crash"); - PyErr_WriteUnraisable(NULL); - } + /* make it a programming error to deallocate while operation + is pending, even if we can safely cancel it */ + if (check_CancelIoEx() && + Py_CancelIoEx(self->handle, &self->overlapped)) + GetOverlappedResult(self->handle, &self->overlapped, &bytes, TRUE); + PyErr_SetString(PyExc_RuntimeError, + "I/O operations still in flight while destroying " + "Overlapped object, the process may crash"); + PyErr_WriteUnraisable(NULL); } CloseHandle(self->overlapped.hEvent); SetLastError(err); @@ -85,6 +87,7 @@ overlapped_GetOverlappedResult(OverlappedObject *self, PyObject *waitobj) int wait; BOOL res; DWORD transferred = 0; + DWORD err; wait = PyObject_IsTrue(waitobj); if (wait < 0) @@ -94,23 +97,27 @@ overlapped_GetOverlappedResult(OverlappedObject *self, PyObject *waitobj) wait != 0); Py_END_ALLOW_THREADS - if (!res) { - int err = GetLastError(); - if (err == ERROR_IO_INCOMPLETE) - Py_RETURN_NONE; - if (err != ERROR_MORE_DATA) { + err = res ? ERROR_SUCCESS : GetLastError(); + switch (err) { + case ERROR_SUCCESS: + case ERROR_MORE_DATA: + case ERROR_OPERATION_ABORTED: + self->completed = 1; + self->pending = 0; + break; + case ERROR_IO_INCOMPLETE: + break; + default: self->pending = 0; return PyErr_SetExcFromWindowsErr(PyExc_IOError, err); - } } - self->pending = 0; - self->completed = 1; - if (self->read_buffer) { + if (self->completed && self->read_buffer != NULL) { assert(PyBytes_CheckExact(self->read_buffer)); - if (_PyBytes_Resize(&self->read_buffer, transferred)) + if (transferred != PyBytes_GET_SIZE(self->read_buffer) && + _PyBytes_Resize(&self->read_buffer, transferred)) return NULL; } - return Py_BuildValue("lN", (long) transferred, PyBool_FromLong(res)); + return Py_BuildValue("II", (unsigned) transferred, (unsigned) err); } static PyObject * @@ -522,9 +529,10 @@ win32_WriteFile(PyObject *self, PyObject *args, PyObject *kwds) HANDLE handle; Py_buffer _buf, *buf; PyObject *bufobj; - int written; + DWORD written; BOOL ret; int use_overlapped = 0; + DWORD err; OverlappedObject *overlapped = NULL; static char *kwlist[] = {"handle", "buffer", "overlapped", NULL}; @@ -553,8 +561,9 @@ win32_WriteFile(PyObject *self, PyObject *args, PyObject *kwds) overlapped ? &overlapped->overlapped : NULL); Py_END_ALLOW_THREADS + err = ret ? 0 : GetLastError(); + if (overlapped) { - int err = GetLastError(); if (!ret) { if (err == ERROR_IO_PENDING) overlapped->pending = 1; @@ -563,13 +572,13 @@ win32_WriteFile(PyObject *self, PyObject *args, PyObject *kwds) return PyErr_SetExcFromWindowsErr(PyExc_IOError, 0); } } - return (PyObject *) overlapped; + return Py_BuildValue("NI", (PyObject *) overlapped, err); } PyBuffer_Release(buf); if (!ret) return PyErr_SetExcFromWindowsErr(PyExc_IOError, 0); - return PyLong_FromLong(written); + return Py_BuildValue("II", written, err); } static PyObject * @@ -581,6 +590,7 @@ win32_ReadFile(PyObject *self, PyObject *args, PyObject *kwds) PyObject *buf; BOOL ret; int use_overlapped = 0; + DWORD err; OverlappedObject *overlapped = NULL; static char *kwlist[] = {"handle", "size", "overlapped", NULL}; @@ -607,8 +617,9 @@ win32_ReadFile(PyObject *self, PyObject *args, PyObject *kwds) overlapped ? &overlapped->overlapped : NULL); Py_END_ALLOW_THREADS + err = ret ? 0 : GetLastError(); + if (overlapped) { - int err = GetLastError(); if (!ret) { if (err == ERROR_IO_PENDING) overlapped->pending = 1; @@ -617,16 +628,16 @@ win32_ReadFile(PyObject *self, PyObject *args, PyObject *kwds) return PyErr_SetExcFromWindowsErr(PyExc_IOError, 0); } } - return (PyObject *) overlapped; + return Py_BuildValue("NI", (PyObject *) overlapped, err); } - if (!ret && GetLastError() != ERROR_MORE_DATA) { + if (!ret && err != ERROR_MORE_DATA) { Py_DECREF(buf); return PyErr_SetExcFromWindowsErr(PyExc_IOError, 0); } if (_PyBytes_Resize(&buf, nread)) return NULL; - return Py_BuildValue("NN", buf, PyBool_FromLong(ret)); + return Py_BuildValue("NI", buf, err); } static PyObject * @@ -783,7 +794,11 @@ create_win32_namespace(void) WIN32_CONSTANT(F_DWORD, ERROR_ALREADY_EXISTS); WIN32_CONSTANT(F_DWORD, ERROR_BROKEN_PIPE); + WIN32_CONSTANT(F_DWORD, ERROR_IO_PENDING); + WIN32_CONSTANT(F_DWORD, ERROR_MORE_DATA); + WIN32_CONSTANT(F_DWORD, ERROR_NETNAME_DELETED); WIN32_CONSTANT(F_DWORD, ERROR_NO_SYSTEM_RESOURCES); + WIN32_CONSTANT(F_DWORD, ERROR_OPERATION_ABORTED); WIN32_CONSTANT(F_DWORD, ERROR_PIPE_BUSY); WIN32_CONSTANT(F_DWORD, ERROR_PIPE_CONNECTED); WIN32_CONSTANT(F_DWORD, ERROR_SEM_TIMEOUT); |