summaryrefslogtreecommitdiffstats
path: root/Lib/asyncio/locks.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/asyncio/locks.py')
-rw-r--r--Lib/asyncio/locks.py401
1 files changed, 401 insertions, 0 deletions
diff --git a/Lib/asyncio/locks.py b/Lib/asyncio/locks.py
new file mode 100644
index 0000000..06edbbc
--- /dev/null
+++ b/Lib/asyncio/locks.py
@@ -0,0 +1,401 @@
+"""Synchronization primitives."""
+
+__all__ = ['Lock', 'Event', 'Condition', 'Semaphore']
+
+import collections
+
+from . import events
+from . import futures
+from . import tasks
+
+
+class Lock:
+ """Primitive lock objects.
+
+ A primitive lock is a synchronization primitive that is not owned
+ by a particular coroutine when locked. A primitive lock is in one
+ of two states, 'locked' or 'unlocked'.
+
+ It is created in the unlocked state. It has two basic methods,
+ acquire() and release(). When the state is unlocked, acquire()
+ changes the state to locked and returns immediately. When the
+ state is locked, acquire() blocks until a call to release() in
+ another coroutine changes it to unlocked, then the acquire() call
+ resets it to locked and returns. The release() method should only
+ be called in the locked state; it changes the state to unlocked
+ and returns immediately. If an attempt is made to release an
+ unlocked lock, a RuntimeError will be raised.
+
+ When more than one coroutine is blocked in acquire() waiting for
+ the state to turn to unlocked, only one coroutine proceeds when a
+ release() call resets the state to unlocked; first coroutine which
+ is blocked in acquire() is being processed.
+
+ acquire() is a coroutine and should be called with 'yield from'.
+
+ Locks also support the context manager protocol. '(yield from lock)'
+ should be used as context manager expression.
+
+ Usage:
+
+ lock = Lock()
+ ...
+ yield from lock
+ try:
+ ...
+ finally:
+ lock.release()
+
+ Context manager usage:
+
+ lock = Lock()
+ ...
+ with (yield from lock):
+ ...
+
+ Lock objects can be tested for locking state:
+
+ if not lock.locked():
+ yield from lock
+ else:
+ # lock is acquired
+ ...
+
+ """
+
+ def __init__(self, *, loop=None):
+ self._waiters = collections.deque()
+ self._locked = False
+ if loop is not None:
+ self._loop = loop
+ else:
+ self._loop = events.get_event_loop()
+
+ def __repr__(self):
+ res = super().__repr__()
+ extra = 'locked' if self._locked else 'unlocked'
+ if self._waiters:
+ extra = '{},waiters:{}'.format(extra, len(self._waiters))
+ return '<{} [{}]>'.format(res[1:-1], extra)
+
+ def locked(self):
+ """Return true if lock is acquired."""
+ return self._locked
+
+ @tasks.coroutine
+ def acquire(self):
+ """Acquire a lock.
+
+ This method blocks until the lock is unlocked, then sets it to
+ locked and returns True.
+ """
+ if not self._waiters and not self._locked:
+ self._locked = True
+ return True
+
+ fut = futures.Future(loop=self._loop)
+ self._waiters.append(fut)
+ try:
+ yield from fut
+ self._locked = True
+ return True
+ finally:
+ self._waiters.remove(fut)
+
+ def release(self):
+ """Release a lock.
+
+ When the lock is locked, reset it to unlocked, and return.
+ If any other coroutines are blocked waiting for the lock to become
+ unlocked, allow exactly one of them to proceed.
+
+ When invoked on an unlocked lock, a RuntimeError is raised.
+
+ There is no return value.
+ """
+ if self._locked:
+ self._locked = False
+ # Wake up the first waiter who isn't cancelled.
+ for fut in self._waiters:
+ if not fut.done():
+ fut.set_result(True)
+ break
+ else:
+ raise RuntimeError('Lock is not acquired.')
+
+ def __enter__(self):
+ if not self._locked:
+ raise RuntimeError(
+ '"yield from" should be used as context manager expression')
+ return True
+
+ def __exit__(self, *args):
+ self.release()
+
+ def __iter__(self):
+ yield from self.acquire()
+ return self
+
+
+class Event:
+ """An Event implementation, our equivalent to threading.Event.
+
+ Class implementing event objects. An event manages a flag that can be set
+ to true with the set() method and reset to false with the clear() method.
+ The wait() method blocks until the flag is true. The flag is initially
+ false.
+ """
+
+ def __init__(self, *, loop=None):
+ self._waiters = collections.deque()
+ self._value = False
+ if loop is not None:
+ self._loop = loop
+ else:
+ self._loop = events.get_event_loop()
+
+ def __repr__(self):
+ # TODO: add waiters:N if > 0.
+ res = super().__repr__()
+ return '<{} [{}]>'.format(res[1:-1], 'set' if self._value else 'unset')
+
+ def is_set(self):
+ """Return true if and only if the internal flag is true."""
+ return self._value
+
+ def set(self):
+ """Set the internal flag to true. All coroutines waiting for it to
+ become true are awakened. Coroutine that call wait() once the flag is
+ true will not block at all.
+ """
+ if not self._value:
+ self._value = True
+
+ for fut in self._waiters:
+ if not fut.done():
+ fut.set_result(True)
+
+ def clear(self):
+ """Reset the internal flag to false. Subsequently, coroutines calling
+ wait() will block until set() is called to set the internal flag
+ to true again."""
+ self._value = False
+
+ @tasks.coroutine
+ def wait(self):
+ """Block until the internal flag is true.
+
+ If the internal flag is true on entry, return True
+ immediately. Otherwise, block until another coroutine calls
+ set() to set the flag to true, then return True.
+ """
+ if self._value:
+ return True
+
+ fut = futures.Future(loop=self._loop)
+ self._waiters.append(fut)
+ try:
+ yield from fut
+ return True
+ finally:
+ self._waiters.remove(fut)
+
+
+# TODO: Why is this a Lock subclass? threading.Condition *has* a lock.
+class Condition(Lock):
+ """A Condition implementation.
+
+ This class implements condition variable objects. A condition variable
+ allows one or more coroutines to wait until they are notified by another
+ coroutine.
+ """
+
+ def __init__(self, *, loop=None):
+ super().__init__(loop=loop)
+ self._condition_waiters = collections.deque()
+
+ # TODO: Add __repr__() with len(_condition_waiters).
+
+ @tasks.coroutine
+ def wait(self):
+ """Wait until notified.
+
+ If the calling coroutine has not acquired the lock when this
+ method is called, a RuntimeError is raised.
+
+ This method releases the underlying lock, and then blocks
+ until it is awakened by a notify() or notify_all() call for
+ the same condition variable in another coroutine. Once
+ awakened, it re-acquires the lock and returns True.
+ """
+ if not self._locked:
+ raise RuntimeError('cannot wait on un-acquired lock')
+
+ keep_lock = True
+ self.release()
+ try:
+ fut = futures.Future(loop=self._loop)
+ self._condition_waiters.append(fut)
+ try:
+ yield from fut
+ return True
+ finally:
+ self._condition_waiters.remove(fut)
+
+ except GeneratorExit:
+ keep_lock = False # Prevent yield in finally clause.
+ raise
+ finally:
+ if keep_lock:
+ yield from self.acquire()
+
+ @tasks.coroutine
+ def wait_for(self, predicate):
+ """Wait until a predicate becomes true.
+
+ The predicate should be a callable which result will be
+ interpreted as a boolean value. The final predicate value is
+ the return value.
+ """
+ result = predicate()
+ while not result:
+ yield from self.wait()
+ result = predicate()
+ return result
+
+ def notify(self, n=1):
+ """By default, wake up one coroutine waiting on this condition, if any.
+ If the calling coroutine has not acquired the lock when this method
+ is called, a RuntimeError is raised.
+
+ This method wakes up at most n of the coroutines waiting for the
+ condition variable; it is a no-op if no coroutines are waiting.
+
+ Note: an awakened coroutine does not actually return from its
+ wait() call until it can reacquire the lock. Since notify() does
+ not release the lock, its caller should.
+ """
+ if not self._locked:
+ raise RuntimeError('cannot notify on un-acquired lock')
+
+ idx = 0
+ for fut in self._condition_waiters:
+ if idx >= n:
+ break
+
+ if not fut.done():
+ idx += 1
+ fut.set_result(False)
+
+ def notify_all(self):
+ """Wake up all threads waiting on this condition. This method acts
+ like notify(), but wakes up all waiting threads instead of one. If the
+ calling thread has not acquired the lock when this method is called,
+ a RuntimeError is raised.
+ """
+ self.notify(len(self._condition_waiters))
+
+
+class Semaphore:
+ """A Semaphore implementation.
+
+ A semaphore manages an internal counter which is decremented by each
+ acquire() call and incremented by each release() call. The counter
+ can never go below zero; when acquire() finds that it is zero, it blocks,
+ waiting until some other thread calls release().
+
+ Semaphores also support the context manager protocol.
+
+ The first optional argument gives the initial value for the internal
+ counter; it defaults to 1. If the value given is less than 0,
+ ValueError is raised.
+
+ The second optional argument determins can semophore be released more than
+ initial internal counter value; it defaults to False. If the value given
+ is True and number of release() is more than number of successfull
+ acquire() calls ValueError is raised.
+ """
+
+ def __init__(self, value=1, bound=False, *, loop=None):
+ if value < 0:
+ raise ValueError("Semaphore initial value must be > 0")
+ self._value = value
+ self._bound = bound
+ self._bound_value = value
+ self._waiters = collections.deque()
+ self._locked = False
+ if loop is not None:
+ self._loop = loop
+ else:
+ self._loop = events.get_event_loop()
+
+ def __repr__(self):
+ # TODO: add waiters:N if > 0.
+ res = super().__repr__()
+ return '<{} [{}]>'.format(
+ res[1:-1],
+ 'locked' if self._locked else 'unlocked,value:{}'.format(
+ self._value))
+
+ def locked(self):
+ """Returns True if semaphore can not be acquired immediately."""
+ return self._locked
+
+ @tasks.coroutine
+ def acquire(self):
+ """Acquire a semaphore.
+
+ If the internal counter is larger than zero on entry,
+ decrement it by one and return True immediately. If it is
+ zero on entry, block, waiting until some other coroutine has
+ called release() to make it larger than 0, and then return
+ True.
+ """
+ if not self._waiters and self._value > 0:
+ self._value -= 1
+ if self._value == 0:
+ self._locked = True
+ return True
+
+ fut = futures.Future(loop=self._loop)
+ self._waiters.append(fut)
+ try:
+ yield from fut
+ self._value -= 1
+ if self._value == 0:
+ self._locked = True
+ return True
+ finally:
+ self._waiters.remove(fut)
+
+ def release(self):
+ """Release a semaphore, incrementing the internal counter by one.
+ When it was zero on entry and another coroutine is waiting for it to
+ become larger than zero again, wake up that coroutine.
+
+ If Semaphore is create with "bound" paramter equals true, then
+ release() method checks to make sure its current value doesn't exceed
+ its initial value. If it does, ValueError is raised.
+ """
+ if self._bound and self._value >= self._bound_value:
+ raise ValueError('Semaphore released too many times')
+
+ self._value += 1
+ self._locked = False
+
+ for waiter in self._waiters:
+ if not waiter.done():
+ waiter.set_result(True)
+ break
+
+ def __enter__(self):
+ # TODO: This is questionable. How do we know the user actually
+ # wrote "with (yield from sema)" instead of "with sema"?
+ return True
+
+ def __exit__(self, *args):
+ self.release()
+
+ def __iter__(self):
+ yield from self.acquire()
+ return self