summaryrefslogtreecommitdiffstats
path: root/Lib/importlib
diff options
context:
space:
mode:
authorJason R. Coombs <jaraco@jaraco.com>2022-10-16 19:00:39 (GMT)
committerGitHub <noreply@github.com>2022-10-16 19:00:39 (GMT)
commitcea910ebf14d1bd9d8bc0c8a5046e69ae8f5be17 (patch)
treed8f509aa9e5bbb452117ce6eb5de2714171edeb4 /Lib/importlib
parent5c9302d03a57759225dab9e2d7bdd38e79009441 (diff)
downloadcpython-cea910ebf14d1bd9d8bc0c8a5046e69ae8f5be17.zip
cpython-cea910ebf14d1bd9d8bc0c8a5046e69ae8f5be17.tar.gz
cpython-cea910ebf14d1bd9d8bc0c8a5046e69ae8f5be17.tar.bz2
gh-97930: Merge with importlib_resources 5.9 (GH-97929)
* Merge with importlib_resources 5.9 * Update changelog
Diffstat (limited to 'Lib/importlib')
-rw-r--r--Lib/importlib/resources/_common.py62
-rw-r--r--Lib/importlib/resources/abc.py23
-rw-r--r--Lib/importlib/resources/readers.py16
-rw-r--r--Lib/importlib/resources/simple.py14
4 files changed, 86 insertions, 29 deletions
diff --git a/Lib/importlib/resources/_common.py b/Lib/importlib/resources/_common.py
index ca1fa8a..8c0be7e 100644
--- a/Lib/importlib/resources/_common.py
+++ b/Lib/importlib/resources/_common.py
@@ -67,10 +67,14 @@ def from_package(package):
@contextlib.contextmanager
-def _tempfile(reader, suffix='',
- # gh-93353: Keep a reference to call os.remove() in late Python
- # finalization.
- *, _os_remove=os.remove):
+def _tempfile(
+ reader,
+ suffix='',
+ # gh-93353: Keep a reference to call os.remove() in late Python
+ # finalization.
+ *,
+ _os_remove=os.remove,
+):
# Not using tempfile.NamedTemporaryFile as it leads to deeper 'try'
# blocks due to the need to close the temporary file to work on Windows
# properly.
@@ -89,13 +93,30 @@ def _tempfile(reader, suffix='',
pass
+def _temp_file(path):
+ return _tempfile(path.read_bytes, suffix=path.name)
+
+
+def _is_present_dir(path: Traversable) -> bool:
+ """
+ Some Traversables implement ``is_dir()`` to raise an
+ exception (i.e. ``FileNotFoundError``) when the
+ directory doesn't exist. This function wraps that call
+ to always return a boolean and only return True
+ if there's a dir and it exists.
+ """
+ with contextlib.suppress(FileNotFoundError):
+ return path.is_dir()
+ return False
+
+
@functools.singledispatch
def as_file(path):
"""
Given a Traversable object, return that object as a
path on the local file system in a context manager.
"""
- return _tempfile(path.read_bytes, suffix=path.name)
+ return _temp_dir(path) if _is_present_dir(path) else _temp_file(path)
@as_file.register(pathlib.Path)
@@ -105,3 +126,34 @@ def _(path):
Degenerate behavior for pathlib.Path objects.
"""
yield path
+
+
+@contextlib.contextmanager
+def _temp_path(dir: tempfile.TemporaryDirectory):
+ """
+ Wrap tempfile.TemporyDirectory to return a pathlib object.
+ """
+ with dir as result:
+ yield pathlib.Path(result)
+
+
+@contextlib.contextmanager
+def _temp_dir(path):
+ """
+ Given a traversable dir, recursively replicate the whole tree
+ to the file system in a context manager.
+ """
+ assert path.is_dir()
+ with _temp_path(tempfile.TemporaryDirectory()) as temp_dir:
+ yield _write_contents(temp_dir, path)
+
+
+def _write_contents(target, source):
+ child = target.joinpath(source.name)
+ if source.is_dir():
+ child.mkdir()
+ for item in source.iterdir():
+ _write_contents(child, item)
+ else:
+ child.open('wb').write(source.read_bytes())
+ return child
diff --git a/Lib/importlib/resources/abc.py b/Lib/importlib/resources/abc.py
index 0b7bfdc..67c78c0 100644
--- a/Lib/importlib/resources/abc.py
+++ b/Lib/importlib/resources/abc.py
@@ -1,6 +1,8 @@
import abc
import io
+import itertools
import os
+import pathlib
from typing import Any, BinaryIO, Iterable, Iterator, NoReturn, Text, Optional
from typing import runtime_checkable, Protocol
from typing import Union
@@ -53,6 +55,10 @@ class ResourceReader(metaclass=abc.ABCMeta):
raise FileNotFoundError
+class TraversalError(Exception):
+ pass
+
+
@runtime_checkable
class Traversable(Protocol):
"""
@@ -95,7 +101,6 @@ class Traversable(Protocol):
Return True if self is a file
"""
- @abc.abstractmethod
def joinpath(self, *descendants: StrPath) -> "Traversable":
"""
Return Traversable resolved with any descendants applied.
@@ -104,6 +109,22 @@ class Traversable(Protocol):
and each may contain multiple levels separated by
``posixpath.sep`` (``/``).
"""
+ if not descendants:
+ return self
+ names = itertools.chain.from_iterable(
+ path.parts for path in map(pathlib.PurePosixPath, descendants)
+ )
+ target = next(names)
+ matches = (
+ traversable for traversable in self.iterdir() if traversable.name == target
+ )
+ try:
+ match = next(matches)
+ except StopIteration:
+ raise TraversalError(
+ "Target not found during traversal.", target, list(names)
+ )
+ return match.joinpath(*names)
def __truediv__(self, child: StrPath) -> "Traversable":
"""
diff --git a/Lib/importlib/resources/readers.py b/Lib/importlib/resources/readers.py
index b470a20..80cb320 100644
--- a/Lib/importlib/resources/readers.py
+++ b/Lib/importlib/resources/readers.py
@@ -82,15 +82,13 @@ class MultiplexedPath(abc.Traversable):
def is_file(self):
return False
- def joinpath(self, child):
- # first try to find child in current paths
- for file in self.iterdir():
- if file.name == child:
- return file
- # if it does not exist, construct it with the first path
- return self._paths[0] / child
-
- __truediv__ = joinpath
+ def joinpath(self, *descendants):
+ try:
+ return super().joinpath(*descendants)
+ except abc.TraversalError:
+ # One of the paths did not resolve (a directory does not exist).
+ # Just return something that will not exist.
+ return self._paths[0].joinpath(*descendants)
def open(self, *args, **kwargs):
raise FileNotFoundError(f'{self} is not a file')
diff --git a/Lib/importlib/resources/simple.py b/Lib/importlib/resources/simple.py
index d0fbf23..b85e469 100644
--- a/Lib/importlib/resources/simple.py
+++ b/Lib/importlib/resources/simple.py
@@ -99,20 +99,6 @@ class ResourceContainer(Traversable):
def open(self, *args, **kwargs):
raise IsADirectoryError()
- @staticmethod
- def _flatten(compound_names):
- for name in compound_names:
- yield from name.split('/')
-
- def joinpath(self, *descendants):
- if not descendants:
- return self
- names = self._flatten(descendants)
- target = next(names)
- return next(
- traversable for traversable in self.iterdir() if traversable.name == target
- ).joinpath(*names)
-
class TraversableReader(TraversableResources, SimpleReader):
"""