summaryrefslogtreecommitdiffstats
path: root/Lib
diff options
context:
space:
mode:
Diffstat (limited to 'Lib')
-rw-r--r--Lib/os.py4
-rw-r--r--Lib/shutil.py4
-rw-r--r--Lib/test/test_glob.py12
-rw-r--r--Lib/test/test_os.py82
-rw-r--r--Lib/test/test_shutil.py17
5 files changed, 111 insertions, 8 deletions
diff --git a/Lib/os.py b/Lib/os.py
index 598c9e5..7ee7d69 100644
--- a/Lib/os.py
+++ b/Lib/os.py
@@ -473,7 +473,7 @@ if {open, stat} <= supports_dir_fd and {scandir, stat} <= supports_fd:
# lstat()/open()/fstat() trick.
if not follow_symlinks:
orig_st = stat(top, follow_symlinks=False, dir_fd=dir_fd)
- topfd = open(top, O_RDONLY, dir_fd=dir_fd)
+ topfd = open(top, O_RDONLY | O_NONBLOCK, dir_fd=dir_fd)
try:
if (follow_symlinks or (st.S_ISDIR(orig_st.st_mode) and
path.samestat(orig_st, stat(topfd)))):
@@ -522,7 +522,7 @@ if {open, stat} <= supports_dir_fd and {scandir, stat} <= supports_fd:
assert entries is not None
name, entry = name
orig_st = entry.stat(follow_symlinks=False)
- dirfd = open(name, O_RDONLY, dir_fd=topfd)
+ dirfd = open(name, O_RDONLY | O_NONBLOCK, dir_fd=topfd)
except OSError as err:
if onerror is not None:
onerror(err)
diff --git a/Lib/shutil.py b/Lib/shutil.py
index 9646300..3a2b6be 100644
--- a/Lib/shutil.py
+++ b/Lib/shutil.py
@@ -676,7 +676,7 @@ def _rmtree_safe_fd(topfd, path, onexc):
continue
if is_dir:
try:
- dirfd = os.open(entry.name, os.O_RDONLY, dir_fd=topfd)
+ dirfd = os.open(entry.name, os.O_RDONLY | os.O_NONBLOCK, dir_fd=topfd)
dirfd_closed = False
except OSError as err:
onexc(os.open, fullname, err)
@@ -775,7 +775,7 @@ def rmtree(path, ignore_errors=False, onerror=None, *, onexc=None, dir_fd=None):
onexc(os.lstat, path, err)
return
try:
- fd = os.open(path, os.O_RDONLY, dir_fd=dir_fd)
+ fd = os.open(path, os.O_RDONLY | os.O_NONBLOCK, dir_fd=dir_fd)
fd_closed = False
except Exception as err:
onexc(os.open, path, err)
diff --git a/Lib/test/test_glob.py b/Lib/test/test_glob.py
index 4f4649f..faf5bd8 100644
--- a/Lib/test/test_glob.py
+++ b/Lib/test/test_glob.py
@@ -343,6 +343,18 @@ class GlobTests(unittest.TestCase):
eq(self.rglob('nonexistent', '*'), [])
eq(self.rglob('nonexistent', '**'), [])
+ @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
+ @unittest.skipIf(sys.platform == "vxworks",
+ "fifo requires special path on VxWorks")
+ def test_glob_named_pipe(self):
+ path = os.path.join(self.tempdir, 'mypipe')
+ os.mkfifo(path)
+ self.assertEqual(self.rglob('mypipe'), [path])
+ self.assertEqual(self.rglob('mypipe*'), [path])
+ self.assertEqual(self.rglob('mypipe', ''), [])
+ self.assertEqual(self.rglob('mypipe', 'sub'), [])
+ self.assertEqual(self.rglob('mypipe', '*'), [])
+
def test_glob_many_open_files(self):
depth = 30
base = os.path.join(self.tempdir, 'deep')
diff --git a/Lib/test/test_os.py b/Lib/test/test_os.py
index ad6a607..46aec21 100644
--- a/Lib/test/test_os.py
+++ b/Lib/test/test_os.py
@@ -1290,6 +1290,7 @@ class EnvironTests(mapping_tests.BasicTestMappingProtocol):
class WalkTests(unittest.TestCase):
"""Tests for os.walk()."""
+ is_fwalk = False
# Wrapper to hide minor differences between os.walk and os.fwalk
# to tests both functions with the same code base
@@ -1324,14 +1325,14 @@ class WalkTests(unittest.TestCase):
self.sub11_path = join(self.sub1_path, "SUB11")
sub2_path = join(self.walk_path, "SUB2")
sub21_path = join(sub2_path, "SUB21")
- tmp1_path = join(self.walk_path, "tmp1")
+ self.tmp1_path = join(self.walk_path, "tmp1")
tmp2_path = join(self.sub1_path, "tmp2")
tmp3_path = join(sub2_path, "tmp3")
tmp5_path = join(sub21_path, "tmp3")
self.link_path = join(sub2_path, "link")
t2_path = join(os_helper.TESTFN, "TEST2")
tmp4_path = join(os_helper.TESTFN, "TEST2", "tmp4")
- broken_link_path = join(sub2_path, "broken_link")
+ self.broken_link_path = join(sub2_path, "broken_link")
broken_link2_path = join(sub2_path, "broken_link2")
broken_link3_path = join(sub2_path, "broken_link3")
@@ -1341,13 +1342,13 @@ class WalkTests(unittest.TestCase):
os.makedirs(sub21_path)
os.makedirs(t2_path)
- for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path, tmp5_path:
+ for path in self.tmp1_path, tmp2_path, tmp3_path, tmp4_path, tmp5_path:
with open(path, "x", encoding='utf-8') as f:
f.write("I'm " + path + " and proud of it. Blame test_os.\n")
if os_helper.can_symlink():
os.symlink(os.path.abspath(t2_path), self.link_path)
- os.symlink('broken', broken_link_path, True)
+ os.symlink('broken', self.broken_link_path, True)
os.symlink(join('tmp3', 'broken'), broken_link2_path, True)
os.symlink(join('SUB21', 'tmp5'), broken_link3_path, True)
self.sub2_tree = (sub2_path, ["SUB21", "link"],
@@ -1443,6 +1444,11 @@ class WalkTests(unittest.TestCase):
else:
self.fail("Didn't follow symlink with followlinks=True")
+ walk_it = self.walk(self.broken_link_path, follow_symlinks=True)
+ if self.is_fwalk:
+ self.assertRaises(FileNotFoundError, next, walk_it)
+ self.assertRaises(StopIteration, next, walk_it)
+
def test_walk_bad_dir(self):
# Walk top-down.
errors = []
@@ -1464,6 +1470,73 @@ class WalkTests(unittest.TestCase):
finally:
os.rename(path1new, path1)
+ def test_walk_bad_dir2(self):
+ walk_it = self.walk('nonexisting')
+ if self.is_fwalk:
+ self.assertRaises(FileNotFoundError, next, walk_it)
+ self.assertRaises(StopIteration, next, walk_it)
+
+ walk_it = self.walk('nonexisting', follow_symlinks=True)
+ if self.is_fwalk:
+ self.assertRaises(FileNotFoundError, next, walk_it)
+ self.assertRaises(StopIteration, next, walk_it)
+
+ walk_it = self.walk(self.tmp1_path)
+ self.assertRaises(StopIteration, next, walk_it)
+
+ walk_it = self.walk(self.tmp1_path, follow_symlinks=True)
+ if self.is_fwalk:
+ self.assertRaises(NotADirectoryError, next, walk_it)
+ self.assertRaises(StopIteration, next, walk_it)
+
+ @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
+ @unittest.skipIf(sys.platform == "vxworks",
+ "fifo requires special path on VxWorks")
+ def test_walk_named_pipe(self):
+ path = os_helper.TESTFN + '-pipe'
+ os.mkfifo(path)
+ self.addCleanup(os.unlink, path)
+
+ walk_it = self.walk(path)
+ self.assertRaises(StopIteration, next, walk_it)
+
+ walk_it = self.walk(path, follow_symlinks=True)
+ if self.is_fwalk:
+ self.assertRaises(NotADirectoryError, next, walk_it)
+ self.assertRaises(StopIteration, next, walk_it)
+
+ @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
+ @unittest.skipIf(sys.platform == "vxworks",
+ "fifo requires special path on VxWorks")
+ def test_walk_named_pipe2(self):
+ path = os_helper.TESTFN + '-dir'
+ os.mkdir(path)
+ self.addCleanup(shutil.rmtree, path)
+ os.mkfifo(os.path.join(path, 'mypipe'))
+
+ errors = []
+ walk_it = self.walk(path, onerror=errors.append)
+ next(walk_it)
+ self.assertRaises(StopIteration, next, walk_it)
+ self.assertEqual(errors, [])
+
+ errors = []
+ walk_it = self.walk(path, onerror=errors.append)
+ root, dirs, files = next(walk_it)
+ self.assertEqual(root, path)
+ self.assertEqual(dirs, [])
+ self.assertEqual(files, ['mypipe'])
+ dirs.extend(files)
+ files.clear()
+ if self.is_fwalk:
+ self.assertRaises(NotADirectoryError, next, walk_it)
+ self.assertRaises(StopIteration, next, walk_it)
+ if self.is_fwalk:
+ self.assertEqual(errors, [])
+ else:
+ self.assertEqual(len(errors), 1, errors)
+ self.assertIsInstance(errors[0], NotADirectoryError)
+
def test_walk_many_open_files(self):
depth = 30
base = os.path.join(os_helper.TESTFN, 'deep')
@@ -1529,6 +1602,7 @@ class WalkTests(unittest.TestCase):
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class FwalkTests(WalkTests):
"""Tests for os.fwalk()."""
+ is_fwalk = True
def walk(self, top, **kwargs):
for root, dirs, files, root_fd in self.fwalk(top, **kwargs):
diff --git a/Lib/test/test_shutil.py b/Lib/test/test_shutil.py
index bf60f37..49fcd78 100644
--- a/Lib/test/test_shutil.py
+++ b/Lib/test/test_shutil.py
@@ -669,6 +669,23 @@ class TestRmTree(BaseTest, unittest.TestCase):
finally:
shutil.rmtree(TESTFN, ignore_errors=True)
+ @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
+ @unittest.skipIf(sys.platform == "vxworks",
+ "fifo requires special path on VxWorks")
+ def test_rmtree_on_named_pipe(self):
+ os.mkfifo(TESTFN)
+ try:
+ with self.assertRaises(NotADirectoryError):
+ shutil.rmtree(TESTFN)
+ self.assertTrue(os.path.exists(TESTFN))
+ finally:
+ os.unlink(TESTFN)
+
+ os.mkdir(TESTFN)
+ os.mkfifo(os.path.join(TESTFN, 'mypipe'))
+ shutil.rmtree(TESTFN)
+ self.assertFalse(os.path.exists(TESTFN))
+
class TestCopyTree(BaseTest, unittest.TestCase):