1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
|
"""
TestCases for testing the locking sub-system.
"""
import sys
import tempfile
import time
try:
from threading import Thread, currentThread
have_threads = 1
except ImportError:
have_threads = 0
import unittest
from bsddb.test.test_all import verbose
try:
# For Pythons w/distutils pybsddb
from bsddb3 import db
except ImportError:
# For Python 2.3
from bsddb import db
try:
from bsddb3 import test_support
except ImportError:
from test import test_support
#----------------------------------------------------------------------
class LockingTestCase(unittest.TestCase):
def setUp(self):
self.homeDir = tempfile.mkdtemp('.test_lock')
self.env = db.DBEnv()
self.env.open(self.homeDir, db.DB_THREAD | db.DB_INIT_MPOOL |
db.DB_INIT_LOCK | db.DB_CREATE)
def tearDown(self):
self.env.close()
test_support.rmtree(self.homeDir)
def test01_simple(self):
if verbose:
print('\n', '-=' * 30)
print("Running %s.test01_simple..." % self.__class__.__name__)
anID = self.env.lock_id()
if verbose:
print("locker ID: %s" % anID)
lock = self.env.lock_get(anID, b"some locked thing", db.DB_LOCK_WRITE)
if verbose:
print("Aquired lock: %s" % lock)
time.sleep(1)
self.env.lock_put(lock)
if verbose:
print("Released lock: %s" % lock)
if db.version() >= (4,0):
self.env.lock_id_free(anID)
def test02_threaded(self):
if verbose:
print('\n', '-=' * 30)
print("Running %s.test02_threaded..." % self.__class__.__name__)
threads = []
threads.append(Thread(target = self.theThread,
args=(5, db.DB_LOCK_WRITE)))
threads.append(Thread(target = self.theThread,
args=(1, db.DB_LOCK_READ)))
threads.append(Thread(target = self.theThread,
args=(1, db.DB_LOCK_READ)))
threads.append(Thread(target = self.theThread,
args=(1, db.DB_LOCK_WRITE)))
threads.append(Thread(target = self.theThread,
args=(1, db.DB_LOCK_READ)))
threads.append(Thread(target = self.theThread,
args=(1, db.DB_LOCK_READ)))
threads.append(Thread(target = self.theThread,
args=(1, db.DB_LOCK_WRITE)))
threads.append(Thread(target = self.theThread,
args=(1, db.DB_LOCK_WRITE)))
threads.append(Thread(target = self.theThread,
args=(1, db.DB_LOCK_WRITE)))
for t in threads:
t.start()
for t in threads:
t.join()
def test03_set_timeout(self):
# test that the set_timeout call works
if hasattr(self.env, 'set_timeout'):
self.env.set_timeout(0, db.DB_SET_LOCK_TIMEOUT)
self.env.set_timeout(0, db.DB_SET_TXN_TIMEOUT)
self.env.set_timeout(123456, db.DB_SET_LOCK_TIMEOUT)
self.env.set_timeout(7890123, db.DB_SET_TXN_TIMEOUT)
def theThread(self, sleepTime, lockType):
name = currentThread().getName()
if lockType == db.DB_LOCK_WRITE:
lt = "write"
else:
lt = "read"
anID = self.env.lock_id()
if verbose:
print("%s: locker ID: %s" % (name, anID))
lock = self.env.lock_get(anID, b"some locked thing", lockType)
if verbose:
print("%s: Aquired %s lock: %s" % (name, lt, lock))
time.sleep(sleepTime)
self.env.lock_put(lock)
if verbose:
print("%s: Released %s lock: %s" % (name, lt, lock))
if db.version() >= (4,0):
self.env.lock_id_free(anID)
#----------------------------------------------------------------------
def test_suite():
suite = unittest.TestSuite()
if have_threads:
suite.addTest(unittest.makeSuite(LockingTestCase))
else:
suite.addTest(unittest.makeSuite(LockingTestCase, 'test01'))
return suite
if __name__ == '__main__':
unittest.main(defaultTest='test_suite')
|