summaryrefslogtreecommitdiffstats
path: root/Lib
diff options
context:
space:
mode:
authorBarney Gale <barney.gale@gmail.com>2022-02-02 12:38:25 (GMT)
committerGitHub <noreply@github.com>2022-02-02 12:38:25 (GMT)
commit08f8301b21648d58d053e1a513db8ed32fbf37dd (patch)
tree5653595bd4eb375f7931e22a3cacd47bfdcf25dd /Lib
parent187930f74c44e460ba09c60ba5d9bb4fac543d8f (diff)
downloadcpython-08f8301b21648d58d053e1a513db8ed32fbf37dd.zip
cpython-08f8301b21648d58d053e1a513db8ed32fbf37dd.tar.gz
cpython-08f8301b21648d58d053e1a513db8ed32fbf37dd.tar.bz2
bpo-43012: remove `pathlib._Accessor` (GH-25701)
Per Pitrou: > The original intent for the “accessor” thing was to have a variant that did all accesses under a filesystem tree in a race condition-free way using openat and friends. It turned out to be much too hairy to actually implement, so was entirely abandoned, but the accessor abstraction was left there. https://discuss.python.org/t/make-pathlib-extensible/3428/2 Accessors are: - Lacking any internal purpose - '_NormalAccessor' is the only implementation - Lacking any firm conceptual difference to `Path` objects themselves (inc. subclasses) - Non-public, i.e. underscore prefixed - '_Accessor' and '_NormalAccessor' - Unofficially used to implement customized `Path` objects, but once once [bpo-24132]() is addressed there will be a supported route for that. This patch preserves all existing behaviour.
Diffstat (limited to 'Lib')
-rw-r--r--Lib/pathlib.py171
-rw-r--r--Lib/test/test_pathlib.py35
2 files changed, 76 insertions, 130 deletions
diff --git a/Lib/pathlib.py b/Lib/pathlib.py
index 1603325..920e1f4 100644
--- a/Lib/pathlib.py
+++ b/Lib/pathlib.py
@@ -275,93 +275,6 @@ _windows_flavour = _WindowsFlavour()
_posix_flavour = _PosixFlavour()
-class _Accessor:
- """An accessor implements a particular (system-specific or not) way of
- accessing paths on the filesystem."""
-
-
-class _NormalAccessor(_Accessor):
-
- stat = os.stat
-
- open = io.open
-
- listdir = os.listdir
-
- scandir = os.scandir
-
- chmod = os.chmod
-
- mkdir = os.mkdir
-
- unlink = os.unlink
-
- if hasattr(os, "link"):
- link = os.link
- else:
- def link(self, src, dst):
- raise NotImplementedError("os.link() not available on this system")
-
- rmdir = os.rmdir
-
- rename = os.rename
-
- replace = os.replace
-
- if hasattr(os, "symlink"):
- symlink = os.symlink
- else:
- def symlink(self, src, dst, target_is_directory=False):
- raise NotImplementedError("os.symlink() not available on this system")
-
- def touch(self, path, mode=0o666, exist_ok=True):
- if exist_ok:
- # First try to bump modification time
- # Implementation note: GNU touch uses the UTIME_NOW option of
- # the utimensat() / futimens() functions.
- try:
- os.utime(path, None)
- except OSError:
- # Avoid exception chaining
- pass
- else:
- return
- flags = os.O_CREAT | os.O_WRONLY
- if not exist_ok:
- flags |= os.O_EXCL
- fd = os.open(path, flags, mode)
- os.close(fd)
-
- if hasattr(os, "readlink"):
- readlink = os.readlink
- else:
- def readlink(self, path):
- raise NotImplementedError("os.readlink() not available on this system")
-
- def owner(self, path):
- try:
- import pwd
- return pwd.getpwuid(self.stat(path).st_uid).pw_name
- except ImportError:
- raise NotImplementedError("Path.owner() is unsupported on this system")
-
- def group(self, path):
- try:
- import grp
- return grp.getgrgid(self.stat(path).st_gid).gr_name
- except ImportError:
- raise NotImplementedError("Path.group() is unsupported on this system")
-
- getcwd = os.getcwd
-
- expanduser = staticmethod(os.path.expanduser)
-
- realpath = staticmethod(os.path.realpath)
-
-
-_normal_accessor = _NormalAccessor()
-
-
#
# Globbing helpers
#
@@ -402,7 +315,7 @@ class _Selector:
path_cls = type(parent_path)
is_dir = path_cls.is_dir
exists = path_cls.exists
- scandir = parent_path._accessor.scandir
+ scandir = path_cls._scandir
if not is_dir(parent_path):
return iter([])
return self._select_from(parent_path, is_dir, exists, scandir)
@@ -949,7 +862,6 @@ class Path(PurePath):
object. You can also instantiate a PosixPath or WindowsPath directly,
but cannot instantiate a WindowsPath on a POSIX system or vice versa.
"""
- _accessor = _normal_accessor
__slots__ = ()
def __new__(cls, *args, **kwargs):
@@ -988,7 +900,7 @@ class Path(PurePath):
"""Return a new path pointing to the current working directory
(as returned by os.getcwd()).
"""
- return cls(cls._accessor.getcwd())
+ return cls(os.getcwd())
@classmethod
def home(cls):
@@ -1005,16 +917,22 @@ class Path(PurePath):
try:
other_st = other_path.stat()
except AttributeError:
- other_st = self._accessor.stat(other_path)
+ other_st = self.__class__(other_path).stat()
return os.path.samestat(st, other_st)
def iterdir(self):
"""Iterate over the files in this directory. Does not yield any
result for the special paths '.' and '..'.
"""
- for name in self._accessor.listdir(self):
+ for name in os.listdir(self):
yield self._make_child_relpath(name)
+ def _scandir(self):
+ # bpo-24132: a future version of pathlib will support subclassing of
+ # pathlib.Path to customize how the filesystem is accessed. This
+ # includes scandir(), which is used to implement glob().
+ return os.scandir(self)
+
def glob(self, pattern):
"""Iterate over this subtree and yield all existing files (of any
kind, including directories) matching the given relative pattern.
@@ -1050,7 +968,7 @@ class Path(PurePath):
"""
if self.is_absolute():
return self
- return self._from_parts([self._accessor.getcwd()] + self._parts)
+ return self._from_parts([self.cwd()] + self._parts)
def resolve(self, strict=False):
"""
@@ -1064,7 +982,7 @@ class Path(PurePath):
raise RuntimeError("Symlink loop from %r" % e.filename)
try:
- s = self._accessor.realpath(self, strict=strict)
+ s = os.path.realpath(self, strict=strict)
except OSError as e:
check_eloop(e)
raise
@@ -1084,19 +1002,28 @@ class Path(PurePath):
Return the result of the stat() system call on this path, like
os.stat() does.
"""
- return self._accessor.stat(self, follow_symlinks=follow_symlinks)
+ return os.stat(self, follow_symlinks=follow_symlinks)
def owner(self):
"""
Return the login name of the file owner.
"""
- return self._accessor.owner(self)
+ try:
+ import pwd
+ return pwd.getpwuid(self.stat().st_uid).pw_name
+ except ImportError:
+ raise NotImplementedError("Path.owner() is unsupported on this system")
def group(self):
"""
Return the group name of the file gid.
"""
- return self._accessor.group(self)
+
+ try:
+ import grp
+ return grp.getgrgid(self.stat().st_gid).gr_name
+ except ImportError:
+ raise NotImplementedError("Path.group() is unsupported on this system")
def open(self, mode='r', buffering=-1, encoding=None,
errors=None, newline=None):
@@ -1106,8 +1033,7 @@ class Path(PurePath):
"""
if "b" not in mode:
encoding = io.text_encoding(encoding)
- return self._accessor.open(self, mode, buffering, encoding, errors,
- newline)
+ return io.open(self, mode, buffering, encoding, errors, newline)
def read_bytes(self):
"""
@@ -1148,21 +1074,38 @@ class Path(PurePath):
"""
Return the path to which the symbolic link points.
"""
- path = self._accessor.readlink(self)
- return self._from_parts((path,))
+ if not hasattr(os, "readlink"):
+ raise NotImplementedError("os.readlink() not available on this system")
+ return self._from_parts((os.readlink(self),))
def touch(self, mode=0o666, exist_ok=True):
"""
Create this file with the given access mode, if it doesn't exist.
"""
- self._accessor.touch(self, mode, exist_ok)
+
+ if exist_ok:
+ # First try to bump modification time
+ # Implementation note: GNU touch uses the UTIME_NOW option of
+ # the utimensat() / futimens() functions.
+ try:
+ os.utime(self, None)
+ except OSError:
+ # Avoid exception chaining
+ pass
+ else:
+ return
+ flags = os.O_CREAT | os.O_WRONLY
+ if not exist_ok:
+ flags |= os.O_EXCL
+ fd = os.open(self, flags, mode)
+ os.close(fd)
def mkdir(self, mode=0o777, parents=False, exist_ok=False):
"""
Create a new directory at this given path.
"""
try:
- self._accessor.mkdir(self, mode)
+ os.mkdir(self, mode)
except FileNotFoundError:
if not parents or self.parent == self:
raise
@@ -1178,7 +1121,7 @@ class Path(PurePath):
"""
Change the permissions of the path, like os.chmod().
"""
- self._accessor.chmod(self, mode, follow_symlinks=follow_symlinks)
+ os.chmod(self, mode, follow_symlinks=follow_symlinks)
def lchmod(self, mode):
"""
@@ -1193,7 +1136,7 @@ class Path(PurePath):
If the path is a directory, use rmdir() instead.
"""
try:
- self._accessor.unlink(self)
+ os.unlink(self)
except FileNotFoundError:
if not missing_ok:
raise
@@ -1202,7 +1145,7 @@ class Path(PurePath):
"""
Remove this directory. The directory must be empty.
"""
- self._accessor.rmdir(self)
+ os.rmdir(self)
def lstat(self):
"""
@@ -1221,7 +1164,7 @@ class Path(PurePath):
Returns the new Path instance pointing to the target path.
"""
- self._accessor.rename(self, target)
+ os.rename(self, target)
return self.__class__(target)
def replace(self, target):
@@ -1234,7 +1177,7 @@ class Path(PurePath):
Returns the new Path instance pointing to the target path.
"""
- self._accessor.replace(self, target)
+ os.replace(self, target)
return self.__class__(target)
def symlink_to(self, target, target_is_directory=False):
@@ -1242,7 +1185,9 @@ class Path(PurePath):
Make this path a symlink pointing to the target path.
Note the order of arguments (link, target) is the reverse of os.symlink.
"""
- self._accessor.symlink(target, self, target_is_directory)
+ if not hasattr(os, "symlink"):
+ raise NotImplementedError("os.symlink() not available on this system")
+ os.symlink(target, self, target_is_directory)
def hardlink_to(self, target):
"""
@@ -1250,7 +1195,9 @@ class Path(PurePath):
Note the order of arguments (self, target) is the reverse of os.link's.
"""
- self._accessor.link(target, self)
+ if not hasattr(os, "link"):
+ raise NotImplementedError("os.link() not available on this system")
+ os.link(target, self)
def link_to(self, target):
"""
@@ -1268,7 +1215,7 @@ class Path(PurePath):
"for removal in Python 3.12. "
"Use pathlib.Path.hardlink_to() instead.",
DeprecationWarning, stacklevel=2)
- self._accessor.link(self, target)
+ self.__class__(target).hardlink_to(self)
# Convenience functions for querying the stat results
@@ -1425,7 +1372,7 @@ class Path(PurePath):
"""
if (not (self._drv or self._root) and
self._parts and self._parts[0][:1] == '~'):
- homedir = self._accessor.expanduser(self._parts[0])
+ homedir = os.path.expanduser(self._parts[0])
if homedir[:1] == "~":
raise RuntimeError("Could not determine home directory.")
return self._from_parts([homedir] + self._parts[1:])
diff --git a/Lib/test/test_pathlib.py b/Lib/test/test_pathlib.py
index 3fbf1d1..5e46b4f 100644
--- a/Lib/test/test_pathlib.py
+++ b/Lib/test/test_pathlib.py
@@ -1,3 +1,4 @@
+import contextlib
import collections.abc
import io
import os
@@ -1459,7 +1460,7 @@ class _BasePathTest(object):
def test_absolute_common(self):
P = self.cls
- with mock.patch("pathlib._normal_accessor.getcwd") as getcwd:
+ with mock.patch("os.getcwd") as getcwd:
getcwd.return_value = BASE
# Simple relative paths.
@@ -1738,21 +1739,18 @@ class _BasePathTest(object):
# Patching is needed to avoid relying on the filesystem
# to return the order of the files as the error will not
# happen if the symlink is the last item.
-
- with mock.patch("os.scandir") as scandir:
- scandir.return_value = sorted(os.scandir(base))
+ real_scandir = os.scandir
+ def my_scandir(path):
+ with real_scandir(path) as scandir_it:
+ entries = list(scandir_it)
+ entries.sort(key=lambda entry: entry.name)
+ return contextlib.nullcontext(entries)
+
+ with mock.patch("os.scandir", my_scandir):
self.assertEqual(len(set(base.glob("*"))), 3)
-
- subdir.mkdir()
-
- with mock.patch("os.scandir") as scandir:
- scandir.return_value = sorted(os.scandir(base))
+ subdir.mkdir()
self.assertEqual(len(set(base.glob("*"))), 4)
-
- subdir.chmod(000)
-
- with mock.patch("os.scandir") as scandir:
- scandir.return_value = sorted(os.scandir(base))
+ subdir.chmod(000)
self.assertEqual(len(set(base.glob("*"))), 4)
def _check_resolve(self, p, expected, strict=True):
@@ -2199,6 +2197,7 @@ class _BasePathTest(object):
p = self.cls(BASE, 'dirCPC%d' % pattern_num)
self.assertFalse(p.exists())
+ real_mkdir = os.mkdir
def my_mkdir(path, mode=0o777):
path = str(path)
# Emulate another process that would create the directory
@@ -2207,15 +2206,15 @@ class _BasePathTest(object):
# function is called at most 5 times (dirCPC/dir1/dir2,
# dirCPC/dir1, dirCPC, dirCPC/dir1, dirCPC/dir1/dir2).
if pattern.pop():
- os.mkdir(path, mode) # From another process.
+ real_mkdir(path, mode) # From another process.
concurrently_created.add(path)
- os.mkdir(path, mode) # Our real call.
+ real_mkdir(path, mode) # Our real call.
pattern = [bool(pattern_num & (1 << n)) for n in range(5)]
concurrently_created = set()
p12 = p / 'dir1' / 'dir2'
try:
- with mock.patch("pathlib._normal_accessor.mkdir", my_mkdir):
+ with mock.patch("os.mkdir", my_mkdir):
p12.mkdir(parents=True, exist_ok=False)
except FileExistsError:
self.assertIn(str(p12), concurrently_created)
@@ -2676,7 +2675,7 @@ class WindowsPathTest(_BasePathTest, unittest.TestCase):
self.assertEqual(str(P(share + 'a\\b').absolute()), share + 'a\\b')
# UNC relative paths.
- with mock.patch("pathlib._normal_accessor.getcwd") as getcwd:
+ with mock.patch("os.getcwd") as getcwd:
getcwd.return_value = share
self.assertEqual(str(P().absolute()), share)