From 81c16cd94ec38d61aa478b9a452436dc3b1b524d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=B8ren=20L=C3=B8vborg?= <sorenl@unity3d.com>
Date: Thu, 7 Dec 2023 17:04:06 +0100
Subject: gh-91133: tempfile.TemporaryDirectory: fix symlink bug in cleanup
 (GH-99930)

Co-authored-by: Serhiy Storchaka <storchaka@gmail.com>
---
 Lib/tempfile.py                                    |  27 +++--
 Lib/test/test_tempfile.py                          | 111 +++++++++++++++++++--
 .../2022-12-01-16-57-44.gh-issue-91133.LKMVCV.rst  |   2 +
 3 files changed, 125 insertions(+), 15 deletions(-)
 create mode 100644 Misc/NEWS.d/next/Library/2022-12-01-16-57-44.gh-issue-91133.LKMVCV.rst

diff --git a/Lib/tempfile.py b/Lib/tempfile.py
index 55403ad..9a5e7d0 100644
--- a/Lib/tempfile.py
+++ b/Lib/tempfile.py
@@ -270,6 +270,22 @@ def _mkstemp_inner(dir, pre, suf, flags, output_type):
     raise FileExistsError(_errno.EEXIST,
                           "No usable temporary file name found")
 
+def _dont_follow_symlinks(func, path, *args):
+    # Pass follow_symlinks=False, unless not supported on this platform.
+    if func in _os.supports_follow_symlinks:
+        func(path, *args, follow_symlinks=False)
+    elif _os.name == 'nt' or not _os.path.islink(path):
+        func(path, *args)
+
+def _resetperms(path):
+    try:
+        chflags = _os.chflags
+    except AttributeError:
+        pass
+    else:
+        _dont_follow_symlinks(chflags, path, 0)
+    _dont_follow_symlinks(_os.chmod, path, 0o700)
+
 
 # User visible interfaces.
 
@@ -876,17 +892,10 @@ class TemporaryDirectory:
     def _rmtree(cls, name, ignore_errors=False):
         def onexc(func, path, exc):
             if isinstance(exc, PermissionError):
-                def resetperms(path):
-                    try:
-                        _os.chflags(path, 0)
-                    except AttributeError:
-                        pass
-                    _os.chmod(path, 0o700)
-
                 try:
                     if path != name:
-                        resetperms(_os.path.dirname(path))
-                    resetperms(path)
+                        _resetperms(_os.path.dirname(path))
+                    _resetperms(path)
 
                     try:
                         _os.unlink(path)
diff --git a/Lib/test/test_tempfile.py b/Lib/test/test_tempfile.py
index f4aef88..2729bec 100644
--- a/Lib/test/test_tempfile.py
+++ b/Lib/test/test_tempfile.py
@@ -1673,6 +1673,103 @@ class TestTemporaryDirectory(BaseTestCase):
                          "were deleted")
         d2.cleanup()
 
