1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
|
import unittest
import os
from test_all import db, test_support, get_new_environment_path, get_new_database_path
class DBSequenceTest(unittest.TestCase):
import sys
if sys.version_info < (2, 4) :
def assertTrue(self, expr, msg=None):
self.failUnless(expr,msg=msg)
def setUp(self):
self.int_32_max = 0x100000000
self.homeDir = get_new_environment_path()
self.filename = "test"
self.dbenv = db.DBEnv()
self.dbenv.open(self.homeDir, db.DB_CREATE | db.DB_INIT_MPOOL, 0666)
self.d = db.DB(self.dbenv)
self.d.open(self.filename, db.DB_BTREE, db.DB_CREATE, 0666)
def tearDown(self):
if hasattr(self, 'seq'):
self.seq.close()
del self.seq
if hasattr(self, 'd'):
self.d.close()
del self.d
if hasattr(self, 'dbenv'):
self.dbenv.close()
del self.dbenv
test_support.rmtree(self.homeDir)
def test_get(self):
self.seq = db.DBSequence(self.d, flags=0)
start_value = 10 * self.int_32_max
self.assertEqual(0xA00000000, start_value)
self.assertEquals(None, self.seq.initial_value(start_value))
self.assertEquals(None, self.seq.open(key='id', txn=None, flags=db.DB_CREATE))
self.assertEquals(start_value, self.seq.get(5))
self.assertEquals(start_value + 5, self.seq.get())
def test_remove(self):
self.seq = db.DBSequence(self.d, flags=0)
self.assertEquals(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE))
self.assertEquals(None, self.seq.remove(txn=None, flags=0))
del self.seq
def test_get_key(self):
self.seq = db.DBSequence(self.d, flags=0)
key = 'foo'
self.assertEquals(None, self.seq.open(key=key, txn=None, flags=db.DB_CREATE))
self.assertEquals(key, self.seq.get_key())
def test_get_dbp(self):
self.seq = db.DBSequence(self.d, flags=0)
self.assertEquals(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE))
self.assertEquals(self.d, self.seq.get_dbp())
def test_cachesize(self):
self.seq = db.DBSequence(self.d, flags=0)
cashe_size = 10
self.assertEquals(None, self.seq.set_cachesize(cashe_size))
self.assertEquals(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE))
self.assertEquals(cashe_size, self.seq.get_cachesize())
def test_flags(self):
self.seq = db.DBSequence(self.d, flags=0)
flag = db.DB_SEQ_WRAP;
self.assertEquals(None, self.seq.set_flags(flag))
self.assertEquals(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE))
self.assertEquals(flag, self.seq.get_flags() & flag)
def test_range(self):
self.seq = db.DBSequence(self.d, flags=0)
seq_range = (10 * self.int_32_max, 11 * self.int_32_max - 1)
self.assertEquals(None, self.seq.set_range(seq_range))
self.seq.initial_value(seq_range[0])
self.assertEquals(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE))
self.assertEquals(seq_range, self.seq.get_range())
def test_stat(self):
self.seq = db.DBSequence(self.d, flags=0)
self.assertEquals(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE))
stat = self.seq.stat()
for param in ('nowait', 'min', 'max', 'value', 'current',
'flags', 'cache_size', 'last_value', 'wait'):
self.assertTrue(param in stat, "parameter %s isn't in stat info" % param)
if db.version() >= (4,7) :
# This code checks a crash solved in Berkeley DB 4.7
def test_stat_crash(self) :
d=db.DB()
d.open(None,dbtype=db.DB_HASH,flags=db.DB_CREATE) # In RAM
seq = db.DBSequence(d, flags=0)
self.assertRaises(db.DBNotFoundError, seq.open,
key='id', txn=None, flags=0)
self.assertRaises(db.DBInvalidArgError, seq.stat)
d.close()
def test_64bits(self) :
# We don't use both extremes because they are problematic
value_plus=(1L<<63)-2
self.assertEquals(9223372036854775806L,value_plus)
value_minus=(-1L<<63)+1 # Two complement
self.assertEquals(-9223372036854775807L,value_minus)
self.seq = db.DBSequence(self.d, flags=0)
self.assertEquals(None, self.seq.initial_value(value_plus-1))
self.assertEquals(None, self.seq.open(key='id', txn=None,
flags=db.DB_CREATE))
self.assertEquals(value_plus-1, self.seq.get(1))
self.assertEquals(value_plus, self.seq.get(1))
self.seq.remove(txn=None, flags=0)
self.seq = db.DBSequence(self.d, flags=0)
self.assertEquals(None, self.seq.initial_value(value_minus))
self.assertEquals(None, self.seq.open(key='id', txn=None,
flags=db.DB_CREATE))
self.assertEquals(value_minus, self.seq.get(1))
self.assertEquals(value_minus+1, self.seq.get(1))
def test_multiple_close(self):
self.seq = db.DBSequence(self.d)
self.seq.close() # You can close a Sequence multiple times
self.seq.close()
self.seq.close()
def test_suite():
suite = unittest.TestSuite()
if db.version() >= (4,3):
suite.addTest(unittest.makeSuite(DBSequenceTest))
return suite
if __name__ == '__main__':
unittest.main(defaultTest='test_suite')
|