Diffstat (limited to 'Lib/test/test_threading.py')
-rw-r--r--  Lib/test/test_threading.py  288
1 file changed, 252 insertions(+), 36 deletions(-)
diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py
index 054df7b..946c1d2 100644
--- a/Lib/test/test_threading.py
+++ b/Lib/test/test_threading.py
@@ -5,11 +5,13 @@ from test.test_support import verbose
import random
import re
import sys
-import threading
-import thread
+thread = test.test_support.import_module('thread')
+threading = test.test_support.import_module('threading')
import time
import unittest
import weakref
+import os
+import subprocess
from test import lock_tests
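
The import change above swaps plain imports for test.test_support.import_module, so an interpreter built without thread support reports this file as skipped instead of failing with ImportError at import time. Roughly, the helper behaves like the following approximation (the real one lives in Lib/test/test_support.py and may differ in detail):

    import importlib
    import unittest

    def import_module(name):
        # Approximation of test.test_support.import_module: turn a failed
        # import into SkipTest so the runner records a skip, not an error.
        try:
            return importlib.import_module(name)
        except ImportError, exc:
            raise unittest.SkipTest(str(exc))
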
@@ -43,7 +45,7 @@ class TestThread(threading.Thread):
self.nrunning.inc()
if verbose:
print self.nrunning.get(), 'tasks are running'
- self.testcase.assert_(self.nrunning.get() <= 3)
+ self.testcase.assertTrue(self.nrunning.get() <= 3)
time.sleep(delay)
if verbose:
@@ -51,12 +53,21 @@ class TestThread(threading.Thread):
with self.mutex:
self.nrunning.dec()
- self.testcase.assert_(self.nrunning.get() >= 0)
+ self.testcase.assertTrue(self.nrunning.get() >= 0)
if verbose:
print '%s is finished. %d tasks are running' % (
self.name, self.nrunning.get())
-class ThreadTests(unittest.TestCase):
+class BaseTestCase(unittest.TestCase):
+ def setUp(self):
+ self._threads = test.test_support.threading_setup()
+
+ def tearDown(self):
+ test.test_support.threading_cleanup(*self._threads)
+ test.test_support.reap_children()
+
+
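
The new BaseTestCase leans on two test.test_support helpers: threading_setup() snapshots how many threads are alive before the test body, and threading_cleanup() waits for any stragglers afterwards so one test cannot leak threads into the next; reap_children() does the same for forked child processes. A hedged approximation of that contract (the real helpers live in Lib/test/test_support.py):

    import time
    import thread

    def threading_setup():
        # Snapshot the live thread count before the test body runs.
        return (thread._count(),)

    def threading_cleanup(original_count):
        # Give threads leaked by the test a grace period to finish so a
        # dangling thread cannot poison the following test case.
        deadline = time.time() + 10.0
        while thread._count() > original_count and time.time() < deadline:
            time.sleep(0.01)
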
+class ThreadTests(BaseTestCase):
# Create a bunch of threads, let each do some work, wait until all are
# done.
@@ -75,18 +86,18 @@ class ThreadTests(unittest.TestCase):
for i in range(NUMTASKS):
t = TestThread("<thread %d>"%i, self, sema, mutex, numrunning)
threads.append(t)
- self.failUnlessEqual(t.ident, None)
- self.assert_(re.match('<TestThread\(.*, initial\)>', repr(t)))
+ self.assertEqual(t.ident, None)
+ self.assertTrue(re.match('<TestThread\(.*, initial\)>', repr(t)))
t.start()
if verbose:
print 'waiting for all tasks to complete'
for t in threads:
t.join(NUMTASKS)
- self.assert_(not t.is_alive())
- self.failIfEqual(t.ident, 0)
+ self.assertTrue(not t.is_alive())
+ self.assertNotEqual(t.ident, 0)
self.assertFalse(t.ident is None)
- self.assert_(re.match('<TestThread\(.*, \w+ -?\d+\)>', repr(t)))
+ self.assertTrue(re.match('<TestThread\(.*, \w+ -?\d+\)>', repr(t)))
if verbose:
print 'all tasks done'
self.assertEqual(numrunning.get(), 0)
@@ -144,9 +155,8 @@ class ThreadTests(unittest.TestCase):
tid = thread.start_new_thread(f, (mutex,))
# Wait for the thread to finish.
mutex.acquire()
- self.assert_(tid in threading._active)
- self.assert_(isinstance(threading._active[tid],
- threading._DummyThread))
+ self.assertIn(tid, threading._active)
+ self.assertIsInstance(threading._active[tid], threading._DummyThread)
del threading._active[tid]
# PyThreadState_SetAsyncExc() is a CPython-only gimmick, not (currently)
@@ -166,6 +176,27 @@ class ThreadTests(unittest.TestCase):
exception = ctypes.py_object(AsyncExc)
+ # First check it works when setting the exception from the same thread.
+ tid = thread.get_ident()
+
+ try:
+ result = set_async_exc(ctypes.c_long(tid), exception)
+ # The exception is async, so we might have to keep the VM busy until
+ # it notices.
+ while True:
+ pass
+ except AsyncExc:
+ pass
+ else:
+ # This code is unreachable but it reflects the intent. If we wanted
+ # to be smarter the above loop wouldn't be infinite.
+ self.fail("AsyncExc not raised")
+ try:
+ self.assertEqual(result, 1) # one thread state modified
+ except UnboundLocalError:
+ # The exception was raised too quickly for us to get the result.
+ pass
+
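
For context, set_async_exc and exception come from setup lines outside this hunk; presumably something close to the sketch below, where PyThreadState_SetAsyncExc is reached through ctypes.pythonapi and schedules an exception to be raised asynchronously in the thread with the given id:

    import ctypes

    class AsyncExc(Exception):
        pass

    # Returns the number of thread states modified: 1 on success, 0 if no
    # thread matched the given id.
    set_async_exc = ctypes.pythonapi.PyThreadState_SetAsyncExc
    exception = ctypes.py_object(AsyncExc)
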
# `worker_started` is set by the thread when it's inside a try/except
# block waiting to catch the asynchronously set AsyncExc exception.
# `worker_saw_exception` is set by the thread upon catching that
@@ -201,10 +232,11 @@ class ThreadTests(unittest.TestCase):
# Now raise an exception in the worker thread.
if verbose:
print " waiting for worker thread to get started"
- worker_started.wait()
+ ret = worker_started.wait()
+ self.assertTrue(ret)
if verbose:
print " verifying worker hasn't exited"
- self.assert_(not t.finished)
+ self.assertTrue(not t.finished)
if verbose:
print " attempting to raise asynch exception in worker"
result = set_async_exc(ctypes.c_long(t.id), exception)
@@ -212,7 +244,7 @@ class ThreadTests(unittest.TestCase):
if verbose:
print " waiting for worker to say it caught the exception"
worker_saw_exception.wait(timeout=10)
- self.assert_(t.finished)
+ self.assertTrue(t.finished)
if verbose:
print " all OK -- joining worker"
if t.finished:
@@ -245,7 +277,6 @@ class ThreadTests(unittest.TestCase):
print("test_finalize_with_runnning_thread can't import ctypes")
return # can't do anything
- import subprocess
rc = subprocess.call([sys.executable, "-c", """if 1:
import ctypes, sys, time, thread
@@ -276,8 +307,7 @@ class ThreadTests(unittest.TestCase):
def test_finalize_with_trace(self):
# Issue1733757
# Avoid a deadlock when sys.settrace steps into threading._shutdown
- import subprocess
- rc = subprocess.call([sys.executable, "-c", """if 1:
+ p = subprocess.Popen([sys.executable, "-c", """if 1:
import sys, threading
# A deadlock-killer, to prevent the
@@ -297,14 +327,20 @@ class ThreadTests(unittest.TestCase):
return func
sys.settrace(func)
- """])
- self.failIf(rc == 2, "interpreted was blocked")
- self.failUnless(rc == 0, "Unexpected error")
+ """],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ self.addCleanup(p.stdout.close)
+ self.addCleanup(p.stderr.close)
+ stdout, stderr = p.communicate()
+ rc = p.returncode
+ self.assertFalse(rc == 2, "interpreted was blocked")
+ self.assertTrue(rc == 0,
+ "Unexpected error: " + repr(stderr))
def test_join_nondaemon_on_shutdown(self):
# Issue 1722344
# Raising SystemExit skipped threading._shutdown
- import subprocess
p = subprocess.Popen([sys.executable, "-c", """if 1:
import threading
from time import sleep
@@ -320,6 +356,8 @@ class ThreadTests(unittest.TestCase):
"""],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
+ self.addCleanup(p.stdout.close)
+ self.addCleanup(p.stderr.close)
stdout, stderr = p.communicate()
self.assertEqual(stdout.strip(),
"Woke up, sleep function is: <built-in function sleep>")
@@ -340,7 +378,7 @@ class ThreadTests(unittest.TestCase):
t.start()
t.join()
l = enum()
- self.assertFalse(t in l,
+ self.assertNotIn(t, l,
"#1703448 triggered after %d trials: %s" % (i, l))
finally:
sys.setcheckinterval(old_interval)
@@ -364,20 +402,20 @@ class ThreadTests(unittest.TestCase):
weak_cyclic_object = weakref.ref(cyclic_object)
cyclic_object.thread.join()
del cyclic_object
- self.assertEquals(None, weak_cyclic_object(),
- msg=('%d references still around' %
- sys.getrefcount(weak_cyclic_object())))
+ self.assertEqual(None, weak_cyclic_object(),
+ msg=('%d references still around' %
+ sys.getrefcount(weak_cyclic_object())))
raising_cyclic_object = RunSelfFunction(should_raise=True)
weak_raising_cyclic_object = weakref.ref(raising_cyclic_object)
raising_cyclic_object.thread.join()
del raising_cyclic_object
- self.assertEquals(None, weak_raising_cyclic_object(),
- msg=('%d references still around' %
- sys.getrefcount(weak_raising_cyclic_object())))
+ self.assertEqual(None, weak_raising_cyclic_object(),
+ msg=('%d references still around' %
+ sys.getrefcount(weak_raising_cyclic_object())))
-class ThreadJoinOnShutdown(unittest.TestCase):
+class ThreadJoinOnShutdown(BaseTestCase):
def _run_and_join(self, script):
script = """if 1:
@@ -389,13 +427,13 @@ class ThreadJoinOnShutdown(unittest.TestCase):
print 'end of thread'
\n""" + script
- import subprocess
p = subprocess.Popen([sys.executable, "-c", script], stdout=subprocess.PIPE)
rc = p.wait()
data = p.stdout.read().replace('\r', '')
+ p.stdout.close()
self.assertEqual(data, "end of main\nend of thread\n")
- self.failIf(rc == 2, "interpreter was blocked")
- self.failUnless(rc == 0, "Unexpected error")
+ self.assertFalse(rc == 2, "interpreter was blocked")
+ self.assertTrue(rc == 0, "Unexpected error")
def test_1_join_on_shutdown(self):
# The usual case: on exit, wait for a non-daemon thread
@@ -436,7 +474,8 @@ class ThreadJoinOnShutdown(unittest.TestCase):
return
# Skip platforms with known problems forking from a worker thread.
# See http://bugs.python.org/issue3863.
- if sys.platform in ('freebsd4', 'freebsd5', 'freebsd6', 'os2emx'):
+ if sys.platform in ('freebsd4', 'freebsd5', 'freebsd6', 'netbsd5',
+ 'os2emx'):
print >>sys.stderr, ('Skipping test_3_join_in_forked_from_thread'
' due to known OS bugs on'), sys.platform
return
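
All of the embedded child scripts in this file share the same "if 1:" framing: wrapping the body in an always-true if lets the triple-quoted source keep a uniform indent and still compile as ordinary top-level code. A toy illustration:

    # The indented block under "if 1:" runs as normal top-level code.
    script = """if 1:
        x = 21 * 2
        print 'x =', x
    """
    exec script   # prints: x = 42
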
@@ -459,8 +498,154 @@ class ThreadJoinOnShutdown(unittest.TestCase):
"""
self._run_and_join(script)
+ def assertScriptHasOutput(self, script, expected_output):
+ p = subprocess.Popen([sys.executable, "-c", script],
+ stdout=subprocess.PIPE)
+ rc = p.wait()
+ data = p.stdout.read().decode().replace('\r', '')
+ self.assertEqual(rc, 0, "Unexpected error")
+ self.assertEqual(data, expected_output)
+
+ @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
+ def test_4_joining_across_fork_in_worker_thread(self):
+ # There used to be a possible deadlock when forking from a child
+ # thread. See http://bugs.python.org/issue6643.
+
+ # Skip platforms with known problems forking from a worker thread.
+ # See http://bugs.python.org/issue3863.
+ if sys.platform in ('freebsd4', 'freebsd5', 'freebsd6', 'os2emx'):
+ raise unittest.SkipTest('due to known OS bugs on ' + sys.platform)
+
+ # The script takes the following steps:
+ # - The main thread in the parent process starts a new thread and then
+ # tries to join it.
+ # - The join operation acquires the Lock inside the thread's _block
+ # Condition. (See threading.py:Thread.join().)
+ # - We stub out the acquire method on the condition to force it to wait
+ # until the child thread forks. (See LOCK ACQUIRED HERE)
+ # - The child thread forks. (See LOCK HELD and WORKER THREAD FORKS
+ # HERE)
+ # - The main thread of the parent process enters Condition.wait(),
+ # which releases the lock on the child thread.
+ # - The child process returns. Without the necessary fix, when the
+ # main thread of the child process (which used to be the child thread
+ # in the parent process) attempts to exit, it will try to acquire the
+ # lock in the Thread._block Condition object and hang, because the
+ # lock was held across the fork.
+
+ script = """if 1:
+ import os, time, threading
+
+ finish_join = False
+ start_fork = False
+
+ def worker():
+ # Wait until this thread's lock is acquired before forking to
+ # create the deadlock.
+ global finish_join
+ while not start_fork:
+ time.sleep(0.01)
+ # LOCK HELD: Main thread holds lock across this call.
+ childpid = os.fork()
+ finish_join = True
+ if childpid != 0:
+ # Parent process just waits for child.
+ os.waitpid(childpid, 0)
+ # Child process should just return.
+
+ w = threading.Thread(target=worker)
+
+ # Stub out the private condition variable's lock acquire method.
+ # This acquires the lock and then waits until the child has forked
+ # before returning, which will release the lock soon after. If
+ # someone else tries to fix this test case by acquiring this lock
+ # before forking instead of resetting it, the test case will
+ # deadlock when it shouldn't.
+ condition = w._block
+ orig_acquire = condition.acquire
+ call_count_lock = threading.Lock()
+ call_count = 0
+ def my_acquire():
+ global call_count
+ global start_fork
+ orig_acquire() # LOCK ACQUIRED HERE
+ start_fork = True
+ if call_count == 0:
+ while not finish_join:
+ time.sleep(0.01) # WORKER THREAD FORKS HERE
+ with call_count_lock:
+ call_count += 1
+ condition.acquire = my_acquire
+
+ w.start()
+ w.join()
+ print('end of main')
+ """
+ self.assertScriptHasOutput(script, "end of main\n")
+
+ @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
+ def test_5_clear_waiter_locks_to_avoid_crash(self):
+ # Check that a spawned thread that forks doesn't segfault on certain
+ # platforms, namely OS X. This used to happen if there was a waiter
+ # lock in the thread's condition variable's waiters list. Even though
+ # we know the lock will be held across the fork, it is not safe to
+ # release locks held across forks on all platforms, so releasing the
+ # waiter lock caused a segfault on OS X. Furthermore, since locks on
+ # OS X are (as of this writing) implemented with a mutex + condition
+ # variable instead of a semaphore, while we know that the Python-level
+ # lock will be acquired, we can't know if the internal mutex will be
+ # acquired at the time of the fork.
+
+ # Skip platforms with known problems forking from a worker thread.
+ # See http://bugs.python.org/issue3863.
+ if sys.platform in ('freebsd4', 'freebsd5', 'freebsd6', 'os2emx'):
+ raise unittest.SkipTest('due to known OS bugs on ' + sys.platform)
+ script = """if True:
+ import os, time, threading
+
+ start_fork = False
+
+ def worker():
+ # Wait until the main thread has attempted to join this thread
+ # before continuing.
+ while not start_fork:
+ time.sleep(0.01)
+ childpid = os.fork()
+ if childpid != 0:
+ # Parent process just waits for child.
+ (cpid, rc) = os.waitpid(childpid, 0)
+ assert cpid == childpid
+ assert rc == 0
+ print('end of worker thread')
+ else:
+ # Child process should just return.
+ pass
+
+ w = threading.Thread(target=worker)
+
+ # Stub out the private condition variable's _release_save method.
+ # This releases the condition's lock and flips the global that
+ # causes the worker to fork. At this point, the problematic waiter
+ # lock has been acquired once by the waiter and has been put onto
+ # the waiters list.
+ condition = w._block
+ orig_release_save = condition._release_save
+ def my_release_save():
+ global start_fork
+ orig_release_save()
+ # Waiter lock held here, condition lock released.
+ start_fork = True
+ condition._release_save = my_release_save
+
+ w.start()
+ w.join()
+ print('end of main thread')
+ """
+ output = "end of worker thread\nend of main thread\n"
+ self.assertScriptHasOutput(script, output)
+
-class ThreadingExceptionTests(unittest.TestCase):
+class ThreadingExceptionTests(BaseTestCase):
# A RuntimeError should be raised if Thread.start() is called
# multiple times.
def test_start_thread_again(self):
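
The comment above names the behaviour under test: a Thread object may only be started once. In isolation it looks like this small sketch (not the test itself):

    import threading

    t = threading.Thread(target=lambda: None)
    t.start()
    t.join()
    try:
        t.start()              # second start() on the same object
    except RuntimeError:
        print 'second start() rejected, as expected'
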
@@ -504,6 +689,37 @@ class SemaphoreTests(lock_tests.SemaphoreTests):
class BoundedSemaphoreTests(lock_tests.BoundedSemaphoreTests):
semtype = staticmethod(threading.BoundedSemaphore)
+ @unittest.skipUnless(sys.platform == 'darwin', 'test macosx problem')
+ def test_recursion_limit(self):
+ # Issue 9670
+ # test that excessive recursion within a non-main thread causes
+ # an exception rather than crashing the interpreter on platforms
+ # like Mac OS X or FreeBSD which have small default stack sizes
+ # for threads
+ script = """if True:
+ import threading
+
+ def recurse():
+ return recurse()
+
+ def outer():
+ try:
+ recurse()
+ except RuntimeError:
+ pass
+
+ w = threading.Thread(target=outer)
+ w.start()
+ w.join()
+ print('end of main thread')
+ """
+ expected_output = "end of main thread\n"
+ p = subprocess.Popen([sys.executable, "-c", script],
+ stdout=subprocess.PIPE)
+ stdout, stderr = p.communicate()
+ data = stdout.decode().replace('\r', '')
+ self.assertEqual(p.returncode, 0, "Unexpected error")
+ self.assertEqual(data, expected_output)
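
A related knob, mentioned only as a hedged aside: on platforms whose default thread stacks are small, threading.stack_size() can request a larger stack before a thread is started (the 4 MiB figure below is an arbitrary example, not something this test sets):

    import threading

    # Applies to threads created after the call; must precede start().
    threading.stack_size(4 * 1024 * 1024)
    t = threading.Thread(target=lambda: None)
    t.start()
    t.join()
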
def test_main():
test.test_support.run_unittest(LockTests, RLockTests, EventTests,