summaryrefslogtreecommitdiffstats
path: root/Lib
diff options
context:
space:
mode:
authorTarek Ziadé <ziade.tarek@gmail.com>2010-04-28 17:51:36 (GMT)
committerTarek Ziadé <ziade.tarek@gmail.com>2010-04-28 17:51:36 (GMT)
commit6ac91723bd78dd1b9ae05e3efefe9b36e9aaad84 (patch)
tree24d868801f005344c4c2cb8cca8ed2ff172425bd /Lib
parent71fb6c88a8f8e04968a726945ef53b4dee6e10c9 (diff)
downloadcpython-6ac91723bd78dd1b9ae05e3efefe9b36e9aaad84.zip
cpython-6ac91723bd78dd1b9ae05e3efefe9b36e9aaad84.tar.gz
cpython-6ac91723bd78dd1b9ae05e3efefe9b36e9aaad84.tar.bz2
#8295 : Added shutil.unpack_archive and related APIs
Diffstat (limited to 'Lib')
-rw-r--r--Lib/shutil.py178
-rw-r--r--Lib/test/test_shutil.py57
2 files changed, 230 insertions, 5 deletions
diff --git a/Lib/shutil.py b/Lib/shutil.py
index 8890d24..c07f394 100644
--- a/Lib/shutil.py
+++ b/Lib/shutil.py
@@ -11,6 +11,7 @@ from os.path import abspath
import fnmatch
import collections
import errno
+import tarfile
try:
from pwd import getpwnam
@@ -25,7 +26,9 @@ except ImportError:
__all__ = ["copyfileobj", "copyfile", "copymode", "copystat", "copy", "copy2",
"copytree", "move", "rmtree", "Error", "SpecialFileError",
"ExecError", "make_archive", "get_archive_formats",
- "register_archive_format", "unregister_archive_format"]
+ "register_archive_format", "unregister_archive_format",
+ "get_unpack_formats", "register_unpack_format",
+ "unregister_unpack_format", "unpack_archive"]
class Error(EnvironmentError):
pass
@@ -37,6 +40,14 @@ class SpecialFileError(EnvironmentError):
class ExecError(EnvironmentError):
"""Raised when a command could not be executed"""
+class ReadError(EnvironmentError):
+ """Raised when an archive cannot be read"""
+
+class RegistryError(Exception):
+ """Raised when a registery operation with the archiving
+ and unpacking registeries fails"""
+
+
try:
WindowsError
except NameError:
@@ -381,10 +392,7 @@ def _make_tarball(base_name, base_dir, compress="gzip", verbose=0, dry_run=0,
if not dry_run:
os.makedirs(archive_dir)
-
# creating the tarball
- import tarfile # late import so Python build itself doesn't break
-
if logger is not None:
logger.info('Creating tar archive')
@@ -567,3 +575,165 @@ def make_archive(base_name, format, root_dir=None, base_dir=None, verbose=0,
os.chdir(save_cwd)
return filename
+
+
+def get_unpack_formats():
+ """Returns a list of supported formats for unpacking.
+
+ Each element of the returned sequence is a tuple
+ (name, extensions, description)
+ """
+ formats = [(name, info[0], info[3]) for name, info in
+ _UNPACK_FORMATS.items()]
+ formats.sort()
+ return formats
+
+def _check_unpack_options(extensions, function, extra_args):
+ """Checks what gets registered as an unpacker."""
+ # first make sure no other unpacker is registered for this extension
+ existing_extensions = {}
+ for name, info in _UNPACK_FORMATS.items():
+ for ext in info[0]:
+ existing_extensions[ext] = name
+
+ for extension in extensions:
+ if extension in existing_extensions:
+ msg = '%s is already registered for "%s"'
+ raise RegistryError(msg % (extension,
+ existing_extensions[extension]))
+
+ if not isinstance(function, collections.Callable):
+ raise TypeError('The registered function must be a callable')
+
+
+def register_unpack_format(name, extensions, function, extra_args=None,
+ description=''):
+ """Registers an unpack format.
+
+ `name` is the name of the format. `extensions` is a list of extensions
+ corresponding to the format.
+
+ `function` is the callable that will be
+ used to unpack archives. The callable will receive archives to unpack.
+ If it's unable to handle an archive, it needs to raise a ReadError
+ exception.
+
+ If provided, `extra_args` is a sequence of
+ (name, value) tuples that will be passed as arguments to the callable.
+ description can be provided to describe the format, and will be returned
+ by the get_unpack_formats() function.
+ """
+ if extra_args is None:
+ extra_args = []
+ _check_unpack_options(extensions, function, extra_args)
+ _UNPACK_FORMATS[name] = extensions, function, extra_args, description
+
+def unregister_unpack_format(name):
+ """Removes the pack format from the registery."""
+ del _UNPACK_FORMATS[name]
+
+def _ensure_directory(path):
+ """Ensure that the parent directory of `path` exists"""
+ dirname = os.path.dirname(path)
+ if not os.path.isdir(dirname):
+ os.makedirs(dirname)
+
+def _unpack_zipfile(filename, extract_dir):
+ """Unpack zip `filename` to `extract_dir`
+ """
+ try:
+ import zipfile
+ except ImportError:
+ raise ReadError('zlib not supported, cannot unpack this archive.')
+
+ if not zipfile.is_zipfile(filename):
+ raise ReadError("%s is not a zip file" % filename)
+
+ zip = zipfile.ZipFile(filename)
+ try:
+ for info in zip.infolist():
+ name = info.filename
+
+ # don't extract absolute paths or ones with .. in them
+ if name.startswith('/') or '..' in name:
+ continue
+
+ target = os.path.join(extract_dir, *name.split('/'))
+ if not target:
+ continue
+
+ _ensure_directory(target)
+ if not name.endswith('/'):
+ # file
+ data = zip.read(info.filename)
+ f = open(target,'wb')
+ try:
+ f.write(data)
+ finally:
+ f.close()
+ del data
+ finally:
+ zip.close()
+
+def _unpack_tarfile(filename, extract_dir):
+ """Unpack tar/tar.gz/tar.bz2 `filename` to `extract_dir`
+ """
+ try:
+ tarobj = tarfile.open(filename)
+ except tarfile.TarError:
+ raise ReadError(
+ "%s is not a compressed or uncompressed tar file" % filename)
+ try:
+ tarobj.extractall(extract_dir)
+ finally:
+ tarobj.close()
+
+_UNPACK_FORMATS = {
+ 'gztar': (['.tar.gz', '.tgz'], _unpack_tarfile, [], "gzip'ed tar-file"),
+ 'bztar': (['.bz2'], _unpack_tarfile, [], "bzip2'ed tar-file"),
+ 'tar': (['.tar'], _unpack_tarfile, [], "uncompressed tar file"),
+ 'zip': (['.zip'], _unpack_zipfile, [], "ZIP file")
+ }
+
+def _find_unpack_format(filename):
+ for name, info in _UNPACK_FORMATS.items():
+ for extension in info[0]:
+ if filename.endswith(extension):
+ return name
+ return None
+
+def unpack_archive(filename, extract_dir=None, format=None):
+ """Unpack an archive.
+
+ `filename` is the name of the archive.
+
+ `extract_dir` is the name of the target directory, where the archive
+ is unpacked. If not provided, the current working directory is used.
+
+ `format` is the archive format: one of "zip", "tar", or "gztar". Or any
+ other registered format. If not provided, unpack_archive will use the
+ filename extension and see if an unpacker was registered for that
+ extension.
+
+ In case none is found, a ValueError is raised.
+ """
+ if extract_dir is None:
+ extract_dir = os.getcwd()
+
+ if format is not None:
+ try:
+ format_info = _UNPACK_FORMATS[format]
+ except KeyError:
+ raise ValueError("Unknown unpack format '{0}'".format(format))
+
+ func = format_info[0]
+ func(filename, extract_dir, **dict(format_info[1]))
+ else:
+ # we need to look at the registered unpackers supported extensions
+ format = _find_unpack_format(filename)
+ if format is None:
+ raise ReadError("Unknown archive format '{0}'".format(filename))
+
+ func = _UNPACK_FORMATS[format][1]
+ kwargs = dict(_UNPACK_FORMATS[format][2])
+ func(filename, extract_dir, **kwargs)
diff --git a/Lib/test/test_shutil.py b/Lib/test/test_shutil.py
index 18164ab..b34165d 100644
--- a/Lib/test/test_shutil.py
+++ b/Lib/test/test_shutil.py
@@ -13,7 +13,9 @@ from os.path import splitdrive
from distutils.spawn import find_executable, spawn
from shutil import (_make_tarball, _make_zipfile, make_archive,
register_archive_format, unregister_archive_format,
- get_archive_formats, Error)
+ get_archive_formats, Error, unpack_archive,
+ register_unpack_format, RegistryError,
+ unregister_unpack_format, get_unpack_formats)
import tarfile
import warnings
@@ -538,6 +540,7 @@ class TestShutil(unittest.TestCase):
owner='kjhkjhkjg', group='oihohoh')
self.assertTrue(os.path.exists(res))
+
@unittest.skipUnless(zlib, "Requires zlib")
@unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")
def test_tarfile_root_owner(self):
@@ -595,6 +598,58 @@ class TestShutil(unittest.TestCase):
formats = [name for name, params in get_archive_formats()]
self.assertNotIn('xxx', formats)
+ def _compare_dirs(self, dir1, dir2):
+ # check that dir1 and dir2 are equivalent,
+ # return the diff
+ diff = []
+ for root, dirs, files in os.walk(dir1):
+ for file_ in files:
+ path = os.path.join(root, file_)
+ target_path = os.path.join(dir2, os.path.split(path)[-1])
+ if not os.path.exists(target_path):
+ diff.append(file_)
+ return diff
+
+ @unittest.skipUnless(zlib, "Requires zlib")
+ def test_unpack_archive(self):
+
+ for format in ('tar', 'gztar', 'bztar', 'zip'):
+ tmpdir = self.mkdtemp()
+ base_dir, root_dir, base_name = self._create_files()
+ tmpdir2 = self.mkdtemp()
+ filename = make_archive(base_name, format, root_dir, base_dir)
+
+ # let's try to unpack it now
+ unpack_archive(filename, tmpdir2)
+ diff = self._compare_dirs(tmpdir, tmpdir2)
+ self.assertEquals(diff, [])
+
+ def test_unpack_registery(self):
+
+ formats = get_unpack_formats()
+
+ def _boo(filename, extract_dir, extra):
+ self.assertEquals(extra, 1)
+ self.assertEquals(filename, 'stuff.boo')
+ self.assertEquals(extract_dir, 'xx')
+
+ register_unpack_format('Boo', ['.boo', '.b2'], _boo, [('extra', 1)])
+ unpack_archive('stuff.boo', 'xx')
+
+ # trying to register a .boo unpacker again
+ self.assertRaises(RegistryError, register_unpack_format, 'Boo2',
+ ['.boo'], _boo)
+
+ # should work now
+ unregister_unpack_format('Boo')
+ register_unpack_format('Boo2', ['.boo'], _boo)
+ self.assertIn(('Boo2', ['.boo'], ''), get_unpack_formats())
+ self.assertNotIn(('Boo', ['.boo'], ''), get_unpack_formats())
+
+ # let's leave a clean state
+ unregister_unpack_format('Boo2')
+ self.assertEquals(get_unpack_formats(), formats)
+
class TestMove(unittest.TestCase):