summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSenthil Kumaran <senthil@uthcode.com>2016-09-08 09:47:52 (GMT)
committerSenthil Kumaran <senthil@uthcode.com>2016-09-08 09:47:52 (GMT)
commit9dda0c9569dfc5fd7c717f97379416ba51d5656b (patch)
tree1fce1e5f694be9b8d9448fc31179faf38bdc7f69
parent7f9eb6eda30398ea7a5b630856abce81f91591f6 (diff)
parent82733fac8dca465a1f0eb8e82b3b3f53cc6da145 (diff)
downloadcpython-9dda0c9569dfc5fd7c717f97379416ba51d5656b.zip
cpython-9dda0c9569dfc5fd7c717f97379416ba51d5656b.tar.gz
cpython-9dda0c9569dfc5fd7c717f97379416ba51d5656b.tar.bz2
[merge from 3.5] Issue11551 - Increase the test coverage of _dummy_thread module to 100%.
Initial patch contributed by Denver Coneybeare.
-rw-r--r--Lib/test/test_dummy_thread.py156
1 files changed, 115 insertions, 41 deletions
diff --git a/Lib/test/test_dummy_thread.py b/Lib/test/test_dummy_thread.py
index a6a2856..bb4bfe7 100644
--- a/Lib/test/test_dummy_thread.py
+++ b/Lib/test/test_dummy_thread.py
@@ -1,19 +1,13 @@
-"""Generic thread tests.
-
-Meant to be used by dummy_thread and thread. To allow for different modules
-to be used, test_main() can be called with the module to use as the thread
-implementation as its sole argument.
-
-"""
import _dummy_thread as _thread
import time
import queue
import random
import unittest
from test import support
+from unittest import mock
+
+DELAY = 0
-DELAY = 0 # Set > 0 when testing a module other than _dummy_thread, such as
- # the '_thread' module.
class LockTests(unittest.TestCase):
"""Test lock objects."""
@@ -34,6 +28,12 @@ class LockTests(unittest.TestCase):
self.assertFalse(self.lock.locked(),
"Lock object did not release properly.")
+ def test_LockType_context_manager(self):
+ with _thread.LockType():
+ pass
+ self.assertFalse(self.lock.locked(),
+ "Acquired Lock was not released")
+
def test_improper_release(self):
#Make sure release of an unlocked thread raises RuntimeError
self.assertRaises(RuntimeError, self.lock.release)
@@ -83,39 +83,72 @@ class LockTests(unittest.TestCase):
self.assertGreaterEqual(end_time - start_time, DELAY,
"Blocking by unconditional acquiring failed.")
+ @mock.patch('time.sleep')
+ def test_acquire_timeout(self, mock_sleep):
+ """Test invoking acquire() with a positive timeout when the lock is
+ already acquired. Ensure that time.sleep() is invoked with the given
+ timeout and that False is returned."""
+
+ self.lock.acquire()
+ retval = self.lock.acquire(waitflag=0, timeout=1)
+ self.assertTrue(mock_sleep.called)
+ mock_sleep.assert_called_once_with(1)
+ self.assertEqual(retval, False)
+
+ def test_lock_representation(self):
+ self.lock.acquire()
+ self.assertIn("locked", repr(self.lock))
+ self.lock.release()
+ self.assertIn("unlocked", repr(self.lock))
+
+
class MiscTests(unittest.TestCase):
"""Miscellaneous tests."""
def test_exit(self):
- #Make sure _thread.exit() raises SystemExit
self.assertRaises(SystemExit, _thread.exit)
def test_ident(self):
- #Test sanity of _thread.get_ident()
self.assertIsInstance(_thread.get_ident(), int,
"_thread.get_ident() returned a non-integer")
self.assertNotEqual(_thread.get_ident(), 0,
"_thread.get_ident() returned 0")
def test_LockType(self):
- #Make sure _thread.LockType is the same type as _thread.allocate_locke()
self.assertIsInstance(_thread.allocate_lock(), _thread.LockType,
"_thread.LockType is not an instance of what "
"is returned by _thread.allocate_lock()")
+ def test_set_sentinel(self):
+ self.assertIsInstance(_thread._set_sentinel(), _thread.LockType,
+ "_thread._set_sentinel() did not return a "
+ "LockType instance.")
+
def test_interrupt_main(self):
#Calling start_new_thread with a function that executes interrupt_main
# should raise KeyboardInterrupt upon completion.
def call_interrupt():
_thread.interrupt_main()
- self.assertRaises(KeyboardInterrupt, _thread.start_new_thread,
- call_interrupt, tuple())
+
+ self.assertRaises(KeyboardInterrupt,
+ _thread.start_new_thread,
+ call_interrupt,
+ tuple())
def test_interrupt_in_main(self):
- # Make sure that if interrupt_main is called in main threat that
- # KeyboardInterrupt is raised instantly.
self.assertRaises(KeyboardInterrupt, _thread.interrupt_main)
+ def test_stack_size_None(self):
+ retval = _thread.stack_size(None)
+ self.assertEqual(retval, 0)
+
+ def test_stack_size_not_None(self):
+ with self.assertRaises(_thread.error) as cm:
+ _thread.stack_size("")
+ self.assertEqual(cm.exception.args[0],
+ "setting thread stack size not supported")
+
+
class ThreadTests(unittest.TestCase):
"""Test thread creation."""
@@ -129,31 +162,43 @@ class ThreadTests(unittest.TestCase):
_thread.start_new_thread(arg_tester, (testing_queue, True, True))
result = testing_queue.get()
self.assertTrue(result[0] and result[1],
- "Argument passing for thread creation using tuple failed")
- _thread.start_new_thread(arg_tester, tuple(), {'queue':testing_queue,
- 'arg1':True, 'arg2':True})
+ "Argument passing for thread creation "
+ "using tuple failed")
+
+ _thread.start_new_thread(
+ arg_tester,
+ tuple(),
+ {'queue':testing_queue, 'arg1':True, 'arg2':True})
+
result = testing_queue.get()
self.assertTrue(result[0] and result[1],
- "Argument passing for thread creation using kwargs failed")
- _thread.start_new_thread(arg_tester, (testing_queue, True), {'arg2':True})
+ "Argument passing for thread creation "
+ "using kwargs failed")
+
+ _thread.start_new_thread(
+ arg_tester,
+ (testing_queue, True),
+ {'arg2':True})
+
result = testing_queue.get()
self.assertTrue(result[0] and result[1],
"Argument passing for thread creation using both tuple"
" and kwargs failed")
- def test_multi_creation(self):
- #Make sure multiple threads can be created.
+ def test_multi_thread_creation(self):
def queue_mark(queue, delay):
- """Wait for ``delay`` seconds and then put something into ``queue``"""
time.sleep(delay)
queue.put(_thread.get_ident())
thread_count = 5
testing_queue = queue.Queue(thread_count)
+
if support.verbose:
print()
- print("*** Testing multiple thread creation "\
- "(will take approx. %s to %s sec.) ***" % (DELAY, thread_count))
+ print("*** Testing multiple thread creation "
+ "(will take approx. %s to %s sec.) ***" % (
+ DELAY, thread_count))
+
for count in range(thread_count):
if DELAY:
local_delay = round(random.random(), 1)
@@ -165,18 +210,47 @@ class ThreadTests(unittest.TestCase):
if support.verbose:
print('done')
self.assertEqual(testing_queue.qsize(), thread_count,
- "Not all %s threads executed properly after %s sec." %
- (thread_count, DELAY))
-
-def test_main(imported_module=None):
- global _thread, DELAY
- if imported_module:
- _thread = imported_module
- DELAY = 2
- if support.verbose:
- print()
- print("*** Using %s as _thread module ***" % _thread)
- support.run_unittest(LockTests, MiscTests, ThreadTests)
-
-if __name__ == '__main__':
- test_main()
+ "Not all %s threads executed properly "
+ "after %s sec." % (thread_count, DELAY))
+
+ def test_args_not_tuple(self):
+ """
+ Test invoking start_new_thread() with a non-tuple value for "args".
+ Expect TypeError with a meaningful error message to be raised.
+ """
+ with self.assertRaises(TypeError) as cm:
+ _thread.start_new_thread(mock.Mock(), [])
+ self.assertEqual(cm.exception.args[0], "2nd arg must be a tuple")
+
+ def test_kwargs_not_dict(self):
+ """
+ Test invoking start_new_thread() with a non-dict value for "kwargs".
+ Expect TypeError with a meaningful error message to be raised.
+ """
+ with self.assertRaises(TypeError) as cm:
+ _thread.start_new_thread(mock.Mock(), tuple(), kwargs=[])
+ self.assertEqual(cm.exception.args[0], "3rd arg must be a dict")
+
+ def test_SystemExit(self):
+ """
+ Test invoking start_new_thread() with a function that raises
+ SystemExit.
+ The exception should be discarded.
+ """
+ func = mock.Mock(side_effect=SystemExit())
+ try:
+ _thread.start_new_thread(func, tuple())
+ except SystemExit:
+ self.fail("start_new_thread raised SystemExit.")
+
+ @mock.patch('traceback.print_exc')
+ def test_RaiseException(self, mock_print_exc):
+ """
+ Test invoking start_new_thread() with a function that raises exception.
+
+ The exception should be discarded and the traceback should be printed
+ via traceback.print_exc()
+ """
+ func = mock.Mock(side_effect=Exception)
+ _thread.start_new_thread(func, tuple())
+ self.assertTrue(mock_print_exc.called)