diff options
Diffstat (limited to 'Lib/test/test_pathlib/test_pathlib.py')
-rw-r--r-- | Lib/test/test_pathlib/test_pathlib.py | 251 |
1 files changed, 251 insertions, 0 deletions
diff --git a/Lib/test/test_pathlib/test_pathlib.py b/Lib/test/test_pathlib/test_pathlib.py index 1328a86..e17e7d7 100644 --- a/Lib/test/test_pathlib/test_pathlib.py +++ b/Lib/test/test_pathlib/test_pathlib.py @@ -16,6 +16,7 @@ from urllib.request import pathname2url from test.support import import_helper from test.support import is_emscripten, is_wasi from test.support import infinite_recursion +from test.support import swap_attr from test.support import os_helper from test.support.os_helper import TESTFN, FakePath from test.test_pathlib import test_pathlib_abc @@ -31,6 +32,10 @@ root_in_posix = False if hasattr(os, 'geteuid'): root_in_posix = (os.geteuid() == 0) +rmtree_use_fd_functions = ( + {os.open, os.stat, os.unlink, os.rmdir} <= os.supports_dir_fd and + os.listdir in os.supports_fd and os.stat in os.supports_follow_symlinks) + # # Tests for the pure classes. # @@ -827,6 +832,252 @@ class PathTest(test_pathlib_abc.DummyPathTest, PurePathTest): self.assertEqual(expected_gid, gid_2) self.assertEqual(expected_name, link.group(follow_symlinks=False)) + def test_rmtree_uses_safe_fd_version_if_available(self): + if rmtree_use_fd_functions: + d = self.cls(self.base, 'a') + d.mkdir() + try: + real_open = os.open + + class Called(Exception): + pass + + def _raiser(*args, **kwargs): + raise Called + + os.open = _raiser + self.assertRaises(Called, d.rmtree) + finally: + os.open = real_open + + @unittest.skipIf(sys.platform[:6] == 'cygwin', + "This test can't be run on Cygwin (issue #1071513).") + @os_helper.skip_if_dac_override + @os_helper.skip_unless_working_chmod + def test_rmtree_unwritable(self): + tmp = self.cls(self.base, 'rmtree') + tmp.mkdir() + child_file_path = tmp / 'a' + child_dir_path = tmp / 'b' + child_file_path.write_text("") + child_dir_path.mkdir() + old_dir_mode = tmp.stat().st_mode + old_child_file_mode = child_file_path.stat().st_mode + old_child_dir_mode = child_dir_path.stat().st_mode + # Make unwritable. + new_mode = stat.S_IREAD | stat.S_IEXEC + try: + child_file_path.chmod(new_mode) + child_dir_path.chmod(new_mode) + tmp.chmod(new_mode) + + errors = [] + tmp.rmtree(on_error=errors.append) + # Test whether onerror has actually been called. + print(errors) + self.assertEqual(len(errors), 3) + finally: + tmp.chmod(old_dir_mode) + child_file_path.chmod(old_child_file_mode) + child_dir_path.chmod(old_child_dir_mode) + + @needs_windows + def test_rmtree_inner_junction(self): + import _winapi + tmp = self.cls(self.base, 'rmtree') + tmp.mkdir() + dir1 = tmp / 'dir1' + dir2 = dir1 / 'dir2' + dir3 = tmp / 'dir3' + for d in dir1, dir2, dir3: + d.mkdir() + file1 = tmp / 'file1' + file1.write_text('foo') + link1 = dir1 / 'link1' + _winapi.CreateJunction(str(dir2), str(link1)) + link2 = dir1 / 'link2' + _winapi.CreateJunction(str(dir3), str(link2)) + link3 = dir1 / 'link3' + _winapi.CreateJunction(str(file1), str(link3)) + # make sure junctions are removed but not followed + dir1.rmtree() + self.assertFalse(dir1.exists()) + self.assertTrue(dir3.exists()) + self.assertTrue(file1.exists()) + + @needs_windows + def test_rmtree_outer_junction(self): + import _winapi + tmp = self.cls(self.base, 'rmtree') + tmp.mkdir() + try: + src = tmp / 'cheese' + dst = tmp / 'shop' + src.mkdir() + spam = src / 'spam' + spam.write_text('') + _winapi.CreateJunction(str(src), str(dst)) + self.assertRaises(OSError, dst.rmtree) + dst.rmtree(ignore_errors=True) + finally: + tmp.rmtree(ignore_errors=True) + + @needs_windows + def test_rmtree_outer_junction_on_error(self): + import _winapi + tmp = self.cls(self.base, 'rmtree') + tmp.mkdir() + dir_ = tmp / 'dir' + dir_.mkdir() + link = tmp / 'link' + _winapi.CreateJunction(str(dir_), str(link)) + try: + self.assertRaises(OSError, link.rmtree) + self.assertTrue(dir_.exists()) + self.assertTrue(link.exists(follow_symlinks=False)) + errors = [] + + def on_error(error): + errors.append(error) + + link.rmtree(on_error=on_error) + self.assertEqual(len(errors), 1) + self.assertIsInstance(errors[0], OSError) + self.assertEqual(errors[0].filename, str(link)) + finally: + os.unlink(str(link)) + + @unittest.skipUnless(rmtree_use_fd_functions, "requires safe rmtree") + def test_rmtree_fails_on_close(self): + # Test that the error handler is called for failed os.close() and that + # os.close() is only called once for a file descriptor. + tmp = self.cls(self.base, 'rmtree') + tmp.mkdir() + dir1 = tmp / 'dir1' + dir1.mkdir() + dir2 = dir1 / 'dir2' + dir2.mkdir() + + def close(fd): + orig_close(fd) + nonlocal close_count + close_count += 1 + raise OSError + + close_count = 0 + with swap_attr(os, 'close', close) as orig_close: + with self.assertRaises(OSError): + dir1.rmtree() + self.assertTrue(dir2.is_dir()) + self.assertEqual(close_count, 2) + + close_count = 0 + errors = [] + + with swap_attr(os, 'close', close) as orig_close: + dir1.rmtree(on_error=errors.append) + print(errors) + self.assertEqual(len(errors), 2) + self.assertEqual(errors[0].filename, str(dir2)) + self.assertEqual(errors[1].filename, str(dir1)) + self.assertEqual(close_count, 2) + + @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()') + @unittest.skipIf(sys.platform == "vxworks", + "fifo requires special path on VxWorks") + def test_rmtree_on_named_pipe(self): + p = self.cls(self.base, 'pipe') + os.mkfifo(p) + try: + with self.assertRaises(NotADirectoryError): + p.rmtree() + self.assertTrue(p.exists()) + finally: + p.unlink() + + p = self.cls(self.base, 'dir') + p.mkdir() + os.mkfifo(p / 'mypipe') + p.rmtree() + self.assertFalse(p.exists()) + + @unittest.skipIf(sys.platform[:6] == 'cygwin', + "This test can't be run on Cygwin (issue #1071513).") + @os_helper.skip_if_dac_override + @os_helper.skip_unless_working_chmod + def test_rmtree_deleted_race_condition(self): + # bpo-37260 + # + # Test that a file or a directory deleted after it is enumerated + # by scandir() but before unlink() or rmdr() is called doesn't + # generate any errors. + def on_error(exc): + assert exc.filename + if not isinstance(exc, PermissionError): + raise + # Make the parent and the children writeable. + for p, mode in zip(paths, old_modes): + p.chmod(mode) + # Remove other dirs except one. + keep = next(p for p in dirs if str(p) != exc.filename) + for p in dirs: + if p != keep: + p.rmdir() + # Remove other files except one. + keep = next(p for p in files if str(p) != exc.filename) + for p in files: + if p != keep: + p.unlink() + + tmp = self.cls(self.base, 'rmtree') + tmp.mkdir() + paths = [tmp] + [tmp / f'child{i}' for i in range(6)] + dirs = paths[1::2] + files = paths[2::2] + for path in dirs: + path.mkdir() + for path in files: + path.write_text('') + + old_modes = [path.stat().st_mode for path in paths] + + # Make the parent and the children non-writeable. + new_mode = stat.S_IREAD | stat.S_IEXEC + for path in reversed(paths): + path.chmod(new_mode) + + try: + tmp.rmtree(on_error=on_error) + except: + # Test failed, so cleanup artifacts. + for path, mode in zip(paths, old_modes): + try: + path.chmod(mode) + except OSError: + pass + tmp.rmtree() + raise + + def test_rmtree_does_not_choke_on_failing_lstat(self): + try: + orig_lstat = os.lstat + tmp = self.cls(self.base, 'rmtree') + + def raiser(fn, *args, **kwargs): + if fn != str(tmp): + raise OSError() + else: + return orig_lstat(fn) + + os.lstat = raiser + + tmp.mkdir() + foo = tmp / 'foo' + foo.write_text('') + tmp.rmtree() + finally: + os.lstat = orig_lstat + @os_helper.skip_unless_hardlink def test_hardlink_to(self): P = self.cls(self.base) |