summaryrefslogtreecommitdiffstats
path: root/Lib/packaging/tests/pypi_server.py
blob: cc5fcca05eff9f253b89dcbe83af62eea281ae31 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
"""Mock PyPI Server implementation, to use in tests.

This module also provides a simple test case to extend if you need to use
the PyPIServer all along your test case. Be sure to read the documentation
before any use.

XXX TODO:

The mock server can handle simple HTTP request (to simulate a simple index) or
XMLRPC requests, over HTTP. Both does not have the same intergface to deal
with, and I think it's a pain.

A good idea could be to re-think a bit the way dstributions are handled in the
mock server. As it should return malformed HTML pages, we need to keep the
static behavior.

I think of something like that:

    >>> server = PyPIMockServer()
    >>> server.startHTTP()
    >>> server.startXMLRPC()

Then, the server must have only one port to rely on, eg.

    >>> server.fulladress()
    "http://ip:port/"

It could be simple to have one HTTP server, relaying the requests to the two
implementations (static HTTP and XMLRPC over HTTP).
"""

import os
import queue
import select
import socket
import threading
import socketserver
from functools import wraps
from http.server import HTTPServer, SimpleHTTPRequestHandler
from xmlrpc.server import SimpleXMLRPCServer

from packaging.tests import unittest

PYPI_DEFAULT_STATIC_PATH = os.path.join(
    os.path.dirname(os.path.abspath(__file__)), 'pypiserver')


def use_xmlrpc_server(*server_args, **server_kwargs):
    server_kwargs['serve_xmlrpc'] = True
    return use_pypi_server(*server_args, **server_kwargs)


def use_http_server(*server_args, **server_kwargs):
    server_kwargs['serve_xmlrpc'] = False
    return use_pypi_server(*server_args, **server_kwargs)


def use_pypi_server(*server_args, **server_kwargs):
    """Decorator to make use of the PyPIServer for test methods,
    just when needed, and not for the entire duration of the testcase.
    """
    def wrapper(func):
        @wraps(func)
        def wrapped(*args, **kwargs):
            server = PyPIServer(*server_args, **server_kwargs)
            server.start()
            try:
                func(server=server, *args, **kwargs)
            finally:
                server.stop()
        return wrapped
    return wrapper


class PyPIServerTestCase(unittest.TestCase):

    def setUp(self):
        super(PyPIServerTestCase, self).setUp()
        self.pypi = PyPIServer()
        self.pypi.start()
        self.addCleanup(self.pypi.stop)


class PyPIServer(threading.Thread):
    """PyPI Mocked server.
    Provides a mocked version of the PyPI API's, to ease tests.

    Support serving static content and serving previously given text.
    """

    def __init__(self, test_static_path=None,
                 static_filesystem_paths=["default"],
                 static_uri_paths=["simple", "packages"], serve_xmlrpc=False):
        """Initialize the server.

        Default behavior is to start the HTTP server. You can either start the
        xmlrpc server by setting xmlrpc to True. Caution: Only one server will
        be started.

        static_uri_paths and static_base_path are parameters used to provides
        respectively the http_paths to serve statically, and where to find the
        matching files on the filesystem.
        """
        # we want to launch the server in a new dedicated thread, to not freeze
        # tests.
        threading.Thread.__init__(self)
        self._run = True
        self._serve_xmlrpc = serve_xmlrpc

        #TODO allow to serve XMLRPC and HTTP static files at the same time.
        if not self._serve_xmlrpc:
            self.server = HTTPServer(('127.0.0.1', 0), PyPIRequestHandler)
            self.server.RequestHandlerClass.pypi_server = self

            self.request_queue = queue.Queue()
            self._requests = []
            self.default_response_status = 404
            self.default_response_headers = [('Content-type', 'text/plain')]
            self.default_response_data = "The page does not exists"

            # initialize static paths / filesystems
            self.static_uri_paths = static_uri_paths

            # append the static paths defined locally
            if test_static_path is not None:
                static_filesystem_paths.append(test_static_path)
            self.static_filesystem_paths = [
                PYPI_DEFAULT_STATIC_PATH + "/" + path
                for path in static_filesystem_paths]
        else:
            # XMLRPC server
            self.server = PyPIXMLRPCServer(('127.0.0.1', 0))
            self.xmlrpc = XMLRPCMockIndex()
            # register the xmlrpc methods
            self.server.register_introspection_functions()
            self.server.register_instance(self.xmlrpc)

        self.address = (self.server.server_name, self.server.server_port)
        # to not have unwanted outputs.
        self.server.RequestHandlerClass.log_request = lambda *_: None

    def run(self):
        # loop because we can't stop it otherwise, for python < 2.6
        while self._run:
            r, w, e = select.select([self.server], [], [], 0.5)
            if r:
                self.server.handle_request()

    def stop(self):
        """self shutdown is not supported for python < 2.6"""
        self._run = False

    def get_next_response(self):
        return (self.default_response_status,
                self.default_response_headers,
                self.default_response_data)

    @property
    def requests(self):
        """Use this property to get all requests that have been made
        to the server
        """
        while True:
            try:
                self._requests.append(self.request_queue.get_nowait())
            except queue.Empty:
                break
        return self._requests

    @property
    def full_address(self):
        return "http://%s:%s" % self.address


