diff options
author | Phillip J. Eby <pje@telecommunity.com> | 2006-04-18 04:05:34 (GMT) |
---|---|---|
committer | Phillip J. Eby <pje@telecommunity.com> | 2006-04-18 04:05:34 (GMT) |
commit | 069159b11390d2827380cd3bf9a90d4ff57527a0 (patch) | |
tree | 68b7af33b6aecb062376dddd6117adffa6d16533 /Lib/setuptools/archive_util.py | |
parent | e247e898464f421a8b9d41ef74aaaf6137876ebb (diff) | |
download | cpython-069159b11390d2827380cd3bf9a90d4ff57527a0.zip cpython-069159b11390d2827380cd3bf9a90d4ff57527a0.tar.gz cpython-069159b11390d2827380cd3bf9a90d4ff57527a0.tar.bz2 |
Initial import of setuptools, with integrated tests.
Diffstat (limited to 'Lib/setuptools/archive_util.py')
-rwxr-xr-x | Lib/setuptools/archive_util.py | 205 |
1 files changed, 205 insertions, 0 deletions
diff --git a/Lib/setuptools/archive_util.py b/Lib/setuptools/archive_util.py new file mode 100755 index 0000000..511f05a --- /dev/null +++ b/Lib/setuptools/archive_util.py @@ -0,0 +1,205 @@ +"""Utilities for extracting common archive formats""" + + +__all__ = [ + "unpack_archive", "unpack_zipfile", "unpack_tarfile", "default_filter", + "UnrecognizedFormat", "extraction_drivers", "unpack_directory", +] + +import zipfile, tarfile, os, shutil +from pkg_resources import ensure_directory +from distutils.errors import DistutilsError + +class UnrecognizedFormat(DistutilsError): + """Couldn't recognize the archive type""" + +def default_filter(src,dst): + """The default progress/filter callback; returns True for all files""" + return dst + + + + + + + + + + + + + + + + + + + + + + + +def unpack_archive(filename, extract_dir, progress_filter=default_filter, + drivers=None +): + """Unpack `filename` to `extract_dir`, or raise ``UnrecognizedFormat`` + + `progress_filter` is a function taking two arguments: a source path + internal to the archive ('/'-separated), and a filesystem path where it + will be extracted. The callback must return the desired extract path + (which may be the same as the one passed in), or else ``None`` to skip + that file or directory. The callback can thus be used to report on the + progress of the extraction, as well as to filter the items extracted or + alter their extraction paths. + + `drivers`, if supplied, must be a non-empty sequence of functions with the + same signature as this function (minus the `drivers` argument), that raise + ``UnrecognizedFormat`` if they do not support extracting the designated + archive type. The `drivers` are tried in sequence until one is found that + does not raise an error, or until all are exhausted (in which case + ``UnrecognizedFormat`` is raised). If you do not supply a sequence of + drivers, the module's ``extraction_drivers`` constant will be used, which + means that ``unpack_zipfile`` and ``unpack_tarfile`` will be tried, in that + order. + """ + for driver in drivers or extraction_drivers: + try: + driver(filename, extract_dir, progress_filter) + except UnrecognizedFormat: + continue + else: + return + else: + raise UnrecognizedFormat( + "Not a recognized archive type: %s" % filename + ) + + + + + + + +def unpack_directory(filename, extract_dir, progress_filter=default_filter): + """"Unpack" a directory, using the same interface as for archives + + Raises ``UnrecognizedFormat`` if `filename` is not a directory + """ + if not os.path.isdir(filename): + raise UnrecognizedFormat("%s is not a directory" % (filename,)) + + paths = {filename:('',extract_dir)} + for base, dirs, files in os.walk(filename): + src,dst = paths[base] + for d in dirs: + paths[os.path.join(base,d)] = src+d+'/', os.path.join(dst,d) + for f in files: + name = src+f + target = os.path.join(dst,f) + target = progress_filter(src+f, target) + if not target: + continue # skip non-files + ensure_directory(target) + f = os.path.join(base,f) + shutil.copyfile(f, target) + shutil.copystat(f, target) + + + + + + + + + + + + + + + + + + +def unpack_zipfile(filename, extract_dir, progress_filter=default_filter): + """Unpack zip `filename` to `extract_dir` + + Raises ``UnrecognizedFormat`` if `filename` is not a zipfile (as determined + by ``zipfile.is_zipfile()``). See ``unpack_archive()`` for an explanation + of the `progress_filter` argument. + """ + + if not zipfile.is_zipfile(filename): + raise UnrecognizedFormat("%s is not a zip file" % (filename,)) + + z = zipfile.ZipFile(filename) + try: + for info in z.infolist(): + name = info.filename + + # don't extract absolute paths or ones with .. in them + if name.startswith('/') or '..' in name: + continue + + target = os.path.join(extract_dir, *name.split('/')) + target = progress_filter(name, target) + if not target: + continue + if name.endswith('/'): + # directory + ensure_directory(target) + else: + # file + ensure_directory(target) + data = z.read(info.filename) + f = open(target,'wb') + try: + f.write(data) + finally: + f.close() + del data + finally: + z.close() + + +def unpack_tarfile(filename, extract_dir, progress_filter=default_filter): + """Unpack tar/tar.gz/tar.bz2 `filename` to `extract_dir` + + Raises ``UnrecognizedFormat`` if `filename` is not a tarfile (as determined + by ``tarfile.open()``). See ``unpack_archive()`` for an explanation + of the `progress_filter` argument. + """ + + try: + tarobj = tarfile.open(filename) + except tarfile.TarError: + raise UnrecognizedFormat( + "%s is not a compressed or uncompressed tar file" % (filename,) + ) + + try: + tarobj.chown = lambda *args: None # don't do any chowning! + for member in tarobj: + if member.isfile() or member.isdir(): + name = member.name + # don't extract absolute paths or ones with .. in them + if not name.startswith('/') and '..' not in name: + dst = os.path.join(extract_dir, *name.split('/')) + dst = progress_filter(name, dst) + if dst: + if dst.endswith(os.sep): + dst = dst[:-1] + tarobj._extract_member(member,dst) # XXX Ugh + return True + finally: + tarobj.close() + + + + +extraction_drivers = unpack_directory, unpack_zipfile, unpack_tarfile + + + + + |