summaryrefslogtreecommitdiffstats
path: root/Lib/bsddb
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/bsddb')
-rw-r--r--Lib/bsddb/test/test_lock.py58
1 files changed, 49 insertions, 9 deletions
diff --git a/Lib/bsddb/test/test_lock.py b/Lib/bsddb/test/test_lock.py
index 973bc42..402d147 100644
--- a/Lib/bsddb/test/test_lock.py
+++ b/Lib/bsddb/test/test_lock.py
@@ -94,15 +94,55 @@ class LockingTestCase(unittest.TestCase):
for t in threads:
t.join()
- def test03_set_timeout(self):
- # test that the set_timeout call works
- if hasattr(self.env, 'set_timeout'):
- self.env.set_timeout(0, db.DB_SET_LOCK_TIMEOUT)
- self.env.set_timeout(0, db.DB_SET_TXN_TIMEOUT)
- self.env.set_timeout(123456, db.DB_SET_LOCK_TIMEOUT)
- self.env.set_timeout(7890123, db.DB_SET_TXN_TIMEOUT)
-
- def theThread(self, sleepTime, lockType):
+ def _DISABLED_test03_lock_timeout(self):
+ # Disabled as this test crashes the python interpreter built in
+ # debug mode with:
+ # Fatal Python error: UNREF invalid object
+ # the error occurs as marked below.
+ self.env.set_timeout(0, db.DB_SET_LOCK_TIMEOUT)
+ self.env.set_timeout(0, db.DB_SET_TXN_TIMEOUT)
+ self.env.set_timeout(123456, db.DB_SET_LOCK_TIMEOUT)
+ self.env.set_timeout(7890123, db.DB_SET_TXN_TIMEOUT)
+
+ def deadlock_detection() :
+ while not deadlock_detection.end :
+ deadlock_detection.count = \
+ self.env.lock_detect(db.DB_LOCK_EXPIRE)
+ if deadlock_detection.count :
+ while not deadlock_detection.end :
+ pass
+ break
+ time.sleep(0.01)
+
+ deadlock_detection.end=False
+ deadlock_detection.count=0
+ t=Thread(target=deadlock_detection)
+ t.setDaemon(True)
+ t.start()
+ self.env.set_timeout(100000, db.DB_SET_LOCK_TIMEOUT)
+ anID = self.env.lock_id()
+ anID2 = self.env.lock_id()
+ self.assertNotEqual(anID, anID2)
+ lock = self.env.lock_get(anID, "shared lock", db.DB_LOCK_WRITE)
+ start_time=time.time()
+ # FIXME: I see the UNREF crash as the interpreter trys to exit
+ # from this call to lock_get.
+ self.assertRaises(db.DBLockNotGrantedError,
+ self.env.lock_get,anID2, "shared lock", db.DB_LOCK_READ)
+ end_time=time.time()
+ deadlock_detection.end=True
+ self.assertTrue((end_time-start_time) >= 0.1)
+ self.env.lock_put(lock)
+ t.join()
+
+ if db.version() >= (4,0):
+ self.env.lock_id_free(anID)
+ self.env.lock_id_free(anID2)
+
+ if db.version() >= (4,6):
+ self.assertTrue(deadlock_detection.count>0)
+
+ def theThread(self, lockType):
name = currentThread().getName()
if lockType == db.DB_LOCK_WRITE:
lt = "write"