summaryrefslogtreecommitdiffstats
path: root/Lib/bsddb/test/test_sequence.py
blob: cad598b9a793bf740662dc534da60ceb74be0019 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
import unittest
import os

from test_all import db, test_support, get_new_environment_path, get_new_database_path


class DBSequenceTest(unittest.TestCase):
    import sys
    if sys.version_info < (2, 4) :
        def assertTrue(self, expr, msg=None):
            self.failUnless(expr,msg=msg)

    def setUp(self):
        self.int_32_max = 0x100000000
        self.homeDir = get_new_environment_path()
        self.filename = "test"

        self.dbenv = db.DBEnv()
        self.dbenv.open(self.homeDir, db.DB_CREATE | db.DB_INIT_MPOOL, 0666)
        self.d = db.DB(self.dbenv)
        self.d.open(self.filename, db.DB_BTREE, db.DB_CREATE, 0666)

    def tearDown(self):
        if hasattr(self, 'seq'):
            self.seq.close()
            del self.seq
        if hasattr(self, 'd'):
            self.d.close()
            del self.d
        if hasattr(self, 'dbenv'):
            self.dbenv.close()
            del self.dbenv

        test_support.rmtree(self.homeDir)

    def test_get(self):
        self.seq = db.DBSequence(self.d, flags=0)
        start_value = 10 * self.int_32_max
        self.assertEqual(0xA00000000, start_value)
        self.assertEquals(None, self.seq.initial_value(start_value))
        self.assertEquals(None, self.seq.open(key='id', txn=None, flags=db.DB_CREATE))
        self.assertEquals(start_value, self.seq.get(5))
        self.assertEquals(start_value + 5, self.seq.get())

    def test_remove(self):
        self.seq = db.DBSequence(self.d, flags=0)
        self.assertEquals(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE))
        self.assertEquals(None, self.seq.remove(txn=None, flags=0))
        del self.seq

    def test_get_key(self):
        self.seq = db.DBSequence(self.d, flags=0)
        key = 'foo'
        self.assertEquals(None, self.seq.open(key=key, txn=None, flags=db.DB_CREATE))
        self.assertEquals(key, self.seq.get_key())

    def test_get_dbp(self):
        self.seq = db.DBSequence(self.d, flags=0)
        self.assertEquals(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE))
        self.assertEquals(self.d, self.seq.get_dbp())

    def test_cachesize(self):
        self.seq = db.DBSequence(self.d, flags=0)
        cashe_size = 10
        self.assertEquals(None, self.seq.set_cachesize(cashe_size))
        self.assertEquals(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE))
        self.assertEquals(cashe_size, self.seq.get_cachesize())

    def test_flags(self):
        self.seq = db.DBSequence(self.d, flags=0)
        flag = db.DB_SEQ_WRAP;
        self.assertEquals(None, self.seq.set_flags(flag))
        self.assertEquals(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE))
        self.assertEquals(flag, self.seq.get_flags() & flag)

    def test_range(self):
        self.seq = db.DBSequence(self.d, flags=0)
        seq_range = (10 * self.int_32_max, 11 * self.int_32_max - 1)
        self.assertEquals(None, self.seq.set_range(seq_range))
        self.seq.initial_value(seq_range[0])
        self.assertEquals(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE))
        self.assertEquals(seq_range, self.seq.get_range())

    def test_stat(self):
        self.seq = db.DBSequence(self.d, flags=0)
        self.assertEquals(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE))
        stat = self.seq.stat()
        for param in ('nowait', 'min', 'max', 'value', 'current',
                      'flags', 'cache_size', 'last_value', 'wait'):
            self.assertTrue(param in stat, "parameter %s isn't in stat info" % param)

    if db.version() >= (4,7) :
        # This code checks a crash solved in Berkeley DB 4.7
        def test_stat_crash(self) :
            d=db.DB()
            d.open(None,dbtype=db.DB_HASH,flags=db.DB_CREATE)  # In RAM
            seq = db.DBSequence(d, flags=0)

            self.assertRaises(db.DBNotFoundError, seq.open,
                    key='id', txn=None, flags=0)

            self.assertRaises(db.DBInvalidArgError, seq.stat)

            d.close()

    def test_64bits(self) :
        # We don't use both extremes because they are problematic
        value_plus=(1L<<63)-2
        self.assertEquals(9223372036854775806L,value_plus)
        value_minus=(-1L<<63)+1  # Two complement
        self.assertEquals(-9223372036854775807L,value_minus)
        self.seq = db.DBSequence(self.d, flags=0)
        self.assertEquals(None, self.seq.initial_value(value_plus-1))
        self.assertEquals(None, self.seq.open(key='id', txn=None,
            flags=db.DB_CREATE))
        self.assertEquals(value_plus-1, self.seq.get(1))
        self.assertEquals(value_plus, self.seq.get(1))

        self.seq.remove(txn=None, flags=0)

        self.seq = db.DBSequence(self.d, flags=0)
        self.assertEquals(None, self.seq.initial_value(value_minus))
        self.assertEquals(None, self.seq.open(key='id', txn=None,
            flags=db.DB_CREATE))
        self.assertEquals(value_minus, self.seq.get(1))
        self.assertEquals(value_minus+1, self.seq.get(1))

    def test_multiple_close(self):
        self.seq = db.DBSequence(self.d)
        self.seq.close()  # You can close a Sequence multiple times
        self.seq.close()
        self.seq.close()

def test_suite():
    suite = unittest.TestSuite()
    if db.version() >= (4,3):
        suite.addTest(unittest.makeSuite(DBSequenceTest))
    return suite


if __name__ == '__main__':
    unittest.main(defaultTest='test_suite')