# Adapted with permission from the EdgeDB project. __all__ = ["TaskGroup"] import weakref from . import events from . import exceptions from . import tasks class TaskGroup: def __init__(self): self._entered = False self._exiting = False self._aborting = False self._loop = None self._parent_task = None self._parent_cancel_requested = False self._tasks = weakref.WeakSet() self._unfinished_tasks = 0 self._errors = [] self._base_error = None self._on_completed_fut = None def __repr__(self): msg = f' bool: assert isinstance(exc, BaseException) return isinstance(exc, (SystemExit, KeyboardInterrupt)) def _abort(self): self._aborting = True for t in self._tasks: if not t.done(): t.cancel() def _on_task_done(self, task): self._unfinished_tasks -= 1 assert self._unfinished_tasks >= 0 if self._on_completed_fut is not None and not self._unfinished_tasks: if not self._on_completed_fut.done(): self._on_completed_fut.set_result(True) if task.cancelled(): return exc = task.exception() if exc is None: return self._errors.append(exc) if self._is_base_error(exc) and self._base_error is None: self._base_error = exc if self._parent_task.done(): # Not sure if this case is possible, but we want to handle # it anyways. self._loop.call_exception_handler({ 'message': f'Task {task!r} has errored out but its parent ' f'task {self._parent_task} is already completed', 'exception': exc, 'task': task, }) return self._abort() if not self._parent_task.cancelling(): # If parent task *is not* being cancelled, it means that we want # to manually cancel it to abort whatever is being run right now # in the TaskGroup. But we want to mark parent task as # "not cancelled" later in __aexit__. Example situation that # we need to handle: # # async def foo(): # try: # async with TaskGroup() as g: # g.create_task(crash_soon()) # await something # <- this needs to be canceled # # by the TaskGroup, e.g. # # foo() needs to be cancelled # except Exception: # # Ignore any exceptions raised in the TaskGroup # pass # await something_else # this line has to be called # # after TaskGroup is finished. self._parent_cancel_requested = True self._parent_task.cancel()