summaryrefslogtreecommitdiffstats
path: root/Lib/test/lock_tests.py
diff options
context:
space:
mode:
authorAntoine Pitrou <solipsis@pitrou.net>2009-11-09 16:52:46 (GMT)
committerAntoine Pitrou <solipsis@pitrou.net>2009-11-09 16:52:46 (GMT)
commit959f3e50322467b9d894de67b118eb688753f9f1 (patch)
tree3368d6ff7fcaf4ff8923f72b89643de9634fb344 /Lib/test/lock_tests.py
parent536d299ca0d06c249f2674266acaf4fc4411d5bf (diff)
downloadcpython-959f3e50322467b9d894de67b118eb688753f9f1.zip
cpython-959f3e50322467b9d894de67b118eb688753f9f1.tar.gz
cpython-959f3e50322467b9d894de67b118eb688753f9f1.tar.bz2
Merged revisions 76138,76173 via svnmerge from
svn+ssh://pythondev@svn.python.org/python/branches/py3k ................ r76138 | antoine.pitrou | 2009-11-06 23:41:14 +0100 (ven., 06 nov. 2009) | 10 lines Merged revisions 76137 via svnmerge from svn+ssh://pythondev@svn.python.org/python/trunk ........ r76137 | antoine.pitrou | 2009-11-06 23:34:35 +0100 (ven., 06 nov. 2009) | 4 lines Issue #7270: Add some dedicated unit tests for multi-thread synchronization primitives such as Lock, RLock, Condition, Event and Semaphore. ........ ................ r76173 | antoine.pitrou | 2009-11-09 17:08:16 +0100 (lun., 09 nov. 2009) | 11 lines Merged revisions 76172 via svnmerge from svn+ssh://pythondev@svn.python.org/python/trunk ........ r76172 | antoine.pitrou | 2009-11-09 17:00:11 +0100 (lun., 09 nov. 2009) | 5 lines Issue #7282: Fix a memory leak when an RLock was used in a thread other than those started through `threading.Thread` (for example, using `thread.start_new_thread()`. ........ ................
Diffstat (limited to 'Lib/test/lock_tests.py')
-rw-r--r--Lib/test/lock_tests.py546
1 files changed, 546 insertions, 0 deletions
diff --git a/Lib/test/lock_tests.py b/Lib/test/lock_tests.py
new file mode 100644
index 0000000..04f7422
--- /dev/null
+++ b/Lib/test/lock_tests.py
@@ -0,0 +1,546 @@
+"""
+Various tests for synchronization primitives.
+"""
+
+import sys
+import time
+from _thread import start_new_thread, get_ident
+import threading
+import unittest
+
+from test import support
+
+
+def _wait():
+ # A crude wait/yield function not relying on synchronization primitives.
+ time.sleep(0.01)
+
+class Bunch(object):
+ """
+ A bunch of threads.
+ """
+ def __init__(self, f, n, wait_before_exit=False):
+ """
+ Construct a bunch of `n` threads running the same function `f`.
+ If `wait_before_exit` is True, the threads won't terminate until
+ do_finish() is called.
+ """
+ self.f = f
+ self.n = n
+ self.started = []
+ self.finished = []
+ self._can_exit = not wait_before_exit
+ def task():
+ tid = get_ident()
+ self.started.append(tid)
+ try:
+ f()
+ finally:
+ self.finished.append(tid)
+ while not self._can_exit:
+ _wait()
+ for i in range(n):
+ start_new_thread(task, ())
+
+ def wait_for_started(self):
+ while len(self.started) < self.n:
+ _wait()
+
+ def wait_for_finished(self):
+ while len(self.finished) < self.n:
+ _wait()
+
+ def do_finish(self):
+ self._can_exit = True
+
+
+class BaseTestCase(unittest.TestCase):
+ def setUp(self):
+ self._threads = support.threading_setup()
+
+ def tearDown(self):
+ support.threading_cleanup(*self._threads)
+ support.reap_children()
+
+
+class BaseLockTests(BaseTestCase):
+ """
+ Tests for both recursive and non-recursive locks.
+ """
+
+ def test_constructor(self):
+ lock = self.locktype()
+ del lock
+
+ def test_acquire_destroy(self):
+ lock = self.locktype()
+ lock.acquire()
+ del lock
+
+ def test_acquire_release(self):
+ lock = self.locktype()
+ lock.acquire()
+ lock.release()
+ del lock
+
+ def test_try_acquire(self):
+ lock = self.locktype()
+ self.assertTrue(lock.acquire(False))
+ lock.release()
+
+ def test_try_acquire_contended(self):
+ lock = self.locktype()
+ lock.acquire()
+ result = []
+ def f():
+ result.append(lock.acquire(False))
+ Bunch(f, 1).wait_for_finished()
+ self.assertFalse(result[0])
+ lock.release()
+
+ def test_acquire_contended(self):
+ lock = self.locktype()
+ lock.acquire()
+ N = 5
+ def f():
+ lock.acquire()
+ lock.release()
+
+ b = Bunch(f, N)
+ b.wait_for_started()
+ _wait()
+ self.assertEqual(len(b.finished), 0)
+ lock.release()
+ b.wait_for_finished()
+ self.assertEqual(len(b.finished), N)
+
+ def test_with(self):
+ lock = self.locktype()
+ def f():
+ lock.acquire()
+ lock.release()
+ def _with(err=None):
+ with lock:
+ if err is not None:
+ raise err
+ _with()
+ # Check the lock is unacquired
+ Bunch(f, 1).wait_for_finished()
+ self.assertRaises(TypeError, _with, TypeError)
+ # Check the lock is unacquired
+ Bunch(f, 1).wait_for_finished()
+
+ def test_thread_leak(self):
+ # The lock shouldn't leak a Thread instance when used from a foreign
+ # (non-threading) thread.
+ lock = self.locktype()
+ def f():
+ lock.acquire()
+ lock.release()
+ n = len(threading.enumerate())
+ # We run many threads in the hope that existing threads ids won't
+ # be recycled.
+ Bunch(f, 15).wait_for_finished()
+ self.assertEqual(n, len(threading.enumerate()))
+
+
+class LockTests(BaseLockTests):
+ """
+ Tests for non-recursive, weak locks
+ (which can be acquired and released from different threads).
+ """
+ def test_reacquire(self):
+ # Lock needs to be released before re-acquiring.
+ lock = self.locktype()
+ phase = []
+ def f():
+ lock.acquire()
+ phase.append(None)
+ lock.acquire()
+ phase.append(None)
+ start_new_thread(f, ())
+ while len(phase) == 0:
+ _wait()
+ _wait()
+ self.assertEqual(len(phase), 1)
+ lock.release()
+ while len(phase) == 1:
+ _wait()
+ self.assertEqual(len(phase), 2)
+
+ def test_different_thread(self):
+ # Lock can be released from a different thread.
+ lock = self.locktype()
+ lock.acquire()
+ def f():
+ lock.release()
+ b = Bunch(f, 1)
+ b.wait_for_finished()
+ lock.acquire()
+ lock.release()
+
+
+class RLockTests(BaseLockTests):
+ """
+ Tests for recursive locks.
+ """
+ def test_reacquire(self):
+ lock = self.locktype()
+ lock.acquire()
+ lock.acquire()
+ lock.release()
+ lock.acquire()
+ lock.release()
+ lock.release()
+
+ def test_release_unacquired(self):
+ # Cannot release an unacquired lock
+ lock = self.locktype()
+ self.assertRaises(RuntimeError, lock.release)
+ lock.acquire()
+ lock.acquire()
+ lock.release()
+ lock.acquire()
+ lock.release()
+ lock.release()
+ self.assertRaises(RuntimeError, lock.release)
+
+ def test_different_thread(self):
+ # Cannot release from a different thread
+ lock = self.locktype()
+ def f():
+ lock.acquire()
+ b = Bunch(f, 1, True)
+ try:
+ self.assertRaises(RuntimeError, lock.release)
+ finally:
+ b.do_finish()
+
+ def test__is_owned(self):
+ lock = self.locktype()
+ self.assertFalse(lock._is_owned())
+ lock.acquire()
+ self.assertTrue(lock._is_owned())
+ lock.acquire()
+ self.assertTrue(lock._is_owned())
+ result = []
+ def f():
+ result.append(lock._is_owned())
+ Bunch(f, 1).wait_for_finished()
+ self.assertFalse(result[0])
+ lock.release()
+ self.assertTrue(lock._is_owned())
+ lock.release()
+ self.assertFalse(lock._is_owned())
+
+
+class EventTests(BaseTestCase):
+ """
+ Tests for Event objects.
+ """
+
+ def test_is_set(self):
+ evt = self.eventtype()
+ self.assertFalse(evt.is_set())
+ evt.set()
+ self.assertTrue(evt.is_set())
+ evt.set()
+ self.assertTrue(evt.is_set())
+ evt.clear()
+ self.assertFalse(evt.is_set())
+ evt.clear()
+ self.assertFalse(evt.is_set())
+
+ def _check_notify(self, evt):
+ # All threads get notified
+ N = 5
+ results1 = []
+ results2 = []
+ def f():
+ results1.append(evt.wait())
+ results2.append(evt.wait())
+ b = Bunch(f, N)
+ b.wait_for_started()
+ _wait()
+ self.assertEqual(len(results1), 0)
+ evt.set()
+ b.wait_for_finished()
+ self.assertEqual(results1, [True] * N)
+ self.assertEqual(results2, [True] * N)
+
+ def test_notify(self):
+ evt = self.eventtype()
+ self._check_notify(evt)
+ # Another time, after an explicit clear()
+ evt.set()
+ evt.clear()
+ self._check_notify(evt)
+
+ def test_timeout(self):
+ evt = self.eventtype()
+ results1 = []
+ results2 = []
+ N = 5
+ def f():
+ results1.append(evt.wait(0.0))
+ t1 = time.time()
+ r = evt.wait(0.2)
+ t2 = time.time()
+ results2.append((r, t2 - t1))
+ Bunch(f, N).wait_for_finished()
+ self.assertEqual(results1, [False] * N)
+ for r, dt in results2:
+ self.assertFalse(r)
+ self.assertTrue(dt >= 0.2, dt)
+ # The event is set
+ results1 = []
+ results2 = []
+ evt.set()
+ Bunch(f, N).wait_for_finished()
+ self.assertEqual(results1, [True] * N)
+ for r, dt in results2:
+ self.assertTrue(r)
+
+
+class ConditionTests(BaseTestCase):
+ """
+ Tests for condition variables.
+ """
+
+ def test_acquire(self):
+ cond = self.condtype()
+ # Be default we have an RLock: the condition can be acquired multiple
+ # times.
+ cond.acquire()
+ cond.acquire()
+ cond.release()
+ cond.release()
+ lock = threading.Lock()
+ cond = self.condtype(lock)
+ cond.acquire()
+ self.assertFalse(lock.acquire(False))
+ cond.release()
+ self.assertTrue(lock.acquire(False))
+ self.assertFalse(cond.acquire(False))
+ lock.release()
+ with cond:
+ self.assertFalse(lock.acquire(False))
+
+ def test_unacquired_wait(self):
+ cond = self.condtype()
+ self.assertRaises(RuntimeError, cond.wait)
+
+ def test_unacquired_notify(self):
+ cond = self.condtype()
+ self.assertRaises(RuntimeError, cond.notify)
+
+ def _check_notify(self, cond):
+ N = 5
+ results1 = []
+ results2 = []
+ phase_num = 0
+ def f():
+ cond.acquire()
+ cond.wait()
+ cond.release()
+ results1.append(phase_num)
+ cond.acquire()
+ cond.wait()
+ cond.release()
+ results2.append(phase_num)
+ b = Bunch(f, N)
+ b.wait_for_started()
+ _wait()
+ self.assertEqual(results1, [])
+ # Notify 3 threads at first
+ cond.acquire()
+ cond.notify(3)
+ _wait()
+ phase_num = 1
+ cond.release()
+ while len(results1) < 3:
+ _wait()
+ self.assertEqual(results1, [1] * 3)
+ self.assertEqual(results2, [])
+ # Notify 5 threads: they might be in their first or second wait
+ cond.acquire()
+ cond.notify(5)
+ _wait()
+ phase_num = 2
+ cond.release()
+ while len(results1) + len(results2) < 8:
+ _wait()
+ self.assertEqual(results1, [1] * 3 + [2] * 2)
+ self.assertEqual(results2, [2] * 3)
+ # Notify all threads: they are all in their second wait
+ cond.acquire()
+ cond.notify_all()
+ _wait()
+ phase_num = 3
+ cond.release()
+ while len(results2) < 5:
+ _wait()
+ self.assertEqual(results1, [1] * 3 + [2] * 2)
+ self.assertEqual(results2, [2] * 3 + [3] * 2)
+ b.wait_for_finished()
+
+ def test_notify(self):
+ cond = self.condtype()
+ self._check_notify(cond)
+ # A second time, to check internal state is still ok.
+ self._check_notify(cond)
+
+ def test_timeout(self):
+ cond = self.condtype()
+ results = []
+ N = 5
+ def f():
+ cond.acquire()
+ t1 = time.time()
+ cond.wait(0.2)
+ t2 = time.time()
+ cond.release()
+ results.append(t2 - t1)
+ Bunch(f, N).wait_for_finished()
+ self.assertEqual(len(results), 5)
+ for dt in results:
+ self.assertTrue(dt >= 0.2, dt)
+
+
+class BaseSemaphoreTests(BaseTestCase):
+ """
+ Common tests for {bounded, unbounded} semaphore objects.
+ """
+
+ def test_constructor(self):
+ self.assertRaises(ValueError, self.semtype, value = -1)
+ self.assertRaises(ValueError, self.semtype, value = -sys.maxsize)
+
+ def test_acquire(self):
+ sem = self.semtype(1)
+ sem.acquire()
+ sem.release()
+ sem = self.semtype(2)
+ sem.acquire()
+ sem.acquire()
+ sem.release()
+ sem.release()
+
+ def test_acquire_destroy(self):
+ sem = self.semtype()
+ sem.acquire()
+ del sem
+
+ def test_acquire_contended(self):
+ sem = self.semtype(7)
+ sem.acquire()
+ N = 10
+ results1 = []
+ results2 = []
+ phase_num = 0
+ def f():
+ sem.acquire()
+ results1.append(phase_num)
+ sem.acquire()
+ results2.append(phase_num)
+ b = Bunch(f, 10)
+ b.wait_for_started()
+ while len(results1) + len(results2) < 6:
+ _wait()
+ self.assertEqual(results1 + results2, [0] * 6)
+ phase_num = 1
+ for i in range(7):
+ sem.release()
+ while len(results1) + len(results2) < 13:
+ _wait()
+ self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7)
+ phase_num = 2
+ for i in range(6):
+ sem.release()
+ while len(results1) + len(results2) < 19:
+ _wait()
+ self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7 + [2] * 6)
+ # The semaphore is still locked
+ self.assertFalse(sem.acquire(False))
+ # Final release, to let the last thread finish
+ sem.release()
+ b.wait_for_finished()
+
+ def test_try_acquire(self):
+ sem = self.semtype(2)
+ self.assertTrue(sem.acquire(False))
+ self.assertTrue(sem.acquire(False))
+ self.assertFalse(sem.acquire(False))
+ sem.release()
+ self.assertTrue(sem.acquire(False))
+
+ def test_try_acquire_contended(self):
+ sem = self.semtype(4)
+ sem.acquire()
+ results = []
+ def f():
+ results.append(sem.acquire(False))
+ results.append(sem.acquire(False))
+ Bunch(f, 5).wait_for_finished()
+ # There can be a thread switch between acquiring the semaphore and
+ # appending the result, therefore results will not necessarily be
+ # ordered.
+ self.assertEqual(sorted(results), [False] * 7 + [True] * 3 )
+
+ def test_default_value(self):
+ # The default initial value is 1.
+ sem = self.semtype()
+ sem.acquire()
+ def f():
+ sem.acquire()
+ sem.release()
+ b = Bunch(f, 1)
+ b.wait_for_started()
+ _wait()
+ self.assertFalse(b.finished)
+ sem.release()
+ b.wait_for_finished()
+
+ def test_with(self):
+ sem = self.semtype(2)
+ def _with(err=None):
+ with sem:
+ self.assertTrue(sem.acquire(False))
+ sem.release()
+ with sem:
+ self.assertFalse(sem.acquire(False))
+ if err:
+ raise err
+ _with()
+ self.assertTrue(sem.acquire(False))
+ sem.release()
+ self.assertRaises(TypeError, _with, TypeError)
+ self.assertTrue(sem.acquire(False))
+ sem.release()
+
+class SemaphoreTests(BaseSemaphoreTests):
+ """
+ Tests for unbounded semaphores.
+ """
+
+ def test_release_unacquired(self):
+ # Unbounded releases are allowed and increment the semaphore's value
+ sem = self.semtype(1)
+ sem.release()
+ sem.acquire()
+ sem.acquire()
+ sem.release()
+
+
+class BoundedSemaphoreTests(BaseSemaphoreTests):
+ """
+ Tests for bounded semaphores.
+ """
+
+ def test_release_unacquired(self):
+ # Cannot go past the initial value
+ sem = self.semtype()
+ self.assertRaises(ValueError, sem.release)
+ sem.acquire()
+ sem.release()
+ self.assertRaises(ValueError, sem.release)