summaryrefslogtreecommitdiffstats
path: root/Lib/asyncio/tasks.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/asyncio/tasks.py')
-rw-r--r--Lib/asyncio/tasks.py636
1 files changed, 636 insertions, 0 deletions
diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py
new file mode 100644
index 0000000..2c8579f
--- /dev/null
+++ b/Lib/asyncio/tasks.py
@@ -0,0 +1,636 @@
+"""Support for tasks, coroutines and the scheduler."""
+
+__all__ = ['coroutine', 'Task',
+ 'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
+ 'wait', 'wait_for', 'as_completed', 'sleep', 'async',
+ 'gather',
+ ]
+
+import collections
+import concurrent.futures
+import functools
+import inspect
+import linecache
+import traceback
+import weakref
+
+from . import events
+from . import futures
+from .log import asyncio_log
+
+# If you set _DEBUG to true, @coroutine will wrap the resulting
+# generator objects in a CoroWrapper instance (defined below). That
+# instance will log a message when the generator is never iterated
+# over, which may happen when you forget to use "yield from" with a
+# coroutine call. Note that the value of the _DEBUG flag is taken
+# when the decorator is used, so to be of any use it must be set
+# before you define your coroutines. A downside of using this feature
+# is that tracebacks show entries for the CoroWrapper.__next__ method
+# when _DEBUG is true.
+_DEBUG = False
+
+
+class CoroWrapper:
+ """Wrapper for coroutine in _DEBUG mode."""
+
+ __slot__ = ['gen', 'func']
+
+ def __init__(self, gen, func):
+ assert inspect.isgenerator(gen), gen
+ self.gen = gen
+ self.func = func
+
+ def __iter__(self):
+ return self
+
+ def __next__(self):
+ return next(self.gen)
+
+ def send(self, value):
+ return self.gen.send(value)
+
+ def throw(self, exc):
+ return self.gen.throw(exc)
+
+ def close(self):
+ return self.gen.close()
+
+ def __del__(self):
+ frame = self.gen.gi_frame
+ if frame is not None and frame.f_lasti == -1:
+ func = self.func
+ code = func.__code__
+ filename = code.co_filename
+ lineno = code.co_firstlineno
+ asyncio_log.error('Coroutine %r defined at %s:%s was never yielded from',
+ func.__name__, filename, lineno)
+
+
+def coroutine(func):
+ """Decorator to mark coroutines.
+
+ If the coroutine is not yielded from before it is destroyed,
+ an error message is logged.
+ """
+ if inspect.isgeneratorfunction(func):
+ coro = func
+ else:
+ @functools.wraps(func)
+ def coro(*args, **kw):
+ res = func(*args, **kw)
+ if isinstance(res, futures.Future) or inspect.isgenerator(res):
+ res = yield from res
+ return res
+
+ if not _DEBUG:
+ wrapper = coro
+ else:
+ @functools.wraps(func)
+ def wrapper(*args, **kwds):
+ w = CoroWrapper(coro(*args, **kwds), func)
+ w.__name__ = coro.__name__
+ w.__doc__ = coro.__doc__
+ return w
+
+ wrapper._is_coroutine = True # For iscoroutinefunction().
+ return wrapper
+
+
+def iscoroutinefunction(func):
+ """Return True if func is a decorated coroutine function."""
+ return getattr(func, '_is_coroutine', False)
+
+
+def iscoroutine(obj):
+ """Return True if obj is a coroutine object."""
+ return isinstance(obj, CoroWrapper) or inspect.isgenerator(obj)
+
+
+class Task(futures.Future):
+ """A coroutine wrapped in a Future."""
+
+ # An important invariant maintained while a Task not done:
+ #
+ # - Either _fut_waiter is None, and _step() is scheduled;
+ # - or _fut_waiter is some Future, and _step() is *not* scheduled.
+ #
+ # The only transition from the latter to the former is through
+ # _wakeup(). When _fut_waiter is not None, one of its callbacks
+ # must be _wakeup().
+
+ # Weak set containing all tasks alive.
+ _all_tasks = weakref.WeakSet()
+
+ @classmethod
+ def all_tasks(cls, loop=None):
+ """Return a set of all tasks for an event loop.
+
+ By default all tasks for the current event loop are returned.
+ """
+ if loop is None:
+ loop = events.get_event_loop()
+ return {t for t in cls._all_tasks if t._loop is loop}
+
+ def __init__(self, coro, *, loop=None):
+ assert iscoroutine(coro), repr(coro) # Not a coroutine function!
+ super().__init__(loop=loop)
+ self._coro = iter(coro) # Use the iterator just in case.
+ self._fut_waiter = None
+ self._must_cancel = False
+ self._loop.call_soon(self._step)
+ self.__class__._all_tasks.add(self)
+
+ def __repr__(self):
+ res = super().__repr__()
+ if (self._must_cancel and
+ self._state == futures._PENDING and
+ '<PENDING' in res):
+ res = res.replace('<PENDING', '<CANCELLING', 1)
+ i = res.find('<')
+ if i < 0:
+ i = len(res)
+ res = res[:i] + '(<{}>)'.format(self._coro.__name__) + res[i:]
+ return res
+
+ def get_stack(self, *, limit=None):
+ """Return the list of stack frames for this task's coroutine.
+
+ If the coroutine is active, this returns the stack where it is
+ suspended. If the coroutine has completed successfully or was
+ cancelled, this returns an empty list. If the coroutine was
+ terminated by an exception, this returns the list of traceback
+ frames.
+
+ The frames are always ordered from oldest to newest.
+
+ The optional limit gives the maximum nummber of frames to
+ return; by default all available frames are returned. Its
+ meaning differs depending on whether a stack or a traceback is
+ returned: the newest frames of a stack are returned, but the
+ oldest frames of a traceback are returned. (This matches the
+ behavior of the traceback module.)
+
+ For reasons beyond our control, only one stack frame is
+ returned for a suspended coroutine.
+ """
+ frames = []
+ f = self._coro.gi_frame
+ if f is not None:
+ while f is not None:
+ if limit is not None:
+ if limit <= 0:
+ break
+ limit -= 1
+ frames.append(f)
+ f = f.f_back
+ frames.reverse()
+ elif self._exception is not None:
+ tb = self._exception.__traceback__
+ while tb is not None:
+ if limit is not None:
+ if limit <= 0:
+ break
+ limit -= 1
+ frames.append(tb.tb_frame)
+ tb = tb.tb_next
+ return frames
+
+ def print_stack(self, *, limit=None, file=None):
+ """Print the stack or traceback for this task's coroutine.
+
+ This produces output similar to that of the traceback module,
+ for the frames retrieved by get_stack(). The limit argument
+ is passed to get_stack(). The file argument is an I/O stream
+ to which the output goes; by default it goes to sys.stderr.
+ """
+ extracted_list = []
+ checked = set()
+ for f in self.get_stack(limit=limit):
+ lineno = f.f_lineno
+ co = f.f_code
+ filename = co.co_filename
+ name = co.co_name
+ if filename not in checked:
+ checked.add(filename)
+ linecache.checkcache(filename)
+ line = linecache.getline(filename, lineno, f.f_globals)
+ extracted_list.append((filename, lineno, name, line))
+ exc = self._exception
+ if not extracted_list:
+ print('No stack for %r' % self, file=file)
+ elif exc is not None:
+ print('Traceback for %r (most recent call last):' % self,
+ file=file)
+ else:
+ print('Stack for %r (most recent call last):' % self,
+ file=file)
+ traceback.print_list(extracted_list, file=file)
+ if exc is not None:
+ for line in traceback.format_exception_only(exc.__class__, exc):
+ print(line, file=file, end='')
+
+ def cancel(self):
+ if self.done():
+ return False
+ if self._fut_waiter is not None:
+ if self._fut_waiter.cancel():
+ # Leave self._fut_waiter; it may be a Task that
+ # catches and ignores the cancellation so we may have
+ # to cancel it again later.
+ return True
+ # It must be the case that self._step is already scheduled.
+ self._must_cancel = True
+ return True
+
+ def _step(self, value=None, exc=None):
+ assert not self.done(), \
+ '_step(): already done: {!r}, {!r}, {!r}'.format(self, value, exc)
+ if self._must_cancel:
+ if not isinstance(exc, futures.CancelledError):
+ exc = futures.CancelledError()
+ self._must_cancel = False
+ coro = self._coro
+ self._fut_waiter = None
+ # Call either coro.throw(exc) or coro.send(value).
+ try:
+ if exc is not None:
+ result = coro.throw(exc)
+ elif value is not None:
+ result = coro.send(value)
+ else:
+ result = next(coro)
+ except StopIteration as exc:
+ self.set_result(exc.value)
+ except futures.CancelledError as exc:
+ super().cancel() # I.e., Future.cancel(self).
+ except Exception as exc:
+ self.set_exception(exc)
+ except BaseException as exc:
+ self.set_exception(exc)
+ raise
+ else:
+ if isinstance(result, futures.Future):
+ # Yielded Future must come from Future.__iter__().
+ if result._blocking:
+ result._blocking = False
+ result.add_done_callback(self._wakeup)
+ self._fut_waiter = result
+ if self._must_cancel:
+ if self._fut_waiter.cancel():
+ self._must_cancel = False
+ else:
+ self._loop.call_soon(
+ self._step, None,
+ RuntimeError(
+ 'yield was used instead of yield from '
+ 'in task {!r} with {!r}'.format(self, result)))
+ elif result is None:
+ # Bare yield relinquishes control for one event loop iteration.
+ self._loop.call_soon(self._step)
+ elif inspect.isgenerator(result):
+ # Yielding a generator is just wrong.
+ self._loop.call_soon(
+ self._step, None,
+ RuntimeError(
+ 'yield was used instead of yield from for '
+ 'generator in task {!r} with {}'.format(
+ self, result)))
+ else:
+ # Yielding something else is an error.
+ self._loop.call_soon(
+ self._step, None,
+ RuntimeError(
+ 'Task got bad yield: {!r}'.format(result)))
+ self = None
+
+ def _wakeup(self, future):
+ try:
+ value = future.result()
+ except Exception as exc:
+ # This may also be a cancellation.
+ self._step(None, exc)
+ else:
+ self._step(value, None)
+ self = None # Needed to break cycles when an exception occurs.
+
+
+# wait() and as_completed() similar to those in PEP 3148.
+
+FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
+FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
+ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
+
+
+@coroutine
+def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
+ """Wait for the Futures and coroutines given by fs to complete.
+
+ Coroutines will be wrapped in Tasks.
+
+ Returns two sets of Future: (done, pending).
+
+ Usage:
+
+ done, pending = yield from asyncio.wait(fs)
+
+ Note: This does not raise TimeoutError! Futures that aren't done
+ when the timeout occurs are returned in the second set.
+ """
+ if not fs:
+ raise ValueError('Set of coroutines/Futures is empty.')
+
+ if loop is None:
+ loop = events.get_event_loop()
+
+ fs = set(async(f, loop=loop) for f in fs)
+
+ if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
+ raise ValueError('Invalid return_when value: {}'.format(return_when))
+ return (yield from _wait(fs, timeout, return_when, loop))
+
+
+def _release_waiter(waiter, value=True, *args):
+ if not waiter.done():
+ waiter.set_result(value)
+
+
+@coroutine
+def wait_for(fut, timeout, *, loop=None):
+ """Wait for the single Future or coroutine to complete, with timeout.
+
+ Coroutine will be wrapped in Task.
+
+ Returns result of the Future or coroutine. Raises TimeoutError when
+ timeout occurs.
+
+ Usage:
+
+ result = yield from asyncio.wait_for(fut, 10.0)
+
+ """
+ if loop is None:
+ loop = events.get_event_loop()
+
+ waiter = futures.Future(loop=loop)
+ timeout_handle = loop.call_later(timeout, _release_waiter, waiter, False)
+ cb = functools.partial(_release_waiter, waiter, True)
+
+ fut = async(fut, loop=loop)
+ fut.add_done_callback(cb)
+
+ try:
+ if (yield from waiter):
+ return fut.result()
+ else:
+ fut.remove_done_callback(cb)
+ raise futures.TimeoutError()
+ finally:
+ timeout_handle.cancel()
+
+
+@coroutine
+def _wait(fs, timeout, return_when, loop):
+ """Internal helper for wait() and _wait_for().
+
+ The fs argument must be a collection of Futures.
+ """
+ assert fs, 'Set of Futures is empty.'
+ waiter = futures.Future(loop=loop)
+ timeout_handle = None
+ if timeout is not None:
+ timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
+ counter = len(fs)
+
+ def _on_completion(f):
+ nonlocal counter
+ counter -= 1
+ if (counter <= 0 or
+ return_when == FIRST_COMPLETED or
+ return_when == FIRST_EXCEPTION and (not f.cancelled() and
+ f.exception() is not None)):
+ if timeout_handle is not None:
+ timeout_handle.cancel()
+ if not waiter.done():
+ waiter.set_result(False)
+
+ for f in fs:
+ f.add_done_callback(_on_completion)
+
+ try:
+ yield from waiter
+ finally:
+ if timeout_handle is not None:
+ timeout_handle.cancel()
+
+ done, pending = set(), set()
+ for f in fs:
+ f.remove_done_callback(_on_completion)
+ if f.done():
+ done.add(f)
+ else:
+ pending.add(f)
+ return done, pending
+
+
+# This is *not* a @coroutine! It is just an iterator (yielding Futures).
+def as_completed(fs, *, loop=None, timeout=None):
+ """Return an iterator whose values, when waited for, are Futures.
+
+ This differs from PEP 3148; the proper way to use this is:
+
+ for f in as_completed(fs):
+ result = yield from f # The 'yield from' may raise.
+ # Use result.
+
+ Raises TimeoutError if the timeout occurs before all Futures are
+ done.
+
+ Note: The futures 'f' are not necessarily members of fs.
+ """
+ loop = loop if loop is not None else events.get_event_loop()
+ deadline = None if timeout is None else loop.time() + timeout
+ todo = set(async(f, loop=loop) for f in fs)
+ completed = collections.deque()
+
+ @coroutine
+ def _wait_for_one():
+ while not completed:
+ timeout = None
+ if deadline is not None:
+ timeout = deadline - loop.time()
+ if timeout < 0:
+ raise futures.TimeoutError()
+ done, pending = yield from _wait(
+ todo, timeout, FIRST_COMPLETED, loop)
+ # Multiple callers might be waiting for the same events
+ # and getting the same outcome. Dedupe by updating todo.
+ for f in done:
+ if f in todo:
+ todo.remove(f)
+ completed.append(f)
+ f = completed.popleft()
+ return f.result() # May raise.
+
+ for _ in range(len(todo)):
+ yield _wait_for_one()
+
+
+@coroutine
+def sleep(delay, result=None, *, loop=None):
+ """Coroutine that completes after a given time (in seconds)."""
+ future = futures.Future(loop=loop)
+ h = future._loop.call_later(delay, future.set_result, result)
+ try:
+ return (yield from future)
+ finally:
+ h.cancel()
+
+
+def async(coro_or_future, *, loop=None):
+ """Wrap a coroutine in a future.
+
+ If the argument is a Future, it is returned directly.
+ """
+ if isinstance(coro_or_future, futures.Future):
+ if loop is not None and loop is not coro_or_future._loop:
+ raise ValueError('loop argument must agree with Future')
+ return coro_or_future
+ elif iscoroutine(coro_or_future):
+ return Task(coro_or_future, loop=loop)
+ else:
+ raise TypeError('A Future or coroutine is required')
+
+
+class _GatheringFuture(futures.Future):
+ """Helper for gather().
+
+ This overrides cancel() to cancel all the children and act more
+ like Task.cancel(), which doesn't immediately mark itself as
+ cancelled.
+ """
+
+ def __init__(self, children, *, loop=None):
+ super().__init__(loop=loop)
+ self._children = children
+
+ def cancel(self):
+ if self.done():
+ return False
+ for child in self._children:
+ child.cancel()
+ return True
+
+
+def gather(*coros_or_futures, loop=None, return_exceptions=False):
+ """Return a future aggregating results from the given coroutines
+ or futures.
+
+ All futures must share the same event loop. If all the tasks are
+ done successfully, the returned future's result is the list of
+ results (in the order of the original sequence, not necessarily
+ the order of results arrival). If *result_exception* is True,
+ exceptions in the tasks are treated the same as successful
+ results, and gathered in the result list; otherwise, the first
+ raised exception will be immediately propagated to the returned
+ future.
+
+ Cancellation: if the outer Future is cancelled, all children (that
+ have not completed yet) are also cancelled. If any child is
+ cancelled, this is treated as if it raised CancelledError --
+ the outer Future is *not* cancelled in this case. (This is to
+ prevent the cancellation of one child to cause other children to
+ be cancelled.)
+ """
+ children = [async(fut, loop=loop) for fut in coros_or_futures]
+ n = len(children)
+ if n == 0:
+ outer = futures.Future(loop=loop)
+ outer.set_result([])
+ return outer
+ if loop is None:
+ loop = children[0]._loop
+ for fut in children:
+ if fut._loop is not loop:
+ raise ValueError("futures are tied to different event loops")
+ outer = _GatheringFuture(children, loop=loop)
+ nfinished = 0
+ results = [None] * n
+
+ def _done_callback(i, fut):
+ nonlocal nfinished
+ if outer._state != futures._PENDING:
+ if fut._exception is not None:
+ # Mark exception retrieved.
+ fut.exception()
+ return
+ if fut._state == futures._CANCELLED:
+ res = futures.CancelledError()
+ if not return_exceptions:
+ outer.set_exception(res)
+ return
+ elif fut._exception is not None:
+ res = fut.exception() # Mark exception retrieved.
+ if not return_exceptions:
+ outer.set_exception(res)
+ return
+ else:
+ res = fut._result
+ results[i] = res
+ nfinished += 1
+ if nfinished == n:
+ outer.set_result(results)
+
+ for i, fut in enumerate(children):
+ fut.add_done_callback(functools.partial(_done_callback, i))
+ return outer
+
+
+def shield(arg, *, loop=None):
+ """Wait for a future, shielding it from cancellation.
+
+ The statement
+
+ res = yield from shield(something())
+
+ is exactly equivalent to the statement
+
+ res = yield from something()
+
+ *except* that if the coroutine containing it is cancelled, the
+ task running in something() is not cancelled. From the POV of
+ something(), the cancellation did not happen. But its caller is
+ still cancelled, so the yield-from expression still raises
+ CancelledError. Note: If something() is cancelled by other means
+ this will still cancel shield().
+
+ If you want to completely ignore cancellation (not recommended)
+ you can combine shield() with a try/except clause, as follows:
+
+ try:
+ res = yield from shield(something())
+ except CancelledError:
+ res = None
+ """
+ inner = async(arg, loop=loop)
+ if inner.done():
+ # Shortcut.
+ return inner
+ loop = inner._loop
+ outer = futures.Future(loop=loop)
+
+ def _done_callback(inner):
+ if outer.cancelled():
+ # Mark inner's result as retrieved.
+ inner.cancelled() or inner.exception()
+ return
+ if inner.cancelled():
+ outer.cancel()
+ else:
+ exc = inner.exception()
+ if exc is not None:
+ outer.set_exception(exc)
+ else:
+ outer.set_result(inner.result())
+
+ inner.add_done_callback(_done_callback)
+ return outer