diff options
Diffstat (limited to 'Lib')
-rw-r--r-- | Lib/os.py | 42 | ||||
-rw-r--r-- | Lib/test/test_os.py | 31 |
2 files changed, 55 insertions, 18 deletions
@@ -129,6 +129,7 @@ if _exists("_have_functions"): _add("HAVE_FCHMOD", "chmod") _add("HAVE_FCHOWN", "chown") _add("HAVE_FDOPENDIR", "listdir") + _add("HAVE_FDOPENDIR", "scandir") _add("HAVE_FEXECVE", "execve") _set.add(stat) # fstat always works _add("HAVE_FTRUNCATE", "truncate") @@ -416,7 +417,7 @@ def walk(top, topdown=True, onerror=None, followlinks=False): __all__.append("walk") -if {open, stat} <= supports_dir_fd and {listdir, stat} <= supports_fd: +if {open, stat} <= supports_dir_fd and {scandir, stat} <= supports_fd: def fwalk(top=".", topdown=True, onerror=None, *, follow_symlinks=False, dir_fd=None): """Directory tree generator. @@ -455,7 +456,8 @@ if {open, stat} <= supports_dir_fd and {listdir, stat} <= supports_fd: top = fspath(top) # Note: To guard against symlink races, we use the standard # lstat()/open()/fstat() trick. - orig_st = stat(top, follow_symlinks=False, dir_fd=dir_fd) + if not follow_symlinks: + orig_st = stat(top, follow_symlinks=False, dir_fd=dir_fd) topfd = open(top, O_RDONLY, dir_fd=dir_fd) try: if (follow_symlinks or (st.S_ISDIR(orig_st.st_mode) and @@ -470,35 +472,41 @@ if {open, stat} <= supports_dir_fd and {listdir, stat} <= supports_fd: # necessary, it can be adapted to only require O(1) FDs, see issue # #13734. - names = listdir(topfd) - if isbytes: - names = map(fsencode, names) - dirs, nondirs = [], [] - for name in names: + scandir_it = scandir(topfd) + dirs = [] + nondirs = [] + entries = None if topdown or follow_symlinks else [] + for entry in scandir_it: + name = entry.name + if isbytes: + name = fsencode(name) try: - # Here, we don't use AT_SYMLINK_NOFOLLOW to be consistent with - # walk() which reports symlinks to directories as directories. - # We do however check for symlinks before recursing into - # a subdirectory. - if st.S_ISDIR(stat(name, dir_fd=topfd).st_mode): + if entry.is_dir(): dirs.append(name) + if entries is not None: + entries.append(entry) else: nondirs.append(name) except OSError: try: # Add dangling symlinks, ignore disappeared files - if st.S_ISLNK(stat(name, dir_fd=topfd, follow_symlinks=False) - .st_mode): + if entry.is_symlink(): nondirs.append(name) except OSError: - continue + pass if topdown: yield toppath, dirs, nondirs, topfd - for name in dirs: + for name in dirs if entries is None else zip(dirs, entries): try: - orig_st = stat(name, dir_fd=topfd, follow_symlinks=follow_symlinks) + if not follow_symlinks: + if topdown: + orig_st = stat(name, dir_fd=topfd, follow_symlinks=False) + else: + assert entries is not None + name, entry = name + orig_st = entry.stat(follow_symlinks=False) dirfd = open(name, O_RDONLY, dir_fd=topfd) except OSError as err: if onerror is not None: diff --git a/Lib/test/test_os.py b/Lib/test/test_os.py index 83932e6..746b3f8 100644 --- a/Lib/test/test_os.py +++ b/Lib/test/test_os.py @@ -3313,6 +3313,35 @@ class TestScandir(unittest.TestCase): self.assertEqual(entry.path, os.fsencode(os.path.join(self.path, 'file.txt'))) + @unittest.skipUnless(os.listdir in os.supports_fd, + 'fd support for listdir required for this test.') + def test_fd(self): + self.assertIn(os.scandir, os.supports_fd) + self.create_file('file.txt') + expected_names = ['file.txt'] + if support.can_symlink(): + os.symlink('file.txt', os.path.join(self.path, 'link')) + expected_names.append('link') + + fd = os.open(self.path, os.O_RDONLY) + try: + with os.scandir(fd) as it: + entries = list(it) + names = [entry.name for entry in entries] + self.assertEqual(sorted(names), expected_names) + self.assertEqual(names, os.listdir(fd)) + for entry in entries: + self.assertEqual(entry.path, entry.name) + self.assertEqual(os.fspath(entry), entry.name) + self.assertEqual(entry.is_symlink(), entry.name == 'link') + if os.stat in os.supports_dir_fd: + st = os.stat(entry.name, dir_fd=fd) + self.assertEqual(entry.stat(), st) + st = os.stat(entry.name, dir_fd=fd, follow_symlinks=False) + self.assertEqual(entry.stat(follow_symlinks=False), st) + finally: + os.close(fd) + def test_empty_path(self): self.assertRaises(FileNotFoundError, os.scandir, '') @@ -3328,7 +3357,7 @@ class TestScandir(unittest.TestCase): self.assertEqual(len(entries2), 0, entries2) def test_bad_path_type(self): - for obj in [1234, 1.234, {}, []]: + for obj in [1.234, {}, []]: self.assertRaises(TypeError, os.scandir, obj) def test_close(self): |