summaryrefslogtreecommitdiffstats
path: root/Lib/packaging/pypi/dist.py
blob: dbf64592730eb55a31e4f603fa8d6ea3507c586b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
"""Classes representing releases and distributions retrieved from indexes.

A project (= unique name) can have several releases (= versions) and
each release can have several distributions (= sdist and bdists).

Release objects contain metadata-related information (see PEP 376);
distribution objects contain download-related information.
"""

import re
import hashlib
import tempfile
import urllib.request
import urllib.parse
import urllib.error
import urllib.parse
from shutil import unpack_archive

from packaging.errors import IrrationalVersionError
from packaging.version import (suggest_normalized_version, NormalizedVersion,
                               get_version_predicate)
from packaging.metadata import Metadata
from packaging.pypi.errors import (HashDoesNotMatch, UnsupportedHashName,
                                   CantParseArchiveName)


# Public names exported by a star-import of this module.
__all__ = ['ReleaseInfo', 'DistInfo', 'ReleasesList', 'get_infos_from_url']

# Archive suffixes that get_infos_from_url() strips from a file name.
EXTENSIONS = [".tar.gz", ".tar.bz2", ".tar", ".zip", ".tgz", ".egg"]
# Captures the hexadecimal digest of a trailing "#md5=<digest>" URL fragment.
MD5_HASH = re.compile(r'^.*#md5=([a-f0-9]+)$')
# Distribution types accepted by ReleaseInfo.add_distribution().
DIST_TYPES = ['bdist', 'sdist']


class IndexReference:
    """Mixin giving an object a reference to an index."""

    def set_index(self, index=None):
        """Store `index` (None when omitted) in the `_index` attribute."""
        setattr(self, '_index', index)


