diff options
Diffstat (limited to 'Lib/test/test_ssl.py')
-rw-r--r-- | Lib/test/test_ssl.py | 953 |
1 files changed, 683 insertions, 270 deletions
diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py index f2f4f80..7edf5b2 100644 --- a/Lib/test/test_ssl.py +++ b/Lib/test/test_ssl.py @@ -5,39 +5,86 @@ import unittest from test import support import socket import select -import errno -import subprocess import time import gc import os import errno import pprint -import urllib.parse, urllib.request -import shutil +import tempfile +import urllib.request import traceback import asyncore import weakref +import platform +import functools -from http.server import HTTPServer, SimpleHTTPRequestHandler +ssl = support.import_module("ssl") -# Optionally test SSL support, if we have it in the tested platform -skip_expected = False -try: - import ssl -except ImportError: - skip_expected = True +PROTOCOLS = [ + ssl.PROTOCOL_SSLv3, + ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1 +] +if hasattr(ssl, 'PROTOCOL_SSLv2'): + PROTOCOLS.append(ssl.PROTOCOL_SSLv2) HOST = support.HOST -CERTFILE = None -SVN_PYTHON_ORG_ROOT_CERT = None + +data_file = lambda name: os.path.join(os.path.dirname(__file__), name) + +# The custom key and certificate files used in test_ssl are generated +# using Lib/test/make_ssl_certs.py. +# Other certificates are simply fetched from the Internet servers they +# are meant to authenticate. + +CERTFILE = data_file("keycert.pem") +BYTES_CERTFILE = os.fsencode(CERTFILE) +ONLYCERT = data_file("ssl_cert.pem") +ONLYKEY = data_file("ssl_key.pem") +BYTES_ONLYCERT = os.fsencode(ONLYCERT) +BYTES_ONLYKEY = os.fsencode(ONLYKEY) +CAPATH = data_file("capath") +BYTES_CAPATH = os.fsencode(CAPATH) + +SVN_PYTHON_ORG_ROOT_CERT = data_file("https_svn_python_org_root.pem") + +EMPTYCERT = data_file("nullcert.pem") +BADCERT = data_file("badcert.pem") +WRONGCERT = data_file("XXXnonexisting.pem") +BADKEY = data_file("badkey.pem") + def handle_error(prefix): exc_format = ' '.join(traceback.format_exception(*sys.exc_info())) if support.verbose: sys.stdout.write(prefix + exc_format) +def can_clear_options(): + # 0.9.8m or higher + return ssl.OPENSSL_VERSION_INFO >= (0, 9, 8, 13, 15) + +def no_sslv2_implies_sslv3_hello(): + # 0.9.7h or higher + return ssl.OPENSSL_VERSION_INFO >= (0, 9, 7, 8, 15) + + +# Issue #9415: Ubuntu hijacks their OpenSSL and forcefully disables SSLv2 +def skip_if_broken_ubuntu_ssl(func): + if hasattr(ssl, 'PROTOCOL_SSLv2'): + @functools.wraps(func) + def f(*args, **kwargs): + try: + ssl.SSLContext(ssl.PROTOCOL_SSLv2) + except ssl.SSLError: + if (ssl.OPENSSL_VERSION_INFO == (0, 9, 8, 15, 15) and + platform.linux_distribution() == ('debian', 'squeeze/sid', '')): + raise unittest.SkipTest("Patched Ubuntu OpenSSL breaks behaviour") + return func(*args, **kwargs) + return f + else: + return func -class BasicTests(unittest.TestCase): + +class BasicSocketTests(unittest.TestCase): def test_constants(self): #ssl.PROTOCOL_SSLv2 @@ -47,6 +94,7 @@ class BasicTests(unittest.TestCase): ssl.CERT_NONE ssl.CERT_OPTIONAL ssl.CERT_REQUIRED + self.assertIn(ssl.HAS_SNI, {True, False}) def test_random(self): v = ssl.RAND_status() @@ -66,7 +114,7 @@ class BasicTests(unittest.TestCase): # note that this uses an 'unofficial' function in _ssl.c, # provided solely for this test, to exercise the certificate # parsing code - p = ssl._ssl._test_decode_cert(CERTFILE, False) + p = ssl._ssl._test_decode_cert(CERTFILE) if support.verbose: sys.stdout.write("\n" + pprint.pformat(p) + "\n") @@ -82,6 +130,33 @@ class BasicTests(unittest.TestCase): if not p2.endswith('\n' + ssl.PEM_FOOTER + '\n'): self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % p2) + def test_openssl_version(self): + n = ssl.OPENSSL_VERSION_NUMBER + t = ssl.OPENSSL_VERSION_INFO + s = ssl.OPENSSL_VERSION + self.assertIsInstance(n, int) + self.assertIsInstance(t, tuple) + self.assertIsInstance(s, str) + # Some sanity checks follow + # >= 0.9 + self.assertGreaterEqual(n, 0x900000) + # < 2.0 + self.assertLess(n, 0x20000000) + major, minor, fix, patch, status = t + self.assertGreaterEqual(major, 0) + self.assertLess(major, 2) + self.assertGreaterEqual(minor, 0) + self.assertLess(minor, 256) + self.assertGreaterEqual(fix, 0) + self.assertLess(fix, 256) + self.assertGreaterEqual(patch, 0) + self.assertLessEqual(patch, 26) + self.assertGreaterEqual(status, 0) + self.assertLessEqual(status, 15) + # Version string as returned by OpenSSL, the format might change + self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)), + (s, t)) + @support.cpython_only def test_refcycle(self): # Issue #7943: an SSL object doesn't create reference cycles with @@ -113,6 +188,263 @@ class BasicTests(unittest.TestCase): ss = ssl.wrap_socket(s) self.assertEqual(timeout, ss.gettimeout()) + def test_errors(self): + sock = socket.socket() + self.assertRaisesRegex(ValueError, + "certfile must be specified", + ssl.wrap_socket, sock, keyfile=CERTFILE) + self.assertRaisesRegex(ValueError, + "certfile must be specified for server-side operations", + ssl.wrap_socket, sock, server_side=True) + self.assertRaisesRegex(ValueError, + "certfile must be specified for server-side operations", + ssl.wrap_socket, sock, server_side=True, certfile="") + s = ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE) + self.assertRaisesRegex(ValueError, "can't connect in server-side mode", + s.connect, (HOST, 8080)) + with self.assertRaises(IOError) as cm: + with socket.socket() as sock: + ssl.wrap_socket(sock, certfile=WRONGCERT) + self.assertEqual(cm.exception.errno, errno.ENOENT) + with self.assertRaises(IOError) as cm: + with socket.socket() as sock: + ssl.wrap_socket(sock, certfile=CERTFILE, keyfile=WRONGCERT) + self.assertEqual(cm.exception.errno, errno.ENOENT) + with self.assertRaises(IOError) as cm: + with socket.socket() as sock: + ssl.wrap_socket(sock, certfile=WRONGCERT, keyfile=WRONGCERT) + self.assertEqual(cm.exception.errno, errno.ENOENT) + + def test_match_hostname(self): + def ok(cert, hostname): + ssl.match_hostname(cert, hostname) + def fail(cert, hostname): + self.assertRaises(ssl.CertificateError, + ssl.match_hostname, cert, hostname) + + cert = {'subject': ((('commonName', 'example.com'),),)} + ok(cert, 'example.com') + ok(cert, 'ExAmple.cOm') + fail(cert, 'www.example.com') + fail(cert, '.example.com') + fail(cert, 'example.org') + fail(cert, 'exampleXcom') + + cert = {'subject': ((('commonName', '*.a.com'),),)} + ok(cert, 'foo.a.com') + fail(cert, 'bar.foo.a.com') + fail(cert, 'a.com') + fail(cert, 'Xa.com') + fail(cert, '.a.com') + + cert = {'subject': ((('commonName', 'a.*.com'),),)} + ok(cert, 'a.foo.com') + fail(cert, 'a..com') + fail(cert, 'a.com') + + cert = {'subject': ((('commonName', 'f*.com'),),)} + ok(cert, 'foo.com') + ok(cert, 'f.com') + fail(cert, 'bar.com') + fail(cert, 'foo.a.com') + fail(cert, 'bar.foo.com') + + # Slightly fake real-world example + cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT', + 'subject': ((('commonName', 'linuxfrz.org'),),), + 'subjectAltName': (('DNS', 'linuxfr.org'), + ('DNS', 'linuxfr.com'), + ('othername', '<unsupported>'))} + ok(cert, 'linuxfr.org') + ok(cert, 'linuxfr.com') + # Not a "DNS" entry + fail(cert, '<unsupported>') + # When there is a subjectAltName, commonName isn't used + fail(cert, 'linuxfrz.org') + + # A pristine real-world example + cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT', + 'subject': ((('countryName', 'US'),), + (('stateOrProvinceName', 'California'),), + (('localityName', 'Mountain View'),), + (('organizationName', 'Google Inc'),), + (('commonName', 'mail.google.com'),))} + ok(cert, 'mail.google.com') + fail(cert, 'gmail.com') + # Only commonName is considered + fail(cert, 'California') + + # Neither commonName nor subjectAltName + cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT', + 'subject': ((('countryName', 'US'),), + (('stateOrProvinceName', 'California'),), + (('localityName', 'Mountain View'),), + (('organizationName', 'Google Inc'),))} + fail(cert, 'mail.google.com') + + # No DNS entry in subjectAltName but a commonName + cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT', + 'subject': ((('countryName', 'US'),), + (('stateOrProvinceName', 'California'),), + (('localityName', 'Mountain View'),), + (('commonName', 'mail.google.com'),)), + 'subjectAltName': (('othername', 'blabla'), )} + ok(cert, 'mail.google.com') + + # No DNS entry subjectAltName and no commonName + cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT', + 'subject': ((('countryName', 'US'),), + (('stateOrProvinceName', 'California'),), + (('localityName', 'Mountain View'),), + (('organizationName', 'Google Inc'),)), + 'subjectAltName': (('othername', 'blabla'),)} + fail(cert, 'google.com') + + # Empty cert / no cert + self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com') + self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com') + + def test_server_side(self): + # server_hostname doesn't work for server sockets + ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + with socket.socket() as sock: + self.assertRaises(ValueError, ctx.wrap_socket, sock, True, + server_hostname="some.hostname") + +class ContextTests(unittest.TestCase): + + @skip_if_broken_ubuntu_ssl + def test_constructor(self): + if hasattr(ssl, 'PROTOCOL_SSLv2'): + ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv2) + ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv3) + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + self.assertRaises(TypeError, ssl.SSLContext) + self.assertRaises(ValueError, ssl.SSLContext, -1) + self.assertRaises(ValueError, ssl.SSLContext, 42) + + @skip_if_broken_ubuntu_ssl + def test_protocol(self): + for proto in PROTOCOLS: + ctx = ssl.SSLContext(proto) + self.assertEqual(ctx.protocol, proto) + + def test_ciphers(self): + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + ctx.set_ciphers("ALL") + ctx.set_ciphers("DEFAULT") + with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"): + ctx.set_ciphers("^$:,;?*'dorothyx") + + @skip_if_broken_ubuntu_ssl + def test_options(self): + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + # OP_ALL is the default value + self.assertEqual(ssl.OP_ALL, ctx.options) + ctx.options |= ssl.OP_NO_SSLv2 + self.assertEqual(ssl.OP_ALL | ssl.OP_NO_SSLv2, + ctx.options) + ctx.options |= ssl.OP_NO_SSLv3 + self.assertEqual(ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3, + ctx.options) + if can_clear_options(): + ctx.options = (ctx.options & ~ssl.OP_NO_SSLv2) | ssl.OP_NO_TLSv1 + self.assertEqual(ssl.OP_ALL | ssl.OP_NO_TLSv1 | ssl.OP_NO_SSLv3, + ctx.options) + ctx.options = 0 + self.assertEqual(0, ctx.options) + else: + with self.assertRaises(ValueError): + ctx.options = 0 + + def test_verify(self): + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + # Default value + self.assertEqual(ctx.verify_mode, ssl.CERT_NONE) + ctx.verify_mode = ssl.CERT_OPTIONAL + self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL) + ctx.verify_mode = ssl.CERT_REQUIRED + self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED) + ctx.verify_mode = ssl.CERT_NONE + self.assertEqual(ctx.verify_mode, ssl.CERT_NONE) + with self.assertRaises(TypeError): + ctx.verify_mode = None + with self.assertRaises(ValueError): + ctx.verify_mode = 42 + + def test_load_cert_chain(self): + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + # Combined key and cert in a single file + ctx.load_cert_chain(CERTFILE) + ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE) + self.assertRaises(TypeError, ctx.load_cert_chain, keyfile=CERTFILE) + with self.assertRaises(IOError) as cm: + ctx.load_cert_chain(WRONGCERT) + self.assertEqual(cm.exception.errno, errno.ENOENT) + with self.assertRaisesRegex(ssl.SSLError, "PEM lib"): + ctx.load_cert_chain(BADCERT) + with self.assertRaisesRegex(ssl.SSLError, "PEM lib"): + ctx.load_cert_chain(EMPTYCERT) + # Separate key and cert + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + ctx.load_cert_chain(ONLYCERT, ONLYKEY) + ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY) + ctx.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY) + with self.assertRaisesRegex(ssl.SSLError, "PEM lib"): + ctx.load_cert_chain(ONLYCERT) + with self.assertRaisesRegex(ssl.SSLError, "PEM lib"): + ctx.load_cert_chain(ONLYKEY) + with self.assertRaisesRegex(ssl.SSLError, "PEM lib"): + ctx.load_cert_chain(certfile=ONLYKEY, keyfile=ONLYCERT) + # Mismatching key and cert + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + with self.assertRaisesRegex(ssl.SSLError, "key values mismatch"): + ctx.load_cert_chain(SVN_PYTHON_ORG_ROOT_CERT, ONLYKEY) + + def test_load_verify_locations(self): + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + ctx.load_verify_locations(CERTFILE) + ctx.load_verify_locations(cafile=CERTFILE, capath=None) + ctx.load_verify_locations(BYTES_CERTFILE) + ctx.load_verify_locations(cafile=BYTES_CERTFILE, capath=None) + self.assertRaises(TypeError, ctx.load_verify_locations) + self.assertRaises(TypeError, ctx.load_verify_locations, None, None) + with self.assertRaises(IOError) as cm: + ctx.load_verify_locations(WRONGCERT) + self.assertEqual(cm.exception.errno, errno.ENOENT) + with self.assertRaisesRegex(ssl.SSLError, "PEM lib"): + ctx.load_verify_locations(BADCERT) + ctx.load_verify_locations(CERTFILE, CAPATH) + ctx.load_verify_locations(CERTFILE, capath=BYTES_CAPATH) + + # Issue #10989: crash if the second argument type is invalid + self.assertRaises(TypeError, ctx.load_verify_locations, None, True) + + @skip_if_broken_ubuntu_ssl + def test_session_stats(self): + for proto in PROTOCOLS: + ctx = ssl.SSLContext(proto) + self.assertEqual(ctx.session_stats(), { + 'number': 0, + 'connect': 0, + 'connect_good': 0, + 'connect_renegotiate': 0, + 'accept': 0, + 'accept_good': 0, + 'accept_renegotiate': 0, + 'hits': 0, + 'misses': 0, + 'timeouts': 0, + 'cache_full': 0, + }) + + def test_set_default_verify_paths(self): + # There's not much we can do to test that it acts as expected, + # so just check it doesn't crash or raise an exception. + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + ctx.set_default_verify_paths() + class NetworkedTests(unittest.TestCase): @@ -120,28 +452,150 @@ class NetworkedTests(unittest.TestCase): with support.transient_internet("svn.python.org"): s = ssl.wrap_socket(socket.socket(socket.AF_INET), cert_reqs=ssl.CERT_NONE) - s.connect(("svn.python.org", 443)) - c = s.getpeercert() - if c: - self.fail("Peer cert %s shouldn't be here!") - s.close() + try: + s.connect(("svn.python.org", 443)) + self.assertEqual({}, s.getpeercert()) + finally: + s.close() # this should fail because we have no verification certs s = ssl.wrap_socket(socket.socket(socket.AF_INET), cert_reqs=ssl.CERT_REQUIRED) + self.assertRaisesRegex(ssl.SSLError, "certificate verify failed", + s.connect, ("svn.python.org", 443)) + s.close() + + # this should succeed because we specify the root cert + s = ssl.wrap_socket(socket.socket(socket.AF_INET), + cert_reqs=ssl.CERT_REQUIRED, + ca_certs=SVN_PYTHON_ORG_ROOT_CERT) try: s.connect(("svn.python.org", 443)) - except ssl.SSLError: - pass + self.assertTrue(s.getpeercert()) finally: s.close() - # this should succeed because we specify the root cert + def test_connect_ex(self): + # Issue #11326: check connect_ex() implementation + with support.transient_internet("svn.python.org"): s = ssl.wrap_socket(socket.socket(socket.AF_INET), cert_reqs=ssl.CERT_REQUIRED, ca_certs=SVN_PYTHON_ORG_ROOT_CERT) try: + self.assertEqual(0, s.connect_ex(("svn.python.org", 443))) + self.assertTrue(s.getpeercert()) + finally: + s.close() + + def test_non_blocking_connect_ex(self): + # Issue #11326: non-blocking connect_ex() should allow handshake + # to proceed after the socket gets ready. + with support.transient_internet("svn.python.org"): + s = ssl.wrap_socket(socket.socket(socket.AF_INET), + cert_reqs=ssl.CERT_REQUIRED, + ca_certs=SVN_PYTHON_ORG_ROOT_CERT, + do_handshake_on_connect=False) + try: + s.setblocking(False) + rc = s.connect_ex(('svn.python.org', 443)) + # EWOULDBLOCK under Windows, EINPROGRESS elsewhere + self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK)) + # Wait for connect to finish + select.select([], [s], [], 5.0) + # Non-blocking handshake + while True: + try: + s.do_handshake() + break + except ssl.SSLError as err: + if err.args[0] == ssl.SSL_ERROR_WANT_READ: + select.select([s], [], [], 5.0) + elif err.args[0] == ssl.SSL_ERROR_WANT_WRITE: + select.select([], [s], [], 5.0) + else: + raise + # SSL established + self.assertTrue(s.getpeercert()) + finally: + s.close() + + def test_timeout_connect_ex(self): + # Issue #12065: on a timeout, connect_ex() should return the original + # errno (mimicking the behaviour of non-SSL sockets). + with support.transient_internet("svn.python.org"): + s = ssl.wrap_socket(socket.socket(socket.AF_INET), + cert_reqs=ssl.CERT_REQUIRED, + ca_certs=SVN_PYTHON_ORG_ROOT_CERT, + do_handshake_on_connect=False) + try: + s.settimeout(0.0000001) + rc = s.connect_ex(('svn.python.org', 443)) + if rc == 0: + self.skipTest("svn.python.org responded too quickly") + self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK)) + finally: + s.close() + + def test_connect_with_context(self): + with support.transient_internet("svn.python.org"): + # Same as test_connect, but with a separately created context + ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + s = ctx.wrap_socket(socket.socket(socket.AF_INET)) + s.connect(("svn.python.org", 443)) + try: + self.assertEqual({}, s.getpeercert()) + finally: + s.close() + # Same with a server hostname + s = ctx.wrap_socket(socket.socket(socket.AF_INET), + server_hostname="svn.python.org") + if ssl.HAS_SNI: s.connect(("svn.python.org", 443)) + s.close() + else: + self.assertRaises(ValueError, s.connect, ("svn.python.org", 443)) + # This should fail because we have no verification certs + ctx.verify_mode = ssl.CERT_REQUIRED + s = ctx.wrap_socket(socket.socket(socket.AF_INET)) + self.assertRaisesRegex(ssl.SSLError, "certificate verify failed", + s.connect, ("svn.python.org", 443)) + s.close() + # This should succeed because we specify the root cert + ctx.load_verify_locations(SVN_PYTHON_ORG_ROOT_CERT) + s = ctx.wrap_socket(socket.socket(socket.AF_INET)) + s.connect(("svn.python.org", 443)) + try: + cert = s.getpeercert() + self.assertTrue(cert) + finally: + s.close() + + def test_connect_capath(self): + # Verify server certificates using the `capath` argument + # NOTE: the subject hashing algorithm has been changed between + # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must + # contain both versions of each certificate (same content, different + # filename) for this test to be portable across OpenSSL releases. + with support.transient_internet("svn.python.org"): + ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + ctx.verify_mode = ssl.CERT_REQUIRED + ctx.load_verify_locations(capath=CAPATH) + s = ctx.wrap_socket(socket.socket(socket.AF_INET)) + s.connect(("svn.python.org", 443)) + try: + cert = s.getpeercert() + self.assertTrue(cert) + finally: + s.close() + # Same with a bytes `capath` argument + ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + ctx.verify_mode = ssl.CERT_REQUIRED + ctx.load_verify_locations(capath=BYTES_CAPATH) + s = ctx.wrap_socket(socket.socket(socket.AF_INET)) + s.connect(("svn.python.org", 443)) + try: + cert = s.getpeercert() + self.assertTrue(cert) finally: s.close() @@ -161,12 +615,9 @@ class NetworkedTests(unittest.TestCase): # Closing the SSL socket should close the fd too ss.close() gc.collect() - try: + with self.assertRaises(OSError) as e: os.read(fd, 0) - except OSError as e: - self.assertEqual(e.errno, errno.EBADF) - else: - self.fail("OSError wasn't raised") + self.assertEqual(e.exception.errno, errno.EBADF) def test_non_blocking_handshake(self): with support.transient_internet("svn.python.org"): @@ -214,19 +665,30 @@ class NetworkedTests(unittest.TestCase): if support.verbose: sys.stdout.write("\nVerified certificate for svn.python.org:443 is\n%s\n" % pem) - # Test disabled: OPENSSL_VERSION* not available in Python 3.1 + def test_ciphers(self): + remote = ("svn.python.org", 443) + with support.transient_internet(remote[0]): + s = ssl.wrap_socket(socket.socket(socket.AF_INET), + cert_reqs=ssl.CERT_NONE, ciphers="ALL") + s.connect(remote) + s = ssl.wrap_socket(socket.socket(socket.AF_INET), + cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT") + s.connect(remote) + # Error checking can happen at instantiation or when connecting + with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"): + with socket.socket(socket.AF_INET) as sock: + s = ssl.wrap_socket(sock, + cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx") + s.connect(remote) + def test_algorithms(self): - if support.verbose: - sys.stdout.write("test_algorithms disabled, " - "as it fails on some old OpenSSL versions") - return # Issue #8484: all algorithms should be available when verifying a # certificate. # SHA256 was added in OpenSSL 0.9.8 if ssl.OPENSSL_VERSION_INFO < (0, 9, 8, 0, 15): self.skipTest("SHA256 not available on %r" % ssl.OPENSSL_VERSION) - # NOTE: https://sha256.tbs-internet.com is another possible test host - remote = ("sha2.hboeck.de", 443) + # https://sha2.hboeck.de/ was used until 2011-01-08 (no route to host) + remote = ("sha256.tbs-internet.com", 443) sha256_cert = os.path.join(os.path.dirname(__file__), "sha256.pem") with support.transient_internet("sha256.tbs-internet.com"): s = ssl.wrap_socket(socket.socket(socket.AF_INET), @@ -250,6 +712,8 @@ except ImportError: else: _have_threads = True + from test.ssl_servers import make_https_server + class ThreadedEchoServer(threading.Thread): class ConnectionHandler(threading.Thread): @@ -270,11 +734,8 @@ else: def wrap_conn(self): try: - self.sslconn = ssl.wrap_socket(self.sock, server_side=True, - certfile=self.server.certificate, - ssl_version=self.server.protocol, - ca_certs=self.server.cacerts, - cert_reqs=self.server.certreqs) + self.sslconn = self.server.context.wrap_socket( + self.sock, server_side=True) except ssl.SSLError: # XXX Various errors can have happened here, for example # a mismatching protocol version, an invalid certificate, @@ -286,7 +747,7 @@ else: self.close() return False else: - if self.server.certreqs == ssl.CERT_REQUIRED: + if self.server.context.verify_mode == ssl.CERT_REQUIRED: cert = self.sslconn.getpeercert() if support.verbose and self.server.chatty: sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n") @@ -366,17 +827,24 @@ else: # harness, we want to stop the server self.server.stop() - def __init__(self, certificate, ssl_version=None, + def __init__(self, certificate=None, ssl_version=None, certreqs=None, cacerts=None, - chatty=True, connectionchatty=False, starttls_server=False): - if ssl_version is None: - ssl_version = ssl.PROTOCOL_TLSv1 - if certreqs is None: - certreqs = ssl.CERT_NONE - self.certificate = certificate - self.protocol = ssl_version - self.certreqs = certreqs - self.cacerts = cacerts + chatty=True, connectionchatty=False, starttls_server=False, + ciphers=None, context=None): + if context: + self.context = context + else: + self.context = ssl.SSLContext(ssl_version + if ssl_version is not None + else ssl.PROTOCOL_TLSv1) + self.context.verify_mode = (certreqs if certreqs is not None + else ssl.CERT_NONE) + if cacerts: + self.context.load_verify_locations(cacerts) + if certificate: + self.context.load_cert_chain(certificate) + if ciphers: + self.context.set_ciphers(ciphers) self.chatty = chatty self.connectionchatty = connectionchatty self.starttls_server = starttls_server @@ -415,98 +883,6 @@ else: def stop(self): self.active = False - class OurHTTPSServer(threading.Thread): - - # This one's based on HTTPServer, which is based on SocketServer - - class HTTPSServer(HTTPServer): - - def __init__(self, server_address, RequestHandlerClass, certfile): - HTTPServer.__init__(self, server_address, RequestHandlerClass) - # we assume the certfile contains both private key and certificate - self.certfile = certfile - self.allow_reuse_address = True - - def __str__(self): - return ('<%s %s:%s>' % - (self.__class__.__name__, - self.server_name, - self.server_port)) - - def get_request(self): - # override this to wrap socket with SSL - sock, addr = self.socket.accept() - sslconn = ssl.wrap_socket(sock, server_side=True, - certfile=self.certfile) - return sslconn, addr - - class RootedHTTPRequestHandler(SimpleHTTPRequestHandler): - # need to override translate_path to get a known root, - # instead of using os.curdir, since the test could be - # run from anywhere - - server_version = "TestHTTPS/1.0" - - root = None - - def translate_path(self, path): - """Translate a /-separated PATH to the local filename syntax. - - Components that mean special things to the local file system - (e.g. drive or directory names) are ignored. (XXX They should - probably be diagnosed.) - - """ - # abandon query parameters - path = urllib.parse.urlparse(path)[2] - path = os.path.normpath(urllib.parse.unquote(path)) - words = path.split('/') - words = filter(None, words) - path = self.root - for word in words: - drive, word = os.path.splitdrive(word) - head, word = os.path.split(word) - if word in self.root: continue - path = os.path.join(path, word) - return path - - def log_message(self, format, *args): - # we override this to suppress logging unless "verbose" - - if support.verbose: - sys.stdout.write(" server (%s:%d %s):\n [%s] %s\n" % - (self.server.server_address, - self.server.server_port, - self.request.cipher(), - self.log_date_time_string(), - format%args)) - - - def __init__(self, certfile): - self.flag = None - self.RootedHTTPRequestHandler.root = os.path.split(CERTFILE)[0] - self.server = self.HTTPSServer( - (HOST, 0), self.RootedHTTPRequestHandler, certfile) - self.port = self.server.server_port - threading.Thread.__init__(self) - self.daemon = True - - def __str__(self): - return "<%s %s>" % (self.__class__.__name__, self.server) - - def start(self, flag=None): - self.flag = flag - threading.Thread.start(self) - - def run(self): - if self.flag: - self.flag.set() - self.server.serve_forever(0.05) - - def stop(self): - self.server.shutdown() - - class AsyncoreEchoServer(threading.Thread): # this one's based on asyncore.dispatcher @@ -572,8 +948,7 @@ else: asyncore.dispatcher.__init__(self, sock) self.listen(5) - def handle_accept(self): - sock_obj, addr = self.accept() + def handle_accepted(self, sock_obj, addr): if support.verbose: sys.stdout.write(" server: new connection from %s:%s\n" %addr) self.ConnectionHandler(sock_obj, self.certfile) @@ -626,33 +1001,35 @@ else: # try to connect try: try: - s = ssl.wrap_socket(socket.socket(), - certfile=certfile, - ssl_version=ssl.PROTOCOL_TLSv1) - s.connect((HOST, server.port)) + with socket.socket() as sock: + s = ssl.wrap_socket(sock, + certfile=certfile, + ssl_version=ssl.PROTOCOL_TLSv1) + s.connect((HOST, server.port)) except ssl.SSLError as x: if support.verbose: sys.stdout.write("\nSSLError is %s\n" % x.args[1]) except socket.error as x: if support.verbose: sys.stdout.write("\nsocket.error is %s\n" % x.args[1]) + except IOError as x: + if x.errno != errno.ENOENT: + raise + if support.verbose: + sys.stdout.write("\IOError is %s\n" % str(x)) else: - self.fail("Use of invalid cert should have failed!") + raise AssertionError("Use of invalid cert should have failed!") finally: server.stop() server.join() - def server_params_test(certfile, protocol, certreqs, cacertsfile, - client_certfile, client_protocol=None, indata=b"FOO\n", + def server_params_test(client_context, server_context, indata=b"FOO\n", chatty=True, connectionchatty=False): """ Launch a server, connect a client to it and try various reads and writes. """ - server = ThreadedEchoServer(certfile, - certreqs=certreqs, - ssl_version=protocol, - cacerts=cacertsfile, + server = ThreadedEchoServer(context=server_context, chatty=chatty, connectionchatty=False) flag = threading.Event() @@ -660,30 +1037,24 @@ else: # wait for it to start flag.wait() # try to connect - if client_protocol is None: - client_protocol = protocol try: - s = ssl.wrap_socket(socket.socket(), - certfile=client_certfile, - ca_certs=cacertsfile, - cert_reqs=certreqs, - ssl_version=client_protocol) + s = client_context.wrap_socket(socket.socket()) s.connect((HOST, server.port)) - arg = indata - if connectionchatty: - if support.verbose: - sys.stdout.write( - " client: sending %r...\n" % indata) - s.write(arg) - outdata = s.read() - if connectionchatty: - if support.verbose: - sys.stdout.write(" client: read %r\n" % outdata) - if outdata != indata.lower(): - self.fail( - "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n" - % (outdata[:20], len(outdata), - indata[:20].lower(), len(indata))) + for arg in [indata, bytearray(indata), memoryview(indata)]: + if connectionchatty: + if support.verbose: + sys.stdout.write( + " client: sending %r...\n" % indata) + s.write(arg) + outdata = s.read() + if connectionchatty: + if support.verbose: + sys.stdout.write(" client: read %r\n" % outdata) + if outdata != indata.lower(): + raise AssertionError( + "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n" + % (outdata[:20], len(outdata), + indata[:20].lower(), len(indata))) s.write(b"over\n") if connectionchatty: if support.verbose: @@ -693,10 +1064,8 @@ else: server.stop() server.join() - def try_protocol_combo(server_protocol, - client_protocol, - expect_success, - certsreqs=None): + def try_protocol_combo(server_protocol, client_protocol, expect_success, + certsreqs=None, server_options=0, client_options=0): if certsreqs is None: certsreqs = ssl.CERT_NONE certtype = { @@ -710,11 +1079,21 @@ else: (ssl.get_protocol_name(client_protocol), ssl.get_protocol_name(server_protocol), certtype)) + client_context = ssl.SSLContext(client_protocol) + client_context.options = ssl.OP_ALL | client_options + server_context = ssl.SSLContext(server_protocol) + server_context.options = ssl.OP_ALL | server_options + for ctx in (client_context, server_context): + ctx.verify_mode = certsreqs + # NOTE: we must enable "ALL" ciphers, otherwise an SSLv23 client + # will send an SSLv3 hello (rather than SSLv2) starting from + # OpenSSL 1.0.0 (see issue #8322). + ctx.set_ciphers("ALL") + ctx.load_cert_chain(CERTFILE) + ctx.load_verify_locations(CERTFILE) try: - server_params_test(CERTFILE, server_protocol, certsreqs, - CERTFILE, CERTFILE, client_protocol, - chatty=False, - connectionchatty=False) + server_params_test(client_context, server_context, + chatty=False, connectionchatty=False) # Protocol mismatch can result in either an SSLError, or a # "Connection reset by peer" error. except ssl.SSLError: @@ -733,34 +1112,32 @@ else: class ThreadedTests(unittest.TestCase): + @skip_if_broken_ubuntu_ssl def test_echo(self): """Basic test of an SSL client connecting to a server""" if support.verbose: sys.stdout.write("\n") - server_params_test(CERTFILE, ssl.PROTOCOL_TLSv1, ssl.CERT_NONE, - CERTFILE, CERTFILE, ssl.PROTOCOL_TLSv1, - chatty=True, connectionchatty=True) + for protocol in PROTOCOLS: + context = ssl.SSLContext(protocol) + context.load_cert_chain(CERTFILE) + server_params_test(context, context, + chatty=True, connectionchatty=True) def test_getpeercert(self): if support.verbose: sys.stdout.write("\n") - s2 = socket.socket() - server = ThreadedEchoServer(CERTFILE, - certreqs=ssl.CERT_NONE, - ssl_version=ssl.PROTOCOL_SSLv23, - cacerts=CERTFILE, - chatty=False) + context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + context.verify_mode = ssl.CERT_REQUIRED + context.load_verify_locations(CERTFILE) + context.load_cert_chain(CERTFILE) + server = ThreadedEchoServer(context=context, chatty=False) flag = threading.Event() server.start(flag) # wait for it to start flag.wait() # try to connect try: - s = ssl.wrap_socket(socket.socket(), - certfile=CERTFILE, - ca_certs=CERTFILE, - cert_reqs=ssl.CERT_REQUIRED, - ssl_version=ssl.PROTOCOL_SSLv23) + s = context.wrap_socket(socket.socket()) s.connect((HOST, server.port)) cert = s.getpeercert() self.assertTrue(cert, "Can't get peer certificate.") @@ -776,6 +1153,11 @@ else: self.fail( "Missing or invalid 'organizationName' field in certificate subject; " "should be 'Python Software Foundation'.") + self.assertIn('notBefore', cert) + self.assertIn('notAfter', cert) + before = ssl.cert_time_to_seconds(cert['notBefore']) + after = ssl.cert_time_to_seconds(cert['notAfter']) + self.assertLess(before, after) s.close() finally: server.stop() @@ -815,21 +1197,22 @@ else: def listener(): s.listen(5) listener_ready.set() - s.accept() + newsock, addr = s.accept() + newsock.close() s.close() listener_gone.set() def connector(): listener_ready.wait() - c = socket.socket() - c.connect((HOST, port)) - listener_gone.wait() - try: - ssl_sock = ssl.wrap_socket(c) - except IOError: - pass - else: - self.fail('connecting to closed SSL socket should have failed') + with socket.socket() as c: + c.connect((HOST, port)) + listener_gone.wait() + try: + ssl_sock = ssl.wrap_socket(c) + except IOError: + pass + else: + self.fail('connecting to closed SSL socket should have failed') t = threading.Thread(target=listener) t.start() @@ -838,26 +1221,33 @@ else: finally: t.join() + @skip_if_broken_ubuntu_ssl @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv2'), "need SSLv2") def test_protocol_sslv2(self): """Connecting to an SSLv2 server with various client options""" if support.verbose: - sys.stdout.write("\ntest_protocol_sslv2 disabled, " - "as it fails on OpenSSL 1.0.0+") - return + sys.stdout.write("\n") try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True) try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL) try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED) try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, True) try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False) try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False) - + # SSLv23 client with specific SSL options + if no_sslv2_implies_sslv3_hello(): + # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs + try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False, + client_options=ssl.OP_NO_SSLv2) + try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, True, + client_options=ssl.OP_NO_SSLv3) + try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, True, + client_options=ssl.OP_NO_TLSv1) + + @skip_if_broken_ubuntu_ssl def test_protocol_sslv23(self): """Connecting to an SSLv23 server with various client options""" if support.verbose: - sys.stdout.write("\ntest_protocol_sslv23 disabled, " - "as it fails on OpenSSL 1.0.0+") - return + sys.stdout.write("\n") if hasattr(ssl, 'PROTOCOL_SSLv2'): try: try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv2, True) @@ -879,12 +1269,21 @@ else: try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_REQUIRED) try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True, ssl.CERT_REQUIRED) + # Server with specific SSL options + try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, False, + server_options=ssl.OP_NO_SSLv3) + # Will choose TLSv1 + try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, + server_options=ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3) + try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, False, + server_options=ssl.OP_NO_TLSv1) + + + @skip_if_broken_ubuntu_ssl def test_protocol_sslv3(self): """Connecting to an SSLv3 server with various client options""" if support.verbose: - sys.stdout.write("\ntest_protocol_sslv3 disabled, " - "as it fails on OpenSSL 1.0.0+") - return + sys.stdout.write("\n") try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True) try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True, ssl.CERT_OPTIONAL) try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True, ssl.CERT_REQUIRED) @@ -892,13 +1291,16 @@ else: try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv2, False) try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv23, False) try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False) + if no_sslv2_implies_sslv3_hello(): + # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs + try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv23, True, + client_options=ssl.OP_NO_SSLv2) + @skip_if_broken_ubuntu_ssl def test_protocol_tlsv1(self): """Connecting to a TLSv1 server with various client options""" if support.verbose: - sys.stdout.write("\ntest_protocol_tlsv1 disabled, " - "as it fails on OpenSSL 1.0.0+") - return + sys.stdout.write("\n") try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True) try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True, ssl.CERT_OPTIONAL) try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True, ssl.CERT_REQUIRED) @@ -975,22 +1377,18 @@ else: def test_socketserver(self): """Using a SocketServer to create and manage SSL connections.""" - server = OurHTTPSServer(CERTFILE) - flag = threading.Event() - server.start(flag) - # wait for it to start - flag.wait() + server = make_https_server(self, CERTFILE) # try to connect + if support.verbose: + sys.stdout.write('\n') + with open(CERTFILE, 'rb') as f: + d1 = f.read() + d2 = '' + # now fetch the same data from the HTTPS server + url = 'https://%s:%d/%s' % ( + HOST, server.port, os.path.split(CERTFILE)[1]) + f = urllib.request.urlopen(url) try: - if support.verbose: - sys.stdout.write('\n') - with open(CERTFILE, 'rb') as f: - d1 = f.read() - d2 = '' - # now fetch the same data from the HTTPS server - url = 'https://%s:%d/%s' % ( - HOST, server.port, os.path.split(CERTFILE)[1]) - f = urllib.request.urlopen(url) dlen = f.info().get("content-length") if dlen and (int(dlen) > 0): d2 = f.read(int(dlen)) @@ -998,15 +1396,9 @@ else: sys.stdout.write( " client: read %d bytes from remote server '%s'\n" % (len(d2), server)) - f.close() - self.assertEqual(d1, d2) finally: - if support.verbose: - sys.stdout.write('stopping server\n') - server.stop() - if support.verbose: - sys.stdout.write('joining thread\n') - server.join() + f.close() + self.assertEqual(d1, d2) def test_asyncore_server(self): """Check the example asyncore integration.""" @@ -1041,9 +1433,17 @@ else: if support.verbose: sys.stdout.write(" client: closing connection.\n") s.close() + if support.verbose: + sys.stdout.write(" client: connection closed.\n") finally: + if support.verbose: + sys.stdout.write(" cleanup: stopping server.\n") server.stop() + if support.verbose: + sys.stdout.write(" cleanup: joining server thread.\n") server.join() + if support.verbose: + sys.stdout.write(" cleanup: successfully joined.\n") def test_recv_send(self): """Test recv(), send() and friends.""" @@ -1100,7 +1500,7 @@ else: send_meth(indata, *args) outdata = s.read() if outdata != indata.lower(): - raise support.TestFailed( + self.fail( "While sending with <<{name:s}>> bad data " "<<{outdata:r}>> ({nout:d}) received; " "expected <<{indata:r}>> ({nin:d})\n".format( @@ -1111,12 +1511,12 @@ else: ) except ValueError as e: if expect_success: - raise support.TestFailed( + self.fail( "Failed to send with method <<{name:s}>>; " "expected to succeed.\n".format(name=meth_name) ) if not str(e).startswith(meth_name): - raise support.TestFailed( + self.fail( "Method <<{name:s}>> failed with unexpected " "exception message: {exp:s}\n".format( name=meth_name, exp=e @@ -1129,7 +1529,7 @@ else: s.send(indata) outdata = recv_meth(*args) if outdata != indata.lower(): - raise support.TestFailed( + self.fail( "While receiving with <<{name:s}>> bad data " "<<{outdata:r}>> ({nout:d}) received; " "expected <<{indata:r}>> ({nin:d})\n".format( @@ -1140,12 +1540,12 @@ else: ) except ValueError as e: if expect_success: - raise support.TestFailed( + self.fail( "Failed to receive with method <<{name:s}>>; " "expected to succeed.\n".format(name=meth_name) ) if not str(e).startswith(meth_name): - raise support.TestFailed( + self.fail( "Method <<{name:s}>> failed with unexpected " "exception message: {exp:s}\n".format( name=meth_name, exp=e @@ -1178,6 +1578,8 @@ else: # Let the socket hang around rather than having # it closed by garbage collection. conns.append(server.accept()[0]) + for sock in conns: + sock.close() t = threading.Thread(target=serve) t.start() @@ -1189,8 +1591,8 @@ else: c.settimeout(0.2) c.connect((host, port)) # Will attempt handshake and time out - self.assertRaisesRegexp(ssl.SSLError, "timed out", - ssl.wrap_socket, c) + self.assertRaisesRegex(socket.timeout, "timed out", + ssl.wrap_socket, c) finally: c.close() try: @@ -1198,8 +1600,8 @@ else: c = ssl.wrap_socket(c) c.settimeout(0.2) # Will attempt handshake and time out - self.assertRaisesRegexp(ssl.SSLError, "timed out", - c.connect, (host, port)) + self.assertRaisesRegex(socket.timeout, "timed out", + c.connect, (host, port)) finally: c.close() finally: @@ -1209,21 +1611,32 @@ else: def test_main(verbose=False): - if skip_expected: - raise unittest.SkipTest("No SSL support") - - global CERTFILE, SVN_PYTHON_ORG_ROOT_CERT - CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir, - "keycert.pem") - SVN_PYTHON_ORG_ROOT_CERT = os.path.join( - os.path.dirname(__file__) or os.curdir, - "https_svn_python_org_root.pem") - - if (not os.path.exists(CERTFILE) or - not os.path.exists(SVN_PYTHON_ORG_ROOT_CERT)): - raise support.TestFailed("Can't read certificate files!") - - tests = [BasicTests] + if support.verbose: + plats = { + 'Linux': platform.linux_distribution, + 'Mac': platform.mac_ver, + 'Windows': platform.win32_ver, + } + for name, func in plats.items(): + plat = func() + if plat and plat[0]: + plat = '%s %r' % (name, plat) + break + else: + plat = repr(platform.platform()) + print("test_ssl: testing with %r %r" % + (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO)) + print(" under %s" % plat) + print(" HAS_SNI = %r" % ssl.HAS_SNI) + + for filename in [ + CERTFILE, SVN_PYTHON_ORG_ROOT_CERT, BYTES_CERTFILE, + ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY, + BADCERT, BADKEY, EMPTYCERT]: + if not os.path.exists(filename): + raise support.TestFailed("Can't read certificate file %r" % filename) + + tests = [ContextTests, BasicSocketTests] if support.is_resource_enabled('network'): tests.append(NetworkedTests) |