summaryrefslogtreecommitdiffstats
path: root/Lib/asyncio/tasks.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/asyncio/tasks.py')
-rw-r--r--Lib/asyncio/tasks.py118
1 files changed, 45 insertions, 73 deletions
diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py
index c37aa41..8852aa5 100644
--- a/Lib/asyncio/tasks.py
+++ b/Lib/asyncio/tasks.py
@@ -4,7 +4,6 @@ __all__ = ['Task',
'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
'wait', 'wait_for', 'as_completed', 'sleep', 'async',
'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
- 'timeout',
]
import concurrent.futures
@@ -242,7 +241,7 @@ class Task(futures.Future):
result = coro.throw(exc)
except StopIteration as exc:
self.set_result(exc.value)
- except futures.CancelledError as exc:
+ except futures.CancelledError:
super().cancel() # I.e., Future.cancel(self).
except Exception as exc:
self.set_exception(exc)
@@ -250,7 +249,8 @@ class Task(futures.Future):
self.set_exception(exc)
raise
else:
- if isinstance(result, futures.Future):
+ blocking = getattr(result, '_asyncio_future_blocking', None)
+ if blocking is not None:
# Yielded Future must come from Future.__iter__().
if result._loop is not self._loop:
self._loop.call_soon(
@@ -258,13 +258,20 @@ class Task(futures.Future):
RuntimeError(
'Task {!r} got Future {!r} attached to a '
'different loop'.format(self, result)))
- elif result._blocking:
- result._blocking = False
- result.add_done_callback(self._wakeup)
- self._fut_waiter = result
- if self._must_cancel:
- if self._fut_waiter.cancel():
- self._must_cancel = False
+ elif blocking:
+ if result is self:
+ self._loop.call_soon(
+ self._step,
+ RuntimeError(
+ 'Task cannot await on itself: {!r}'.format(
+ self)))
+ else:
+ result._asyncio_future_blocking = False
+ result.add_done_callback(self._wakeup)
+ self._fut_waiter = result
+ if self._must_cancel:
+ if self._fut_waiter.cancel():
+ self._must_cancel = False
else:
self._loop.call_soon(
self._step,
@@ -333,7 +340,7 @@ def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
Note: This does not raise TimeoutError! Futures that aren't done
when the timeout occurs are returned in the second set.
"""
- if isinstance(fs, futures.Future) or coroutines.iscoroutine(fs):
+ if futures.isfuture(fs) or coroutines.iscoroutine(fs):
raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
if not fs:
raise ValueError('Set of coroutines/Futures is empty.')
@@ -373,7 +380,7 @@ def wait_for(fut, timeout, *, loop=None):
if timeout is None:
return (yield from fut)
- waiter = futures.Future(loop=loop)
+ waiter = loop.create_future()
timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
cb = functools.partial(_release_waiter, waiter)
@@ -401,12 +408,12 @@ def wait_for(fut, timeout, *, loop=None):
@coroutine
def _wait(fs, timeout, return_when, loop):
- """Internal helper for wait() and _wait_for().
+ """Internal helper for wait() and wait_for().
The fs argument must be a collection of Futures.
"""
assert fs, 'Set of Futures is empty.'
- waiter = futures.Future(loop=loop)
+ waiter = loop.create_future()
timeout_handle = None
if timeout is not None:
timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
@@ -462,7 +469,7 @@ def as_completed(fs, *, loop=None, timeout=None):
Note: The futures 'f' are not necessarily members of fs.
"""
- if isinstance(fs, futures.Future) or coroutines.iscoroutine(fs):
+ if futures.isfuture(fs) or coroutines.iscoroutine(fs):
raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
loop = loop if loop is not None else events.get_event_loop()
todo = {ensure_future(f, loop=loop) for f in set(fs)}
@@ -507,7 +514,9 @@ def sleep(delay, result=None, *, loop=None):
yield
return result
- future = futures.Future(loop=loop)
+ if loop is None:
+ loop = events.get_event_loop()
+ future = loop.create_future()
h = future._loop.call_later(delay,
futures._set_result_unless_cancelled,
future, result)
@@ -517,7 +526,7 @@ def sleep(delay, result=None, *, loop=None):
h.cancel()
-def async(coro_or_future, *, loop=None):
+def async_(coro_or_future, *, loop=None):
"""Wrap a coroutine in a future.
If the argument is a Future, it is returned directly.
@@ -530,13 +539,18 @@ def async(coro_or_future, *, loop=None):
return ensure_future(coro_or_future, loop=loop)
+# Silence DeprecationWarning:
+globals()['async'] = async_
+async_.__name__ = 'async'
+del async_
+
def ensure_future(coro_or_future, *, loop=None):
"""Wrap a coroutine or an awaitable in a future.
If the argument is a Future, it is returned directly.
"""
- if isinstance(coro_or_future, futures.Future):
+ if futures.isfuture(coro_or_future):
if loop is not None and loop is not coro_or_future._loop:
raise ValueError('loop argument must agree with Future')
return coro_or_future
@@ -578,15 +592,21 @@ class _GatheringFuture(futures.Future):
def cancel(self):
if self.done():
return False
+ ret = False
for child in self._children:
- child.cancel()
- return True
+ if child.cancel():
+ ret = True
+ return ret
def gather(*coros_or_futures, loop=None, return_exceptions=False):
"""Return a future aggregating results from the given coroutines
or futures.
+ Coroutines will be wrapped in a future and scheduled in the event
+ loop. They will not necessarily be scheduled in the same order as
+ passed in.
+
All futures must share the same event loop. If all the tasks are
done successfully, the returned future's result is the list of
results (in the order of the original sequence, not necessarily
@@ -604,13 +624,15 @@ def gather(*coros_or_futures, loop=None, return_exceptions=False):
be cancelled.)
"""
if not coros_or_futures:
- outer = futures.Future(loop=loop)
+ if loop is None:
+ loop = events.get_event_loop()
+ outer = loop.create_future()
outer.set_result([])
return outer
arg_to_fut = {}
for arg in set(coros_or_futures):
- if not isinstance(arg, futures.Future):
+ if not futures.isfuture(arg):
fut = ensure_future(arg, loop=loop)
if loop is None:
loop = fut._loop
@@ -692,7 +714,7 @@ def shield(arg, *, loop=None):
# Shortcut.
return inner
loop = inner._loop
- outer = futures.Future(loop=loop)
+ outer = loop.create_future()
def _done_callback(inner):
if outer.cancelled():
@@ -733,53 +755,3 @@ def run_coroutine_threadsafe(coro, loop):
loop.call_soon_threadsafe(callback)
return future
-
-
-def timeout(timeout, *, loop=None):
- """A factory which produce a context manager with timeout.
-
- Useful in cases when you want to apply timeout logic around block
- of code or in cases when asyncio.wait_for is not suitable.
-
- For example:
-
- >>> with asyncio.timeout(0.001):
- ... yield from coro()
-
-
- timeout: timeout value in seconds
- loop: asyncio compatible event loop
- """
- if loop is None:
- loop = events.get_event_loop()
- return _Timeout(timeout, loop=loop)
-
-
-class _Timeout:
- def __init__(self, timeout, *, loop):
- self._timeout = timeout
- self._loop = loop
- self._task = None
- self._cancelled = False
- self._cancel_handler = None
-
- def __enter__(self):
- self._task = Task.current_task(loop=self._loop)
- if self._task is None:
- raise RuntimeError('Timeout context manager should be used '
- 'inside a task')
- self._cancel_handler = self._loop.call_later(
- self._timeout, self._cancel_task)
- return self
-
- def __exit__(self, exc_type, exc_val, exc_tb):
- if exc_type is futures.CancelledError and self._cancelled:
- self._cancel_handler = None
- self._task = None
- raise futures.TimeoutError
- self._cancel_handler.cancel()
- self._cancel_handler = None
- self._task = None
-
- def _cancel_task(self):
- self._cancelled = self._task.cancel()