summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_threading.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_threading.py')
-rw-r--r--Lib/test/test_threading.py319
1 files changed, 179 insertions, 140 deletions
diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py
index ef04cd3..17be84b 100644
--- a/Lib/test/test_threading.py
+++ b/Lib/test/test_threading.py
@@ -1,18 +1,19 @@
# Very rudimentary test of threading module
-import test.test_support
-from test.test_support import verbose
+import test.support
+from test.support import verbose, strip_python_stderr, import_module
from test.script_helper import assert_python_ok
import random
import re
import sys
-thread = test.test_support.import_module('thread')
-threading = test.test_support.import_module('threading')
+_thread = import_module('_thread')
+threading = import_module('threading')
import time
import unittest
import weakref
import os
+from test.script_helper import assert_python_ok, assert_python_failure
import subprocess
from test import lock_tests
@@ -39,34 +40,35 @@ class TestThread(threading.Thread):
def run(self):
delay = random.random() / 10000.0
if verbose:
- print 'task %s will run for %.1f usec' % (
- self.name, delay * 1e6)
+ print('task %s will run for %.1f usec' %
+ (self.name, delay * 1e6))
with self.sema:
with self.mutex:
self.nrunning.inc()
if verbose:
- print self.nrunning.get(), 'tasks are running'
+ print(self.nrunning.get(), 'tasks are running')
self.testcase.assertTrue(self.nrunning.get() <= 3)
time.sleep(delay)
if verbose:
- print 'task', self.name, 'done'
+ print('task', self.name, 'done')
with self.mutex:
self.nrunning.dec()
self.testcase.assertTrue(self.nrunning.get() >= 0)
if verbose:
- print '%s is finished. %d tasks are running' % (
- self.name, self.nrunning.get())
+ print('%s is finished. %d tasks are running' %
+ (self.name, self.nrunning.get()))
+
class BaseTestCase(unittest.TestCase):
def setUp(self):
- self._threads = test.test_support.threading_setup()
+ self._threads = test.support.threading_setup()
def tearDown(self):
- test.test_support.threading_cleanup(*self._threads)
- test.test_support.reap_children()
+ test.support.threading_cleanup(*self._threads)
+ test.support.reap_children()
class ThreadTests(BaseTestCase):
@@ -93,15 +95,16 @@ class ThreadTests(BaseTestCase):
t.start()
if verbose:
- print 'waiting for all tasks to complete'
+ print('waiting for all tasks to complete')
for t in threads:
t.join(NUMTASKS)
self.assertTrue(not t.is_alive())
self.assertNotEqual(t.ident, 0)
self.assertFalse(t.ident is None)
- self.assertTrue(re.match('<TestThread\(.*, \w+ -?\d+\)>', repr(t)))
+ self.assertTrue(re.match('<TestThread\(.*, stopped -?\d+\)>',
+ repr(t)))
if verbose:
- print 'all tasks done'
+ print('all tasks done')
self.assertEqual(numrunning.get(), 0)
def test_ident_of_no_threading_threads(self):
@@ -112,7 +115,7 @@ class ThreadTests(BaseTestCase):
done.set()
done = threading.Event()
ident = []
- thread.start_new_thread(f, ())
+ _thread.start_new_thread(f, ())
done.wait()
self.assertFalse(ident[0] is None)
# Kill the "immortal" _DummyThread
@@ -121,26 +124,24 @@ class ThreadTests(BaseTestCase):
# run with a small(ish) thread stack size (256kB)
def test_various_ops_small_stack(self):
if verbose:
- print 'with 256kB thread stack size...'
+ print('with 256kB thread stack size...')
try:
threading.stack_size(262144)
- except thread.error:
- if verbose:
- print 'platform does not support changing thread stack size'
- return
+ except _thread.error:
+ raise unittest.SkipTest(
+ 'platform does not support changing thread stack size')
self.test_various_ops()
threading.stack_size(0)
# run with a large thread stack size (1MB)
def test_various_ops_large_stack(self):
if verbose:
- print 'with 1MB thread stack size...'
+ print('with 1MB thread stack size...')
try:
threading.stack_size(0x100000)
- except thread.error:
- if verbose:
- print 'platform does not support changing thread stack size'
- return
+ except _thread.error:
+ raise unittest.SkipTest(
+ 'platform does not support changing thread stack size')
self.test_various_ops()
threading.stack_size(0)
@@ -154,7 +155,7 @@ class ThreadTests(BaseTestCase):
mutex = threading.Lock()
mutex.acquire()
- tid = thread.start_new_thread(f, (mutex,))
+ tid = _thread.start_new_thread(f, (mutex,))
# Wait for the thread to finish.
mutex.acquire()
self.assertIn(tid, threading._active)
@@ -164,12 +165,7 @@ class ThreadTests(BaseTestCase):
# PyThreadState_SetAsyncExc() is a CPython-only gimmick, not (currently)
# exposed at the Python level. This test relies on ctypes to get at it.
def test_PyThreadState_SetAsyncExc(self):
- try:
- import ctypes
- except ImportError:
- if verbose:
- print "test_PyThreadState_SetAsyncExc can't import ctypes"
- return # can't do anything
+ ctypes = import_module("ctypes")
set_async_exc = ctypes.pythonapi.PyThreadState_SetAsyncExc
@@ -179,7 +175,7 @@ class ThreadTests(BaseTestCase):
exception = ctypes.py_object(AsyncExc)
# First check it works when setting the exception from the same thread.
- tid = thread.get_ident()
+ tid = _thread.get_ident()
try:
result = set_async_exc(ctypes.c_long(tid), exception)
@@ -208,7 +204,7 @@ class ThreadTests(BaseTestCase):
class Worker(threading.Thread):
def run(self):
- self.id = thread.get_ident()
+ self.id = _thread.get_ident()
self.finished = False
try:
@@ -223,32 +219,32 @@ class ThreadTests(BaseTestCase):
t.daemon = True # so if this fails, we don't hang Python at shutdown
t.start()
if verbose:
- print " started worker thread"
+ print(" started worker thread")
# Try a thread id that doesn't make sense.
if verbose:
- print " trying nonsensical thread id"
+ print(" trying nonsensical thread id")
result = set_async_exc(ctypes.c_long(-1), exception)
self.assertEqual(result, 0) # no thread states modified
# Now raise an exception in the worker thread.
if verbose:
- print " waiting for worker thread to get started"
+ print(" waiting for worker thread to get started")
ret = worker_started.wait()
self.assertTrue(ret)
if verbose:
- print " verifying worker hasn't exited"
+ print(" verifying worker hasn't exited")
self.assertTrue(not t.finished)
if verbose:
- print " attempting to raise asynch exception in worker"
+ print(" attempting to raise asynch exception in worker")
result = set_async_exc(ctypes.c_long(t.id), exception)
self.assertEqual(result, 1) # one thread state modified
if verbose:
- print " waiting for worker to say it caught the exception"
+ print(" waiting for worker to say it caught the exception")
worker_saw_exception.wait(timeout=10)
self.assertTrue(t.finished)
if verbose:
- print " all OK -- joining worker"
+ print(" all OK -- joining worker")
if t.finished:
t.join()
# else the thread is still running, and we have no way to kill it
@@ -256,12 +252,12 @@ class ThreadTests(BaseTestCase):
def test_limbo_cleanup(self):
# Issue 7481: Failure to start thread should cleanup the limbo map.
def fail_new_thread(*args):
- raise thread.error()
+ raise threading.ThreadError()
_start_new_thread = threading._start_new_thread
threading._start_new_thread = fail_new_thread
try:
t = threading.Thread(target=lambda: None)
- self.assertRaises(thread.error, t.start)
+ self.assertRaises(threading.ThreadError, t.start)
self.assertFalse(
t in threading._limbo,
"Failed to cleanup _limbo map on failure of Thread.start().")
@@ -272,18 +268,13 @@ class ThreadTests(BaseTestCase):
# Issue 1402: the PyGILState_Ensure / _Release functions may be called
# very late on python exit: on deallocation of a running thread for
# example.
- try:
- import ctypes
- except ImportError:
- if verbose:
- print("test_finalize_with_runnning_thread can't import ctypes")
- return # can't do anything
+ import_module("ctypes")
- rc = subprocess.call([sys.executable, "-c", """if 1:
- import ctypes, sys, time, thread
+ rc, out, err = assert_python_failure("-c", """if 1:
+ import ctypes, sys, time, _thread
# This lock is used as a simple event variable.
- ready = thread.allocate_lock()
+ ready = _thread.allocate_lock()
ready.acquire()
# Module globals are cleared before __del__ is run
@@ -300,16 +291,16 @@ class ThreadTests(BaseTestCase):
ready.release()
time.sleep(100)
- thread.start_new_thread(waitingThread, ())
+ _thread.start_new_thread(waitingThread, ())
ready.acquire() # Be sure the other thread is waiting.
sys.exit(42)
- """])
+ """)
self.assertEqual(rc, 42)
def test_finalize_with_trace(self):
# Issue1733757
# Avoid a deadlock when sys.settrace steps into threading._shutdown
- p = subprocess.Popen([sys.executable, "-c", """if 1:
+ assert_python_ok("-c", """if 1:
import sys, threading
# A deadlock-killer, to prevent the
@@ -317,7 +308,7 @@ class ThreadTests(BaseTestCase):
def killer():
import os, time
time.sleep(2)
- print 'program blocked; aborting'
+ print('program blocked; aborting')
os._exit(2)
t = threading.Thread(target=killer)
t.daemon = True
@@ -329,21 +320,12 @@ class ThreadTests(BaseTestCase):
return func
sys.settrace(func)
- """],
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- self.addCleanup(p.stdout.close)
- self.addCleanup(p.stderr.close)
- stdout, stderr = p.communicate()
- rc = p.returncode
- self.assertFalse(rc == 2, "interpreted was blocked")
- self.assertTrue(rc == 0,
- "Unexpected error: " + repr(stderr))
+ """)
def test_join_nondaemon_on_shutdown(self):
# Issue 1722344
# Raising SystemExit skipped threading._shutdown
- p = subprocess.Popen([sys.executable, "-c", """if 1:
+ rc, out, err = assert_python_ok("-c", """if 1:
import threading
from time import sleep
@@ -351,31 +333,23 @@ class ThreadTests(BaseTestCase):
sleep(1)
# As a non-daemon thread we SHOULD wake up and nothing
# should be torn down yet
- print "Woke up, sleep function is:", sleep
+ print("Woke up, sleep function is:", sleep)
threading.Thread(target=child).start()
raise SystemExit
- """],
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- self.addCleanup(p.stdout.close)
- self.addCleanup(p.stderr.close)
- stdout, stderr = p.communicate()
- self.assertEqual(stdout.strip(),
- "Woke up, sleep function is: <built-in function sleep>")
- stderr = re.sub(r"^\[\d+ refs\]", "", stderr, re.MULTILINE).strip()
- self.assertEqual(stderr, "")
+ """)
+ self.assertEqual(out.strip(),
+ b"Woke up, sleep function is: <built-in function sleep>")
+ self.assertEqual(err, b"")
def test_enumerate_after_join(self):
# Try hard to trigger #1703448: a thread is still returned in
# threading.enumerate() after it has been join()ed.
enum = threading.enumerate
- old_interval = sys.getcheckinterval()
+ old_interval = sys.getswitchinterval()
try:
- for i in xrange(1, 100):
- # Try a couple times at each thread-switching interval
- # to get more interleavings.
- sys.setcheckinterval(i // 5)
+ for i in range(1, 100):
+ sys.setswitchinterval(i * 0.0002)
t = threading.Thread(target=lambda: None)
t.start()
t.join()
@@ -383,7 +357,7 @@ class ThreadTests(BaseTestCase):
self.assertNotIn(t, l,
"#1703448 triggered after %d trials: %s" % (i, l))
finally:
- sys.setcheckinterval(old_interval)
+ sys.setswitchinterval(old_interval)
def test_no_refcycle_through_target(self):
class RunSelfFunction(object):
@@ -404,7 +378,7 @@ class ThreadTests(BaseTestCase):
weak_cyclic_object = weakref.ref(cyclic_object)
cyclic_object.thread.join()
del cyclic_object
- self.assertEqual(None, weak_cyclic_object(),
+ self.assertIsNone(weak_cyclic_object(),
msg=('%d references still around' %
sys.getrefcount(weak_cyclic_object())))
@@ -412,16 +386,35 @@ class ThreadTests(BaseTestCase):
weak_raising_cyclic_object = weakref.ref(raising_cyclic_object)
raising_cyclic_object.thread.join()
del raising_cyclic_object
- self.assertEqual(None, weak_raising_cyclic_object(),
+ self.assertIsNone(weak_raising_cyclic_object(),
msg=('%d references still around' %
sys.getrefcount(weak_raising_cyclic_object())))
+ def test_old_threading_api(self):
+ # Just a quick sanity check to make sure the old method names are
+ # still present
+ t = threading.Thread()
+ t.isDaemon()
+ t.setDaemon(True)
+ t.getName()
+ t.setName("name")
+ t.isAlive()
+ e = threading.Event()
+ e.isSet()
+ threading.activeCount()
+
+ def test_repr_daemon(self):
+ t = threading.Thread()
+ self.assertFalse('daemon' in repr(t))
+ t.daemon = True
+ self.assertTrue('daemon' in repr(t))
+
@unittest.skipUnless(hasattr(os, 'fork'), 'test needs fork()')
def test_dummy_thread_after_fork(self):
# Issue #14308: a dummy thread in the active list doesn't mess up
# the after-fork mechanism.
code = """if 1:
- import thread, threading, os, time
+ import _thread, threading, os, time
def background_thread(evt):
# Creates and registers the _DummyThread instance
@@ -430,7 +423,7 @@ class ThreadTests(BaseTestCase):
time.sleep(10)
evt = threading.Event()
- thread.start_new_thread(background_thread, (evt,))
+ _thread.start_new_thread(background_thread, (evt,))
evt.wait()
assert threading.active_count() == 2, threading.active_count()
if os.fork() == 0:
@@ -440,8 +433,8 @@ class ThreadTests(BaseTestCase):
os.wait()
"""
_, out, err = assert_python_ok("-c", code)
- self.assertEqual(out, '')
- self.assertEqual(err, '')
+ self.assertEqual(out, b'')
+ self.assertEqual(err, b'')
class ThreadJoinOnShutdown(BaseTestCase):
@@ -460,16 +453,15 @@ class ThreadJoinOnShutdown(BaseTestCase):
# a thread, which waits for the main program to terminate
def joiningfunc(mainthread):
mainthread.join()
- print 'end of thread'
+ print('end of thread')
+ # stdout is fully buffered because not a tty, we have to flush
+ # before exit.
+ sys.stdout.flush()
\n""" + script
- p = subprocess.Popen([sys.executable, "-c", script], stdout=subprocess.PIPE)
- rc = p.wait()
- data = p.stdout.read().replace('\r', '')
- p.stdout.close()
+ rc, out, err = assert_python_ok("-c", script)
+ data = out.decode().replace('\r', '')
self.assertEqual(data, "end of main\nend of thread\n")
- self.assertFalse(rc == 2, "interpreter was blocked")
- self.assertTrue(rc == 0, "Unexpected error")
def test_1_join_on_shutdown(self):
# The usual case: on exit, wait for a non-daemon thread
@@ -479,11 +471,10 @@ class ThreadJoinOnShutdown(BaseTestCase):
args=(threading.current_thread(),))
t.start()
time.sleep(0.1)
- print 'end of main'
+ print('end of main')
"""
self._run_and_join(script)
-
@unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
@unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
def test_2_join_in_forked_process(self):
@@ -497,7 +488,7 @@ class ThreadJoinOnShutdown(BaseTestCase):
t = threading.Thread(target=joiningfunc,
args=(threading.current_thread(),))
t.start()
- print 'end of main'
+ print('end of main')
"""
self._run_and_join(script)
@@ -506,6 +497,7 @@ class ThreadJoinOnShutdown(BaseTestCase):
def test_3_join_in_forked_from_thread(self):
# Like the test above, but fork() was called from a worker thread
# In the forked process, the main Thread object must be marked as stopped.
+
script = """if 1:
main_thread = threading.current_thread()
def worker():
@@ -516,7 +508,7 @@ class ThreadJoinOnShutdown(BaseTestCase):
t = threading.Thread(target=joiningfunc,
args=(main_thread,))
- print 'end of main'
+ print('end of main')
t.start()
t.join() # Should not block: main_thread is already stopped
@@ -526,11 +518,8 @@ class ThreadJoinOnShutdown(BaseTestCase):
self._run_and_join(script)
def assertScriptHasOutput(self, script, expected_output):
- p = subprocess.Popen([sys.executable, "-c", script],
- stdout=subprocess.PIPE)
- rc = p.wait()
- data = p.stdout.read().decode().replace('\r', '')
- self.assertEqual(rc, 0, "Unexpected error")
+ rc, out, err = assert_python_ok("-c", script)
+ data = out.decode().replace('\r', '')
self.assertEqual(data, expected_output)
@unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
@@ -664,6 +653,49 @@ class ThreadJoinOnShutdown(BaseTestCase):
output = "end of worker thread\nend of main thread\n"
self.assertScriptHasOutput(script, output)
+ @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
+ def test_6_daemon_threads(self):
+ # Check that a daemon thread cannot crash the interpreter on shutdown
+ # by manipulating internal structures that are being disposed of in
+ # the main thread.
+ script = """if True:
+ import os
+ import random
+ import sys
+ import time
+ import threading
+
+ thread_has_run = set()
+
+ def random_io():
+ '''Loop for a while sleeping random tiny amounts and doing some I/O.'''
+ while True:
+ in_f = open(os.__file__, 'rb')
+ stuff = in_f.read(200)
+ null_f = open(os.devnull, 'wb')
+ null_f.write(stuff)
+ time.sleep(random.random() / 1995)
+ null_f.close()
+ in_f.close()
+ thread_has_run.add(threading.current_thread())
+
+ def main():
+ count = 0
+ for _ in range(40):
+ new_thread = threading.Thread(target=random_io)
+ new_thread.daemon = True
+ new_thread.start()
+ count += 1
+ while len(thread_has_run) < count:
+ time.sleep(0.001)
+ # Trigger process shutdown
+ sys.exit(0)
+
+ main()
+ """
+ rc, out, err = assert_python_ok('-c', script)
+ self.assertFalse(err)
+
@unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
@unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
def test_reinit_tls_after_fork(self):
@@ -710,29 +742,6 @@ class ThreadingExceptionTests(BaseTestCase):
thread.start()
self.assertRaises(RuntimeError, setattr, thread, "daemon", True)
-
-class LockTests(lock_tests.LockTests):
- locktype = staticmethod(threading.Lock)
-
-class RLockTests(lock_tests.RLockTests):
- locktype = staticmethod(threading.RLock)
-
-class EventTests(lock_tests.EventTests):
- eventtype = staticmethod(threading.Event)
-
-class ConditionAsRLockTests(lock_tests.RLockTests):
- # An Condition uses an RLock by default and exports its API.
- locktype = staticmethod(threading.Condition)
-
-class ConditionTests(lock_tests.ConditionTests):
- condtype = staticmethod(threading.Condition)
-
-class SemaphoreTests(lock_tests.SemaphoreTests):
- semtype = staticmethod(threading.Semaphore)
-
-class BoundedSemaphoreTests(lock_tests.BoundedSemaphoreTests):
- semtype = staticmethod(threading.BoundedSemaphore)
-
@unittest.skipUnless(sys.platform == 'darwin', 'test macosx problem')
def test_recursion_limit(self):
# Issue 9670
@@ -759,20 +768,50 @@ class BoundedSemaphoreTests(lock_tests.BoundedSemaphoreTests):
"""
expected_output = "end of main thread\n"
p = subprocess.Popen([sys.executable, "-c", script],
- stdout=subprocess.PIPE)
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = p.communicate()
data = stdout.decode().replace('\r', '')
- self.assertEqual(p.returncode, 0, "Unexpected error")
+ self.assertEqual(p.returncode, 0, "Unexpected error: " + stderr.decode())
self.assertEqual(data, expected_output)
+class LockTests(lock_tests.LockTests):
+ locktype = staticmethod(threading.Lock)
+
+class PyRLockTests(lock_tests.RLockTests):
+ locktype = staticmethod(threading._PyRLock)
+
+@unittest.skipIf(threading._CRLock is None, 'RLock not implemented in C')
+class CRLockTests(lock_tests.RLockTests):
+ locktype = staticmethod(threading._CRLock)
+
+class EventTests(lock_tests.EventTests):
+ eventtype = staticmethod(threading.Event)
+
+class ConditionAsRLockTests(lock_tests.RLockTests):
+ # An Condition uses an RLock by default and exports its API.
+ locktype = staticmethod(threading.Condition)
+
+class ConditionTests(lock_tests.ConditionTests):
+ condtype = staticmethod(threading.Condition)
+
+class SemaphoreTests(lock_tests.SemaphoreTests):
+ semtype = staticmethod(threading.Semaphore)
+
+class BoundedSemaphoreTests(lock_tests.BoundedSemaphoreTests):
+ semtype = staticmethod(threading.BoundedSemaphore)
+
+class BarrierTests(lock_tests.BarrierTests):
+ barriertype = staticmethod(threading.Barrier)
+
def test_main():
- test.test_support.run_unittest(LockTests, RLockTests, EventTests,
- ConditionAsRLockTests, ConditionTests,
- SemaphoreTests, BoundedSemaphoreTests,
- ThreadTests,
- ThreadJoinOnShutdown,
- ThreadingExceptionTests,
- )
+ test.support.run_unittest(LockTests, PyRLockTests, CRLockTests, EventTests,
+ ConditionAsRLockTests, ConditionTests,
+ SemaphoreTests, BoundedSemaphoreTests,
+ ThreadTests,
+ ThreadJoinOnShutdown,
+ ThreadingExceptionTests,
+ BarrierTests
+ )
if __name__ == "__main__":
test_main()