diff options
author | Yury Selivanov <yury@magic.io> | 2016-10-28 16:52:37 (GMT) |
---|---|---|
committer | Yury Selivanov <yury@magic.io> | 2016-10-28 16:52:37 (GMT) |
commit | a0c1ba608eb89b4e10155f7652c50a3ac0b709af (patch) | |
tree | 90e811ae976793876c5c04de7d91cbb7f1518fd9 /Lib/test/test_asyncio | |
parent | bbcb79920b0e220c2e1b0e77db5aca2f3a2a52d4 (diff) | |
download | cpython-a0c1ba608eb89b4e10155f7652c50a3ac0b709af.zip cpython-a0c1ba608eb89b4e10155f7652c50a3ac0b709af.tar.gz cpython-a0c1ba608eb89b4e10155f7652c50a3ac0b709af.tar.bz2 |
Issue #28544: Implement asyncio.Task in C.
This implementation provides additional 10-20% speed boost for
asyncio programs.
The patch also fixes _asynciomodule.c to use Arguments Clinic, and
makes '_schedule_callbacks' an overridable method (as it was in 3.5).
Diffstat (limited to 'Lib/test/test_asyncio')
-rw-r--r-- | Lib/test/test_asyncio/test_tasks.py | 439 |
1 files changed, 305 insertions, 134 deletions
diff --git a/Lib/test/test_asyncio/test_tasks.py b/Lib/test/test_asyncio/test_tasks.py index 1ceb9b2..d8862fc 100644 --- a/Lib/test/test_asyncio/test_tasks.py +++ b/Lib/test/test_asyncio/test_tasks.py @@ -1,5 +1,6 @@ """Tests for tasks.py.""" +import collections import contextlib import functools import io @@ -14,6 +15,8 @@ from unittest import mock import asyncio from asyncio import coroutines +from asyncio import futures +from asyncio import tasks from asyncio import test_utils try: from test import support @@ -72,14 +75,25 @@ class Dummy: pass -class TaskTests(test_utils.TestCase): +class BaseTaskTests: + + Task = None + Future = None + + def new_task(self, loop, coro): + return self.__class__.Task(coro, loop=loop) + + def new_future(self, loop): + return self.__class__.Future(loop=loop) def setUp(self): self.loop = self.new_test_loop() + self.loop.set_task_factory(self.new_task) + self.loop.create_future = lambda: self.new_future(self.loop) def test_other_loop_future(self): other_loop = asyncio.new_event_loop() - fut = asyncio.Future(loop=other_loop) + fut = self.new_future(other_loop) @asyncio.coroutine def run(fut): @@ -107,7 +121,7 @@ class TaskTests(test_utils.TestCase): @asyncio.coroutine def notmuch(): return 'ok' - t = asyncio.Task(notmuch(), loop=self.loop) + t = self.new_task(self.loop, notmuch()) self.loop.run_until_complete(t) self.assertTrue(t.done()) self.assertEqual(t.result(), 'ok') @@ -115,7 +129,7 @@ class TaskTests(test_utils.TestCase): loop = asyncio.new_event_loop() self.set_event_loop(loop) - t = asyncio.Task(notmuch(), loop=loop) + t = self.new_task(loop, notmuch()) self.assertIs(t._loop, loop) loop.run_until_complete(t) loop.close() @@ -138,7 +152,7 @@ class TaskTests(test_utils.TestCase): loop.close() def test_ensure_future_future(self): - f_orig = asyncio.Future(loop=self.loop) + f_orig = self.new_future(self.loop) f_orig.set_result('ko') f = asyncio.ensure_future(f_orig) @@ -162,7 +176,7 @@ class TaskTests(test_utils.TestCase): @asyncio.coroutine def notmuch(): return 'ok' - t_orig = asyncio.Task(notmuch(), loop=self.loop) + t_orig = self.new_task(self.loop, notmuch()) t = asyncio.ensure_future(t_orig) self.loop.run_until_complete(t) self.assertTrue(t.done()) @@ -203,7 +217,7 @@ class TaskTests(test_utils.TestCase): asyncio.ensure_future('ok') def test_async_warning(self): - f = asyncio.Future(loop=self.loop) + f = self.new_future(self.loop) with self.assertWarnsRegex(DeprecationWarning, 'function is deprecated, use ensure_'): self.assertIs(f, asyncio.async(f)) @@ -250,8 +264,8 @@ class TaskTests(test_utils.TestCase): # test coroutine function self.assertEqual(notmuch.__name__, 'notmuch') if PY35: - self.assertEqual(notmuch.__qualname__, - 'TaskTests.test_task_repr.<locals>.notmuch') + self.assertRegex(notmuch.__qualname__, + r'\w+.test_task_repr.<locals>.notmuch') self.assertEqual(notmuch.__module__, __name__) filename, lineno = test_utils.get_function_source(notmuch) @@ -260,7 +274,7 @@ class TaskTests(test_utils.TestCase): # test coroutine object gen = notmuch() if coroutines._DEBUG or PY35: - coro_qualname = 'TaskTests.test_task_repr.<locals>.notmuch' + coro_qualname = 'BaseTaskTests.test_task_repr.<locals>.notmuch' else: coro_qualname = 'notmuch' self.assertEqual(gen.__name__, 'notmuch') @@ -269,7 +283,7 @@ class TaskTests(test_utils.TestCase): coro_qualname) # test pending Task - t = asyncio.Task(gen, loop=self.loop) + t = self.new_task(self.loop, gen) t.add_done_callback(Dummy()) coro = format_coroutine(coro_qualname, 'running', src, @@ -291,7 +305,7 @@ class TaskTests(test_utils.TestCase): '<Task cancelled %s>' % coro) # test finished Task - t = asyncio.Task(notmuch(), loop=self.loop) + t = self.new_task(self.loop, notmuch()) self.loop.run_until_complete(t) coro = format_coroutine(coro_qualname, 'done', src, t._source_traceback) @@ -310,9 +324,9 @@ class TaskTests(test_utils.TestCase): # test coroutine function self.assertEqual(notmuch.__name__, 'notmuch') if PY35: - self.assertEqual(notmuch.__qualname__, - 'TaskTests.test_task_repr_coro_decorator' - '.<locals>.notmuch') + self.assertRegex(notmuch.__qualname__, + r'\w+.test_task_repr_coro_decorator' + r'\.<locals>\.notmuch') self.assertEqual(notmuch.__module__, __name__) # test coroutine object @@ -322,7 +336,7 @@ class TaskTests(test_utils.TestCase): # function, as expected, and have a qualified name (__qualname__ # attribute). coro_name = 'notmuch' - coro_qualname = ('TaskTests.test_task_repr_coro_decorator' + coro_qualname = ('BaseTaskTests.test_task_repr_coro_decorator' '.<locals>.notmuch') else: # On Python < 3.5, generators inherit the name of the code, not of @@ -350,7 +364,7 @@ class TaskTests(test_utils.TestCase): self.assertEqual(repr(gen), '<CoroWrapper %s>' % coro) # test pending Task - t = asyncio.Task(gen, loop=self.loop) + t = self.new_task(self.loop, gen) t.add_done_callback(Dummy()) # format the coroutine object @@ -373,8 +387,8 @@ class TaskTests(test_utils.TestCase): def wait_for(fut): return (yield from fut) - fut = asyncio.Future(loop=self.loop) - task = asyncio.Task(wait_for(fut), loop=self.loop) + fut = self.new_future(self.loop) + task = self.new_task(self.loop, wait_for(fut)) test_utils.run_briefly(self.loop) self.assertRegex(repr(task), '<Task .* wait_for=%s>' % re.escape(repr(fut))) @@ -400,10 +414,11 @@ class TaskTests(test_utils.TestCase): self.addCleanup(task._coro.close) coro_repr = repr(task._coro) - expected = ('<CoroWrapper TaskTests.test_task_repr_partial_corowrapper' - '.<locals>.func(1)() running, ') - self.assertTrue(coro_repr.startswith(expected), - coro_repr) + expected = ( + r'<CoroWrapper \w+.test_task_repr_partial_corowrapper' + r'\.<locals>\.func\(1\)\(\) running, ' + ) + self.assertRegex(coro_repr, expected) def test_task_basics(self): @asyncio.coroutine @@ -437,7 +452,7 @@ class TaskTests(test_utils.TestCase): yield from asyncio.sleep(10.0, loop=loop) return 12 - t = asyncio.Task(task(), loop=loop) + t = self.new_task(loop, task()) loop.call_soon(t.cancel) with self.assertRaises(asyncio.CancelledError): loop.run_until_complete(t) @@ -452,7 +467,7 @@ class TaskTests(test_utils.TestCase): yield return 12 - t = asyncio.Task(task(), loop=self.loop) + t = self.new_task(self.loop, task()) test_utils.run_briefly(self.loop) # start coro t.cancel() self.assertRaises( @@ -462,14 +477,14 @@ class TaskTests(test_utils.TestCase): self.assertFalse(t.cancel()) def test_cancel_inner_future(self): - f = asyncio.Future(loop=self.loop) + f = self.new_future(self.loop) @asyncio.coroutine def task(): yield from f return 12 - t = asyncio.Task(task(), loop=self.loop) + t = self.new_task(self.loop, task()) test_utils.run_briefly(self.loop) # start task f.cancel() with self.assertRaises(asyncio.CancelledError): @@ -478,14 +493,14 @@ class TaskTests(test_utils.TestCase): self.assertTrue(t.cancelled()) def test_cancel_both_task_and_inner_future(self): - f = asyncio.Future(loop=self.loop) + f = self.new_future(self.loop) @asyncio.coroutine def task(): yield from f return 12 - t = asyncio.Task(task(), loop=self.loop) + t = self.new_task(self.loop, task()) test_utils.run_briefly(self.loop) f.cancel() @@ -499,8 +514,8 @@ class TaskTests(test_utils.TestCase): self.assertTrue(t.cancelled()) def test_cancel_task_catching(self): - fut1 = asyncio.Future(loop=self.loop) - fut2 = asyncio.Future(loop=self.loop) + fut1 = self.new_future(self.loop) + fut2 = self.new_future(self.loop) @asyncio.coroutine def task(): @@ -510,7 +525,7 @@ class TaskTests(test_utils.TestCase): except asyncio.CancelledError: return 42 - t = asyncio.Task(task(), loop=self.loop) + t = self.new_task(self.loop, task()) test_utils.run_briefly(self.loop) self.assertIs(t._fut_waiter, fut1) # White-box test. fut1.set_result(None) @@ -523,9 +538,9 @@ class TaskTests(test_utils.TestCase): self.assertFalse(t.cancelled()) def test_cancel_task_ignoring(self): - fut1 = asyncio.Future(loop=self.loop) - fut2 = asyncio.Future(loop=self.loop) - fut3 = asyncio.Future(loop=self.loop) + fut1 = self.new_future(self.loop) + fut2 = self.new_future(self.loop) + fut3 = self.new_future(self.loop) @asyncio.coroutine def task(): @@ -537,7 +552,7 @@ class TaskTests(test_utils.TestCase): res = yield from fut3 return res - t = asyncio.Task(task(), loop=self.loop) + t = self.new_task(self.loop, task()) test_utils.run_briefly(self.loop) self.assertIs(t._fut_waiter, fut1) # White-box test. fut1.set_result(None) @@ -565,7 +580,7 @@ class TaskTests(test_utils.TestCase): yield from asyncio.sleep(100, loop=loop) return 12 - t = asyncio.Task(task(), loop=loop) + t = self.new_task(loop, task()) self.assertRaises( asyncio.CancelledError, loop.run_until_complete, t) self.assertTrue(t.done()) @@ -598,7 +613,7 @@ class TaskTests(test_utils.TestCase): if x == 2: loop.stop() - t = asyncio.Task(task(), loop=loop) + t = self.new_task(loop, task()) with self.assertRaises(RuntimeError) as cm: loop.run_until_complete(t) self.assertEqual(str(cm.exception), @@ -636,7 +651,7 @@ class TaskTests(test_utils.TestCase): foo_running = False return 'done' - fut = asyncio.Task(foo(), loop=loop) + fut = self.new_task(loop, foo()) with self.assertRaises(asyncio.TimeoutError): loop.run_until_complete(asyncio.wait_for(fut, 0.1, loop=loop)) @@ -676,7 +691,7 @@ class TaskTests(test_utils.TestCase): asyncio.set_event_loop(loop) try: - fut = asyncio.Task(foo(), loop=loop) + fut = self.new_task(loop, foo()) with self.assertRaises(asyncio.TimeoutError): loop.run_until_complete(asyncio.wait_for(fut, 0.01)) finally: @@ -695,7 +710,7 @@ class TaskTests(test_utils.TestCase): loop = self.new_test_loop(gen) - fut = asyncio.Future(loop=loop) + fut = self.new_future(loop) task = asyncio.wait_for(fut, timeout=0.2, loop=loop) loop.call_later(0.1, fut.set_result, "ok") res = loop.run_until_complete(task) @@ -712,8 +727,8 @@ class TaskTests(test_utils.TestCase): loop = self.new_test_loop(gen) - a = asyncio.Task(asyncio.sleep(0.1, loop=loop), loop=loop) - b = asyncio.Task(asyncio.sleep(0.15, loop=loop), loop=loop) + a = self.new_task(loop, asyncio.sleep(0.1, loop=loop)) + b = self.new_task(loop, asyncio.sleep(0.15, loop=loop)) @asyncio.coroutine def foo(): @@ -722,12 +737,12 @@ class TaskTests(test_utils.TestCase): self.assertEqual(pending, set()) return 42 - res = loop.run_until_complete(asyncio.Task(foo(), loop=loop)) + res = loop.run_until_complete(self.new_task(loop, foo())) self.assertEqual(res, 42) self.assertAlmostEqual(0.15, loop.time()) # Doing it again should take no time and exercise a different path. - res = loop.run_until_complete(asyncio.Task(foo(), loop=loop)) + res = loop.run_until_complete(self.new_task(loop, foo())) self.assertAlmostEqual(0.15, loop.time()) self.assertEqual(res, 42) @@ -742,8 +757,8 @@ class TaskTests(test_utils.TestCase): loop = self.new_test_loop(gen) - a = asyncio.Task(asyncio.sleep(0.01, loop=loop), loop=loop) - b = asyncio.Task(asyncio.sleep(0.015, loop=loop), loop=loop) + a = self.new_task(loop, asyncio.sleep(0.01, loop=loop)) + b = self.new_task(loop, asyncio.sleep(0.015, loop=loop)) @asyncio.coroutine def foo(): @@ -754,7 +769,7 @@ class TaskTests(test_utils.TestCase): asyncio.set_event_loop(loop) res = loop.run_until_complete( - asyncio.Task(foo(), loop=loop)) + self.new_task(loop, foo())) self.assertEqual(res, 42) @@ -764,9 +779,9 @@ class TaskTests(test_utils.TestCase): return s c = coro('test') - task = asyncio.Task( - asyncio.wait([c, c, coro('spam')], loop=self.loop), - loop=self.loop) + task =self.new_task( + self.loop, + asyncio.wait([c, c, coro('spam')], loop=self.loop)) done, pending = self.loop.run_until_complete(task) @@ -797,12 +812,12 @@ class TaskTests(test_utils.TestCase): loop = self.new_test_loop(gen) - a = asyncio.Task(asyncio.sleep(10.0, loop=loop), loop=loop) - b = asyncio.Task(asyncio.sleep(0.1, loop=loop), loop=loop) - task = asyncio.Task( + a = self.new_task(loop, asyncio.sleep(10.0, loop=loop)) + b = self.new_task(loop, asyncio.sleep(0.1, loop=loop)) + task = self.new_task( + loop, asyncio.wait([b, a], return_when=asyncio.FIRST_COMPLETED, - loop=loop), - loop=loop) + loop=loop)) done, pending = loop.run_until_complete(task) self.assertEqual({b}, done) @@ -829,12 +844,12 @@ class TaskTests(test_utils.TestCase): yield yield - a = asyncio.Task(coro1(), loop=self.loop) - b = asyncio.Task(coro2(), loop=self.loop) - task = asyncio.Task( + a = self.new_task(self.loop, coro1()) + b = self.new_task(self.loop, coro2()) + task = self.new_task( + self.loop, asyncio.wait([b, a], return_when=asyncio.FIRST_COMPLETED, - loop=self.loop), - loop=self.loop) + loop=self.loop)) done, pending = self.loop.run_until_complete(task) self.assertEqual({a, b}, done) @@ -853,17 +868,17 @@ class TaskTests(test_utils.TestCase): loop = self.new_test_loop(gen) # first_exception, task already has exception - a = asyncio.Task(asyncio.sleep(10.0, loop=loop), loop=loop) + a = self.new_task(loop, asyncio.sleep(10.0, loop=loop)) @asyncio.coroutine def exc(): raise ZeroDivisionError('err') - b = asyncio.Task(exc(), loop=loop) - task = asyncio.Task( + b = self.new_task(loop, exc()) + task = self.new_task( + loop, asyncio.wait([b, a], return_when=asyncio.FIRST_EXCEPTION, - loop=loop), - loop=loop) + loop=loop)) done, pending = loop.run_until_complete(task) self.assertEqual({b}, done) @@ -886,14 +901,14 @@ class TaskTests(test_utils.TestCase): loop = self.new_test_loop(gen) # first_exception, exception during waiting - a = asyncio.Task(asyncio.sleep(10.0, loop=loop), loop=loop) + a = self.new_task(loop, asyncio.sleep(10.0, loop=loop)) @asyncio.coroutine def exc(): yield from asyncio.sleep(0.01, loop=loop) raise ZeroDivisionError('err') - b = asyncio.Task(exc(), loop=loop) + b = self.new_task(loop, exc()) task = asyncio.wait([b, a], return_when=asyncio.FIRST_EXCEPTION, loop=loop) @@ -917,14 +932,14 @@ class TaskTests(test_utils.TestCase): loop = self.new_test_loop(gen) - a = asyncio.Task(asyncio.sleep(0.1, loop=loop), loop=loop) + a = self.new_task(loop, asyncio.sleep(0.1, loop=loop)) @asyncio.coroutine def sleeper(): yield from asyncio.sleep(0.15, loop=loop) raise ZeroDivisionError('really') - b = asyncio.Task(sleeper(), loop=loop) + b = self.new_task(loop, sleeper()) @asyncio.coroutine def foo(): @@ -934,10 +949,10 @@ class TaskTests(test_utils.TestCase): errors = set(f for f in done if f.exception() is not None) self.assertEqual(len(errors), 1) - loop.run_until_complete(asyncio.Task(foo(), loop=loop)) + loop.run_until_complete(self.new_task(loop, foo())) self.assertAlmostEqual(0.15, loop.time()) - loop.run_until_complete(asyncio.Task(foo(), loop=loop)) + loop.run_until_complete(self.new_task(loop, foo())) self.assertAlmostEqual(0.15, loop.time()) def test_wait_with_timeout(self): @@ -953,8 +968,8 @@ class TaskTests(test_utils.TestCase): loop = self.new_test_loop(gen) - a = asyncio.Task(asyncio.sleep(0.1, loop=loop), loop=loop) - b = asyncio.Task(asyncio.sleep(0.15, loop=loop), loop=loop) + a = self.new_task(loop, asyncio.sleep(0.1, loop=loop)) + b = self.new_task(loop, asyncio.sleep(0.15, loop=loop)) @asyncio.coroutine def foo(): @@ -963,7 +978,7 @@ class TaskTests(test_utils.TestCase): self.assertEqual(done, set([a])) self.assertEqual(pending, set([b])) - loop.run_until_complete(asyncio.Task(foo(), loop=loop)) + loop.run_until_complete(self.new_task(loop, foo())) self.assertAlmostEqual(0.11, loop.time()) # move forward to close generator @@ -983,8 +998,8 @@ class TaskTests(test_utils.TestCase): loop = self.new_test_loop(gen) - a = asyncio.Task(asyncio.sleep(0.1, loop=loop), loop=loop) - b = asyncio.Task(asyncio.sleep(0.15, loop=loop), loop=loop) + a = self.new_task(loop, asyncio.sleep(0.1, loop=loop)) + b = self.new_task(loop, asyncio.sleep(0.15, loop=loop)) done, pending = loop.run_until_complete( asyncio.wait([b, a], timeout=0.1, loop=loop)) @@ -1032,14 +1047,14 @@ class TaskTests(test_utils.TestCase): values.append((yield from f)) return values - res = loop.run_until_complete(asyncio.Task(foo(), loop=loop)) + res = loop.run_until_complete(self.new_task(loop, foo())) self.assertAlmostEqual(0.15, loop.time()) self.assertTrue('a' in res[:2]) self.assertTrue('b' in res[:2]) self.assertEqual(res[2], 'c') # Doing it again should take no time and exercise a different path. - res = loop.run_until_complete(asyncio.Task(foo(), loop=loop)) + res = loop.run_until_complete(self.new_task(loop, foo())) self.assertAlmostEqual(0.15, loop.time()) def test_as_completed_with_timeout(self): @@ -1068,7 +1083,7 @@ class TaskTests(test_utils.TestCase): values.append((2, exc)) return values - res = loop.run_until_complete(asyncio.Task(foo(), loop=loop)) + res = loop.run_until_complete(self.new_task(loop, foo())) self.assertEqual(len(res), 2, res) self.assertEqual(res[0], (1, 'a')) self.assertEqual(res[1][0], 2) @@ -1096,7 +1111,7 @@ class TaskTests(test_utils.TestCase): v = yield from f self.assertEqual(v, 'a') - loop.run_until_complete(asyncio.Task(foo(), loop=loop)) + loop.run_until_complete(self.new_task(loop, foo())) def test_as_completed_reverse_wait(self): @@ -1156,7 +1171,7 @@ class TaskTests(test_utils.TestCase): result.append((yield from f)) return result - fut = asyncio.Task(runner(), loop=self.loop) + fut = self.new_task(self.loop, runner()) self.loop.run_until_complete(fut) result = fut.result() self.assertEqual(set(result), {'ham', 'spam'}) @@ -1179,7 +1194,7 @@ class TaskTests(test_utils.TestCase): res = yield from asyncio.sleep(dt/2, arg, loop=loop) return res - t = asyncio.Task(sleeper(0.1, 'yeah'), loop=loop) + t = self.new_task(loop, sleeper(0.1, 'yeah')) loop.run_until_complete(t) self.assertTrue(t.done()) self.assertEqual(t.result(), 'yeah') @@ -1194,8 +1209,7 @@ class TaskTests(test_utils.TestCase): loop = self.new_test_loop(gen) - t = asyncio.Task(asyncio.sleep(10.0, 'yeah', loop=loop), - loop=loop) + t = self.new_task(loop, asyncio.sleep(10.0, 'yeah', loop=loop)) handle = None orig_call_later = loop.call_later @@ -1231,7 +1245,7 @@ class TaskTests(test_utils.TestCase): @asyncio.coroutine def doit(): - sleeper = asyncio.Task(sleep(5000), loop=loop) + sleeper = self.new_task(loop, sleep(5000)) loop.call_later(0.1, sleeper.cancel) try: yield from sleeper @@ -1245,13 +1259,13 @@ class TaskTests(test_utils.TestCase): self.assertAlmostEqual(0.1, loop.time()) def test_task_cancel_waiter_future(self): - fut = asyncio.Future(loop=self.loop) + fut = self.new_future(self.loop) @asyncio.coroutine def coro(): yield from fut - task = asyncio.Task(coro(), loop=self.loop) + task = self.new_task(self.loop, coro()) test_utils.run_briefly(self.loop) self.assertIs(task._fut_waiter, fut) @@ -1268,7 +1282,7 @@ class TaskTests(test_utils.TestCase): return 'ko' gen = notmuch() - task = asyncio.Task(gen, loop=self.loop) + task = self.new_task(self.loop, gen) task.set_result('ok') self.assertRaises(AssertionError, task._step) @@ -1304,7 +1318,7 @@ class TaskTests(test_utils.TestCase): nonlocal result result = yield from fut - t = asyncio.Task(wait_for_future(), loop=self.loop) + t = self.new_task(self.loop, wait_for_future()) test_utils.run_briefly(self.loop) self.assertTrue(fut.cb_added) @@ -1320,7 +1334,7 @@ class TaskTests(test_utils.TestCase): def notmutch(): raise BaseException() - task = asyncio.Task(notmutch(), loop=self.loop) + task = self.new_task(self.loop, notmutch()) self.assertRaises(BaseException, task._step) self.assertTrue(task.done()) @@ -1348,7 +1362,7 @@ class TaskTests(test_utils.TestCase): except asyncio.CancelledError: raise base_exc - task = asyncio.Task(notmutch(), loop=loop) + task = self.new_task(loop, notmutch()) test_utils.run_briefly(loop) task.cancel() @@ -1376,7 +1390,7 @@ class TaskTests(test_utils.TestCase): self.assertTrue(asyncio.iscoroutinefunction(fn2)) def test_yield_vs_yield_from(self): - fut = asyncio.Future(loop=self.loop) + fut = self.new_future(self.loop) @asyncio.coroutine def wait_for_future(): @@ -1420,7 +1434,7 @@ class TaskTests(test_utils.TestCase): self.assertEqual(res, 'test') def test_coroutine_non_gen_function_return_future(self): - fut = asyncio.Future(loop=self.loop) + fut = self.new_future(self.loop) @asyncio.coroutine def func(): @@ -1430,49 +1444,53 @@ class TaskTests(test_utils.TestCase): def coro(): fut.set_result('test') - t1 = asyncio.Task(func(), loop=self.loop) - t2 = asyncio.Task(coro(), loop=self.loop) + t1 = self.new_task(self.loop, func()) + t2 = self.new_task(self.loop, coro()) res = self.loop.run_until_complete(t1) self.assertEqual(res, 'test') self.assertIsNone(t2.result()) def test_current_task(self): - self.assertIsNone(asyncio.Task.current_task(loop=self.loop)) + Task = self.__class__.Task + + self.assertIsNone(Task.current_task(loop=self.loop)) @asyncio.coroutine def coro(loop): - self.assertTrue(asyncio.Task.current_task(loop=loop) is task) + self.assertTrue(Task.current_task(loop=loop) is task) - task = asyncio.Task(coro(self.loop), loop=self.loop) + task = self.new_task(self.loop, coro(self.loop)) self.loop.run_until_complete(task) - self.assertIsNone(asyncio.Task.current_task(loop=self.loop)) + self.assertIsNone(Task.current_task(loop=self.loop)) def test_current_task_with_interleaving_tasks(self): - self.assertIsNone(asyncio.Task.current_task(loop=self.loop)) + Task = self.__class__.Task + + self.assertIsNone(Task.current_task(loop=self.loop)) - fut1 = asyncio.Future(loop=self.loop) - fut2 = asyncio.Future(loop=self.loop) + fut1 = self.new_future(self.loop) + fut2 = self.new_future(self.loop) @asyncio.coroutine def coro1(loop): - self.assertTrue(asyncio.Task.current_task(loop=loop) is task1) + self.assertTrue(Task.current_task(loop=loop) is task1) yield from fut1 - self.assertTrue(asyncio.Task.current_task(loop=loop) is task1) + self.assertTrue(Task.current_task(loop=loop) is task1) fut2.set_result(True) @asyncio.coroutine def coro2(loop): - self.assertTrue(asyncio.Task.current_task(loop=loop) is task2) + self.assertTrue(Task.current_task(loop=loop) is task2) fut1.set_result(True) yield from fut2 - self.assertTrue(asyncio.Task.current_task(loop=loop) is task2) + self.assertTrue(Task.current_task(loop=loop) is task2) - task1 = asyncio.Task(coro1(self.loop), loop=self.loop) - task2 = asyncio.Task(coro2(self.loop), loop=self.loop) + task1 = self.new_task(self.loop, coro1(self.loop)) + task2 = self.new_task(self.loop, coro2(self.loop)) self.loop.run_until_complete(asyncio.wait((task1, task2), loop=self.loop)) - self.assertIsNone(asyncio.Task.current_task(loop=self.loop)) + self.assertIsNone(Task.current_task(loop=self.loop)) # Some thorough tests for cancellation propagation through # coroutines, tasks and wait(). @@ -1480,7 +1498,7 @@ class TaskTests(test_utils.TestCase): def test_yield_future_passes_cancel(self): # Cancelling outer() cancels inner() cancels waiter. proof = 0 - waiter = asyncio.Future(loop=self.loop) + waiter = self.new_future(self.loop) @asyncio.coroutine def inner(): @@ -1514,7 +1532,7 @@ class TaskTests(test_utils.TestCase): # Cancelling outer() makes wait() return early, leaves inner() # running. proof = 0 - waiter = asyncio.Future(loop=self.loop) + waiter = self.new_future(self.loop) @asyncio.coroutine def inner(): @@ -1538,14 +1556,14 @@ class TaskTests(test_utils.TestCase): self.assertEqual(proof, 1) def test_shield_result(self): - inner = asyncio.Future(loop=self.loop) + inner = self.new_future(self.loop) outer = asyncio.shield(inner) inner.set_result(42) res = self.loop.run_until_complete(outer) self.assertEqual(res, 42) def test_shield_exception(self): - inner = asyncio.Future(loop=self.loop) + inner = self.new_future(self.loop) outer = asyncio.shield(inner) test_utils.run_briefly(self.loop) exc = RuntimeError('expected') @@ -1554,7 +1572,7 @@ class TaskTests(test_utils.TestCase): self.assertIs(outer.exception(), exc) def test_shield_cancel(self): - inner = asyncio.Future(loop=self.loop) + inner = self.new_future(self.loop) outer = asyncio.shield(inner) test_utils.run_briefly(self.loop) inner.cancel() @@ -1562,7 +1580,7 @@ class TaskTests(test_utils.TestCase): self.assertTrue(outer.cancelled()) def test_shield_shortcut(self): - fut = asyncio.Future(loop=self.loop) + fut = self.new_future(self.loop) fut.set_result(42) res = self.loop.run_until_complete(asyncio.shield(fut)) self.assertEqual(res, 42) @@ -1570,7 +1588,7 @@ class TaskTests(test_utils.TestCase): def test_shield_effect(self): # Cancelling outer() does not affect inner(). proof = 0 - waiter = asyncio.Future(loop=self.loop) + waiter = self.new_future(self.loop) @asyncio.coroutine def inner(): @@ -1594,8 +1612,8 @@ class TaskTests(test_utils.TestCase): self.assertEqual(proof, 1) def test_shield_gather(self): - child1 = asyncio.Future(loop=self.loop) - child2 = asyncio.Future(loop=self.loop) + child1 = self.new_future(self.loop) + child2 = self.new_future(self.loop) parent = asyncio.gather(child1, child2, loop=self.loop) outer = asyncio.shield(parent, loop=self.loop) test_utils.run_briefly(self.loop) @@ -1608,8 +1626,8 @@ class TaskTests(test_utils.TestCase): self.assertEqual(parent.result(), [1, 2]) def test_gather_shield(self): - child1 = asyncio.Future(loop=self.loop) - child2 = asyncio.Future(loop=self.loop) + child1 = self.new_future(self.loop) + child2 = self.new_future(self.loop) inner1 = asyncio.shield(child1, loop=self.loop) inner2 = asyncio.shield(child2, loop=self.loop) parent = asyncio.gather(inner1, inner2, loop=self.loop) @@ -1625,7 +1643,7 @@ class TaskTests(test_utils.TestCase): test_utils.run_briefly(self.loop) def test_as_completed_invalid_args(self): - fut = asyncio.Future(loop=self.loop) + fut = self.new_future(self.loop) # as_completed() expects a list of futures, not a future instance self.assertRaises(TypeError, self.loop.run_until_complete, @@ -1636,7 +1654,7 @@ class TaskTests(test_utils.TestCase): coro.close() def test_wait_invalid_args(self): - fut = asyncio.Future(loop=self.loop) + fut = self.new_future(self.loop) # wait() expects a list of futures, not a future instance self.assertRaises(TypeError, self.loop.run_until_complete, @@ -1663,7 +1681,7 @@ class TaskTests(test_utils.TestCase): yield from fut # A completed Future used to run the coroutine. - fut = asyncio.Future(loop=self.loop) + fut = self.new_future(self.loop) fut.set_result(None) # Call the coroutine. @@ -1697,15 +1715,15 @@ class TaskTests(test_utils.TestCase): @asyncio.coroutine def t2(): - f = asyncio.Future(loop=self.loop) - asyncio.Task(t3(f), loop=self.loop) + f = self.new_future(self.loop) + self.new_task(self.loop, t3(f)) return (yield from f) @asyncio.coroutine def t3(f): f.set_result((1, 2, 3)) - task = asyncio.Task(t1(), loop=self.loop) + task = self.new_task(self.loop, t1()) val = self.loop.run_until_complete(task) self.assertEqual(val, (1, 2, 3)) @@ -1768,9 +1786,11 @@ class TaskTests(test_utils.TestCase): @unittest.skipUnless(PY34, 'need python 3.4 or later') def test_log_destroyed_pending_task(self): + Task = self.__class__.Task + @asyncio.coroutine def kill_me(loop): - future = asyncio.Future(loop=loop) + future = self.new_future(loop) yield from future # at this point, the only reference to kill_me() task is # the Task._wakeup() method in future._callbacks @@ -1783,7 +1803,7 @@ class TaskTests(test_utils.TestCase): # schedule the task coro = kill_me(self.loop) task = asyncio.ensure_future(coro, loop=self.loop) - self.assertEqual(asyncio.Task.all_tasks(loop=self.loop), {task}) + self.assertEqual(Task.all_tasks(loop=self.loop), {task}) # execute the task so it waits for future self.loop._run_once() @@ -1798,7 +1818,7 @@ class TaskTests(test_utils.TestCase): # no more reference to kill_me() task: the task is destroyed by the GC support.gc_collect() - self.assertEqual(asyncio.Task.all_tasks(loop=self.loop), set()) + self.assertEqual(Task.all_tasks(loop=self.loop), set()) mock_handler.assert_called_with(self.loop, { 'message': 'Task was destroyed but it is pending!', @@ -1863,10 +1883,10 @@ class TaskTests(test_utils.TestCase): def test_task_source_traceback(self): self.loop.set_debug(True) - task = asyncio.Task(coroutine_function(), loop=self.loop) + task = self.new_task(self.loop, coroutine_function()) lineno = sys._getframe().f_lineno - 1 self.assertIsInstance(task._source_traceback, list) - self.assertEqual(task._source_traceback[-1][:3], + self.assertEqual(task._source_traceback[-2][:3], (__file__, lineno, 'test_task_source_traceback')) @@ -1878,7 +1898,7 @@ class TaskTests(test_utils.TestCase): @asyncio.coroutine def blocking_coroutine(): - fut = asyncio.Future(loop=loop) + fut = self.new_future(loop) # Block: fut result is never set yield from fut @@ -1905,7 +1925,7 @@ class TaskTests(test_utils.TestCase): loop = asyncio.new_event_loop() self.addCleanup(loop.close) - fut = asyncio.Future(loop=loop) + fut = self.new_future(loop) # The indirection fut->child_coro is needed since otherwise the # gathering task is done at the same time as the child future def child_coro(): @@ -1929,6 +1949,157 @@ class TaskTests(test_utils.TestCase): self.assertFalse(gather_task.cancelled()) self.assertEqual(gather_task.result(), [42]) + @mock.patch('asyncio.base_events.logger') + def test_error_in_call_soon(self, m_log): + def call_soon(callback, *args): + raise ValueError + self.loop.call_soon = call_soon + + @asyncio.coroutine + def coro(): + pass + + self.assertFalse(m_log.error.called) + + with self.assertRaises(ValueError): + self.new_task(self.loop, coro()) + + self.assertTrue(m_log.error.called) + message = m_log.error.call_args[0][0] + self.assertIn('Task was destroyed but it is pending', message) + + self.assertEqual(self.Task.all_tasks(self.loop), set()) + + +def add_subclass_tests(cls): + BaseTask = cls.Task + BaseFuture = cls.Future + + if BaseTask is None or BaseFuture is None: + return cls + + class CommonFuture: + def __init__(self, *args, **kwargs): + self.calls = collections.defaultdict(lambda: 0) + super().__init__(*args, **kwargs) + + def _schedule_callbacks(self): + self.calls['_schedule_callbacks'] += 1 + return super()._schedule_callbacks() + + def add_done_callback(self, *args): + self.calls['add_done_callback'] += 1 + return super().add_done_callback(*args) + + class Task(CommonFuture, BaseTask): + def _step(self, *args): + self.calls['_step'] += 1 + return super()._step(*args) + + def _wakeup(self, *args): + self.calls['_wakeup'] += 1 + return super()._wakeup(*args) + + class Future(CommonFuture, BaseFuture): + pass + + def test_subclasses_ctask_cfuture(self): + fut = self.Future(loop=self.loop) + + async def func(): + self.loop.call_soon(lambda: fut.set_result('spam')) + return await fut + + task = self.Task(func(), loop=self.loop) + + result = self.loop.run_until_complete(task) + + self.assertEqual(result, 'spam') + + self.assertEqual( + dict(task.calls), + {'_step': 2, '_wakeup': 1, 'add_done_callback': 1, + '_schedule_callbacks': 1}) + + self.assertEqual( + dict(fut.calls), + {'add_done_callback': 1, '_schedule_callbacks': 1}) + + # Add patched Task & Future back to the test case + cls.Task = Task + cls.Future = Future + + # Add an extra unit-test + cls.test_subclasses_ctask_cfuture = test_subclasses_ctask_cfuture + + # Disable the "test_task_source_traceback" test + # (the test is hardcoded for a particular call stack, which + # is slightly different for Task subclasses) + cls.test_task_source_traceback = None + + return cls + + +@unittest.skipUnless(hasattr(futures, '_CFuture'), + 'requires the C _asyncio module') +class CTask_CFuture_Tests(BaseTaskTests, test_utils.TestCase): + Task = getattr(tasks, '_CTask', None) + Future = getattr(futures, '_CFuture', None) + + +@unittest.skipUnless(hasattr(futures, '_CFuture'), + 'requires the C _asyncio module') +@add_subclass_tests +class CTask_CFuture_SubclassTests(BaseTaskTests, test_utils.TestCase): + Task = getattr(tasks, '_CTask', None) + Future = getattr(futures, '_CFuture', None) + + +@unittest.skipUnless(hasattr(futures, '_CFuture'), + 'requires the C _asyncio module') +class CTask_PyFuture_Tests(BaseTaskTests, test_utils.TestCase): + Task = getattr(tasks, '_CTask', None) + Future = futures._PyFuture + + +@unittest.skipUnless(hasattr(futures, '_CFuture'), + 'requires the C _asyncio module') +class PyTask_CFuture_Tests(BaseTaskTests, test_utils.TestCase): + Task = tasks._PyTask + Future = getattr(futures, '_CFuture', None) + + +class PyTask_PyFuture_Tests(BaseTaskTests, test_utils.TestCase): + Task = tasks._PyTask + Future = futures._PyFuture + + +@add_subclass_tests +class PyTask_PyFuture_SubclassTests(BaseTaskTests, test_utils.TestCase): + Task = tasks._PyTask + Future = futures._PyFuture + + +class GenericTaskTests(test_utils.TestCase): + + def test_future_subclass(self): + self.assertTrue(issubclass(asyncio.Task, asyncio.Future)) + + def test_asyncio_module_compiled(self): + # Because of circular imports it's easy to make _asyncio + # module non-importable. This is a simple test that will + # fail on systems where C modules were successfully compiled + # (hence the test for _functools), but _asyncio somehow didn't. + try: + import _functools + except ImportError: + pass + else: + try: + import _asyncio + except ImportError: + self.fail('_asyncio module is missing') + class GatherTestsBase: |