diff options
author | Antoine Pitrou <pitrou@free.fr> | 2017-09-07 16:56:24 (GMT) |
---|---|---|
committer | Victor Stinner <victor.stinner@gmail.com> | 2017-09-07 16:56:24 (GMT) |
commit | a6a4dc816d68df04a7d592e0b6af8c7ecc4d4344 (patch) | |
tree | 1c31738009bee903417cea928e705a112aea2392 /Lib/test/test_importlib | |
parent | 1f06a680de465be0c24a78ea3b610053955daa99 (diff) | |
download | cpython-a6a4dc816d68df04a7d592e0b6af8c7ecc4d4344.zip cpython-a6a4dc816d68df04a7d592e0b6af8c7ecc4d4344.tar.gz cpython-a6a4dc816d68df04a7d592e0b6af8c7ecc4d4344.tar.bz2 |
bpo-31370: Remove support for threads-less builds (#3385)
* Remove Setup.config
* Always define WITH_THREAD for compatibility.
Diffstat (limited to 'Lib/test/test_importlib')
-rw-r--r-- | Lib/test/test_importlib/test_locks.py | 222 |
1 files changed, 100 insertions, 122 deletions
diff --git a/Lib/test/test_importlib/test_locks.py b/Lib/test/test_importlib/test_locks.py index dbce9c2..d86172a 100644 --- a/Lib/test/test_importlib/test_locks.py +++ b/Lib/test/test_importlib/test_locks.py @@ -3,134 +3,112 @@ from . import util as test_util init = test_util.import_importlib('importlib') import sys +import threading import unittest import weakref from test import support - -try: - import threading -except ImportError: - threading = None -else: - from test import lock_tests - -if threading is not None: - class ModuleLockAsRLockTests: - locktype = classmethod(lambda cls: cls.LockType("some_lock")) - - # _is_owned() unsupported - test__is_owned = None - # acquire(blocking=False) unsupported - test_try_acquire = None - test_try_acquire_contended = None - # `with` unsupported - test_with = None - # acquire(timeout=...) unsupported - test_timeout = None - # _release_save() unsupported - test_release_save_unacquired = None - # lock status in repr unsupported - test_repr = None - test_locked_repr = None - - LOCK_TYPES = {kind: splitinit._bootstrap._ModuleLock - for kind, splitinit in init.items()} - - (Frozen_ModuleLockAsRLockTests, - Source_ModuleLockAsRLockTests - ) = test_util.test_both(ModuleLockAsRLockTests, lock_tests.RLockTests, - LockType=LOCK_TYPES) -else: - LOCK_TYPES = {} - - class Frozen_ModuleLockAsRLockTests(unittest.TestCase): - pass - - class Source_ModuleLockAsRLockTests(unittest.TestCase): - pass - - -if threading is not None: - class DeadlockAvoidanceTests: - - def setUp(self): +from test import lock_tests + + +class ModuleLockAsRLockTests: + locktype = classmethod(lambda cls: cls.LockType("some_lock")) + + # _is_owned() unsupported + test__is_owned = None + # acquire(blocking=False) unsupported + test_try_acquire = None + test_try_acquire_contended = None + # `with` unsupported + test_with = None + # acquire(timeout=...) unsupported + test_timeout = None + # _release_save() unsupported + test_release_save_unacquired = None + # lock status in repr unsupported + test_repr = None + test_locked_repr = None + +LOCK_TYPES = {kind: splitinit._bootstrap._ModuleLock + for kind, splitinit in init.items()} + +(Frozen_ModuleLockAsRLockTests, + Source_ModuleLockAsRLockTests + ) = test_util.test_both(ModuleLockAsRLockTests, lock_tests.RLockTests, + LockType=LOCK_TYPES) + + +class DeadlockAvoidanceTests: + + def setUp(self): + try: + self.old_switchinterval = sys.getswitchinterval() + support.setswitchinterval(0.000001) + except AttributeError: + self.old_switchinterval = None + + def tearDown(self): + if self.old_switchinterval is not None: + sys.setswitchinterval(self.old_switchinterval) + + def run_deadlock_avoidance_test(self, create_deadlock): + NLOCKS = 10 + locks = [self.LockType(str(i)) for i in range(NLOCKS)] + pairs = [(locks[i], locks[(i+1)%NLOCKS]) for i in range(NLOCKS)] + if create_deadlock: + NTHREADS = NLOCKS + else: + NTHREADS = NLOCKS - 1 + barrier = threading.Barrier(NTHREADS) + results = [] + + def _acquire(lock): + """Try to acquire the lock. Return True on success, + False on deadlock.""" try: - self.old_switchinterval = sys.getswitchinterval() - support.setswitchinterval(0.000001) - except AttributeError: - self.old_switchinterval = None - - def tearDown(self): - if self.old_switchinterval is not None: - sys.setswitchinterval(self.old_switchinterval) - - def run_deadlock_avoidance_test(self, create_deadlock): - NLOCKS = 10 - locks = [self.LockType(str(i)) for i in range(NLOCKS)] - pairs = [(locks[i], locks[(i+1)%NLOCKS]) for i in range(NLOCKS)] - if create_deadlock: - NTHREADS = NLOCKS + lock.acquire() + except self.DeadlockError: + return False else: - NTHREADS = NLOCKS - 1 - barrier = threading.Barrier(NTHREADS) - results = [] - - def _acquire(lock): - """Try to acquire the lock. Return True on success, - False on deadlock.""" - try: - lock.acquire() - except self.DeadlockError: - return False - else: - return True - - def f(): - a, b = pairs.pop() - ra = _acquire(a) - barrier.wait() - rb = _acquire(b) - results.append((ra, rb)) - if rb: - b.release() - if ra: - a.release() - lock_tests.Bunch(f, NTHREADS).wait_for_finished() - self.assertEqual(len(results), NTHREADS) - return results - - def test_deadlock(self): - results = self.run_deadlock_avoidance_test(True) - # At least one of the threads detected a potential deadlock on its - # second acquire() call. It may be several of them, because the - # deadlock avoidance mechanism is conservative. - nb_deadlocks = results.count((True, False)) - self.assertGreaterEqual(nb_deadlocks, 1) - self.assertEqual(results.count((True, True)), len(results) - nb_deadlocks) - - def test_no_deadlock(self): - results = self.run_deadlock_avoidance_test(False) - self.assertEqual(results.count((True, False)), 0) - self.assertEqual(results.count((True, True)), len(results)) - - - DEADLOCK_ERRORS = {kind: splitinit._bootstrap._DeadlockError - for kind, splitinit in init.items()} - - (Frozen_DeadlockAvoidanceTests, - Source_DeadlockAvoidanceTests - ) = test_util.test_both(DeadlockAvoidanceTests, - LockType=LOCK_TYPES, - DeadlockError=DEADLOCK_ERRORS) -else: - DEADLOCK_ERRORS = {} - - class Frozen_DeadlockAvoidanceTests(unittest.TestCase): - pass - - class Source_DeadlockAvoidanceTests(unittest.TestCase): - pass + return True + + def f(): + a, b = pairs.pop() + ra = _acquire(a) + barrier.wait() + rb = _acquire(b) + results.append((ra, rb)) + if rb: + b.release() + if ra: + a.release() + lock_tests.Bunch(f, NTHREADS).wait_for_finished() + self.assertEqual(len(results), NTHREADS) + return results + + def test_deadlock(self): + results = self.run_deadlock_avoidance_test(True) + # At least one of the threads detected a potential deadlock on its + # second acquire() call. It may be several of them, because the + # deadlock avoidance mechanism is conservative. + nb_deadlocks = results.count((True, False)) + self.assertGreaterEqual(nb_deadlocks, 1) + self.assertEqual(results.count((True, True)), len(results) - nb_deadlocks) + + def test_no_deadlock(self): + results = self.run_deadlock_avoidance_test(False) + self.assertEqual(results.count((True, False)), 0) + self.assertEqual(results.count((True, True)), len(results)) + + +DEADLOCK_ERRORS = {kind: splitinit._bootstrap._DeadlockError + for kind, splitinit in init.items()} + +(Frozen_DeadlockAvoidanceTests, + Source_DeadlockAvoidanceTests + ) = test_util.test_both(DeadlockAvoidanceTests, + LockType=LOCK_TYPES, + DeadlockError=DEADLOCK_ERRORS) class LifetimeTests: |