summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_ssl.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_ssl.py')
-rw-r--r--Lib/test/test_ssl.py170
1 files changed, 85 insertions, 85 deletions
diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py
index 59bf57d..4f884f0 100644
--- a/Lib/test/test_ssl.py
+++ b/Lib/test/test_ssl.py
@@ -2,7 +2,7 @@
import sys
import unittest
-from test import test_support
+from test import support
import socket
import select
import errno
@@ -25,27 +25,27 @@ try:
except ImportError:
skip_expected = True
-HOST = test_support.HOST
+HOST = support.HOST
CERTFILE = None
SVN_PYTHON_ORG_ROOT_CERT = None
def handle_error(prefix):
exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
- if test_support.verbose:
+ if support.verbose:
sys.stdout.write(prefix + exc_format)
class BasicTests(unittest.TestCase):
def testSSLconnect(self):
- if not test_support.is_resource_enabled('network'):
+ if not support.is_resource_enabled('network'):
return
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
cert_reqs=ssl.CERT_NONE)
s.connect(("svn.python.org", 443))
c = s.getpeercert()
if c:
- raise test_support.TestFailed("Peer cert %s shouldn't be here!")
+ raise support.TestFailed("Peer cert %s shouldn't be here!")
s.close()
# this should fail because we have no verification certs
@@ -69,7 +69,7 @@ class BasicTests(unittest.TestCase):
def testRAND(self):
v = ssl.RAND_status()
- if test_support.verbose:
+ if support.verbose:
sys.stdout.write("\n RAND_status is %d (%s)\n"
% (v, (v and "sufficient randomness") or
"insufficient randomness"))
@@ -86,7 +86,7 @@ class BasicTests(unittest.TestCase):
# provided solely for this test, to exercise the certificate
# parsing code
p = ssl._ssl._test_decode_cert(CERTFILE, False)
- if test_support.verbose:
+ if support.verbose:
sys.stdout.write("\n" + pprint.pformat(p) + "\n")
def testDERtoPEM(self):
@@ -96,7 +96,7 @@ class BasicTests(unittest.TestCase):
p2 = ssl.DER_cert_to_PEM_cert(d1)
d2 = ssl.PEM_cert_to_DER_cert(p2)
if (d1 != d2):
- raise test_support.TestFailed("PEM-to-DER or DER-to-PEM translation failed")
+ raise support.TestFailed("PEM-to-DER or DER-to-PEM translation failed")
class NetworkedTests(unittest.TestCase):
@@ -106,7 +106,7 @@ class NetworkedTests(unittest.TestCase):
s.connect(("svn.python.org", 443))
c = s.getpeercert()
if c:
- raise test_support.TestFailed("Peer cert %s shouldn't be here!")
+ raise support.TestFailed("Peer cert %s shouldn't be here!")
s.close()
# this should fail because we have no verification certs
@@ -126,7 +126,7 @@ class NetworkedTests(unittest.TestCase):
try:
s.connect(("svn.python.org", 443))
except ssl.SSLError as x:
- raise test_support.TestFailed("Unexpected exception %s" % x)
+ raise support.TestFailed("Unexpected exception %s" % x)
finally:
s.close()
@@ -151,14 +151,14 @@ class NetworkedTests(unittest.TestCase):
else:
raise
s.close()
- if test_support.verbose:
+ if support.verbose:
sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)
def testFetchServerCert(self):
pem = ssl.get_server_certificate(("svn.python.org", 443))
if not pem:
- raise test_support.TestFailed("No server certificate on svn.python.org:443!")
+ raise support.TestFailed("No server certificate on svn.python.org:443!")
return
@@ -166,15 +166,15 @@ class NetworkedTests(unittest.TestCase):
pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=CERTFILE)
except ssl.SSLError as x:
#should fail
- if test_support.verbose:
+ if support.verbose:
sys.stdout.write("%s\n" % x)
else:
- raise test_support.TestFailed("Got server certificate %s for svn.python.org!" % pem)
+ raise support.TestFailed("Got server certificate %s for svn.python.org!" % pem)
pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
if not pem:
- raise test_support.TestFailed("No server certificate on svn.python.org:443!")
- if test_support.verbose:
+ raise support.TestFailed("No server certificate on svn.python.org:443!")
+ if support.verbose:
sys.stdout.write("\nVerified certificate for svn.python.org:443 is\n%s\n" % pem)
@@ -227,13 +227,13 @@ else:
else:
if self.server.certreqs == ssl.CERT_REQUIRED:
cert = self.sslconn.getpeercert()
- if test_support.verbose and self.server.chatty:
+ if support.verbose and self.server.chatty:
sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
cert_binary = self.sslconn.getpeercert(True)
- if test_support.verbose and self.server.chatty:
+ if support.verbose and self.server.chatty:
sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
cipher = self.sslconn.cipher()
- if test_support.verbose and self.server.chatty:
+ if support.verbose and self.server.chatty:
sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
return True
@@ -269,19 +269,19 @@ else:
self.running = False
self.close()
elif amsg.strip() == 'over':
- if test_support.verbose and self.server.connectionchatty:
+ if support.verbose and self.server.connectionchatty:
sys.stdout.write(" server: client closed connection\n")
self.close()
return
elif (self.server.starttls_server and
amsg.strip() == 'STARTTLS'):
- if test_support.verbose and self.server.connectionchatty:
+ if support.verbose and self.server.connectionchatty:
sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
self.write("OK\n".encode("ASCII", "strict"))
if not self.wrap_conn():
return
else:
- if (test_support.verbose and
+ if (support.verbose and
self.server.connectionchatty):
ctype = (self.sslconn and "encrypted") or "unencrypted"
sys.stdout.write(" server: read %s (%s), sending back %s (%s)...\n"
@@ -314,7 +314,7 @@ else:
self.connectionchatty = connectionchatty
self.starttls_server = starttls_server
self.sock = socket.socket()
- self.port = test_support.bind_port(self.sock)
+ self.port = support.bind_port(self.sock)
self.flag = None
self.active = False
threading.Thread.__init__(self)
@@ -334,7 +334,7 @@ else:
while self.active:
try:
newconn, connaddr = self.sock.accept()
- if test_support.verbose and self.chatty:
+ if support.verbose and self.chatty:
sys.stdout.write(' server: new connection from '
+ repr(connaddr) + '\n')
handler = self.ConnectionHandler(self, newconn, connaddr)
@@ -457,7 +457,7 @@ else:
# we override this to suppress logging unless "verbose"
- if test_support.verbose:
+ if support.verbose:
sys.stdout.write(" server (%s:%d %s):\n [%s] %s\n" %
(self.server.server_address,
self.server.server_port,
@@ -470,7 +470,7 @@ else:
self.flag = None
self.active = False
self.RootedHTTPRequestHandler.root = os.path.split(CERTFILE)[0]
- self.port = test_support.find_unused_port()
+ self.port = support.find_unused_port()
self.server = self.HTTPSServer(
(HOST, self.port), self.RootedHTTPRequestHandler, certfile)
threading.Thread.__init__(self)
@@ -522,7 +522,7 @@ else:
def handle_read(self):
data = self.recv(1024)
- if test_support.verbose:
+ if support.verbose:
sys.stdout.write(" server: read %s from client\n" % repr(data))
if not data:
self.close()
@@ -530,7 +530,7 @@ else:
self.send(str(data, 'ASCII', 'strict').lower().encode('ASCII', 'strict'))
def handle_close(self):
- if test_support.verbose:
+ if support.verbose:
sys.stdout.write(" server: closed connection %s\n" % self.socket)
def handle_error(self):
@@ -546,7 +546,7 @@ else:
def handle_accept(self):
sock_obj, addr = self.accept()
- if test_support.verbose:
+ if support.verbose:
sys.stdout.write(" server: new connection from %s:%s\n" %addr)
self.ConnectionHandler(sock_obj, self.certfile)
@@ -556,7 +556,7 @@ else:
def __init__(self, certfile):
self.flag = None
self.active = False
- self.port = test_support.find_unused_port()
+ self.port = support.find_unused_port()
self.server = self.EchoServer(self.port, certfile)
threading.Thread.__init__(self)
self.setDaemon(True)
@@ -599,10 +599,10 @@ else:
ssl_version=ssl.PROTOCOL_TLSv1)
s.connect((HOST, server.port))
except ssl.SSLError as x:
- if test_support.verbose:
+ if support.verbose:
sys.stdout.write("\nSSLError is %s\n" % x)
else:
- raise test_support.TestFailed(
+ raise support.TestFailed(
"Use of invalid cert should have failed!")
finally:
server.stop()
@@ -635,28 +635,28 @@ else:
ssl_version=client_protocol)
s.connect((HOST, server.port))
except ssl.SSLError as x:
- raise test_support.TestFailed("Unexpected SSL error: " + str(x))
+ raise support.TestFailed("Unexpected SSL error: " + str(x))
except Exception as x:
- raise test_support.TestFailed("Unexpected exception: " + str(x))
+ raise support.TestFailed("Unexpected exception: " + str(x))
else:
if connectionchatty:
- if test_support.verbose:
+ if support.verbose:
sys.stdout.write(
" client: sending %s...\n" % (repr(indata)))
s.write(indata.encode('ASCII', 'strict'))
outdata = s.read()
if connectionchatty:
- if test_support.verbose:
+ if support.verbose:
sys.stdout.write(" client: read %s\n" % repr(outdata))
outdata = str(outdata, 'ASCII', 'strict')
if outdata != indata.lower():
- raise test_support.TestFailed(
+ raise support.TestFailed(
"bad data <<%s>> (%d) received; expected <<%s>> (%d)\n"
% (repr(outdata[:min(len(outdata),20)]), len(outdata),
repr(indata[:min(len(indata),20)].lower()), len(indata)))
s.write("over\n".encode("ASCII", "strict"))
if connectionchatty:
- if test_support.verbose:
+ if support.verbose:
sys.stdout.write(" client: closing connection.\n")
s.close()
finally:
@@ -677,7 +677,7 @@ else:
certtype = "CERT_OPTIONAL"
elif certsreqs == ssl.CERT_REQUIRED:
certtype = "CERT_REQUIRED"
- if test_support.verbose:
+ if support.verbose:
formatstr = (expectedToWork and " %s->%s %s\n") or " {%s->%s} %s\n"
sys.stdout.write(formatstr %
(ssl.get_protocol_name(client_protocol),
@@ -687,12 +687,12 @@ else:
serverParamsTest(CERTFILE, server_protocol, certsreqs,
CERTFILE, CERTFILE, client_protocol,
chatty=False, connectionchatty=False)
- except test_support.TestFailed:
+ except support.TestFailed:
if expectedToWork:
raise
else:
if not expectedToWork:
- raise test_support.TestFailed(
+ raise support.TestFailed(
"Client protocol %s succeeded with server protocol %s!"
% (ssl.get_protocol_name(client_protocol),
ssl.get_protocol_name(server_protocol)))
@@ -702,7 +702,7 @@ else:
def testEcho (self):
- if test_support.verbose:
+ if support.verbose:
sys.stdout.write("\n")
serverParamsTest(CERTFILE, ssl.PROTOCOL_TLSv1, ssl.CERT_NONE,
CERTFILE, CERTFILE, ssl.PROTOCOL_TLSv1,
@@ -710,7 +710,7 @@ else:
def testReadCert(self):
- if test_support.verbose:
+ if support.verbose:
sys.stdout.write("\n")
s2 = socket.socket()
server = ThreadedEchoServer(CERTFILE,
@@ -732,30 +732,30 @@ else:
ssl_version=ssl.PROTOCOL_SSLv23)
s.connect((HOST, server.port))
except ssl.SSLError as x:
- raise test_support.TestFailed(
+ raise support.TestFailed(
"Unexpected SSL error: " + str(x))
except Exception as x:
- raise test_support.TestFailed(
+ raise support.TestFailed(
"Unexpected exception: " + str(x))
else:
if not s:
- raise test_support.TestFailed(
+ raise support.TestFailed(
"Can't SSL-handshake with test server")
cert = s.getpeercert()
if not cert:
- raise test_support.TestFailed(
+ raise support.TestFailed(
"Can't get peer certificate.")
cipher = s.cipher()
- if test_support.verbose:
+ if support.verbose:
sys.stdout.write(pprint.pformat(cert) + '\n')
sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
if 'subject' not in cert:
- raise test_support.TestFailed(
+ raise support.TestFailed(
"No subject field in certificate: %s." %
pprint.pformat(cert))
if ((('organizationName', 'Python Software Foundation'),)
not in cert['subject']):
- raise test_support.TestFailed(
+ raise support.TestFailed(
"Missing or invalid 'organizationName' field in certificate subject; "
"should be 'Python Software Foundation'.")
s.close()
@@ -777,7 +777,7 @@ else:
listener_ready = threading.Event()
listener_gone = threading.Event()
- port = test_support.find_unused_port()
+ port = support.find_unused_port()
# `listener` runs in a thread. It opens a socket listening on
# PORT, and sits in an accept() until the main thread connects.
@@ -802,7 +802,7 @@ else:
except IOError:
pass
else:
- raise test_support.TestFailed(
+ raise support.TestFailed(
'connecting to closed SSL socket should have failed')
t = threading.Thread(target=listener)
@@ -811,7 +811,7 @@ else:
t.join()
def testProtocolSSL2(self):
- if test_support.verbose:
+ if support.verbose:
sys.stdout.write("\n")
tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True)
tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL)
@@ -821,13 +821,13 @@ else:
tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False)
def testProtocolSSL23(self):
- if test_support.verbose:
+ if support.verbose:
sys.stdout.write("\n")
try:
tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv2, True)
- except test_support.TestFailed as x:
+ except support.TestFailed as x:
# this fails on some older versions of OpenSSL (0.9.7l, for instance)
- if test_support.verbose:
+ if support.verbose:
sys.stdout.write(
" SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
% str(x))
@@ -844,7 +844,7 @@ else:
tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True, ssl.CERT_REQUIRED)
def testProtocolSSL3(self):
- if test_support.verbose:
+ if support.verbose:
sys.stdout.write("\n")
tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True)
tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True, ssl.CERT_OPTIONAL)
@@ -854,7 +854,7 @@ else:
tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False)
def testProtocolTLS1(self):
- if test_support.verbose:
+ if support.verbose:
sys.stdout.write("\n")
tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True)
tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True, ssl.CERT_OPTIONAL)
@@ -884,13 +884,13 @@ else:
s.setblocking(1)
s.connect((HOST, server.port))
except Exception as x:
- raise test_support.TestFailed("Unexpected exception: " + str(x))
+ raise support.TestFailed("Unexpected exception: " + str(x))
else:
- if test_support.verbose:
+ if support.verbose:
sys.stdout.write("\n")
for indata in msgs:
msg = indata.encode('ASCII', 'replace')
- if test_support.verbose:
+ if support.verbose:
sys.stdout.write(
" client: sending %s...\n" % repr(msg))
if wrapped:
@@ -901,7 +901,7 @@ else:
outdata = s.recv(1024)
if (indata == "STARTTLS" and
str(outdata, 'ASCII', 'replace').strip().lower().startswith("ok")):
- if test_support.verbose:
+ if support.verbose:
msg = str(outdata, 'ASCII', 'replace')
sys.stdout.write(
" client: read %s from server, starting TLS...\n"
@@ -910,11 +910,11 @@ else:
wrapped = True
else:
- if test_support.verbose:
+ if support.verbose:
msg = str(outdata, 'ASCII', 'replace')
sys.stdout.write(
" client: read %s from server\n" % repr(msg))
- if test_support.verbose:
+ if support.verbose:
sys.stdout.write(" client: closing connection.\n")
if wrapped:
conn.write("over\n".encode("ASCII", "strict"))
@@ -937,7 +937,7 @@ else:
flag.wait()
# try to connect
try:
- if test_support.verbose:
+ if support.verbose:
sys.stdout.write('\n')
d1 = open(CERTFILE, 'rb').read()
d2 = ''
@@ -948,33 +948,33 @@ else:
dlen = f.info().getheader("content-length")
if dlen and (int(dlen) > 0):
d2 = f.read(int(dlen))
- if test_support.verbose:
+ if support.verbose:
sys.stdout.write(
" client: read %d bytes from remote server '%s'\n"
% (len(d2), server))
f.close()
except:
msg = ''.join(traceback.format_exception(*sys.exc_info()))
- if test_support.verbose:
+ if support.verbose:
sys.stdout.write('\n' + msg)
- raise test_support.TestFailed(msg)
+ raise support.TestFailed(msg)
else:
if not (d1 == d2):
print("d1 is", len(d1), repr(d1))
print("d2 is", len(d2), repr(d2))
- raise test_support.TestFailed(
+ raise support.TestFailed(
"Couldn't fetch data from HTTPS server")
finally:
- if test_support.verbose:
+ if support.verbose:
sys.stdout.write('stopping server\n')
server.stop()
- if test_support.verbose:
+ if support.verbose:
sys.stdout.write('joining thread\n')
server.join()
def testAsyncoreServer(self):
- if test_support.verbose:
+ if support.verbose:
sys.stdout.write("\n")
indata="FOO\n"
@@ -988,25 +988,25 @@ else:
s = ssl.wrap_socket(socket.socket())
s.connect((HOST, server.port))
except ssl.SSLError as x:
- raise test_support.TestFailed("Unexpected SSL error: " + str(x))
+ raise support.TestFailed("Unexpected SSL error: " + str(x))
except Exception as x:
- raise test_support.TestFailed("Unexpected exception: " + str(x))
+ raise support.TestFailed("Unexpected exception: " + str(x))
else:
- if test_support.verbose:
+ if support.verbose:
sys.stdout.write(
" client: sending %s...\n" % (repr(indata)))
s.sendall(indata.encode('ASCII', 'strict'))
outdata = s.recv()
- if test_support.verbose:
+ if support.verbose:
sys.stdout.write(" client: read %s\n" % repr(outdata))
outdata = str(outdata, 'ASCII', 'strict')
if outdata != indata.lower():
- raise test_support.TestFailed(
+ raise support.TestFailed(
"bad data <<%s>> (%d) received; expected <<%s>> (%d)\n"
% (repr(outdata[:min(len(outdata),20)]), len(outdata),
repr(indata[:min(len(indata),20)].lower()), len(indata)))
s.write("over\n".encode("ASCII", "strict"))
- if test_support.verbose:
+ if support.verbose:
sys.stdout.write(" client: closing connection.\n")
s.close()
finally:
@@ -1015,7 +1015,7 @@ else:
def test_main(verbose=False):
if skip_expected:
- raise test_support.TestSkipped("No SSL support")
+ raise support.TestSkipped("No SSL support")
global CERTFILE, SVN_PYTHON_ORG_ROOT_CERT
CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir,
@@ -1026,22 +1026,22 @@ def test_main(verbose=False):
if (not os.path.exists(CERTFILE) or
not os.path.exists(SVN_PYTHON_ORG_ROOT_CERT)):
- raise test_support.TestFailed("Can't read certificate files!")
+ raise support.TestFailed("Can't read certificate files!")
tests = [BasicTests]
- if test_support.is_resource_enabled('network'):
+ if support.is_resource_enabled('network'):
tests.append(NetworkedTests)
if _have_threads:
- thread_info = test_support.threading_setup()
- if thread_info and test_support.is_resource_enabled('network'):
+ thread_info = support.threading_setup()
+ if thread_info and support.is_resource_enabled('network'):
tests.append(ThreadedTests)
- test_support.run_unittest(*tests)
+ support.run_unittest(*tests)
if _have_threads:
- test_support.threading_cleanup(*thread_info)
+ support.threading_cleanup(*thread_info)
if __name__ == "__main__":
test_main()