summaryrefslogtreecommitdiffstats
path: root/Lib/threading.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/threading.py')
-rw-r--r--Lib/threading.py349
1 files changed, 232 insertions, 117 deletions
diff --git a/Lib/threading.py b/Lib/threading.py
index c965c6f..fd93f74 100644
--- a/Lib/threading.py
+++ b/Lib/threading.py
@@ -17,24 +17,24 @@ from collections import deque
# with the multiprocessing module, which doesn't provide the old
# Java inspired names.
-
-# Rename some stuff so "from threading import *" is safe
__all__ = ['active_count', 'Condition', 'current_thread', 'enumerate', 'Event',
- 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread',
+ 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread', 'Barrier',
'Timer', 'setprofile', 'settrace', 'local', 'stack_size']
+# Rename some stuff so "from threading import *" is safe
_start_new_thread = _thread.start_new_thread
_allocate_lock = _thread.allocate_lock
_get_ident = _thread.get_ident
ThreadError = _thread.error
+try:
+ _CRLock = _thread.RLock
+except AttributeError:
+ _CRLock = None
+TIMEOUT_MAX = _thread.TIMEOUT_MAX
del _thread
# Debug support (adapted from ihooks.py).
-# All the major classes here derive from _Verbose. We force that to
-# be a new-style class so that all the major classes here are new-style.
-# This helps debugging (type(instance) is more revealing for instances
-# of new-style classes).
_VERBOSE = False
@@ -85,8 +85,12 @@ def settrace(func):
Lock = _allocate_lock
-def RLock(*args, **kwargs):
- return _RLock(*args, **kwargs)
+def RLock(verbose=None, *args, **kwargs):
+ if verbose is None:
+ verbose = _VERBOSE
+ if (__debug__ and verbose) or _CRLock is None:
+ return _PyRLock(verbose, *args, **kwargs)
+ return _CRLock(*args, **kwargs)
class _RLock(_Verbose):
@@ -105,14 +109,14 @@ class _RLock(_Verbose):
return "<%s owner=%r count=%d>" % (
self.__class__.__name__, owner, self._count)
- def acquire(self, blocking=True):
+ def acquire(self, blocking=True, timeout=-1):
me = _get_ident()
if self._owner == me:
self._count = self._count + 1
if __debug__:
self._note("%s.acquire(%s): recursive success", self, blocking)
return 1
- rc = self._block.acquire(blocking)
+ rc = self._block.acquire(blocking, timeout)
if rc:
self._owner = me
self._count = 1
@@ -162,6 +166,8 @@ class _RLock(_Verbose):
def _is_owned(self):
return self._owner == _get_ident()
+_PyRLock = _RLock
+
def Condition(*args, **kwargs):
return _Condition(*args, **kwargs)
@@ -227,25 +233,14 @@ class _Condition(_Verbose):
try: # restore state no matter what (e.g., KeyboardInterrupt)
if timeout is None:
waiter.acquire()
+ gotit = True
if __debug__:
self._note("%s.wait(): got it", self)
else:
- # Balancing act: We can't afford a pure busy loop, so we
- # have to sleep; but if we sleep the whole timeout time,
- # we'll be unresponsive. The scheme here sleeps very
- # little at first, longer as time goes on, but never longer
- # than 20 times per second (or the timeout time remaining).
- endtime = _time() + timeout
- delay = 0.0005 # 500 us -> initial delay of 1 ms
- while True:
- gotit = waiter.acquire(0)
- if gotit:
- break
- remaining = endtime - _time()
- if remaining <= 0:
- break
- delay = min(delay * 2, remaining, .05)
- _sleep(delay)
+ if timeout > 0:
+ gotit = waiter.acquire(True, timeout)
+ else:
+ gotit = waiter.acquire(False)
if not gotit:
if __debug__:
self._note("%s.wait(%s): timed out", self, timeout)
@@ -256,9 +251,36 @@ class _Condition(_Verbose):
else:
if __debug__:
self._note("%s.wait(%s): got it", self, timeout)
+ return gotit
finally:
self._acquire_restore(saved_state)
+ def wait_for(self, predicate, timeout=None):
+ endtime = None
+ waittime = timeout
+ result = predicate()
+ while not result:
+ if waittime is not None:
+ if endtime is None:
+ endtime = _time() + waittime
+ else:
+ waittime = endtime - _time()
+ if waittime <= 0:
+ if __debug__:
+ self._note("%s.wait_for(%r, %r): Timed out.",
+ self, predicate, timeout)
+ break
+ if __debug__:
+ self._note("%s.wait_for(%r, %r): Waiting with timeout=%s.",
+ self, predicate, timeout, waittime)
+ self.wait(waittime)
+ result = predicate()
+ else:
+ if __debug__:
+ self._note("%s.wait_for(%r, %r): Success.",
+ self, predicate, timeout)
+ return result
+
def notify(self, n=1):
if not self._is_owned():
raise RuntimeError("cannot notify on un-acquired lock")
@@ -297,8 +319,11 @@ class _Semaphore(_Verbose):
self._cond = Condition(Lock())
self._value = value
- def acquire(self, blocking=True):
+ def acquire(self, blocking=True, timeout=None):
+ if not blocking and timeout is not None:
+ raise ValueError("can't specify timeout for non-blocking acquire")
rc = False
+ endtime = None
self._cond.acquire()
while self._value == 0:
if not blocking:
@@ -306,7 +331,14 @@ class _Semaphore(_Verbose):
if __debug__:
self._note("%s.acquire(%s): blocked waiting, value=%s",
self, blocking, self._value)
- self._cond.wait()
+ if timeout is not None:
+ if endtime is None:
+ endtime = _time() + timeout
+ else:
+ timeout = endtime - _time()
+ if timeout <= 0:
+ break
+ self._cond.wait(timeout)
else:
self._value = self._value - 1
if __debug__:
@@ -391,6 +423,177 @@ class _Event(_Verbose):
finally:
self._cond.release()
+
+# A barrier class. Inspired in part by the pthread_barrier_* api and
+# the CyclicBarrier class from Java. See
+# http://sourceware.org/pthreads-win32/manual/pthread_barrier_init.html and
+# http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/
+# CyclicBarrier.html
+# for information.
+# We maintain two main states, 'filling' and 'draining' enabling the barrier
+# to be cyclic. Threads are not allowed into it until it has fully drained
+# since the previous cycle. In addition, a 'resetting' state exists which is
+# similar to 'draining' except that threads leave with a BrokenBarrierError,
+# and a 'broken' state in which all threads get get the exception.
+class Barrier(_Verbose):
+ """
+ Barrier. Useful for synchronizing a fixed number of threads
+ at known synchronization points. Threads block on 'wait()' and are
+ simultaneously once they have all made that call.
+ """
+ def __init__(self, parties, action=None, timeout=None, verbose=None):
+ """
+ Create a barrier, initialised to 'parties' threads.
+ 'action' is a callable which, when supplied, will be called
+ by one of the threads after they have all entered the
+ barrier and just prior to releasing them all.
+ If a 'timeout' is provided, it is uses as the default for
+ all subsequent 'wait()' calls.
+ """
+ _Verbose.__init__(self, verbose)
+ self._cond = Condition(Lock())
+ self._action = action
+ self._timeout = timeout
+ self._parties = parties
+ self._state = 0 #0 filling, 1, draining, -1 resetting, -2 broken
+ self._count = 0
+
+ def wait(self, timeout=None):
+ """
+ Wait for the barrier. When the specified number of threads have
+ started waiting, they are all simultaneously awoken. If an 'action'
+ was provided for the barrier, one of the threads will have executed
+ that callback prior to returning.
+ Returns an individual index number from 0 to 'parties-1'.
+ """
+ if timeout is None:
+ timeout = self._timeout
+ with self._cond:
+ self._enter() # Block while the barrier drains.
+ index = self._count
+ self._count += 1
+ try:
+ if index + 1 == self._parties:
+ # We release the barrier
+ self._release()
+ else:
+ # We wait until someone releases us
+ self._wait(timeout)
+ return index
+ finally:
+ self._count -= 1
+ # Wake up any threads waiting for barrier to drain.
+ self._exit()
+
+ # Block until the barrier is ready for us, or raise an exception
+ # if it is broken.
+ def _enter(self):
+ while self._state in (-1, 1):
+ # It is draining or resetting, wait until done
+ self._cond.wait()
+ #see if the barrier is in a broken state
+ if self._state < 0:
+ raise BrokenBarrierError
+ assert self._state == 0
+
+ # Optionally run the 'action' and release the threads waiting
+ # in the barrier.
+ def _release(self):
+ try:
+ if self._action:
+ self._action()
+ # enter draining state
+ self._state = 1
+ self._cond.notify_all()
+ except:
+ #an exception during the _action handler. Break and reraise
+ self._break()
+ raise
+
+ # Wait in the barrier until we are relased. Raise an exception
+ # if the barrier is reset or broken.
+ def _wait(self, timeout):
+ if not self._cond.wait_for(lambda : self._state != 0, timeout):
+ #timed out. Break the barrier
+ self._break()
+ raise BrokenBarrierError
+ if self._state < 0:
+ raise BrokenBarrierError
+ assert self._state == 1
+
+ # If we are the last thread to exit the barrier, signal any threads
+ # waiting for the barrier to drain.
+ def _exit(self):
+ if self._count == 0:
+ if self._state in (-1, 1):
+ #resetting or draining
+ self._state = 0
+ self._cond.notify_all()
+
+ def reset(self):
+ """
+ Reset the barrier to the initial state.
+ Any threads currently waiting will get the BrokenBarrier exception
+ raised.
+ """
+ with self._cond:
+ if self._count > 0:
+ if self._state == 0:
+ #reset the barrier, waking up threads
+ self._state = -1
+ elif self._state == -2:
+ #was broken, set it to reset state
+ #which clears when the last thread exits
+ self._state = -1
+ else:
+ self._state = 0
+ self._cond.notify_all()
+
+ def abort(self):
+ """
+ Place the barrier into a 'broken' state.
+ Useful in case of error. Any currently waiting threads and
+ threads attempting to 'wait()' will have BrokenBarrierError
+ raised.
+ """
+ with self._cond:
+ self._break()
+
+ def _break(self):
+ # An internal error was detected. The barrier is set to
+ # a broken state all parties awakened.
+ self._state = -2
+ self._cond.notify_all()
+
+ @property
+ def parties(self):
+ """
+ Return the number of threads required to trip the barrier.
+ """
+ return self._parties
+
+ @property
+ def n_waiting(self):
+ """
+ Return the number of threads that are currently waiting at the barrier.
+ """
+ # We don't need synchronization here since this is an ephemeral result
+ # anyway. It returns the correct value in the steady state.
+ if self._state == 0:
+ return self._count
+ return 0
+
+ @property
+ def broken(self):
+ """
+ Return True if the barrier is in a broken state
+ """
+ return self._state == -2
+
+#exception raised by the Barrier class
+class BrokenBarrierError(RuntimeError): pass
+
+
# Helper to generate new thread names
_counter = 0
def _newname(template="Thread-%d"):
@@ -881,91 +1084,3 @@ def _after_fork():
_active.clear()
_active.update(new_active)
assert len(_active) == 1
-
-
-# Self-test code
-
-def _test():
-
- class BoundedQueue(_Verbose):
-
- def __init__(self, limit):
- _Verbose.__init__(self)
- self.mon = RLock()
- self.rc = Condition(self.mon)
- self.wc = Condition(self.mon)
- self.limit = limit
- self.queue = deque()
-
- def put(self, item):
- self.mon.acquire()
- while len(self.queue) >= self.limit:
- self._note("put(%s): queue full", item)
- self.wc.wait()
- self.queue.append(item)
- self._note("put(%s): appended, length now %d",
- item, len(self.queue))
- self.rc.notify()
- self.mon.release()
-
- def get(self):
- self.mon.acquire()
- while not self.queue:
- self._note("get(): queue empty")
- self.rc.wait()
- item = self.queue.popleft()
- self._note("get(): got %s, %d left", item, len(self.queue))
- self.wc.notify()
- self.mon.release()
- return item
-
- class ProducerThread(Thread):
-
- def __init__(self, queue, quota):
- Thread.__init__(self, name="Producer")
- self.queue = queue
- self.quota = quota
-
- def run(self):
- from random import random
- counter = 0
- while counter < self.quota:
- counter = counter + 1
- self.queue.put("%s.%d" % (self.name, counter))
- _sleep(random() * 0.00001)
-
-
- class ConsumerThread(Thread):
-
- def __init__(self, queue, count):
- Thread.__init__(self, name="Consumer")
- self.queue = queue
- self.count = count
-
- def run(self):
- while self.count > 0:
- item = self.queue.get()
- print(item)
- self.count = self.count - 1
-
- NP = 3
- QL = 4
- NI = 5
-
- Q = BoundedQueue(QL)
- P = []
- for i in range(NP):
- t = ProducerThread(Q, NI)
- t.name = "Producer-%d" % (i+1)
- P.append(t)
- C = ConsumerThread(Q, NI*NP)
- for t in P:
- t.start()
- _sleep(0.000001)
- C.start()
- for t in P:
- t.join()
- C.join()
-
-if __name__ == '__main__':
- _test()