diff options
Diffstat (limited to 'Lib/bsddb/test/test_basics.py')
-rw-r--r-- | Lib/bsddb/test/test_basics.py | 465 |
1 files changed, 261 insertions, 204 deletions
diff --git a/Lib/bsddb/test/test_basics.py b/Lib/bsddb/test/test_basics.py index 9c829bd..704bc41 100644 --- a/Lib/bsddb/test/test_basics.py +++ b/Lib/bsddb/test/test_basics.py @@ -4,29 +4,17 @@ various DB flags, etc. """ import os -import sys import errno import string -import tempfile from pprint import pprint import unittest import time -try: - # For Pythons w/distutils pybsddb - from bsddb3 import db -except ImportError: - # For Python 2.3 - from bsddb import db +from .test_all import db, test_support, verbose, get_new_environment_path, \ + get_new_database_path -from bsddb.test.test_all import verbose -try: - from bsddb3 import test_support -except ImportError: - from test import support as test_support +DASH = '-' -DASH = b'-' -letters = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ' #---------------------------------------------------------------------- @@ -38,8 +26,8 @@ class VersionTestCase(unittest.TestCase): print('bsddb.db.version(): %s' % (info, )) print(db.DB_VERSION_STRING) print('-=' * 20) - assert info == (db.DB_VERSION_MAJOR, db.DB_VERSION_MINOR, - db.DB_VERSION_PATCH) + self.assertEqual(info, (db.DB_VERSION_MAJOR, db.DB_VERSION_MINOR, + db.DB_VERSION_PATCH)) #---------------------------------------------------------------------- @@ -57,10 +45,7 @@ class BasicTestCase(unittest.TestCase): def setUp(self): if self.useEnv: - homeDir = os.path.join(tempfile.gettempdir(), 'db_home%d'%os.getpid()) - self.homeDir = homeDir - test_support.rmtree(homeDir) - os.mkdir(homeDir) + self.homeDir=get_new_environment_path() try: self.env = db.DBEnv() self.env.set_lg_max(1024*1024) @@ -68,17 +53,14 @@ class BasicTestCase(unittest.TestCase): self.env.set_tx_timestamp(int(time.time())) self.env.set_flags(self.envsetflags, 1) self.env.open(self.homeDir, self.envflags | db.DB_CREATE) - old_tempfile_tempdir = tempfile.tempdir - tempfile.tempdir = self.homeDir - self.filename = os.path.split(tempfile.mktemp())[1] - tempfile.tempdir = old_tempfile_tempdir + self.filename = "test" # Yes, a bare except is intended, since we're re-raising the exc. except: - test_support.rmtree(homeDir) + test_support.rmtree(self.homeDir) raise else: self.env = None - self.filename = tempfile.mktemp() + self.filename = get_new_database_path() # create and open the DB self.d = db.DB(self.env) @@ -100,13 +82,6 @@ class BasicTestCase(unittest.TestCase): if self.env is not None: self.env.close() test_support.rmtree(self.homeDir) - ## XXX(nnorwitz): is this comment stil valid? - ## Make a new DBEnv to remove the env files from the home dir. - ## (It can't be done while the env is open, nor after it has been - ## closed, so we make a new one to do it.) - #e = db.DBEnv() - #e.remove(self.homeDir) - #os.remove(os.path.join(self.homeDir, self.filename)) else: os.remove(self.filename) @@ -117,15 +92,13 @@ class BasicTestCase(unittest.TestCase): for x in range(self._numKeys//2): key = '%04d' % (self._numKeys - x) # insert keys in reverse order - key = key.encode("utf-8") data = self.makeData(key) d.put(key, data, _txn) - d.put(b'empty value', b'', _txn) + d.put('empty value', '', _txn) for x in range(self._numKeys//2-1): key = '%04d' % x # and now some in forward order - key = key.encode("utf-8") data = self.makeData(key) d.put(key, data, _txn) @@ -151,49 +124,57 @@ class BasicTestCase(unittest.TestCase): print('\n', '-=' * 30) print("Running %s.test01_GetsAndPuts..." % self.__class__.__name__) - for key in [b'0001', b'0100', b'0400', b'0700', b'0999']: + for key in ['0001', '0100', '0400', '0700', '0999']: data = d.get(key) if verbose: print(data) - assert d.get(b'0321') == b'0321-0321-0321-0321-0321' + self.assertEqual(d.get('0321'), '0321-0321-0321-0321-0321') # By default non-existant keys return None... - assert d.get(b'abcd') == None + self.assertEqual(d.get('abcd'), None) # ...but they raise exceptions in other situations. Call # set_get_returns_none() to change it. try: - d.delete(b'abcd') + d.delete('abcd') except db.DBNotFoundError as val: - assert val.args[0] == db.DB_NOTFOUND + import sys + if sys.version_info[0] < 3 : + self.assertEqual(val[0], db.DB_NOTFOUND) + else : + self.assertEqual(val.args[0], db.DB_NOTFOUND) if verbose: print(val) else: self.fail("expected exception") - d.put(b'abcd', b'a new record') - assert d.get(b'abcd') == b'a new record' + d.put('abcd', 'a new record') + self.assertEqual(d.get('abcd'), 'a new record') - d.put(b'abcd', b'same key') + d.put('abcd', 'same key') if self.dbsetflags & db.DB_DUP: - assert d.get(b'abcd') == b'a new record' + self.assertEqual(d.get('abcd'), 'a new record') else: - assert d.get(b'abcd') == b'same key' + self.assertEqual(d.get('abcd'), 'same key') try: - d.put(b'abcd', b'this should fail', flags=db.DB_NOOVERWRITE) + d.put('abcd', 'this should fail', flags=db.DB_NOOVERWRITE) except db.DBKeyExistError as val: - assert val.args[0] == db.DB_KEYEXIST + import sys + if sys.version_info[0] < 3 : + self.assertEqual(val[0], db.DB_KEYEXIST) + else : + self.assertEqual(val.args[0], db.DB_KEYEXIST) if verbose: print(val) else: self.fail("expected exception") if self.dbsetflags & db.DB_DUP: - assert d.get(b'abcd') == b'a new record' + self.assertEqual(d.get('abcd'), 'a new record') else: - assert d.get(b'abcd') == b'same key' + self.assertEqual(d.get('abcd'), 'same key') d.sync() @@ -207,28 +188,28 @@ class BasicTestCase(unittest.TestCase): self.d.open(self.filename) d = self.d - assert d.get(b'0321') == b'0321-0321-0321-0321-0321' + self.assertEqual(d.get('0321'), '0321-0321-0321-0321-0321') if self.dbsetflags & db.DB_DUP: - assert d.get(b'abcd') == b'a new record' + self.assertEqual(d.get('abcd'), 'a new record') else: - assert d.get(b'abcd') == b'same key' + self.assertEqual(d.get('abcd'), 'same key') - rec = d.get_both(b'0555', b'0555-0555-0555-0555-0555') + rec = d.get_both('0555', '0555-0555-0555-0555-0555') if verbose: print(rec) - assert d.get_both(b'0555', b'bad data') == None + self.assertEqual(d.get_both('0555', 'bad data'), None) # test default value - data = d.get(b'bad key', b'bad data') - assert data == b'bad data' + data = d.get('bad key', 'bad data') + self.assertEqual(data, 'bad data') # any object can pass through - data = d.get(b'bad key', self) - assert data == self + data = d.get('bad key', self) + self.assertEqual(data, self) s = d.stat() - assert type(s) == type({}) + self.assertEqual(type(s), type({})) if verbose: print('d.stat() returned this dictionary:') pprint(s) @@ -244,49 +225,51 @@ class BasicTestCase(unittest.TestCase): print("Running %s.test02_DictionaryMethods..." % \ self.__class__.__name__) - for key in [b'0002', b'0101', b'0401', b'0701', b'0998']: + for key in ['0002', '0101', '0401', '0701', '0998']: data = d[key] - assert data == self.makeData(key) + self.assertEqual(data, self.makeData(key)) if verbose: print(data) - assert len(d) == self._numKeys - keys = d.keys() - assert len(keys) == self._numKeys - assert type(keys) == type([]) + self.assertEqual(len(d), self._numKeys) + keys = list(d.keys()) + self.assertEqual(len(keys), self._numKeys) + self.assertEqual(type(keys), type([])) - d[b'new record'] = b'a new record' - assert len(d) == self._numKeys+1 - keys = d.keys() - assert len(keys) == self._numKeys+1 + d['new record'] = 'a new record' + self.assertEqual(len(d), self._numKeys+1) + keys = list(d.keys()) + self.assertEqual(len(keys), self._numKeys+1) - d[b'new record'] = b'a replacement record' - assert len(d) == self._numKeys+1 - keys = d.keys() - assert len(keys) == self._numKeys+1 + d['new record'] = 'a replacement record' + self.assertEqual(len(d), self._numKeys+1) + keys = list(d.keys()) + self.assertEqual(len(keys), self._numKeys+1) if verbose: print("the first 10 keys are:") pprint(keys[:10]) - assert d[b'new record'] == b'a replacement record' + self.assertEqual(d['new record'], 'a replacement record') - assert d.has_key(b'0001') == 1 - assert d.has_key(b'spam') == 0 +# We check also the positional parameter + self.assertEqual(d.has_key('0001', None), 1) +# We check also the keyword parameter + self.assertEqual(d.has_key('spam', txn=None), 0) - items = d.items() - assert len(items) == self._numKeys+1 - assert type(items) == type([]) - assert type(items[0]) == type(()) - assert len(items[0]) == 2 + items = list(d.items()) + self.assertEqual(len(items), self._numKeys+1) + self.assertEqual(type(items), type([])) + self.assertEqual(type(items[0]), type(())) + self.assertEqual(len(items[0]), 2) if verbose: print("the first 10 items are:") pprint(items[:10]) - values = d.values() - assert len(values) == self._numKeys+1 - assert type(values) == type([]) + values = list(d.values()) + self.assertEqual(len(values), self._numKeys+1) + self.assertEqual(type(values), type([])) if verbose: print("the first 10 values are:") @@ -315,17 +298,22 @@ class BasicTestCase(unittest.TestCase): if verbose and count % 100 == 0: print(rec) try: - rec = c.next() + rec = next(c) except db.DBNotFoundError as val: if get_raises_error: - assert val.args[0] == db.DB_NOTFOUND + import sys + if sys.version_info[0] < 3 : + self.assertEqual(val[0], db.DB_NOTFOUND) + else : + self.assertEqual(val.args[0], db.DB_NOTFOUND) if verbose: print(val) rec = None else: self.fail("unexpected DBNotFoundError") - assert c.get_current_size() == len(c.current()[1]), "%s != len(%r)" % (c.get_current_size(), c.current()[1]) + self.assertEqual(c.get_current_size(), len(c.current()[1]), + "%s != len(%r)" % (c.get_current_size(), c.current()[1])) - assert count == self._numKeys + self.assertEqual(count, self._numKeys) rec = c.last() @@ -338,73 +326,89 @@ class BasicTestCase(unittest.TestCase): rec = c.prev() except db.DBNotFoundError as val: if get_raises_error: - assert val.args[0] == db.DB_NOTFOUND + import sys + if sys.version_info[0] < 3 : + self.assertEqual(val[0], db.DB_NOTFOUND) + else : + self.assertEqual(val.args[0], db.DB_NOTFOUND) if verbose: print(val) rec = None else: self.fail("unexpected DBNotFoundError") - assert count == self._numKeys + self.assertEqual(count, self._numKeys) - rec = c.set(b'0505') + rec = c.set('0505') rec2 = c.current() - assert rec == rec2, (repr(rec),repr(rec2)) - assert rec[0] == b'0505' - assert rec[1] == self.makeData(b'0505') - assert c.get_current_size() == len(rec[1]) + self.assertEqual(rec, rec2) + self.assertEqual(rec[0], '0505') + self.assertEqual(rec[1], self.makeData('0505')) + self.assertEqual(c.get_current_size(), len(rec[1])) # make sure we get empty values properly - rec = c.set(b'empty value') - assert rec[1] == b'' - assert c.get_current_size() == 0 + rec = c.set('empty value') + self.assertEqual(rec[1], '') + self.assertEqual(c.get_current_size(), 0) try: - n = c.set(b'bad key') + n = c.set('bad key') except db.DBNotFoundError as val: - assert val.args[0] == db.DB_NOTFOUND + import sys + if sys.version_info[0] < 3 : + self.assertEqual(val[0], db.DB_NOTFOUND) + else : + self.assertEqual(val.args[0], db.DB_NOTFOUND) if verbose: print(val) else: if set_raises_error: self.fail("expected exception") - if n is not None: + if n != None: self.fail("expected None: %r" % (n,)) - rec = c.get_both(b'0404', self.makeData(b'0404')) - assert rec == (b'0404', self.makeData(b'0404')) + rec = c.get_both('0404', self.makeData('0404')) + self.assertEqual(rec, ('0404', self.makeData('0404'))) try: - n = c.get_both(b'0404', b'bad data') + n = c.get_both('0404', 'bad data') except db.DBNotFoundError as val: - assert val.args[0] == db.DB_NOTFOUND + import sys + if sys.version_info[0] < 3 : + self.assertEqual(val[0], db.DB_NOTFOUND) + else : + self.assertEqual(val.args[0], db.DB_NOTFOUND) if verbose: print(val) else: if get_raises_error: self.fail("expected exception") - if n is not None: + if n != None: self.fail("expected None: %r" % (n,)) if self.d.get_type() == db.DB_BTREE: - rec = c.set_range(b'011') + rec = c.set_range('011') if verbose: print("searched for '011', found: ", rec) - rec = c.set_range(b'011',dlen=0,doff=0) + rec = c.set_range('011',dlen=0,doff=0) if verbose: print("searched (partial) for '011', found: ", rec) - if rec[1] != b'': self.fail('expected empty data portion') + if rec[1] != '': self.fail('expected empty data portion') - ev = c.set_range(b'empty value') + ev = c.set_range('empty value') if verbose: print("search for 'empty value' returned", ev) - if ev[1] != b'': self.fail('empty value lookup failed') + if ev[1] != '': self.fail('empty value lookup failed') - c.set(b'0499') + c.set('0499') c.delete() try: rec = c.current() except db.DBKeyEmptyError as val: if get_raises_error: - assert val.args[0] == db.DB_KEYEMPTY + import sys + if sys.version_info[0] < 3 : + self.assertEqual(val[0], db.DB_KEYEMPTY) + else : + self.assertEqual(val.args[0], db.DB_KEYEMPTY) if verbose: print(val) else: self.fail("unexpected DBKeyEmptyError") @@ -412,16 +416,16 @@ class BasicTestCase(unittest.TestCase): if get_raises_error: self.fail('DBKeyEmptyError exception expected') - c.next() + next(c) c2 = c.dup(db.DB_POSITION) - assert c.current() == c2.current() + self.assertEqual(c.current(), c2.current()) - c2.put(b'', b'a new value', db.DB_CURRENT) - assert c.current() == c2.current() - assert c.current()[1] == b'a new value' + c2.put('', 'a new value', db.DB_CURRENT) + self.assertEqual(c.current(), c2.current()) + self.assertEqual(c.current()[1], 'a new value') - c2.put(b'', b'er', db.DB_CURRENT, dlen=0, doff=5) - assert c2.current()[1] == b'a newer value' + c2.put('', 'er', db.DB_CURRENT, dlen=0, doff=5) + self.assertEqual(c2.current()[1], 'a newer value') c.close() c2.close() @@ -441,7 +445,7 @@ class BasicTestCase(unittest.TestCase): 'put':('', 'spam', db.DB_CURRENT), 'set': ("0505",), } - for method, args in methods_to_test.items(): + for method, args in list(methods_to_test.items()): try: if verbose: print("attempting to use a closed cursor's %s method" % \ @@ -449,7 +453,11 @@ class BasicTestCase(unittest.TestCase): # a bug may cause a NULL pointer dereference... getattr(c, method)(*args) except db.DBError as val: - assert val.args[0] == 0 + import sys + if sys.version_info[0] < 3 : + self.assertEqual(val[0], 0) + else : + self.assertEqual(val.args[0], 0) if verbose: print(val) else: self.fail("no exception raised when using a buggy cursor's" @@ -474,7 +482,7 @@ class BasicTestCase(unittest.TestCase): self.__class__.__name__) old = self.d.set_get_returns_none(0) - assert old == 2 + self.assertEqual(old, 2) self.test03_SimpleCursorStuff(get_raises_error=1, set_raises_error=1) def test03b_SimpleCursorWithGetReturnsNone1(self): @@ -496,9 +504,9 @@ class BasicTestCase(unittest.TestCase): self.__class__.__name__) old = self.d.set_get_returns_none(1) - assert old == 2 + self.assertEqual(old, 2) old = self.d.set_get_returns_none(2) - assert old == 1 + self.assertEqual(old, 1) self.test03_SimpleCursorStuff(get_raises_error=0, set_raises_error=0) #---------------------------------------- @@ -510,26 +518,27 @@ class BasicTestCase(unittest.TestCase): print("Running %s.test04_PartialGetAndPut..." % \ self.__class__.__name__) - key = b"partialTest" - data = b"1" * 1000 + b"2" * 1000 + key = "partialTest" + data = "1" * 1000 + "2" * 1000 d.put(key, data) - assert d.get(key) == data - assert d.get(key, dlen=20, doff=990) == (b"1" * 10) + (b"2" * 10) + self.assertEqual(d.get(key), data) + self.assertEqual(d.get(key, dlen=20, doff=990), + ("1" * 10) + ("2" * 10)) - d.put(b"partialtest2", (b"1" * 30000) + b"robin" ) - assert d.get(b"partialtest2", dlen=5, doff=30000) == b"robin" + d.put("partialtest2", ("1" * 30000) + "robin" ) + self.assertEqual(d.get("partialtest2", dlen=5, doff=30000), "robin") # There seems to be a bug in DB here... Commented out the test for # now. - ##assert d.get("partialtest2", dlen=5, doff=30010) == "" + ##self.assertEqual(d.get("partialtest2", dlen=5, doff=30010), "") if self.dbsetflags != db.DB_DUP: # Partial put with duplicate records requires a cursor - d.put(key, b"0000", dlen=2000, doff=0) - assert d.get(key) == b"0000" + d.put(key, "0000", dlen=2000, doff=0) + self.assertEqual(d.get(key), "0000") - d.put(key, b"1111", dlen=1, doff=2) - assert d.get(key) == b"0011110" + d.put(key, "1111", dlen=1, doff=2) + self.assertEqual(d.get(key), "0011110") #---------------------------------------- @@ -540,30 +549,27 @@ class BasicTestCase(unittest.TestCase): print("Running %s.test05_GetSize..." % self.__class__.__name__) for i in range(1, 50000, 500): - key = ("size%s" % i).encode("utf-8") + key = "size%s" % i #print "before ", i, - d.put(key, b"1" * i) + d.put(key, "1" * i) #print "after", - assert d.get_size(key) == i + self.assertEqual(d.get_size(key), i) #print "done" #---------------------------------------- def test06_Truncate(self): - if db.version() < (3,3): - # truncate is a feature of BerkeleyDB 3.3 and above - return - d = self.d if verbose: print('\n', '-=' * 30) print("Running %s.test99_Truncate..." % self.__class__.__name__) - d.put(b"abcde", b"ABCDE"); + d.put("abcde", "ABCDE"); num = d.truncate() - assert num >= 1, "truncate returned <= 0 on non-empty database" + self.assert_(num >= 1, "truncate returned <= 0 on non-empty database") num = d.truncate() - assert num == 0, "truncate on empty DB returned nonzero (%r)" % (num,) + self.assertEqual(num, 0, + "truncate on empty DB returned nonzero (%r)" % (num,)) #---------------------------------------- @@ -628,6 +634,11 @@ class BasicHashWithEnvTestCase(BasicWithEnvTestCase): #---------------------------------------------------------------------- class BasicTransactionTestCase(BasicTestCase): + import sys + if sys.version_info[:3] < (2, 4, 0): + def assertTrue(self, expr, msg=None): + self.failUnless(expr,msg=msg) + dbopenflags = db.DB_THREAD | db.DB_AUTO_COMMIT useEnv = 1 envflags = (db.DB_THREAD | db.DB_INIT_MPOOL | db.DB_INIT_LOCK | @@ -653,19 +664,21 @@ class BasicTransactionTestCase(BasicTestCase): print('\n', '-=' * 30) print("Running %s.test06_Transactions..." % self.__class__.__name__) - assert d.get(b'new rec', txn=self.txn) == None - d.put(b'new rec', b'this is a new record', self.txn) - assert d.get(b'new rec', txn=self.txn) == b'this is a new record' + self.assertEqual(d.get('new rec', txn=self.txn), None) + d.put('new rec', 'this is a new record', self.txn) + self.assertEqual(d.get('new rec', txn=self.txn), + 'this is a new record') self.txn.abort() - assert d.get(b'new rec') == None + self.assertEqual(d.get('new rec'), None) self.txn = self.env.txn_begin() - assert d.get(b'new rec', txn=self.txn) == None - d.put(b'new rec', b'this is a new record', self.txn) - assert d.get(b'new rec', txn=self.txn) == b'this is a new record' + self.assertEqual(d.get('new rec', txn=self.txn), None) + d.put('new rec', 'this is a new record', self.txn) + self.assertEqual(d.get('new rec', txn=self.txn), + 'this is a new record') self.txn.commit() - assert d.get(b'new rec') == b'this is a new record' + self.assertEqual(d.get('new rec'), 'this is a new record') self.txn = self.env.txn_begin() c = d.cursor(self.txn) @@ -675,8 +688,8 @@ class BasicTransactionTestCase(BasicTestCase): count = count + 1 if verbose and count % 100 == 0: print(rec) - rec = c.next() - assert count == self._numKeys+1 + rec = next(c) + self.assertEqual(count, self._numKeys+1) c.close() # Cursors *MUST* be closed before commit! self.txn.commit() @@ -687,43 +700,39 @@ class BasicTransactionTestCase(BasicTestCase): except db.DBIncompleteError: pass - if db.version() >= (4,0): - statDict = self.env.log_stat(0); - assert 'magic' in statDict - assert 'version' in statDict - assert 'cur_file' in statDict - assert 'region_nowait' in statDict + statDict = self.env.log_stat(0); + self.assert_('magic' in statDict) + self.assert_('version' in statDict) + self.assert_('cur_file' in statDict) + self.assert_('region_nowait' in statDict) # must have at least one log file present: logs = self.env.log_archive(db.DB_ARCH_ABS | db.DB_ARCH_LOG) - assert logs != None + self.assertNotEqual(logs, None) for log in logs: if verbose: print('log file: ' + log) if db.version() >= (4,2): logs = self.env.log_archive(db.DB_ARCH_REMOVE) - assert not logs + self.assertTrue(not logs) self.txn = self.env.txn_begin() #---------------------------------------- def test07_TxnTruncate(self): - if db.version() < (3,3): - # truncate is a feature of BerkeleyDB 3.3 and above - return - d = self.d if verbose: print('\n', '-=' * 30) print("Running %s.test07_TxnTruncate..." % self.__class__.__name__) - d.put(b"abcde", b"ABCDE"); + d.put("abcde", "ABCDE"); txn = self.env.txn_begin() num = d.truncate(txn) - assert num >= 1, "truncate returned <= 0 on non-empty database" + self.assert_(num >= 1, "truncate returned <= 0 on non-empty database") num = d.truncate(txn) - assert num == 0, "truncate on empty DB returned nonzero (%r)" % (num,) + self.assertEqual(num, 0, + "truncate on empty DB returned nonzero (%r)" % (num,)) txn.commit() #---------------------------------------- @@ -769,20 +778,20 @@ class BTreeRecnoTestCase(BasicTestCase): print("Running %s.test07_RecnoInBTree..." % self.__class__.__name__) rec = d.get(200) - assert type(rec) == type(()) - assert len(rec) == 2 + self.assertEqual(type(rec), type(())) + self.assertEqual(len(rec), 2) if verbose: print("Record #200 is ", rec) c = d.cursor() - c.set(b'0200') + c.set('0200') num = c.get_recno() - assert type(num) == type(1) + self.assertEqual(type(num), type(1)) if verbose: print("recno of d['0200'] is ", num) rec = c.current() - assert c.set_recno(num) == rec + self.assertEqual(c.set_recno(num), rec) c.close() @@ -803,40 +812,39 @@ class BasicDUPTestCase(BasicTestCase): print("Running %s.test08_DuplicateKeys..." % \ self.__class__.__name__) - d.put(b"dup0", b"before") + d.put("dup0", "before") for x in "The quick brown fox jumped over the lazy dog.".split(): - x = x.encode("ascii") - d.put(b"dup1", x) - d.put(b"dup2", b"after") + d.put("dup1", x) + d.put("dup2", "after") - data = d.get(b"dup1") - assert data == b"The" + data = d.get("dup1") + self.assertEqual(data, "The") if verbose: print(data) c = d.cursor() - rec = c.set(b"dup1") - assert rec == (b'dup1', b'The') + rec = c.set("dup1") + self.assertEqual(rec, ('dup1', 'The')) - next = c.next() - assert next == (b'dup1', b'quick') + next_reg = next(c) + self.assertEqual(next_reg, ('dup1', 'quick')) - rec = c.set(b"dup1") + rec = c.set("dup1") count = c.count() - assert count == 9 + self.assertEqual(count, 9) next_dup = c.next_dup() - assert next_dup == (b'dup1', b'quick') + self.assertEqual(next_dup, ('dup1', 'quick')) - rec = c.set(b'dup1') + rec = c.set('dup1') while rec is not None: if verbose: print(rec) rec = c.next_dup() - c.set(b'dup1') + c.set('dup1') rec = c.next_nodup() - assert rec[0] != b'dup1' + self.assertNotEqual(rec[0], 'dup1') if verbose: print(rec) @@ -884,11 +892,9 @@ class BasicMultiDBTestCase(BasicTestCase): self.dbopenflags|db.DB_CREATE) for x in "The quick brown fox jumped over the lazy dog".split(): - x = x.encode("ascii") d2.put(x, self.makeData(x)) - for x in letters: - x = x.encode("ascii") + for x in string.letters: d3.put(x, x*70) d1.sync() @@ -917,8 +923,8 @@ class BasicMultiDBTestCase(BasicTestCase): count = count + 1 if verbose and (count % 50) == 0: print(rec) - rec = c1.next() - assert count == self._numKeys + rec = next(c1) + self.assertEqual(count, self._numKeys) count = 0 rec = c2.first() @@ -926,8 +932,8 @@ class BasicMultiDBTestCase(BasicTestCase): count = count + 1 if verbose: print(rec) - rec = c2.next() - assert count == 9 + rec = next(c2) + self.assertEqual(count, 9) count = 0 rec = c3.first() @@ -935,15 +941,14 @@ class BasicMultiDBTestCase(BasicTestCase): count = count + 1 if verbose: print(rec) - rec = c3.next() - assert count == 52 + rec = next(c3) + self.assertEqual(count, len(string.letters)) c1.close() c2.close() c3.close() - d1.close() d2.close() d3.close() @@ -965,6 +970,55 @@ class HashMultiDBTestCase(BasicMultiDBTestCase): envflags = db.DB_THREAD | db.DB_INIT_MPOOL | db.DB_INIT_LOCK +class PrivateObject(unittest.TestCase) : + import sys + if sys.version_info[:3] < (2, 4, 0): + def assertTrue(self, expr, msg=None): + self.failUnless(expr,msg=msg) + + def tearDown(self) : + del self.obj + + def test01_DefaultIsNone(self) : + self.assertEqual(self.obj.get_private(), None) + + def test02_assignment(self) : + a = "example of private object" + self.obj.set_private(a) + b = self.obj.get_private() + self.assertTrue(a is b) # Object identity + + def test03_leak_assignment(self) : + import sys + a = "example of private object" + refcount = sys.getrefcount(a) + self.obj.set_private(a) + self.assertEqual(refcount+1, sys.getrefcount(a)) + self.obj.set_private(None) + self.assertEqual(refcount, sys.getrefcount(a)) + + def test04_leak_GC(self) : + import sys + a = "example of private object" + refcount = sys.getrefcount(a) + self.obj.set_private(a) + self.obj = None + self.assertEqual(refcount, sys.getrefcount(a)) + +class DBEnvPrivateObject(PrivateObject) : + def setUp(self) : + self.obj = db.DBEnv() + +class DBPrivateObject(PrivateObject) : + def setUp(self) : + self.obj = db.DB() + +class CrashAndBurn(unittest.TestCase) : + def test01_OpenCrash(self) : + # See http://bugs.python.org/issue3307 + self.assertRaises(db.DBInvalidArgError, db.DB, None, 65535) + + #---------------------------------------------------------------------- #---------------------------------------------------------------------- @@ -988,6 +1042,9 @@ def test_suite(): suite.addTest(unittest.makeSuite(HashDUPWithThreadTestCase)) suite.addTest(unittest.makeSuite(BTreeMultiDBTestCase)) suite.addTest(unittest.makeSuite(HashMultiDBTestCase)) + suite.addTest(unittest.makeSuite(DBEnvPrivateObject)) + suite.addTest(unittest.makeSuite(DBPrivateObject)) + #suite.addTest(unittest.makeSuite(CrashAndBurn)) return suite |