diff options
Diffstat (limited to 'Lib/test/test_threading.py')
-rw-r--r-- | Lib/test/test_threading.py | 127 |
1 files changed, 80 insertions, 47 deletions
diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py index 75235ad..32f9e99 100644 --- a/Lib/test/test_threading.py +++ b/Lib/test/test_threading.py @@ -1,16 +1,16 @@ # Very rudimentary test of threading module import test.support -from test.support import verbose -import os +from test.support import verbose, strip_python_stderr import random import re import sys -import threading -import _thread +_thread = test.support.import_module('_thread') +threading = test.support.import_module('threading') import time import unittest import weakref +import os import subprocess from test import lock_tests @@ -59,7 +59,16 @@ class TestThread(threading.Thread): (self.name, self.nrunning.get())) -class ThreadTests(unittest.TestCase): +class BaseTestCase(unittest.TestCase): + def setUp(self): + self._threads = test.support.threading_setup() + + def tearDown(self): + test.support.threading_cleanup(*self._threads) + test.support.reap_children() + + +class ThreadTests(BaseTestCase): # Create a bunch of threads, let each do some work, wait until all are # done. @@ -89,7 +98,8 @@ class ThreadTests(unittest.TestCase): self.assertTrue(not t.is_alive()) self.assertNotEqual(t.ident, 0) self.assertFalse(t.ident is None) - self.assertTrue(re.match('<TestThread\(.*, \w+ -?\d+\)>', repr(t))) + self.assertTrue(re.match('<TestThread\(.*, stopped -?\d+\)>', + repr(t))) if verbose: print('all tasks done') self.assertEqual(numrunning.get(), 0) @@ -115,9 +125,8 @@ class ThreadTests(unittest.TestCase): try: threading.stack_size(262144) except _thread.error: - if verbose: - print('platform does not support changing thread stack size') - return + raise unittest.SkipTest( + 'platform does not support changing thread stack size') self.test_various_ops() threading.stack_size(0) @@ -128,9 +137,8 @@ class ThreadTests(unittest.TestCase): try: threading.stack_size(0x100000) except _thread.error: - if verbose: - print('platform does not support changing thread stack size') - return + raise unittest.SkipTest( + 'platform does not support changing thread stack size') self.test_various_ops() threading.stack_size(0) @@ -147,9 +155,8 @@ class ThreadTests(unittest.TestCase): tid = _thread.start_new_thread(f, (mutex,)) # Wait for the thread to finish. mutex.acquire() - self.assertTrue(tid in threading._active) - self.assertTrue(isinstance(threading._active[tid], - threading._DummyThread)) + self.assertIn(tid, threading._active) + self.assertIsInstance(threading._active[tid], threading._DummyThread) del threading._active[tid] # PyThreadState_SetAsyncExc() is a CPython-only gimmick, not (currently) @@ -158,9 +165,7 @@ class ThreadTests(unittest.TestCase): try: import ctypes except ImportError: - if verbose: - print("test_PyThreadState_SetAsyncExc can't import ctypes") - return # can't do anything + raise unittest.SkipTest("cannot import ctypes") set_async_exc = ctypes.pythonapi.PyThreadState_SetAsyncExc @@ -169,6 +174,27 @@ class ThreadTests(unittest.TestCase): exception = ctypes.py_object(AsyncExc) + # First check it works when setting the exception from the same thread. + tid = _thread.get_ident() + + try: + result = set_async_exc(ctypes.c_long(tid), exception) + # The exception is async, so we might have to keep the VM busy until + # it notices. + while True: + pass + except AsyncExc: + pass + else: + # This code is unreachable but it reflects the intent. If we wanted + # to be smarter the above loop wouldn't be infinite. + self.fail("AsyncExc not raised") + try: + self.assertEqual(result, 1) # one thread state modified + except UnboundLocalError: + # The exception was raised too quickly for us to get the result. + pass + # `worker_started` is set by the thread when it's inside a try/except # block waiting to catch the asynchronously set AsyncExc exception. # `worker_saw_exception` is set by the thread upon catching that @@ -245,9 +271,7 @@ class ThreadTests(unittest.TestCase): try: import ctypes except ImportError: - if verbose: - print("test_finalize_with_runnning_thread can't import ctypes") - return # can't do anything + raise unittest.SkipTest("cannot import ctypes") rc = subprocess.call([sys.executable, "-c", """if 1: import ctypes, sys, time, _thread @@ -302,6 +326,8 @@ class ThreadTests(unittest.TestCase): """], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + self.addCleanup(p.stdout.close) + self.addCleanup(p.stderr.close) stdout, stderr = p.communicate() rc = p.returncode self.assertFalse(rc == 2, "interpreted was blocked") @@ -326,30 +352,30 @@ class ThreadTests(unittest.TestCase): """], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + self.addCleanup(p.stdout.close) + self.addCleanup(p.stderr.close) stdout, stderr = p.communicate() self.assertEqual(stdout.strip(), b"Woke up, sleep function is: <built-in function sleep>") - stderr = re.sub(br"^\[\d+ refs\]", b"", stderr, re.MULTILINE).strip() + stderr = strip_python_stderr(stderr) self.assertEqual(stderr, b"") def test_enumerate_after_join(self): # Try hard to trigger #1703448: a thread is still returned in # threading.enumerate() after it has been join()ed. enum = threading.enumerate - old_interval = sys.getcheckinterval() + old_interval = sys.getswitchinterval() try: for i in range(1, 100): - # Try a couple times at each thread-switching interval - # to get more interleavings. - sys.setcheckinterval(i // 5) + sys.setswitchinterval(i * 0.0002) t = threading.Thread(target=lambda: None) t.start() t.join() l = enum() - self.assertFalse(t in l, + self.assertNotIn(t, l, "#1703448 triggered after %d trials: %s" % (i, l)) finally: - sys.setcheckinterval(old_interval) + sys.setswitchinterval(old_interval) def test_no_refcycle_through_target(self): class RunSelfFunction(object): @@ -370,7 +396,7 @@ class ThreadTests(unittest.TestCase): weak_cyclic_object = weakref.ref(cyclic_object) cyclic_object.thread.join() del cyclic_object - self.assertEqual(None, weak_cyclic_object(), + self.assertIsNone(weak_cyclic_object(), msg=('%d references still around' % sys.getrefcount(weak_cyclic_object()))) @@ -378,7 +404,7 @@ class ThreadTests(unittest.TestCase): weak_raising_cyclic_object = weakref.ref(raising_cyclic_object) raising_cyclic_object.thread.join() del raising_cyclic_object - self.assertEqual(None, weak_raising_cyclic_object(), + self.assertIsNone(weak_raising_cyclic_object(), msg=('%d references still around' % sys.getrefcount(weak_raising_cyclic_object()))) @@ -395,8 +421,14 @@ class ThreadTests(unittest.TestCase): e.isSet() threading.activeCount() + def test_repr_daemon(self): + t = threading.Thread() + self.assertFalse('daemon' in repr(t)) + t.daemon = True + self.assertTrue('daemon' in repr(t)) + -class ThreadJoinOnShutdown(unittest.TestCase): +class ThreadJoinOnShutdown(BaseTestCase): def _run_and_join(self, script): script = """if 1: @@ -432,11 +464,9 @@ class ThreadJoinOnShutdown(unittest.TestCase): self._run_and_join(script) + @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()") def test_2_join_in_forked_process(self): # Like the test above, but from a forked interpreter - import os - if not hasattr(os, 'fork'): - return script = """if 1: childpid = os.fork() if childpid != 0: @@ -450,19 +480,16 @@ class ThreadJoinOnShutdown(unittest.TestCase): """ self._run_and_join(script) + @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()") def test_3_join_in_forked_from_thread(self): # Like the test above, but fork() was called from a worker thread # In the forked process, the main Thread object must be marked as stopped. - import os - if not hasattr(os, 'fork'): - return + # Skip platforms with known problems forking from a worker thread. # See http://bugs.python.org/issue3863. if sys.platform in ('freebsd4', 'freebsd5', 'freebsd6', 'netbsd5', 'os2emx'): - print('Skipping test_3_join_in_forked_from_thread' - ' due to known OS bugs on', sys.platform, file=sys.stderr) - return + raise unittest.SkipTest('due to known OS bugs on ' + sys.platform) script = """if 1: main_thread = threading.current_thread() def worker(): @@ -485,9 +512,9 @@ class ThreadJoinOnShutdown(unittest.TestCase): def assertScriptHasOutput(self, script, expected_output): p = subprocess.Popen([sys.executable, "-c", script], stdout=subprocess.PIPE) - rc = p.wait() - data = p.stdout.read().decode().replace('\r', '') - self.assertEqual(rc, 0, "Unexpected error") + stdout, stderr = p.communicate() + data = stdout.decode().replace('\r', '') + self.assertEqual(p.returncode, 0, "Unexpected error") self.assertEqual(data, expected_output) @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()") @@ -629,7 +656,7 @@ class ThreadJoinOnShutdown(unittest.TestCase): self.assertScriptHasOutput(script, output) -class ThreadingExceptionTests(unittest.TestCase): +class ThreadingExceptionTests(BaseTestCase): # A RuntimeError should be raised if Thread.start() is called # multiple times. def test_start_thread_again(self): @@ -684,8 +711,11 @@ class ThreadingExceptionTests(unittest.TestCase): class LockTests(lock_tests.LockTests): locktype = staticmethod(threading.Lock) -class RLockTests(lock_tests.RLockTests): - locktype = staticmethod(threading.RLock) +class PyRLockTests(lock_tests.RLockTests): + locktype = staticmethod(threading._PyRLock) + +class CRLockTests(lock_tests.RLockTests): + locktype = staticmethod(threading._CRLock) class EventTests(lock_tests.EventTests): eventtype = staticmethod(threading.Event) @@ -703,14 +733,17 @@ class SemaphoreTests(lock_tests.SemaphoreTests): class BoundedSemaphoreTests(lock_tests.BoundedSemaphoreTests): semtype = staticmethod(threading.BoundedSemaphore) +class BarrierTests(lock_tests.BarrierTests): + barriertype = staticmethod(threading.Barrier) def test_main(): - test.support.run_unittest(LockTests, RLockTests, EventTests, + test.support.run_unittest(LockTests, PyRLockTests, CRLockTests, EventTests, ConditionAsRLockTests, ConditionTests, SemaphoreTests, BoundedSemaphoreTests, ThreadTests, ThreadJoinOnShutdown, ThreadingExceptionTests, + BarrierTests ) if __name__ == "__main__": |