summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_asyncio/test_tasks.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_asyncio/test_tasks.py')
-rw-r--r--Lib/test/test_asyncio/test_tasks.py67
1 files changed, 67 insertions, 0 deletions
diff --git a/Lib/test/test_asyncio/test_tasks.py b/Lib/test/test_asyncio/test_tasks.py
index 16d3d9d..8ec5d9c 100644
--- a/Lib/test/test_asyncio/test_tasks.py
+++ b/Lib/test/test_asyncio/test_tasks.py
@@ -2100,5 +2100,72 @@ class CoroutineGatherTests(GatherTestsBase, test_utils.TestCase):
self.assertIsInstance(f.exception(), RuntimeError)
+class RunCoroutineThreadsafeTests(test_utils.TestCase):
+ """Test case for futures.submit_to_loop."""
+
+ def setUp(self):
+ self.loop = self.new_test_loop(self.time_gen)
+
+ def time_gen(self):
+ """Handle the timer."""
+ yield 0 # second
+ yield 1 # second
+
+ @asyncio.coroutine
+ def add(self, a, b, fail=False, cancel=False):
+ """Wait 1 second and return a + b."""
+ yield from asyncio.sleep(1, loop=self.loop)
+ if fail:
+ raise RuntimeError("Fail!")
+ if cancel:
+ asyncio.tasks.Task.current_task(self.loop).cancel()
+ yield
+ return a + b
+
+ def target(self, fail=False, cancel=False, timeout=None):
+ """Run add coroutine in the event loop."""
+ coro = self.add(1, 2, fail=fail, cancel=cancel)
+ future = asyncio.run_coroutine_threadsafe(coro, self.loop)
+ try:
+ return future.result(timeout)
+ finally:
+ future.done() or future.cancel()
+
+ def test_run_coroutine_threadsafe(self):
+ """Test coroutine submission from a thread to an event loop."""
+ future = self.loop.run_in_executor(None, self.target)
+ result = self.loop.run_until_complete(future)
+ self.assertEqual(result, 3)
+
+ def test_run_coroutine_threadsafe_with_exception(self):
+ """Test coroutine submission from a thread to an event loop
+ when an exception is raised."""
+ future = self.loop.run_in_executor(None, self.target, True)
+ with self.assertRaises(RuntimeError) as exc_context:
+ self.loop.run_until_complete(future)
+ self.assertIn("Fail!", exc_context.exception.args)
+
+ def test_run_coroutine_threadsafe_with_timeout(self):
+ """Test coroutine submission from a thread to an event loop
+ when a timeout is raised."""
+ callback = lambda: self.target(timeout=0)
+ future = self.loop.run_in_executor(None, callback)
+ with self.assertRaises(asyncio.TimeoutError):
+ self.loop.run_until_complete(future)
+ # Clear the time generator and tasks
+ test_utils.run_briefly(self.loop)
+ # Check that there's no pending task (add has been cancelled)
+ for task in asyncio.Task.all_tasks(self.loop):
+ self.assertTrue(task.done())
+
+ def test_run_coroutine_threadsafe_task_cancelled(self):
+ """Test coroutine submission from a tread to an event loop
+ when the task is cancelled."""
+ callback = lambda: self.target(cancel=True)
+ future = self.loop.run_in_executor(None, callback)
+ with self.assertRaises(asyncio.CancelledError):
+ self.loop.run_until_complete(future)
+
+
if __name__ == '__main__':
unittest.main()