diff options
author | Serhiy Storchaka <storchaka@gmail.com> | 2022-01-24 10:14:42 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-01-24 10:14:42 (GMT) |
commit | 3f1ea163ea54513e00e0e9d5442fee1b639825cc (patch) | |
tree | a7b1b59d34cd6ab37cf69dc15bbb82e9315089ea | |
parent | 1398dca838529e682c06b496cc1911d91334ff3a (diff) | |
download | cpython-3f1ea163ea54513e00e0e9d5442fee1b639825cc.zip cpython-3f1ea163ea54513e00e0e9d5442fee1b639825cc.tar.gz cpython-3f1ea163ea54513e00e0e9d5442fee1b639825cc.tar.bz2 |
[3.9] bpo-46426: Improve tests for the dir_fd argument (GH-30668) (GH-30757)
Ensure that directory file descriptors refer to directories different
from the current directory, and that src_dir_fd and dst_dir_fd refer
to different directories.
Add context manager open_dir_fd() in test.support.os_helper.
(cherry picked from commit 54610bb448a9cf5be77d53b66169fca4c11be6cb)
Co-authored-by: Serhiy Storchaka <storchaka@gmail.com>
-rw-r--r-- | Lib/test/support/__init__.py | 10 | ||||
-rw-r--r-- | Lib/test/test_os.py | 10 | ||||
-rw-r--r-- | Lib/test/test_posix.py | 404 |
3 files changed, 207 insertions, 217 deletions
diff --git a/Lib/test/support/__init__.py b/Lib/test/support/__init__.py index 4ced130..53804f1 100644 --- a/Lib/test/support/__init__.py +++ b/Lib/test/support/__init__.py @@ -1013,6 +1013,16 @@ def create_empty_file(filename): fd = os.open(filename, os.O_WRONLY | os.O_CREAT | os.O_TRUNC) os.close(fd) +@contextlib.contextmanager +def open_dir_fd(path): + """Open a file descriptor to a directory.""" + assert os.path.isdir(path) + dir_fd = os.open(path, os.O_RDONLY) + try: + yield dir_fd + finally: + os.close(dir_fd) + def sortdict(dict): "Like repr(dict), but in sorted order." items = sorted(dict.items()) diff --git a/Lib/test/test_os.py b/Lib/test/test_os.py index 59ddf9e..e48157a 100644 --- a/Lib/test/test_os.py +++ b/Lib/test/test_os.py @@ -708,12 +708,9 @@ class UtimeTests(unittest.TestCase): def test_utime_dir_fd(self): def set_time(filename, ns): dirname, name = os.path.split(filename) - dirfd = os.open(dirname, os.O_RDONLY) - try: + with support.open_dir_fd(dirname) as dirfd: # pass dir_fd to test utimensat(timespec) or futimesat(timeval) os.utime(name, dir_fd=dirfd, ns=ns) - finally: - os.close(dirfd) self._test_utime(set_time) def test_utime_directory(self): @@ -4111,8 +4108,7 @@ class TestScandir(unittest.TestCase): os.symlink('file.txt', os.path.join(self.path, 'link')) expected_names.append('link') - fd = os.open(self.path, os.O_RDONLY) - try: + with support.open_dir_fd(self.path) as fd: with os.scandir(fd) as it: entries = list(it) names = [entry.name for entry in entries] @@ -4127,8 +4123,6 @@ class TestScandir(unittest.TestCase): self.assertEqual(entry.stat(), st) st = os.stat(entry.name, dir_fd=fd, follow_symlinks=False) self.assertEqual(entry.stat(follow_symlinks=False), st) - finally: - os.close(fd) def test_empty_path(self): self.assertRaises(FileNotFoundError, os.scandir, '') diff --git a/Lib/test/test_posix.py b/Lib/test/test_posix.py index 890b7e0..a8db306 100644 --- a/Lib/test/test_posix.py +++ b/Lib/test/test_posix.py @@ -18,6 +18,7 @@ import tempfile import unittest import warnings import textwrap +from contextlib import contextmanager _DUMMY_SYMLINK = os.path.join(tempfile.gettempdir(), support.TESTFN + '-dummy-symlink') @@ -1055,176 +1056,6 @@ class PosixTester(unittest.TestCase): symdiff = idg_groups.symmetric_difference(posix.getgroups()) self.assertTrue(not symdiff or symdiff == {posix.getegid()}) - # tests for the posix *at functions follow - - @unittest.skipUnless(os.access in os.supports_dir_fd, "test needs dir_fd support for os.access()") - def test_access_dir_fd(self): - f = posix.open(posix.getcwd(), posix.O_RDONLY) - try: - self.assertTrue(posix.access(support.TESTFN, os.R_OK, dir_fd=f)) - finally: - posix.close(f) - - @unittest.skipUnless(os.chmod in os.supports_dir_fd, "test needs dir_fd support in os.chmod()") - def test_chmod_dir_fd(self): - os.chmod(support.TESTFN, stat.S_IRUSR) - - f = posix.open(posix.getcwd(), posix.O_RDONLY) - try: - posix.chmod(support.TESTFN, stat.S_IRUSR | stat.S_IWUSR, dir_fd=f) - - s = posix.stat(support.TESTFN) - self.assertEqual(s[0] & stat.S_IRWXU, stat.S_IRUSR | stat.S_IWUSR) - finally: - posix.close(f) - - @unittest.skipUnless(os.chown in os.supports_dir_fd, "test needs dir_fd support in os.chown()") - def test_chown_dir_fd(self): - support.unlink(support.TESTFN) - support.create_empty_file(support.TESTFN) - - f = posix.open(posix.getcwd(), posix.O_RDONLY) - try: - posix.chown(support.TESTFN, os.getuid(), os.getgid(), dir_fd=f) - finally: - posix.close(f) - - @unittest.skipUnless(os.stat in os.supports_dir_fd, "test needs dir_fd support in os.stat()") - def test_stat_dir_fd(self): - support.unlink(support.TESTFN) - with open(support.TESTFN, 'w') as outfile: - outfile.write("testline\n") - - f = posix.open(posix.getcwd(), posix.O_RDONLY) - try: - s1 = posix.stat(support.TESTFN) - s2 = posix.stat(support.TESTFN, dir_fd=f) - self.assertEqual(s1, s2) - s2 = posix.stat(support.TESTFN, dir_fd=None) - self.assertEqual(s1, s2) - self.assertRaisesRegex(TypeError, 'should be integer or None, not', - posix.stat, support.TESTFN, dir_fd=posix.getcwd()) - self.assertRaisesRegex(TypeError, 'should be integer or None, not', - posix.stat, support.TESTFN, dir_fd=float(f)) - self.assertRaises(OverflowError, - posix.stat, support.TESTFN, dir_fd=10**20) - finally: - posix.close(f) - - @unittest.skipUnless(os.utime in os.supports_dir_fd, "test needs dir_fd support in os.utime()") - def test_utime_dir_fd(self): - f = posix.open(posix.getcwd(), posix.O_RDONLY) - try: - now = time.time() - posix.utime(support.TESTFN, None, dir_fd=f) - posix.utime(support.TESTFN, dir_fd=f) - self.assertRaises(TypeError, posix.utime, support.TESTFN, now, dir_fd=f) - self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, None), dir_fd=f) - self.assertRaises(TypeError, posix.utime, support.TESTFN, (now, None), dir_fd=f) - self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, now), dir_fd=f) - self.assertRaises(TypeError, posix.utime, support.TESTFN, (now, "x"), dir_fd=f) - posix.utime(support.TESTFN, (int(now), int(now)), dir_fd=f) - posix.utime(support.TESTFN, (now, now), dir_fd=f) - posix.utime(support.TESTFN, - (int(now), int((now - int(now)) * 1e9)), dir_fd=f) - posix.utime(support.TESTFN, dir_fd=f, - times=(int(now), int((now - int(now)) * 1e9))) - - # try dir_fd and follow_symlinks together - if os.utime in os.supports_follow_symlinks: - try: - posix.utime(support.TESTFN, follow_symlinks=False, dir_fd=f) - except ValueError: - # whoops! using both together not supported on this platform. - pass - - finally: - posix.close(f) - - @unittest.skipUnless(os.link in os.supports_dir_fd, "test needs dir_fd support in os.link()") - def test_link_dir_fd(self): - f = posix.open(posix.getcwd(), posix.O_RDONLY) - try: - posix.link(support.TESTFN, support.TESTFN + 'link', src_dir_fd=f, dst_dir_fd=f) - except PermissionError as e: - self.skipTest('posix.link(): %s' % e) - else: - # should have same inodes - self.assertEqual(posix.stat(support.TESTFN)[1], - posix.stat(support.TESTFN + 'link')[1]) - finally: - posix.close(f) - support.unlink(support.TESTFN + 'link') - - @unittest.skipUnless(os.mkdir in os.supports_dir_fd, "test needs dir_fd support in os.mkdir()") - def test_mkdir_dir_fd(self): - f = posix.open(posix.getcwd(), posix.O_RDONLY) - try: - posix.mkdir(support.TESTFN + 'dir', dir_fd=f) - posix.stat(support.TESTFN + 'dir') # should not raise exception - finally: - posix.close(f) - support.rmtree(support.TESTFN + 'dir') - - @unittest.skipUnless((os.mknod in os.supports_dir_fd) and hasattr(stat, 'S_IFIFO'), - "test requires both stat.S_IFIFO and dir_fd support for os.mknod()") - def test_mknod_dir_fd(self): - # Test using mknodat() to create a FIFO (the only use specified - # by POSIX). - support.unlink(support.TESTFN) - mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR - f = posix.open(posix.getcwd(), posix.O_RDONLY) - try: - posix.mknod(support.TESTFN, mode, 0, dir_fd=f) - except OSError as e: - # Some old systems don't allow unprivileged users to use - # mknod(), or only support creating device nodes. - self.assertIn(e.errno, (errno.EPERM, errno.EINVAL, errno.EACCES)) - else: - self.assertTrue(stat.S_ISFIFO(posix.stat(support.TESTFN).st_mode)) - finally: - posix.close(f) - - @unittest.skipUnless(os.open in os.supports_dir_fd, "test needs dir_fd support in os.open()") - def test_open_dir_fd(self): - support.unlink(support.TESTFN) - with open(support.TESTFN, 'w') as outfile: - outfile.write("testline\n") - a = posix.open(posix.getcwd(), posix.O_RDONLY) - b = posix.open(support.TESTFN, posix.O_RDONLY, dir_fd=a) - try: - res = posix.read(b, 9).decode(encoding="utf-8") - self.assertEqual("testline\n", res) - finally: - posix.close(a) - posix.close(b) - - @unittest.skipUnless(os.readlink in os.supports_dir_fd, "test needs dir_fd support in os.readlink()") - def test_readlink_dir_fd(self): - os.symlink(support.TESTFN, support.TESTFN + 'link') - f = posix.open(posix.getcwd(), posix.O_RDONLY) - try: - self.assertEqual(posix.readlink(support.TESTFN + 'link'), - posix.readlink(support.TESTFN + 'link', dir_fd=f)) - finally: - support.unlink(support.TESTFN + 'link') - posix.close(f) - - @unittest.skipUnless(os.rename in os.supports_dir_fd, "test needs dir_fd support in os.rename()") - def test_rename_dir_fd(self): - support.unlink(support.TESTFN) - support.create_empty_file(support.TESTFN + 'ren') - f = posix.open(posix.getcwd(), posix.O_RDONLY) - try: - posix.rename(support.TESTFN + 'ren', support.TESTFN, src_dir_fd=f, dst_dir_fd=f) - except: - posix.rename(support.TESTFN + 'ren', support.TESTFN) - raise - else: - posix.stat(support.TESTFN) # should not raise exception - finally: - posix.close(f) - @unittest.skipUnless(hasattr(signal, 'SIGCHLD'), 'CLD_XXXX be placed in si_code for a SIGCHLD signal') @unittest.skipUnless(hasattr(os, 'waitid_result'), "test needs os.waitid_result") def test_cld_xxxx_constants(self): @@ -1235,45 +1066,6 @@ class PosixTester(unittest.TestCase): os.CLD_STOPPED os.CLD_CONTINUED - @unittest.skipUnless(os.symlink in os.supports_dir_fd, "test needs dir_fd support in os.symlink()") - def test_symlink_dir_fd(self): - f = posix.open(posix.getcwd(), posix.O_RDONLY) - try: - posix.symlink(support.TESTFN, support.TESTFN + 'link', dir_fd=f) - self.assertEqual(posix.readlink(support.TESTFN + 'link'), support.TESTFN) - finally: - posix.close(f) - support.unlink(support.TESTFN + 'link') - - @unittest.skipUnless(os.unlink in os.supports_dir_fd, "test needs dir_fd support in os.unlink()") - def test_unlink_dir_fd(self): - f = posix.open(posix.getcwd(), posix.O_RDONLY) - support.create_empty_file(support.TESTFN + 'del') - posix.stat(support.TESTFN + 'del') # should not raise exception - try: - posix.unlink(support.TESTFN + 'del', dir_fd=f) - except: - support.unlink(support.TESTFN + 'del') - raise - else: - self.assertRaises(OSError, posix.stat, support.TESTFN + 'link') - finally: - posix.close(f) - - @unittest.skipUnless(os.mkfifo in os.supports_dir_fd, "test needs dir_fd support in os.mkfifo()") - def test_mkfifo_dir_fd(self): - support.unlink(support.TESTFN) - f = posix.open(posix.getcwd(), posix.O_RDONLY) - try: - try: - posix.mkfifo(support.TESTFN, - stat.S_IRUSR | stat.S_IWUSR, dir_fd=f) - except PermissionError as e: - self.skipTest('posix.mkfifo(): %s' % e) - self.assertTrue(stat.S_ISFIFO(posix.stat(support.TESTFN).st_mode)) - finally: - posix.close(f) - requires_sched_h = unittest.skipUnless(hasattr(posix, 'sched_yield'), "don't have scheduling support") requires_sched_affinity = unittest.skipUnless(hasattr(posix, 'sched_setaffinity'), @@ -1480,6 +1272,200 @@ class PosixTester(unittest.TestCase): self.assertEqual(cm.exception.errno, errno.EINVAL) os.close(os.pidfd_open(os.getpid(), 0)) + +# tests for the posix *at functions follow +class TestPosixDirFd(unittest.TestCase): + count = 0 + + @contextmanager + def prepare(self): + TestPosixDirFd.count += 1 + name = f'{support.TESTFN}_{self.count}' + base_dir = f'{support.TESTFN}_{self.count}base' + posix.mkdir(base_dir) + self.addCleanup(posix.rmdir, base_dir) + fullname = os.path.join(base_dir, name) + assert not os.path.exists(fullname) + with support.open_dir_fd(base_dir) as dir_fd: + yield (dir_fd, name, fullname) + + @contextmanager + def prepare_file(self): + with self.prepare() as (dir_fd, name, fullname): + support.create_empty_file(fullname) + self.addCleanup(posix.unlink, fullname) + yield (dir_fd, name, fullname) + + @unittest.skipUnless(os.access in os.supports_dir_fd, "test needs dir_fd support for os.access()") + def test_access_dir_fd(self): + with self.prepare_file() as (dir_fd, name, fullname): + self.assertTrue(posix.access(name, os.R_OK, dir_fd=dir_fd)) + + @unittest.skipUnless(os.chmod in os.supports_dir_fd, "test needs dir_fd support in os.chmod()") + def test_chmod_dir_fd(self): + with self.prepare_file() as (dir_fd, name, fullname): + posix.chmod(fullname, stat.S_IRUSR) + posix.chmod(name, stat.S_IRUSR | stat.S_IWUSR, dir_fd=dir_fd) + s = posix.stat(fullname) + self.assertEqual(s.st_mode & stat.S_IRWXU, + stat.S_IRUSR | stat.S_IWUSR) + + @unittest.skipUnless(hasattr(os, 'chown') and (os.chown in os.supports_dir_fd), + "test needs dir_fd support in os.chown()") + def test_chown_dir_fd(self): + with self.prepare_file() as (dir_fd, name, fullname): + posix.chown(name, os.getuid(), os.getgid(), dir_fd=dir_fd) + + @unittest.skipUnless(os.stat in os.supports_dir_fd, "test needs dir_fd support in os.stat()") + def test_stat_dir_fd(self): + with self.prepare() as (dir_fd, name, fullname): + with open(fullname, 'w') as outfile: + outfile.write("testline\n") + self.addCleanup(posix.unlink, fullname) + + s1 = posix.stat(fullname) + s2 = posix.stat(name, dir_fd=dir_fd) + self.assertEqual(s1, s2) + s2 = posix.stat(fullname, dir_fd=None) + self.assertEqual(s1, s2) + + self.assertRaisesRegex(TypeError, 'should be integer or None, not', + posix.stat, name, dir_fd=posix.getcwd()) + self.assertRaisesRegex(TypeError, 'should be integer or None, not', + posix.stat, name, dir_fd=float(dir_fd)) + self.assertRaises(OverflowError, + posix.stat, name, dir_fd=10**20) + + @unittest.skipUnless(os.utime in os.supports_dir_fd, "test needs dir_fd support in os.utime()") + def test_utime_dir_fd(self): + with self.prepare_file() as (dir_fd, name, fullname): + now = time.time() + posix.utime(name, None, dir_fd=dir_fd) + posix.utime(name, dir_fd=dir_fd) + self.assertRaises(TypeError, posix.utime, name, + now, dir_fd=dir_fd) + self.assertRaises(TypeError, posix.utime, name, + (None, None), dir_fd=dir_fd) + self.assertRaises(TypeError, posix.utime, name, + (now, None), dir_fd=dir_fd) + self.assertRaises(TypeError, posix.utime, name, + (None, now), dir_fd=dir_fd) + self.assertRaises(TypeError, posix.utime, name, + (now, "x"), dir_fd=dir_fd) + posix.utime(name, (int(now), int(now)), dir_fd=dir_fd) + posix.utime(name, (now, now), dir_fd=dir_fd) + posix.utime(name, + (int(now), int((now - int(now)) * 1e9)), dir_fd=dir_fd) + posix.utime(name, dir_fd=dir_fd, + times=(int(now), int((now - int(now)) * 1e9))) + + # try dir_fd and follow_symlinks together + if os.utime in os.supports_follow_symlinks: + try: + posix.utime(name, follow_symlinks=False, dir_fd=dir_fd) + except ValueError: + # whoops! using both together not supported on this platform. + pass + + @unittest.skipUnless(os.link in os.supports_dir_fd, "test needs dir_fd support in os.link()") + def test_link_dir_fd(self): + with self.prepare_file() as (dir_fd, name, fullname), \ + self.prepare() as (dir_fd2, linkname, fulllinkname): + try: + posix.link(name, linkname, src_dir_fd=dir_fd, dst_dir_fd=dir_fd2) + except PermissionError as e: + self.skipTest('posix.link(): %s' % e) + self.addCleanup(posix.unlink, fulllinkname) + # should have same inodes + self.assertEqual(posix.stat(fullname)[1], + posix.stat(fulllinkname)[1]) + + @unittest.skipUnless(os.mkdir in os.supports_dir_fd, "test needs dir_fd support in os.mkdir()") + def test_mkdir_dir_fd(self): + with self.prepare() as (dir_fd, name, fullname): + posix.mkdir(name, dir_fd=dir_fd) + self.addCleanup(posix.rmdir, fullname) + posix.stat(fullname) # should not raise exception + + @unittest.skipUnless(hasattr(os, 'mknod') + and (os.mknod in os.supports_dir_fd) + and hasattr(stat, 'S_IFIFO'), + "test requires both stat.S_IFIFO and dir_fd support for os.mknod()") + def test_mknod_dir_fd(self): + # Test using mknodat() to create a FIFO (the only use specified + # by POSIX). + with self.prepare() as (dir_fd, name, fullname): + mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR + try: + posix.mknod(name, mode, 0, dir_fd=dir_fd) + except OSError as e: + # Some old systems don't allow unprivileged users to use + # mknod(), or only support creating device nodes. + self.assertIn(e.errno, (errno.EPERM, errno.EINVAL, errno.EACCES)) + else: + self.addCleanup(posix.unlink, fullname) + self.assertTrue(stat.S_ISFIFO(posix.stat(fullname).st_mode)) + + @unittest.skipUnless(os.open in os.supports_dir_fd, "test needs dir_fd support in os.open()") + def test_open_dir_fd(self): + with self.prepare() as (dir_fd, name, fullname): + with open(fullname, 'wb') as outfile: + outfile.write(b"testline\n") + self.addCleanup(posix.unlink, fullname) + fd = posix.open(name, posix.O_RDONLY, dir_fd=dir_fd) + try: + res = posix.read(fd, 9) + self.assertEqual(b"testline\n", res) + finally: + posix.close(fd) + + @unittest.skipUnless(hasattr(os, 'readlink') and (os.readlink in os.supports_dir_fd), + "test needs dir_fd support in os.readlink()") + def test_readlink_dir_fd(self): + with self.prepare() as (dir_fd, name, fullname): + os.symlink('symlink', fullname) + self.addCleanup(posix.unlink, fullname) + self.assertEqual(posix.readlink(name, dir_fd=dir_fd), 'symlink') + + @unittest.skipUnless(os.rename in os.supports_dir_fd, "test needs dir_fd support in os.rename()") + def test_rename_dir_fd(self): + with self.prepare_file() as (dir_fd, name, fullname), \ + self.prepare() as (dir_fd2, name2, fullname2): + posix.rename(name, name2, + src_dir_fd=dir_fd, dst_dir_fd=dir_fd2) + posix.stat(fullname2) # should not raise exception + posix.rename(fullname2, fullname) + + @unittest.skipUnless(os.symlink in os.supports_dir_fd, "test needs dir_fd support in os.symlink()") + def test_symlink_dir_fd(self): + with self.prepare() as (dir_fd, name, fullname): + posix.symlink('symlink', name, dir_fd=dir_fd) + self.addCleanup(posix.unlink, fullname) + self.assertEqual(posix.readlink(fullname), 'symlink') + + @unittest.skipUnless(os.unlink in os.supports_dir_fd, "test needs dir_fd support in os.unlink()") + def test_unlink_dir_fd(self): + with self.prepare() as (dir_fd, name, fullname): + support.create_empty_file(fullname) + posix.stat(fullname) # should not raise exception + try: + posix.unlink(name, dir_fd=dir_fd) + self.assertRaises(OSError, posix.stat, fullname) + except: + self.addCleanup(posix.unlink, fullname) + raise + + @unittest.skipUnless(os.mkfifo in os.supports_dir_fd, "test needs dir_fd support in os.mkfifo()") + def test_mkfifo_dir_fd(self): + with self.prepare() as (dir_fd, name, fullname): + try: + posix.mkfifo(name, stat.S_IRUSR | stat.S_IWUSR, dir_fd=dir_fd) + except PermissionError as e: + self.skipTest('posix.mkfifo(): %s' % e) + self.addCleanup(posix.unlink, fullname) + self.assertTrue(stat.S_ISFIFO(posix.stat(fullname).st_mode)) + + class PosixGroupsTester(unittest.TestCase): def setUp(self): |