author | Antoine Pitrou <solipsis@pitrou.net> | 2010-04-28 21:11:01 (GMT) |
---|---|---|
committer | Antoine Pitrou <solipsis@pitrou.net> | 2010-04-28 21:11:01 (GMT) |
commit | 3945c867d60b1d53299799dd09b6753dcd0a9546 (patch) | |
tree | 98390bbc725bc526ec6951a1113121b938c791b1 /Lib/test/test_ssl.py | |
parent | 689405ee14b5f33d4717b06d258e8dd4f5332f72 (diff) | |
Fix style issues in test_ssl
Diffstat (limited to 'Lib/test/test_ssl.py')
-rw-r--r-- | Lib/test/test_ssl.py | 281 |
1 files changed, 143 insertions, 138 deletions
diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py
index feb6f50..c7cd388 100644
--- a/Lib/test/test_ssl.py
+++ b/Lib/test/test_ssl.py
@@ -37,7 +37,7 @@ def handle_error(prefix): class BasicTests(unittest.TestCase): - def testSimpleSSLwrap(self): + def test_sslwrap_simple(self): # A crude test for the legacy API try: ssl.sslwrap_simple(socket.socket(socket.AF_INET))
@@ -54,7 +54,7 @@ class BasicTests(unittest.TestCase): else: raise - def testSSLconnect(self): + def test_connect(self): if not test_support.is_resource_enabled('network'): return s = ssl.wrap_socket(socket.socket(socket.AF_INET),
@@ -75,7 +75,7 @@ class BasicTests(unittest.TestCase): finally: s.close() - def testCrucialConstants(self): + def test_constants(self): ssl.PROTOCOL_SSLv2 ssl.PROTOCOL_SSLv23 ssl.PROTOCOL_SSLv3
@@ -84,7 +84,7 @@ class BasicTests(unittest.TestCase): ssl.CERT_OPTIONAL ssl.CERT_REQUIRED - def testRAND(self): + def test_random(self): v = ssl.RAND_status() if test_support.verbose: sys.stdout.write("\n RAND_status is %d (%s)\n"
@@ -98,7 +98,7 @@ class BasicTests(unittest.TestCase): print "didn't raise TypeError" ssl.RAND_add("this is a random string", 75.0) - def testParseCert(self): + def test_parse_cert(self): # note that this uses an 'unofficial' function in _ssl.c, # provided solely for this test, to exercise the certificate # parsing code
@@ -106,9 +106,9 @@ class BasicTests(unittest.TestCase): if test_support.verbose: sys.stdout.write("\n" + pprint.pformat(p) + "\n") - def testDERtoPEM(self): - - pem = open(SVN_PYTHON_ORG_ROOT_CERT, 'r').read() + def test_DER_to_PEM(self): + with open(SVN_PYTHON_ORG_ROOT_CERT, 'r') as f: + pem = f.read() d1 = ssl.PEM_cert_to_DER_cert(pem) p2 = ssl.DER_cert_to_PEM_cert(d1) d2 = ssl.PEM_cert_to_DER_cert(p2)
@@ -175,7 +175,7 @@ class BasicTests(unittest.TestCase): class NetworkedTests(unittest.TestCase): - def testConnect(self): + def test_connect(self): s = ssl.wrap_socket(socket.socket(socket.AF_INET), cert_reqs=ssl.CERT_NONE) s.connect(("svn.python.org", 443))
@@ -222,7 +222,7 @@ class NetworkedTests(unittest.TestCase): os.read(fd, 0) self.assertEqual(e.exception.errno, errno.EBADF) - def testNonBlockingHandshake(self): + def test_non_blocking_handshake(self): s = socket.socket(socket.AF_INET) s.connect(("svn.python.org", 443)) s.setblocking(False)
@@ -246,8 +246,7 @@ class NetworkedTests(unittest.TestCase): if test_support.verbose: sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count) - def testFetchServerCert(self): - + def test_get_server_certificate(self): pem = ssl.get_server_certificate(("svn.python.org", 443)) if not pem: self.fail("No server certificate on svn.python.org:443!")
@@ -295,7 +294,6 @@ try: except ImportError: _have_threads = False else: - _have_threads = True class ThreadedEchoServer(threading.Thread):
@@ -327,7 +325,7 @@ else: if test_support.verbose and self.server.chatty: sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n") - def wrap_conn (self): + def wrap_conn(self): try: self.sslconn = ssl.wrap_socket(self.sock, server_side=True, certfile=self.server.certificate,
@@ -367,7 +365,7 @@ else: else: self.sock._sock.close() - def run (self): + def run(self): self.running = True if not self.server.starttls_server: if isinstance(self.sock, ssl.SSLSocket):
@@ -450,11 +448,11 @@ else: threading.Thread.__init__(self) self.daemon = True - def start (self, flag=None): + def start(self, flag=None): self.flag = flag threading.Thread.start(self) - def run (self): + def run(self): self.sock.settimeout(0.05) self.sock.listen(5) self.active = True
@@ -475,14 +473,14 @@ else: self.stop() self.sock.close() - def stop (self): + def stop(self): self.active = False class AsyncoreEchoServer(threading.Thread): - class EchoServer (asyncore.dispatcher): + class EchoServer(asyncore.dispatcher): - class ConnectionHandler (asyncore.dispatcher_with_send): + class ConnectionHandler(asyncore.dispatcher_with_send): def __init__(self, conn, certfile): asyncore.dispatcher_with_send.__init__(self, conn)
@@ -556,18 +554,18 @@ else: def __str__(self): return "<%s %s>" % (self.__class__.__name__, self.server) - def start (self, flag=None): + def start(self, flag=None): self.flag = flag threading.Thread.start(self) - def run (self): + def run(self): self.active = True if self.flag: self.flag.set() while self.active: asyncore.loop(0.05) - def stop (self): + def stop(self): self.active = False self.server.close()
@@ -576,12 +574,9 @@ else: class HTTPSServer(HTTPServer): def __init__(self, server_address, RequestHandlerClass, certfile): - HTTPServer.__init__(self, server_address, RequestHandlerClass) # we assume the certfile contains both private key and certificate self.certfile = certfile - self.active = False - self.active_lock = threading.Lock() self.allow_reuse_address = True def __str__(self):
@@ -590,7 +585,7 @@ else: self.server_name, self.server_port)) - def get_request (self): + def get_request(self): # override this to wrap socket with SSL sock, addr = self.socket.accept() sslconn = ssl.wrap_socket(sock, server_side=True,
@@ -598,7 +593,6 @@ else: return sslconn, addr class RootedHTTPRequestHandler(SimpleHTTPRequestHandler): - # need to override translate_path to get a known root, # instead of using os.curdir, since the test could be # run from anywhere
@@ -643,7 +637,6 @@ else: def __init__(self, certfile): self.flag = None - self.active = False self.RootedHTTPRequestHandler.root = os.path.split(CERTFILE)[0] self.server = self.HTTPSServer( (HOST, 0), self.RootedHTTPRequestHandler, certfile)
@@ -654,23 +647,24 @@ else: def __str__(self): return "<%s %s>" % (self.__class__.__name__, self.server) - def start (self, flag=None): + def start(self, flag=None): self.flag = flag threading.Thread.start(self) - def run (self): - self.active = True + def run(self): if self.flag: self.flag.set() self.server.serve_forever(0.05) - self.active = False - def stop (self): - self.active = False + def stop(self): self.server.shutdown() - def badCertTest (certfile): + def bad_cert_test(certfile): + """ + Launch a server with CERT_REQUIRED, and check that trying to + connect to it with the given client certificate fails. + """ server = ThreadedEchoServer(CERTFILE, certreqs=ssl.CERT_REQUIRED, cacerts=CERTFILE, chatty=False)
@@ -697,11 +691,14 @@ else: server.stop() server.join() - def serverParamsTest (certfile, protocol, certreqs, cacertsfile, - client_certfile, client_protocol=None, indata="FOO\n", - ciphers=None, chatty=True, connectionchatty=False, - wrap_accepting_socket=False): - + def server_params_test(certfile, protocol, certreqs, cacertsfile, + client_certfile, client_protocol=None, indata="FOO\n", + ciphers=None, chatty=True, connectionchatty=False, + wrap_accepting_socket=False): + """ + Launch a server, connect a client to it and try various reads + and writes. + """ server = ThreadedEchoServer(certfile, certreqs=certreqs, ssl_version=protocol,
@@ -749,22 +746,19 @@ else: server.stop() server.join() - def tryProtocolCombo (server_protocol, - client_protocol, - expectedToWork, - certsreqs=None): - + def try_protocol_combo(server_protocol, + client_protocol, + expect_success, + certsreqs=None): if certsreqs is None: certsreqs = ssl.CERT_NONE - - if certsreqs == ssl.CERT_NONE: - certtype = "CERT_NONE" - elif certsreqs == ssl.CERT_OPTIONAL: - certtype = "CERT_OPTIONAL" - elif certsreqs == ssl.CERT_REQUIRED: - certtype = "CERT_REQUIRED" + certtype = { + ssl.CERT_NONE: "CERT_NONE", + ssl.CERT_OPTIONAL: "CERT_OPTIONAL", + ssl.CERT_REQUIRED: "CERT_REQUIRED", + }[certsreqs] if test_support.verbose: - formatstr = (expectedToWork and " %s->%s %s\n") or " {%s->%s} %s\n" + formatstr = (expect_success and " %s->%s %s\n") or " {%s->%s} %s\n" sys.stdout.write(formatstr % (ssl.get_protocol_name(client_protocol), ssl.get_protocol_name(server_protocol),
@@ -773,19 +767,19 @@ else: # NOTE: we must enable "ALL" ciphers, otherwise an SSLv23 client # will send an SSLv3 hello (rather than SSLv2) starting from # OpenSSL 1.0.0 (see issue #8322). - serverParamsTest(CERTFILE, server_protocol, certsreqs, - CERTFILE, CERTFILE, client_protocol, - ciphers="ALL", chatty=False) + server_params_test(CERTFILE, server_protocol, certsreqs, + CERTFILE, CERTFILE, client_protocol, + ciphers="ALL", chatty=False) # Protocol mismatch can result in either an SSLError, or a # "Connection reset by peer" error. except ssl.SSLError: - if expectedToWork: + if expect_success: raise except socket.error as e: - if expectedToWork or e.errno != errno.ECONNRESET: + if expect_success or e.errno != errno.ECONNRESET: raise else: - if not expectedToWork: + if not expect_success: self.fail( "Client protocol %s succeeded with server protocol %s!" % (ssl.get_protocol_name(client_protocol),
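Review bot: `try_protocol_combo` takes no `self`, yet the branch under the renamed `if not expect_success:` (second hunk on this line, `@@ -773,19`) still calls `self.fail(`, which looks like it would raise NameError unless a module-level `self` exists outside the diff. That branch is the one that reports an incompatible client/server protocol pair connecting anyway, so the descriptive message would be replaced by an unrelated error; `raise AssertionError(` with the same format arguments would keep it. The call's argument list runs past the end of the hunk. The function also got no docstring in this commit, unlike `bad_cert_test` and `server_params_test`; something like `"""Connect with client_protocol to a server using server_protocol and check the outcome against expect_success."""` would match them.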
@@ -794,8 +788,10 @@ else: class ThreadedTests(unittest.TestCase): - def testRudeShutdown(self): - + def test_rude_shutdown(self): + """A brutal shutdown of an SSL server should raise an IOError + in the client when attempting handshake. + """ listener_ready = threading.Event() listener_gone = threading.Event()
@@ -832,16 +828,15 @@ else: finally: t.join() - def testEcho (self): - + def test_echo(self): + """Basic test of an SSL client connecting to a server""" if test_support.verbose: sys.stdout.write("\n") - serverParamsTest(CERTFILE, ssl.PROTOCOL_TLSv1, ssl.CERT_NONE, - CERTFILE, CERTFILE, ssl.PROTOCOL_TLSv1, - chatty=True, connectionchatty=True) - - def testReadCert(self): + server_params_test(CERTFILE, ssl.PROTOCOL_TLSv1, ssl.CERT_NONE, + CERTFILE, CERTFILE, ssl.PROTOCOL_TLSv1, + chatty=True, connectionchatty=True) + def test_getpeercert(self): if test_support.verbose: sys.stdout.write("\n") s2 = socket.socket()
@@ -881,74 +876,82 @@ else: server.stop() server.join() - def testNULLcert(self): - badCertTest(os.path.join(os.path.dirname(__file__) or os.curdir, - "nullcert.pem")) - def testMalformedCert(self): - badCertTest(os.path.join(os.path.dirname(__file__) or os.curdir, - "badcert.pem")) - def testWrongCert(self): - badCertTest(os.path.join(os.path.dirname(__file__) or os.curdir, - "wrongcert.pem")) - def testMalformedKey(self): - badCertTest(os.path.join(os.path.dirname(__file__) or os.curdir, - "badkey.pem")) - - def testProtocolSSL2(self): + def test_empty_cert(self): + """Connecting with an empty cert file""" + bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir, + "nullcert.pem")) + def test_malformed_cert(self): + """Connecting with a badly formatted certificate (syntax error)""" + bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir, + "badcert.pem")) + def test_nonexisting_cert(self): + """Connecting with a non-existing cert file""" + bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir, + "wrongcert.pem")) + def test_malformed_key(self): + """Connecting with a badly formatted key (syntax error)""" + bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir, + "badkey.pem")) + + def test_protocol_sslv2(self): + """Connecting to an SSLv2 server with various client options""" if test_support.verbose: sys.stdout.write("\n") - tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True) - tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL) - tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED) - tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, True) - tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False) - tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False) - - def testProtocolSSL23(self): + try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True) + try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, 
ssl.CERT_OPTIONAL) + try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED) + try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, True) + try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False) + try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False) + + def test_protocol_sslv23(self): + """Connecting to an SSLv23 server with various client options""" if test_support.verbose: sys.stdout.write("\n") try: - tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv2, True) + try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv2, True) except (ssl.SSLError, socket.error), x: # this fails on some older versions of OpenSSL (0.9.7l, for instance) if test_support.verbose: sys.stdout.write( " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n" % str(x)) - tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True) - tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True) - tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True) + try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True) + try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True) + try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True) - tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True, ssl.CERT_OPTIONAL) - tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_OPTIONAL) - tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True, ssl.CERT_OPTIONAL) + try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True, ssl.CERT_OPTIONAL) + try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_OPTIONAL) + try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True, ssl.CERT_OPTIONAL) - tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True, ssl.CERT_REQUIRED) - tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_REQUIRED) - tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True, 
ssl.CERT_REQUIRED) + try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True, ssl.CERT_REQUIRED) + try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_REQUIRED) + try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True, ssl.CERT_REQUIRED) - def testProtocolSSL3(self): + def test_protocol_sslv3(self): + """Connecting to an SSLv3 server with various client options""" if test_support.verbose: sys.stdout.write("\n") - tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True) - tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True, ssl.CERT_OPTIONAL) - tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True, ssl.CERT_REQUIRED) - tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv2, False) - tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv23, False) - tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False) - - def testProtocolTLS1(self): + try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True) + try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True, ssl.CERT_OPTIONAL) + try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True, ssl.CERT_REQUIRED) + try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv2, False) + try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv23, False) + try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False) + + def test_protocol_tlsv1(self): + """Connecting to a TLSv1 server with various client options""" if test_support.verbose: sys.stdout.write("\n") - tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True) - tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True, ssl.CERT_OPTIONAL) - tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True, ssl.CERT_REQUIRED) - tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False) - tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False) - tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv23, False) - - def testSTARTTLS (self): - + 
try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True) + try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True, ssl.CERT_OPTIONAL) + try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True, ssl.CERT_REQUIRED) + try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False) + try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False) + try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv23, False) + + def test_starttls(self): + """Switching from clear text to encrypted and back again.""" msgs = ("msg 1", "MSG 2", "STARTTLS", "MSG 3", "msg 4", "ENDTLS", "msg 5", "msg 6") server = ThreadedEchoServer(CERTFILE,
@@ -980,6 +983,7 @@ else: outdata = s.recv(1024) if (indata == "STARTTLS" and outdata.strip().lower().startswith("ok")): + # STARTTLS ok, switch to secure mode if test_support.verbose: sys.stdout.write( " client: read %s from server, starting TLS...\n"
@@ -988,6 +992,7 @@ else: wrapped = True elif (indata == "ENDTLS" and outdata.strip().lower().startswith("ok")): + # ENDTLS ok, switch back to clear text if test_support.verbose: sys.stdout.write( " client: read %s from server, ending TLS...\n"
@@ -1009,8 +1014,8 @@ else: server.stop() server.join() - def testSocketServer(self): - + def test_socketserver(self): + """Using a SocketServer to create and manage SSL connections.""" server = SocketServerHTTPSServer(CERTFILE) flag = threading.Event() server.start(flag)
@@ -1020,7 +1025,8 @@ else: try: if test_support.verbose: sys.stdout.write('\n') - d1 = open(CERTFILE, 'rb').read() + with open(CERTFILE, 'rb') as f: + d1 = f.read() d2 = '' # now fetch the same data from the HTTPS server url = 'https://127.0.0.1:%d/%s' % (
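Review bot: `test_nonexisting_cert` (renamed from `testWrongCert` in the `@@ -881,74` hunk that ends on this line) now documents "a non-existing cert file" but still passes `wrongcert.pem`, and nothing in the diff shows whether that file is absent from the test directory. If it is absent, the test exercises a file-open failure rather than rejection of a well-formed certificate the server does not trust, so among the four visible `bad_cert_test` callers none covers that CERT_REQUIRED case; if it is present, the new name and docstring are wrong. A comment stating which is intended, e.g. `# wrongcert.pem is deliberately missing from Lib/test`, would settle it once confirmed against the tree. The four methods are also added with no blank line between them, unlike the blank line added before `test_protocol_sslv2`.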
@@ -1040,18 +1046,17 @@ else: server.stop() server.join() - def testWrappedAccept (self): - + def test_wrapped_accept(self): + """Check the accept() method on SSL sockets.""" if test_support.verbose: sys.stdout.write("\n") - serverParamsTest(CERTFILE, ssl.PROTOCOL_SSLv23, ssl.CERT_REQUIRED, - CERTFILE, CERTFILE, ssl.PROTOCOL_SSLv23, - chatty=True, connectionchatty=True, - wrap_accepting_socket=True) - - - def testAsyncoreServer (self): + server_params_test(CERTFILE, ssl.PROTOCOL_SSLv23, ssl.CERT_REQUIRED, + CERTFILE, CERTFILE, ssl.PROTOCOL_SSLv23, + chatty=True, connectionchatty=True, + wrap_accepting_socket=True) + def test_asyncore_server(self): + """Check the example asyncore integration.""" indata = "TEST MESSAGE of mixed case\n" if test_support.verbose:
@@ -1086,9 +1091,8 @@ else: # wait for server thread to end server.join() - - def testAllRecvAndSendMethods(self): - + def test_recv_send(self): + """Test recv(), send() and friends.""" if test_support.verbose: sys.stdout.write("\n")
@@ -1275,10 +1279,11 @@ def test_main(verbose=False): if thread_info and test_support.is_resource_enabled('network'): tests.append(ThreadedTests) - test_support.run_unittest(*tests) - - if _have_threads: - test_support.threading_cleanup(*thread_info) + try: + test_support.run_unittest(*tests) + finally: + if _have_threads: + test_support.threading_cleanup(*thread_info) if __name__ == "__main__": test_main() |