summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_os.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_os.py')
-rw-r--r--Lib/test/test_os.py452
1 files changed, 278 insertions, 174 deletions
diff --git a/Lib/test/test_os.py b/Lib/test/test_os.py
index 874f9e4..6853ebb 100644
--- a/Lib/test/test_os.py
+++ b/Lib/test/test_os.py
@@ -15,7 +15,6 @@ import locale
import mmap
import os
import pickle
-import platform
import re
import shutil
import signal
@@ -66,6 +65,7 @@ except ImportError:
from test.support.script_helper import assert_python_ok
+
root_in_posix = False
if hasattr(os, 'geteuid'):
root_in_posix = (os.geteuid() == 0)
@@ -82,6 +82,28 @@ else:
# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
HAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0
+
+@contextlib.contextmanager
+def ignore_deprecation_warnings(msg_regex, quiet=False):
+ with support.check_warnings((msg_regex, DeprecationWarning), quiet=quiet):
+ yield
+
+
+@contextlib.contextmanager
+def bytes_filename_warn(expected):
+ msg = 'The Windows bytes API has been deprecated'
+ if os.name == 'nt':
+ with ignore_deprecation_warnings(msg, quiet=not expected):
+ yield
+ else:
+ yield
+
+
+def create_file(filename, content=b'content'):
+ with open(filename, "xb", 0) as fp:
+ fp.write(content)
+
+
# Tests creating TESTFN
class FileTests(unittest.TestCase):
def setUp(self):
@@ -140,9 +162,8 @@ class FileTests(unittest.TestCase):
"needs INT_MAX < PY_SSIZE_T_MAX")
@support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
def test_large_read(self, size):
- with open(support.TESTFN, "wb") as fp:
- fp.write(b'test')
self.addCleanup(support.unlink, support.TESTFN)
+ create_file(support.TESTFN, b'test')
# Issue #21932: Make sure that os.read() does not raise an
# OverflowError for size larger than INT_MAX
@@ -199,11 +220,12 @@ class FileTests(unittest.TestCase):
def test_replace(self):
TESTFN2 = support.TESTFN + ".2"
- with open(support.TESTFN, 'w') as f:
- f.write("1")
- with open(TESTFN2, 'w') as f:
- f.write("2")
- self.addCleanup(os.unlink, TESTFN2)
+ self.addCleanup(support.unlink, support.TESTFN)
+ self.addCleanup(support.unlink, TESTFN2)
+
+ create_file(support.TESTFN, b"1")
+ create_file(TESTFN2, b"2")
+
os.replace(support.TESTFN, TESTFN2)
self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
with open(TESTFN2, 'r') as f:
@@ -226,15 +248,9 @@ class FileTests(unittest.TestCase):
# Test attributes on return values from os.*stat* family.
class StatAttributeTests(unittest.TestCase):
def setUp(self):
- os.mkdir(support.TESTFN)
- self.fname = os.path.join(support.TESTFN, "f1")
- f = open(self.fname, 'wb')
- f.write(b"ABC")
- f.close()
-
- def tearDown(self):
- os.unlink(self.fname)
- os.rmdir(support.TESTFN)
+ self.fname = support.TESTFN
+ self.addCleanup(support.unlink, self.fname)
+ create_file(self.fname, b"ABC")
@unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
def check_stat_attributes(self, fname):
@@ -310,8 +326,7 @@ class StatAttributeTests(unittest.TestCase):
fname = self.fname.encode(sys.getfilesystemencoding())
except UnicodeEncodeError:
self.skipTest("cannot encode %a for the filesystem" % self.fname)
- with warnings.catch_warnings():
- warnings.simplefilter("ignore", DeprecationWarning)
+ with bytes_filename_warn(True):
self.check_stat_attributes(fname)
def test_stat_result_pickle(self):
@@ -426,7 +441,11 @@ class StatAttributeTests(unittest.TestCase):
0)
# test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set)
- result = os.stat(support.TESTFN)
+ dirname = support.TESTFN + "dir"
+ os.mkdir(dirname)
+ self.addCleanup(os.rmdir, dirname)
+
+ result = os.stat(dirname)
self.check_file_attributes(result)
self.assertEqual(
result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
@@ -440,19 +459,14 @@ class UtimeTests(unittest.TestCase):
self.addCleanup(support.rmtree, self.dirname)
os.mkdir(self.dirname)
- with open(self.fname, 'wb') as fp:
- fp.write(b"ABC")
+ create_file(self.fname)
def restore_float_times(state):
- with warnings.catch_warnings():
- warnings.simplefilter("ignore", DeprecationWarning)
-
+ with ignore_deprecation_warnings('stat_float_times'):
os.stat_float_times(state)
# ensure that st_atime and st_mtime are float
- with warnings.catch_warnings():
- warnings.simplefilter("ignore", DeprecationWarning)
-
+ with ignore_deprecation_warnings('stat_float_times'):
old_float_times = os.stat_float_times(-1)
self.addCleanup(restore_float_times, old_float_times)
@@ -544,7 +558,7 @@ class UtimeTests(unittest.TestCase):
"fd support for utime required for this test.")
def test_utime_fd(self):
def set_time(filename, ns):
- with open(filename, 'wb') as fp:
+ with open(filename, 'wb', 0) as fp:
# use a file descriptor to test futimens(timespec)
# or futimes(timeval)
os.utime(fp.fileno(), ns=ns)
@@ -798,6 +812,7 @@ class WalkTests(unittest.TestCase):
def setUp(self):
join = os.path.join
+ self.addCleanup(support.rmtree, support.TESTFN)
# Build:
# TESTFN/
@@ -830,9 +845,8 @@ class WalkTests(unittest.TestCase):
os.makedirs(t2_path)
for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path:
- f = open(path, "w")
- f.write("I'm " + path + " and proud of it. Blame test_os.\n")
- f.close()
+ with open(path, "x") as f:
+ f.write("I'm " + path + " and proud of it. Blame test_os.\n")
if support.can_symlink():
os.symlink(os.path.abspath(t2_path), self.link_path)
@@ -878,7 +892,7 @@ class WalkTests(unittest.TestCase):
# Walk bottom-up.
all = list(self.walk(self.walk_path, topdown=False))
- self.assertEqual(len(all), 4)
+ self.assertEqual(len(all), 4, all)
# We can't know which order SUB1 and SUB2 will appear in.
# Not flipped: SUB11, SUB1, SUB2, TESTFN
# flipped: SUB2, SUB11, SUB1, TESTFN
@@ -908,22 +922,6 @@ class WalkTests(unittest.TestCase):
else:
self.fail("Didn't follow symlink with followlinks=True")
- def tearDown(self):
- # Tear everything down. This is a decent use for bottom-up on
- # Windows, which doesn't have a recursive delete command. The
- # (not so) subtlety is that rmdir will fail unless the dir's
- # kids are removed first, so bottom up is essential.
- for root, dirs, files in os.walk(support.TESTFN, topdown=False):
- for name in files:
- os.remove(os.path.join(root, name))
- for name in dirs:
- dirname = os.path.join(root, name)
- if not os.path.islink(dirname):
- os.rmdir(dirname)
- else:
- os.remove(dirname)
- os.rmdir(support.TESTFN)
-
def test_walk_bad_dir(self):
# Walk top-down.
errors = []
@@ -1006,27 +1004,13 @@ class FwalkTests(WalkTests):
self.addCleanup(os.close, newfd)
self.assertEqual(newfd, minfd)
- def tearDown(self):
- # cleanup
- for root, dirs, files, rootfd in os.fwalk(support.TESTFN, topdown=False):
- for name in files:
- os.unlink(name, dir_fd=rootfd)
- for name in dirs:
- st = os.stat(name, dir_fd=rootfd, follow_symlinks=False)
- if stat.S_ISDIR(st.st_mode):
- os.rmdir(name, dir_fd=rootfd)
- else:
- os.unlink(name, dir_fd=rootfd)
- os.rmdir(support.TESTFN)
-
class BytesWalkTests(WalkTests):
"""Tests for os.walk() with bytes."""
def setUp(self):
super().setUp()
self.stack = contextlib.ExitStack()
if os.name == 'nt':
- self.stack.enter_context(warnings.catch_warnings())
- warnings.simplefilter("ignore", DeprecationWarning)
+ self.stack.enter_context(bytes_filename_warn(False))
def tearDown(self):
self.stack.close()
@@ -1203,8 +1187,7 @@ class RemoveDirsTests(unittest.TestCase):
os.mkdir(dira)
dirb = os.path.join(dira, 'dirb')
os.mkdir(dirb)
- with open(os.path.join(dira, 'file.txt'), 'w') as f:
- f.write('text')
+ create_file(os.path.join(dira, 'file.txt'))
os.removedirs(dirb)
self.assertFalse(os.path.exists(dirb))
self.assertTrue(os.path.exists(dira))
@@ -1215,8 +1198,7 @@ class RemoveDirsTests(unittest.TestCase):
os.mkdir(dira)
dirb = os.path.join(dira, 'dirb')
os.mkdir(dirb)
- with open(os.path.join(dirb, 'file.txt'), 'w') as f:
- f.write('text')
+ create_file(os.path.join(dirb, 'file.txt'))
with self.assertRaises(OSError):
os.removedirs(dirb)
self.assertTrue(os.path.exists(dirb))
@@ -1226,7 +1208,7 @@ class RemoveDirsTests(unittest.TestCase):
class DevNullTests(unittest.TestCase):
def test_devnull(self):
- with open(os.devnull, 'wb') as f:
+ with open(os.devnull, 'wb', 0) as f:
f.write(b'hello')
f.close()
with open(os.devnull, 'rb') as f:
@@ -1313,9 +1295,9 @@ class URandomFDTests(unittest.TestCase):
def test_urandom_fd_reopened(self):
# Issue #21207: urandom() should detect its fd to /dev/urandom
# changed to something else, and reopen it.
- with open(support.TESTFN, 'wb') as f:
- f.write(b"x" * 256)
- self.addCleanup(os.unlink, support.TESTFN)
+ self.addCleanup(support.unlink, support.TESTFN)
+ create_file(support.TESTFN, b"x" * 256)
+
code = """if 1:
import os
import sys
@@ -1444,6 +1426,18 @@ class ExecTests(unittest.TestCase):
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ErrorTests(unittest.TestCase):
+ def setUp(self):
+ try:
+ os.stat(support.TESTFN)
+ except FileNotFoundError:
+ exists = False
+ except OSError as exc:
+ exists = True
+ self.fail("file %s must not exist; os.stat failed with %s"
+ % (support.TESTFN, exc))
+ else:
+ self.fail("file %s must not exist" % support.TESTFN)
+
def test_rename(self):
self.assertRaises(OSError, os.rename, support.TESTFN, support.TESTFN+".bak")
@@ -1454,12 +1448,10 @@ class Win32ErrorTests(unittest.TestCase):
self.assertRaises(OSError, os.chdir, support.TESTFN)
def test_mkdir(self):
- f = open(support.TESTFN, "w")
- try:
+ self.addCleanup(support.unlink, support.TESTFN)
+
+ with open(support.TESTFN, "x") as f:
self.assertRaises(OSError, os.mkdir, support.TESTFN)
- finally:
- f.close()
- os.unlink(support.TESTFN)
def test_utime(self):
self.assertRaises(OSError, os.utime, support.TESTFN, None)
@@ -1467,6 +1459,7 @@ class Win32ErrorTests(unittest.TestCase):
def test_chmod(self):
self.assertRaises(OSError, os.chmod, support.TESTFN, 0)
+
class TestInvalidFD(unittest.TestCase):
singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
"fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
@@ -1578,11 +1571,9 @@ class LinkTests(unittest.TestCase):
os.unlink(file)
def _test_link(self, file1, file2):
- with open(file1, "w") as f1:
- f1.write("test")
+ create_file(file1)
- with warnings.catch_warnings():
- warnings.simplefilter("ignore", DeprecationWarning)
+ with bytes_filename_warn(False):
os.link(file1, file2)
with open(file1, "r") as f1, open(file2, "r") as f2:
self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))
@@ -1874,10 +1865,12 @@ class Win32ListdirTests(unittest.TestCase):
self.assertEqual(
sorted(os.listdir(support.TESTFN)),
self.created_paths)
+
# bytes
- self.assertEqual(
- sorted(os.listdir(os.fsencode(support.TESTFN))),
- [os.fsencode(path) for path in self.created_paths])
+ with bytes_filename_warn(False):
+ self.assertEqual(
+ sorted(os.listdir(os.fsencode(support.TESTFN))),
+ [os.fsencode(path) for path in self.created_paths])
def test_listdir_extended_path(self):
"""Test when the path starts with '\\\\?\\'."""
@@ -1887,11 +1880,13 @@ class Win32ListdirTests(unittest.TestCase):
self.assertEqual(
sorted(os.listdir(path)),
self.created_paths)
+
# bytes
- path = b'\\\\?\\' + os.fsencode(os.path.abspath(support.TESTFN))
- self.assertEqual(
- sorted(os.listdir(path)),
- [os.fsencode(path) for path in self.created_paths])
+ with bytes_filename_warn(False):
+ path = b'\\\\?\\' + os.fsencode(os.path.abspath(support.TESTFN))
+ self.assertEqual(
+ sorted(os.listdir(path)),
+ [os.fsencode(path) for path in self.created_paths])
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
@@ -1966,51 +1961,45 @@ class Win32SymlinkTests(unittest.TestCase):
self.assertNotEqual(os.lstat(link), os.stat(link))
bytes_link = os.fsencode(link)
- with warnings.catch_warnings():
- warnings.simplefilter("ignore", DeprecationWarning)
+ with bytes_filename_warn(True):
self.assertEqual(os.stat(bytes_link), os.stat(target))
+ with bytes_filename_warn(True):
self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))
def test_12084(self):
level1 = os.path.abspath(support.TESTFN)
level2 = os.path.join(level1, "level2")
level3 = os.path.join(level2, "level3")
- try:
- os.mkdir(level1)
- os.mkdir(level2)
- os.mkdir(level3)
+ self.addCleanup(support.rmtree, level1)
- file1 = os.path.abspath(os.path.join(level1, "file1"))
+ os.mkdir(level1)
+ os.mkdir(level2)
+ os.mkdir(level3)
- with open(file1, "w") as f:
- f.write("file1")
+ file1 = os.path.abspath(os.path.join(level1, "file1"))
+ create_file(file1)
- orig_dir = os.getcwd()
- try:
- os.chdir(level2)
- link = os.path.join(level2, "link")
- os.symlink(os.path.relpath(file1), "link")
- self.assertIn("link", os.listdir(os.getcwd()))
-
- # Check os.stat calls from the same dir as the link
- self.assertEqual(os.stat(file1), os.stat("link"))
-
- # Check os.stat calls from a dir below the link
- os.chdir(level1)
- self.assertEqual(os.stat(file1),
- os.stat(os.path.relpath(link)))
-
- # Check os.stat calls from a dir above the link
- os.chdir(level3)
- self.assertEqual(os.stat(file1),
- os.stat(os.path.relpath(link)))
- finally:
- os.chdir(orig_dir)
- except OSError as err:
- self.fail(err)
+ orig_dir = os.getcwd()
+ try:
+ os.chdir(level2)
+ link = os.path.join(level2, "link")
+ os.symlink(os.path.relpath(file1), "link")
+ self.assertIn("link", os.listdir(os.getcwd()))
+
+ # Check os.stat calls from the same dir as the link
+ self.assertEqual(os.stat(file1), os.stat("link"))
+
+ # Check os.stat calls from a dir below the link
+ os.chdir(level1)
+ self.assertEqual(os.stat(file1),
+ os.stat(os.path.relpath(link)))
+
+ # Check os.stat calls from a dir above the link
+ os.chdir(level3)
+ self.assertEqual(os.stat(file1),
+ os.stat(os.path.relpath(link)))
finally:
- os.remove(file1)
- shutil.rmtree(level1)
+ os.chdir(orig_dir)
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
@@ -2146,8 +2135,8 @@ class ProgramPriorityTests(unittest.TestCase):
try:
new_prio = os.getpriority(os.PRIO_PROCESS, os.getpid())
if base >= 19 and new_prio <= 19:
- raise unittest.SkipTest(
- "unable to reliably test setpriority at current nice level of %s" % base)
+ raise unittest.SkipTest("unable to reliably test setpriority "
+ "at current nice level of %s" % base)
else:
self.assertEqual(new_prio, base + 1)
finally:
@@ -2257,8 +2246,7 @@ class TestSendfile(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.key = support.threading_setup()
- with open(support.TESTFN, "wb") as f:
- f.write(cls.DATA)
+ create_file(support.TESTFN, cls.DATA)
@classmethod
def tearDownClass(cls):
@@ -2408,10 +2396,11 @@ class TestSendfile(unittest.TestCase):
def test_trailers(self):
TESTFN2 = support.TESTFN + "2"
file_data = b"abcdef"
- with open(TESTFN2, 'wb') as f:
- f.write(file_data)
- with open(TESTFN2, 'rb')as f:
- self.addCleanup(os.remove, TESTFN2)
+
+ self.addCleanup(support.unlink, TESTFN2)
+ create_file(TESTFN2, file_data)
+
+ with open(TESTFN2, 'rb') as f:
os.sendfile(self.sockno, f.fileno(), 0, len(file_data),
trailers=[b"1234"])
self.client.close()
@@ -2434,35 +2423,37 @@ class TestSendfile(unittest.TestCase):
def supports_extended_attributes():
if not hasattr(os, "setxattr"):
return False
+
try:
- with open(support.TESTFN, "wb") as fp:
+ with open(support.TESTFN, "xb", 0) as fp:
try:
os.setxattr(fp.fileno(), b"user.test", b"")
except OSError:
return False
finally:
support.unlink(support.TESTFN)
- # Kernels < 2.6.39 don't respect setxattr flags.
- kernel_version = platform.release()
- m = re.match("2.6.(\d{1,2})", kernel_version)
- return m is None or int(m.group(1)) >= 39
+
+ return True
@unittest.skipUnless(supports_extended_attributes(),
"no non-broken extended attribute support")
+# Kernels < 2.6.39 don't respect setxattr flags.
+@support.requires_linux_version(2, 6, 39)
class ExtendedAttributeTests(unittest.TestCase):
- def tearDown(self):
- support.unlink(support.TESTFN)
-
def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
fn = support.TESTFN
- open(fn, "wb").close()
+ self.addCleanup(support.unlink, fn)
+ create_file(fn)
+
with self.assertRaises(OSError) as cm:
getxattr(fn, s("user.test"), **kwargs)
self.assertEqual(cm.exception.errno, errno.ENODATA)
+
init_xattr = listxattr(fn)
self.assertIsInstance(init_xattr, list)
+
setxattr(fn, s("user.test"), b"", **kwargs)
xattr = set(init_xattr)
xattr.add("user.test")
@@ -2470,19 +2461,24 @@ class ExtendedAttributeTests(unittest.TestCase):
self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"")
setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"hello")
+
with self.assertRaises(OSError) as cm:
setxattr(fn, s("user.test"), b"bye", os.XATTR_CREATE, **kwargs)
self.assertEqual(cm.exception.errno, errno.EEXIST)
+
with self.assertRaises(OSError) as cm:
setxattr(fn, s("user.test2"), b"bye", os.XATTR_REPLACE, **kwargs)
self.assertEqual(cm.exception.errno, errno.ENODATA)
+
setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
xattr.add("user.test2")
self.assertEqual(set(listxattr(fn)), xattr)
removexattr(fn, s("user.test"), **kwargs)
+
with self.assertRaises(OSError) as cm:
getxattr(fn, s("user.test"), **kwargs)
self.assertEqual(cm.exception.errno, errno.ENODATA)
+
xattr.remove("user.test")
self.assertEqual(set(listxattr(fn)), xattr)
self.assertEqual(getxattr(fn, s("user.test2"), **kwargs), b"foo")
@@ -2495,11 +2491,11 @@ class ExtendedAttributeTests(unittest.TestCase):
self.assertEqual(set(listxattr(fn)), set(init_xattr) | set(many))
def _check_xattrs(self, *args, **kwargs):
- def make_bytes(s):
- return bytes(s, "ascii")
self._check_xattrs_str(str, *args, **kwargs)
support.unlink(support.TESTFN)
- self._check_xattrs_str(make_bytes, *args, **kwargs)
+
+ self._check_xattrs_str(os.fsencode, *args, **kwargs)
+ support.unlink(support.TESTFN)
def test_simple(self):
self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
@@ -2514,10 +2510,10 @@ class ExtendedAttributeTests(unittest.TestCase):
with open(path, "rb") as fp:
return os.getxattr(fp.fileno(), *args)
def setxattr(path, *args):
- with open(path, "wb") as fp:
+ with open(path, "wb", 0) as fp:
os.setxattr(fp.fileno(), *args)
def removexattr(path, *args):
- with open(path, "wb") as fp:
+ with open(path, "wb", 0) as fp:
os.removexattr(fp.fileno(), *args)
def listxattr(path, *args):
with open(path, "rb") as fp:
@@ -2530,36 +2526,39 @@ class Win32DeprecatedBytesAPI(unittest.TestCase):
def test_deprecated(self):
import nt
filename = os.fsencode(support.TESTFN)
- with warnings.catch_warnings():
- warnings.simplefilter("error", DeprecationWarning)
- for func, *args in (
- (nt._getfullpathname, filename),
- (nt._isdir, filename),
- (os.access, filename, os.R_OK),
- (os.chdir, filename),
- (os.chmod, filename, 0o777),
- (os.getcwdb,),
- (os.link, filename, filename),
- (os.listdir, filename),
- (os.lstat, filename),
- (os.mkdir, filename),
- (os.open, filename, os.O_RDONLY),
- (os.rename, filename, filename),
- (os.rmdir, filename),
- (os.startfile, filename),
- (os.stat, filename),
- (os.unlink, filename),
- (os.utime, filename),
- ):
- self.assertRaises(DeprecationWarning, func, *args)
+ for func, *args in (
+ (nt._getfullpathname, filename),
+ (nt._isdir, filename),
+ (os.access, filename, os.R_OK),
+ (os.chdir, filename),
+ (os.chmod, filename, 0o777),
+ (os.getcwdb,),
+ (os.link, filename, filename),
+ (os.listdir, filename),
+ (os.lstat, filename),
+ (os.mkdir, filename),
+ (os.open, filename, os.O_RDONLY),
+ (os.rename, filename, filename),
+ (os.rmdir, filename),
+ (os.startfile, filename),
+ (os.stat, filename),
+ (os.unlink, filename),
+ (os.utime, filename),
+ ):
+ with bytes_filename_warn(True):
+ try:
+ func(*args)
+ except OSError:
+ # ignore OSError, we only care about DeprecationWarning
+ pass
@support.skip_unless_symlink
def test_symlink(self):
+ self.addCleanup(support.unlink, support.TESTFN)
+
filename = os.fsencode(support.TESTFN)
- with warnings.catch_warnings():
- warnings.simplefilter("error", DeprecationWarning)
- self.assertRaises(DeprecationWarning,
- os.symlink, filename, filename)
+ with bytes_filename_warn(True):
+ os.symlink(filename, filename)
@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
@@ -2697,7 +2696,8 @@ class OSErrorTests(unittest.TestCase):
for filenames, func, *func_args in funcs:
for name in filenames:
try:
- func(name, *func_args)
+ with bytes_filename_warn(False):
+ func(name, *func_args)
except OSError as err:
self.assertIs(err.filename, name)
else:
@@ -2820,6 +2820,8 @@ class ExportsTests(unittest.TestCase):
class TestScandir(unittest.TestCase):
+ check_no_resource_warning = support.check_no_resource_warning
+
def setUp(self):
self.path = os.path.realpath(support.TESTFN)
self.addCleanup(support.rmtree, self.path)
@@ -2827,8 +2829,7 @@ class TestScandir(unittest.TestCase):
def create_file(self, name="file.txt"):
filename = os.path.join(self.path, name)
- with open(filename, "wb") as fp:
- fp.write(b'python')
+ create_file(filename, b'python')
return filename
def get_entries(self, names):
@@ -3010,7 +3011,8 @@ class TestScandir(unittest.TestCase):
def test_bytes(self):
if os.name == "nt":
# On Windows, os.scandir(bytes) must raise an exception
- self.assertRaises(TypeError, os.scandir, b'.')
+ with bytes_filename_warn(True):
+ self.assertRaises(TypeError, os.scandir, b'.')
return
self.create_file("file.txt")
@@ -3042,6 +3044,108 @@ class TestScandir(unittest.TestCase):
for obj in [1234, 1.234, {}, []]:
self.assertRaises(TypeError, os.scandir, obj)
+ def test_close(self):
+ self.create_file("file.txt")
+ self.create_file("file2.txt")
+ iterator = os.scandir(self.path)
+ next(iterator)
+ iterator.close()
+ # multiple closes
+ iterator.close()
+ with self.check_no_resource_warning():
+ del iterator
+
+ def test_context_manager(self):
+ self.create_file("file.txt")
+ self.create_file("file2.txt")
+ with os.scandir(self.path) as iterator:
+ next(iterator)
+ with self.check_no_resource_warning():
+ del iterator
+
+ def test_context_manager_close(self):
+ self.create_file("file.txt")
+ self.create_file("file2.txt")
+ with os.scandir(self.path) as iterator:
+ next(iterator)
+ iterator.close()
+
+ def test_context_manager_exception(self):
+ self.create_file("file.txt")
+ self.create_file("file2.txt")
+ with self.assertRaises(ZeroDivisionError):
+ with os.scandir(self.path) as iterator:
+ next(iterator)
+ 1/0
+ with self.check_no_resource_warning():
+ del iterator
+
+ def test_resource_warning(self):
+ self.create_file("file.txt")
+ self.create_file("file2.txt")
+ iterator = os.scandir(self.path)
+ next(iterator)
+ with self.assertWarns(ResourceWarning):
+ del iterator
+ support.gc_collect()
+ # exhausted iterator
+ iterator = os.scandir(self.path)
+ list(iterator)
+ with self.check_no_resource_warning():
+ del iterator
+
+
+class TestPEP519(unittest.TestCase):
+ "os.fspath()"
+
+ def test_return_bytes(self):
+ for b in b'hello', b'goodbye', b'some/path/and/file':
+ self.assertEqual(b, os.fspath(b))
+
+ def test_return_string(self):
+ for s in 'hello', 'goodbye', 'some/path/and/file':
+ self.assertEqual(s, os.fspath(s))
+
+ def test_fsencode_fsdecode_return_pathlike(self):
+ class PathLike:
+ def __init__(self, path):
+ self.path = path
+ def __fspath__(self):
+ return self.path
+
+ for p in "path/like/object", b"path/like/object":
+ pathlike = PathLike(p)
+
+ self.assertEqual(p, os.fspath(pathlike))
+ self.assertEqual(b"path/like/object", os.fsencode(pathlike))
+ self.assertEqual("path/like/object", os.fsdecode(pathlike))
+
+ def test_fspathlike(self):
+ class PathLike:
+ def __init__(self, path=''):
+ self.path = path
+ def __fspath__(self):
+ return self.path
+
+ self.assertEqual('#feelthegil', os.fspath(PathLike('#feelthegil')))
+ self.assertTrue(issubclass(PathLike, os.PathLike))
+ self.assertTrue(isinstance(PathLike(), os.PathLike))
+
+ message = 'expected str, bytes or os.PathLike object, not'
+ for fn in (os.fsencode, os.fsdecode):
+ for obj in PathLike(None), None:
+ with self.assertRaisesRegex(TypeError, message):
+ fn(obj)
+
+ def test_garbage_in_exception_out(self):
+ vapor = type('blah', (), {})
+ for o in int, type, os, vapor():
+ self.assertRaises(TypeError, os.fspath, o)
+
+ def test_argument_required(self):
+ with self.assertRaises(TypeError):
+ os.fspath()
+
if __name__ == "__main__":
unittest.main()