diff options
Diffstat (limited to 'Lib')
-rw-r--r-- | Lib/bsddb/test/test_db.py | 142 | ||||
-rw-r--r-- | Lib/bsddb/test/test_dbenv.py | 513 | ||||
-rw-r--r-- | Lib/bsddb/test/test_fileid.py | 61 |
3 files changed, 716 insertions, 0 deletions
diff --git a/Lib/bsddb/test/test_db.py b/Lib/bsddb/test/test_db.py new file mode 100644 index 0000000..8d3a32b --- /dev/null +++ b/Lib/bsddb/test/test_db.py @@ -0,0 +1,142 @@ +import unittest +import os, glob + +from test_all import db, test_support, get_new_environment_path, \ + get_new_database_path + +#---------------------------------------------------------------------- + +class DB(unittest.TestCase): + import sys + if sys.version_info < (2, 4) : + def assertTrue(self, expr, msg=None): + self.failUnless(expr,msg=msg) + + def setUp(self): + self.path = get_new_database_path() + self.db = db.DB() + + def tearDown(self): + self.db.close() + del self.db + test_support.rmtree(self.path) + +class DB_general(DB) : + if db.version() >= (4, 2) : + def test_bt_minkey(self) : + for i in [17, 108, 1030] : + self.db.set_bt_minkey(i) + self.assertEqual(i, self.db.get_bt_minkey()) + + def test_lorder(self) : + self.db.set_lorder(1234) + self.assertEqual(1234, self.db.get_lorder()) + self.db.set_lorder(4321) + self.assertEqual(4321, self.db.get_lorder()) + self.assertRaises(db.DBInvalidArgError, self.db.set_lorder, 9182) + + if db.version() >= (4, 6) : + def test_priority(self) : + flags = [db.DB_PRIORITY_VERY_LOW, db.DB_PRIORITY_LOW, + db.DB_PRIORITY_DEFAULT, db.DB_PRIORITY_HIGH, + db.DB_PRIORITY_VERY_HIGH] + for flag in flags : + self.db.set_priority(flag) + self.assertEqual(flag, self.db.get_priority()) + +class DB_hash(DB) : + if db.version() >= (4, 2) : + def test_h_ffactor(self) : + for ffactor in [4, 16, 256] : + self.db.set_h_ffactor(ffactor) + self.assertEqual(ffactor, self.db.get_h_ffactor()) + + def test_h_nelem(self) : + for nelem in [1, 2, 4] : + nelem = nelem*1024*1024 # Millions + self.db.set_h_nelem(nelem) + self.assertEqual(nelem, self.db.get_h_nelem()) + + def test_pagesize(self) : + for i in xrange(9, 17) : # From 512 to 65536 + i = 1<<i + self.db.set_pagesize(i) + self.assertEqual(i, self.db.get_pagesize()) + + # The valid values goes from 512 to 65536 + # Test 131072 bytes... + self.assertRaises(db.DBInvalidArgError, self.db.set_pagesize, 1<<17) + # Test 256 bytes... + self.assertRaises(db.DBInvalidArgError, self.db.set_pagesize, 1<<8) + +class DB_txn(DB) : + def setUp(self) : + self.homeDir = get_new_environment_path() + self.env = db.DBEnv() + self.env.open(self.homeDir, db.DB_CREATE | db.DB_INIT_MPOOL | + db.DB_INIT_LOG | db.DB_INIT_TXN) + self.db = db.DB(self.env) + + def tearDown(self) : + self.db.close() + del self.db + self.env.close() + del self.env + test_support.rmtree(self.homeDir) + + if db.version() >= (4, 2) : + def test_flags(self) : + self.db.set_flags(db.DB_CHKSUM) + self.assertEqual(db.DB_CHKSUM, self.db.get_flags()) + self.db.set_flags(db.DB_TXN_NOT_DURABLE) + self.assertEqual(db.DB_TXN_NOT_DURABLE | db.DB_CHKSUM, + self.db.get_flags()) + +class DB_recno(DB) : + if db.version() >= (4, 2) : + def test_re_pad(self) : + for i in [' ', '*'] : # Check chars + self.db.set_re_pad(i) + self.assertEqual(ord(i), self.db.get_re_pad()) + for i in [97, 65] : # Check integers + self.db.set_re_pad(i) + self.assertEqual(i, self.db.get_re_pad()) + + def test_re_delim(self) : + for i in [' ', '*'] : # Check chars + self.db.set_re_delim(i) + self.assertEqual(ord(i), self.db.get_re_delim()) + for i in [97, 65] : # Check integers + self.db.set_re_delim(i) + self.assertEqual(i, self.db.get_re_delim()) + + def test_re_source(self) : + for i in ["test", "test2", "test3"] : + self.db.set_re_source(i) + self.assertEqual(i, self.db.get_re_source()) + +class DB_queue(DB) : + if db.version() >= (4, 2) : + def test_re_len(self) : + for i in [33, 65, 300, 2000] : + self.db.set_re_len(i) + self.assertEqual(i, self.db.get_re_len()) + + def test_q_extentsize(self) : + for i in [1, 60, 100] : + self.db.set_q_extentsize(i) + self.assertEqual(i, self.db.get_q_extentsize()) + +def test_suite(): + suite = unittest.TestSuite() + + suite.addTest(unittest.makeSuite(DB_general)) + suite.addTest(unittest.makeSuite(DB_txn)) + suite.addTest(unittest.makeSuite(DB_hash)) + suite.addTest(unittest.makeSuite(DB_recno)) + suite.addTest(unittest.makeSuite(DB_queue)) + + return suite + +if __name__ == '__main__': + unittest.main(defaultTest='test_suite') diff --git a/Lib/bsddb/test/test_dbenv.py b/Lib/bsddb/test/test_dbenv.py new file mode 100644 index 0000000..37281df --- /dev/null +++ b/Lib/bsddb/test/test_dbenv.py @@ -0,0 +1,513 @@ +import unittest +import os, glob + +from test_all import db, test_support, get_new_environment_path, \ + get_new_database_path + +#---------------------------------------------------------------------- + +class DBEnv(unittest.TestCase): + import sys + if sys.version_info < (2, 4) : + def assertTrue(self, expr, msg=None): + self.failUnless(expr,msg=msg) + + def assertFalse(self, expr, msg=None): + self.failIf(expr,msg=msg) + + def setUp(self): + self.homeDir = get_new_environment_path() + self.env = db.DBEnv() + + def tearDown(self): + self.env.close() + del self.env + test_support.rmtree(self.homeDir) + +class DBEnv_general(DBEnv) : + if db.version() >= (4, 7) : + def test_lk_partitions(self) : + for i in [10, 20, 40] : + self.env.set_lk_partitions(i) + self.assertEqual(i, self.env.get_lk_partitions()) + + if db.version() >= (4, 6) : + def test_thread(self) : + for i in [16, 100, 1000] : + self.env.set_thread_count(i) + self.assertEqual(i, self.env.get_thread_count()) + + def test_cache_max(self) : + for size in [64, 128] : + size = size*1024*1024 # Megabytes + self.env.set_cache_max(0, size) + size2 = self.env.get_cache_max() + self.assertEqual(0, size2[0]) + self.assertTrue(size <= size2[1]) + self.assertTrue(2*size > size2[1]) + + if db.version() >= (4, 4) : + def test_mutex_stat(self) : + self.env.open(self.homeDir, db.DB_CREATE | db.DB_INIT_MPOOL | + db.DB_INIT_LOCK) + stat = self.env.mutex_stat() + self.assertTrue("mutex_inuse_max" in stat) + + def test_lg_filemode(self) : + for i in [0600, 0660, 0666] : + self.env.set_lg_filemode(i) + self.assertEqual(i, self.env.get_lg_filemode()) + + if db.version() >= (4, 3) : + def test_mp_max_openfd(self) : + for i in [17, 31, 42] : + self.env.set_mp_max_openfd(i) + self.assertEqual(i, self.env.get_mp_max_openfd()) + + def test_mp_max_write(self) : + for i in [100, 200, 300] : + for j in [1, 2, 3] : + j *= 1000000 + self.env.set_mp_max_write(i, j) + v=self.env.get_mp_max_write() + self.assertEqual((i, j), v) + + if db.version() >= (4, 2) : + def test_invalid_txn(self) : + # This environment doesn't support transactions + self.assertRaises(db.DBInvalidArgError, self.env.txn_begin) + + def test_mp_mmapsize(self) : + for i in [16, 32, 64] : + i *= 1024*1024 + self.env.set_mp_mmapsize(i) + self.assertEqual(i, self.env.get_mp_mmapsize()) + + def test_tmp_dir(self) : + for i in ["a", "bb", "ccc"] : + self.env.set_tmp_dir(i) + self.assertEqual(i, self.env.get_tmp_dir()) + + def test_flags(self) : + self.env.set_flags(db.DB_AUTO_COMMIT, 1) + self.assertEqual(db.DB_AUTO_COMMIT, self.env.get_flags()) + self.env.set_flags(db.DB_TXN_NOSYNC, 1) + self.assertEqual(db.DB_AUTO_COMMIT | db.DB_TXN_NOSYNC, + self.env.get_flags()) + self.env.set_flags(db.DB_AUTO_COMMIT, 0) + self.assertEqual(db.DB_TXN_NOSYNC, self.env.get_flags()) + self.env.set_flags(db.DB_TXN_NOSYNC, 0) + self.assertEqual(0, self.env.get_flags()) + + def test_lk_max_objects(self) : + for i in [1000, 2000, 3000] : + self.env.set_lk_max_objects(i) + self.assertEqual(i, self.env.get_lk_max_objects()) + + def test_lk_max_locks(self) : + for i in [1000, 2000, 3000] : + self.env.set_lk_max_locks(i) + self.assertEqual(i, self.env.get_lk_max_locks()) + + def test_lk_max_lockers(self) : + for i in [1000, 2000, 3000] : + self.env.set_lk_max_lockers(i) + self.assertEqual(i, self.env.get_lk_max_lockers()) + + def test_lg_regionmax(self) : + for i in [128, 256, 1024] : + i = i*1024*1024 + self.env.set_lg_regionmax(i) + j = self.env.get_lg_regionmax() + self.assertTrue(i <= j) + self.assertTrue(2*i > j) + + def test_lk_detect(self) : + flags= [db.DB_LOCK_DEFAULT, db.DB_LOCK_EXPIRE, db.DB_LOCK_MAXLOCKS, + db.DB_LOCK_MINLOCKS, db.DB_LOCK_MINWRITE, + db.DB_LOCK_OLDEST, db.DB_LOCK_RANDOM, db.DB_LOCK_YOUNGEST] + + if db.version() >= (4, 3) : + flags.append(db.DB_LOCK_MAXWRITE) + + for i in flags : + self.env.set_lk_detect(i) + self.assertEqual(i, self.env.get_lk_detect()) + + def test_lg_dir(self) : + for i in ["a", "bb", "ccc", "dddd"] : + self.env.set_lg_dir(i) + self.assertEqual(i, self.env.get_lg_dir()) + + def test_lg_bsize(self) : + log_size = 70*1024 + self.env.set_lg_bsize(log_size) + self.assertTrue(self.env.get_lg_bsize() >= log_size) + self.assertTrue(self.env.get_lg_bsize() < 4*log_size) + self.env.set_lg_bsize(4*log_size) + self.assertTrue(self.env.get_lg_bsize() >= 4*log_size) + + def test_setget_data_dirs(self) : + dirs = ("a", "b", "c", "d") + for i in dirs : + self.env.set_data_dir(i) + self.assertEqual(dirs, self.env.get_data_dirs()) + + def test_setget_cachesize(self) : + cachesize = (0, 512*1024*1024, 3) + self.env.set_cachesize(*cachesize) + self.assertEqual(cachesize, self.env.get_cachesize()) + + cachesize = (0, 1*1024*1024, 5) + self.env.set_cachesize(*cachesize) + cachesize2 = self.env.get_cachesize() + self.assertEqual(cachesize[0], cachesize2[0]) + self.assertEqual(cachesize[2], cachesize2[2]) + # Berkeley DB expands the cache 25% accounting overhead, + # if the cache is small. + self.assertEqual(125, int(100.0*cachesize2[1]/cachesize[1])) + + # You can not change configuration after opening + # the environment. + self.env.open(self.homeDir, db.DB_CREATE | db.DB_INIT_MPOOL) + cachesize = (0, 2*1024*1024, 1) + self.assertRaises(db.DBInvalidArgError, + self.env.set_cachesize, *cachesize) + self.assertEqual(cachesize2, self.env.get_cachesize()) + + def test_set_cachesize_dbenv_db(self) : + # You can not configure the cachesize using + # the database handle, if you are using an environment. + d = db.DB(self.env) + self.assertRaises(db.DBInvalidArgError, + d.set_cachesize, 0, 1024*1024, 1) + + def test_setget_shm_key(self) : + shm_key=137 + self.env.set_shm_key(shm_key) + self.assertEqual(shm_key, self.env.get_shm_key()) + self.env.set_shm_key(shm_key+1) + self.assertEqual(shm_key+1, self.env.get_shm_key()) + + # You can not change configuration after opening + # the environment. + self.env.open(self.homeDir, db.DB_CREATE | db.DB_INIT_MPOOL) + # If we try to reconfigure cache after opening the + # environment, core dump. + self.assertRaises(db.DBInvalidArgError, + self.env.set_shm_key, shm_key) + self.assertEqual(shm_key+1, self.env.get_shm_key()) + + if db.version() >= (4, 4) : + def test_mutex_setget_max(self) : + v = self.env.mutex_get_max() + v2 = v*2+1 + + self.env.mutex_set_max(v2) + self.assertEqual(v2, self.env.mutex_get_max()) + + self.env.mutex_set_max(v) + self.assertEqual(v, self.env.mutex_get_max()) + + # You can not change configuration after opening + # the environment. + self.env.open(self.homeDir, db.DB_CREATE) + self.assertRaises(db.DBInvalidArgError, + self.env.mutex_set_max, v2) + + def test_mutex_setget_increment(self) : + v = self.env.mutex_get_increment() + v2 = 127 + + self.env.mutex_set_increment(v2) + self.assertEqual(v2, self.env.mutex_get_increment()) + + self.env.mutex_set_increment(v) + self.assertEqual(v, self.env.mutex_get_increment()) + + # You can not change configuration after opening + # the environment. + self.env.open(self.homeDir, db.DB_CREATE) + self.assertRaises(db.DBInvalidArgError, + self.env.mutex_set_increment, v2) + + def test_mutex_setget_tas_spins(self) : + self.env.mutex_set_tas_spins(0) # Default = BDB decides + v = self.env.mutex_get_tas_spins() + v2 = v*2+1 + + self.env.mutex_set_tas_spins(v2) + self.assertEqual(v2, self.env.mutex_get_tas_spins()) + + self.env.mutex_set_tas_spins(v) + self.assertEqual(v, self.env.mutex_get_tas_spins()) + + # In this case, you can change configuration + # after opening the environment. + self.env.open(self.homeDir, db.DB_CREATE) + self.env.mutex_set_tas_spins(v2) + + def test_mutex_setget_align(self) : + v = self.env.mutex_get_align() + v2 = 64 + if v == 64 : + v2 = 128 + + self.env.mutex_set_align(v2) + self.assertEqual(v2, self.env.mutex_get_align()) + + # Requires a nonzero power of two + self.assertRaises(db.DBInvalidArgError, + self.env.mutex_set_align, 0) + self.assertRaises(db.DBInvalidArgError, + self.env.mutex_set_align, 17) + + self.env.mutex_set_align(2*v2) + self.assertEqual(2*v2, self.env.mutex_get_align()) + + # You can not change configuration after opening + # the environment. + self.env.open(self.homeDir, db.DB_CREATE) + self.assertRaises(db.DBInvalidArgError, + self.env.mutex_set_align, v2) + + +class DBEnv_log(DBEnv) : + def setUp(self): + DBEnv.setUp(self) + self.env.open(self.homeDir, db.DB_CREATE | db.DB_INIT_MPOOL | db.DB_INIT_LOG) + + def test_log_file(self) : + log_file = self.env.log_file((1, 1)) + self.assertEqual("log.0000000001", log_file[-14:]) + + if db.version() >= (4, 4) : + # The version with transactions is checked in other test object + def test_log_printf(self) : + msg = "This is a test..." + self.env.log_printf(msg) + logc = self.env.log_cursor() + self.assertTrue(msg in (logc.last()[1])) + + if db.version() >= (4, 7) : + def test_log_config(self) : + self.env.log_set_config(db.DB_LOG_DSYNC | db.DB_LOG_ZERO, 1) + self.assertTrue(self.env.log_get_config(db.DB_LOG_DSYNC)) + self.assertTrue(self.env.log_get_config(db.DB_LOG_ZERO)) + self.env.log_set_config(db.DB_LOG_ZERO, 0) + self.assertTrue(self.env.log_get_config(db.DB_LOG_DSYNC)) + self.assertFalse(self.env.log_get_config(db.DB_LOG_ZERO)) + + +class DBEnv_log_txn(DBEnv) : + def setUp(self): + DBEnv.setUp(self) + self.env.open(self.homeDir, db.DB_CREATE | db.DB_INIT_MPOOL | + db.DB_INIT_LOG | db.DB_INIT_TXN) + + if db.version() >= (4, 5) : + def test_tx_max(self) : + txns=[] + def tx() : + for i in xrange(self.env.get_tx_max()) : + txns.append(self.env.txn_begin()) + + tx() + self.assertRaises(MemoryError, tx) + + # Abort the transactions before garbage collection, + # to avoid "warnings". + for i in txns : + i.abort() + + if db.version() >= (4, 4) : + # The version without transactions is checked in other test object + def test_log_printf(self) : + msg = "This is a test..." + txn = self.env.txn_begin() + self.env.log_printf(msg, txn=txn) + txn.commit() + logc = self.env.log_cursor() + logc.last() # Skip the commit + self.assertTrue(msg in (logc.prev()[1])) + + msg = "This is another test..." + txn = self.env.txn_begin() + self.env.log_printf(msg, txn=txn) + txn.abort() # Do not store the new message + logc.last() # Skip the abort + self.assertTrue(msg not in (logc.prev()[1])) + + msg = "This is a third test..." + txn = self.env.txn_begin() + self.env.log_printf(msg, txn=txn) + txn.commit() # Do not store the new message + logc.last() # Skip the commit + self.assertTrue(msg in (logc.prev()[1])) + + +class DBEnv_memp(DBEnv): + def setUp(self): + DBEnv.setUp(self) + self.env.open(self.homeDir, db.DB_CREATE | db.DB_INIT_MPOOL | db.DB_INIT_LOG) + self.db = db.DB(self.env) + self.db.open("test", db.DB_HASH, db.DB_CREATE, 0660) + + def tearDown(self): + self.db.close() + del self.db + DBEnv.tearDown(self) + + def test_memp_1_trickle(self) : + self.db.put("hi", "bye") + self.assertTrue(self.env.memp_trickle(100) > 0) + +# Preserve the order, do "memp_trickle" test first + def test_memp_2_sync(self) : + self.db.put("hi", "bye") + self.env.memp_sync() # Full flush + # Nothing to do... + self.assertTrue(self.env.memp_trickle(100) == 0) + + self.db.put("hi", "bye2") + self.env.memp_sync((1, 0)) # NOP, probably + # Something to do... or not + self.assertTrue(self.env.memp_trickle(100) >= 0) + + self.db.put("hi", "bye3") + self.env.memp_sync((123, 99)) # Full flush + # Nothing to do... + self.assertTrue(self.env.memp_trickle(100) == 0) + + def test_memp_stat_1(self) : + stats = self.env.memp_stat() # No param + self.assertTrue(len(stats)==2) + self.assertTrue("cache_miss" in stats[0]) + stats = self.env.memp_stat(db.DB_STAT_CLEAR) # Positional param + self.assertTrue("cache_miss" in stats[0]) + stats = self.env.memp_stat(flags=0) # Keyword param + self.assertTrue("cache_miss" in stats[0]) + + def test_memp_stat_2(self) : + stats=self.env.memp_stat()[1] + self.assertTrue(len(stats))==1 + self.assertTrue("test" in stats) + self.assertTrue("page_in" in stats["test"]) + +class DBEnv_logcursor(DBEnv): + def setUp(self): + DBEnv.setUp(self) + self.env.open(self.homeDir, db.DB_CREATE | db.DB_INIT_MPOOL | + db.DB_INIT_LOG | db.DB_INIT_TXN) + txn = self.env.txn_begin() + self.db = db.DB(self.env) + self.db.open("test", db.DB_HASH, db.DB_CREATE, 0660, txn=txn) + txn.commit() + for i in ["2", "8", "20"] : + txn = self.env.txn_begin() + self.db.put(key = i, data = i*int(i), txn=txn) + txn.commit() + + def tearDown(self): + self.db.close() + del self.db + DBEnv.tearDown(self) + + def _check_return(self, value) : + self.assertTrue(isinstance(value, tuple)) + self.assertEqual(len(value), 2) + self.assertTrue(isinstance(value[0], tuple)) + self.assertEqual(len(value[0]), 2) + self.assertTrue(isinstance(value[0][0], int)) + self.assertTrue(isinstance(value[0][1], int)) + self.assertTrue(isinstance(value[1], str)) + + # Preserve test order + def test_1_first(self) : + logc = self.env.log_cursor() + v = logc.first() + self._check_return(v) + self.assertTrue((1, 1) < v[0]) + self.assertTrue(len(v[1])>0) + + def test_2_last(self) : + logc = self.env.log_cursor() + lsn_first = logc.first()[0] + v = logc.last() + self._check_return(v) + self.assertTrue(lsn_first < v[0]) + + def test_3_next(self) : + logc = self.env.log_cursor() + lsn_last = logc.last()[0] + self.assertEqual(logc.next(), None) + lsn_first = logc.first()[0] + v = logc.next() + self._check_return(v) + self.assertTrue(lsn_first < v[0]) + self.assertTrue(lsn_last > v[0]) + + v2 = logc.next() + self.assertTrue(v2[0] > v[0]) + self.assertTrue(lsn_last > v2[0]) + + v3 = logc.next() + self.assertTrue(v3[0] > v2[0]) + self.assertTrue(lsn_last > v3[0]) + + def test_4_prev(self) : + logc = self.env.log_cursor() + lsn_first = logc.first()[0] + self.assertEqual(logc.prev(), None) + lsn_last = logc.last()[0] + v = logc.prev() + self._check_return(v) + self.assertTrue(lsn_first < v[0]) + self.assertTrue(lsn_last > v[0]) + + v2 = logc.prev() + self.assertTrue(v2[0] < v[0]) + self.assertTrue(lsn_first < v2[0]) + + v3 = logc.prev() + self.assertTrue(v3[0] < v2[0]) + self.assertTrue(lsn_first < v3[0]) + + def test_5_current(self) : + logc = self.env.log_cursor() + logc.first() + v = logc.next() + self.assertEqual(v, logc.current()) + + def test_6_set(self) : + logc = self.env.log_cursor() + logc.first() + v = logc.next() + self.assertNotEqual(v, logc.next()) + self.assertNotEqual(v, logc.next()) + self.assertEqual(v, logc.set(v[0])) + + def test_explicit_close(self) : + logc = self.env.log_cursor() + logc.close() + self.assertRaises(db.DBCursorClosedError, logc.next) + + def test_implicit_close(self) : + logc = [self.env.log_cursor() for i in xrange(10)] + self.env.close() # This close should close too all its tree + for i in logc : + self.assertRaises(db.DBCursorClosedError, i.next) + +def test_suite(): + suite = unittest.TestSuite() + + suite.addTest(unittest.makeSuite(DBEnv_general)) + suite.addTest(unittest.makeSuite(DBEnv_memp)) + suite.addTest(unittest.makeSuite(DBEnv_logcursor)) + suite.addTest(unittest.makeSuite(DBEnv_log)) + suite.addTest(unittest.makeSuite(DBEnv_log_txn)) + + return suite + +if __name__ == '__main__': + unittest.main(defaultTest='test_suite') diff --git a/Lib/bsddb/test/test_fileid.py b/Lib/bsddb/test/test_fileid.py new file mode 100644 index 0000000..2df2382 --- /dev/null +++ b/Lib/bsddb/test/test_fileid.py @@ -0,0 +1,61 @@ +"""TestCase for reseting File ID. +""" + +import os +import shutil +import unittest + +from test_all import db, test_support, get_new_environment_path, get_new_database_path + +class FileidResetTestCase(unittest.TestCase): + def setUp(self): + self.db_path_1 = get_new_database_path() + self.db_path_2 = get_new_database_path() + self.db_env_path = get_new_environment_path() + + def test_fileid_reset(self): + # create DB 1 + self.db1 = db.DB() + self.db1.open(self.db_path_1, dbtype=db.DB_HASH, flags=(db.DB_CREATE|db.DB_EXCL)) + self.db1.put('spam', 'eggs') + self.db1.close() + + shutil.copy(self.db_path_1, self.db_path_2) + + self.db2 = db.DB() + self.db2.open(self.db_path_2, dbtype=db.DB_HASH) + self.db2.put('spam', 'spam') + self.db2.close() + + self.db_env = db.DBEnv() + self.db_env.open(self.db_env_path, db.DB_CREATE|db.DB_INIT_MPOOL) + + # use fileid_reset() here + self.db_env.fileid_reset(self.db_path_2) + + self.db1 = db.DB(self.db_env) + self.db1.open(self.db_path_1, dbtype=db.DB_HASH, flags=db.DB_RDONLY) + self.assertEquals(self.db1.get('spam'), 'eggs') + + self.db2 = db.DB(self.db_env) + self.db2.open(self.db_path_2, dbtype=db.DB_HASH, flags=db.DB_RDONLY) + self.assertEquals(self.db2.get('spam'), 'spam') + + self.db1.close() + self.db2.close() + + self.db_env.close() + + def tearDown(self): + test_support.unlink(self.db_path_1) + test_support.unlink(self.db_path_2) + test_support.rmtree(self.db_env_path) + +def test_suite(): + suite = unittest.TestSuite() + if db.version() >= (4, 4): + suite.addTest(unittest.makeSuite(FileidResetTestCase)) + return suite + +if __name__ == '__main__': + unittest.main(defaultTest='test_suite') |