summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_dummy_thread.py
diff options
context:
space:
mode:
authorAntoine Pitrou <pitrou@free.fr>2017-09-07 16:56:24 (GMT)
committerVictor Stinner <victor.stinner@gmail.com>2017-09-07 16:56:24 (GMT)
commita6a4dc816d68df04a7d592e0b6af8c7ecc4d4344 (patch)
tree1c31738009bee903417cea928e705a112aea2392 /Lib/test/test_dummy_thread.py
parent1f06a680de465be0c24a78ea3b610053955daa99 (diff)
downloadcpython-a6a4dc816d68df04a7d592e0b6af8c7ecc4d4344.zip
cpython-a6a4dc816d68df04a7d592e0b6af8c7ecc4d4344.tar.gz
cpython-a6a4dc816d68df04a7d592e0b6af8c7ecc4d4344.tar.bz2
bpo-31370: Remove support for threads-less builds (#3385)
* Remove Setup.config * Always define WITH_THREAD for compatibility.
Diffstat (limited to 'Lib/test/test_dummy_thread.py')
-rw-r--r--Lib/test/test_dummy_thread.py255
1 files changed, 0 insertions, 255 deletions
diff --git a/Lib/test/test_dummy_thread.py b/Lib/test/test_dummy_thread.py
deleted file mode 100644
index 0840be6..0000000
--- a/Lib/test/test_dummy_thread.py
+++ /dev/null
@@ -1,255 +0,0 @@
-import _dummy_thread as _thread
-import time
-import queue
-import random
-import unittest
-from test import support
-from unittest import mock
-
-DELAY = 0
-
-
-class LockTests(unittest.TestCase):
- """Test lock objects."""
-
- def setUp(self):
- # Create a lock
- self.lock = _thread.allocate_lock()
-
- def test_initlock(self):
- #Make sure locks start locked
- self.assertFalse(self.lock.locked(),
- "Lock object is not initialized unlocked.")
-
- def test_release(self):
- # Test self.lock.release()
- self.lock.acquire()
- self.lock.release()
- self.assertFalse(self.lock.locked(),
- "Lock object did not release properly.")
-
- def test_LockType_context_manager(self):
- with _thread.LockType():
- pass
- self.assertFalse(self.lock.locked(),
- "Acquired Lock was not released")
-
- def test_improper_release(self):
- #Make sure release of an unlocked thread raises RuntimeError
- self.assertRaises(RuntimeError, self.lock.release)
-
- def test_cond_acquire_success(self):
- #Make sure the conditional acquiring of the lock works.
- self.assertTrue(self.lock.acquire(0),
- "Conditional acquiring of the lock failed.")
-
- def test_cond_acquire_fail(self):
- #Test acquiring locked lock returns False
- self.lock.acquire(0)
- self.assertFalse(self.lock.acquire(0),
- "Conditional acquiring of a locked lock incorrectly "
- "succeeded.")
-
- def test_uncond_acquire_success(self):
- #Make sure unconditional acquiring of a lock works.
- self.lock.acquire()
- self.assertTrue(self.lock.locked(),
- "Uncondional locking failed.")
-
- def test_uncond_acquire_return_val(self):
- #Make sure that an unconditional locking returns True.
- self.assertIs(self.lock.acquire(1), True,
- "Unconditional locking did not return True.")
- self.assertIs(self.lock.acquire(), True)
-
- def test_uncond_acquire_blocking(self):
- #Make sure that unconditional acquiring of a locked lock blocks.
- def delay_unlock(to_unlock, delay):
- """Hold on to lock for a set amount of time before unlocking."""
- time.sleep(delay)
- to_unlock.release()
-
- self.lock.acquire()
- start_time = int(time.time())
- _thread.start_new_thread(delay_unlock,(self.lock, DELAY))
- if support.verbose:
- print()
- print("*** Waiting for thread to release the lock "\
- "(approx. %s sec.) ***" % DELAY)
- self.lock.acquire()
- end_time = int(time.time())
- if support.verbose:
- print("done")
- self.assertGreaterEqual(end_time - start_time, DELAY,
- "Blocking by unconditional acquiring failed.")
-
- @mock.patch('time.sleep')
- def test_acquire_timeout(self, mock_sleep):
- """Test invoking acquire() with a positive timeout when the lock is
- already acquired. Ensure that time.sleep() is invoked with the given
- timeout and that False is returned."""
-
- self.lock.acquire()
- retval = self.lock.acquire(waitflag=0, timeout=1)
- self.assertTrue(mock_sleep.called)
- mock_sleep.assert_called_once_with(1)
- self.assertEqual(retval, False)
-
- def test_lock_representation(self):
- self.lock.acquire()
- self.assertIn("locked", repr(self.lock))
- self.lock.release()
- self.assertIn("unlocked", repr(self.lock))
-
-
-class MiscTests(unittest.TestCase):
- """Miscellaneous tests."""
-
- def test_exit(self):
- self.assertRaises(SystemExit, _thread.exit)
-
- def test_ident(self):
- self.assertIsInstance(_thread.get_ident(), int,
- "_thread.get_ident() returned a non-integer")
- self.assertGreater(_thread.get_ident(), 0)
-
- def test_LockType(self):
- self.assertIsInstance(_thread.allocate_lock(), _thread.LockType,
- "_thread.LockType is not an instance of what "
- "is returned by _thread.allocate_lock()")
-
- def test_set_sentinel(self):
- self.assertIsInstance(_thread._set_sentinel(), _thread.LockType,
- "_thread._set_sentinel() did not return a "
- "LockType instance.")
-
- def test_interrupt_main(self):
- #Calling start_new_thread with a function that executes interrupt_main
- # should raise KeyboardInterrupt upon completion.
- def call_interrupt():
- _thread.interrupt_main()
-
- self.assertRaises(KeyboardInterrupt,
- _thread.start_new_thread,
- call_interrupt,
- tuple())
-
- def test_interrupt_in_main(self):
- self.assertRaises(KeyboardInterrupt, _thread.interrupt_main)
-
- def test_stack_size_None(self):
- retval = _thread.stack_size(None)
- self.assertEqual(retval, 0)
-
- def test_stack_size_not_None(self):
- with self.assertRaises(_thread.error) as cm:
- _thread.stack_size("")
- self.assertEqual(cm.exception.args[0],
- "setting thread stack size not supported")
-
-
-class ThreadTests(unittest.TestCase):
- """Test thread creation."""
-
- def test_arg_passing(self):
- #Make sure that parameter passing works.
- def arg_tester(queue, arg1=False, arg2=False):
- """Use to test _thread.start_new_thread() passes args properly."""
- queue.put((arg1, arg2))
-
- testing_queue = queue.Queue(1)
- _thread.start_new_thread(arg_tester, (testing_queue, True, True))
- result = testing_queue.get()
- self.assertTrue(result[0] and result[1],
- "Argument passing for thread creation "
- "using tuple failed")
-
- _thread.start_new_thread(
- arg_tester,
- tuple(),
- {'queue':testing_queue, 'arg1':True, 'arg2':True})
-
- result = testing_queue.get()
- self.assertTrue(result[0] and result[1],
- "Argument passing for thread creation "
- "using kwargs failed")
-
- _thread.start_new_thread(
- arg_tester,
- (testing_queue, True),
- {'arg2':True})
-
- result = testing_queue.get()
- self.assertTrue(result[0] and result[1],
- "Argument passing for thread creation using both tuple"
- " and kwargs failed")
-
- def test_multi_thread_creation(self):
- def queue_mark(queue, delay):
- time.sleep(delay)
- queue.put(_thread.get_ident())
-
- thread_count = 5
- testing_queue = queue.Queue(thread_count)
-
- if support.verbose:
- print()
- print("*** Testing multiple thread creation "
- "(will take approx. %s to %s sec.) ***" % (
- DELAY, thread_count))
-
- for count in range(thread_count):
- if DELAY:
- local_delay = round(random.random(), 1)
- else:
- local_delay = 0
- _thread.start_new_thread(queue_mark,
- (testing_queue, local_delay))
- time.sleep(DELAY)
- if support.verbose:
- print('done')
- self.assertEqual(testing_queue.qsize(), thread_count,
- "Not all %s threads executed properly "
- "after %s sec." % (thread_count, DELAY))
-
- def test_args_not_tuple(self):
- """
- Test invoking start_new_thread() with a non-tuple value for "args".
- Expect TypeError with a meaningful error message to be raised.
- """
- with self.assertRaises(TypeError) as cm:
- _thread.start_new_thread(mock.Mock(), [])
- self.assertEqual(cm.exception.args[0], "2nd arg must be a tuple")
-
- def test_kwargs_not_dict(self):
- """
- Test invoking start_new_thread() with a non-dict value for "kwargs".
- Expect TypeError with a meaningful error message to be raised.
- """
- with self.assertRaises(TypeError) as cm:
- _thread.start_new_thread(mock.Mock(), tuple(), kwargs=[])
- self.assertEqual(cm.exception.args[0], "3rd arg must be a dict")
-
- def test_SystemExit(self):
- """
- Test invoking start_new_thread() with a function that raises
- SystemExit.
- The exception should be discarded.
- """
- func = mock.Mock(side_effect=SystemExit())
- try:
- _thread.start_new_thread(func, tuple())
- except SystemExit:
- self.fail("start_new_thread raised SystemExit.")
-
- @mock.patch('traceback.print_exc')
- def test_RaiseException(self, mock_print_exc):
- """
- Test invoking start_new_thread() with a function that raises exception.
-
- The exception should be discarded and the traceback should be printed
- via traceback.print_exc()
- """
- func = mock.Mock(side_effect=Exception)
- _thread.start_new_thread(func, tuple())
- self.assertTrue(mock_print_exc.called)