diff options
Diffstat (limited to 'Lib/bsddb/test/test_thread.py')
-rw-r--r-- | Lib/bsddb/test/test_thread.py | 500 |
1 files changed, 252 insertions, 248 deletions
diff --git a/Lib/bsddb/test/test_thread.py b/Lib/bsddb/test/test_thread.py index fbd7730..bb43034 100644 --- a/Lib/bsddb/test/test_thread.py +++ b/Lib/bsddb/test/test_thread.py @@ -5,17 +5,9 @@ import os import sys import time import errno -import tempfile -from pprint import pprint from random import random -DASH = b'-' - -try: - from threading import Thread, current_thread - have_threads = True -except ImportError: - have_threads = False +DASH = '-' try: WindowsError @@ -24,19 +16,16 @@ except NameError: pass import unittest -from bsddb.test.test_all import verbose +from .test_all import db, dbutils, test_support, verbose, have_threads, \ + get_new_environment_path, get_new_database_path -try: - # For Pythons w/distutils pybsddb - from bsddb3 import db, dbutils -except ImportError: - # For Python 2.3 - from bsddb import db, dbutils - -try: - from bsddb3 import test_support -except ImportError: - from test import support as test_support +if have_threads : + from threading import Thread + import sys + if sys.version_info[0] < 3 : + from threading import currentThread + else : + from threading import current_thread as currentThread #---------------------------------------------------------------------- @@ -47,16 +36,16 @@ class BaseThreadedTestCase(unittest.TestCase): dbsetflags = 0 envflags = 0 + import sys + if sys.version_info[:3] < (2, 4, 0): + def assertTrue(self, expr, msg=None): + self.failUnless(expr,msg=msg) + def setUp(self): if verbose: dbutils._deadlock_VerboseFile = sys.stdout - homeDir = os.path.join(tempfile.gettempdir(), 'db_home%d'%os.getpid()) - self.homeDir = homeDir - try: - os.mkdir(homeDir) - except OSError as e: - if e.errno != errno.EEXIST: raise + self.homeDir = get_new_environment_path() self.env = db.DBEnv() self.setEnvOpts() self.env.open(self.homeDir, self.envflags | db.DB_CREATE) @@ -78,33 +67,6 @@ class BaseThreadedTestCase(unittest.TestCase): def makeData(self, key): return DASH.join([key] * 5) - def _writerThread(self, *args, **kwargs): - raise RuntimeError("must override this in a subclass") - - def _readerThread(self, *args, **kwargs): - raise RuntimeError("must override this in a subclass") - - def writerThread(self, *args, **kwargs): - try: - self._writerThread(*args, **kwargs) - except db.DBLockDeadlockError: - if verbose: - print(current_thread().name, 'died from', e) - else: - if verbose: - print(current_thread().name, "finished.") - - def readerThread(self, *args, **kwargs): - try: - self._readerThread(*args, **kwargs) - except db.DBLockDeadlockError as e: - if verbose: - print(current_thread().name, 'died from', e) - else: - if verbose: - print(current_thread().name, "finished.") - - #---------------------------------------------------------------------- @@ -121,60 +83,91 @@ class ConcurrentDataStoreBase(BaseThreadedTestCase): print('\n', '-=' * 30) print("Running %s.test01_1WriterMultiReaders..." % \ self.__class__.__name__) - print('Using:', self.homeDir, self.filename) - threads = [] - wt = Thread(target = self.writerThread, - args = (self.d, self.records), - name = 'the writer', - )#verbose = verbose) - threads.append(wt) + keys=list(range(self.records)) + import random + random.shuffle(keys) + records_per_writer=self.records//self.writers + readers_per_writer=self.readers//self.writers + self.assertEqual(self.records,self.writers*records_per_writer) + self.assertEqual(self.readers,self.writers*readers_per_writer) + self.assertTrue((records_per_writer%readers_per_writer)==0) + readers = [] for x in range(self.readers): rt = Thread(target = self.readerThread, args = (self.d, x), name = 'reader %d' % x, )#verbose = verbose) - threads.append(rt) - - for t in threads: + import sys + if sys.version_info[0] < 3 : + rt.setDaemon(True) + else : + rt.daemon = True + readers.append(rt) + + writers=[] + for x in range(self.writers): + a=keys[records_per_writer*x:records_per_writer*(x+1)] + a.sort() # Generate conflicts + b=readers[readers_per_writer*x:readers_per_writer*(x+1)] + wt = Thread(target = self.writerThread, + args = (self.d, a, b), + name = 'writer %d' % x, + )#verbose = verbose) + writers.append(wt) + + for t in writers: + import sys + if sys.version_info[0] < 3 : + t.setDaemon(True) + else : + t.daemon = True t.start() - for t in threads: + + for t in writers: t.join() + for t in readers: + t.join() + + def writerThread(self, d, keys, readers): + import sys + if sys.version_info[0] < 3 : + name = currentThread().getName() + else : + name = currentThread().name - def _writerThread(self, d, howMany): - name = current_thread().name - start = 0 - stop = howMany if verbose: - print(name+": creating records", start, "-", stop) + print("%s: creating records %d - %d" % (name, start, stop)) + + count=len(keys)//len(readers) + count2=count + for x in keys : + key = '%04d' % x + dbutils.DeadlockWrap(d.put, key, self.makeData(key), + max_retries=12) + if verbose and x % 100 == 0: + print("%s: records %d - %d finished" % (name, start, x)) - for x in range(start, stop): - key = ('%04d' % x).encode("ascii") - d.put(key, self.makeData(key)) - if verbose and x > start and x % 50 == 0: - print(name+": records", start, "-", x, "finished") + count2-=1 + if not count2 : + readers.pop().start() + count2=count if verbose: print("%s: finished creating records" % name) -## # Each write-cursor will be exclusive, the only one that can update the DB... -## if verbose: print "%s: deleting a few records" % name -## c = d.cursor(flags = db.DB_WRITECURSOR) -## for x in range(10): -## key = int(random() * howMany) + start -## key = '%04d' % key -## if d.has_key(key): -## c.set(key) -## c.delete() - -## c.close() + if verbose: + print("%s: thread finished" % name) - def _readerThread(self, d, readerNum): - time.sleep(0.01 * readerNum) - name = current_thread().name + def readerThread(self, d, readerNum): + import sys + if sys.version_info[0] < 3 : + name = currentThread().getName() + else : + name = currentThread().name - for loop in range(5): + for i in range(5) : c = d.cursor() count = 0 rec = c.first() @@ -182,24 +175,26 @@ class ConcurrentDataStoreBase(BaseThreadedTestCase): count += 1 key, data = rec self.assertEqual(self.makeData(key), data) - rec = c.next() + rec = next(c) if verbose: - print(name+": found", count, "records") + print("%s: found %d records" % (name, count)) c.close() - time.sleep(0.05) + + if verbose: + print("%s: thread finished" % name) class BTreeConcurrentDataStore(ConcurrentDataStoreBase): dbtype = db.DB_BTREE - writers = 1 + writers = 2 readers = 10 records = 1000 class HashConcurrentDataStore(ConcurrentDataStoreBase): dbtype = db.DB_HASH - writers = 1 - readers = 0 + writers = 2 + readers = 10 records = 1000 @@ -208,8 +203,8 @@ class HashConcurrentDataStore(ConcurrentDataStoreBase): class SimpleThreadedBase(BaseThreadedTestCase): dbopenflags = db.DB_THREAD envflags = db.DB_THREAD | db.DB_INIT_MPOOL | db.DB_INIT_LOCK - readers = 5 - writers = 3 + readers = 10 + writers = 2 records = 1000 def setEnvOpts(self): @@ -220,87 +215,98 @@ class SimpleThreadedBase(BaseThreadedTestCase): print('\n', '-=' * 30) print("Running %s.test02_SimpleLocks..." % self.__class__.__name__) - threads = [] - for x in range(self.writers): - wt = Thread(target = self.writerThread, - args = (self.d, self.records, x), - name = 'writer %d' % x, - )#verbose = verbose) - threads.append(wt) + + keys=list(range(self.records)) + import random + random.shuffle(keys) + records_per_writer=self.records//self.writers + readers_per_writer=self.readers//self.writers + self.assertEqual(self.records,self.writers*records_per_writer) + self.assertEqual(self.readers,self.writers*readers_per_writer) + self.assertTrue((records_per_writer%readers_per_writer)==0) + + readers = [] for x in range(self.readers): rt = Thread(target = self.readerThread, args = (self.d, x), name = 'reader %d' % x, )#verbose = verbose) - threads.append(rt) - - for t in threads: + import sys + if sys.version_info[0] < 3 : + rt.setDaemon(True) + else : + rt.daemon = True + readers.append(rt) + + writers = [] + for x in range(self.writers): + a=keys[records_per_writer*x:records_per_writer*(x+1)] + a.sort() # Generate conflicts + b=readers[readers_per_writer*x:readers_per_writer*(x+1)] + wt = Thread(target = self.writerThread, + args = (self.d, a, b), + name = 'writer %d' % x, + )#verbose = verbose) + writers.append(wt) + + for t in writers: + import sys + if sys.version_info[0] < 3 : + t.setDaemon(True) + else : + t.daemon = True t.start() - for t in threads: + + for t in writers: + t.join() + for t in readers: t.join() - def _writerThread(self, d, howMany, writerNum): - name = current_thread().name - start = howMany * writerNum - stop = howMany * (writerNum + 1) - 1 + def writerThread(self, d, keys, readers): + import sys + if sys.version_info[0] < 3 : + name = currentThread().getName() + else : + name = currentThread().name if verbose: print("%s: creating records %d - %d" % (name, start, stop)) - # create a bunch of records - for x in range(start, stop): - key = ('%04d' % x).encode("ascii") + count=len(keys)//len(readers) + count2=count + for x in keys : + key = '%04d' % x dbutils.DeadlockWrap(d.put, key, self.makeData(key), - max_retries=20) + max_retries=12) if verbose and x % 100 == 0: print("%s: records %d - %d finished" % (name, start, x)) - # do a bit or reading too - if random() <= 0.05: - for y in range(start, x): - key = ('%04d' % x).encode("ascii") - data = dbutils.DeadlockWrap(d.get, key, max_retries=20) - self.assertEqual(data, self.makeData(key)) - - # flush them - try: - dbutils.DeadlockWrap(d.sync, max_retries=20) - except db.DBIncompleteError as val: - if verbose: - print("could not complete sync()...") - - # read them back, deleting a few - for x in range(start, stop): - key = ('%04d' % x).encode("ascii") - data = dbutils.DeadlockWrap(d.get, key, max_retries=20) - if verbose and x % 100 == 0: - print("%s: fetched record (%s, %s)" % (name, key, data)) - self.assertEqual(data, self.makeData(key)) - if random() <= 0.10: - dbutils.DeadlockWrap(d.delete, key, max_retries=20) - if verbose: - print("%s: deleted record %s" % (name, key)) + count2-=1 + if not count2 : + readers.pop().start() + count2=count if verbose: print("%s: thread finished" % name) - def _readerThread(self, d, readerNum): - time.sleep(0.01 * readerNum) - name = current_thread().name - - for loop in range(5): - c = d.cursor() - count = 0 - rec = dbutils.DeadlockWrap(c.first, max_retries=20) - while rec: - count += 1 - key, data = rec - self.assertEqual(self.makeData(key), data) - rec = dbutils.DeadlockWrap(c.next, max_retries=20) - if verbose: - print("%s: found %d records" % (name, count)) - c.close() - time.sleep(0.05) + def readerThread(self, d, readerNum): + import sys + if sys.version_info[0] < 3 : + name = currentThread().getName() + else : + name = currentThread().name + + c = d.cursor() + count = 0 + rec = dbutils.DeadlockWrap(c.first, max_retries=10) + while rec: + count += 1 + key, data = rec + self.assertEqual(self.makeData(key), data) + rec = dbutils.DeadlockWrap(c.__next__, max_retries=10) + if verbose: + print("%s: found %d records" % (name, count)) + c.close() if verbose: print("%s: thread finished" % name) @@ -340,120 +346,118 @@ class ThreadedTransactionsBase(BaseThreadedTestCase): print("Running %s.test03_ThreadedTransactions..." % \ self.__class__.__name__) - threads = [] - for x in range(self.writers): - wt = Thread(target = self.writerThread, - args = (self.d, self.records, x), - name = 'writer %d' % x, - )#verbose = verbose) - threads.append(wt) + keys=list(range(self.records)) + import random + random.shuffle(keys) + records_per_writer=self.records//self.writers + readers_per_writer=self.readers//self.writers + self.assertEqual(self.records,self.writers*records_per_writer) + self.assertEqual(self.readers,self.writers*readers_per_writer) + self.assertTrue((records_per_writer%readers_per_writer)==0) + readers=[] for x in range(self.readers): rt = Thread(target = self.readerThread, args = (self.d, x), name = 'reader %d' % x, )#verbose = verbose) - threads.append(rt) + import sys + if sys.version_info[0] < 3 : + rt.setDaemon(True) + else : + rt.daemon = True + readers.append(rt) + + writers = [] + for x in range(self.writers): + a=keys[records_per_writer*x:records_per_writer*(x+1)] + b=readers[readers_per_writer*x:readers_per_writer*(x+1)] + wt = Thread(target = self.writerThread, + args = (self.d, a, b), + name = 'writer %d' % x, + )#verbose = verbose) + writers.append(wt) dt = Thread(target = self.deadlockThread) + import sys + if sys.version_info[0] < 3 : + dt.setDaemon(True) + else : + dt.daemon = True dt.start() - for t in threads: + for t in writers: + import sys + if sys.version_info[0] < 3 : + t.setDaemon(True) + else : + t.daemon = True t.start() - for t in threads: + + for t in writers: + t.join() + for t in readers: t.join() self.doLockDetect = False dt.join() - def doWrite(self, d, name, start, stop): - finished = False - while not finished: + def writerThread(self, d, keys, readers): + import sys + if sys.version_info[0] < 3 : + name = currentThread().getName() + else : + name = currentThread().name + + count=len(keys)//len(readers) + while len(keys): try: txn = self.env.txn_begin(None, self.txnFlag) - for x in range(start, stop): - key = ('%04d' % x).encode("ascii") + keys2=keys[:count] + for x in keys2 : + key = '%04d' % x d.put(key, self.makeData(key), txn) if verbose and x % 100 == 0: print("%s: records %d - %d finished" % (name, start, x)) txn.commit() - finished = True + keys=keys[count:] + readers.pop().start() except (db.DBLockDeadlockError, db.DBLockNotGrantedError) as val: if verbose: - print("%s: Aborting transaction (%s)" % (name, val)) + print("%s: Aborting transaction (%s)" % (name, val[1])) txn.abort() - time.sleep(0.05) - def _writerThread(self, d, howMany, writerNum): - name = current_thread().name - start = howMany * writerNum - stop = howMany * (writerNum + 1) - 1 if verbose: - print("%s: creating records %d - %d" % (name, start, stop)) - - step = 100 - for x in range(start, stop, step): - self.doWrite(d, name, x, min(stop, x+step)) + print("%s: thread finished" % name) - if verbose: - print("%s: finished creating records" % name) - if verbose: - print("%s: deleting a few records" % name) + def readerThread(self, d, readerNum): + import sys + if sys.version_info[0] < 3 : + name = currentThread().getName() + else : + name = currentThread().name finished = False while not finished: try: - recs = [] txn = self.env.txn_begin(None, self.txnFlag) - for x in range(10): - key = int(random() * howMany) + start - key = ('%04d' % key).encode("ascii") - data = d.get(key, None, txn, db.DB_RMW) - if data is not None: - d.delete(key, txn) - recs.append(key) + c = d.cursor(txn) + count = 0 + rec = c.first() + while rec: + count += 1 + key, data = rec + self.assertEqual(self.makeData(key), data) + rec = next(c) + if verbose: print("%s: found %d records" % (name, count)) + c.close() txn.commit() finished = True - if verbose: - print("%s: deleted records %s" % (name, recs)) except (db.DBLockDeadlockError, db.DBLockNotGrantedError) as val: if verbose: - print("%s: Aborting transaction (%s)" % (name, val)) + print("%s: Aborting transaction (%s)" % (name, val[1])) + c.close() txn.abort() - time.sleep(0.05) - - if verbose: - print("%s: thread finished" % name) - - def _readerThread(self, d, readerNum): - time.sleep(0.01 * readerNum + 0.05) - name = current_thread().name - - for loop in range(5): - finished = False - while not finished: - try: - txn = self.env.txn_begin(None, self.txnFlag) - c = d.cursor(txn) - count = 0 - rec = c.first() - while rec: - count += 1 - key, data = rec - self.assertEqual(self.makeData(key), data) - rec = c.next() - if verbose: print("%s: found %d records" % (name, count)) - c.close() - txn.commit() - finished = True - except (db.DBLockDeadlockError, db.DBLockNotGrantedError) as val: - if verbose: - print("%s: Aborting transaction (%s)" % (name, val)) - c.close() - txn.abort() - time.sleep(0.05) - - time.sleep(0.05) if verbose: print("%s: thread finished" % name) @@ -461,7 +465,7 @@ class ThreadedTransactionsBase(BaseThreadedTestCase): def deadlockThread(self): self.doLockDetect = True while self.doLockDetect: - time.sleep(0.5) + time.sleep(0.05) try: aborted = self.env.lock_detect( db.DB_LOCK_RANDOM, db.DB_LOCK_CONFLICT) @@ -474,28 +478,28 @@ class ThreadedTransactionsBase(BaseThreadedTestCase): class BTreeThreadedTransactions(ThreadedTransactionsBase): dbtype = db.DB_BTREE - writers = 3 - readers = 5 - records = 2000 + writers = 2 + readers = 10 + records = 1000 class HashThreadedTransactions(ThreadedTransactionsBase): dbtype = db.DB_HASH - writers = 1 - readers = 5 - records = 2000 + writers = 2 + readers = 10 + records = 1000 class BTreeThreadedNoWaitTransactions(ThreadedTransactionsBase): dbtype = db.DB_BTREE - writers = 3 - readers = 5 - records = 2000 + writers = 2 + readers = 10 + records = 1000 txnFlag = db.DB_TXN_NOWAIT class HashThreadedNoWaitTransactions(ThreadedTransactionsBase): dbtype = db.DB_HASH - writers = 1 - readers = 5 - records = 2000 + writers = 2 + readers = 10 + records = 1000 txnFlag = db.DB_TXN_NOWAIT |