diff options
author | Guido van Rossum <guido@python.org> | 2015-10-03 15:31:42 (GMT) |
---|---|---|
committer | Guido van Rossum <guido@python.org> | 2015-10-03 15:31:42 (GMT) |
commit | 841d9ee41a8ad0a8a372f9b84f0fa40b07bcc66b (patch) | |
tree | b1d15de84428011b0e5865425e096d1dfe37d8e9 /Lib/test/test_asyncio | |
parent | 3795d12a0d6f2be8d0a062c3ba878fc4800e2db1 (diff) | |
download | cpython-841d9ee41a8ad0a8a372f9b84f0fa40b07bcc66b.zip cpython-841d9ee41a8ad0a8a372f9b84f0fa40b07bcc66b.tar.gz cpython-841d9ee41a8ad0a8a372f9b84f0fa40b07bcc66b.tar.bz2 |
Issue #25304: Add asyncio.run_coroutine_threadsafe(). By Vincent Michel.
Diffstat (limited to 'Lib/test/test_asyncio')
-rw-r--r-- | Lib/test/test_asyncio/test_futures.py | 2 | ||||
-rw-r--r-- | Lib/test/test_asyncio/test_tasks.py | 67 |
2 files changed, 67 insertions, 2 deletions
diff --git a/Lib/test/test_asyncio/test_futures.py b/Lib/test/test_asyncio/test_futures.py index c8b6829..0bc0581 100644 --- a/Lib/test/test_asyncio/test_futures.py +++ b/Lib/test/test_asyncio/test_futures.py @@ -174,8 +174,6 @@ class FutureTests(test_utils.TestCase): '<Future cancelled>') def test_copy_state(self): - # Test the internal _copy_state method since it's being directly - # invoked in other modules. f = asyncio.Future(loop=self.loop) f.set_result(10) diff --git a/Lib/test/test_asyncio/test_tasks.py b/Lib/test/test_asyncio/test_tasks.py index 16d3d9d..8ec5d9c 100644 --- a/Lib/test/test_asyncio/test_tasks.py +++ b/Lib/test/test_asyncio/test_tasks.py @@ -2100,5 +2100,72 @@ class CoroutineGatherTests(GatherTestsBase, test_utils.TestCase): self.assertIsInstance(f.exception(), RuntimeError) +class RunCoroutineThreadsafeTests(test_utils.TestCase): + """Test case for futures.submit_to_loop.""" + + def setUp(self): + self.loop = self.new_test_loop(self.time_gen) + + def time_gen(self): + """Handle the timer.""" + yield 0 # second + yield 1 # second + + @asyncio.coroutine + def add(self, a, b, fail=False, cancel=False): + """Wait 1 second and return a + b.""" + yield from asyncio.sleep(1, loop=self.loop) + if fail: + raise RuntimeError("Fail!") + if cancel: + asyncio.tasks.Task.current_task(self.loop).cancel() + yield + return a + b + + def target(self, fail=False, cancel=False, timeout=None): + """Run add coroutine in the event loop.""" + coro = self.add(1, 2, fail=fail, cancel=cancel) + future = asyncio.run_coroutine_threadsafe(coro, self.loop) + try: + return future.result(timeout) + finally: + future.done() or future.cancel() + + def test_run_coroutine_threadsafe(self): + """Test coroutine submission from a thread to an event loop.""" + future = self.loop.run_in_executor(None, self.target) + result = self.loop.run_until_complete(future) + self.assertEqual(result, 3) + + def test_run_coroutine_threadsafe_with_exception(self): + """Test coroutine submission from a thread to an event loop + when an exception is raised.""" + future = self.loop.run_in_executor(None, self.target, True) + with self.assertRaises(RuntimeError) as exc_context: + self.loop.run_until_complete(future) + self.assertIn("Fail!", exc_context.exception.args) + + def test_run_coroutine_threadsafe_with_timeout(self): + """Test coroutine submission from a thread to an event loop + when a timeout is raised.""" + callback = lambda: self.target(timeout=0) + future = self.loop.run_in_executor(None, callback) + with self.assertRaises(asyncio.TimeoutError): + self.loop.run_until_complete(future) + # Clear the time generator and tasks + test_utils.run_briefly(self.loop) + # Check that there's no pending task (add has been cancelled) + for task in asyncio.Task.all_tasks(self.loop): + self.assertTrue(task.done()) + + def test_run_coroutine_threadsafe_task_cancelled(self): + """Test coroutine submission from a tread to an event loop + when the task is cancelled.""" + callback = lambda: self.target(cancel=True) + future = self.loop.run_in_executor(None, callback) + with self.assertRaises(asyncio.CancelledError): + self.loop.run_until_complete(future) + + if __name__ == '__main__': unittest.main() |