diff options
author | Charles-François Natali <neologix@free.fr> | 2012-04-17 16:45:57 (GMT) |
---|---|---|
committer | Charles-François Natali <neologix@free.fr> | 2012-04-17 16:45:57 (GMT) |
commit | c8ce715a82fd8034ef1d809b262346c15f2490c4 (patch) | |
tree | 125522b5a5daa37ba405f81902aeebe8d91b6c7f /Lib/test/test_multiprocessing.py | |
parent | a3f4457b172a165fafa65d67e0293f89dbba06b7 (diff) | |
download | cpython-c8ce715a82fd8034ef1d809b262346c15f2490c4.zip cpython-c8ce715a82fd8034ef1d809b262346c15f2490c4.tar.gz cpython-c8ce715a82fd8034ef1d809b262346c15f2490c4.tar.bz2 |
Issue #14087: multiprocessing: add Condition.wait_for(). Patch by sbt.
Diffstat (limited to 'Lib/test/test_multiprocessing.py')
-rw-r--r-- | Lib/test/test_multiprocessing.py | 67 |
1 files changed, 67 insertions, 0 deletions
diff --git a/Lib/test/test_multiprocessing.py b/Lib/test/test_multiprocessing.py index 2bcdb4e..bbde366 100644 --- a/Lib/test/test_multiprocessing.py +++ b/Lib/test/test_multiprocessing.py @@ -887,6 +887,73 @@ class _TestCondition(BaseTestCase): self.assertEqual(res, False) self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1) + @classmethod + def _test_waitfor_f(cls, cond, state): + with cond: + state.value = 0 + cond.notify() + result = cond.wait_for(lambda : state.value==4) + if not result or state.value != 4: + sys.exit(1) + + @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes') + def test_waitfor(self): + # based on test in test/lock_tests.py + cond = self.Condition() + state = self.Value('i', -1) + + p = self.Process(target=self._test_waitfor_f, args=(cond, state)) + p.daemon = True + p.start() + + with cond: + result = cond.wait_for(lambda : state.value==0) + self.assertTrue(result) + self.assertEqual(state.value, 0) + + for i in range(4): + time.sleep(0.01) + with cond: + state.value += 1 + cond.notify() + + p.join(5) + self.assertFalse(p.is_alive()) + self.assertEqual(p.exitcode, 0) + + @classmethod + def _test_waitfor_timeout_f(cls, cond, state, success): + with cond: + expected = 0.1 + dt = time.time() + result = cond.wait_for(lambda : state.value==4, timeout=expected) + dt = time.time() - dt + # borrow logic in assertTimeout() from test/lock_tests.py + if not result and expected * 0.6 < dt < expected * 10.0: + success.value = True + + @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes') + def test_waitfor_timeout(self): + # based on test in test/lock_tests.py + cond = self.Condition() + state = self.Value('i', 0) + success = self.Value('i', False) + + p = self.Process(target=self._test_waitfor_timeout_f, + args=(cond, state, success)) + p.daemon = True + p.start() + + # Only increment 3 times, so state == 4 is never reached. + for i in range(3): + time.sleep(0.01) + with cond: + state.value += 1 + cond.notify() + + p.join(5) + self.assertTrue(success.value) + class _TestEvent(BaseTestCase): |