diff options
author | Jesus Cea <jcea@jcea.es> | 2008-08-31 14:12:11 (GMT) |
---|---|---|
committer | Jesus Cea <jcea@jcea.es> | 2008-08-31 14:12:11 (GMT) |
commit | 6ba3329c274e2c7876c61f2e98d4592310d26bae (patch) | |
tree | 6bb346e892269279fa2011c3e4bd4648b273a7ae /Lib/bsddb/test/test_thread.py | |
parent | 73c96dbf34c70bbf1ef807b98d51cf9c0e9dc042 (diff) | |
download | cpython-6ba3329c274e2c7876c61f2e98d4592310d26bae.zip cpython-6ba3329c274e2c7876c61f2e98d4592310d26bae.tar.gz cpython-6ba3329c274e2c7876c61f2e98d4592310d26bae.tar.bz2 |
bsddb code updated to version 4.7.3pre2. This code is the same than
Python 2.6 one, since the intention is to keep an unified 2.x/3.x
codebase.
The Python code is automatically translated using "2to3". Please, do not
update this code in Python 3.0 by hand. Update the 2.6 one and then do
"2to3".
Diffstat (limited to 'Lib/bsddb/test/test_thread.py')
-rw-r--r-- | Lib/bsddb/test/test_thread.py | 500 |
1 files changed, 252 insertions, 248 deletions
diff --git a/Lib/bsddb/test/test_thread.py b/Lib/bsddb/test/test_thread.py index fbd7730..bb43034 100644 --- a/Lib/bsddb/test/test_thread.py +++ b/Lib/bsddb/test/test_thread.py @@ -5,17 +5,9 @@ import os import sys import time import errno -import tempfile -from pprint import pprint from random import random -DASH = b'-' - -try: - from threading import Thread, current_thread - have_threads = True -except ImportError: - have_threads = False +DASH = '-' try: WindowsError @@ -24,19 +16,16 @@ except NameError: pass import unittest -from bsddb.test.test_all import verbose +from .test_all import db, dbutils, test_support, verbose, have_threads, \ + get_new_environment_path, get_new_database_path -try: - # For Pythons w/distutils pybsddb - from bsddb3 import db, dbutils -except ImportError: - # For Python 2.3 - from bsddb import db, dbutils - -try: - from bsddb3 import test_support -except ImportError: - from test import support as test_support +if have_threads : + from threading import Thread + import sys + if sys.version_info[0] < 3 : + from threading import currentThread + else : + from threading import current_thread as currentThread #---------------------------------------------------------------------- @@ -47,16 +36,16 @@ class BaseThreadedTestCase(unittest.TestCase): dbsetflags = 0 envflags = 0 + import sys + if sys.version_info[:3] < (2, 4, 0): + def assertTrue(self, expr, msg=None): + self.failUnless(expr,msg=msg) + def setUp(self): if verbose: dbutils._deadlock_VerboseFile = sys.stdout - homeDir = os.path.join(tempfile.gettempdir(), 'db_home%d'%os.getpid()) - self.homeDir = homeDir - try: - os.mkdir(homeDir) - except OSError as e: - if e.errno != errno.EEXIST: raise + self.homeDir = get_new_environment_path() self.env = db.DBEnv() self.setEnvOpts() self.env.open(self.homeDir, self.envflags | db.DB_CREATE) @@ -78,33 +67,6 @@ class BaseThreadedTestCase(unittest.TestCase): def makeData(self, key): return DASH.join([key] * 5) - def _writerThread(self, *args, **kwargs): - raise RuntimeError("must override this in a subclass") - - def _readerThread(self, *args, **kwargs): - raise RuntimeError("must override this in a subclass") - - def writerThread(self, *args, **kwargs): - try: - self._writerThread(*args, **kwargs) - except db.DBLockDeadlockError: - if verbose: - print(current_thread().name, 'died from', e) - else: - if verbose: - print(current_thread().name, "finished.") - - def readerThread(self, *args, **kwargs): - try: - self._readerThread(*args, **kwargs) - except db.DBLockDeadlockError as e: - if verbose: - print(current_thread().name, 'died from', e) - else: - if verbose: - print(current_thread().name, "finished.") - - #---------------------------------------------------------------------- @@ -121,60 +83,91 @@ class ConcurrentDataStoreBase(BaseThreadedTestCase): print('\n', '-=' * 30) print("Running %s.test01_1WriterMultiReaders..." % \ self.__class__.__name__) - print('Using:', self.homeDir, self.filename) - threads = [] - wt = Thread(target = self.writerThread, - args = (self.d, self.records), - name = 'the writer', - )#verbose = verbose) - threads.append(wt) + keys=list(range(self.records)) + import random + random.shuffle(keys) + records_per_writer=self.records//self.writers + readers_per_writer=self.readers//self.writers + self.assertEqual(self.records,self.writers*records_per_writer) + self.assertEqual(self.readers,self.writers*readers_per_writer) + self.assertTrue((records_per_writer%readers_per_writer)==0) + readers = [] for x in range(self.readers): rt = Thread(target = self.readerThread, args = (self.d, x), name = 'reader %d' % x, )#verbose = verbose) - threads.append(rt) - - for t in threads: + import sys + if sys.version_info[0] < 3 : + rt.setDaemon(True) + else : + rt.daemon = True + readers.append(rt) + + writers=[] + for x in range(self.writers): + a=keys[records_per_writer*x:records_per_writer*(x+1)] + a.sort() # Generate conflicts + b=readers[readers_per_writer*x:readers_per_writer*(x+1)] + wt = Thread(target = self.writerThread, + args = (self.d, a, b), + name = 'writer %d' % x, + )#verbose = verbose) + writers.append(wt) + + for t in writers: + import sys + if sys.version_info[0] < 3 : + t.setDaemon(True) + else : + t.daemon = True t.start() - for t in threads: + + for t in writers: t.join() + for t in readers: + t.join() + + def writerThread(self, d, keys, readers): + import sys + if sys.version_info[0] < 3 : + name = currentThread().getName() + else : + name = currentThread().name - def _writerThread(self, d, howMany): - name = current_thread().name - start = 0 - stop = howMany if verbose: - print(name+": creating records", start, "-", stop) + print("%s: creating records %d - %d" % (name, start, stop)) + + count=len(keys)//len(readers) + count2=count + for x in keys : + key = '%04d' % x + dbutils.DeadlockWrap(d.put, key, self.makeData(key), + max_retries=12) + if verbose and x % 100 == 0: + print("%s: records %d - %d finished" % (name, start, x)) - for x in range(start, stop): - key = ('%04d' % x).encode("ascii") - d.put(key, self.makeData(key)) - if verbose and x > start and x % 50 == 0: - print(name+": records", start, "-", x, "finished") + count2-=1 + if not count2 : + readers.pop().start() + count2=count if verbose: print("%s: finished creating records" % name) -## # Each write-cursor will be exclusive, the only one that can update the DB... -## if verbose: print "%s: deleting a few records" % name -## c = d.cursor(flags = db.DB_WRITECURSOR) -## for x in range(10): -## key = int(random() * howMany) + start -## key = '%04d' % key -## if d.has_key(key): -## c.set(key) -## c.delete() - -## c.close() + if verbose: + print("%s: thread finished" % name) - def _readerThread(self, d, readerNum): - time.sleep(0.01 * readerNum) - name = current_thread().name + def readerThread(self, d, readerNum): + import sys + if sys.version_info[0] < 3 : + name = currentThread().getName() + else : + name = currentThread().name - for loop in range(5): + for i in range(5) : c = d.cursor() count = 0 rec = c.first() @@ -182,24 +175,26 @@ class ConcurrentDataStoreBase(BaseThreadedTestCase): count += 1 key, data = rec self.assertEqual(self.makeData(key), data) - rec = c.next() + rec = next(c) if verbose: - print(name+": found", count, "records") + print("%s: found %d records" % (name, count)) c.close() - time.sleep(0.05) + + if verbose: + print("%s: thread finished" % name) class BTreeConcurrentDataStore(ConcurrentDataStoreBase): dbtype = db.DB_BTREE - writers = 1 + writers = 2 readers = 10 records = 1000 class HashConcurrentDataStore(ConcurrentDataStoreBase): dbtype = db.DB_HASH - writers = 1 - readers = 0 + writers = 2 + readers = 10 records = 1000 @@ -208,8 +203,8 @@ class HashConcurrentDataStore(ConcurrentDataStoreBase): class SimpleThreadedBase(BaseThreadedTestCase): dbopenflags = db.DB_THREAD envflags = db.DB_THREAD | db.DB_INIT_MPOOL | db.DB_INIT_LOCK - readers = 5 - writers = 3 + readers = 10 + writers = 2 records = 1000 def setEnvOpts(self): @@ -220,87 +215,98 @@ class SimpleThreadedBase(BaseThreadedTestCase): print('\n', '-=' * 30) print("Running %s.test02_SimpleLocks..." % self.__class__.__name__) - threads = [] - for x in range(self.writers): - wt = Thread(target = self.writerThread, - args = (self.d, self.records, x), - name = 'writer %d' % x, - )#verbose = verbose) - threads.append(wt) + + keys=list(range(self.records)) + import random + random.shuffle(keys) + records_per_writer=self.records//self.writers + readers_per_writer=self.readers//self.writers + self.assertEqual(self.records,self.writers*records_per_writer) + self.assertEqual(self.readers,self.writers*readers_per_writer) + self.assertTrue((records_per_writer%readers_per_writer)==0) + + readers = [] for x in range(self.readers): rt = Thread(target = self.readerThread, args = (self.d, x), name = 'reader %d' % x, )#verbose = verbose) - threads.append(rt) - - for t in threads: + import sys + if sys.version_info[0] < 3 : + rt.setDaemon(True) + else : + rt.daemon = True + readers.append(rt) + + writers = [] + for x in range(self.writers): + a=keys[records_per_writer*x:records_per_writer*(x+1)] + a.sort() # Generate conflicts + b=readers[readers_per_writer*x:readers_per_writer*(x+1)] + wt = Thread(target = self.writerThread, + args = (self.d, a, b), + name = 'writer %d' % x, + )#verbose = verbose) + writers.append(wt) + + for t in writers: + import sys + if sys.version_info[0] < 3 : + t.setDaemon(True) + else : + t.daemon = True t.start() - for t in threads: + + for t in writers: + t.join() + for t in readers: t.join() - def _writerThread(self, d, howMany, writerNum): - name = current_thread().name - start = howMany * writerNum - stop = howMany * (writerNum + 1) - 1 + def writerThread(self, d, keys, readers): + import sys + if sys.version_info[0] < 3 : + name = currentThread().getName() + else : + name = currentThread().name if verbose: print("%s: creating records %d - %d" % (name, start, stop)) - # create a bunch of records - for x in range(start, stop): - key = ('%04d' % x).encode("ascii") + count=len(keys)//len(readers) + count2=count + for x in keys : + key = '%04d' % x dbutils.DeadlockWrap(d.put, key, self.makeData(key), - max_retries=20) + max_retries=12) if verbose and x % 100 == 0: print("%s: records %d - %d finished" % (name, start, x)) - # do a bit or reading too - if random() <= 0.05: - for y in range(start, x): - key = ('%04d' % x).encode("ascii") - data = dbutils.DeadlockWrap(d.get, key, max_retries=20) - self.assertEqual(data, self.makeData(key)) - - # flush them - try: - dbutils.DeadlockWrap(d.sync, max_retries=20) - except db.DBIncompleteError as val: - if verbose: - print("could not complete sync()...") - - # read them back, deleting a few - for x in range(start, stop): - key = ('%04d' % x).encode("ascii") - data = dbutils.DeadlockWrap(d.get, key, max_retries=20) - if verbose and x % 100 == 0: - print("%s: fetched record (%s, %s)" % (name, key, data)) - self.assertEqual(data, self.makeData(key)) - if random() <= 0.10: - dbutils.DeadlockWrap(d.delete, key, max_retries=20) - if verbose: - print("%s: deleted record %s" % (name, key)) + count2-=1 + if not count2 : + readers.pop().start() + count2=count if verbose: print("%s: thread finished" % name) - def _readerThread(self, d, readerNum): - time.sleep(0.01 * readerNum) - name = current_thread().name - - for loop in range(5): - c = d.cursor() - count = 0 - rec = dbutils.DeadlockWrap(c.first, max_retries=20) - while rec: - count += 1 - key, data = rec - self.assertEqual(self.makeData(key), data) - rec = dbutils.DeadlockWrap(c.next, max_retries=20) - if verbose: - print("%s: found %d records" % (name, count)) - c.close() - time.sleep(0.05) + def readerThread(self, d, readerNum): + import sys + if sys.version_info[0] < 3 : + name = currentThread().getName() + else : + name = currentThread().name + + c = d.cursor() + count = 0 + rec = dbutils.DeadlockWrap(c.first, max_retries=10) + while rec: + count += 1 + key, data = rec + self.assertEqual(self.makeData(key), data) + rec = dbutils.DeadlockWrap(c.__next__, max_retries=10) + if verbose: + print("%s: found %d records" % (name, count)) + c.close() if verbose: print("%s: thread finished" % name) @@ -340,120 +346,118 @@ class ThreadedTransactionsBase(BaseThreadedTestCase): print("Running %s.test03_ThreadedTransactions..." % \ self.__class__.__name__) - threads = [] - for x in range(self.writers): - wt = Thread(target = self.writerThread, - args = (self.d, self.records, x), - name = 'writer %d' % x, - )#verbose = verbose) - threads.append(wt) + keys=list(range(self.records)) + import random + random.shuffle(keys) + records_per_writer=self.records//self.writers + readers_per_writer=self.readers//self.writers + self.assertEqual(self.records,self.writers*records_per_writer) + self.assertEqual(self.readers,self.writers*readers_per_writer) + self.assertTrue((records_per_writer%readers_per_writer)==0) + readers=[] for x in range(self.readers): rt = Thread(target = self.readerThread, args = (self.d, x), name = 'reader %d' % x, )#verbose = verbose) - threads.append(rt) + import sys + if sys.version_info[0] < 3 : + rt.setDaemon(True) + else : + rt.daemon = True + readers.append(rt) + + writers = [] + for x in range(self.writers): + a=keys[records_per_writer*x:records_per_writer*(x+1)] + b=readers[readers_per_writer*x:readers_per_writer*(x+1)] + wt = Thread(target = self.writerThread, + args = (self.d, a, b), + name = 'writer %d' % x, + )#verbose = verbose) + writers.append(wt) dt = Thread(target = self.deadlockThread) + import sys + if sys.version_info[0] < 3 : + dt.setDaemon(True) + else : + dt.daemon = True dt.start() - for t in threads: + for t in writers: + import sys + if sys.version_info[0] < 3 : + t.setDaemon(True) + else : + t.daemon = True t.start() - for t in threads: + + for t in writers: + t.join() + for t in readers: t.join() self.doLockDetect = False dt.join() - def doWrite(self, d, name, start, stop): - finished = False - while not finished: + def writerThread(self, d, keys, readers): + import sys + if sys.version_info[0] < 3 : + name = currentThread().getName() + else : + name = currentThread().name + + count=len(keys)//len(readers) + while len(keys): try: txn = self.env.txn_begin(None, self.txnFlag) - for x in range(start, stop): - key = ('%04d' % x).encode("ascii") + keys2=keys[:count] + for x in keys2 : + key = '%04d' % x d.put(key, self.makeData(key), txn) if verbose and x % 100 == 0: print("%s: records %d - %d finished" % (name, start, x)) txn.commit() - finished = True + keys=keys[count:] + readers.pop().start() except (db.DBLockDeadlockError, db.DBLockNotGrantedError) as val: if verbose: - print("%s: Aborting transaction (%s)" % (name, val)) + print("%s: Aborting transaction (%s)" % (name, val[1])) txn.abort() - time.sleep(0.05) - def _writerThread(self, d, howMany, writerNum): - name = current_thread().name - start = howMany * writerNum - stop = howMany * (writerNum + 1) - 1 if verbose: - print("%s: creating records %d - %d" % (name, start, stop)) - - step = 100 - for x in range(start, stop, step): - self.doWrite(d, name, x, min(stop, x+step)) + print("%s: thread finished" % name) - if verbose: - print("%s: finished creating records" % name) - if verbose: - print("%s: deleting a few records" % name) + def readerThread(self, d, readerNum): + import sys + if sys.version_info[0] < 3 : + name = currentThread().getName() + else : + name = currentThread().name finished = False while not finished: try: - recs = [] txn = self.env.txn_begin(None, self.txnFlag) - for x in range(10): - key = int(random() * howMany) + start - key = ('%04d' % key).encode("ascii") - data = d.get(key, None, txn, db.DB_RMW) - if data is not None: - d.delete(key, txn) - recs.append(key) + c = d.cursor(txn) + count = 0 + rec = c.first() + while rec: + count += 1 + key, data = rec + self.assertEqual(self.makeData(key), data) + rec = next(c) + if verbose: print("%s: found %d records" % (name, count)) + c.close() txn.commit() finished = True - if verbose: - print("%s: deleted records %s" % (name, recs)) except (db.DBLockDeadlockError, db.DBLockNotGrantedError) as val: if verbose: - print("%s: Aborting transaction (%s)" % (name, val)) + print("%s: Aborting transaction (%s)" % (name, val[1])) + c.close() txn.abort() - time.sleep(0.05) - - if verbose: - print("%s: thread finished" % name) - - def _readerThread(self, d, readerNum): - time.sleep(0.01 * readerNum + 0.05) - name = current_thread().name - - for loop in range(5): - finished = False - while not finished: - try: - txn = self.env.txn_begin(None, self.txnFlag) - c = d.cursor(txn) - count = 0 - rec = c.first() - while rec: - count += 1 - key, data = rec - self.assertEqual(self.makeData(key), data) - rec = c.next() - if verbose: print("%s: found %d records" % (name, count)) - c.close() - txn.commit() - finished = True - except (db.DBLockDeadlockError, db.DBLockNotGrantedError) as val: - if verbose: - print("%s: Aborting transaction (%s)" % (name, val)) - c.close() - txn.abort() - time.sleep(0.05) - - time.sleep(0.05) if verbose: print("%s: thread finished" % name) @@ -461,7 +465,7 @@ class ThreadedTransactionsBase(BaseThreadedTestCase): def deadlockThread(self): self.doLockDetect = True while self.doLockDetect: - time.sleep(0.5) + time.sleep(0.05) try: aborted = self.env.lock_detect( db.DB_LOCK_RANDOM, db.DB_LOCK_CONFLICT) @@ -474,28 +478,28 @@ class ThreadedTransactionsBase(BaseThreadedTestCase): class BTreeThreadedTransactions(ThreadedTransactionsBase): dbtype = db.DB_BTREE - writers = 3 - readers = 5 - records = 2000 + writers = 2 + readers = 10 + records = 1000 class HashThreadedTransactions(ThreadedTransactionsBase): dbtype = db.DB_HASH - writers = 1 - readers = 5 - records = 2000 + writers = 2 + readers = 10 + records = 1000 class BTreeThreadedNoWaitTransactions(ThreadedTransactionsBase): dbtype = db.DB_BTREE - writers = 3 - readers = 5 - records = 2000 + writers = 2 + readers = 10 + records = 1000 txnFlag = db.DB_TXN_NOWAIT class HashThreadedNoWaitTransactions(ThreadedTransactionsBase): dbtype = db.DB_HASH - writers = 1 - readers = 5 - records = 2000 + writers = 2 + readers = 10 + records = 1000 txnFlag = db.DB_TXN_NOWAIT |