diff options
Diffstat (limited to 'Lib/asyncio/locks.py')
-rw-r--r-- | Lib/asyncio/locks.py | 64 |
1 files changed, 42 insertions, 22 deletions
diff --git a/Lib/asyncio/locks.py b/Lib/asyncio/locks.py index 7b81c25..fc03830 100644 --- a/Lib/asyncio/locks.py +++ b/Lib/asyncio/locks.py @@ -349,9 +349,8 @@ class Semaphore(_ContextManagerMixin, mixins._LoopBoundMixin): super().__init__(loop=loop) if value < 0: raise ValueError("Semaphore initial value must be >= 0") + self._waiters = None self._value = value - self._waiters = collections.deque() - self._wakeup_scheduled = False def __repr__(self): res = super().__repr__() @@ -360,16 +359,8 @@ class Semaphore(_ContextManagerMixin, mixins._LoopBoundMixin): extra = f'{extra}, waiters:{len(self._waiters)}' return f'<{res[1:-1]} [{extra}]>' - def _wake_up_next(self): - while self._waiters: - waiter = self._waiters.popleft() - if not waiter.done(): - waiter.set_result(None) - self._wakeup_scheduled = True - return - def locked(self): - """Returns True if semaphore can not be acquired immediately.""" + """Returns True if semaphore counter is zero.""" return self._value == 0 async def acquire(self): @@ -381,28 +372,57 @@ class Semaphore(_ContextManagerMixin, mixins._LoopBoundMixin): called release() to make it larger than 0, and then return True. """ - # _wakeup_scheduled is set if *another* task is scheduled to wakeup - # but its acquire() is not resumed yet - while self._wakeup_scheduled or self._value <= 0: - fut = self._get_loop().create_future() - self._waiters.append(fut) + if (not self.locked() and (self._waiters is None or + all(w.cancelled() for w in self._waiters))): + self._value -= 1 + return True + + if self._waiters is None: + self._waiters = collections.deque() + fut = self._get_loop().create_future() + self._waiters.append(fut) + + # Finally block should be called before the CancelledError + # handling as we don't want CancelledError to call + # _wake_up_first() and attempt to wake up itself. + try: try: await fut - # reset _wakeup_scheduled *after* waiting for a future - self._wakeup_scheduled = False - except exceptions.CancelledError: - self._wake_up_next() - raise + finally: + self._waiters.remove(fut) + except exceptions.CancelledError: + if not self.locked(): + self._wake_up_first() + raise + self._value -= 1 + if not self.locked(): + self._wake_up_first() return True def release(self): """Release a semaphore, incrementing the internal counter by one. + When it was zero on entry and another coroutine is waiting for it to become larger than zero again, wake up that coroutine. """ self._value += 1 - self._wake_up_next() + self._wake_up_first() + + def _wake_up_first(self): + """Wake up the first waiter if it isn't done.""" + if not self._waiters: + return + try: + fut = next(iter(self._waiters)) + except StopIteration: + return + + # .done() necessarily means that a waiter will wake up later on and + # either take the lock, or, if it was cancelled and lock wasn't + # taken already, will hit this again and wake up a new waiter. + if not fut.done(): + fut.set_result(True) class BoundedSemaphore(Semaphore): |