summaryrefslogtreecommitdiffstats
path: root/Lib/shutil.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/shutil.py')
-rw-r--r--Lib/shutil.py263
1 files changed, 232 insertions, 31 deletions
diff --git a/Lib/shutil.py b/Lib/shutil.py
index 420802f..ef29ae2 100644
--- a/Lib/shutil.py
+++ b/Lib/shutil.py
@@ -11,6 +11,13 @@ from os.path import abspath
import fnmatch
import collections
import errno
+import tarfile
+
+try:
+ import bz2
+ _BZ2_SUPPORTED = True
+except ImportError:
+ _BZ2_SUPPORTED = False
try:
from pwd import getpwnam
@@ -26,7 +33,8 @@ __all__ = ["copyfileobj", "copyfile", "copymode", "copystat", "copy", "copy2",
"copytree", "move", "rmtree", "Error", "SpecialFileError",
"ExecError", "make_archive", "get_archive_formats",
"register_archive_format", "unregister_archive_format",
- "ignore_patterns"]
+ "get_unpack_formats", "register_unpack_format",
+ "unregister_unpack_format", "unpack_archive", "ignore_patterns"]
class Error(EnvironmentError):
pass
@@ -38,6 +46,14 @@ class SpecialFileError(EnvironmentError):
class ExecError(EnvironmentError):
"""Raised when a command could not be executed"""
+class ReadError(EnvironmentError):
+ """Raised when an archive cannot be read"""
+
+class RegistryError(Exception):
+ """Raised when a registery operation with the archiving
+ and unpacking registeries fails"""
+
+
try:
WindowsError
except NameError:
@@ -101,7 +117,7 @@ def copystat(src, dst):
if hasattr(os, 'chflags') and hasattr(st, 'st_flags'):
try:
os.chflags(dst, st.st_flags)
- except OSError, why:
+ except OSError as why:
for err in 'EOPNOTSUPP', 'ENOTSUP':
if hasattr(errno, err) and why.errno == getattr(errno, err):
break
@@ -142,8 +158,9 @@ def ignore_patterns(*patterns):
return set(ignored_names)
return _ignore_patterns
-def copytree(src, dst, symlinks=False, ignore=None):
- """Recursively copy a directory tree using copy2().
+def copytree(src, dst, symlinks=False, ignore=None, copy_function=copy2,
+ ignore_dangling_symlinks=False):
+ """Recursively copy a directory tree.
The destination directory must not already exist.
If exception(s) occur, an Error is raised with a list of reasons.
@@ -151,7 +168,13 @@ def copytree(src, dst, symlinks=False, ignore=None):
If the optional symlinks flag is true, symbolic links in the
source tree result in symbolic links in the destination tree; if
it is false, the contents of the files pointed to by symbolic
- links are copied.
+ links are copied. If the file pointed by the symlink doesn't
+ exist, an exception will be added in the list of errors raised in
+ an Error exception at the end of the copy process.
+
+ You can set the optional ignore_dangling_symlinks flag to true if you
+ want to silence this exception. Notice that this has no effect on
+ platforms that don't support os.symlink.
The optional ignore argument is a callable. If given, it
is called with the `src` parameter, which is the directory
@@ -165,7 +188,10 @@ def copytree(src, dst, symlinks=False, ignore=None):
list of names relative to the `src` directory that should
not be copied.
- XXX Consider this example code rather than the ultimate tool.
+ The optional copy_function argument is a callable that will be used
+ to copy each file. It will be called with the source path and the
+ destination path as arguments. By default, copy2() is used, but any
+ function that supports the same signature (like copy()) can be used.
"""
names = os.listdir(src)
@@ -182,30 +208,37 @@ def copytree(src, dst, symlinks=False, ignore=None):
srcname = os.path.join(src, name)
dstname = os.path.join(dst, name)
try:
- if symlinks and os.path.islink(srcname):
+ if os.path.islink(srcname):
linkto = os.readlink(srcname)
- os.symlink(linkto, dstname)
+ if symlinks:
+ os.symlink(linkto, dstname)
+ else:
+ # ignore dangling symlink if the flag is on
+ if not os.path.exists(linkto) and ignore_dangling_symlinks:
+ continue
+ # otherwise let the copy occurs. copy2 will raise an error
+ copy_function(srcname, dstname)
elif os.path.isdir(srcname):
- copytree(srcname, dstname, symlinks, ignore)
+ copytree(srcname, dstname, symlinks, ignore, copy_function)
else:
# Will raise a SpecialFileError for unsupported file types
- copy2(srcname, dstname)
+ copy_function(srcname, dstname)
# catch the Error from the recursive copytree so that we can
# continue with other files
- except Error, err:
+ except Error as err:
errors.extend(err.args[0])
- except EnvironmentError, why:
+ except EnvironmentError as why:
errors.append((srcname, dstname, str(why)))
try:
copystat(src, dst)
- except OSError, why:
+ except OSError as why:
if WindowsError is not None and isinstance(why, WindowsError):
# Copying file access times may fail on Windows
pass
else:
errors.append((src, dst, str(why)))
if errors:
- raise Error, errors
+ raise Error(errors)
def rmtree(path, ignore_errors=False, onerror=None):
"""Recursively delete a directory tree.
@@ -235,7 +268,7 @@ def rmtree(path, ignore_errors=False, onerror=None):
names = []
try:
names = os.listdir(path)
- except os.error, err:
+ except os.error as err:
onerror(os.listdir, path, sys.exc_info())
for name in names:
fullname = os.path.join(path, name)
@@ -248,7 +281,7 @@ def rmtree(path, ignore_errors=False, onerror=None):
else:
try:
os.remove(fullname)
- except os.error, err:
+ except os.error as err:
onerror(os.remove, fullname, sys.exc_info())
try:
os.rmdir(path)
@@ -288,13 +321,13 @@ def move(src, dst):
real_dst = os.path.join(dst, _basename(src))
if os.path.exists(real_dst):
- raise Error, "Destination path '%s' already exists" % real_dst
+ raise Error("Destination path '%s' already exists" % real_dst)
try:
os.rename(src, real_dst)
- except OSError:
+ except OSError as exc:
if os.path.isdir(src):
if _destinsrc(src, dst):
- raise Error, "Cannot move a directory '%s' into itself '%s'." % (src, dst)
+ raise Error("Cannot move a directory '%s' into itself '%s'." % (src, dst))
copytree(src, real_dst, symlinks=True)
rmtree(src)
else:
@@ -350,13 +383,17 @@ def _make_tarball(base_name, base_dir, compress="gzip", verbose=0, dry_run=0,
Returns the output filename.
"""
- tar_compression = {'gzip': 'gz', 'bzip2': 'bz2', None: ''}
- compress_ext = {'gzip': '.gz', 'bzip2': '.bz2'}
+ tar_compression = {'gzip': 'gz', None: ''}
+ compress_ext = {'gzip': '.gz'}
+
+ if _BZ2_SUPPORTED:
+ tar_compression['bzip2'] = 'bz2'
+ compress_ext['bzip2'] = '.bz2'
# flags for compression program, each element of list will be an argument
if compress is not None and compress not in compress_ext.keys():
- raise ValueError, \
- ("bad value for 'compress': must be None, 'gzip' or 'bzip2'")
+ raise ValueError("bad value for 'compress', or compression format not "
+ "supported : {0}".format(compress))
archive_name = base_name + '.tar' + compress_ext.get(compress, '')
archive_dir = os.path.dirname(archive_name)
@@ -367,10 +404,7 @@ def _make_tarball(base_name, base_dir, compress="gzip", verbose=0, dry_run=0,
if not dry_run:
os.makedirs(archive_dir)
-
# creating the tarball
- import tarfile # late import so Python build itself doesn't break
-
if logger is not None:
logger.info('Creating tar archive')
@@ -408,8 +442,7 @@ def _call_external_zip(base_dir, zip_filename, verbose=False, dry_run=False):
except DistutilsExecError:
# XXX really should distinguish between "couldn't find
# external 'zip' command" and "zip failed".
- raise ExecError, \
- ("unable to create zip file '%s': "
+ raise ExecError("unable to create zip file '%s': "
"could neither import the 'zipfile' module nor "
"find a standalone zip utility") % zip_filename
@@ -462,11 +495,14 @@ def _make_zipfile(base_name, base_dir, verbose=0, dry_run=0, logger=None):
_ARCHIVE_FORMATS = {
'gztar': (_make_tarball, [('compress', 'gzip')], "gzip'ed tar-file"),
- 'bztar': (_make_tarball, [('compress', 'bzip2')], "bzip2'ed tar-file"),
'tar': (_make_tarball, [('compress', None)], "uncompressed tar file"),
'zip': (_make_zipfile, [],"ZIP file")
}
+if _BZ2_SUPPORTED:
+ _ARCHIVE_FORMATS['bztar'] = (_make_tarball, [('compress', 'bzip2')],
+ "bzip2'ed tar-file")
+
def get_archive_formats():
"""Returns a list of supported formats for archiving and unarchiving.
@@ -488,7 +524,7 @@ def register_archive_format(name, function, extra_args=None, description=''):
"""
if extra_args is None:
extra_args = []
- if not isinstance(function, collections.Callable):
+ if not callable(function):
raise TypeError('The %s object is not callable' % function)
if not isinstance(extra_args, (tuple, list)):
raise TypeError('extra_args needs to be a sequence')
@@ -535,7 +571,7 @@ def make_archive(base_name, format, root_dir=None, base_dir=None, verbose=0,
try:
format_info = _ARCHIVE_FORMATS[format]
except KeyError:
- raise ValueError, "unknown archive format '%s'" % format
+ raise ValueError("unknown archive format '%s'" % format)
func = format_info[0]
for arg, val in format_info[1]:
@@ -554,3 +590,168 @@ def make_archive(base_name, format, root_dir=None, base_dir=None, verbose=0,
os.chdir(save_cwd)
return filename
+
+
+def get_unpack_formats():
+ """Returns a list of supported formats for unpacking.
+
+ Each element of the returned sequence is a tuple
+ (name, extensions, description)
+ """
+ formats = [(name, info[0], info[3]) for name, info in
+ _UNPACK_FORMATS.items()]
+ formats.sort()
+ return formats
+
+def _check_unpack_options(extensions, function, extra_args):
+ """Checks what gets registered as an unpacker."""
+ # first make sure no other unpacker is registered for this extension
+ existing_extensions = {}
+ for name, info in _UNPACK_FORMATS.items():
+ for ext in info[0]:
+ existing_extensions[ext] = name
+
+ for extension in extensions:
+ if extension in existing_extensions:
+ msg = '%s is already registered for "%s"'
+ raise RegistryError(msg % (extension,
+ existing_extensions[extension]))
+
+ if not callable(function):
+ raise TypeError('The registered function must be a callable')
+
+
+def register_unpack_format(name, extensions, function, extra_args=None,
+ description=''):
+ """Registers an unpack format.
+
+ `name` is the name of the format. `extensions` is a list of extensions
+ corresponding to the format.
+
+ `function` is the callable that will be
+ used to unpack archives. The callable will receive archives to unpack.
+ If it's unable to handle an archive, it needs to raise a ReadError
+ exception.
+
+ If provided, `extra_args` is a sequence of
+ (name, value) tuples that will be passed as arguments to the callable.
+ description can be provided to describe the format, and will be returned
+ by the get_unpack_formats() function.
+ """
+ if extra_args is None:
+ extra_args = []
+ _check_unpack_options(extensions, function, extra_args)
+ _UNPACK_FORMATS[name] = extensions, function, extra_args, description
+
+def unregister_unpack_format(name):
+ """Removes the pack format from the registery."""
+ del _UNPACK_FORMATS[name]
+
+def _ensure_directory(path):
+ """Ensure that the parent directory of `path` exists"""
+ dirname = os.path.dirname(path)
+ if not os.path.isdir(dirname):
+ os.makedirs(dirname)
+
+def _unpack_zipfile(filename, extract_dir):
+ """Unpack zip `filename` to `extract_dir`
+ """
+ try:
+ import zipfile
+ except ImportError:
+ raise ReadError('zlib not supported, cannot unpack this archive.')
+
+ if not zipfile.is_zipfile(filename):
+ raise ReadError("%s is not a zip file" % filename)
+
+ zip = zipfile.ZipFile(filename)
+ try:
+ for info in zip.infolist():
+ name = info.filename
+
+ # don't extract absolute paths or ones with .. in them
+ if name.startswith('/') or '..' in name:
+ continue
+
+ target = os.path.join(extract_dir, *name.split('/'))
+ if not target:
+ continue
+
+ _ensure_directory(target)
+ if not name.endswith('/'):
+ # file
+ data = zip.read(info.filename)
+ f = open(target,'wb')
+ try:
+ f.write(data)
+ finally:
+ f.close()
+ del data
+ finally:
+ zip.close()
+
+def _unpack_tarfile(filename, extract_dir):
+ """Unpack tar/tar.gz/tar.bz2 `filename` to `extract_dir`
+ """
+ try:
+ tarobj = tarfile.open(filename)
+ except tarfile.TarError:
+ raise ReadError(
+ "%s is not a compressed or uncompressed tar file" % filename)
+ try:
+ tarobj.extractall(extract_dir)
+ finally:
+ tarobj.close()
+
+_UNPACK_FORMATS = {
+ 'gztar': (['.tar.gz', '.tgz'], _unpack_tarfile, [], "gzip'ed tar-file"),
+ 'tar': (['.tar'], _unpack_tarfile, [], "uncompressed tar file"),
+ 'zip': (['.zip'], _unpack_zipfile, [], "ZIP file")
+ }
+
+if _BZ2_SUPPORTED:
+ _UNPACK_FORMATS['bztar'] = (['.bz2'], _unpack_tarfile, [],
+ "bzip2'ed tar-file")
+
+def _find_unpack_format(filename):
+ for name, info in _UNPACK_FORMATS.items():
+ for extension in info[0]:
+ if filename.endswith(extension):
+ return name
+ return None
+
+def unpack_archive(filename, extract_dir=None, format=None):
+ """Unpack an archive.
+
+ `filename` is the name of the archive.
+
+ `extract_dir` is the name of the target directory, where the archive
+ is unpacked. If not provided, the current working directory is used.
+
+ `format` is the archive format: one of "zip", "tar", or "gztar". Or any
+ other registered format. If not provided, unpack_archive will use the
+ filename extension and see if an unpacker was registered for that
+ extension.
+
+ In case none is found, a ValueError is raised.
+ """
+ if extract_dir is None:
+ extract_dir = os.getcwd()
+
+ if format is not None:
+ try:
+ format_info = _UNPACK_FORMATS[format]
+ except KeyError:
+ raise ValueError("Unknown unpack format '{0}'".format(format))
+
+ func = format_info[1]
+ func(filename, extract_dir, **dict(format_info[2]))
+ else:
+ # we need to look at the registered unpackers supported extensions
+ format = _find_unpack_format(filename)
+ if format is None:
+ raise ReadError("Unknown archive format '{0}'".format(filename))
+
+ func = _UNPACK_FORMATS[format][1]
+ kwargs = dict(_UNPACK_FORMATS[format][2])
+ func(filename, extract_dir, **kwargs)