diff options
Diffstat (limited to 'Lib/test/test_threading.py')
| -rw-r--r-- | Lib/test/test_threading.py | 277 |
1 files changed, 242 insertions, 35 deletions
diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py index cc9da05..054df7b 100644 --- a/Lib/test/test_threading.py +++ b/Lib/test/test_threading.py @@ -3,11 +3,15 @@ import test.test_support from test.test_support import verbose import random +import re import sys import threading import thread import time import unittest +import weakref + +from test import lock_tests # A trivial mutable counter. class Counter(object): @@ -29,32 +33,28 @@ class TestThread(threading.Thread): self.nrunning = nrunning def run(self): - delay = random.random() * 2 + delay = random.random() / 10000.0 if verbose: - print 'task', self.getName(), 'will run for', delay, 'sec' - - self.sema.acquire() + print 'task %s will run for %.1f usec' % ( + self.name, delay * 1e6) - self.mutex.acquire() - self.nrunning.inc() - if verbose: - print self.nrunning.get(), 'tasks are running' - self.testcase.assert_(self.nrunning.get() <= 3) - self.mutex.release() - - time.sleep(delay) - if verbose: - print 'task', self.getName(), 'done' + with self.sema: + with self.mutex: + self.nrunning.inc() + if verbose: + print self.nrunning.get(), 'tasks are running' + self.testcase.assert_(self.nrunning.get() <= 3) - self.mutex.acquire() - self.nrunning.dec() - self.testcase.assert_(self.nrunning.get() >= 0) - if verbose: - print self.getName(), 'is finished.', self.nrunning.get(), \ - 'tasks are running' - self.mutex.release() + time.sleep(delay) + if verbose: + print 'task', self.name, 'done' - self.sema.release() + with self.mutex: + self.nrunning.dec() + self.testcase.assert_(self.nrunning.get() >= 0) + if verbose: + print '%s is finished. %d tasks are running' % ( + self.name, self.nrunning.get()) class ThreadTests(unittest.TestCase): @@ -75,17 +75,36 @@ class ThreadTests(unittest.TestCase): for i in range(NUMTASKS): t = TestThread("<thread %d>"%i, self, sema, mutex, numrunning) threads.append(t) + self.failUnlessEqual(t.ident, None) + self.assert_(re.match('<TestThread\(.*, initial\)>', repr(t))) t.start() if verbose: print 'waiting for all tasks to complete' for t in threads: t.join(NUMTASKS) - self.assert_(not t.isAlive()) + self.assert_(not t.is_alive()) + self.failIfEqual(t.ident, 0) + self.assertFalse(t.ident is None) + self.assert_(re.match('<TestThread\(.*, \w+ -?\d+\)>', repr(t))) if verbose: print 'all tasks done' self.assertEqual(numrunning.get(), 0) + def test_ident_of_no_threading_threads(self): + # The ident still must work for the main thread and dummy threads. + self.assertFalse(threading.currentThread().ident is None) + def f(): + ident.append(threading.currentThread().ident) + done.set() + done = threading.Event() + ident = [] + thread.start_new_thread(f, ()) + done.wait() + self.assertFalse(ident[0] is None) + # Kill the "immortal" _DummyThread + del threading._active[ident[0]] + # run with a small(ish) thread stack size (256kB) def test_various_ops_small_stack(self): if verbose: @@ -115,11 +134,9 @@ class ThreadTests(unittest.TestCase): def test_foreign_thread(self): # Check that a "foreign" thread can use the threading module. def f(mutex): - # Acquiring an RLock forces an entry for the foreign + # Calling current_thread() forces an entry for the foreign # thread to get made in the threading._active map. - r = threading.RLock() - r.acquire() - r.release() + threading.current_thread() mutex.release() mutex = threading.Lock() @@ -170,7 +187,7 @@ class ThreadTests(unittest.TestCase): worker_saw_exception.set() t = Worker() - t.setDaemon(True) # so if this fails, we don't hang Python at shutdown + t.daemon = True # so if this fails, we don't hang Python at shutdown t.start() if verbose: print " started worker thread" @@ -202,14 +219,123 @@ class ThreadTests(unittest.TestCase): t.join() # else the thread is still running, and we have no way to kill it + def test_limbo_cleanup(self): + # Issue 7481: Failure to start thread should cleanup the limbo map. + def fail_new_thread(*args): + raise thread.error() + _start_new_thread = threading._start_new_thread + threading._start_new_thread = fail_new_thread + try: + t = threading.Thread(target=lambda: None) + self.assertRaises(thread.error, t.start) + self.assertFalse( + t in threading._limbo, + "Failed to cleanup _limbo map on failure of Thread.start().") + finally: + threading._start_new_thread = _start_new_thread + + def test_finalize_runnning_thread(self): + # Issue 1402: the PyGILState_Ensure / _Release functions may be called + # very late on python exit: on deallocation of a running thread for + # example. + try: + import ctypes + except ImportError: + if verbose: + print("test_finalize_with_runnning_thread can't import ctypes") + return # can't do anything + + import subprocess + rc = subprocess.call([sys.executable, "-c", """if 1: + import ctypes, sys, time, thread + + # This lock is used as a simple event variable. + ready = thread.allocate_lock() + ready.acquire() + + # Module globals are cleared before __del__ is run + # So we save the functions in class dict + class C: + ensure = ctypes.pythonapi.PyGILState_Ensure + release = ctypes.pythonapi.PyGILState_Release + def __del__(self): + state = self.ensure() + self.release(state) + + def waitingThread(): + x = C() + ready.release() + time.sleep(100) + + thread.start_new_thread(waitingThread, ()) + ready.acquire() # Be sure the other thread is waiting. + sys.exit(42) + """]) + self.assertEqual(rc, 42) + + def test_finalize_with_trace(self): + # Issue1733757 + # Avoid a deadlock when sys.settrace steps into threading._shutdown + import subprocess + rc = subprocess.call([sys.executable, "-c", """if 1: + import sys, threading + + # A deadlock-killer, to prevent the + # testsuite to hang forever + def killer(): + import os, time + time.sleep(2) + print 'program blocked; aborting' + os._exit(2) + t = threading.Thread(target=killer) + t.daemon = True + t.start() + + # This is the trace function + def func(frame, event, arg): + threading.current_thread() + return func + + sys.settrace(func) + """]) + self.failIf(rc == 2, "interpreted was blocked") + self.failUnless(rc == 0, "Unexpected error") + + def test_join_nondaemon_on_shutdown(self): + # Issue 1722344 + # Raising SystemExit skipped threading._shutdown + import subprocess + p = subprocess.Popen([sys.executable, "-c", """if 1: + import threading + from time import sleep + + def child(): + sleep(1) + # As a non-daemon thread we SHOULD wake up and nothing + # should be torn down yet + print "Woke up, sleep function is:", sleep + + threading.Thread(target=child).start() + raise SystemExit + """], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + stdout, stderr = p.communicate() + self.assertEqual(stdout.strip(), + "Woke up, sleep function is: <built-in function sleep>") + stderr = re.sub(r"^\[\d+ refs\]", "", stderr, re.MULTILINE).strip() + self.assertEqual(stderr, "") + def test_enumerate_after_join(self): # Try hard to trigger #1703448: a thread is still returned in # threading.enumerate() after it has been join()ed. enum = threading.enumerate old_interval = sys.getcheckinterval() - sys.setcheckinterval(1) try: - for i in xrange(1, 1000): + for i in xrange(1, 100): + # Try a couple times at each thread-switching interval + # to get more interleavings. + sys.setcheckinterval(i // 5) t = threading.Thread(target=lambda: None) t.start() t.join() @@ -219,6 +345,37 @@ class ThreadTests(unittest.TestCase): finally: sys.setcheckinterval(old_interval) + def test_no_refcycle_through_target(self): + class RunSelfFunction(object): + def __init__(self, should_raise): + # The links in this refcycle from Thread back to self + # should be cleaned up when the thread completes. + self.should_raise = should_raise + self.thread = threading.Thread(target=self._run, + args=(self,), + kwargs={'yet_another':self}) + self.thread.start() + + def _run(self, other_ref, yet_another): + if self.should_raise: + raise SystemExit + + cyclic_object = RunSelfFunction(should_raise=False) + weak_cyclic_object = weakref.ref(cyclic_object) + cyclic_object.thread.join() + del cyclic_object + self.assertEquals(None, weak_cyclic_object(), + msg=('%d references still around' % + sys.getrefcount(weak_cyclic_object()))) + + raising_cyclic_object = RunSelfFunction(should_raise=True) + weak_raising_cyclic_object = weakref.ref(raising_cyclic_object) + raising_cyclic_object.thread.join() + del raising_cyclic_object + self.assertEquals(None, weak_raising_cyclic_object(), + msg=('%d references still around' % + sys.getrefcount(weak_raising_cyclic_object()))) + class ThreadJoinOnShutdown(unittest.TestCase): @@ -245,7 +402,7 @@ class ThreadJoinOnShutdown(unittest.TestCase): script = """if 1: import os t = threading.Thread(target=joiningfunc, - args=(threading.currentThread(),)) + args=(threading.current_thread(),)) t.start() time.sleep(0.1) print 'end of main' @@ -265,7 +422,7 @@ class ThreadJoinOnShutdown(unittest.TestCase): sys.exit(0) t = threading.Thread(target=joiningfunc, - args=(threading.currentThread(),)) + args=(threading.current_thread(),)) t.start() print 'end of main' """ @@ -284,7 +441,7 @@ class ThreadJoinOnShutdown(unittest.TestCase): ' due to known OS bugs on'), sys.platform return script = """if 1: - main_thread = threading.currentThread() + main_thread = threading.current_thread() def worker(): childpid = os.fork() if childpid != 0: @@ -303,9 +460,59 @@ class ThreadJoinOnShutdown(unittest.TestCase): self._run_and_join(script) +class ThreadingExceptionTests(unittest.TestCase): + # A RuntimeError should be raised if Thread.start() is called + # multiple times. + def test_start_thread_again(self): + thread = threading.Thread() + thread.start() + self.assertRaises(RuntimeError, thread.start) + + def test_joining_current_thread(self): + current_thread = threading.current_thread() + self.assertRaises(RuntimeError, current_thread.join); + + def test_joining_inactive_thread(self): + thread = threading.Thread() + self.assertRaises(RuntimeError, thread.join) + + def test_daemonize_active_thread(self): + thread = threading.Thread() + thread.start() + self.assertRaises(RuntimeError, setattr, thread, "daemon", True) + + +class LockTests(lock_tests.LockTests): + locktype = staticmethod(threading.Lock) + +class RLockTests(lock_tests.RLockTests): + locktype = staticmethod(threading.RLock) + +class EventTests(lock_tests.EventTests): + eventtype = staticmethod(threading.Event) + +class ConditionAsRLockTests(lock_tests.RLockTests): + # An Condition uses an RLock by default and exports its API. + locktype = staticmethod(threading.Condition) + +class ConditionTests(lock_tests.ConditionTests): + condtype = staticmethod(threading.Condition) + +class SemaphoreTests(lock_tests.SemaphoreTests): + semtype = staticmethod(threading.Semaphore) + +class BoundedSemaphoreTests(lock_tests.BoundedSemaphoreTests): + semtype = staticmethod(threading.BoundedSemaphore) + + def test_main(): - test.test_support.run_unittest(ThreadTests, - ThreadJoinOnShutdown) + test.test_support.run_unittest(LockTests, RLockTests, EventTests, + ConditionAsRLockTests, ConditionTests, + SemaphoreTests, BoundedSemaphoreTests, + ThreadTests, + ThreadJoinOnShutdown, + ThreadingExceptionTests, + ) if __name__ == "__main__": test_main() |