class ReleaseInfo(IndexReference):
    """Represent a release of a project (a project with a specific version).

    The release contains the metadata related to this specific version, and
    is also a container for distribution-related information.

    See the DistInfo class for more information about distributions.
    """

    def __init__(self, name, version, metadata=None, hidden=False,
                 index=None, **kwargs):
        """
        :param name: the name of the distribution
        :param version: the version of the distribution
        :param metadata: the metadata fields of the release.
        :type metadata: dict
        :param hidden: flag stored unchanged in the `hidden` attribute.
        :param index: the index reference, stored through set_index().
        :param kwargs: optional arguments for a new distribution. They are
                       only used when a 'dist_type' key is present, in which
                       case they are forwarded to add_distribution().
        """
        self.set_index(index)
        self.name = name
        self._version = None
        # goes through the `version` property, i.e. set_version()
        self.version = version
        if metadata:
            self.metadata = Metadata(mapping=metadata)
        else:
            self.metadata = None
        self.dists = {}
        self.hidden = hidden

        if 'dist_type' in kwargs:
            dist_type = kwargs.pop('dist_type')
            self.add_distribution(dist_type, **kwargs)

    def set_version(self, version):
        """Store `version` as a NormalizedVersion.

        If `version` is not a rational version, retry with the value
        proposed by suggest_normalized_version(); raise
        IrrationalVersionError when no suggestion is available.
        """
        try:
            self._version = NormalizedVersion(version)
        except IrrationalVersionError:
            suggestion = suggest_normalized_version(version)
            if suggestion:
                # re-enter this setter with the suggested spelling
                self.version = suggestion
            else:
                raise IrrationalVersionError(version)

    def get_version(self):
        """Return the version stored by set_version()."""
        return self._version

    version = property(get_version, set_version)

    def fetch_metadata(self):
        """If the metadata is not set, use the indexes to get it.

        The result of the index call is not used here; `self.metadata` is
        returned as it stands after the call.
        """
        if not self.metadata:
            self._index.get_metadata(self.name, str(self.version))
        return self.metadata

    @property
    def is_final(self):
        """proxy to version.is_final"""
        return self.version.is_final

    def fetch_distributions(self):
        """Return `self.dists`, querying the index first when it is None.

        If `dists` is still None after the index call, it is replaced by an
        empty dict.
        """
        if self.dists is None:
            self._index.get_distributions(self.name, str(self.version))
            if self.dists is None:
                self.dists = {}
        return self.dists

    def add_distribution(self, dist_type='sdist', python_version=None,
                         **params):
        """Add distribution information to this release.

        If distribution information is already set for this distribution
        type, add the given url to the existing distribution. This can be
        useful when some of them fail to download.

        :param dist_type: the distribution type; must be one of DIST_TYPES,
                          otherwise ValueError is raised.
        :param python_version: when true, stored on the distribution.
        :param params: the fields to be passed to the distribution object
                       (see the :class:DistInfo constructor).
        """
        if dist_type not in DIST_TYPES:
            raise ValueError(dist_type)
        if dist_type in self.dists:
            self.dists[dist_type].add_url(**params)
        else:
            self.dists[dist_type] = DistInfo(self, dist_type,
                                             index=self._index, **params)
        if python_version:
            self.dists[dist_type].python_version = python_version

    def get_distribution(self, dist_type=None, prefer_source=True):
        """Return a distribution.

        If dist_type is set, return the distribution of that type, acting as
        an alias of __getitem__ (KeyError if it is not registered).

        Otherwise, if prefer_source is True and a source distribution
        exists, return it. In every other case return the first registered
        distribution.

        Raise LookupError when the release has no distribution at all.
        """
        if not self.dists:
            raise LookupError("no distribution available")
        if dist_type:
            return self[dist_type]
        if prefer_source and "sdist" in self.dists:
            return self["sdist"]
        # A dict view is iterable but is not an iterator, so it has to be
        # wrapped in iter() before next() can be applied to it.
        return next(iter(self.dists.values()))

    def unpack(self, path=None, prefer_source=True):
        """Unpack the distribution to the given path.

        If no destination is given, creates a temporary location.

        Returns the location of the extracted files (root).
        """
        return self.get_distribution(prefer_source=prefer_source)\
                   .unpack(path=path)

    def download(self, temp_path=None, prefer_source=True):
        """Download the distribution selected by get_distribution().

        The archive is put in temp_path. If no temp_path is given, one is
        created.

        Returns the value returned by the distribution's download() method.
        """
        return self.get_distribution(prefer_source=prefer_source)\
                   .download(path=temp_path)

    def set_metadata(self, metadata):
        """Update the release metadata with `metadata`, creating an empty
        Metadata object first when none is set."""
        if not self.metadata:
            self.metadata = Metadata()
        self.metadata.update(metadata)

    def __getitem__(self, item):
        """distributions are available using release["sdist"]"""
        return self.dists[item]

    def _check_is_comparable(self, other):
        """Raise TypeError unless `other` is a ReleaseInfo of the same
        project name."""
        if not isinstance(other, ReleaseInfo):
            raise TypeError("cannot compare %s and %s"
                % (type(self).__name__, type(other).__name__))
        elif self.name != other.name:
            raise TypeError("cannot compare %s and %s"
                % (self.name, other.name))

    def __repr__(self):
        return "<%s %s>" % (self.name, self.version)

    # Comparisons are done on the versions, after checking that both
    # operands are releases of the same project.
    def __eq__(self, other):
        self._check_is_comparable(other)
        return self.version == other.version

    def __lt__(self, other):
        self._check_is_comparable(other)
        return self.version < other.version

    def __ne__(self, other):
        return not self.__eq__(other)

    def __gt__(self, other):
        return not (self.__lt__(other) or self.__eq__(other))

    def __le__(self, other):
        return self.__eq__(other) or self.__lt__(other)

    def __ge__(self, other):
        return self.__eq__(other) or self.__gt__(other)

    # Defining __eq__ would otherwise make instances unhashable.
    # See http://docs.python.org/reference/datamodel#object.__hash__
    __hash__ = object.__hash__


