diff options
Diffstat (limited to 'Lib/test/test_posix.py')
-rw-r--r-- | Lib/test/test_posix.py | 608 |
1 files changed, 579 insertions, 29 deletions
diff --git a/Lib/test/test_posix.py b/Lib/test/test_posix.py index b936dda..4ad7350 100644 --- a/Lib/test/test_posix.py +++ b/Lib/test/test_posix.py @@ -9,6 +9,7 @@ import errno import sys import time import os +import fcntl import platform import pwd import shutil @@ -43,7 +44,7 @@ class PosixTester(unittest.TestCase): NO_ARG_FUNCTIONS = [ "ctermid", "getcwd", "getcwdb", "uname", "times", "getloadavg", "getegid", "geteuid", "getgid", "getgroups", - "getpid", "getpgrp", "getppid", "getuid", + "getpid", "getpgrp", "getppid", "getuid", "sync", ] for name in NO_ARG_FUNCTIONS: @@ -128,6 +129,7 @@ class PosixTester(unittest.TestCase): fp = open(support.TESTFN) try: self.assertTrue(posix.fstatvfs(fp.fileno())) + self.assertTrue(posix.statvfs(fp.fileno())) finally: fp.close() @@ -142,6 +144,151 @@ class PosixTester(unittest.TestCase): finally: fp.close() + @unittest.skipUnless(hasattr(posix, 'truncate'), "test needs posix.truncate()") + def test_truncate(self): + with open(support.TESTFN, 'w') as fp: + fp.write('test') + fp.flush() + posix.truncate(support.TESTFN, 0) + + @unittest.skipUnless(getattr(os, 'execve', None) in os.supports_fd, "test needs execve() to support the fd parameter") + @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()") + @unittest.skipUnless(hasattr(os, 'waitpid'), "test needs os.waitpid()") + def test_fexecve(self): + fp = os.open(sys.executable, os.O_RDONLY) + try: + pid = os.fork() + if pid == 0: + os.chdir(os.path.split(sys.executable)[0]) + posix.execve(fp, [sys.executable, '-c', 'pass'], os.environ) + else: + self.assertEqual(os.waitpid(pid, 0), (pid, 0)) + finally: + os.close(fp) + + @unittest.skipUnless(hasattr(posix, 'waitid'), "test needs posix.waitid()") + @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()") + def test_waitid(self): + pid = os.fork() + if pid == 0: + os.chdir(os.path.split(sys.executable)[0]) + posix.execve(sys.executable, [sys.executable, '-c', 'pass'], os.environ) + else: + res = posix.waitid(posix.P_PID, pid, posix.WEXITED) + self.assertEqual(pid, res.si_pid) + + @unittest.skipUnless(hasattr(posix, 'lockf'), "test needs posix.lockf()") + def test_lockf(self): + fd = os.open(support.TESTFN, os.O_WRONLY | os.O_CREAT) + try: + os.write(fd, b'test') + os.lseek(fd, 0, os.SEEK_SET) + posix.lockf(fd, posix.F_LOCK, 4) + # section is locked + posix.lockf(fd, posix.F_ULOCK, 4) + finally: + os.close(fd) + + @unittest.skipUnless(hasattr(posix, 'pread'), "test needs posix.pread()") + def test_pread(self): + fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT) + try: + os.write(fd, b'test') + os.lseek(fd, 0, os.SEEK_SET) + self.assertEqual(b'es', posix.pread(fd, 2, 1)) + # the first pread() shouldn't disturb the file offset + self.assertEqual(b'te', posix.read(fd, 2)) + finally: + os.close(fd) + + @unittest.skipUnless(hasattr(posix, 'pwrite'), "test needs posix.pwrite()") + def test_pwrite(self): + fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT) + try: + os.write(fd, b'test') + os.lseek(fd, 0, os.SEEK_SET) + posix.pwrite(fd, b'xx', 1) + self.assertEqual(b'txxt', posix.read(fd, 4)) + finally: + os.close(fd) + + @unittest.skipUnless(hasattr(posix, 'posix_fallocate'), + "test needs posix.posix_fallocate()") + def test_posix_fallocate(self): + fd = os.open(support.TESTFN, os.O_WRONLY | os.O_CREAT) + try: + posix.posix_fallocate(fd, 0, 10) + except OSError as inst: + # issue10812, ZFS doesn't appear to support posix_fallocate, + # so skip Solaris-based since they are likely to have ZFS. + if inst.errno != errno.EINVAL or not sys.platform.startswith("sunos"): + raise + finally: + os.close(fd) + + @unittest.skipUnless(hasattr(posix, 'posix_fadvise'), + "test needs posix.posix_fadvise()") + def test_posix_fadvise(self): + fd = os.open(support.TESTFN, os.O_RDONLY) + try: + posix.posix_fadvise(fd, 0, 0, posix.POSIX_FADV_WILLNEED) + finally: + os.close(fd) + + @unittest.skipUnless(os.utime in os.supports_fd, "test needs fd support in os.utime") + def test_utime_with_fd(self): + now = time.time() + fd = os.open(support.TESTFN, os.O_RDONLY) + try: + posix.utime(fd) + posix.utime(fd, None) + self.assertRaises(TypeError, posix.utime, fd, (None, None)) + self.assertRaises(TypeError, posix.utime, fd, (now, None)) + self.assertRaises(TypeError, posix.utime, fd, (None, now)) + posix.utime(fd, (int(now), int(now))) + posix.utime(fd, (now, now)) + self.assertRaises(ValueError, posix.utime, fd, (now, now), ns=(now, now)) + self.assertRaises(ValueError, posix.utime, fd, (now, 0), ns=(None, None)) + self.assertRaises(ValueError, posix.utime, fd, (None, None), ns=(now, 0)) + posix.utime(fd, (int(now), int((now - int(now)) * 1e9))) + posix.utime(fd, ns=(int(now), int((now - int(now)) * 1e9))) + + finally: + os.close(fd) + + @unittest.skipUnless(os.utime in os.supports_follow_symlinks, "test needs follow_symlinks support in os.utime") + def test_utime_nofollow_symlinks(self): + now = time.time() + posix.utime(support.TESTFN, None, follow_symlinks=False) + self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, None), follow_symlinks=False) + self.assertRaises(TypeError, posix.utime, support.TESTFN, (now, None), follow_symlinks=False) + self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, now), follow_symlinks=False) + posix.utime(support.TESTFN, (int(now), int(now)), follow_symlinks=False) + posix.utime(support.TESTFN, (now, now), follow_symlinks=False) + posix.utime(support.TESTFN, follow_symlinks=False) + + @unittest.skipUnless(hasattr(posix, 'writev'), "test needs posix.writev()") + def test_writev(self): + fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT) + try: + os.writev(fd, (b'test1', b'tt2', b't3')) + os.lseek(fd, 0, os.SEEK_SET) + self.assertEqual(b'test1tt2t3', posix.read(fd, 10)) + finally: + os.close(fd) + + @unittest.skipUnless(hasattr(posix, 'readv'), "test needs posix.readv()") + def test_readv(self): + fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT) + try: + os.write(fd, b'test1tt2t3') + os.lseek(fd, 0, os.SEEK_SET) + buf = [bytearray(i) for i in [5, 3, 2]] + self.assertEqual(posix.readv(fd, buf), 10) + self.assertEqual([b'test1', b'tt2', b't3'], [bytes(i) for i in buf]) + finally: + os.close(fd) + def test_dup(self): if hasattr(posix, 'dup'): fp = open(support.TESTFN) @@ -167,6 +314,13 @@ class PosixTester(unittest.TestCase): fp1.close() fp2.close() + @unittest.skipUnless(hasattr(os, 'O_CLOEXEC'), "needs os.O_CLOEXEC") + @support.requires_linux_version(2, 6, 23) + def test_oscloexec(self): + fd = os.open(support.TESTFN, os.O_RDONLY|os.O_CLOEXEC) + self.addCleanup(os.close, fd) + self.assertTrue(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC) + def test_osexlock(self): if hasattr(posix, "O_EXLOCK"): fd = os.open(support.TESTFN, @@ -203,6 +357,7 @@ class PosixTester(unittest.TestCase): fp = open(support.TESTFN) try: self.assertTrue(posix.fstat(fp.fileno())) + self.assertTrue(posix.stat(fp.fileno())) finally: fp.close() @@ -265,7 +420,7 @@ class PosixTester(unittest.TestCase): self.assertRaises(OSError, posix.chown, support.TESTFN, -1, -1) # re-create the file - open(support.TESTFN, 'w').close() + support.create_empty_file(support.TESTFN) self._test_all_chown_common(posix.chown, support.TESTFN) @unittest.skipUnless(hasattr(posix, 'fchown'), "test needs os.fchown()") @@ -293,13 +448,32 @@ class PosixTester(unittest.TestCase): self.assertRaises(OSError, posix.chdir, support.TESTFN) def test_listdir(self): - if hasattr(posix, 'listdir'): - self.assertTrue(support.TESTFN in posix.listdir(os.curdir)) + self.assertTrue(support.TESTFN in posix.listdir(os.curdir)) def test_listdir_default(self): - # When listdir is called without argument, it's the same as listdir(os.curdir) - if hasattr(posix, 'listdir'): - self.assertTrue(support.TESTFN in posix.listdir()) + # When listdir is called without argument, + # it's the same as listdir(os.curdir). + self.assertTrue(support.TESTFN in posix.listdir()) + + def test_listdir_bytes(self): + # When listdir is called with a bytes object, + # the returned strings are of type bytes. + self.assertTrue(os.fsencode(support.TESTFN) in posix.listdir(b'.')) + + @unittest.skipUnless(posix.listdir in os.supports_fd, + "test needs fd support for posix.listdir()") + def test_listdir_fd(self): + f = posix.open(posix.getcwd(), posix.O_RDONLY) + self.addCleanup(posix.close, f) + self.assertEqual( + sorted(posix.listdir('.')), + sorted(posix.listdir(f)) + ) + # Check that the fd offset was reset (issue #13739) + self.assertEqual( + sorted(posix.listdir('.')), + sorted(posix.listdir(f)) + ) def test_access(self): if hasattr(posix, 'access'): @@ -321,6 +495,32 @@ class PosixTester(unittest.TestCase): os.close(reader) os.close(writer) + @unittest.skipUnless(hasattr(os, 'pipe2'), "test needs os.pipe2()") + @support.requires_linux_version(2, 6, 27) + def test_pipe2(self): + self.assertRaises(TypeError, os.pipe2, 'DEADBEEF') + self.assertRaises(TypeError, os.pipe2, 0, 0) + + # try calling with flags = 0, like os.pipe() + r, w = os.pipe2(0) + os.close(r) + os.close(w) + + # test flags + r, w = os.pipe2(os.O_CLOEXEC|os.O_NONBLOCK) + self.addCleanup(os.close, r) + self.addCleanup(os.close, w) + self.assertTrue(fcntl.fcntl(r, fcntl.F_GETFD) & fcntl.FD_CLOEXEC) + self.assertTrue(fcntl.fcntl(w, fcntl.F_GETFD) & fcntl.FD_CLOEXEC) + # try reading from an empty pipe: this should fail, not block + self.assertRaises(OSError, os.read, r, 1) + # try a write big enough to fill-up the pipe: this should either + # fail or perform a partial write, not block + try: + os.write(w, b'x' * support.PIPE_MAX_SIZE) + except OSError: + pass + def test_utime(self): if hasattr(posix, 'utime'): now = time.time() @@ -331,13 +531,14 @@ class PosixTester(unittest.TestCase): posix.utime(support.TESTFN, (int(now), int(now))) posix.utime(support.TESTFN, (now, now)) - def _test_chflags_regular_file(self, chflags_func, target_file): + def _test_chflags_regular_file(self, chflags_func, target_file, **kwargs): st = os.stat(target_file) self.assertTrue(hasattr(st, 'st_flags')) # ZFS returns EOPNOTSUPP when attempting to set flag UF_IMMUTABLE. + flags = st.st_flags | stat.UF_IMMUTABLE try: - chflags_func(target_file, st.st_flags | stat.UF_IMMUTABLE) + chflags_func(target_file, flags, **kwargs) except OSError as err: if err.errno != errno.EOPNOTSUPP: raise @@ -361,6 +562,7 @@ class PosixTester(unittest.TestCase): @unittest.skipUnless(hasattr(posix, 'lchflags'), 'test needs os.lchflags()') def test_lchflags_regular_file(self): self._test_chflags_regular_file(posix.lchflags, support.TESTFN) + self._test_chflags_regular_file(posix.chflags, support.TESTFN, follow_symlinks=False) @unittest.skipUnless(hasattr(posix, 'lchflags'), 'test needs os.lchflags()') def test_lchflags_symlink(self): @@ -372,25 +574,28 @@ class PosixTester(unittest.TestCase): self.teardown_files.append(_DUMMY_SYMLINK) dummy_symlink_st = os.lstat(_DUMMY_SYMLINK) - # ZFS returns EOPNOTSUPP when attempting to set flag UF_IMMUTABLE. - try: - posix.lchflags(_DUMMY_SYMLINK, - dummy_symlink_st.st_flags | stat.UF_IMMUTABLE) - except OSError as err: - if err.errno != errno.EOPNOTSUPP: - raise - msg = 'chflag UF_IMMUTABLE not supported by underlying fs' - self.skipTest(msg) + def chflags_nofollow(path, flags): + return posix.chflags(path, flags, follow_symlinks=False) - try: - new_testfn_st = os.stat(support.TESTFN) - new_dummy_symlink_st = os.lstat(_DUMMY_SYMLINK) + for fn in (posix.lchflags, chflags_nofollow): + # ZFS returns EOPNOTSUPP when attempting to set flag UF_IMMUTABLE. + flags = dummy_symlink_st.st_flags | stat.UF_IMMUTABLE + try: + fn(_DUMMY_SYMLINK, flags) + except OSError as err: + if err.errno != errno.EOPNOTSUPP: + raise + msg = 'chflag UF_IMMUTABLE not supported by underlying fs' + self.skipTest(msg) + try: + new_testfn_st = os.stat(support.TESTFN) + new_dummy_symlink_st = os.lstat(_DUMMY_SYMLINK) - self.assertEqual(testfn_st.st_flags, new_testfn_st.st_flags) - self.assertEqual(dummy_symlink_st.st_flags | stat.UF_IMMUTABLE, - new_dummy_symlink_st.st_flags) - finally: - posix.lchflags(_DUMMY_SYMLINK, dummy_symlink_st.st_flags) + self.assertEqual(testfn_st.st_flags, new_testfn_st.st_flags) + self.assertEqual(dummy_symlink_st.st_flags | stat.UF_IMMUTABLE, + new_dummy_symlink_st.st_flags) + finally: + fn(_DUMMY_SYMLINK, dummy_symlink_st.st_flags) def test_environ(self): if os.name == "nt": @@ -438,13 +643,22 @@ class PosixTester(unittest.TestCase): os.chdir(curdir) support.rmtree(base_path) + @unittest.skipUnless(hasattr(posix, 'getgrouplist'), "test needs posix.getgrouplist()") + @unittest.skipUnless(hasattr(pwd, 'getpwuid'), "test needs pwd.getpwuid()") + @unittest.skipUnless(hasattr(os, 'getuid'), "test needs os.getuid()") + def test_getgrouplist(self): + user = pwd.getpwuid(os.getuid())[0] + group = pwd.getpwuid(os.getuid())[3] + self.assertIn(group, posix.getgrouplist(user, group)) + + @unittest.skipUnless(hasattr(os, 'getegid'), "test needs os.getegid()") def test_getgroups(self): with os.popen('id -G') as idg: groups = idg.read().strip() ret = idg.close() - if ret != None or not groups: + if ret is not None or not groups: raise unittest.SkipTest("need working 'id -G'") # 'id -G' and 'os.getgroups()' should return the same @@ -455,6 +669,340 @@ class PosixTester(unittest.TestCase): set([int(x) for x in groups.split()]), set(posix.getgroups() + [posix.getegid()])) + # tests for the posix *at functions follow + + @unittest.skipUnless(os.access in os.supports_dir_fd, "test needs dir_fd support for os.access()") + def test_access_dir_fd(self): + f = posix.open(posix.getcwd(), posix.O_RDONLY) + try: + self.assertTrue(posix.access(support.TESTFN, os.R_OK, dir_fd=f)) + finally: + posix.close(f) + + @unittest.skipUnless(os.chmod in os.supports_dir_fd, "test needs dir_fd support in os.chmod()") + def test_chmod_dir_fd(self): + os.chmod(support.TESTFN, stat.S_IRUSR) + + f = posix.open(posix.getcwd(), posix.O_RDONLY) + try: + posix.chmod(support.TESTFN, stat.S_IRUSR | stat.S_IWUSR, dir_fd=f) + + s = posix.stat(support.TESTFN) + self.assertEqual(s[0] & stat.S_IRWXU, stat.S_IRUSR | stat.S_IWUSR) + finally: + posix.close(f) + + @unittest.skipUnless(os.chown in os.supports_dir_fd, "test needs dir_fd support in os.chown()") + def test_chown_dir_fd(self): + support.unlink(support.TESTFN) + support.create_empty_file(support.TESTFN) + + f = posix.open(posix.getcwd(), posix.O_RDONLY) + try: + posix.chown(support.TESTFN, os.getuid(), os.getgid(), dir_fd=f) + finally: + posix.close(f) + + @unittest.skipUnless(os.stat in os.supports_dir_fd, "test needs dir_fd support in os.stat()") + def test_stat_dir_fd(self): + support.unlink(support.TESTFN) + with open(support.TESTFN, 'w') as outfile: + outfile.write("testline\n") + + f = posix.open(posix.getcwd(), posix.O_RDONLY) + try: + s1 = posix.stat(support.TESTFN) + s2 = posix.stat(support.TESTFN, dir_fd=f) + self.assertEqual(s1, s2) + finally: + posix.close(f) + + @unittest.skipUnless(os.utime in os.supports_dir_fd, "test needs dir_fd support in os.utime()") + def test_utime_dir_fd(self): + f = posix.open(posix.getcwd(), posix.O_RDONLY) + try: + now = time.time() + posix.utime(support.TESTFN, None, dir_fd=f) + posix.utime(support.TESTFN, dir_fd=f) + self.assertRaises(TypeError, posix.utime, support.TESTFN, now, dir_fd=f) + self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, None), dir_fd=f) + self.assertRaises(TypeError, posix.utime, support.TESTFN, (now, None), dir_fd=f) + self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, now), dir_fd=f) + self.assertRaises(TypeError, posix.utime, support.TESTFN, (now, "x"), dir_fd=f) + posix.utime(support.TESTFN, (int(now), int(now)), dir_fd=f) + posix.utime(support.TESTFN, (now, now), dir_fd=f) + posix.utime(support.TESTFN, + (int(now), int((now - int(now)) * 1e9)), dir_fd=f) + posix.utime(support.TESTFN, dir_fd=f, + times=(int(now), int((now - int(now)) * 1e9))) + + # try dir_fd and follow_symlinks together + if os.utime in os.supports_follow_symlinks: + try: + posix.utime(support.TESTFN, follow_symlinks=False, dir_fd=f) + except ValueError: + # whoops! using both together not supported on this platform. + pass + + finally: + posix.close(f) + + @unittest.skipUnless(os.link in os.supports_dir_fd, "test needs dir_fd support in os.link()") + def test_link_dir_fd(self): + f = posix.open(posix.getcwd(), posix.O_RDONLY) + try: + posix.link(support.TESTFN, support.TESTFN + 'link', src_dir_fd=f, dst_dir_fd=f) + # should have same inodes + self.assertEqual(posix.stat(support.TESTFN)[1], + posix.stat(support.TESTFN + 'link')[1]) + finally: + posix.close(f) + support.unlink(support.TESTFN + 'link') + + @unittest.skipUnless(os.mkdir in os.supports_dir_fd, "test needs dir_fd support in os.mkdir()") + def test_mkdir_dir_fd(self): + f = posix.open(posix.getcwd(), posix.O_RDONLY) + try: + posix.mkdir(support.TESTFN + 'dir', dir_fd=f) + posix.stat(support.TESTFN + 'dir') # should not raise exception + finally: + posix.close(f) + support.rmtree(support.TESTFN + 'dir') + + @unittest.skipUnless((os.mknod in os.supports_dir_fd) and hasattr(stat, 'S_IFIFO'), + "test requires both stat.S_IFIFO and dir_fd support for os.mknod()") + def test_mknod_dir_fd(self): + # Test using mknodat() to create a FIFO (the only use specified + # by POSIX). + support.unlink(support.TESTFN) + mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR + f = posix.open(posix.getcwd(), posix.O_RDONLY) + try: + posix.mknod(support.TESTFN, mode, 0, dir_fd=f) + except OSError as e: + # Some old systems don't allow unprivileged users to use + # mknod(), or only support creating device nodes. + self.assertIn(e.errno, (errno.EPERM, errno.EINVAL)) + else: + self.assertTrue(stat.S_ISFIFO(posix.stat(support.TESTFN).st_mode)) + finally: + posix.close(f) + + @unittest.skipUnless(os.open in os.supports_dir_fd, "test needs dir_fd support in os.open()") + def test_open_dir_fd(self): + support.unlink(support.TESTFN) + with open(support.TESTFN, 'w') as outfile: + outfile.write("testline\n") + a = posix.open(posix.getcwd(), posix.O_RDONLY) + b = posix.open(support.TESTFN, posix.O_RDONLY, dir_fd=a) + try: + res = posix.read(b, 9).decode(encoding="utf-8") + self.assertEqual("testline\n", res) + finally: + posix.close(a) + posix.close(b) + + @unittest.skipUnless(os.readlink in os.supports_dir_fd, "test needs dir_fd support in os.readlink()") + def test_readlink_dir_fd(self): + os.symlink(support.TESTFN, support.TESTFN + 'link') + f = posix.open(posix.getcwd(), posix.O_RDONLY) + try: + self.assertEqual(posix.readlink(support.TESTFN + 'link'), + posix.readlink(support.TESTFN + 'link', dir_fd=f)) + finally: + support.unlink(support.TESTFN + 'link') + posix.close(f) + + @unittest.skipUnless(os.rename in os.supports_dir_fd, "test needs dir_fd support in os.rename()") + def test_rename_dir_fd(self): + support.unlink(support.TESTFN) + support.create_empty_file(support.TESTFN + 'ren') + f = posix.open(posix.getcwd(), posix.O_RDONLY) + try: + posix.rename(support.TESTFN + 'ren', support.TESTFN, src_dir_fd=f, dst_dir_fd=f) + except: + posix.rename(support.TESTFN + 'ren', support.TESTFN) + raise + else: + posix.stat(support.TESTFN) # should not throw exception + finally: + posix.close(f) + + @unittest.skipUnless(os.symlink in os.supports_dir_fd, "test needs dir_fd support in os.symlink()") + def test_symlink_dir_fd(self): + f = posix.open(posix.getcwd(), posix.O_RDONLY) + try: + posix.symlink(support.TESTFN, support.TESTFN + 'link', dir_fd=f) + self.assertEqual(posix.readlink(support.TESTFN + 'link'), support.TESTFN) + finally: + posix.close(f) + support.unlink(support.TESTFN + 'link') + + @unittest.skipUnless(os.unlink in os.supports_dir_fd, "test needs dir_fd support in os.unlink()") + def test_unlink_dir_fd(self): + f = posix.open(posix.getcwd(), posix.O_RDONLY) + support.create_empty_file(support.TESTFN + 'del') + posix.stat(support.TESTFN + 'del') # should not throw exception + try: + posix.unlink(support.TESTFN + 'del', dir_fd=f) + except: + support.unlink(support.TESTFN + 'del') + raise + else: + self.assertRaises(OSError, posix.stat, support.TESTFN + 'link') + finally: + posix.close(f) + + @unittest.skipUnless(os.mkfifo in os.supports_dir_fd, "test needs dir_fd support in os.mkfifo()") + def test_mkfifo_dir_fd(self): + support.unlink(support.TESTFN) + f = posix.open(posix.getcwd(), posix.O_RDONLY) + try: + posix.mkfifo(support.TESTFN, stat.S_IRUSR | stat.S_IWUSR, dir_fd=f) + self.assertTrue(stat.S_ISFIFO(posix.stat(support.TESTFN).st_mode)) + finally: + posix.close(f) + + requires_sched_h = unittest.skipUnless(hasattr(posix, 'sched_yield'), + "don't have scheduling support") + requires_sched_affinity = unittest.skipUnless(hasattr(posix, 'sched_setaffinity'), + "don't have sched affinity support") + + @requires_sched_h + def test_sched_yield(self): + # This has no error conditions (at least on Linux). + posix.sched_yield() + + @requires_sched_h + @unittest.skipUnless(hasattr(posix, 'sched_get_priority_max'), + "requires sched_get_priority_max()") + def test_sched_priority(self): + # Round-robin usually has interesting priorities. + pol = posix.SCHED_RR + lo = posix.sched_get_priority_min(pol) + hi = posix.sched_get_priority_max(pol) + self.assertIsInstance(lo, int) + self.assertIsInstance(hi, int) + self.assertGreaterEqual(hi, lo) + # OSX evidently just returns 15 without checking the argument. + if sys.platform != "darwin": + self.assertRaises(OSError, posix.sched_get_priority_min, -23) + self.assertRaises(OSError, posix.sched_get_priority_max, -23) + + @unittest.skipUnless(hasattr(posix, 'sched_setscheduler'), "can't change scheduler") + def test_get_and_set_scheduler_and_param(self): + possible_schedulers = [sched for name, sched in posix.__dict__.items() + if name.startswith("SCHED_")] + mine = posix.sched_getscheduler(0) + self.assertIn(mine, possible_schedulers) + try: + parent = posix.sched_getscheduler(os.getppid()) + except OSError as e: + if e.errno != errno.EPERM: + raise + else: + self.assertIn(parent, possible_schedulers) + self.assertRaises(OSError, posix.sched_getscheduler, -1) + self.assertRaises(OSError, posix.sched_getparam, -1) + param = posix.sched_getparam(0) + self.assertIsInstance(param.sched_priority, int) + try: + posix.sched_setscheduler(0, mine, param) + except OSError as e: + if e.errno != errno.EPERM: + raise + + # POSIX states that calling sched_setparam() on a process with a + # scheduling policy other than SCHED_FIFO or SCHED_RR is + # implementation-defined: FreeBSD returns EINVAL. + if not sys.platform.startswith('freebsd'): + posix.sched_setparam(0, param) + self.assertRaises(OSError, posix.sched_setparam, -1, param) + + self.assertRaises(OSError, posix.sched_setscheduler, -1, mine, param) + self.assertRaises(TypeError, posix.sched_setscheduler, 0, mine, None) + self.assertRaises(TypeError, posix.sched_setparam, 0, 43) + param = posix.sched_param(None) + self.assertRaises(TypeError, posix.sched_setparam, 0, param) + large = 214748364700 + param = posix.sched_param(large) + self.assertRaises(OverflowError, posix.sched_setparam, 0, param) + param = posix.sched_param(sched_priority=-large) + self.assertRaises(OverflowError, posix.sched_setparam, 0, param) + + @unittest.skipUnless(hasattr(posix, "sched_rr_get_interval"), "no function") + def test_sched_rr_get_interval(self): + try: + interval = posix.sched_rr_get_interval(0) + except OSError as e: + # This likely means that sched_rr_get_interval is only valid for + # processes with the SCHED_RR scheduler in effect. + if e.errno != errno.EINVAL: + raise + self.skipTest("only works on SCHED_RR processes") + self.assertIsInstance(interval, float) + # Reasonable constraints, I think. + self.assertGreaterEqual(interval, 0.) + self.assertLess(interval, 1.) + + @requires_sched_affinity + def test_sched_getaffinity(self): + mask = posix.sched_getaffinity(0) + self.assertIsInstance(mask, set) + self.assertGreaterEqual(len(mask), 1) + self.assertRaises(OSError, posix.sched_getaffinity, -1) + for cpu in mask: + self.assertIsInstance(cpu, int) + self.assertGreaterEqual(cpu, 0) + self.assertLess(cpu, 1 << 32) + + @requires_sched_affinity + def test_sched_setaffinity(self): + mask = posix.sched_getaffinity(0) + if len(mask) > 1: + # Empty masks are forbidden + mask.pop() + posix.sched_setaffinity(0, mask) + self.assertEqual(posix.sched_getaffinity(0), mask) + self.assertRaises(OSError, posix.sched_setaffinity, 0, []) + self.assertRaises(ValueError, posix.sched_setaffinity, 0, [-10]) + self.assertRaises(OverflowError, posix.sched_setaffinity, 0, [1<<128]) + self.assertRaises(OSError, posix.sched_setaffinity, -1, mask) + + def test_rtld_constants(self): + # check presence of major RTLD_* constants + posix.RTLD_LAZY + posix.RTLD_NOW + posix.RTLD_GLOBAL + posix.RTLD_LOCAL + + @unittest.skipUnless(hasattr(os, 'SEEK_HOLE'), + "test needs an OS that reports file holes") + def test_fs_holes(self): + # Even if the filesystem doesn't report holes, + # if the OS supports it the SEEK_* constants + # will be defined and will have a consistent + # behaviour: + # os.SEEK_DATA = current position + # os.SEEK_HOLE = end of file position + with open(support.TESTFN, 'r+b') as fp: + fp.write(b"hello") + fp.flush() + size = fp.tell() + fno = fp.fileno() + try : + for i in range(size): + self.assertEqual(i, os.lseek(fno, i, os.SEEK_DATA)) + self.assertLessEqual(size, os.lseek(fno, i, os.SEEK_HOLE)) + self.assertRaises(OSError, os.lseek, fno, size, os.SEEK_DATA) + self.assertRaises(OSError, os.lseek, fno, size, os.SEEK_HOLE) + except OSError : + # Some OSs claim to support SEEK_HOLE/SEEK_DATA + # but it is not true. + # For instance: + # http://lists.freebsd.org/pipermail/freebsd-amd64/2012-January/014332.html + raise unittest.SkipTest("OSError raised!") + class PosixGroupsTester(unittest.TestCase): def setUp(self): @@ -490,9 +1038,11 @@ class PosixGroupsTester(unittest.TestCase): posix.setgroups(groups) self.assertListEqual(groups, posix.getgroups()) - def test_main(): - support.run_unittest(PosixTester, PosixGroupsTester) + try: + support.run_unittest(PosixTester, PosixGroupsTester) + finally: + support.reap_children() if __name__ == '__main__': test_main() |