class PyPIRequestHandler(SimpleHTTPRequestHandler):
    # we need to access the pypi server while serving the content
    pypi_server = None

    def serve_request(self):
        """Serve the content.

        Also record the requests to be accessed later. If trying to access an
        url matching a static uri, serve static content, otherwise serve
        what is provided by the `get_next_response` method.

        If nothing is defined there, return a 404 header.
        """
        # record the request. Read the input only on PUT or POST requests
        if self.command in ("PUT", "POST"):
            if 'content-length' in self.headers:
                request_data = self.rfile.read(
                    int(self.headers['content-length']))
            else:
                request_data = self.rfile.read()

        elif self.command in ("GET", "DELETE"):
            request_data = ''

        self.pypi_server.request_queue.put((self, request_data))

        # serve the content from local disc if we request an URL beginning
        # by a pattern defined in `static_paths`
        url_parts = self.path.split("/")
        if (len(url_parts) > 1 and
                url_parts[1] in self.pypi_server.static_uri_paths):
            data = None
            # always take the last first.
            fs_paths = []
            fs_paths.extend(self.pypi_server.static_filesystem_paths)
            fs_paths.reverse()
            relative_path = self.path
            for fs_path in fs_paths:
                try:
                    if self.path.endswith("/"):
                        relative_path += "index.html"

                    if relative_path.endswith('.tar.gz'):
                        with open(fs_path + relative_path, 'br') as file:
                            data = file.read()
                        headers = [('Content-type', 'application/x-gtar')]
                    else:
                        with open(fs_path + relative_path) as file:
                            data = file.read().encode()
                        headers = [('Content-type', 'text/html')]

                    self.make_response(data, headers=headers)

                except IOError:
                    pass

            if data is None:
                self.make_response("Not found", 404)

        # otherwise serve the content from get_next_response
        else:
            # send back a response
            status, headers, data = self.pypi_server.get_next_response()
            self.make_response(data, status, headers)

    do_POST = do_GET = do_DELETE = do_PUT = serve_request

    def make_response(self, data, status=200,
                      headers=[('Content-type', 'text/html')]):
        """Send the response to the HTTP client"""
        if not isinstance(status, int):
            try:
                status = int(status)
            except ValueError:
                # we probably got something like YYY Codename.
                # Just get the first 3 digits
                status = int(status[:3])

        self.send_response(status)
        for header, value in headers:
            self.send_header(header, value)
        self.end_headers()

        if type(data) is str:
            data = data.encode()

        self.wfile.write(data)


class PyPIXMLRPCServer(SimpleXMLRPCServer):
    def server_bind(self):
        """Override server_bind to store the server name."""
        socketserver.TCPServer.server_bind(self)
        host, port = self.socket.getsockname()[:2]
        self.server_name = socket.getfqdn(host)
        self.server_port = port


