diff options
Diffstat (limited to 'Lib/test/test_threading.py')
-rw-r--r-- | Lib/test/test_threading.py | 500 |
1 files changed, 319 insertions, 181 deletions
diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py index fee48a3..73839e7 100644 --- a/Lib/test/test_threading.py +++ b/Lib/test/test_threading.py @@ -11,19 +11,25 @@ import re import sys _thread = import_module('_thread') threading = import_module('threading') +import _testcapi import time import unittest import weakref import os from test.script_helper import assert_python_ok, assert_python_failure import subprocess -try: - import _testcapi -except ImportError: - _testcapi = None from test import lock_tests + +# Between fork() and exec(), only async-safe functions are allowed (issues +# #12316 and #11870), and fork() from a worker thread is known to trigger +# problems with some operating systems (issue #3863): skip problematic tests +# on platforms known to behave badly. +platforms_to_skip = ('freebsd4', 'freebsd5', 'freebsd6', 'netbsd5', + 'hp-ux11') + + # A trivial mutable counter. class Counter(object): def __init__(self): @@ -103,7 +109,7 @@ class ThreadTests(BaseTestCase): if verbose: print('waiting for all tasks to complete') for t in threads: - t.join(NUMTASKS) + t.join() self.assertTrue(not t.is_alive()) self.assertNotEqual(t.ident, 0) self.assertFalse(t.ident is None) @@ -471,6 +477,127 @@ class ThreadTests(BaseTestCase): pid, status = os.waitpid(pid, 0) self.assertEqual(0, status) + def test_main_thread(self): + main = threading.main_thread() + self.assertEqual(main.name, 'MainThread') + self.assertEqual(main.ident, threading.current_thread().ident) + self.assertEqual(main.ident, threading.get_ident()) + + def f(): + self.assertNotEqual(threading.main_thread().ident, + threading.current_thread().ident) + th = threading.Thread(target=f) + th.start() + th.join() + + @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()") + @unittest.skipUnless(hasattr(os, 'waitpid'), "test needs os.waitpid()") + def test_main_thread_after_fork(self): + code = """if 1: + import os, threading + + pid = os.fork() + if pid == 0: + main = threading.main_thread() + print(main.name) + print(main.ident == threading.current_thread().ident) + print(main.ident == threading.get_ident()) + else: + os.waitpid(pid, 0) + """ + _, out, err = assert_python_ok("-c", code) + data = out.decode().replace('\r', '') + self.assertEqual(err, b"") + self.assertEqual(data, "MainThread\nTrue\nTrue\n") + + @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug") + @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()") + @unittest.skipUnless(hasattr(os, 'waitpid'), "test needs os.waitpid()") + def test_main_thread_after_fork_from_nonmain_thread(self): + code = """if 1: + import os, threading, sys + + def f(): + pid = os.fork() + if pid == 0: + main = threading.main_thread() + print(main.name) + print(main.ident == threading.current_thread().ident) + print(main.ident == threading.get_ident()) + # stdout is fully buffered because not a tty, + # we have to flush before exit. + sys.stdout.flush() + else: + os.waitpid(pid, 0) + + th = threading.Thread(target=f) + th.start() + th.join() + """ + _, out, err = assert_python_ok("-c", code) + data = out.decode().replace('\r', '') + self.assertEqual(err, b"") + self.assertEqual(data, "Thread-1\nTrue\nTrue\n") + + def test_tstate_lock(self): + # Test an implementation detail of Thread objects. + started = _thread.allocate_lock() + finish = _thread.allocate_lock() + started.acquire() + finish.acquire() + def f(): + started.release() + finish.acquire() + time.sleep(0.01) + # The tstate lock is None until the thread is started + t = threading.Thread(target=f) + self.assertIs(t._tstate_lock, None) + t.start() + started.acquire() + self.assertTrue(t.is_alive()) + # The tstate lock can't be acquired when the thread is running + # (or suspended). + tstate_lock = t._tstate_lock + self.assertFalse(tstate_lock.acquire(timeout=0), False) + finish.release() + # When the thread ends, the state_lock can be successfully + # acquired. + self.assertTrue(tstate_lock.acquire(timeout=5), False) + # But is_alive() is still True: we hold _tstate_lock now, which + # prevents is_alive() from knowing the thread's end-of-life C code + # is done. + self.assertTrue(t.is_alive()) + # Let is_alive() find out the C code is done. + tstate_lock.release() + self.assertFalse(t.is_alive()) + # And verify the thread disposed of _tstate_lock. + self.assertTrue(t._tstate_lock is None) + + def test_repr_stopped(self): + # Verify that "stopped" shows up in repr(Thread) appropriately. + started = _thread.allocate_lock() + finish = _thread.allocate_lock() + started.acquire() + finish.acquire() + def f(): + started.release() + finish.acquire() + t = threading.Thread(target=f) + t.start() + started.acquire() + self.assertIn("started", repr(t)) + finish.release() + # "stopped" should appear in the repr in a reasonable amount of time. + # Implementation detail: as of this writing, that's trivially true + # if .join() is called, and almost trivially true if .is_alive() is + # called. The detail we're testing here is that "stopped" shows up + # "all on its own". + LOOKING_FOR = "stopped" + for i in range(500): + if LOOKING_FOR in repr(t): + break + time.sleep(0.01) + self.assertIn(LOOKING_FOR, repr(t)) # we waited at least 5 seconds def test_BoundedSemaphore_limit(self): # BoundedSemaphore should raise ValueError if released too often. @@ -490,14 +617,91 @@ class ThreadTests(BaseTestCase): t.join() self.assertRaises(ValueError, bs.release) -class ThreadJoinOnShutdown(BaseTestCase): + def test_locals_at_exit(self): + # Issue #19466: thread locals must not be deleted before destructors + # are called + rc, out, err = assert_python_ok("-c", """if 1: + import threading + + class Atexit: + def __del__(self): + print("thread_dict.atexit = %r" % thread_dict.atexit) + + thread_dict = threading.local() + thread_dict.atexit = "atexit" + + atexit = Atexit() + """) + self.assertEqual(out.rstrip(), b"thread_dict.atexit = 'atexit'") + + def test_warnings_at_exit(self): + # Issue #19466: try to call most destructors at Python shutdown before + # destroying Python thread states + filename = __file__ + rc, out, err = assert_python_ok("-Wd", "-c", """if 1: + import time + import threading + + def open_sleep(): + # a warning will be emitted when the open file will be + # destroyed (without being explicitly closed) while the daemon + # thread is destroyed + fileobj = open(%a, 'rb') + start_event.set() + time.sleep(60.0) + + start_event = threading.Event() + + thread = threading.Thread(target=open_sleep) + thread.daemon = True + thread.start() + + # wait until the thread started + start_event.wait() + """ % filename) + self.assertRegex(err.rstrip(), + b"^sys:1: ResourceWarning: unclosed file ") + + def test_frame_tstate_tracing(self): + # Issue #14432: Crash when a generator is created in a C thread that is + # destroyed while the generator is still used. The issue was that a + # generator contains a frame, and the frame kept a reference to the + # Python state of the destroyed C thread. The crash occurs when a trace + # function is setup. + + def noop_trace(frame, event, arg): + # no operation + return noop_trace + + def generator(): + while 1: + yield "genereator" + + def callback(): + if callback.gen is None: + callback.gen = generator() + return next(callback.gen) + callback.gen = None + + old_trace = sys.gettrace() + sys.settrace(noop_trace) + try: + # Install a trace function + threading.settrace(noop_trace) + + # Create a generator in a C thread which exits after the call + _testcapi.call_in_temporary_c_thread(callback) + + # Call the generator in a different Python thread, check that the + # generator didn't keep a reference to the destroyed thread state + for test in range(3): + # The trace function is still called here + callback() + finally: + sys.settrace(old_trace) - # Between fork() and exec(), only async-safe functions are allowed (issues - # #12316 and #11870), and fork() from a worker thread is known to trigger - # problems with some operating systems (issue #3863): skip problematic tests - # on platforms known to behave badly. - platforms_to_skip = ('freebsd4', 'freebsd5', 'freebsd6', 'netbsd5', - 'os2emx', 'hp-ux11') + +class ThreadJoinOnShutdown(BaseTestCase): def _run_and_join(self, script): script = """if 1: @@ -570,144 +774,8 @@ class ThreadJoinOnShutdown(BaseTestCase): """ self._run_and_join(script) - def assertScriptHasOutput(self, script, expected_output): - rc, out, err = assert_python_ok("-c", script) - data = out.decode().replace('\r', '') - self.assertEqual(data, expected_output) - - @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()") @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug") - def test_4_joining_across_fork_in_worker_thread(self): - # There used to be a possible deadlock when forking from a child - # thread. See http://bugs.python.org/issue6643. - - # The script takes the following steps: - # - The main thread in the parent process starts a new thread and then - # tries to join it. - # - The join operation acquires the Lock inside the thread's _block - # Condition. (See threading.py:Thread.join().) - # - We stub out the acquire method on the condition to force it to wait - # until the child thread forks. (See LOCK ACQUIRED HERE) - # - The child thread forks. (See LOCK HELD and WORKER THREAD FORKS - # HERE) - # - The main thread of the parent process enters Condition.wait(), - # which releases the lock on the child thread. - # - The child process returns. Without the necessary fix, when the - # main thread of the child process (which used to be the child thread - # in the parent process) attempts to exit, it will try to acquire the - # lock in the Thread._block Condition object and hang, because the - # lock was held across the fork. - - script = """if 1: - import os, time, threading - - finish_join = False - start_fork = False - - def worker(): - # Wait until this thread's lock is acquired before forking to - # create the deadlock. - global finish_join - while not start_fork: - time.sleep(0.01) - # LOCK HELD: Main thread holds lock across this call. - childpid = os.fork() - finish_join = True - if childpid != 0: - # Parent process just waits for child. - os.waitpid(childpid, 0) - # Child process should just return. - - w = threading.Thread(target=worker) - - # Stub out the private condition variable's lock acquire method. - # This acquires the lock and then waits until the child has forked - # before returning, which will release the lock soon after. If - # someone else tries to fix this test case by acquiring this lock - # before forking instead of resetting it, the test case will - # deadlock when it shouldn't. - condition = w._block - orig_acquire = condition.acquire - call_count_lock = threading.Lock() - call_count = 0 - def my_acquire(): - global call_count - global start_fork - orig_acquire() # LOCK ACQUIRED HERE - start_fork = True - if call_count == 0: - while not finish_join: - time.sleep(0.01) # WORKER THREAD FORKS HERE - with call_count_lock: - call_count += 1 - condition.acquire = my_acquire - - w.start() - w.join() - print('end of main') - """ - self.assertScriptHasOutput(script, "end of main\n") - - @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()") - @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug") - def test_5_clear_waiter_locks_to_avoid_crash(self): - # Check that a spawned thread that forks doesn't segfault on certain - # platforms, namely OS X. This used to happen if there was a waiter - # lock in the thread's condition variable's waiters list. Even though - # we know the lock will be held across the fork, it is not safe to - # release locks held across forks on all platforms, so releasing the - # waiter lock caused a segfault on OS X. Furthermore, since locks on - # OS X are (as of this writing) implemented with a mutex + condition - # variable instead of a semaphore, while we know that the Python-level - # lock will be acquired, we can't know if the internal mutex will be - # acquired at the time of the fork. - - script = """if True: - import os, time, threading - - start_fork = False - - def worker(): - # Wait until the main thread has attempted to join this thread - # before continuing. - while not start_fork: - time.sleep(0.01) - childpid = os.fork() - if childpid != 0: - # Parent process just waits for child. - (cpid, rc) = os.waitpid(childpid, 0) - assert cpid == childpid - assert rc == 0 - print('end of worker thread') - else: - # Child process should just return. - pass - - w = threading.Thread(target=worker) - - # Stub out the private condition variable's _release_save method. - # This releases the condition's lock and flips the global that - # causes the worker to fork. At this point, the problematic waiter - # lock has been acquired once by the waiter and has been put onto - # the waiters list. - condition = w._block - orig_release_save = condition._release_save - def my_release_save(): - global start_fork - orig_release_save() - # Waiter lock held here, condition lock released. - start_fork = True - condition._release_save = my_release_save - - w.start() - w.join() - print('end of main thread') - """ - output = "end of worker thread\nend of main thread\n" - self.assertScriptHasOutput(script, output) - - @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug") - def test_6_daemon_threads(self): + def test_4_daemon_threads(self): # Check that a daemon thread cannot crash the interpreter on shutdown # by manipulating internal structures that are being disposed of in # the main thread. @@ -717,6 +785,10 @@ class ThreadJoinOnShutdown(BaseTestCase): import sys import time import threading + import warnings + + # ignore "unclosed file ..." warnings + warnings.filterwarnings('ignore', '', ResourceWarning) thread_has_run = set() @@ -773,44 +845,110 @@ class ThreadJoinOnShutdown(BaseTestCase): for t in threads: t.join() - @unittest.skipIf(_testcapi is None, "need _testcapi module") - def test_frame_tstate_tracing(self): - # Issue #14432: Crash when a generator is created in a C thread that is - # destroyed while the generator is still used. The issue was that a - # generator contains a frame, and the frame kept a reference to the - # Python state of the destroyed C thread. The crash occurs when a trace - # function is setup. + @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()") + def test_clear_threads_states_after_fork(self): + # Issue #17094: check that threads states are cleared after fork() - def noop_trace(frame, event, arg): - # no operation - return noop_trace + # start a bunch of threads + threads = [] + for i in range(16): + t = threading.Thread(target=lambda : time.sleep(0.3)) + threads.append(t) + t.start() - def generator(): - while 1: - yield "genereator" + pid = os.fork() + if pid == 0: + # check that threads states have been cleared + if len(sys._current_frames()) == 1: + os._exit(0) + else: + os._exit(1) + else: + _, status = os.waitpid(pid, 0) + self.assertEqual(0, status) - def callback(): - if callback.gen is None: - callback.gen = generator() - return next(callback.gen) - callback.gen = None + for t in threads: + t.join() - old_trace = sys.gettrace() - sys.settrace(noop_trace) - try: - # Install a trace function - threading.settrace(noop_trace) - # Create a generator in a C thread which exits after the call - _testcapi.call_in_temporary_c_thread(callback) +class SubinterpThreadingTests(BaseTestCase): - # Call the generator in a different Python thread, check that the - # generator didn't keep a reference to the destroyed thread state - for test in range(3): - # The trace function is still called here - callback() - finally: - sys.settrace(old_trace) + def test_threads_join(self): + # Non-daemon threads should be joined at subinterpreter shutdown + # (issue #18808) + r, w = os.pipe() + self.addCleanup(os.close, r) + self.addCleanup(os.close, w) + code = r"""if 1: + import os + import threading + import time + + def f(): + # Sleep a bit so that the thread is still running when + # Py_EndInterpreter is called. + time.sleep(0.05) + os.write(%d, b"x") + threading.Thread(target=f).start() + """ % (w,) + ret = test.support.run_in_subinterp(code) + self.assertEqual(ret, 0) + # The thread was joined properly. + self.assertEqual(os.read(r, 1), b"x") + + def test_threads_join_2(self): + # Same as above, but a delay gets introduced after the thread's + # Python code returned but before the thread state is deleted. + # To achieve this, we register a thread-local object which sleeps + # a bit when deallocated. + r, w = os.pipe() + self.addCleanup(os.close, r) + self.addCleanup(os.close, w) + code = r"""if 1: + import os + import threading + import time + + class Sleeper: + def __del__(self): + time.sleep(0.05) + + tls = threading.local() + + def f(): + # Sleep a bit so that the thread is still running when + # Py_EndInterpreter is called. + time.sleep(0.05) + tls.x = Sleeper() + os.write(%d, b"x") + threading.Thread(target=f).start() + """ % (w,) + ret = test.support.run_in_subinterp(code) + self.assertEqual(ret, 0) + # The thread was joined properly. + self.assertEqual(os.read(r, 1), b"x") + + def test_daemon_threads_fatal_error(self): + subinterp_code = r"""if 1: + import os + import threading + import time + + def f(): + # Make sure the daemon thread is still running when + # Py_EndInterpreter is called. + time.sleep(10) + threading.Thread(target=f, daemon=True).start() + """ + script = r"""if 1: + import _testcapi + + _testcapi.run_in_subinterp(%r) + """ % (subinterp_code,) + with test.support.SuppressCrashReport(): + rc, out, err = assert_python_failure("-c", script) + self.assertIn("Fatal Python error: Py_EndInterpreter: " + "not the last thread", err.decode()) class ThreadingExceptionTests(BaseTestCase): |