diff options
-rw-r--r-- | Lib/test/test_dummy_thread.py | 16 | ||||
-rw-r--r-- | Lib/test/test_threading.py | 27 | ||||
-rw-r--r-- | Lib/test/test_threading_local.py | 2 |
3 files changed, 22 insertions, 23 deletions
diff --git a/Lib/test/test_dummy_thread.py b/Lib/test/test_dummy_thread.py index 2fafe1d..a6a2856 100644 --- a/Lib/test/test_dummy_thread.py +++ b/Lib/test/test_dummy_thread.py @@ -24,14 +24,14 @@ class LockTests(unittest.TestCase): def test_initlock(self): #Make sure locks start locked - self.assertTrue(not self.lock.locked(), + self.assertFalse(self.lock.locked(), "Lock object is not initialized unlocked.") def test_release(self): # Test self.lock.release() self.lock.acquire() self.lock.release() - self.assertTrue(not self.lock.locked(), + self.assertFalse(self.lock.locked(), "Lock object did not release properly.") def test_improper_release(self): @@ -46,7 +46,7 @@ class LockTests(unittest.TestCase): def test_cond_acquire_fail(self): #Test acquiring locked lock returns False self.lock.acquire(0) - self.assertTrue(not self.lock.acquire(0), + self.assertFalse(self.lock.acquire(0), "Conditional acquiring of a locked lock incorrectly " "succeeded.") @@ -58,9 +58,9 @@ class LockTests(unittest.TestCase): def test_uncond_acquire_return_val(self): #Make sure that an unconditional locking returns True. - self.assertTrue(self.lock.acquire(1) is True, + self.assertIs(self.lock.acquire(1), True, "Unconditional locking did not return True.") - self.assertTrue(self.lock.acquire() is True) + self.assertIs(self.lock.acquire(), True) def test_uncond_acquire_blocking(self): #Make sure that unconditional acquiring of a locked lock blocks. @@ -80,7 +80,7 @@ class LockTests(unittest.TestCase): end_time = int(time.time()) if support.verbose: print("done") - self.assertTrue((end_time - start_time) >= DELAY, + self.assertGreaterEqual(end_time - start_time, DELAY, "Blocking by unconditional acquiring failed.") class MiscTests(unittest.TestCase): @@ -94,7 +94,7 @@ class MiscTests(unittest.TestCase): #Test sanity of _thread.get_ident() self.assertIsInstance(_thread.get_ident(), int, "_thread.get_ident() returned a non-integer") - self.assertTrue(_thread.get_ident() != 0, + self.assertNotEqual(_thread.get_ident(), 0, "_thread.get_ident() returned 0") def test_LockType(self): @@ -164,7 +164,7 @@ class ThreadTests(unittest.TestCase): time.sleep(DELAY) if support.verbose: print('done') - self.assertTrue(testing_queue.qsize() == thread_count, + self.assertEqual(testing_queue.qsize(), thread_count, "Not all %s threads executed properly after %s sec." % (thread_count, DELAY)) diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py index 3b11bf6..06af5f2 100644 --- a/Lib/test/test_threading.py +++ b/Lib/test/test_threading.py @@ -58,7 +58,7 @@ class TestThread(threading.Thread): self.nrunning.inc() if verbose: print(self.nrunning.get(), 'tasks are running') - self.testcase.assertTrue(self.nrunning.get() <= 3) + self.testcase.assertLessEqual(self.nrunning.get(), 3) time.sleep(delay) if verbose: @@ -66,7 +66,7 @@ class TestThread(threading.Thread): with self.mutex: self.nrunning.dec() - self.testcase.assertTrue(self.nrunning.get() >= 0) + self.testcase.assertGreaterEqual(self.nrunning.get(), 0) if verbose: print('%s is finished. %d tasks are running' % (self.name, self.nrunning.get())) @@ -100,26 +100,25 @@ class ThreadTests(BaseTestCase): for i in range(NUMTASKS): t = TestThread("<thread %d>"%i, self, sema, mutex, numrunning) threads.append(t) - self.assertEqual(t.ident, None) - self.assertTrue(re.match('<TestThread\(.*, initial\)>', repr(t))) + self.assertIsNone(t.ident) + self.assertRegex(repr(t), r'^<TestThread\(.*, initial\)>$') t.start() if verbose: print('waiting for all tasks to complete') for t in threads: t.join() - self.assertTrue(not t.is_alive()) + self.assertFalse(t.is_alive()) self.assertNotEqual(t.ident, 0) - self.assertFalse(t.ident is None) - self.assertTrue(re.match('<TestThread\(.*, stopped -?\d+\)>', - repr(t))) + self.assertIsNotNone(t.ident) + self.assertRegex(repr(t), r'^<TestThread\(.*, stopped -?\d+\)>$') if verbose: print('all tasks done') self.assertEqual(numrunning.get(), 0) def test_ident_of_no_threading_threads(self): # The ident still must work for the main thread and dummy threads. - self.assertFalse(threading.currentThread().ident is None) + self.assertIsNotNone(threading.currentThread().ident) def f(): ident.append(threading.currentThread().ident) done.set() @@ -127,7 +126,7 @@ class ThreadTests(BaseTestCase): ident = [] _thread.start_new_thread(f, ()) done.wait() - self.assertFalse(ident[0] is None) + self.assertIsNotNone(ident[0]) # Kill the "immortal" _DummyThread del threading._active[ident[0]] @@ -244,7 +243,7 @@ class ThreadTests(BaseTestCase): self.assertTrue(ret) if verbose: print(" verifying worker hasn't exited") - self.assertTrue(not t.finished) + self.assertFalse(t.finished) if verbose: print(" attempting to raise asynch exception in worker") result = set_async_exc(ctypes.c_long(t.id), exception) @@ -415,9 +414,9 @@ class ThreadTests(BaseTestCase): def test_repr_daemon(self): t = threading.Thread() - self.assertFalse('daemon' in repr(t)) + self.assertNotIn('daemon', repr(t)) t.daemon = True - self.assertTrue('daemon' in repr(t)) + self.assertIn('daemon', repr(t)) def test_deamon_param(self): t = threading.Thread() @@ -569,7 +568,7 @@ class ThreadTests(BaseTestCase): tstate_lock.release() self.assertFalse(t.is_alive()) # And verify the thread disposed of _tstate_lock. - self.assertTrue(t._tstate_lock is None) + self.assertIsNone(t._tstate_lock) def test_repr_stopped(self): # Verify that "stopped" shows up in repr(Thread) appropriately. diff --git a/Lib/test/test_threading_local.py b/Lib/test/test_threading_local.py index c7f394c..4092cf3 100644 --- a/Lib/test/test_threading_local.py +++ b/Lib/test/test_threading_local.py @@ -189,7 +189,7 @@ class BaseLocalTest: wr = weakref.ref(x) del x gc.collect() - self.assertIs(wr(), None) + self.assertIsNone(wr()) class ThreadLocalTest(unittest.TestCase, BaseLocalTest): |