diff options
author | Antoine Pitrou <solipsis@pitrou.net> | 2009-11-09 16:52:46 (GMT) |
---|---|---|
committer | Antoine Pitrou <solipsis@pitrou.net> | 2009-11-09 16:52:46 (GMT) |
commit | 959f3e50322467b9d894de67b118eb688753f9f1 (patch) | |
tree | 3368d6ff7fcaf4ff8923f72b89643de9634fb344 /Lib/test/lock_tests.py | |
parent | 536d299ca0d06c249f2674266acaf4fc4411d5bf (diff) | |
download | cpython-959f3e50322467b9d894de67b118eb688753f9f1.zip cpython-959f3e50322467b9d894de67b118eb688753f9f1.tar.gz cpython-959f3e50322467b9d894de67b118eb688753f9f1.tar.bz2 |
Merged revisions 76138,76173 via svnmerge from
svn+ssh://pythondev@svn.python.org/python/branches/py3k
................
r76138 | antoine.pitrou | 2009-11-06 23:41:14 +0100 (ven., 06 nov. 2009) | 10 lines
Merged revisions 76137 via svnmerge from
svn+ssh://pythondev@svn.python.org/python/trunk
........
r76137 | antoine.pitrou | 2009-11-06 23:34:35 +0100 (ven., 06 nov. 2009) | 4 lines
Issue #7270: Add some dedicated unit tests for multi-thread synchronization
primitives such as Lock, RLock, Condition, Event and Semaphore.
........
................
r76173 | antoine.pitrou | 2009-11-09 17:08:16 +0100 (lun., 09 nov. 2009) | 11 lines
Merged revisions 76172 via svnmerge from
svn+ssh://pythondev@svn.python.org/python/trunk
........
r76172 | antoine.pitrou | 2009-11-09 17:00:11 +0100 (lun., 09 nov. 2009) | 5 lines
Issue #7282: Fix a memory leak when an RLock was used in a thread other
than those started through `threading.Thread` (for example, using
`thread.start_new_thread()`.
........
................
Diffstat (limited to 'Lib/test/lock_tests.py')
-rw-r--r-- | Lib/test/lock_tests.py | 546 |
1 files changed, 546 insertions, 0 deletions
diff --git a/Lib/test/lock_tests.py b/Lib/test/lock_tests.py new file mode 100644 index 0000000..04f7422 --- /dev/null +++ b/Lib/test/lock_tests.py @@ -0,0 +1,546 @@ +""" +Various tests for synchronization primitives. +""" + +import sys +import time +from _thread import start_new_thread, get_ident +import threading +import unittest + +from test import support + + +def _wait(): + # A crude wait/yield function not relying on synchronization primitives. + time.sleep(0.01) + +class Bunch(object): + """ + A bunch of threads. + """ + def __init__(self, f, n, wait_before_exit=False): + """ + Construct a bunch of `n` threads running the same function `f`. + If `wait_before_exit` is True, the threads won't terminate until + do_finish() is called. + """ + self.f = f + self.n = n + self.started = [] + self.finished = [] + self._can_exit = not wait_before_exit + def task(): + tid = get_ident() + self.started.append(tid) + try: + f() + finally: + self.finished.append(tid) + while not self._can_exit: + _wait() + for i in range(n): + start_new_thread(task, ()) + + def wait_for_started(self): + while len(self.started) < self.n: + _wait() + + def wait_for_finished(self): + while len(self.finished) < self.n: + _wait() + + def do_finish(self): + self._can_exit = True + + +class BaseTestCase(unittest.TestCase): + def setUp(self): + self._threads = support.threading_setup() + + def tearDown(self): + support.threading_cleanup(*self._threads) + support.reap_children() + + +class BaseLockTests(BaseTestCase): + """ + Tests for both recursive and non-recursive locks. + """ + + def test_constructor(self): + lock = self.locktype() + del lock + + def test_acquire_destroy(self): + lock = self.locktype() + lock.acquire() + del lock + + def test_acquire_release(self): + lock = self.locktype() + lock.acquire() + lock.release() + del lock + + def test_try_acquire(self): + lock = self.locktype() + self.assertTrue(lock.acquire(False)) + lock.release() + + def test_try_acquire_contended(self): + lock = self.locktype() + lock.acquire() + result = [] + def f(): + result.append(lock.acquire(False)) + Bunch(f, 1).wait_for_finished() + self.assertFalse(result[0]) + lock.release() + + def test_acquire_contended(self): + lock = self.locktype() + lock.acquire() + N = 5 + def f(): + lock.acquire() + lock.release() + + b = Bunch(f, N) + b.wait_for_started() + _wait() + self.assertEqual(len(b.finished), 0) + lock.release() + b.wait_for_finished() + self.assertEqual(len(b.finished), N) + + def test_with(self): + lock = self.locktype() + def f(): + lock.acquire() + lock.release() + def _with(err=None): + with lock: + if err is not None: + raise err + _with() + # Check the lock is unacquired + Bunch(f, 1).wait_for_finished() + self.assertRaises(TypeError, _with, TypeError) + # Check the lock is unacquired + Bunch(f, 1).wait_for_finished() + + def test_thread_leak(self): + # The lock shouldn't leak a Thread instance when used from a foreign + # (non-threading) thread. + lock = self.locktype() + def f(): + lock.acquire() + lock.release() + n = len(threading.enumerate()) + # We run many threads in the hope that existing threads ids won't + # be recycled. + Bunch(f, 15).wait_for_finished() + self.assertEqual(n, len(threading.enumerate())) + + +class LockTests(BaseLockTests): + """ + Tests for non-recursive, weak locks + (which can be acquired and released from different threads). + """ + def test_reacquire(self): + # Lock needs to be released before re-acquiring. + lock = self.locktype() + phase = [] + def f(): + lock.acquire() + phase.append(None) + lock.acquire() + phase.append(None) + start_new_thread(f, ()) + while len(phase) == 0: + _wait() + _wait() + self.assertEqual(len(phase), 1) + lock.release() + while len(phase) == 1: + _wait() + self.assertEqual(len(phase), 2) + + def test_different_thread(self): + # Lock can be released from a different thread. + lock = self.locktype() + lock.acquire() + def f(): + lock.release() + b = Bunch(f, 1) + b.wait_for_finished() + lock.acquire() + lock.release() + + +class RLockTests(BaseLockTests): + """ + Tests for recursive locks. + """ + def test_reacquire(self): + lock = self.locktype() + lock.acquire() + lock.acquire() + lock.release() + lock.acquire() + lock.release() + lock.release() + + def test_release_unacquired(self): + # Cannot release an unacquired lock + lock = self.locktype() + self.assertRaises(RuntimeError, lock.release) + lock.acquire() + lock.acquire() + lock.release() + lock.acquire() + lock.release() + lock.release() + self.assertRaises(RuntimeError, lock.release) + + def test_different_thread(self): + # Cannot release from a different thread + lock = self.locktype() + def f(): + lock.acquire() + b = Bunch(f, 1, True) + try: + self.assertRaises(RuntimeError, lock.release) + finally: + b.do_finish() + + def test__is_owned(self): + lock = self.locktype() + self.assertFalse(lock._is_owned()) + lock.acquire() + self.assertTrue(lock._is_owned()) + lock.acquire() + self.assertTrue(lock._is_owned()) + result = [] + def f(): + result.append(lock._is_owned()) + Bunch(f, 1).wait_for_finished() + self.assertFalse(result[0]) + lock.release() + self.assertTrue(lock._is_owned()) + lock.release() + self.assertFalse(lock._is_owned()) + + +class EventTests(BaseTestCase): + """ + Tests for Event objects. + """ + + def test_is_set(self): + evt = self.eventtype() + self.assertFalse(evt.is_set()) + evt.set() + self.assertTrue(evt.is_set()) + evt.set() + self.assertTrue(evt.is_set()) + evt.clear() + self.assertFalse(evt.is_set()) + evt.clear() + self.assertFalse(evt.is_set()) + + def _check_notify(self, evt): + # All threads get notified + N = 5 + results1 = [] + results2 = [] + def f(): + results1.append(evt.wait()) + results2.append(evt.wait()) + b = Bunch(f, N) + b.wait_for_started() + _wait() + self.assertEqual(len(results1), 0) + evt.set() + b.wait_for_finished() + self.assertEqual(results1, [True] * N) + self.assertEqual(results2, [True] * N) + + def test_notify(self): + evt = self.eventtype() + self._check_notify(evt) + # Another time, after an explicit clear() + evt.set() + evt.clear() + self._check_notify(evt) + + def test_timeout(self): + evt = self.eventtype() + results1 = [] + results2 = [] + N = 5 + def f(): + results1.append(evt.wait(0.0)) + t1 = time.time() + r = evt.wait(0.2) + t2 = time.time() + results2.append((r, t2 - t1)) + Bunch(f, N).wait_for_finished() + self.assertEqual(results1, [False] * N) + for r, dt in results2: + self.assertFalse(r) + self.assertTrue(dt >= 0.2, dt) + # The event is set + results1 = [] + results2 = [] + evt.set() + Bunch(f, N).wait_for_finished() + self.assertEqual(results1, [True] * N) + for r, dt in results2: + self.assertTrue(r) + + +class ConditionTests(BaseTestCase): + """ + Tests for condition variables. + """ + + def test_acquire(self): + cond = self.condtype() + # Be default we have an RLock: the condition can be acquired multiple + # times. + cond.acquire() + cond.acquire() + cond.release() + cond.release() + lock = threading.Lock() + cond = self.condtype(lock) + cond.acquire() + self.assertFalse(lock.acquire(False)) + cond.release() + self.assertTrue(lock.acquire(False)) + self.assertFalse(cond.acquire(False)) + lock.release() + with cond: + self.assertFalse(lock.acquire(False)) + + def test_unacquired_wait(self): + cond = self.condtype() + self.assertRaises(RuntimeError, cond.wait) + + def test_unacquired_notify(self): + cond = self.condtype() + self.assertRaises(RuntimeError, cond.notify) + + def _check_notify(self, cond): + N = 5 + results1 = [] + results2 = [] + phase_num = 0 + def f(): + cond.acquire() + cond.wait() + cond.release() + results1.append(phase_num) + cond.acquire() + cond.wait() + cond.release() + results2.append(phase_num) + b = Bunch(f, N) + b.wait_for_started() + _wait() + self.assertEqual(results1, []) + # Notify 3 threads at first + cond.acquire() + cond.notify(3) + _wait() + phase_num = 1 + cond.release() + while len(results1) < 3: + _wait() + self.assertEqual(results1, [1] * 3) + self.assertEqual(results2, []) + # Notify 5 threads: they might be in their first or second wait + cond.acquire() + cond.notify(5) + _wait() + phase_num = 2 + cond.release() + while len(results1) + len(results2) < 8: + _wait() + self.assertEqual(results1, [1] * 3 + [2] * 2) + self.assertEqual(results2, [2] * 3) + # Notify all threads: they are all in their second wait + cond.acquire() + cond.notify_all() + _wait() + phase_num = 3 + cond.release() + while len(results2) < 5: + _wait() + self.assertEqual(results1, [1] * 3 + [2] * 2) + self.assertEqual(results2, [2] * 3 + [3] * 2) + b.wait_for_finished() + + def test_notify(self): + cond = self.condtype() + self._check_notify(cond) + # A second time, to check internal state is still ok. + self._check_notify(cond) + + def test_timeout(self): + cond = self.condtype() + results = [] + N = 5 + def f(): + cond.acquire() + t1 = time.time() + cond.wait(0.2) + t2 = time.time() + cond.release() + results.append(t2 - t1) + Bunch(f, N).wait_for_finished() + self.assertEqual(len(results), 5) + for dt in results: + self.assertTrue(dt >= 0.2, dt) + + +class BaseSemaphoreTests(BaseTestCase): + """ + Common tests for {bounded, unbounded} semaphore objects. + """ + + def test_constructor(self): + self.assertRaises(ValueError, self.semtype, value = -1) + self.assertRaises(ValueError, self.semtype, value = -sys.maxsize) + + def test_acquire(self): + sem = self.semtype(1) + sem.acquire() + sem.release() + sem = self.semtype(2) + sem.acquire() + sem.acquire() + sem.release() + sem.release() + + def test_acquire_destroy(self): + sem = self.semtype() + sem.acquire() + del sem + + def test_acquire_contended(self): + sem = self.semtype(7) + sem.acquire() + N = 10 + results1 = [] + results2 = [] + phase_num = 0 + def f(): + sem.acquire() + results1.append(phase_num) + sem.acquire() + results2.append(phase_num) + b = Bunch(f, 10) + b.wait_for_started() + while len(results1) + len(results2) < 6: + _wait() + self.assertEqual(results1 + results2, [0] * 6) + phase_num = 1 + for i in range(7): + sem.release() + while len(results1) + len(results2) < 13: + _wait() + self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7) + phase_num = 2 + for i in range(6): + sem.release() + while len(results1) + len(results2) < 19: + _wait() + self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7 + [2] * 6) + # The semaphore is still locked + self.assertFalse(sem.acquire(False)) + # Final release, to let the last thread finish + sem.release() + b.wait_for_finished() + + def test_try_acquire(self): + sem = self.semtype(2) + self.assertTrue(sem.acquire(False)) + self.assertTrue(sem.acquire(False)) + self.assertFalse(sem.acquire(False)) + sem.release() + self.assertTrue(sem.acquire(False)) + + def test_try_acquire_contended(self): + sem = self.semtype(4) + sem.acquire() + results = [] + def f(): + results.append(sem.acquire(False)) + results.append(sem.acquire(False)) + Bunch(f, 5).wait_for_finished() + # There can be a thread switch between acquiring the semaphore and + # appending the result, therefore results will not necessarily be + # ordered. + self.assertEqual(sorted(results), [False] * 7 + [True] * 3 ) + + def test_default_value(self): + # The default initial value is 1. + sem = self.semtype() + sem.acquire() + def f(): + sem.acquire() + sem.release() + b = Bunch(f, 1) + b.wait_for_started() + _wait() + self.assertFalse(b.finished) + sem.release() + b.wait_for_finished() + + def test_with(self): + sem = self.semtype(2) + def _with(err=None): + with sem: + self.assertTrue(sem.acquire(False)) + sem.release() + with sem: + self.assertFalse(sem.acquire(False)) + if err: + raise err + _with() + self.assertTrue(sem.acquire(False)) + sem.release() + self.assertRaises(TypeError, _with, TypeError) + self.assertTrue(sem.acquire(False)) + sem.release() + +class SemaphoreTests(BaseSemaphoreTests): + """ + Tests for unbounded semaphores. + """ + + def test_release_unacquired(self): + # Unbounded releases are allowed and increment the semaphore's value + sem = self.semtype(1) + sem.release() + sem.acquire() + sem.acquire() + sem.release() + + +class BoundedSemaphoreTests(BaseSemaphoreTests): + """ + Tests for bounded semaphores. + """ + + def test_release_unacquired(self): + # Cannot go past the initial value + sem = self.semtype() + self.assertRaises(ValueError, sem.release) + sem.acquire() + sem.release() + self.assertRaises(ValueError, sem.release) |