# cgit navigation: summary | refs | log | tree | commit | diff | stats
# path: root/Lib/bsddb/test/test_lock.py
# blob: bbd88bdbea51991bc162377442357f6143e737de (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
"""
TestCases for testing the locking sub-system.
"""

import os
from pprint import pprint
import shutil
import sys
import tempfile
import time

try:
    from threading import Thread, currentThread
    have_threads = 1
except ImportError:
    have_threads = 0


import unittest
from bsddb.test.test_all import verbose

try:
    # For Pythons w/distutils pybsddb
    from bsddb3 import db
except ImportError:
    # For Python 2.3
    from bsddb import db


#----------------------------------------------------------------------

class LockingTestCase(unittest.TestCase):

    def setUp(self):
        self.homeDir = tempfile.mkdtemp('.test_lock')
        self.env = db.DBEnv()
        self.env.open(self.homeDir, db.DB_THREAD | db.DB_INIT_MPOOL |
                                    db.DB_INIT_LOCK | db.DB_CREATE)


    def tearDown(self):
        self.env.close()
        shutil.rmtree(self.homeDir)


    def test01_simple(self):
        if verbose:
            print('\n', '-=' * 30)
            print("Running %s.test01_simple..." % self.__class__.__name__)

        anID = self.env.lock_id()
        if verbose:
            print("locker ID: %s" % anID)
        lock = self.env.lock_get(anID, b"some locked thing", db.DB_LOCK_WRITE)
        if verbose:
            print("Aquired lock: %s" % lock)
        time.sleep(1)
        self.env.lock_put(lock)
        if verbose:
            print("Released lock: %s" % lock)
        if db.version() >= (4,0):
            self.env.lock_id_free(anID)


    def test02_threaded(self):
        if verbose:
            print('\n', '-=' * 30)
            print("Running %s.test02_threaded..." % self.__class__.__name__)

        threads = []
        threads.append(Thread(target = self.theThread,
                              args=(5, db.DB_LOCK_WRITE)))
        threads.append(Thread(target = self.theThread,
                              args=(1, db.DB_LOCK_READ)))
        threads.append(Thread(target = self.theThread,
                              args=(1, db.DB_LOCK_READ)))
        threads.append(Thread(target = self.theThread,
                              args=(1, db.DB_LOCK_WRITE)))
        threads.append(Thread(target = self.theThread,
                              args=(1, db.DB_LOCK_READ)))
        threads.append(Thread(target = self.theThread,
                              args=(1, db.DB_LOCK_READ)))
        threads.append(Thread(target = self.theThread,
                              args=(1, db.DB_LOCK_WRITE)))
        threads.append(Thread(target = self.theThread,
                              args=(1, db.DB_LOCK_WRITE)))
        threads.append(Thread(target = self.theThread,
                              args=(1, db.DB_LOCK_WRITE)))

        for t in threads:
            t.start()
        for t in threads:
            t.join()

    def test03_set_timeout(self):
        # test that the set_timeout call works
        if hasattr(self.env, 'set_timeout'):
            self.env.set_timeout(0, db.DB_SET_LOCK_TIMEOUT)
            self.env.set_timeout(0, db.DB_SET_TXN_TIMEOUT)
            self.env.set_timeout(123456, db.DB_SET_LOCK_TIMEOUT)
            self.env.set_timeout(7890123, db.DB_SET_TXN_TIMEOUT)

    def theThread(self, sleepTime, lockType):
        name = currentThread().getName()
        if lockType ==  db.DB_LOCK_WRITE:
            lt = "write"
        else:
            lt = "read"

        anID = self.env.lock_id()
        if verbose:
            print("%s: locker ID: %s" % (name, anID))

        lock = self.env.lock_get(anID, b"some locked thing", lockType)
        if verbose:
            print("%s: Aquired %s lock: %s" % (name, lt, lock))

        time.sleep(sleepTime)

        self.env.lock_put(lock)
        if verbose:
            print("%s: Released %s lock: %s" % (name, lt, lock))
        if db.version() >= (4,0):
            self.env.lock_id_free(anID)


#----------------------------------------------------------------------

def test_suite():
    """Return the TestSuite for this module.

    With threading available, every test method of LockingTestCase is
    included.  Without it, only the methods whose names start with
    'test01' are loaded, which leaves out the threaded test.
    """
    # TestLoader is used instead of unittest.makeSuite(), which is
    # deprecated and no longer exists in recent Python releases.
    loader = unittest.TestLoader()
    if not have_threads:
        loader.testMethodPrefix = 'test01'

    suite = unittest.TestSuite()
    suite.addTest(loader.loadTestsFromTestCase(LockingTestCase))
    return suite


# When this file is run as a script, hand control to unittest with
# 'test_suite' as the default test to run.
if __name__ == '__main__':
    unittest.main(defaultTest='test_suite')