1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
|
"""Mock PyPI Server implementation, to use in tests.
This module also provides a simple test case to extend if you need to use
the PyPIServer throughout your test case. Be sure to read the documentation
before any use.
XXX TODO:
The mock server can handle simple HTTP requests (to simulate a simple index)
or XMLRPC requests over HTTP. The two do not share the same interface, and I
think that is a pain to deal with.
A good idea could be to rethink the way distributions are handled in the
mock server. As it should be able to return malformed HTML pages, we need to
keep the static behavior.
I think of something like that:
>>> server = PyPIMockServer()
>>> server.startHTTP()
>>> server.startXMLRPC()
Then, the server must have only one port to rely on, eg.
>>> server.fulladress()
"http://ip:port/"
It could be simple to have one HTTP server, relaying the requests to the two
implementations (static HTTP and XMLRPC over HTTP).
"""
import os
import queue
import select
import socket
import threading
import socketserver
from functools import wraps
from http.server import HTTPServer, SimpleHTTPRequestHandler
from xmlrpc.server import SimpleXMLRPCServer
from packaging.tests import unittest
# Base directory of the static fixtures served by the mock index: the
# 'pypiserver' directory located next to this module.
PYPI_DEFAULT_STATIC_PATH = os.path.join(
    os.path.split(os.path.abspath(__file__))[0], 'pypiserver')
def use_xmlrpc_server(*server_args, **server_kwargs):
    """Shortcut for `use_pypi_server` with the XML-RPC server selected.

    All arguments are forwarded to `use_pypi_server`; `serve_xmlrpc` is
    forced to True, overriding any value given by the caller.
    """
    forwarded = dict(server_kwargs, serve_xmlrpc=True)
    return use_pypi_server(*server_args, **forwarded)
def use_http_server(*server_args, **server_kwargs):
    """Shortcut for `use_pypi_server` with the static HTTP server selected.

    All arguments are forwarded to `use_pypi_server`; `serve_xmlrpc` is
    forced to False, overriding any value given by the caller.
    """
    forwarded = dict(server_kwargs, serve_xmlrpc=False)
    return use_pypi_server(*server_args, **forwarded)
def use_pypi_server(*server_args, **server_kwargs):
    """Decorator to make use of the PyPIServer for test methods,
    just when needed, and not for the entire duration of the testcase.

    The positional and keyword arguments are passed to the `PyPIServer`
    constructor. The decorated callable receives the started server through
    the extra keyword argument `server`; the server is stopped when the
    callable returns or raises. The callable's return value is passed
    through to the caller.
    """
    def wrapper(func):
        @wraps(func)
        def wrapped(*args, **kwargs):
            server = PyPIServer(*server_args, **server_kwargs)
            server.start()
            try:
                # Propagate the result instead of silently dropping it.
                return func(*args, server=server, **kwargs)
            finally:
                server.stop()
        return wrapped
    return wrapper
class PyPIServerTestCase(unittest.TestCase):
    """Test case running a mock PyPI server around every test method.

    The started server is available as ``self.pypi``; stopping it is
    registered as a cleanup in `setUp`.
    """

    def setUp(self):
        super().setUp()
        server = PyPIServer()
        self.pypi = server
        server.start()
        # Registered after start(), exactly once per test method.
        self.addCleanup(server.stop)
