summaryrefslogtreecommitdiffstats
path: root/Lib/packaging/tests/pypi_server.py
diff options
context:
space:
mode:
authorÉric Araujo <merwok@netwok.org>2012-06-24 04:07:41 (GMT)
committerÉric Araujo <merwok@netwok.org>2012-06-24 04:07:41 (GMT)
commit859aad6a36262383b98ddd45fe3253a882b87ce8 (patch)
tree1cc50af4fc88c650fe997a2e72f5f26d92a1986c /Lib/packaging/tests/pypi_server.py
parentdc44f55cc9dc1d016799362c344958baab328ff4 (diff)
downloadcpython-859aad6a36262383b98ddd45fe3253a882b87ce8.zip
cpython-859aad6a36262383b98ddd45fe3253a882b87ce8.tar.gz
cpython-859aad6a36262383b98ddd45fe3253a882b87ce8.tar.bz2
Remove packaging from the standard library.
Distutils2 will live on on PyPI and be included in the stdlib when it is ready. See discussion starting at http://mail.python.org/pipermail/python-dev/2012-June/120430.html
Diffstat (limited to 'Lib/packaging/tests/pypi_server.py')
-rw-r--r--Lib/packaging/tests/pypi_server.py449
1 files changed, 0 insertions, 449 deletions
diff --git a/Lib/packaging/tests/pypi_server.py b/Lib/packaging/tests/pypi_server.py
deleted file mode 100644
index 13c30cf..0000000
--- a/Lib/packaging/tests/pypi_server.py
+++ /dev/null
@@ -1,449 +0,0 @@
-"""Mock PyPI Server implementation, to use in tests.
-
-This module also provides a simple test case to extend if you need to use
-the PyPIServer all along your test case. Be sure to read the documentation
-before any use.
-
-XXX TODO:
-
-The mock server can handle simple HTTP request (to simulate a simple index) or
-XMLRPC requests, over HTTP. Both does not have the same intergface to deal
-with, and I think it's a pain.
-
-A good idea could be to re-think a bit the way dstributions are handled in the
-mock server. As it should return malformed HTML pages, we need to keep the
-static behavior.
-
-I think of something like that:
-
- >>> server = PyPIMockServer()
- >>> server.startHTTP()
- >>> server.startXMLRPC()
-
-Then, the server must have only one port to rely on, eg.
-
- >>> server.fulladdress()
- "http://ip:port/"
-
-It could be simple to have one HTTP server, relaying the requests to the two
-implementations (static HTTP and XMLRPC over HTTP).
-"""
-
-import os
-import queue
-import select
-import threading
-from functools import wraps
-from http.server import HTTPServer, SimpleHTTPRequestHandler
-from xmlrpc.server import SimpleXMLRPCServer
-
-from packaging.tests import unittest
-
-
-PYPI_DEFAULT_STATIC_PATH = os.path.join(
- os.path.dirname(os.path.abspath(__file__)), 'pypiserver')
-
-
-def use_xmlrpc_server(*server_args, **server_kwargs):
- server_kwargs['serve_xmlrpc'] = True
- return use_pypi_server(*server_args, **server_kwargs)
-
-
-def use_http_server(*server_args, **server_kwargs):
- server_kwargs['serve_xmlrpc'] = False
- return use_pypi_server(*server_args, **server_kwargs)
-
-
-def use_pypi_server(*server_args, **server_kwargs):
- """Decorator to make use of the PyPIServer for test methods,
- just when needed, and not for the entire duration of the testcase.
- """
- def wrapper(func):
- @wraps(func)
- def wrapped(*args, **kwargs):
- server = PyPIServer(*server_args, **server_kwargs)
- server.start()
- try:
- func(server=server, *args, **kwargs)
- finally:
- server.stop()
- return wrapped
- return wrapper
-
-
-class PyPIServerTestCase(unittest.TestCase):
-
- def setUp(self):
- super(PyPIServerTestCase, self).setUp()
- self.pypi = PyPIServer()
- self.pypi.start()
- self.addCleanup(self.pypi.stop)
-
-
-class PyPIServer(threading.Thread):
- """PyPI Mocked server.
- Provides a mocked version of the PyPI API's, to ease tests.
-
- Support serving static content and serving previously given text.
- """
-
- def __init__(self, test_static_path=None,
- static_filesystem_paths=None,
- static_uri_paths=["simple", "packages"], serve_xmlrpc=False):
- """Initialize the server.
-
- Default behavior is to start the HTTP server. You can either start the
- xmlrpc server by setting xmlrpc to True. Caution: Only one server will
- be started.
-
- static_uri_paths and static_base_path are parameters used to provides
- respectively the http_paths to serve statically, and where to find the
- matching files on the filesystem.
- """
- # we want to launch the server in a new dedicated thread, to not freeze
- # tests.
- super(PyPIServer, self).__init__()
- self._run = True
- self._serve_xmlrpc = serve_xmlrpc
- if static_filesystem_paths is None:
- static_filesystem_paths = ["default"]
-
- #TODO allow to serve XMLRPC and HTTP static files at the same time.
- if not self._serve_xmlrpc:
- self.server = HTTPServer(('127.0.0.1', 0), PyPIRequestHandler)
- self.server.RequestHandlerClass.pypi_server = self
-
- self.request_queue = queue.Queue()
- self._requests = []
- self.default_response_status = 404
- self.default_response_headers = [('Content-type', 'text/plain')]
- self.default_response_data = "The page does not exists"
-
- # initialize static paths / filesystems
- self.static_uri_paths = static_uri_paths
-
- # append the static paths defined locally
- if test_static_path is not None:
- static_filesystem_paths.append(test_static_path)
- self.static_filesystem_paths = [
- PYPI_DEFAULT_STATIC_PATH + "/" + path
- for path in static_filesystem_paths]
- else:
- # XMLRPC server
- self.server = PyPIXMLRPCServer(('127.0.0.1', 0))
- self.xmlrpc = XMLRPCMockIndex()
- # register the xmlrpc methods
- self.server.register_introspection_functions()
- self.server.register_instance(self.xmlrpc)
-
- self.address = ('127.0.0.1', self.server.server_port)
- # to not have unwanted outputs.
- self.server.RequestHandlerClass.log_request = lambda *_: None
-
- def run(self):
- # loop because we can't stop it otherwise, for python < 2.6
- while self._run:
- r, w, e = select.select([self.server], [], [], 0.5)
- if r:
- self.server.handle_request()
-
- def stop(self):
- """self shutdown is not supported for python < 2.6"""
- self._run = False
- if self.is_alive():
- self.join()
- self.server.server_close()
-
- def get_next_response(self):
- return (self.default_response_status,
- self.default_response_headers,
- self.default_response_data)
-
- @property
- def requests(self):
- """Use this property to get all requests that have been made
- to the server
- """
- while True:
- try:
- self._requests.append(self.request_queue.get_nowait())
- except queue.Empty:
- break
- return self._requests
-
- @property
- def full_address(self):
- return "http://%s:%s" % self.address
-
-
-class PyPIRequestHandler(SimpleHTTPRequestHandler):
- # we need to access the pypi server while serving the content
- pypi_server = None
-
- def serve_request(self):
- """Serve the content.
-
- Also record the requests to be accessed later. If trying to access an
- url matching a static uri, serve static content, otherwise serve
- what is provided by the `get_next_response` method.
-
- If nothing is defined there, return a 404 header.
- """
- # record the request. Read the input only on PUT or POST requests
- if self.command in ("PUT", "POST"):
- if 'content-length' in self.headers:
- request_data = self.rfile.read(
- int(self.headers['content-length']))
- else:
- request_data = self.rfile.read()
-
- elif self.command in ("GET", "DELETE"):
- request_data = ''
-
- self.pypi_server.request_queue.put((self, request_data))
-
- # serve the content from local disc if we request an URL beginning
- # by a pattern defined in `static_paths`
- url_parts = self.path.split("/")
- if (len(url_parts) > 1 and
- url_parts[1] in self.pypi_server.static_uri_paths):
- data = None
- # always take the last first.
- fs_paths = []
- fs_paths.extend(self.pypi_server.static_filesystem_paths)
- fs_paths.reverse()
- relative_path = self.path
- for fs_path in fs_paths:
- try:
- if self.path.endswith("/"):
- relative_path += "index.html"
-
- if relative_path.endswith('.tar.gz'):
- with open(fs_path + relative_path, 'rb') as file:
- data = file.read()
- headers = [('Content-type', 'application/x-gtar')]
- else:
- with open(fs_path + relative_path) as file:
- data = file.read().encode()
- headers = [('Content-type', 'text/html')]
-
- headers.append(('Content-Length', len(data)))
- self.make_response(data, headers=headers)
-
- except IOError:
- pass
-
- if data is None:
- self.make_response("Not found", 404)
-
- # otherwise serve the content from get_next_response
- else:
- # send back a response
- status, headers, data = self.pypi_server.get_next_response()
- self.make_response(data, status, headers)
-
- do_POST = do_GET = do_DELETE = do_PUT = serve_request
-
- def make_response(self, data, status=200,
- headers=[('Content-type', 'text/html')]):
- """Send the response to the HTTP client"""
- if not isinstance(status, int):
- try:
- status = int(status)
- except ValueError:
- # we probably got something like YYY Codename.
- # Just get the first 3 digits
- status = int(status[:3])
-
- self.send_response(status)
- for header, value in headers:
- self.send_header(header, value)
- self.end_headers()
-
- if isinstance(data, str):
- data = data.encode('utf-8')
-
- self.wfile.write(data)
-
-
-class PyPIXMLRPCServer(SimpleXMLRPCServer):
- def server_bind(self):
- """Override server_bind to store the server name."""
- super(PyPIXMLRPCServer, self).server_bind()
- host, port = self.socket.getsockname()[:2]
- self.server_port = port
-
-
-class MockDist:
- """Fake distribution, used in the Mock PyPI Server"""
-
- def __init__(self, name, version="1.0", hidden=False, url="http://url/",
- type="sdist", filename="", size=10000,
- digest="123456", downloads=7, has_sig=False,
- python_version="source", comment="comment",
- author="John Doe", author_email="john@doe.name",
- maintainer="Main Tayner", maintainer_email="maintainer_mail",
- project_url="http://project_url/", homepage="http://homepage/",
- keywords="", platform="UNKNOWN", classifiers=[], licence="",
- description="Description", summary="Summary", stable_version="",
- ordering="", documentation_id="", code_kwalitee_id="",
- installability_id="", obsoletes=[], obsoletes_dist=[],
- provides=[], provides_dist=[], requires=[], requires_dist=[],
- requires_external=[], requires_python=""):
-
- # basic fields
- self.name = name
- self.version = version
- self.hidden = hidden
-
- # URL infos
- self.url = url
- self.digest = digest
- self.downloads = downloads
- self.has_sig = has_sig
- self.python_version = python_version
- self.comment = comment
- self.type = type
-
- # metadata
- self.author = author
- self.author_email = author_email
- self.maintainer = maintainer
- self.maintainer_email = maintainer_email
- self.project_url = project_url
- self.homepage = homepage
- self.keywords = keywords
- self.platform = platform
- self.classifiers = classifiers
- self.licence = licence
- self.description = description
- self.summary = summary
- self.stable_version = stable_version
- self.ordering = ordering
- self.cheesecake_documentation_id = documentation_id
- self.cheesecake_code_kwalitee_id = code_kwalitee_id
- self.cheesecake_installability_id = installability_id
-
- self.obsoletes = obsoletes
- self.obsoletes_dist = obsoletes_dist
- self.provides = provides
- self.provides_dist = provides_dist
- self.requires = requires
- self.requires_dist = requires_dist
- self.requires_external = requires_external
- self.requires_python = requires_python
-
- def url_infos(self):
- return {
- 'url': self.url,
- 'packagetype': self.type,
- 'filename': 'filename.tar.gz',
- 'size': '6000',
- 'md5_digest': self.digest,
- 'downloads': self.downloads,
- 'has_sig': self.has_sig,
- 'python_version': self.python_version,
- 'comment_text': self.comment,
- }
-
- def metadata(self):
- return {
- 'maintainer': self.maintainer,
- 'project_url': [self.project_url],
- 'maintainer_email': self.maintainer_email,
- 'cheesecake_code_kwalitee_id': self.cheesecake_code_kwalitee_id,
- 'keywords': self.keywords,
- 'obsoletes_dist': self.obsoletes_dist,
- 'requires_external': self.requires_external,
- 'author': self.author,
- 'author_email': self.author_email,
- 'download_url': self.url,
- 'platform': self.platform,
- 'version': self.version,
- 'obsoletes': self.obsoletes,
- 'provides': self.provides,
- 'cheesecake_documentation_id': self.cheesecake_documentation_id,
- '_pypi_hidden': self.hidden,
- 'description': self.description,
- '_pypi_ordering': 19,
- 'requires_dist': self.requires_dist,
- 'requires_python': self.requires_python,
- 'classifiers': [],
- 'name': self.name,
- 'licence': self.licence, # XXX licence or license?
- 'summary': self.summary,
- 'home_page': self.homepage,
- 'stable_version': self.stable_version,
- # FIXME doesn't that reproduce the bug from 6527d3106e9f?
- 'provides_dist': (self.provides_dist or
- "%s (%s)" % (self.name, self.version)),
- 'requires': self.requires,
- 'cheesecake_installability_id': self.cheesecake_installability_id,
- }
-
- def search_result(self):
- return {
- '_pypi_ordering': 0,
- 'version': self.version,
- 'name': self.name,
- 'summary': self.summary,
- }
-
-
-class XMLRPCMockIndex:
- """Mock XMLRPC server"""
-
- def __init__(self, dists=[]):
- self._dists = dists
- self._search_result = []
-
- def add_distributions(self, dists):
- for dist in dists:
- self._dists.append(MockDist(**dist))
-
- def set_distributions(self, dists):
- self._dists = []
- self.add_distributions(dists)
-
- def set_search_result(self, result):
- """set a predefined search result"""
- self._search_result = result
-
- def _get_search_results(self):
- results = []
- for name in self._search_result:
- found_dist = [d for d in self._dists if d.name == name]
- if found_dist:
- results.append(found_dist[0])
- else:
- dist = MockDist(name)
- results.append(dist)
- self._dists.append(dist)
- return [r.search_result() for r in results]
-
- def list_packages(self):
- return [d.name for d in self._dists]
-
- def package_releases(self, package_name, show_hidden=False):
- if show_hidden:
- # return all
- return [d.version for d in self._dists if d.name == package_name]
- else:
- # return only un-hidden
- return [d.version for d in self._dists if d.name == package_name
- and not d.hidden]
-
- def release_urls(self, package_name, version):
- return [d.url_infos() for d in self._dists
- if d.name == package_name and d.version == version]
-
- def release_data(self, package_name, version):
- release = [d for d in self._dists
- if d.name == package_name and d.version == version]
- if release:
- return release[0].metadata()
- else:
- return {}
-
- def search(self, spec, operator="and"):
- return self._get_search_results()