diff options
Diffstat (limited to 'Lib/bsddb/test/test_associate.py')
| -rw-r--r-- | Lib/bsddb/test/test_associate.py | 149 |
1 files changed, 58 insertions, 91 deletions
diff --git a/Lib/bsddb/test/test_associate.py b/Lib/bsddb/test/test_associate.py index 2876528..c5d0c92 100644 --- a/Lib/bsddb/test/test_associate.py +++ b/Lib/bsddb/test/test_associate.py @@ -2,32 +2,13 @@ TestCases for DB.associate. """ -import shutil -import sys, os -import tempfile +import sys, os, string import time from pprint import pprint -try: - from threading import Thread, current_thread - have_threads = 1 -except ImportError: - have_threads = 0 - import unittest -from bsddb.test.test_all import verbose - -try: - # For Pythons w/distutils pybsddb - from bsddb3 import db, dbshelve -except ImportError: - # For Python 2.3 - from bsddb import db, dbshelve - -try: - from bsddb3 import test_support -except ImportError: - from test import support as test_support +from .test_all import db, dbshelve, test_support, verbose, have_threads, \ + get_new_environment_path #---------------------------------------------------------------------- @@ -97,15 +78,7 @@ musicdata = { class AssociateErrorTestCase(unittest.TestCase): def setUp(self): self.filename = self.__class__.__name__ + '.db' - homeDir = os.path.join(tempfile.gettempdir(), 'db_home%d'%os.getpid()) - self.homeDir = homeDir - try: - os.mkdir(homeDir) - except os.error: - import glob - files = glob.glob(os.path.join(self.homeDir, '*')) - for file in files: - os.remove(file) + self.homeDir = get_new_environment_path() self.env = db.DBEnv() self.env.open(self.homeDir, db.DB_CREATE | db.DB_INIT_MPOOL) @@ -128,7 +101,7 @@ class AssociateErrorTestCase(unittest.TestCase): secDB.open(self.filename, "secondary", db.DB_BTREE, db.DB_CREATE) # dupDB has been configured to allow duplicates, it can't - # associate with a secondary. BerkeleyDB will return an error. + # associate with a secondary. Berkeley DB will return an error. try: def f(a,b): return a+b dupDB.associate(secDB, f) @@ -153,15 +126,7 @@ class AssociateTestCase(unittest.TestCase): def setUp(self): self.filename = self.__class__.__name__ + '.db' - homeDir = os.path.join(tempfile.gettempdir(), 'db_home%d'%os.getpid()) - self.homeDir = homeDir - try: - os.mkdir(homeDir) - except os.error: - import glob - files = glob.glob(os.path.join(self.homeDir, '*')) - for file in files: - os.remove(file) + self.homeDir = get_new_environment_path() self.env = db.DBEnv() self.env.open(self.homeDir, db.DB_CREATE | db.DB_INIT_MPOOL | db.DB_INIT_LOCK | db.DB_THREAD | self.envFlags) @@ -170,13 +135,13 @@ class AssociateTestCase(unittest.TestCase): self.closeDB() self.env.close() self.env = None - shutil.rmtree(self.homeDir) + test_support.rmtree(self.homeDir) def addDataToDB(self, d, txn=None): - for key, value in musicdata.items(): + for key, value in list(musicdata.items()): if type(self.keytype) == type(''): - key = ("%02d" % key).encode("utf-8") - d.put(key, '|'.join(value).encode("utf-8"), txn=txn) + key = "%02d" % key + d.put(key, '|'.join(value), txn=txn) def createDB(self, txn=None): self.cur = None @@ -246,14 +211,14 @@ class AssociateTestCase(unittest.TestCase): def finish_test(self, secDB, txn=None): # 'Blues' should not be in the secondary database - vals = secDB.pget(b'Blues', txn=txn) - assert vals == None, vals + vals = secDB.pget('Blues', txn=txn) + self.assertEqual(vals, None, vals) - vals = secDB.pget(b'Unknown', txn=txn) - assert vals[0] == 99 or vals[0] == b'99', vals - vals[1].index(b'Unknown') - vals[1].index(b'Unnamed') - vals[1].index(b'unknown') + vals = secDB.pget('Unknown', txn=txn) + self.assert_(vals[0] == 99 or vals[0] == '99', vals) + vals[1].index('Unknown') + vals[1].index('Unnamed') + vals[1].index('unknown') if verbose: print("Primary key traversal:") @@ -262,14 +227,14 @@ class AssociateTestCase(unittest.TestCase): rec = self.cur.first() while rec is not None: if type(self.keytype) == type(''): - assert int(rec[0]) # for primary db, key is a number + self.assert_(int(rec[0])) # for primary db, key is a number else: - assert rec[0] and type(rec[0]) == type(0) + self.assert_(rec[0] and type(rec[0]) == type(0)) count = count + 1 if verbose: print(rec) - rec = self.cur.next() - assert count == len(musicdata) # all items accounted for + rec = getattr(self.cur, "next")() + self.assertEqual(count, len(musicdata)) # all items accounted for if verbose: @@ -278,38 +243,39 @@ class AssociateTestCase(unittest.TestCase): count = 0 # test cursor pget - vals = self.cur.pget(b'Unknown', flags=db.DB_LAST) - assert vals[1] == 99 or vals[1] == b'99', vals - assert vals[0] == b'Unknown' - vals[2].index(b'Unknown') - vals[2].index(b'Unnamed') - vals[2].index(b'unknown') + vals = self.cur.pget('Unknown', flags=db.DB_LAST) + self.assert_(vals[1] == 99 or vals[1] == '99', vals) + self.assertEqual(vals[0], 'Unknown') + vals[2].index('Unknown') + vals[2].index('Unnamed') + vals[2].index('unknown') - vals = self.cur.pget(b'Unknown', data=b'wrong value', flags=db.DB_GET_BOTH) - assert vals == None, vals + vals = self.cur.pget('Unknown', data='wrong value', flags=db.DB_GET_BOTH) + self.assertEqual(vals, None, vals) rec = self.cur.first() - assert rec[0] == b"Jazz" + self.assertEqual(rec[0], "Jazz") while rec is not None: count = count + 1 if verbose: print(rec) - rec = self.cur.next() + rec = getattr(self.cur, "next")() # all items accounted for EXCEPT for 1 with "Blues" genre - assert count == len(musicdata)-1 + self.assertEqual(count, len(musicdata)-1) self.cur = None def getGenre(self, priKey, priData): - assert type(priData) == type(b"") - priData = priData.decode("utf-8") + self.assertEqual(type(priData), type("")) + genre = priData.split('|')[2] + if verbose: print('getGenre key: %r data: %r' % (priKey, priData)) - genre = priData.split('|')[2] + if genre == 'Blues': return db.DB_DONOTINDEX else: - return genre.encode("utf-8") + return genre #---------------------------------------------------------------------- @@ -380,21 +346,21 @@ class ShelveAssociateTestCase(AssociateTestCase): filetype=self.dbtype) def addDataToDB(self, d): - for key, value in musicdata.items(): + for key, value in list(musicdata.items()): if type(self.keytype) == type(''): - key = ("%02d" % key).encode("utf-8") + key = "%02d" % key d.put(key, value) # save the value as is this time def getGenre(self, priKey, priData): - assert type(priData) == type(()) + self.assertEqual(type(priData), type(())) if verbose: print('getGenre key: %r data: %r' % (priKey, priData)) genre = priData[2] if genre == 'Blues': return db.DB_DONOTINDEX else: - return genre.encode("utf-8") + return genre class ShelveAssociateHashTestCase(ShelveAssociateTestCase): @@ -418,15 +384,17 @@ class ThreadedAssociateTestCase(AssociateTestCase): t2 = Thread(target = self.writer2, args = (d, )) + t1.setDaemon(True) + t2.setDaemon(True) t1.start() t2.start() t1.join() t2.join() def writer1(self, d): - for key, value in musicdata.items(): + for key, value in list(musicdata.items()): if type(self.keytype) == type(''): - key = ("%02d" % key).encode("utf-8") + key = "%02d" % key d.put(key, '|'.join(value)) def writer2(self, d): @@ -452,24 +420,23 @@ class ThreadedAssociateRecnoTestCase(ShelveAssociateTestCase): def test_suite(): suite = unittest.TestSuite() - if db.version() >= (3, 3, 11): - suite.addTest(unittest.makeSuite(AssociateErrorTestCase)) + suite.addTest(unittest.makeSuite(AssociateErrorTestCase)) - suite.addTest(unittest.makeSuite(AssociateHashTestCase)) - suite.addTest(unittest.makeSuite(AssociateBTreeTestCase)) - suite.addTest(unittest.makeSuite(AssociateRecnoTestCase)) + suite.addTest(unittest.makeSuite(AssociateHashTestCase)) + suite.addTest(unittest.makeSuite(AssociateBTreeTestCase)) + suite.addTest(unittest.makeSuite(AssociateRecnoTestCase)) - if db.version() >= (4, 1): - suite.addTest(unittest.makeSuite(AssociateBTreeTxnTestCase)) + if db.version() >= (4, 1): + suite.addTest(unittest.makeSuite(AssociateBTreeTxnTestCase)) - suite.addTest(unittest.makeSuite(ShelveAssociateHashTestCase)) - suite.addTest(unittest.makeSuite(ShelveAssociateBTreeTestCase)) - suite.addTest(unittest.makeSuite(ShelveAssociateRecnoTestCase)) + suite.addTest(unittest.makeSuite(ShelveAssociateHashTestCase)) + suite.addTest(unittest.makeSuite(ShelveAssociateBTreeTestCase)) + suite.addTest(unittest.makeSuite(ShelveAssociateRecnoTestCase)) - if have_threads: - suite.addTest(unittest.makeSuite(ThreadedAssociateHashTestCase)) - suite.addTest(unittest.makeSuite(ThreadedAssociateBTreeTestCase)) - suite.addTest(unittest.makeSuite(ThreadedAssociateRecnoTestCase)) + if have_threads: + suite.addTest(unittest.makeSuite(ThreadedAssociateHashTestCase)) + suite.addTest(unittest.makeSuite(ThreadedAssociateBTreeTestCase)) + suite.addTest(unittest.makeSuite(ThreadedAssociateRecnoTestCase)) return suite |
