diff options
author | Larry Hastings <larry@hastings.org> | 2012-06-22 23:30:09 (GMT) |
---|---|---|
committer | Larry Hastings <larry@hastings.org> | 2012-06-22 23:30:09 (GMT) |
commit | 9cf065cfdc4245ea7e31edcb2e6ede0cea47d148 (patch) | |
tree | 22d8450865a023586034d555ae72e3753f95e84c /Lib/test/test_os.py | |
parent | f0f4742b495554238d1204ce0002c1ef1ba23507 (diff) | |
download | cpython-9cf065cfdc4245ea7e31edcb2e6ede0cea47d148.zip cpython-9cf065cfdc4245ea7e31edcb2e6ede0cea47d148.tar.gz cpython-9cf065cfdc4245ea7e31edcb2e6ede0cea47d148.tar.bz2 |
Issue #14626: Large refactoring of functions / parameters in the os module.
Many functions now support "dir_fd" and "follow_symlinks" parameters;
some also support accepting an open file descriptor in place of of a path
string. Added os.support_* collections as LBYL helpers. Removed many
functions only previously seen in 3.3 alpha releases (often starting with
"f" or "l", or ending with "at"). Originally suggested by Serhiy Storchaka;
implemented by Larry Hastings.
Diffstat (limited to 'Lib/test/test_os.py')
-rw-r--r-- | Lib/test/test_os.py | 168 |
1 files changed, 85 insertions, 83 deletions
diff --git a/Lib/test/test_os.py b/Lib/test/test_os.py index d47c8d3..cc1120f 100644 --- a/Lib/test/test_os.py +++ b/Lib/test/test_os.py @@ -345,40 +345,36 @@ class StatAttributeTests(unittest.TestCase): return os.utime(file, ns=times) self._test_utime_ns(utime_ns) - requires_lutimes = unittest.skipUnless(hasattr(os, 'lutimes'), - "os.lutimes required for this test.") - requires_futimes = unittest.skipUnless(hasattr(os, 'futimes'), - "os.futimes required for this test.") - - @requires_lutimes + requires_utime_dir_fd = unittest.skipUnless( + os.utime in os.supports_dir_fd, + "dir_fd support for utime required for this test.") + requires_utime_fd = unittest.skipUnless( + os.utime in os.supports_fd, + "fd support for utime required for this test.") + requires_utime_nofollow_symlinks = unittest.skipUnless( + os.utime in os.supports_follow_symlinks, + "follow_symlinks support for utime required for this test.") + + @requires_utime_nofollow_symlinks def test_lutimes_ns(self): def lutimes_ns(file, times): - return os.lutimes(file, ns=times) + return os.utime(file, ns=times, follow_symlinks=False) self._test_utime_ns(lutimes_ns) - @requires_futimes + @requires_utime_fd def test_futimes_ns(self): def futimes_ns(file, times): with open(file, "wb") as f: - os.futimes(f.fileno(), ns=times) + os.utime(f.fileno(), ns=times) self._test_utime_ns(futimes_ns, test_dir=False) def _utime_invalid_arguments(self, name, arg): - with self.assertRaises(RuntimeError): + with self.assertRaises(ValueError): getattr(os, name)(arg, (5, 5), ns=(5, 5)) def test_utime_invalid_arguments(self): self._utime_invalid_arguments('utime', self.fname) - @requires_lutimes - def test_lutimes_invalid_arguments(self): - self._utime_invalid_arguments('lutimes', self.fname) - - @requires_futimes - def test_futimes_invalid_arguments(self): - with open(self.fname, "wb") as f: - self._utime_invalid_arguments('futimes', f.fileno()) - @unittest.skipUnless(stat_supports_subsecond, "os.stat() doesn't has a subsecond resolution") @@ -402,64 +398,46 @@ class StatAttributeTests(unittest.TestCase): os.utime(filename, (atime, mtime)) self._test_utime_subsecond(set_time) - @requires_futimes + @requires_utime_fd def test_futimes_subsecond(self): def set_time(filename, atime, mtime): with open(filename, "wb") as f: - os.futimes(f.fileno(), (atime, mtime)) + os.utime(f.fileno(), times=(atime, mtime)) self._test_utime_subsecond(set_time) - @unittest.skipUnless(hasattr(os, 'futimens'), - "os.futimens required for this test.") + @requires_utime_fd def test_futimens_subsecond(self): def set_time(filename, atime, mtime): with open(filename, "wb") as f: - asec, ansec = divmod(atime, 1.0) - asec = int(asec) - ansec = int(ansec * 1e9) - msec, mnsec = divmod(mtime, 1.0) - msec = int(msec) - mnsec = int(mnsec * 1e9) - os.futimens(f.fileno(), - (asec, ansec), - (msec, mnsec)) + os.utime(f.fileno(), times=(atime, mtime)) self._test_utime_subsecond(set_time) - @unittest.skipUnless(hasattr(os, 'futimesat'), - "os.futimesat required for this test.") + @requires_utime_dir_fd def test_futimesat_subsecond(self): def set_time(filename, atime, mtime): dirname = os.path.dirname(filename) dirfd = os.open(dirname, os.O_RDONLY) try: - os.futimesat(dirfd, os.path.basename(filename), - (atime, mtime)) + os.utime(os.path.basename(filename), dir_fd=dirfd, + times=(atime, mtime)) finally: os.close(dirfd) self._test_utime_subsecond(set_time) - @requires_lutimes + @requires_utime_nofollow_symlinks def test_lutimes_subsecond(self): def set_time(filename, atime, mtime): - os.lutimes(filename, (atime, mtime)) + os.utime(filename, (atime, mtime), follow_symlinks=False) self._test_utime_subsecond(set_time) - @unittest.skipUnless(hasattr(os, 'utimensat'), - "os.utimensat required for this test.") + @requires_utime_dir_fd def test_utimensat_subsecond(self): def set_time(filename, atime, mtime): dirname = os.path.dirname(filename) dirfd = os.open(dirname, os.O_RDONLY) try: - asec, ansec = divmod(atime, 1.0) - asec = int(asec) - ansec = int(ansec * 1e9) - msec, mnsec = divmod(mtime, 1.0) - msec = int(msec) - mnsec = int(mnsec * 1e9) - os.utimensat(dirfd, os.path.basename(filename), - (asec, ansec), - (msec, mnsec)) + os.utime(os.path.basename(filename), dir_fd=dirfd, + times=(atime, mtime)) finally: os.close(dirfd) self._test_utime_subsecond(set_time) @@ -782,8 +760,10 @@ class FwalkTests(WalkTests): for root, dirs, files, rootfd in os.fwalk(*args): # check that the FD is valid os.fstat(rootfd) - # check that flistdir() returns consistent information - self.assertEqual(set(os.flistdir(rootfd)), set(dirs) | set(files)) + # redundant check + os.stat(rootfd) + # check that listdir() returns consistent information + self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files)) def test_fd_leak(self): # Since we're opening a lot of FDs, we must be careful to avoid leaks: @@ -802,13 +782,10 @@ class FwalkTests(WalkTests): # cleanup for root, dirs, files, rootfd in os.fwalk(support.TESTFN, topdown=False): for name in files: - os.unlinkat(rootfd, name) + os.unlink(name, dir_fd=rootfd) for name in dirs: - st = os.fstatat(rootfd, name, os.AT_SYMLINK_NOFOLLOW) - if stat.S_ISDIR(st.st_mode): - os.unlinkat(rootfd, name, os.AT_REMOVEDIR) - else: - os.unlinkat(rootfd, name) + st = os.stat(name, dir_fd=rootfd, follow_symlinks=False) + os.unlink(name, dir_fd=rootfd, rmdir=stat.S_ISDIR(st.st_mode)) os.rmdir(support.TESTFN) @@ -1262,6 +1239,13 @@ if sys.platform != 'win32': expected = self.unicodefn found = set(os.listdir(self.dir)) self.assertEqual(found, expected) + # test listdir without arguments + current_directory = os.getcwd() + try: + os.chdir(os.sep) + self.assertEqual(set(os.listdir()), set(os.listdir(os.sep))) + finally: + os.chdir(current_directory) def test_open(self): for fn in self.unicodefn: @@ -1846,79 +1830,97 @@ class TestSendfile(unittest.TestCase): raise -@support.skip_unless_xattr +def supports_extended_attributes(): + if not hasattr(os, "setxattr"): + return False + try: + with open(support.TESTFN, "wb") as fp: + try: + os.setxattr(fp.fileno(), b"user.test", b"") + except OSError: + return False + finally: + support.unlink(support.TESTFN) + # Kernels < 2.6.39 don't respect setxattr flags. + kernel_version = platform.release() + m = re.match("2.6.(\d{1,2})", kernel_version) + return m is None or int(m.group(1)) >= 39 + + +@unittest.skipUnless(supports_extended_attributes(), + "no non-broken extended attribute support") class ExtendedAttributeTests(unittest.TestCase): def tearDown(self): support.unlink(support.TESTFN) - def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr): + def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs): fn = support.TESTFN open(fn, "wb").close() with self.assertRaises(OSError) as cm: - getxattr(fn, s("user.test")) + getxattr(fn, s("user.test"), **kwargs) self.assertEqual(cm.exception.errno, errno.ENODATA) init_xattr = listxattr(fn) self.assertIsInstance(init_xattr, list) - setxattr(fn, s("user.test"), b"") + setxattr(fn, s("user.test"), b"", **kwargs) xattr = set(init_xattr) xattr.add("user.test") self.assertEqual(set(listxattr(fn)), xattr) - self.assertEqual(getxattr(fn, b"user.test"), b"") - setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE) - self.assertEqual(getxattr(fn, b"user.test"), b"hello") + self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"") + setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs) + self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"hello") with self.assertRaises(OSError) as cm: - setxattr(fn, s("user.test"), b"bye", os.XATTR_CREATE) + setxattr(fn, s("user.test"), b"bye", os.XATTR_CREATE, **kwargs) self.assertEqual(cm.exception.errno, errno.EEXIST) with self.assertRaises(OSError) as cm: - setxattr(fn, s("user.test2"), b"bye", os.XATTR_REPLACE) + setxattr(fn, s("user.test2"), b"bye", os.XATTR_REPLACE, **kwargs) self.assertEqual(cm.exception.errno, errno.ENODATA) - setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE) + setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs) xattr.add("user.test2") self.assertEqual(set(listxattr(fn)), xattr) - removexattr(fn, s("user.test")) + removexattr(fn, s("user.test"), **kwargs) with self.assertRaises(OSError) as cm: - getxattr(fn, s("user.test")) + getxattr(fn, s("user.test"), **kwargs) self.assertEqual(cm.exception.errno, errno.ENODATA) xattr.remove("user.test") self.assertEqual(set(listxattr(fn)), xattr) - self.assertEqual(getxattr(fn, s("user.test2")), b"foo") - setxattr(fn, s("user.test"), b"a"*1024) - self.assertEqual(getxattr(fn, s("user.test")), b"a"*1024) - removexattr(fn, s("user.test")) + self.assertEqual(getxattr(fn, s("user.test2"), **kwargs), b"foo") + setxattr(fn, s("user.test"), b"a"*1024, **kwargs) + self.assertEqual(getxattr(fn, s("user.test"), **kwargs), b"a"*1024) + removexattr(fn, s("user.test"), **kwargs) many = sorted("user.test{}".format(i) for i in range(100)) for thing in many: - setxattr(fn, thing, b"x") + setxattr(fn, thing, b"x", **kwargs) self.assertEqual(set(listxattr(fn)), set(init_xattr) | set(many)) - def _check_xattrs(self, *args): + def _check_xattrs(self, *args, **kwargs): def make_bytes(s): return bytes(s, "ascii") - self._check_xattrs_str(str, *args) + self._check_xattrs_str(str, *args, **kwargs) support.unlink(support.TESTFN) - self._check_xattrs_str(make_bytes, *args) + self._check_xattrs_str(make_bytes, *args, **kwargs) def test_simple(self): self._check_xattrs(os.getxattr, os.setxattr, os.removexattr, os.listxattr) def test_lpath(self): - self._check_xattrs(os.lgetxattr, os.lsetxattr, os.lremovexattr, - os.llistxattr) + self._check_xattrs(os.getxattr, os.setxattr, os.removexattr, + os.listxattr, follow_symlinks=False) def test_fds(self): def getxattr(path, *args): with open(path, "rb") as fp: - return os.fgetxattr(fp.fileno(), *args) + return os.getxattr(fp.fileno(), *args) def setxattr(path, *args): with open(path, "wb") as fp: - os.fsetxattr(fp.fileno(), *args) + os.setxattr(fp.fileno(), *args) def removexattr(path, *args): with open(path, "wb") as fp: - os.fremovexattr(fp.fileno(), *args) + os.removexattr(fp.fileno(), *args) def listxattr(path, *args): with open(path, "rb") as fp: - return os.flistxattr(fp.fileno(), *args) + return os.listxattr(fp.fileno(), *args) self._check_xattrs(getxattr, setxattr, removexattr, listxattr) |