summaryrefslogtreecommitdiffstats
path: root/Lib/bsddb/test
diff options
context:
space:
mode:
authorJesus Cea <jcea@jcea.es>2008-07-23 11:38:42 (GMT)
committerJesus Cea <jcea@jcea.es>2008-07-23 11:38:42 (GMT)
commitc5a11fabdb03d032f278f86e600bcdaafdb5d783 (patch)
treee7db234112c7ac06aebe9c1e7833b0dd4168c664 /Lib/bsddb/test
parent30e208d525cd472446feb91b774cf472e7122918 (diff)
downloadcpython-c5a11fabdb03d032f278f86e600bcdaafdb5d783.zip
cpython-c5a11fabdb03d032f278f86e600bcdaafdb5d783.tar.gz
cpython-c5a11fabdb03d032f278f86e600bcdaafdb5d783.tar.bz2
bsddb module updated to version 4.7.2devel9.
This patch publishes the work done until now for Python 3.0 compatibility. Still a lot to be done. When possible, we use 3.0 features in Python 2.6, easing development and testing, and exposing internal changes to a wider audience, for better test coverage. Some mode details: http://www.jcea.es/programacion/pybsddb.htm#bsddb3-4.7.2
Diffstat (limited to 'Lib/bsddb/test')
-rw-r--r--Lib/bsddb/test/test_all.py23
-rw-r--r--Lib/bsddb/test/test_associate.py35
-rw-r--r--Lib/bsddb/test/test_basics.py75
-rw-r--r--Lib/bsddb/test/test_compare.py5
-rw-r--r--Lib/bsddb/test/test_lock.py23
-rw-r--r--Lib/bsddb/test/test_misc.py18
-rw-r--r--Lib/bsddb/test/test_replication.py262
-rw-r--r--Lib/bsddb/test/test_thread.py35
8 files changed, 378 insertions, 98 deletions
diff --git a/Lib/bsddb/test/test_all.py b/Lib/bsddb/test/test_all.py
index 11043a2..7ad9a6f 100644
--- a/Lib/bsddb/test/test_all.py
+++ b/Lib/bsddb/test/test_all.py
@@ -7,15 +7,24 @@ import unittest
try:
# For Pythons w/distutils pybsddb
from bsddb3 import db
+ import bsddb3 as bsddb
except ImportError:
# For Python 2.3
from bsddb import db
+ import bsddb
try:
from bsddb3 import test_support
except ImportError:
from test import test_support
+try:
+ from threading import Thread, currentThread
+ del Thread, currentThread
+ have_threads = True
+except ImportError:
+ have_threads = False
+
verbose = 0
if 'verbose' in sys.argv:
verbose = 1
@@ -33,6 +42,8 @@ def print_versions():
print 'bsddb.db.version(): %s' % (db.version(), )
print 'bsddb.db.__version__: %s' % db.__version__
print 'bsddb.db.cvsid: %s' % db.cvsid
+ print 'py module: %s' % bsddb.__file__
+ print 'extension module: %s' % bsddb._bsddb.__file__
print 'python version: %s' % sys.version
print 'My pid: %s' % os.getpid()
print '-=' * 38
@@ -81,11 +92,11 @@ def set_test_path_prefix(path) :
def remove_test_path_directory() :
test_support.rmtree(get_new_path.prefix)
-try :
+if have_threads :
import threading
get_new_path.mutex=threading.Lock()
del threading
-except ImportError:
+else :
class Lock(object) :
def acquire(self) :
pass
@@ -104,8 +115,12 @@ class PrintInfoFakeTest(unittest.TestCase):
# This little hack is for when this module is run as main and all the
# other modules import it so they will still be able to get the right
# verbose setting. It's confusing but it works.
-import test_all
-test_all.verbose = verbose
+if sys.version_info[0] < 3 :
+ import test_all
+ test_all.verbose = verbose
+else :
+ import sys
+ print >>sys.stderr, "Work to do!"
def suite(module_prefix='', timing_check=None):
diff --git a/Lib/bsddb/test/test_associate.py b/Lib/bsddb/test/test_associate.py
index 10907e3..935f10a 100644
--- a/Lib/bsddb/test/test_associate.py
+++ b/Lib/bsddb/test/test_associate.py
@@ -6,14 +6,8 @@ import sys, os, string
import time
from pprint import pprint
-try:
- from threading import Thread, currentThread
- have_threads = 1
-except ImportError:
- have_threads = 0
-
import unittest
-from test_all import verbose, get_new_environment_path
+from test_all import verbose, have_threads, get_new_environment_path
try:
# For Pythons w/distutils pybsddb
@@ -435,24 +429,23 @@ class ThreadedAssociateRecnoTestCase(ShelveAssociateTestCase):
def test_suite():
suite = unittest.TestSuite()
- if db.version() >= (3, 3, 11):
- suite.addTest(unittest.makeSuite(AssociateErrorTestCase))
+ suite.addTest(unittest.makeSuite(AssociateErrorTestCase))
- suite.addTest(unittest.makeSuite(AssociateHashTestCase))
- suite.addTest(unittest.makeSuite(AssociateBTreeTestCase))
- suite.addTest(unittest.makeSuite(AssociateRecnoTestCase))
+ suite.addTest(unittest.makeSuite(AssociateHashTestCase))
+ suite.addTest(unittest.makeSuite(AssociateBTreeTestCase))
+ suite.addTest(unittest.makeSuite(AssociateRecnoTestCase))
- if db.version() >= (4, 1):
- suite.addTest(unittest.makeSuite(AssociateBTreeTxnTestCase))
+ if db.version() >= (4, 1):
+ suite.addTest(unittest.makeSuite(AssociateBTreeTxnTestCase))
- suite.addTest(unittest.makeSuite(ShelveAssociateHashTestCase))
- suite.addTest(unittest.makeSuite(ShelveAssociateBTreeTestCase))
- suite.addTest(unittest.makeSuite(ShelveAssociateRecnoTestCase))
+ suite.addTest(unittest.makeSuite(ShelveAssociateHashTestCase))
+ suite.addTest(unittest.makeSuite(ShelveAssociateBTreeTestCase))
+ suite.addTest(unittest.makeSuite(ShelveAssociateRecnoTestCase))
- if have_threads:
- suite.addTest(unittest.makeSuite(ThreadedAssociateHashTestCase))
- suite.addTest(unittest.makeSuite(ThreadedAssociateBTreeTestCase))
- suite.addTest(unittest.makeSuite(ThreadedAssociateRecnoTestCase))
+ if have_threads:
+ suite.addTest(unittest.makeSuite(ThreadedAssociateHashTestCase))
+ suite.addTest(unittest.makeSuite(ThreadedAssociateBTreeTestCase))
+ suite.addTest(unittest.makeSuite(ThreadedAssociateRecnoTestCase))
return suite
diff --git a/Lib/bsddb/test/test_basics.py b/Lib/bsddb/test/test_basics.py
index a234d2a..285e3a8 100644
--- a/Lib/bsddb/test/test_basics.py
+++ b/Lib/bsddb/test/test_basics.py
@@ -101,14 +101,14 @@ class BasicTestCase(unittest.TestCase):
def populateDB(self, _txn=None):
d = self.d
- for x in range(self._numKeys/2):
+ for x in range(self._numKeys//2):
key = '%04d' % (self._numKeys - x) # insert keys in reverse order
data = self.makeData(key)
d.put(key, data, _txn)
d.put('empty value', '', _txn)
- for x in range(self._numKeys/2-1):
+ for x in range(self._numKeys//2-1):
key = '%04d' % x # and now some in forward order
data = self.makeData(key)
d.put(key, data, _txn)
@@ -536,10 +536,6 @@ class BasicTestCase(unittest.TestCase):
#----------------------------------------
def test06_Truncate(self):
- if db.version() < (3,3):
- # truncate is a feature of Berkeley DB 3.3 and above
- return
-
d = self.d
if verbose:
print '\n', '-=' * 30
@@ -681,12 +677,11 @@ class BasicTransactionTestCase(BasicTestCase):
except db.DBIncompleteError:
pass
- if db.version() >= (4,0):
- statDict = self.env.log_stat(0);
- self.assert_(statDict.has_key('magic'))
- self.assert_(statDict.has_key('version'))
- self.assert_(statDict.has_key('cur_file'))
- self.assert_(statDict.has_key('region_nowait'))
+ statDict = self.env.log_stat(0);
+ self.assert_(statDict.has_key('magic'))
+ self.assert_(statDict.has_key('version'))
+ self.assert_(statDict.has_key('cur_file'))
+ self.assert_(statDict.has_key('region_nowait'))
# must have at least one log file present:
logs = self.env.log_archive(db.DB_ARCH_ABS | db.DB_ARCH_LOG)
@@ -703,10 +698,6 @@ class BasicTransactionTestCase(BasicTestCase):
#----------------------------------------
def test07_TxnTruncate(self):
- if db.version() < (3,3):
- # truncate is a feature of Berkeley DB 3.3 and above
- return
-
d = self.d
if verbose:
print '\n', '-=' * 30
@@ -956,6 +947,55 @@ class HashMultiDBTestCase(BasicMultiDBTestCase):
envflags = db.DB_THREAD | db.DB_INIT_MPOOL | db.DB_INIT_LOCK
+class PrivateObject(unittest.TestCase) :
+ import sys
+ if sys.version_info[:3] < (2, 4, 0):
+ def assertTrue(self, expr, msg=None):
+ self.failUnless(expr,msg=msg)
+
+ def tearDown(self) :
+ del self.obj
+
+ def test01_DefaultIsNone(self) :
+ self.assertEqual(self.obj.get_private(), None)
+
+ def test02_assignment(self) :
+ a = "example of private object"
+ self.obj.set_private(a)
+ b = self.obj.get_private()
+ self.assertTrue(a is b) # Object identity
+
+ def test03_leak_assignment(self) :
+ import sys
+ a = "example of private object"
+ refcount = sys.getrefcount(a)
+ self.obj.set_private(a)
+ self.assertEqual(refcount+1, sys.getrefcount(a))
+ self.obj.set_private(None)
+ self.assertEqual(refcount, sys.getrefcount(a))
+
+ def test04_leak_GC(self) :
+ import sys
+ a = "example of private object"
+ refcount = sys.getrefcount(a)
+ self.obj.set_private(a)
+ self.obj = None
+ self.assertEqual(refcount, sys.getrefcount(a))
+
+class DBEnvPrivateObject(PrivateObject) :
+ def setUp(self) :
+ self.obj = db.DBEnv()
+
+class DBPrivateObject(PrivateObject) :
+ def setUp(self) :
+ self.obj = db.DB()
+
+class CrashAndBurn(unittest.TestCase) :
+ def test01_OpenCrash(self) :
+ # See http://bugs.python.org/issue3307
+ self.assertRaises(db.DBInvalidArgError, db.DB, None, 65535)
+
+
#----------------------------------------------------------------------
#----------------------------------------------------------------------
@@ -979,6 +1019,9 @@ def test_suite():
suite.addTest(unittest.makeSuite(HashDUPWithThreadTestCase))
suite.addTest(unittest.makeSuite(BTreeMultiDBTestCase))
suite.addTest(unittest.makeSuite(HashMultiDBTestCase))
+ suite.addTest(unittest.makeSuite(DBEnvPrivateObject))
+ suite.addTest(unittest.makeSuite(DBPrivateObject))
+ #suite.addTest(unittest.makeSuite(CrashAndBurn))
return suite
diff --git a/Lib/bsddb/test/test_compare.py b/Lib/bsddb/test/test_compare.py
index 940e56e..e221182 100644
--- a/Lib/bsddb/test/test_compare.py
+++ b/Lib/bsddb/test/test_compare.py
@@ -240,9 +240,8 @@ def test_suite ():
res = unittest.TestSuite ()
res.addTest (unittest.makeSuite (ComparatorTests))
- if db.version () >= (3, 3, 11):
- res.addTest (unittest.makeSuite (BtreeExceptionsTestCase))
- res.addTest (unittest.makeSuite (BtreeKeyCompareTestCase))
+ res.addTest (unittest.makeSuite (BtreeExceptionsTestCase))
+ res.addTest (unittest.makeSuite (BtreeKeyCompareTestCase))
return res
if __name__ == '__main__':
diff --git a/Lib/bsddb/test/test_lock.py b/Lib/bsddb/test/test_lock.py
index 9b9ce7b..64a054c 100644
--- a/Lib/bsddb/test/test_lock.py
+++ b/Lib/bsddb/test/test_lock.py
@@ -4,15 +4,11 @@ TestCases for testing the locking sub-system.
import time
-try:
- from threading import Thread, currentThread
- have_threads = 1
-except ImportError:
- have_threads = 0
-
-
import unittest
-from test_all import verbose, get_new_environment_path, get_new_database_path
+from test_all import verbose, have_threads, get_new_environment_path, get_new_database_path
+
+if have_threads :
+ from threading import Thread, currentThread
try:
# For Pythons w/distutils pybsddb
@@ -62,8 +58,7 @@ class LockingTestCase(unittest.TestCase):
self.env.lock_put(lock)
if verbose:
print "Released lock: %s" % lock
- if db.version() >= (4,0):
- self.env.lock_id_free(anID)
+ self.env.lock_id_free(anID)
def test02_threaded(self):
@@ -132,9 +127,8 @@ class LockingTestCase(unittest.TestCase):
self.env.lock_put(lock)
t.join()
- if db.version() >= (4,0):
- self.env.lock_id_free(anID)
- self.env.lock_id_free(anID2)
+ self.env.lock_id_free(anID)
+ self.env.lock_id_free(anID2)
if db.version() >= (4,6):
self.assertTrue(deadlock_detection.count>0)
@@ -159,8 +153,7 @@ class LockingTestCase(unittest.TestCase):
if verbose:
print "%s: Released %s lock: %s" % (name, lt, lock)
- if db.version() >= (4,0):
- self.env.lock_id_free(anID)
+ self.env.lock_id_free(anID)
#----------------------------------------------------------------------
diff --git a/Lib/bsddb/test/test_misc.py b/Lib/bsddb/test/test_misc.py
index 1da5830..dce7e0c 100644
--- a/Lib/bsddb/test/test_misc.py
+++ b/Lib/bsddb/test/test_misc.py
@@ -47,6 +47,18 @@ class MiscTestCase(unittest.TestCase):
rp = repr(db)
self.assertEquals(rp, "{}")
+ def test04_repr_db(self) :
+ db = hashopen(self.filename)
+ d = {}
+ for i in xrange(100) :
+ db[repr(i)] = repr(100*i)
+ d[repr(i)] = repr(100*i)
+ db.close()
+ db = hashopen(self.filename)
+ rp = repr(db)
+ self.assertEquals(rp, repr(d))
+ db.close()
+
# http://sourceforge.net/tracker/index.php?func=detail&aid=1708868&group_id=13900&atid=313900
#
# See the bug report for details.
@@ -54,7 +66,7 @@ class MiscTestCase(unittest.TestCase):
# The problem was that make_key_dbt() was not allocating a copy of
# string keys but FREE_DBT() was always being told to free it when the
# database was opened with DB_THREAD.
- def test04_double_free_make_key_dbt(self):
+ def test05_double_free_make_key_dbt(self):
try:
db1 = db.DB()
db1.open(self.filename, None, db.DB_BTREE,
@@ -67,7 +79,7 @@ class MiscTestCase(unittest.TestCase):
db1.close()
os.unlink(self.filename)
- def test05_key_with_null_bytes(self):
+ def test06_key_with_null_bytes(self):
try:
db1 = db.DB()
db1.open(self.filename, None, db.DB_HASH, db.DB_CREATE)
@@ -86,7 +98,7 @@ class MiscTestCase(unittest.TestCase):
db1.close()
os.unlink(self.filename)
- def test_DB_set_flags_persists(self):
+ def test07_DB_set_flags_persists(self):
if db.version() < (4,2):
# The get_flags API required for this to work is only available
# in Berkeley DB >= 4.2
diff --git a/Lib/bsddb/test/test_replication.py b/Lib/bsddb/test/test_replication.py
index 25ca34a..8f2651f 100644
--- a/Lib/bsddb/test/test_replication.py
+++ b/Lib/bsddb/test/test_replication.py
@@ -12,7 +12,7 @@ except ImportError:
# For Python 2.3
from bsddb import db
-from test_all import get_new_environment_path, get_new_database_path
+from test_all import have_threads, get_new_environment_path, get_new_database_path
try:
from bsddb3 import test_support
@@ -58,6 +58,25 @@ class DBReplicationManager(unittest.TestCase):
self.dbenvMaster.set_event_notify(confirmed_master)
self.dbenvClient.set_event_notify(client_startupdone)
+ #self.dbenvMaster.set_verbose(db.DB_VERB_REPLICATION, True)
+ #self.dbenvMaster.set_verbose(db.DB_VERB_FILEOPS_ALL, True)
+ #self.dbenvClient.set_verbose(db.DB_VERB_REPLICATION, True)
+ #self.dbenvClient.set_verbose(db.DB_VERB_FILEOPS_ALL, True)
+
+ self.dbMaster = self.dbClient = None
+
+
+ def tearDown(self):
+ if self.dbClient :
+ self.dbClient.close()
+ if self.dbMaster :
+ self.dbMaster.close()
+ self.dbenvClient.close()
+ self.dbenvMaster.close()
+ test_support.rmtree(self.homeDirClient)
+ test_support.rmtree(self.homeDirMaster)
+
+ def test01_basic_replication(self) :
master_port = test_support.find_unused_port()
self.dbenvMaster.repmgr_set_local_site("127.0.0.1", master_port)
client_port = test_support.find_unused_port()
@@ -69,6 +88,27 @@ class DBReplicationManager(unittest.TestCase):
self.dbenvMaster.rep_set_priority(10)
self.dbenvClient.rep_set_priority(0)
+ self.dbenvMaster.rep_set_timeout(db.DB_REP_CONNECTION_RETRY,100123)
+ self.dbenvClient.rep_set_timeout(db.DB_REP_CONNECTION_RETRY,100321)
+ self.assertEquals(self.dbenvMaster.rep_get_timeout(
+ db.DB_REP_CONNECTION_RETRY), 100123)
+ self.assertEquals(self.dbenvClient.rep_get_timeout(
+ db.DB_REP_CONNECTION_RETRY), 100321)
+
+ self.dbenvMaster.rep_set_timeout(db.DB_REP_ELECTION_TIMEOUT, 100234)
+ self.dbenvClient.rep_set_timeout(db.DB_REP_ELECTION_TIMEOUT, 100432)
+ self.assertEquals(self.dbenvMaster.rep_get_timeout(
+ db.DB_REP_ELECTION_TIMEOUT), 100234)
+ self.assertEquals(self.dbenvClient.rep_get_timeout(
+ db.DB_REP_ELECTION_TIMEOUT), 100432)
+
+ self.dbenvMaster.rep_set_timeout(db.DB_REP_ELECTION_RETRY, 100345)
+ self.dbenvClient.rep_set_timeout(db.DB_REP_ELECTION_RETRY, 100543)
+ self.assertEquals(self.dbenvMaster.rep_get_timeout(
+ db.DB_REP_ELECTION_RETRY), 100345)
+ self.assertEquals(self.dbenvClient.rep_get_timeout(
+ db.DB_REP_ELECTION_RETRY), 100543)
+
self.dbenvMaster.repmgr_set_ack_policy(db.DB_REPMGR_ACKS_ALL)
self.dbenvClient.repmgr_set_ack_policy(db.DB_REPMGR_ACKS_ALL)
@@ -84,23 +124,14 @@ class DBReplicationManager(unittest.TestCase):
self.assertEquals(self.dbenvClient.repmgr_get_ack_policy(),
db.DB_REPMGR_ACKS_ALL)
- #self.dbenvMaster.set_verbose(db.DB_VERB_REPLICATION, True)
- #self.dbenvMaster.set_verbose(db.DB_VERB_FILEOPS_ALL, True)
- #self.dbenvClient.set_verbose(db.DB_VERB_REPLICATION, True)
- #self.dbenvClient.set_verbose(db.DB_VERB_FILEOPS_ALL, True)
-
- self.dbMaster = self.dbClient = None
-
# The timeout is necessary in BDB 4.5, since DB_EVENT_REP_STARTUPDONE
# is not generated if the master has no new transactions.
# This is solved in BDB 4.6 (#15542).
- timeout = time.time()+10
+ import time
+ timeout = time.time()+2
while (time.time()<timeout) and not (self.confirmed_master and self.client_startupdone) :
time.sleep(0.02)
- if db.version() >= (4,6) :
- self.assertTrue(time.time()<timeout)
- else :
- self.assertTrue(time.time()>=timeout)
+ self.assertTrue(time.time()<timeout)
d = self.dbenvMaster.repmgr_site_list()
self.assertEquals(len(d), 1)
@@ -120,17 +151,158 @@ class DBReplicationManager(unittest.TestCase):
d = self.dbenvMaster.repmgr_stat(flags=db.DB_STAT_CLEAR);
self.assertTrue("msgs_queued" in d)
+ self.dbMaster=db.DB(self.dbenvMaster)
+ txn=self.dbenvMaster.txn_begin()
+ self.dbMaster.open("test", db.DB_HASH, db.DB_CREATE, 0666, txn=txn)
+ txn.commit()
+
+ import time,os.path
+ timeout=time.time()+10
+ while (time.time()<timeout) and \
+ not (os.path.exists(os.path.join(self.homeDirClient,"test"))) :
+ time.sleep(0.01)
+
+ self.dbClient=db.DB(self.dbenvClient)
+ while True :
+ txn=self.dbenvClient.txn_begin()
+ try :
+ self.dbClient.open("test", db.DB_HASH, flags=db.DB_RDONLY,
+ mode=0666, txn=txn)
+ except db.DBRepHandleDeadError :
+ txn.abort()
+ self.dbClient.close()
+ self.dbClient=db.DB(self.dbenvClient)
+ continue
+
+ txn.commit()
+ break
+
+ txn=self.dbenvMaster.txn_begin()
+ self.dbMaster.put("ABC", "123", txn=txn)
+ txn.commit()
+ import time
+ timeout=time.time()+1
+ v=None
+ while (time.time()<timeout) and (v==None) :
+ txn=self.dbenvClient.txn_begin()
+ v=self.dbClient.get("ABC", txn=txn)
+ txn.commit()
+ self.assertEquals("123", v)
+
+ txn=self.dbenvMaster.txn_begin()
+ self.dbMaster.delete("ABC", txn=txn)
+ txn.commit()
+ timeout=time.time()+1
+ while (time.time()<timeout) and (v!=None) :
+ txn=self.dbenvClient.txn_begin()
+ v=self.dbClient.get("ABC", txn=txn)
+ txn.commit()
+ self.assertEquals(None, v)
+
+class DBBaseReplication(DBReplicationManager):
+ def setUp(self) :
+ DBReplicationManager.setUp(self)
+ def confirmed_master(a,b,c) :
+ if (b == db.DB_EVENT_REP_MASTER) or (b == db.DB_EVENT_REP_ELECTED) :
+ self.confirmed_master = True
+
+ def client_startupdone(a,b,c) :
+ if b == db.DB_EVENT_REP_STARTUPDONE :
+ self.client_startupdone = True
+
+ self.dbenvMaster.set_event_notify(confirmed_master)
+ self.dbenvClient.set_event_notify(client_startupdone)
+
+ import Queue
+ self.m2c = Queue.Queue()
+ self.c2m = Queue.Queue()
+
+ # There are only two nodes, so we don't need to
+ # do any routing decision
+ def m2c(dbenv, control, rec, lsnp, envid, flags) :
+ self.m2c.put((control, rec))
+
+ def c2m(dbenv, control, rec, lsnp, envid, flags) :
+ self.c2m.put((control, rec))
+
+ self.dbenvMaster.rep_set_transport(13,m2c)
+ self.dbenvMaster.rep_set_priority(10)
+ self.dbenvClient.rep_set_transport(3,c2m)
+ self.dbenvClient.rep_set_priority(0)
+
+ self.assertEquals(self.dbenvMaster.rep_get_priority(),10)
+ self.assertEquals(self.dbenvClient.rep_get_priority(),0)
+
+ #self.dbenvMaster.set_verbose(db.DB_VERB_REPLICATION, True)
+ #self.dbenvMaster.set_verbose(db.DB_VERB_FILEOPS_ALL, True)
+ #self.dbenvClient.set_verbose(db.DB_VERB_REPLICATION, True)
+ #self.dbenvClient.set_verbose(db.DB_VERB_FILEOPS_ALL, True)
+
+ def thread_master() :
+ return self.thread_do(self.dbenvMaster, self.c2m, 3,
+ self.master_doing_election, True)
+
+ def thread_client() :
+ return self.thread_do(self.dbenvClient, self.m2c, 13,
+ self.client_doing_election, False)
+
+ from threading import Thread
+ t_m=Thread(target=thread_master)
+ t_m.setDaemon(True)
+ t_c=Thread(target=thread_client)
+ t_c.setDaemon(True)
+
+ self.t_m = t_m
+ self.t_c = t_c
+
+ self.dbMaster = self.dbClient = None
+
+ self.master_doing_election=[False]
+ self.client_doing_election=[False]
+
+
def tearDown(self):
if self.dbClient :
self.dbClient.close()
if self.dbMaster :
self.dbMaster.close()
+ self.m2c.put(None)
+ self.c2m.put(None)
+ self.t_m.join()
+ self.t_c.join()
self.dbenvClient.close()
self.dbenvMaster.close()
test_support.rmtree(self.homeDirClient)
test_support.rmtree(self.homeDirMaster)
+ def basic_rep_threading(self) :
+ self.dbenvMaster.rep_start(flags=db.DB_REP_MASTER)
+ self.dbenvClient.rep_start(flags=db.DB_REP_CLIENT)
+
+ def thread_do(env, q, envid, election_status, must_be_master) :
+ while True :
+ v=q.get()
+ if v == None : return
+ env.rep_process_message(v[0], v[1], envid)
+
+ self.thread_do = thread_do
+
+ self.t_m.start()
+ self.t_c.start()
+
def test01_basic_replication(self) :
+ self.basic_rep_threading()
+
+ # The timeout is necessary in BDB 4.5, since DB_EVENT_REP_STARTUPDONE
+ # is not generated if the master has no new transactions.
+ # This is solved in BDB 4.6 (#15542).
+ import time
+ timeout = time.time()+2
+ while (time.time()<timeout) and not (self.confirmed_master and
+ self.client_startupdone) :
+ time.sleep(0.02)
+ self.assertTrue(time.time()<timeout)
+
self.dbMaster=db.DB(self.dbenvMaster)
txn=self.dbenvMaster.txn_begin()
self.dbMaster.open("test", db.DB_HASH, db.DB_CREATE, 0666, txn=txn)
@@ -179,11 +351,69 @@ class DBReplicationManager(unittest.TestCase):
txn.commit()
self.assertEquals(None, v)
+ if db.version() >= (4,7) :
+ def test02_test_request(self) :
+ self.basic_rep_threading()
+ (minimum, maximum) = self.dbenvClient.rep_get_request()
+ self.dbenvClient.rep_set_request(minimum-1, maximum+1)
+ self.assertEqual(self.dbenvClient.rep_get_request(),
+ (minimum-1, maximum+1))
+
+ if db.version() >= (4,6) :
+ def test03_master_election(self) :
+ # Get ready to hold an election
+ #self.dbenvMaster.rep_start(flags=db.DB_REP_MASTER)
+ self.dbenvMaster.rep_start(flags=db.DB_REP_CLIENT)
+ self.dbenvClient.rep_start(flags=db.DB_REP_CLIENT)
+
+ def thread_do(env, q, envid, election_status, must_be_master) :
+ while True :
+ v=q.get()
+ if v == None : return
+ r = env.rep_process_message(v[0],v[1],envid)
+ if must_be_master and self.confirmed_master :
+ self.dbenvMaster.rep_start(flags = db.DB_REP_MASTER)
+ must_be_master = False
+
+ if r[0] == db.DB_REP_HOLDELECTION :
+ def elect() :
+ while True :
+ try :
+ env.rep_elect(2, 1)
+ election_status[0] = False
+ break
+ except db.DBRepUnavailError :
+ pass
+ if not election_status[0] and not self.confirmed_master :
+ from threading import Thread
+ election_status[0] = True
+ t=Thread(target=elect)
+ t.setDaemon(True)
+ t.start()
+
+ self.thread_do = thread_do
+
+ self.t_m.start()
+ self.t_c.start()
+
+ self.dbenvMaster.rep_set_timeout(db.DB_REP_ELECTION_TIMEOUT, 50000)
+ self.dbenvClient.rep_set_timeout(db.DB_REP_ELECTION_TIMEOUT, 50000)
+ self.client_doing_election[0] = True
+ while True :
+ try :
+ self.dbenvClient.rep_elect(2, 1)
+ self.client_doing_election[0] = False
+ break
+ except db.DBRepUnavailError :
+ pass
+
+ self.assertTrue(self.confirmed_master)
+
#----------------------------------------------------------------------
def test_suite():
suite = unittest.TestSuite()
- if db.version() >= (4,5) :
+ if db.version() >= (4, 6) :
dbenv = db.DBEnv()
try :
dbenv.repmgr_get_ack_policy()
@@ -194,6 +424,10 @@ def test_suite():
del dbenv
if ReplicationManager_available :
suite.addTest(unittest.makeSuite(DBReplicationManager))
+
+ if have_threads :
+ suite.addTest(unittest.makeSuite(DBBaseReplication))
+
return suite
diff --git a/Lib/bsddb/test/test_thread.py b/Lib/bsddb/test/test_thread.py
index 5ac98ce..bdeaf93 100644
--- a/Lib/bsddb/test/test_thread.py
+++ b/Lib/bsddb/test/test_thread.py
@@ -7,28 +7,19 @@ import time
import errno
from random import random
-try:
- True, False
-except NameError:
- True = 1
- False = 0
-
DASH = '-'
try:
- from threading import Thread, currentThread
- have_threads = True
-except ImportError:
- have_threads = False
-
-try:
WindowsError
except NameError:
class WindowsError(Exception):
pass
import unittest
-from test_all import verbose, get_new_environment_path, get_new_database_path
+from test_all import verbose, have_threads, get_new_environment_path, get_new_database_path
+
+if have_threads :
+ from threading import Thread, currentThread
try:
@@ -103,8 +94,8 @@ class ConcurrentDataStoreBase(BaseThreadedTestCase):
keys=range(self.records)
import random
random.shuffle(keys)
- records_per_writer=self.records/self.writers
- readers_per_writer=self.readers/self.writers
+ records_per_writer=self.records//self.writers
+ readers_per_writer=self.readers//self.writers
self.assertEqual(self.records,self.writers*records_per_writer)
self.assertEqual(self.readers,self.writers*readers_per_writer)
self.assertTrue((records_per_writer%readers_per_writer)==0)
@@ -143,7 +134,7 @@ class ConcurrentDataStoreBase(BaseThreadedTestCase):
if verbose:
print "%s: creating records %d - %d" % (name, start, stop)
- count=len(keys)/len(readers)
+ count=len(keys)//len(readers)
count2=count
for x in keys :
key = '%04d' % x
@@ -218,8 +209,8 @@ class SimpleThreadedBase(BaseThreadedTestCase):
keys=range(self.records)
import random
random.shuffle(keys)
- records_per_writer=self.records/self.writers
- readers_per_writer=self.readers/self.writers
+ records_per_writer=self.records//self.writers
+ readers_per_writer=self.readers//self.writers
self.assertEqual(self.records,self.writers*records_per_writer)
self.assertEqual(self.readers,self.writers*readers_per_writer)
self.assertTrue((records_per_writer%readers_per_writer)==0)
@@ -258,7 +249,7 @@ class SimpleThreadedBase(BaseThreadedTestCase):
if verbose:
print "%s: creating records %d - %d" % (name, start, stop)
- count=len(keys)/len(readers)
+ count=len(keys)//len(readers)
count2=count
for x in keys :
key = '%04d' % x
@@ -332,8 +323,8 @@ class ThreadedTransactionsBase(BaseThreadedTestCase):
keys=range(self.records)
import random
random.shuffle(keys)
- records_per_writer=self.records/self.writers
- readers_per_writer=self.readers/self.writers
+ records_per_writer=self.records//self.writers
+ readers_per_writer=self.readers//self.writers
self.assertEqual(self.records,self.writers*records_per_writer)
self.assertEqual(self.readers,self.writers*readers_per_writer)
self.assertTrue((records_per_writer%readers_per_writer)==0)
@@ -375,7 +366,7 @@ class ThreadedTransactionsBase(BaseThreadedTestCase):
def writerThread(self, d, keys, readers):
name = currentThread().getName()
- count=len(keys)/len(readers)
+ count=len(keys)//len(readers)
while len(keys):
try:
txn = self.env.txn_begin(None, self.txnFlag)