summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_threading.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_threading.py')
-rw-r--r--Lib/test/test_threading.py333
1 files changed, 216 insertions, 117 deletions
diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py
index 63ef7b9..17be84b 100644
--- a/Lib/test/test_threading.py
+++ b/Lib/test/test_threading.py
@@ -1,16 +1,19 @@
# Very rudimentary test of threading module
import test.support
-from test.support import verbose
-import os
+from test.support import verbose, strip_python_stderr, import_module
+from test.script_helper import assert_python_ok
+
import random
import re
import sys
-import threading
-import _thread
+_thread = import_module('_thread')
+threading = import_module('threading')
import time
import unittest
import weakref
+import os
+from test.script_helper import assert_python_ok, assert_python_failure
import subprocess
from test import lock_tests
@@ -59,7 +62,16 @@ class TestThread(threading.Thread):
(self.name, self.nrunning.get()))
-class ThreadTests(unittest.TestCase):
+class BaseTestCase(unittest.TestCase):
+ def setUp(self):
+ self._threads = test.support.threading_setup()
+
+ def tearDown(self):
+ test.support.threading_cleanup(*self._threads)
+ test.support.reap_children()
+
+
+class ThreadTests(BaseTestCase):
# Create a bunch of threads, let each do some work, wait until all are
# done.
@@ -89,7 +101,8 @@ class ThreadTests(unittest.TestCase):
self.assertTrue(not t.is_alive())
self.assertNotEqual(t.ident, 0)
self.assertFalse(t.ident is None)
- self.assertTrue(re.match('<TestThread\(.*, \w+ -?\d+\)>', repr(t)))
+ self.assertTrue(re.match('<TestThread\(.*, stopped -?\d+\)>',
+ repr(t)))
if verbose:
print('all tasks done')
self.assertEqual(numrunning.get(), 0)
@@ -115,9 +128,8 @@ class ThreadTests(unittest.TestCase):
try:
threading.stack_size(262144)
except _thread.error:
- if verbose:
- print('platform does not support changing thread stack size')
- return
+ raise unittest.SkipTest(
+ 'platform does not support changing thread stack size')
self.test_various_ops()
threading.stack_size(0)
@@ -128,9 +140,8 @@ class ThreadTests(unittest.TestCase):
try:
threading.stack_size(0x100000)
except _thread.error:
- if verbose:
- print('platform does not support changing thread stack size')
- return
+ raise unittest.SkipTest(
+ 'platform does not support changing thread stack size')
self.test_various_ops()
threading.stack_size(0)
@@ -147,20 +158,14 @@ class ThreadTests(unittest.TestCase):
tid = _thread.start_new_thread(f, (mutex,))
# Wait for the thread to finish.
mutex.acquire()
- self.assertTrue(tid in threading._active)
- self.assertTrue(isinstance(threading._active[tid],
- threading._DummyThread))
+ self.assertIn(tid, threading._active)
+ self.assertIsInstance(threading._active[tid], threading._DummyThread)
del threading._active[tid]
# PyThreadState_SetAsyncExc() is a CPython-only gimmick, not (currently)
# exposed at the Python level. This test relies on ctypes to get at it.
def test_PyThreadState_SetAsyncExc(self):
- try:
- import ctypes
- except ImportError:
- if verbose:
- print("test_PyThreadState_SetAsyncExc can't import ctypes")
- return # can't do anything
+ ctypes = import_module("ctypes")
set_async_exc = ctypes.pythonapi.PyThreadState_SetAsyncExc
@@ -169,6 +174,27 @@ class ThreadTests(unittest.TestCase):
exception = ctypes.py_object(AsyncExc)
+ # First check it works when setting the exception from the same thread.
+ tid = _thread.get_ident()
+
+ try:
+ result = set_async_exc(ctypes.c_long(tid), exception)
+ # The exception is async, so we might have to keep the VM busy until
+ # it notices.
+ while True:
+ pass
+ except AsyncExc:
+ pass
+ else:
+ # This code is unreachable but it reflects the intent. If we wanted
+ # to be smarter the above loop wouldn't be infinite.
+ self.fail("AsyncExc not raised")
+ try:
+ self.assertEqual(result, 1) # one thread state modified
+ except UnboundLocalError:
+ # The exception was raised too quickly for us to get the result.
+ pass
+
# `worker_started` is set by the thread when it's inside a try/except
# block waiting to catch the asynchronously set AsyncExc exception.
# `worker_saw_exception` is set by the thread upon catching that
@@ -242,14 +268,9 @@ class ThreadTests(unittest.TestCase):
# Issue 1402: the PyGILState_Ensure / _Release functions may be called
# very late on python exit: on deallocation of a running thread for
# example.
- try:
- import ctypes
- except ImportError:
- if verbose:
- print("test_finalize_with_runnning_thread can't import ctypes")
- return # can't do anything
+ import_module("ctypes")
- rc = subprocess.call([sys.executable, "-c", """if 1:
+ rc, out, err = assert_python_failure("-c", """if 1:
import ctypes, sys, time, _thread
# This lock is used as a simple event variable.
@@ -273,13 +294,13 @@ class ThreadTests(unittest.TestCase):
_thread.start_new_thread(waitingThread, ())
ready.acquire() # Be sure the other thread is waiting.
sys.exit(42)
- """])
+ """)
self.assertEqual(rc, 42)
def test_finalize_with_trace(self):
# Issue1733757
# Avoid a deadlock when sys.settrace steps into threading._shutdown
- p = subprocess.Popen([sys.executable, "-c", """if 1:
+ assert_python_ok("-c", """if 1:
import sys, threading
# A deadlock-killer, to prevent the
@@ -299,19 +320,12 @@ class ThreadTests(unittest.TestCase):
return func
sys.settrace(func)
- """],
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- stdout, stderr = p.communicate()
- rc = p.returncode
- self.assertFalse(rc == 2, "interpreted was blocked")
- self.assertTrue(rc == 0,
- "Unexpected error: " + ascii(stderr))
+ """)
def test_join_nondaemon_on_shutdown(self):
# Issue 1722344
# Raising SystemExit skipped threading._shutdown
- p = subprocess.Popen([sys.executable, "-c", """if 1:
+ rc, out, err = assert_python_ok("-c", """if 1:
import threading
from time import sleep
@@ -323,33 +337,27 @@ class ThreadTests(unittest.TestCase):
threading.Thread(target=child).start()
raise SystemExit
- """],
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- stdout, stderr = p.communicate()
- self.assertEqual(stdout.strip(),
+ """)
+ self.assertEqual(out.strip(),
b"Woke up, sleep function is: <built-in function sleep>")
- stderr = re.sub(br"^\[\d+ refs\]", b"", stderr, re.MULTILINE).strip()
- self.assertEqual(stderr, b"")
+ self.assertEqual(err, b"")
def test_enumerate_after_join(self):
# Try hard to trigger #1703448: a thread is still returned in
# threading.enumerate() after it has been join()ed.
enum = threading.enumerate
- old_interval = sys.getcheckinterval()
+ old_interval = sys.getswitchinterval()
try:
for i in range(1, 100):
- # Try a couple times at each thread-switching interval
- # to get more interleavings.
- sys.setcheckinterval(i // 5)
+ sys.setswitchinterval(i * 0.0002)
t = threading.Thread(target=lambda: None)
t.start()
t.join()
l = enum()
- self.assertFalse(t in l,
+ self.assertNotIn(t, l,
"#1703448 triggered after %d trials: %s" % (i, l))
finally:
- sys.setcheckinterval(old_interval)
+ sys.setswitchinterval(old_interval)
def test_no_refcycle_through_target(self):
class RunSelfFunction(object):
@@ -370,7 +378,7 @@ class ThreadTests(unittest.TestCase):
weak_cyclic_object = weakref.ref(cyclic_object)
cyclic_object.thread.join()
del cyclic_object
- self.assertEqual(None, weak_cyclic_object(),
+ self.assertIsNone(weak_cyclic_object(),
msg=('%d references still around' %
sys.getrefcount(weak_cyclic_object())))
@@ -378,7 +386,7 @@ class ThreadTests(unittest.TestCase):
weak_raising_cyclic_object = weakref.ref(raising_cyclic_object)
raising_cyclic_object.thread.join()
del raising_cyclic_object
- self.assertEqual(None, weak_raising_cyclic_object(),
+ self.assertIsNone(weak_raising_cyclic_object(),
msg=('%d references still around' %
sys.getrefcount(weak_raising_cyclic_object())))
@@ -395,8 +403,48 @@ class ThreadTests(unittest.TestCase):
e.isSet()
threading.activeCount()
-
-class ThreadJoinOnShutdown(unittest.TestCase):
+ def test_repr_daemon(self):
+ t = threading.Thread()
+ self.assertFalse('daemon' in repr(t))
+ t.daemon = True
+ self.assertTrue('daemon' in repr(t))
+
+ @unittest.skipUnless(hasattr(os, 'fork'), 'test needs fork()')
+ def test_dummy_thread_after_fork(self):
+ # Issue #14308: a dummy thread in the active list doesn't mess up
+ # the after-fork mechanism.
+ code = """if 1:
+ import _thread, threading, os, time
+
+ def background_thread(evt):
+ # Creates and registers the _DummyThread instance
+ threading.current_thread()
+ evt.set()
+ time.sleep(10)
+
+ evt = threading.Event()
+ _thread.start_new_thread(background_thread, (evt,))
+ evt.wait()
+ assert threading.active_count() == 2, threading.active_count()
+ if os.fork() == 0:
+ assert threading.active_count() == 1, threading.active_count()
+ os._exit(0)
+ else:
+ os.wait()
+ """
+ _, out, err = assert_python_ok("-c", code)
+ self.assertEqual(out, b'')
+ self.assertEqual(err, b'')
+
+
+class ThreadJoinOnShutdown(BaseTestCase):
+
+ # Between fork() and exec(), only async-safe functions are allowed (issues
+ # #12316 and #11870), and fork() from a worker thread is known to trigger
+ # problems with some operating systems (issue #3863): skip problematic tests
+ # on platforms known to behave badly.
+ platforms_to_skip = ('freebsd4', 'freebsd5', 'freebsd6', 'netbsd5',
+ 'os2emx')
def _run_and_join(self, script):
script = """if 1:
@@ -411,13 +459,9 @@ class ThreadJoinOnShutdown(unittest.TestCase):
sys.stdout.flush()
\n""" + script
- p = subprocess.Popen([sys.executable, "-c", script], stdout=subprocess.PIPE)
- rc = p.wait()
- data = p.stdout.read().decode().replace('\r', '')
- p.stdout.close()
+ rc, out, err = assert_python_ok("-c", script)
+ data = out.decode().replace('\r', '')
self.assertEqual(data, "end of main\nend of thread\n")
- self.assertFalse(rc == 2, "interpreter was blocked")
- self.assertTrue(rc == 0, "Unexpected error")
def test_1_join_on_shutdown(self):
# The usual case: on exit, wait for a non-daemon thread
@@ -431,12 +475,10 @@ class ThreadJoinOnShutdown(unittest.TestCase):
"""
self._run_and_join(script)
-
+ @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
+ @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
def test_2_join_in_forked_process(self):
# Like the test above, but from a forked interpreter
- import os
- if not hasattr(os, 'fork'):
- return
script = """if 1:
childpid = os.fork()
if childpid != 0:
@@ -450,19 +492,12 @@ class ThreadJoinOnShutdown(unittest.TestCase):
"""
self._run_and_join(script)
+ @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
+ @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
def test_3_join_in_forked_from_thread(self):
# Like the test above, but fork() was called from a worker thread
# In the forked process, the main Thread object must be marked as stopped.
- import os
- if not hasattr(os, 'fork'):
- return
- # Skip platforms with known problems forking from a worker thread.
- # See http://bugs.python.org/issue3863.
- if sys.platform in ('freebsd4', 'freebsd5', 'freebsd6', 'netbsd5',
- 'os2emx'):
- print('Skipping test_3_join_in_forked_from_thread'
- ' due to known OS bugs on', sys.platform, file=sys.stderr)
- return
+
script = """if 1:
main_thread = threading.current_thread()
def worker():
@@ -483,23 +518,16 @@ class ThreadJoinOnShutdown(unittest.TestCase):
self._run_and_join(script)
def assertScriptHasOutput(self, script, expected_output):
- p = subprocess.Popen([sys.executable, "-c", script],
- stdout=subprocess.PIPE)
- rc = p.wait()
- data = p.stdout.read().decode().replace('\r', '')
- self.assertEqual(rc, 0, "Unexpected error")
+ rc, out, err = assert_python_ok("-c", script)
+ data = out.decode().replace('\r', '')
self.assertEqual(data, expected_output)
@unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
+ @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
def test_4_joining_across_fork_in_worker_thread(self):
# There used to be a possible deadlock when forking from a child
# thread. See http://bugs.python.org/issue6643.
- # Skip platforms with known problems forking from a worker thread.
- # See http://bugs.python.org/issue3863.
- if sys.platform in ('freebsd4', 'freebsd5', 'freebsd6', 'os2emx'):
- raise unittest.SkipTest('due to known OS bugs on ' + sys.platform)
-
# The script takes the following steps:
# - The main thread in the parent process starts a new thread and then
# tries to join it.
@@ -568,6 +596,7 @@ class ThreadJoinOnShutdown(unittest.TestCase):
self.assertScriptHasOutput(script, "end of main\n")
@unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
+ @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
def test_5_clear_waiter_locks_to_avoid_crash(self):
# Check that a spawned thread that forks doesn't segfault on certain
# platforms, namely OS X. This used to happen if there was a waiter
@@ -580,10 +609,6 @@ class ThreadJoinOnShutdown(unittest.TestCase):
# lock will be acquired, we can't know if the internal mutex will be
# acquired at the time of the fork.
- # Skip platforms with known problems forking from a worker thread.
- # See http://bugs.python.org/issue3863.
- if sys.platform in ('freebsd4', 'freebsd5', 'freebsd6', 'os2emx'):
- raise unittest.SkipTest('due to known OS bugs on ' + sys.platform)
script = """if True:
import os, time, threading
@@ -628,8 +653,75 @@ class ThreadJoinOnShutdown(unittest.TestCase):
output = "end of worker thread\nend of main thread\n"
self.assertScriptHasOutput(script, output)
+ @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
+ def test_6_daemon_threads(self):
+ # Check that a daemon thread cannot crash the interpreter on shutdown
+ # by manipulating internal structures that are being disposed of in
+ # the main thread.
+ script = """if True:
+ import os
+ import random
+ import sys
+ import time
+ import threading
+
+ thread_has_run = set()
+
+ def random_io():
+ '''Loop for a while sleeping random tiny amounts and doing some I/O.'''
+ while True:
+ in_f = open(os.__file__, 'rb')
+ stuff = in_f.read(200)
+ null_f = open(os.devnull, 'wb')
+ null_f.write(stuff)
+ time.sleep(random.random() / 1995)
+ null_f.close()
+ in_f.close()
+ thread_has_run.add(threading.current_thread())
+
+ def main():
+ count = 0
+ for _ in range(40):
+ new_thread = threading.Thread(target=random_io)
+ new_thread.daemon = True
+ new_thread.start()
+ count += 1
+ while len(thread_has_run) < count:
+ time.sleep(0.001)
+ # Trigger process shutdown
+ sys.exit(0)
+
+ main()
+ """
+ rc, out, err = assert_python_ok('-c', script)
+ self.assertFalse(err)
+
+ @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
+ @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
+ def test_reinit_tls_after_fork(self):
+ # Issue #13817: fork() would deadlock in a multithreaded program with
+ # the ad-hoc TLS implementation.
+
+ def do_fork_and_wait():
+ # just fork a child process and wait it
+ pid = os.fork()
+ if pid > 0:
+ os.waitpid(pid, 0)
+ else:
+ os._exit(0)
+
+ # start a bunch of threads that will fork() child processes
+ threads = []
+ for i in range(16):
+ t = threading.Thread(target=do_fork_and_wait)
+ threads.append(t)
+ t.start()
+
+ for t in threads:
+ t.join()
+
-class ThreadingExceptionTests(unittest.TestCase):
+class ThreadingExceptionTests(BaseTestCase):
# A RuntimeError should be raised if Thread.start() is called
# multiple times.
def test_start_thread_again(self):
@@ -650,29 +742,6 @@ class ThreadingExceptionTests(unittest.TestCase):
thread.start()
self.assertRaises(RuntimeError, setattr, thread, "daemon", True)
-
-class LockTests(lock_tests.LockTests):
- locktype = staticmethod(threading.Lock)
-
-class RLockTests(lock_tests.RLockTests):
- locktype = staticmethod(threading.RLock)
-
-class EventTests(lock_tests.EventTests):
- eventtype = staticmethod(threading.Event)
-
-class ConditionAsRLockTests(lock_tests.RLockTests):
- # An Condition uses an RLock by default and exports its API.
- locktype = staticmethod(threading.Condition)
-
-class ConditionTests(lock_tests.ConditionTests):
- condtype = staticmethod(threading.Condition)
-
-class SemaphoreTests(lock_tests.SemaphoreTests):
- semtype = staticmethod(threading.Semaphore)
-
-class BoundedSemaphoreTests(lock_tests.BoundedSemaphoreTests):
- semtype = staticmethod(threading.BoundedSemaphore)
-
@unittest.skipUnless(sys.platform == 'darwin', 'test macosx problem')
def test_recursion_limit(self):
# Issue 9670
@@ -699,19 +768,49 @@ class BoundedSemaphoreTests(lock_tests.BoundedSemaphoreTests):
"""
expected_output = "end of main thread\n"
p = subprocess.Popen([sys.executable, "-c", script],
- stdout=subprocess.PIPE)
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = p.communicate()
data = stdout.decode().replace('\r', '')
- self.assertEqual(p.returncode, 0, "Unexpected error")
+ self.assertEqual(p.returncode, 0, "Unexpected error: " + stderr.decode())
self.assertEqual(data, expected_output)
+class LockTests(lock_tests.LockTests):
+ locktype = staticmethod(threading.Lock)
+
+class PyRLockTests(lock_tests.RLockTests):
+ locktype = staticmethod(threading._PyRLock)
+
+@unittest.skipIf(threading._CRLock is None, 'RLock not implemented in C')
+class CRLockTests(lock_tests.RLockTests):
+ locktype = staticmethod(threading._CRLock)
+
+class EventTests(lock_tests.EventTests):
+ eventtype = staticmethod(threading.Event)
+
+class ConditionAsRLockTests(lock_tests.RLockTests):
+ # An Condition uses an RLock by default and exports its API.
+ locktype = staticmethod(threading.Condition)
+
+class ConditionTests(lock_tests.ConditionTests):
+ condtype = staticmethod(threading.Condition)
+
+class SemaphoreTests(lock_tests.SemaphoreTests):
+ semtype = staticmethod(threading.Semaphore)
+
+class BoundedSemaphoreTests(lock_tests.BoundedSemaphoreTests):
+ semtype = staticmethod(threading.BoundedSemaphore)
+
+class BarrierTests(lock_tests.BarrierTests):
+ barriertype = staticmethod(threading.Barrier)
+
def test_main():
- test.support.run_unittest(LockTests, RLockTests, EventTests,
+ test.support.run_unittest(LockTests, PyRLockTests, CRLockTests, EventTests,
ConditionAsRLockTests, ConditionTests,
SemaphoreTests, BoundedSemaphoreTests,
ThreadTests,
ThreadJoinOnShutdown,
ThreadingExceptionTests,
+ BarrierTests
)
if __name__ == "__main__":