diff options
Diffstat (limited to 'Lib/bsddb/test/test_dbshelve.py')
-rw-r--r-- | Lib/bsddb/test/test_dbshelve.py | 187 |
1 files changed, 100 insertions, 87 deletions
diff --git a/Lib/bsddb/test/test_dbshelve.py b/Lib/bsddb/test/test_dbshelve.py index 95f7ad1..5149ac8 100644 --- a/Lib/bsddb/test/test_dbshelve.py +++ b/Lib/bsddb/test/test_dbshelve.py @@ -2,19 +2,14 @@ TestCases for checking dbShelve objects. """ -import os -import shutil -import tempfile, random +import os, string +import random import unittest -from bsddb import db, dbshelve -try: - from bsddb3 import test_support -except ImportError: - from test import support as test_support +from .test_all import db, dbshelve, test_support, verbose, \ + get_new_environment_path, get_new_database_path -from bsddb.test.test_all import verbose #---------------------------------------------------------------------- @@ -22,43 +17,44 @@ from bsddb.test.test_all import verbose # We want the objects to be comparable so we can test dbshelve.values # later on. class DataClass: - def __init__(self): self.value = random.random() - def __repr__(self): - return "DataClass(%r)" % self.value - - def __eq__(self, other): - value = self.value - if isinstance(other, DataClass): - other = other.value - return value == other + def __repr__(self) : # For Python 3.0 comparison + return "DataClass %f" %self.value - def __lt__(self, other): - value = self.value - if isinstance(other, DataClass): - other = other.value - return value < other + def __cmp__(self, other): # For Python 2.x comparison + return cmp(self.value, other) -letters = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ' class DBShelveTestCase(unittest.TestCase): def setUp(self): - self.filename = tempfile.mktemp() + import sys + if sys.version_info[0] >= 3 : + from .test_all import do_proxy_db_py3k + self._flag_proxy_db_py3k = do_proxy_db_py3k(False) + self.filename = get_new_database_path() self.do_open() def tearDown(self): + import sys + if sys.version_info[0] >= 3 : + from .test_all import do_proxy_db_py3k + do_proxy_db_py3k(self._flag_proxy_db_py3k) self.do_close() test_support.unlink(self.filename) def mk(self, key): """Turn key into an appropriate key type for this db""" # override in child class for RECNO - return key.encode("ascii") + import sys + if sys.version_info[0] < 3 : + return key + else : + return bytes(key, "iso8859-1") # 8 bits def populateDB(self, d): - for x in letters: + for x in string.letters: d[self.mk('S' + x)] = 10 * x # add a string d[self.mk('I' + x)] = ord(x) # add an integer d[self.mk('L' + x)] = [x] * 10 # add a list @@ -86,19 +82,13 @@ class DBShelveTestCase(unittest.TestCase): print("Running %s.test01_basics..." % self.__class__.__name__) self.populateDB(self.d) - if verbose: - print(1, self.d.keys()) self.d.sync() - if verbose: - print(2, self.d.keys()) self.do_close() self.do_open() - if verbose: - print(3, self.d.keys()) d = self.d l = len(d) - k = d.keys() + k = list(d.keys()) s = d.stat() f = d.fd() @@ -107,30 +97,37 @@ class DBShelveTestCase(unittest.TestCase): print("keys:", k) print("stats:", s) - self.assertFalse(d.has_key(self.mk('bad key'))) - self.assertTrue(d.has_key(self.mk('IA')), d.keys()) - self.assertTrue(d.has_key(self.mk('OA'))) + self.assertEqual(0, self.mk('bad key') in d) + self.assertEqual(1, self.mk('IA') in d) + self.assertEqual(1, self.mk('OA') in d) d.delete(self.mk('IA')) del d[self.mk('OA')] - self.assertFalse(d.has_key(self.mk('IA'))) - self.assertFalse(d.has_key(self.mk('OA'))) + self.assertEqual(0, self.mk('IA') in d) + self.assertEqual(0, self.mk('OA') in d) self.assertEqual(len(d), l-2) values = [] - for key in d.keys(): + for key in list(d.keys()): value = d[key] values.append(value) if verbose: print("%s: %s" % (key, value)) self.checkrec(key, value) - dbvalues = sorted(d.values(), key=lambda x: (str(type(x)), x)) - self.assertEqual(len(dbvalues), len(d.keys())) - values.sort(key=lambda x: (str(type(x)), x)) - self.assertEqual(values, dbvalues, "%r != %r" % (values, dbvalues)) - - items = d.items() + dbvalues = list(d.values()) + self.assertEqual(len(dbvalues), len(list(d.keys()))) + import sys + if sys.version_info[0] < 3 : + values.sort() + dbvalues.sort() + self.assertEqual(values, dbvalues) + else : # XXX: Convert all to strings. Please, improve + values.sort(key=lambda x : str(x)) + dbvalues.sort(key=lambda x : str(x)) + self.assertEqual(repr(values), repr(dbvalues)) + + items = list(d.items()) self.assertEqual(len(items), len(values)) for key, value in items: @@ -138,16 +135,16 @@ class DBShelveTestCase(unittest.TestCase): self.assertEqual(d.get(self.mk('bad key')), None) self.assertEqual(d.get(self.mk('bad key'), None), None) - self.assertEqual(d.get(self.mk('bad key'), b'a string'), b'a string') + self.assertEqual(d.get(self.mk('bad key'), 'a string'), 'a string') self.assertEqual(d.get(self.mk('bad key'), [1, 2, 3]), [1, 2, 3]) d.set_get_returns_none(0) self.assertRaises(db.DBNotFoundError, d.get, self.mk('bad key')) d.set_get_returns_none(1) - d.put(self.mk('new key'), b'new data') - self.assertEqual(d.get(self.mk('new key')), b'new data') - self.assertEqual(d[self.mk('new key')], b'new data') + d.put(self.mk('new key'), 'new data') + self.assertEqual(d.get(self.mk('new key')), 'new data') + self.assertEqual(d[self.mk('new key')], 'new data') @@ -165,10 +162,11 @@ class DBShelveTestCase(unittest.TestCase): while rec is not None: count = count + 1 if verbose: - print(repr(rec)) + print(rec) key, value = rec self.checkrec(key, value) - rec = c.next() + # Hack to avoid conversion by 2to3 tool + rec = getattr(c, "next")() del c self.assertEqual(count, len(d)) @@ -191,6 +189,7 @@ class DBShelveTestCase(unittest.TestCase): self.checkrec(key, value) del c + def test03_append(self): # NOTE: this is overridden in RECNO subclass, don't change its name. if verbose: @@ -198,31 +197,44 @@ class DBShelveTestCase(unittest.TestCase): print("Running %s.test03_append..." % self.__class__.__name__) self.assertRaises(dbshelve.DBShelveError, - self.d.append, b'unit test was here') + self.d.append, 'unit test was here') def checkrec(self, key, value): # override this in a subclass if the key type is different - x = key[1:] - if key[0:1] == b'S': - self.assertEquals(type(value), str) - self.assertEquals(value, 10 * x.decode("ascii")) - elif key[0:1] == b'I': - self.assertEquals(type(value), int) - self.assertEquals(value, ord(x)) + import sys + if sys.version_info[0] >= 3 : + if isinstance(key, bytes) : + key = key.decode("iso8859-1") # 8 bits - elif key[0:1] == b'L': - self.assertEquals(type(value), list) - self.assertEquals(value, [x.decode("ascii")] * 10) + x = key[1] + if key[0] == 'S': + self.assertEqual(type(value), str) + self.assertEqual(value, 10 * x) - elif key[0:1] == b'O': - self.assertEquals(value.S, 10 * x.decode("ascii")) - self.assertEquals(value.I, ord(x)) - self.assertEquals(value.L, [x.decode("ascii")] * 10) + elif key[0] == 'I': + self.assertEqual(type(value), int) + self.assertEqual(value, ord(x)) + + elif key[0] == 'L': + self.assertEqual(type(value), list) + self.assertEqual(value, [x] * 10) + + elif key[0] == 'O': + import sys + if sys.version_info[0] < 3 : + from types import InstanceType + self.assertEqual(type(value), InstanceType) + else : + self.assertEqual(type(value), DataClass) + + self.assertEqual(value.S, 10 * x) + self.assertEqual(value.I, ord(x)) + self.assertEqual(value.L, [x] * 10) else: - self.fail('Unknown key type, fix the test') + self.assert_(0, 'Unknown key type, fix the test') #---------------------------------------------------------------------- @@ -258,19 +270,12 @@ class ThreadHashShelveTestCase(BasicShelveTestCase): #---------------------------------------------------------------------- class BasicEnvShelveTestCase(DBShelveTestCase): - def setUp(self): - self.homeDir = tempfile.mkdtemp() - self.filename = 'dbshelve_db_file.db' - self.do_open() - def do_open(self): - self.homeDir = homeDir = os.path.join( - tempfile.gettempdir(), 'db_home%d'%os.getpid()) - try: os.mkdir(homeDir) - except os.error: pass self.env = db.DBEnv() - self.env.open(self.homeDir, self.envflags | db.DB_INIT_MPOOL | db.DB_CREATE) + self.env.open(self.homeDir, + self.envflags | db.DB_INIT_MPOOL | db.DB_CREATE) + self.filename = os.path.split(self.filename)[1] self.d = dbshelve.DBShelf(self.env) self.d.open(self.filename, self.dbtype, self.dbflags) @@ -280,7 +285,15 @@ class BasicEnvShelveTestCase(DBShelveTestCase): self.env.close() + def setUp(self) : + self.homeDir = get_new_environment_path() + DBShelveTestCase.setUp(self) + def tearDown(self): + import sys + if sys.version_info[0] >= 3 : + from .test_all import do_proxy_db_py3k + do_proxy_db_py3k(self._flag_proxy_db_py3k) self.do_close() test_support.rmtree(self.homeDir) @@ -327,7 +340,7 @@ class RecNoShelveTestCase(BasicShelveTestCase): def mk(self, key): if key not in self.key_map: self.key_map[key] = self.key_pool.pop(0) - self.intkey_map[self.key_map[key]] = key.encode('ascii') + self.intkey_map[self.key_map[key]] = key return self.key_map[key] def checkrec(self, intkey, value): @@ -339,14 +352,14 @@ class RecNoShelveTestCase(BasicShelveTestCase): print('\n', '-=' * 30) print("Running %s.test03_append..." % self.__class__.__name__) - self.d[1] = b'spam' - self.d[5] = b'eggs' - self.assertEqual(6, self.d.append(b'spam')) - self.assertEqual(7, self.d.append(b'baked beans')) - self.assertEqual(b'spam', self.d.get(6)) - self.assertEqual(b'spam', self.d.get(1)) - self.assertEqual(b'baked beans', self.d.get(7)) - self.assertEqual(b'eggs', self.d.get(5)) + self.d[1] = 'spam' + self.d[5] = 'eggs' + self.assertEqual(6, self.d.append('spam')) + self.assertEqual(7, self.d.append('baked beans')) + self.assertEqual('spam', self.d.get(6)) + self.assertEqual('spam', self.d.get(1)) + self.assertEqual('baked beans', self.d.get(7)) + self.assertEqual('eggs', self.d.get(5)) #---------------------------------------------------------------------- |