summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVictor Stinner <victor.stinner@gmail.com>2014-07-12 01:11:53 (GMT)
committerVictor Stinner <victor.stinner@gmail.com>2014-07-12 01:11:53 (GMT)
commite912e652f85bfd92d7209aa0cb23e5d3975a8d72 (patch)
tree9bd5327f26baf131749e46842ea038690914e369
parent8ebeb03740dad4d9edd65de88f82840a05070941 (diff)
downloadcpython-e912e652f85bfd92d7209aa0cb23e5d3975a8d72.zip
cpython-e912e652f85bfd92d7209aa0cb23e5d3975a8d72.tar.gz
cpython-e912e652f85bfd92d7209aa0cb23e5d3975a8d72.tar.bz2
asyncio: sync with Tulip
* Tulip issue #183: log socket events in debug mode - Log most important socket events: socket connected, new client, connection reset or closed by peer (EOF), etc. - Log time elapsed in DNS resolution (getaddrinfo) - Log pause/resume reading - Log time of SSL handshake - Log SSL handshake errors - Add a __repr__() method to many classes * Fix ProactorEventLoop() in debug mode. ProactorEventLoop._make_self_pipe() doesn't call call_soon() directly because it checks for the current loop which fails, because the method is called to build the event loop. * Cleanup _ProactorReadPipeTransport constructor. Not need to set again _read_fut attribute to None, it is already done in the base class.
-rw-r--r--Lib/asyncio/base_events.py56
-rw-r--r--Lib/asyncio/proactor_events.py36
-rw-r--r--Lib/asyncio/selector_events.py89
-rw-r--r--Lib/asyncio/unix_events.py36
-rw-r--r--Lib/asyncio/windows_events.py12
-rw-r--r--Lib/test/test_asyncio/test_proactor_events.py7
-rw-r--r--Lib/test/test_asyncio/test_selector_events.py12
7 files changed, 219 insertions, 29 deletions
diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py
index cab4462..e5683fd 100644
--- a/Lib/asyncio/base_events.py
+++ b/Lib/asyncio/base_events.py
@@ -94,6 +94,9 @@ class Server(events.AbstractServer):
self._active_count = 0
self._waiters = []
+ def __repr__(self):
+ return '<%s sockets=%r>' % (self.__class__.__name__, self.sockets)
+
def _attach(self):
assert self.sockets is not None
self._active_count += 1
@@ -110,8 +113,6 @@ class Server(events.AbstractServer):
return
self.sockets = None
for sock in sockets:
- # closing sockets will call asynchronously the _detach() method
- # which calls _wakeup() for the last socket
self._loop._stop_serving(sock)
if self._active_count == 0:
self._wakeup()
@@ -276,6 +277,8 @@ class BaseEventLoop(events.AbstractEventLoop):
raise RuntimeError("cannot close a running event loop")
if self._closed:
return
+ if self._debug:
+ logger.debug("Close %r", self)
self._closed = True
self._ready.clear()
self._scheduled.clear()
@@ -402,10 +405,39 @@ class BaseEventLoop(events.AbstractEventLoop):
def set_default_executor(self, executor):
self._default_executor = executor
+ def _getaddrinfo_debug(self, host, port, family, type, proto, flags):
+ msg = ["%s:%r" % (host, port)]
+ if family:
+ msg.append('family=%r' % family)
+ if type:
+ msg.append('type=%r' % type)
+ if proto:
+ msg.append('proto=%r' % proto)
+ if flags:
+ msg.append('flags=%r' % flags)
+ msg = ', '.join(msg)
+ logger.debug('Get addresss info %s', msg)
+
+ t0 = self.time()
+ addrinfo = socket.getaddrinfo(host, port, family, type, proto, flags)
+ dt = self.time() - t0
+
+ msg = ('Getting addresss info %s took %.3f ms: %r'
+ % (msg, dt * 1e3, addrinfo))
+ if dt >= self.slow_callback_duration:
+ logger.info(msg)
+ else:
+ logger.debug(msg)
+ return addrinfo
+
def getaddrinfo(self, host, port, *,
family=0, type=0, proto=0, flags=0):
- return self.run_in_executor(None, socket.getaddrinfo,
- host, port, family, type, proto, flags)
+ if self._debug:
+ return self.run_in_executor(None, self._getaddrinfo_debug,
+ host, port, family, type, proto, flags)
+ else:
+ return self.run_in_executor(None, socket.getaddrinfo,
+ host, port, family, type, proto, flags)
def getnameinfo(self, sockaddr, flags=0):
return self.run_in_executor(None, socket.getnameinfo, sockaddr, flags)
@@ -492,6 +524,8 @@ class BaseEventLoop(events.AbstractEventLoop):
sock.close()
sock = None
continue
+ if self._debug:
+ logger.debug("connect %r to %r", sock, address)
yield from self.sock_connect(sock, address)
except OSError as exc:
if sock is not None:
@@ -524,6 +558,9 @@ class BaseEventLoop(events.AbstractEventLoop):
transport, protocol = yield from self._create_connection_transport(
sock, protocol_factory, ssl, server_hostname)
+ if self._debug:
+ logger.debug("connected to %s:%r: (%r, %r)",
+ host, port, transport, protocol)
return transport, protocol
@coroutine
@@ -614,6 +651,15 @@ class BaseEventLoop(events.AbstractEventLoop):
waiter = futures.Future(loop=self)
transport = self._make_datagram_transport(sock, protocol, r_addr,
waiter)
+ if self._debug:
+ if local_addr:
+ logger.info("Datagram endpoint local_addr=%r remote_addr=%r "
+ "created: (%r, %r)",
+ local_addr, remote_addr, transport, protocol)
+ else:
+ logger.debug("Datagram endpoint remote_addr=%r created: "
+ "(%r, %r)",
+ remote_addr, transport, protocol)
yield from waiter
return transport, protocol
@@ -694,6 +740,8 @@ class BaseEventLoop(events.AbstractEventLoop):
sock.listen(backlog)
sock.setblocking(False)
self._start_serving(protocol_factory, sock, ssl, server)
+ if self._debug:
+ logger.info("%r is serving", server)
return server
@coroutine
diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py
index d0b601d..d09e9faa1 100644
--- a/Lib/asyncio/proactor_events.py
+++ b/Lib/asyncio/proactor_events.py
@@ -41,6 +41,23 @@ class _ProactorBasePipeTransport(transports._FlowControlMixin,
# wait until protocol.connection_made() has been called
self._loop.call_soon(waiter._set_result_unless_cancelled, None)
+ def __repr__(self):
+ info = [self.__class__.__name__, 'fd=%s' % self._sock.fileno()]
+ if self._read_fut is not None:
+ ov = "pending" if self._read_fut.ov.pending else "completed"
+ info.append('read=%s' % ov)
+ if self._write_fut is not None:
+ if self._write_fut.ov.pending:
+ info.append("write=pending=%s" % self._pending_write)
+ else:
+ info.append("write=completed")
+ if self._buffer:
+ bufsize = len(self._buffer)
+ info.append('write_bufsize=%s' % bufsize)
+ if self._eof_written:
+ info.append('EOF written')
+ return '<%s>' % ' '.join(info)
+
def _set_extra(self, sock):
self._extra['pipe'] = sock
@@ -55,7 +72,10 @@ class _ProactorBasePipeTransport(transports._FlowControlMixin,
self._read_fut.cancel()
def _fatal_error(self, exc, message='Fatal error on pipe transport'):
- if not isinstance(exc, (BrokenPipeError, ConnectionResetError)):
+ if isinstance(exc, (BrokenPipeError, ConnectionResetError)):
+ if self._loop.get_debug():
+ logger.debug("%r: %s", self, message, exc_info=True)
+ else:
self._loop.call_exception_handler({
'message': message,
'exception': exc,
@@ -108,7 +128,6 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
def __init__(self, loop, sock, protocol, waiter=None,
extra=None, server=None):
super().__init__(loop, sock, protocol, waiter, extra, server)
- self._read_fut = None
self._paused = False
self._loop.call_soon(self._loop_reading)
@@ -118,6 +137,8 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
if self._paused:
raise RuntimeError('Already paused')
self._paused = True
+ if self._loop.get_debug():
+ logger.debug("%r pauses reading", self)
def resume_reading(self):
if not self._paused:
@@ -126,6 +147,8 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
if self._closing:
return
self._loop.call_soon(self._loop_reading, self._read_fut)
+ if self._loop.get_debug():
+ logger.debug("%r resumes reading", self)
def _loop_reading(self, fut=None):
if self._paused:
@@ -166,6 +189,8 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
if data:
self._protocol.data_received(data)
elif data is not None:
+ if self._loop.get_debug():
+ logger.debug("%r received EOF", self)
keep_open = self._protocol.eof_received()
if not keep_open:
self.close()
@@ -401,7 +426,9 @@ class BaseProactorEventLoop(base_events.BaseEventLoop):
self._ssock.setblocking(False)
self._csock.setblocking(False)
self._internal_fds += 1
- self.call_soon(self._loop_self_reading)
+ # don't check the current loop because _make_self_pipe() is called
+ # from the event loop constructor
+ self._call_soon(self._loop_self_reading, (), check_loop=False)
def _loop_self_reading(self, f=None):
try:
@@ -426,6 +453,9 @@ class BaseProactorEventLoop(base_events.BaseEventLoop):
try:
if f is not None:
conn, addr = f.result()
+ if self._debug:
+ logger.debug("%r got a new connection from %r: %r",
+ server, addr, conn)
protocol = protocol_factory()
self._make_socket_transport(
conn, protocol,
diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py
index b965046..d79c080 100644
--- a/Lib/asyncio/selector_events.py
+++ b/Lib/asyncio/selector_events.py
@@ -23,6 +23,17 @@ from . import transports
from .log import logger
+def _test_selector_event(selector, fd, event):
+ # Test if the selector is monitoring 'event' events
+ # for the file descriptor 'fd'.
+ try:
+ key = selector.get_key(fd)
+ except KeyError:
+ return False
+ else:
+ return bool(key.events & event)
+
+
class BaseSelectorEventLoop(base_events.BaseEventLoop):
"""Selector event loop.
@@ -116,6 +127,9 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
sslcontext=None, server=None):
try:
conn, addr = sock.accept()
+ if self._debug:
+ logger.debug("%r got a new connection from %r: %r",
+ server, addr, conn)
conn.setblocking(False)
except (BlockingIOError, InterruptedError, ConnectionAbortedError):
pass # False alarm.
@@ -419,6 +433,26 @@ class _SelectorTransport(transports._FlowControlMixin,
if self._server is not None:
self._server._attach()
+ def __repr__(self):
+ info = [self.__class__.__name__, 'fd=%s' % self._sock_fd]
+ polling = _test_selector_event(self._loop._selector,
+ self._sock_fd, selectors.EVENT_READ)
+ if polling:
+ info.append('read=polling')
+ else:
+ info.append('read=idle')
+
+ polling = _test_selector_event(self._loop._selector,
+ self._sock_fd, selectors.EVENT_WRITE)
+ if polling:
+ state = 'polling'
+ else:
+ state = 'idle'
+
+ bufsize = self.get_write_buffer_size()
+ info.append('write=<%s, bufsize=%s>' % (state, bufsize))
+ return '<%s>' % ' '.join(info)
+
def abort(self):
self._force_close(None)
@@ -433,7 +467,10 @@ class _SelectorTransport(transports._FlowControlMixin,
def _fatal_error(self, exc, message='Fatal error on transport'):
# Should be called from exception handler only.
- if not isinstance(exc, (BrokenPipeError, ConnectionResetError)):
+ if isinstance(exc, (BrokenPipeError, ConnectionResetError)):
+ if self._loop.get_debug():
+ logger.debug("%r: %s", self, message, exc_info=True)
+ else:
self._loop.call_exception_handler({
'message': message,
'exception': exc,
@@ -492,6 +529,8 @@ class _SelectorSocketTransport(_SelectorTransport):
raise RuntimeError('Already paused')
self._paused = True
self._loop.remove_reader(self._sock_fd)
+ if self._loop.get_debug():
+ logger.debug("%r pauses reading", self)
def resume_reading(self):
if not self._paused:
@@ -500,6 +539,8 @@ class _SelectorSocketTransport(_SelectorTransport):
if self._closing:
return
self._loop.add_reader(self._sock_fd, self._read_ready)
+ if self._loop.get_debug():
+ logger.debug("%r resumes reading", self)
def _read_ready(self):
try:
@@ -512,6 +553,8 @@ class _SelectorSocketTransport(_SelectorTransport):
if data:
self._protocol.data_received(data)
else:
+ if self._loop.get_debug():
+ logger.debug("%r received EOF", self)
keep_open = self._protocol.eof_received()
if keep_open:
# We're keeping the connection open so the
@@ -638,31 +681,37 @@ class _SelectorSslTransport(_SelectorTransport):
# SSL-specific extra info. (peercert is set later)
self._extra.update(sslcontext=sslcontext)
- self._on_handshake()
+ if self._loop.get_debug():
+ logger.debug("%r starts SSL handshake", self)
+ start_time = self._loop.time()
+ else:
+ start_time = None
+ self._on_handshake(start_time)
- def _on_handshake(self):
+ def _on_handshake(self, start_time):
try:
self._sock.do_handshake()
except ssl.SSLWantReadError:
- self._loop.add_reader(self._sock_fd, self._on_handshake)
+ self._loop.add_reader(self._sock_fd,
+ self._on_handshake, start_time)
return
except ssl.SSLWantWriteError:
- self._loop.add_writer(self._sock_fd, self._on_handshake)
- return
- except Exception as exc:
- self._loop.remove_reader(self._sock_fd)
- self._loop.remove_writer(self._sock_fd)
- self._sock.close()
- if self._waiter is not None:
- self._waiter.set_exception(exc)
+ self._loop.add_writer(self._sock_fd,
+ self._on_handshake, start_time)
return
except BaseException as exc:
+ if self._loop.get_debug():
+ logger.warning("%r: SSL handshake failed",
+ self, exc_info=True)
self._loop.remove_reader(self._sock_fd)
self._loop.remove_writer(self._sock_fd)
self._sock.close()
if self._waiter is not None:
self._waiter.set_exception(exc)
- raise
+ if isinstance(exc, Exception):
+ return
+ else:
+ raise
self._loop.remove_reader(self._sock_fd)
self._loop.remove_writer(self._sock_fd)
@@ -676,6 +725,10 @@ class _SelectorSslTransport(_SelectorTransport):
try:
ssl.match_hostname(peercert, self._server_hostname)
except Exception as exc:
+ if self._loop.get_debug():
+ logger.warning("%r: SSL handshake failed "
+ "on matching the hostname",
+ self, exc_info=True)
self._sock.close()
if self._waiter is not None:
self._waiter.set_exception(exc)
@@ -696,6 +749,10 @@ class _SelectorSslTransport(_SelectorTransport):
self._loop.call_soon(self._waiter._set_result_unless_cancelled,
None)
+ if self._loop.get_debug():
+ dt = self._loop.time() - start_time
+ logger.debug("%r: SSL handshake took %.1f ms", self, dt * 1e3)
+
def pause_reading(self):
# XXX This is a bit icky, given the comment at the top of
# _read_ready(). Is it possible to evoke a deadlock? I don't
@@ -709,6 +766,8 @@ class _SelectorSslTransport(_SelectorTransport):
raise RuntimeError('Already paused')
self._paused = True
self._loop.remove_reader(self._sock_fd)
+ if self._loop.get_debug():
+ logger.debug("%r pauses reading", self)
def resume_reading(self):
if not self._paused:
@@ -717,6 +776,8 @@ class _SelectorSslTransport(_SelectorTransport):
if self._closing:
return
self._loop.add_reader(self._sock_fd, self._read_ready)
+ if self._loop.get_debug():
+ logger.debug("%r resumes reading", self)
def _read_ready(self):
if self._write_wants_read:
@@ -741,6 +802,8 @@ class _SelectorSslTransport(_SelectorTransport):
self._protocol.data_received(data)
else:
try:
+ if self._loop.get_debug():
+ logger.debug("%r received EOF", self)
keep_open = self._protocol.eof_received()
if keep_open:
logger.warning('returning true from eof_received() '
diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py
index 764e719..09b875c 100644
--- a/Lib/asyncio/unix_events.py
+++ b/Lib/asyncio/unix_events.py
@@ -16,6 +16,7 @@ from . import base_subprocess
from . import constants
from . import events
from . import selector_events
+from . import selectors
from . import transports
from .coroutines import coroutine
from .log import logger
@@ -272,6 +273,20 @@ class _UnixReadPipeTransport(transports.ReadTransport):
# wait until protocol.connection_made() has been called
self._loop.call_soon(waiter._set_result_unless_cancelled, None)
+ def __repr__(self):
+ info = [self.__class__.__name__, 'fd=%s' % self._fileno]
+ if self._pipe is not None:
+ polling = selector_events._test_selector_event(
+ self._loop._selector,
+ self._fileno, selectors.EVENT_READ)
+ if polling:
+ info.append('polling')
+ else:
+ info.append('idle')
+ else:
+ info.append('closed')
+ return '<%s>' % ' '.join(info)
+
def _read_ready(self):
try:
data = os.read(self._fileno, self.max_size)
@@ -283,6 +298,8 @@ class _UnixReadPipeTransport(transports.ReadTransport):
if data:
self._protocol.data_received(data)
else:
+ if self._loop.get_debug():
+ logger.info("%r was closed by peer", self)
self._closing = True
self._loop.remove_reader(self._fileno)
self._loop.call_soon(self._protocol.eof_received)
@@ -357,11 +374,30 @@ class _UnixWritePipeTransport(transports._FlowControlMixin,
# wait until protocol.connection_made() has been called
self._loop.call_soon(waiter._set_result_unless_cancelled, None)
+ def __repr__(self):
+ info = [self.__class__.__name__, 'fd=%s' % self._fileno]
+ if self._pipe is not None:
+ polling = selector_events._test_selector_event(
+ self._loop._selector,
+ self._fileno, selectors.EVENT_WRITE)
+ if polling:
+ info.append('polling')
+ else:
+ info.append('idle')
+
+ bufsize = self.get_write_buffer_size()
+ info.append('bufsize=%s' % bufsize)
+ else:
+ info.append('closed')
+ return '<%s>' % ' '.join(info)
+
def get_write_buffer_size(self):
return sum(len(data) for data in self._buffer)
def _read_ready(self):
# Pipe was closed by peer.
+ if self._loop.get_debug():
+ logger.info("%r was closed by peer", self)
if self._buffer:
self._close(BrokenPipeError())
else:
diff --git a/Lib/asyncio/windows_events.py b/Lib/asyncio/windows_events.py
index 93b71b2..9d86c96 100644
--- a/Lib/asyncio/windows_events.py
+++ b/Lib/asyncio/windows_events.py
@@ -40,6 +40,18 @@ class _OverlappedFuture(futures.Future):
super().__init__(loop=loop)
self.ov = ov
+ def __repr__(self):
+ info = [self._state.lower()]
+ if self.ov.pending:
+ info.append('overlapped=pending')
+ else:
+ info.append('overlapped=completed')
+ if self._state == futures._FINISHED:
+ info.append(self._format_result())
+ if self._callbacks:
+ info.append(self._format_callbacks())
+ return '<%s %s>' % (self.__class__.__name__, ' '.join(info))
+
def cancel(self):
try:
self.ov.cancel()
diff --git a/Lib/test/test_asyncio/test_proactor_events.py b/Lib/test/test_asyncio/test_proactor_events.py
index ddfceae..4bb4f0b 100644
--- a/Lib/test/test_asyncio/test_proactor_events.py
+++ b/Lib/test/test_asyncio/test_proactor_events.py
@@ -358,16 +358,17 @@ class BaseProactorEventLoopTests(test_utils.TestCase):
self.loop = EventLoop(self.proactor)
self.set_event_loop(self.loop, cleanup=False)
- @mock.patch.object(BaseProactorEventLoop, 'call_soon')
+ @mock.patch.object(BaseProactorEventLoop, '_call_soon')
@mock.patch.object(BaseProactorEventLoop, '_socketpair')
- def test_ctor(self, socketpair, call_soon):
+ def test_ctor(self, socketpair, _call_soon):
ssock, csock = socketpair.return_value = (
mock.Mock(), mock.Mock())
loop = BaseProactorEventLoop(self.proactor)
self.assertIs(loop._ssock, ssock)
self.assertIs(loop._csock, csock)
self.assertEqual(loop._internal_fds, 1)
- call_soon.assert_called_with(loop._loop_self_reading)
+ _call_soon.assert_called_with(loop._loop_self_reading, (),
+ check_loop=False)
def test_close_self_pipe(self):
self.loop._close_self_pipe()
diff --git a/Lib/test/test_asyncio/test_selector_events.py b/Lib/test/test_asyncio/test_selector_events.py
index 35efab9..5186931 100644
--- a/Lib/test/test_asyncio/test_selector_events.py
+++ b/Lib/test/test_asyncio/test_selector_events.py
@@ -1092,15 +1092,15 @@ class SelectorSslTransportTests(test_utils.TestCase):
self.sslsock.do_handshake.side_effect = ssl.SSLWantReadError
transport = _SelectorSslTransport(
self.loop, self.sock, self.protocol, self.sslcontext)
- transport._on_handshake()
- self.loop.assert_reader(1, transport._on_handshake)
+ transport._on_handshake(None)
+ self.loop.assert_reader(1, transport._on_handshake, None)
def test_on_handshake_writer_retry(self):
self.sslsock.do_handshake.side_effect = ssl.SSLWantWriteError
transport = _SelectorSslTransport(
self.loop, self.sock, self.protocol, self.sslcontext)
- transport._on_handshake()
- self.loop.assert_writer(1, transport._on_handshake)
+ transport._on_handshake(None)
+ self.loop.assert_writer(1, transport._on_handshake, None)
def test_on_handshake_exc(self):
exc = ValueError()
@@ -1108,7 +1108,7 @@ class SelectorSslTransportTests(test_utils.TestCase):
transport = _SelectorSslTransport(
self.loop, self.sock, self.protocol, self.sslcontext)
transport._waiter = asyncio.Future(loop=self.loop)
- transport._on_handshake()
+ transport._on_handshake(None)
self.assertTrue(self.sslsock.close.called)
self.assertTrue(transport._waiter.done())
self.assertIs(exc, transport._waiter.exception())
@@ -1119,7 +1119,7 @@ class SelectorSslTransportTests(test_utils.TestCase):
transport._waiter = asyncio.Future(loop=self.loop)
exc = BaseException()
self.sslsock.do_handshake.side_effect = exc
- self.assertRaises(BaseException, transport._on_handshake)
+ self.assertRaises(BaseException, transport._on_handshake, None)
self.assertTrue(self.sslsock.close.called)
self.assertTrue(transport._waiter.done())
self.assertIs(exc, transport._waiter.exception())