diff options
author | Jason R. Coombs <jaraco@jaraco.com> | 2022-10-16 19:00:39 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-10-16 19:00:39 (GMT) |
commit | cea910ebf14d1bd9d8bc0c8a5046e69ae8f5be17 (patch) | |
tree | d8f509aa9e5bbb452117ce6eb5de2714171edeb4 /Lib/importlib | |
parent | 5c9302d03a57759225dab9e2d7bdd38e79009441 (diff) | |
download | cpython-cea910ebf14d1bd9d8bc0c8a5046e69ae8f5be17.zip cpython-cea910ebf14d1bd9d8bc0c8a5046e69ae8f5be17.tar.gz cpython-cea910ebf14d1bd9d8bc0c8a5046e69ae8f5be17.tar.bz2 |
gh-97930: Merge with importlib_resources 5.9 (GH-97929)
* Merge with importlib_resources 5.9
* Update changelog
Diffstat (limited to 'Lib/importlib')
-rw-r--r-- | Lib/importlib/resources/_common.py | 62 | ||||
-rw-r--r-- | Lib/importlib/resources/abc.py | 23 | ||||
-rw-r--r-- | Lib/importlib/resources/readers.py | 16 | ||||
-rw-r--r-- | Lib/importlib/resources/simple.py | 14 |
4 files changed, 86 insertions, 29 deletions
diff --git a/Lib/importlib/resources/_common.py b/Lib/importlib/resources/_common.py index ca1fa8a..8c0be7e 100644 --- a/Lib/importlib/resources/_common.py +++ b/Lib/importlib/resources/_common.py @@ -67,10 +67,14 @@ def from_package(package): @contextlib.contextmanager -def _tempfile(reader, suffix='', - # gh-93353: Keep a reference to call os.remove() in late Python - # finalization. - *, _os_remove=os.remove): +def _tempfile( + reader, + suffix='', + # gh-93353: Keep a reference to call os.remove() in late Python + # finalization. + *, + _os_remove=os.remove, +): # Not using tempfile.NamedTemporaryFile as it leads to deeper 'try' # blocks due to the need to close the temporary file to work on Windows # properly. @@ -89,13 +93,30 @@ def _tempfile(reader, suffix='', pass +def _temp_file(path): + return _tempfile(path.read_bytes, suffix=path.name) + + +def _is_present_dir(path: Traversable) -> bool: + """ + Some Traversables implement ``is_dir()`` to raise an + exception (i.e. ``FileNotFoundError``) when the + directory doesn't exist. This function wraps that call + to always return a boolean and only return True + if there's a dir and it exists. + """ + with contextlib.suppress(FileNotFoundError): + return path.is_dir() + return False + + @functools.singledispatch def as_file(path): """ Given a Traversable object, return that object as a path on the local file system in a context manager. """ - return _tempfile(path.read_bytes, suffix=path.name) + return _temp_dir(path) if _is_present_dir(path) else _temp_file(path) @as_file.register(pathlib.Path) @@ -105,3 +126,34 @@ def _(path): Degenerate behavior for pathlib.Path objects. """ yield path + + +@contextlib.contextmanager +def _temp_path(dir: tempfile.TemporaryDirectory): + """ + Wrap tempfile.TemporyDirectory to return a pathlib object. + """ + with dir as result: + yield pathlib.Path(result) + + +@contextlib.contextmanager +def _temp_dir(path): + """ + Given a traversable dir, recursively replicate the whole tree + to the file system in a context manager. + """ + assert path.is_dir() + with _temp_path(tempfile.TemporaryDirectory()) as temp_dir: + yield _write_contents(temp_dir, path) + + +def _write_contents(target, source): + child = target.joinpath(source.name) + if source.is_dir(): + child.mkdir() + for item in source.iterdir(): + _write_contents(child, item) + else: + child.open('wb').write(source.read_bytes()) + return child diff --git a/Lib/importlib/resources/abc.py b/Lib/importlib/resources/abc.py index 0b7bfdc..67c78c0 100644 --- a/Lib/importlib/resources/abc.py +++ b/Lib/importlib/resources/abc.py @@ -1,6 +1,8 @@ import abc import io +import itertools import os +import pathlib from typing import Any, BinaryIO, Iterable, Iterator, NoReturn, Text, Optional from typing import runtime_checkable, Protocol from typing import Union @@ -53,6 +55,10 @@ class ResourceReader(metaclass=abc.ABCMeta): raise FileNotFoundError +class TraversalError(Exception): + pass + + @runtime_checkable class Traversable(Protocol): """ @@ -95,7 +101,6 @@ class Traversable(Protocol): Return True if self is a file """ - @abc.abstractmethod def joinpath(self, *descendants: StrPath) -> "Traversable": """ Return Traversable resolved with any descendants applied. @@ -104,6 +109,22 @@ class Traversable(Protocol): and each may contain multiple levels separated by ``posixpath.sep`` (``/``). """ + if not descendants: + return self + names = itertools.chain.from_iterable( + path.parts for path in map(pathlib.PurePosixPath, descendants) + ) + target = next(names) + matches = ( + traversable for traversable in self.iterdir() if traversable.name == target + ) + try: + match = next(matches) + except StopIteration: + raise TraversalError( + "Target not found during traversal.", target, list(names) + ) + return match.joinpath(*names) def __truediv__(self, child: StrPath) -> "Traversable": """ diff --git a/Lib/importlib/resources/readers.py b/Lib/importlib/resources/readers.py index b470a20..80cb320 100644 --- a/Lib/importlib/resources/readers.py +++ b/Lib/importlib/resources/readers.py @@ -82,15 +82,13 @@ class MultiplexedPath(abc.Traversable): def is_file(self): return False - def joinpath(self, child): - # first try to find child in current paths - for file in self.iterdir(): - if file.name == child: - return file - # if it does not exist, construct it with the first path - return self._paths[0] / child - - __truediv__ = joinpath + def joinpath(self, *descendants): + try: + return super().joinpath(*descendants) + except abc.TraversalError: + # One of the paths did not resolve (a directory does not exist). + # Just return something that will not exist. + return self._paths[0].joinpath(*descendants) def open(self, *args, **kwargs): raise FileNotFoundError(f'{self} is not a file') diff --git a/Lib/importlib/resources/simple.py b/Lib/importlib/resources/simple.py index d0fbf23..b85e469 100644 --- a/Lib/importlib/resources/simple.py +++ b/Lib/importlib/resources/simple.py @@ -99,20 +99,6 @@ class ResourceContainer(Traversable): def open(self, *args, **kwargs): raise IsADirectoryError() - @staticmethod - def _flatten(compound_names): - for name in compound_names: - yield from name.split('/') - - def joinpath(self, *descendants): - if not descendants: - return self - names = self._flatten(descendants) - target = next(names) - return next( - traversable for traversable in self.iterdir() if traversable.name == target - ).joinpath(*names) - class TraversableReader(TraversableResources, SimpleReader): """ |