diff options
author | Antoine Pitrou <solipsis@pitrou.net> | 2013-11-22 16:38:12 (GMT) |
---|---|---|
committer | Antoine Pitrou <solipsis@pitrou.net> | 2013-11-22 16:38:12 (GMT) |
commit | 31119e4f10d2905d0ed2c21a012dd9f45ec2fa61 (patch) | |
tree | 80c8d7baf9835deec98a240f8ddccda744170a9b /Lib/pathlib.py | |
parent | b523f8433a8982e10eb41a3e2b37ee0e6d6a6e00 (diff) | |
download | cpython-31119e4f10d2905d0ed2c21a012dd9f45ec2fa61.zip cpython-31119e4f10d2905d0ed2c21a012dd9f45ec2fa61.tar.gz cpython-31119e4f10d2905d0ed2c21a012dd9f45ec2fa61.tar.bz2 |
Issue #19673: Add pathlib to the stdlib as a provisional module (PEP 428).
Diffstat (limited to 'Lib/pathlib.py')
-rw-r--r-- | Lib/pathlib.py | 1287 |
1 files changed, 1287 insertions, 0 deletions
diff --git a/Lib/pathlib.py b/Lib/pathlib.py new file mode 100644 index 0000000..b81720e --- /dev/null +++ b/Lib/pathlib.py @@ -0,0 +1,1287 @@ +import fnmatch +import functools +import io +import ntpath +import os +import posixpath +import re +import sys +import time +import weakref +try: + import threading +except ImportError: + import dummy_threading as threading + +from collections import Sequence, defaultdict +from contextlib import contextmanager +from errno import EINVAL, ENOENT +from itertools import chain, count +from operator import attrgetter +from stat import S_ISDIR, S_ISLNK, S_ISREG, S_ISSOCK, S_ISBLK, S_ISCHR, S_ISFIFO +from urllib.parse import quote as urlquote, quote_from_bytes as urlquote_from_bytes + + +supports_symlinks = True +try: + import nt +except ImportError: + nt = None +else: + if sys.getwindowsversion()[:2] >= (6, 0): + from nt import _getfinalpathname + else: + supports_symlinks = False + _getfinalpathname = None + + +__all__ = [ + "PurePath", "PurePosixPath", "PureWindowsPath", + "Path", "PosixPath", "WindowsPath", + ] + +# +# Internals +# + +def _is_wildcard_pattern(pat): + # Whether this pattern needs actual matching using fnmatch, or can + # be looked up directly as a file. + return "*" in pat or "?" in pat or "[" in pat + + +class _Flavour(object): + """A flavour implements a particular (platform-specific) set of path + semantics.""" + + def __init__(self): + self.join = self.sep.join + + def parse_parts(self, parts): + parsed = [] + sep = self.sep + altsep = self.altsep + drv = root = '' + it = reversed(parts) + for part in it: + if not part: + continue + if altsep: + part = part.replace(altsep, sep) + drv, root, rel = self.splitroot(part) + if sep in rel: + for x in reversed(rel.split(sep)): + if x and x != '.': + parsed.append(sys.intern(x)) + else: + if rel and rel != '.': + parsed.append(sys.intern(rel)) + if drv or root: + if not drv: + # If no drive is present, try to find one in the previous + # parts. This makes the result of parsing e.g. + # ("C:", "/", "a") reasonably intuitive. + for part in it: + drv = self.splitroot(part)[0] + if drv: + break + break + if drv or root: + parsed.append(drv + root) + parsed.reverse() + return drv, root, parsed + + def join_parsed_parts(self, drv, root, parts, drv2, root2, parts2): + """ + Join the two paths represented by the respective + (drive, root, parts) tuples. Return a new (drive, root, parts) tuple. + """ + if root2: + parts = parts2 + root = root2 + else: + parts = parts + parts2 + # XXX raise error if drv and drv2 are different? + return drv2 or drv, root, parts + + +class _WindowsFlavour(_Flavour): + # Reference for Windows paths can be found at + # http://msdn.microsoft.com/en-us/library/aa365247%28v=vs.85%29.aspx + + sep = '\\' + altsep = '/' + has_drv = True + pathmod = ntpath + + is_supported = (nt is not None) + + drive_letters = ( + set(chr(x) for x in range(ord('a'), ord('z') + 1)) | + set(chr(x) for x in range(ord('A'), ord('Z') + 1)) + ) + ext_namespace_prefix = '\\\\?\\' + + reserved_names = ( + {'CON', 'PRN', 'AUX', 'NUL'} | + {'COM%d' % i for i in range(1, 10)} | + {'LPT%d' % i for i in range(1, 10)} + ) + + # Interesting findings about extended paths: + # - '\\?\c:\a', '//?/c:\a' and '//?/c:/a' are all supported + # but '\\?\c:/a' is not + # - extended paths are always absolute; "relative" extended paths will + # fail. + + def splitroot(self, part, sep=sep): + first = part[0:1] + second = part[1:2] + if (second == sep and first == sep): + # XXX extended paths should also disable the collapsing of "." + # components (according to MSDN docs). + prefix, part = self._split_extended_path(part) + first = part[0:1] + second = part[1:2] + else: + prefix = '' + third = part[2:3] + if (second == sep and first == sep and third != sep): + # is a UNC path: + # vvvvvvvvvvvvvvvvvvvvv root + # \\machine\mountpoint\directory\etc\... + # directory ^^^^^^^^^^^^^^ + index = part.find(sep, 2) + if index != -1: + index2 = part.find(sep, index + 1) + # a UNC path can't have two slashes in a row + # (after the initial two) + if index2 != index + 1: + if index2 == -1: + index2 = len(part) + if prefix: + return prefix + part[1:index2], sep, part[index2+1:] + else: + return part[:index2], sep, part[index2+1:] + drv = root = '' + if second == ':' and first in self.drive_letters: + drv = part[:2] + part = part[2:] + first = third + if first == sep: + root = first + part = part.lstrip(sep) + return prefix + drv, root, part + + def casefold(self, s): + return s.lower() + + def casefold_parts(self, parts): + return [p.lower() for p in parts] + + def resolve(self, path): + s = str(path) + if not s: + return os.getcwd() + if _getfinalpathname is not None: + return self._ext_to_normal(_getfinalpathname(s)) + # Means fallback on absolute + return None + + def _split_extended_path(self, s, ext_prefix=ext_namespace_prefix): + prefix = '' + if s.startswith(ext_prefix): + prefix = s[:4] + s = s[4:] + if s.startswith('UNC\\'): + prefix += s[:3] + s = '\\' + s[3:] + return prefix, s + + def _ext_to_normal(self, s): + # Turn back an extended path into a normal DOS-like path + return self._split_extended_path(s)[1] + + def is_reserved(self, parts): + # NOTE: the rules for reserved names seem somewhat complicated + # (e.g. r"..\NUL" is reserved but not r"foo\NUL"). + # We err on the side of caution and return True for paths which are + # not considered reserved by Windows. + if not parts: + return False + if parts[0].startswith('\\\\'): + # UNC paths are never reserved + return False + return parts[-1].partition('.')[0].upper() in self.reserved_names + + def make_uri(self, path): + # Under Windows, file URIs use the UTF-8 encoding. + drive = path.drive + if len(drive) == 2 and drive[1] == ':': + # It's a path on a local drive => 'file:///c:/a/b' + rest = path.as_posix()[2:].lstrip('/') + return 'file:///%s/%s' % ( + drive, urlquote_from_bytes(rest.encode('utf-8'))) + else: + # It's a path on a network drive => 'file://host/share/a/b' + return 'file:' + urlquote_from_bytes(path.as_posix().encode('utf-8')) + + +class _PosixFlavour(_Flavour): + sep = '/' + altsep = '' + has_drv = False + pathmod = posixpath + + is_supported = (os.name != 'nt') + + def splitroot(self, part, sep=sep): + if part and part[0] == sep: + stripped_part = part.lstrip(sep) + # According to POSIX path resolution: + # http://pubs.opengroup.org/onlinepubs/009695399/basedefs/xbd_chap04.html#tag_04_11 + # "A pathname that begins with two successive slashes may be + # interpreted in an implementation-defined manner, although more + # than two leading slashes shall be treated as a single slash". + if len(part) - len(stripped_part) == 2: + return '', sep * 2, stripped_part + else: + return '', sep, stripped_part + else: + return '', '', part + + def casefold(self, s): + return s + + def casefold_parts(self, parts): + return parts + + def resolve(self, path): + sep = self.sep + def split(p): + return [x for x in p.split(sep) if x] + def absparts(p): + # Our own abspath(), since the posixpath one makes + # the mistake of "normalizing" the path without resolving the + # symlinks first. + if not p.startswith(sep): + return split(os.getcwd()) + split(p) + else: + return split(p) + parts = absparts(str(path))[::-1] + accessor = path._accessor + resolved = cur = "" + symlinks = {} + while parts: + part = parts.pop() + cur = resolved + sep + part + if cur in symlinks and symlinks[cur] <= len(parts): + # We've already seen the symlink and there's not less + # work to do than the last time. + raise RuntimeError("Symlink loop from %r" % cur) + try: + target = accessor.readlink(cur) + except OSError as e: + if e.errno != EINVAL: + raise + # Not a symlink + resolved = cur + else: + # Take note of remaining work from this symlink + symlinks[cur] = len(parts) + if target.startswith(sep): + # Symlink points to absolute path + resolved = "" + parts.extend(split(target)[::-1]) + return resolved or sep + + def is_reserved(self, parts): + return False + + def make_uri(self, path): + # We represent the path using the local filesystem encoding, + # for portability to other applications. + bpath = bytes(path) + return 'file://' + urlquote_from_bytes(bpath) + + +_windows_flavour = _WindowsFlavour() +_posix_flavour = _PosixFlavour() + + +class _Accessor: + """An accessor implements a particular (system-specific or not) way of + accessing paths on the filesystem.""" + + +class _NormalAccessor(_Accessor): + + def _wrap_strfunc(strfunc): + @functools.wraps(strfunc) + def wrapped(pathobj, *args): + return strfunc(str(pathobj), *args) + return staticmethod(wrapped) + + def _wrap_binary_strfunc(strfunc): + @functools.wraps(strfunc) + def wrapped(pathobjA, pathobjB, *args): + return strfunc(str(pathobjA), str(pathobjB), *args) + return staticmethod(wrapped) + + stat = _wrap_strfunc(os.stat) + + lstat = _wrap_strfunc(os.lstat) + + open = _wrap_strfunc(os.open) + + listdir = _wrap_strfunc(os.listdir) + + chmod = _wrap_strfunc(os.chmod) + + if hasattr(os, "lchmod"): + lchmod = _wrap_strfunc(os.lchmod) + else: + def lchmod(self, pathobj, mode): + raise NotImplementedError("lchmod() not available on this system") + + mkdir = _wrap_strfunc(os.mkdir) + + unlink = _wrap_strfunc(os.unlink) + + rmdir = _wrap_strfunc(os.rmdir) + + rename = _wrap_binary_strfunc(os.rename) + + replace = _wrap_binary_strfunc(os.replace) + + if nt: + if supports_symlinks: + symlink = _wrap_binary_strfunc(os.symlink) + else: + def symlink(a, b, target_is_directory): + raise NotImplementedError("symlink() not available on this system") + else: + # Under POSIX, os.symlink() takes two args + @staticmethod + def symlink(a, b, target_is_directory): + return os.symlink(str(a), str(b)) + + utime = _wrap_strfunc(os.utime) + + # Helper for resolve() + def readlink(self, path): + return os.readlink(path) + + +_normal_accessor = _NormalAccessor() + + +# +# Globbing helpers +# + +@contextmanager +def _cached(func): + try: + func.__cached__ + yield func + except AttributeError: + cache = {} + def wrapper(*args): + try: + return cache[args] + except KeyError: + value = cache[args] = func(*args) + return value + wrapper.__cached__ = True + try: + yield wrapper + finally: + cache.clear() + +def _make_selector(pattern_parts): + pat = pattern_parts[0] + child_parts = pattern_parts[1:] + if pat == '**': + cls = _RecursiveWildcardSelector + elif '**' in pat: + raise ValueError("Invalid pattern: '**' can only be an entire path component") + elif _is_wildcard_pattern(pat): + cls = _WildcardSelector + else: + cls = _PreciseSelector + return cls(pat, child_parts) + +if hasattr(functools, "lru_cache"): + _make_selector = functools.lru_cache()(_make_selector) + + +class _Selector: + """A selector matches a specific glob pattern part against the children + of a given path.""" + + def __init__(self, child_parts): + self.child_parts = child_parts + if child_parts: + self.successor = _make_selector(child_parts) + else: + self.successor = _TerminatingSelector() + + def select_from(self, parent_path): + """Iterate over all child paths of `parent_path` matched by this + selector. This can contain parent_path itself.""" + path_cls = type(parent_path) + is_dir = path_cls.is_dir + exists = path_cls.exists + listdir = parent_path._accessor.listdir + return self._select_from(parent_path, is_dir, exists, listdir) + + +class _TerminatingSelector: + + def _select_from(self, parent_path, is_dir, exists, listdir): + yield parent_path + + +class _PreciseSelector(_Selector): + + def __init__(self, name, child_parts): + self.name = name + _Selector.__init__(self, child_parts) + + def _select_from(self, parent_path, is_dir, exists, listdir): + if not is_dir(parent_path): + return + path = parent_path._make_child_relpath(self.name) + if exists(path): + for p in self.successor._select_from(path, is_dir, exists, listdir): + yield p + + +class _WildcardSelector(_Selector): + + def __init__(self, pat, child_parts): + self.pat = re.compile(fnmatch.translate(pat)) + _Selector.__init__(self, child_parts) + + def _select_from(self, parent_path, is_dir, exists, listdir): + if not is_dir(parent_path): + return + cf = parent_path._flavour.casefold + for name in listdir(parent_path): + casefolded = cf(name) + if self.pat.match(casefolded): + path = parent_path._make_child_relpath(name) + for p in self.successor._select_from(path, is_dir, exists, listdir): + yield p + + +class _RecursiveWildcardSelector(_Selector): + + def __init__(self, pat, child_parts): + _Selector.__init__(self, child_parts) + + def _iterate_directories(self, parent_path, is_dir, listdir): + yield parent_path + for name in listdir(parent_path): + path = parent_path._make_child_relpath(name) + if is_dir(path): + for p in self._iterate_directories(path, is_dir, listdir): + yield p + + def _select_from(self, parent_path, is_dir, exists, listdir): + if not is_dir(parent_path): + return + with _cached(listdir) as listdir: + yielded = set() + try: + successor_select = self.successor._select_from + for starting_point in self._iterate_directories(parent_path, is_dir, listdir): + for p in successor_select(starting_point, is_dir, exists, listdir): + if p not in yielded: + yield p + yielded.add(p) + finally: + yielded.clear() + + +# +# Public API +# + +class _PathParents(Sequence): + """This object provides sequence-like access to the logical ancestors + of a path. Don't try to construct it yourself.""" + __slots__ = ('_pathcls', '_drv', '_root', '_parts') + + def __init__(self, path): + # We don't store the instance to avoid reference cycles + self._pathcls = type(path) + self._drv = path._drv + self._root = path._root + self._parts = path._parts + + def __len__(self): + if self._drv or self._root: + return len(self._parts) - 1 + else: + return len(self._parts) + + def __getitem__(self, idx): + if idx < 0 or idx >= len(self): + raise IndexError(idx) + return self._pathcls._from_parsed_parts(self._drv, self._root, + self._parts[:-idx - 1]) + + def __repr__(self): + return "<{}.parents>".format(self._pathcls.__name__) + + +class PurePath(object): + """PurePath represents a filesystem path and offers operations which + don't imply any actual filesystem I/O. Depending on your system, + instantiating a PurePath will return either a PurePosixPath or a + PureWindowsPath object. You can also instantiate either of these classes + directly, regardless of your system. + """ + __slots__ = ( + '_drv', '_root', '_parts', + '_str', '_hash', '_pparts', '_cached_cparts', + ) + + def __new__(cls, *args): + """Construct a PurePath from one or several strings and or existing + PurePath objects. The strings and path objects are combined so as + to yield a canonicalized path, which is incorporated into the + new PurePath object. + """ + if cls is PurePath: + cls = PureWindowsPath if os.name == 'nt' else PurePosixPath + return cls._from_parts(args) + + def __reduce__(self): + # Using the parts tuple helps share interned path parts + # when pickling related paths. + return (self.__class__, tuple(self._parts)) + + @classmethod + def _parse_args(cls, args): + # This is useful when you don't want to create an instance, just + # canonicalize some constructor arguments. + parts = [] + for a in args: + if isinstance(a, PurePath): + parts += a._parts + elif isinstance(a, str): + # Assuming a str + parts.append(a) + else: + raise TypeError( + "argument should be a path or str object, not %r" + % type(a)) + return cls._flavour.parse_parts(parts) + + @classmethod + def _from_parts(cls, args, init=True): + # We need to call _parse_args on the instance, so as to get the + # right flavour. + self = object.__new__(cls) + drv, root, parts = self._parse_args(args) + self._drv = drv + self._root = root + self._parts = parts + if init: + self._init() + return self + + @classmethod + def _from_parsed_parts(cls, drv, root, parts, init=True): + self = object.__new__(cls) + self._drv = drv + self._root = root + self._parts = parts + if init: + self._init() + return self + + @classmethod + def _format_parsed_parts(cls, drv, root, parts): + if drv or root: + return drv + root + cls._flavour.join(parts[1:]) + else: + return cls._flavour.join(parts) + + def _init(self): + # Overriden in concrete Path + pass + + def _make_child(self, args): + drv, root, parts = self._parse_args(args) + drv, root, parts = self._flavour.join_parsed_parts( + self._drv, self._root, self._parts, drv, root, parts) + return self._from_parsed_parts(drv, root, parts) + + def __str__(self): + """Return the string representation of the path, suitable for + passing to system calls.""" + try: + return self._str + except AttributeError: + self._str = self._format_parsed_parts(self._drv, self._root, + self._parts) or '.' + return self._str + + def as_posix(self): + """Return the string representation of the path with forward (/) + slashes.""" + f = self._flavour + return str(self).replace(f.sep, '/') + + def __bytes__(self): + """Return the bytes representation of the path. This is only + recommended to use under Unix.""" + return os.fsencode(str(self)) + + def __repr__(self): + return "{}({!r})".format(self.__class__.__name__, self.as_posix()) + + def as_uri(self): + """Return the path as a 'file' URI.""" + if not self.is_absolute(): + raise ValueError("relative path can't be expressed as a file URI") + return self._flavour.make_uri(self) + + @property + def _cparts(self): + # Cached casefolded parts, for hashing and comparison + try: + return self._cached_cparts + except AttributeError: + self._cached_cparts = self._flavour.casefold_parts(self._parts) + return self._cached_cparts + + def __eq__(self, other): + if not isinstance(other, PurePath): + return NotImplemented + return self._cparts == other._cparts and self._flavour is other._flavour + + def __ne__(self, other): + return not self == other + + def __hash__(self): + try: + return self._hash + except AttributeError: + self._hash = hash(tuple(self._cparts)) + return self._hash + + def __lt__(self, other): + if not isinstance(other, PurePath) or self._flavour is not other._flavour: + return NotImplemented + return self._cparts < other._cparts + + def __le__(self, other): + if not isinstance(other, PurePath) or self._flavour is not other._flavour: + return NotImplemented + return self._cparts <= other._cparts + + def __gt__(self, other): + if not isinstance(other, PurePath) or self._flavour is not other._flavour: + return NotImplemented + return self._cparts > other._cparts + + def __ge__(self, other): + if not isinstance(other, PurePath) or self._flavour is not other._flavour: + return NotImplemented + return self._cparts >= other._cparts + + drive = property(attrgetter('_drv'), + doc="""The drive prefix (letter or UNC path), if any.""") + + root = property(attrgetter('_root'), + doc="""The root of the path, if any.""") + + @property + def anchor(self): + """The concatenation of the drive and root, or ''.""" + anchor = self._drv + self._root + return anchor + + @property + def name(self): + """The final path component, if any.""" + parts = self._parts + if len(parts) == (1 if (self._drv or self._root) else 0): + return '' + return parts[-1] + + @property + def suffix(self): + """The final component's last suffix, if any.""" + name = self.name + i = name.rfind('.') + if 0 < i < len(name) - 1: + return name[i:] + else: + return '' + + @property + def suffixes(self): + """A list of the final component's suffixes, if any.""" + name = self.name + if name.endswith('.'): + return [] + name = name.lstrip('.') + return ['.' + suffix for suffix in name.split('.')[1:]] + + @property + def stem(self): + """The final path component, minus its last suffix.""" + name = self.name + i = name.rfind('.') + if 0 < i < len(name) - 1: + return name[:i] + else: + return name + + def with_name(self, name): + """Return a new path with the file name changed.""" + if not self.name: + raise ValueError("%r has an empty name" % (self,)) + return self._from_parsed_parts(self._drv, self._root, + self._parts[:-1] + [name]) + + def with_suffix(self, suffix): + """Return a new path with the file suffix changed (or added, if none).""" + # XXX if suffix is None, should the current suffix be removed? + name = self.name + if not name: + raise ValueError("%r has an empty name" % (self,)) + old_suffix = self.suffix + if not old_suffix: + name = name + suffix + else: + name = name[:-len(old_suffix)] + suffix + return self._from_parsed_parts(self._drv, self._root, + self._parts[:-1] + [name]) + + def relative_to(self, *other): + """Return the relative path to another path identified by the passed + arguments. If the operation is not possible (because this is not + a subpath of the other path), raise ValueError. + """ + # For the purpose of this method, drive and root are considered + # separate parts, i.e.: + # Path('c:/').relative_to('c:') gives Path('/') + # Path('c:/').relative_to('/') raise ValueError + if not other: + raise TypeError("need at least one argument") + parts = self._parts + drv = self._drv + root = self._root + if drv or root: + if root: + abs_parts = [drv, root] + parts[1:] + else: + abs_parts = [drv] + parts[1:] + else: + abs_parts = parts + to_drv, to_root, to_parts = self._parse_args(other) + if to_drv or to_root: + if to_root: + to_abs_parts = [to_drv, to_root] + to_parts[1:] + else: + to_abs_parts = [to_drv] + to_parts[1:] + else: + to_abs_parts = to_parts + n = len(to_abs_parts) + if n == 0 and (drv or root) or abs_parts[:n] != to_abs_parts: + formatted = self._format_parsed_parts(to_drv, to_root, to_parts) + raise ValueError("{!r} does not start with {!r}" + .format(str(self), str(formatted))) + return self._from_parsed_parts('', '', abs_parts[n:]) + + @property + def parts(self): + """An object providing sequence-like access to the + components in the filesystem path.""" + # We cache the tuple to avoid building a new one each time .parts + # is accessed. XXX is this necessary? + try: + return self._pparts + except AttributeError: + self._pparts = tuple(self._parts) + return self._pparts + + def joinpath(self, *args): + """Combine this path with one or several arguments, and return a + new path representing either a subpath (if all arguments are relative + paths) or a totally different path (if one of the arguments is + anchored). + """ + return self._make_child(args) + + def __truediv__(self, key): + return self._make_child((key,)) + + def __rtruediv__(self, key): + return self._from_parts([key] + self._parts) + + @property + def parent(self): + """The logical parent of the path.""" + drv = self._drv + root = self._root + parts = self._parts + if len(parts) == 1 and (drv or root): + return self + return self._from_parsed_parts(drv, root, parts[:-1]) + + @property + def parents(self): + """A sequence of this path's logical parents.""" + return _PathParents(self) + + def is_absolute(self): + """True if the path is absolute (has both a root and, if applicable, + a drive).""" + if not self._root: + return False + return not self._flavour.has_drv or bool(self._drv) + + def is_reserved(self): + """Return True if the path contains one of the special names reserved + by the system, if any.""" + return self._flavour.is_reserved(self._parts) + + def match(self, path_pattern): + """ + Return True if this path matches the given pattern. + """ + cf = self._flavour.casefold + path_pattern = cf(path_pattern) + drv, root, pat_parts = self._flavour.parse_parts((path_pattern,)) + if not pat_parts: + raise ValueError("empty pattern") + if drv and drv != cf(self._drv): + return False + if root and root != cf(self._root): + return False + parts = self._cparts + if drv or root: + if len(pat_parts) != len(parts): + return False + pat_parts = pat_parts[1:] + elif len(pat_parts) > len(parts): + return False + for part, pat in zip(reversed(parts), reversed(pat_parts)): + if not fnmatch.fnmatchcase(part, pat): + return False + return True + + +class PurePosixPath(PurePath): + _flavour = _posix_flavour + __slots__ = () + + +class PureWindowsPath(PurePath): + _flavour = _windows_flavour + __slots__ = () + + +# Filesystem-accessing classes + + +class Path(PurePath): + __slots__ = ( + '_accessor', + '_closed', + ) + + def __new__(cls, *args, **kwargs): + if cls is Path: + cls = WindowsPath if os.name == 'nt' else PosixPath + self = cls._from_parts(args, init=False) + if not self._flavour.is_supported: + raise NotImplementedError("cannot instantiate %r on your system" + % (cls.__name__,)) + self._init() + return self + + def _init(self, + # Private non-constructor arguments + template=None, + ): + self._closed = False + if template is not None: + self._accessor = template._accessor + else: + self._accessor = _normal_accessor + + def _make_child_relpath(self, part): + # This is an optimization used for dir walking. `part` must be + # a single part relative to this path. + parts = self._parts + [part] + return self._from_parsed_parts(self._drv, self._root, parts) + + def __enter__(self): + if self._closed: + self._raise_closed() + return self + + def __exit__(self, t, v, tb): + self._closed = True + + def _raise_closed(self): + raise ValueError("I/O operation on closed path") + + def _opener(self, name, flags, mode=0o666): + # A stub for the opener argument to built-in open() + return self._accessor.open(self, flags, mode) + + # Public API + + @classmethod + def cwd(cls): + """Return a new path pointing to the current working directory + (as returned by os.getcwd()). + """ + return cls(os.getcwd()) + + def iterdir(self): + """Iterate over the files in this directory. Does not yield any + result for the special paths '.' and '..'. + """ + if self._closed: + self._raise_closed() + for name in self._accessor.listdir(self): + if name in {'.', '..'}: + # Yielding a path object for these makes little sense + continue + yield self._make_child_relpath(name) + if self._closed: + self._raise_closed() + + def glob(self, pattern): + """Iterate over this subtree and yield all existing files (of any + kind, including directories) matching the given pattern. + """ + pattern = self._flavour.casefold(pattern) + drv, root, pattern_parts = self._flavour.parse_parts((pattern,)) + if drv or root: + raise NotImplementedError("Non-relative patterns are unsupported") + selector = _make_selector(tuple(pattern_parts)) + for p in selector.select_from(self): + yield p + + def rglob(self, pattern): + """Recursively yield all existing files (of any kind, including + directories) matching the given pattern, anywhere in this subtree. + """ + pattern = self._flavour.casefold(pattern) + drv, root, pattern_parts = self._flavour.parse_parts((pattern,)) + if drv or root: + raise NotImplementedError("Non-relative patterns are unsupported") + selector = _make_selector(("**",) + tuple(pattern_parts)) + for p in selector.select_from(self): + yield p + + def absolute(self): + """Return an absolute version of this path. This function works + even if the path doesn't point to anything. + + No normalization is done, i.e. all '.' and '..' will be kept along. + Use resolve() to get the canonical path to a file. + """ + # XXX untested yet! + if self._closed: + self._raise_closed() + if self.is_absolute(): + return self + # FIXME this must defer to the specific flavour (and, under Windows, + # use nt._getfullpathname()) + obj = self._from_parts([os.getcwd()] + self._parts, init=False) + obj._init(template=self) + return obj + + def resolve(self): + """ + Make the path absolute, resolving all symlinks on the way and also + normalizing it (for example turning slashes into backslashes under + Windows). + """ + if self._closed: + self._raise_closed() + s = self._flavour.resolve(self) + if s is None: + # No symlink resolution => for consistency, raise an error if + # the path doesn't exist or is forbidden + self.stat() + s = str(self.absolute()) + # Now we have no symlinks in the path, it's safe to normalize it. + normed = self._flavour.pathmod.normpath(s) + obj = self._from_parts((normed,), init=False) + obj._init(template=self) + return obj + + def stat(self): + """ + Return the result of the stat() system call on this path, like + os.stat() does. + """ + return self._accessor.stat(self) + + def owner(self): + """ + Return the login name of the file owner. + """ + import pwd + return pwd.getpwuid(self.stat().st_uid).pw_name + + def group(self): + """ + Return the group name of the file gid. + """ + import grp + return grp.getgrgid(self.stat().st_gid).gr_name + + def _raw_open(self, flags, mode=0o777): + """ + Open the file pointed by this path and return a file descriptor, + as os.open() does. + """ + if self._closed: + self._raise_closed() + return self._accessor.open(self, flags, mode) + + def open(self, mode='r', buffering=-1, encoding=None, + errors=None, newline=None): + """ + Open the file pointed by this path and return a file object, as + the built-in open() function does. + """ + if self._closed: + self._raise_closed() + return io.open(str(self), mode, buffering, encoding, errors, newline, + opener=self._opener) + + def touch(self, mode=0o666, exist_ok=True): + """ + Create this file with the given access mode, if it doesn't exist. + """ + if self._closed: + self._raise_closed() + if exist_ok: + # First try to bump modification time + # Implementation note: GNU touch uses the UTIME_NOW option of + # the utimensat() / futimens() functions. + t = time.time() + try: + self._accessor.utime(self, (t, t)) + except OSError: + # Avoid exception chaining + pass + else: + return + flags = os.O_CREAT | os.O_WRONLY + if not exist_ok: + flags |= os.O_EXCL + fd = self._raw_open(flags, mode) + os.close(fd) + + def mkdir(self, mode=0o777, parents=False): + if self._closed: + self._raise_closed() + if not parents: + self._accessor.mkdir(self, mode) + else: + try: + self._accessor.mkdir(self, mode) + except OSError as e: + if e.errno != ENOENT: + raise + self.parent.mkdir(mode, True) + self._accessor.mkdir(self, mode) + + def chmod(self, mode): + """ + Change the permissions of the path, like os.chmod(). + """ + if self._closed: + self._raise_closed() + self._accessor.chmod(self, mode) + + def lchmod(self, mode): + """ + Like chmod(), except if the path points to a symlink, the symlink's + permissions are changed, rather than its target's. + """ + if self._closed: + self._raise_closed() + self._accessor.lchmod(self, mode) + + def unlink(self): + """ + Remove this file or link. + If the path is a directory, use rmdir() instead. + """ + if self._closed: + self._raise_closed() + self._accessor.unlink(self) + + def rmdir(self): + """ + Remove this directory. The directory must be empty. + """ + if self._closed: + self._raise_closed() + self._accessor.rmdir(self) + + def lstat(self): + """ + Like stat(), except if the path points to a symlink, the symlink's + status information is returned, rather than its target's. + """ + if self._closed: + self._raise_closed() + return self._accessor.lstat(self) + + def rename(self, target): + """ + Rename this path to the given path. + """ + if self._closed: + self._raise_closed() + self._accessor.rename(self, target) + + def replace(self, target): + """ + Rename this path to the given path, clobbering the existing + destination if it exists. + """ + if self._closed: + self._raise_closed() + self._accessor.replace(self, target) + + def symlink_to(self, target, target_is_directory=False): + """ + Make this path a symlink pointing to the given path. + Note the order of arguments (self, target) is the reverse of os.symlink's. + """ + if self._closed: + self._raise_closed() + self._accessor.symlink(target, self, target_is_directory) + + # Convenience functions for querying the stat results + + def exists(self): + """ + Whether this path exists. + """ + try: + self.stat() + except OSError as e: + if e.errno != ENOENT: + raise + return False + return True + + def is_dir(self): + """ + Whether this path is a directory. + """ + try: + return S_ISDIR(self.stat().st_mode) + except OSError as e: + if e.errno != ENOENT: + raise + # Path doesn't exist or is a broken symlink + # (see https://bitbucket.org/pitrou/pathlib/issue/12/) + return False + + def is_file(self): + """ + Whether this path is a regular file (also True for symlinks pointing + to regular files). + """ + try: + return S_ISREG(self.stat().st_mode) + except OSError as e: + if e.errno != ENOENT: + raise + # Path doesn't exist or is a broken symlink + # (see https://bitbucket.org/pitrou/pathlib/issue/12/) + return False + + def is_symlink(self): + """ + Whether this path is a symbolic link. + """ + try: + return S_ISLNK(self.lstat().st_mode) + except OSError as e: + if e.errno != ENOENT: + raise + # Path doesn't exist + return False + + def is_block_device(self): + """ + Whether this path is a block device. + """ + try: + return S_ISBLK(self.stat().st_mode) + except OSError as e: + if e.errno != ENOENT: + raise + # Path doesn't exist or is a broken symlink + # (see https://bitbucket.org/pitrou/pathlib/issue/12/) + return False + + def is_char_device(self): + """ + Whether this path is a character device. + """ + try: + return S_ISCHR(self.stat().st_mode) + except OSError as e: + if e.errno != ENOENT: + raise + # Path doesn't exist or is a broken symlink + # (see https://bitbucket.org/pitrou/pathlib/issue/12/) + return False + + def is_fifo(self): + """ + Whether this path is a FIFO. + """ + try: + return S_ISFIFO(self.stat().st_mode) + except OSError as e: + if e.errno != ENOENT: + raise + # Path doesn't exist or is a broken symlink + # (see https://bitbucket.org/pitrou/pathlib/issue/12/) + return False + + def is_socket(self): + """ + Whether this path is a socket. + """ + try: + return S_ISSOCK(self.stat().st_mode) + except OSError as e: + if e.errno != ENOENT: + raise + # Path doesn't exist or is a broken symlink + # (see https://bitbucket.org/pitrou/pathlib/issue/12/) + return False + + +class PosixPath(Path, PurePosixPath): + __slots__ = () + +class WindowsPath(Path, PureWindowsPath): + __slots__ = () + |