summaryrefslogtreecommitdiffstats
path: root/Lib
diff options
context:
space:
mode:
authorBarney Gale <barney.gale@gmail.com>2024-12-22 02:22:08 (GMT)
committerGitHub <noreply@github.com>2024-12-22 02:22:08 (GMT)
commit8d9f52a7be5c09c0fd4423943edadaacf6d7f917 (patch)
tree6027636f753421b139f0ce77dabea3eab1d82052 /Lib
parentf5ba74b81979b621e38be70ec3ddad1e7f1365ce (diff)
downloadcpython-8d9f52a7be5c09c0fd4423943edadaacf6d7f917.zip
cpython-8d9f52a7be5c09c0fd4423943edadaacf6d7f917.tar.gz
cpython-8d9f52a7be5c09c0fd4423943edadaacf6d7f917.tar.bz2
GH-127807: pathlib ABCs: move private copying methods to dedicated class (#127810)
Move 9 private `PathBase` attributes and methods into a new `CopyWorker` class. Change `PathBase.copy` from a method to a `CopyWorker` instance. The methods remain private in the `CopyWorker` class. In future we might make some/all of them public so that user subclasses of `PathBase` can customize the copying process (in particular reading/writing of metadata,) but we'd need to make `PathBase` public first.
Diffstat (limited to 'Lib')
-rw-r--r--Lib/pathlib/_abc.py228
-rw-r--r--Lib/pathlib/_local.py183
-rw-r--r--Lib/pathlib/_os.py98
3 files changed, 261 insertions, 248 deletions
diff --git a/Lib/pathlib/_abc.py b/Lib/pathlib/_abc.py
index b521c75..6acc29e 100644
--- a/Lib/pathlib/_abc.py
+++ b/Lib/pathlib/_abc.py
@@ -57,6 +57,132 @@ class PathGlobber(_GlobberBase):
return path.with_segments(str(path) + text)
+class CopyWorker:
+ """
+ Class that implements copying between path objects. An instance of this
+ class is available from the PathBase.copy property; it's made callable so
+ that PathBase.copy() can be treated as a method.
+
+ The target path's CopyWorker drives the process from its _create() method.
+ Files and directories are exchanged by calling methods on the source and
+ target paths, and metadata is exchanged by calling
+ source.copy._read_metadata() and target.copy._write_metadata().
+ """
+ __slots__ = ('_path',)
+
+ def __init__(self, path):
+ self._path = path
+
+ def __call__(self, target, follow_symlinks=True, dirs_exist_ok=False,
+ preserve_metadata=False):
+ """
+ Recursively copy this file or directory tree to the given destination.
+ """
+ if not isinstance(target, PathBase):
+ target = self._path.with_segments(target)
+
+ # Delegate to the target path's CopyWorker object.
+ return target.copy._create(self._path, follow_symlinks, dirs_exist_ok, preserve_metadata)
+
+ _readable_metakeys = frozenset()
+
+ def _read_metadata(self, metakeys, *, follow_symlinks=True):
+ """
+ Returns path metadata as a dict with string keys.
+ """
+ raise NotImplementedError
+
+ _writable_metakeys = frozenset()
+
+ def _write_metadata(self, metadata, *, follow_symlinks=True):
+ """
+ Sets path metadata from the given dict with string keys.
+ """
+ raise NotImplementedError
+
+ def _create(self, source, follow_symlinks, dirs_exist_ok, preserve_metadata):
+ self._ensure_distinct_path(source)
+ if preserve_metadata:
+ metakeys = self._writable_metakeys & source.copy._readable_metakeys
+ else:
+ metakeys = None
+ if not follow_symlinks and source.is_symlink():
+ self._create_symlink(source, metakeys)
+ elif source.is_dir():
+ self._create_dir(source, metakeys, follow_symlinks, dirs_exist_ok)
+ else:
+ self._create_file(source, metakeys)
+ return self._path
+
+ def _create_dir(self, source, metakeys, follow_symlinks, dirs_exist_ok):
+ """Copy the given directory to our path."""
+ children = list(source.iterdir())
+ self._path.mkdir(exist_ok=dirs_exist_ok)
+ for src in children:
+ dst = self._path.joinpath(src.name)
+ if not follow_symlinks and src.is_symlink():
+ dst.copy._create_symlink(src, metakeys)
+ elif src.is_dir():
+ dst.copy._create_dir(src, metakeys, follow_symlinks, dirs_exist_ok)
+ else:
+ dst.copy._create_file(src, metakeys)
+ if metakeys:
+ metadata = source.copy._read_metadata(metakeys)
+ if metadata:
+ self._write_metadata(metadata)
+
+ def _create_file(self, source, metakeys):
+ """Copy the given file to our path."""
+ self._ensure_different_file(source)
+ with source.open('rb') as source_f:
+ try:
+ with self._path.open('wb') as target_f:
+ copyfileobj(source_f, target_f)
+ except IsADirectoryError as e:
+ if not self._path.exists():
+ # Raise a less confusing exception.
+ raise FileNotFoundError(
+ f'Directory does not exist: {self._path}') from e
+ raise
+ if metakeys:
+ metadata = source.copy._read_metadata(metakeys)
+ if metadata:
+ self._write_metadata(metadata)
+
+ def _create_symlink(self, source, metakeys):
+ """Copy the given symbolic link to our path."""
+ self._path.symlink_to(source.readlink())
+ if metakeys:
+ metadata = source.copy._read_metadata(metakeys, follow_symlinks=False)
+ if metadata:
+ self._write_metadata(metadata, follow_symlinks=False)
+
+ def _ensure_different_file(self, source):
+ """
+ Raise OSError(EINVAL) if both paths refer to the same file.
+ """
+ pass
+
+ def _ensure_distinct_path(self, source):
+ """
+ Raise OSError(EINVAL) if the other path is within this path.
+ """
+ # Note: there is no straightforward, foolproof algorithm to determine
+ # if one directory is within another (a particularly perverse example
+ # would be a single network share mounted in one location via NFS, and
+ # in another location via CIFS), so we simply checks whether the
+ # other path is lexically equal to, or within, this path.
+ if source == self._path:
+ err = OSError(EINVAL, "Source and target are the same path")
+ elif source in self._path.parents:
+ err = OSError(EINVAL, "Source path is a parent of target path")
+ else:
+ return
+ err.filename = str(source)
+ err.filename2 = str(self._path)
+ raise err
+
+
class PurePathBase:
"""Base class for pure path objects.
@@ -374,31 +500,6 @@ class PathBase(PurePathBase):
except (OSError, ValueError):
return False
- def _ensure_different_file(self, other_path):
- """
- Raise OSError(EINVAL) if both paths refer to the same file.
- """
- pass
-
- def _ensure_distinct_path(self, other_path):
- """
- Raise OSError(EINVAL) if the other path is within this path.
- """
- # Note: there is no straightforward, foolproof algorithm to determine
- # if one directory is within another (a particularly perverse example
- # would be a single network share mounted in one location via NFS, and
- # in another location via CIFS), so we simply checks whether the
- # other path is lexically equal to, or within, this path.
- if self == other_path:
- err = OSError(EINVAL, "Source and target are the same path")
- elif self in other_path.parents:
- err = OSError(EINVAL, "Source path is a parent of target path")
- else:
- return
- err.filename = str(self)
- err.filename2 = str(other_path)
- raise err
-
def open(self, mode='r', buffering=-1, encoding=None,
errors=None, newline=None):
"""
@@ -537,88 +638,13 @@ class PathBase(PurePathBase):
"""
raise NotImplementedError
- def _symlink_to_target_of(self, link):
- """
- Make this path a symlink with the same target as the given link. This
- is used by copy().
- """
- self.symlink_to(link.readlink())
-
def mkdir(self, mode=0o777, parents=False, exist_ok=False):
"""
Create a new directory at this given path.
"""
raise NotImplementedError
- # Metadata keys supported by this path type.
- _readable_metadata = _writable_metadata = frozenset()
-
- def _read_metadata(self, keys=None, *, follow_symlinks=True):
- """
- Returns path metadata as a dict with string keys.
- """
- raise NotImplementedError
-
- def _write_metadata(self, metadata, *, follow_symlinks=True):
- """
- Sets path metadata from the given dict with string keys.
- """
- raise NotImplementedError
-
- def _copy_metadata(self, target, *, follow_symlinks=True):
- """
- Copies metadata (permissions, timestamps, etc) from this path to target.
- """
- # Metadata types supported by both source and target.
- keys = self._readable_metadata & target._writable_metadata
- if keys:
- metadata = self._read_metadata(keys, follow_symlinks=follow_symlinks)
- target._write_metadata(metadata, follow_symlinks=follow_symlinks)
-
- def _copy_file(self, target):
- """
- Copy the contents of this file to the given target.
- """
- self._ensure_different_file(target)
- with self.open('rb') as source_f:
- try:
- with target.open('wb') as target_f:
- copyfileobj(source_f, target_f)
- except IsADirectoryError as e:
- if not target.exists():
- # Raise a less confusing exception.
- raise FileNotFoundError(
- f'Directory does not exist: {target}') from e
- else:
- raise
-
- def copy(self, target, *, follow_symlinks=True, dirs_exist_ok=False,
- preserve_metadata=False):
- """
- Recursively copy this file or directory tree to the given destination.
- """
- if not isinstance(target, PathBase):
- target = self.with_segments(target)
- self._ensure_distinct_path(target)
- stack = [(self, target)]
- while stack:
- src, dst = stack.pop()
- if not follow_symlinks and src.is_symlink():
- dst._symlink_to_target_of(src)
- if preserve_metadata:
- src._copy_metadata(dst, follow_symlinks=False)
- elif src.is_dir():
- children = src.iterdir()
- dst.mkdir(exist_ok=dirs_exist_ok)
- stack.extend((child, dst.joinpath(child.name))
- for child in children)
- if preserve_metadata:
- src._copy_metadata(dst)
- else:
- src._copy_file(dst)
- if preserve_metadata:
- src._copy_metadata(dst)
- return target
+ copy = property(CopyWorker, doc=CopyWorker.__call__.__doc__)
def copy_into(self, target_dir, *, follow_symlinks=True,
dirs_exist_ok=False, preserve_metadata=False):
diff --git a/Lib/pathlib/_local.py b/Lib/pathlib/_local.py
index 4897149..915402e 100644
--- a/Lib/pathlib/_local.py
+++ b/Lib/pathlib/_local.py
@@ -4,10 +4,10 @@ import operator
import os
import posixpath
import sys
-from errno import EINVAL, EXDEV
+from errno import *
from glob import _StringGlobber, _no_recurse_symlinks
from itertools import chain
-from stat import S_ISSOCK, S_ISBLK, S_ISCHR, S_ISFIFO
+from stat import S_IMODE, S_ISSOCK, S_ISBLK, S_ISCHR, S_ISFIFO
from _collections_abc import Sequence
try:
@@ -19,9 +19,8 @@ try:
except ImportError:
grp = None
-from pathlib._os import (copyfile, file_metadata_keys, read_file_metadata,
- write_file_metadata)
-from pathlib._abc import PurePathBase, PathBase
+from pathlib._os import copyfile
+from pathlib._abc import CopyWorker, PurePathBase, PathBase
__all__ = [
@@ -66,6 +65,131 @@ class _PathParents(Sequence):
return "<{}.parents>".format(type(self._path).__name__)
+class _LocalCopyWorker(CopyWorker):
+ """This object implements the Path.copy callable. Don't try to construct
+ it yourself."""
+ __slots__ = ()
+
+ _readable_metakeys = {'mode', 'times_ns'}
+ if hasattr(os.stat_result, 'st_flags'):
+ _readable_metakeys.add('flags')
+ if hasattr(os, 'listxattr'):
+ _readable_metakeys.add('xattrs')
+ _readable_metakeys = _writable_metakeys = frozenset(_readable_metakeys)
+
+ def _read_metadata(self, metakeys, *, follow_symlinks=True):
+ metadata = {}
+ if 'mode' in metakeys or 'times_ns' in metakeys or 'flags' in metakeys:
+ st = self._path.stat(follow_symlinks=follow_symlinks)
+ if 'mode' in metakeys:
+ metadata['mode'] = S_IMODE(st.st_mode)
+ if 'times_ns' in metakeys:
+ metadata['times_ns'] = st.st_atime_ns, st.st_mtime_ns
+ if 'flags' in metakeys:
+ metadata['flags'] = st.st_flags
+ if 'xattrs' in metakeys:
+ try:
+ metadata['xattrs'] = [
+ (attr, os.getxattr(self._path, attr, follow_symlinks=follow_symlinks))
+ for attr in os.listxattr(self._path, follow_symlinks=follow_symlinks)]
+ except OSError as err:
+ if err.errno not in (EPERM, ENOTSUP, ENODATA, EINVAL, EACCES):
+ raise
+ return metadata
+
+ def _write_metadata(self, metadata, *, follow_symlinks=True):
+ def _nop(*args, ns=None, follow_symlinks=None):
+ pass
+
+ if follow_symlinks:
+ # use the real function if it exists
+ def lookup(name):
+ return getattr(os, name, _nop)
+ else:
+ # use the real function only if it exists
+ # *and* it supports follow_symlinks
+ def lookup(name):
+ fn = getattr(os, name, _nop)
+ if fn in os.supports_follow_symlinks:
+ return fn
+ return _nop
+
+ times_ns = metadata.get('times_ns')
+ if times_ns is not None:
+ lookup("utime")(self._path, ns=times_ns, follow_symlinks=follow_symlinks)
+ # We must copy extended attributes before the file is (potentially)
+ # chmod()'ed read-only, otherwise setxattr() will error with -EACCES.
+ xattrs = metadata.get('xattrs')
+ if xattrs is not None:
+ for attr, value in xattrs:
+ try:
+ os.setxattr(self._path, attr, value, follow_symlinks=follow_symlinks)
+ except OSError as e:
+ if e.errno not in (EPERM, ENOTSUP, ENODATA, EINVAL, EACCES):
+ raise
+ mode = metadata.get('mode')
+ if mode is not None:
+ try:
+ lookup("chmod")(self._path, mode, follow_symlinks=follow_symlinks)
+ except NotImplementedError:
+ # if we got a NotImplementedError, it's because
+ # * follow_symlinks=False,
+ # * lchown() is unavailable, and
+ # * either
+ # * fchownat() is unavailable or
+ # * fchownat() doesn't implement AT_SYMLINK_NOFOLLOW.
+ # (it returned ENOSUP.)
+ # therefore we're out of options--we simply cannot chown the
+ # symlink. give up, suppress the error.
+ # (which is what shutil always did in this circumstance.)
+ pass
+ flags = metadata.get('flags')
+ if flags is not None:
+ try:
+ lookup("chflags")(self._path, flags, follow_symlinks=follow_symlinks)
+ except OSError as why:
+ if why.errno not in (EOPNOTSUPP, ENOTSUP):
+ raise
+
+ if copyfile:
+ # Use fast OS routine for local file copying where available.
+ def _create_file(self, source, metakeys):
+ """Copy the given file to the given target."""
+ try:
+ source = os.fspath(source)
+ except TypeError:
+ if not isinstance(source, PathBase):
+ raise
+ super()._create_file(source, metakeys)
+ else:
+ copyfile(source, os.fspath(self._path))
+
+ if os.name == 'nt':
+ # Windows: symlink target might not exist yet if we're copying several
+ # files, so ensure we pass is_dir to os.symlink().
+ def _create_symlink(self, source, metakeys):
+ """Copy the given symlink to the given target."""
+ self._path.symlink_to(source.readlink(), source.is_dir())
+ if metakeys:
+ metadata = source.copy._read_metadata(metakeys, follow_symlinks=False)
+ if metadata:
+ self._write_metadata(metadata, follow_symlinks=False)
+
+ def _ensure_different_file(self, source):
+ """
+ Raise OSError(EINVAL) if both paths refer to the same file.
+ """
+ try:
+ if not self._path.samefile(source):
+ return
+ except (OSError, ValueError):
+ return
+ err = OSError(EINVAL, "Source and target are the same file")
+ err.filename = str(source)
+ err.filename2 = str(self._path)
+ raise err
+
+
class PurePath(PurePathBase):
"""Base class for manipulating paths without I/O.
@@ -678,20 +802,6 @@ class Path(PathBase, PurePath):
return (st.st_ino == other_st.st_ino and
st.st_dev == other_st.st_dev)
- def _ensure_different_file(self, other_path):
- """
- Raise OSError(EINVAL) if both paths refer to the same file.
- """
- try:
- if not self.samefile(other_path):
- return
- except (OSError, ValueError):
- return
- err = OSError(EINVAL, "Source and target are the same file")
- err.filename = str(self)
- err.filename2 = str(other_path)
- raise err
-
def open(self, mode='r', buffering=-1, encoding=None,
errors=None, newline=None):
"""
@@ -932,24 +1042,6 @@ class Path(PathBase, PurePath):
if not exist_ok or not self.is_dir():
raise
- _readable_metadata = _writable_metadata = file_metadata_keys
- _read_metadata = read_file_metadata
- _write_metadata = write_file_metadata
-
- if copyfile:
- def _copy_file(self, target):
- """
- Copy the contents of this file to the given target.
- """
- try:
- target = os.fspath(target)
- except TypeError:
- if not isinstance(target, PathBase):
- raise
- PathBase._copy_file(self, target)
- else:
- copyfile(os.fspath(self), target)
-
def chmod(self, mode, *, follow_symlinks=True):
"""
Change the permissions of the path, like os.chmod().
@@ -1019,16 +1111,17 @@ class Path(PathBase, PurePath):
os.replace(self, target)
return self.with_segments(target)
+ copy = property(_LocalCopyWorker, doc=_LocalCopyWorker.__call__.__doc__)
+
def move(self, target):
"""
Recursively move this file or directory tree to the given destination.
"""
- self._ensure_different_file(target)
+ if not isinstance(target, PathBase):
+ target = self.with_segments(target)
+ target.copy._ensure_different_file(self)
try:
return self.replace(target)
- except TypeError:
- if not isinstance(target, PathBase):
- raise
except OSError as err:
if err.errno != EXDEV:
raise
@@ -1051,14 +1144,6 @@ class Path(PathBase, PurePath):
f = f"{type(self).__name__}.symlink_to()"
raise UnsupportedOperation(f"{f} is unsupported on this system")
- if os.name == 'nt':
- def _symlink_to_target_of(self, link):
- """
- Make this path a symlink with the same target as the given link.
- This is used by copy().
- """
- self.symlink_to(link.readlink(), link.is_dir())
-
if hasattr(os, "link"):
def hardlink_to(self, target):
"""
diff --git a/Lib/pathlib/_os.py b/Lib/pathlib/_os.py
index 642b3a5..57bcaf3 100644
--- a/Lib/pathlib/_os.py
+++ b/Lib/pathlib/_os.py
@@ -4,7 +4,6 @@ Low-level OS functionality wrappers used by pathlib.
from errno import *
import os
-import stat
import sys
try:
import fcntl
@@ -163,100 +162,3 @@ def copyfileobj(source_f, target_f):
write_target = target_f.write
while buf := read_source(1024 * 1024):
write_target(buf)
-
-
-# Kinds of metadata supported by the operating system.
-file_metadata_keys = {'mode', 'times_ns'}
-if hasattr(os.stat_result, 'st_flags'):
- file_metadata_keys.add('flags')
-if hasattr(os, 'listxattr'):
- file_metadata_keys.add('xattrs')
-file_metadata_keys = frozenset(file_metadata_keys)
-
-
-def read_file_metadata(path, keys=None, *, follow_symlinks=True):
- """
- Returns local path metadata as a dict with string keys.
- """
- if keys is None:
- keys = file_metadata_keys
- assert keys.issubset(file_metadata_keys)
- result = {}
- for key in keys:
- if key == 'xattrs':
- try:
- result['xattrs'] = [
- (attr, os.getxattr(path, attr, follow_symlinks=follow_symlinks))
- for attr in os.listxattr(path, follow_symlinks=follow_symlinks)]
- except OSError as err:
- if err.errno not in (EPERM, ENOTSUP, ENODATA, EINVAL, EACCES):
- raise
- continue
- st = os.stat(path, follow_symlinks=follow_symlinks)
- if key == 'mode':
- result['mode'] = stat.S_IMODE(st.st_mode)
- elif key == 'times_ns':
- result['times_ns'] = st.st_atime_ns, st.st_mtime_ns
- elif key == 'flags':
- result['flags'] = st.st_flags
- return result
-
-
-def write_file_metadata(path, metadata, *, follow_symlinks=True):
- """
- Sets local path metadata from the given dict with string keys.
- """
- assert frozenset(metadata.keys()).issubset(file_metadata_keys)
-
- def _nop(*args, ns=None, follow_symlinks=None):
- pass
-
- if follow_symlinks:
- # use the real function if it exists
- def lookup(name):
- return getattr(os, name, _nop)
- else:
- # use the real function only if it exists
- # *and* it supports follow_symlinks
- def lookup(name):
- fn = getattr(os, name, _nop)
- if fn in os.supports_follow_symlinks:
- return fn
- return _nop
-
- times_ns = metadata.get('times_ns')
- if times_ns is not None:
- lookup("utime")(path, ns=times_ns, follow_symlinks=follow_symlinks)
- # We must copy extended attributes before the file is (potentially)
- # chmod()'ed read-only, otherwise setxattr() will error with -EACCES.
- xattrs = metadata.get('xattrs')
- if xattrs is not None:
- for attr, value in xattrs:
- try:
- os.setxattr(path, attr, value, follow_symlinks=follow_symlinks)
- except OSError as e:
- if e.errno not in (EPERM, ENOTSUP, ENODATA, EINVAL, EACCES):
- raise
- mode = metadata.get('mode')
- if mode is not None:
- try:
- lookup("chmod")(path, mode, follow_symlinks=follow_symlinks)
- except NotImplementedError:
- # if we got a NotImplementedError, it's because
- # * follow_symlinks=False,
- # * lchown() is unavailable, and
- # * either
- # * fchownat() is unavailable or
- # * fchownat() doesn't implement AT_SYMLINK_NOFOLLOW.
- # (it returned ENOSUP.)
- # therefore we're out of options--we simply cannot chown the
- # symlink. give up, suppress the error.
- # (which is what shutil always did in this circumstance.)
- pass
- flags = metadata.get('flags')
- if flags is not None:
- try:
- lookup("chflags")(path, flags, follow_symlinks=follow_symlinks)
- except OSError as why:
- if why.errno not in (EOPNOTSUPP, ENOTSUP):
- raise