From d351827b0b7465b14ed5eeac4d37c755d1f93982 Mon Sep 17 00:00:00 2001 From: Serhiy Storchaka Date: Mon, 14 Mar 2016 10:28:59 +0200 Subject: Issue #20556: Used specific assert methods in threading tests. --- Lib/test/test_dummy_thread.py | 16 ++++++++-------- Lib/test/test_threading.py | 20 ++++++++++---------- Lib/test/test_threading_local.py | 2 +- 3 files changed, 19 insertions(+), 19 deletions(-) diff --git a/Lib/test/test_dummy_thread.py b/Lib/test/test_dummy_thread.py index d9bdd3c..29a8531 100644 --- a/Lib/test/test_dummy_thread.py +++ b/Lib/test/test_dummy_thread.py @@ -24,14 +24,14 @@ class LockTests(unittest.TestCase): def test_initlock(self): #Make sure locks start locked - self.assertTrue(not self.lock.locked(), + self.assertFalse(self.lock.locked(), "Lock object is not initialized unlocked.") def test_release(self): # Test self.lock.release() self.lock.acquire() self.lock.release() - self.assertTrue(not self.lock.locked(), + self.assertFalse(self.lock.locked(), "Lock object did not release properly.") def test_improper_release(self): @@ -46,7 +46,7 @@ class LockTests(unittest.TestCase): def test_cond_acquire_fail(self): #Test acquiring locked lock returns False self.lock.acquire(0) - self.assertTrue(not self.lock.acquire(0), + self.assertFalse(self.lock.acquire(0), "Conditional acquiring of a locked lock incorrectly " "succeeded.") @@ -58,9 +58,9 @@ class LockTests(unittest.TestCase): def test_uncond_acquire_return_val(self): #Make sure that an unconditional locking returns True. - self.assertTrue(self.lock.acquire(1) is True, + self.assertIs(self.lock.acquire(1), True, "Unconditional locking did not return True.") - self.assertTrue(self.lock.acquire() is True) + self.assertIs(self.lock.acquire(), True) def test_uncond_acquire_blocking(self): #Make sure that unconditional acquiring of a locked lock blocks. @@ -80,7 +80,7 @@ class LockTests(unittest.TestCase): end_time = int(time.time()) if test_support.verbose: print "done" - self.assertTrue((end_time - start_time) >= DELAY, + self.assertGreaterEqual(end_time - start_time, DELAY, "Blocking by unconditional acquiring failed.") class MiscTests(unittest.TestCase): @@ -94,7 +94,7 @@ class MiscTests(unittest.TestCase): #Test sanity of _thread.get_ident() self.assertIsInstance(_thread.get_ident(), int, "_thread.get_ident() returned a non-integer") - self.assertTrue(_thread.get_ident() != 0, + self.assertNotEqual(_thread.get_ident(), 0, "_thread.get_ident() returned 0") def test_LockType(self): @@ -164,7 +164,7 @@ class ThreadTests(unittest.TestCase): time.sleep(DELAY) if test_support.verbose: print 'done' - self.assertTrue(testing_queue.qsize() == thread_count, + self.assertEqual(testing_queue.qsize(), thread_count, "Not all %s threads executed properly after %s sec." % (thread_count, DELAY)) diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py index 44f2c57..212bd1a 100644 --- a/Lib/test/test_threading.py +++ b/Lib/test/test_threading.py @@ -51,7 +51,7 @@ class TestThread(threading.Thread): self.nrunning.inc() if verbose: print self.nrunning.get(), 'tasks are running' - self.testcase.assertTrue(self.nrunning.get() <= 3) + self.testcase.assertLessEqual(self.nrunning.get(), 3) time.sleep(delay) if verbose: @@ -59,7 +59,7 @@ class TestThread(threading.Thread): with self.mutex: self.nrunning.dec() - self.testcase.assertTrue(self.nrunning.get() >= 0) + self.testcase.assertGreaterEqual(self.nrunning.get(), 0) if verbose: print '%s is finished. %d tasks are running' % ( self.name, self.nrunning.get()) @@ -92,25 +92,25 @@ class ThreadTests(BaseTestCase): for i in range(NUMTASKS): t = TestThread(""%i, self, sema, mutex, numrunning) threads.append(t) - self.assertEqual(t.ident, None) - self.assertTrue(re.match('', repr(t))) + self.assertIsNone(t.ident) + self.assertRegexpMatches(repr(t), r'^$') t.start() if verbose: print 'waiting for all tasks to complete' for t in threads: t.join(NUMTASKS) - self.assertTrue(not t.is_alive()) + self.assertFalse(t.is_alive()) self.assertNotEqual(t.ident, 0) - self.assertFalse(t.ident is None) - self.assertTrue(re.match('', repr(t))) + self.assertIsNotNone(t.ident) + self.assertRegexpMatches(repr(t), r'^$') if verbose: print 'all tasks done' self.assertEqual(numrunning.get(), 0) def test_ident_of_no_threading_threads(self): # The ident still must work for the main thread and dummy threads. - self.assertFalse(threading.currentThread().ident is None) + self.assertIsNotNone(threading.currentThread().ident) def f(): ident.append(threading.currentThread().ident) done.set() @@ -118,7 +118,7 @@ class ThreadTests(BaseTestCase): ident = [] thread.start_new_thread(f, ()) done.wait() - self.assertFalse(ident[0] is None) + self.assertIsNotNone(ident[0]) # Kill the "immortal" _DummyThread del threading._active[ident[0]] @@ -236,7 +236,7 @@ class ThreadTests(BaseTestCase): self.assertTrue(ret) if verbose: print " verifying worker hasn't exited" - self.assertTrue(not t.finished) + self.assertFalse(t.finished) if verbose: print " attempting to raise asynch exception in worker" result = set_async_exc(ctypes.c_long(t.id), exception) diff --git a/Lib/test/test_threading_local.py b/Lib/test/test_threading_local.py index b161315..e53400c 100644 --- a/Lib/test/test_threading_local.py +++ b/Lib/test/test_threading_local.py @@ -196,7 +196,7 @@ class ThreadLocalTest(unittest.TestCase, BaseLocalTest): wr = weakref.ref(x) del x gc.collect() - self.assertIs(wr(), None) + self.assertIsNone(wr()) class PyThreadingLocalTest(unittest.TestCase, BaseLocalTest): _local = _threading_local.local -- cgit v0.12