diff options
Diffstat (limited to 'Lib/test/test_importlib/test_locks.py')
-rw-r--r-- | Lib/test/test_importlib/test_locks.py | 178 |
1 files changed, 99 insertions, 79 deletions
diff --git a/Lib/test/test_importlib/test_locks.py b/Lib/test/test_importlib/test_locks.py index dc97ba1..df0af12 100644 --- a/Lib/test/test_importlib/test_locks.py +++ b/Lib/test/test_importlib/test_locks.py @@ -1,7 +1,6 @@ -from . import util -frozen_init, source_init = util.import_importlib('importlib') -frozen_bootstrap = frozen_init._bootstrap -source_bootstrap = source_init._bootstrap +from . import util as test_util + +init = test_util.import_importlib('importlib') import sys import time @@ -32,14 +31,20 @@ if threading is not None: test_timeout = None # _release_save() unsupported test_release_save_unacquired = None + # lock status in repr unsupported + test_repr = None + test_locked_repr = None - class Frozen_ModuleLockAsRLockTests(ModuleLockAsRLockTests, lock_tests.RLockTests): - LockType = frozen_bootstrap._ModuleLock - - class Source_ModuleLockAsRLockTests(ModuleLockAsRLockTests, lock_tests.RLockTests): - LockType = source_bootstrap._ModuleLock + LOCK_TYPES = {kind: splitinit._bootstrap._ModuleLock + for kind, splitinit in init.items()} + (Frozen_ModuleLockAsRLockTests, + Source_ModuleLockAsRLockTests + ) = test_util.test_both(ModuleLockAsRLockTests, lock_tests.RLockTests, + LockType=LOCK_TYPES) else: + LOCK_TYPES = {} + class Frozen_ModuleLockAsRLockTests(unittest.TestCase): pass @@ -47,78 +52,94 @@ else: pass -class DeadlockAvoidanceTests: - - def setUp(self): - try: - self.old_switchinterval = sys.getswitchinterval() - sys.setswitchinterval(0.000001) - except AttributeError: - self.old_switchinterval = None - - def tearDown(self): - if self.old_switchinterval is not None: - sys.setswitchinterval(self.old_switchinterval) - - def run_deadlock_avoidance_test(self, create_deadlock): - NLOCKS = 10 - locks = [self.LockType(str(i)) for i in range(NLOCKS)] - pairs = [(locks[i], locks[(i+1)%NLOCKS]) for i in range(NLOCKS)] - if create_deadlock: - NTHREADS = NLOCKS - else: - NTHREADS = NLOCKS - 1 - barrier = threading.Barrier(NTHREADS) - results = [] - def _acquire(lock): - """Try to acquire the lock. Return True on success, False on deadlock.""" +if threading is not None: + class DeadlockAvoidanceTests: + + def setUp(self): try: - lock.acquire() - except self.DeadlockError: - return False + self.old_switchinterval = sys.getswitchinterval() + sys.setswitchinterval(0.000001) + except AttributeError: + self.old_switchinterval = None + + def tearDown(self): + if self.old_switchinterval is not None: + sys.setswitchinterval(self.old_switchinterval) + + def run_deadlock_avoidance_test(self, create_deadlock): + NLOCKS = 10 + locks = [self.LockType(str(i)) for i in range(NLOCKS)] + pairs = [(locks[i], locks[(i+1)%NLOCKS]) for i in range(NLOCKS)] + if create_deadlock: + NTHREADS = NLOCKS else: - return True - def f(): - a, b = pairs.pop() - ra = _acquire(a) - barrier.wait() - rb = _acquire(b) - results.append((ra, rb)) - if rb: - b.release() - if ra: - a.release() - lock_tests.Bunch(f, NTHREADS).wait_for_finished() - self.assertEqual(len(results), NTHREADS) - return results - - def test_deadlock(self): - results = self.run_deadlock_avoidance_test(True) - # At least one of the threads detected a potential deadlock on its - # second acquire() call. It may be several of them, because the - # deadlock avoidance mechanism is conservative. - nb_deadlocks = results.count((True, False)) - self.assertGreaterEqual(nb_deadlocks, 1) - self.assertEqual(results.count((True, True)), len(results) - nb_deadlocks) - - def test_no_deadlock(self): - results = self.run_deadlock_avoidance_test(False) - self.assertEqual(results.count((True, False)), 0) - self.assertEqual(results.count((True, True)), len(results)) - -@unittest.skipUnless(threading, "threads needed for this test") -class Frozen_DeadlockAvoidanceTests(DeadlockAvoidanceTests, unittest.TestCase): - LockType = frozen_bootstrap._ModuleLock - DeadlockError = frozen_bootstrap._DeadlockError - -@unittest.skipUnless(threading, "threads needed for this test") -class Source_DeadlockAvoidanceTests(DeadlockAvoidanceTests, unittest.TestCase): - LockType = source_bootstrap._ModuleLock - DeadlockError = source_bootstrap._DeadlockError + NTHREADS = NLOCKS - 1 + barrier = threading.Barrier(NTHREADS) + results = [] + + def _acquire(lock): + """Try to acquire the lock. Return True on success, + False on deadlock.""" + try: + lock.acquire() + except self.DeadlockError: + return False + else: + return True + + def f(): + a, b = pairs.pop() + ra = _acquire(a) + barrier.wait() + rb = _acquire(b) + results.append((ra, rb)) + if rb: + b.release() + if ra: + a.release() + lock_tests.Bunch(f, NTHREADS).wait_for_finished() + self.assertEqual(len(results), NTHREADS) + return results + + def test_deadlock(self): + results = self.run_deadlock_avoidance_test(True) + # At least one of the threads detected a potential deadlock on its + # second acquire() call. It may be several of them, because the + # deadlock avoidance mechanism is conservative. + nb_deadlocks = results.count((True, False)) + self.assertGreaterEqual(nb_deadlocks, 1) + self.assertEqual(results.count((True, True)), len(results) - nb_deadlocks) + + def test_no_deadlock(self): + results = self.run_deadlock_avoidance_test(False) + self.assertEqual(results.count((True, False)), 0) + self.assertEqual(results.count((True, True)), len(results)) + + + DEADLOCK_ERRORS = {kind: splitinit._bootstrap._DeadlockError + for kind, splitinit in init.items()} + + (Frozen_DeadlockAvoidanceTests, + Source_DeadlockAvoidanceTests + ) = test_util.test_both(DeadlockAvoidanceTests, + LockType=LOCK_TYPES, + DeadlockError=DEADLOCK_ERRORS) +else: + DEADLOCK_ERRORS = {} + + class Frozen_DeadlockAvoidanceTests(unittest.TestCase): + pass + + class Source_DeadlockAvoidanceTests(unittest.TestCase): + pass class LifetimeTests: + @property + def bootstrap(self): + return self.init._bootstrap + def test_lock_lifetime(self): name = "xyzzy" self.assertNotIn(name, self.bootstrap._module_locks) @@ -135,11 +156,10 @@ class LifetimeTests: self.assertEqual(0, len(self.bootstrap._module_locks), self.bootstrap._module_locks) -class Frozen_LifetimeTests(LifetimeTests, unittest.TestCase): - bootstrap = frozen_bootstrap -class Source_LifetimeTests(LifetimeTests, unittest.TestCase): - bootstrap = source_bootstrap +(Frozen_LifetimeTests, + Source_LifetimeTests + ) = test_util.test_both(LifetimeTests, init=init) @support.reap_threads |