diff options
author | Kristján Valur Jónsson <kristjan@ccpgames.com> | 2010-10-28 09:43:10 (GMT) |
---|---|---|
committer | Kristján Valur Jónsson <kristjan@ccpgames.com> | 2010-10-28 09:43:10 (GMT) |
commit | 3be00037d65178644b20a826f68eb3d0b25ccb5f (patch) | |
tree | 31d7ff67d789c0ca1fc0ce927afaa0fb5afe5714 /Lib/threading.py | |
parent | 65ffae0aa3b31f26503182cbc7cd79943b6b8ff5 (diff) | |
download | cpython-3be00037d65178644b20a826f68eb3d0b25ccb5f.zip cpython-3be00037d65178644b20a826f68eb3d0b25ccb5f.tar.gz cpython-3be00037d65178644b20a826f68eb3d0b25ccb5f.tar.bz2 |
issue 8777
Add threading.Barrier
Diffstat (limited to 'Lib/threading.py')
-rw-r--r-- | Lib/threading.py | 172 |
1 files changed, 172 insertions, 0 deletions
diff --git a/Lib/threading.py b/Lib/threading.py index 238a5c4..41956ed 100644 --- a/Lib/threading.py +++ b/Lib/threading.py @@ -392,6 +392,178 @@ class _Event(_Verbose): finally: self._cond.release() + +# A barrier class. Inspired in part by the pthread_barrier_* api and +# the CyclicBarrier class from Java. See +# http://sourceware.org/pthreads-win32/manual/pthread_barrier_init.html and +# http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/ +# CyclicBarrier.html +# for information. +# We maintain two main states, 'filling' and 'draining' enabling the barrier +# to be cyclic. Threads are not allowed into it until it has fully drained +# since the previous cycle. In addition, a 'resetting' state exists which is +# similar to 'draining' except that threads leave with a BrokenBarrierError, +# and a 'broken' state in which all threads get get the exception. +class Barrier(_Verbose): + """ + Barrier. Useful for synchronizing a fixed number of threads + at known synchronization points. Threads block on 'wait()' and are + simultaneously once they have all made that call. + """ + def __init__(self, parties, action=None, timeout=None, verbose=None): + """ + Create a barrier, initialised to 'parties' threads. + 'action' is a callable which, when supplied, will be called + by one of the threads after they have all entered the + barrier and just prior to releasing them all. + If a 'timeout' is provided, it is uses as the default for + all subsequent 'wait()' calls. + """ + _Verbose.__init__(self, verbose) + self._cond = Condition(Lock()) + self._action = action + self._timeout = timeout + self._parties = parties + self._state = 0 #0 filling, 1, draining, -1 resetting, -2 broken + self._count = 0 + + def wait(self, timeout=None): + """ + Wait for the barrier. When the specified number of threads have + started waiting, they are all simultaneously awoken. If an 'action' + was provided for the barrier, one of the threads will have executed + that callback prior to returning. + Returns an individual index number from 0 to 'parties-1'. + """ + if timeout is None: + timeout = self._timeout + with self._cond: + self._enter() # Block while the barrier drains. + index = self._count + self._count += 1 + try: + if index + 1 == self._parties: + # We release the barrier + self._release() + else: + # We wait until someone releases us + self._wait(timeout) + return index + finally: + self._count -= 1 + # Wake up any threads waiting for barrier to drain. + self._exit() + + # Block until the barrier is ready for us, or raise an exception + # if it is broken. + def _enter(self): + while self._state in (-1, 1): + # It is draining or resetting, wait until done + self._cond.wait() + #see if the barrier is in a broken state + if self._state < 0: + raise BrokenBarrierError + assert self._state == 0 + + # Optionally run the 'action' and release the threads waiting + # in the barrier. + def _release(self): + try: + if self._action: + self._action() + # enter draining state + self._state = 1 + self._cond.notify_all() + except: + #an exception during the _action handler. Break and reraise + self._break() + raise + + # Wait in the barrier until we are relased. Raise an exception + # if the barrier is reset or broken. + def _wait(self, timeout): + while self._state == 0: + if self._cond.wait(timeout) is False: + #timed out. Break the barrier + self._break() + raise BrokenBarrierError + if self._state < 0: + raise BrokenBarrierError + assert self._state == 1 + + # If we are the last thread to exit the barrier, signal any threads + # waiting for the barrier to drain. + def _exit(self): + if self._count == 0: + if self._state in (-1, 1): + #resetting or draining + self._state = 0 + self._cond.notify_all() + + def reset(self): + """ + Reset the barrier to the initial state. + Any threads currently waiting will get the BrokenBarrier exception + raised. + """ + with self._cond: + if self._count > 0: + if self._state == 0: + #reset the barrier, waking up threads + self._state = -1 + elif self._state == -2: + #was broken, set it to reset state + #which clears when the last thread exits + self._state = -1 + else: + self._state = 0 + self._cond.notify_all() + + def abort(self): + """ + Place the barrier into a 'broken' state. + Useful in case of error. Any currently waiting threads and + threads attempting to 'wait()' will have BrokenBarrierError + raised. + """ + with self._cond: + self._break() + + def _break(self): + # An internal error was detected. The barrier is set to + # a broken state all parties awakened. + self._state = -2 + self._cond.notify_all() + + @property + def parties(self): + """ + Return the number of threads required to trip the barrier. + """ + return self._parties + + @property + def n_waiting(self): + """ + Return the number of threads that are currently waiting at the barrier. + """ + # We don't need synchronization here since this is an ephemeral result + # anyway. It returns the correct value in the steady state. + if self._state == 0: + return self._count + return 0 + + @property + def broken(self): + """ + Return True if the barrier is in a broken state + """ + return self._state == -2 + +#exception raised by the Barrier class +class BrokenBarrierError(RuntimeError): pass + + # Helper to generate new thread names _counter = 0 def _newname(template="Thread-%d"): |