summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_os.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_os.py')
-rw-r--r--Lib/test/test_os.py493
1 files changed, 440 insertions, 53 deletions
diff --git a/Lib/test/test_os.py b/Lib/test/test_os.py
index 56309bf..31f2cc3 100644
--- a/Lib/test/test_os.py
+++ b/Lib/test/test_os.py
@@ -24,6 +24,10 @@ import itertools
import stat
import locale
import codecs
+import decimal
+import fractions
+import pickle
+import sysconfig
try:
import threading
except ImportError:
@@ -32,6 +36,10 @@ try:
import resource
except ImportError:
resource = None
+try:
+ import fcntl
+except ImportError:
+ fcntl = None
from test.script_helper import assert_python_ok
@@ -256,6 +264,16 @@ class StatAttributeTests(unittest.TestCase):
warnings.simplefilter("ignore", DeprecationWarning)
self.check_stat_attributes(fname)
+ def test_stat_result_pickle(self):
+ result = os.stat(self.fname)
+ for proto in range(pickle.HIGHEST_PROTOCOL + 1):
+ p = pickle.dumps(result, proto)
+ self.assertIn(b'stat_result', p)
+ if proto < 4:
+ self.assertIn(b'cos\nstat_result\n', p)
+ unpickled = pickle.loads(p)
+ self.assertEqual(result, unpickled)
+
@unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
def test_statvfs_attributes(self):
try:
@@ -263,7 +281,7 @@ class StatAttributeTests(unittest.TestCase):
except OSError as e:
# On AtheOS, glibc always returns ENOSYS
if e.errno == errno.ENOSYS:
- self.skipTest('glibc always returns ENOSYS on AtheOS')
+ self.skipTest('os.statvfs() failed with ENOSYS')
# Make sure direct access works
self.assertEqual(result.f_bfree, result[3])
@@ -300,6 +318,24 @@ class StatAttributeTests(unittest.TestCase):
except TypeError:
pass
+ @unittest.skipUnless(hasattr(os, 'statvfs'),
+ "need os.statvfs()")
+ def test_statvfs_result_pickle(self):
+ try:
+ result = os.statvfs(self.fname)
+ except OSError as e:
+ # On AtheOS, glibc always returns ENOSYS
+ if e.errno == errno.ENOSYS:
+ self.skipTest('os.statvfs() failed with ENOSYS')
+
+ for proto in range(pickle.HIGHEST_PROTOCOL + 1):
+ p = pickle.dumps(result, proto)
+ self.assertIn(b'statvfs_result', p)
+ if proto < 4:
+ self.assertIn(b'cos\nstatvfs_result\n', p)
+ unpickled = pickle.loads(p)
+ self.assertEqual(result, unpickled)
+
def test_utime_dir(self):
delta = 1000000
st = os.stat(support.TESTFN)
@@ -478,9 +514,9 @@ class StatAttributeTests(unittest.TestCase):
# Verify that an open file can be stat'ed
try:
os.stat(r"c:\pagefile.sys")
- except WindowsError as e:
- if e.errno == 2: # file does not exist; cannot run test
- self.skipTest(r'c:\pagefile.sys does not exist')
+ except FileNotFoundError:
+ self.skipTest(r'c:\pagefile.sys does not exist')
+ except OSError as e:
self.fail("Could not stat pagefile.sys")
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
@@ -658,9 +694,17 @@ class EnvironTests(mapping_tests.BasicTestMappingProtocol):
class WalkTests(unittest.TestCase):
"""Tests for os.walk()."""
+ # Wrapper to hide minor differences between os.walk and os.fwalk
+ # to tests both functions with the same code base
+ def walk(self, directory, topdown=True, follow_symlinks=False):
+ walk_it = os.walk(directory,
+ topdown=topdown,
+ followlinks=follow_symlinks)
+ for root, dirs, files in walk_it:
+ yield (root, dirs, files)
+
def setUp(self):
- import os
- from os.path import join
+ join = os.path.join
# Build:
# TESTFN/
@@ -675,36 +719,39 @@ class WalkTests(unittest.TestCase):
# broken_link
# TEST2/
# tmp4 a lone file
- walk_path = join(support.TESTFN, "TEST1")
- sub1_path = join(walk_path, "SUB1")
- sub11_path = join(sub1_path, "SUB11")
- sub2_path = join(walk_path, "SUB2")
- tmp1_path = join(walk_path, "tmp1")
- tmp2_path = join(sub1_path, "tmp2")
+ self.walk_path = join(support.TESTFN, "TEST1")
+ self.sub1_path = join(self.walk_path, "SUB1")
+ self.sub11_path = join(self.sub1_path, "SUB11")
+ sub2_path = join(self.walk_path, "SUB2")
+ tmp1_path = join(self.walk_path, "tmp1")
+ tmp2_path = join(self.sub1_path, "tmp2")
tmp3_path = join(sub2_path, "tmp3")
- link_path = join(sub2_path, "link")
+ self.link_path = join(sub2_path, "link")
t2_path = join(support.TESTFN, "TEST2")
tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
- link_path = join(sub2_path, "link")
broken_link_path = join(sub2_path, "broken_link")
# Create stuff.
- os.makedirs(sub11_path)
+ os.makedirs(self.sub11_path)
os.makedirs(sub2_path)
os.makedirs(t2_path)
+
for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path:
f = open(path, "w")
f.write("I'm " + path + " and proud of it. Blame test_os.\n")
f.close()
+
if support.can_symlink():
- os.symlink(os.path.abspath(t2_path), link_path)
+ os.symlink(os.path.abspath(t2_path), self.link_path)
os.symlink('broken', broken_link_path, True)
- sub2_tree = (sub2_path, ["link"], ["broken_link", "tmp3"])
+ self.sub2_tree = (sub2_path, ["link"], ["broken_link", "tmp3"])
else:
- sub2_tree = (sub2_path, [], ["tmp3"])
+ self.sub2_tree = (sub2_path, [], ["tmp3"])
+ def test_walk_topdown(self):
# Walk top-down.
- all = list(os.walk(walk_path))
+ all = list(os.walk(self.walk_path))
+
self.assertEqual(len(all), 4)
# We can't know which order SUB1 and SUB2 will appear in.
# Not flipped: TESTFN, SUB1, SUB11, SUB2
@@ -712,26 +759,32 @@ class WalkTests(unittest.TestCase):
flipped = all[0][1][0] != "SUB1"
all[0][1].sort()
all[3 - 2 * flipped][-1].sort()
- self.assertEqual(all[0], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
- self.assertEqual(all[1 + flipped], (sub1_path, ["SUB11"], ["tmp2"]))
- self.assertEqual(all[2 + flipped], (sub11_path, [], []))
- self.assertEqual(all[3 - 2 * flipped], sub2_tree)
+ self.assertEqual(all[0], (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
+ self.assertEqual(all[1 + flipped], (self.sub1_path, ["SUB11"], ["tmp2"]))
+ self.assertEqual(all[2 + flipped], (self.sub11_path, [], []))
+ self.assertEqual(all[3 - 2 * flipped], self.sub2_tree)
+ def test_walk_prune(self):
# Prune the search.
all = []
- for root, dirs, files in os.walk(walk_path):
+ for root, dirs, files in self.walk(self.walk_path):
all.append((root, dirs, files))
# Don't descend into SUB1.
if 'SUB1' in dirs:
# Note that this also mutates the dirs we appended to all!
dirs.remove('SUB1')
+
self.assertEqual(len(all), 2)
- self.assertEqual(all[0], (walk_path, ["SUB2"], ["tmp1"]))
+ self.assertEqual(all[0],
+ (self.walk_path, ["SUB2"], ["tmp1"]))
+
all[1][-1].sort()
- self.assertEqual(all[1], sub2_tree)
+ self.assertEqual(all[1], self.sub2_tree)
+ def test_walk_bottom_up(self):
# Walk bottom-up.
- all = list(os.walk(walk_path, topdown=False))
+ all = list(self.walk(self.walk_path, topdown=False))
+
self.assertEqual(len(all), 4)
# We can't know which order SUB1 and SUB2 will appear in.
# Not flipped: SUB11, SUB1, SUB2, TESTFN
@@ -739,20 +792,28 @@ class WalkTests(unittest.TestCase):
flipped = all[3][1][0] != "SUB1"
all[3][1].sort()
all[2 - 2 * flipped][-1].sort()
- self.assertEqual(all[3], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
- self.assertEqual(all[flipped], (sub11_path, [], []))
- self.assertEqual(all[flipped + 1], (sub1_path, ["SUB11"], ["tmp2"]))
- self.assertEqual(all[2 - 2 * flipped], sub2_tree)
-
- if support.can_symlink():
- # Walk, following symlinks.
- for root, dirs, files in os.walk(walk_path, followlinks=True):
- if root == link_path:
- self.assertEqual(dirs, [])
- self.assertEqual(files, ["tmp4"])
- break
- else:
- self.fail("Didn't follow symlink with followlinks=True")
+ self.assertEqual(all[3],
+ (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
+ self.assertEqual(all[flipped],
+ (self.sub11_path, [], []))
+ self.assertEqual(all[flipped + 1],
+ (self.sub1_path, ["SUB11"], ["tmp2"]))
+ self.assertEqual(all[2 - 2 * flipped],
+ self.sub2_tree)
+
+ def test_walk_symlink(self):
+ if not support.can_symlink():
+ self.skipTest("need symlink support")
+
+ # Walk, following symlinks.
+ walk_it = self.walk(self.walk_path, follow_symlinks=True)
+ for root, dirs, files in walk_it:
+ if root == self.link_path:
+ self.assertEqual(dirs, [])
+ self.assertEqual(files, ["tmp4"])
+ break
+ else:
+ self.fail("Didn't follow symlink with followlinks=True")
def tearDown(self):
# Tear everything down. This is a decent use for bottom-up on
@@ -775,6 +836,14 @@ class WalkTests(unittest.TestCase):
class FwalkTests(WalkTests):
"""Tests for os.fwalk()."""
+ def walk(self, directory, topdown=True, follow_symlinks=False):
+ walk_it = os.fwalk(directory,
+ topdown=topdown,
+ follow_symlinks=follow_symlinks)
+ for root, dirs, files, root_fd in walk_it:
+ yield (root, dirs, files)
+
+
def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
"""
compare with walk() results.
@@ -876,6 +945,17 @@ class MakedirTests(unittest.TestCase):
os.makedirs(path, mode=mode, exist_ok=True)
os.umask(old_mask)
+ @unittest.skipUnless(hasattr(os, 'chown'), 'test needs os.chown')
+ def test_chown_uid_gid_arguments_must_be_index(self):
+ stat = os.stat(support.TESTFN)
+ uid = stat.st_uid
+ gid = stat.st_gid
+ for value in (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2)):
+ self.assertRaises(TypeError, os.chown, support.TESTFN, value, gid)
+ self.assertRaises(TypeError, os.chown, support.TESTFN, uid, value)
+ self.assertIsNone(os.chown(support.TESTFN, uid, gid))
+ self.assertIsNone(os.chown(support.TESTFN, -1, -1))
+
def test_exist_ok_s_isgid_directory(self):
path = os.path.join(support.TESTFN, 'dir1')
S_ISGID = stat.S_ISGID
@@ -1007,6 +1087,12 @@ class URandomTests(unittest.TestCase):
data2 = self.get_urandom_subprocess(16)
self.assertNotEqual(data1, data2)
+
+HAVE_GETENTROPY = (sysconfig.get_config_var('HAVE_GETENTROPY') == 1)
+
+@unittest.skipIf(HAVE_GETENTROPY,
+ "getentropy() does not use a file descriptor")
+class URandomFDTests(unittest.TestCase):
@unittest.skipUnless(resource, "test requires the resource module")
def test_urandom_failure(self):
# Check urandom() failing when it is not able to open /dev/random.
@@ -1030,6 +1116,49 @@ class URandomTests(unittest.TestCase):
"""
assert_python_ok('-c', code)
+ def test_urandom_fd_closed(self):
+ # Issue #21207: urandom() should reopen its fd to /dev/urandom if
+ # closed.
+ code = """if 1:
+ import os
+ import sys
+ os.urandom(4)
+ os.closerange(3, 256)
+ sys.stdout.buffer.write(os.urandom(4))
+ """
+ rc, out, err = assert_python_ok('-Sc', code)
+
+ def test_urandom_fd_reopened(self):
+ # Issue #21207: urandom() should detect its fd to /dev/urandom
+ # changed to something else, and reopen it.
+ with open(support.TESTFN, 'wb') as f:
+ f.write(b"x" * 256)
+ self.addCleanup(os.unlink, support.TESTFN)
+ code = """if 1:
+ import os
+ import sys
+ os.urandom(4)
+ for fd in range(3, 256):
+ try:
+ os.close(fd)
+ except OSError:
+ pass
+ else:
+ # Found the urandom fd (XXX hopefully)
+ break
+ os.closerange(3, 256)
+ with open({TESTFN!r}, 'rb') as f:
+ os.dup2(f.fileno(), fd)
+ sys.stdout.buffer.write(os.urandom(4))
+ sys.stdout.buffer.write(os.urandom(4))
+ """.format(TESTFN=support.TESTFN)
+ rc, out, err = assert_python_ok('-Sc', code)
+ self.assertEqual(len(out), 8)
+ self.assertNotEqual(out[0:4], out[4:8])
+ rc, out2, err2 = assert_python_ok('-Sc', code)
+ self.assertEqual(len(out2), 8)
+ self.assertNotEqual(out2, out)
+
@contextlib.contextmanager
def _execvpe_mockup(defpath=None):
@@ -1132,27 +1261,27 @@ class ExecTests(unittest.TestCase):
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ErrorTests(unittest.TestCase):
def test_rename(self):
- self.assertRaises(WindowsError, os.rename, support.TESTFN, support.TESTFN+".bak")
+ self.assertRaises(OSError, os.rename, support.TESTFN, support.TESTFN+".bak")
def test_remove(self):
- self.assertRaises(WindowsError, os.remove, support.TESTFN)
+ self.assertRaises(OSError, os.remove, support.TESTFN)
def test_chdir(self):
- self.assertRaises(WindowsError, os.chdir, support.TESTFN)
+ self.assertRaises(OSError, os.chdir, support.TESTFN)
def test_mkdir(self):
f = open(support.TESTFN, "w")
try:
- self.assertRaises(WindowsError, os.mkdir, support.TESTFN)
+ self.assertRaises(OSError, os.mkdir, support.TESTFN)
finally:
f.close()
os.unlink(support.TESTFN)
def test_utime(self):
- self.assertRaises(WindowsError, os.utime, support.TESTFN, None)
+ self.assertRaises(OSError, os.utime, support.TESTFN, None)
def test_chmod(self):
- self.assertRaises(WindowsError, os.chmod, support.TESTFN, 0)
+ self.assertRaises(OSError, os.chmod, support.TESTFN, 0)
class TestInvalidFD(unittest.TestCase):
singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
@@ -1286,41 +1415,57 @@ class PosixUidGidTests(unittest.TestCase):
@unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
def test_setuid(self):
if os.getuid() != 0:
- self.assertRaises(os.error, os.setuid, 0)
+ self.assertRaises(OSError, os.setuid, 0)
self.assertRaises(OverflowError, os.setuid, 1<<32)
@unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
def test_setgid(self):
if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
- self.assertRaises(os.error, os.setgid, 0)
+ self.assertRaises(OSError, os.setgid, 0)
self.assertRaises(OverflowError, os.setgid, 1<<32)
@unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
def test_seteuid(self):
if os.getuid() != 0:
- self.assertRaises(os.error, os.seteuid, 0)
+ self.assertRaises(OSError, os.seteuid, 0)
self.assertRaises(OverflowError, os.seteuid, 1<<32)
@unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
def test_setegid(self):
if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
- self.assertRaises(os.error, os.setegid, 0)
+ self.assertRaises(OSError, os.setegid, 0)
self.assertRaises(OverflowError, os.setegid, 1<<32)
@unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
def test_setreuid(self):
if os.getuid() != 0:
- self.assertRaises(os.error, os.setreuid, 0, 0)
+ self.assertRaises(OSError, os.setreuid, 0, 0)
self.assertRaises(OverflowError, os.setreuid, 1<<32, 0)
self.assertRaises(OverflowError, os.setreuid, 0, 1<<32)
+ @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
+ def test_setreuid_neg1(self):
+ # Needs to accept -1. We run this in a subprocess to avoid
+ # altering the test runner's process state (issue8045).
+ subprocess.check_call([
+ sys.executable, '-c',
+ 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
+
@unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
def test_setregid(self):
if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
- self.assertRaises(os.error, os.setregid, 0, 0)
+ self.assertRaises(OSError, os.setregid, 0, 0)
self.assertRaises(OverflowError, os.setregid, 1<<32, 0)
self.assertRaises(OverflowError, os.setregid, 0, 1<<32)
+ @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
+ def test_setregid_neg1(self):
+ # Needs to accept -1. We run this in a subprocess to avoid
+ # altering the test runner's process state (issue8045).
+ subprocess.check_call([
+ sys.executable, '-c',
+ 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
+
@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
class Pep383Tests(unittest.TestCase):
def setUp(self):
@@ -1510,6 +1655,52 @@ class Win32KillTests(unittest.TestCase):
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
+class Win32ListdirTests(unittest.TestCase):
+ """Test listdir on Windows."""
+
+ def setUp(self):
+ self.created_paths = []
+ for i in range(2):
+ dir_name = 'SUB%d' % i
+ dir_path = os.path.join(support.TESTFN, dir_name)
+ file_name = 'FILE%d' % i
+ file_path = os.path.join(support.TESTFN, file_name)
+ os.makedirs(dir_path)
+ with open(file_path, 'w') as f:
+ f.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
+ self.created_paths.extend([dir_name, file_name])
+ self.created_paths.sort()
+
+ def tearDown(self):
+ shutil.rmtree(support.TESTFN)
+
+ def test_listdir_no_extended_path(self):
+ """Test when the path is not an "extended" path."""
+ # unicode
+ self.assertEqual(
+ sorted(os.listdir(support.TESTFN)),
+ self.created_paths)
+ # bytes
+ self.assertEqual(
+ sorted(os.listdir(os.fsencode(support.TESTFN))),
+ [os.fsencode(path) for path in self.created_paths])
+
+ def test_listdir_extended_path(self):
+ """Test when the path starts with '\\\\?\\'."""
+ # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
+ # unicode
+ path = '\\\\?\\' + os.path.abspath(support.TESTFN)
+ self.assertEqual(
+ sorted(os.listdir(path)),
+ self.created_paths)
+ # bytes
+ path = b'\\\\?\\' + os.fsencode(os.path.abspath(support.TESTFN))
+ self.assertEqual(
+ sorted(os.listdir(path)),
+ [os.fsencode(path) for path in self.created_paths])
+
+
+@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
@support.skip_unless_symlink
class Win32SymlinkTests(unittest.TestCase):
filelink = 'filelinktest'
@@ -2174,6 +2365,197 @@ class TermsizeTests(unittest.TestCase):
self.assertEqual(expected, actual)
+class OSErrorTests(unittest.TestCase):
+ def setUp(self):
+ class Str(str):
+ pass
+
+ self.bytes_filenames = []
+ self.unicode_filenames = []
+ if support.TESTFN_UNENCODABLE is not None:
+ decoded = support.TESTFN_UNENCODABLE
+ else:
+ decoded = support.TESTFN
+ self.unicode_filenames.append(decoded)
+ self.unicode_filenames.append(Str(decoded))
+ if support.TESTFN_UNDECODABLE is not None:
+ encoded = support.TESTFN_UNDECODABLE
+ else:
+ encoded = os.fsencode(support.TESTFN)
+ self.bytes_filenames.append(encoded)
+ self.bytes_filenames.append(memoryview(encoded))
+
+ self.filenames = self.bytes_filenames + self.unicode_filenames
+
+ def test_oserror_filename(self):
+ funcs = [
+ (self.filenames, os.chdir,),
+ (self.filenames, os.chmod, 0o777),
+ (self.filenames, os.lstat,),
+ (self.filenames, os.open, os.O_RDONLY),
+ (self.filenames, os.rmdir,),
+ (self.filenames, os.stat,),
+ (self.filenames, os.unlink,),
+ ]
+ if sys.platform == "win32":
+ funcs.extend((
+ (self.bytes_filenames, os.rename, b"dst"),
+ (self.bytes_filenames, os.replace, b"dst"),
+ (self.unicode_filenames, os.rename, "dst"),
+ (self.unicode_filenames, os.replace, "dst"),
+ # Issue #16414: Don't test undecodable names with listdir()
+ # because of a Windows bug.
+ #
+ # With the ANSI code page 932, os.listdir(b'\xe7') return an
+ # empty list (instead of failing), whereas os.listdir(b'\xff')
+ # raises a FileNotFoundError. It looks like a Windows bug:
+ # b'\xe7' directory does not exist, FindFirstFileA(b'\xe7')
+ # fails with ERROR_FILE_NOT_FOUND (2), instead of
+ # ERROR_PATH_NOT_FOUND (3).
+ (self.unicode_filenames, os.listdir,),
+ ))
+ else:
+ funcs.extend((
+ (self.filenames, os.listdir,),
+ (self.filenames, os.rename, "dst"),
+ (self.filenames, os.replace, "dst"),
+ ))
+ if hasattr(os, "chown"):
+ funcs.append((self.filenames, os.chown, 0, 0))
+ if hasattr(os, "lchown"):
+ funcs.append((self.filenames, os.lchown, 0, 0))
+ if hasattr(os, "truncate"):
+ funcs.append((self.filenames, os.truncate, 0))
+ if hasattr(os, "chflags"):
+ funcs.append((self.filenames, os.chflags, 0))
+ if hasattr(os, "lchflags"):
+ funcs.append((self.filenames, os.lchflags, 0))
+ if hasattr(os, "chroot"):
+ funcs.append((self.filenames, os.chroot,))
+ if hasattr(os, "link"):
+ if sys.platform == "win32":
+ funcs.append((self.bytes_filenames, os.link, b"dst"))
+ funcs.append((self.unicode_filenames, os.link, "dst"))
+ else:
+ funcs.append((self.filenames, os.link, "dst"))
+ if hasattr(os, "listxattr"):
+ funcs.extend((
+ (self.filenames, os.listxattr,),
+ (self.filenames, os.getxattr, "user.test"),
+ (self.filenames, os.setxattr, "user.test", b'user'),
+ (self.filenames, os.removexattr, "user.test"),
+ ))
+ if hasattr(os, "lchmod"):
+ funcs.append((self.filenames, os.lchmod, 0o777))
+ if hasattr(os, "readlink"):
+ if sys.platform == "win32":
+ funcs.append((self.unicode_filenames, os.readlink,))
+ else:
+ funcs.append((self.filenames, os.readlink,))
+
+ for filenames, func, *func_args in funcs:
+ for name in filenames:
+ try:
+ func(name, *func_args)
+ except OSError as err:
+ self.assertIs(err.filename, name)
+ else:
+ self.fail("No exception thrown by {}".format(func))
+
+class CPUCountTests(unittest.TestCase):
+ def test_cpu_count(self):
+ cpus = os.cpu_count()
+ if cpus is not None:
+ self.assertIsInstance(cpus, int)
+ self.assertGreater(cpus, 0)
+ else:
+ self.skipTest("Could not determine the number of CPUs")
+
+
+class FDInheritanceTests(unittest.TestCase):
+ def test_get_set_inheritable(self):
+ fd = os.open(__file__, os.O_RDONLY)
+ self.addCleanup(os.close, fd)
+ self.assertEqual(os.get_inheritable(fd), False)
+
+ os.set_inheritable(fd, True)
+ self.assertEqual(os.get_inheritable(fd), True)
+
+ @unittest.skipIf(fcntl is None, "need fcntl")
+ def test_get_inheritable_cloexec(self):
+ fd = os.open(__file__, os.O_RDONLY)
+ self.addCleanup(os.close, fd)
+ self.assertEqual(os.get_inheritable(fd), False)
+
+ # clear FD_CLOEXEC flag
+ flags = fcntl.fcntl(fd, fcntl.F_GETFD)
+ flags &= ~fcntl.FD_CLOEXEC
+ fcntl.fcntl(fd, fcntl.F_SETFD, flags)
+
+ self.assertEqual(os.get_inheritable(fd), True)
+
+ @unittest.skipIf(fcntl is None, "need fcntl")
+ def test_set_inheritable_cloexec(self):
+ fd = os.open(__file__, os.O_RDONLY)
+ self.addCleanup(os.close, fd)
+ self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
+ fcntl.FD_CLOEXEC)
+
+ os.set_inheritable(fd, True)
+ self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
+ 0)
+
+ def test_open(self):
+ fd = os.open(__file__, os.O_RDONLY)
+ self.addCleanup(os.close, fd)
+ self.assertEqual(os.get_inheritable(fd), False)
+
+ @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
+ def test_pipe(self):
+ rfd, wfd = os.pipe()
+ self.addCleanup(os.close, rfd)
+ self.addCleanup(os.close, wfd)
+ self.assertEqual(os.get_inheritable(rfd), False)
+ self.assertEqual(os.get_inheritable(wfd), False)
+
+ def test_dup(self):
+ fd1 = os.open(__file__, os.O_RDONLY)
+ self.addCleanup(os.close, fd1)
+
+ fd2 = os.dup(fd1)
+ self.addCleanup(os.close, fd2)
+ self.assertEqual(os.get_inheritable(fd2), False)
+
+ @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
+ def test_dup2(self):
+ fd = os.open(__file__, os.O_RDONLY)
+ self.addCleanup(os.close, fd)
+
+ # inheritable by default
+ fd2 = os.open(__file__, os.O_RDONLY)
+ try:
+ os.dup2(fd, fd2)
+ self.assertEqual(os.get_inheritable(fd2), True)
+ finally:
+ os.close(fd2)
+
+ # force non-inheritable
+ fd3 = os.open(__file__, os.O_RDONLY)
+ try:
+ os.dup2(fd, fd3, inheritable=False)
+ self.assertEqual(os.get_inheritable(fd3), False)
+ finally:
+ os.close(fd3)
+
+ @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
+ def test_openpty(self):
+ master_fd, slave_fd = os.openpty()
+ self.addCleanup(os.close, master_fd)
+ self.addCleanup(os.close, slave_fd)
+ self.assertEqual(os.get_inheritable(master_fd), False)
+ self.assertEqual(os.get_inheritable(slave_fd), False)
+
+
@support.reap_threads
def test_main():
support.run_unittest(
@@ -2185,12 +2567,14 @@ def test_main():
MakedirTests,
DevNullTests,
URandomTests,
+ URandomFDTests,
ExecTests,
Win32ErrorTests,
TestInvalidFD,
PosixUidGidTests,
Pep383Tests,
Win32KillTests,
+ Win32ListdirTests,
Win32SymlinkTests,
NonLocalSymlinkTests,
FSEncodingTests,
@@ -2203,7 +2587,10 @@ def test_main():
ExtendedAttributeTests,
Win32DeprecatedBytesAPI,
TermsizeTests,
+ OSErrorTests,
RemoveDirsTests,
+ CPUCountTests,
+ FDInheritanceTests,
)
if __name__ == "__main__":