summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_dummy_thread.py
diff options
context:
space:
mode:
authorAntoine Pitrou <pitrou@free.fr>2017-09-18 20:04:20 (GMT)
committerGitHub <noreply@github.com>2017-09-18 20:04:20 (GMT)
commitb43c4caf81b10e5c7ebaeb3a712c6db584f60bbd (patch)
treedd02bc320029d436470d906bde52e8357e191217 /Lib/test/test_dummy_thread.py
parenta8e7d903d7c4dd3a64412016e9f44f0e75f1fb3f (diff)
downloadcpython-b43c4caf81b10e5c7ebaeb3a712c6db584f60bbd.zip
cpython-b43c4caf81b10e5c7ebaeb3a712c6db584f60bbd.tar.gz
cpython-b43c4caf81b10e5c7ebaeb3a712c6db584f60bbd.tar.bz2
Restore dummy_threading and _dummy_thread, but deprecate them (bpo-31370) (#3648)
Diffstat (limited to 'Lib/test/test_dummy_thread.py')
-rw-r--r--Lib/test/test_dummy_thread.py255
1 files changed, 255 insertions, 0 deletions
diff --git a/Lib/test/test_dummy_thread.py b/Lib/test/test_dummy_thread.py
new file mode 100644
index 0000000..0840be6
--- /dev/null
+++ b/Lib/test/test_dummy_thread.py
@@ -0,0 +1,255 @@
+import _dummy_thread as _thread
+import time
+import queue
+import random
+import unittest
+from test import support
+from unittest import mock
+
+DELAY = 0
+
+
+class LockTests(unittest.TestCase):
+ """Test lock objects."""
+
+ def setUp(self):
+ # Create a lock
+ self.lock = _thread.allocate_lock()
+
+ def test_initlock(self):
+ #Make sure locks start locked
+ self.assertFalse(self.lock.locked(),
+ "Lock object is not initialized unlocked.")
+
+ def test_release(self):
+ # Test self.lock.release()
+ self.lock.acquire()
+ self.lock.release()
+ self.assertFalse(self.lock.locked(),
+ "Lock object did not release properly.")
+
+ def test_LockType_context_manager(self):
+ with _thread.LockType():
+ pass
+ self.assertFalse(self.lock.locked(),
+ "Acquired Lock was not released")
+
+ def test_improper_release(self):
+ #Make sure release of an unlocked thread raises RuntimeError
+ self.assertRaises(RuntimeError, self.lock.release)
+
+ def test_cond_acquire_success(self):
+ #Make sure the conditional acquiring of the lock works.
+ self.assertTrue(self.lock.acquire(0),
+ "Conditional acquiring of the lock failed.")
+
+ def test_cond_acquire_fail(self):
+ #Test acquiring locked lock returns False
+ self.lock.acquire(0)
+ self.assertFalse(self.lock.acquire(0),
+ "Conditional acquiring of a locked lock incorrectly "
+ "succeeded.")
+
+ def test_uncond_acquire_success(self):
+ #Make sure unconditional acquiring of a lock works.
+ self.lock.acquire()
+ self.assertTrue(self.lock.locked(),
+ "Uncondional locking failed.")
+
+ def test_uncond_acquire_return_val(self):
+ #Make sure that an unconditional locking returns True.
+ self.assertIs(self.lock.acquire(1), True,
+ "Unconditional locking did not return True.")
+ self.assertIs(self.lock.acquire(), True)
+
+ def test_uncond_acquire_blocking(self):
+ #Make sure that unconditional acquiring of a locked lock blocks.
+ def delay_unlock(to_unlock, delay):
+ """Hold on to lock for a set amount of time before unlocking."""
+ time.sleep(delay)
+ to_unlock.release()
+
+ self.lock.acquire()
+ start_time = int(time.time())
+ _thread.start_new_thread(delay_unlock,(self.lock, DELAY))
+ if support.verbose:
+ print()
+ print("*** Waiting for thread to release the lock "\
+ "(approx. %s sec.) ***" % DELAY)
+ self.lock.acquire()
+ end_time = int(time.time())
+ if support.verbose:
+ print("done")
+ self.assertGreaterEqual(end_time - start_time, DELAY,
+ "Blocking by unconditional acquiring failed.")
+
+ @mock.patch('time.sleep')
+ def test_acquire_timeout(self, mock_sleep):
+ """Test invoking acquire() with a positive timeout when the lock is
+ already acquired. Ensure that time.sleep() is invoked with the given
+ timeout and that False is returned."""
+
+ self.lock.acquire()
+ retval = self.lock.acquire(waitflag=0, timeout=1)
+ self.assertTrue(mock_sleep.called)
+ mock_sleep.assert_called_once_with(1)
+ self.assertEqual(retval, False)
+
+ def test_lock_representation(self):
+ self.lock.acquire()
+ self.assertIn("locked", repr(self.lock))
+ self.lock.release()
+ self.assertIn("unlocked", repr(self.lock))
+
+
+class MiscTests(unittest.TestCase):
+ """Miscellaneous tests."""
+
+ def test_exit(self):
+ self.assertRaises(SystemExit, _thread.exit)
+
+ def test_ident(self):
+ self.assertIsInstance(_thread.get_ident(), int,
+ "_thread.get_ident() returned a non-integer")
+ self.assertGreater(_thread.get_ident(), 0)
+
+ def test_LockType(self):
+ self.assertIsInstance(_thread.allocate_lock(), _thread.LockType,
+ "_thread.LockType is not an instance of what "
+ "is returned by _thread.allocate_lock()")
+
+ def test_set_sentinel(self):
+ self.assertIsInstance(_thread._set_sentinel(), _thread.LockType,
+ "_thread._set_sentinel() did not return a "
+ "LockType instance.")
+
+ def test_interrupt_main(self):
+ #Calling start_new_thread with a function that executes interrupt_main
+ # should raise KeyboardInterrupt upon completion.
+ def call_interrupt():
+ _thread.interrupt_main()
+
+ self.assertRaises(KeyboardInterrupt,
+ _thread.start_new_thread,
+ call_interrupt,
+ tuple())
+
+ def test_interrupt_in_main(self):
+ self.assertRaises(KeyboardInterrupt, _thread.interrupt_main)
+
+ def test_stack_size_None(self):
+ retval = _thread.stack_size(None)
+ self.assertEqual(retval, 0)
+
+ def test_stack_size_not_None(self):
+ with self.assertRaises(_thread.error) as cm:
+ _thread.stack_size("")
+ self.assertEqual(cm.exception.args[0],
+ "setting thread stack size not supported")
+
+
+class ThreadTests(unittest.TestCase):
+ """Test thread creation."""
+
+ def test_arg_passing(self):
+ #Make sure that parameter passing works.
+ def arg_tester(queue, arg1=False, arg2=False):
+ """Use to test _thread.start_new_thread() passes args properly."""
+ queue.put((arg1, arg2))
+
+ testing_queue = queue.Queue(1)
+ _thread.start_new_thread(arg_tester, (testing_queue, True, True))
+ result = testing_queue.get()
+ self.assertTrue(result[0] and result[1],
+ "Argument passing for thread creation "
+ "using tuple failed")
+
+ _thread.start_new_thread(
+ arg_tester,
+ tuple(),
+ {'queue':testing_queue, 'arg1':True, 'arg2':True})
+
+ result = testing_queue.get()
+ self.assertTrue(result[0] and result[1],
+ "Argument passing for thread creation "
+ "using kwargs failed")
+
+ _thread.start_new_thread(
+ arg_tester,
+ (testing_queue, True),
+ {'arg2':True})
+
+ result = testing_queue.get()
+ self.assertTrue(result[0] and result[1],
+ "Argument passing for thread creation using both tuple"
+ " and kwargs failed")
+
+ def test_multi_thread_creation(self):
+ def queue_mark(queue, delay):
+ time.sleep(delay)
+ queue.put(_thread.get_ident())
+
+ thread_count = 5
+ testing_queue = queue.Queue(thread_count)
+
+ if support.verbose:
+ print()
+ print("*** Testing multiple thread creation "
+ "(will take approx. %s to %s sec.) ***" % (
+ DELAY, thread_count))
+
+ for count in range(thread_count):
+ if DELAY:
+ local_delay = round(random.random(), 1)
+ else:
+ local_delay = 0
+ _thread.start_new_thread(queue_mark,
+ (testing_queue, local_delay))
+ time.sleep(DELAY)
+ if support.verbose:
+ print('done')
+ self.assertEqual(testing_queue.qsize(), thread_count,
+ "Not all %s threads executed properly "
+ "after %s sec." % (thread_count, DELAY))
+
+ def test_args_not_tuple(self):
+ """
+ Test invoking start_new_thread() with a non-tuple value for "args".
+ Expect TypeError with a meaningful error message to be raised.
+ """
+ with self.assertRaises(TypeError) as cm:
+ _thread.start_new_thread(mock.Mock(), [])
+ self.assertEqual(cm.exception.args[0], "2nd arg must be a tuple")
+
+ def test_kwargs_not_dict(self):
+ """
+ Test invoking start_new_thread() with a non-dict value for "kwargs".
+ Expect TypeError with a meaningful error message to be raised.
+ """
+ with self.assertRaises(TypeError) as cm:
+ _thread.start_new_thread(mock.Mock(), tuple(), kwargs=[])
+ self.assertEqual(cm.exception.args[0], "3rd arg must be a dict")
+
+ def test_SystemExit(self):
+ """
+ Test invoking start_new_thread() with a function that raises
+ SystemExit.
+ The exception should be discarded.
+ """
+ func = mock.Mock(side_effect=SystemExit())
+ try:
+ _thread.start_new_thread(func, tuple())
+ except SystemExit:
+ self.fail("start_new_thread raised SystemExit.")
+
+ @mock.patch('traceback.print_exc')
+ def test_RaiseException(self, mock_print_exc):
+ """
+ Test invoking start_new_thread() with a function that raises exception.
+
+ The exception should be discarded and the traceback should be printed
+ via traceback.print_exc()
+ """
+ func = mock.Mock(side_effect=Exception)
+ _thread.start_new_thread(func, tuple())
+ self.assertTrue(mock_print_exc.called)