diff options
author | Jesus Cea <jcea@jcea.es> | 2008-08-31 14:12:11 (GMT) |
---|---|---|
committer | Jesus Cea <jcea@jcea.es> | 2008-08-31 14:12:11 (GMT) |
commit | 6ba3329c274e2c7876c61f2e98d4592310d26bae (patch) | |
tree | 6bb346e892269279fa2011c3e4bd4648b273a7ae /Lib/bsddb/test/test_basics.py | |
parent | 73c96dbf34c70bbf1ef807b98d51cf9c0e9dc042 (diff) | |
download | cpython-6ba3329c274e2c7876c61f2e98d4592310d26bae.zip cpython-6ba3329c274e2c7876c61f2e98d4592310d26bae.tar.gz cpython-6ba3329c274e2c7876c61f2e98d4592310d26bae.tar.bz2 |
bsddb code updated to version 4.7.3pre2. This code is the same than
Python 2.6 one, since the intention is to keep an unified 2.x/3.x
codebase.
The Python code is automatically translated using "2to3". Please, do not
update this code in Python 3.0 by hand. Update the 2.6 one and then do
"2to3".
Diffstat (limited to 'Lib/bsddb/test/test_basics.py')
-rw-r--r-- | Lib/bsddb/test/test_basics.py | 465 |
1 files changed, 261 insertions, 204 deletions
diff --git a/Lib/bsddb/test/test_basics.py b/Lib/bsddb/test/test_basics.py index 9c829bd..704bc41 100644 --- a/Lib/bsddb/test/test_basics.py +++ b/Lib/bsddb/test/test_basics.py @@ -4,29 +4,17 @@ various DB flags, etc. """ import os -import sys import errno import string -import tempfile from pprint import pprint import unittest import time -try: - # For Pythons w/distutils pybsddb - from bsddb3 import db -except ImportError: - # For Python 2.3 - from bsddb import db +from .test_all import db, test_support, verbose, get_new_environment_path, \ + get_new_database_path -from bsddb.test.test_all import verbose -try: - from bsddb3 import test_support -except ImportError: - from test import support as test_support +DASH = '-' -DASH = b'-' -letters = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ' #---------------------------------------------------------------------- @@ -38,8 +26,8 @@ class VersionTestCase(unittest.TestCase): print('bsddb.db.version(): %s' % (info, )) print(db.DB_VERSION_STRING) print('-=' * 20) - assert info == (db.DB_VERSION_MAJOR, db.DB_VERSION_MINOR, - db.DB_VERSION_PATCH) + self.assertEqual(info, (db.DB_VERSION_MAJOR, db.DB_VERSION_MINOR, + db.DB_VERSION_PATCH)) #---------------------------------------------------------------------- @@ -57,10 +45,7 @@ class BasicTestCase(unittest.TestCase): def setUp(self): if self.useEnv: - homeDir = os.path.join(tempfile.gettempdir(), 'db_home%d'%os.getpid()) - self.homeDir = homeDir - test_support.rmtree(homeDir) - os.mkdir(homeDir) + self.homeDir=get_new_environment_path() try: self.env = db.DBEnv() self.env.set_lg_max(1024*1024) @@ -68,17 +53,14 @@ class BasicTestCase(unittest.TestCase): self.env.set_tx_timestamp(int(time.time())) self.env.set_flags(self.envsetflags, 1) self.env.open(self.homeDir, self.envflags | db.DB_CREATE) - old_tempfile_tempdir = tempfile.tempdir - tempfile.tempdir = self.homeDir - self.filename = os.path.split(tempfile.mktemp())[1] - tempfile.tempdir = old_tempfile_tempdir + self.filename = "test" # Yes, a bare except is intended, since we're re-raising the exc. except: - test_support.rmtree(homeDir) + test_support.rmtree(self.homeDir) raise else: self.env = None - self.filename = tempfile.mktemp() + self.filename = get_new_database_path() # create and open the DB self.d = db.DB(self.env) @@ -100,13 +82,6 @@ class BasicTestCase(unittest.TestCase): if self.env is not None: self.env.close() test_support.rmtree(self.homeDir) - ## XXX(nnorwitz): is this comment stil valid? - ## Make a new DBEnv to remove the env files from the home dir. - ## (It can't be done while the env is open, nor after it has been - ## closed, so we make a new one to do it.) - #e = db.DBEnv() - #e.remove(self.homeDir) - #os.remove(os.path.join(self.homeDir, self.filename)) else: os.remove(self.filename) @@ -117,15 +92,13 @@ class BasicTestCase(unittest.TestCase): for x in range(self._numKeys//2): key = '%04d' % (self._numKeys - x) # insert keys in reverse order - key = key.encode("utf-8") data = self.makeData(key) d.put(key, data, _txn) - d.put(b'empty value', b'', _txn) + d.put('empty value', '', _txn) for x in range(self._numKeys//2-1): key = '%04d' % x # and now some in forward order - key = key.encode("utf-8") data = self.makeData(key) d.put(key, data, _txn) @@ -151,49 +124,57 @@ class BasicTestCase(unittest.TestCase): print('\n', '-=' * 30) print("Running %s.test01_GetsAndPuts..." % self.__class__.__name__) - for key in [b'0001', b'0100', b'0400', b'0700', b'0999']: + for key in ['0001', '0100', '0400', '0700', '0999']: data = d.get(key) if verbose: print(data) - assert d.get(b'0321') == b'0321-0321-0321-0321-0321' + self.assertEqual(d.get('0321'), '0321-0321-0321-0321-0321') # By default non-existant keys return None... - assert d.get(b'abcd') == None + self.assertEqual(d.get('abcd'), None) # ...but they raise exceptions in other situations. Call # set_get_returns_none() to change it. try: - d.delete(b'abcd') + d.delete('abcd') except db.DBNotFoundError as val: - assert val.args[0] == db.DB_NOTFOUND + import sys + if sys.version_info[0] < 3 : + self.assertEqual(val[0], db.DB_NOTFOUND) + else : + self.assertEqual(val.args[0], db.DB_NOTFOUND) if verbose: print(val) else: self.fail("expected exception") - d.put(b'abcd', b'a new record') - assert d.get(b'abcd') == b'a new record' + d.put('abcd', 'a new record') + self.assertEqual(d.get('abcd'), 'a new record') - d.put(b'abcd', b'same key') + d.put('abcd', 'same key') if self.dbsetflags & db.DB_DUP: - assert d.get(b'abcd') == b'a new record' + self.assertEqual(d.get('abcd'), 'a new record') else: - assert d.get(b'abcd') == b'same key' + self.assertEqual(d.get('abcd'), 'same key') try: - d.put(b'abcd', b'this should fail', flags=db.DB_NOOVERWRITE) + d.put('abcd', 'this should fail', flags=db.DB_NOOVERWRITE) except db.DBKeyExistError as val: - assert val.args[0] == db.DB_KEYEXIST + import sys + if sys.version_info[0] < 3 : + self.assertEqual(val[0], db.DB_KEYEXIST) + else : + self.assertEqual(val.args[0], db.DB_KEYEXIST) if verbose: print(val) else: self.fail("expected exception") if self.dbsetflags & db.DB_DUP: - assert d.get(b'abcd') == b'a new record' + self.assertEqual(d.get('abcd'), 'a new record') else: - assert d.get(b'abcd') == b'same key' + self.assertEqual(d.get('abcd'), 'same key') d.sync() @@ -207,28 +188,28 @@ class BasicTestCase(unittest.TestCase): self.d.open(self.filename) d = self.d - assert d.get(b'0321') == b'0321-0321-0321-0321-0321' + self.assertEqual(d.get('0321'), '0321-0321-0321-0321-0321') if self.dbsetflags & db.DB_DUP: - assert d.get(b'abcd') == b'a new record' + self.assertEqual(d.get('abcd'), 'a new record') else: - assert d.get(b'abcd') == b'same key' + self.assertEqual(d.get('abcd'), 'same key') - rec = d.get_both(b'0555', b'0555-0555-0555-0555-0555') + rec = d.get_both('0555', '0555-0555-0555-0555-0555') if verbose: print(rec) - assert d.get_both(b'0555', b'bad data') == None + self.assertEqual(d.get_both('0555', 'bad data'), None) # test default value - data = d.get(b'bad key', b'bad data') - assert data == b'bad data' + data = d.get('bad key', 'bad data') + self.assertEqual(data, 'bad data') # any object can pass through - data = d.get(b'bad key', self) - assert data == self + data = d.get('bad key', self) + self.assertEqual(data, self) s = d.stat() - assert type(s) == type({}) + self.assertEqual(type(s), type({})) if verbose: print('d.stat() returned this dictionary:') pprint(s) @@ -244,49 +225,51 @@ class BasicTestCase(unittest.TestCase): print("Running %s.test02_DictionaryMethods..." % \ self.__class__.__name__) - for key in [b'0002', b'0101', b'0401', b'0701', b'0998']: + for key in ['0002', '0101', '0401', '0701', '0998']: data = d[key] - assert data == self.makeData(key) + self.assertEqual(data, self.makeData(key)) if verbose: print(data) - assert len(d) == self._numKeys - keys = d.keys() - assert len(keys) == self._numKeys - assert type(keys) == type([]) + self.assertEqual(len(d), self._numKeys) + keys = list(d.keys()) + self.assertEqual(len(keys), self._numKeys) + self.assertEqual(type(keys), type([])) - d[b'new record'] = b'a new record' - assert len(d) == self._numKeys+1 - keys = d.keys() - assert len(keys) == self._numKeys+1 + d['new record'] = 'a new record' + self.assertEqual(len(d), self._numKeys+1) + keys = list(d.keys()) + self.assertEqual(len(keys), self._numKeys+1) - d[b'new record'] = b'a replacement record' - assert len(d) == self._numKeys+1 - keys = d.keys() - assert len(keys) == self._numKeys+1 + d['new record'] = 'a replacement record' + self.assertEqual(len(d), self._numKeys+1) + keys = list(d.keys()) + self.assertEqual(len(keys), self._numKeys+1) if verbose: print("the first 10 keys are:") pprint(keys[:10]) - assert d[b'new record'] == b'a replacement record' + self.assertEqual(d['new record'], 'a replacement record') - assert d.has_key(b'0001') == 1 - assert d.has_key(b'spam') == 0 +# We check also the positional parameter + self.assertEqual(d.has_key('0001', None), 1) +# We check also the keyword parameter + self.assertEqual(d.has_key('spam', txn=None), 0) - items = d.items() - assert len(items) == self._numKeys+1 - assert type(items) == type([]) - assert type(items[0]) == type(()) - assert len(items[0]) == 2 + items = list(d.items()) + self.assertEqual(len(items), self._numKeys+1) + self.assertEqual(type(items), type([])) + self.assertEqual(type(items[0]), type(())) + self.assertEqual(len(items[0]), 2) if verbose: print("the first 10 items are:") pprint(items[:10]) - values = d.values() - assert len(values) == self._numKeys+1 - assert type(values) == type([]) + values = list(d.values()) + self.assertEqual(len(values), self._numKeys+1) + self.assertEqual(type(values), type([])) if verbose: print("the first 10 values are:") @@ -315,17 +298,22 @@ class BasicTestCase(unittest.TestCase): if verbose and count % 100 == 0: print(rec) try: - rec = c.next() + rec = next(c) except db.DBNotFoundError as val: if get_raises_error: - assert val.args[0] == db.DB_NOTFOUND + import sys + if sys.version_info[0] < 3 : + self.assertEqual(val[0], db.DB_NOTFOUND) + else : + self.assertEqual(val.args[0], db.DB_NOTFOUND) if verbose: print(val) rec = None else: self.fail("unexpected DBNotFoundError") - assert c.get_current_size() == len(c.current()[1]), "%s != len(%r)" % (c.get_current_size(), c.current()[1]) + self.assertEqual(c.get_current_size(), len(c.current()[1]), + "%s != len(%r)" % (c.get_current_size(), c.current()[1])) - assert count == self._numKeys + self.assertEqual(count, self._numKeys) rec = c.last() @@ -338,73 +326,89 @@ class BasicTestCase(unittest.TestCase): rec = c.prev() except db.DBNotFoundError as val: if get_raises_error: - assert val.args[0] == db.DB_NOTFOUND + import sys + if sys.version_info[0] < 3 : + self.assertEqual(val[0], db.DB_NOTFOUND) + else : + self.assertEqual(val.args[0], db.DB_NOTFOUND) if verbose: print(val) rec = None else: self.fail("unexpected DBNotFoundError") - assert count == self._numKeys + self.assertEqual(count, self._numKeys) - rec = c.set(b'0505') + rec = c.set('0505') rec2 = c.current() - assert rec == rec2, (repr(rec),repr(rec2)) - assert rec[0] == b'0505' - assert rec[1] == self.makeData(b'0505') - assert c.get_current_size() == len(rec[1]) + self.assertEqual(rec, rec2) + self.assertEqual(rec[0], '0505') + self.assertEqual(rec[1], self.makeData('0505')) + self.assertEqual(c.get_current_size(), len(rec[1])) # make sure we get empty values properly - rec = c.set(b'empty value') - assert rec[1] == b'' - assert c.get_current_size() == 0 + rec = c.set('empty value') + self.assertEqual(rec[1], '') + self.assertEqual(c.get_current_size(), 0) try: - n = c.set(b'bad key') + n = c.set('bad key') except db.DBNotFoundError as val: - assert val.args[0] == db.DB_NOTFOUND + import sys + if sys.version_info[0] < 3 : + self.assertEqual(val[0], db.DB_NOTFOUND) + else : + self.assertEqual(val.args[0], db.DB_NOTFOUND) if verbose: print(val) else: if set_raises_error: self.fail("expected exception") - if n is not None: + if n != None: self.fail("expected None: %r" % (n,)) - rec = c.get_both(b'0404', self.makeData(b'0404')) - assert rec == (b'0404', self.makeData(b'0404')) + rec = c.get_both('0404', self.makeData('0404')) + self.assertEqual(rec, ('0404', self.makeData('0404'))) try: - n = c.get_both(b'0404', b'bad data') + n = c.get_both('0404', 'bad data') except db.DBNotFoundError as val: - assert val.args[0] == db.DB_NOTFOUND + import sys + if sys.version_info[0] < 3 : + self.assertEqual(val[0], db.DB_NOTFOUND) + else : + self.assertEqual(val.args[0], db.DB_NOTFOUND) if verbose: print(val) else: if get_raises_error: self.fail("expected exception") - if n is not None: + if n != None: self.fail("expected None: %r" % (n,)) if self.d.get_type() == db.DB_BTREE: - rec = c.set_range(b'011') + rec = c.set_range('011') if verbose: print("searched for '011', found: ", rec) - rec = c.set_range(b'011',dlen=0,doff=0) + rec = c.set_range('011',dlen=0,doff=0) if verbose: print("searched (partial) for '011', found: ", rec) - if rec[1] != b'': self.fail('expected empty data portion') + if rec[1] != '': self.fail('expected empty data portion') - ev = c.set_range(b'empty value') + ev = c.set_range('empty value') if verbose: print("search for 'empty value' returned", ev) - if ev[1] != b'': self.fail('empty value lookup failed') + if ev[1] != '': self.fail('empty value lookup failed') - c.set(b'0499') + c.set('0499') c.delete() try: rec = c.current() except db.DBKeyEmptyError as val: if get_raises_error: - assert val.args[0] == db.DB_KEYEMPTY + import sys + if sys.version_info[0] < 3 : + self.assertEqual(val[0], db.DB_KEYEMPTY) + else : + self.assertEqual(val.args[0], db.DB_KEYEMPTY) if verbose: print(val) else: self.fail("unexpected DBKeyEmptyError") @@ -412,16 +416,16 @@ class BasicTestCase(unittest.TestCase): if get_raises_error: self.fail('DBKeyEmptyError exception expected') - c.next() + next(c) c2 = c.dup(db.DB_POSITION) - assert c.current() == c2.current() + self.assertEqual(c.current(), c2.current()) - c2.put(b'', b'a new value', db.DB_CURRENT) - assert c.current() == c2.current() - assert c.current()[1] == b'a new value' + c2.put('', 'a new value', db.DB_CURRENT) + self.assertEqual(c.current(), c2.current()) + self.assertEqual(c.current()[1], 'a new value') - c2.put(b'', b'er', db.DB_CURRENT, dlen=0, doff=5) - assert c2.current()[1] == b'a newer value' + c2.put('', 'er', db.DB_CURRENT, dlen=0, doff=5) + self.assertEqual(c2.current()[1], 'a newer value') c.close() c2.close() @@ -441,7 +445,7 @@ class BasicTestCase(unittest.TestCase): 'put':('', 'spam', db.DB_CURRENT), 'set': ("0505",), } - for method, args in methods_to_test.items(): + for method, args in list(methods_to_test.items()): try: if verbose: print("attempting to use a closed cursor's %s method" % \ @@ -449,7 +453,11 @@ class BasicTestCase(unittest.TestCase): # a bug may cause a NULL pointer dereference... getattr(c, method)(*args) except db.DBError as val: - assert val.args[0] == 0 + import sys + if sys.version_info[0] < 3 : + self.assertEqual(val[0], 0) + else : + self.assertEqual(val.args[0], 0) if verbose: print(val) else: self.fail("no exception raised when using a buggy cursor's" @@ -474,7 +482,7 @@ class BasicTestCase(unittest.TestCase): self.__class__.__name__) old = self.d.set_get_returns_none(0) - assert old == 2 + self.assertEqual(old, 2) self.test03_SimpleCursorStuff(get_raises_error=1, set_raises_error=1) def test03b_SimpleCursorWithGetReturnsNone1(self): @@ -496,9 +504,9 @@ class BasicTestCase(unittest.TestCase): self.__class__.__name__) old = self.d.set_get_returns_none(1) - assert old == 2 + self.assertEqual(old, 2) old = self.d.set_get_returns_none(2) - assert old == 1 + self.assertEqual(old, 1) self.test03_SimpleCursorStuff(get_raises_error=0, set_raises_error=0) #---------------------------------------- @@ -510,26 +518,27 @@ class BasicTestCase(unittest.TestCase): print("Running %s.test04_PartialGetAndPut..." % \ self.__class__.__name__) - key = b"partialTest" - data = b"1" * 1000 + b"2" * 1000 + key = "partialTest" + data = "1" * 1000 + "2" * 1000 d.put(key, data) - assert d.get(key) == data - assert d.get(key, dlen=20, doff=990) == (b"1" * 10) + (b"2" * 10) + self.assertEqual(d.get(key), data) + self.assertEqual(d.get(key, dlen=20, doff=990), + ("1" * 10) + ("2" * 10)) - d.put(b"partialtest2", (b"1" * 30000) + b"robin" ) - assert d.get(b"partialtest2", dlen=5, doff=30000) == b"robin" + d.put("partialtest2", ("1" * 30000) + "robin" ) + self.assertEqual(d.get("partialtest2", dlen=5, doff=30000), "robin") # There seems to be a bug in DB here... Commented out the test for # now. - ##assert d.get("partialtest2", dlen=5, doff=30010) == "" + ##self.assertEqual(d.get("partialtest2", dlen=5, doff=30010), "") if self.dbsetflags != db.DB_DUP: # Partial put with duplicate records requires a cursor - d.put(key, b"0000", dlen=2000, doff=0) - assert d.get(key) == b"0000" + d.put(key, "0000", dlen=2000, doff=0) + self.assertEqual(d.get(key), "0000") - d.put(key, b"1111", dlen=1, doff=2) - assert d.get(key) == b"0011110" + d.put(key, "1111", dlen=1, doff=2) + self.assertEqual(d.get(key), "0011110") #---------------------------------------- @@ -540,30 +549,27 @@ class BasicTestCase(unittest.TestCase): print("Running %s.test05_GetSize..." % self.__class__.__name__) for i in range(1, 50000, 500): - key = ("size%s" % i).encode("utf-8") + key = "size%s" % i #print "before ", i, - d.put(key, b"1" * i) + d.put(key, "1" * i) #print "after", - assert d.get_size(key) == i + self.assertEqual(d.get_size(key), i) #print "done" #---------------------------------------- def test06_Truncate(self): - if db.version() < (3,3): - # truncate is a feature of BerkeleyDB 3.3 and above - return - d = self.d if verbose: print('\n', '-=' * 30) print("Running %s.test99_Truncate..." % self.__class__.__name__) - d.put(b"abcde", b"ABCDE"); + d.put("abcde", "ABCDE"); num = d.truncate() - assert num >= 1, "truncate returned <= 0 on non-empty database" + self.assert_(num >= 1, "truncate returned <= 0 on non-empty database") num = d.truncate() - assert num == 0, "truncate on empty DB returned nonzero (%r)" % (num,) + self.assertEqual(num, 0, + "truncate on empty DB returned nonzero (%r)" % (num,)) #---------------------------------------- @@ -628,6 +634,11 @@ class BasicHashWithEnvTestCase(BasicWithEnvTestCase): #---------------------------------------------------------------------- class BasicTransactionTestCase(BasicTestCase): + import sys + if sys.version_info[:3] < (2, 4, 0): + def assertTrue(self, expr, msg=None): + self.failUnless(expr,msg=msg) + dbopenflags = db.DB_THREAD | db.DB_AUTO_COMMIT useEnv = 1 envflags = (db.DB_THREAD | db.DB_INIT_MPOOL | db.DB_INIT_LOCK | @@ -653,19 +664,21 @@ class BasicTransactionTestCase(BasicTestCase): print('\n', '-=' * 30) print("Running %s.test06_Transactions..." % self.__class__.__name__) - assert d.get(b'new rec', txn=self.txn) == None - d.put(b'new rec', b'this is a new record', self.txn) - assert d.get(b'new rec', txn=self.txn) == b'this is a new record' + self.assertEqual(d.get('new rec', txn=self.txn), None) + d.put('new rec', 'this is a new record', self.txn) + self.assertEqual(d.get('new rec', txn=self.txn), + 'this is a new record') self.txn.abort() - assert d.get(b'new rec') == None + self.assertEqual(d.get('new rec'), None) self.txn = self.env.txn_begin() - assert d.get(b'new rec', txn=self.txn) == None - d.put(b'new rec', b'this is a new record', self.txn) - assert d.get(b'new rec', txn=self.txn) == b'this is a new record' + self.assertEqual(d.get('new rec', txn=self.txn), None) + d.put('new rec', 'this is a new record', self.txn) + self.assertEqual(d.get('new rec', txn=self.txn), + 'this is a new record') self.txn.commit() - assert d.get(b'new rec') == b'this is a new record' + self.assertEqual(d.get('new rec'), 'this is a new record') self.txn = self.env.txn_begin() c = d.cursor(self.txn) @@ -675,8 +688,8 @@ class BasicTransactionTestCase(BasicTestCase): count = count + 1 if verbose and count % 100 == 0: print(rec) - rec = c.next() - assert count == self._numKeys+1 + rec = next(c) + self.assertEqual(count, self._numKeys+1) c.close() # Cursors *MUST* be closed before commit! self.txn.commit() @@ -687,43 +700,39 @@ class BasicTransactionTestCase(BasicTestCase): except db.DBIncompleteError: pass - if db.version() >= (4,0): - statDict = self.env.log_stat(0); - assert 'magic' in statDict - assert 'version' in statDict - assert 'cur_file' in statDict - assert 'region_nowait' in statDict + statDict = self.env.log_stat(0); + self.assert_('magic' in statDict) + self.assert_('version' in statDict) + self.assert_('cur_file' in statDict) + self.assert_('region_nowait' in statDict) # must have at least one log file present: logs = self.env.log_archive(db.DB_ARCH_ABS | db.DB_ARCH_LOG) - assert logs != None + self.assertNotEqual(logs, None) for log in logs: if verbose: print('log file: ' + log) if db.version() >= (4,2): logs = self.env.log_archive(db.DB_ARCH_REMOVE) - assert not logs + self.assertTrue(not logs) self.txn = self.env.txn_begin() #---------------------------------------- def test07_TxnTruncate(self): - if db.version() < (3,3): - # truncate is a feature of BerkeleyDB 3.3 and above - return - d = self.d if verbose: print('\n', '-=' * 30) print("Running %s.test07_TxnTruncate..." % self.__class__.__name__) - d.put(b"abcde", b"ABCDE"); + d.put("abcde", "ABCDE"); txn = self.env.txn_begin() num = d.truncate(txn) - assert num >= 1, "truncate returned <= 0 on non-empty database" + self.assert_(num >= 1, "truncate returned <= 0 on non-empty database") num = d.truncate(txn) - assert num == 0, "truncate on empty DB returned nonzero (%r)" % (num,) + self.assertEqual(num, 0, + "truncate on empty DB returned nonzero (%r)" % (num,)) txn.commit() #---------------------------------------- @@ -769,20 +778,20 @@ class BTreeRecnoTestCase(BasicTestCase): print("Running %s.test07_RecnoInBTree..." % self.__class__.__name__) rec = d.get(200) - assert type(rec) == type(()) - assert len(rec) == 2 + self.assertEqual(type(rec), type(())) + self.assertEqual(len(rec), 2) if verbose: print("Record #200 is ", rec) c = d.cursor() - c.set(b'0200') + c.set('0200') num = c.get_recno() - assert type(num) == type(1) + self.assertEqual(type(num), type(1)) if verbose: print("recno of d['0200'] is ", num) rec = c.current() - assert c.set_recno(num) == rec + self.assertEqual(c.set_recno(num), rec) c.close() @@ -803,40 +812,39 @@ class BasicDUPTestCase(BasicTestCase): print("Running %s.test08_DuplicateKeys..." % \ self.__class__.__name__) - d.put(b"dup0", b"before") + d.put("dup0", "before") for x in "The quick brown fox jumped over the lazy dog.".split(): - x = x.encode("ascii") - d.put(b"dup1", x) - d.put(b"dup2", b"after") + d.put("dup1", x) + d.put("dup2", "after") - data = d.get(b"dup1") - assert data == b"The" + data = d.get("dup1") + self.assertEqual(data, "The") if verbose: print(data) c = d.cursor() - rec = c.set(b"dup1") - assert rec == (b'dup1', b'The') + rec = c.set("dup1") + self.assertEqual(rec, ('dup1', 'The')) - next = c.next() - assert next == (b'dup1', b'quick') + next_reg = next(c) + self.assertEqual(next_reg, ('dup1', 'quick')) - rec = c.set(b"dup1") + rec = c.set("dup1") count = c.count() - assert count == 9 + self.assertEqual(count, 9) next_dup = c.next_dup() - assert next_dup == (b'dup1', b'quick') + self.assertEqual(next_dup, ('dup1', 'quick')) - rec = c.set(b'dup1') + rec = c.set('dup1') while rec is not None: if verbose: print(rec) rec = c.next_dup() - c.set(b'dup1') + c.set('dup1') rec = c.next_nodup() - assert rec[0] != b'dup1' + self.assertNotEqual(rec[0], 'dup1') if verbose: print(rec) @@ -884,11 +892,9 @@ class BasicMultiDBTestCase(BasicTestCase): self.dbopenflags|db.DB_CREATE) for x in "The quick brown fox jumped over the lazy dog".split(): - x = x.encode("ascii") d2.put(x, self.makeData(x)) - for x in letters: - x = x.encode("ascii") + for x in string.letters: d3.put(x, x*70) d1.sync() @@ -917,8 +923,8 @@ class BasicMultiDBTestCase(BasicTestCase): count = count + 1 if verbose and (count % 50) == 0: print(rec) - rec = c1.next() - assert count == self._numKeys + rec = next(c1) + self.assertEqual(count, self._numKeys) count = 0 rec = c2.first() @@ -926,8 +932,8 @@ class BasicMultiDBTestCase(BasicTestCase): count = count + 1 if verbose: print(rec) - rec = c2.next() - assert count == 9 + rec = next(c2) + self.assertEqual(count, 9) count = 0 rec = c3.first() @@ -935,15 +941,14 @@ class BasicMultiDBTestCase(BasicTestCase): count = count + 1 if verbose: print(rec) - rec = c3.next() - assert count == 52 + rec = next(c3) + self.assertEqual(count, len(string.letters)) c1.close() c2.close() c3.close() - d1.close() d2.close() d3.close() @@ -965,6 +970,55 @@ class HashMultiDBTestCase(BasicMultiDBTestCase): envflags = db.DB_THREAD | db.DB_INIT_MPOOL | db.DB_INIT_LOCK +class PrivateObject(unittest.TestCase) : + import sys + if sys.version_info[:3] < (2, 4, 0): + def assertTrue(self, expr, msg=None): + self.failUnless(expr,msg=msg) + + def tearDown(self) : + del self.obj + + def test01_DefaultIsNone(self) : + self.assertEqual(self.obj.get_private(), None) + + def test02_assignment(self) : + a = "example of private object" + self.obj.set_private(a) + b = self.obj.get_private() + self.assertTrue(a is b) # Object identity + + def test03_leak_assignment(self) : + import sys + a = "example of private object" + refcount = sys.getrefcount(a) + self.obj.set_private(a) + self.assertEqual(refcount+1, sys.getrefcount(a)) + self.obj.set_private(None) + self.assertEqual(refcount, sys.getrefcount(a)) + + def test04_leak_GC(self) : + import sys + a = "example of private object" + refcount = sys.getrefcount(a) + self.obj.set_private(a) + self.obj = None + self.assertEqual(refcount, sys.getrefcount(a)) + +class DBEnvPrivateObject(PrivateObject) : + def setUp(self) : + self.obj = db.DBEnv() + +class DBPrivateObject(PrivateObject) : + def setUp(self) : + self.obj = db.DB() + +class CrashAndBurn(unittest.TestCase) : + def test01_OpenCrash(self) : + # See http://bugs.python.org/issue3307 + self.assertRaises(db.DBInvalidArgError, db.DB, None, 65535) + + #---------------------------------------------------------------------- #---------------------------------------------------------------------- @@ -988,6 +1042,9 @@ def test_suite(): suite.addTest(unittest.makeSuite(HashDUPWithThreadTestCase)) suite.addTest(unittest.makeSuite(BTreeMultiDBTestCase)) suite.addTest(unittest.makeSuite(HashMultiDBTestCase)) + suite.addTest(unittest.makeSuite(DBEnvPrivateObject)) + suite.addTest(unittest.makeSuite(DBPrivateObject)) + #suite.addTest(unittest.makeSuite(CrashAndBurn)) return suite |