diff options
| author | Jesus Cea <jcea@jcea.es> | 2008-05-13 20:57:59 (GMT) |
|---|---|---|
| committer | Jesus Cea <jcea@jcea.es> | 2008-05-13 20:57:59 (GMT) |
| commit | 18eb1fa2dd0e4f6b80e1cc31fc712eebfc610154 (patch) | |
| tree | 8208c448c32c48d0a8179587745cdad3095f08fe /Lib/bsddb/test/test_basics.py | |
| parent | cb33aeaf02244f84d37f8f3e3f213b09b1828357 (diff) | |
| download | cpython-18eb1fa2dd0e4f6b80e1cc31fc712eebfc610154.zip cpython-18eb1fa2dd0e4f6b80e1cc31fc712eebfc610154.tar.gz cpython-18eb1fa2dd0e4f6b80e1cc31fc712eebfc610154.tar.bz2 | |
Testsuite for bsddb module, version 4.6.4
Diffstat (limited to 'Lib/bsddb/test/test_basics.py')
| -rw-r--r-- | Lib/bsddb/test/test_basics.py | 234 |
1 files changed, 116 insertions, 118 deletions
diff --git a/Lib/bsddb/test/test_basics.py b/Lib/bsddb/test/test_basics.py index 9b38d57..a234d2a 100644 --- a/Lib/bsddb/test/test_basics.py +++ b/Lib/bsddb/test/test_basics.py @@ -6,7 +6,6 @@ various DB flags, etc. import os import errno import string -import tempfile from pprint import pprint import unittest import time @@ -23,7 +22,7 @@ try: except ImportError: from test import test_support -from test_all import verbose +from test_all import verbose, get_new_environment_path, get_new_database_path DASH = '-' @@ -38,8 +37,8 @@ class VersionTestCase(unittest.TestCase): print 'bsddb.db.version(): %s' % (info, ) print db.DB_VERSION_STRING print '-=' * 20 - assert info == (db.DB_VERSION_MAJOR, db.DB_VERSION_MINOR, - db.DB_VERSION_PATCH) + self.assertEqual(info, (db.DB_VERSION_MAJOR, db.DB_VERSION_MINOR, + db.DB_VERSION_PATCH)) #---------------------------------------------------------------------- @@ -57,27 +56,22 @@ class BasicTestCase(unittest.TestCase): def setUp(self): if self.useEnv: - homeDir = os.path.join(tempfile.gettempdir(), 'db_home%d'%os.getpid()) - self.homeDir = homeDir - test_support.rmtree(homeDir) - os.mkdir(homeDir) + self.homeDir=get_new_environment_path() try: self.env = db.DBEnv() self.env.set_lg_max(1024*1024) self.env.set_tx_max(30) self.env.set_tx_timestamp(int(time.time())) self.env.set_flags(self.envsetflags, 1) - self.env.open(homeDir, self.envflags | db.DB_CREATE) - tempfile.tempdir = homeDir - self.filename = os.path.split(tempfile.mktemp())[1] - tempfile.tempdir = None + self.env.open(self.homeDir, self.envflags | db.DB_CREATE) + self.filename = "test" # Yes, a bare except is intended, since we're re-raising the exc. except: - test_support.rmtree(homeDir) + test_support.rmtree(self.homeDir) raise else: self.env = None - self.filename = tempfile.mktemp() + self.filename = get_new_database_path() # create and open the DB self.d = db.DB(self.env) @@ -99,13 +93,6 @@ class BasicTestCase(unittest.TestCase): if self.env is not None: self.env.close() test_support.rmtree(self.homeDir) - ## XXX(nnorwitz): is this comment stil valid? - ## Make a new DBEnv to remove the env files from the home dir. - ## (It can't be done while the env is open, nor after it has been - ## closed, so we make a new one to do it.) - #e = db.DBEnv() - #e.remove(self.homeDir) - #os.remove(os.path.join(self.homeDir, self.filename)) else: os.remove(self.filename) @@ -153,44 +140,44 @@ class BasicTestCase(unittest.TestCase): if verbose: print data - assert d.get('0321') == '0321-0321-0321-0321-0321' + self.assertEqual(d.get('0321'), '0321-0321-0321-0321-0321') # By default non-existant keys return None... - assert d.get('abcd') == None + self.assertEqual(d.get('abcd'), None) # ...but they raise exceptions in other situations. Call # set_get_returns_none() to change it. try: d.delete('abcd') except db.DBNotFoundError, val: - assert val[0] == db.DB_NOTFOUND + self.assertEqual(val[0], db.DB_NOTFOUND) if verbose: print val else: self.fail("expected exception") d.put('abcd', 'a new record') - assert d.get('abcd') == 'a new record' + self.assertEqual(d.get('abcd'), 'a new record') d.put('abcd', 'same key') if self.dbsetflags & db.DB_DUP: - assert d.get('abcd') == 'a new record' + self.assertEqual(d.get('abcd'), 'a new record') else: - assert d.get('abcd') == 'same key' + self.assertEqual(d.get('abcd'), 'same key') try: d.put('abcd', 'this should fail', flags=db.DB_NOOVERWRITE) except db.DBKeyExistError, val: - assert val[0] == db.DB_KEYEXIST + self.assertEqual(val[0], db.DB_KEYEXIST) if verbose: print val else: self.fail("expected exception") if self.dbsetflags & db.DB_DUP: - assert d.get('abcd') == 'a new record' + self.assertEqual(d.get('abcd'), 'a new record') else: - assert d.get('abcd') == 'same key' + self.assertEqual(d.get('abcd'), 'same key') d.sync() @@ -204,28 +191,28 @@ class BasicTestCase(unittest.TestCase): self.d.open(self.filename) d = self.d - assert d.get('0321') == '0321-0321-0321-0321-0321' + self.assertEqual(d.get('0321'), '0321-0321-0321-0321-0321') if self.dbsetflags & db.DB_DUP: - assert d.get('abcd') == 'a new record' + self.assertEqual(d.get('abcd'), 'a new record') else: - assert d.get('abcd') == 'same key' + self.assertEqual(d.get('abcd'), 'same key') rec = d.get_both('0555', '0555-0555-0555-0555-0555') if verbose: print rec - assert d.get_both('0555', 'bad data') == None + self.assertEqual(d.get_both('0555', 'bad data'), None) # test default value data = d.get('bad key', 'bad data') - assert data == 'bad data' + self.assertEqual(data, 'bad data') # any object can pass through data = d.get('bad key', self) - assert data == self + self.assertEqual(data, self) s = d.stat() - assert type(s) == type({}) + self.assertEqual(type(s), type({})) if verbose: print 'd.stat() returned this dictionary:' pprint(s) @@ -243,47 +230,47 @@ class BasicTestCase(unittest.TestCase): for key in ['0002', '0101', '0401', '0701', '0998']: data = d[key] - assert data == self.makeData(key) + self.assertEqual(data, self.makeData(key)) if verbose: print data - assert len(d) == self._numKeys + self.assertEqual(len(d), self._numKeys) keys = d.keys() - assert len(keys) == self._numKeys - assert type(keys) == type([]) + self.assertEqual(len(keys), self._numKeys) + self.assertEqual(type(keys), type([])) d['new record'] = 'a new record' - assert len(d) == self._numKeys+1 + self.assertEqual(len(d), self._numKeys+1) keys = d.keys() - assert len(keys) == self._numKeys+1 + self.assertEqual(len(keys), self._numKeys+1) d['new record'] = 'a replacement record' - assert len(d) == self._numKeys+1 + self.assertEqual(len(d), self._numKeys+1) keys = d.keys() - assert len(keys) == self._numKeys+1 + self.assertEqual(len(keys), self._numKeys+1) if verbose: print "the first 10 keys are:" pprint(keys[:10]) - assert d['new record'] == 'a replacement record' + self.assertEqual(d['new record'], 'a replacement record') - assert d.has_key('0001') == 1 - assert d.has_key('spam') == 0 + self.assertEqual(d.has_key('0001'), 1) + self.assertEqual(d.has_key('spam'), 0) items = d.items() - assert len(items) == self._numKeys+1 - assert type(items) == type([]) - assert type(items[0]) == type(()) - assert len(items[0]) == 2 + self.assertEqual(len(items), self._numKeys+1) + self.assertEqual(type(items), type([])) + self.assertEqual(type(items[0]), type(())) + self.assertEqual(len(items[0]), 2) if verbose: print "the first 10 items are:" pprint(items[:10]) values = d.values() - assert len(values) == self._numKeys+1 - assert type(values) == type([]) + self.assertEqual(len(values), self._numKeys+1) + self.assertEqual(type(values), type([])) if verbose: print "the first 10 values are:" @@ -315,14 +302,15 @@ class BasicTestCase(unittest.TestCase): rec = c.next() except db.DBNotFoundError, val: if get_raises_error: - assert val[0] == db.DB_NOTFOUND + self.assertEqual(val[0], db.DB_NOTFOUND) if verbose: print val rec = None else: self.fail("unexpected DBNotFoundError") - assert c.get_current_size() == len(c.current()[1]), "%s != len(%r)" % (c.get_current_size(), c.current()[1]) + self.assertEqual(c.get_current_size(), len(c.current()[1]), + "%s != len(%r)" % (c.get_current_size(), c.current()[1])) - assert count == self._numKeys + self.assertEqual(count, self._numKeys) rec = c.last() @@ -335,49 +323,49 @@ class BasicTestCase(unittest.TestCase): rec = c.prev() except db.DBNotFoundError, val: if get_raises_error: - assert val[0] == db.DB_NOTFOUND + self.assertEqual(val[0], db.DB_NOTFOUND) if verbose: print val rec = None else: self.fail("unexpected DBNotFoundError") - assert count == self._numKeys + self.assertEqual(count, self._numKeys) rec = c.set('0505') rec2 = c.current() - assert rec == rec2 - assert rec[0] == '0505' - assert rec[1] == self.makeData('0505') - assert c.get_current_size() == len(rec[1]) + self.assertEqual(rec, rec2) + self.assertEqual(rec[0], '0505') + self.assertEqual(rec[1], self.makeData('0505')) + self.assertEqual(c.get_current_size(), len(rec[1])) # make sure we get empty values properly rec = c.set('empty value') - assert rec[1] == '' - assert c.get_current_size() == 0 + self.assertEqual(rec[1], '') + self.assertEqual(c.get_current_size(), 0) try: n = c.set('bad key') except db.DBNotFoundError, val: - assert val[0] == db.DB_NOTFOUND + self.assertEqual(val[0], db.DB_NOTFOUND) if verbose: print val else: if set_raises_error: self.fail("expected exception") - if n is not None: + if n != None: self.fail("expected None: %r" % (n,)) rec = c.get_both('0404', self.makeData('0404')) - assert rec == ('0404', self.makeData('0404')) + self.assertEqual(rec, ('0404', self.makeData('0404'))) try: n = c.get_both('0404', 'bad data') except db.DBNotFoundError, val: - assert val[0] == db.DB_NOTFOUND + self.assertEqual(val[0], db.DB_NOTFOUND) if verbose: print val else: if get_raises_error: self.fail("expected exception") - if n is not None: + if n != None: self.fail("expected None: %r" % (n,)) if self.d.get_type() == db.DB_BTREE: @@ -401,7 +389,7 @@ class BasicTestCase(unittest.TestCase): rec = c.current() except db.DBKeyEmptyError, val: if get_raises_error: - assert val[0] == db.DB_KEYEMPTY + self.assertEqual(val[0], db.DB_KEYEMPTY) if verbose: print val else: self.fail("unexpected DBKeyEmptyError") @@ -411,14 +399,14 @@ class BasicTestCase(unittest.TestCase): c.next() c2 = c.dup(db.DB_POSITION) - assert c.current() == c2.current() + self.assertEqual(c.current(), c2.current()) c2.put('', 'a new value', db.DB_CURRENT) - assert c.current() == c2.current() - assert c.current()[1] == 'a new value' + self.assertEqual(c.current(), c2.current()) + self.assertEqual(c.current()[1], 'a new value') c2.put('', 'er', db.DB_CURRENT, dlen=0, doff=5) - assert c2.current()[1] == 'a newer value' + self.assertEqual(c2.current()[1], 'a newer value') c.close() c2.close() @@ -446,7 +434,7 @@ class BasicTestCase(unittest.TestCase): # a bug may cause a NULL pointer dereference... apply(getattr(c, method), args) except db.DBError, val: - assert val[0] == 0 + self.assertEqual(val[0], 0) if verbose: print val else: self.fail("no exception raised when using a buggy cursor's" @@ -471,7 +459,7 @@ class BasicTestCase(unittest.TestCase): self.__class__.__name__ old = self.d.set_get_returns_none(0) - assert old == 2 + self.assertEqual(old, 2) self.test03_SimpleCursorStuff(get_raises_error=1, set_raises_error=1) def test03b_SimpleCursorWithGetReturnsNone1(self): @@ -493,9 +481,9 @@ class BasicTestCase(unittest.TestCase): self.__class__.__name__ old = self.d.set_get_returns_none(1) - assert old == 2 + self.assertEqual(old, 2) old = self.d.set_get_returns_none(2) - assert old == 1 + self.assertEqual(old, 1) self.test03_SimpleCursorStuff(get_raises_error=0, set_raises_error=0) #---------------------------------------- @@ -510,23 +498,24 @@ class BasicTestCase(unittest.TestCase): key = "partialTest" data = "1" * 1000 + "2" * 1000 d.put(key, data) - assert d.get(key) == data - assert d.get(key, dlen=20, doff=990) == ("1" * 10) + ("2" * 10) + self.assertEqual(d.get(key), data) + self.assertEqual(d.get(key, dlen=20, doff=990), + ("1" * 10) + ("2" * 10)) d.put("partialtest2", ("1" * 30000) + "robin" ) - assert d.get("partialtest2", dlen=5, doff=30000) == "robin" + self.assertEqual(d.get("partialtest2", dlen=5, doff=30000), "robin") # There seems to be a bug in DB here... Commented out the test for # now. - ##assert d.get("partialtest2", dlen=5, doff=30010) == "" + ##self.assertEqual(d.get("partialtest2", dlen=5, doff=30010), "") if self.dbsetflags != db.DB_DUP: # Partial put with duplicate records requires a cursor d.put(key, "0000", dlen=2000, doff=0) - assert d.get(key) == "0000" + self.assertEqual(d.get(key), "0000") d.put(key, "1111", dlen=1, doff=2) - assert d.get(key) == "0011110" + self.assertEqual(d.get(key), "0011110") #---------------------------------------- @@ -541,14 +530,14 @@ class BasicTestCase(unittest.TestCase): #print "before ", i, d.put(key, "1" * i) #print "after", - assert d.get_size(key) == i + self.assertEqual(d.get_size(key), i) #print "done" #---------------------------------------- def test06_Truncate(self): if db.version() < (3,3): - # truncate is a feature of BerkeleyDB 3.3 and above + # truncate is a feature of Berkeley DB 3.3 and above return d = self.d @@ -558,9 +547,10 @@ class BasicTestCase(unittest.TestCase): d.put("abcde", "ABCDE"); num = d.truncate() - assert num >= 1, "truncate returned <= 0 on non-empty database" + self.assert_(num >= 1, "truncate returned <= 0 on non-empty database") num = d.truncate() - assert num == 0, "truncate on empty DB returned nonzero (%r)" % (num,) + self.assertEqual(num, 0, + "truncate on empty DB returned nonzero (%r)" % (num,)) #---------------------------------------- @@ -625,6 +615,11 @@ class BasicHashWithEnvTestCase(BasicWithEnvTestCase): #---------------------------------------------------------------------- class BasicTransactionTestCase(BasicTestCase): + import sys + if sys.version_info[:3] < (2, 4, 0): + def assertTrue(self, expr, msg=None): + self.failUnless(expr,msg=msg) + dbopenflags = db.DB_THREAD | db.DB_AUTO_COMMIT useEnv = 1 envflags = (db.DB_THREAD | db.DB_INIT_MPOOL | db.DB_INIT_LOCK | @@ -650,19 +645,21 @@ class BasicTransactionTestCase(BasicTestCase): print '\n', '-=' * 30 print "Running %s.test06_Transactions..." % self.__class__.__name__ - assert d.get('new rec', txn=self.txn) == None + self.assertEqual(d.get('new rec', txn=self.txn), None) d.put('new rec', 'this is a new record', self.txn) - assert d.get('new rec', txn=self.txn) == 'this is a new record' + self.assertEqual(d.get('new rec', txn=self.txn), + 'this is a new record') self.txn.abort() - assert d.get('new rec') == None + self.assertEqual(d.get('new rec'), None) self.txn = self.env.txn_begin() - assert d.get('new rec', txn=self.txn) == None + self.assertEqual(d.get('new rec', txn=self.txn), None) d.put('new rec', 'this is a new record', self.txn) - assert d.get('new rec', txn=self.txn) == 'this is a new record' + self.assertEqual(d.get('new rec', txn=self.txn), + 'this is a new record') self.txn.commit() - assert d.get('new rec') == 'this is a new record' + self.assertEqual(d.get('new rec'), 'this is a new record') self.txn = self.env.txn_begin() c = d.cursor(self.txn) @@ -673,7 +670,7 @@ class BasicTransactionTestCase(BasicTestCase): if verbose and count % 100 == 0: print rec rec = c.next() - assert count == self._numKeys+1 + self.assertEqual(count, self._numKeys+1) c.close() # Cursors *MUST* be closed before commit! self.txn.commit() @@ -686,20 +683,20 @@ class BasicTransactionTestCase(BasicTestCase): if db.version() >= (4,0): statDict = self.env.log_stat(0); - assert statDict.has_key('magic') - assert statDict.has_key('version') - assert statDict.has_key('cur_file') - assert statDict.has_key('region_nowait') + self.assert_(statDict.has_key('magic')) + self.assert_(statDict.has_key('version')) + self.assert_(statDict.has_key('cur_file')) + self.assert_(statDict.has_key('region_nowait')) # must have at least one log file present: logs = self.env.log_archive(db.DB_ARCH_ABS | db.DB_ARCH_LOG) - assert logs != None + self.assertNotEqual(logs, None) for log in logs: if verbose: print 'log file: ' + log if db.version() >= (4,2): logs = self.env.log_archive(db.DB_ARCH_REMOVE) - assert not logs + self.assertTrue(not logs) self.txn = self.env.txn_begin() @@ -707,7 +704,7 @@ class BasicTransactionTestCase(BasicTestCase): def test07_TxnTruncate(self): if db.version() < (3,3): - # truncate is a feature of BerkeleyDB 3.3 and above + # truncate is a feature of Berkeley DB 3.3 and above return d = self.d @@ -718,9 +715,10 @@ class BasicTransactionTestCase(BasicTestCase): d.put("abcde", "ABCDE"); txn = self.env.txn_begin() num = d.truncate(txn) - assert num >= 1, "truncate returned <= 0 on non-empty database" + self.assert_(num >= 1, "truncate returned <= 0 on non-empty database") num = d.truncate(txn) - assert num == 0, "truncate on empty DB returned nonzero (%r)" % (num,) + self.assertEqual(num, 0, + "truncate on empty DB returned nonzero (%r)" % (num,)) txn.commit() #---------------------------------------- @@ -766,20 +764,20 @@ class BTreeRecnoTestCase(BasicTestCase): print "Running %s.test07_RecnoInBTree..." % self.__class__.__name__ rec = d.get(200) - assert type(rec) == type(()) - assert len(rec) == 2 + self.assertEqual(type(rec), type(())) + self.assertEqual(len(rec), 2) if verbose: print "Record #200 is ", rec c = d.cursor() c.set('0200') num = c.get_recno() - assert type(num) == type(1) + self.assertEqual(type(num), type(1)) if verbose: print "recno of d['0200'] is ", num rec = c.current() - assert c.set_recno(num) == rec + self.assertEqual(c.set_recno(num), rec) c.close() @@ -806,23 +804,23 @@ class BasicDUPTestCase(BasicTestCase): d.put("dup2", "after") data = d.get("dup1") - assert data == "The" + self.assertEqual(data, "The") if verbose: print data c = d.cursor() rec = c.set("dup1") - assert rec == ('dup1', 'The') + self.assertEqual(rec, ('dup1', 'The')) next = c.next() - assert next == ('dup1', 'quick') + self.assertEqual(next, ('dup1', 'quick')) rec = c.set("dup1") count = c.count() - assert count == 9 + self.assertEqual(count, 9) next_dup = c.next_dup() - assert next_dup == ('dup1', 'quick') + self.assertEqual(next_dup, ('dup1', 'quick')) rec = c.set('dup1') while rec is not None: @@ -832,7 +830,7 @@ class BasicDUPTestCase(BasicTestCase): c.set('dup1') rec = c.next_nodup() - assert rec[0] != 'dup1' + self.assertNotEqual(rec[0], 'dup1') if verbose: print rec @@ -912,7 +910,7 @@ class BasicMultiDBTestCase(BasicTestCase): if verbose and (count % 50) == 0: print rec rec = c1.next() - assert count == self._numKeys + self.assertEqual(count, self._numKeys) count = 0 rec = c2.first() @@ -921,7 +919,7 @@ class BasicMultiDBTestCase(BasicTestCase): if verbose: print rec rec = c2.next() - assert count == 9 + self.assertEqual(count, 9) count = 0 rec = c3.first() @@ -930,7 +928,7 @@ class BasicMultiDBTestCase(BasicTestCase): if verbose: print rec rec = c3.next() - assert count == 52 + self.assertEqual(count, 52) c1.close() |
