diff options
Diffstat (limited to 'Lib/asyncio/tasks.py')
| -rw-r--r-- | Lib/asyncio/tasks.py | 785 | 
1 files changed, 785 insertions, 0 deletions
diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py new file mode 100644 index 0000000..c37aa41 --- /dev/null +++ b/Lib/asyncio/tasks.py @@ -0,0 +1,785 @@ +"""Support for tasks, coroutines and the scheduler.""" + +__all__ = ['Task', +           'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED', +           'wait', 'wait_for', 'as_completed', 'sleep', 'async', +           'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe', +           'timeout', +           ] + +import concurrent.futures +import functools +import inspect +import linecache +import traceback +import warnings +import weakref + +from . import compat +from . import coroutines +from . import events +from . import futures +from .coroutines import coroutine + + +class Task(futures.Future): +    """A coroutine wrapped in a Future.""" + +    # An important invariant maintained while a Task not done: +    # +    # - Either _fut_waiter is None, and _step() is scheduled; +    # - or _fut_waiter is some Future, and _step() is *not* scheduled. +    # +    # The only transition from the latter to the former is through +    # _wakeup().  When _fut_waiter is not None, one of its callbacks +    # must be _wakeup(). + +    # Weak set containing all tasks alive. +    _all_tasks = weakref.WeakSet() + +    # Dictionary containing tasks that are currently active in +    # all running event loops.  {EventLoop: Task} +    _current_tasks = {} + +    # If False, don't log a message if the task is destroyed whereas its +    # status is still pending +    _log_destroy_pending = True + +    @classmethod +    def current_task(cls, loop=None): +        """Return the currently running task in an event loop or None. + +        By default the current task for the current event loop is returned. + +        None is returned when called not in the context of a Task. +        """ +        if loop is None: +            loop = events.get_event_loop() +        return cls._current_tasks.get(loop) + +    @classmethod +    def all_tasks(cls, loop=None): +        """Return a set of all tasks for an event loop. + +        By default all tasks for the current event loop are returned. +        """ +        if loop is None: +            loop = events.get_event_loop() +        return {t for t in cls._all_tasks if t._loop is loop} + +    def __init__(self, coro, *, loop=None): +        assert coroutines.iscoroutine(coro), repr(coro) +        super().__init__(loop=loop) +        if self._source_traceback: +            del self._source_traceback[-1] +        self._coro = coro +        self._fut_waiter = None +        self._must_cancel = False +        self._loop.call_soon(self._step) +        self.__class__._all_tasks.add(self) + +    # On Python 3.3 or older, objects with a destructor that are part of a +    # reference cycle are never destroyed. That's not the case any more on +    # Python 3.4 thanks to the PEP 442. +    if compat.PY34: +        def __del__(self): +            if self._state == futures._PENDING and self._log_destroy_pending: +                context = { +                    'task': self, +                    'message': 'Task was destroyed but it is pending!', +                } +                if self._source_traceback: +                    context['source_traceback'] = self._source_traceback +                self._loop.call_exception_handler(context) +            futures.Future.__del__(self) + +    def _repr_info(self): +        info = super()._repr_info() + +        if self._must_cancel: +            # replace status +            info[0] = 'cancelling' + +        coro = coroutines._format_coroutine(self._coro) +        info.insert(1, 'coro=<%s>' % coro) + +        if self._fut_waiter is not None: +            info.insert(2, 'wait_for=%r' % self._fut_waiter) +        return info + +    def get_stack(self, *, limit=None): +        """Return the list of stack frames for this task's coroutine. + +        If the coroutine is not done, this returns the stack where it is +        suspended.  If the coroutine has completed successfully or was +        cancelled, this returns an empty list.  If the coroutine was +        terminated by an exception, this returns the list of traceback +        frames. + +        The frames are always ordered from oldest to newest. + +        The optional limit gives the maximum number of frames to +        return; by default all available frames are returned.  Its +        meaning differs depending on whether a stack or a traceback is +        returned: the newest frames of a stack are returned, but the +        oldest frames of a traceback are returned.  (This matches the +        behavior of the traceback module.) + +        For reasons beyond our control, only one stack frame is +        returned for a suspended coroutine. +        """ +        frames = [] +        try: +            # 'async def' coroutines +            f = self._coro.cr_frame +        except AttributeError: +            f = self._coro.gi_frame +        if f is not None: +            while f is not None: +                if limit is not None: +                    if limit <= 0: +                        break +                    limit -= 1 +                frames.append(f) +                f = f.f_back +            frames.reverse() +        elif self._exception is not None: +            tb = self._exception.__traceback__ +            while tb is not None: +                if limit is not None: +                    if limit <= 0: +                        break +                    limit -= 1 +                frames.append(tb.tb_frame) +                tb = tb.tb_next +        return frames + +    def print_stack(self, *, limit=None, file=None): +        """Print the stack or traceback for this task's coroutine. + +        This produces output similar to that of the traceback module, +        for the frames retrieved by get_stack().  The limit argument +        is passed to get_stack().  The file argument is an I/O stream +        to which the output is written; by default output is written +        to sys.stderr. +        """ +        extracted_list = [] +        checked = set() +        for f in self.get_stack(limit=limit): +            lineno = f.f_lineno +            co = f.f_code +            filename = co.co_filename +            name = co.co_name +            if filename not in checked: +                checked.add(filename) +                linecache.checkcache(filename) +            line = linecache.getline(filename, lineno, f.f_globals) +            extracted_list.append((filename, lineno, name, line)) +        exc = self._exception +        if not extracted_list: +            print('No stack for %r' % self, file=file) +        elif exc is not None: +            print('Traceback for %r (most recent call last):' % self, +                  file=file) +        else: +            print('Stack for %r (most recent call last):' % self, +                  file=file) +        traceback.print_list(extracted_list, file=file) +        if exc is not None: +            for line in traceback.format_exception_only(exc.__class__, exc): +                print(line, file=file, end='') + +    def cancel(self): +        """Request that this task cancel itself. + +        This arranges for a CancelledError to be thrown into the +        wrapped coroutine on the next cycle through the event loop. +        The coroutine then has a chance to clean up or even deny +        the request using try/except/finally. + +        Unlike Future.cancel, this does not guarantee that the +        task will be cancelled: the exception might be caught and +        acted upon, delaying cancellation of the task or preventing +        cancellation completely.  The task may also return a value or +        raise a different exception. + +        Immediately after this method is called, Task.cancelled() will +        not return True (unless the task was already cancelled).  A +        task will be marked as cancelled when the wrapped coroutine +        terminates with a CancelledError exception (even if cancel() +        was not called). +        """ +        if self.done(): +            return False +        if self._fut_waiter is not None: +            if self._fut_waiter.cancel(): +                # Leave self._fut_waiter; it may be a Task that +                # catches and ignores the cancellation so we may have +                # to cancel it again later. +                return True +        # It must be the case that self._step is already scheduled. +        self._must_cancel = True +        return True + +    def _step(self, exc=None): +        assert not self.done(), \ +            '_step(): already done: {!r}, {!r}'.format(self, exc) +        if self._must_cancel: +            if not isinstance(exc, futures.CancelledError): +                exc = futures.CancelledError() +            self._must_cancel = False +        coro = self._coro +        self._fut_waiter = None + +        self.__class__._current_tasks[self._loop] = self +        # Call either coro.throw(exc) or coro.send(None). +        try: +            if exc is None: +                # We use the `send` method directly, because coroutines +                # don't have `__iter__` and `__next__` methods. +                result = coro.send(None) +            else: +                result = coro.throw(exc) +        except StopIteration as exc: +            self.set_result(exc.value) +        except futures.CancelledError as exc: +            super().cancel()  # I.e., Future.cancel(self). +        except Exception as exc: +            self.set_exception(exc) +        except BaseException as exc: +            self.set_exception(exc) +            raise +        else: +            if isinstance(result, futures.Future): +                # Yielded Future must come from Future.__iter__(). +                if result._loop is not self._loop: +                    self._loop.call_soon( +                        self._step, +                        RuntimeError( +                            'Task {!r} got Future {!r} attached to a ' +                            'different loop'.format(self, result))) +                elif result._blocking: +                    result._blocking = False +                    result.add_done_callback(self._wakeup) +                    self._fut_waiter = result +                    if self._must_cancel: +                        if self._fut_waiter.cancel(): +                            self._must_cancel = False +                else: +                    self._loop.call_soon( +                        self._step, +                        RuntimeError( +                            'yield was used instead of yield from ' +                            'in task {!r} with {!r}'.format(self, result))) +            elif result is None: +                # Bare yield relinquishes control for one event loop iteration. +                self._loop.call_soon(self._step) +            elif inspect.isgenerator(result): +                # Yielding a generator is just wrong. +                self._loop.call_soon( +                    self._step, +                    RuntimeError( +                        'yield was used instead of yield from for ' +                        'generator in task {!r} with {}'.format( +                            self, result))) +            else: +                # Yielding something else is an error. +                self._loop.call_soon( +                    self._step, +                    RuntimeError( +                        'Task got bad yield: {!r}'.format(result))) +        finally: +            self.__class__._current_tasks.pop(self._loop) +            self = None  # Needed to break cycles when an exception occurs. + +    def _wakeup(self, future): +        try: +            future.result() +        except Exception as exc: +            # This may also be a cancellation. +            self._step(exc) +        else: +            # Don't pass the value of `future.result()` explicitly, +            # as `Future.__iter__` and `Future.__await__` don't need it. +            # If we call `_step(value, None)` instead of `_step()`, +            # Python eval loop would use `.send(value)` method call, +            # instead of `__next__()`, which is slower for futures +            # that return non-generator iterators from their `__iter__`. +            self._step() +        self = None  # Needed to break cycles when an exception occurs. + + +# wait() and as_completed() similar to those in PEP 3148. + +FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED +FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION +ALL_COMPLETED = concurrent.futures.ALL_COMPLETED + + +@coroutine +def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED): +    """Wait for the Futures and coroutines given by fs to complete. + +    The sequence futures must not be empty. + +    Coroutines will be wrapped in Tasks. + +    Returns two sets of Future: (done, pending). + +    Usage: + +        done, pending = yield from asyncio.wait(fs) + +    Note: This does not raise TimeoutError! Futures that aren't done +    when the timeout occurs are returned in the second set. +    """ +    if isinstance(fs, futures.Future) or coroutines.iscoroutine(fs): +        raise TypeError("expect a list of futures, not %s" % type(fs).__name__) +    if not fs: +        raise ValueError('Set of coroutines/Futures is empty.') +    if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED): +        raise ValueError('Invalid return_when value: {}'.format(return_when)) + +    if loop is None: +        loop = events.get_event_loop() + +    fs = {ensure_future(f, loop=loop) for f in set(fs)} + +    return (yield from _wait(fs, timeout, return_when, loop)) + + +def _release_waiter(waiter, *args): +    if not waiter.done(): +        waiter.set_result(None) + + +@coroutine +def wait_for(fut, timeout, *, loop=None): +    """Wait for the single Future or coroutine to complete, with timeout. + +    Coroutine will be wrapped in Task. + +    Returns result of the Future or coroutine.  When a timeout occurs, +    it cancels the task and raises TimeoutError.  To avoid the task +    cancellation, wrap it in shield(). + +    If the wait is cancelled, the task is also cancelled. + +    This function is a coroutine. +    """ +    if loop is None: +        loop = events.get_event_loop() + +    if timeout is None: +        return (yield from fut) + +    waiter = futures.Future(loop=loop) +    timeout_handle = loop.call_later(timeout, _release_waiter, waiter) +    cb = functools.partial(_release_waiter, waiter) + +    fut = ensure_future(fut, loop=loop) +    fut.add_done_callback(cb) + +    try: +        # wait until the future completes or the timeout +        try: +            yield from waiter +        except futures.CancelledError: +            fut.remove_done_callback(cb) +            fut.cancel() +            raise + +        if fut.done(): +            return fut.result() +        else: +            fut.remove_done_callback(cb) +            fut.cancel() +            raise futures.TimeoutError() +    finally: +        timeout_handle.cancel() + + +@coroutine +def _wait(fs, timeout, return_when, loop): +    """Internal helper for wait() and _wait_for(). + +    The fs argument must be a collection of Futures. +    """ +    assert fs, 'Set of Futures is empty.' +    waiter = futures.Future(loop=loop) +    timeout_handle = None +    if timeout is not None: +        timeout_handle = loop.call_later(timeout, _release_waiter, waiter) +    counter = len(fs) + +    def _on_completion(f): +        nonlocal counter +        counter -= 1 +        if (counter <= 0 or +            return_when == FIRST_COMPLETED or +            return_when == FIRST_EXCEPTION and (not f.cancelled() and +                                                f.exception() is not None)): +            if timeout_handle is not None: +                timeout_handle.cancel() +            if not waiter.done(): +                waiter.set_result(None) + +    for f in fs: +        f.add_done_callback(_on_completion) + +    try: +        yield from waiter +    finally: +        if timeout_handle is not None: +            timeout_handle.cancel() + +    done, pending = set(), set() +    for f in fs: +        f.remove_done_callback(_on_completion) +        if f.done(): +            done.add(f) +        else: +            pending.add(f) +    return done, pending + + +# This is *not* a @coroutine!  It is just an iterator (yielding Futures). +def as_completed(fs, *, loop=None, timeout=None): +    """Return an iterator whose values are coroutines. + +    When waiting for the yielded coroutines you'll get the results (or +    exceptions!) of the original Futures (or coroutines), in the order +    in which and as soon as they complete. + +    This differs from PEP 3148; the proper way to use this is: + +        for f in as_completed(fs): +            result = yield from f  # The 'yield from' may raise. +            # Use result. + +    If a timeout is specified, the 'yield from' will raise +    TimeoutError when the timeout occurs before all Futures are done. + +    Note: The futures 'f' are not necessarily members of fs. +    """ +    if isinstance(fs, futures.Future) or coroutines.iscoroutine(fs): +        raise TypeError("expect a list of futures, not %s" % type(fs).__name__) +    loop = loop if loop is not None else events.get_event_loop() +    todo = {ensure_future(f, loop=loop) for f in set(fs)} +    from .queues import Queue  # Import here to avoid circular import problem. +    done = Queue(loop=loop) +    timeout_handle = None + +    def _on_timeout(): +        for f in todo: +            f.remove_done_callback(_on_completion) +            done.put_nowait(None)  # Queue a dummy value for _wait_for_one(). +        todo.clear()  # Can't do todo.remove(f) in the loop. + +    def _on_completion(f): +        if not todo: +            return  # _on_timeout() was here first. +        todo.remove(f) +        done.put_nowait(f) +        if not todo and timeout_handle is not None: +            timeout_handle.cancel() + +    @coroutine +    def _wait_for_one(): +        f = yield from done.get() +        if f is None: +            # Dummy value from _on_timeout(). +            raise futures.TimeoutError +        return f.result()  # May raise f.exception(). + +    for f in todo: +        f.add_done_callback(_on_completion) +    if todo and timeout is not None: +        timeout_handle = loop.call_later(timeout, _on_timeout) +    for _ in range(len(todo)): +        yield _wait_for_one() + + +@coroutine +def sleep(delay, result=None, *, loop=None): +    """Coroutine that completes after a given time (in seconds).""" +    if delay == 0: +        yield +        return result + +    future = futures.Future(loop=loop) +    h = future._loop.call_later(delay, +                                futures._set_result_unless_cancelled, +                                future, result) +    try: +        return (yield from future) +    finally: +        h.cancel() + + +def async(coro_or_future, *, loop=None): +    """Wrap a coroutine in a future. + +    If the argument is a Future, it is returned directly. + +    This function is deprecated in 3.5. Use asyncio.ensure_future() instead. +    """ + +    warnings.warn("asyncio.async() function is deprecated, use ensure_future()", +                  DeprecationWarning) + +    return ensure_future(coro_or_future, loop=loop) + + +def ensure_future(coro_or_future, *, loop=None): +    """Wrap a coroutine or an awaitable in a future. + +    If the argument is a Future, it is returned directly. +    """ +    if isinstance(coro_or_future, futures.Future): +        if loop is not None and loop is not coro_or_future._loop: +            raise ValueError('loop argument must agree with Future') +        return coro_or_future +    elif coroutines.iscoroutine(coro_or_future): +        if loop is None: +            loop = events.get_event_loop() +        task = loop.create_task(coro_or_future) +        if task._source_traceback: +            del task._source_traceback[-1] +        return task +    elif compat.PY35 and inspect.isawaitable(coro_or_future): +        return ensure_future(_wrap_awaitable(coro_or_future), loop=loop) +    else: +        raise TypeError('A Future, a coroutine or an awaitable is required') + + +@coroutine +def _wrap_awaitable(awaitable): +    """Helper for asyncio.ensure_future(). + +    Wraps awaitable (an object with __await__) into a coroutine +    that will later be wrapped in a Task by ensure_future(). +    """ +    return (yield from awaitable.__await__()) + + +class _GatheringFuture(futures.Future): +    """Helper for gather(). + +    This overrides cancel() to cancel all the children and act more +    like Task.cancel(), which doesn't immediately mark itself as +    cancelled. +    """ + +    def __init__(self, children, *, loop=None): +        super().__init__(loop=loop) +        self._children = children + +    def cancel(self): +        if self.done(): +            return False +        for child in self._children: +            child.cancel() +        return True + + +def gather(*coros_or_futures, loop=None, return_exceptions=False): +    """Return a future aggregating results from the given coroutines +    or futures. + +    All futures must share the same event loop.  If all the tasks are +    done successfully, the returned future's result is the list of +    results (in the order of the original sequence, not necessarily +    the order of results arrival).  If *return_exceptions* is True, +    exceptions in the tasks are treated the same as successful +    results, and gathered in the result list; otherwise, the first +    raised exception will be immediately propagated to the returned +    future. + +    Cancellation: if the outer Future is cancelled, all children (that +    have not completed yet) are also cancelled.  If any child is +    cancelled, this is treated as if it raised CancelledError -- +    the outer Future is *not* cancelled in this case.  (This is to +    prevent the cancellation of one child to cause other children to +    be cancelled.) +    """ +    if not coros_or_futures: +        outer = futures.Future(loop=loop) +        outer.set_result([]) +        return outer + +    arg_to_fut = {} +    for arg in set(coros_or_futures): +        if not isinstance(arg, futures.Future): +            fut = ensure_future(arg, loop=loop) +            if loop is None: +                loop = fut._loop +            # The caller cannot control this future, the "destroy pending task" +            # warning should not be emitted. +            fut._log_destroy_pending = False +        else: +            fut = arg +            if loop is None: +                loop = fut._loop +            elif fut._loop is not loop: +                raise ValueError("futures are tied to different event loops") +        arg_to_fut[arg] = fut + +    children = [arg_to_fut[arg] for arg in coros_or_futures] +    nchildren = len(children) +    outer = _GatheringFuture(children, loop=loop) +    nfinished = 0 +    results = [None] * nchildren + +    def _done_callback(i, fut): +        nonlocal nfinished +        if outer.done(): +            if not fut.cancelled(): +                # Mark exception retrieved. +                fut.exception() +            return + +        if fut.cancelled(): +            res = futures.CancelledError() +            if not return_exceptions: +                outer.set_exception(res) +                return +        elif fut._exception is not None: +            res = fut.exception()  # Mark exception retrieved. +            if not return_exceptions: +                outer.set_exception(res) +                return +        else: +            res = fut._result +        results[i] = res +        nfinished += 1 +        if nfinished == nchildren: +            outer.set_result(results) + +    for i, fut in enumerate(children): +        fut.add_done_callback(functools.partial(_done_callback, i)) +    return outer + + +def shield(arg, *, loop=None): +    """Wait for a future, shielding it from cancellation. + +    The statement + +        res = yield from shield(something()) + +    is exactly equivalent to the statement + +        res = yield from something() + +    *except* that if the coroutine containing it is cancelled, the +    task running in something() is not cancelled.  From the POV of +    something(), the cancellation did not happen.  But its caller is +    still cancelled, so the yield-from expression still raises +    CancelledError.  Note: If something() is cancelled by other means +    this will still cancel shield(). + +    If you want to completely ignore cancellation (not recommended) +    you can combine shield() with a try/except clause, as follows: + +        try: +            res = yield from shield(something()) +        except CancelledError: +            res = None +    """ +    inner = ensure_future(arg, loop=loop) +    if inner.done(): +        # Shortcut. +        return inner +    loop = inner._loop +    outer = futures.Future(loop=loop) + +    def _done_callback(inner): +        if outer.cancelled(): +            if not inner.cancelled(): +                # Mark inner's result as retrieved. +                inner.exception() +            return + +        if inner.cancelled(): +            outer.cancel() +        else: +            exc = inner.exception() +            if exc is not None: +                outer.set_exception(exc) +            else: +                outer.set_result(inner.result()) + +    inner.add_done_callback(_done_callback) +    return outer + + +def run_coroutine_threadsafe(coro, loop): +    """Submit a coroutine object to a given event loop. + +    Return a concurrent.futures.Future to access the result. +    """ +    if not coroutines.iscoroutine(coro): +        raise TypeError('A coroutine object is required') +    future = concurrent.futures.Future() + +    def callback(): +        try: +            futures._chain_future(ensure_future(coro, loop=loop), future) +        except Exception as exc: +            if future.set_running_or_notify_cancel(): +                future.set_exception(exc) +            raise + +    loop.call_soon_threadsafe(callback) +    return future + + +def timeout(timeout, *, loop=None): +    """A factory which produce a context manager with timeout. + +    Useful in cases when you want to apply timeout logic around block +    of code or in cases when asyncio.wait_for is not suitable. + +    For example: + +    >>> with asyncio.timeout(0.001): +    ...     yield from coro() + + +    timeout: timeout value in seconds +    loop: asyncio compatible event loop +    """ +    if loop is None: +        loop = events.get_event_loop() +    return _Timeout(timeout, loop=loop) + + +class _Timeout: +    def __init__(self, timeout, *, loop): +        self._timeout = timeout +        self._loop = loop +        self._task = None +        self._cancelled = False +        self._cancel_handler = None + +    def __enter__(self): +        self._task = Task.current_task(loop=self._loop) +        if self._task is None: +            raise RuntimeError('Timeout context manager should be used ' +                               'inside a task') +        self._cancel_handler = self._loop.call_later( +            self._timeout, self._cancel_task) +        return self + +    def __exit__(self, exc_type, exc_val, exc_tb): +        if exc_type is futures.CancelledError and self._cancelled: +            self._cancel_handler = None +            self._task = None +            raise futures.TimeoutError +        self._cancel_handler.cancel() +        self._cancel_handler = None +        self._task = None + +    def _cancel_task(self): +        self._cancelled = self._task.cancel()  | 
