summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_posix.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_posix.py')
-rw-r--r--Lib/test/test_posix.py361
1 files changed, 359 insertions, 2 deletions
diff --git a/Lib/test/test_posix.py b/Lib/test/test_posix.py
index 45b3afc..0e9ac75 100644
--- a/Lib/test/test_posix.py
+++ b/Lib/test/test_posix.py
@@ -37,7 +37,7 @@ class PosixTester(unittest.TestCase):
NO_ARG_FUNCTIONS = [ "ctermid", "getcwd", "getcwdb", "uname",
"times", "getloadavg",
"getegid", "geteuid", "getgid", "getgroups",
- "getpid", "getpgrp", "getppid", "getuid",
+ "getpid", "getpgrp", "getppid", "getuid", "sync",
]
for name in NO_ARG_FUNCTIONS:
@@ -132,6 +132,156 @@ class PosixTester(unittest.TestCase):
finally:
fp.close()
+ @unittest.skipUnless(hasattr(posix, 'truncate'), "test needs posix.truncate()")
+ def test_truncate(self):
+ with open(support.TESTFN, 'w') as fp:
+ fp.write('test')
+ fp.flush()
+ posix.truncate(support.TESTFN, 0)
+
+ @unittest.skipUnless(hasattr(posix, 'fexecve'), "test needs posix.fexecve()")
+ @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()")
+ @unittest.skipUnless(hasattr(os, 'waitpid'), "test needs os.waitpid()")
+ def test_fexecve(self):
+ fp = os.open(sys.executable, os.O_RDONLY)
+ try:
+ pid = os.fork()
+ if pid == 0:
+ os.chdir(os.path.split(sys.executable)[0])
+ posix.fexecve(fp, [sys.executable, '-c', 'pass'], os.environ)
+ else:
+ self.assertEqual(os.waitpid(pid, 0), (pid, 0))
+ finally:
+ os.close(fp)
+
+ @unittest.skipUnless(hasattr(posix, 'waitid'), "test needs posix.waitid()")
+ @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()")
+ def test_waitid(self):
+ pid = os.fork()
+ if pid == 0:
+ os.chdir(os.path.split(sys.executable)[0])
+ posix.execve(sys.executable, [sys.executable, '-c', 'pass'], os.environ)
+ else:
+ res = posix.waitid(posix.P_PID, pid, posix.WEXITED)
+ self.assertEqual(pid, res.si_pid)
+
+ @unittest.skipUnless(hasattr(posix, 'lockf'), "test needs posix.lockf()")
+ def test_lockf(self):
+ fd = os.open(support.TESTFN, os.O_WRONLY | os.O_CREAT)
+ try:
+ os.write(fd, b'test')
+ os.lseek(fd, 0, os.SEEK_SET)
+ posix.lockf(fd, posix.F_LOCK, 4)
+ # section is locked
+ posix.lockf(fd, posix.F_ULOCK, 4)
+ finally:
+ os.close(fd)
+
+ @unittest.skipUnless(hasattr(posix, 'pread'), "test needs posix.pread()")
+ def test_pread(self):
+ fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT)
+ try:
+ os.write(fd, b'test')
+ os.lseek(fd, 0, os.SEEK_SET)
+ self.assertEqual(b'es', posix.pread(fd, 2, 1))
+ # the first pread() shoudn't disturb the file offset
+ self.assertEqual(b'te', posix.read(fd, 2))
+ finally:
+ os.close(fd)
+
+ @unittest.skipUnless(hasattr(posix, 'pwrite'), "test needs posix.pwrite()")
+ def test_pwrite(self):
+ fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT)
+ try:
+ os.write(fd, b'test')
+ os.lseek(fd, 0, os.SEEK_SET)
+ posix.pwrite(fd, b'xx', 1)
+ self.assertEqual(b'txxt', posix.read(fd, 4))
+ finally:
+ os.close(fd)
+
+ @unittest.skipUnless(hasattr(posix, 'posix_fallocate'),
+ "test needs posix.posix_fallocate()")
+ def test_posix_fallocate(self):
+ fd = os.open(support.TESTFN, os.O_WRONLY | os.O_CREAT)
+ try:
+ posix.posix_fallocate(fd, 0, 10)
+ except OSError as inst:
+ # issue10812, ZFS doesn't appear to support posix_fallocate,
+ # so skip Solaris-based since they are likely to have ZFS.
+ if inst.errno != errno.EINVAL or not sys.platform.startswith("sunos"):
+ raise
+ finally:
+ os.close(fd)
+
+ @unittest.skipUnless(hasattr(posix, 'posix_fadvise'),
+ "test needs posix.posix_fadvise()")
+ def test_posix_fadvise(self):
+ fd = os.open(support.TESTFN, os.O_RDONLY)
+ try:
+ posix.posix_fadvise(fd, 0, 0, posix.POSIX_FADV_WILLNEED)
+ finally:
+ os.close(fd)
+
+ @unittest.skipUnless(hasattr(posix, 'futimes'), "test needs posix.futimes()")
+ def test_futimes(self):
+ now = time.time()
+ fd = os.open(support.TESTFN, os.O_RDONLY)
+ try:
+ posix.futimes(fd, None)
+ self.assertRaises(TypeError, posix.futimes, fd, (None, None))
+ self.assertRaises(TypeError, posix.futimes, fd, (now, None))
+ self.assertRaises(TypeError, posix.futimes, fd, (None, now))
+ posix.futimes(fd, (int(now), int(now)))
+ posix.futimes(fd, (now, now))
+ finally:
+ os.close(fd)
+
+ @unittest.skipUnless(hasattr(posix, 'lutimes'), "test needs posix.lutimes()")
+ def test_lutimes(self):
+ now = time.time()
+ posix.lutimes(support.TESTFN, None)
+ self.assertRaises(TypeError, posix.lutimes, support.TESTFN, (None, None))
+ self.assertRaises(TypeError, posix.lutimes, support.TESTFN, (now, None))
+ self.assertRaises(TypeError, posix.lutimes, support.TESTFN, (None, now))
+ posix.lutimes(support.TESTFN, (int(now), int(now)))
+ posix.lutimes(support.TESTFN, (now, now))
+
+ @unittest.skipUnless(hasattr(posix, 'futimens'), "test needs posix.futimens()")
+ def test_futimens(self):
+ now = time.time()
+ fd = os.open(support.TESTFN, os.O_RDONLY)
+ try:
+ self.assertRaises(TypeError, posix.futimens, fd, (None, None), (None, None))
+ self.assertRaises(TypeError, posix.futimens, fd, (now, 0), None)
+ self.assertRaises(TypeError, posix.futimens, fd, None, (now, 0))
+ posix.futimens(fd, (int(now), int((now - int(now)) * 1e9)),
+ (int(now), int((now - int(now)) * 1e9)))
+ finally:
+ os.close(fd)
+
+ @unittest.skipUnless(hasattr(posix, 'writev'), "test needs posix.writev()")
+ def test_writev(self):
+ fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT)
+ try:
+ os.writev(fd, (b'test1', b'tt2', b't3'))
+ os.lseek(fd, 0, os.SEEK_SET)
+ self.assertEqual(b'test1tt2t3', posix.read(fd, 10))
+ finally:
+ os.close(fd)
+
+ @unittest.skipUnless(hasattr(posix, 'readv'), "test needs posix.readv()")
+ def test_readv(self):
+ fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT)
+ try:
+ os.write(fd, b'test1tt2t3')
+ os.lseek(fd, 0, os.SEEK_SET)
+ buf = [bytearray(i) for i in [5, 3, 2]]
+ self.assertEqual(posix.readv(fd, buf), 10)
+ self.assertEqual([b'test1', b'tt2', b't3'], [bytes(i) for i in buf])
+ finally:
+ os.close(fd)
+
def test_dup(self):
if hasattr(posix, 'dup'):
fp = open(support.TESTFN)
@@ -285,6 +435,18 @@ class PosixTester(unittest.TestCase):
if hasattr(posix, 'listdir'):
self.assertTrue(support.TESTFN in posix.listdir())
+ @unittest.skipUnless(hasattr(posix, 'fdlistdir'), "test needs posix.fdlistdir()")
+ def test_fdlistdir(self):
+ f = posix.open(posix.getcwd(), posix.O_RDONLY)
+ self.assertEqual(
+ sorted(posix.listdir('.')),
+ sorted(posix.fdlistdir(f))
+ )
+ # Check the fd was closed by fdlistdir
+ with self.assertRaises(OSError) as ctx:
+ posix.close(f)
+ self.assertEqual(ctx.exception.errno, errno.EBADF)
+
def test_access(self):
if hasattr(posix, 'access'):
self.assertTrue(posix.access(support.TESTFN, os.R_OK))
@@ -389,6 +551,198 @@ class PosixTester(unittest.TestCase):
set([int(x) for x in groups.split()]),
set(posix.getgroups() + [posix.getegid()]))
+ # tests for the posix *at functions follow
+
+ @unittest.skipUnless(hasattr(posix, 'faccessat'), "test needs posix.faccessat()")
+ def test_faccessat(self):
+ f = posix.open(posix.getcwd(), posix.O_RDONLY)
+ try:
+ self.assertTrue(posix.faccessat(f, support.TESTFN, os.R_OK))
+ finally:
+ posix.close(f)
+
+ @unittest.skipUnless(hasattr(posix, 'fchmodat'), "test needs posix.fchmodat()")
+ def test_fchmodat(self):
+ os.chmod(support.TESTFN, stat.S_IRUSR)
+
+ f = posix.open(posix.getcwd(), posix.O_RDONLY)
+ try:
+ posix.fchmodat(f, support.TESTFN, stat.S_IRUSR | stat.S_IWUSR)
+
+ s = posix.stat(support.TESTFN)
+ self.assertEqual(s[0] & stat.S_IRWXU, stat.S_IRUSR | stat.S_IWUSR)
+ finally:
+ posix.close(f)
+
+ @unittest.skipUnless(hasattr(posix, 'fchownat'), "test needs posix.fchownat()")
+ def test_fchownat(self):
+ support.unlink(support.TESTFN)
+ open(support.TESTFN, 'w').close()
+
+ f = posix.open(posix.getcwd(), posix.O_RDONLY)
+ try:
+ posix.fchownat(f, support.TESTFN, os.getuid(), os.getgid())
+ finally:
+ posix.close(f)
+
+ @unittest.skipUnless(hasattr(posix, 'fstatat'), "test needs posix.fstatat()")
+ def test_fstatat(self):
+ support.unlink(support.TESTFN)
+ with open(support.TESTFN, 'w') as outfile:
+ outfile.write("testline\n")
+
+ f = posix.open(posix.getcwd(), posix.O_RDONLY)
+ try:
+ s1 = posix.stat(support.TESTFN)
+ s2 = posix.fstatat(f, support.TESTFN)
+ self.assertEqual(s1, s2)
+ finally:
+ posix.close(f)
+
+ @unittest.skipUnless(hasattr(posix, 'futimesat'), "test needs posix.futimesat()")
+ def test_futimesat(self):
+ f = posix.open(posix.getcwd(), posix.O_RDONLY)
+ try:
+ now = time.time()
+ posix.futimesat(f, support.TESTFN, None)
+ self.assertRaises(TypeError, posix.futimesat, f, support.TESTFN, (None, None))
+ self.assertRaises(TypeError, posix.futimesat, f, support.TESTFN, (now, None))
+ self.assertRaises(TypeError, posix.futimesat, f, support.TESTFN, (None, now))
+ posix.futimesat(f, support.TESTFN, (int(now), int(now)))
+ posix.futimesat(f, support.TESTFN, (now, now))
+ finally:
+ posix.close(f)
+
+ @unittest.skipUnless(hasattr(posix, 'linkat'), "test needs posix.linkat()")
+ def test_linkat(self):
+ f = posix.open(posix.getcwd(), posix.O_RDONLY)
+ try:
+ posix.linkat(f, support.TESTFN, f, support.TESTFN + 'link')
+ # should have same inodes
+ self.assertEqual(posix.stat(support.TESTFN)[1],
+ posix.stat(support.TESTFN + 'link')[1])
+ finally:
+ posix.close(f)
+ support.unlink(support.TESTFN + 'link')
+
+ @unittest.skipUnless(hasattr(posix, 'mkdirat'), "test needs posix.mkdirat()")
+ def test_mkdirat(self):
+ f = posix.open(posix.getcwd(), posix.O_RDONLY)
+ try:
+ posix.mkdirat(f, support.TESTFN + 'dir')
+ posix.stat(support.TESTFN + 'dir') # should not raise exception
+ finally:
+ posix.close(f)
+ support.rmtree(support.TESTFN + 'dir')
+
+ @unittest.skipUnless(hasattr(posix, 'mknodat') and hasattr(stat, 'S_IFIFO'),
+ "don't have mknodat()/S_IFIFO")
+ def test_mknodat(self):
+ # Test using mknodat() to create a FIFO (the only use specified
+ # by POSIX).
+ support.unlink(support.TESTFN)
+ mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR
+ f = posix.open(posix.getcwd(), posix.O_RDONLY)
+ try:
+ posix.mknodat(f, support.TESTFN, mode, 0)
+ except OSError as e:
+ # Some old systems don't allow unprivileged users to use
+ # mknod(), or only support creating device nodes.
+ self.assertIn(e.errno, (errno.EPERM, errno.EINVAL))
+ else:
+ self.assertTrue(stat.S_ISFIFO(posix.stat(support.TESTFN).st_mode))
+ finally:
+ posix.close(f)
+
+ @unittest.skipUnless(hasattr(posix, 'openat'), "test needs posix.openat()")
+ def test_openat(self):
+ support.unlink(support.TESTFN)
+ with open(support.TESTFN, 'w') as outfile:
+ outfile.write("testline\n")
+ a = posix.open(posix.getcwd(), posix.O_RDONLY)
+ b = posix.openat(a, support.TESTFN, posix.O_RDONLY)
+ try:
+ res = posix.read(b, 9).decode(encoding="utf-8")
+ self.assertEqual("testline\n", res)
+ finally:
+ posix.close(a)
+ posix.close(b)
+
+ @unittest.skipUnless(hasattr(posix, 'readlinkat'), "test needs posix.readlinkat()")
+ def test_readlinkat(self):
+ os.symlink(support.TESTFN, support.TESTFN + 'link')
+ f = posix.open(posix.getcwd(), posix.O_RDONLY)
+ try:
+ self.assertEqual(posix.readlink(support.TESTFN + 'link'),
+ posix.readlinkat(f, support.TESTFN + 'link'))
+ finally:
+ support.unlink(support.TESTFN + 'link')
+ posix.close(f)
+
+ @unittest.skipUnless(hasattr(posix, 'renameat'), "test needs posix.renameat()")
+ def test_renameat(self):
+ support.unlink(support.TESTFN)
+ open(support.TESTFN + 'ren', 'w').close()
+ f = posix.open(posix.getcwd(), posix.O_RDONLY)
+ try:
+ posix.renameat(f, support.TESTFN + 'ren', f, support.TESTFN)
+ except:
+ posix.rename(support.TESTFN + 'ren', support.TESTFN)
+ raise
+ else:
+ posix.stat(support.TESTFN) # should not throw exception
+ finally:
+ posix.close(f)
+
+ @unittest.skipUnless(hasattr(posix, 'symlinkat'), "test needs posix.symlinkat()")
+ def test_symlinkat(self):
+ f = posix.open(posix.getcwd(), posix.O_RDONLY)
+ try:
+ posix.symlinkat(support.TESTFN, f, support.TESTFN + 'link')
+ self.assertEqual(posix.readlink(support.TESTFN + 'link'), support.TESTFN)
+ finally:
+ posix.close(f)
+ support.unlink(support.TESTFN + 'link')
+
+ @unittest.skipUnless(hasattr(posix, 'unlinkat'), "test needs posix.unlinkat()")
+ def test_unlinkat(self):
+ f = posix.open(posix.getcwd(), posix.O_RDONLY)
+ open(support.TESTFN + 'del', 'w').close()
+ posix.stat(support.TESTFN + 'del') # should not throw exception
+ try:
+ posix.unlinkat(f, support.TESTFN + 'del')
+ except:
+ support.unlink(support.TESTFN + 'del')
+ raise
+ else:
+ self.assertRaises(OSError, posix.stat, support.TESTFN + 'link')
+ finally:
+ posix.close(f)
+
+ @unittest.skipUnless(hasattr(posix, 'utimensat'), "test needs posix.utimensat()")
+ def test_utimensat(self):
+ f = posix.open(posix.getcwd(), posix.O_RDONLY)
+ try:
+ now = time.time()
+ posix.utimensat(f, support.TESTFN, None, None)
+ self.assertRaises(TypeError, posix.utimensat, f, support.TESTFN, (None, None), (None, None))
+ self.assertRaises(TypeError, posix.utimensat, f, support.TESTFN, (now, 0), None)
+ self.assertRaises(TypeError, posix.utimensat, f, support.TESTFN, None, (now, 0))
+ posix.utimensat(f, support.TESTFN, (int(now), int((now - int(now)) * 1e9)),
+ (int(now), int((now - int(now)) * 1e9)))
+ finally:
+ posix.close(f)
+
+ @unittest.skipUnless(hasattr(posix, 'mkfifoat'), "don't have mkfifoat()")
+ def test_mkfifoat(self):
+ support.unlink(support.TESTFN)
+ f = posix.open(posix.getcwd(), posix.O_RDONLY)
+ try:
+ posix.mkfifoat(f, support.TESTFN, stat.S_IRUSR | stat.S_IWUSR)
+ self.assertTrue(stat.S_ISFIFO(posix.stat(support.TESTFN).st_mode))
+ finally:
+ posix.close(f)
+
class PosixGroupsTester(unittest.TestCase):
def setUp(self):
@@ -426,7 +780,10 @@ class PosixGroupsTester(unittest.TestCase):
def test_main():
- support.run_unittest(PosixTester, PosixGroupsTester)
+ try:
+ support.run_unittest(PosixTester, PosixGroupsTester)
+ finally:
+ support.reap_children()
if __name__ == '__main__':
test_main()