diff options
Diffstat (limited to 'Lib/test/test_asyncio/test_tasks.py')
-rw-r--r-- | Lib/test/test_asyncio/test_tasks.py | 187 |
1 files changed, 0 insertions, 187 deletions
diff --git a/Lib/test/test_asyncio/test_tasks.py b/Lib/test/test_asyncio/test_tasks.py index 128b7ce..e7fb774 100644 --- a/Lib/test/test_asyncio/test_tasks.py +++ b/Lib/test/test_asyncio/test_tasks.py @@ -2260,192 +2260,5 @@ class SleepTests(test_utils.TestCase): self.assertEqual(result, 11) -class TimeoutTests(test_utils.TestCase): - def setUp(self): - self.loop = asyncio.new_event_loop() - asyncio.set_event_loop(None) - - def tearDown(self): - self.loop.close() - self.loop = None - - def test_timeout(self): - canceled_raised = [False] - - @asyncio.coroutine - def long_running_task(): - try: - yield from asyncio.sleep(10, loop=self.loop) - except asyncio.CancelledError: - canceled_raised[0] = True - raise - - @asyncio.coroutine - def go(): - with self.assertRaises(asyncio.TimeoutError): - with asyncio.timeout(0.01, loop=self.loop) as t: - yield from long_running_task() - self.assertIs(t._loop, self.loop) - - self.loop.run_until_complete(go()) - self.assertTrue(canceled_raised[0], 'CancelledError was not raised') - - def test_timeout_finish_in_time(self): - @asyncio.coroutine - def long_running_task(): - yield from asyncio.sleep(0.01, loop=self.loop) - return 'done' - - @asyncio.coroutine - def go(): - with asyncio.timeout(0.1, loop=self.loop): - resp = yield from long_running_task() - self.assertEqual(resp, 'done') - - self.loop.run_until_complete(go()) - - def test_timeout_gloabal_loop(self): - asyncio.set_event_loop(self.loop) - - @asyncio.coroutine - def run(): - with asyncio.timeout(0.1) as t: - yield from asyncio.sleep(0.01) - self.assertIs(t._loop, self.loop) - - self.loop.run_until_complete(run()) - - def test_timeout_not_relevant_exception(self): - @asyncio.coroutine - def go(): - yield from asyncio.sleep(0, loop=self.loop) - with self.assertRaises(KeyError): - with asyncio.timeout(0.1, loop=self.loop): - raise KeyError - - self.loop.run_until_complete(go()) - - def test_timeout_canceled_error_is_converted_to_timeout(self): - @asyncio.coroutine - def go(): - yield from asyncio.sleep(0, loop=self.loop) - with self.assertRaises(asyncio.CancelledError): - with asyncio.timeout(0.001, loop=self.loop): - raise asyncio.CancelledError - - self.loop.run_until_complete(go()) - - def test_timeout_blocking_loop(self): - @asyncio.coroutine - def long_running_task(): - time.sleep(0.05) - return 'done' - - @asyncio.coroutine - def go(): - with asyncio.timeout(0.01, loop=self.loop): - result = yield from long_running_task() - self.assertEqual(result, 'done') - - self.loop.run_until_complete(go()) - - def test_for_race_conditions(self): - fut = asyncio.Future(loop=self.loop) - self.loop.call_later(0.1, fut.set_result('done')) - - @asyncio.coroutine - def go(): - with asyncio.timeout(0.2, loop=self.loop): - resp = yield from fut - self.assertEqual(resp, 'done') - - self.loop.run_until_complete(go()) - - def test_timeout_time(self): - @asyncio.coroutine - def go(): - foo_running = None - - start = self.loop.time() - with self.assertRaises(asyncio.TimeoutError): - with asyncio.timeout(0.1, loop=self.loop): - foo_running = True - try: - yield from asyncio.sleep(0.2, loop=self.loop) - finally: - foo_running = False - - dt = self.loop.time() - start - # tolerate a small delta for slow delta or unstable clocks - self.assertTrue(0.09 < dt < 0.12, dt) - self.assertFalse(foo_running) - - self.loop.run_until_complete(go()) - - def test_timeout_disable(self): - @asyncio.coroutine - def long_running_task(): - yield from asyncio.sleep(0.1, loop=self.loop) - return 'done' - - @asyncio.coroutine - def go(): - t0 = self.loop.time() - with asyncio.timeout(None, loop=self.loop): - resp = yield from long_running_task() - self.assertEqual(resp, 'done') - dt = self.loop.time() - t0 - # tolerate a time delta for clocks with bad resolution - # and slow buildbots - self.assertTrue(0.09 < dt < 0.15, dt) - self.loop.run_until_complete(go()) - - def test_raise_runtimeerror_if_no_task(self): - with self.assertRaises(RuntimeError): - with asyncio.timeout(0.1, loop=self.loop): - pass - - def test_outer_coro_is_not_cancelled(self): - - has_timeout = [False] - - @asyncio.coroutine - def outer(): - try: - with asyncio.timeout(0.001, loop=self.loop): - yield from asyncio.sleep(1, loop=self.loop) - except asyncio.TimeoutError: - has_timeout[0] = True - - @asyncio.coroutine - def go(): - task = asyncio.ensure_future(outer(), loop=self.loop) - yield from task - self.assertTrue(has_timeout[0]) - self.assertFalse(task.cancelled()) - self.assertTrue(task.done()) - - self.loop.run_until_complete(go()) - - def test_cancel_outer_coro(self): - fut = asyncio.Future(loop=self.loop) - - @asyncio.coroutine - def outer(): - fut.set_result(None) - yield from asyncio.sleep(1, loop=self.loop) - - @asyncio.coroutine - def go(): - task = asyncio.ensure_future(outer(), loop=self.loop) - yield from fut - task.cancel() - with self.assertRaises(asyncio.CancelledError): - yield from task - self.assertTrue(task.cancelled()) - self.assertTrue(task.done()) - - self.loop.run_until_complete(go()) - if __name__ == '__main__': unittest.main() |