diff options
author | Victor Stinner <victor.stinner@gmail.com> | 2014-08-25 21:22:54 (GMT) |
---|---|---|
committer | Victor Stinner <victor.stinner@gmail.com> | 2014-08-25 21:22:54 (GMT) |
commit | 83b9ea4942692efcb8d3eeab200c62b6a98208fb (patch) | |
tree | 43536d3ed3243be929f36f76a2db88656ef753ac | |
parent | 3597befd68b3307d58765863544effcff75386a9 (diff) | |
parent | b261475a48d905f160bc1f499e90b995b0d0b6c0 (diff) | |
download | cpython-83b9ea4942692efcb8d3eeab200c62b6a98208fb.zip cpython-83b9ea4942692efcb8d3eeab200c62b6a98208fb.tar.gz cpython-83b9ea4942692efcb8d3eeab200c62b6a98208fb.tar.bz2 |
(Merge 3.4) asyncio: sync with Tulip
* PipeServer.close() now cancels the "accept pipe" future which cancels the
overlapped operation.
* Fix _SelectorTransport.__repr__() if the transport was closed
* Fix debug log in BaseEventLoop.create_connection(): get the socket object
from the transport because SSL transport closes the old socket and creates a
new SSL socket object. Remove also the _SelectorSslTransport._rawsock
attribute: it contained the closed socket (not very useful) and it was not
used.
* Issue #22063: socket operations (sock_recv, sock_sendall, sock_connect,
sock_accept) of the proactor event loop don't raise an exception in debug
mode if the socket are in blocking mode. Overlapped operations also work on
blocking sockets.
* Fix unit tests in debug mode: mock a non-blocking socket for socket
operations which now raise an exception if the socket is blocking.
* _fatal_error() method of _UnixReadPipeTransport and _UnixWritePipeTransport
now log all exceptions in debug mode
* Don't log expected errors in unit tests
* Tulip issue 200: _WaitHandleFuture._unregister_wait() now catchs and logs
exceptions.
* Tulip issue 200: Log errors in debug mode instead of simply ignoring them.
-rw-r--r-- | Lib/asyncio/base_events.py | 7 | ||||
-rw-r--r-- | Lib/asyncio/proactor_events.py | 24 | ||||
-rw-r--r-- | Lib/asyncio/selector_events.py | 31 | ||||
-rw-r--r-- | Lib/asyncio/test_utils.py | 6 | ||||
-rw-r--r-- | Lib/asyncio/unix_events.py | 14 | ||||
-rw-r--r-- | Lib/asyncio/windows_events.py | 31 | ||||
-rw-r--r-- | Lib/test/test_asyncio/test_base_events.py | 3 | ||||
-rw-r--r-- | Lib/test/test_asyncio/test_events.py | 36 | ||||
-rw-r--r-- | Lib/test/test_asyncio/test_selector_events.py | 24 | ||||
-rw-r--r-- | Lib/test/test_asyncio/test_subprocess.py | 8 |
10 files changed, 120 insertions, 64 deletions
diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index d0a337b..db13250 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -578,6 +578,9 @@ class BaseEventLoop(events.AbstractEventLoop): transport, protocol = yield from self._create_connection_transport( sock, protocol_factory, ssl, server_hostname) if self._debug: + # Get the socket from the transport because SSL transport closes + # the old socket and creates a new SSL socket + sock = transport.get_extra_info('socket') logger.debug("%r connected to %s:%r: (%r, %r)", sock, host, port, transport, protocol) return transport, protocol @@ -725,6 +728,10 @@ class BaseEventLoop(events.AbstractEventLoop): sock = socket.socket(af, socktype, proto) except socket.error: # Assume it's a bad family/type/protocol combination. + if self._debug: + logger.warning('create_server() failed to create ' + 'socket.socket(%r, %r, %r)', + af, socktype, proto, exc_info=True) continue sockets.append(sock) if reuse_address: diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py index 751155b..0ad0656 100644 --- a/Lib/asyncio/proactor_events.py +++ b/Lib/asyncio/proactor_events.py @@ -172,6 +172,9 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport, except ConnectionAbortedError as exc: if not self._closing: self._fatal_error(exc, 'Fatal read error on pipe transport') + elif self._loop.get_debug(): + logger.debug("Read error on pipe transport while closing", + exc_info=True) except ConnectionResetError as exc: self._force_close(exc) except OSError as exc: @@ -324,12 +327,16 @@ class _ProactorSocketTransport(_ProactorReadPipeTransport, try: self._extra['sockname'] = sock.getsockname() except (socket.error, AttributeError): - pass + if self._loop.get_debug(): + logger.warning("getsockname() failed on %r", + sock, exc_info=True) if 'peername' not in self._extra: try: self._extra['peername'] = sock.getpeername() except (socket.error, AttributeError): - pass + if self._loop.get_debug(): + logger.warning("getpeername() failed on %r", + sock, exc_info=True) def can_write_eof(self): return True @@ -385,18 +392,12 @@ class BaseProactorEventLoop(base_events.BaseEventLoop): self._selector = None def sock_recv(self, sock, n): - if self.get_debug() and sock.gettimeout() != 0: - raise ValueError("the socket must be non-blocking") return self._proactor.recv(sock, n) def sock_sendall(self, sock, data): - if self.get_debug() and sock.gettimeout() != 0: - raise ValueError("the socket must be non-blocking") return self._proactor.send(sock, data) def sock_connect(self, sock, address): - if self.get_debug() and sock.gettimeout() != 0: - raise ValueError("the socket must be non-blocking") try: base_events._check_resolved_address(sock, address) except ValueError as err: @@ -407,8 +408,6 @@ class BaseProactorEventLoop(base_events.BaseEventLoop): return self._proactor.connect(sock, address) def sock_accept(self, sock): - if self.get_debug() and sock.gettimeout() != 0: - raise ValueError("the socket must be non-blocking") return self._proactor.accept(sock) def _socketpair(self): @@ -470,11 +469,14 @@ class BaseProactorEventLoop(base_events.BaseEventLoop): except OSError as exc: if sock.fileno() != -1: self.call_exception_handler({ - 'message': 'Accept failed', + 'message': 'Accept failed on a socket', 'exception': exc, 'socket': sock, }) sock.close() + elif self._debug: + logger.debug("Accept failed on socket %r", + sock, exc_info=True) except futures.CancelledError: sock.close() else: diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py index 6b7bdf0..0434a70 100644 --- a/Lib/asyncio/selector_events.py +++ b/Lib/asyncio/selector_events.py @@ -450,22 +450,24 @@ class _SelectorTransport(transports._FlowControlMixin, def __repr__(self): info = [self.__class__.__name__, 'fd=%s' % self._sock_fd] - polling = _test_selector_event(self._loop._selector, - self._sock_fd, selectors.EVENT_READ) - if polling: - info.append('read=polling') - else: - info.append('read=idle') + # test if the transport was closed + if self._loop is not None: + polling = _test_selector_event(self._loop._selector, + self._sock_fd, selectors.EVENT_READ) + if polling: + info.append('read=polling') + else: + info.append('read=idle') - polling = _test_selector_event(self._loop._selector, - self._sock_fd, selectors.EVENT_WRITE) - if polling: - state = 'polling' - else: - state = 'idle' + polling = _test_selector_event(self._loop._selector, + self._sock_fd, selectors.EVENT_WRITE) + if polling: + state = 'polling' + else: + state = 'idle' - bufsize = self.get_write_buffer_size() - info.append('write=<%s, bufsize=%s>' % (state, bufsize)) + bufsize = self.get_write_buffer_size() + info.append('write=<%s, bufsize=%s>' % (state, bufsize)) return '<%s>' % ' '.join(info) def abort(self): @@ -689,7 +691,6 @@ class _SelectorSslTransport(_SelectorTransport): self._server_hostname = server_hostname self._waiter = waiter - self._rawsock = rawsock self._sslcontext = sslcontext self._paused = False diff --git a/Lib/asyncio/test_utils.py b/Lib/asyncio/test_utils.py index 840bbf9..ac7680d 100644 --- a/Lib/asyncio/test_utils.py +++ b/Lib/asyncio/test_utils.py @@ -417,3 +417,9 @@ def disable_logger(): yield finally: logger.setLevel(old_level) + +def mock_nonblocking_socket(): + """Create a mock of a non-blocking socket.""" + sock = mock.Mock(socket.socket) + sock.gettimeout.return_value = 0.0 + return sock diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py index 94a46d0..335a77d 100644 --- a/Lib/asyncio/unix_events.py +++ b/Lib/asyncio/unix_events.py @@ -336,7 +336,10 @@ class _UnixReadPipeTransport(transports.ReadTransport): def _fatal_error(self, exc, message='Fatal error on pipe transport'): # should be called by exception handler only - if not (isinstance(exc, OSError) and exc.errno == errno.EIO): + if (isinstance(exc, OSError) and exc.errno == errno.EIO): + if self._loop.get_debug(): + logger.debug("%r: %s", self, message, exc_info=True) + else: self._loop.call_exception_handler({ 'message': message, 'exception': exc, @@ -508,7 +511,10 @@ class _UnixWritePipeTransport(transports._FlowControlMixin, def _fatal_error(self, exc, message='Fatal error on pipe transport'): # should be called by exception handler only - if not isinstance(exc, (BrokenPipeError, ConnectionResetError)): + if isinstance(exc, (BrokenPipeError, ConnectionResetError)): + if self._loop.get_debug(): + logger.debug("%r: %s", self, message, exc_info=True) + else: self._loop.call_exception_handler({ 'message': message, 'exception': exc, @@ -749,7 +755,9 @@ class SafeChildWatcher(BaseChildWatcher): except KeyError: # pragma: no cover # May happen if .remove_child_handler() is called # after os.waitpid() returns. - pass + if self._loop.get_debug(): + logger.warning("Child watcher got an unexpected pid: %r", + pid, exc_info=True) else: callback(pid, returncode, *args) diff --git a/Lib/asyncio/windows_events.py b/Lib/asyncio/windows_events.py index ec427d5..6763f0b 100644 --- a/Lib/asyncio/windows_events.py +++ b/Lib/asyncio/windows_events.py @@ -111,10 +111,17 @@ class _WaitHandleFuture(futures.Future): return try: _overlapped.UnregisterWait(self._wait_handle) - except OSError as e: - if e.winerror != _overlapped.ERROR_IO_PENDING: - raise + except OSError as exc: # ERROR_IO_PENDING is not an error, the wait was unregistered + if exc.winerror != _overlapped.ERROR_IO_PENDING: + context = { + 'message': 'Failed to unregister the wait handle', + 'exception': exc, + 'future': self, + } + if self._source_traceback: + context['source_traceback'] = self._source_traceback + self._loop.call_exception_handler(context) self._wait_handle = None self._iocp = None self._ov = None @@ -145,6 +152,11 @@ class PipeServer(object): def __init__(self, address): self._address = address self._free_instances = weakref.WeakSet() + # initialize the pipe attribute before calling _server_pipe_handle() + # because this function can raise an exception and the destructor calls + # the close() method + self._pipe = None + self._accept_pipe_future = None self._pipe = self._server_pipe_handle(True) def _get_unconnected_pipe(self): @@ -174,6 +186,9 @@ class PipeServer(object): return pipe def close(self): + if self._accept_pipe_future is not None: + self._accept_pipe_future.cancel() + self._accept_pipe_future = None # Close all instances which have not been connected to by a client. if self._address is not None: for pipe in self._free_instances: @@ -216,7 +231,7 @@ class ProactorEventLoop(proactor_events.BaseProactorEventLoop): def start_serving_pipe(self, protocol_factory, address): server = PipeServer(address) - def loop(f=None): + def loop_accept_pipe(f=None): pipe = None try: if f: @@ -237,13 +252,17 @@ class ProactorEventLoop(proactor_events.BaseProactorEventLoop): 'pipe': pipe, }) pipe.close() + elif self._debug: + logger.warning("Accept pipe failed on pipe %r", + pipe, exc_info=True) except futures.CancelledError: if pipe: pipe.close() else: - f.add_done_callback(loop) + server._accept_pipe_future = f + f.add_done_callback(loop_accept_pipe) - self.call_soon(loop) + self.call_soon(loop_accept_pipe) return [server] @coroutine diff --git a/Lib/test/test_asyncio/test_base_events.py b/Lib/test/test_asyncio/test_base_events.py index 7bf07ed..ca12101 100644 --- a/Lib/test/test_asyncio/test_base_events.py +++ b/Lib/test/test_asyncio/test_base_events.py @@ -792,6 +792,9 @@ class BaseEventLoopWithSelectorTests(test_utils.TestCase): class _SelectorTransportMock: _sock = None + def get_extra_info(self, key): + return mock.Mock() + def close(self): self._sock.close() diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py index f6220f4..02cdbdc 100644 --- a/Lib/test/test_asyncio/test_events.py +++ b/Lib/test/test_asyncio/test_events.py @@ -27,6 +27,7 @@ from test import support # find_unused_port, IPV6_ENABLED, TEST_HOME_DIR import asyncio +from asyncio import proactor_events from asyncio import selector_events from asyncio import test_utils @@ -383,22 +384,23 @@ class EventLoopTestsMixin: self.assertEqual(read, data) def _basetest_sock_client_ops(self, httpd, sock): - # in debug mode, socket operations must fail - # if the socket is not in blocking mode - self.loop.set_debug(True) - sock.setblocking(True) - with self.assertRaises(ValueError): - self.loop.run_until_complete( - self.loop.sock_connect(sock, httpd.address)) - with self.assertRaises(ValueError): - self.loop.run_until_complete( - self.loop.sock_sendall(sock, b'GET / HTTP/1.0\r\n\r\n')) - with self.assertRaises(ValueError): - self.loop.run_until_complete( - self.loop.sock_recv(sock, 1024)) - with self.assertRaises(ValueError): - self.loop.run_until_complete( - self.loop.sock_accept(sock)) + if not isinstance(self.loop, proactor_events.BaseProactorEventLoop): + # in debug mode, socket operations must fail + # if the socket is not in blocking mode + self.loop.set_debug(True) + sock.setblocking(True) + with self.assertRaises(ValueError): + self.loop.run_until_complete( + self.loop.sock_connect(sock, httpd.address)) + with self.assertRaises(ValueError): + self.loop.run_until_complete( + self.loop.sock_sendall(sock, b'GET / HTTP/1.0\r\n\r\n')) + with self.assertRaises(ValueError): + self.loop.run_until_complete( + self.loop.sock_recv(sock, 1024)) + with self.assertRaises(ValueError): + self.loop.run_until_complete( + self.loop.sock_accept(sock)) # test in non-blocking mode sock.setblocking(False) @@ -1229,6 +1231,7 @@ class EventLoopTestsMixin: "Don't support pipes for Windows") def test_write_pipe_disconnect_on_close(self): rsock, wsock = test_utils.socketpair() + rsock.setblocking(False) pipeobj = io.open(wsock.detach(), 'wb', 1024) proto = MyWritePipeProto(loop=self.loop) @@ -1366,6 +1369,7 @@ class EventLoopTestsMixin: for sock_type in (socket.SOCK_STREAM, socket.SOCK_DGRAM): sock = socket.socket(family, sock_type) with sock: + sock.setblocking(False) connect = self.loop.sock_connect(sock, address) with self.assertRaises(ValueError) as cm: self.loop.run_until_complete(connect) diff --git a/Lib/test/test_asyncio/test_selector_events.py b/Lib/test/test_asyncio/test_selector_events.py index bd6c2f2..df6e991 100644 --- a/Lib/test/test_asyncio/test_selector_events.py +++ b/Lib/test/test_asyncio/test_selector_events.py @@ -58,8 +58,9 @@ class BaseSelectorEventLoopTests(test_utils.TestCase): self.loop.remove_reader = mock.Mock() self.loop.remove_writer = mock.Mock() waiter = asyncio.Future(loop=self.loop) - transport = self.loop._make_ssl_transport( - m, asyncio.Protocol(), m, waiter) + with test_utils.disable_logger(): + transport = self.loop._make_ssl_transport( + m, asyncio.Protocol(), m, waiter) self.assertIsInstance(transport, _SelectorSslTransport) @mock.patch('asyncio.selector_events.ssl', None) @@ -127,7 +128,8 @@ class BaseSelectorEventLoopTests(test_utils.TestCase): def test_write_to_self_tryagain(self): self.loop._csock.send.side_effect = BlockingIOError - self.assertIsNone(self.loop._write_to_self()) + with test_utils.disable_logger(): + self.assertIsNone(self.loop._write_to_self()) def test_write_to_self_exception(self): # _write_to_self() swallows OSError @@ -135,7 +137,7 @@ class BaseSelectorEventLoopTests(test_utils.TestCase): self.assertRaises(RuntimeError, self.loop._write_to_self) def test_sock_recv(self): - sock = mock.Mock() + sock = test_utils.mock_nonblocking_socket() self.loop._sock_recv = mock.Mock() f = self.loop.sock_recv(sock, 1024) @@ -183,7 +185,7 @@ class BaseSelectorEventLoopTests(test_utils.TestCase): self.assertIs(err, f.exception()) def test_sock_sendall(self): - sock = mock.Mock() + sock = test_utils.mock_nonblocking_socket() self.loop._sock_sendall = mock.Mock() f = self.loop.sock_sendall(sock, b'data') @@ -193,7 +195,7 @@ class BaseSelectorEventLoopTests(test_utils.TestCase): self.loop._sock_sendall.call_args[0]) def test_sock_sendall_nodata(self): - sock = mock.Mock() + sock = test_utils.mock_nonblocking_socket() self.loop._sock_sendall = mock.Mock() f = self.loop.sock_sendall(sock, b'') @@ -295,7 +297,7 @@ class BaseSelectorEventLoopTests(test_utils.TestCase): self.loop.add_writer.call_args[0]) def test_sock_connect(self): - sock = mock.Mock() + sock = test_utils.mock_nonblocking_socket() self.loop._sock_connect = mock.Mock() f = self.loop.sock_connect(sock, ('127.0.0.1', 8080)) @@ -361,7 +363,7 @@ class BaseSelectorEventLoopTests(test_utils.TestCase): self.assertIsInstance(f.exception(), OSError) def test_sock_accept(self): - sock = mock.Mock() + sock = test_utils.mock_nonblocking_socket() self.loop._sock_accept = mock.Mock() f = self.loop.sock_accept(sock) @@ -782,7 +784,8 @@ class SelectorSocketTransportTests(test_utils.TestCase): transport = _SelectorSocketTransport( self.loop, self.sock, self.protocol) transport._force_close = mock.Mock() - transport._read_ready() + with test_utils.disable_logger(): + transport._read_ready() transport._force_close.assert_called_with(err) @mock.patch('logging.exception') @@ -1219,7 +1222,8 @@ class SelectorSslTransportTests(test_utils.TestCase): err = self.sslsock.recv.side_effect = ConnectionResetError() transport = self._make_one() transport._force_close = mock.Mock() - transport._read_ready() + with test_utils.disable_logger(): + transport._read_ready() transport._force_close.assert_called_with(err) def test_read_ready_recv_retry(self): diff --git a/Lib/test/test_asyncio/test_subprocess.py b/Lib/test/test_asyncio/test_subprocess.py index b5b1012..0e9e1ce 100644 --- a/Lib/test/test_asyncio/test_subprocess.py +++ b/Lib/test/test_asyncio/test_subprocess.py @@ -148,15 +148,17 @@ class SubprocessMixin: coro = write_stdin(proc, large_data) # drain() must raise BrokenPipeError or ConnectionResetError - self.assertRaises((BrokenPipeError, ConnectionResetError), - self.loop.run_until_complete, coro) + with test_utils.disable_logger(): + self.assertRaises((BrokenPipeError, ConnectionResetError), + self.loop.run_until_complete, coro) self.loop.run_until_complete(proc.wait()) def test_communicate_ignore_broken_pipe(self): proc, large_data = self.prepare_broken_pipe_test() # communicate() must ignore BrokenPipeError when feeding stdin - self.loop.run_until_complete(proc.communicate(large_data)) + with test_utils.disable_logger(): + self.loop.run_until_complete(proc.communicate(large_data)) self.loop.run_until_complete(proc.wait()) |