diff options
Diffstat (limited to 'Lib/asyncio/windows_events.py')
| -rw-r--r-- | Lib/asyncio/windows_events.py | 168 | 
1 files changed, 134 insertions, 34 deletions
diff --git a/Lib/asyncio/windows_events.py b/Lib/asyncio/windows_events.py index 82d0966..5105426 100644 --- a/Lib/asyncio/windows_events.py +++ b/Lib/asyncio/windows_events.py @@ -78,20 +78,23 @@ class _OverlappedFuture(futures.Future):          self._ov = None -class _WaitHandleFuture(futures.Future): +class _BaseWaitHandleFuture(futures.Future):      """Subclass of Future which represents a wait handle.""" -    def __init__(self, iocp, ov, handle, wait_handle, *, loop=None): +    def __init__(self, ov, handle, wait_handle, *, loop=None):          super().__init__(loop=loop)          if self._source_traceback:              del self._source_traceback[-1] -        # iocp and ov are only used by cancel() to notify IocpProactor -        # that the wait was cancelled -        self._iocp = iocp +        # Keep a reference to the Overlapped object to keep it alive until the +        # wait is unregistered          self._ov = ov          self._handle = handle          self._wait_handle = wait_handle +        # Should we call UnregisterWaitEx() if the wait completes +        # or is cancelled? +        self._registered = True +      def _poll(self):          # non-blocking wait: use a timeout of 0 millisecond          return (_winapi.WaitForSingleObject(self._handle, 0) == @@ -99,21 +102,32 @@ class _WaitHandleFuture(futures.Future):      def _repr_info(self):          info = super()._repr_info() -        info.insert(1, 'handle=%#x' % self._handle) -        if self._wait_handle: +        info.append('handle=%#x' % self._handle) +        if self._handle is not None:              state = 'signaled' if self._poll() else 'waiting' -            info.insert(1, 'wait_handle=<%s, %#x>' -                           % (state, self._wait_handle)) +            info.append(state) +        if self._wait_handle is not None: +            info.append('wait_handle=%#x' % self._wait_handle)          return info +    def _unregister_wait_cb(self, fut): +        # The wait was unregistered: it's not safe to destroy the Overlapped +        # object +        self._ov = None +      def _unregister_wait(self): -        if self._wait_handle is None: +        if not self._registered:              return +        self._registered = False +          try:              _overlapped.UnregisterWait(self._wait_handle)          except OSError as exc: -            # ERROR_IO_PENDING is not an error, the wait was unregistered -            if exc.winerror != _overlapped.ERROR_IO_PENDING: +            self._wait_handle = None +            if exc.winerror == _overlapped.ERROR_IO_PENDING: +                # ERROR_IO_PENDING is not an error, the wait was unregistered +                self._unregister_wait_cb(None) +            elif exc.winerror != _overlapped.ERROR_IO_PENDING:                  context = {                      'message': 'Failed to unregister the wait handle',                      'exception': exc, @@ -122,26 +136,91 @@ class _WaitHandleFuture(futures.Future):                  if self._source_traceback:                      context['source_traceback'] = self._source_traceback                  self._loop.call_exception_handler(context) -        self._wait_handle = None -        self._iocp = None -        self._ov = None +        else: +            self._wait_handle = None +            self._unregister_wait_cb(None)      def cancel(self): -        result = super().cancel() -        if self._ov is not None: -            # signal the cancellation to the overlapped object -            _overlapped.PostQueuedCompletionStatus(self._iocp, True, -                                                   0, self._ov.address)          self._unregister_wait() -        return result +        return super().cancel()      def set_exception(self, exception): -        super().set_exception(exception)          self._unregister_wait() +        super().set_exception(exception)      def set_result(self, result): -        super().set_result(result)          self._unregister_wait() +        super().set_result(result) + + +class _WaitCancelFuture(_BaseWaitHandleFuture): +    """Subclass of Future which represents a wait for the cancellation of a +    _WaitHandleFuture using an event. +    """ + +    def __init__(self, ov, event, wait_handle, *, loop=None): +        super().__init__(ov, event, wait_handle, loop=loop) + +        self._done_callback = None + +    def _schedule_callbacks(self): +        super(_WaitCancelFuture, self)._schedule_callbacks() +        if self._done_callback is not None: +            self._done_callback(self) + + +class _WaitHandleFuture(_BaseWaitHandleFuture): +    def __init__(self, ov, handle, wait_handle, proactor, *, loop=None): +        super().__init__(ov, handle, wait_handle, loop=loop) +        self._proactor = proactor +        self._unregister_proactor = True +        self._event = _overlapped.CreateEvent(None, True, False, None) +        self._event_fut = None + +    def _unregister_wait_cb(self, fut): +        if self._event is not None: +            _winapi.CloseHandle(self._event) +            self._event = None +            self._event_fut = None + +        # If the wait was cancelled, the wait may never be signalled, so +        # it's required to unregister it. Otherwise, IocpProactor.close() will +        # wait forever for an event which will never come. +        # +        # If the IocpProactor already received the event, it's safe to call +        # _unregister() because we kept a reference to the Overlapped object +        # which is used as an unique key. +        self._proactor._unregister(self._ov) +        self._proactor = None + +        super()._unregister_wait_cb(fut) + +    def _unregister_wait(self): +        if not self._registered: +            return +        self._registered = False + +        try: +            _overlapped.UnregisterWaitEx(self._wait_handle, self._event) +        except OSError as exc: +            self._wait_handle = None +            if exc.winerror == _overlapped.ERROR_IO_PENDING: +                # ERROR_IO_PENDING is not an error, the wait was unregistered +                self._unregister_wait_cb(None) +            elif exc.winerror != _overlapped.ERROR_IO_PENDING: +                context = { +                    'message': 'Failed to unregister the wait handle', +                    'exception': exc, +                    'future': self, +                } +                if self._source_traceback: +                    context['source_traceback'] = self._source_traceback +                self._loop.call_exception_handler(context) +        else: +            self._wait_handle = None +            self._event_fut = self._proactor._wait_cancel( +                                                self._event, +                                                self._unregister_wait_cb)  class PipeServer(object): @@ -291,6 +370,7 @@ class IocpProactor:              _overlapped.INVALID_HANDLE_VALUE, NULL, 0, concurrency)          self._cache = {}          self._registered = weakref.WeakSet() +        self._unregistered = []          self._stopped_serving = weakref.WeakSet()      def __repr__(self): @@ -438,6 +518,16 @@ class IocpProactor:          Return a Future object. The result of the future is True if the wait          completed, or False if the wait did not complete (on timeout).          """ +        return self._wait_for_handle(handle, timeout, False) + +    def _wait_cancel(self, event, done_callback): +        fut = self._wait_for_handle(event, None, True) +        # add_done_callback() cannot be used because the wait may only complete +        # in IocpProactor.close(), while the event loop is not running. +        fut._done_callback = done_callback +        return fut + +    def _wait_for_handle(self, handle, timeout, _is_cancel):          if timeout is None:              ms = _winapi.INFINITE          else: @@ -447,9 +537,13 @@ class IocpProactor:          # We only create ov so we can use ov.address as a key for the cache.          ov = _overlapped.Overlapped(NULL) -        wh = _overlapped.RegisterWaitWithQueue( +        wait_handle = _overlapped.RegisterWaitWithQueue(              handle, self._iocp, ov.address, ms) -        f = _WaitHandleFuture(self._iocp, ov, handle, wh, loop=self._loop) +        if _is_cancel: +            f = _WaitCancelFuture(ov, handle, wait_handle, loop=self._loop) +        else: +            f = _WaitHandleFuture(ov, handle, wait_handle, self, +                                  loop=self._loop)          if f._source_traceback:              del f._source_traceback[-1] @@ -462,14 +556,6 @@ class IocpProactor:              # False even though we have not timed out.              return f._poll() -        if f._poll(): -            try: -                result = f._poll() -            except OSError as exc: -                f.set_exception(exc) -            else: -                f.set_result(result) -          self._cache[ov.address] = (f, ov, 0, finish_wait_for_handle)          return f @@ -521,6 +607,15 @@ class IocpProactor:              self._cache[ov.address] = (f, ov, obj, callback)          return f +    def _unregister(self, ov): +        """Unregister an overlapped object. + +        Call this method when its future has been cancelled. The event can +        already be signalled (pending in the proactor event queue). It is also +        safe if the event is never signalled (because it was cancelled). +        """ +        self._unregistered.append(ov) +      def _get_accept_socket(self, family):          s = socket.socket(family)          s.settimeout(0) @@ -541,7 +636,7 @@ class IocpProactor:          while True:              status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)              if status is None: -                return +                break              ms = 0              err, transferred, key, address = status @@ -576,6 +671,11 @@ class IocpProactor:                      f.set_result(value)                      self._results.append(f) +        # Remove unregisted futures +        for ov in self._unregistered: +            self._cache.pop(ov.address, None) +        self._unregistered.clear() +      def _stop_serving(self, obj):          # obj is a socket or pipe handle.  It will be closed in          # BaseProactorEventLoop._stop_serving() which will make any  | 
