summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_threading.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_threading.py')
-rw-r--r--Lib/test/test_threading.py55
1 files changed, 37 insertions, 18 deletions
diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py
index 4b75ea6..b630509 100644
--- a/Lib/test/test_threading.py
+++ b/Lib/test/test_threading.py
@@ -3,8 +3,9 @@ Tests for the threading module.
"""
import test.support
-from test.support import verbose, strip_python_stderr, import_module, cpython_only
-from test.script_helper import assert_python_ok, assert_python_failure
+from test.support import (verbose, import_module, cpython_only,
+ requires_type_collecting)
+from test.support.script_helper import assert_python_ok, assert_python_failure
import random
import re
@@ -58,7 +59,7 @@ class TestThread(threading.Thread):
self.nrunning.inc()
if verbose:
print(self.nrunning.get(), 'tasks are running')
- self.testcase.assertTrue(self.nrunning.get() <= 3)
+ self.testcase.assertLessEqual(self.nrunning.get(), 3)
time.sleep(delay)
if verbose:
@@ -66,7 +67,7 @@ class TestThread(threading.Thread):
with self.mutex:
self.nrunning.dec()
- self.testcase.assertTrue(self.nrunning.get() >= 0)
+ self.testcase.assertGreaterEqual(self.nrunning.get(), 0)
if verbose:
print('%s is finished. %d tasks are running' %
(self.name, self.nrunning.get()))
@@ -100,26 +101,25 @@ class ThreadTests(BaseTestCase):
for i in range(NUMTASKS):
t = TestThread("<thread %d>"%i, self, sema, mutex, numrunning)
threads.append(t)
- self.assertEqual(t.ident, None)
- self.assertTrue(re.match('<TestThread\(.*, initial\)>', repr(t)))
+ self.assertIsNone(t.ident)
+ self.assertRegex(repr(t), r'^<TestThread\(.*, initial\)>$')
t.start()
if verbose:
print('waiting for all tasks to complete')
for t in threads:
t.join()
- self.assertTrue(not t.is_alive())
+ self.assertFalse(t.is_alive())
self.assertNotEqual(t.ident, 0)
- self.assertFalse(t.ident is None)
- self.assertTrue(re.match('<TestThread\(.*, stopped -?\d+\)>',
- repr(t)))
+ self.assertIsNotNone(t.ident)
+ self.assertRegex(repr(t), r'^<TestThread\(.*, stopped -?\d+\)>$')
if verbose:
print('all tasks done')
self.assertEqual(numrunning.get(), 0)
def test_ident_of_no_threading_threads(self):
# The ident still must work for the main thread and dummy threads.
- self.assertFalse(threading.currentThread().ident is None)
+ self.assertIsNotNone(threading.currentThread().ident)
def f():
ident.append(threading.currentThread().ident)
done.set()
@@ -127,7 +127,7 @@ class ThreadTests(BaseTestCase):
ident = []
_thread.start_new_thread(f, ())
done.wait()
- self.assertFalse(ident[0] is None)
+ self.assertIsNotNone(ident[0])
# Kill the "immortal" _DummyThread
del threading._active[ident[0]]
@@ -244,7 +244,7 @@ class ThreadTests(BaseTestCase):
self.assertTrue(ret)
if verbose:
print(" verifying worker hasn't exited")
- self.assertTrue(not t.finished)
+ self.assertFalse(t.finished)
if verbose:
print(" attempting to raise asynch exception in worker")
result = set_async_exc(ctypes.c_long(t.id), exception)
@@ -415,9 +415,9 @@ class ThreadTests(BaseTestCase):
def test_repr_daemon(self):
t = threading.Thread()
- self.assertFalse('daemon' in repr(t))
+ self.assertNotIn('daemon', repr(t))
t.daemon = True
- self.assertTrue('daemon' in repr(t))
+ self.assertIn('daemon', repr(t))
def test_deamon_param(self):
t = threading.Thread()
@@ -569,7 +569,7 @@ class ThreadTests(BaseTestCase):
tstate_lock.release()
self.assertFalse(t.is_alive())
# And verify the thread disposed of _tstate_lock.
- self.assertTrue(t._tstate_lock is None)
+ self.assertIsNone(t._tstate_lock)
def test_repr_stopped(self):
# Verify that "stopped" shows up in repr(Thread) appropriately.
@@ -945,7 +945,7 @@ class ThreadingExceptionTests(BaseTestCase):
def outer():
try:
recurse()
- except RuntimeError:
+ except RecursionError:
pass
w = threading.Thread(target=outer)
@@ -988,6 +988,7 @@ class ThreadingExceptionTests(BaseTestCase):
self.assertIn("ZeroDivisionError", err)
self.assertNotIn("Unhandled exception", err)
+ @requires_type_collecting
def test_print_exception_stderr_is_none_1(self):
script = r"""if True:
import sys
@@ -1042,6 +1043,24 @@ class ThreadingExceptionTests(BaseTestCase):
self.assertEqual(out, b'')
self.assertNotIn("Unhandled exception", err.decode())
+ def test_bare_raise_in_brand_new_thread(self):
+ def bare_raise():
+ raise
+
+ class Issue27558(threading.Thread):
+ exc = None
+
+ def run(self):
+ try:
+ bare_raise()
+ except Exception as exc:
+ self.exc = exc
+
+ thread = Issue27558()
+ thread.start()
+ thread.join()
+ self.assertIsNotNone(thread.exc)
+ self.assertIsInstance(thread.exc, RuntimeError)
class TimerTests(BaseTestCase):
@@ -1083,7 +1102,7 @@ class EventTests(lock_tests.EventTests):
eventtype = staticmethod(threading.Event)
class ConditionAsRLockTests(lock_tests.RLockTests):
- # An Condition uses an RLock by default and exports its API.
+ # Condition uses an RLock by default and exports its API.
locktype = staticmethod(threading.Condition)
class ConditionTests(lock_tests.ConditionTests):