1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
|
"""Miscellaneous bsddb module test cases
"""
import os, sys
import unittest
from test_all import db, dbshelve, hashopen, test_support, get_new_environment_path, get_new_database_path
#----------------------------------------------------------------------
class MiscTestCase(unittest.TestCase):
    """Miscellaneous regression tests for the bsddb ``db`` module.

    ``setUp`` obtains a fresh database path and environment directory for
    every test; ``tearDown`` removes both again.
    """

    if sys.version_info < (2, 4):
        # Old unittest versions only provide failUnless/failIf, so supply
        # the assertTrue/assertFalse spellings used by the tests below.
        def assertTrue(self, expr, msg=None):
            self.failUnless(expr, msg=msg)

        def assertFalse(self, expr, msg=None):
            self.failIf(expr, msg=msg)

    def setUp(self):
        self.filename = get_new_database_path()
        self.homeDir = get_new_environment_path()

    def tearDown(self):
        test_support.unlink(self.filename)
        test_support.rmtree(self.homeDir)

    def test01_badpointer(self):
        # Calling get() on a shelf that is already closed must raise DBError.
        dbs = dbshelve.open(self.filename)
        dbs.close()
        self.assertRaises(db.DBError, dbs.get, "foo")

    def test02_db_home(self):
        env = db.DBEnv()
        try:
            # check for crash fixed when db_home is used before open()
            self.assertTrue(env.db_home is None)
            env.open(self.homeDir, db.DB_CREATE)
            if sys.version_info[0] < 3:
                self.assertEqual(self.homeDir, env.db_home)
            else:
                # On Python 3 db_home is compared as bytes.
                self.assertEqual(bytes(self.homeDir, "ascii"), env.db_home)
        finally:
            # Release the environment before tearDown deletes its directory.
            env.close()

    def test03_repr_closed_db(self):
        # repr() of a closed database must equal the repr of an empty dict.
        # The local is not called "db" so the imported module stays visible.
        closed_db = hashopen(self.filename)
        closed_db.close()
        self.assertEqual(repr(closed_db), "{}")

    def test04_repr_db(self):
        # repr() of a reopened database must match the repr of a dict
        # holding the same items.
        expected = {}
        handle = hashopen(self.filename)
        try:
            for i in range(100):
                key = repr(i)
                value = repr(100 * i)
                handle[key] = value
                expected[key] = value
        finally:
            handle.close()

        handle = hashopen(self.filename)
        try:
            self.assertEqual(repr(handle), repr(expected))
        finally:
            # Close the handle even when the comparison fails.
            handle.close()

    # http://sourceforge.net/tracker/index.php?func=detail&aid=1708868&group_id=13900&atid=313900
    #
    # See the bug report for details.
    #
    # The problem was that make_key_dbt() was not allocating a copy of
    # string keys but FREE_DBT() was always being told to free it when the
    # database was opened with DB_THREAD.
    def test05_double_free_make_key_dbt(self):
        # Create the handle outside the try block: if DB() itself fails,
        # the finally clause must not trip over an unbound db1.
        db1 = db.DB()
        try:
            db1.open(self.filename, None, db.DB_BTREE,
                     db.DB_CREATE | db.DB_THREAD)
            curs = db1.cursor()
            # double free happened during exit from DBC_get
            curs.get("/foo", db.DB_SET)
        finally:
            db1.close()
            test_support.unlink(self.filename)

    def test06_key_with_null_bytes(self):
        # Keys containing NUL bytes must stay distinct from their
        # NUL-free prefixes, both when listed and when looked up.
        db1 = db.DB()
        try:
            db1.open(self.filename, None, db.DB_HASH, db.DB_CREATE)
            db1['a'] = 'eh?'
            db1['a\x00'] = 'eh zed.'
            db1['a\x00a'] = 'eh zed eh?'
            db1['aaa'] = 'eh eh eh!'
            keys = db1.keys()
            keys.sort()
            self.assertEqual(['a', 'a\x00', 'a\x00a', 'aaa'], keys)
            self.assertEqual(db1['a'], 'eh?')
            self.assertEqual(db1['a\x00'], 'eh zed.')
            self.assertEqual(db1['a\x00a'], 'eh zed eh?')
            self.assertEqual(db1['aaa'], 'eh eh eh!')
        finally:
            db1.close()
            test_support.unlink(self.filename)

    def test07_DB_set_flags_persists(self):
        if db.version() < (4, 2):
            # The get_flags API required for this to work is only available
            # in Berkeley DB >= 4.2
            return

        db1 = db.DB()
        try:
            db1.set_flags(db.DB_DUPSORT)
            db1.open(self.filename, db.DB_HASH, db.DB_CREATE)
            db1['a'] = 'eh'
            db1['a'] = 'A'
            self.assertEqual([('a', 'A')], db1.items())
            db1.put('a', 'Aa')
            self.assertEqual([('a', 'A'), ('a', 'Aa')], db1.items())
            db1.close()

            db1 = db.DB()
            # no set_flags call, we're testing that it reads and obeys
            # the flags on open.
            db1.open(self.filename, db.DB_HASH)
            self.assertEqual([('a', 'A'), ('a', 'Aa')], db1.items())
            # if it read the flags right this will replace all values
            # for key 'a' instead of adding a new one. (as a dict should)
            db1['a'] = 'new A'
            self.assertEqual([('a', 'new A')], db1.items())
        finally:
            db1.close()
            test_support.unlink(self.filename)

    def test08_ExceptionTypes(self):
        # Every DB*Error exported by the module derives from DBError, and
        # only the two lookup errors additionally derive from KeyError.
        self.assertTrue(issubclass(db.DBError, Exception))
        for name, obj in db.__dict__.items():
            if name.startswith("DB") and name.endswith("Error"):
                self.assertTrue(issubclass(obj, db.DBError), msg=name)
                if name not in ("DBKeyEmptyError", "DBNotFoundError"):
                    self.assertFalse(issubclass(obj, KeyError), msg=name)

        # These two exceptions have two bases
        self.assertTrue(issubclass(db.DBKeyEmptyError, KeyError))
        self.assertTrue(issubclass(db.DBNotFoundError, KeyError))
#----------------------------------------------------------------------
def test_suite():
    """Return a test suite containing every test method of MiscTestCase."""
    # unittest.makeSuite() was deprecated in Python 3.11 and removed in
    # 3.13; the default loader builds the same suite on every version.
    return unittest.defaultTestLoader.loadTestsFromTestCase(MiscTestCase)
if __name__ == '__main__':
    # Run as a script: hand control to unittest, which resolves the
    # module-level name 'test_suite' to obtain the tests to execute.
    unittest.main(defaultTest='test_suite')
|