diff options
Diffstat (limited to 'Lib/test/test_os.py')
| -rw-r--r-- | Lib/test/test_os.py | 493 |
1 files changed, 440 insertions, 53 deletions
diff --git a/Lib/test/test_os.py b/Lib/test/test_os.py index 56309bf..31f2cc3 100644 --- a/Lib/test/test_os.py +++ b/Lib/test/test_os.py @@ -24,6 +24,10 @@ import itertools import stat import locale import codecs +import decimal +import fractions +import pickle +import sysconfig try: import threading except ImportError: @@ -32,6 +36,10 @@ try: import resource except ImportError: resource = None +try: + import fcntl +except ImportError: + fcntl = None from test.script_helper import assert_python_ok @@ -256,6 +264,16 @@ class StatAttributeTests(unittest.TestCase): warnings.simplefilter("ignore", DeprecationWarning) self.check_stat_attributes(fname) + def test_stat_result_pickle(self): + result = os.stat(self.fname) + for proto in range(pickle.HIGHEST_PROTOCOL + 1): + p = pickle.dumps(result, proto) + self.assertIn(b'stat_result', p) + if proto < 4: + self.assertIn(b'cos\nstat_result\n', p) + unpickled = pickle.loads(p) + self.assertEqual(result, unpickled) + @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()') def test_statvfs_attributes(self): try: @@ -263,7 +281,7 @@ class StatAttributeTests(unittest.TestCase): except OSError as e: # On AtheOS, glibc always returns ENOSYS if e.errno == errno.ENOSYS: - self.skipTest('glibc always returns ENOSYS on AtheOS') + self.skipTest('os.statvfs() failed with ENOSYS') # Make sure direct access works self.assertEqual(result.f_bfree, result[3]) @@ -300,6 +318,24 @@ class StatAttributeTests(unittest.TestCase): except TypeError: pass + @unittest.skipUnless(hasattr(os, 'statvfs'), + "need os.statvfs()") + def test_statvfs_result_pickle(self): + try: + result = os.statvfs(self.fname) + except OSError as e: + # On AtheOS, glibc always returns ENOSYS + if e.errno == errno.ENOSYS: + self.skipTest('os.statvfs() failed with ENOSYS') + + for proto in range(pickle.HIGHEST_PROTOCOL + 1): + p = pickle.dumps(result, proto) + self.assertIn(b'statvfs_result', p) + if proto < 4: + self.assertIn(b'cos\nstatvfs_result\n', p) + unpickled = pickle.loads(p) + self.assertEqual(result, unpickled) + def test_utime_dir(self): delta = 1000000 st = os.stat(support.TESTFN) @@ -478,9 +514,9 @@ class StatAttributeTests(unittest.TestCase): # Verify that an open file can be stat'ed try: os.stat(r"c:\pagefile.sys") - except WindowsError as e: - if e.errno == 2: # file does not exist; cannot run test - self.skipTest(r'c:\pagefile.sys does not exist') + except FileNotFoundError: + self.skipTest(r'c:\pagefile.sys does not exist') + except OSError as e: self.fail("Could not stat pagefile.sys") @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") @@ -658,9 +694,17 @@ class EnvironTests(mapping_tests.BasicTestMappingProtocol): class WalkTests(unittest.TestCase): """Tests for os.walk().""" + # Wrapper to hide minor differences between os.walk and os.fwalk + # to tests both functions with the same code base + def walk(self, directory, topdown=True, follow_symlinks=False): + walk_it = os.walk(directory, + topdown=topdown, + followlinks=follow_symlinks) + for root, dirs, files in walk_it: + yield (root, dirs, files) + def setUp(self): - import os - from os.path import join + join = os.path.join # Build: # TESTFN/ @@ -675,36 +719,39 @@ class WalkTests(unittest.TestCase): # broken_link # TEST2/ # tmp4 a lone file - walk_path = join(support.TESTFN, "TEST1") - sub1_path = join(walk_path, "SUB1") - sub11_path = join(sub1_path, "SUB11") - sub2_path = join(walk_path, "SUB2") - tmp1_path = join(walk_path, "tmp1") - tmp2_path = join(sub1_path, "tmp2") + self.walk_path = join(support.TESTFN, "TEST1") + self.sub1_path = join(self.walk_path, "SUB1") + self.sub11_path = join(self.sub1_path, "SUB11") + sub2_path = join(self.walk_path, "SUB2") + tmp1_path = join(self.walk_path, "tmp1") + tmp2_path = join(self.sub1_path, "tmp2") tmp3_path = join(sub2_path, "tmp3") - link_path = join(sub2_path, "link") + self.link_path = join(sub2_path, "link") t2_path = join(support.TESTFN, "TEST2") tmp4_path = join(support.TESTFN, "TEST2", "tmp4") - link_path = join(sub2_path, "link") broken_link_path = join(sub2_path, "broken_link") # Create stuff. - os.makedirs(sub11_path) + os.makedirs(self.sub11_path) os.makedirs(sub2_path) os.makedirs(t2_path) + for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path: f = open(path, "w") f.write("I'm " + path + " and proud of it. Blame test_os.\n") f.close() + if support.can_symlink(): - os.symlink(os.path.abspath(t2_path), link_path) + os.symlink(os.path.abspath(t2_path), self.link_path) os.symlink('broken', broken_link_path, True) - sub2_tree = (sub2_path, ["link"], ["broken_link", "tmp3"]) + self.sub2_tree = (sub2_path, ["link"], ["broken_link", "tmp3"]) else: - sub2_tree = (sub2_path, [], ["tmp3"]) + self.sub2_tree = (sub2_path, [], ["tmp3"]) + def test_walk_topdown(self): # Walk top-down. - all = list(os.walk(walk_path)) + all = list(os.walk(self.walk_path)) + self.assertEqual(len(all), 4) # We can't know which order SUB1 and SUB2 will appear in. # Not flipped: TESTFN, SUB1, SUB11, SUB2 @@ -712,26 +759,32 @@ class WalkTests(unittest.TestCase): flipped = all[0][1][0] != "SUB1" all[0][1].sort() all[3 - 2 * flipped][-1].sort() - self.assertEqual(all[0], (walk_path, ["SUB1", "SUB2"], ["tmp1"])) - self.assertEqual(all[1 + flipped], (sub1_path, ["SUB11"], ["tmp2"])) - self.assertEqual(all[2 + flipped], (sub11_path, [], [])) - self.assertEqual(all[3 - 2 * flipped], sub2_tree) + self.assertEqual(all[0], (self.walk_path, ["SUB1", "SUB2"], ["tmp1"])) + self.assertEqual(all[1 + flipped], (self.sub1_path, ["SUB11"], ["tmp2"])) + self.assertEqual(all[2 + flipped], (self.sub11_path, [], [])) + self.assertEqual(all[3 - 2 * flipped], self.sub2_tree) + def test_walk_prune(self): # Prune the search. all = [] - for root, dirs, files in os.walk(walk_path): + for root, dirs, files in self.walk(self.walk_path): all.append((root, dirs, files)) # Don't descend into SUB1. if 'SUB1' in dirs: # Note that this also mutates the dirs we appended to all! dirs.remove('SUB1') + self.assertEqual(len(all), 2) - self.assertEqual(all[0], (walk_path, ["SUB2"], ["tmp1"])) + self.assertEqual(all[0], + (self.walk_path, ["SUB2"], ["tmp1"])) + all[1][-1].sort() - self.assertEqual(all[1], sub2_tree) + self.assertEqual(all[1], self.sub2_tree) + def test_walk_bottom_up(self): # Walk bottom-up. - all = list(os.walk(walk_path, topdown=False)) + all = list(self.walk(self.walk_path, topdown=False)) + self.assertEqual(len(all), 4) # We can't know which order SUB1 and SUB2 will appear in. # Not flipped: SUB11, SUB1, SUB2, TESTFN @@ -739,20 +792,28 @@ class WalkTests(unittest.TestCase): flipped = all[3][1][0] != "SUB1" all[3][1].sort() all[2 - 2 * flipped][-1].sort() - self.assertEqual(all[3], (walk_path, ["SUB1", "SUB2"], ["tmp1"])) - self.assertEqual(all[flipped], (sub11_path, [], [])) - self.assertEqual(all[flipped + 1], (sub1_path, ["SUB11"], ["tmp2"])) - self.assertEqual(all[2 - 2 * flipped], sub2_tree) - - if support.can_symlink(): - # Walk, following symlinks. - for root, dirs, files in os.walk(walk_path, followlinks=True): - if root == link_path: - self.assertEqual(dirs, []) - self.assertEqual(files, ["tmp4"]) - break - else: - self.fail("Didn't follow symlink with followlinks=True") + self.assertEqual(all[3], + (self.walk_path, ["SUB1", "SUB2"], ["tmp1"])) + self.assertEqual(all[flipped], + (self.sub11_path, [], [])) + self.assertEqual(all[flipped + 1], + (self.sub1_path, ["SUB11"], ["tmp2"])) + self.assertEqual(all[2 - 2 * flipped], + self.sub2_tree) + + def test_walk_symlink(self): + if not support.can_symlink(): + self.skipTest("need symlink support") + + # Walk, following symlinks. + walk_it = self.walk(self.walk_path, follow_symlinks=True) + for root, dirs, files in walk_it: + if root == self.link_path: + self.assertEqual(dirs, []) + self.assertEqual(files, ["tmp4"]) + break + else: + self.fail("Didn't follow symlink with followlinks=True") def tearDown(self): # Tear everything down. This is a decent use for bottom-up on @@ -775,6 +836,14 @@ class WalkTests(unittest.TestCase): class FwalkTests(WalkTests): """Tests for os.fwalk().""" + def walk(self, directory, topdown=True, follow_symlinks=False): + walk_it = os.fwalk(directory, + topdown=topdown, + follow_symlinks=follow_symlinks) + for root, dirs, files, root_fd in walk_it: + yield (root, dirs, files) + + def _compare_to_walk(self, walk_kwargs, fwalk_kwargs): """ compare with walk() results. @@ -876,6 +945,17 @@ class MakedirTests(unittest.TestCase): os.makedirs(path, mode=mode, exist_ok=True) os.umask(old_mask) + @unittest.skipUnless(hasattr(os, 'chown'), 'test needs os.chown') + def test_chown_uid_gid_arguments_must_be_index(self): + stat = os.stat(support.TESTFN) + uid = stat.st_uid + gid = stat.st_gid + for value in (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2)): + self.assertRaises(TypeError, os.chown, support.TESTFN, value, gid) + self.assertRaises(TypeError, os.chown, support.TESTFN, uid, value) + self.assertIsNone(os.chown(support.TESTFN, uid, gid)) + self.assertIsNone(os.chown(support.TESTFN, -1, -1)) + def test_exist_ok_s_isgid_directory(self): path = os.path.join(support.TESTFN, 'dir1') S_ISGID = stat.S_ISGID @@ -1007,6 +1087,12 @@ class URandomTests(unittest.TestCase): data2 = self.get_urandom_subprocess(16) self.assertNotEqual(data1, data2) + +HAVE_GETENTROPY = (sysconfig.get_config_var('HAVE_GETENTROPY') == 1) + +@unittest.skipIf(HAVE_GETENTROPY, + "getentropy() does not use a file descriptor") +class URandomFDTests(unittest.TestCase): @unittest.skipUnless(resource, "test requires the resource module") def test_urandom_failure(self): # Check urandom() failing when it is not able to open /dev/random. @@ -1030,6 +1116,49 @@ class URandomTests(unittest.TestCase): """ assert_python_ok('-c', code) + def test_urandom_fd_closed(self): + # Issue #21207: urandom() should reopen its fd to /dev/urandom if + # closed. + code = """if 1: + import os + import sys + os.urandom(4) + os.closerange(3, 256) + sys.stdout.buffer.write(os.urandom(4)) + """ + rc, out, err = assert_python_ok('-Sc', code) + + def test_urandom_fd_reopened(self): + # Issue #21207: urandom() should detect its fd to /dev/urandom + # changed to something else, and reopen it. + with open(support.TESTFN, 'wb') as f: + f.write(b"x" * 256) + self.addCleanup(os.unlink, support.TESTFN) + code = """if 1: + import os + import sys + os.urandom(4) + for fd in range(3, 256): + try: + os.close(fd) + except OSError: + pass + else: + # Found the urandom fd (XXX hopefully) + break + os.closerange(3, 256) + with open({TESTFN!r}, 'rb') as f: + os.dup2(f.fileno(), fd) + sys.stdout.buffer.write(os.urandom(4)) + sys.stdout.buffer.write(os.urandom(4)) + """.format(TESTFN=support.TESTFN) + rc, out, err = assert_python_ok('-Sc', code) + self.assertEqual(len(out), 8) + self.assertNotEqual(out[0:4], out[4:8]) + rc, out2, err2 = assert_python_ok('-Sc', code) + self.assertEqual(len(out2), 8) + self.assertNotEqual(out2, out) + @contextlib.contextmanager def _execvpe_mockup(defpath=None): @@ -1132,27 +1261,27 @@ class ExecTests(unittest.TestCase): @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") class Win32ErrorTests(unittest.TestCase): def test_rename(self): - self.assertRaises(WindowsError, os.rename, support.TESTFN, support.TESTFN+".bak") + self.assertRaises(OSError, os.rename, support.TESTFN, support.TESTFN+".bak") def test_remove(self): - self.assertRaises(WindowsError, os.remove, support.TESTFN) + self.assertRaises(OSError, os.remove, support.TESTFN) def test_chdir(self): - self.assertRaises(WindowsError, os.chdir, support.TESTFN) + self.assertRaises(OSError, os.chdir, support.TESTFN) def test_mkdir(self): f = open(support.TESTFN, "w") try: - self.assertRaises(WindowsError, os.mkdir, support.TESTFN) + self.assertRaises(OSError, os.mkdir, support.TESTFN) finally: f.close() os.unlink(support.TESTFN) def test_utime(self): - self.assertRaises(WindowsError, os.utime, support.TESTFN, None) + self.assertRaises(OSError, os.utime, support.TESTFN, None) def test_chmod(self): - self.assertRaises(WindowsError, os.chmod, support.TESTFN, 0) + self.assertRaises(OSError, os.chmod, support.TESTFN, 0) class TestInvalidFD(unittest.TestCase): singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat", @@ -1286,41 +1415,57 @@ class PosixUidGidTests(unittest.TestCase): @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()') def test_setuid(self): if os.getuid() != 0: - self.assertRaises(os.error, os.setuid, 0) + self.assertRaises(OSError, os.setuid, 0) self.assertRaises(OverflowError, os.setuid, 1<<32) @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()') def test_setgid(self): if os.getuid() != 0 and not HAVE_WHEEL_GROUP: - self.assertRaises(os.error, os.setgid, 0) + self.assertRaises(OSError, os.setgid, 0) self.assertRaises(OverflowError, os.setgid, 1<<32) @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()') def test_seteuid(self): if os.getuid() != 0: - self.assertRaises(os.error, os.seteuid, 0) + self.assertRaises(OSError, os.seteuid, 0) self.assertRaises(OverflowError, os.seteuid, 1<<32) @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()') def test_setegid(self): if os.getuid() != 0 and not HAVE_WHEEL_GROUP: - self.assertRaises(os.error, os.setegid, 0) + self.assertRaises(OSError, os.setegid, 0) self.assertRaises(OverflowError, os.setegid, 1<<32) @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()') def test_setreuid(self): if os.getuid() != 0: - self.assertRaises(os.error, os.setreuid, 0, 0) + self.assertRaises(OSError, os.setreuid, 0, 0) self.assertRaises(OverflowError, os.setreuid, 1<<32, 0) self.assertRaises(OverflowError, os.setreuid, 0, 1<<32) + @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()') + def test_setreuid_neg1(self): + # Needs to accept -1. We run this in a subprocess to avoid + # altering the test runner's process state (issue8045). + subprocess.check_call([ + sys.executable, '-c', + 'import os,sys;os.setreuid(-1,-1);sys.exit(0)']) + @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()') def test_setregid(self): if os.getuid() != 0 and not HAVE_WHEEL_GROUP: - self.assertRaises(os.error, os.setregid, 0, 0) + self.assertRaises(OSError, os.setregid, 0, 0) self.assertRaises(OverflowError, os.setregid, 1<<32, 0) self.assertRaises(OverflowError, os.setregid, 0, 1<<32) + @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()') + def test_setregid_neg1(self): + # Needs to accept -1. We run this in a subprocess to avoid + # altering the test runner's process state (issue8045). + subprocess.check_call([ + sys.executable, '-c', + 'import os,sys;os.setregid(-1,-1);sys.exit(0)']) + @unittest.skipIf(sys.platform == "win32", "Posix specific tests") class Pep383Tests(unittest.TestCase): def setUp(self): @@ -1510,6 +1655,52 @@ class Win32KillTests(unittest.TestCase): @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") +class Win32ListdirTests(unittest.TestCase): + """Test listdir on Windows.""" + + def setUp(self): + self.created_paths = [] + for i in range(2): + dir_name = 'SUB%d' % i + dir_path = os.path.join(support.TESTFN, dir_name) + file_name = 'FILE%d' % i + file_path = os.path.join(support.TESTFN, file_name) + os.makedirs(dir_path) + with open(file_path, 'w') as f: + f.write("I'm %s and proud of it. Blame test_os.\n" % file_path) + self.created_paths.extend([dir_name, file_name]) + self.created_paths.sort() + + def tearDown(self): + shutil.rmtree(support.TESTFN) + + def test_listdir_no_extended_path(self): + """Test when the path is not an "extended" path.""" + # unicode + self.assertEqual( + sorted(os.listdir(support.TESTFN)), + self.created_paths) + # bytes + self.assertEqual( + sorted(os.listdir(os.fsencode(support.TESTFN))), + [os.fsencode(path) for path in self.created_paths]) + + def test_listdir_extended_path(self): + """Test when the path starts with '\\\\?\\'.""" + # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath + # unicode + path = '\\\\?\\' + os.path.abspath(support.TESTFN) + self.assertEqual( + sorted(os.listdir(path)), + self.created_paths) + # bytes + path = b'\\\\?\\' + os.fsencode(os.path.abspath(support.TESTFN)) + self.assertEqual( + sorted(os.listdir(path)), + [os.fsencode(path) for path in self.created_paths]) + + +@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") @support.skip_unless_symlink class Win32SymlinkTests(unittest.TestCase): filelink = 'filelinktest' @@ -2174,6 +2365,197 @@ class TermsizeTests(unittest.TestCase): self.assertEqual(expected, actual) +class OSErrorTests(unittest.TestCase): + def setUp(self): + class Str(str): + pass + + self.bytes_filenames = [] + self.unicode_filenames = [] + if support.TESTFN_UNENCODABLE is not None: + decoded = support.TESTFN_UNENCODABLE + else: + decoded = support.TESTFN + self.unicode_filenames.append(decoded) + self.unicode_filenames.append(Str(decoded)) + if support.TESTFN_UNDECODABLE is not None: + encoded = support.TESTFN_UNDECODABLE + else: + encoded = os.fsencode(support.TESTFN) + self.bytes_filenames.append(encoded) + self.bytes_filenames.append(memoryview(encoded)) + + self.filenames = self.bytes_filenames + self.unicode_filenames + + def test_oserror_filename(self): + funcs = [ + (self.filenames, os.chdir,), + (self.filenames, os.chmod, 0o777), + (self.filenames, os.lstat,), + (self.filenames, os.open, os.O_RDONLY), + (self.filenames, os.rmdir,), + (self.filenames, os.stat,), + (self.filenames, os.unlink,), + ] + if sys.platform == "win32": + funcs.extend(( + (self.bytes_filenames, os.rename, b"dst"), + (self.bytes_filenames, os.replace, b"dst"), + (self.unicode_filenames, os.rename, "dst"), + (self.unicode_filenames, os.replace, "dst"), + # Issue #16414: Don't test undecodable names with listdir() + # because of a Windows bug. + # + # With the ANSI code page 932, os.listdir(b'\xe7') return an + # empty list (instead of failing), whereas os.listdir(b'\xff') + # raises a FileNotFoundError. It looks like a Windows bug: + # b'\xe7' directory does not exist, FindFirstFileA(b'\xe7') + # fails with ERROR_FILE_NOT_FOUND (2), instead of + # ERROR_PATH_NOT_FOUND (3). + (self.unicode_filenames, os.listdir,), + )) + else: + funcs.extend(( + (self.filenames, os.listdir,), + (self.filenames, os.rename, "dst"), + (self.filenames, os.replace, "dst"), + )) + if hasattr(os, "chown"): + funcs.append((self.filenames, os.chown, 0, 0)) + if hasattr(os, "lchown"): + funcs.append((self.filenames, os.lchown, 0, 0)) + if hasattr(os, "truncate"): + funcs.append((self.filenames, os.truncate, 0)) + if hasattr(os, "chflags"): + funcs.append((self.filenames, os.chflags, 0)) + if hasattr(os, "lchflags"): + funcs.append((self.filenames, os.lchflags, 0)) + if hasattr(os, "chroot"): + funcs.append((self.filenames, os.chroot,)) + if hasattr(os, "link"): + if sys.platform == "win32": + funcs.append((self.bytes_filenames, os.link, b"dst")) + funcs.append((self.unicode_filenames, os.link, "dst")) + else: + funcs.append((self.filenames, os.link, "dst")) + if hasattr(os, "listxattr"): + funcs.extend(( + (self.filenames, os.listxattr,), + (self.filenames, os.getxattr, "user.test"), + (self.filenames, os.setxattr, "user.test", b'user'), + (self.filenames, os.removexattr, "user.test"), + )) + if hasattr(os, "lchmod"): + funcs.append((self.filenames, os.lchmod, 0o777)) + if hasattr(os, "readlink"): + if sys.platform == "win32": + funcs.append((self.unicode_filenames, os.readlink,)) + else: + funcs.append((self.filenames, os.readlink,)) + + for filenames, func, *func_args in funcs: + for name in filenames: + try: + func(name, *func_args) + except OSError as err: + self.assertIs(err.filename, name) + else: + self.fail("No exception thrown by {}".format(func)) + +class CPUCountTests(unittest.TestCase): + def test_cpu_count(self): + cpus = os.cpu_count() + if cpus is not None: + self.assertIsInstance(cpus, int) + self.assertGreater(cpus, 0) + else: + self.skipTest("Could not determine the number of CPUs") + + +class FDInheritanceTests(unittest.TestCase): + def test_get_set_inheritable(self): + fd = os.open(__file__, os.O_RDONLY) + self.addCleanup(os.close, fd) + self.assertEqual(os.get_inheritable(fd), False) + + os.set_inheritable(fd, True) + self.assertEqual(os.get_inheritable(fd), True) + + @unittest.skipIf(fcntl is None, "need fcntl") + def test_get_inheritable_cloexec(self): + fd = os.open(__file__, os.O_RDONLY) + self.addCleanup(os.close, fd) + self.assertEqual(os.get_inheritable(fd), False) + + # clear FD_CLOEXEC flag + flags = fcntl.fcntl(fd, fcntl.F_GETFD) + flags &= ~fcntl.FD_CLOEXEC + fcntl.fcntl(fd, fcntl.F_SETFD, flags) + + self.assertEqual(os.get_inheritable(fd), True) + + @unittest.skipIf(fcntl is None, "need fcntl") + def test_set_inheritable_cloexec(self): + fd = os.open(__file__, os.O_RDONLY) + self.addCleanup(os.close, fd) + self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC, + fcntl.FD_CLOEXEC) + + os.set_inheritable(fd, True) + self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC, + 0) + + def test_open(self): + fd = os.open(__file__, os.O_RDONLY) + self.addCleanup(os.close, fd) + self.assertEqual(os.get_inheritable(fd), False) + + @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()") + def test_pipe(self): + rfd, wfd = os.pipe() + self.addCleanup(os.close, rfd) + self.addCleanup(os.close, wfd) + self.assertEqual(os.get_inheritable(rfd), False) + self.assertEqual(os.get_inheritable(wfd), False) + + def test_dup(self): + fd1 = os.open(__file__, os.O_RDONLY) + self.addCleanup(os.close, fd1) + + fd2 = os.dup(fd1) + self.addCleanup(os.close, fd2) + self.assertEqual(os.get_inheritable(fd2), False) + + @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()") + def test_dup2(self): + fd = os.open(__file__, os.O_RDONLY) + self.addCleanup(os.close, fd) + + # inheritable by default + fd2 = os.open(__file__, os.O_RDONLY) + try: + os.dup2(fd, fd2) + self.assertEqual(os.get_inheritable(fd2), True) + finally: + os.close(fd2) + + # force non-inheritable + fd3 = os.open(__file__, os.O_RDONLY) + try: + os.dup2(fd, fd3, inheritable=False) + self.assertEqual(os.get_inheritable(fd3), False) + finally: + os.close(fd3) + + @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()") + def test_openpty(self): + master_fd, slave_fd = os.openpty() + self.addCleanup(os.close, master_fd) + self.addCleanup(os.close, slave_fd) + self.assertEqual(os.get_inheritable(master_fd), False) + self.assertEqual(os.get_inheritable(slave_fd), False) + + @support.reap_threads def test_main(): support.run_unittest( @@ -2185,12 +2567,14 @@ def test_main(): MakedirTests, DevNullTests, URandomTests, + URandomFDTests, ExecTests, Win32ErrorTests, TestInvalidFD, PosixUidGidTests, Pep383Tests, Win32KillTests, + Win32ListdirTests, Win32SymlinkTests, NonLocalSymlinkTests, FSEncodingTests, @@ -2203,7 +2587,10 @@ def test_main(): ExtendedAttributeTests, Win32DeprecatedBytesAPI, TermsizeTests, + OSErrorTests, RemoveDirsTests, + CPUCountTests, + FDInheritanceTests, ) if __name__ == "__main__": |
