summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_importlib/test_locks.py
blob: c373b112569406277c5e66d70cfc737f1b04e04d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
from importlib import _bootstrap
import sys
import time
import unittest
import weakref

from test import support

try:
    import threading
except ImportError:
    threading = None
else:
    from test import lock_tests


LockType = _bootstrap._ModuleLock
DeadlockError = _bootstrap._DeadlockError


if threading is not None:
    class ModuleLockAsRLockTests(lock_tests.RLockTests):
        locktype = staticmethod(lambda: LockType("some_lock"))

        # _is_owned() unsupported
        test__is_owned = None
        # acquire(blocking=False) unsupported
        test_try_acquire = None
        test_try_acquire_contended = None
        # `with` unsupported
        test_with = None
        # acquire(timeout=...) unsupported
        test_timeout = None
        # _release_save() unsupported
        test_release_save_unacquired = None

else:
    class ModuleLockAsRLockTests(unittest.TestCase):
        pass


@unittest.skipUnless(threading, "threads needed for this test")
class DeadlockAvoidanceTests(unittest.TestCase):

    def setUp(self):
        try:
            self.old_switchinterval = sys.getswitchinterval()
            sys.setswitchinterval(0.000001)
        except AttributeError:
            self.old_switchinterval = None

    def tearDown(self):
        if self.old_switchinterval is not None:
            sys.setswitchinterval(self.old_switchinterval)

    def run_deadlock_avoidance_test(self, create_deadlock):
        NLOCKS = 10
        locks = [LockType(str(i)) for i in range(NLOCKS)]
        pairs = [(locks[i], locks[(i+1)%NLOCKS]) for i in range(NLOCKS)]
        if create_deadlock:
            NTHREADS = NLOCKS
        else:
            NTHREADS = NLOCKS - 1
        barrier = threading.Barrier(NTHREADS)
        results = []
        def _acquire(lock):
            """Try to acquire the lock. Return True on success, False on deadlock."""
            try:
                lock.acquire()
            except DeadlockError:
                return False
            else:
                return True
        def f():
            a, b = pairs.pop()
            ra = _acquire(a)
            barrier.wait()
            rb = _acquire(b)
            results.append((ra, rb))
            if rb:
                b.release()
            if ra:
                a.release()
        lock_tests.Bunch(f, NTHREADS).wait_for_finished()
        self.assertEqual(len(results), NTHREADS)
        return results

    def test_deadlock(self):
        results = self.run_deadlock_avoidance_test(True)
        # At least one of the threads detected a potential deadlock on its
        # second acquire() call.  It may be several of them, because the
        # deadlock avoidance mechanism is conservative.
        nb_deadlocks = results.count((True, False))
        self.assertGreaterEqual(nb_deadlocks, 1)
        self.assertEqual(results.count((True, True)), len(results) - nb_deadlocks)

    def test_no_deadlock(self):
        results = self.run_deadlock_avoidance_test(False)
        self.assertEqual(results.count((True, False)), 0)
        self.assertEqual(results.count((True, True)), len(results))


class LifetimeTests(unittest.TestCase):

    def test_lock_lifetime(self):
        name = "xyzzy"
        self.assertNotIn(name, _bootstrap._module_locks)
        lock = _bootstrap._get_module_lock(name)
        self.assertIn(name, _bootstrap._module_locks)
        wr = weakref.ref(lock)
        del lock
        support.gc_collect()
        self.assertNotIn(name, _bootstrap._module_locks)
        self.assertIsNone(wr())

    def test_all_locks(self):
        support.gc_collect()
        self.assertEqual(0, len(_bootstrap._module_locks), _bootstrap._module_locks)


@support.reap_threads
def test_main():
    support.run_unittest(ModuleLockAsRLockTests,
                         DeadlockAvoidanceTests,
                         LifetimeTests)


if __name__ == '__main__':
    test_main()