diff options
Diffstat (limited to 'Lib/bsddb/test/test_thread.py')
-rw-r--r-- | Lib/bsddb/test/test_thread.py | 351 |
1 files changed, 166 insertions, 185 deletions
diff --git a/Lib/bsddb/test/test_thread.py b/Lib/bsddb/test/test_thread.py index 0395f6d..5ac98ce 100644 --- a/Lib/bsddb/test/test_thread.py +++ b/Lib/bsddb/test/test_thread.py @@ -5,7 +5,6 @@ import os import sys import time import errno -import tempfile from random import random try: @@ -29,7 +28,8 @@ except NameError: pass import unittest -from test_all import verbose +from test_all import verbose, get_new_environment_path, get_new_database_path + try: # For Pythons w/distutils pybsddb @@ -52,19 +52,19 @@ class BaseThreadedTestCase(unittest.TestCase): dbsetflags = 0 envflags = 0 + import sys + if sys.version_info[:3] < (2, 4, 0): + def assertTrue(self, expr, msg=None): + self.failUnless(expr,msg=msg) + def setUp(self): if verbose: dbutils._deadlock_VerboseFile = sys.stdout - homeDir = os.path.join(tempfile.gettempdir(), 'db_home%d'%os.getpid()) - self.homeDir = homeDir - try: - os.mkdir(homeDir) - except OSError, e: - if e.errno != errno.EEXIST: raise + self.homeDir = get_new_environment_path() self.env = db.DBEnv() self.setEnvOpts() - self.env.open(homeDir, self.envflags | db.DB_CREATE) + self.env.open(self.homeDir, self.envflags | db.DB_CREATE) self.filename = self.__class__.__name__ + '.db' self.d = db.DB(self.env) @@ -100,63 +100,73 @@ class ConcurrentDataStoreBase(BaseThreadedTestCase): print "Running %s.test01_1WriterMultiReaders..." % \ self.__class__.__name__ - threads = [] - for x in range(self.writers): - wt = Thread(target = self.writerThread, - args = (self.d, self.records, x), - name = 'writer %d' % x, - )#verbose = verbose) - threads.append(wt) - - for x in range(self.readers): + keys=range(self.records) + import random + random.shuffle(keys) + records_per_writer=self.records/self.writers + readers_per_writer=self.readers/self.writers + self.assertEqual(self.records,self.writers*records_per_writer) + self.assertEqual(self.readers,self.writers*readers_per_writer) + self.assertTrue((records_per_writer%readers_per_writer)==0) + readers = [] + + for x in xrange(self.readers): rt = Thread(target = self.readerThread, args = (self.d, x), name = 'reader %d' % x, )#verbose = verbose) - threads.append(rt) + rt.setDaemon(True) + readers.append(rt) + + writers=[] + for x in xrange(self.writers): + a=keys[records_per_writer*x:records_per_writer*(x+1)] + a.sort() # Generate conflicts + b=readers[readers_per_writer*x:readers_per_writer*(x+1)] + wt = Thread(target = self.writerThread, + args = (self.d, a, b), + name = 'writer %d' % x, + )#verbose = verbose) + writers.append(wt) - for t in threads: + for t in writers: + t.setDaemon(True) t.start() - for t in threads: + + for t in writers: + t.join() + for t in readers: t.join() - def writerThread(self, d, howMany, writerNum): - #time.sleep(0.01 * writerNum + 0.01) + def writerThread(self, d, keys, readers): name = currentThread().getName() - start = howMany * writerNum - stop = howMany * (writerNum + 1) - 1 if verbose: print "%s: creating records %d - %d" % (name, start, stop) - for x in range(start, stop): + count=len(keys)/len(readers) + count2=count + for x in keys : key = '%04d' % x dbutils.DeadlockWrap(d.put, key, self.makeData(key), max_retries=12) if verbose and x % 100 == 0: print "%s: records %d - %d finished" % (name, start, x) + count2-=1 + if not count2 : + readers.pop().start() + count2=count + if verbose: print "%s: finished creating records" % name -## # Each write-cursor will be exclusive, the only one that can update the DB... -## if verbose: print "%s: deleting a few records" % name -## c = d.cursor(flags = db.DB_WRITECURSOR) -## for x in range(10): -## key = int(random() * howMany) + start -## key = '%04d' % key -## if d.has_key(key): -## c.set(key) -## c.delete() - -## c.close() if verbose: print "%s: thread finished" % name def readerThread(self, d, readerNum): - time.sleep(0.01 * readerNum) name = currentThread().getName() - for loop in range(5): + for i in xrange(5) : c = d.cursor() count = 0 rec = c.first() @@ -168,7 +178,6 @@ class ConcurrentDataStoreBase(BaseThreadedTestCase): if verbose: print "%s: found %d records" % (name, count) c.close() - time.sleep(0.05) if verbose: print "%s: thread finished" % name @@ -193,8 +202,8 @@ class HashConcurrentDataStore(ConcurrentDataStoreBase): class SimpleThreadedBase(BaseThreadedTestCase): dbopenflags = db.DB_THREAD envflags = db.DB_THREAD | db.DB_INIT_MPOOL | db.DB_INIT_LOCK - readers = 5 - writers = 3 + readers = 10 + writers = 2 records = 1000 def setEnvOpts(self): @@ -205,34 +214,53 @@ class SimpleThreadedBase(BaseThreadedTestCase): print '\n', '-=' * 30 print "Running %s.test02_SimpleLocks..." % self.__class__.__name__ - threads = [] - for x in range(self.writers): - wt = Thread(target = self.writerThread, - args = (self.d, self.records, x), - name = 'writer %d' % x, - )#verbose = verbose) - threads.append(wt) - for x in range(self.readers): + + keys=range(self.records) + import random + random.shuffle(keys) + records_per_writer=self.records/self.writers + readers_per_writer=self.readers/self.writers + self.assertEqual(self.records,self.writers*records_per_writer) + self.assertEqual(self.readers,self.writers*readers_per_writer) + self.assertTrue((records_per_writer%readers_per_writer)==0) + + readers = [] + for x in xrange(self.readers): rt = Thread(target = self.readerThread, args = (self.d, x), name = 'reader %d' % x, )#verbose = verbose) - threads.append(rt) + rt.setDaemon(True) + readers.append(rt) + + writers = [] + for x in xrange(self.writers): + a=keys[records_per_writer*x:records_per_writer*(x+1)] + a.sort() # Generate conflicts + b=readers[readers_per_writer*x:readers_per_writer*(x+1)] + wt = Thread(target = self.writerThread, + args = (self.d, a, b), + name = 'writer %d' % x, + )#verbose = verbose) + writers.append(wt) - for t in threads: + for t in writers: + t.setDaemon(True) t.start() - for t in threads: + + for t in writers: + t.join() + for t in readers: t.join() - def writerThread(self, d, howMany, writerNum): + def writerThread(self, d, keys, readers): name = currentThread().getName() - start = howMany * writerNum - stop = howMany * (writerNum + 1) - 1 if verbose: print "%s: creating records %d - %d" % (name, start, stop) - # create a bunch of records - for x in xrange(start, stop): + count=len(keys)/len(readers) + count2=count + for x in keys : key = '%04d' % x dbutils.DeadlockWrap(d.put, key, self.makeData(key), max_retries=12) @@ -240,52 +268,28 @@ class SimpleThreadedBase(BaseThreadedTestCase): if verbose and x % 100 == 0: print "%s: records %d - %d finished" % (name, start, x) - # do a bit or reading too - if random() <= 0.05: - for y in xrange(start, x): - key = '%04d' % x - data = dbutils.DeadlockWrap(d.get, key, max_retries=12) - self.assertEqual(data, self.makeData(key)) - - # flush them - try: - dbutils.DeadlockWrap(d.sync, max_retries=12) - except db.DBIncompleteError, val: - if verbose: - print "could not complete sync()..." - - # read them back, deleting a few - for x in xrange(start, stop): - key = '%04d' % x - data = dbutils.DeadlockWrap(d.get, key, max_retries=12) - if verbose and x % 100 == 0: - print "%s: fetched record (%s, %s)" % (name, key, data) - self.assertEqual(data, self.makeData(key)) - if random() <= 0.10: - dbutils.DeadlockWrap(d.delete, key, max_retries=12) - if verbose: - print "%s: deleted record %s" % (name, key) + count2-=1 + if not count2 : + readers.pop().start() + count2=count if verbose: print "%s: thread finished" % name def readerThread(self, d, readerNum): - time.sleep(0.01 * readerNum) name = currentThread().getName() - for loop in range(5): - c = d.cursor() - count = 0 - rec = dbutils.DeadlockWrap(c.first, max_retries=10) - while rec: - count += 1 - key, data = rec - self.assertEqual(self.makeData(key), data) - rec = dbutils.DeadlockWrap(c.next, max_retries=10) - if verbose: - print "%s: found %d records" % (name, count) - c.close() - time.sleep(0.05) + c = d.cursor() + count = 0 + rec = dbutils.DeadlockWrap(c.first, max_retries=10) + while rec: + count += 1 + key, data = rec + self.assertEqual(self.makeData(key), data) + rec = dbutils.DeadlockWrap(c.next, max_retries=10) + if verbose: + print "%s: found %d records" % (name, count) + c.close() if verbose: print "%s: thread finished" % name @@ -325,120 +329,97 @@ class ThreadedTransactionsBase(BaseThreadedTestCase): print "Running %s.test03_ThreadedTransactions..." % \ self.__class__.__name__ - threads = [] - for x in range(self.writers): - wt = Thread(target = self.writerThread, - args = (self.d, self.records, x), - name = 'writer %d' % x, - )#verbose = verbose) - threads.append(wt) - - for x in range(self.readers): + keys=range(self.records) + import random + random.shuffle(keys) + records_per_writer=self.records/self.writers + readers_per_writer=self.readers/self.writers + self.assertEqual(self.records,self.writers*records_per_writer) + self.assertEqual(self.readers,self.writers*readers_per_writer) + self.assertTrue((records_per_writer%readers_per_writer)==0) + + readers=[] + for x in xrange(self.readers): rt = Thread(target = self.readerThread, args = (self.d, x), name = 'reader %d' % x, )#verbose = verbose) - threads.append(rt) + rt.setDaemon(True) + readers.append(rt) + + writers = [] + for x in xrange(self.writers): + a=keys[records_per_writer*x:records_per_writer*(x+1)] + b=readers[readers_per_writer*x:readers_per_writer*(x+1)] + wt = Thread(target = self.writerThread, + args = (self.d, a, b), + name = 'writer %d' % x, + )#verbose = verbose) + writers.append(wt) dt = Thread(target = self.deadlockThread) + dt.setDaemon(True) dt.start() - for t in threads: + for t in writers: + t.setDaemon(True) t.start() - for t in threads: + + for t in writers: + t.join() + for t in readers: t.join() self.doLockDetect = False dt.join() - def doWrite(self, d, name, start, stop): - finished = False - while not finished: + def writerThread(self, d, keys, readers): + name = currentThread().getName() + count=len(keys)/len(readers) + while len(keys): try: txn = self.env.txn_begin(None, self.txnFlag) - for x in range(start, stop): + keys2=keys[:count] + for x in keys2 : key = '%04d' % x d.put(key, self.makeData(key), txn) if verbose and x % 100 == 0: print "%s: records %d - %d finished" % (name, start, x) txn.commit() - finished = True + keys=keys[count:] + readers.pop().start() except (db.DBLockDeadlockError, db.DBLockNotGrantedError), val: if verbose: print "%s: Aborting transaction (%s)" % (name, val[1]) txn.abort() - time.sleep(0.05) - def writerThread(self, d, howMany, writerNum): - name = currentThread().getName() - start = howMany * writerNum - stop = howMany * (writerNum + 1) - 1 if verbose: - print "%s: creating records %d - %d" % (name, start, stop) - - step = 100 - for x in range(start, stop, step): - self.doWrite(d, name, x, min(stop, x+step)) + print "%s: thread finished" % name - if verbose: - print "%s: finished creating records" % name - if verbose: - print "%s: deleting a few records" % name + def readerThread(self, d, readerNum): + name = currentThread().getName() finished = False while not finished: try: - recs = [] txn = self.env.txn_begin(None, self.txnFlag) - for x in range(10): - key = int(random() * howMany) + start - key = '%04d' % key - data = d.get(key, None, txn, db.DB_RMW) - if data is not None: - d.delete(key, txn) - recs.append(key) + c = d.cursor(txn) + count = 0 + rec = c.first() + while rec: + count += 1 + key, data = rec + self.assertEqual(self.makeData(key), data) + rec = c.next() + if verbose: print "%s: found %d records" % (name, count) + c.close() txn.commit() finished = True - if verbose: - print "%s: deleted records %s" % (name, recs) except (db.DBLockDeadlockError, db.DBLockNotGrantedError), val: if verbose: print "%s: Aborting transaction (%s)" % (name, val[1]) + c.close() txn.abort() - time.sleep(0.05) - - if verbose: - print "%s: thread finished" % name - - def readerThread(self, d, readerNum): - time.sleep(0.01 * readerNum + 0.05) - name = currentThread().getName() - - for loop in range(5): - finished = False - while not finished: - try: - txn = self.env.txn_begin(None, self.txnFlag) - c = d.cursor(txn) - count = 0 - rec = c.first() - while rec: - count += 1 - key, data = rec - self.assertEqual(self.makeData(key), data) - rec = c.next() - if verbose: print "%s: found %d records" % (name, count) - c.close() - txn.commit() - finished = True - except (db.DBLockDeadlockError, db.DBLockNotGrantedError), val: - if verbose: - print "%s: Aborting transaction (%s)" % (name, val[1]) - c.close() - txn.abort() - time.sleep(0.05) - - time.sleep(0.05) if verbose: print "%s: thread finished" % name @@ -446,7 +427,7 @@ class ThreadedTransactionsBase(BaseThreadedTestCase): def deadlockThread(self): self.doLockDetect = True while self.doLockDetect: - time.sleep(0.5) + time.sleep(0.05) try: aborted = self.env.lock_detect( db.DB_LOCK_RANDOM, db.DB_LOCK_CONFLICT) @@ -459,28 +440,28 @@ class ThreadedTransactionsBase(BaseThreadedTestCase): class BTreeThreadedTransactions(ThreadedTransactionsBase): dbtype = db.DB_BTREE - writers = 3 - readers = 5 - records = 2000 + writers = 2 + readers = 10 + records = 1000 class HashThreadedTransactions(ThreadedTransactionsBase): dbtype = db.DB_HASH - writers = 1 - readers = 5 - records = 2000 + writers = 2 + readers = 10 + records = 1000 class BTreeThreadedNoWaitTransactions(ThreadedTransactionsBase): dbtype = db.DB_BTREE - writers = 3 - readers = 5 - records = 2000 + writers = 2 + readers = 10 + records = 1000 txnFlag = db.DB_TXN_NOWAIT class HashThreadedNoWaitTransactions(ThreadedTransactionsBase): dbtype = db.DB_HASH - writers = 1 - readers = 5 - records = 2000 + writers = 2 + readers = 10 + records = 1000 txnFlag = db.DB_TXN_NOWAIT |