diff options
Diffstat (limited to 'Lib')
-rw-r--r-- | Lib/importlib/_bootstrap_external.py | 26 | ||||
-rw-r--r-- | Lib/importlib/_common.py | 82 | ||||
-rw-r--r-- | Lib/importlib/abc.py | 2 | ||||
-rw-r--r-- | Lib/importlib/readers.py | 30 | ||||
-rw-r--r-- | Lib/importlib/resources.py | 85 | ||||
-rw-r--r-- | Lib/zipimport.py | 78 |
6 files changed, 112 insertions, 191 deletions
diff --git a/Lib/importlib/_bootstrap_external.py b/Lib/importlib/_bootstrap_external.py index 25a3f8c..4f06039 100644 --- a/Lib/importlib/_bootstrap_external.py +++ b/Lib/importlib/_bootstrap_external.py @@ -982,32 +982,10 @@ class FileLoader: with _io.FileIO(path, 'r') as file: return file.read() - # ResourceReader ABC API. - @_check_name def get_resource_reader(self, module): - if self.is_package(module): - return self - return None - - def open_resource(self, resource): - path = _path_join(_path_split(self.path)[0], resource) - return _io.FileIO(path, 'r') - - def resource_path(self, resource): - if not self.is_resource(resource): - raise FileNotFoundError - path = _path_join(_path_split(self.path)[0], resource) - return path - - def is_resource(self, name): - if path_sep in name: - return False - path = _path_join(_path_split(self.path)[0], name) - return _path_isfile(path) - - def contents(self): - return iter(_os.listdir(_path_split(self.path)[0])) + from importlib.readers import FileReader + return FileReader(self) class SourceFileLoader(FileLoader, SourceLoader): diff --git a/Lib/importlib/_common.py b/Lib/importlib/_common.py index ba7cbac..b15c59e 100644 --- a/Lib/importlib/_common.py +++ b/Lib/importlib/_common.py @@ -1,38 +1,82 @@ import os import pathlib -import zipfile import tempfile import functools import contextlib +import types +import importlib +from typing import Union, Any, Optional +from .abc import ResourceReader -def from_package(package): +Package = Union[types.ModuleType, str] + + +def files(package): """ - Return a Traversable object for the given package. + Get a Traversable resource from a package + """ + return from_package(get_package(package)) + +def normalize_path(path): + # type: (Any) -> str + """Normalize a path by ensuring it is a string. + + If the resulting string contains path separators, an exception is raised. """ - spec = package.__spec__ - return from_traversable_resources(spec) or fallback_resources(spec) + str_path = str(path) + parent, file_name = os.path.split(str_path) + if parent: + raise ValueError('{!r} must be only a file name'.format(path)) + return file_name -def from_traversable_resources(spec): +def get_resource_reader(package): + # type: (types.ModuleType) -> Optional[ResourceReader] """ - If the spec.loader implements TraversableResources, - directly or implicitly, it will have a ``files()`` method. + Return the package's loader if it's a ResourceReader. """ - with contextlib.suppress(AttributeError): - return spec.loader.files() + # We can't use + # a issubclass() check here because apparently abc.'s __subclasscheck__() + # hook wants to create a weak reference to the object, but + # zipimport.zipimporter does not support weak references, resulting in a + # TypeError. That seems terrible. + spec = package.__spec__ + reader = getattr(spec.loader, 'get_resource_reader', None) + if reader is None: + return None + return reader(spec.name) -def fallback_resources(spec): - package_directory = pathlib.Path(spec.origin).parent - try: - archive_path = spec.loader.archive - rel_path = package_directory.relative_to(archive_path) - return zipfile.Path(archive_path, str(rel_path) + '/') - except Exception: - pass - return package_directory +def resolve(cand): + # type: (Package) -> types.ModuleType + return ( + cand if isinstance(cand, types.ModuleType) + else importlib.import_module(cand) + ) + + +def get_package(package): + # type: (Package) -> types.ModuleType + """Take a package name or module object and return the module. + + Raise an exception if the resolved module is not a package. + """ + resolved = resolve(package) + if resolved.__spec__.submodule_search_locations is None: + raise TypeError('{!r} is not a package'.format(package)) + return resolved + + +def from_package(package): + """ + Return a Traversable object for the given package. + + """ + spec = package.__spec__ + reader = spec.loader.get_resource_reader(spec.name) + return reader.files() @contextlib.contextmanager diff --git a/Lib/importlib/abc.py b/Lib/importlib/abc.py index b8a9bb1..0b20e7c 100644 --- a/Lib/importlib/abc.py +++ b/Lib/importlib/abc.py @@ -468,7 +468,7 @@ class TraversableResources(ResourceReader): raise FileNotFoundError(resource) def is_resource(self, path): - return self.files().joinpath(path).isfile() + return self.files().joinpath(path).is_file() def contents(self): return (item.name for item in self.files().iterdir()) diff --git a/Lib/importlib/readers.py b/Lib/importlib/readers.py new file mode 100644 index 0000000..fb49ebe --- /dev/null +++ b/Lib/importlib/readers.py @@ -0,0 +1,30 @@ +import zipfile +import pathlib +from . import abc + + +class FileReader(abc.TraversableResources): + def __init__(self, loader): + self.path = pathlib.Path(loader.path).parent + + def files(self): + return self.path + + +class ZipReader(FileReader): + def __init__(self, loader, module): + _, _, name = module.rpartition('.') + prefix = loader.prefix.replace('\\', '/') + name + '/' + self.path = zipfile.Path(loader.archive, prefix) + + def open_resource(self, resource): + try: + return super().open_resource(resource) + except KeyError as exc: + raise FileNotFoundError(exc.args[0]) + + def is_resource(self, path): + # workaround for `zipfile.Path.is_file` returning true + # for non-existent paths. + target = self.files().joinpath(path) + return target.is_file() and target.exists() diff --git a/Lib/importlib/resources.py b/Lib/importlib/resources.py index b803a01..4535619 100644 --- a/Lib/importlib/resources.py +++ b/Lib/importlib/resources.py @@ -1,15 +1,13 @@ import os -from . import abc as resources_abc from . import _common -from ._common import as_file +from ._common import as_file, files from contextlib import contextmanager, suppress -from importlib import import_module from importlib.abc import ResourceLoader from io import BytesIO, TextIOWrapper from pathlib import Path from types import ModuleType -from typing import ContextManager, Iterable, Optional, Union +from typing import ContextManager, Iterable, Union from typing import cast from typing.io import BinaryIO, TextIO @@ -33,60 +31,11 @@ Package = Union[str, ModuleType] Resource = Union[str, os.PathLike] -def _resolve(name) -> ModuleType: - """If name is a string, resolve to a module.""" - if hasattr(name, '__spec__'): - return name - return import_module(name) - - -def _get_package(package) -> ModuleType: - """Take a package name or module object and return the module. - - If a name, the module is imported. If the resolved module - object is not a package, raise an exception. - """ - module = _resolve(package) - if module.__spec__.submodule_search_locations is None: - raise TypeError('{!r} is not a package'.format(package)) - return module - - -def _normalize_path(path) -> str: - """Normalize a path by ensuring it is a string. - - If the resulting string contains path separators, an exception is raised. - """ - parent, file_name = os.path.split(path) - if parent: - raise ValueError('{!r} must be only a file name'.format(path)) - return file_name - - -def _get_resource_reader( - package: ModuleType) -> Optional[resources_abc.ResourceReader]: - # Return the package's loader if it's a ResourceReader. We can't use - # a issubclass() check here because apparently abc.'s __subclasscheck__() - # hook wants to create a weak reference to the object, but - # zipimport.zipimporter does not support weak references, resulting in a - # TypeError. That seems terrible. - spec = package.__spec__ - if hasattr(spec.loader, 'get_resource_reader'): - return cast(resources_abc.ResourceReader, - spec.loader.get_resource_reader(spec.name)) - return None - - -def _check_location(package): - if package.__spec__.origin is None or not package.__spec__.has_location: - raise FileNotFoundError(f'Package has no location {package!r}') - - def open_binary(package: Package, resource: Resource) -> BinaryIO: """Return a file-like object opened for binary reading of the resource.""" - resource = _normalize_path(resource) - package = _get_package(package) - reader = _get_resource_reader(package) + resource = _common.normalize_path(resource) + package = _common.get_package(package) + reader = _common.get_resource_reader(package) if reader is not None: return reader.open_resource(resource) absolute_package_path = os.path.abspath( @@ -140,13 +89,6 @@ def read_text(package: Package, return fp.read() -def files(package: Package) -> resources_abc.Traversable: - """ - Get a Traversable resource from a package - """ - return _common.from_package(_get_package(package)) - - def path( package: Package, resource: Resource, ) -> 'ContextManager[Path]': @@ -158,17 +100,18 @@ def path( raised if the file was deleted prior to the context manager exiting). """ - reader = _get_resource_reader(_get_package(package)) + reader = _common.get_resource_reader(_common.get_package(package)) return ( _path_from_reader(reader, resource) if reader else - _common.as_file(files(package).joinpath(_normalize_path(resource))) + _common.as_file( + _common.files(package).joinpath(_common.normalize_path(resource))) ) @contextmanager def _path_from_reader(reader, resource): - norm_resource = _normalize_path(resource) + norm_resource = _common.normalize_path(resource) with suppress(FileNotFoundError): yield Path(reader.resource_path(norm_resource)) return @@ -182,9 +125,9 @@ def is_resource(package: Package, name: str) -> bool: Directories are *not* resources. """ - package = _get_package(package) - _normalize_path(name) - reader = _get_resource_reader(package) + package = _common.get_package(package) + _common.normalize_path(name) + reader = _common.get_resource_reader(package) if reader is not None: return reader.is_resource(name) package_contents = set(contents(package)) @@ -200,8 +143,8 @@ def contents(package: Package) -> Iterable[str]: not considered resources. Use `is_resource()` on each entry returned here to check if it is a resource or not. """ - package = _get_package(package) - reader = _get_resource_reader(package) + package = _common.get_package(package) + reader = _common.get_resource_reader(package) if reader is not None: return reader.contents() # Is the package a namespace package? By definition, namespace packages diff --git a/Lib/zipimport.py b/Lib/zipimport.py index 5ef0a17..080e0c4 100644 --- a/Lib/zipimport.py +++ b/Lib/zipimport.py @@ -280,11 +280,8 @@ class zipimporter: return None except ZipImportError: return None - if not _ZipImportResourceReader._registered: - from importlib.abc import ResourceReader - ResourceReader.register(_ZipImportResourceReader) - _ZipImportResourceReader._registered = True - return _ZipImportResourceReader(self, fullname) + from importlib.readers import ZipReader + return ZipReader(self, fullname) def __repr__(self): @@ -719,74 +716,3 @@ def _get_module_code(self, fullname): return code, ispackage, modpath else: raise ZipImportError(f"can't find module {fullname!r}", name=fullname) - - -class _ZipImportResourceReader: - """Private class used to support ZipImport.get_resource_reader(). - - This class is allowed to reference all the innards and private parts of - the zipimporter. - """ - _registered = False - - def __init__(self, zipimporter, fullname): - self.zipimporter = zipimporter - self.fullname = fullname - - def open_resource(self, resource): - fullname_as_path = self.fullname.replace('.', '/') - path = f'{fullname_as_path}/{resource}' - from io import BytesIO - try: - return BytesIO(self.zipimporter.get_data(path)) - except OSError: - raise FileNotFoundError(path) - - def resource_path(self, resource): - # All resources are in the zip file, so there is no path to the file. - # Raising FileNotFoundError tells the higher level API to extract the - # binary data and create a temporary file. - raise FileNotFoundError - - def is_resource(self, name): - # Maybe we could do better, but if we can get the data, it's a - # resource. Otherwise it isn't. - fullname_as_path = self.fullname.replace('.', '/') - path = f'{fullname_as_path}/{name}' - try: - self.zipimporter.get_data(path) - except OSError: - return False - return True - - def contents(self): - # This is a bit convoluted, because fullname will be a module path, - # but _files is a list of file names relative to the top of the - # archive's namespace. We want to compare file paths to find all the - # names of things inside the module represented by fullname. So we - # turn the module path of fullname into a file path relative to the - # top of the archive, and then we iterate through _files looking for - # names inside that "directory". - from pathlib import Path - fullname_path = Path(self.zipimporter.get_filename(self.fullname)) - relative_path = fullname_path.relative_to(self.zipimporter.archive) - # Don't forget that fullname names a package, so its path will include - # __init__.py, which we want to ignore. - assert relative_path.name == '__init__.py' - package_path = relative_path.parent - subdirs_seen = set() - for filename in self.zipimporter._files: - try: - relative = Path(filename).relative_to(package_path) - except ValueError: - continue - # If the path of the file (which is relative to the top of the zip - # namespace), relative to the package given when the resource - # reader was created, has a parent, then it's a name in a - # subdirectory and thus we skip it. - parent_name = relative.parent.name - if len(parent_name) == 0: - yield relative.name - elif parent_name not in subdirs_seen: - subdirs_seen.add(parent_name) - yield parent_name |