diff options
Diffstat (limited to 'Lib/shutil.py')
| -rw-r--r-- | Lib/shutil.py | 263 |
1 files changed, 232 insertions, 31 deletions
diff --git a/Lib/shutil.py b/Lib/shutil.py index 420802f..ef29ae2 100644 --- a/Lib/shutil.py +++ b/Lib/shutil.py @@ -11,6 +11,13 @@ from os.path import abspath import fnmatch import collections import errno +import tarfile + +try: + import bz2 + _BZ2_SUPPORTED = True +except ImportError: + _BZ2_SUPPORTED = False try: from pwd import getpwnam @@ -26,7 +33,8 @@ __all__ = ["copyfileobj", "copyfile", "copymode", "copystat", "copy", "copy2", "copytree", "move", "rmtree", "Error", "SpecialFileError", "ExecError", "make_archive", "get_archive_formats", "register_archive_format", "unregister_archive_format", - "ignore_patterns"] + "get_unpack_formats", "register_unpack_format", + "unregister_unpack_format", "unpack_archive", "ignore_patterns"] class Error(EnvironmentError): pass @@ -38,6 +46,14 @@ class SpecialFileError(EnvironmentError): class ExecError(EnvironmentError): """Raised when a command could not be executed""" +class ReadError(EnvironmentError): + """Raised when an archive cannot be read""" + +class RegistryError(Exception): + """Raised when a registery operation with the archiving + and unpacking registeries fails""" + + try: WindowsError except NameError: @@ -101,7 +117,7 @@ def copystat(src, dst): if hasattr(os, 'chflags') and hasattr(st, 'st_flags'): try: os.chflags(dst, st.st_flags) - except OSError, why: + except OSError as why: for err in 'EOPNOTSUPP', 'ENOTSUP': if hasattr(errno, err) and why.errno == getattr(errno, err): break @@ -142,8 +158,9 @@ def ignore_patterns(*patterns): return set(ignored_names) return _ignore_patterns -def copytree(src, dst, symlinks=False, ignore=None): - """Recursively copy a directory tree using copy2(). +def copytree(src, dst, symlinks=False, ignore=None, copy_function=copy2, + ignore_dangling_symlinks=False): + """Recursively copy a directory tree. The destination directory must not already exist. If exception(s) occur, an Error is raised with a list of reasons. @@ -151,7 +168,13 @@ def copytree(src, dst, symlinks=False, ignore=None): If the optional symlinks flag is true, symbolic links in the source tree result in symbolic links in the destination tree; if it is false, the contents of the files pointed to by symbolic - links are copied. + links are copied. If the file pointed by the symlink doesn't + exist, an exception will be added in the list of errors raised in + an Error exception at the end of the copy process. + + You can set the optional ignore_dangling_symlinks flag to true if you + want to silence this exception. Notice that this has no effect on + platforms that don't support os.symlink. The optional ignore argument is a callable. If given, it is called with the `src` parameter, which is the directory @@ -165,7 +188,10 @@ def copytree(src, dst, symlinks=False, ignore=None): list of names relative to the `src` directory that should not be copied. - XXX Consider this example code rather than the ultimate tool. + The optional copy_function argument is a callable that will be used + to copy each file. It will be called with the source path and the + destination path as arguments. By default, copy2() is used, but any + function that supports the same signature (like copy()) can be used. """ names = os.listdir(src) @@ -182,30 +208,37 @@ def copytree(src, dst, symlinks=False, ignore=None): srcname = os.path.join(src, name) dstname = os.path.join(dst, name) try: - if symlinks and os.path.islink(srcname): + if os.path.islink(srcname): linkto = os.readlink(srcname) - os.symlink(linkto, dstname) + if symlinks: + os.symlink(linkto, dstname) + else: + # ignore dangling symlink if the flag is on + if not os.path.exists(linkto) and ignore_dangling_symlinks: + continue + # otherwise let the copy occurs. copy2 will raise an error + copy_function(srcname, dstname) elif os.path.isdir(srcname): - copytree(srcname, dstname, symlinks, ignore) + copytree(srcname, dstname, symlinks, ignore, copy_function) else: # Will raise a SpecialFileError for unsupported file types - copy2(srcname, dstname) + copy_function(srcname, dstname) # catch the Error from the recursive copytree so that we can # continue with other files - except Error, err: + except Error as err: errors.extend(err.args[0]) - except EnvironmentError, why: + except EnvironmentError as why: errors.append((srcname, dstname, str(why))) try: copystat(src, dst) - except OSError, why: + except OSError as why: if WindowsError is not None and isinstance(why, WindowsError): # Copying file access times may fail on Windows pass else: errors.append((src, dst, str(why))) if errors: - raise Error, errors + raise Error(errors) def rmtree(path, ignore_errors=False, onerror=None): """Recursively delete a directory tree. @@ -235,7 +268,7 @@ def rmtree(path, ignore_errors=False, onerror=None): names = [] try: names = os.listdir(path) - except os.error, err: + except os.error as err: onerror(os.listdir, path, sys.exc_info()) for name in names: fullname = os.path.join(path, name) @@ -248,7 +281,7 @@ def rmtree(path, ignore_errors=False, onerror=None): else: try: os.remove(fullname) - except os.error, err: + except os.error as err: onerror(os.remove, fullname, sys.exc_info()) try: os.rmdir(path) @@ -288,13 +321,13 @@ def move(src, dst): real_dst = os.path.join(dst, _basename(src)) if os.path.exists(real_dst): - raise Error, "Destination path '%s' already exists" % real_dst + raise Error("Destination path '%s' already exists" % real_dst) try: os.rename(src, real_dst) - except OSError: + except OSError as exc: if os.path.isdir(src): if _destinsrc(src, dst): - raise Error, "Cannot move a directory '%s' into itself '%s'." % (src, dst) + raise Error("Cannot move a directory '%s' into itself '%s'." % (src, dst)) copytree(src, real_dst, symlinks=True) rmtree(src) else: @@ -350,13 +383,17 @@ def _make_tarball(base_name, base_dir, compress="gzip", verbose=0, dry_run=0, Returns the output filename. """ - tar_compression = {'gzip': 'gz', 'bzip2': 'bz2', None: ''} - compress_ext = {'gzip': '.gz', 'bzip2': '.bz2'} + tar_compression = {'gzip': 'gz', None: ''} + compress_ext = {'gzip': '.gz'} + + if _BZ2_SUPPORTED: + tar_compression['bzip2'] = 'bz2' + compress_ext['bzip2'] = '.bz2' # flags for compression program, each element of list will be an argument if compress is not None and compress not in compress_ext.keys(): - raise ValueError, \ - ("bad value for 'compress': must be None, 'gzip' or 'bzip2'") + raise ValueError("bad value for 'compress', or compression format not " + "supported : {0}".format(compress)) archive_name = base_name + '.tar' + compress_ext.get(compress, '') archive_dir = os.path.dirname(archive_name) @@ -367,10 +404,7 @@ def _make_tarball(base_name, base_dir, compress="gzip", verbose=0, dry_run=0, if not dry_run: os.makedirs(archive_dir) - # creating the tarball - import tarfile # late import so Python build itself doesn't break - if logger is not None: logger.info('Creating tar archive') @@ -408,8 +442,7 @@ def _call_external_zip(base_dir, zip_filename, verbose=False, dry_run=False): except DistutilsExecError: # XXX really should distinguish between "couldn't find # external 'zip' command" and "zip failed". - raise ExecError, \ - ("unable to create zip file '%s': " + raise ExecError("unable to create zip file '%s': " "could neither import the 'zipfile' module nor " "find a standalone zip utility") % zip_filename @@ -462,11 +495,14 @@ def _make_zipfile(base_name, base_dir, verbose=0, dry_run=0, logger=None): _ARCHIVE_FORMATS = { 'gztar': (_make_tarball, [('compress', 'gzip')], "gzip'ed tar-file"), - 'bztar': (_make_tarball, [('compress', 'bzip2')], "bzip2'ed tar-file"), 'tar': (_make_tarball, [('compress', None)], "uncompressed tar file"), 'zip': (_make_zipfile, [],"ZIP file") } +if _BZ2_SUPPORTED: + _ARCHIVE_FORMATS['bztar'] = (_make_tarball, [('compress', 'bzip2')], + "bzip2'ed tar-file") + def get_archive_formats(): """Returns a list of supported formats for archiving and unarchiving. @@ -488,7 +524,7 @@ def register_archive_format(name, function, extra_args=None, description=''): """ if extra_args is None: extra_args = [] - if not isinstance(function, collections.Callable): + if not callable(function): raise TypeError('The %s object is not callable' % function) if not isinstance(extra_args, (tuple, list)): raise TypeError('extra_args needs to be a sequence') @@ -535,7 +571,7 @@ def make_archive(base_name, format, root_dir=None, base_dir=None, verbose=0, try: format_info = _ARCHIVE_FORMATS[format] except KeyError: - raise ValueError, "unknown archive format '%s'" % format + raise ValueError("unknown archive format '%s'" % format) func = format_info[0] for arg, val in format_info[1]: @@ -554,3 +590,168 @@ def make_archive(base_name, format, root_dir=None, base_dir=None, verbose=0, os.chdir(save_cwd) return filename + + +def get_unpack_formats(): + """Returns a list of supported formats for unpacking. + + Each element of the returned sequence is a tuple + (name, extensions, description) + """ + formats = [(name, info[0], info[3]) for name, info in + _UNPACK_FORMATS.items()] + formats.sort() + return formats + +def _check_unpack_options(extensions, function, extra_args): + """Checks what gets registered as an unpacker.""" + # first make sure no other unpacker is registered for this extension + existing_extensions = {} + for name, info in _UNPACK_FORMATS.items(): + for ext in info[0]: + existing_extensions[ext] = name + + for extension in extensions: + if extension in existing_extensions: + msg = '%s is already registered for "%s"' + raise RegistryError(msg % (extension, + existing_extensions[extension])) + + if not callable(function): + raise TypeError('The registered function must be a callable') + + +def register_unpack_format(name, extensions, function, extra_args=None, + description=''): + """Registers an unpack format. + + `name` is the name of the format. `extensions` is a list of extensions + corresponding to the format. + + `function` is the callable that will be + used to unpack archives. The callable will receive archives to unpack. + If it's unable to handle an archive, it needs to raise a ReadError + exception. + + If provided, `extra_args` is a sequence of + (name, value) tuples that will be passed as arguments to the callable. + description can be provided to describe the format, and will be returned + by the get_unpack_formats() function. + """ + if extra_args is None: + extra_args = [] + _check_unpack_options(extensions, function, extra_args) + _UNPACK_FORMATS[name] = extensions, function, extra_args, description + +def unregister_unpack_format(name): + """Removes the pack format from the registery.""" + del _UNPACK_FORMATS[name] + +def _ensure_directory(path): + """Ensure that the parent directory of `path` exists""" + dirname = os.path.dirname(path) + if not os.path.isdir(dirname): + os.makedirs(dirname) + +def _unpack_zipfile(filename, extract_dir): + """Unpack zip `filename` to `extract_dir` + """ + try: + import zipfile + except ImportError: + raise ReadError('zlib not supported, cannot unpack this archive.') + + if not zipfile.is_zipfile(filename): + raise ReadError("%s is not a zip file" % filename) + + zip = zipfile.ZipFile(filename) + try: + for info in zip.infolist(): + name = info.filename + + # don't extract absolute paths or ones with .. in them + if name.startswith('/') or '..' in name: + continue + + target = os.path.join(extract_dir, *name.split('/')) + if not target: + continue + + _ensure_directory(target) + if not name.endswith('/'): + # file + data = zip.read(info.filename) + f = open(target,'wb') + try: + f.write(data) + finally: + f.close() + del data + finally: + zip.close() + +def _unpack_tarfile(filename, extract_dir): + """Unpack tar/tar.gz/tar.bz2 `filename` to `extract_dir` + """ + try: + tarobj = tarfile.open(filename) + except tarfile.TarError: + raise ReadError( + "%s is not a compressed or uncompressed tar file" % filename) + try: + tarobj.extractall(extract_dir) + finally: + tarobj.close() + +_UNPACK_FORMATS = { + 'gztar': (['.tar.gz', '.tgz'], _unpack_tarfile, [], "gzip'ed tar-file"), + 'tar': (['.tar'], _unpack_tarfile, [], "uncompressed tar file"), + 'zip': (['.zip'], _unpack_zipfile, [], "ZIP file") + } + +if _BZ2_SUPPORTED: + _UNPACK_FORMATS['bztar'] = (['.bz2'], _unpack_tarfile, [], + "bzip2'ed tar-file") + +def _find_unpack_format(filename): + for name, info in _UNPACK_FORMATS.items(): + for extension in info[0]: + if filename.endswith(extension): + return name + return None + +def unpack_archive(filename, extract_dir=None, format=None): + """Unpack an archive. + + `filename` is the name of the archive. + + `extract_dir` is the name of the target directory, where the archive + is unpacked. If not provided, the current working directory is used. + + `format` is the archive format: one of "zip", "tar", or "gztar". Or any + other registered format. If not provided, unpack_archive will use the + filename extension and see if an unpacker was registered for that + extension. + + In case none is found, a ValueError is raised. + """ + if extract_dir is None: + extract_dir = os.getcwd() + + if format is not None: + try: + format_info = _UNPACK_FORMATS[format] + except KeyError: + raise ValueError("Unknown unpack format '{0}'".format(format)) + + func = format_info[1] + func(filename, extract_dir, **dict(format_info[2])) + else: + # we need to look at the registered unpackers supported extensions + format = _find_unpack_format(filename) + if format is None: + raise ReadError("Unknown archive format '{0}'".format(filename)) + + func = _UNPACK_FORMATS[format][1] + kwargs = dict(_UNPACK_FORMATS[format][2]) + func(filename, extract_dir, **kwargs) |
