diff options
Diffstat (limited to 'Lib/test/test_threading.py')
| -rw-r--r-- | Lib/test/test_threading.py | 223 |
1 files changed, 213 insertions, 10 deletions
diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py index fef3314..58b0b4e 100644 --- a/Lib/test/test_threading.py +++ b/Lib/test/test_threading.py @@ -11,6 +11,7 @@ import re import sys _thread = import_module('_thread') threading = import_module('threading') +import _testcapi import time import unittest import weakref @@ -20,6 +21,15 @@ import subprocess from test import lock_tests + +# Between fork() and exec(), only async-safe functions are allowed (issues +# #12316 and #11870), and fork() from a worker thread is known to trigger +# problems with some operating systems (issue #3863): skip problematic tests +# on platforms known to behave badly. +platforms_to_skip = ('freebsd4', 'freebsd5', 'freebsd6', 'netbsd5', + 'hp-ux11') + + # A trivial mutable counter. class Counter(object): def __init__(self): @@ -99,7 +109,7 @@ class ThreadTests(BaseTestCase): if verbose: print('waiting for all tasks to complete') for t in threads: - t.join(NUMTASKS) + t.join() self.assertTrue(not t.is_alive()) self.assertNotEqual(t.ident, 0) self.assertFalse(t.ident is None) @@ -467,15 +477,104 @@ class ThreadTests(BaseTestCase): pid, status = os.waitpid(pid, 0) self.assertEqual(0, status) + def test_main_thread(self): + main = threading.main_thread() + self.assertEqual(main.name, 'MainThread') + self.assertEqual(main.ident, threading.current_thread().ident) + self.assertEqual(main.ident, threading.get_ident()) -class ThreadJoinOnShutdown(BaseTestCase): + def f(): + self.assertNotEqual(threading.main_thread().ident, + threading.current_thread().ident) + th = threading.Thread(target=f) + th.start() + th.join() + + @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()") + @unittest.skipUnless(hasattr(os, 'waitpid'), "test needs os.waitpid()") + def test_main_thread_after_fork(self): + code = """if 1: + import os, threading - # Between fork() and exec(), only async-safe functions are allowed (issues - # #12316 and #11870), and fork() from a worker thread is known to trigger - # problems with some operating systems (issue #3863): skip problematic tests - # on platforms known to behave badly. - platforms_to_skip = ('freebsd4', 'freebsd5', 'freebsd6', 'netbsd5', - 'os2emx', 'hp-ux11') + pid = os.fork() + if pid == 0: + main = threading.main_thread() + print(main.name) + print(main.ident == threading.current_thread().ident) + print(main.ident == threading.get_ident()) + else: + os.waitpid(pid, 0) + """ + _, out, err = assert_python_ok("-c", code) + data = out.decode().replace('\r', '') + self.assertEqual(err, b"") + self.assertEqual(data, "MainThread\nTrue\nTrue\n") + + @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug") + @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()") + @unittest.skipUnless(hasattr(os, 'waitpid'), "test needs os.waitpid()") + def test_main_thread_after_fork_from_nonmain_thread(self): + code = """if 1: + import os, threading, sys + + def f(): + pid = os.fork() + if pid == 0: + main = threading.main_thread() + print(main.name) + print(main.ident == threading.current_thread().ident) + print(main.ident == threading.get_ident()) + # stdout is fully buffered because not a tty, + # we have to flush before exit. + sys.stdout.flush() + else: + os.waitpid(pid, 0) + + th = threading.Thread(target=f) + th.start() + th.join() + """ + _, out, err = assert_python_ok("-c", code) + data = out.decode().replace('\r', '') + self.assertEqual(err, b"") + self.assertEqual(data, "Thread-1\nTrue\nTrue\n") + + def test_tstate_lock(self): + # Test an implementation detail of Thread objects. + started = _thread.allocate_lock() + finish = _thread.allocate_lock() + started.acquire() + finish.acquire() + def f(): + started.release() + finish.acquire() + time.sleep(0.01) + # The tstate lock is None until the thread is started + t = threading.Thread(target=f) + self.assertIs(t._tstate_lock, None) + t.start() + started.acquire() + self.assertTrue(t.is_alive()) + # The tstate lock can't be acquired when the thread is running + # (or suspended). + tstate_lock = t._tstate_lock + self.assertFalse(tstate_lock.acquire(timeout=0), False) + finish.release() + # When the thread ends, the state_lock can be successfully + # acquired. + self.assertTrue(tstate_lock.acquire(timeout=5), False) + # But is_alive() is still True: we hold _tstate_lock now, which + # prevents is_alive() from knowing the thread's end-of-life C code + # is done. + self.assertTrue(t.is_alive()) + # Let is_alive() find out the C code is done. + tstate_lock.release() + self.assertFalse(t.is_alive()) + # And verify the thread disposed of _tstate_lock. + self.assertTrue(t._tstate_lock is None) + + +class ThreadJoinOnShutdown(BaseTestCase): def _run_and_join(self, script): script = """if 1: @@ -604,7 +703,7 @@ class ThreadJoinOnShutdown(BaseTestCase): # someone else tries to fix this test case by acquiring this lock # before forking instead of resetting it, the test case will # deadlock when it shouldn't. - condition = w._block + condition = w._stopped._cond orig_acquire = condition.acquire call_count_lock = threading.Lock() call_count = 0 @@ -668,7 +767,7 @@ class ThreadJoinOnShutdown(BaseTestCase): # causes the worker to fork. At this point, the problematic waiter # lock has been acquired once by the waiter and has been put onto # the waiters list. - condition = w._block + condition = w._stopped._cond orig_release_save = condition._release_save def my_release_save(): global start_fork @@ -751,6 +850,110 @@ class ThreadJoinOnShutdown(BaseTestCase): for t in threads: t.join() + @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()") + def test_clear_threads_states_after_fork(self): + # Issue #17094: check that threads states are cleared after fork() + + # start a bunch of threads + threads = [] + for i in range(16): + t = threading.Thread(target=lambda : time.sleep(0.3)) + threads.append(t) + t.start() + + pid = os.fork() + if pid == 0: + # check that threads states have been cleared + if len(sys._current_frames()) == 1: + os._exit(0) + else: + os._exit(1) + else: + _, status = os.waitpid(pid, 0) + self.assertEqual(0, status) + + for t in threads: + t.join() + + +class SubinterpThreadingTests(BaseTestCase): + + def test_threads_join(self): + # Non-daemon threads should be joined at subinterpreter shutdown + # (issue #18808) + r, w = os.pipe() + self.addCleanup(os.close, r) + self.addCleanup(os.close, w) + code = r"""if 1: + import os + import threading + import time + + def f(): + # Sleep a bit so that the thread is still running when + # Py_EndInterpreter is called. + time.sleep(0.05) + os.write(%d, b"x") + threading.Thread(target=f).start() + """ % (w,) + ret = _testcapi.run_in_subinterp(code) + self.assertEqual(ret, 0) + # The thread was joined properly. + self.assertEqual(os.read(r, 1), b"x") + + def test_threads_join_2(self): + # Same as above, but a delay gets introduced after the thread's + # Python code returned but before the thread state is deleted. + # To achieve this, we register a thread-local object which sleeps + # a bit when deallocated. + r, w = os.pipe() + self.addCleanup(os.close, r) + self.addCleanup(os.close, w) + code = r"""if 1: + import os + import threading + import time + + class Sleeper: + def __del__(self): + time.sleep(0.05) + + tls = threading.local() + + def f(): + # Sleep a bit so that the thread is still running when + # Py_EndInterpreter is called. + time.sleep(0.05) + tls.x = Sleeper() + os.write(%d, b"x") + threading.Thread(target=f).start() + """ % (w,) + ret = _testcapi.run_in_subinterp(code) + self.assertEqual(ret, 0) + # The thread was joined properly. + self.assertEqual(os.read(r, 1), b"x") + + def test_daemon_threads_fatal_error(self): + subinterp_code = r"""if 1: + import os + import threading + import time + + def f(): + # Make sure the daemon thread is still running when + # Py_EndInterpreter is called. + time.sleep(10) + threading.Thread(target=f, daemon=True).start() + """ + script = r"""if 1: + import _testcapi + + _testcapi.run_in_subinterp(%r) + """ % (subinterp_code,) + rc, out, err = assert_python_failure("-c", script) + self.assertIn("Fatal Python error: Py_EndInterpreter: " + "not the last thread", err.decode()) + class ThreadingExceptionTests(BaseTestCase): # A RuntimeError should be raised if Thread.start() is called |
