summaryrefslogtreecommitdiffstats
path: root/Lib/bsddb/test/test_thread.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/bsddb/test/test_thread.py')
-rw-r--r--Lib/bsddb/test/test_thread.py500
1 files changed, 252 insertions, 248 deletions
diff --git a/Lib/bsddb/test/test_thread.py b/Lib/bsddb/test/test_thread.py
index fbd7730..bb43034 100644
--- a/Lib/bsddb/test/test_thread.py
+++ b/Lib/bsddb/test/test_thread.py
@@ -5,17 +5,9 @@ import os
import sys
import time
import errno
-import tempfile
-from pprint import pprint
from random import random
-DASH = b'-'
-
-try:
- from threading import Thread, current_thread
- have_threads = True
-except ImportError:
- have_threads = False
+DASH = '-'
try:
WindowsError
@@ -24,19 +16,16 @@ except NameError:
pass
import unittest
-from bsddb.test.test_all import verbose
+from .test_all import db, dbutils, test_support, verbose, have_threads, \
+ get_new_environment_path, get_new_database_path
-try:
- # For Pythons w/distutils pybsddb
- from bsddb3 import db, dbutils
-except ImportError:
- # For Python 2.3
- from bsddb import db, dbutils
-
-try:
- from bsddb3 import test_support
-except ImportError:
- from test import support as test_support
+if have_threads :
+ from threading import Thread
+ import sys
+ if sys.version_info[0] < 3 :
+ from threading import currentThread
+ else :
+ from threading import current_thread as currentThread
#----------------------------------------------------------------------
@@ -47,16 +36,16 @@ class BaseThreadedTestCase(unittest.TestCase):
dbsetflags = 0
envflags = 0
+ import sys
+ if sys.version_info[:3] < (2, 4, 0):
+ def assertTrue(self, expr, msg=None):
+ self.failUnless(expr,msg=msg)
+
def setUp(self):
if verbose:
dbutils._deadlock_VerboseFile = sys.stdout
- homeDir = os.path.join(tempfile.gettempdir(), 'db_home%d'%os.getpid())
- self.homeDir = homeDir
- try:
- os.mkdir(homeDir)
- except OSError as e:
- if e.errno != errno.EEXIST: raise
+ self.homeDir = get_new_environment_path()
self.env = db.DBEnv()
self.setEnvOpts()
self.env.open(self.homeDir, self.envflags | db.DB_CREATE)
@@ -78,33 +67,6 @@ class BaseThreadedTestCase(unittest.TestCase):
def makeData(self, key):
return DASH.join([key] * 5)
- def _writerThread(self, *args, **kwargs):
- raise RuntimeError("must override this in a subclass")
-
- def _readerThread(self, *args, **kwargs):
- raise RuntimeError("must override this in a subclass")
-
- def writerThread(self, *args, **kwargs):
- try:
- self._writerThread(*args, **kwargs)
- except db.DBLockDeadlockError:
- if verbose:
- print(current_thread().name, 'died from', e)
- else:
- if verbose:
- print(current_thread().name, "finished.")
-
- def readerThread(self, *args, **kwargs):
- try:
- self._readerThread(*args, **kwargs)
- except db.DBLockDeadlockError as e:
- if verbose:
- print(current_thread().name, 'died from', e)
- else:
- if verbose:
- print(current_thread().name, "finished.")
-
-
#----------------------------------------------------------------------
@@ -121,60 +83,91 @@ class ConcurrentDataStoreBase(BaseThreadedTestCase):
print('\n', '-=' * 30)
print("Running %s.test01_1WriterMultiReaders..." % \
self.__class__.__name__)
- print('Using:', self.homeDir, self.filename)
- threads = []
- wt = Thread(target = self.writerThread,
- args = (self.d, self.records),
- name = 'the writer',
- )#verbose = verbose)
- threads.append(wt)
+ keys=list(range(self.records))
+ import random
+ random.shuffle(keys)
+ records_per_writer=self.records//self.writers
+ readers_per_writer=self.readers//self.writers
+ self.assertEqual(self.records,self.writers*records_per_writer)
+ self.assertEqual(self.readers,self.writers*readers_per_writer)
+ self.assertTrue((records_per_writer%readers_per_writer)==0)
+ readers = []
for x in range(self.readers):
rt = Thread(target = self.readerThread,
args = (self.d, x),
name = 'reader %d' % x,
)#verbose = verbose)
- threads.append(rt)
-
- for t in threads:
+ import sys
+ if sys.version_info[0] < 3 :
+ rt.setDaemon(True)
+ else :
+ rt.daemon = True
+ readers.append(rt)
+
+ writers=[]
+ for x in range(self.writers):
+ a=keys[records_per_writer*x:records_per_writer*(x+1)]
+ a.sort() # Generate conflicts
+ b=readers[readers_per_writer*x:readers_per_writer*(x+1)]
+ wt = Thread(target = self.writerThread,
+ args = (self.d, a, b),
+ name = 'writer %d' % x,
+ )#verbose = verbose)
+ writers.append(wt)
+
+ for t in writers:
+ import sys
+ if sys.version_info[0] < 3 :
+ t.setDaemon(True)
+ else :
+ t.daemon = True
t.start()
- for t in threads:
+
+ for t in writers:
t.join()
+ for t in readers:
+ t.join()
+
+ def writerThread(self, d, keys, readers):
+ import sys
+ if sys.version_info[0] < 3 :
+ name = currentThread().getName()
+ else :
+ name = currentThread().name
- def _writerThread(self, d, howMany):
- name = current_thread().name
- start = 0
- stop = howMany
if verbose:
- print(name+": creating records", start, "-", stop)
+ print("%s: creating records %d - %d" % (name, start, stop))
+
+ count=len(keys)//len(readers)
+ count2=count
+ for x in keys :
+ key = '%04d' % x
+ dbutils.DeadlockWrap(d.put, key, self.makeData(key),
+ max_retries=12)
+ if verbose and x % 100 == 0:
+ print("%s: records %d - %d finished" % (name, start, x))
- for x in range(start, stop):
- key = ('%04d' % x).encode("ascii")
- d.put(key, self.makeData(key))
- if verbose and x > start and x % 50 == 0:
- print(name+": records", start, "-", x, "finished")
+ count2-=1
+ if not count2 :
+ readers.pop().start()
+ count2=count
if verbose:
print("%s: finished creating records" % name)
-## # Each write-cursor will be exclusive, the only one that can update the DB...
-## if verbose: print "%s: deleting a few records" % name
-## c = d.cursor(flags = db.DB_WRITECURSOR)
-## for x in range(10):
-## key = int(random() * howMany) + start
-## key = '%04d' % key
-## if d.has_key(key):
-## c.set(key)
-## c.delete()
-
-## c.close()
+ if verbose:
+ print("%s: thread finished" % name)
- def _readerThread(self, d, readerNum):
- time.sleep(0.01 * readerNum)
- name = current_thread().name
+ def readerThread(self, d, readerNum):
+ import sys
+ if sys.version_info[0] < 3 :
+ name = currentThread().getName()
+ else :
+ name = currentThread().name
- for loop in range(5):
+ for i in range(5) :
c = d.cursor()
count = 0
rec = c.first()
@@ -182,24 +175,26 @@ class ConcurrentDataStoreBase(BaseThreadedTestCase):
count += 1
key, data = rec
self.assertEqual(self.makeData(key), data)
- rec = c.next()
+ rec = next(c)
if verbose:
- print(name+": found", count, "records")
+ print("%s: found %d records" % (name, count))
c.close()
- time.sleep(0.05)
+
+ if verbose:
+ print("%s: thread finished" % name)
class BTreeConcurrentDataStore(ConcurrentDataStoreBase):
dbtype = db.DB_BTREE
- writers = 1
+ writers = 2
readers = 10
records = 1000
class HashConcurrentDataStore(ConcurrentDataStoreBase):
dbtype = db.DB_HASH
- writers = 1
- readers = 0
+ writers = 2
+ readers = 10
records = 1000
@@ -208,8 +203,8 @@ class HashConcurrentDataStore(ConcurrentDataStoreBase):
class SimpleThreadedBase(BaseThreadedTestCase):
dbopenflags = db.DB_THREAD
envflags = db.DB_THREAD | db.DB_INIT_MPOOL | db.DB_INIT_LOCK
- readers = 5
- writers = 3
+ readers = 10
+ writers = 2
records = 1000
def setEnvOpts(self):
@@ -220,87 +215,98 @@ class SimpleThreadedBase(BaseThreadedTestCase):
print('\n', '-=' * 30)
print("Running %s.test02_SimpleLocks..." % self.__class__.__name__)
- threads = []
- for x in range(self.writers):
- wt = Thread(target = self.writerThread,
- args = (self.d, self.records, x),
- name = 'writer %d' % x,
- )#verbose = verbose)
- threads.append(wt)
+
+ keys=list(range(self.records))
+ import random
+ random.shuffle(keys)
+ records_per_writer=self.records//self.writers
+ readers_per_writer=self.readers//self.writers
+ self.assertEqual(self.records,self.writers*records_per_writer)
+ self.assertEqual(self.readers,self.writers*readers_per_writer)
+ self.assertTrue((records_per_writer%readers_per_writer)==0)
+
+ readers = []
for x in range(self.readers):
rt = Thread(target = self.readerThread,
args = (self.d, x),
name = 'reader %d' % x,
)#verbose = verbose)
- threads.append(rt)
-
- for t in threads:
+ import sys
+ if sys.version_info[0] < 3 :
+ rt.setDaemon(True)
+ else :
+ rt.daemon = True
+ readers.append(rt)
+
+ writers = []
+ for x in range(self.writers):
+ a=keys[records_per_writer*x:records_per_writer*(x+1)]
+ a.sort() # Generate conflicts
+ b=readers[readers_per_writer*x:readers_per_writer*(x+1)]
+ wt = Thread(target = self.writerThread,
+ args = (self.d, a, b),
+ name = 'writer %d' % x,
+ )#verbose = verbose)
+ writers.append(wt)
+
+ for t in writers:
+ import sys
+ if sys.version_info[0] < 3 :
+ t.setDaemon(True)
+ else :
+ t.daemon = True
t.start()
- for t in threads:
+
+ for t in writers:
+ t.join()
+ for t in readers:
t.join()
- def _writerThread(self, d, howMany, writerNum):
- name = current_thread().name
- start = howMany * writerNum
- stop = howMany * (writerNum + 1) - 1
+ def writerThread(self, d, keys, readers):
+ import sys
+ if sys.version_info[0] < 3 :
+ name = currentThread().getName()
+ else :
+ name = currentThread().name
if verbose:
print("%s: creating records %d - %d" % (name, start, stop))
- # create a bunch of records
- for x in range(start, stop):
- key = ('%04d' % x).encode("ascii")
+ count=len(keys)//len(readers)
+ count2=count
+ for x in keys :
+ key = '%04d' % x
dbutils.DeadlockWrap(d.put, key, self.makeData(key),
- max_retries=20)
+ max_retries=12)
if verbose and x % 100 == 0:
print("%s: records %d - %d finished" % (name, start, x))
- # do a bit or reading too
- if random() <= 0.05:
- for y in range(start, x):
- key = ('%04d' % x).encode("ascii")
- data = dbutils.DeadlockWrap(d.get, key, max_retries=20)
- self.assertEqual(data, self.makeData(key))
-
- # flush them
- try:
- dbutils.DeadlockWrap(d.sync, max_retries=20)
- except db.DBIncompleteError as val:
- if verbose:
- print("could not complete sync()...")
-
- # read them back, deleting a few
- for x in range(start, stop):
- key = ('%04d' % x).encode("ascii")
- data = dbutils.DeadlockWrap(d.get, key, max_retries=20)
- if verbose and x % 100 == 0:
- print("%s: fetched record (%s, %s)" % (name, key, data))
- self.assertEqual(data, self.makeData(key))
- if random() <= 0.10:
- dbutils.DeadlockWrap(d.delete, key, max_retries=20)
- if verbose:
- print("%s: deleted record %s" % (name, key))
+ count2-=1
+ if not count2 :
+ readers.pop().start()
+ count2=count
if verbose:
print("%s: thread finished" % name)
- def _readerThread(self, d, readerNum):
- time.sleep(0.01 * readerNum)
- name = current_thread().name
-
- for loop in range(5):
- c = d.cursor()
- count = 0
- rec = dbutils.DeadlockWrap(c.first, max_retries=20)
- while rec:
- count += 1
- key, data = rec
- self.assertEqual(self.makeData(key), data)
- rec = dbutils.DeadlockWrap(c.next, max_retries=20)
- if verbose:
- print("%s: found %d records" % (name, count))
- c.close()
- time.sleep(0.05)
+ def readerThread(self, d, readerNum):
+ import sys
+ if sys.version_info[0] < 3 :
+ name = currentThread().getName()
+ else :
+ name = currentThread().name
+
+ c = d.cursor()
+ count = 0
+ rec = dbutils.DeadlockWrap(c.first, max_retries=10)
+ while rec:
+ count += 1
+ key, data = rec
+ self.assertEqual(self.makeData(key), data)
+ rec = dbutils.DeadlockWrap(c.__next__, max_retries=10)
+ if verbose:
+ print("%s: found %d records" % (name, count))
+ c.close()
if verbose:
print("%s: thread finished" % name)
@@ -340,120 +346,118 @@ class ThreadedTransactionsBase(BaseThreadedTestCase):
print("Running %s.test03_ThreadedTransactions..." % \
self.__class__.__name__)
- threads = []
- for x in range(self.writers):
- wt = Thread(target = self.writerThread,
- args = (self.d, self.records, x),
- name = 'writer %d' % x,
- )#verbose = verbose)
- threads.append(wt)
+ keys=list(range(self.records))
+ import random
+ random.shuffle(keys)
+ records_per_writer=self.records//self.writers
+ readers_per_writer=self.readers//self.writers
+ self.assertEqual(self.records,self.writers*records_per_writer)
+ self.assertEqual(self.readers,self.writers*readers_per_writer)
+ self.assertTrue((records_per_writer%readers_per_writer)==0)
+ readers=[]
for x in range(self.readers):
rt = Thread(target = self.readerThread,
args = (self.d, x),
name = 'reader %d' % x,
)#verbose = verbose)
- threads.append(rt)
+ import sys
+ if sys.version_info[0] < 3 :
+ rt.setDaemon(True)
+ else :
+ rt.daemon = True
+ readers.append(rt)
+
+ writers = []
+ for x in range(self.writers):
+ a=keys[records_per_writer*x:records_per_writer*(x+1)]
+ b=readers[readers_per_writer*x:readers_per_writer*(x+1)]
+ wt = Thread(target = self.writerThread,
+ args = (self.d, a, b),
+ name = 'writer %d' % x,
+ )#verbose = verbose)
+ writers.append(wt)
dt = Thread(target = self.deadlockThread)
+ import sys
+ if sys.version_info[0] < 3 :
+ dt.setDaemon(True)
+ else :
+ dt.daemon = True
dt.start()
- for t in threads:
+ for t in writers:
+ import sys
+ if sys.version_info[0] < 3 :
+ t.setDaemon(True)
+ else :
+ t.daemon = True
t.start()
- for t in threads:
+
+ for t in writers:
+ t.join()
+ for t in readers:
t.join()
self.doLockDetect = False
dt.join()
- def doWrite(self, d, name, start, stop):
- finished = False
- while not finished:
+ def writerThread(self, d, keys, readers):
+ import sys
+ if sys.version_info[0] < 3 :
+ name = currentThread().getName()
+ else :
+ name = currentThread().name
+
+ count=len(keys)//len(readers)
+ while len(keys):
try:
txn = self.env.txn_begin(None, self.txnFlag)
- for x in range(start, stop):
- key = ('%04d' % x).encode("ascii")
+ keys2=keys[:count]
+ for x in keys2 :
+ key = '%04d' % x
d.put(key, self.makeData(key), txn)
if verbose and x % 100 == 0:
print("%s: records %d - %d finished" % (name, start, x))
txn.commit()
- finished = True
+ keys=keys[count:]
+ readers.pop().start()
except (db.DBLockDeadlockError, db.DBLockNotGrantedError) as val:
if verbose:
- print("%s: Aborting transaction (%s)" % (name, val))
+ print("%s: Aborting transaction (%s)" % (name, val[1]))
txn.abort()
- time.sleep(0.05)
- def _writerThread(self, d, howMany, writerNum):
- name = current_thread().name
- start = howMany * writerNum
- stop = howMany * (writerNum + 1) - 1
if verbose:
- print("%s: creating records %d - %d" % (name, start, stop))
-
- step = 100
- for x in range(start, stop, step):
- self.doWrite(d, name, x, min(stop, x+step))
+ print("%s: thread finished" % name)
- if verbose:
- print("%s: finished creating records" % name)
- if verbose:
- print("%s: deleting a few records" % name)
+ def readerThread(self, d, readerNum):
+ import sys
+ if sys.version_info[0] < 3 :
+ name = currentThread().getName()
+ else :
+ name = currentThread().name
finished = False
while not finished:
try:
- recs = []
txn = self.env.txn_begin(None, self.txnFlag)
- for x in range(10):
- key = int(random() * howMany) + start
- key = ('%04d' % key).encode("ascii")
- data = d.get(key, None, txn, db.DB_RMW)
- if data is not None:
- d.delete(key, txn)
- recs.append(key)
+ c = d.cursor(txn)
+ count = 0
+ rec = c.first()
+ while rec:
+ count += 1
+ key, data = rec
+ self.assertEqual(self.makeData(key), data)
+ rec = next(c)
+ if verbose: print("%s: found %d records" % (name, count))
+ c.close()
txn.commit()
finished = True
- if verbose:
- print("%s: deleted records %s" % (name, recs))
except (db.DBLockDeadlockError, db.DBLockNotGrantedError) as val:
if verbose:
- print("%s: Aborting transaction (%s)" % (name, val))
+ print("%s: Aborting transaction (%s)" % (name, val[1]))
+ c.close()
txn.abort()
- time.sleep(0.05)
-
- if verbose:
- print("%s: thread finished" % name)
-
- def _readerThread(self, d, readerNum):
- time.sleep(0.01 * readerNum + 0.05)
- name = current_thread().name
-
- for loop in range(5):
- finished = False
- while not finished:
- try:
- txn = self.env.txn_begin(None, self.txnFlag)
- c = d.cursor(txn)
- count = 0
- rec = c.first()
- while rec:
- count += 1
- key, data = rec
- self.assertEqual(self.makeData(key), data)
- rec = c.next()
- if verbose: print("%s: found %d records" % (name, count))
- c.close()
- txn.commit()
- finished = True
- except (db.DBLockDeadlockError, db.DBLockNotGrantedError) as val:
- if verbose:
- print("%s: Aborting transaction (%s)" % (name, val))
- c.close()
- txn.abort()
- time.sleep(0.05)
-
- time.sleep(0.05)
if verbose:
print("%s: thread finished" % name)
@@ -461,7 +465,7 @@ class ThreadedTransactionsBase(BaseThreadedTestCase):
def deadlockThread(self):
self.doLockDetect = True
while self.doLockDetect:
- time.sleep(0.5)
+ time.sleep(0.05)
try:
aborted = self.env.lock_detect(
db.DB_LOCK_RANDOM, db.DB_LOCK_CONFLICT)
@@ -474,28 +478,28 @@ class ThreadedTransactionsBase(BaseThreadedTestCase):
class BTreeThreadedTransactions(ThreadedTransactionsBase):
dbtype = db.DB_BTREE
- writers = 3
- readers = 5
- records = 2000
+ writers = 2
+ readers = 10
+ records = 1000
class HashThreadedTransactions(ThreadedTransactionsBase):
dbtype = db.DB_HASH
- writers = 1
- readers = 5
- records = 2000
+ writers = 2
+ readers = 10
+ records = 1000
class BTreeThreadedNoWaitTransactions(ThreadedTransactionsBase):
dbtype = db.DB_BTREE
- writers = 3
- readers = 5
- records = 2000
+ writers = 2
+ readers = 10
+ records = 1000
txnFlag = db.DB_TXN_NOWAIT
class HashThreadedNoWaitTransactions(ThreadedTransactionsBase):
dbtype = db.DB_HASH
- writers = 1
- readers = 5
- records = 2000
+ writers = 2
+ readers = 10
+ records = 1000
txnFlag = db.DB_TXN_NOWAIT