summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_asyncio/test_tasks.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_asyncio/test_tasks.py')
-rw-r--r--Lib/test/test_asyncio/test_tasks.py3375
1 files changed, 0 insertions, 3375 deletions
diff --git a/Lib/test/test_asyncio/test_tasks.py b/Lib/test/test_asyncio/test_tasks.py
deleted file mode 100644
index dde84b8..0000000
--- a/Lib/test/test_asyncio/test_tasks.py
+++ /dev/null
@@ -1,3375 +0,0 @@
-"""Tests for tasks.py."""
-
-import collections
-import contextlib
-import contextvars
-import functools
-import gc
-import io
-import random
-import re
-import sys
-import textwrap
-import types
-import unittest
-import weakref
-from unittest import mock
-
-import asyncio
-from asyncio import coroutines
-from asyncio import futures
-from asyncio import tasks
-from test.test_asyncio import utils as test_utils
-from test import support
-from test.support.script_helper import assert_python_ok
-
-
-def tearDownModule():
- asyncio.set_event_loop_policy(None)
-
-
-async def coroutine_function():
- pass
-
-
-@contextlib.contextmanager
-def set_coroutine_debug(enabled):
- coroutines = asyncio.coroutines
-
- old_debug = coroutines._DEBUG
- try:
- coroutines._DEBUG = enabled
- yield
- finally:
- coroutines._DEBUG = old_debug
-
-
-def format_coroutine(qualname, state, src, source_traceback, generator=False):
- if generator:
- state = '%s' % state
- else:
- state = '%s, defined' % state
- if source_traceback is not None:
- frame = source_traceback[-1]
- return ('coro=<%s() %s at %s> created at %s:%s'
- % (qualname, state, src, frame[0], frame[1]))
- else:
- return 'coro=<%s() %s at %s>' % (qualname, state, src)
-
-
-class Dummy:
-
- def __repr__(self):
- return '<Dummy>'
-
- def __call__(self, *args):
- pass
-
-
-class CoroLikeObject:
- def send(self, v):
- raise StopIteration(42)
-
- def throw(self, *exc):
- pass
-
- def close(self):
- pass
-
- def __await__(self):
- return self
-
-
-class BaseTaskTests:
-
- Task = None
- Future = None
-
- def new_task(self, loop, coro, name='TestTask'):
- return self.__class__.Task(coro, loop=loop, name=name)
-
- def new_future(self, loop):
- return self.__class__.Future(loop=loop)
-
- def setUp(self):
- super().setUp()
- self.loop = self.new_test_loop()
- self.loop.set_task_factory(self.new_task)
- self.loop.create_future = lambda: self.new_future(self.loop)
-
- def test_task_del_collect(self):
- class Evil:
- def __del__(self):
- gc.collect()
-
- async def run():
- return Evil()
-
- self.loop.run_until_complete(
- asyncio.gather(*[
- self.new_task(self.loop, run()) for _ in range(100)
- ], loop=self.loop))
-
- def test_other_loop_future(self):
- other_loop = asyncio.new_event_loop()
- fut = self.new_future(other_loop)
-
- async def run(fut):
- await fut
-
- try:
- with self.assertRaisesRegex(RuntimeError,
- r'Task .* got Future .* attached'):
- self.loop.run_until_complete(run(fut))
- finally:
- other_loop.close()
-
- def test_task_awaits_on_itself(self):
-
- async def test():
- await task
-
- task = asyncio.ensure_future(test(), loop=self.loop)
-
- with self.assertRaisesRegex(RuntimeError,
- 'Task cannot await on itself'):
- self.loop.run_until_complete(task)
-
- def test_task_class(self):
- async def notmuch():
- return 'ok'
- t = self.new_task(self.loop, notmuch())
- self.loop.run_until_complete(t)
- self.assertTrue(t.done())
- self.assertEqual(t.result(), 'ok')
- self.assertIs(t._loop, self.loop)
- self.assertIs(t.get_loop(), self.loop)
-
- loop = asyncio.new_event_loop()
- self.set_event_loop(loop)
- t = self.new_task(loop, notmuch())
- self.assertIs(t._loop, loop)
- loop.run_until_complete(t)
- loop.close()
-
- def test_ensure_future_coroutine(self):
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def notmuch():
- return 'ok'
- t = asyncio.ensure_future(notmuch(), loop=self.loop)
- self.loop.run_until_complete(t)
- self.assertTrue(t.done())
- self.assertEqual(t.result(), 'ok')
- self.assertIs(t._loop, self.loop)
-
- loop = asyncio.new_event_loop()
- self.set_event_loop(loop)
- t = asyncio.ensure_future(notmuch(), loop=loop)
- self.assertIs(t._loop, loop)
- loop.run_until_complete(t)
- loop.close()
-
- def test_ensure_future_future(self):
- f_orig = self.new_future(self.loop)
- f_orig.set_result('ko')
-
- f = asyncio.ensure_future(f_orig)
- self.loop.run_until_complete(f)
- self.assertTrue(f.done())
- self.assertEqual(f.result(), 'ko')
- self.assertIs(f, f_orig)
-
- loop = asyncio.new_event_loop()
- self.set_event_loop(loop)
-
- with self.assertRaises(ValueError):
- f = asyncio.ensure_future(f_orig, loop=loop)
-
- loop.close()
-
- f = asyncio.ensure_future(f_orig, loop=self.loop)
- self.assertIs(f, f_orig)
-
- def test_ensure_future_task(self):
- async def notmuch():
- return 'ok'
- t_orig = self.new_task(self.loop, notmuch())
- t = asyncio.ensure_future(t_orig)
- self.loop.run_until_complete(t)
- self.assertTrue(t.done())
- self.assertEqual(t.result(), 'ok')
- self.assertIs(t, t_orig)
-
- loop = asyncio.new_event_loop()
- self.set_event_loop(loop)
-
- with self.assertRaises(ValueError):
- t = asyncio.ensure_future(t_orig, loop=loop)
-
- loop.close()
-
- t = asyncio.ensure_future(t_orig, loop=self.loop)
- self.assertIs(t, t_orig)
-
- def test_ensure_future_awaitable(self):
- class Aw:
- def __init__(self, coro):
- self.coro = coro
- def __await__(self):
- return (yield from self.coro)
-
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def coro():
- return 'ok'
-
- loop = asyncio.new_event_loop()
- self.set_event_loop(loop)
- fut = asyncio.ensure_future(Aw(coro()), loop=loop)
- loop.run_until_complete(fut)
- assert fut.result() == 'ok'
-
- def test_ensure_future_neither(self):
- with self.assertRaises(TypeError):
- asyncio.ensure_future('ok')
-
- def test_ensure_future_error_msg(self):
- loop = asyncio.new_event_loop()
- f = self.new_future(self.loop)
- with self.assertRaisesRegex(ValueError, 'The future belongs to a '
- 'different loop than the one specified as '
- 'the loop argument'):
- asyncio.ensure_future(f, loop=loop)
- loop.close()
-
- def test_get_stack(self):
- T = None
-
- async def foo():
- await bar()
-
- async def bar():
- # test get_stack()
- f = T.get_stack(limit=1)
- try:
- self.assertEqual(f[0].f_code.co_name, 'foo')
- finally:
- f = None
-
- # test print_stack()
- file = io.StringIO()
- T.print_stack(limit=1, file=file)
- file.seek(0)
- tb = file.read()
- self.assertRegex(tb, r'foo\(\) running')
-
- async def runner():
- nonlocal T
- T = asyncio.ensure_future(foo(), loop=self.loop)
- await T
-
- self.loop.run_until_complete(runner())
-
- def test_task_repr(self):
- self.loop.set_debug(False)
-
- async def notmuch():
- return 'abc'
-
- # test coroutine function
- self.assertEqual(notmuch.__name__, 'notmuch')
- self.assertRegex(notmuch.__qualname__,
- r'\w+.test_task_repr.<locals>.notmuch')
- self.assertEqual(notmuch.__module__, __name__)
-
- filename, lineno = test_utils.get_function_source(notmuch)
- src = "%s:%s" % (filename, lineno)
-
- # test coroutine object
- gen = notmuch()
- coro_qualname = 'BaseTaskTests.test_task_repr.<locals>.notmuch'
- self.assertEqual(gen.__name__, 'notmuch')
- self.assertEqual(gen.__qualname__, coro_qualname)
-
- # test pending Task
- t = self.new_task(self.loop, gen)
- t.add_done_callback(Dummy())
-
- coro = format_coroutine(coro_qualname, 'running', src,
- t._source_traceback, generator=True)
- self.assertEqual(repr(t),
- "<Task pending name='TestTask' %s cb=[<Dummy>()]>" % coro)
-
- # test cancelling Task
- t.cancel() # Does not take immediate effect!
- self.assertEqual(repr(t),
- "<Task cancelling name='TestTask' %s cb=[<Dummy>()]>" % coro)
-
- # test cancelled Task
- self.assertRaises(asyncio.CancelledError,
- self.loop.run_until_complete, t)
- coro = format_coroutine(coro_qualname, 'done', src,
- t._source_traceback)
- self.assertEqual(repr(t),
- "<Task cancelled name='TestTask' %s>" % coro)
-
- # test finished Task
- t = self.new_task(self.loop, notmuch())
- self.loop.run_until_complete(t)
- coro = format_coroutine(coro_qualname, 'done', src,
- t._source_traceback)
- self.assertEqual(repr(t),
- "<Task finished name='TestTask' %s result='abc'>" % coro)
-
- def test_task_repr_autogenerated(self):
- async def notmuch():
- return 123
-
- t1 = self.new_task(self.loop, notmuch(), None)
- t2 = self.new_task(self.loop, notmuch(), None)
- self.assertNotEqual(repr(t1), repr(t2))
-
- match1 = re.match(r"^<Task pending name='Task-(\d+)'", repr(t1))
- self.assertIsNotNone(match1)
- match2 = re.match(r"^<Task pending name='Task-(\d+)'", repr(t2))
- self.assertIsNotNone(match2)
-
- # Autogenerated task names should have monotonically increasing numbers
- self.assertLess(int(match1.group(1)), int(match2.group(1)))
- self.loop.run_until_complete(t1)
- self.loop.run_until_complete(t2)
-
- def test_task_repr_name_not_str(self):
- async def notmuch():
- return 123
-
- t = self.new_task(self.loop, notmuch())
- t.set_name({6})
- self.assertEqual(t.get_name(), '{6}')
- self.loop.run_until_complete(t)
-
- def test_task_repr_coro_decorator(self):
- self.loop.set_debug(False)
-
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def notmuch():
- # notmuch() function doesn't use yield from: it will be wrapped by
- # @coroutine decorator
- return 123
-
- # test coroutine function
- self.assertEqual(notmuch.__name__, 'notmuch')
- self.assertRegex(notmuch.__qualname__,
- r'\w+.test_task_repr_coro_decorator'
- r'\.<locals>\.notmuch')
- self.assertEqual(notmuch.__module__, __name__)
-
- # test coroutine object
- gen = notmuch()
- # On Python >= 3.5, generators now inherit the name of the
- # function, as expected, and have a qualified name (__qualname__
- # attribute).
- coro_name = 'notmuch'
- coro_qualname = ('BaseTaskTests.test_task_repr_coro_decorator'
- '.<locals>.notmuch')
- self.assertEqual(gen.__name__, coro_name)
- self.assertEqual(gen.__qualname__, coro_qualname)
-
- # test repr(CoroWrapper)
- if coroutines._DEBUG:
- # format the coroutine object
- if coroutines._DEBUG:
- filename, lineno = test_utils.get_function_source(notmuch)
- frame = gen._source_traceback[-1]
- coro = ('%s() running, defined at %s:%s, created at %s:%s'
- % (coro_qualname, filename, lineno,
- frame[0], frame[1]))
- else:
- code = gen.gi_code
- coro = ('%s() running at %s:%s'
- % (coro_qualname, code.co_filename,
- code.co_firstlineno))
-
- self.assertEqual(repr(gen), '<CoroWrapper %s>' % coro)
-
- # test pending Task
- t = self.new_task(self.loop, gen)
- t.add_done_callback(Dummy())
-
- # format the coroutine object
- if coroutines._DEBUG:
- src = '%s:%s' % test_utils.get_function_source(notmuch)
- else:
- code = gen.gi_code
- src = '%s:%s' % (code.co_filename, code.co_firstlineno)
- coro = format_coroutine(coro_qualname, 'running', src,
- t._source_traceback,
- generator=not coroutines._DEBUG)
- self.assertEqual(repr(t),
- "<Task pending name='TestTask' %s cb=[<Dummy>()]>" % coro)
- self.loop.run_until_complete(t)
-
- def test_task_repr_wait_for(self):
- self.loop.set_debug(False)
-
- async def wait_for(fut):
- return await fut
-
- fut = self.new_future(self.loop)
- task = self.new_task(self.loop, wait_for(fut))
- test_utils.run_briefly(self.loop)
- self.assertRegex(repr(task),
- '<Task .* wait_for=%s>' % re.escape(repr(fut)))
-
- fut.set_result(None)
- self.loop.run_until_complete(task)
-
- def test_task_repr_partial_corowrapper(self):
- # Issue #222: repr(CoroWrapper) must not fail in debug mode if the
- # coroutine is a partial function
- with set_coroutine_debug(True):
- self.loop.set_debug(True)
-
- async def func(x, y):
- await asyncio.sleep(0)
-
- with self.assertWarns(DeprecationWarning):
- partial_func = asyncio.coroutine(functools.partial(func, 1))
- task = self.loop.create_task(partial_func(2))
-
- # make warnings quiet
- task._log_destroy_pending = False
- self.addCleanup(task._coro.close)
-
- coro_repr = repr(task._coro)
- expected = (
- r'<coroutine object \w+\.test_task_repr_partial_corowrapper'
- r'\.<locals>\.func at'
- )
- self.assertRegex(coro_repr, expected)
-
- def test_task_basics(self):
-
- async def outer():
- a = await inner1()
- b = await inner2()
- return a+b
-
- async def inner1():
- return 42
-
- async def inner2():
- return 1000
-
- t = outer()
- self.assertEqual(self.loop.run_until_complete(t), 1042)
-
- def test_cancel(self):
-
- def gen():
- when = yield
- self.assertAlmostEqual(10.0, when)
- yield 0
-
- loop = self.new_test_loop(gen)
-
- async def task():
- await asyncio.sleep(10.0)
- return 12
-
- t = self.new_task(loop, task())
- loop.call_soon(t.cancel)
- with self.assertRaises(asyncio.CancelledError):
- loop.run_until_complete(t)
- self.assertTrue(t.done())
- self.assertTrue(t.cancelled())
- self.assertFalse(t.cancel())
-
- def test_cancel_yield(self):
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def task():
- yield
- yield
- return 12
-
- t = self.new_task(self.loop, task())
- test_utils.run_briefly(self.loop) # start coro
- t.cancel()
- self.assertRaises(
- asyncio.CancelledError, self.loop.run_until_complete, t)
- self.assertTrue(t.done())
- self.assertTrue(t.cancelled())
- self.assertFalse(t.cancel())
-
- def test_cancel_inner_future(self):
- f = self.new_future(self.loop)
-
- async def task():
- await f
- return 12
-
- t = self.new_task(self.loop, task())
- test_utils.run_briefly(self.loop) # start task
- f.cancel()
- with self.assertRaises(asyncio.CancelledError):
- self.loop.run_until_complete(t)
- self.assertTrue(f.cancelled())
- self.assertTrue(t.cancelled())
-
- def test_cancel_both_task_and_inner_future(self):
- f = self.new_future(self.loop)
-
- async def task():
- await f
- return 12
-
- t = self.new_task(self.loop, task())
- test_utils.run_briefly(self.loop)
-
- f.cancel()
- t.cancel()
-
- with self.assertRaises(asyncio.CancelledError):
- self.loop.run_until_complete(t)
-
- self.assertTrue(t.done())
- self.assertTrue(f.cancelled())
- self.assertTrue(t.cancelled())
-
- def test_cancel_task_catching(self):
- fut1 = self.new_future(self.loop)
- fut2 = self.new_future(self.loop)
-
- async def task():
- await fut1
- try:
- await fut2
- except asyncio.CancelledError:
- return 42
-
- t = self.new_task(self.loop, task())
- test_utils.run_briefly(self.loop)
- self.assertIs(t._fut_waiter, fut1) # White-box test.
- fut1.set_result(None)
- test_utils.run_briefly(self.loop)
- self.assertIs(t._fut_waiter, fut2) # White-box test.
- t.cancel()
- self.assertTrue(fut2.cancelled())
- res = self.loop.run_until_complete(t)
- self.assertEqual(res, 42)
- self.assertFalse(t.cancelled())
-
- def test_cancel_task_ignoring(self):
- fut1 = self.new_future(self.loop)
- fut2 = self.new_future(self.loop)
- fut3 = self.new_future(self.loop)
-
- async def task():
- await fut1
- try:
- await fut2
- except asyncio.CancelledError:
- pass
- res = await fut3
- return res
-
- t = self.new_task(self.loop, task())
- test_utils.run_briefly(self.loop)
- self.assertIs(t._fut_waiter, fut1) # White-box test.
- fut1.set_result(None)
- test_utils.run_briefly(self.loop)
- self.assertIs(t._fut_waiter, fut2) # White-box test.
- t.cancel()
- self.assertTrue(fut2.cancelled())
- test_utils.run_briefly(self.loop)
- self.assertIs(t._fut_waiter, fut3) # White-box test.
- fut3.set_result(42)
- res = self.loop.run_until_complete(t)
- self.assertEqual(res, 42)
- self.assertFalse(fut3.cancelled())
- self.assertFalse(t.cancelled())
-
- def test_cancel_current_task(self):
- loop = asyncio.new_event_loop()
- self.set_event_loop(loop)
-
- async def task():
- t.cancel()
- self.assertTrue(t._must_cancel) # White-box test.
- # The sleep should be cancelled immediately.
- await asyncio.sleep(100)
- return 12
-
- t = self.new_task(loop, task())
- self.assertFalse(t.cancelled())
- self.assertRaises(
- asyncio.CancelledError, loop.run_until_complete, t)
- self.assertTrue(t.done())
- self.assertTrue(t.cancelled())
- self.assertFalse(t._must_cancel) # White-box test.
- self.assertFalse(t.cancel())
-
- def test_cancel_at_end(self):
- """coroutine end right after task is cancelled"""
- loop = asyncio.new_event_loop()
- self.set_event_loop(loop)
-
- async def task():
- t.cancel()
- self.assertTrue(t._must_cancel) # White-box test.
- return 12
-
- t = self.new_task(loop, task())
- self.assertFalse(t.cancelled())
- self.assertRaises(
- asyncio.CancelledError, loop.run_until_complete, t)
- self.assertTrue(t.done())
- self.assertTrue(t.cancelled())
- self.assertFalse(t._must_cancel) # White-box test.
- self.assertFalse(t.cancel())
-
- def test_cancel_awaited_task(self):
- # This tests for a relatively rare condition when
- # a task cancellation is requested for a task which is not
- # currently blocked, such as a task cancelling itself.
- # In this situation we must ensure that whatever next future
- # or task the cancelled task blocks on is cancelled correctly
- # as well. See also bpo-34872.
- loop = asyncio.new_event_loop()
- self.addCleanup(lambda: loop.close())
-
- task = nested_task = None
- fut = self.new_future(loop)
-
- async def nested():
- await fut
-
- async def coro():
- nonlocal nested_task
- # Create a sub-task and wait for it to run.
- nested_task = self.new_task(loop, nested())
- await asyncio.sleep(0)
-
- # Request the current task to be cancelled.
- task.cancel()
- # Block on the nested task, which should be immediately
- # cancelled.
- await nested_task
-
- task = self.new_task(loop, coro())
- with self.assertRaises(asyncio.CancelledError):
- loop.run_until_complete(task)
-
- self.assertTrue(task.cancelled())
- self.assertTrue(nested_task.cancelled())
- self.assertTrue(fut.cancelled())
-
- def test_stop_while_run_in_complete(self):
-
- def gen():
- when = yield
- self.assertAlmostEqual(0.1, when)
- when = yield 0.1
- self.assertAlmostEqual(0.2, when)
- when = yield 0.1
- self.assertAlmostEqual(0.3, when)
- yield 0.1
-
- loop = self.new_test_loop(gen)
-
- x = 0
-
- async def task():
- nonlocal x
- while x < 10:
- await asyncio.sleep(0.1)
- x += 1
- if x == 2:
- loop.stop()
-
- t = self.new_task(loop, task())
- with self.assertRaises(RuntimeError) as cm:
- loop.run_until_complete(t)
- self.assertEqual(str(cm.exception),
- 'Event loop stopped before Future completed.')
- self.assertFalse(t.done())
- self.assertEqual(x, 2)
- self.assertAlmostEqual(0.3, loop.time())
-
- t.cancel()
- self.assertRaises(asyncio.CancelledError, loop.run_until_complete, t)
-
- def test_log_traceback(self):
- async def coro():
- pass
-
- task = self.new_task(self.loop, coro())
- with self.assertRaisesRegex(ValueError, 'can only be set to False'):
- task._log_traceback = True
- self.loop.run_until_complete(task)
-
- def test_wait_for_timeout_less_then_0_or_0_future_done(self):
- def gen():
- when = yield
- self.assertAlmostEqual(0, when)
-
- loop = self.new_test_loop(gen)
-
- fut = self.new_future(loop)
- fut.set_result('done')
-
- ret = loop.run_until_complete(asyncio.wait_for(fut, 0))
-
- self.assertEqual(ret, 'done')
- self.assertTrue(fut.done())
- self.assertAlmostEqual(0, loop.time())
-
- def test_wait_for_timeout_less_then_0_or_0_coroutine_do_not_started(self):
- def gen():
- when = yield
- self.assertAlmostEqual(0, when)
-
- loop = self.new_test_loop(gen)
-
- foo_started = False
-
- async def foo():
- nonlocal foo_started
- foo_started = True
-
- with self.assertRaises(asyncio.TimeoutError):
- loop.run_until_complete(asyncio.wait_for(foo(), 0))
-
- self.assertAlmostEqual(0, loop.time())
- self.assertEqual(foo_started, False)
-
- def test_wait_for_timeout_less_then_0_or_0(self):
- def gen():
- when = yield
- self.assertAlmostEqual(0.2, when)
- when = yield 0
- self.assertAlmostEqual(0, when)
-
- for timeout in [0, -1]:
- with self.subTest(timeout=timeout):
- loop = self.new_test_loop(gen)
-
- foo_running = None
-
- async def foo():
- nonlocal foo_running
- foo_running = True
- try:
- await asyncio.sleep(0.2)
- finally:
- foo_running = False
- return 'done'
-
- fut = self.new_task(loop, foo())
-
- with self.assertRaises(asyncio.TimeoutError):
- loop.run_until_complete(asyncio.wait_for(fut, timeout))
- self.assertTrue(fut.done())
- # it should have been cancelled due to the timeout
- self.assertTrue(fut.cancelled())
- self.assertAlmostEqual(0, loop.time())
- self.assertEqual(foo_running, False)
-
- def test_wait_for(self):
-
- def gen():
- when = yield
- self.assertAlmostEqual(0.2, when)
- when = yield 0
- self.assertAlmostEqual(0.1, when)
- when = yield 0.1
-
- loop = self.new_test_loop(gen)
-
- foo_running = None
-
- async def foo():
- nonlocal foo_running
- foo_running = True
- try:
- await asyncio.sleep(0.2)
- finally:
- foo_running = False
- return 'done'
-
- fut = self.new_task(loop, foo())
-
- with self.assertRaises(asyncio.TimeoutError):
- loop.run_until_complete(asyncio.wait_for(fut, 0.1))
- self.assertTrue(fut.done())
- # it should have been cancelled due to the timeout
- self.assertTrue(fut.cancelled())
- self.assertAlmostEqual(0.1, loop.time())
- self.assertEqual(foo_running, False)
-
- def test_wait_for_blocking(self):
- loop = self.new_test_loop()
-
- async def coro():
- return 'done'
-
- res = loop.run_until_complete(asyncio.wait_for(coro(), timeout=None))
- self.assertEqual(res, 'done')
-
- def test_wait_for_with_global_loop(self):
-
- def gen():
- when = yield
- self.assertAlmostEqual(0.2, when)
- when = yield 0
- self.assertAlmostEqual(0.01, when)
- yield 0.01
-
- loop = self.new_test_loop(gen)
-
- async def foo():
- await asyncio.sleep(0.2)
- return 'done'
-
- asyncio.set_event_loop(loop)
- try:
- fut = self.new_task(loop, foo())
- with self.assertRaises(asyncio.TimeoutError):
- loop.run_until_complete(asyncio.wait_for(fut, 0.01))
- finally:
- asyncio.set_event_loop(None)
-
- self.assertAlmostEqual(0.01, loop.time())
- self.assertTrue(fut.done())
- self.assertTrue(fut.cancelled())
-
- def test_wait_for_race_condition(self):
-
- def gen():
- yield 0.1
- yield 0.1
- yield 0.1
-
- loop = self.new_test_loop(gen)
-
- fut = self.new_future(loop)
- task = asyncio.wait_for(fut, timeout=0.2)
- loop.call_later(0.1, fut.set_result, "ok")
- res = loop.run_until_complete(task)
- self.assertEqual(res, "ok")
-
- def test_wait_for_waits_for_task_cancellation(self):
- loop = asyncio.new_event_loop()
- self.addCleanup(loop.close)
-
- task_done = False
-
- async def foo():
- async def inner():
- nonlocal task_done
- try:
- await asyncio.sleep(0.2)
- finally:
- task_done = True
-
- inner_task = self.new_task(loop, inner())
-
- with self.assertRaises(asyncio.TimeoutError):
- await asyncio.wait_for(inner_task, timeout=0.1)
-
- self.assertTrue(task_done)
-
- loop.run_until_complete(foo())
-
- def test_wait_for_self_cancellation(self):
- loop = asyncio.new_event_loop()
- self.addCleanup(loop.close)
-
- async def foo():
- async def inner():
- try:
- await asyncio.sleep(0.3)
- except asyncio.CancelledError:
- try:
- await asyncio.sleep(0.3)
- except asyncio.CancelledError:
- await asyncio.sleep(0.3)
-
- return 42
-
- inner_task = self.new_task(loop, inner())
-
- wait = asyncio.wait_for(inner_task, timeout=0.1)
-
- # Test that wait_for itself is properly cancellable
- # even when the initial task holds up the initial cancellation.
- task = self.new_task(loop, wait)
- await asyncio.sleep(0.2)
- task.cancel()
-
- with self.assertRaises(asyncio.CancelledError):
- await task
-
- self.assertEqual(await inner_task, 42)
-
- loop.run_until_complete(foo())
-
- def test_wait(self):
-
- def gen():
- when = yield
- self.assertAlmostEqual(0.1, when)
- when = yield 0
- self.assertAlmostEqual(0.15, when)
- yield 0.15
-
- loop = self.new_test_loop(gen)
-
- a = self.new_task(loop, asyncio.sleep(0.1))
- b = self.new_task(loop, asyncio.sleep(0.15))
-
- async def foo():
- done, pending = await asyncio.wait([b, a])
- self.assertEqual(done, set([a, b]))
- self.assertEqual(pending, set())
- return 42
-
- res = loop.run_until_complete(self.new_task(loop, foo()))
- self.assertEqual(res, 42)
- self.assertAlmostEqual(0.15, loop.time())
-
- # Doing it again should take no time and exercise a different path.
- res = loop.run_until_complete(self.new_task(loop, foo()))
- self.assertAlmostEqual(0.15, loop.time())
- self.assertEqual(res, 42)
-
- def test_wait_with_global_loop(self):
-
- def gen():
- when = yield
- self.assertAlmostEqual(0.01, when)
- when = yield 0
- self.assertAlmostEqual(0.015, when)
- yield 0.015
-
- loop = self.new_test_loop(gen)
-
- a = self.new_task(loop, asyncio.sleep(0.01))
- b = self.new_task(loop, asyncio.sleep(0.015))
-
- async def foo():
- done, pending = await asyncio.wait([b, a])
- self.assertEqual(done, set([a, b]))
- self.assertEqual(pending, set())
- return 42
-
- asyncio.set_event_loop(loop)
- res = loop.run_until_complete(
- self.new_task(loop, foo()))
-
- self.assertEqual(res, 42)
-
- def test_wait_duplicate_coroutines(self):
-
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def coro(s):
- return s
- c = coro('test')
-
- task =self.new_task(
- self.loop,
- asyncio.wait([c, c, coro('spam')]))
-
- done, pending = self.loop.run_until_complete(task)
-
- self.assertFalse(pending)
- self.assertEqual(set(f.result() for f in done), {'test', 'spam'})
-
- def test_wait_errors(self):
- self.assertRaises(
- ValueError, self.loop.run_until_complete,
- asyncio.wait(set()))
-
- # -1 is an invalid return_when value
- sleep_coro = asyncio.sleep(10.0)
- wait_coro = asyncio.wait([sleep_coro], return_when=-1)
- self.assertRaises(ValueError,
- self.loop.run_until_complete, wait_coro)
-
- sleep_coro.close()
-
- def test_wait_first_completed(self):
-
- def gen():
- when = yield
- self.assertAlmostEqual(10.0, when)
- when = yield 0
- self.assertAlmostEqual(0.1, when)
- yield 0.1
-
- loop = self.new_test_loop(gen)
-
- a = self.new_task(loop, asyncio.sleep(10.0))
- b = self.new_task(loop, asyncio.sleep(0.1))
- task = self.new_task(
- loop,
- asyncio.wait([b, a], return_when=asyncio.FIRST_COMPLETED))
-
- done, pending = loop.run_until_complete(task)
- self.assertEqual({b}, done)
- self.assertEqual({a}, pending)
- self.assertFalse(a.done())
- self.assertTrue(b.done())
- self.assertIsNone(b.result())
- self.assertAlmostEqual(0.1, loop.time())
-
- # move forward to close generator
- loop.advance_time(10)
- loop.run_until_complete(asyncio.wait([a, b]))
-
- def test_wait_really_done(self):
- # there is possibility that some tasks in the pending list
- # became done but their callbacks haven't all been called yet
-
- async def coro1():
- await asyncio.sleep(0)
-
- async def coro2():
- await asyncio.sleep(0)
- await asyncio.sleep(0)
-
- a = self.new_task(self.loop, coro1())
- b = self.new_task(self.loop, coro2())
- task = self.new_task(
- self.loop,
- asyncio.wait([b, a], return_when=asyncio.FIRST_COMPLETED))
-
- done, pending = self.loop.run_until_complete(task)
- self.assertEqual({a, b}, done)
- self.assertTrue(a.done())
- self.assertIsNone(a.result())
- self.assertTrue(b.done())
- self.assertIsNone(b.result())
-
- def test_wait_first_exception(self):
-
- def gen():
- when = yield
- self.assertAlmostEqual(10.0, when)
- yield 0
-
- loop = self.new_test_loop(gen)
-
- # first_exception, task already has exception
- a = self.new_task(loop, asyncio.sleep(10.0))
-
- async def exc():
- raise ZeroDivisionError('err')
-
- b = self.new_task(loop, exc())
- task = self.new_task(
- loop,
- asyncio.wait([b, a], return_when=asyncio.FIRST_EXCEPTION))
-
- done, pending = loop.run_until_complete(task)
- self.assertEqual({b}, done)
- self.assertEqual({a}, pending)
- self.assertAlmostEqual(0, loop.time())
-
- # move forward to close generator
- loop.advance_time(10)
- loop.run_until_complete(asyncio.wait([a, b]))
-
- def test_wait_first_exception_in_wait(self):
-
- def gen():
- when = yield
- self.assertAlmostEqual(10.0, when)
- when = yield 0
- self.assertAlmostEqual(0.01, when)
- yield 0.01
-
- loop = self.new_test_loop(gen)
-
- # first_exception, exception during waiting
- a = self.new_task(loop, asyncio.sleep(10.0))
-
- async def exc():
- await asyncio.sleep(0.01)
- raise ZeroDivisionError('err')
-
- b = self.new_task(loop, exc())
- task = asyncio.wait([b, a], return_when=asyncio.FIRST_EXCEPTION)
-
- done, pending = loop.run_until_complete(task)
- self.assertEqual({b}, done)
- self.assertEqual({a}, pending)
- self.assertAlmostEqual(0.01, loop.time())
-
- # move forward to close generator
- loop.advance_time(10)
- loop.run_until_complete(asyncio.wait([a, b]))
-
- def test_wait_with_exception(self):
-
- def gen():
- when = yield
- self.assertAlmostEqual(0.1, when)
- when = yield 0
- self.assertAlmostEqual(0.15, when)
- yield 0.15
-
- loop = self.new_test_loop(gen)
-
- a = self.new_task(loop, asyncio.sleep(0.1))
-
- async def sleeper():
- await asyncio.sleep(0.15)
- raise ZeroDivisionError('really')
-
- b = self.new_task(loop, sleeper())
-
- async def foo():
- done, pending = await asyncio.wait([b, a])
- self.assertEqual(len(done), 2)
- self.assertEqual(pending, set())
- errors = set(f for f in done if f.exception() is not None)
- self.assertEqual(len(errors), 1)
-
- loop.run_until_complete(self.new_task(loop, foo()))
- self.assertAlmostEqual(0.15, loop.time())
-
- loop.run_until_complete(self.new_task(loop, foo()))
- self.assertAlmostEqual(0.15, loop.time())
-
- def test_wait_with_timeout(self):
-
- def gen():
- when = yield
- self.assertAlmostEqual(0.1, when)
- when = yield 0
- self.assertAlmostEqual(0.15, when)
- when = yield 0
- self.assertAlmostEqual(0.11, when)
- yield 0.11
-
- loop = self.new_test_loop(gen)
-
- a = self.new_task(loop, asyncio.sleep(0.1))
- b = self.new_task(loop, asyncio.sleep(0.15))
-
- async def foo():
- done, pending = await asyncio.wait([b, a], timeout=0.11)
- self.assertEqual(done, set([a]))
- self.assertEqual(pending, set([b]))
-
- loop.run_until_complete(self.new_task(loop, foo()))
- self.assertAlmostEqual(0.11, loop.time())
-
- # move forward to close generator
- loop.advance_time(10)
- loop.run_until_complete(asyncio.wait([a, b]))
-
- def test_wait_concurrent_complete(self):
-
- def gen():
- when = yield
- self.assertAlmostEqual(0.1, when)
- when = yield 0
- self.assertAlmostEqual(0.15, when)
- when = yield 0
- self.assertAlmostEqual(0.1, when)
- yield 0.1
-
- loop = self.new_test_loop(gen)
-
- a = self.new_task(loop, asyncio.sleep(0.1))
- b = self.new_task(loop, asyncio.sleep(0.15))
-
- done, pending = loop.run_until_complete(
- asyncio.wait([b, a], timeout=0.1))
-
- self.assertEqual(done, set([a]))
- self.assertEqual(pending, set([b]))
- self.assertAlmostEqual(0.1, loop.time())
-
- # move forward to close generator
- loop.advance_time(10)
- loop.run_until_complete(asyncio.wait([a, b]))
-
- def test_as_completed(self):
-
- def gen():
- yield 0
- yield 0
- yield 0.01
- yield 0
-
- loop = self.new_test_loop(gen)
- # disable "slow callback" warning
- loop.slow_callback_duration = 1.0
- completed = set()
- time_shifted = False
-
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def sleeper(dt, x):
- nonlocal time_shifted
- yield from asyncio.sleep(dt)
- completed.add(x)
- if not time_shifted and 'a' in completed and 'b' in completed:
- time_shifted = True
- loop.advance_time(0.14)
- return x
-
- a = sleeper(0.01, 'a')
- b = sleeper(0.01, 'b')
- c = sleeper(0.15, 'c')
-
- async def foo():
- values = []
- for f in asyncio.as_completed([b, c, a], loop=loop):
- values.append(await f)
- return values
- with self.assertWarns(DeprecationWarning):
- res = loop.run_until_complete(self.new_task(loop, foo()))
- self.assertAlmostEqual(0.15, loop.time())
- self.assertTrue('a' in res[:2])
- self.assertTrue('b' in res[:2])
- self.assertEqual(res[2], 'c')
-
- # Doing it again should take no time and exercise a different path.
- with self.assertWarns(DeprecationWarning):
- res = loop.run_until_complete(self.new_task(loop, foo()))
- self.assertAlmostEqual(0.15, loop.time())
-
- def test_as_completed_with_timeout(self):
-
- def gen():
- yield
- yield 0
- yield 0
- yield 0.1
-
- loop = self.new_test_loop(gen)
-
- a = loop.create_task(asyncio.sleep(0.1, 'a'))
- b = loop.create_task(asyncio.sleep(0.15, 'b'))
-
- async def foo():
- values = []
- for f in asyncio.as_completed([a, b], timeout=0.12, loop=loop):
- if values:
- loop.advance_time(0.02)
- try:
- v = await f
- values.append((1, v))
- except asyncio.TimeoutError as exc:
- values.append((2, exc))
- return values
-
- with self.assertWarns(DeprecationWarning):
- res = loop.run_until_complete(self.new_task(loop, foo()))
- self.assertEqual(len(res), 2, res)
- self.assertEqual(res[0], (1, 'a'))
- self.assertEqual(res[1][0], 2)
- self.assertIsInstance(res[1][1], asyncio.TimeoutError)
- self.assertAlmostEqual(0.12, loop.time())
-
- # move forward to close generator
- loop.advance_time(10)
- loop.run_until_complete(asyncio.wait([a, b]))
-
- def test_as_completed_with_unused_timeout(self):
-
- def gen():
- yield
- yield 0
- yield 0.01
-
- loop = self.new_test_loop(gen)
-
- a = asyncio.sleep(0.01, 'a')
-
- async def foo():
- for f in asyncio.as_completed([a], timeout=1, loop=loop):
- v = await f
- self.assertEqual(v, 'a')
-
- with self.assertWarns(DeprecationWarning):
- loop.run_until_complete(self.new_task(loop, foo()))
-
- def test_as_completed_reverse_wait(self):
-
- def gen():
- yield 0
- yield 0.05
- yield 0
-
- loop = self.new_test_loop(gen)
-
- a = asyncio.sleep(0.05, 'a')
- b = asyncio.sleep(0.10, 'b')
- fs = {a, b}
-
- with self.assertWarns(DeprecationWarning):
- futs = list(asyncio.as_completed(fs, loop=loop))
- self.assertEqual(len(futs), 2)
-
- x = loop.run_until_complete(futs[1])
- self.assertEqual(x, 'a')
- self.assertAlmostEqual(0.05, loop.time())
- loop.advance_time(0.05)
- y = loop.run_until_complete(futs[0])
- self.assertEqual(y, 'b')
- self.assertAlmostEqual(0.10, loop.time())
-
- def test_as_completed_concurrent(self):
-
- def gen():
- when = yield
- self.assertAlmostEqual(0.05, when)
- when = yield 0
- self.assertAlmostEqual(0.05, when)
- yield 0.05
-
- loop = self.new_test_loop(gen)
-
- a = asyncio.sleep(0.05, 'a')
- b = asyncio.sleep(0.05, 'b')
- fs = {a, b}
- with self.assertWarns(DeprecationWarning):
- futs = list(asyncio.as_completed(fs, loop=loop))
- self.assertEqual(len(futs), 2)
- waiter = asyncio.wait(futs)
- done, pending = loop.run_until_complete(waiter)
- self.assertEqual(set(f.result() for f in done), {'a', 'b'})
-
- def test_as_completed_duplicate_coroutines(self):
-
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def coro(s):
- return s
-
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def runner():
- result = []
- c = coro('ham')
- for f in asyncio.as_completed([c, c, coro('spam')],
- loop=self.loop):
- result.append((yield from f))
- return result
-
- with self.assertWarns(DeprecationWarning):
- fut = self.new_task(self.loop, runner())
- self.loop.run_until_complete(fut)
- result = fut.result()
- self.assertEqual(set(result), {'ham', 'spam'})
- self.assertEqual(len(result), 2)
-
- def test_sleep(self):
-
- def gen():
- when = yield
- self.assertAlmostEqual(0.05, when)
- when = yield 0.05
- self.assertAlmostEqual(0.1, when)
- yield 0.05
-
- loop = self.new_test_loop(gen)
-
- async def sleeper(dt, arg):
- await asyncio.sleep(dt/2)
- res = await asyncio.sleep(dt/2, arg)
- return res
-
- t = self.new_task(loop, sleeper(0.1, 'yeah'))
- loop.run_until_complete(t)
- self.assertTrue(t.done())
- self.assertEqual(t.result(), 'yeah')
- self.assertAlmostEqual(0.1, loop.time())
-
- def test_sleep_cancel(self):
-
- def gen():
- when = yield
- self.assertAlmostEqual(10.0, when)
- yield 0
-
- loop = self.new_test_loop(gen)
-
- t = self.new_task(loop, asyncio.sleep(10.0, 'yeah'))
-
- handle = None
- orig_call_later = loop.call_later
-
- def call_later(delay, callback, *args):
- nonlocal handle
- handle = orig_call_later(delay, callback, *args)
- return handle
-
- loop.call_later = call_later
- test_utils.run_briefly(loop)
-
- self.assertFalse(handle._cancelled)
-
- t.cancel()
- test_utils.run_briefly(loop)
- self.assertTrue(handle._cancelled)
-
- def test_task_cancel_sleeping_task(self):
-
- def gen():
- when = yield
- self.assertAlmostEqual(0.1, when)
- when = yield 0
- self.assertAlmostEqual(5000, when)
- yield 0.1
-
- loop = self.new_test_loop(gen)
-
- async def sleep(dt):
- await asyncio.sleep(dt)
-
- async def doit():
- sleeper = self.new_task(loop, sleep(5000))
- loop.call_later(0.1, sleeper.cancel)
- try:
- await sleeper
- except asyncio.CancelledError:
- return 'cancelled'
- else:
- return 'slept in'
-
- doer = doit()
- self.assertEqual(loop.run_until_complete(doer), 'cancelled')
- self.assertAlmostEqual(0.1, loop.time())
-
- def test_task_cancel_waiter_future(self):
- fut = self.new_future(self.loop)
-
- async def coro():
- await fut
-
- task = self.new_task(self.loop, coro())
- test_utils.run_briefly(self.loop)
- self.assertIs(task._fut_waiter, fut)
-
- task.cancel()
- test_utils.run_briefly(self.loop)
- self.assertRaises(
- asyncio.CancelledError, self.loop.run_until_complete, task)
- self.assertIsNone(task._fut_waiter)
- self.assertTrue(fut.cancelled())
-
- def test_task_set_methods(self):
- async def notmuch():
- return 'ko'
-
- gen = notmuch()
- task = self.new_task(self.loop, gen)
-
- with self.assertRaisesRegex(RuntimeError, 'not support set_result'):
- task.set_result('ok')
-
- with self.assertRaisesRegex(RuntimeError, 'not support set_exception'):
- task.set_exception(ValueError())
-
- self.assertEqual(
- self.loop.run_until_complete(task),
- 'ko')
-
- def test_step_result(self):
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def notmuch():
- yield None
- yield 1
- return 'ko'
-
- self.assertRaises(
- RuntimeError, self.loop.run_until_complete, notmuch())
-
- def test_step_result_future(self):
- # If coroutine returns future, task waits on this future.
-
- class Fut(asyncio.Future):
- def __init__(self, *args, **kwds):
- self.cb_added = False
- super().__init__(*args, **kwds)
-
- def add_done_callback(self, *args, **kwargs):
- self.cb_added = True
- super().add_done_callback(*args, **kwargs)
-
- fut = Fut(loop=self.loop)
- result = None
-
- async def wait_for_future():
- nonlocal result
- result = await fut
-
- t = self.new_task(self.loop, wait_for_future())
- test_utils.run_briefly(self.loop)
- self.assertTrue(fut.cb_added)
-
- res = object()
- fut.set_result(res)
- test_utils.run_briefly(self.loop)
- self.assertIs(res, result)
- self.assertTrue(t.done())
- self.assertIsNone(t.result())
-
- def test_baseexception_during_cancel(self):
-
- def gen():
- when = yield
- self.assertAlmostEqual(10.0, when)
- yield 0
-
- loop = self.new_test_loop(gen)
-
- async def sleeper():
- await asyncio.sleep(10)
-
- base_exc = SystemExit()
-
- async def notmutch():
- try:
- await sleeper()
- except asyncio.CancelledError:
- raise base_exc
-
- task = self.new_task(loop, notmutch())
- test_utils.run_briefly(loop)
-
- task.cancel()
- self.assertFalse(task.done())
-
- self.assertRaises(SystemExit, test_utils.run_briefly, loop)
-
- self.assertTrue(task.done())
- self.assertFalse(task.cancelled())
- self.assertIs(task.exception(), base_exc)
-
- def test_iscoroutinefunction(self):
- def fn():
- pass
-
- self.assertFalse(asyncio.iscoroutinefunction(fn))
-
- def fn1():
- yield
- self.assertFalse(asyncio.iscoroutinefunction(fn1))
-
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def fn2():
- yield
- self.assertTrue(asyncio.iscoroutinefunction(fn2))
-
- self.assertFalse(asyncio.iscoroutinefunction(mock.Mock()))
-
- def test_yield_vs_yield_from(self):
- fut = self.new_future(self.loop)
-
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def wait_for_future():
- yield fut
-
- task = wait_for_future()
- with self.assertRaises(RuntimeError):
- self.loop.run_until_complete(task)
-
- self.assertFalse(fut.done())
-
- def test_yield_vs_yield_from_generator(self):
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def coro():
- yield
-
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def wait_for_future():
- gen = coro()
- try:
- yield gen
- finally:
- gen.close()
-
- task = wait_for_future()
- self.assertRaises(
- RuntimeError,
- self.loop.run_until_complete, task)
-
- def test_coroutine_non_gen_function(self):
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def func():
- return 'test'
-
- self.assertTrue(asyncio.iscoroutinefunction(func))
-
- coro = func()
- self.assertTrue(asyncio.iscoroutine(coro))
-
- res = self.loop.run_until_complete(coro)
- self.assertEqual(res, 'test')
-
- def test_coroutine_non_gen_function_return_future(self):
- fut = self.new_future(self.loop)
-
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def func():
- return fut
-
- async def coro():
- fut.set_result('test')
-
- t1 = self.new_task(self.loop, func())
- t2 = self.new_task(self.loop, coro())
- res = self.loop.run_until_complete(t1)
- self.assertEqual(res, 'test')
- self.assertIsNone(t2.result())
-
-
- def test_current_task_deprecated(self):
- Task = self.__class__.Task
-
- with self.assertWarns(DeprecationWarning):
- self.assertIsNone(Task.current_task(loop=self.loop))
-
- async def coro(loop):
- with self.assertWarns(DeprecationWarning):
- self.assertIs(Task.current_task(loop=loop), task)
-
- # See http://bugs.python.org/issue29271 for details:
- asyncio.set_event_loop(loop)
- try:
- with self.assertWarns(DeprecationWarning):
- self.assertIs(Task.current_task(None), task)
- with self.assertWarns(DeprecationWarning):
- self.assertIs(Task.current_task(), task)
- finally:
- asyncio.set_event_loop(None)
-
- task = self.new_task(self.loop, coro(self.loop))
- self.loop.run_until_complete(task)
- with self.assertWarns(DeprecationWarning):
- self.assertIsNone(Task.current_task(loop=self.loop))
-
- def test_current_task(self):
- self.assertIsNone(asyncio.current_task(loop=self.loop))
-
- async def coro(loop):
- self.assertIs(asyncio.current_task(loop=loop), task)
-
- self.assertIs(asyncio.current_task(None), task)
- self.assertIs(asyncio.current_task(), task)
-
- task = self.new_task(self.loop, coro(self.loop))
- self.loop.run_until_complete(task)
- self.assertIsNone(asyncio.current_task(loop=self.loop))
-
- def test_current_task_with_interleaving_tasks(self):
- self.assertIsNone(asyncio.current_task(loop=self.loop))
-
- fut1 = self.new_future(self.loop)
- fut2 = self.new_future(self.loop)
-
- async def coro1(loop):
- self.assertTrue(asyncio.current_task(loop=loop) is task1)
- await fut1
- self.assertTrue(asyncio.current_task(loop=loop) is task1)
- fut2.set_result(True)
-
- async def coro2(loop):
- self.assertTrue(asyncio.current_task(loop=loop) is task2)
- fut1.set_result(True)
- await fut2
- self.assertTrue(asyncio.current_task(loop=loop) is task2)
-
- task1 = self.new_task(self.loop, coro1(self.loop))
- task2 = self.new_task(self.loop, coro2(self.loop))
-
- self.loop.run_until_complete(asyncio.wait((task1, task2)))
- self.assertIsNone(asyncio.current_task(loop=self.loop))
-
- # Some thorough tests for cancellation propagation through
- # coroutines, tasks and wait().
-
- def test_yield_future_passes_cancel(self):
- # Cancelling outer() cancels inner() cancels waiter.
- proof = 0
- waiter = self.new_future(self.loop)
-
- async def inner():
- nonlocal proof
- try:
- await waiter
- except asyncio.CancelledError:
- proof += 1
- raise
- else:
- self.fail('got past sleep() in inner()')
-
- async def outer():
- nonlocal proof
- try:
- await inner()
- except asyncio.CancelledError:
- proof += 100 # Expect this path.
- else:
- proof += 10
-
- f = asyncio.ensure_future(outer(), loop=self.loop)
- test_utils.run_briefly(self.loop)
- f.cancel()
- self.loop.run_until_complete(f)
- self.assertEqual(proof, 101)
- self.assertTrue(waiter.cancelled())
-
- def test_yield_wait_does_not_shield_cancel(self):
- # Cancelling outer() makes wait() return early, leaves inner()
- # running.
- proof = 0
- waiter = self.new_future(self.loop)
-
- async def inner():
- nonlocal proof
- await waiter
- proof += 1
-
- async def outer():
- nonlocal proof
- d, p = await asyncio.wait([inner()])
- proof += 100
-
- f = asyncio.ensure_future(outer(), loop=self.loop)
- test_utils.run_briefly(self.loop)
- f.cancel()
- self.assertRaises(
- asyncio.CancelledError, self.loop.run_until_complete, f)
- waiter.set_result(None)
- test_utils.run_briefly(self.loop)
- self.assertEqual(proof, 1)
-
- def test_shield_result(self):
- inner = self.new_future(self.loop)
- outer = asyncio.shield(inner)
- inner.set_result(42)
- res = self.loop.run_until_complete(outer)
- self.assertEqual(res, 42)
-
- def test_shield_exception(self):
- inner = self.new_future(self.loop)
- outer = asyncio.shield(inner)
- test_utils.run_briefly(self.loop)
- exc = RuntimeError('expected')
- inner.set_exception(exc)
- test_utils.run_briefly(self.loop)
- self.assertIs(outer.exception(), exc)
-
- def test_shield_cancel_inner(self):
- inner = self.new_future(self.loop)
- outer = asyncio.shield(inner)
- test_utils.run_briefly(self.loop)
- inner.cancel()
- test_utils.run_briefly(self.loop)
- self.assertTrue(outer.cancelled())
-
- def test_shield_cancel_outer(self):
- inner = self.new_future(self.loop)
- outer = asyncio.shield(inner)
- test_utils.run_briefly(self.loop)
- outer.cancel()
- test_utils.run_briefly(self.loop)
- self.assertTrue(outer.cancelled())
- self.assertEqual(0, 0 if outer._callbacks is None else len(outer._callbacks))
-
- def test_shield_shortcut(self):
- fut = self.new_future(self.loop)
- fut.set_result(42)
- res = self.loop.run_until_complete(asyncio.shield(fut))
- self.assertEqual(res, 42)
-
- def test_shield_effect(self):
- # Cancelling outer() does not affect inner().
- proof = 0
- waiter = self.new_future(self.loop)
-
- async def inner():
- nonlocal proof
- await waiter
- proof += 1
-
- async def outer():
- nonlocal proof
- await asyncio.shield(inner())
- proof += 100
-
- f = asyncio.ensure_future(outer(), loop=self.loop)
- test_utils.run_briefly(self.loop)
- f.cancel()
- with self.assertRaises(asyncio.CancelledError):
- self.loop.run_until_complete(f)
- waiter.set_result(None)
- test_utils.run_briefly(self.loop)
- self.assertEqual(proof, 1)
-
- def test_shield_gather(self):
- child1 = self.new_future(self.loop)
- child2 = self.new_future(self.loop)
- parent = asyncio.gather(child1, child2)
- outer = asyncio.shield(parent)
- test_utils.run_briefly(self.loop)
- outer.cancel()
- test_utils.run_briefly(self.loop)
- self.assertTrue(outer.cancelled())
- child1.set_result(1)
- child2.set_result(2)
- test_utils.run_briefly(self.loop)
- self.assertEqual(parent.result(), [1, 2])
-
- def test_gather_shield(self):
- child1 = self.new_future(self.loop)
- child2 = self.new_future(self.loop)
- inner1 = asyncio.shield(child1)
- inner2 = asyncio.shield(child2)
- parent = asyncio.gather(inner1, inner2)
- test_utils.run_briefly(self.loop)
- parent.cancel()
- # This should cancel inner1 and inner2 but bot child1 and child2.
- test_utils.run_briefly(self.loop)
- self.assertIsInstance(parent.exception(), asyncio.CancelledError)
- self.assertTrue(inner1.cancelled())
- self.assertTrue(inner2.cancelled())
- child1.set_result(1)
- child2.set_result(2)
- test_utils.run_briefly(self.loop)
-
- def test_as_completed_invalid_args(self):
- fut = self.new_future(self.loop)
-
- # as_completed() expects a list of futures, not a future instance
- self.assertRaises(TypeError, self.loop.run_until_complete,
- asyncio.as_completed(fut, loop=self.loop))
- coro = coroutine_function()
- self.assertRaises(TypeError, self.loop.run_until_complete,
- asyncio.as_completed(coro, loop=self.loop))
- coro.close()
-
- def test_wait_invalid_args(self):
- fut = self.new_future(self.loop)
-
- # wait() expects a list of futures, not a future instance
- self.assertRaises(TypeError, self.loop.run_until_complete,
- asyncio.wait(fut))
- coro = coroutine_function()
- self.assertRaises(TypeError, self.loop.run_until_complete,
- asyncio.wait(coro))
- coro.close()
-
- # wait() expects at least a future
- self.assertRaises(ValueError, self.loop.run_until_complete,
- asyncio.wait([]))
-
- def test_corowrapper_mocks_generator(self):
-
- def check():
- # A function that asserts various things.
- # Called twice, with different debug flag values.
-
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def coro():
- # The actual coroutine.
- self.assertTrue(gen.gi_running)
- yield from fut
-
- # A completed Future used to run the coroutine.
- fut = self.new_future(self.loop)
- fut.set_result(None)
-
- # Call the coroutine.
- gen = coro()
-
- # Check some properties.
- self.assertTrue(asyncio.iscoroutine(gen))
- self.assertIsInstance(gen.gi_frame, types.FrameType)
- self.assertFalse(gen.gi_running)
- self.assertIsInstance(gen.gi_code, types.CodeType)
-
- # Run it.
- self.loop.run_until_complete(gen)
-
- # The frame should have changed.
- self.assertIsNone(gen.gi_frame)
-
- # Test with debug flag cleared.
- with set_coroutine_debug(False):
- check()
-
- # Test with debug flag set.
- with set_coroutine_debug(True):
- check()
-
- def test_yield_from_corowrapper(self):
- with set_coroutine_debug(True):
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def t1():
- return (yield from t2())
-
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def t2():
- f = self.new_future(self.loop)
- self.new_task(self.loop, t3(f))
- return (yield from f)
-
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def t3(f):
- f.set_result((1, 2, 3))
-
- task = self.new_task(self.loop, t1())
- val = self.loop.run_until_complete(task)
- self.assertEqual(val, (1, 2, 3))
-
- def test_yield_from_corowrapper_send(self):
- def foo():
- a = yield
- return a
-
- def call(arg):
- cw = asyncio.coroutines.CoroWrapper(foo())
- cw.send(None)
- try:
- cw.send(arg)
- except StopIteration as ex:
- return ex.args[0]
- else:
- raise AssertionError('StopIteration was expected')
-
- self.assertEqual(call((1, 2)), (1, 2))
- self.assertEqual(call('spam'), 'spam')
-
- def test_corowrapper_weakref(self):
- wd = weakref.WeakValueDictionary()
- def foo(): yield from []
- cw = asyncio.coroutines.CoroWrapper(foo())
- wd['cw'] = cw # Would fail without __weakref__ slot.
- cw.gen = None # Suppress warning from __del__.
-
- def test_corowrapper_throw(self):
- # Issue 429: CoroWrapper.throw must be compatible with gen.throw
- def foo():
- value = None
- while True:
- try:
- value = yield value
- except Exception as e:
- value = e
-
- exception = Exception("foo")
- cw = asyncio.coroutines.CoroWrapper(foo())
- cw.send(None)
- self.assertIs(exception, cw.throw(exception))
-
- cw = asyncio.coroutines.CoroWrapper(foo())
- cw.send(None)
- self.assertIs(exception, cw.throw(Exception, exception))
-
- cw = asyncio.coroutines.CoroWrapper(foo())
- cw.send(None)
- exception = cw.throw(Exception, "foo")
- self.assertIsInstance(exception, Exception)
- self.assertEqual(exception.args, ("foo", ))
-
- cw = asyncio.coroutines.CoroWrapper(foo())
- cw.send(None)
- exception = cw.throw(Exception, "foo", None)
- self.assertIsInstance(exception, Exception)
- self.assertEqual(exception.args, ("foo", ))
-
- def test_all_tasks_deprecated(self):
- Task = self.__class__.Task
-
- async def coro():
- with self.assertWarns(DeprecationWarning):
- assert Task.all_tasks(self.loop) == {t}
-
- t = self.new_task(self.loop, coro())
- self.loop.run_until_complete(t)
-
- def test_log_destroyed_pending_task(self):
- Task = self.__class__.Task
-
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def kill_me(loop):
- future = self.new_future(loop)
- yield from future
- # at this point, the only reference to kill_me() task is
- # the Task._wakeup() method in future._callbacks
- raise Exception("code never reached")
-
- mock_handler = mock.Mock()
- self.loop.set_debug(True)
- self.loop.set_exception_handler(mock_handler)
-
- # schedule the task
- coro = kill_me(self.loop)
- task = asyncio.ensure_future(coro, loop=self.loop)
-
- self.assertEqual(asyncio.all_tasks(loop=self.loop), {task})
-
- # See http://bugs.python.org/issue29271 for details:
- asyncio.set_event_loop(self.loop)
- try:
- with self.assertWarns(DeprecationWarning):
- self.assertEqual(Task.all_tasks(), {task})
- with self.assertWarns(DeprecationWarning):
- self.assertEqual(Task.all_tasks(None), {task})
- finally:
- asyncio.set_event_loop(None)
-
- # execute the task so it waits for future
- self.loop._run_once()
- self.assertEqual(len(self.loop._ready), 0)
-
- # remove the future used in kill_me(), and references to the task
- del coro.gi_frame.f_locals['future']
- coro = None
- source_traceback = task._source_traceback
- task = None
-
- # no more reference to kill_me() task: the task is destroyed by the GC
- support.gc_collect()
-
- self.assertEqual(asyncio.all_tasks(loop=self.loop), set())
-
- mock_handler.assert_called_with(self.loop, {
- 'message': 'Task was destroyed but it is pending!',
- 'task': mock.ANY,
- 'source_traceback': source_traceback,
- })
- mock_handler.reset_mock()
-
- @mock.patch('asyncio.base_events.logger')
- def test_tb_logger_not_called_after_cancel(self, m_log):
- loop = asyncio.new_event_loop()
- self.set_event_loop(loop)
-
- async def coro():
- raise TypeError
-
- async def runner():
- task = self.new_task(loop, coro())
- await asyncio.sleep(0.05)
- task.cancel()
- task = None
-
- loop.run_until_complete(runner())
- self.assertFalse(m_log.error.called)
-
- @mock.patch('asyncio.coroutines.logger')
- def test_coroutine_never_yielded(self, m_log):
- with set_coroutine_debug(True):
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def coro_noop():
- pass
-
- tb_filename = __file__
- tb_lineno = sys._getframe().f_lineno + 2
- # create a coroutine object but don't use it
- coro_noop()
- support.gc_collect()
-
- self.assertTrue(m_log.error.called)
- message = m_log.error.call_args[0][0]
- func_filename, func_lineno = test_utils.get_function_source(coro_noop)
-
- regex = (r'^<CoroWrapper %s\(?\)? .* at %s:%s, .*> '
- r'was never yielded from\n'
- r'Coroutine object created at \(most recent call last, truncated to \d+ last lines\):\n'
- r'.*\n'
- r' File "%s", line %s, in test_coroutine_never_yielded\n'
- r' coro_noop\(\)$'
- % (re.escape(coro_noop.__qualname__),
- re.escape(func_filename), func_lineno,
- re.escape(tb_filename), tb_lineno))
-
- self.assertRegex(message, re.compile(regex, re.DOTALL))
-
- def test_return_coroutine_from_coroutine(self):
- """Return of @asyncio.coroutine()-wrapped function generator object
- from @asyncio.coroutine()-wrapped function should have same effect as
- returning generator object or Future."""
- def check():
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def outer_coro():
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def inner_coro():
- return 1
-
- return inner_coro()
-
- result = self.loop.run_until_complete(outer_coro())
- self.assertEqual(result, 1)
-
- # Test with debug flag cleared.
- with set_coroutine_debug(False):
- check()
-
- # Test with debug flag set.
- with set_coroutine_debug(True):
- check()
-
- def test_task_source_traceback(self):
- self.loop.set_debug(True)
-
- task = self.new_task(self.loop, coroutine_function())
- lineno = sys._getframe().f_lineno - 1
- self.assertIsInstance(task._source_traceback, list)
- self.assertEqual(task._source_traceback[-2][:3],
- (__file__,
- lineno,
- 'test_task_source_traceback'))
- self.loop.run_until_complete(task)
-
- def _test_cancel_wait_for(self, timeout):
- loop = asyncio.new_event_loop()
- self.addCleanup(loop.close)
-
- async def blocking_coroutine():
- fut = self.new_future(loop)
- # Block: fut result is never set
- await fut
-
- task = loop.create_task(blocking_coroutine())
-
- wait = loop.create_task(asyncio.wait_for(task, timeout))
- loop.call_soon(wait.cancel)
-
- self.assertRaises(asyncio.CancelledError,
- loop.run_until_complete, wait)
-
- # Python issue #23219: cancelling the wait must also cancel the task
- self.assertTrue(task.cancelled())
-
- def test_cancel_blocking_wait_for(self):
- self._test_cancel_wait_for(None)
-
- def test_cancel_wait_for(self):
- self._test_cancel_wait_for(60.0)
-
- def test_cancel_gather_1(self):
- """Ensure that a gathering future refuses to be cancelled once all
- children are done"""
- loop = asyncio.new_event_loop()
- self.addCleanup(loop.close)
-
- fut = self.new_future(loop)
- # The indirection fut->child_coro is needed since otherwise the
- # gathering task is done at the same time as the child future
- def child_coro():
- return (yield from fut)
- gather_future = asyncio.gather(child_coro(), loop=loop)
- gather_task = asyncio.ensure_future(gather_future, loop=loop)
-
- cancel_result = None
- def cancelling_callback(_):
- nonlocal cancel_result
- cancel_result = gather_task.cancel()
- fut.add_done_callback(cancelling_callback)
-
- fut.set_result(42) # calls the cancelling_callback after fut is done()
-
- # At this point the task should complete.
- loop.run_until_complete(gather_task)
-
- # Python issue #26923: asyncio.gather drops cancellation
- self.assertEqual(cancel_result, False)
- self.assertFalse(gather_task.cancelled())
- self.assertEqual(gather_task.result(), [42])
-
- def test_cancel_gather_2(self):
- loop = asyncio.new_event_loop()
- self.addCleanup(loop.close)
-
- async def test():
- time = 0
- while True:
- time += 0.05
- await asyncio.gather(asyncio.sleep(0.05),
- return_exceptions=True,
- loop=loop)
- if time > 1:
- return
-
- async def main():
- qwe = self.new_task(loop, test())
- await asyncio.sleep(0.2)
- qwe.cancel()
- try:
- await qwe
- except asyncio.CancelledError:
- pass
- else:
- self.fail('gather did not propagate the cancellation request')
-
- loop.run_until_complete(main())
-
- def test_exception_traceback(self):
- # See http://bugs.python.org/issue28843
-
- async def foo():
- 1 / 0
-
- async def main():
- task = self.new_task(self.loop, foo())
- await asyncio.sleep(0) # skip one loop iteration
- self.assertIsNotNone(task.exception().__traceback__)
-
- self.loop.run_until_complete(main())
-
- @mock.patch('asyncio.base_events.logger')
- def test_error_in_call_soon(self, m_log):
- def call_soon(callback, *args, **kwargs):
- raise ValueError
- self.loop.call_soon = call_soon
-
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def coro():
- pass
-
- self.assertFalse(m_log.error.called)
-
- with self.assertRaises(ValueError):
- gen = coro()
- try:
- self.new_task(self.loop, gen)
- finally:
- gen.close()
-
- self.assertTrue(m_log.error.called)
- message = m_log.error.call_args[0][0]
- self.assertIn('Task was destroyed but it is pending', message)
-
- self.assertEqual(asyncio.all_tasks(self.loop), set())
-
- def test_create_task_with_noncoroutine(self):
- with self.assertRaisesRegex(TypeError,
- "a coroutine was expected, got 123"):
- self.new_task(self.loop, 123)
-
- # test it for the second time to ensure that caching
- # in asyncio.iscoroutine() doesn't break things.
- with self.assertRaisesRegex(TypeError,
- "a coroutine was expected, got 123"):
- self.new_task(self.loop, 123)
-
- def test_create_task_with_oldstyle_coroutine(self):
-
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def coro():
- pass
-
- task = self.new_task(self.loop, coro())
- self.assertIsInstance(task, self.Task)
- self.loop.run_until_complete(task)
-
- # test it for the second time to ensure that caching
- # in asyncio.iscoroutine() doesn't break things.
- task = self.new_task(self.loop, coro())
- self.assertIsInstance(task, self.Task)
- self.loop.run_until_complete(task)
-
- def test_create_task_with_async_function(self):
-
- async def coro():
- pass
-
- task = self.new_task(self.loop, coro())
- self.assertIsInstance(task, self.Task)
- self.loop.run_until_complete(task)
-
- # test it for the second time to ensure that caching
- # in asyncio.iscoroutine() doesn't break things.
- task = self.new_task(self.loop, coro())
- self.assertIsInstance(task, self.Task)
- self.loop.run_until_complete(task)
-
- def test_create_task_with_asynclike_function(self):
- task = self.new_task(self.loop, CoroLikeObject())
- self.assertIsInstance(task, self.Task)
- self.assertEqual(self.loop.run_until_complete(task), 42)
-
- # test it for the second time to ensure that caching
- # in asyncio.iscoroutine() doesn't break things.
- task = self.new_task(self.loop, CoroLikeObject())
- self.assertIsInstance(task, self.Task)
- self.assertEqual(self.loop.run_until_complete(task), 42)
-
- def test_bare_create_task(self):
-
- async def inner():
- return 1
-
- async def coro():
- task = asyncio.create_task(inner())
- self.assertIsInstance(task, self.Task)
- ret = await task
- self.assertEqual(1, ret)
-
- self.loop.run_until_complete(coro())
-
- def test_bare_create_named_task(self):
-
- async def coro_noop():
- pass
-
- async def coro():
- task = asyncio.create_task(coro_noop(), name='No-op')
- self.assertEqual(task.get_name(), 'No-op')
- await task
-
- self.loop.run_until_complete(coro())
-
- def test_context_1(self):
- cvar = contextvars.ContextVar('cvar', default='nope')
-
- async def sub():
- await asyncio.sleep(0.01)
- self.assertEqual(cvar.get(), 'nope')
- cvar.set('something else')
-
- async def main():
- self.assertEqual(cvar.get(), 'nope')
- subtask = self.new_task(loop, sub())
- cvar.set('yes')
- self.assertEqual(cvar.get(), 'yes')
- await subtask
- self.assertEqual(cvar.get(), 'yes')
-
- loop = asyncio.new_event_loop()
- try:
- task = self.new_task(loop, main())
- loop.run_until_complete(task)
- finally:
- loop.close()
-
- def test_context_2(self):
- cvar = contextvars.ContextVar('cvar', default='nope')
-
- async def main():
- def fut_on_done(fut):
- # This change must not pollute the context
- # of the "main()" task.
- cvar.set('something else')
-
- self.assertEqual(cvar.get(), 'nope')
-
- for j in range(2):
- fut = self.new_future(loop)
- fut.add_done_callback(fut_on_done)
- cvar.set(f'yes{j}')
- loop.call_soon(fut.set_result, None)
- await fut
- self.assertEqual(cvar.get(), f'yes{j}')
-
- for i in range(3):
- # Test that task passed its context to add_done_callback:
- cvar.set(f'yes{i}-{j}')
- await asyncio.sleep(0.001)
- self.assertEqual(cvar.get(), f'yes{i}-{j}')
-
- loop = asyncio.new_event_loop()
- try:
- task = self.new_task(loop, main())
- loop.run_until_complete(task)
- finally:
- loop.close()
-
- self.assertEqual(cvar.get(), 'nope')
-
- def test_context_3(self):
- # Run 100 Tasks in parallel, each modifying cvar.
-
- cvar = contextvars.ContextVar('cvar', default=-1)
-
- async def sub(num):
- for i in range(10):
- cvar.set(num + i)
- await asyncio.sleep(random.uniform(0.001, 0.05))
- self.assertEqual(cvar.get(), num + i)
-
- async def main():
- tasks = []
- for i in range(100):
- task = loop.create_task(sub(random.randint(0, 10)))
- tasks.append(task)
-
- await asyncio.gather(*tasks, loop=loop)
-
- loop = asyncio.new_event_loop()
- try:
- loop.run_until_complete(main())
- finally:
- loop.close()
-
- self.assertEqual(cvar.get(), -1)
-
- def test_get_coro(self):
- loop = asyncio.new_event_loop()
- coro = coroutine_function()
- try:
- task = self.new_task(loop, coro)
- loop.run_until_complete(task)
- self.assertIs(task.get_coro(), coro)
- finally:
- loop.close()
-
-
-def add_subclass_tests(cls):
- BaseTask = cls.Task
- BaseFuture = cls.Future
-
- if BaseTask is None or BaseFuture is None:
- return cls
-
- class CommonFuture:
- def __init__(self, *args, **kwargs):
- self.calls = collections.defaultdict(lambda: 0)
- super().__init__(*args, **kwargs)
-
- def add_done_callback(self, *args, **kwargs):
- self.calls['add_done_callback'] += 1
- return super().add_done_callback(*args, **kwargs)
-
- class Task(CommonFuture, BaseTask):
- pass
-
- class Future(CommonFuture, BaseFuture):
- pass
-
- def test_subclasses_ctask_cfuture(self):
- fut = self.Future(loop=self.loop)
-
- async def func():
- self.loop.call_soon(lambda: fut.set_result('spam'))
- return await fut
-
- task = self.Task(func(), loop=self.loop)
-
- result = self.loop.run_until_complete(task)
-
- self.assertEqual(result, 'spam')
-
- self.assertEqual(
- dict(task.calls),
- {'add_done_callback': 1})
-
- self.assertEqual(
- dict(fut.calls),
- {'add_done_callback': 1})
-
- # Add patched Task & Future back to the test case
- cls.Task = Task
- cls.Future = Future
-
- # Add an extra unit-test
- cls.test_subclasses_ctask_cfuture = test_subclasses_ctask_cfuture
-
- # Disable the "test_task_source_traceback" test
- # (the test is hardcoded for a particular call stack, which
- # is slightly different for Task subclasses)
- cls.test_task_source_traceback = None
-
- return cls
-
-
-class SetMethodsTest:
-
- def test_set_result_causes_invalid_state(self):
- Future = type(self).Future
- self.loop.call_exception_handler = exc_handler = mock.Mock()
-
- async def foo():
- await asyncio.sleep(0.1)
- return 10
-
- coro = foo()
- task = self.new_task(self.loop, coro)
- Future.set_result(task, 'spam')
-
- self.assertEqual(
- self.loop.run_until_complete(task),
- 'spam')
-
- exc_handler.assert_called_once()
- exc = exc_handler.call_args[0][0]['exception']
- with self.assertRaisesRegex(asyncio.InvalidStateError,
- r'step\(\): already done'):
- raise exc
-
- coro.close()
-
- def test_set_exception_causes_invalid_state(self):
- class MyExc(Exception):
- pass
-
- Future = type(self).Future
- self.loop.call_exception_handler = exc_handler = mock.Mock()
-
- async def foo():
- await asyncio.sleep(0.1)
- return 10
-
- coro = foo()
- task = self.new_task(self.loop, coro)
- Future.set_exception(task, MyExc())
-
- with self.assertRaises(MyExc):
- self.loop.run_until_complete(task)
-
- exc_handler.assert_called_once()
- exc = exc_handler.call_args[0][0]['exception']
- with self.assertRaisesRegex(asyncio.InvalidStateError,
- r'step\(\): already done'):
- raise exc
-
- coro.close()
-
-
-@unittest.skipUnless(hasattr(futures, '_CFuture') and
- hasattr(tasks, '_CTask'),
- 'requires the C _asyncio module')
-class CTask_CFuture_Tests(BaseTaskTests, SetMethodsTest,
- test_utils.TestCase):
-
- Task = getattr(tasks, '_CTask', None)
- Future = getattr(futures, '_CFuture', None)
-
- @support.refcount_test
- def test_refleaks_in_task___init__(self):
- gettotalrefcount = support.get_attribute(sys, 'gettotalrefcount')
- async def coro():
- pass
- task = self.new_task(self.loop, coro())
- self.loop.run_until_complete(task)
- refs_before = gettotalrefcount()
- for i in range(100):
- task.__init__(coro(), loop=self.loop)
- self.loop.run_until_complete(task)
- self.assertAlmostEqual(gettotalrefcount() - refs_before, 0, delta=10)
-
- def test_del__log_destroy_pending_segfault(self):
- async def coro():
- pass
- task = self.new_task(self.loop, coro())
- self.loop.run_until_complete(task)
- with self.assertRaises(AttributeError):
- del task._log_destroy_pending
-
-
-@unittest.skipUnless(hasattr(futures, '_CFuture') and
- hasattr(tasks, '_CTask'),
- 'requires the C _asyncio module')
-@add_subclass_tests
-class CTask_CFuture_SubclassTests(BaseTaskTests, test_utils.TestCase):
-
- Task = getattr(tasks, '_CTask', None)
- Future = getattr(futures, '_CFuture', None)
-
-
-@unittest.skipUnless(hasattr(tasks, '_CTask'),
- 'requires the C _asyncio module')
-@add_subclass_tests
-class CTaskSubclass_PyFuture_Tests(BaseTaskTests, test_utils.TestCase):
-
- Task = getattr(tasks, '_CTask', None)
- Future = futures._PyFuture
-
-
-@unittest.skipUnless(hasattr(futures, '_CFuture'),
- 'requires the C _asyncio module')
-@add_subclass_tests
-class PyTask_CFutureSubclass_Tests(BaseTaskTests, test_utils.TestCase):
-
- Future = getattr(futures, '_CFuture', None)
- Task = tasks._PyTask
-
-
-@unittest.skipUnless(hasattr(tasks, '_CTask'),
- 'requires the C _asyncio module')
-class CTask_PyFuture_Tests(BaseTaskTests, test_utils.TestCase):
-
- Task = getattr(tasks, '_CTask', None)
- Future = futures._PyFuture
-
-
-@unittest.skipUnless(hasattr(futures, '_CFuture'),
- 'requires the C _asyncio module')
-class PyTask_CFuture_Tests(BaseTaskTests, test_utils.TestCase):
-
- Task = tasks._PyTask
- Future = getattr(futures, '_CFuture', None)
-
-
-class PyTask_PyFuture_Tests(BaseTaskTests, SetMethodsTest,
- test_utils.TestCase):
-
- Task = tasks._PyTask
- Future = futures._PyFuture
-
-
-@add_subclass_tests
-class PyTask_PyFuture_SubclassTests(BaseTaskTests, test_utils.TestCase):
- Task = tasks._PyTask
- Future = futures._PyFuture
-
-
-@unittest.skipUnless(hasattr(tasks, '_CTask'),
- 'requires the C _asyncio module')
-class CTask_Future_Tests(test_utils.TestCase):
-
- def test_foobar(self):
- class Fut(asyncio.Future):
- @property
- def get_loop(self):
- raise AttributeError
-
- async def coro():
- await fut
- return 'spam'
-
- self.loop = asyncio.new_event_loop()
- try:
- fut = Fut(loop=self.loop)
- self.loop.call_later(0.1, fut.set_result, 1)
- task = self.loop.create_task(coro())
- res = self.loop.run_until_complete(task)
- finally:
- self.loop.close()
-
- self.assertEqual(res, 'spam')
-
-
-class BaseTaskIntrospectionTests:
- _register_task = None
- _unregister_task = None
- _enter_task = None
- _leave_task = None
-
- def test__register_task_1(self):
- class TaskLike:
- @property
- def _loop(self):
- return loop
-
- def done(self):
- return False
-
- task = TaskLike()
- loop = mock.Mock()
-
- self.assertEqual(asyncio.all_tasks(loop), set())
- self._register_task(task)
- self.assertEqual(asyncio.all_tasks(loop), {task})
- self._unregister_task(task)
-
- def test__register_task_2(self):
- class TaskLike:
- def get_loop(self):
- return loop
-
- def done(self):
- return False
-
- task = TaskLike()
- loop = mock.Mock()
-
- self.assertEqual(asyncio.all_tasks(loop), set())
- self._register_task(task)
- self.assertEqual(asyncio.all_tasks(loop), {task})
- self._unregister_task(task)
-
- def test__register_task_3(self):
- class TaskLike:
- def get_loop(self):
- return loop
-
- def done(self):
- return True
-
- task = TaskLike()
- loop = mock.Mock()
-
- self.assertEqual(asyncio.all_tasks(loop), set())
- self._register_task(task)
- self.assertEqual(asyncio.all_tasks(loop), set())
- with self.assertWarns(DeprecationWarning):
- self.assertEqual(asyncio.Task.all_tasks(loop), {task})
- self._unregister_task(task)
-
- def test__enter_task(self):
- task = mock.Mock()
- loop = mock.Mock()
- self.assertIsNone(asyncio.current_task(loop))
- self._enter_task(loop, task)
- self.assertIs(asyncio.current_task(loop), task)
- self._leave_task(loop, task)
-
- def test__enter_task_failure(self):
- task1 = mock.Mock()
- task2 = mock.Mock()
- loop = mock.Mock()
- self._enter_task(loop, task1)
- with self.assertRaises(RuntimeError):
- self._enter_task(loop, task2)
- self.assertIs(asyncio.current_task(loop), task1)
- self._leave_task(loop, task1)
-
- def test__leave_task(self):
- task = mock.Mock()
- loop = mock.Mock()
- self._enter_task(loop, task)
- self._leave_task(loop, task)
- self.assertIsNone(asyncio.current_task(loop))
-
- def test__leave_task_failure1(self):
- task1 = mock.Mock()
- task2 = mock.Mock()
- loop = mock.Mock()
- self._enter_task(loop, task1)
- with self.assertRaises(RuntimeError):
- self._leave_task(loop, task2)
- self.assertIs(asyncio.current_task(loop), task1)
- self._leave_task(loop, task1)
-
- def test__leave_task_failure2(self):
- task = mock.Mock()
- loop = mock.Mock()
- with self.assertRaises(RuntimeError):
- self._leave_task(loop, task)
- self.assertIsNone(asyncio.current_task(loop))
-
- def test__unregister_task(self):
- task = mock.Mock()
- loop = mock.Mock()
- task.get_loop = lambda: loop
- self._register_task(task)
- self._unregister_task(task)
- self.assertEqual(asyncio.all_tasks(loop), set())
-
- def test__unregister_task_not_registered(self):
- task = mock.Mock()
- loop = mock.Mock()
- self._unregister_task(task)
- self.assertEqual(asyncio.all_tasks(loop), set())
-
-
-class PyIntrospectionTests(test_utils.TestCase, BaseTaskIntrospectionTests):
- _register_task = staticmethod(tasks._py_register_task)
- _unregister_task = staticmethod(tasks._py_unregister_task)
- _enter_task = staticmethod(tasks._py_enter_task)
- _leave_task = staticmethod(tasks._py_leave_task)
-
-
-@unittest.skipUnless(hasattr(tasks, '_c_register_task'),
- 'requires the C _asyncio module')
-class CIntrospectionTests(test_utils.TestCase, BaseTaskIntrospectionTests):
- if hasattr(tasks, '_c_register_task'):
- _register_task = staticmethod(tasks._c_register_task)
- _unregister_task = staticmethod(tasks._c_unregister_task)
- _enter_task = staticmethod(tasks._c_enter_task)
- _leave_task = staticmethod(tasks._c_leave_task)
- else:
- _register_task = _unregister_task = _enter_task = _leave_task = None
-
-
-class BaseCurrentLoopTests:
-
- def setUp(self):
- super().setUp()
- self.loop = asyncio.new_event_loop()
- self.set_event_loop(self.loop)
-
- def new_task(self, coro):
- raise NotImplementedError
-
- def test_current_task_no_running_loop(self):
- self.assertIsNone(asyncio.current_task(loop=self.loop))
-
- def test_current_task_no_running_loop_implicit(self):
- with self.assertRaises(RuntimeError):
- asyncio.current_task()
-
- def test_current_task_with_implicit_loop(self):
- async def coro():
- self.assertIs(asyncio.current_task(loop=self.loop), task)
-
- self.assertIs(asyncio.current_task(None), task)
- self.assertIs(asyncio.current_task(), task)
-
- task = self.new_task(coro())
- self.loop.run_until_complete(task)
- self.assertIsNone(asyncio.current_task(loop=self.loop))
-
-
-class PyCurrentLoopTests(BaseCurrentLoopTests, test_utils.TestCase):
-
- def new_task(self, coro):
- return tasks._PyTask(coro, loop=self.loop)
-
-
-@unittest.skipUnless(hasattr(tasks, '_CTask'),
- 'requires the C _asyncio module')
-class CCurrentLoopTests(BaseCurrentLoopTests, test_utils.TestCase):
-
- def new_task(self, coro):
- return getattr(tasks, '_CTask')(coro, loop=self.loop)
-
-
-class GenericTaskTests(test_utils.TestCase):
-
- def test_future_subclass(self):
- self.assertTrue(issubclass(asyncio.Task, asyncio.Future))
-
- def test_asyncio_module_compiled(self):
- # Because of circular imports it's easy to make _asyncio
- # module non-importable. This is a simple test that will
- # fail on systems where C modules were successfully compiled
- # (hence the test for _functools), but _asyncio somehow didn't.
- try:
- import _functools
- except ImportError:
- pass
- else:
- try:
- import _asyncio
- except ImportError:
- self.fail('_asyncio module is missing')
-
-
-class GatherTestsBase:
-
- def setUp(self):
- super().setUp()
- self.one_loop = self.new_test_loop()
- self.other_loop = self.new_test_loop()
- self.set_event_loop(self.one_loop, cleanup=False)
-
- def _run_loop(self, loop):
- while loop._ready:
- test_utils.run_briefly(loop)
-
- def _check_success(self, **kwargs):
- a, b, c = [self.one_loop.create_future() for i in range(3)]
- fut = asyncio.gather(*self.wrap_futures(a, b, c), **kwargs)
- cb = test_utils.MockCallback()
- fut.add_done_callback(cb)
- b.set_result(1)
- a.set_result(2)
- self._run_loop(self.one_loop)
- self.assertEqual(cb.called, False)
- self.assertFalse(fut.done())
- c.set_result(3)
- self._run_loop(self.one_loop)
- cb.assert_called_once_with(fut)
- self.assertEqual(fut.result(), [2, 1, 3])
-
- def test_success(self):
- self._check_success()
- self._check_success(return_exceptions=False)
-
- def test_result_exception_success(self):
- self._check_success(return_exceptions=True)
-
- def test_one_exception(self):
- a, b, c, d, e = [self.one_loop.create_future() for i in range(5)]
- fut = asyncio.gather(*self.wrap_futures(a, b, c, d, e))
- cb = test_utils.MockCallback()
- fut.add_done_callback(cb)
- exc = ZeroDivisionError()
- a.set_result(1)
- b.set_exception(exc)
- self._run_loop(self.one_loop)
- self.assertTrue(fut.done())
- cb.assert_called_once_with(fut)
- self.assertIs(fut.exception(), exc)
- # Does nothing
- c.set_result(3)
- d.cancel()
- e.set_exception(RuntimeError())
- e.exception()
-
- def test_return_exceptions(self):
- a, b, c, d = [self.one_loop.create_future() for i in range(4)]
- fut = asyncio.gather(*self.wrap_futures(a, b, c, d),
- return_exceptions=True)
- cb = test_utils.MockCallback()
- fut.add_done_callback(cb)
- exc = ZeroDivisionError()
- exc2 = RuntimeError()
- b.set_result(1)
- c.set_exception(exc)
- a.set_result(3)
- self._run_loop(self.one_loop)
- self.assertFalse(fut.done())
- d.set_exception(exc2)
- self._run_loop(self.one_loop)
- self.assertTrue(fut.done())
- cb.assert_called_once_with(fut)
- self.assertEqual(fut.result(), [3, 1, exc, exc2])
-
- def test_env_var_debug(self):
- code = '\n'.join((
- 'import asyncio.coroutines',
- 'print(asyncio.coroutines._DEBUG)'))
-
- # Test with -E to not fail if the unit test was run with
- # PYTHONASYNCIODEBUG set to a non-empty string
- sts, stdout, stderr = assert_python_ok('-E', '-c', code)
- self.assertEqual(stdout.rstrip(), b'False')
-
- sts, stdout, stderr = assert_python_ok('-c', code,
- PYTHONASYNCIODEBUG='',
- PYTHONDEVMODE='')
- self.assertEqual(stdout.rstrip(), b'False')
-
- sts, stdout, stderr = assert_python_ok('-c', code,
- PYTHONASYNCIODEBUG='1',
- PYTHONDEVMODE='')
- self.assertEqual(stdout.rstrip(), b'True')
-
- sts, stdout, stderr = assert_python_ok('-E', '-c', code,
- PYTHONASYNCIODEBUG='1',
- PYTHONDEVMODE='')
- self.assertEqual(stdout.rstrip(), b'False')
-
- # -X dev
- sts, stdout, stderr = assert_python_ok('-E', '-X', 'dev',
- '-c', code)
- self.assertEqual(stdout.rstrip(), b'True')
-
-
-class FutureGatherTests(GatherTestsBase, test_utils.TestCase):
-
- def wrap_futures(self, *futures):
- return futures
-
- def _check_empty_sequence(self, seq_or_iter):
- asyncio.set_event_loop(self.one_loop)
- self.addCleanup(asyncio.set_event_loop, None)
- fut = asyncio.gather(*seq_or_iter)
- self.assertIsInstance(fut, asyncio.Future)
- self.assertIs(fut._loop, self.one_loop)
- self._run_loop(self.one_loop)
- self.assertTrue(fut.done())
- self.assertEqual(fut.result(), [])
- with self.assertWarns(DeprecationWarning):
- fut = asyncio.gather(*seq_or_iter, loop=self.other_loop)
- self.assertIs(fut._loop, self.other_loop)
-
- def test_constructor_empty_sequence(self):
- self._check_empty_sequence([])
- self._check_empty_sequence(())
- self._check_empty_sequence(set())
- self._check_empty_sequence(iter(""))
-
- def test_constructor_heterogenous_futures(self):
- fut1 = self.one_loop.create_future()
- fut2 = self.other_loop.create_future()
- with self.assertRaises(ValueError):
- asyncio.gather(fut1, fut2)
- with self.assertRaises(ValueError):
- asyncio.gather(fut1, loop=self.other_loop)
-
- def test_constructor_homogenous_futures(self):
- children = [self.other_loop.create_future() for i in range(3)]
- fut = asyncio.gather(*children)
- self.assertIs(fut._loop, self.other_loop)
- self._run_loop(self.other_loop)
- self.assertFalse(fut.done())
- fut = asyncio.gather(*children, loop=self.other_loop)
- self.assertIs(fut._loop, self.other_loop)
- self._run_loop(self.other_loop)
- self.assertFalse(fut.done())
-
- def test_one_cancellation(self):
- a, b, c, d, e = [self.one_loop.create_future() for i in range(5)]
- fut = asyncio.gather(a, b, c, d, e)
- cb = test_utils.MockCallback()
- fut.add_done_callback(cb)
- a.set_result(1)
- b.cancel()
- self._run_loop(self.one_loop)
- self.assertTrue(fut.done())
- cb.assert_called_once_with(fut)
- self.assertFalse(fut.cancelled())
- self.assertIsInstance(fut.exception(), asyncio.CancelledError)
- # Does nothing
- c.set_result(3)
- d.cancel()
- e.set_exception(RuntimeError())
- e.exception()
-
- def test_result_exception_one_cancellation(self):
- a, b, c, d, e, f = [self.one_loop.create_future()
- for i in range(6)]
- fut = asyncio.gather(a, b, c, d, e, f, return_exceptions=True)
- cb = test_utils.MockCallback()
- fut.add_done_callback(cb)
- a.set_result(1)
- zde = ZeroDivisionError()
- b.set_exception(zde)
- c.cancel()
- self._run_loop(self.one_loop)
- self.assertFalse(fut.done())
- d.set_result(3)
- e.cancel()
- rte = RuntimeError()
- f.set_exception(rte)
- res = self.one_loop.run_until_complete(fut)
- self.assertIsInstance(res[2], asyncio.CancelledError)
- self.assertIsInstance(res[4], asyncio.CancelledError)
- res[2] = res[4] = None
- self.assertEqual(res, [1, zde, None, 3, None, rte])
- cb.assert_called_once_with(fut)
-
-
-class CoroutineGatherTests(GatherTestsBase, test_utils.TestCase):
-
- def setUp(self):
- super().setUp()
- asyncio.set_event_loop(self.one_loop)
-
- def wrap_futures(self, *futures):
- coros = []
- for fut in futures:
- async def coro(fut=fut):
- return await fut
- coros.append(coro())
- return coros
-
- def test_constructor_loop_selection(self):
- async def coro():
- return 'abc'
- gen1 = coro()
- gen2 = coro()
- fut = asyncio.gather(gen1, gen2)
- self.assertIs(fut._loop, self.one_loop)
- self.one_loop.run_until_complete(fut)
-
- self.set_event_loop(self.other_loop, cleanup=False)
- gen3 = coro()
- gen4 = coro()
- fut2 = asyncio.gather(gen3, gen4, loop=self.other_loop)
- self.assertIs(fut2._loop, self.other_loop)
- self.other_loop.run_until_complete(fut2)
-
- def test_duplicate_coroutines(self):
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def coro(s):
- return s
- c = coro('abc')
- fut = asyncio.gather(c, c, coro('def'), c, loop=self.one_loop)
- self._run_loop(self.one_loop)
- self.assertEqual(fut.result(), ['abc', 'abc', 'def', 'abc'])
-
- def test_cancellation_broadcast(self):
- # Cancelling outer() cancels all children.
- proof = 0
- waiter = self.one_loop.create_future()
-
- async def inner():
- nonlocal proof
- await waiter
- proof += 1
-
- child1 = asyncio.ensure_future(inner(), loop=self.one_loop)
- child2 = asyncio.ensure_future(inner(), loop=self.one_loop)
- gatherer = None
-
- async def outer():
- nonlocal proof, gatherer
- gatherer = asyncio.gather(child1, child2, loop=self.one_loop)
- await gatherer
- proof += 100
-
- f = asyncio.ensure_future(outer(), loop=self.one_loop)
- test_utils.run_briefly(self.one_loop)
- self.assertTrue(f.cancel())
- with self.assertRaises(asyncio.CancelledError):
- self.one_loop.run_until_complete(f)
- self.assertFalse(gatherer.cancel())
- self.assertTrue(waiter.cancelled())
- self.assertTrue(child1.cancelled())
- self.assertTrue(child2.cancelled())
- test_utils.run_briefly(self.one_loop)
- self.assertEqual(proof, 0)
-
- def test_exception_marking(self):
- # Test for the first line marked "Mark exception retrieved."
-
- async def inner(f):
- await f
- raise RuntimeError('should not be ignored')
-
- a = self.one_loop.create_future()
- b = self.one_loop.create_future()
-
- async def outer():
- await asyncio.gather(inner(a), inner(b), loop=self.one_loop)
-
- f = asyncio.ensure_future(outer(), loop=self.one_loop)
- test_utils.run_briefly(self.one_loop)
- a.set_result(None)
- test_utils.run_briefly(self.one_loop)
- b.set_result(None)
- test_utils.run_briefly(self.one_loop)
- self.assertIsInstance(f.exception(), RuntimeError)
-
-
-class RunCoroutineThreadsafeTests(test_utils.TestCase):
- """Test case for asyncio.run_coroutine_threadsafe."""
-
- def setUp(self):
- super().setUp()
- self.loop = asyncio.new_event_loop()
- self.set_event_loop(self.loop) # Will cleanup properly
-
- async def add(self, a, b, fail=False, cancel=False):
- """Wait 0.05 second and return a + b."""
- await asyncio.sleep(0.05)
- if fail:
- raise RuntimeError("Fail!")
- if cancel:
- asyncio.current_task(self.loop).cancel()
- await asyncio.sleep(0)
- return a + b
-
- def target(self, fail=False, cancel=False, timeout=None,
- advance_coro=False):
- """Run add coroutine in the event loop."""
- coro = self.add(1, 2, fail=fail, cancel=cancel)
- future = asyncio.run_coroutine_threadsafe(coro, self.loop)
- if advance_coro:
- # this is for test_run_coroutine_threadsafe_task_factory_exception;
- # otherwise it spills errors and breaks **other** unittests, since
- # 'target' is interacting with threads.
-
- # With this call, `coro` will be advanced, so that
- # CoroWrapper.__del__ won't do anything when asyncio tests run
- # in debug mode.
- self.loop.call_soon_threadsafe(coro.send, None)
- try:
- return future.result(timeout)
- finally:
- future.done() or future.cancel()
-
- def test_run_coroutine_threadsafe(self):
- """Test coroutine submission from a thread to an event loop."""
- future = self.loop.run_in_executor(None, self.target)
- result = self.loop.run_until_complete(future)
- self.assertEqual(result, 3)
-
- def test_run_coroutine_threadsafe_with_exception(self):
- """Test coroutine submission from a thread to an event loop
- when an exception is raised."""
- future = self.loop.run_in_executor(None, self.target, True)
- with self.assertRaises(RuntimeError) as exc_context:
- self.loop.run_until_complete(future)
- self.assertIn("Fail!", exc_context.exception.args)
-
- def test_run_coroutine_threadsafe_with_timeout(self):
- """Test coroutine submission from a thread to an event loop
- when a timeout is raised."""
- callback = lambda: self.target(timeout=0)
- future = self.loop.run_in_executor(None, callback)
- with self.assertRaises(asyncio.TimeoutError):
- self.loop.run_until_complete(future)
- test_utils.run_briefly(self.loop)
- # Check that there's no pending task (add has been cancelled)
- for task in asyncio.all_tasks(self.loop):
- self.assertTrue(task.done())
-
- def test_run_coroutine_threadsafe_task_cancelled(self):
- """Test coroutine submission from a tread to an event loop
- when the task is cancelled."""
- callback = lambda: self.target(cancel=True)
- future = self.loop.run_in_executor(None, callback)
- with self.assertRaises(asyncio.CancelledError):
- self.loop.run_until_complete(future)
-
- def test_run_coroutine_threadsafe_task_factory_exception(self):
- """Test coroutine submission from a tread to an event loop
- when the task factory raise an exception."""
-
- def task_factory(loop, coro):
- raise NameError
-
- run = self.loop.run_in_executor(
- None, lambda: self.target(advance_coro=True))
-
- # Set exception handler
- callback = test_utils.MockCallback()
- self.loop.set_exception_handler(callback)
-
- # Set corrupted task factory
- self.addCleanup(self.loop.set_task_factory,
- self.loop.get_task_factory())
- self.loop.set_task_factory(task_factory)
-
- # Run event loop
- with self.assertRaises(NameError) as exc_context:
- self.loop.run_until_complete(run)
-
- # Check exceptions
- self.assertEqual(len(callback.call_args_list), 1)
- (loop, context), kwargs = callback.call_args
- self.assertEqual(context['exception'], exc_context.exception)
-
-
-class SleepTests(test_utils.TestCase):
- def setUp(self):
- super().setUp()
- self.loop = asyncio.new_event_loop()
- self.set_event_loop(self.loop)
-
- def tearDown(self):
- self.loop.close()
- self.loop = None
- super().tearDown()
-
- def test_sleep_zero(self):
- result = 0
-
- def inc_result(num):
- nonlocal result
- result += num
-
- async def coro():
- self.loop.call_soon(inc_result, 1)
- self.assertEqual(result, 0)
- num = await asyncio.sleep(0, result=10)
- self.assertEqual(result, 1) # inc'ed by call_soon
- inc_result(num) # num should be 11
-
- self.loop.run_until_complete(coro())
- self.assertEqual(result, 11)
-
- def test_loop_argument_is_deprecated(self):
- # Remove test when loop argument is removed in Python 3.10
- with self.assertWarns(DeprecationWarning):
- self.loop.run_until_complete(asyncio.sleep(0.01, loop=self.loop))
-
-
-class WaitTests(test_utils.TestCase):
- def setUp(self):
- super().setUp()
- self.loop = asyncio.new_event_loop()
- self.set_event_loop(self.loop)
-
- def tearDown(self):
- self.loop.close()
- self.loop = None
- super().tearDown()
-
- def test_loop_argument_is_deprecated_in_wait(self):
- # Remove test when loop argument is removed in Python 3.10
- with self.assertWarns(DeprecationWarning):
- self.loop.run_until_complete(
- asyncio.wait([coroutine_function()], loop=self.loop))
-
- def test_loop_argument_is_deprecated_in_wait_for(self):
- # Remove test when loop argument is removed in Python 3.10
- with self.assertWarns(DeprecationWarning):
- self.loop.run_until_complete(
- asyncio.wait_for(coroutine_function(), 0.01, loop=self.loop))
-
-
-class CompatibilityTests(test_utils.TestCase):
- # Tests for checking a bridge between old-styled coroutines
- # and async/await syntax
-
- def setUp(self):
- super().setUp()
- self.loop = asyncio.new_event_loop()
- self.set_event_loop(self.loop)
-
- def tearDown(self):
- self.loop.close()
- self.loop = None
- super().tearDown()
-
- def test_yield_from_awaitable(self):
-
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def coro():
- yield from asyncio.sleep(0)
- return 'ok'
-
- result = self.loop.run_until_complete(coro())
- self.assertEqual('ok', result)
-
- def test_await_old_style_coro(self):
-
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def coro1():
- return 'ok1'
-
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def coro2():
- yield from asyncio.sleep(0)
- return 'ok2'
-
- async def inner():
- return await asyncio.gather(coro1(), coro2(), loop=self.loop)
-
- result = self.loop.run_until_complete(inner())
- self.assertEqual(['ok1', 'ok2'], result)
-
- def test_debug_mode_interop(self):
- # https://bugs.python.org/issue32636
- code = textwrap.dedent("""
- import asyncio
-
- async def native_coro():
- pass
-
- @asyncio.coroutine
- def old_style_coro():
- yield from native_coro()
-
- asyncio.run(old_style_coro())
- """)
-
- assert_python_ok("-Wignore::DeprecationWarning", "-c", code,
- PYTHONASYNCIODEBUG="1")
-
-
-if __name__ == '__main__':
- unittest.main()