From 557934f5a9a23908603365e483af3434f3c98284 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Fri, 6 Nov 2009 22:41:14 +0000 Subject: Merged revisions 76137 via svnmerge from svn+ssh://pythondev@svn.python.org/python/trunk ........ r76137 | antoine.pitrou | 2009-11-06 23:34:35 +0100 (ven., 06 nov. 2009) | 4 lines Issue #7270: Add some dedicated unit tests for multi-thread synchronization primitives such as Lock, RLock, Condition, Event and Semaphore. ........ --- Lib/test/lock_tests.py | 533 +++++++++++++++++++++++++++++++++++++++++++++ Lib/test/test_thread.py | 7 +- Lib/test/test_threading.py | 52 +++-- Misc/NEWS | 3 + 4 files changed, 574 insertions(+), 21 deletions(-) create mode 100644 Lib/test/lock_tests.py diff --git a/Lib/test/lock_tests.py b/Lib/test/lock_tests.py new file mode 100644 index 0000000..f9c2259 --- /dev/null +++ b/Lib/test/lock_tests.py @@ -0,0 +1,533 @@ +""" +Various tests for synchronization primitives. +""" + +import sys +import time +from _thread import start_new_thread, get_ident +import threading +import unittest + +from test import support + + +def _wait(): + # A crude wait/yield function not relying on synchronization primitives. + time.sleep(0.01) + +class Bunch(object): + """ + A bunch of threads. + """ + def __init__(self, f, n, wait_before_exit=False): + """ + Construct a bunch of `n` threads running the same function `f`. + If `wait_before_exit` is True, the threads won't terminate until + do_finish() is called. + """ + self.f = f + self.n = n + self.started = [] + self.finished = [] + self._can_exit = not wait_before_exit + def task(): + tid = get_ident() + self.started.append(tid) + try: + f() + finally: + self.finished.append(tid) + while not self._can_exit: + _wait() + for i in range(n): + start_new_thread(task, ()) + + def wait_for_started(self): + while len(self.started) < self.n: + _wait() + + def wait_for_finished(self): + while len(self.finished) < self.n: + _wait() + + def do_finish(self): + self._can_exit = True + + +class BaseTestCase(unittest.TestCase): + def setUp(self): + self._threads = support.threading_setup() + + def tearDown(self): + support.threading_cleanup(*self._threads) + support.reap_children() + + +class BaseLockTests(BaseTestCase): + """ + Tests for both recursive and non-recursive locks. + """ + + def test_constructor(self): + lock = self.locktype() + del lock + + def test_acquire_destroy(self): + lock = self.locktype() + lock.acquire() + del lock + + def test_acquire_release(self): + lock = self.locktype() + lock.acquire() + lock.release() + del lock + + def test_try_acquire(self): + lock = self.locktype() + self.assertTrue(lock.acquire(False)) + lock.release() + + def test_try_acquire_contended(self): + lock = self.locktype() + lock.acquire() + result = [] + def f(): + result.append(lock.acquire(False)) + Bunch(f, 1).wait_for_finished() + self.assertFalse(result[0]) + lock.release() + + def test_acquire_contended(self): + lock = self.locktype() + lock.acquire() + N = 5 + def f(): + lock.acquire() + lock.release() + + b = Bunch(f, N) + b.wait_for_started() + _wait() + self.assertEqual(len(b.finished), 0) + lock.release() + b.wait_for_finished() + self.assertEqual(len(b.finished), N) + + def test_with(self): + lock = self.locktype() + def f(): + lock.acquire() + lock.release() + def _with(err=None): + with lock: + if err is not None: + raise err + _with() + # Check the lock is unacquired + Bunch(f, 1).wait_for_finished() + self.assertRaises(TypeError, _with, TypeError) + # Check the lock is unacquired + Bunch(f, 1).wait_for_finished() + + +class LockTests(BaseLockTests): + """ + Tests for non-recursive, weak locks + (which can be acquired and released from different threads). + """ + def test_reacquire(self): + # Lock needs to be released before re-acquiring. + lock = self.locktype() + phase = [] + def f(): + lock.acquire() + phase.append(None) + lock.acquire() + phase.append(None) + start_new_thread(f, ()) + while len(phase) == 0: + _wait() + _wait() + self.assertEqual(len(phase), 1) + lock.release() + while len(phase) == 1: + _wait() + self.assertEqual(len(phase), 2) + + def test_different_thread(self): + # Lock can be released from a different thread. + lock = self.locktype() + lock.acquire() + def f(): + lock.release() + b = Bunch(f, 1) + b.wait_for_finished() + lock.acquire() + lock.release() + + +class RLockTests(BaseLockTests): + """ + Tests for recursive locks. + """ + def test_reacquire(self): + lock = self.locktype() + lock.acquire() + lock.acquire() + lock.release() + lock.acquire() + lock.release() + lock.release() + + def test_release_unacquired(self): + # Cannot release an unacquired lock + lock = self.locktype() + self.assertRaises(RuntimeError, lock.release) + lock.acquire() + lock.acquire() + lock.release() + lock.acquire() + lock.release() + lock.release() + self.assertRaises(RuntimeError, lock.release) + + def test_different_thread(self): + # Cannot release from a different thread + lock = self.locktype() + def f(): + lock.acquire() + b = Bunch(f, 1, True) + try: + self.assertRaises(RuntimeError, lock.release) + finally: + b.do_finish() + + def test__is_owned(self): + lock = self.locktype() + self.assertFalse(lock._is_owned()) + lock.acquire() + self.assertTrue(lock._is_owned()) + lock.acquire() + self.assertTrue(lock._is_owned()) + result = [] + def f(): + result.append(lock._is_owned()) + Bunch(f, 1).wait_for_finished() + self.assertFalse(result[0]) + lock.release() + self.assertTrue(lock._is_owned()) + lock.release() + self.assertFalse(lock._is_owned()) + + +class EventTests(BaseTestCase): + """ + Tests for Event objects. + """ + + def test_is_set(self): + evt = self.eventtype() + self.assertFalse(evt.is_set()) + evt.set() + self.assertTrue(evt.is_set()) + evt.set() + self.assertTrue(evt.is_set()) + evt.clear() + self.assertFalse(evt.is_set()) + evt.clear() + self.assertFalse(evt.is_set()) + + def _check_notify(self, evt): + # All threads get notified + N = 5 + results1 = [] + results2 = [] + def f(): + results1.append(evt.wait()) + results2.append(evt.wait()) + b = Bunch(f, N) + b.wait_for_started() + _wait() + self.assertEqual(len(results1), 0) + evt.set() + b.wait_for_finished() + self.assertEqual(results1, [True] * N) + self.assertEqual(results2, [True] * N) + + def test_notify(self): + evt = self.eventtype() + self._check_notify(evt) + # Another time, after an explicit clear() + evt.set() + evt.clear() + self._check_notify(evt) + + def test_timeout(self): + evt = self.eventtype() + results1 = [] + results2 = [] + N = 5 + def f(): + results1.append(evt.wait(0.0)) + t1 = time.time() + r = evt.wait(0.2) + t2 = time.time() + results2.append((r, t2 - t1)) + Bunch(f, N).wait_for_finished() + self.assertEqual(results1, [False] * N) + for r, dt in results2: + self.assertFalse(r) + self.assertTrue(dt >= 0.2, dt) + # The event is set + results1 = [] + results2 = [] + evt.set() + Bunch(f, N).wait_for_finished() + self.assertEqual(results1, [True] * N) + for r, dt in results2: + self.assertTrue(r) + + +class ConditionTests(BaseTestCase): + """ + Tests for condition variables. + """ + + def test_acquire(self): + cond = self.condtype() + # Be default we have an RLock: the condition can be acquired multiple + # times. + cond.acquire() + cond.acquire() + cond.release() + cond.release() + lock = threading.Lock() + cond = self.condtype(lock) + cond.acquire() + self.assertFalse(lock.acquire(False)) + cond.release() + self.assertTrue(lock.acquire(False)) + self.assertFalse(cond.acquire(False)) + lock.release() + with cond: + self.assertFalse(lock.acquire(False)) + + def test_unacquired_wait(self): + cond = self.condtype() + self.assertRaises(RuntimeError, cond.wait) + + def test_unacquired_notify(self): + cond = self.condtype() + self.assertRaises(RuntimeError, cond.notify) + + def _check_notify(self, cond): + N = 5 + results1 = [] + results2 = [] + phase_num = 0 + def f(): + cond.acquire() + cond.wait() + cond.release() + results1.append(phase_num) + cond.acquire() + cond.wait() + cond.release() + results2.append(phase_num) + b = Bunch(f, N) + b.wait_for_started() + _wait() + self.assertEqual(results1, []) + # Notify 3 threads at first + cond.acquire() + cond.notify(3) + _wait() + phase_num = 1 + cond.release() + while len(results1) < 3: + _wait() + self.assertEqual(results1, [1] * 3) + self.assertEqual(results2, []) + # Notify 5 threads: they might be in their first or second wait + cond.acquire() + cond.notify(5) + _wait() + phase_num = 2 + cond.release() + while len(results1) + len(results2) < 8: + _wait() + self.assertEqual(results1, [1] * 3 + [2] * 2) + self.assertEqual(results2, [2] * 3) + # Notify all threads: they are all in their second wait + cond.acquire() + cond.notify_all() + _wait() + phase_num = 3 + cond.release() + while len(results2) < 5: + _wait() + self.assertEqual(results1, [1] * 3 + [2] * 2) + self.assertEqual(results2, [2] * 3 + [3] * 2) + b.wait_for_finished() + + def test_notify(self): + cond = self.condtype() + self._check_notify(cond) + # A second time, to check internal state is still ok. + self._check_notify(cond) + + def test_timeout(self): + cond = self.condtype() + results = [] + N = 5 + def f(): + cond.acquire() + t1 = time.time() + cond.wait(0.2) + t2 = time.time() + cond.release() + results.append(t2 - t1) + Bunch(f, N).wait_for_finished() + self.assertEqual(len(results), 5) + for dt in results: + self.assertTrue(dt >= 0.2, dt) + + +class BaseSemaphoreTests(BaseTestCase): + """ + Common tests for {bounded, unbounded} semaphore objects. + """ + + def test_constructor(self): + self.assertRaises(ValueError, self.semtype, value = -1) + self.assertRaises(ValueError, self.semtype, value = -sys.maxsize) + + def test_acquire(self): + sem = self.semtype(1) + sem.acquire() + sem.release() + sem = self.semtype(2) + sem.acquire() + sem.acquire() + sem.release() + sem.release() + + def test_acquire_destroy(self): + sem = self.semtype() + sem.acquire() + del sem + + def test_acquire_contended(self): + sem = self.semtype(7) + sem.acquire() + N = 10 + results1 = [] + results2 = [] + phase_num = 0 + def f(): + sem.acquire() + results1.append(phase_num) + sem.acquire() + results2.append(phase_num) + b = Bunch(f, 10) + b.wait_for_started() + while len(results1) + len(results2) < 6: + _wait() + self.assertEqual(results1 + results2, [0] * 6) + phase_num = 1 + for i in range(7): + sem.release() + while len(results1) + len(results2) < 13: + _wait() + self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7) + phase_num = 2 + for i in range(6): + sem.release() + while len(results1) + len(results2) < 19: + _wait() + self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7 + [2] * 6) + # The semaphore is still locked + self.assertFalse(sem.acquire(False)) + # Final release, to let the last thread finish + sem.release() + b.wait_for_finished() + + def test_try_acquire(self): + sem = self.semtype(2) + self.assertTrue(sem.acquire(False)) + self.assertTrue(sem.acquire(False)) + self.assertFalse(sem.acquire(False)) + sem.release() + self.assertTrue(sem.acquire(False)) + + def test_try_acquire_contended(self): + sem = self.semtype(4) + sem.acquire() + results = [] + def f(): + results.append(sem.acquire(False)) + results.append(sem.acquire(False)) + Bunch(f, 5).wait_for_finished() + # There can be a thread switch between acquiring the semaphore and + # appending the result, therefore results will not necessarily be + # ordered. + self.assertEqual(sorted(results), [False] * 7 + [True] * 3 ) + + def test_default_value(self): + # The default initial value is 1. + sem = self.semtype() + sem.acquire() + def f(): + sem.acquire() + sem.release() + b = Bunch(f, 1) + b.wait_for_started() + _wait() + self.assertFalse(b.finished) + sem.release() + b.wait_for_finished() + + def test_with(self): + sem = self.semtype(2) + def _with(err=None): + with sem: + self.assertTrue(sem.acquire(False)) + sem.release() + with sem: + self.assertFalse(sem.acquire(False)) + if err: + raise err + _with() + self.assertTrue(sem.acquire(False)) + sem.release() + self.assertRaises(TypeError, _with, TypeError) + self.assertTrue(sem.acquire(False)) + sem.release() + +class SemaphoreTests(BaseSemaphoreTests): + """ + Tests for unbounded semaphores. + """ + + def test_release_unacquired(self): + # Unbounded releases are allowed and increment the semaphore's value + sem = self.semtype(1) + sem.release() + sem.acquire() + sem.acquire() + sem.release() + + +class BoundedSemaphoreTests(BaseSemaphoreTests): + """ + Tests for bounded semaphores. + """ + + def test_release_unacquired(self): + # Cannot go past the initial value + sem = self.semtype() + self.assertRaises(ValueError, sem.release) + sem.acquire() + sem.release() + self.assertRaises(ValueError, sem.release) diff --git a/Lib/test/test_thread.py b/Lib/test/test_thread.py index c25668d..70d89fe 100644 --- a/Lib/test/test_thread.py +++ b/Lib/test/test_thread.py @@ -6,6 +6,7 @@ import _thread as thread import time import weakref +from test import lock_tests NUMTASKS = 10 NUMTRIPS = 3 @@ -188,8 +189,12 @@ class BarrierTest(BasicThreadTest): if finished: self.done_mutex.release() +class LockTests(lock_tests.LockTests): + locktype = thread.allocate_lock + + def test_main(): - support.run_unittest(ThreadRunningTests, BarrierTest) + support.run_unittest(ThreadRunningTests, BarrierTest, LockTests) if __name__ == "__main__": test_main() diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py index b5b05c3..8f7c676 100644 --- a/Lib/test/test_threading.py +++ b/Lib/test/test_threading.py @@ -12,6 +12,8 @@ import unittest import weakref import os +from test import lock_tests + # A trivial mutable counter. class Counter(object): def __init__(self): @@ -487,22 +489,6 @@ class ThreadingExceptionTests(BaseTestCase): thread.start() self.assertRaises(RuntimeError, thread.start) - def test_releasing_unacquired_rlock(self): - rlock = threading.RLock() - self.assertRaises(RuntimeError, rlock.release) - - def test_waiting_on_unacquired_condition(self): - cond = threading.Condition() - self.assertRaises(RuntimeError, cond.wait) - - def test_notify_on_unacquired_condition(self): - cond = threading.Condition() - self.assertRaises(RuntimeError, cond.notify) - - def test_semaphore_with_negative_value(self): - self.assertRaises(ValueError, threading.Semaphore, value = -1) - self.assertRaises(ValueError, threading.Semaphore, value = -sys.maxsize) - def test_joining_current_thread(self): current_thread = threading.current_thread() self.assertRaises(RuntimeError, current_thread.join); @@ -517,11 +503,37 @@ class ThreadingExceptionTests(BaseTestCase): self.assertRaises(RuntimeError, setattr, thread, "daemon", True) +class LockTests(lock_tests.LockTests): + locktype = staticmethod(threading.Lock) + +class RLockTests(lock_tests.RLockTests): + locktype = staticmethod(threading.RLock) + +class EventTests(lock_tests.EventTests): + eventtype = staticmethod(threading.Event) + +class ConditionAsRLockTests(lock_tests.RLockTests): + # An Condition uses an RLock by default and exports its API. + locktype = staticmethod(threading.Condition) + +class ConditionTests(lock_tests.ConditionTests): + condtype = staticmethod(threading.Condition) + +class SemaphoreTests(lock_tests.SemaphoreTests): + semtype = staticmethod(threading.Semaphore) + +class BoundedSemaphoreTests(lock_tests.BoundedSemaphoreTests): + semtype = staticmethod(threading.BoundedSemaphore) + + def test_main(): - test.support.run_unittest(ThreadTests, - ThreadJoinOnShutdown, - ThreadingExceptionTests, - ) + test.support.run_unittest(LockTests, RLockTests, EventTests, + ConditionAsRLockTests, ConditionTests, + SemaphoreTests, BoundedSemaphoreTests, + ThreadTests, + ThreadJoinOnShutdown, + ThreadingExceptionTests, + ) if __name__ == "__main__": test_main() diff --git a/Misc/NEWS b/Misc/NEWS index 1e93aaa..0b98ccb 100644 --- a/Misc/NEWS +++ b/Misc/NEWS @@ -357,6 +357,9 @@ Documentation Tests ----- +- Issue #7270: Add some dedicated unit tests for multi-thread synchronization + primitives such as Lock, RLock, Condition, Event and Semaphore. + - Issue #7248 (part 2): Use a unique temporary directory for importlib source tests instead of tempfile.tempdir. This prevents the tests from sharing state between concurrent executions on the same system. -- cgit v0.12