summaryrefslogtreecommitdiffstats
path: root/Lib/bsddb/test/test_dbshelve.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/bsddb/test/test_dbshelve.py')
-rw-r--r--Lib/bsddb/test/test_dbshelve.py187
1 files changed, 100 insertions, 87 deletions
diff --git a/Lib/bsddb/test/test_dbshelve.py b/Lib/bsddb/test/test_dbshelve.py
index 95f7ad1..5149ac8 100644
--- a/Lib/bsddb/test/test_dbshelve.py
+++ b/Lib/bsddb/test/test_dbshelve.py
@@ -2,19 +2,14 @@
TestCases for checking dbShelve objects.
"""
-import os
-import shutil
-import tempfile, random
+import os, string
+import random
import unittest
-from bsddb import db, dbshelve
-try:
- from bsddb3 import test_support
-except ImportError:
- from test import support as test_support
+from .test_all import db, dbshelve, test_support, verbose, \
+ get_new_environment_path, get_new_database_path
-from bsddb.test.test_all import verbose
#----------------------------------------------------------------------
@@ -22,43 +17,44 @@ from bsddb.test.test_all import verbose
# We want the objects to be comparable so we can test dbshelve.values
# later on.
class DataClass:
-
def __init__(self):
self.value = random.random()
- def __repr__(self):
- return "DataClass(%r)" % self.value
-
- def __eq__(self, other):
- value = self.value
- if isinstance(other, DataClass):
- other = other.value
- return value == other
+ def __repr__(self) : # For Python 3.0 comparison
+ return "DataClass %f" %self.value
- def __lt__(self, other):
- value = self.value
- if isinstance(other, DataClass):
- other = other.value
- return value < other
+ def __cmp__(self, other): # For Python 2.x comparison
+ return cmp(self.value, other)
-letters = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'
class DBShelveTestCase(unittest.TestCase):
def setUp(self):
- self.filename = tempfile.mktemp()
+ import sys
+ if sys.version_info[0] >= 3 :
+ from .test_all import do_proxy_db_py3k
+ self._flag_proxy_db_py3k = do_proxy_db_py3k(False)
+ self.filename = get_new_database_path()
self.do_open()
def tearDown(self):
+ import sys
+ if sys.version_info[0] >= 3 :
+ from .test_all import do_proxy_db_py3k
+ do_proxy_db_py3k(self._flag_proxy_db_py3k)
self.do_close()
test_support.unlink(self.filename)
def mk(self, key):
"""Turn key into an appropriate key type for this db"""
# override in child class for RECNO
- return key.encode("ascii")
+ import sys
+ if sys.version_info[0] < 3 :
+ return key
+ else :
+ return bytes(key, "iso8859-1") # 8 bits
def populateDB(self, d):
- for x in letters:
+ for x in string.letters:
d[self.mk('S' + x)] = 10 * x # add a string
d[self.mk('I' + x)] = ord(x) # add an integer
d[self.mk('L' + x)] = [x] * 10 # add a list
@@ -86,19 +82,13 @@ class DBShelveTestCase(unittest.TestCase):
print("Running %s.test01_basics..." % self.__class__.__name__)
self.populateDB(self.d)
- if verbose:
- print(1, self.d.keys())
self.d.sync()
- if verbose:
- print(2, self.d.keys())
self.do_close()
self.do_open()
- if verbose:
- print(3, self.d.keys())
d = self.d
l = len(d)
- k = d.keys()
+ k = list(d.keys())
s = d.stat()
f = d.fd()
@@ -107,30 +97,37 @@ class DBShelveTestCase(unittest.TestCase):
print("keys:", k)
print("stats:", s)
- self.assertFalse(d.has_key(self.mk('bad key')))
- self.assertTrue(d.has_key(self.mk('IA')), d.keys())
- self.assertTrue(d.has_key(self.mk('OA')))
+ self.assertEqual(0, self.mk('bad key') in d)
+ self.assertEqual(1, self.mk('IA') in d)
+ self.assertEqual(1, self.mk('OA') in d)
d.delete(self.mk('IA'))
del d[self.mk('OA')]
- self.assertFalse(d.has_key(self.mk('IA')))
- self.assertFalse(d.has_key(self.mk('OA')))
+ self.assertEqual(0, self.mk('IA') in d)
+ self.assertEqual(0, self.mk('OA') in d)
self.assertEqual(len(d), l-2)
values = []
- for key in d.keys():
+ for key in list(d.keys()):
value = d[key]
values.append(value)
if verbose:
print("%s: %s" % (key, value))
self.checkrec(key, value)
- dbvalues = sorted(d.values(), key=lambda x: (str(type(x)), x))
- self.assertEqual(len(dbvalues), len(d.keys()))
- values.sort(key=lambda x: (str(type(x)), x))
- self.assertEqual(values, dbvalues, "%r != %r" % (values, dbvalues))
-
- items = d.items()
+ dbvalues = list(d.values())
+ self.assertEqual(len(dbvalues), len(list(d.keys())))
+ import sys
+ if sys.version_info[0] < 3 :
+ values.sort()
+ dbvalues.sort()
+ self.assertEqual(values, dbvalues)
+ else : # XXX: Convert all to strings. Please, improve
+ values.sort(key=lambda x : str(x))
+ dbvalues.sort(key=lambda x : str(x))
+ self.assertEqual(repr(values), repr(dbvalues))
+
+ items = list(d.items())
self.assertEqual(len(items), len(values))
for key, value in items:
@@ -138,16 +135,16 @@ class DBShelveTestCase(unittest.TestCase):
self.assertEqual(d.get(self.mk('bad key')), None)
self.assertEqual(d.get(self.mk('bad key'), None), None)
- self.assertEqual(d.get(self.mk('bad key'), b'a string'), b'a string')
+ self.assertEqual(d.get(self.mk('bad key'), 'a string'), 'a string')
self.assertEqual(d.get(self.mk('bad key'), [1, 2, 3]), [1, 2, 3])
d.set_get_returns_none(0)
self.assertRaises(db.DBNotFoundError, d.get, self.mk('bad key'))
d.set_get_returns_none(1)
- d.put(self.mk('new key'), b'new data')
- self.assertEqual(d.get(self.mk('new key')), b'new data')
- self.assertEqual(d[self.mk('new key')], b'new data')
+ d.put(self.mk('new key'), 'new data')
+ self.assertEqual(d.get(self.mk('new key')), 'new data')
+ self.assertEqual(d[self.mk('new key')], 'new data')
@@ -165,10 +162,11 @@ class DBShelveTestCase(unittest.TestCase):
while rec is not None:
count = count + 1
if verbose:
- print(repr(rec))
+ print(rec)
key, value = rec
self.checkrec(key, value)
- rec = c.next()
+ # Hack to avoid conversion by 2to3 tool
+ rec = getattr(c, "next")()
del c
self.assertEqual(count, len(d))
@@ -191,6 +189,7 @@ class DBShelveTestCase(unittest.TestCase):
self.checkrec(key, value)
del c
+
def test03_append(self):
# NOTE: this is overridden in RECNO subclass, don't change its name.
if verbose:
@@ -198,31 +197,44 @@ class DBShelveTestCase(unittest.TestCase):
print("Running %s.test03_append..." % self.__class__.__name__)
self.assertRaises(dbshelve.DBShelveError,
- self.d.append, b'unit test was here')
+ self.d.append, 'unit test was here')
def checkrec(self, key, value):
# override this in a subclass if the key type is different
- x = key[1:]
- if key[0:1] == b'S':
- self.assertEquals(type(value), str)
- self.assertEquals(value, 10 * x.decode("ascii"))
- elif key[0:1] == b'I':
- self.assertEquals(type(value), int)
- self.assertEquals(value, ord(x))
+ import sys
+ if sys.version_info[0] >= 3 :
+ if isinstance(key, bytes) :
+ key = key.decode("iso8859-1") # 8 bits
- elif key[0:1] == b'L':
- self.assertEquals(type(value), list)
- self.assertEquals(value, [x.decode("ascii")] * 10)
+ x = key[1]
+ if key[0] == 'S':
+ self.assertEqual(type(value), str)
+ self.assertEqual(value, 10 * x)
- elif key[0:1] == b'O':
- self.assertEquals(value.S, 10 * x.decode("ascii"))
- self.assertEquals(value.I, ord(x))
- self.assertEquals(value.L, [x.decode("ascii")] * 10)
+ elif key[0] == 'I':
+ self.assertEqual(type(value), int)
+ self.assertEqual(value, ord(x))
+
+ elif key[0] == 'L':
+ self.assertEqual(type(value), list)
+ self.assertEqual(value, [x] * 10)
+
+ elif key[0] == 'O':
+ import sys
+ if sys.version_info[0] < 3 :
+ from types import InstanceType
+ self.assertEqual(type(value), InstanceType)
+ else :
+ self.assertEqual(type(value), DataClass)
+
+ self.assertEqual(value.S, 10 * x)
+ self.assertEqual(value.I, ord(x))
+ self.assertEqual(value.L, [x] * 10)
else:
- self.fail('Unknown key type, fix the test')
+ self.assert_(0, 'Unknown key type, fix the test')
#----------------------------------------------------------------------
@@ -258,19 +270,12 @@ class ThreadHashShelveTestCase(BasicShelveTestCase):
#----------------------------------------------------------------------
class BasicEnvShelveTestCase(DBShelveTestCase):
- def setUp(self):
- self.homeDir = tempfile.mkdtemp()
- self.filename = 'dbshelve_db_file.db'
- self.do_open()
-
def do_open(self):
- self.homeDir = homeDir = os.path.join(
- tempfile.gettempdir(), 'db_home%d'%os.getpid())
- try: os.mkdir(homeDir)
- except os.error: pass
self.env = db.DBEnv()
- self.env.open(self.homeDir, self.envflags | db.DB_INIT_MPOOL | db.DB_CREATE)
+ self.env.open(self.homeDir,
+ self.envflags | db.DB_INIT_MPOOL | db.DB_CREATE)
+ self.filename = os.path.split(self.filename)[1]
self.d = dbshelve.DBShelf(self.env)
self.d.open(self.filename, self.dbtype, self.dbflags)
@@ -280,7 +285,15 @@ class BasicEnvShelveTestCase(DBShelveTestCase):
self.env.close()
+ def setUp(self) :
+ self.homeDir = get_new_environment_path()
+ DBShelveTestCase.setUp(self)
+
def tearDown(self):
+ import sys
+ if sys.version_info[0] >= 3 :
+ from .test_all import do_proxy_db_py3k
+ do_proxy_db_py3k(self._flag_proxy_db_py3k)
self.do_close()
test_support.rmtree(self.homeDir)
@@ -327,7 +340,7 @@ class RecNoShelveTestCase(BasicShelveTestCase):
def mk(self, key):
if key not in self.key_map:
self.key_map[key] = self.key_pool.pop(0)
- self.intkey_map[self.key_map[key]] = key.encode('ascii')
+ self.intkey_map[self.key_map[key]] = key
return self.key_map[key]
def checkrec(self, intkey, value):
@@ -339,14 +352,14 @@ class RecNoShelveTestCase(BasicShelveTestCase):
print('\n', '-=' * 30)
print("Running %s.test03_append..." % self.__class__.__name__)
- self.d[1] = b'spam'
- self.d[5] = b'eggs'
- self.assertEqual(6, self.d.append(b'spam'))
- self.assertEqual(7, self.d.append(b'baked beans'))
- self.assertEqual(b'spam', self.d.get(6))
- self.assertEqual(b'spam', self.d.get(1))
- self.assertEqual(b'baked beans', self.d.get(7))
- self.assertEqual(b'eggs', self.d.get(5))
+ self.d[1] = 'spam'
+ self.d[5] = 'eggs'
+ self.assertEqual(6, self.d.append('spam'))
+ self.assertEqual(7, self.d.append('baked beans'))
+ self.assertEqual('spam', self.d.get(6))
+ self.assertEqual('spam', self.d.get(1))
+ self.assertEqual('baked beans', self.d.get(7))
+ self.assertEqual('eggs', self.d.get(5))
#----------------------------------------------------------------------