diff options
author | Christian Heimes <christian@python.org> | 2021-04-19 05:27:10 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-04-19 05:27:10 (GMT) |
commit | 2875c603b2a7691b55c2046aca54831c91efda8e (patch) | |
tree | dd22c59c35f8c113175befbe46a31fb2ecc24733 /Lib/test/test_ssl.py | |
parent | 89d1550d14ba689af12eeb726e4ff8ce73cee7e1 (diff) | |
download | cpython-2875c603b2a7691b55c2046aca54831c91efda8e.zip cpython-2875c603b2a7691b55c2046aca54831c91efda8e.tar.gz cpython-2875c603b2a7691b55c2046aca54831c91efda8e.tar.bz2 |
bpo-43880: Show DeprecationWarnings for deprecated ssl module features (GH-25455)
* ssl.OP_NO_SSLv2
* ssl.OP_NO_SSLv3
* ssl.OP_NO_TLSv1
* ssl.OP_NO_TLSv1_1
* ssl.OP_NO_TLSv1_2
* ssl.OP_NO_TLSv1_3
* ssl.PROTOCOL_SSLv2
* ssl.PROTOCOL_SSLv3
* ssl.PROTOCOL_SSLv23 (alias for PROTOCOL_TLS)
* ssl.PROTOCOL_TLS
* ssl.PROTOCOL_TLSv1
* ssl.PROTOCOL_TLSv1_1
* ssl.PROTOCOL_TLSv1_2
* ssl.TLSVersion.SSLv3
* ssl.TLSVersion.TLSv1
* ssl.TLSVersion.TLSv1_1
* ssl.wrap_socket()
* ssl.RAND_pseudo_bytes()
* ssl.RAND_egd() (already removed since it's not supported by OpenSSL 1.1.1)
* ssl.SSLContext() without a protocol argument
* ssl.match_hostname()
* hashlib.pbkdf2_hmac() (pure Python implementation, fast OpenSSL
function will stay)
Signed-off-by: Christian Heimes <christian@python.org>
Diffstat (limited to 'Lib/test/test_ssl.py')
-rw-r--r-- | Lib/test/test_ssl.py | 273 |
1 files changed, 120 insertions, 153 deletions
diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py index 7b70979..a2c79ff 100644 --- a/Lib/test/test_ssl.py +++ b/Lib/test/test_ssl.py @@ -224,7 +224,7 @@ def has_tls_version(version): # check runtime and dynamic crypto policy settings. A TLS version may # be compiled in but disabled by a policy or config option. - ctx = ssl.SSLContext() + ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) if ( hasattr(ctx, 'minimum_version') and ctx.minimum_version != ssl.TLSVersion.MINIMUM_SUPPORTED and @@ -306,12 +306,20 @@ def asn1time(cert_time): needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test") +ignore_deprecation = warnings_helper.ignore_warnings( + category=DeprecationWarning +) + -def test_wrap_socket(sock, ssl_version=ssl.PROTOCOL_TLS, *, +def test_wrap_socket(sock, *, cert_reqs=ssl.CERT_NONE, ca_certs=None, ciphers=None, certfile=None, keyfile=None, **kwargs): - context = ssl.SSLContext(ssl_version) + if not kwargs.get("server_side"): + kwargs["server_hostname"] = SIGNED_CERTFILE_HOSTNAME + context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) + else: + context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) if cert_reqs is not None: if cert_reqs == ssl.CERT_NONE: context.check_hostname = False @@ -378,8 +386,8 @@ class BasicSocketTests(unittest.TestCase): def test_str_for_enums(self): # Make sure that the PROTOCOL_* constants have enum-like string # reprs. - proto = ssl.PROTOCOL_TLS - self.assertEqual(str(proto), 'PROTOCOL_TLS') + proto = ssl.PROTOCOL_TLS_CLIENT + self.assertEqual(str(proto), 'PROTOCOL_TLS_CLIENT') ctx = ssl.SSLContext(proto) self.assertIs(ctx.protocol, proto) @@ -390,7 +398,8 @@ class BasicSocketTests(unittest.TestCase): % (v, (v and "sufficient randomness") or "insufficient randomness")) - data, is_cryptographic = ssl.RAND_pseudo_bytes(16) + with warnings_helper.check_warnings(): + data, is_cryptographic = ssl.RAND_pseudo_bytes(16) self.assertEqual(len(data), 16) self.assertEqual(is_cryptographic, v == 1) if v: @@ -401,48 +410,13 @@ class BasicSocketTests(unittest.TestCase): # negative num is invalid self.assertRaises(ValueError, ssl.RAND_bytes, -5) - self.assertRaises(ValueError, ssl.RAND_pseudo_bytes, -5) + with warnings_helper.check_warnings(): + self.assertRaises(ValueError, ssl.RAND_pseudo_bytes, -5) - if hasattr(ssl, 'RAND_egd'): - self.assertRaises(TypeError, ssl.RAND_egd, 1) - self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1) ssl.RAND_add("this is a random string", 75.0) ssl.RAND_add(b"this is a random bytes object", 75.0) ssl.RAND_add(bytearray(b"this is a random bytearray object"), 75.0) - @unittest.skipUnless(hasattr(os, 'fork'), 'need os.fork') - def test_random_fork(self): - status = ssl.RAND_status() - if not status: - self.fail("OpenSSL's PRNG has insufficient randomness") - - rfd, wfd = os.pipe() - pid = os.fork() - if pid == 0: - try: - os.close(rfd) - child_random = ssl.RAND_pseudo_bytes(16)[0] - self.assertEqual(len(child_random), 16) - os.write(wfd, child_random) - os.close(wfd) - except BaseException: - os._exit(1) - else: - os._exit(0) - else: - os.close(wfd) - self.addCleanup(os.close, rfd) - support.wait_process(pid, exitcode=0) - - child_random = os.read(rfd, 16) - self.assertEqual(len(child_random), 16) - parent_random = ssl.RAND_pseudo_bytes(16)[0] - self.assertEqual(len(parent_random), 16) - - self.assertNotEqual(child_random, parent_random) - - maxDiff = None - def test_parse_cert(self): # note that this uses an 'unofficial' function in _ssl.c, # provided solely for this test, to exercise the certificate @@ -624,6 +598,7 @@ class BasicSocketTests(unittest.TestCase): with test_wrap_socket(s) as ss: self.assertEqual(timeout, ss.gettimeout()) + @ignore_deprecation def test_errors_sslwrap(self): sock = socket.socket() self.assertRaisesRegex(ValueError, @@ -675,6 +650,7 @@ class BasicSocketTests(unittest.TestCase): """Wrapping with a badly formatted key (syntax error)""" self.bad_cert_test("badkey.pem") + @ignore_deprecation def test_match_hostname(self): def ok(cert, hostname): ssl.match_hostname(cert, hostname) @@ -1126,17 +1102,15 @@ class ContextTests(unittest.TestCase): def test_constructor(self): for protocol in PROTOCOLS: - ssl.SSLContext(protocol) - ctx = ssl.SSLContext() + with warnings_helper.check_warnings(): + ctx = ssl.SSLContext(protocol) + self.assertEqual(ctx.protocol, protocol) + with warnings_helper.check_warnings(): + ctx = ssl.SSLContext() self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS) self.assertRaises(ValueError, ssl.SSLContext, -1) self.assertRaises(ValueError, ssl.SSLContext, 42) - def test_protocol(self): - for proto in PROTOCOLS: - ctx = ssl.SSLContext(proto) - self.assertEqual(ctx.protocol, proto) - def test_ciphers(self): ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) ctx.set_ciphers("ALL") @@ -1174,16 +1148,19 @@ class ContextTests(unittest.TestCase): OP_ENABLE_MIDDLEBOX_COMPAT | OP_IGNORE_UNEXPECTED_EOF) self.assertEqual(default, ctx.options) - ctx.options |= ssl.OP_NO_TLSv1 + with warnings_helper.check_warnings(): + ctx.options |= ssl.OP_NO_TLSv1 self.assertEqual(default | ssl.OP_NO_TLSv1, ctx.options) - ctx.options = (ctx.options & ~ssl.OP_NO_TLSv1) + with warnings_helper.check_warnings(): + ctx.options = (ctx.options & ~ssl.OP_NO_TLSv1) self.assertEqual(default, ctx.options) ctx.options = 0 # Ubuntu has OP_NO_SSLv3 forced on by default self.assertEqual(0, ctx.options & ~ssl.OP_NO_SSLv3) def test_verify_mode_protocol(self): - ctx = ssl.SSLContext(ssl.PROTOCOL_TLS) + with warnings_helper.check_warnings(): + ctx = ssl.SSLContext(ssl.PROTOCOL_TLS) # Default value self.assertEqual(ctx.verify_mode, ssl.CERT_NONE) ctx.verify_mode = ssl.CERT_OPTIONAL @@ -1221,6 +1198,7 @@ class ContextTests(unittest.TestCase): @requires_minimum_version @unittest.skipIf(IS_LIBRESSL, "see bpo-34001") + @ignore_deprecation def test_min_max_version(self): ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) # OpenSSL default is MINIMUM_SUPPORTED, however some vendors like @@ -1304,7 +1282,7 @@ class ContextTests(unittest.TestCase): "requires OpenSSL >= 1.1.0" ) def test_security_level(self): - ctx = ssl.SSLContext() + ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) # The default security callback allows for levels between 0-5 # with OpenSSL defaulting to 1, however some vendors override the # default value (e.g. Debian defaults to 2) @@ -1513,7 +1491,7 @@ class ContextTests(unittest.TestCase): ctx.load_dh_params(CERTFILE) def test_session_stats(self): - for proto in PROTOCOLS: + for proto in {ssl.PROTOCOL_TLS_CLIENT, ssl.PROTOCOL_TLS_SERVER}: ctx = ssl.SSLContext(proto) self.assertEqual(ctx.session_stats(), { 'number': 0, @@ -1673,7 +1651,7 @@ class ContextTests(unittest.TestCase): def test_create_default_context(self): ctx = ssl.create_default_context() - self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS) + self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS_CLIENT) self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED) self.assertTrue(ctx.check_hostname) self._assert_context_options(ctx) @@ -1682,42 +1660,49 @@ class ContextTests(unittest.TestCase): cadata = f.read() ctx = ssl.create_default_context(cafile=SIGNING_CA, capath=CAPATH, cadata=cadata) - self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS) + self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS_CLIENT) self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED) self._assert_context_options(ctx) ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) - self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS) + self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS_SERVER) self.assertEqual(ctx.verify_mode, ssl.CERT_NONE) self._assert_context_options(ctx) + + def test__create_stdlib_context(self): ctx = ssl._create_stdlib_context() - self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS) + self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS_CLIENT) self.assertEqual(ctx.verify_mode, ssl.CERT_NONE) self.assertFalse(ctx.check_hostname) self._assert_context_options(ctx) - ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1) + with warnings_helper.check_warnings(): + ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1) self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1) self.assertEqual(ctx.verify_mode, ssl.CERT_NONE) self._assert_context_options(ctx) - ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1, - cert_reqs=ssl.CERT_REQUIRED, - check_hostname=True) - self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1) + with warnings_helper.check_warnings(): + ctx = ssl._create_stdlib_context( + ssl.PROTOCOL_TLSv1_2, + cert_reqs=ssl.CERT_REQUIRED, + check_hostname=True + ) + self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1_2) self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED) self.assertTrue(ctx.check_hostname) self._assert_context_options(ctx) ctx = ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH) - self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS) + self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS_SERVER) self.assertEqual(ctx.verify_mode, ssl.CERT_NONE) self._assert_context_options(ctx) def test_check_hostname(self): - ctx = ssl.SSLContext(ssl.PROTOCOL_TLS) + with warnings_helper.check_warnings(): + ctx = ssl.SSLContext(ssl.PROTOCOL_TLS) self.assertFalse(ctx.check_hostname) self.assertEqual(ctx.verify_mode, ssl.CERT_NONE) @@ -2042,7 +2027,9 @@ class SimpleBackgroundTests(unittest.TestCase): def test_connect_with_context(self): # Same as test_connect, but with a separately created context - ctx = ssl.SSLContext(ssl.PROTOCOL_TLS) + ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) + ctx.check_hostname = False + ctx.verify_mode = ssl.CERT_NONE with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s: s.connect(self.server_addr) self.assertEqual({}, s.getpeercert()) @@ -2062,9 +2049,11 @@ class SimpleBackgroundTests(unittest.TestCase): # This should fail because we have no verification certs. Connection # failure crashes ThreadedEchoServer, so run this in an independent # test method. - ctx = ssl.SSLContext(ssl.PROTOCOL_TLS) - ctx.verify_mode = ssl.CERT_REQUIRED - s = ctx.wrap_socket(socket.socket(socket.AF_INET)) + ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) + s = ctx.wrap_socket( + socket.socket(socket.AF_INET), + server_hostname=SIGNED_CERTFILE_HOSTNAME + ) self.addCleanup(s.close) self.assertRaisesRegex(ssl.SSLError, "certificate verify failed", s.connect, self.server_addr) @@ -2075,19 +2064,19 @@ class SimpleBackgroundTests(unittest.TestCase): # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must # contain both versions of each certificate (same content, different # filename) for this test to be portable across OpenSSL releases. - ctx = ssl.SSLContext(ssl.PROTOCOL_TLS) - ctx.verify_mode = ssl.CERT_REQUIRED + ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) ctx.load_verify_locations(capath=CAPATH) - with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s: + with ctx.wrap_socket(socket.socket(socket.AF_INET), + server_hostname=SIGNED_CERTFILE_HOSTNAME) as s: s.connect(self.server_addr) cert = s.getpeercert() self.assertTrue(cert) # Same with a bytes `capath` argument - ctx = ssl.SSLContext(ssl.PROTOCOL_TLS) - ctx.verify_mode = ssl.CERT_REQUIRED + ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) ctx.load_verify_locations(capath=BYTES_CAPATH) - with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s: + with ctx.wrap_socket(socket.socket(socket.AF_INET), + server_hostname=SIGNED_CERTFILE_HOSTNAME) as s: s.connect(self.server_addr) cert = s.getpeercert() self.assertTrue(cert) @@ -2096,19 +2085,19 @@ class SimpleBackgroundTests(unittest.TestCase): with open(SIGNING_CA) as f: pem = f.read() der = ssl.PEM_cert_to_DER_cert(pem) - ctx = ssl.SSLContext(ssl.PROTOCOL_TLS) - ctx.verify_mode = ssl.CERT_REQUIRED + ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) ctx.load_verify_locations(cadata=pem) - with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s: + with ctx.wrap_socket(socket.socket(socket.AF_INET), + server_hostname=SIGNED_CERTFILE_HOSTNAME) as s: s.connect(self.server_addr) cert = s.getpeercert() self.assertTrue(cert) # same with DER - ctx = ssl.SSLContext(ssl.PROTOCOL_TLS) - ctx.verify_mode = ssl.CERT_REQUIRED + ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) ctx.load_verify_locations(cadata=der) - with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s: + with ctx.wrap_socket(socket.socket(socket.AF_INET), + server_hostname=SIGNED_CERTFILE_HOSTNAME) as s: s.connect(self.server_addr) cert = s.getpeercert() self.assertTrue(cert) @@ -2302,7 +2291,8 @@ class SimpleBackgroundTests(unittest.TestCase): sock.connect(self.server_addr) incoming = ssl.MemoryBIO() outgoing = ssl.MemoryBIO() - ctx = ssl.SSLContext(ssl.PROTOCOL_TLS) + ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) + ctx.check_hostname = False ctx.verify_mode = ssl.CERT_NONE sslobj = ctx.wrap_bio(incoming, outgoing, False) self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake) @@ -2384,7 +2374,6 @@ class ThreadedEchoServer(threading.Thread): try: self.sslconn = self.server.context.wrap_socket( self.sock, server_side=True) - self.server.selected_npn_protocols.append(self.sslconn.selected_npn_protocol()) self.server.selected_alpn_protocols.append(self.sslconn.selected_alpn_protocol()) except (ConnectionResetError, BrokenPipeError, ConnectionAbortedError) as e: # We treat ConnectionResetError as though it were an @@ -2433,8 +2422,6 @@ class ThreadedEchoServer(threading.Thread): cipher = self.sslconn.cipher() if support.verbose and self.server.chatty: sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n") - sys.stdout.write(" server: selected protocol is now " - + str(self.sslconn.selected_npn_protocol()) + "\n") return True def read(self): @@ -2562,7 +2549,7 @@ class ThreadedEchoServer(threading.Thread): def __init__(self, certificate=None, ssl_version=None, certreqs=None, cacerts=None, chatty=True, connectionchatty=False, starttls_server=False, - npn_protocols=None, alpn_protocols=None, + alpn_protocols=None, ciphers=None, context=None): if context: self.context = context @@ -2576,8 +2563,6 @@ class ThreadedEchoServer(threading.Thread): self.context.load_verify_locations(cacerts) if certificate: self.context.load_cert_chain(certificate) - if npn_protocols: - self.context.set_npn_protocols(npn_protocols) if alpn_protocols: self.context.set_alpn_protocols(alpn_protocols) if ciphers: @@ -2589,7 +2574,6 @@ class ThreadedEchoServer(threading.Thread): self.port = socket_helper.bind_port(self.sock) self.flag = None self.active = False - self.selected_npn_protocols = [] self.selected_alpn_protocols = [] self.shared_ciphers = [] self.conn_errors = [] @@ -2796,14 +2780,12 @@ def server_params_test(client_context, server_context, indata=b"FOO\n", 'cipher': s.cipher(), 'peercert': s.getpeercert(), 'client_alpn_protocol': s.selected_alpn_protocol(), - 'client_npn_protocol': s.selected_npn_protocol(), 'version': s.version(), 'session_reused': s.session_reused, 'session': s.session, }) s.close() stats['server_alpn_protocols'] = server.selected_alpn_protocols - stats['server_npn_protocols'] = server.selected_npn_protocols stats['server_shared_ciphers'] = server.shared_ciphers return stats @@ -2829,21 +2811,26 @@ def try_protocol_combo(server_protocol, client_protocol, expect_success, (ssl.get_protocol_name(client_protocol), ssl.get_protocol_name(server_protocol), certtype)) - client_context = ssl.SSLContext(client_protocol) - client_context.options |= client_options - server_context = ssl.SSLContext(server_protocol) - server_context.options |= server_options + + with warnings_helper.check_warnings(): + # ignore Deprecation warnings + client_context = ssl.SSLContext(client_protocol) + client_context.options |= client_options + server_context = ssl.SSLContext(server_protocol) + server_context.options |= server_options min_version = PROTOCOL_TO_TLS_VERSION.get(client_protocol, None) if (min_version is not None - # SSLContext.minimum_version is only available on recent OpenSSL - # (setter added in OpenSSL 1.1.0, getter added in OpenSSL 1.1.1) - and hasattr(server_context, 'minimum_version') - and server_protocol == ssl.PROTOCOL_TLS - and server_context.minimum_version > min_version): + # SSLContext.minimum_version is only available on recent OpenSSL + # (setter added in OpenSSL 1.1.0, getter added in OpenSSL 1.1.1) + and hasattr(server_context, 'minimum_version') + and server_protocol == ssl.PROTOCOL_TLS + and server_context.minimum_version > min_version + ): # If OpenSSL configuration is strict and requires more recent TLS # version, we have to change the minimum to test old TLS versions. - server_context.minimum_version = min_version + with warnings_helper.check_warnings(): + server_context.minimum_version = min_version # NOTE: we must enable "ALL" ciphers on the client, otherwise an # SSLv23 client will send an SSLv3 hello (rather than SSLv2) @@ -2886,17 +2873,6 @@ class ThreadedTests(unittest.TestCase): """Basic test of an SSL client connecting to a server""" if support.verbose: sys.stdout.write("\n") - for protocol in PROTOCOLS: - if protocol in {ssl.PROTOCOL_TLS_CLIENT, ssl.PROTOCOL_TLS_SERVER}: - continue - if not has_tls_protocol(protocol): - continue - with self.subTest(protocol=ssl._PROTOCOL_NAMES[protocol]): - context = ssl.SSLContext(protocol) - context.load_cert_chain(CERTFILE) - seclevel_workaround(context) - server_params_test(context, context, - chatty=True, connectionchatty=True) client_context, server_context, hostname = testing_context() @@ -3565,8 +3541,7 @@ class ThreadedTests(unittest.TestCase): server_side=False, certfile=CERTFILE, ca_certs=CERTFILE, - cert_reqs=ssl.CERT_NONE, - ssl_version=ssl.PROTOCOL_TLS_CLIENT) + cert_reqs=ssl.CERT_NONE) s.connect((HOST, server.port)) # helper methods for standardising recv* method signatures def _recv_into(): @@ -3718,8 +3693,7 @@ class ThreadedTests(unittest.TestCase): server_side=False, certfile=CERTFILE, ca_certs=CERTFILE, - cert_reqs=ssl.CERT_NONE, - ssl_version=ssl.PROTOCOL_TLS_CLIENT) + cert_reqs=ssl.CERT_NONE) s.connect((HOST, server.port)) s.setblocking(False) @@ -3788,14 +3762,11 @@ class ThreadedTests(unittest.TestCase): def test_server_accept(self): # Issue #16357: accept() on a SSLSocket created through # SSLContext.wrap_socket(). - context = ssl.SSLContext(ssl.PROTOCOL_TLS) - context.verify_mode = ssl.CERT_REQUIRED - context.load_verify_locations(SIGNING_CA) - context.load_cert_chain(SIGNED_CERTFILE) + client_ctx, server_ctx, hostname = testing_context() server = socket.socket(socket.AF_INET) host = "127.0.0.1" port = socket_helper.bind_port(server) - server = context.wrap_socket(server, server_side=True) + server = server_ctx.wrap_socket(server, server_side=True) self.assertTrue(server.server_side) evt = threading.Event() @@ -3813,8 +3784,10 @@ class ThreadedTests(unittest.TestCase): t.start() # Client wait until server setup and perform a connect. evt.wait() - client = context.wrap_socket(socket.socket()) - client.connect((host, port)) + client = client_ctx.wrap_socket( + socket.socket(), server_hostname=hostname + ) + client.connect((hostname, port)) client.send(b'data') client.recv() client_addr = client.getsockname() @@ -3827,14 +3800,16 @@ class ThreadedTests(unittest.TestCase): self.assertEqual(peer, client_addr) def test_getpeercert_enotconn(self): - context = ssl.SSLContext(ssl.PROTOCOL_TLS) + context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) + context.check_hostname = False with context.wrap_socket(socket.socket()) as sock: with self.assertRaises(OSError) as cm: sock.getpeercert() self.assertEqual(cm.exception.errno, errno.ENOTCONN) def test_do_handshake_enotconn(self): - context = ssl.SSLContext(ssl.PROTOCOL_TLS) + context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) + context.check_hostname = False with context.wrap_socket(socket.socket()) as sock: with self.assertRaises(OSError) as cm: sock.do_handshake() @@ -3875,13 +3850,11 @@ class ThreadedTests(unittest.TestCase): @requires_tls_version('TLSv1_3') def test_tls1_3(self): - context = ssl.SSLContext(ssl.PROTOCOL_TLS) - context.load_cert_chain(CERTFILE) - context.options |= ( - ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 | ssl.OP_NO_TLSv1_2 - ) - with ThreadedEchoServer(context=context) as server: - with context.wrap_socket(socket.socket()) as s: + client_context, server_context, hostname = testing_context() + client_context.minimum_version = ssl.TLSVersion.TLSv1_3 + with ThreadedEchoServer(context=server_context) as server: + with client_context.wrap_socket(socket.socket(), + server_hostname=hostname) as s: s.connect((HOST, server.port)) self.assertIn(s.cipher()[0], { 'TLS_AES_256_GCM_SHA384', @@ -3892,6 +3865,8 @@ class ThreadedTests(unittest.TestCase): @requires_minimum_version @requires_tls_version('TLSv1_2') + @requires_tls_version('TLSv1') + @ignore_deprecation def test_min_max_version_tlsv1_2(self): client_context, server_context, hostname = testing_context() # client TLSv1.0 to 1.2 @@ -3909,6 +3884,7 @@ class ThreadedTests(unittest.TestCase): @requires_minimum_version @requires_tls_version('TLSv1_1') + @ignore_deprecation def test_min_max_version_tlsv1_1(self): client_context, server_context, hostname = testing_context() # client 1.0 to 1.2, server 1.0 to 1.1 @@ -3927,6 +3903,7 @@ class ThreadedTests(unittest.TestCase): @requires_minimum_version @requires_tls_version('TLSv1_2') @requires_tls_version('TLSv1') + @ignore_deprecation def test_min_max_version_mismatch(self): client_context, server_context, hostname = testing_context() # client 1.0, server 1.2 (mismatch) @@ -3962,17 +3939,17 @@ class ThreadedTests(unittest.TestCase): def test_default_ecdh_curve(self): # Issue #21015: elliptic curve-based Diffie Hellman key exchange # should be enabled by default on SSL contexts. - context = ssl.SSLContext(ssl.PROTOCOL_TLS) - context.load_cert_chain(CERTFILE) + client_context, server_context, hostname = testing_context() # TLSv1.3 defaults to PFS key agreement and no longer has KEA in # cipher name. - context.options |= ssl.OP_NO_TLSv1_3 + client_context.maximum_version = ssl.TLSVersion.TLSv1_2 # Prior to OpenSSL 1.0.0, ECDH ciphers have to be enabled # explicitly using the 'ECCdraft' cipher alias. Otherwise, # our default cipher list should prefer ECDH-based ciphers # automatically. - with ThreadedEchoServer(context=context) as server: - with context.wrap_socket(socket.socket()) as s: + with ThreadedEchoServer(context=server_context) as server: + with client_context.wrap_socket(socket.socket(), + server_hostname=hostname) as s: s.connect((HOST, server.port)) self.assertIn("ECDH", s.cipher()[0]) @@ -4159,14 +4136,6 @@ class ThreadedTests(unittest.TestCase): self.assertEqual(server_result, expected, msg % (server_result, "server")) - def test_selected_npn_protocol(self): - # selected_npn_protocol() is None unless NPN is used - client_context, server_context, hostname = testing_context() - stats = server_params_test(client_context, server_context, - chatty=True, connectionchatty=True, - sni_name=hostname) - self.assertIs(stats['client_npn_protocol'], None) - def test_npn_protocols(self): assert not ssl.HAS_NPN @@ -4313,13 +4282,11 @@ class ThreadedTests(unittest.TestCase): with open(os_helper.TESTFN, 'wb') as f: f.write(TEST_DATA) self.addCleanup(os_helper.unlink, os_helper.TESTFN) - context = ssl.SSLContext(ssl.PROTOCOL_TLS) - context.verify_mode = ssl.CERT_REQUIRED - context.load_verify_locations(SIGNING_CA) - context.load_cert_chain(SIGNED_CERTFILE) - server = ThreadedEchoServer(context=context, chatty=False) + client_context, server_context, hostname = testing_context() + server = ThreadedEchoServer(context=server_context, chatty=False) with server: - with context.wrap_socket(socket.socket()) as s: + with client_context.wrap_socket(socket.socket(), + server_hostname=hostname) as s: s.connect((HOST, server.port)) with open(os_helper.TESTFN, 'rb') as file: s.sendfile(file) @@ -4437,7 +4404,7 @@ class ThreadedTests(unittest.TestCase): class TestPostHandshakeAuth(unittest.TestCase): def test_pha_setter(self): protocols = [ - ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS_SERVER, ssl.PROTOCOL_TLS_CLIENT + ssl.PROTOCOL_TLS_SERVER, ssl.PROTOCOL_TLS_CLIENT ] for protocol in protocols: ctx = ssl.SSLContext(protocol) |