diff options
Diffstat (limited to 'Lib/test/test_threading.py')
-rw-r--r-- | Lib/test/test_threading.py | 110 |
1 files changed, 73 insertions, 37 deletions
diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py index cbf6202..8b17460 100644 --- a/Lib/test/test_threading.py +++ b/Lib/test/test_threading.py @@ -1,55 +1,91 @@ # Very rudimentary test of threading module -# Create a bunch of threads, let each do some work, wait until all are done - +import test.test_support from test.test_support import verbose import random import threading import time +import unittest -# This takes about n/3 seconds to run (about n/3 clumps of tasks, times -# about 1 second per clump). -numtasks = 10 - -# no more than 3 of the 10 can run at once -sema = threading.BoundedSemaphore(value=3) -mutex = threading.RLock() -running = 0 +# A trivial mutable counter. +class Counter(object): + def __init__(self): + self.value = 0 + def inc(self): + self.value += 1 + def dec(self): + self.value -= 1 + def get(self): + return self.value class TestThread(threading.Thread): + def __init__(self, name, testcase, sema, mutex, nrunning): + threading.Thread.__init__(self, name=name) + self.testcase = testcase + self.sema = sema + self.mutex = mutex + self.nrunning = nrunning + def run(self): - global running delay = random.random() * 2 if verbose: print 'task', self.getName(), 'will run for', delay, 'sec' - sema.acquire() - mutex.acquire() - running = running + 1 + + self.sema.acquire() + + self.mutex.acquire() + self.nrunning.inc() if verbose: - print running, 'tasks are running' - mutex.release() + print self.nrunning.get(), 'tasks are running' + self.testcase.assert_(self.nrunning.get() <= 3) + self.mutex.release() + time.sleep(delay) if verbose: print 'task', self.getName(), 'done' - mutex.acquire() - running = running - 1 + + self.mutex.acquire() + self.nrunning.dec() + self.testcase.assert_(self.nrunning.get() >= 0) if verbose: - print self.getName(), 'is finished.', running, 'tasks are running' - mutex.release() - sema.release() - -threads = [] -def starttasks(): - for i in range(numtasks): - t = TestThread(name="<thread %d>"%i) - threads.append(t) - t.start() - -starttasks() - -if verbose: - print 'waiting for all tasks to complete' -for t in threads: - t.join() -if verbose: - print 'all tasks done' + print self.getName(), 'is finished.', self.nrunning.get(), \ + 'tasks are running' + self.mutex.release() + + self.sema.release() + +class ThreadTests(unittest.TestCase): + + # Create a bunch of threads, let each do some work, wait until all are + # done. + def test_various_ops(self): + # This takes about n/3 seconds to run (about n/3 clumps of tasks, + # times about 1 second per clump). + NUMTASKS = 10 + + # no more than 3 of the 10 can run at once + sema = threading.BoundedSemaphore(value=3) + mutex = threading.RLock() + numrunning = Counter() + + threads = [] + + for i in range(NUMTASKS): + t = TestThread("<thread %d>"%i, self, sema, mutex, numrunning) + threads.append(t) + t.start() + + if verbose: + print 'waiting for all tasks to complete' + for t in threads: + t.join() + if verbose: + print 'all tasks done' + self.assertEqual(numrunning.get(), 0) + + +def test_main(): + test.test_support.run_unittest(ThreadTests) + +if __name__ == "__main__": + test_main() |