summaryrefslogtreecommitdiffstats
path: root/Lib/bsddb/test/test_basics.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/bsddb/test/test_basics.py')
-rw-r--r--Lib/bsddb/test/test_basics.py465
1 files changed, 261 insertions, 204 deletions
diff --git a/Lib/bsddb/test/test_basics.py b/Lib/bsddb/test/test_basics.py
index 9c829bd..704bc41 100644
--- a/Lib/bsddb/test/test_basics.py
+++ b/Lib/bsddb/test/test_basics.py
@@ -4,29 +4,17 @@ various DB flags, etc.
"""
import os
-import sys
import errno
import string
-import tempfile
from pprint import pprint
import unittest
import time
-try:
- # For Pythons w/distutils pybsddb
- from bsddb3 import db
-except ImportError:
- # For Python 2.3
- from bsddb import db
+from .test_all import db, test_support, verbose, get_new_environment_path, \
+ get_new_database_path
-from bsddb.test.test_all import verbose
-try:
- from bsddb3 import test_support
-except ImportError:
- from test import support as test_support
+DASH = '-'
-DASH = b'-'
-letters = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'
#----------------------------------------------------------------------
@@ -38,8 +26,8 @@ class VersionTestCase(unittest.TestCase):
print('bsddb.db.version(): %s' % (info, ))
print(db.DB_VERSION_STRING)
print('-=' * 20)
- assert info == (db.DB_VERSION_MAJOR, db.DB_VERSION_MINOR,
- db.DB_VERSION_PATCH)
+ self.assertEqual(info, (db.DB_VERSION_MAJOR, db.DB_VERSION_MINOR,
+ db.DB_VERSION_PATCH))
#----------------------------------------------------------------------
@@ -57,10 +45,7 @@ class BasicTestCase(unittest.TestCase):
def setUp(self):
if self.useEnv:
- homeDir = os.path.join(tempfile.gettempdir(), 'db_home%d'%os.getpid())
- self.homeDir = homeDir
- test_support.rmtree(homeDir)
- os.mkdir(homeDir)
+ self.homeDir=get_new_environment_path()
try:
self.env = db.DBEnv()
self.env.set_lg_max(1024*1024)
@@ -68,17 +53,14 @@ class BasicTestCase(unittest.TestCase):
self.env.set_tx_timestamp(int(time.time()))
self.env.set_flags(self.envsetflags, 1)
self.env.open(self.homeDir, self.envflags | db.DB_CREATE)
- old_tempfile_tempdir = tempfile.tempdir
- tempfile.tempdir = self.homeDir
- self.filename = os.path.split(tempfile.mktemp())[1]
- tempfile.tempdir = old_tempfile_tempdir
+ self.filename = "test"
# Yes, a bare except is intended, since we're re-raising the exc.
except:
- test_support.rmtree(homeDir)
+ test_support.rmtree(self.homeDir)
raise
else:
self.env = None
- self.filename = tempfile.mktemp()
+ self.filename = get_new_database_path()
# create and open the DB
self.d = db.DB(self.env)
@@ -100,13 +82,6 @@ class BasicTestCase(unittest.TestCase):
if self.env is not None:
self.env.close()
test_support.rmtree(self.homeDir)
- ## XXX(nnorwitz): is this comment stil valid?
- ## Make a new DBEnv to remove the env files from the home dir.
- ## (It can't be done while the env is open, nor after it has been
- ## closed, so we make a new one to do it.)
- #e = db.DBEnv()
- #e.remove(self.homeDir)
- #os.remove(os.path.join(self.homeDir, self.filename))
else:
os.remove(self.filename)
@@ -117,15 +92,13 @@ class BasicTestCase(unittest.TestCase):
for x in range(self._numKeys//2):
key = '%04d' % (self._numKeys - x) # insert keys in reverse order
- key = key.encode("utf-8")
data = self.makeData(key)
d.put(key, data, _txn)
- d.put(b'empty value', b'', _txn)
+ d.put('empty value', '', _txn)
for x in range(self._numKeys//2-1):
key = '%04d' % x # and now some in forward order
- key = key.encode("utf-8")
data = self.makeData(key)
d.put(key, data, _txn)
@@ -151,49 +124,57 @@ class BasicTestCase(unittest.TestCase):
print('\n', '-=' * 30)
print("Running %s.test01_GetsAndPuts..." % self.__class__.__name__)
- for key in [b'0001', b'0100', b'0400', b'0700', b'0999']:
+ for key in ['0001', '0100', '0400', '0700', '0999']:
data = d.get(key)
if verbose:
print(data)
- assert d.get(b'0321') == b'0321-0321-0321-0321-0321'
+ self.assertEqual(d.get('0321'), '0321-0321-0321-0321-0321')
# By default non-existant keys return None...
- assert d.get(b'abcd') == None
+ self.assertEqual(d.get('abcd'), None)
# ...but they raise exceptions in other situations. Call
# set_get_returns_none() to change it.
try:
- d.delete(b'abcd')
+ d.delete('abcd')
except db.DBNotFoundError as val:
- assert val.args[0] == db.DB_NOTFOUND
+ import sys
+ if sys.version_info[0] < 3 :
+ self.assertEqual(val[0], db.DB_NOTFOUND)
+ else :
+ self.assertEqual(val.args[0], db.DB_NOTFOUND)
if verbose: print(val)
else:
self.fail("expected exception")
- d.put(b'abcd', b'a new record')
- assert d.get(b'abcd') == b'a new record'
+ d.put('abcd', 'a new record')
+ self.assertEqual(d.get('abcd'), 'a new record')
- d.put(b'abcd', b'same key')
+ d.put('abcd', 'same key')
if self.dbsetflags & db.DB_DUP:
- assert d.get(b'abcd') == b'a new record'
+ self.assertEqual(d.get('abcd'), 'a new record')
else:
- assert d.get(b'abcd') == b'same key'
+ self.assertEqual(d.get('abcd'), 'same key')
try:
- d.put(b'abcd', b'this should fail', flags=db.DB_NOOVERWRITE)
+ d.put('abcd', 'this should fail', flags=db.DB_NOOVERWRITE)
except db.DBKeyExistError as val:
- assert val.args[0] == db.DB_KEYEXIST
+ import sys
+ if sys.version_info[0] < 3 :
+ self.assertEqual(val[0], db.DB_KEYEXIST)
+ else :
+ self.assertEqual(val.args[0], db.DB_KEYEXIST)
if verbose: print(val)
else:
self.fail("expected exception")
if self.dbsetflags & db.DB_DUP:
- assert d.get(b'abcd') == b'a new record'
+ self.assertEqual(d.get('abcd'), 'a new record')
else:
- assert d.get(b'abcd') == b'same key'
+ self.assertEqual(d.get('abcd'), 'same key')
d.sync()
@@ -207,28 +188,28 @@ class BasicTestCase(unittest.TestCase):
self.d.open(self.filename)
d = self.d
- assert d.get(b'0321') == b'0321-0321-0321-0321-0321'
+ self.assertEqual(d.get('0321'), '0321-0321-0321-0321-0321')
if self.dbsetflags & db.DB_DUP:
- assert d.get(b'abcd') == b'a new record'
+ self.assertEqual(d.get('abcd'), 'a new record')
else:
- assert d.get(b'abcd') == b'same key'
+ self.assertEqual(d.get('abcd'), 'same key')
- rec = d.get_both(b'0555', b'0555-0555-0555-0555-0555')
+ rec = d.get_both('0555', '0555-0555-0555-0555-0555')
if verbose:
print(rec)
- assert d.get_both(b'0555', b'bad data') == None
+ self.assertEqual(d.get_both('0555', 'bad data'), None)
# test default value
- data = d.get(b'bad key', b'bad data')
- assert data == b'bad data'
+ data = d.get('bad key', 'bad data')
+ self.assertEqual(data, 'bad data')
# any object can pass through
- data = d.get(b'bad key', self)
- assert data == self
+ data = d.get('bad key', self)
+ self.assertEqual(data, self)
s = d.stat()
- assert type(s) == type({})
+ self.assertEqual(type(s), type({}))
if verbose:
print('d.stat() returned this dictionary:')
pprint(s)
@@ -244,49 +225,51 @@ class BasicTestCase(unittest.TestCase):
print("Running %s.test02_DictionaryMethods..." % \
self.__class__.__name__)
- for key in [b'0002', b'0101', b'0401', b'0701', b'0998']:
+ for key in ['0002', '0101', '0401', '0701', '0998']:
data = d[key]
- assert data == self.makeData(key)
+ self.assertEqual(data, self.makeData(key))
if verbose:
print(data)
- assert len(d) == self._numKeys
- keys = d.keys()
- assert len(keys) == self._numKeys
- assert type(keys) == type([])
+ self.assertEqual(len(d), self._numKeys)
+ keys = list(d.keys())
+ self.assertEqual(len(keys), self._numKeys)
+ self.assertEqual(type(keys), type([]))
- d[b'new record'] = b'a new record'
- assert len(d) == self._numKeys+1
- keys = d.keys()
- assert len(keys) == self._numKeys+1
+ d['new record'] = 'a new record'
+ self.assertEqual(len(d), self._numKeys+1)
+ keys = list(d.keys())
+ self.assertEqual(len(keys), self._numKeys+1)
- d[b'new record'] = b'a replacement record'
- assert len(d) == self._numKeys+1
- keys = d.keys()
- assert len(keys) == self._numKeys+1
+ d['new record'] = 'a replacement record'
+ self.assertEqual(len(d), self._numKeys+1)
+ keys = list(d.keys())
+ self.assertEqual(len(keys), self._numKeys+1)
if verbose:
print("the first 10 keys are:")
pprint(keys[:10])
- assert d[b'new record'] == b'a replacement record'
+ self.assertEqual(d['new record'], 'a replacement record')
- assert d.has_key(b'0001') == 1
- assert d.has_key(b'spam') == 0
+# We check also the positional parameter
+ self.assertEqual(d.has_key('0001', None), 1)
+# We check also the keyword parameter
+ self.assertEqual(d.has_key('spam', txn=None), 0)
- items = d.items()
- assert len(items) == self._numKeys+1
- assert type(items) == type([])
- assert type(items[0]) == type(())
- assert len(items[0]) == 2
+ items = list(d.items())
+ self.assertEqual(len(items), self._numKeys+1)
+ self.assertEqual(type(items), type([]))
+ self.assertEqual(type(items[0]), type(()))
+ self.assertEqual(len(items[0]), 2)
if verbose:
print("the first 10 items are:")
pprint(items[:10])
- values = d.values()
- assert len(values) == self._numKeys+1
- assert type(values) == type([])
+ values = list(d.values())
+ self.assertEqual(len(values), self._numKeys+1)
+ self.assertEqual(type(values), type([]))
if verbose:
print("the first 10 values are:")
@@ -315,17 +298,22 @@ class BasicTestCase(unittest.TestCase):
if verbose and count % 100 == 0:
print(rec)
try:
- rec = c.next()
+ rec = next(c)
except db.DBNotFoundError as val:
if get_raises_error:
- assert val.args[0] == db.DB_NOTFOUND
+ import sys
+ if sys.version_info[0] < 3 :
+ self.assertEqual(val[0], db.DB_NOTFOUND)
+ else :
+ self.assertEqual(val.args[0], db.DB_NOTFOUND)
if verbose: print(val)
rec = None
else:
self.fail("unexpected DBNotFoundError")
- assert c.get_current_size() == len(c.current()[1]), "%s != len(%r)" % (c.get_current_size(), c.current()[1])
+ self.assertEqual(c.get_current_size(), len(c.current()[1]),
+ "%s != len(%r)" % (c.get_current_size(), c.current()[1]))
- assert count == self._numKeys
+ self.assertEqual(count, self._numKeys)
rec = c.last()
@@ -338,73 +326,89 @@ class BasicTestCase(unittest.TestCase):
rec = c.prev()
except db.DBNotFoundError as val:
if get_raises_error:
- assert val.args[0] == db.DB_NOTFOUND
+ import sys
+ if sys.version_info[0] < 3 :
+ self.assertEqual(val[0], db.DB_NOTFOUND)
+ else :
+ self.assertEqual(val.args[0], db.DB_NOTFOUND)
if verbose: print(val)
rec = None
else:
self.fail("unexpected DBNotFoundError")
- assert count == self._numKeys
+ self.assertEqual(count, self._numKeys)
- rec = c.set(b'0505')
+ rec = c.set('0505')
rec2 = c.current()
- assert rec == rec2, (repr(rec),repr(rec2))
- assert rec[0] == b'0505'
- assert rec[1] == self.makeData(b'0505')
- assert c.get_current_size() == len(rec[1])
+ self.assertEqual(rec, rec2)
+ self.assertEqual(rec[0], '0505')
+ self.assertEqual(rec[1], self.makeData('0505'))
+ self.assertEqual(c.get_current_size(), len(rec[1]))
# make sure we get empty values properly
- rec = c.set(b'empty value')
- assert rec[1] == b''
- assert c.get_current_size() == 0
+ rec = c.set('empty value')
+ self.assertEqual(rec[1], '')
+ self.assertEqual(c.get_current_size(), 0)
try:
- n = c.set(b'bad key')
+ n = c.set('bad key')
except db.DBNotFoundError as val:
- assert val.args[0] == db.DB_NOTFOUND
+ import sys
+ if sys.version_info[0] < 3 :
+ self.assertEqual(val[0], db.DB_NOTFOUND)
+ else :
+ self.assertEqual(val.args[0], db.DB_NOTFOUND)
if verbose: print(val)
else:
if set_raises_error:
self.fail("expected exception")
- if n is not None:
+ if n != None:
self.fail("expected None: %r" % (n,))
- rec = c.get_both(b'0404', self.makeData(b'0404'))
- assert rec == (b'0404', self.makeData(b'0404'))
+ rec = c.get_both('0404', self.makeData('0404'))
+ self.assertEqual(rec, ('0404', self.makeData('0404')))
try:
- n = c.get_both(b'0404', b'bad data')
+ n = c.get_both('0404', 'bad data')
except db.DBNotFoundError as val:
- assert val.args[0] == db.DB_NOTFOUND
+ import sys
+ if sys.version_info[0] < 3 :
+ self.assertEqual(val[0], db.DB_NOTFOUND)
+ else :
+ self.assertEqual(val.args[0], db.DB_NOTFOUND)
if verbose: print(val)
else:
if get_raises_error:
self.fail("expected exception")
- if n is not None:
+ if n != None:
self.fail("expected None: %r" % (n,))
if self.d.get_type() == db.DB_BTREE:
- rec = c.set_range(b'011')
+ rec = c.set_range('011')
if verbose:
print("searched for '011', found: ", rec)
- rec = c.set_range(b'011',dlen=0,doff=0)
+ rec = c.set_range('011',dlen=0,doff=0)
if verbose:
print("searched (partial) for '011', found: ", rec)
- if rec[1] != b'': self.fail('expected empty data portion')
+ if rec[1] != '': self.fail('expected empty data portion')
- ev = c.set_range(b'empty value')
+ ev = c.set_range('empty value')
if verbose:
print("search for 'empty value' returned", ev)
- if ev[1] != b'': self.fail('empty value lookup failed')
+ if ev[1] != '': self.fail('empty value lookup failed')
- c.set(b'0499')
+ c.set('0499')
c.delete()
try:
rec = c.current()
except db.DBKeyEmptyError as val:
if get_raises_error:
- assert val.args[0] == db.DB_KEYEMPTY
+ import sys
+ if sys.version_info[0] < 3 :
+ self.assertEqual(val[0], db.DB_KEYEMPTY)
+ else :
+ self.assertEqual(val.args[0], db.DB_KEYEMPTY)
if verbose: print(val)
else:
self.fail("unexpected DBKeyEmptyError")
@@ -412,16 +416,16 @@ class BasicTestCase(unittest.TestCase):
if get_raises_error:
self.fail('DBKeyEmptyError exception expected')
- c.next()
+ next(c)
c2 = c.dup(db.DB_POSITION)
- assert c.current() == c2.current()
+ self.assertEqual(c.current(), c2.current())
- c2.put(b'', b'a new value', db.DB_CURRENT)
- assert c.current() == c2.current()
- assert c.current()[1] == b'a new value'
+ c2.put('', 'a new value', db.DB_CURRENT)
+ self.assertEqual(c.current(), c2.current())
+ self.assertEqual(c.current()[1], 'a new value')
- c2.put(b'', b'er', db.DB_CURRENT, dlen=0, doff=5)
- assert c2.current()[1] == b'a newer value'
+ c2.put('', 'er', db.DB_CURRENT, dlen=0, doff=5)
+ self.assertEqual(c2.current()[1], 'a newer value')
c.close()
c2.close()
@@ -441,7 +445,7 @@ class BasicTestCase(unittest.TestCase):
'put':('', 'spam', db.DB_CURRENT),
'set': ("0505",),
}
- for method, args in methods_to_test.items():
+ for method, args in list(methods_to_test.items()):
try:
if verbose:
print("attempting to use a closed cursor's %s method" % \
@@ -449,7 +453,11 @@ class BasicTestCase(unittest.TestCase):
# a bug may cause a NULL pointer dereference...
getattr(c, method)(*args)
except db.DBError as val:
- assert val.args[0] == 0
+ import sys
+ if sys.version_info[0] < 3 :
+ self.assertEqual(val[0], 0)
+ else :
+ self.assertEqual(val.args[0], 0)
if verbose: print(val)
else:
self.fail("no exception raised when using a buggy cursor's"
@@ -474,7 +482,7 @@ class BasicTestCase(unittest.TestCase):
self.__class__.__name__)
old = self.d.set_get_returns_none(0)
- assert old == 2
+ self.assertEqual(old, 2)
self.test03_SimpleCursorStuff(get_raises_error=1, set_raises_error=1)
def test03b_SimpleCursorWithGetReturnsNone1(self):
@@ -496,9 +504,9 @@ class BasicTestCase(unittest.TestCase):
self.__class__.__name__)
old = self.d.set_get_returns_none(1)
- assert old == 2
+ self.assertEqual(old, 2)
old = self.d.set_get_returns_none(2)
- assert old == 1
+ self.assertEqual(old, 1)
self.test03_SimpleCursorStuff(get_raises_error=0, set_raises_error=0)
#----------------------------------------
@@ -510,26 +518,27 @@ class BasicTestCase(unittest.TestCase):
print("Running %s.test04_PartialGetAndPut..." % \
self.__class__.__name__)
- key = b"partialTest"
- data = b"1" * 1000 + b"2" * 1000
+ key = "partialTest"
+ data = "1" * 1000 + "2" * 1000
d.put(key, data)
- assert d.get(key) == data
- assert d.get(key, dlen=20, doff=990) == (b"1" * 10) + (b"2" * 10)
+ self.assertEqual(d.get(key), data)
+ self.assertEqual(d.get(key, dlen=20, doff=990),
+ ("1" * 10) + ("2" * 10))
- d.put(b"partialtest2", (b"1" * 30000) + b"robin" )
- assert d.get(b"partialtest2", dlen=5, doff=30000) == b"robin"
+ d.put("partialtest2", ("1" * 30000) + "robin" )
+ self.assertEqual(d.get("partialtest2", dlen=5, doff=30000), "robin")
# There seems to be a bug in DB here... Commented out the test for
# now.
- ##assert d.get("partialtest2", dlen=5, doff=30010) == ""
+ ##self.assertEqual(d.get("partialtest2", dlen=5, doff=30010), "")
if self.dbsetflags != db.DB_DUP:
# Partial put with duplicate records requires a cursor
- d.put(key, b"0000", dlen=2000, doff=0)
- assert d.get(key) == b"0000"
+ d.put(key, "0000", dlen=2000, doff=0)
+ self.assertEqual(d.get(key), "0000")
- d.put(key, b"1111", dlen=1, doff=2)
- assert d.get(key) == b"0011110"
+ d.put(key, "1111", dlen=1, doff=2)
+ self.assertEqual(d.get(key), "0011110")
#----------------------------------------
@@ -540,30 +549,27 @@ class BasicTestCase(unittest.TestCase):
print("Running %s.test05_GetSize..." % self.__class__.__name__)
for i in range(1, 50000, 500):
- key = ("size%s" % i).encode("utf-8")
+ key = "size%s" % i
#print "before ", i,
- d.put(key, b"1" * i)
+ d.put(key, "1" * i)
#print "after",
- assert d.get_size(key) == i
+ self.assertEqual(d.get_size(key), i)
#print "done"
#----------------------------------------
def test06_Truncate(self):
- if db.version() < (3,3):
- # truncate is a feature of BerkeleyDB 3.3 and above
- return
-
d = self.d
if verbose:
print('\n', '-=' * 30)
print("Running %s.test99_Truncate..." % self.__class__.__name__)
- d.put(b"abcde", b"ABCDE");
+ d.put("abcde", "ABCDE");
num = d.truncate()
- assert num >= 1, "truncate returned <= 0 on non-empty database"
+ self.assert_(num >= 1, "truncate returned <= 0 on non-empty database")
num = d.truncate()
- assert num == 0, "truncate on empty DB returned nonzero (%r)" % (num,)
+ self.assertEqual(num, 0,
+ "truncate on empty DB returned nonzero (%r)" % (num,))
#----------------------------------------
@@ -628,6 +634,11 @@ class BasicHashWithEnvTestCase(BasicWithEnvTestCase):
#----------------------------------------------------------------------
class BasicTransactionTestCase(BasicTestCase):
+ import sys
+ if sys.version_info[:3] < (2, 4, 0):
+ def assertTrue(self, expr, msg=None):
+ self.failUnless(expr,msg=msg)
+
dbopenflags = db.DB_THREAD | db.DB_AUTO_COMMIT
useEnv = 1
envflags = (db.DB_THREAD | db.DB_INIT_MPOOL | db.DB_INIT_LOCK |
@@ -653,19 +664,21 @@ class BasicTransactionTestCase(BasicTestCase):
print('\n', '-=' * 30)
print("Running %s.test06_Transactions..." % self.__class__.__name__)
- assert d.get(b'new rec', txn=self.txn) == None
- d.put(b'new rec', b'this is a new record', self.txn)
- assert d.get(b'new rec', txn=self.txn) == b'this is a new record'
+ self.assertEqual(d.get('new rec', txn=self.txn), None)
+ d.put('new rec', 'this is a new record', self.txn)
+ self.assertEqual(d.get('new rec', txn=self.txn),
+ 'this is a new record')
self.txn.abort()
- assert d.get(b'new rec') == None
+ self.assertEqual(d.get('new rec'), None)
self.txn = self.env.txn_begin()
- assert d.get(b'new rec', txn=self.txn) == None
- d.put(b'new rec', b'this is a new record', self.txn)
- assert d.get(b'new rec', txn=self.txn) == b'this is a new record'
+ self.assertEqual(d.get('new rec', txn=self.txn), None)
+ d.put('new rec', 'this is a new record', self.txn)
+ self.assertEqual(d.get('new rec', txn=self.txn),
+ 'this is a new record')
self.txn.commit()
- assert d.get(b'new rec') == b'this is a new record'
+ self.assertEqual(d.get('new rec'), 'this is a new record')
self.txn = self.env.txn_begin()
c = d.cursor(self.txn)
@@ -675,8 +688,8 @@ class BasicTransactionTestCase(BasicTestCase):
count = count + 1
if verbose and count % 100 == 0:
print(rec)
- rec = c.next()
- assert count == self._numKeys+1
+ rec = next(c)
+ self.assertEqual(count, self._numKeys+1)
c.close() # Cursors *MUST* be closed before commit!
self.txn.commit()
@@ -687,43 +700,39 @@ class BasicTransactionTestCase(BasicTestCase):
except db.DBIncompleteError:
pass
- if db.version() >= (4,0):
- statDict = self.env.log_stat(0);
- assert 'magic' in statDict
- assert 'version' in statDict
- assert 'cur_file' in statDict
- assert 'region_nowait' in statDict
+ statDict = self.env.log_stat(0);
+ self.assert_('magic' in statDict)
+ self.assert_('version' in statDict)
+ self.assert_('cur_file' in statDict)
+ self.assert_('region_nowait' in statDict)
# must have at least one log file present:
logs = self.env.log_archive(db.DB_ARCH_ABS | db.DB_ARCH_LOG)
- assert logs != None
+ self.assertNotEqual(logs, None)
for log in logs:
if verbose:
print('log file: ' + log)
if db.version() >= (4,2):
logs = self.env.log_archive(db.DB_ARCH_REMOVE)
- assert not logs
+ self.assertTrue(not logs)
self.txn = self.env.txn_begin()
#----------------------------------------
def test07_TxnTruncate(self):
- if db.version() < (3,3):
- # truncate is a feature of BerkeleyDB 3.3 and above
- return
-
d = self.d
if verbose:
print('\n', '-=' * 30)
print("Running %s.test07_TxnTruncate..." % self.__class__.__name__)
- d.put(b"abcde", b"ABCDE");
+ d.put("abcde", "ABCDE");
txn = self.env.txn_begin()
num = d.truncate(txn)
- assert num >= 1, "truncate returned <= 0 on non-empty database"
+ self.assert_(num >= 1, "truncate returned <= 0 on non-empty database")
num = d.truncate(txn)
- assert num == 0, "truncate on empty DB returned nonzero (%r)" % (num,)
+ self.assertEqual(num, 0,
+ "truncate on empty DB returned nonzero (%r)" % (num,))
txn.commit()
#----------------------------------------
@@ -769,20 +778,20 @@ class BTreeRecnoTestCase(BasicTestCase):
print("Running %s.test07_RecnoInBTree..." % self.__class__.__name__)
rec = d.get(200)
- assert type(rec) == type(())
- assert len(rec) == 2
+ self.assertEqual(type(rec), type(()))
+ self.assertEqual(len(rec), 2)
if verbose:
print("Record #200 is ", rec)
c = d.cursor()
- c.set(b'0200')
+ c.set('0200')
num = c.get_recno()
- assert type(num) == type(1)
+ self.assertEqual(type(num), type(1))
if verbose:
print("recno of d['0200'] is ", num)
rec = c.current()
- assert c.set_recno(num) == rec
+ self.assertEqual(c.set_recno(num), rec)
c.close()
@@ -803,40 +812,39 @@ class BasicDUPTestCase(BasicTestCase):
print("Running %s.test08_DuplicateKeys..." % \
self.__class__.__name__)
- d.put(b"dup0", b"before")
+ d.put("dup0", "before")
for x in "The quick brown fox jumped over the lazy dog.".split():
- x = x.encode("ascii")
- d.put(b"dup1", x)
- d.put(b"dup2", b"after")
+ d.put("dup1", x)
+ d.put("dup2", "after")
- data = d.get(b"dup1")
- assert data == b"The"
+ data = d.get("dup1")
+ self.assertEqual(data, "The")
if verbose:
print(data)
c = d.cursor()
- rec = c.set(b"dup1")
- assert rec == (b'dup1', b'The')
+ rec = c.set("dup1")
+ self.assertEqual(rec, ('dup1', 'The'))
- next = c.next()
- assert next == (b'dup1', b'quick')
+ next_reg = next(c)
+ self.assertEqual(next_reg, ('dup1', 'quick'))
- rec = c.set(b"dup1")
+ rec = c.set("dup1")
count = c.count()
- assert count == 9
+ self.assertEqual(count, 9)
next_dup = c.next_dup()
- assert next_dup == (b'dup1', b'quick')
+ self.assertEqual(next_dup, ('dup1', 'quick'))
- rec = c.set(b'dup1')
+ rec = c.set('dup1')
while rec is not None:
if verbose:
print(rec)
rec = c.next_dup()
- c.set(b'dup1')
+ c.set('dup1')
rec = c.next_nodup()
- assert rec[0] != b'dup1'
+ self.assertNotEqual(rec[0], 'dup1')
if verbose:
print(rec)
@@ -884,11 +892,9 @@ class BasicMultiDBTestCase(BasicTestCase):
self.dbopenflags|db.DB_CREATE)
for x in "The quick brown fox jumped over the lazy dog".split():
- x = x.encode("ascii")
d2.put(x, self.makeData(x))
- for x in letters:
- x = x.encode("ascii")
+ for x in string.letters:
d3.put(x, x*70)
d1.sync()
@@ -917,8 +923,8 @@ class BasicMultiDBTestCase(BasicTestCase):
count = count + 1
if verbose and (count % 50) == 0:
print(rec)
- rec = c1.next()
- assert count == self._numKeys
+ rec = next(c1)
+ self.assertEqual(count, self._numKeys)
count = 0
rec = c2.first()
@@ -926,8 +932,8 @@ class BasicMultiDBTestCase(BasicTestCase):
count = count + 1
if verbose:
print(rec)
- rec = c2.next()
- assert count == 9
+ rec = next(c2)
+ self.assertEqual(count, 9)
count = 0
rec = c3.first()
@@ -935,15 +941,14 @@ class BasicMultiDBTestCase(BasicTestCase):
count = count + 1
if verbose:
print(rec)
- rec = c3.next()
- assert count == 52
+ rec = next(c3)
+ self.assertEqual(count, len(string.letters))
c1.close()
c2.close()
c3.close()
- d1.close()
d2.close()
d3.close()
@@ -965,6 +970,55 @@ class HashMultiDBTestCase(BasicMultiDBTestCase):
envflags = db.DB_THREAD | db.DB_INIT_MPOOL | db.DB_INIT_LOCK
+class PrivateObject(unittest.TestCase) :
+ import sys
+ if sys.version_info[:3] < (2, 4, 0):
+ def assertTrue(self, expr, msg=None):
+ self.failUnless(expr,msg=msg)
+
+ def tearDown(self) :
+ del self.obj
+
+ def test01_DefaultIsNone(self) :
+ self.assertEqual(self.obj.get_private(), None)
+
+ def test02_assignment(self) :
+ a = "example of private object"
+ self.obj.set_private(a)
+ b = self.obj.get_private()
+ self.assertTrue(a is b) # Object identity
+
+ def test03_leak_assignment(self) :
+ import sys
+ a = "example of private object"
+ refcount = sys.getrefcount(a)
+ self.obj.set_private(a)
+ self.assertEqual(refcount+1, sys.getrefcount(a))
+ self.obj.set_private(None)
+ self.assertEqual(refcount, sys.getrefcount(a))
+
+ def test04_leak_GC(self) :
+ import sys
+ a = "example of private object"
+ refcount = sys.getrefcount(a)
+ self.obj.set_private(a)
+ self.obj = None
+ self.assertEqual(refcount, sys.getrefcount(a))
+
+class DBEnvPrivateObject(PrivateObject) :
+ def setUp(self) :
+ self.obj = db.DBEnv()
+
+class DBPrivateObject(PrivateObject) :
+ def setUp(self) :
+ self.obj = db.DB()
+
+class CrashAndBurn(unittest.TestCase) :
+ def test01_OpenCrash(self) :
+ # See http://bugs.python.org/issue3307
+ self.assertRaises(db.DBInvalidArgError, db.DB, None, 65535)
+
+
#----------------------------------------------------------------------
#----------------------------------------------------------------------
@@ -988,6 +1042,9 @@ def test_suite():
suite.addTest(unittest.makeSuite(HashDUPWithThreadTestCase))
suite.addTest(unittest.makeSuite(BTreeMultiDBTestCase))
suite.addTest(unittest.makeSuite(HashMultiDBTestCase))
+ suite.addTest(unittest.makeSuite(DBEnvPrivateObject))
+ suite.addTest(unittest.makeSuite(DBPrivateObject))
+ #suite.addTest(unittest.makeSuite(CrashAndBurn))
return suite