diff options
Diffstat (limited to 'Lib/test/test_shutil.py')
-rw-r--r-- | Lib/test/test_shutil.py | 317 |
1 files changed, 317 insertions, 0 deletions
diff --git a/Lib/test/test_shutil.py b/Lib/test/test_shutil.py index a747d6f..f2e5d9b 100644 --- a/Lib/test/test_shutil.py +++ b/Lib/test/test_shutil.py @@ -9,9 +9,71 @@ import os import os.path from test import support from test.support import TESTFN +from os.path import splitdrive +from distutils.spawn import find_executable, spawn +from shutil import (_make_tarball, _make_zipfile, make_archive, + register_archive_format, unregister_archive_format, + get_archive_formats) +import tarfile +import warnings + +from test import support +from test.support import TESTFN, check_warnings, captured_stdout + TESTFN2 = TESTFN + "2" +try: + import grp + import pwd + UID_GID_SUPPORT = True +except ImportError: + UID_GID_SUPPORT = False + +try: + import zlib +except ImportError: + zlib = None + +try: + import zipfile + ZIP_SUPPORT = True +except ImportError: + ZIP_SUPPORT = find_executable('zip') + class TestShutil(unittest.TestCase): + + def setUp(self): + super(TestShutil, self).setUp() + self.tempdirs = [] + + def tearDown(self): + super(TestShutil, self).tearDown() + while self.tempdirs: + d = self.tempdirs.pop() + shutil.rmtree(d, os.name in ('nt', 'cygwin')) + + def write_file(self, path, content='xxx'): + """Writes a file in the given path. + + + path can be a string or a sequence. + """ + if isinstance(path, (list, tuple)): + path = os.path.join(*path) + f = open(path, 'w') + try: + f.write(content) + finally: + f.close() + + def mkdtemp(self): + """Create a temporary directory that will be cleaned up. + + Returns the path of the directory. + """ + d = tempfile.mkdtemp() + self.tempdirs.append(d) + return d def test_rmtree_errors(self): # filename is guaranteed not to exist filename = tempfile.mktemp() @@ -277,6 +339,261 @@ class TestShutil(unittest.TestCase): shutil.rmtree(TESTFN, ignore_errors=True) shutil.rmtree(TESTFN2, ignore_errors=True) + @unittest.skipUnless(zlib, "requires zlib") + def test_make_tarball(self): + # creating something to tar + tmpdir = self.mkdtemp() + self.write_file([tmpdir, 'file1'], 'xxx') + self.write_file([tmpdir, 'file2'], 'xxx') + os.mkdir(os.path.join(tmpdir, 'sub')) + self.write_file([tmpdir, 'sub', 'file3'], 'xxx') + + tmpdir2 = self.mkdtemp() + unittest.skipUnless(splitdrive(tmpdir)[0] == splitdrive(tmpdir2)[0], + "source and target should be on same drive") + + base_name = os.path.join(tmpdir2, 'archive') + + # working with relative paths to avoid tar warnings + old_dir = os.getcwd() + os.chdir(tmpdir) + try: + _make_tarball(splitdrive(base_name)[1], '.') + finally: + os.chdir(old_dir) + + # check if the compressed tarball was created + tarball = base_name + '.tar.gz' + self.assertTrue(os.path.exists(tarball)) + + # trying an uncompressed one + base_name = os.path.join(tmpdir2, 'archive') + old_dir = os.getcwd() + os.chdir(tmpdir) + try: + _make_tarball(splitdrive(base_name)[1], '.', compress=None) + finally: + os.chdir(old_dir) + tarball = base_name + '.tar' + self.assertTrue(os.path.exists(tarball)) + + def _tarinfo(self, path): + tar = tarfile.open(path) + try: + names = tar.getnames() + names.sort() + return tuple(names) + finally: + tar.close() + + def _create_files(self): + # creating something to tar + tmpdir = self.mkdtemp() + dist = os.path.join(tmpdir, 'dist') + os.mkdir(dist) + self.write_file([dist, 'file1'], 'xxx') + self.write_file([dist, 'file2'], 'xxx') + os.mkdir(os.path.join(dist, 'sub')) + self.write_file([dist, 'sub', 'file3'], 'xxx') + os.mkdir(os.path.join(dist, 'sub2')) + tmpdir2 = self.mkdtemp() + base_name = os.path.join(tmpdir2, 'archive') + return tmpdir, tmpdir2, base_name + + @unittest.skipUnless(zlib, "Requires zlib") + @unittest.skipUnless(find_executable('tar') and find_executable('gzip'), + 'Need the tar command to run') + def test_tarfile_vs_tar(self): + tmpdir, tmpdir2, base_name = self._create_files() + old_dir = os.getcwd() + os.chdir(tmpdir) + try: + _make_tarball(base_name, 'dist') + finally: + os.chdir(old_dir) + + # check if the compressed tarball was created + tarball = base_name + '.tar.gz' + self.assertTrue(os.path.exists(tarball)) + + # now create another tarball using `tar` + tarball2 = os.path.join(tmpdir, 'archive2.tar.gz') + tar_cmd = ['tar', '-cf', 'archive2.tar', 'dist'] + gzip_cmd = ['gzip', '-f9', 'archive2.tar'] + old_dir = os.getcwd() + os.chdir(tmpdir) + try: + with captured_stdout() as s: + spawn(tar_cmd) + spawn(gzip_cmd) + finally: + os.chdir(old_dir) + + self.assertTrue(os.path.exists(tarball2)) + # let's compare both tarballs + self.assertEquals(self._tarinfo(tarball), self._tarinfo(tarball2)) + + # trying an uncompressed one + base_name = os.path.join(tmpdir2, 'archive') + old_dir = os.getcwd() + os.chdir(tmpdir) + try: + _make_tarball(base_name, 'dist', compress=None) + finally: + os.chdir(old_dir) + tarball = base_name + '.tar' + self.assertTrue(os.path.exists(tarball)) + + # now for a dry_run + base_name = os.path.join(tmpdir2, 'archive') + old_dir = os.getcwd() + os.chdir(tmpdir) + try: + _make_tarball(base_name, 'dist', compress=None, dry_run=True) + finally: + os.chdir(old_dir) + tarball = base_name + '.tar' + self.assertTrue(os.path.exists(tarball)) + + @unittest.skipUnless(find_executable('compress'), + 'The compress program is required') + def test_compress_deprecated(self): + tmpdir, tmpdir2, base_name = self._create_files() + + # using compress and testing the PendingDeprecationWarning + old_dir = os.getcwd() + os.chdir(tmpdir) + try: + with captured_stdout() as s: + with check_warnings() as w: + warnings.simplefilter("always") + _make_tarball(base_name, 'dist', compress='compress') + finally: + os.chdir(old_dir) + tarball = base_name + '.tar.Z' + self.assertTrue(os.path.exists(tarball)) + self.assertEquals(len(w.warnings), 1) + + # same test with dry_run + os.remove(tarball) + old_dir = os.getcwd() + os.chdir(tmpdir) + try: + with captured_stdout() as s: + with check_warnings() as w: + warnings.simplefilter("always") + _make_tarball(base_name, 'dist', compress='compress', + dry_run=True) + finally: + os.chdir(old_dir) + self.assertTrue(not os.path.exists(tarball)) + self.assertEquals(len(w.warnings), 1) + + @unittest.skipUnless(zlib, "Requires zlib") + @unittest.skipUnless(ZIP_SUPPORT, 'Need zip support to run') + def test_make_zipfile(self): + # creating something to tar + tmpdir = self.mkdtemp() + self.write_file([tmpdir, 'file1'], 'xxx') + self.write_file([tmpdir, 'file2'], 'xxx') + + tmpdir2 = self.mkdtemp() + base_name = os.path.join(tmpdir2, 'archive') + _make_zipfile(base_name, tmpdir) + + # check if the compressed tarball was created + tarball = base_name + '.zip' + + + def test_make_archive(self): + tmpdir = self.mkdtemp() + base_name = os.path.join(tmpdir, 'archive') + self.assertRaises(ValueError, make_archive, base_name, 'xxx') + + @unittest.skipUnless(zlib, "Requires zlib") + def test_make_archive_owner_group(self): + # testing make_archive with owner and group, with various combinations + # this works even if there's not gid/uid support + if UID_GID_SUPPORT: + group = grp.getgrgid(0)[0] + owner = pwd.getpwuid(0)[0] + else: + group = owner = 'root' + + base_dir, root_dir, base_name = self._create_files() + base_name = os.path.join(self.mkdtemp() , 'archive') + res = make_archive(base_name, 'zip', root_dir, base_dir, owner=owner, + group=group) + self.assertTrue(os.path.exists(res)) + + res = make_archive(base_name, 'zip', root_dir, base_dir) + self.assertTrue(os.path.exists(res)) + + res = make_archive(base_name, 'tar', root_dir, base_dir, + owner=owner, group=group) + self.assertTrue(os.path.exists(res)) + + res = make_archive(base_name, 'tar', root_dir, base_dir, + owner='kjhkjhkjg', group='oihohoh') + self.assertTrue(os.path.exists(res)) + + @unittest.skipUnless(zlib, "Requires zlib") + @unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support") + def test_tarfile_root_owner(self): + tmpdir, tmpdir2, base_name = self._create_files() + old_dir = os.getcwd() + os.chdir(tmpdir) + group = grp.getgrgid(0)[0] + owner = pwd.getpwuid(0)[0] + try: + archive_name = _make_tarball(base_name, 'dist', compress=None, + owner=owner, group=group) + finally: + os.chdir(old_dir) + + # check if the compressed tarball was created + self.assertTrue(os.path.exists(archive_name)) + + # now checks the rights + archive = tarfile.open(archive_name) + try: + for member in archive.getmembers(): + self.assertEquals(member.uid, 0) + self.assertEquals(member.gid, 0) + finally: + archive.close() + + def test_make_archive_cwd(self): + current_dir = os.getcwd() + def _breaks(*args, **kw): + raise RuntimeError() + + register_archive_format('xxx', _breaks, [], 'xxx file') + try: + try: + make_archive('xxx', 'xxx', root_dir=self.mkdtemp()) + except Exception: + pass + self.assertEquals(os.getcwd(), current_dir) + finally: + unregister_archive_format('xxx') + + def test_register_archive_format(self): + + self.assertRaises(TypeError, register_archive_format, 'xxx', 1) + self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x, + 1) + self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x, + [(1, 2), (1, 2, 3)]) + + register_archive_format('xxx', lambda: x, [(1, 2)], 'xxx file') + formats = [name for name, params in get_archive_formats()] + self.assertIn('xxx', formats) + + unregister_archive_format('xxx') + formats = [name for name, params in get_archive_formats()] + self.assertNotIn('xxx', formats) + class TestMove(unittest.TestCase): |