diff options
Diffstat (limited to 'Lib/asyncio')
-rw-r--r-- | Lib/asyncio/base_events.py | 96 | ||||
-rw-r--r-- | Lib/asyncio/events.py | 31 | ||||
-rw-r--r-- | Lib/asyncio/futures.py | 22 | ||||
-rw-r--r-- | Lib/asyncio/proactor_events.py | 33 | ||||
-rw-r--r-- | Lib/asyncio/selector_events.py | 35 | ||||
-rw-r--r-- | Lib/asyncio/test_utils.py | 18 | ||||
-rw-r--r-- | Lib/asyncio/unix_events.py | 26 | ||||
-rw-r--r-- | Lib/asyncio/windows_events.py | 8 |
8 files changed, 229 insertions, 40 deletions
diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index b74e936..cb2499d 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -122,6 +122,7 @@ class BaseEventLoop(events.AbstractEventLoop): self._internal_fds = 0 self._running = False self._clock_resolution = time.get_clock_info('monotonic').resolution + self._exception_handler = None def _make_socket_transport(self, sock, protocol, waiter=None, *, extra=None, server=None): @@ -254,7 +255,7 @@ class BaseEventLoop(events.AbstractEventLoop): """Like call_later(), but uses an absolute time.""" if tasks.iscoroutinefunction(callback): raise TypeError("coroutines cannot be used with call_at()") - timer = events.TimerHandle(when, callback, args) + timer = events.TimerHandle(when, callback, args, self) heapq.heappush(self._scheduled, timer) return timer @@ -270,7 +271,7 @@ class BaseEventLoop(events.AbstractEventLoop): """ if tasks.iscoroutinefunction(callback): raise TypeError("coroutines cannot be used with call_soon()") - handle = events.Handle(callback, args) + handle = events.Handle(callback, args, self) self._ready.append(handle) return handle @@ -625,6 +626,97 @@ class BaseEventLoop(events.AbstractEventLoop): protocol, popen_args, False, stdin, stdout, stderr, bufsize, **kwargs) return transport, protocol + def set_exception_handler(self, handler): + """Set handler as the new event loop exception handler. + + If handler is None, the default exception handler will + be set. + + If handler is a callable object, it should have a + matching signature to '(loop, context)', where 'loop' + will be a reference to the active event loop, 'context' + will be a dict object (see `call_exception_handler()` + documentation for details about context). + """ + if handler is not None and not callable(handler): + raise TypeError('A callable object or None is expected, ' + 'got {!r}'.format(handler)) + self._exception_handler = handler + + def default_exception_handler(self, context): + """Default exception handler. + + This is called when an exception occurs and no exception + handler is set, and can be called by a custom exception + handler that wants to defer to the default behavior. + + context parameter has the same meaning as in + `call_exception_handler()`. + """ + message = context.get('message') + if not message: + message = 'Unhandled exception in event loop' + + exception = context.get('exception') + if exception is not None: + exc_info = (type(exception), exception, exception.__traceback__) + else: + exc_info = False + + log_lines = [message] + for key in sorted(context): + if key in {'message', 'exception'}: + continue + log_lines.append('{}: {!r}'.format(key, context[key])) + + logger.error('\n'.join(log_lines), exc_info=exc_info) + + def call_exception_handler(self, context): + """Call the current event loop exception handler. + + context is a dict object containing the following keys + (new keys maybe introduced later): + - 'message': Error message; + - 'exception' (optional): Exception object; + - 'future' (optional): Future instance; + - 'handle' (optional): Handle instance; + - 'protocol' (optional): Protocol instance; + - 'transport' (optional): Transport instance; + - 'socket' (optional): Socket instance. + + Note: this method should not be overloaded in subclassed + event loops. For any custom exception handling, use + `set_exception_handler()` method. + """ + if self._exception_handler is None: + try: + self.default_exception_handler(context) + except Exception: + # Second protection layer for unexpected errors + # in the default implementation, as well as for subclassed + # event loops with overloaded "default_exception_handler". + logger.error('Exception in default exception handler', + exc_info=True) + else: + try: + self._exception_handler(self, context) + except Exception as exc: + # Exception in the user set custom exception handler. + try: + # Let's try default handler. + self.default_exception_handler({ + 'message': 'Unhandled error in exception handler', + 'exception': exc, + 'context': context, + }) + except Exception: + # Guard 'default_exception_handler' in case it's + # overloaded. + logger.error('Exception in default exception handler ' + 'while handling an unexpected error ' + 'in custom exception handler', + exc_info=True) + def _add_callback(self, handle): """Add a Handle to ready or scheduled.""" assert isinstance(handle, events.Handle), 'A Handle is required here' diff --git a/Lib/asyncio/events.py b/Lib/asyncio/events.py index 7841ad9..f61c5b7 100644 --- a/Lib/asyncio/events.py +++ b/Lib/asyncio/events.py @@ -19,10 +19,11 @@ from .log import logger class Handle: """Object returned by callback registration methods.""" - __slots__ = ['_callback', '_args', '_cancelled'] + __slots__ = ['_callback', '_args', '_cancelled', '_loop'] - def __init__(self, callback, args): + def __init__(self, callback, args, loop): assert not isinstance(callback, Handle), 'A Handle is not a callback' + self._loop = loop self._callback = callback self._args = args self._cancelled = False @@ -39,9 +40,14 @@ class Handle: def _run(self): try: self._callback(*self._args) - except Exception: - logger.exception('Exception in callback %s %r', - self._callback, self._args) + except Exception as exc: + msg = 'Exception in callback {}{!r}'.format(self._callback, + self._args) + self._loop.call_exception_handler({ + 'message': msg, + 'exception': exc, + 'handle': self, + }) self = None # Needed to break cycles when an exception occurs. @@ -50,9 +56,9 @@ class TimerHandle(Handle): __slots__ = ['_when'] - def __init__(self, when, callback, args): + def __init__(self, when, callback, args, loop): assert when is not None - super().__init__(callback, args) + super().__init__(callback, args, loop) self._when = when @@ -328,6 +334,17 @@ class AbstractEventLoop: def remove_signal_handler(self, sig): raise NotImplementedError + # Error handlers. + + def set_exception_handler(self, handler): + raise NotImplementedError + + def default_exception_handler(self, context): + raise NotImplementedError + + def call_exception_handler(self, context): + raise NotImplementedError + class AbstractEventLoopPolicy: """Abstract policy for accessing the event loop.""" diff --git a/Lib/asyncio/futures.py b/Lib/asyncio/futures.py index d09f423..b9cd45c 100644 --- a/Lib/asyncio/futures.py +++ b/Lib/asyncio/futures.py @@ -83,9 +83,10 @@ class _TracebackLogger: in a discussion about closing files when they are collected. """ - __slots__ = ['exc', 'tb'] + __slots__ = ['exc', 'tb', 'loop'] - def __init__(self, exc): + def __init__(self, exc, loop): + self.loop = loop self.exc = exc self.tb = None @@ -102,8 +103,11 @@ class _TracebackLogger: def __del__(self): if self.tb: - logger.error('Future/Task exception was never retrieved:\n%s', - ''.join(self.tb)) + msg = 'Future/Task exception was never retrieved:\n{tb}' + context = { + 'message': msg.format(tb=''.join(self.tb)), + } + self.loop.call_exception_handler(context) class Future: @@ -173,8 +177,12 @@ class Future: # has consumed the exception return exc = self._exception - logger.error('Future/Task exception was never retrieved:', - exc_info=(exc.__class__, exc, exc.__traceback__)) + context = { + 'message': 'Future/Task exception was never retrieved', + 'exception': exc, + 'future': self, + } + self._loop.call_exception_handler(context) def cancel(self): """Cancel the future and schedule callbacks. @@ -309,7 +317,7 @@ class Future: if _PY34: self._log_traceback = True else: - self._tb_logger = _TracebackLogger(exception) + self._tb_logger = _TracebackLogger(exception, self._loop) # Arrange for the logger to be activated after all callbacks # have had a chance to call result() or exception(). self._loop.call_soon(self._tb_logger.activate) diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py index 5de4d3d..b2ac632 100644 --- a/Lib/asyncio/proactor_events.py +++ b/Lib/asyncio/proactor_events.py @@ -56,7 +56,12 @@ class _ProactorBasePipeTransport(transports.BaseTransport): def _fatal_error(self, exc): if not isinstance(exc, (BrokenPipeError, ConnectionResetError)): - logger.exception('Fatal error for %s', self) + self._loop.call_exception_handler({ + 'message': 'Fatal transport error', + 'exception': exc, + 'transport': self, + 'protocol': self._protocol, + }) self._force_close(exc) def _force_close(self, exc): @@ -103,8 +108,13 @@ class _ProactorBasePipeTransport(transports.BaseTransport): self._protocol_paused = True try: self._protocol.pause_writing() - except Exception: - logger.exception('pause_writing() failed') + except Exception as exc: + self._loop.call_exception_handler({ + 'message': 'protocol.pause_writing() failed', + 'exception': exc, + 'transport': self, + 'protocol': self._protocol, + }) def _maybe_resume_protocol(self): if (self._protocol_paused and @@ -112,8 +122,13 @@ class _ProactorBasePipeTransport(transports.BaseTransport): self._protocol_paused = False try: self._protocol.resume_writing() - except Exception: - logger.exception('resume_writing() failed') + except Exception as exc: + self._loop.call_exception_handler({ + 'message': 'protocol.resume_writing() failed', + 'exception': exc, + 'transport': self, + 'protocol': self._protocol, + }) def set_write_buffer_limits(self, high=None, low=None): if high is None: @@ -465,9 +480,13 @@ class BaseProactorEventLoop(base_events.BaseEventLoop): conn, protocol, extra={'peername': addr}, server=server) f = self._proactor.accept(sock) - except OSError: + except OSError as exc: if sock.fileno() != -1: - logger.exception('Accept failed') + self.call_exception_handler({ + 'message': 'Accept failed', + 'exception': exc, + 'socket': sock, + }) sock.close() except futures.CancelledError: sock.close() diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py index 10b0257..fb86f82 100644 --- a/Lib/asyncio/selector_events.py +++ b/Lib/asyncio/selector_events.py @@ -112,7 +112,11 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): # Some platforms (e.g. Linux keep reporting the FD as # ready, so we remove the read handler temporarily. # We'll try again in a while. - logger.exception('Accept out of system resource (%s)', exc) + self.call_exception_handler({ + 'message': 'socket.accept() out of system resource', + 'exception': exc, + 'socket': sock, + }) self.remove_reader(sock.fileno()) self.call_later(constants.ACCEPT_RETRY_DELAY, self._start_serving, @@ -132,7 +136,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): def add_reader(self, fd, callback, *args): """Add a reader callback.""" - handle = events.Handle(callback, args) + handle = events.Handle(callback, args, self) try: key = self._selector.get_key(fd) except KeyError: @@ -167,7 +171,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): def add_writer(self, fd, callback, *args): """Add a writer callback..""" - handle = events.Handle(callback, args) + handle = events.Handle(callback, args, self) try: key = self._selector.get_key(fd) except KeyError: @@ -364,8 +368,13 @@ class _FlowControlMixin(transports.Transport): self._protocol_paused = True try: self._protocol.pause_writing() - except Exception: - logger.exception('pause_writing() failed') + except Exception as exc: + self._loop.call_exception_handler({ + 'message': 'protocol.pause_writing() failed', + 'exception': exc, + 'transport': self, + 'protocol': self._protocol, + }) def _maybe_resume_protocol(self): if (self._protocol_paused and @@ -373,8 +382,13 @@ class _FlowControlMixin(transports.Transport): self._protocol_paused = False try: self._protocol.resume_writing() - except Exception: - logger.exception('resume_writing() failed') + except Exception as exc: + self._loop.call_exception_handler({ + 'message': 'protocol.resume_writing() failed', + 'exception': exc, + 'transport': self, + 'protocol': self._protocol, + }) def set_write_buffer_limits(self, high=None, low=None): if high is None: @@ -435,7 +449,12 @@ class _SelectorTransport(_FlowControlMixin, transports.Transport): def _fatal_error(self, exc): # Should be called from exception handler only. if not isinstance(exc, (BrokenPipeError, ConnectionResetError)): - logger.exception('Fatal error for %s', self) + self._loop.call_exception_handler({ + 'message': 'Fatal transport error', + 'exception': exc, + 'transport': self, + 'protocol': self._protocol, + }) self._force_close(exc) def _force_close(self, exc): diff --git a/Lib/asyncio/test_utils.py b/Lib/asyncio/test_utils.py index de2916b..28e5243 100644 --- a/Lib/asyncio/test_utils.py +++ b/Lib/asyncio/test_utils.py @@ -4,6 +4,7 @@ import collections import contextlib import io import os +import re import socket import socketserver import sys @@ -301,7 +302,7 @@ class TestLoop(base_events.BaseEventLoop): raise AssertionError("Time generator is not finished") def add_reader(self, fd, callback, *args): - self.readers[fd] = events.Handle(callback, args) + self.readers[fd] = events.Handle(callback, args, self) def remove_reader(self, fd): self.remove_reader_count[fd] += 1 @@ -320,7 +321,7 @@ class TestLoop(base_events.BaseEventLoop): handle._args, args) def add_writer(self, fd, callback, *args): - self.writers[fd] = events.Handle(callback, args) + self.writers[fd] = events.Handle(callback, args, self) def remove_writer(self, fd): self.remove_writer_count[fd] += 1 @@ -362,3 +363,16 @@ class TestLoop(base_events.BaseEventLoop): def MockCallback(**kwargs): return unittest.mock.Mock(spec=['__call__'], **kwargs) + + +class MockPattern(str): + """A regex based str with a fuzzy __eq__. + + Use this helper with 'mock.assert_called_with', or anywhere + where a regexp comparison between strings is needed. + + For instance: + mock_call.assert_called_with(MockPattern('spam.*ham')) + """ + def __eq__(self, other): + return bool(re.search(str(self), other, re.S)) diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py index e0d7507..9a40c04 100644 --- a/Lib/asyncio/unix_events.py +++ b/Lib/asyncio/unix_events.py @@ -65,7 +65,7 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop): except ValueError as exc: raise RuntimeError(str(exc)) - handle = events.Handle(callback, args) + handle = events.Handle(callback, args, self) self._signal_handlers[sig] = handle try: @@ -294,7 +294,12 @@ class _UnixReadPipeTransport(transports.ReadTransport): def _fatal_error(self, exc): # should be called by exception handler only if not (isinstance(exc, OSError) and exc.errno == errno.EIO): - logger.exception('Fatal error for %s', self) + self._loop.call_exception_handler({ + 'message': 'Fatal transport error', + 'exception': exc, + 'transport': self, + 'protocol': self._protocol, + }) self._close(exc) def _close(self, exc): @@ -441,7 +446,12 @@ class _UnixWritePipeTransport(selector_events._FlowControlMixin, def _fatal_error(self, exc): # should be called by exception handler only if not isinstance(exc, (BrokenPipeError, ConnectionResetError)): - logger.exception('Fatal error for %s', self) + self._loop.call_exception_handler({ + 'message': 'Fatal transport error', + 'exception': exc, + 'transport': self, + 'protocol': self._protocol, + }) self._close(exc) def _close(self, exc=None): @@ -582,8 +592,14 @@ class BaseChildWatcher(AbstractChildWatcher): def _sig_chld(self): try: self._do_waitpid_all() - except Exception: - logger.exception('Unknown exception in SIGCHLD handler') + except Exception as exc: + # self._loop should always be available here + # as '_sig_chld' is added as a signal handler + # in 'attach_loop' + self._loop.call_exception_handler({ + 'message': 'Unknown exception in SIGCHLD handler', + 'exception': exc, + }) def _compute_returncode(self, status): if os.WIFSIGNALED(status): diff --git a/Lib/asyncio/windows_events.py b/Lib/asyncio/windows_events.py index 0a2d981..c667a1c 100644 --- a/Lib/asyncio/windows_events.py +++ b/Lib/asyncio/windows_events.py @@ -156,9 +156,13 @@ class ProactorEventLoop(proactor_events.BaseProactorEventLoop): if pipe is None: return f = self._proactor.accept_pipe(pipe) - except OSError: + except OSError as exc: if pipe and pipe.fileno() != -1: - logger.exception('Pipe accept failed') + self.call_exception_handler({ + 'message': 'Pipe accept failed', + 'exception': exc, + 'pipe': pipe, + }) pipe.close() except futures.CancelledError: if pipe: |