diff options
Diffstat (limited to 'Lib/test/test_ssl.py')
-rw-r--r-- | Lib/test/test_ssl.py | 679 |
1 files changed, 336 insertions, 343 deletions
diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py index 645ec8d..e0f231c 100644 --- a/Lib/test/test_ssl.py +++ b/Lib/test/test_ssl.py @@ -21,6 +21,13 @@ import functools ssl = support.import_module("ssl") +try: + import threading +except ImportError: + _have_threads = False +else: + _have_threads = True + PROTOCOLS = sorted(ssl._PROTOCOL_NAMES) HOST = support.HOST @@ -53,10 +60,10 @@ CRLFILE = data_file("revocation.crl") # Two keys and certs signed by the same CA (for SNI tests) SIGNED_CERTFILE = data_file("keycert3.pem") SIGNED_CERTFILE2 = data_file("keycert4.pem") -SIGNING_CA = data_file("pycacert.pem") +# Same certificate as pycacert.pem, but without extra text in file +SIGNING_CA = data_file("capath", "ceff1710.0") REMOTE_HOST = "self-signed.pythontest.net" -REMOTE_ROOT_CERT = data_file("selfsigned_pythontestdotnet.pem") EMPTYCERT = data_file("nullcert.pem") BADCERT = data_file("badcert.pem") @@ -328,7 +335,7 @@ class BasicSocketTests(unittest.TestCase): wr = weakref.ref(ss) with support.check_warnings(("", ResourceWarning)): del ss - self.assertEqual(wr(), None) + self.assertEqual(wr(), None) def test_wrapped_unconnected(self): # Methods on an unconnected SSLSocket propagate the original @@ -783,6 +790,22 @@ class BasicSocketTests(unittest.TestCase): self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0) self.cert_time_fail(local_february_name() + " 9 00:00:00 2007 GMT") + def test_connect_ex_error(self): + server = socket.socket(socket.AF_INET) + self.addCleanup(server.close) + port = support.bind_port(server) # Reserve port but don't listen + s = ssl.wrap_socket(socket.socket(socket.AF_INET), + cert_reqs=ssl.CERT_REQUIRED) + self.addCleanup(s.close) + rc = s.connect_ex((HOST, port)) + # Issue #19919: Windows machines or VMs hosted on Windows + # machines sometimes return EWOULDBLOCK. + errors = ( + errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.ETIMEDOUT, + errno.EWOULDBLOCK, + ) + self.assertIn(rc, errors) + class ContextTests(unittest.TestCase): @@ -1368,140 +1391,103 @@ class MemoryBIOTests(unittest.TestCase): self.assertRaises(TypeError, bio.write, 1) -class NetworkedTests(unittest.TestCase): +@unittest.skipUnless(_have_threads, "Needs threading module") +class SimpleBackgroundTests(unittest.TestCase): - def test_connect(self): - with support.transient_internet(REMOTE_HOST): - s = ssl.wrap_socket(socket.socket(socket.AF_INET), - cert_reqs=ssl.CERT_NONE) - try: - s.connect((REMOTE_HOST, 443)) - self.assertEqual({}, s.getpeercert()) - finally: - s.close() + """Tests that connect to a simple server running in the background""" - # this should fail because we have no verification certs - s = ssl.wrap_socket(socket.socket(socket.AF_INET), - cert_reqs=ssl.CERT_REQUIRED) - self.assertRaisesRegex(ssl.SSLError, "certificate verify failed", - s.connect, (REMOTE_HOST, 443)) - s.close() + def setUp(self): + server = ThreadedEchoServer(SIGNED_CERTFILE) + self.server_addr = (HOST, server.port) + server.__enter__() + self.addCleanup(server.__exit__, None, None, None) - # this should succeed because we specify the root cert - s = ssl.wrap_socket(socket.socket(socket.AF_INET), - cert_reqs=ssl.CERT_REQUIRED, - ca_certs=REMOTE_ROOT_CERT) - try: - s.connect((REMOTE_HOST, 443)) - self.assertTrue(s.getpeercert()) - finally: - s.close() + def test_connect(self): + with ssl.wrap_socket(socket.socket(socket.AF_INET), + cert_reqs=ssl.CERT_NONE) as s: + s.connect(self.server_addr) + self.assertEqual({}, s.getpeercert()) + + # this should succeed because we specify the root cert + with ssl.wrap_socket(socket.socket(socket.AF_INET), + cert_reqs=ssl.CERT_REQUIRED, + ca_certs=SIGNING_CA) as s: + s.connect(self.server_addr) + self.assertTrue(s.getpeercert()) + + def test_connect_fail(self): + # This should fail because we have no verification certs. Connection + # failure crashes ThreadedEchoServer, so run this in an independent + # test method. + s = ssl.wrap_socket(socket.socket(socket.AF_INET), + cert_reqs=ssl.CERT_REQUIRED) + self.addCleanup(s.close) + self.assertRaisesRegex(ssl.SSLError, "certificate verify failed", + s.connect, self.server_addr) def test_connect_ex(self): # Issue #11326: check connect_ex() implementation - with support.transient_internet(REMOTE_HOST): - s = ssl.wrap_socket(socket.socket(socket.AF_INET), - cert_reqs=ssl.CERT_REQUIRED, - ca_certs=REMOTE_ROOT_CERT) - try: - self.assertEqual(0, s.connect_ex((REMOTE_HOST, 443))) - self.assertTrue(s.getpeercert()) - finally: - s.close() + s = ssl.wrap_socket(socket.socket(socket.AF_INET), + cert_reqs=ssl.CERT_REQUIRED, + ca_certs=SIGNING_CA) + self.addCleanup(s.close) + self.assertEqual(0, s.connect_ex(self.server_addr)) + self.assertTrue(s.getpeercert()) def test_non_blocking_connect_ex(self): # Issue #11326: non-blocking connect_ex() should allow handshake # to proceed after the socket gets ready. - with support.transient_internet(REMOTE_HOST): - s = ssl.wrap_socket(socket.socket(socket.AF_INET), - cert_reqs=ssl.CERT_REQUIRED, - ca_certs=REMOTE_ROOT_CERT, - do_handshake_on_connect=False) + s = ssl.wrap_socket(socket.socket(socket.AF_INET), + cert_reqs=ssl.CERT_REQUIRED, + ca_certs=SIGNING_CA, + do_handshake_on_connect=False) + self.addCleanup(s.close) + s.setblocking(False) + rc = s.connect_ex(self.server_addr) + # EWOULDBLOCK under Windows, EINPROGRESS elsewhere + self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK)) + # Wait for connect to finish + select.select([], [s], [], 5.0) + # Non-blocking handshake + while True: try: - s.setblocking(False) - rc = s.connect_ex((REMOTE_HOST, 443)) - # EWOULDBLOCK under Windows, EINPROGRESS elsewhere - self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK)) - # Wait for connect to finish + s.do_handshake() + break + except ssl.SSLWantReadError: + select.select([s], [], [], 5.0) + except ssl.SSLWantWriteError: select.select([], [s], [], 5.0) - # Non-blocking handshake - while True: - try: - s.do_handshake() - break - except ssl.SSLWantReadError: - select.select([s], [], [], 5.0) - except ssl.SSLWantWriteError: - select.select([], [s], [], 5.0) - # SSL established - self.assertTrue(s.getpeercert()) - finally: - s.close() - - def test_timeout_connect_ex(self): - # Issue #12065: on a timeout, connect_ex() should return the original - # errno (mimicking the behaviour of non-SSL sockets). - with support.transient_internet(REMOTE_HOST): - s = ssl.wrap_socket(socket.socket(socket.AF_INET), - cert_reqs=ssl.CERT_REQUIRED, - ca_certs=REMOTE_ROOT_CERT, - do_handshake_on_connect=False) - try: - s.settimeout(0.0000001) - rc = s.connect_ex((REMOTE_HOST, 443)) - if rc == 0: - self.skipTest("REMOTE_HOST responded too quickly") - self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK)) - finally: - s.close() - - def test_connect_ex_error(self): - with support.transient_internet(REMOTE_HOST): - s = ssl.wrap_socket(socket.socket(socket.AF_INET), - cert_reqs=ssl.CERT_REQUIRED, - ca_certs=REMOTE_ROOT_CERT) - try: - rc = s.connect_ex((REMOTE_HOST, 444)) - # Issue #19919: Windows machines or VMs hosted on Windows - # machines sometimes return EWOULDBLOCK. - errors = ( - errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.ETIMEDOUT, - errno.EWOULDBLOCK, - ) - self.assertIn(rc, errors) - finally: - s.close() + # SSL established + self.assertTrue(s.getpeercert()) def test_connect_with_context(self): - with support.transient_internet(REMOTE_HOST): - # Same as test_connect, but with a separately created context - ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) - s = ctx.wrap_socket(socket.socket(socket.AF_INET)) - s.connect((REMOTE_HOST, 443)) - try: - self.assertEqual({}, s.getpeercert()) - finally: - s.close() - # Same with a server hostname - s = ctx.wrap_socket(socket.socket(socket.AF_INET), - server_hostname=REMOTE_HOST) - s.connect((REMOTE_HOST, 443)) - s.close() - # This should fail because we have no verification certs - ctx.verify_mode = ssl.CERT_REQUIRED - s = ctx.wrap_socket(socket.socket(socket.AF_INET)) - self.assertRaisesRegex(ssl.SSLError, "certificate verify failed", - s.connect, (REMOTE_HOST, 443)) - s.close() - # This should succeed because we specify the root cert - ctx.load_verify_locations(REMOTE_ROOT_CERT) - s = ctx.wrap_socket(socket.socket(socket.AF_INET)) - s.connect((REMOTE_HOST, 443)) - try: - cert = s.getpeercert() - self.assertTrue(cert) - finally: - s.close() + # Same as test_connect, but with a separately created context + ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s: + s.connect(self.server_addr) + self.assertEqual({}, s.getpeercert()) + # Same with a server hostname + with ctx.wrap_socket(socket.socket(socket.AF_INET), + server_hostname="dummy") as s: + s.connect(self.server_addr) + ctx.verify_mode = ssl.CERT_REQUIRED + # This should succeed because we specify the root cert + ctx.load_verify_locations(SIGNING_CA) + with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s: + s.connect(self.server_addr) + cert = s.getpeercert() + self.assertTrue(cert) + + def test_connect_with_context_fail(self): + # This should fail because we have no verification certs. Connection + # failure crashes ThreadedEchoServer, so run this in an independent + # test method. + ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + ctx.verify_mode = ssl.CERT_REQUIRED + s = ctx.wrap_socket(socket.socket(socket.AF_INET)) + self.addCleanup(s.close) + self.assertRaisesRegex(ssl.SSLError, "certificate verify failed", + s.connect, self.server_addr) def test_connect_capath(self): # Verify server certificates using the `capath` argument @@ -1509,198 +1495,130 @@ class NetworkedTests(unittest.TestCase): # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must # contain both versions of each certificate (same content, different # filename) for this test to be portable across OpenSSL releases. - with support.transient_internet(REMOTE_HOST): - ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) - ctx.verify_mode = ssl.CERT_REQUIRED - ctx.load_verify_locations(capath=CAPATH) - s = ctx.wrap_socket(socket.socket(socket.AF_INET)) - s.connect((REMOTE_HOST, 443)) - try: - cert = s.getpeercert() - self.assertTrue(cert) - finally: - s.close() - # Same with a bytes `capath` argument - ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) - ctx.verify_mode = ssl.CERT_REQUIRED - ctx.load_verify_locations(capath=BYTES_CAPATH) - s = ctx.wrap_socket(socket.socket(socket.AF_INET)) - s.connect((REMOTE_HOST, 443)) - try: - cert = s.getpeercert() - self.assertTrue(cert) - finally: - s.close() + ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + ctx.verify_mode = ssl.CERT_REQUIRED + ctx.load_verify_locations(capath=CAPATH) + with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s: + s.connect(self.server_addr) + cert = s.getpeercert() + self.assertTrue(cert) + # Same with a bytes `capath` argument + ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + ctx.verify_mode = ssl.CERT_REQUIRED + ctx.load_verify_locations(capath=BYTES_CAPATH) + with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s: + s.connect(self.server_addr) + cert = s.getpeercert() + self.assertTrue(cert) def test_connect_cadata(self): - with open(REMOTE_ROOT_CERT) as f: + with open(SIGNING_CA) as f: pem = f.read() der = ssl.PEM_cert_to_DER_cert(pem) - with support.transient_internet(REMOTE_HOST): - ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) - ctx.verify_mode = ssl.CERT_REQUIRED - ctx.load_verify_locations(cadata=pem) - with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s: - s.connect((REMOTE_HOST, 443)) - cert = s.getpeercert() - self.assertTrue(cert) + ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + ctx.verify_mode = ssl.CERT_REQUIRED + ctx.load_verify_locations(cadata=pem) + with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s: + s.connect(self.server_addr) + cert = s.getpeercert() + self.assertTrue(cert) - # same with DER - ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) - ctx.verify_mode = ssl.CERT_REQUIRED - ctx.load_verify_locations(cadata=der) - with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s: - s.connect((REMOTE_HOST, 443)) - cert = s.getpeercert() - self.assertTrue(cert) + # same with DER + ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + ctx.verify_mode = ssl.CERT_REQUIRED + ctx.load_verify_locations(cadata=der) + with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s: + s.connect(self.server_addr) + cert = s.getpeercert() + self.assertTrue(cert) @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows") def test_makefile_close(self): # Issue #5238: creating a file-like object with makefile() shouldn't # delay closing the underlying "real socket" (here tested with its # file descriptor, hence skipping the test under Windows). - with support.transient_internet(REMOTE_HOST): - ss = ssl.wrap_socket(socket.socket(socket.AF_INET)) - ss.connect((REMOTE_HOST, 443)) - fd = ss.fileno() - f = ss.makefile() - f.close() - # The fd is still open + ss = ssl.wrap_socket(socket.socket(socket.AF_INET)) + ss.connect(self.server_addr) + fd = ss.fileno() + f = ss.makefile() + f.close() + # The fd is still open + os.read(fd, 0) + # Closing the SSL socket should close the fd too + ss.close() + gc.collect() + with self.assertRaises(OSError) as e: os.read(fd, 0) - # Closing the SSL socket should close the fd too - ss.close() - gc.collect() - with self.assertRaises(OSError) as e: - os.read(fd, 0) - self.assertEqual(e.exception.errno, errno.EBADF) + self.assertEqual(e.exception.errno, errno.EBADF) def test_non_blocking_handshake(self): - with support.transient_internet(REMOTE_HOST): - s = socket.socket(socket.AF_INET) - s.connect((REMOTE_HOST, 443)) - s.setblocking(False) - s = ssl.wrap_socket(s, - cert_reqs=ssl.CERT_NONE, - do_handshake_on_connect=False) - count = 0 - while True: - try: - count += 1 - s.do_handshake() - break - except ssl.SSLWantReadError: - select.select([s], [], []) - except ssl.SSLWantWriteError: - select.select([], [s], []) - s.close() - if support.verbose: - sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count) + s = socket.socket(socket.AF_INET) + s.connect(self.server_addr) + s.setblocking(False) + s = ssl.wrap_socket(s, + cert_reqs=ssl.CERT_NONE, + do_handshake_on_connect=False) + self.addCleanup(s.close) + count = 0 + while True: + try: + count += 1 + s.do_handshake() + break + except ssl.SSLWantReadError: + select.select([s], [], []) + except ssl.SSLWantWriteError: + select.select([], [s], []) + if support.verbose: + sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count) def test_get_server_certificate(self): - def _test_get_server_certificate(host, port, cert=None): - with support.transient_internet(host): - pem = ssl.get_server_certificate((host, port)) - if not pem: - self.fail("No server certificate on %s:%s!" % (host, port)) - - try: - pem = ssl.get_server_certificate((host, port), - ca_certs=CERTFILE) - except ssl.SSLError as x: - #should fail - if support.verbose: - sys.stdout.write("%s\n" % x) - else: - self.fail("Got server certificate %s for %s:%s!" % (pem, host, port)) + _test_get_server_certificate(self, *self.server_addr, cert=SIGNING_CA) - pem = ssl.get_server_certificate((host, port), - ca_certs=cert) - if not pem: - self.fail("No server certificate on %s:%s!" % (host, port)) - if support.verbose: - sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port ,pem)) - - _test_get_server_certificate(REMOTE_HOST, 443, REMOTE_ROOT_CERT) - if support.IPV6_ENABLED: - _test_get_server_certificate('ipv6.google.com', 443) + def test_get_server_certificate_fail(self): + # Connection failure crashes ThreadedEchoServer, so run this in an + # independent test method + _test_get_server_certificate_fail(self, *self.server_addr) def test_ciphers(self): - remote = (REMOTE_HOST, 443) - with support.transient_internet(remote[0]): - with ssl.wrap_socket(socket.socket(socket.AF_INET), - cert_reqs=ssl.CERT_NONE, ciphers="ALL") as s: - s.connect(remote) - with ssl.wrap_socket(socket.socket(socket.AF_INET), - cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT") as s: - s.connect(remote) - # Error checking can happen at instantiation or when connecting - with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"): - with socket.socket(socket.AF_INET) as sock: - s = ssl.wrap_socket(sock, - cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx") - s.connect(remote) - - def test_algorithms(self): - # Issue #8484: all algorithms should be available when verifying a - # certificate. - # SHA256 was added in OpenSSL 0.9.8 - if ssl.OPENSSL_VERSION_INFO < (0, 9, 8, 0, 15): - self.skipTest("SHA256 not available on %r" % ssl.OPENSSL_VERSION) - # sha256.tbs-internet.com needs SNI to use the correct certificate - if not ssl.HAS_SNI: - self.skipTest("SNI needed for this test") - # https://sha2.hboeck.de/ was used until 2011-01-08 (no route to host) - remote = ("sha256.tbs-internet.com", 443) - sha256_cert = os.path.join(os.path.dirname(__file__), "sha256.pem") - with support.transient_internet("sha256.tbs-internet.com"): - ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) - ctx.verify_mode = ssl.CERT_REQUIRED - ctx.load_verify_locations(sha256_cert) - s = ctx.wrap_socket(socket.socket(socket.AF_INET), - server_hostname="sha256.tbs-internet.com") - try: - s.connect(remote) - if support.verbose: - sys.stdout.write("\nCipher with %r is %r\n" % - (remote, s.cipher())) - sys.stdout.write("Certificate is:\n%s\n" % - pprint.pformat(s.getpeercert())) - finally: - s.close() + with ssl.wrap_socket(socket.socket(socket.AF_INET), + cert_reqs=ssl.CERT_NONE, ciphers="ALL") as s: + s.connect(self.server_addr) + with ssl.wrap_socket(socket.socket(socket.AF_INET), + cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT") as s: + s.connect(self.server_addr) + # Error checking can happen at instantiation or when connecting + with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"): + with socket.socket(socket.AF_INET) as sock: + s = ssl.wrap_socket(sock, + cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx") + s.connect(self.server_addr) def test_get_ca_certs_capath(self): # capath certs are loaded on request - with support.transient_internet(REMOTE_HOST): - ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) - ctx.verify_mode = ssl.CERT_REQUIRED - ctx.load_verify_locations(capath=CAPATH) - self.assertEqual(ctx.get_ca_certs(), []) - s = ctx.wrap_socket(socket.socket(socket.AF_INET)) - s.connect((REMOTE_HOST, 443)) - try: - cert = s.getpeercert() - self.assertTrue(cert) - finally: - s.close() - self.assertEqual(len(ctx.get_ca_certs()), 1) + ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + ctx.verify_mode = ssl.CERT_REQUIRED + ctx.load_verify_locations(capath=CAPATH) + self.assertEqual(ctx.get_ca_certs(), []) + with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s: + s.connect(self.server_addr) + cert = s.getpeercert() + self.assertTrue(cert) + self.assertEqual(len(ctx.get_ca_certs()), 1) @needs_sni def test_context_setget(self): # Check that the context of a connected socket can be replaced. - with support.transient_internet(REMOTE_HOST): - ctx1 = ssl.SSLContext(ssl.PROTOCOL_TLSv1) - ctx2 = ssl.SSLContext(ssl.PROTOCOL_SSLv23) - s = socket.socket(socket.AF_INET) - with ctx1.wrap_socket(s) as ss: - ss.connect((REMOTE_HOST, 443)) - self.assertIs(ss.context, ctx1) - self.assertIs(ss._sslobj.context, ctx1) - ss.context = ctx2 - self.assertIs(ss.context, ctx2) - self.assertIs(ss._sslobj.context, ctx2) - - -class NetworkedBIOTests(unittest.TestCase): + ctx1 = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + ctx2 = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + s = socket.socket(socket.AF_INET) + with ctx1.wrap_socket(s) as ss: + ss.connect(self.server_addr) + self.assertIs(ss.context, ctx1) + self.assertIs(ss._sslobj.context, ctx1) + ss.context = ctx2 + self.assertIs(ss.context, ctx2) + self.assertIs(ss._sslobj.context, ctx2) def ssl_io_loop(self, sock, incoming, outgoing, func, *args, **kwargs): # A simple IO loop. Call func(*args) depending on the error we get @@ -1736,64 +1654,128 @@ class NetworkedBIOTests(unittest.TestCase): % (count, func.__name__)) return ret - def test_handshake(self): + def test_bio_handshake(self): + sock = socket.socket(socket.AF_INET) + self.addCleanup(sock.close) + sock.connect(self.server_addr) + incoming = ssl.MemoryBIO() + outgoing = ssl.MemoryBIO() + ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + ctx.verify_mode = ssl.CERT_REQUIRED + ctx.load_verify_locations(SIGNING_CA) + ctx.check_hostname = True + sslobj = ctx.wrap_bio(incoming, outgoing, False, 'localhost') + self.assertIs(sslobj._sslobj.owner, sslobj) + self.assertIsNone(sslobj.cipher()) + self.assertIsNone(sslobj.shared_ciphers()) + self.assertRaises(ValueError, sslobj.getpeercert) + if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES: + self.assertIsNone(sslobj.get_channel_binding('tls-unique')) + self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake) + self.assertTrue(sslobj.cipher()) + self.assertIsNone(sslobj.shared_ciphers()) + self.assertTrue(sslobj.getpeercert()) + if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES: + self.assertTrue(sslobj.get_channel_binding('tls-unique')) + try: + self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap) + except ssl.SSLSyscallError: + # If the server shuts down the TCP connection without sending a + # secure shutdown message, this is reported as SSL_ERROR_SYSCALL + pass + self.assertRaises(ssl.SSLError, sslobj.write, b'foo') + + def test_bio_read_write_data(self): + sock = socket.socket(socket.AF_INET) + self.addCleanup(sock.close) + sock.connect(self.server_addr) + incoming = ssl.MemoryBIO() + outgoing = ssl.MemoryBIO() + ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + ctx.verify_mode = ssl.CERT_NONE + sslobj = ctx.wrap_bio(incoming, outgoing, False) + self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake) + req = b'FOO\n' + self.ssl_io_loop(sock, incoming, outgoing, sslobj.write, req) + buf = self.ssl_io_loop(sock, incoming, outgoing, sslobj.read, 1024) + self.assertEqual(buf, b'foo\n') + self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap) + + +class NetworkedTests(unittest.TestCase): + + def test_timeout_connect_ex(self): + # Issue #12065: on a timeout, connect_ex() should return the original + # errno (mimicking the behaviour of non-SSL sockets). with support.transient_internet(REMOTE_HOST): - sock = socket.socket(socket.AF_INET) - sock.connect((REMOTE_HOST, 443)) - incoming = ssl.MemoryBIO() - outgoing = ssl.MemoryBIO() - ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + s = ssl.wrap_socket(socket.socket(socket.AF_INET), + cert_reqs=ssl.CERT_REQUIRED, + do_handshake_on_connect=False) + self.addCleanup(s.close) + s.settimeout(0.0000001) + rc = s.connect_ex((REMOTE_HOST, 443)) + if rc == 0: + self.skipTest("REMOTE_HOST responded too quickly") + self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK)) + + @unittest.skipUnless(support.IPV6_ENABLED, 'Needs IPv6') + def test_get_server_certificate_ipv6(self): + with support.transient_internet('ipv6.google.com'): + _test_get_server_certificate(self, 'ipv6.google.com', 443) + _test_get_server_certificate_fail(self, 'ipv6.google.com', 443) + + def test_algorithms(self): + # Issue #8484: all algorithms should be available when verifying a + # certificate. + # SHA256 was added in OpenSSL 0.9.8 + if ssl.OPENSSL_VERSION_INFO < (0, 9, 8, 0, 15): + self.skipTest("SHA256 not available on %r" % ssl.OPENSSL_VERSION) + # sha256.tbs-internet.com needs SNI to use the correct certificate + if not ssl.HAS_SNI: + self.skipTest("SNI needed for this test") + # https://sha2.hboeck.de/ was used until 2011-01-08 (no route to host) + remote = ("sha256.tbs-internet.com", 443) + sha256_cert = os.path.join(os.path.dirname(__file__), "sha256.pem") + with support.transient_internet("sha256.tbs-internet.com"): + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) ctx.verify_mode = ssl.CERT_REQUIRED - ctx.load_verify_locations(REMOTE_ROOT_CERT) - ctx.check_hostname = True - sslobj = ctx.wrap_bio(incoming, outgoing, False, REMOTE_HOST) - self.assertIs(sslobj._sslobj.owner, sslobj) - self.assertIsNone(sslobj.cipher()) - self.assertIsNone(sslobj.shared_ciphers()) - self.assertRaises(ValueError, sslobj.getpeercert) - if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES: - self.assertIsNone(sslobj.get_channel_binding('tls-unique')) - self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake) - self.assertTrue(sslobj.cipher()) - self.assertIsNone(sslobj.shared_ciphers()) - self.assertTrue(sslobj.getpeercert()) - if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES: - self.assertTrue(sslobj.get_channel_binding('tls-unique')) + ctx.load_verify_locations(sha256_cert) + s = ctx.wrap_socket(socket.socket(socket.AF_INET), + server_hostname="sha256.tbs-internet.com") try: - self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap) - except ssl.SSLSyscallError: - # self-signed.pythontest.net probably shuts down the TCP - # connection without sending a secure shutdown message, and - # this is reported as SSL_ERROR_SYSCALL - pass - self.assertRaises(ssl.SSLError, sslobj.write, b'foo') - sock.close() + s.connect(remote) + if support.verbose: + sys.stdout.write("\nCipher with %r is %r\n" % + (remote, s.cipher())) + sys.stdout.write("Certificate is:\n%s\n" % + pprint.pformat(s.getpeercert())) + finally: + s.close() - def test_read_write_data(self): - with support.transient_internet(REMOTE_HOST): - sock = socket.socket(socket.AF_INET) - sock.connect((REMOTE_HOST, 443)) - incoming = ssl.MemoryBIO() - outgoing = ssl.MemoryBIO() - ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) - ctx.verify_mode = ssl.CERT_NONE - sslobj = ctx.wrap_bio(incoming, outgoing, False) - self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake) - req = b'GET / HTTP/1.0\r\n\r\n' - self.ssl_io_loop(sock, incoming, outgoing, sslobj.write, req) - buf = self.ssl_io_loop(sock, incoming, outgoing, sslobj.read, 1024) - self.assertEqual(buf[:5], b'HTTP/') - self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap) - sock.close() +def _test_get_server_certificate(test, host, port, cert=None): + pem = ssl.get_server_certificate((host, port)) + if not pem: + test.fail("No server certificate on %s:%s!" % (host, port)) + + pem = ssl.get_server_certificate((host, port), ca_certs=cert) + if not pem: + test.fail("No server certificate on %s:%s!" % (host, port)) + if support.verbose: + sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port ,pem)) + +def _test_get_server_certificate_fail(test, host, port): + try: + pem = ssl.get_server_certificate((host, port), ca_certs=CERTFILE) + except ssl.SSLError as x: + #should fail + if support.verbose: + sys.stdout.write("%s\n" % x) + else: + test.fail("Got server certificate %s for %s:%s!" % (pem, host, port)) -try: - import threading -except ImportError: - _have_threads = False -else: - _have_threads = True +if _have_threads: from test.ssl_servers import make_https_server class ThreadedEchoServer(threading.Thread): @@ -1881,6 +1863,15 @@ else: if not stripped: # eof, so quit this handler self.running = False + try: + self.sock = self.sslconn.unwrap() + except OSError: + # Many tests shut the TCP connection down + # without an SSL shutdown. This causes + # unwrap() to raise OSError with errno=0! + pass + else: + self.sslconn = None self.close() elif stripped == b'over': if support.verbose and self.server.connectionchatty: @@ -3360,18 +3351,20 @@ def test_main(verbose=False): pass for filename in [ - CERTFILE, REMOTE_ROOT_CERT, BYTES_CERTFILE, + CERTFILE, BYTES_CERTFILE, ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY, SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA, BADCERT, BADKEY, EMPTYCERT]: if not os.path.exists(filename): raise support.TestFailed("Can't read certificate file %r" % filename) - tests = [ContextTests, BasicSocketTests, SSLErrorTests, MemoryBIOTests] + tests = [ + ContextTests, BasicSocketTests, SSLErrorTests, MemoryBIOTests, + SimpleBackgroundTests, + ] if support.is_resource_enabled('network'): tests.append(NetworkedTests) - tests.append(NetworkedBIOTests) if _have_threads: thread_info = support.threading_setup() |