diff options
Diffstat (limited to 'Lib/test/test_os.py')
| -rw-r--r-- | Lib/test/test_os.py | 329 | 
1 files changed, 313 insertions, 16 deletions
diff --git a/Lib/test/test_os.py b/Lib/test/test_os.py index 56309bf..f744815 100644 --- a/Lib/test/test_os.py +++ b/Lib/test/test_os.py @@ -24,6 +24,9 @@ import itertools  import stat  import locale  import codecs +import decimal +import fractions +import pickle  try:      import threading  except ImportError: @@ -32,6 +35,10 @@ try:      import resource  except ImportError:      resource = None +try: +    import fcntl +except ImportError: +    fcntl = None  from test.script_helper import assert_python_ok @@ -256,6 +263,13 @@ class StatAttributeTests(unittest.TestCase):              warnings.simplefilter("ignore", DeprecationWarning)              self.check_stat_attributes(fname) +    def test_stat_result_pickle(self): +        result = os.stat(self.fname) +        p = pickle.dumps(result) +        self.assertIn(b'\x03cos\nstat_result\n', p) +        unpickled = pickle.loads(p) +        self.assertEqual(result, unpickled) +      @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')      def test_statvfs_attributes(self):          try: @@ -263,7 +277,7 @@ class StatAttributeTests(unittest.TestCase):          except OSError as e:              # On AtheOS, glibc always returns ENOSYS              if e.errno == errno.ENOSYS: -                self.skipTest('glibc always returns ENOSYS on AtheOS') +                self.skipTest('os.statvfs() failed with ENOSYS')          # Make sure direct access works          self.assertEqual(result.f_bfree, result[3]) @@ -300,6 +314,21 @@ class StatAttributeTests(unittest.TestCase):          except TypeError:              pass +    @unittest.skipUnless(hasattr(os, 'statvfs'), +                         "need os.statvfs()") +    def test_statvfs_result_pickle(self): +        try: +            result = os.statvfs(self.fname) +        except OSError as e: +            # On AtheOS, glibc always returns ENOSYS +            if e.errno == errno.ENOSYS: +                self.skipTest('os.statvfs() failed with ENOSYS') + +        p = pickle.dumps(result) +        self.assertIn(b'\x03cos\nstatvfs_result\n', p) +        unpickled = pickle.loads(p) +        self.assertEqual(result, unpickled) +      def test_utime_dir(self):          delta = 1000000          st = os.stat(support.TESTFN) @@ -478,9 +507,9 @@ class StatAttributeTests(unittest.TestCase):          # Verify that an open file can be stat'ed          try:              os.stat(r"c:\pagefile.sys") -        except WindowsError as e: -            if e.errno == 2: # file does not exist; cannot run test -                self.skipTest(r'c:\pagefile.sys does not exist') +        except FileNotFoundError: +            self.skipTest(r'c:\pagefile.sys does not exist') +        except OSError as e:              self.fail("Could not stat pagefile.sys")      @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") @@ -876,6 +905,17 @@ class MakedirTests(unittest.TestCase):          os.makedirs(path, mode=mode, exist_ok=True)          os.umask(old_mask) +    @unittest.skipUnless(hasattr(os, 'chown'), 'test needs os.chown') +    def test_chown_uid_gid_arguments_must_be_index(self): +        stat = os.stat(support.TESTFN) +        uid = stat.st_uid +        gid = stat.st_gid +        for value in (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2)): +            self.assertRaises(TypeError, os.chown, support.TESTFN, value, gid) +            self.assertRaises(TypeError, os.chown, support.TESTFN, uid, value) +        self.assertIsNone(os.chown(support.TESTFN, uid, gid)) +        self.assertIsNone(os.chown(support.TESTFN, -1, -1)) +      def test_exist_ok_s_isgid_directory(self):          path = os.path.join(support.TESTFN, 'dir1')          S_ISGID = stat.S_ISGID @@ -1132,27 +1172,27 @@ class ExecTests(unittest.TestCase):  @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")  class Win32ErrorTests(unittest.TestCase):      def test_rename(self): -        self.assertRaises(WindowsError, os.rename, support.TESTFN, support.TESTFN+".bak") +        self.assertRaises(OSError, os.rename, support.TESTFN, support.TESTFN+".bak")      def test_remove(self): -        self.assertRaises(WindowsError, os.remove, support.TESTFN) +        self.assertRaises(OSError, os.remove, support.TESTFN)      def test_chdir(self): -        self.assertRaises(WindowsError, os.chdir, support.TESTFN) +        self.assertRaises(OSError, os.chdir, support.TESTFN)      def test_mkdir(self):          f = open(support.TESTFN, "w")          try: -            self.assertRaises(WindowsError, os.mkdir, support.TESTFN) +            self.assertRaises(OSError, os.mkdir, support.TESTFN)          finally:              f.close()              os.unlink(support.TESTFN)      def test_utime(self): -        self.assertRaises(WindowsError, os.utime, support.TESTFN, None) +        self.assertRaises(OSError, os.utime, support.TESTFN, None)      def test_chmod(self): -        self.assertRaises(WindowsError, os.chmod, support.TESTFN, 0) +        self.assertRaises(OSError, os.chmod, support.TESTFN, 0)  class TestInvalidFD(unittest.TestCase):      singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat", @@ -1286,41 +1326,57 @@ class PosixUidGidTests(unittest.TestCase):      @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')      def test_setuid(self):          if os.getuid() != 0: -            self.assertRaises(os.error, os.setuid, 0) +            self.assertRaises(OSError, os.setuid, 0)          self.assertRaises(OverflowError, os.setuid, 1<<32)      @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')      def test_setgid(self):          if os.getuid() != 0 and not HAVE_WHEEL_GROUP: -            self.assertRaises(os.error, os.setgid, 0) +            self.assertRaises(OSError, os.setgid, 0)          self.assertRaises(OverflowError, os.setgid, 1<<32)      @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')      def test_seteuid(self):          if os.getuid() != 0: -            self.assertRaises(os.error, os.seteuid, 0) +            self.assertRaises(OSError, os.seteuid, 0)          self.assertRaises(OverflowError, os.seteuid, 1<<32)      @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')      def test_setegid(self):          if os.getuid() != 0 and not HAVE_WHEEL_GROUP: -            self.assertRaises(os.error, os.setegid, 0) +            self.assertRaises(OSError, os.setegid, 0)          self.assertRaises(OverflowError, os.setegid, 1<<32)      @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')      def test_setreuid(self):          if os.getuid() != 0: -            self.assertRaises(os.error, os.setreuid, 0, 0) +            self.assertRaises(OSError, os.setreuid, 0, 0)          self.assertRaises(OverflowError, os.setreuid, 1<<32, 0)          self.assertRaises(OverflowError, os.setreuid, 0, 1<<32) +    @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()') +    def test_setreuid_neg1(self): +        # Needs to accept -1.  We run this in a subprocess to avoid +        # altering the test runner's process state (issue8045). +        subprocess.check_call([ +                sys.executable, '-c', +                'import os,sys;os.setreuid(-1,-1);sys.exit(0)']) +      @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')      def test_setregid(self):          if os.getuid() != 0 and not HAVE_WHEEL_GROUP: -            self.assertRaises(os.error, os.setregid, 0, 0) +            self.assertRaises(OSError, os.setregid, 0, 0)          self.assertRaises(OverflowError, os.setregid, 1<<32, 0)          self.assertRaises(OverflowError, os.setregid, 0, 1<<32) +    @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()') +    def test_setregid_neg1(self): +        # Needs to accept -1.  We run this in a subprocess to avoid +        # altering the test runner's process state (issue8045). +        subprocess.check_call([ +                sys.executable, '-c', +                'import os,sys;os.setregid(-1,-1);sys.exit(0)']) +  @unittest.skipIf(sys.platform == "win32", "Posix specific tests")  class Pep383Tests(unittest.TestCase):      def setUp(self): @@ -1510,6 +1566,52 @@ class Win32KillTests(unittest.TestCase):  @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") +class Win32ListdirTests(unittest.TestCase): +    """Test listdir on Windows.""" + +    def setUp(self): +        self.created_paths = [] +        for i in range(2): +            dir_name = 'SUB%d' % i +            dir_path = os.path.join(support.TESTFN, dir_name) +            file_name = 'FILE%d' % i +            file_path = os.path.join(support.TESTFN, file_name) +            os.makedirs(dir_path) +            with open(file_path, 'w') as f: +                f.write("I'm %s and proud of it. Blame test_os.\n" % file_path) +            self.created_paths.extend([dir_name, file_name]) +        self.created_paths.sort() + +    def tearDown(self): +        shutil.rmtree(support.TESTFN) + +    def test_listdir_no_extended_path(self): +        """Test when the path is not an "extended" path.""" +        # unicode +        self.assertEqual( +                sorted(os.listdir(support.TESTFN)), +                self.created_paths) +        # bytes +        self.assertEqual( +                sorted(os.listdir(os.fsencode(support.TESTFN))), +                [os.fsencode(path) for path in self.created_paths]) + +    def test_listdir_extended_path(self): +        """Test when the path starts with '\\\\?\\'.""" +        # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath +        # unicode +        path = '\\\\?\\' + os.path.abspath(support.TESTFN) +        self.assertEqual( +                sorted(os.listdir(path)), +                self.created_paths) +        # bytes +        path = b'\\\\?\\' + os.fsencode(os.path.abspath(support.TESTFN)) +        self.assertEqual( +                sorted(os.listdir(path)), +                [os.fsencode(path) for path in self.created_paths]) + + +@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")  @support.skip_unless_symlink  class Win32SymlinkTests(unittest.TestCase):      filelink = 'filelinktest' @@ -2174,6 +2276,197 @@ class TermsizeTests(unittest.TestCase):          self.assertEqual(expected, actual) +class OSErrorTests(unittest.TestCase): +    def setUp(self): +        class Str(str): +            pass + +        self.bytes_filenames = [] +        self.unicode_filenames = [] +        if support.TESTFN_UNENCODABLE is not None: +            decoded = support.TESTFN_UNENCODABLE +        else: +            decoded = support.TESTFN +        self.unicode_filenames.append(decoded) +        self.unicode_filenames.append(Str(decoded)) +        if support.TESTFN_UNDECODABLE is not None: +            encoded = support.TESTFN_UNDECODABLE +        else: +            encoded = os.fsencode(support.TESTFN) +        self.bytes_filenames.append(encoded) +        self.bytes_filenames.append(memoryview(encoded)) + +        self.filenames = self.bytes_filenames + self.unicode_filenames + +    def test_oserror_filename(self): +        funcs = [ +            (self.filenames, os.chdir,), +            (self.filenames, os.chmod, 0o777), +            (self.filenames, os.lstat,), +            (self.filenames, os.open, os.O_RDONLY), +            (self.filenames, os.rmdir,), +            (self.filenames, os.stat,), +            (self.filenames, os.unlink,), +        ] +        if sys.platform == "win32": +            funcs.extend(( +                (self.bytes_filenames, os.rename, b"dst"), +                (self.bytes_filenames, os.replace, b"dst"), +                (self.unicode_filenames, os.rename, "dst"), +                (self.unicode_filenames, os.replace, "dst"), +                # Issue #16414: Don't test undecodable names with listdir() +                # because of a Windows bug. +                # +                # With the ANSI code page 932, os.listdir(b'\xe7') return an +                # empty list (instead of failing), whereas os.listdir(b'\xff') +                # raises a FileNotFoundError. It looks like a Windows bug: +                # b'\xe7' directory does not exist, FindFirstFileA(b'\xe7') +                # fails with ERROR_FILE_NOT_FOUND (2), instead of +                # ERROR_PATH_NOT_FOUND (3). +                (self.unicode_filenames, os.listdir,), +            )) +        else: +            funcs.extend(( +                (self.filenames, os.listdir,), +                (self.filenames, os.rename, "dst"), +                (self.filenames, os.replace, "dst"), +            )) +        if hasattr(os, "chown"): +            funcs.append((self.filenames, os.chown, 0, 0)) +        if hasattr(os, "lchown"): +            funcs.append((self.filenames, os.lchown, 0, 0)) +        if hasattr(os, "truncate"): +            funcs.append((self.filenames, os.truncate, 0)) +        if hasattr(os, "chflags"): +            funcs.append((self.filenames, os.chflags, 0)) +        if hasattr(os, "lchflags"): +            funcs.append((self.filenames, os.lchflags, 0)) +        if hasattr(os, "chroot"): +            funcs.append((self.filenames, os.chroot,)) +        if hasattr(os, "link"): +            if sys.platform == "win32": +                funcs.append((self.bytes_filenames, os.link, b"dst")) +                funcs.append((self.unicode_filenames, os.link, "dst")) +            else: +                funcs.append((self.filenames, os.link, "dst")) +        if hasattr(os, "listxattr"): +            funcs.extend(( +                (self.filenames, os.listxattr,), +                (self.filenames, os.getxattr, "user.test"), +                (self.filenames, os.setxattr, "user.test", b'user'), +                (self.filenames, os.removexattr, "user.test"), +            )) +        if hasattr(os, "lchmod"): +            funcs.append((self.filenames, os.lchmod, 0o777)) +        if hasattr(os, "readlink"): +            if sys.platform == "win32": +                funcs.append((self.unicode_filenames, os.readlink,)) +            else: +                funcs.append((self.filenames, os.readlink,)) + +        for filenames, func, *func_args in funcs: +            for name in filenames: +                try: +                    func(name, *func_args) +                except OSError as err: +                    self.assertIs(err.filename, name) +                else: +                    self.fail("No exception thrown by {}".format(func)) + +class CPUCountTests(unittest.TestCase): +    def test_cpu_count(self): +        cpus = os.cpu_count() +        if cpus is not None: +            self.assertIsInstance(cpus, int) +            self.assertGreater(cpus, 0) +        else: +            self.skipTest("Could not determine the number of CPUs") + + +class FDInheritanceTests(unittest.TestCase): +    def test_get_set_inheritable(self): +        fd = os.open(__file__, os.O_RDONLY) +        self.addCleanup(os.close, fd) +        self.assertEqual(os.get_inheritable(fd), False) + +        os.set_inheritable(fd, True) +        self.assertEqual(os.get_inheritable(fd), True) + +    @unittest.skipIf(fcntl is None, "need fcntl") +    def test_get_inheritable_cloexec(self): +        fd = os.open(__file__, os.O_RDONLY) +        self.addCleanup(os.close, fd) +        self.assertEqual(os.get_inheritable(fd), False) + +        # clear FD_CLOEXEC flag +        flags = fcntl.fcntl(fd, fcntl.F_GETFD) +        flags &= ~fcntl.FD_CLOEXEC +        fcntl.fcntl(fd, fcntl.F_SETFD, flags) + +        self.assertEqual(os.get_inheritable(fd), True) + +    @unittest.skipIf(fcntl is None, "need fcntl") +    def test_set_inheritable_cloexec(self): +        fd = os.open(__file__, os.O_RDONLY) +        self.addCleanup(os.close, fd) +        self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC, +                         fcntl.FD_CLOEXEC) + +        os.set_inheritable(fd, True) +        self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC, +                         0) + +    def test_open(self): +        fd = os.open(__file__, os.O_RDONLY) +        self.addCleanup(os.close, fd) +        self.assertEqual(os.get_inheritable(fd), False) + +    @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()") +    def test_pipe(self): +        rfd, wfd = os.pipe() +        self.addCleanup(os.close, rfd) +        self.addCleanup(os.close, wfd) +        self.assertEqual(os.get_inheritable(rfd), False) +        self.assertEqual(os.get_inheritable(wfd), False) + +    def test_dup(self): +        fd1 = os.open(__file__, os.O_RDONLY) +        self.addCleanup(os.close, fd1) + +        fd2 = os.dup(fd1) +        self.addCleanup(os.close, fd2) +        self.assertEqual(os.get_inheritable(fd2), False) + +    @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()") +    def test_dup2(self): +        fd = os.open(__file__, os.O_RDONLY) +        self.addCleanup(os.close, fd) + +        # inheritable by default +        fd2 = os.open(__file__, os.O_RDONLY) +        try: +            os.dup2(fd, fd2) +            self.assertEqual(os.get_inheritable(fd2), True) +        finally: +            os.close(fd2) + +        # force non-inheritable +        fd3 = os.open(__file__, os.O_RDONLY) +        try: +            os.dup2(fd, fd3, inheritable=False) +            self.assertEqual(os.get_inheritable(fd3), False) +        finally: +            os.close(fd3) + +    @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()") +    def test_openpty(self): +        master_fd, slave_fd = os.openpty() +        self.addCleanup(os.close, master_fd) +        self.addCleanup(os.close, slave_fd) +        self.assertEqual(os.get_inheritable(master_fd), False) +        self.assertEqual(os.get_inheritable(slave_fd), False) + +  @support.reap_threads  def test_main():      support.run_unittest( @@ -2191,6 +2484,7 @@ def test_main():          PosixUidGidTests,          Pep383Tests,          Win32KillTests, +        Win32ListdirTests,          Win32SymlinkTests,          NonLocalSymlinkTests,          FSEncodingTests, @@ -2203,7 +2497,10 @@ def test_main():          ExtendedAttributeTests,          Win32DeprecatedBytesAPI,          TermsizeTests, +        OSErrorTests,          RemoveDirsTests, +        CPUCountTests, +        FDInheritanceTests,      )  if __name__ == "__main__":  | 