class DistInfo(IndexReference):
    """Represents a distribution retrieved from an index (sdist, bdist, ...)
    """

    def __init__(self, release, dist_type=None, url=None, hashname=None,
                 hashval=None, is_external=True, python_version=None,
                 index=None):
        """Create a new instance of DistInfo.

        :param release: a DistInfo class is relative to a release.
        :param dist_type: the type of the dist (eg. "sdist" or "bdist")
        :param url: URL where we found this distribution
        :param hashname: the name of the hash we want to use. Refer to the
                         hashlib.new documentation for more information.
        :param hashval: the hash value.
        :param is_external: we need to know if the provided url comes from
                            an index browsing, or from an external resource.
        :param python_version: stored unchanged in `python_version`.
        :param index: the index reference, stored through set_index().

        """
        self.set_index(index)
        self.release = release
        self.dist_type = dist_type
        self.python_version = python_version
        self._unpacked_dir = None
        # set the downloaded path to None by default. The goal here
        # is to not download distributions multiple times
        self.downloaded_location = None
        # We store urls in dicts, because we need a bit more information
        # than the simple URL. It is used later to find the right url to
        # use.
        # We have two url attributes: urls and _url. urls contains a list
        # of dicts for the different urls, and _url contains the chosen one,
        # so that the selection process is not run multiple times.
        self.urls = []
        self._url = None
        self.add_url(url, hashname, hashval, is_external)

    def add_url(self, url=None, hashname=None, hashval=None, is_external=True):
        """Add a new url to the list of urls.

        Raise UnsupportedHashName when `hashname` is given but rejected by
        hashlib.new(). A url already present in the list is not added again.
        """
        if hashname is not None:
            try:
                hashlib.new(hashname)
            except ValueError:
                raise UnsupportedHashName(hashname)
        if url not in [u['url'] for u in self.urls]:
            self.urls.append({
                'url': url,
                'hashname': hashname,
                'hashval': hashval,
                'is_external': is_external,
            })
            # reset the url selection process
            self._url = None

    @property
    def url(self):
        """Pick up the right url from the list of urls in self.urls.

        The value is the whole url dict (keys 'url', 'hashname', 'hashval'
        and 'is_external'), not only the URL string.
        """
        # We return internal urls over externals.
        # If there is more than one internal or external, return the first
        # one.
        if self._url is None:
            internal_urls = [u for u in self.urls
                             if u['is_external'] == False]
            if internal_urls:
                self._url = internal_urls[0]
            else:
                self._url = self.urls[0]
        return self._url

    @property
    def is_source(self):
        """return if the distribution is a source one or not"""
        return self.dist_type == 'sdist'

    def download(self, path=None):
        """Download the distribution to a path, and return it.

        If the path is given in path, use this, otherwise, generates a new
        one. The archive is only fetched once: later calls return the
        location recorded by the first successful download.

        Raise HashDoesNotMatch when the downloaded file does not match the
        expected hash; in that case nothing is recorded, so that the next
        call downloads the archive again.

        Return the download location.
        """
        # if we do not have downloaded it yet, do it.
        if self.downloaded_location is None:
            # only create a temporary directory when it is really needed
            if path is None:
                path = tempfile.mkdtemp()
            url = self.url['url']
            archive_name = urllib.parse.urlparse(url)[2].split('/')[-1]
            filename = urllib.request.urlretrieve(
                url, path + "/" + archive_name)[0]
            # verify first: a file failing the check must not be cached
            self._check_md5(filename)
            self.downloaded_location = filename
        return self.downloaded_location

    def unpack(self, path=None):
        """Unpack the distribution to the given path.

        If no destination is given, creates a temporary location. The
        archive is only unpacked once: later calls return the directory used
        by the first call, whatever `path` they are given.

        Returns the location of the extracted files (root).
        """
        if not self._unpacked_dir:
            if path is None:
                path = tempfile.mkdtemp()

            filename = self.download(path)
            unpack_archive(filename, path)
            self._unpacked_dir = path

        return self._unpacked_dir

    def _check_md5(self, filename):
        """Check that the checksum of the given file matches the one of the
        selected url.

        Despite its name, the hash algorithm is the one named by the url's
        'hashname'. Nothing is checked when the hash name or the hash value
        is None. Raise HashDoesNotMatch on mismatch.
        """
        hashname = self.url['hashname']
        expected_hashval = self.url['hashval']
        if None not in (expected_hashval, hashname):
            hashval = hashlib.new(hashname)
            with open(filename, 'rb') as f:
                # read by chunks to avoid loading the whole archive in memory
                for chunk in iter(lambda: f.read(65536), b''):
                    hashval.update(chunk)

            if hashval.hexdigest() != expected_hashval:
                raise HashDoesNotMatch("got %s instead of %s"
                    % (hashval.hexdigest(), expected_hashval))

    def __repr__(self):
        if self.release is None:
            return "<? ? %s>" % self.dist_type

        return "<%s %s %s>" % (
            self.release.name, self.release.version, self.dist_type or "")


