diff options
Diffstat (limited to 'Lib/test/test_ssl.py')
-rw-r--r-- | Lib/test/test_ssl.py | 297 |
1 files changed, 247 insertions, 50 deletions
diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py index 815475e..d4c90cf 100644 --- a/Lib/test/test_ssl.py +++ b/Lib/test/test_ssl.py @@ -17,16 +17,11 @@ import asyncore import weakref import platform import functools +from unittest import mock ssl = support.import_module("ssl") -PROTOCOLS = [ - ssl.PROTOCOL_SSLv3, - ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1 -] -if hasattr(ssl, 'PROTOCOL_SSLv2'): - PROTOCOLS.append(ssl.PROTOCOL_SSLv2) - +PROTOCOLS = sorted(ssl._PROTOCOL_NAMES) HOST = support.HOST data_file = lambda name: os.path.join(os.path.dirname(__file__), name) @@ -48,6 +43,11 @@ KEY_PASSWORD = "somepass" CAPATH = data_file("capath") BYTES_CAPATH = os.fsencode(CAPATH) +# Two keys and certs signed by the same CA (for SNI tests) +SIGNED_CERTFILE = data_file("keycert3.pem") +SIGNED_CERTFILE2 = data_file("keycert4.pem") +SIGNING_CA = data_file("pycacert.pem") + SVN_PYTHON_ORG_ROOT_CERT = data_file("https_svn_python_org_root.pem") EMPTYCERT = data_file("nullcert.pem") @@ -59,6 +59,7 @@ NOKIACERT = data_file("nokia.pem") DHFILE = data_file("dh512.pem") BYTES_DHFILE = os.fsencode(DHFILE) + def handle_error(prefix): exc_format = ' '.join(traceback.format_exception(*sys.exc_info())) if support.verbose: @@ -89,14 +90,12 @@ def skip_if_broken_ubuntu_ssl(func): else: return func +needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test") + class BasicSocketTests(unittest.TestCase): def test_constants(self): - #ssl.PROTOCOL_SSLv2 - ssl.PROTOCOL_SSLv23 - ssl.PROTOCOL_SSLv3 - ssl.PROTOCOL_TLSv1 ssl.CERT_NONE ssl.CERT_OPTIONAL ssl.CERT_REQUIRED @@ -142,6 +141,7 @@ class BasicSocketTests(unittest.TestCase): (('organizationName', 'Python Software Foundation'),), (('commonName', 'localhost'),)) ) + # Note the next three asserts will fail if the keys are regenerated self.assertEqual(p['notAfter'], 'Oct 5 23:01:56 2020 GMT') self.assertEqual(p['notBefore'], 'Oct 8 23:01:56 2010 GMT') self.assertEqual(p['serialNumber'], 'D7C7381919AFC24E') @@ -214,15 +214,15 @@ class BasicSocketTests(unittest.TestCase): def test_wrapped_unconnected(self): # Methods on an unconnected SSLSocket propagate the original - # socket.error raise by the underlying socket object. + # OSError raise by the underlying socket object. s = socket.socket(socket.AF_INET) with ssl.wrap_socket(s) as ss: - self.assertRaises(socket.error, ss.recv, 1) - self.assertRaises(socket.error, ss.recv_into, bytearray(b'x')) - self.assertRaises(socket.error, ss.recvfrom, 1) - self.assertRaises(socket.error, ss.recvfrom_into, bytearray(b'x'), 1) - self.assertRaises(socket.error, ss.send, b'x') - self.assertRaises(socket.error, ss.sendto, b'x', ('0.0.0.0', 0)) + self.assertRaises(OSError, ss.recv, 1) + self.assertRaises(OSError, ss.recv_into, bytearray(b'x')) + self.assertRaises(OSError, ss.recvfrom, 1) + self.assertRaises(OSError, ss.recvfrom_into, bytearray(b'x'), 1) + self.assertRaises(OSError, ss.send, b'x') + self.assertRaises(OSError, ss.sendto, b'x', ('0.0.0.0', 0)) def test_timeout(self): # Issue #8524: when creating an SSL socket, the timeout of the @@ -247,15 +247,15 @@ class BasicSocketTests(unittest.TestCase): with ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE) as s: self.assertRaisesRegex(ValueError, "can't connect in server-side mode", s.connect, (HOST, 8080)) - with self.assertRaises(IOError) as cm: + with self.assertRaises(OSError) as cm: with socket.socket() as sock: ssl.wrap_socket(sock, certfile=WRONGCERT) self.assertEqual(cm.exception.errno, errno.ENOENT) - with self.assertRaises(IOError) as cm: + with self.assertRaises(OSError) as cm: with socket.socket() as sock: ssl.wrap_socket(sock, certfile=CERTFILE, keyfile=WRONGCERT) self.assertEqual(cm.exception.errno, errno.ENOENT) - with self.assertRaises(IOError) as cm: + with self.assertRaises(OSError) as cm: with socket.socket() as sock: ssl.wrap_socket(sock, certfile=WRONGCERT, keyfile=WRONGCERT) self.assertEqual(cm.exception.errno, errno.ENOENT) @@ -387,11 +387,8 @@ class ContextTests(unittest.TestCase): @skip_if_broken_ubuntu_ssl def test_constructor(self): - if hasattr(ssl, 'PROTOCOL_SSLv2'): - ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv2) - ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) - ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv3) - ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + for protocol in PROTOCOLS: + ssl.SSLContext(protocol) self.assertRaises(TypeError, ssl.SSLContext) self.assertRaises(ValueError, ssl.SSLContext, -1) self.assertRaises(ValueError, ssl.SSLContext, 42) @@ -451,7 +448,7 @@ class ContextTests(unittest.TestCase): ctx.load_cert_chain(CERTFILE) ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE) self.assertRaises(TypeError, ctx.load_cert_chain, keyfile=CERTFILE) - with self.assertRaises(IOError) as cm: + with self.assertRaises(OSError) as cm: ctx.load_cert_chain(WRONGCERT) self.assertEqual(cm.exception.errno, errno.ENOENT) with self.assertRaisesRegex(ssl.SSLError, "PEM lib"): @@ -536,7 +533,7 @@ class ContextTests(unittest.TestCase): ctx.load_verify_locations(cafile=BYTES_CERTFILE, capath=None) self.assertRaises(TypeError, ctx.load_verify_locations) self.assertRaises(TypeError, ctx.load_verify_locations, None, None) - with self.assertRaises(IOError) as cm: + with self.assertRaises(OSError) as cm: ctx.load_verify_locations(WRONGCERT) self.assertEqual(cm.exception.errno, errno.ENOENT) with self.assertRaisesRegex(ssl.SSLError, "PEM lib"): @@ -594,6 +591,34 @@ class ContextTests(unittest.TestCase): self.assertRaises(ValueError, ctx.set_ecdh_curve, "foo") self.assertRaises(ValueError, ctx.set_ecdh_curve, b"foo") + @needs_sni + def test_sni_callback(self): + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + + # set_servername_callback expects a callable, or None + self.assertRaises(TypeError, ctx.set_servername_callback) + self.assertRaises(TypeError, ctx.set_servername_callback, 4) + self.assertRaises(TypeError, ctx.set_servername_callback, "") + self.assertRaises(TypeError, ctx.set_servername_callback, ctx) + + def dummycallback(sock, servername, ctx): + pass + ctx.set_servername_callback(None) + ctx.set_servername_callback(dummycallback) + + @needs_sni + def test_sni_callback_refcycle(self): + # Reference cycles through the servername callback are detected + # and cleared. + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + def dummycallback(sock, servername, ctx, cycle=ctx): + pass + ctx.set_servername_callback(dummycallback) + wr = weakref.ref(ctx) + del ctx, dummycallback + gc.collect() + self.assertIs(wr(), None) + class SSLErrorTests(unittest.TestCase): @@ -1036,7 +1061,7 @@ else: sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n" % (msg, ctype, msg.lower(), ctype)) self.write(msg.lower()) - except socket.error: + except OSError: if self.server.chatty: handle_error("Test server failure:\n") self.close() @@ -1146,7 +1171,7 @@ else: return self.handle_close() except ssl.SSLError: raise - except socket.error as err: + except OSError as err: if err.args[0] == errno.ECONNABORTED: return self.handle_close() else: @@ -1250,19 +1275,19 @@ else: except ssl.SSLError as x: if support.verbose: sys.stdout.write("\nSSLError is %s\n" % x.args[1]) - except socket.error as x: + except OSError as x: if support.verbose: - sys.stdout.write("\nsocket.error is %s\n" % x.args[1]) - except IOError as x: + sys.stdout.write("\nOSError is %s\n" % x.args[1]) + except OSError as x: if x.errno != errno.ENOENT: raise if support.verbose: - sys.stdout.write("\IOError is %s\n" % str(x)) + sys.stdout.write("\OSError is %s\n" % str(x)) else: raise AssertionError("Use of invalid cert should have failed!") def server_params_test(client_context, server_context, indata=b"FOO\n", - chatty=True, connectionchatty=False): + chatty=True, connectionchatty=False, sni_name=None): """ Launch a server, connect a client to it and try various reads and writes. @@ -1272,7 +1297,8 @@ else: chatty=chatty, connectionchatty=False) with server: - with client_context.wrap_socket(socket.socket()) as s: + with client_context.wrap_socket(socket.socket(), + server_hostname=sni_name) as s: s.connect((HOST, server.port)) for arg in [indata, bytearray(indata), memoryview(indata)]: if connectionchatty: @@ -1296,6 +1322,7 @@ else: stats.update({ 'compression': s.compression(), 'cipher': s.cipher(), + 'peercert': s.getpeercert(), 'client_npn_protocol': s.selected_npn_protocol() }) s.close() @@ -1321,12 +1348,15 @@ else: client_context.options = ssl.OP_ALL | client_options server_context = ssl.SSLContext(server_protocol) server_context.options = ssl.OP_ALL | server_options + + # NOTE: we must enable "ALL" ciphers on the client, otherwise an + # SSLv23 client will send an SSLv3 hello (rather than SSLv2) + # starting from OpenSSL 1.0.0 (see issue #8322). + if client_context.protocol == ssl.PROTOCOL_SSLv23: + client_context.set_ciphers("ALL") + for ctx in (client_context, server_context): ctx.verify_mode = certsreqs - # NOTE: we must enable "ALL" ciphers, otherwise an SSLv23 client - # will send an SSLv3 hello (rather than SSLv2) starting from - # OpenSSL 1.0.0 (see issue #8322). - ctx.set_ciphers("ALL") ctx.load_cert_chain(CERTFILE) ctx.load_verify_locations(CERTFILE) try: @@ -1337,7 +1367,7 @@ else: except ssl.SSLError: if expect_success: raise - except socket.error as e: + except OSError as e: if expect_success or e.errno != errno.ECONNRESET: raise else: @@ -1356,10 +1386,11 @@ else: if support.verbose: sys.stdout.write("\n") for protocol in PROTOCOLS: - context = ssl.SSLContext(protocol) - context.load_cert_chain(CERTFILE) - server_params_test(context, context, - chatty=True, connectionchatty=True) + with self.subTest(protocol=ssl._PROTOCOL_NAMES[protocol]): + context = ssl.SSLContext(protocol) + context.load_cert_chain(CERTFILE) + server_params_test(context, context, + chatty=True, connectionchatty=True) def test_getpeercert(self): if support.verbose: @@ -1411,7 +1442,7 @@ else: "badkey.pem")) def test_rude_shutdown(self): - """A brutal shutdown of an SSL server should raise an IOError + """A brutal shutdown of an SSL server should raise an OSError in the client when attempting handshake. """ listener_ready = threading.Event() @@ -1439,7 +1470,7 @@ else: listener_gone.wait() try: ssl_sock = ssl.wrap_socket(c) - except IOError: + except OSError: pass else: self.fail('connecting to closed SSL socket should have failed') @@ -1482,7 +1513,7 @@ else: if hasattr(ssl, 'PROTOCOL_SSLv2'): try: try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv2, True) - except (ssl.SSLError, socket.error) as x: + except OSError as x: # this fails on some older versions of OpenSSL (0.9.7l, for instance) if support.verbose: sys.stdout.write( @@ -1542,6 +1573,49 @@ else: try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv23, False, client_options=ssl.OP_NO_TLSv1) + @skip_if_broken_ubuntu_ssl + @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_1"), + "TLS version 1.1 not supported.") + def test_protocol_tlsv1_1(self): + """Connecting to a TLSv1.1 server with various client options. + Testing against older TLS versions.""" + if support.verbose: + sys.stdout.write("\n") + try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_1, True) + if hasattr(ssl, 'PROTOCOL_SSLv2'): + try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv2, False) + try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv3, False) + try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv23, False, + client_options=ssl.OP_NO_TLSv1_1) + + try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1_1, True) + try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1, False) + try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_1, False) + + + @skip_if_broken_ubuntu_ssl + @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_2"), + "TLS version 1.2 not supported.") + def test_protocol_tlsv1_2(self): + """Connecting to a TLSv1.2 server with various client options. + Testing against older TLS versions.""" + if support.verbose: + sys.stdout.write("\n") + try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_2, True, + server_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2, + client_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,) + if hasattr(ssl, 'PROTOCOL_SSLv2'): + try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv2, False) + try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv3, False) + try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv23, False, + client_options=ssl.OP_NO_TLSv1_2) + + try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1_2, True) + try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1, False) + try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_2, False) + try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_1, False) + try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_2, False) + def test_starttls(self): """Switching from clear text to encrypted and back again.""" msgs = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4", b"ENDTLS", b"msg 5", b"msg 6") @@ -1602,7 +1676,7 @@ else: def test_socketserver(self): """Using a SocketServer to create and manage SSL connections.""" - server = make_https_server(self, CERTFILE) + server = make_https_server(self, certfile=CERTFILE) # try to connect if support.verbose: sys.stdout.write('\n') @@ -1858,6 +1932,20 @@ else: self.assertIsInstance(remote, ssl.SSLSocket) self.assertEqual(peer, client_addr) + def test_getpeercert_enotconn(self): + context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + with context.wrap_socket(socket.socket()) as sock: + with self.assertRaises(OSError) as cm: + sock.getpeercert() + self.assertEqual(cm.exception.errno, errno.ENOTCONN) + + def test_do_handshake_enotconn(self): + context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + with context.wrap_socket(socket.socket()) as sock: + with self.assertRaises(OSError) as cm: + sock.do_handshake() + self.assertEqual(cm.exception.errno, errno.ENOTCONN) + def test_default_ciphers(self): context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) try: @@ -1869,7 +1957,7 @@ else: ssl_version=ssl.PROTOCOL_SSLv23, chatty=False) as server: with context.wrap_socket(socket.socket()) as s: - with self.assertRaises((OSError, ssl.SSLError)): + with self.assertRaises(OSError): s.connect((HOST, server.port)) self.assertIn("no shared cipher", str(server.conn_errors[0])) @@ -2002,6 +2090,109 @@ else: if len(stats['server_npn_protocols']) else 'nothing' self.assertEqual(server_result, expected, msg % (server_result, "server")) + def sni_contexts(self): + server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + server_context.load_cert_chain(SIGNED_CERTFILE) + other_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + other_context.load_cert_chain(SIGNED_CERTFILE2) + client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + client_context.verify_mode = ssl.CERT_REQUIRED + client_context.load_verify_locations(SIGNING_CA) + return server_context, other_context, client_context + + def check_common_name(self, stats, name): + cert = stats['peercert'] + self.assertIn((('commonName', name),), cert['subject']) + + @needs_sni + def test_sni_callback(self): + calls = [] + server_context, other_context, client_context = self.sni_contexts() + + def servername_cb(ssl_sock, server_name, initial_context): + calls.append((server_name, initial_context)) + if server_name is not None: + ssl_sock.context = other_context + server_context.set_servername_callback(servername_cb) + + stats = server_params_test(client_context, server_context, + chatty=True, + sni_name='supermessage') + # The hostname was fetched properly, and the certificate was + # changed for the connection. + self.assertEqual(calls, [("supermessage", server_context)]) + # CERTFILE4 was selected + self.check_common_name(stats, 'fakehostname') + + calls = [] + # The callback is called with server_name=None + stats = server_params_test(client_context, server_context, + chatty=True, + sni_name=None) + self.assertEqual(calls, [(None, server_context)]) + self.check_common_name(stats, 'localhost') + + # Check disabling the callback + calls = [] + server_context.set_servername_callback(None) + + stats = server_params_test(client_context, server_context, + chatty=True, + sni_name='notfunny') + # Certificate didn't change + self.check_common_name(stats, 'localhost') + self.assertEqual(calls, []) + + @needs_sni + def test_sni_callback_alert(self): + # Returning a TLS alert is reflected to the connecting client + server_context, other_context, client_context = self.sni_contexts() + + def cb_returning_alert(ssl_sock, server_name, initial_context): + return ssl.ALERT_DESCRIPTION_ACCESS_DENIED + server_context.set_servername_callback(cb_returning_alert) + + with self.assertRaises(ssl.SSLError) as cm: + stats = server_params_test(client_context, server_context, + chatty=False, + sni_name='supermessage') + self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_ACCESS_DENIED') + + @needs_sni + def test_sni_callback_raising(self): + # Raising fails the connection with a TLS handshake failure alert. + server_context, other_context, client_context = self.sni_contexts() + + def cb_raising(ssl_sock, server_name, initial_context): + 1/0 + server_context.set_servername_callback(cb_raising) + + with self.assertRaises(ssl.SSLError) as cm, \ + support.captured_stderr() as stderr: + stats = server_params_test(client_context, server_context, + chatty=False, + sni_name='supermessage') + self.assertEqual(cm.exception.reason, 'SSLV3_ALERT_HANDSHAKE_FAILURE') + self.assertIn("ZeroDivisionError", stderr.getvalue()) + + @needs_sni + def test_sni_callback_wrong_return_type(self): + # Returning the wrong return type terminates the TLS connection + # with an internal error alert. + server_context, other_context, client_context = self.sni_contexts() + + def cb_wrong_return_type(ssl_sock, server_name, initial_context): + return "foo" + server_context.set_servername_callback(cb_wrong_return_type) + + with self.assertRaises(ssl.SSLError) as cm, \ + support.captured_stderr() as stderr: + stats = server_params_test(client_context, server_context, + chatty=False, + sni_name='supermessage') + self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_INTERNAL_ERROR') + self.assertIn("TypeError", stderr.getvalue()) + def test_main(verbose=False): if support.verbose: @@ -2021,10 +2212,16 @@ def test_main(verbose=False): (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO)) print(" under %s" % plat) print(" HAS_SNI = %r" % ssl.HAS_SNI) + print(" OP_ALL = 0x%8x" % ssl.OP_ALL) + try: + print(" OP_NO_TLSv1_1 = 0x%8x" % ssl.OP_NO_TLSv1_1) + except AttributeError: + pass for filename in [ CERTFILE, SVN_PYTHON_ORG_ROOT_CERT, BYTES_CERTFILE, ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY, + SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA, BADCERT, BADKEY, EMPTYCERT]: if not os.path.exists(filename): raise support.TestFailed("Can't read certificate file %r" % filename) |