diff options
Diffstat (limited to 'Lib/test/test_asyncio/test_sslproto.py')
-rw-r--r-- | Lib/test/test_asyncio/test_sslproto.py | 778 |
1 files changed, 0 insertions, 778 deletions
diff --git a/Lib/test/test_asyncio/test_sslproto.py b/Lib/test/test_asyncio/test_sslproto.py deleted file mode 100644 index c716eac..0000000 --- a/Lib/test/test_asyncio/test_sslproto.py +++ /dev/null @@ -1,778 +0,0 @@ -"""Tests for asyncio/sslproto.py.""" - -import logging -import socket -import sys -from test import support -import unittest -import weakref -from unittest import mock -try: - import ssl -except ImportError: - ssl = None - -import asyncio -from asyncio import log -from asyncio import protocols -from asyncio import sslproto -from test import support -from test.test_asyncio import utils as test_utils -from test.test_asyncio import functional as func_tests - - -def tearDownModule(): - asyncio.set_event_loop_policy(None) - - -@unittest.skipIf(ssl is None, 'No ssl module') -class SslProtoHandshakeTests(test_utils.TestCase): - - def setUp(self): - super().setUp() - self.loop = asyncio.new_event_loop() - self.set_event_loop(self.loop) - - def ssl_protocol(self, *, waiter=None, proto=None): - sslcontext = test_utils.dummy_ssl_context() - if proto is None: # app protocol - proto = asyncio.Protocol() - ssl_proto = sslproto.SSLProtocol(self.loop, proto, sslcontext, waiter, - ssl_handshake_timeout=0.1) - self.assertIs(ssl_proto._app_transport.get_protocol(), proto) - self.addCleanup(ssl_proto._app_transport.close) - return ssl_proto - - def connection_made(self, ssl_proto, *, do_handshake=None): - transport = mock.Mock() - sslpipe = mock.Mock() - sslpipe.shutdown.return_value = b'' - if do_handshake: - sslpipe.do_handshake.side_effect = do_handshake - else: - def mock_handshake(callback): - return [] - sslpipe.do_handshake.side_effect = mock_handshake - with mock.patch('asyncio.sslproto._SSLPipe', return_value=sslpipe): - ssl_proto.connection_made(transport) - return transport - - def test_handshake_timeout_zero(self): - sslcontext = test_utils.dummy_ssl_context() - app_proto = mock.Mock() - waiter = mock.Mock() - with self.assertRaisesRegex(ValueError, 'a positive number'): - sslproto.SSLProtocol(self.loop, app_proto, sslcontext, waiter, - ssl_handshake_timeout=0) - - def test_handshake_timeout_negative(self): - sslcontext = test_utils.dummy_ssl_context() - app_proto = mock.Mock() - waiter = mock.Mock() - with self.assertRaisesRegex(ValueError, 'a positive number'): - sslproto.SSLProtocol(self.loop, app_proto, sslcontext, waiter, - ssl_handshake_timeout=-10) - - def test_eof_received_waiter(self): - waiter = self.loop.create_future() - ssl_proto = self.ssl_protocol(waiter=waiter) - self.connection_made(ssl_proto) - ssl_proto.eof_received() - test_utils.run_briefly(self.loop) - self.assertIsInstance(waiter.exception(), ConnectionResetError) - - def test_fatal_error_no_name_error(self): - # From issue #363. - # _fatal_error() generates a NameError if sslproto.py - # does not import base_events. - waiter = self.loop.create_future() - ssl_proto = self.ssl_protocol(waiter=waiter) - # Temporarily turn off error logging so as not to spoil test output. - log_level = log.logger.getEffectiveLevel() - log.logger.setLevel(logging.FATAL) - try: - ssl_proto._fatal_error(None) - finally: - # Restore error logging. - log.logger.setLevel(log_level) - - def test_connection_lost(self): - # From issue #472. - # yield from waiter hang if lost_connection was called. - waiter = self.loop.create_future() - ssl_proto = self.ssl_protocol(waiter=waiter) - self.connection_made(ssl_proto) - ssl_proto.connection_lost(ConnectionAbortedError) - test_utils.run_briefly(self.loop) - self.assertIsInstance(waiter.exception(), ConnectionAbortedError) - - def test_close_during_handshake(self): - # bpo-29743 Closing transport during handshake process leaks socket - waiter = self.loop.create_future() - ssl_proto = self.ssl_protocol(waiter=waiter) - - transport = self.connection_made(ssl_proto) - test_utils.run_briefly(self.loop) - - ssl_proto._app_transport.close() - self.assertTrue(transport.abort.called) - - def test_get_extra_info_on_closed_connection(self): - waiter = self.loop.create_future() - ssl_proto = self.ssl_protocol(waiter=waiter) - self.assertIsNone(ssl_proto._get_extra_info('socket')) - default = object() - self.assertIs(ssl_proto._get_extra_info('socket', default), default) - self.connection_made(ssl_proto) - self.assertIsNotNone(ssl_proto._get_extra_info('socket')) - ssl_proto.connection_lost(None) - self.assertIsNone(ssl_proto._get_extra_info('socket')) - - def test_set_new_app_protocol(self): - waiter = self.loop.create_future() - ssl_proto = self.ssl_protocol(waiter=waiter) - new_app_proto = asyncio.Protocol() - ssl_proto._app_transport.set_protocol(new_app_proto) - self.assertIs(ssl_proto._app_transport.get_protocol(), new_app_proto) - self.assertIs(ssl_proto._app_protocol, new_app_proto) - - def test_data_received_after_closing(self): - ssl_proto = self.ssl_protocol() - self.connection_made(ssl_proto) - transp = ssl_proto._app_transport - - transp.close() - - # should not raise - self.assertIsNone(ssl_proto.data_received(b'data')) - - def test_write_after_closing(self): - ssl_proto = self.ssl_protocol() - self.connection_made(ssl_proto) - transp = ssl_proto._app_transport - transp.close() - - # should not raise - self.assertIsNone(transp.write(b'data')) - - -############################################################################## -# Start TLS Tests -############################################################################## - - -class BaseStartTLS(func_tests.FunctionalTestCaseMixin): - - PAYLOAD_SIZE = 1024 * 100 - TIMEOUT = support.LONG_TIMEOUT - - def new_loop(self): - raise NotImplementedError - - def test_buf_feed_data(self): - - class Proto(asyncio.BufferedProtocol): - - def __init__(self, bufsize, usemv): - self.buf = bytearray(bufsize) - self.mv = memoryview(self.buf) - self.data = b'' - self.usemv = usemv - - def get_buffer(self, sizehint): - if self.usemv: - return self.mv - else: - return self.buf - - def buffer_updated(self, nsize): - if self.usemv: - self.data += self.mv[:nsize] - else: - self.data += self.buf[:nsize] - - for usemv in [False, True]: - proto = Proto(1, usemv) - protocols._feed_data_to_buffered_proto(proto, b'12345') - self.assertEqual(proto.data, b'12345') - - proto = Proto(2, usemv) - protocols._feed_data_to_buffered_proto(proto, b'12345') - self.assertEqual(proto.data, b'12345') - - proto = Proto(2, usemv) - protocols._feed_data_to_buffered_proto(proto, b'1234') - self.assertEqual(proto.data, b'1234') - - proto = Proto(4, usemv) - protocols._feed_data_to_buffered_proto(proto, b'1234') - self.assertEqual(proto.data, b'1234') - - proto = Proto(100, usemv) - protocols._feed_data_to_buffered_proto(proto, b'12345') - self.assertEqual(proto.data, b'12345') - - proto = Proto(0, usemv) - with self.assertRaisesRegex(RuntimeError, 'empty buffer'): - protocols._feed_data_to_buffered_proto(proto, b'12345') - - def test_start_tls_client_reg_proto_1(self): - HELLO_MSG = b'1' * self.PAYLOAD_SIZE - - server_context = test_utils.simple_server_sslcontext() - client_context = test_utils.simple_client_sslcontext() - - def serve(sock): - sock.settimeout(self.TIMEOUT) - - data = sock.recv_all(len(HELLO_MSG)) - self.assertEqual(len(data), len(HELLO_MSG)) - - sock.start_tls(server_context, server_side=True) - - sock.sendall(b'O') - data = sock.recv_all(len(HELLO_MSG)) - self.assertEqual(len(data), len(HELLO_MSG)) - - sock.shutdown(socket.SHUT_RDWR) - sock.close() - - class ClientProto(asyncio.Protocol): - def __init__(self, on_data, on_eof): - self.on_data = on_data - self.on_eof = on_eof - self.con_made_cnt = 0 - - def connection_made(proto, tr): - proto.con_made_cnt += 1 - # Ensure connection_made gets called only once. - self.assertEqual(proto.con_made_cnt, 1) - - def data_received(self, data): - self.on_data.set_result(data) - - def eof_received(self): - self.on_eof.set_result(True) - - async def client(addr): - await asyncio.sleep(0.5) - - on_data = self.loop.create_future() - on_eof = self.loop.create_future() - - tr, proto = await self.loop.create_connection( - lambda: ClientProto(on_data, on_eof), *addr) - - tr.write(HELLO_MSG) - new_tr = await self.loop.start_tls(tr, proto, client_context) - - self.assertEqual(await on_data, b'O') - new_tr.write(HELLO_MSG) - await on_eof - - new_tr.close() - - with self.tcp_server(serve, timeout=self.TIMEOUT) as srv: - self.loop.run_until_complete( - asyncio.wait_for(client(srv.addr), - timeout=support.SHORT_TIMEOUT)) - - # No garbage is left if SSL is closed uncleanly - client_context = weakref.ref(client_context) - self.assertIsNone(client_context()) - - def test_create_connection_memory_leak(self): - HELLO_MSG = b'1' * self.PAYLOAD_SIZE - - server_context = test_utils.simple_server_sslcontext() - client_context = test_utils.simple_client_sslcontext() - - def serve(sock): - sock.settimeout(self.TIMEOUT) - - sock.start_tls(server_context, server_side=True) - - sock.sendall(b'O') - data = sock.recv_all(len(HELLO_MSG)) - self.assertEqual(len(data), len(HELLO_MSG)) - - sock.shutdown(socket.SHUT_RDWR) - sock.close() - - class ClientProto(asyncio.Protocol): - def __init__(self, on_data, on_eof): - self.on_data = on_data - self.on_eof = on_eof - self.con_made_cnt = 0 - - def connection_made(proto, tr): - # XXX: We assume user stores the transport in protocol - proto.tr = tr - proto.con_made_cnt += 1 - # Ensure connection_made gets called only once. - self.assertEqual(proto.con_made_cnt, 1) - - def data_received(self, data): - self.on_data.set_result(data) - - def eof_received(self): - self.on_eof.set_result(True) - - async def client(addr): - await asyncio.sleep(0.5) - - on_data = self.loop.create_future() - on_eof = self.loop.create_future() - - tr, proto = await self.loop.create_connection( - lambda: ClientProto(on_data, on_eof), *addr, - ssl=client_context) - - self.assertEqual(await on_data, b'O') - tr.write(HELLO_MSG) - await on_eof - - tr.close() - - with self.tcp_server(serve, timeout=self.TIMEOUT) as srv: - self.loop.run_until_complete( - asyncio.wait_for(client(srv.addr), - timeout=support.SHORT_TIMEOUT)) - - # No garbage is left for SSL client from loop.create_connection, even - # if user stores the SSLTransport in corresponding protocol instance - client_context = weakref.ref(client_context) - self.assertIsNone(client_context()) - - def test_start_tls_client_buf_proto_1(self): - HELLO_MSG = b'1' * self.PAYLOAD_SIZE - - server_context = test_utils.simple_server_sslcontext() - client_context = test_utils.simple_client_sslcontext() - client_con_made_calls = 0 - - def serve(sock): - sock.settimeout(self.TIMEOUT) - - data = sock.recv_all(len(HELLO_MSG)) - self.assertEqual(len(data), len(HELLO_MSG)) - - sock.start_tls(server_context, server_side=True) - - sock.sendall(b'O') - data = sock.recv_all(len(HELLO_MSG)) - self.assertEqual(len(data), len(HELLO_MSG)) - - sock.sendall(b'2') - data = sock.recv_all(len(HELLO_MSG)) - self.assertEqual(len(data), len(HELLO_MSG)) - - sock.shutdown(socket.SHUT_RDWR) - sock.close() - - class ClientProtoFirst(asyncio.BufferedProtocol): - def __init__(self, on_data): - self.on_data = on_data - self.buf = bytearray(1) - - def connection_made(self, tr): - nonlocal client_con_made_calls - client_con_made_calls += 1 - - def get_buffer(self, sizehint): - return self.buf - - def buffer_updated(self, nsize): - assert nsize == 1 - self.on_data.set_result(bytes(self.buf[:nsize])) - - class ClientProtoSecond(asyncio.Protocol): - def __init__(self, on_data, on_eof): - self.on_data = on_data - self.on_eof = on_eof - self.con_made_cnt = 0 - - def connection_made(self, tr): - nonlocal client_con_made_calls - client_con_made_calls += 1 - - def data_received(self, data): - self.on_data.set_result(data) - - def eof_received(self): - self.on_eof.set_result(True) - - async def client(addr): - await asyncio.sleep(0.5) - - on_data1 = self.loop.create_future() - on_data2 = self.loop.create_future() - on_eof = self.loop.create_future() - - tr, proto = await self.loop.create_connection( - lambda: ClientProtoFirst(on_data1), *addr) - - tr.write(HELLO_MSG) - new_tr = await self.loop.start_tls(tr, proto, client_context) - - self.assertEqual(await on_data1, b'O') - new_tr.write(HELLO_MSG) - - new_tr.set_protocol(ClientProtoSecond(on_data2, on_eof)) - self.assertEqual(await on_data2, b'2') - new_tr.write(HELLO_MSG) - await on_eof - - new_tr.close() - - # connection_made() should be called only once -- when - # we establish connection for the first time. Start TLS - # doesn't call connection_made() on application protocols. - self.assertEqual(client_con_made_calls, 1) - - with self.tcp_server(serve, timeout=self.TIMEOUT) as srv: - self.loop.run_until_complete( - asyncio.wait_for(client(srv.addr), - timeout=self.TIMEOUT)) - - def test_start_tls_slow_client_cancel(self): - HELLO_MSG = b'1' * self.PAYLOAD_SIZE - - client_context = test_utils.simple_client_sslcontext() - server_waits_on_handshake = self.loop.create_future() - - def serve(sock): - sock.settimeout(self.TIMEOUT) - - data = sock.recv_all(len(HELLO_MSG)) - self.assertEqual(len(data), len(HELLO_MSG)) - - try: - self.loop.call_soon_threadsafe( - server_waits_on_handshake.set_result, None) - data = sock.recv_all(1024 * 1024) - except ConnectionAbortedError: - pass - finally: - sock.close() - - class ClientProto(asyncio.Protocol): - def __init__(self, on_data, on_eof): - self.on_data = on_data - self.on_eof = on_eof - self.con_made_cnt = 0 - - def connection_made(proto, tr): - proto.con_made_cnt += 1 - # Ensure connection_made gets called only once. - self.assertEqual(proto.con_made_cnt, 1) - - def data_received(self, data): - self.on_data.set_result(data) - - def eof_received(self): - self.on_eof.set_result(True) - - async def client(addr): - await asyncio.sleep(0.5) - - on_data = self.loop.create_future() - on_eof = self.loop.create_future() - - tr, proto = await self.loop.create_connection( - lambda: ClientProto(on_data, on_eof), *addr) - - tr.write(HELLO_MSG) - - await server_waits_on_handshake - - with self.assertRaises(asyncio.TimeoutError): - await asyncio.wait_for( - self.loop.start_tls(tr, proto, client_context), - 0.5) - - with self.tcp_server(serve, timeout=self.TIMEOUT) as srv: - self.loop.run_until_complete( - asyncio.wait_for(client(srv.addr), - timeout=support.SHORT_TIMEOUT)) - - def test_start_tls_server_1(self): - HELLO_MSG = b'1' * self.PAYLOAD_SIZE - ANSWER = b'answer' - - server_context = test_utils.simple_server_sslcontext() - client_context = test_utils.simple_client_sslcontext() - answer = None - - def client(sock, addr): - nonlocal answer - sock.settimeout(self.TIMEOUT) - - sock.connect(addr) - data = sock.recv_all(len(HELLO_MSG)) - self.assertEqual(len(data), len(HELLO_MSG)) - - sock.start_tls(client_context) - sock.sendall(HELLO_MSG) - answer = sock.recv_all(len(ANSWER)) - sock.close() - - class ServerProto(asyncio.Protocol): - def __init__(self, on_con, on_con_lost, on_got_hello): - self.on_con = on_con - self.on_con_lost = on_con_lost - self.on_got_hello = on_got_hello - self.data = b'' - self.transport = None - - def connection_made(self, tr): - self.transport = tr - self.on_con.set_result(tr) - - def replace_transport(self, tr): - self.transport = tr - - def data_received(self, data): - self.data += data - if len(self.data) >= len(HELLO_MSG): - self.on_got_hello.set_result(None) - - def connection_lost(self, exc): - self.transport = None - if exc is None: - self.on_con_lost.set_result(None) - else: - self.on_con_lost.set_exception(exc) - - async def main(proto, on_con, on_con_lost, on_got_hello): - tr = await on_con - tr.write(HELLO_MSG) - - self.assertEqual(proto.data, b'') - - new_tr = await self.loop.start_tls( - tr, proto, server_context, - server_side=True, - ssl_handshake_timeout=self.TIMEOUT) - proto.replace_transport(new_tr) - - await on_got_hello - new_tr.write(ANSWER) - - await on_con_lost - self.assertEqual(proto.data, HELLO_MSG) - new_tr.close() - - async def run_main(): - on_con = self.loop.create_future() - on_con_lost = self.loop.create_future() - on_got_hello = self.loop.create_future() - proto = ServerProto(on_con, on_con_lost, on_got_hello) - - server = await self.loop.create_server( - lambda: proto, '127.0.0.1', 0) - addr = server.sockets[0].getsockname() - - with self.tcp_client(lambda sock: client(sock, addr), - timeout=self.TIMEOUT): - await asyncio.wait_for( - main(proto, on_con, on_con_lost, on_got_hello), - timeout=self.TIMEOUT) - - server.close() - await server.wait_closed() - self.assertEqual(answer, ANSWER) - - self.loop.run_until_complete(run_main()) - - def test_start_tls_wrong_args(self): - async def main(): - with self.assertRaisesRegex(TypeError, 'SSLContext, got'): - await self.loop.start_tls(None, None, None) - - sslctx = test_utils.simple_server_sslcontext() - with self.assertRaisesRegex(TypeError, 'is not supported'): - await self.loop.start_tls(None, None, sslctx) - - self.loop.run_until_complete(main()) - - def test_handshake_timeout(self): - # bpo-29970: Check that a connection is aborted if handshake is not - # completed in timeout period, instead of remaining open indefinitely - client_sslctx = test_utils.simple_client_sslcontext() - - messages = [] - self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx)) - - server_side_aborted = False - - def server(sock): - nonlocal server_side_aborted - try: - sock.recv_all(1024 * 1024) - except ConnectionAbortedError: - server_side_aborted = True - finally: - sock.close() - - async def client(addr): - await asyncio.wait_for( - self.loop.create_connection( - asyncio.Protocol, - *addr, - ssl=client_sslctx, - server_hostname='', - ssl_handshake_timeout=support.SHORT_TIMEOUT), - 0.5) - - with self.tcp_server(server, - max_clients=1, - backlog=1) as srv: - - with self.assertRaises(asyncio.TimeoutError): - self.loop.run_until_complete(client(srv.addr)) - - self.assertTrue(server_side_aborted) - - # Python issue #23197: cancelling a handshake must not raise an - # exception or log an error, even if the handshake failed - self.assertEqual(messages, []) - - # The 10s handshake timeout should be cancelled to free related - # objects without really waiting for 10s - client_sslctx = weakref.ref(client_sslctx) - self.assertIsNone(client_sslctx()) - - def test_create_connection_ssl_slow_handshake(self): - client_sslctx = test_utils.simple_client_sslcontext() - - messages = [] - self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx)) - - def server(sock): - try: - sock.recv_all(1024 * 1024) - except ConnectionAbortedError: - pass - finally: - sock.close() - - async def client(addr): - with self.assertWarns(DeprecationWarning): - reader, writer = await asyncio.open_connection( - *addr, - ssl=client_sslctx, - server_hostname='', - loop=self.loop, - ssl_handshake_timeout=1.0) - - with self.tcp_server(server, - max_clients=1, - backlog=1) as srv: - - with self.assertRaisesRegex( - ConnectionAbortedError, - r'SSL handshake.*is taking longer'): - - self.loop.run_until_complete(client(srv.addr)) - - self.assertEqual(messages, []) - - def test_create_connection_ssl_failed_certificate(self): - self.loop.set_exception_handler(lambda loop, ctx: None) - - sslctx = test_utils.simple_server_sslcontext() - client_sslctx = test_utils.simple_client_sslcontext( - disable_verify=False) - - def server(sock): - try: - sock.start_tls( - sslctx, - server_side=True) - except ssl.SSLError: - pass - except OSError: - pass - finally: - sock.close() - - async def client(addr): - with self.assertWarns(DeprecationWarning): - reader, writer = await asyncio.open_connection( - *addr, - ssl=client_sslctx, - server_hostname='', - loop=self.loop, - ssl_handshake_timeout=support.LOOPBACK_TIMEOUT) - - with self.tcp_server(server, - max_clients=1, - backlog=1) as srv: - - with self.assertRaises(ssl.SSLCertVerificationError): - self.loop.run_until_complete(client(srv.addr)) - - def test_start_tls_client_corrupted_ssl(self): - self.loop.set_exception_handler(lambda loop, ctx: None) - - sslctx = test_utils.simple_server_sslcontext() - client_sslctx = test_utils.simple_client_sslcontext() - - def server(sock): - orig_sock = sock.dup() - try: - sock.start_tls( - sslctx, - server_side=True) - sock.sendall(b'A\n') - sock.recv_all(1) - orig_sock.send(b'please corrupt the SSL connection') - except ssl.SSLError: - pass - finally: - orig_sock.close() - sock.close() - - async def client(addr): - with self.assertWarns(DeprecationWarning): - reader, writer = await asyncio.open_connection( - *addr, - ssl=client_sslctx, - server_hostname='', - loop=self.loop) - - self.assertEqual(await reader.readline(), b'A\n') - writer.write(b'B') - with self.assertRaises(ssl.SSLError): - await reader.readline() - - writer.close() - return 'OK' - - with self.tcp_server(server, - max_clients=1, - backlog=1) as srv: - - res = self.loop.run_until_complete(client(srv.addr)) - - self.assertEqual(res, 'OK') - - -@unittest.skipIf(ssl is None, 'No ssl module') -class SelectorStartTLSTests(BaseStartTLS, unittest.TestCase): - - def new_loop(self): - return asyncio.SelectorEventLoop() - - -@unittest.skipIf(ssl is None, 'No ssl module') -@unittest.skipUnless(hasattr(asyncio, 'ProactorEventLoop'), 'Windows only') -class ProactorStartTLSTests(BaseStartTLS, unittest.TestCase): - - def new_loop(self): - return asyncio.ProactorEventLoop() - - -if __name__ == '__main__': - unittest.main() |