1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
|
"""
TestCases for testing the locking sub-system.
"""
import shutil
import sys, os
import tempfile
import time
from pprint import pprint
try:
from threading import Thread, currentThread
have_threads = 1
except ImportError:
have_threads = 0
import unittest
from bsddb.test.test_all import verbose
try:
# For Pythons w/distutils pybsddb
from bsddb3 import db
except ImportError:
# For Python 2.3
from bsddb import db
#----------------------------------------------------------------------
class LockingTestCase(unittest.TestCase):
def setUp(self):
self.homeDir = tempfile.mkdtemp()
self.env = db.DBEnv()
self.env.open(self.homeDir, db.DB_THREAD | db.DB_INIT_MPOOL |
db.DB_INIT_LOCK | db.DB_CREATE)
def tearDown(self):
self.env.close()
shutil.rmtree(self.homeDir)
def test01_simple(self):
if verbose:
print('\n', '-=' * 30)
print("Running %s.test01_simple..." % self.__class__.__name__)
anID = self.env.lock_id()
if verbose:
print("locker ID: %s" % anID)
lock = self.env.lock_get(anID, b"some locked thing", db.DB_LOCK_WRITE)
if verbose:
print("Aquired lock: %s" % lock)
time.sleep(1)
self.env.lock_put(lock)
if verbose:
print("Released lock: %s" % lock)
def test02_threaded(self):
if verbose:
print('\n', '-=' * 30)
print("Running %s.test02_threaded..." % self.__class__.__name__)
threads = []
threads.append(Thread(target = self.theThread,
args=(5, db.DB_LOCK_WRITE)))
threads.append(Thread(target = self.theThread,
args=(1, db.DB_LOCK_READ)))
threads.append(Thread(target = self.theThread,
args=(1, db.DB_LOCK_READ)))
threads.append(Thread(target = self.theThread,
args=(1, db.DB_LOCK_WRITE)))
threads.append(Thread(target = self.theThread,
args=(1, db.DB_LOCK_READ)))
threads.append(Thread(target = self.theThread,
args=(1, db.DB_LOCK_READ)))
threads.append(Thread(target = self.theThread,
args=(1, db.DB_LOCK_WRITE)))
threads.append(Thread(target = self.theThread,
args=(1, db.DB_LOCK_WRITE)))
threads.append(Thread(target = self.theThread,
args=(1, db.DB_LOCK_WRITE)))
for t in threads:
t.start()
for t in threads:
t.join()
def test03_set_timeout(self):
# test that the set_timeout call works
if hasattr(self.env, 'set_timeout'):
self.env.set_timeout(0, db.DB_SET_LOCK_TIMEOUT)
self.env.set_timeout(0, db.DB_SET_TXN_TIMEOUT)
self.env.set_timeout(123456, db.DB_SET_LOCK_TIMEOUT)
self.env.set_timeout(7890123, db.DB_SET_TXN_TIMEOUT)
def theThread(self, sleepTime, lockType):
name = currentThread().getName()
if lockType == db.DB_LOCK_WRITE:
lt = "write"
else:
lt = "read"
anID = self.env.lock_id()
if verbose:
print("%s: locker ID: %s" % (name, anID))
lock = self.env.lock_get(anID, b"some locked thing", lockType)
if verbose:
print("%s: Aquired %s lock: %s" % (name, lt, lock))
time.sleep(sleepTime)
self.env.lock_put(lock)
if verbose:
print("%s: Released %s lock: %s" % (name, lt, lock))
#----------------------------------------------------------------------
def test_suite():
suite = unittest.TestSuite()
if have_threads:
suite.addTest(unittest.makeSuite(LockingTestCase))
else:
suite.addTest(unittest.makeSuite(LockingTestCase, 'test01'))
return suite
if __name__ == '__main__':
unittest.main(defaultTest='test_suite')
|