diff options
Diffstat (limited to 'Lib/test/test_asyncio/test_tasks.py')
| -rw-r--r-- | Lib/test/test_asyncio/test_tasks.py | 67 | 
1 files changed, 67 insertions, 0 deletions
| diff --git a/Lib/test/test_asyncio/test_tasks.py b/Lib/test/test_asyncio/test_tasks.py index 16d3d9d..8ec5d9c 100644 --- a/Lib/test/test_asyncio/test_tasks.py +++ b/Lib/test/test_asyncio/test_tasks.py @@ -2100,5 +2100,72 @@ class CoroutineGatherTests(GatherTestsBase, test_utils.TestCase):          self.assertIsInstance(f.exception(), RuntimeError) +class RunCoroutineThreadsafeTests(test_utils.TestCase): +    """Test case for futures.submit_to_loop.""" + +    def setUp(self): +        self.loop = self.new_test_loop(self.time_gen) + +    def time_gen(self): +        """Handle the timer.""" +        yield 0  # second +        yield 1  # second + +    @asyncio.coroutine +    def add(self, a, b, fail=False, cancel=False): +        """Wait 1 second and return a + b.""" +        yield from asyncio.sleep(1, loop=self.loop) +        if fail: +            raise RuntimeError("Fail!") +        if cancel: +            asyncio.tasks.Task.current_task(self.loop).cancel() +            yield +        return a + b + +    def target(self, fail=False, cancel=False, timeout=None): +        """Run add coroutine in the event loop.""" +        coro = self.add(1, 2, fail=fail, cancel=cancel) +        future = asyncio.run_coroutine_threadsafe(coro, self.loop) +        try: +            return future.result(timeout) +        finally: +            future.done() or future.cancel() + +    def test_run_coroutine_threadsafe(self): +        """Test coroutine submission from a thread to an event loop.""" +        future = self.loop.run_in_executor(None, self.target) +        result = self.loop.run_until_complete(future) +        self.assertEqual(result, 3) + +    def test_run_coroutine_threadsafe_with_exception(self): +        """Test coroutine submission from a thread to an event loop +        when an exception is raised.""" +        future = self.loop.run_in_executor(None, self.target, True) +        with self.assertRaises(RuntimeError) as exc_context: +            self.loop.run_until_complete(future) +        self.assertIn("Fail!", exc_context.exception.args) + +    def test_run_coroutine_threadsafe_with_timeout(self): +        """Test coroutine submission from a thread to an event loop +        when a timeout is raised.""" +        callback = lambda: self.target(timeout=0) +        future = self.loop.run_in_executor(None, callback) +        with self.assertRaises(asyncio.TimeoutError): +            self.loop.run_until_complete(future) +        # Clear the time generator and tasks +        test_utils.run_briefly(self.loop) +        # Check that there's no pending task (add has been cancelled) +        for task in asyncio.Task.all_tasks(self.loop): +            self.assertTrue(task.done()) + +    def test_run_coroutine_threadsafe_task_cancelled(self): +        """Test coroutine submission from a tread to an event loop +        when the task is cancelled.""" +        callback = lambda: self.target(cancel=True) +        future = self.loop.run_in_executor(None, callback) +        with self.assertRaises(asyncio.CancelledError): +            self.loop.run_until_complete(future) + +  if __name__ == '__main__':      unittest.main() | 