+    @os_helper.skip_unless_symlink
+    def test_cleanup_with_symlink_modes(self):
+        # cleanup() should not follow symlinks when fixing mode bits (#91133)
+        with self.do_create(recurse=0) as d2:
+            file1 = os.path.join(d2, 'file1')
+            open(file1, 'wb').close()
+            dir1 = os.path.join(d2, 'dir1')
+            os.mkdir(dir1)
+            for mode in range(8):
+                mode <<= 6
+                with self.subTest(mode=format(mode, '03o')):
+                    def test(target, target_is_directory):
+                        d1 = self.do_create(recurse=0)
+                        symlink = os.path.join(d1.name, 'symlink')
+                        os.symlink(target, symlink,
+                                target_is_directory=target_is_directory)
+                        try:
+                            os.chmod(symlink, mode, follow_symlinks=False)
+                        except NotImplementedError:
+                            pass
+                        try:
+                            os.chmod(symlink, mode)
+                        except FileNotFoundError:
+                            pass
+                        os.chmod(d1.name, mode)
+                        d1.cleanup()
+                        self.assertFalse(os.path.exists(d1.name))
+
+                    with self.subTest('nonexisting file'):
+                        test('nonexisting', target_is_directory=False)
+                    with self.subTest('nonexisting dir'):
+                        test('nonexisting', target_is_directory=True)
+
+                    with self.subTest('existing file'):
+                        os.chmod(file1, mode)
+                        old_mode = os.stat(file1).st_mode
+                        test(file1, target_is_directory=False)
+                        new_mode = os.stat(file1).st_mode
+                        self.assertEqual(new_mode, old_mode,
+                                         '%03o != %03o' % (new_mode, old_mode))
+
+                    with self.subTest('existing dir'):
+                        os.chmod(dir1, mode)
+                        old_mode = os.stat(dir1).st_mode
+                        test(dir1, target_is_directory=True)
+                        new_mode = os.stat(dir1).st_mode
+                        self.assertEqual(new_mode, old_mode,
+                                         '%03o != %03o' % (new_mode, old_mode))
+
+    @unittest.skipUnless(hasattr(os, 'chflags'), 'requires os.chflags')
+    @os_helper.skip_unless_symlink
+    def test_cleanup_with_symlink_flags(self):
+        # cleanup() should not follow symlinks when fixing flags (#91133)
+        flags = stat.UF_IMMUTABLE | stat.UF_NOUNLINK
+        self.check_flags(flags)
+
+        with self.do_create(recurse=0) as d2:
+            file1 = os.path.join(d2, 'file1')
+            open(file1, 'wb').close()
+            dir1 = os.path.join(d2, 'dir1')
+            os.mkdir(dir1)
+            def test(target, target_is_directory):
+                d1 = self.do_create(recurse=0)
+                symlink = os.path.join(d1.name, 'symlink')
+                os.symlink(target, symlink,
+                           target_is_directory=target_is_directory)
+                try:
+                    os.chflags(symlink, flags, follow_symlinks=False)
+                except NotImplementedError:
+                    pass
+                try:
+                    os.chflags(symlink, flags)
+                except FileNotFoundError:
+                    pass
+                os.chflags(d1.name, flags)
+                d1.cleanup()
+                self.assertFalse(os.path.exists(d1.name))
+
+            with self.subTest('nonexisting file'):
+                test('nonexisting', target_is_directory=False)
+            with self.subTest('nonexisting dir'):
+                test('nonexisting', target_is_directory=True)
+
+            with self.subTest('existing file'):
+                os.chflags(file1, flags)
+                old_flags = os.stat(file1).st_flags
+                test(file1, target_is_directory=False)
+                new_flags = os.stat(file1).st_flags
+                self.assertEqual(new_flags, old_flags)
+
+            with self.subTest('existing dir'):
+                os.chflags(dir1, flags)
+                old_flags = os.stat(dir1).st_flags
+                test(dir1, target_is_directory=True)
+                new_flags = os.stat(dir1).st_flags
+                self.assertEqual(new_flags, old_flags)
+
     @support.cpython_only
     def test_del_on_collection(self):
         # A TemporaryDirectory is deleted when garbage collected
@@ -1845,10 +1942,7 @@ class TestTemporaryDirectory(BaseTestCase):
                     d.cleanup()
                 self.assertFalse(os.path.exists(d.name))
 
-    @unittest.skipUnless(hasattr(os, 'chflags'), 'requires os.chflags')
-    def test_flags(self):
-        flags = stat.UF_IMMUTABLE | stat.UF_NOUNLINK
-
+    def check_flags(self, flags):
         # skip the test if these flags are not supported (ex: FreeBSD 13)
         filename = os_helper.TESTFN
         try:
@@ -1857,13 +1951,18 @@ class TestTemporaryDirectory(BaseTestCase):
                 os.chflags(filename, flags)
             except OSError as exc:
                 # "OSError: [Errno 45] Operation not supported"
-                self.skipTest(f"chflags() doesn't support "
-                              f"UF_IMMUTABLE|UF_NOUNLINK: {exc}")
+                self.skipTest(f"chflags() doesn't support flags "
+                              f"{flags:#b}: {exc}")
             else:
                 os.chflags(filename, 0)
         finally:
             os_helper.unlink(filename)
 
+    @unittest.skipUnless(hasattr(os, 'chflags'), 'requires os.chflags')
+    def test_flags(self):
+        flags = stat.UF_IMMUTABLE | stat.UF_NOUNLINK
+        self.check_flags(flags)
+
         d = self.do_create(recurse=3, dirs=2, files=2)
         with d:
             # Change files and directories flags recursively.
diff --git a/Misc/NEWS.d/next/Library/2022-12-01-16-57-44.gh-issue-91133.LKMVCV.rst b/Misc/NEWS.d/next/Library/2022-12-01-16-57-44.gh-issue-91133.LKMVCV.rst
new file mode 100644
index 0000000..7991048
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2022-12-01-16-57-44.gh-issue-91133.LKMVCV.rst
@@ -0,0 +1,2 @@
+Fix a bug in :class:`tempfile.TemporaryDirectory` cleanup, which now no longer
+dereferences symlinks when working around file system permission errors.
-- 
cgit v0.12