path: root/Lib/
diff options
Diffstat (limited to 'Lib/')
1 files changed, 172 insertions, 0 deletions
diff --git a/Lib/ b/Lib/
index 238a5c4..41956ed 100644
--- a/Lib/
+++ b/Lib/
@@ -392,6 +392,178 @@ class _Event(_Verbose):
+# A barrier class. Inspired in part by the pthread_barrier_* api and
+# the CyclicBarrier class from Java. See
+# and
+# CyclicBarrier.html
+# for information.
+# We maintain two main states, 'filling' and 'draining' enabling the barrier
+# to be cyclic. Threads are not allowed into it until it has fully drained
+# since the previous cycle. In addition, a 'resetting' state exists which is
+# similar to 'draining' except that threads leave with a BrokenBarrierError,
+# and a 'broken' state in which all threads get get the exception.
+class Barrier(_Verbose):
+ """
+ Barrier. Useful for synchronizing a fixed number of threads
+ at known synchronization points. Threads block on 'wait()' and are
+ simultaneously once they have all made that call.
+ """
+ def __init__(self, parties, action=None, timeout=None, verbose=None):
+ """
+ Create a barrier, initialised to 'parties' threads.
+ 'action' is a callable which, when supplied, will be called
+ by one of the threads after they have all entered the
+ barrier and just prior to releasing them all.
+ If a 'timeout' is provided, it is uses as the default for
+ all subsequent 'wait()' calls.
+ """
+ _Verbose.__init__(self, verbose)
+ self._cond = Condition(Lock())
+ self._action = action
+ self._timeout = timeout
+ self._parties = parties
+ self._state = 0 #0 filling, 1, draining, -1 resetting, -2 broken
+ self._count = 0
+ def wait(self, timeout=None):
+ """
+ Wait for the barrier. When the specified number of threads have
+ started waiting, they are all simultaneously awoken. If an 'action'
+ was provided for the barrier, one of the threads will have executed
+ that callback prior to returning.
+ Returns an individual index number from 0 to 'parties-1'.
+ """
+ if timeout is None:
+ timeout = self._timeout
+ with self._cond:
+ self._enter() # Block while the barrier drains.
+ index = self._count
+ self._count += 1
+ try:
+ if index + 1 == self._parties:
+ # We release the barrier
+ self._release()
+ else:
+ # We wait until someone releases us
+ self._wait(timeout)
+ return index
+ finally:
+ self._count -= 1
+ # Wake up any threads waiting for barrier to drain.
+ self._exit()
+ # Block until the barrier is ready for us, or raise an exception
+ # if it is broken.
+ def _enter(self):
+ while self._state in (-1, 1):
+ # It is draining or resetting, wait until done
+ self._cond.wait()
+ #see if the barrier is in a broken state
+ if self._state < 0:
+ raise BrokenBarrierError
+ assert self._state == 0
+ # Optionally run the 'action' and release the threads waiting
+ # in the barrier.
+ def _release(self):
+ try:
+ if self._action:
+ self._action()
+ # enter draining state
+ self._state = 1
+ self._cond.notify_all()
+ except:
+ #an exception during the _action handler. Break and reraise
+ self._break()
+ raise
+ # Wait in the barrier until we are relased. Raise an exception
+ # if the barrier is reset or broken.
+ def _wait(self, timeout):
+ while self._state == 0:
+ if self._cond.wait(timeout) is False:
+ #timed out. Break the barrier
+ self._break()
+ raise BrokenBarrierError
+ if self._state < 0:
+ raise BrokenBarrierError
+ assert self._state == 1
+ # If we are the last thread to exit the barrier, signal any threads
+ # waiting for the barrier to drain.
+ def _exit(self):
+ if self._count == 0:
+ if self._state in (-1, 1):
+ #resetting or draining
+ self._state = 0
+ self._cond.notify_all()
+ def reset(self):
+ """
+ Reset the barrier to the initial state.
+ Any threads currently waiting will get the BrokenBarrier exception
+ raised.
+ """
+ with self._cond:
+ if self._count > 0:
+ if self._state == 0:
+ #reset the barrier, waking up threads
+ self._state = -1
+ elif self._state == -2:
+ #was broken, set it to reset state
+ #which clears when the last thread exits
+ self._state = -1
+ else:
+ self._state = 0
+ self._cond.notify_all()
+ def abort(self):
+ """
+ Place the barrier into a 'broken' state.
+ Useful in case of error. Any currently waiting threads and
+ threads attempting to 'wait()' will have BrokenBarrierError
+ raised.
+ """
+ with self._cond:
+ self._break()
+ def _break(self):
+ # An internal error was detected. The barrier is set to
+ # a broken state all parties awakened.
+ self._state = -2
+ self._cond.notify_all()
+ @property
+ def parties(self):
+ """
+ Return the number of threads required to trip the barrier.
+ """
+ return self._parties
+ @property
+ def n_waiting(self):
+ """
+ Return the number of threads that are currently waiting at the barrier.
+ """
+ # We don't need synchronization here since this is an ephemeral result
+ # anyway. It returns the correct value in the steady state.
+ if self._state == 0:
+ return self._count
+ return 0
+ @property
+ def broken(self):
+ """
+ Return True if the barrier is in a broken state
+ """
+ return self._state == -2
+#exception raised by the Barrier class
+class BrokenBarrierError(RuntimeError): pass
# Helper to generate new thread names
_counter = 0
def _newname(template="Thread-%d"):