authorAntoine Pitrou <solipsis@pitrou.net>2013-01-05 20:20:29 (GMT)
committerAntoine Pitrou <solipsis@pitrou.net>2013-01-05 20:20:29 (GMT)
commit58ddc9d743d09ee93d5cf46a4de62eab30dad79d (patch)
tree777a03261e792820c18c4a8ffd3923e2cc284adb /Lib/test/test_ssl.py
parent3c9850aad7ef6d88d26081e062c11635af5dea8e (diff)
Issue #8109: The ssl module now has support for server-side SNI, thanks to a :meth:`SSLContext.set_servername_callback` method.
Patch by Daniel Black.
Diffstat (limited to 'Lib/test/test_ssl.py')
-rw-r--r--Lib/test/test_ssl.py138
1 files changed, 136 insertions, 2 deletions
diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py
index 21843cf..2e559b1 100644
--- a/Lib/test/test_ssl.py
+++ b/Lib/test/test_ssl.py
@@ -48,6 +48,11 @@ KEY_PASSWORD = "somepass"
CAPATH = data_file("capath")
BYTES_CAPATH = os.fsencode(CAPATH)
+# Two keys and certs signed by the same CA (for SNI tests)
+SIGNED_CERTFILE = data_file("keycert3.pem")
+SIGNED_CERTFILE2 = data_file("keycert4.pem")
+SIGNING_CA = data_file("pycacert.pem")
+
SVN_PYTHON_ORG_ROOT_CERT = data_file("https_svn_python_org_root.pem")
EMPTYCERT = data_file("nullcert.pem")
@@ -59,6 +64,7 @@ NOKIACERT = data_file("nokia.pem")
DHFILE = data_file("dh512.pem")
BYTES_DHFILE = os.fsencode(DHFILE)
+
def handle_error(prefix):
exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
if support.verbose:
@@ -89,6 +95,8 @@ def skip_if_broken_ubuntu_ssl(func):
else:
return func
+needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test")
+
class BasicSocketTests(unittest.TestCase):
@@ -142,6 +150,7 @@ class BasicSocketTests(unittest.TestCase):
(('organizationName', 'Python Software Foundation'),),
(('commonName', 'localhost'),))
)
+ # Note the next three asserts will fail if the keys are regenerated
self.assertEqual(p['notAfter'], 'Oct 5 23:01:56 2020 GMT')
self.assertEqual(p['notBefore'], 'Oct 8 23:01:56 2010 GMT')
self.assertEqual(p['serialNumber'], 'D7C7381919AFC24E')
@@ -585,6 +594,34 @@ class ContextTests(unittest.TestCase):
self.assertRaises(ValueError, ctx.set_ecdh_curve, "foo")
self.assertRaises(ValueError, ctx.set_ecdh_curve, b"foo")
+ @needs_sni
+ def test_sni_callback(self):
+ ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+
+ # set_servername_callback expects a callable, or None
+ self.assertRaises(TypeError, ctx.set_servername_callback)
+ self.assertRaises(TypeError, ctx.set_servername_callback, 4)
+ self.assertRaises(TypeError, ctx.set_servername_callback, "")
+ self.assertRaises(TypeError, ctx.set_servername_callback, ctx)
+
+ def dummycallback(sock, servername, ctx):
+ pass
+ ctx.set_servername_callback(None)
+ ctx.set_servername_callback(dummycallback)
+
+ @needs_sni
+ def test_sni_callback_refcycle(self):
+ # Reference cycles through the servername callback are detected
+ # and cleared.
+ ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+ def dummycallback(sock, servername, ctx, cycle=ctx):
+ pass
+ ctx.set_servername_callback(dummycallback)
+ wr = weakref.ref(ctx)
+ del ctx, dummycallback
+ gc.collect()
+ self.assertIs(wr(), None)
+
class SSLErrorTests(unittest.TestCase):
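Review bot: In test_sni_callback_refcycle the cycle under test is created only by the default argument `cycle=ctx`, which is easy to miss because the callback's third parameter is also named `ctx` and shadows the outer context. A comment on the `def` line such as `# the default argument makes the callback hold ctx, closing the ctx -> callback -> ctx cycle` would make the intent explicit, and naming the parameter `initial_context` would match the callbacks in the threaded SNI tests later in this diff. The test relies on `weakref` and `gc`, which no visible hunk imports; presumably they are already imported at the top of the file, outside the hunks shown. ContextTests starts and continues outside this hunk.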
@@ -1249,7 +1286,7 @@ else:
raise AssertionError("Use of invalid cert should have failed!")
def server_params_test(client_context, server_context, indata=b"FOO\n",
- chatty=True, connectionchatty=False):
+ chatty=True, connectionchatty=False, sni_name=None):
"""
Launch a server, connect a client to it and try various reads
and writes.
@@ -1259,7 +1296,8 @@ else:
chatty=chatty,
connectionchatty=False)
with server:
- with client_context.wrap_socket(socket.socket()) as s:
+ with client_context.wrap_socket(socket.socket(),
+ server_hostname=sni_name) as s:
s.connect((HOST, server.port))
for arg in [indata, bytearray(indata), memoryview(indata)]:
if connectionchatty:
@@ -1283,6 +1321,7 @@ else:
stats.update({
'compression': s.compression(),
'cipher': s.cipher(),
+ 'peercert': s.getpeercert(),
'client_npn_protocol': s.selected_npn_protocol()
})
s.close()
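Review bot: The new `'peercert': s.getpeercert()` entry is recorded for every caller of server_params_test, but `getpeercert()` returns an empty dict when the client context did not validate the peer certificate, so the entry only carries a subject for callers whose client context verifies the peer. A comment on that line, for example `# empty dict unless client_context verifies the peer (e.g. CERT_REQUIRED)`, would keep later tests from reading `stats['peercert']['subject']` and hitting a KeyError instead of a clear assertion failure. The body of server_params_test starts and ends outside this hunk.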
@@ -1988,6 +2027,100 @@ else:
if len(stats['server_npn_protocols']) else 'nothing'
self.assertEqual(server_result, expected, msg % (server_result, "server"))
+ def sni_contexts(self):
+ server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+ server_context.load_cert_chain(SIGNED_CERTFILE)
+ other_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+ other_context.load_cert_chain(SIGNED_CERTFILE2)
+ client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+ client_context.verify_mode = ssl.CERT_REQUIRED
+ client_context.load_verify_locations(SIGNING_CA)
+ return server_context, other_context, client_context
+
+ def check_common_name(self, stats, name):
+ cert = stats['peercert']
+ self.assertIn((('commonName', name),), cert['subject'])
+
+ @needs_sni
+ def test_sni_callback(self):
+ calls = []
+ server_context, other_context, client_context = self.sni_contexts()
+
+ def servername_cb(ssl_sock, server_name, initial_context):
+ calls.append((server_name, initial_context))
+ ssl_sock.context = other_context
+ server_context.set_servername_callback(servername_cb)
+
+ stats = server_params_test(client_context, server_context,
+ chatty=True,
+ sni_name='supermessage')
+ # The hostname was fetched properly, and the certificate was
+ # changed for the connection.
+ self.assertEqual(calls, [("supermessage", server_context)])
+ # CERTFILE4 was selected
+ self.check_common_name(stats, 'fakehostname')
+
+ # Check disabling the callback
+ calls = []
+ server_context.set_servername_callback(None)
+
+ stats = server_params_test(client_context, server_context,
+ chatty=True,
+ sni_name='notfunny')
+ # Certificate didn't change
+ self.check_common_name(stats, 'localhost')
+ self.assertEqual(calls, [])
+
+ @needs_sni
+ def test_sni_callback_alert(self):
+ # Returning a TLS alert is reflected to the connecting client
+ server_context, other_context, client_context = self.sni_contexts()
+
+ def cb_returning_alert(ssl_sock, server_name, initial_context):
+ return ssl.ALERT_DESCRIPTION_ACCESS_DENIED
+ server_context.set_servername_callback(cb_returning_alert)
+
+ with self.assertRaises(ssl.SSLError) as cm:
+ stats = server_params_test(client_context, server_context,
+ chatty=False,
+ sni_name='supermessage')
+ self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_ACCESS_DENIED')
+
+ @needs_sni
+ def test_sni_callback_raising(self):
+ # Raising fails the connection with a TLS handshake failure alert.
+ server_context, other_context, client_context = self.sni_contexts()
+
+ def cb_raising(ssl_sock, server_name, initial_context):
+ 1/0
+ server_context.set_servername_callback(cb_raising)
+
+ with self.assertRaises(ssl.SSLError) as cm, \
+ support.captured_stderr() as stderr:
+ stats = server_params_test(client_context, server_context,
+ chatty=False,
+ sni_name='supermessage')
+ self.assertEqual(cm.exception.reason, 'SSLV3_ALERT_HANDSHAKE_FAILURE')
+ self.assertIn("ZeroDivisionError", stderr.getvalue())
+
+ @needs_sni
+ def test_sni_callback_wrong_return_type(self):
+ # Returning the wrong return type terminates the TLS connection
+ # with an internal error alert.
+ server_context, other_context, client_context = self.sni_contexts()
+
+ def cb_wrong_return_type(ssl_sock, server_name, initial_context):
+ return "foo"
+ server_context.set_servername_callback(cb_wrong_return_type)
+
+ with self.assertRaises(ssl.SSLError) as cm, \
+ support.captured_stderr() as stderr:
+ stats = server_params_test(client_context, server_context,
+ chatty=False,
+ sni_name='supermessage')
+ self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_INTERNAL_ERROR')
+ self.assertIn("TypeError", stderr.getvalue())
+
def test_main(verbose=False):
if support.verbose:
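Review bot: The comment `# CERTFILE4 was selected` in test_sni_callback names a constant that does not exist in the visible diff; the certificate swapped in by the callback is `SIGNED_CERTFILE2` (keycert4.pem), so `# SIGNED_CERTFILE2 (keycert4.pem) was selected` would be accurate. sni_contexts would also benefit from a docstring stating that the client verifies the chain against SIGNING_CA with `CERT_REQUIRED` but performs no hostname matching, which is why connecting with `sni_name='supermessage'` and receiving a certificate for 'fakehostname' or 'localhost' succeeds; without that, a reader may take these tests as a pattern for a fully verifying client. In the three failure tests `stats = server_params_test(...)` binds a value that is never used, and the bare call would do. If the callback is also invoked when the client sends no server name, that path is not exercised by any test in this hunk.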
@@ -2011,6 +2144,7 @@ def test_main(verbose=False):
for filename in [
CERTFILE, SVN_PYTHON_ORG_ROOT_CERT, BYTES_CERTFILE,
ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
+ SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
BADCERT, BADKEY, EMPTYCERT]:
if not os.path.exists(filename):
raise support.TestFailed("Can't read certificate file %r" % filename)