diff options
Diffstat (limited to 'Lib/test/test_socket_ssl.py')
-rw-r--r-- | Lib/test/test_socket_ssl.py | 299 |
1 files changed, 196 insertions, 103 deletions
diff --git a/Lib/test/test_socket_ssl.py b/Lib/test/test_socket_ssl.py index b04effe..42efb6e 100644 --- a/Lib/test/test_socket_ssl.py +++ b/Lib/test/test_socket_ssl.py @@ -1,128 +1,221 @@ # Test just the SSL support in the socket module, in a moderately bogus way. import sys +import unittest from test import test_support import socket import errno +import threading +import subprocess +import time +import os +import urllib -# Optionally test SSL support. This requires the 'network' resource as given -# on the regrtest command line. -skip_expected = not (test_support.is_resource_enabled('network') and - hasattr(socket, "ssl")) +# Optionally test SSL support, if we have it in the tested platform +skip_expected = not hasattr(socket, "ssl") -def test_basic(): - test_support.requires('network') +class ConnectedTests(unittest.TestCase): - import urllib - - if test_support.verbose: - print("test_basic ...") - - socket.RAND_status() - try: - socket.RAND_egd(1) - except TypeError: - pass - else: - print("didn't raise TypeError") - socket.RAND_add("this is a random string", 75.0) - - f = urllib.urlopen('https://sf.net') - buf = f.read() - f.close() - -def test_timeout(): - test_support.requires('network') - - def error_msg(extra_msg): - print("""\ - WARNING: an attempt to connect to %r %s, in - test_timeout. That may be legitimate, but is not the outcome we hoped - for. If this message is seen often, test_timeout should be changed to - use a more reliable address.""" % (ADDR, extra_msg), file=sys.stderr) - - if test_support.verbose: - print("test_timeout ...") - - # A service which issues a welcome banner (without need to write - # anything). - # XXX ("gmail.org", 995) has been unreliable so far, from time to time - # XXX non-responsive for hours on end (& across all buildbot slaves, - # XXX so that's not just a local thing). - ADDR = "gmail.org", 995 - - s = socket.socket() - s.settimeout(30.0) - try: - s.connect(ADDR) - except socket.timeout: - error_msg('timed out') - return - except socket.error as exc: # In case connection is refused. - if exc.args[0] == errno.ECONNREFUSED: - error_msg('was refused') - return + def testBasic(self): + socket.RAND_status() + try: + socket.RAND_egd(1) + except TypeError: + pass else: - raise + print("didn't raise TypeError") + socket.RAND_add("this is a random string", 75.0) + + with test_support.transient_internet(): + f = urllib.urlopen('https://sf.net') + buf = f.read() + f.close() + + def testTimeout(self): + def error_msg(extra_msg): + print("""\ + WARNING: an attempt to connect to %r %s, in + test_timeout. That may be legitimate, but is not the outcome we + hoped for. If this message is seen often, test_timeout should be + changed to use a more reliable address.""" % (ADDR, extra_msg), file=sys.stderr) + + # A service which issues a welcome banner (without need to write + # anything). + # XXX ("gmail.org", 995) has been unreliable so far, from time to + # XXX time non-responsive for hours on end (& across all buildbot + # XXX slaves, so that's not just a local thing). + ADDR = "gmail.org", 995 - ss = socket.ssl(s) - # Read part of return welcome banner twice. - ss.read(1) - ss.read(1) - s.close() - -def test_rude_shutdown(): - if test_support.verbose: - print("test_rude_shutdown ...") - - try: - import threading - except ImportError: - return + s = socket.socket() + s.settimeout(30.0) + try: + s.connect(ADDR) + except socket.timeout: + error_msg('timed out') + return + except socket.error as exc: # In case connection is refused. + if exc.args[0] == errno.ECONNREFUSED: + error_msg('was refused') + return + else: + raise + + ss = socket.ssl(s) + # Read part of return welcome banner twice. + ss.read(1) + ss.read(1) + s.close() + +class BasicTests(unittest.TestCase): + + def testRudeShutdown(self): + # Some random port to connect to. + PORT = [9934] + + listener_ready = threading.Event() + listener_gone = threading.Event() + + # `listener` runs in a thread. It opens a socket listening on + # PORT, and sits in an accept() until the main thread connects. + # Then it rudely closes the socket, and sets Event `listener_gone` + # to let the main thread know the socket is gone. + def listener(): + s = socket.socket() + PORT[0] = test_support.bind_port(s, '', PORT[0]) + s.listen(5) + listener_ready.set() + s.accept() + s = None # reclaim the socket object, which also closes it + listener_gone.set() + + def connector(): + listener_ready.wait() + s = socket.socket() + s.connect(('localhost', PORT[0])) + listener_gone.wait() + try: + ssl_sock = socket.ssl(s) + except socket.sslerror: + pass + else: + raise test_support.TestFailed( + 'connecting to closed SSL socket should have failed') - # Some random port to connect to. - PORT = [9934] + t = threading.Thread(target=listener) + t.start() + connector() + t.join() - listener_ready = threading.Event() - listener_gone = threading.Event() +class OpenSSLTests(unittest.TestCase): - # `listener` runs in a thread. It opens a socket listening on PORT, and - # sits in an accept() until the main thread connects. Then it rudely - # closes the socket, and sets Event `listener_gone` to let the main thread - # know the socket is gone. - def listener(): + def testBasic(self): s = socket.socket() - PORT[0] = test_support.bind_port(s, '', PORT[0]) - s.listen(5) - listener_ready.set() - s.accept() - s = None # reclaim the socket object, which also closes it - listener_gone.set() - - def connector(): - listener_ready.wait() + s.connect(("localhost", 4433)) + ss = socket.ssl(s) + ss.write("Foo\n") + i = ss.read(4) + self.assertEqual(i, "Foo\n") + s.close() + + def testMethods(self): + # read & write is already tried in the Basic test + # now we'll try to get the server info about certificates + # this came from the certificate I used, one I found in /usr/share/openssl + info = "/C=PT/ST=Queensland/L=Lisboa/O=Neuronio, Lda./OU=Desenvolvimento/CN=brutus.neuronio.pt/emailAddress=sampo@iki.fi" + s = socket.socket() - s.connect(('localhost', PORT[0])) - listener_gone.wait() + s.connect(("localhost", 4433)) + ss = socket.ssl(s) + cert = ss.server() + self.assertEqual(cert, info) + cert = ss.issuer() + self.assertEqual(cert, info) + s.close() + + +class OpenSSLServer(threading.Thread): + def __init__(self): + self.s = None + self.keepServing = True + self._external() + if self.haveServer: + threading.Thread.__init__(self) + + def _external(self): + # let's find the .pem files + curdir = os.path.dirname(__file__) or os.curdir + cert_file = os.path.join(curdir, "ssl_cert.pem") + if not os.access(cert_file, os.F_OK): + raise ValueError("No cert file found! (tried %r)" % cert_file) + key_file = os.path.join(curdir, "ssl_key.pem") + if not os.access(key_file, os.F_OK): + raise ValueError("No key file found! (tried %r)" % key_file) + try: - ssl_sock = socket.ssl(s) - except socket.sslerror: - pass + cmd = "openssl s_server -cert %s -key %s -quiet" % (cert_file, key_file) + self.s = subprocess.Popen(cmd.split(), stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) + time.sleep(1) + except: + self.haveServer = False else: - raise test_support.TestFailed( - 'connecting to closed SSL socket should have failed') - - t = threading.Thread(target=listener) - t.start() - connector() - t.join() + # let's try if it is actually up + try: + s = socket.socket() + s.connect(("localhost", 4433)) + s.close() + if self.s.stdout.readline() != "ERROR\n": + raise ValueError + except: + self.haveServer = False + else: + self.haveServer = True + + def run(self): + while self.keepServing: + time.sleep(.5) + l = self.s.stdout.readline() + self.s.stdin.write(l) + + def shutdown(self): + self.keepServing = False + if not self.s: + return + if sys.platform == "win32": + subprocess.TerminateProcess(int(self.s._handle), -1) + else: + os.kill(self.s.pid, 15) def test_main(): if not hasattr(socket, "ssl"): raise test_support.TestSkipped("socket module has no ssl support") - test_rude_shutdown() - test_basic() - test_timeout() + + tests = [BasicTests] + + if test_support.is_resource_enabled('network'): + tests.append(ConnectedTests) + + # in these platforms we can kill the openssl process + if sys.platform in ("sunos5", "darwin", "linux1", + "linux2", "win32", "hp-ux11"): + + server = OpenSSLServer() + if server.haveServer: + tests.append(OpenSSLTests) + server.start() + else: + server = None + + thread_info = test_support.threading_setup() + + try: + test_support.run_unittest(*tests) + finally: + if server is not None and server.haveServer: + server.shutdown() + + test_support.threading_cleanup(*thread_info) if __name__ == "__main__": test_main() |