summaryrefslogtreecommitdiffstats
path: root/Lib/test/ssl_servers.py
blob: b99ea45972013ef7ccd9a9b3adf0c09ec2ed145b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
import os
import posixpath
import pprint
import socket
import ssl
import sys
import urllib.parse
# Rename HTTPServer to _HTTPServer so as to avoid confusion with HTTPSServer.
from http.server import (HTTPServer as _HTTPServer,
    SimpleHTTPRequestHandler, BaseHTTPRequestHandler)

from test import support
threading = support.import_module("threading")

# Directory containing this module.  It is the default document root of
# RootedHTTPRequestHandler and the place where the certificate file lives.
here = os.path.dirname(__file__)

# Default host for the test servers, as provided by test.support.
HOST = support.HOST
# Default certificate file.  make_https_server() assumes it contains both
# the private key and the certificate.
CERTFILE = os.path.join(here, 'keycert.pem')

# This one's based on HTTPServer, which is based on socketserver

class HTTPSServer(_HTTPServer):
    """HTTP server that wraps every accepted connection with SSL."""

    def __init__(self, server_address, handler_class, context):
        super().__init__(server_address, handler_class)
        # SSL context used by get_request() to wrap accepted sockets.
        self.context = context

    def __str__(self):
        fields = (type(self).__name__, self.server_name, self.server_port)
        return '<%s %s:%s>' % fields

    def get_request(self):
        """Accept a connection and return ``(ssl_socket, client_address)``.

        Overridden so that the accepted socket is wrapped server-side by
        ``self.context`` before being handed to the request handler.  Any
        OSError is re-raised after optionally being reported on stderr.
        """
        try:
            raw_sock, peer = self.socket.accept()
            tls_sock = self.context.wrap_socket(raw_sock, server_side=True)
        except OSError as exc:
            # socket errors are silenced by the caller, print them here
            if support.verbose:
                sys.stderr.write("Got an error:\n%s\n" % exc)
            raise
        else:
            return tls_sock, peer

class RootedHTTPRequestHandler(SimpleHTTPRequestHandler):
    """Static file handler that resolves request paths against ``root``.

    translate_path is overridden to get a known root instead of using
    os.curdir, since the test could be run from anywhere.
    """

    server_version = "TestHTTPS/1.0"
    # Directory that request paths are resolved against.
    root = here
    # Avoid hanging when a request gets interrupted by the client
    timeout = 5

    def translate_path(self, path):
        """Translate a /-separated PATH to the local filename syntax.

        Only the path component of the request target is used.  Components
        that mean special things to the local file system (drive names,
        directory prefixes, '.' and '..') are ignored, so the result never
        climbs above ``self.root``.

        """
        # abandon query parameters
        path = urllib.parse.urlparse(path)[2]
        # Request paths always use '/', so normalize with posixpath: the
        # platform normpath would turn the separators into backslashes on
        # Windows and defeat the split below.
        path = posixpath.normpath(urllib.parse.unquote(path))
        result = self.root
        for word in filter(None, path.split('/')):
            # Keep only the final name of the component: no drive, no
            # directory part.
            word = os.path.basename(os.path.splitdrive(word)[1])
            if word in (os.curdir, os.pardir):
                # normpath() leaves leading '..' components in place for
                # paths that do not start with '/'; joining them would
                # escape the root directory.
                continue
            result = os.path.join(result, word)
        return result

    def log_message(self, format, *args):
        """Write a log line to stdout, but only when ``support.verbose``."""
        # we override this to suppress logging unless "verbose"
        if support.verbose:
            sys.stdout.write(" server (%s:%d %s):\n   [%s] %s\n" %
                             (self.server.server_address,
                              self.server.server_port,
                              self.request.cipher(),
                              self.log_date_time_string(),
                              format%args))


class StatsRequestHandler(BaseHTTPRequestHandler):
    """Example HTTP request handler which returns SSL statistics on GET
    requests.
    """

    server_version = "StatsHTTPS/1.0"

    def do_GET(self, send_body=True):
        """Serve a GET request.

        The response is a pretty-printed, UTF-8 encoded dict holding the
        context's session statistics and the connection's cipher and
        compression.  When *send_body* is false only the status line and
        headers are sent.
        """
        # Use the request socket directly (as RootedHTTPRequestHandler does
        # for logging) rather than digging it out of the private internals
        # of the buffered reader, which fails when rfile is unbuffered.
        sock = self.request
        context = sock.context
        stats = {
            'session_cache': context.session_stats(),
            'cipher': sock.cipher(),
            'compression': sock.compression(),
            }
        body = pprint.pformat(stats)
        body = body.encode('utf-8')
        self.send_response(200)
        self.send_header("Content-type", "text/plain; charset=utf-8")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        if send_body:
            self.wfile.write(body)

    def do_HEAD(self):
        """Serve a HEAD request."""
        self.do_GET(send_body=False)

    def log_request(self, format, *args):
        """Log the request only when ``support.verbose`` is set.

        The arguments are forwarded unchanged to
        BaseHTTPRequestHandler.log_request.
        """
        if support.verbose:
            BaseHTTPRequestHandler.log_request(self, format, *args)


