diff options
-rw-r--r-- | Lib/asyncio/windows_events.py | 168 | ||||
-rw-r--r-- | Modules/overlapped.c | 25 |
2 files changed, 159 insertions, 34 deletions
diff --git a/Lib/asyncio/windows_events.py b/Lib/asyncio/windows_events.py index 82d0966..5105426 100644 --- a/Lib/asyncio/windows_events.py +++ b/Lib/asyncio/windows_events.py @@ -78,20 +78,23 @@ class _OverlappedFuture(futures.Future): self._ov = None -class _WaitHandleFuture(futures.Future): +class _BaseWaitHandleFuture(futures.Future): """Subclass of Future which represents a wait handle.""" - def __init__(self, iocp, ov, handle, wait_handle, *, loop=None): + def __init__(self, ov, handle, wait_handle, *, loop=None): super().__init__(loop=loop) if self._source_traceback: del self._source_traceback[-1] - # iocp and ov are only used by cancel() to notify IocpProactor - # that the wait was cancelled - self._iocp = iocp + # Keep a reference to the Overlapped object to keep it alive until the + # wait is unregistered self._ov = ov self._handle = handle self._wait_handle = wait_handle + # Should we call UnregisterWaitEx() if the wait completes + # or is cancelled? + self._registered = True + def _poll(self): # non-blocking wait: use a timeout of 0 millisecond return (_winapi.WaitForSingleObject(self._handle, 0) == @@ -99,21 +102,32 @@ class _WaitHandleFuture(futures.Future): def _repr_info(self): info = super()._repr_info() - info.insert(1, 'handle=%#x' % self._handle) - if self._wait_handle: + info.append('handle=%#x' % self._handle) + if self._handle is not None: state = 'signaled' if self._poll() else 'waiting' - info.insert(1, 'wait_handle=<%s, %#x>' - % (state, self._wait_handle)) + info.append(state) + if self._wait_handle is not None: + info.append('wait_handle=%#x' % self._wait_handle) return info + def _unregister_wait_cb(self, fut): + # The wait was unregistered: it's not safe to destroy the Overlapped + # object + self._ov = None + def _unregister_wait(self): - if self._wait_handle is None: + if not self._registered: return + self._registered = False + try: _overlapped.UnregisterWait(self._wait_handle) except OSError as exc: - # ERROR_IO_PENDING is not an error, the wait was unregistered - if exc.winerror != _overlapped.ERROR_IO_PENDING: + self._wait_handle = None + if exc.winerror == _overlapped.ERROR_IO_PENDING: + # ERROR_IO_PENDING is not an error, the wait was unregistered + self._unregister_wait_cb(None) + elif exc.winerror != _overlapped.ERROR_IO_PENDING: context = { 'message': 'Failed to unregister the wait handle', 'exception': exc, @@ -122,26 +136,91 @@ class _WaitHandleFuture(futures.Future): if self._source_traceback: context['source_traceback'] = self._source_traceback self._loop.call_exception_handler(context) - self._wait_handle = None - self._iocp = None - self._ov = None + else: + self._wait_handle = None + self._unregister_wait_cb(None) def cancel(self): - result = super().cancel() - if self._ov is not None: - # signal the cancellation to the overlapped object - _overlapped.PostQueuedCompletionStatus(self._iocp, True, - 0, self._ov.address) self._unregister_wait() - return result + return super().cancel() def set_exception(self, exception): - super().set_exception(exception) self._unregister_wait() + super().set_exception(exception) def set_result(self, result): - super().set_result(result) self._unregister_wait() + super().set_result(result) + + +class _WaitCancelFuture(_BaseWaitHandleFuture): + """Subclass of Future which represents a wait for the cancellation of a + _WaitHandleFuture using an event. + """ + + def __init__(self, ov, event, wait_handle, *, loop=None): + super().__init__(ov, event, wait_handle, loop=loop) + + self._done_callback = None + + def _schedule_callbacks(self): + super(_WaitCancelFuture, self)._schedule_callbacks() + if self._done_callback is not None: + self._done_callback(self) + + +class _WaitHandleFuture(_BaseWaitHandleFuture): + def __init__(self, ov, handle, wait_handle, proactor, *, loop=None): + super().__init__(ov, handle, wait_handle, loop=loop) + self._proactor = proactor + self._unregister_proactor = True + self._event = _overlapped.CreateEvent(None, True, False, None) + self._event_fut = None + + def _unregister_wait_cb(self, fut): + if self._event is not None: + _winapi.CloseHandle(self._event) + self._event = None + self._event_fut = None + + # If the wait was cancelled, the wait may never be signalled, so + # it's required to unregister it. Otherwise, IocpProactor.close() will + # wait forever for an event which will never come. + # + # If the IocpProactor already received the event, it's safe to call + # _unregister() because we kept a reference to the Overlapped object + # which is used as an unique key. + self._proactor._unregister(self._ov) + self._proactor = None + + super()._unregister_wait_cb(fut) + + def _unregister_wait(self): + if not self._registered: + return + self._registered = False + + try: + _overlapped.UnregisterWaitEx(self._wait_handle, self._event) + except OSError as exc: + self._wait_handle = None + if exc.winerror == _overlapped.ERROR_IO_PENDING: + # ERROR_IO_PENDING is not an error, the wait was unregistered + self._unregister_wait_cb(None) + elif exc.winerror != _overlapped.ERROR_IO_PENDING: + context = { + 'message': 'Failed to unregister the wait handle', + 'exception': exc, + 'future': self, + } + if self._source_traceback: + context['source_traceback'] = self._source_traceback + self._loop.call_exception_handler(context) + else: + self._wait_handle = None + self._event_fut = self._proactor._wait_cancel( + self._event, + self._unregister_wait_cb) class PipeServer(object): @@ -291,6 +370,7 @@ class IocpProactor: _overlapped.INVALID_HANDLE_VALUE, NULL, 0, concurrency) self._cache = {} self._registered = weakref.WeakSet() + self._unregistered = [] self._stopped_serving = weakref.WeakSet() def __repr__(self): @@ -438,6 +518,16 @@ class IocpProactor: Return a Future object. The result of the future is True if the wait completed, or False if the wait did not complete (on timeout). """ + return self._wait_for_handle(handle, timeout, False) + + def _wait_cancel(self, event, done_callback): + fut = self._wait_for_handle(event, None, True) + # add_done_callback() cannot be used because the wait may only complete + # in IocpProactor.close(), while the event loop is not running. + fut._done_callback = done_callback + return fut + + def _wait_for_handle(self, handle, timeout, _is_cancel): if timeout is None: ms = _winapi.INFINITE else: @@ -447,9 +537,13 @@ class IocpProactor: # We only create ov so we can use ov.address as a key for the cache. ov = _overlapped.Overlapped(NULL) - wh = _overlapped.RegisterWaitWithQueue( + wait_handle = _overlapped.RegisterWaitWithQueue( handle, self._iocp, ov.address, ms) - f = _WaitHandleFuture(self._iocp, ov, handle, wh, loop=self._loop) + if _is_cancel: + f = _WaitCancelFuture(ov, handle, wait_handle, loop=self._loop) + else: + f = _WaitHandleFuture(ov, handle, wait_handle, self, + loop=self._loop) if f._source_traceback: del f._source_traceback[-1] @@ -462,14 +556,6 @@ class IocpProactor: # False even though we have not timed out. return f._poll() - if f._poll(): - try: - result = f._poll() - except OSError as exc: - f.set_exception(exc) - else: - f.set_result(result) - self._cache[ov.address] = (f, ov, 0, finish_wait_for_handle) return f @@ -521,6 +607,15 @@ class IocpProactor: self._cache[ov.address] = (f, ov, obj, callback) return f + def _unregister(self, ov): + """Unregister an overlapped object. + + Call this method when its future has been cancelled. The event can + already be signalled (pending in the proactor event queue). It is also + safe if the event is never signalled (because it was cancelled). + """ + self._unregistered.append(ov) + def _get_accept_socket(self, family): s = socket.socket(family) s.settimeout(0) @@ -541,7 +636,7 @@ class IocpProactor: while True: status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms) if status is None: - return + break ms = 0 err, transferred, key, address = status @@ -576,6 +671,11 @@ class IocpProactor: f.set_result(value) self._results.append(f) + # Remove unregisted futures + for ov in self._unregistered: + self._cache.pop(ov.address, None) + self._unregistered.clear() + def _stop_serving(self, obj): # obj is a socket or pipe handle. It will be closed in # BaseProactorEventLoop._stop_serving() which will make any diff --git a/Modules/overlapped.c b/Modules/overlapped.c index 6842efb..d22c626 100644 --- a/Modules/overlapped.c +++ b/Modules/overlapped.c @@ -309,6 +309,29 @@ overlapped_UnregisterWait(PyObject *self, PyObject *args) Py_RETURN_NONE; } +PyDoc_STRVAR( + UnregisterWaitEx_doc, + "UnregisterWaitEx(WaitHandle, Event) -> None\n\n" + "Unregister wait handle.\n"); + +static PyObject * +overlapped_UnregisterWaitEx(PyObject *self, PyObject *args) +{ + HANDLE WaitHandle, Event; + BOOL ret; + + if (!PyArg_ParseTuple(args, F_HANDLE F_HANDLE, &WaitHandle, &Event)) + return NULL; + + Py_BEGIN_ALLOW_THREADS + ret = UnregisterWaitEx(WaitHandle, Event); + Py_END_ALLOW_THREADS + + if (!ret) + return SetFromWindowsErr(0); + Py_RETURN_NONE; +} + /* * Event functions -- currently only used by tests */ @@ -1319,6 +1342,8 @@ static PyMethodDef overlapped_functions[] = { METH_VARARGS, RegisterWaitWithQueue_doc}, {"UnregisterWait", overlapped_UnregisterWait, METH_VARARGS, UnregisterWait_doc}, + {"UnregisterWaitEx", overlapped_UnregisterWaitEx, + METH_VARARGS, UnregisterWaitEx_doc}, {"CreateEvent", overlapped_CreateEvent, METH_VARARGS, CreateEvent_doc}, {"SetEvent", overlapped_SetEvent, |