diff options
Diffstat (limited to 'Lib/test/test_dummy_thread.py')
| -rw-r--r-- | Lib/test/test_dummy_thread.py | 167 | 
1 files changed, 167 insertions, 0 deletions
diff --git a/Lib/test/test_dummy_thread.py b/Lib/test/test_dummy_thread.py new file mode 100644 index 0000000..3be3931 --- /dev/null +++ b/Lib/test/test_dummy_thread.py @@ -0,0 +1,167 @@ +"""Generic thread tests. + +Meant to be used by dummy_thread and thread.  To allow for different modules +to be used, test_main() can be called with the module to use as the thread +implementation as its sole argument. + +""" +import dummy_thread as _thread +import time +import Queue +import random +import unittest +from test import test_support + + +class LockTests(unittest.TestCase): +    """Test lock objects.""" + +    def setUp(self): +        # Create a lock +        self.lock = _thread.allocate_lock() + +    def test_initlock(self): +        #Make sure locks start locked +        self.failUnless(not self.lock.locked(), +                        "Lock object is not initialized unlocked.") + +    def test_release(self): +        # Test self.lock.release() +        self.lock.acquire() +        self.lock.release() +        self.failUnless(not self.lock.locked(), +                        "Lock object did not release properly.") +     +    def test_improper_release(self): +        #Make sure release of an unlocked thread raises _thread.error +        self.failUnlessRaises(_thread.error, self.lock.release) + +    def test_cond_acquire_success(self): +        #Make sure the conditional acquiring of the lock works. +        self.failUnless(self.lock.acquire(0), +                        "Conditional acquiring of the lock failed.") + +    def test_cond_acquire_fail(self): +        #Test acquiring locked lock returns False +        self.lock.acquire(0) +        self.failUnless(not self.lock.acquire(0), +                        "Conditional acquiring of a locked lock incorrectly " +                         "succeeded.") + +    def test_uncond_acquire_success(self): +        #Make sure unconditional acquiring of a lock works. +        self.lock.acquire() +        self.failUnless(self.lock.locked(), +                        "Uncondional locking failed.") + +    def test_uncond_acquire_return_val(self): +        #Make sure that an unconditional locking returns True. +        self.failUnless(self.lock.acquire(1) is True, +                        "Unconditional locking did not return True.") +     +    def test_uncond_acquire_blocking(self): +        #Make sure that unconditional acquiring of a locked lock blocks. +        def delay_unlock(to_unlock, delay): +            """Hold on to lock for a set amount of time before unlocking.""" +            time.sleep(delay) +            to_unlock.release() + +        self.lock.acquire() +        delay = 1  #In seconds +        start_time = int(time.time()) +        _thread.start_new_thread(delay_unlock,(self.lock, delay)) +        if test_support.verbose: +            print +            print "*** Waiting for thread to release the lock "\ +            "(approx. %s sec.) ***" % delay +        self.lock.acquire() +        end_time = int(time.time()) +        if test_support.verbose: +            print "done" +        self.failUnless((end_time - start_time) >= delay, +                        "Blocking by unconditional acquiring failed.") + +class MiscTests(unittest.TestCase): +    """Miscellaneous tests.""" + +    def test_exit(self): +        #Make sure _thread.exit() raises SystemExit +        self.failUnlessRaises(SystemExit, _thread.exit) + +    def test_ident(self): +        #Test sanity of _thread.get_ident() +        self.failUnless(isinstance(_thread.get_ident(), int), +                        "_thread.get_ident() returned a non-integer") +        self.failUnless(_thread.get_ident() != 0, +                        "_thread.get_ident() returned 0") + +    def test_LockType(self): +        #Make sure _thread.LockType is the same type as _thread.allocate_locke() +        self.failUnless(isinstance(_thread.allocate_lock(), _thread.LockType), +                        "_thread.LockType is not an instance of what is " +                         "returned by _thread.allocate_lock()") + +class ThreadTests(unittest.TestCase): +    """Test thread creation.""" + +    def test_arg_passing(self): +        #Make sure that parameter passing works. +        def arg_tester(queue, arg1=False, arg2=False): +            """Use to test _thread.start_new_thread() passes args properly.""" +            queue.put((arg1, arg2)) + +        testing_queue = Queue.Queue(1) +        _thread.start_new_thread(arg_tester, (testing_queue, True, True)) +        result = testing_queue.get() +        self.failUnless(result[0] and result[1], +                        "Argument passing for thread creation using tuple failed") +        _thread.start_new_thread(arg_tester, tuple(), {'queue':testing_queue, +                                                       'arg1':True, 'arg2':True}) +        result = testing_queue.get() +        self.failUnless(result[0] and result[1], +                        "Argument passing for thread creation using kwargs failed") +        _thread.start_new_thread(arg_tester, (testing_queue, True), {'arg2':True}) +        result = testing_queue.get() +        self.failUnless(result[0] and result[1], +                        "Argument passing for thread creation using both tuple" +                        " and kwargs failed") +     +    def test_multi_creation(self): +        #Make sure multiple threads can be created. +        def queue_mark(queue, delay): +            """Wait for ``delay`` seconds and then put something into ``queue``""" +            time.sleep(delay) +            queue.put(_thread.get_ident()) +         +        thread_count = 5 +        delay = 1.5 +        testing_queue = Queue.Queue(thread_count) +        if test_support.verbose: +            print +            print "*** Testing multiple thread creation "\ +            "(will take approx. %s to %s sec.) ***" % (delay, thread_count) +        for count in xrange(thread_count): +            _thread.start_new_thread(queue_mark, +                                     (testing_queue, round(random.random(), 1))) +        time.sleep(delay) +        if test_support.verbose: +            print 'done' +        self.failUnless(testing_queue.qsize() == thread_count, +                        "Not all %s threads executed properly after %s sec." %  +                        (thread_count, delay)) + +def test_main(imported_module=None): +    global _thread +    if imported_module: +        _thread = imported_module +    if test_support.verbose: +        print +        print "*** Using %s as _thread module ***" % _thread +    suite = unittest.TestSuite() +    suite.addTest(unittest.makeSuite(LockTests)) +    suite.addTest(unittest.makeSuite(MiscTests)) +    suite.addTest(unittest.makeSuite(ThreadTests)) +    test_support.run_suite(suite) + +if __name__ == '__main__': +    test_main()  | 
