diff options
author | Antoine Pitrou <pitrou@free.fr> | 2017-09-18 20:04:20 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-09-18 20:04:20 (GMT) |
commit | b43c4caf81b10e5c7ebaeb3a712c6db584f60bbd (patch) | |
tree | dd02bc320029d436470d906bde52e8357e191217 /Lib/test/test_dummy_thread.py | |
parent | a8e7d903d7c4dd3a64412016e9f44f0e75f1fb3f (diff) | |
download | cpython-b43c4caf81b10e5c7ebaeb3a712c6db584f60bbd.zip cpython-b43c4caf81b10e5c7ebaeb3a712c6db584f60bbd.tar.gz cpython-b43c4caf81b10e5c7ebaeb3a712c6db584f60bbd.tar.bz2 |
Restore dummy_threading and _dummy_thread, but deprecate them (bpo-31370) (#3648)
Diffstat (limited to 'Lib/test/test_dummy_thread.py')
-rw-r--r-- | Lib/test/test_dummy_thread.py | 255 |
1 files changed, 255 insertions, 0 deletions
diff --git a/Lib/test/test_dummy_thread.py b/Lib/test/test_dummy_thread.py new file mode 100644 index 0000000..0840be6 --- /dev/null +++ b/Lib/test/test_dummy_thread.py @@ -0,0 +1,255 @@ +import _dummy_thread as _thread +import time +import queue +import random +import unittest +from test import support +from unittest import mock + +DELAY = 0 + + +class LockTests(unittest.TestCase): + """Test lock objects.""" + + def setUp(self): + # Create a lock + self.lock = _thread.allocate_lock() + + def test_initlock(self): + #Make sure locks start locked + self.assertFalse(self.lock.locked(), + "Lock object is not initialized unlocked.") + + def test_release(self): + # Test self.lock.release() + self.lock.acquire() + self.lock.release() + self.assertFalse(self.lock.locked(), + "Lock object did not release properly.") + + def test_LockType_context_manager(self): + with _thread.LockType(): + pass + self.assertFalse(self.lock.locked(), + "Acquired Lock was not released") + + def test_improper_release(self): + #Make sure release of an unlocked thread raises RuntimeError + self.assertRaises(RuntimeError, self.lock.release) + + def test_cond_acquire_success(self): + #Make sure the conditional acquiring of the lock works. + self.assertTrue(self.lock.acquire(0), + "Conditional acquiring of the lock failed.") + + def test_cond_acquire_fail(self): + #Test acquiring locked lock returns False + self.lock.acquire(0) + self.assertFalse(self.lock.acquire(0), + "Conditional acquiring of a locked lock incorrectly " + "succeeded.") + + def test_uncond_acquire_success(self): + #Make sure unconditional acquiring of a lock works. + self.lock.acquire() + self.assertTrue(self.lock.locked(), + "Uncondional locking failed.") + + def test_uncond_acquire_return_val(self): + #Make sure that an unconditional locking returns True. + self.assertIs(self.lock.acquire(1), True, + "Unconditional locking did not return True.") + self.assertIs(self.lock.acquire(), True) + + def test_uncond_acquire_blocking(self): + #Make sure that unconditional acquiring of a locked lock blocks. + def delay_unlock(to_unlock, delay): + """Hold on to lock for a set amount of time before unlocking.""" + time.sleep(delay) + to_unlock.release() + + self.lock.acquire() + start_time = int(time.time()) + _thread.start_new_thread(delay_unlock,(self.lock, DELAY)) + if support.verbose: + print() + print("*** Waiting for thread to release the lock "\ + "(approx. %s sec.) ***" % DELAY) + self.lock.acquire() + end_time = int(time.time()) + if support.verbose: + print("done") + self.assertGreaterEqual(end_time - start_time, DELAY, + "Blocking by unconditional acquiring failed.") + + @mock.patch('time.sleep') + def test_acquire_timeout(self, mock_sleep): + """Test invoking acquire() with a positive timeout when the lock is + already acquired. Ensure that time.sleep() is invoked with the given + timeout and that False is returned.""" + + self.lock.acquire() + retval = self.lock.acquire(waitflag=0, timeout=1) + self.assertTrue(mock_sleep.called) + mock_sleep.assert_called_once_with(1) + self.assertEqual(retval, False) + + def test_lock_representation(self): + self.lock.acquire() + self.assertIn("locked", repr(self.lock)) + self.lock.release() + self.assertIn("unlocked", repr(self.lock)) + + +class MiscTests(unittest.TestCase): + """Miscellaneous tests.""" + + def test_exit(self): + self.assertRaises(SystemExit, _thread.exit) + + def test_ident(self): + self.assertIsInstance(_thread.get_ident(), int, + "_thread.get_ident() returned a non-integer") + self.assertGreater(_thread.get_ident(), 0) + + def test_LockType(self): + self.assertIsInstance(_thread.allocate_lock(), _thread.LockType, + "_thread.LockType is not an instance of what " + "is returned by _thread.allocate_lock()") + + def test_set_sentinel(self): + self.assertIsInstance(_thread._set_sentinel(), _thread.LockType, + "_thread._set_sentinel() did not return a " + "LockType instance.") + + def test_interrupt_main(self): + #Calling start_new_thread with a function that executes interrupt_main + # should raise KeyboardInterrupt upon completion. + def call_interrupt(): + _thread.interrupt_main() + + self.assertRaises(KeyboardInterrupt, + _thread.start_new_thread, + call_interrupt, + tuple()) + + def test_interrupt_in_main(self): + self.assertRaises(KeyboardInterrupt, _thread.interrupt_main) + + def test_stack_size_None(self): + retval = _thread.stack_size(None) + self.assertEqual(retval, 0) + + def test_stack_size_not_None(self): + with self.assertRaises(_thread.error) as cm: + _thread.stack_size("") + self.assertEqual(cm.exception.args[0], + "setting thread stack size not supported") + + +class ThreadTests(unittest.TestCase): + """Test thread creation.""" + + def test_arg_passing(self): + #Make sure that parameter passing works. + def arg_tester(queue, arg1=False, arg2=False): + """Use to test _thread.start_new_thread() passes args properly.""" + queue.put((arg1, arg2)) + + testing_queue = queue.Queue(1) + _thread.start_new_thread(arg_tester, (testing_queue, True, True)) + result = testing_queue.get() + self.assertTrue(result[0] and result[1], + "Argument passing for thread creation " + "using tuple failed") + + _thread.start_new_thread( + arg_tester, + tuple(), + {'queue':testing_queue, 'arg1':True, 'arg2':True}) + + result = testing_queue.get() + self.assertTrue(result[0] and result[1], + "Argument passing for thread creation " + "using kwargs failed") + + _thread.start_new_thread( + arg_tester, + (testing_queue, True), + {'arg2':True}) + + result = testing_queue.get() + self.assertTrue(result[0] and result[1], + "Argument passing for thread creation using both tuple" + " and kwargs failed") + + def test_multi_thread_creation(self): + def queue_mark(queue, delay): + time.sleep(delay) + queue.put(_thread.get_ident()) + + thread_count = 5 + testing_queue = queue.Queue(thread_count) + + if support.verbose: + print() + print("*** Testing multiple thread creation " + "(will take approx. %s to %s sec.) ***" % ( + DELAY, thread_count)) + + for count in range(thread_count): + if DELAY: + local_delay = round(random.random(), 1) + else: + local_delay = 0 + _thread.start_new_thread(queue_mark, + (testing_queue, local_delay)) + time.sleep(DELAY) + if support.verbose: + print('done') + self.assertEqual(testing_queue.qsize(), thread_count, + "Not all %s threads executed properly " + "after %s sec." % (thread_count, DELAY)) + + def test_args_not_tuple(self): + """ + Test invoking start_new_thread() with a non-tuple value for "args". + Expect TypeError with a meaningful error message to be raised. + """ + with self.assertRaises(TypeError) as cm: + _thread.start_new_thread(mock.Mock(), []) + self.assertEqual(cm.exception.args[0], "2nd arg must be a tuple") + + def test_kwargs_not_dict(self): + """ + Test invoking start_new_thread() with a non-dict value for "kwargs". + Expect TypeError with a meaningful error message to be raised. + """ + with self.assertRaises(TypeError) as cm: + _thread.start_new_thread(mock.Mock(), tuple(), kwargs=[]) + self.assertEqual(cm.exception.args[0], "3rd arg must be a dict") + + def test_SystemExit(self): + """ + Test invoking start_new_thread() with a function that raises + SystemExit. + The exception should be discarded. + """ + func = mock.Mock(side_effect=SystemExit()) + try: + _thread.start_new_thread(func, tuple()) + except SystemExit: + self.fail("start_new_thread raised SystemExit.") + + @mock.patch('traceback.print_exc') + def test_RaiseException(self, mock_print_exc): + """ + Test invoking start_new_thread() with a function that raises exception. + + The exception should be discarded and the traceback should be printed + via traceback.print_exc() + """ + func = mock.Mock(side_effect=Exception) + _thread.start_new_thread(func, tuple()) + self.assertTrue(mock_print_exc.called) |