summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_ssl.py
diff options
context:
space:
mode:
authorGuido van Rossum <guido@python.org>2007-08-26 19:35:09 (GMT)
committerGuido van Rossum <guido@python.org>2007-08-26 19:35:09 (GMT)
commite472933e2781a5e99dd2d30b72d2a9d69ccd37a0 (patch)
tree341d046c41cfce99bba99110dd3385f19a6faa9b /Lib/test/test_ssl.py
parent7fc8e2993ae341d789c2401c145f253f9dfdaff4 (diff)
downloadcpython-e472933e2781a5e99dd2d30b72d2a9d69ccd37a0.zip
cpython-e472933e2781a5e99dd2d30b72d2a9d69ccd37a0.tar.gz
cpython-e472933e2781a5e99dd2d30b72d2a9d69ccd37a0.tar.bz2
Bill Janssen wrote:
Here's a patch which makes test_ssl a better player in the buildbots environment. I deep-ended on "try-except-else" clauses.
Diffstat (limited to 'Lib/test/test_ssl.py')
-rw-r--r--Lib/test/test_ssl.py121
1 files changed, 82 insertions, 39 deletions
diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py
index 4f9e47c..2d4e412 100644
--- a/Lib/test/test_ssl.py
+++ b/Lib/test/test_ssl.py
@@ -91,38 +91,66 @@ class ConnectedTests(unittest.TestCase):
def testTLSecho (self):
s1 = socket.socket()
- s1.connect(('127.0.0.1', 10024))
- c1 = ssl.sslsocket(s1, ssl_version=ssl.PROTOCOL_TLSv1)
- indata = "FOO\n"
- c1.write(indata)
- outdata = c1.read()
- if outdata != indata.lower():
- sys.stderr.write("bad data <<%s>> received\n" % data)
- c1.close()
+ try:
+ s1.connect(('127.0.0.1', 10024))
+ except:
+ sys.stdout.write("connection failure:\n" + string.join(
+ traceback.format_exception(*sys.exc_info())))
+ raise test_support.TestFailed("Can't connect to test server")
+ else:
+ try:
+ c1 = ssl.sslsocket(s1, ssl_version=ssl.PROTOCOL_TLSv1)
+ except:
+ sys.stdout.write("SSL handshake failure:\n" + string.join(
+ traceback.format_exception(*sys.exc_info())))
+ raise test_support.TestFailed("Can't SSL-handshake with test server")
+ else:
+ if not c1:
+ raise test_support.TestFailed("Can't SSL-handshake with test server")
+ indata = "FOO\n"
+ c1.write(indata)
+ outdata = c1.read()
+ if outdata != indata.lower():
+ raise test_support.TestFailed("bad data <<%s>> received; expected <<%s>>\n" % (data, indata.lower()))
+ c1.close()
def testReadCert(self):
s2 = socket.socket()
- s2.connect(('127.0.0.1', 10024))
- c2 = ssl.sslsocket(s2, ssl_version=ssl.PROTOCOL_TLSv1,
- cert_reqs=ssl.CERT_REQUIRED, ca_certs=CERTFILE)
- cert = c2.getpeercert()
- if not cert:
- raise test_support.TestFailed("Can't get peer certificate.")
- if not cert.has_key('subject'):
- raise test_support.TestFailed(
- "No subject field in certificate: %s." %
- pprint.pformat(cert))
- if not (cert['subject'].has_key('organizationName')):
- raise test_support.TestFailed(
- "No 'organizationName' field in certificate subject: %s." %
- pprint.pformat(cert))
- if (cert['subject']['organizationName'] !=
- "Python Software Foundation"):
- raise test_support.TestFailed(
- "Invalid 'organizationName' field in certificate subject; "
- "should be 'Python Software Foundation'.");
- c2.close()
+ try:
+ s2.connect(('127.0.0.1', 10024))
+ except:
+ sys.stdout.write("connection failure:\n" + string.join(
+ traceback.format_exception(*sys.exc_info())))
+ raise test_support.TestFailed("Can't connect to test server")
+ else:
+ try:
+ c2 = ssl.sslsocket(s2, ssl_version=ssl.PROTOCOL_TLSv1,
+ cert_reqs=ssl.CERT_REQUIRED, ca_certs=CERTFILE)
+ except:
+ sys.stdout.write("SSL handshake failure:\n" + string.join(
+ traceback.format_exception(*sys.exc_info())))
+ raise test_support.TestFailed("Can't SSL-handshake with test server")
+ else:
+ if not c2:
+ raise test_support.TestFailed("Can't SSL-handshake with test server")
+ cert = c2.getpeercert()
+ if not cert:
+ raise test_support.TestFailed("Can't get peer certificate.")
+ if not cert.has_key('subject'):
+ raise test_support.TestFailed(
+ "No subject field in certificate: %s." %
+ pprint.pformat(cert))
+ if not (cert['subject'].has_key('organizationName')):
+ raise test_support.TestFailed(
+ "No 'organizationName' field in certificate subject: %s." %
+ pprint.pformat(cert))
+ if (cert['subject']['organizationName'] !=
+ "Python Software Foundation"):
+ raise test_support.TestFailed(
+ "Invalid 'organizationName' field in certificate subject; "
+ "should be 'Python Software Foundation'.");
+ c2.close()
class threadedEchoServer(threading.Thread):
@@ -138,10 +166,22 @@ class threadedEchoServer(threading.Thread):
def run (self):
self.running = True
- sslconn = ssl.sslsocket(self.sock, server_side=True,
- certfile=self.server.certificate,
- ssl_version=self.server.protocol,
- cert_reqs=self.server.certreqs)
+ try:
+ sslconn = ssl.sslsocket(self.sock, server_side=True,
+ certfile=self.server.certificate,
+ ssl_version=self.server.protocol,
+ cert_reqs=self.server.certreqs)
+ except:
+ # here, we want to stop the server, because this shouldn't
+ # happen in the context of our test case
+ sys.stdout.write("Test server failure:\n" + string.join(
+ traceback.format_exception(*sys.exc_info())))
+ self.running = False
+ # normally, we'd just stop here, but for the test
+ # harness, we want to stop the server
+ self.server.stop()
+ return
+
while self.running:
try:
msg = sslconn.read()
@@ -154,15 +194,18 @@ class threadedEchoServer(threading.Thread):
self.server.stop()
self.running = False
else:
- # print "server:", msg.strip().lower()
+ sys.stdout.write("\nserver: %s\n" % msg.strip().lower())
sslconn.write(msg.lower())
except ssl.sslerror:
- sys.stderr.write(string.join(
+ sys.stdout.write("Test server failure:\n" + string.join(
traceback.format_exception(*sys.exc_info())))
sslconn.close()
self.running = False
+ # normally, we'd just stop here, but for the test
+ # harness, we want to stop the server
+ self.server.stop()
except:
- sys.stderr.write(string.join(
+ sys.stdout.write(string.join(
traceback.format_exception(*sys.exc_info())))
def __init__(self, port, certificate, ssl_version=None,
@@ -192,20 +235,20 @@ class threadedEchoServer(threading.Thread):
while self.active:
try:
newconn, connaddr = self.sock.accept()
- # sys.stderr.write('new connection from ' + str(connaddr))
+ sys.stdout.write('\nserver: new connection from ' + str(connaddr) + '\n')
handler = self.connectionHandler(self, newconn)
handler.start()
except socket.timeout:
pass
except KeyboardInterrupt:
- self.active = False
+ self.stop()
except:
- sys.stderr.write(string.join(
+ sys.stdout.write("Test server failure:\n" + string.join(
traceback.format_exception(*sys.exc_info())))
def stop (self):
self.active = False
-
+ self.sock.close()
CERTFILE_CONFIG_TEMPLATE = """
# create RSA certs - Server