summaryrefslogtreecommitdiffstats
path: root/Lib/asyncio/locks.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/asyncio/locks.py')
-rw-r--r--Lib/asyncio/locks.py64
1 files changed, 42 insertions, 22 deletions
diff --git a/Lib/asyncio/locks.py b/Lib/asyncio/locks.py
index 7b81c25..fc03830 100644
--- a/Lib/asyncio/locks.py
+++ b/Lib/asyncio/locks.py
@@ -349,9 +349,8 @@ class Semaphore(_ContextManagerMixin, mixins._LoopBoundMixin):
super().__init__(loop=loop)
if value < 0:
raise ValueError("Semaphore initial value must be >= 0")
+ self._waiters = None
self._value = value
- self._waiters = collections.deque()
- self._wakeup_scheduled = False
def __repr__(self):
res = super().__repr__()
@@ -360,16 +359,8 @@ class Semaphore(_ContextManagerMixin, mixins._LoopBoundMixin):
extra = f'{extra}, waiters:{len(self._waiters)}'
return f'<{res[1:-1]} [{extra}]>'
- def _wake_up_next(self):
- while self._waiters:
- waiter = self._waiters.popleft()
- if not waiter.done():
- waiter.set_result(None)
- self._wakeup_scheduled = True
- return
-
def locked(self):
- """Returns True if semaphore can not be acquired immediately."""
+ """Returns True if semaphore counter is zero."""
return self._value == 0
async def acquire(self):
@@ -381,28 +372,57 @@ class Semaphore(_ContextManagerMixin, mixins._LoopBoundMixin):
called release() to make it larger than 0, and then return
True.
"""
- # _wakeup_scheduled is set if *another* task is scheduled to wakeup
- # but its acquire() is not resumed yet
- while self._wakeup_scheduled or self._value <= 0:
- fut = self._get_loop().create_future()
- self._waiters.append(fut)
+ if (not self.locked() and (self._waiters is None or
+ all(w.cancelled() for w in self._waiters))):
+ self._value -= 1
+ return True
+
+ if self._waiters is None:
+ self._waiters = collections.deque()
+ fut = self._get_loop().create_future()
+ self._waiters.append(fut)
+
+ # Finally block should be called before the CancelledError
+ # handling as we don't want CancelledError to call
+ # _wake_up_first() and attempt to wake up itself.
+ try:
try:
await fut
- # reset _wakeup_scheduled *after* waiting for a future
- self._wakeup_scheduled = False
- except exceptions.CancelledError:
- self._wake_up_next()
- raise
+ finally:
+ self._waiters.remove(fut)
+ except exceptions.CancelledError:
+ if not self.locked():
+ self._wake_up_first()
+ raise
+
self._value -= 1
+ if not self.locked():
+ self._wake_up_first()
return True
def release(self):
"""Release a semaphore, incrementing the internal counter by one.
+
When it was zero on entry and another coroutine is waiting for it to
become larger than zero again, wake up that coroutine.
"""
self._value += 1
- self._wake_up_next()
+ self._wake_up_first()
+
+ def _wake_up_first(self):
+ """Wake up the first waiter if it isn't done."""
+ if not self._waiters:
+ return
+ try:
+ fut = next(iter(self._waiters))
+ except StopIteration:
+ return
+
+ # .done() necessarily means that a waiter will wake up later on and
+ # either take the lock, or, if it was cancelled and lock wasn't
+ # taken already, will hit this again and wake up a new waiter.
+ if not fut.done():
+ fut.set_result(True)
class BoundedSemaphore(Semaphore):