diff options
| author | Jesus Cea <jcea@jcea.es> | 2008-08-31 14:12:11 (GMT) |
|---|---|---|
| committer | Jesus Cea <jcea@jcea.es> | 2008-08-31 14:12:11 (GMT) |
| commit | 6ba3329c274e2c7876c61f2e98d4592310d26bae (patch) | |
| tree | 6bb346e892269279fa2011c3e4bd4648b273a7ae /Lib/bsddb/test/test_associate.py | |
| parent | 73c96dbf34c70bbf1ef807b98d51cf9c0e9dc042 (diff) | |
| download | cpython-6ba3329c274e2c7876c61f2e98d4592310d26bae.zip cpython-6ba3329c274e2c7876c61f2e98d4592310d26bae.tar.gz cpython-6ba3329c274e2c7876c61f2e98d4592310d26bae.tar.bz2 | |
bsddb code updated to version 4.7.3pre2. This code is the same than
Python 2.6 one, since the intention is to keep an unified 2.x/3.x
codebase.
The Python code is automatically translated using "2to3". Please, do not
update this code in Python 3.0 by hand. Update the 2.6 one and then do
"2to3".
Diffstat (limited to 'Lib/bsddb/test/test_associate.py')
| -rw-r--r-- | Lib/bsddb/test/test_associate.py | 149 |
1 files changed, 58 insertions, 91 deletions
diff --git a/Lib/bsddb/test/test_associate.py b/Lib/bsddb/test/test_associate.py index 2876528..c5d0c92 100644 --- a/Lib/bsddb/test/test_associate.py +++ b/Lib/bsddb/test/test_associate.py @@ -2,32 +2,13 @@ TestCases for DB.associate. """ -import shutil -import sys, os -import tempfile +import sys, os, string import time from pprint import pprint -try: - from threading import Thread, current_thread - have_threads = 1 -except ImportError: - have_threads = 0 - import unittest -from bsddb.test.test_all import verbose - -try: - # For Pythons w/distutils pybsddb - from bsddb3 import db, dbshelve -except ImportError: - # For Python 2.3 - from bsddb import db, dbshelve - -try: - from bsddb3 import test_support -except ImportError: - from test import support as test_support +from .test_all import db, dbshelve, test_support, verbose, have_threads, \ + get_new_environment_path #---------------------------------------------------------------------- @@ -97,15 +78,7 @@ musicdata = { class AssociateErrorTestCase(unittest.TestCase): def setUp(self): self.filename = self.__class__.__name__ + '.db' - homeDir = os.path.join(tempfile.gettempdir(), 'db_home%d'%os.getpid()) - self.homeDir = homeDir - try: - os.mkdir(homeDir) - except os.error: - import glob - files = glob.glob(os.path.join(self.homeDir, '*')) - for file in files: - os.remove(file) + self.homeDir = get_new_environment_path() self.env = db.DBEnv() self.env.open(self.homeDir, db.DB_CREATE | db.DB_INIT_MPOOL) @@ -128,7 +101,7 @@ class AssociateErrorTestCase(unittest.TestCase): secDB.open(self.filename, "secondary", db.DB_BTREE, db.DB_CREATE) # dupDB has been configured to allow duplicates, it can't - # associate with a secondary. BerkeleyDB will return an error. + # associate with a secondary. Berkeley DB will return an error. try: def f(a,b): return a+b dupDB.associate(secDB, f) @@ -153,15 +126,7 @@ class AssociateTestCase(unittest.TestCase): def setUp(self): self.filename = self.__class__.__name__ + '.db' - homeDir = os.path.join(tempfile.gettempdir(), 'db_home%d'%os.getpid()) - self.homeDir = homeDir - try: - os.mkdir(homeDir) - except os.error: - import glob - files = glob.glob(os.path.join(self.homeDir, '*')) - for file in files: - os.remove(file) + self.homeDir = get_new_environment_path() self.env = db.DBEnv() self.env.open(self.homeDir, db.DB_CREATE | db.DB_INIT_MPOOL | db.DB_INIT_LOCK | db.DB_THREAD | self.envFlags) @@ -170,13 +135,13 @@ class AssociateTestCase(unittest.TestCase): self.closeDB() self.env.close() self.env = None - shutil.rmtree(self.homeDir) + test_support.rmtree(self.homeDir) def addDataToDB(self, d, txn=None): - for key, value in musicdata.items(): + for key, value in list(musicdata.items()): if type(self.keytype) == type(''): - key = ("%02d" % key).encode("utf-8") - d.put(key, '|'.join(value).encode("utf-8"), txn=txn) + key = "%02d" % key + d.put(key, '|'.join(value), txn=txn) def createDB(self, txn=None): self.cur = None @@ -246,14 +211,14 @@ class AssociateTestCase(unittest.TestCase): def finish_test(self, secDB, txn=None): # 'Blues' should not be in the secondary database - vals = secDB.pget(b'Blues', txn=txn) - assert vals == None, vals + vals = secDB.pget('Blues', txn=txn) + self.assertEqual(vals, None, vals) - vals = secDB.pget(b'Unknown', txn=txn) - assert vals[0] == 99 or vals[0] == b'99', vals - vals[1].index(b'Unknown') - vals[1].index(b'Unnamed') - vals[1].index(b'unknown') + vals = secDB.pget('Unknown', txn=txn) + self.assert_(vals[0] == 99 or vals[0] == '99', vals) + vals[1].index('Unknown') + vals[1].index('Unnamed') + vals[1].index('unknown') if verbose: print("Primary key traversal:") @@ -262,14 +227,14 @@ class AssociateTestCase(unittest.TestCase): rec = self.cur.first() while rec is not None: if type(self.keytype) == type(''): - assert int(rec[0]) # for primary db, key is a number + self.assert_(int(rec[0])) # for primary db, key is a number else: - assert rec[0] and type(rec[0]) == type(0) + self.assert_(rec[0] and type(rec[0]) == type(0)) count = count + 1 if verbose: print(rec) - rec = self.cur.next() - assert count == len(musicdata) # all items accounted for + rec = getattr(self.cur, "next")() + self.assertEqual(count, len(musicdata)) # all items accounted for if verbose: @@ -278,38 +243,39 @@ class AssociateTestCase(unittest.TestCase): count = 0 # test cursor pget - vals = self.cur.pget(b'Unknown', flags=db.DB_LAST) - assert vals[1] == 99 or vals[1] == b'99', vals - assert vals[0] == b'Unknown' - vals[2].index(b'Unknown') - vals[2].index(b'Unnamed') - vals[2].index(b'unknown') + vals = self.cur.pget('Unknown', flags=db.DB_LAST) + self.assert_(vals[1] == 99 or vals[1] == '99', vals) + self.assertEqual(vals[0], 'Unknown') + vals[2].index('Unknown') + vals[2].index('Unnamed') + vals[2].index('unknown') - vals = self.cur.pget(b'Unknown', data=b'wrong value', flags=db.DB_GET_BOTH) - assert vals == None, vals + vals = self.cur.pget('Unknown', data='wrong value', flags=db.DB_GET_BOTH) + self.assertEqual(vals, None, vals) rec = self.cur.first() - assert rec[0] == b"Jazz" + self.assertEqual(rec[0], "Jazz") while rec is not None: count = count + 1 if verbose: print(rec) - rec = self.cur.next() + rec = getattr(self.cur, "next")() # all items accounted for EXCEPT for 1 with "Blues" genre - assert count == len(musicdata)-1 + self.assertEqual(count, len(musicdata)-1) self.cur = None def getGenre(self, priKey, priData): - assert type(priData) == type(b"") - priData = priData.decode("utf-8") + self.assertEqual(type(priData), type("")) + genre = priData.split('|')[2] + if verbose: print('getGenre key: %r data: %r' % (priKey, priData)) - genre = priData.split('|')[2] + if genre == 'Blues': return db.DB_DONOTINDEX else: - return genre.encode("utf-8") + return genre #---------------------------------------------------------------------- @@ -380,21 +346,21 @@ class ShelveAssociateTestCase(AssociateTestCase): filetype=self.dbtype) def addDataToDB(self, d): - for key, value in musicdata.items(): + for key, value in list(musicdata.items()): if type(self.keytype) == type(''): - key = ("%02d" % key).encode("utf-8") + key = "%02d" % key d.put(key, value) # save the value as is this time def getGenre(self, priKey, priData): - assert type(priData) == type(()) + self.assertEqual(type(priData), type(())) if verbose: print('getGenre key: %r data: %r' % (priKey, priData)) genre = priData[2] if genre == 'Blues': return db.DB_DONOTINDEX else: - return genre.encode("utf-8") + return genre class ShelveAssociateHashTestCase(ShelveAssociateTestCase): @@ -418,15 +384,17 @@ class ThreadedAssociateTestCase(AssociateTestCase): t2 = Thread(target = self.writer2, args = (d, )) + t1.setDaemon(True) + t2.setDaemon(True) t1.start() t2.start() t1.join() t2.join() def writer1(self, d): - for key, value in musicdata.items(): + for key, value in list(musicdata.items()): if type(self.keytype) == type(''): - key = ("%02d" % key).encode("utf-8") + key = "%02d" % key d.put(key, '|'.join(value)) def writer2(self, d): @@ -452,24 +420,23 @@ class ThreadedAssociateRecnoTestCase(ShelveAssociateTestCase): def test_suite(): suite = unittest.TestSuite() - if db.version() >= (3, 3, 11): - suite.addTest(unittest.makeSuite(AssociateErrorTestCase)) + suite.addTest(unittest.makeSuite(AssociateErrorTestCase)) - suite.addTest(unittest.makeSuite(AssociateHashTestCase)) - suite.addTest(unittest.makeSuite(AssociateBTreeTestCase)) - suite.addTest(unittest.makeSuite(AssociateRecnoTestCase)) + suite.addTest(unittest.makeSuite(AssociateHashTestCase)) + suite.addTest(unittest.makeSuite(AssociateBTreeTestCase)) + suite.addTest(unittest.makeSuite(AssociateRecnoTestCase)) - if db.version() >= (4, 1): - suite.addTest(unittest.makeSuite(AssociateBTreeTxnTestCase)) + if db.version() >= (4, 1): + suite.addTest(unittest.makeSuite(AssociateBTreeTxnTestCase)) - suite.addTest(unittest.makeSuite(ShelveAssociateHashTestCase)) - suite.addTest(unittest.makeSuite(ShelveAssociateBTreeTestCase)) - suite.addTest(unittest.makeSuite(ShelveAssociateRecnoTestCase)) + suite.addTest(unittest.makeSuite(ShelveAssociateHashTestCase)) + suite.addTest(unittest.makeSuite(ShelveAssociateBTreeTestCase)) + suite.addTest(unittest.makeSuite(ShelveAssociateRecnoTestCase)) - if have_threads: - suite.addTest(unittest.makeSuite(ThreadedAssociateHashTestCase)) - suite.addTest(unittest.makeSuite(ThreadedAssociateBTreeTestCase)) - suite.addTest(unittest.makeSuite(ThreadedAssociateRecnoTestCase)) + if have_threads: + suite.addTest(unittest.makeSuite(ThreadedAssociateHashTestCase)) + suite.addTest(unittest.makeSuite(ThreadedAssociateBTreeTestCase)) + suite.addTest(unittest.makeSuite(ThreadedAssociateRecnoTestCase)) return suite |
