1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
|
"""TestCases for using the DB.join and DBCursor.join_item methods.
"""
import sys, os, string
import tempfile
import time
from pprint import pprint
try:
from threading import Thread, currentThread
have_threads = 1
except ImportError:
have_threads = 0
import unittest
from test_all import verbose
try:
# For Python 2.3
from bsddb import db, dbshelve
except ImportError:
# For earlier Pythons w/distutils pybsddb
from bsddb3 import db, dbshelve
#----------------------------------------------------------------------
# (key, value) records that test01_join loads into the "primary" database:
# each product name maps to the name of a store.
ProductIndex = [
    ("apple",      "Convenience Store"),
    ("blueberry",  "Farmer's Market"),
    ("shotgun",    "S-Mart"),             # Aisle 12
    ("pear",       "Farmer's Market"),
    ("chainsaw",   "S-Mart"),             # "Shop smart. Shop S-Mart!"
    ("strawberry", "Farmer's Market"),
]
# (key, value) records that test01_join loads into the "secondary" database:
# each color maps to a product name, and a color may occur more than once.
# Note that "peach" has no matching record in ProductIndex.
ColorIndex = [
    ("blue",   "blueberry"),
    ("red",    "apple"),
    ("red",    "chainsaw"),
    ("red",    "strawberry"),
    ("yellow", "peach"),
    ("yellow", "pear"),
    ("black",  "shotgun"),
]
class JoinTestCase(unittest.TestCase):
keytype = ''
def setUp(self):
self.filename = self.__class__.__name__ + '.db'
homeDir = os.path.join(os.path.dirname(sys.argv[0]), 'db_home')
self.homeDir = homeDir
try: os.mkdir(homeDir)
except os.error: pass
self.env = db.DBEnv()
self.env.open(homeDir, db.DB_CREATE | db.DB_INIT_MPOOL | db.DB_INIT_LOCK )
def tearDown(self):
self.env.close()
import glob
files = glob.glob(os.path.join(self.homeDir, '*'))
for file in files:
os.remove(file)
def test01_join(self):
if verbose:
print '\n', '-=' * 30
print "Running %s.test01_join..." % \
self.__class__.__name__
# create and populate primary index
priDB = db.DB(self.env)
priDB.open(self.filename, "primary", db.DB_BTREE, db.DB_CREATE)
map(lambda t, priDB=priDB: apply(priDB.put, t), ProductIndex)
# create and populate secondary index
secDB = db.DB(self.env)
secDB.set_flags(db.DB_DUP | db.DB_DUPSORT)
secDB.open(self.filename, "secondary", db.DB_BTREE, db.DB_CREATE)
map(lambda t, secDB=secDB: apply(secDB.put, t), ColorIndex)
sCursor = None
jCursor = None
try:
# lets look up all of the red Products
sCursor = secDB.cursor()
assert sCursor.set('red')
# FIXME: jCursor doesn't properly hold a reference to its
# cursors, if they are closed before jcursor is used it
# can cause a crash.
jCursor = priDB.join([sCursor])
if jCursor.get(0) != ('apple', "Convenience Store"):
self.fail("join cursor positioned wrong")
if jCursor.join_item() != 'chainsaw':
self.fail("DBCursor.join_item returned wrong item")
if jCursor.get(0)[0] != 'strawberry':
self.fail("join cursor returned wrong thing")
if jCursor.get(0): # there were only three red items to return
self.fail("join cursor returned too many items")
finally:
if jCursor:
jCursor.close()
if sCursor:
sCursor.close()
priDB.close()
secDB.close()
def test_suite():
    """Return a TestSuite containing every test method of JoinTestCase.

    The tests are collected with the default test loader instead of
    unittest.makeSuite, which is deprecated and absent from recent Python
    releases.  The loader's suite is still nested inside a fresh TestSuite,
    so the returned object has the same shape as before.
    """
    suite = unittest.TestSuite()
    loader = unittest.defaultTestLoader
    suite.addTest(loader.loadTestsFromTestCase(JoinTestCase))
    return suite
|