summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVictor Stinner <vstinner@python.org>2023-10-04 11:26:45 (GMT)
committerGitHub <noreply@github.com>2023-10-04 11:26:45 (GMT)
commit1d032ea3d67e9725b63322f896d9aa727fd75521 (patch)
treebd4f3f8b7b43d337a60d14a79d5464f7aa1ad1d1
parentf53871e1e8bad0d7a8e58dd2cd4588331480d881 (diff)
downloadcpython-1d032ea3d67e9725b63322f896d9aa727fd75521.zip
cpython-1d032ea3d67e9725b63322f896d9aa727fd75521.tar.gz
cpython-1d032ea3d67e9725b63322f896d9aa727fd75521.tar.bz2
[3.12] gh-109974: Fix threading lock_tests race conditions (#110057) (#110346)
* gh-109974: Fix threading lock_tests race conditions (#110057) Fix race conditions in test_threading lock tests. Wait until a condition is met rather than using time.sleep() with a hardcoded number of seconds. * Replace sleeping loops with support.sleeping_retry() which raises an exception on timeout. * Add wait_threads_blocked(nthread) which computes a sleep depending on the number of threads. Remove _wait() function. * test_set_and_clear(): use a way longer Event.wait() timeout. * BarrierTests.test_repr(): wait until the 2 threads are waiting for the barrier. Use a way longer timeout for Barrier.wait() timeout. * test_thread_leak() no longer needs to count len(threading.enumerate()): Bunch uses threading_helper.wait_threads_exit() internally which does it in wait_for_finished(). * Add BaseLockTests.wait_phase() which implements a timeout. test_reacquire() and test_recursion_count() use wait_phase(). (cherry picked from commit 4e356ad183eeb567783f4a87fd092573da1e9252) * gh-109974: Fix more threading lock_tests race conditions (#110089) * Add context manager on Bunch class. * Bunch now catchs exceptions on executed functions and re-raise them at __exit__() as an ExceptionGroup. * Rewrite BarrierProxy.test_default_timeout(). Use a single thread. Only check that barrier.wait() blocks for at least default timeout seconds. * test_with(): inline _with() function. (cherry picked from commit 743e3572ee940a6cf88fd518e5f4a447905ba5eb)
-rw-r--r--Lib/test/lock_tests.py621
-rw-r--r--Lib/test/test_importlib/test_locks.py3
-rw-r--r--Misc/NEWS.d/next/Tests/2023-09-29-00-19-21.gh-issue-109974.Sh_g-r.rst3
3 files changed, 378 insertions, 249 deletions
diff --git a/Lib/test/lock_tests.py b/Lib/test/lock_tests.py
index 238e607..024c6de 100644
--- a/Lib/test/lock_tests.py
+++ b/Lib/test/lock_tests.py
@@ -19,54 +19,74 @@ requires_fork = unittest.skipUnless(support.has_fork_support,
"(no _at_fork_reinit method)")
-def _wait():
- # A crude wait/yield function not relying on synchronization primitives.
- time.sleep(0.01)
+def wait_threads_blocked(nthread):
+ # Arbitrary sleep to wait until N threads are blocked,
+ # like waiting for a lock.
+ time.sleep(0.010 * nthread)
+
class Bunch(object):
"""
A bunch of threads.
"""
- def __init__(self, f, n, wait_before_exit=False):
+ def __init__(self, func, nthread, wait_before_exit=False):
"""
- Construct a bunch of `n` threads running the same function `f`.
+ Construct a bunch of `nthread` threads running the same function `func`.
If `wait_before_exit` is True, the threads won't terminate until
do_finish() is called.
"""
- self.f = f
- self.n = n
+ self.func = func
+ self.nthread = nthread
self.started = []
self.finished = []
+ self.exceptions = []
self._can_exit = not wait_before_exit
- self.wait_thread = threading_helper.wait_threads_exit()
- self.wait_thread.__enter__()
+ self._wait_thread = None
- def task():
- tid = threading.get_ident()
- self.started.append(tid)
- try:
- f()
- finally:
- self.finished.append(tid)
- while not self._can_exit:
- _wait()
+ def task(self):
+ tid = threading.get_ident()
+ self.started.append(tid)
+ try:
+ self.func()
+ except BaseException as exc:
+ self.exceptions.append(exc)
+ finally:
+ self.finished.append(tid)
+ for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
+ if self._can_exit:
+ break
+
+ def __enter__(self):
+ self._wait_thread = threading_helper.wait_threads_exit(support.SHORT_TIMEOUT)
+ self._wait_thread.__enter__()
try:
- for i in range(n):
- start_new_thread(task, ())
+ for _ in range(self.nthread):
+ start_new_thread(self.task, ())
except:
self._can_exit = True
raise
- def wait_for_started(self):
- while len(self.started) < self.n:
- _wait()
+ for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
+ if len(self.started) >= self.nthread:
+ break
+
+ return self
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
+ if len(self.finished) >= self.nthread:
+ break
- def wait_for_finished(self):
- while len(self.finished) < self.n:
- _wait()
- # Wait for threads exit
- self.wait_thread.__exit__(None, None, None)
+ # Wait until threads completely exit according to _thread._count()
+ self._wait_thread.__exit__(None, None, None)
+
+ # Break reference cycle
+ exceptions = self.exceptions
+ self.exceptions = None
+ if exceptions:
+ raise ExceptionGroup(f"{self.func} threads raised exceptions",
+ exceptions)
def do_finish(self):
self._can_exit = True
@@ -94,6 +114,12 @@ class BaseLockTests(BaseTestCase):
Tests for both recursive and non-recursive locks.
"""
+ def wait_phase(self, phase, expected):
+ for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
+ if len(phase) >= expected:
+ break
+ self.assertEqual(len(phase), expected)
+
def test_constructor(self):
lock = self.locktype()
del lock
@@ -131,41 +157,57 @@ class BaseLockTests(BaseTestCase):
result = []
def f():
result.append(lock.acquire(False))
- Bunch(f, 1).wait_for_finished()
+ with Bunch(f, 1):
+ pass
self.assertFalse(result[0])
lock.release()
def test_acquire_contended(self):
lock = self.locktype()
lock.acquire()
- N = 5
def f():
lock.acquire()
lock.release()
- b = Bunch(f, N)
- b.wait_for_started()
- _wait()
- self.assertEqual(len(b.finished), 0)
- lock.release()
- b.wait_for_finished()
- self.assertEqual(len(b.finished), N)
+ N = 5
+ with Bunch(f, N) as bunch:
+ # Threads block on lock.acquire()
+ wait_threads_blocked(N)
+ self.assertEqual(len(bunch.finished), 0)
+
+ # Threads unblocked
+ lock.release()
+
+ self.assertEqual(len(bunch.finished), N)
def test_with(self):
lock = self.locktype()
def f():
lock.acquire()
lock.release()
- def _with(err=None):
+
+ def with_lock(err=None):
with lock:
if err is not None:
raise err
- _with()
- # Check the lock is unacquired
- Bunch(f, 1).wait_for_finished()
- self.assertRaises(TypeError, _with, TypeError)
- # Check the lock is unacquired
- Bunch(f, 1).wait_for_finished()
+
+ # Acquire the lock, do nothing, with releases the lock
+ with lock:
+ pass
+
+ # Check that the lock is unacquired
+ with Bunch(f, 1):
+ pass
+
+ # Acquire the lock, raise an exception, with releases the lock
+ with self.assertRaises(TypeError):
+ with lock:
+ raise TypeError
+
+ # Check that the lock is unacquired even if after an exception
+ # was raised in the previous "with lock:" block
+ with Bunch(f, 1):
+ pass
def test_thread_leak(self):
# The lock shouldn't leak a Thread instance when used from a foreign
@@ -174,17 +216,11 @@ class BaseLockTests(BaseTestCase):
def f():
lock.acquire()
lock.release()
- n = len(threading.enumerate())
+
# We run many threads in the hope that existing threads ids won't
# be recycled.
- Bunch(f, 15).wait_for_finished()
- if len(threading.enumerate()) != n:
- # There is a small window during which a Thread instance's
- # target function has finished running, but the Thread is still
- # alive and registered. Avoid spurious failures by waiting a
- # bit more (seen on a buildbot).
- time.sleep(0.4)
- self.assertEqual(n, len(threading.enumerate()))
+ with Bunch(f, 15):
+ pass
def test_timeout(self):
lock = self.locktype()
@@ -208,7 +244,8 @@ class BaseLockTests(BaseTestCase):
results.append(lock.acquire(timeout=0.5))
t2 = time.monotonic()
results.append(t2 - t1)
- Bunch(f, 1).wait_for_finished()
+ with Bunch(f, 1):
+ pass
self.assertFalse(results[0])
self.assertTimeout(results[1], 0.5)
@@ -242,15 +279,13 @@ class LockTests(BaseLockTests):
phase.append(None)
with threading_helper.wait_threads_exit():
+ # Thread blocked on lock.acquire()
start_new_thread(f, ())
- while len(phase) == 0:
- _wait()
- _wait()
- self.assertEqual(len(phase), 1)
+ self.wait_phase(phase, 1)
+
+ # Thread unblocked
lock.release()
- while len(phase) == 1:
- _wait()
- self.assertEqual(len(phase), 2)
+ self.wait_phase(phase, 2)
def test_different_thread(self):
# Lock can be released from a different thread.
@@ -258,8 +293,8 @@ class LockTests(BaseLockTests):
lock.acquire()
def f():
lock.release()
- b = Bunch(f, 1)
- b.wait_for_finished()
+ with Bunch(f, 1):
+ pass
lock.acquire()
lock.release()
@@ -349,21 +384,20 @@ class RLockTests(BaseLockTests):
def f():
lock.acquire()
phase.append(None)
- while len(phase) == 1:
- _wait()
+
+ self.wait_phase(phase, 2)
lock.release()
phase.append(None)
with threading_helper.wait_threads_exit():
+ # Thread blocked on lock.acquire()
start_new_thread(f, ())
- while len(phase) == 0:
- _wait()
- self.assertEqual(len(phase), 1)
+ self.wait_phase(phase, 1)
self.assertEqual(0, lock._recursion_count())
+
+ # Thread unblocked
phase.append(None)
- while len(phase) == 2:
- _wait()
- self.assertEqual(len(phase), 3)
+ self.wait_phase(phase, 3)
self.assertEqual(0, lock._recursion_count())
def test_different_thread(self):
@@ -371,12 +405,12 @@ class RLockTests(BaseLockTests):
lock = self.locktype()
def f():
lock.acquire()
- b = Bunch(f, 1, True)
- try:
- self.assertRaises(RuntimeError, lock.release)
- finally:
- b.do_finish()
- b.wait_for_finished()
+
+ with Bunch(f, 1, True) as bunch:
+ try:
+ self.assertRaises(RuntimeError, lock.release)
+ finally:
+ bunch.do_finish()
def test__is_owned(self):
lock = self.locktype()
@@ -388,7 +422,8 @@ class RLockTests(BaseLockTests):
result = []
def f():
result.append(lock._is_owned())
- Bunch(f, 1).wait_for_finished()
+ with Bunch(f, 1):
+ pass
self.assertFalse(result[0])
lock.release()
self.assertTrue(lock._is_owned())
@@ -421,12 +456,15 @@ class EventTests(BaseTestCase):
def f():
results1.append(evt.wait())
results2.append(evt.wait())
- b = Bunch(f, N)
- b.wait_for_started()
- _wait()
- self.assertEqual(len(results1), 0)
- evt.set()
- b.wait_for_finished()
+
+ with Bunch(f, N):
+ # Threads blocked on first evt.wait()
+ wait_threads_blocked(N)
+ self.assertEqual(len(results1), 0)
+
+ # Threads unblocked
+ evt.set()
+
self.assertEqual(results1, [True] * N)
self.assertEqual(results2, [True] * N)
@@ -449,35 +487,43 @@ class EventTests(BaseTestCase):
r = evt.wait(0.5)
t2 = time.monotonic()
results2.append((r, t2 - t1))
- Bunch(f, N).wait_for_finished()
+
+ with Bunch(f, N):
+ pass
+
self.assertEqual(results1, [False] * N)
for r, dt in results2:
self.assertFalse(r)
self.assertTimeout(dt, 0.5)
+
# The event is set
results1 = []
results2 = []
evt.set()
- Bunch(f, N).wait_for_finished()
+ with Bunch(f, N):
+ pass
+
self.assertEqual(results1, [True] * N)
for r, dt in results2:
self.assertTrue(r)
def test_set_and_clear(self):
- # Issue #13502: check that wait() returns true even when the event is
+ # gh-57711: check that wait() returns true even when the event is
# cleared before the waiting thread is woken up.
- evt = self.eventtype()
+ event = self.eventtype()
results = []
- timeout = 0.250
- N = 5
def f():
- results.append(evt.wait(timeout * 4))
- b = Bunch(f, N)
- b.wait_for_started()
- time.sleep(timeout)
- evt.set()
- evt.clear()
- b.wait_for_finished()
+ results.append(event.wait(support.LONG_TIMEOUT))
+
+ N = 5
+ with Bunch(f, N):
+ # Threads blocked on event.wait()
+ wait_threads_blocked(N)
+
+ # Threads unblocked
+ event.set()
+ event.clear()
+
self.assertEqual(results, [True] * N)
@requires_fork
@@ -533,15 +579,14 @@ class ConditionTests(BaseTestCase):
# Note that this test is sensitive to timing. If the worker threads
# don't execute in a timely fashion, the main thread may think they
# are further along then they are. The main thread therefore issues
- # _wait() statements to try to make sure that it doesn't race ahead
- # of the workers.
+ # wait_threads_blocked() statements to try to make sure that it doesn't
+ # race ahead of the workers.
# Secondly, this test assumes that condition variables are not subject
# to spurious wakeups. The absence of spurious wakeups is an implementation
# detail of Condition Variables in current CPython, but in general, not
# a guaranteed property of condition variables as a programming
# construct. In particular, it is possible that this can no longer
# be conveniently guaranteed should their implementation ever change.
- N = 5
ready = []
results1 = []
results2 = []
@@ -550,58 +595,83 @@ class ConditionTests(BaseTestCase):
cond.acquire()
ready.append(phase_num)
result = cond.wait()
+
cond.release()
results1.append((result, phase_num))
+
cond.acquire()
ready.append(phase_num)
+
result = cond.wait()
cond.release()
results2.append((result, phase_num))
- b = Bunch(f, N)
- b.wait_for_started()
- # first wait, to ensure all workers settle into cond.wait() before
- # we continue. See issues #8799 and #30727.
- while len(ready) < 5:
- _wait()
- ready.clear()
- self.assertEqual(results1, [])
- # Notify 3 threads at first
- cond.acquire()
- cond.notify(3)
- _wait()
- phase_num = 1
- cond.release()
- while len(results1) < 3:
- _wait()
- self.assertEqual(results1, [(True, 1)] * 3)
- self.assertEqual(results2, [])
- # make sure all awaken workers settle into cond.wait()
- while len(ready) < 3:
- _wait()
- # Notify 5 threads: they might be in their first or second wait
- cond.acquire()
- cond.notify(5)
- _wait()
- phase_num = 2
- cond.release()
- while len(results1) + len(results2) < 8:
- _wait()
- self.assertEqual(results1, [(True, 1)] * 3 + [(True, 2)] * 2)
- self.assertEqual(results2, [(True, 2)] * 3)
- # make sure all workers settle into cond.wait()
- while len(ready) < 5:
- _wait()
- # Notify all threads: they are all in their second wait
- cond.acquire()
- cond.notify_all()
- _wait()
- phase_num = 3
- cond.release()
- while len(results2) < 5:
- _wait()
- self.assertEqual(results1, [(True, 1)] * 3 + [(True,2)] * 2)
- self.assertEqual(results2, [(True, 2)] * 3 + [(True, 3)] * 2)
- b.wait_for_finished()
+
+ N = 5
+ with Bunch(f, N):
+ # first wait, to ensure all workers settle into cond.wait() before
+ # we continue. See issues #8799 and #30727.
+ for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
+ if len(ready) >= N:
+ break
+
+ ready.clear()
+ self.assertEqual(results1, [])
+
+ # Notify 3 threads at first
+ count1 = 3
+ cond.acquire()
+ cond.notify(count1)
+ wait_threads_blocked(count1)
+
+ # Phase 1
+ phase_num = 1
+ cond.release()
+ for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
+ if len(results1) >= count1:
+ break
+
+ self.assertEqual(results1, [(True, 1)] * count1)
+ self.assertEqual(results2, [])
+
+ # Wait until awaken workers are blocked on cond.wait()
+ for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
+ if len(ready) >= count1 :
+ break
+
+ # Notify 5 threads: they might be in their first or second wait
+ cond.acquire()
+ cond.notify(5)
+ wait_threads_blocked(N)
+
+ # Phase 2
+ phase_num = 2
+ cond.release()
+ for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
+ if len(results1) + len(results2) >= (N + count1):
+ break
+
+ count2 = N - count1
+ self.assertEqual(results1, [(True, 1)] * count1 + [(True, 2)] * count2)
+ self.assertEqual(results2, [(True, 2)] * count1)
+
+ # Make sure all workers settle into cond.wait()
+ for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
+ if len(ready) >= N:
+ break
+
+ # Notify all threads: they are all in their second wait
+ cond.acquire()
+ cond.notify_all()
+ wait_threads_blocked(N)
+
+ # Phase 3
+ phase_num = 3
+ cond.release()
+ for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
+ if len(results2) >= N:
+ break
+ self.assertEqual(results1, [(True, 1)] * count1 + [(True, 2)] * count2)
+ self.assertEqual(results2, [(True, 2)] * count1 + [(True, 3)] * count2)
def test_notify(self):
cond = self.condtype()
@@ -611,19 +681,23 @@ class ConditionTests(BaseTestCase):
def test_timeout(self):
cond = self.condtype()
+ timeout = 0.5
results = []
- N = 5
def f():
cond.acquire()
t1 = time.monotonic()
- result = cond.wait(0.5)
+ result = cond.wait(timeout)
t2 = time.monotonic()
cond.release()
results.append((t2 - t1, result))
- Bunch(f, N).wait_for_finished()
+
+ N = 5
+ with Bunch(f, N):
+ pass
self.assertEqual(len(results), N)
+
for dt, result in results:
- self.assertTimeout(dt, 0.5)
+ self.assertTimeout(dt, timeout)
# Note that conceptually (that"s the condition variable protocol)
# a wait() may succeed even if no one notifies us and before any
# timeout occurs. Spurious wakeups can occur.
@@ -636,17 +710,16 @@ class ConditionTests(BaseTestCase):
state = 0
def f():
with cond:
- result = cond.wait_for(lambda : state==4)
+ result = cond.wait_for(lambda: state == 4)
self.assertTrue(result)
self.assertEqual(state, 4)
- b = Bunch(f, 1)
- b.wait_for_started()
- for i in range(4):
- time.sleep(0.01)
- with cond:
- state += 1
- cond.notify()
- b.wait_for_finished()
+
+ with Bunch(f, 1):
+ for i in range(4):
+ time.sleep(0.010)
+ with cond:
+ state += 1
+ cond.notify()
def test_waitfor_timeout(self):
cond = self.condtype()
@@ -660,15 +733,15 @@ class ConditionTests(BaseTestCase):
self.assertFalse(result)
self.assertTimeout(dt, 0.1)
success.append(None)
- b = Bunch(f, 1)
- b.wait_for_started()
- # Only increment 3 times, so state == 4 is never reached.
- for i in range(3):
- time.sleep(0.01)
- with cond:
- state += 1
- cond.notify()
- b.wait_for_finished()
+
+ with Bunch(f, 1):
+ # Only increment 3 times, so state == 4 is never reached.
+ for i in range(3):
+ time.sleep(0.010)
+ with cond:
+ state += 1
+ cond.notify()
+
self.assertEqual(len(success), 1)
@@ -697,73 +770,107 @@ class BaseSemaphoreTests(BaseTestCase):
del sem
def test_acquire_contended(self):
- sem = self.semtype(7)
+ sem_value = 7
+ sem = self.semtype(sem_value)
sem.acquire()
- N = 10
+
sem_results = []
results1 = []
results2 = []
phase_num = 0
- def f():
+
+ def func():
sem_results.append(sem.acquire())
results1.append(phase_num)
+
sem_results.append(sem.acquire())
results2.append(phase_num)
- b = Bunch(f, 10)
- b.wait_for_started()
- while len(results1) + len(results2) < 6:
- _wait()
- self.assertEqual(results1 + results2, [0] * 6)
- phase_num = 1
- for i in range(7):
- sem.release()
- while len(results1) + len(results2) < 13:
- _wait()
- self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7)
- phase_num = 2
- for i in range(6):
+
+ def wait_count(count):
+ for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
+ if len(results1) + len(results2) >= count:
+ break
+
+ N = 10
+ with Bunch(func, N):
+ # Phase 0
+ count1 = sem_value - 1
+ wait_count(count1)
+ self.assertEqual(results1 + results2, [0] * count1)
+
+ # Phase 1
+ phase_num = 1
+ for i in range(sem_value):
+ sem.release()
+ count2 = sem_value
+ wait_count(count1 + count2)
+ self.assertEqual(sorted(results1 + results2),
+ [0] * count1 + [1] * count2)
+
+ # Phase 2
+ phase_num = 2
+ count3 = (sem_value - 1)
+ for i in range(count3):
+ sem.release()
+ wait_count(count1 + count2 + count3)
+ self.assertEqual(sorted(results1 + results2),
+ [0] * count1 + [1] * count2 + [2] * count3)
+ # The semaphore is still locked
+ self.assertFalse(sem.acquire(False))
+
+ # Final release, to let the last thread finish
+ count4 = 1
sem.release()
- while len(results1) + len(results2) < 19:
- _wait()
- self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7 + [2] * 6)
- # The semaphore is still locked
- self.assertFalse(sem.acquire(False))
- # Final release, to let the last thread finish
- sem.release()
- b.wait_for_finished()
- self.assertEqual(sem_results, [True] * (6 + 7 + 6 + 1))
+
+ self.assertEqual(sem_results,
+ [True] * (count1 + count2 + count3 + count4))
def test_multirelease(self):
- sem = self.semtype(7)
+ sem_value = 7
+ sem = self.semtype(sem_value)
sem.acquire()
+
results1 = []
results2 = []
phase_num = 0
- def f():
+ def func():
sem.acquire()
results1.append(phase_num)
+
sem.acquire()
results2.append(phase_num)
- b = Bunch(f, 10)
- b.wait_for_started()
- while len(results1) + len(results2) < 6:
- _wait()
- self.assertEqual(results1 + results2, [0] * 6)
- phase_num = 1
- sem.release(7)
- while len(results1) + len(results2) < 13:
- _wait()
- self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7)
- phase_num = 2
- sem.release(6)
- while len(results1) + len(results2) < 19:
- _wait()
- self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7 + [2] * 6)
- # The semaphore is still locked
- self.assertFalse(sem.acquire(False))
- # Final release, to let the last thread finish
- sem.release()
- b.wait_for_finished()
+
+ def wait_count(count):
+ for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
+ if len(results1) + len(results2) >= count:
+ break
+
+ with Bunch(func, 10):
+ # Phase 0
+ count1 = sem_value - 1
+ wait_count(count1)
+ self.assertEqual(results1 + results2, [0] * count1)
+
+ # Phase 1
+ phase_num = 1
+ count2 = sem_value
+ sem.release(count2)
+ wait_count(count1 + count2)
+ self.assertEqual(sorted(results1 + results2),
+ [0] * count1 + [1] * count2)
+
+ # Phase 2
+ phase_num = 2
+ count3 = sem_value - 1
+ sem.release(count3)
+ wait_count(count1 + count2 + count3)
+ self.assertEqual(sorted(results1 + results2),
+ [0] * count1 + [1] * count2 + [2] * count3)
+ # The semaphore is still locked
+ self.assertFalse(sem.acquire(False))
+
+ # Final release, to let the last thread finish
+ sem.release()
def test_try_acquire(self):
sem = self.semtype(2)
@@ -780,7 +887,8 @@ class BaseSemaphoreTests(BaseTestCase):
def f():
results.append(sem.acquire(False))
results.append(sem.acquire(False))
- Bunch(f, 5).wait_for_finished()
+ with Bunch(f, 5):
+ pass
# There can be a thread switch between acquiring the semaphore and
# appending the result, therefore results will not necessarily be
# ordered.
@@ -806,12 +914,14 @@ class BaseSemaphoreTests(BaseTestCase):
def f():
sem.acquire()
sem.release()
- b = Bunch(f, 1)
- b.wait_for_started()
- _wait()
- self.assertFalse(b.finished)
- sem.release()
- b.wait_for_finished()
+
+ with Bunch(f, 1) as bunch:
+ # Thread blocked on sem.acquire()
+ wait_threads_blocked(1)
+ self.assertFalse(bunch.finished)
+
+ # Thread unblocked
+ sem.release()
def test_with(self):
sem = self.semtype(2)
@@ -882,13 +992,13 @@ class BarrierTests(BaseTestCase):
def setUp(self):
self.barrier = self.barriertype(self.N, timeout=self.defaultTimeout)
+
def tearDown(self):
self.barrier.abort()
def run_threads(self, f):
- b = Bunch(f, self.N-1)
- f()
- b.wait_for_finished()
+ with Bunch(f, self.N):
+ pass
def multipass(self, results, n):
m = self.barrier.parties
@@ -979,8 +1089,9 @@ class BarrierTests(BaseTestCase):
i = self.barrier.wait()
if i == self.N//2:
# Wait until the other threads are all in the barrier.
- while self.barrier.n_waiting < self.N-1:
- time.sleep(0.001)
+ for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
+ if self.barrier.n_waiting >= (self.N - 1):
+ break
self.barrier.reset()
else:
try:
@@ -1040,25 +1151,27 @@ class BarrierTests(BaseTestCase):
i = self.barrier.wait()
if i == self.N // 2:
# One thread is late!
- time.sleep(1.0)
+ time.sleep(self.defaultTimeout / 2)
# Default timeout is 2.0, so this is shorter.
self.assertRaises(threading.BrokenBarrierError,
- self.barrier.wait, 0.5)
+ self.barrier.wait, self.defaultTimeout / 4)
self.run_threads(f)
def test_default_timeout(self):
"""
Test the barrier's default timeout
"""
- # create a barrier with a low default timeout
- barrier = self.barriertype(self.N, timeout=0.3)
+ timeout = 0.100
+ barrier = self.barriertype(2, timeout=timeout)
def f():
- i = barrier.wait()
- if i == self.N // 2:
- # One thread is later than the default timeout of 0.3s.
- time.sleep(1.0)
- self.assertRaises(threading.BrokenBarrierError, barrier.wait)
- self.run_threads(f)
+ self.assertRaises(threading.BrokenBarrierError,
+ barrier.wait)
+
+ start_time = time.monotonic()
+ with Bunch(f, 1):
+ pass
+ dt = time.monotonic() - start_time
+ self.assertGreaterEqual(dt, timeout)
def test_single_thread(self):
b = self.barriertype(1)
@@ -1066,16 +1179,28 @@ class BarrierTests(BaseTestCase):
b.wait()
def test_repr(self):
- b = self.barriertype(3)
- self.assertRegex(repr(b), r"<\w+\.Barrier at .*: waiters=0/3>")
+ barrier = self.barriertype(3)
+ timeout = support.LONG_TIMEOUT
+ self.assertRegex(repr(barrier), r"<\w+\.Barrier at .*: waiters=0/3>")
def f():
- b.wait(3)
- bunch = Bunch(f, 2)
- bunch.wait_for_started()
- time.sleep(0.2)
- self.assertRegex(repr(b), r"<\w+\.Barrier at .*: waiters=2/3>")
- b.wait(3)
- bunch.wait_for_finished()
- self.assertRegex(repr(b), r"<\w+\.Barrier at .*: waiters=0/3>")
- b.abort()
- self.assertRegex(repr(b), r"<\w+\.Barrier at .*: broken>")
+ barrier.wait(timeout)
+
+ N = 2
+ with Bunch(f, N):
+ # Threads blocked on barrier.wait()
+ for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
+ if barrier.n_waiting >= N:
+ break
+ self.assertRegex(repr(barrier),
+ r"<\w+\.Barrier at .*: waiters=2/3>")
+
+ # Threads unblocked
+ barrier.wait(timeout)
+
+ self.assertRegex(repr(barrier),
+ r"<\w+\.Barrier at .*: waiters=0/3>")
+
+ # Abort the barrier
+ barrier.abort()
+ self.assertRegex(repr(barrier),
+ r"<\w+\.Barrier at .*: broken>")
diff --git a/Lib/test/test_importlib/test_locks.py b/Lib/test/test_importlib/test_locks.py
index 7091c36..befac5d 100644
--- a/Lib/test/test_importlib/test_locks.py
+++ b/Lib/test/test_importlib/test_locks.py
@@ -93,7 +93,8 @@ class DeadlockAvoidanceTests:
b.release()
if ra:
a.release()
- lock_tests.Bunch(f, NTHREADS).wait_for_finished()
+ with lock_tests.Bunch(f, NTHREADS):
+ pass
self.assertEqual(len(results), NTHREADS)
return results
diff --git a/Misc/NEWS.d/next/Tests/2023-09-29-00-19-21.gh-issue-109974.Sh_g-r.rst b/Misc/NEWS.d/next/Tests/2023-09-29-00-19-21.gh-issue-109974.Sh_g-r.rst
new file mode 100644
index 0000000..a130cf6
--- /dev/null
+++ b/Misc/NEWS.d/next/Tests/2023-09-29-00-19-21.gh-issue-109974.Sh_g-r.rst
@@ -0,0 +1,3 @@
+Fix race conditions in test_threading lock tests. Wait until a condition is met
+rather than using :func:`time.sleep` with a hardcoded number of seconds. Patch
+by Victor Stinner.