diff options
Diffstat (limited to 'Lib/asyncio')
-rw-r--r-- | Lib/asyncio/__init__.py | 2 | ||||
-rw-r--r-- | Lib/asyncio/base_events.py | 11 | ||||
-rw-r--r-- | Lib/asyncio/base_futures.py | 6 | ||||
-rw-r--r-- | Lib/asyncio/events.py | 11 | ||||
-rw-r--r-- | Lib/asyncio/exceptions.py | 60 | ||||
-rw-r--r-- | Lib/asyncio/futures.py | 33 | ||||
-rw-r--r-- | Lib/asyncio/locks.py | 7 | ||||
-rw-r--r-- | Lib/asyncio/proactor_events.py | 11 | ||||
-rw-r--r-- | Lib/asyncio/streams.py | 48 | ||||
-rw-r--r-- | Lib/asyncio/tasks.py | 25 | ||||
-rw-r--r-- | Lib/asyncio/unix_events.py | 9 | ||||
-rw-r--r-- | Lib/asyncio/windows_events.py | 5 |
12 files changed, 130 insertions, 98 deletions
diff --git a/Lib/asyncio/__init__.py b/Lib/asyncio/__init__.py index 2685902..28c2e2c 100644 --- a/Lib/asyncio/__init__.py +++ b/Lib/asyncio/__init__.py @@ -8,6 +8,7 @@ import sys from .base_events import * from .coroutines import * from .events import * +from .exceptions import * from .futures import * from .locks import * from .protocols import * @@ -25,6 +26,7 @@ from .tasks import _all_tasks_compat # NoQA __all__ = (base_events.__all__ + coroutines.__all__ + events.__all__ + + exceptions.__all__ + futures.__all__ + locks.__all__ + protocols.__all__ + diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index ee13d1a..0467438 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -37,6 +37,7 @@ except ImportError: # pragma: no cover from . import constants from . import coroutines from . import events +from . import exceptions from . import futures from . import protocols from . import sslproto @@ -327,7 +328,7 @@ class Server(events.AbstractServer): try: await self._serving_forever_fut - except futures.CancelledError: + except exceptions.CancelledError: try: self.close() await self.wait_closed() @@ -800,7 +801,7 @@ class BaseEventLoop(events.AbstractEventLoop): try: return await self._sock_sendfile_native(sock, file, offset, count) - except events.SendfileNotAvailableError as exc: + except exceptions.SendfileNotAvailableError as exc: if not fallback: raise return await self._sock_sendfile_fallback(sock, file, @@ -809,7 +810,7 @@ class BaseEventLoop(events.AbstractEventLoop): async def _sock_sendfile_native(self, sock, file, offset, count): # NB: sendfile syscall is not supported for SSL sockets and # non-mmap files even if sendfile is supported by OS - raise events.SendfileNotAvailableError( + raise exceptions.SendfileNotAvailableError( f"syscall sendfile is not available for socket {sock!r} " "and file {file!r} combination") @@ -1053,7 +1054,7 @@ class BaseEventLoop(events.AbstractEventLoop): try: return await self._sendfile_native(transport, file, offset, count) - except events.SendfileNotAvailableError as exc: + except exceptions.SendfileNotAvailableError as exc: if not fallback: raise @@ -1066,7 +1067,7 @@ class BaseEventLoop(events.AbstractEventLoop): offset, count) async def _sendfile_native(self, transp, file, offset, count): - raise events.SendfileNotAvailableError( + raise exceptions.SendfileNotAvailableError( "sendfile syscall is not supported") async def _sendfile_fallback(self, transp, file, offset, count): diff --git a/Lib/asyncio/base_futures.py b/Lib/asyncio/base_futures.py index bd65bee..22f2980 100644 --- a/Lib/asyncio/base_futures.py +++ b/Lib/asyncio/base_futures.py @@ -1,15 +1,9 @@ __all__ = () -import concurrent.futures import reprlib from . import format_helpers -CancelledError = concurrent.futures.CancelledError -TimeoutError = concurrent.futures.TimeoutError -InvalidStateError = concurrent.futures.InvalidStateError - - # States for Future. _PENDING = 'PENDING' _CANCELLED = 'CANCELLED' diff --git a/Lib/asyncio/events.py b/Lib/asyncio/events.py index 58a60a0..163b868 100644 --- a/Lib/asyncio/events.py +++ b/Lib/asyncio/events.py @@ -3,7 +3,7 @@ __all__ = ( 'AbstractEventLoopPolicy', 'AbstractEventLoop', 'AbstractServer', - 'Handle', 'TimerHandle', 'SendfileNotAvailableError', + 'Handle', 'TimerHandle', 'get_event_loop_policy', 'set_event_loop_policy', 'get_event_loop', 'set_event_loop', 'new_event_loop', 'get_child_watcher', 'set_child_watcher', @@ -19,14 +19,7 @@ import sys import threading from . import format_helpers - - -class SendfileNotAvailableError(RuntimeError): - """Sendfile syscall is not available. - - Raised if OS does not support sendfile syscall for given socket or - file type. - """ +from . import exceptions class Handle: diff --git a/Lib/asyncio/exceptions.py b/Lib/asyncio/exceptions.py new file mode 100644 index 0000000..cac31a5 --- /dev/null +++ b/Lib/asyncio/exceptions.py @@ -0,0 +1,60 @@ +"""asyncio exceptions.""" + + +__all__ = ('CancelledError', 'InvalidStateError', 'TimeoutError', + 'IncompleteReadError', 'LimitOverrunError', + 'SendfileNotAvailableError') + +import concurrent.futures +from . import base_futures + + +class CancelledError(concurrent.futures.CancelledError): + """The Future or Task was cancelled.""" + + +class TimeoutError(concurrent.futures.TimeoutError): + """The operation exceeded the given deadline.""" + + +class InvalidStateError(concurrent.futures.InvalidStateError): + """The operation is not allowed in this state.""" + + +class SendfileNotAvailableError(RuntimeError): + """Sendfile syscall is not available. + + Raised if OS does not support sendfile syscall for given socket or + file type. + """ + + +class IncompleteReadError(EOFError): + """ + Incomplete read error. Attributes: + + - partial: read bytes string before the end of stream was reached + - expected: total number of expected bytes (or None if unknown) + """ + def __init__(self, partial, expected): + super().__init__(f'{len(partial)} bytes read on a total of ' + f'{expected!r} expected bytes') + self.partial = partial + self.expected = expected + + def __reduce__(self): + return type(self), (self.partial, self.expected) + + +class LimitOverrunError(Exception): + """Reached the buffer limit while looking for a separator. + + Attributes: + - consumed: total number of to be consumed bytes. + """ + def __init__(self, message, consumed): + super().__init__(message) + self.consumed = consumed + + def __reduce__(self): + return type(self), (self.args[0], self.consumed) diff --git a/Lib/asyncio/futures.py b/Lib/asyncio/futures.py index 0e0e696..98a5308e 100644 --- a/Lib/asyncio/futures.py +++ b/Lib/asyncio/futures.py @@ -1,7 +1,6 @@ """A Future class similar to the one in PEP 3148.""" __all__ = ( - 'CancelledError', 'TimeoutError', 'InvalidStateError', 'Future', 'wrap_future', 'isfuture', ) @@ -12,12 +11,10 @@ import sys from . import base_futures from . import events +from . import exceptions from . import format_helpers -CancelledError = base_futures.CancelledError -InvalidStateError = base_futures.InvalidStateError -TimeoutError = base_futures.TimeoutError isfuture = base_futures.isfuture @@ -170,9 +167,9 @@ class Future: the future is done and has an exception set, this exception is raised. """ if self._state == _CANCELLED: - raise CancelledError + raise exceptions.CancelledError if self._state != _FINISHED: - raise InvalidStateError('Result is not ready.') + raise exceptions.InvalidStateError('Result is not ready.') self.__log_traceback = False if self._exception is not None: raise self._exception @@ -187,9 +184,9 @@ class Future: InvalidStateError. """ if self._state == _CANCELLED: - raise CancelledError + raise exceptions.CancelledError if self._state != _FINISHED: - raise InvalidStateError('Exception is not set.') + raise exceptions.InvalidStateError('Exception is not set.') self.__log_traceback = False return self._exception @@ -231,7 +228,7 @@ class Future: InvalidStateError. """ if self._state != _PENDING: - raise InvalidStateError('{}: {!r}'.format(self._state, self)) + raise exceptions.InvalidStateError(f'{self._state}: {self!r}') self._result = result self._state = _FINISHED self.__schedule_callbacks() @@ -243,7 +240,7 @@ class Future: InvalidStateError. """ if self._state != _PENDING: - raise InvalidStateError('{}: {!r}'.format(self._state, self)) + raise exceptions.InvalidStateError(f'{self._state}: {self!r}') if isinstance(exception, type): exception = exception() if type(exception) is StopIteration: @@ -288,6 +285,18 @@ def _set_result_unless_cancelled(fut, result): fut.set_result(result) +def _convert_future_exc(exc): + exc_class = type(exc) + if exc_class is concurrent.futures.CancelledError: + return exceptions.CancelledError(*exc.args) + elif exc_class is concurrent.futures.TimeoutError: + return exceptions.TimeoutError(*exc.args) + elif exc_class is concurrent.futures.InvalidStateError: + return exceptions.InvalidStateError(*exc.args) + else: + return exc + + def _set_concurrent_future_state(concurrent, source): """Copy state from a future to a concurrent.futures.Future.""" assert source.done() @@ -297,7 +306,7 @@ def _set_concurrent_future_state(concurrent, source): return exception = source.exception() if exception is not None: - concurrent.set_exception(exception) + concurrent.set_exception(_convert_future_exc(exception)) else: result = source.result() concurrent.set_result(result) @@ -317,7 +326,7 @@ def _copy_future_state(source, dest): else: exception = source.exception() if exception is not None: - dest.set_exception(exception) + dest.set_exception(_convert_future_exc(exception)) else: result = source.result() dest.set_result(result) diff --git a/Lib/asyncio/locks.py b/Lib/asyncio/locks.py index 91f7a01..639bd11 100644 --- a/Lib/asyncio/locks.py +++ b/Lib/asyncio/locks.py @@ -7,6 +7,7 @@ import warnings from . import events from . import futures +from . import exceptions from .coroutines import coroutine @@ -192,7 +193,7 @@ class Lock(_ContextManagerMixin): await fut finally: self._waiters.remove(fut) - except futures.CancelledError: + except exceptions.CancelledError: if not self._locked: self._wake_up_first() raise @@ -363,11 +364,11 @@ class Condition(_ContextManagerMixin): try: await self.acquire() break - except futures.CancelledError: + except exceptions.CancelledError: cancelled = True if cancelled: - raise futures.CancelledError + raise exceptions.CancelledError async def wait_for(self, predicate): """Wait until a predicate becomes true. diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py index 66bfb0a..ad23918 100644 --- a/Lib/asyncio/proactor_events.py +++ b/Lib/asyncio/proactor_events.py @@ -15,6 +15,7 @@ from . import base_events from . import constants from . import events from . import futures +from . import exceptions from . import protocols from . import sslproto from . import transports @@ -282,7 +283,7 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport, self._force_close(exc) except OSError as exc: self._fatal_error(exc, 'Fatal read error on pipe transport') - except futures.CancelledError: + except exceptions.CancelledError: if not self._closing: raise else: @@ -555,11 +556,11 @@ class BaseProactorEventLoop(base_events.BaseEventLoop): try: fileno = file.fileno() except (AttributeError, io.UnsupportedOperation) as err: - raise events.SendfileNotAvailableError("not a regular file") + raise exceptions.SendfileNotAvailableError("not a regular file") try: fsize = os.fstat(fileno).st_size except OSError as err: - raise events.SendfileNotAvailableError("not a regular file") + raise exceptions.SendfileNotAvailableError("not a regular file") blocksize = count if count else fsize if not blocksize: return 0 # empty file @@ -615,7 +616,7 @@ class BaseProactorEventLoop(base_events.BaseEventLoop): if f is not None: f.result() # may raise f = self._proactor.recv(self._ssock, 4096) - except futures.CancelledError: + except exceptions.CancelledError: # _close_self_pipe() has been called, stop waiting for data return except Exception as exc: @@ -666,7 +667,7 @@ class BaseProactorEventLoop(base_events.BaseEventLoop): elif self._debug: logger.debug("Accept failed on socket %r", sock, exc_info=True) - except futures.CancelledError: + except exceptions.CancelledError: sock.close() else: self._accept_futures[sock.fileno()] = f diff --git a/Lib/asyncio/streams.py b/Lib/asyncio/streams.py index d6531f8..9dab49b 100644 --- a/Lib/asyncio/streams.py +++ b/Lib/asyncio/streams.py @@ -1,8 +1,6 @@ __all__ = ( 'StreamReader', 'StreamWriter', 'StreamReaderProtocol', - 'open_connection', 'start_server', - 'IncompleteReadError', 'LimitOverrunError', -) + 'open_connection', 'start_server') import socket @@ -11,6 +9,7 @@ if hasattr(socket, 'AF_UNIX'): from . import coroutines from . import events +from . import exceptions from . import protocols from .log import logger from .tasks import sleep @@ -19,37 +18,6 @@ from .tasks import sleep _DEFAULT_LIMIT = 2 ** 16 # 64 KiB -class IncompleteReadError(EOFError): - """ - Incomplete read error. Attributes: - - - partial: read bytes string before the end of stream was reached - - expected: total number of expected bytes (or None if unknown) - """ - def __init__(self, partial, expected): - super().__init__(f'{len(partial)} bytes read on a total of ' - f'{expected!r} expected bytes') - self.partial = partial - self.expected = expected - - def __reduce__(self): - return type(self), (self.partial, self.expected) - - -class LimitOverrunError(Exception): - """Reached the buffer limit while looking for a separator. - - Attributes: - - consumed: total number of to be consumed bytes. - """ - def __init__(self, message, consumed): - super().__init__(message) - self.consumed = consumed - - def __reduce__(self): - return type(self), (self.args[0], self.consumed) - - async def open_connection(host=None, port=None, *, loop=None, limit=_DEFAULT_LIMIT, **kwds): """A wrapper for create_connection() returning a (reader, writer) pair. @@ -494,9 +462,9 @@ class StreamReader: seplen = len(sep) try: line = await self.readuntil(sep) - except IncompleteReadError as e: + except exceptions.IncompleteReadError as e: return e.partial - except LimitOverrunError as e: + except exceptions.LimitOverrunError as e: if self._buffer.startswith(sep, e.consumed): del self._buffer[:e.consumed + seplen] else: @@ -571,7 +539,7 @@ class StreamReader: # see upper comment for explanation. offset = buflen + 1 - seplen if offset > self._limit: - raise LimitOverrunError( + raise exceptions.LimitOverrunError( 'Separator is not found, and chunk exceed the limit', offset) @@ -582,13 +550,13 @@ class StreamReader: if self._eof: chunk = bytes(self._buffer) self._buffer.clear() - raise IncompleteReadError(chunk, None) + raise exceptions.IncompleteReadError(chunk, None) # _wait_for_data() will resume reading if stream was paused. await self._wait_for_data('readuntil') if isep > self._limit: - raise LimitOverrunError( + raise exceptions.LimitOverrunError( 'Separator is found, but chunk is longer than limit', isep) chunk = self._buffer[:isep + seplen] @@ -674,7 +642,7 @@ class StreamReader: if self._eof: incomplete = bytes(self._buffer) self._buffer.clear() - raise IncompleteReadError(incomplete, n) + raise exceptions.IncompleteReadError(incomplete, n) await self._wait_for_data('readexactly') diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py index 03d71d3..7121aa6 100644 --- a/Lib/asyncio/tasks.py +++ b/Lib/asyncio/tasks.py @@ -21,6 +21,7 @@ import weakref from . import base_tasks from . import coroutines from . import events +from . import exceptions from . import futures from .coroutines import coroutine @@ -228,11 +229,11 @@ class Task(futures._PyFuture): # Inherit Python Task implementation def __step(self, exc=None): if self.done(): - raise futures.InvalidStateError( + raise exceptions.InvalidStateError( f'_step(): already done: {self!r}, {exc!r}') if self._must_cancel: - if not isinstance(exc, futures.CancelledError): - exc = futures.CancelledError() + if not isinstance(exc, exceptions.CancelledError): + exc = exceptions.CancelledError() self._must_cancel = False coro = self._coro self._fut_waiter = None @@ -250,10 +251,10 @@ class Task(futures._PyFuture): # Inherit Python Task implementation if self._must_cancel: # Task is cancelled right before coro stops. self._must_cancel = False - super().set_exception(futures.CancelledError()) + super().set_exception(exceptions.CancelledError()) else: super().set_result(exc.value) - except futures.CancelledError: + except exceptions.CancelledError: super().cancel() # I.e., Future.cancel(self). except Exception as exc: super().set_exception(exc) @@ -419,7 +420,7 @@ async def wait_for(fut, timeout, *, loop=None): return fut.result() fut.cancel() - raise futures.TimeoutError() + raise exceptions.TimeoutError() waiter = loop.create_future() timeout_handle = loop.call_later(timeout, _release_waiter, waiter) @@ -432,7 +433,7 @@ async def wait_for(fut, timeout, *, loop=None): # wait until the future completes or the timeout try: await waiter - except futures.CancelledError: + except exceptions.CancelledError: fut.remove_done_callback(cb) fut.cancel() raise @@ -445,7 +446,7 @@ async def wait_for(fut, timeout, *, loop=None): # after wait_for() returns. # See https://bugs.python.org/issue32751 await _cancel_and_wait(fut, loop=loop) - raise futures.TimeoutError() + raise exceptions.TimeoutError() finally: timeout_handle.cancel() @@ -554,7 +555,7 @@ def as_completed(fs, *, loop=None, timeout=None): f = await done.get() if f is None: # Dummy value from _on_timeout(). - raise futures.TimeoutError + raise exceptions.TimeoutError return f.result() # May raise f.exception(). for f in todo: @@ -701,7 +702,7 @@ def gather(*coros_or_futures, loop=None, return_exceptions=False): # Check if 'fut' is cancelled first, as # 'fut.exception()' will *raise* a CancelledError # instead of returning it. - exc = futures.CancelledError() + exc = exceptions.CancelledError() outer.set_exception(exc) return else: @@ -720,7 +721,7 @@ def gather(*coros_or_futures, loop=None, return_exceptions=False): # Check if 'fut' is cancelled first, as # 'fut.exception()' will *raise* a CancelledError # instead of returning it. - res = futures.CancelledError() + res = exceptions.CancelledError() else: res = fut.exception() if res is None: @@ -731,7 +732,7 @@ def gather(*coros_or_futures, loop=None, return_exceptions=False): # If gather is being cancelled we must propagate the # cancellation regardless of *return_exceptions* argument. # See issue 32684. - outer.set_exception(futures.CancelledError()) + outer.set_exception(exceptions.CancelledError()) else: outer.set_result(results) diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py index 7cad7e3..1a62db4 100644 --- a/Lib/asyncio/unix_events.py +++ b/Lib/asyncio/unix_events.py @@ -18,6 +18,7 @@ from . import base_subprocess from . import constants from . import coroutines from . import events +from . import exceptions from . import futures from . import selector_events from . import tasks @@ -319,16 +320,16 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop): try: os.sendfile except AttributeError as exc: - raise events.SendfileNotAvailableError( + raise exceptions.SendfileNotAvailableError( "os.sendfile() is not available") try: fileno = file.fileno() except (AttributeError, io.UnsupportedOperation) as err: - raise events.SendfileNotAvailableError("not a regular file") + raise exceptions.SendfileNotAvailableError("not a regular file") try: fsize = os.fstat(fileno).st_size except OSError as err: - raise events.SendfileNotAvailableError("not a regular file") + raise exceptions.SendfileNotAvailableError("not a regular file") blocksize = count if count else fsize if not blocksize: return 0 # empty file @@ -382,7 +383,7 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop): # one being 'file' is not a regular mmap(2)-like # file, in which case we'll fall back on using # plain send(). - err = events.SendfileNotAvailableError( + err = exceptions.SendfileNotAvailableError( "os.sendfile call failed") self._sock_sendfile_update_filepos(fileno, offset, total_sent) fut.set_exception(err) diff --git a/Lib/asyncio/windows_events.py b/Lib/asyncio/windows_events.py index 2ec5427..fdde8e9 100644 --- a/Lib/asyncio/windows_events.py +++ b/Lib/asyncio/windows_events.py @@ -12,6 +12,7 @@ import weakref from . import events from . import base_subprocess from . import futures +from . import exceptions from . import proactor_events from . import selector_events from . import tasks @@ -351,7 +352,7 @@ class ProactorEventLoop(proactor_events.BaseProactorEventLoop): elif self._debug: logger.warning("Accept pipe failed on pipe %r", pipe, exc_info=True) - except futures.CancelledError: + except exceptions.CancelledError: if pipe: pipe.close() else: @@ -497,7 +498,7 @@ class IocpProactor: # Coroutine closing the accept socket if the future is cancelled try: await future - except futures.CancelledError: + except exceptions.CancelledError: conn.close() raise |