summaryrefslogtreecommitdiffstats
path: root/Lib/asyncio
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/asyncio')
-rw-r--r--Lib/asyncio/base_events.py96
-rw-r--r--Lib/asyncio/events.py31
-rw-r--r--Lib/asyncio/futures.py22
-rw-r--r--Lib/asyncio/proactor_events.py33
-rw-r--r--Lib/asyncio/selector_events.py35
-rw-r--r--Lib/asyncio/test_utils.py18
-rw-r--r--Lib/asyncio/unix_events.py26
-rw-r--r--Lib/asyncio/windows_events.py8
8 files changed, 229 insertions, 40 deletions
diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py
index b74e936..cb2499d 100644
--- a/Lib/asyncio/base_events.py
+++ b/Lib/asyncio/base_events.py
@@ -122,6 +122,7 @@ class BaseEventLoop(events.AbstractEventLoop):
self._internal_fds = 0
self._running = False
self._clock_resolution = time.get_clock_info('monotonic').resolution
+ self._exception_handler = None
def _make_socket_transport(self, sock, protocol, waiter=None, *,
extra=None, server=None):
@@ -254,7 +255,7 @@ class BaseEventLoop(events.AbstractEventLoop):
"""Like call_later(), but uses an absolute time."""
if tasks.iscoroutinefunction(callback):
raise TypeError("coroutines cannot be used with call_at()")
- timer = events.TimerHandle(when, callback, args)
+ timer = events.TimerHandle(when, callback, args, self)
heapq.heappush(self._scheduled, timer)
return timer
@@ -270,7 +271,7 @@ class BaseEventLoop(events.AbstractEventLoop):
"""
if tasks.iscoroutinefunction(callback):
raise TypeError("coroutines cannot be used with call_soon()")
- handle = events.Handle(callback, args)
+ handle = events.Handle(callback, args, self)
self._ready.append(handle)
return handle
@@ -625,6 +626,97 @@ class BaseEventLoop(events.AbstractEventLoop):
protocol, popen_args, False, stdin, stdout, stderr, bufsize, **kwargs)
return transport, protocol
+ def set_exception_handler(self, handler):
+ """Set handler as the new event loop exception handler.
+
+ If handler is None, the default exception handler will
+ be set.
+
+ If handler is a callable object, it should have a
+ matching signature to '(loop, context)', where 'loop'
+ will be a reference to the active event loop, 'context'
+ will be a dict object (see `call_exception_handler()`
+ documentation for details about context).
+ """
+ if handler is not None and not callable(handler):
+ raise TypeError('A callable object or None is expected, '
+ 'got {!r}'.format(handler))
+ self._exception_handler = handler
+
+ def default_exception_handler(self, context):
+ """Default exception handler.
+
+ This is called when an exception occurs and no exception
+ handler is set, and can be called by a custom exception
+ handler that wants to defer to the default behavior.
+
+ context parameter has the same meaning as in
+ `call_exception_handler()`.
+ """
+ message = context.get('message')
+ if not message:
+ message = 'Unhandled exception in event loop'
+
+ exception = context.get('exception')
+ if exception is not None:
+ exc_info = (type(exception), exception, exception.__traceback__)
+ else:
+ exc_info = False
+
+ log_lines = [message]
+ for key in sorted(context):
+ if key in {'message', 'exception'}:
+ continue
+ log_lines.append('{}: {!r}'.format(key, context[key]))
+
+ logger.error('\n'.join(log_lines), exc_info=exc_info)
+
+ def call_exception_handler(self, context):
+ """Call the current event loop exception handler.
+
+ context is a dict object containing the following keys
+ (new keys maybe introduced later):
+ - 'message': Error message;
+ - 'exception' (optional): Exception object;
+ - 'future' (optional): Future instance;
+ - 'handle' (optional): Handle instance;
+ - 'protocol' (optional): Protocol instance;
+ - 'transport' (optional): Transport instance;
+ - 'socket' (optional): Socket instance.
+
+ Note: this method should not be overloaded in subclassed
+ event loops. For any custom exception handling, use
+ `set_exception_handler()` method.
+ """
+ if self._exception_handler is None:
+ try:
+ self.default_exception_handler(context)
+ except Exception:
+ # Second protection layer for unexpected errors
+ # in the default implementation, as well as for subclassed
+ # event loops with overloaded "default_exception_handler".
+ logger.error('Exception in default exception handler',
+ exc_info=True)
+ else:
+ try:
+ self._exception_handler(self, context)
+ except Exception as exc:
+ # Exception in the user set custom exception handler.
+ try:
+ # Let's try default handler.
+ self.default_exception_handler({
+ 'message': 'Unhandled error in exception handler',
+ 'exception': exc,
+ 'context': context,
+ })
+ except Exception:
+ # Guard 'default_exception_handler' in case it's
+ # overloaded.
+ logger.error('Exception in default exception handler '
+ 'while handling an unexpected error '
+ 'in custom exception handler',
+ exc_info=True)
+
def _add_callback(self, handle):
"""Add a Handle to ready or scheduled."""
assert isinstance(handle, events.Handle), 'A Handle is required here'
diff --git a/Lib/asyncio/events.py b/Lib/asyncio/events.py
index 7841ad9..f61c5b7 100644
--- a/Lib/asyncio/events.py
+++ b/Lib/asyncio/events.py
@@ -19,10 +19,11 @@ from .log import logger
class Handle:
"""Object returned by callback registration methods."""
- __slots__ = ['_callback', '_args', '_cancelled']
+ __slots__ = ['_callback', '_args', '_cancelled', '_loop']
- def __init__(self, callback, args):
+ def __init__(self, callback, args, loop):
assert not isinstance(callback, Handle), 'A Handle is not a callback'
+ self._loop = loop
self._callback = callback
self._args = args
self._cancelled = False
@@ -39,9 +40,14 @@ class Handle:
def _run(self):
try:
self._callback(*self._args)
- except Exception:
- logger.exception('Exception in callback %s %r',
- self._callback, self._args)
+ except Exception as exc:
+ msg = 'Exception in callback {}{!r}'.format(self._callback,
+ self._args)
+ self._loop.call_exception_handler({
+ 'message': msg,
+ 'exception': exc,
+ 'handle': self,
+ })
self = None # Needed to break cycles when an exception occurs.
@@ -50,9 +56,9 @@ class TimerHandle(Handle):
__slots__ = ['_when']
- def __init__(self, when, callback, args):
+ def __init__(self, when, callback, args, loop):
assert when is not None
- super().__init__(callback, args)
+ super().__init__(callback, args, loop)
self._when = when
@@ -328,6 +334,17 @@ class AbstractEventLoop:
def remove_signal_handler(self, sig):
raise NotImplementedError
+ # Error handlers.
+
+ def set_exception_handler(self, handler):
+ raise NotImplementedError
+
+ def default_exception_handler(self, context):
+ raise NotImplementedError
+
+ def call_exception_handler(self, context):
+ raise NotImplementedError
+
class AbstractEventLoopPolicy:
"""Abstract policy for accessing the event loop."""
diff --git a/Lib/asyncio/futures.py b/Lib/asyncio/futures.py
index d09f423..b9cd45c 100644
--- a/Lib/asyncio/futures.py
+++ b/Lib/asyncio/futures.py
@@ -83,9 +83,10 @@ class _TracebackLogger:
in a discussion about closing files when they are collected.
"""
- __slots__ = ['exc', 'tb']
+ __slots__ = ['exc', 'tb', 'loop']
- def __init__(self, exc):
+ def __init__(self, exc, loop):
+ self.loop = loop
self.exc = exc
self.tb = None
@@ -102,8 +103,11 @@ class _TracebackLogger:
def __del__(self):
if self.tb:
- logger.error('Future/Task exception was never retrieved:\n%s',
- ''.join(self.tb))
+ msg = 'Future/Task exception was never retrieved:\n{tb}'
+ context = {
+ 'message': msg.format(tb=''.join(self.tb)),
+ }
+ self.loop.call_exception_handler(context)
class Future:
@@ -173,8 +177,12 @@ class Future:
# has consumed the exception
return
exc = self._exception
- logger.error('Future/Task exception was never retrieved:',
- exc_info=(exc.__class__, exc, exc.__traceback__))
+ context = {
+ 'message': 'Future/Task exception was never retrieved',
+ 'exception': exc,
+ 'future': self,
+ }
+ self._loop.call_exception_handler(context)
def cancel(self):
"""Cancel the future and schedule callbacks.
@@ -309,7 +317,7 @@ class Future:
if _PY34:
self._log_traceback = True
else:
- self._tb_logger = _TracebackLogger(exception)
+ self._tb_logger = _TracebackLogger(exception, self._loop)
# Arrange for the logger to be activated after all callbacks
# have had a chance to call result() or exception().
self._loop.call_soon(self._tb_logger.activate)
diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py
index 5de4d3d..b2ac632 100644
--- a/Lib/asyncio/proactor_events.py
+++ b/Lib/asyncio/proactor_events.py
@@ -56,7 +56,12 @@ class _ProactorBasePipeTransport(transports.BaseTransport):
def _fatal_error(self, exc):
if not isinstance(exc, (BrokenPipeError, ConnectionResetError)):
- logger.exception('Fatal error for %s', self)
+ self._loop.call_exception_handler({
+ 'message': 'Fatal transport error',
+ 'exception': exc,
+ 'transport': self,
+ 'protocol': self._protocol,
+ })
self._force_close(exc)
def _force_close(self, exc):
@@ -103,8 +108,13 @@ class _ProactorBasePipeTransport(transports.BaseTransport):
self._protocol_paused = True
try:
self._protocol.pause_writing()
- except Exception:
- logger.exception('pause_writing() failed')
+ except Exception as exc:
+ self._loop.call_exception_handler({
+ 'message': 'protocol.pause_writing() failed',
+ 'exception': exc,
+ 'transport': self,
+ 'protocol': self._protocol,
+ })
def _maybe_resume_protocol(self):
if (self._protocol_paused and
@@ -112,8 +122,13 @@ class _ProactorBasePipeTransport(transports.BaseTransport):
self._protocol_paused = False
try:
self._protocol.resume_writing()
- except Exception:
- logger.exception('resume_writing() failed')
+ except Exception as exc:
+ self._loop.call_exception_handler({
+ 'message': 'protocol.resume_writing() failed',
+ 'exception': exc,
+ 'transport': self,
+ 'protocol': self._protocol,
+ })
def set_write_buffer_limits(self, high=None, low=None):
if high is None:
@@ -465,9 +480,13 @@ class BaseProactorEventLoop(base_events.BaseEventLoop):
conn, protocol,
extra={'peername': addr}, server=server)
f = self._proactor.accept(sock)
- except OSError:
+ except OSError as exc:
if sock.fileno() != -1:
- logger.exception('Accept failed')
+ self.call_exception_handler({
+ 'message': 'Accept failed',
+ 'exception': exc,
+ 'socket': sock,
+ })
sock.close()
except futures.CancelledError:
sock.close()
diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py
index 10b0257..fb86f82 100644
--- a/Lib/asyncio/selector_events.py
+++ b/Lib/asyncio/selector_events.py
@@ -112,7 +112,11 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
# Some platforms (e.g. Linux keep reporting the FD as
# ready, so we remove the read handler temporarily.
# We'll try again in a while.
- logger.exception('Accept out of system resource (%s)', exc)
+ self.call_exception_handler({
+ 'message': 'socket.accept() out of system resource',
+ 'exception': exc,
+ 'socket': sock,
+ })
self.remove_reader(sock.fileno())
self.call_later(constants.ACCEPT_RETRY_DELAY,
self._start_serving,
@@ -132,7 +136,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
def add_reader(self, fd, callback, *args):
"""Add a reader callback."""
- handle = events.Handle(callback, args)
+ handle = events.Handle(callback, args, self)
try:
key = self._selector.get_key(fd)
except KeyError:
@@ -167,7 +171,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
def add_writer(self, fd, callback, *args):
"""Add a writer callback.."""
- handle = events.Handle(callback, args)
+ handle = events.Handle(callback, args, self)
try:
key = self._selector.get_key(fd)
except KeyError:
@@ -364,8 +368,13 @@ class _FlowControlMixin(transports.Transport):
self._protocol_paused = True
try:
self._protocol.pause_writing()
- except Exception:
- logger.exception('pause_writing() failed')
+ except Exception as exc:
+ self._loop.call_exception_handler({
+ 'message': 'protocol.pause_writing() failed',
+ 'exception': exc,
+ 'transport': self,
+ 'protocol': self._protocol,
+ })
def _maybe_resume_protocol(self):
if (self._protocol_paused and
@@ -373,8 +382,13 @@ class _FlowControlMixin(transports.Transport):
self._protocol_paused = False
try:
self._protocol.resume_writing()
- except Exception:
- logger.exception('resume_writing() failed')
+ except Exception as exc:
+ self._loop.call_exception_handler({
+ 'message': 'protocol.resume_writing() failed',
+ 'exception': exc,
+ 'transport': self,
+ 'protocol': self._protocol,
+ })
def set_write_buffer_limits(self, high=None, low=None):
if high is None:
@@ -435,7 +449,12 @@ class _SelectorTransport(_FlowControlMixin, transports.Transport):
def _fatal_error(self, exc):
# Should be called from exception handler only.
if not isinstance(exc, (BrokenPipeError, ConnectionResetError)):
- logger.exception('Fatal error for %s', self)
+ self._loop.call_exception_handler({
+ 'message': 'Fatal transport error',
+ 'exception': exc,
+ 'transport': self,
+ 'protocol': self._protocol,
+ })
self._force_close(exc)
def _force_close(self, exc):
diff --git a/Lib/asyncio/test_utils.py b/Lib/asyncio/test_utils.py
index de2916b..28e5243 100644
--- a/Lib/asyncio/test_utils.py
+++ b/Lib/asyncio/test_utils.py
@@ -4,6 +4,7 @@ import collections
import contextlib
import io
import os
+import re
import socket
import socketserver
import sys
@@ -301,7 +302,7 @@ class TestLoop(base_events.BaseEventLoop):
raise AssertionError("Time generator is not finished")
def add_reader(self, fd, callback, *args):
- self.readers[fd] = events.Handle(callback, args)
+ self.readers[fd] = events.Handle(callback, args, self)
def remove_reader(self, fd):
self.remove_reader_count[fd] += 1
@@ -320,7 +321,7 @@ class TestLoop(base_events.BaseEventLoop):
handle._args, args)
def add_writer(self, fd, callback, *args):
- self.writers[fd] = events.Handle(callback, args)
+ self.writers[fd] = events.Handle(callback, args, self)
def remove_writer(self, fd):
self.remove_writer_count[fd] += 1
@@ -362,3 +363,16 @@ class TestLoop(base_events.BaseEventLoop):
def MockCallback(**kwargs):
return unittest.mock.Mock(spec=['__call__'], **kwargs)
+
+
+class MockPattern(str):
+ """A regex based str with a fuzzy __eq__.
+
+ Use this helper with 'mock.assert_called_with', or anywhere
+ where a regexp comparison between strings is needed.
+
+ For instance:
+ mock_call.assert_called_with(MockPattern('spam.*ham'))
+ """
+ def __eq__(self, other):
+ return bool(re.search(str(self), other, re.S))
diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py
index e0d7507..9a40c04 100644
--- a/Lib/asyncio/unix_events.py
+++ b/Lib/asyncio/unix_events.py
@@ -65,7 +65,7 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
except ValueError as exc:
raise RuntimeError(str(exc))
- handle = events.Handle(callback, args)
+ handle = events.Handle(callback, args, self)
self._signal_handlers[sig] = handle
try:
@@ -294,7 +294,12 @@ class _UnixReadPipeTransport(transports.ReadTransport):
def _fatal_error(self, exc):
# should be called by exception handler only
if not (isinstance(exc, OSError) and exc.errno == errno.EIO):
- logger.exception('Fatal error for %s', self)
+ self._loop.call_exception_handler({
+ 'message': 'Fatal transport error',
+ 'exception': exc,
+ 'transport': self,
+ 'protocol': self._protocol,
+ })
self._close(exc)
def _close(self, exc):
@@ -441,7 +446,12 @@ class _UnixWritePipeTransport(selector_events._FlowControlMixin,
def _fatal_error(self, exc):
# should be called by exception handler only
if not isinstance(exc, (BrokenPipeError, ConnectionResetError)):
- logger.exception('Fatal error for %s', self)
+ self._loop.call_exception_handler({
+ 'message': 'Fatal transport error',
+ 'exception': exc,
+ 'transport': self,
+ 'protocol': self._protocol,
+ })
self._close(exc)
def _close(self, exc=None):
@@ -582,8 +592,14 @@ class BaseChildWatcher(AbstractChildWatcher):
def _sig_chld(self):
try:
self._do_waitpid_all()
- except Exception:
- logger.exception('Unknown exception in SIGCHLD handler')
+ except Exception as exc:
+ # self._loop should always be available here
+ # as '_sig_chld' is added as a signal handler
+ # in 'attach_loop'
+ self._loop.call_exception_handler({
+ 'message': 'Unknown exception in SIGCHLD handler',
+ 'exception': exc,
+ })
def _compute_returncode(self, status):
if os.WIFSIGNALED(status):
diff --git a/Lib/asyncio/windows_events.py b/Lib/asyncio/windows_events.py
index 0a2d981..c667a1c 100644
--- a/Lib/asyncio/windows_events.py
+++ b/Lib/asyncio/windows_events.py
@@ -156,9 +156,13 @@ class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
if pipe is None:
return
f = self._proactor.accept_pipe(pipe)
- except OSError:
+ except OSError as exc:
if pipe and pipe.fileno() != -1:
- logger.exception('Pipe accept failed')
+ self.call_exception_handler({
+ 'message': 'Pipe accept failed',
+ 'exception': exc,
+ 'pipe': pipe,
+ })
pipe.close()
except futures.CancelledError:
if pipe: