summaryrefslogtreecommitdiffstats
path: root/Modules/_multiprocessing
diff options
context:
space:
mode:
authorAntoine Pitrou <solipsis@pitrou.net>2012-03-05 18:28:37 (GMT)
committerAntoine Pitrou <solipsis@pitrou.net>2012-03-05 18:28:37 (GMT)
commitbdb1cf1ca56db25b33fb15dd91eef2cc32cd8973 (patch)
tree54137f9699833726def7c803cff7c995af22cfa5 /Modules/_multiprocessing
parent1e88f3faa61dbaa9ea0d2404aa8563c1eeceba54 (diff)
downloadcpython-bdb1cf1ca56db25b33fb15dd91eef2cc32cd8973.zip
cpython-bdb1cf1ca56db25b33fb15dd91eef2cc32cd8973.tar.gz
cpython-bdb1cf1ca56db25b33fb15dd91eef2cc32cd8973.tar.bz2
Issue #12328: Fix multiprocessing's use of overlapped I/O on Windows.
Also, add a multiprocessing.connection.wait(rlist, timeout=None) function for polling multiple objects at once. Patch by sbt. Complete changelist from sbt's patch: * Adds a wait(rlist, timeout=None) function for polling multiple objects at once. On Unix this is just a wrapper for select(rlist, [], [], timeout=None). * Removes use of the SentinelReady exception and the sentinels argument to certain methods. concurrent.futures.process has been changed to use wait() instead of SentinelReady. * Fixes bugs concerning PipeConnection.poll() and messages of zero length. * Fixes PipeListener.accept() to call ConnectNamedPipe() with overlapped=True. * Fixes Queue.empty() and SimpleQueue.empty() so that they are threadsafe on Windows. * Now PipeConnection.poll() and wait() will not modify the pipe except possibly by consuming a zero length message. (Previously poll() could consume a partial message.) * All of multiprocesing's pipe related blocking functions/methods are now interruptible by SIGINT on Windows.
Diffstat (limited to 'Modules/_multiprocessing')
-rw-r--r--Modules/_multiprocessing/win32_functions.c69
1 files changed, 42 insertions, 27 deletions
diff --git a/Modules/_multiprocessing/win32_functions.c b/Modules/_multiprocessing/win32_functions.c
index 15aeeda..93c8fc9 100644
--- a/Modules/_multiprocessing/win32_functions.c
+++ b/Modules/_multiprocessing/win32_functions.c
@@ -60,16 +60,18 @@ typedef struct {
static void
overlapped_dealloc(OverlappedObject *self)
{
+ DWORD bytes;
int err = GetLastError();
if (self->pending) {
- if (check_CancelIoEx())
- Py_CancelIoEx(self->handle, &self->overlapped);
- else {
- PyErr_SetString(PyExc_RuntimeError,
- "I/O operations still in flight while destroying "
- "Overlapped object, the process may crash");
- PyErr_WriteUnraisable(NULL);
- }
+ /* make it a programming error to deallocate while operation
+ is pending, even if we can safely cancel it */
+ if (check_CancelIoEx() &&
+ Py_CancelIoEx(self->handle, &self->overlapped))
+ GetOverlappedResult(self->handle, &self->overlapped, &bytes, TRUE);
+ PyErr_SetString(PyExc_RuntimeError,
+ "I/O operations still in flight while destroying "
+ "Overlapped object, the process may crash");
+ PyErr_WriteUnraisable(NULL);
}
CloseHandle(self->overlapped.hEvent);
SetLastError(err);
@@ -85,6 +87,7 @@ overlapped_GetOverlappedResult(OverlappedObject *self, PyObject *waitobj)
int wait;
BOOL res;
DWORD transferred = 0;
+ DWORD err;
wait = PyObject_IsTrue(waitobj);
if (wait < 0)
@@ -94,23 +97,27 @@ overlapped_GetOverlappedResult(OverlappedObject *self, PyObject *waitobj)
wait != 0);
Py_END_ALLOW_THREADS
- if (!res) {
- int err = GetLastError();
- if (err == ERROR_IO_INCOMPLETE)
- Py_RETURN_NONE;
- if (err != ERROR_MORE_DATA) {
+ err = res ? ERROR_SUCCESS : GetLastError();
+ switch (err) {
+ case ERROR_SUCCESS:
+ case ERROR_MORE_DATA:
+ case ERROR_OPERATION_ABORTED:
+ self->completed = 1;
+ self->pending = 0;
+ break;
+ case ERROR_IO_INCOMPLETE:
+ break;
+ default:
self->pending = 0;
return PyErr_SetExcFromWindowsErr(PyExc_IOError, err);
- }
}
- self->pending = 0;
- self->completed = 1;
- if (self->read_buffer) {
+ if (self->completed && self->read_buffer != NULL) {
assert(PyBytes_CheckExact(self->read_buffer));
- if (_PyBytes_Resize(&self->read_buffer, transferred))
+ if (transferred != PyBytes_GET_SIZE(self->read_buffer) &&
+ _PyBytes_Resize(&self->read_buffer, transferred))
return NULL;
}
- return Py_BuildValue("lN", (long) transferred, PyBool_FromLong(res));
+ return Py_BuildValue("II", (unsigned) transferred, (unsigned) err);
}
static PyObject *
@@ -522,9 +529,10 @@ win32_WriteFile(PyObject *self, PyObject *args, PyObject *kwds)
HANDLE handle;
Py_buffer _buf, *buf;
PyObject *bufobj;
- int written;
+ DWORD written;
BOOL ret;
int use_overlapped = 0;
+ DWORD err;
OverlappedObject *overlapped = NULL;
static char *kwlist[] = {"handle", "buffer", "overlapped", NULL};
@@ -553,8 +561,9 @@ win32_WriteFile(PyObject *self, PyObject *args, PyObject *kwds)
overlapped ? &overlapped->overlapped : NULL);
Py_END_ALLOW_THREADS
+ err = ret ? 0 : GetLastError();
+
if (overlapped) {
- int err = GetLastError();
if (!ret) {
if (err == ERROR_IO_PENDING)
overlapped->pending = 1;
@@ -563,13 +572,13 @@ win32_WriteFile(PyObject *self, PyObject *args, PyObject *kwds)
return PyErr_SetExcFromWindowsErr(PyExc_IOError, 0);
}
}
- return (PyObject *) overlapped;
+ return Py_BuildValue("NI", (PyObject *) overlapped, err);
}
PyBuffer_Release(buf);
if (!ret)
return PyErr_SetExcFromWindowsErr(PyExc_IOError, 0);
- return PyLong_FromLong(written);
+ return Py_BuildValue("II", written, err);
}
static PyObject *
@@ -581,6 +590,7 @@ win32_ReadFile(PyObject *self, PyObject *args, PyObject *kwds)
PyObject *buf;
BOOL ret;
int use_overlapped = 0;
+ DWORD err;
OverlappedObject *overlapped = NULL;
static char *kwlist[] = {"handle", "size", "overlapped", NULL};
@@ -607,8 +617,9 @@ win32_ReadFile(PyObject *self, PyObject *args, PyObject *kwds)
overlapped ? &overlapped->overlapped : NULL);
Py_END_ALLOW_THREADS
+ err = ret ? 0 : GetLastError();
+
if (overlapped) {
- int err = GetLastError();
if (!ret) {
if (err == ERROR_IO_PENDING)
overlapped->pending = 1;
@@ -617,16 +628,16 @@ win32_ReadFile(PyObject *self, PyObject *args, PyObject *kwds)
return PyErr_SetExcFromWindowsErr(PyExc_IOError, 0);
}
}
- return (PyObject *) overlapped;
+ return Py_BuildValue("NI", (PyObject *) overlapped, err);
}
- if (!ret && GetLastError() != ERROR_MORE_DATA) {
+ if (!ret && err != ERROR_MORE_DATA) {
Py_DECREF(buf);
return PyErr_SetExcFromWindowsErr(PyExc_IOError, 0);
}
if (_PyBytes_Resize(&buf, nread))
return NULL;
- return Py_BuildValue("NN", buf, PyBool_FromLong(ret));
+ return Py_BuildValue("NI", buf, err);
}
static PyObject *
@@ -783,7 +794,11 @@ create_win32_namespace(void)
WIN32_CONSTANT(F_DWORD, ERROR_ALREADY_EXISTS);
WIN32_CONSTANT(F_DWORD, ERROR_BROKEN_PIPE);
+ WIN32_CONSTANT(F_DWORD, ERROR_IO_PENDING);
+ WIN32_CONSTANT(F_DWORD, ERROR_MORE_DATA);
+ WIN32_CONSTANT(F_DWORD, ERROR_NETNAME_DELETED);
WIN32_CONSTANT(F_DWORD, ERROR_NO_SYSTEM_RESOURCES);
+ WIN32_CONSTANT(F_DWORD, ERROR_OPERATION_ABORTED);
WIN32_CONSTANT(F_DWORD, ERROR_PIPE_BUSY);
WIN32_CONSTANT(F_DWORD, ERROR_PIPE_CONNECTED);
WIN32_CONSTANT(F_DWORD, ERROR_SEM_TIMEOUT);