summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_posix.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_posix.py')
-rw-r--r--Lib/test/test_posix.py637
1 files changed, 608 insertions, 29 deletions
diff --git a/Lib/test/test_posix.py b/Lib/test/test_posix.py
index 06ad5c8..fb79b17 100644
--- a/Lib/test/test_posix.py
+++ b/Lib/test/test_posix.py
@@ -9,6 +9,7 @@ import errno
import sys
import time
import os
+import fcntl
import platform
import pwd
import shutil
@@ -16,6 +17,7 @@ import stat
import tempfile
import unittest
import warnings
+import _testcapi
_DUMMY_SYMLINK = os.path.join(tempfile.gettempdir(),
support.TESTFN + '-dummy-symlink')
@@ -43,7 +45,7 @@ class PosixTester(unittest.TestCase):
NO_ARG_FUNCTIONS = [ "ctermid", "getcwd", "getcwdb", "uname",
"times", "getloadavg",
"getegid", "geteuid", "getgid", "getgroups",
- "getpid", "getpgrp", "getppid", "getuid",
+ "getpid", "getpgrp", "getppid", "getuid", "sync",
]
for name in NO_ARG_FUNCTIONS:
@@ -128,6 +130,7 @@ class PosixTester(unittest.TestCase):
fp = open(support.TESTFN)
try:
self.assertTrue(posix.fstatvfs(fp.fileno()))
+ self.assertTrue(posix.statvfs(fp.fileno()))
finally:
fp.close()
@@ -142,6 +145,151 @@ class PosixTester(unittest.TestCase):
finally:
fp.close()
+ @unittest.skipUnless(hasattr(posix, 'truncate'), "test needs posix.truncate()")
+ def test_truncate(self):
+ with open(support.TESTFN, 'w') as fp:
+ fp.write('test')
+ fp.flush()
+ posix.truncate(support.TESTFN, 0)
+
+ @unittest.skipUnless(getattr(os, 'execve', None) in os.supports_fd, "test needs execve() to support the fd parameter")
+ @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()")
+ @unittest.skipUnless(hasattr(os, 'waitpid'), "test needs os.waitpid()")
+ def test_fexecve(self):
+ fp = os.open(sys.executable, os.O_RDONLY)
+ try:
+ pid = os.fork()
+ if pid == 0:
+ os.chdir(os.path.split(sys.executable)[0])
+ posix.execve(fp, [sys.executable, '-c', 'pass'], os.environ)
+ else:
+ self.assertEqual(os.waitpid(pid, 0), (pid, 0))
+ finally:
+ os.close(fp)
+
+ @unittest.skipUnless(hasattr(posix, 'waitid'), "test needs posix.waitid()")
+ @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()")
+ def test_waitid(self):
+ pid = os.fork()
+ if pid == 0:
+ os.chdir(os.path.split(sys.executable)[0])
+ posix.execve(sys.executable, [sys.executable, '-c', 'pass'], os.environ)
+ else:
+ res = posix.waitid(posix.P_PID, pid, posix.WEXITED)
+ self.assertEqual(pid, res.si_pid)
+
+ @unittest.skipUnless(hasattr(posix, 'lockf'), "test needs posix.lockf()")
+ def test_lockf(self):
+ fd = os.open(support.TESTFN, os.O_WRONLY | os.O_CREAT)
+ try:
+ os.write(fd, b'test')
+ os.lseek(fd, 0, os.SEEK_SET)
+ posix.lockf(fd, posix.F_LOCK, 4)
+ # section is locked
+ posix.lockf(fd, posix.F_ULOCK, 4)
+ finally:
+ os.close(fd)
+
+ @unittest.skipUnless(hasattr(posix, 'pread'), "test needs posix.pread()")
+ def test_pread(self):
+ fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT)
+ try:
+ os.write(fd, b'test')
+ os.lseek(fd, 0, os.SEEK_SET)
+ self.assertEqual(b'es', posix.pread(fd, 2, 1))
+ # the first pread() shouldn't disturb the file offset
+ self.assertEqual(b'te', posix.read(fd, 2))
+ finally:
+ os.close(fd)
+
+ @unittest.skipUnless(hasattr(posix, 'pwrite'), "test needs posix.pwrite()")
+ def test_pwrite(self):
+ fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT)
+ try:
+ os.write(fd, b'test')
+ os.lseek(fd, 0, os.SEEK_SET)
+ posix.pwrite(fd, b'xx', 1)
+ self.assertEqual(b'txxt', posix.read(fd, 4))
+ finally:
+ os.close(fd)
+
+ @unittest.skipUnless(hasattr(posix, 'posix_fallocate'),
+ "test needs posix.posix_fallocate()")
+ def test_posix_fallocate(self):
+ fd = os.open(support.TESTFN, os.O_WRONLY | os.O_CREAT)
+ try:
+ posix.posix_fallocate(fd, 0, 10)
+ except OSError as inst:
+ # issue10812, ZFS doesn't appear to support posix_fallocate,
+ # so skip Solaris-based since they are likely to have ZFS.
+ if inst.errno != errno.EINVAL or not sys.platform.startswith("sunos"):
+ raise
+ finally:
+ os.close(fd)
+
+ @unittest.skipUnless(hasattr(posix, 'posix_fadvise'),
+ "test needs posix.posix_fadvise()")
+ def test_posix_fadvise(self):
+ fd = os.open(support.TESTFN, os.O_RDONLY)
+ try:
+ posix.posix_fadvise(fd, 0, 0, posix.POSIX_FADV_WILLNEED)
+ finally:
+ os.close(fd)
+
+ @unittest.skipUnless(os.utime in os.supports_fd, "test needs fd support in os.utime")
+ def test_utime_with_fd(self):
+ now = time.time()
+ fd = os.open(support.TESTFN, os.O_RDONLY)
+ try:
+ posix.utime(fd)
+ posix.utime(fd, None)
+ self.assertRaises(TypeError, posix.utime, fd, (None, None))
+ self.assertRaises(TypeError, posix.utime, fd, (now, None))
+ self.assertRaises(TypeError, posix.utime, fd, (None, now))
+ posix.utime(fd, (int(now), int(now)))
+ posix.utime(fd, (now, now))
+ self.assertRaises(ValueError, posix.utime, fd, (now, now), ns=(now, now))
+ self.assertRaises(ValueError, posix.utime, fd, (now, 0), ns=(None, None))
+ self.assertRaises(ValueError, posix.utime, fd, (None, None), ns=(now, 0))
+ posix.utime(fd, (int(now), int((now - int(now)) * 1e9)))
+ posix.utime(fd, ns=(int(now), int((now - int(now)) * 1e9)))
+
+ finally:
+ os.close(fd)
+
+ @unittest.skipUnless(os.utime in os.supports_follow_symlinks, "test needs follow_symlinks support in os.utime")
+ def test_utime_nofollow_symlinks(self):
+ now = time.time()
+ posix.utime(support.TESTFN, None, follow_symlinks=False)
+ self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, None), follow_symlinks=False)
+ self.assertRaises(TypeError, posix.utime, support.TESTFN, (now, None), follow_symlinks=False)
+ self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, now), follow_symlinks=False)
+ posix.utime(support.TESTFN, (int(now), int(now)), follow_symlinks=False)
+ posix.utime(support.TESTFN, (now, now), follow_symlinks=False)
+ posix.utime(support.TESTFN, follow_symlinks=False)
+
+ @unittest.skipUnless(hasattr(posix, 'writev'), "test needs posix.writev()")
+ def test_writev(self):
+ fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT)
+ try:
+ os.writev(fd, (b'test1', b'tt2', b't3'))
+ os.lseek(fd, 0, os.SEEK_SET)
+ self.assertEqual(b'test1tt2t3', posix.read(fd, 10))
+ finally:
+ os.close(fd)
+
+ @unittest.skipUnless(hasattr(posix, 'readv'), "test needs posix.readv()")
+ def test_readv(self):
+ fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT)
+ try:
+ os.write(fd, b'test1tt2t3')
+ os.lseek(fd, 0, os.SEEK_SET)
+ buf = [bytearray(i) for i in [5, 3, 2]]
+ self.assertEqual(posix.readv(fd, buf), 10)
+ self.assertEqual([b'test1', b'tt2', b't3'], [bytes(i) for i in buf])
+ finally:
+ os.close(fd)
+
def test_dup(self):
if hasattr(posix, 'dup'):
fp = open(support.TESTFN)
@@ -167,6 +315,13 @@ class PosixTester(unittest.TestCase):
fp1.close()
fp2.close()
+ @unittest.skipUnless(hasattr(os, 'O_CLOEXEC'), "needs os.O_CLOEXEC")
+ @support.requires_linux_version(2, 6, 23)
+ def test_oscloexec(self):
+ fd = os.open(support.TESTFN, os.O_RDONLY|os.O_CLOEXEC)
+ self.addCleanup(os.close, fd)
+ self.assertTrue(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC)
+
def test_osexlock(self):
if hasattr(posix, "O_EXLOCK"):
fd = os.open(support.TESTFN,
@@ -203,12 +358,29 @@ class PosixTester(unittest.TestCase):
fp = open(support.TESTFN)
try:
self.assertTrue(posix.fstat(fp.fileno()))
+ self.assertTrue(posix.stat(fp.fileno()))
+
+ self.assertRaisesRegex(TypeError,
+ 'should be string, bytes or integer, not',
+ posix.stat, float(fp.fileno()))
finally:
fp.close()
def test_stat(self):
if hasattr(posix, 'stat'):
self.assertTrue(posix.stat(support.TESTFN))
+ self.assertTrue(posix.stat(os.fsencode(support.TESTFN)))
+ self.assertTrue(posix.stat(bytearray(os.fsencode(support.TESTFN))))
+
+ self.assertRaisesRegex(TypeError,
+ 'can\'t specify None for path argument',
+ posix.stat, None)
+ self.assertRaisesRegex(TypeError,
+ 'should be string, bytes or integer, not',
+ posix.stat, list(support.TESTFN))
+ self.assertRaisesRegex(TypeError,
+ 'should be string, bytes or integer, not',
+ posix.stat, list(os.fsencode(support.TESTFN)))
@unittest.skipUnless(hasattr(posix, 'mkfifo'), "don't have mkfifo()")
def test_mkfifo(self):
@@ -265,7 +437,7 @@ class PosixTester(unittest.TestCase):
self.assertRaises(OSError, posix.chown, support.TESTFN, -1, -1)
# re-create the file
- open(support.TESTFN, 'w').close()
+ support.create_empty_file(support.TESTFN)
self._test_all_chown_common(posix.chown, support.TESTFN)
@unittest.skipUnless(hasattr(posix, 'fchown'), "test needs os.fchown()")
@@ -293,13 +465,32 @@ class PosixTester(unittest.TestCase):
self.assertRaises(OSError, posix.chdir, support.TESTFN)
def test_listdir(self):
- if hasattr(posix, 'listdir'):
- self.assertTrue(support.TESTFN in posix.listdir(os.curdir))
+ self.assertTrue(support.TESTFN in posix.listdir(os.curdir))
def test_listdir_default(self):
- # When listdir is called without argument, it's the same as listdir(os.curdir)
- if hasattr(posix, 'listdir'):
- self.assertTrue(support.TESTFN in posix.listdir())
+ # When listdir is called without argument,
+ # it's the same as listdir(os.curdir).
+ self.assertTrue(support.TESTFN in posix.listdir())
+
+ def test_listdir_bytes(self):
+ # When listdir is called with a bytes object,
+ # the returned strings are of type bytes.
+ self.assertTrue(os.fsencode(support.TESTFN) in posix.listdir(b'.'))
+
+ @unittest.skipUnless(posix.listdir in os.supports_fd,
+ "test needs fd support for posix.listdir()")
+ def test_listdir_fd(self):
+ f = posix.open(posix.getcwd(), posix.O_RDONLY)
+ self.addCleanup(posix.close, f)
+ self.assertEqual(
+ sorted(posix.listdir('.')),
+ sorted(posix.listdir(f))
+ )
+ # Check that the fd offset was reset (issue #13739)
+ self.assertEqual(
+ sorted(posix.listdir('.')),
+ sorted(posix.listdir(f))
+ )
def test_access(self):
if hasattr(posix, 'access'):
@@ -321,6 +512,36 @@ class PosixTester(unittest.TestCase):
os.close(reader)
os.close(writer)
+ @unittest.skipUnless(hasattr(os, 'pipe2'), "test needs os.pipe2()")
+ @support.requires_linux_version(2, 6, 27)
+ def test_pipe2(self):
+ self.assertRaises(TypeError, os.pipe2, 'DEADBEEF')
+ self.assertRaises(TypeError, os.pipe2, 0, 0)
+
+ # try calling with flags = 0, like os.pipe()
+ r, w = os.pipe2(0)
+ os.close(r)
+ os.close(w)
+
+ # test flags
+ r, w = os.pipe2(os.O_CLOEXEC|os.O_NONBLOCK)
+ self.addCleanup(os.close, r)
+ self.addCleanup(os.close, w)
+ self.assertTrue(fcntl.fcntl(r, fcntl.F_GETFD) & fcntl.FD_CLOEXEC)
+ self.assertTrue(fcntl.fcntl(w, fcntl.F_GETFD) & fcntl.FD_CLOEXEC)
+ # try reading from an empty pipe: this should fail, not block
+ self.assertRaises(OSError, os.read, r, 1)
+ # try a write big enough to fill-up the pipe: this should either
+ # fail or perform a partial write, not block
+ try:
+ os.write(w, b'x' * support.PIPE_MAX_SIZE)
+ except OSError:
+ pass
+
+ # Issue 15989
+ self.assertRaises(OverflowError, os.pipe2, _testcapi.INT_MAX + 1)
+ self.assertRaises(OverflowError, os.pipe2, _testcapi.UINT_MAX + 1)
+
def test_utime(self):
if hasattr(posix, 'utime'):
now = time.time()
@@ -331,13 +552,14 @@ class PosixTester(unittest.TestCase):
posix.utime(support.TESTFN, (int(now), int(now)))
posix.utime(support.TESTFN, (now, now))
- def _test_chflags_regular_file(self, chflags_func, target_file):
+ def _test_chflags_regular_file(self, chflags_func, target_file, **kwargs):
st = os.stat(target_file)
self.assertTrue(hasattr(st, 'st_flags'))
# ZFS returns EOPNOTSUPP when attempting to set flag UF_IMMUTABLE.
+ flags = st.st_flags | stat.UF_IMMUTABLE
try:
- chflags_func(target_file, st.st_flags | stat.UF_IMMUTABLE)
+ chflags_func(target_file, flags, **kwargs)
except OSError as err:
if err.errno != errno.EOPNOTSUPP:
raise
@@ -361,6 +583,7 @@ class PosixTester(unittest.TestCase):
@unittest.skipUnless(hasattr(posix, 'lchflags'), 'test needs os.lchflags()')
def test_lchflags_regular_file(self):
self._test_chflags_regular_file(posix.lchflags, support.TESTFN)
+ self._test_chflags_regular_file(posix.chflags, support.TESTFN, follow_symlinks=False)
@unittest.skipUnless(hasattr(posix, 'lchflags'), 'test needs os.lchflags()')
def test_lchflags_symlink(self):
@@ -372,25 +595,28 @@ class PosixTester(unittest.TestCase):
self.teardown_files.append(_DUMMY_SYMLINK)
dummy_symlink_st = os.lstat(_DUMMY_SYMLINK)
- # ZFS returns EOPNOTSUPP when attempting to set flag UF_IMMUTABLE.
- try:
- posix.lchflags(_DUMMY_SYMLINK,
- dummy_symlink_st.st_flags | stat.UF_IMMUTABLE)
- except OSError as err:
- if err.errno != errno.EOPNOTSUPP:
- raise
- msg = 'chflag UF_IMMUTABLE not supported by underlying fs'
- self.skipTest(msg)
+ def chflags_nofollow(path, flags):
+ return posix.chflags(path, flags, follow_symlinks=False)
- try:
- new_testfn_st = os.stat(support.TESTFN)
- new_dummy_symlink_st = os.lstat(_DUMMY_SYMLINK)
+ for fn in (posix.lchflags, chflags_nofollow):
+ # ZFS returns EOPNOTSUPP when attempting to set flag UF_IMMUTABLE.
+ flags = dummy_symlink_st.st_flags | stat.UF_IMMUTABLE
+ try:
+ fn(_DUMMY_SYMLINK, flags)
+ except OSError as err:
+ if err.errno != errno.EOPNOTSUPP:
+ raise
+ msg = 'chflag UF_IMMUTABLE not supported by underlying fs'
+ self.skipTest(msg)
+ try:
+ new_testfn_st = os.stat(support.TESTFN)
+ new_dummy_symlink_st = os.lstat(_DUMMY_SYMLINK)
- self.assertEqual(testfn_st.st_flags, new_testfn_st.st_flags)
- self.assertEqual(dummy_symlink_st.st_flags | stat.UF_IMMUTABLE,
- new_dummy_symlink_st.st_flags)
- finally:
- posix.lchflags(_DUMMY_SYMLINK, dummy_symlink_st.st_flags)
+ self.assertEqual(testfn_st.st_flags, new_testfn_st.st_flags)
+ self.assertEqual(dummy_symlink_st.st_flags | stat.UF_IMMUTABLE,
+ new_dummy_symlink_st.st_flags)
+ finally:
+ fn(_DUMMY_SYMLINK, dummy_symlink_st.st_flags)
def test_environ(self):
if os.name == "nt":
@@ -438,13 +664,22 @@ class PosixTester(unittest.TestCase):
os.chdir(curdir)
support.rmtree(base_path)
+ @unittest.skipUnless(hasattr(posix, 'getgrouplist'), "test needs posix.getgrouplist()")
+ @unittest.skipUnless(hasattr(pwd, 'getpwuid'), "test needs pwd.getpwuid()")
+ @unittest.skipUnless(hasattr(os, 'getuid'), "test needs os.getuid()")
+ def test_getgrouplist(self):
+ user = pwd.getpwuid(os.getuid())[0]
+ group = pwd.getpwuid(os.getuid())[3]
+ self.assertIn(group, posix.getgrouplist(user, group))
+
+
@unittest.skipUnless(hasattr(os, 'getegid'), "test needs os.getegid()")
def test_getgroups(self):
with os.popen('id -G') as idg:
groups = idg.read().strip()
ret = idg.close()
- if ret != None or not groups:
+ if ret is not None or not groups:
raise unittest.SkipTest("need working 'id -G'")
# Issues 16698: OS X ABIs prior to 10.6 have limits on getgroups()
@@ -462,6 +697,348 @@ class PosixTester(unittest.TestCase):
set([int(x) for x in groups.split()]),
set(posix.getgroups() + [posix.getegid()]))
+ # tests for the posix *at functions follow
+
+ @unittest.skipUnless(os.access in os.supports_dir_fd, "test needs dir_fd support for os.access()")
+ def test_access_dir_fd(self):
+ f = posix.open(posix.getcwd(), posix.O_RDONLY)
+ try:
+ self.assertTrue(posix.access(support.TESTFN, os.R_OK, dir_fd=f))
+ finally:
+ posix.close(f)
+
+ @unittest.skipUnless(os.chmod in os.supports_dir_fd, "test needs dir_fd support in os.chmod()")
+ def test_chmod_dir_fd(self):
+ os.chmod(support.TESTFN, stat.S_IRUSR)
+
+ f = posix.open(posix.getcwd(), posix.O_RDONLY)
+ try:
+ posix.chmod(support.TESTFN, stat.S_IRUSR | stat.S_IWUSR, dir_fd=f)
+
+ s = posix.stat(support.TESTFN)
+ self.assertEqual(s[0] & stat.S_IRWXU, stat.S_IRUSR | stat.S_IWUSR)
+ finally:
+ posix.close(f)
+
+ @unittest.skipUnless(os.chown in os.supports_dir_fd, "test needs dir_fd support in os.chown()")
+ def test_chown_dir_fd(self):
+ support.unlink(support.TESTFN)
+ support.create_empty_file(support.TESTFN)
+
+ f = posix.open(posix.getcwd(), posix.O_RDONLY)
+ try:
+ posix.chown(support.TESTFN, os.getuid(), os.getgid(), dir_fd=f)
+ finally:
+ posix.close(f)
+
+ @unittest.skipUnless(os.stat in os.supports_dir_fd, "test needs dir_fd support in os.stat()")
+ def test_stat_dir_fd(self):
+ support.unlink(support.TESTFN)
+ with open(support.TESTFN, 'w') as outfile:
+ outfile.write("testline\n")
+
+ f = posix.open(posix.getcwd(), posix.O_RDONLY)
+ try:
+ s1 = posix.stat(support.TESTFN)
+ s2 = posix.stat(support.TESTFN, dir_fd=f)
+ self.assertEqual(s1, s2)
+ s2 = posix.stat(support.TESTFN, dir_fd=None)
+ self.assertEqual(s1, s2)
+ self.assertRaisesRegex(TypeError, 'should be integer, not',
+ posix.stat, support.TESTFN, dir_fd=posix.getcwd())
+ self.assertRaisesRegex(TypeError, 'should be integer, not',
+ posix.stat, support.TESTFN, dir_fd=float(f))
+ self.assertRaises(OverflowError,
+ posix.stat, support.TESTFN, dir_fd=10**20)
+ finally:
+ posix.close(f)
+
+ @unittest.skipUnless(os.utime in os.supports_dir_fd, "test needs dir_fd support in os.utime()")
+ def test_utime_dir_fd(self):
+ f = posix.open(posix.getcwd(), posix.O_RDONLY)
+ try:
+ now = time.time()
+ posix.utime(support.TESTFN, None, dir_fd=f)
+ posix.utime(support.TESTFN, dir_fd=f)
+ self.assertRaises(TypeError, posix.utime, support.TESTFN, now, dir_fd=f)
+ self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, None), dir_fd=f)
+ self.assertRaises(TypeError, posix.utime, support.TESTFN, (now, None), dir_fd=f)
+ self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, now), dir_fd=f)
+ self.assertRaises(TypeError, posix.utime, support.TESTFN, (now, "x"), dir_fd=f)
+ posix.utime(support.TESTFN, (int(now), int(now)), dir_fd=f)
+ posix.utime(support.TESTFN, (now, now), dir_fd=f)
+ posix.utime(support.TESTFN,
+ (int(now), int((now - int(now)) * 1e9)), dir_fd=f)
+ posix.utime(support.TESTFN, dir_fd=f,
+ times=(int(now), int((now - int(now)) * 1e9)))
+
+ # try dir_fd and follow_symlinks together
+ if os.utime in os.supports_follow_symlinks:
+ try:
+ posix.utime(support.TESTFN, follow_symlinks=False, dir_fd=f)
+ except ValueError:
+ # whoops! using both together not supported on this platform.
+ pass
+
+ finally:
+ posix.close(f)
+
+ @unittest.skipUnless(os.link in os.supports_dir_fd, "test needs dir_fd support in os.link()")
+ def test_link_dir_fd(self):
+ f = posix.open(posix.getcwd(), posix.O_RDONLY)
+ try:
+ posix.link(support.TESTFN, support.TESTFN + 'link', src_dir_fd=f, dst_dir_fd=f)
+ # should have same inodes
+ self.assertEqual(posix.stat(support.TESTFN)[1],
+ posix.stat(support.TESTFN + 'link')[1])
+ finally:
+ posix.close(f)
+ support.unlink(support.TESTFN + 'link')
+
+ @unittest.skipUnless(os.mkdir in os.supports_dir_fd, "test needs dir_fd support in os.mkdir()")
+ def test_mkdir_dir_fd(self):
+ f = posix.open(posix.getcwd(), posix.O_RDONLY)
+ try:
+ posix.mkdir(support.TESTFN + 'dir', dir_fd=f)
+ posix.stat(support.TESTFN + 'dir') # should not raise exception
+ finally:
+ posix.close(f)
+ support.rmtree(support.TESTFN + 'dir')
+
+ @unittest.skipUnless((os.mknod in os.supports_dir_fd) and hasattr(stat, 'S_IFIFO'),
+ "test requires both stat.S_IFIFO and dir_fd support for os.mknod()")
+ def test_mknod_dir_fd(self):
+ # Test using mknodat() to create a FIFO (the only use specified
+ # by POSIX).
+ support.unlink(support.TESTFN)
+ mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR
+ f = posix.open(posix.getcwd(), posix.O_RDONLY)
+ try:
+ posix.mknod(support.TESTFN, mode, 0, dir_fd=f)
+ except OSError as e:
+ # Some old systems don't allow unprivileged users to use
+ # mknod(), or only support creating device nodes.
+ self.assertIn(e.errno, (errno.EPERM, errno.EINVAL))
+ else:
+ self.assertTrue(stat.S_ISFIFO(posix.stat(support.TESTFN).st_mode))
+ finally:
+ posix.close(f)
+
+ @unittest.skipUnless(os.open in os.supports_dir_fd, "test needs dir_fd support in os.open()")
+ def test_open_dir_fd(self):
+ support.unlink(support.TESTFN)
+ with open(support.TESTFN, 'w') as outfile:
+ outfile.write("testline\n")
+ a = posix.open(posix.getcwd(), posix.O_RDONLY)
+ b = posix.open(support.TESTFN, posix.O_RDONLY, dir_fd=a)
+ try:
+ res = posix.read(b, 9).decode(encoding="utf-8")
+ self.assertEqual("testline\n", res)
+ finally:
+ posix.close(a)
+ posix.close(b)
+
+ @unittest.skipUnless(os.readlink in os.supports_dir_fd, "test needs dir_fd support in os.readlink()")
+ def test_readlink_dir_fd(self):
+ os.symlink(support.TESTFN, support.TESTFN + 'link')
+ f = posix.open(posix.getcwd(), posix.O_RDONLY)
+ try:
+ self.assertEqual(posix.readlink(support.TESTFN + 'link'),
+ posix.readlink(support.TESTFN + 'link', dir_fd=f))
+ finally:
+ support.unlink(support.TESTFN + 'link')
+ posix.close(f)
+
+ @unittest.skipUnless(os.rename in os.supports_dir_fd, "test needs dir_fd support in os.rename()")
+ def test_rename_dir_fd(self):
+ support.unlink(support.TESTFN)
+ support.create_empty_file(support.TESTFN + 'ren')
+ f = posix.open(posix.getcwd(), posix.O_RDONLY)
+ try:
+ posix.rename(support.TESTFN + 'ren', support.TESTFN, src_dir_fd=f, dst_dir_fd=f)
+ except:
+ posix.rename(support.TESTFN + 'ren', support.TESTFN)
+ raise
+ else:
+ posix.stat(support.TESTFN) # should not raise exception
+ finally:
+ posix.close(f)
+
+ @unittest.skipUnless(os.symlink in os.supports_dir_fd, "test needs dir_fd support in os.symlink()")
+ def test_symlink_dir_fd(self):
+ f = posix.open(posix.getcwd(), posix.O_RDONLY)
+ try:
+ posix.symlink(support.TESTFN, support.TESTFN + 'link', dir_fd=f)
+ self.assertEqual(posix.readlink(support.TESTFN + 'link'), support.TESTFN)
+ finally:
+ posix.close(f)
+ support.unlink(support.TESTFN + 'link')
+
+ @unittest.skipUnless(os.unlink in os.supports_dir_fd, "test needs dir_fd support in os.unlink()")
+ def test_unlink_dir_fd(self):
+ f = posix.open(posix.getcwd(), posix.O_RDONLY)
+ support.create_empty_file(support.TESTFN + 'del')
+ posix.stat(support.TESTFN + 'del') # should not raise exception
+ try:
+ posix.unlink(support.TESTFN + 'del', dir_fd=f)
+ except:
+ support.unlink(support.TESTFN + 'del')
+ raise
+ else:
+ self.assertRaises(OSError, posix.stat, support.TESTFN + 'link')
+ finally:
+ posix.close(f)
+
+ @unittest.skipUnless(os.mkfifo in os.supports_dir_fd, "test needs dir_fd support in os.mkfifo()")
+ def test_mkfifo_dir_fd(self):
+ support.unlink(support.TESTFN)
+ f = posix.open(posix.getcwd(), posix.O_RDONLY)
+ try:
+ posix.mkfifo(support.TESTFN, stat.S_IRUSR | stat.S_IWUSR, dir_fd=f)
+ self.assertTrue(stat.S_ISFIFO(posix.stat(support.TESTFN).st_mode))
+ finally:
+ posix.close(f)
+
+ requires_sched_h = unittest.skipUnless(hasattr(posix, 'sched_yield'),
+ "don't have scheduling support")
+ requires_sched_affinity = unittest.skipUnless(hasattr(posix, 'sched_setaffinity'),
+ "don't have sched affinity support")
+
+ @requires_sched_h
+ def test_sched_yield(self):
+ # This has no error conditions (at least on Linux).
+ posix.sched_yield()
+
+ @requires_sched_h
+ @unittest.skipUnless(hasattr(posix, 'sched_get_priority_max'),
+ "requires sched_get_priority_max()")
+ def test_sched_priority(self):
+ # Round-robin usually has interesting priorities.
+ pol = posix.SCHED_RR
+ lo = posix.sched_get_priority_min(pol)
+ hi = posix.sched_get_priority_max(pol)
+ self.assertIsInstance(lo, int)
+ self.assertIsInstance(hi, int)
+ self.assertGreaterEqual(hi, lo)
+ # OSX evidently just returns 15 without checking the argument.
+ if sys.platform != "darwin":
+ self.assertRaises(OSError, posix.sched_get_priority_min, -23)
+ self.assertRaises(OSError, posix.sched_get_priority_max, -23)
+
+ @unittest.skipUnless(hasattr(posix, 'sched_setscheduler'), "can't change scheduler")
+ def test_get_and_set_scheduler_and_param(self):
+ possible_schedulers = [sched for name, sched in posix.__dict__.items()
+ if name.startswith("SCHED_")]
+ mine = posix.sched_getscheduler(0)
+ self.assertIn(mine, possible_schedulers)
+ try:
+ parent = posix.sched_getscheduler(os.getppid())
+ except OSError as e:
+ if e.errno != errno.EPERM:
+ raise
+ else:
+ self.assertIn(parent, possible_schedulers)
+ self.assertRaises(OSError, posix.sched_getscheduler, -1)
+ self.assertRaises(OSError, posix.sched_getparam, -1)
+ param = posix.sched_getparam(0)
+ self.assertIsInstance(param.sched_priority, int)
+
+ # POSIX states that calling sched_setparam() or sched_setscheduler() on
+ # a process with a scheduling policy other than SCHED_FIFO or SCHED_RR
+ # is implementation-defined: NetBSD and FreeBSD can return EINVAL.
+ if not sys.platform.startswith(('freebsd', 'netbsd')):
+ try:
+ posix.sched_setscheduler(0, mine, param)
+ posix.sched_setparam(0, param)
+ except OSError as e:
+ if e.errno != errno.EPERM:
+ raise
+ self.assertRaises(OSError, posix.sched_setparam, -1, param)
+
+ self.assertRaises(OSError, posix.sched_setscheduler, -1, mine, param)
+ self.assertRaises(TypeError, posix.sched_setscheduler, 0, mine, None)
+ self.assertRaises(TypeError, posix.sched_setparam, 0, 43)
+ param = posix.sched_param(None)
+ self.assertRaises(TypeError, posix.sched_setparam, 0, param)
+ large = 214748364700
+ param = posix.sched_param(large)
+ self.assertRaises(OverflowError, posix.sched_setparam, 0, param)
+ param = posix.sched_param(sched_priority=-large)
+ self.assertRaises(OverflowError, posix.sched_setparam, 0, param)
+
+ @unittest.skipUnless(hasattr(posix, "sched_rr_get_interval"), "no function")
+ def test_sched_rr_get_interval(self):
+ try:
+ interval = posix.sched_rr_get_interval(0)
+ except OSError as e:
+ # This likely means that sched_rr_get_interval is only valid for
+ # processes with the SCHED_RR scheduler in effect.
+ if e.errno != errno.EINVAL:
+ raise
+ self.skipTest("only works on SCHED_RR processes")
+ self.assertIsInstance(interval, float)
+ # Reasonable constraints, I think.
+ self.assertGreaterEqual(interval, 0.)
+ self.assertLess(interval, 1.)
+
+ @requires_sched_affinity
+ def test_sched_getaffinity(self):
+ mask = posix.sched_getaffinity(0)
+ self.assertIsInstance(mask, set)
+ self.assertGreaterEqual(len(mask), 1)
+ self.assertRaises(OSError, posix.sched_getaffinity, -1)
+ for cpu in mask:
+ self.assertIsInstance(cpu, int)
+ self.assertGreaterEqual(cpu, 0)
+ self.assertLess(cpu, 1 << 32)
+
+ @requires_sched_affinity
+ def test_sched_setaffinity(self):
+ mask = posix.sched_getaffinity(0)
+ if len(mask) > 1:
+ # Empty masks are forbidden
+ mask.pop()
+ posix.sched_setaffinity(0, mask)
+ self.assertEqual(posix.sched_getaffinity(0), mask)
+ self.assertRaises(OSError, posix.sched_setaffinity, 0, [])
+ self.assertRaises(ValueError, posix.sched_setaffinity, 0, [-10])
+ self.assertRaises(OverflowError, posix.sched_setaffinity, 0, [1<<128])
+ self.assertRaises(OSError, posix.sched_setaffinity, -1, mask)
+
+ def test_rtld_constants(self):
+ # check presence of major RTLD_* constants
+ posix.RTLD_LAZY
+ posix.RTLD_NOW
+ posix.RTLD_GLOBAL
+ posix.RTLD_LOCAL
+
+ @unittest.skipUnless(hasattr(os, 'SEEK_HOLE'),
+ "test needs an OS that reports file holes")
+ def test_fs_holes(self):
+ # Even if the filesystem doesn't report holes,
+ # if the OS supports it the SEEK_* constants
+ # will be defined and will have a consistent
+ # behaviour:
+ # os.SEEK_DATA = current position
+ # os.SEEK_HOLE = end of file position
+ with open(support.TESTFN, 'r+b') as fp:
+ fp.write(b"hello")
+ fp.flush()
+ size = fp.tell()
+ fno = fp.fileno()
+ try :
+ for i in range(size):
+ self.assertEqual(i, os.lseek(fno, i, os.SEEK_DATA))
+ self.assertLessEqual(size, os.lseek(fno, i, os.SEEK_HOLE))
+ self.assertRaises(OSError, os.lseek, fno, size, os.SEEK_DATA)
+ self.assertRaises(OSError, os.lseek, fno, size, os.SEEK_HOLE)
+ except OSError :
+ # Some OSs claim to support SEEK_HOLE/SEEK_DATA
+ # but it is not true.
+ # For instance:
+ # http://lists.freebsd.org/pipermail/freebsd-amd64/2012-January/014332.html
+ raise unittest.SkipTest("OSError raised!")
+
class PosixGroupsTester(unittest.TestCase):
def setUp(self):
@@ -497,9 +1074,11 @@ class PosixGroupsTester(unittest.TestCase):
posix.setgroups(groups)
self.assertListEqual(groups, posix.getgroups())
-
def test_main():
- support.run_unittest(PosixTester, PosixGroupsTester)
+ try:
+ support.run_unittest(PosixTester, PosixGroupsTester)
+ finally:
+ support.reap_children()
if __name__ == '__main__':
test_main()