summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_threading.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_threading.py')
-rw-r--r--Lib/test/test_threading.py500
1 files changed, 319 insertions, 181 deletions
diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py
index fee48a3..73839e7 100644
--- a/Lib/test/test_threading.py
+++ b/Lib/test/test_threading.py
@@ -11,19 +11,25 @@ import re
import sys
_thread = import_module('_thread')
threading = import_module('threading')
+import _testcapi
import time
import unittest
import weakref
import os
from test.script_helper import assert_python_ok, assert_python_failure
import subprocess
-try:
- import _testcapi
-except ImportError:
- _testcapi = None
from test import lock_tests
+
+# Between fork() and exec(), only async-safe functions are allowed (issues
+# #12316 and #11870), and fork() from a worker thread is known to trigger
+# problems with some operating systems (issue #3863): skip problematic tests
+# on platforms known to behave badly.
+platforms_to_skip = ('freebsd4', 'freebsd5', 'freebsd6', 'netbsd5',
+ 'hp-ux11')
+
+
# A trivial mutable counter.
class Counter(object):
def __init__(self):
@@ -103,7 +109,7 @@ class ThreadTests(BaseTestCase):
if verbose:
print('waiting for all tasks to complete')
for t in threads:
- t.join(NUMTASKS)
+ t.join()
self.assertTrue(not t.is_alive())
self.assertNotEqual(t.ident, 0)
self.assertFalse(t.ident is None)
@@ -471,6 +477,127 @@ class ThreadTests(BaseTestCase):
pid, status = os.waitpid(pid, 0)
self.assertEqual(0, status)
+ def test_main_thread(self):
+ main = threading.main_thread()
+ self.assertEqual(main.name, 'MainThread')
+ self.assertEqual(main.ident, threading.current_thread().ident)
+ self.assertEqual(main.ident, threading.get_ident())
+
+ def f():
+ self.assertNotEqual(threading.main_thread().ident,
+ threading.current_thread().ident)
+ th = threading.Thread(target=f)
+ th.start()
+ th.join()
+
+ @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()")
+ @unittest.skipUnless(hasattr(os, 'waitpid'), "test needs os.waitpid()")
+ def test_main_thread_after_fork(self):
+ code = """if 1:
+ import os, threading
+
+ pid = os.fork()
+ if pid == 0:
+ main = threading.main_thread()
+ print(main.name)
+ print(main.ident == threading.current_thread().ident)
+ print(main.ident == threading.get_ident())
+ else:
+ os.waitpid(pid, 0)
+ """
+ _, out, err = assert_python_ok("-c", code)
+ data = out.decode().replace('\r', '')
+ self.assertEqual(err, b"")
+ self.assertEqual(data, "MainThread\nTrue\nTrue\n")
+
+ @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
+ @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()")
+ @unittest.skipUnless(hasattr(os, 'waitpid'), "test needs os.waitpid()")
+ def test_main_thread_after_fork_from_nonmain_thread(self):
+ code = """if 1:
+ import os, threading, sys
+
+ def f():
+ pid = os.fork()
+ if pid == 0:
+ main = threading.main_thread()
+ print(main.name)
+ print(main.ident == threading.current_thread().ident)
+ print(main.ident == threading.get_ident())
+ # stdout is fully buffered because not a tty,
+ # we have to flush before exit.
+ sys.stdout.flush()
+ else:
+ os.waitpid(pid, 0)
+
+ th = threading.Thread(target=f)
+ th.start()
+ th.join()
+ """
+ _, out, err = assert_python_ok("-c", code)
+ data = out.decode().replace('\r', '')
+ self.assertEqual(err, b"")
+ self.assertEqual(data, "Thread-1\nTrue\nTrue\n")
+
+ def test_tstate_lock(self):
+ # Test an implementation detail of Thread objects.
+ started = _thread.allocate_lock()
+ finish = _thread.allocate_lock()
+ started.acquire()
+ finish.acquire()
+ def f():
+ started.release()
+ finish.acquire()
+ time.sleep(0.01)
+ # The tstate lock is None until the thread is started
+ t = threading.Thread(target=f)
+ self.assertIs(t._tstate_lock, None)
+ t.start()
+ started.acquire()
+ self.assertTrue(t.is_alive())
+ # The tstate lock can't be acquired when the thread is running
+ # (or suspended).
+ tstate_lock = t._tstate_lock
+ self.assertFalse(tstate_lock.acquire(timeout=0), False)
+ finish.release()
+ # When the thread ends, the state_lock can be successfully
+ # acquired.
+ self.assertTrue(tstate_lock.acquire(timeout=5), False)
+ # But is_alive() is still True: we hold _tstate_lock now, which
+ # prevents is_alive() from knowing the thread's end-of-life C code
+ # is done.
+ self.assertTrue(t.is_alive())
+ # Let is_alive() find out the C code is done.
+ tstate_lock.release()
+ self.assertFalse(t.is_alive())
+ # And verify the thread disposed of _tstate_lock.
+ self.assertTrue(t._tstate_lock is None)
+
+ def test_repr_stopped(self):
+ # Verify that "stopped" shows up in repr(Thread) appropriately.
+ started = _thread.allocate_lock()
+ finish = _thread.allocate_lock()
+ started.acquire()
+ finish.acquire()
+ def f():
+ started.release()
+ finish.acquire()
+ t = threading.Thread(target=f)
+ t.start()
+ started.acquire()
+ self.assertIn("started", repr(t))
+ finish.release()
+ # "stopped" should appear in the repr in a reasonable amount of time.
+ # Implementation detail: as of this writing, that's trivially true
+ # if .join() is called, and almost trivially true if .is_alive() is
+ # called. The detail we're testing here is that "stopped" shows up
+ # "all on its own".
+ LOOKING_FOR = "stopped"
+ for i in range(500):
+ if LOOKING_FOR in repr(t):
+ break
+ time.sleep(0.01)
+ self.assertIn(LOOKING_FOR, repr(t)) # we waited at least 5 seconds
def test_BoundedSemaphore_limit(self):
# BoundedSemaphore should raise ValueError if released too often.
@@ -490,14 +617,91 @@ class ThreadTests(BaseTestCase):
t.join()
self.assertRaises(ValueError, bs.release)
-class ThreadJoinOnShutdown(BaseTestCase):
+ def test_locals_at_exit(self):
+ # Issue #19466: thread locals must not be deleted before destructors
+ # are called
+ rc, out, err = assert_python_ok("-c", """if 1:
+ import threading
+
+ class Atexit:
+ def __del__(self):
+ print("thread_dict.atexit = %r" % thread_dict.atexit)
+
+ thread_dict = threading.local()
+ thread_dict.atexit = "atexit"
+
+ atexit = Atexit()
+ """)
+ self.assertEqual(out.rstrip(), b"thread_dict.atexit = 'atexit'")
+
+ def test_warnings_at_exit(self):
+ # Issue #19466: try to call most destructors at Python shutdown before
+ # destroying Python thread states
+ filename = __file__
+ rc, out, err = assert_python_ok("-Wd", "-c", """if 1:
+ import time
+ import threading
+
+ def open_sleep():
+ # a warning will be emitted when the open file will be
+ # destroyed (without being explicitly closed) while the daemon
+ # thread is destroyed
+ fileobj = open(%a, 'rb')
+ start_event.set()
+ time.sleep(60.0)
+
+ start_event = threading.Event()
+
+ thread = threading.Thread(target=open_sleep)
+ thread.daemon = True
+ thread.start()
+
+ # wait until the thread started
+ start_event.wait()
+ """ % filename)
+ self.assertRegex(err.rstrip(),
+ b"^sys:1: ResourceWarning: unclosed file ")
+
+ def test_frame_tstate_tracing(self):
+ # Issue #14432: Crash when a generator is created in a C thread that is
+ # destroyed while the generator is still used. The issue was that a
+ # generator contains a frame, and the frame kept a reference to the
+ # Python state of the destroyed C thread. The crash occurs when a trace
+ # function is setup.
+
+ def noop_trace(frame, event, arg):
+ # no operation
+ return noop_trace
+
+ def generator():
+ while 1:
+ yield "genereator"
+
+ def callback():
+ if callback.gen is None:
+ callback.gen = generator()
+ return next(callback.gen)
+ callback.gen = None
+
+ old_trace = sys.gettrace()
+ sys.settrace(noop_trace)
+ try:
+ # Install a trace function
+ threading.settrace(noop_trace)
+
+ # Create a generator in a C thread which exits after the call
+ _testcapi.call_in_temporary_c_thread(callback)
+
+ # Call the generator in a different Python thread, check that the
+ # generator didn't keep a reference to the destroyed thread state
+ for test in range(3):
+ # The trace function is still called here
+ callback()
+ finally:
+ sys.settrace(old_trace)
- # Between fork() and exec(), only async-safe functions are allowed (issues
- # #12316 and #11870), and fork() from a worker thread is known to trigger
- # problems with some operating systems (issue #3863): skip problematic tests
- # on platforms known to behave badly.
- platforms_to_skip = ('freebsd4', 'freebsd5', 'freebsd6', 'netbsd5',
- 'os2emx', 'hp-ux11')
+
+class ThreadJoinOnShutdown(BaseTestCase):
def _run_and_join(self, script):
script = """if 1:
@@ -570,144 +774,8 @@ class ThreadJoinOnShutdown(BaseTestCase):
"""
self._run_and_join(script)
- def assertScriptHasOutput(self, script, expected_output):
- rc, out, err = assert_python_ok("-c", script)
- data = out.decode().replace('\r', '')
- self.assertEqual(data, expected_output)
-
- @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
@unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
- def test_4_joining_across_fork_in_worker_thread(self):
- # There used to be a possible deadlock when forking from a child
- # thread. See http://bugs.python.org/issue6643.
-
- # The script takes the following steps:
- # - The main thread in the parent process starts a new thread and then
- # tries to join it.
- # - The join operation acquires the Lock inside the thread's _block
- # Condition. (See threading.py:Thread.join().)
- # - We stub out the acquire method on the condition to force it to wait
- # until the child thread forks. (See LOCK ACQUIRED HERE)
- # - The child thread forks. (See LOCK HELD and WORKER THREAD FORKS
- # HERE)
- # - The main thread of the parent process enters Condition.wait(),
- # which releases the lock on the child thread.
- # - The child process returns. Without the necessary fix, when the
- # main thread of the child process (which used to be the child thread
- # in the parent process) attempts to exit, it will try to acquire the
- # lock in the Thread._block Condition object and hang, because the
- # lock was held across the fork.
-
- script = """if 1:
- import os, time, threading
-
- finish_join = False
- start_fork = False
-
- def worker():
- # Wait until this thread's lock is acquired before forking to
- # create the deadlock.
- global finish_join
- while not start_fork:
- time.sleep(0.01)
- # LOCK HELD: Main thread holds lock across this call.
- childpid = os.fork()
- finish_join = True
- if childpid != 0:
- # Parent process just waits for child.
- os.waitpid(childpid, 0)
- # Child process should just return.
-
- w = threading.Thread(target=worker)
-
- # Stub out the private condition variable's lock acquire method.
- # This acquires the lock and then waits until the child has forked
- # before returning, which will release the lock soon after. If
- # someone else tries to fix this test case by acquiring this lock
- # before forking instead of resetting it, the test case will
- # deadlock when it shouldn't.
- condition = w._block
- orig_acquire = condition.acquire
- call_count_lock = threading.Lock()
- call_count = 0
- def my_acquire():
- global call_count
- global start_fork
- orig_acquire() # LOCK ACQUIRED HERE
- start_fork = True
- if call_count == 0:
- while not finish_join:
- time.sleep(0.01) # WORKER THREAD FORKS HERE
- with call_count_lock:
- call_count += 1
- condition.acquire = my_acquire
-
- w.start()
- w.join()
- print('end of main')
- """
- self.assertScriptHasOutput(script, "end of main\n")
-
- @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
- @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
- def test_5_clear_waiter_locks_to_avoid_crash(self):
- # Check that a spawned thread that forks doesn't segfault on certain
- # platforms, namely OS X. This used to happen if there was a waiter
- # lock in the thread's condition variable's waiters list. Even though
- # we know the lock will be held across the fork, it is not safe to
- # release locks held across forks on all platforms, so releasing the
- # waiter lock caused a segfault on OS X. Furthermore, since locks on
- # OS X are (as of this writing) implemented with a mutex + condition
- # variable instead of a semaphore, while we know that the Python-level
- # lock will be acquired, we can't know if the internal mutex will be
- # acquired at the time of the fork.
-
- script = """if True:
- import os, time, threading
-
- start_fork = False
-
- def worker():
- # Wait until the main thread has attempted to join this thread
- # before continuing.
- while not start_fork:
- time.sleep(0.01)
- childpid = os.fork()
- if childpid != 0:
- # Parent process just waits for child.
- (cpid, rc) = os.waitpid(childpid, 0)
- assert cpid == childpid
- assert rc == 0
- print('end of worker thread')
- else:
- # Child process should just return.
- pass
-
- w = threading.Thread(target=worker)
-
- # Stub out the private condition variable's _release_save method.
- # This releases the condition's lock and flips the global that
- # causes the worker to fork. At this point, the problematic waiter
- # lock has been acquired once by the waiter and has been put onto
- # the waiters list.
- condition = w._block
- orig_release_save = condition._release_save
- def my_release_save():
- global start_fork
- orig_release_save()
- # Waiter lock held here, condition lock released.
- start_fork = True
- condition._release_save = my_release_save
-
- w.start()
- w.join()
- print('end of main thread')
- """
- output = "end of worker thread\nend of main thread\n"
- self.assertScriptHasOutput(script, output)
-
- @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
- def test_6_daemon_threads(self):
+ def test_4_daemon_threads(self):
# Check that a daemon thread cannot crash the interpreter on shutdown
# by manipulating internal structures that are being disposed of in
# the main thread.
@@ -717,6 +785,10 @@ class ThreadJoinOnShutdown(BaseTestCase):
import sys
import time
import threading
+ import warnings
+
+ # ignore "unclosed file ..." warnings
+ warnings.filterwarnings('ignore', '', ResourceWarning)
thread_has_run = set()
@@ -773,44 +845,110 @@ class ThreadJoinOnShutdown(BaseTestCase):
for t in threads:
t.join()
- @unittest.skipIf(_testcapi is None, "need _testcapi module")
- def test_frame_tstate_tracing(self):
- # Issue #14432: Crash when a generator is created in a C thread that is
- # destroyed while the generator is still used. The issue was that a
- # generator contains a frame, and the frame kept a reference to the
- # Python state of the destroyed C thread. The crash occurs when a trace
- # function is setup.
+ @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
+ def test_clear_threads_states_after_fork(self):
+ # Issue #17094: check that threads states are cleared after fork()
- def noop_trace(frame, event, arg):
- # no operation
- return noop_trace
+ # start a bunch of threads
+ threads = []
+ for i in range(16):
+ t = threading.Thread(target=lambda : time.sleep(0.3))
+ threads.append(t)
+ t.start()
- def generator():
- while 1:
- yield "genereator"
+ pid = os.fork()
+ if pid == 0:
+ # check that threads states have been cleared
+ if len(sys._current_frames()) == 1:
+ os._exit(0)
+ else:
+ os._exit(1)
+ else:
+ _, status = os.waitpid(pid, 0)
+ self.assertEqual(0, status)
- def callback():
- if callback.gen is None:
- callback.gen = generator()
- return next(callback.gen)
- callback.gen = None
+ for t in threads:
+ t.join()
- old_trace = sys.gettrace()
- sys.settrace(noop_trace)
- try:
- # Install a trace function
- threading.settrace(noop_trace)
- # Create a generator in a C thread which exits after the call
- _testcapi.call_in_temporary_c_thread(callback)
+class SubinterpThreadingTests(BaseTestCase):
- # Call the generator in a different Python thread, check that the
- # generator didn't keep a reference to the destroyed thread state
- for test in range(3):
- # The trace function is still called here
- callback()
- finally:
- sys.settrace(old_trace)
+ def test_threads_join(self):
+ # Non-daemon threads should be joined at subinterpreter shutdown
+ # (issue #18808)
+ r, w = os.pipe()
+ self.addCleanup(os.close, r)
+ self.addCleanup(os.close, w)
+ code = r"""if 1:
+ import os
+ import threading
+ import time
+
+ def f():
+ # Sleep a bit so that the thread is still running when
+ # Py_EndInterpreter is called.
+ time.sleep(0.05)
+ os.write(%d, b"x")
+ threading.Thread(target=f).start()
+ """ % (w,)
+ ret = test.support.run_in_subinterp(code)
+ self.assertEqual(ret, 0)
+ # The thread was joined properly.
+ self.assertEqual(os.read(r, 1), b"x")
+
+ def test_threads_join_2(self):
+ # Same as above, but a delay gets introduced after the thread's
+ # Python code returned but before the thread state is deleted.
+ # To achieve this, we register a thread-local object which sleeps
+ # a bit when deallocated.
+ r, w = os.pipe()
+ self.addCleanup(os.close, r)
+ self.addCleanup(os.close, w)
+ code = r"""if 1:
+ import os
+ import threading
+ import time
+
+ class Sleeper:
+ def __del__(self):
+ time.sleep(0.05)
+
+ tls = threading.local()
+
+ def f():
+ # Sleep a bit so that the thread is still running when
+ # Py_EndInterpreter is called.
+ time.sleep(0.05)
+ tls.x = Sleeper()
+ os.write(%d, b"x")
+ threading.Thread(target=f).start()
+ """ % (w,)
+ ret = test.support.run_in_subinterp(code)
+ self.assertEqual(ret, 0)
+ # The thread was joined properly.
+ self.assertEqual(os.read(r, 1), b"x")
+
+ def test_daemon_threads_fatal_error(self):
+ subinterp_code = r"""if 1:
+ import os
+ import threading
+ import time
+
+ def f():
+ # Make sure the daemon thread is still running when
+ # Py_EndInterpreter is called.
+ time.sleep(10)
+ threading.Thread(target=f, daemon=True).start()
+ """
+ script = r"""if 1:
+ import _testcapi
+
+ _testcapi.run_in_subinterp(%r)
+ """ % (subinterp_code,)
+ with test.support.SuppressCrashReport():
+ rc, out, err = assert_python_failure("-c", script)
+ self.assertIn("Fatal Python error: Py_EndInterpreter: "
+ "not the last thread", err.decode())
class ThreadingExceptionTests(BaseTestCase):