summaryrefslogtreecommitdiffstats
path: root/Lib/test
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test')
-rw-r--r--Lib/test/pythoninfo.py2
-rw-r--r--Lib/test/test_asyncio/utils.py4
-rw-r--r--Lib/test/test_ftplib.py2
-rw-r--r--Lib/test/test_hashlib.py6
-rw-r--r--Lib/test/test_imaplib.py2
-rw-r--r--Lib/test/test_nntplib.py2
-rw-r--r--Lib/test/test_poplib.py2
-rw-r--r--Lib/test/test_ssl.py273
8 files changed, 132 insertions, 161 deletions
diff --git a/Lib/test/pythoninfo.py b/Lib/test/pythoninfo.py
index cc228fb..278dfe7 100644
--- a/Lib/test/pythoninfo.py
+++ b/Lib/test/pythoninfo.py
@@ -504,7 +504,7 @@ def collect_ssl(info_add):
copy_attributes(info_add, ssl, 'ssl.%s', attributes, formatter=format_attr)
for name, ctx in (
- ('SSLContext', ssl.SSLContext()),
+ ('SSLContext', ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)),
('default_https_context', ssl._create_default_https_context()),
('stdlib_context', ssl._create_stdlib_context()),
):
diff --git a/Lib/test/test_asyncio/utils.py b/Lib/test/test_asyncio/utils.py
index 0032c9a..3765194 100644
--- a/Lib/test/test_asyncio/utils.py
+++ b/Lib/test/test_asyncio/utils.py
@@ -91,7 +91,7 @@ def dummy_ssl_context():
if ssl is None:
return None
else:
- return ssl.SSLContext(ssl.PROTOCOL_TLS)
+ return simple_client_sslcontext(disable_verify=True)
def run_briefly(loop):
@@ -158,7 +158,7 @@ class SSLWSGIServerMixin:
# contains the ssl key and certificate files) differs
# between the stdlib and stand-alone asyncio.
# Prefer our own if we can find it.
- context = ssl.SSLContext()
+ context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
context.load_cert_chain(ONLYCERT, ONLYKEY)
ssock = context.wrap_socket(request, server_side=True)
diff --git a/Lib/test/test_ftplib.py b/Lib/test/test_ftplib.py
index 154dce1..a48b429 100644
--- a/Lib/test/test_ftplib.py
+++ b/Lib/test/test_ftplib.py
@@ -324,7 +324,7 @@ if ssl is not None:
_ssl_closing = False
def secure_connection(self):
- context = ssl.SSLContext()
+ context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
context.load_cert_chain(CERTFILE)
socket = context.wrap_socket(self.socket,
suppress_ragged_eofs=False,
diff --git a/Lib/test/test_hashlib.py b/Lib/test/test_hashlib.py
index bf9f559..9e9c874 100644
--- a/Lib/test/test_hashlib.py
+++ b/Lib/test/test_hashlib.py
@@ -21,6 +21,7 @@ from test import support
from test.support import _4G, bigmemtest
from test.support.import_helper import import_fresh_module
from test.support import threading_helper
+from test.support import warnings_helper
from http.client import HTTPException
# Were we compiled --with-pydebug or with #define Py_DEBUG?
@@ -1021,7 +1022,10 @@ class KDFTests(unittest.TestCase):
@unittest.skipIf(builtin_hashlib is None, "test requires builtin_hashlib")
def test_pbkdf2_hmac_py(self):
- self._test_pbkdf2_hmac(builtin_hashlib.pbkdf2_hmac, builtin_hashes)
+ with warnings_helper.check_warnings():
+ self._test_pbkdf2_hmac(
+ builtin_hashlib.pbkdf2_hmac, builtin_hashes
+ )
@unittest.skipUnless(hasattr(openssl_hashlib, 'pbkdf2_hmac'),
' test requires OpenSSL > 1.0')
diff --git a/Lib/test/test_imaplib.py b/Lib/test/test_imaplib.py
index 0cab789..c2b935f 100644
--- a/Lib/test/test_imaplib.py
+++ b/Lib/test/test_imaplib.py
@@ -96,7 +96,7 @@ if ssl:
def get_request(self):
newsocket, fromaddr = self.socket.accept()
- context = ssl.SSLContext()
+ context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
context.load_cert_chain(CERTFILE)
connstream = context.wrap_socket(newsocket, server_side=True)
return connstream, fromaddr
diff --git a/Lib/test/test_nntplib.py b/Lib/test/test_nntplib.py
index 4dbf941..230a444 100644
--- a/Lib/test/test_nntplib.py
+++ b/Lib/test/test_nntplib.py
@@ -1602,7 +1602,7 @@ class LocalServerTests(unittest.TestCase):
elif cmd == b'STARTTLS\r\n':
reader.close()
client.sendall(b'382 Begin TLS negotiation now\r\n')
- context = ssl.SSLContext()
+ context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
context.load_cert_chain(certfile)
client = context.wrap_socket(
client, server_side=True)
diff --git a/Lib/test/test_poplib.py b/Lib/test/test_poplib.py
index 5488683..c5ae9f7 100644
--- a/Lib/test/test_poplib.py
+++ b/Lib/test/test_poplib.py
@@ -155,7 +155,7 @@ class DummyPOP3Handler(asynchat.async_chat):
def cmd_stls(self, arg):
if self.tls_active is False:
self.push('+OK Begin TLS negotiation')
- context = ssl.SSLContext()
+ context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
context.load_cert_chain(CERTFILE)
tls_sock = context.wrap_socket(self.socket,
server_side=True,
diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py
index 7b70979..a2c79ff 100644
--- a/Lib/test/test_ssl.py
+++ b/Lib/test/test_ssl.py
@@ -224,7 +224,7 @@ def has_tls_version(version):
# check runtime and dynamic crypto policy settings. A TLS version may
# be compiled in but disabled by a policy or config option.
- ctx = ssl.SSLContext()
+ ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
if (
hasattr(ctx, 'minimum_version') and
ctx.minimum_version != ssl.TLSVersion.MINIMUM_SUPPORTED and
@@ -306,12 +306,20 @@ def asn1time(cert_time):
needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test")
+ignore_deprecation = warnings_helper.ignore_warnings(
+ category=DeprecationWarning
+)
+
-def test_wrap_socket(sock, ssl_version=ssl.PROTOCOL_TLS, *,
+def test_wrap_socket(sock, *,
cert_reqs=ssl.CERT_NONE, ca_certs=None,
ciphers=None, certfile=None, keyfile=None,
**kwargs):
- context = ssl.SSLContext(ssl_version)
+ if not kwargs.get("server_side"):
+ kwargs["server_hostname"] = SIGNED_CERTFILE_HOSTNAME
+ context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
+ else:
+ context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
if cert_reqs is not None:
if cert_reqs == ssl.CERT_NONE:
context.check_hostname = False
@@ -378,8 +386,8 @@ class BasicSocketTests(unittest.TestCase):
def test_str_for_enums(self):
# Make sure that the PROTOCOL_* constants have enum-like string
# reprs.
- proto = ssl.PROTOCOL_TLS
- self.assertEqual(str(proto), 'PROTOCOL_TLS')
+ proto = ssl.PROTOCOL_TLS_CLIENT
+ self.assertEqual(str(proto), 'PROTOCOL_TLS_CLIENT')
ctx = ssl.SSLContext(proto)
self.assertIs(ctx.protocol, proto)
@@ -390,7 +398,8 @@ class BasicSocketTests(unittest.TestCase):
% (v, (v and "sufficient randomness") or
"insufficient randomness"))
- data, is_cryptographic = ssl.RAND_pseudo_bytes(16)
+ with warnings_helper.check_warnings():
+ data, is_cryptographic = ssl.RAND_pseudo_bytes(16)
self.assertEqual(len(data), 16)
self.assertEqual(is_cryptographic, v == 1)
if v:
@@ -401,48 +410,13 @@ class BasicSocketTests(unittest.TestCase):
# negative num is invalid
self.assertRaises(ValueError, ssl.RAND_bytes, -5)
- self.assertRaises(ValueError, ssl.RAND_pseudo_bytes, -5)
+ with warnings_helper.check_warnings():
+ self.assertRaises(ValueError, ssl.RAND_pseudo_bytes, -5)
- if hasattr(ssl, 'RAND_egd'):
- self.assertRaises(TypeError, ssl.RAND_egd, 1)
- self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1)
ssl.RAND_add("this is a random string", 75.0)
ssl.RAND_add(b"this is a random bytes object", 75.0)
ssl.RAND_add(bytearray(b"this is a random bytearray object"), 75.0)
- @unittest.skipUnless(hasattr(os, 'fork'), 'need os.fork')
- def test_random_fork(self):
- status = ssl.RAND_status()
- if not status:
- self.fail("OpenSSL's PRNG has insufficient randomness")
-
- rfd, wfd = os.pipe()
- pid = os.fork()
- if pid == 0:
- try:
- os.close(rfd)
- child_random = ssl.RAND_pseudo_bytes(16)[0]
- self.assertEqual(len(child_random), 16)
- os.write(wfd, child_random)
- os.close(wfd)
- except BaseException:
- os._exit(1)
- else:
- os._exit(0)
- else:
- os.close(wfd)
- self.addCleanup(os.close, rfd)
- support.wait_process(pid, exitcode=0)
-
- child_random = os.read(rfd, 16)
- self.assertEqual(len(child_random), 16)
- parent_random = ssl.RAND_pseudo_bytes(16)[0]
- self.assertEqual(len(parent_random), 16)
-
- self.assertNotEqual(child_random, parent_random)
-
- maxDiff = None
-
def test_parse_cert(self):
# note that this uses an 'unofficial' function in _ssl.c,
# provided solely for this test, to exercise the certificate
@@ -624,6 +598,7 @@ class BasicSocketTests(unittest.TestCase):
with test_wrap_socket(s) as ss:
self.assertEqual(timeout, ss.gettimeout())
+ @ignore_deprecation
def test_errors_sslwrap(self):
sock = socket.socket()
self.assertRaisesRegex(ValueError,
@@ -675,6 +650,7 @@ class BasicSocketTests(unittest.TestCase):
"""Wrapping with a badly formatted key (syntax error)"""
self.bad_cert_test("badkey.pem")
+ @ignore_deprecation
def test_match_hostname(self):
def ok(cert, hostname):
ssl.match_hostname(cert, hostname)
@@ -1126,17 +1102,15 @@ class ContextTests(unittest.TestCase):
def test_constructor(self):
for protocol in PROTOCOLS:
- ssl.SSLContext(protocol)
- ctx = ssl.SSLContext()
+ with warnings_helper.check_warnings():
+ ctx = ssl.SSLContext(protocol)
+ self.assertEqual(ctx.protocol, protocol)
+ with warnings_helper.check_warnings():
+ ctx = ssl.SSLContext()
self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
self.assertRaises(ValueError, ssl.SSLContext, -1)
self.assertRaises(ValueError, ssl.SSLContext, 42)
- def test_protocol(self):
- for proto in PROTOCOLS:
- ctx = ssl.SSLContext(proto)
- self.assertEqual(ctx.protocol, proto)
-
def test_ciphers(self):
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ctx.set_ciphers("ALL")
@@ -1174,16 +1148,19 @@ class ContextTests(unittest.TestCase):
OP_ENABLE_MIDDLEBOX_COMPAT |
OP_IGNORE_UNEXPECTED_EOF)
self.assertEqual(default, ctx.options)
- ctx.options |= ssl.OP_NO_TLSv1
+ with warnings_helper.check_warnings():
+ ctx.options |= ssl.OP_NO_TLSv1
self.assertEqual(default | ssl.OP_NO_TLSv1, ctx.options)
- ctx.options = (ctx.options & ~ssl.OP_NO_TLSv1)
+ with warnings_helper.check_warnings():
+ ctx.options = (ctx.options & ~ssl.OP_NO_TLSv1)
self.assertEqual(default, ctx.options)
ctx.options = 0
# Ubuntu has OP_NO_SSLv3 forced on by default
self.assertEqual(0, ctx.options & ~ssl.OP_NO_SSLv3)
def test_verify_mode_protocol(self):
- ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
+ with warnings_helper.check_warnings():
+ ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
# Default value
self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
ctx.verify_mode = ssl.CERT_OPTIONAL
@@ -1221,6 +1198,7 @@ class ContextTests(unittest.TestCase):
@requires_minimum_version
@unittest.skipIf(IS_LIBRESSL, "see bpo-34001")
+ @ignore_deprecation
def test_min_max_version(self):
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
# OpenSSL default is MINIMUM_SUPPORTED, however some vendors like
@@ -1304,7 +1282,7 @@ class ContextTests(unittest.TestCase):
"requires OpenSSL >= 1.1.0"
)
def test_security_level(self):
- ctx = ssl.SSLContext()
+ ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
# The default security callback allows for levels between 0-5
# with OpenSSL defaulting to 1, however some vendors override the
# default value (e.g. Debian defaults to 2)
@@ -1513,7 +1491,7 @@ class ContextTests(unittest.TestCase):
ctx.load_dh_params(CERTFILE)
def test_session_stats(self):
- for proto in PROTOCOLS:
+ for proto in {ssl.PROTOCOL_TLS_CLIENT, ssl.PROTOCOL_TLS_SERVER}:
ctx = ssl.SSLContext(proto)
self.assertEqual(ctx.session_stats(), {
'number': 0,
@@ -1673,7 +1651,7 @@ class ContextTests(unittest.TestCase):
def test_create_default_context(self):
ctx = ssl.create_default_context()
- self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
+ self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS_CLIENT)
self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
self.assertTrue(ctx.check_hostname)
self._assert_context_options(ctx)
@@ -1682,42 +1660,49 @@ class ContextTests(unittest.TestCase):
cadata = f.read()
ctx = ssl.create_default_context(cafile=SIGNING_CA, capath=CAPATH,
cadata=cadata)
- self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
+ self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS_CLIENT)
self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
self._assert_context_options(ctx)
ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
- self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
+ self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS_SERVER)
self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
self._assert_context_options(ctx)
+
+
def test__create_stdlib_context(self):
ctx = ssl._create_stdlib_context()
- self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
+ self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS_CLIENT)
self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
self.assertFalse(ctx.check_hostname)
self._assert_context_options(ctx)
- ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1)
+ with warnings_helper.check_warnings():
+ ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1)
self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
self._assert_context_options(ctx)
- ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
- cert_reqs=ssl.CERT_REQUIRED,
- check_hostname=True)
- self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
+ with warnings_helper.check_warnings():
+ ctx = ssl._create_stdlib_context(
+ ssl.PROTOCOL_TLSv1_2,
+ cert_reqs=ssl.CERT_REQUIRED,
+ check_hostname=True
+ )
+ self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1_2)
self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
self.assertTrue(ctx.check_hostname)
self._assert_context_options(ctx)
ctx = ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH)
- self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
+ self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS_SERVER)
self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
self._assert_context_options(ctx)
def test_check_hostname(self):
- ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
+ with warnings_helper.check_warnings():
+ ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
self.assertFalse(ctx.check_hostname)
self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
@@ -2042,7 +2027,9 @@ class SimpleBackgroundTests(unittest.TestCase):
def test_connect_with_context(self):
# Same as test_connect, but with a separately created context
- ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
+ ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
+ ctx.check_hostname = False
+ ctx.verify_mode = ssl.CERT_NONE
with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
s.connect(self.server_addr)
self.assertEqual({}, s.getpeercert())
@@ -2062,9 +2049,11 @@ class SimpleBackgroundTests(unittest.TestCase):
# This should fail because we have no verification certs. Connection
# failure crashes ThreadedEchoServer, so run this in an independent
# test method.
- ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
- ctx.verify_mode = ssl.CERT_REQUIRED
- s = ctx.wrap_socket(socket.socket(socket.AF_INET))
+ ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
+ s = ctx.wrap_socket(
+ socket.socket(socket.AF_INET),
+ server_hostname=SIGNED_CERTFILE_HOSTNAME
+ )
self.addCleanup(s.close)
self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
s.connect, self.server_addr)
@@ -2075,19 +2064,19 @@ class SimpleBackgroundTests(unittest.TestCase):
# OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
# contain both versions of each certificate (same content, different
# filename) for this test to be portable across OpenSSL releases.
- ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
- ctx.verify_mode = ssl.CERT_REQUIRED
+ ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ctx.load_verify_locations(capath=CAPATH)
- with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
+ with ctx.wrap_socket(socket.socket(socket.AF_INET),
+ server_hostname=SIGNED_CERTFILE_HOSTNAME) as s:
s.connect(self.server_addr)
cert = s.getpeercert()
self.assertTrue(cert)
# Same with a bytes `capath` argument
- ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
- ctx.verify_mode = ssl.CERT_REQUIRED
+ ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ctx.load_verify_locations(capath=BYTES_CAPATH)
- with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
+ with ctx.wrap_socket(socket.socket(socket.AF_INET),
+ server_hostname=SIGNED_CERTFILE_HOSTNAME) as s:
s.connect(self.server_addr)
cert = s.getpeercert()
self.assertTrue(cert)
@@ -2096,19 +2085,19 @@ class SimpleBackgroundTests(unittest.TestCase):
with open(SIGNING_CA) as f:
pem = f.read()
der = ssl.PEM_cert_to_DER_cert(pem)
- ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
- ctx.verify_mode = ssl.CERT_REQUIRED
+ ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ctx.load_verify_locations(cadata=pem)
- with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
+ with ctx.wrap_socket(socket.socket(socket.AF_INET),
+ server_hostname=SIGNED_CERTFILE_HOSTNAME) as s:
s.connect(self.server_addr)
cert = s.getpeercert()
self.assertTrue(cert)
# same with DER
- ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
- ctx.verify_mode = ssl.CERT_REQUIRED
+ ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ctx.load_verify_locations(cadata=der)
- with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
+ with ctx.wrap_socket(socket.socket(socket.AF_INET),
+ server_hostname=SIGNED_CERTFILE_HOSTNAME) as s:
s.connect(self.server_addr)
cert = s.getpeercert()
self.assertTrue(cert)
@@ -2302,7 +2291,8 @@ class SimpleBackgroundTests(unittest.TestCase):
sock.connect(self.server_addr)
incoming = ssl.MemoryBIO()
outgoing = ssl.MemoryBIO()
- ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
+ ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
+ ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
sslobj = ctx.wrap_bio(incoming, outgoing, False)
self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
@@ -2384,7 +2374,6 @@ class ThreadedEchoServer(threading.Thread):
try:
self.sslconn = self.server.context.wrap_socket(
self.sock, server_side=True)
- self.server.selected_npn_protocols.append(self.sslconn.selected_npn_protocol())
self.server.selected_alpn_protocols.append(self.sslconn.selected_alpn_protocol())
except (ConnectionResetError, BrokenPipeError, ConnectionAbortedError) as e:
# We treat ConnectionResetError as though it were an
@@ -2433,8 +2422,6 @@ class ThreadedEchoServer(threading.Thread):
cipher = self.sslconn.cipher()
if support.verbose and self.server.chatty:
sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
- sys.stdout.write(" server: selected protocol is now "
- + str(self.sslconn.selected_npn_protocol()) + "\n")
return True
def read(self):
@@ -2562,7 +2549,7 @@ class ThreadedEchoServer(threading.Thread):
def __init__(self, certificate=None, ssl_version=None,
certreqs=None, cacerts=None,
chatty=True, connectionchatty=False, starttls_server=False,
- npn_protocols=None, alpn_protocols=None,
+ alpn_protocols=None,
ciphers=None, context=None):
if context:
self.context = context
@@ -2576,8 +2563,6 @@ class ThreadedEchoServer(threading.Thread):
self.context.load_verify_locations(cacerts)
if certificate:
self.context.load_cert_chain(certificate)
- if npn_protocols:
- self.context.set_npn_protocols(npn_protocols)
if alpn_protocols:
self.context.set_alpn_protocols(alpn_protocols)
if ciphers:
@@ -2589,7 +2574,6 @@ class ThreadedEchoServer(threading.Thread):
self.port = socket_helper.bind_port(self.sock)
self.flag = None
self.active = False
- self.selected_npn_protocols = []
self.selected_alpn_protocols = []
self.shared_ciphers = []
self.conn_errors = []
@@ -2796,14 +2780,12 @@ def server_params_test(client_context, server_context, indata=b"FOO\n",
'cipher': s.cipher(),
'peercert': s.getpeercert(),
'client_alpn_protocol': s.selected_alpn_protocol(),
- 'client_npn_protocol': s.selected_npn_protocol(),
'version': s.version(),
'session_reused': s.session_reused,
'session': s.session,
})
s.close()
stats['server_alpn_protocols'] = server.selected_alpn_protocols
- stats['server_npn_protocols'] = server.selected_npn_protocols
stats['server_shared_ciphers'] = server.shared_ciphers
return stats
@@ -2829,21 +2811,26 @@ def try_protocol_combo(server_protocol, client_protocol, expect_success,
(ssl.get_protocol_name(client_protocol),
ssl.get_protocol_name(server_protocol),
certtype))
- client_context = ssl.SSLContext(client_protocol)
- client_context.options |= client_options
- server_context = ssl.SSLContext(server_protocol)
- server_context.options |= server_options
+
+ with warnings_helper.check_warnings():
+ # ignore Deprecation warnings
+ client_context = ssl.SSLContext(client_protocol)
+ client_context.options |= client_options
+ server_context = ssl.SSLContext(server_protocol)
+ server_context.options |= server_options
min_version = PROTOCOL_TO_TLS_VERSION.get(client_protocol, None)
if (min_version is not None
- # SSLContext.minimum_version is only available on recent OpenSSL
- # (setter added in OpenSSL 1.1.0, getter added in OpenSSL 1.1.1)
- and hasattr(server_context, 'minimum_version')
- and server_protocol == ssl.PROTOCOL_TLS
- and server_context.minimum_version > min_version):
+ # SSLContext.minimum_version is only available on recent OpenSSL
+ # (setter added in OpenSSL 1.1.0, getter added in OpenSSL 1.1.1)
+ and hasattr(server_context, 'minimum_version')
+ and server_protocol == ssl.PROTOCOL_TLS
+ and server_context.minimum_version > min_version
+ ):
# If OpenSSL configuration is strict and requires more recent TLS
# version, we have to change the minimum to test old TLS versions.
- server_context.minimum_version = min_version
+ with warnings_helper.check_warnings():
+ server_context.minimum_version = min_version
# NOTE: we must enable "ALL" ciphers on the client, otherwise an
# SSLv23 client will send an SSLv3 hello (rather than SSLv2)
@@ -2886,17 +2873,6 @@ class ThreadedTests(unittest.TestCase):
"""Basic test of an SSL client connecting to a server"""
if support.verbose:
sys.stdout.write("\n")
- for protocol in PROTOCOLS:
- if protocol in {ssl.PROTOCOL_TLS_CLIENT, ssl.PROTOCOL_TLS_SERVER}:
- continue
- if not has_tls_protocol(protocol):
- continue
- with self.subTest(protocol=ssl._PROTOCOL_NAMES[protocol]):
- context = ssl.SSLContext(protocol)
- context.load_cert_chain(CERTFILE)
- seclevel_workaround(context)
- server_params_test(context, context,
- chatty=True, connectionchatty=True)
client_context, server_context, hostname = testing_context()
@@ -3565,8 +3541,7 @@ class ThreadedTests(unittest.TestCase):
server_side=False,
certfile=CERTFILE,
ca_certs=CERTFILE,
- cert_reqs=ssl.CERT_NONE,
- ssl_version=ssl.PROTOCOL_TLS_CLIENT)
+ cert_reqs=ssl.CERT_NONE)
s.connect((HOST, server.port))
# helper methods for standardising recv* method signatures
def _recv_into():
@@ -3718,8 +3693,7 @@ class ThreadedTests(unittest.TestCase):
server_side=False,
certfile=CERTFILE,
ca_certs=CERTFILE,
- cert_reqs=ssl.CERT_NONE,
- ssl_version=ssl.PROTOCOL_TLS_CLIENT)
+ cert_reqs=ssl.CERT_NONE)
s.connect((HOST, server.port))
s.setblocking(False)
@@ -3788,14 +3762,11 @@ class ThreadedTests(unittest.TestCase):
def test_server_accept(self):
# Issue #16357: accept() on a SSLSocket created through
# SSLContext.wrap_socket().
- context = ssl.SSLContext(ssl.PROTOCOL_TLS)
- context.verify_mode = ssl.CERT_REQUIRED
- context.load_verify_locations(SIGNING_CA)
- context.load_cert_chain(SIGNED_CERTFILE)
+ client_ctx, server_ctx, hostname = testing_context()
server = socket.socket(socket.AF_INET)
host = "127.0.0.1"
port = socket_helper.bind_port(server)
- server = context.wrap_socket(server, server_side=True)
+ server = server_ctx.wrap_socket(server, server_side=True)
self.assertTrue(server.server_side)
evt = threading.Event()
@@ -3813,8 +3784,10 @@ class ThreadedTests(unittest.TestCase):
t.start()
# Client wait until server setup and perform a connect.
evt.wait()
- client = context.wrap_socket(socket.socket())
- client.connect((host, port))
+ client = client_ctx.wrap_socket(
+ socket.socket(), server_hostname=hostname
+ )
+ client.connect((hostname, port))
client.send(b'data')
client.recv()
client_addr = client.getsockname()
@@ -3827,14 +3800,16 @@ class ThreadedTests(unittest.TestCase):
self.assertEqual(peer, client_addr)
def test_getpeercert_enotconn(self):
- context = ssl.SSLContext(ssl.PROTOCOL_TLS)
+ context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
+ context.check_hostname = False
with context.wrap_socket(socket.socket()) as sock:
with self.assertRaises(OSError) as cm:
sock.getpeercert()
self.assertEqual(cm.exception.errno, errno.ENOTCONN)
def test_do_handshake_enotconn(self):
- context = ssl.SSLContext(ssl.PROTOCOL_TLS)
+ context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
+ context.check_hostname = False
with context.wrap_socket(socket.socket()) as sock:
with self.assertRaises(OSError) as cm:
sock.do_handshake()
@@ -3875,13 +3850,11 @@ class ThreadedTests(unittest.TestCase):
@requires_tls_version('TLSv1_3')
def test_tls1_3(self):
- context = ssl.SSLContext(ssl.PROTOCOL_TLS)
- context.load_cert_chain(CERTFILE)
- context.options |= (
- ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 | ssl.OP_NO_TLSv1_2
- )
- with ThreadedEchoServer(context=context) as server:
- with context.wrap_socket(socket.socket()) as s:
+ client_context, server_context, hostname = testing_context()
+ client_context.minimum_version = ssl.TLSVersion.TLSv1_3
+ with ThreadedEchoServer(context=server_context) as server:
+ with client_context.wrap_socket(socket.socket(),
+ server_hostname=hostname) as s:
s.connect((HOST, server.port))
self.assertIn(s.cipher()[0], {
'TLS_AES_256_GCM_SHA384',
@@ -3892,6 +3865,8 @@ class ThreadedTests(unittest.TestCase):
@requires_minimum_version
@requires_tls_version('TLSv1_2')
+ @requires_tls_version('TLSv1')
+ @ignore_deprecation
def test_min_max_version_tlsv1_2(self):
client_context, server_context, hostname = testing_context()
# client TLSv1.0 to 1.2
@@ -3909,6 +3884,7 @@ class ThreadedTests(unittest.TestCase):
@requires_minimum_version
@requires_tls_version('TLSv1_1')
+ @ignore_deprecation
def test_min_max_version_tlsv1_1(self):
client_context, server_context, hostname = testing_context()
# client 1.0 to 1.2, server 1.0 to 1.1
@@ -3927,6 +3903,7 @@ class ThreadedTests(unittest.TestCase):
@requires_minimum_version
@requires_tls_version('TLSv1_2')
@requires_tls_version('TLSv1')
+ @ignore_deprecation
def test_min_max_version_mismatch(self):
client_context, server_context, hostname = testing_context()
# client 1.0, server 1.2 (mismatch)
@@ -3962,17 +3939,17 @@ class ThreadedTests(unittest.TestCase):
def test_default_ecdh_curve(self):
# Issue #21015: elliptic curve-based Diffie Hellman key exchange
# should be enabled by default on SSL contexts.
- context = ssl.SSLContext(ssl.PROTOCOL_TLS)
- context.load_cert_chain(CERTFILE)
+ client_context, server_context, hostname = testing_context()
# TLSv1.3 defaults to PFS key agreement and no longer has KEA in
# cipher name.
- context.options |= ssl.OP_NO_TLSv1_3
+ client_context.maximum_version = ssl.TLSVersion.TLSv1_2
# Prior to OpenSSL 1.0.0, ECDH ciphers have to be enabled
# explicitly using the 'ECCdraft' cipher alias. Otherwise,
# our default cipher list should prefer ECDH-based ciphers
# automatically.
- with ThreadedEchoServer(context=context) as server:
- with context.wrap_socket(socket.socket()) as s:
+ with ThreadedEchoServer(context=server_context) as server:
+ with client_context.wrap_socket(socket.socket(),
+ server_hostname=hostname) as s:
s.connect((HOST, server.port))
self.assertIn("ECDH", s.cipher()[0])
@@ -4159,14 +4136,6 @@ class ThreadedTests(unittest.TestCase):
self.assertEqual(server_result, expected,
msg % (server_result, "server"))
- def test_selected_npn_protocol(self):
- # selected_npn_protocol() is None unless NPN is used
- client_context, server_context, hostname = testing_context()
- stats = server_params_test(client_context, server_context,
- chatty=True, connectionchatty=True,
- sni_name=hostname)
- self.assertIs(stats['client_npn_protocol'], None)
-
def test_npn_protocols(self):
assert not ssl.HAS_NPN
@@ -4313,13 +4282,11 @@ class ThreadedTests(unittest.TestCase):
with open(os_helper.TESTFN, 'wb') as f:
f.write(TEST_DATA)
self.addCleanup(os_helper.unlink, os_helper.TESTFN)
- context = ssl.SSLContext(ssl.PROTOCOL_TLS)
- context.verify_mode = ssl.CERT_REQUIRED
- context.load_verify_locations(SIGNING_CA)
- context.load_cert_chain(SIGNED_CERTFILE)
- server = ThreadedEchoServer(context=context, chatty=False)
+ client_context, server_context, hostname = testing_context()
+ server = ThreadedEchoServer(context=server_context, chatty=False)
with server:
- with context.wrap_socket(socket.socket()) as s:
+ with client_context.wrap_socket(socket.socket(),
+ server_hostname=hostname) as s:
s.connect((HOST, server.port))
with open(os_helper.TESTFN, 'rb') as file:
s.sendfile(file)
@@ -4437,7 +4404,7 @@ class ThreadedTests(unittest.TestCase):
class TestPostHandshakeAuth(unittest.TestCase):
def test_pha_setter(self):
protocols = [
- ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS_SERVER, ssl.PROTOCOL_TLS_CLIENT
+ ssl.PROTOCOL_TLS_SERVER, ssl.PROTOCOL_TLS_CLIENT
]
for protocol in protocols:
ctx = ssl.SSLContext(protocol)