diff options
Diffstat (limited to 'Lib/test/test_os.py')
| -rw-r--r-- | Lib/test/test_os.py | 931 |
1 files changed, 678 insertions, 253 deletions
diff --git a/Lib/test/test_os.py b/Lib/test/test_os.py index 56309bf..e29b0d5 100644 --- a/Lib/test/test_os.py +++ b/Lib/test/test_os.py @@ -2,28 +2,32 @@ # does add tests for a few functions which have been determined to be more # portable than they had been thought to be. -import os -import errno -import unittest -import warnings -import sys -import signal -import subprocess -import time -import shutil -from test import support +import asynchat +import asyncore +import codecs import contextlib +import decimal +import errno +import fractions +import itertools +import locale import mmap +import os +import pickle import platform import re -import uuid -import asyncore -import asynchat +import shutil +import signal import socket -import itertools import stat -import locale -import codecs +import subprocess +import sys +import sysconfig +import time +import unittest +import uuid +import warnings +from test import support try: import threading except ImportError: @@ -32,19 +36,13 @@ try: import resource except ImportError: resource = None +try: + import fcntl +except ImportError: + fcntl = None from test.script_helper import assert_python_ok -with warnings.catch_warnings(): - warnings.simplefilter("ignore", DeprecationWarning) - os.stat_float_times(True) -st = os.stat(__file__) -stat_supports_subsecond = ( - # check if float and int timestamps are different - (st.st_atime != st[7]) - or (st.st_mtime != st[8]) - or (st.st_ctime != st[9])) - # Detect whether we're on a Linux system that uses the (now outdated # and unmaintained) linuxthreads threading library. There's an issue # when combining linuxthreads with a failed execv call: see @@ -60,7 +58,7 @@ HAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0 # Tests creating TESTFN class FileTests(unittest.TestCase): def setUp(self): - if os.path.exists(support.TESTFN): + if os.path.lexists(support.TESTFN): os.unlink(support.TESTFN) tearDown = setUp @@ -164,6 +162,19 @@ class FileTests(unittest.TestCase): with open(TESTFN2, 'r') as f: self.assertEqual(f.read(), "1") + def test_open_keywords(self): + f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777, + dir_fd=None) + os.close(f) + + def test_symlink_keywords(self): + symlink = support.get_attribute(os, "symlink") + try: + symlink(src='target', dst=support.TESTFN, + target_is_directory=False, dir_fd=None) + except (NotImplementedError, OSError): + pass # No OS support or unprivileged user + # Test attributes on return values from os.*stat* family. class StatAttributeTests(unittest.TestCase): @@ -256,6 +267,16 @@ class StatAttributeTests(unittest.TestCase): warnings.simplefilter("ignore", DeprecationWarning) self.check_stat_attributes(fname) + def test_stat_result_pickle(self): + result = os.stat(self.fname) + for proto in range(pickle.HIGHEST_PROTOCOL + 1): + p = pickle.dumps(result, proto) + self.assertIn(b'stat_result', p) + if proto < 4: + self.assertIn(b'cos\nstat_result\n', p) + unpickled = pickle.loads(p) + self.assertEqual(result, unpickled) + @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()') def test_statvfs_attributes(self): try: @@ -263,7 +284,7 @@ class StatAttributeTests(unittest.TestCase): except OSError as e: # On AtheOS, glibc always returns ENOSYS if e.errno == errno.ENOSYS: - self.skipTest('glibc always returns ENOSYS on AtheOS') + self.skipTest('os.statvfs() failed with ENOSYS') # Make sure direct access works self.assertEqual(result.f_bfree, result[3]) @@ -300,202 +321,243 @@ class StatAttributeTests(unittest.TestCase): except TypeError: pass - def test_utime_dir(self): - delta = 1000000 - st = os.stat(support.TESTFN) - # round to int, because some systems may support sub-second - # time stamps in stat, but not in utime. - os.utime(support.TESTFN, (st.st_atime, int(st.st_mtime-delta))) - st2 = os.stat(support.TESTFN) - self.assertEqual(st2.st_mtime, int(st.st_mtime-delta)) - - def _test_utime(self, filename, attr, utime, delta): - # Issue #13327 removed the requirement to pass None as the - # second argument. Check that the previous methods of passing - # a time tuple or None work in addition to no argument. - st0 = os.stat(filename) - # Doesn't set anything new, but sets the time tuple way - utime(filename, (attr(st0, "st_atime"), attr(st0, "st_mtime"))) - # Setting the time to the time you just read, then reading again, - # should always return exactly the same times. - st1 = os.stat(filename) - self.assertEqual(attr(st0, "st_mtime"), attr(st1, "st_mtime")) - self.assertEqual(attr(st0, "st_atime"), attr(st1, "st_atime")) - # Set to the current time in the old explicit way. - os.utime(filename, None) - st2 = os.stat(support.TESTFN) - # Set to the current time in the new way - os.utime(filename) - st3 = os.stat(filename) - self.assertAlmostEqual(attr(st2, "st_mtime"), attr(st3, "st_mtime"), delta=delta) + @unittest.skipUnless(hasattr(os, 'statvfs'), + "need os.statvfs()") + def test_statvfs_result_pickle(self): + try: + result = os.statvfs(self.fname) + except OSError as e: + # On AtheOS, glibc always returns ENOSYS + if e.errno == errno.ENOSYS: + self.skipTest('os.statvfs() failed with ENOSYS') - def test_utime(self): - def utime(file, times): - return os.utime(file, times) - self._test_utime(self.fname, getattr, utime, 10) - self._test_utime(support.TESTFN, getattr, utime, 10) - - - def _test_utime_ns(self, set_times_ns, test_dir=True): - def getattr_ns(o, attr): - return getattr(o, attr + "_ns") - ten_s = 10 * 1000 * 1000 * 1000 - self._test_utime(self.fname, getattr_ns, set_times_ns, ten_s) - if test_dir: - self._test_utime(support.TESTFN, getattr_ns, set_times_ns, ten_s) - - def test_utime_ns(self): - def utime_ns(file, times): - return os.utime(file, ns=times) - self._test_utime_ns(utime_ns) - - requires_utime_dir_fd = unittest.skipUnless( - os.utime in os.supports_dir_fd, - "dir_fd support for utime required for this test.") - requires_utime_fd = unittest.skipUnless( - os.utime in os.supports_fd, - "fd support for utime required for this test.") - requires_utime_nofollow_symlinks = unittest.skipUnless( - os.utime in os.supports_follow_symlinks, - "follow_symlinks support for utime required for this test.") - - @requires_utime_nofollow_symlinks - def test_lutimes_ns(self): - def lutimes_ns(file, times): - return os.utime(file, ns=times, follow_symlinks=False) - self._test_utime_ns(lutimes_ns) - - @requires_utime_fd - def test_futimes_ns(self): - def futimes_ns(file, times): - with open(file, "wb") as f: - os.utime(f.fileno(), ns=times) - self._test_utime_ns(futimes_ns, test_dir=False) - - def _utime_invalid_arguments(self, name, arg): - with self.assertRaises(ValueError): - getattr(os, name)(arg, (5, 5), ns=(5, 5)) + for proto in range(pickle.HIGHEST_PROTOCOL + 1): + p = pickle.dumps(result, proto) + self.assertIn(b'statvfs_result', p) + if proto < 4: + self.assertIn(b'cos\nstatvfs_result\n', p) + unpickled = pickle.loads(p) + self.assertEqual(result, unpickled) - def test_utime_invalid_arguments(self): - self._utime_invalid_arguments('utime', self.fname) - - - @unittest.skipUnless(stat_supports_subsecond, - "os.stat() doesn't has a subsecond resolution") - def _test_utime_subsecond(self, set_time_func): - asec, amsec = 1, 901 - atime = asec + amsec * 1e-3 - msec, mmsec = 2, 901 - mtime = msec + mmsec * 1e-3 - filename = self.fname - os.utime(filename, (0, 0)) - set_time_func(filename, atime, mtime) + @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") + def test_1686475(self): + # Verify that an open file can be stat'ed + try: + os.stat(r"c:\pagefile.sys") + except FileNotFoundError: + self.skipTest(r'c:\pagefile.sys does not exist') + except OSError as e: + self.fail("Could not stat pagefile.sys") + + @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") + @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()") + def test_15261(self): + # Verify that stat'ing a closed fd does not cause crash + r, w = os.pipe() + try: + os.stat(r) # should not raise error + finally: + os.close(r) + os.close(w) + with self.assertRaises(OSError) as ctx: + os.stat(r) + self.assertEqual(ctx.exception.errno, errno.EBADF) + + +class UtimeTests(unittest.TestCase): + def setUp(self): + self.dirname = support.TESTFN + self.fname = os.path.join(self.dirname, "f1") + + self.addCleanup(support.rmtree, self.dirname) + os.mkdir(self.dirname) + with open(self.fname, 'wb') as fp: + fp.write(b"ABC") + + def restore_float_times(state): + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + + os.stat_float_times(state) + + # ensure that st_atime and st_mtime are float with warnings.catch_warnings(): warnings.simplefilter("ignore", DeprecationWarning) + + old_float_times = os.stat_float_times(-1) + self.addCleanup(restore_float_times, old_float_times) + os.stat_float_times(True) + + def support_subsecond(self, filename): + # Heuristic to check if the filesystem supports timestamp with + # subsecond resolution: check if float and int timestamps are different + st = os.stat(filename) + return ((st.st_atime != st[7]) + or (st.st_mtime != st[8]) + or (st.st_ctime != st[9])) + + def _test_utime(self, set_time, filename=None): + if not filename: + filename = self.fname + + support_subsecond = self.support_subsecond(filename) + if support_subsecond: + # Timestamp with a resolution of 1 microsecond (10^-6). + # + # The resolution of the C internal function used by os.utime() + # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable + # test with a resolution of 1 ns requires more work: + # see the issue #15745. + atime_ns = 1002003000 # 1.002003 seconds + mtime_ns = 4005006000 # 4.005006 seconds + else: + # use a resolution of 1 second + atime_ns = 5 * 10**9 + mtime_ns = 8 * 10**9 + + set_time(filename, (atime_ns, mtime_ns)) st = os.stat(filename) - self.assertAlmostEqual(st.st_atime, atime, places=3) - self.assertAlmostEqual(st.st_mtime, mtime, places=3) - def test_utime_subsecond(self): - def set_time(filename, atime, mtime): + if support_subsecond: + self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6) + self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6) + else: + self.assertEqual(st.st_atime, atime_ns * 1e-9) + self.assertEqual(st.st_mtime, mtime_ns * 1e-9) + self.assertEqual(st.st_atime_ns, atime_ns) + self.assertEqual(st.st_mtime_ns, mtime_ns) + + def test_utime(self): + def set_time(filename, ns): + # test the ns keyword parameter + os.utime(filename, ns=ns) + self._test_utime(set_time) + + @staticmethod + def ns_to_sec(ns): + # Convert a number of nanosecond (int) to a number of seconds (float). + # Round towards infinity by adding 0.5 nanosecond to avoid rounding + # issue, os.utime() rounds towards minus infinity. + return (ns * 1e-9) + 0.5e-9 + + def test_utime_by_indexed(self): + # pass times as floating point seconds as the second indexed parameter + def set_time(filename, ns): + atime_ns, mtime_ns = ns + atime = self.ns_to_sec(atime_ns) + mtime = self.ns_to_sec(mtime_ns) + # test utimensat(timespec), utimes(timeval), utime(utimbuf) + # or utime(time_t) os.utime(filename, (atime, mtime)) - self._test_utime_subsecond(set_time) - - @requires_utime_fd - def test_futimes_subsecond(self): - def set_time(filename, atime, mtime): - with open(filename, "wb") as f: - os.utime(f.fileno(), times=(atime, mtime)) - self._test_utime_subsecond(set_time) - - @requires_utime_fd - def test_futimens_subsecond(self): - def set_time(filename, atime, mtime): - with open(filename, "wb") as f: - os.utime(f.fileno(), times=(atime, mtime)) - self._test_utime_subsecond(set_time) - - @requires_utime_dir_fd - def test_futimesat_subsecond(self): - def set_time(filename, atime, mtime): - dirname = os.path.dirname(filename) - dirfd = os.open(dirname, os.O_RDONLY) - try: - os.utime(os.path.basename(filename), dir_fd=dirfd, - times=(atime, mtime)) - finally: - os.close(dirfd) - self._test_utime_subsecond(set_time) - - @requires_utime_nofollow_symlinks - def test_lutimes_subsecond(self): - def set_time(filename, atime, mtime): - os.utime(filename, (atime, mtime), follow_symlinks=False) - self._test_utime_subsecond(set_time) - - @requires_utime_dir_fd - def test_utimensat_subsecond(self): - def set_time(filename, atime, mtime): - dirname = os.path.dirname(filename) + self._test_utime(set_time) + + def test_utime_by_times(self): + def set_time(filename, ns): + atime_ns, mtime_ns = ns + atime = self.ns_to_sec(atime_ns) + mtime = self.ns_to_sec(mtime_ns) + # test the times keyword parameter + os.utime(filename, times=(atime, mtime)) + self._test_utime(set_time) + + @unittest.skipUnless(os.utime in os.supports_follow_symlinks, + "follow_symlinks support for utime required " + "for this test.") + def test_utime_nofollow_symlinks(self): + def set_time(filename, ns): + # use follow_symlinks=False to test utimensat(timespec) + # or lutimes(timeval) + os.utime(filename, ns=ns, follow_symlinks=False) + self._test_utime(set_time) + + @unittest.skipUnless(os.utime in os.supports_fd, + "fd support for utime required for this test.") + def test_utime_fd(self): + def set_time(filename, ns): + with open(filename, 'wb') as fp: + # use a file descriptor to test futimens(timespec) + # or futimes(timeval) + os.utime(fp.fileno(), ns=ns) + self._test_utime(set_time) + + @unittest.skipUnless(os.utime in os.supports_dir_fd, + "dir_fd support for utime required for this test.") + def test_utime_dir_fd(self): + def set_time(filename, ns): + dirname, name = os.path.split(filename) dirfd = os.open(dirname, os.O_RDONLY) try: - os.utime(os.path.basename(filename), dir_fd=dirfd, - times=(atime, mtime)) + # pass dir_fd to test utimensat(timespec) or futimesat(timeval) + os.utime(name, dir_fd=dirfd, ns=ns) finally: os.close(dirfd) - self._test_utime_subsecond(set_time) + self._test_utime(set_time) - # Restrict tests to Win32, since there is no guarantee other - # systems support centiseconds - def get_file_system(path): + def test_utime_directory(self): + def set_time(filename, ns): + # test calling os.utime() on a directory + os.utime(filename, ns=ns) + self._test_utime(set_time, filename=self.dirname) + + def _test_utime_current(self, set_time): + # Get the system clock + current = time.time() + + # Call os.utime() to set the timestamp to the current system clock + set_time(self.fname) + + if not self.support_subsecond(self.fname): + delta = 1.0 + else: + # On Windows, the usual resolution of time.time() is 15.6 ms + delta = 0.020 + st = os.stat(self.fname) + msg = ("st_time=%r, current=%r, dt=%r" + % (st.st_mtime, current, st.st_mtime - current)) + self.assertAlmostEqual(st.st_mtime, current, + delta=delta, msg=msg) + + def test_utime_current(self): + def set_time(filename): + # Set to the current time in the new way + os.utime(self.fname) + self._test_utime_current(set_time) + + def test_utime_current_old(self): + def set_time(filename): + # Set to the current time in the old explicit way. + os.utime(self.fname, None) + self._test_utime_current(set_time) + + def get_file_system(self, path): if sys.platform == 'win32': root = os.path.splitdrive(os.path.abspath(path))[0] + '\\' import ctypes kernel32 = ctypes.windll.kernel32 buf = ctypes.create_unicode_buffer("", 100) - if kernel32.GetVolumeInformationW(root, None, 0, None, None, None, buf, len(buf)): + ok = kernel32.GetVolumeInformationW(root, None, 0, + None, None, None, + buf, len(buf)) + if ok: return buf.value + # return None if the filesystem is unknown - @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") - @unittest.skipUnless(get_file_system(support.TESTFN) == "NTFS", - "requires NTFS") - def test_1565150(self): - t1 = 1159195039.25 - os.utime(self.fname, (t1, t1)) - self.assertEqual(os.stat(self.fname).st_mtime, t1) - - @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") - @unittest.skipUnless(get_file_system(support.TESTFN) == "NTFS", - "requires NTFS") def test_large_time(self): - t1 = 5000000000 # some day in 2128 - os.utime(self.fname, (t1, t1)) - self.assertEqual(os.stat(self.fname).st_mtime, t1) + # Many filesystems are limited to the year 2038. At least, the test + # pass with NTFS filesystem. + if self.get_file_system(self.dirname) != "NTFS": + self.skipTest("requires NTFS") - @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") - def test_1686475(self): - # Verify that an open file can be stat'ed - try: - os.stat(r"c:\pagefile.sys") - except WindowsError as e: - if e.errno == 2: # file does not exist; cannot run test - self.skipTest(r'c:\pagefile.sys does not exist') - self.fail("Could not stat pagefile.sys") + large = 5000000000 # some day in 2128 + os.utime(self.fname, (large, large)) + self.assertEqual(os.stat(self.fname).st_mtime, large) + + def test_utime_invalid_arguments(self): + # seconds and nanoseconds parameters are mutually exclusive + with self.assertRaises(ValueError): + os.utime(self.fname, (5, 5), ns=(5, 5)) - @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") - @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()") - def test_15261(self): - # Verify that stat'ing a closed fd does not cause crash - r, w = os.pipe() - try: - os.stat(r) # should not raise error - finally: - os.close(r) - os.close(w) - with self.assertRaises(OSError) as ctx: - os.stat(r) - self.assertEqual(ctx.exception.errno, errno.EBADF) from test import mapping_tests @@ -658,9 +720,17 @@ class EnvironTests(mapping_tests.BasicTestMappingProtocol): class WalkTests(unittest.TestCase): """Tests for os.walk().""" + # Wrapper to hide minor differences between os.walk and os.fwalk + # to tests both functions with the same code base + def walk(self, directory, topdown=True, follow_symlinks=False): + walk_it = os.walk(directory, + topdown=topdown, + followlinks=follow_symlinks) + for root, dirs, files in walk_it: + yield (root, dirs, files) + def setUp(self): - import os - from os.path import join + join = os.path.join # Build: # TESTFN/ @@ -675,36 +745,39 @@ class WalkTests(unittest.TestCase): # broken_link # TEST2/ # tmp4 a lone file - walk_path = join(support.TESTFN, "TEST1") - sub1_path = join(walk_path, "SUB1") - sub11_path = join(sub1_path, "SUB11") - sub2_path = join(walk_path, "SUB2") - tmp1_path = join(walk_path, "tmp1") - tmp2_path = join(sub1_path, "tmp2") + self.walk_path = join(support.TESTFN, "TEST1") + self.sub1_path = join(self.walk_path, "SUB1") + self.sub11_path = join(self.sub1_path, "SUB11") + sub2_path = join(self.walk_path, "SUB2") + tmp1_path = join(self.walk_path, "tmp1") + tmp2_path = join(self.sub1_path, "tmp2") tmp3_path = join(sub2_path, "tmp3") - link_path = join(sub2_path, "link") + self.link_path = join(sub2_path, "link") t2_path = join(support.TESTFN, "TEST2") tmp4_path = join(support.TESTFN, "TEST2", "tmp4") - link_path = join(sub2_path, "link") broken_link_path = join(sub2_path, "broken_link") # Create stuff. - os.makedirs(sub11_path) + os.makedirs(self.sub11_path) os.makedirs(sub2_path) os.makedirs(t2_path) + for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path: f = open(path, "w") f.write("I'm " + path + " and proud of it. Blame test_os.\n") f.close() + if support.can_symlink(): - os.symlink(os.path.abspath(t2_path), link_path) + os.symlink(os.path.abspath(t2_path), self.link_path) os.symlink('broken', broken_link_path, True) - sub2_tree = (sub2_path, ["link"], ["broken_link", "tmp3"]) + self.sub2_tree = (sub2_path, ["link"], ["broken_link", "tmp3"]) else: - sub2_tree = (sub2_path, [], ["tmp3"]) + self.sub2_tree = (sub2_path, [], ["tmp3"]) + def test_walk_topdown(self): # Walk top-down. - all = list(os.walk(walk_path)) + all = list(os.walk(self.walk_path)) + self.assertEqual(len(all), 4) # We can't know which order SUB1 and SUB2 will appear in. # Not flipped: TESTFN, SUB1, SUB11, SUB2 @@ -712,26 +785,32 @@ class WalkTests(unittest.TestCase): flipped = all[0][1][0] != "SUB1" all[0][1].sort() all[3 - 2 * flipped][-1].sort() - self.assertEqual(all[0], (walk_path, ["SUB1", "SUB2"], ["tmp1"])) - self.assertEqual(all[1 + flipped], (sub1_path, ["SUB11"], ["tmp2"])) - self.assertEqual(all[2 + flipped], (sub11_path, [], [])) - self.assertEqual(all[3 - 2 * flipped], sub2_tree) + self.assertEqual(all[0], (self.walk_path, ["SUB1", "SUB2"], ["tmp1"])) + self.assertEqual(all[1 + flipped], (self.sub1_path, ["SUB11"], ["tmp2"])) + self.assertEqual(all[2 + flipped], (self.sub11_path, [], [])) + self.assertEqual(all[3 - 2 * flipped], self.sub2_tree) + def test_walk_prune(self): # Prune the search. all = [] - for root, dirs, files in os.walk(walk_path): + for root, dirs, files in self.walk(self.walk_path): all.append((root, dirs, files)) # Don't descend into SUB1. if 'SUB1' in dirs: # Note that this also mutates the dirs we appended to all! dirs.remove('SUB1') + self.assertEqual(len(all), 2) - self.assertEqual(all[0], (walk_path, ["SUB2"], ["tmp1"])) + self.assertEqual(all[0], + (self.walk_path, ["SUB2"], ["tmp1"])) + all[1][-1].sort() - self.assertEqual(all[1], sub2_tree) + self.assertEqual(all[1], self.sub2_tree) + def test_walk_bottom_up(self): # Walk bottom-up. - all = list(os.walk(walk_path, topdown=False)) + all = list(self.walk(self.walk_path, topdown=False)) + self.assertEqual(len(all), 4) # We can't know which order SUB1 and SUB2 will appear in. # Not flipped: SUB11, SUB1, SUB2, TESTFN @@ -739,20 +818,28 @@ class WalkTests(unittest.TestCase): flipped = all[3][1][0] != "SUB1" all[3][1].sort() all[2 - 2 * flipped][-1].sort() - self.assertEqual(all[3], (walk_path, ["SUB1", "SUB2"], ["tmp1"])) - self.assertEqual(all[flipped], (sub11_path, [], [])) - self.assertEqual(all[flipped + 1], (sub1_path, ["SUB11"], ["tmp2"])) - self.assertEqual(all[2 - 2 * flipped], sub2_tree) - - if support.can_symlink(): - # Walk, following symlinks. - for root, dirs, files in os.walk(walk_path, followlinks=True): - if root == link_path: - self.assertEqual(dirs, []) - self.assertEqual(files, ["tmp4"]) - break - else: - self.fail("Didn't follow symlink with followlinks=True") + self.assertEqual(all[3], + (self.walk_path, ["SUB1", "SUB2"], ["tmp1"])) + self.assertEqual(all[flipped], + (self.sub11_path, [], [])) + self.assertEqual(all[flipped + 1], + (self.sub1_path, ["SUB11"], ["tmp2"])) + self.assertEqual(all[2 - 2 * flipped], + self.sub2_tree) + + def test_walk_symlink(self): + if not support.can_symlink(): + self.skipTest("need symlink support") + + # Walk, following symlinks. + walk_it = self.walk(self.walk_path, follow_symlinks=True) + for root, dirs, files in walk_it: + if root == self.link_path: + self.assertEqual(dirs, []) + self.assertEqual(files, ["tmp4"]) + break + else: + self.fail("Didn't follow symlink with followlinks=True") def tearDown(self): # Tear everything down. This is a decent use for bottom-up on @@ -775,6 +862,14 @@ class WalkTests(unittest.TestCase): class FwalkTests(WalkTests): """Tests for os.fwalk().""" + def walk(self, directory, topdown=True, follow_symlinks=False): + walk_it = os.fwalk(directory, + topdown=topdown, + follow_symlinks=follow_symlinks) + for root, dirs, files, root_fd in walk_it: + yield (root, dirs, files) + + def _compare_to_walk(self, walk_kwargs, fwalk_kwargs): """ compare with walk() results. @@ -876,6 +971,20 @@ class MakedirTests(unittest.TestCase): os.makedirs(path, mode=mode, exist_ok=True) os.umask(old_mask) + # Issue #25583: A drive root could raise PermissionError on Windows + os.makedirs(os.path.abspath('/'), exist_ok=True) + + @unittest.skipUnless(hasattr(os, 'chown'), 'test needs os.chown') + def test_chown_uid_gid_arguments_must_be_index(self): + stat = os.stat(support.TESTFN) + uid = stat.st_uid + gid = stat.st_gid + for value in (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2)): + self.assertRaises(TypeError, os.chown, support.TESTFN, value, gid) + self.assertRaises(TypeError, os.chown, support.TESTFN, uid, value) + self.assertIsNone(os.chown(support.TESTFN, uid, gid)) + self.assertIsNone(os.chown(support.TESTFN, -1, -1)) + def test_exist_ok_s_isgid_directory(self): path = os.path.join(support.TESTFN, 'dir1') S_ISGID = stat.S_ISGID @@ -1007,6 +1116,12 @@ class URandomTests(unittest.TestCase): data2 = self.get_urandom_subprocess(16) self.assertNotEqual(data1, data2) + +HAVE_GETENTROPY = (sysconfig.get_config_var('HAVE_GETENTROPY') == 1) + +@unittest.skipIf(HAVE_GETENTROPY, + "getentropy() does not use a file descriptor") +class URandomFDTests(unittest.TestCase): @unittest.skipUnless(resource, "test requires the resource module") def test_urandom_failure(self): # Check urandom() failing when it is not able to open /dev/random. @@ -1030,6 +1145,49 @@ class URandomTests(unittest.TestCase): """ assert_python_ok('-c', code) + def test_urandom_fd_closed(self): + # Issue #21207: urandom() should reopen its fd to /dev/urandom if + # closed. + code = """if 1: + import os + import sys + os.urandom(4) + os.closerange(3, 256) + sys.stdout.buffer.write(os.urandom(4)) + """ + rc, out, err = assert_python_ok('-Sc', code) + + def test_urandom_fd_reopened(self): + # Issue #21207: urandom() should detect its fd to /dev/urandom + # changed to something else, and reopen it. + with open(support.TESTFN, 'wb') as f: + f.write(b"x" * 256) + self.addCleanup(os.unlink, support.TESTFN) + code = """if 1: + import os + import sys + os.urandom(4) + for fd in range(3, 256): + try: + os.close(fd) + except OSError: + pass + else: + # Found the urandom fd (XXX hopefully) + break + os.closerange(3, 256) + with open({TESTFN!r}, 'rb') as f: + os.dup2(f.fileno(), fd) + sys.stdout.buffer.write(os.urandom(4)) + sys.stdout.buffer.write(os.urandom(4)) + """.format(TESTFN=support.TESTFN) + rc, out, err = assert_python_ok('-Sc', code) + self.assertEqual(len(out), 8) + self.assertNotEqual(out[0:4], out[4:8]) + rc, out2, err2 = assert_python_ok('-Sc', code) + self.assertEqual(len(out2), 8) + self.assertNotEqual(out2, out) + @contextlib.contextmanager def _execvpe_mockup(defpath=None): @@ -1132,27 +1290,27 @@ class ExecTests(unittest.TestCase): @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") class Win32ErrorTests(unittest.TestCase): def test_rename(self): - self.assertRaises(WindowsError, os.rename, support.TESTFN, support.TESTFN+".bak") + self.assertRaises(OSError, os.rename, support.TESTFN, support.TESTFN+".bak") def test_remove(self): - self.assertRaises(WindowsError, os.remove, support.TESTFN) + self.assertRaises(OSError, os.remove, support.TESTFN) def test_chdir(self): - self.assertRaises(WindowsError, os.chdir, support.TESTFN) + self.assertRaises(OSError, os.chdir, support.TESTFN) def test_mkdir(self): f = open(support.TESTFN, "w") try: - self.assertRaises(WindowsError, os.mkdir, support.TESTFN) + self.assertRaises(OSError, os.mkdir, support.TESTFN) finally: f.close() os.unlink(support.TESTFN) def test_utime(self): - self.assertRaises(WindowsError, os.utime, support.TESTFN, None) + self.assertRaises(OSError, os.utime, support.TESTFN, None) def test_chmod(self): - self.assertRaises(WindowsError, os.chmod, support.TESTFN, 0) + self.assertRaises(OSError, os.chmod, support.TESTFN, 0) class TestInvalidFD(unittest.TestCase): singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat", @@ -1173,7 +1331,7 @@ class TestInvalidFD(unittest.TestCase): except OSError as e: self.assertEqual(e.errno, errno.EBADF) else: - self.fail("%r didn't raise a OSError with a bad file descriptor" + self.fail("%r didn't raise an OSError with a bad file descriptor" % f) @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()') @@ -1286,41 +1444,57 @@ class PosixUidGidTests(unittest.TestCase): @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()') def test_setuid(self): if os.getuid() != 0: - self.assertRaises(os.error, os.setuid, 0) + self.assertRaises(OSError, os.setuid, 0) self.assertRaises(OverflowError, os.setuid, 1<<32) @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()') def test_setgid(self): if os.getuid() != 0 and not HAVE_WHEEL_GROUP: - self.assertRaises(os.error, os.setgid, 0) + self.assertRaises(OSError, os.setgid, 0) self.assertRaises(OverflowError, os.setgid, 1<<32) @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()') def test_seteuid(self): if os.getuid() != 0: - self.assertRaises(os.error, os.seteuid, 0) + self.assertRaises(OSError, os.seteuid, 0) self.assertRaises(OverflowError, os.seteuid, 1<<32) @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()') def test_setegid(self): if os.getuid() != 0 and not HAVE_WHEEL_GROUP: - self.assertRaises(os.error, os.setegid, 0) + self.assertRaises(OSError, os.setegid, 0) self.assertRaises(OverflowError, os.setegid, 1<<32) @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()') def test_setreuid(self): if os.getuid() != 0: - self.assertRaises(os.error, os.setreuid, 0, 0) + self.assertRaises(OSError, os.setreuid, 0, 0) self.assertRaises(OverflowError, os.setreuid, 1<<32, 0) self.assertRaises(OverflowError, os.setreuid, 0, 1<<32) + @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()') + def test_setreuid_neg1(self): + # Needs to accept -1. We run this in a subprocess to avoid + # altering the test runner's process state (issue8045). + subprocess.check_call([ + sys.executable, '-c', + 'import os,sys;os.setreuid(-1,-1);sys.exit(0)']) + @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()') def test_setregid(self): if os.getuid() != 0 and not HAVE_WHEEL_GROUP: - self.assertRaises(os.error, os.setregid, 0, 0) + self.assertRaises(OSError, os.setregid, 0, 0) self.assertRaises(OverflowError, os.setregid, 1<<32, 0) self.assertRaises(OverflowError, os.setregid, 0, 1<<32) + @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()') + def test_setregid_neg1(self): + # Needs to accept -1. We run this in a subprocess to avoid + # altering the test runner's process state (issue8045). + subprocess.check_call([ + sys.executable, '-c', + 'import os,sys;os.setregid(-1,-1);sys.exit(0)']) + @unittest.skipIf(sys.platform == "win32", "Posix specific tests") class Pep383Tests(unittest.TestCase): def setUp(self): @@ -1486,7 +1660,7 @@ class Win32KillTests(unittest.TestCase): os.kill(proc.pid, signal.SIGINT) self.fail("subprocess did not stop on {}".format(name)) - @unittest.skip("subprocesses aren't inheriting CTRL+C property") + @unittest.skip("subprocesses aren't inheriting Ctrl+C property") def test_CTRL_C_EVENT(self): from ctypes import wintypes import ctypes @@ -1499,7 +1673,7 @@ class Win32KillTests(unittest.TestCase): SetConsoleCtrlHandler.restype = wintypes.BOOL # Calling this with NULL and FALSE causes the calling process to - # handle CTRL+C, rather than ignore it. This property is inherited + # handle Ctrl+C, rather than ignore it. This property is inherited # by subprocesses. SetConsoleCtrlHandler(NULL, 0) @@ -1510,6 +1684,52 @@ class Win32KillTests(unittest.TestCase): @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") +class Win32ListdirTests(unittest.TestCase): + """Test listdir on Windows.""" + + def setUp(self): + self.created_paths = [] + for i in range(2): + dir_name = 'SUB%d' % i + dir_path = os.path.join(support.TESTFN, dir_name) + file_name = 'FILE%d' % i + file_path = os.path.join(support.TESTFN, file_name) + os.makedirs(dir_path) + with open(file_path, 'w') as f: + f.write("I'm %s and proud of it. Blame test_os.\n" % file_path) + self.created_paths.extend([dir_name, file_name]) + self.created_paths.sort() + + def tearDown(self): + shutil.rmtree(support.TESTFN) + + def test_listdir_no_extended_path(self): + """Test when the path is not an "extended" path.""" + # unicode + self.assertEqual( + sorted(os.listdir(support.TESTFN)), + self.created_paths) + # bytes + self.assertEqual( + sorted(os.listdir(os.fsencode(support.TESTFN))), + [os.fsencode(path) for path in self.created_paths]) + + def test_listdir_extended_path(self): + """Test when the path starts with '\\\\?\\'.""" + # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath + # unicode + path = '\\\\?\\' + os.path.abspath(support.TESTFN) + self.assertEqual( + sorted(os.listdir(path)), + self.created_paths) + # bytes + path = b'\\\\?\\' + os.fsencode(os.path.abspath(support.TESTFN)) + self.assertEqual( + sorted(os.listdir(path)), + [os.fsencode(path) for path in self.created_paths]) + + +@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") @support.skip_unless_symlink class Win32SymlinkTests(unittest.TestCase): filelink = 'filelinktest' @@ -1947,6 +2167,14 @@ class TestSendfile(unittest.TestCase): os.sendfile(self.sockno, self.fileno, -1, 4096) self.assertEqual(cm.exception.errno, errno.EINVAL) + def test_keywords(self): + # Keyword arguments should be supported + os.sendfile(out=self.sockno, offset=0, count=4096, + **{'in': self.fileno}) + if self.SUPPORT_HEADERS_TRAILERS: + os.sendfile(self.sockno, self.fileno, offset=0, count=4096, + headers=(), trailers=(), flags=0) + # --- headers / trailers tests @requires_headers_trailers @@ -2174,23 +2402,217 @@ class TermsizeTests(unittest.TestCase): self.assertEqual(expected, actual) +class OSErrorTests(unittest.TestCase): + def setUp(self): + class Str(str): + pass + + self.bytes_filenames = [] + self.unicode_filenames = [] + if support.TESTFN_UNENCODABLE is not None: + decoded = support.TESTFN_UNENCODABLE + else: + decoded = support.TESTFN + self.unicode_filenames.append(decoded) + self.unicode_filenames.append(Str(decoded)) + if support.TESTFN_UNDECODABLE is not None: + encoded = support.TESTFN_UNDECODABLE + else: + encoded = os.fsencode(support.TESTFN) + self.bytes_filenames.append(encoded) + self.bytes_filenames.append(memoryview(encoded)) + + self.filenames = self.bytes_filenames + self.unicode_filenames + + def test_oserror_filename(self): + funcs = [ + (self.filenames, os.chdir,), + (self.filenames, os.chmod, 0o777), + (self.filenames, os.lstat,), + (self.filenames, os.open, os.O_RDONLY), + (self.filenames, os.rmdir,), + (self.filenames, os.stat,), + (self.filenames, os.unlink,), + ] + if sys.platform == "win32": + funcs.extend(( + (self.bytes_filenames, os.rename, b"dst"), + (self.bytes_filenames, os.replace, b"dst"), + (self.unicode_filenames, os.rename, "dst"), + (self.unicode_filenames, os.replace, "dst"), + # Issue #16414: Don't test undecodable names with listdir() + # because of a Windows bug. + # + # With the ANSI code page 932, os.listdir(b'\xe7') return an + # empty list (instead of failing), whereas os.listdir(b'\xff') + # raises a FileNotFoundError. It looks like a Windows bug: + # b'\xe7' directory does not exist, FindFirstFileA(b'\xe7') + # fails with ERROR_FILE_NOT_FOUND (2), instead of + # ERROR_PATH_NOT_FOUND (3). + (self.unicode_filenames, os.listdir,), + )) + else: + funcs.extend(( + (self.filenames, os.listdir,), + (self.filenames, os.rename, "dst"), + (self.filenames, os.replace, "dst"), + )) + if hasattr(os, "chown"): + funcs.append((self.filenames, os.chown, 0, 0)) + if hasattr(os, "lchown"): + funcs.append((self.filenames, os.lchown, 0, 0)) + if hasattr(os, "truncate"): + funcs.append((self.filenames, os.truncate, 0)) + if hasattr(os, "chflags"): + funcs.append((self.filenames, os.chflags, 0)) + if hasattr(os, "lchflags"): + funcs.append((self.filenames, os.lchflags, 0)) + if hasattr(os, "chroot"): + funcs.append((self.filenames, os.chroot,)) + if hasattr(os, "link"): + if sys.platform == "win32": + funcs.append((self.bytes_filenames, os.link, b"dst")) + funcs.append((self.unicode_filenames, os.link, "dst")) + else: + funcs.append((self.filenames, os.link, "dst")) + if hasattr(os, "listxattr"): + funcs.extend(( + (self.filenames, os.listxattr,), + (self.filenames, os.getxattr, "user.test"), + (self.filenames, os.setxattr, "user.test", b'user'), + (self.filenames, os.removexattr, "user.test"), + )) + if hasattr(os, "lchmod"): + funcs.append((self.filenames, os.lchmod, 0o777)) + if hasattr(os, "readlink"): + if sys.platform == "win32": + funcs.append((self.unicode_filenames, os.readlink,)) + else: + funcs.append((self.filenames, os.readlink,)) + + for filenames, func, *func_args in funcs: + for name in filenames: + try: + func(name, *func_args) + except OSError as err: + self.assertIs(err.filename, name) + else: + self.fail("No exception thrown by {}".format(func)) + +class CPUCountTests(unittest.TestCase): + def test_cpu_count(self): + cpus = os.cpu_count() + if cpus is not None: + self.assertIsInstance(cpus, int) + self.assertGreater(cpus, 0) + else: + self.skipTest("Could not determine the number of CPUs") + + +class FDInheritanceTests(unittest.TestCase): + def test_get_set_inheritable(self): + fd = os.open(__file__, os.O_RDONLY) + self.addCleanup(os.close, fd) + self.assertEqual(os.get_inheritable(fd), False) + + os.set_inheritable(fd, True) + self.assertEqual(os.get_inheritable(fd), True) + + @unittest.skipIf(fcntl is None, "need fcntl") + def test_get_inheritable_cloexec(self): + fd = os.open(__file__, os.O_RDONLY) + self.addCleanup(os.close, fd) + self.assertEqual(os.get_inheritable(fd), False) + + # clear FD_CLOEXEC flag + flags = fcntl.fcntl(fd, fcntl.F_GETFD) + flags &= ~fcntl.FD_CLOEXEC + fcntl.fcntl(fd, fcntl.F_SETFD, flags) + + self.assertEqual(os.get_inheritable(fd), True) + + @unittest.skipIf(fcntl is None, "need fcntl") + def test_set_inheritable_cloexec(self): + fd = os.open(__file__, os.O_RDONLY) + self.addCleanup(os.close, fd) + self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC, + fcntl.FD_CLOEXEC) + + os.set_inheritable(fd, True) + self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC, + 0) + + def test_open(self): + fd = os.open(__file__, os.O_RDONLY) + self.addCleanup(os.close, fd) + self.assertEqual(os.get_inheritable(fd), False) + + @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()") + def test_pipe(self): + rfd, wfd = os.pipe() + self.addCleanup(os.close, rfd) + self.addCleanup(os.close, wfd) + self.assertEqual(os.get_inheritable(rfd), False) + self.assertEqual(os.get_inheritable(wfd), False) + + def test_dup(self): + fd1 = os.open(__file__, os.O_RDONLY) + self.addCleanup(os.close, fd1) + + fd2 = os.dup(fd1) + self.addCleanup(os.close, fd2) + self.assertEqual(os.get_inheritable(fd2), False) + + @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()") + def test_dup2(self): + fd = os.open(__file__, os.O_RDONLY) + self.addCleanup(os.close, fd) + + # inheritable by default + fd2 = os.open(__file__, os.O_RDONLY) + try: + os.dup2(fd, fd2) + self.assertEqual(os.get_inheritable(fd2), True) + finally: + os.close(fd2) + + # force non-inheritable + fd3 = os.open(__file__, os.O_RDONLY) + try: + os.dup2(fd, fd3, inheritable=False) + self.assertEqual(os.get_inheritable(fd3), False) + finally: + os.close(fd3) + + @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()") + def test_openpty(self): + master_fd, slave_fd = os.openpty() + self.addCleanup(os.close, master_fd) + self.addCleanup(os.close, slave_fd) + self.assertEqual(os.get_inheritable(master_fd), False) + self.assertEqual(os.get_inheritable(slave_fd), False) + + @support.reap_threads def test_main(): support.run_unittest( FileTests, StatAttributeTests, + UtimeTests, EnvironTests, WalkTests, FwalkTests, MakedirTests, DevNullTests, URandomTests, + URandomFDTests, ExecTests, Win32ErrorTests, TestInvalidFD, PosixUidGidTests, Pep383Tests, Win32KillTests, + Win32ListdirTests, Win32SymlinkTests, NonLocalSymlinkTests, FSEncodingTests, @@ -2203,7 +2625,10 @@ def test_main(): ExtendedAttributeTests, Win32DeprecatedBytesAPI, TermsizeTests, + OSErrorTests, RemoveDirsTests, + CPUCountTests, + FDInheritanceTests, ) if __name__ == "__main__": |
