diff options
Diffstat (limited to 'Lib/test/test_asyncio/test_tasks.py')
-rw-r--r-- | Lib/test/test_asyncio/test_tasks.py | 206 |
1 files changed, 183 insertions, 23 deletions
diff --git a/Lib/test/test_asyncio/test_tasks.py b/Lib/test/test_asyncio/test_tasks.py index a32dca1..5429facb 100644 --- a/Lib/test/test_asyncio/test_tasks.py +++ b/Lib/test/test_asyncio/test_tasks.py @@ -1493,53 +1493,69 @@ class BaseTaskTests: self.assertEqual(res, 'test') self.assertIsNone(t2.result()) - def test_current_task(self): + + def test_current_task_deprecated(self): Task = self.__class__.Task - self.assertIsNone(Task.current_task(loop=self.loop)) + with self.assertWarns(PendingDeprecationWarning): + self.assertIsNone(Task.current_task(loop=self.loop)) - @asyncio.coroutine - def coro(loop): - self.assertTrue(Task.current_task(loop=loop) is task) + async def coro(loop): + with self.assertWarns(PendingDeprecationWarning): + self.assertIs(Task.current_task(loop=loop), task) # See http://bugs.python.org/issue29271 for details: asyncio.set_event_loop(loop) try: - self.assertIs(Task.current_task(None), task) - self.assertIs(Task.current_task(), task) + with self.assertWarns(PendingDeprecationWarning): + self.assertIs(Task.current_task(None), task) + with self.assertWarns(PendingDeprecationWarning): + self.assertIs(Task.current_task(), task) finally: asyncio.set_event_loop(None) task = self.new_task(self.loop, coro(self.loop)) self.loop.run_until_complete(task) - self.assertIsNone(Task.current_task(loop=self.loop)) + with self.assertWarns(PendingDeprecationWarning): + self.assertIsNone(Task.current_task(loop=self.loop)) - def test_current_task_with_interleaving_tasks(self): - Task = self.__class__.Task + def test_current_task(self): + self.assertIsNone(asyncio.current_task(loop=self.loop)) - self.assertIsNone(Task.current_task(loop=self.loop)) + async def coro(loop): + self.assertIs(asyncio.current_task(loop=loop), task) + + self.assertIs(asyncio.current_task(None), task) + self.assertIs(asyncio.current_task(), task) + + task = self.new_task(self.loop, coro(self.loop)) + self.loop.run_until_complete(task) + self.assertIsNone(asyncio.current_task(loop=self.loop)) + + def test_current_task_with_interleaving_tasks(self): + self.assertIsNone(asyncio.current_task(loop=self.loop)) fut1 = self.new_future(self.loop) fut2 = self.new_future(self.loop) async def coro1(loop): - self.assertTrue(Task.current_task(loop=loop) is task1) + self.assertTrue(asyncio.current_task(loop=loop) is task1) await fut1 - self.assertTrue(Task.current_task(loop=loop) is task1) + self.assertTrue(asyncio.current_task(loop=loop) is task1) fut2.set_result(True) async def coro2(loop): - self.assertTrue(Task.current_task(loop=loop) is task2) + self.assertTrue(asyncio.current_task(loop=loop) is task2) fut1.set_result(True) await fut2 - self.assertTrue(Task.current_task(loop=loop) is task2) + self.assertTrue(asyncio.current_task(loop=loop) is task2) task1 = self.new_task(self.loop, coro1(self.loop)) task2 = self.new_task(self.loop, coro2(self.loop)) self.loop.run_until_complete(asyncio.wait((task1, task2), loop=self.loop)) - self.assertIsNone(Task.current_task(loop=self.loop)) + self.assertIsNone(asyncio.current_task(loop=self.loop)) # Some thorough tests for cancellation propagation through # coroutines, tasks and wait(). @@ -1826,6 +1842,16 @@ class BaseTaskTests: self.assertIsInstance(exception, Exception) self.assertEqual(exception.args, ("foo", )) + def test_all_tasks_deprecated(self): + Task = self.__class__.Task + + async def coro(): + with self.assertWarns(PendingDeprecationWarning): + assert Task.all_tasks(self.loop) == {t} + + t = self.new_task(self.loop, coro()) + self.loop.run_until_complete(t) + def test_log_destroyed_pending_task(self): Task = self.__class__.Task @@ -1845,13 +1871,13 @@ class BaseTaskTests: coro = kill_me(self.loop) task = asyncio.ensure_future(coro, loop=self.loop) - self.assertEqual(Task.all_tasks(loop=self.loop), {task}) + self.assertEqual(asyncio.all_tasks(loop=self.loop), {task}) # See http://bugs.python.org/issue29271 for details: asyncio.set_event_loop(self.loop) try: - self.assertEqual(Task.all_tasks(), {task}) - self.assertEqual(Task.all_tasks(None), {task}) + self.assertEqual(asyncio.all_tasks(), {task}) + self.assertEqual(asyncio.all_tasks(None), {task}) finally: asyncio.set_event_loop(None) @@ -1868,7 +1894,7 @@ class BaseTaskTests: # no more reference to kill_me() task: the task is destroyed by the GC support.gc_collect() - self.assertEqual(Task.all_tasks(loop=self.loop), set()) + self.assertEqual(asyncio.all_tasks(loop=self.loop), set()) mock_handler.assert_called_with(self.loop, { 'message': 'Task was destroyed but it is pending!', @@ -2052,7 +2078,7 @@ class BaseTaskTests: message = m_log.error.call_args[0][0] self.assertIn('Task was destroyed but it is pending', message) - self.assertEqual(self.Task.all_tasks(self.loop), set()) + self.assertEqual(asyncio.all_tasks(self.loop), set()) def test_create_task_with_noncoroutine(self): with self.assertRaisesRegex(TypeError, @@ -2201,6 +2227,140 @@ class PyTask_PyFuture_SubclassTests(BaseTaskTests, test_utils.TestCase): Future = futures._PyFuture +class BaseTaskIntrospectionTests: + _register_task = None + _unregister_task = None + _enter_task = None + _leave_task = None + + def test__register_task(self): + task = mock.Mock() + loop = mock.Mock() + self.assertEqual(asyncio.all_tasks(loop), set()) + self._register_task(loop, task) + self.assertEqual(asyncio.all_tasks(loop), {task}) + self._unregister_task(loop, task) + + def test__enter_task(self): + task = mock.Mock() + loop = mock.Mock() + self.assertIsNone(asyncio.current_task(loop)) + self._enter_task(loop, task) + self.assertIs(asyncio.current_task(loop), task) + self._leave_task(loop, task) + + def test__enter_task_failure(self): + task1 = mock.Mock() + task2 = mock.Mock() + loop = mock.Mock() + self._enter_task(loop, task1) + with self.assertRaises(RuntimeError): + self._enter_task(loop, task2) + self.assertIs(asyncio.current_task(loop), task1) + self._leave_task(loop, task1) + + def test__leave_task(self): + task = mock.Mock() + loop = mock.Mock() + self._enter_task(loop, task) + self._leave_task(loop, task) + self.assertIsNone(asyncio.current_task(loop)) + + def test__leave_task_failure1(self): + task1 = mock.Mock() + task2 = mock.Mock() + loop = mock.Mock() + self._enter_task(loop, task1) + with self.assertRaises(RuntimeError): + self._leave_task(loop, task2) + self.assertIs(asyncio.current_task(loop), task1) + self._leave_task(loop, task1) + + def test__leave_task_failure2(self): + task = mock.Mock() + loop = mock.Mock() + with self.assertRaises(RuntimeError): + self._leave_task(loop, task) + self.assertIsNone(asyncio.current_task(loop)) + + def test__unregister_task(self): + task = mock.Mock() + loop = mock.Mock() + self._register_task(loop, task) + self._unregister_task(loop, task) + self.assertEqual(asyncio.all_tasks(loop), set()) + + def test__unregister_task_not_registered(self): + task = mock.Mock() + loop = mock.Mock() + self._unregister_task(loop, task) + self.assertEqual(asyncio.all_tasks(loop), set()) + + +class PyIntrospectionTests(unittest.TestCase, BaseTaskIntrospectionTests): + _register_task = staticmethod(tasks._py_register_task) + _unregister_task = staticmethod(tasks._py_unregister_task) + _enter_task = staticmethod(tasks._py_enter_task) + _leave_task = staticmethod(tasks._py_leave_task) + + +@unittest.skipUnless(hasattr(tasks, '_c_register_task'), + 'requires the C _asyncio module') +class CIntrospectionTests(unittest.TestCase, BaseTaskIntrospectionTests): + _register_task = staticmethod(tasks._c_register_task) + _unregister_task = staticmethod(tasks._c_unregister_task) + _enter_task = staticmethod(tasks._c_enter_task) + _leave_task = staticmethod(tasks._c_leave_task) + + +class BaseCurrentLoopTests: + + def setUp(self): + super().setUp() + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) + + def tearDown(self): + self.loop.close() + asyncio.set_event_loop(None) + super().tearDown() + + def new_task(self, coro): + raise NotImplementedError + + def test_current_task_no_running_loop(self): + self.assertIsNone(asyncio.current_task(loop=self.loop)) + + def test_current_task_no_running_loop_implicit(self): + with self.assertRaises(RuntimeError): + asyncio.current_task() + + def test_current_task_with_implicit_loop(self): + async def coro(): + self.assertIs(asyncio.current_task(loop=self.loop), task) + + self.assertIs(asyncio.current_task(None), task) + self.assertIs(asyncio.current_task(), task) + + task = self.new_task(coro()) + self.loop.run_until_complete(task) + self.assertIsNone(asyncio.current_task(loop=self.loop)) + + +class PyCurrentLoopTests(BaseCurrentLoopTests, unittest.TestCase): + + def new_task(self, coro): + return tasks._PyTask(coro, loop=self.loop) + + +@unittest.skipUnless(hasattr(tasks, '_CTask'), + 'requires the C _asyncio module') +class CCurrentLoopTests(BaseCurrentLoopTests, unittest.TestCase): + + def new_task(self, coro): + return getattr(tasks, '_CTask')(coro, loop=self.loop) + + class GenericTaskTests(test_utils.TestCase): def test_future_subclass(self): @@ -2522,7 +2682,7 @@ class RunCoroutineThreadsafeTests(test_utils.TestCase): if fail: raise RuntimeError("Fail!") if cancel: - asyncio.tasks.Task.current_task(self.loop).cancel() + asyncio.current_task(self.loop).cancel() yield return a + b @@ -2568,7 +2728,7 @@ class RunCoroutineThreadsafeTests(test_utils.TestCase): self.loop.run_until_complete(future) test_utils.run_briefly(self.loop) # Check that there's no pending task (add has been cancelled) - for task in asyncio.Task.all_tasks(self.loop): + for task in asyncio.all_tasks(self.loop): self.assertTrue(task.done()) def test_run_coroutine_threadsafe_task_cancelled(self): |