summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_os.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_os.py')
-rw-r--r--Lib/test/test_os.py931
1 files changed, 678 insertions, 253 deletions
diff --git a/Lib/test/test_os.py b/Lib/test/test_os.py
index 56309bf..e29b0d5 100644
--- a/Lib/test/test_os.py
+++ b/Lib/test/test_os.py
@@ -2,28 +2,32 @@
# does add tests for a few functions which have been determined to be more
# portable than they had been thought to be.
-import os
-import errno
-import unittest
-import warnings
-import sys
-import signal
-import subprocess
-import time
-import shutil
-from test import support
+import asynchat
+import asyncore
+import codecs
import contextlib
+import decimal
+import errno
+import fractions
+import itertools
+import locale
import mmap
+import os
+import pickle
import platform
import re
-import uuid
-import asyncore
-import asynchat
+import shutil
+import signal
import socket
-import itertools
import stat
-import locale
-import codecs
+import subprocess
+import sys
+import sysconfig
+import time
+import unittest
+import uuid
+import warnings
+from test import support
try:
import threading
except ImportError:
@@ -32,19 +36,13 @@ try:
import resource
except ImportError:
resource = None
+try:
+ import fcntl
+except ImportError:
+ fcntl = None
from test.script_helper import assert_python_ok
-with warnings.catch_warnings():
- warnings.simplefilter("ignore", DeprecationWarning)
- os.stat_float_times(True)
-st = os.stat(__file__)
-stat_supports_subsecond = (
- # check if float and int timestamps are different
- (st.st_atime != st[7])
- or (st.st_mtime != st[8])
- or (st.st_ctime != st[9]))
-
# Detect whether we're on a Linux system that uses the (now outdated
# and unmaintained) linuxthreads threading library. There's an issue
# when combining linuxthreads with a failed execv call: see
@@ -60,7 +58,7 @@ HAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0
# Tests creating TESTFN
class FileTests(unittest.TestCase):
def setUp(self):
- if os.path.exists(support.TESTFN):
+ if os.path.lexists(support.TESTFN):
os.unlink(support.TESTFN)
tearDown = setUp
@@ -164,6 +162,19 @@ class FileTests(unittest.TestCase):
with open(TESTFN2, 'r') as f:
self.assertEqual(f.read(), "1")
+ def test_open_keywords(self):
+ f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777,
+ dir_fd=None)
+ os.close(f)
+
+ def test_symlink_keywords(self):
+ symlink = support.get_attribute(os, "symlink")
+ try:
+ symlink(src='target', dst=support.TESTFN,
+ target_is_directory=False, dir_fd=None)
+ except (NotImplementedError, OSError):
+ pass # No OS support or unprivileged user
+
# Test attributes on return values from os.*stat* family.
class StatAttributeTests(unittest.TestCase):
@@ -256,6 +267,16 @@ class StatAttributeTests(unittest.TestCase):
warnings.simplefilter("ignore", DeprecationWarning)
self.check_stat_attributes(fname)
+ def test_stat_result_pickle(self):
+ result = os.stat(self.fname)
+ for proto in range(pickle.HIGHEST_PROTOCOL + 1):
+ p = pickle.dumps(result, proto)
+ self.assertIn(b'stat_result', p)
+ if proto < 4:
+ self.assertIn(b'cos\nstat_result\n', p)
+ unpickled = pickle.loads(p)
+ self.assertEqual(result, unpickled)
+
@unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
def test_statvfs_attributes(self):
try:
@@ -263,7 +284,7 @@ class StatAttributeTests(unittest.TestCase):
except OSError as e:
# On AtheOS, glibc always returns ENOSYS
if e.errno == errno.ENOSYS:
- self.skipTest('glibc always returns ENOSYS on AtheOS')
+ self.skipTest('os.statvfs() failed with ENOSYS')
# Make sure direct access works
self.assertEqual(result.f_bfree, result[3])
@@ -300,202 +321,243 @@ class StatAttributeTests(unittest.TestCase):
except TypeError:
pass
- def test_utime_dir(self):
- delta = 1000000
- st = os.stat(support.TESTFN)
- # round to int, because some systems may support sub-second
- # time stamps in stat, but not in utime.
- os.utime(support.TESTFN, (st.st_atime, int(st.st_mtime-delta)))
- st2 = os.stat(support.TESTFN)
- self.assertEqual(st2.st_mtime, int(st.st_mtime-delta))
-
- def _test_utime(self, filename, attr, utime, delta):
- # Issue #13327 removed the requirement to pass None as the
- # second argument. Check that the previous methods of passing
- # a time tuple or None work in addition to no argument.
- st0 = os.stat(filename)
- # Doesn't set anything new, but sets the time tuple way
- utime(filename, (attr(st0, "st_atime"), attr(st0, "st_mtime")))
- # Setting the time to the time you just read, then reading again,
- # should always return exactly the same times.
- st1 = os.stat(filename)
- self.assertEqual(attr(st0, "st_mtime"), attr(st1, "st_mtime"))
- self.assertEqual(attr(st0, "st_atime"), attr(st1, "st_atime"))
- # Set to the current time in the old explicit way.
- os.utime(filename, None)
- st2 = os.stat(support.TESTFN)
- # Set to the current time in the new way
- os.utime(filename)
- st3 = os.stat(filename)
- self.assertAlmostEqual(attr(st2, "st_mtime"), attr(st3, "st_mtime"), delta=delta)
+ @unittest.skipUnless(hasattr(os, 'statvfs'),
+ "need os.statvfs()")
+ def test_statvfs_result_pickle(self):
+ try:
+ result = os.statvfs(self.fname)
+ except OSError as e:
+ # On AtheOS, glibc always returns ENOSYS
+ if e.errno == errno.ENOSYS:
+ self.skipTest('os.statvfs() failed with ENOSYS')
- def test_utime(self):
- def utime(file, times):
- return os.utime(file, times)
- self._test_utime(self.fname, getattr, utime, 10)
- self._test_utime(support.TESTFN, getattr, utime, 10)
-
-
- def _test_utime_ns(self, set_times_ns, test_dir=True):
- def getattr_ns(o, attr):
- return getattr(o, attr + "_ns")
- ten_s = 10 * 1000 * 1000 * 1000
- self._test_utime(self.fname, getattr_ns, set_times_ns, ten_s)
- if test_dir:
- self._test_utime(support.TESTFN, getattr_ns, set_times_ns, ten_s)
-
- def test_utime_ns(self):
- def utime_ns(file, times):
- return os.utime(file, ns=times)
- self._test_utime_ns(utime_ns)
-
- requires_utime_dir_fd = unittest.skipUnless(
- os.utime in os.supports_dir_fd,
- "dir_fd support for utime required for this test.")
- requires_utime_fd = unittest.skipUnless(
- os.utime in os.supports_fd,
- "fd support for utime required for this test.")
- requires_utime_nofollow_symlinks = unittest.skipUnless(
- os.utime in os.supports_follow_symlinks,
- "follow_symlinks support for utime required for this test.")
-
- @requires_utime_nofollow_symlinks
- def test_lutimes_ns(self):
- def lutimes_ns(file, times):
- return os.utime(file, ns=times, follow_symlinks=False)
- self._test_utime_ns(lutimes_ns)
-
- @requires_utime_fd
- def test_futimes_ns(self):
- def futimes_ns(file, times):
- with open(file, "wb") as f:
- os.utime(f.fileno(), ns=times)
- self._test_utime_ns(futimes_ns, test_dir=False)
-
- def _utime_invalid_arguments(self, name, arg):
- with self.assertRaises(ValueError):
- getattr(os, name)(arg, (5, 5), ns=(5, 5))
+ for proto in range(pickle.HIGHEST_PROTOCOL + 1):
+ p = pickle.dumps(result, proto)
+ self.assertIn(b'statvfs_result', p)
+ if proto < 4:
+ self.assertIn(b'cos\nstatvfs_result\n', p)
+ unpickled = pickle.loads(p)
+ self.assertEqual(result, unpickled)
- def test_utime_invalid_arguments(self):
- self._utime_invalid_arguments('utime', self.fname)
-
-
- @unittest.skipUnless(stat_supports_subsecond,
- "os.stat() doesn't has a subsecond resolution")
- def _test_utime_subsecond(self, set_time_func):
- asec, amsec = 1, 901
- atime = asec + amsec * 1e-3
- msec, mmsec = 2, 901
- mtime = msec + mmsec * 1e-3
- filename = self.fname
- os.utime(filename, (0, 0))
- set_time_func(filename, atime, mtime)
+ @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
+ def test_1686475(self):
+ # Verify that an open file can be stat'ed
+ try:
+ os.stat(r"c:\pagefile.sys")
+ except FileNotFoundError:
+ self.skipTest(r'c:\pagefile.sys does not exist')
+ except OSError as e:
+ self.fail("Could not stat pagefile.sys")
+
+ @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
+ @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
+ def test_15261(self):
+ # Verify that stat'ing a closed fd does not cause crash
+ r, w = os.pipe()
+ try:
+ os.stat(r) # should not raise error
+ finally:
+ os.close(r)
+ os.close(w)
+ with self.assertRaises(OSError) as ctx:
+ os.stat(r)
+ self.assertEqual(ctx.exception.errno, errno.EBADF)
+
+
+class UtimeTests(unittest.TestCase):
+ def setUp(self):
+ self.dirname = support.TESTFN
+ self.fname = os.path.join(self.dirname, "f1")
+
+ self.addCleanup(support.rmtree, self.dirname)
+ os.mkdir(self.dirname)
+ with open(self.fname, 'wb') as fp:
+ fp.write(b"ABC")
+
+ def restore_float_times(state):
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore", DeprecationWarning)
+
+ os.stat_float_times(state)
+
+ # ensure that st_atime and st_mtime are float
with warnings.catch_warnings():
warnings.simplefilter("ignore", DeprecationWarning)
+
+ old_float_times = os.stat_float_times(-1)
+ self.addCleanup(restore_float_times, old_float_times)
+
os.stat_float_times(True)
+
+ def support_subsecond(self, filename):
+ # Heuristic to check if the filesystem supports timestamp with
+ # subsecond resolution: check if float and int timestamps are different
+ st = os.stat(filename)
+ return ((st.st_atime != st[7])
+ or (st.st_mtime != st[8])
+ or (st.st_ctime != st[9]))
+
+ def _test_utime(self, set_time, filename=None):
+ if not filename:
+ filename = self.fname
+
+ support_subsecond = self.support_subsecond(filename)
+ if support_subsecond:
+ # Timestamp with a resolution of 1 microsecond (10^-6).
+ #
+ # The resolution of the C internal function used by os.utime()
+ # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable
+ # test with a resolution of 1 ns requires more work:
+ # see the issue #15745.
+ atime_ns = 1002003000 # 1.002003 seconds
+ mtime_ns = 4005006000 # 4.005006 seconds
+ else:
+ # use a resolution of 1 second
+ atime_ns = 5 * 10**9
+ mtime_ns = 8 * 10**9
+
+ set_time(filename, (atime_ns, mtime_ns))
st = os.stat(filename)
- self.assertAlmostEqual(st.st_atime, atime, places=3)
- self.assertAlmostEqual(st.st_mtime, mtime, places=3)
- def test_utime_subsecond(self):
- def set_time(filename, atime, mtime):
+ if support_subsecond:
+ self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6)
+ self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6)
+ else:
+ self.assertEqual(st.st_atime, atime_ns * 1e-9)
+ self.assertEqual(st.st_mtime, mtime_ns * 1e-9)
+ self.assertEqual(st.st_atime_ns, atime_ns)
+ self.assertEqual(st.st_mtime_ns, mtime_ns)
+
+ def test_utime(self):
+ def set_time(filename, ns):
+ # test the ns keyword parameter
+ os.utime(filename, ns=ns)
+ self._test_utime(set_time)
+
+ @staticmethod
+ def ns_to_sec(ns):
+ # Convert a number of nanosecond (int) to a number of seconds (float).
+ # Round towards infinity by adding 0.5 nanosecond to avoid rounding
+ # issue, os.utime() rounds towards minus infinity.
+ return (ns * 1e-9) + 0.5e-9
+
+ def test_utime_by_indexed(self):
+ # pass times as floating point seconds as the second indexed parameter
+ def set_time(filename, ns):
+ atime_ns, mtime_ns = ns
+ atime = self.ns_to_sec(atime_ns)
+ mtime = self.ns_to_sec(mtime_ns)
+ # test utimensat(timespec), utimes(timeval), utime(utimbuf)
+ # or utime(time_t)
os.utime(filename, (atime, mtime))
- self._test_utime_subsecond(set_time)
-
- @requires_utime_fd
- def test_futimes_subsecond(self):
- def set_time(filename, atime, mtime):
- with open(filename, "wb") as f:
- os.utime(f.fileno(), times=(atime, mtime))
- self._test_utime_subsecond(set_time)
-
- @requires_utime_fd
- def test_futimens_subsecond(self):
- def set_time(filename, atime, mtime):
- with open(filename, "wb") as f:
- os.utime(f.fileno(), times=(atime, mtime))
- self._test_utime_subsecond(set_time)
-
- @requires_utime_dir_fd
- def test_futimesat_subsecond(self):
- def set_time(filename, atime, mtime):
- dirname = os.path.dirname(filename)
- dirfd = os.open(dirname, os.O_RDONLY)
- try:
- os.utime(os.path.basename(filename), dir_fd=dirfd,
- times=(atime, mtime))
- finally:
- os.close(dirfd)
- self._test_utime_subsecond(set_time)
-
- @requires_utime_nofollow_symlinks
- def test_lutimes_subsecond(self):
- def set_time(filename, atime, mtime):
- os.utime(filename, (atime, mtime), follow_symlinks=False)
- self._test_utime_subsecond(set_time)
-
- @requires_utime_dir_fd
- def test_utimensat_subsecond(self):
- def set_time(filename, atime, mtime):
- dirname = os.path.dirname(filename)
+ self._test_utime(set_time)
+
+ def test_utime_by_times(self):
+ def set_time(filename, ns):
+ atime_ns, mtime_ns = ns
+ atime = self.ns_to_sec(atime_ns)
+ mtime = self.ns_to_sec(mtime_ns)
+ # test the times keyword parameter
+ os.utime(filename, times=(atime, mtime))
+ self._test_utime(set_time)
+
+ @unittest.skipUnless(os.utime in os.supports_follow_symlinks,
+ "follow_symlinks support for utime required "
+ "for this test.")
+ def test_utime_nofollow_symlinks(self):
+ def set_time(filename, ns):
+ # use follow_symlinks=False to test utimensat(timespec)
+ # or lutimes(timeval)
+ os.utime(filename, ns=ns, follow_symlinks=False)
+ self._test_utime(set_time)
+
+ @unittest.skipUnless(os.utime in os.supports_fd,
+ "fd support for utime required for this test.")
+ def test_utime_fd(self):
+ def set_time(filename, ns):
+ with open(filename, 'wb') as fp:
+ # use a file descriptor to test futimens(timespec)
+ # or futimes(timeval)
+ os.utime(fp.fileno(), ns=ns)
+ self._test_utime(set_time)
+
+ @unittest.skipUnless(os.utime in os.supports_dir_fd,
+ "dir_fd support for utime required for this test.")
+ def test_utime_dir_fd(self):
+ def set_time(filename, ns):
+ dirname, name = os.path.split(filename)
dirfd = os.open(dirname, os.O_RDONLY)
try:
- os.utime(os.path.basename(filename), dir_fd=dirfd,
- times=(atime, mtime))
+ # pass dir_fd to test utimensat(timespec) or futimesat(timeval)
+ os.utime(name, dir_fd=dirfd, ns=ns)
finally:
os.close(dirfd)
- self._test_utime_subsecond(set_time)
+ self._test_utime(set_time)
- # Restrict tests to Win32, since there is no guarantee other
- # systems support centiseconds
- def get_file_system(path):
+ def test_utime_directory(self):
+ def set_time(filename, ns):
+ # test calling os.utime() on a directory
+ os.utime(filename, ns=ns)
+ self._test_utime(set_time, filename=self.dirname)
+
+ def _test_utime_current(self, set_time):
+ # Get the system clock
+ current = time.time()
+
+ # Call os.utime() to set the timestamp to the current system clock
+ set_time(self.fname)
+
+ if not self.support_subsecond(self.fname):
+ delta = 1.0
+ else:
+ # On Windows, the usual resolution of time.time() is 15.6 ms
+ delta = 0.020
+ st = os.stat(self.fname)
+ msg = ("st_time=%r, current=%r, dt=%r"
+ % (st.st_mtime, current, st.st_mtime - current))
+ self.assertAlmostEqual(st.st_mtime, current,
+ delta=delta, msg=msg)
+
+ def test_utime_current(self):
+ def set_time(filename):
+ # Set to the current time in the new way
+ os.utime(self.fname)
+ self._test_utime_current(set_time)
+
+ def test_utime_current_old(self):
+ def set_time(filename):
+ # Set to the current time in the old explicit way.
+ os.utime(self.fname, None)
+ self._test_utime_current(set_time)
+
+ def get_file_system(self, path):
if sys.platform == 'win32':
root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
import ctypes
kernel32 = ctypes.windll.kernel32
buf = ctypes.create_unicode_buffer("", 100)
- if kernel32.GetVolumeInformationW(root, None, 0, None, None, None, buf, len(buf)):
+ ok = kernel32.GetVolumeInformationW(root, None, 0,
+ None, None, None,
+ buf, len(buf))
+ if ok:
return buf.value
+ # return None if the filesystem is unknown
- @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
- @unittest.skipUnless(get_file_system(support.TESTFN) == "NTFS",
- "requires NTFS")
- def test_1565150(self):
- t1 = 1159195039.25
- os.utime(self.fname, (t1, t1))
- self.assertEqual(os.stat(self.fname).st_mtime, t1)
-
- @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
- @unittest.skipUnless(get_file_system(support.TESTFN) == "NTFS",
- "requires NTFS")
def test_large_time(self):
- t1 = 5000000000 # some day in 2128
- os.utime(self.fname, (t1, t1))
- self.assertEqual(os.stat(self.fname).st_mtime, t1)
+ # Many filesystems are limited to the year 2038. At least, the test
+ # pass with NTFS filesystem.
+ if self.get_file_system(self.dirname) != "NTFS":
+ self.skipTest("requires NTFS")
- @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
- def test_1686475(self):
- # Verify that an open file can be stat'ed
- try:
- os.stat(r"c:\pagefile.sys")
- except WindowsError as e:
- if e.errno == 2: # file does not exist; cannot run test
- self.skipTest(r'c:\pagefile.sys does not exist')
- self.fail("Could not stat pagefile.sys")
+ large = 5000000000 # some day in 2128
+ os.utime(self.fname, (large, large))
+ self.assertEqual(os.stat(self.fname).st_mtime, large)
+
+ def test_utime_invalid_arguments(self):
+ # seconds and nanoseconds parameters are mutually exclusive
+ with self.assertRaises(ValueError):
+ os.utime(self.fname, (5, 5), ns=(5, 5))
- @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
- @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
- def test_15261(self):
- # Verify that stat'ing a closed fd does not cause crash
- r, w = os.pipe()
- try:
- os.stat(r) # should not raise error
- finally:
- os.close(r)
- os.close(w)
- with self.assertRaises(OSError) as ctx:
- os.stat(r)
- self.assertEqual(ctx.exception.errno, errno.EBADF)
from test import mapping_tests
@@ -658,9 +720,17 @@ class EnvironTests(mapping_tests.BasicTestMappingProtocol):
class WalkTests(unittest.TestCase):
"""Tests for os.walk()."""
+ # Wrapper to hide minor differences between os.walk and os.fwalk
+ # to tests both functions with the same code base
+ def walk(self, directory, topdown=True, follow_symlinks=False):
+ walk_it = os.walk(directory,
+ topdown=topdown,
+ followlinks=follow_symlinks)
+ for root, dirs, files in walk_it:
+ yield (root, dirs, files)
+
def setUp(self):
- import os
- from os.path import join
+ join = os.path.join
# Build:
# TESTFN/
@@ -675,36 +745,39 @@ class WalkTests(unittest.TestCase):
# broken_link
# TEST2/
# tmp4 a lone file
- walk_path = join(support.TESTFN, "TEST1")
- sub1_path = join(walk_path, "SUB1")
- sub11_path = join(sub1_path, "SUB11")
- sub2_path = join(walk_path, "SUB2")
- tmp1_path = join(walk_path, "tmp1")
- tmp2_path = join(sub1_path, "tmp2")
+ self.walk_path = join(support.TESTFN, "TEST1")
+ self.sub1_path = join(self.walk_path, "SUB1")
+ self.sub11_path = join(self.sub1_path, "SUB11")
+ sub2_path = join(self.walk_path, "SUB2")
+ tmp1_path = join(self.walk_path, "tmp1")
+ tmp2_path = join(self.sub1_path, "tmp2")
tmp3_path = join(sub2_path, "tmp3")
- link_path = join(sub2_path, "link")
+ self.link_path = join(sub2_path, "link")
t2_path = join(support.TESTFN, "TEST2")
tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
- link_path = join(sub2_path, "link")
broken_link_path = join(sub2_path, "broken_link")
# Create stuff.
- os.makedirs(sub11_path)
+ os.makedirs(self.sub11_path)
os.makedirs(sub2_path)
os.makedirs(t2_path)
+
for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path:
f = open(path, "w")
f.write("I'm " + path + " and proud of it. Blame test_os.\n")
f.close()
+
if support.can_symlink():
- os.symlink(os.path.abspath(t2_path), link_path)
+ os.symlink(os.path.abspath(t2_path), self.link_path)
os.symlink('broken', broken_link_path, True)
- sub2_tree = (sub2_path, ["link"], ["broken_link", "tmp3"])
+ self.sub2_tree = (sub2_path, ["link"], ["broken_link", "tmp3"])
else:
- sub2_tree = (sub2_path, [], ["tmp3"])
+ self.sub2_tree = (sub2_path, [], ["tmp3"])
+ def test_walk_topdown(self):
# Walk top-down.
- all = list(os.walk(walk_path))
+ all = list(os.walk(self.walk_path))
+
self.assertEqual(len(all), 4)
# We can't know which order SUB1 and SUB2 will appear in.
# Not flipped: TESTFN, SUB1, SUB11, SUB2
@@ -712,26 +785,32 @@ class WalkTests(unittest.TestCase):
flipped = all[0][1][0] != "SUB1"
all[0][1].sort()
all[3 - 2 * flipped][-1].sort()
- self.assertEqual(all[0], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
- self.assertEqual(all[1 + flipped], (sub1_path, ["SUB11"], ["tmp2"]))
- self.assertEqual(all[2 + flipped], (sub11_path, [], []))
- self.assertEqual(all[3 - 2 * flipped], sub2_tree)
+ self.assertEqual(all[0], (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
+ self.assertEqual(all[1 + flipped], (self.sub1_path, ["SUB11"], ["tmp2"]))
+ self.assertEqual(all[2 + flipped], (self.sub11_path, [], []))
+ self.assertEqual(all[3 - 2 * flipped], self.sub2_tree)
+ def test_walk_prune(self):
# Prune the search.
all = []
- for root, dirs, files in os.walk(walk_path):
+ for root, dirs, files in self.walk(self.walk_path):
all.append((root, dirs, files))
# Don't descend into SUB1.
if 'SUB1' in dirs:
# Note that this also mutates the dirs we appended to all!
dirs.remove('SUB1')
+
self.assertEqual(len(all), 2)
- self.assertEqual(all[0], (walk_path, ["SUB2"], ["tmp1"]))
+ self.assertEqual(all[0],
+ (self.walk_path, ["SUB2"], ["tmp1"]))
+
all[1][-1].sort()
- self.assertEqual(all[1], sub2_tree)
+ self.assertEqual(all[1], self.sub2_tree)
+ def test_walk_bottom_up(self):
# Walk bottom-up.
- all = list(os.walk(walk_path, topdown=False))
+ all = list(self.walk(self.walk_path, topdown=False))
+
self.assertEqual(len(all), 4)
# We can't know which order SUB1 and SUB2 will appear in.
# Not flipped: SUB11, SUB1, SUB2, TESTFN
@@ -739,20 +818,28 @@ class WalkTests(unittest.TestCase):
flipped = all[3][1][0] != "SUB1"
all[3][1].sort()
all[2 - 2 * flipped][-1].sort()
- self.assertEqual(all[3], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
- self.assertEqual(all[flipped], (sub11_path, [], []))
- self.assertEqual(all[flipped + 1], (sub1_path, ["SUB11"], ["tmp2"]))
- self.assertEqual(all[2 - 2 * flipped], sub2_tree)
-
- if support.can_symlink():
- # Walk, following symlinks.
- for root, dirs, files in os.walk(walk_path, followlinks=True):
- if root == link_path:
- self.assertEqual(dirs, [])
- self.assertEqual(files, ["tmp4"])
- break
- else:
- self.fail("Didn't follow symlink with followlinks=True")
+ self.assertEqual(all[3],
+ (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
+ self.assertEqual(all[flipped],
+ (self.sub11_path, [], []))
+ self.assertEqual(all[flipped + 1],
+ (self.sub1_path, ["SUB11"], ["tmp2"]))
+ self.assertEqual(all[2 - 2 * flipped],
+ self.sub2_tree)
+
+ def test_walk_symlink(self):
+ if not support.can_symlink():
+ self.skipTest("need symlink support")
+
+ # Walk, following symlinks.
+ walk_it = self.walk(self.walk_path, follow_symlinks=True)
+ for root, dirs, files in walk_it:
+ if root == self.link_path:
+ self.assertEqual(dirs, [])
+ self.assertEqual(files, ["tmp4"])
+ break
+ else:
+ self.fail("Didn't follow symlink with followlinks=True")
def tearDown(self):
# Tear everything down. This is a decent use for bottom-up on
@@ -775,6 +862,14 @@ class WalkTests(unittest.TestCase):
class FwalkTests(WalkTests):
"""Tests for os.fwalk()."""
+ def walk(self, directory, topdown=True, follow_symlinks=False):
+ walk_it = os.fwalk(directory,
+ topdown=topdown,
+ follow_symlinks=follow_symlinks)
+ for root, dirs, files, root_fd in walk_it:
+ yield (root, dirs, files)
+
+
def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
"""
compare with walk() results.
@@ -876,6 +971,20 @@ class MakedirTests(unittest.TestCase):
os.makedirs(path, mode=mode, exist_ok=True)
os.umask(old_mask)
+ # Issue #25583: A drive root could raise PermissionError on Windows
+ os.makedirs(os.path.abspath('/'), exist_ok=True)
+
+ @unittest.skipUnless(hasattr(os, 'chown'), 'test needs os.chown')
+ def test_chown_uid_gid_arguments_must_be_index(self):
+ stat = os.stat(support.TESTFN)
+ uid = stat.st_uid
+ gid = stat.st_gid
+ for value in (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2)):
+ self.assertRaises(TypeError, os.chown, support.TESTFN, value, gid)
+ self.assertRaises(TypeError, os.chown, support.TESTFN, uid, value)
+ self.assertIsNone(os.chown(support.TESTFN, uid, gid))
+ self.assertIsNone(os.chown(support.TESTFN, -1, -1))
+
def test_exist_ok_s_isgid_directory(self):
path = os.path.join(support.TESTFN, 'dir1')
S_ISGID = stat.S_ISGID
@@ -1007,6 +1116,12 @@ class URandomTests(unittest.TestCase):
data2 = self.get_urandom_subprocess(16)
self.assertNotEqual(data1, data2)
+
+HAVE_GETENTROPY = (sysconfig.get_config_var('HAVE_GETENTROPY') == 1)
+
+@unittest.skipIf(HAVE_GETENTROPY,
+ "getentropy() does not use a file descriptor")
+class URandomFDTests(unittest.TestCase):
@unittest.skipUnless(resource, "test requires the resource module")
def test_urandom_failure(self):
# Check urandom() failing when it is not able to open /dev/random.
@@ -1030,6 +1145,49 @@ class URandomTests(unittest.TestCase):
"""
assert_python_ok('-c', code)
+ def test_urandom_fd_closed(self):
+ # Issue #21207: urandom() should reopen its fd to /dev/urandom if
+ # closed.
+ code = """if 1:
+ import os
+ import sys
+ os.urandom(4)
+ os.closerange(3, 256)
+ sys.stdout.buffer.write(os.urandom(4))
+ """
+ rc, out, err = assert_python_ok('-Sc', code)
+
+ def test_urandom_fd_reopened(self):
+ # Issue #21207: urandom() should detect its fd to /dev/urandom
+ # changed to something else, and reopen it.
+ with open(support.TESTFN, 'wb') as f:
+ f.write(b"x" * 256)
+ self.addCleanup(os.unlink, support.TESTFN)
+ code = """if 1:
+ import os
+ import sys
+ os.urandom(4)
+ for fd in range(3, 256):
+ try:
+ os.close(fd)
+ except OSError:
+ pass
+ else:
+ # Found the urandom fd (XXX hopefully)
+ break
+ os.closerange(3, 256)
+ with open({TESTFN!r}, 'rb') as f:
+ os.dup2(f.fileno(), fd)
+ sys.stdout.buffer.write(os.urandom(4))
+ sys.stdout.buffer.write(os.urandom(4))
+ """.format(TESTFN=support.TESTFN)
+ rc, out, err = assert_python_ok('-Sc', code)
+ self.assertEqual(len(out), 8)
+ self.assertNotEqual(out[0:4], out[4:8])
+ rc, out2, err2 = assert_python_ok('-Sc', code)
+ self.assertEqual(len(out2), 8)
+ self.assertNotEqual(out2, out)
+
@contextlib.contextmanager
def _execvpe_mockup(defpath=None):
@@ -1132,27 +1290,27 @@ class ExecTests(unittest.TestCase):
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ErrorTests(unittest.TestCase):
def test_rename(self):
- self.assertRaises(WindowsError, os.rename, support.TESTFN, support.TESTFN+".bak")
+ self.assertRaises(OSError, os.rename, support.TESTFN, support.TESTFN+".bak")
def test_remove(self):
- self.assertRaises(WindowsError, os.remove, support.TESTFN)
+ self.assertRaises(OSError, os.remove, support.TESTFN)
def test_chdir(self):
- self.assertRaises(WindowsError, os.chdir, support.TESTFN)
+ self.assertRaises(OSError, os.chdir, support.TESTFN)
def test_mkdir(self):
f = open(support.TESTFN, "w")
try:
- self.assertRaises(WindowsError, os.mkdir, support.TESTFN)
+ self.assertRaises(OSError, os.mkdir, support.TESTFN)
finally:
f.close()
os.unlink(support.TESTFN)
def test_utime(self):
- self.assertRaises(WindowsError, os.utime, support.TESTFN, None)
+ self.assertRaises(OSError, os.utime, support.TESTFN, None)
def test_chmod(self):
- self.assertRaises(WindowsError, os.chmod, support.TESTFN, 0)
+ self.assertRaises(OSError, os.chmod, support.TESTFN, 0)
class TestInvalidFD(unittest.TestCase):
singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
@@ -1173,7 +1331,7 @@ class TestInvalidFD(unittest.TestCase):
except OSError as e:
self.assertEqual(e.errno, errno.EBADF)
else:
- self.fail("%r didn't raise a OSError with a bad file descriptor"
+ self.fail("%r didn't raise an OSError with a bad file descriptor"
% f)
@unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
@@ -1286,41 +1444,57 @@ class PosixUidGidTests(unittest.TestCase):
@unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
def test_setuid(self):
if os.getuid() != 0:
- self.assertRaises(os.error, os.setuid, 0)
+ self.assertRaises(OSError, os.setuid, 0)
self.assertRaises(OverflowError, os.setuid, 1<<32)
@unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
def test_setgid(self):
if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
- self.assertRaises(os.error, os.setgid, 0)
+ self.assertRaises(OSError, os.setgid, 0)
self.assertRaises(OverflowError, os.setgid, 1<<32)
@unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
def test_seteuid(self):
if os.getuid() != 0:
- self.assertRaises(os.error, os.seteuid, 0)
+ self.assertRaises(OSError, os.seteuid, 0)
self.assertRaises(OverflowError, os.seteuid, 1<<32)
@unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
def test_setegid(self):
if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
- self.assertRaises(os.error, os.setegid, 0)
+ self.assertRaises(OSError, os.setegid, 0)
self.assertRaises(OverflowError, os.setegid, 1<<32)
@unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
def test_setreuid(self):
if os.getuid() != 0:
- self.assertRaises(os.error, os.setreuid, 0, 0)
+ self.assertRaises(OSError, os.setreuid, 0, 0)
self.assertRaises(OverflowError, os.setreuid, 1<<32, 0)
self.assertRaises(OverflowError, os.setreuid, 0, 1<<32)
+ @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
+ def test_setreuid_neg1(self):
+ # Needs to accept -1. We run this in a subprocess to avoid
+ # altering the test runner's process state (issue8045).
+ subprocess.check_call([
+ sys.executable, '-c',
+ 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
+
@unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
def test_setregid(self):
if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
- self.assertRaises(os.error, os.setregid, 0, 0)
+ self.assertRaises(OSError, os.setregid, 0, 0)
self.assertRaises(OverflowError, os.setregid, 1<<32, 0)
self.assertRaises(OverflowError, os.setregid, 0, 1<<32)
+ @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
+ def test_setregid_neg1(self):
+ # Needs to accept -1. We run this in a subprocess to avoid
+ # altering the test runner's process state (issue8045).
+ subprocess.check_call([
+ sys.executable, '-c',
+ 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
+
@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
class Pep383Tests(unittest.TestCase):
def setUp(self):
@@ -1486,7 +1660,7 @@ class Win32KillTests(unittest.TestCase):
os.kill(proc.pid, signal.SIGINT)
self.fail("subprocess did not stop on {}".format(name))
- @unittest.skip("subprocesses aren't inheriting CTRL+C property")
+ @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
def test_CTRL_C_EVENT(self):
from ctypes import wintypes
import ctypes
@@ -1499,7 +1673,7 @@ class Win32KillTests(unittest.TestCase):
SetConsoleCtrlHandler.restype = wintypes.BOOL
# Calling this with NULL and FALSE causes the calling process to
- # handle CTRL+C, rather than ignore it. This property is inherited
+ # handle Ctrl+C, rather than ignore it. This property is inherited
# by subprocesses.
SetConsoleCtrlHandler(NULL, 0)
@@ -1510,6 +1684,52 @@ class Win32KillTests(unittest.TestCase):
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
+class Win32ListdirTests(unittest.TestCase):
+ """Test listdir on Windows."""
+
+ def setUp(self):
+ self.created_paths = []
+ for i in range(2):
+ dir_name = 'SUB%d' % i
+ dir_path = os.path.join(support.TESTFN, dir_name)
+ file_name = 'FILE%d' % i
+ file_path = os.path.join(support.TESTFN, file_name)
+ os.makedirs(dir_path)
+ with open(file_path, 'w') as f:
+ f.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
+ self.created_paths.extend([dir_name, file_name])
+ self.created_paths.sort()
+
+ def tearDown(self):
+ shutil.rmtree(support.TESTFN)
+
+ def test_listdir_no_extended_path(self):
+ """Test when the path is not an "extended" path."""
+ # unicode
+ self.assertEqual(
+ sorted(os.listdir(support.TESTFN)),
+ self.created_paths)
+ # bytes
+ self.assertEqual(
+ sorted(os.listdir(os.fsencode(support.TESTFN))),
+ [os.fsencode(path) for path in self.created_paths])
+
+ def test_listdir_extended_path(self):
+ """Test when the path starts with '\\\\?\\'."""
+ # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
+ # unicode
+ path = '\\\\?\\' + os.path.abspath(support.TESTFN)
+ self.assertEqual(
+ sorted(os.listdir(path)),
+ self.created_paths)
+ # bytes
+ path = b'\\\\?\\' + os.fsencode(os.path.abspath(support.TESTFN))
+ self.assertEqual(
+ sorted(os.listdir(path)),
+ [os.fsencode(path) for path in self.created_paths])
+
+
+@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
@support.skip_unless_symlink
class Win32SymlinkTests(unittest.TestCase):
filelink = 'filelinktest'
@@ -1947,6 +2167,14 @@ class TestSendfile(unittest.TestCase):
os.sendfile(self.sockno, self.fileno, -1, 4096)
self.assertEqual(cm.exception.errno, errno.EINVAL)
+ def test_keywords(self):
+ # Keyword arguments should be supported
+ os.sendfile(out=self.sockno, offset=0, count=4096,
+ **{'in': self.fileno})
+ if self.SUPPORT_HEADERS_TRAILERS:
+ os.sendfile(self.sockno, self.fileno, offset=0, count=4096,
+ headers=(), trailers=(), flags=0)
+
# --- headers / trailers tests
@requires_headers_trailers
@@ -2174,23 +2402,217 @@ class TermsizeTests(unittest.TestCase):
self.assertEqual(expected, actual)
+class OSErrorTests(unittest.TestCase):
+ def setUp(self):
+ class Str(str):
+ pass
+
+ self.bytes_filenames = []
+ self.unicode_filenames = []
+ if support.TESTFN_UNENCODABLE is not None:
+ decoded = support.TESTFN_UNENCODABLE
+ else:
+ decoded = support.TESTFN
+ self.unicode_filenames.append(decoded)
+ self.unicode_filenames.append(Str(decoded))
+ if support.TESTFN_UNDECODABLE is not None:
+ encoded = support.TESTFN_UNDECODABLE
+ else:
+ encoded = os.fsencode(support.TESTFN)
+ self.bytes_filenames.append(encoded)
+ self.bytes_filenames.append(memoryview(encoded))
+
+ self.filenames = self.bytes_filenames + self.unicode_filenames
+
+ def test_oserror_filename(self):
+ funcs = [
+ (self.filenames, os.chdir,),
+ (self.filenames, os.chmod, 0o777),
+ (self.filenames, os.lstat,),
+ (self.filenames, os.open, os.O_RDONLY),
+ (self.filenames, os.rmdir,),
+ (self.filenames, os.stat,),
+ (self.filenames, os.unlink,),
+ ]
+ if sys.platform == "win32":
+ funcs.extend((
+ (self.bytes_filenames, os.rename, b"dst"),
+ (self.bytes_filenames, os.replace, b"dst"),
+ (self.unicode_filenames, os.rename, "dst"),
+ (self.unicode_filenames, os.replace, "dst"),
+ # Issue #16414: Don't test undecodable names with listdir()
+ # because of a Windows bug.
+ #
+ # With the ANSI code page 932, os.listdir(b'\xe7') return an
+ # empty list (instead of failing), whereas os.listdir(b'\xff')
+ # raises a FileNotFoundError. It looks like a Windows bug:
+ # b'\xe7' directory does not exist, FindFirstFileA(b'\xe7')
+ # fails with ERROR_FILE_NOT_FOUND (2), instead of
+ # ERROR_PATH_NOT_FOUND (3).
+ (self.unicode_filenames, os.listdir,),
+ ))
+ else:
+ funcs.extend((
+ (self.filenames, os.listdir,),
+ (self.filenames, os.rename, "dst"),
+ (self.filenames, os.replace, "dst"),
+ ))
+ if hasattr(os, "chown"):
+ funcs.append((self.filenames, os.chown, 0, 0))
+ if hasattr(os, "lchown"):
+ funcs.append((self.filenames, os.lchown, 0, 0))
+ if hasattr(os, "truncate"):
+ funcs.append((self.filenames, os.truncate, 0))
+ if hasattr(os, "chflags"):
+ funcs.append((self.filenames, os.chflags, 0))
+ if hasattr(os, "lchflags"):
+ funcs.append((self.filenames, os.lchflags, 0))
+ if hasattr(os, "chroot"):
+ funcs.append((self.filenames, os.chroot,))
+ if hasattr(os, "link"):
+ if sys.platform == "win32":
+ funcs.append((self.bytes_filenames, os.link, b"dst"))
+ funcs.append((self.unicode_filenames, os.link, "dst"))
+ else:
+ funcs.append((self.filenames, os.link, "dst"))
+ if hasattr(os, "listxattr"):
+ funcs.extend((
+ (self.filenames, os.listxattr,),
+ (self.filenames, os.getxattr, "user.test"),
+ (self.filenames, os.setxattr, "user.test", b'user'),
+ (self.filenames, os.removexattr, "user.test"),
+ ))
+ if hasattr(os, "lchmod"):
+ funcs.append((self.filenames, os.lchmod, 0o777))
+ if hasattr(os, "readlink"):
+ if sys.platform == "win32":
+ funcs.append((self.unicode_filenames, os.readlink,))
+ else:
+ funcs.append((self.filenames, os.readlink,))
+
+ for filenames, func, *func_args in funcs:
+ for name in filenames:
+ try:
+ func(name, *func_args)
+ except OSError as err:
+ self.assertIs(err.filename, name)
+ else:
+ self.fail("No exception thrown by {}".format(func))
+
+class CPUCountTests(unittest.TestCase):
+ def test_cpu_count(self):
+ cpus = os.cpu_count()
+ if cpus is not None:
+ self.assertIsInstance(cpus, int)
+ self.assertGreater(cpus, 0)
+ else:
+ self.skipTest("Could not determine the number of CPUs")
+
+
+class FDInheritanceTests(unittest.TestCase):
+ def test_get_set_inheritable(self):
+ fd = os.open(__file__, os.O_RDONLY)
+ self.addCleanup(os.close, fd)
+ self.assertEqual(os.get_inheritable(fd), False)
+
+ os.set_inheritable(fd, True)
+ self.assertEqual(os.get_inheritable(fd), True)
+
+ @unittest.skipIf(fcntl is None, "need fcntl")
+ def test_get_inheritable_cloexec(self):
+ fd = os.open(__file__, os.O_RDONLY)
+ self.addCleanup(os.close, fd)
+ self.assertEqual(os.get_inheritable(fd), False)
+
+ # clear FD_CLOEXEC flag
+ flags = fcntl.fcntl(fd, fcntl.F_GETFD)
+ flags &= ~fcntl.FD_CLOEXEC
+ fcntl.fcntl(fd, fcntl.F_SETFD, flags)
+
+ self.assertEqual(os.get_inheritable(fd), True)
+
+ @unittest.skipIf(fcntl is None, "need fcntl")
+ def test_set_inheritable_cloexec(self):
+ fd = os.open(__file__, os.O_RDONLY)
+ self.addCleanup(os.close, fd)
+ self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
+ fcntl.FD_CLOEXEC)
+
+ os.set_inheritable(fd, True)
+ self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
+ 0)
+
+ def test_open(self):
+ fd = os.open(__file__, os.O_RDONLY)
+ self.addCleanup(os.close, fd)
+ self.assertEqual(os.get_inheritable(fd), False)
+
+ @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
+ def test_pipe(self):
+ rfd, wfd = os.pipe()
+ self.addCleanup(os.close, rfd)
+ self.addCleanup(os.close, wfd)
+ self.assertEqual(os.get_inheritable(rfd), False)
+ self.assertEqual(os.get_inheritable(wfd), False)
+
+ def test_dup(self):
+ fd1 = os.open(__file__, os.O_RDONLY)
+ self.addCleanup(os.close, fd1)
+
+ fd2 = os.dup(fd1)
+ self.addCleanup(os.close, fd2)
+ self.assertEqual(os.get_inheritable(fd2), False)
+
+ @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
+ def test_dup2(self):
+ fd = os.open(__file__, os.O_RDONLY)
+ self.addCleanup(os.close, fd)
+
+ # inheritable by default
+ fd2 = os.open(__file__, os.O_RDONLY)
+ try:
+ os.dup2(fd, fd2)
+ self.assertEqual(os.get_inheritable(fd2), True)
+ finally:
+ os.close(fd2)
+
+ # force non-inheritable
+ fd3 = os.open(__file__, os.O_RDONLY)
+ try:
+ os.dup2(fd, fd3, inheritable=False)
+ self.assertEqual(os.get_inheritable(fd3), False)
+ finally:
+ os.close(fd3)
+
+ @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
+ def test_openpty(self):
+ master_fd, slave_fd = os.openpty()
+ self.addCleanup(os.close, master_fd)
+ self.addCleanup(os.close, slave_fd)
+ self.assertEqual(os.get_inheritable(master_fd), False)
+ self.assertEqual(os.get_inheritable(slave_fd), False)
+
+
@support.reap_threads
def test_main():
support.run_unittest(
FileTests,
StatAttributeTests,
+ UtimeTests,
EnvironTests,
WalkTests,
FwalkTests,
MakedirTests,
DevNullTests,
URandomTests,
+ URandomFDTests,
ExecTests,
Win32ErrorTests,
TestInvalidFD,
PosixUidGidTests,
Pep383Tests,
Win32KillTests,
+ Win32ListdirTests,
Win32SymlinkTests,
NonLocalSymlinkTests,
FSEncodingTests,
@@ -2203,7 +2625,10 @@ def test_main():
ExtendedAttributeTests,
Win32DeprecatedBytesAPI,
TermsizeTests,
+ OSErrorTests,
RemoveDirsTests,
+ CPUCountTests,
+ FDInheritanceTests,
)
if __name__ == "__main__":