summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAntoine Pitrou <solipsis@pitrou.net>2010-04-27 10:32:58 (GMT)
committerAntoine Pitrou <solipsis@pitrou.net>2010-04-27 10:32:58 (GMT)
commitdb187847fbe60c1dde0e26e25cf52a81c581b55d (patch)
tree75cca8c83dcaaf778fdb849ebcd81f17742be4e0
parent435ba0cfb82ecadca693eaedd2a2806b8766afb1 (diff)
downloadcpython-db187847fbe60c1dde0e26e25cf52a81c581b55d.zip
cpython-db187847fbe60c1dde0e26e25cf52a81c581b55d.tar.gz
cpython-db187847fbe60c1dde0e26e25cf52a81c581b55d.tar.bz2
Qualify or remove or bare excepts. Simplify exception handling in places.
Remove uses of test_support.TestFailed.
-rw-r--r--Lib/test/test_ssl.py329
1 files changed, 136 insertions, 193 deletions
diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py
index 435c198..7c13db4 100644
--- a/Lib/test/test_ssl.py
+++ b/Lib/test/test_ssl.py
@@ -62,7 +62,7 @@ class BasicTests(unittest.TestCase):
s.connect(("svn.python.org", 443))
c = s.getpeercert()
if c:
- raise test_support.TestFailed("Peer cert %s shouldn't be here!")
+ self.fail("Peer cert %s shouldn't be here!")
s.close()
# this should fail because we have no verification certs
@@ -112,8 +112,7 @@ class BasicTests(unittest.TestCase):
d1 = ssl.PEM_cert_to_DER_cert(pem)
p2 = ssl.DER_cert_to_PEM_cert(d1)
d2 = ssl.PEM_cert_to_DER_cert(p2)
- if (d1 != d2):
- raise test_support.TestFailed("PEM-to-DER or DER-to-PEM translation failed")
+ self.assertEqual(d1, d2)
def test_openssl_version(self):
n = ssl.OPENSSL_VERSION_NUMBER
@@ -178,7 +177,7 @@ class NetworkedTests(unittest.TestCase):
s.connect(("svn.python.org", 443))
c = s.getpeercert()
if c:
- raise test_support.TestFailed("Peer cert %s shouldn't be here!")
+ self.fail("Peer cert %s shouldn't be here!")
s.close()
# this should fail because we have no verification certs
@@ -197,8 +196,6 @@ class NetworkedTests(unittest.TestCase):
ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
try:
s.connect(("svn.python.org", 443))
- except ssl.SSLError, x:
- raise test_support.TestFailed("Unexpected exception %s" % x)
finally:
s.close()
@@ -249,7 +246,7 @@ class NetworkedTests(unittest.TestCase):
pem = ssl.get_server_certificate(("svn.python.org", 443))
if not pem:
- raise test_support.TestFailed("No server certificate on svn.python.org:443!")
+ self.fail("No server certificate on svn.python.org:443!")
try:
pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=CERTFILE)
@@ -257,11 +254,11 @@ class NetworkedTests(unittest.TestCase):
#should fail
pass
else:
- raise test_support.TestFailed("Got server certificate %s for svn.python.org!" % pem)
+ self.fail("Got server certificate %s for svn.python.org!" % pem)
pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
if not pem:
- raise test_support.TestFailed("No server certificate on svn.python.org:443!")
+ self.fail("No server certificate on svn.python.org:443!")
if test_support.verbose:
sys.stdout.write("\nVerified certificate for svn.python.org:443 is\n%s\n" % pem)
@@ -334,20 +331,17 @@ else:
ca_certs=self.server.cacerts,
cert_reqs=self.server.certreqs,
ciphers=self.server.ciphers)
- except:
+ except ssl.SSLError:
+ # XXX Various errors can have happened here, for example
+ # a mismatching protocol version, an invalid certificate,
+ # or a low-level bug. This should be made more discriminating.
if self.server.chatty:
handle_error("\n server: bad connection attempt from " +
str(self.sock.getpeername()) + ":\n")
self.close()
- if not self.server.expect_bad_connects:
- # here, we want to stop the server, because this shouldn't
- # happen in the context of our test case
- self.running = False
- # normally, we'd just stop here, but for the test
- # harness, we want to stop the server
- self.server.stop()
+ self.running = False
+ self.server.stop()
return False
-
else:
return True
@@ -418,11 +412,9 @@ else:
# normally, we'd just stop here, but for the test
# harness, we want to stop the server
self.server.stop()
- except:
- handle_error('')
def __init__(self, certificate, ssl_version=None,
- certreqs=None, cacerts=None, expect_bad_connects=False,
+ certreqs=None, cacerts=None,
chatty=True, connectionchatty=False, starttls_server=False,
wrap_accepting_socket=False, ciphers=None):
@@ -435,7 +427,6 @@ else:
self.certreqs = certreqs
self.cacerts = cacerts
self.ciphers = ciphers
- self.expect_bad_connects = expect_bad_connects
self.chatty = chatty
self.connectionchatty = connectionchatty
self.starttls_server = starttls_server
@@ -478,9 +469,6 @@ else:
pass
except KeyboardInterrupt:
self.stop()
- except:
- if self.chatty:
- handle_error("Test server failure:\n")
self.sock.close()
def stop (self):
@@ -526,7 +514,8 @@ else:
self._do_ssl_handshake()
else:
data = self.recv(1024)
- self.send(data.lower())
+ if data and data.strip() != 'over':
+ self.send(data.lower())
def handle_close(self):
self.close()
@@ -572,10 +561,7 @@ else:
if self.flag:
self.flag.set()
while self.active:
- try:
- asyncore.loop(1)
- except:
- pass
+ asyncore.loop(0.05)
def stop (self):
self.active = False
@@ -698,12 +684,8 @@ else:
except ssl.SSLError, x:
if test_support.verbose:
sys.stdout.write("\nSSLError is %s\n" % x[1])
- except socket.error, x:
- if test_support.verbose:
- sys.stdout.write("\nsocket.error is %s\n" % x[1])
else:
- raise test_support.TestFailed(
- "Use of invalid cert should have failed!")
+ self.fail("Use of invalid cert should have failed!")
finally:
server.stop()
server.join()
@@ -729,39 +711,33 @@ else:
if client_protocol is None:
client_protocol = protocol
try:
- try:
- s = ssl.wrap_socket(socket.socket(),
- certfile=client_certfile,
- ca_certs=cacertsfile,
- ciphers=ciphers,
- cert_reqs=certreqs,
- ssl_version=client_protocol)
- s.connect((HOST, server.port))
- except ssl.SSLError, x:
- raise test_support.TestFailed("Unexpected SSL error: " + str(x))
- except Exception, x:
- raise test_support.TestFailed("Unexpected exception: " + str(x))
- else:
- for arg in [indata, bytearray(indata), memoryview(indata)]:
- if connectionchatty:
- if test_support.verbose:
- sys.stdout.write(
- " client: sending %s...\n" % (repr(arg)))
- s.write(arg)
- outdata = s.read()
- if connectionchatty:
- if test_support.verbose:
- sys.stdout.write(" client: read %s\n" % repr(outdata))
- if outdata != indata.lower():
- raise test_support.TestFailed(
- "bad data <<%s>> (%d) received; expected <<%s>> (%d)\n"
- % (outdata[:min(len(outdata),20)], len(outdata),
- indata[:min(len(indata),20)].lower(), len(indata)))
- s.write("over\n")
+ s = ssl.wrap_socket(socket.socket(),
+ certfile=client_certfile,
+ ca_certs=cacertsfile,
+ ciphers=ciphers,
+ cert_reqs=certreqs,
+ ssl_version=client_protocol)
+ s.connect((HOST, server.port))
+ for arg in [indata, bytearray(indata), memoryview(indata)]:
if connectionchatty:
if test_support.verbose:
- sys.stdout.write(" client: closing connection.\n")
- s.close()
+ sys.stdout.write(
+ " client: sending %s...\n" % (repr(arg)))
+ s.write(arg)
+ outdata = s.read()
+ if connectionchatty:
+ if test_support.verbose:
+ sys.stdout.write(" client: read %s\n" % repr(outdata))
+ if outdata != indata.lower():
+ self.fail(
+ "bad data <<%s>> (%d) received; expected <<%s>> (%d)\n"
+ % (outdata[:min(len(outdata),20)], len(outdata),
+ indata[:min(len(indata),20)].lower(), len(indata)))
+ s.write("over\n")
+ if connectionchatty:
+ if test_support.verbose:
+ sys.stdout.write(" client: closing connection.\n")
+ s.close()
finally:
server.stop()
server.join()
@@ -793,12 +769,17 @@ else:
serverParamsTest(CERTFILE, server_protocol, certsreqs,
CERTFILE, CERTFILE, client_protocol,
ciphers="ALL", chatty=False)
- except test_support.TestFailed:
+ # Protocol mismatch can result in either an SSLError, or a
+ # "Connection reset by peer" error.
+ except ssl.SSLError:
if expectedToWork:
raise
+ except socket.error as e:
+ if expectedToWork or e.errno != errno.ECONNRESET:
+ raise
else:
if not expectedToWork:
- raise test_support.TestFailed(
+ self.fail(
"Client protocol %s succeeded with server protocol %s!"
% (ssl.get_protocol_name(client_protocol),
ssl.get_protocol_name(server_protocol)))
@@ -835,8 +816,7 @@ else:
except IOError:
pass
else:
- raise test_support.TestFailed(
- 'connecting to closed SSL socket should have failed')
+ self.fail('connecting to closed SSL socket should have failed')
t = threading.Thread(target=listener)
t.start()
@@ -869,41 +849,27 @@ else:
flag.wait()
# try to connect
try:
- try:
- s = ssl.wrap_socket(socket.socket(),
- certfile=CERTFILE,
- ca_certs=CERTFILE,
- cert_reqs=ssl.CERT_REQUIRED,
- ssl_version=ssl.PROTOCOL_SSLv23)
- s.connect((HOST, server.port))
- except ssl.SSLError, x:
- raise test_support.TestFailed(
- "Unexpected SSL error: " + str(x))
- except Exception, x:
- raise test_support.TestFailed(
- "Unexpected exception: " + str(x))
- else:
- if not s:
- raise test_support.TestFailed(
- "Can't SSL-handshake with test server")
- cert = s.getpeercert()
- if not cert:
- raise test_support.TestFailed(
- "Can't get peer certificate.")
- cipher = s.cipher()
- if test_support.verbose:
- sys.stdout.write(pprint.pformat(cert) + '\n')
- sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
- if 'subject' not in cert:
- raise test_support.TestFailed(
- "No subject field in certificate: %s." %
- pprint.pformat(cert))
- if ((('organizationName', 'Python Software Foundation'),)
- not in cert['subject']):
- raise test_support.TestFailed(
- "Missing or invalid 'organizationName' field in certificate subject; "
- "should be 'Python Software Foundation'.")
- s.close()
+ s = ssl.wrap_socket(socket.socket(),
+ certfile=CERTFILE,
+ ca_certs=CERTFILE,
+ cert_reqs=ssl.CERT_REQUIRED,
+ ssl_version=ssl.PROTOCOL_SSLv23)
+ s.connect((HOST, server.port))
+ cert = s.getpeercert()
+ self.assertTrue(cert, "Can't get peer certificate.")
+ cipher = s.cipher()
+ if test_support.verbose:
+ sys.stdout.write(pprint.pformat(cert) + '\n')
+ sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
+ if 'subject' not in cert:
+ self.fail("No subject field in certificate: %s." %
+ pprint.pformat(cert))
+ if ((('organizationName', 'Python Software Foundation'),)
+ not in cert['subject']):
+ self.fail(
+ "Missing or invalid 'organizationName' field in certificate subject; "
+ "should be 'Python Software Foundation'.")
+ s.close()
finally:
server.stop()
server.join()
@@ -936,7 +902,7 @@ else:
sys.stdout.write("\n")
try:
tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv2, True)
- except test_support.TestFailed, x:
+ except (SSLError, socket.error), x:
# this fails on some older versions of OpenSSL (0.9.7l, for instance)
if test_support.verbose:
sys.stdout.write(
@@ -990,52 +956,48 @@ else:
# try to connect
wrapped = False
try:
- try:
- s = socket.socket()
- s.setblocking(1)
- s.connect((HOST, server.port))
- except Exception, x:
- raise test_support.TestFailed("Unexpected exception: " + str(x))
- else:
+ s = socket.socket()
+ s.setblocking(1)
+ s.connect((HOST, server.port))
+ if test_support.verbose:
+ sys.stdout.write("\n")
+ for indata in msgs:
if test_support.verbose:
- sys.stdout.write("\n")
- for indata in msgs:
+ sys.stdout.write(
+ " client: sending %s...\n" % repr(indata))
+ if wrapped:
+ conn.write(indata)
+ outdata = conn.read()
+ else:
+ s.send(indata)
+ outdata = s.recv(1024)
+ if (indata == "STARTTLS" and
+ outdata.strip().lower().startswith("ok")):
if test_support.verbose:
sys.stdout.write(
- " client: sending %s...\n" % repr(indata))
- if wrapped:
- conn.write(indata)
- outdata = conn.read()
- else:
- s.send(indata)
- outdata = s.recv(1024)
- if (indata == "STARTTLS" and
- outdata.strip().lower().startswith("ok")):
- if test_support.verbose:
- sys.stdout.write(
- " client: read %s from server, starting TLS...\n"
- % repr(outdata))
- conn = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1)
- wrapped = True
- elif (indata == "ENDTLS" and
- outdata.strip().lower().startswith("ok")):
- if test_support.verbose:
- sys.stdout.write(
- " client: read %s from server, ending TLS...\n"
- % repr(outdata))
- s = conn.unwrap()
- wrapped = False
- else:
- if test_support.verbose:
- sys.stdout.write(
- " client: read %s from server\n" % repr(outdata))
- if test_support.verbose:
- sys.stdout.write(" client: closing connection.\n")
- if wrapped:
- conn.write("over\n")
+ " client: read %s from server, starting TLS...\n"
+ % repr(outdata))
+ conn = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1)
+ wrapped = True
+ elif (indata == "ENDTLS" and
+ outdata.strip().lower().startswith("ok")):
+ if test_support.verbose:
+ sys.stdout.write(
+ " client: read %s from server, ending TLS...\n"
+ % repr(outdata))
+ s = conn.unwrap()
+ wrapped = False
else:
- s.send("over\n")
- s.close()
+ if test_support.verbose:
+ sys.stdout.write(
+ " client: read %s from server\n" % repr(outdata))
+ if test_support.verbose:
+ sys.stdout.write(" client: closing connection.\n")
+ if wrapped:
+ conn.write("over\n")
+ else:
+ s.send("over\n")
+ s.close()
finally:
server.stop()
server.join()
@@ -1066,15 +1028,7 @@ else:
" client: read %d bytes from remote server '%s'\n"
% (len(d2), server))
f.close()
- except:
- msg = ''.join(traceback.format_exception(*sys.exc_info()))
- if test_support.verbose:
- sys.stdout.write('\n' + msg)
- raise test_support.TestFailed(msg)
- else:
- if not (d1 == d2):
- raise test_support.TestFailed(
- "Couldn't fetch data from HTTPS server")
+ self.assertEqual(d1, d2)
finally:
server.stop()
server.join()
@@ -1102,30 +1056,24 @@ else:
flag.wait()
# try to connect
try:
- try:
- s = ssl.wrap_socket(socket.socket())
- s.connect(('127.0.0.1', server.port))
- except ssl.SSLError, x:
- raise test_support.TestFailed("Unexpected SSL error: " + str(x))
- except Exception, x:
- raise test_support.TestFailed("Unexpected exception: " + str(x))
- else:
- if test_support.verbose:
- sys.stdout.write(
- " client: sending %s...\n" % (repr(indata)))
- s.write(indata)
- outdata = s.read()
- if test_support.verbose:
- sys.stdout.write(" client: read %s\n" % repr(outdata))
- if outdata != indata.lower():
- raise test_support.TestFailed(
- "bad data <<%s>> (%d) received; expected <<%s>> (%d)\n"
- % (outdata[:min(len(outdata),20)], len(outdata),
- indata[:min(len(indata),20)].lower(), len(indata)))
- s.write("over\n")
- if test_support.verbose:
- sys.stdout.write(" client: closing connection.\n")
- s.close()
+ s = ssl.wrap_socket(socket.socket())
+ s.connect(('127.0.0.1', server.port))
+ if test_support.verbose:
+ sys.stdout.write(
+ " client: sending %s...\n" % (repr(indata)))
+ s.write(indata)
+ outdata = s.read()
+ if test_support.verbose:
+ sys.stdout.write(" client: read %s\n" % repr(outdata))
+ if outdata != indata.lower():
+ self.fail(
+ "bad data <<%s>> (%d) received; expected <<%s>> (%d)\n"
+ % (outdata[:min(len(outdata),20)], len(outdata),
+ indata[:min(len(indata),20)].lower(), len(indata)))
+ s.write("over\n")
+ if test_support.verbose:
+ sys.stdout.write(" client: closing connection.\n")
+ s.close()
finally:
server.stop()
# wait for server thread to end
@@ -1148,19 +1096,14 @@ else:
# wait for it to start
flag.wait()
# try to connect
+ s = ssl.wrap_socket(socket.socket(),
+ server_side=False,
+ certfile=CERTFILE,
+ ca_certs=CERTFILE,
+ cert_reqs=ssl.CERT_NONE,
+ ssl_version=ssl.PROTOCOL_TLSv1)
+ s.connect((HOST, server.port))
try:
- s = ssl.wrap_socket(socket.socket(),
- server_side=False,
- certfile=CERTFILE,
- ca_certs=CERTFILE,
- cert_reqs=ssl.CERT_NONE,
- ssl_version=ssl.PROTOCOL_TLSv1)
- s.connect((HOST, server.port))
- except ssl.SSLError as x:
- self.fail("Unexpected SSL error: " + str(x))
- except Exception as x:
- self.fail("Unexpected exception: " + str(x))
- else:
# helper methods for standardising recv* method signatures
def _recv_into():
b = bytearray("\0"*100)