diff options
Diffstat (limited to 'Lib/test/test_ssl.py')
-rw-r--r-- | Lib/test/test_ssl.py | 997 |
1 files changed, 592 insertions, 405 deletions
diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py index 5515583..ad30105 100644 --- a/Lib/test/test_ssl.py +++ b/Lib/test/test_ssl.py @@ -21,6 +21,13 @@ import functools ssl = support.import_module("ssl") +try: + import threading +except ImportError: + _have_threads = False +else: + _have_threads = True + PROTOCOLS = sorted(ssl._PROTOCOL_NAMES) HOST = support.HOST IS_LIBRESSL = ssl.OPENSSL_VERSION.startswith('LibreSSL') @@ -56,12 +63,12 @@ CRLFILE = data_file("revocation.crl") # Two keys and certs signed by the same CA (for SNI tests) SIGNED_CERTFILE = data_file("keycert3.pem") SIGNED_CERTFILE2 = data_file("keycert4.pem") -SIGNING_CA = data_file("pycacert.pem") +# Same certificate as pycacert.pem, but without extra text in file +SIGNING_CA = data_file("capath", "ceff1710.0") # cert with all kinds of subject alt names ALLSANFILE = data_file("allsans.pem") REMOTE_HOST = "self-signed.pythontest.net" -REMOTE_ROOT_CERT = data_file("selfsigned_pythontestdotnet.pem") EMPTYCERT = data_file("nullcert.pem") BADCERT = data_file("badcert.pem") @@ -73,6 +80,12 @@ NULLBYTECERT = data_file("nullbytecert.pem") DHFILE = data_file("dh1024.pem") BYTES_DHFILE = os.fsencode(DHFILE) +# Not defined in all versions of OpenSSL +OP_NO_COMPRESSION = getattr(ssl, "OP_NO_COMPRESSION", 0) +OP_SINGLE_DH_USE = getattr(ssl, "OP_SINGLE_DH_USE", 0) +OP_SINGLE_ECDH_USE = getattr(ssl, "OP_SINGLE_ECDH_USE", 0) +OP_CIPHER_SERVER_PREFERENCE = getattr(ssl, "OP_CIPHER_SERVER_PREFERENCE", 0) + def handle_error(prefix): exc_format = ' '.join(traceback.format_exception(*sys.exc_info())) @@ -130,6 +143,21 @@ def skip_if_broken_ubuntu_ssl(func): needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test") +def test_wrap_socket(sock, ssl_version=ssl.PROTOCOL_TLS, *, + cert_reqs=ssl.CERT_NONE, ca_certs=None, + ciphers=None, certfile=None, keyfile=None, + **kwargs): + context = ssl.SSLContext(ssl_version) + if cert_reqs is not None: + context.verify_mode = cert_reqs + if ca_certs is not None: + context.load_verify_locations(ca_certs) + if certfile is not None or keyfile is not None: + context.load_cert_chain(certfile, keyfile) + if ciphers is not None: + context.set_ciphers(ciphers) + return context.wrap_socket(sock, **kwargs) + class BasicSocketTests(unittest.TestCase): def test_constants(self): @@ -350,17 +378,17 @@ class BasicSocketTests(unittest.TestCase): # Issue #7943: an SSL object doesn't create reference cycles with # itself. s = socket.socket(socket.AF_INET) - ss = ssl.wrap_socket(s) + ss = test_wrap_socket(s) wr = weakref.ref(ss) with support.check_warnings(("", ResourceWarning)): del ss - self.assertEqual(wr(), None) + self.assertEqual(wr(), None) def test_wrapped_unconnected(self): # Methods on an unconnected SSLSocket propagate the original # OSError raise by the underlying socket object. s = socket.socket(socket.AF_INET) - with ssl.wrap_socket(s) as ss: + with test_wrap_socket(s) as ss: self.assertRaises(OSError, ss.recv, 1) self.assertRaises(OSError, ss.recv_into, bytearray(b'x')) self.assertRaises(OSError, ss.recvfrom, 1) @@ -374,10 +402,10 @@ class BasicSocketTests(unittest.TestCase): for timeout in (None, 0.0, 5.0): s = socket.socket(socket.AF_INET) s.settimeout(timeout) - with ssl.wrap_socket(s) as ss: + with test_wrap_socket(s) as ss: self.assertEqual(timeout, ss.gettimeout()) - def test_errors(self): + def test_errors_sslwrap(self): sock = socket.socket() self.assertRaisesRegex(ValueError, "certfile must be specified", @@ -387,10 +415,10 @@ class BasicSocketTests(unittest.TestCase): ssl.wrap_socket, sock, server_side=True) self.assertRaisesRegex(ValueError, "certfile must be specified for server-side operations", - ssl.wrap_socket, sock, server_side=True, certfile="") + ssl.wrap_socket, sock, server_side=True, certfile="") with ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE) as s: self.assertRaisesRegex(ValueError, "can't connect in server-side mode", - s.connect, (HOST, 8080)) + s.connect, (HOST, 8080)) with self.assertRaises(OSError) as cm: with socket.socket() as sock: ssl.wrap_socket(sock, certfile=NONEXISTINGCERT) @@ -413,7 +441,7 @@ class BasicSocketTests(unittest.TestCase): sock = socket.socket() self.addCleanup(sock.close) with self.assertRaises(ssl.SSLError): - ssl.wrap_socket(sock, + test_wrap_socket(sock, certfile=certfile, ssl_version=ssl.PROTOCOL_TLSv1) @@ -600,7 +628,7 @@ class BasicSocketTests(unittest.TestCase): s.listen() c = socket.socket(socket.AF_INET) c.connect(s.getsockname()) - with ssl.wrap_socket(c, do_handshake_on_connect=False) as ss: + with test_wrap_socket(c, do_handshake_on_connect=False) as ss: with self.assertRaises(ValueError): ss.get_channel_binding("unknown-type") s.close() @@ -610,15 +638,15 @@ class BasicSocketTests(unittest.TestCase): def test_tls_unique_channel_binding(self): # unconnected should return None for known type s = socket.socket(socket.AF_INET) - with ssl.wrap_socket(s) as ss: + with test_wrap_socket(s) as ss: self.assertIsNone(ss.get_channel_binding("tls-unique")) # the same for server-side s = socket.socket(socket.AF_INET) - with ssl.wrap_socket(s, server_side=True, certfile=CERTFILE) as ss: + with test_wrap_socket(s, server_side=True, certfile=CERTFILE) as ss: self.assertIsNone(ss.get_channel_binding("tls-unique")) def test_dealloc_warn(self): - ss = ssl.wrap_socket(socket.socket(socket.AF_INET)) + ss = test_wrap_socket(socket.socket(socket.AF_INET)) r = repr(ss) with self.assertWarns(ResourceWarning) as cm: ss = None @@ -737,7 +765,7 @@ class BasicSocketTests(unittest.TestCase): s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.addCleanup(s.close) with self.assertRaises(NotImplementedError) as cx: - ssl.wrap_socket(s, cert_reqs=ssl.CERT_NONE) + test_wrap_socket(s, cert_reqs=ssl.CERT_NONE) self.assertEqual(str(cx.exception), "only stream sockets are supported") ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) with self.assertRaises(NotImplementedError) as cx: @@ -809,6 +837,22 @@ class BasicSocketTests(unittest.TestCase): self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0) self.cert_time_fail(local_february_name() + " 9 00:00:00 2007 GMT") + def test_connect_ex_error(self): + server = socket.socket(socket.AF_INET) + self.addCleanup(server.close) + port = support.bind_port(server) # Reserve port but don't listen + s = test_wrap_socket(socket.socket(socket.AF_INET), + cert_reqs=ssl.CERT_REQUIRED) + self.addCleanup(s.close) + rc = s.connect_ex((HOST, port)) + # Issue #19919: Windows machines or VMs hosted on Windows + # machines sometimes return EWOULDBLOCK. + errors = ( + errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.ETIMEDOUT, + errno.EWOULDBLOCK, + ) + self.assertIn(rc, errors) + class ContextTests(unittest.TestCase): @@ -834,13 +878,22 @@ class ContextTests(unittest.TestCase): with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"): ctx.set_ciphers("^$:,;?*'dorothyx") + @unittest.skipIf(ssl.OPENSSL_VERSION_INFO < (1, 0, 2, 0, 0), 'OpenSSL too old') + def test_get_ciphers(self): + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + ctx.set_ciphers('AESGCM') + names = set(d['name'] for d in ctx.get_ciphers()) + self.assertIn('AES256-GCM-SHA384', names) + self.assertIn('AES128-GCM-SHA256', names) + @skip_if_broken_ubuntu_ssl def test_options(self): ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) # OP_ALL | OP_NO_SSLv2 | OP_NO_SSLv3 is the default value default = (ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3) - if not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0): - default |= ssl.OP_NO_COMPRESSION + # SSLContext also enables these by default + default |= (OP_NO_COMPRESSION | OP_CIPHER_SERVER_PREFERENCE | + OP_SINGLE_DH_USE | OP_SINGLE_ECDH_USE) self.assertEqual(default, ctx.options) ctx.options |= ssl.OP_NO_TLSv1 self.assertEqual(default | ssl.OP_NO_TLSv1, ctx.options) @@ -1205,16 +1258,29 @@ class ContextTests(unittest.TestCase): stats["x509"] += 1 self.assertEqual(ctx.cert_store_stats(), stats) + def _assert_context_options(self, ctx): + self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2) + if OP_NO_COMPRESSION != 0: + self.assertEqual(ctx.options & OP_NO_COMPRESSION, + OP_NO_COMPRESSION) + if OP_SINGLE_DH_USE != 0: + self.assertEqual(ctx.options & OP_SINGLE_DH_USE, + OP_SINGLE_DH_USE) + if OP_SINGLE_ECDH_USE != 0: + self.assertEqual(ctx.options & OP_SINGLE_ECDH_USE, + OP_SINGLE_ECDH_USE) + if OP_CIPHER_SERVER_PREFERENCE != 0: + self.assertEqual(ctx.options & OP_CIPHER_SERVER_PREFERENCE, + OP_CIPHER_SERVER_PREFERENCE) + def test_create_default_context(self): ctx = ssl.create_default_context() + self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23) self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED) self.assertTrue(ctx.check_hostname) - self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2) - self.assertEqual( - ctx.options & getattr(ssl, "OP_NO_COMPRESSION", 0), - getattr(ssl, "OP_NO_COMPRESSION", 0), - ) + self._assert_context_options(ctx) + with open(SIGNING_CA) as f: cadata = f.read() @@ -1222,40 +1288,24 @@ class ContextTests(unittest.TestCase): cadata=cadata) self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23) self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED) - self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2) - self.assertEqual( - ctx.options & getattr(ssl, "OP_NO_COMPRESSION", 0), - getattr(ssl, "OP_NO_COMPRESSION", 0), - ) + self._assert_context_options(ctx) ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23) self.assertEqual(ctx.verify_mode, ssl.CERT_NONE) - self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2) - self.assertEqual( - ctx.options & getattr(ssl, "OP_NO_COMPRESSION", 0), - getattr(ssl, "OP_NO_COMPRESSION", 0), - ) - self.assertEqual( - ctx.options & getattr(ssl, "OP_SINGLE_DH_USE", 0), - getattr(ssl, "OP_SINGLE_DH_USE", 0), - ) - self.assertEqual( - ctx.options & getattr(ssl, "OP_SINGLE_ECDH_USE", 0), - getattr(ssl, "OP_SINGLE_ECDH_USE", 0), - ) + self._assert_context_options(ctx) def test__create_stdlib_context(self): ctx = ssl._create_stdlib_context() self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23) self.assertEqual(ctx.verify_mode, ssl.CERT_NONE) self.assertFalse(ctx.check_hostname) - self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2) + self._assert_context_options(ctx) ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1) self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1) self.assertEqual(ctx.verify_mode, ssl.CERT_NONE) - self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2) + self._assert_context_options(ctx) ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1, cert_reqs=ssl.CERT_REQUIRED, @@ -1263,12 +1313,12 @@ class ContextTests(unittest.TestCase): self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1) self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED) self.assertTrue(ctx.check_hostname) - self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2) + self._assert_context_options(ctx) ctx = ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH) self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23) self.assertEqual(ctx.verify_mode, ssl.CERT_NONE) - self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2) + self._assert_context_options(ctx) def test_check_hostname(self): ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) @@ -1292,6 +1342,17 @@ class ContextTests(unittest.TestCase): ctx.check_hostname = False self.assertFalse(ctx.check_hostname) + def test_context_client_server(self): + # PROTOCOL_TLS_CLIENT has sane defaults + ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) + self.assertTrue(ctx.check_hostname) + self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED) + + # PROTOCOL_TLS_SERVER has different but also sane defaults + ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) + self.assertFalse(ctx.check_hostname) + self.assertEqual(ctx.verify_mode, ssl.CERT_NONE) + class SSLErrorTests(unittest.TestCase): @@ -1397,140 +1458,105 @@ class MemoryBIOTests(unittest.TestCase): self.assertRaises(TypeError, bio.write, 1) -class NetworkedTests(unittest.TestCase): +@unittest.skipUnless(_have_threads, "Needs threading module") +class SimpleBackgroundTests(unittest.TestCase): - def test_connect(self): - with support.transient_internet(REMOTE_HOST): - s = ssl.wrap_socket(socket.socket(socket.AF_INET), - cert_reqs=ssl.CERT_NONE) - try: - s.connect((REMOTE_HOST, 443)) - self.assertEqual({}, s.getpeercert()) - finally: - s.close() + """Tests that connect to a simple server running in the background""" - # this should fail because we have no verification certs - s = ssl.wrap_socket(socket.socket(socket.AF_INET), - cert_reqs=ssl.CERT_REQUIRED) - self.assertRaisesRegex(ssl.SSLError, "certificate verify failed", - s.connect, (REMOTE_HOST, 443)) - s.close() + def setUp(self): + server = ThreadedEchoServer(SIGNED_CERTFILE) + self.server_addr = (HOST, server.port) + server.__enter__() + self.addCleanup(server.__exit__, None, None, None) - # this should succeed because we specify the root cert - s = ssl.wrap_socket(socket.socket(socket.AF_INET), - cert_reqs=ssl.CERT_REQUIRED, - ca_certs=REMOTE_ROOT_CERT) - try: - s.connect((REMOTE_HOST, 443)) - self.assertTrue(s.getpeercert()) - finally: - s.close() + def test_connect(self): + with test_wrap_socket(socket.socket(socket.AF_INET), + cert_reqs=ssl.CERT_NONE) as s: + s.connect(self.server_addr) + self.assertEqual({}, s.getpeercert()) + self.assertFalse(s.server_side) + + # this should succeed because we specify the root cert + with test_wrap_socket(socket.socket(socket.AF_INET), + cert_reqs=ssl.CERT_REQUIRED, + ca_certs=SIGNING_CA) as s: + s.connect(self.server_addr) + self.assertTrue(s.getpeercert()) + self.assertFalse(s.server_side) + + def test_connect_fail(self): + # This should fail because we have no verification certs. Connection + # failure crashes ThreadedEchoServer, so run this in an independent + # test method. + s = test_wrap_socket(socket.socket(socket.AF_INET), + cert_reqs=ssl.CERT_REQUIRED) + self.addCleanup(s.close) + self.assertRaisesRegex(ssl.SSLError, "certificate verify failed", + s.connect, self.server_addr) def test_connect_ex(self): # Issue #11326: check connect_ex() implementation - with support.transient_internet(REMOTE_HOST): - s = ssl.wrap_socket(socket.socket(socket.AF_INET), - cert_reqs=ssl.CERT_REQUIRED, - ca_certs=REMOTE_ROOT_CERT) - try: - self.assertEqual(0, s.connect_ex((REMOTE_HOST, 443))) - self.assertTrue(s.getpeercert()) - finally: - s.close() + s = test_wrap_socket(socket.socket(socket.AF_INET), + cert_reqs=ssl.CERT_REQUIRED, + ca_certs=SIGNING_CA) + self.addCleanup(s.close) + self.assertEqual(0, s.connect_ex(self.server_addr)) + self.assertTrue(s.getpeercert()) def test_non_blocking_connect_ex(self): # Issue #11326: non-blocking connect_ex() should allow handshake # to proceed after the socket gets ready. - with support.transient_internet(REMOTE_HOST): - s = ssl.wrap_socket(socket.socket(socket.AF_INET), - cert_reqs=ssl.CERT_REQUIRED, - ca_certs=REMOTE_ROOT_CERT, - do_handshake_on_connect=False) + s = test_wrap_socket(socket.socket(socket.AF_INET), + cert_reqs=ssl.CERT_REQUIRED, + ca_certs=SIGNING_CA, + do_handshake_on_connect=False) + self.addCleanup(s.close) + s.setblocking(False) + rc = s.connect_ex(self.server_addr) + # EWOULDBLOCK under Windows, EINPROGRESS elsewhere + self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK)) + # Wait for connect to finish + select.select([], [s], [], 5.0) + # Non-blocking handshake + while True: try: - s.setblocking(False) - rc = s.connect_ex((REMOTE_HOST, 443)) - # EWOULDBLOCK under Windows, EINPROGRESS elsewhere - self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK)) - # Wait for connect to finish + s.do_handshake() + break + except ssl.SSLWantReadError: + select.select([s], [], [], 5.0) + except ssl.SSLWantWriteError: select.select([], [s], [], 5.0) - # Non-blocking handshake - while True: - try: - s.do_handshake() - break - except ssl.SSLWantReadError: - select.select([s], [], [], 5.0) - except ssl.SSLWantWriteError: - select.select([], [s], [], 5.0) - # SSL established - self.assertTrue(s.getpeercert()) - finally: - s.close() - - def test_timeout_connect_ex(self): - # Issue #12065: on a timeout, connect_ex() should return the original - # errno (mimicking the behaviour of non-SSL sockets). - with support.transient_internet(REMOTE_HOST): - s = ssl.wrap_socket(socket.socket(socket.AF_INET), - cert_reqs=ssl.CERT_REQUIRED, - ca_certs=REMOTE_ROOT_CERT, - do_handshake_on_connect=False) - try: - s.settimeout(0.0000001) - rc = s.connect_ex((REMOTE_HOST, 443)) - if rc == 0: - self.skipTest("REMOTE_HOST responded too quickly") - self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK)) - finally: - s.close() - - def test_connect_ex_error(self): - with support.transient_internet(REMOTE_HOST): - s = ssl.wrap_socket(socket.socket(socket.AF_INET), - cert_reqs=ssl.CERT_REQUIRED, - ca_certs=REMOTE_ROOT_CERT) - try: - rc = s.connect_ex((REMOTE_HOST, 444)) - # Issue #19919: Windows machines or VMs hosted on Windows - # machines sometimes return EWOULDBLOCK. - errors = ( - errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.ETIMEDOUT, - errno.EWOULDBLOCK, - ) - self.assertIn(rc, errors) - finally: - s.close() + # SSL established + self.assertTrue(s.getpeercert()) def test_connect_with_context(self): - with support.transient_internet(REMOTE_HOST): - # Same as test_connect, but with a separately created context - ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) - s = ctx.wrap_socket(socket.socket(socket.AF_INET)) - s.connect((REMOTE_HOST, 443)) - try: - self.assertEqual({}, s.getpeercert()) - finally: - s.close() - # Same with a server hostname - s = ctx.wrap_socket(socket.socket(socket.AF_INET), - server_hostname=REMOTE_HOST) - s.connect((REMOTE_HOST, 443)) - s.close() - # This should fail because we have no verification certs - ctx.verify_mode = ssl.CERT_REQUIRED - s = ctx.wrap_socket(socket.socket(socket.AF_INET)) - self.assertRaisesRegex(ssl.SSLError, "certificate verify failed", - s.connect, (REMOTE_HOST, 443)) - s.close() - # This should succeed because we specify the root cert - ctx.load_verify_locations(REMOTE_ROOT_CERT) - s = ctx.wrap_socket(socket.socket(socket.AF_INET)) - s.connect((REMOTE_HOST, 443)) - try: - cert = s.getpeercert() - self.assertTrue(cert) - finally: - s.close() + # Same as test_connect, but with a separately created context + ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s: + s.connect(self.server_addr) + self.assertEqual({}, s.getpeercert()) + # Same with a server hostname + with ctx.wrap_socket(socket.socket(socket.AF_INET), + server_hostname="dummy") as s: + s.connect(self.server_addr) + ctx.verify_mode = ssl.CERT_REQUIRED + # This should succeed because we specify the root cert + ctx.load_verify_locations(SIGNING_CA) + with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s: + s.connect(self.server_addr) + cert = s.getpeercert() + self.assertTrue(cert) + + def test_connect_with_context_fail(self): + # This should fail because we have no verification certs. Connection + # failure crashes ThreadedEchoServer, so run this in an independent + # test method. + ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + ctx.verify_mode = ssl.CERT_REQUIRED + s = ctx.wrap_socket(socket.socket(socket.AF_INET)) + self.addCleanup(s.close) + self.assertRaisesRegex(ssl.SSLError, "certificate verify failed", + s.connect, self.server_addr) def test_connect_capath(self): # Verify server certificates using the `capath` argument @@ -1538,198 +1564,130 @@ class NetworkedTests(unittest.TestCase): # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must # contain both versions of each certificate (same content, different # filename) for this test to be portable across OpenSSL releases. - with support.transient_internet(REMOTE_HOST): - ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) - ctx.verify_mode = ssl.CERT_REQUIRED - ctx.load_verify_locations(capath=CAPATH) - s = ctx.wrap_socket(socket.socket(socket.AF_INET)) - s.connect((REMOTE_HOST, 443)) - try: - cert = s.getpeercert() - self.assertTrue(cert) - finally: - s.close() - # Same with a bytes `capath` argument - ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) - ctx.verify_mode = ssl.CERT_REQUIRED - ctx.load_verify_locations(capath=BYTES_CAPATH) - s = ctx.wrap_socket(socket.socket(socket.AF_INET)) - s.connect((REMOTE_HOST, 443)) - try: - cert = s.getpeercert() - self.assertTrue(cert) - finally: - s.close() + ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + ctx.verify_mode = ssl.CERT_REQUIRED + ctx.load_verify_locations(capath=CAPATH) + with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s: + s.connect(self.server_addr) + cert = s.getpeercert() + self.assertTrue(cert) + # Same with a bytes `capath` argument + ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + ctx.verify_mode = ssl.CERT_REQUIRED + ctx.load_verify_locations(capath=BYTES_CAPATH) + with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s: + s.connect(self.server_addr) + cert = s.getpeercert() + self.assertTrue(cert) def test_connect_cadata(self): - with open(REMOTE_ROOT_CERT) as f: + with open(SIGNING_CA) as f: pem = f.read() der = ssl.PEM_cert_to_DER_cert(pem) - with support.transient_internet(REMOTE_HOST): - ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) - ctx.verify_mode = ssl.CERT_REQUIRED - ctx.load_verify_locations(cadata=pem) - with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s: - s.connect((REMOTE_HOST, 443)) - cert = s.getpeercert() - self.assertTrue(cert) + ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + ctx.verify_mode = ssl.CERT_REQUIRED + ctx.load_verify_locations(cadata=pem) + with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s: + s.connect(self.server_addr) + cert = s.getpeercert() + self.assertTrue(cert) - # same with DER - ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) - ctx.verify_mode = ssl.CERT_REQUIRED - ctx.load_verify_locations(cadata=der) - with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s: - s.connect((REMOTE_HOST, 443)) - cert = s.getpeercert() - self.assertTrue(cert) + # same with DER + ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + ctx.verify_mode = ssl.CERT_REQUIRED + ctx.load_verify_locations(cadata=der) + with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s: + s.connect(self.server_addr) + cert = s.getpeercert() + self.assertTrue(cert) @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows") def test_makefile_close(self): # Issue #5238: creating a file-like object with makefile() shouldn't # delay closing the underlying "real socket" (here tested with its # file descriptor, hence skipping the test under Windows). - with support.transient_internet(REMOTE_HOST): - ss = ssl.wrap_socket(socket.socket(socket.AF_INET)) - ss.connect((REMOTE_HOST, 443)) - fd = ss.fileno() - f = ss.makefile() - f.close() - # The fd is still open + ss = test_wrap_socket(socket.socket(socket.AF_INET)) + ss.connect(self.server_addr) + fd = ss.fileno() + f = ss.makefile() + f.close() + # The fd is still open + os.read(fd, 0) + # Closing the SSL socket should close the fd too + ss.close() + gc.collect() + with self.assertRaises(OSError) as e: os.read(fd, 0) - # Closing the SSL socket should close the fd too - ss.close() - gc.collect() - with self.assertRaises(OSError) as e: - os.read(fd, 0) - self.assertEqual(e.exception.errno, errno.EBADF) + self.assertEqual(e.exception.errno, errno.EBADF) def test_non_blocking_handshake(self): - with support.transient_internet(REMOTE_HOST): - s = socket.socket(socket.AF_INET) - s.connect((REMOTE_HOST, 443)) - s.setblocking(False) - s = ssl.wrap_socket(s, - cert_reqs=ssl.CERT_NONE, - do_handshake_on_connect=False) - count = 0 - while True: - try: - count += 1 - s.do_handshake() - break - except ssl.SSLWantReadError: - select.select([s], [], []) - except ssl.SSLWantWriteError: - select.select([], [s], []) - s.close() - if support.verbose: - sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count) + s = socket.socket(socket.AF_INET) + s.connect(self.server_addr) + s.setblocking(False) + s = test_wrap_socket(s, + cert_reqs=ssl.CERT_NONE, + do_handshake_on_connect=False) + self.addCleanup(s.close) + count = 0 + while True: + try: + count += 1 + s.do_handshake() + break + except ssl.SSLWantReadError: + select.select([s], [], []) + except ssl.SSLWantWriteError: + select.select([], [s], []) + if support.verbose: + sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count) def test_get_server_certificate(self): - def _test_get_server_certificate(host, port, cert=None): - with support.transient_internet(host): - pem = ssl.get_server_certificate((host, port)) - if not pem: - self.fail("No server certificate on %s:%s!" % (host, port)) + _test_get_server_certificate(self, *self.server_addr, cert=SIGNING_CA) - try: - pem = ssl.get_server_certificate((host, port), - ca_certs=CERTFILE) - except ssl.SSLError as x: - #should fail - if support.verbose: - sys.stdout.write("%s\n" % x) - else: - self.fail("Got server certificate %s for %s:%s!" % (pem, host, port)) - - pem = ssl.get_server_certificate((host, port), - ca_certs=cert) - if not pem: - self.fail("No server certificate on %s:%s!" % (host, port)) - if support.verbose: - sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port ,pem)) - - _test_get_server_certificate(REMOTE_HOST, 443, REMOTE_ROOT_CERT) - if support.IPV6_ENABLED: - _test_get_server_certificate('ipv6.google.com', 443) + def test_get_server_certificate_fail(self): + # Connection failure crashes ThreadedEchoServer, so run this in an + # independent test method + _test_get_server_certificate_fail(self, *self.server_addr) def test_ciphers(self): - remote = (REMOTE_HOST, 443) - with support.transient_internet(remote[0]): - with ssl.wrap_socket(socket.socket(socket.AF_INET), - cert_reqs=ssl.CERT_NONE, ciphers="ALL") as s: - s.connect(remote) - with ssl.wrap_socket(socket.socket(socket.AF_INET), - cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT") as s: - s.connect(remote) - # Error checking can happen at instantiation or when connecting - with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"): - with socket.socket(socket.AF_INET) as sock: - s = ssl.wrap_socket(sock, - cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx") - s.connect(remote) - - def test_algorithms(self): - # Issue #8484: all algorithms should be available when verifying a - # certificate. - # SHA256 was added in OpenSSL 0.9.8 - if ssl.OPENSSL_VERSION_INFO < (0, 9, 8, 0, 15): - self.skipTest("SHA256 not available on %r" % ssl.OPENSSL_VERSION) - # sha256.tbs-internet.com needs SNI to use the correct certificate - if not ssl.HAS_SNI: - self.skipTest("SNI needed for this test") - # https://sha2.hboeck.de/ was used until 2011-01-08 (no route to host) - remote = ("sha256.tbs-internet.com", 443) - sha256_cert = os.path.join(os.path.dirname(__file__), "sha256.pem") - with support.transient_internet("sha256.tbs-internet.com"): - ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) - ctx.verify_mode = ssl.CERT_REQUIRED - ctx.load_verify_locations(sha256_cert) - s = ctx.wrap_socket(socket.socket(socket.AF_INET), - server_hostname="sha256.tbs-internet.com") - try: - s.connect(remote) - if support.verbose: - sys.stdout.write("\nCipher with %r is %r\n" % - (remote, s.cipher())) - sys.stdout.write("Certificate is:\n%s\n" % - pprint.pformat(s.getpeercert())) - finally: - s.close() + with test_wrap_socket(socket.socket(socket.AF_INET), + cert_reqs=ssl.CERT_NONE, ciphers="ALL") as s: + s.connect(self.server_addr) + with test_wrap_socket(socket.socket(socket.AF_INET), + cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT") as s: + s.connect(self.server_addr) + # Error checking can happen at instantiation or when connecting + with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"): + with socket.socket(socket.AF_INET) as sock: + s = test_wrap_socket(sock, + cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx") + s.connect(self.server_addr) def test_get_ca_certs_capath(self): # capath certs are loaded on request - with support.transient_internet(REMOTE_HOST): - ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) - ctx.verify_mode = ssl.CERT_REQUIRED - ctx.load_verify_locations(capath=CAPATH) - self.assertEqual(ctx.get_ca_certs(), []) - s = ctx.wrap_socket(socket.socket(socket.AF_INET)) - s.connect((REMOTE_HOST, 443)) - try: - cert = s.getpeercert() - self.assertTrue(cert) - finally: - s.close() - self.assertEqual(len(ctx.get_ca_certs()), 1) + ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + ctx.verify_mode = ssl.CERT_REQUIRED + ctx.load_verify_locations(capath=CAPATH) + self.assertEqual(ctx.get_ca_certs(), []) + with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s: + s.connect(self.server_addr) + cert = s.getpeercert() + self.assertTrue(cert) + self.assertEqual(len(ctx.get_ca_certs()), 1) @needs_sni def test_context_setget(self): # Check that the context of a connected socket can be replaced. - with support.transient_internet(REMOTE_HOST): - ctx1 = ssl.SSLContext(ssl.PROTOCOL_TLSv1) - ctx2 = ssl.SSLContext(ssl.PROTOCOL_SSLv23) - s = socket.socket(socket.AF_INET) - with ctx1.wrap_socket(s) as ss: - ss.connect((REMOTE_HOST, 443)) - self.assertIs(ss.context, ctx1) - self.assertIs(ss._sslobj.context, ctx1) - ss.context = ctx2 - self.assertIs(ss.context, ctx2) - self.assertIs(ss._sslobj.context, ctx2) - - -class NetworkedBIOTests(unittest.TestCase): + ctx1 = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + ctx2 = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + s = socket.socket(socket.AF_INET) + with ctx1.wrap_socket(s) as ss: + ss.connect(self.server_addr) + self.assertIs(ss.context, ctx1) + self.assertIs(ss._sslobj.context, ctx1) + ss.context = ctx2 + self.assertIs(ss.context, ctx2) + self.assertIs(ss._sslobj.context, ctx2) def ssl_io_loop(self, sock, incoming, outgoing, func, *args, **kwargs): # A simple IO loop. Call func(*args) depending on the error we get @@ -1765,64 +1723,128 @@ class NetworkedBIOTests(unittest.TestCase): % (count, func.__name__)) return ret - def test_handshake(self): + def test_bio_handshake(self): + sock = socket.socket(socket.AF_INET) + self.addCleanup(sock.close) + sock.connect(self.server_addr) + incoming = ssl.MemoryBIO() + outgoing = ssl.MemoryBIO() + ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + ctx.verify_mode = ssl.CERT_REQUIRED + ctx.load_verify_locations(SIGNING_CA) + ctx.check_hostname = True + sslobj = ctx.wrap_bio(incoming, outgoing, False, 'localhost') + self.assertIs(sslobj._sslobj.owner, sslobj) + self.assertIsNone(sslobj.cipher()) + self.assertIsNotNone(sslobj.shared_ciphers()) + self.assertRaises(ValueError, sslobj.getpeercert) + if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES: + self.assertIsNone(sslobj.get_channel_binding('tls-unique')) + self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake) + self.assertTrue(sslobj.cipher()) + self.assertIsNotNone(sslobj.shared_ciphers()) + self.assertTrue(sslobj.getpeercert()) + if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES: + self.assertTrue(sslobj.get_channel_binding('tls-unique')) + try: + self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap) + except ssl.SSLSyscallError: + # If the server shuts down the TCP connection without sending a + # secure shutdown message, this is reported as SSL_ERROR_SYSCALL + pass + self.assertRaises(ssl.SSLError, sslobj.write, b'foo') + + def test_bio_read_write_data(self): + sock = socket.socket(socket.AF_INET) + self.addCleanup(sock.close) + sock.connect(self.server_addr) + incoming = ssl.MemoryBIO() + outgoing = ssl.MemoryBIO() + ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + ctx.verify_mode = ssl.CERT_NONE + sslobj = ctx.wrap_bio(incoming, outgoing, False) + self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake) + req = b'FOO\n' + self.ssl_io_loop(sock, incoming, outgoing, sslobj.write, req) + buf = self.ssl_io_loop(sock, incoming, outgoing, sslobj.read, 1024) + self.assertEqual(buf, b'foo\n') + self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap) + + +class NetworkedTests(unittest.TestCase): + + def test_timeout_connect_ex(self): + # Issue #12065: on a timeout, connect_ex() should return the original + # errno (mimicking the behaviour of non-SSL sockets). with support.transient_internet(REMOTE_HOST): - sock = socket.socket(socket.AF_INET) - sock.connect((REMOTE_HOST, 443)) - incoming = ssl.MemoryBIO() - outgoing = ssl.MemoryBIO() - ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + s = test_wrap_socket(socket.socket(socket.AF_INET), + cert_reqs=ssl.CERT_REQUIRED, + do_handshake_on_connect=False) + self.addCleanup(s.close) + s.settimeout(0.0000001) + rc = s.connect_ex((REMOTE_HOST, 443)) + if rc == 0: + self.skipTest("REMOTE_HOST responded too quickly") + self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK)) + + @unittest.skipUnless(support.IPV6_ENABLED, 'Needs IPv6') + def test_get_server_certificate_ipv6(self): + with support.transient_internet('ipv6.google.com'): + _test_get_server_certificate(self, 'ipv6.google.com', 443) + _test_get_server_certificate_fail(self, 'ipv6.google.com', 443) + + def test_algorithms(self): + # Issue #8484: all algorithms should be available when verifying a + # certificate. + # SHA256 was added in OpenSSL 0.9.8 + if ssl.OPENSSL_VERSION_INFO < (0, 9, 8, 0, 15): + self.skipTest("SHA256 not available on %r" % ssl.OPENSSL_VERSION) + # sha256.tbs-internet.com needs SNI to use the correct certificate + if not ssl.HAS_SNI: + self.skipTest("SNI needed for this test") + # https://sha2.hboeck.de/ was used until 2011-01-08 (no route to host) + remote = ("sha256.tbs-internet.com", 443) + sha256_cert = os.path.join(os.path.dirname(__file__), "sha256.pem") + with support.transient_internet("sha256.tbs-internet.com"): + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) ctx.verify_mode = ssl.CERT_REQUIRED - ctx.load_verify_locations(REMOTE_ROOT_CERT) - ctx.check_hostname = True - sslobj = ctx.wrap_bio(incoming, outgoing, False, REMOTE_HOST) - self.assertIs(sslobj._sslobj.owner, sslobj) - self.assertIsNone(sslobj.cipher()) - self.assertIsNotNone(sslobj.shared_ciphers()) - self.assertRaises(ValueError, sslobj.getpeercert) - if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES: - self.assertIsNone(sslobj.get_channel_binding('tls-unique')) - self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake) - self.assertTrue(sslobj.cipher()) - self.assertIsNotNone(sslobj.shared_ciphers()) - self.assertTrue(sslobj.getpeercert()) - if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES: - self.assertTrue(sslobj.get_channel_binding('tls-unique')) + ctx.load_verify_locations(sha256_cert) + s = ctx.wrap_socket(socket.socket(socket.AF_INET), + server_hostname="sha256.tbs-internet.com") try: - self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap) - except ssl.SSLSyscallError: - # self-signed.pythontest.net probably shuts down the TCP - # connection without sending a secure shutdown message, and - # this is reported as SSL_ERROR_SYSCALL - pass - self.assertRaises(ssl.SSLError, sslobj.write, b'foo') - sock.close() + s.connect(remote) + if support.verbose: + sys.stdout.write("\nCipher with %r is %r\n" % + (remote, s.cipher())) + sys.stdout.write("Certificate is:\n%s\n" % + pprint.pformat(s.getpeercert())) + finally: + s.close() - def test_read_write_data(self): - with support.transient_internet(REMOTE_HOST): - sock = socket.socket(socket.AF_INET) - sock.connect((REMOTE_HOST, 443)) - incoming = ssl.MemoryBIO() - outgoing = ssl.MemoryBIO() - ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) - ctx.verify_mode = ssl.CERT_NONE - sslobj = ctx.wrap_bio(incoming, outgoing, False) - self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake) - req = b'GET / HTTP/1.0\r\n\r\n' - self.ssl_io_loop(sock, incoming, outgoing, sslobj.write, req) - buf = self.ssl_io_loop(sock, incoming, outgoing, sslobj.read, 1024) - self.assertEqual(buf[:5], b'HTTP/') - self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap) - sock.close() +def _test_get_server_certificate(test, host, port, cert=None): + pem = ssl.get_server_certificate((host, port)) + if not pem: + test.fail("No server certificate on %s:%s!" % (host, port)) + + pem = ssl.get_server_certificate((host, port), ca_certs=cert) + if not pem: + test.fail("No server certificate on %s:%s!" % (host, port)) + if support.verbose: + sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port ,pem)) + +def _test_get_server_certificate_fail(test, host, port): + try: + pem = ssl.get_server_certificate((host, port), ca_certs=CERTFILE) + except ssl.SSLError as x: + #should fail + if support.verbose: + sys.stdout.write("%s\n" % x) + else: + test.fail("Got server certificate %s for %s:%s!" % (pem, host, port)) -try: - import threading -except ImportError: - _have_threads = False -else: - _have_threads = True +if _have_threads: from test.ssl_servers import make_https_server class ThreadedEchoServer(threading.Thread): @@ -1910,6 +1932,15 @@ else: if not stripped: # eof, so quit this handler self.running = False + try: + self.sock = self.sslconn.unwrap() + except OSError: + # Many tests shut the TCP connection down + # without an SSL shutdown. This causes + # unwrap() to raise OSError with errno=0! + pass + else: + self.sslconn = None self.close() elif stripped == b'over': if support.verbose and self.server.connectionchatty: @@ -2037,7 +2068,7 @@ else: class ConnectionHandler (asyncore.dispatcher_with_send): def __init__(self, conn, certfile): - self.socket = ssl.wrap_socket(conn, server_side=True, + self.socket = test_wrap_socket(conn, server_side=True, certfile=certfile, do_handshake_on_connect=False) asyncore.dispatcher_with_send.__init__(self, self.socket) @@ -2145,7 +2176,8 @@ else: self.server.close() def server_params_test(client_context, server_context, indata=b"FOO\n", - chatty=True, connectionchatty=False, sni_name=None): + chatty=True, connectionchatty=False, sni_name=None, + session=None): """ Launch a server, connect a client to it and try various reads and writes. @@ -2156,7 +2188,7 @@ else: connectionchatty=False) with server: with client_context.wrap_socket(socket.socket(), - server_hostname=sni_name) as s: + server_hostname=sni_name, session=session) as s: s.connect((HOST, server.port)) for arg in [indata, bytearray(indata), memoryview(indata)]: if connectionchatty: @@ -2184,6 +2216,8 @@ else: 'client_alpn_protocol': s.selected_alpn_protocol(), 'client_npn_protocol': s.selected_npn_protocol(), 'version': s.version(), + 'session_reused': s.session_reused, + 'session': s.session, }) s.close() stats['server_alpn_protocols'] = server.selected_alpn_protocols @@ -2259,12 +2293,53 @@ else: if support.verbose: sys.stdout.write("\n") for protocol in PROTOCOLS: + if protocol in {ssl.PROTOCOL_TLS_CLIENT, ssl.PROTOCOL_TLS_SERVER}: + continue with self.subTest(protocol=ssl._PROTOCOL_NAMES[protocol]): context = ssl.SSLContext(protocol) context.load_cert_chain(CERTFILE) server_params_test(context, context, chatty=True, connectionchatty=True) + client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) + client_context.load_verify_locations(SIGNING_CA) + server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) + # server_context.load_verify_locations(SIGNING_CA) + server_context.load_cert_chain(SIGNED_CERTFILE2) + + with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT, server=ssl.PROTOCOL_TLS_SERVER): + server_params_test(client_context=client_context, + server_context=server_context, + chatty=True, connectionchatty=True, + sni_name='fakehostname') + + client_context.check_hostname = False + with self.subTest(client=ssl.PROTOCOL_TLS_SERVER, server=ssl.PROTOCOL_TLS_CLIENT): + with self.assertRaises(ssl.SSLError) as e: + server_params_test(client_context=server_context, + server_context=client_context, + chatty=True, connectionchatty=True, + sni_name='fakehostname') + self.assertIn('called a function you should not call', + str(e.exception)) + + with self.subTest(client=ssl.PROTOCOL_TLS_SERVER, server=ssl.PROTOCOL_TLS_SERVER): + with self.assertRaises(ssl.SSLError) as e: + server_params_test(client_context=server_context, + server_context=server_context, + chatty=True, connectionchatty=True) + self.assertIn('called a function you should not call', + str(e.exception)) + + with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT, server=ssl.PROTOCOL_TLS_CLIENT): + with self.assertRaises(ssl.SSLError) as e: + server_params_test(client_context=server_context, + server_context=client_context, + chatty=True, connectionchatty=True) + self.assertIn('called a function you should not call', + str(e.exception)) + + def test_getpeercert(self): if support.verbose: sys.stdout.write("\n") @@ -2398,7 +2473,7 @@ else: connectionchatty=False) with server, \ socket.socket() as sock, \ - ssl.wrap_socket(sock, + test_wrap_socket(sock, certfile=certfile, ssl_version=ssl.PROTOCOL_TLSv1) as s: try: @@ -2445,7 +2520,7 @@ else: c.connect((HOST, port)) listener_gone.wait() try: - ssl_sock = ssl.wrap_socket(c) + ssl_sock = test_wrap_socket(c) except OSError: pass else: @@ -2635,7 +2710,7 @@ else: sys.stdout.write( " client: read %r from server, starting TLS...\n" % msg) - conn = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1) + conn = test_wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1) wrapped = True elif indata == b"ENDTLS" and msg.startswith(b"ok"): # ENDTLS ok, switch back to clear text @@ -2694,7 +2769,7 @@ else: indata = b"FOO\n" server = AsyncoreEchoServer(CERTFILE) with server: - s = ssl.wrap_socket(socket.socket()) + s = test_wrap_socket(socket.socket()) s.connect(('127.0.0.1', server.port)) if support.verbose: sys.stdout.write( @@ -2727,7 +2802,7 @@ else: chatty=True, connectionchatty=False) with server: - s = ssl.wrap_socket(socket.socket(), + s = test_wrap_socket(socket.socket(), server_side=False, certfile=CERTFILE, ca_certs=CERTFILE, @@ -2745,12 +2820,13 @@ else: count, addr = s.recvfrom_into(b) return b[:count] - # (name, method, whether to expect success, *args) + # (name, method, expect success?, *args, return value func) send_methods = [ - ('send', s.send, True, []), - ('sendto', s.sendto, False, ["some.address"]), - ('sendall', s.sendall, True, []), + ('send', s.send, True, [], len), + ('sendto', s.sendto, False, ["some.address"], len), + ('sendall', s.sendall, True, [], lambda x: None), ] + # (name, method, whether to expect success, *args) recv_methods = [ ('recv', s.recv, True, []), ('recvfrom', s.recvfrom, False, ["some.address"]), @@ -2759,10 +2835,13 @@ else: ] data_prefix = "PREFIX_" - for meth_name, send_meth, expect_success, args in send_methods: + for (meth_name, send_meth, expect_success, args, + ret_val_meth) in send_methods: indata = (data_prefix + meth_name).encode('ascii') try: - send_meth(indata, *args) + ret = send_meth(indata, *args) + msg = "sending with {}".format(meth_name) + self.assertEqual(ret, ret_val_meth(indata), msg=msg) outdata = s.read() if outdata != indata.lower(): self.fail( @@ -2847,7 +2926,7 @@ else: self.addCleanup(server.__exit__, None, None) s = socket.create_connection((HOST, server.port)) self.addCleanup(s.close) - s = ssl.wrap_socket(s, suppress_ragged_eofs=False) + s = test_wrap_socket(s, suppress_ragged_eofs=False) self.addCleanup(s.close) # recv/read(0) should return no data @@ -2869,7 +2948,7 @@ else: chatty=True, connectionchatty=False) with server: - s = ssl.wrap_socket(socket.socket(), + s = test_wrap_socket(socket.socket(), server_side=False, certfile=CERTFILE, ca_certs=CERTFILE, @@ -2923,12 +3002,12 @@ else: c.connect((host, port)) # Will attempt handshake and time out self.assertRaisesRegex(socket.timeout, "timed out", - ssl.wrap_socket, c) + test_wrap_socket, c) finally: c.close() try: c = socket.socket(socket.AF_INET) - c = ssl.wrap_socket(c) + c = test_wrap_socket(c) c.settimeout(0.2) # Will attempt handshake and time out self.assertRaisesRegex(socket.timeout, "timed out", @@ -2951,6 +3030,7 @@ else: host = "127.0.0.1" port = support.bind_port(server) server = context.wrap_socket(server, server_side=True) + self.assertTrue(server.server_side) evt = threading.Event() remote = None @@ -3053,7 +3133,7 @@ else: chatty=True, connectionchatty=False) with server: - s = ssl.wrap_socket(socket.socket(), + s = test_wrap_socket(socket.socket(), server_side=False, certfile=CERTFILE, ca_certs=CERTFILE, @@ -3078,7 +3158,7 @@ else: s.close() # now, again - s = ssl.wrap_socket(socket.socket(), + s = test_wrap_socket(socket.socket(), server_side=False, certfile=CERTFILE, ca_certs=CERTFILE, @@ -3388,6 +3468,111 @@ else: s.sendfile(file) self.assertEqual(s.recv(1024), TEST_DATA) + def test_session(self): + server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + server_context.load_cert_chain(SIGNED_CERTFILE) + client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + client_context.verify_mode = ssl.CERT_REQUIRED + client_context.load_verify_locations(SIGNING_CA) + + # first conncetion without session + stats = server_params_test(client_context, server_context) + session = stats['session'] + self.assertTrue(session.id) + self.assertGreater(session.time, 0) + self.assertGreater(session.timeout, 0) + self.assertTrue(session.has_ticket) + if ssl.OPENSSL_VERSION_INFO > (1, 0, 1): + self.assertGreater(session.ticket_lifetime_hint, 0) + self.assertFalse(stats['session_reused']) + sess_stat = server_context.session_stats() + self.assertEqual(sess_stat['accept'], 1) + self.assertEqual(sess_stat['hits'], 0) + + # reuse session + stats = server_params_test(client_context, server_context, session=session) + sess_stat = server_context.session_stats() + self.assertEqual(sess_stat['accept'], 2) + self.assertEqual(sess_stat['hits'], 1) + self.assertTrue(stats['session_reused']) + session2 = stats['session'] + self.assertEqual(session2.id, session.id) + self.assertEqual(session2, session) + self.assertIsNot(session2, session) + self.assertGreaterEqual(session2.time, session.time) + self.assertGreaterEqual(session2.timeout, session.timeout) + + # another one without session + stats = server_params_test(client_context, server_context) + self.assertFalse(stats['session_reused']) + session3 = stats['session'] + self.assertNotEqual(session3.id, session.id) + self.assertNotEqual(session3, session) + sess_stat = server_context.session_stats() + self.assertEqual(sess_stat['accept'], 3) + self.assertEqual(sess_stat['hits'], 1) + + # reuse session again + stats = server_params_test(client_context, server_context, session=session) + self.assertTrue(stats['session_reused']) + session4 = stats['session'] + self.assertEqual(session4.id, session.id) + self.assertEqual(session4, session) + self.assertGreaterEqual(session4.time, session.time) + self.assertGreaterEqual(session4.timeout, session.timeout) + sess_stat = server_context.session_stats() + self.assertEqual(sess_stat['accept'], 4) + self.assertEqual(sess_stat['hits'], 2) + + def test_session_handling(self): + context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + context.verify_mode = ssl.CERT_REQUIRED + context.load_verify_locations(CERTFILE) + context.load_cert_chain(CERTFILE) + + context2 = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + context2.verify_mode = ssl.CERT_REQUIRED + context2.load_verify_locations(CERTFILE) + context2.load_cert_chain(CERTFILE) + + server = ThreadedEchoServer(context=context, chatty=False) + with server: + with context.wrap_socket(socket.socket()) as s: + # session is None before handshake + self.assertEqual(s.session, None) + self.assertEqual(s.session_reused, None) + s.connect((HOST, server.port)) + session = s.session + self.assertTrue(session) + with self.assertRaises(TypeError) as e: + s.session = object + self.assertEqual(str(e.exception), 'Value is not a SSLSession.') + + with context.wrap_socket(socket.socket()) as s: + s.connect((HOST, server.port)) + # cannot set session after handshake + with self.assertRaises(ValueError) as e: + s.session = session + self.assertEqual(str(e.exception), + 'Cannot set session after handshake.') + + with context.wrap_socket(socket.socket()) as s: + # can set session before handshake and before the + # connection was established + s.session = session + s.connect((HOST, server.port)) + self.assertEqual(s.session.id, session.id) + self.assertEqual(s.session, session) + self.assertEqual(s.session_reused, True) + + with context2.wrap_socket(socket.socket()) as s: + # cannot re-use session with a different SSLContext + with self.assertRaises(ValueError) as e: + s.session = session + s.connect((HOST, server.port)) + self.assertEqual(str(e.exception), + 'Session refers to a different SSLContext.') + def test_main(verbose=False): if support.verbose: @@ -3400,7 +3585,7 @@ def test_main(verbose=False): with warnings.catch_warnings(): warnings.filterwarnings( 'ignore', - 'dist\(\) and linux_distribution\(\) ' + r'dist\(\) and linux_distribution\(\) ' 'functions are deprecated .*', PendingDeprecationWarning, ) @@ -3422,18 +3607,20 @@ def test_main(verbose=False): pass for filename in [ - CERTFILE, REMOTE_ROOT_CERT, BYTES_CERTFILE, + CERTFILE, BYTES_CERTFILE, ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY, SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA, BADCERT, BADKEY, EMPTYCERT]: if not os.path.exists(filename): raise support.TestFailed("Can't read certificate file %r" % filename) - tests = [ContextTests, BasicSocketTests, SSLErrorTests, MemoryBIOTests] + tests = [ + ContextTests, BasicSocketTests, SSLErrorTests, MemoryBIOTests, + SimpleBackgroundTests, + ] if support.is_resource_enabled('network'): tests.append(NetworkedTests) - tests.append(NetworkedBIOTests) if _have_threads: thread_info = support.threading_setup() |