diff options
Diffstat (limited to 'Lib/test/test_shutil.py')
-rw-r--r-- | Lib/test/test_shutil.py | 946 |
1 files changed, 774 insertions, 172 deletions
diff --git a/Lib/test/test_shutil.py b/Lib/test/test_shutil.py index 1eb5e89..92ab0a4 100644 --- a/Lib/test/test_shutil.py +++ b/Lib/test/test_shutil.py @@ -7,8 +7,9 @@ import sys import stat import os import os.path -import functools import errno +import functools +import subprocess from test import support from test.support import TESTFN from os.path import splitdrive @@ -22,7 +23,7 @@ import tarfile import warnings from test import support -from test.support import TESTFN, check_warnings, captured_stdout +from test.support import TESTFN, check_warnings, captured_stdout, requires_zlib try: import bz2 @@ -40,11 +41,6 @@ except ImportError: UID_GID_SUPPORT = False try: - import zlib -except ImportError: - zlib = None - -try: import zipfile ZIP_SUPPORT = True except ImportError: @@ -52,7 +48,7 @@ except ImportError: def _fake_rename(*args, **kwargs): # Pretend the destination path is on a different filesystem. - raise OSError() + raise OSError(getattr(errno, 'EXDEV', 18), "Invalid cross-device link") def mock_rename(func): @functools.wraps(func) @@ -65,6 +61,31 @@ def mock_rename(func): os.rename = builtin_rename return wrap +def write_file(path, content, binary=False): + """Write *content* to a file located at *path*. + + If *path* is a tuple instead of a string, os.path.join will be used to + make a path. If *binary* is true, the file will be opened in binary + mode. + """ + if isinstance(path, tuple): + path = os.path.join(*path) + with open(path, 'wb' if binary else 'w') as fp: + fp.write(content) + +def read_file(path, binary=False): + """Return contents from a file located at *path*. + + If *path* is a tuple instead of a string, os.path.join will be used to + make a path. If *binary* is true, the file will be opened in binary + mode. + """ + if isinstance(path, tuple): + path = os.path.join(*path) + with open(path, 'rb' if binary else 'r') as fp: + return fp.read() + + class TestShutil(unittest.TestCase): def setUp(self): @@ -77,19 +98,6 @@ class TestShutil(unittest.TestCase): d = self.tempdirs.pop() shutil.rmtree(d, os.name in ('nt', 'cygwin')) - def write_file(self, path, content='xxx'): - """Writes a file in the given path. - - - path can be a string or a sequence. - """ - if isinstance(path, (list, tuple)): - path = os.path.join(*path) - f = open(path, 'w') - try: - f.write(content) - finally: - f.close() def mkdtemp(self): """Create a temporary directory that will be cleaned up. @@ -100,6 +108,15 @@ class TestShutil(unittest.TestCase): self.tempdirs.append(d) return d + def test_rmtree_works_on_bytes(self): + tmp = self.mkdtemp() + victim = os.path.join(tmp, 'killme') + os.mkdir(victim) + write_file(os.path.join(victim, 'somefile'), 'foo') + victim = os.fsencode(victim) + self.assertIsInstance(victim, bytes) + shutil.rmtree(victim) + @support.skip_unless_symlink def test_rmtree_fails_on_symlink(self): tmp = self.mkdtemp() @@ -119,18 +136,40 @@ class TestShutil(unittest.TestCase): self.assertEqual(errors[0][1], link) self.assertIsInstance(errors[0][2][1], OSError) + @support.skip_unless_symlink + def test_rmtree_works_on_symlinks(self): + tmp = self.mkdtemp() + dir1 = os.path.join(tmp, 'dir1') + dir2 = os.path.join(dir1, 'dir2') + dir3 = os.path.join(tmp, 'dir3') + for d in dir1, dir2, dir3: + os.mkdir(d) + file1 = os.path.join(tmp, 'file1') + write_file(file1, 'foo') + link1 = os.path.join(dir1, 'link1') + os.symlink(dir2, link1) + link2 = os.path.join(dir1, 'link2') + os.symlink(dir3, link2) + link3 = os.path.join(dir1, 'link3') + os.symlink(file1, link3) + # make sure symlinks are removed but not followed + shutil.rmtree(dir1) + self.assertFalse(os.path.exists(dir1)) + self.assertTrue(os.path.exists(dir3)) + self.assertTrue(os.path.exists(file1)) + def test_rmtree_errors(self): # filename is guaranteed not to exist filename = tempfile.mktemp() - self.assertRaises(OSError, shutil.rmtree, filename) - # test that ignore_errors option is honoured + self.assertRaises(FileNotFoundError, shutil.rmtree, filename) + # test that ignore_errors option is honored shutil.rmtree(filename, ignore_errors=True) # existing file tmpdir = self.mkdtemp() - self.write_file((tmpdir, "tstfile"), "") + write_file((tmpdir, "tstfile"), "") filename = os.path.join(tmpdir, "tstfile") - with self.assertRaises(OSError) as cm: + with self.assertRaises(NotADirectoryError) as cm: shutil.rmtree(filename) if os.name == 'nt': rm_name = os.path.join(filename, '*.*') @@ -138,7 +177,7 @@ class TestShutil(unittest.TestCase): rm_name = filename self.assertEqual(cm.exception.filename, rm_name) self.assertTrue(os.path.exists(filename)) - # test that ignore_errors option is honoured + # test that ignore_errors option is honored shutil.rmtree(filename, ignore_errors=True) self.assertTrue(os.path.exists(filename)) errors = [] @@ -148,11 +187,11 @@ class TestShutil(unittest.TestCase): self.assertEqual(len(errors), 2) self.assertIs(errors[0][0], os.listdir) self.assertEqual(errors[0][1], filename) - self.assertIsInstance(errors[0][2][1], OSError) + self.assertIsInstance(errors[0][2][1], NotADirectoryError) self.assertEqual(errors[0][2][1].filename, rm_name) self.assertIs(errors[1][0], os.rmdir) self.assertEqual(errors[1][1], filename) - self.assertIsInstance(errors[1][2][1], OSError) + self.assertIsInstance(errors[1][2][1], NotADirectoryError) self.assertEqual(errors[1][2][1].filename, rm_name) # See bug #1071513 for why we don't run this on cygwin @@ -162,30 +201,34 @@ class TestShutil(unittest.TestCase): def test_on_error(self): self.errorState = 0 os.mkdir(TESTFN) - self.childpath = os.path.join(TESTFN, 'a') - f = open(self.childpath, 'w') - f.close() + self.addCleanup(shutil.rmtree, TESTFN) + + self.child_file_path = os.path.join(TESTFN, 'a') + self.child_dir_path = os.path.join(TESTFN, 'b') + support.create_empty_file(self.child_file_path) + os.mkdir(self.child_dir_path) old_dir_mode = os.stat(TESTFN).st_mode - old_child_mode = os.stat(self.childpath).st_mode + old_child_file_mode = os.stat(self.child_file_path).st_mode + old_child_dir_mode = os.stat(self.child_dir_path).st_mode # Make unwritable. - os.chmod(self.childpath, stat.S_IREAD) - os.chmod(TESTFN, stat.S_IREAD) + new_mode = stat.S_IREAD|stat.S_IEXEC + os.chmod(self.child_file_path, new_mode) + os.chmod(self.child_dir_path, new_mode) + os.chmod(TESTFN, new_mode) + + self.addCleanup(os.chmod, TESTFN, old_dir_mode) + self.addCleanup(os.chmod, self.child_file_path, old_child_file_mode) + self.addCleanup(os.chmod, self.child_dir_path, old_child_dir_mode) shutil.rmtree(TESTFN, onerror=self.check_args_to_onerror) # Test whether onerror has actually been called. - self.assertEqual(self.errorState, 2, - "Expected call to onerror function did not happen.") - - # Make writable again. - os.chmod(TESTFN, old_dir_mode) - os.chmod(self.childpath, old_child_mode) - - # Clean up. - shutil.rmtree(TESTFN) + self.assertEqual(self.errorState, 3, + "Expected call to onerror function did not " + "happen.") def check_args_to_onerror(self, func, arg, exc): # test_rmtree_errors deliberately runs rmtree - # on a directory that is chmod 400, which will fail. + # on a directory that is chmod 500, which will fail. # This function is run when shutil.rmtree fails. # 99.9% of the time it initially fails to remove # a file in the directory, so the first time through @@ -194,99 +237,436 @@ class TestShutil(unittest.TestCase): # FUSE experienced a failure earlier in the process # at os.listdir. The first failure may legally # be either. - if self.errorState == 0: - if func is os.remove: - self.assertEqual(arg, self.childpath) + if self.errorState < 2: + if func is os.unlink: + self.assertEqual(arg, self.child_file_path) + elif func is os.rmdir: + self.assertEqual(arg, self.child_dir_path) else: - self.assertIs(func, os.listdir, - "func must be either os.remove or os.listdir") - self.assertEqual(arg, TESTFN) + self.assertIs(func, os.listdir) + self.assertIn(arg, [TESTFN, self.child_dir_path]) self.assertTrue(issubclass(exc[0], OSError)) - self.errorState = 1 + self.errorState += 1 else: self.assertEqual(func, os.rmdir) self.assertEqual(arg, TESTFN) self.assertTrue(issubclass(exc[0], OSError)) - self.errorState = 2 + self.errorState = 3 + + def test_rmtree_does_not_choke_on_failing_lstat(self): + try: + orig_lstat = os.lstat + def raiser(fn, *args, **kwargs): + if fn != TESTFN: + raise OSError() + else: + return orig_lstat(fn) + os.lstat = raiser + + os.mkdir(TESTFN) + write_file((TESTFN, 'foo'), 'foo') + shutil.rmtree(TESTFN) + finally: + os.lstat = orig_lstat + + @unittest.skipUnless(hasattr(os, 'chmod'), 'requires os.chmod') + @support.skip_unless_symlink + def test_copymode_follow_symlinks(self): + tmp_dir = self.mkdtemp() + src = os.path.join(tmp_dir, 'foo') + dst = os.path.join(tmp_dir, 'bar') + src_link = os.path.join(tmp_dir, 'baz') + dst_link = os.path.join(tmp_dir, 'quux') + write_file(src, 'foo') + write_file(dst, 'foo') + os.symlink(src, src_link) + os.symlink(dst, dst_link) + os.chmod(src, stat.S_IRWXU|stat.S_IRWXG) + # file to file + os.chmod(dst, stat.S_IRWXO) + self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode) + shutil.copymode(src, dst) + self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode) + # follow src link + os.chmod(dst, stat.S_IRWXO) + shutil.copymode(src_link, dst) + self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode) + # follow dst link + os.chmod(dst, stat.S_IRWXO) + shutil.copymode(src, dst_link) + self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode) + # follow both links + os.chmod(dst, stat.S_IRWXO) + shutil.copymode(src_link, dst) + self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode) + + @unittest.skipUnless(hasattr(os, 'lchmod'), 'requires os.lchmod') + @support.skip_unless_symlink + def test_copymode_symlink_to_symlink(self): + tmp_dir = self.mkdtemp() + src = os.path.join(tmp_dir, 'foo') + dst = os.path.join(tmp_dir, 'bar') + src_link = os.path.join(tmp_dir, 'baz') + dst_link = os.path.join(tmp_dir, 'quux') + write_file(src, 'foo') + write_file(dst, 'foo') + os.symlink(src, src_link) + os.symlink(dst, dst_link) + os.chmod(src, stat.S_IRWXU|stat.S_IRWXG) + os.chmod(dst, stat.S_IRWXU) + os.lchmod(src_link, stat.S_IRWXO|stat.S_IRWXG) + # link to link + os.lchmod(dst_link, stat.S_IRWXO) + shutil.copymode(src_link, dst_link, follow_symlinks=False) + self.assertEqual(os.lstat(src_link).st_mode, + os.lstat(dst_link).st_mode) + self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode) + # src link - use chmod + os.lchmod(dst_link, stat.S_IRWXO) + shutil.copymode(src_link, dst, follow_symlinks=False) + self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode) + # dst link - use chmod + os.lchmod(dst_link, stat.S_IRWXO) + shutil.copymode(src, dst_link, follow_symlinks=False) + self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode) + + @unittest.skipIf(hasattr(os, 'lchmod'), 'requires os.lchmod to be missing') + @support.skip_unless_symlink + def test_copymode_symlink_to_symlink_wo_lchmod(self): + tmp_dir = self.mkdtemp() + src = os.path.join(tmp_dir, 'foo') + dst = os.path.join(tmp_dir, 'bar') + src_link = os.path.join(tmp_dir, 'baz') + dst_link = os.path.join(tmp_dir, 'quux') + write_file(src, 'foo') + write_file(dst, 'foo') + os.symlink(src, src_link) + os.symlink(dst, dst_link) + shutil.copymode(src_link, dst_link, follow_symlinks=False) # silent fail + + @support.skip_unless_symlink + def test_copystat_symlinks(self): + tmp_dir = self.mkdtemp() + src = os.path.join(tmp_dir, 'foo') + dst = os.path.join(tmp_dir, 'bar') + src_link = os.path.join(tmp_dir, 'baz') + dst_link = os.path.join(tmp_dir, 'qux') + write_file(src, 'foo') + src_stat = os.stat(src) + os.utime(src, (src_stat.st_atime, + src_stat.st_mtime - 42.0)) # ensure different mtimes + write_file(dst, 'bar') + self.assertNotEqual(os.stat(src).st_mtime, os.stat(dst).st_mtime) + os.symlink(src, src_link) + os.symlink(dst, dst_link) + if hasattr(os, 'lchmod'): + os.lchmod(src_link, stat.S_IRWXO) + if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'): + os.lchflags(src_link, stat.UF_NODUMP) + src_link_stat = os.lstat(src_link) + # follow + if hasattr(os, 'lchmod'): + shutil.copystat(src_link, dst_link, follow_symlinks=True) + self.assertNotEqual(src_link_stat.st_mode, os.stat(dst).st_mode) + # don't follow + shutil.copystat(src_link, dst_link, follow_symlinks=False) + dst_link_stat = os.lstat(dst_link) + if os.utime in os.supports_follow_symlinks: + for attr in 'st_atime', 'st_mtime': + # The modification times may be truncated in the new file. + self.assertLessEqual(getattr(src_link_stat, attr), + getattr(dst_link_stat, attr) + 1) + if hasattr(os, 'lchmod'): + self.assertEqual(src_link_stat.st_mode, dst_link_stat.st_mode) + if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'): + self.assertEqual(src_link_stat.st_flags, dst_link_stat.st_flags) + # tell to follow but dst is not a link + shutil.copystat(src_link, dst, follow_symlinks=False) + self.assertTrue(abs(os.stat(src).st_mtime - os.stat(dst).st_mtime) < + 00000.1) + + @unittest.skipUnless(hasattr(os, 'chflags') and + hasattr(errno, 'EOPNOTSUPP') and + hasattr(errno, 'ENOTSUP'), + "requires os.chflags, EOPNOTSUPP & ENOTSUP") + def test_copystat_handles_harmless_chflags_errors(self): + tmpdir = self.mkdtemp() + file1 = os.path.join(tmpdir, 'file1') + file2 = os.path.join(tmpdir, 'file2') + write_file(file1, 'xxx') + write_file(file2, 'xxx') + + def make_chflags_raiser(err): + ex = OSError() + + def _chflags_raiser(path, flags, *, follow_symlinks=True): + ex.errno = err + raise ex + return _chflags_raiser + old_chflags = os.chflags + try: + for err in errno.EOPNOTSUPP, errno.ENOTSUP: + os.chflags = make_chflags_raiser(err) + shutil.copystat(file1, file2) + # assert others errors break it + os.chflags = make_chflags_raiser(errno.EOPNOTSUPP + errno.ENOTSUP) + self.assertRaises(OSError, shutil.copystat, file1, file2) + finally: + os.chflags = old_chflags + + @support.skip_unless_xattr + def test_copyxattr(self): + tmp_dir = self.mkdtemp() + src = os.path.join(tmp_dir, 'foo') + write_file(src, 'foo') + dst = os.path.join(tmp_dir, 'bar') + write_file(dst, 'bar') + + # no xattr == no problem + shutil._copyxattr(src, dst) + # common case + os.setxattr(src, 'user.foo', b'42') + os.setxattr(src, 'user.bar', b'43') + shutil._copyxattr(src, dst) + self.assertEqual(os.listxattr(src), os.listxattr(dst)) + self.assertEqual( + os.getxattr(src, 'user.foo'), + os.getxattr(dst, 'user.foo')) + # check errors don't affect other attrs + os.remove(dst) + write_file(dst, 'bar') + os_error = OSError(errno.EPERM, 'EPERM') + + def _raise_on_user_foo(fname, attr, val, **kwargs): + if attr == 'user.foo': + raise os_error + else: + orig_setxattr(fname, attr, val, **kwargs) + try: + orig_setxattr = os.setxattr + os.setxattr = _raise_on_user_foo + shutil._copyxattr(src, dst) + self.assertIn('user.bar', os.listxattr(dst)) + finally: + os.setxattr = orig_setxattr + + # test that shutil.copystat copies xattrs + src = os.path.join(tmp_dir, 'the_original') + write_file(src, src) + os.setxattr(src, 'user.the_value', b'fiddly') + dst = os.path.join(tmp_dir, 'the_copy') + write_file(dst, dst) + shutil.copystat(src, dst) + self.assertEqual(os.getxattr(dst, 'user.the_value'), b'fiddly') + + @support.skip_unless_symlink + @support.skip_unless_xattr + @unittest.skipUnless(hasattr(os, 'geteuid') and os.geteuid() == 0, + 'root privileges required') + def test_copyxattr_symlinks(self): + # On Linux, it's only possible to access non-user xattr for symlinks; + # which in turn require root privileges. This test should be expanded + # as soon as other platforms gain support for extended attributes. + tmp_dir = self.mkdtemp() + src = os.path.join(tmp_dir, 'foo') + src_link = os.path.join(tmp_dir, 'baz') + write_file(src, 'foo') + os.symlink(src, src_link) + os.setxattr(src, 'trusted.foo', b'42') + os.setxattr(src_link, 'trusted.foo', b'43', follow_symlinks=False) + dst = os.path.join(tmp_dir, 'bar') + dst_link = os.path.join(tmp_dir, 'qux') + write_file(dst, 'bar') + os.symlink(dst, dst_link) + shutil._copyxattr(src_link, dst_link, follow_symlinks=False) + self.assertEqual(os.getxattr(dst_link, 'trusted.foo', follow_symlinks=False), b'43') + self.assertRaises(OSError, os.getxattr, dst, 'trusted.foo') + shutil._copyxattr(src_link, dst, follow_symlinks=False) + self.assertEqual(os.getxattr(dst, 'trusted.foo'), b'43') + + @support.skip_unless_symlink + def test_copy_symlinks(self): + tmp_dir = self.mkdtemp() + src = os.path.join(tmp_dir, 'foo') + dst = os.path.join(tmp_dir, 'bar') + src_link = os.path.join(tmp_dir, 'baz') + write_file(src, 'foo') + os.symlink(src, src_link) + if hasattr(os, 'lchmod'): + os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO) + # don't follow + shutil.copy(src_link, dst, follow_symlinks=True) + self.assertFalse(os.path.islink(dst)) + self.assertEqual(read_file(src), read_file(dst)) + os.remove(dst) + # follow + shutil.copy(src_link, dst, follow_symlinks=False) + self.assertTrue(os.path.islink(dst)) + self.assertEqual(os.readlink(dst), os.readlink(src_link)) + if hasattr(os, 'lchmod'): + self.assertEqual(os.lstat(src_link).st_mode, + os.lstat(dst).st_mode) + + @support.skip_unless_symlink + def test_copy2_symlinks(self): + tmp_dir = self.mkdtemp() + src = os.path.join(tmp_dir, 'foo') + dst = os.path.join(tmp_dir, 'bar') + src_link = os.path.join(tmp_dir, 'baz') + write_file(src, 'foo') + os.symlink(src, src_link) + if hasattr(os, 'lchmod'): + os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO) + if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'): + os.lchflags(src_link, stat.UF_NODUMP) + src_stat = os.stat(src) + src_link_stat = os.lstat(src_link) + # follow + shutil.copy2(src_link, dst, follow_symlinks=True) + self.assertFalse(os.path.islink(dst)) + self.assertEqual(read_file(src), read_file(dst)) + os.remove(dst) + # don't follow + shutil.copy2(src_link, dst, follow_symlinks=False) + self.assertTrue(os.path.islink(dst)) + self.assertEqual(os.readlink(dst), os.readlink(src_link)) + dst_stat = os.lstat(dst) + if os.utime in os.supports_follow_symlinks: + for attr in 'st_atime', 'st_mtime': + # The modification times may be truncated in the new file. + self.assertLessEqual(getattr(src_link_stat, attr), + getattr(dst_stat, attr) + 1) + if hasattr(os, 'lchmod'): + self.assertEqual(src_link_stat.st_mode, dst_stat.st_mode) + self.assertNotEqual(src_stat.st_mode, dst_stat.st_mode) + if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'): + self.assertEqual(src_link_stat.st_flags, dst_stat.st_flags) + + @support.skip_unless_xattr + def test_copy2_xattr(self): + tmp_dir = self.mkdtemp() + src = os.path.join(tmp_dir, 'foo') + dst = os.path.join(tmp_dir, 'bar') + write_file(src, 'foo') + os.setxattr(src, 'user.foo', b'42') + shutil.copy2(src, dst) + self.assertEqual( + os.getxattr(src, 'user.foo'), + os.getxattr(dst, 'user.foo')) + os.remove(dst) + + @support.skip_unless_symlink + def test_copyfile_symlinks(self): + tmp_dir = self.mkdtemp() + src = os.path.join(tmp_dir, 'src') + dst = os.path.join(tmp_dir, 'dst') + dst_link = os.path.join(tmp_dir, 'dst_link') + link = os.path.join(tmp_dir, 'link') + write_file(src, 'foo') + os.symlink(src, link) + # don't follow + shutil.copyfile(link, dst_link, follow_symlinks=False) + self.assertTrue(os.path.islink(dst_link)) + self.assertEqual(os.readlink(link), os.readlink(dst_link)) + # follow + shutil.copyfile(link, dst) + self.assertFalse(os.path.islink(dst)) + + def test_rmtree_uses_safe_fd_version_if_available(self): + _use_fd_functions = ({os.open, os.stat, os.unlink, os.rmdir} <= + os.supports_dir_fd and + os.listdir in os.supports_fd and + os.stat in os.supports_follow_symlinks) + if _use_fd_functions: + self.assertTrue(shutil._use_fd_functions) + self.assertTrue(shutil.rmtree.avoids_symlink_attacks) + tmp_dir = self.mkdtemp() + d = os.path.join(tmp_dir, 'a') + os.mkdir(d) + try: + real_rmtree = shutil._rmtree_safe_fd + class Called(Exception): pass + def _raiser(*args, **kwargs): + raise Called + shutil._rmtree_safe_fd = _raiser + self.assertRaises(Called, shutil.rmtree, d) + finally: + shutil._rmtree_safe_fd = real_rmtree + else: + self.assertFalse(shutil._use_fd_functions) + self.assertFalse(shutil.rmtree.avoids_symlink_attacks) def test_rmtree_dont_delete_file(self): # When called on a file instead of a directory, don't delete it. handle, path = tempfile.mkstemp() - os.fdopen(handle).close() - self.assertRaises(OSError, shutil.rmtree, path) + os.close(handle) + self.assertRaises(NotADirectoryError, shutil.rmtree, path) os.remove(path) - def _write_data(self, path, data): - f = open(path, "w") - f.write(data) - f.close() - def test_copytree_simple(self): - - def read_data(path): - f = open(path) - data = f.read() - f.close() - return data - src_dir = tempfile.mkdtemp() dst_dir = os.path.join(tempfile.mkdtemp(), 'destination') - self._write_data(os.path.join(src_dir, 'test.txt'), '123') + self.addCleanup(shutil.rmtree, src_dir) + self.addCleanup(shutil.rmtree, os.path.dirname(dst_dir)) + write_file((src_dir, 'test.txt'), '123') os.mkdir(os.path.join(src_dir, 'test_dir')) - self._write_data(os.path.join(src_dir, 'test_dir', 'test.txt'), '456') + write_file((src_dir, 'test_dir', 'test.txt'), '456') + + shutil.copytree(src_dir, dst_dir) + self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test.txt'))) + self.assertTrue(os.path.isdir(os.path.join(dst_dir, 'test_dir'))) + self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test_dir', + 'test.txt'))) + actual = read_file((dst_dir, 'test.txt')) + self.assertEqual(actual, '123') + actual = read_file((dst_dir, 'test_dir', 'test.txt')) + self.assertEqual(actual, '456') - try: - shutil.copytree(src_dir, dst_dir) - self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test.txt'))) - self.assertTrue(os.path.isdir(os.path.join(dst_dir, 'test_dir'))) - self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test_dir', - 'test.txt'))) - actual = read_data(os.path.join(dst_dir, 'test.txt')) - self.assertEqual(actual, '123') - actual = read_data(os.path.join(dst_dir, 'test_dir', 'test.txt')) - self.assertEqual(actual, '456') - finally: - for path in ( - os.path.join(src_dir, 'test.txt'), - os.path.join(dst_dir, 'test.txt'), - os.path.join(src_dir, 'test_dir', 'test.txt'), - os.path.join(dst_dir, 'test_dir', 'test.txt'), - ): - if os.path.exists(path): - os.remove(path) - for path in (src_dir, - os.path.dirname(dst_dir) - ): - if os.path.exists(path): - shutil.rmtree(path) + @support.skip_unless_symlink + def test_copytree_symlinks(self): + tmp_dir = self.mkdtemp() + src_dir = os.path.join(tmp_dir, 'src') + dst_dir = os.path.join(tmp_dir, 'dst') + sub_dir = os.path.join(src_dir, 'sub') + os.mkdir(src_dir) + os.mkdir(sub_dir) + write_file((src_dir, 'file.txt'), 'foo') + src_link = os.path.join(sub_dir, 'link') + dst_link = os.path.join(dst_dir, 'sub/link') + os.symlink(os.path.join(src_dir, 'file.txt'), + src_link) + if hasattr(os, 'lchmod'): + os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO) + if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'): + os.lchflags(src_link, stat.UF_NODUMP) + src_stat = os.lstat(src_link) + shutil.copytree(src_dir, dst_dir, symlinks=True) + self.assertTrue(os.path.islink(os.path.join(dst_dir, 'sub', 'link'))) + self.assertEqual(os.readlink(os.path.join(dst_dir, 'sub', 'link')), + os.path.join(src_dir, 'file.txt')) + dst_stat = os.lstat(dst_link) + if hasattr(os, 'lchmod'): + self.assertEqual(dst_stat.st_mode, src_stat.st_mode) + if hasattr(os, 'lchflags'): + self.assertEqual(dst_stat.st_flags, src_stat.st_flags) def test_copytree_with_exclude(self): - - def read_data(path): - f = open(path) - data = f.read() - f.close() - return data - # creating data join = os.path.join exists = os.path.exists src_dir = tempfile.mkdtemp() try: dst_dir = join(tempfile.mkdtemp(), 'destination') - self._write_data(join(src_dir, 'test.txt'), '123') - self._write_data(join(src_dir, 'test.tmp'), '123') + write_file((src_dir, 'test.txt'), '123') + write_file((src_dir, 'test.tmp'), '123') os.mkdir(join(src_dir, 'test_dir')) - self._write_data(join(src_dir, 'test_dir', 'test.txt'), '456') + write_file((src_dir, 'test_dir', 'test.txt'), '456') os.mkdir(join(src_dir, 'test_dir2')) - self._write_data(join(src_dir, 'test_dir2', 'test.txt'), '456') + write_file((src_dir, 'test_dir2', 'test.txt'), '456') os.mkdir(join(src_dir, 'test_dir2', 'subdir')) os.mkdir(join(src_dir, 'test_dir2', 'subdir2')) - self._write_data(join(src_dir, 'test_dir2', 'subdir', 'test.txt'), - '456') - self._write_data(join(src_dir, 'test_dir2', 'subdir2', 'test.py'), - '456') - + write_file((src_dir, 'test_dir2', 'subdir', 'test.txt'), '456') + write_file((src_dir, 'test_dir2', 'subdir2', 'test.py'), '456') # testing glob-like patterns try: @@ -294,21 +674,19 @@ class TestShutil(unittest.TestCase): shutil.copytree(src_dir, dst_dir, ignore=patterns) # checking the result: some elements should not be copied self.assertTrue(exists(join(dst_dir, 'test.txt'))) - self.assertTrue(not exists(join(dst_dir, 'test.tmp'))) - self.assertTrue(not exists(join(dst_dir, 'test_dir2'))) + self.assertFalse(exists(join(dst_dir, 'test.tmp'))) + self.assertFalse(exists(join(dst_dir, 'test_dir2'))) finally: - if os.path.exists(dst_dir): - shutil.rmtree(dst_dir) + shutil.rmtree(dst_dir) try: patterns = shutil.ignore_patterns('*.tmp', 'subdir*') shutil.copytree(src_dir, dst_dir, ignore=patterns) # checking the result: some elements should not be copied - self.assertTrue(not exists(join(dst_dir, 'test.tmp'))) - self.assertTrue(not exists(join(dst_dir, 'test_dir2', 'subdir2'))) - self.assertTrue(not exists(join(dst_dir, 'test_dir2', 'subdir'))) + self.assertFalse(exists(join(dst_dir, 'test.tmp'))) + self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir2'))) + self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir'))) finally: - if os.path.exists(dst_dir): - shutil.rmtree(dst_dir) + shutil.rmtree(dst_dir) # testing callable-style try: @@ -327,13 +705,12 @@ class TestShutil(unittest.TestCase): shutil.copytree(src_dir, dst_dir, ignore=_filter) # checking the result: some elements should not be copied - self.assertTrue(not exists(join(dst_dir, 'test_dir2', 'subdir2', - 'test.py'))) - self.assertTrue(not exists(join(dst_dir, 'test_dir2', 'subdir'))) + self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir2', + 'test.py'))) + self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir'))) finally: - if os.path.exists(dst_dir): - shutil.rmtree(dst_dir) + shutil.rmtree(dst_dir) finally: shutil.rmtree(src_dir) shutil.rmtree(os.path.dirname(dst_dir)) @@ -358,35 +735,6 @@ class TestShutil(unittest.TestCase): finally: shutil.rmtree(TESTFN, ignore_errors=True) - @unittest.skipUnless(hasattr(os, 'chflags') and - hasattr(errno, 'EOPNOTSUPP') and - hasattr(errno, 'ENOTSUP'), - "requires os.chflags, EOPNOTSUPP & ENOTSUP") - def test_copystat_handles_harmless_chflags_errors(self): - tmpdir = self.mkdtemp() - file1 = os.path.join(tmpdir, 'file1') - file2 = os.path.join(tmpdir, 'file2') - self.write_file(file1, 'xxx') - self.write_file(file2, 'xxx') - - def make_chflags_raiser(err): - ex = OSError() - - def _chflags_raiser(path, flags): - ex.errno = err - raise ex - return _chflags_raiser - old_chflags = os.chflags - try: - for err in errno.EOPNOTSUPP, errno.ENOTSUP: - os.chflags = make_chflags_raiser(err) - shutil.copystat(file1, file2) - # assert others errors break it - os.chflags = make_chflags_raiser(errno.EOPNOTSUPP + errno.ENOTSUP) - self.assertRaises(OSError, shutil.copystat, file1, file2) - finally: - os.chflags = old_chflags - @support.skip_unless_symlink def test_dont_copy_file_onto_symlink_to_itself(self): # bug 851123. @@ -417,6 +765,7 @@ class TestShutil(unittest.TestCase): os.mkdir(src) os.symlink(src, dst) self.assertRaises(OSError, shutil.rmtree, dst) + shutil.rmtree(dst, ignore_errors=True) finally: shutil.rmtree(TESTFN, ignore_errors=True) @@ -457,9 +806,9 @@ class TestShutil(unittest.TestCase): src_dir = self.mkdtemp() dst_dir = os.path.join(self.mkdtemp(), 'destination') - self._write_data(os.path.join(src_dir, 'test.txt'), '123') + write_file((src_dir, 'test.txt'), '123') os.mkdir(os.path.join(src_dir, 'test_dir')) - self._write_data(os.path.join(src_dir, 'test_dir', 'test.txt'), '456') + write_file((src_dir, 'test_dir', 'test.txt'), '456') copied = [] def _copy(src, dst): @@ -476,7 +825,7 @@ class TestShutil(unittest.TestCase): dst_dir = os.path.join(self.mkdtemp(), 'destination') os.symlink('IDONTEXIST', os.path.join(src_dir, 'test.txt')) os.mkdir(os.path.join(src_dir, 'test_dir')) - self._write_data(os.path.join(src_dir, 'test_dir', 'test.txt'), '456') + write_file((src_dir, 'test_dir', 'test.txt'), '456') self.assertRaises(Error, shutil.copytree, src_dir, dst_dir) # a dangling symlink is ignored with the proper flag @@ -492,7 +841,7 @@ class TestShutil(unittest.TestCase): def _copy_file(self, method): fname = 'test.txt' tmpdir = self.mkdtemp() - self.write_file([tmpdir, fname]) + write_file((tmpdir, fname), 'xxx') file1 = os.path.join(tmpdir, fname) tmpdir2 = self.mkdtemp() method(file1, tmpdir2) @@ -524,14 +873,14 @@ class TestShutil(unittest.TestCase): self.assertEqual(getattr(file1_stat, 'st_flags'), getattr(file2_stat, 'st_flags')) - @unittest.skipUnless(zlib, "requires zlib") + @requires_zlib def test_make_tarball(self): # creating something to tar tmpdir = self.mkdtemp() - self.write_file([tmpdir, 'file1'], 'xxx') - self.write_file([tmpdir, 'file2'], 'xxx') + write_file((tmpdir, 'file1'), 'xxx') + write_file((tmpdir, 'file2'), 'xxx') os.mkdir(os.path.join(tmpdir, 'sub')) - self.write_file([tmpdir, 'sub', 'file3'], 'xxx') + write_file((tmpdir, 'sub', 'file3'), 'xxx') tmpdir2 = self.mkdtemp() # force shutil to create the directory @@ -578,16 +927,16 @@ class TestShutil(unittest.TestCase): tmpdir = self.mkdtemp() dist = os.path.join(tmpdir, 'dist') os.mkdir(dist) - self.write_file([dist, 'file1'], 'xxx') - self.write_file([dist, 'file2'], 'xxx') + write_file((dist, 'file1'), 'xxx') + write_file((dist, 'file2'), 'xxx') os.mkdir(os.path.join(dist, 'sub')) - self.write_file([dist, 'sub', 'file3'], 'xxx') + write_file((dist, 'sub', 'file3'), 'xxx') os.mkdir(os.path.join(dist, 'sub2')) tmpdir2 = self.mkdtemp() base_name = os.path.join(tmpdir2, 'archive') return tmpdir, tmpdir2, base_name - @unittest.skipUnless(zlib, "Requires zlib") + @requires_zlib @unittest.skipUnless(find_executable('tar') and find_executable('gzip'), 'Need the tar command to run') def test_tarfile_vs_tar(self): @@ -642,13 +991,13 @@ class TestShutil(unittest.TestCase): tarball = base_name + '.tar' self.assertTrue(os.path.exists(tarball)) - @unittest.skipUnless(zlib, "Requires zlib") + @requires_zlib @unittest.skipUnless(ZIP_SUPPORT, 'Need zip support to run') def test_make_zipfile(self): # creating something to tar tmpdir = self.mkdtemp() - self.write_file([tmpdir, 'file1'], 'xxx') - self.write_file([tmpdir, 'file2'], 'xxx') + write_file((tmpdir, 'file1'), 'xxx') + write_file((tmpdir, 'file2'), 'xxx') tmpdir2 = self.mkdtemp() # force shutil to create the directory @@ -666,7 +1015,7 @@ class TestShutil(unittest.TestCase): base_name = os.path.join(tmpdir, 'archive') self.assertRaises(ValueError, make_archive, base_name, 'xxx') - @unittest.skipUnless(zlib, "Requires zlib") + @requires_zlib def test_make_archive_owner_group(self): # testing make_archive with owner and group, with various combinations # this works even if there's not gid/uid support @@ -694,7 +1043,7 @@ class TestShutil(unittest.TestCase): self.assertTrue(os.path.exists(res)) - @unittest.skipUnless(zlib, "Requires zlib") + @requires_zlib @unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support") def test_tarfile_root_owner(self): tmpdir, tmpdir2, base_name = self._create_files() @@ -763,7 +1112,7 @@ class TestShutil(unittest.TestCase): diff.append(file_) return diff - @unittest.skipUnless(zlib, "Requires zlib") + @requires_zlib def test_unpack_archive(self): formats = ['tar', 'gztar', 'zip'] if BZ2_SUPPORTED: @@ -814,6 +1163,162 @@ class TestShutil(unittest.TestCase): unregister_unpack_format('Boo2') self.assertEqual(get_unpack_formats(), formats) + @unittest.skipUnless(hasattr(shutil, 'disk_usage'), + "disk_usage not available on this platform") + def test_disk_usage(self): + usage = shutil.disk_usage(os.getcwd()) + self.assertGreater(usage.total, 0) + self.assertGreater(usage.used, 0) + self.assertGreaterEqual(usage.free, 0) + self.assertGreaterEqual(usage.total, usage.used) + self.assertGreater(usage.total, usage.free) + + @unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support") + @unittest.skipUnless(hasattr(os, 'chown'), 'requires os.chown') + def test_chown(self): + + # cleaned-up automatically by TestShutil.tearDown method + dirname = self.mkdtemp() + filename = tempfile.mktemp(dir=dirname) + write_file(filename, 'testing chown function') + + with self.assertRaises(ValueError): + shutil.chown(filename) + + with self.assertRaises(LookupError): + shutil.chown(filename, user='non-exising username') + + with self.assertRaises(LookupError): + shutil.chown(filename, group='non-exising groupname') + + with self.assertRaises(TypeError): + shutil.chown(filename, b'spam') + + with self.assertRaises(TypeError): + shutil.chown(filename, 3.14) + + uid = os.getuid() + gid = os.getgid() + + def check_chown(path, uid=None, gid=None): + s = os.stat(filename) + if uid is not None: + self.assertEqual(uid, s.st_uid) + if gid is not None: + self.assertEqual(gid, s.st_gid) + + shutil.chown(filename, uid, gid) + check_chown(filename, uid, gid) + shutil.chown(filename, uid) + check_chown(filename, uid) + shutil.chown(filename, user=uid) + check_chown(filename, uid) + shutil.chown(filename, group=gid) + check_chown(filename, gid=gid) + + shutil.chown(dirname, uid, gid) + check_chown(dirname, uid, gid) + shutil.chown(dirname, uid) + check_chown(dirname, uid) + shutil.chown(dirname, user=uid) + check_chown(dirname, uid) + shutil.chown(dirname, group=gid) + check_chown(dirname, gid=gid) + + user = pwd.getpwuid(uid)[0] + group = grp.getgrgid(gid)[0] + shutil.chown(filename, user, group) + check_chown(filename, uid, gid) + shutil.chown(dirname, user, group) + check_chown(dirname, uid, gid) + + def test_copy_return_value(self): + # copy and copy2 both return their destination path. + for fn in (shutil.copy, shutil.copy2): + src_dir = self.mkdtemp() + dst_dir = self.mkdtemp() + src = os.path.join(src_dir, 'foo') + write_file(src, 'foo') + rv = fn(src, dst_dir) + self.assertEqual(rv, os.path.join(dst_dir, 'foo')) + rv = fn(src, os.path.join(dst_dir, 'bar')) + self.assertEqual(rv, os.path.join(dst_dir, 'bar')) + + def test_copyfile_return_value(self): + # copytree returns its destination path. + src_dir = self.mkdtemp() + dst_dir = self.mkdtemp() + dst_file = os.path.join(dst_dir, 'bar') + src_file = os.path.join(src_dir, 'foo') + write_file(src_file, 'foo') + rv = shutil.copyfile(src_file, dst_file) + self.assertTrue(os.path.exists(rv)) + self.assertEqual(read_file(src_file), read_file(dst_file)) + + def test_copytree_return_value(self): + # copytree returns its destination path. + src_dir = self.mkdtemp() + dst_dir = src_dir + "dest" + self.addCleanup(shutil.rmtree, dst_dir, True) + src = os.path.join(src_dir, 'foo') + write_file(src, 'foo') + rv = shutil.copytree(src_dir, dst_dir) + self.assertEqual(['foo'], os.listdir(rv)) + + +class TestWhich(unittest.TestCase): + + def setUp(self): + self.temp_dir = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, self.temp_dir, True) + # Give the temp_file an ".exe" suffix for all. + # It's needed on Windows and not harmful on other platforms. + self.temp_file = tempfile.NamedTemporaryFile(dir=self.temp_dir, + suffix=".exe") + os.chmod(self.temp_file.name, stat.S_IXUSR) + self.addCleanup(self.temp_file.close) + self.dir, self.file = os.path.split(self.temp_file.name) + + def test_basic(self): + # Given an EXE in a directory, it should be returned. + rv = shutil.which(self.file, path=self.dir) + self.assertEqual(rv, self.temp_file.name) + + def test_full_path_short_circuit(self): + # When given the fully qualified path to an executable that exists, + # it should be returned. + rv = shutil.which(self.temp_file.name, path=self.temp_dir) + self.assertEqual(self.temp_file.name, rv) + + def test_non_matching_mode(self): + # Set the file read-only and ask for writeable files. + os.chmod(self.temp_file.name, stat.S_IREAD) + rv = shutil.which(self.file, path=self.dir, mode=os.W_OK) + self.assertIsNone(rv) + + def test_relative(self): + old_cwd = os.getcwd() + base_dir, tail_dir = os.path.split(self.dir) + os.chdir(base_dir) + try: + rv = shutil.which(self.file, path=tail_dir) + self.assertEqual(rv, os.path.join(tail_dir, self.file)) + finally: + os.chdir(old_cwd) + + def test_nonexistent_file(self): + # Return None when no matching executable file is found on the path. + rv = shutil.which("foo.exe", path=self.dir) + self.assertIsNone(rv) + + @unittest.skipUnless(sys.platform == "win32", + "pathext check is Windows-only") + def test_pathext_checking(self): + # Ask for the file without the ".exe" extension, then ensure that + # it gets found properly with the extension. + rv = shutil.which(self.temp_file.name[:-4], path=self.dir) + self.assertEqual(self.temp_file.name, rv) + class TestMove(unittest.TestCase): @@ -927,6 +1432,58 @@ class TestMove(unittest.TestCase): finally: shutil.rmtree(TESTFN, ignore_errors=True) + @support.skip_unless_symlink + @mock_rename + def test_move_file_symlink(self): + dst = os.path.join(self.src_dir, 'bar') + os.symlink(self.src_file, dst) + shutil.move(dst, self.dst_file) + self.assertTrue(os.path.islink(self.dst_file)) + self.assertTrue(os.path.samefile(self.src_file, self.dst_file)) + + @support.skip_unless_symlink + @mock_rename + def test_move_file_symlink_to_dir(self): + filename = "bar" + dst = os.path.join(self.src_dir, filename) + os.symlink(self.src_file, dst) + shutil.move(dst, self.dst_dir) + final_link = os.path.join(self.dst_dir, filename) + self.assertTrue(os.path.islink(final_link)) + self.assertTrue(os.path.samefile(self.src_file, final_link)) + + @support.skip_unless_symlink + @mock_rename + def test_move_dangling_symlink(self): + src = os.path.join(self.src_dir, 'baz') + dst = os.path.join(self.src_dir, 'bar') + os.symlink(src, dst) + dst_link = os.path.join(self.dst_dir, 'quux') + shutil.move(dst, dst_link) + self.assertTrue(os.path.islink(dst_link)) + self.assertEqual(os.path.realpath(src), os.path.realpath(dst_link)) + + @support.skip_unless_symlink + @mock_rename + def test_move_dir_symlink(self): + src = os.path.join(self.src_dir, 'baz') + dst = os.path.join(self.src_dir, 'bar') + os.mkdir(src) + os.symlink(src, dst) + dst_link = os.path.join(self.dst_dir, 'quux') + shutil.move(dst, dst_link) + self.assertTrue(os.path.islink(dst_link)) + self.assertTrue(os.path.samefile(src, dst_link)) + + def test_move_return_value(self): + rv = shutil.move(self.src_file, self.dst_dir) + self.assertEqual(rv, + os.path.join(self.dst_dir, os.path.basename(self.src_file))) + + def test_move_as_rename_return_value(self): + rv = shutil.move(self.src_file, os.path.join(self.dst_dir, 'bar')) + self.assertEqual(rv, os.path.join(self.dst_dir, 'bar')) + class TestCopyFile(unittest.TestCase): @@ -1036,6 +1593,7 @@ class TestCopyFile(unittest.TestCase): # but a different case. self.src_dir = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, self.src_dir, True) dst_dir = os.path.join( os.path.dirname(self.src_dir), os.path.basename(self.src_dir).upper()) @@ -1045,13 +1603,57 @@ class TestCopyFile(unittest.TestCase): shutil.move(self.src_dir, dst_dir) self.assertTrue(os.path.isdir(dst_dir)) finally: - if os.path.exists(dst_dir): - os.rmdir(dst_dir) + os.rmdir(dst_dir) + +class TermsizeTests(unittest.TestCase): + def test_does_not_crash(self): + """Check if get_terminal_size() returns a meaningful value. + + There's no easy portable way to actually check the size of the + terminal, so let's check if it returns something sensible instead. + """ + size = shutil.get_terminal_size() + self.assertGreaterEqual(size.columns, 0) + self.assertGreaterEqual(size.lines, 0) + + def test_os_environ_first(self): + "Check if environment variables have precedence" + + with support.EnvironmentVarGuard() as env: + env['COLUMNS'] = '777' + size = shutil.get_terminal_size() + self.assertEqual(size.columns, 777) + + with support.EnvironmentVarGuard() as env: + env['LINES'] = '888' + size = shutil.get_terminal_size() + self.assertEqual(size.lines, 888) + + @unittest.skipUnless(os.isatty(sys.__stdout__.fileno()), "not on tty") + def test_stty_match(self): + """Check if stty returns the same results ignoring env + + This test will fail if stdin and stdout are connected to + different terminals with different sizes. Nevertheless, such + situations should be pretty rare. + """ + try: + size = subprocess.check_output(['stty', 'size']).decode().split() + except (FileNotFoundError, subprocess.CalledProcessError): + self.skipTest("stty invocation failed") + expected = (int(size[1]), int(size[0])) # reversed order + + with support.EnvironmentVarGuard() as env: + del env['LINES'] + del env['COLUMNS'] + actual = shutil.get_terminal_size() + self.assertEqual(expected, actual) def test_main(): - support.run_unittest(TestShutil, TestMove, TestCopyFile) + support.run_unittest(TestShutil, TestMove, TestCopyFile, + TermsizeTests, TestWhich) if __name__ == '__main__': test_main() |