summaryrefslogtreecommitdiffstats
path: root/Lib/asyncio
diff options
context:
space:
mode:
authorMiss Islington (bot) <31488909+miss-islington@users.noreply.github.com>2022-09-22 17:01:14 (GMT)
committerGitHub <noreply@github.com>2022-09-22 17:01:14 (GMT)
commit646aa7efb3a069150614049d43b469c2c9b4a965 (patch)
treea36c0e952ef18ff8680393f3e2e7306abbf803a9 /Lib/asyncio
parentc9670495bb618651a07d695bd2b4c91b9a52103a (diff)
downloadcpython-646aa7efb3a069150614049d43b469c2c9b4a965.zip
cpython-646aa7efb3a069150614049d43b469c2c9b4a965.tar.gz
cpython-646aa7efb3a069150614049d43b469c2c9b4a965.tar.bz2
gh-90155: Fix bug in asyncio.Semaphore and strengthen FIFO guarantee (GH-93222)
The main problem was that an unluckily timed task cancellation could cause the semaphore to be stuck. There were also doubts about strict FIFO ordering of tasks allowed to pass. The Semaphore implementation was rewritten to be more similar to Lock. Many tests for edge cases (including cancellation) were added. (cherry picked from commit 24e03796248ab8c7f62d715c28156abe2f1c0d20) Co-authored-by: Cyker Way <cykerway@gmail.com>
Diffstat (limited to 'Lib/asyncio')
-rw-r--r--Lib/asyncio/locks.py64
1 files changed, 42 insertions, 22 deletions
diff --git a/Lib/asyncio/locks.py b/Lib/asyncio/locks.py
index 7b81c25..fc03830 100644
--- a/Lib/asyncio/locks.py
+++ b/Lib/asyncio/locks.py
@@ -349,9 +349,8 @@ class Semaphore(_ContextManagerMixin, mixins._LoopBoundMixin):
super().__init__(loop=loop)
if value < 0:
raise ValueError("Semaphore initial value must be >= 0")
+ self._waiters = None
self._value = value
- self._waiters = collections.deque()
- self._wakeup_scheduled = False
def __repr__(self):
res = super().__repr__()
@@ -360,16 +359,8 @@ class Semaphore(_ContextManagerMixin, mixins._LoopBoundMixin):
extra = f'{extra}, waiters:{len(self._waiters)}'
return f'<{res[1:-1]} [{extra}]>'
- def _wake_up_next(self):
- while self._waiters:
- waiter = self._waiters.popleft()
- if not waiter.done():
- waiter.set_result(None)
- self._wakeup_scheduled = True
- return
-
def locked(self):
- """Returns True if semaphore can not be acquired immediately."""
+ """Returns True if semaphore counter is zero."""
return self._value == 0
async def acquire(self):
@@ -381,28 +372,57 @@ class Semaphore(_ContextManagerMixin, mixins._LoopBoundMixin):
called release() to make it larger than 0, and then return
True.
"""
- # _wakeup_scheduled is set if *another* task is scheduled to wakeup
- # but its acquire() is not resumed yet
- while self._wakeup_scheduled or self._value <= 0:
- fut = self._get_loop().create_future()
- self._waiters.append(fut)
+ if (not self.locked() and (self._waiters is None or
+ all(w.cancelled() for w in self._waiters))):
+ self._value -= 1
+ return True
+
+ if self._waiters is None:
+ self._waiters = collections.deque()
+ fut = self._get_loop().create_future()
+ self._waiters.append(fut)
+
+ # Finally block should be called before the CancelledError
+ # handling as we don't want CancelledError to call
+ # _wake_up_first() and attempt to wake up itself.
+ try:
try:
await fut
- # reset _wakeup_scheduled *after* waiting for a future
- self._wakeup_scheduled = False
- except exceptions.CancelledError:
- self._wake_up_next()
- raise
+ finally:
+ self._waiters.remove(fut)
+ except exceptions.CancelledError:
+ if not self.locked():
+ self._wake_up_first()
+ raise
+
self._value -= 1
+ if not self.locked():
+ self._wake_up_first()
return True
def release(self):
"""Release a semaphore, incrementing the internal counter by one.
+
When it was zero on entry and another coroutine is waiting for it to
become larger than zero again, wake up that coroutine.
"""
self._value += 1
- self._wake_up_next()
+ self._wake_up_first()
+
+ def _wake_up_first(self):
+ """Wake up the first waiter if it isn't done."""
+ if not self._waiters:
+ return
+ try:
+ fut = next(iter(self._waiters))
+ except StopIteration:
+ return
+
+ # .done() necessarily means that a waiter will wake up later on and
+ # either take the lock, or, if it was cancelled and lock wasn't
+ # taken already, will hit this again and wake up a new waiter.
+ if not fut.done():
+ fut.set_result(True)
class BoundedSemaphore(Semaphore):