diff options
author | Larry Hastings <larry@hastings.org> | 2012-06-22 23:30:09 (GMT) |
---|---|---|
committer | Larry Hastings <larry@hastings.org> | 2012-06-22 23:30:09 (GMT) |
commit | 9cf065cfdc4245ea7e31edcb2e6ede0cea47d148 (patch) | |
tree | 22d8450865a023586034d555ae72e3753f95e84c /Lib | |
parent | f0f4742b495554238d1204ce0002c1ef1ba23507 (diff) | |
download | cpython-9cf065cfdc4245ea7e31edcb2e6ede0cea47d148.zip cpython-9cf065cfdc4245ea7e31edcb2e6ede0cea47d148.tar.gz cpython-9cf065cfdc4245ea7e31edcb2e6ede0cea47d148.tar.bz2 |
Issue #14626: Large refactoring of functions / parameters in the os module.
Many functions now support "dir_fd" and "follow_symlinks" parameters;
some also support accepting an open file descriptor in place of of a path
string. Added os.support_* collections as LBYL helpers. Removed many
functions only previously seen in 3.3 alpha releases (often starting with
"f" or "l", or ending with "at"). Originally suggested by Serhiy Storchaka;
implemented by Larry Hastings.
Diffstat (limited to 'Lib')
-rw-r--r-- | Lib/os.py | 117 | ||||
-rw-r--r-- | Lib/shutil.py | 63 | ||||
-rw-r--r-- | Lib/test/support.py | 4 | ||||
-rw-r--r-- | Lib/test/test_os.py | 168 | ||||
-rw-r--r-- | Lib/test/test_posix.py | 224 | ||||
-rw-r--r-- | Lib/test/test_shutil.py | 12 |
6 files changed, 340 insertions, 248 deletions
@@ -56,9 +56,10 @@ if 'posix' in _names: pass import posixpath as path - import posix - __all__.extend(_get_exports_list(posix)) - del posix + try: + from posix import _have_functions + except ImportError: + pass elif 'nt' in _names: name = 'nt' @@ -75,6 +76,11 @@ elif 'nt' in _names: __all__.extend(_get_exports_list(nt)) del nt + try: + from nt import _have_functions + except ImportError: + pass + elif 'os2' in _names: name = 'os2' linesep = '\r\n' @@ -94,6 +100,11 @@ elif 'os2' in _names: __all__.extend(_get_exports_list(os2)) del os2 + try: + from os2 import _have_functions + except ImportError: + pass + elif 'ce' in _names: name = 'ce' linesep = '\r\n' @@ -110,6 +121,11 @@ elif 'ce' in _names: __all__.extend(_get_exports_list(ce)) del ce + try: + from ce import _have_functions + except ImportError: + pass + else: raise ImportError('no os specific module found') @@ -119,6 +135,84 @@ from os.path import (curdir, pardir, sep, pathsep, defpath, extsep, altsep, del _names + +if _exists("_have_functions"): + _globals = globals() + def _add(str, fn): + if (fn in _globals) and (str in _have_functions): + _set.add(_globals[fn]) + + _set = set() + _add("HAVE_FACCESSAT", "access") + _add("HAVE_FCHMODAT", "chmod") + _add("HAVE_FCHOWNAT", "chown") + _add("HAVE_FSTATAT", "stat") + _add("HAVE_FUTIMESAT", "utime") + _add("HAVE_LINKAT", "link") + _add("HAVE_MKDIRAT", "mkdir") + _add("HAVE_MKFIFOAT", "mkfifo") + _add("HAVE_MKNODAT", "mknod") + _add("HAVE_OPENAT", "open") + _add("HAVE_READLINKAT", "readlink") + _add("HAVE_RENAMEAT", "rename") + _add("HAVE_SYMLINKAT", "symlink") + _add("HAVE_UNLINKAT", "unlink") + _add("HAVE_UTIMENSAT", "utime") + supports_dir_fd = _set + + _set = set() + _add("HAVE_FACCESSAT", "access") + supports_effective_ids = _set + + _set = set() + _add("HAVE_FCHDIR", "chdir") + _add("HAVE_FCHMOD", "chmod") + _add("HAVE_FCHOWN", "chown") + _add("HAVE_FDOPENDIR", "listdir") + _add("HAVE_FEXECVE", "execve") + _set.add(stat) # fstat always works + _add("HAVE_FUTIMENS", "utime") + _add("HAVE_FUTIMES", "utime") + if _exists("statvfs") and _exists("fstatvfs"): # mac os x10.3 + _add("HAVE_FSTATVFS", "statvfs") + supports_fd = _set + + _set = set() + _add("HAVE_FACCESSAT", "access") + # Current linux (kernel 3.2, glibc 2.15) doesn't support lchmod. + # (The function exists, but it's a stub that always returns ENOSUP.) + # Now, linux *does* have fchmodat, which says it can ignore + # symbolic links. But that doesn't work either (also returns ENOSUP). + # I'm guessing that if they fix fchmodat, they'll also add lchmod at + # the same time. So, for now, assume that fchmodat doesn't support + # follow_symlinks unless lchmod works. + if ((sys.platform != "linux") or + ("HAVE_LCHMOD" in _have_functions)): + _add("HAVE_FCHMODAT", "chmod") + _add("HAVE_FCHOWNAT", "chown") + _add("HAVE_FSTATAT", "stat") + _add("HAVE_LCHFLAGS", "chflags") + _add("HAVE_LCHMOD", "chmod") + if _exists("lchown"): # mac os x10.3 + _add("HAVE_LCHOWN", "chown") + _add("HAVE_LINKAT", "link") + _add("HAVE_LUTIMES", "utime") + _add("HAVE_LSTAT", "stat") + _add("HAVE_FSTATAT", "stat") + _add("HAVE_UTIMENSAT", "utime") + _add("MS_WINDOWS", "stat") + supports_follow_symlinks = _set + + _set = set() + _add("HAVE_UNLINKAT", "unlink") + supports_remove_directory = _set + + del _set + del _have_functions + del _globals + del _add + + # Python uses fixed values for the SEEK_ constants; they are mapped # to native constants if necessary in posixmodule.c # Other possible SEEK values are directly imported from posixmodule.c @@ -318,7 +412,7 @@ def walk(top, topdown=True, onerror=None, followlinks=False): __all__.append("walk") -if _exists("openat"): +if open in supports_dir_fd: def fwalk(top, topdown=True, onerror=None, followlinks=False): """Directory tree generator. @@ -343,7 +437,7 @@ if _exists("openat"): import os for root, dirs, files, rootfd in os.fwalk('python/Lib/email'): print(root, "consumes", end="") - print(sum([os.fstatat(rootfd, name).st_size for name in files]), + print(sum([os.stat(name, dir_fd=rootfd).st_size for name in files]), end="") print("bytes in", len(files), "non-directory files") if 'CVS' in dirs: @@ -365,10 +459,7 @@ if _exists("openat"): # necessary, it can be adapted to only require O(1) FDs, see issue # #13734. - # whether to follow symlinks - flag = 0 if followlinks else AT_SYMLINK_NOFOLLOW - - names = flistdir(topfd) + names = listdir(topfd) dirs, nondirs = [], [] for name in names: try: @@ -376,14 +467,14 @@ if _exists("openat"): # walk() which reports symlinks to directories as directories. # We do however check for symlinks before recursing into # a subdirectory. - if st.S_ISDIR(fstatat(topfd, name).st_mode): + if st.S_ISDIR(stat(name, dir_fd=topfd).st_mode): dirs.append(name) else: nondirs.append(name) except FileNotFoundError: try: # Add dangling symlinks, ignore disappeared files - if st.S_ISLNK(fstatat(topfd, name, AT_SYMLINK_NOFOLLOW) + if st.S_ISLNK(stat(name, dir_fd=topfd, follow_symlinks=False) .st_mode): nondirs.append(name) except FileNotFoundError: @@ -394,8 +485,8 @@ if _exists("openat"): for name in dirs: try: - orig_st = fstatat(topfd, name, flag) - dirfd = openat(topfd, name, O_RDONLY) + orig_st = stat(name, dir_fd=topfd, follow_symlinks=followlinks) + dirfd = open(name, O_RDONLY, dir_fd=topfd) except error as err: if onerror is not None: onerror(err) diff --git a/Lib/shutil.py b/Lib/shutil.py index 8da46d1..401e9ea 100644 --- a/Lib/shutil.py +++ b/Lib/shutil.py @@ -139,27 +139,45 @@ def copystat(src, dst, symlinks=False): only if both `src` and `dst` are symlinks. """ - def _nop(*args, ns=None): + def _nop(*args, ns=None, follow_symlinks=None): pass - if symlinks and os.path.islink(src) and os.path.islink(dst): - stat_func = os.lstat - utime_func = os.lutimes if hasattr(os, 'lutimes') else _nop - chmod_func = os.lchmod if hasattr(os, 'lchmod') else _nop - chflags_func = os.lchflags if hasattr(os, 'lchflags') else _nop + # follow symlinks (aka don't not follow symlinks) + follow = not (symlinks and os.path.islink(src) and os.path.islink(dst)) + if follow: + # use the real function if it exists + def lookup(name): + return getattr(os, name, _nop) else: - stat_func = os.stat - utime_func = os.utime if hasattr(os, 'utime') else _nop - chmod_func = os.chmod if hasattr(os, 'chmod') else _nop - chflags_func = os.chflags if hasattr(os, 'chflags') else _nop - - st = stat_func(src) + # use the real function only if it exists + # *and* it supports follow_symlinks + def lookup(name): + fn = getattr(os, name, _nop) + if fn in os.supports_follow_symlinks: + return fn + return _nop + + st = lookup("stat")(src, follow_symlinks=follow) mode = stat.S_IMODE(st.st_mode) - utime_func(dst, ns=(st.st_atime_ns, st.st_mtime_ns)) - chmod_func(dst, mode) + lookup("utime")(dst, ns=(st.st_atime_ns, st.st_mtime_ns), + follow_symlinks=follow) + try: + lookup("chmod")(dst, mode, follow_symlinks=follow) + except NotImplementedError: + # if we got a NotImplementedError, it's because + # * follow_symlinks=False, + # * lchown() is unavailable, and + # * either + # * fchownat() is unvailable or + # * fchownat() doesn't implement AT_SYMLINK_NOFOLLOW. + # (it returned ENOSUP.) + # therefore we're out of options--we simply cannot chown the + # symlink. give up, suppress the error. + # (which is what shutil always did in this circumstance.) + pass if hasattr(st, 'st_flags'): try: - chflags_func(dst, st.st_flags) + lookup("chflags")(dst, st.st_flags, follow_symlinks=follow) except OSError as why: for err in 'EOPNOTSUPP', 'ENOTSUP': if hasattr(errno, err) and why.errno == getattr(errno, err): @@ -176,20 +194,11 @@ if hasattr(os, 'listxattr'): If the optional flag `symlinks` is set, symlinks won't be followed. """ - if symlinks: - listxattr = os.llistxattr - removexattr = os.lremovexattr - setxattr = os.lsetxattr - getxattr = os.lgetxattr - else: - listxattr = os.listxattr - removexattr = os.removexattr - setxattr = os.setxattr - getxattr = os.getxattr - for attr in listxattr(src): + for name in os.listxattr(src, follow_symlinks=symlinks): try: - setxattr(dst, attr, getxattr(src, attr)) + value = os.getxattr(src, name, follow_symlinks=symlinks) + os.setxattr(dst, name, value, follow_symlinks=symlinks) except OSError as e: if e.errno not in (errno.EPERM, errno.ENOTSUP, errno.ENODATA): raise diff --git a/Lib/test/support.py b/Lib/test/support.py index 3ff1df5..ddd3ab6 100644 --- a/Lib/test/support.py +++ b/Lib/test/support.py @@ -1703,8 +1703,8 @@ def can_xattr(): try: # TESTFN & tempfile may use different file systems with # different capabilities - os.fsetxattr(tmp_fp, b"user.test", b"") - os.fsetxattr(fp.fileno(), b"user.test", b"") + os.setxattr(tmp_fp, b"user.test", b"") + os.setxattr(fp.fileno(), b"user.test", b"") # Kernels < 2.6.39 don't respect setxattr flags. kernel_version = platform.release() m = re.match("2.6.(\d{1,2})", kernel_version) diff --git a/Lib/test/test_os.py b/Lib/test/test_os.py index d47c8d3..cc1120f 100644 --- a/Lib/test/test_os.py +++ b/Lib/test/test_os.py @@ -345,40 +345,36 @@ class StatAttributeTests(unittest.TestCase): return os.utime(file, ns=times) self._test_utime_ns(utime_ns) - requires_lutimes = unittest.skipUnless(hasattr(os, 'lutimes'), - "os.lutimes required for this test.") - requires_futimes = unittest.skipUnless(hasattr(os, 'futimes'), - "os.futimes required for this test.") - - @requires_lutimes + requires_utime_dir_fd = unittest.skipUnless( + os.utime in os.supports_dir_fd, + "dir_fd support for utime required for this test.") + requires_utime_fd = unittest.skipUnless( + os.utime in os.supports_fd, + "fd support for utime required for this test.") + requires_utime_nofollow_symlinks = unittest.skipUnless( + os.utime in os.supports_follow_symlinks, + "follow_symlinks support for utime required for this test.") + + @requires_utime_nofollow_symlinks def test_lutimes_ns(self): def lutimes_ns(file, times): - return os.lutimes(file, ns=times) + return os.utime(file, ns=times, follow_symlinks=False) self._test_utime_ns(lutimes_ns) - @requires_futimes + @requires_utime_fd def test_futimes_ns(self): def futimes_ns(file, times): with open(file, "wb") as f: - os.futimes(f.fileno(), ns=times) + os.utime(f.fileno(), ns=times) self._test_utime_ns(futimes_ns, test_dir=False) def _utime_invalid_arguments(self, name, arg): - with self.assertRaises(RuntimeError): + with self.assertRaises(ValueError): getattr(os, name)(arg, (5, 5), ns=(5, 5)) def test_utime_invalid_arguments(self): self._utime_invalid_arguments('utime', self.fname) - @requires_lutimes - def test_lutimes_invalid_arguments(self): - self._utime_invalid_arguments('lutimes', self.fname) - - @requires_futimes - def test_futimes_invalid_arguments(self): - with open(self.fname, "wb") as f: - self._utime_invalid_arguments('futimes', f.fileno()) - @unittest.skipUnless(stat_supports_subsecond, "os.stat() doesn't has a subsecond resolution") @@ -402,64 +398,46 @@ class StatAttributeTests(unittest.TestCase): os.utime(filename, (atime, mtime)) self._test_utime_subsecond(set_time) - @requires_futimes + @requires_utime_fd def test_futimes_subsecond(self): def set_time(filename, atime, mtime): with open(filename, "wb") as f: - os.futimes(f.fileno(), (atime, mtime)) + os.utime(f.fileno(), times=(atime, mtime)) self._test_utime_subsecond(set_time) - @unittest.skipUnless(hasattr(os, 'futimens'), - "os.futimens required for this test.") + @requires_utime_fd def test_futimens_subsecond(self): def set_time(filename, atime, mtime): with open(filename, "wb") as f: - asec, ansec = divmod(atime, 1.0) - asec = int(asec) - ansec = int(ansec * 1e9) - msec, mnsec = divmod(mtime, 1.0) - msec = int(msec) - mnsec = int(mnsec * 1e9) - os.futimens(f.fileno(), - (asec, ansec), - (msec, mnsec)) + os.utime(f.fileno(), times=(atime, mtime)) self._test_utime_subsecond(set_time) - @unittest.skipUnless(hasattr(os, 'futimesat'), - "os.futimesat required for this test.") + @requires_utime_dir_fd def test_futimesat_subsecond(self): def set_time(filename, atime, mtime): dirname = os.path.dirname(filename) dirfd = os.open(dirname, os.O_RDONLY) try: - os.futimesat(dirfd, os.path.basename(filename), - (atime, mtime)) + os.utime(os.path.basename(filename), dir_fd=dirfd, + times=(atime, mtime)) finally: os.close(dirfd) self._test_utime_subsecond(set_time) - @requires_lutimes + @requires_utime_nofollow_symlinks def test_lutimes_subsecond(self): def set_time(filename, atime, mtime): - os.lutimes(filename, (atime, mtime)) + os.utime(filename, (atime, mtime), follow_symlinks=False) self._test_utime_subsecond(set_time) - @unittest.skipUnless(hasattr(os, 'utimensat'), - "os.utimensat required for this test.") + @requires_utime_dir_fd def test_utimensat_subsecond(self): def set_time(filename, atime, mtime): dirname = os.path.dirname(filename) dirfd = os.open(dirname, os.O_RDONLY) try: - asec, ansec = divmod(atime, 1.0) - asec = int(asec) - ansec = int(ansec * 1e9) - msec, mnsec = divmod(mtime, 1.0) - msec = int(msec) - mnsec = int(mnsec * 1e9) - os.utimensat(dirfd, os.path.basename(filename), - (asec, ansec), - (msec, mnsec)) + os.utime(os.path.basename(filename), dir_fd=dirfd, + times=(atime, mtime)) finally: os.close(dirfd) self._test_utime_subsecond(set_time) @@ -782,8 +760,10 @@ class FwalkTests(WalkTests): for root, dirs, files, rootfd in os.fwalk(*args): # check that the FD is valid os.fstat(rootfd) - # check that flistdir() returns consistent information - self.assertEqual(set(os.flistdir(rootfd)), set(dirs) | set(files)) + # redundant check + os.stat(rootfd) + # check that listdir() returns consistent information + self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files)) def test_fd_leak(self): # Since we're opening a lot of FDs, we must be careful to avoid leaks: @@ -802,13 +782,10 @@ class FwalkTests(WalkTests): # cleanup for root, dirs, files, rootfd in os.fwalk(support.TESTFN, topdown=False): for name in files: - os.unlinkat(rootfd, name) + os.unlink(name, dir_fd=rootfd) for name in dirs: - st = os.fstatat(rootfd, name, os.AT_SYMLINK_NOFOLLOW) - if stat.S_ISDIR(st.st_mode): - os.unlinkat(rootfd, name, os.AT_REMOVEDIR) - else: - os.unlinkat(rootfd, name) + st = os.stat(name, dir_fd=rootfd, follow_symlinks=False) + os.unlink(name, dir_fd=rootfd, rmdir=stat.S_ISDIR(st.st_mode)) os.rmdir(support.TESTFN) @@ -1262,6 +1239,13 @@ if sys.platform != 'win32': expected = self.unicodefn found = set(os.listdir(self.dir)) self.assertEqual(found, expected) + # test listdir without arguments + current_directory = os.getcwd() + try: + os.chdir(os.sep) + self.assertEqual(set(os.listdir()), set(os.listdir(os.sep))) + finally: + os.chdir(current_directory) def test_open(self): for fn in self.unicodefn: @@ -1846,79 +1830,97 @@ class TestSendfile(unittest.TestCase): raise -@support.skip_unless_xattr +def supports_extended_attributes(): + if not hasattr(os, "setxattr"): + return False + try: + with open(support.TESTFN, "wb") as fp: + try: + os.setxattr(fp.fileno(), b"user.test", b"") + except OSError: + return False + finally: + support.unlink(support.TESTFN) + # Kernels < 2.6.39 don't respect setxattr flags. + kernel_version = platform.release() + m = re.match("2.6.(\d{1,2})", kernel_version) + return m is None or int(m.group(1)) >= 39 + + +@unittest.skipUnless(supports_extended_attributes(), + "no non-broken extended attribute support") class ExtendedAttributeTests(unittest.TestCase): def tearDown(self): support.unlink(support.TESTFN) - def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr): + def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs): fn = support.TESTFN open(fn, "wb").close() with self.assertRaises(OSError) as cm: - getxattr(fn, s("user.test")) + getxattr(fn, s("user.test"), **kwargs) self.assertEqual(cm.exception.errno, errno.ENODATA) init_xattr = listxattr(fn) self.assertIsInstance(init_xattr, list) - setxattr(fn, s("user.test"), b"") + setxattr(fn, s("user.test"), b"", **kwargs) xattr = set(init_xattr) xattr.add("user.test") self.assertEqual(set(listxattr(fn)), xattr) - self.assertEqual(getxattr(fn, b"user.test"), b"") - setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE) - self.assertEqual(getxattr(fn, b"user.test"), b"hello") + self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"") + setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs) + self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"hello") with self.assertRaises(OSError) as cm: - setxattr(fn, s("user.test"), b"bye", os.XATTR_CREATE) + setxattr(fn, s("user.test"), b"bye", os.XATTR_CREATE, **kwargs) self.assertEqual(cm.exception.errno, errno.EEXIST) with self.assertRaises(OSError) as cm: - setxattr(fn, s("user.test2"), b"bye", os.XATTR_REPLACE) + setxattr(fn, s("user.test2"), b"bye", os.XATTR_REPLACE, **kwargs) self.assertEqual(cm.exception.errno, errno.ENODATA) - setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE) + setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs) xattr.add("user.test2") self.assertEqual(set(listxattr(fn)), xattr) - removexattr(fn, s("user.test")) + removexattr(fn, s("user.test"), **kwargs) with self.assertRaises(OSError) as cm: - getxattr(fn, s("user.test")) + getxattr(fn, s("user.test"), **kwargs) self.assertEqual(cm.exception.errno, errno.ENODATA) xattr.remove("user.test") self.assertEqual(set(listxattr(fn)), xattr) - self.assertEqual(getxattr(fn, s("user.test2")), b"foo") - setxattr(fn, s("user.test"), b"a"*1024) - self.assertEqual(getxattr(fn, s("user.test")), b"a"*1024) - removexattr(fn, s("user.test")) + self.assertEqual(getxattr(fn, s("user.test2"), **kwargs), b"foo") + setxattr(fn, s("user.test"), b"a"*1024, **kwargs) + self.assertEqual(getxattr(fn, s("user.test"), **kwargs), b"a"*1024) + removexattr(fn, s("user.test"), **kwargs) many = sorted("user.test{}".format(i) for i in range(100)) for thing in many: - setxattr(fn, thing, b"x") + setxattr(fn, thing, b"x", **kwargs) self.assertEqual(set(listxattr(fn)), set(init_xattr) | set(many)) - def _check_xattrs(self, *args): + def _check_xattrs(self, *args, **kwargs): def make_bytes(s): return bytes(s, "ascii") - self._check_xattrs_str(str, *args) + self._check_xattrs_str(str, *args, **kwargs) support.unlink(support.TESTFN) - self._check_xattrs_str(make_bytes, *args) + self._check_xattrs_str(make_bytes, *args, **kwargs) def test_simple(self): self._check_xattrs(os.getxattr, os.setxattr, os.removexattr, os.listxattr) def test_lpath(self): - self._check_xattrs(os.lgetxattr, os.lsetxattr, os.lremovexattr, - os.llistxattr) + self._check_xattrs(os.getxattr, os.setxattr, os.removexattr, + os.listxattr, follow_symlinks=False) def test_fds(self): def getxattr(path, *args): with open(path, "rb") as fp: - return os.fgetxattr(fp.fileno(), *args) + return os.getxattr(fp.fileno(), *args) def setxattr(path, *args): with open(path, "wb") as fp: - os.fsetxattr(fp.fileno(), *args) + os.setxattr(fp.fileno(), *args) def removexattr(path, *args): with open(path, "wb") as fp: - os.fremovexattr(fp.fileno(), *args) + os.removexattr(fp.fileno(), *args) def listxattr(path, *args): with open(path, "rb") as fp: - return os.flistxattr(fp.fileno(), *args) + return os.listxattr(fp.fileno(), *args) self._check_xattrs(getxattr, setxattr, removexattr, listxattr) diff --git a/Lib/test/test_posix.py b/Lib/test/test_posix.py index ae46866..ffa58ee 100644 --- a/Lib/test/test_posix.py +++ b/Lib/test/test_posix.py @@ -129,6 +129,7 @@ class PosixTester(unittest.TestCase): fp = open(support.TESTFN) try: self.assertTrue(posix.fstatvfs(fp.fileno())) + self.assertTrue(posix.statvfs(fp.fileno())) finally: fp.close() @@ -150,7 +151,7 @@ class PosixTester(unittest.TestCase): fp.flush() posix.truncate(support.TESTFN, 0) - @unittest.skipUnless(hasattr(posix, 'fexecve'), "test needs posix.fexecve()") + @unittest.skipUnless(getattr(os, 'execve', None) in os.supports_fd, "test needs execve() to support the fd parameter") @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()") @unittest.skipUnless(hasattr(os, 'waitpid'), "test needs os.waitpid()") def test_fexecve(self): @@ -159,7 +160,7 @@ class PosixTester(unittest.TestCase): pid = os.fork() if pid == 0: os.chdir(os.path.split(sys.executable)[0]) - posix.fexecve(fp, [sys.executable, '-c', 'pass'], os.environ) + posix.execve(fp, [sys.executable, '-c', 'pass'], os.environ) else: self.assertEqual(os.waitpid(pid, 0), (pid, 0)) finally: @@ -234,45 +235,37 @@ class PosixTester(unittest.TestCase): finally: os.close(fd) - @unittest.skipUnless(hasattr(posix, 'futimes'), "test needs posix.futimes()") - def test_futimes(self): + @unittest.skipUnless(os.utime in os.supports_fd, "test needs fd support in os.utime") + def test_utime_with_fd(self): now = time.time() fd = os.open(support.TESTFN, os.O_RDONLY) try: - posix.futimes(fd, None) - posix.futimes(fd) - self.assertRaises(TypeError, posix.futimes, fd, (None, None)) - self.assertRaises(TypeError, posix.futimes, fd, (now, None)) - self.assertRaises(TypeError, posix.futimes, fd, (None, now)) - posix.futimes(fd, (int(now), int(now))) - posix.futimes(fd, (now, now)) + posix.utime(fd) + posix.utime(fd, None) + self.assertRaises(TypeError, posix.utime, fd, (None, None)) + self.assertRaises(TypeError, posix.utime, fd, (now, None)) + self.assertRaises(TypeError, posix.utime, fd, (None, now)) + posix.utime(fd, (int(now), int(now))) + posix.utime(fd, (now, now)) + self.assertRaises(ValueError, posix.utime, fd, (now, now), ns=(now, now)) + self.assertRaises(ValueError, posix.utime, fd, (now, 0), ns=(None, None)) + self.assertRaises(ValueError, posix.utime, fd, (None, None), ns=(now, 0)) + posix.utime(fd, (int(now), int((now - int(now)) * 1e9))) + posix.utime(fd, ns=(int(now), int((now - int(now)) * 1e9))) + finally: os.close(fd) - @unittest.skipUnless(hasattr(posix, 'lutimes'), "test needs posix.lutimes()") - def test_lutimes(self): - now = time.time() - posix.lutimes(support.TESTFN, None) - self.assertRaises(TypeError, posix.lutimes, support.TESTFN, (None, None)) - self.assertRaises(TypeError, posix.lutimes, support.TESTFN, (now, None)) - self.assertRaises(TypeError, posix.lutimes, support.TESTFN, (None, now)) - posix.lutimes(support.TESTFN, (int(now), int(now))) - posix.lutimes(support.TESTFN, (now, now)) - posix.lutimes(support.TESTFN) - - @unittest.skipUnless(hasattr(posix, 'futimens'), "test needs posix.futimens()") - def test_futimens(self): + @unittest.skipUnless(os.utime in os.supports_follow_symlinks, "test needs follow_symlinks support in os.utime") + def test_utime_nofollow_symlinks(self): now = time.time() - fd = os.open(support.TESTFN, os.O_RDONLY) - try: - self.assertRaises(TypeError, posix.futimens, fd, (None, None), (None, None)) - self.assertRaises(TypeError, posix.futimens, fd, (now, 0), None) - self.assertRaises(TypeError, posix.futimens, fd, None, (now, 0)) - posix.futimens(fd, (int(now), int((now - int(now)) * 1e9)), - (int(now), int((now - int(now)) * 1e9))) - posix.futimens(fd) - finally: - os.close(fd) + posix.utime(support.TESTFN, None, follow_symlinks=False) + self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, None), follow_symlinks=False) + self.assertRaises(TypeError, posix.utime, support.TESTFN, (now, None), follow_symlinks=False) + self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, now), follow_symlinks=False) + posix.utime(support.TESTFN, (int(now), int(now)), follow_symlinks=False) + posix.utime(support.TESTFN, (now, now), follow_symlinks=False) + posix.utime(support.TESTFN, follow_symlinks=False) @unittest.skipUnless(hasattr(posix, 'writev'), "test needs posix.writev()") def test_writev(self): @@ -364,6 +357,7 @@ class PosixTester(unittest.TestCase): fp = open(support.TESTFN) try: self.assertTrue(posix.fstat(fp.fileno())) + self.assertTrue(posix.stat(fp.fileno())) finally: fp.close() @@ -462,18 +456,18 @@ class PosixTester(unittest.TestCase): if hasattr(posix, 'listdir'): self.assertTrue(support.TESTFN in posix.listdir()) - @unittest.skipUnless(hasattr(posix, 'flistdir'), "test needs posix.flistdir()") + @unittest.skipUnless(os.listdir in os.supports_fd, "test needs fd support for os.listdir()") def test_flistdir(self): f = posix.open(posix.getcwd(), posix.O_RDONLY) self.addCleanup(posix.close, f) self.assertEqual( sorted(posix.listdir('.')), - sorted(posix.flistdir(f)) + sorted(posix.listdir(f)) ) # Check that the fd offset was reset (issue #13739) self.assertEqual( sorted(posix.listdir('.')), - sorted(posix.flistdir(f)) + sorted(posix.listdir(f)) ) def test_access(self): @@ -532,10 +526,10 @@ class PosixTester(unittest.TestCase): posix.utime(support.TESTFN, (int(now), int(now))) posix.utime(support.TESTFN, (now, now)) - def _test_chflags_regular_file(self, chflags_func, target_file): + def _test_chflags_regular_file(self, chflags_func, target_file, **kwargs): st = os.stat(target_file) self.assertTrue(hasattr(st, 'st_flags')) - chflags_func(target_file, st.st_flags | stat.UF_IMMUTABLE) + chflags_func(target_file, st.st_flags | stat.UF_IMMUTABLE, **kwargs) try: new_st = os.stat(target_file) self.assertEqual(st.st_flags | stat.UF_IMMUTABLE, new_st.st_flags) @@ -553,6 +547,7 @@ class PosixTester(unittest.TestCase): @unittest.skipUnless(hasattr(posix, 'lchflags'), 'test needs os.lchflags()') def test_lchflags_regular_file(self): self._test_chflags_regular_file(posix.lchflags, support.TESTFN) + self._test_chflags_regular_file(posix.chflags, support.TESTFN, follow_symlinks=False) @unittest.skipUnless(hasattr(posix, 'lchflags'), 'test needs os.lchflags()') def test_lchflags_symlink(self): @@ -564,17 +559,21 @@ class PosixTester(unittest.TestCase): self.teardown_files.append(_DUMMY_SYMLINK) dummy_symlink_st = os.lstat(_DUMMY_SYMLINK) - posix.lchflags(_DUMMY_SYMLINK, - dummy_symlink_st.st_flags | stat.UF_IMMUTABLE) - try: - new_testfn_st = os.stat(support.TESTFN) - new_dummy_symlink_st = os.lstat(_DUMMY_SYMLINK) + def chflags_nofollow(path, flags): + return posix.chflags(path, flags, follow_symlinks=False) - self.assertEqual(testfn_st.st_flags, new_testfn_st.st_flags) - self.assertEqual(dummy_symlink_st.st_flags | stat.UF_IMMUTABLE, - new_dummy_symlink_st.st_flags) - finally: - posix.lchflags(_DUMMY_SYMLINK, dummy_symlink_st.st_flags) + for fn in (posix.lchflags, chflags_nofollow): + fn(_DUMMY_SYMLINK, + dummy_symlink_st.st_flags | stat.UF_IMMUTABLE) + try: + new_testfn_st = os.stat(support.TESTFN) + new_dummy_symlink_st = os.lstat(_DUMMY_SYMLINK) + + self.assertEqual(testfn_st.st_flags, new_testfn_st.st_flags) + self.assertEqual(dummy_symlink_st.st_flags | stat.UF_IMMUTABLE, + new_dummy_symlink_st.st_flags) + finally: + fn(_DUMMY_SYMLINK, dummy_symlink_st.st_flags) def test_environ(self): if os.name == "nt": @@ -657,40 +656,40 @@ class PosixTester(unittest.TestCase): # tests for the posix *at functions follow - @unittest.skipUnless(hasattr(posix, 'faccessat'), "test needs posix.faccessat()") - def test_faccessat(self): + @unittest.skipUnless(os.access in os.supports_dir_fd, "test needs dir_fd support for os.access()") + def test_access_dir_fd(self): f = posix.open(posix.getcwd(), posix.O_RDONLY) try: - self.assertTrue(posix.faccessat(f, support.TESTFN, os.R_OK)) + self.assertTrue(posix.access(support.TESTFN, os.R_OK, dir_fd=f)) finally: posix.close(f) - @unittest.skipUnless(hasattr(posix, 'fchmodat'), "test needs posix.fchmodat()") - def test_fchmodat(self): + @unittest.skipUnless(os.chmod in os.supports_dir_fd, "test needs dir_fd support in os.chmod()") + def test_chmod_dir_fd(self): os.chmod(support.TESTFN, stat.S_IRUSR) f = posix.open(posix.getcwd(), posix.O_RDONLY) try: - posix.fchmodat(f, support.TESTFN, stat.S_IRUSR | stat.S_IWUSR) + posix.chmod(support.TESTFN, stat.S_IRUSR | stat.S_IWUSR, dir_fd=f) s = posix.stat(support.TESTFN) self.assertEqual(s[0] & stat.S_IRWXU, stat.S_IRUSR | stat.S_IWUSR) finally: posix.close(f) - @unittest.skipUnless(hasattr(posix, 'fchownat'), "test needs posix.fchownat()") - def test_fchownat(self): + @unittest.skipUnless(os.chown in os.supports_dir_fd, "test needs dir_fd support in os.chown()") + def test_chown_dir_fd(self): support.unlink(support.TESTFN) support.create_empty_file(support.TESTFN) f = posix.open(posix.getcwd(), posix.O_RDONLY) try: - posix.fchownat(f, support.TESTFN, os.getuid(), os.getgid()) + posix.chown(support.TESTFN, os.getuid(), os.getgid(), dir_fd=f) finally: posix.close(f) - @unittest.skipUnless(hasattr(posix, 'fstatat'), "test needs posix.fstatat()") - def test_fstatat(self): + @unittest.skipUnless(os.stat in os.supports_dir_fd, "test needs dir_fd support in os.stat()") + def test_stat_dir_fd(self): support.unlink(support.TESTFN) with open(support.TESTFN, 'w') as outfile: outfile.write("testline\n") @@ -698,31 +697,41 @@ class PosixTester(unittest.TestCase): f = posix.open(posix.getcwd(), posix.O_RDONLY) try: s1 = posix.stat(support.TESTFN) - s2 = posix.fstatat(f, support.TESTFN) + s2 = posix.stat(support.TESTFN, dir_fd=f) self.assertEqual(s1, s2) finally: posix.close(f) - @unittest.skipUnless(hasattr(posix, 'futimesat'), "test needs posix.futimesat()") - def test_futimesat(self): + @unittest.skipUnless(os.utime in os.supports_dir_fd, "test needs dir_fd support in os.utime()") + def test_utime_dir_fd(self): f = posix.open(posix.getcwd(), posix.O_RDONLY) try: now = time.time() - posix.futimesat(f, support.TESTFN, None) - posix.futimesat(f, support.TESTFN) - self.assertRaises(TypeError, posix.futimesat, f, support.TESTFN, (None, None)) - self.assertRaises(TypeError, posix.futimesat, f, support.TESTFN, (now, None)) - self.assertRaises(TypeError, posix.futimesat, f, support.TESTFN, (None, now)) - posix.futimesat(f, support.TESTFN, (int(now), int(now))) - posix.futimesat(f, support.TESTFN, (now, now)) + posix.utime(support.TESTFN, None, dir_fd=f) + posix.utime(support.TESTFN, dir_fd=f) + self.assertRaises(TypeError, posix.utime, support.TESTFN, now, dir_fd=f) + self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, None), dir_fd=f) + self.assertRaises(TypeError, posix.utime, support.TESTFN, (now, None), dir_fd=f) + self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, now), dir_fd=f) + self.assertRaises(TypeError, posix.utime, support.TESTFN, (now, "x"), dir_fd=f) + posix.utime(support.TESTFN, (int(now), int(now)), dir_fd=f) + posix.utime(support.TESTFN, (now, now), dir_fd=f) + posix.utime(support.TESTFN, + (int(now), int((now - int(now)) * 1e9)), dir_fd=f) + posix.utime(support.TESTFN, dir_fd=f, + times=(int(now), int((now - int(now)) * 1e9))) + + if os.utime in os.supports_follow_symlinks: + posix.utime(support.TESTFN, follow_symlinks=False, dir_fd=f) + finally: posix.close(f) - @unittest.skipUnless(hasattr(posix, 'linkat'), "test needs posix.linkat()") - def test_linkat(self): + @unittest.skipUnless(os.link in os.supports_dir_fd, "test needs dir_fd support in os.link()") + def test_link_dir_fd(self): f = posix.open(posix.getcwd(), posix.O_RDONLY) try: - posix.linkat(f, support.TESTFN, f, support.TESTFN + 'link') + posix.link(support.TESTFN, support.TESTFN + 'link', src_dir_fd=f, dst_dir_fd=f) # should have same inodes self.assertEqual(posix.stat(support.TESTFN)[1], posix.stat(support.TESTFN + 'link')[1]) @@ -730,26 +739,26 @@ class PosixTester(unittest.TestCase): posix.close(f) support.unlink(support.TESTFN + 'link') - @unittest.skipUnless(hasattr(posix, 'mkdirat'), "test needs posix.mkdirat()") - def test_mkdirat(self): + @unittest.skipUnless(os.mkdir in os.supports_dir_fd, "test needs dir_fd support in os.mkdir()") + def test_mkdir_dir_fd(self): f = posix.open(posix.getcwd(), posix.O_RDONLY) try: - posix.mkdirat(f, support.TESTFN + 'dir') + posix.mkdir(support.TESTFN + 'dir', dir_fd=f) posix.stat(support.TESTFN + 'dir') # should not raise exception finally: posix.close(f) support.rmtree(support.TESTFN + 'dir') - @unittest.skipUnless(hasattr(posix, 'mknodat') and hasattr(stat, 'S_IFIFO'), - "don't have mknodat()/S_IFIFO") - def test_mknodat(self): + @unittest.skipUnless((os.mknod in os.supports_dir_fd) and hasattr(stat, 'S_IFIFO'), + "test requires both stat.S_IFIFO and dir_fd support for os.mknod()") + def test_mknod_dir_fd(self): # Test using mknodat() to create a FIFO (the only use specified # by POSIX). support.unlink(support.TESTFN) mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR f = posix.open(posix.getcwd(), posix.O_RDONLY) try: - posix.mknodat(f, support.TESTFN, mode, 0) + posix.mknod(support.TESTFN, mode, 0, dir_fd=f) except OSError as e: # Some old systems don't allow unprivileged users to use # mknod(), or only support creating device nodes. @@ -759,13 +768,13 @@ class PosixTester(unittest.TestCase): finally: posix.close(f) - @unittest.skipUnless(hasattr(posix, 'openat'), "test needs posix.openat()") - def test_openat(self): + @unittest.skipUnless(os.open in os.supports_dir_fd, "test needs dir_fd support in os.open()") + def test_open_dir_fd(self): support.unlink(support.TESTFN) with open(support.TESTFN, 'w') as outfile: outfile.write("testline\n") a = posix.open(posix.getcwd(), posix.O_RDONLY) - b = posix.openat(a, support.TESTFN, posix.O_RDONLY) + b = posix.open(support.TESTFN, posix.O_RDONLY, dir_fd=a) try: res = posix.read(b, 9).decode(encoding="utf-8") self.assertEqual("testline\n", res) @@ -773,24 +782,24 @@ class PosixTester(unittest.TestCase): posix.close(a) posix.close(b) - @unittest.skipUnless(hasattr(posix, 'readlinkat'), "test needs posix.readlinkat()") - def test_readlinkat(self): + @unittest.skipUnless(os.readlink in os.supports_dir_fd, "test needs dir_fd support in os.readlink()") + def test_readlink_dir_fd(self): os.symlink(support.TESTFN, support.TESTFN + 'link') f = posix.open(posix.getcwd(), posix.O_RDONLY) try: self.assertEqual(posix.readlink(support.TESTFN + 'link'), - posix.readlinkat(f, support.TESTFN + 'link')) + posix.readlink(support.TESTFN + 'link', dir_fd=f)) finally: support.unlink(support.TESTFN + 'link') posix.close(f) - @unittest.skipUnless(hasattr(posix, 'renameat'), "test needs posix.renameat()") - def test_renameat(self): + @unittest.skipUnless(os.rename in os.supports_dir_fd, "test needs dir_fd support in os.rename()") + def test_rename_dir_fd(self): support.unlink(support.TESTFN) support.create_empty_file(support.TESTFN + 'ren') f = posix.open(posix.getcwd(), posix.O_RDONLY) try: - posix.renameat(f, support.TESTFN + 'ren', f, support.TESTFN) + posix.rename(support.TESTFN + 'ren', support.TESTFN, src_dir_fd=f, dst_dir_fd=f) except: posix.rename(support.TESTFN + 'ren', support.TESTFN) raise @@ -799,23 +808,23 @@ class PosixTester(unittest.TestCase): finally: posix.close(f) - @unittest.skipUnless(hasattr(posix, 'symlinkat'), "test needs posix.symlinkat()") - def test_symlinkat(self): + @unittest.skipUnless(os.symlink in os.supports_dir_fd, "test needs dir_fd support in os.symlink()") + def test_symlink_dir_fd(self): f = posix.open(posix.getcwd(), posix.O_RDONLY) try: - posix.symlinkat(support.TESTFN, f, support.TESTFN + 'link') + posix.symlink(support.TESTFN, support.TESTFN + 'link', dir_fd=f) self.assertEqual(posix.readlink(support.TESTFN + 'link'), support.TESTFN) finally: posix.close(f) support.unlink(support.TESTFN + 'link') - @unittest.skipUnless(hasattr(posix, 'unlinkat'), "test needs posix.unlinkat()") - def test_unlinkat(self): + @unittest.skipUnless(os.unlink in os.supports_dir_fd, "test needs dir_fd support in os.unlink()") + def test_unlink_dir_fd(self): f = posix.open(posix.getcwd(), posix.O_RDONLY) support.create_empty_file(support.TESTFN + 'del') posix.stat(support.TESTFN + 'del') # should not throw exception try: - posix.unlinkat(f, support.TESTFN + 'del') + posix.unlink(support.TESTFN + 'del', dir_fd=f) except: support.unlink(support.TESTFN + 'del') raise @@ -824,31 +833,12 @@ class PosixTester(unittest.TestCase): finally: posix.close(f) - @unittest.skipUnless(hasattr(posix, 'utimensat'), "test needs posix.utimensat()") - def test_utimensat(self): - f = posix.open(posix.getcwd(), posix.O_RDONLY) - try: - now = time.time() - posix.utimensat(f, support.TESTFN, None, None) - posix.utimensat(f, support.TESTFN) - posix.utimensat(f, support.TESTFN, flags=os.AT_SYMLINK_NOFOLLOW) - self.assertRaises(TypeError, posix.utimensat, f, support.TESTFN, (None, None), (None, None)) - self.assertRaises(TypeError, posix.utimensat, f, support.TESTFN, (now, 0), None) - self.assertRaises(TypeError, posix.utimensat, f, support.TESTFN, None, (now, 0)) - posix.utimensat(f, support.TESTFN, (int(now), int((now - int(now)) * 1e9)), - (int(now), int((now - int(now)) * 1e9))) - posix.utimensat(dirfd=f, path=support.TESTFN, - atime=(int(now), int((now - int(now)) * 1e9)), - mtime=(int(now), int((now - int(now)) * 1e9))) - finally: - posix.close(f) - - @unittest.skipUnless(hasattr(posix, 'mkfifoat'), "don't have mkfifoat()") - def test_mkfifoat(self): + @unittest.skipUnless(os.mkfifo in os.supports_dir_fd, "test needs dir_fd support in os.mkfifo()") + def test_mkfifo_dir_fd(self): support.unlink(support.TESTFN) f = posix.open(posix.getcwd(), posix.O_RDONLY) try: - posix.mkfifoat(f, support.TESTFN, stat.S_IRUSR | stat.S_IWUSR) + posix.mkfifo(support.TESTFN, stat.S_IRUSR | stat.S_IWUSR, dir_fd=f) self.assertTrue(stat.S_ISFIFO(posix.stat(support.TESTFN).st_mode)) finally: posix.close(f) diff --git a/Lib/test/test_shutil.py b/Lib/test/test_shutil.py index 559f05b..ad835ae 100644 --- a/Lib/test/test_shutil.py +++ b/Lib/test/test_shutil.py @@ -268,7 +268,7 @@ class TestShutil(unittest.TestCase): # don't follow shutil.copystat(src_link, dst_link, symlinks=True) dst_link_stat = os.lstat(dst_link) - if hasattr(os, 'lutimes'): + if os.utime in os.supports_follow_symlinks: for attr in 'st_atime', 'st_mtime': # The modification times may be truncated in the new file. self.assertLessEqual(getattr(src_link_stat, attr), @@ -334,11 +334,11 @@ class TestShutil(unittest.TestCase): write_file(dst, 'bar') os_error = OSError(errno.EPERM, 'EPERM') - def _raise_on_user_foo(fname, attr, val): + def _raise_on_user_foo(fname, attr, val, **kwargs): if attr == 'user.foo': raise os_error else: - orig_setxattr(fname, attr, val) + orig_setxattr(fname, attr, val, **kwargs) try: orig_setxattr = os.setxattr os.setxattr = _raise_on_user_foo @@ -361,13 +361,13 @@ class TestShutil(unittest.TestCase): write_file(src, 'foo') os.symlink(src, src_link) os.setxattr(src, 'trusted.foo', b'42') - os.lsetxattr(src_link, 'trusted.foo', b'43') + os.setxattr(src_link, 'trusted.foo', b'43', follow_symlinks=False) dst = os.path.join(tmp_dir, 'bar') dst_link = os.path.join(tmp_dir, 'qux') write_file(dst, 'bar') os.symlink(dst, dst_link) shutil._copyxattr(src_link, dst_link, symlinks=True) - self.assertEqual(os.lgetxattr(dst_link, 'trusted.foo'), b'43') + self.assertEqual(os.getxattr(dst_link, 'trusted.foo', follow_symlinks=False), b'43') self.assertRaises(OSError, os.getxattr, dst, 'trusted.foo') shutil._copyxattr(src_link, dst, symlinks=True) self.assertEqual(os.getxattr(dst, 'trusted.foo'), b'43') @@ -419,7 +419,7 @@ class TestShutil(unittest.TestCase): self.assertTrue(os.path.islink(dst)) self.assertEqual(os.readlink(dst), os.readlink(src_link)) dst_stat = os.lstat(dst) - if hasattr(os, 'lutimes'): + if os.utime in os.supports_follow_symlinks: for attr in 'st_atime', 'st_mtime': # The modification times may be truncated in the new file. self.assertLessEqual(getattr(src_link_stat, attr), |