summaryrefslogtreecommitdiffstats
path: root/Lib/bsddb/test/test_join.py
blob: 838ffe75900978fa145c675197a158bfb2b9e4aa (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
"""TestCases for using the DB.join and DBCursor.join_item methods.
"""

import sys, os, string
import tempfile
import time
from pprint import pprint

try:
    from threading import Thread, currentThread
    have_threads = 1
except ImportError:
    have_threads = 0

import unittest
from test_all import verbose

try:
    # For Python 2.3
    from bsddb import db, dbshelve
except ImportError:
    # For earlier Pythons w/distutils pybsddb
    from bsddb3 import db, dbshelve


#----------------------------------------------------------------------

ProductIndex = [
    ('apple', "Convenience Store"),
    ('blueberry', "Farmer's Market"),
    ('shotgun', "S-Mart"),              # Aisle 12
    ('pear', "Farmer's Market"),
    ('chainsaw', "S-Mart"),             # "Shop smart.  Shop S-Mart!"
    ('strawberry', "Farmer's Market"),
]

ColorIndex = [
    ('blue', "blueberry"),
    ('red', "apple"),
    ('red', "chainsaw"),
    ('red', "strawberry"),
    ('yellow', "peach"),
    ('yellow', "pear"),
    ('black', "shotgun"),
]

class JoinTestCase(unittest.TestCase):
    keytype = ''

    def setUp(self):
        self.filename = self.__class__.__name__ + '.db'
        homeDir = os.path.join(os.path.dirname(sys.argv[0]), 'db_home')
        self.homeDir = homeDir
        try: os.mkdir(homeDir)
        except os.error: pass
        self.env = db.DBEnv()
        self.env.open(homeDir, db.DB_CREATE | db.DB_INIT_MPOOL | db.DB_INIT_LOCK )

    def tearDown(self):
        self.env.close()
        import glob
        files = glob.glob(os.path.join(self.homeDir, '*'))
        for file in files:
            os.remove(file)

    def test01_join(self):
        if verbose:
            print '\n', '-=' * 30
            print "Running %s.test01_join..." % \
                  self.__class__.__name__

        # create and populate primary index
        priDB = db.DB(self.env)
        priDB.open(self.filename, "primary", db.DB_BTREE, db.DB_CREATE)
        map(lambda t, priDB=priDB: apply(priDB.put, t), ProductIndex)

        # create and populate secondary index
        secDB = db.DB(self.env)
        secDB.set_flags(db.DB_DUP | db.DB_DUPSORT)
        secDB.open(self.filename, "secondary", db.DB_BTREE, db.DB_CREATE)
        map(lambda t, secDB=secDB: apply(secDB.put, t), ColorIndex)

        sCursor = None
        jCursor = None
        try:
            # lets look up all of the red Products
            sCursor = secDB.cursor()
            # Don't do the .set() in an assert, or you can get a bogus failure
            # when running python -O
            tmp = sCursor.set('red')
            assert tmp

            # FIXME: jCursor doesn't properly hold a reference to its
            # cursors, if they are closed before jcursor is used it
            # can cause a crash.
            jCursor = priDB.join([sCursor])

            if jCursor.get(0) != ('apple', "Convenience Store"):
                self.fail("join cursor positioned wrong")
            if jCursor.join_item() != 'chainsaw':
                self.fail("DBCursor.join_item returned wrong item")
            if jCursor.get(0)[0] != 'strawberry':
                self.fail("join cursor returned wrong thing")
            if jCursor.get(0):  # there were only three red items to return
                self.fail("join cursor returned too many items")
        finally:
            if jCursor:
                jCursor.close()
            if sCursor:
                sCursor.close()
            priDB.close()
            secDB.close()


def test_suite():
    suite = unittest.TestSuite()

    suite.addTest(unittest.makeSuite(JoinTestCase))

    return suite