diff options
Diffstat (limited to 'Lib/test/test_ssl.py')
-rw-r--r-- | Lib/test/test_ssl.py | 170 |
1 files changed, 85 insertions, 85 deletions
diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py index 59bf57d..4f884f0 100644 --- a/Lib/test/test_ssl.py +++ b/Lib/test/test_ssl.py @@ -2,7 +2,7 @@ import sys import unittest -from test import test_support +from test import support import socket import select import errno @@ -25,27 +25,27 @@ try: except ImportError: skip_expected = True -HOST = test_support.HOST +HOST = support.HOST CERTFILE = None SVN_PYTHON_ORG_ROOT_CERT = None def handle_error(prefix): exc_format = ' '.join(traceback.format_exception(*sys.exc_info())) - if test_support.verbose: + if support.verbose: sys.stdout.write(prefix + exc_format) class BasicTests(unittest.TestCase): def testSSLconnect(self): - if not test_support.is_resource_enabled('network'): + if not support.is_resource_enabled('network'): return s = ssl.wrap_socket(socket.socket(socket.AF_INET), cert_reqs=ssl.CERT_NONE) s.connect(("svn.python.org", 443)) c = s.getpeercert() if c: - raise test_support.TestFailed("Peer cert %s shouldn't be here!") + raise support.TestFailed("Peer cert %s shouldn't be here!") s.close() # this should fail because we have no verification certs @@ -69,7 +69,7 @@ class BasicTests(unittest.TestCase): def testRAND(self): v = ssl.RAND_status() - if test_support.verbose: + if support.verbose: sys.stdout.write("\n RAND_status is %d (%s)\n" % (v, (v and "sufficient randomness") or "insufficient randomness")) @@ -86,7 +86,7 @@ class BasicTests(unittest.TestCase): # provided solely for this test, to exercise the certificate # parsing code p = ssl._ssl._test_decode_cert(CERTFILE, False) - if test_support.verbose: + if support.verbose: sys.stdout.write("\n" + pprint.pformat(p) + "\n") def testDERtoPEM(self): @@ -96,7 +96,7 @@ class BasicTests(unittest.TestCase): p2 = ssl.DER_cert_to_PEM_cert(d1) d2 = ssl.PEM_cert_to_DER_cert(p2) if (d1 != d2): - raise test_support.TestFailed("PEM-to-DER or DER-to-PEM translation failed") + raise support.TestFailed("PEM-to-DER or DER-to-PEM translation failed") class NetworkedTests(unittest.TestCase): @@ -106,7 +106,7 @@ class NetworkedTests(unittest.TestCase): s.connect(("svn.python.org", 443)) c = s.getpeercert() if c: - raise test_support.TestFailed("Peer cert %s shouldn't be here!") + raise support.TestFailed("Peer cert %s shouldn't be here!") s.close() # this should fail because we have no verification certs @@ -126,7 +126,7 @@ class NetworkedTests(unittest.TestCase): try: s.connect(("svn.python.org", 443)) except ssl.SSLError as x: - raise test_support.TestFailed("Unexpected exception %s" % x) + raise support.TestFailed("Unexpected exception %s" % x) finally: s.close() @@ -151,14 +151,14 @@ class NetworkedTests(unittest.TestCase): else: raise s.close() - if test_support.verbose: + if support.verbose: sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count) def testFetchServerCert(self): pem = ssl.get_server_certificate(("svn.python.org", 443)) if not pem: - raise test_support.TestFailed("No server certificate on svn.python.org:443!") + raise support.TestFailed("No server certificate on svn.python.org:443!") return @@ -166,15 +166,15 @@ class NetworkedTests(unittest.TestCase): pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=CERTFILE) except ssl.SSLError as x: #should fail - if test_support.verbose: + if support.verbose: sys.stdout.write("%s\n" % x) else: - raise test_support.TestFailed("Got server certificate %s for svn.python.org!" % pem) + raise support.TestFailed("Got server certificate %s for svn.python.org!" % pem) pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=SVN_PYTHON_ORG_ROOT_CERT) if not pem: - raise test_support.TestFailed("No server certificate on svn.python.org:443!") - if test_support.verbose: + raise support.TestFailed("No server certificate on svn.python.org:443!") + if support.verbose: sys.stdout.write("\nVerified certificate for svn.python.org:443 is\n%s\n" % pem) @@ -227,13 +227,13 @@ else: else: if self.server.certreqs == ssl.CERT_REQUIRED: cert = self.sslconn.getpeercert() - if test_support.verbose and self.server.chatty: + if support.verbose and self.server.chatty: sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n") cert_binary = self.sslconn.getpeercert(True) - if test_support.verbose and self.server.chatty: + if support.verbose and self.server.chatty: sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n") cipher = self.sslconn.cipher() - if test_support.verbose and self.server.chatty: + if support.verbose and self.server.chatty: sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n") return True @@ -269,19 +269,19 @@ else: self.running = False self.close() elif amsg.strip() == 'over': - if test_support.verbose and self.server.connectionchatty: + if support.verbose and self.server.connectionchatty: sys.stdout.write(" server: client closed connection\n") self.close() return elif (self.server.starttls_server and amsg.strip() == 'STARTTLS'): - if test_support.verbose and self.server.connectionchatty: + if support.verbose and self.server.connectionchatty: sys.stdout.write(" server: read STARTTLS from client, sending OK...\n") self.write("OK\n".encode("ASCII", "strict")) if not self.wrap_conn(): return else: - if (test_support.verbose and + if (support.verbose and self.server.connectionchatty): ctype = (self.sslconn and "encrypted") or "unencrypted" sys.stdout.write(" server: read %s (%s), sending back %s (%s)...\n" @@ -314,7 +314,7 @@ else: self.connectionchatty = connectionchatty self.starttls_server = starttls_server self.sock = socket.socket() - self.port = test_support.bind_port(self.sock) + self.port = support.bind_port(self.sock) self.flag = None self.active = False threading.Thread.__init__(self) @@ -334,7 +334,7 @@ else: while self.active: try: newconn, connaddr = self.sock.accept() - if test_support.verbose and self.chatty: + if support.verbose and self.chatty: sys.stdout.write(' server: new connection from ' + repr(connaddr) + '\n') handler = self.ConnectionHandler(self, newconn, connaddr) @@ -457,7 +457,7 @@ else: # we override this to suppress logging unless "verbose" - if test_support.verbose: + if support.verbose: sys.stdout.write(" server (%s:%d %s):\n [%s] %s\n" % (self.server.server_address, self.server.server_port, @@ -470,7 +470,7 @@ else: self.flag = None self.active = False self.RootedHTTPRequestHandler.root = os.path.split(CERTFILE)[0] - self.port = test_support.find_unused_port() + self.port = support.find_unused_port() self.server = self.HTTPSServer( (HOST, self.port), self.RootedHTTPRequestHandler, certfile) threading.Thread.__init__(self) @@ -522,7 +522,7 @@ else: def handle_read(self): data = self.recv(1024) - if test_support.verbose: + if support.verbose: sys.stdout.write(" server: read %s from client\n" % repr(data)) if not data: self.close() @@ -530,7 +530,7 @@ else: self.send(str(data, 'ASCII', 'strict').lower().encode('ASCII', 'strict')) def handle_close(self): - if test_support.verbose: + if support.verbose: sys.stdout.write(" server: closed connection %s\n" % self.socket) def handle_error(self): @@ -546,7 +546,7 @@ else: def handle_accept(self): sock_obj, addr = self.accept() - if test_support.verbose: + if support.verbose: sys.stdout.write(" server: new connection from %s:%s\n" %addr) self.ConnectionHandler(sock_obj, self.certfile) @@ -556,7 +556,7 @@ else: def __init__(self, certfile): self.flag = None self.active = False - self.port = test_support.find_unused_port() + self.port = support.find_unused_port() self.server = self.EchoServer(self.port, certfile) threading.Thread.__init__(self) self.setDaemon(True) @@ -599,10 +599,10 @@ else: ssl_version=ssl.PROTOCOL_TLSv1) s.connect((HOST, server.port)) except ssl.SSLError as x: - if test_support.verbose: + if support.verbose: sys.stdout.write("\nSSLError is %s\n" % x) else: - raise test_support.TestFailed( + raise support.TestFailed( "Use of invalid cert should have failed!") finally: server.stop() @@ -635,28 +635,28 @@ else: ssl_version=client_protocol) s.connect((HOST, server.port)) except ssl.SSLError as x: - raise test_support.TestFailed("Unexpected SSL error: " + str(x)) + raise support.TestFailed("Unexpected SSL error: " + str(x)) except Exception as x: - raise test_support.TestFailed("Unexpected exception: " + str(x)) + raise support.TestFailed("Unexpected exception: " + str(x)) else: if connectionchatty: - if test_support.verbose: + if support.verbose: sys.stdout.write( " client: sending %s...\n" % (repr(indata))) s.write(indata.encode('ASCII', 'strict')) outdata = s.read() if connectionchatty: - if test_support.verbose: + if support.verbose: sys.stdout.write(" client: read %s\n" % repr(outdata)) outdata = str(outdata, 'ASCII', 'strict') if outdata != indata.lower(): - raise test_support.TestFailed( + raise support.TestFailed( "bad data <<%s>> (%d) received; expected <<%s>> (%d)\n" % (repr(outdata[:min(len(outdata),20)]), len(outdata), repr(indata[:min(len(indata),20)].lower()), len(indata))) s.write("over\n".encode("ASCII", "strict")) if connectionchatty: - if test_support.verbose: + if support.verbose: sys.stdout.write(" client: closing connection.\n") s.close() finally: @@ -677,7 +677,7 @@ else: certtype = "CERT_OPTIONAL" elif certsreqs == ssl.CERT_REQUIRED: certtype = "CERT_REQUIRED" - if test_support.verbose: + if support.verbose: formatstr = (expectedToWork and " %s->%s %s\n") or " {%s->%s} %s\n" sys.stdout.write(formatstr % (ssl.get_protocol_name(client_protocol), @@ -687,12 +687,12 @@ else: serverParamsTest(CERTFILE, server_protocol, certsreqs, CERTFILE, CERTFILE, client_protocol, chatty=False, connectionchatty=False) - except test_support.TestFailed: + except support.TestFailed: if expectedToWork: raise else: if not expectedToWork: - raise test_support.TestFailed( + raise support.TestFailed( "Client protocol %s succeeded with server protocol %s!" % (ssl.get_protocol_name(client_protocol), ssl.get_protocol_name(server_protocol))) @@ -702,7 +702,7 @@ else: def testEcho (self): - if test_support.verbose: + if support.verbose: sys.stdout.write("\n") serverParamsTest(CERTFILE, ssl.PROTOCOL_TLSv1, ssl.CERT_NONE, CERTFILE, CERTFILE, ssl.PROTOCOL_TLSv1, @@ -710,7 +710,7 @@ else: def testReadCert(self): - if test_support.verbose: + if support.verbose: sys.stdout.write("\n") s2 = socket.socket() server = ThreadedEchoServer(CERTFILE, @@ -732,30 +732,30 @@ else: ssl_version=ssl.PROTOCOL_SSLv23) s.connect((HOST, server.port)) except ssl.SSLError as x: - raise test_support.TestFailed( + raise support.TestFailed( "Unexpected SSL error: " + str(x)) except Exception as x: - raise test_support.TestFailed( + raise support.TestFailed( "Unexpected exception: " + str(x)) else: if not s: - raise test_support.TestFailed( + raise support.TestFailed( "Can't SSL-handshake with test server") cert = s.getpeercert() if not cert: - raise test_support.TestFailed( + raise support.TestFailed( "Can't get peer certificate.") cipher = s.cipher() - if test_support.verbose: + if support.verbose: sys.stdout.write(pprint.pformat(cert) + '\n') sys.stdout.write("Connection cipher is " + str(cipher) + '.\n') if 'subject' not in cert: - raise test_support.TestFailed( + raise support.TestFailed( "No subject field in certificate: %s." % pprint.pformat(cert)) if ((('organizationName', 'Python Software Foundation'),) not in cert['subject']): - raise test_support.TestFailed( + raise support.TestFailed( "Missing or invalid 'organizationName' field in certificate subject; " "should be 'Python Software Foundation'.") s.close() @@ -777,7 +777,7 @@ else: listener_ready = threading.Event() listener_gone = threading.Event() - port = test_support.find_unused_port() + port = support.find_unused_port() # `listener` runs in a thread. It opens a socket listening on # PORT, and sits in an accept() until the main thread connects. @@ -802,7 +802,7 @@ else: except IOError: pass else: - raise test_support.TestFailed( + raise support.TestFailed( 'connecting to closed SSL socket should have failed') t = threading.Thread(target=listener) @@ -811,7 +811,7 @@ else: t.join() def testProtocolSSL2(self): - if test_support.verbose: + if support.verbose: sys.stdout.write("\n") tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True) tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL) @@ -821,13 +821,13 @@ else: tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False) def testProtocolSSL23(self): - if test_support.verbose: + if support.verbose: sys.stdout.write("\n") try: tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv2, True) - except test_support.TestFailed as x: + except support.TestFailed as x: # this fails on some older versions of OpenSSL (0.9.7l, for instance) - if test_support.verbose: + if support.verbose: sys.stdout.write( " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n" % str(x)) @@ -844,7 +844,7 @@ else: tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True, ssl.CERT_REQUIRED) def testProtocolSSL3(self): - if test_support.verbose: + if support.verbose: sys.stdout.write("\n") tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True) tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True, ssl.CERT_OPTIONAL) @@ -854,7 +854,7 @@ else: tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False) def testProtocolTLS1(self): - if test_support.verbose: + if support.verbose: sys.stdout.write("\n") tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True) tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True, ssl.CERT_OPTIONAL) @@ -884,13 +884,13 @@ else: s.setblocking(1) s.connect((HOST, server.port)) except Exception as x: - raise test_support.TestFailed("Unexpected exception: " + str(x)) + raise support.TestFailed("Unexpected exception: " + str(x)) else: - if test_support.verbose: + if support.verbose: sys.stdout.write("\n") for indata in msgs: msg = indata.encode('ASCII', 'replace') - if test_support.verbose: + if support.verbose: sys.stdout.write( " client: sending %s...\n" % repr(msg)) if wrapped: @@ -901,7 +901,7 @@ else: outdata = s.recv(1024) if (indata == "STARTTLS" and str(outdata, 'ASCII', 'replace').strip().lower().startswith("ok")): - if test_support.verbose: + if support.verbose: msg = str(outdata, 'ASCII', 'replace') sys.stdout.write( " client: read %s from server, starting TLS...\n" @@ -910,11 +910,11 @@ else: wrapped = True else: - if test_support.verbose: + if support.verbose: msg = str(outdata, 'ASCII', 'replace') sys.stdout.write( " client: read %s from server\n" % repr(msg)) - if test_support.verbose: + if support.verbose: sys.stdout.write(" client: closing connection.\n") if wrapped: conn.write("over\n".encode("ASCII", "strict")) @@ -937,7 +937,7 @@ else: flag.wait() # try to connect try: - if test_support.verbose: + if support.verbose: sys.stdout.write('\n') d1 = open(CERTFILE, 'rb').read() d2 = '' @@ -948,33 +948,33 @@ else: dlen = f.info().getheader("content-length") if dlen and (int(dlen) > 0): d2 = f.read(int(dlen)) - if test_support.verbose: + if support.verbose: sys.stdout.write( " client: read %d bytes from remote server '%s'\n" % (len(d2), server)) f.close() except: msg = ''.join(traceback.format_exception(*sys.exc_info())) - if test_support.verbose: + if support.verbose: sys.stdout.write('\n' + msg) - raise test_support.TestFailed(msg) + raise support.TestFailed(msg) else: if not (d1 == d2): print("d1 is", len(d1), repr(d1)) print("d2 is", len(d2), repr(d2)) - raise test_support.TestFailed( + raise support.TestFailed( "Couldn't fetch data from HTTPS server") finally: - if test_support.verbose: + if support.verbose: sys.stdout.write('stopping server\n') server.stop() - if test_support.verbose: + if support.verbose: sys.stdout.write('joining thread\n') server.join() def testAsyncoreServer(self): - if test_support.verbose: + if support.verbose: sys.stdout.write("\n") indata="FOO\n" @@ -988,25 +988,25 @@ else: s = ssl.wrap_socket(socket.socket()) s.connect((HOST, server.port)) except ssl.SSLError as x: - raise test_support.TestFailed("Unexpected SSL error: " + str(x)) + raise support.TestFailed("Unexpected SSL error: " + str(x)) except Exception as x: - raise test_support.TestFailed("Unexpected exception: " + str(x)) + raise support.TestFailed("Unexpected exception: " + str(x)) else: - if test_support.verbose: + if support.verbose: sys.stdout.write( " client: sending %s...\n" % (repr(indata))) s.sendall(indata.encode('ASCII', 'strict')) outdata = s.recv() - if test_support.verbose: + if support.verbose: sys.stdout.write(" client: read %s\n" % repr(outdata)) outdata = str(outdata, 'ASCII', 'strict') if outdata != indata.lower(): - raise test_support.TestFailed( + raise support.TestFailed( "bad data <<%s>> (%d) received; expected <<%s>> (%d)\n" % (repr(outdata[:min(len(outdata),20)]), len(outdata), repr(indata[:min(len(indata),20)].lower()), len(indata))) s.write("over\n".encode("ASCII", "strict")) - if test_support.verbose: + if support.verbose: sys.stdout.write(" client: closing connection.\n") s.close() finally: @@ -1015,7 +1015,7 @@ else: def test_main(verbose=False): if skip_expected: - raise test_support.TestSkipped("No SSL support") + raise support.TestSkipped("No SSL support") global CERTFILE, SVN_PYTHON_ORG_ROOT_CERT CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir, @@ -1026,22 +1026,22 @@ def test_main(verbose=False): if (not os.path.exists(CERTFILE) or not os.path.exists(SVN_PYTHON_ORG_ROOT_CERT)): - raise test_support.TestFailed("Can't read certificate files!") + raise support.TestFailed("Can't read certificate files!") tests = [BasicTests] - if test_support.is_resource_enabled('network'): + if support.is_resource_enabled('network'): tests.append(NetworkedTests) if _have_threads: - thread_info = test_support.threading_setup() - if thread_info and test_support.is_resource_enabled('network'): + thread_info = support.threading_setup() + if thread_info and support.is_resource_enabled('network'): tests.append(ThreadedTests) - test_support.run_unittest(*tests) + support.run_unittest(*tests) if _have_threads: - test_support.threading_cleanup(*thread_info) + support.threading_cleanup(*thread_info) if __name__ == "__main__": test_main() |