summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_ssl.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_ssl.py')
-rw-r--r--Lib/test/test_ssl.py679
1 files changed, 336 insertions, 343 deletions
diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py
index 645ec8d..e0f231c 100644
--- a/Lib/test/test_ssl.py
+++ b/Lib/test/test_ssl.py
@@ -21,6 +21,13 @@ import functools
ssl = support.import_module("ssl")
+try:
+ import threading
+except ImportError:
+ _have_threads = False
+else:
+ _have_threads = True
+
PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
HOST = support.HOST
@@ -53,10 +60,10 @@ CRLFILE = data_file("revocation.crl")
# Two keys and certs signed by the same CA (for SNI tests)
SIGNED_CERTFILE = data_file("keycert3.pem")
SIGNED_CERTFILE2 = data_file("keycert4.pem")
-SIGNING_CA = data_file("pycacert.pem")
+# Same certificate as pycacert.pem, but without extra text in file
+SIGNING_CA = data_file("capath", "ceff1710.0")
REMOTE_HOST = "self-signed.pythontest.net"
-REMOTE_ROOT_CERT = data_file("selfsigned_pythontestdotnet.pem")
EMPTYCERT = data_file("nullcert.pem")
BADCERT = data_file("badcert.pem")
@@ -328,7 +335,7 @@ class BasicSocketTests(unittest.TestCase):
wr = weakref.ref(ss)
with support.check_warnings(("", ResourceWarning)):
del ss
- self.assertEqual(wr(), None)
+ self.assertEqual(wr(), None)
def test_wrapped_unconnected(self):
# Methods on an unconnected SSLSocket propagate the original
@@ -783,6 +790,22 @@ class BasicSocketTests(unittest.TestCase):
self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0)
self.cert_time_fail(local_february_name() + " 9 00:00:00 2007 GMT")
+ def test_connect_ex_error(self):
+ server = socket.socket(socket.AF_INET)
+ self.addCleanup(server.close)
+ port = support.bind_port(server) # Reserve port but don't listen
+ s = ssl.wrap_socket(socket.socket(socket.AF_INET),
+ cert_reqs=ssl.CERT_REQUIRED)
+ self.addCleanup(s.close)
+ rc = s.connect_ex((HOST, port))
+ # Issue #19919: Windows machines or VMs hosted on Windows
+ # machines sometimes return EWOULDBLOCK.
+ errors = (
+ errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.ETIMEDOUT,
+ errno.EWOULDBLOCK,
+ )
+ self.assertIn(rc, errors)
+
class ContextTests(unittest.TestCase):
@@ -1368,140 +1391,103 @@ class MemoryBIOTests(unittest.TestCase):
self.assertRaises(TypeError, bio.write, 1)
-class NetworkedTests(unittest.TestCase):
+@unittest.skipUnless(_have_threads, "Needs threading module")
+class SimpleBackgroundTests(unittest.TestCase):
- def test_connect(self):
- with support.transient_internet(REMOTE_HOST):
- s = ssl.wrap_socket(socket.socket(socket.AF_INET),
- cert_reqs=ssl.CERT_NONE)
- try:
- s.connect((REMOTE_HOST, 443))
- self.assertEqual({}, s.getpeercert())
- finally:
- s.close()
+ """Tests that connect to a simple server running in the background"""
- # this should fail because we have no verification certs
- s = ssl.wrap_socket(socket.socket(socket.AF_INET),
- cert_reqs=ssl.CERT_REQUIRED)
- self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
- s.connect, (REMOTE_HOST, 443))
- s.close()
+ def setUp(self):
+ server = ThreadedEchoServer(SIGNED_CERTFILE)
+ self.server_addr = (HOST, server.port)
+ server.__enter__()
+ self.addCleanup(server.__exit__, None, None, None)
- # this should succeed because we specify the root cert
- s = ssl.wrap_socket(socket.socket(socket.AF_INET),
- cert_reqs=ssl.CERT_REQUIRED,
- ca_certs=REMOTE_ROOT_CERT)
- try:
- s.connect((REMOTE_HOST, 443))
- self.assertTrue(s.getpeercert())
- finally:
- s.close()
+ def test_connect(self):
+ with ssl.wrap_socket(socket.socket(socket.AF_INET),
+ cert_reqs=ssl.CERT_NONE) as s:
+ s.connect(self.server_addr)
+ self.assertEqual({}, s.getpeercert())
+
+ # this should succeed because we specify the root cert
+ with ssl.wrap_socket(socket.socket(socket.AF_INET),
+ cert_reqs=ssl.CERT_REQUIRED,
+ ca_certs=SIGNING_CA) as s:
+ s.connect(self.server_addr)
+ self.assertTrue(s.getpeercert())
+
+ def test_connect_fail(self):
+ # This should fail because we have no verification certs. Connection
+ # failure crashes ThreadedEchoServer, so run this in an independent
+ # test method.
+ s = ssl.wrap_socket(socket.socket(socket.AF_INET),
+ cert_reqs=ssl.CERT_REQUIRED)
+ self.addCleanup(s.close)
+ self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
+ s.connect, self.server_addr)
def test_connect_ex(self):
# Issue #11326: check connect_ex() implementation
- with support.transient_internet(REMOTE_HOST):
- s = ssl.wrap_socket(socket.socket(socket.AF_INET),
- cert_reqs=ssl.CERT_REQUIRED,
- ca_certs=REMOTE_ROOT_CERT)
- try:
- self.assertEqual(0, s.connect_ex((REMOTE_HOST, 443)))
- self.assertTrue(s.getpeercert())
- finally:
- s.close()
+ s = ssl.wrap_socket(socket.socket(socket.AF_INET),
+ cert_reqs=ssl.CERT_REQUIRED,
+ ca_certs=SIGNING_CA)
+ self.addCleanup(s.close)
+ self.assertEqual(0, s.connect_ex(self.server_addr))
+ self.assertTrue(s.getpeercert())
def test_non_blocking_connect_ex(self):
# Issue #11326: non-blocking connect_ex() should allow handshake
# to proceed after the socket gets ready.
- with support.transient_internet(REMOTE_HOST):
- s = ssl.wrap_socket(socket.socket(socket.AF_INET),
- cert_reqs=ssl.CERT_REQUIRED,
- ca_certs=REMOTE_ROOT_CERT,
- do_handshake_on_connect=False)
+ s = ssl.wrap_socket(socket.socket(socket.AF_INET),
+ cert_reqs=ssl.CERT_REQUIRED,
+ ca_certs=SIGNING_CA,
+ do_handshake_on_connect=False)
+ self.addCleanup(s.close)
+ s.setblocking(False)
+ rc = s.connect_ex(self.server_addr)
+ # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
+ self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
+ # Wait for connect to finish
+ select.select([], [s], [], 5.0)
+ # Non-blocking handshake
+ while True:
try:
- s.setblocking(False)
- rc = s.connect_ex((REMOTE_HOST, 443))
- # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
- self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
- # Wait for connect to finish
+ s.do_handshake()
+ break
+ except ssl.SSLWantReadError:
+ select.select([s], [], [], 5.0)
+ except ssl.SSLWantWriteError:
select.select([], [s], [], 5.0)
- # Non-blocking handshake
- while True:
- try:
- s.do_handshake()
- break
- except ssl.SSLWantReadError:
- select.select([s], [], [], 5.0)
- except ssl.SSLWantWriteError:
- select.select([], [s], [], 5.0)
- # SSL established
- self.assertTrue(s.getpeercert())
- finally:
- s.close()
-
- def test_timeout_connect_ex(self):
- # Issue #12065: on a timeout, connect_ex() should return the original
- # errno (mimicking the behaviour of non-SSL sockets).
- with support.transient_internet(REMOTE_HOST):
- s = ssl.wrap_socket(socket.socket(socket.AF_INET),
- cert_reqs=ssl.CERT_REQUIRED,
- ca_certs=REMOTE_ROOT_CERT,
- do_handshake_on_connect=False)
- try:
- s.settimeout(0.0000001)
- rc = s.connect_ex((REMOTE_HOST, 443))
- if rc == 0:
- self.skipTest("REMOTE_HOST responded too quickly")
- self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK))
- finally:
- s.close()
-
- def test_connect_ex_error(self):
- with support.transient_internet(REMOTE_HOST):
- s = ssl.wrap_socket(socket.socket(socket.AF_INET),
- cert_reqs=ssl.CERT_REQUIRED,
- ca_certs=REMOTE_ROOT_CERT)
- try:
- rc = s.connect_ex((REMOTE_HOST, 444))
- # Issue #19919: Windows machines or VMs hosted on Windows
- # machines sometimes return EWOULDBLOCK.
- errors = (
- errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.ETIMEDOUT,
- errno.EWOULDBLOCK,
- )
- self.assertIn(rc, errors)
- finally:
- s.close()
+ # SSL established
+ self.assertTrue(s.getpeercert())
def test_connect_with_context(self):
- with support.transient_internet(REMOTE_HOST):
- # Same as test_connect, but with a separately created context
- ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
- s = ctx.wrap_socket(socket.socket(socket.AF_INET))
- s.connect((REMOTE_HOST, 443))
- try:
- self.assertEqual({}, s.getpeercert())
- finally:
- s.close()
- # Same with a server hostname
- s = ctx.wrap_socket(socket.socket(socket.AF_INET),
- server_hostname=REMOTE_HOST)
- s.connect((REMOTE_HOST, 443))
- s.close()
- # This should fail because we have no verification certs
- ctx.verify_mode = ssl.CERT_REQUIRED
- s = ctx.wrap_socket(socket.socket(socket.AF_INET))
- self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
- s.connect, (REMOTE_HOST, 443))
- s.close()
- # This should succeed because we specify the root cert
- ctx.load_verify_locations(REMOTE_ROOT_CERT)
- s = ctx.wrap_socket(socket.socket(socket.AF_INET))
- s.connect((REMOTE_HOST, 443))
- try:
- cert = s.getpeercert()
- self.assertTrue(cert)
- finally:
- s.close()
+ # Same as test_connect, but with a separately created context
+ ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+ with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
+ s.connect(self.server_addr)
+ self.assertEqual({}, s.getpeercert())
+ # Same with a server hostname
+ with ctx.wrap_socket(socket.socket(socket.AF_INET),
+ server_hostname="dummy") as s:
+ s.connect(self.server_addr)
+ ctx.verify_mode = ssl.CERT_REQUIRED
+ # This should succeed because we specify the root cert
+ ctx.load_verify_locations(SIGNING_CA)
+ with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
+ s.connect(self.server_addr)
+ cert = s.getpeercert()
+ self.assertTrue(cert)
+
+ def test_connect_with_context_fail(self):
+ # This should fail because we have no verification certs. Connection
+ # failure crashes ThreadedEchoServer, so run this in an independent
+ # test method.
+ ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+ ctx.verify_mode = ssl.CERT_REQUIRED
+ s = ctx.wrap_socket(socket.socket(socket.AF_INET))
+ self.addCleanup(s.close)
+ self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
+ s.connect, self.server_addr)
def test_connect_capath(self):
# Verify server certificates using the `capath` argument
@@ -1509,198 +1495,130 @@ class NetworkedTests(unittest.TestCase):
# OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
# contain both versions of each certificate (same content, different
# filename) for this test to be portable across OpenSSL releases.
- with support.transient_internet(REMOTE_HOST):
- ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
- ctx.verify_mode = ssl.CERT_REQUIRED
- ctx.load_verify_locations(capath=CAPATH)
- s = ctx.wrap_socket(socket.socket(socket.AF_INET))
- s.connect((REMOTE_HOST, 443))
- try:
- cert = s.getpeercert()
- self.assertTrue(cert)
- finally:
- s.close()
- # Same with a bytes `capath` argument
- ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
- ctx.verify_mode = ssl.CERT_REQUIRED
- ctx.load_verify_locations(capath=BYTES_CAPATH)
- s = ctx.wrap_socket(socket.socket(socket.AF_INET))
- s.connect((REMOTE_HOST, 443))
- try:
- cert = s.getpeercert()
- self.assertTrue(cert)
- finally:
- s.close()
+ ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+ ctx.verify_mode = ssl.CERT_REQUIRED
+ ctx.load_verify_locations(capath=CAPATH)
+ with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
+ s.connect(self.server_addr)
+ cert = s.getpeercert()
+ self.assertTrue(cert)
+ # Same with a bytes `capath` argument
+ ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+ ctx.verify_mode = ssl.CERT_REQUIRED
+ ctx.load_verify_locations(capath=BYTES_CAPATH)
+ with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
+ s.connect(self.server_addr)
+ cert = s.getpeercert()
+ self.assertTrue(cert)
def test_connect_cadata(self):
- with open(REMOTE_ROOT_CERT) as f:
+ with open(SIGNING_CA) as f:
pem = f.read()
der = ssl.PEM_cert_to_DER_cert(pem)
- with support.transient_internet(REMOTE_HOST):
- ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
- ctx.verify_mode = ssl.CERT_REQUIRED
- ctx.load_verify_locations(cadata=pem)
- with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
- s.connect((REMOTE_HOST, 443))
- cert = s.getpeercert()
- self.assertTrue(cert)
+ ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+ ctx.verify_mode = ssl.CERT_REQUIRED
+ ctx.load_verify_locations(cadata=pem)
+ with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
+ s.connect(self.server_addr)
+ cert = s.getpeercert()
+ self.assertTrue(cert)
- # same with DER
- ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
- ctx.verify_mode = ssl.CERT_REQUIRED
- ctx.load_verify_locations(cadata=der)
- with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
- s.connect((REMOTE_HOST, 443))
- cert = s.getpeercert()
- self.assertTrue(cert)
+ # same with DER
+ ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+ ctx.verify_mode = ssl.CERT_REQUIRED
+ ctx.load_verify_locations(cadata=der)
+ with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
+ s.connect(self.server_addr)
+ cert = s.getpeercert()
+ self.assertTrue(cert)
@unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
def test_makefile_close(self):
# Issue #5238: creating a file-like object with makefile() shouldn't
# delay closing the underlying "real socket" (here tested with its
# file descriptor, hence skipping the test under Windows).
- with support.transient_internet(REMOTE_HOST):
- ss = ssl.wrap_socket(socket.socket(socket.AF_INET))
- ss.connect((REMOTE_HOST, 443))
- fd = ss.fileno()
- f = ss.makefile()
- f.close()
- # The fd is still open
+ ss = ssl.wrap_socket(socket.socket(socket.AF_INET))
+ ss.connect(self.server_addr)
+ fd = ss.fileno()
+ f = ss.makefile()
+ f.close()
+ # The fd is still open
+ os.read(fd, 0)
+ # Closing the SSL socket should close the fd too
+ ss.close()
+ gc.collect()
+ with self.assertRaises(OSError) as e:
os.read(fd, 0)
- # Closing the SSL socket should close the fd too
- ss.close()
- gc.collect()
- with self.assertRaises(OSError) as e:
- os.read(fd, 0)
- self.assertEqual(e.exception.errno, errno.EBADF)
+ self.assertEqual(e.exception.errno, errno.EBADF)
def test_non_blocking_handshake(self):
- with support.transient_internet(REMOTE_HOST):
- s = socket.socket(socket.AF_INET)
- s.connect((REMOTE_HOST, 443))
- s.setblocking(False)
- s = ssl.wrap_socket(s,
- cert_reqs=ssl.CERT_NONE,
- do_handshake_on_connect=False)
- count = 0
- while True:
- try:
- count += 1
- s.do_handshake()
- break
- except ssl.SSLWantReadError:
- select.select([s], [], [])
- except ssl.SSLWantWriteError:
- select.select([], [s], [])
- s.close()
- if support.verbose:
- sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)
+ s = socket.socket(socket.AF_INET)
+ s.connect(self.server_addr)
+ s.setblocking(False)
+ s = ssl.wrap_socket(s,
+ cert_reqs=ssl.CERT_NONE,
+ do_handshake_on_connect=False)
+ self.addCleanup(s.close)
+ count = 0
+ while True:
+ try:
+ count += 1
+ s.do_handshake()
+ break
+ except ssl.SSLWantReadError:
+ select.select([s], [], [])
+ except ssl.SSLWantWriteError:
+ select.select([], [s], [])
+ if support.verbose:
+ sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)
def test_get_server_certificate(self):
- def _test_get_server_certificate(host, port, cert=None):
- with support.transient_internet(host):
- pem = ssl.get_server_certificate((host, port))
- if not pem:
- self.fail("No server certificate on %s:%s!" % (host, port))
-
- try:
- pem = ssl.get_server_certificate((host, port),
- ca_certs=CERTFILE)
- except ssl.SSLError as x:
- #should fail
- if support.verbose:
- sys.stdout.write("%s\n" % x)
- else:
- self.fail("Got server certificate %s for %s:%s!" % (pem, host, port))
+ _test_get_server_certificate(self, *self.server_addr, cert=SIGNING_CA)
- pem = ssl.get_server_certificate((host, port),
- ca_certs=cert)
- if not pem:
- self.fail("No server certificate on %s:%s!" % (host, port))
- if support.verbose:
- sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port ,pem))
-
- _test_get_server_certificate(REMOTE_HOST, 443, REMOTE_ROOT_CERT)
- if support.IPV6_ENABLED:
- _test_get_server_certificate('ipv6.google.com', 443)
+ def test_get_server_certificate_fail(self):
+ # Connection failure crashes ThreadedEchoServer, so run this in an
+ # independent test method
+ _test_get_server_certificate_fail(self, *self.server_addr)
def test_ciphers(self):
- remote = (REMOTE_HOST, 443)
- with support.transient_internet(remote[0]):
- with ssl.wrap_socket(socket.socket(socket.AF_INET),
- cert_reqs=ssl.CERT_NONE, ciphers="ALL") as s:
- s.connect(remote)
- with ssl.wrap_socket(socket.socket(socket.AF_INET),
- cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT") as s:
- s.connect(remote)
- # Error checking can happen at instantiation or when connecting
- with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
- with socket.socket(socket.AF_INET) as sock:
- s = ssl.wrap_socket(sock,
- cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx")
- s.connect(remote)
-
- def test_algorithms(self):
- # Issue #8484: all algorithms should be available when verifying a
- # certificate.
- # SHA256 was added in OpenSSL 0.9.8
- if ssl.OPENSSL_VERSION_INFO < (0, 9, 8, 0, 15):
- self.skipTest("SHA256 not available on %r" % ssl.OPENSSL_VERSION)
- # sha256.tbs-internet.com needs SNI to use the correct certificate
- if not ssl.HAS_SNI:
- self.skipTest("SNI needed for this test")
- # https://sha2.hboeck.de/ was used until 2011-01-08 (no route to host)
- remote = ("sha256.tbs-internet.com", 443)
- sha256_cert = os.path.join(os.path.dirname(__file__), "sha256.pem")
- with support.transient_internet("sha256.tbs-internet.com"):
- ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
- ctx.verify_mode = ssl.CERT_REQUIRED
- ctx.load_verify_locations(sha256_cert)
- s = ctx.wrap_socket(socket.socket(socket.AF_INET),
- server_hostname="sha256.tbs-internet.com")
- try:
- s.connect(remote)
- if support.verbose:
- sys.stdout.write("\nCipher with %r is %r\n" %
- (remote, s.cipher()))
- sys.stdout.write("Certificate is:\n%s\n" %
- pprint.pformat(s.getpeercert()))
- finally:
- s.close()
+ with ssl.wrap_socket(socket.socket(socket.AF_INET),
+ cert_reqs=ssl.CERT_NONE, ciphers="ALL") as s:
+ s.connect(self.server_addr)
+ with ssl.wrap_socket(socket.socket(socket.AF_INET),
+ cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT") as s:
+ s.connect(self.server_addr)
+ # Error checking can happen at instantiation or when connecting
+ with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
+ with socket.socket(socket.AF_INET) as sock:
+ s = ssl.wrap_socket(sock,
+ cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx")
+ s.connect(self.server_addr)
def test_get_ca_certs_capath(self):
# capath certs are loaded on request
- with support.transient_internet(REMOTE_HOST):
- ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
- ctx.verify_mode = ssl.CERT_REQUIRED
- ctx.load_verify_locations(capath=CAPATH)
- self.assertEqual(ctx.get_ca_certs(), [])
- s = ctx.wrap_socket(socket.socket(socket.AF_INET))
- s.connect((REMOTE_HOST, 443))
- try:
- cert = s.getpeercert()
- self.assertTrue(cert)
- finally:
- s.close()
- self.assertEqual(len(ctx.get_ca_certs()), 1)
+ ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+ ctx.verify_mode = ssl.CERT_REQUIRED
+ ctx.load_verify_locations(capath=CAPATH)
+ self.assertEqual(ctx.get_ca_certs(), [])
+ with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
+ s.connect(self.server_addr)
+ cert = s.getpeercert()
+ self.assertTrue(cert)
+ self.assertEqual(len(ctx.get_ca_certs()), 1)
@needs_sni
def test_context_setget(self):
# Check that the context of a connected socket can be replaced.
- with support.transient_internet(REMOTE_HOST):
- ctx1 = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
- ctx2 = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
- s = socket.socket(socket.AF_INET)
- with ctx1.wrap_socket(s) as ss:
- ss.connect((REMOTE_HOST, 443))
- self.assertIs(ss.context, ctx1)
- self.assertIs(ss._sslobj.context, ctx1)
- ss.context = ctx2
- self.assertIs(ss.context, ctx2)
- self.assertIs(ss._sslobj.context, ctx2)
-
-
-class NetworkedBIOTests(unittest.TestCase):
+ ctx1 = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
+ ctx2 = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+ s = socket.socket(socket.AF_INET)
+ with ctx1.wrap_socket(s) as ss:
+ ss.connect(self.server_addr)
+ self.assertIs(ss.context, ctx1)
+ self.assertIs(ss._sslobj.context, ctx1)
+ ss.context = ctx2
+ self.assertIs(ss.context, ctx2)
+ self.assertIs(ss._sslobj.context, ctx2)
def ssl_io_loop(self, sock, incoming, outgoing, func, *args, **kwargs):
# A simple IO loop. Call func(*args) depending on the error we get
@@ -1736,64 +1654,128 @@ class NetworkedBIOTests(unittest.TestCase):
% (count, func.__name__))
return ret
- def test_handshake(self):
+ def test_bio_handshake(self):
+ sock = socket.socket(socket.AF_INET)
+ self.addCleanup(sock.close)
+ sock.connect(self.server_addr)
+ incoming = ssl.MemoryBIO()
+ outgoing = ssl.MemoryBIO()
+ ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+ ctx.verify_mode = ssl.CERT_REQUIRED
+ ctx.load_verify_locations(SIGNING_CA)
+ ctx.check_hostname = True
+ sslobj = ctx.wrap_bio(incoming, outgoing, False, 'localhost')
+ self.assertIs(sslobj._sslobj.owner, sslobj)
+ self.assertIsNone(sslobj.cipher())
+ self.assertIsNone(sslobj.shared_ciphers())
+ self.assertRaises(ValueError, sslobj.getpeercert)
+ if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
+ self.assertIsNone(sslobj.get_channel_binding('tls-unique'))
+ self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
+ self.assertTrue(sslobj.cipher())
+ self.assertIsNone(sslobj.shared_ciphers())
+ self.assertTrue(sslobj.getpeercert())
+ if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
+ self.assertTrue(sslobj.get_channel_binding('tls-unique'))
+ try:
+ self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
+ except ssl.SSLSyscallError:
+ # If the server shuts down the TCP connection without sending a
+ # secure shutdown message, this is reported as SSL_ERROR_SYSCALL
+ pass
+ self.assertRaises(ssl.SSLError, sslobj.write, b'foo')
+
+ def test_bio_read_write_data(self):
+ sock = socket.socket(socket.AF_INET)
+ self.addCleanup(sock.close)
+ sock.connect(self.server_addr)
+ incoming = ssl.MemoryBIO()
+ outgoing = ssl.MemoryBIO()
+ ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+ ctx.verify_mode = ssl.CERT_NONE
+ sslobj = ctx.wrap_bio(incoming, outgoing, False)
+ self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
+ req = b'FOO\n'
+ self.ssl_io_loop(sock, incoming, outgoing, sslobj.write, req)
+ buf = self.ssl_io_loop(sock, incoming, outgoing, sslobj.read, 1024)
+ self.assertEqual(buf, b'foo\n')
+ self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
+
+
+class NetworkedTests(unittest.TestCase):
+
+ def test_timeout_connect_ex(self):
+ # Issue #12065: on a timeout, connect_ex() should return the original
+ # errno (mimicking the behaviour of non-SSL sockets).
with support.transient_internet(REMOTE_HOST):
- sock = socket.socket(socket.AF_INET)
- sock.connect((REMOTE_HOST, 443))
- incoming = ssl.MemoryBIO()
- outgoing = ssl.MemoryBIO()
- ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+ s = ssl.wrap_socket(socket.socket(socket.AF_INET),
+ cert_reqs=ssl.CERT_REQUIRED,
+ do_handshake_on_connect=False)
+ self.addCleanup(s.close)
+ s.settimeout(0.0000001)
+ rc = s.connect_ex((REMOTE_HOST, 443))
+ if rc == 0:
+ self.skipTest("REMOTE_HOST responded too quickly")
+ self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK))
+
+ @unittest.skipUnless(support.IPV6_ENABLED, 'Needs IPv6')
+ def test_get_server_certificate_ipv6(self):
+ with support.transient_internet('ipv6.google.com'):
+ _test_get_server_certificate(self, 'ipv6.google.com', 443)
+ _test_get_server_certificate_fail(self, 'ipv6.google.com', 443)
+
+ def test_algorithms(self):
+ # Issue #8484: all algorithms should be available when verifying a
+ # certificate.
+ # SHA256 was added in OpenSSL 0.9.8
+ if ssl.OPENSSL_VERSION_INFO < (0, 9, 8, 0, 15):
+ self.skipTest("SHA256 not available on %r" % ssl.OPENSSL_VERSION)
+ # sha256.tbs-internet.com needs SNI to use the correct certificate
+ if not ssl.HAS_SNI:
+ self.skipTest("SNI needed for this test")
+ # https://sha2.hboeck.de/ was used until 2011-01-08 (no route to host)
+ remote = ("sha256.tbs-internet.com", 443)
+ sha256_cert = os.path.join(os.path.dirname(__file__), "sha256.pem")
+ with support.transient_internet("sha256.tbs-internet.com"):
+ ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
ctx.verify_mode = ssl.CERT_REQUIRED
- ctx.load_verify_locations(REMOTE_ROOT_CERT)
- ctx.check_hostname = True
- sslobj = ctx.wrap_bio(incoming, outgoing, False, REMOTE_HOST)
- self.assertIs(sslobj._sslobj.owner, sslobj)
- self.assertIsNone(sslobj.cipher())
- self.assertIsNone(sslobj.shared_ciphers())
- self.assertRaises(ValueError, sslobj.getpeercert)
- if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
- self.assertIsNone(sslobj.get_channel_binding('tls-unique'))
- self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
- self.assertTrue(sslobj.cipher())
- self.assertIsNone(sslobj.shared_ciphers())
- self.assertTrue(sslobj.getpeercert())
- if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
- self.assertTrue(sslobj.get_channel_binding('tls-unique'))
+ ctx.load_verify_locations(sha256_cert)
+ s = ctx.wrap_socket(socket.socket(socket.AF_INET),
+ server_hostname="sha256.tbs-internet.com")
try:
- self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
- except ssl.SSLSyscallError:
- # self-signed.pythontest.net probably shuts down the TCP
- # connection without sending a secure shutdown message, and
- # this is reported as SSL_ERROR_SYSCALL
- pass
- self.assertRaises(ssl.SSLError, sslobj.write, b'foo')
- sock.close()
+ s.connect(remote)
+ if support.verbose:
+ sys.stdout.write("\nCipher with %r is %r\n" %
+ (remote, s.cipher()))
+ sys.stdout.write("Certificate is:\n%s\n" %
+ pprint.pformat(s.getpeercert()))
+ finally:
+ s.close()
- def test_read_write_data(self):
- with support.transient_internet(REMOTE_HOST):
- sock = socket.socket(socket.AF_INET)
- sock.connect((REMOTE_HOST, 443))
- incoming = ssl.MemoryBIO()
- outgoing = ssl.MemoryBIO()
- ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
- ctx.verify_mode = ssl.CERT_NONE
- sslobj = ctx.wrap_bio(incoming, outgoing, False)
- self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
- req = b'GET / HTTP/1.0\r\n\r\n'
- self.ssl_io_loop(sock, incoming, outgoing, sslobj.write, req)
- buf = self.ssl_io_loop(sock, incoming, outgoing, sslobj.read, 1024)
- self.assertEqual(buf[:5], b'HTTP/')
- self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
- sock.close()
+def _test_get_server_certificate(test, host, port, cert=None):
+ pem = ssl.get_server_certificate((host, port))
+ if not pem:
+ test.fail("No server certificate on %s:%s!" % (host, port))
+
+ pem = ssl.get_server_certificate((host, port), ca_certs=cert)
+ if not pem:
+ test.fail("No server certificate on %s:%s!" % (host, port))
+ if support.verbose:
+ sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port ,pem))
+
+def _test_get_server_certificate_fail(test, host, port):
+ try:
+ pem = ssl.get_server_certificate((host, port), ca_certs=CERTFILE)
+ except ssl.SSLError as x:
+ #should fail
+ if support.verbose:
+ sys.stdout.write("%s\n" % x)
+ else:
+ test.fail("Got server certificate %s for %s:%s!" % (pem, host, port))
-try:
- import threading
-except ImportError:
- _have_threads = False
-else:
- _have_threads = True
+if _have_threads:
from test.ssl_servers import make_https_server
class ThreadedEchoServer(threading.Thread):
@@ -1881,6 +1863,15 @@ else:
if not stripped:
# eof, so quit this handler
self.running = False
+ try:
+ self.sock = self.sslconn.unwrap()
+ except OSError:
+ # Many tests shut the TCP connection down
+ # without an SSL shutdown. This causes
+ # unwrap() to raise OSError with errno=0!
+ pass
+ else:
+ self.sslconn = None
self.close()
elif stripped == b'over':
if support.verbose and self.server.connectionchatty:
@@ -3360,18 +3351,20 @@ def test_main(verbose=False):
pass
for filename in [
- CERTFILE, REMOTE_ROOT_CERT, BYTES_CERTFILE,
+ CERTFILE, BYTES_CERTFILE,
ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
BADCERT, BADKEY, EMPTYCERT]:
if not os.path.exists(filename):
raise support.TestFailed("Can't read certificate file %r" % filename)
- tests = [ContextTests, BasicSocketTests, SSLErrorTests, MemoryBIOTests]
+ tests = [
+ ContextTests, BasicSocketTests, SSLErrorTests, MemoryBIOTests,
+ SimpleBackgroundTests,
+ ]
if support.is_resource_enabled('network'):
tests.append(NetworkedTests)
- tests.append(NetworkedBIOTests)
if _have_threads:
thread_info = support.threading_setup()