class PyPIServer(threading.Thread):
    """PyPI Mocked server.

    Provides a mocked version of the PyPI API's, to ease tests.
    Support serving static content and serving previously given text.
    """

    def __init__(self, test_static_path=None,
                 static_filesystem_paths=("default",),
                 static_uri_paths=("simple", "packages"), serve_xmlrpc=False):
        """Initialize the server.

        Default behavior is to start the HTTP server. You can instead start
        the XML-RPC server by setting `serve_xmlrpc` to True. Caution: only
        one of the two servers is started.

        `static_uri_paths` lists the first URL path components that are
        served statically, and `static_filesystem_paths` lists where to find
        the matching files. Each filesystem entry (including
        `test_static_path`, when given) is appended to
        PYPI_DEFAULT_STATIC_PATH with a "/" separator.

        The server listens on 127.0.0.1 on a port chosen by the OS; see
        `address` and `full_address`.
        """
        # we want to launch the server in a new dedicated thread, to not freeze
        # tests.
        threading.Thread.__init__(self)
        self._run = True
        self._serve_xmlrpc = serve_xmlrpc

        # TODO allow to serve XMLRPC and HTTP static files at the same time.
        if not self._serve_xmlrpc:
            self.server = HTTPServer(('127.0.0.1', 0), PyPIRequestHandler)
            # NOTE: this is stored on the handler *class*, so the most
            # recently created HTTP PyPIServer is the one handlers talk to.
            self.server.RequestHandlerClass.pypi_server = self

            self.request_queue = queue.Queue()
            self._requests = []
            self.default_response_status = 404
            self.default_response_headers = [('Content-type', 'text/plain')]
            self.default_response_data = "The page does not exists"

            # initialize static paths / filesystems
            # Work on private copies: the arguments (and in particular the
            # defaults) must never be mutated, otherwise a test_static_path
            # given to one server would leak into every later server.
            self.static_uri_paths = list(static_uri_paths)
            fs_paths = list(static_filesystem_paths)

            # append the static paths defined locally
            if test_static_path is not None:
                fs_paths.append(test_static_path)
            self.static_filesystem_paths = [
                PYPI_DEFAULT_STATIC_PATH + "/" + path
                for path in fs_paths]
        else:
            # XMLRPC server
            self.server = PyPIXMLRPCServer(('127.0.0.1', 0))
            self.xmlrpc = XMLRPCMockIndex()
            # register the xmlrpc methods
            self.server.register_introspection_functions()
            self.server.register_instance(self.xmlrpc)

        self.address = (self.server.server_name, self.server.server_port)
        # to not have unwanted outputs.
        self.server.RequestHandlerClass.log_request = lambda *_: None

    def run(self):
        """Handle requests one at a time until `stop` is called."""
        # Poll with a timeout so that the stop flag is re-checked at least
        # every half second even when no request arrives.
        while self._run:
            readable, _, _ = select.select([self.server], [], [], 0.5)
            if readable:
                self.server.handle_request()

    def stop(self):
        """Stop serving, wait for the server thread and close the socket."""
        self._run = False
        # Wait for run() to leave its loop before closing the socket it is
        # polling; never join from the server thread itself.
        if self.is_alive() and threading.current_thread() is not self:
            self.join()
        self.server.server_close()

    def get_next_response(self):
        """Return the (status, headers, data) triple for non-static URLs."""
        return (self.default_response_status,
                self.default_response_headers,
                self.default_response_data)

    @property
    def requests(self):
        """Use this property to get all requests that have been made
        to the server
        """
        # Drain whatever the handlers queued since the last access.
        while True:
            try:
                self._requests.append(self.request_queue.get_nowait())
            except queue.Empty:
                break
        return self._requests

    @property
    def full_address(self):
        """Base URL of the server, built from `address`."""
        return "http://%s:%s" % self.address
