diff options
Diffstat (limited to 'Lib')
-rw-r--r-- | Lib/ssl.py | 19 | ||||
-rw-r--r-- | Lib/test/test_ssl.py | 91 |
2 files changed, 110 insertions, 0 deletions
@@ -99,6 +99,10 @@ import base64 # for DER-to-PEM translation import traceback import errno +if _ssl.HAS_TLS_UNIQUE: + CHANNEL_BINDING_TYPES = ['tls-unique'] +else: + CHANNEL_BINDING_TYPES = [] class CertificateError(ValueError): pass @@ -495,6 +499,21 @@ class SSLSocket(socket): self.do_handshake_on_connect), addr) + def get_channel_binding(self, cb_type="tls-unique"): + """Get channel binding data for current connection. Raise ValueError + if the requested `cb_type` is not supported. Return bytes of the data + or None if the data is not available (e.g. before the handshake). + """ + if cb_type not in CHANNEL_BINDING_TYPES: + raise ValueError("Unsupported channel binding type") + if cb_type != "tls-unique": + raise NotImplementedError( + "{0} channel binding type not implemented" + .format(cb_type)) + if self._sslobj is None: + return None + return self._sslobj.tls_unique_cb() + def __del__(self): # sys.stderr.write("__del__ on %s\n" % repr(self)) self._real_close() diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py index fd1cd2d..f3f0c54 100644 --- a/Lib/test/test_ssl.py +++ b/Lib/test/test_ssl.py @@ -321,6 +321,25 @@ class BasicSocketTests(unittest.TestCase): self.assertRaises(ValueError, ctx.wrap_socket, sock, True, server_hostname="some.hostname") + def test_unknown_channel_binding(self): + # should raise ValueError for unknown type + s = socket.socket(socket.AF_INET) + ss = ssl.wrap_socket(s) + with self.assertRaises(ValueError): + ss.get_channel_binding("unknown-type") + + @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES, + "'tls-unique' channel binding not available") + def test_tls_unique_channel_binding(self): + # unconnected should return None for known type + s = socket.socket(socket.AF_INET) + ss = ssl.wrap_socket(s) + self.assertIsNone(ss.get_channel_binding("tls-unique")) + # the same for server-side + s = socket.socket(socket.AF_INET) + ss = ssl.wrap_socket(s, server_side=True, certfile=CERTFILE) + self.assertIsNone(ss.get_channel_binding("tls-unique")) + class ContextTests(unittest.TestCase): @skip_if_broken_ubuntu_ssl @@ -826,6 +845,11 @@ else: self.sslconn = None if support.verbose and self.server.connectionchatty: sys.stdout.write(" server: connection is now unencrypted...\n") + elif stripped == b'CB tls-unique': + if support.verbose and self.server.connectionchatty: + sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n") + data = self.sslconn.get_channel_binding("tls-unique") + self.write(repr(data).encode("us-ascii") + b"\n") else: if (support.verbose and self.server.connectionchatty): @@ -1625,6 +1649,73 @@ else: t.join() server.close() + @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES, + "'tls-unique' channel binding not available") + def test_tls_unique_channel_binding(self): + """Test tls-unique channel binding.""" + if support.verbose: + sys.stdout.write("\n") + + server = ThreadedEchoServer(CERTFILE, + certreqs=ssl.CERT_NONE, + ssl_version=ssl.PROTOCOL_TLSv1, + cacerts=CERTFILE, + chatty=True, + connectionchatty=False) + flag = threading.Event() + server.start(flag) + # wait for it to start + flag.wait() + # try to connect + s = ssl.wrap_socket(socket.socket(), + server_side=False, + certfile=CERTFILE, + ca_certs=CERTFILE, + cert_reqs=ssl.CERT_NONE, + ssl_version=ssl.PROTOCOL_TLSv1) + s.connect((HOST, server.port)) + try: + # get the data + cb_data = s.get_channel_binding("tls-unique") + if support.verbose: + sys.stdout.write(" got channel binding data: {0!r}\n" + .format(cb_data)) + + # check if it is sane + self.assertIsNotNone(cb_data) + self.assertEqual(len(cb_data), 12) # True for TLSv1 + + # and compare with the peers version + s.write(b"CB tls-unique\n") + peer_data_repr = s.read().strip() + self.assertEqual(peer_data_repr, + repr(cb_data).encode("us-ascii")) + s.close() + + # now, again + s = ssl.wrap_socket(socket.socket(), + server_side=False, + certfile=CERTFILE, + ca_certs=CERTFILE, + cert_reqs=ssl.CERT_NONE, + ssl_version=ssl.PROTOCOL_TLSv1) + s.connect((HOST, server.port)) + new_cb_data = s.get_channel_binding("tls-unique") + if support.verbose: + sys.stdout.write(" got another channel binding data: {0!r}\n" + .format(new_cb_data)) + # is it really unique + self.assertNotEqual(cb_data, new_cb_data) + self.assertIsNotNone(cb_data) + self.assertEqual(len(cb_data), 12) # True for TLSv1 + s.write(b"CB tls-unique\n") + peer_data_repr = s.read().strip() + self.assertEqual(peer_data_repr, + repr(new_cb_data).encode("us-ascii")) + s.close() + finally: + server.stop() + server.join() def test_main(verbose=False): if support.verbose: |