summaryrefslogtreecommitdiffstats
path: root/Lib
diff options
context:
space:
mode:
Diffstat (limited to 'Lib')
-rw-r--r--Lib/os.py42
-rw-r--r--Lib/test/test_os.py31
2 files changed, 55 insertions, 18 deletions
diff --git a/Lib/os.py b/Lib/os.py
index 70857c7..e293eca 100644
--- a/Lib/os.py
+++ b/Lib/os.py
@@ -129,6 +129,7 @@ if _exists("_have_functions"):
_add("HAVE_FCHMOD", "chmod")
_add("HAVE_FCHOWN", "chown")
_add("HAVE_FDOPENDIR", "listdir")
+ _add("HAVE_FDOPENDIR", "scandir")
_add("HAVE_FEXECVE", "execve")
_set.add(stat) # fstat always works
_add("HAVE_FTRUNCATE", "truncate")
@@ -416,7 +417,7 @@ def walk(top, topdown=True, onerror=None, followlinks=False):
__all__.append("walk")
-if {open, stat} <= supports_dir_fd and {listdir, stat} <= supports_fd:
+if {open, stat} <= supports_dir_fd and {scandir, stat} <= supports_fd:
def fwalk(top=".", topdown=True, onerror=None, *, follow_symlinks=False, dir_fd=None):
"""Directory tree generator.
@@ -455,7 +456,8 @@ if {open, stat} <= supports_dir_fd and {listdir, stat} <= supports_fd:
top = fspath(top)
# Note: To guard against symlink races, we use the standard
# lstat()/open()/fstat() trick.
- orig_st = stat(top, follow_symlinks=False, dir_fd=dir_fd)
+ if not follow_symlinks:
+ orig_st = stat(top, follow_symlinks=False, dir_fd=dir_fd)
topfd = open(top, O_RDONLY, dir_fd=dir_fd)
try:
if (follow_symlinks or (st.S_ISDIR(orig_st.st_mode) and
@@ -470,35 +472,41 @@ if {open, stat} <= supports_dir_fd and {listdir, stat} <= supports_fd:
# necessary, it can be adapted to only require O(1) FDs, see issue
# #13734.
- names = listdir(topfd)
- if isbytes:
- names = map(fsencode, names)
- dirs, nondirs = [], []
- for name in names:
+ scandir_it = scandir(topfd)
+ dirs = []
+ nondirs = []
+ entries = None if topdown or follow_symlinks else []
+ for entry in scandir_it:
+ name = entry.name
+ if isbytes:
+ name = fsencode(name)
try:
- # Here, we don't use AT_SYMLINK_NOFOLLOW to be consistent with
- # walk() which reports symlinks to directories as directories.
- # We do however check for symlinks before recursing into
- # a subdirectory.
- if st.S_ISDIR(stat(name, dir_fd=topfd).st_mode):
+ if entry.is_dir():
dirs.append(name)
+ if entries is not None:
+ entries.append(entry)
else:
nondirs.append(name)
except OSError:
try:
# Add dangling symlinks, ignore disappeared files
- if st.S_ISLNK(stat(name, dir_fd=topfd, follow_symlinks=False)
- .st_mode):
+ if entry.is_symlink():
nondirs.append(name)
except OSError:
- continue
+ pass
if topdown:
yield toppath, dirs, nondirs, topfd
- for name in dirs:
+ for name in dirs if entries is None else zip(dirs, entries):
try:
- orig_st = stat(name, dir_fd=topfd, follow_symlinks=follow_symlinks)
+ if not follow_symlinks:
+ if topdown:
+ orig_st = stat(name, dir_fd=topfd, follow_symlinks=False)
+ else:
+ assert entries is not None
+ name, entry = name
+ orig_st = entry.stat(follow_symlinks=False)
dirfd = open(name, O_RDONLY, dir_fd=topfd)
except OSError as err:
if onerror is not None:
diff --git a/Lib/test/test_os.py b/Lib/test/test_os.py
index 83932e6..746b3f8 100644
--- a/Lib/test/test_os.py
+++ b/Lib/test/test_os.py
@@ -3313,6 +3313,35 @@ class TestScandir(unittest.TestCase):
self.assertEqual(entry.path,
os.fsencode(os.path.join(self.path, 'file.txt')))
+ @unittest.skipUnless(os.listdir in os.supports_fd,
+ 'fd support for listdir required for this test.')
+ def test_fd(self):
+ self.assertIn(os.scandir, os.supports_fd)
+ self.create_file('file.txt')
+ expected_names = ['file.txt']
+ if support.can_symlink():
+ os.symlink('file.txt', os.path.join(self.path, 'link'))
+ expected_names.append('link')
+
+ fd = os.open(self.path, os.O_RDONLY)
+ try:
+ with os.scandir(fd) as it:
+ entries = list(it)
+ names = [entry.name for entry in entries]
+ self.assertEqual(sorted(names), expected_names)
+ self.assertEqual(names, os.listdir(fd))
+ for entry in entries:
+ self.assertEqual(entry.path, entry.name)
+ self.assertEqual(os.fspath(entry), entry.name)
+ self.assertEqual(entry.is_symlink(), entry.name == 'link')
+ if os.stat in os.supports_dir_fd:
+ st = os.stat(entry.name, dir_fd=fd)
+ self.assertEqual(entry.stat(), st)
+ st = os.stat(entry.name, dir_fd=fd, follow_symlinks=False)
+ self.assertEqual(entry.stat(follow_symlinks=False), st)
+ finally:
+ os.close(fd)
+
def test_empty_path(self):
self.assertRaises(FileNotFoundError, os.scandir, '')
@@ -3328,7 +3357,7 @@ class TestScandir(unittest.TestCase):
self.assertEqual(len(entries2), 0, entries2)
def test_bad_path_type(self):
- for obj in [1234, 1.234, {}, []]:
+ for obj in [1.234, {}, []]:
self.assertRaises(TypeError, os.scandir, obj)
def test_close(self):