class HTTPSServerThread(threading.Thread):
    """Daemon thread that owns an HTTPSServer and runs its serve loop."""

    def __init__(self, context, host=HOST, handler_class=None):
        # Optional event set by run() once the thread is executing; it is
        # supplied through start().
        self.flag = None
        if not handler_class:
            handler_class = RootedHTTPRequestHandler
        # The server is bound to port 0; the port it reports afterwards is
        # published as self.port.
        self.server = HTTPSServer((host, 0), handler_class, context)
        self.port = self.server.server_port
        super().__init__()
        self.daemon = True

    def __str__(self):
        return "<%s %s>" % (type(self).__name__, self.server)

    def start(self, flag=None):
        """Start the thread; *flag*, if given, is set when run() begins."""
        self.flag = flag
        super().start()

    def run(self):
        """Signal the start flag, then serve until stop() is called."""
        ready = self.flag
        if ready:
            ready.set()
        try:
            self.server.serve_forever(0.05)
        finally:
            # Release the listening socket however the serve loop ended.
            self.server.server_close()

    def stop(self):
        """Ask the serve loop to shut down."""
        self.server.shutdown()


def make_https_server(case, *, context=None, certfile=CERTFILE,
                      host=HOST, handler_class=None):
    """Start an HTTPS server thread and register its teardown on *case*.

    *case* must provide ``addCleanup``.  When *context* is None a default
    CLIENT_AUTH context is created.  In either case *certfile* is loaded
    into the context.  The call returns once the server thread has
    signalled that it is running, and the started HTTPSServerThread is
    returned.
    """
    def report(message):
        if support.verbose:
            sys.stdout.write(message)

    def cleanup():
        report('stopping HTTPS server\n')
        server.stop()
        report('joining HTTPS thread\n')
        server.join()

    if context is None:
        context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    # We assume the certfile contains both private key and certificate
    context.load_cert_chain(certfile)

    server = HTTPSServerThread(context, host, handler_class)
    started = threading.Event()
    server.start(started)
    # Block until the thread's run() has set the event.
    started.wait()

    case.addCleanup(cleanup)
    return server


if __name__ == "__main__":
    # Stand-alone mode: run a test HTTPS server configured from the
    # command line until the process is interrupted.
    import argparse

    cli = argparse.ArgumentParser(
        description='Run a test HTTPS server. '
                    'By default, the current directory is served.')
    cli.add_argument('-p', '--port', type=int, default=4433,
                     help='port to listen on (default: %(default)s)')
    cli.add_argument('-q', '--quiet', dest='verbose', default=True,
                     action='store_false', help='be less verbose')
    cli.add_argument('-s', '--stats', dest='use_stats_handler',
                     default=False, action='store_true',
                     help='always return stats page')
    cli.add_argument('--curve-name', dest='curve_name', type=str,
                     action='store',
                     help='curve name for EC-based Diffie-Hellman')
    cli.add_argument('--ciphers', dest='ciphers', type=str,
                     help='allowed cipher list')
    cli.add_argument('--dh', dest='dh_file', type=str, action='store',
                     help='PEM file containing DH parameters')
    args = cli.parse_args()

    # The handlers consult support.verbose to decide whether to log.
    support.verbose = args.verbose

    if not args.use_stats_handler:
        # Serve files from the directory the script was started in.
        RootedHTTPRequestHandler.root = os.getcwd()
        handler_class = RootedHTTPRequestHandler
    else:
        handler_class = StatsRequestHandler

    context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    context.load_cert_chain(CERTFILE)
    # Optional key-exchange and cipher tuning, applied only when requested.
    curve_name, dh_file, ciphers = args.curve_name, args.dh_file, args.ciphers
    if curve_name:
        context.set_ecdh_curve(curve_name)
    if dh_file:
        context.load_dh_params(dh_file)
    if ciphers:
        context.set_ciphers(ciphers)

    server = HTTPSServer(("", args.port), handler_class, context)
    if args.verbose:
        print("Listening on https://localhost:{0.port}".format(args))
    server.serve_forever(0.1)