diff options
Diffstat (limited to 'Lib/test/test_threading.py')
| -rw-r--r-- | Lib/test/test_threading.py | 319 |
1 files changed, 179 insertions, 140 deletions
diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py index ef04cd3..17be84b 100644 --- a/Lib/test/test_threading.py +++ b/Lib/test/test_threading.py @@ -1,18 +1,19 @@ # Very rudimentary test of threading module -import test.test_support -from test.test_support import verbose +import test.support +from test.support import verbose, strip_python_stderr, import_module from test.script_helper import assert_python_ok import random import re import sys -thread = test.test_support.import_module('thread') -threading = test.test_support.import_module('threading') +_thread = import_module('_thread') +threading = import_module('threading') import time import unittest import weakref import os +from test.script_helper import assert_python_ok, assert_python_failure import subprocess from test import lock_tests @@ -39,34 +40,35 @@ class TestThread(threading.Thread): def run(self): delay = random.random() / 10000.0 if verbose: - print 'task %s will run for %.1f usec' % ( - self.name, delay * 1e6) + print('task %s will run for %.1f usec' % + (self.name, delay * 1e6)) with self.sema: with self.mutex: self.nrunning.inc() if verbose: - print self.nrunning.get(), 'tasks are running' + print(self.nrunning.get(), 'tasks are running') self.testcase.assertTrue(self.nrunning.get() <= 3) time.sleep(delay) if verbose: - print 'task', self.name, 'done' + print('task', self.name, 'done') with self.mutex: self.nrunning.dec() self.testcase.assertTrue(self.nrunning.get() >= 0) if verbose: - print '%s is finished. %d tasks are running' % ( - self.name, self.nrunning.get()) + print('%s is finished. %d tasks are running' % + (self.name, self.nrunning.get())) + class BaseTestCase(unittest.TestCase): def setUp(self): - self._threads = test.test_support.threading_setup() + self._threads = test.support.threading_setup() def tearDown(self): - test.test_support.threading_cleanup(*self._threads) - test.test_support.reap_children() + test.support.threading_cleanup(*self._threads) + test.support.reap_children() class ThreadTests(BaseTestCase): @@ -93,15 +95,16 @@ class ThreadTests(BaseTestCase): t.start() if verbose: - print 'waiting for all tasks to complete' + print('waiting for all tasks to complete') for t in threads: t.join(NUMTASKS) self.assertTrue(not t.is_alive()) self.assertNotEqual(t.ident, 0) self.assertFalse(t.ident is None) - self.assertTrue(re.match('<TestThread\(.*, \w+ -?\d+\)>', repr(t))) + self.assertTrue(re.match('<TestThread\(.*, stopped -?\d+\)>', + repr(t))) if verbose: - print 'all tasks done' + print('all tasks done') self.assertEqual(numrunning.get(), 0) def test_ident_of_no_threading_threads(self): @@ -112,7 +115,7 @@ class ThreadTests(BaseTestCase): done.set() done = threading.Event() ident = [] - thread.start_new_thread(f, ()) + _thread.start_new_thread(f, ()) done.wait() self.assertFalse(ident[0] is None) # Kill the "immortal" _DummyThread @@ -121,26 +124,24 @@ class ThreadTests(BaseTestCase): # run with a small(ish) thread stack size (256kB) def test_various_ops_small_stack(self): if verbose: - print 'with 256kB thread stack size...' + print('with 256kB thread stack size...') try: threading.stack_size(262144) - except thread.error: - if verbose: - print 'platform does not support changing thread stack size' - return + except _thread.error: + raise unittest.SkipTest( + 'platform does not support changing thread stack size') self.test_various_ops() threading.stack_size(0) # run with a large thread stack size (1MB) def test_various_ops_large_stack(self): if verbose: - print 'with 1MB thread stack size...' + print('with 1MB thread stack size...') try: threading.stack_size(0x100000) - except thread.error: - if verbose: - print 'platform does not support changing thread stack size' - return + except _thread.error: + raise unittest.SkipTest( + 'platform does not support changing thread stack size') self.test_various_ops() threading.stack_size(0) @@ -154,7 +155,7 @@ class ThreadTests(BaseTestCase): mutex = threading.Lock() mutex.acquire() - tid = thread.start_new_thread(f, (mutex,)) + tid = _thread.start_new_thread(f, (mutex,)) # Wait for the thread to finish. mutex.acquire() self.assertIn(tid, threading._active) @@ -164,12 +165,7 @@ class ThreadTests(BaseTestCase): # PyThreadState_SetAsyncExc() is a CPython-only gimmick, not (currently) # exposed at the Python level. This test relies on ctypes to get at it. def test_PyThreadState_SetAsyncExc(self): - try: - import ctypes - except ImportError: - if verbose: - print "test_PyThreadState_SetAsyncExc can't import ctypes" - return # can't do anything + ctypes = import_module("ctypes") set_async_exc = ctypes.pythonapi.PyThreadState_SetAsyncExc @@ -179,7 +175,7 @@ class ThreadTests(BaseTestCase): exception = ctypes.py_object(AsyncExc) # First check it works when setting the exception from the same thread. - tid = thread.get_ident() + tid = _thread.get_ident() try: result = set_async_exc(ctypes.c_long(tid), exception) @@ -208,7 +204,7 @@ class ThreadTests(BaseTestCase): class Worker(threading.Thread): def run(self): - self.id = thread.get_ident() + self.id = _thread.get_ident() self.finished = False try: @@ -223,32 +219,32 @@ class ThreadTests(BaseTestCase): t.daemon = True # so if this fails, we don't hang Python at shutdown t.start() if verbose: - print " started worker thread" + print(" started worker thread") # Try a thread id that doesn't make sense. if verbose: - print " trying nonsensical thread id" + print(" trying nonsensical thread id") result = set_async_exc(ctypes.c_long(-1), exception) self.assertEqual(result, 0) # no thread states modified # Now raise an exception in the worker thread. if verbose: - print " waiting for worker thread to get started" + print(" waiting for worker thread to get started") ret = worker_started.wait() self.assertTrue(ret) if verbose: - print " verifying worker hasn't exited" + print(" verifying worker hasn't exited") self.assertTrue(not t.finished) if verbose: - print " attempting to raise asynch exception in worker" + print(" attempting to raise asynch exception in worker") result = set_async_exc(ctypes.c_long(t.id), exception) self.assertEqual(result, 1) # one thread state modified if verbose: - print " waiting for worker to say it caught the exception" + print(" waiting for worker to say it caught the exception") worker_saw_exception.wait(timeout=10) self.assertTrue(t.finished) if verbose: - print " all OK -- joining worker" + print(" all OK -- joining worker") if t.finished: t.join() # else the thread is still running, and we have no way to kill it @@ -256,12 +252,12 @@ class ThreadTests(BaseTestCase): def test_limbo_cleanup(self): # Issue 7481: Failure to start thread should cleanup the limbo map. def fail_new_thread(*args): - raise thread.error() + raise threading.ThreadError() _start_new_thread = threading._start_new_thread threading._start_new_thread = fail_new_thread try: t = threading.Thread(target=lambda: None) - self.assertRaises(thread.error, t.start) + self.assertRaises(threading.ThreadError, t.start) self.assertFalse( t in threading._limbo, "Failed to cleanup _limbo map on failure of Thread.start().") @@ -272,18 +268,13 @@ class ThreadTests(BaseTestCase): # Issue 1402: the PyGILState_Ensure / _Release functions may be called # very late on python exit: on deallocation of a running thread for # example. - try: - import ctypes - except ImportError: - if verbose: - print("test_finalize_with_runnning_thread can't import ctypes") - return # can't do anything + import_module("ctypes") - rc = subprocess.call([sys.executable, "-c", """if 1: - import ctypes, sys, time, thread + rc, out, err = assert_python_failure("-c", """if 1: + import ctypes, sys, time, _thread # This lock is used as a simple event variable. - ready = thread.allocate_lock() + ready = _thread.allocate_lock() ready.acquire() # Module globals are cleared before __del__ is run @@ -300,16 +291,16 @@ class ThreadTests(BaseTestCase): ready.release() time.sleep(100) - thread.start_new_thread(waitingThread, ()) + _thread.start_new_thread(waitingThread, ()) ready.acquire() # Be sure the other thread is waiting. sys.exit(42) - """]) + """) self.assertEqual(rc, 42) def test_finalize_with_trace(self): # Issue1733757 # Avoid a deadlock when sys.settrace steps into threading._shutdown - p = subprocess.Popen([sys.executable, "-c", """if 1: + assert_python_ok("-c", """if 1: import sys, threading # A deadlock-killer, to prevent the @@ -317,7 +308,7 @@ class ThreadTests(BaseTestCase): def killer(): import os, time time.sleep(2) - print 'program blocked; aborting' + print('program blocked; aborting') os._exit(2) t = threading.Thread(target=killer) t.daemon = True @@ -329,21 +320,12 @@ class ThreadTests(BaseTestCase): return func sys.settrace(func) - """], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - self.addCleanup(p.stdout.close) - self.addCleanup(p.stderr.close) - stdout, stderr = p.communicate() - rc = p.returncode - self.assertFalse(rc == 2, "interpreted was blocked") - self.assertTrue(rc == 0, - "Unexpected error: " + repr(stderr)) + """) def test_join_nondaemon_on_shutdown(self): # Issue 1722344 # Raising SystemExit skipped threading._shutdown - p = subprocess.Popen([sys.executable, "-c", """if 1: + rc, out, err = assert_python_ok("-c", """if 1: import threading from time import sleep @@ -351,31 +333,23 @@ class ThreadTests(BaseTestCase): sleep(1) # As a non-daemon thread we SHOULD wake up and nothing # should be torn down yet - print "Woke up, sleep function is:", sleep + print("Woke up, sleep function is:", sleep) threading.Thread(target=child).start() raise SystemExit - """], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - self.addCleanup(p.stdout.close) - self.addCleanup(p.stderr.close) - stdout, stderr = p.communicate() - self.assertEqual(stdout.strip(), - "Woke up, sleep function is: <built-in function sleep>") - stderr = re.sub(r"^\[\d+ refs\]", "", stderr, re.MULTILINE).strip() - self.assertEqual(stderr, "") + """) + self.assertEqual(out.strip(), + b"Woke up, sleep function is: <built-in function sleep>") + self.assertEqual(err, b"") def test_enumerate_after_join(self): # Try hard to trigger #1703448: a thread is still returned in # threading.enumerate() after it has been join()ed. enum = threading.enumerate - old_interval = sys.getcheckinterval() + old_interval = sys.getswitchinterval() try: - for i in xrange(1, 100): - # Try a couple times at each thread-switching interval - # to get more interleavings. - sys.setcheckinterval(i // 5) + for i in range(1, 100): + sys.setswitchinterval(i * 0.0002) t = threading.Thread(target=lambda: None) t.start() t.join() @@ -383,7 +357,7 @@ class ThreadTests(BaseTestCase): self.assertNotIn(t, l, "#1703448 triggered after %d trials: %s" % (i, l)) finally: - sys.setcheckinterval(old_interval) + sys.setswitchinterval(old_interval) def test_no_refcycle_through_target(self): class RunSelfFunction(object): @@ -404,7 +378,7 @@ class ThreadTests(BaseTestCase): weak_cyclic_object = weakref.ref(cyclic_object) cyclic_object.thread.join() del cyclic_object - self.assertEqual(None, weak_cyclic_object(), + self.assertIsNone(weak_cyclic_object(), msg=('%d references still around' % sys.getrefcount(weak_cyclic_object()))) @@ -412,16 +386,35 @@ class ThreadTests(BaseTestCase): weak_raising_cyclic_object = weakref.ref(raising_cyclic_object) raising_cyclic_object.thread.join() del raising_cyclic_object - self.assertEqual(None, weak_raising_cyclic_object(), + self.assertIsNone(weak_raising_cyclic_object(), msg=('%d references still around' % sys.getrefcount(weak_raising_cyclic_object()))) + def test_old_threading_api(self): + # Just a quick sanity check to make sure the old method names are + # still present + t = threading.Thread() + t.isDaemon() + t.setDaemon(True) + t.getName() + t.setName("name") + t.isAlive() + e = threading.Event() + e.isSet() + threading.activeCount() + + def test_repr_daemon(self): + t = threading.Thread() + self.assertFalse('daemon' in repr(t)) + t.daemon = True + self.assertTrue('daemon' in repr(t)) + @unittest.skipUnless(hasattr(os, 'fork'), 'test needs fork()') def test_dummy_thread_after_fork(self): # Issue #14308: a dummy thread in the active list doesn't mess up # the after-fork mechanism. code = """if 1: - import thread, threading, os, time + import _thread, threading, os, time def background_thread(evt): # Creates and registers the _DummyThread instance @@ -430,7 +423,7 @@ class ThreadTests(BaseTestCase): time.sleep(10) evt = threading.Event() - thread.start_new_thread(background_thread, (evt,)) + _thread.start_new_thread(background_thread, (evt,)) evt.wait() assert threading.active_count() == 2, threading.active_count() if os.fork() == 0: @@ -440,8 +433,8 @@ class ThreadTests(BaseTestCase): os.wait() """ _, out, err = assert_python_ok("-c", code) - self.assertEqual(out, '') - self.assertEqual(err, '') + self.assertEqual(out, b'') + self.assertEqual(err, b'') class ThreadJoinOnShutdown(BaseTestCase): @@ -460,16 +453,15 @@ class ThreadJoinOnShutdown(BaseTestCase): # a thread, which waits for the main program to terminate def joiningfunc(mainthread): mainthread.join() - print 'end of thread' + print('end of thread') + # stdout is fully buffered because not a tty, we have to flush + # before exit. + sys.stdout.flush() \n""" + script - p = subprocess.Popen([sys.executable, "-c", script], stdout=subprocess.PIPE) - rc = p.wait() - data = p.stdout.read().replace('\r', '') - p.stdout.close() + rc, out, err = assert_python_ok("-c", script) + data = out.decode().replace('\r', '') self.assertEqual(data, "end of main\nend of thread\n") - self.assertFalse(rc == 2, "interpreter was blocked") - self.assertTrue(rc == 0, "Unexpected error") def test_1_join_on_shutdown(self): # The usual case: on exit, wait for a non-daemon thread @@ -479,11 +471,10 @@ class ThreadJoinOnShutdown(BaseTestCase): args=(threading.current_thread(),)) t.start() time.sleep(0.1) - print 'end of main' + print('end of main') """ self._run_and_join(script) - @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()") @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug") def test_2_join_in_forked_process(self): @@ -497,7 +488,7 @@ class ThreadJoinOnShutdown(BaseTestCase): t = threading.Thread(target=joiningfunc, args=(threading.current_thread(),)) t.start() - print 'end of main' + print('end of main') """ self._run_and_join(script) @@ -506,6 +497,7 @@ class ThreadJoinOnShutdown(BaseTestCase): def test_3_join_in_forked_from_thread(self): # Like the test above, but fork() was called from a worker thread # In the forked process, the main Thread object must be marked as stopped. + script = """if 1: main_thread = threading.current_thread() def worker(): @@ -516,7 +508,7 @@ class ThreadJoinOnShutdown(BaseTestCase): t = threading.Thread(target=joiningfunc, args=(main_thread,)) - print 'end of main' + print('end of main') t.start() t.join() # Should not block: main_thread is already stopped @@ -526,11 +518,8 @@ class ThreadJoinOnShutdown(BaseTestCase): self._run_and_join(script) def assertScriptHasOutput(self, script, expected_output): - p = subprocess.Popen([sys.executable, "-c", script], - stdout=subprocess.PIPE) - rc = p.wait() - data = p.stdout.read().decode().replace('\r', '') - self.assertEqual(rc, 0, "Unexpected error") + rc, out, err = assert_python_ok("-c", script) + data = out.decode().replace('\r', '') self.assertEqual(data, expected_output) @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()") @@ -664,6 +653,49 @@ class ThreadJoinOnShutdown(BaseTestCase): output = "end of worker thread\nend of main thread\n" self.assertScriptHasOutput(script, output) + @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug") + def test_6_daemon_threads(self): + # Check that a daemon thread cannot crash the interpreter on shutdown + # by manipulating internal structures that are being disposed of in + # the main thread. + script = """if True: + import os + import random + import sys + import time + import threading + + thread_has_run = set() + + def random_io(): + '''Loop for a while sleeping random tiny amounts and doing some I/O.''' + while True: + in_f = open(os.__file__, 'rb') + stuff = in_f.read(200) + null_f = open(os.devnull, 'wb') + null_f.write(stuff) + time.sleep(random.random() / 1995) + null_f.close() + in_f.close() + thread_has_run.add(threading.current_thread()) + + def main(): + count = 0 + for _ in range(40): + new_thread = threading.Thread(target=random_io) + new_thread.daemon = True + new_thread.start() + count += 1 + while len(thread_has_run) < count: + time.sleep(0.001) + # Trigger process shutdown + sys.exit(0) + + main() + """ + rc, out, err = assert_python_ok('-c', script) + self.assertFalse(err) + @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()") @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug") def test_reinit_tls_after_fork(self): @@ -710,29 +742,6 @@ class ThreadingExceptionTests(BaseTestCase): thread.start() self.assertRaises(RuntimeError, setattr, thread, "daemon", True) - -class LockTests(lock_tests.LockTests): - locktype = staticmethod(threading.Lock) - -class RLockTests(lock_tests.RLockTests): - locktype = staticmethod(threading.RLock) - -class EventTests(lock_tests.EventTests): - eventtype = staticmethod(threading.Event) - -class ConditionAsRLockTests(lock_tests.RLockTests): - # An Condition uses an RLock by default and exports its API. - locktype = staticmethod(threading.Condition) - -class ConditionTests(lock_tests.ConditionTests): - condtype = staticmethod(threading.Condition) - -class SemaphoreTests(lock_tests.SemaphoreTests): - semtype = staticmethod(threading.Semaphore) - -class BoundedSemaphoreTests(lock_tests.BoundedSemaphoreTests): - semtype = staticmethod(threading.BoundedSemaphore) - @unittest.skipUnless(sys.platform == 'darwin', 'test macosx problem') def test_recursion_limit(self): # Issue 9670 @@ -759,20 +768,50 @@ class BoundedSemaphoreTests(lock_tests.BoundedSemaphoreTests): """ expected_output = "end of main thread\n" p = subprocess.Popen([sys.executable, "-c", script], - stdout=subprocess.PIPE) + stdout=subprocess.PIPE, stderr=subprocess.PIPE) stdout, stderr = p.communicate() data = stdout.decode().replace('\r', '') - self.assertEqual(p.returncode, 0, "Unexpected error") + self.assertEqual(p.returncode, 0, "Unexpected error: " + stderr.decode()) self.assertEqual(data, expected_output) +class LockTests(lock_tests.LockTests): + locktype = staticmethod(threading.Lock) + +class PyRLockTests(lock_tests.RLockTests): + locktype = staticmethod(threading._PyRLock) + +@unittest.skipIf(threading._CRLock is None, 'RLock not implemented in C') +class CRLockTests(lock_tests.RLockTests): + locktype = staticmethod(threading._CRLock) + +class EventTests(lock_tests.EventTests): + eventtype = staticmethod(threading.Event) + +class ConditionAsRLockTests(lock_tests.RLockTests): + # An Condition uses an RLock by default and exports its API. + locktype = staticmethod(threading.Condition) + +class ConditionTests(lock_tests.ConditionTests): + condtype = staticmethod(threading.Condition) + +class SemaphoreTests(lock_tests.SemaphoreTests): + semtype = staticmethod(threading.Semaphore) + +class BoundedSemaphoreTests(lock_tests.BoundedSemaphoreTests): + semtype = staticmethod(threading.BoundedSemaphore) + +class BarrierTests(lock_tests.BarrierTests): + barriertype = staticmethod(threading.Barrier) + def test_main(): - test.test_support.run_unittest(LockTests, RLockTests, EventTests, - ConditionAsRLockTests, ConditionTests, - SemaphoreTests, BoundedSemaphoreTests, - ThreadTests, - ThreadJoinOnShutdown, - ThreadingExceptionTests, - ) + test.support.run_unittest(LockTests, PyRLockTests, CRLockTests, EventTests, + ConditionAsRLockTests, ConditionTests, + SemaphoreTests, BoundedSemaphoreTests, + ThreadTests, + ThreadJoinOnShutdown, + ThreadingExceptionTests, + BarrierTests + ) if __name__ == "__main__": test_main() |
