diff options
Diffstat (limited to 'Lib/test/test_os.py')
-rw-r--r-- | Lib/test/test_os.py | 479 |
1 files changed, 233 insertions, 246 deletions
diff --git a/Lib/test/test_os.py b/Lib/test/test_os.py index d2424d7..601c6b2 100644 --- a/Lib/test/test_os.py +++ b/Lib/test/test_os.py @@ -178,10 +178,8 @@ class StatAttributeTests(unittest.TestCase): os.unlink(self.fname) os.rmdir(support.TESTFN) + @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()') def check_stat_attributes(self, fname): - if not hasattr(os, "stat"): - return - result = os.stat(fname) # Make sure direct access works @@ -258,10 +256,8 @@ class StatAttributeTests(unittest.TestCase): warnings.simplefilter("ignore", DeprecationWarning) self.check_stat_attributes(fname) + @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()') def test_statvfs_attributes(self): - if not hasattr(os, "statvfs"): - return - try: result = os.statvfs(self.fname) except OSError as e: @@ -450,10 +446,10 @@ class StatAttributeTests(unittest.TestCase): os.close(dirfd) self._test_utime_subsecond(set_time) - # Restrict test to Win32, since there is no guarantee other + # Restrict tests to Win32, since there is no guarantee other # systems support centiseconds - if sys.platform == 'win32': - def get_file_system(path): + def get_file_system(path): + if sys.platform == 'win32': root = os.path.splitdrive(os.path.abspath(path))[0] + '\\' import ctypes kernel32 = ctypes.windll.kernel32 @@ -461,38 +457,45 @@ class StatAttributeTests(unittest.TestCase): if kernel32.GetVolumeInformationW(root, None, 0, None, None, None, buf, len(buf)): return buf.value - if get_file_system(support.TESTFN) == "NTFS": - def test_1565150(self): - t1 = 1159195039.25 - os.utime(self.fname, (t1, t1)) - self.assertEqual(os.stat(self.fname).st_mtime, t1) - - def test_large_time(self): - t1 = 5000000000 # some day in 2128 - os.utime(self.fname, (t1, t1)) - self.assertEqual(os.stat(self.fname).st_mtime, t1) + @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") + @unittest.skipUnless(get_file_system(support.TESTFN) == "NTFS", + "requires NTFS") + def test_1565150(self): + t1 = 1159195039.25 + os.utime(self.fname, (t1, t1)) + self.assertEqual(os.stat(self.fname).st_mtime, t1) + + @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") + @unittest.skipUnless(get_file_system(support.TESTFN) == "NTFS", + "requires NTFS") + def test_large_time(self): + t1 = 5000000000 # some day in 2128 + os.utime(self.fname, (t1, t1)) + self.assertEqual(os.stat(self.fname).st_mtime, t1) + + @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") + def test_1686475(self): + # Verify that an open file can be stat'ed + try: + os.stat(r"c:\pagefile.sys") + except WindowsError as e: + if e.errno == 2: # file does not exist; cannot run test + return + self.fail("Could not stat pagefile.sys") - def test_1686475(self): - # Verify that an open file can be stat'ed - try: - os.stat(r"c:\pagefile.sys") - except WindowsError as e: - if e.errno == 2: # file does not exist; cannot run test - return - self.fail("Could not stat pagefile.sys") - - @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()") - def test_15261(self): - # Verify that stat'ing a closed fd does not cause crash - r, w = os.pipe() - try: - os.stat(r) # should not raise error - finally: - os.close(r) - os.close(w) - with self.assertRaises(OSError) as ctx: - os.stat(r) - self.assertEqual(ctx.exception.errno, errno.EBADF) + @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") + @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()") + def test_15261(self): + # Verify that stat'ing a closed fd does not cause crash + r, w = os.pipe() + try: + os.stat(r) # should not raise error + finally: + os.close(r) + os.close(w) + with self.assertRaises(OSError) as ctx: + os.stat(r) + self.assertEqual(ctx.exception.errno, errno.EBADF) from test import mapping_tests @@ -1127,6 +1130,7 @@ class ExecTests(unittest.TestCase): self._test_internal_execvpe(bytes) +@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") class Win32ErrorTests(unittest.TestCase): def test_rename(self): self.assertRaises(WindowsError, os.rename, support.TESTFN, support.TESTFN+".bak") @@ -1173,63 +1177,63 @@ class TestInvalidFD(unittest.TestCase): self.fail("%r didn't raise a OSError with a bad file descriptor" % f) + @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()') def test_isatty(self): - if hasattr(os, "isatty"): - self.assertEqual(os.isatty(support.make_bad_fd()), False) + self.assertEqual(os.isatty(support.make_bad_fd()), False) + @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()') def test_closerange(self): - if hasattr(os, "closerange"): - fd = support.make_bad_fd() - # Make sure none of the descriptors we are about to close are - # currently valid (issue 6542). - for i in range(10): - try: os.fstat(fd+i) - except OSError: - pass - else: - break - if i < 2: - raise unittest.SkipTest( - "Unable to acquire a range of invalid file descriptors") - self.assertEqual(os.closerange(fd, fd + i-1), None) + fd = support.make_bad_fd() + # Make sure none of the descriptors we are about to close are + # currently valid (issue 6542). + for i in range(10): + try: os.fstat(fd+i) + except OSError: + pass + else: + break + if i < 2: + raise unittest.SkipTest( + "Unable to acquire a range of invalid file descriptors") + self.assertEqual(os.closerange(fd, fd + i-1), None) + @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()') def test_dup2(self): - if hasattr(os, "dup2"): - self.check(os.dup2, 20) + self.check(os.dup2, 20) + @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()') def test_fchmod(self): - if hasattr(os, "fchmod"): - self.check(os.fchmod, 0) + self.check(os.fchmod, 0) + @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()') def test_fchown(self): - if hasattr(os, "fchown"): - self.check(os.fchown, -1, -1) + self.check(os.fchown, -1, -1) + @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()') def test_fpathconf(self): - if hasattr(os, "fpathconf"): - self.check(os.pathconf, "PC_NAME_MAX") - self.check(os.fpathconf, "PC_NAME_MAX") + self.check(os.pathconf, "PC_NAME_MAX") + self.check(os.fpathconf, "PC_NAME_MAX") + @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()') def test_ftruncate(self): - if hasattr(os, "ftruncate"): - self.check(os.truncate, 0) - self.check(os.ftruncate, 0) + self.check(os.truncate, 0) + self.check(os.ftruncate, 0) + @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()') def test_lseek(self): - if hasattr(os, "lseek"): - self.check(os.lseek, 0, 0) + self.check(os.lseek, 0, 0) + @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()') def test_read(self): - if hasattr(os, "read"): - self.check(os.read, 1) + self.check(os.read, 1) + @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()') def test_tcsetpgrpt(self): - if hasattr(os, "tcsetpgrp"): - self.check(os.tcsetpgrp, 0) + self.check(os.tcsetpgrp, 0) + @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()') def test_write(self): - if hasattr(os, "write"): - self.check(os.write, b" ") + self.check(os.write, b" ") class LinkTests(unittest.TestCase): @@ -1269,138 +1273,117 @@ class LinkTests(unittest.TestCase): self.file2 = self.file1 + "2" self._test_link(self.file1, self.file2) -if sys.platform != 'win32': - class Win32ErrorTests(unittest.TestCase): - pass - - class PosixUidGidTests(unittest.TestCase): - if hasattr(os, 'setuid'): - def test_setuid(self): - if os.getuid() != 0: - self.assertRaises(os.error, os.setuid, 0) - self.assertRaises(OverflowError, os.setuid, 1<<32) - - if hasattr(os, 'setgid'): - def test_setgid(self): - if os.getuid() != 0 and not HAVE_WHEEL_GROUP: - self.assertRaises(os.error, os.setgid, 0) - self.assertRaises(OverflowError, os.setgid, 1<<32) - - if hasattr(os, 'seteuid'): - def test_seteuid(self): - if os.getuid() != 0: - self.assertRaises(os.error, os.seteuid, 0) - self.assertRaises(OverflowError, os.seteuid, 1<<32) - - if hasattr(os, 'setegid'): - def test_setegid(self): - if os.getuid() != 0 and not HAVE_WHEEL_GROUP: - self.assertRaises(os.error, os.setegid, 0) - self.assertRaises(OverflowError, os.setegid, 1<<32) - - if hasattr(os, 'setreuid'): - def test_setreuid(self): - if os.getuid() != 0: - self.assertRaises(os.error, os.setreuid, 0, 0) - self.assertRaises(OverflowError, os.setreuid, 1<<32, 0) - self.assertRaises(OverflowError, os.setreuid, 0, 1<<32) - - def test_setreuid_neg1(self): - # Needs to accept -1. We run this in a subprocess to avoid - # altering the test runner's process state (issue8045). - subprocess.check_call([ - sys.executable, '-c', - 'import os,sys;os.setreuid(-1,-1);sys.exit(0)']) - - if hasattr(os, 'setregid'): - def test_setregid(self): - if os.getuid() != 0 and not HAVE_WHEEL_GROUP: - self.assertRaises(os.error, os.setregid, 0, 0) - self.assertRaises(OverflowError, os.setregid, 1<<32, 0) - self.assertRaises(OverflowError, os.setregid, 0, 1<<32) - - def test_setregid_neg1(self): - # Needs to accept -1. We run this in a subprocess to avoid - # altering the test runner's process state (issue8045). - subprocess.check_call([ - sys.executable, '-c', - 'import os,sys;os.setregid(-1,-1);sys.exit(0)']) - - class Pep383Tests(unittest.TestCase): - def setUp(self): - if support.TESTFN_UNENCODABLE: - self.dir = support.TESTFN_UNENCODABLE - elif support.TESTFN_NONASCII: - self.dir = support.TESTFN_NONASCII - else: - self.dir = support.TESTFN - self.bdir = os.fsencode(self.dir) - - bytesfn = [] - def add_filename(fn): - try: - fn = os.fsencode(fn) - except UnicodeEncodeError: - return - bytesfn.append(fn) - add_filename(support.TESTFN_UNICODE) - if support.TESTFN_UNENCODABLE: - add_filename(support.TESTFN_UNENCODABLE) - if support.TESTFN_NONASCII: - add_filename(support.TESTFN_NONASCII) - if not bytesfn: - self.skipTest("couldn't create any non-ascii filename") - - self.unicodefn = set() - os.mkdir(self.dir) - try: - for fn in bytesfn: - support.create_empty_file(os.path.join(self.bdir, fn)) - fn = os.fsdecode(fn) - if fn in self.unicodefn: - raise ValueError("duplicate filename") - self.unicodefn.add(fn) - except: - shutil.rmtree(self.dir) - raise +@unittest.skipIf(sys.platform == "win32", "Posix specific tests") +class PosixUidGidTests(unittest.TestCase): + @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()') + def test_setuid(self): + if os.getuid() != 0: + self.assertRaises(os.error, os.setuid, 0) + self.assertRaises(OverflowError, os.setuid, 1<<32) + + @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()') + def test_setgid(self): + if os.getuid() != 0 and not HAVE_WHEEL_GROUP: + self.assertRaises(os.error, os.setgid, 0) + self.assertRaises(OverflowError, os.setgid, 1<<32) + + @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()') + def test_seteuid(self): + if os.getuid() != 0: + self.assertRaises(os.error, os.seteuid, 0) + self.assertRaises(OverflowError, os.seteuid, 1<<32) + + @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()') + def test_setegid(self): + if os.getuid() != 0 and not HAVE_WHEEL_GROUP: + self.assertRaises(os.error, os.setegid, 0) + self.assertRaises(OverflowError, os.setegid, 1<<32) + + @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()') + def test_setreuid(self): + if os.getuid() != 0: + self.assertRaises(os.error, os.setreuid, 0, 0) + self.assertRaises(OverflowError, os.setreuid, 1<<32, 0) + self.assertRaises(OverflowError, os.setreuid, 0, 1<<32) + + @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()') + def test_setregid(self): + if os.getuid() != 0 and not HAVE_WHEEL_GROUP: + self.assertRaises(os.error, os.setregid, 0, 0) + self.assertRaises(OverflowError, os.setregid, 1<<32, 0) + self.assertRaises(OverflowError, os.setregid, 0, 1<<32) + +@unittest.skipIf(sys.platform == "win32", "Posix specific tests") +class Pep383Tests(unittest.TestCase): + def setUp(self): + if support.TESTFN_UNENCODABLE: + self.dir = support.TESTFN_UNENCODABLE + elif support.TESTFN_NONASCII: + self.dir = support.TESTFN_NONASCII + else: + self.dir = support.TESTFN + self.bdir = os.fsencode(self.dir) - def tearDown(self): + bytesfn = [] + def add_filename(fn): + try: + fn = os.fsencode(fn) + except UnicodeEncodeError: + return + bytesfn.append(fn) + add_filename(support.TESTFN_UNICODE) + if support.TESTFN_UNENCODABLE: + add_filename(support.TESTFN_UNENCODABLE) + if support.TESTFN_NONASCII: + add_filename(support.TESTFN_NONASCII) + if not bytesfn: + self.skipTest("couldn't create any non-ascii filename") + + self.unicodefn = set() + os.mkdir(self.dir) + try: + for fn in bytesfn: + support.create_empty_file(os.path.join(self.bdir, fn)) + fn = os.fsdecode(fn) + if fn in self.unicodefn: + raise ValueError("duplicate filename") + self.unicodefn.add(fn) + except: shutil.rmtree(self.dir) + raise - def test_listdir(self): - expected = self.unicodefn - found = set(os.listdir(self.dir)) - self.assertEqual(found, expected) - # test listdir without arguments - current_directory = os.getcwd() - try: - os.chdir(os.sep) - self.assertEqual(set(os.listdir()), set(os.listdir(os.sep))) - finally: - os.chdir(current_directory) - - def test_open(self): - for fn in self.unicodefn: - f = open(os.path.join(self.dir, fn), 'rb') - f.close() - - @unittest.skipUnless(hasattr(os, 'statvfs'), - "need os.statvfs()") - def test_statvfs(self): - # issue #9645 - for fn in self.unicodefn: - # should not fail with file not found error - fullname = os.path.join(self.dir, fn) - os.statvfs(fullname) - - def test_stat(self): - for fn in self.unicodefn: - os.stat(os.path.join(self.dir, fn)) -else: - class PosixUidGidTests(unittest.TestCase): - pass - class Pep383Tests(unittest.TestCase): - pass + def tearDown(self): + shutil.rmtree(self.dir) + + def test_listdir(self): + expected = self.unicodefn + found = set(os.listdir(self.dir)) + self.assertEqual(found, expected) + # test listdir without arguments + current_directory = os.getcwd() + try: + os.chdir(os.sep) + self.assertEqual(set(os.listdir()), set(os.listdir(os.sep))) + finally: + os.chdir(current_directory) + + def test_open(self): + for fn in self.unicodefn: + f = open(os.path.join(self.dir, fn), 'rb') + f.close() + + @unittest.skipUnless(hasattr(os, 'statvfs'), + "need os.statvfs()") + def test_statvfs(self): + # issue #9645 + for fn in self.unicodefn: + # should not fail with file not found error + fullname = os.path.join(self.dir, fn) + os.statvfs(fullname) + + def test_stat(self): + for fn in self.unicodefn: + os.stat(os.path.join(self.dir, fn)) @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") class Win32KillTests(unittest.TestCase): @@ -1838,6 +1821,8 @@ class TestSendfile(unittest.TestCase): SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \ not sys.platform.startswith("solaris") and \ not sys.platform.startswith("sunos") + requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS, + 'requires headers and trailers support') @classmethod def setUpClass(cls): @@ -1956,52 +1941,54 @@ class TestSendfile(unittest.TestCase): # --- headers / trailers tests - if SUPPORT_HEADERS_TRAILERS: - - def test_headers(self): - total_sent = 0 - sent = os.sendfile(self.sockno, self.fileno, 0, 4096, - headers=[b"x" * 512]) + @requires_headers_trailers + def test_headers(self): + total_sent = 0 + sent = os.sendfile(self.sockno, self.fileno, 0, 4096, + headers=[b"x" * 512]) + total_sent += sent + offset = 4096 + nbytes = 4096 + while 1: + sent = self.sendfile_wrapper(self.sockno, self.fileno, + offset, nbytes) + if sent == 0: + break total_sent += sent - offset = 4096 - nbytes = 4096 - while 1: - sent = self.sendfile_wrapper(self.sockno, self.fileno, - offset, nbytes) - if sent == 0: - break - total_sent += sent - offset += sent + offset += sent - expected_data = b"x" * 512 + self.DATA - self.assertEqual(total_sent, len(expected_data)) + expected_data = b"x" * 512 + self.DATA + self.assertEqual(total_sent, len(expected_data)) + self.client.close() + self.server.wait() + data = self.server.handler_instance.get_data() + self.assertEqual(hash(data), hash(expected_data)) + + @requires_headers_trailers + def test_trailers(self): + TESTFN2 = support.TESTFN + "2" + file_data = b"abcdef" + with open(TESTFN2, 'wb') as f: + f.write(file_data) + with open(TESTFN2, 'rb')as f: + self.addCleanup(os.remove, TESTFN2) + os.sendfile(self.sockno, f.fileno(), 0, len(file_data), + trailers=[b"1234"]) self.client.close() self.server.wait() data = self.server.handler_instance.get_data() - self.assertEqual(hash(data), hash(expected_data)) - - def test_trailers(self): - TESTFN2 = support.TESTFN + "2" - file_data = b"abcdef" - with open(TESTFN2, 'wb') as f: - f.write(file_data) - with open(TESTFN2, 'rb')as f: - self.addCleanup(os.remove, TESTFN2) - os.sendfile(self.sockno, f.fileno(), 0, len(file_data), - trailers=[b"1234"]) - self.client.close() - self.server.wait() - data = self.server.handler_instance.get_data() - self.assertEqual(data, b"abcdef1234") - - if hasattr(os, "SF_NODISKIO"): - def test_flags(self): - try: - os.sendfile(self.sockno, self.fileno, 0, 4096, - flags=os.SF_NODISKIO) - except OSError as err: - if err.errno not in (errno.EBUSY, errno.EAGAIN): - raise + self.assertEqual(data, b"abcdef1234") + + @requires_headers_trailers + @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'), + 'test needs os.SF_NODISKIO') + def test_flags(self): + try: + os.sendfile(self.sockno, self.fileno, 0, 4096, + flags=os.SF_NODISKIO) + except OSError as err: + if err.errno not in (errno.EBUSY, errno.EAGAIN): + raise def supports_extended_attributes(): |