diff options
| author | Éric Araujo <merwok@netwok.org> | 2012-06-24 04:07:41 (GMT) |
|---|---|---|
| committer | Éric Araujo <merwok@netwok.org> | 2012-06-24 04:07:41 (GMT) |
| commit | 859aad6a36262383b98ddd45fe3253a882b87ce8 (patch) | |
| tree | 1cc50af4fc88c650fe997a2e72f5f26d92a1986c /Lib/packaging/tests/pypi_server.py | |
| parent | dc44f55cc9dc1d016799362c344958baab328ff4 (diff) | |
| download | cpython-859aad6a36262383b98ddd45fe3253a882b87ce8.zip cpython-859aad6a36262383b98ddd45fe3253a882b87ce8.tar.gz cpython-859aad6a36262383b98ddd45fe3253a882b87ce8.tar.bz2 | |
Remove packaging from the standard library.
Distutils2 will live on on PyPI and be included in the stdlib when it
is ready. See discussion starting at
http://mail.python.org/pipermail/python-dev/2012-June/120430.html
Diffstat (limited to 'Lib/packaging/tests/pypi_server.py')
| -rw-r--r-- | Lib/packaging/tests/pypi_server.py | 449 |
1 files changed, 0 insertions, 449 deletions
diff --git a/Lib/packaging/tests/pypi_server.py b/Lib/packaging/tests/pypi_server.py deleted file mode 100644 index 13c30cf..0000000 --- a/Lib/packaging/tests/pypi_server.py +++ /dev/null @@ -1,449 +0,0 @@ -"""Mock PyPI Server implementation, to use in tests. - -This module also provides a simple test case to extend if you need to use -the PyPIServer all along your test case. Be sure to read the documentation -before any use. - -XXX TODO: - -The mock server can handle simple HTTP request (to simulate a simple index) or -XMLRPC requests, over HTTP. Both does not have the same intergface to deal -with, and I think it's a pain. - -A good idea could be to re-think a bit the way dstributions are handled in the -mock server. As it should return malformed HTML pages, we need to keep the -static behavior. - -I think of something like that: - - >>> server = PyPIMockServer() - >>> server.startHTTP() - >>> server.startXMLRPC() - -Then, the server must have only one port to rely on, eg. - - >>> server.fulladdress() - "http://ip:port/" - -It could be simple to have one HTTP server, relaying the requests to the two -implementations (static HTTP and XMLRPC over HTTP). -""" - -import os -import queue -import select -import threading -from functools import wraps -from http.server import HTTPServer, SimpleHTTPRequestHandler -from xmlrpc.server import SimpleXMLRPCServer - -from packaging.tests import unittest - - -PYPI_DEFAULT_STATIC_PATH = os.path.join( - os.path.dirname(os.path.abspath(__file__)), 'pypiserver') - - -def use_xmlrpc_server(*server_args, **server_kwargs): - server_kwargs['serve_xmlrpc'] = True - return use_pypi_server(*server_args, **server_kwargs) - - -def use_http_server(*server_args, **server_kwargs): - server_kwargs['serve_xmlrpc'] = False - return use_pypi_server(*server_args, **server_kwargs) - - -def use_pypi_server(*server_args, **server_kwargs): - """Decorator to make use of the PyPIServer for test methods, - just when needed, and not for the entire duration of the testcase. - """ - def wrapper(func): - @wraps(func) - def wrapped(*args, **kwargs): - server = PyPIServer(*server_args, **server_kwargs) - server.start() - try: - func(server=server, *args, **kwargs) - finally: - server.stop() - return wrapped - return wrapper - - -class PyPIServerTestCase(unittest.TestCase): - - def setUp(self): - super(PyPIServerTestCase, self).setUp() - self.pypi = PyPIServer() - self.pypi.start() - self.addCleanup(self.pypi.stop) - - -class PyPIServer(threading.Thread): - """PyPI Mocked server. - Provides a mocked version of the PyPI API's, to ease tests. - - Support serving static content and serving previously given text. - """ - - def __init__(self, test_static_path=None, - static_filesystem_paths=None, - static_uri_paths=["simple", "packages"], serve_xmlrpc=False): - """Initialize the server. - - Default behavior is to start the HTTP server. You can either start the - xmlrpc server by setting xmlrpc to True. Caution: Only one server will - be started. - - static_uri_paths and static_base_path are parameters used to provides - respectively the http_paths to serve statically, and where to find the - matching files on the filesystem. - """ - # we want to launch the server in a new dedicated thread, to not freeze - # tests. - super(PyPIServer, self).__init__() - self._run = True - self._serve_xmlrpc = serve_xmlrpc - if static_filesystem_paths is None: - static_filesystem_paths = ["default"] - - #TODO allow to serve XMLRPC and HTTP static files at the same time. - if not self._serve_xmlrpc: - self.server = HTTPServer(('127.0.0.1', 0), PyPIRequestHandler) - self.server.RequestHandlerClass.pypi_server = self - - self.request_queue = queue.Queue() - self._requests = [] - self.default_response_status = 404 - self.default_response_headers = [('Content-type', 'text/plain')] - self.default_response_data = "The page does not exists" - - # initialize static paths / filesystems - self.static_uri_paths = static_uri_paths - - # append the static paths defined locally - if test_static_path is not None: - static_filesystem_paths.append(test_static_path) - self.static_filesystem_paths = [ - PYPI_DEFAULT_STATIC_PATH + "/" + path - for path in static_filesystem_paths] - else: - # XMLRPC server - self.server = PyPIXMLRPCServer(('127.0.0.1', 0)) - self.xmlrpc = XMLRPCMockIndex() - # register the xmlrpc methods - self.server.register_introspection_functions() - self.server.register_instance(self.xmlrpc) - - self.address = ('127.0.0.1', self.server.server_port) - # to not have unwanted outputs. - self.server.RequestHandlerClass.log_request = lambda *_: None - - def run(self): - # loop because we can't stop it otherwise, for python < 2.6 - while self._run: - r, w, e = select.select([self.server], [], [], 0.5) - if r: - self.server.handle_request() - - def stop(self): - """self shutdown is not supported for python < 2.6""" - self._run = False - if self.is_alive(): - self.join() - self.server.server_close() - - def get_next_response(self): - return (self.default_response_status, - self.default_response_headers, - self.default_response_data) - - @property - def requests(self): - """Use this property to get all requests that have been made - to the server - """ - while True: - try: - self._requests.append(self.request_queue.get_nowait()) - except queue.Empty: - break - return self._requests - - @property - def full_address(self): - return "http://%s:%s" % self.address - - -class PyPIRequestHandler(SimpleHTTPRequestHandler): - # we need to access the pypi server while serving the content - pypi_server = None - - def serve_request(self): - """Serve the content. - - Also record the requests to be accessed later. If trying to access an - url matching a static uri, serve static content, otherwise serve - what is provided by the `get_next_response` method. - - If nothing is defined there, return a 404 header. - """ - # record the request. Read the input only on PUT or POST requests - if self.command in ("PUT", "POST"): - if 'content-length' in self.headers: - request_data = self.rfile.read( - int(self.headers['content-length'])) - else: - request_data = self.rfile.read() - - elif self.command in ("GET", "DELETE"): - request_data = '' - - self.pypi_server.request_queue.put((self, request_data)) - - # serve the content from local disc if we request an URL beginning - # by a pattern defined in `static_paths` - url_parts = self.path.split("/") - if (len(url_parts) > 1 and - url_parts[1] in self.pypi_server.static_uri_paths): - data = None - # always take the last first. - fs_paths = [] - fs_paths.extend(self.pypi_server.static_filesystem_paths) - fs_paths.reverse() - relative_path = self.path - for fs_path in fs_paths: - try: - if self.path.endswith("/"): - relative_path += "index.html" - - if relative_path.endswith('.tar.gz'): - with open(fs_path + relative_path, 'rb') as file: - data = file.read() - headers = [('Content-type', 'application/x-gtar')] - else: - with open(fs_path + relative_path) as file: - data = file.read().encode() - headers = [('Content-type', 'text/html')] - - headers.append(('Content-Length', len(data))) - self.make_response(data, headers=headers) - - except IOError: - pass - - if data is None: - self.make_response("Not found", 404) - - # otherwise serve the content from get_next_response - else: - # send back a response - status, headers, data = self.pypi_server.get_next_response() - self.make_response(data, status, headers) - - do_POST = do_GET = do_DELETE = do_PUT = serve_request - - def make_response(self, data, status=200, - headers=[('Content-type', 'text/html')]): - """Send the response to the HTTP client""" - if not isinstance(status, int): - try: - status = int(status) - except ValueError: - # we probably got something like YYY Codename. - # Just get the first 3 digits - status = int(status[:3]) - - self.send_response(status) - for header, value in headers: - self.send_header(header, value) - self.end_headers() - - if isinstance(data, str): - data = data.encode('utf-8') - - self.wfile.write(data) - - -class PyPIXMLRPCServer(SimpleXMLRPCServer): - def server_bind(self): - """Override server_bind to store the server name.""" - super(PyPIXMLRPCServer, self).server_bind() - host, port = self.socket.getsockname()[:2] - self.server_port = port - - -class MockDist: - """Fake distribution, used in the Mock PyPI Server""" - - def __init__(self, name, version="1.0", hidden=False, url="http://url/", - type="sdist", filename="", size=10000, - digest="123456", downloads=7, has_sig=False, - python_version="source", comment="comment", - author="John Doe", author_email="john@doe.name", - maintainer="Main Tayner", maintainer_email="maintainer_mail", - project_url="http://project_url/", homepage="http://homepage/", - keywords="", platform="UNKNOWN", classifiers=[], licence="", - description="Description", summary="Summary", stable_version="", - ordering="", documentation_id="", code_kwalitee_id="", - installability_id="", obsoletes=[], obsoletes_dist=[], - provides=[], provides_dist=[], requires=[], requires_dist=[], - requires_external=[], requires_python=""): - - # basic fields - self.name = name - self.version = version - self.hidden = hidden - - # URL infos - self.url = url - self.digest = digest - self.downloads = downloads - self.has_sig = has_sig - self.python_version = python_version - self.comment = comment - self.type = type - - # metadata - self.author = author - self.author_email = author_email - self.maintainer = maintainer - self.maintainer_email = maintainer_email - self.project_url = project_url - self.homepage = homepage - self.keywords = keywords - self.platform = platform - self.classifiers = classifiers - self.licence = licence - self.description = description - self.summary = summary - self.stable_version = stable_version - self.ordering = ordering - self.cheesecake_documentation_id = documentation_id - self.cheesecake_code_kwalitee_id = code_kwalitee_id - self.cheesecake_installability_id = installability_id - - self.obsoletes = obsoletes - self.obsoletes_dist = obsoletes_dist - self.provides = provides - self.provides_dist = provides_dist - self.requires = requires - self.requires_dist = requires_dist - self.requires_external = requires_external - self.requires_python = requires_python - - def url_infos(self): - return { - 'url': self.url, - 'packagetype': self.type, - 'filename': 'filename.tar.gz', - 'size': '6000', - 'md5_digest': self.digest, - 'downloads': self.downloads, - 'has_sig': self.has_sig, - 'python_version': self.python_version, - 'comment_text': self.comment, - } - - def metadata(self): - return { - 'maintainer': self.maintainer, - 'project_url': [self.project_url], - 'maintainer_email': self.maintainer_email, - 'cheesecake_code_kwalitee_id': self.cheesecake_code_kwalitee_id, - 'keywords': self.keywords, - 'obsoletes_dist': self.obsoletes_dist, - 'requires_external': self.requires_external, - 'author': self.author, - 'author_email': self.author_email, - 'download_url': self.url, - 'platform': self.platform, - 'version': self.version, - 'obsoletes': self.obsoletes, - 'provides': self.provides, - 'cheesecake_documentation_id': self.cheesecake_documentation_id, - '_pypi_hidden': self.hidden, - 'description': self.description, - '_pypi_ordering': 19, - 'requires_dist': self.requires_dist, - 'requires_python': self.requires_python, - 'classifiers': [], - 'name': self.name, - 'licence': self.licence, # XXX licence or license? - 'summary': self.summary, - 'home_page': self.homepage, - 'stable_version': self.stable_version, - # FIXME doesn't that reproduce the bug from 6527d3106e9f? - 'provides_dist': (self.provides_dist or - "%s (%s)" % (self.name, self.version)), - 'requires': self.requires, - 'cheesecake_installability_id': self.cheesecake_installability_id, - } - - def search_result(self): - return { - '_pypi_ordering': 0, - 'version': self.version, - 'name': self.name, - 'summary': self.summary, - } - - -class XMLRPCMockIndex: - """Mock XMLRPC server""" - - def __init__(self, dists=[]): - self._dists = dists - self._search_result = [] - - def add_distributions(self, dists): - for dist in dists: - self._dists.append(MockDist(**dist)) - - def set_distributions(self, dists): - self._dists = [] - self.add_distributions(dists) - - def set_search_result(self, result): - """set a predefined search result""" - self._search_result = result - - def _get_search_results(self): - results = [] - for name in self._search_result: - found_dist = [d for d in self._dists if d.name == name] - if found_dist: - results.append(found_dist[0]) - else: - dist = MockDist(name) - results.append(dist) - self._dists.append(dist) - return [r.search_result() for r in results] - - def list_packages(self): - return [d.name for d in self._dists] - - def package_releases(self, package_name, show_hidden=False): - if show_hidden: - # return all - return [d.version for d in self._dists if d.name == package_name] - else: - # return only un-hidden - return [d.version for d in self._dists if d.name == package_name - and not d.hidden] - - def release_urls(self, package_name, version): - return [d.url_infos() for d in self._dists - if d.name == package_name and d.version == version] - - def release_data(self, package_name, version): - release = [d for d in self._dists - if d.name == package_name and d.version == version] - if release: - return release[0].metadata() - else: - return {} - - def search(self, spec, operator="and"): - return self._get_search_results() |
