summaryrefslogtreecommitdiffstats
path: root/Lib/bsddb/test/test_thread.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/bsddb/test/test_thread.py')
-rw-r--r--Lib/bsddb/test/test_thread.py140
1 files changed, 69 insertions, 71 deletions
diff --git a/Lib/bsddb/test/test_thread.py b/Lib/bsddb/test/test_thread.py
index fb2ba1c..14f8d26 100644
--- a/Lib/bsddb/test/test_thread.py
+++ b/Lib/bsddb/test/test_thread.py
@@ -1,23 +1,31 @@
-"""
-TestCases for multi-threaded access to a DB.
+"""TestCases for multi-threaded access to a DB.
"""
-import sys, os, string
-import tempfile
+import os
+import sys
import time
+import errno
+import shutil
+import tempfile
from pprint import pprint
from whrandom import random
try:
+ True, False
+except NameError:
+ True = 1
+ False = 0
+
+DASH = '-'
+
+try:
from threading import Thread, currentThread
- have_threads = 1
+ have_threads = True
except ImportError:
- have_threads = 0
-
+ have_threads = False
import unittest
from test_all import verbose
-
from bsddb import db, dbutils
#----------------------------------------------------------------------
@@ -28,15 +36,16 @@ class BaseThreadedTestCase(unittest.TestCase):
dbsetflags = 0
envflags = 0
-
def setUp(self):
if verbose:
dbutils._deadlock_VerboseFile = sys.stdout
homeDir = os.path.join(os.path.dirname(sys.argv[0]), 'db_home')
self.homeDir = homeDir
- try: os.mkdir(homeDir)
- except os.error: pass
+ try:
+ os.mkdir(homeDir)
+ except OSError, e:
+ if e.errno <> errno.EEXIST: raise
self.env = db.DBEnv()
self.setEnvOpts()
self.env.open(homeDir, self.envflags | db.DB_CREATE)
@@ -47,22 +56,16 @@ class BaseThreadedTestCase(unittest.TestCase):
self.d.set_flags(self.dbsetflags)
self.d.open(self.filename, self.dbtype, self.dbopenflags|db.DB_CREATE)
-
def tearDown(self):
self.d.close()
self.env.close()
- import glob
- files = glob.glob(os.path.join(self.homeDir, '*'))
- for file in files:
- os.remove(file)
-
+ shutil.rmtree(self.homeDir)
def setEnvOpts(self):
pass
-
def makeData(self, key):
- return string.join([key] * 5, '-')
+ return DASH.join([key] * 5)
#----------------------------------------------------------------------
@@ -75,7 +78,6 @@ class ConcurrentDataStoreBase(BaseThreadedTestCase):
writers = 0
records = 1000
-
def test01_1WriterMultiReaders(self):
if verbose:
print '\n', '-=' * 30
@@ -102,11 +104,11 @@ class ConcurrentDataStoreBase(BaseThreadedTestCase):
for t in threads:
t.join()
-
def writerThread(self, d, howMany, writerNum):
#time.sleep(0.01 * writerNum + 0.01)
name = currentThread().getName()
- start, stop = howMany * writerNum, howMany * (writerNum + 1) - 1
+ start = howMany * writerNum
+ stop = howMany * (writerNum + 1) - 1
if verbose:
print "%s: creating records %d - %d" % (name, start, stop)
@@ -117,7 +119,8 @@ class ConcurrentDataStoreBase(BaseThreadedTestCase):
if verbose and x % 100 == 0:
print "%s: records %d - %d finished" % (name, start, x)
- if verbose: print "%s: finished creating records" % name
+ if verbose:
+ print "%s: finished creating records" % name
## # Each write-cursor will be exclusive, the only one that can update the DB...
## if verbose: print "%s: deleting a few records" % name
@@ -130,8 +133,8 @@ class ConcurrentDataStoreBase(BaseThreadedTestCase):
## c.delete()
## c.close()
- if verbose: print "%s: thread finished" % name
-
+ if verbose:
+ print "%s: thread finished" % name
def readerThread(self, d, readerNum):
time.sleep(0.01 * readerNum)
@@ -142,16 +145,17 @@ class ConcurrentDataStoreBase(BaseThreadedTestCase):
count = 0
rec = c.first()
while rec:
- count = count + 1
+ count += 1
key, data = rec
- assert self.makeData(key) == data
+ self.assertEqual(self.makeData(key), data)
rec = c.next()
- if verbose: print "%s: found %d records" % (name, count)
+ if verbose:
+ print "%s: found %d records" % (name, count)
c.close()
time.sleep(0.05)
- if verbose: print "%s: thread finished" % name
-
+ if verbose:
+ print "%s: thread finished" % name
class BTreeConcurrentDataStore(ConcurrentDataStoreBase):
@@ -167,6 +171,7 @@ class HashConcurrentDataStore(ConcurrentDataStoreBase):
readers = 10
records = 1000
+
#----------------------------------------------------------------------
class SimpleThreadedBase(BaseThreadedTestCase):
@@ -176,11 +181,9 @@ class SimpleThreadedBase(BaseThreadedTestCase):
writers = 3
records = 1000
-
def setEnvOpts(self):
self.env.set_lk_detect(db.DB_LOCK_DEFAULT)
-
def test02_SimpleLocks(self):
if verbose:
print '\n', '-=' * 30
@@ -205,11 +208,10 @@ class SimpleThreadedBase(BaseThreadedTestCase):
for t in threads:
t.join()
-
-
def writerThread(self, d, howMany, writerNum):
name = currentThread().getName()
- start, stop = howMany * writerNum, howMany * (writerNum + 1) - 1
+ start = howMany * writerNum
+ stop = howMany * (writerNum + 1) - 1
if verbose:
print "%s: creating records %d - %d" % (name, start, stop)
@@ -227,7 +229,7 @@ class SimpleThreadedBase(BaseThreadedTestCase):
for y in xrange(start, x):
key = '%04d' % x
data = dbutils.DeadlockWrap(d.get, key, max_retries=12)
- assert data == self.makeData(key)
+ self.assertEqual(data, self.makeData(key))
# flush them
try:
@@ -242,14 +244,14 @@ class SimpleThreadedBase(BaseThreadedTestCase):
data = dbutils.DeadlockWrap(d.get, key, max_retries=12)
if verbose and x % 100 == 0:
print "%s: fetched record (%s, %s)" % (name, key, data)
- assert data == self.makeData(key), (key, data, self.makeData(key))
+ self.assertEqual(data, self.makeData(key))
if random() <= 0.10:
dbutils.DeadlockWrap(d.delete, key, max_retries=12)
if verbose:
print "%s: deleted record %s" % (name, key)
- if verbose: print "%s: thread finished" % name
-
+ if verbose:
+ print "%s: thread finished" % name
def readerThread(self, d, readerNum):
time.sleep(0.01 * readerNum)
@@ -260,17 +262,17 @@ class SimpleThreadedBase(BaseThreadedTestCase):
count = 0
rec = c.first()
while rec:
- count = count + 1
+ count += 1
key, data = rec
- assert self.makeData(key) == data
+ self.assertEqual(self.makeData(key), data)
rec = c.next()
- if verbose: print "%s: found %d records" % (name, count)
+ if verbose:
+ print "%s: found %d records" % (name, count)
c.close()
time.sleep(0.05)
- if verbose: print "%s: thread finished" % name
-
-
+ if verbose:
+ print "%s: thread finished" % name
class BTreeSimpleThreaded(SimpleThreadedBase):
@@ -284,7 +286,6 @@ class HashSimpleThreaded(SimpleThreadedBase):
#----------------------------------------------------------------------
-
class ThreadedTransactionsBase(BaseThreadedTestCase):
dbopenflags = db.DB_THREAD | db.DB_AUTO_COMMIT
envflags = (db.DB_THREAD |
@@ -296,15 +297,12 @@ class ThreadedTransactionsBase(BaseThreadedTestCase):
readers = 0
writers = 0
records = 2000
-
txnFlag = 0
-
def setEnvOpts(self):
#self.env.set_lk_detect(db.DB_LOCK_DEFAULT)
pass
-
def test03_ThreadedTransactions(self):
if verbose:
print '\n', '-=' * 30
@@ -334,12 +332,11 @@ class ThreadedTransactionsBase(BaseThreadedTestCase):
for t in threads:
t.join()
- self.doLockDetect = 0
+ self.doLockDetect = False
dt.join()
-
def doWrite(self, d, name, start, stop):
- finished = 0
+ finished = False
while not finished:
try:
txn = self.env.txn_begin(None, self.txnFlag)
@@ -349,18 +346,17 @@ class ThreadedTransactionsBase(BaseThreadedTestCase):
if verbose and x % 100 == 0:
print "%s: records %d - %d finished" % (name, start, x)
txn.commit()
- finished = 1
+ finished = True
except (db.DBLockDeadlockError, db.DBLockNotGrantedError), val:
if verbose:
print "%s: Aborting transaction (%s)" % (name, val[1])
txn.abort()
time.sleep(0.05)
-
-
def writerThread(self, d, howMany, writerNum):
name = currentThread().getName()
- start, stop = howMany * writerNum, howMany * (writerNum + 1) - 1
+ start = howMany * writerNum
+ stop = howMany * (writerNum + 1) - 1
if verbose:
print "%s: creating records %d - %d" % (name, start, stop)
@@ -368,10 +364,12 @@ class ThreadedTransactionsBase(BaseThreadedTestCase):
for x in range(start, stop, step):
self.doWrite(d, name, x, min(stop, x+step))
- if verbose: print "%s: finished creating records" % name
- if verbose: print "%s: deleting a few records" % name
+ if verbose:
+ print "%s: finished creating records" % name
+ if verbose:
+ print "%s: deleting a few records" % name
- finished = 0
+ finished = False
while not finished:
try:
recs = []
@@ -384,23 +382,24 @@ class ThreadedTransactionsBase(BaseThreadedTestCase):
d.delete(key, txn)
recs.append(key)
txn.commit()
- finished = 1
- if verbose: print "%s: deleted records %s" % (name, recs)
+ finished = True
+ if verbose:
+ print "%s: deleted records %s" % (name, recs)
except (db.DBLockDeadlockError, db.DBLockNotGrantedError), val:
if verbose:
print "%s: Aborting transaction (%s)" % (name, val[1])
txn.abort()
time.sleep(0.05)
- if verbose: print "%s: thread finished" % name
-
+ if verbose:
+ print "%s: thread finished" % name
def readerThread(self, d, readerNum):
time.sleep(0.01 * readerNum + 0.05)
name = currentThread().getName()
for loop in range(5):
- finished = 0
+ finished = False
while not finished:
try:
txn = self.env.txn_begin(None, self.txnFlag)
@@ -408,14 +407,14 @@ class ThreadedTransactionsBase(BaseThreadedTestCase):
count = 0
rec = c.first()
while rec:
- count = count + 1
+ count += 1
key, data = rec
- assert self.makeData(key) == data
+ self.assertEqual(self.makeData(key), data)
rec = c.next()
if verbose: print "%s: found %d records" % (name, count)
c.close()
txn.commit()
- finished = 1
+ finished = True
except (db.DBLockDeadlockError, db.DBLockNotGrantedError), val:
if verbose:
print "%s: Aborting transaction (%s)" % (name, val[1])
@@ -425,11 +424,11 @@ class ThreadedTransactionsBase(BaseThreadedTestCase):
time.sleep(0.05)
- if verbose: print "%s: thread finished" % name
-
+ if verbose:
+ print "%s: thread finished" % name
def deadlockThread(self):
- self.doLockDetect = 1
+ self.doLockDetect = True
while self.doLockDetect:
time.sleep(0.5)
try:
@@ -442,7 +441,6 @@ class ThreadedTransactionsBase(BaseThreadedTestCase):
pass
-
class BTreeThreadedTransactions(ThreadedTransactionsBase):
dbtype = db.DB_BTREE
writers = 3