diff options
author | Victor Stinner <victor.stinner@gmail.com> | 2015-01-21 22:39:51 (GMT) |
---|---|---|
committer | Victor Stinner <victor.stinner@gmail.com> | 2015-01-21 22:39:51 (GMT) |
commit | d0a28dee78d099fcadc71147cba4affb6efa0c97 (patch) | |
tree | dbfccff2f3a3727e942ca6518398efc236f2d83e /Lib/asyncio | |
parent | 442b0adccda014e81950a38b1a56e8f06131767a (diff) | |
download | cpython-d0a28dee78d099fcadc71147cba4affb6efa0c97.zip cpython-d0a28dee78d099fcadc71147cba4affb6efa0c97.tar.gz cpython-d0a28dee78d099fcadc71147cba4affb6efa0c97.tar.bz2 |
Issue #23095, asyncio: Rewrite _WaitHandleFuture.cancel()
This change fixes a race conditon related to _WaitHandleFuture.cancel() leading
to Python crash or "GetQueuedCompletionStatus() returned an unexpected event"
logs. Before, the overlapped object was destroyed too early, it was possible
that the wait completed whereas the overlapped object was already destroyed.
Sometimes, a different overlapped was allocated at the same address, leading to
unexpected completition.
_WaitHandleFuture.cancel() now waits until the wait is cancelled to clear its
reference to the overlapped object. To wait until the cancellation is done,
UnregisterWaitEx() is used with an event instead of UnregisterWait().
To wait for this event, a new _WaitCancelFuture class was added. It's a
simplified version of _WaitCancelFuture. For example, its cancel() method calls
UnregisterWait(), not UnregisterWaitEx(). _WaitCancelFuture should not be
cancelled.
The overlapped object is kept alive in _WaitHandleFuture until the wait is
unregistered.
Other changes:
* Add _overlapped.UnregisterWaitEx()
* Remove fast-path in IocpProactor.wait_for_handle() to immediatly set the
result if the wait already completed. I'm not sure that it's safe to
call immediatly UnregisterWaitEx() before the completion was signaled.
* Add IocpProactor._unregistered() to forget an overlapped which may never be
signaled, but may be signaled for the next loop iteration. It avoids to
block forever IocpProactor.close() if a wait was cancelled, and it may also
avoid some "... unexpected event ..." warnings.
Diffstat (limited to 'Lib/asyncio')
-rw-r--r-- | Lib/asyncio/windows_events.py | 168 |
1 files changed, 134 insertions, 34 deletions
diff --git a/Lib/asyncio/windows_events.py b/Lib/asyncio/windows_events.py index 82d0966..5105426 100644 --- a/Lib/asyncio/windows_events.py +++ b/Lib/asyncio/windows_events.py @@ -78,20 +78,23 @@ class _OverlappedFuture(futures.Future): self._ov = None -class _WaitHandleFuture(futures.Future): +class _BaseWaitHandleFuture(futures.Future): """Subclass of Future which represents a wait handle.""" - def __init__(self, iocp, ov, handle, wait_handle, *, loop=None): + def __init__(self, ov, handle, wait_handle, *, loop=None): super().__init__(loop=loop) if self._source_traceback: del self._source_traceback[-1] - # iocp and ov are only used by cancel() to notify IocpProactor - # that the wait was cancelled - self._iocp = iocp + # Keep a reference to the Overlapped object to keep it alive until the + # wait is unregistered self._ov = ov self._handle = handle self._wait_handle = wait_handle + # Should we call UnregisterWaitEx() if the wait completes + # or is cancelled? + self._registered = True + def _poll(self): # non-blocking wait: use a timeout of 0 millisecond return (_winapi.WaitForSingleObject(self._handle, 0) == @@ -99,21 +102,32 @@ class _WaitHandleFuture(futures.Future): def _repr_info(self): info = super()._repr_info() - info.insert(1, 'handle=%#x' % self._handle) - if self._wait_handle: + info.append('handle=%#x' % self._handle) + if self._handle is not None: state = 'signaled' if self._poll() else 'waiting' - info.insert(1, 'wait_handle=<%s, %#x>' - % (state, self._wait_handle)) + info.append(state) + if self._wait_handle is not None: + info.append('wait_handle=%#x' % self._wait_handle) return info + def _unregister_wait_cb(self, fut): + # The wait was unregistered: it's not safe to destroy the Overlapped + # object + self._ov = None + def _unregister_wait(self): - if self._wait_handle is None: + if not self._registered: return + self._registered = False + try: _overlapped.UnregisterWait(self._wait_handle) except OSError as exc: - # ERROR_IO_PENDING is not an error, the wait was unregistered - if exc.winerror != _overlapped.ERROR_IO_PENDING: + self._wait_handle = None + if exc.winerror == _overlapped.ERROR_IO_PENDING: + # ERROR_IO_PENDING is not an error, the wait was unregistered + self._unregister_wait_cb(None) + elif exc.winerror != _overlapped.ERROR_IO_PENDING: context = { 'message': 'Failed to unregister the wait handle', 'exception': exc, @@ -122,26 +136,91 @@ class _WaitHandleFuture(futures.Future): if self._source_traceback: context['source_traceback'] = self._source_traceback self._loop.call_exception_handler(context) - self._wait_handle = None - self._iocp = None - self._ov = None + else: + self._wait_handle = None + self._unregister_wait_cb(None) def cancel(self): - result = super().cancel() - if self._ov is not None: - # signal the cancellation to the overlapped object - _overlapped.PostQueuedCompletionStatus(self._iocp, True, - 0, self._ov.address) self._unregister_wait() - return result + return super().cancel() def set_exception(self, exception): - super().set_exception(exception) self._unregister_wait() + super().set_exception(exception) def set_result(self, result): - super().set_result(result) self._unregister_wait() + super().set_result(result) + + +class _WaitCancelFuture(_BaseWaitHandleFuture): + """Subclass of Future which represents a wait for the cancellation of a + _WaitHandleFuture using an event. + """ + + def __init__(self, ov, event, wait_handle, *, loop=None): + super().__init__(ov, event, wait_handle, loop=loop) + + self._done_callback = None + + def _schedule_callbacks(self): + super(_WaitCancelFuture, self)._schedule_callbacks() + if self._done_callback is not None: + self._done_callback(self) + + +class _WaitHandleFuture(_BaseWaitHandleFuture): + def __init__(self, ov, handle, wait_handle, proactor, *, loop=None): + super().__init__(ov, handle, wait_handle, loop=loop) + self._proactor = proactor + self._unregister_proactor = True + self._event = _overlapped.CreateEvent(None, True, False, None) + self._event_fut = None + + def _unregister_wait_cb(self, fut): + if self._event is not None: + _winapi.CloseHandle(self._event) + self._event = None + self._event_fut = None + + # If the wait was cancelled, the wait may never be signalled, so + # it's required to unregister it. Otherwise, IocpProactor.close() will + # wait forever for an event which will never come. + # + # If the IocpProactor already received the event, it's safe to call + # _unregister() because we kept a reference to the Overlapped object + # which is used as an unique key. + self._proactor._unregister(self._ov) + self._proactor = None + + super()._unregister_wait_cb(fut) + + def _unregister_wait(self): + if not self._registered: + return + self._registered = False + + try: + _overlapped.UnregisterWaitEx(self._wait_handle, self._event) + except OSError as exc: + self._wait_handle = None + if exc.winerror == _overlapped.ERROR_IO_PENDING: + # ERROR_IO_PENDING is not an error, the wait was unregistered + self._unregister_wait_cb(None) + elif exc.winerror != _overlapped.ERROR_IO_PENDING: + context = { + 'message': 'Failed to unregister the wait handle', + 'exception': exc, + 'future': self, + } + if self._source_traceback: + context['source_traceback'] = self._source_traceback + self._loop.call_exception_handler(context) + else: + self._wait_handle = None + self._event_fut = self._proactor._wait_cancel( + self._event, + self._unregister_wait_cb) class PipeServer(object): @@ -291,6 +370,7 @@ class IocpProactor: _overlapped.INVALID_HANDLE_VALUE, NULL, 0, concurrency) self._cache = {} self._registered = weakref.WeakSet() + self._unregistered = [] self._stopped_serving = weakref.WeakSet() def __repr__(self): @@ -438,6 +518,16 @@ class IocpProactor: Return a Future object. The result of the future is True if the wait completed, or False if the wait did not complete (on timeout). """ + return self._wait_for_handle(handle, timeout, False) + + def _wait_cancel(self, event, done_callback): + fut = self._wait_for_handle(event, None, True) + # add_done_callback() cannot be used because the wait may only complete + # in IocpProactor.close(), while the event loop is not running. + fut._done_callback = done_callback + return fut + + def _wait_for_handle(self, handle, timeout, _is_cancel): if timeout is None: ms = _winapi.INFINITE else: @@ -447,9 +537,13 @@ class IocpProactor: # We only create ov so we can use ov.address as a key for the cache. ov = _overlapped.Overlapped(NULL) - wh = _overlapped.RegisterWaitWithQueue( + wait_handle = _overlapped.RegisterWaitWithQueue( handle, self._iocp, ov.address, ms) - f = _WaitHandleFuture(self._iocp, ov, handle, wh, loop=self._loop) + if _is_cancel: + f = _WaitCancelFuture(ov, handle, wait_handle, loop=self._loop) + else: + f = _WaitHandleFuture(ov, handle, wait_handle, self, + loop=self._loop) if f._source_traceback: del f._source_traceback[-1] @@ -462,14 +556,6 @@ class IocpProactor: # False even though we have not timed out. return f._poll() - if f._poll(): - try: - result = f._poll() - except OSError as exc: - f.set_exception(exc) - else: - f.set_result(result) - self._cache[ov.address] = (f, ov, 0, finish_wait_for_handle) return f @@ -521,6 +607,15 @@ class IocpProactor: self._cache[ov.address] = (f, ov, obj, callback) return f + def _unregister(self, ov): + """Unregister an overlapped object. + + Call this method when its future has been cancelled. The event can + already be signalled (pending in the proactor event queue). It is also + safe if the event is never signalled (because it was cancelled). + """ + self._unregistered.append(ov) + def _get_accept_socket(self, family): s = socket.socket(family) s.settimeout(0) @@ -541,7 +636,7 @@ class IocpProactor: while True: status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms) if status is None: - return + break ms = 0 err, transferred, key, address = status @@ -576,6 +671,11 @@ class IocpProactor: f.set_result(value) self._results.append(f) + # Remove unregisted futures + for ov in self._unregistered: + self._cache.pop(ov.address, None) + self._unregistered.clear() + def _stop_serving(self, obj): # obj is a socket or pipe handle. It will be closed in # BaseProactorEventLoop._stop_serving() which will make any |