summaryrefslogtreecommitdiffstats
path: root/Lib/bsddb/test/test_thread.py
diff options
context:
space:
mode:
authorJesus Cea <jcea@jcea.es>2008-08-31 14:12:11 (GMT)
committerJesus Cea <jcea@jcea.es>2008-08-31 14:12:11 (GMT)
commit6ba3329c274e2c7876c61f2e98d4592310d26bae (patch)
tree6bb346e892269279fa2011c3e4bd4648b273a7ae /Lib/bsddb/test/test_thread.py
parent73c96dbf34c70bbf1ef807b98d51cf9c0e9dc042 (diff)
downloadcpython-6ba3329c274e2c7876c61f2e98d4592310d26bae.zip
cpython-6ba3329c274e2c7876c61f2e98d4592310d26bae.tar.gz
cpython-6ba3329c274e2c7876c61f2e98d4592310d26bae.tar.bz2
bsddb code updated to version 4.7.3pre2. This code is the same than
Python 2.6 one, since the intention is to keep an unified 2.x/3.x codebase. The Python code is automatically translated using "2to3". Please, do not update this code in Python 3.0 by hand. Update the 2.6 one and then do "2to3".
Diffstat (limited to 'Lib/bsddb/test/test_thread.py')
-rw-r--r--Lib/bsddb/test/test_thread.py500
1 files changed, 252 insertions, 248 deletions
diff --git a/Lib/bsddb/test/test_thread.py b/Lib/bsddb/test/test_thread.py
index fbd7730..bb43034 100644
--- a/Lib/bsddb/test/test_thread.py
+++ b/Lib/bsddb/test/test_thread.py
@@ -5,17 +5,9 @@ import os
import sys
import time
import errno
-import tempfile
-from pprint import pprint
from random import random
-DASH = b'-'
-
-try:
- from threading import Thread, current_thread
- have_threads = True
-except ImportError:
- have_threads = False
+DASH = '-'
try:
WindowsError
@@ -24,19 +16,16 @@ except NameError:
pass
import unittest
-from bsddb.test.test_all import verbose
+from .test_all import db, dbutils, test_support, verbose, have_threads, \
+ get_new_environment_path, get_new_database_path
-try:
- # For Pythons w/distutils pybsddb
- from bsddb3 import db, dbutils
-except ImportError:
- # For Python 2.3
- from bsddb import db, dbutils
-
-try:
- from bsddb3 import test_support
-except ImportError:
- from test import support as test_support
+if have_threads :
+ from threading import Thread
+ import sys
+ if sys.version_info[0] < 3 :
+ from threading import currentThread
+ else :
+ from threading import current_thread as currentThread
#----------------------------------------------------------------------
@@ -47,16 +36,16 @@ class BaseThreadedTestCase(unittest.TestCase):
dbsetflags = 0
envflags = 0
+ import sys
+ if sys.version_info[:3] < (2, 4, 0):
+ def assertTrue(self, expr, msg=None):
+ self.failUnless(expr,msg=msg)
+
def setUp(self):
if verbose:
dbutils._deadlock_VerboseFile = sys.stdout
- homeDir = os.path.join(tempfile.gettempdir(), 'db_home%d'%os.getpid())
- self.homeDir = homeDir
- try:
- os.mkdir(homeDir)
- except OSError as e:
- if e.errno != errno.EEXIST: raise
+ self.homeDir = get_new_environment_path()
self.env = db.DBEnv()
self.setEnvOpts()
self.env.open(self.homeDir, self.envflags | db.DB_CREATE)
@@ -78,33 +67,6 @@ class BaseThreadedTestCase(unittest.TestCase):
def makeData(self, key):
return DASH.join([key] * 5)
- def _writerThread(self, *args, **kwargs):
- raise RuntimeError("must override this in a subclass")
-
- def _readerThread(self, *args, **kwargs):
- raise RuntimeError("must override this in a subclass")
-
- def writerThread(self, *args, **kwargs):
- try:
- self._writerThread(*args, **kwargs)
- except db.DBLockDeadlockError:
- if verbose:
- print(current_thread().name, 'died from', e)
- else:
- if verbose:
- print(current_thread().name, "finished.")
-
- def readerThread(self, *args, **kwargs):
- try:
- self._readerThread(*args, **kwargs)
- except db.DBLockDeadlockError as e:
- if verbose:
- print(current_thread().name, 'died from', e)
- else:
- if verbose:
- print(current_thread().name, "finished.")
-
-
#----------------------------------------------------------------------
@@ -121,60 +83,91 @@ class ConcurrentDataStoreBase(BaseThreadedTestCase):
print('\n', '-=' * 30)
print("Running %s.test01_1WriterMultiReaders..." % \
self.__class__.__name__)
- print('Using:', self.homeDir, self.filename)
- threads = []
- wt = Thread(target = self.writerThread,
- args = (self.d, self.records),
- name = 'the writer',
- )#verbose = verbose)
- threads.append(wt)
+ keys=list(range(self.records))
+ import random
+ random.shuffle(keys)
+ records_per_writer=self.records//self.writers
+ readers_per_writer=self.readers//self.writers
+ self.assertEqual(self.records,self.writers*records_per_writer)
+ self.assertEqual(self.readers,self.writers*readers_per_writer)
+ self.assertTrue((records_per_writer%readers_per_writer)==0)
+ readers = []
for x in range(self.readers):
rt = Thread(target = self.readerThread,
args = (self.d, x),
name = 'reader %d' % x,
)#verbose = verbose)
- threads.append(rt)
-
- for t in threads:
+ import sys
+ if sys.version_info[0] < 3 :
+ rt.setDaemon(True)
+ else :
+ rt.daemon = True
+ readers.append(rt)
+
+ writers=[]
+ for x in range(self.writers):
+ a=keys[records_per_writer*x:records_per_writer*(x+1)]
+ a.sort() # Generate conflicts
+ b=readers[readers_per_writer*x:readers_per_writer*(x+1)]
+ wt = Thread(target = self.writerThread,
+ args = (self.d, a, b),
+ name = 'writer %d' % x,
+ )#verbose = verbose)
+ writers.append(wt)
+
+ for t in writers:
+ import sys
+ if sys.version_info[0] < 3 :
+ t.setDaemon(True)
+ else :
+ t.daemon = True
t.start()
- for t in threads:
+
+ for t in writers:
t.join()
+ for t in readers:
+ t.join()
+
+ def writerThread(self, d, keys, readers):
+ import sys
+ if sys.version_info[0] < 3 :
+ name = currentThread().getName()
+ else :
+ name = currentThread().name
- def _writerThread(self, d, howMany):
- name = current_thread().name
- start = 0
- stop = howMany
if verbose:
- print(name+": creating records", start, "-", stop)
+ print("%s: creating records %d - %d" % (name, start, stop))
+
+ count=len(keys)//len(readers)
+ count2=count
+ for x in keys :
+ key = '%04d' % x
+ dbutils.DeadlockWrap(d.put, key, self.makeData(key),
+ max_retries=12)
+ if verbose and x % 100 == 0:
+ print("%s: records %d - %d finished" % (name, start, x))
- for x in range(start, stop):
- key = ('%04d' % x).encode("ascii")
- d.put(key, self.makeData(key))
- if verbose and x > start and x % 50 == 0:
- print(name+": records", start, "-", x, "finished")
+ count2-=1
+ if not count2 :
+ readers.pop().start()
+ count2=count
if verbose:
print("%s: finished creating records" % name)
-## # Each write-cursor will be exclusive, the only one that can update the DB...
-## if verbose: print "%s: deleting a few records" % name
-## c = d.cursor(flags = db.DB_WRITECURSOR)
-## for x in range(10):
-## key = int(random() * howMany) + start
-## key = '%04d' % key
-## if d.has_key(key):
-## c.set(key)
-## c.delete()
-
-## c.close()
+ if verbose:
+ print("%s: thread finished" % name)
- def _readerThread(self, d, readerNum):
- time.sleep(0.01 * readerNum)
- name = current_thread().name
+ def readerThread(self, d, readerNum):
+ import sys
+ if sys.version_info[0] < 3 :
+ name = currentThread().getName()
+ else :
+ name = currentThread().name
- for loop in range(5):
+ for i in range(5) :
c = d.cursor()
count = 0
rec = c.first()
@@ -182,24 +175,26 @@ class ConcurrentDataStoreBase(BaseThreadedTestCase):
count += 1
key, data = rec
self.assertEqual(self.makeData(key), data)
- rec = c.next()
+ rec = next(c)
if verbose:
- print(name+": found", count, "records")
+ print("%s: found %d records" % (name, count))
c.close()
- time.sleep(0.05)
+
+ if verbose:
+ print("%s: thread finished" % name)
class BTreeConcurrentDataStore(ConcurrentDataStoreBase):
dbtype = db.DB_BTREE
- writers = 1
+ writers = 2
readers = 10
records = 1000
class HashConcurrentDataStore(ConcurrentDataStoreBase):
dbtype = db.DB_HASH
- writers = 1
- readers = 0
+ writers = 2
+ readers = 10
records = 1000
@@ -208,8 +203,8 @@ class HashConcurrentDataStore(ConcurrentDataStoreBase):
class SimpleThreadedBase(BaseThreadedTestCase):
dbopenflags = db.DB_THREAD
envflags = db.DB_THREAD | db.DB_INIT_MPOOL | db.DB_INIT_LOCK
- readers = 5
- writers = 3
+ readers = 10
+ writers = 2
records = 1000
def setEnvOpts(self):
@@ -220,87 +215,98 @@ class SimpleThreadedBase(BaseThreadedTestCase):
print('\n', '-=' * 30)
print("Running %s.test02_SimpleLocks..." % self.__class__.__name__)
- threads = []
- for x in range(self.writers):
- wt = Thread(target = self.writerThread,
- args = (self.d, self.records, x),
- name = 'writer %d' % x,
- )#verbose = verbose)
- threads.append(wt)
+
+ keys=list(range(self.records))
+ import random
+ random.shuffle(keys)
+ records_per_writer=self.records//self.writers
+ readers_per_writer=self.readers//self.writers
+ self.assertEqual(self.records,self.writers*records_per_writer)
+ self.assertEqual(self.readers,self.writers*readers_per_writer)
+ self.assertTrue((records_per_writer%readers_per_writer)==0)
+
+ readers = []
for x in range(self.readers):
rt = Thread(target = self.readerThread,
args = (self.d, x),
name = 'reader %d' % x,
)#verbose = verbose)
- threads.append(rt)
-
- for t in threads:
+ import sys
+ if sys.version_info[0] < 3 :
+ rt.setDaemon(True)
+ else :
+ rt.daemon = True
+ readers.append(rt)
+
+ writers = []
+ for x in range(self.writers):
+ a=keys[records_per_writer*x:records_per_writer*(x+1)]
+ a.sort() # Generate conflicts
+ b=readers[readers_per_writer*x:readers_per_writer*(x+1)]
+ wt = Thread(target = self.writerThread,
+ args = (self.d, a, b),
+ name = 'writer %d' % x,
+ )#verbose = verbose)
+ writers.append(wt)
+
+ for t in writers:
+ import sys
+ if sys.version_info[0] < 3 :
+ t.setDaemon(True)
+ else :
+ t.daemon = True
t.start()
- for t in threads:
+
+ for t in writers:
+ t.join()
+ for t in readers:
t.join()
- def _writerThread(self, d, howMany, writerNum):
- name = current_thread().name
- start = howMany * writerNum
- stop = howMany * (writerNum + 1) - 1
+ def writerThread(self, d, keys, readers):
+ import sys
+ if sys.version_info[0] < 3 :
+ name = currentThread().getName()
+ else :
+ name = currentThread().name
if verbose:
print("%s: creating records %d - %d" % (name, start, stop))
- # create a bunch of records
- for x in range(start, stop):
- key = ('%04d' % x).encode("ascii")
+ count=len(keys)//len(readers)
+ count2=count
+ for x in keys :
+ key = '%04d' % x
dbutils.DeadlockWrap(d.put, key, self.makeData(key),
- max_retries=20)
+ max_retries=12)
if verbose and x % 100 == 0:
print("%s: records %d - %d finished" % (name, start, x))
- # do a bit or reading too
- if random() <= 0.05:
- for y in range(start, x):
- key = ('%04d' % x).encode("ascii")
- data = dbutils.DeadlockWrap(d.get, key, max_retries=20)
- self.assertEqual(data, self.makeData(key))
-
- # flush them
- try:
- dbutils.DeadlockWrap(d.sync, max_retries=20)
- except db.DBIncompleteError as val:
- if verbose:
- print("could not complete sync()...")
-
- # read them back, deleting a few
- for x in range(start, stop):
- key = ('%04d' % x).encode("ascii")
- data = dbutils.DeadlockWrap(d.get, key, max_retries=20)
- if verbose and x % 100 == 0:
- print("%s: fetched record (%s, %s)" % (name, key, data))
- self.assertEqual(data, self.makeData(key))
- if random() <= 0.10:
- dbutils.DeadlockWrap(d.delete, key, max_retries=20)
- if verbose:
- print("%s: deleted record %s" % (name, key))
+ count2-=1
+ if not count2 :
+ readers.pop().start()
+ count2=count
if verbose:
print("%s: thread finished" % name)
- def _readerThread(self, d, readerNum):
- time.sleep(0.01 * readerNum)
- name = current_thread().name
-
- for loop in range(5):
- c = d.cursor()
- count = 0
- rec = dbutils.DeadlockWrap(c.first, max_retries=20)
- while rec:
- count += 1
- key, data = rec
- self.assertEqual(self.makeData(key), data)
- rec = dbutils.DeadlockWrap(c.next, max_retries=20)
- if verbose:
- print("%s: found %d records" % (name, count))
- c.close()
- time.sleep(0.05)
+ def readerThread(self, d, readerNum):
+ import sys
+ if sys.version_info[0] < 3 :
+ name = currentThread().getName()
+ else :
+ name = currentThread().name
+
+ c = d.cursor()
+ count = 0
+ rec = dbutils.DeadlockWrap(c.first, max_retries=10)
+ while rec:
+ count += 1
+ key, data = rec
+ self.assertEqual(self.makeData(key), data)
+ rec = dbutils.DeadlockWrap(c.__next__, max_retries=10)
+ if verbose:
+ print("%s: found %d records" % (name, count))
+ c.close()
if verbose:
print("%s: thread finished" % name)
@@ -340,120 +346,118 @@ class ThreadedTransactionsBase(BaseThreadedTestCase):
print("Running %s.test03_ThreadedTransactions..." % \
self.__class__.__name__)
- threads = []
- for x in range(self.writers):
- wt = Thread(target = self.writerThread,
- args = (self.d, self.records, x),
- name = 'writer %d' % x,
- )#verbose = verbose)
- threads.append(wt)
+ keys=list(range(self.records))
+ import random
+ random.shuffle(keys)
+ records_per_writer=self.records//self.writers
+ readers_per_writer=self.readers//self.writers
+ self.assertEqual(self.records,self.writers*records_per_writer)
+ self.assertEqual(self.readers,self.writers*readers_per_writer)
+ self.assertTrue((records_per_writer%readers_per_writer)==0)
+ readers=[]
for x in range(self.readers):
rt = Thread(target = self.readerThread,
args = (self.d, x),
name = 'reader %d' % x,
)#verbose = verbose)
- threads.append(rt)
+ import sys
+ if sys.version_info[0] < 3 :
+ rt.setDaemon(True)
+ else :
+ rt.daemon = True
+ readers.append(rt)
+
+ writers = []
+ for x in range(self.writers):
+ a=keys[records_per_writer*x:records_per_writer*(x+1)]
+ b=readers[readers_per_writer*x:readers_per_writer*(x+1)]
+ wt = Thread(target = self.writerThread,
+ args = (self.d, a, b),
+ name = 'writer %d' % x,
+ )#verbose = verbose)
+ writers.append(wt)
dt = Thread(target = self.deadlockThread)
+ import sys
+ if sys.version_info[0] < 3 :
+ dt.setDaemon(True)
+ else :
+ dt.daemon = True
dt.start()
- for t in threads:
+ for t in writers:
+ import sys
+ if sys.version_info[0] < 3 :
+ t.setDaemon(True)
+ else :
+ t.daemon = True
t.start()
- for t in threads:
+
+ for t in writers:
+ t.join()
+ for t in readers:
t.join()
self.doLockDetect = False
dt.join()
- def doWrite(self, d, name, start, stop):
- finished = False
- while not finished:
+ def writerThread(self, d, keys, readers):
+ import sys
+ if sys.version_info[0] < 3 :
+ name = currentThread().getName()
+ else :
+ name = currentThread().name
+
+ count=len(keys)//len(readers)
+ while len(keys):
try:
txn = self.env.txn_begin(None, self.txnFlag)
- for x in range(start, stop):
- key = ('%04d' % x).encode("ascii")
+ keys2=keys[:count]
+ for x in keys2 :
+ key = '%04d' % x
d.put(key, self.makeData(key), txn)
if verbose and x % 100 == 0:
print("%s: records %d - %d finished" % (name, start, x))
txn.commit()
- finished = True
+ keys=keys[count:]
+ readers.pop().start()
except (db.DBLockDeadlockError, db.DBLockNotGrantedError) as val:
if verbose:
- print("%s: Aborting transaction (%s)" % (name, val))
+ print("%s: Aborting transaction (%s)" % (name, val[1]))
txn.abort()
- time.sleep(0.05)
- def _writerThread(self, d, howMany, writerNum):
- name = current_thread().name
- start = howMany * writerNum
- stop = howMany * (writerNum + 1) - 1
if verbose:
- print("%s: creating records %d - %d" % (name, start, stop))
-
- step = 100
- for x in range(start, stop, step):
- self.doWrite(d, name, x, min(stop, x+step))
+ print("%s: thread finished" % name)
- if verbose:
- print("%s: finished creating records" % name)
- if verbose:
- print("%s: deleting a few records" % name)
+ def readerThread(self, d, readerNum):
+ import sys
+ if sys.version_info[0] < 3 :
+ name = currentThread().getName()
+ else :
+ name = currentThread().name
finished = False
while not finished:
try:
- recs = []
txn = self.env.txn_begin(None, self.txnFlag)
- for x in range(10):
- key = int(random() * howMany) + start
- key = ('%04d' % key).encode("ascii")
- data = d.get(key, None, txn, db.DB_RMW)
- if data is not None:
- d.delete(key, txn)
- recs.append(key)
+ c = d.cursor(txn)
+ count = 0
+ rec = c.first()
+ while rec:
+ count += 1
+ key, data = rec
+ self.assertEqual(self.makeData(key), data)
+ rec = next(c)
+ if verbose: print("%s: found %d records" % (name, count))
+ c.close()
txn.commit()
finished = True
- if verbose:
- print("%s: deleted records %s" % (name, recs))
except (db.DBLockDeadlockError, db.DBLockNotGrantedError) as val:
if verbose:
- print("%s: Aborting transaction (%s)" % (name, val))
+ print("%s: Aborting transaction (%s)" % (name, val[1]))
+ c.close()
txn.abort()
- time.sleep(0.05)
-
- if verbose:
- print("%s: thread finished" % name)
-
- def _readerThread(self, d, readerNum):
- time.sleep(0.01 * readerNum + 0.05)
- name = current_thread().name
-
- for loop in range(5):
- finished = False
- while not finished:
- try:
- txn = self.env.txn_begin(None, self.txnFlag)
- c = d.cursor(txn)
- count = 0
- rec = c.first()
- while rec:
- count += 1
- key, data = rec
- self.assertEqual(self.makeData(key), data)
- rec = c.next()
- if verbose: print("%s: found %d records" % (name, count))
- c.close()
- txn.commit()
- finished = True
- except (db.DBLockDeadlockError, db.DBLockNotGrantedError) as val:
- if verbose:
- print("%s: Aborting transaction (%s)" % (name, val))
- c.close()
- txn.abort()
- time.sleep(0.05)
-
- time.sleep(0.05)
if verbose:
print("%s: thread finished" % name)
@@ -461,7 +465,7 @@ class ThreadedTransactionsBase(BaseThreadedTestCase):
def deadlockThread(self):
self.doLockDetect = True
while self.doLockDetect:
- time.sleep(0.5)
+ time.sleep(0.05)
try:
aborted = self.env.lock_detect(
db.DB_LOCK_RANDOM, db.DB_LOCK_CONFLICT)
@@ -474,28 +478,28 @@ class ThreadedTransactionsBase(BaseThreadedTestCase):
class BTreeThreadedTransactions(ThreadedTransactionsBase):
dbtype = db.DB_BTREE
- writers = 3
- readers = 5
- records = 2000
+ writers = 2
+ readers = 10
+ records = 1000
class HashThreadedTransactions(ThreadedTransactionsBase):
dbtype = db.DB_HASH
- writers = 1
- readers = 5
- records = 2000
+ writers = 2
+ readers = 10
+ records = 1000
class BTreeThreadedNoWaitTransactions(ThreadedTransactionsBase):
dbtype = db.DB_BTREE
- writers = 3
- readers = 5
- records = 2000
+ writers = 2
+ readers = 10
+ records = 1000
txnFlag = db.DB_TXN_NOWAIT
class HashThreadedNoWaitTransactions(ThreadedTransactionsBase):
dbtype = db.DB_HASH
- writers = 1
- readers = 5
- records = 2000
+ writers = 2
+ readers = 10
+ records = 1000
txnFlag = db.DB_TXN_NOWAIT