summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_threading.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_threading.py')
-rw-r--r--Lib/test/test_threading.py277
1 files changed, 242 insertions, 35 deletions
diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py
index cc9da05..054df7b 100644
--- a/Lib/test/test_threading.py
+++ b/Lib/test/test_threading.py
@@ -3,11 +3,15 @@
import test.test_support
from test.test_support import verbose
import random
+import re
import sys
import threading
import thread
import time
import unittest
+import weakref
+
+from test import lock_tests
# A trivial mutable counter.
class Counter(object):
@@ -29,32 +33,28 @@ class TestThread(threading.Thread):
self.nrunning = nrunning
def run(self):
- delay = random.random() * 2
+ delay = random.random() / 10000.0
if verbose:
- print 'task', self.getName(), 'will run for', delay, 'sec'
-
- self.sema.acquire()
+ print 'task %s will run for %.1f usec' % (
+ self.name, delay * 1e6)
- self.mutex.acquire()
- self.nrunning.inc()
- if verbose:
- print self.nrunning.get(), 'tasks are running'
- self.testcase.assert_(self.nrunning.get() <= 3)
- self.mutex.release()
-
- time.sleep(delay)
- if verbose:
- print 'task', self.getName(), 'done'
+ with self.sema:
+ with self.mutex:
+ self.nrunning.inc()
+ if verbose:
+ print self.nrunning.get(), 'tasks are running'
+ self.testcase.assert_(self.nrunning.get() <= 3)
- self.mutex.acquire()
- self.nrunning.dec()
- self.testcase.assert_(self.nrunning.get() >= 0)
- if verbose:
- print self.getName(), 'is finished.', self.nrunning.get(), \
- 'tasks are running'
- self.mutex.release()
+ time.sleep(delay)
+ if verbose:
+ print 'task', self.name, 'done'
- self.sema.release()
+ with self.mutex:
+ self.nrunning.dec()
+ self.testcase.assert_(self.nrunning.get() >= 0)
+ if verbose:
+ print '%s is finished. %d tasks are running' % (
+ self.name, self.nrunning.get())
class ThreadTests(unittest.TestCase):
@@ -75,17 +75,36 @@ class ThreadTests(unittest.TestCase):
for i in range(NUMTASKS):
t = TestThread("<thread %d>"%i, self, sema, mutex, numrunning)
threads.append(t)
+ self.failUnlessEqual(t.ident, None)
+ self.assert_(re.match('<TestThread\(.*, initial\)>', repr(t)))
t.start()
if verbose:
print 'waiting for all tasks to complete'
for t in threads:
t.join(NUMTASKS)
- self.assert_(not t.isAlive())
+ self.assert_(not t.is_alive())
+ self.failIfEqual(t.ident, 0)
+ self.assertFalse(t.ident is None)
+ self.assert_(re.match('<TestThread\(.*, \w+ -?\d+\)>', repr(t)))
if verbose:
print 'all tasks done'
self.assertEqual(numrunning.get(), 0)
+ def test_ident_of_no_threading_threads(self):
+ # The ident still must work for the main thread and dummy threads.
+ self.assertFalse(threading.currentThread().ident is None)
+ def f():
+ ident.append(threading.currentThread().ident)
+ done.set()
+ done = threading.Event()
+ ident = []
+ thread.start_new_thread(f, ())
+ done.wait()
+ self.assertFalse(ident[0] is None)
+ # Kill the "immortal" _DummyThread
+ del threading._active[ident[0]]
+
# run with a small(ish) thread stack size (256kB)
def test_various_ops_small_stack(self):
if verbose:
@@ -115,11 +134,9 @@ class ThreadTests(unittest.TestCase):
def test_foreign_thread(self):
# Check that a "foreign" thread can use the threading module.
def f(mutex):
- # Acquiring an RLock forces an entry for the foreign
+ # Calling current_thread() forces an entry for the foreign
# thread to get made in the threading._active map.
- r = threading.RLock()
- r.acquire()
- r.release()
+ threading.current_thread()
mutex.release()
mutex = threading.Lock()
@@ -170,7 +187,7 @@ class ThreadTests(unittest.TestCase):
worker_saw_exception.set()
t = Worker()
- t.setDaemon(True) # so if this fails, we don't hang Python at shutdown
+ t.daemon = True # so if this fails, we don't hang Python at shutdown
t.start()
if verbose:
print " started worker thread"
@@ -202,14 +219,123 @@ class ThreadTests(unittest.TestCase):
t.join()
# else the thread is still running, and we have no way to kill it
+ def test_limbo_cleanup(self):
+ # Issue 7481: Failure to start thread should cleanup the limbo map.
+ def fail_new_thread(*args):
+ raise thread.error()
+ _start_new_thread = threading._start_new_thread
+ threading._start_new_thread = fail_new_thread
+ try:
+ t = threading.Thread(target=lambda: None)
+ self.assertRaises(thread.error, t.start)
+ self.assertFalse(
+ t in threading._limbo,
+ "Failed to cleanup _limbo map on failure of Thread.start().")
+ finally:
+ threading._start_new_thread = _start_new_thread
+
+ def test_finalize_runnning_thread(self):
+ # Issue 1402: the PyGILState_Ensure / _Release functions may be called
+ # very late on python exit: on deallocation of a running thread for
+ # example.
+ try:
+ import ctypes
+ except ImportError:
+ if verbose:
+ print("test_finalize_with_runnning_thread can't import ctypes")
+ return # can't do anything
+
+ import subprocess
+ rc = subprocess.call([sys.executable, "-c", """if 1:
+ import ctypes, sys, time, thread
+
+ # This lock is used as a simple event variable.
+ ready = thread.allocate_lock()
+ ready.acquire()
+
+ # Module globals are cleared before __del__ is run
+ # So we save the functions in class dict
+ class C:
+ ensure = ctypes.pythonapi.PyGILState_Ensure
+ release = ctypes.pythonapi.PyGILState_Release
+ def __del__(self):
+ state = self.ensure()
+ self.release(state)
+
+ def waitingThread():
+ x = C()
+ ready.release()
+ time.sleep(100)
+
+ thread.start_new_thread(waitingThread, ())
+ ready.acquire() # Be sure the other thread is waiting.
+ sys.exit(42)
+ """])
+ self.assertEqual(rc, 42)
+
+ def test_finalize_with_trace(self):
+ # Issue1733757
+ # Avoid a deadlock when sys.settrace steps into threading._shutdown
+ import subprocess
+ rc = subprocess.call([sys.executable, "-c", """if 1:
+ import sys, threading
+
+ # A deadlock-killer, to prevent the
+ # testsuite to hang forever
+ def killer():
+ import os, time
+ time.sleep(2)
+ print 'program blocked; aborting'
+ os._exit(2)
+ t = threading.Thread(target=killer)
+ t.daemon = True
+ t.start()
+
+ # This is the trace function
+ def func(frame, event, arg):
+ threading.current_thread()
+ return func
+
+ sys.settrace(func)
+ """])
+ self.failIf(rc == 2, "interpreted was blocked")
+ self.failUnless(rc == 0, "Unexpected error")
+
+ def test_join_nondaemon_on_shutdown(self):
+ # Issue 1722344
+ # Raising SystemExit skipped threading._shutdown
+ import subprocess
+ p = subprocess.Popen([sys.executable, "-c", """if 1:
+ import threading
+ from time import sleep
+
+ def child():
+ sleep(1)
+ # As a non-daemon thread we SHOULD wake up and nothing
+ # should be torn down yet
+ print "Woke up, sleep function is:", sleep
+
+ threading.Thread(target=child).start()
+ raise SystemExit
+ """],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ stdout, stderr = p.communicate()
+ self.assertEqual(stdout.strip(),
+ "Woke up, sleep function is: <built-in function sleep>")
+ stderr = re.sub(r"^\[\d+ refs\]", "", stderr, re.MULTILINE).strip()
+ self.assertEqual(stderr, "")
+
def test_enumerate_after_join(self):
# Try hard to trigger #1703448: a thread is still returned in
# threading.enumerate() after it has been join()ed.
enum = threading.enumerate
old_interval = sys.getcheckinterval()
- sys.setcheckinterval(1)
try:
- for i in xrange(1, 1000):
+ for i in xrange(1, 100):
+ # Try a couple times at each thread-switching interval
+ # to get more interleavings.
+ sys.setcheckinterval(i // 5)
t = threading.Thread(target=lambda: None)
t.start()
t.join()
@@ -219,6 +345,37 @@ class ThreadTests(unittest.TestCase):
finally:
sys.setcheckinterval(old_interval)
+ def test_no_refcycle_through_target(self):
+ class RunSelfFunction(object):
+ def __init__(self, should_raise):
+ # The links in this refcycle from Thread back to self
+ # should be cleaned up when the thread completes.
+ self.should_raise = should_raise
+ self.thread = threading.Thread(target=self._run,
+ args=(self,),
+ kwargs={'yet_another':self})
+ self.thread.start()
+
+ def _run(self, other_ref, yet_another):
+ if self.should_raise:
+ raise SystemExit
+
+ cyclic_object = RunSelfFunction(should_raise=False)
+ weak_cyclic_object = weakref.ref(cyclic_object)
+ cyclic_object.thread.join()
+ del cyclic_object
+ self.assertEquals(None, weak_cyclic_object(),
+ msg=('%d references still around' %
+ sys.getrefcount(weak_cyclic_object())))
+
+ raising_cyclic_object = RunSelfFunction(should_raise=True)
+ weak_raising_cyclic_object = weakref.ref(raising_cyclic_object)
+ raising_cyclic_object.thread.join()
+ del raising_cyclic_object
+ self.assertEquals(None, weak_raising_cyclic_object(),
+ msg=('%d references still around' %
+ sys.getrefcount(weak_raising_cyclic_object())))
+
class ThreadJoinOnShutdown(unittest.TestCase):
@@ -245,7 +402,7 @@ class ThreadJoinOnShutdown(unittest.TestCase):
script = """if 1:
import os
t = threading.Thread(target=joiningfunc,
- args=(threading.currentThread(),))
+ args=(threading.current_thread(),))
t.start()
time.sleep(0.1)
print 'end of main'
@@ -265,7 +422,7 @@ class ThreadJoinOnShutdown(unittest.TestCase):
sys.exit(0)
t = threading.Thread(target=joiningfunc,
- args=(threading.currentThread(),))
+ args=(threading.current_thread(),))
t.start()
print 'end of main'
"""
@@ -284,7 +441,7 @@ class ThreadJoinOnShutdown(unittest.TestCase):
' due to known OS bugs on'), sys.platform
return
script = """if 1:
- main_thread = threading.currentThread()
+ main_thread = threading.current_thread()
def worker():
childpid = os.fork()
if childpid != 0:
@@ -303,9 +460,59 @@ class ThreadJoinOnShutdown(unittest.TestCase):
self._run_and_join(script)
+class ThreadingExceptionTests(unittest.TestCase):
+ # A RuntimeError should be raised if Thread.start() is called
+ # multiple times.
+ def test_start_thread_again(self):
+ thread = threading.Thread()
+ thread.start()
+ self.assertRaises(RuntimeError, thread.start)
+
+ def test_joining_current_thread(self):
+ current_thread = threading.current_thread()
+ self.assertRaises(RuntimeError, current_thread.join);
+
+ def test_joining_inactive_thread(self):
+ thread = threading.Thread()
+ self.assertRaises(RuntimeError, thread.join)
+
+ def test_daemonize_active_thread(self):
+ thread = threading.Thread()
+ thread.start()
+ self.assertRaises(RuntimeError, setattr, thread, "daemon", True)
+
+
+class LockTests(lock_tests.LockTests):
+ locktype = staticmethod(threading.Lock)
+
+class RLockTests(lock_tests.RLockTests):
+ locktype = staticmethod(threading.RLock)
+
+class EventTests(lock_tests.EventTests):
+ eventtype = staticmethod(threading.Event)
+
+class ConditionAsRLockTests(lock_tests.RLockTests):
+ # An Condition uses an RLock by default and exports its API.
+ locktype = staticmethod(threading.Condition)
+
+class ConditionTests(lock_tests.ConditionTests):
+ condtype = staticmethod(threading.Condition)
+
+class SemaphoreTests(lock_tests.SemaphoreTests):
+ semtype = staticmethod(threading.Semaphore)
+
+class BoundedSemaphoreTests(lock_tests.BoundedSemaphoreTests):
+ semtype = staticmethod(threading.BoundedSemaphore)
+
+
def test_main():
- test.test_support.run_unittest(ThreadTests,
- ThreadJoinOnShutdown)
+ test.test_support.run_unittest(LockTests, RLockTests, EventTests,
+ ConditionAsRLockTests, ConditionTests,
+ SemaphoreTests, BoundedSemaphoreTests,
+ ThreadTests,
+ ThreadJoinOnShutdown,
+ ThreadingExceptionTests,
+ )
if __name__ == "__main__":
test_main()