diff options
Diffstat (limited to 'Lib/asyncio/futures.py')
-rw-r--r-- | Lib/asyncio/futures.py | 74 |
1 files changed, 58 insertions, 16 deletions
diff --git a/Lib/asyncio/futures.py b/Lib/asyncio/futures.py index dbe06c4..166bc80 100644 --- a/Lib/asyncio/futures.py +++ b/Lib/asyncio/futures.py @@ -390,22 +390,64 @@ class Future: __await__ = __iter__ # make compatible with 'await' expression -def wrap_future(fut, *, loop=None): - """Wrap concurrent.futures.Future object.""" - if isinstance(fut, Future): - return fut - assert isinstance(fut, concurrent.futures.Future), \ - 'concurrent.futures.Future is expected, got {!r}'.format(fut) - if loop is None: - loop = events.get_event_loop() - new_future = Future(loop=loop) +def _set_concurrent_future_state(concurrent, source): + """Copy state from a future to a concurrent.futures.Future.""" + assert source.done() + if source.cancelled(): + concurrent.cancel() + if not concurrent.set_running_or_notify_cancel(): + return + exception = source.exception() + if exception is not None: + concurrent.set_exception(exception) + else: + result = source.result() + concurrent.set_result(result) + + +def _chain_future(source, destination): + """Chain two futures so that when one completes, so does the other. + + The result (or exception) of source will be copied to destination. + If destination is cancelled, source gets cancelled too. + Compatible with both asyncio.Future and concurrent.futures.Future. + """ + if not isinstance(source, (Future, concurrent.futures.Future)): + raise TypeError('A future is required for source argument') + if not isinstance(destination, (Future, concurrent.futures.Future)): + raise TypeError('A future is required for destination argument') + source_loop = source._loop if isinstance(source, Future) else None + dest_loop = destination._loop if isinstance(destination, Future) else None + + def _set_state(future, other): + if isinstance(future, Future): + future._copy_state(other) + else: + _set_concurrent_future_state(future, other) - def _check_cancel_other(f): - if f.cancelled(): - fut.cancel() + def _call_check_cancel(destination): + if destination.cancelled(): + if source_loop is None or source_loop is dest_loop: + source.cancel() + else: + source_loop.call_soon_threadsafe(source.cancel) - new_future.add_done_callback(_check_cancel_other) - fut.add_done_callback( - lambda future: loop.call_soon_threadsafe( - new_future._copy_state, future)) + def _call_set_state(source): + if dest_loop is None or dest_loop is source_loop: + _set_state(destination, source) + else: + dest_loop.call_soon_threadsafe(_set_state, destination, source) + + destination.add_done_callback(_call_check_cancel) + source.add_done_callback(_call_set_state) + + +def wrap_future(future, *, loop=None): + """Wrap concurrent.futures.Future object.""" + if isinstance(future, Future): + return future + assert isinstance(future, concurrent.futures.Future), \ + 'concurrent.futures.Future is expected, got {!r}'.format(future) + new_future = Future(loop=loop) + _chain_future(future, new_future) return new_future |