diff options
Diffstat (limited to 'Lib/test/test_asyncio')
-rw-r--r-- | Lib/test/test_asyncio/test_events.py | 75 |
1 files changed, 63 insertions, 12 deletions
diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py index 8fbba8f..ba1fa5d 100644 --- a/Lib/test/test_asyncio/test_events.py +++ b/Lib/test/test_asyncio/test_events.py @@ -57,6 +57,17 @@ ONLYCERT = data_file('ssl_cert.pem') ONLYKEY = data_file('ssl_key.pem') SIGNED_CERTFILE = data_file('keycert3.pem') SIGNING_CA = data_file('pycacert.pem') +PEERCERT = {'serialNumber': 'B09264B1F2DA21D1', + 'version': 1, + 'subject': ((('countryName', 'XY'),), + (('localityName', 'Castle Anthrax'),), + (('organizationName', 'Python Software Foundation'),), + (('commonName', 'localhost'),)), + 'issuer': ((('countryName', 'XY'),), + (('organizationName', 'Python Software Foundation CA'),), + (('commonName', 'our-ca-server'),)), + 'notAfter': 'Nov 13 19:47:07 2022 GMT', + 'notBefore': 'Jan 4 19:47:07 2013 GMT'} class MyBaseProto(asyncio.Protocol): @@ -596,22 +607,56 @@ class EventLoopTestsMixin: self.assertGreater(pr.nbytes, 0) tr.close() + def check_ssl_extra_info(self, client, check_sockname=True, + peername=None, peercert={}): + if check_sockname: + self.assertIsNotNone(client.get_extra_info('sockname')) + if peername: + self.assertEqual(peername, + client.get_extra_info('peername')) + else: + self.assertIsNotNone(client.get_extra_info('peername')) + self.assertEqual(peercert, + client.get_extra_info('peercert')) + + # Python disables compression to prevent CRIME attacks by default + self.assertIsNone(client.get_extra_info('compression')) + + # test SSL cipher + cipher = client.get_extra_info('cipher') + self.assertIsInstance(cipher, tuple) + self.assertEqual(len(cipher), 3, cipher) + self.assertIsInstance(cipher[0], str) + self.assertIsInstance(cipher[1], str) + self.assertIsInstance(cipher[2], int) + + # test SSL object + sslobj = client.get_extra_info('ssl_object') + self.assertIsNotNone(sslobj) + self.assertEqual(sslobj.compression(), + client.get_extra_info('compression')) + self.assertEqual(sslobj.cipher(), + client.get_extra_info('cipher')) + self.assertEqual(sslobj.getpeercert(), + client.get_extra_info('peercert')) + def _basetest_create_ssl_connection(self, connection_fut, - check_sockname=True): + check_sockname=True, + peername=None): tr, pr = self.loop.run_until_complete(connection_fut) self.assertIsInstance(tr, asyncio.Transport) self.assertIsInstance(pr, asyncio.Protocol) self.assertTrue('ssl' in tr.__class__.__name__.lower()) - if check_sockname: - self.assertIsNotNone(tr.get_extra_info('sockname')) + self.check_ssl_extra_info(tr, check_sockname, peername) self.loop.run_until_complete(pr.done) self.assertGreater(pr.nbytes, 0) tr.close() def _test_create_ssl_connection(self, httpd, create_connection, - check_sockname=True): + check_sockname=True, peername=None): conn_fut = create_connection(ssl=test_utils.dummy_ssl_context()) - self._basetest_create_ssl_connection(conn_fut, check_sockname) + self._basetest_create_ssl_connection(conn_fut, check_sockname, + peername) # ssl.Purpose was introduced in Python 3.4 if hasattr(ssl, 'Purpose'): @@ -629,7 +674,8 @@ class EventLoopTestsMixin: with mock.patch('ssl.create_default_context', side_effect=_dummy_ssl_create_context) as m: conn_fut = create_connection(ssl=True) - self._basetest_create_ssl_connection(conn_fut, check_sockname) + self._basetest_create_ssl_connection(conn_fut, check_sockname, + peername) self.assertEqual(m.call_count, 1) # With the real ssl.create_default_context(), certificate @@ -638,7 +684,8 @@ class EventLoopTestsMixin: conn_fut = create_connection(ssl=True) # Ignore the "SSL handshake failed" log in debug mode with test_utils.disable_logger(): - self._basetest_create_ssl_connection(conn_fut, check_sockname) + self._basetest_create_ssl_connection(conn_fut, check_sockname, + peername) self.assertEqual(cm.exception.reason, 'CERTIFICATE_VERIFY_FAILED') @@ -649,7 +696,8 @@ class EventLoopTestsMixin: self.loop.create_connection, lambda: MyProto(loop=self.loop), *httpd.address) - self._test_create_ssl_connection(httpd, create_connection) + self._test_create_ssl_connection(httpd, create_connection, + peername=httpd.address) def test_legacy_create_ssl_connection(self): with test_utils.force_legacy_ssl_support(): @@ -669,7 +717,8 @@ class EventLoopTestsMixin: server_hostname='127.0.0.1') self._test_create_ssl_connection(httpd, create_connection, - check_sockname) + check_sockname, + peername=httpd.address) def test_legacy_create_ssl_unix_connection(self): with test_utils.force_legacy_ssl_support(): @@ -819,9 +868,7 @@ class EventLoopTestsMixin: self.assertEqual(3, proto.nbytes) # extra info is available - self.assertIsNotNone(proto.transport.get_extra_info('sockname')) - self.assertEqual('127.0.0.1', - proto.transport.get_extra_info('peername')[0]) + self.check_ssl_extra_info(client, peername=(host, port)) # close connection proto.transport.close() @@ -1023,6 +1070,10 @@ class EventLoopTestsMixin: server_hostname='localhost') client, pr = self.loop.run_until_complete(f_c) + # extra info is available + self.check_ssl_extra_info(client,peername=(host, port), + peercert=PEERCERT) + # close connection proto.transport.close() client.close() |