class ReleasesList(IndexReference):
    """A container of ReleaseInfo objects for one project.

    Provides useful methods and facilities to sort and filter releases.
    """
    def __init__(self, name, releases=None, contains_hidden=False, index=None):
        """
        :param name: the project name shared by all the releases.
        :param releases: optional iterable of ReleaseInfo objects to add.
        :param contains_hidden: stored unchanged in `contains_hidden`.
        :param index: the index reference, stored through set_index().
        """
        self.set_index(index)
        self.releases = []
        self.name = name
        self.contains_hidden = contains_hidden
        if releases:
            self.add_releases(releases)

    def fetch_releases(self):
        """Ask the index for the releases of this project and return
        `self.releases` as it stands after the call."""
        self._index.get_releases(self.name)
        return self.releases

    def filter(self, predicate):
        """Filter and return a subset of releases matching the given predicate.

        The result is a new ReleasesList sharing this list's name and index.
        """
        return ReleasesList(self.name, [release for release in self.releases
                                        if predicate.match(release.version)],
                                        index=self._index)

    def get_last(self, requirements, prefer_final=None):
        """Return the "last" release that satisfies the given requirements,
        or None when no release matches.

        "last" is defined by the version number of the releases; the
        prefer_final parameter can be set to True or False to change the
        order of the results.
        """
        predicate = get_version_predicate(requirements)
        releases = self.filter(predicate)
        if len(releases) == 0:
            return None
        releases.sort_releases(prefer_final, reverse=True)
        return releases[0]

    def add_releases(self, releases):
        """Add releases in the release list.

        :param: releases is a list of ReleaseInfo objects.
        """
        for r in releases:
            self.add_release(release=r)

    def add_release(self, version=None, dist_type='sdist', release=None,
                    **dist_args):
        """Add a release to the list.

        The release can be passed in the `release` parameter, and in this
        case it is crawled to extract the useful information if necessary,
        or the release information can be directly passed in the `version`
        and `dist_type` arguments.

        Other keyword arguments can be provided, and will be forwarded to
        the distribution creation (eg. the arguments of the DistInfo
        constructor).

        Project names are compared case-insensitively. ValueError is raised
        when `release` belongs to another project.
        """
        if release:
            if release.name.lower() != self.name.lower():
                raise ValueError("%s is not the same project as %s" %
                                 (release.name, self.name))
            version = str(release.version)

            if version not in self.get_versions():
                # append only if not already exists
                self.releases.append(release)
            # register every url of the release on the stored release
            for dist in release.dists.values():
                for url in dist.urls:
                    self.add_release(version, dist.dist_type, **url)
        else:
            # Names are compared the same way as in the branch above;
            # otherwise a release whose name only differs by its case would
            # not be found again and a duplicate would be created.
            own_name = self.name.lower()
            matches = [r for r in self.releases
                       if str(r.version) == version
                       and r.name.lower() == own_name]
            if not matches:
                release = ReleaseInfo(self.name, version, index=self._index)
                self.releases.append(release)
            else:
                release = matches[0]

            release.add_distribution(dist_type=dist_type, **dist_args)

    def sort_releases(self, prefer_final=False, reverse=True, *args, **kwargs):
        """Sort the releases in place with the given properties.

        The `prefer_final` argument can be used to specify if final
        distributions (eg. not dev, beta or alpha) would be preferred or not.

        Results can be inverted by using `reverse`.

        Any other parameter provided will be forwarded to the list sort
        call. You cannot redefine the key argument here, as it is used
        internally to sort the releases.
        """

        sort_by = []
        if prefer_final:
            sort_by.append("is_final")
        sort_by.append("version")

        self.releases.sort(
            key=lambda i: tuple(getattr(i, arg) for arg in sort_by),
            reverse=reverse, *args, **kwargs)

    def get_release(self, version):
        """Return a release from its version.

        Raise KeyError unless exactly one release has this version.
        """
        matches = [r for r in self.releases if str(r.version) == version]
        if len(matches) != 1:
            raise KeyError(version)
        return matches[0]

    def get_versions(self):
        """Return the list of the versions of the contained releases, as
        strings."""
        return [str(r.version) for r in self.releases]

    def __getitem__(self, key):
        return self.releases[key]

    def __len__(self):
        return len(self.releases)

    def __repr__(self):
        string = 'Project "%s"' % self.name
        versions = self.get_versions()
        if versions:
            string += ' versions: %s' % ', '.join(versions)
        return '<%s>' % string


