diff options
Diffstat (limited to 'Lib/test/test_thread.py')
-rw-r--r-- | Lib/test/test_thread.py | 295 |
1 files changed, 148 insertions, 147 deletions
diff --git a/Lib/test/test_thread.py b/Lib/test/test_thread.py index 577d4cb..c89c5a1 100644 --- a/Lib/test/test_thread.py +++ b/Lib/test/test_thread.py @@ -1,160 +1,161 @@ -# Very rudimentary test of thread module - -# Create a bunch of threads, let each do some work, wait until all are done - -from test.test_support import verbose +import os +import unittest import random +from test import test_support import thread import time -mutex = thread.allocate_lock() -rmutex = thread.allocate_lock() # for calls to random -running = 0 -done = thread.allocate_lock() -done.acquire() - -numtasks = 10 - -def task(ident): - global running - rmutex.acquire() - delay = random.random() * numtasks - rmutex.release() - if verbose: - print('task', ident, 'will run for', round(delay, 1), 'sec') - time.sleep(delay) - if verbose: - print('task', ident, 'done') - mutex.acquire() - running = running - 1 - if running == 0: - done.release() - mutex.release() - -next_ident = 0 -def newtask(): - global next_ident, running - mutex.acquire() - next_ident = next_ident + 1 - if verbose: - print('creating task', next_ident) - thread.start_new_thread(task, (next_ident,)) - running = running + 1 - mutex.release() - -for i in range(numtasks): - newtask() - -print('waiting for all tasks to complete') -done.acquire() -print('all tasks done') - -class barrier: - def __init__(self, n): - self.n = n + +NUMTASKS = 10 +NUMTRIPS = 3 + +def verbose_print(arg): + """Helper function for printing out debugging output.""" + if test_support.verbose: + print(arg) + +class BasicThreadTest(unittest.TestCase): + + def setUp(self): + self.done_mutex = thread.allocate_lock() + self.done_mutex.acquire() + self.running_mutex = thread.allocate_lock() + self.random_mutex = thread.allocate_lock() + self.running = 0 + self.next_ident = 0 + + +class ThreadRunningTests(BasicThreadTest): + + def newtask(self): + with self.running_mutex: + self.next_ident += 1 + verbose_print("creating task %s" % self.next_ident) + thread.start_new_thread(self.task, (self.next_ident,)) + self.running += 1 + + def task(self, ident): + with self.random_mutex: + delay = random.random() * NUMTASKS + verbose_print("task %s will run for %s" % (ident, round(delay, 1))) + time.sleep(delay) + verbose_print("task %s done" % ident) + with self.running_mutex: + self.running -= 1 + if self.running == 0: + self.done_mutex.release() + + def test_starting_threads(self): + # Basic test for thread creation. + for i in range(NUMTASKS): + self.newtask() + verbose_print("waiting for tasks to complete...") + self.done_mutex.acquire() + verbose_print("all tasks done") + + def test_stack_size(self): + # Various stack size tests. + self.assertEquals(thread.stack_size(), 0, "intial stack size is not 0") + + thread.stack_size(0) + self.assertEquals(thread.stack_size(), 0, "stack_size not reset to default") + + if os.name not in ("nt", "os2", "posix"): + return + + tss_supported = True + try: + thread.stack_size(4096) + except ValueError: + verbose_print("caught expected ValueError setting " + "stack_size(4096)") + except thread.error: + tss_supported = False + verbose_print("platform does not support changing thread stack " + "size") + + if tss_supported: + fail_msg = "stack_size(%d) failed - should succeed" + for tss in (262144, 0x100000, 0): + thread.stack_size(tss) + self.assertEquals(thread.stack_size(), tss, fail_msg % tss) + verbose_print("successfully set stack_size(%d)" % tss) + + for tss in (262144, 0x100000): + verbose_print("trying stack_size = (%d)" % tss) + self.next_ident = 0 + for i in range(NUMTASKS): + self.newtask() + + verbose_print("waiting for all tasks to complete") + self.done_mutex.acquire() + verbose_print("all tasks done") + + thread.stack_size(0) + + +class Barrier: + def __init__(self, num_threads): + self.num_threads = num_threads self.waiting = 0 - self.checkin = thread.allocate_lock() - self.checkout = thread.allocate_lock() - self.checkout.acquire() + self.checkin_mutex = thread.allocate_lock() + self.checkout_mutex = thread.allocate_lock() + self.checkout_mutex.acquire() def enter(self): - checkin, checkout = self.checkin, self.checkout - - checkin.acquire() + self.checkin_mutex.acquire() self.waiting = self.waiting + 1 - if self.waiting == self.n: - self.waiting = self.n - 1 - checkout.release() + if self.waiting == self.num_threads: + self.waiting = self.num_threads - 1 + self.checkout_mutex.release() return - checkin.release() + self.checkin_mutex.release() - checkout.acquire() + self.checkout_mutex.acquire() self.waiting = self.waiting - 1 if self.waiting == 0: - checkin.release() + self.checkin_mutex.release() return - checkout.release() - -numtrips = 3 -def task2(ident): - global running - for i in range(numtrips): - if ident == 0: - # give it a good chance to enter the next - # barrier before the others are all out - # of the current one - delay = 0.001 - else: - rmutex.acquire() - delay = random.random() * numtasks - rmutex.release() - if verbose: - print('task', ident, 'will run for', round(delay, 1), 'sec') - time.sleep(delay) - if verbose: - print('task', ident, 'entering barrier', i) - bar.enter() - if verbose: - print('task', ident, 'leaving barrier', i) - mutex.acquire() - running -= 1 - # Must release mutex before releasing done, else the main thread can - # exit and set mutex to None as part of global teardown; then - # mutex.release() raises AttributeError. - finished = running == 0 - mutex.release() - if finished: - done.release() - -print('\n*** Barrier Test ***') -if done.acquire(0): - raise ValueError("'done' should have remained acquired") -bar = barrier(numtasks) -running = numtasks -for i in range(numtasks): - thread.start_new_thread(task2, (i,)) -done.acquire() -print('all tasks done') - -# not all platforms support changing thread stack size -print('\n*** Changing thread stack size ***') -if thread.stack_size() != 0: - raise ValueError("initial stack_size not 0") - -thread.stack_size(0) -if thread.stack_size() != 0: - raise ValueError("stack_size not reset to default") - -from os import name as os_name -if os_name in ("nt", "os2", "posix"): - - tss_supported = 1 - try: - thread.stack_size(4096) - except ValueError: - print('caught expected ValueError setting stack_size(4096)') - except thread.error: - tss_supported = 0 - print('platform does not support changing thread stack size') - - if tss_supported: - failed = lambda s, e: s != e - fail_msg = "stack_size(%d) failed - should succeed" - for tss in (262144, 0x100000, 0): - thread.stack_size(tss) - if failed(thread.stack_size(), tss): - raise ValueError(fail_msg % tss) - print('successfully set stack_size(%d)' % tss) - - for tss in (262144, 0x100000): - print('trying stack_size = %d' % tss) - next_ident = 0 - for i in range(numtasks): - newtask() - - print('waiting for all tasks to complete') - done.acquire() - print('all tasks done') - - # reset stack size to default - thread.stack_size(0) + self.checkout_mutex.release() + + +class BarrierTest(BasicThreadTest): + + def test_barrier(self): + self.bar = Barrier(NUMTASKS) + self.running = NUMTASKS + for i in range(NUMTASKS): + thread.start_new_thread(self.task2, (i,)) + verbose_print("waiting for tasks to end") + self.done_mutex.acquire() + verbose_print("tasks done") + + def task2(self, ident): + for i in range(NUMTRIPS): + if ident == 0: + # give it a good chance to enter the next + # barrier before the others are all out + # of the current one + delay = 0.001 + else: + with self.random_mutex: + delay = random.random() * NUMTASKS + verbose_print("task %s will run for %s" % (ident, round(delay, 1))) + time.sleep(delay) + verbose_print("task %s entering %s" % (ident, i)) + self.bar.enter() + verbose_print("task %s leaving barrier" % ident) + with self.running_mutex: + self.running -= 1 + # Must release mutex before releasing done, else the main thread can + # exit and set mutex to None as part of global teardown; then + # mutex.release() raises AttributeError. + finished = self.running == 0 + if finished: + self.done_mutex.release() + +def test_main(): + test_support.run_unittest(ThreadRunningTests, BarrierTest) + +if __name__ == "__main__": + test_main() |