diff options
author | Barney Gale <barney.gale@gmail.com> | 2025-03-11 20:54:22 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-03-11 20:54:22 (GMT) |
commit | ad90c5fabc415d4e46205947cceda82893ec1460 (patch) | |
tree | a1800020ac7bcf49d355dd5faf972fa1d0876bbb /Lib/test/test_pathlib/support/zip_path.py | |
parent | 24070492cfee2cd698009418f70cc3755dbd0b99 (diff) | |
download | cpython-ad90c5fabc415d4e46205947cceda82893ec1460.zip cpython-ad90c5fabc415d4e46205947cceda82893ec1460.tar.gz cpython-ad90c5fabc415d4e46205947cceda82893ec1460.tar.bz2 |
GH-130614: pathlib ABCs: revise test suite for readable paths (#131018)
Test `pathlib.types._ReadablePath` in a dedicated test module. These tests
cover `ReadableZipPath`, `ReadableLocalPath` and `Path`, where the former
two classes are implementations of `_ReadablePath` for use in tests.
Diffstat (limited to 'Lib/test/test_pathlib/support/zip_path.py')
-rw-r--r-- | Lib/test/test_pathlib/support/zip_path.py | 278 |
1 files changed, 278 insertions, 0 deletions
diff --git a/Lib/test/test_pathlib/support/zip_path.py b/Lib/test/test_pathlib/support/zip_path.py new file mode 100644 index 0000000..ab6a929 --- /dev/null +++ b/Lib/test/test_pathlib/support/zip_path.py @@ -0,0 +1,278 @@ +""" +Implementation of ReadablePath for zip file members, for use in pathlib tests. + +ZipPathGround is also defined here. It helps establish the "ground truth" +about zip file members in tests. +""" + +import errno +import io +import pathlib.types +import posixpath +import stat +import zipfile +from stat import S_IFMT, S_ISDIR, S_ISREG, S_ISLNK + + +class ZipPathGround: + can_symlink = True + + def __init__(self, path_cls): + self.path_cls = path_cls + + def setup(self, local_suffix=""): + return self.path_cls(zip_file=zipfile.ZipFile(io.BytesIO(), "w")) + + def teardown(self, root): + root.zip_file.close() + + def create_file(self, path, data=b''): + path.zip_file.writestr(str(path), data) + + def create_dir(self, path): + path.zip_file.mkdir(str(path)) + + def create_symlink(self, path, target): + zip_info = zipfile.ZipInfo(str(path)) + zip_info.external_attr = stat.S_IFLNK << 16 + path.zip_file.writestr(zip_info, target.encode()) + + def create_hierarchy(self, p): + # Add regular files + self.create_file(p.joinpath('fileA'), b'this is file A\n') + self.create_file(p.joinpath('dirB/fileB'), b'this is file B\n') + self.create_file(p.joinpath('dirC/fileC'), b'this is file C\n') + self.create_file(p.joinpath('dirC/dirD/fileD'), b'this is file D\n') + self.create_file(p.joinpath('dirC/novel.txt'), b'this is a novel\n') + # Add symlinks + self.create_symlink(p.joinpath('linkA'), 'fileA') + self.create_symlink(p.joinpath('linkB'), 'dirB') + self.create_symlink(p.joinpath('dirA/linkC'), '../dirB') + self.create_symlink(p.joinpath('brokenLink'), 'non-existing') + self.create_symlink(p.joinpath('brokenLinkLoop'), 'brokenLinkLoop') + + def readtext(self, p): + with p.zip_file.open(str(p), 'r') as f: + f = io.TextIOWrapper(f) + return f.read() + + def readbytes(self, p): + with p.zip_file.open(str(p), 'r') as f: + return f.read() + + readlink = readtext + + def isdir(self, p): + path_str = str(p) + "/" + return path_str in p.zip_file.NameToInfo + + def isfile(self, p): + info = p.zip_file.NameToInfo.get(str(p)) + if info is None: + return False + return not stat.S_ISLNK(info.external_attr >> 16) + + def islink(self, p): + info = p.zip_file.NameToInfo.get(str(p)) + if info is None: + return False + return stat.S_ISLNK(info.external_attr >> 16) + + +class MissingZipPathInfo: + """ + PathInfo implementation that is used when a zip file member is missing. + """ + __slots__ = () + + def exists(self, follow_symlinks=True): + return False + + def is_dir(self, follow_symlinks=True): + return False + + def is_file(self, follow_symlinks=True): + return False + + def is_symlink(self): + return False + + def resolve(self): + return self + + +missing_zip_path_info = MissingZipPathInfo() + + +class ZipPathInfo: + """ + PathInfo implementation for an existing zip file member. + """ + __slots__ = ('zip_file', 'zip_info', 'parent', 'children') + + def __init__(self, zip_file, parent=None): + self.zip_file = zip_file + self.zip_info = None + self.parent = parent or self + self.children = {} + + def exists(self, follow_symlinks=True): + if follow_symlinks and self.is_symlink(): + return self.resolve().exists() + return True + + def is_dir(self, follow_symlinks=True): + if follow_symlinks and self.is_symlink(): + return self.resolve().is_dir() + elif self.zip_info is None: + return True + elif fmt := S_IFMT(self.zip_info.external_attr >> 16): + return S_ISDIR(fmt) + else: + return self.zip_info.filename.endswith('/') + + def is_file(self, follow_symlinks=True): + if follow_symlinks and self.is_symlink(): + return self.resolve().is_file() + elif self.zip_info is None: + return False + elif fmt := S_IFMT(self.zip_info.external_attr >> 16): + return S_ISREG(fmt) + else: + return not self.zip_info.filename.endswith('/') + + def is_symlink(self): + if self.zip_info is None: + return False + elif fmt := S_IFMT(self.zip_info.external_attr >> 16): + return S_ISLNK(fmt) + else: + return False + + def resolve(self, path=None, create=False, follow_symlinks=True): + """ + Traverse zip hierarchy (parents, children and symlinks) starting + from this PathInfo. This is called from three places: + + - When a zip file member is added to ZipFile.filelist, this method + populates the ZipPathInfo tree (using create=True). + - When ReadableZipPath.info is accessed, this method is finds a + ZipPathInfo entry for the path without resolving any final symlink + (using follow_symlinks=False) + - When ZipPathInfo methods are called with follow_symlinks=True, this + method resolves any symlink in the final path position. + """ + link_count = 0 + stack = path.split('/')[::-1] if path else [] + info = self + while True: + if info.is_symlink() and (follow_symlinks or stack): + link_count += 1 + if link_count >= 40: + return missing_zip_path_info # Symlink loop! + path = info.zip_file.read(info.zip_info).decode() + stack += path.split('/')[::-1] if path else [] + info = info.parent + + if stack: + name = stack.pop() + else: + return info + + if name == '..': + info = info.parent + elif name and name != '.': + if name not in info.children: + if create: + info.children[name] = ZipPathInfo(info.zip_file, info) + else: + return missing_zip_path_info # No such child! + info = info.children[name] + + +class ZipFileList: + """ + `list`-like object that we inject as `ZipFile.filelist`. We maintain a + tree of `ZipPathInfo` objects representing the zip file members. + """ + + __slots__ = ('tree', '_items') + + def __init__(self, zip_file): + self.tree = ZipPathInfo(zip_file) + self._items = [] + for item in zip_file.filelist: + self.append(item) + + def __len__(self): + return len(self._items) + + def __iter__(self): + return iter(self._items) + + def append(self, item): + self._items.append(item) + self.tree.resolve(item.filename, create=True).zip_info = item + + +class ReadableZipPath(pathlib.types._ReadablePath): + """ + Simple implementation of a ReadablePath class for .zip files. + """ + + __slots__ = ('_segments', 'zip_file') + parser = posixpath + + def __init__(self, *pathsegments, zip_file): + self._segments = pathsegments + self.zip_file = zip_file + if not isinstance(zip_file.filelist, ZipFileList): + zip_file.filelist = ZipFileList(zip_file) + + def __hash__(self): + return hash((str(self), self.zip_file)) + + def __eq__(self, other): + if not isinstance(other, ReadableZipPath): + return NotImplemented + return str(self) == str(other) and self.zip_file is other.zip_file + + def __str__(self): + if not self._segments: + return '' + return self.parser.join(*self._segments) + + def __repr__(self): + return f'{type(self).__name__}({str(self)!r}, zip_file={self.zip_file!r})' + + def with_segments(self, *pathsegments): + return type(self)(*pathsegments, zip_file=self.zip_file) + + @property + def info(self): + tree = self.zip_file.filelist.tree + return tree.resolve(str(self), follow_symlinks=False) + + def __open_rb__(self, buffering=-1): + info = self.info.resolve() + if not info.exists(): + raise FileNotFoundError(errno.ENOENT, "File not found", self) + elif info.is_dir(): + raise IsADirectoryError(errno.EISDIR, "Is a directory", self) + return self.zip_file.open(info.zip_info, 'r') + + def iterdir(self): + info = self.info.resolve() + if not info.exists(): + raise FileNotFoundError(errno.ENOENT, "File not found", self) + elif not info.is_dir(): + raise NotADirectoryError(errno.ENOTDIR, "Not a directory", self) + return (self / name for name in info.children) + + def readlink(self): + info = self.info + if not info.exists(): + raise FileNotFoundError(errno.ENOENT, "File not found", self) + elif not info.is_symlink(): + raise OSError(errno.EINVAL, "Not a symlink", self) + return self.with_segments(self.zip_file.read(info.zip_info).decode()) |