diff options
author | Andrew Svetlov <andrew.svetlov@gmail.com> | 2022-03-10 16:05:20 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-03-10 16:05:20 (GMT) |
commit | f537b2a4fb86445ee3bd6ca7f10bc9d3a9f37da5 (patch) | |
tree | d67041f2121c19f1ba0a6a723a8aeccb771d3614 /Lib/asyncio | |
parent | 32bf3597922ac3f613989582afa2bff43bea8a2f (diff) | |
download | cpython-f537b2a4fb86445ee3bd6ca7f10bc9d3a9f37da5.zip cpython-f537b2a4fb86445ee3bd6ca7f10bc9d3a9f37da5.tar.gz cpython-f537b2a4fb86445ee3bd6ca7f10bc9d3a9f37da5.tar.bz2 |
bpo-46771: Implement asyncio context managers for handling timeouts (GH-31394)
Example:
async with asyncio.timeout(5):
await some_task()
Will interrupt the await and raise TimeoutError if some_task() takes longer than 5 seconds.
Co-authored-by: Guido van Rossum <guido@python.org>
Diffstat (limited to 'Lib/asyncio')
-rw-r--r-- | Lib/asyncio/__init__.py | 2 | ||||
-rw-r--r-- | Lib/asyncio/timeouts.py | 151 |
2 files changed, 153 insertions, 0 deletions
diff --git a/Lib/asyncio/__init__.py b/Lib/asyncio/__init__.py index db1124c..fed16ec7c6 100644 --- a/Lib/asyncio/__init__.py +++ b/Lib/asyncio/__init__.py @@ -18,6 +18,7 @@ from .streams import * from .subprocess import * from .tasks import * from .taskgroups import * +from .timeouts import * from .threads import * from .transports import * @@ -34,6 +35,7 @@ __all__ = (base_events.__all__ + subprocess.__all__ + tasks.__all__ + threads.__all__ + + timeouts.__all__ + transports.__all__) if sys.platform == 'win32': # pragma: no cover diff --git a/Lib/asyncio/timeouts.py b/Lib/asyncio/timeouts.py new file mode 100644 index 0000000..a892053 --- /dev/null +++ b/Lib/asyncio/timeouts.py @@ -0,0 +1,151 @@ +import enum + +from types import TracebackType +from typing import final, Optional, Type + +from . import events +from . import exceptions +from . import tasks + + +__all__ = ( + "Timeout", + "timeout", + "timeout_at", +) + + +class _State(enum.Enum): + CREATED = "created" + ENTERED = "active" + EXPIRING = "expiring" + EXPIRED = "expired" + EXITED = "finished" + + +@final +class Timeout: + + def __init__(self, when: Optional[float]) -> None: + self._state = _State.CREATED + + self._timeout_handler: Optional[events.TimerHandle] = None + self._task: Optional[tasks.Task] = None + self._when = when + + def when(self) -> Optional[float]: + return self._when + + def reschedule(self, when: Optional[float]) -> None: + assert self._state is not _State.CREATED + if self._state is not _State.ENTERED: + raise RuntimeError( + f"Cannot change state of {self._state.value} Timeout", + ) + + self._when = when + + if self._timeout_handler is not None: + self._timeout_handler.cancel() + + if when is None: + self._timeout_handler = None + else: + loop = events.get_running_loop() + self._timeout_handler = loop.call_at( + when, + self._on_timeout, + ) + + def expired(self) -> bool: + """Is timeout expired during execution?""" + return self._state in (_State.EXPIRING, _State.EXPIRED) + + def __repr__(self) -> str: + info = [''] + if self._state is _State.ENTERED: + when = round(self._when, 3) if self._when is not None else None + info.append(f"when={when}") + info_str = ' '.join(info) + return f"<Timeout [{self._state.value}]{info_str}>" + + async def __aenter__(self) -> "Timeout": + self._state = _State.ENTERED + self._task = tasks.current_task() + if self._task is None: + raise RuntimeError("Timeout should be used inside a task") + self.reschedule(self._when) + return self + + async def __aexit__( + self, + exc_type: Optional[Type[BaseException]], + exc_val: Optional[BaseException], + exc_tb: Optional[TracebackType], + ) -> Optional[bool]: + assert self._state in (_State.ENTERED, _State.EXPIRING) + + if self._timeout_handler is not None: + self._timeout_handler.cancel() + self._timeout_handler = None + + if self._state is _State.EXPIRING: + self._state = _State.EXPIRED + + if self._task.uncancel() == 0 and exc_type is exceptions.CancelledError: + # Since there are no outstanding cancel requests, we're + # handling this. + raise TimeoutError + elif self._state is _State.ENTERED: + self._state = _State.EXITED + + return None + + def _on_timeout(self) -> None: + assert self._state is _State.ENTERED + self._task.cancel() + self._state = _State.EXPIRING + # drop the reference early + self._timeout_handler = None + + +def timeout(delay: Optional[float]) -> Timeout: + """Timeout async context manager. + + Useful in cases when you want to apply timeout logic around block + of code or in cases when asyncio.wait_for is not suitable. For example: + + >>> async with asyncio.timeout(10): # 10 seconds timeout + ... await long_running_task() + + + delay - value in seconds or None to disable timeout logic + + long_running_task() is interrupted by raising asyncio.CancelledError, + the top-most affected timeout() context manager converts CancelledError + into TimeoutError. + """ + loop = events.get_running_loop() + return Timeout(loop.time() + delay if delay is not None else None) + + +def timeout_at(when: Optional[float]) -> Timeout: + """Schedule the timeout at absolute time. + + Like timeout() but argument gives absolute time in the same clock system + as loop.time(). + + Please note: it is not POSIX time but a time with + undefined starting base, e.g. the time of the system power on. + + >>> async with asyncio.timeout_at(loop.time() + 10): + ... await long_running_task() + + + when - a deadline when timeout occurs or None to disable timeout logic + + long_running_task() is interrupted by raising asyncio.CancelledError, + the top-most affected timeout() context manager converts CancelledError + into TimeoutError. + """ + return Timeout(when) |