summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_ssl.py
diff options
context:
space:
mode:
authorAntoine Pitrou <solipsis@pitrou.net>2012-03-21 23:23:03 (GMT)
committerAntoine Pitrou <solipsis@pitrou.net>2012-03-21 23:23:03 (GMT)
commitd5d17eb653196c8bbddde07a024d35a4b152a498 (patch)
tree756cc4df474dc1d5046d1712422fe8aa3fd8d7ac /Lib/test/test_ssl.py
parenta966c6fddb070cdc392b38486191699815f90478 (diff)
downloadcpython-d5d17eb653196c8bbddde07a024d35a4b152a498.zip
cpython-d5d17eb653196c8bbddde07a024d35a4b152a498.tar.gz
cpython-d5d17eb653196c8bbddde07a024d35a4b152a498.tar.bz2
Issue #14204: The ssl module now has support for the Next Protocol Negotiation extension, if available in the underlying OpenSSL library.
Patch by Colin Marc.
Diffstat (limited to 'Lib/test/test_ssl.py')
-rw-r--r--Lib/test/test_ssl.py54
1 files changed, 50 insertions, 4 deletions
diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py
index c6ce075..ada3c4b 100644
--- a/Lib/test/test_ssl.py
+++ b/Lib/test/test_ssl.py
@@ -879,6 +879,7 @@ else:
try:
self.sslconn = self.server.context.wrap_socket(
self.sock, server_side=True)
+ self.server.selected_protocols.append(self.sslconn.selected_npn_protocol())
except ssl.SSLError as e:
# XXX Various errors can have happened here, for example
# a mismatching protocol version, an invalid certificate,
@@ -901,6 +902,8 @@ else:
cipher = self.sslconn.cipher()
if support.verbose and self.server.chatty:
sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
+ sys.stdout.write(" server: selected protocol is now "
+ + str(self.sslconn.selected_npn_protocol()) + "\n")
return True
def read(self):
@@ -979,7 +982,7 @@ else:
def __init__(self, certificate=None, ssl_version=None,
certreqs=None, cacerts=None,
chatty=True, connectionchatty=False, starttls_server=False,
- ciphers=None, context=None):
+ npn_protocols=None, ciphers=None, context=None):
if context:
self.context = context
else:
@@ -992,6 +995,8 @@ else:
self.context.load_verify_locations(cacerts)
if certificate:
self.context.load_cert_chain(certificate)
+ if npn_protocols:
+ self.context.set_npn_protocols(npn_protocols)
if ciphers:
self.context.set_ciphers(ciphers)
self.chatty = chatty
@@ -1001,6 +1006,7 @@ else:
self.port = support.bind_port(self.sock)
self.flag = None
self.active = False
+ self.selected_protocols = []
self.conn_errors = []
threading.Thread.__init__(self)
self.daemon = True
@@ -1195,6 +1201,7 @@ else:
Launch a server, connect a client to it and try various reads
and writes.
"""
+ stats = {}
server = ThreadedEchoServer(context=server_context,
chatty=chatty,
connectionchatty=False)
@@ -1220,12 +1227,14 @@ else:
if connectionchatty:
if support.verbose:
sys.stdout.write(" client: closing connection.\n")
- stats = {
+ stats.update({
'compression': s.compression(),
'cipher': s.cipher(),
- }
+ 'client_npn_protocol': s.selected_npn_protocol()
+ })
s.close()
- return stats
+ stats['server_npn_protocols'] = server.selected_protocols
+ return stats
def try_protocol_combo(server_protocol, client_protocol, expect_success,
certsreqs=None, server_options=0, client_options=0):
@@ -1853,6 +1862,43 @@ else:
if "ADH" not in parts and "EDH" not in parts and "DHE" not in parts:
self.fail("Non-DH cipher: " + cipher[0])
+ def test_selected_npn_protocol(self):
+ # selected_npn_protocol() is None unless NPN is used
+ context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+ context.load_cert_chain(CERTFILE)
+ stats = server_params_test(context, context,
+ chatty=True, connectionchatty=True)
+ self.assertIs(stats['client_npn_protocol'], None)
+
+ @unittest.skipUnless(ssl.HAS_NPN, "NPN support needed for this test")
+ def test_npn_protocols(self):
+ server_protocols = ['http/1.1', 'spdy/2']
+ protocol_tests = [
+ (['http/1.1', 'spdy/2'], 'http/1.1'),
+ (['spdy/2', 'http/1.1'], 'http/1.1'),
+ (['spdy/2', 'test'], 'spdy/2'),
+ (['abc', 'def'], 'abc')
+ ]
+ for client_protocols, expected in protocol_tests:
+ server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+ server_context.load_cert_chain(CERTFILE)
+ server_context.set_npn_protocols(server_protocols)
+ client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+ client_context.load_cert_chain(CERTFILE)
+ client_context.set_npn_protocols(client_protocols)
+ stats = server_params_test(client_context, server_context,
+ chatty=True, connectionchatty=True)
+
+ msg = "failed trying %s (s) and %s (c).\n" \
+ "was expecting %s, but got %%s from the %%s" \
+ % (str(server_protocols), str(client_protocols),
+ str(expected))
+ client_result = stats['client_npn_protocol']
+ self.assertEqual(client_result, expected, msg % (client_result, "client"))
+ server_result = stats['server_npn_protocols'][-1] \
+ if len(stats['server_npn_protocols']) else 'nothing'
+ self.assertEqual(server_result, expected, msg % (server_result, "server"))
+
def test_main(verbose=False):
if support.verbose: