diff options
Diffstat (limited to 'Lib/bsddb/test/test_thread.py')
-rw-r--r-- | Lib/bsddb/test/test_thread.py | 140 |
1 files changed, 69 insertions, 71 deletions
diff --git a/Lib/bsddb/test/test_thread.py b/Lib/bsddb/test/test_thread.py index fb2ba1c..14f8d26 100644 --- a/Lib/bsddb/test/test_thread.py +++ b/Lib/bsddb/test/test_thread.py @@ -1,23 +1,31 @@ -""" -TestCases for multi-threaded access to a DB. +"""TestCases for multi-threaded access to a DB. """ -import sys, os, string -import tempfile +import os +import sys import time +import errno +import shutil +import tempfile from pprint import pprint from whrandom import random try: + True, False +except NameError: + True = 1 + False = 0 + +DASH = '-' + +try: from threading import Thread, currentThread - have_threads = 1 + have_threads = True except ImportError: - have_threads = 0 - + have_threads = False import unittest from test_all import verbose - from bsddb import db, dbutils #---------------------------------------------------------------------- @@ -28,15 +36,16 @@ class BaseThreadedTestCase(unittest.TestCase): dbsetflags = 0 envflags = 0 - def setUp(self): if verbose: dbutils._deadlock_VerboseFile = sys.stdout homeDir = os.path.join(os.path.dirname(sys.argv[0]), 'db_home') self.homeDir = homeDir - try: os.mkdir(homeDir) - except os.error: pass + try: + os.mkdir(homeDir) + except OSError, e: + if e.errno <> errno.EEXIST: raise self.env = db.DBEnv() self.setEnvOpts() self.env.open(homeDir, self.envflags | db.DB_CREATE) @@ -47,22 +56,16 @@ class BaseThreadedTestCase(unittest.TestCase): self.d.set_flags(self.dbsetflags) self.d.open(self.filename, self.dbtype, self.dbopenflags|db.DB_CREATE) - def tearDown(self): self.d.close() self.env.close() - import glob - files = glob.glob(os.path.join(self.homeDir, '*')) - for file in files: - os.remove(file) - + shutil.rmtree(self.homeDir) def setEnvOpts(self): pass - def makeData(self, key): - return string.join([key] * 5, '-') + return DASH.join([key] * 5) #---------------------------------------------------------------------- @@ -75,7 +78,6 @@ class ConcurrentDataStoreBase(BaseThreadedTestCase): writers = 0 records = 1000 - def test01_1WriterMultiReaders(self): if verbose: print '\n', '-=' * 30 @@ -102,11 +104,11 @@ class ConcurrentDataStoreBase(BaseThreadedTestCase): for t in threads: t.join() - def writerThread(self, d, howMany, writerNum): #time.sleep(0.01 * writerNum + 0.01) name = currentThread().getName() - start, stop = howMany * writerNum, howMany * (writerNum + 1) - 1 + start = howMany * writerNum + stop = howMany * (writerNum + 1) - 1 if verbose: print "%s: creating records %d - %d" % (name, start, stop) @@ -117,7 +119,8 @@ class ConcurrentDataStoreBase(BaseThreadedTestCase): if verbose and x % 100 == 0: print "%s: records %d - %d finished" % (name, start, x) - if verbose: print "%s: finished creating records" % name + if verbose: + print "%s: finished creating records" % name ## # Each write-cursor will be exclusive, the only one that can update the DB... ## if verbose: print "%s: deleting a few records" % name @@ -130,8 +133,8 @@ class ConcurrentDataStoreBase(BaseThreadedTestCase): ## c.delete() ## c.close() - if verbose: print "%s: thread finished" % name - + if verbose: + print "%s: thread finished" % name def readerThread(self, d, readerNum): time.sleep(0.01 * readerNum) @@ -142,16 +145,17 @@ class ConcurrentDataStoreBase(BaseThreadedTestCase): count = 0 rec = c.first() while rec: - count = count + 1 + count += 1 key, data = rec - assert self.makeData(key) == data + self.assertEqual(self.makeData(key), data) rec = c.next() - if verbose: print "%s: found %d records" % (name, count) + if verbose: + print "%s: found %d records" % (name, count) c.close() time.sleep(0.05) - if verbose: print "%s: thread finished" % name - + if verbose: + print "%s: thread finished" % name class BTreeConcurrentDataStore(ConcurrentDataStoreBase): @@ -167,6 +171,7 @@ class HashConcurrentDataStore(ConcurrentDataStoreBase): readers = 10 records = 1000 + #---------------------------------------------------------------------- class SimpleThreadedBase(BaseThreadedTestCase): @@ -176,11 +181,9 @@ class SimpleThreadedBase(BaseThreadedTestCase): writers = 3 records = 1000 - def setEnvOpts(self): self.env.set_lk_detect(db.DB_LOCK_DEFAULT) - def test02_SimpleLocks(self): if verbose: print '\n', '-=' * 30 @@ -205,11 +208,10 @@ class SimpleThreadedBase(BaseThreadedTestCase): for t in threads: t.join() - - def writerThread(self, d, howMany, writerNum): name = currentThread().getName() - start, stop = howMany * writerNum, howMany * (writerNum + 1) - 1 + start = howMany * writerNum + stop = howMany * (writerNum + 1) - 1 if verbose: print "%s: creating records %d - %d" % (name, start, stop) @@ -227,7 +229,7 @@ class SimpleThreadedBase(BaseThreadedTestCase): for y in xrange(start, x): key = '%04d' % x data = dbutils.DeadlockWrap(d.get, key, max_retries=12) - assert data == self.makeData(key) + self.assertEqual(data, self.makeData(key)) # flush them try: @@ -242,14 +244,14 @@ class SimpleThreadedBase(BaseThreadedTestCase): data = dbutils.DeadlockWrap(d.get, key, max_retries=12) if verbose and x % 100 == 0: print "%s: fetched record (%s, %s)" % (name, key, data) - assert data == self.makeData(key), (key, data, self.makeData(key)) + self.assertEqual(data, self.makeData(key)) if random() <= 0.10: dbutils.DeadlockWrap(d.delete, key, max_retries=12) if verbose: print "%s: deleted record %s" % (name, key) - if verbose: print "%s: thread finished" % name - + if verbose: + print "%s: thread finished" % name def readerThread(self, d, readerNum): time.sleep(0.01 * readerNum) @@ -260,17 +262,17 @@ class SimpleThreadedBase(BaseThreadedTestCase): count = 0 rec = c.first() while rec: - count = count + 1 + count += 1 key, data = rec - assert self.makeData(key) == data + self.assertEqual(self.makeData(key), data) rec = c.next() - if verbose: print "%s: found %d records" % (name, count) + if verbose: + print "%s: found %d records" % (name, count) c.close() time.sleep(0.05) - if verbose: print "%s: thread finished" % name - - + if verbose: + print "%s: thread finished" % name class BTreeSimpleThreaded(SimpleThreadedBase): @@ -284,7 +286,6 @@ class HashSimpleThreaded(SimpleThreadedBase): #---------------------------------------------------------------------- - class ThreadedTransactionsBase(BaseThreadedTestCase): dbopenflags = db.DB_THREAD | db.DB_AUTO_COMMIT envflags = (db.DB_THREAD | @@ -296,15 +297,12 @@ class ThreadedTransactionsBase(BaseThreadedTestCase): readers = 0 writers = 0 records = 2000 - txnFlag = 0 - def setEnvOpts(self): #self.env.set_lk_detect(db.DB_LOCK_DEFAULT) pass - def test03_ThreadedTransactions(self): if verbose: print '\n', '-=' * 30 @@ -334,12 +332,11 @@ class ThreadedTransactionsBase(BaseThreadedTestCase): for t in threads: t.join() - self.doLockDetect = 0 + self.doLockDetect = False dt.join() - def doWrite(self, d, name, start, stop): - finished = 0 + finished = False while not finished: try: txn = self.env.txn_begin(None, self.txnFlag) @@ -349,18 +346,17 @@ class ThreadedTransactionsBase(BaseThreadedTestCase): if verbose and x % 100 == 0: print "%s: records %d - %d finished" % (name, start, x) txn.commit() - finished = 1 + finished = True except (db.DBLockDeadlockError, db.DBLockNotGrantedError), val: if verbose: print "%s: Aborting transaction (%s)" % (name, val[1]) txn.abort() time.sleep(0.05) - - def writerThread(self, d, howMany, writerNum): name = currentThread().getName() - start, stop = howMany * writerNum, howMany * (writerNum + 1) - 1 + start = howMany * writerNum + stop = howMany * (writerNum + 1) - 1 if verbose: print "%s: creating records %d - %d" % (name, start, stop) @@ -368,10 +364,12 @@ class ThreadedTransactionsBase(BaseThreadedTestCase): for x in range(start, stop, step): self.doWrite(d, name, x, min(stop, x+step)) - if verbose: print "%s: finished creating records" % name - if verbose: print "%s: deleting a few records" % name + if verbose: + print "%s: finished creating records" % name + if verbose: + print "%s: deleting a few records" % name - finished = 0 + finished = False while not finished: try: recs = [] @@ -384,23 +382,24 @@ class ThreadedTransactionsBase(BaseThreadedTestCase): d.delete(key, txn) recs.append(key) txn.commit() - finished = 1 - if verbose: print "%s: deleted records %s" % (name, recs) + finished = True + if verbose: + print "%s: deleted records %s" % (name, recs) except (db.DBLockDeadlockError, db.DBLockNotGrantedError), val: if verbose: print "%s: Aborting transaction (%s)" % (name, val[1]) txn.abort() time.sleep(0.05) - if verbose: print "%s: thread finished" % name - + if verbose: + print "%s: thread finished" % name def readerThread(self, d, readerNum): time.sleep(0.01 * readerNum + 0.05) name = currentThread().getName() for loop in range(5): - finished = 0 + finished = False while not finished: try: txn = self.env.txn_begin(None, self.txnFlag) @@ -408,14 +407,14 @@ class ThreadedTransactionsBase(BaseThreadedTestCase): count = 0 rec = c.first() while rec: - count = count + 1 + count += 1 key, data = rec - assert self.makeData(key) == data + self.assertEqual(self.makeData(key), data) rec = c.next() if verbose: print "%s: found %d records" % (name, count) c.close() txn.commit() - finished = 1 + finished = True except (db.DBLockDeadlockError, db.DBLockNotGrantedError), val: if verbose: print "%s: Aborting transaction (%s)" % (name, val[1]) @@ -425,11 +424,11 @@ class ThreadedTransactionsBase(BaseThreadedTestCase): time.sleep(0.05) - if verbose: print "%s: thread finished" % name - + if verbose: + print "%s: thread finished" % name def deadlockThread(self): - self.doLockDetect = 1 + self.doLockDetect = True while self.doLockDetect: time.sleep(0.5) try: @@ -442,7 +441,6 @@ class ThreadedTransactionsBase(BaseThreadedTestCase): pass - class BTreeThreadedTransactions(ThreadedTransactionsBase): dbtype = db.DB_BTREE writers = 3 |