class PyPIRequestHandler(SimpleHTTPRequestHandler):
    """HTTP request handler backing the mock PyPI server.

    GET, POST, PUT and DELETE requests are all routed to `serve_request`.
    """
    # we need to access the pypi server while serving the content
    pypi_server = None

    def serve_request(self):
        """Serve the content.

        Also record the requests to be accessed later. If trying to access an
        url matching a static uri, serve static content, otherwise serve
        what is provided by the `get_next_response` method.

        If nothing is defined there, return a 404 header.
        """
        # record the request. Read the input only on PUT or POST requests
        if self.command in ("PUT", "POST"):
            if 'content-length' in self.headers:
                request_data = self.rfile.read(
                    int(self.headers['content-length']))
            else:
                request_data = self.rfile.read()
        else:
            # GET and DELETE are the only other commands routed here.
            request_data = ''

        self.pypi_server.request_queue.put((self, request_data))

        # serve the content from local disc if we request an URL beginning
        # by a pattern defined in `static_paths`
        url_parts = self.path.split("/")
        if (len(url_parts) > 1 and
                url_parts[1] in self.pypi_server.static_uri_paths):
            self._serve_static()
        else:
            # otherwise serve the content from get_next_response
            status, headers, data = self.pypi_server.get_next_response()
            self.make_response(data, status, headers)

    do_POST = do_GET = do_DELETE = do_PUT = serve_request

    def _serve_static(self):
        """Answer with the first matching file of the static directories."""
        # Resolve directory URLs exactly once; doing it per directory would
        # append "index.html" again for every fallback directory.
        relative_path = self.path
        if relative_path.endswith("/"):
            relative_path += "index.html"
        is_archive = relative_path.endswith('.tar.gz')

        # always take the last first.
        for fs_path in reversed(self.pypi_server.static_filesystem_paths):
            try:
                if is_archive:
                    with open(fs_path + relative_path, 'rb') as file:
                        data = file.read()
                    headers = [('Content-type', 'application/x-gtar')]
                else:
                    with open(fs_path + relative_path) as file:
                        data = file.read().encode()
                    headers = [('Content-type', 'text/html')]
            except IOError:
                # not available in this directory, try the next one
                continue
            # Send a single response: stop at the first directory that has
            # the file instead of answering once per directory.
            self.make_response(data, headers=headers)
            return

        self.make_response("Not found", 404)

    def make_response(self, data, status=200,
                      headers=(('Content-type', 'text/html'),)):
        """Send the response to the HTTP client.

        `data` may be bytes or str (str is encoded before being written).
        `status` may be an int or a string starting with the numeric code.
        `headers` is an iterable of (name, value) pairs.
        """
        if not isinstance(status, int):
            try:
                status = int(status)
            except ValueError:
                # we probably got something like YYY Codename.
                # Just get the first 3 digits
                status = int(status[:3])

        self.send_response(status)
        for header, value in headers:
            self.send_header(header, value)
        self.end_headers()

        if isinstance(data, str):
            data = data.encode()

        self.wfile.write(data)
class PyPIXMLRPCServer(SimpleXMLRPCServer):
    """XML-RPC server that records `server_name` and `server_port`."""

    def server_bind(self):
        """Bind the socket, then store the bound host name and port."""
        socketserver.TCPServer.server_bind(self)
        bound = self.socket.getsockname()
        self.server_name = socket.getfqdn(bound[0])
        self.server_port = bound[1]
class MockDist:
    """Fake distribution, used in the Mock PyPI Server"""

    def __init__(self, name, version="1.0", hidden=False, url="http://url/",
                 type="sdist", filename="", size=10000,
                 digest="123456", downloads=7, has_sig=False,
                 python_version="source", comment="comment",
                 author="John Doe", author_email="john@doe.name",
                 maintainer="Main Tayner", maintainer_email="maintainer_mail",
                 project_url="http://project_url/", homepage="http://homepage/",
                 keywords="", platform="UNKNOWN", classifiers=None, licence="",
                 description="Description", summary="Summary",
                 stable_version="", ordering="", documentation_id="",
                 code_kwalitee_id="", installability_id="", obsoletes=None,
                 obsoletes_dist=None, provides=None, provides_dist=None,
                 requires=None, requires_dist=None, requires_external=None,
                 requires_python=""):
        """Store the given fields on the instance.

        List-valued arguments left to their default (None) each get a fresh
        empty list, so instances never share mutable state. A list passed
        explicitly is stored as-is.

        NOTE(review): `filename` and `size` are accepted but not stored;
        `url_infos` reports fixed values for them.
        """
        # basic fields
        self.name = name
        self.version = version
        self.hidden = hidden

        # URL infos
        self.url = url
        self.digest = digest
        self.downloads = downloads
        self.has_sig = has_sig
        self.python_version = python_version
        self.comment = comment
        self.type = type

        # metadata
        self.author = author
        self.author_email = author_email
        self.maintainer = maintainer
        self.maintainer_email = maintainer_email
        self.project_url = project_url
        self.homepage = homepage
        self.keywords = keywords
        self.platform = platform
        self.classifiers = [] if classifiers is None else classifiers
        self.licence = licence
        self.description = description
        self.summary = summary
        self.stable_version = stable_version
        self.ordering = ordering
        self.cheesecake_documentation_id = documentation_id
        self.cheesecake_code_kwalitee_id = code_kwalitee_id
        self.cheesecake_installability_id = installability_id

        self.obsoletes = [] if obsoletes is None else obsoletes
        self.obsoletes_dist = [] if obsoletes_dist is None else obsoletes_dist
        self.provides = [] if provides is None else provides
        self.provides_dist = [] if provides_dist is None else provides_dist
        self.requires = [] if requires is None else requires
        self.requires_dist = [] if requires_dist is None else requires_dist
        self.requires_external = (
            [] if requires_external is None else requires_external)
        self.requires_python = requires_python

    def url_infos(self):
        """Return the per-file information dict of this distribution."""
        return {
            'url': self.url,
            'packagetype': self.type,
            # NOTE(review): fixed values, independent of the `filename` and
            # `size` constructor arguments.
            'filename': 'filename.tar.gz',
            'size': '6000',
            'md5_digest': self.digest,
            'downloads': self.downloads,
            'has_sig': self.has_sig,
            'python_version': self.python_version,
            'comment_text': self.comment,
        }

    def metadata(self):
        """Return the release metadata dict of this distribution."""
        return {
            'maintainer': self.maintainer,
            'project_url': [self.project_url],
            'maintainer_email': self.maintainer_email,
            'cheesecake_code_kwalitee_id': self.cheesecake_code_kwalitee_id,
            'keywords': self.keywords,
            'obsoletes_dist': self.obsoletes_dist,
            'requires_external': self.requires_external,
            'author': self.author,
            'author_email': self.author_email,
            'download_url': self.url,
            'platform': self.platform,
            'version': self.version,
            'obsoletes': self.obsoletes,
            'provides': self.provides,
            'cheesecake_documentation_id': self.cheesecake_documentation_id,
            '_pypi_hidden': self.hidden,
            'description': self.description,
            # NOTE(review): fixed value; `self.ordering` is not used here.
            '_pypi_ordering': 19,
            'requires_dist': self.requires_dist,
            'requires_python': self.requires_python,
            # NOTE(review): always empty; `self.classifiers` is not used here.
            'classifiers': [],
            'name': self.name,
            'licence': self.licence,
            'summary': self.summary,
            'home_page': self.homepage,
            'stable_version': self.stable_version,
            # falls back to "name (version)" when nothing was given
            'provides_dist': self.provides_dist or "%s (%s)" % (self.name,
                                                              self.version),
            'requires': self.requires,
            'cheesecake_installability_id': self.cheesecake_installability_id,
        }

    def search_result(self):
        """Return the short dict used for search results."""
        return {
            '_pypi_ordering': 0,
            'version': self.version,
            'name': self.name,
            'summary': self.summary,
        }
