summaryrefslogtreecommitdiffstats
path: root/Lib/bsddb/test/test_thread.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/bsddb/test/test_thread.py')
-rw-r--r--Lib/bsddb/test/test_thread.py351
1 files changed, 166 insertions, 185 deletions
diff --git a/Lib/bsddb/test/test_thread.py b/Lib/bsddb/test/test_thread.py
index 0395f6d..5ac98ce 100644
--- a/Lib/bsddb/test/test_thread.py
+++ b/Lib/bsddb/test/test_thread.py
@@ -5,7 +5,6 @@ import os
import sys
import time
import errno
-import tempfile
from random import random
try:
@@ -29,7 +28,8 @@ except NameError:
pass
import unittest
-from test_all import verbose
+from test_all import verbose, get_new_environment_path, get_new_database_path
+
try:
# For Pythons w/distutils pybsddb
@@ -52,19 +52,19 @@ class BaseThreadedTestCase(unittest.TestCase):
dbsetflags = 0
envflags = 0
+ import sys
+ if sys.version_info[:3] < (2, 4, 0):
+ def assertTrue(self, expr, msg=None):
+ self.failUnless(expr,msg=msg)
+
def setUp(self):
if verbose:
dbutils._deadlock_VerboseFile = sys.stdout
- homeDir = os.path.join(tempfile.gettempdir(), 'db_home%d'%os.getpid())
- self.homeDir = homeDir
- try:
- os.mkdir(homeDir)
- except OSError, e:
- if e.errno != errno.EEXIST: raise
+ self.homeDir = get_new_environment_path()
self.env = db.DBEnv()
self.setEnvOpts()
- self.env.open(homeDir, self.envflags | db.DB_CREATE)
+ self.env.open(self.homeDir, self.envflags | db.DB_CREATE)
self.filename = self.__class__.__name__ + '.db'
self.d = db.DB(self.env)
@@ -100,63 +100,73 @@ class ConcurrentDataStoreBase(BaseThreadedTestCase):
print "Running %s.test01_1WriterMultiReaders..." % \
self.__class__.__name__
- threads = []
- for x in range(self.writers):
- wt = Thread(target = self.writerThread,
- args = (self.d, self.records, x),
- name = 'writer %d' % x,
- )#verbose = verbose)
- threads.append(wt)
-
- for x in range(self.readers):
+ keys=range(self.records)
+ import random
+ random.shuffle(keys)
+ records_per_writer=self.records/self.writers
+ readers_per_writer=self.readers/self.writers
+ self.assertEqual(self.records,self.writers*records_per_writer)
+ self.assertEqual(self.readers,self.writers*readers_per_writer)
+ self.assertTrue((records_per_writer%readers_per_writer)==0)
+ readers = []
+
+ for x in xrange(self.readers):
rt = Thread(target = self.readerThread,
args = (self.d, x),
name = 'reader %d' % x,
)#verbose = verbose)
- threads.append(rt)
+ rt.setDaemon(True)
+ readers.append(rt)
+
+ writers=[]
+ for x in xrange(self.writers):
+ a=keys[records_per_writer*x:records_per_writer*(x+1)]
+ a.sort() # Generate conflicts
+ b=readers[readers_per_writer*x:readers_per_writer*(x+1)]
+ wt = Thread(target = self.writerThread,
+ args = (self.d, a, b),
+ name = 'writer %d' % x,
+ )#verbose = verbose)
+ writers.append(wt)
- for t in threads:
+ for t in writers:
+ t.setDaemon(True)
t.start()
- for t in threads:
+
+ for t in writers:
+ t.join()
+ for t in readers:
t.join()
- def writerThread(self, d, howMany, writerNum):
- #time.sleep(0.01 * writerNum + 0.01)
+ def writerThread(self, d, keys, readers):
name = currentThread().getName()
- start = howMany * writerNum
- stop = howMany * (writerNum + 1) - 1
if verbose:
print "%s: creating records %d - %d" % (name, start, stop)
- for x in range(start, stop):
+ count=len(keys)/len(readers)
+ count2=count
+ for x in keys :
key = '%04d' % x
dbutils.DeadlockWrap(d.put, key, self.makeData(key),
max_retries=12)
if verbose and x % 100 == 0:
print "%s: records %d - %d finished" % (name, start, x)
+ count2-=1
+ if not count2 :
+ readers.pop().start()
+ count2=count
+
if verbose:
print "%s: finished creating records" % name
-## # Each write-cursor will be exclusive, the only one that can update the DB...
-## if verbose: print "%s: deleting a few records" % name
-## c = d.cursor(flags = db.DB_WRITECURSOR)
-## for x in range(10):
-## key = int(random() * howMany) + start
-## key = '%04d' % key
-## if d.has_key(key):
-## c.set(key)
-## c.delete()
-
-## c.close()
if verbose:
print "%s: thread finished" % name
def readerThread(self, d, readerNum):
- time.sleep(0.01 * readerNum)
name = currentThread().getName()
- for loop in range(5):
+ for i in xrange(5) :
c = d.cursor()
count = 0
rec = c.first()
@@ -168,7 +178,6 @@ class ConcurrentDataStoreBase(BaseThreadedTestCase):
if verbose:
print "%s: found %d records" % (name, count)
c.close()
- time.sleep(0.05)
if verbose:
print "%s: thread finished" % name
@@ -193,8 +202,8 @@ class HashConcurrentDataStore(ConcurrentDataStoreBase):
class SimpleThreadedBase(BaseThreadedTestCase):
dbopenflags = db.DB_THREAD
envflags = db.DB_THREAD | db.DB_INIT_MPOOL | db.DB_INIT_LOCK
- readers = 5
- writers = 3
+ readers = 10
+ writers = 2
records = 1000
def setEnvOpts(self):
@@ -205,34 +214,53 @@ class SimpleThreadedBase(BaseThreadedTestCase):
print '\n', '-=' * 30
print "Running %s.test02_SimpleLocks..." % self.__class__.__name__
- threads = []
- for x in range(self.writers):
- wt = Thread(target = self.writerThread,
- args = (self.d, self.records, x),
- name = 'writer %d' % x,
- )#verbose = verbose)
- threads.append(wt)
- for x in range(self.readers):
+
+ keys=range(self.records)
+ import random
+ random.shuffle(keys)
+ records_per_writer=self.records/self.writers
+ readers_per_writer=self.readers/self.writers
+ self.assertEqual(self.records,self.writers*records_per_writer)
+ self.assertEqual(self.readers,self.writers*readers_per_writer)
+ self.assertTrue((records_per_writer%readers_per_writer)==0)
+
+ readers = []
+ for x in xrange(self.readers):
rt = Thread(target = self.readerThread,
args = (self.d, x),
name = 'reader %d' % x,
)#verbose = verbose)
- threads.append(rt)
+ rt.setDaemon(True)
+ readers.append(rt)
+
+ writers = []
+ for x in xrange(self.writers):
+ a=keys[records_per_writer*x:records_per_writer*(x+1)]
+ a.sort() # Generate conflicts
+ b=readers[readers_per_writer*x:readers_per_writer*(x+1)]
+ wt = Thread(target = self.writerThread,
+ args = (self.d, a, b),
+ name = 'writer %d' % x,
+ )#verbose = verbose)
+ writers.append(wt)
- for t in threads:
+ for t in writers:
+ t.setDaemon(True)
t.start()
- for t in threads:
+
+ for t in writers:
+ t.join()
+ for t in readers:
t.join()
- def writerThread(self, d, howMany, writerNum):
+ def writerThread(self, d, keys, readers):
name = currentThread().getName()
- start = howMany * writerNum
- stop = howMany * (writerNum + 1) - 1
if verbose:
print "%s: creating records %d - %d" % (name, start, stop)
- # create a bunch of records
- for x in xrange(start, stop):
+ count=len(keys)/len(readers)
+ count2=count
+ for x in keys :
key = '%04d' % x
dbutils.DeadlockWrap(d.put, key, self.makeData(key),
max_retries=12)
@@ -240,52 +268,28 @@ class SimpleThreadedBase(BaseThreadedTestCase):
if verbose and x % 100 == 0:
print "%s: records %d - %d finished" % (name, start, x)
- # do a bit or reading too
- if random() <= 0.05:
- for y in xrange(start, x):
- key = '%04d' % x
- data = dbutils.DeadlockWrap(d.get, key, max_retries=12)
- self.assertEqual(data, self.makeData(key))
-
- # flush them
- try:
- dbutils.DeadlockWrap(d.sync, max_retries=12)
- except db.DBIncompleteError, val:
- if verbose:
- print "could not complete sync()..."
-
- # read them back, deleting a few
- for x in xrange(start, stop):
- key = '%04d' % x
- data = dbutils.DeadlockWrap(d.get, key, max_retries=12)
- if verbose and x % 100 == 0:
- print "%s: fetched record (%s, %s)" % (name, key, data)
- self.assertEqual(data, self.makeData(key))
- if random() <= 0.10:
- dbutils.DeadlockWrap(d.delete, key, max_retries=12)
- if verbose:
- print "%s: deleted record %s" % (name, key)
+ count2-=1
+ if not count2 :
+ readers.pop().start()
+ count2=count
if verbose:
print "%s: thread finished" % name
def readerThread(self, d, readerNum):
- time.sleep(0.01 * readerNum)
name = currentThread().getName()
- for loop in range(5):
- c = d.cursor()
- count = 0
- rec = dbutils.DeadlockWrap(c.first, max_retries=10)
- while rec:
- count += 1
- key, data = rec
- self.assertEqual(self.makeData(key), data)
- rec = dbutils.DeadlockWrap(c.next, max_retries=10)
- if verbose:
- print "%s: found %d records" % (name, count)
- c.close()
- time.sleep(0.05)
+ c = d.cursor()
+ count = 0
+ rec = dbutils.DeadlockWrap(c.first, max_retries=10)
+ while rec:
+ count += 1
+ key, data = rec
+ self.assertEqual(self.makeData(key), data)
+ rec = dbutils.DeadlockWrap(c.next, max_retries=10)
+ if verbose:
+ print "%s: found %d records" % (name, count)
+ c.close()
if verbose:
print "%s: thread finished" % name
@@ -325,120 +329,97 @@ class ThreadedTransactionsBase(BaseThreadedTestCase):
print "Running %s.test03_ThreadedTransactions..." % \
self.__class__.__name__
- threads = []
- for x in range(self.writers):
- wt = Thread(target = self.writerThread,
- args = (self.d, self.records, x),
- name = 'writer %d' % x,
- )#verbose = verbose)
- threads.append(wt)
-
- for x in range(self.readers):
+ keys=range(self.records)
+ import random
+ random.shuffle(keys)
+ records_per_writer=self.records/self.writers
+ readers_per_writer=self.readers/self.writers
+ self.assertEqual(self.records,self.writers*records_per_writer)
+ self.assertEqual(self.readers,self.writers*readers_per_writer)
+ self.assertTrue((records_per_writer%readers_per_writer)==0)
+
+ readers=[]
+ for x in xrange(self.readers):
rt = Thread(target = self.readerThread,
args = (self.d, x),
name = 'reader %d' % x,
)#verbose = verbose)
- threads.append(rt)
+ rt.setDaemon(True)
+ readers.append(rt)
+
+ writers = []
+ for x in xrange(self.writers):
+ a=keys[records_per_writer*x:records_per_writer*(x+1)]
+ b=readers[readers_per_writer*x:readers_per_writer*(x+1)]
+ wt = Thread(target = self.writerThread,
+ args = (self.d, a, b),
+ name = 'writer %d' % x,
+ )#verbose = verbose)
+ writers.append(wt)
dt = Thread(target = self.deadlockThread)
+ dt.setDaemon(True)
dt.start()
- for t in threads:
+ for t in writers:
+ t.setDaemon(True)
t.start()
- for t in threads:
+
+ for t in writers:
+ t.join()
+ for t in readers:
t.join()
self.doLockDetect = False
dt.join()
- def doWrite(self, d, name, start, stop):
- finished = False
- while not finished:
+ def writerThread(self, d, keys, readers):
+ name = currentThread().getName()
+ count=len(keys)/len(readers)
+ while len(keys):
try:
txn = self.env.txn_begin(None, self.txnFlag)
- for x in range(start, stop):
+ keys2=keys[:count]
+ for x in keys2 :
key = '%04d' % x
d.put(key, self.makeData(key), txn)
if verbose and x % 100 == 0:
print "%s: records %d - %d finished" % (name, start, x)
txn.commit()
- finished = True
+ keys=keys[count:]
+ readers.pop().start()
except (db.DBLockDeadlockError, db.DBLockNotGrantedError), val:
if verbose:
print "%s: Aborting transaction (%s)" % (name, val[1])
txn.abort()
- time.sleep(0.05)
- def writerThread(self, d, howMany, writerNum):
- name = currentThread().getName()
- start = howMany * writerNum
- stop = howMany * (writerNum + 1) - 1
if verbose:
- print "%s: creating records %d - %d" % (name, start, stop)
-
- step = 100
- for x in range(start, stop, step):
- self.doWrite(d, name, x, min(stop, x+step))
+ print "%s: thread finished" % name
- if verbose:
- print "%s: finished creating records" % name
- if verbose:
- print "%s: deleting a few records" % name
+ def readerThread(self, d, readerNum):
+ name = currentThread().getName()
finished = False
while not finished:
try:
- recs = []
txn = self.env.txn_begin(None, self.txnFlag)
- for x in range(10):
- key = int(random() * howMany) + start
- key = '%04d' % key
- data = d.get(key, None, txn, db.DB_RMW)
- if data is not None:
- d.delete(key, txn)
- recs.append(key)
+ c = d.cursor(txn)
+ count = 0
+ rec = c.first()
+ while rec:
+ count += 1
+ key, data = rec
+ self.assertEqual(self.makeData(key), data)
+ rec = c.next()
+ if verbose: print "%s: found %d records" % (name, count)
+ c.close()
txn.commit()
finished = True
- if verbose:
- print "%s: deleted records %s" % (name, recs)
except (db.DBLockDeadlockError, db.DBLockNotGrantedError), val:
if verbose:
print "%s: Aborting transaction (%s)" % (name, val[1])
+ c.close()
txn.abort()
- time.sleep(0.05)
-
- if verbose:
- print "%s: thread finished" % name
-
- def readerThread(self, d, readerNum):
- time.sleep(0.01 * readerNum + 0.05)
- name = currentThread().getName()
-
- for loop in range(5):
- finished = False
- while not finished:
- try:
- txn = self.env.txn_begin(None, self.txnFlag)
- c = d.cursor(txn)
- count = 0
- rec = c.first()
- while rec:
- count += 1
- key, data = rec
- self.assertEqual(self.makeData(key), data)
- rec = c.next()
- if verbose: print "%s: found %d records" % (name, count)
- c.close()
- txn.commit()
- finished = True
- except (db.DBLockDeadlockError, db.DBLockNotGrantedError), val:
- if verbose:
- print "%s: Aborting transaction (%s)" % (name, val[1])
- c.close()
- txn.abort()
- time.sleep(0.05)
-
- time.sleep(0.05)
if verbose:
print "%s: thread finished" % name
@@ -446,7 +427,7 @@ class ThreadedTransactionsBase(BaseThreadedTestCase):
def deadlockThread(self):
self.doLockDetect = True
while self.doLockDetect:
- time.sleep(0.5)
+ time.sleep(0.05)
try:
aborted = self.env.lock_detect(
db.DB_LOCK_RANDOM, db.DB_LOCK_CONFLICT)
@@ -459,28 +440,28 @@ class ThreadedTransactionsBase(BaseThreadedTestCase):
class BTreeThreadedTransactions(ThreadedTransactionsBase):
dbtype = db.DB_BTREE
- writers = 3
- readers = 5
- records = 2000
+ writers = 2
+ readers = 10
+ records = 1000
class HashThreadedTransactions(ThreadedTransactionsBase):
dbtype = db.DB_HASH
- writers = 1
- readers = 5
- records = 2000
+ writers = 2
+ readers = 10
+ records = 1000
class BTreeThreadedNoWaitTransactions(ThreadedTransactionsBase):
dbtype = db.DB_BTREE
- writers = 3
- readers = 5
- records = 2000
+ writers = 2
+ readers = 10
+ records = 1000
txnFlag = db.DB_TXN_NOWAIT
class HashThreadedNoWaitTransactions(ThreadedTransactionsBase):
dbtype = db.DB_HASH
- writers = 1
- readers = 5
- records = 2000
+ writers = 2
+ readers = 10
+ records = 1000
txnFlag = db.DB_TXN_NOWAIT