diff options
Diffstat (limited to 'Lib/test/test_threading.py')
-rw-r--r-- | Lib/test/test_threading.py | 333 |
1 files changed, 216 insertions, 117 deletions
diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py index 63ef7b9..17be84b 100644 --- a/Lib/test/test_threading.py +++ b/Lib/test/test_threading.py @@ -1,16 +1,19 @@ # Very rudimentary test of threading module import test.support -from test.support import verbose -import os +from test.support import verbose, strip_python_stderr, import_module +from test.script_helper import assert_python_ok + import random import re import sys -import threading -import _thread +_thread = import_module('_thread') +threading = import_module('threading') import time import unittest import weakref +import os +from test.script_helper import assert_python_ok, assert_python_failure import subprocess from test import lock_tests @@ -59,7 +62,16 @@ class TestThread(threading.Thread): (self.name, self.nrunning.get())) -class ThreadTests(unittest.TestCase): +class BaseTestCase(unittest.TestCase): + def setUp(self): + self._threads = test.support.threading_setup() + + def tearDown(self): + test.support.threading_cleanup(*self._threads) + test.support.reap_children() + + +class ThreadTests(BaseTestCase): # Create a bunch of threads, let each do some work, wait until all are # done. @@ -89,7 +101,8 @@ class ThreadTests(unittest.TestCase): self.assertTrue(not t.is_alive()) self.assertNotEqual(t.ident, 0) self.assertFalse(t.ident is None) - self.assertTrue(re.match('<TestThread\(.*, \w+ -?\d+\)>', repr(t))) + self.assertTrue(re.match('<TestThread\(.*, stopped -?\d+\)>', + repr(t))) if verbose: print('all tasks done') self.assertEqual(numrunning.get(), 0) @@ -115,9 +128,8 @@ class ThreadTests(unittest.TestCase): try: threading.stack_size(262144) except _thread.error: - if verbose: - print('platform does not support changing thread stack size') - return + raise unittest.SkipTest( + 'platform does not support changing thread stack size') self.test_various_ops() threading.stack_size(0) @@ -128,9 +140,8 @@ class ThreadTests(unittest.TestCase): try: threading.stack_size(0x100000) except _thread.error: - if verbose: - print('platform does not support changing thread stack size') - return + raise unittest.SkipTest( + 'platform does not support changing thread stack size') self.test_various_ops() threading.stack_size(0) @@ -147,20 +158,14 @@ class ThreadTests(unittest.TestCase): tid = _thread.start_new_thread(f, (mutex,)) # Wait for the thread to finish. mutex.acquire() - self.assertTrue(tid in threading._active) - self.assertTrue(isinstance(threading._active[tid], - threading._DummyThread)) + self.assertIn(tid, threading._active) + self.assertIsInstance(threading._active[tid], threading._DummyThread) del threading._active[tid] # PyThreadState_SetAsyncExc() is a CPython-only gimmick, not (currently) # exposed at the Python level. This test relies on ctypes to get at it. def test_PyThreadState_SetAsyncExc(self): - try: - import ctypes - except ImportError: - if verbose: - print("test_PyThreadState_SetAsyncExc can't import ctypes") - return # can't do anything + ctypes = import_module("ctypes") set_async_exc = ctypes.pythonapi.PyThreadState_SetAsyncExc @@ -169,6 +174,27 @@ class ThreadTests(unittest.TestCase): exception = ctypes.py_object(AsyncExc) + # First check it works when setting the exception from the same thread. + tid = _thread.get_ident() + + try: + result = set_async_exc(ctypes.c_long(tid), exception) + # The exception is async, so we might have to keep the VM busy until + # it notices. + while True: + pass + except AsyncExc: + pass + else: + # This code is unreachable but it reflects the intent. If we wanted + # to be smarter the above loop wouldn't be infinite. + self.fail("AsyncExc not raised") + try: + self.assertEqual(result, 1) # one thread state modified + except UnboundLocalError: + # The exception was raised too quickly for us to get the result. + pass + # `worker_started` is set by the thread when it's inside a try/except # block waiting to catch the asynchronously set AsyncExc exception. # `worker_saw_exception` is set by the thread upon catching that @@ -242,14 +268,9 @@ class ThreadTests(unittest.TestCase): # Issue 1402: the PyGILState_Ensure / _Release functions may be called # very late on python exit: on deallocation of a running thread for # example. - try: - import ctypes - except ImportError: - if verbose: - print("test_finalize_with_runnning_thread can't import ctypes") - return # can't do anything + import_module("ctypes") - rc = subprocess.call([sys.executable, "-c", """if 1: + rc, out, err = assert_python_failure("-c", """if 1: import ctypes, sys, time, _thread # This lock is used as a simple event variable. @@ -273,13 +294,13 @@ class ThreadTests(unittest.TestCase): _thread.start_new_thread(waitingThread, ()) ready.acquire() # Be sure the other thread is waiting. sys.exit(42) - """]) + """) self.assertEqual(rc, 42) def test_finalize_with_trace(self): # Issue1733757 # Avoid a deadlock when sys.settrace steps into threading._shutdown - p = subprocess.Popen([sys.executable, "-c", """if 1: + assert_python_ok("-c", """if 1: import sys, threading # A deadlock-killer, to prevent the @@ -299,19 +320,12 @@ class ThreadTests(unittest.TestCase): return func sys.settrace(func) - """], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - stdout, stderr = p.communicate() - rc = p.returncode - self.assertFalse(rc == 2, "interpreted was blocked") - self.assertTrue(rc == 0, - "Unexpected error: " + ascii(stderr)) + """) def test_join_nondaemon_on_shutdown(self): # Issue 1722344 # Raising SystemExit skipped threading._shutdown - p = subprocess.Popen([sys.executable, "-c", """if 1: + rc, out, err = assert_python_ok("-c", """if 1: import threading from time import sleep @@ -323,33 +337,27 @@ class ThreadTests(unittest.TestCase): threading.Thread(target=child).start() raise SystemExit - """], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - stdout, stderr = p.communicate() - self.assertEqual(stdout.strip(), + """) + self.assertEqual(out.strip(), b"Woke up, sleep function is: <built-in function sleep>") - stderr = re.sub(br"^\[\d+ refs\]", b"", stderr, re.MULTILINE).strip() - self.assertEqual(stderr, b"") + self.assertEqual(err, b"") def test_enumerate_after_join(self): # Try hard to trigger #1703448: a thread is still returned in # threading.enumerate() after it has been join()ed. enum = threading.enumerate - old_interval = sys.getcheckinterval() + old_interval = sys.getswitchinterval() try: for i in range(1, 100): - # Try a couple times at each thread-switching interval - # to get more interleavings. - sys.setcheckinterval(i // 5) + sys.setswitchinterval(i * 0.0002) t = threading.Thread(target=lambda: None) t.start() t.join() l = enum() - self.assertFalse(t in l, + self.assertNotIn(t, l, "#1703448 triggered after %d trials: %s" % (i, l)) finally: - sys.setcheckinterval(old_interval) + sys.setswitchinterval(old_interval) def test_no_refcycle_through_target(self): class RunSelfFunction(object): @@ -370,7 +378,7 @@ class ThreadTests(unittest.TestCase): weak_cyclic_object = weakref.ref(cyclic_object) cyclic_object.thread.join() del cyclic_object - self.assertEqual(None, weak_cyclic_object(), + self.assertIsNone(weak_cyclic_object(), msg=('%d references still around' % sys.getrefcount(weak_cyclic_object()))) @@ -378,7 +386,7 @@ class ThreadTests(unittest.TestCase): weak_raising_cyclic_object = weakref.ref(raising_cyclic_object) raising_cyclic_object.thread.join() del raising_cyclic_object - self.assertEqual(None, weak_raising_cyclic_object(), + self.assertIsNone(weak_raising_cyclic_object(), msg=('%d references still around' % sys.getrefcount(weak_raising_cyclic_object()))) @@ -395,8 +403,48 @@ class ThreadTests(unittest.TestCase): e.isSet() threading.activeCount() - -class ThreadJoinOnShutdown(unittest.TestCase): + def test_repr_daemon(self): + t = threading.Thread() + self.assertFalse('daemon' in repr(t)) + t.daemon = True + self.assertTrue('daemon' in repr(t)) + + @unittest.skipUnless(hasattr(os, 'fork'), 'test needs fork()') + def test_dummy_thread_after_fork(self): + # Issue #14308: a dummy thread in the active list doesn't mess up + # the after-fork mechanism. + code = """if 1: + import _thread, threading, os, time + + def background_thread(evt): + # Creates and registers the _DummyThread instance + threading.current_thread() + evt.set() + time.sleep(10) + + evt = threading.Event() + _thread.start_new_thread(background_thread, (evt,)) + evt.wait() + assert threading.active_count() == 2, threading.active_count() + if os.fork() == 0: + assert threading.active_count() == 1, threading.active_count() + os._exit(0) + else: + os.wait() + """ + _, out, err = assert_python_ok("-c", code) + self.assertEqual(out, b'') + self.assertEqual(err, b'') + + +class ThreadJoinOnShutdown(BaseTestCase): + + # Between fork() and exec(), only async-safe functions are allowed (issues + # #12316 and #11870), and fork() from a worker thread is known to trigger + # problems with some operating systems (issue #3863): skip problematic tests + # on platforms known to behave badly. + platforms_to_skip = ('freebsd4', 'freebsd5', 'freebsd6', 'netbsd5', + 'os2emx') def _run_and_join(self, script): script = """if 1: @@ -411,13 +459,9 @@ class ThreadJoinOnShutdown(unittest.TestCase): sys.stdout.flush() \n""" + script - p = subprocess.Popen([sys.executable, "-c", script], stdout=subprocess.PIPE) - rc = p.wait() - data = p.stdout.read().decode().replace('\r', '') - p.stdout.close() + rc, out, err = assert_python_ok("-c", script) + data = out.decode().replace('\r', '') self.assertEqual(data, "end of main\nend of thread\n") - self.assertFalse(rc == 2, "interpreter was blocked") - self.assertTrue(rc == 0, "Unexpected error") def test_1_join_on_shutdown(self): # The usual case: on exit, wait for a non-daemon thread @@ -431,12 +475,10 @@ class ThreadJoinOnShutdown(unittest.TestCase): """ self._run_and_join(script) - + @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()") + @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug") def test_2_join_in_forked_process(self): # Like the test above, but from a forked interpreter - import os - if not hasattr(os, 'fork'): - return script = """if 1: childpid = os.fork() if childpid != 0: @@ -450,19 +492,12 @@ class ThreadJoinOnShutdown(unittest.TestCase): """ self._run_and_join(script) + @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()") + @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug") def test_3_join_in_forked_from_thread(self): # Like the test above, but fork() was called from a worker thread # In the forked process, the main Thread object must be marked as stopped. - import os - if not hasattr(os, 'fork'): - return - # Skip platforms with known problems forking from a worker thread. - # See http://bugs.python.org/issue3863. - if sys.platform in ('freebsd4', 'freebsd5', 'freebsd6', 'netbsd5', - 'os2emx'): - print('Skipping test_3_join_in_forked_from_thread' - ' due to known OS bugs on', sys.platform, file=sys.stderr) - return + script = """if 1: main_thread = threading.current_thread() def worker(): @@ -483,23 +518,16 @@ class ThreadJoinOnShutdown(unittest.TestCase): self._run_and_join(script) def assertScriptHasOutput(self, script, expected_output): - p = subprocess.Popen([sys.executable, "-c", script], - stdout=subprocess.PIPE) - rc = p.wait() - data = p.stdout.read().decode().replace('\r', '') - self.assertEqual(rc, 0, "Unexpected error") + rc, out, err = assert_python_ok("-c", script) + data = out.decode().replace('\r', '') self.assertEqual(data, expected_output) @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()") + @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug") def test_4_joining_across_fork_in_worker_thread(self): # There used to be a possible deadlock when forking from a child # thread. See http://bugs.python.org/issue6643. - # Skip platforms with known problems forking from a worker thread. - # See http://bugs.python.org/issue3863. - if sys.platform in ('freebsd4', 'freebsd5', 'freebsd6', 'os2emx'): - raise unittest.SkipTest('due to known OS bugs on ' + sys.platform) - # The script takes the following steps: # - The main thread in the parent process starts a new thread and then # tries to join it. @@ -568,6 +596,7 @@ class ThreadJoinOnShutdown(unittest.TestCase): self.assertScriptHasOutput(script, "end of main\n") @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()") + @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug") def test_5_clear_waiter_locks_to_avoid_crash(self): # Check that a spawned thread that forks doesn't segfault on certain # platforms, namely OS X. This used to happen if there was a waiter @@ -580,10 +609,6 @@ class ThreadJoinOnShutdown(unittest.TestCase): # lock will be acquired, we can't know if the internal mutex will be # acquired at the time of the fork. - # Skip platforms with known problems forking from a worker thread. - # See http://bugs.python.org/issue3863. - if sys.platform in ('freebsd4', 'freebsd5', 'freebsd6', 'os2emx'): - raise unittest.SkipTest('due to known OS bugs on ' + sys.platform) script = """if True: import os, time, threading @@ -628,8 +653,75 @@ class ThreadJoinOnShutdown(unittest.TestCase): output = "end of worker thread\nend of main thread\n" self.assertScriptHasOutput(script, output) + @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug") + def test_6_daemon_threads(self): + # Check that a daemon thread cannot crash the interpreter on shutdown + # by manipulating internal structures that are being disposed of in + # the main thread. + script = """if True: + import os + import random + import sys + import time + import threading + + thread_has_run = set() + + def random_io(): + '''Loop for a while sleeping random tiny amounts and doing some I/O.''' + while True: + in_f = open(os.__file__, 'rb') + stuff = in_f.read(200) + null_f = open(os.devnull, 'wb') + null_f.write(stuff) + time.sleep(random.random() / 1995) + null_f.close() + in_f.close() + thread_has_run.add(threading.current_thread()) + + def main(): + count = 0 + for _ in range(40): + new_thread = threading.Thread(target=random_io) + new_thread.daemon = True + new_thread.start() + count += 1 + while len(thread_has_run) < count: + time.sleep(0.001) + # Trigger process shutdown + sys.exit(0) + + main() + """ + rc, out, err = assert_python_ok('-c', script) + self.assertFalse(err) + + @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()") + @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug") + def test_reinit_tls_after_fork(self): + # Issue #13817: fork() would deadlock in a multithreaded program with + # the ad-hoc TLS implementation. + + def do_fork_and_wait(): + # just fork a child process and wait it + pid = os.fork() + if pid > 0: + os.waitpid(pid, 0) + else: + os._exit(0) + + # start a bunch of threads that will fork() child processes + threads = [] + for i in range(16): + t = threading.Thread(target=do_fork_and_wait) + threads.append(t) + t.start() + + for t in threads: + t.join() + -class ThreadingExceptionTests(unittest.TestCase): +class ThreadingExceptionTests(BaseTestCase): # A RuntimeError should be raised if Thread.start() is called # multiple times. def test_start_thread_again(self): @@ -650,29 +742,6 @@ class ThreadingExceptionTests(unittest.TestCase): thread.start() self.assertRaises(RuntimeError, setattr, thread, "daemon", True) - -class LockTests(lock_tests.LockTests): - locktype = staticmethod(threading.Lock) - -class RLockTests(lock_tests.RLockTests): - locktype = staticmethod(threading.RLock) - -class EventTests(lock_tests.EventTests): - eventtype = staticmethod(threading.Event) - -class ConditionAsRLockTests(lock_tests.RLockTests): - # An Condition uses an RLock by default and exports its API. - locktype = staticmethod(threading.Condition) - -class ConditionTests(lock_tests.ConditionTests): - condtype = staticmethod(threading.Condition) - -class SemaphoreTests(lock_tests.SemaphoreTests): - semtype = staticmethod(threading.Semaphore) - -class BoundedSemaphoreTests(lock_tests.BoundedSemaphoreTests): - semtype = staticmethod(threading.BoundedSemaphore) - @unittest.skipUnless(sys.platform == 'darwin', 'test macosx problem') def test_recursion_limit(self): # Issue 9670 @@ -699,19 +768,49 @@ class BoundedSemaphoreTests(lock_tests.BoundedSemaphoreTests): """ expected_output = "end of main thread\n" p = subprocess.Popen([sys.executable, "-c", script], - stdout=subprocess.PIPE) + stdout=subprocess.PIPE, stderr=subprocess.PIPE) stdout, stderr = p.communicate() data = stdout.decode().replace('\r', '') - self.assertEqual(p.returncode, 0, "Unexpected error") + self.assertEqual(p.returncode, 0, "Unexpected error: " + stderr.decode()) self.assertEqual(data, expected_output) +class LockTests(lock_tests.LockTests): + locktype = staticmethod(threading.Lock) + +class PyRLockTests(lock_tests.RLockTests): + locktype = staticmethod(threading._PyRLock) + +@unittest.skipIf(threading._CRLock is None, 'RLock not implemented in C') +class CRLockTests(lock_tests.RLockTests): + locktype = staticmethod(threading._CRLock) + +class EventTests(lock_tests.EventTests): + eventtype = staticmethod(threading.Event) + +class ConditionAsRLockTests(lock_tests.RLockTests): + # An Condition uses an RLock by default and exports its API. + locktype = staticmethod(threading.Condition) + +class ConditionTests(lock_tests.ConditionTests): + condtype = staticmethod(threading.Condition) + +class SemaphoreTests(lock_tests.SemaphoreTests): + semtype = staticmethod(threading.Semaphore) + +class BoundedSemaphoreTests(lock_tests.BoundedSemaphoreTests): + semtype = staticmethod(threading.BoundedSemaphore) + +class BarrierTests(lock_tests.BarrierTests): + barriertype = staticmethod(threading.Barrier) + def test_main(): - test.support.run_unittest(LockTests, RLockTests, EventTests, + test.support.run_unittest(LockTests, PyRLockTests, CRLockTests, EventTests, ConditionAsRLockTests, ConditionTests, SemaphoreTests, BoundedSemaphoreTests, ThreadTests, ThreadJoinOnShutdown, ThreadingExceptionTests, + BarrierTests ) if __name__ == "__main__": |