class XMLRPCMockIndex:
    """Mock XMLRPC server"""

    def __init__(self, dists=None):
        """Create the index, optionally backed by the given list of dists.

        Without `dists`, each index gets its own empty list; a shared
        default would let distributions registered on one index show up
        in every other one.
        """
        self._dists = [] if dists is None else dists
        self._search_result = []

    def add_distributions(self, dists):
        """Add one MockDist per dict of constructor keyword arguments."""
        for dist in dists:
            self._dists.append(MockDist(**dist))

    def set_distributions(self, dists):
        """Replace the known distributions (same input as add_distributions)."""
        self._dists = []
        self.add_distributions(dists)

    def set_search_result(self, result):
        """set a predefined search result"""
        self._search_result = result

    def _get_search_results(self):
        """Return the search dicts for the predefined result names.

        Names without a known distribution get a default MockDist, which is
        also added to the known distributions.
        """
        results = []
        for name in self._search_result:
            dist = next((d for d in self._dists if d.name == name), None)
            if dist is None:
                dist = MockDist(name)
                self._dists.append(dist)
            results.append(dist)
        return [r.search_result() for r in results]

    def list_packages(self):
        """Return the name of every known distribution (one per dist)."""
        return [d.name for d in self._dists]

    def package_releases(self, package_name, show_hidden=False):
        """Return the versions of `package_name`.

        Hidden distributions are only included when `show_hidden` is true.
        """
        return [d.version for d in self._dists
                if d.name == package_name and (show_hidden or not d.hidden)]

    def release_urls(self, package_name, version):
        """Return the url_infos() of every matching distribution."""
        return [d.url_infos() for d in self._dists
                if d.name == package_name and d.version == version]

    def release_data(self, package_name, version):
        """Return the metadata of the first match, or {} if there is none."""
        for dist in self._dists:
            if dist.name == package_name and dist.version == version:
                return dist.metadata()
        return {}

    def search(self, spec, operator="and"):
        """Return the predefined search results; the arguments are ignored."""
        return self._get_search_results()
|