Diffstat (limited to 'Lib/threading.py')
-rw-r--r-- | Lib/threading.py | 349
1 files changed, 232 insertions, 117 deletions
diff --git a/Lib/threading.py b/Lib/threading.py
index c965c6f..fd93f74 100644
--- a/Lib/threading.py
+++ b/Lib/threading.py
@@ -17,24 +17,24 @@ from collections import deque
 # with the multiprocessing module, which doesn't provide the old
 # Java inspired names.
 
-
-# Rename some stuff so "from threading import *" is safe
 __all__ = ['active_count', 'Condition', 'current_thread', 'enumerate', 'Event',
-           'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread',
+           'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread', 'Barrier',
            'Timer', 'setprofile', 'settrace', 'local', 'stack_size']
 
+# Rename some stuff so "from threading import *" is safe
 _start_new_thread = _thread.start_new_thread
 _allocate_lock = _thread.allocate_lock
 _get_ident = _thread.get_ident
 ThreadError = _thread.error
+try:
+    _CRLock = _thread.RLock
+except AttributeError:
+    _CRLock = None
+TIMEOUT_MAX = _thread.TIMEOUT_MAX
 del _thread
 
 
 # Debug support (adapted from ihooks.py).
-# All the major classes here derive from _Verbose.  We force that to
-# be a new-style class so that all the major classes here are new-style.
-# This helps debugging (type(instance) is more revealing for instances
-# of new-style classes).
 
 _VERBOSE = False
 
@@ -85,8 +85,12 @@ def settrace(func):
 
 Lock = _allocate_lock
 
-def RLock(*args, **kwargs):
-    return _RLock(*args, **kwargs)
+def RLock(verbose=None, *args, **kwargs):
+    if verbose is None:
+        verbose = _VERBOSE
+    if (__debug__ and verbose) or _CRLock is None:
+        return _PyRLock(verbose, *args, **kwargs)
+    return _CRLock(*args, **kwargs)
 
 class _RLock(_Verbose):
 
@@ -105,14 +109,14 @@ class _RLock(_Verbose):
         return "<%s owner=%r count=%d>" % (
                 self.__class__.__name__, owner, self._count)
 
-    def acquire(self, blocking=True):
+    def acquire(self, blocking=True, timeout=-1):
         me = _get_ident()
         if self._owner == me:
             self._count = self._count + 1
             if __debug__:
                 self._note("%s.acquire(%s): recursive success", self, blocking)
             return 1
-        rc = self._block.acquire(blocking)
+        rc = self._block.acquire(blocking, timeout)
         if rc:
             self._owner = me
             self._count = 1
@@ -162,6 +166,8 @@ class _RLock(_Verbose):
     def _is_owned(self):
         return self._owner == _get_ident()
 
+_PyRLock = _RLock
+
 
 def Condition(*args, **kwargs):
     return _Condition(*args, **kwargs)
@@ -227,25 +233,14 @@ class _Condition(_Verbose):
         try:    # restore state no matter what (e.g., KeyboardInterrupt)
             if timeout is None:
                 waiter.acquire()
+                gotit = True
                 if __debug__:
                     self._note("%s.wait(): got it", self)
             else:
-                # Balancing act:  We can't afford a pure busy loop, so we
-                # have to sleep; but if we sleep the whole timeout time,
-                # we'll be unresponsive.  The scheme here sleeps very
-                # little at first, longer as time goes on, but never longer
-                # than 20 times per second (or the timeout time remaining).
-                endtime = _time() + timeout
-                delay = 0.0005 # 500 us -> initial delay of 1 ms
-                while True:
-                    gotit = waiter.acquire(0)
-                    if gotit:
-                        break
-                    remaining = endtime - _time()
-                    if remaining <= 0:
-                        break
-                    delay = min(delay * 2, remaining, .05)
-                    _sleep(delay)
+                if timeout > 0:
+                    gotit = waiter.acquire(True, timeout)
+                else:
+                    gotit = waiter.acquire(False)
                 if not gotit:
                     if __debug__:
                         self._note("%s.wait(%s): timed out", self, timeout)
@@ -256,9 +251,36 @@ class _Condition(_Verbose):
                 else:
                     if __debug__:
                         self._note("%s.wait(%s): got it", self, timeout)
+            return gotit
         finally:
             self._acquire_restore(saved_state)
 
+    def wait_for(self, predicate, timeout=None):
+        endtime = None
+        waittime = timeout
+        result = predicate()
+        while not result:
+            if waittime is not None:
+                if endtime is None:
+                    endtime = _time() + waittime
+                else:
+                    waittime = endtime - _time()
+                    if waittime <= 0:
+                        if __debug__:
+                            self._note("%s.wait_for(%r, %r): Timed out.",
+                                       self, predicate, timeout)
+                        break
+            if __debug__:
+                self._note("%s.wait_for(%r, %r): Waiting with timeout=%s.",
+                           self, predicate, timeout, waittime)
+            self.wait(waittime)
+            result = predicate()
+        else:
+            if __debug__:
+                self._note("%s.wait_for(%r, %r): Success.",
+                           self, predicate, timeout)
+        return result
+
     def notify(self, n=1):
         if not self._is_owned():
             raise RuntimeError("cannot notify on un-acquired lock")
@@ -297,8 +319,11 @@ class _Semaphore(_Verbose):
         self._cond = Condition(Lock())
         self._value = value
 
-    def acquire(self, blocking=True):
+    def acquire(self, blocking=True, timeout=None):
+        if not blocking and timeout is not None:
+            raise ValueError("can't specify timeout for non-blocking acquire")
         rc = False
+        endtime = None
         self._cond.acquire()
         while self._value == 0:
             if not blocking:
@@ -306,7 +331,14 @@ class _Semaphore(_Verbose):
             if __debug__:
                 self._note("%s.acquire(%s): blocked waiting, value=%s",
                            self, blocking, self._value)
-            self._cond.wait()
+            if timeout is not None:
+                if endtime is None:
+                    endtime = _time() + timeout
+                else:
+                    timeout = endtime - _time()
+                    if timeout <= 0:
+                        break
+            self._cond.wait(timeout)
         else:
             self._value = self._value - 1
             if __debug__:
@@ -391,6 +423,177 @@ class _Event(_Verbose):
         finally:
             self._cond.release()
 
+
+# A barrier class.  Inspired in part by the pthread_barrier_* api and
+# the CyclicBarrier class from Java.  See
+# http://sourceware.org/pthreads-win32/manual/pthread_barrier_init.html and
+# http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/
+# CyclicBarrier.html
+# for information.
+# We maintain two main states, 'filling' and 'draining' enabling the barrier
+# to be cyclic.  Threads are not allowed into it until it has fully drained
+# since the previous cycle.  In addition, a 'resetting' state exists which is
+# similar to 'draining' except that threads leave with a BrokenBarrierError,
+# and a 'broken' state in which all threads get the exception.
+class Barrier(_Verbose):
+    """
+    Barrier.  Useful for synchronizing a fixed number of threads
+    at known synchronization points.  Threads block on 'wait()' and are
+    simultaneously awoken once they have all made that call.
+    """
+    def __init__(self, parties, action=None, timeout=None, verbose=None):
+        """
+        Create a barrier, initialised to 'parties' threads.
+        'action' is a callable which, when supplied, will be called
+        by one of the threads after they have all entered the
+        barrier and just prior to releasing them all.
+        If a 'timeout' is provided, it is used as the default for
+        all subsequent 'wait()' calls.
+ """ + _Verbose.__init__(self, verbose) + self._cond = Condition(Lock()) + self._action = action + self._timeout = timeout + self._parties = parties + self._state = 0 #0 filling, 1, draining, -1 resetting, -2 broken + self._count = 0 + + def wait(self, timeout=None): + """ + Wait for the barrier. When the specified number of threads have + started waiting, they are all simultaneously awoken. If an 'action' + was provided for the barrier, one of the threads will have executed + that callback prior to returning. + Returns an individual index number from 0 to 'parties-1'. + """ + if timeout is None: + timeout = self._timeout + with self._cond: + self._enter() # Block while the barrier drains. + index = self._count + self._count += 1 + try: + if index + 1 == self._parties: + # We release the barrier + self._release() + else: + # We wait until someone releases us + self._wait(timeout) + return index + finally: + self._count -= 1 + # Wake up any threads waiting for barrier to drain. + self._exit() + + # Block until the barrier is ready for us, or raise an exception + # if it is broken. + def _enter(self): + while self._state in (-1, 1): + # It is draining or resetting, wait until done + self._cond.wait() + #see if the barrier is in a broken state + if self._state < 0: + raise BrokenBarrierError + assert self._state == 0 + + # Optionally run the 'action' and release the threads waiting + # in the barrier. + def _release(self): + try: + if self._action: + self._action() + # enter draining state + self._state = 1 + self._cond.notify_all() + except: + #an exception during the _action handler. Break and reraise + self._break() + raise + + # Wait in the barrier until we are relased. Raise an exception + # if the barrier is reset or broken. + def _wait(self, timeout): + if not self._cond.wait_for(lambda : self._state != 0, timeout): + #timed out. Break the barrier + self._break() + raise BrokenBarrierError + if self._state < 0: + raise BrokenBarrierError + assert self._state == 1 + + # If we are the last thread to exit the barrier, signal any threads + # waiting for the barrier to drain. + def _exit(self): + if self._count == 0: + if self._state in (-1, 1): + #resetting or draining + self._state = 0 + self._cond.notify_all() + + def reset(self): + """ + Reset the barrier to the initial state. + Any threads currently waiting will get the BrokenBarrier exception + raised. + """ + with self._cond: + if self._count > 0: + if self._state == 0: + #reset the barrier, waking up threads + self._state = -1 + elif self._state == -2: + #was broken, set it to reset state + #which clears when the last thread exits + self._state = -1 + else: + self._state = 0 + self._cond.notify_all() + + def abort(self): + """ + Place the barrier into a 'broken' state. + Useful in case of error. Any currently waiting threads and + threads attempting to 'wait()' will have BrokenBarrierError + raised. + """ + with self._cond: + self._break() + + def _break(self): + # An internal error was detected. The barrier is set to + # a broken state all parties awakened. + self._state = -2 + self._cond.notify_all() + + @property + def parties(self): + """ + Return the number of threads required to trip the barrier. + """ + return self._parties + + @property + def n_waiting(self): + """ + Return the number of threads that are currently waiting at the barrier. + """ + # We don't need synchronization here since this is an ephemeral result + # anyway. It returns the correct value in the steady state. 
+        if self._state == 0:
+            return self._count
+        return 0
+
+    @property
+    def broken(self):
+        """
+        Return True if the barrier is in a broken state
+        """
+        return self._state == -2
+
+#exception raised by the Barrier class
+class BrokenBarrierError(RuntimeError): pass
+
+
 # Helper to generate new thread names
 _counter = 0
 def _newname(template="Thread-%d"):
@@ -881,91 +1084,3 @@ def _after_fork():
     _active.clear()
     _active.update(new_active)
     assert len(_active) == 1
-
-
-# Self-test code
-
-def _test():
-
-    class BoundedQueue(_Verbose):
-
-        def __init__(self, limit):
-            _Verbose.__init__(self)
-            self.mon = RLock()
-            self.rc = Condition(self.mon)
-            self.wc = Condition(self.mon)
-            self.limit = limit
-            self.queue = deque()
-
-        def put(self, item):
-            self.mon.acquire()
-            while len(self.queue) >= self.limit:
-                self._note("put(%s): queue full", item)
-                self.wc.wait()
-            self.queue.append(item)
-            self._note("put(%s): appended, length now %d",
-                       item, len(self.queue))
-            self.rc.notify()
-            self.mon.release()
-
-        def get(self):
-            self.mon.acquire()
-            while not self.queue:
-                self._note("get(): queue empty")
-                self.rc.wait()
-            item = self.queue.popleft()
-            self._note("get(): got %s, %d left", item, len(self.queue))
-            self.wc.notify()
-            self.mon.release()
-            return item
-
-    class ProducerThread(Thread):
-
-        def __init__(self, queue, quota):
-            Thread.__init__(self, name="Producer")
-            self.queue = queue
-            self.quota = quota
-
-        def run(self):
-            from random import random
-            counter = 0
-            while counter < self.quota:
-                counter = counter + 1
-                self.queue.put("%s.%d" % (self.name, counter))
-                _sleep(random() * 0.00001)
-
-
-    class ConsumerThread(Thread):
-
-        def __init__(self, queue, count):
-            Thread.__init__(self, name="Consumer")
-            self.queue = queue
-            self.count = count
-
-        def run(self):
-            while self.count > 0:
-                item = self.queue.get()
-                print(item)
-                self.count = self.count - 1
-
-    NP = 3
-    QL = 4
-    NI = 5
-
-    Q = BoundedQueue(QL)
-    P = []
-    for i in range(NP):
-        t = ProducerThread(Q, NI)
-        t.name = "Producer-%d" % (i+1)
-        P.append(t)
-    C = ConsumerThread(Q, NI*NP)
-    for t in P:
-        t.start()
-        _sleep(0.000001)
-    C.start()
-    for t in P:
-        t.join()
-    C.join()
-
-if __name__ == '__main__':
-    _test()
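
The sketches below are not part of the diff; they illustrate how user code can exercise the APIs this patch introduces. First, the timeout added to lock acquisition: a minimal sketch, assuming the acquire(blocking=True, timeout=...) signature shown above, where a timed acquire returns False instead of blocking forever. The worker() helper is made up for illustration.

    import threading

    lock = threading.Lock()

    def worker():
        # Try for at most half a second; on timeout acquire() returns False
        # rather than raising, so the caller can back off or retry.
        if lock.acquire(timeout=0.5):
            try:
                print("got the lock")
            finally:
                lock.release()
        else:
            print("timed out waiting for the lock")

    threading.Thread(target=worker).start()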
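
Condition.wait_for() packages the usual wait-in-a-loop idiom: it re-evaluates a predicate after each wakeup and honours an overall timeout, returning the predicate's final value. A minimal sketch under the same assumption; the producer()/consumer() helpers and the 5-second timeout are illustrative only.

    import threading

    cond = threading.Condition()
    items = []

    def consumer():
        with cond:
            # False here means the 5 s timeout expired before the predicate held.
            if cond.wait_for(lambda: len(items) > 0, timeout=5):
                print("consumed", items.pop())
            else:
                print("timed out waiting for an item")

    def producer():
        with cond:
            items.append("work")
            cond.notify()

    t = threading.Thread(target=consumer)
    t.start()
    producer()
    t.join()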
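
Semaphore.acquire() gains the same style of timeout; note that the patch rejects combining a timeout with blocking=False. A sketch with hypothetical worker threads contending for two slots:

    import threading

    sem = threading.Semaphore(2)   # at most two concurrent holders

    def worker(n):
        # Give up if no slot becomes free within one second.
        if sem.acquire(timeout=1):
            try:
                print("worker", n, "running")
            finally:
                sem.release()
        else:
            print("worker", n, "gave up")

    threads = [threading.Thread(target=worker, args=(i,)) for i in range(4)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()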
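
Finally, a sketch of the new Barrier: 'parties' threads rendezvous in wait(), the optional 'action' runs once just before they are released together, and a timeout, reset() or abort() surfaces as BrokenBarrierError. The worker() function and the choice of five parties are illustrative, not part of the patch.

    import threading

    # Whichever thread trips the barrier runs the action before all five
    # waiters are released together.
    barrier = threading.Barrier(5, action=lambda: print("all workers ready"))

    def worker():
        try:
            index = barrier.wait(timeout=10)   # returns 0 .. parties-1
        except threading.BrokenBarrierError:
            print("barrier broken or timed out")
            return
        if index == 0:
            print("the thread with index 0 does any post-release bookkeeping")

    threads = [threading.Thread(target=worker) for _ in range(5)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()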