summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_shutil.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_shutil.py')
-rw-r--r--Lib/test/test_shutil.py946
1 files changed, 774 insertions, 172 deletions
diff --git a/Lib/test/test_shutil.py b/Lib/test/test_shutil.py
index 1eb5e89..92ab0a4 100644
--- a/Lib/test/test_shutil.py
+++ b/Lib/test/test_shutil.py
@@ -7,8 +7,9 @@ import sys
import stat
import os
import os.path
-import functools
import errno
+import functools
+import subprocess
from test import support
from test.support import TESTFN
from os.path import splitdrive
@@ -22,7 +23,7 @@ import tarfile
import warnings
from test import support
-from test.support import TESTFN, check_warnings, captured_stdout
+from test.support import TESTFN, check_warnings, captured_stdout, requires_zlib
try:
import bz2
@@ -40,11 +41,6 @@ except ImportError:
UID_GID_SUPPORT = False
try:
- import zlib
-except ImportError:
- zlib = None
-
-try:
import zipfile
ZIP_SUPPORT = True
except ImportError:
@@ -52,7 +48,7 @@ except ImportError:
def _fake_rename(*args, **kwargs):
# Pretend the destination path is on a different filesystem.
- raise OSError()
+ raise OSError(getattr(errno, 'EXDEV', 18), "Invalid cross-device link")
def mock_rename(func):
@functools.wraps(func)
@@ -65,6 +61,31 @@ def mock_rename(func):
os.rename = builtin_rename
return wrap
+def write_file(path, content, binary=False):
+ """Write *content* to a file located at *path*.
+
+ If *path* is a tuple instead of a string, os.path.join will be used to
+ make a path. If *binary* is true, the file will be opened in binary
+ mode.
+ """
+ if isinstance(path, tuple):
+ path = os.path.join(*path)
+ with open(path, 'wb' if binary else 'w') as fp:
+ fp.write(content)
+
+def read_file(path, binary=False):
+ """Return contents from a file located at *path*.
+
+ If *path* is a tuple instead of a string, os.path.join will be used to
+ make a path. If *binary* is true, the file will be opened in binary
+ mode.
+ """
+ if isinstance(path, tuple):
+ path = os.path.join(*path)
+ with open(path, 'rb' if binary else 'r') as fp:
+ return fp.read()
+
+
class TestShutil(unittest.TestCase):
def setUp(self):
@@ -77,19 +98,6 @@ class TestShutil(unittest.TestCase):
d = self.tempdirs.pop()
shutil.rmtree(d, os.name in ('nt', 'cygwin'))
- def write_file(self, path, content='xxx'):
- """Writes a file in the given path.
-
-
- path can be a string or a sequence.
- """
- if isinstance(path, (list, tuple)):
- path = os.path.join(*path)
- f = open(path, 'w')
- try:
- f.write(content)
- finally:
- f.close()
def mkdtemp(self):
"""Create a temporary directory that will be cleaned up.
@@ -100,6 +108,15 @@ class TestShutil(unittest.TestCase):
self.tempdirs.append(d)
return d
+ def test_rmtree_works_on_bytes(self):
+ tmp = self.mkdtemp()
+ victim = os.path.join(tmp, 'killme')
+ os.mkdir(victim)
+ write_file(os.path.join(victim, 'somefile'), 'foo')
+ victim = os.fsencode(victim)
+ self.assertIsInstance(victim, bytes)
+ shutil.rmtree(victim)
+
@support.skip_unless_symlink
def test_rmtree_fails_on_symlink(self):
tmp = self.mkdtemp()
@@ -119,18 +136,40 @@ class TestShutil(unittest.TestCase):
self.assertEqual(errors[0][1], link)
self.assertIsInstance(errors[0][2][1], OSError)
+ @support.skip_unless_symlink
+ def test_rmtree_works_on_symlinks(self):
+ tmp = self.mkdtemp()
+ dir1 = os.path.join(tmp, 'dir1')
+ dir2 = os.path.join(dir1, 'dir2')
+ dir3 = os.path.join(tmp, 'dir3')
+ for d in dir1, dir2, dir3:
+ os.mkdir(d)
+ file1 = os.path.join(tmp, 'file1')
+ write_file(file1, 'foo')
+ link1 = os.path.join(dir1, 'link1')
+ os.symlink(dir2, link1)
+ link2 = os.path.join(dir1, 'link2')
+ os.symlink(dir3, link2)
+ link3 = os.path.join(dir1, 'link3')
+ os.symlink(file1, link3)
+ # make sure symlinks are removed but not followed
+ shutil.rmtree(dir1)
+ self.assertFalse(os.path.exists(dir1))
+ self.assertTrue(os.path.exists(dir3))
+ self.assertTrue(os.path.exists(file1))
+
def test_rmtree_errors(self):
# filename is guaranteed not to exist
filename = tempfile.mktemp()
- self.assertRaises(OSError, shutil.rmtree, filename)
- # test that ignore_errors option is honoured
+ self.assertRaises(FileNotFoundError, shutil.rmtree, filename)
+ # test that ignore_errors option is honored
shutil.rmtree(filename, ignore_errors=True)
# existing file
tmpdir = self.mkdtemp()
- self.write_file((tmpdir, "tstfile"), "")
+ write_file((tmpdir, "tstfile"), "")
filename = os.path.join(tmpdir, "tstfile")
- with self.assertRaises(OSError) as cm:
+ with self.assertRaises(NotADirectoryError) as cm:
shutil.rmtree(filename)
if os.name == 'nt':
rm_name = os.path.join(filename, '*.*')
@@ -138,7 +177,7 @@ class TestShutil(unittest.TestCase):
rm_name = filename
self.assertEqual(cm.exception.filename, rm_name)
self.assertTrue(os.path.exists(filename))
- # test that ignore_errors option is honoured
+ # test that ignore_errors option is honored
shutil.rmtree(filename, ignore_errors=True)
self.assertTrue(os.path.exists(filename))
errors = []
@@ -148,11 +187,11 @@ class TestShutil(unittest.TestCase):
self.assertEqual(len(errors), 2)
self.assertIs(errors[0][0], os.listdir)
self.assertEqual(errors[0][1], filename)
- self.assertIsInstance(errors[0][2][1], OSError)
+ self.assertIsInstance(errors[0][2][1], NotADirectoryError)
self.assertEqual(errors[0][2][1].filename, rm_name)
self.assertIs(errors[1][0], os.rmdir)
self.assertEqual(errors[1][1], filename)
- self.assertIsInstance(errors[1][2][1], OSError)
+ self.assertIsInstance(errors[1][2][1], NotADirectoryError)
self.assertEqual(errors[1][2][1].filename, rm_name)
# See bug #1071513 for why we don't run this on cygwin
@@ -162,30 +201,34 @@ class TestShutil(unittest.TestCase):
def test_on_error(self):
self.errorState = 0
os.mkdir(TESTFN)
- self.childpath = os.path.join(TESTFN, 'a')
- f = open(self.childpath, 'w')
- f.close()
+ self.addCleanup(shutil.rmtree, TESTFN)
+
+ self.child_file_path = os.path.join(TESTFN, 'a')
+ self.child_dir_path = os.path.join(TESTFN, 'b')
+ support.create_empty_file(self.child_file_path)
+ os.mkdir(self.child_dir_path)
old_dir_mode = os.stat(TESTFN).st_mode
- old_child_mode = os.stat(self.childpath).st_mode
+ old_child_file_mode = os.stat(self.child_file_path).st_mode
+ old_child_dir_mode = os.stat(self.child_dir_path).st_mode
# Make unwritable.
- os.chmod(self.childpath, stat.S_IREAD)
- os.chmod(TESTFN, stat.S_IREAD)
+ new_mode = stat.S_IREAD|stat.S_IEXEC
+ os.chmod(self.child_file_path, new_mode)
+ os.chmod(self.child_dir_path, new_mode)
+ os.chmod(TESTFN, new_mode)
+
+ self.addCleanup(os.chmod, TESTFN, old_dir_mode)
+ self.addCleanup(os.chmod, self.child_file_path, old_child_file_mode)
+ self.addCleanup(os.chmod, self.child_dir_path, old_child_dir_mode)
shutil.rmtree(TESTFN, onerror=self.check_args_to_onerror)
# Test whether onerror has actually been called.
- self.assertEqual(self.errorState, 2,
- "Expected call to onerror function did not happen.")
-
- # Make writable again.
- os.chmod(TESTFN, old_dir_mode)
- os.chmod(self.childpath, old_child_mode)
-
- # Clean up.
- shutil.rmtree(TESTFN)
+ self.assertEqual(self.errorState, 3,
+ "Expected call to onerror function did not "
+ "happen.")
def check_args_to_onerror(self, func, arg, exc):
# test_rmtree_errors deliberately runs rmtree
- # on a directory that is chmod 400, which will fail.
+ # on a directory that is chmod 500, which will fail.
# This function is run when shutil.rmtree fails.
# 99.9% of the time it initially fails to remove
# a file in the directory, so the first time through
@@ -194,99 +237,436 @@ class TestShutil(unittest.TestCase):
# FUSE experienced a failure earlier in the process
# at os.listdir. The first failure may legally
# be either.
- if self.errorState == 0:
- if func is os.remove:
- self.assertEqual(arg, self.childpath)
+ if self.errorState < 2:
+ if func is os.unlink:
+ self.assertEqual(arg, self.child_file_path)
+ elif func is os.rmdir:
+ self.assertEqual(arg, self.child_dir_path)
else:
- self.assertIs(func, os.listdir,
- "func must be either os.remove or os.listdir")
- self.assertEqual(arg, TESTFN)
+ self.assertIs(func, os.listdir)
+ self.assertIn(arg, [TESTFN, self.child_dir_path])
self.assertTrue(issubclass(exc[0], OSError))
- self.errorState = 1
+ self.errorState += 1
else:
self.assertEqual(func, os.rmdir)
self.assertEqual(arg, TESTFN)
self.assertTrue(issubclass(exc[0], OSError))
- self.errorState = 2
+ self.errorState = 3
+
+ def test_rmtree_does_not_choke_on_failing_lstat(self):
+ try:
+ orig_lstat = os.lstat
+ def raiser(fn, *args, **kwargs):
+ if fn != TESTFN:
+ raise OSError()
+ else:
+ return orig_lstat(fn)
+ os.lstat = raiser
+
+ os.mkdir(TESTFN)
+ write_file((TESTFN, 'foo'), 'foo')
+ shutil.rmtree(TESTFN)
+ finally:
+ os.lstat = orig_lstat
+
+ @unittest.skipUnless(hasattr(os, 'chmod'), 'requires os.chmod')
+ @support.skip_unless_symlink
+ def test_copymode_follow_symlinks(self):
+ tmp_dir = self.mkdtemp()
+ src = os.path.join(tmp_dir, 'foo')
+ dst = os.path.join(tmp_dir, 'bar')
+ src_link = os.path.join(tmp_dir, 'baz')
+ dst_link = os.path.join(tmp_dir, 'quux')
+ write_file(src, 'foo')
+ write_file(dst, 'foo')
+ os.symlink(src, src_link)
+ os.symlink(dst, dst_link)
+ os.chmod(src, stat.S_IRWXU|stat.S_IRWXG)
+ # file to file
+ os.chmod(dst, stat.S_IRWXO)
+ self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
+ shutil.copymode(src, dst)
+ self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
+ # follow src link
+ os.chmod(dst, stat.S_IRWXO)
+ shutil.copymode(src_link, dst)
+ self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
+ # follow dst link
+ os.chmod(dst, stat.S_IRWXO)
+ shutil.copymode(src, dst_link)
+ self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
+ # follow both links
+ os.chmod(dst, stat.S_IRWXO)
+ shutil.copymode(src_link, dst)
+ self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
+
+ @unittest.skipUnless(hasattr(os, 'lchmod'), 'requires os.lchmod')
+ @support.skip_unless_symlink
+ def test_copymode_symlink_to_symlink(self):
+ tmp_dir = self.mkdtemp()
+ src = os.path.join(tmp_dir, 'foo')
+ dst = os.path.join(tmp_dir, 'bar')
+ src_link = os.path.join(tmp_dir, 'baz')
+ dst_link = os.path.join(tmp_dir, 'quux')
+ write_file(src, 'foo')
+ write_file(dst, 'foo')
+ os.symlink(src, src_link)
+ os.symlink(dst, dst_link)
+ os.chmod(src, stat.S_IRWXU|stat.S_IRWXG)
+ os.chmod(dst, stat.S_IRWXU)
+ os.lchmod(src_link, stat.S_IRWXO|stat.S_IRWXG)
+ # link to link
+ os.lchmod(dst_link, stat.S_IRWXO)
+ shutil.copymode(src_link, dst_link, follow_symlinks=False)
+ self.assertEqual(os.lstat(src_link).st_mode,
+ os.lstat(dst_link).st_mode)
+ self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
+ # src link - use chmod
+ os.lchmod(dst_link, stat.S_IRWXO)
+ shutil.copymode(src_link, dst, follow_symlinks=False)
+ self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
+ # dst link - use chmod
+ os.lchmod(dst_link, stat.S_IRWXO)
+ shutil.copymode(src, dst_link, follow_symlinks=False)
+ self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
+
+ @unittest.skipIf(hasattr(os, 'lchmod'), 'requires os.lchmod to be missing')
+ @support.skip_unless_symlink
+ def test_copymode_symlink_to_symlink_wo_lchmod(self):
+ tmp_dir = self.mkdtemp()
+ src = os.path.join(tmp_dir, 'foo')
+ dst = os.path.join(tmp_dir, 'bar')
+ src_link = os.path.join(tmp_dir, 'baz')
+ dst_link = os.path.join(tmp_dir, 'quux')
+ write_file(src, 'foo')
+ write_file(dst, 'foo')
+ os.symlink(src, src_link)
+ os.symlink(dst, dst_link)
+ shutil.copymode(src_link, dst_link, follow_symlinks=False) # silent fail
+
+ @support.skip_unless_symlink
+ def test_copystat_symlinks(self):
+ tmp_dir = self.mkdtemp()
+ src = os.path.join(tmp_dir, 'foo')
+ dst = os.path.join(tmp_dir, 'bar')
+ src_link = os.path.join(tmp_dir, 'baz')
+ dst_link = os.path.join(tmp_dir, 'qux')
+ write_file(src, 'foo')
+ src_stat = os.stat(src)
+ os.utime(src, (src_stat.st_atime,
+ src_stat.st_mtime - 42.0)) # ensure different mtimes
+ write_file(dst, 'bar')
+ self.assertNotEqual(os.stat(src).st_mtime, os.stat(dst).st_mtime)
+ os.symlink(src, src_link)
+ os.symlink(dst, dst_link)
+ if hasattr(os, 'lchmod'):
+ os.lchmod(src_link, stat.S_IRWXO)
+ if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
+ os.lchflags(src_link, stat.UF_NODUMP)
+ src_link_stat = os.lstat(src_link)
+ # follow
+ if hasattr(os, 'lchmod'):
+ shutil.copystat(src_link, dst_link, follow_symlinks=True)
+ self.assertNotEqual(src_link_stat.st_mode, os.stat(dst).st_mode)
+ # don't follow
+ shutil.copystat(src_link, dst_link, follow_symlinks=False)
+ dst_link_stat = os.lstat(dst_link)
+ if os.utime in os.supports_follow_symlinks:
+ for attr in 'st_atime', 'st_mtime':
+ # The modification times may be truncated in the new file.
+ self.assertLessEqual(getattr(src_link_stat, attr),
+ getattr(dst_link_stat, attr) + 1)
+ if hasattr(os, 'lchmod'):
+ self.assertEqual(src_link_stat.st_mode, dst_link_stat.st_mode)
+ if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'):
+ self.assertEqual(src_link_stat.st_flags, dst_link_stat.st_flags)
+ # tell to follow but dst is not a link
+ shutil.copystat(src_link, dst, follow_symlinks=False)
+ self.assertTrue(abs(os.stat(src).st_mtime - os.stat(dst).st_mtime) <
+ 00000.1)
+
+ @unittest.skipUnless(hasattr(os, 'chflags') and
+ hasattr(errno, 'EOPNOTSUPP') and
+ hasattr(errno, 'ENOTSUP'),
+ "requires os.chflags, EOPNOTSUPP & ENOTSUP")
+ def test_copystat_handles_harmless_chflags_errors(self):
+ tmpdir = self.mkdtemp()
+ file1 = os.path.join(tmpdir, 'file1')
+ file2 = os.path.join(tmpdir, 'file2')
+ write_file(file1, 'xxx')
+ write_file(file2, 'xxx')
+
+ def make_chflags_raiser(err):
+ ex = OSError()
+
+ def _chflags_raiser(path, flags, *, follow_symlinks=True):
+ ex.errno = err
+ raise ex
+ return _chflags_raiser
+ old_chflags = os.chflags
+ try:
+ for err in errno.EOPNOTSUPP, errno.ENOTSUP:
+ os.chflags = make_chflags_raiser(err)
+ shutil.copystat(file1, file2)
+ # assert others errors break it
+ os.chflags = make_chflags_raiser(errno.EOPNOTSUPP + errno.ENOTSUP)
+ self.assertRaises(OSError, shutil.copystat, file1, file2)
+ finally:
+ os.chflags = old_chflags
+
+ @support.skip_unless_xattr
+ def test_copyxattr(self):
+ tmp_dir = self.mkdtemp()
+ src = os.path.join(tmp_dir, 'foo')
+ write_file(src, 'foo')
+ dst = os.path.join(tmp_dir, 'bar')
+ write_file(dst, 'bar')
+
+ # no xattr == no problem
+ shutil._copyxattr(src, dst)
+ # common case
+ os.setxattr(src, 'user.foo', b'42')
+ os.setxattr(src, 'user.bar', b'43')
+ shutil._copyxattr(src, dst)
+ self.assertEqual(os.listxattr(src), os.listxattr(dst))
+ self.assertEqual(
+ os.getxattr(src, 'user.foo'),
+ os.getxattr(dst, 'user.foo'))
+ # check errors don't affect other attrs
+ os.remove(dst)
+ write_file(dst, 'bar')
+ os_error = OSError(errno.EPERM, 'EPERM')
+
+ def _raise_on_user_foo(fname, attr, val, **kwargs):
+ if attr == 'user.foo':
+ raise os_error
+ else:
+ orig_setxattr(fname, attr, val, **kwargs)
+ try:
+ orig_setxattr = os.setxattr
+ os.setxattr = _raise_on_user_foo
+ shutil._copyxattr(src, dst)
+ self.assertIn('user.bar', os.listxattr(dst))
+ finally:
+ os.setxattr = orig_setxattr
+
+ # test that shutil.copystat copies xattrs
+ src = os.path.join(tmp_dir, 'the_original')
+ write_file(src, src)
+ os.setxattr(src, 'user.the_value', b'fiddly')
+ dst = os.path.join(tmp_dir, 'the_copy')
+ write_file(dst, dst)
+ shutil.copystat(src, dst)
+ self.assertEqual(os.getxattr(dst, 'user.the_value'), b'fiddly')
+
+ @support.skip_unless_symlink
+ @support.skip_unless_xattr
+ @unittest.skipUnless(hasattr(os, 'geteuid') and os.geteuid() == 0,
+ 'root privileges required')
+ def test_copyxattr_symlinks(self):
+ # On Linux, it's only possible to access non-user xattr for symlinks;
+ # which in turn require root privileges. This test should be expanded
+ # as soon as other platforms gain support for extended attributes.
+ tmp_dir = self.mkdtemp()
+ src = os.path.join(tmp_dir, 'foo')
+ src_link = os.path.join(tmp_dir, 'baz')
+ write_file(src, 'foo')
+ os.symlink(src, src_link)
+ os.setxattr(src, 'trusted.foo', b'42')
+ os.setxattr(src_link, 'trusted.foo', b'43', follow_symlinks=False)
+ dst = os.path.join(tmp_dir, 'bar')
+ dst_link = os.path.join(tmp_dir, 'qux')
+ write_file(dst, 'bar')
+ os.symlink(dst, dst_link)
+ shutil._copyxattr(src_link, dst_link, follow_symlinks=False)
+ self.assertEqual(os.getxattr(dst_link, 'trusted.foo', follow_symlinks=False), b'43')
+ self.assertRaises(OSError, os.getxattr, dst, 'trusted.foo')
+ shutil._copyxattr(src_link, dst, follow_symlinks=False)
+ self.assertEqual(os.getxattr(dst, 'trusted.foo'), b'43')
+
+ @support.skip_unless_symlink
+ def test_copy_symlinks(self):
+ tmp_dir = self.mkdtemp()
+ src = os.path.join(tmp_dir, 'foo')
+ dst = os.path.join(tmp_dir, 'bar')
+ src_link = os.path.join(tmp_dir, 'baz')
+ write_file(src, 'foo')
+ os.symlink(src, src_link)
+ if hasattr(os, 'lchmod'):
+ os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
+ # don't follow
+ shutil.copy(src_link, dst, follow_symlinks=True)
+ self.assertFalse(os.path.islink(dst))
+ self.assertEqual(read_file(src), read_file(dst))
+ os.remove(dst)
+ # follow
+ shutil.copy(src_link, dst, follow_symlinks=False)
+ self.assertTrue(os.path.islink(dst))
+ self.assertEqual(os.readlink(dst), os.readlink(src_link))
+ if hasattr(os, 'lchmod'):
+ self.assertEqual(os.lstat(src_link).st_mode,
+ os.lstat(dst).st_mode)
+
+ @support.skip_unless_symlink
+ def test_copy2_symlinks(self):
+ tmp_dir = self.mkdtemp()
+ src = os.path.join(tmp_dir, 'foo')
+ dst = os.path.join(tmp_dir, 'bar')
+ src_link = os.path.join(tmp_dir, 'baz')
+ write_file(src, 'foo')
+ os.symlink(src, src_link)
+ if hasattr(os, 'lchmod'):
+ os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
+ if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
+ os.lchflags(src_link, stat.UF_NODUMP)
+ src_stat = os.stat(src)
+ src_link_stat = os.lstat(src_link)
+ # follow
+ shutil.copy2(src_link, dst, follow_symlinks=True)
+ self.assertFalse(os.path.islink(dst))
+ self.assertEqual(read_file(src), read_file(dst))
+ os.remove(dst)
+ # don't follow
+ shutil.copy2(src_link, dst, follow_symlinks=False)
+ self.assertTrue(os.path.islink(dst))
+ self.assertEqual(os.readlink(dst), os.readlink(src_link))
+ dst_stat = os.lstat(dst)
+ if os.utime in os.supports_follow_symlinks:
+ for attr in 'st_atime', 'st_mtime':
+ # The modification times may be truncated in the new file.
+ self.assertLessEqual(getattr(src_link_stat, attr),
+ getattr(dst_stat, attr) + 1)
+ if hasattr(os, 'lchmod'):
+ self.assertEqual(src_link_stat.st_mode, dst_stat.st_mode)
+ self.assertNotEqual(src_stat.st_mode, dst_stat.st_mode)
+ if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'):
+ self.assertEqual(src_link_stat.st_flags, dst_stat.st_flags)
+
+ @support.skip_unless_xattr
+ def test_copy2_xattr(self):
+ tmp_dir = self.mkdtemp()
+ src = os.path.join(tmp_dir, 'foo')
+ dst = os.path.join(tmp_dir, 'bar')
+ write_file(src, 'foo')
+ os.setxattr(src, 'user.foo', b'42')
+ shutil.copy2(src, dst)
+ self.assertEqual(
+ os.getxattr(src, 'user.foo'),
+ os.getxattr(dst, 'user.foo'))
+ os.remove(dst)
+
+ @support.skip_unless_symlink
+ def test_copyfile_symlinks(self):
+ tmp_dir = self.mkdtemp()
+ src = os.path.join(tmp_dir, 'src')
+ dst = os.path.join(tmp_dir, 'dst')
+ dst_link = os.path.join(tmp_dir, 'dst_link')
+ link = os.path.join(tmp_dir, 'link')
+ write_file(src, 'foo')
+ os.symlink(src, link)
+ # don't follow
+ shutil.copyfile(link, dst_link, follow_symlinks=False)
+ self.assertTrue(os.path.islink(dst_link))
+ self.assertEqual(os.readlink(link), os.readlink(dst_link))
+ # follow
+ shutil.copyfile(link, dst)
+ self.assertFalse(os.path.islink(dst))
+
+ def test_rmtree_uses_safe_fd_version_if_available(self):
+ _use_fd_functions = ({os.open, os.stat, os.unlink, os.rmdir} <=
+ os.supports_dir_fd and
+ os.listdir in os.supports_fd and
+ os.stat in os.supports_follow_symlinks)
+ if _use_fd_functions:
+ self.assertTrue(shutil._use_fd_functions)
+ self.assertTrue(shutil.rmtree.avoids_symlink_attacks)
+ tmp_dir = self.mkdtemp()
+ d = os.path.join(tmp_dir, 'a')
+ os.mkdir(d)
+ try:
+ real_rmtree = shutil._rmtree_safe_fd
+ class Called(Exception): pass
+ def _raiser(*args, **kwargs):
+ raise Called
+ shutil._rmtree_safe_fd = _raiser
+ self.assertRaises(Called, shutil.rmtree, d)
+ finally:
+ shutil._rmtree_safe_fd = real_rmtree
+ else:
+ self.assertFalse(shutil._use_fd_functions)
+ self.assertFalse(shutil.rmtree.avoids_symlink_attacks)
def test_rmtree_dont_delete_file(self):
# When called on a file instead of a directory, don't delete it.
handle, path = tempfile.mkstemp()
- os.fdopen(handle).close()
- self.assertRaises(OSError, shutil.rmtree, path)
+ os.close(handle)
+ self.assertRaises(NotADirectoryError, shutil.rmtree, path)
os.remove(path)
- def _write_data(self, path, data):
- f = open(path, "w")
- f.write(data)
- f.close()
-
def test_copytree_simple(self):
-
- def read_data(path):
- f = open(path)
- data = f.read()
- f.close()
- return data
-
src_dir = tempfile.mkdtemp()
dst_dir = os.path.join(tempfile.mkdtemp(), 'destination')
- self._write_data(os.path.join(src_dir, 'test.txt'), '123')
+ self.addCleanup(shutil.rmtree, src_dir)
+ self.addCleanup(shutil.rmtree, os.path.dirname(dst_dir))
+ write_file((src_dir, 'test.txt'), '123')
os.mkdir(os.path.join(src_dir, 'test_dir'))
- self._write_data(os.path.join(src_dir, 'test_dir', 'test.txt'), '456')
+ write_file((src_dir, 'test_dir', 'test.txt'), '456')
+
+ shutil.copytree(src_dir, dst_dir)
+ self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test.txt')))
+ self.assertTrue(os.path.isdir(os.path.join(dst_dir, 'test_dir')))
+ self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test_dir',
+ 'test.txt')))
+ actual = read_file((dst_dir, 'test.txt'))
+ self.assertEqual(actual, '123')
+ actual = read_file((dst_dir, 'test_dir', 'test.txt'))
+ self.assertEqual(actual, '456')
- try:
- shutil.copytree(src_dir, dst_dir)
- self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test.txt')))
- self.assertTrue(os.path.isdir(os.path.join(dst_dir, 'test_dir')))
- self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test_dir',
- 'test.txt')))
- actual = read_data(os.path.join(dst_dir, 'test.txt'))
- self.assertEqual(actual, '123')
- actual = read_data(os.path.join(dst_dir, 'test_dir', 'test.txt'))
- self.assertEqual(actual, '456')
- finally:
- for path in (
- os.path.join(src_dir, 'test.txt'),
- os.path.join(dst_dir, 'test.txt'),
- os.path.join(src_dir, 'test_dir', 'test.txt'),
- os.path.join(dst_dir, 'test_dir', 'test.txt'),
- ):
- if os.path.exists(path):
- os.remove(path)
- for path in (src_dir,
- os.path.dirname(dst_dir)
- ):
- if os.path.exists(path):
- shutil.rmtree(path)
+ @support.skip_unless_symlink
+ def test_copytree_symlinks(self):
+ tmp_dir = self.mkdtemp()
+ src_dir = os.path.join(tmp_dir, 'src')
+ dst_dir = os.path.join(tmp_dir, 'dst')
+ sub_dir = os.path.join(src_dir, 'sub')
+ os.mkdir(src_dir)
+ os.mkdir(sub_dir)
+ write_file((src_dir, 'file.txt'), 'foo')
+ src_link = os.path.join(sub_dir, 'link')
+ dst_link = os.path.join(dst_dir, 'sub/link')
+ os.symlink(os.path.join(src_dir, 'file.txt'),
+ src_link)
+ if hasattr(os, 'lchmod'):
+ os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
+ if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
+ os.lchflags(src_link, stat.UF_NODUMP)
+ src_stat = os.lstat(src_link)
+ shutil.copytree(src_dir, dst_dir, symlinks=True)
+ self.assertTrue(os.path.islink(os.path.join(dst_dir, 'sub', 'link')))
+ self.assertEqual(os.readlink(os.path.join(dst_dir, 'sub', 'link')),
+ os.path.join(src_dir, 'file.txt'))
+ dst_stat = os.lstat(dst_link)
+ if hasattr(os, 'lchmod'):
+ self.assertEqual(dst_stat.st_mode, src_stat.st_mode)
+ if hasattr(os, 'lchflags'):
+ self.assertEqual(dst_stat.st_flags, src_stat.st_flags)
def test_copytree_with_exclude(self):
-
- def read_data(path):
- f = open(path)
- data = f.read()
- f.close()
- return data
-
# creating data
join = os.path.join
exists = os.path.exists
src_dir = tempfile.mkdtemp()
try:
dst_dir = join(tempfile.mkdtemp(), 'destination')
- self._write_data(join(src_dir, 'test.txt'), '123')
- self._write_data(join(src_dir, 'test.tmp'), '123')
+ write_file((src_dir, 'test.txt'), '123')
+ write_file((src_dir, 'test.tmp'), '123')
os.mkdir(join(src_dir, 'test_dir'))
- self._write_data(join(src_dir, 'test_dir', 'test.txt'), '456')
+ write_file((src_dir, 'test_dir', 'test.txt'), '456')
os.mkdir(join(src_dir, 'test_dir2'))
- self._write_data(join(src_dir, 'test_dir2', 'test.txt'), '456')
+ write_file((src_dir, 'test_dir2', 'test.txt'), '456')
os.mkdir(join(src_dir, 'test_dir2', 'subdir'))
os.mkdir(join(src_dir, 'test_dir2', 'subdir2'))
- self._write_data(join(src_dir, 'test_dir2', 'subdir', 'test.txt'),
- '456')
- self._write_data(join(src_dir, 'test_dir2', 'subdir2', 'test.py'),
- '456')
-
+ write_file((src_dir, 'test_dir2', 'subdir', 'test.txt'), '456')
+ write_file((src_dir, 'test_dir2', 'subdir2', 'test.py'), '456')
# testing glob-like patterns
try:
@@ -294,21 +674,19 @@ class TestShutil(unittest.TestCase):
shutil.copytree(src_dir, dst_dir, ignore=patterns)
# checking the result: some elements should not be copied
self.assertTrue(exists(join(dst_dir, 'test.txt')))
- self.assertTrue(not exists(join(dst_dir, 'test.tmp')))
- self.assertTrue(not exists(join(dst_dir, 'test_dir2')))
+ self.assertFalse(exists(join(dst_dir, 'test.tmp')))
+ self.assertFalse(exists(join(dst_dir, 'test_dir2')))
finally:
- if os.path.exists(dst_dir):
- shutil.rmtree(dst_dir)
+ shutil.rmtree(dst_dir)
try:
patterns = shutil.ignore_patterns('*.tmp', 'subdir*')
shutil.copytree(src_dir, dst_dir, ignore=patterns)
# checking the result: some elements should not be copied
- self.assertTrue(not exists(join(dst_dir, 'test.tmp')))
- self.assertTrue(not exists(join(dst_dir, 'test_dir2', 'subdir2')))
- self.assertTrue(not exists(join(dst_dir, 'test_dir2', 'subdir')))
+ self.assertFalse(exists(join(dst_dir, 'test.tmp')))
+ self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir2')))
+ self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir')))
finally:
- if os.path.exists(dst_dir):
- shutil.rmtree(dst_dir)
+ shutil.rmtree(dst_dir)
# testing callable-style
try:
@@ -327,13 +705,12 @@ class TestShutil(unittest.TestCase):
shutil.copytree(src_dir, dst_dir, ignore=_filter)
# checking the result: some elements should not be copied
- self.assertTrue(not exists(join(dst_dir, 'test_dir2', 'subdir2',
- 'test.py')))
- self.assertTrue(not exists(join(dst_dir, 'test_dir2', 'subdir')))
+ self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir2',
+ 'test.py')))
+ self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir')))
finally:
- if os.path.exists(dst_dir):
- shutil.rmtree(dst_dir)
+ shutil.rmtree(dst_dir)
finally:
shutil.rmtree(src_dir)
shutil.rmtree(os.path.dirname(dst_dir))
@@ -358,35 +735,6 @@ class TestShutil(unittest.TestCase):
finally:
shutil.rmtree(TESTFN, ignore_errors=True)
- @unittest.skipUnless(hasattr(os, 'chflags') and
- hasattr(errno, 'EOPNOTSUPP') and
- hasattr(errno, 'ENOTSUP'),
- "requires os.chflags, EOPNOTSUPP & ENOTSUP")
- def test_copystat_handles_harmless_chflags_errors(self):
- tmpdir = self.mkdtemp()
- file1 = os.path.join(tmpdir, 'file1')
- file2 = os.path.join(tmpdir, 'file2')
- self.write_file(file1, 'xxx')
- self.write_file(file2, 'xxx')
-
- def make_chflags_raiser(err):
- ex = OSError()
-
- def _chflags_raiser(path, flags):
- ex.errno = err
- raise ex
- return _chflags_raiser
- old_chflags = os.chflags
- try:
- for err in errno.EOPNOTSUPP, errno.ENOTSUP:
- os.chflags = make_chflags_raiser(err)
- shutil.copystat(file1, file2)
- # assert others errors break it
- os.chflags = make_chflags_raiser(errno.EOPNOTSUPP + errno.ENOTSUP)
- self.assertRaises(OSError, shutil.copystat, file1, file2)
- finally:
- os.chflags = old_chflags
-
@support.skip_unless_symlink
def test_dont_copy_file_onto_symlink_to_itself(self):
# bug 851123.
@@ -417,6 +765,7 @@ class TestShutil(unittest.TestCase):
os.mkdir(src)
os.symlink(src, dst)
self.assertRaises(OSError, shutil.rmtree, dst)
+ shutil.rmtree(dst, ignore_errors=True)
finally:
shutil.rmtree(TESTFN, ignore_errors=True)
@@ -457,9 +806,9 @@ class TestShutil(unittest.TestCase):
src_dir = self.mkdtemp()
dst_dir = os.path.join(self.mkdtemp(), 'destination')
- self._write_data(os.path.join(src_dir, 'test.txt'), '123')
+ write_file((src_dir, 'test.txt'), '123')
os.mkdir(os.path.join(src_dir, 'test_dir'))
- self._write_data(os.path.join(src_dir, 'test_dir', 'test.txt'), '456')
+ write_file((src_dir, 'test_dir', 'test.txt'), '456')
copied = []
def _copy(src, dst):
@@ -476,7 +825,7 @@ class TestShutil(unittest.TestCase):
dst_dir = os.path.join(self.mkdtemp(), 'destination')
os.symlink('IDONTEXIST', os.path.join(src_dir, 'test.txt'))
os.mkdir(os.path.join(src_dir, 'test_dir'))
- self._write_data(os.path.join(src_dir, 'test_dir', 'test.txt'), '456')
+ write_file((src_dir, 'test_dir', 'test.txt'), '456')
self.assertRaises(Error, shutil.copytree, src_dir, dst_dir)
# a dangling symlink is ignored with the proper flag
@@ -492,7 +841,7 @@ class TestShutil(unittest.TestCase):
def _copy_file(self, method):
fname = 'test.txt'
tmpdir = self.mkdtemp()
- self.write_file([tmpdir, fname])
+ write_file((tmpdir, fname), 'xxx')
file1 = os.path.join(tmpdir, fname)
tmpdir2 = self.mkdtemp()
method(file1, tmpdir2)
@@ -524,14 +873,14 @@ class TestShutil(unittest.TestCase):
self.assertEqual(getattr(file1_stat, 'st_flags'),
getattr(file2_stat, 'st_flags'))
- @unittest.skipUnless(zlib, "requires zlib")
+ @requires_zlib
def test_make_tarball(self):
# creating something to tar
tmpdir = self.mkdtemp()
- self.write_file([tmpdir, 'file1'], 'xxx')
- self.write_file([tmpdir, 'file2'], 'xxx')
+ write_file((tmpdir, 'file1'), 'xxx')
+ write_file((tmpdir, 'file2'), 'xxx')
os.mkdir(os.path.join(tmpdir, 'sub'))
- self.write_file([tmpdir, 'sub', 'file3'], 'xxx')
+ write_file((tmpdir, 'sub', 'file3'), 'xxx')
tmpdir2 = self.mkdtemp()
# force shutil to create the directory
@@ -578,16 +927,16 @@ class TestShutil(unittest.TestCase):
tmpdir = self.mkdtemp()
dist = os.path.join(tmpdir, 'dist')
os.mkdir(dist)
- self.write_file([dist, 'file1'], 'xxx')
- self.write_file([dist, 'file2'], 'xxx')
+ write_file((dist, 'file1'), 'xxx')
+ write_file((dist, 'file2'), 'xxx')
os.mkdir(os.path.join(dist, 'sub'))
- self.write_file([dist, 'sub', 'file3'], 'xxx')
+ write_file((dist, 'sub', 'file3'), 'xxx')
os.mkdir(os.path.join(dist, 'sub2'))
tmpdir2 = self.mkdtemp()
base_name = os.path.join(tmpdir2, 'archive')
return tmpdir, tmpdir2, base_name
- @unittest.skipUnless(zlib, "Requires zlib")
+ @requires_zlib
@unittest.skipUnless(find_executable('tar') and find_executable('gzip'),
'Need the tar command to run')
def test_tarfile_vs_tar(self):
@@ -642,13 +991,13 @@ class TestShutil(unittest.TestCase):
tarball = base_name + '.tar'
self.assertTrue(os.path.exists(tarball))
- @unittest.skipUnless(zlib, "Requires zlib")
+ @requires_zlib
@unittest.skipUnless(ZIP_SUPPORT, 'Need zip support to run')
def test_make_zipfile(self):
# creating something to tar
tmpdir = self.mkdtemp()
- self.write_file([tmpdir, 'file1'], 'xxx')
- self.write_file([tmpdir, 'file2'], 'xxx')
+ write_file((tmpdir, 'file1'), 'xxx')
+ write_file((tmpdir, 'file2'), 'xxx')
tmpdir2 = self.mkdtemp()
# force shutil to create the directory
@@ -666,7 +1015,7 @@ class TestShutil(unittest.TestCase):
base_name = os.path.join(tmpdir, 'archive')
self.assertRaises(ValueError, make_archive, base_name, 'xxx')
- @unittest.skipUnless(zlib, "Requires zlib")
+ @requires_zlib
def test_make_archive_owner_group(self):
# testing make_archive with owner and group, with various combinations
# this works even if there's not gid/uid support
@@ -694,7 +1043,7 @@ class TestShutil(unittest.TestCase):
self.assertTrue(os.path.exists(res))
- @unittest.skipUnless(zlib, "Requires zlib")
+ @requires_zlib
@unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")
def test_tarfile_root_owner(self):
tmpdir, tmpdir2, base_name = self._create_files()
@@ -763,7 +1112,7 @@ class TestShutil(unittest.TestCase):
diff.append(file_)
return diff
- @unittest.skipUnless(zlib, "Requires zlib")
+ @requires_zlib
def test_unpack_archive(self):
formats = ['tar', 'gztar', 'zip']
if BZ2_SUPPORTED:
@@ -814,6 +1163,162 @@ class TestShutil(unittest.TestCase):
unregister_unpack_format('Boo2')
self.assertEqual(get_unpack_formats(), formats)
+ @unittest.skipUnless(hasattr(shutil, 'disk_usage'),
+ "disk_usage not available on this platform")
+ def test_disk_usage(self):
+ usage = shutil.disk_usage(os.getcwd())
+ self.assertGreater(usage.total, 0)
+ self.assertGreater(usage.used, 0)
+ self.assertGreaterEqual(usage.free, 0)
+ self.assertGreaterEqual(usage.total, usage.used)
+ self.assertGreater(usage.total, usage.free)
+
+ @unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")
+ @unittest.skipUnless(hasattr(os, 'chown'), 'requires os.chown')
+ def test_chown(self):
+
+ # cleaned-up automatically by TestShutil.tearDown method
+ dirname = self.mkdtemp()
+ filename = tempfile.mktemp(dir=dirname)
+ write_file(filename, 'testing chown function')
+
+ with self.assertRaises(ValueError):
+ shutil.chown(filename)
+
+ with self.assertRaises(LookupError):
+ shutil.chown(filename, user='non-exising username')
+
+ with self.assertRaises(LookupError):
+ shutil.chown(filename, group='non-exising groupname')
+
+ with self.assertRaises(TypeError):
+ shutil.chown(filename, b'spam')
+
+ with self.assertRaises(TypeError):
+ shutil.chown(filename, 3.14)
+
+ uid = os.getuid()
+ gid = os.getgid()
+
+ def check_chown(path, uid=None, gid=None):
+ s = os.stat(filename)
+ if uid is not None:
+ self.assertEqual(uid, s.st_uid)
+ if gid is not None:
+ self.assertEqual(gid, s.st_gid)
+
+ shutil.chown(filename, uid, gid)
+ check_chown(filename, uid, gid)
+ shutil.chown(filename, uid)
+ check_chown(filename, uid)
+ shutil.chown(filename, user=uid)
+ check_chown(filename, uid)
+ shutil.chown(filename, group=gid)
+ check_chown(filename, gid=gid)
+
+ shutil.chown(dirname, uid, gid)
+ check_chown(dirname, uid, gid)
+ shutil.chown(dirname, uid)
+ check_chown(dirname, uid)
+ shutil.chown(dirname, user=uid)
+ check_chown(dirname, uid)
+ shutil.chown(dirname, group=gid)
+ check_chown(dirname, gid=gid)
+
+ user = pwd.getpwuid(uid)[0]
+ group = grp.getgrgid(gid)[0]
+ shutil.chown(filename, user, group)
+ check_chown(filename, uid, gid)
+ shutil.chown(dirname, user, group)
+ check_chown(dirname, uid, gid)
+
+ def test_copy_return_value(self):
+ # copy and copy2 both return their destination path.
+ for fn in (shutil.copy, shutil.copy2):
+ src_dir = self.mkdtemp()
+ dst_dir = self.mkdtemp()
+ src = os.path.join(src_dir, 'foo')
+ write_file(src, 'foo')
+ rv = fn(src, dst_dir)
+ self.assertEqual(rv, os.path.join(dst_dir, 'foo'))
+ rv = fn(src, os.path.join(dst_dir, 'bar'))
+ self.assertEqual(rv, os.path.join(dst_dir, 'bar'))
+
+ def test_copyfile_return_value(self):
+ # copytree returns its destination path.
+ src_dir = self.mkdtemp()
+ dst_dir = self.mkdtemp()
+ dst_file = os.path.join(dst_dir, 'bar')
+ src_file = os.path.join(src_dir, 'foo')
+ write_file(src_file, 'foo')
+ rv = shutil.copyfile(src_file, dst_file)
+ self.assertTrue(os.path.exists(rv))
+ self.assertEqual(read_file(src_file), read_file(dst_file))
+
+ def test_copytree_return_value(self):
+ # copytree returns its destination path.
+ src_dir = self.mkdtemp()
+ dst_dir = src_dir + "dest"
+ self.addCleanup(shutil.rmtree, dst_dir, True)
+ src = os.path.join(src_dir, 'foo')
+ write_file(src, 'foo')
+ rv = shutil.copytree(src_dir, dst_dir)
+ self.assertEqual(['foo'], os.listdir(rv))
+
+
+class TestWhich(unittest.TestCase):
+
+ def setUp(self):
+ self.temp_dir = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.temp_dir, True)
+ # Give the temp_file an ".exe" suffix for all.
+ # It's needed on Windows and not harmful on other platforms.
+ self.temp_file = tempfile.NamedTemporaryFile(dir=self.temp_dir,
+ suffix=".exe")
+ os.chmod(self.temp_file.name, stat.S_IXUSR)
+ self.addCleanup(self.temp_file.close)
+ self.dir, self.file = os.path.split(self.temp_file.name)
+
+ def test_basic(self):
+ # Given an EXE in a directory, it should be returned.
+ rv = shutil.which(self.file, path=self.dir)
+ self.assertEqual(rv, self.temp_file.name)
+
+ def test_full_path_short_circuit(self):
+ # When given the fully qualified path to an executable that exists,
+ # it should be returned.
+ rv = shutil.which(self.temp_file.name, path=self.temp_dir)
+ self.assertEqual(self.temp_file.name, rv)
+
+ def test_non_matching_mode(self):
+ # Set the file read-only and ask for writeable files.
+ os.chmod(self.temp_file.name, stat.S_IREAD)
+ rv = shutil.which(self.file, path=self.dir, mode=os.W_OK)
+ self.assertIsNone(rv)
+
+ def test_relative(self):
+ old_cwd = os.getcwd()
+ base_dir, tail_dir = os.path.split(self.dir)
+ os.chdir(base_dir)
+ try:
+ rv = shutil.which(self.file, path=tail_dir)
+ self.assertEqual(rv, os.path.join(tail_dir, self.file))
+ finally:
+ os.chdir(old_cwd)
+
+ def test_nonexistent_file(self):
+ # Return None when no matching executable file is found on the path.
+ rv = shutil.which("foo.exe", path=self.dir)
+ self.assertIsNone(rv)
+
+ @unittest.skipUnless(sys.platform == "win32",
+ "pathext check is Windows-only")
+ def test_pathext_checking(self):
+ # Ask for the file without the ".exe" extension, then ensure that
+ # it gets found properly with the extension.
+ rv = shutil.which(self.temp_file.name[:-4], path=self.dir)
+ self.assertEqual(self.temp_file.name, rv)
+
class TestMove(unittest.TestCase):
@@ -927,6 +1432,58 @@ class TestMove(unittest.TestCase):
finally:
shutil.rmtree(TESTFN, ignore_errors=True)
+ @support.skip_unless_symlink
+ @mock_rename
+ def test_move_file_symlink(self):
+ dst = os.path.join(self.src_dir, 'bar')
+ os.symlink(self.src_file, dst)
+ shutil.move(dst, self.dst_file)
+ self.assertTrue(os.path.islink(self.dst_file))
+ self.assertTrue(os.path.samefile(self.src_file, self.dst_file))
+
+ @support.skip_unless_symlink
+ @mock_rename
+ def test_move_file_symlink_to_dir(self):
+ filename = "bar"
+ dst = os.path.join(self.src_dir, filename)
+ os.symlink(self.src_file, dst)
+ shutil.move(dst, self.dst_dir)
+ final_link = os.path.join(self.dst_dir, filename)
+ self.assertTrue(os.path.islink(final_link))
+ self.assertTrue(os.path.samefile(self.src_file, final_link))
+
+ @support.skip_unless_symlink
+ @mock_rename
+ def test_move_dangling_symlink(self):
+ src = os.path.join(self.src_dir, 'baz')
+ dst = os.path.join(self.src_dir, 'bar')
+ os.symlink(src, dst)
+ dst_link = os.path.join(self.dst_dir, 'quux')
+ shutil.move(dst, dst_link)
+ self.assertTrue(os.path.islink(dst_link))
+ self.assertEqual(os.path.realpath(src), os.path.realpath(dst_link))
+
+ @support.skip_unless_symlink
+ @mock_rename
+ def test_move_dir_symlink(self):
+ src = os.path.join(self.src_dir, 'baz')
+ dst = os.path.join(self.src_dir, 'bar')
+ os.mkdir(src)
+ os.symlink(src, dst)
+ dst_link = os.path.join(self.dst_dir, 'quux')
+ shutil.move(dst, dst_link)
+ self.assertTrue(os.path.islink(dst_link))
+ self.assertTrue(os.path.samefile(src, dst_link))
+
+ def test_move_return_value(self):
+ rv = shutil.move(self.src_file, self.dst_dir)
+ self.assertEqual(rv,
+ os.path.join(self.dst_dir, os.path.basename(self.src_file)))
+
+ def test_move_as_rename_return_value(self):
+ rv = shutil.move(self.src_file, os.path.join(self.dst_dir, 'bar'))
+ self.assertEqual(rv, os.path.join(self.dst_dir, 'bar'))
+
class TestCopyFile(unittest.TestCase):
@@ -1036,6 +1593,7 @@ class TestCopyFile(unittest.TestCase):
# but a different case.
self.src_dir = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.src_dir, True)
dst_dir = os.path.join(
os.path.dirname(self.src_dir),
os.path.basename(self.src_dir).upper())
@@ -1045,13 +1603,57 @@ class TestCopyFile(unittest.TestCase):
shutil.move(self.src_dir, dst_dir)
self.assertTrue(os.path.isdir(dst_dir))
finally:
- if os.path.exists(dst_dir):
- os.rmdir(dst_dir)
+ os.rmdir(dst_dir)
+
+class TermsizeTests(unittest.TestCase):
+ def test_does_not_crash(self):
+ """Check if get_terminal_size() returns a meaningful value.
+
+ There's no easy portable way to actually check the size of the
+ terminal, so let's check if it returns something sensible instead.
+ """
+ size = shutil.get_terminal_size()
+ self.assertGreaterEqual(size.columns, 0)
+ self.assertGreaterEqual(size.lines, 0)
+
+ def test_os_environ_first(self):
+ "Check if environment variables have precedence"
+
+ with support.EnvironmentVarGuard() as env:
+ env['COLUMNS'] = '777'
+ size = shutil.get_terminal_size()
+ self.assertEqual(size.columns, 777)
+
+ with support.EnvironmentVarGuard() as env:
+ env['LINES'] = '888'
+ size = shutil.get_terminal_size()
+ self.assertEqual(size.lines, 888)
+
+ @unittest.skipUnless(os.isatty(sys.__stdout__.fileno()), "not on tty")
+ def test_stty_match(self):
+ """Check if stty returns the same results ignoring env
+
+ This test will fail if stdin and stdout are connected to
+ different terminals with different sizes. Nevertheless, such
+ situations should be pretty rare.
+ """
+ try:
+ size = subprocess.check_output(['stty', 'size']).decode().split()
+ except (FileNotFoundError, subprocess.CalledProcessError):
+ self.skipTest("stty invocation failed")
+ expected = (int(size[1]), int(size[0])) # reversed order
+
+ with support.EnvironmentVarGuard() as env:
+ del env['LINES']
+ del env['COLUMNS']
+ actual = shutil.get_terminal_size()
+ self.assertEqual(expected, actual)
def test_main():
- support.run_unittest(TestShutil, TestMove, TestCopyFile)
+ support.run_unittest(TestShutil, TestMove, TestCopyFile,
+ TermsizeTests, TestWhich)
if __name__ == '__main__':
test_main()