summaryrefslogtreecommitdiffstats
path: root/Lib/pathlib/_local.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/pathlib/_local.py')
-rw-r--r--Lib/pathlib/_local.py183
1 files changed, 134 insertions, 49 deletions
diff --git a/Lib/pathlib/_local.py b/Lib/pathlib/_local.py
index 4897149..915402e 100644
--- a/Lib/pathlib/_local.py
+++ b/Lib/pathlib/_local.py
@@ -4,10 +4,10 @@ import operator
import os
import posixpath
import sys
-from errno import EINVAL, EXDEV
+from errno import *
from glob import _StringGlobber, _no_recurse_symlinks
from itertools import chain
-from stat import S_ISSOCK, S_ISBLK, S_ISCHR, S_ISFIFO
+from stat import S_IMODE, S_ISSOCK, S_ISBLK, S_ISCHR, S_ISFIFO
from _collections_abc import Sequence
try:
@@ -19,9 +19,8 @@ try:
except ImportError:
grp = None
-from pathlib._os import (copyfile, file_metadata_keys, read_file_metadata,
- write_file_metadata)
-from pathlib._abc import PurePathBase, PathBase
+from pathlib._os import copyfile
+from pathlib._abc import CopyWorker, PurePathBase, PathBase
__all__ = [
@@ -66,6 +65,131 @@ class _PathParents(Sequence):
return "<{}.parents>".format(type(self._path).__name__)
+class _LocalCopyWorker(CopyWorker):
+ """This object implements the Path.copy callable. Don't try to construct
+ it yourself."""
+ __slots__ = ()
+
+ _readable_metakeys = {'mode', 'times_ns'}
+ if hasattr(os.stat_result, 'st_flags'):
+ _readable_metakeys.add('flags')
+ if hasattr(os, 'listxattr'):
+ _readable_metakeys.add('xattrs')
+ _readable_metakeys = _writable_metakeys = frozenset(_readable_metakeys)
+
+ def _read_metadata(self, metakeys, *, follow_symlinks=True):
+ metadata = {}
+ if 'mode' in metakeys or 'times_ns' in metakeys or 'flags' in metakeys:
+ st = self._path.stat(follow_symlinks=follow_symlinks)
+ if 'mode' in metakeys:
+ metadata['mode'] = S_IMODE(st.st_mode)
+ if 'times_ns' in metakeys:
+ metadata['times_ns'] = st.st_atime_ns, st.st_mtime_ns
+ if 'flags' in metakeys:
+ metadata['flags'] = st.st_flags
+ if 'xattrs' in metakeys:
+ try:
+ metadata['xattrs'] = [
+ (attr, os.getxattr(self._path, attr, follow_symlinks=follow_symlinks))
+ for attr in os.listxattr(self._path, follow_symlinks=follow_symlinks)]
+ except OSError as err:
+ if err.errno not in (EPERM, ENOTSUP, ENODATA, EINVAL, EACCES):
+ raise
+ return metadata
+
+ def _write_metadata(self, metadata, *, follow_symlinks=True):
+ def _nop(*args, ns=None, follow_symlinks=None):
+ pass
+
+ if follow_symlinks:
+ # use the real function if it exists
+ def lookup(name):
+ return getattr(os, name, _nop)
+ else:
+ # use the real function only if it exists
+ # *and* it supports follow_symlinks
+ def lookup(name):
+ fn = getattr(os, name, _nop)
+ if fn in os.supports_follow_symlinks:
+ return fn
+ return _nop
+
+ times_ns = metadata.get('times_ns')
+ if times_ns is not None:
+ lookup("utime")(self._path, ns=times_ns, follow_symlinks=follow_symlinks)
+ # We must copy extended attributes before the file is (potentially)
+ # chmod()'ed read-only, otherwise setxattr() will error with -EACCES.
+ xattrs = metadata.get('xattrs')
+ if xattrs is not None:
+ for attr, value in xattrs:
+ try:
+ os.setxattr(self._path, attr, value, follow_symlinks=follow_symlinks)
+ except OSError as e:
+ if e.errno not in (EPERM, ENOTSUP, ENODATA, EINVAL, EACCES):
+ raise
+ mode = metadata.get('mode')
+ if mode is not None:
+ try:
+ lookup("chmod")(self._path, mode, follow_symlinks=follow_symlinks)
+ except NotImplementedError:
+ # if we got a NotImplementedError, it's because
+ # * follow_symlinks=False,
+ # * lchown() is unavailable, and
+ # * either
+ # * fchownat() is unavailable or
+ # * fchownat() doesn't implement AT_SYMLINK_NOFOLLOW.
+ # (it returned ENOSUP.)
+ # therefore we're out of options--we simply cannot chown the
+ # symlink. give up, suppress the error.
+ # (which is what shutil always did in this circumstance.)
+ pass
+ flags = metadata.get('flags')
+ if flags is not None:
+ try:
+ lookup("chflags")(self._path, flags, follow_symlinks=follow_symlinks)
+ except OSError as why:
+ if why.errno not in (EOPNOTSUPP, ENOTSUP):
+ raise
+
+ if copyfile:
+ # Use fast OS routine for local file copying where available.
+ def _create_file(self, source, metakeys):
+ """Copy the given file to the given target."""
+ try:
+ source = os.fspath(source)
+ except TypeError:
+ if not isinstance(source, PathBase):
+ raise
+ super()._create_file(source, metakeys)
+ else:
+ copyfile(source, os.fspath(self._path))
+
+ if os.name == 'nt':
+ # Windows: symlink target might not exist yet if we're copying several
+ # files, so ensure we pass is_dir to os.symlink().
+ def _create_symlink(self, source, metakeys):
+ """Copy the given symlink to the given target."""
+ self._path.symlink_to(source.readlink(), source.is_dir())
+ if metakeys:
+ metadata = source.copy._read_metadata(metakeys, follow_symlinks=False)
+ if metadata:
+ self._write_metadata(metadata, follow_symlinks=False)
+
+ def _ensure_different_file(self, source):
+ """
+ Raise OSError(EINVAL) if both paths refer to the same file.
+ """
+ try:
+ if not self._path.samefile(source):
+ return
+ except (OSError, ValueError):
+ return
+ err = OSError(EINVAL, "Source and target are the same file")
+ err.filename = str(source)
+ err.filename2 = str(self._path)
+ raise err
+
+
class PurePath(PurePathBase):
"""Base class for manipulating paths without I/O.
@@ -678,20 +802,6 @@ class Path(PathBase, PurePath):
return (st.st_ino == other_st.st_ino and
st.st_dev == other_st.st_dev)
- def _ensure_different_file(self, other_path):
- """
- Raise OSError(EINVAL) if both paths refer to the same file.
- """
- try:
- if not self.samefile(other_path):
- return
- except (OSError, ValueError):
- return
- err = OSError(EINVAL, "Source and target are the same file")
- err.filename = str(self)
- err.filename2 = str(other_path)
- raise err
-
def open(self, mode='r', buffering=-1, encoding=None,
errors=None, newline=None):
"""
@@ -932,24 +1042,6 @@ class Path(PathBase, PurePath):
if not exist_ok or not self.is_dir():
raise
- _readable_metadata = _writable_metadata = file_metadata_keys
- _read_metadata = read_file_metadata
- _write_metadata = write_file_metadata
-
- if copyfile:
- def _copy_file(self, target):
- """
- Copy the contents of this file to the given target.
- """
- try:
- target = os.fspath(target)
- except TypeError:
- if not isinstance(target, PathBase):
- raise
- PathBase._copy_file(self, target)
- else:
- copyfile(os.fspath(self), target)
-
def chmod(self, mode, *, follow_symlinks=True):
"""
Change the permissions of the path, like os.chmod().
@@ -1019,16 +1111,17 @@ class Path(PathBase, PurePath):
os.replace(self, target)
return self.with_segments(target)
+ copy = property(_LocalCopyWorker, doc=_LocalCopyWorker.__call__.__doc__)
+
def move(self, target):
"""
Recursively move this file or directory tree to the given destination.
"""
- self._ensure_different_file(target)
+ if not isinstance(target, PathBase):
+ target = self.with_segments(target)
+ target.copy._ensure_different_file(self)
try:
return self.replace(target)
- except TypeError:
- if not isinstance(target, PathBase):
- raise
except OSError as err:
if err.errno != EXDEV:
raise
@@ -1051,14 +1144,6 @@ class Path(PathBase, PurePath):
f = f"{type(self).__name__}.symlink_to()"
raise UnsupportedOperation(f"{f} is unsupported on this system")
- if os.name == 'nt':
- def _symlink_to_target_of(self, link):
- """
- Make this path a symlink with the same target as the given link.
- This is used by copy().
- """
- self.symlink_to(link.readlink(), link.is_dir())
-
if hasattr(os, "link"):
def hardlink_to(self, target):
"""