diff options
Diffstat (limited to 'Lib/test/test_asyncio/test_tasks.py')
-rw-r--r-- | Lib/test/test_asyncio/test_tasks.py | 3375 |
1 files changed, 0 insertions, 3375 deletions
diff --git a/Lib/test/test_asyncio/test_tasks.py b/Lib/test/test_asyncio/test_tasks.py deleted file mode 100644 index dde84b8..0000000 --- a/Lib/test/test_asyncio/test_tasks.py +++ /dev/null @@ -1,3375 +0,0 @@ -"""Tests for tasks.py.""" - -import collections -import contextlib -import contextvars -import functools -import gc -import io -import random -import re -import sys -import textwrap -import types -import unittest -import weakref -from unittest import mock - -import asyncio -from asyncio import coroutines -from asyncio import futures -from asyncio import tasks -from test.test_asyncio import utils as test_utils -from test import support -from test.support.script_helper import assert_python_ok - - -def tearDownModule(): - asyncio.set_event_loop_policy(None) - - -async def coroutine_function(): - pass - - -@contextlib.contextmanager -def set_coroutine_debug(enabled): - coroutines = asyncio.coroutines - - old_debug = coroutines._DEBUG - try: - coroutines._DEBUG = enabled - yield - finally: - coroutines._DEBUG = old_debug - - -def format_coroutine(qualname, state, src, source_traceback, generator=False): - if generator: - state = '%s' % state - else: - state = '%s, defined' % state - if source_traceback is not None: - frame = source_traceback[-1] - return ('coro=<%s() %s at %s> created at %s:%s' - % (qualname, state, src, frame[0], frame[1])) - else: - return 'coro=<%s() %s at %s>' % (qualname, state, src) - - -class Dummy: - - def __repr__(self): - return '<Dummy>' - - def __call__(self, *args): - pass - - -class CoroLikeObject: - def send(self, v): - raise StopIteration(42) - - def throw(self, *exc): - pass - - def close(self): - pass - - def __await__(self): - return self - - -class BaseTaskTests: - - Task = None - Future = None - - def new_task(self, loop, coro, name='TestTask'): - return self.__class__.Task(coro, loop=loop, name=name) - - def new_future(self, loop): - return self.__class__.Future(loop=loop) - - def setUp(self): - super().setUp() - self.loop = self.new_test_loop() - self.loop.set_task_factory(self.new_task) - self.loop.create_future = lambda: self.new_future(self.loop) - - def test_task_del_collect(self): - class Evil: - def __del__(self): - gc.collect() - - async def run(): - return Evil() - - self.loop.run_until_complete( - asyncio.gather(*[ - self.new_task(self.loop, run()) for _ in range(100) - ], loop=self.loop)) - - def test_other_loop_future(self): - other_loop = asyncio.new_event_loop() - fut = self.new_future(other_loop) - - async def run(fut): - await fut - - try: - with self.assertRaisesRegex(RuntimeError, - r'Task .* got Future .* attached'): - self.loop.run_until_complete(run(fut)) - finally: - other_loop.close() - - def test_task_awaits_on_itself(self): - - async def test(): - await task - - task = asyncio.ensure_future(test(), loop=self.loop) - - with self.assertRaisesRegex(RuntimeError, - 'Task cannot await on itself'): - self.loop.run_until_complete(task) - - def test_task_class(self): - async def notmuch(): - return 'ok' - t = self.new_task(self.loop, notmuch()) - self.loop.run_until_complete(t) - self.assertTrue(t.done()) - self.assertEqual(t.result(), 'ok') - self.assertIs(t._loop, self.loop) - self.assertIs(t.get_loop(), self.loop) - - loop = asyncio.new_event_loop() - self.set_event_loop(loop) - t = self.new_task(loop, notmuch()) - self.assertIs(t._loop, loop) - loop.run_until_complete(t) - loop.close() - - def test_ensure_future_coroutine(self): - with self.assertWarns(DeprecationWarning): - @asyncio.coroutine - def notmuch(): - return 'ok' - t = asyncio.ensure_future(notmuch(), loop=self.loop) - self.loop.run_until_complete(t) - self.assertTrue(t.done()) - self.assertEqual(t.result(), 'ok') - self.assertIs(t._loop, self.loop) - - loop = asyncio.new_event_loop() - self.set_event_loop(loop) - t = asyncio.ensure_future(notmuch(), loop=loop) - self.assertIs(t._loop, loop) - loop.run_until_complete(t) - loop.close() - - def test_ensure_future_future(self): - f_orig = self.new_future(self.loop) - f_orig.set_result('ko') - - f = asyncio.ensure_future(f_orig) - self.loop.run_until_complete(f) - self.assertTrue(f.done()) - self.assertEqual(f.result(), 'ko') - self.assertIs(f, f_orig) - - loop = asyncio.new_event_loop() - self.set_event_loop(loop) - - with self.assertRaises(ValueError): - f = asyncio.ensure_future(f_orig, loop=loop) - - loop.close() - - f = asyncio.ensure_future(f_orig, loop=self.loop) - self.assertIs(f, f_orig) - - def test_ensure_future_task(self): - async def notmuch(): - return 'ok' - t_orig = self.new_task(self.loop, notmuch()) - t = asyncio.ensure_future(t_orig) - self.loop.run_until_complete(t) - self.assertTrue(t.done()) - self.assertEqual(t.result(), 'ok') - self.assertIs(t, t_orig) - - loop = asyncio.new_event_loop() - self.set_event_loop(loop) - - with self.assertRaises(ValueError): - t = asyncio.ensure_future(t_orig, loop=loop) - - loop.close() - - t = asyncio.ensure_future(t_orig, loop=self.loop) - self.assertIs(t, t_orig) - - def test_ensure_future_awaitable(self): - class Aw: - def __init__(self, coro): - self.coro = coro - def __await__(self): - return (yield from self.coro) - - with self.assertWarns(DeprecationWarning): - @asyncio.coroutine - def coro(): - return 'ok' - - loop = asyncio.new_event_loop() - self.set_event_loop(loop) - fut = asyncio.ensure_future(Aw(coro()), loop=loop) - loop.run_until_complete(fut) - assert fut.result() == 'ok' - - def test_ensure_future_neither(self): - with self.assertRaises(TypeError): - asyncio.ensure_future('ok') - - def test_ensure_future_error_msg(self): - loop = asyncio.new_event_loop() - f = self.new_future(self.loop) - with self.assertRaisesRegex(ValueError, 'The future belongs to a ' - 'different loop than the one specified as ' - 'the loop argument'): - asyncio.ensure_future(f, loop=loop) - loop.close() - - def test_get_stack(self): - T = None - - async def foo(): - await bar() - - async def bar(): - # test get_stack() - f = T.get_stack(limit=1) - try: - self.assertEqual(f[0].f_code.co_name, 'foo') - finally: - f = None - - # test print_stack() - file = io.StringIO() - T.print_stack(limit=1, file=file) - file.seek(0) - tb = file.read() - self.assertRegex(tb, r'foo\(\) running') - - async def runner(): - nonlocal T - T = asyncio.ensure_future(foo(), loop=self.loop) - await T - - self.loop.run_until_complete(runner()) - - def test_task_repr(self): - self.loop.set_debug(False) - - async def notmuch(): - return 'abc' - - # test coroutine function - self.assertEqual(notmuch.__name__, 'notmuch') - self.assertRegex(notmuch.__qualname__, - r'\w+.test_task_repr.<locals>.notmuch') - self.assertEqual(notmuch.__module__, __name__) - - filename, lineno = test_utils.get_function_source(notmuch) - src = "%s:%s" % (filename, lineno) - - # test coroutine object - gen = notmuch() - coro_qualname = 'BaseTaskTests.test_task_repr.<locals>.notmuch' - self.assertEqual(gen.__name__, 'notmuch') - self.assertEqual(gen.__qualname__, coro_qualname) - - # test pending Task - t = self.new_task(self.loop, gen) - t.add_done_callback(Dummy()) - - coro = format_coroutine(coro_qualname, 'running', src, - t._source_traceback, generator=True) - self.assertEqual(repr(t), - "<Task pending name='TestTask' %s cb=[<Dummy>()]>" % coro) - - # test cancelling Task - t.cancel() # Does not take immediate effect! - self.assertEqual(repr(t), - "<Task cancelling name='TestTask' %s cb=[<Dummy>()]>" % coro) - - # test cancelled Task - self.assertRaises(asyncio.CancelledError, - self.loop.run_until_complete, t) - coro = format_coroutine(coro_qualname, 'done', src, - t._source_traceback) - self.assertEqual(repr(t), - "<Task cancelled name='TestTask' %s>" % coro) - - # test finished Task - t = self.new_task(self.loop, notmuch()) - self.loop.run_until_complete(t) - coro = format_coroutine(coro_qualname, 'done', src, - t._source_traceback) - self.assertEqual(repr(t), - "<Task finished name='TestTask' %s result='abc'>" % coro) - - def test_task_repr_autogenerated(self): - async def notmuch(): - return 123 - - t1 = self.new_task(self.loop, notmuch(), None) - t2 = self.new_task(self.loop, notmuch(), None) - self.assertNotEqual(repr(t1), repr(t2)) - - match1 = re.match(r"^<Task pending name='Task-(\d+)'", repr(t1)) - self.assertIsNotNone(match1) - match2 = re.match(r"^<Task pending name='Task-(\d+)'", repr(t2)) - self.assertIsNotNone(match2) - - # Autogenerated task names should have monotonically increasing numbers - self.assertLess(int(match1.group(1)), int(match2.group(1))) - self.loop.run_until_complete(t1) - self.loop.run_until_complete(t2) - - def test_task_repr_name_not_str(self): - async def notmuch(): - return 123 - - t = self.new_task(self.loop, notmuch()) - t.set_name({6}) - self.assertEqual(t.get_name(), '{6}') - self.loop.run_until_complete(t) - - def test_task_repr_coro_decorator(self): - self.loop.set_debug(False) - - with self.assertWarns(DeprecationWarning): - @asyncio.coroutine - def notmuch(): - # notmuch() function doesn't use yield from: it will be wrapped by - # @coroutine decorator - return 123 - - # test coroutine function - self.assertEqual(notmuch.__name__, 'notmuch') - self.assertRegex(notmuch.__qualname__, - r'\w+.test_task_repr_coro_decorator' - r'\.<locals>\.notmuch') - self.assertEqual(notmuch.__module__, __name__) - - # test coroutine object - gen = notmuch() - # On Python >= 3.5, generators now inherit the name of the - # function, as expected, and have a qualified name (__qualname__ - # attribute). - coro_name = 'notmuch' - coro_qualname = ('BaseTaskTests.test_task_repr_coro_decorator' - '.<locals>.notmuch') - self.assertEqual(gen.__name__, coro_name) - self.assertEqual(gen.__qualname__, coro_qualname) - - # test repr(CoroWrapper) - if coroutines._DEBUG: - # format the coroutine object - if coroutines._DEBUG: - filename, lineno = test_utils.get_function_source(notmuch) - frame = gen._source_traceback[-1] - coro = ('%s() running, defined at %s:%s, created at %s:%s' - % (coro_qualname, filename, lineno, - frame[0], frame[1])) - else: - code = gen.gi_code - coro = ('%s() running at %s:%s' - % (coro_qualname, code.co_filename, - code.co_firstlineno)) - - self.assertEqual(repr(gen), '<CoroWrapper %s>' % coro) - - # test pending Task - t = self.new_task(self.loop, gen) - t.add_done_callback(Dummy()) - - # format the coroutine object - if coroutines._DEBUG: - src = '%s:%s' % test_utils.get_function_source(notmuch) - else: - code = gen.gi_code - src = '%s:%s' % (code.co_filename, code.co_firstlineno) - coro = format_coroutine(coro_qualname, 'running', src, - t._source_traceback, - generator=not coroutines._DEBUG) - self.assertEqual(repr(t), - "<Task pending name='TestTask' %s cb=[<Dummy>()]>" % coro) - self.loop.run_until_complete(t) - - def test_task_repr_wait_for(self): - self.loop.set_debug(False) - - async def wait_for(fut): - return await fut - - fut = self.new_future(self.loop) - task = self.new_task(self.loop, wait_for(fut)) - test_utils.run_briefly(self.loop) - self.assertRegex(repr(task), - '<Task .* wait_for=%s>' % re.escape(repr(fut))) - - fut.set_result(None) - self.loop.run_until_complete(task) - - def test_task_repr_partial_corowrapper(self): - # Issue #222: repr(CoroWrapper) must not fail in debug mode if the - # coroutine is a partial function - with set_coroutine_debug(True): - self.loop.set_debug(True) - - async def func(x, y): - await asyncio.sleep(0) - - with self.assertWarns(DeprecationWarning): - partial_func = asyncio.coroutine(functools.partial(func, 1)) - task = self.loop.create_task(partial_func(2)) - - # make warnings quiet - task._log_destroy_pending = False - self.addCleanup(task._coro.close) - - coro_repr = repr(task._coro) - expected = ( - r'<coroutine object \w+\.test_task_repr_partial_corowrapper' - r'\.<locals>\.func at' - ) - self.assertRegex(coro_repr, expected) - - def test_task_basics(self): - - async def outer(): - a = await inner1() - b = await inner2() - return a+b - - async def inner1(): - return 42 - - async def inner2(): - return 1000 - - t = outer() - self.assertEqual(self.loop.run_until_complete(t), 1042) - - def test_cancel(self): - - def gen(): - when = yield - self.assertAlmostEqual(10.0, when) - yield 0 - - loop = self.new_test_loop(gen) - - async def task(): - await asyncio.sleep(10.0) - return 12 - - t = self.new_task(loop, task()) - loop.call_soon(t.cancel) - with self.assertRaises(asyncio.CancelledError): - loop.run_until_complete(t) - self.assertTrue(t.done()) - self.assertTrue(t.cancelled()) - self.assertFalse(t.cancel()) - - def test_cancel_yield(self): - with self.assertWarns(DeprecationWarning): - @asyncio.coroutine - def task(): - yield - yield - return 12 - - t = self.new_task(self.loop, task()) - test_utils.run_briefly(self.loop) # start coro - t.cancel() - self.assertRaises( - asyncio.CancelledError, self.loop.run_until_complete, t) - self.assertTrue(t.done()) - self.assertTrue(t.cancelled()) - self.assertFalse(t.cancel()) - - def test_cancel_inner_future(self): - f = self.new_future(self.loop) - - async def task(): - await f - return 12 - - t = self.new_task(self.loop, task()) - test_utils.run_briefly(self.loop) # start task - f.cancel() - with self.assertRaises(asyncio.CancelledError): - self.loop.run_until_complete(t) - self.assertTrue(f.cancelled()) - self.assertTrue(t.cancelled()) - - def test_cancel_both_task_and_inner_future(self): - f = self.new_future(self.loop) - - async def task(): - await f - return 12 - - t = self.new_task(self.loop, task()) - test_utils.run_briefly(self.loop) - - f.cancel() - t.cancel() - - with self.assertRaises(asyncio.CancelledError): - self.loop.run_until_complete(t) - - self.assertTrue(t.done()) - self.assertTrue(f.cancelled()) - self.assertTrue(t.cancelled()) - - def test_cancel_task_catching(self): - fut1 = self.new_future(self.loop) - fut2 = self.new_future(self.loop) - - async def task(): - await fut1 - try: - await fut2 - except asyncio.CancelledError: - return 42 - - t = self.new_task(self.loop, task()) - test_utils.run_briefly(self.loop) - self.assertIs(t._fut_waiter, fut1) # White-box test. - fut1.set_result(None) - test_utils.run_briefly(self.loop) - self.assertIs(t._fut_waiter, fut2) # White-box test. - t.cancel() - self.assertTrue(fut2.cancelled()) - res = self.loop.run_until_complete(t) - self.assertEqual(res, 42) - self.assertFalse(t.cancelled()) - - def test_cancel_task_ignoring(self): - fut1 = self.new_future(self.loop) - fut2 = self.new_future(self.loop) - fut3 = self.new_future(self.loop) - - async def task(): - await fut1 - try: - await fut2 - except asyncio.CancelledError: - pass - res = await fut3 - return res - - t = self.new_task(self.loop, task()) - test_utils.run_briefly(self.loop) - self.assertIs(t._fut_waiter, fut1) # White-box test. - fut1.set_result(None) - test_utils.run_briefly(self.loop) - self.assertIs(t._fut_waiter, fut2) # White-box test. - t.cancel() - self.assertTrue(fut2.cancelled()) - test_utils.run_briefly(self.loop) - self.assertIs(t._fut_waiter, fut3) # White-box test. - fut3.set_result(42) - res = self.loop.run_until_complete(t) - self.assertEqual(res, 42) - self.assertFalse(fut3.cancelled()) - self.assertFalse(t.cancelled()) - - def test_cancel_current_task(self): - loop = asyncio.new_event_loop() - self.set_event_loop(loop) - - async def task(): - t.cancel() - self.assertTrue(t._must_cancel) # White-box test. - # The sleep should be cancelled immediately. - await asyncio.sleep(100) - return 12 - - t = self.new_task(loop, task()) - self.assertFalse(t.cancelled()) - self.assertRaises( - asyncio.CancelledError, loop.run_until_complete, t) - self.assertTrue(t.done()) - self.assertTrue(t.cancelled()) - self.assertFalse(t._must_cancel) # White-box test. - self.assertFalse(t.cancel()) - - def test_cancel_at_end(self): - """coroutine end right after task is cancelled""" - loop = asyncio.new_event_loop() - self.set_event_loop(loop) - - async def task(): - t.cancel() - self.assertTrue(t._must_cancel) # White-box test. - return 12 - - t = self.new_task(loop, task()) - self.assertFalse(t.cancelled()) - self.assertRaises( - asyncio.CancelledError, loop.run_until_complete, t) - self.assertTrue(t.done()) - self.assertTrue(t.cancelled()) - self.assertFalse(t._must_cancel) # White-box test. - self.assertFalse(t.cancel()) - - def test_cancel_awaited_task(self): - # This tests for a relatively rare condition when - # a task cancellation is requested for a task which is not - # currently blocked, such as a task cancelling itself. - # In this situation we must ensure that whatever next future - # or task the cancelled task blocks on is cancelled correctly - # as well. See also bpo-34872. - loop = asyncio.new_event_loop() - self.addCleanup(lambda: loop.close()) - - task = nested_task = None - fut = self.new_future(loop) - - async def nested(): - await fut - - async def coro(): - nonlocal nested_task - # Create a sub-task and wait for it to run. - nested_task = self.new_task(loop, nested()) - await asyncio.sleep(0) - - # Request the current task to be cancelled. - task.cancel() - # Block on the nested task, which should be immediately - # cancelled. - await nested_task - - task = self.new_task(loop, coro()) - with self.assertRaises(asyncio.CancelledError): - loop.run_until_complete(task) - - self.assertTrue(task.cancelled()) - self.assertTrue(nested_task.cancelled()) - self.assertTrue(fut.cancelled()) - - def test_stop_while_run_in_complete(self): - - def gen(): - when = yield - self.assertAlmostEqual(0.1, when) - when = yield 0.1 - self.assertAlmostEqual(0.2, when) - when = yield 0.1 - self.assertAlmostEqual(0.3, when) - yield 0.1 - - loop = self.new_test_loop(gen) - - x = 0 - - async def task(): - nonlocal x - while x < 10: - await asyncio.sleep(0.1) - x += 1 - if x == 2: - loop.stop() - - t = self.new_task(loop, task()) - with self.assertRaises(RuntimeError) as cm: - loop.run_until_complete(t) - self.assertEqual(str(cm.exception), - 'Event loop stopped before Future completed.') - self.assertFalse(t.done()) - self.assertEqual(x, 2) - self.assertAlmostEqual(0.3, loop.time()) - - t.cancel() - self.assertRaises(asyncio.CancelledError, loop.run_until_complete, t) - - def test_log_traceback(self): - async def coro(): - pass - - task = self.new_task(self.loop, coro()) - with self.assertRaisesRegex(ValueError, 'can only be set to False'): - task._log_traceback = True - self.loop.run_until_complete(task) - - def test_wait_for_timeout_less_then_0_or_0_future_done(self): - def gen(): - when = yield - self.assertAlmostEqual(0, when) - - loop = self.new_test_loop(gen) - - fut = self.new_future(loop) - fut.set_result('done') - - ret = loop.run_until_complete(asyncio.wait_for(fut, 0)) - - self.assertEqual(ret, 'done') - self.assertTrue(fut.done()) - self.assertAlmostEqual(0, loop.time()) - - def test_wait_for_timeout_less_then_0_or_0_coroutine_do_not_started(self): - def gen(): - when = yield - self.assertAlmostEqual(0, when) - - loop = self.new_test_loop(gen) - - foo_started = False - - async def foo(): - nonlocal foo_started - foo_started = True - - with self.assertRaises(asyncio.TimeoutError): - loop.run_until_complete(asyncio.wait_for(foo(), 0)) - - self.assertAlmostEqual(0, loop.time()) - self.assertEqual(foo_started, False) - - def test_wait_for_timeout_less_then_0_or_0(self): - def gen(): - when = yield - self.assertAlmostEqual(0.2, when) - when = yield 0 - self.assertAlmostEqual(0, when) - - for timeout in [0, -1]: - with self.subTest(timeout=timeout): - loop = self.new_test_loop(gen) - - foo_running = None - - async def foo(): - nonlocal foo_running - foo_running = True - try: - await asyncio.sleep(0.2) - finally: - foo_running = False - return 'done' - - fut = self.new_task(loop, foo()) - - with self.assertRaises(asyncio.TimeoutError): - loop.run_until_complete(asyncio.wait_for(fut, timeout)) - self.assertTrue(fut.done()) - # it should have been cancelled due to the timeout - self.assertTrue(fut.cancelled()) - self.assertAlmostEqual(0, loop.time()) - self.assertEqual(foo_running, False) - - def test_wait_for(self): - - def gen(): - when = yield - self.assertAlmostEqual(0.2, when) - when = yield 0 - self.assertAlmostEqual(0.1, when) - when = yield 0.1 - - loop = self.new_test_loop(gen) - - foo_running = None - - async def foo(): - nonlocal foo_running - foo_running = True - try: - await asyncio.sleep(0.2) - finally: - foo_running = False - return 'done' - - fut = self.new_task(loop, foo()) - - with self.assertRaises(asyncio.TimeoutError): - loop.run_until_complete(asyncio.wait_for(fut, 0.1)) - self.assertTrue(fut.done()) - # it should have been cancelled due to the timeout - self.assertTrue(fut.cancelled()) - self.assertAlmostEqual(0.1, loop.time()) - self.assertEqual(foo_running, False) - - def test_wait_for_blocking(self): - loop = self.new_test_loop() - - async def coro(): - return 'done' - - res = loop.run_until_complete(asyncio.wait_for(coro(), timeout=None)) - self.assertEqual(res, 'done') - - def test_wait_for_with_global_loop(self): - - def gen(): - when = yield - self.assertAlmostEqual(0.2, when) - when = yield 0 - self.assertAlmostEqual(0.01, when) - yield 0.01 - - loop = self.new_test_loop(gen) - - async def foo(): - await asyncio.sleep(0.2) - return 'done' - - asyncio.set_event_loop(loop) - try: - fut = self.new_task(loop, foo()) - with self.assertRaises(asyncio.TimeoutError): - loop.run_until_complete(asyncio.wait_for(fut, 0.01)) - finally: - asyncio.set_event_loop(None) - - self.assertAlmostEqual(0.01, loop.time()) - self.assertTrue(fut.done()) - self.assertTrue(fut.cancelled()) - - def test_wait_for_race_condition(self): - - def gen(): - yield 0.1 - yield 0.1 - yield 0.1 - - loop = self.new_test_loop(gen) - - fut = self.new_future(loop) - task = asyncio.wait_for(fut, timeout=0.2) - loop.call_later(0.1, fut.set_result, "ok") - res = loop.run_until_complete(task) - self.assertEqual(res, "ok") - - def test_wait_for_waits_for_task_cancellation(self): - loop = asyncio.new_event_loop() - self.addCleanup(loop.close) - - task_done = False - - async def foo(): - async def inner(): - nonlocal task_done - try: - await asyncio.sleep(0.2) - finally: - task_done = True - - inner_task = self.new_task(loop, inner()) - - with self.assertRaises(asyncio.TimeoutError): - await asyncio.wait_for(inner_task, timeout=0.1) - - self.assertTrue(task_done) - - loop.run_until_complete(foo()) - - def test_wait_for_self_cancellation(self): - loop = asyncio.new_event_loop() - self.addCleanup(loop.close) - - async def foo(): - async def inner(): - try: - await asyncio.sleep(0.3) - except asyncio.CancelledError: - try: - await asyncio.sleep(0.3) - except asyncio.CancelledError: - await asyncio.sleep(0.3) - - return 42 - - inner_task = self.new_task(loop, inner()) - - wait = asyncio.wait_for(inner_task, timeout=0.1) - - # Test that wait_for itself is properly cancellable - # even when the initial task holds up the initial cancellation. - task = self.new_task(loop, wait) - await asyncio.sleep(0.2) - task.cancel() - - with self.assertRaises(asyncio.CancelledError): - await task - - self.assertEqual(await inner_task, 42) - - loop.run_until_complete(foo()) - - def test_wait(self): - - def gen(): - when = yield - self.assertAlmostEqual(0.1, when) - when = yield 0 - self.assertAlmostEqual(0.15, when) - yield 0.15 - - loop = self.new_test_loop(gen) - - a = self.new_task(loop, asyncio.sleep(0.1)) - b = self.new_task(loop, asyncio.sleep(0.15)) - - async def foo(): - done, pending = await asyncio.wait([b, a]) - self.assertEqual(done, set([a, b])) - self.assertEqual(pending, set()) - return 42 - - res = loop.run_until_complete(self.new_task(loop, foo())) - self.assertEqual(res, 42) - self.assertAlmostEqual(0.15, loop.time()) - - # Doing it again should take no time and exercise a different path. - res = loop.run_until_complete(self.new_task(loop, foo())) - self.assertAlmostEqual(0.15, loop.time()) - self.assertEqual(res, 42) - - def test_wait_with_global_loop(self): - - def gen(): - when = yield - self.assertAlmostEqual(0.01, when) - when = yield 0 - self.assertAlmostEqual(0.015, when) - yield 0.015 - - loop = self.new_test_loop(gen) - - a = self.new_task(loop, asyncio.sleep(0.01)) - b = self.new_task(loop, asyncio.sleep(0.015)) - - async def foo(): - done, pending = await asyncio.wait([b, a]) - self.assertEqual(done, set([a, b])) - self.assertEqual(pending, set()) - return 42 - - asyncio.set_event_loop(loop) - res = loop.run_until_complete( - self.new_task(loop, foo())) - - self.assertEqual(res, 42) - - def test_wait_duplicate_coroutines(self): - - with self.assertWarns(DeprecationWarning): - @asyncio.coroutine - def coro(s): - return s - c = coro('test') - - task =self.new_task( - self.loop, - asyncio.wait([c, c, coro('spam')])) - - done, pending = self.loop.run_until_complete(task) - - self.assertFalse(pending) - self.assertEqual(set(f.result() for f in done), {'test', 'spam'}) - - def test_wait_errors(self): - self.assertRaises( - ValueError, self.loop.run_until_complete, - asyncio.wait(set())) - - # -1 is an invalid return_when value - sleep_coro = asyncio.sleep(10.0) - wait_coro = asyncio.wait([sleep_coro], return_when=-1) - self.assertRaises(ValueError, - self.loop.run_until_complete, wait_coro) - - sleep_coro.close() - - def test_wait_first_completed(self): - - def gen(): - when = yield - self.assertAlmostEqual(10.0, when) - when = yield 0 - self.assertAlmostEqual(0.1, when) - yield 0.1 - - loop = self.new_test_loop(gen) - - a = self.new_task(loop, asyncio.sleep(10.0)) - b = self.new_task(loop, asyncio.sleep(0.1)) - task = self.new_task( - loop, - asyncio.wait([b, a], return_when=asyncio.FIRST_COMPLETED)) - - done, pending = loop.run_until_complete(task) - self.assertEqual({b}, done) - self.assertEqual({a}, pending) - self.assertFalse(a.done()) - self.assertTrue(b.done()) - self.assertIsNone(b.result()) - self.assertAlmostEqual(0.1, loop.time()) - - # move forward to close generator - loop.advance_time(10) - loop.run_until_complete(asyncio.wait([a, b])) - - def test_wait_really_done(self): - # there is possibility that some tasks in the pending list - # became done but their callbacks haven't all been called yet - - async def coro1(): - await asyncio.sleep(0) - - async def coro2(): - await asyncio.sleep(0) - await asyncio.sleep(0) - - a = self.new_task(self.loop, coro1()) - b = self.new_task(self.loop, coro2()) - task = self.new_task( - self.loop, - asyncio.wait([b, a], return_when=asyncio.FIRST_COMPLETED)) - - done, pending = self.loop.run_until_complete(task) - self.assertEqual({a, b}, done) - self.assertTrue(a.done()) - self.assertIsNone(a.result()) - self.assertTrue(b.done()) - self.assertIsNone(b.result()) - - def test_wait_first_exception(self): - - def gen(): - when = yield - self.assertAlmostEqual(10.0, when) - yield 0 - - loop = self.new_test_loop(gen) - - # first_exception, task already has exception - a = self.new_task(loop, asyncio.sleep(10.0)) - - async def exc(): - raise ZeroDivisionError('err') - - b = self.new_task(loop, exc()) - task = self.new_task( - loop, - asyncio.wait([b, a], return_when=asyncio.FIRST_EXCEPTION)) - - done, pending = loop.run_until_complete(task) - self.assertEqual({b}, done) - self.assertEqual({a}, pending) - self.assertAlmostEqual(0, loop.time()) - - # move forward to close generator - loop.advance_time(10) - loop.run_until_complete(asyncio.wait([a, b])) - - def test_wait_first_exception_in_wait(self): - - def gen(): - when = yield - self.assertAlmostEqual(10.0, when) - when = yield 0 - self.assertAlmostEqual(0.01, when) - yield 0.01 - - loop = self.new_test_loop(gen) - - # first_exception, exception during waiting - a = self.new_task(loop, asyncio.sleep(10.0)) - - async def exc(): - await asyncio.sleep(0.01) - raise ZeroDivisionError('err') - - b = self.new_task(loop, exc()) - task = asyncio.wait([b, a], return_when=asyncio.FIRST_EXCEPTION) - - done, pending = loop.run_until_complete(task) - self.assertEqual({b}, done) - self.assertEqual({a}, pending) - self.assertAlmostEqual(0.01, loop.time()) - - # move forward to close generator - loop.advance_time(10) - loop.run_until_complete(asyncio.wait([a, b])) - - def test_wait_with_exception(self): - - def gen(): - when = yield - self.assertAlmostEqual(0.1, when) - when = yield 0 - self.assertAlmostEqual(0.15, when) - yield 0.15 - - loop = self.new_test_loop(gen) - - a = self.new_task(loop, asyncio.sleep(0.1)) - - async def sleeper(): - await asyncio.sleep(0.15) - raise ZeroDivisionError('really') - - b = self.new_task(loop, sleeper()) - - async def foo(): - done, pending = await asyncio.wait([b, a]) - self.assertEqual(len(done), 2) - self.assertEqual(pending, set()) - errors = set(f for f in done if f.exception() is not None) - self.assertEqual(len(errors), 1) - - loop.run_until_complete(self.new_task(loop, foo())) - self.assertAlmostEqual(0.15, loop.time()) - - loop.run_until_complete(self.new_task(loop, foo())) - self.assertAlmostEqual(0.15, loop.time()) - - def test_wait_with_timeout(self): - - def gen(): - when = yield - self.assertAlmostEqual(0.1, when) - when = yield 0 - self.assertAlmostEqual(0.15, when) - when = yield 0 - self.assertAlmostEqual(0.11, when) - yield 0.11 - - loop = self.new_test_loop(gen) - - a = self.new_task(loop, asyncio.sleep(0.1)) - b = self.new_task(loop, asyncio.sleep(0.15)) - - async def foo(): - done, pending = await asyncio.wait([b, a], timeout=0.11) - self.assertEqual(done, set([a])) - self.assertEqual(pending, set([b])) - - loop.run_until_complete(self.new_task(loop, foo())) - self.assertAlmostEqual(0.11, loop.time()) - - # move forward to close generator - loop.advance_time(10) - loop.run_until_complete(asyncio.wait([a, b])) - - def test_wait_concurrent_complete(self): - - def gen(): - when = yield - self.assertAlmostEqual(0.1, when) - when = yield 0 - self.assertAlmostEqual(0.15, when) - when = yield 0 - self.assertAlmostEqual(0.1, when) - yield 0.1 - - loop = self.new_test_loop(gen) - - a = self.new_task(loop, asyncio.sleep(0.1)) - b = self.new_task(loop, asyncio.sleep(0.15)) - - done, pending = loop.run_until_complete( - asyncio.wait([b, a], timeout=0.1)) - - self.assertEqual(done, set([a])) - self.assertEqual(pending, set([b])) - self.assertAlmostEqual(0.1, loop.time()) - - # move forward to close generator - loop.advance_time(10) - loop.run_until_complete(asyncio.wait([a, b])) - - def test_as_completed(self): - - def gen(): - yield 0 - yield 0 - yield 0.01 - yield 0 - - loop = self.new_test_loop(gen) - # disable "slow callback" warning - loop.slow_callback_duration = 1.0 - completed = set() - time_shifted = False - - with self.assertWarns(DeprecationWarning): - @asyncio.coroutine - def sleeper(dt, x): - nonlocal time_shifted - yield from asyncio.sleep(dt) - completed.add(x) - if not time_shifted and 'a' in completed and 'b' in completed: - time_shifted = True - loop.advance_time(0.14) - return x - - a = sleeper(0.01, 'a') - b = sleeper(0.01, 'b') - c = sleeper(0.15, 'c') - - async def foo(): - values = [] - for f in asyncio.as_completed([b, c, a], loop=loop): - values.append(await f) - return values - with self.assertWarns(DeprecationWarning): - res = loop.run_until_complete(self.new_task(loop, foo())) - self.assertAlmostEqual(0.15, loop.time()) - self.assertTrue('a' in res[:2]) - self.assertTrue('b' in res[:2]) - self.assertEqual(res[2], 'c') - - # Doing it again should take no time and exercise a different path. - with self.assertWarns(DeprecationWarning): - res = loop.run_until_complete(self.new_task(loop, foo())) - self.assertAlmostEqual(0.15, loop.time()) - - def test_as_completed_with_timeout(self): - - def gen(): - yield - yield 0 - yield 0 - yield 0.1 - - loop = self.new_test_loop(gen) - - a = loop.create_task(asyncio.sleep(0.1, 'a')) - b = loop.create_task(asyncio.sleep(0.15, 'b')) - - async def foo(): - values = [] - for f in asyncio.as_completed([a, b], timeout=0.12, loop=loop): - if values: - loop.advance_time(0.02) - try: - v = await f - values.append((1, v)) - except asyncio.TimeoutError as exc: - values.append((2, exc)) - return values - - with self.assertWarns(DeprecationWarning): - res = loop.run_until_complete(self.new_task(loop, foo())) - self.assertEqual(len(res), 2, res) - self.assertEqual(res[0], (1, 'a')) - self.assertEqual(res[1][0], 2) - self.assertIsInstance(res[1][1], asyncio.TimeoutError) - self.assertAlmostEqual(0.12, loop.time()) - - # move forward to close generator - loop.advance_time(10) - loop.run_until_complete(asyncio.wait([a, b])) - - def test_as_completed_with_unused_timeout(self): - - def gen(): - yield - yield 0 - yield 0.01 - - loop = self.new_test_loop(gen) - - a = asyncio.sleep(0.01, 'a') - - async def foo(): - for f in asyncio.as_completed([a], timeout=1, loop=loop): - v = await f - self.assertEqual(v, 'a') - - with self.assertWarns(DeprecationWarning): - loop.run_until_complete(self.new_task(loop, foo())) - - def test_as_completed_reverse_wait(self): - - def gen(): - yield 0 - yield 0.05 - yield 0 - - loop = self.new_test_loop(gen) - - a = asyncio.sleep(0.05, 'a') - b = asyncio.sleep(0.10, 'b') - fs = {a, b} - - with self.assertWarns(DeprecationWarning): - futs = list(asyncio.as_completed(fs, loop=loop)) - self.assertEqual(len(futs), 2) - - x = loop.run_until_complete(futs[1]) - self.assertEqual(x, 'a') - self.assertAlmostEqual(0.05, loop.time()) - loop.advance_time(0.05) - y = loop.run_until_complete(futs[0]) - self.assertEqual(y, 'b') - self.assertAlmostEqual(0.10, loop.time()) - - def test_as_completed_concurrent(self): - - def gen(): - when = yield - self.assertAlmostEqual(0.05, when) - when = yield 0 - self.assertAlmostEqual(0.05, when) - yield 0.05 - - loop = self.new_test_loop(gen) - - a = asyncio.sleep(0.05, 'a') - b = asyncio.sleep(0.05, 'b') - fs = {a, b} - with self.assertWarns(DeprecationWarning): - futs = list(asyncio.as_completed(fs, loop=loop)) - self.assertEqual(len(futs), 2) - waiter = asyncio.wait(futs) - done, pending = loop.run_until_complete(waiter) - self.assertEqual(set(f.result() for f in done), {'a', 'b'}) - - def test_as_completed_duplicate_coroutines(self): - - with self.assertWarns(DeprecationWarning): - @asyncio.coroutine - def coro(s): - return s - - with self.assertWarns(DeprecationWarning): - @asyncio.coroutine - def runner(): - result = [] - c = coro('ham') - for f in asyncio.as_completed([c, c, coro('spam')], - loop=self.loop): - result.append((yield from f)) - return result - - with self.assertWarns(DeprecationWarning): - fut = self.new_task(self.loop, runner()) - self.loop.run_until_complete(fut) - result = fut.result() - self.assertEqual(set(result), {'ham', 'spam'}) - self.assertEqual(len(result), 2) - - def test_sleep(self): - - def gen(): - when = yield - self.assertAlmostEqual(0.05, when) - when = yield 0.05 - self.assertAlmostEqual(0.1, when) - yield 0.05 - - loop = self.new_test_loop(gen) - - async def sleeper(dt, arg): - await asyncio.sleep(dt/2) - res = await asyncio.sleep(dt/2, arg) - return res - - t = self.new_task(loop, sleeper(0.1, 'yeah')) - loop.run_until_complete(t) - self.assertTrue(t.done()) - self.assertEqual(t.result(), 'yeah') - self.assertAlmostEqual(0.1, loop.time()) - - def test_sleep_cancel(self): - - def gen(): - when = yield - self.assertAlmostEqual(10.0, when) - yield 0 - - loop = self.new_test_loop(gen) - - t = self.new_task(loop, asyncio.sleep(10.0, 'yeah')) - - handle = None - orig_call_later = loop.call_later - - def call_later(delay, callback, *args): - nonlocal handle - handle = orig_call_later(delay, callback, *args) - return handle - - loop.call_later = call_later - test_utils.run_briefly(loop) - - self.assertFalse(handle._cancelled) - - t.cancel() - test_utils.run_briefly(loop) - self.assertTrue(handle._cancelled) - - def test_task_cancel_sleeping_task(self): - - def gen(): - when = yield - self.assertAlmostEqual(0.1, when) - when = yield 0 - self.assertAlmostEqual(5000, when) - yield 0.1 - - loop = self.new_test_loop(gen) - - async def sleep(dt): - await asyncio.sleep(dt) - - async def doit(): - sleeper = self.new_task(loop, sleep(5000)) - loop.call_later(0.1, sleeper.cancel) - try: - await sleeper - except asyncio.CancelledError: - return 'cancelled' - else: - return 'slept in' - - doer = doit() - self.assertEqual(loop.run_until_complete(doer), 'cancelled') - self.assertAlmostEqual(0.1, loop.time()) - - def test_task_cancel_waiter_future(self): - fut = self.new_future(self.loop) - - async def coro(): - await fut - - task = self.new_task(self.loop, coro()) - test_utils.run_briefly(self.loop) - self.assertIs(task._fut_waiter, fut) - - task.cancel() - test_utils.run_briefly(self.loop) - self.assertRaises( - asyncio.CancelledError, self.loop.run_until_complete, task) - self.assertIsNone(task._fut_waiter) - self.assertTrue(fut.cancelled()) - - def test_task_set_methods(self): - async def notmuch(): - return 'ko' - - gen = notmuch() - task = self.new_task(self.loop, gen) - - with self.assertRaisesRegex(RuntimeError, 'not support set_result'): - task.set_result('ok') - - with self.assertRaisesRegex(RuntimeError, 'not support set_exception'): - task.set_exception(ValueError()) - - self.assertEqual( - self.loop.run_until_complete(task), - 'ko') - - def test_step_result(self): - with self.assertWarns(DeprecationWarning): - @asyncio.coroutine - def notmuch(): - yield None - yield 1 - return 'ko' - - self.assertRaises( - RuntimeError, self.loop.run_until_complete, notmuch()) - - def test_step_result_future(self): - # If coroutine returns future, task waits on this future. - - class Fut(asyncio.Future): - def __init__(self, *args, **kwds): - self.cb_added = False - super().__init__(*args, **kwds) - - def add_done_callback(self, *args, **kwargs): - self.cb_added = True - super().add_done_callback(*args, **kwargs) - - fut = Fut(loop=self.loop) - result = None - - async def wait_for_future(): - nonlocal result - result = await fut - - t = self.new_task(self.loop, wait_for_future()) - test_utils.run_briefly(self.loop) - self.assertTrue(fut.cb_added) - - res = object() - fut.set_result(res) - test_utils.run_briefly(self.loop) - self.assertIs(res, result) - self.assertTrue(t.done()) - self.assertIsNone(t.result()) - - def test_baseexception_during_cancel(self): - - def gen(): - when = yield - self.assertAlmostEqual(10.0, when) - yield 0 - - loop = self.new_test_loop(gen) - - async def sleeper(): - await asyncio.sleep(10) - - base_exc = SystemExit() - - async def notmutch(): - try: - await sleeper() - except asyncio.CancelledError: - raise base_exc - - task = self.new_task(loop, notmutch()) - test_utils.run_briefly(loop) - - task.cancel() - self.assertFalse(task.done()) - - self.assertRaises(SystemExit, test_utils.run_briefly, loop) - - self.assertTrue(task.done()) - self.assertFalse(task.cancelled()) - self.assertIs(task.exception(), base_exc) - - def test_iscoroutinefunction(self): - def fn(): - pass - - self.assertFalse(asyncio.iscoroutinefunction(fn)) - - def fn1(): - yield - self.assertFalse(asyncio.iscoroutinefunction(fn1)) - - with self.assertWarns(DeprecationWarning): - @asyncio.coroutine - def fn2(): - yield - self.assertTrue(asyncio.iscoroutinefunction(fn2)) - - self.assertFalse(asyncio.iscoroutinefunction(mock.Mock())) - - def test_yield_vs_yield_from(self): - fut = self.new_future(self.loop) - - with self.assertWarns(DeprecationWarning): - @asyncio.coroutine - def wait_for_future(): - yield fut - - task = wait_for_future() - with self.assertRaises(RuntimeError): - self.loop.run_until_complete(task) - - self.assertFalse(fut.done()) - - def test_yield_vs_yield_from_generator(self): - with self.assertWarns(DeprecationWarning): - @asyncio.coroutine - def coro(): - yield - - with self.assertWarns(DeprecationWarning): - @asyncio.coroutine - def wait_for_future(): - gen = coro() - try: - yield gen - finally: - gen.close() - - task = wait_for_future() - self.assertRaises( - RuntimeError, - self.loop.run_until_complete, task) - - def test_coroutine_non_gen_function(self): - with self.assertWarns(DeprecationWarning): - @asyncio.coroutine - def func(): - return 'test' - - self.assertTrue(asyncio.iscoroutinefunction(func)) - - coro = func() - self.assertTrue(asyncio.iscoroutine(coro)) - - res = self.loop.run_until_complete(coro) - self.assertEqual(res, 'test') - - def test_coroutine_non_gen_function_return_future(self): - fut = self.new_future(self.loop) - - with self.assertWarns(DeprecationWarning): - @asyncio.coroutine - def func(): - return fut - - async def coro(): - fut.set_result('test') - - t1 = self.new_task(self.loop, func()) - t2 = self.new_task(self.loop, coro()) - res = self.loop.run_until_complete(t1) - self.assertEqual(res, 'test') - self.assertIsNone(t2.result()) - - - def test_current_task_deprecated(self): - Task = self.__class__.Task - - with self.assertWarns(DeprecationWarning): - self.assertIsNone(Task.current_task(loop=self.loop)) - - async def coro(loop): - with self.assertWarns(DeprecationWarning): - self.assertIs(Task.current_task(loop=loop), task) - - # See http://bugs.python.org/issue29271 for details: - asyncio.set_event_loop(loop) - try: - with self.assertWarns(DeprecationWarning): - self.assertIs(Task.current_task(None), task) - with self.assertWarns(DeprecationWarning): - self.assertIs(Task.current_task(), task) - finally: - asyncio.set_event_loop(None) - - task = self.new_task(self.loop, coro(self.loop)) - self.loop.run_until_complete(task) - with self.assertWarns(DeprecationWarning): - self.assertIsNone(Task.current_task(loop=self.loop)) - - def test_current_task(self): - self.assertIsNone(asyncio.current_task(loop=self.loop)) - - async def coro(loop): - self.assertIs(asyncio.current_task(loop=loop), task) - - self.assertIs(asyncio.current_task(None), task) - self.assertIs(asyncio.current_task(), task) - - task = self.new_task(self.loop, coro(self.loop)) - self.loop.run_until_complete(task) - self.assertIsNone(asyncio.current_task(loop=self.loop)) - - def test_current_task_with_interleaving_tasks(self): - self.assertIsNone(asyncio.current_task(loop=self.loop)) - - fut1 = self.new_future(self.loop) - fut2 = self.new_future(self.loop) - - async def coro1(loop): - self.assertTrue(asyncio.current_task(loop=loop) is task1) - await fut1 - self.assertTrue(asyncio.current_task(loop=loop) is task1) - fut2.set_result(True) - - async def coro2(loop): - self.assertTrue(asyncio.current_task(loop=loop) is task2) - fut1.set_result(True) - await fut2 - self.assertTrue(asyncio.current_task(loop=loop) is task2) - - task1 = self.new_task(self.loop, coro1(self.loop)) - task2 = self.new_task(self.loop, coro2(self.loop)) - - self.loop.run_until_complete(asyncio.wait((task1, task2))) - self.assertIsNone(asyncio.current_task(loop=self.loop)) - - # Some thorough tests for cancellation propagation through - # coroutines, tasks and wait(). - - def test_yield_future_passes_cancel(self): - # Cancelling outer() cancels inner() cancels waiter. - proof = 0 - waiter = self.new_future(self.loop) - - async def inner(): - nonlocal proof - try: - await waiter - except asyncio.CancelledError: - proof += 1 - raise - else: - self.fail('got past sleep() in inner()') - - async def outer(): - nonlocal proof - try: - await inner() - except asyncio.CancelledError: - proof += 100 # Expect this path. - else: - proof += 10 - - f = asyncio.ensure_future(outer(), loop=self.loop) - test_utils.run_briefly(self.loop) - f.cancel() - self.loop.run_until_complete(f) - self.assertEqual(proof, 101) - self.assertTrue(waiter.cancelled()) - - def test_yield_wait_does_not_shield_cancel(self): - # Cancelling outer() makes wait() return early, leaves inner() - # running. - proof = 0 - waiter = self.new_future(self.loop) - - async def inner(): - nonlocal proof - await waiter - proof += 1 - - async def outer(): - nonlocal proof - d, p = await asyncio.wait([inner()]) - proof += 100 - - f = asyncio.ensure_future(outer(), loop=self.loop) - test_utils.run_briefly(self.loop) - f.cancel() - self.assertRaises( - asyncio.CancelledError, self.loop.run_until_complete, f) - waiter.set_result(None) - test_utils.run_briefly(self.loop) - self.assertEqual(proof, 1) - - def test_shield_result(self): - inner = self.new_future(self.loop) - outer = asyncio.shield(inner) - inner.set_result(42) - res = self.loop.run_until_complete(outer) - self.assertEqual(res, 42) - - def test_shield_exception(self): - inner = self.new_future(self.loop) - outer = asyncio.shield(inner) - test_utils.run_briefly(self.loop) - exc = RuntimeError('expected') - inner.set_exception(exc) - test_utils.run_briefly(self.loop) - self.assertIs(outer.exception(), exc) - - def test_shield_cancel_inner(self): - inner = self.new_future(self.loop) - outer = asyncio.shield(inner) - test_utils.run_briefly(self.loop) - inner.cancel() - test_utils.run_briefly(self.loop) - self.assertTrue(outer.cancelled()) - - def test_shield_cancel_outer(self): - inner = self.new_future(self.loop) - outer = asyncio.shield(inner) - test_utils.run_briefly(self.loop) - outer.cancel() - test_utils.run_briefly(self.loop) - self.assertTrue(outer.cancelled()) - self.assertEqual(0, 0 if outer._callbacks is None else len(outer._callbacks)) - - def test_shield_shortcut(self): - fut = self.new_future(self.loop) - fut.set_result(42) - res = self.loop.run_until_complete(asyncio.shield(fut)) - self.assertEqual(res, 42) - - def test_shield_effect(self): - # Cancelling outer() does not affect inner(). - proof = 0 - waiter = self.new_future(self.loop) - - async def inner(): - nonlocal proof - await waiter - proof += 1 - - async def outer(): - nonlocal proof - await asyncio.shield(inner()) - proof += 100 - - f = asyncio.ensure_future(outer(), loop=self.loop) - test_utils.run_briefly(self.loop) - f.cancel() - with self.assertRaises(asyncio.CancelledError): - self.loop.run_until_complete(f) - waiter.set_result(None) - test_utils.run_briefly(self.loop) - self.assertEqual(proof, 1) - - def test_shield_gather(self): - child1 = self.new_future(self.loop) - child2 = self.new_future(self.loop) - parent = asyncio.gather(child1, child2) - outer = asyncio.shield(parent) - test_utils.run_briefly(self.loop) - outer.cancel() - test_utils.run_briefly(self.loop) - self.assertTrue(outer.cancelled()) - child1.set_result(1) - child2.set_result(2) - test_utils.run_briefly(self.loop) - self.assertEqual(parent.result(), [1, 2]) - - def test_gather_shield(self): - child1 = self.new_future(self.loop) - child2 = self.new_future(self.loop) - inner1 = asyncio.shield(child1) - inner2 = asyncio.shield(child2) - parent = asyncio.gather(inner1, inner2) - test_utils.run_briefly(self.loop) - parent.cancel() - # This should cancel inner1 and inner2 but bot child1 and child2. - test_utils.run_briefly(self.loop) - self.assertIsInstance(parent.exception(), asyncio.CancelledError) - self.assertTrue(inner1.cancelled()) - self.assertTrue(inner2.cancelled()) - child1.set_result(1) - child2.set_result(2) - test_utils.run_briefly(self.loop) - - def test_as_completed_invalid_args(self): - fut = self.new_future(self.loop) - - # as_completed() expects a list of futures, not a future instance - self.assertRaises(TypeError, self.loop.run_until_complete, - asyncio.as_completed(fut, loop=self.loop)) - coro = coroutine_function() - self.assertRaises(TypeError, self.loop.run_until_complete, - asyncio.as_completed(coro, loop=self.loop)) - coro.close() - - def test_wait_invalid_args(self): - fut = self.new_future(self.loop) - - # wait() expects a list of futures, not a future instance - self.assertRaises(TypeError, self.loop.run_until_complete, - asyncio.wait(fut)) - coro = coroutine_function() - self.assertRaises(TypeError, self.loop.run_until_complete, - asyncio.wait(coro)) - coro.close() - - # wait() expects at least a future - self.assertRaises(ValueError, self.loop.run_until_complete, - asyncio.wait([])) - - def test_corowrapper_mocks_generator(self): - - def check(): - # A function that asserts various things. - # Called twice, with different debug flag values. - - with self.assertWarns(DeprecationWarning): - @asyncio.coroutine - def coro(): - # The actual coroutine. - self.assertTrue(gen.gi_running) - yield from fut - - # A completed Future used to run the coroutine. - fut = self.new_future(self.loop) - fut.set_result(None) - - # Call the coroutine. - gen = coro() - - # Check some properties. - self.assertTrue(asyncio.iscoroutine(gen)) - self.assertIsInstance(gen.gi_frame, types.FrameType) - self.assertFalse(gen.gi_running) - self.assertIsInstance(gen.gi_code, types.CodeType) - - # Run it. - self.loop.run_until_complete(gen) - - # The frame should have changed. - self.assertIsNone(gen.gi_frame) - - # Test with debug flag cleared. - with set_coroutine_debug(False): - check() - - # Test with debug flag set. - with set_coroutine_debug(True): - check() - - def test_yield_from_corowrapper(self): - with set_coroutine_debug(True): - with self.assertWarns(DeprecationWarning): - @asyncio.coroutine - def t1(): - return (yield from t2()) - - with self.assertWarns(DeprecationWarning): - @asyncio.coroutine - def t2(): - f = self.new_future(self.loop) - self.new_task(self.loop, t3(f)) - return (yield from f) - - with self.assertWarns(DeprecationWarning): - @asyncio.coroutine - def t3(f): - f.set_result((1, 2, 3)) - - task = self.new_task(self.loop, t1()) - val = self.loop.run_until_complete(task) - self.assertEqual(val, (1, 2, 3)) - - def test_yield_from_corowrapper_send(self): - def foo(): - a = yield - return a - - def call(arg): - cw = asyncio.coroutines.CoroWrapper(foo()) - cw.send(None) - try: - cw.send(arg) - except StopIteration as ex: - return ex.args[0] - else: - raise AssertionError('StopIteration was expected') - - self.assertEqual(call((1, 2)), (1, 2)) - self.assertEqual(call('spam'), 'spam') - - def test_corowrapper_weakref(self): - wd = weakref.WeakValueDictionary() - def foo(): yield from [] - cw = asyncio.coroutines.CoroWrapper(foo()) - wd['cw'] = cw # Would fail without __weakref__ slot. - cw.gen = None # Suppress warning from __del__. - - def test_corowrapper_throw(self): - # Issue 429: CoroWrapper.throw must be compatible with gen.throw - def foo(): - value = None - while True: - try: - value = yield value - except Exception as e: - value = e - - exception = Exception("foo") - cw = asyncio.coroutines.CoroWrapper(foo()) - cw.send(None) - self.assertIs(exception, cw.throw(exception)) - - cw = asyncio.coroutines.CoroWrapper(foo()) - cw.send(None) - self.assertIs(exception, cw.throw(Exception, exception)) - - cw = asyncio.coroutines.CoroWrapper(foo()) - cw.send(None) - exception = cw.throw(Exception, "foo") - self.assertIsInstance(exception, Exception) - self.assertEqual(exception.args, ("foo", )) - - cw = asyncio.coroutines.CoroWrapper(foo()) - cw.send(None) - exception = cw.throw(Exception, "foo", None) - self.assertIsInstance(exception, Exception) - self.assertEqual(exception.args, ("foo", )) - - def test_all_tasks_deprecated(self): - Task = self.__class__.Task - - async def coro(): - with self.assertWarns(DeprecationWarning): - assert Task.all_tasks(self.loop) == {t} - - t = self.new_task(self.loop, coro()) - self.loop.run_until_complete(t) - - def test_log_destroyed_pending_task(self): - Task = self.__class__.Task - - with self.assertWarns(DeprecationWarning): - @asyncio.coroutine - def kill_me(loop): - future = self.new_future(loop) - yield from future - # at this point, the only reference to kill_me() task is - # the Task._wakeup() method in future._callbacks - raise Exception("code never reached") - - mock_handler = mock.Mock() - self.loop.set_debug(True) - self.loop.set_exception_handler(mock_handler) - - # schedule the task - coro = kill_me(self.loop) - task = asyncio.ensure_future(coro, loop=self.loop) - - self.assertEqual(asyncio.all_tasks(loop=self.loop), {task}) - - # See http://bugs.python.org/issue29271 for details: - asyncio.set_event_loop(self.loop) - try: - with self.assertWarns(DeprecationWarning): - self.assertEqual(Task.all_tasks(), {task}) - with self.assertWarns(DeprecationWarning): - self.assertEqual(Task.all_tasks(None), {task}) - finally: - asyncio.set_event_loop(None) - - # execute the task so it waits for future - self.loop._run_once() - self.assertEqual(len(self.loop._ready), 0) - - # remove the future used in kill_me(), and references to the task - del coro.gi_frame.f_locals['future'] - coro = None - source_traceback = task._source_traceback - task = None - - # no more reference to kill_me() task: the task is destroyed by the GC - support.gc_collect() - - self.assertEqual(asyncio.all_tasks(loop=self.loop), set()) - - mock_handler.assert_called_with(self.loop, { - 'message': 'Task was destroyed but it is pending!', - 'task': mock.ANY, - 'source_traceback': source_traceback, - }) - mock_handler.reset_mock() - - @mock.patch('asyncio.base_events.logger') - def test_tb_logger_not_called_after_cancel(self, m_log): - loop = asyncio.new_event_loop() - self.set_event_loop(loop) - - async def coro(): - raise TypeError - - async def runner(): - task = self.new_task(loop, coro()) - await asyncio.sleep(0.05) - task.cancel() - task = None - - loop.run_until_complete(runner()) - self.assertFalse(m_log.error.called) - - @mock.patch('asyncio.coroutines.logger') - def test_coroutine_never_yielded(self, m_log): - with set_coroutine_debug(True): - with self.assertWarns(DeprecationWarning): - @asyncio.coroutine - def coro_noop(): - pass - - tb_filename = __file__ - tb_lineno = sys._getframe().f_lineno + 2 - # create a coroutine object but don't use it - coro_noop() - support.gc_collect() - - self.assertTrue(m_log.error.called) - message = m_log.error.call_args[0][0] - func_filename, func_lineno = test_utils.get_function_source(coro_noop) - - regex = (r'^<CoroWrapper %s\(?\)? .* at %s:%s, .*> ' - r'was never yielded from\n' - r'Coroutine object created at \(most recent call last, truncated to \d+ last lines\):\n' - r'.*\n' - r' File "%s", line %s, in test_coroutine_never_yielded\n' - r' coro_noop\(\)$' - % (re.escape(coro_noop.__qualname__), - re.escape(func_filename), func_lineno, - re.escape(tb_filename), tb_lineno)) - - self.assertRegex(message, re.compile(regex, re.DOTALL)) - - def test_return_coroutine_from_coroutine(self): - """Return of @asyncio.coroutine()-wrapped function generator object - from @asyncio.coroutine()-wrapped function should have same effect as - returning generator object or Future.""" - def check(): - with self.assertWarns(DeprecationWarning): - @asyncio.coroutine - def outer_coro(): - with self.assertWarns(DeprecationWarning): - @asyncio.coroutine - def inner_coro(): - return 1 - - return inner_coro() - - result = self.loop.run_until_complete(outer_coro()) - self.assertEqual(result, 1) - - # Test with debug flag cleared. - with set_coroutine_debug(False): - check() - - # Test with debug flag set. - with set_coroutine_debug(True): - check() - - def test_task_source_traceback(self): - self.loop.set_debug(True) - - task = self.new_task(self.loop, coroutine_function()) - lineno = sys._getframe().f_lineno - 1 - self.assertIsInstance(task._source_traceback, list) - self.assertEqual(task._source_traceback[-2][:3], - (__file__, - lineno, - 'test_task_source_traceback')) - self.loop.run_until_complete(task) - - def _test_cancel_wait_for(self, timeout): - loop = asyncio.new_event_loop() - self.addCleanup(loop.close) - - async def blocking_coroutine(): - fut = self.new_future(loop) - # Block: fut result is never set - await fut - - task = loop.create_task(blocking_coroutine()) - - wait = loop.create_task(asyncio.wait_for(task, timeout)) - loop.call_soon(wait.cancel) - - self.assertRaises(asyncio.CancelledError, - loop.run_until_complete, wait) - - # Python issue #23219: cancelling the wait must also cancel the task - self.assertTrue(task.cancelled()) - - def test_cancel_blocking_wait_for(self): - self._test_cancel_wait_for(None) - - def test_cancel_wait_for(self): - self._test_cancel_wait_for(60.0) - - def test_cancel_gather_1(self): - """Ensure that a gathering future refuses to be cancelled once all - children are done""" - loop = asyncio.new_event_loop() - self.addCleanup(loop.close) - - fut = self.new_future(loop) - # The indirection fut->child_coro is needed since otherwise the - # gathering task is done at the same time as the child future - def child_coro(): - return (yield from fut) - gather_future = asyncio.gather(child_coro(), loop=loop) - gather_task = asyncio.ensure_future(gather_future, loop=loop) - - cancel_result = None - def cancelling_callback(_): - nonlocal cancel_result - cancel_result = gather_task.cancel() - fut.add_done_callback(cancelling_callback) - - fut.set_result(42) # calls the cancelling_callback after fut is done() - - # At this point the task should complete. - loop.run_until_complete(gather_task) - - # Python issue #26923: asyncio.gather drops cancellation - self.assertEqual(cancel_result, False) - self.assertFalse(gather_task.cancelled()) - self.assertEqual(gather_task.result(), [42]) - - def test_cancel_gather_2(self): - loop = asyncio.new_event_loop() - self.addCleanup(loop.close) - - async def test(): - time = 0 - while True: - time += 0.05 - await asyncio.gather(asyncio.sleep(0.05), - return_exceptions=True, - loop=loop) - if time > 1: - return - - async def main(): - qwe = self.new_task(loop, test()) - await asyncio.sleep(0.2) - qwe.cancel() - try: - await qwe - except asyncio.CancelledError: - pass - else: - self.fail('gather did not propagate the cancellation request') - - loop.run_until_complete(main()) - - def test_exception_traceback(self): - # See http://bugs.python.org/issue28843 - - async def foo(): - 1 / 0 - - async def main(): - task = self.new_task(self.loop, foo()) - await asyncio.sleep(0) # skip one loop iteration - self.assertIsNotNone(task.exception().__traceback__) - - self.loop.run_until_complete(main()) - - @mock.patch('asyncio.base_events.logger') - def test_error_in_call_soon(self, m_log): - def call_soon(callback, *args, **kwargs): - raise ValueError - self.loop.call_soon = call_soon - - with self.assertWarns(DeprecationWarning): - @asyncio.coroutine - def coro(): - pass - - self.assertFalse(m_log.error.called) - - with self.assertRaises(ValueError): - gen = coro() - try: - self.new_task(self.loop, gen) - finally: - gen.close() - - self.assertTrue(m_log.error.called) - message = m_log.error.call_args[0][0] - self.assertIn('Task was destroyed but it is pending', message) - - self.assertEqual(asyncio.all_tasks(self.loop), set()) - - def test_create_task_with_noncoroutine(self): - with self.assertRaisesRegex(TypeError, - "a coroutine was expected, got 123"): - self.new_task(self.loop, 123) - - # test it for the second time to ensure that caching - # in asyncio.iscoroutine() doesn't break things. - with self.assertRaisesRegex(TypeError, - "a coroutine was expected, got 123"): - self.new_task(self.loop, 123) - - def test_create_task_with_oldstyle_coroutine(self): - - with self.assertWarns(DeprecationWarning): - @asyncio.coroutine - def coro(): - pass - - task = self.new_task(self.loop, coro()) - self.assertIsInstance(task, self.Task) - self.loop.run_until_complete(task) - - # test it for the second time to ensure that caching - # in asyncio.iscoroutine() doesn't break things. - task = self.new_task(self.loop, coro()) - self.assertIsInstance(task, self.Task) - self.loop.run_until_complete(task) - - def test_create_task_with_async_function(self): - - async def coro(): - pass - - task = self.new_task(self.loop, coro()) - self.assertIsInstance(task, self.Task) - self.loop.run_until_complete(task) - - # test it for the second time to ensure that caching - # in asyncio.iscoroutine() doesn't break things. - task = self.new_task(self.loop, coro()) - self.assertIsInstance(task, self.Task) - self.loop.run_until_complete(task) - - def test_create_task_with_asynclike_function(self): - task = self.new_task(self.loop, CoroLikeObject()) - self.assertIsInstance(task, self.Task) - self.assertEqual(self.loop.run_until_complete(task), 42) - - # test it for the second time to ensure that caching - # in asyncio.iscoroutine() doesn't break things. - task = self.new_task(self.loop, CoroLikeObject()) - self.assertIsInstance(task, self.Task) - self.assertEqual(self.loop.run_until_complete(task), 42) - - def test_bare_create_task(self): - - async def inner(): - return 1 - - async def coro(): - task = asyncio.create_task(inner()) - self.assertIsInstance(task, self.Task) - ret = await task - self.assertEqual(1, ret) - - self.loop.run_until_complete(coro()) - - def test_bare_create_named_task(self): - - async def coro_noop(): - pass - - async def coro(): - task = asyncio.create_task(coro_noop(), name='No-op') - self.assertEqual(task.get_name(), 'No-op') - await task - - self.loop.run_until_complete(coro()) - - def test_context_1(self): - cvar = contextvars.ContextVar('cvar', default='nope') - - async def sub(): - await asyncio.sleep(0.01) - self.assertEqual(cvar.get(), 'nope') - cvar.set('something else') - - async def main(): - self.assertEqual(cvar.get(), 'nope') - subtask = self.new_task(loop, sub()) - cvar.set('yes') - self.assertEqual(cvar.get(), 'yes') - await subtask - self.assertEqual(cvar.get(), 'yes') - - loop = asyncio.new_event_loop() - try: - task = self.new_task(loop, main()) - loop.run_until_complete(task) - finally: - loop.close() - - def test_context_2(self): - cvar = contextvars.ContextVar('cvar', default='nope') - - async def main(): - def fut_on_done(fut): - # This change must not pollute the context - # of the "main()" task. - cvar.set('something else') - - self.assertEqual(cvar.get(), 'nope') - - for j in range(2): - fut = self.new_future(loop) - fut.add_done_callback(fut_on_done) - cvar.set(f'yes{j}') - loop.call_soon(fut.set_result, None) - await fut - self.assertEqual(cvar.get(), f'yes{j}') - - for i in range(3): - # Test that task passed its context to add_done_callback: - cvar.set(f'yes{i}-{j}') - await asyncio.sleep(0.001) - self.assertEqual(cvar.get(), f'yes{i}-{j}') - - loop = asyncio.new_event_loop() - try: - task = self.new_task(loop, main()) - loop.run_until_complete(task) - finally: - loop.close() - - self.assertEqual(cvar.get(), 'nope') - - def test_context_3(self): - # Run 100 Tasks in parallel, each modifying cvar. - - cvar = contextvars.ContextVar('cvar', default=-1) - - async def sub(num): - for i in range(10): - cvar.set(num + i) - await asyncio.sleep(random.uniform(0.001, 0.05)) - self.assertEqual(cvar.get(), num + i) - - async def main(): - tasks = [] - for i in range(100): - task = loop.create_task(sub(random.randint(0, 10))) - tasks.append(task) - - await asyncio.gather(*tasks, loop=loop) - - loop = asyncio.new_event_loop() - try: - loop.run_until_complete(main()) - finally: - loop.close() - - self.assertEqual(cvar.get(), -1) - - def test_get_coro(self): - loop = asyncio.new_event_loop() - coro = coroutine_function() - try: - task = self.new_task(loop, coro) - loop.run_until_complete(task) - self.assertIs(task.get_coro(), coro) - finally: - loop.close() - - -def add_subclass_tests(cls): - BaseTask = cls.Task - BaseFuture = cls.Future - - if BaseTask is None or BaseFuture is None: - return cls - - class CommonFuture: - def __init__(self, *args, **kwargs): - self.calls = collections.defaultdict(lambda: 0) - super().__init__(*args, **kwargs) - - def add_done_callback(self, *args, **kwargs): - self.calls['add_done_callback'] += 1 - return super().add_done_callback(*args, **kwargs) - - class Task(CommonFuture, BaseTask): - pass - - class Future(CommonFuture, BaseFuture): - pass - - def test_subclasses_ctask_cfuture(self): - fut = self.Future(loop=self.loop) - - async def func(): - self.loop.call_soon(lambda: fut.set_result('spam')) - return await fut - - task = self.Task(func(), loop=self.loop) - - result = self.loop.run_until_complete(task) - - self.assertEqual(result, 'spam') - - self.assertEqual( - dict(task.calls), - {'add_done_callback': 1}) - - self.assertEqual( - dict(fut.calls), - {'add_done_callback': 1}) - - # Add patched Task & Future back to the test case - cls.Task = Task - cls.Future = Future - - # Add an extra unit-test - cls.test_subclasses_ctask_cfuture = test_subclasses_ctask_cfuture - - # Disable the "test_task_source_traceback" test - # (the test is hardcoded for a particular call stack, which - # is slightly different for Task subclasses) - cls.test_task_source_traceback = None - - return cls - - -class SetMethodsTest: - - def test_set_result_causes_invalid_state(self): - Future = type(self).Future - self.loop.call_exception_handler = exc_handler = mock.Mock() - - async def foo(): - await asyncio.sleep(0.1) - return 10 - - coro = foo() - task = self.new_task(self.loop, coro) - Future.set_result(task, 'spam') - - self.assertEqual( - self.loop.run_until_complete(task), - 'spam') - - exc_handler.assert_called_once() - exc = exc_handler.call_args[0][0]['exception'] - with self.assertRaisesRegex(asyncio.InvalidStateError, - r'step\(\): already done'): - raise exc - - coro.close() - - def test_set_exception_causes_invalid_state(self): - class MyExc(Exception): - pass - - Future = type(self).Future - self.loop.call_exception_handler = exc_handler = mock.Mock() - - async def foo(): - await asyncio.sleep(0.1) - return 10 - - coro = foo() - task = self.new_task(self.loop, coro) - Future.set_exception(task, MyExc()) - - with self.assertRaises(MyExc): - self.loop.run_until_complete(task) - - exc_handler.assert_called_once() - exc = exc_handler.call_args[0][0]['exception'] - with self.assertRaisesRegex(asyncio.InvalidStateError, - r'step\(\): already done'): - raise exc - - coro.close() - - -@unittest.skipUnless(hasattr(futures, '_CFuture') and - hasattr(tasks, '_CTask'), - 'requires the C _asyncio module') -class CTask_CFuture_Tests(BaseTaskTests, SetMethodsTest, - test_utils.TestCase): - - Task = getattr(tasks, '_CTask', None) - Future = getattr(futures, '_CFuture', None) - - @support.refcount_test - def test_refleaks_in_task___init__(self): - gettotalrefcount = support.get_attribute(sys, 'gettotalrefcount') - async def coro(): - pass - task = self.new_task(self.loop, coro()) - self.loop.run_until_complete(task) - refs_before = gettotalrefcount() - for i in range(100): - task.__init__(coro(), loop=self.loop) - self.loop.run_until_complete(task) - self.assertAlmostEqual(gettotalrefcount() - refs_before, 0, delta=10) - - def test_del__log_destroy_pending_segfault(self): - async def coro(): - pass - task = self.new_task(self.loop, coro()) - self.loop.run_until_complete(task) - with self.assertRaises(AttributeError): - del task._log_destroy_pending - - -@unittest.skipUnless(hasattr(futures, '_CFuture') and - hasattr(tasks, '_CTask'), - 'requires the C _asyncio module') -@add_subclass_tests -class CTask_CFuture_SubclassTests(BaseTaskTests, test_utils.TestCase): - - Task = getattr(tasks, '_CTask', None) - Future = getattr(futures, '_CFuture', None) - - -@unittest.skipUnless(hasattr(tasks, '_CTask'), - 'requires the C _asyncio module') -@add_subclass_tests -class CTaskSubclass_PyFuture_Tests(BaseTaskTests, test_utils.TestCase): - - Task = getattr(tasks, '_CTask', None) - Future = futures._PyFuture - - -@unittest.skipUnless(hasattr(futures, '_CFuture'), - 'requires the C _asyncio module') -@add_subclass_tests -class PyTask_CFutureSubclass_Tests(BaseTaskTests, test_utils.TestCase): - - Future = getattr(futures, '_CFuture', None) - Task = tasks._PyTask - - -@unittest.skipUnless(hasattr(tasks, '_CTask'), - 'requires the C _asyncio module') -class CTask_PyFuture_Tests(BaseTaskTests, test_utils.TestCase): - - Task = getattr(tasks, '_CTask', None) - Future = futures._PyFuture - - -@unittest.skipUnless(hasattr(futures, '_CFuture'), - 'requires the C _asyncio module') -class PyTask_CFuture_Tests(BaseTaskTests, test_utils.TestCase): - - Task = tasks._PyTask - Future = getattr(futures, '_CFuture', None) - - -class PyTask_PyFuture_Tests(BaseTaskTests, SetMethodsTest, - test_utils.TestCase): - - Task = tasks._PyTask - Future = futures._PyFuture - - -@add_subclass_tests -class PyTask_PyFuture_SubclassTests(BaseTaskTests, test_utils.TestCase): - Task = tasks._PyTask - Future = futures._PyFuture - - -@unittest.skipUnless(hasattr(tasks, '_CTask'), - 'requires the C _asyncio module') -class CTask_Future_Tests(test_utils.TestCase): - - def test_foobar(self): - class Fut(asyncio.Future): - @property - def get_loop(self): - raise AttributeError - - async def coro(): - await fut - return 'spam' - - self.loop = asyncio.new_event_loop() - try: - fut = Fut(loop=self.loop) - self.loop.call_later(0.1, fut.set_result, 1) - task = self.loop.create_task(coro()) - res = self.loop.run_until_complete(task) - finally: - self.loop.close() - - self.assertEqual(res, 'spam') - - -class BaseTaskIntrospectionTests: - _register_task = None - _unregister_task = None - _enter_task = None - _leave_task = None - - def test__register_task_1(self): - class TaskLike: - @property - def _loop(self): - return loop - - def done(self): - return False - - task = TaskLike() - loop = mock.Mock() - - self.assertEqual(asyncio.all_tasks(loop), set()) - self._register_task(task) - self.assertEqual(asyncio.all_tasks(loop), {task}) - self._unregister_task(task) - - def test__register_task_2(self): - class TaskLike: - def get_loop(self): - return loop - - def done(self): - return False - - task = TaskLike() - loop = mock.Mock() - - self.assertEqual(asyncio.all_tasks(loop), set()) - self._register_task(task) - self.assertEqual(asyncio.all_tasks(loop), {task}) - self._unregister_task(task) - - def test__register_task_3(self): - class TaskLike: - def get_loop(self): - return loop - - def done(self): - return True - - task = TaskLike() - loop = mock.Mock() - - self.assertEqual(asyncio.all_tasks(loop), set()) - self._register_task(task) - self.assertEqual(asyncio.all_tasks(loop), set()) - with self.assertWarns(DeprecationWarning): - self.assertEqual(asyncio.Task.all_tasks(loop), {task}) - self._unregister_task(task) - - def test__enter_task(self): - task = mock.Mock() - loop = mock.Mock() - self.assertIsNone(asyncio.current_task(loop)) - self._enter_task(loop, task) - self.assertIs(asyncio.current_task(loop), task) - self._leave_task(loop, task) - - def test__enter_task_failure(self): - task1 = mock.Mock() - task2 = mock.Mock() - loop = mock.Mock() - self._enter_task(loop, task1) - with self.assertRaises(RuntimeError): - self._enter_task(loop, task2) - self.assertIs(asyncio.current_task(loop), task1) - self._leave_task(loop, task1) - - def test__leave_task(self): - task = mock.Mock() - loop = mock.Mock() - self._enter_task(loop, task) - self._leave_task(loop, task) - self.assertIsNone(asyncio.current_task(loop)) - - def test__leave_task_failure1(self): - task1 = mock.Mock() - task2 = mock.Mock() - loop = mock.Mock() - self._enter_task(loop, task1) - with self.assertRaises(RuntimeError): - self._leave_task(loop, task2) - self.assertIs(asyncio.current_task(loop), task1) - self._leave_task(loop, task1) - - def test__leave_task_failure2(self): - task = mock.Mock() - loop = mock.Mock() - with self.assertRaises(RuntimeError): - self._leave_task(loop, task) - self.assertIsNone(asyncio.current_task(loop)) - - def test__unregister_task(self): - task = mock.Mock() - loop = mock.Mock() - task.get_loop = lambda: loop - self._register_task(task) - self._unregister_task(task) - self.assertEqual(asyncio.all_tasks(loop), set()) - - def test__unregister_task_not_registered(self): - task = mock.Mock() - loop = mock.Mock() - self._unregister_task(task) - self.assertEqual(asyncio.all_tasks(loop), set()) - - -class PyIntrospectionTests(test_utils.TestCase, BaseTaskIntrospectionTests): - _register_task = staticmethod(tasks._py_register_task) - _unregister_task = staticmethod(tasks._py_unregister_task) - _enter_task = staticmethod(tasks._py_enter_task) - _leave_task = staticmethod(tasks._py_leave_task) - - -@unittest.skipUnless(hasattr(tasks, '_c_register_task'), - 'requires the C _asyncio module') -class CIntrospectionTests(test_utils.TestCase, BaseTaskIntrospectionTests): - if hasattr(tasks, '_c_register_task'): - _register_task = staticmethod(tasks._c_register_task) - _unregister_task = staticmethod(tasks._c_unregister_task) - _enter_task = staticmethod(tasks._c_enter_task) - _leave_task = staticmethod(tasks._c_leave_task) - else: - _register_task = _unregister_task = _enter_task = _leave_task = None - - -class BaseCurrentLoopTests: - - def setUp(self): - super().setUp() - self.loop = asyncio.new_event_loop() - self.set_event_loop(self.loop) - - def new_task(self, coro): - raise NotImplementedError - - def test_current_task_no_running_loop(self): - self.assertIsNone(asyncio.current_task(loop=self.loop)) - - def test_current_task_no_running_loop_implicit(self): - with self.assertRaises(RuntimeError): - asyncio.current_task() - - def test_current_task_with_implicit_loop(self): - async def coro(): - self.assertIs(asyncio.current_task(loop=self.loop), task) - - self.assertIs(asyncio.current_task(None), task) - self.assertIs(asyncio.current_task(), task) - - task = self.new_task(coro()) - self.loop.run_until_complete(task) - self.assertIsNone(asyncio.current_task(loop=self.loop)) - - -class PyCurrentLoopTests(BaseCurrentLoopTests, test_utils.TestCase): - - def new_task(self, coro): - return tasks._PyTask(coro, loop=self.loop) - - -@unittest.skipUnless(hasattr(tasks, '_CTask'), - 'requires the C _asyncio module') -class CCurrentLoopTests(BaseCurrentLoopTests, test_utils.TestCase): - - def new_task(self, coro): - return getattr(tasks, '_CTask')(coro, loop=self.loop) - - -class GenericTaskTests(test_utils.TestCase): - - def test_future_subclass(self): - self.assertTrue(issubclass(asyncio.Task, asyncio.Future)) - - def test_asyncio_module_compiled(self): - # Because of circular imports it's easy to make _asyncio - # module non-importable. This is a simple test that will - # fail on systems where C modules were successfully compiled - # (hence the test for _functools), but _asyncio somehow didn't. - try: - import _functools - except ImportError: - pass - else: - try: - import _asyncio - except ImportError: - self.fail('_asyncio module is missing') - - -class GatherTestsBase: - - def setUp(self): - super().setUp() - self.one_loop = self.new_test_loop() - self.other_loop = self.new_test_loop() - self.set_event_loop(self.one_loop, cleanup=False) - - def _run_loop(self, loop): - while loop._ready: - test_utils.run_briefly(loop) - - def _check_success(self, **kwargs): - a, b, c = [self.one_loop.create_future() for i in range(3)] - fut = asyncio.gather(*self.wrap_futures(a, b, c), **kwargs) - cb = test_utils.MockCallback() - fut.add_done_callback(cb) - b.set_result(1) - a.set_result(2) - self._run_loop(self.one_loop) - self.assertEqual(cb.called, False) - self.assertFalse(fut.done()) - c.set_result(3) - self._run_loop(self.one_loop) - cb.assert_called_once_with(fut) - self.assertEqual(fut.result(), [2, 1, 3]) - - def test_success(self): - self._check_success() - self._check_success(return_exceptions=False) - - def test_result_exception_success(self): - self._check_success(return_exceptions=True) - - def test_one_exception(self): - a, b, c, d, e = [self.one_loop.create_future() for i in range(5)] - fut = asyncio.gather(*self.wrap_futures(a, b, c, d, e)) - cb = test_utils.MockCallback() - fut.add_done_callback(cb) - exc = ZeroDivisionError() - a.set_result(1) - b.set_exception(exc) - self._run_loop(self.one_loop) - self.assertTrue(fut.done()) - cb.assert_called_once_with(fut) - self.assertIs(fut.exception(), exc) - # Does nothing - c.set_result(3) - d.cancel() - e.set_exception(RuntimeError()) - e.exception() - - def test_return_exceptions(self): - a, b, c, d = [self.one_loop.create_future() for i in range(4)] - fut = asyncio.gather(*self.wrap_futures(a, b, c, d), - return_exceptions=True) - cb = test_utils.MockCallback() - fut.add_done_callback(cb) - exc = ZeroDivisionError() - exc2 = RuntimeError() - b.set_result(1) - c.set_exception(exc) - a.set_result(3) - self._run_loop(self.one_loop) - self.assertFalse(fut.done()) - d.set_exception(exc2) - self._run_loop(self.one_loop) - self.assertTrue(fut.done()) - cb.assert_called_once_with(fut) - self.assertEqual(fut.result(), [3, 1, exc, exc2]) - - def test_env_var_debug(self): - code = '\n'.join(( - 'import asyncio.coroutines', - 'print(asyncio.coroutines._DEBUG)')) - - # Test with -E to not fail if the unit test was run with - # PYTHONASYNCIODEBUG set to a non-empty string - sts, stdout, stderr = assert_python_ok('-E', '-c', code) - self.assertEqual(stdout.rstrip(), b'False') - - sts, stdout, stderr = assert_python_ok('-c', code, - PYTHONASYNCIODEBUG='', - PYTHONDEVMODE='') - self.assertEqual(stdout.rstrip(), b'False') - - sts, stdout, stderr = assert_python_ok('-c', code, - PYTHONASYNCIODEBUG='1', - PYTHONDEVMODE='') - self.assertEqual(stdout.rstrip(), b'True') - - sts, stdout, stderr = assert_python_ok('-E', '-c', code, - PYTHONASYNCIODEBUG='1', - PYTHONDEVMODE='') - self.assertEqual(stdout.rstrip(), b'False') - - # -X dev - sts, stdout, stderr = assert_python_ok('-E', '-X', 'dev', - '-c', code) - self.assertEqual(stdout.rstrip(), b'True') - - -class FutureGatherTests(GatherTestsBase, test_utils.TestCase): - - def wrap_futures(self, *futures): - return futures - - def _check_empty_sequence(self, seq_or_iter): - asyncio.set_event_loop(self.one_loop) - self.addCleanup(asyncio.set_event_loop, None) - fut = asyncio.gather(*seq_or_iter) - self.assertIsInstance(fut, asyncio.Future) - self.assertIs(fut._loop, self.one_loop) - self._run_loop(self.one_loop) - self.assertTrue(fut.done()) - self.assertEqual(fut.result(), []) - with self.assertWarns(DeprecationWarning): - fut = asyncio.gather(*seq_or_iter, loop=self.other_loop) - self.assertIs(fut._loop, self.other_loop) - - def test_constructor_empty_sequence(self): - self._check_empty_sequence([]) - self._check_empty_sequence(()) - self._check_empty_sequence(set()) - self._check_empty_sequence(iter("")) - - def test_constructor_heterogenous_futures(self): - fut1 = self.one_loop.create_future() - fut2 = self.other_loop.create_future() - with self.assertRaises(ValueError): - asyncio.gather(fut1, fut2) - with self.assertRaises(ValueError): - asyncio.gather(fut1, loop=self.other_loop) - - def test_constructor_homogenous_futures(self): - children = [self.other_loop.create_future() for i in range(3)] - fut = asyncio.gather(*children) - self.assertIs(fut._loop, self.other_loop) - self._run_loop(self.other_loop) - self.assertFalse(fut.done()) - fut = asyncio.gather(*children, loop=self.other_loop) - self.assertIs(fut._loop, self.other_loop) - self._run_loop(self.other_loop) - self.assertFalse(fut.done()) - - def test_one_cancellation(self): - a, b, c, d, e = [self.one_loop.create_future() for i in range(5)] - fut = asyncio.gather(a, b, c, d, e) - cb = test_utils.MockCallback() - fut.add_done_callback(cb) - a.set_result(1) - b.cancel() - self._run_loop(self.one_loop) - self.assertTrue(fut.done()) - cb.assert_called_once_with(fut) - self.assertFalse(fut.cancelled()) - self.assertIsInstance(fut.exception(), asyncio.CancelledError) - # Does nothing - c.set_result(3) - d.cancel() - e.set_exception(RuntimeError()) - e.exception() - - def test_result_exception_one_cancellation(self): - a, b, c, d, e, f = [self.one_loop.create_future() - for i in range(6)] - fut = asyncio.gather(a, b, c, d, e, f, return_exceptions=True) - cb = test_utils.MockCallback() - fut.add_done_callback(cb) - a.set_result(1) - zde = ZeroDivisionError() - b.set_exception(zde) - c.cancel() - self._run_loop(self.one_loop) - self.assertFalse(fut.done()) - d.set_result(3) - e.cancel() - rte = RuntimeError() - f.set_exception(rte) - res = self.one_loop.run_until_complete(fut) - self.assertIsInstance(res[2], asyncio.CancelledError) - self.assertIsInstance(res[4], asyncio.CancelledError) - res[2] = res[4] = None - self.assertEqual(res, [1, zde, None, 3, None, rte]) - cb.assert_called_once_with(fut) - - -class CoroutineGatherTests(GatherTestsBase, test_utils.TestCase): - - def setUp(self): - super().setUp() - asyncio.set_event_loop(self.one_loop) - - def wrap_futures(self, *futures): - coros = [] - for fut in futures: - async def coro(fut=fut): - return await fut - coros.append(coro()) - return coros - - def test_constructor_loop_selection(self): - async def coro(): - return 'abc' - gen1 = coro() - gen2 = coro() - fut = asyncio.gather(gen1, gen2) - self.assertIs(fut._loop, self.one_loop) - self.one_loop.run_until_complete(fut) - - self.set_event_loop(self.other_loop, cleanup=False) - gen3 = coro() - gen4 = coro() - fut2 = asyncio.gather(gen3, gen4, loop=self.other_loop) - self.assertIs(fut2._loop, self.other_loop) - self.other_loop.run_until_complete(fut2) - - def test_duplicate_coroutines(self): - with self.assertWarns(DeprecationWarning): - @asyncio.coroutine - def coro(s): - return s - c = coro('abc') - fut = asyncio.gather(c, c, coro('def'), c, loop=self.one_loop) - self._run_loop(self.one_loop) - self.assertEqual(fut.result(), ['abc', 'abc', 'def', 'abc']) - - def test_cancellation_broadcast(self): - # Cancelling outer() cancels all children. - proof = 0 - waiter = self.one_loop.create_future() - - async def inner(): - nonlocal proof - await waiter - proof += 1 - - child1 = asyncio.ensure_future(inner(), loop=self.one_loop) - child2 = asyncio.ensure_future(inner(), loop=self.one_loop) - gatherer = None - - async def outer(): - nonlocal proof, gatherer - gatherer = asyncio.gather(child1, child2, loop=self.one_loop) - await gatherer - proof += 100 - - f = asyncio.ensure_future(outer(), loop=self.one_loop) - test_utils.run_briefly(self.one_loop) - self.assertTrue(f.cancel()) - with self.assertRaises(asyncio.CancelledError): - self.one_loop.run_until_complete(f) - self.assertFalse(gatherer.cancel()) - self.assertTrue(waiter.cancelled()) - self.assertTrue(child1.cancelled()) - self.assertTrue(child2.cancelled()) - test_utils.run_briefly(self.one_loop) - self.assertEqual(proof, 0) - - def test_exception_marking(self): - # Test for the first line marked "Mark exception retrieved." - - async def inner(f): - await f - raise RuntimeError('should not be ignored') - - a = self.one_loop.create_future() - b = self.one_loop.create_future() - - async def outer(): - await asyncio.gather(inner(a), inner(b), loop=self.one_loop) - - f = asyncio.ensure_future(outer(), loop=self.one_loop) - test_utils.run_briefly(self.one_loop) - a.set_result(None) - test_utils.run_briefly(self.one_loop) - b.set_result(None) - test_utils.run_briefly(self.one_loop) - self.assertIsInstance(f.exception(), RuntimeError) - - -class RunCoroutineThreadsafeTests(test_utils.TestCase): - """Test case for asyncio.run_coroutine_threadsafe.""" - - def setUp(self): - super().setUp() - self.loop = asyncio.new_event_loop() - self.set_event_loop(self.loop) # Will cleanup properly - - async def add(self, a, b, fail=False, cancel=False): - """Wait 0.05 second and return a + b.""" - await asyncio.sleep(0.05) - if fail: - raise RuntimeError("Fail!") - if cancel: - asyncio.current_task(self.loop).cancel() - await asyncio.sleep(0) - return a + b - - def target(self, fail=False, cancel=False, timeout=None, - advance_coro=False): - """Run add coroutine in the event loop.""" - coro = self.add(1, 2, fail=fail, cancel=cancel) - future = asyncio.run_coroutine_threadsafe(coro, self.loop) - if advance_coro: - # this is for test_run_coroutine_threadsafe_task_factory_exception; - # otherwise it spills errors and breaks **other** unittests, since - # 'target' is interacting with threads. - - # With this call, `coro` will be advanced, so that - # CoroWrapper.__del__ won't do anything when asyncio tests run - # in debug mode. - self.loop.call_soon_threadsafe(coro.send, None) - try: - return future.result(timeout) - finally: - future.done() or future.cancel() - - def test_run_coroutine_threadsafe(self): - """Test coroutine submission from a thread to an event loop.""" - future = self.loop.run_in_executor(None, self.target) - result = self.loop.run_until_complete(future) - self.assertEqual(result, 3) - - def test_run_coroutine_threadsafe_with_exception(self): - """Test coroutine submission from a thread to an event loop - when an exception is raised.""" - future = self.loop.run_in_executor(None, self.target, True) - with self.assertRaises(RuntimeError) as exc_context: - self.loop.run_until_complete(future) - self.assertIn("Fail!", exc_context.exception.args) - - def test_run_coroutine_threadsafe_with_timeout(self): - """Test coroutine submission from a thread to an event loop - when a timeout is raised.""" - callback = lambda: self.target(timeout=0) - future = self.loop.run_in_executor(None, callback) - with self.assertRaises(asyncio.TimeoutError): - self.loop.run_until_complete(future) - test_utils.run_briefly(self.loop) - # Check that there's no pending task (add has been cancelled) - for task in asyncio.all_tasks(self.loop): - self.assertTrue(task.done()) - - def test_run_coroutine_threadsafe_task_cancelled(self): - """Test coroutine submission from a tread to an event loop - when the task is cancelled.""" - callback = lambda: self.target(cancel=True) - future = self.loop.run_in_executor(None, callback) - with self.assertRaises(asyncio.CancelledError): - self.loop.run_until_complete(future) - - def test_run_coroutine_threadsafe_task_factory_exception(self): - """Test coroutine submission from a tread to an event loop - when the task factory raise an exception.""" - - def task_factory(loop, coro): - raise NameError - - run = self.loop.run_in_executor( - None, lambda: self.target(advance_coro=True)) - - # Set exception handler - callback = test_utils.MockCallback() - self.loop.set_exception_handler(callback) - - # Set corrupted task factory - self.addCleanup(self.loop.set_task_factory, - self.loop.get_task_factory()) - self.loop.set_task_factory(task_factory) - - # Run event loop - with self.assertRaises(NameError) as exc_context: - self.loop.run_until_complete(run) - - # Check exceptions - self.assertEqual(len(callback.call_args_list), 1) - (loop, context), kwargs = callback.call_args - self.assertEqual(context['exception'], exc_context.exception) - - -class SleepTests(test_utils.TestCase): - def setUp(self): - super().setUp() - self.loop = asyncio.new_event_loop() - self.set_event_loop(self.loop) - - def tearDown(self): - self.loop.close() - self.loop = None - super().tearDown() - - def test_sleep_zero(self): - result = 0 - - def inc_result(num): - nonlocal result - result += num - - async def coro(): - self.loop.call_soon(inc_result, 1) - self.assertEqual(result, 0) - num = await asyncio.sleep(0, result=10) - self.assertEqual(result, 1) # inc'ed by call_soon - inc_result(num) # num should be 11 - - self.loop.run_until_complete(coro()) - self.assertEqual(result, 11) - - def test_loop_argument_is_deprecated(self): - # Remove test when loop argument is removed in Python 3.10 - with self.assertWarns(DeprecationWarning): - self.loop.run_until_complete(asyncio.sleep(0.01, loop=self.loop)) - - -class WaitTests(test_utils.TestCase): - def setUp(self): - super().setUp() - self.loop = asyncio.new_event_loop() - self.set_event_loop(self.loop) - - def tearDown(self): - self.loop.close() - self.loop = None - super().tearDown() - - def test_loop_argument_is_deprecated_in_wait(self): - # Remove test when loop argument is removed in Python 3.10 - with self.assertWarns(DeprecationWarning): - self.loop.run_until_complete( - asyncio.wait([coroutine_function()], loop=self.loop)) - - def test_loop_argument_is_deprecated_in_wait_for(self): - # Remove test when loop argument is removed in Python 3.10 - with self.assertWarns(DeprecationWarning): - self.loop.run_until_complete( - asyncio.wait_for(coroutine_function(), 0.01, loop=self.loop)) - - -class CompatibilityTests(test_utils.TestCase): - # Tests for checking a bridge between old-styled coroutines - # and async/await syntax - - def setUp(self): - super().setUp() - self.loop = asyncio.new_event_loop() - self.set_event_loop(self.loop) - - def tearDown(self): - self.loop.close() - self.loop = None - super().tearDown() - - def test_yield_from_awaitable(self): - - with self.assertWarns(DeprecationWarning): - @asyncio.coroutine - def coro(): - yield from asyncio.sleep(0) - return 'ok' - - result = self.loop.run_until_complete(coro()) - self.assertEqual('ok', result) - - def test_await_old_style_coro(self): - - with self.assertWarns(DeprecationWarning): - @asyncio.coroutine - def coro1(): - return 'ok1' - - with self.assertWarns(DeprecationWarning): - @asyncio.coroutine - def coro2(): - yield from asyncio.sleep(0) - return 'ok2' - - async def inner(): - return await asyncio.gather(coro1(), coro2(), loop=self.loop) - - result = self.loop.run_until_complete(inner()) - self.assertEqual(['ok1', 'ok2'], result) - - def test_debug_mode_interop(self): - # https://bugs.python.org/issue32636 - code = textwrap.dedent(""" - import asyncio - - async def native_coro(): - pass - - @asyncio.coroutine - def old_style_coro(): - yield from native_coro() - - asyncio.run(old_style_coro()) - """) - - assert_python_ok("-Wignore::DeprecationWarning", "-c", code, - PYTHONASYNCIODEBUG="1") - - -if __name__ == '__main__': - unittest.main() |