summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Doc/library/threading.rst36
-rw-r--r--Lib/test/lock_tests.py40
-rw-r--r--Lib/threading.py39
3 files changed, 108 insertions, 7 deletions
diff --git a/Doc/library/threading.rst b/Doc/library/threading.rst
index d3d7d9e..11aa4c4 100644
--- a/Doc/library/threading.rst
+++ b/Doc/library/threading.rst
@@ -539,6 +539,13 @@ state change can be interesting for only one or several waiting threads. E.g.
in a typical producer-consumer situation, adding one item to the buffer only
needs to wake up one consumer thread.
+Note: Condition variables can be, depending on the implementation, subject
+to both spurious wakeups (when :meth:`wait` returns without a :meth:`notify`
+call) and stolen wakeups (when another thread acquires the lock before the
+awoken thread.) For this reason, it is always necessary to verify the state
+the thread is waiting for when :meth:`wait` returns and optionally repeat
+the call as often as necessary.
+
.. class:: Condition(lock=None)
@@ -585,6 +592,35 @@ needs to wake up one consumer thread.
.. versionchanged:: 3.2
Previously, the method always returned ``None``.
+ .. method:: wait_for(predicate, timeout=None)
+
+ Wait until a condition evaluates to True. *predicate* should be a
+ callable which result will be interpreted as a boolean value.
+ A *timeout* may be provided giving the maximum time to wait.
+
+ This utility method may call :meth:`wait` repeatedly until the predicate
+ is satisfied, or until a timeout occurs. The return value is
+ the last return value of the predicate and will evaluate to
+ ``False`` if the method timed out.
+
+ Ignoring the timeout feature, calling this method is roughly equivalent to
+ writing::
+
+ while not predicate():
+ cv.wait()
+
+ Therefore, the same rules apply as with :meth:`wait`: The lock must be
+ held when called and is re-aquired on return. The predicate is evaluated
+ with the lock held.
+
+ Using this method, the consumer example above can be written thus::
+
+ with cv:
+ cv.wait_for(an_item_is_available)
+ get_an_available_item()
+
+ .. versionadded:: 3.2
+
.. method:: notify()
Wake up a thread waiting on this condition, if any. If the calling thread
diff --git a/Lib/test/lock_tests.py b/Lib/test/lock_tests.py
index d956bb6..b6d818e 100644
--- a/Lib/test/lock_tests.py
+++ b/Lib/test/lock_tests.py
@@ -446,6 +446,46 @@ class ConditionTests(BaseTestCase):
# In practice, this implementation has no spurious wakeups.
self.assertFalse(result)
+ def test_waitfor(self):
+ cond = self.condtype()
+ state = 0
+ def f():
+ with cond:
+ result = cond.wait_for(lambda : state==4)
+ self.assertTrue(result)
+ self.assertEqual(state, 4)
+ b = Bunch(f, 1)
+ b.wait_for_started()
+ for i in range(5):
+ time.sleep(0.01)
+ with cond:
+ state += 1
+ cond.notify()
+ b.wait_for_finished()
+
+ def test_waitfor_timeout(self):
+ cond = self.condtype()
+ state = 0
+ success = []
+ def f():
+ with cond:
+ dt = time.time()
+ result = cond.wait_for(lambda : state==4, timeout=0.1)
+ dt = time.time() - dt
+ self.assertFalse(result)
+ self.assertTimeout(dt, 0.1)
+ success.append(None)
+ b = Bunch(f, 1)
+ b.wait_for_started()
+ # Only increment 3 times, so state == 4 is never reached.
+ for i in range(3):
+ time.sleep(0.01)
+ with cond:
+ state += 1
+ cond.notify()
+ b.wait_for_finished()
+ self.assertEqual(len(success), 1)
+
class BaseSemaphoreTests(BaseTestCase):
"""
diff --git a/Lib/threading.py b/Lib/threading.py
index 41956ed..b6c1e5d 100644
--- a/Lib/threading.py
+++ b/Lib/threading.py
@@ -254,6 +254,32 @@ class _Condition(_Verbose):
finally:
self._acquire_restore(saved_state)
+ def wait_for(self, predicate, timeout=None):
+ endtime = None
+ waittime = timeout
+ result = predicate()
+ while not result:
+ if waittime is not None:
+ if endtime is None:
+ endtime = _time() + waittime
+ else:
+ waittime = endtime - _time()
+ if waittime <= 0:
+ if __debug__:
+ self._note("%s.wait_for(%r, %r): Timed out.",
+ self, predicate, timeout)
+ break
+ if __debug__:
+ self._note("%s.wait_for(%r, %r): Waiting with timeout=%s.",
+ self, predicate, timeout, waittime)
+ self.wait(waittime)
+ result = predicate()
+ else:
+ if __debug__:
+ self._note("%s.wait_for(%r, %r): Success.",
+ self, predicate, timeout)
+ return result
+
def notify(self, n=1):
if not self._is_owned():
raise RuntimeError("cannot notify on un-acquired lock")
@@ -482,13 +508,12 @@ class Barrier(_Verbose):
# Wait in the barrier until we are relased. Raise an exception
# if the barrier is reset or broken.
def _wait(self, timeout):
- while self._state == 0:
- if self._cond.wait(timeout) is False:
- #timed out. Break the barrier
- self._break()
- raise BrokenBarrierError
- if self._state < 0:
- raise BrokenBarrierError
+ if not self._cond.wait_for(lambda : self._state != 0, timeout):
+ #timed out. Break the barrier
+ self._break()
+ raise BrokenBarrierError
+ if self._state < 0:
+ raise BrokenBarrierError
assert self._state == 1
# If we are the last thread to exit the barrier, signal any threads