summaryrefslogtreecommitdiffstats
path: root/Lib/importlib/test/test_locks.py
blob: 7faff490ae5870f50127f30616c6e857dffee75c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
from importlib import _bootstrap
import time
import unittest
import weakref

from test import support

try:
    import threading
except ImportError:
    threading = None
else:
    from test import lock_tests


# Short aliases for the private importlib internals exercised by this file:
# the exception raised by a lock's acquire() when it detects a deadlock, and
# the per-module lock class itself.
DeadlockError = _bootstrap._DeadlockError
LockType = _bootstrap._ModuleLock


if threading is None:
    class ModuleLockAsRLockTests(unittest.TestCase):
        """Empty placeholder used when the threading module is missing."""

else:
    class ModuleLockAsRLockTests(lock_tests.RLockTests):
        """Run the shared RLock test-suite against the module lock type."""

        @staticmethod
        def locktype():
            # Factory used by the inherited tests to obtain a fresh lock.
            return LockType("some_lock")

        # The inherited tests below rely on features the module lock does
        # not provide, so they are disabled by overriding them with None.

        # No _is_owned() method.
        test__is_owned = None
        # No non-blocking acquire(blocking=False).
        test_try_acquire = test_try_acquire_contended = None
        # No context-manager (`with`) support.
        test_with = None
        # No acquire(timeout=...).
        test_timeout = None
        # No _release_save() method.
        test_release_save_unacquired = None


@unittest.skipUnless(threading, "threads needed for this test")
class DeadlockAvoidanceTests(unittest.TestCase):
    """Exercise deadlock detection on module locks acquired in a ring."""

    def run_deadlock_avoidance_test(self, create_deadlock):
        """Make threads grab neighbouring locks from a ring and report back.

        Ten locks are arranged in a ring and every thread is handed one
        (lock, next lock) pair.  Each thread acquires its first lock, waits
        on a barrier until all threads have done the same, and only then
        tries to acquire its second lock.

        When *create_deadlock* is true there are as many threads as locks,
        so every pair is used and the second acquisitions form a cycle.
        Otherwise one pair is left unused and the chain of waiters is open.

        Return a list holding one ``(first_ok, second_ok)`` tuple per
        thread; a flag is False when that acquire() raised DeadlockError.
        """
        NLOCKS = 10
        locks = [LockType(str(i)) for i in range(NLOCKS)]
        pairs = [(locks[i], locks[(i+1)%NLOCKS]) for i in range(NLOCKS)]
        if create_deadlock:
            NTHREADS = NLOCKS
        else:
            NTHREADS = NLOCKS - 1
        barrier = threading.Barrier(NTHREADS)
        results = []

        def _acquire(lock):
            """Try to acquire the lock. Return True on success, False on deadlock."""
            try:
                lock.acquire()
            except DeadlockError:
                return False
            else:
                return True

        def f():
            # Each thread takes its own pair off the shared list.
            a, b = pairs.pop()
            ra = _acquire(a)
            # Make sure every thread holds its first lock before any of
            # them goes after its second one.
            barrier.wait()
            rb = _acquire(b)
            results.append((ra, rb))
            # Release in reverse order, and only what was actually acquired.
            if rb:
                b.release()
            if ra:
                a.release()

        lock_tests.Bunch(f, NTHREADS).wait_for_finished()
        self.assertEqual(len(results), NTHREADS)
        return results

    def test_deadlock(self):
        """A closed ring of waiters must make at least one thread back off."""
        results = self.run_deadlock_avoidance_test(True)
        # At least one of the threads detected a potential deadlock on its
        # second acquire() call.  It may be several of them, because the
        # deadlock avoidance mechanism is conservative; requiring exactly
        # one detection makes the test fail intermittently.
        nb_deadlocks = results.count((True, False))
        self.assertGreaterEqual(nb_deadlocks, 1)
        # Every other thread must have obtained both of its locks.
        self.assertEqual(results.count((True, True)),
                         len(results) - nb_deadlocks)

    def test_no_deadlock(self):
        """With an open chain no thread may report a deadlock."""
        results = self.run_deadlock_avoidance_test(False)
        self.assertEqual(results.count((True, False)), 0)
        self.assertEqual(results.count((True, True)), len(results))


class LifetimeTests(unittest.TestCase):
    """Check that module locks vanish from the registry once unreferenced."""

    def test_lock_lifetime(self):
        """A lock is registered while referenced and dropped afterwards."""
        registry = _bootstrap._module_locks
        module_name = "xyzzy"
        self.assertNotIn(module_name, registry)

        module_lock = _bootstrap._get_module_lock(module_name)
        self.assertIn(module_name, registry)

        # Keep only a weak reference, then let the collector run.
        lock_ref = weakref.ref(module_lock)
        del module_lock
        support.gc_collect()

        self.assertNotIn(module_name, registry)
        self.assertIsNone(lock_ref())

    def test_all_locks(self):
        """After a collection the registry of module locks is empty."""
        support.gc_collect()
        registry = _bootstrap._module_locks
        self.assertEqual(0, len(registry), registry)


@support.reap_threads
def test_main():
    """Run all test cases defined in this module via support.run_unittest()."""
    test_cases = (
        ModuleLockAsRLockTests,
        DeadlockAvoidanceTests,
        LifetimeTests,
    )
    support.run_unittest(*test_cases)


# Run the whole suite when this file is executed directly as a script.
if __name__ == '__main__':
    test_main()