diff options
author | Antoine Pitrou <solipsis@pitrou.net> | 2010-10-13 10:36:15 (GMT) |
---|---|---|
committer | Antoine Pitrou <solipsis@pitrou.net> | 2010-10-13 10:36:15 (GMT) |
commit | 803e6d670c019335096ca456b1778205edb30a79 (patch) | |
tree | 1741b686a80afe93cf5bf57905b7ada473da2d6c /Lib/test/ssl_servers.py | |
parent | bd4dacb3f983cb839314a79ef92430e83f757f17 (diff) | |
download | cpython-803e6d670c019335096ca456b1778205edb30a79.zip cpython-803e6d670c019335096ca456b1778205edb30a79.tar.gz cpython-803e6d670c019335096ca456b1778205edb30a79.tar.bz2 |
Issue #9003: http.client.HTTPSConnection, urllib.request.HTTPSHandler and
urllib.request.urlopen now take optional arguments to allow for
server certificate checking, as recommended in public uses of HTTPS.
Diffstat (limited to 'Lib/test/ssl_servers.py')
-rw-r--r-- | Lib/test/ssl_servers.py | 119 |
1 files changed, 119 insertions, 0 deletions
diff --git a/Lib/test/ssl_servers.py b/Lib/test/ssl_servers.py new file mode 100644 index 0000000..d0736b1 --- /dev/null +++ b/Lib/test/ssl_servers.py @@ -0,0 +1,119 @@ +import os +import sys +import ssl +import threading +import urllib.parse +# Rename HTTPServer to _HTTPServer so as to avoid confusion with HTTPSServer. +from http.server import HTTPServer as _HTTPServer, SimpleHTTPRequestHandler + +from test import support + +here = os.path.dirname(__file__) + +HOST = support.HOST +CERTFILE = os.path.join(here, 'keycert.pem') + +# This one's based on HTTPServer, which is based on SocketServer + +class HTTPSServer(_HTTPServer): + + def __init__(self, server_address, handler_class, context): + _HTTPServer.__init__(self, server_address, handler_class) + self.context = context + + def __str__(self): + return ('<%s %s:%s>' % + (self.__class__.__name__, + self.server_name, + self.server_port)) + + def get_request(self): + # override this to wrap socket with SSL + sock, addr = self.socket.accept() + sslconn = self.context.wrap_socket(sock, server_side=True) + return sslconn, addr + +class RootedHTTPRequestHandler(SimpleHTTPRequestHandler): + # need to override translate_path to get a known root, + # instead of using os.curdir, since the test could be + # run from anywhere + + server_version = "TestHTTPS/1.0" + root = here + # Avoid hanging when a request gets interrupted by the client + timeout = 5 + + def translate_path(self, path): + """Translate a /-separated PATH to the local filename syntax. + + Components that mean special things to the local file system + (e.g. drive or directory names) are ignored. (XXX They should + probably be diagnosed.) + + """ + # abandon query parameters + path = urllib.parse.urlparse(path)[2] + path = os.path.normpath(urllib.parse.unquote(path)) + words = path.split('/') + words = filter(None, words) + path = self.root + for word in words: + drive, word = os.path.splitdrive(word) + head, word = os.path.split(word) + path = os.path.join(path, word) + return path + + def log_message(self, format, *args): + # we override this to suppress logging unless "verbose" + if support.verbose: + sys.stdout.write(" server (%s:%d %s):\n [%s] %s\n" % + (self.server.server_address, + self.server.server_port, + self.request.cipher(), + self.log_date_time_string(), + format%args)) + +class HTTPSServerThread(threading.Thread): + + def __init__(self, context, host=HOST, handler_class=None): + self.flag = None + self.server = HTTPSServer((host, 0), + handler_class or RootedHTTPRequestHandler, + context) + self.port = self.server.server_port + threading.Thread.__init__(self) + self.daemon = True + + def __str__(self): + return "<%s %s>" % (self.__class__.__name__, self.server) + + def start(self, flag=None): + self.flag = flag + threading.Thread.start(self) + + def run(self): + if self.flag: + self.flag.set() + self.server.serve_forever(0.05) + + def stop(self): + self.server.shutdown() + + +def make_https_server(case, certfile=CERTFILE, host=HOST, handler_class=None): + # we assume the certfile contains both private key and certificate + context = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + context.load_cert_chain(certfile) + server = HTTPSServerThread(context, host, handler_class) + flag = threading.Event() + server.start(flag) + flag.wait() + def cleanup(): + if support.verbose: + sys.stdout.write('stopping HTTPS server\n') + server.stop() + if support.verbose: + sys.stdout.write('joining HTTPS thread\n') + server.join() + case.addCleanup(cleanup) + return server |