diff options
Diffstat (limited to 'Lib/test/test_os.py')
-rw-r--r-- | Lib/test/test_os.py | 233 |
1 files changed, 186 insertions, 47 deletions
diff --git a/Lib/test/test_os.py b/Lib/test/test_os.py index 152ba93..afa7a4a 100644 --- a/Lib/test/test_os.py +++ b/Lib/test/test_os.py @@ -4,6 +4,7 @@ import os import errno +import getpass import unittest import warnings import sys @@ -40,9 +41,35 @@ try: import fcntl except ImportError: fcntl = None +try: + import _winapi +except ImportError: + _winapi = None +try: + import grp + groups = [g.gr_gid for g in grp.getgrall() if getpass.getuser() in g.gr_mem] + if hasattr(os, 'getgid'): + process_gid = os.getgid() + if process_gid not in groups: + groups.append(process_gid) +except ImportError: + groups = [] +try: + import pwd + all_users = [u.pw_uid for u in pwd.getpwall()] +except ImportError: + all_users = [] +try: + from _testcapi import INT_MAX, PY_SSIZE_T_MAX +except ImportError: + INT_MAX = PY_SSIZE_T_MAX = sys.maxsize from test.script_helper import assert_python_ok +root_in_posix = False +if hasattr(os, 'geteuid'): + root_in_posix = (os.geteuid() == 0) + with warnings.catch_warnings(): warnings.simplefilter("ignore", DeprecationWarning) os.stat_float_times(True) @@ -116,6 +143,26 @@ class FileTests(unittest.TestCase): self.assertEqual(type(s), bytes) self.assertEqual(s, b"spam") + @support.cpython_only + # Skip the test on 32-bit platforms: the number of bytes must fit in a + # Py_ssize_t type + @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX, + "needs INT_MAX < PY_SSIZE_T_MAX") + @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False) + def test_large_read(self, size): + with open(support.TESTFN, "wb") as fp: + fp.write(b'test') + self.addCleanup(support.unlink, support.TESTFN) + + # Issue #21932: Make sure that os.read() does not raise an + # OverflowError for size larger than INT_MAX + with open(support.TESTFN, "rb") as fp: + data = os.read(fp.fileno(), size) + + # The test does not try to read more than 2 GB at once because the + # operating system is free to return less bytes than requested. + self.assertEqual(data, b'test') + def test_write(self): # os.write() accepts bytes- and buffer-like objects but not strings fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY) @@ -533,6 +580,28 @@ class StatAttributeTests(unittest.TestCase): os.stat(r) self.assertEqual(ctx.exception.errno, errno.EBADF) + def check_file_attributes(self, result): + self.assertTrue(hasattr(result, 'st_file_attributes')) + self.assertTrue(isinstance(result.st_file_attributes, int)) + self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF) + + @unittest.skipUnless(sys.platform == "win32", + "st_file_attributes is Win32 specific") + def test_file_attributes(self): + # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set) + result = os.stat(self.fname) + self.check_file_attributes(result) + self.assertEqual( + result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY, + 0) + + # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set) + result = os.stat(support.TESTFN) + self.check_file_attributes(result) + self.assertEqual( + result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY, + stat.FILE_ATTRIBUTE_DIRECTORY) + from test import mapping_tests class EnvironTests(mapping_tests.BasicTestMappingProtocol): @@ -912,17 +981,6 @@ class MakedirTests(unittest.TestCase): os.makedirs(path, mode=mode, exist_ok=True) os.umask(old_mask) - @unittest.skipUnless(hasattr(os, 'chown'), 'test needs os.chown') - def test_chown_uid_gid_arguments_must_be_index(self): - stat = os.stat(support.TESTFN) - uid = stat.st_uid - gid = stat.st_gid - for value in (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2)): - self.assertRaises(TypeError, os.chown, support.TESTFN, value, gid) - self.assertRaises(TypeError, os.chown, support.TESTFN, uid, value) - self.assertIsNone(os.chown(support.TESTFN, uid, gid)) - self.assertIsNone(os.chown(support.TESTFN, -1, -1)) - def test_exist_ok_s_isgid_directory(self): path = os.path.join(support.TESTFN, 'dir1') S_ISGID = stat.S_ISGID @@ -973,6 +1031,58 @@ class MakedirTests(unittest.TestCase): os.removedirs(path) +@unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown") +class ChownFileTests(unittest.TestCase): + + def setUpClass(): + os.mkdir(support.TESTFN) + + def test_chown_uid_gid_arguments_must_be_index(self): + stat = os.stat(support.TESTFN) + uid = stat.st_uid + gid = stat.st_gid + for value in (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2)): + self.assertRaises(TypeError, os.chown, support.TESTFN, value, gid) + self.assertRaises(TypeError, os.chown, support.TESTFN, uid, value) + self.assertIsNone(os.chown(support.TESTFN, uid, gid)) + self.assertIsNone(os.chown(support.TESTFN, -1, -1)) + + @unittest.skipUnless(len(groups) > 1, "test needs more than one group") + def test_chown(self): + gid_1, gid_2 = groups[:2] + uid = os.stat(support.TESTFN).st_uid + os.chown(support.TESTFN, uid, gid_1) + gid = os.stat(support.TESTFN).st_gid + self.assertEqual(gid, gid_1) + os.chown(support.TESTFN, uid, gid_2) + gid = os.stat(support.TESTFN).st_gid + self.assertEqual(gid, gid_2) + + @unittest.skipUnless(root_in_posix and len(all_users) > 1, + "test needs root privilege and more than one user") + def test_chown_with_root(self): + uid_1, uid_2 = all_users[:2] + gid = os.stat(support.TESTFN).st_gid + os.chown(support.TESTFN, uid_1, gid) + uid = os.stat(support.TESTFN).st_uid + self.assertEqual(uid, uid_1) + os.chown(support.TESTFN, uid_2, gid) + uid = os.stat(support.TESTFN).st_uid + self.assertEqual(uid, uid_2) + + @unittest.skipUnless(not root_in_posix and len(all_users) > 1, + "test needs non-root account and more than one user") + def test_chown_without_permission(self): + uid_1, uid_2 = all_users[:2] + gid = os.stat(support.TESTFN).st_gid + with self.assertRaises(PermissionError): + os.chown(support.TESTFN, uid_1, gid) + os.chown(support.TESTFN, uid_2, gid) + + def tearDownClass(): + os.rmdir(support.TESTFN) + + class RemoveDirsTests(unittest.TestCase): def setUp(self): os.makedirs(support.TESTFN) @@ -1339,6 +1449,16 @@ class TestInvalidFD(unittest.TestCase): def test_writev(self): self.check(os.writev, [b'abc']) + def test_inheritable(self): + self.check(os.get_inheritable) + self.check(os.set_inheritable, True) + + @unittest.skipUnless(hasattr(os, 'get_blocking'), + 'needs os.get_blocking() and os.set_blocking()') + def test_blocking(self): + self.check(os.get_blocking) + self.check(os.set_blocking, True) + class LinkTests(unittest.TestCase): def setUp(self): @@ -1786,6 +1906,37 @@ class Win32SymlinkTests(unittest.TestCase): shutil.rmtree(level1) +@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") +class Win32JunctionTests(unittest.TestCase): + junction = 'junctiontest' + junction_target = os.path.dirname(os.path.abspath(__file__)) + + def setUp(self): + assert os.path.exists(self.junction_target) + assert not os.path.exists(self.junction) + + def tearDown(self): + if os.path.exists(self.junction): + # os.rmdir delegates to Windows' RemoveDirectoryW, + # which removes junction points safely. + os.rmdir(self.junction) + + def test_create_junction(self): + _winapi.CreateJunction(self.junction_target, self.junction) + self.assertTrue(os.path.exists(self.junction)) + self.assertTrue(os.path.isdir(self.junction)) + + # Junctions are not recognized as links. + self.assertFalse(os.path.islink(self.junction)) + + def test_unlink_removes_junction(self): + _winapi.CreateJunction(self.junction_target, self.junction) + self.assertTrue(os.path.exists(self.junction)) + + os.unlink(self.junction) + self.assertFalse(os.path.exists(self.junction)) + + @support.skip_unless_symlink class NonLocalSymlinkTests(unittest.TestCase): @@ -1992,11 +2143,13 @@ class TestSendfile(unittest.TestCase): @classmethod def setUpClass(cls): + cls.key = support.threading_setup() with open(support.TESTFN, "wb") as f: f.write(cls.DATA) @classmethod def tearDownClass(cls): + support.threading_cleanup(*cls.key) support.unlink(support.TESTFN) def setUp(self): @@ -2523,41 +2676,27 @@ class FDInheritanceTests(unittest.TestCase): self.assertEqual(os.get_inheritable(slave_fd), False) -@support.reap_threads -def test_main(): - support.run_unittest( - FileTests, - StatAttributeTests, - EnvironTests, - WalkTests, - FwalkTests, - MakedirTests, - DevNullTests, - URandomTests, - ExecTests, - Win32ErrorTests, - TestInvalidFD, - PosixUidGidTests, - Pep383Tests, - Win32KillTests, - Win32ListdirTests, - Win32SymlinkTests, - NonLocalSymlinkTests, - FSEncodingTests, - DeviceEncodingTests, - PidTests, - LoginTests, - LinkTests, - TestSendfile, - ProgramPriorityTests, - ExtendedAttributeTests, - Win32DeprecatedBytesAPI, - TermsizeTests, - OSErrorTests, - RemoveDirsTests, - CPUCountTests, - FDInheritanceTests, - ) +@unittest.skipUnless(hasattr(os, 'get_blocking'), + 'needs os.get_blocking() and os.set_blocking()') +class BlockingTests(unittest.TestCase): + def test_blocking(self): + fd = os.open(__file__, os.O_RDONLY) + self.addCleanup(os.close, fd) + self.assertEqual(os.get_blocking(fd), True) + + os.set_blocking(fd, False) + self.assertEqual(os.get_blocking(fd), False) + + os.set_blocking(fd, True) + self.assertEqual(os.get_blocking(fd), True) + + + +class ExportsTests(unittest.TestCase): + def test_os_all(self): + self.assertIn('open', os.__all__) + self.assertIn('walk', os.__all__) + if __name__ == "__main__": - test_main() + unittest.main() |