diff options
Diffstat (limited to 'Lib')
-rw-r--r-- | Lib/bsddb/__init__.py | 42 | ||||
-rw-r--r-- | Lib/bsddb/db.py | 31 | ||||
-rw-r--r-- | Lib/bsddb/dbobj.py | 34 | ||||
-rw-r--r-- | Lib/bsddb/dbshelve.py | 33 | ||||
-rw-r--r-- | Lib/bsddb/dbtables.py | 7 | ||||
-rw-r--r-- | Lib/bsddb/dbutils.py | 8 | ||||
-rw-r--r-- | Lib/bsddb/test/test_all.py | 23 | ||||
-rw-r--r-- | Lib/bsddb/test/test_associate.py | 35 | ||||
-rw-r--r-- | Lib/bsddb/test/test_basics.py | 75 | ||||
-rw-r--r-- | Lib/bsddb/test/test_compare.py | 5 | ||||
-rw-r--r-- | Lib/bsddb/test/test_lock.py | 23 | ||||
-rw-r--r-- | Lib/bsddb/test/test_misc.py | 18 | ||||
-rw-r--r-- | Lib/bsddb/test/test_replication.py | 262 | ||||
-rw-r--r-- | Lib/bsddb/test/test_thread.py | 35 |
14 files changed, 492 insertions, 139 deletions
diff --git a/Lib/bsddb/__init__.py b/Lib/bsddb/__init__.py index 70b9ccf..0b8f6d9 100644 --- a/Lib/bsddb/__init__.py +++ b/Lib/bsddb/__init__.py @@ -33,18 +33,25 @@ #---------------------------------------------------------------------- -"""Support for Berkeley DB 4.x with a simple interface. +"""Support for Berkeley DB 4.0 through 4.7 with a simple interface. For the full featured object oriented interface use the bsddb.db module instead. It mirrors the Oracle Berkeley DB C API. """ +import sys +absolute_import = (sys.version_info[0] >= 3) + try: if __name__ == 'bsddb3': # import _pybsddb binary as it should be the more recent version from # a standalone pybsddb addon package than the version included with # python as bsddb._bsddb. - import _pybsddb + if absolute_import : + # Because this syntaxis is not valid before Python 2.5 + exec("from . import _pybsddb") + else : + import _pybsddb _bsddb = _pybsddb from bsddb3.dbutils import DeadlockWrap as _DeadlockWrap else: @@ -66,9 +73,16 @@ error = db.DBError # So bsddb.error will mean something... import sys, os -import UserDict from weakref import ref -class _iter_mixin(UserDict.DictMixin): + +if sys.version_info[0:2] <= (2, 5) : + import UserDict + MutableMapping = UserDict.DictMixin +else : + import collections + MutableMapping = collections.MutableMapping + +class _iter_mixin(MutableMapping): def _make_iter_cursor(self): cur = _DeadlockWrap(self.db.cursor) key = id(cur) @@ -115,8 +129,12 @@ class _iter_mixin(UserDict.DictMixin): except _bsddb.DBCursorClosedError: # the database was modified during iteration. abort. pass - finally: +# When Python 2.3 not supported in bsddb3, we can change this to "finally" + except : self._in_iter -= 1 + raise + + self._in_iter -= 1 def iteritems(self): if not self.db: @@ -154,8 +172,12 @@ class _iter_mixin(UserDict.DictMixin): except _bsddb.DBCursorClosedError: # the database was modified during iteration. abort. pass - finally: +# When Python 2.3 not supported in bsddb3, we can change this to "finally" + except : self._in_iter -= 1 + raise + + self._in_iter -= 1 class _DBWithCursor(_iter_mixin): @@ -228,6 +250,12 @@ class _DBWithCursor(_iter_mixin): self._checkOpen() return _DeadlockWrap(lambda: len(self.db)) # len(self.db) + if sys.version_info[0:2] >= (2, 6) : + def __repr__(self) : + if self.isOpen() : + return repr(dict(_DeadlockWrap(self.db.items))) + return repr(dict()) + def __getitem__(self, key): self._checkOpen() return _DeadlockWrap(lambda: self.db[key]) # self.db[key] @@ -407,8 +435,6 @@ def _checkflag(flag, file): try: import thread del thread - if db.version() < (3, 3, 0): - db.DB_THREAD = 0 except ImportError: db.DB_THREAD = 0 diff --git a/Lib/bsddb/db.py b/Lib/bsddb/db.py index 57bb46e..c3aee30 100644 --- a/Lib/bsddb/db.py +++ b/Lib/bsddb/db.py @@ -37,15 +37,24 @@ # case we ever want to augment the stuff in _db in any way. For now # it just simply imports everything from _db. -if __name__.startswith('bsddb3.'): - # import _pybsddb binary as it should be the more recent version from - # a standalone pybsddb addon package than the version included with - # python as bsddb._bsddb. - from _pybsddb import * - from _pybsddb import __version__ -else: - from _bsddb import * - from _bsddb import __version__ +import sys +absolute_import = (sys.version_info[0] >= 3) -if version() < (3, 2, 0): - raise ImportError, "correct Berkeley DB symbols not found. Perhaps python was statically linked with an older version?" +if not absolute_import : + if __name__.startswith('bsddb3.') : + # import _pybsddb binary as it should be the more recent version from + # a standalone pybsddb addon package than the version included with + # python as bsddb._bsddb. + from _pybsddb import * + from _pybsddb import __version__ + else: + from _bsddb import * + from _bsddb import __version__ +else : + # Because this syntaxis is not valid before Python 2.5 + if __name__.startswith('bsddb3.') : + exec("from ._pybsddb import *") + exec("from ._pybsddb import __version__") + else : + exec("from ._bsddb import *") + exec("from ._bsddb import __version__") diff --git a/Lib/bsddb/dbobj.py b/Lib/bsddb/dbobj.py index b74ee72..f6b1d7f 100644 --- a/Lib/bsddb/dbobj.py +++ b/Lib/bsddb/dbobj.py @@ -21,13 +21,24 @@ # added to _bsddb.c. # -import db +import sys +absolute_import = (sys.version_info[0] >= 3) +if absolute_import : + # Because this syntaxis is not valid before Python 2.5 + exec("from . import db") +else : + import db -try: - from UserDict import DictMixin -except ImportError: - # DictMixin is new in Python 2.3 - class DictMixin: pass +if sys.version_info[0:2] <= (2, 5) : + try: + from UserDict import DictMixin + except ImportError: + # DictMixin is new in Python 2.3 + class DictMixin: pass + MutableMapping = DictMixin +else : + import collections + MutableMapping = collections.MutableMapping class DBEnv: def __init__(self, *args, **kwargs): @@ -96,9 +107,8 @@ class DBEnv: def set_get_returns_none(self, *args, **kwargs): return apply(self._cobj.set_get_returns_none, args, kwargs) - if db.version() >= (4,0): - def log_stat(self, *args, **kwargs): - return apply(self._cobj.log_stat, args, kwargs) + def log_stat(self, *args, **kwargs): + return apply(self._cobj.log_stat, args, kwargs) if db.version() >= (4,1): def dbremove(self, *args, **kwargs): @@ -113,7 +123,7 @@ class DBEnv: return apply(self._cobj.lsn_reset, args, kwargs) -class DB(DictMixin): +class DB(MutableMapping): def __init__(self, dbenv, *args, **kwargs): # give it the proper DBEnv C object that its expecting self._cobj = apply(db.DB, (dbenv._cobj,) + args, kwargs) @@ -128,6 +138,10 @@ class DB(DictMixin): def __delitem__(self, arg): del self._cobj[arg] + if sys.version_info[0:2] >= (2, 6) : + def __iter__(self) : + return self._cobj.__iter__() + def append(self, *args, **kwargs): return apply(self._cobj.append, args, kwargs) def associate(self, *args, **kwargs): diff --git a/Lib/bsddb/dbshelve.py b/Lib/bsddb/dbshelve.py index 6d7414e..f5f6f8e 100644 --- a/Lib/bsddb/dbshelve.py +++ b/Lib/bsddb/dbshelve.py @@ -30,11 +30,17 @@ storage. #------------------------------------------------------------------------ import cPickle -import db import sys -#At version 2.3 cPickle switched to using protocol instead of bin and -#DictMixin was added +import sys +absolute_import = (sys.version_info[0] >= 3) +if absolute_import : + # Because this syntaxis is not valid before Python 2.5 + exec("from . import db") +else : + import db + +#At version 2.3 cPickle switched to using protocol instead of bin if sys.version_info[:3] >= (2, 3, 0): HIGHEST_PROTOCOL = cPickle.HIGHEST_PROTOCOL # In python 2.3.*, "cPickle.dumps" accepts no @@ -47,13 +53,22 @@ if sys.version_info[:3] >= (2, 3, 0): def _dumps(object, protocol): return cPickle.dumps(object, protocol=protocol) - from UserDict import DictMixin - else: HIGHEST_PROTOCOL = None def _dumps(object, protocol): return cPickle.dumps(object, bin=protocol) - class DictMixin: pass + + +if sys.version_info[0:2] <= (2, 5) : + try: + from UserDict import DictMixin + except ImportError: + # DictMixin is new in Python 2.3 + class DictMixin: pass + MutableMapping = DictMixin +else : + import collections + MutableMapping = collections.MutableMapping #------------------------------------------------------------------------ @@ -96,7 +111,7 @@ def open(filename, flags=db.DB_CREATE, mode=0660, filetype=db.DB_HASH, class DBShelveError(db.DBError): pass -class DBShelf(DictMixin): +class DBShelf(MutableMapping): """A shelf to hold pickled objects, built upon a bsddb DB object. It automatically pickles/unpickles data objects going to/from the DB. """ @@ -147,6 +162,10 @@ class DBShelf(DictMixin): else: return self.db.keys() + if sys.version_info[0:2] >= (2, 6) : + def __iter__(self) : + return self.db.__iter__() + def open(self, *args, **kwargs): self.db.open(*args, **kwargs) diff --git a/Lib/bsddb/dbtables.py b/Lib/bsddb/dbtables.py index 27028d3..ef52fed 100644 --- a/Lib/bsddb/dbtables.py +++ b/Lib/bsddb/dbtables.py @@ -22,7 +22,6 @@ import sys import copy import random import struct -from types import ListType, StringType import cPickle as pickle try: @@ -229,7 +228,7 @@ class bsdTableDB : raises TableDBError if it already exists or for other DB errors. """ - assert isinstance(columns, ListType) + assert isinstance(columns, list) txn = None try: # checking sanity of the table and column names here on @@ -270,7 +269,7 @@ class bsdTableDB : """Return a list of columns in the given table. [] if the table doesn't exist. """ - assert isinstance(table, StringType) + assert isinstance(table, str) if contains_metastrings(table): raise ValueError, "bad table name: contains reserved metastrings" @@ -300,7 +299,7 @@ class bsdTableDB : additional columns present in the given list as well as all of its current columns. """ - assert isinstance(columns, ListType) + assert isinstance(columns, list) try: self.CreateTable(table, columns) except TableAlreadyExists: diff --git a/Lib/bsddb/dbutils.py b/Lib/bsddb/dbutils.py index 9a2f010..8d2e7ef 100644 --- a/Lib/bsddb/dbutils.py +++ b/Lib/bsddb/dbutils.py @@ -26,7 +26,13 @@ # from time import sleep as _sleep -import db +import sys +absolute_import = (sys.version_info[0] >= 3) +if absolute_import : + # Because this syntaxis is not valid before Python 2.5 + exec("from . import db") +else : + import db # always sleep at least N seconds between retrys _deadlock_MinSleepTime = 1.0/128 diff --git a/Lib/bsddb/test/test_all.py b/Lib/bsddb/test/test_all.py index 11043a2..7ad9a6f 100644 --- a/Lib/bsddb/test/test_all.py +++ b/Lib/bsddb/test/test_all.py @@ -7,15 +7,24 @@ import unittest try: # For Pythons w/distutils pybsddb from bsddb3 import db + import bsddb3 as bsddb except ImportError: # For Python 2.3 from bsddb import db + import bsddb try: from bsddb3 import test_support except ImportError: from test import test_support +try: + from threading import Thread, currentThread + del Thread, currentThread + have_threads = True +except ImportError: + have_threads = False + verbose = 0 if 'verbose' in sys.argv: verbose = 1 @@ -33,6 +42,8 @@ def print_versions(): print 'bsddb.db.version(): %s' % (db.version(), ) print 'bsddb.db.__version__: %s' % db.__version__ print 'bsddb.db.cvsid: %s' % db.cvsid + print 'py module: %s' % bsddb.__file__ + print 'extension module: %s' % bsddb._bsddb.__file__ print 'python version: %s' % sys.version print 'My pid: %s' % os.getpid() print '-=' * 38 @@ -81,11 +92,11 @@ def set_test_path_prefix(path) : def remove_test_path_directory() : test_support.rmtree(get_new_path.prefix) -try : +if have_threads : import threading get_new_path.mutex=threading.Lock() del threading -except ImportError: +else : class Lock(object) : def acquire(self) : pass @@ -104,8 +115,12 @@ class PrintInfoFakeTest(unittest.TestCase): # This little hack is for when this module is run as main and all the # other modules import it so they will still be able to get the right # verbose setting. It's confusing but it works. -import test_all -test_all.verbose = verbose +if sys.version_info[0] < 3 : + import test_all + test_all.verbose = verbose +else : + import sys + print >>sys.stderr, "Work to do!" def suite(module_prefix='', timing_check=None): diff --git a/Lib/bsddb/test/test_associate.py b/Lib/bsddb/test/test_associate.py index 10907e3..935f10a 100644 --- a/Lib/bsddb/test/test_associate.py +++ b/Lib/bsddb/test/test_associate.py @@ -6,14 +6,8 @@ import sys, os, string import time from pprint import pprint -try: - from threading import Thread, currentThread - have_threads = 1 -except ImportError: - have_threads = 0 - import unittest -from test_all import verbose, get_new_environment_path +from test_all import verbose, have_threads, get_new_environment_path try: # For Pythons w/distutils pybsddb @@ -435,24 +429,23 @@ class ThreadedAssociateRecnoTestCase(ShelveAssociateTestCase): def test_suite(): suite = unittest.TestSuite() - if db.version() >= (3, 3, 11): - suite.addTest(unittest.makeSuite(AssociateErrorTestCase)) + suite.addTest(unittest.makeSuite(AssociateErrorTestCase)) - suite.addTest(unittest.makeSuite(AssociateHashTestCase)) - suite.addTest(unittest.makeSuite(AssociateBTreeTestCase)) - suite.addTest(unittest.makeSuite(AssociateRecnoTestCase)) + suite.addTest(unittest.makeSuite(AssociateHashTestCase)) + suite.addTest(unittest.makeSuite(AssociateBTreeTestCase)) + suite.addTest(unittest.makeSuite(AssociateRecnoTestCase)) - if db.version() >= (4, 1): - suite.addTest(unittest.makeSuite(AssociateBTreeTxnTestCase)) + if db.version() >= (4, 1): + suite.addTest(unittest.makeSuite(AssociateBTreeTxnTestCase)) - suite.addTest(unittest.makeSuite(ShelveAssociateHashTestCase)) - suite.addTest(unittest.makeSuite(ShelveAssociateBTreeTestCase)) - suite.addTest(unittest.makeSuite(ShelveAssociateRecnoTestCase)) + suite.addTest(unittest.makeSuite(ShelveAssociateHashTestCase)) + suite.addTest(unittest.makeSuite(ShelveAssociateBTreeTestCase)) + suite.addTest(unittest.makeSuite(ShelveAssociateRecnoTestCase)) - if have_threads: - suite.addTest(unittest.makeSuite(ThreadedAssociateHashTestCase)) - suite.addTest(unittest.makeSuite(ThreadedAssociateBTreeTestCase)) - suite.addTest(unittest.makeSuite(ThreadedAssociateRecnoTestCase)) + if have_threads: + suite.addTest(unittest.makeSuite(ThreadedAssociateHashTestCase)) + suite.addTest(unittest.makeSuite(ThreadedAssociateBTreeTestCase)) + suite.addTest(unittest.makeSuite(ThreadedAssociateRecnoTestCase)) return suite diff --git a/Lib/bsddb/test/test_basics.py b/Lib/bsddb/test/test_basics.py index a234d2a..285e3a8 100644 --- a/Lib/bsddb/test/test_basics.py +++ b/Lib/bsddb/test/test_basics.py @@ -101,14 +101,14 @@ class BasicTestCase(unittest.TestCase): def populateDB(self, _txn=None): d = self.d - for x in range(self._numKeys/2): + for x in range(self._numKeys//2): key = '%04d' % (self._numKeys - x) # insert keys in reverse order data = self.makeData(key) d.put(key, data, _txn) d.put('empty value', '', _txn) - for x in range(self._numKeys/2-1): + for x in range(self._numKeys//2-1): key = '%04d' % x # and now some in forward order data = self.makeData(key) d.put(key, data, _txn) @@ -536,10 +536,6 @@ class BasicTestCase(unittest.TestCase): #---------------------------------------- def test06_Truncate(self): - if db.version() < (3,3): - # truncate is a feature of Berkeley DB 3.3 and above - return - d = self.d if verbose: print '\n', '-=' * 30 @@ -681,12 +677,11 @@ class BasicTransactionTestCase(BasicTestCase): except db.DBIncompleteError: pass - if db.version() >= (4,0): - statDict = self.env.log_stat(0); - self.assert_(statDict.has_key('magic')) - self.assert_(statDict.has_key('version')) - self.assert_(statDict.has_key('cur_file')) - self.assert_(statDict.has_key('region_nowait')) + statDict = self.env.log_stat(0); + self.assert_(statDict.has_key('magic')) + self.assert_(statDict.has_key('version')) + self.assert_(statDict.has_key('cur_file')) + self.assert_(statDict.has_key('region_nowait')) # must have at least one log file present: logs = self.env.log_archive(db.DB_ARCH_ABS | db.DB_ARCH_LOG) @@ -703,10 +698,6 @@ class BasicTransactionTestCase(BasicTestCase): #---------------------------------------- def test07_TxnTruncate(self): - if db.version() < (3,3): - # truncate is a feature of Berkeley DB 3.3 and above - return - d = self.d if verbose: print '\n', '-=' * 30 @@ -956,6 +947,55 @@ class HashMultiDBTestCase(BasicMultiDBTestCase): envflags = db.DB_THREAD | db.DB_INIT_MPOOL | db.DB_INIT_LOCK +class PrivateObject(unittest.TestCase) : + import sys + if sys.version_info[:3] < (2, 4, 0): + def assertTrue(self, expr, msg=None): + self.failUnless(expr,msg=msg) + + def tearDown(self) : + del self.obj + + def test01_DefaultIsNone(self) : + self.assertEqual(self.obj.get_private(), None) + + def test02_assignment(self) : + a = "example of private object" + self.obj.set_private(a) + b = self.obj.get_private() + self.assertTrue(a is b) # Object identity + + def test03_leak_assignment(self) : + import sys + a = "example of private object" + refcount = sys.getrefcount(a) + self.obj.set_private(a) + self.assertEqual(refcount+1, sys.getrefcount(a)) + self.obj.set_private(None) + self.assertEqual(refcount, sys.getrefcount(a)) + + def test04_leak_GC(self) : + import sys + a = "example of private object" + refcount = sys.getrefcount(a) + self.obj.set_private(a) + self.obj = None + self.assertEqual(refcount, sys.getrefcount(a)) + +class DBEnvPrivateObject(PrivateObject) : + def setUp(self) : + self.obj = db.DBEnv() + +class DBPrivateObject(PrivateObject) : + def setUp(self) : + self.obj = db.DB() + +class CrashAndBurn(unittest.TestCase) : + def test01_OpenCrash(self) : + # See http://bugs.python.org/issue3307 + self.assertRaises(db.DBInvalidArgError, db.DB, None, 65535) + + #---------------------------------------------------------------------- #---------------------------------------------------------------------- @@ -979,6 +1019,9 @@ def test_suite(): suite.addTest(unittest.makeSuite(HashDUPWithThreadTestCase)) suite.addTest(unittest.makeSuite(BTreeMultiDBTestCase)) suite.addTest(unittest.makeSuite(HashMultiDBTestCase)) + suite.addTest(unittest.makeSuite(DBEnvPrivateObject)) + suite.addTest(unittest.makeSuite(DBPrivateObject)) + #suite.addTest(unittest.makeSuite(CrashAndBurn)) return suite diff --git a/Lib/bsddb/test/test_compare.py b/Lib/bsddb/test/test_compare.py index 940e56e..e221182 100644 --- a/Lib/bsddb/test/test_compare.py +++ b/Lib/bsddb/test/test_compare.py @@ -240,9 +240,8 @@ def test_suite (): res = unittest.TestSuite () res.addTest (unittest.makeSuite (ComparatorTests)) - if db.version () >= (3, 3, 11): - res.addTest (unittest.makeSuite (BtreeExceptionsTestCase)) - res.addTest (unittest.makeSuite (BtreeKeyCompareTestCase)) + res.addTest (unittest.makeSuite (BtreeExceptionsTestCase)) + res.addTest (unittest.makeSuite (BtreeKeyCompareTestCase)) return res if __name__ == '__main__': diff --git a/Lib/bsddb/test/test_lock.py b/Lib/bsddb/test/test_lock.py index 9b9ce7b..64a054c 100644 --- a/Lib/bsddb/test/test_lock.py +++ b/Lib/bsddb/test/test_lock.py @@ -4,15 +4,11 @@ TestCases for testing the locking sub-system. import time -try: - from threading import Thread, currentThread - have_threads = 1 -except ImportError: - have_threads = 0 - - import unittest -from test_all import verbose, get_new_environment_path, get_new_database_path +from test_all import verbose, have_threads, get_new_environment_path, get_new_database_path + +if have_threads : + from threading import Thread, currentThread try: # For Pythons w/distutils pybsddb @@ -62,8 +58,7 @@ class LockingTestCase(unittest.TestCase): self.env.lock_put(lock) if verbose: print "Released lock: %s" % lock - if db.version() >= (4,0): - self.env.lock_id_free(anID) + self.env.lock_id_free(anID) def test02_threaded(self): @@ -132,9 +127,8 @@ class LockingTestCase(unittest.TestCase): self.env.lock_put(lock) t.join() - if db.version() >= (4,0): - self.env.lock_id_free(anID) - self.env.lock_id_free(anID2) + self.env.lock_id_free(anID) + self.env.lock_id_free(anID2) if db.version() >= (4,6): self.assertTrue(deadlock_detection.count>0) @@ -159,8 +153,7 @@ class LockingTestCase(unittest.TestCase): if verbose: print "%s: Released %s lock: %s" % (name, lt, lock) - if db.version() >= (4,0): - self.env.lock_id_free(anID) + self.env.lock_id_free(anID) #---------------------------------------------------------------------- diff --git a/Lib/bsddb/test/test_misc.py b/Lib/bsddb/test/test_misc.py index 1da5830..dce7e0c 100644 --- a/Lib/bsddb/test/test_misc.py +++ b/Lib/bsddb/test/test_misc.py @@ -47,6 +47,18 @@ class MiscTestCase(unittest.TestCase): rp = repr(db) self.assertEquals(rp, "{}") + def test04_repr_db(self) : + db = hashopen(self.filename) + d = {} + for i in xrange(100) : + db[repr(i)] = repr(100*i) + d[repr(i)] = repr(100*i) + db.close() + db = hashopen(self.filename) + rp = repr(db) + self.assertEquals(rp, repr(d)) + db.close() + # http://sourceforge.net/tracker/index.php?func=detail&aid=1708868&group_id=13900&atid=313900 # # See the bug report for details. @@ -54,7 +66,7 @@ class MiscTestCase(unittest.TestCase): # The problem was that make_key_dbt() was not allocating a copy of # string keys but FREE_DBT() was always being told to free it when the # database was opened with DB_THREAD. - def test04_double_free_make_key_dbt(self): + def test05_double_free_make_key_dbt(self): try: db1 = db.DB() db1.open(self.filename, None, db.DB_BTREE, @@ -67,7 +79,7 @@ class MiscTestCase(unittest.TestCase): db1.close() os.unlink(self.filename) - def test05_key_with_null_bytes(self): + def test06_key_with_null_bytes(self): try: db1 = db.DB() db1.open(self.filename, None, db.DB_HASH, db.DB_CREATE) @@ -86,7 +98,7 @@ class MiscTestCase(unittest.TestCase): db1.close() os.unlink(self.filename) - def test_DB_set_flags_persists(self): + def test07_DB_set_flags_persists(self): if db.version() < (4,2): # The get_flags API required for this to work is only available # in Berkeley DB >= 4.2 diff --git a/Lib/bsddb/test/test_replication.py b/Lib/bsddb/test/test_replication.py index 25ca34a..8f2651f 100644 --- a/Lib/bsddb/test/test_replication.py +++ b/Lib/bsddb/test/test_replication.py @@ -12,7 +12,7 @@ except ImportError: # For Python 2.3 from bsddb import db -from test_all import get_new_environment_path, get_new_database_path +from test_all import have_threads, get_new_environment_path, get_new_database_path try: from bsddb3 import test_support @@ -58,6 +58,25 @@ class DBReplicationManager(unittest.TestCase): self.dbenvMaster.set_event_notify(confirmed_master) self.dbenvClient.set_event_notify(client_startupdone) + #self.dbenvMaster.set_verbose(db.DB_VERB_REPLICATION, True) + #self.dbenvMaster.set_verbose(db.DB_VERB_FILEOPS_ALL, True) + #self.dbenvClient.set_verbose(db.DB_VERB_REPLICATION, True) + #self.dbenvClient.set_verbose(db.DB_VERB_FILEOPS_ALL, True) + + self.dbMaster = self.dbClient = None + + + def tearDown(self): + if self.dbClient : + self.dbClient.close() + if self.dbMaster : + self.dbMaster.close() + self.dbenvClient.close() + self.dbenvMaster.close() + test_support.rmtree(self.homeDirClient) + test_support.rmtree(self.homeDirMaster) + + def test01_basic_replication(self) : master_port = test_support.find_unused_port() self.dbenvMaster.repmgr_set_local_site("127.0.0.1", master_port) client_port = test_support.find_unused_port() @@ -69,6 +88,27 @@ class DBReplicationManager(unittest.TestCase): self.dbenvMaster.rep_set_priority(10) self.dbenvClient.rep_set_priority(0) + self.dbenvMaster.rep_set_timeout(db.DB_REP_CONNECTION_RETRY,100123) + self.dbenvClient.rep_set_timeout(db.DB_REP_CONNECTION_RETRY,100321) + self.assertEquals(self.dbenvMaster.rep_get_timeout( + db.DB_REP_CONNECTION_RETRY), 100123) + self.assertEquals(self.dbenvClient.rep_get_timeout( + db.DB_REP_CONNECTION_RETRY), 100321) + + self.dbenvMaster.rep_set_timeout(db.DB_REP_ELECTION_TIMEOUT, 100234) + self.dbenvClient.rep_set_timeout(db.DB_REP_ELECTION_TIMEOUT, 100432) + self.assertEquals(self.dbenvMaster.rep_get_timeout( + db.DB_REP_ELECTION_TIMEOUT), 100234) + self.assertEquals(self.dbenvClient.rep_get_timeout( + db.DB_REP_ELECTION_TIMEOUT), 100432) + + self.dbenvMaster.rep_set_timeout(db.DB_REP_ELECTION_RETRY, 100345) + self.dbenvClient.rep_set_timeout(db.DB_REP_ELECTION_RETRY, 100543) + self.assertEquals(self.dbenvMaster.rep_get_timeout( + db.DB_REP_ELECTION_RETRY), 100345) + self.assertEquals(self.dbenvClient.rep_get_timeout( + db.DB_REP_ELECTION_RETRY), 100543) + self.dbenvMaster.repmgr_set_ack_policy(db.DB_REPMGR_ACKS_ALL) self.dbenvClient.repmgr_set_ack_policy(db.DB_REPMGR_ACKS_ALL) @@ -84,23 +124,14 @@ class DBReplicationManager(unittest.TestCase): self.assertEquals(self.dbenvClient.repmgr_get_ack_policy(), db.DB_REPMGR_ACKS_ALL) - #self.dbenvMaster.set_verbose(db.DB_VERB_REPLICATION, True) - #self.dbenvMaster.set_verbose(db.DB_VERB_FILEOPS_ALL, True) - #self.dbenvClient.set_verbose(db.DB_VERB_REPLICATION, True) - #self.dbenvClient.set_verbose(db.DB_VERB_FILEOPS_ALL, True) - - self.dbMaster = self.dbClient = None - # The timeout is necessary in BDB 4.5, since DB_EVENT_REP_STARTUPDONE # is not generated if the master has no new transactions. # This is solved in BDB 4.6 (#15542). - timeout = time.time()+10 + import time + timeout = time.time()+2 while (time.time()<timeout) and not (self.confirmed_master and self.client_startupdone) : time.sleep(0.02) - if db.version() >= (4,6) : - self.assertTrue(time.time()<timeout) - else : - self.assertTrue(time.time()>=timeout) + self.assertTrue(time.time()<timeout) d = self.dbenvMaster.repmgr_site_list() self.assertEquals(len(d), 1) @@ -120,17 +151,158 @@ class DBReplicationManager(unittest.TestCase): d = self.dbenvMaster.repmgr_stat(flags=db.DB_STAT_CLEAR); self.assertTrue("msgs_queued" in d) + self.dbMaster=db.DB(self.dbenvMaster) + txn=self.dbenvMaster.txn_begin() + self.dbMaster.open("test", db.DB_HASH, db.DB_CREATE, 0666, txn=txn) + txn.commit() + + import time,os.path + timeout=time.time()+10 + while (time.time()<timeout) and \ + not (os.path.exists(os.path.join(self.homeDirClient,"test"))) : + time.sleep(0.01) + + self.dbClient=db.DB(self.dbenvClient) + while True : + txn=self.dbenvClient.txn_begin() + try : + self.dbClient.open("test", db.DB_HASH, flags=db.DB_RDONLY, + mode=0666, txn=txn) + except db.DBRepHandleDeadError : + txn.abort() + self.dbClient.close() + self.dbClient=db.DB(self.dbenvClient) + continue + + txn.commit() + break + + txn=self.dbenvMaster.txn_begin() + self.dbMaster.put("ABC", "123", txn=txn) + txn.commit() + import time + timeout=time.time()+1 + v=None + while (time.time()<timeout) and (v==None) : + txn=self.dbenvClient.txn_begin() + v=self.dbClient.get("ABC", txn=txn) + txn.commit() + self.assertEquals("123", v) + + txn=self.dbenvMaster.txn_begin() + self.dbMaster.delete("ABC", txn=txn) + txn.commit() + timeout=time.time()+1 + while (time.time()<timeout) and (v!=None) : + txn=self.dbenvClient.txn_begin() + v=self.dbClient.get("ABC", txn=txn) + txn.commit() + self.assertEquals(None, v) + +class DBBaseReplication(DBReplicationManager): + def setUp(self) : + DBReplicationManager.setUp(self) + def confirmed_master(a,b,c) : + if (b == db.DB_EVENT_REP_MASTER) or (b == db.DB_EVENT_REP_ELECTED) : + self.confirmed_master = True + + def client_startupdone(a,b,c) : + if b == db.DB_EVENT_REP_STARTUPDONE : + self.client_startupdone = True + + self.dbenvMaster.set_event_notify(confirmed_master) + self.dbenvClient.set_event_notify(client_startupdone) + + import Queue + self.m2c = Queue.Queue() + self.c2m = Queue.Queue() + + # There are only two nodes, so we don't need to + # do any routing decision + def m2c(dbenv, control, rec, lsnp, envid, flags) : + self.m2c.put((control, rec)) + + def c2m(dbenv, control, rec, lsnp, envid, flags) : + self.c2m.put((control, rec)) + + self.dbenvMaster.rep_set_transport(13,m2c) + self.dbenvMaster.rep_set_priority(10) + self.dbenvClient.rep_set_transport(3,c2m) + self.dbenvClient.rep_set_priority(0) + + self.assertEquals(self.dbenvMaster.rep_get_priority(),10) + self.assertEquals(self.dbenvClient.rep_get_priority(),0) + + #self.dbenvMaster.set_verbose(db.DB_VERB_REPLICATION, True) + #self.dbenvMaster.set_verbose(db.DB_VERB_FILEOPS_ALL, True) + #self.dbenvClient.set_verbose(db.DB_VERB_REPLICATION, True) + #self.dbenvClient.set_verbose(db.DB_VERB_FILEOPS_ALL, True) + + def thread_master() : + return self.thread_do(self.dbenvMaster, self.c2m, 3, + self.master_doing_election, True) + + def thread_client() : + return self.thread_do(self.dbenvClient, self.m2c, 13, + self.client_doing_election, False) + + from threading import Thread + t_m=Thread(target=thread_master) + t_m.setDaemon(True) + t_c=Thread(target=thread_client) + t_c.setDaemon(True) + + self.t_m = t_m + self.t_c = t_c + + self.dbMaster = self.dbClient = None + + self.master_doing_election=[False] + self.client_doing_election=[False] + + def tearDown(self): if self.dbClient : self.dbClient.close() if self.dbMaster : self.dbMaster.close() + self.m2c.put(None) + self.c2m.put(None) + self.t_m.join() + self.t_c.join() self.dbenvClient.close() self.dbenvMaster.close() test_support.rmtree(self.homeDirClient) test_support.rmtree(self.homeDirMaster) + def basic_rep_threading(self) : + self.dbenvMaster.rep_start(flags=db.DB_REP_MASTER) + self.dbenvClient.rep_start(flags=db.DB_REP_CLIENT) + + def thread_do(env, q, envid, election_status, must_be_master) : + while True : + v=q.get() + if v == None : return + env.rep_process_message(v[0], v[1], envid) + + self.thread_do = thread_do + + self.t_m.start() + self.t_c.start() + def test01_basic_replication(self) : + self.basic_rep_threading() + + # The timeout is necessary in BDB 4.5, since DB_EVENT_REP_STARTUPDONE + # is not generated if the master has no new transactions. + # This is solved in BDB 4.6 (#15542). + import time + timeout = time.time()+2 + while (time.time()<timeout) and not (self.confirmed_master and + self.client_startupdone) : + time.sleep(0.02) + self.assertTrue(time.time()<timeout) + self.dbMaster=db.DB(self.dbenvMaster) txn=self.dbenvMaster.txn_begin() self.dbMaster.open("test", db.DB_HASH, db.DB_CREATE, 0666, txn=txn) @@ -179,11 +351,69 @@ class DBReplicationManager(unittest.TestCase): txn.commit() self.assertEquals(None, v) + if db.version() >= (4,7) : + def test02_test_request(self) : + self.basic_rep_threading() + (minimum, maximum) = self.dbenvClient.rep_get_request() + self.dbenvClient.rep_set_request(minimum-1, maximum+1) + self.assertEqual(self.dbenvClient.rep_get_request(), + (minimum-1, maximum+1)) + + if db.version() >= (4,6) : + def test03_master_election(self) : + # Get ready to hold an election + #self.dbenvMaster.rep_start(flags=db.DB_REP_MASTER) + self.dbenvMaster.rep_start(flags=db.DB_REP_CLIENT) + self.dbenvClient.rep_start(flags=db.DB_REP_CLIENT) + + def thread_do(env, q, envid, election_status, must_be_master) : + while True : + v=q.get() + if v == None : return + r = env.rep_process_message(v[0],v[1],envid) + if must_be_master and self.confirmed_master : + self.dbenvMaster.rep_start(flags = db.DB_REP_MASTER) + must_be_master = False + + if r[0] == db.DB_REP_HOLDELECTION : + def elect() : + while True : + try : + env.rep_elect(2, 1) + election_status[0] = False + break + except db.DBRepUnavailError : + pass + if not election_status[0] and not self.confirmed_master : + from threading import Thread + election_status[0] = True + t=Thread(target=elect) + t.setDaemon(True) + t.start() + + self.thread_do = thread_do + + self.t_m.start() + self.t_c.start() + + self.dbenvMaster.rep_set_timeout(db.DB_REP_ELECTION_TIMEOUT, 50000) + self.dbenvClient.rep_set_timeout(db.DB_REP_ELECTION_TIMEOUT, 50000) + self.client_doing_election[0] = True + while True : + try : + self.dbenvClient.rep_elect(2, 1) + self.client_doing_election[0] = False + break + except db.DBRepUnavailError : + pass + + self.assertTrue(self.confirmed_master) + #---------------------------------------------------------------------- def test_suite(): suite = unittest.TestSuite() - if db.version() >= (4,5) : + if db.version() >= (4, 6) : dbenv = db.DBEnv() try : dbenv.repmgr_get_ack_policy() @@ -194,6 +424,10 @@ def test_suite(): del dbenv if ReplicationManager_available : suite.addTest(unittest.makeSuite(DBReplicationManager)) + + if have_threads : + suite.addTest(unittest.makeSuite(DBBaseReplication)) + return suite diff --git a/Lib/bsddb/test/test_thread.py b/Lib/bsddb/test/test_thread.py index 5ac98ce..bdeaf93 100644 --- a/Lib/bsddb/test/test_thread.py +++ b/Lib/bsddb/test/test_thread.py @@ -7,28 +7,19 @@ import time import errno from random import random -try: - True, False -except NameError: - True = 1 - False = 0 - DASH = '-' try: - from threading import Thread, currentThread - have_threads = True -except ImportError: - have_threads = False - -try: WindowsError except NameError: class WindowsError(Exception): pass import unittest -from test_all import verbose, get_new_environment_path, get_new_database_path +from test_all import verbose, have_threads, get_new_environment_path, get_new_database_path + +if have_threads : + from threading import Thread, currentThread try: @@ -103,8 +94,8 @@ class ConcurrentDataStoreBase(BaseThreadedTestCase): keys=range(self.records) import random random.shuffle(keys) - records_per_writer=self.records/self.writers - readers_per_writer=self.readers/self.writers + records_per_writer=self.records//self.writers + readers_per_writer=self.readers//self.writers self.assertEqual(self.records,self.writers*records_per_writer) self.assertEqual(self.readers,self.writers*readers_per_writer) self.assertTrue((records_per_writer%readers_per_writer)==0) @@ -143,7 +134,7 @@ class ConcurrentDataStoreBase(BaseThreadedTestCase): if verbose: print "%s: creating records %d - %d" % (name, start, stop) - count=len(keys)/len(readers) + count=len(keys)//len(readers) count2=count for x in keys : key = '%04d' % x @@ -218,8 +209,8 @@ class SimpleThreadedBase(BaseThreadedTestCase): keys=range(self.records) import random random.shuffle(keys) - records_per_writer=self.records/self.writers - readers_per_writer=self.readers/self.writers + records_per_writer=self.records//self.writers + readers_per_writer=self.readers//self.writers self.assertEqual(self.records,self.writers*records_per_writer) self.assertEqual(self.readers,self.writers*readers_per_writer) self.assertTrue((records_per_writer%readers_per_writer)==0) @@ -258,7 +249,7 @@ class SimpleThreadedBase(BaseThreadedTestCase): if verbose: print "%s: creating records %d - %d" % (name, start, stop) - count=len(keys)/len(readers) + count=len(keys)//len(readers) count2=count for x in keys : key = '%04d' % x @@ -332,8 +323,8 @@ class ThreadedTransactionsBase(BaseThreadedTestCase): keys=range(self.records) import random random.shuffle(keys) - records_per_writer=self.records/self.writers - readers_per_writer=self.readers/self.writers + records_per_writer=self.records//self.writers + readers_per_writer=self.readers//self.writers self.assertEqual(self.records,self.writers*records_per_writer) self.assertEqual(self.readers,self.writers*readers_per_writer) self.assertTrue((records_per_writer%readers_per_writer)==0) @@ -375,7 +366,7 @@ class ThreadedTransactionsBase(BaseThreadedTestCase): def writerThread(self, d, keys, readers): name = currentThread().getName() - count=len(keys)/len(readers) + count=len(keys)//len(readers) while len(keys): try: txn = self.env.txn_begin(None, self.txnFlag) |