summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVictor Stinner <victor.stinner@gmail.com>2015-01-22 21:55:08 (GMT)
committerVictor Stinner <victor.stinner@gmail.com>2015-01-22 21:55:08 (GMT)
commit7ffa2c5fdda8a9cc254edf67c4458b15db1252fa (patch)
tree1a9d320b24c440a94d5e48b2d0d456b5731e059f
parent752aba7f999b08c833979464a36840de8be0baf0 (diff)
downloadcpython-7ffa2c5fdda8a9cc254edf67c4458b15db1252fa.zip
cpython-7ffa2c5fdda8a9cc254edf67c4458b15db1252fa.tar.gz
cpython-7ffa2c5fdda8a9cc254edf67c4458b15db1252fa.tar.bz2
Issue #23293, asyncio: Rewrite IocpProactor.connect_pipe()
Add _overlapped.ConnectPipe() which tries to connect to the pipe for asynchronous I/O (overlapped): call CreateFile() in a loop until it doesn't fail with ERROR_PIPE_BUSY. Use an increasing delay between 1 ms and 100 ms. Remove Overlapped.WaitNamedPipeAndConnect() which is no more used.
-rw-r--r--Lib/asyncio/windows_events.py43
-rw-r--r--Modules/overlapped.c115
2 files changed, 48 insertions, 110 deletions
diff --git a/Lib/asyncio/windows_events.py b/Lib/asyncio/windows_events.py
index 315455a..7d0dbe9 100644
--- a/Lib/asyncio/windows_events.py
+++ b/Lib/asyncio/windows_events.py
@@ -29,6 +29,12 @@ INFINITE = 0xffffffff
ERROR_CONNECTION_REFUSED = 1225
ERROR_CONNECTION_ABORTED = 1236
+# Initial delay in seconds for connect_pipe() before retrying to connect
+CONNECT_PIPE_INIT_DELAY = 0.001
+
+# Maximum delay in seconds for connect_pipe() before retrying to connect
+CONNECT_PIPE_MAX_DELAY = 0.100
+
class _OverlappedFuture(futures.Future):
"""Subclass of Future which represents an overlapped operation.
@@ -495,25 +501,28 @@ class IocpProactor:
return self._register(ov, pipe, finish_accept_pipe,
register=False)
- def connect_pipe(self, address):
- ov = _overlapped.Overlapped(NULL)
- ov.WaitNamedPipeAndConnect(address, self._iocp, ov.address)
-
- def finish_connect_pipe(err, handle, ov):
- # err, handle were arguments passed to PostQueuedCompletionStatus()
- # in a function run in a thread pool.
- if err == _overlapped.ERROR_SEM_TIMEOUT:
- # Connection did not succeed within time limit.
- msg = _overlapped.FormatMessage(err)
- raise ConnectionRefusedError(0, msg, None, err)
- elif err != 0:
- msg = _overlapped.FormatMessage(err)
- raise OSError(0, msg, None, err)
+ def _connect_pipe(self, fut, address, delay):
+ # Unfortunately there is no way to do an overlapped connect to a pipe.
+ # Call CreateFile() in a loop until it doesn't fail with
+ # ERROR_PIPE_BUSY
+ try:
+ handle = _overlapped.ConnectPipe(address)
+ except OSError as exc:
+ if exc.winerror == _overlapped.ERROR_PIPE_BUSY:
+ # Polling: retry later
+ delay = min(delay * 2, CONNECT_PIPE_MAX_DELAY)
+ self._loop.call_later(delay,
+ self._connect_pipe, fut, address, delay)
else:
- return windows_utils.PipeHandle(handle)
+ fut.set_exception(exc)
+ else:
+ pipe = windows_utils.PipeHandle(handle)
+ fut.set_result(pipe)
- return self._register(ov, None, finish_connect_pipe,
- wait_for_post=True)
+ def connect_pipe(self, address):
+ fut = futures.Future(loop=self._loop)
+ self._connect_pipe(fut, address, CONNECT_PIPE_INIT_DELAY)
+ return fut
def wait_for_handle(self, handle, timeout=None):
"""Wait for a handle.
diff --git a/Modules/overlapped.c b/Modules/overlapped.c
index d22c626..8fe2e24 100644
--- a/Modules/overlapped.c
+++ b/Modules/overlapped.c
@@ -52,12 +52,6 @@ typedef struct {
};
} OverlappedObject;
-typedef struct {
- OVERLAPPED *Overlapped;
- HANDLE IocpHandle;
- char Address[1];
-} WaitNamedPipeAndConnectContext;
-
/*
* Map Windows error codes to subclasses of OSError
*/
@@ -1133,99 +1127,33 @@ Overlapped_ConnectNamedPipe(OverlappedObject *self, PyObject *args)
}
}
-/* Unfortunately there is no way to do an overlapped connect to a
- pipe. We instead use WaitNamedPipe() and CreateFile() in a thread
- pool thread. If a connection succeeds within a time limit (10
- seconds) then PostQueuedCompletionStatus() is used to return the
- pipe handle to the completion port. */
-
-static DWORD WINAPI
-WaitNamedPipeAndConnectInThread(WaitNamedPipeAndConnectContext *ctx)
-{
- HANDLE PipeHandle = INVALID_HANDLE_VALUE;
- DWORD Start = GetTickCount();
- DWORD Deadline = Start + 10*1000;
- DWORD Error = 0;
- DWORD Timeout;
- BOOL Success;
-
- for ( ; ; ) {
- Timeout = Deadline - GetTickCount();
- if ((int)Timeout < 0)
- break;
- Success = WaitNamedPipe(ctx->Address, Timeout);
- Error = Success ? ERROR_SUCCESS : GetLastError();
- switch (Error) {
- case ERROR_SUCCESS:
- PipeHandle = CreateFile(ctx->Address,
- GENERIC_READ | GENERIC_WRITE,
- 0, NULL, OPEN_EXISTING,
- FILE_FLAG_OVERLAPPED, NULL);
- if (PipeHandle == INVALID_HANDLE_VALUE)
- continue;
- break;
- case ERROR_SEM_TIMEOUT:
- continue;
- }
- break;
- }
- if (!PostQueuedCompletionStatus(ctx->IocpHandle, Error,
- (ULONG_PTR)PipeHandle, ctx->Overlapped))
- CloseHandle(PipeHandle);
- free(ctx);
- return 0;
-}
-
PyDoc_STRVAR(
- Overlapped_WaitNamedPipeAndConnect_doc,
- "WaitNamedPipeAndConnect(addr, iocp_handle) -> Overlapped[pipe_handle]\n\n"
- "Start overlapped connection to address, notifying iocp_handle when\n"
- "finished");
+ ConnectPipe_doc,
+ "ConnectPipe(addr) -> pipe_handle\n\n"
+ "Connect to the pipe for asynchronous I/O (overlapped).");
static PyObject *
-Overlapped_WaitNamedPipeAndConnect(OverlappedObject *self, PyObject *args)
+ConnectPipe(OverlappedObject *self, PyObject *args)
{
- char *Address;
- Py_ssize_t AddressLength;
- HANDLE IocpHandle;
- OVERLAPPED Overlapped;
- BOOL ret;
- DWORD err;
- WaitNamedPipeAndConnectContext *ctx;
- Py_ssize_t ContextLength;
+ PyObject *AddressObj;
+ wchar_t *Address;
+ HANDLE PipeHandle;
- if (!PyArg_ParseTuple(args, "s#" F_HANDLE F_POINTER,
- &Address, &AddressLength, &IocpHandle, &Overlapped))
+ if (!PyArg_ParseTuple(args, "U", &AddressObj))
return NULL;
- if (self->type != TYPE_NONE) {
- PyErr_SetString(PyExc_ValueError, "operation already attempted");
+ Address = PyUnicode_AsWideCharString(AddressObj, NULL);
+ if (Address == NULL)
return NULL;
- }
- ContextLength = (AddressLength +
- offsetof(WaitNamedPipeAndConnectContext, Address));
- ctx = calloc(1, ContextLength + 1);
- if (ctx == NULL)
- return PyErr_NoMemory();
- memcpy(ctx->Address, Address, AddressLength + 1);
- ctx->Overlapped = &self->overlapped;
- ctx->IocpHandle = IocpHandle;
-
- self->type = TYPE_WAIT_NAMED_PIPE_AND_CONNECT;
- self->handle = NULL;
-
- Py_BEGIN_ALLOW_THREADS
- ret = QueueUserWorkItem(WaitNamedPipeAndConnectInThread, ctx,
- WT_EXECUTELONGFUNCTION);
- Py_END_ALLOW_THREADS
-
- mark_as_completed(&self->overlapped);
-
- self->error = err = ret ? ERROR_SUCCESS : GetLastError();
- if (!ret)
- return SetFromWindowsErr(err);
- Py_RETURN_NONE;
+ PipeHandle = CreateFileW(Address,
+ GENERIC_READ | GENERIC_WRITE,
+ 0, NULL, OPEN_EXISTING,
+ FILE_FLAG_OVERLAPPED, NULL);
+ PyMem_Free(Address);
+ if (PipeHandle == INVALID_HANDLE_VALUE)
+ return SetFromWindowsErr(0);
+ return Py_BuildValue(F_HANDLE, PipeHandle);
}
static PyObject*
@@ -1262,9 +1190,6 @@ static PyMethodDef Overlapped_methods[] = {
METH_VARARGS, Overlapped_DisconnectEx_doc},
{"ConnectNamedPipe", (PyCFunction) Overlapped_ConnectNamedPipe,
METH_VARARGS, Overlapped_ConnectNamedPipe_doc},
- {"WaitNamedPipeAndConnect",
- (PyCFunction) Overlapped_WaitNamedPipeAndConnect,
- METH_VARARGS, Overlapped_WaitNamedPipeAndConnect_doc},
{NULL}
};
@@ -1350,6 +1275,9 @@ static PyMethodDef overlapped_functions[] = {
METH_VARARGS, SetEvent_doc},
{"ResetEvent", overlapped_ResetEvent,
METH_VARARGS, ResetEvent_doc},
+ {"ConnectPipe",
+ (PyCFunction) ConnectPipe,
+ METH_VARARGS, ConnectPipe_doc},
{NULL}
};
@@ -1394,6 +1322,7 @@ PyInit__overlapped(void)
WINAPI_CONSTANT(F_DWORD, ERROR_IO_PENDING);
WINAPI_CONSTANT(F_DWORD, ERROR_NETNAME_DELETED);
WINAPI_CONSTANT(F_DWORD, ERROR_SEM_TIMEOUT);
+ WINAPI_CONSTANT(F_DWORD, ERROR_PIPE_BUSY);
WINAPI_CONSTANT(F_DWORD, INFINITE);
WINAPI_CONSTANT(F_HANDLE, INVALID_HANDLE_VALUE);
WINAPI_CONSTANT(F_HANDLE, NULL);