summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_threading.py
diff options
context:
space:
mode:
authorTim Peters <tim.peters@gmail.com>2005-01-08 07:30:42 (GMT)
committerTim Peters <tim.peters@gmail.com>2005-01-08 07:30:42 (GMT)
commit711906e0c26c010600221da50aa930008b8a9447 (patch)
tree3e5ac91205cbd5f93a7d9efc669ebc3f6d6a1a1c /Lib/test/test_threading.py
parent84d548994ea58385e398baf1bf015aa6b258f7bb (diff)
downloadcpython-711906e0c26c010600221da50aa930008b8a9447.zip
cpython-711906e0c26c010600221da50aa930008b8a9447.tar.gz
cpython-711906e0c26c010600221da50aa930008b8a9447.tar.bz2
threading._DummyThread.__init__(): document obscure new code.
test_threading.test_foreign_thread(): new test does a basic check that "foreign" threads can using the threading module, and that they create a _DummyThread instance in at least one use case. This isn't a very good test, since a thread created by thread.start_new_thread() isn't particularly "foreign".
Diffstat (limited to 'Lib/test/test_threading.py')
-rw-r--r--Lib/test/test_threading.py23
1 files changed, 22 insertions, 1 deletions
diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py
index 8b17460..7eb9758 100644
--- a/Lib/test/test_threading.py
+++ b/Lib/test/test_threading.py
@@ -4,6 +4,7 @@ import test.test_support
from test.test_support import verbose
import random
import threading
+import thread
import time
import unittest
@@ -78,11 +79,31 @@ class ThreadTests(unittest.TestCase):
if verbose:
print 'waiting for all tasks to complete'
for t in threads:
- t.join()
+ t.join(NUMTASKS)
+ self.assert_(not t.isAlive())
if verbose:
print 'all tasks done'
self.assertEqual(numrunning.get(), 0)
+ def test_foreign_thread(self):
+ # Check that a "foreign" thread can use the threading module.
+ def f(mutex):
+ # Acquiring an RLock forces an entry for the foreign
+ # thread to get made in the threading._active map.
+ r = threading.RLock()
+ r.acquire()
+ r.release()
+ mutex.release()
+
+ mutex = threading.Lock()
+ mutex.acquire()
+ tid = thread.start_new_thread(f, (mutex,))
+ # Wait for the thread to finish.
+ mutex.acquire()
+ self.assert_(tid in threading._active)
+ self.assert_(isinstance(threading._active[tid],
+ threading._DummyThread))
+ del threading._active[tid]
def test_main():
test.test_support.run_unittest(ThreadTests)