summaryrefslogtreecommitdiffstats
path: root/Lib/bsddb/test/test_associate.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/bsddb/test/test_associate.py')
-rw-r--r--Lib/bsddb/test/test_associate.py149
1 files changed, 58 insertions, 91 deletions
diff --git a/Lib/bsddb/test/test_associate.py b/Lib/bsddb/test/test_associate.py
index 2876528..c5d0c92 100644
--- a/Lib/bsddb/test/test_associate.py
+++ b/Lib/bsddb/test/test_associate.py
@@ -2,32 +2,13 @@
TestCases for DB.associate.
"""
-import shutil
-import sys, os
-import tempfile
+import sys, os, string
import time
from pprint import pprint
-try:
- from threading import Thread, current_thread
- have_threads = 1
-except ImportError:
- have_threads = 0
-
import unittest
-from bsddb.test.test_all import verbose
-
-try:
- # For Pythons w/distutils pybsddb
- from bsddb3 import db, dbshelve
-except ImportError:
- # For Python 2.3
- from bsddb import db, dbshelve
-
-try:
- from bsddb3 import test_support
-except ImportError:
- from test import support as test_support
+from .test_all import db, dbshelve, test_support, verbose, have_threads, \
+ get_new_environment_path
#----------------------------------------------------------------------
@@ -97,15 +78,7 @@ musicdata = {
class AssociateErrorTestCase(unittest.TestCase):
def setUp(self):
self.filename = self.__class__.__name__ + '.db'
- homeDir = os.path.join(tempfile.gettempdir(), 'db_home%d'%os.getpid())
- self.homeDir = homeDir
- try:
- os.mkdir(homeDir)
- except os.error:
- import glob
- files = glob.glob(os.path.join(self.homeDir, '*'))
- for file in files:
- os.remove(file)
+ self.homeDir = get_new_environment_path()
self.env = db.DBEnv()
self.env.open(self.homeDir, db.DB_CREATE | db.DB_INIT_MPOOL)
@@ -128,7 +101,7 @@ class AssociateErrorTestCase(unittest.TestCase):
secDB.open(self.filename, "secondary", db.DB_BTREE, db.DB_CREATE)
# dupDB has been configured to allow duplicates, it can't
- # associate with a secondary. BerkeleyDB will return an error.
+ # associate with a secondary. Berkeley DB will return an error.
try:
def f(a,b): return a+b
dupDB.associate(secDB, f)
@@ -153,15 +126,7 @@ class AssociateTestCase(unittest.TestCase):
def setUp(self):
self.filename = self.__class__.__name__ + '.db'
- homeDir = os.path.join(tempfile.gettempdir(), 'db_home%d'%os.getpid())
- self.homeDir = homeDir
- try:
- os.mkdir(homeDir)
- except os.error:
- import glob
- files = glob.glob(os.path.join(self.homeDir, '*'))
- for file in files:
- os.remove(file)
+ self.homeDir = get_new_environment_path()
self.env = db.DBEnv()
self.env.open(self.homeDir, db.DB_CREATE | db.DB_INIT_MPOOL |
db.DB_INIT_LOCK | db.DB_THREAD | self.envFlags)
@@ -170,13 +135,13 @@ class AssociateTestCase(unittest.TestCase):
self.closeDB()
self.env.close()
self.env = None
- shutil.rmtree(self.homeDir)
+ test_support.rmtree(self.homeDir)
def addDataToDB(self, d, txn=None):
- for key, value in musicdata.items():
+ for key, value in list(musicdata.items()):
if type(self.keytype) == type(''):
- key = ("%02d" % key).encode("utf-8")
- d.put(key, '|'.join(value).encode("utf-8"), txn=txn)
+ key = "%02d" % key
+ d.put(key, '|'.join(value), txn=txn)
def createDB(self, txn=None):
self.cur = None
@@ -246,14 +211,14 @@ class AssociateTestCase(unittest.TestCase):
def finish_test(self, secDB, txn=None):
# 'Blues' should not be in the secondary database
- vals = secDB.pget(b'Blues', txn=txn)
- assert vals == None, vals
+ vals = secDB.pget('Blues', txn=txn)
+ self.assertEqual(vals, None, vals)
- vals = secDB.pget(b'Unknown', txn=txn)
- assert vals[0] == 99 or vals[0] == b'99', vals
- vals[1].index(b'Unknown')
- vals[1].index(b'Unnamed')
- vals[1].index(b'unknown')
+ vals = secDB.pget('Unknown', txn=txn)
+ self.assert_(vals[0] == 99 or vals[0] == '99', vals)
+ vals[1].index('Unknown')
+ vals[1].index('Unnamed')
+ vals[1].index('unknown')
if verbose:
print("Primary key traversal:")
@@ -262,14 +227,14 @@ class AssociateTestCase(unittest.TestCase):
rec = self.cur.first()
while rec is not None:
if type(self.keytype) == type(''):
- assert int(rec[0]) # for primary db, key is a number
+ self.assert_(int(rec[0])) # for primary db, key is a number
else:
- assert rec[0] and type(rec[0]) == type(0)
+ self.assert_(rec[0] and type(rec[0]) == type(0))
count = count + 1
if verbose:
print(rec)
- rec = self.cur.next()
- assert count == len(musicdata) # all items accounted for
+ rec = getattr(self.cur, "next")()
+ self.assertEqual(count, len(musicdata)) # all items accounted for
if verbose:
@@ -278,38 +243,39 @@ class AssociateTestCase(unittest.TestCase):
count = 0
# test cursor pget
- vals = self.cur.pget(b'Unknown', flags=db.DB_LAST)
- assert vals[1] == 99 or vals[1] == b'99', vals
- assert vals[0] == b'Unknown'
- vals[2].index(b'Unknown')
- vals[2].index(b'Unnamed')
- vals[2].index(b'unknown')
+ vals = self.cur.pget('Unknown', flags=db.DB_LAST)
+ self.assert_(vals[1] == 99 or vals[1] == '99', vals)
+ self.assertEqual(vals[0], 'Unknown')
+ vals[2].index('Unknown')
+ vals[2].index('Unnamed')
+ vals[2].index('unknown')
- vals = self.cur.pget(b'Unknown', data=b'wrong value', flags=db.DB_GET_BOTH)
- assert vals == None, vals
+ vals = self.cur.pget('Unknown', data='wrong value', flags=db.DB_GET_BOTH)
+ self.assertEqual(vals, None, vals)
rec = self.cur.first()
- assert rec[0] == b"Jazz"
+ self.assertEqual(rec[0], "Jazz")
while rec is not None:
count = count + 1
if verbose:
print(rec)
- rec = self.cur.next()
+ rec = getattr(self.cur, "next")()
# all items accounted for EXCEPT for 1 with "Blues" genre
- assert count == len(musicdata)-1
+ self.assertEqual(count, len(musicdata)-1)
self.cur = None
def getGenre(self, priKey, priData):
- assert type(priData) == type(b"")
- priData = priData.decode("utf-8")
+ self.assertEqual(type(priData), type(""))
+ genre = priData.split('|')[2]
+
if verbose:
print('getGenre key: %r data: %r' % (priKey, priData))
- genre = priData.split('|')[2]
+
if genre == 'Blues':
return db.DB_DONOTINDEX
else:
- return genre.encode("utf-8")
+ return genre
#----------------------------------------------------------------------
@@ -380,21 +346,21 @@ class ShelveAssociateTestCase(AssociateTestCase):
filetype=self.dbtype)
def addDataToDB(self, d):
- for key, value in musicdata.items():
+ for key, value in list(musicdata.items()):
if type(self.keytype) == type(''):
- key = ("%02d" % key).encode("utf-8")
+ key = "%02d" % key
d.put(key, value) # save the value as is this time
def getGenre(self, priKey, priData):
- assert type(priData) == type(())
+ self.assertEqual(type(priData), type(()))
if verbose:
print('getGenre key: %r data: %r' % (priKey, priData))
genre = priData[2]
if genre == 'Blues':
return db.DB_DONOTINDEX
else:
- return genre.encode("utf-8")
+ return genre
class ShelveAssociateHashTestCase(ShelveAssociateTestCase):
@@ -418,15 +384,17 @@ class ThreadedAssociateTestCase(AssociateTestCase):
t2 = Thread(target = self.writer2,
args = (d, ))
+ t1.setDaemon(True)
+ t2.setDaemon(True)
t1.start()
t2.start()
t1.join()
t2.join()
def writer1(self, d):
- for key, value in musicdata.items():
+ for key, value in list(musicdata.items()):
if type(self.keytype) == type(''):
- key = ("%02d" % key).encode("utf-8")
+ key = "%02d" % key
d.put(key, '|'.join(value))
def writer2(self, d):
@@ -452,24 +420,23 @@ class ThreadedAssociateRecnoTestCase(ShelveAssociateTestCase):
def test_suite():
suite = unittest.TestSuite()
- if db.version() >= (3, 3, 11):
- suite.addTest(unittest.makeSuite(AssociateErrorTestCase))
+ suite.addTest(unittest.makeSuite(AssociateErrorTestCase))
- suite.addTest(unittest.makeSuite(AssociateHashTestCase))
- suite.addTest(unittest.makeSuite(AssociateBTreeTestCase))
- suite.addTest(unittest.makeSuite(AssociateRecnoTestCase))
+ suite.addTest(unittest.makeSuite(AssociateHashTestCase))
+ suite.addTest(unittest.makeSuite(AssociateBTreeTestCase))
+ suite.addTest(unittest.makeSuite(AssociateRecnoTestCase))
- if db.version() >= (4, 1):
- suite.addTest(unittest.makeSuite(AssociateBTreeTxnTestCase))
+ if db.version() >= (4, 1):
+ suite.addTest(unittest.makeSuite(AssociateBTreeTxnTestCase))
- suite.addTest(unittest.makeSuite(ShelveAssociateHashTestCase))
- suite.addTest(unittest.makeSuite(ShelveAssociateBTreeTestCase))
- suite.addTest(unittest.makeSuite(ShelveAssociateRecnoTestCase))
+ suite.addTest(unittest.makeSuite(ShelveAssociateHashTestCase))
+ suite.addTest(unittest.makeSuite(ShelveAssociateBTreeTestCase))
+ suite.addTest(unittest.makeSuite(ShelveAssociateRecnoTestCase))
- if have_threads:
- suite.addTest(unittest.makeSuite(ThreadedAssociateHashTestCase))
- suite.addTest(unittest.makeSuite(ThreadedAssociateBTreeTestCase))
- suite.addTest(unittest.makeSuite(ThreadedAssociateRecnoTestCase))
+ if have_threads:
+ suite.addTest(unittest.makeSuite(ThreadedAssociateHashTestCase))
+ suite.addTest(unittest.makeSuite(ThreadedAssociateBTreeTestCase))
+ suite.addTest(unittest.makeSuite(ThreadedAssociateRecnoTestCase))
return suite