summaryrefslogtreecommitdiffstats
path: root/Lib
diff options
context:
space:
mode:
authorGuido van Rossum <guido@python.org>2015-10-03 15:31:42 (GMT)
committerGuido van Rossum <guido@python.org>2015-10-03 15:31:42 (GMT)
commit841d9ee41a8ad0a8a372f9b84f0fa40b07bcc66b (patch)
treeb1d15de84428011b0e5865425e096d1dfe37d8e9 /Lib
parent3795d12a0d6f2be8d0a062c3ba878fc4800e2db1 (diff)
downloadcpython-841d9ee41a8ad0a8a372f9b84f0fa40b07bcc66b.zip
cpython-841d9ee41a8ad0a8a372f9b84f0fa40b07bcc66b.tar.gz
cpython-841d9ee41a8ad0a8a372f9b84f0fa40b07bcc66b.tar.bz2
Issue #25304: Add asyncio.run_coroutine_threadsafe(). By Vincent Michel.
Diffstat (limited to 'Lib')
-rw-r--r--Lib/asyncio/futures.py74
-rw-r--r--Lib/asyncio/tasks.py18
-rw-r--r--Lib/test/test_asyncio/test_futures.py2
-rw-r--r--Lib/test/test_asyncio/test_tasks.py67
4 files changed, 142 insertions, 19 deletions
diff --git a/Lib/asyncio/futures.py b/Lib/asyncio/futures.py
index dbe06c4..166bc80 100644
--- a/Lib/asyncio/futures.py
+++ b/Lib/asyncio/futures.py
@@ -390,22 +390,64 @@ class Future:
__await__ = __iter__ # make compatible with 'await' expression
-def wrap_future(fut, *, loop=None):
- """Wrap concurrent.futures.Future object."""
- if isinstance(fut, Future):
- return fut
- assert isinstance(fut, concurrent.futures.Future), \
- 'concurrent.futures.Future is expected, got {!r}'.format(fut)
- if loop is None:
- loop = events.get_event_loop()
- new_future = Future(loop=loop)
+def _set_concurrent_future_state(concurrent, source):
+ """Copy state from a future to a concurrent.futures.Future."""
+ assert source.done()
+ if source.cancelled():
+ concurrent.cancel()
+ if not concurrent.set_running_or_notify_cancel():
+ return
+ exception = source.exception()
+ if exception is not None:
+ concurrent.set_exception(exception)
+ else:
+ result = source.result()
+ concurrent.set_result(result)
+
+
+def _chain_future(source, destination):
+ """Chain two futures so that when one completes, so does the other.
+
+ The result (or exception) of source will be copied to destination.
+ If destination is cancelled, source gets cancelled too.
+ Compatible with both asyncio.Future and concurrent.futures.Future.
+ """
+ if not isinstance(source, (Future, concurrent.futures.Future)):
+ raise TypeError('A future is required for source argument')
+ if not isinstance(destination, (Future, concurrent.futures.Future)):
+ raise TypeError('A future is required for destination argument')
+ source_loop = source._loop if isinstance(source, Future) else None
+ dest_loop = destination._loop if isinstance(destination, Future) else None
+
+ def _set_state(future, other):
+ if isinstance(future, Future):
+ future._copy_state(other)
+ else:
+ _set_concurrent_future_state(future, other)
- def _check_cancel_other(f):
- if f.cancelled():
- fut.cancel()
+ def _call_check_cancel(destination):
+ if destination.cancelled():
+ if source_loop is None or source_loop is dest_loop:
+ source.cancel()
+ else:
+ source_loop.call_soon_threadsafe(source.cancel)
- new_future.add_done_callback(_check_cancel_other)
- fut.add_done_callback(
- lambda future: loop.call_soon_threadsafe(
- new_future._copy_state, future))
+ def _call_set_state(source):
+ if dest_loop is None or dest_loop is source_loop:
+ _set_state(destination, source)
+ else:
+ dest_loop.call_soon_threadsafe(_set_state, destination, source)
+
+ destination.add_done_callback(_call_check_cancel)
+ source.add_done_callback(_call_set_state)
+
+
+def wrap_future(future, *, loop=None):
+ """Wrap concurrent.futures.Future object."""
+ if isinstance(future, Future):
+ return future
+ assert isinstance(future, concurrent.futures.Future), \
+ 'concurrent.futures.Future is expected, got {!r}'.format(future)
+ new_future = Future(loop=loop)
+ _chain_future(future, new_future)
return new_future
diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py
index 434f498..5a7bd9d 100644
--- a/Lib/asyncio/tasks.py
+++ b/Lib/asyncio/tasks.py
@@ -3,7 +3,7 @@
__all__ = ['Task',
'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
'wait', 'wait_for', 'as_completed', 'sleep', 'async',
- 'gather', 'shield', 'ensure_future',
+ 'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
]
import concurrent.futures
@@ -692,3 +692,19 @@ def shield(arg, *, loop=None):
inner.add_done_callback(_done_callback)
return outer
+
+
+def run_coroutine_threadsafe(coro, loop):
+ """Submit a coroutine object to a given event loop.
+
+ Return a concurrent.futures.Future to access the result.
+ """
+ if not coroutines.iscoroutine(coro):
+ raise TypeError('A coroutine object is required')
+ future = concurrent.futures.Future()
+
+ def callback():
+ futures._chain_future(ensure_future(coro, loop=loop), future)
+
+ loop.call_soon_threadsafe(callback)
+ return future
diff --git a/Lib/test/test_asyncio/test_futures.py b/Lib/test/test_asyncio/test_futures.py
index c8b6829..0bc0581 100644
--- a/Lib/test/test_asyncio/test_futures.py
+++ b/Lib/test/test_asyncio/test_futures.py
@@ -174,8 +174,6 @@ class FutureTests(test_utils.TestCase):
'<Future cancelled>')
def test_copy_state(self):
- # Test the internal _copy_state method since it's being directly
- # invoked in other modules.
f = asyncio.Future(loop=self.loop)
f.set_result(10)
diff --git a/Lib/test/test_asyncio/test_tasks.py b/Lib/test/test_asyncio/test_tasks.py
index 16d3d9d..8ec5d9c 100644
--- a/Lib/test/test_asyncio/test_tasks.py
+++ b/Lib/test/test_asyncio/test_tasks.py
@@ -2100,5 +2100,72 @@ class CoroutineGatherTests(GatherTestsBase, test_utils.TestCase):
self.assertIsInstance(f.exception(), RuntimeError)
+class RunCoroutineThreadsafeTests(test_utils.TestCase):
+ """Test case for futures.submit_to_loop."""
+
+ def setUp(self):
+ self.loop = self.new_test_loop(self.time_gen)
+
+ def time_gen(self):
+ """Handle the timer."""
+ yield 0 # second
+ yield 1 # second
+
+ @asyncio.coroutine
+ def add(self, a, b, fail=False, cancel=False):
+ """Wait 1 second and return a + b."""
+ yield from asyncio.sleep(1, loop=self.loop)
+ if fail:
+ raise RuntimeError("Fail!")
+ if cancel:
+ asyncio.tasks.Task.current_task(self.loop).cancel()
+ yield
+ return a + b
+
+ def target(self, fail=False, cancel=False, timeout=None):
+ """Run add coroutine in the event loop."""
+ coro = self.add(1, 2, fail=fail, cancel=cancel)
+ future = asyncio.run_coroutine_threadsafe(coro, self.loop)
+ try:
+ return future.result(timeout)
+ finally:
+ future.done() or future.cancel()
+
+ def test_run_coroutine_threadsafe(self):
+ """Test coroutine submission from a thread to an event loop."""
+ future = self.loop.run_in_executor(None, self.target)
+ result = self.loop.run_until_complete(future)
+ self.assertEqual(result, 3)
+
+ def test_run_coroutine_threadsafe_with_exception(self):
+ """Test coroutine submission from a thread to an event loop
+ when an exception is raised."""
+ future = self.loop.run_in_executor(None, self.target, True)
+ with self.assertRaises(RuntimeError) as exc_context:
+ self.loop.run_until_complete(future)
+ self.assertIn("Fail!", exc_context.exception.args)
+
+ def test_run_coroutine_threadsafe_with_timeout(self):
+ """Test coroutine submission from a thread to an event loop
+ when a timeout is raised."""
+ callback = lambda: self.target(timeout=0)
+ future = self.loop.run_in_executor(None, callback)
+ with self.assertRaises(asyncio.TimeoutError):
+ self.loop.run_until_complete(future)
+ # Clear the time generator and tasks
+ test_utils.run_briefly(self.loop)
+ # Check that there's no pending task (add has been cancelled)
+ for task in asyncio.Task.all_tasks(self.loop):
+ self.assertTrue(task.done())
+
+ def test_run_coroutine_threadsafe_task_cancelled(self):
+ """Test coroutine submission from a tread to an event loop
+ when the task is cancelled."""
+ callback = lambda: self.target(cancel=True)
+ future = self.loop.run_in_executor(None, callback)
+ with self.assertRaises(asyncio.CancelledError):
+ self.loop.run_until_complete(future)
+
+
if __name__ == '__main__':
unittest.main()