diff options
author | Benjamin Peterson <benjamin@python.org> | 2008-09-03 22:30:12 (GMT) |
---|---|---|
committer | Benjamin Peterson <benjamin@python.org> | 2008-09-03 22:30:12 (GMT) |
commit | b98eb875dc4d23cdbf56e080a31de0881d2b6965 (patch) | |
tree | 2dcc04afc7c0919fd47297d7d13f33f60b79e9f1 /Lib/bsddb/test/test_basics.py | |
parent | dcc1e117604cc64dfa03a703391fbaeba654181a (diff) | |
download | cpython-b98eb875dc4d23cdbf56e080a31de0881d2b6965.zip cpython-b98eb875dc4d23cdbf56e080a31de0881d2b6965.tar.gz cpython-b98eb875dc4d23cdbf56e080a31de0881d2b6965.tar.bz2 |
remove bsddb
Diffstat (limited to 'Lib/bsddb/test/test_basics.py')
-rw-r--r-- | Lib/bsddb/test/test_basics.py | 1053 |
1 files changed, 0 insertions, 1053 deletions
diff --git a/Lib/bsddb/test/test_basics.py b/Lib/bsddb/test/test_basics.py deleted file mode 100644 index 704bc41..0000000 --- a/Lib/bsddb/test/test_basics.py +++ /dev/null @@ -1,1053 +0,0 @@ -""" -Basic TestCases for BTree and hash DBs, with and without a DBEnv, with -various DB flags, etc. -""" - -import os -import errno -import string -from pprint import pprint -import unittest -import time - -from .test_all import db, test_support, verbose, get_new_environment_path, \ - get_new_database_path - -DASH = '-' - - -#---------------------------------------------------------------------- - -class VersionTestCase(unittest.TestCase): - def test00_version(self): - info = db.version() - if verbose: - print('\n', '-=' * 20) - print('bsddb.db.version(): %s' % (info, )) - print(db.DB_VERSION_STRING) - print('-=' * 20) - self.assertEqual(info, (db.DB_VERSION_MAJOR, db.DB_VERSION_MINOR, - db.DB_VERSION_PATCH)) - -#---------------------------------------------------------------------- - -class BasicTestCase(unittest.TestCase): - dbtype = db.DB_UNKNOWN # must be set in derived class - dbopenflags = 0 - dbsetflags = 0 - dbmode = 0o660 - dbname = None - useEnv = 0 - envflags = 0 - envsetflags = 0 - - _numKeys = 1002 # PRIVATE. NOTE: must be an even value - - def setUp(self): - if self.useEnv: - self.homeDir=get_new_environment_path() - try: - self.env = db.DBEnv() - self.env.set_lg_max(1024*1024) - self.env.set_tx_max(30) - self.env.set_tx_timestamp(int(time.time())) - self.env.set_flags(self.envsetflags, 1) - self.env.open(self.homeDir, self.envflags | db.DB_CREATE) - self.filename = "test" - # Yes, a bare except is intended, since we're re-raising the exc. - except: - test_support.rmtree(self.homeDir) - raise - else: - self.env = None - self.filename = get_new_database_path() - - # create and open the DB - self.d = db.DB(self.env) - self.d.set_flags(self.dbsetflags) - if self.dbname: - self.d.open(self.filename, self.dbname, self.dbtype, - self.dbopenflags|db.DB_CREATE, self.dbmode) - else: - self.d.open(self.filename, # try out keyword args - mode = self.dbmode, - dbtype = self.dbtype, - flags = self.dbopenflags|db.DB_CREATE) - - self.populateDB() - - - def tearDown(self): - self.d.close() - if self.env is not None: - self.env.close() - test_support.rmtree(self.homeDir) - else: - os.remove(self.filename) - - - - def populateDB(self, _txn=None): - d = self.d - - for x in range(self._numKeys//2): - key = '%04d' % (self._numKeys - x) # insert keys in reverse order - data = self.makeData(key) - d.put(key, data, _txn) - - d.put('empty value', '', _txn) - - for x in range(self._numKeys//2-1): - key = '%04d' % x # and now some in forward order - data = self.makeData(key) - d.put(key, data, _txn) - - if _txn: - _txn.commit() - - num = len(d) - if verbose: - print("created %d records" % num) - - - def makeData(self, key): - return DASH.join([key] * 5) - - - - #---------------------------------------- - - def test01_GetsAndPuts(self): - d = self.d - - if verbose: - print('\n', '-=' * 30) - print("Running %s.test01_GetsAndPuts..." % self.__class__.__name__) - - for key in ['0001', '0100', '0400', '0700', '0999']: - data = d.get(key) - if verbose: - print(data) - - self.assertEqual(d.get('0321'), '0321-0321-0321-0321-0321') - - # By default non-existant keys return None... - self.assertEqual(d.get('abcd'), None) - - # ...but they raise exceptions in other situations. Call - # set_get_returns_none() to change it. - try: - d.delete('abcd') - except db.DBNotFoundError as val: - import sys - if sys.version_info[0] < 3 : - self.assertEqual(val[0], db.DB_NOTFOUND) - else : - self.assertEqual(val.args[0], db.DB_NOTFOUND) - if verbose: print(val) - else: - self.fail("expected exception") - - - d.put('abcd', 'a new record') - self.assertEqual(d.get('abcd'), 'a new record') - - d.put('abcd', 'same key') - if self.dbsetflags & db.DB_DUP: - self.assertEqual(d.get('abcd'), 'a new record') - else: - self.assertEqual(d.get('abcd'), 'same key') - - - try: - d.put('abcd', 'this should fail', flags=db.DB_NOOVERWRITE) - except db.DBKeyExistError as val: - import sys - if sys.version_info[0] < 3 : - self.assertEqual(val[0], db.DB_KEYEXIST) - else : - self.assertEqual(val.args[0], db.DB_KEYEXIST) - if verbose: print(val) - else: - self.fail("expected exception") - - if self.dbsetflags & db.DB_DUP: - self.assertEqual(d.get('abcd'), 'a new record') - else: - self.assertEqual(d.get('abcd'), 'same key') - - - d.sync() - d.close() - del d - - self.d = db.DB(self.env) - if self.dbname: - self.d.open(self.filename, self.dbname) - else: - self.d.open(self.filename) - d = self.d - - self.assertEqual(d.get('0321'), '0321-0321-0321-0321-0321') - if self.dbsetflags & db.DB_DUP: - self.assertEqual(d.get('abcd'), 'a new record') - else: - self.assertEqual(d.get('abcd'), 'same key') - - rec = d.get_both('0555', '0555-0555-0555-0555-0555') - if verbose: - print(rec) - - self.assertEqual(d.get_both('0555', 'bad data'), None) - - # test default value - data = d.get('bad key', 'bad data') - self.assertEqual(data, 'bad data') - - # any object can pass through - data = d.get('bad key', self) - self.assertEqual(data, self) - - s = d.stat() - self.assertEqual(type(s), type({})) - if verbose: - print('d.stat() returned this dictionary:') - pprint(s) - - - #---------------------------------------- - - def test02_DictionaryMethods(self): - d = self.d - - if verbose: - print('\n', '-=' * 30) - print("Running %s.test02_DictionaryMethods..." % \ - self.__class__.__name__) - - for key in ['0002', '0101', '0401', '0701', '0998']: - data = d[key] - self.assertEqual(data, self.makeData(key)) - if verbose: - print(data) - - self.assertEqual(len(d), self._numKeys) - keys = list(d.keys()) - self.assertEqual(len(keys), self._numKeys) - self.assertEqual(type(keys), type([])) - - d['new record'] = 'a new record' - self.assertEqual(len(d), self._numKeys+1) - keys = list(d.keys()) - self.assertEqual(len(keys), self._numKeys+1) - - d['new record'] = 'a replacement record' - self.assertEqual(len(d), self._numKeys+1) - keys = list(d.keys()) - self.assertEqual(len(keys), self._numKeys+1) - - if verbose: - print("the first 10 keys are:") - pprint(keys[:10]) - - self.assertEqual(d['new record'], 'a replacement record') - -# We check also the positional parameter - self.assertEqual(d.has_key('0001', None), 1) -# We check also the keyword parameter - self.assertEqual(d.has_key('spam', txn=None), 0) - - items = list(d.items()) - self.assertEqual(len(items), self._numKeys+1) - self.assertEqual(type(items), type([])) - self.assertEqual(type(items[0]), type(())) - self.assertEqual(len(items[0]), 2) - - if verbose: - print("the first 10 items are:") - pprint(items[:10]) - - values = list(d.values()) - self.assertEqual(len(values), self._numKeys+1) - self.assertEqual(type(values), type([])) - - if verbose: - print("the first 10 values are:") - pprint(values[:10]) - - - - #---------------------------------------- - - def test03_SimpleCursorStuff(self, get_raises_error=0, set_raises_error=0): - if verbose: - print('\n', '-=' * 30) - print("Running %s.test03_SimpleCursorStuff (get_error %s, set_error %s)..." % \ - (self.__class__.__name__, get_raises_error, set_raises_error)) - - if self.env and self.dbopenflags & db.DB_AUTO_COMMIT: - txn = self.env.txn_begin() - else: - txn = None - c = self.d.cursor(txn=txn) - - rec = c.first() - count = 0 - while rec is not None: - count = count + 1 - if verbose and count % 100 == 0: - print(rec) - try: - rec = next(c) - except db.DBNotFoundError as val: - if get_raises_error: - import sys - if sys.version_info[0] < 3 : - self.assertEqual(val[0], db.DB_NOTFOUND) - else : - self.assertEqual(val.args[0], db.DB_NOTFOUND) - if verbose: print(val) - rec = None - else: - self.fail("unexpected DBNotFoundError") - self.assertEqual(c.get_current_size(), len(c.current()[1]), - "%s != len(%r)" % (c.get_current_size(), c.current()[1])) - - self.assertEqual(count, self._numKeys) - - - rec = c.last() - count = 0 - while rec is not None: - count = count + 1 - if verbose and count % 100 == 0: - print(rec) - try: - rec = c.prev() - except db.DBNotFoundError as val: - if get_raises_error: - import sys - if sys.version_info[0] < 3 : - self.assertEqual(val[0], db.DB_NOTFOUND) - else : - self.assertEqual(val.args[0], db.DB_NOTFOUND) - if verbose: print(val) - rec = None - else: - self.fail("unexpected DBNotFoundError") - - self.assertEqual(count, self._numKeys) - - rec = c.set('0505') - rec2 = c.current() - self.assertEqual(rec, rec2) - self.assertEqual(rec[0], '0505') - self.assertEqual(rec[1], self.makeData('0505')) - self.assertEqual(c.get_current_size(), len(rec[1])) - - # make sure we get empty values properly - rec = c.set('empty value') - self.assertEqual(rec[1], '') - self.assertEqual(c.get_current_size(), 0) - - try: - n = c.set('bad key') - except db.DBNotFoundError as val: - import sys - if sys.version_info[0] < 3 : - self.assertEqual(val[0], db.DB_NOTFOUND) - else : - self.assertEqual(val.args[0], db.DB_NOTFOUND) - if verbose: print(val) - else: - if set_raises_error: - self.fail("expected exception") - if n != None: - self.fail("expected None: %r" % (n,)) - - rec = c.get_both('0404', self.makeData('0404')) - self.assertEqual(rec, ('0404', self.makeData('0404'))) - - try: - n = c.get_both('0404', 'bad data') - except db.DBNotFoundError as val: - import sys - if sys.version_info[0] < 3 : - self.assertEqual(val[0], db.DB_NOTFOUND) - else : - self.assertEqual(val.args[0], db.DB_NOTFOUND) - if verbose: print(val) - else: - if get_raises_error: - self.fail("expected exception") - if n != None: - self.fail("expected None: %r" % (n,)) - - if self.d.get_type() == db.DB_BTREE: - rec = c.set_range('011') - if verbose: - print("searched for '011', found: ", rec) - - rec = c.set_range('011',dlen=0,doff=0) - if verbose: - print("searched (partial) for '011', found: ", rec) - if rec[1] != '': self.fail('expected empty data portion') - - ev = c.set_range('empty value') - if verbose: - print("search for 'empty value' returned", ev) - if ev[1] != '': self.fail('empty value lookup failed') - - c.set('0499') - c.delete() - try: - rec = c.current() - except db.DBKeyEmptyError as val: - if get_raises_error: - import sys - if sys.version_info[0] < 3 : - self.assertEqual(val[0], db.DB_KEYEMPTY) - else : - self.assertEqual(val.args[0], db.DB_KEYEMPTY) - if verbose: print(val) - else: - self.fail("unexpected DBKeyEmptyError") - else: - if get_raises_error: - self.fail('DBKeyEmptyError exception expected') - - next(c) - c2 = c.dup(db.DB_POSITION) - self.assertEqual(c.current(), c2.current()) - - c2.put('', 'a new value', db.DB_CURRENT) - self.assertEqual(c.current(), c2.current()) - self.assertEqual(c.current()[1], 'a new value') - - c2.put('', 'er', db.DB_CURRENT, dlen=0, doff=5) - self.assertEqual(c2.current()[1], 'a newer value') - - c.close() - c2.close() - if txn: - txn.commit() - - # time to abuse the closed cursors and hope we don't crash - methods_to_test = { - 'current': (), - 'delete': (), - 'dup': (db.DB_POSITION,), - 'first': (), - 'get': (0,), - 'next': (), - 'prev': (), - 'last': (), - 'put':('', 'spam', db.DB_CURRENT), - 'set': ("0505",), - } - for method, args in list(methods_to_test.items()): - try: - if verbose: - print("attempting to use a closed cursor's %s method" % \ - method) - # a bug may cause a NULL pointer dereference... - getattr(c, method)(*args) - except db.DBError as val: - import sys - if sys.version_info[0] < 3 : - self.assertEqual(val[0], 0) - else : - self.assertEqual(val.args[0], 0) - if verbose: print(val) - else: - self.fail("no exception raised when using a buggy cursor's" - "%s method" % method) - - # - # free cursor referencing a closed database, it should not barf: - # - oldcursor = self.d.cursor(txn=txn) - self.d.close() - - # this would originally cause a segfault when the cursor for a - # closed database was cleaned up. it should not anymore. - # SF pybsddb bug id 667343 - del oldcursor - - def test03b_SimpleCursorWithoutGetReturnsNone0(self): - # same test but raise exceptions instead of returning None - if verbose: - print('\n', '-=' * 30) - print("Running %s.test03b_SimpleCursorStuffWithoutGetReturnsNone..." % \ - self.__class__.__name__) - - old = self.d.set_get_returns_none(0) - self.assertEqual(old, 2) - self.test03_SimpleCursorStuff(get_raises_error=1, set_raises_error=1) - - def test03b_SimpleCursorWithGetReturnsNone1(self): - # same test but raise exceptions instead of returning None - if verbose: - print('\n', '-=' * 30) - print("Running %s.test03b_SimpleCursorStuffWithoutGetReturnsNone..." % \ - self.__class__.__name__) - - old = self.d.set_get_returns_none(1) - self.test03_SimpleCursorStuff(get_raises_error=0, set_raises_error=1) - - - def test03c_SimpleCursorGetReturnsNone2(self): - # same test but raise exceptions instead of returning None - if verbose: - print('\n', '-=' * 30) - print("Running %s.test03c_SimpleCursorStuffWithoutSetReturnsNone..." % \ - self.__class__.__name__) - - old = self.d.set_get_returns_none(1) - self.assertEqual(old, 2) - old = self.d.set_get_returns_none(2) - self.assertEqual(old, 1) - self.test03_SimpleCursorStuff(get_raises_error=0, set_raises_error=0) - - #---------------------------------------- - - def test04_PartialGetAndPut(self): - d = self.d - if verbose: - print('\n', '-=' * 30) - print("Running %s.test04_PartialGetAndPut..." % \ - self.__class__.__name__) - - key = "partialTest" - data = "1" * 1000 + "2" * 1000 - d.put(key, data) - self.assertEqual(d.get(key), data) - self.assertEqual(d.get(key, dlen=20, doff=990), - ("1" * 10) + ("2" * 10)) - - d.put("partialtest2", ("1" * 30000) + "robin" ) - self.assertEqual(d.get("partialtest2", dlen=5, doff=30000), "robin") - - # There seems to be a bug in DB here... Commented out the test for - # now. - ##self.assertEqual(d.get("partialtest2", dlen=5, doff=30010), "") - - if self.dbsetflags != db.DB_DUP: - # Partial put with duplicate records requires a cursor - d.put(key, "0000", dlen=2000, doff=0) - self.assertEqual(d.get(key), "0000") - - d.put(key, "1111", dlen=1, doff=2) - self.assertEqual(d.get(key), "0011110") - - #---------------------------------------- - - def test05_GetSize(self): - d = self.d - if verbose: - print('\n', '-=' * 30) - print("Running %s.test05_GetSize..." % self.__class__.__name__) - - for i in range(1, 50000, 500): - key = "size%s" % i - #print "before ", i, - d.put(key, "1" * i) - #print "after", - self.assertEqual(d.get_size(key), i) - #print "done" - - #---------------------------------------- - - def test06_Truncate(self): - d = self.d - if verbose: - print('\n', '-=' * 30) - print("Running %s.test99_Truncate..." % self.__class__.__name__) - - d.put("abcde", "ABCDE"); - num = d.truncate() - self.assert_(num >= 1, "truncate returned <= 0 on non-empty database") - num = d.truncate() - self.assertEqual(num, 0, - "truncate on empty DB returned nonzero (%r)" % (num,)) - - #---------------------------------------- - - -#---------------------------------------------------------------------- - - -class BasicBTreeTestCase(BasicTestCase): - dbtype = db.DB_BTREE - - -class BasicHashTestCase(BasicTestCase): - dbtype = db.DB_HASH - - -class BasicBTreeWithThreadFlagTestCase(BasicTestCase): - dbtype = db.DB_BTREE - dbopenflags = db.DB_THREAD - - -class BasicHashWithThreadFlagTestCase(BasicTestCase): - dbtype = db.DB_HASH - dbopenflags = db.DB_THREAD - - -class BasicWithEnvTestCase(BasicTestCase): - dbopenflags = db.DB_THREAD - useEnv = 1 - envflags = db.DB_THREAD | db.DB_INIT_MPOOL | db.DB_INIT_LOCK - - #---------------------------------------- - - def test07_EnvRemoveAndRename(self): - if not self.env: - return - - if verbose: - print('\n', '-=' * 30) - print("Running %s.test07_EnvRemoveAndRename..." % self.__class__.__name__) - - # can't rename or remove an open DB - self.d.close() - - newname = self.filename + '.renamed' - self.env.dbrename(self.filename, None, newname) - self.env.dbremove(newname) - - # dbremove and dbrename are in 4.1 and later - if db.version() < (4,1): - del test07_EnvRemoveAndRename - - #---------------------------------------- - -class BasicBTreeWithEnvTestCase(BasicWithEnvTestCase): - dbtype = db.DB_BTREE - - -class BasicHashWithEnvTestCase(BasicWithEnvTestCase): - dbtype = db.DB_HASH - - -#---------------------------------------------------------------------- - -class BasicTransactionTestCase(BasicTestCase): - import sys - if sys.version_info[:3] < (2, 4, 0): - def assertTrue(self, expr, msg=None): - self.failUnless(expr,msg=msg) - - dbopenflags = db.DB_THREAD | db.DB_AUTO_COMMIT - useEnv = 1 - envflags = (db.DB_THREAD | db.DB_INIT_MPOOL | db.DB_INIT_LOCK | - db.DB_INIT_TXN) - envsetflags = db.DB_AUTO_COMMIT - - - def tearDown(self): - self.txn.commit() - BasicTestCase.tearDown(self) - - - def populateDB(self): - txn = self.env.txn_begin() - BasicTestCase.populateDB(self, _txn=txn) - - self.txn = self.env.txn_begin() - - - def test06_Transactions(self): - d = self.d - if verbose: - print('\n', '-=' * 30) - print("Running %s.test06_Transactions..." % self.__class__.__name__) - - self.assertEqual(d.get('new rec', txn=self.txn), None) - d.put('new rec', 'this is a new record', self.txn) - self.assertEqual(d.get('new rec', txn=self.txn), - 'this is a new record') - self.txn.abort() - self.assertEqual(d.get('new rec'), None) - - self.txn = self.env.txn_begin() - - self.assertEqual(d.get('new rec', txn=self.txn), None) - d.put('new rec', 'this is a new record', self.txn) - self.assertEqual(d.get('new rec', txn=self.txn), - 'this is a new record') - self.txn.commit() - self.assertEqual(d.get('new rec'), 'this is a new record') - - self.txn = self.env.txn_begin() - c = d.cursor(self.txn) - rec = c.first() - count = 0 - while rec is not None: - count = count + 1 - if verbose and count % 100 == 0: - print(rec) - rec = next(c) - self.assertEqual(count, self._numKeys+1) - - c.close() # Cursors *MUST* be closed before commit! - self.txn.commit() - - # flush pending updates - try: - self.env.txn_checkpoint (0, 0, 0) - except db.DBIncompleteError: - pass - - statDict = self.env.log_stat(0); - self.assert_('magic' in statDict) - self.assert_('version' in statDict) - self.assert_('cur_file' in statDict) - self.assert_('region_nowait' in statDict) - - # must have at least one log file present: - logs = self.env.log_archive(db.DB_ARCH_ABS | db.DB_ARCH_LOG) - self.assertNotEqual(logs, None) - for log in logs: - if verbose: - print('log file: ' + log) - if db.version() >= (4,2): - logs = self.env.log_archive(db.DB_ARCH_REMOVE) - self.assertTrue(not logs) - - self.txn = self.env.txn_begin() - - #---------------------------------------- - - def test07_TxnTruncate(self): - d = self.d - if verbose: - print('\n', '-=' * 30) - print("Running %s.test07_TxnTruncate..." % self.__class__.__name__) - - d.put("abcde", "ABCDE"); - txn = self.env.txn_begin() - num = d.truncate(txn) - self.assert_(num >= 1, "truncate returned <= 0 on non-empty database") - num = d.truncate(txn) - self.assertEqual(num, 0, - "truncate on empty DB returned nonzero (%r)" % (num,)) - txn.commit() - - #---------------------------------------- - - def test08_TxnLateUse(self): - txn = self.env.txn_begin() - txn.abort() - try: - txn.abort() - except db.DBError as e: - pass - else: - raise RuntimeError("DBTxn.abort() called after DB_TXN no longer valid w/o an exception") - - txn = self.env.txn_begin() - txn.commit() - try: - txn.commit() - except db.DBError as e: - pass - else: - raise RuntimeError("DBTxn.commit() called after DB_TXN no longer valid w/o an exception") - - -class BTreeTransactionTestCase(BasicTransactionTestCase): - dbtype = db.DB_BTREE - -class HashTransactionTestCase(BasicTransactionTestCase): - dbtype = db.DB_HASH - - - -#---------------------------------------------------------------------- - -class BTreeRecnoTestCase(BasicTestCase): - dbtype = db.DB_BTREE - dbsetflags = db.DB_RECNUM - - def test07_RecnoInBTree(self): - d = self.d - if verbose: - print('\n', '-=' * 30) - print("Running %s.test07_RecnoInBTree..." % self.__class__.__name__) - - rec = d.get(200) - self.assertEqual(type(rec), type(())) - self.assertEqual(len(rec), 2) - if verbose: - print("Record #200 is ", rec) - - c = d.cursor() - c.set('0200') - num = c.get_recno() - self.assertEqual(type(num), type(1)) - if verbose: - print("recno of d['0200'] is ", num) - - rec = c.current() - self.assertEqual(c.set_recno(num), rec) - - c.close() - - - -class BTreeRecnoWithThreadFlagTestCase(BTreeRecnoTestCase): - dbopenflags = db.DB_THREAD - -#---------------------------------------------------------------------- - -class BasicDUPTestCase(BasicTestCase): - dbsetflags = db.DB_DUP - - def test08_DuplicateKeys(self): - d = self.d - if verbose: - print('\n', '-=' * 30) - print("Running %s.test08_DuplicateKeys..." % \ - self.__class__.__name__) - - d.put("dup0", "before") - for x in "The quick brown fox jumped over the lazy dog.".split(): - d.put("dup1", x) - d.put("dup2", "after") - - data = d.get("dup1") - self.assertEqual(data, "The") - if verbose: - print(data) - - c = d.cursor() - rec = c.set("dup1") - self.assertEqual(rec, ('dup1', 'The')) - - next_reg = next(c) - self.assertEqual(next_reg, ('dup1', 'quick')) - - rec = c.set("dup1") - count = c.count() - self.assertEqual(count, 9) - - next_dup = c.next_dup() - self.assertEqual(next_dup, ('dup1', 'quick')) - - rec = c.set('dup1') - while rec is not None: - if verbose: - print(rec) - rec = c.next_dup() - - c.set('dup1') - rec = c.next_nodup() - self.assertNotEqual(rec[0], 'dup1') - if verbose: - print(rec) - - c.close() - - - -class BTreeDUPTestCase(BasicDUPTestCase): - dbtype = db.DB_BTREE - -class HashDUPTestCase(BasicDUPTestCase): - dbtype = db.DB_HASH - -class BTreeDUPWithThreadTestCase(BasicDUPTestCase): - dbtype = db.DB_BTREE - dbopenflags = db.DB_THREAD - -class HashDUPWithThreadTestCase(BasicDUPTestCase): - dbtype = db.DB_HASH - dbopenflags = db.DB_THREAD - - -#---------------------------------------------------------------------- - -class BasicMultiDBTestCase(BasicTestCase): - dbname = 'first' - - def otherType(self): - if self.dbtype == db.DB_BTREE: - return db.DB_HASH - else: - return db.DB_BTREE - - def test09_MultiDB(self): - d1 = self.d - if verbose: - print('\n', '-=' * 30) - print("Running %s.test09_MultiDB..." % self.__class__.__name__) - - d2 = db.DB(self.env) - d2.open(self.filename, "second", self.dbtype, - self.dbopenflags|db.DB_CREATE) - d3 = db.DB(self.env) - d3.open(self.filename, "third", self.otherType(), - self.dbopenflags|db.DB_CREATE) - - for x in "The quick brown fox jumped over the lazy dog".split(): - d2.put(x, self.makeData(x)) - - for x in string.letters: - d3.put(x, x*70) - - d1.sync() - d2.sync() - d3.sync() - d1.close() - d2.close() - d3.close() - - self.d = d1 = d2 = d3 = None - - self.d = d1 = db.DB(self.env) - d1.open(self.filename, self.dbname, flags = self.dbopenflags) - d2 = db.DB(self.env) - d2.open(self.filename, "second", flags = self.dbopenflags) - d3 = db.DB(self.env) - d3.open(self.filename, "third", flags = self.dbopenflags) - - c1 = d1.cursor() - c2 = d2.cursor() - c3 = d3.cursor() - - count = 0 - rec = c1.first() - while rec is not None: - count = count + 1 - if verbose and (count % 50) == 0: - print(rec) - rec = next(c1) - self.assertEqual(count, self._numKeys) - - count = 0 - rec = c2.first() - while rec is not None: - count = count + 1 - if verbose: - print(rec) - rec = next(c2) - self.assertEqual(count, 9) - - count = 0 - rec = c3.first() - while rec is not None: - count = count + 1 - if verbose: - print(rec) - rec = next(c3) - self.assertEqual(count, len(string.letters)) - - - c1.close() - c2.close() - c3.close() - - d2.close() - d3.close() - - - -# Strange things happen if you try to use Multiple DBs per file without a -# DBEnv with MPOOL and LOCKing... - -class BTreeMultiDBTestCase(BasicMultiDBTestCase): - dbtype = db.DB_BTREE - dbopenflags = db.DB_THREAD - useEnv = 1 - envflags = db.DB_THREAD | db.DB_INIT_MPOOL | db.DB_INIT_LOCK - -class HashMultiDBTestCase(BasicMultiDBTestCase): - dbtype = db.DB_HASH - dbopenflags = db.DB_THREAD - useEnv = 1 - envflags = db.DB_THREAD | db.DB_INIT_MPOOL | db.DB_INIT_LOCK - - -class PrivateObject(unittest.TestCase) : - import sys - if sys.version_info[:3] < (2, 4, 0): - def assertTrue(self, expr, msg=None): - self.failUnless(expr,msg=msg) - - def tearDown(self) : - del self.obj - - def test01_DefaultIsNone(self) : - self.assertEqual(self.obj.get_private(), None) - - def test02_assignment(self) : - a = "example of private object" - self.obj.set_private(a) - b = self.obj.get_private() - self.assertTrue(a is b) # Object identity - - def test03_leak_assignment(self) : - import sys - a = "example of private object" - refcount = sys.getrefcount(a) - self.obj.set_private(a) - self.assertEqual(refcount+1, sys.getrefcount(a)) - self.obj.set_private(None) - self.assertEqual(refcount, sys.getrefcount(a)) - - def test04_leak_GC(self) : - import sys - a = "example of private object" - refcount = sys.getrefcount(a) - self.obj.set_private(a) - self.obj = None - self.assertEqual(refcount, sys.getrefcount(a)) - -class DBEnvPrivateObject(PrivateObject) : - def setUp(self) : - self.obj = db.DBEnv() - -class DBPrivateObject(PrivateObject) : - def setUp(self) : - self.obj = db.DB() - -class CrashAndBurn(unittest.TestCase) : - def test01_OpenCrash(self) : - # See http://bugs.python.org/issue3307 - self.assertRaises(db.DBInvalidArgError, db.DB, None, 65535) - - -#---------------------------------------------------------------------- -#---------------------------------------------------------------------- - -def test_suite(): - suite = unittest.TestSuite() - - suite.addTest(unittest.makeSuite(VersionTestCase)) - suite.addTest(unittest.makeSuite(BasicBTreeTestCase)) - suite.addTest(unittest.makeSuite(BasicHashTestCase)) - suite.addTest(unittest.makeSuite(BasicBTreeWithThreadFlagTestCase)) - suite.addTest(unittest.makeSuite(BasicHashWithThreadFlagTestCase)) - suite.addTest(unittest.makeSuite(BasicBTreeWithEnvTestCase)) - suite.addTest(unittest.makeSuite(BasicHashWithEnvTestCase)) - suite.addTest(unittest.makeSuite(BTreeTransactionTestCase)) - suite.addTest(unittest.makeSuite(HashTransactionTestCase)) - suite.addTest(unittest.makeSuite(BTreeRecnoTestCase)) - suite.addTest(unittest.makeSuite(BTreeRecnoWithThreadFlagTestCase)) - suite.addTest(unittest.makeSuite(BTreeDUPTestCase)) - suite.addTest(unittest.makeSuite(HashDUPTestCase)) - suite.addTest(unittest.makeSuite(BTreeDUPWithThreadTestCase)) - suite.addTest(unittest.makeSuite(HashDUPWithThreadTestCase)) - suite.addTest(unittest.makeSuite(BTreeMultiDBTestCase)) - suite.addTest(unittest.makeSuite(HashMultiDBTestCase)) - suite.addTest(unittest.makeSuite(DBEnvPrivateObject)) - suite.addTest(unittest.makeSuite(DBPrivateObject)) - #suite.addTest(unittest.makeSuite(CrashAndBurn)) - - return suite - - -if __name__ == '__main__': - unittest.main(defaultTest='test_suite') |