diff options
| author | Berker Peksag <berker.peksag@gmail.com> | 2014-07-03 03:25:10 (GMT) | 
|---|---|---|
| committer | Berker Peksag <berker.peksag@gmail.com> | 2014-07-03 03:25:10 (GMT) | 
| commit | f7eaa0c63ccac33cf092e2a3a7dbfe691eb579cb (patch) | |
| tree | 8a06989359222561d7bec6473257eece20f7c8fc /Lib/test/test_importlib/test_locks.py | |
| parent | 748ff8bfd137a834056eb0cd3bfb1d5105c46522 (diff) | |
| download | cpython-f7eaa0c63ccac33cf092e2a3a7dbfe691eb579cb.zip cpython-f7eaa0c63ccac33cf092e2a3a7dbfe691eb579cb.tar.gz cpython-f7eaa0c63ccac33cf092e2a3a7dbfe691eb579cb.tar.bz2  | |
Issue #21755: Skip {Frozen,Source}_DeadlockAvoidanceTests tests when
Python is built without threads.
Diffstat (limited to 'Lib/test/test_importlib/test_locks.py')
| -rw-r--r-- | Lib/test/test_importlib/test_locks.py | 144 | 
1 files changed, 78 insertions, 66 deletions
diff --git a/Lib/test/test_importlib/test_locks.py b/Lib/test/test_importlib/test_locks.py index 4c01177..df0af12 100644 --- a/Lib/test/test_importlib/test_locks.py +++ b/Lib/test/test_importlib/test_locks.py @@ -52,74 +52,86 @@ else:          pass -@unittest.skipUnless(threading, "threads needed for this test") -class DeadlockAvoidanceTests: - -    def setUp(self): -        try: -            self.old_switchinterval = sys.getswitchinterval() -            sys.setswitchinterval(0.000001) -        except AttributeError: -            self.old_switchinterval = None - -    def tearDown(self): -        if self.old_switchinterval is not None: -            sys.setswitchinterval(self.old_switchinterval) - -    def run_deadlock_avoidance_test(self, create_deadlock): -        NLOCKS = 10 -        locks = [self.LockType(str(i)) for i in range(NLOCKS)] -        pairs = [(locks[i], locks[(i+1)%NLOCKS]) for i in range(NLOCKS)] -        if create_deadlock: -            NTHREADS = NLOCKS -        else: -            NTHREADS = NLOCKS - 1 -        barrier = threading.Barrier(NTHREADS) -        results = [] -        def _acquire(lock): -            """Try to acquire the lock. Return True on success, False on deadlock.""" +if threading is not None: +    class DeadlockAvoidanceTests: + +        def setUp(self):              try: -                lock.acquire() -            except self.DeadlockError: -                return False +                self.old_switchinterval = sys.getswitchinterval() +                sys.setswitchinterval(0.000001) +            except AttributeError: +                self.old_switchinterval = None + +        def tearDown(self): +            if self.old_switchinterval is not None: +                sys.setswitchinterval(self.old_switchinterval) + +        def run_deadlock_avoidance_test(self, create_deadlock): +            NLOCKS = 10 +            locks = [self.LockType(str(i)) for i in range(NLOCKS)] +            pairs = [(locks[i], locks[(i+1)%NLOCKS]) for i in range(NLOCKS)] +            if create_deadlock: +                NTHREADS = NLOCKS              else: -                return True -        def f(): -            a, b = pairs.pop() -            ra = _acquire(a) -            barrier.wait() -            rb = _acquire(b) -            results.append((ra, rb)) -            if rb: -                b.release() -            if ra: -                a.release() -        lock_tests.Bunch(f, NTHREADS).wait_for_finished() -        self.assertEqual(len(results), NTHREADS) -        return results - -    def test_deadlock(self): -        results = self.run_deadlock_avoidance_test(True) -        # At least one of the threads detected a potential deadlock on its -        # second acquire() call.  It may be several of them, because the -        # deadlock avoidance mechanism is conservative. -        nb_deadlocks = results.count((True, False)) -        self.assertGreaterEqual(nb_deadlocks, 1) -        self.assertEqual(results.count((True, True)), len(results) - nb_deadlocks) - -    def test_no_deadlock(self): -        results = self.run_deadlock_avoidance_test(False) -        self.assertEqual(results.count((True, False)), 0) -        self.assertEqual(results.count((True, True)), len(results)) - - -DEADLOCK_ERRORS = {kind: splitinit._bootstrap._DeadlockError -                   for kind, splitinit in init.items()} - -(Frozen_DeadlockAvoidanceTests, - Source_DeadlockAvoidanceTests - ) = test_util.test_both(DeadlockAvoidanceTests, -                         LockType=LOCK_TYPES, DeadlockError=DEADLOCK_ERRORS) +                NTHREADS = NLOCKS - 1 +            barrier = threading.Barrier(NTHREADS) +            results = [] + +            def _acquire(lock): +                """Try to acquire the lock. Return True on success, +                False on deadlock.""" +                try: +                    lock.acquire() +                except self.DeadlockError: +                    return False +                else: +                    return True + +            def f(): +                a, b = pairs.pop() +                ra = _acquire(a) +                barrier.wait() +                rb = _acquire(b) +                results.append((ra, rb)) +                if rb: +                    b.release() +                if ra: +                    a.release() +            lock_tests.Bunch(f, NTHREADS).wait_for_finished() +            self.assertEqual(len(results), NTHREADS) +            return results + +        def test_deadlock(self): +            results = self.run_deadlock_avoidance_test(True) +            # At least one of the threads detected a potential deadlock on its +            # second acquire() call.  It may be several of them, because the +            # deadlock avoidance mechanism is conservative. +            nb_deadlocks = results.count((True, False)) +            self.assertGreaterEqual(nb_deadlocks, 1) +            self.assertEqual(results.count((True, True)), len(results) - nb_deadlocks) + +        def test_no_deadlock(self): +            results = self.run_deadlock_avoidance_test(False) +            self.assertEqual(results.count((True, False)), 0) +            self.assertEqual(results.count((True, True)), len(results)) + + +    DEADLOCK_ERRORS = {kind: splitinit._bootstrap._DeadlockError +                       for kind, splitinit in init.items()} + +    (Frozen_DeadlockAvoidanceTests, +     Source_DeadlockAvoidanceTests +     ) = test_util.test_both(DeadlockAvoidanceTests, +                             LockType=LOCK_TYPES, +                             DeadlockError=DEADLOCK_ERRORS) +else: +    DEADLOCK_ERRORS = {} + +    class Frozen_DeadlockAvoidanceTests(unittest.TestCase): +        pass + +    class Source_DeadlockAvoidanceTests(unittest.TestCase): +        pass  class LifetimeTests:  | 
