summaryrefslogtreecommitdiffstats
path: root/Lib/bsddb/test/test_sequence.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/bsddb/test/test_sequence.py')
-rw-r--r--Lib/bsddb/test/test_sequence.py88
1 files changed, 58 insertions, 30 deletions
diff --git a/Lib/bsddb/test/test_sequence.py b/Lib/bsddb/test/test_sequence.py
index 2e9c993..e2acbef 100644
--- a/Lib/bsddb/test/test_sequence.py
+++ b/Lib/bsddb/test/test_sequence.py
@@ -1,33 +1,19 @@
import unittest
import os
-import shutil
-import sys
-import tempfile
-try:
- # For Pythons w/distutils pybsddb
- from bsddb3 import db
-except ImportError:
- from bsddb import db
-
-from bsddb.test.test_all import verbose
-try:
- from bsddb3 import test_support
-except ImportError:
- from test import support as test_support
+from .test_all import db, test_support, get_new_environment_path, get_new_database_path
class DBSequenceTest(unittest.TestCase):
+ import sys
+ if sys.version_info[:3] < (2, 4, 0):
+ def assertTrue(self, expr, msg=None):
+ self.failUnless(expr,msg=msg)
+
def setUp(self):
self.int_32_max = 0x100000000
- self.homeDir = os.path.join(tempfile.gettempdir(), 'db_home%d'%os.getpid())
- try:
- os.mkdir(self.homeDir)
- except os.error:
- pass
- tempfile.tempdir = self.homeDir
- self.filename = os.path.split(tempfile.mktemp())[1]
- tempfile.tempdir = None
+ self.homeDir = get_new_environment_path()
+ self.filename = "test"
self.dbenv = db.DBEnv()
self.dbenv.open(self.homeDir, db.DB_CREATE | db.DB_INIT_MPOOL, 0o666)
@@ -52,39 +38,39 @@ class DBSequenceTest(unittest.TestCase):
start_value = 10 * self.int_32_max
self.assertEqual(0xA00000000, start_value)
self.assertEquals(None, self.seq.init_value(start_value))
- self.assertEquals(None, self.seq.open(key=b'id', txn=None, flags=db.DB_CREATE))
+ self.assertEquals(None, self.seq.open(key='id', txn=None, flags=db.DB_CREATE))
self.assertEquals(start_value, self.seq.get(5))
self.assertEquals(start_value + 5, self.seq.get())
def test_remove(self):
self.seq = db.DBSequence(self.d, flags=0)
- self.assertEquals(None, self.seq.open(key=b'foo', txn=None, flags=db.DB_CREATE))
+ self.assertEquals(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE))
self.assertEquals(None, self.seq.remove(txn=None, flags=0))
del self.seq
def test_get_key(self):
self.seq = db.DBSequence(self.d, flags=0)
- key = b'foo'
+ key = 'foo'
self.assertEquals(None, self.seq.open(key=key, txn=None, flags=db.DB_CREATE))
self.assertEquals(key, self.seq.get_key())
def test_get_dbp(self):
self.seq = db.DBSequence(self.d, flags=0)
- self.assertEquals(None, self.seq.open(key=b'foo', txn=None, flags=db.DB_CREATE))
+ self.assertEquals(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE))
self.assertEquals(self.d, self.seq.get_dbp())
def test_cachesize(self):
self.seq = db.DBSequence(self.d, flags=0)
cashe_size = 10
self.assertEquals(None, self.seq.set_cachesize(cashe_size))
- self.assertEquals(None, self.seq.open(key=b'foo', txn=None, flags=db.DB_CREATE))
+ self.assertEquals(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE))
self.assertEquals(cashe_size, self.seq.get_cachesize())
def test_flags(self):
self.seq = db.DBSequence(self.d, flags=0)
flag = db.DB_SEQ_WRAP;
self.assertEquals(None, self.seq.set_flags(flag))
- self.assertEquals(None, self.seq.open(key=b'foo', txn=None, flags=db.DB_CREATE))
+ self.assertEquals(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE))
self.assertEquals(flag, self.seq.get_flags() & flag)
def test_range(self):
@@ -92,17 +78,59 @@ class DBSequenceTest(unittest.TestCase):
seq_range = (10 * self.int_32_max, 11 * self.int_32_max - 1)
self.assertEquals(None, self.seq.set_range(seq_range))
self.seq.init_value(seq_range[0])
- self.assertEquals(None, self.seq.open(key=b'foo', txn=None, flags=db.DB_CREATE))
+ self.assertEquals(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE))
self.assertEquals(seq_range, self.seq.get_range())
def test_stat(self):
self.seq = db.DBSequence(self.d, flags=0)
- self.assertEquals(None, self.seq.open(key=b'foo', txn=None, flags=db.DB_CREATE))
+ self.assertEquals(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE))
stat = self.seq.stat()
for param in ('nowait', 'min', 'max', 'value', 'current',
'flags', 'cache_size', 'last_value', 'wait'):
self.assertTrue(param in stat, "parameter %s isn't in stat info" % param)
+ if db.version() >= (4,7) :
+ # This code checks a crash solved in Berkeley DB 4.7
+ def test_stat_crash(self) :
+ d=db.DB()
+ d.open(None,dbtype=db.DB_HASH,flags=db.DB_CREATE) # In RAM
+ seq = db.DBSequence(d, flags=0)
+
+ self.assertRaises(db.DBNotFoundError, seq.open,
+ key='id', txn=None, flags=0)
+
+ self.assertRaises(db.DBInvalidArgError, seq.stat)
+
+ d.close()
+
+ def test_64bits(self) :
+ # We don't use both extremes because they are problematic
+ value_plus=(1<<63)-2
+ self.assertEquals(9223372036854775806,value_plus)
+ value_minus=(-1<<63)+1 # Two complement
+ self.assertEquals(-9223372036854775807,value_minus)
+ self.seq = db.DBSequence(self.d, flags=0)
+ self.assertEquals(None, self.seq.init_value(value_plus-1))
+ self.assertEquals(None, self.seq.open(key='id', txn=None,
+ flags=db.DB_CREATE))
+ self.assertEquals(value_plus-1, self.seq.get(1))
+ self.assertEquals(value_plus, self.seq.get(1))
+
+ self.seq.remove(txn=None, flags=0)
+
+ self.seq = db.DBSequence(self.d, flags=0)
+ self.assertEquals(None, self.seq.init_value(value_minus))
+ self.assertEquals(None, self.seq.open(key='id', txn=None,
+ flags=db.DB_CREATE))
+ self.assertEquals(value_minus, self.seq.get(1))
+ self.assertEquals(value_minus+1, self.seq.get(1))
+
+ def test_multiple_close(self):
+ self.seq = db.DBSequence(self.d)
+ self.seq.close() # You can close a Sequence multiple times
+ self.seq.close()
+ self.seq.close()
+
def test_suite():
suite = unittest.TestSuite()
if db.version() >= (4,3):