summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_pathlib/test_pathlib.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_pathlib/test_pathlib.py')
-rw-r--r--Lib/test/test_pathlib/test_pathlib.py251
1 files changed, 251 insertions, 0 deletions
diff --git a/Lib/test/test_pathlib/test_pathlib.py b/Lib/test/test_pathlib/test_pathlib.py
index 1328a86..e17e7d7 100644
--- a/Lib/test/test_pathlib/test_pathlib.py
+++ b/Lib/test/test_pathlib/test_pathlib.py
@@ -16,6 +16,7 @@ from urllib.request import pathname2url
from test.support import import_helper
from test.support import is_emscripten, is_wasi
from test.support import infinite_recursion
+from test.support import swap_attr
from test.support import os_helper
from test.support.os_helper import TESTFN, FakePath
from test.test_pathlib import test_pathlib_abc
@@ -31,6 +32,10 @@ root_in_posix = False
if hasattr(os, 'geteuid'):
root_in_posix = (os.geteuid() == 0)
+rmtree_use_fd_functions = (
+ {os.open, os.stat, os.unlink, os.rmdir} <= os.supports_dir_fd and
+ os.listdir in os.supports_fd and os.stat in os.supports_follow_symlinks)
+
#
# Tests for the pure classes.
#
@@ -827,6 +832,252 @@ class PathTest(test_pathlib_abc.DummyPathTest, PurePathTest):
self.assertEqual(expected_gid, gid_2)
self.assertEqual(expected_name, link.group(follow_symlinks=False))
+ def test_rmtree_uses_safe_fd_version_if_available(self):
+ if rmtree_use_fd_functions:
+ d = self.cls(self.base, 'a')
+ d.mkdir()
+ try:
+ real_open = os.open
+
+ class Called(Exception):
+ pass
+
+ def _raiser(*args, **kwargs):
+ raise Called
+
+ os.open = _raiser
+ self.assertRaises(Called, d.rmtree)
+ finally:
+ os.open = real_open
+
+ @unittest.skipIf(sys.platform[:6] == 'cygwin',
+ "This test can't be run on Cygwin (issue #1071513).")
+ @os_helper.skip_if_dac_override
+ @os_helper.skip_unless_working_chmod
+ def test_rmtree_unwritable(self):
+ tmp = self.cls(self.base, 'rmtree')
+ tmp.mkdir()
+ child_file_path = tmp / 'a'
+ child_dir_path = tmp / 'b'
+ child_file_path.write_text("")
+ child_dir_path.mkdir()
+ old_dir_mode = tmp.stat().st_mode
+ old_child_file_mode = child_file_path.stat().st_mode
+ old_child_dir_mode = child_dir_path.stat().st_mode
+ # Make unwritable.
+ new_mode = stat.S_IREAD | stat.S_IEXEC
+ try:
+ child_file_path.chmod(new_mode)
+ child_dir_path.chmod(new_mode)
+ tmp.chmod(new_mode)
+
+ errors = []
+ tmp.rmtree(on_error=errors.append)
+ # Test whether onerror has actually been called.
+ print(errors)
+ self.assertEqual(len(errors), 3)
+ finally:
+ tmp.chmod(old_dir_mode)
+ child_file_path.chmod(old_child_file_mode)
+ child_dir_path.chmod(old_child_dir_mode)
+
+ @needs_windows
+ def test_rmtree_inner_junction(self):
+ import _winapi
+ tmp = self.cls(self.base, 'rmtree')
+ tmp.mkdir()
+ dir1 = tmp / 'dir1'
+ dir2 = dir1 / 'dir2'
+ dir3 = tmp / 'dir3'
+ for d in dir1, dir2, dir3:
+ d.mkdir()
+ file1 = tmp / 'file1'
+ file1.write_text('foo')
+ link1 = dir1 / 'link1'
+ _winapi.CreateJunction(str(dir2), str(link1))
+ link2 = dir1 / 'link2'
+ _winapi.CreateJunction(str(dir3), str(link2))
+ link3 = dir1 / 'link3'
+ _winapi.CreateJunction(str(file1), str(link3))
+ # make sure junctions are removed but not followed
+ dir1.rmtree()
+ self.assertFalse(dir1.exists())
+ self.assertTrue(dir3.exists())
+ self.assertTrue(file1.exists())
+
+ @needs_windows
+ def test_rmtree_outer_junction(self):
+ import _winapi
+ tmp = self.cls(self.base, 'rmtree')
+ tmp.mkdir()
+ try:
+ src = tmp / 'cheese'
+ dst = tmp / 'shop'
+ src.mkdir()
+ spam = src / 'spam'
+ spam.write_text('')
+ _winapi.CreateJunction(str(src), str(dst))
+ self.assertRaises(OSError, dst.rmtree)
+ dst.rmtree(ignore_errors=True)
+ finally:
+ tmp.rmtree(ignore_errors=True)
+
+ @needs_windows
+ def test_rmtree_outer_junction_on_error(self):
+ import _winapi
+ tmp = self.cls(self.base, 'rmtree')
+ tmp.mkdir()
+ dir_ = tmp / 'dir'
+ dir_.mkdir()
+ link = tmp / 'link'
+ _winapi.CreateJunction(str(dir_), str(link))
+ try:
+ self.assertRaises(OSError, link.rmtree)
+ self.assertTrue(dir_.exists())
+ self.assertTrue(link.exists(follow_symlinks=False))
+ errors = []
+
+ def on_error(error):
+ errors.append(error)
+
+ link.rmtree(on_error=on_error)
+ self.assertEqual(len(errors), 1)
+ self.assertIsInstance(errors[0], OSError)
+ self.assertEqual(errors[0].filename, str(link))
+ finally:
+ os.unlink(str(link))
+
+ @unittest.skipUnless(rmtree_use_fd_functions, "requires safe rmtree")
+ def test_rmtree_fails_on_close(self):
+ # Test that the error handler is called for failed os.close() and that
+ # os.close() is only called once for a file descriptor.
+ tmp = self.cls(self.base, 'rmtree')
+ tmp.mkdir()
+ dir1 = tmp / 'dir1'
+ dir1.mkdir()
+ dir2 = dir1 / 'dir2'
+ dir2.mkdir()
+
+ def close(fd):
+ orig_close(fd)
+ nonlocal close_count
+ close_count += 1
+ raise OSError
+
+ close_count = 0
+ with swap_attr(os, 'close', close) as orig_close:
+ with self.assertRaises(OSError):
+ dir1.rmtree()
+ self.assertTrue(dir2.is_dir())
+ self.assertEqual(close_count, 2)
+
+ close_count = 0
+ errors = []
+
+ with swap_attr(os, 'close', close) as orig_close:
+ dir1.rmtree(on_error=errors.append)
+ print(errors)
+ self.assertEqual(len(errors), 2)
+ self.assertEqual(errors[0].filename, str(dir2))
+ self.assertEqual(errors[1].filename, str(dir1))
+ self.assertEqual(close_count, 2)
+
+ @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
+ @unittest.skipIf(sys.platform == "vxworks",
+ "fifo requires special path on VxWorks")
+ def test_rmtree_on_named_pipe(self):
+ p = self.cls(self.base, 'pipe')
+ os.mkfifo(p)
+ try:
+ with self.assertRaises(NotADirectoryError):
+ p.rmtree()
+ self.assertTrue(p.exists())
+ finally:
+ p.unlink()
+
+ p = self.cls(self.base, 'dir')
+ p.mkdir()
+ os.mkfifo(p / 'mypipe')
+ p.rmtree()
+ self.assertFalse(p.exists())
+
+ @unittest.skipIf(sys.platform[:6] == 'cygwin',
+ "This test can't be run on Cygwin (issue #1071513).")
+ @os_helper.skip_if_dac_override
+ @os_helper.skip_unless_working_chmod
+ def test_rmtree_deleted_race_condition(self):
+ # bpo-37260
+ #
+ # Test that a file or a directory deleted after it is enumerated
+ # by scandir() but before unlink() or rmdr() is called doesn't
+ # generate any errors.
+ def on_error(exc):
+ assert exc.filename
+ if not isinstance(exc, PermissionError):
+ raise
+ # Make the parent and the children writeable.
+ for p, mode in zip(paths, old_modes):
+ p.chmod(mode)
+ # Remove other dirs except one.
+ keep = next(p for p in dirs if str(p) != exc.filename)
+ for p in dirs:
+ if p != keep:
+ p.rmdir()
+ # Remove other files except one.
+ keep = next(p for p in files if str(p) != exc.filename)
+ for p in files:
+ if p != keep:
+ p.unlink()
+
+ tmp = self.cls(self.base, 'rmtree')
+ tmp.mkdir()
+ paths = [tmp] + [tmp / f'child{i}' for i in range(6)]
+ dirs = paths[1::2]
+ files = paths[2::2]
+ for path in dirs:
+ path.mkdir()
+ for path in files:
+ path.write_text('')
+
+ old_modes = [path.stat().st_mode for path in paths]
+
+ # Make the parent and the children non-writeable.
+ new_mode = stat.S_IREAD | stat.S_IEXEC
+ for path in reversed(paths):
+ path.chmod(new_mode)
+
+ try:
+ tmp.rmtree(on_error=on_error)
+ except:
+ # Test failed, so cleanup artifacts.
+ for path, mode in zip(paths, old_modes):
+ try:
+ path.chmod(mode)
+ except OSError:
+ pass
+ tmp.rmtree()
+ raise
+
+ def test_rmtree_does_not_choke_on_failing_lstat(self):
+ try:
+ orig_lstat = os.lstat
+ tmp = self.cls(self.base, 'rmtree')
+
+ def raiser(fn, *args, **kwargs):
+ if fn != str(tmp):
+ raise OSError()
+ else:
+ return orig_lstat(fn)
+
+ os.lstat = raiser
+
+ tmp.mkdir()
+ foo = tmp / 'foo'
+ foo.write_text('')
+ tmp.rmtree()
+ finally:
+ os.lstat = orig_lstat
+
@os_helper.skip_unless_hardlink
def test_hardlink_to(self):
P = self.cls(self.base)