def get_infos_from_url(url, probable_dist_name=None, is_external=True):
    """Extract release information from the URL of a distribution.

    Return a dict with the keys name, version, url, hashname, hashval,
    is_external and dist_type, or None when the file name ends with none of
    the suffixes listed in EXTENSIONS.

    The name and the version are computed by split_archive_name() before the
    suffix check is looked at, so whatever it raises propagates even for
    URLs with an unknown suffix.

    :param url: complete url of the distribution
    :param probable_dist_name: A probable name of the project. It is
                               accepted but not used by this function.
    :param is_external: Tell if the url comes from an index or from
                        an external URL.
    """
    # pull out the "#md5=..." fragment, when there is one
    md5_hash = None
    hash_match = MD5_HASH.match(url)
    if hash_match is not None:
        md5_hash = hash_match.group(1)
        url = url.replace("#md5=%s" % md5_hash, "")

    # the archive name is the last component of the URL path
    archive_name = urllib.parse.urlparse(url).path.split('/')[-1]

    # drop every known suffix found at the end of the name
    extension_matched = False
    for suffix in EXTENSIONS:
        if not archive_name.endswith(suffix):
            continue
        archive_name = archive_name[:-len(suffix)]
        extension_matched = True

    name, version = split_archive_name(archive_name)
    if not extension_matched:
        return None

    infos = {}
    infos['name'] = name
    infos['version'] = version
    infos['url'] = url
    infos['hashname'] = "md5"
    infos['hashval'] = md5_hash
    infos['is_external'] = is_external
    infos['dist_type'] = 'sdist'
    return infos


def split_archive_name(archive_name, probable_name=None):
    """Split an archive name into two parts: name and version.

    :param archive_name: the archive file name, without its extension.
    :param probable_name: a probable name of the project. It is only used
                          when the archive name starts with it (ignoring
                          case), followed by "-" and something that
                          suggest_normalized_version() accepts; otherwise
                          the name is guessed from the "-" separators.

    Return the tuple (name, version), the name being lowercased and the
    version being the one given by suggest_normalized_version().

    Raise CantParseArchiveName when no version can be found or when the
    name is empty.
    """
    # Try to determine which part is the name and which is the version using
    # the "-" separator. Take the larger part to be the version number, then
    # reduce it if this does not work.
    def eager_split(text, maxsplit=2):
        while True:
            # split using the "-" separator, starting from the right
            splits = text.rsplit("-", maxsplit)
            name = splits[0]
            version = "-".join(splits[1:])
            if version.startswith("-"):
                version = version[1:]
            if suggest_normalized_version(version) is None and maxsplit >= 0:
                # we don't get a good version number: try a smaller part.
                # The last try (maxsplit == -1) splits on every "-".
                maxsplit -= 1
            else:
                return name, version

    name = version = None
    if probable_name is not None:
        probable_name = probable_name.lower()
        # The probable name must be a real prefix of the archive name.
        # str.lstrip() cannot be used to remove it: its argument is a set of
        # characters, so it could also eat the beginning of the version.
        prefix = probable_name + "-"
        if probable_name and archive_name.lower().startswith(prefix):
            candidate = archive_name[len(prefix):]
            if suggest_normalized_version(candidate) is not None:
                name, version = probable_name, candidate

    if name is None:
        name, version = eager_split(archive_name)

    version = suggest_normalized_version(version)
    if version is not None and name != "":
        return name.lower(), version
    else:
        raise CantParseArchiveName(archive_name)