summaryrefslogtreecommitdiffstats
path: root/Lib
diff options
context:
space:
mode:
Diffstat (limited to 'Lib')
-rw-r--r--Lib/test/lock_tests.py546
-rw-r--r--Lib/test/test_thread.py7
-rw-r--r--Lib/test/test_threading.py58
-rw-r--r--Lib/threading.py18
4 files changed, 596 insertions, 33 deletions
diff --git a/Lib/test/lock_tests.py b/Lib/test/lock_tests.py
new file mode 100644
index 0000000..04f7422
--- /dev/null
+++ b/Lib/test/lock_tests.py
@@ -0,0 +1,546 @@
+"""
+Various tests for synchronization primitives.
+"""
+
+import sys
+import time
+from _thread import start_new_thread, get_ident
+import threading
+import unittest
+
+from test import support
+
+
+def _wait():
+ # A crude wait/yield function not relying on synchronization primitives.
+ time.sleep(0.01)
+
+class Bunch(object):
+ """
+ A bunch of threads.
+ """
+ def __init__(self, f, n, wait_before_exit=False):
+ """
+ Construct a bunch of `n` threads running the same function `f`.
+ If `wait_before_exit` is True, the threads won't terminate until
+ do_finish() is called.
+ """
+ self.f = f
+ self.n = n
+ self.started = []
+ self.finished = []
+ self._can_exit = not wait_before_exit
+ def task():
+ tid = get_ident()
+ self.started.append(tid)
+ try:
+ f()
+ finally:
+ self.finished.append(tid)
+ while not self._can_exit:
+ _wait()
+ for i in range(n):
+ start_new_thread(task, ())
+
+ def wait_for_started(self):
+ while len(self.started) < self.n:
+ _wait()
+
+ def wait_for_finished(self):
+ while len(self.finished) < self.n:
+ _wait()
+
+ def do_finish(self):
+ self._can_exit = True
+
+
+class BaseTestCase(unittest.TestCase):
+ def setUp(self):
+ self._threads = support.threading_setup()
+
+ def tearDown(self):
+ support.threading_cleanup(*self._threads)
+ support.reap_children()
+
+
+class BaseLockTests(BaseTestCase):
+ """
+ Tests for both recursive and non-recursive locks.
+ """
+
+ def test_constructor(self):
+ lock = self.locktype()
+ del lock
+
+ def test_acquire_destroy(self):
+ lock = self.locktype()
+ lock.acquire()
+ del lock
+
+ def test_acquire_release(self):
+ lock = self.locktype()
+ lock.acquire()
+ lock.release()
+ del lock
+
+ def test_try_acquire(self):
+ lock = self.locktype()
+ self.assertTrue(lock.acquire(False))
+ lock.release()
+
+ def test_try_acquire_contended(self):
+ lock = self.locktype()
+ lock.acquire()
+ result = []
+ def f():
+ result.append(lock.acquire(False))
+ Bunch(f, 1).wait_for_finished()
+ self.assertFalse(result[0])
+ lock.release()
+
+ def test_acquire_contended(self):
+ lock = self.locktype()
+ lock.acquire()
+ N = 5
+ def f():
+ lock.acquire()
+ lock.release()
+
+ b = Bunch(f, N)
+ b.wait_for_started()
+ _wait()
+ self.assertEqual(len(b.finished), 0)
+ lock.release()
+ b.wait_for_finished()
+ self.assertEqual(len(b.finished), N)
+
+ def test_with(self):
+ lock = self.locktype()
+ def f():
+ lock.acquire()
+ lock.release()
+ def _with(err=None):
+ with lock:
+ if err is not None:
+ raise err
+ _with()
+ # Check the lock is unacquired
+ Bunch(f, 1).wait_for_finished()
+ self.assertRaises(TypeError, _with, TypeError)
+ # Check the lock is unacquired
+ Bunch(f, 1).wait_for_finished()
+
+ def test_thread_leak(self):
+ # The lock shouldn't leak a Thread instance when used from a foreign
+ # (non-threading) thread.
+ lock = self.locktype()
+ def f():
+ lock.acquire()
+ lock.release()
+ n = len(threading.enumerate())
+ # We run many threads in the hope that existing threads ids won't
+ # be recycled.
+ Bunch(f, 15).wait_for_finished()
+ self.assertEqual(n, len(threading.enumerate()))
+
+
+class LockTests(BaseLockTests):
+ """
+ Tests for non-recursive, weak locks
+ (which can be acquired and released from different threads).
+ """
+ def test_reacquire(self):
+ # Lock needs to be released before re-acquiring.
+ lock = self.locktype()
+ phase = []
+ def f():
+ lock.acquire()
+ phase.append(None)
+ lock.acquire()
+ phase.append(None)
+ start_new_thread(f, ())
+ while len(phase) == 0:
+ _wait()
+ _wait()
+ self.assertEqual(len(phase), 1)
+ lock.release()
+ while len(phase) == 1:
+ _wait()
+ self.assertEqual(len(phase), 2)
+
+ def test_different_thread(self):
+ # Lock can be released from a different thread.
+ lock = self.locktype()
+ lock.acquire()
+ def f():
+ lock.release()
+ b = Bunch(f, 1)
+ b.wait_for_finished()
+ lock.acquire()
+ lock.release()
+
+
+class RLockTests(BaseLockTests):
+ """
+ Tests for recursive locks.
+ """
+ def test_reacquire(self):
+ lock = self.locktype()
+ lock.acquire()
+ lock.acquire()
+ lock.release()
+ lock.acquire()
+ lock.release()
+ lock.release()
+
+ def test_release_unacquired(self):
+ # Cannot release an unacquired lock
+ lock = self.locktype()
+ self.assertRaises(RuntimeError, lock.release)
+ lock.acquire()
+ lock.acquire()
+ lock.release()
+ lock.acquire()
+ lock.release()
+ lock.release()
+ self.assertRaises(RuntimeError, lock.release)
+
+ def test_different_thread(self):
+ # Cannot release from a different thread
+ lock = self.locktype()
+ def f():
+ lock.acquire()
+ b = Bunch(f, 1, True)
+ try:
+ self.assertRaises(RuntimeError, lock.release)
+ finally:
+ b.do_finish()
+
+ def test__is_owned(self):
+ lock = self.locktype()
+ self.assertFalse(lock._is_owned())
+ lock.acquire()
+ self.assertTrue(lock._is_owned())
+ lock.acquire()
+ self.assertTrue(lock._is_owned())
+ result = []
+ def f():
+ result.append(lock._is_owned())
+ Bunch(f, 1).wait_for_finished()
+ self.assertFalse(result[0])
+ lock.release()
+ self.assertTrue(lock._is_owned())
+ lock.release()
+ self.assertFalse(lock._is_owned())
+
+
+class EventTests(BaseTestCase):
+ """
+ Tests for Event objects.
+ """
+
+ def test_is_set(self):
+ evt = self.eventtype()
+ self.assertFalse(evt.is_set())
+ evt.set()
+ self.assertTrue(evt.is_set())
+ evt.set()
+ self.assertTrue(evt.is_set())
+ evt.clear()
+ self.assertFalse(evt.is_set())
+ evt.clear()
+ self.assertFalse(evt.is_set())
+
+ def _check_notify(self, evt):
+ # All threads get notified
+ N = 5
+ results1 = []
+ results2 = []
+ def f():
+ results1.append(evt.wait())
+ results2.append(evt.wait())
+ b = Bunch(f, N)
+ b.wait_for_started()
+ _wait()
+ self.assertEqual(len(results1), 0)
+ evt.set()
+ b.wait_for_finished()
+ self.assertEqual(results1, [True] * N)
+ self.assertEqual(results2, [True] * N)
+
+ def test_notify(self):
+ evt = self.eventtype()
+ self._check_notify(evt)
+ # Another time, after an explicit clear()
+ evt.set()
+ evt.clear()
+ self._check_notify(evt)
+
+ def test_timeout(self):
+ evt = self.eventtype()
+ results1 = []
+ results2 = []
+ N = 5
+ def f():
+ results1.append(evt.wait(0.0))
+ t1 = time.time()
+ r = evt.wait(0.2)
+ t2 = time.time()
+ results2.append((r, t2 - t1))
+ Bunch(f, N).wait_for_finished()
+ self.assertEqual(results1, [False] * N)
+ for r, dt in results2:
+ self.assertFalse(r)
+ self.assertTrue(dt >= 0.2, dt)
+ # The event is set
+ results1 = []
+ results2 = []
+ evt.set()
+ Bunch(f, N).wait_for_finished()
+ self.assertEqual(results1, [True] * N)
+ for r, dt in results2:
+ self.assertTrue(r)
+
+
+class ConditionTests(BaseTestCase):
+ """
+ Tests for condition variables.
+ """
+
+ def test_acquire(self):
+ cond = self.condtype()
+ # Be default we have an RLock: the condition can be acquired multiple
+ # times.
+ cond.acquire()
+ cond.acquire()
+ cond.release()
+ cond.release()
+ lock = threading.Lock()
+ cond = self.condtype(lock)
+ cond.acquire()
+ self.assertFalse(lock.acquire(False))
+ cond.release()
+ self.assertTrue(lock.acquire(False))
+ self.assertFalse(cond.acquire(False))
+ lock.release()
+ with cond:
+ self.assertFalse(lock.acquire(False))
+
+ def test_unacquired_wait(self):
+ cond = self.condtype()
+ self.assertRaises(RuntimeError, cond.wait)
+
+ def test_unacquired_notify(self):
+ cond = self.condtype()
+ self.assertRaises(RuntimeError, cond.notify)
+
+ def _check_notify(self, cond):
+ N = 5
+ results1 = []
+ results2 = []
+ phase_num = 0
+ def f():
+ cond.acquire()
+ cond.wait()
+ cond.release()
+ results1.append(phase_num)
+ cond.acquire()
+ cond.wait()
+ cond.release()
+ results2.append(phase_num)
+ b = Bunch(f, N)
+ b.wait_for_started()
+ _wait()
+ self.assertEqual(results1, [])
+ # Notify 3 threads at first
+ cond.acquire()
+ cond.notify(3)
+ _wait()
+ phase_num = 1
+ cond.release()
+ while len(results1) < 3:
+ _wait()
+ self.assertEqual(results1, [1] * 3)
+ self.assertEqual(results2, [])
+ # Notify 5 threads: they might be in their first or second wait
+ cond.acquire()
+ cond.notify(5)
+ _wait()
+ phase_num = 2
+ cond.release()
+ while len(results1) + len(results2) < 8:
+ _wait()
+ self.assertEqual(results1, [1] * 3 + [2] * 2)
+ self.assertEqual(results2, [2] * 3)
+ # Notify all threads: they are all in their second wait
+ cond.acquire()
+ cond.notify_all()
+ _wait()
+ phase_num = 3
+ cond.release()
+ while len(results2) < 5:
+ _wait()
+ self.assertEqual(results1, [1] * 3 + [2] * 2)
+ self.assertEqual(results2, [2] * 3 + [3] * 2)
+ b.wait_for_finished()
+
+ def test_notify(self):
+ cond = self.condtype()
+ self._check_notify(cond)
+ # A second time, to check internal state is still ok.
+ self._check_notify(cond)
+
+ def test_timeout(self):
+ cond = self.condtype()
+ results = []
+ N = 5
+ def f():
+ cond.acquire()
+ t1 = time.time()
+ cond.wait(0.2)
+ t2 = time.time()
+ cond.release()
+ results.append(t2 - t1)
+ Bunch(f, N).wait_for_finished()
+ self.assertEqual(len(results), 5)
+ for dt in results:
+ self.assertTrue(dt >= 0.2, dt)
+
+
+class BaseSemaphoreTests(BaseTestCase):
+ """
+ Common tests for {bounded, unbounded} semaphore objects.
+ """
+
+ def test_constructor(self):
+ self.assertRaises(ValueError, self.semtype, value = -1)
+ self.assertRaises(ValueError, self.semtype, value = -sys.maxsize)
+
+ def test_acquire(self):
+ sem = self.semtype(1)
+ sem.acquire()
+ sem.release()
+ sem = self.semtype(2)
+ sem.acquire()
+ sem.acquire()
+ sem.release()
+ sem.release()
+
+ def test_acquire_destroy(self):
+ sem = self.semtype()
+ sem.acquire()
+ del sem
+
+ def test_acquire_contended(self):
+ sem = self.semtype(7)
+ sem.acquire()
+ N = 10
+ results1 = []
+ results2 = []
+ phase_num = 0
+ def f():
+ sem.acquire()
+ results1.append(phase_num)
+ sem.acquire()
+ results2.append(phase_num)
+ b = Bunch(f, 10)
+ b.wait_for_started()
+ while len(results1) + len(results2) < 6:
+ _wait()
+ self.assertEqual(results1 + results2, [0] * 6)
+ phase_num = 1
+ for i in range(7):
+ sem.release()
+ while len(results1) + len(results2) < 13:
+ _wait()
+ self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7)
+ phase_num = 2
+ for i in range(6):
+ sem.release()
+ while len(results1) + len(results2) < 19:
+ _wait()
+ self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7 + [2] * 6)
+ # The semaphore is still locked
+ self.assertFalse(sem.acquire(False))
+ # Final release, to let the last thread finish
+ sem.release()
+ b.wait_for_finished()
+
+ def test_try_acquire(self):
+ sem = self.semtype(2)
+ self.assertTrue(sem.acquire(False))
+ self.assertTrue(sem.acquire(False))
+ self.assertFalse(sem.acquire(False))
+ sem.release()
+ self.assertTrue(sem.acquire(False))
+
+ def test_try_acquire_contended(self):
+ sem = self.semtype(4)
+ sem.acquire()
+ results = []
+ def f():
+ results.append(sem.acquire(False))
+ results.append(sem.acquire(False))
+ Bunch(f, 5).wait_for_finished()
+ # There can be a thread switch between acquiring the semaphore and
+ # appending the result, therefore results will not necessarily be
+ # ordered.
+ self.assertEqual(sorted(results), [False] * 7 + [True] * 3 )
+
+ def test_default_value(self):
+ # The default initial value is 1.
+ sem = self.semtype()
+ sem.acquire()
+ def f():
+ sem.acquire()
+ sem.release()
+ b = Bunch(f, 1)
+ b.wait_for_started()
+ _wait()
+ self.assertFalse(b.finished)
+ sem.release()
+ b.wait_for_finished()
+
+ def test_with(self):
+ sem = self.semtype(2)
+ def _with(err=None):
+ with sem:
+ self.assertTrue(sem.acquire(False))
+ sem.release()
+ with sem:
+ self.assertFalse(sem.acquire(False))
+ if err:
+ raise err
+ _with()
+ self.assertTrue(sem.acquire(False))
+ sem.release()
+ self.assertRaises(TypeError, _with, TypeError)
+ self.assertTrue(sem.acquire(False))
+ sem.release()
+
+class SemaphoreTests(BaseSemaphoreTests):
+ """
+ Tests for unbounded semaphores.
+ """
+
+ def test_release_unacquired(self):
+ # Unbounded releases are allowed and increment the semaphore's value
+ sem = self.semtype(1)
+ sem.release()
+ sem.acquire()
+ sem.acquire()
+ sem.release()
+
+
+class BoundedSemaphoreTests(BaseSemaphoreTests):
+ """
+ Tests for bounded semaphores.
+ """
+
+ def test_release_unacquired(self):
+ # Cannot go past the initial value
+ sem = self.semtype()
+ self.assertRaises(ValueError, sem.release)
+ sem.acquire()
+ sem.release()
+ self.assertRaises(ValueError, sem.release)
diff --git a/Lib/test/test_thread.py b/Lib/test/test_thread.py
index 73d87b8..f6ce1ae 100644
--- a/Lib/test/test_thread.py
+++ b/Lib/test/test_thread.py
@@ -5,6 +5,7 @@ from test import support
import _thread as thread
import time
+from test import lock_tests
NUMTASKS = 10
NUMTRIPS = 3
@@ -161,8 +162,12 @@ class BarrierTest(BasicThreadTest):
if finished:
self.done_mutex.release()
+class LockTests(lock_tests.LockTests):
+ locktype = thread.allocate_lock
+
+
def test_main():
- support.run_unittest(ThreadRunningTests, BarrierTest)
+ support.run_unittest(ThreadRunningTests, BarrierTest, LockTests)
if __name__ == "__main__":
test_main()
diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py
index 7b6d82b..86f5773 100644
--- a/Lib/test/test_threading.py
+++ b/Lib/test/test_threading.py
@@ -11,6 +11,8 @@ import time
import unittest
import weakref
+from test import lock_tests
+
# A trivial mutable counter.
class Counter(object):
def __init__(self):
@@ -133,11 +135,9 @@ class ThreadTests(unittest.TestCase):
def test_foreign_thread(self):
# Check that a "foreign" thread can use the threading module.
def f(mutex):
- # Acquiring an RLock forces an entry for the foreign
+ # Calling current_thread() forces an entry for the foreign
# thread to get made in the threading._active map.
- r = threading.RLock()
- r.acquire()
- r.release()
+ threading.current_thread()
mutex.release()
mutex = threading.Lock()
@@ -471,22 +471,6 @@ class ThreadingExceptionTests(unittest.TestCase):
thread.start()
self.assertRaises(RuntimeError, thread.start)
- def test_releasing_unacquired_rlock(self):
- rlock = threading.RLock()
- self.assertRaises(RuntimeError, rlock.release)
-
- def test_waiting_on_unacquired_condition(self):
- cond = threading.Condition()
- self.assertRaises(RuntimeError, cond.wait)
-
- def test_notify_on_unacquired_condition(self):
- cond = threading.Condition()
- self.assertRaises(RuntimeError, cond.notify)
-
- def test_semaphore_with_negative_value(self):
- self.assertRaises(ValueError, threading.Semaphore, value = -1)
- self.assertRaises(ValueError, threading.Semaphore, value = -sys.maxsize)
-
def test_joining_current_thread(self):
current_thread = threading.current_thread()
self.assertRaises(RuntimeError, current_thread.join);
@@ -501,11 +485,37 @@ class ThreadingExceptionTests(unittest.TestCase):
self.assertRaises(RuntimeError, setattr, thread, "daemon", True)
+class LockTests(lock_tests.LockTests):
+ locktype = staticmethod(threading.Lock)
+
+class RLockTests(lock_tests.RLockTests):
+ locktype = staticmethod(threading.RLock)
+
+class EventTests(lock_tests.EventTests):
+ eventtype = staticmethod(threading.Event)
+
+class ConditionAsRLockTests(lock_tests.RLockTests):
+ # An Condition uses an RLock by default and exports its API.
+ locktype = staticmethod(threading.Condition)
+
+class ConditionTests(lock_tests.ConditionTests):
+ condtype = staticmethod(threading.Condition)
+
+class SemaphoreTests(lock_tests.SemaphoreTests):
+ semtype = staticmethod(threading.Semaphore)
+
+class BoundedSemaphoreTests(lock_tests.BoundedSemaphoreTests):
+ semtype = staticmethod(threading.BoundedSemaphore)
+
+
def test_main():
- test.support.run_unittest(ThreadTests,
- ThreadJoinOnShutdown,
- ThreadingExceptionTests,
- )
+ test.support.run_unittest(LockTests, RLockTests, EventTests,
+ ConditionAsRLockTests, ConditionTests,
+ SemaphoreTests, BoundedSemaphoreTests,
+ ThreadTests,
+ ThreadJoinOnShutdown,
+ ThreadingExceptionTests,
+ )
if __name__ == "__main__":
test_main()
diff --git a/Lib/threading.py b/Lib/threading.py
index d5412e9..4bb0182 100644
--- a/Lib/threading.py
+++ b/Lib/threading.py
@@ -92,14 +92,16 @@ class _RLock(_Verbose):
def __repr__(self):
owner = self._owner
- return "<%s(%s, %d)>" % (
- self.__class__.__name__,
- owner and owner.name,
- self._count)
+ try:
+ owner = _active[owner].name
+ except KeyError:
+ pass
+ return "<%s owner=%r count=%d>" % (
+ self.__class__.__name__, owner, self._count)
def acquire(self, blocking=True):
- me = current_thread()
- if self._owner is me:
+ me = _get_ident()
+ if self._owner == me:
self._count = self._count + 1
if __debug__:
self._note("%s.acquire(%s): recursive success", self, blocking)
@@ -118,7 +120,7 @@ class _RLock(_Verbose):
__enter__ = acquire
def release(self):
- if self._owner is not current_thread():
+ if self._owner != _get_ident():
raise RuntimeError("cannot release un-acquired lock")
self._count = count = self._count - 1
if not count:
@@ -152,7 +154,7 @@ class _RLock(_Verbose):
return (count, owner)
def _is_owned(self):
- return self._owner is current_thread()
+ return self._owner == _get_ident()
def Condition(*args, **kwargs):