diff options
Diffstat (limited to 'Lib/threading.py')
| -rw-r--r-- | Lib/threading.py | 208 |
1 files changed, 144 insertions, 64 deletions
diff --git a/Lib/threading.py b/Lib/threading.py index f2c6b3d..ee70cd8 100644 --- a/Lib/threading.py +++ b/Lib/threading.py @@ -8,12 +8,27 @@ except ImportError: del _sys.modules[__name__] raise +import warnings + +from functools import wraps from time import time as _time, sleep as _sleep from traceback import format_exc as _format_exc from collections import deque +# Note regarding PEP 8 compliant aliases +# This threading model was originally inspired by Java, and inherited +# the convention of camelCase function and method names from that +# language. While those names are not in any imminent danger of being +# deprecated, starting with Python 2.6, the module now provides a +# PEP 8 compliant alias for any such method name. +# Using the new PEP 8 compliant names also facilitates substitution +# with the multiprocessing module, which doesn't provide the old +# Java inspired names. + + # Rename some stuff so "from threading import *" is safe -__all__ = ['activeCount', 'Condition', 'currentThread', 'enumerate', 'Event', +__all__ = ['activeCount', 'active_count', 'Condition', 'currentThread', + 'current_thread', 'enumerate', 'Event', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread', 'Timer', 'setprofile', 'settrace', 'local', 'stack_size'] @@ -24,6 +39,11 @@ ThreadError = thread.error del thread +# sys.exc_clear is used to work around the fact that except blocks +# don't fully clear the exception until 3.0. +warnings.filterwarnings('ignore', category=DeprecationWarning, + module='threading', message='sys.exc_clear') + # Debug support (adapted from ihooks.py). # All the major classes here derive from _Verbose. We force that to # be a new-style class so that all the major classes here are new-style. @@ -45,7 +65,7 @@ if __debug__: if self.__verbose: format = format % args format = "%s: %s\n" % ( - currentThread().getName(), format) + current_thread().name, format) _sys.stderr.write(format) else: @@ -86,14 +106,16 @@ class _RLock(_Verbose): def __repr__(self): owner = self.__owner - return "<%s(%s, %d)>" % ( - self.__class__.__name__, - owner and owner.getName(), - self.__count) + try: + owner = _active[owner].name + except KeyError: + pass + return "<%s owner=%r count=%d>" % ( + self.__class__.__name__, owner, self.__count) def acquire(self, blocking=1): - me = currentThread() - if self.__owner is me: + me = _get_ident() + if self.__owner == me: self.__count = self.__count + 1 if __debug__: self._note("%s.acquire(%s): recursive success", self, blocking) @@ -112,8 +134,8 @@ class _RLock(_Verbose): __enter__ = acquire def release(self): - if self.__owner is not currentThread(): - raise RuntimeError("cannot release un-aquired lock") + if self.__owner != _get_ident(): + raise RuntimeError("cannot release un-acquired lock") self.__count = count = self.__count - 1 if not count: self.__owner = None @@ -129,7 +151,8 @@ class _RLock(_Verbose): # Internal methods used by condition variables - def _acquire_restore(self, (count, owner)): + def _acquire_restore(self, count_owner): + count, owner = count_owner self.__block.acquire() self.__count = count self.__owner = owner @@ -147,7 +170,7 @@ class _RLock(_Verbose): return (count, owner) def _is_owned(self): - return self.__owner is currentThread() + return self.__owner == _get_ident() def Condition(*args, **kwargs): @@ -196,7 +219,7 @@ class _Condition(_Verbose): self.__lock.acquire() # Ignore saved state def _is_owned(self): - # Return True if lock is owned by currentThread. + # Return True if lock is owned by current_thread. # This method is called only if __lock doesn't have _is_owned(). if self.__lock.acquire(0): self.__lock.release() @@ -206,7 +229,7 @@ class _Condition(_Verbose): def wait(self, timeout=None): if not self._is_owned(): - raise RuntimeError("cannot wait on un-aquired lock") + raise RuntimeError("cannot wait on un-acquired lock") waiter = _allocate_lock() waiter.acquire() self.__waiters.append(waiter) @@ -248,7 +271,7 @@ class _Condition(_Verbose): def notify(self, n=1): if not self._is_owned(): - raise RuntimeError("cannot notify on un-aquired lock") + raise RuntimeError("cannot notify on un-acquired lock") __waiters = self.__waiters waiters = __waiters[:n] if not waiters: @@ -267,6 +290,8 @@ class _Condition(_Verbose): def notifyAll(self): self.notify(len(self.__waiters)) + notify_all = notifyAll + def Semaphore(*args, **kwargs): return _Semaphore(*args, **kwargs) @@ -346,11 +371,13 @@ class _Event(_Verbose): def isSet(self): return self.__flag + is_set = isSet + def set(self): self.__cond.acquire() try: self.__flag = True - self.__cond.notifyAll() + self.__cond.notify_all() finally: self.__cond.release() @@ -392,6 +419,9 @@ class Thread(_Verbose): # shutdown and thus raises an exception about trying to perform some # operation on/with a NoneType __exc_info = _sys.exc_info + # Keep sys.exc_clear too to clear the exception just before + # allowing .join() to return. + __exc_clear = _sys.exc_clear def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, verbose=None): @@ -404,7 +434,8 @@ class Thread(_Verbose): self.__args = args self.__kwargs = kwargs self.__daemonic = self._set_daemon() - self.__started = False + self.__ident = None + self.__started = Event() self.__stopped = False self.__block = Condition(Lock()) self.__initialized = True @@ -414,36 +445,47 @@ class Thread(_Verbose): def _set_daemon(self): # Overridden in _MainThread and _DummyThread - return currentThread().isDaemon() + return current_thread().daemon def __repr__(self): assert self.__initialized, "Thread.__init__() was not called" status = "initial" - if self.__started: + if self.__started.is_set(): status = "started" if self.__stopped: status = "stopped" if self.__daemonic: - status = status + " daemon" + status += " daemon" + if self.__ident is not None: + status += " %s" % self.__ident return "<%s(%s, %s)>" % (self.__class__.__name__, self.__name, status) def start(self): if not self.__initialized: raise RuntimeError("thread.__init__() not called") - if self.__started: + if self.__started.is_set(): raise RuntimeError("thread already started") if __debug__: self._note("%s.start(): starting thread", self) _active_limbo_lock.acquire() _limbo[self] = self _active_limbo_lock.release() - _start_new_thread(self.__bootstrap, ()) - self.__started = True - _sleep(0.000001) # 1 usec, to let the thread run (Solaris hack) + try: + _start_new_thread(self.__bootstrap, ()) + except Exception: + with _active_limbo_lock: + del _limbo[self] + raise + self.__started.wait() def run(self): - if self.__target: - self.__target(*self.__args, **self.__kwargs) + try: + if self.__target: + self.__target(*self.__args, **self.__kwargs) + finally: + # Avoid a refcycle if the thread is running a function with + # an argument that has a member that points to the thread. + del self.__target, self.__args, self.__kwargs def __bootstrap(self): # Wrapper around the real bootstrap code that ignores @@ -465,11 +507,15 @@ class Thread(_Verbose): return raise + def _set_ident(self): + self.__ident = _get_ident() + def __bootstrap_inner(self): try: - self.__started = True + self._set_ident() + self.__started.set() _active_limbo_lock.acquire() - _active[_get_ident()] = self + _active[self.__ident] = self del _limbo[self] _active_limbo_lock.release() if __debug__: @@ -496,7 +542,7 @@ class Thread(_Verbose): # self. if _sys: _sys.stderr.write("Exception in thread %s:\n%s\n" % - (self.getName(), _format_exc())) + (self.name, _format_exc())) else: # Do the best job possible w/o a huge amt. of code to # approximate a traceback (code ideas from @@ -504,7 +550,7 @@ class Thread(_Verbose): exc_type, exc_value, exc_tb = self.__exc_info() try: print>>self.__stderr, ( - "Exception in thread " + self.getName() + + "Exception in thread " + self.name + " (most likely raised during interpreter shutdown):") print>>self.__stderr, ( "Traceback (most recent call last):") @@ -523,9 +569,14 @@ class Thread(_Verbose): else: if __debug__: self._note("%s.__bootstrap(): normal return", self) + finally: + # Prevent a race in + # test_threading.test_no_refcycle_through_target when + # the exception keeps the target alive past when we + # assert that it's dead. + self.__exc_clear() finally: - _active_limbo_lock.acquire() - try: + with _active_limbo_lock: self.__stop() try: # We don't call self.__delete() because it also @@ -533,13 +584,11 @@ class Thread(_Verbose): del _active[_get_ident()] except: pass - finally: - _active_limbo_lock.release() def __stop(self): self.__block.acquire() self.__stopped = True - self.__block.notifyAll() + self.__block.notify_all() self.__block.release() def __delete(self): @@ -566,22 +615,23 @@ class Thread(_Verbose): # since it isn't if dummy_threading is *not* being used then don't # hide the exception. - _active_limbo_lock.acquire() try: - try: + with _active_limbo_lock: del _active[_get_ident()] - except KeyError: - if 'dummy_threading' not in _sys.modules: - raise - finally: - _active_limbo_lock.release() + # There must not be any python code between the previous line + # and after the lock is released. Otherwise a tracing function + # could try to acquire the lock again in the same thread, (in + # current_thread()), and would block. + except KeyError: + if 'dummy_threading' not in _sys.modules: + raise def join(self, timeout=None): if not self.__initialized: raise RuntimeError("Thread.__init__() not called") - if not self.__started: + if not self.__started.is_set(): raise RuntimeError("cannot join thread before it is started") - if self is currentThread(): + if self is current_thread(): raise RuntimeError("cannot join current thread") if __debug__: @@ -609,29 +659,52 @@ class Thread(_Verbose): finally: self.__block.release() - def getName(self): + @property + def name(self): assert self.__initialized, "Thread.__init__() not called" return self.__name - def setName(self, name): + @name.setter + def name(self, name): assert self.__initialized, "Thread.__init__() not called" self.__name = str(name) + @property + def ident(self): + assert self.__initialized, "Thread.__init__() not called" + return self.__ident + def isAlive(self): assert self.__initialized, "Thread.__init__() not called" - return self.__started and not self.__stopped + return self.__started.is_set() and not self.__stopped - def isDaemon(self): + is_alive = isAlive + + @property + def daemon(self): assert self.__initialized, "Thread.__init__() not called" return self.__daemonic - def setDaemon(self, daemonic): + @daemon.setter + def daemon(self, daemonic): if not self.__initialized: raise RuntimeError("Thread.__init__() not called") - if self.__started: + if self.__started.is_set(): raise RuntimeError("cannot set daemon status of active thread"); self.__daemonic = daemonic + def isDaemon(self): + return self.daemon + + def setDaemon(self, daemonic): + self.daemon = daemonic + + def getName(self): + return self.name + + def setName(self, name): + self.name = name + # The timer class was contributed by Itamar Shtull-Trauring def Timer(*args, **kwargs): @@ -659,7 +732,7 @@ class _Timer(Thread): def run(self): self.finished.wait(self.interval) - if not self.finished.isSet(): + if not self.finished.is_set(): self.function(*self.args, **self.kwargs) self.finished.set() @@ -670,7 +743,8 @@ class _MainThread(Thread): def __init__(self): Thread.__init__(self, name="MainThread") - self._Thread__started = True + self._Thread__started.set() + self._set_ident() _active_limbo_lock.acquire() _active[_get_ident()] = self _active_limbo_lock.release() @@ -693,16 +767,16 @@ class _MainThread(Thread): def _pickSomeNonDaemonThread(): for t in enumerate(): - if not t.isDaemon() and t.isAlive(): + if not t.daemon and t.is_alive(): return t return None # Dummy thread class to represent threads not started here. # These aren't garbage collected when they die, nor can they be waited for. -# If they invoke anything in threading.py that calls currentThread(), they +# If they invoke anything in threading.py that calls current_thread(), they # leave an entry in the _active dict forever after. -# Their purpose is to return *something* from currentThread(). +# Their purpose is to return *something* from current_thread(). # They are marked as daemon threads so we won't wait for them # when we exit (conform previous semantics). @@ -716,7 +790,8 @@ class _DummyThread(Thread): # instance is immortal, that's bad, so release this resource. del self._Thread__block - self._Thread__started = True + self._Thread__started.set() + self._set_ident() _active_limbo_lock.acquire() _active[_get_ident()] = self _active_limbo_lock.release() @@ -734,15 +809,23 @@ def currentThread(): try: return _active[_get_ident()] except KeyError: - ##print "currentThread(): no current thread for", _get_ident() + ##print "current_thread(): no current thread for", _get_ident() return _DummyThread() +current_thread = currentThread + def activeCount(): _active_limbo_lock.acquire() count = len(_active) + len(_limbo) _active_limbo_lock.release() return count +active_count = activeCount + +def _enumerate(): + # Same as enumerate(), but without the lock. Internal use only. + return _active.values() + _limbo.values() + def enumerate(): _active_limbo_lock.acquire() active = _active.values() + _limbo.values() @@ -778,9 +861,8 @@ def _after_fork(): # fork() only copied the current thread; clear references to others. new_active = {} - current = currentThread() - _active_limbo_lock.acquire() - try: + current = current_thread() + with _active_limbo_lock: for thread in _active.itervalues(): if thread is current: # There is only one active thread. We reset the ident to @@ -799,8 +881,6 @@ def _after_fork(): _active.clear() _active.update(new_active) assert len(_active) == 1 - finally: - _active_limbo_lock.release() # Self-test code @@ -851,7 +931,7 @@ def _test(): counter = 0 while counter < self.quota: counter = counter + 1 - self.queue.put("%s.%d" % (self.getName(), counter)) + self.queue.put("%s.%d" % (self.name, counter)) _sleep(random() * 0.00001) @@ -876,7 +956,7 @@ def _test(): P = [] for i in range(NP): t = ProducerThread(Q, NI) - t.setName("Producer-%d" % (i+1)) + t.name = ("Producer-%d" % (i+1)) P.append(t) C = ConsumerThread(Q, NI*NP) for t in P: |
