diff options
-rw-r--r-- | Doc/library/threading.rst | 36 | ||||
-rw-r--r-- | Lib/test/lock_tests.py | 40 | ||||
-rw-r--r-- | Lib/threading.py | 39 |
3 files changed, 108 insertions, 7 deletions
diff --git a/Doc/library/threading.rst b/Doc/library/threading.rst index d3d7d9e..11aa4c4 100644 --- a/Doc/library/threading.rst +++ b/Doc/library/threading.rst @@ -539,6 +539,13 @@ state change can be interesting for only one or several waiting threads. E.g. in a typical producer-consumer situation, adding one item to the buffer only needs to wake up one consumer thread. +Note: Condition variables can be, depending on the implementation, subject +to both spurious wakeups (when :meth:`wait` returns without a :meth:`notify` +call) and stolen wakeups (when another thread acquires the lock before the +awoken thread.) For this reason, it is always necessary to verify the state +the thread is waiting for when :meth:`wait` returns and optionally repeat +the call as often as necessary. + .. class:: Condition(lock=None) @@ -585,6 +592,35 @@ needs to wake up one consumer thread. .. versionchanged:: 3.2 Previously, the method always returned ``None``. + .. method:: wait_for(predicate, timeout=None) + + Wait until a condition evaluates to True. *predicate* should be a + callable which result will be interpreted as a boolean value. + A *timeout* may be provided giving the maximum time to wait. + + This utility method may call :meth:`wait` repeatedly until the predicate + is satisfied, or until a timeout occurs. The return value is + the last return value of the predicate and will evaluate to + ``False`` if the method timed out. + + Ignoring the timeout feature, calling this method is roughly equivalent to + writing:: + + while not predicate(): + cv.wait() + + Therefore, the same rules apply as with :meth:`wait`: The lock must be + held when called and is re-aquired on return. The predicate is evaluated + with the lock held. + + Using this method, the consumer example above can be written thus:: + + with cv: + cv.wait_for(an_item_is_available) + get_an_available_item() + + .. versionadded:: 3.2 + .. method:: notify() Wake up a thread waiting on this condition, if any. If the calling thread diff --git a/Lib/test/lock_tests.py b/Lib/test/lock_tests.py index d956bb6..b6d818e 100644 --- a/Lib/test/lock_tests.py +++ b/Lib/test/lock_tests.py @@ -446,6 +446,46 @@ class ConditionTests(BaseTestCase): # In practice, this implementation has no spurious wakeups. self.assertFalse(result) + def test_waitfor(self): + cond = self.condtype() + state = 0 + def f(): + with cond: + result = cond.wait_for(lambda : state==4) + self.assertTrue(result) + self.assertEqual(state, 4) + b = Bunch(f, 1) + b.wait_for_started() + for i in range(5): + time.sleep(0.01) + with cond: + state += 1 + cond.notify() + b.wait_for_finished() + + def test_waitfor_timeout(self): + cond = self.condtype() + state = 0 + success = [] + def f(): + with cond: + dt = time.time() + result = cond.wait_for(lambda : state==4, timeout=0.1) + dt = time.time() - dt + self.assertFalse(result) + self.assertTimeout(dt, 0.1) + success.append(None) + b = Bunch(f, 1) + b.wait_for_started() + # Only increment 3 times, so state == 4 is never reached. + for i in range(3): + time.sleep(0.01) + with cond: + state += 1 + cond.notify() + b.wait_for_finished() + self.assertEqual(len(success), 1) + class BaseSemaphoreTests(BaseTestCase): """ diff --git a/Lib/threading.py b/Lib/threading.py index 41956ed..b6c1e5d 100644 --- a/Lib/threading.py +++ b/Lib/threading.py @@ -254,6 +254,32 @@ class _Condition(_Verbose): finally: self._acquire_restore(saved_state) + def wait_for(self, predicate, timeout=None): + endtime = None + waittime = timeout + result = predicate() + while not result: + if waittime is not None: + if endtime is None: + endtime = _time() + waittime + else: + waittime = endtime - _time() + if waittime <= 0: + if __debug__: + self._note("%s.wait_for(%r, %r): Timed out.", + self, predicate, timeout) + break + if __debug__: + self._note("%s.wait_for(%r, %r): Waiting with timeout=%s.", + self, predicate, timeout, waittime) + self.wait(waittime) + result = predicate() + else: + if __debug__: + self._note("%s.wait_for(%r, %r): Success.", + self, predicate, timeout) + return result + def notify(self, n=1): if not self._is_owned(): raise RuntimeError("cannot notify on un-acquired lock") @@ -482,13 +508,12 @@ class Barrier(_Verbose): # Wait in the barrier until we are relased. Raise an exception # if the barrier is reset or broken. def _wait(self, timeout): - while self._state == 0: - if self._cond.wait(timeout) is False: - #timed out. Break the barrier - self._break() - raise BrokenBarrierError - if self._state < 0: - raise BrokenBarrierError + if not self._cond.wait_for(lambda : self._state != 0, timeout): + #timed out. Break the barrier + self._break() + raise BrokenBarrierError + if self._state < 0: + raise BrokenBarrierError assert self._state == 1 # If we are the last thread to exit the barrier, signal any threads |