diff options
author | Guido van Rossum <guido@dropbox.com> | 2013-10-17 20:40:50 (GMT) |
---|---|---|
committer | Guido van Rossum <guido@dropbox.com> | 2013-10-17 20:40:50 (GMT) |
commit | 27b7c7ebf1039e96cac41b6330cf16b5632d9e49 (patch) | |
tree | 814505b0f9d02a5cabdec733dcde70250b04ee28 /Lib/test/test_asyncio/test_tasks.py | |
parent | 5b37f97ea5ac9f6b33b0e0269c69539cbb478142 (diff) | |
download | cpython-27b7c7ebf1039e96cac41b6330cf16b5632d9e49.zip cpython-27b7c7ebf1039e96cac41b6330cf16b5632d9e49.tar.gz cpython-27b7c7ebf1039e96cac41b6330cf16b5632d9e49.tar.bz2 |
Initial checkin of asyncio package (== Tulip, == PEP 3156).
Diffstat (limited to 'Lib/test/test_asyncio/test_tasks.py')
-rw-r--r-- | Lib/test/test_asyncio/test_tasks.py | 1518 |
1 files changed, 1518 insertions, 0 deletions
diff --git a/Lib/test/test_asyncio/test_tasks.py b/Lib/test/test_asyncio/test_tasks.py new file mode 100644 index 0000000..57fb053 --- /dev/null +++ b/Lib/test/test_asyncio/test_tasks.py @@ -0,0 +1,1518 @@ +"""Tests for tasks.py.""" + +import gc +import unittest +import unittest.mock +from unittest.mock import Mock + +from asyncio import events +from asyncio import futures +from asyncio import tasks +from asyncio import test_utils + + +class Dummy: + + def __repr__(self): + return 'Dummy()' + + def __call__(self, *args): + pass + + +class TaskTests(unittest.TestCase): + + def setUp(self): + self.loop = test_utils.TestLoop() + events.set_event_loop(None) + + def tearDown(self): + self.loop.close() + gc.collect() + + def test_task_class(self): + @tasks.coroutine + def notmuch(): + return 'ok' + t = tasks.Task(notmuch(), loop=self.loop) + self.loop.run_until_complete(t) + self.assertTrue(t.done()) + self.assertEqual(t.result(), 'ok') + self.assertIs(t._loop, self.loop) + + loop = events.new_event_loop() + t = tasks.Task(notmuch(), loop=loop) + self.assertIs(t._loop, loop) + loop.close() + + def test_async_coroutine(self): + @tasks.coroutine + def notmuch(): + return 'ok' + t = tasks.async(notmuch(), loop=self.loop) + self.loop.run_until_complete(t) + self.assertTrue(t.done()) + self.assertEqual(t.result(), 'ok') + self.assertIs(t._loop, self.loop) + + loop = events.new_event_loop() + t = tasks.async(notmuch(), loop=loop) + self.assertIs(t._loop, loop) + loop.close() + + def test_async_future(self): + f_orig = futures.Future(loop=self.loop) + f_orig.set_result('ko') + + f = tasks.async(f_orig) + self.loop.run_until_complete(f) + self.assertTrue(f.done()) + self.assertEqual(f.result(), 'ko') + self.assertIs(f, f_orig) + + loop = events.new_event_loop() + + with self.assertRaises(ValueError): + f = tasks.async(f_orig, loop=loop) + + loop.close() + + f = tasks.async(f_orig, loop=self.loop) + self.assertIs(f, f_orig) + + def test_async_task(self): + @tasks.coroutine + def notmuch(): + return 'ok' + t_orig = tasks.Task(notmuch(), loop=self.loop) + t = tasks.async(t_orig) + self.loop.run_until_complete(t) + self.assertTrue(t.done()) + self.assertEqual(t.result(), 'ok') + self.assertIs(t, t_orig) + + loop = events.new_event_loop() + + with self.assertRaises(ValueError): + t = tasks.async(t_orig, loop=loop) + + loop.close() + + t = tasks.async(t_orig, loop=self.loop) + self.assertIs(t, t_orig) + + def test_async_neither(self): + with self.assertRaises(TypeError): + tasks.async('ok') + + def test_task_repr(self): + @tasks.coroutine + def notmuch(): + yield from [] + return 'abc' + + t = tasks.Task(notmuch(), loop=self.loop) + t.add_done_callback(Dummy()) + self.assertEqual(repr(t), 'Task(<notmuch>)<PENDING, [Dummy()]>') + t.cancel() # Does not take immediate effect! + self.assertEqual(repr(t), 'Task(<notmuch>)<CANCELLING, [Dummy()]>') + self.assertRaises(futures.CancelledError, + self.loop.run_until_complete, t) + self.assertEqual(repr(t), 'Task(<notmuch>)<CANCELLED>') + t = tasks.Task(notmuch(), loop=self.loop) + self.loop.run_until_complete(t) + self.assertEqual(repr(t), "Task(<notmuch>)<result='abc'>") + + def test_task_repr_custom(self): + @tasks.coroutine + def coro(): + pass + + class T(futures.Future): + def __repr__(self): + return 'T[]' + + class MyTask(tasks.Task, T): + def __repr__(self): + return super().__repr__() + + gen = coro() + t = MyTask(gen, loop=self.loop) + self.assertEqual(repr(t), 'T[](<coro>)') + gen.close() + + def test_task_basics(self): + @tasks.coroutine + def outer(): + a = yield from inner1() + b = yield from inner2() + return a+b + + @tasks.coroutine + def inner1(): + return 42 + + @tasks.coroutine + def inner2(): + return 1000 + + t = outer() + self.assertEqual(self.loop.run_until_complete(t), 1042) + + def test_cancel(self): + + def gen(): + when = yield + self.assertAlmostEqual(10.0, when) + yield 0 + + loop = test_utils.TestLoop(gen) + self.addCleanup(loop.close) + + @tasks.coroutine + def task(): + yield from tasks.sleep(10.0, loop=loop) + return 12 + + t = tasks.Task(task(), loop=loop) + loop.call_soon(t.cancel) + with self.assertRaises(futures.CancelledError): + loop.run_until_complete(t) + self.assertTrue(t.done()) + self.assertTrue(t.cancelled()) + self.assertFalse(t.cancel()) + + def test_cancel_yield(self): + @tasks.coroutine + def task(): + yield + yield + return 12 + + t = tasks.Task(task(), loop=self.loop) + test_utils.run_briefly(self.loop) # start coro + t.cancel() + self.assertRaises( + futures.CancelledError, self.loop.run_until_complete, t) + self.assertTrue(t.done()) + self.assertTrue(t.cancelled()) + self.assertFalse(t.cancel()) + + def test_cancel_inner_future(self): + f = futures.Future(loop=self.loop) + + @tasks.coroutine + def task(): + yield from f + return 12 + + t = tasks.Task(task(), loop=self.loop) + test_utils.run_briefly(self.loop) # start task + f.cancel() + with self.assertRaises(futures.CancelledError): + self.loop.run_until_complete(t) + self.assertTrue(f.cancelled()) + self.assertTrue(t.cancelled()) + + def test_cancel_both_task_and_inner_future(self): + f = futures.Future(loop=self.loop) + + @tasks.coroutine + def task(): + yield from f + return 12 + + t = tasks.Task(task(), loop=self.loop) + test_utils.run_briefly(self.loop) + + f.cancel() + t.cancel() + + with self.assertRaises(futures.CancelledError): + self.loop.run_until_complete(t) + + self.assertTrue(t.done()) + self.assertTrue(f.cancelled()) + self.assertTrue(t.cancelled()) + + def test_cancel_task_catching(self): + fut1 = futures.Future(loop=self.loop) + fut2 = futures.Future(loop=self.loop) + + @tasks.coroutine + def task(): + yield from fut1 + try: + yield from fut2 + except futures.CancelledError: + return 42 + + t = tasks.Task(task(), loop=self.loop) + test_utils.run_briefly(self.loop) + self.assertIs(t._fut_waiter, fut1) # White-box test. + fut1.set_result(None) + test_utils.run_briefly(self.loop) + self.assertIs(t._fut_waiter, fut2) # White-box test. + t.cancel() + self.assertTrue(fut2.cancelled()) + res = self.loop.run_until_complete(t) + self.assertEqual(res, 42) + self.assertFalse(t.cancelled()) + + def test_cancel_task_ignoring(self): + fut1 = futures.Future(loop=self.loop) + fut2 = futures.Future(loop=self.loop) + fut3 = futures.Future(loop=self.loop) + + @tasks.coroutine + def task(): + yield from fut1 + try: + yield from fut2 + except futures.CancelledError: + pass + res = yield from fut3 + return res + + t = tasks.Task(task(), loop=self.loop) + test_utils.run_briefly(self.loop) + self.assertIs(t._fut_waiter, fut1) # White-box test. + fut1.set_result(None) + test_utils.run_briefly(self.loop) + self.assertIs(t._fut_waiter, fut2) # White-box test. + t.cancel() + self.assertTrue(fut2.cancelled()) + test_utils.run_briefly(self.loop) + self.assertIs(t._fut_waiter, fut3) # White-box test. + fut3.set_result(42) + res = self.loop.run_until_complete(t) + self.assertEqual(res, 42) + self.assertFalse(fut3.cancelled()) + self.assertFalse(t.cancelled()) + + def test_cancel_current_task(self): + loop = events.new_event_loop() + self.addCleanup(loop.close) + + @tasks.coroutine + def task(): + t.cancel() + self.assertTrue(t._must_cancel) # White-box test. + # The sleep should be cancelled immediately. + yield from tasks.sleep(100, loop=loop) + return 12 + + t = tasks.Task(task(), loop=loop) + self.assertRaises( + futures.CancelledError, loop.run_until_complete, t) + self.assertTrue(t.done()) + self.assertFalse(t._must_cancel) # White-box test. + self.assertFalse(t.cancel()) + + def test_stop_while_run_in_complete(self): + + def gen(): + when = yield + self.assertAlmostEqual(0.1, when) + when = yield 0.1 + self.assertAlmostEqual(0.2, when) + when = yield 0.1 + self.assertAlmostEqual(0.3, when) + yield 0.1 + + loop = test_utils.TestLoop(gen) + self.addCleanup(loop.close) + + x = 0 + waiters = [] + + @tasks.coroutine + def task(): + nonlocal x + while x < 10: + waiters.append(tasks.sleep(0.1, loop=loop)) + yield from waiters[-1] + x += 1 + if x == 2: + loop.stop() + + t = tasks.Task(task(), loop=loop) + self.assertRaises( + RuntimeError, loop.run_until_complete, t) + self.assertFalse(t.done()) + self.assertEqual(x, 2) + self.assertAlmostEqual(0.3, loop.time()) + + # close generators + for w in waiters: + w.close() + + def test_wait_for(self): + + def gen(): + when = yield + self.assertAlmostEqual(0.2, when) + when = yield 0 + self.assertAlmostEqual(0.1, when) + when = yield 0.1 + self.assertAlmostEqual(0.4, when) + yield 0.1 + + loop = test_utils.TestLoop(gen) + self.addCleanup(loop.close) + + @tasks.coroutine + def foo(): + yield from tasks.sleep(0.2, loop=loop) + return 'done' + + fut = tasks.Task(foo(), loop=loop) + + with self.assertRaises(futures.TimeoutError): + loop.run_until_complete(tasks.wait_for(fut, 0.1, loop=loop)) + + self.assertFalse(fut.done()) + self.assertAlmostEqual(0.1, loop.time()) + + # wait for result + res = loop.run_until_complete( + tasks.wait_for(fut, 0.3, loop=loop)) + self.assertEqual(res, 'done') + self.assertAlmostEqual(0.2, loop.time()) + + def test_wait_for_with_global_loop(self): + + def gen(): + when = yield + self.assertAlmostEqual(0.2, when) + when = yield 0 + self.assertAlmostEqual(0.01, when) + yield 0.01 + + loop = test_utils.TestLoop(gen) + self.addCleanup(loop.close) + + @tasks.coroutine + def foo(): + yield from tasks.sleep(0.2, loop=loop) + return 'done' + + events.set_event_loop(loop) + try: + fut = tasks.Task(foo(), loop=loop) + with self.assertRaises(futures.TimeoutError): + loop.run_until_complete(tasks.wait_for(fut, 0.01)) + finally: + events.set_event_loop(None) + + self.assertAlmostEqual(0.01, loop.time()) + self.assertFalse(fut.done()) + + # move forward to close generator + loop.advance_time(10) + loop.run_until_complete(fut) + + def test_wait(self): + + def gen(): + when = yield + self.assertAlmostEqual(0.1, when) + when = yield 0 + self.assertAlmostEqual(0.15, when) + yield 0.15 + + loop = test_utils.TestLoop(gen) + self.addCleanup(loop.close) + + a = tasks.Task(tasks.sleep(0.1, loop=loop), loop=loop) + b = tasks.Task(tasks.sleep(0.15, loop=loop), loop=loop) + + @tasks.coroutine + def foo(): + done, pending = yield from tasks.wait([b, a], loop=loop) + self.assertEqual(done, set([a, b])) + self.assertEqual(pending, set()) + return 42 + + res = loop.run_until_complete(tasks.Task(foo(), loop=loop)) + self.assertEqual(res, 42) + self.assertAlmostEqual(0.15, loop.time()) + + # Doing it again should take no time and exercise a different path. + res = loop.run_until_complete(tasks.Task(foo(), loop=loop)) + self.assertAlmostEqual(0.15, loop.time()) + self.assertEqual(res, 42) + + def test_wait_with_global_loop(self): + + def gen(): + when = yield + self.assertAlmostEqual(0.01, when) + when = yield 0 + self.assertAlmostEqual(0.015, when) + yield 0.015 + + loop = test_utils.TestLoop(gen) + self.addCleanup(loop.close) + + a = tasks.Task(tasks.sleep(0.01, loop=loop), loop=loop) + b = tasks.Task(tasks.sleep(0.015, loop=loop), loop=loop) + + @tasks.coroutine + def foo(): + done, pending = yield from tasks.wait([b, a]) + self.assertEqual(done, set([a, b])) + self.assertEqual(pending, set()) + return 42 + + events.set_event_loop(loop) + try: + res = loop.run_until_complete( + tasks.Task(foo(), loop=loop)) + finally: + events.set_event_loop(None) + + self.assertEqual(res, 42) + + def test_wait_errors(self): + self.assertRaises( + ValueError, self.loop.run_until_complete, + tasks.wait(set(), loop=self.loop)) + + self.assertRaises( + ValueError, self.loop.run_until_complete, + tasks.wait([tasks.sleep(10.0, loop=self.loop)], + return_when=-1, loop=self.loop)) + + def test_wait_first_completed(self): + + def gen(): + when = yield + self.assertAlmostEqual(10.0, when) + when = yield 0 + self.assertAlmostEqual(0.1, when) + yield 0.1 + + loop = test_utils.TestLoop(gen) + self.addCleanup(loop.close) + + a = tasks.Task(tasks.sleep(10.0, loop=loop), loop=loop) + b = tasks.Task(tasks.sleep(0.1, loop=loop), loop=loop) + task = tasks.Task( + tasks.wait([b, a], return_when=tasks.FIRST_COMPLETED, + loop=loop), + loop=loop) + + done, pending = loop.run_until_complete(task) + self.assertEqual({b}, done) + self.assertEqual({a}, pending) + self.assertFalse(a.done()) + self.assertTrue(b.done()) + self.assertIsNone(b.result()) + self.assertAlmostEqual(0.1, loop.time()) + + # move forward to close generator + loop.advance_time(10) + loop.run_until_complete(tasks.wait([a, b], loop=loop)) + + def test_wait_really_done(self): + # there is possibility that some tasks in the pending list + # became done but their callbacks haven't all been called yet + + @tasks.coroutine + def coro1(): + yield + + @tasks.coroutine + def coro2(): + yield + yield + + a = tasks.Task(coro1(), loop=self.loop) + b = tasks.Task(coro2(), loop=self.loop) + task = tasks.Task( + tasks.wait([b, a], return_when=tasks.FIRST_COMPLETED, + loop=self.loop), + loop=self.loop) + + done, pending = self.loop.run_until_complete(task) + self.assertEqual({a, b}, done) + self.assertTrue(a.done()) + self.assertIsNone(a.result()) + self.assertTrue(b.done()) + self.assertIsNone(b.result()) + + def test_wait_first_exception(self): + + def gen(): + when = yield + self.assertAlmostEqual(10.0, when) + yield 0 + + loop = test_utils.TestLoop(gen) + self.addCleanup(loop.close) + + # first_exception, task already has exception + a = tasks.Task(tasks.sleep(10.0, loop=loop), loop=loop) + + @tasks.coroutine + def exc(): + raise ZeroDivisionError('err') + + b = tasks.Task(exc(), loop=loop) + task = tasks.Task( + tasks.wait([b, a], return_when=tasks.FIRST_EXCEPTION, + loop=loop), + loop=loop) + + done, pending = loop.run_until_complete(task) + self.assertEqual({b}, done) + self.assertEqual({a}, pending) + self.assertAlmostEqual(0, loop.time()) + + # move forward to close generator + loop.advance_time(10) + loop.run_until_complete(tasks.wait([a, b], loop=loop)) + + def test_wait_first_exception_in_wait(self): + + def gen(): + when = yield + self.assertAlmostEqual(10.0, when) + when = yield 0 + self.assertAlmostEqual(0.01, when) + yield 0.01 + + loop = test_utils.TestLoop(gen) + self.addCleanup(loop.close) + + # first_exception, exception during waiting + a = tasks.Task(tasks.sleep(10.0, loop=loop), loop=loop) + + @tasks.coroutine + def exc(): + yield from tasks.sleep(0.01, loop=loop) + raise ZeroDivisionError('err') + + b = tasks.Task(exc(), loop=loop) + task = tasks.wait([b, a], return_when=tasks.FIRST_EXCEPTION, + loop=loop) + + done, pending = loop.run_until_complete(task) + self.assertEqual({b}, done) + self.assertEqual({a}, pending) + self.assertAlmostEqual(0.01, loop.time()) + + # move forward to close generator + loop.advance_time(10) + loop.run_until_complete(tasks.wait([a, b], loop=loop)) + + def test_wait_with_exception(self): + + def gen(): + when = yield + self.assertAlmostEqual(0.1, when) + when = yield 0 + self.assertAlmostEqual(0.15, when) + yield 0.15 + + loop = test_utils.TestLoop(gen) + self.addCleanup(loop.close) + + a = tasks.Task(tasks.sleep(0.1, loop=loop), loop=loop) + + @tasks.coroutine + def sleeper(): + yield from tasks.sleep(0.15, loop=loop) + raise ZeroDivisionError('really') + + b = tasks.Task(sleeper(), loop=loop) + + @tasks.coroutine + def foo(): + done, pending = yield from tasks.wait([b, a], loop=loop) + self.assertEqual(len(done), 2) + self.assertEqual(pending, set()) + errors = set(f for f in done if f.exception() is not None) + self.assertEqual(len(errors), 1) + + loop.run_until_complete(tasks.Task(foo(), loop=loop)) + self.assertAlmostEqual(0.15, loop.time()) + + loop.run_until_complete(tasks.Task(foo(), loop=loop)) + self.assertAlmostEqual(0.15, loop.time()) + + def test_wait_with_timeout(self): + + def gen(): + when = yield + self.assertAlmostEqual(0.1, when) + when = yield 0 + self.assertAlmostEqual(0.15, when) + when = yield 0 + self.assertAlmostEqual(0.11, when) + yield 0.11 + + loop = test_utils.TestLoop(gen) + self.addCleanup(loop.close) + + a = tasks.Task(tasks.sleep(0.1, loop=loop), loop=loop) + b = tasks.Task(tasks.sleep(0.15, loop=loop), loop=loop) + + @tasks.coroutine + def foo(): + done, pending = yield from tasks.wait([b, a], timeout=0.11, + loop=loop) + self.assertEqual(done, set([a])) + self.assertEqual(pending, set([b])) + + loop.run_until_complete(tasks.Task(foo(), loop=loop)) + self.assertAlmostEqual(0.11, loop.time()) + + # move forward to close generator + loop.advance_time(10) + loop.run_until_complete(tasks.wait([a, b], loop=loop)) + + def test_wait_concurrent_complete(self): + + def gen(): + when = yield + self.assertAlmostEqual(0.1, when) + when = yield 0 + self.assertAlmostEqual(0.15, when) + when = yield 0 + self.assertAlmostEqual(0.1, when) + yield 0.1 + + loop = test_utils.TestLoop(gen) + self.addCleanup(loop.close) + + a = tasks.Task(tasks.sleep(0.1, loop=loop), loop=loop) + b = tasks.Task(tasks.sleep(0.15, loop=loop), loop=loop) + + done, pending = loop.run_until_complete( + tasks.wait([b, a], timeout=0.1, loop=loop)) + + self.assertEqual(done, set([a])) + self.assertEqual(pending, set([b])) + self.assertAlmostEqual(0.1, loop.time()) + + # move forward to close generator + loop.advance_time(10) + loop.run_until_complete(tasks.wait([a, b], loop=loop)) + + def test_as_completed(self): + + def gen(): + yield 0 + yield 0 + yield 0.01 + yield 0 + + loop = test_utils.TestLoop(gen) + self.addCleanup(loop.close) + completed = set() + time_shifted = False + + @tasks.coroutine + def sleeper(dt, x): + nonlocal time_shifted + yield from tasks.sleep(dt, loop=loop) + completed.add(x) + if not time_shifted and 'a' in completed and 'b' in completed: + time_shifted = True + loop.advance_time(0.14) + return x + + a = sleeper(0.01, 'a') + b = sleeper(0.01, 'b') + c = sleeper(0.15, 'c') + + @tasks.coroutine + def foo(): + values = [] + for f in tasks.as_completed([b, c, a], loop=loop): + values.append((yield from f)) + return values + + res = loop.run_until_complete(tasks.Task(foo(), loop=loop)) + self.assertAlmostEqual(0.15, loop.time()) + self.assertTrue('a' in res[:2]) + self.assertTrue('b' in res[:2]) + self.assertEqual(res[2], 'c') + + # Doing it again should take no time and exercise a different path. + res = loop.run_until_complete(tasks.Task(foo(), loop=loop)) + self.assertAlmostEqual(0.15, loop.time()) + + def test_as_completed_with_timeout(self): + + def gen(): + when = yield + self.assertAlmostEqual(0.12, when) + when = yield 0 + self.assertAlmostEqual(0.1, when) + when = yield 0 + self.assertAlmostEqual(0.15, when) + when = yield 0.1 + self.assertAlmostEqual(0.12, when) + yield 0.02 + + loop = test_utils.TestLoop(gen) + self.addCleanup(loop.close) + + a = tasks.sleep(0.1, 'a', loop=loop) + b = tasks.sleep(0.15, 'b', loop=loop) + + @tasks.coroutine + def foo(): + values = [] + for f in tasks.as_completed([a, b], timeout=0.12, loop=loop): + try: + v = yield from f + values.append((1, v)) + except futures.TimeoutError as exc: + values.append((2, exc)) + return values + + res = loop.run_until_complete(tasks.Task(foo(), loop=loop)) + self.assertEqual(len(res), 2, res) + self.assertEqual(res[0], (1, 'a')) + self.assertEqual(res[1][0], 2) + self.assertTrue(isinstance(res[1][1], futures.TimeoutError)) + self.assertAlmostEqual(0.12, loop.time()) + + # move forward to close generator + loop.advance_time(10) + loop.run_until_complete(tasks.wait([a, b], loop=loop)) + + def test_as_completed_reverse_wait(self): + + def gen(): + yield 0 + yield 0.05 + yield 0 + + loop = test_utils.TestLoop(gen) + self.addCleanup(loop.close) + + a = tasks.sleep(0.05, 'a', loop=loop) + b = tasks.sleep(0.10, 'b', loop=loop) + fs = {a, b} + futs = list(tasks.as_completed(fs, loop=loop)) + self.assertEqual(len(futs), 2) + + x = loop.run_until_complete(futs[1]) + self.assertEqual(x, 'a') + self.assertAlmostEqual(0.05, loop.time()) + loop.advance_time(0.05) + y = loop.run_until_complete(futs[0]) + self.assertEqual(y, 'b') + self.assertAlmostEqual(0.10, loop.time()) + + def test_as_completed_concurrent(self): + + def gen(): + when = yield + self.assertAlmostEqual(0.05, when) + when = yield 0 + self.assertAlmostEqual(0.05, when) + yield 0.05 + + loop = test_utils.TestLoop(gen) + self.addCleanup(loop.close) + + a = tasks.sleep(0.05, 'a', loop=loop) + b = tasks.sleep(0.05, 'b', loop=loop) + fs = {a, b} + futs = list(tasks.as_completed(fs, loop=loop)) + self.assertEqual(len(futs), 2) + waiter = tasks.wait(futs, loop=loop) + done, pending = loop.run_until_complete(waiter) + self.assertEqual(set(f.result() for f in done), {'a', 'b'}) + + def test_sleep(self): + + def gen(): + when = yield + self.assertAlmostEqual(0.05, when) + when = yield 0.05 + self.assertAlmostEqual(0.1, when) + yield 0.05 + + loop = test_utils.TestLoop(gen) + self.addCleanup(loop.close) + + @tasks.coroutine + def sleeper(dt, arg): + yield from tasks.sleep(dt/2, loop=loop) + res = yield from tasks.sleep(dt/2, arg, loop=loop) + return res + + t = tasks.Task(sleeper(0.1, 'yeah'), loop=loop) + loop.run_until_complete(t) + self.assertTrue(t.done()) + self.assertEqual(t.result(), 'yeah') + self.assertAlmostEqual(0.1, loop.time()) + + def test_sleep_cancel(self): + + def gen(): + when = yield + self.assertAlmostEqual(10.0, when) + yield 0 + + loop = test_utils.TestLoop(gen) + self.addCleanup(loop.close) + + t = tasks.Task(tasks.sleep(10.0, 'yeah', loop=loop), + loop=loop) + + handle = None + orig_call_later = loop.call_later + + def call_later(self, delay, callback, *args): + nonlocal handle + handle = orig_call_later(self, delay, callback, *args) + return handle + + loop.call_later = call_later + test_utils.run_briefly(loop) + + self.assertFalse(handle._cancelled) + + t.cancel() + test_utils.run_briefly(loop) + self.assertTrue(handle._cancelled) + + def test_task_cancel_sleeping_task(self): + + def gen(): + when = yield + self.assertAlmostEqual(0.1, when) + when = yield 0 + self.assertAlmostEqual(5000, when) + yield 0.1 + + loop = test_utils.TestLoop(gen) + self.addCleanup(loop.close) + + sleepfut = None + + @tasks.coroutine + def sleep(dt): + nonlocal sleepfut + sleepfut = tasks.sleep(dt, loop=loop) + yield from sleepfut + + @tasks.coroutine + def doit(): + sleeper = tasks.Task(sleep(5000), loop=loop) + loop.call_later(0.1, sleeper.cancel) + try: + yield from sleeper + except futures.CancelledError: + return 'cancelled' + else: + return 'slept in' + + doer = doit() + self.assertEqual(loop.run_until_complete(doer), 'cancelled') + self.assertAlmostEqual(0.1, loop.time()) + + def test_task_cancel_waiter_future(self): + fut = futures.Future(loop=self.loop) + + @tasks.coroutine + def coro(): + yield from fut + + task = tasks.Task(coro(), loop=self.loop) + test_utils.run_briefly(self.loop) + self.assertIs(task._fut_waiter, fut) + + task.cancel() + test_utils.run_briefly(self.loop) + self.assertRaises( + futures.CancelledError, self.loop.run_until_complete, task) + self.assertIsNone(task._fut_waiter) + self.assertTrue(fut.cancelled()) + + def test_step_in_completed_task(self): + @tasks.coroutine + def notmuch(): + return 'ko' + + gen = notmuch() + task = tasks.Task(gen, loop=self.loop) + task.set_result('ok') + + self.assertRaises(AssertionError, task._step) + gen.close() + + def test_step_result(self): + @tasks.coroutine + def notmuch(): + yield None + yield 1 + return 'ko' + + self.assertRaises( + RuntimeError, self.loop.run_until_complete, notmuch()) + + def test_step_result_future(self): + # If coroutine returns future, task waits on this future. + + class Fut(futures.Future): + def __init__(self, *args, **kwds): + self.cb_added = False + super().__init__(*args, **kwds) + + def add_done_callback(self, fn): + self.cb_added = True + super().add_done_callback(fn) + + fut = Fut(loop=self.loop) + result = None + + @tasks.coroutine + def wait_for_future(): + nonlocal result + result = yield from fut + + t = tasks.Task(wait_for_future(), loop=self.loop) + test_utils.run_briefly(self.loop) + self.assertTrue(fut.cb_added) + + res = object() + fut.set_result(res) + test_utils.run_briefly(self.loop) + self.assertIs(res, result) + self.assertTrue(t.done()) + self.assertIsNone(t.result()) + + def test_step_with_baseexception(self): + @tasks.coroutine + def notmutch(): + raise BaseException() + + task = tasks.Task(notmutch(), loop=self.loop) + self.assertRaises(BaseException, task._step) + + self.assertTrue(task.done()) + self.assertIsInstance(task.exception(), BaseException) + + def test_baseexception_during_cancel(self): + + def gen(): + when = yield + self.assertAlmostEqual(10.0, when) + yield 0 + + loop = test_utils.TestLoop(gen) + self.addCleanup(loop.close) + + @tasks.coroutine + def sleeper(): + yield from tasks.sleep(10, loop=loop) + + base_exc = BaseException() + + @tasks.coroutine + def notmutch(): + try: + yield from sleeper() + except futures.CancelledError: + raise base_exc + + task = tasks.Task(notmutch(), loop=loop) + test_utils.run_briefly(loop) + + task.cancel() + self.assertFalse(task.done()) + + self.assertRaises(BaseException, test_utils.run_briefly, loop) + + self.assertTrue(task.done()) + self.assertFalse(task.cancelled()) + self.assertIs(task.exception(), base_exc) + + def test_iscoroutinefunction(self): + def fn(): + pass + + self.assertFalse(tasks.iscoroutinefunction(fn)) + + def fn1(): + yield + self.assertFalse(tasks.iscoroutinefunction(fn1)) + + @tasks.coroutine + def fn2(): + yield + self.assertTrue(tasks.iscoroutinefunction(fn2)) + + def test_yield_vs_yield_from(self): + fut = futures.Future(loop=self.loop) + + @tasks.coroutine + def wait_for_future(): + yield fut + + task = wait_for_future() + with self.assertRaises(RuntimeError): + self.loop.run_until_complete(task) + + self.assertFalse(fut.done()) + + def test_yield_vs_yield_from_generator(self): + @tasks.coroutine + def coro(): + yield + + @tasks.coroutine + def wait_for_future(): + gen = coro() + try: + yield gen + finally: + gen.close() + + task = wait_for_future() + self.assertRaises( + RuntimeError, + self.loop.run_until_complete, task) + + def test_coroutine_non_gen_function(self): + @tasks.coroutine + def func(): + return 'test' + + self.assertTrue(tasks.iscoroutinefunction(func)) + + coro = func() + self.assertTrue(tasks.iscoroutine(coro)) + + res = self.loop.run_until_complete(coro) + self.assertEqual(res, 'test') + + def test_coroutine_non_gen_function_return_future(self): + fut = futures.Future(loop=self.loop) + + @tasks.coroutine + def func(): + return fut + + @tasks.coroutine + def coro(): + fut.set_result('test') + + t1 = tasks.Task(func(), loop=self.loop) + t2 = tasks.Task(coro(), loop=self.loop) + res = self.loop.run_until_complete(t1) + self.assertEqual(res, 'test') + self.assertIsNone(t2.result()) + + # Some thorough tests for cancellation propagation through + # coroutines, tasks and wait(). + + def test_yield_future_passes_cancel(self): + # Cancelling outer() cancels inner() cancels waiter. + proof = 0 + waiter = futures.Future(loop=self.loop) + + @tasks.coroutine + def inner(): + nonlocal proof + try: + yield from waiter + except futures.CancelledError: + proof += 1 + raise + else: + self.fail('got past sleep() in inner()') + + @tasks.coroutine + def outer(): + nonlocal proof + try: + yield from inner() + except futures.CancelledError: + proof += 100 # Expect this path. + else: + proof += 10 + + f = tasks.async(outer(), loop=self.loop) + test_utils.run_briefly(self.loop) + f.cancel() + self.loop.run_until_complete(f) + self.assertEqual(proof, 101) + self.assertTrue(waiter.cancelled()) + + def test_yield_wait_does_not_shield_cancel(self): + # Cancelling outer() makes wait() return early, leaves inner() + # running. + proof = 0 + waiter = futures.Future(loop=self.loop) + + @tasks.coroutine + def inner(): + nonlocal proof + yield from waiter + proof += 1 + + @tasks.coroutine + def outer(): + nonlocal proof + d, p = yield from tasks.wait([inner()], loop=self.loop) + proof += 100 + + f = tasks.async(outer(), loop=self.loop) + test_utils.run_briefly(self.loop) + f.cancel() + self.assertRaises( + futures.CancelledError, self.loop.run_until_complete, f) + waiter.set_result(None) + test_utils.run_briefly(self.loop) + self.assertEqual(proof, 1) + + def test_shield_result(self): + inner = futures.Future(loop=self.loop) + outer = tasks.shield(inner) + inner.set_result(42) + res = self.loop.run_until_complete(outer) + self.assertEqual(res, 42) + + def test_shield_exception(self): + inner = futures.Future(loop=self.loop) + outer = tasks.shield(inner) + test_utils.run_briefly(self.loop) + exc = RuntimeError('expected') + inner.set_exception(exc) + test_utils.run_briefly(self.loop) + self.assertIs(outer.exception(), exc) + + def test_shield_cancel(self): + inner = futures.Future(loop=self.loop) + outer = tasks.shield(inner) + test_utils.run_briefly(self.loop) + inner.cancel() + test_utils.run_briefly(self.loop) + self.assertTrue(outer.cancelled()) + + def test_shield_shortcut(self): + fut = futures.Future(loop=self.loop) + fut.set_result(42) + res = self.loop.run_until_complete(tasks.shield(fut)) + self.assertEqual(res, 42) + + def test_shield_effect(self): + # Cancelling outer() does not affect inner(). + proof = 0 + waiter = futures.Future(loop=self.loop) + + @tasks.coroutine + def inner(): + nonlocal proof + yield from waiter + proof += 1 + + @tasks.coroutine + def outer(): + nonlocal proof + yield from tasks.shield(inner(), loop=self.loop) + proof += 100 + + f = tasks.async(outer(), loop=self.loop) + test_utils.run_briefly(self.loop) + f.cancel() + with self.assertRaises(futures.CancelledError): + self.loop.run_until_complete(f) + waiter.set_result(None) + test_utils.run_briefly(self.loop) + self.assertEqual(proof, 1) + + def test_shield_gather(self): + child1 = futures.Future(loop=self.loop) + child2 = futures.Future(loop=self.loop) + parent = tasks.gather(child1, child2, loop=self.loop) + outer = tasks.shield(parent, loop=self.loop) + test_utils.run_briefly(self.loop) + outer.cancel() + test_utils.run_briefly(self.loop) + self.assertTrue(outer.cancelled()) + child1.set_result(1) + child2.set_result(2) + test_utils.run_briefly(self.loop) + self.assertEqual(parent.result(), [1, 2]) + + def test_gather_shield(self): + child1 = futures.Future(loop=self.loop) + child2 = futures.Future(loop=self.loop) + inner1 = tasks.shield(child1, loop=self.loop) + inner2 = tasks.shield(child2, loop=self.loop) + parent = tasks.gather(inner1, inner2, loop=self.loop) + test_utils.run_briefly(self.loop) + parent.cancel() + # This should cancel inner1 and inner2 but bot child1 and child2. + test_utils.run_briefly(self.loop) + self.assertIsInstance(parent.exception(), futures.CancelledError) + self.assertTrue(inner1.cancelled()) + self.assertTrue(inner2.cancelled()) + child1.set_result(1) + child2.set_result(2) + test_utils.run_briefly(self.loop) + + +class GatherTestsBase: + + def setUp(self): + self.one_loop = test_utils.TestLoop() + self.other_loop = test_utils.TestLoop() + + def tearDown(self): + self.one_loop.close() + self.other_loop.close() + + def _run_loop(self, loop): + while loop._ready: + test_utils.run_briefly(loop) + + def _check_success(self, **kwargs): + a, b, c = [futures.Future(loop=self.one_loop) for i in range(3)] + fut = tasks.gather(*self.wrap_futures(a, b, c), **kwargs) + cb = Mock() + fut.add_done_callback(cb) + b.set_result(1) + a.set_result(2) + self._run_loop(self.one_loop) + self.assertEqual(cb.called, False) + self.assertFalse(fut.done()) + c.set_result(3) + self._run_loop(self.one_loop) + cb.assert_called_once_with(fut) + self.assertEqual(fut.result(), [2, 1, 3]) + + def test_success(self): + self._check_success() + self._check_success(return_exceptions=False) + + def test_result_exception_success(self): + self._check_success(return_exceptions=True) + + def test_one_exception(self): + a, b, c, d, e = [futures.Future(loop=self.one_loop) for i in range(5)] + fut = tasks.gather(*self.wrap_futures(a, b, c, d, e)) + cb = Mock() + fut.add_done_callback(cb) + exc = ZeroDivisionError() + a.set_result(1) + b.set_exception(exc) + self._run_loop(self.one_loop) + self.assertTrue(fut.done()) + cb.assert_called_once_with(fut) + self.assertIs(fut.exception(), exc) + # Does nothing + c.set_result(3) + d.cancel() + e.set_exception(RuntimeError()) + + def test_return_exceptions(self): + a, b, c, d = [futures.Future(loop=self.one_loop) for i in range(4)] + fut = tasks.gather(*self.wrap_futures(a, b, c, d), + return_exceptions=True) + cb = Mock() + fut.add_done_callback(cb) + exc = ZeroDivisionError() + exc2 = RuntimeError() + b.set_result(1) + c.set_exception(exc) + a.set_result(3) + self._run_loop(self.one_loop) + self.assertFalse(fut.done()) + d.set_exception(exc2) + self._run_loop(self.one_loop) + self.assertTrue(fut.done()) + cb.assert_called_once_with(fut) + self.assertEqual(fut.result(), [3, 1, exc, exc2]) + + +class FutureGatherTests(GatherTestsBase, unittest.TestCase): + + def wrap_futures(self, *futures): + return futures + + def _check_empty_sequence(self, seq_or_iter): + events.set_event_loop(self.one_loop) + self.addCleanup(events.set_event_loop, None) + fut = tasks.gather(*seq_or_iter) + self.assertIsInstance(fut, futures.Future) + self.assertIs(fut._loop, self.one_loop) + self._run_loop(self.one_loop) + self.assertTrue(fut.done()) + self.assertEqual(fut.result(), []) + fut = tasks.gather(*seq_or_iter, loop=self.other_loop) + self.assertIs(fut._loop, self.other_loop) + + def test_constructor_empty_sequence(self): + self._check_empty_sequence([]) + self._check_empty_sequence(()) + self._check_empty_sequence(set()) + self._check_empty_sequence(iter("")) + + def test_constructor_heterogenous_futures(self): + fut1 = futures.Future(loop=self.one_loop) + fut2 = futures.Future(loop=self.other_loop) + with self.assertRaises(ValueError): + tasks.gather(fut1, fut2) + with self.assertRaises(ValueError): + tasks.gather(fut1, loop=self.other_loop) + + def test_constructor_homogenous_futures(self): + children = [futures.Future(loop=self.other_loop) for i in range(3)] + fut = tasks.gather(*children) + self.assertIs(fut._loop, self.other_loop) + self._run_loop(self.other_loop) + self.assertFalse(fut.done()) + fut = tasks.gather(*children, loop=self.other_loop) + self.assertIs(fut._loop, self.other_loop) + self._run_loop(self.other_loop) + self.assertFalse(fut.done()) + + def test_one_cancellation(self): + a, b, c, d, e = [futures.Future(loop=self.one_loop) for i in range(5)] + fut = tasks.gather(a, b, c, d, e) + cb = Mock() + fut.add_done_callback(cb) + a.set_result(1) + b.cancel() + self._run_loop(self.one_loop) + self.assertTrue(fut.done()) + cb.assert_called_once_with(fut) + self.assertFalse(fut.cancelled()) + self.assertIsInstance(fut.exception(), futures.CancelledError) + # Does nothing + c.set_result(3) + d.cancel() + e.set_exception(RuntimeError()) + + def test_result_exception_one_cancellation(self): + a, b, c, d, e, f = [futures.Future(loop=self.one_loop) + for i in range(6)] + fut = tasks.gather(a, b, c, d, e, f, return_exceptions=True) + cb = Mock() + fut.add_done_callback(cb) + a.set_result(1) + zde = ZeroDivisionError() + b.set_exception(zde) + c.cancel() + self._run_loop(self.one_loop) + self.assertFalse(fut.done()) + d.set_result(3) + e.cancel() + rte = RuntimeError() + f.set_exception(rte) + res = self.one_loop.run_until_complete(fut) + self.assertIsInstance(res[2], futures.CancelledError) + self.assertIsInstance(res[4], futures.CancelledError) + res[2] = res[4] = None + self.assertEqual(res, [1, zde, None, 3, None, rte]) + cb.assert_called_once_with(fut) + + +class CoroutineGatherTests(GatherTestsBase, unittest.TestCase): + + def setUp(self): + super().setUp() + events.set_event_loop(self.one_loop) + + def tearDown(self): + events.set_event_loop(None) + super().tearDown() + + def wrap_futures(self, *futures): + coros = [] + for fut in futures: + @tasks.coroutine + def coro(fut=fut): + return (yield from fut) + coros.append(coro()) + return coros + + def test_constructor_loop_selection(self): + @tasks.coroutine + def coro(): + return 'abc' + gen1 = coro() + gen2 = coro() + fut = tasks.gather(gen1, gen2) + self.assertIs(fut._loop, self.one_loop) + gen1.close() + gen2.close() + gen3 = coro() + gen4 = coro() + fut = tasks.gather(gen3, gen4, loop=self.other_loop) + self.assertIs(fut._loop, self.other_loop) + gen3.close() + gen4.close() + + def test_cancellation_broadcast(self): + # Cancelling outer() cancels all children. + proof = 0 + waiter = futures.Future(loop=self.one_loop) + + @tasks.coroutine + def inner(): + nonlocal proof + yield from waiter + proof += 1 + + child1 = tasks.async(inner(), loop=self.one_loop) + child2 = tasks.async(inner(), loop=self.one_loop) + gatherer = None + + @tasks.coroutine + def outer(): + nonlocal proof, gatherer + gatherer = tasks.gather(child1, child2, loop=self.one_loop) + yield from gatherer + proof += 100 + + f = tasks.async(outer(), loop=self.one_loop) + test_utils.run_briefly(self.one_loop) + self.assertTrue(f.cancel()) + with self.assertRaises(futures.CancelledError): + self.one_loop.run_until_complete(f) + self.assertFalse(gatherer.cancel()) + self.assertTrue(waiter.cancelled()) + self.assertTrue(child1.cancelled()) + self.assertTrue(child2.cancelled()) + test_utils.run_briefly(self.one_loop) + self.assertEqual(proof, 0) + + def test_exception_marking(self): + # Test for the first line marked "Mark exception retrieved." + + @tasks.coroutine + def inner(f): + yield from f + raise RuntimeError('should not be ignored') + + a = futures.Future(loop=self.one_loop) + b = futures.Future(loop=self.one_loop) + + @tasks.coroutine + def outer(): + yield from tasks.gather(inner(a), inner(b), loop=self.one_loop) + + f = tasks.async(outer(), loop=self.one_loop) + test_utils.run_briefly(self.one_loop) + a.set_result(None) + test_utils.run_briefly(self.one_loop) + b.set_result(None) + test_utils.run_briefly(self.one_loop) + self.assertIsInstance(f.exception(), RuntimeError) + + +if __name__ == '__main__': + unittest.main() |