diff options
Diffstat (limited to 'Lib/test/test_posix.py')
| -rw-r--r-- | Lib/test/test_posix.py | 361 | 
1 files changed, 359 insertions, 2 deletions
| diff --git a/Lib/test/test_posix.py b/Lib/test/test_posix.py index 45b3afc..0e9ac75 100644 --- a/Lib/test/test_posix.py +++ b/Lib/test/test_posix.py @@ -37,7 +37,7 @@ class PosixTester(unittest.TestCase):          NO_ARG_FUNCTIONS = [ "ctermid", "getcwd", "getcwdb", "uname",                               "times", "getloadavg",                               "getegid", "geteuid", "getgid", "getgroups", -                             "getpid", "getpgrp", "getppid", "getuid", +                             "getpid", "getpgrp", "getppid", "getuid", "sync",                             ]          for name in NO_ARG_FUNCTIONS: @@ -132,6 +132,156 @@ class PosixTester(unittest.TestCase):              finally:                  fp.close() +    @unittest.skipUnless(hasattr(posix, 'truncate'), "test needs posix.truncate()") +    def test_truncate(self): +        with open(support.TESTFN, 'w') as fp: +            fp.write('test') +            fp.flush() +        posix.truncate(support.TESTFN, 0) + +    @unittest.skipUnless(hasattr(posix, 'fexecve'), "test needs posix.fexecve()") +    @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()") +    @unittest.skipUnless(hasattr(os, 'waitpid'), "test needs os.waitpid()") +    def test_fexecve(self): +        fp = os.open(sys.executable, os.O_RDONLY) +        try: +            pid = os.fork() +            if pid == 0: +                os.chdir(os.path.split(sys.executable)[0]) +                posix.fexecve(fp, [sys.executable, '-c', 'pass'], os.environ) +            else: +                self.assertEqual(os.waitpid(pid, 0), (pid, 0)) +        finally: +            os.close(fp) + +    @unittest.skipUnless(hasattr(posix, 'waitid'), "test needs posix.waitid()") +    @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()") +    def test_waitid(self): +        pid = os.fork() +        if pid == 0: +            os.chdir(os.path.split(sys.executable)[0]) +            posix.execve(sys.executable, [sys.executable, '-c', 'pass'], os.environ) +        else: +            res = posix.waitid(posix.P_PID, pid, posix.WEXITED) +            self.assertEqual(pid, res.si_pid) + +    @unittest.skipUnless(hasattr(posix, 'lockf'), "test needs posix.lockf()") +    def test_lockf(self): +        fd = os.open(support.TESTFN, os.O_WRONLY | os.O_CREAT) +        try: +            os.write(fd, b'test') +            os.lseek(fd, 0, os.SEEK_SET) +            posix.lockf(fd, posix.F_LOCK, 4) +            # section is locked +            posix.lockf(fd, posix.F_ULOCK, 4) +        finally: +            os.close(fd) + +    @unittest.skipUnless(hasattr(posix, 'pread'), "test needs posix.pread()") +    def test_pread(self): +        fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT) +        try: +            os.write(fd, b'test') +            os.lseek(fd, 0, os.SEEK_SET) +            self.assertEqual(b'es', posix.pread(fd, 2, 1)) +            # the first pread() shoudn't disturb the file offset +            self.assertEqual(b'te', posix.read(fd, 2)) +        finally: +            os.close(fd) + +    @unittest.skipUnless(hasattr(posix, 'pwrite'), "test needs posix.pwrite()") +    def test_pwrite(self): +        fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT) +        try: +            os.write(fd, b'test') +            os.lseek(fd, 0, os.SEEK_SET) +            posix.pwrite(fd, b'xx', 1) +            self.assertEqual(b'txxt', posix.read(fd, 4)) +        finally: +            os.close(fd) + +    @unittest.skipUnless(hasattr(posix, 'posix_fallocate'), +        "test needs posix.posix_fallocate()") +    def test_posix_fallocate(self): +        fd = os.open(support.TESTFN, os.O_WRONLY | os.O_CREAT) +        try: +            posix.posix_fallocate(fd, 0, 10) +        except OSError as inst: +            # issue10812, ZFS doesn't appear to support posix_fallocate, +            # so skip Solaris-based since they are likely to have ZFS. +            if inst.errno != errno.EINVAL or not sys.platform.startswith("sunos"): +                raise +        finally: +            os.close(fd) + +    @unittest.skipUnless(hasattr(posix, 'posix_fadvise'), +        "test needs posix.posix_fadvise()") +    def test_posix_fadvise(self): +        fd = os.open(support.TESTFN, os.O_RDONLY) +        try: +            posix.posix_fadvise(fd, 0, 0, posix.POSIX_FADV_WILLNEED) +        finally: +            os.close(fd) + +    @unittest.skipUnless(hasattr(posix, 'futimes'), "test needs posix.futimes()") +    def test_futimes(self): +        now = time.time() +        fd = os.open(support.TESTFN, os.O_RDONLY) +        try: +            posix.futimes(fd, None) +            self.assertRaises(TypeError, posix.futimes, fd, (None, None)) +            self.assertRaises(TypeError, posix.futimes, fd, (now, None)) +            self.assertRaises(TypeError, posix.futimes, fd, (None, now)) +            posix.futimes(fd, (int(now), int(now))) +            posix.futimes(fd, (now, now)) +        finally: +            os.close(fd) + +    @unittest.skipUnless(hasattr(posix, 'lutimes'), "test needs posix.lutimes()") +    def test_lutimes(self): +        now = time.time() +        posix.lutimes(support.TESTFN, None) +        self.assertRaises(TypeError, posix.lutimes, support.TESTFN, (None, None)) +        self.assertRaises(TypeError, posix.lutimes, support.TESTFN, (now, None)) +        self.assertRaises(TypeError, posix.lutimes, support.TESTFN, (None, now)) +        posix.lutimes(support.TESTFN, (int(now), int(now))) +        posix.lutimes(support.TESTFN, (now, now)) + +    @unittest.skipUnless(hasattr(posix, 'futimens'), "test needs posix.futimens()") +    def test_futimens(self): +        now = time.time() +        fd = os.open(support.TESTFN, os.O_RDONLY) +        try: +            self.assertRaises(TypeError, posix.futimens, fd, (None, None), (None, None)) +            self.assertRaises(TypeError, posix.futimens, fd, (now, 0), None) +            self.assertRaises(TypeError, posix.futimens, fd, None, (now, 0)) +            posix.futimens(fd, (int(now), int((now - int(now)) * 1e9)), +                    (int(now), int((now - int(now)) * 1e9))) +        finally: +            os.close(fd) + +    @unittest.skipUnless(hasattr(posix, 'writev'), "test needs posix.writev()") +    def test_writev(self): +        fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT) +        try: +            os.writev(fd, (b'test1', b'tt2', b't3')) +            os.lseek(fd, 0, os.SEEK_SET) +            self.assertEqual(b'test1tt2t3', posix.read(fd, 10)) +        finally: +            os.close(fd) + +    @unittest.skipUnless(hasattr(posix, 'readv'), "test needs posix.readv()") +    def test_readv(self): +        fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT) +        try: +            os.write(fd, b'test1tt2t3') +            os.lseek(fd, 0, os.SEEK_SET) +            buf = [bytearray(i) for i in [5, 3, 2]] +            self.assertEqual(posix.readv(fd, buf), 10) +            self.assertEqual([b'test1', b'tt2', b't3'], [bytes(i) for i in buf]) +        finally: +            os.close(fd) +      def test_dup(self):          if hasattr(posix, 'dup'):              fp = open(support.TESTFN) @@ -285,6 +435,18 @@ class PosixTester(unittest.TestCase):          if hasattr(posix, 'listdir'):              self.assertTrue(support.TESTFN in posix.listdir()) +    @unittest.skipUnless(hasattr(posix, 'fdlistdir'), "test needs posix.fdlistdir()") +    def test_fdlistdir(self): +        f = posix.open(posix.getcwd(), posix.O_RDONLY) +        self.assertEqual( +            sorted(posix.listdir('.')), +            sorted(posix.fdlistdir(f)) +            ) +        # Check the fd was closed by fdlistdir +        with self.assertRaises(OSError) as ctx: +            posix.close(f) +        self.assertEqual(ctx.exception.errno, errno.EBADF) +      def test_access(self):          if hasattr(posix, 'access'):              self.assertTrue(posix.access(support.TESTFN, os.R_OK)) @@ -389,6 +551,198 @@ class PosixTester(unittest.TestCase):                  set([int(x) for x in groups.split()]),                  set(posix.getgroups() + [posix.getegid()])) +    # tests for the posix *at functions follow + +    @unittest.skipUnless(hasattr(posix, 'faccessat'), "test needs posix.faccessat()") +    def test_faccessat(self): +        f = posix.open(posix.getcwd(), posix.O_RDONLY) +        try: +            self.assertTrue(posix.faccessat(f, support.TESTFN, os.R_OK)) +        finally: +            posix.close(f) + +    @unittest.skipUnless(hasattr(posix, 'fchmodat'), "test needs posix.fchmodat()") +    def test_fchmodat(self): +        os.chmod(support.TESTFN, stat.S_IRUSR) + +        f = posix.open(posix.getcwd(), posix.O_RDONLY) +        try: +            posix.fchmodat(f, support.TESTFN, stat.S_IRUSR | stat.S_IWUSR) + +            s = posix.stat(support.TESTFN) +            self.assertEqual(s[0] & stat.S_IRWXU, stat.S_IRUSR | stat.S_IWUSR) +        finally: +            posix.close(f) + +    @unittest.skipUnless(hasattr(posix, 'fchownat'), "test needs posix.fchownat()") +    def test_fchownat(self): +        support.unlink(support.TESTFN) +        open(support.TESTFN, 'w').close() + +        f = posix.open(posix.getcwd(), posix.O_RDONLY) +        try: +            posix.fchownat(f, support.TESTFN, os.getuid(), os.getgid()) +        finally: +            posix.close(f) + +    @unittest.skipUnless(hasattr(posix, 'fstatat'), "test needs posix.fstatat()") +    def test_fstatat(self): +        support.unlink(support.TESTFN) +        with open(support.TESTFN, 'w') as outfile: +            outfile.write("testline\n") + +        f = posix.open(posix.getcwd(), posix.O_RDONLY) +        try: +            s1 = posix.stat(support.TESTFN) +            s2 = posix.fstatat(f, support.TESTFN) +            self.assertEqual(s1, s2) +        finally: +            posix.close(f) + +    @unittest.skipUnless(hasattr(posix, 'futimesat'), "test needs posix.futimesat()") +    def test_futimesat(self): +        f = posix.open(posix.getcwd(), posix.O_RDONLY) +        try: +            now = time.time() +            posix.futimesat(f, support.TESTFN, None) +            self.assertRaises(TypeError, posix.futimesat, f, support.TESTFN, (None, None)) +            self.assertRaises(TypeError, posix.futimesat, f, support.TESTFN, (now, None)) +            self.assertRaises(TypeError, posix.futimesat, f, support.TESTFN, (None, now)) +            posix.futimesat(f, support.TESTFN, (int(now), int(now))) +            posix.futimesat(f, support.TESTFN, (now, now)) +        finally: +            posix.close(f) + +    @unittest.skipUnless(hasattr(posix, 'linkat'), "test needs posix.linkat()") +    def test_linkat(self): +        f = posix.open(posix.getcwd(), posix.O_RDONLY) +        try: +            posix.linkat(f, support.TESTFN, f, support.TESTFN + 'link') +            # should have same inodes +            self.assertEqual(posix.stat(support.TESTFN)[1], +                posix.stat(support.TESTFN + 'link')[1]) +        finally: +            posix.close(f) +            support.unlink(support.TESTFN + 'link') + +    @unittest.skipUnless(hasattr(posix, 'mkdirat'), "test needs posix.mkdirat()") +    def test_mkdirat(self): +        f = posix.open(posix.getcwd(), posix.O_RDONLY) +        try: +            posix.mkdirat(f, support.TESTFN + 'dir') +            posix.stat(support.TESTFN + 'dir') # should not raise exception +        finally: +            posix.close(f) +            support.rmtree(support.TESTFN + 'dir') + +    @unittest.skipUnless(hasattr(posix, 'mknodat') and hasattr(stat, 'S_IFIFO'), +                         "don't have mknodat()/S_IFIFO") +    def test_mknodat(self): +        # Test using mknodat() to create a FIFO (the only use specified +        # by POSIX). +        support.unlink(support.TESTFN) +        mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR +        f = posix.open(posix.getcwd(), posix.O_RDONLY) +        try: +            posix.mknodat(f, support.TESTFN, mode, 0) +        except OSError as e: +            # Some old systems don't allow unprivileged users to use +            # mknod(), or only support creating device nodes. +            self.assertIn(e.errno, (errno.EPERM, errno.EINVAL)) +        else: +            self.assertTrue(stat.S_ISFIFO(posix.stat(support.TESTFN).st_mode)) +        finally: +            posix.close(f) + +    @unittest.skipUnless(hasattr(posix, 'openat'), "test needs posix.openat()") +    def test_openat(self): +        support.unlink(support.TESTFN) +        with open(support.TESTFN, 'w') as outfile: +            outfile.write("testline\n") +        a = posix.open(posix.getcwd(), posix.O_RDONLY) +        b = posix.openat(a, support.TESTFN, posix.O_RDONLY) +        try: +            res = posix.read(b, 9).decode(encoding="utf-8") +            self.assertEqual("testline\n", res) +        finally: +            posix.close(a) +            posix.close(b) + +    @unittest.skipUnless(hasattr(posix, 'readlinkat'), "test needs posix.readlinkat()") +    def test_readlinkat(self): +        os.symlink(support.TESTFN, support.TESTFN + 'link') +        f = posix.open(posix.getcwd(), posix.O_RDONLY) +        try: +            self.assertEqual(posix.readlink(support.TESTFN + 'link'), +                posix.readlinkat(f, support.TESTFN + 'link')) +        finally: +            support.unlink(support.TESTFN + 'link') +            posix.close(f) + +    @unittest.skipUnless(hasattr(posix, 'renameat'), "test needs posix.renameat()") +    def test_renameat(self): +        support.unlink(support.TESTFN) +        open(support.TESTFN + 'ren', 'w').close() +        f = posix.open(posix.getcwd(), posix.O_RDONLY) +        try: +            posix.renameat(f, support.TESTFN + 'ren', f, support.TESTFN) +        except: +            posix.rename(support.TESTFN + 'ren', support.TESTFN) +            raise +        else: +            posix.stat(support.TESTFN) # should not throw exception +        finally: +            posix.close(f) + +    @unittest.skipUnless(hasattr(posix, 'symlinkat'), "test needs posix.symlinkat()") +    def test_symlinkat(self): +        f = posix.open(posix.getcwd(), posix.O_RDONLY) +        try: +            posix.symlinkat(support.TESTFN, f, support.TESTFN + 'link') +            self.assertEqual(posix.readlink(support.TESTFN + 'link'), support.TESTFN) +        finally: +            posix.close(f) +            support.unlink(support.TESTFN + 'link') + +    @unittest.skipUnless(hasattr(posix, 'unlinkat'), "test needs posix.unlinkat()") +    def test_unlinkat(self): +        f = posix.open(posix.getcwd(), posix.O_RDONLY) +        open(support.TESTFN + 'del', 'w').close() +        posix.stat(support.TESTFN + 'del') # should not throw exception +        try: +            posix.unlinkat(f, support.TESTFN + 'del') +        except: +            support.unlink(support.TESTFN + 'del') +            raise +        else: +            self.assertRaises(OSError, posix.stat, support.TESTFN + 'link') +        finally: +            posix.close(f) + +    @unittest.skipUnless(hasattr(posix, 'utimensat'), "test needs posix.utimensat()") +    def test_utimensat(self): +        f = posix.open(posix.getcwd(), posix.O_RDONLY) +        try: +            now = time.time() +            posix.utimensat(f, support.TESTFN, None, None) +            self.assertRaises(TypeError, posix.utimensat, f, support.TESTFN, (None, None), (None, None)) +            self.assertRaises(TypeError, posix.utimensat, f, support.TESTFN, (now, 0), None) +            self.assertRaises(TypeError, posix.utimensat, f, support.TESTFN, None, (now, 0)) +            posix.utimensat(f, support.TESTFN, (int(now), int((now - int(now)) * 1e9)), +                    (int(now), int((now - int(now)) * 1e9))) +        finally: +            posix.close(f) + +    @unittest.skipUnless(hasattr(posix, 'mkfifoat'), "don't have mkfifoat()") +    def test_mkfifoat(self): +        support.unlink(support.TESTFN) +        f = posix.open(posix.getcwd(), posix.O_RDONLY) +        try: +            posix.mkfifoat(f, support.TESTFN, stat.S_IRUSR | stat.S_IWUSR) +            self.assertTrue(stat.S_ISFIFO(posix.stat(support.TESTFN).st_mode)) +        finally: +            posix.close(f) +  class PosixGroupsTester(unittest.TestCase):      def setUp(self): @@ -426,7 +780,10 @@ class PosixGroupsTester(unittest.TestCase):  def test_main(): -    support.run_unittest(PosixTester, PosixGroupsTester) +    try: +        support.run_unittest(PosixTester, PosixGroupsTester) +    finally: +        support.reap_children()  if __name__ == '__main__':      test_main() | 
