diff options
Diffstat (limited to 'Lib/test/test_threading.py')
| -rw-r--r-- | Lib/test/test_threading.py | 288 |
1 files changed, 252 insertions, 36 deletions
diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py index 054df7b..946c1d2 100644 --- a/Lib/test/test_threading.py +++ b/Lib/test/test_threading.py @@ -5,11 +5,13 @@ from test.test_support import verbose import random import re import sys -import threading -import thread +thread = test.test_support.import_module('thread') +threading = test.test_support.import_module('threading') import time import unittest import weakref +import os +import subprocess from test import lock_tests @@ -43,7 +45,7 @@ class TestThread(threading.Thread): self.nrunning.inc() if verbose: print self.nrunning.get(), 'tasks are running' - self.testcase.assert_(self.nrunning.get() <= 3) + self.testcase.assertTrue(self.nrunning.get() <= 3) time.sleep(delay) if verbose: @@ -51,12 +53,21 @@ class TestThread(threading.Thread): with self.mutex: self.nrunning.dec() - self.testcase.assert_(self.nrunning.get() >= 0) + self.testcase.assertTrue(self.nrunning.get() >= 0) if verbose: print '%s is finished. %d tasks are running' % ( self.name, self.nrunning.get()) -class ThreadTests(unittest.TestCase): +class BaseTestCase(unittest.TestCase): + def setUp(self): + self._threads = test.test_support.threading_setup() + + def tearDown(self): + test.test_support.threading_cleanup(*self._threads) + test.test_support.reap_children() + + +class ThreadTests(BaseTestCase): # Create a bunch of threads, let each do some work, wait until all are # done. @@ -75,18 +86,18 @@ class ThreadTests(unittest.TestCase): for i in range(NUMTASKS): t = TestThread("<thread %d>"%i, self, sema, mutex, numrunning) threads.append(t) - self.failUnlessEqual(t.ident, None) - self.assert_(re.match('<TestThread\(.*, initial\)>', repr(t))) + self.assertEqual(t.ident, None) + self.assertTrue(re.match('<TestThread\(.*, initial\)>', repr(t))) t.start() if verbose: print 'waiting for all tasks to complete' for t in threads: t.join(NUMTASKS) - self.assert_(not t.is_alive()) - self.failIfEqual(t.ident, 0) + self.assertTrue(not t.is_alive()) + self.assertNotEqual(t.ident, 0) self.assertFalse(t.ident is None) - self.assert_(re.match('<TestThread\(.*, \w+ -?\d+\)>', repr(t))) + self.assertTrue(re.match('<TestThread\(.*, \w+ -?\d+\)>', repr(t))) if verbose: print 'all tasks done' self.assertEqual(numrunning.get(), 0) @@ -144,9 +155,8 @@ class ThreadTests(unittest.TestCase): tid = thread.start_new_thread(f, (mutex,)) # Wait for the thread to finish. mutex.acquire() - self.assert_(tid in threading._active) - self.assert_(isinstance(threading._active[tid], - threading._DummyThread)) + self.assertIn(tid, threading._active) + self.assertIsInstance(threading._active[tid], threading._DummyThread) del threading._active[tid] # PyThreadState_SetAsyncExc() is a CPython-only gimmick, not (currently) @@ -166,6 +176,27 @@ class ThreadTests(unittest.TestCase): exception = ctypes.py_object(AsyncExc) + # First check it works when setting the exception from the same thread. + tid = thread.get_ident() + + try: + result = set_async_exc(ctypes.c_long(tid), exception) + # The exception is async, so we might have to keep the VM busy until + # it notices. + while True: + pass + except AsyncExc: + pass + else: + # This code is unreachable but it reflects the intent. If we wanted + # to be smarter the above loop wouldn't be infinite. + self.fail("AsyncExc not raised") + try: + self.assertEqual(result, 1) # one thread state modified + except UnboundLocalError: + # The exception was raised too quickly for us to get the result. + pass + # `worker_started` is set by the thread when it's inside a try/except # block waiting to catch the asynchronously set AsyncExc exception. # `worker_saw_exception` is set by the thread upon catching that @@ -201,10 +232,11 @@ class ThreadTests(unittest.TestCase): # Now raise an exception in the worker thread. if verbose: print " waiting for worker thread to get started" - worker_started.wait() + ret = worker_started.wait() + self.assertTrue(ret) if verbose: print " verifying worker hasn't exited" - self.assert_(not t.finished) + self.assertTrue(not t.finished) if verbose: print " attempting to raise asynch exception in worker" result = set_async_exc(ctypes.c_long(t.id), exception) @@ -212,7 +244,7 @@ class ThreadTests(unittest.TestCase): if verbose: print " waiting for worker to say it caught the exception" worker_saw_exception.wait(timeout=10) - self.assert_(t.finished) + self.assertTrue(t.finished) if verbose: print " all OK -- joining worker" if t.finished: @@ -245,7 +277,6 @@ class ThreadTests(unittest.TestCase): print("test_finalize_with_runnning_thread can't import ctypes") return # can't do anything - import subprocess rc = subprocess.call([sys.executable, "-c", """if 1: import ctypes, sys, time, thread @@ -276,8 +307,7 @@ class ThreadTests(unittest.TestCase): def test_finalize_with_trace(self): # Issue1733757 # Avoid a deadlock when sys.settrace steps into threading._shutdown - import subprocess - rc = subprocess.call([sys.executable, "-c", """if 1: + p = subprocess.Popen([sys.executable, "-c", """if 1: import sys, threading # A deadlock-killer, to prevent the @@ -297,14 +327,20 @@ class ThreadTests(unittest.TestCase): return func sys.settrace(func) - """]) - self.failIf(rc == 2, "interpreted was blocked") - self.failUnless(rc == 0, "Unexpected error") + """], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + self.addCleanup(p.stdout.close) + self.addCleanup(p.stderr.close) + stdout, stderr = p.communicate() + rc = p.returncode + self.assertFalse(rc == 2, "interpreted was blocked") + self.assertTrue(rc == 0, + "Unexpected error: " + repr(stderr)) def test_join_nondaemon_on_shutdown(self): # Issue 1722344 # Raising SystemExit skipped threading._shutdown - import subprocess p = subprocess.Popen([sys.executable, "-c", """if 1: import threading from time import sleep @@ -320,6 +356,8 @@ class ThreadTests(unittest.TestCase): """], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + self.addCleanup(p.stdout.close) + self.addCleanup(p.stderr.close) stdout, stderr = p.communicate() self.assertEqual(stdout.strip(), "Woke up, sleep function is: <built-in function sleep>") @@ -340,7 +378,7 @@ class ThreadTests(unittest.TestCase): t.start() t.join() l = enum() - self.assertFalse(t in l, + self.assertNotIn(t, l, "#1703448 triggered after %d trials: %s" % (i, l)) finally: sys.setcheckinterval(old_interval) @@ -364,20 +402,20 @@ class ThreadTests(unittest.TestCase): weak_cyclic_object = weakref.ref(cyclic_object) cyclic_object.thread.join() del cyclic_object - self.assertEquals(None, weak_cyclic_object(), - msg=('%d references still around' % - sys.getrefcount(weak_cyclic_object()))) + self.assertEqual(None, weak_cyclic_object(), + msg=('%d references still around' % + sys.getrefcount(weak_cyclic_object()))) raising_cyclic_object = RunSelfFunction(should_raise=True) weak_raising_cyclic_object = weakref.ref(raising_cyclic_object) raising_cyclic_object.thread.join() del raising_cyclic_object - self.assertEquals(None, weak_raising_cyclic_object(), - msg=('%d references still around' % - sys.getrefcount(weak_raising_cyclic_object()))) + self.assertEqual(None, weak_raising_cyclic_object(), + msg=('%d references still around' % + sys.getrefcount(weak_raising_cyclic_object()))) -class ThreadJoinOnShutdown(unittest.TestCase): +class ThreadJoinOnShutdown(BaseTestCase): def _run_and_join(self, script): script = """if 1: @@ -389,13 +427,13 @@ class ThreadJoinOnShutdown(unittest.TestCase): print 'end of thread' \n""" + script - import subprocess p = subprocess.Popen([sys.executable, "-c", script], stdout=subprocess.PIPE) rc = p.wait() data = p.stdout.read().replace('\r', '') + p.stdout.close() self.assertEqual(data, "end of main\nend of thread\n") - self.failIf(rc == 2, "interpreter was blocked") - self.failUnless(rc == 0, "Unexpected error") + self.assertFalse(rc == 2, "interpreter was blocked") + self.assertTrue(rc == 0, "Unexpected error") def test_1_join_on_shutdown(self): # The usual case: on exit, wait for a non-daemon thread @@ -436,7 +474,8 @@ class ThreadJoinOnShutdown(unittest.TestCase): return # Skip platforms with known problems forking from a worker thread. # See http://bugs.python.org/issue3863. - if sys.platform in ('freebsd4', 'freebsd5', 'freebsd6', 'os2emx'): + if sys.platform in ('freebsd4', 'freebsd5', 'freebsd6', 'netbsd5', + 'os2emx'): print >>sys.stderr, ('Skipping test_3_join_in_forked_from_thread' ' due to known OS bugs on'), sys.platform return @@ -459,8 +498,154 @@ class ThreadJoinOnShutdown(unittest.TestCase): """ self._run_and_join(script) + def assertScriptHasOutput(self, script, expected_output): + p = subprocess.Popen([sys.executable, "-c", script], + stdout=subprocess.PIPE) + rc = p.wait() + data = p.stdout.read().decode().replace('\r', '') + self.assertEqual(rc, 0, "Unexpected error") + self.assertEqual(data, expected_output) + + @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()") + def test_4_joining_across_fork_in_worker_thread(self): + # There used to be a possible deadlock when forking from a child + # thread. See http://bugs.python.org/issue6643. + + # Skip platforms with known problems forking from a worker thread. + # See http://bugs.python.org/issue3863. + if sys.platform in ('freebsd4', 'freebsd5', 'freebsd6', 'os2emx'): + raise unittest.SkipTest('due to known OS bugs on ' + sys.platform) + + # The script takes the following steps: + # - The main thread in the parent process starts a new thread and then + # tries to join it. + # - The join operation acquires the Lock inside the thread's _block + # Condition. (See threading.py:Thread.join().) + # - We stub out the acquire method on the condition to force it to wait + # until the child thread forks. (See LOCK ACQUIRED HERE) + # - The child thread forks. (See LOCK HELD and WORKER THREAD FORKS + # HERE) + # - The main thread of the parent process enters Condition.wait(), + # which releases the lock on the child thread. + # - The child process returns. Without the necessary fix, when the + # main thread of the child process (which used to be the child thread + # in the parent process) attempts to exit, it will try to acquire the + # lock in the Thread._block Condition object and hang, because the + # lock was held across the fork. + + script = """if 1: + import os, time, threading + + finish_join = False + start_fork = False + + def worker(): + # Wait until this thread's lock is acquired before forking to + # create the deadlock. + global finish_join + while not start_fork: + time.sleep(0.01) + # LOCK HELD: Main thread holds lock across this call. + childpid = os.fork() + finish_join = True + if childpid != 0: + # Parent process just waits for child. + os.waitpid(childpid, 0) + # Child process should just return. + + w = threading.Thread(target=worker) + + # Stub out the private condition variable's lock acquire method. + # This acquires the lock and then waits until the child has forked + # before returning, which will release the lock soon after. If + # someone else tries to fix this test case by acquiring this lock + # before forking instead of resetting it, the test case will + # deadlock when it shouldn't. + condition = w._block + orig_acquire = condition.acquire + call_count_lock = threading.Lock() + call_count = 0 + def my_acquire(): + global call_count + global start_fork + orig_acquire() # LOCK ACQUIRED HERE + start_fork = True + if call_count == 0: + while not finish_join: + time.sleep(0.01) # WORKER THREAD FORKS HERE + with call_count_lock: + call_count += 1 + condition.acquire = my_acquire + + w.start() + w.join() + print('end of main') + """ + self.assertScriptHasOutput(script, "end of main\n") + + @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()") + def test_5_clear_waiter_locks_to_avoid_crash(self): + # Check that a spawned thread that forks doesn't segfault on certain + # platforms, namely OS X. This used to happen if there was a waiter + # lock in the thread's condition variable's waiters list. Even though + # we know the lock will be held across the fork, it is not safe to + # release locks held across forks on all platforms, so releasing the + # waiter lock caused a segfault on OS X. Furthermore, since locks on + # OS X are (as of this writing) implemented with a mutex + condition + # variable instead of a semaphore, while we know that the Python-level + # lock will be acquired, we can't know if the internal mutex will be + # acquired at the time of the fork. + + # Skip platforms with known problems forking from a worker thread. + # See http://bugs.python.org/issue3863. + if sys.platform in ('freebsd4', 'freebsd5', 'freebsd6', 'os2emx'): + raise unittest.SkipTest('due to known OS bugs on ' + sys.platform) + script = """if True: + import os, time, threading + + start_fork = False + + def worker(): + # Wait until the main thread has attempted to join this thread + # before continuing. + while not start_fork: + time.sleep(0.01) + childpid = os.fork() + if childpid != 0: + # Parent process just waits for child. + (cpid, rc) = os.waitpid(childpid, 0) + assert cpid == childpid + assert rc == 0 + print('end of worker thread') + else: + # Child process should just return. + pass + + w = threading.Thread(target=worker) + + # Stub out the private condition variable's _release_save method. + # This releases the condition's lock and flips the global that + # causes the worker to fork. At this point, the problematic waiter + # lock has been acquired once by the waiter and has been put onto + # the waiters list. + condition = w._block + orig_release_save = condition._release_save + def my_release_save(): + global start_fork + orig_release_save() + # Waiter lock held here, condition lock released. + start_fork = True + condition._release_save = my_release_save + + w.start() + w.join() + print('end of main thread') + """ + output = "end of worker thread\nend of main thread\n" + self.assertScriptHasOutput(script, output) + -class ThreadingExceptionTests(unittest.TestCase): +class ThreadingExceptionTests(BaseTestCase): # A RuntimeError should be raised if Thread.start() is called # multiple times. def test_start_thread_again(self): @@ -504,6 +689,37 @@ class SemaphoreTests(lock_tests.SemaphoreTests): class BoundedSemaphoreTests(lock_tests.BoundedSemaphoreTests): semtype = staticmethod(threading.BoundedSemaphore) + @unittest.skipUnless(sys.platform == 'darwin', 'test macosx problem') + def test_recursion_limit(self): + # Issue 9670 + # test that excessive recursion within a non-main thread causes + # an exception rather than crashing the interpreter on platforms + # like Mac OS X or FreeBSD which have small default stack sizes + # for threads + script = """if True: + import threading + + def recurse(): + return recurse() + + def outer(): + try: + recurse() + except RuntimeError: + pass + + w = threading.Thread(target=outer) + w.start() + w.join() + print('end of main thread') + """ + expected_output = "end of main thread\n" + p = subprocess.Popen([sys.executable, "-c", script], + stdout=subprocess.PIPE) + stdout, stderr = p.communicate() + data = stdout.decode().replace('\r', '') + self.assertEqual(p.returncode, 0, "Unexpected error") + self.assertEqual(data, expected_output) def test_main(): test.test_support.run_unittest(LockTests, RLockTests, EventTests, |