class MockDist:
    """Fake distribution, used in the Mock PyPI Server"""

    def __init__(self, name, version="1.0", hidden=False, url="http://url/",
             type="sdist", filename="", size=10000,
             digest="123456", downloads=7, has_sig=False,
             python_version="source", comment="comment",
             author="John Doe", author_email="john@doe.name",
             maintainer="Main Tayner", maintainer_email="maintainer_mail",
             project_url="http://project_url/", homepage="http://homepage/",
             keywords="", platform="UNKNOWN", classifiers=[], licence="",
             description="Description", summary="Summary", stable_version="",
             ordering="", documentation_id="", code_kwalitee_id="",
             installability_id="", obsoletes=[], obsoletes_dist=[],
             provides=[], provides_dist=[], requires=[], requires_dist=[],
             requires_external=[], requires_python=""):

        # basic fields
        self.name = name
        self.version = version
        self.hidden = hidden

        # URL infos
        self.url = url
        self.digest = digest
        self.downloads = downloads
        self.has_sig = has_sig
        self.python_version = python_version
        self.comment = comment
        self.type = type

        # metadata
        self.author = author
        self.author_email = author_email
        self.maintainer = maintainer
        self.maintainer_email = maintainer_email
        self.project_url = project_url
        self.homepage = homepage
        self.keywords = keywords
        self.platform = platform
        self.classifiers = classifiers
        self.licence = licence
        self.description = description
        self.summary = summary
        self.stable_version = stable_version
        self.ordering = ordering
        self.cheesecake_documentation_id = documentation_id
        self.cheesecake_code_kwalitee_id = code_kwalitee_id
        self.cheesecake_installability_id = installability_id

        self.obsoletes = obsoletes
        self.obsoletes_dist = obsoletes_dist
        self.provides = provides
        self.provides_dist = provides_dist
        self.requires = requires
        self.requires_dist = requires_dist
        self.requires_external = requires_external
        self.requires_python = requires_python

    def url_infos(self):
        return {
            'url': self.url,
            'packagetype': self.type,
            'filename': 'filename.tar.gz',
            'size': '6000',
            'md5_digest': self.digest,
            'downloads': self.downloads,
            'has_sig': self.has_sig,
            'python_version': self.python_version,
            'comment_text': self.comment,
        }

    def metadata(self):
        return {
            'maintainer': self.maintainer,
            'project_url': [self.project_url],
            'maintainer_email': self.maintainer_email,
            'cheesecake_code_kwalitee_id': self.cheesecake_code_kwalitee_id,
            'keywords': self.keywords,
            'obsoletes_dist': self.obsoletes_dist,
            'requires_external': self.requires_external,
            'author': self.author,
            'author_email': self.author_email,
            'download_url': self.url,
            'platform': self.platform,
            'version': self.version,
            'obsoletes': self.obsoletes,
            'provides': self.provides,
            'cheesecake_documentation_id': self.cheesecake_documentation_id,
            '_pypi_hidden': self.hidden,
            'description': self.description,
            '_pypi_ordering': 19,
            'requires_dist': self.requires_dist,
            'requires_python': self.requires_python,
            'classifiers': [],
            'name': self.name,
            'licence': self.licence,
            'summary': self.summary,
            'home_page': self.homepage,
            'stable_version': self.stable_version,
            'provides_dist': self.provides_dist or "%s (%s)" % (self.name,
                                                              self.version),
            'requires': self.requires,
            'cheesecake_installability_id': self.cheesecake_installability_id,
        }

    def search_result(self):
        return {
            '_pypi_ordering': 0,
            'version': self.version,
            'name': self.name,
            'summary': self.summary,
        }


class XMLRPCMockIndex:
    """Mock XMLRPC server"""

    def __init__(self, dists=[]):
        self._dists = dists
        self._search_result = []

    def add_distributions(self, dists):
        for dist in dists:
            self._dists.append(MockDist(**dist))

    def set_distributions(self, dists):
        self._dists = []
        self.add_distributions(dists)

    def set_search_result(self, result):
        """set a predefined search result"""
        self._search_result = result

    def _get_search_results(self):
        results = []
        for name in self._search_result:
            found_dist = [d for d in self._dists if d.name == name]
            if found_dist:
                results.append(found_dist[0])
            else:
                dist = MockDist(name)
                results.append(dist)
                self._dists.append(dist)
        return [r.search_result() for r in results]

    def list_packages(self):
        return [d.name for d in self._dists]

    def package_releases(self, package_name, show_hidden=False):
        if show_hidden:
            # return all
            return [d.version for d in self._dists if d.name == package_name]
        else:
            # return only un-hidden
            return [d.version for d in self._dists if d.name == package_name
                    and not d.hidden]

    def release_urls(self, package_name, version):
        return [d.url_infos() for d in self._dists
                if d.name == package_name and d.version == version]

    def release_data(self, package_name, version):
        release = [d for d in self._dists
                   if d.name == package_name and d.version == version]
        if release:
            return release[0].metadata()
        else:
            return {}

    def search(self, spec, operator="and"):
        return self._get_search_results()