summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Doc/library/importlib.rst3
-rw-r--r--Lib/importlib/_bootstrap.py137
-rw-r--r--Lib/importlib/abc.py102
-rw-r--r--Lib/importlib/test/source/test_abc_loader.py7
4 files changed, 143 insertions, 106 deletions
diff --git a/Doc/library/importlib.rst b/Doc/library/importlib.rst
index 300653a..008df09 100644
--- a/Doc/library/importlib.rst
+++ b/Doc/library/importlib.rst
@@ -248,7 +248,8 @@ are also provided to help in implementing the core ABCs.
.. method:: set_data(self, path, data)
Optional abstract method which writes the specified bytes to a file
- path.
+ path. When writing to the path fails because the path is read-only, do
+ not propagate the exception.
.. method:: get_code(self, fullname)
diff --git a/Lib/importlib/_bootstrap.py b/Lib/importlib/_bootstrap.py
index 511f7dd..145be93 100644
--- a/Lib/importlib/_bootstrap.py
+++ b/Lib/importlib/_bootstrap.py
@@ -301,6 +301,143 @@ class FrozenImporter:
return imp.is_frozen_package(fullname)
+class SourceLoader:
+
+ def path_mtime(self, path:str) -> int:
+ """Optional method that returns the modification time for the specified
+ path.
+
+ Implementing this method allows the loader to read bytecode files.
+
+ """
+ raise NotImplementedError
+
+ def set_data(self, path:str, data:bytes) -> None:
+ """Optional method which writes data to a file path.
+
+ Implementing this method allows for the writing of bytecode files.
+
+ """
+ raise NotImplementedError
+
+ def is_package(self, fullname):
+ """Concrete implementation of InspectLoader.is_package by checking if
+ the path returned by get_filename has a filename of '__init__.py'."""
+ filename = self.get_filename(fullname).rsplit(path_sep, 1)[1]
+ return filename.rsplit('.', 1)[0] == '__init__'
+
+ def get_source(self, fullname):
+ """Concrete implementation of InspectLoader.get_source."""
+ import tokenize
+ path = self.get_filename(fullname)
+ try:
+ source_bytes = self.get_data(path)
+ except IOError:
+ raise ImportError("source not available through get_data()")
+ encoding = tokenize.detect_encoding(_io.BytesIO(source_bytes).readline)
+ # XXX Universal newlines?
+ return source_bytes.decode(encoding[0])
+
+ def get_code(self, fullname):
+ """Concrete implementation of InspectLoader.get_code.
+
+ Reading of bytecode requires path_mtime to be implemented. To write
+ bytecode, set_data must also be implemented.
+
+ """
+ source_path = self.get_filename(fullname)
+ bytecode_path = imp.cache_from_source(source_path)
+ source_mtime = None
+ if bytecode_path is not None:
+ try:
+ source_mtime = self.path_mtime(source_path)
+ except NotImplementedError:
+ pass
+ else:
+ try:
+ data = self.get_data(bytecode_path)
+ except IOError:
+ pass
+ else:
+ magic = data[:4]
+ raw_timestamp = data[4:8]
+ if (len(magic) == 4 and len(raw_timestamp) == 4 and
+ magic == imp.get_magic() and
+ marshal._r_long(raw_timestamp) == source_mtime):
+ return marshal.loads(data[8:])
+ source_bytes = self.get_data(source_path)
+ code_object = compile(source_bytes, source_path, 'exec',
+ dont_inherit=True)
+ if (not sys.dont_write_bytecode and bytecode_path is not None and
+ source_mtime is not None):
+ # If e.g. Jython ever implements imp.cache_from_source to have
+ # their own cached file format, this block of code will most likely
+ # throw an exception.
+ data = bytearray(imp.get_magic())
+ data.extend(marshal._w_long(source_mtime))
+ data.extend(marshal.dumps(code_object))
+ try:
+ self.set_data(bytecode_path, data)
+ except NotImplementedError:
+ pass
+ return code_object
+
+ @module_for_loader
+ def load_module(self, module):
+ """Concrete implementation of Loader.load_module.
+
+ Requires ExecutionLoader.get_filename and ResourceLoader.get_data to be
+ implemented to load source code. Use of bytecode is dictated by whether
+ get_code uses/writes bytecode.
+
+ """
+ name = module.__name__
+ code_object = self.get_code(name)
+ module.__file__ = self.get_filename(name)
+ module.__cached__ = imp.cache_from_source(module.__file__)
+ module.__package__ = name
+ is_package = self.is_package(name)
+ if is_package:
+ module.__path__ = [module.__file__.rsplit(path_sep, 1)[0]]
+ else:
+ module.__package__ = module.__package__.rpartition('.')[0]
+ module.__loader__ = self
+ exec(code_object, module.__dict__)
+ return module
+
+
+class _SourceFileLoader(SourceLoader):
+
+ """Concrete implementation of SourceLoader.
+
+ NOT A PUBLIC CLASS! Do not expect any API stability from this class, so DO
+ NOT SUBCLASS IN YOUR OWN CODE!
+
+ """
+
+ def __init__(self, fullname, path):
+ self._name = fullname
+ self._path = path
+
+ @_check_name
+ def get_filename(self, fullname):
+ """Return the path to the source file as found by the finder."""
+ return self._path
+
+ def path_mtime(self, path):
+ """Return the modification time for the path."""
+ return int(_os.stat(path).st_mtime)
+
+ def set_data(self, data, path):
+ """Write bytes data to a file."""
+ try:
+ with _closing(_io.FileIO(bytecode_path, 'w')) as file:
+ file.write(data)
+ except IOError as exc:
+ if exc.errno != errno.EACCES:
+ raise
+
+
class PyLoader:
"""Loader base class for Python source code.
diff --git a/Lib/importlib/abc.py b/Lib/importlib/abc.py
index 6a688d1..a9cd3c8 100644
--- a/Lib/importlib/abc.py
+++ b/Lib/importlib/abc.py
@@ -100,7 +100,7 @@ class ExecutionLoader(InspectLoader):
raise NotImplementedError
-class SourceLoader(ResourceLoader, ExecutionLoader):
+class SourceLoader(_bootstrap.SourceLoader, ResourceLoader, ExecutionLoader):
"""Abstract base class for loading source code (and optionally any
corresponding bytecode).
@@ -117,106 +117,6 @@ class SourceLoader(ResourceLoader, ExecutionLoader):
"""
- def path_mtime(self, path:str) -> int:
- """Optional method that returns the modification time for the specified
- path.
-
- Implementing this method allows the loader to read bytecode files.
-
- """
- raise NotImplementedError
-
- def set_data(self, path:str, data:bytes) -> None:
- """Optional method which writes data to a file path.
-
- Implementing this method allows for the writing of bytecode files.
-
- """
- raise NotImplementedError
-
- def is_package(self, fullname):
- """Concrete implementation of InspectLoader.is_package by checking if
- the path returned by get_filename has a filename of '__init__.py'."""
- filename = os.path.basename(self.get_filename(fullname))
- return os.path.splitext(filename)[0] == '__init__'
-
- def get_source(self, fullname):
- """Concrete implementation of InspectLoader.get_source."""
- path = self.get_filename(fullname)
- try:
- source_bytes = self.get_data(path)
- except IOError:
- raise ImportError("source not available through get_data()")
- encoding = tokenize.detect_encoding(io.BytesIO(source_bytes).readline)
- return source_bytes.decode(encoding[0])
-
- def get_code(self, fullname):
- """Concrete implementation of InspectLoader.get_code.
-
- Reading of bytecode requires path_mtime to be implemented. To write
- bytecode, set_data must also be implemented.
-
- """
- source_path = self.get_filename(fullname)
- bytecode_path = imp.cache_from_source(source_path)
- source_mtime = None
- if bytecode_path is not None:
- try:
- source_mtime = self.path_mtime(source_path)
- except NotImplementedError:
- pass
- else:
- try:
- data = self.get_data(bytecode_path)
- except IOError:
- pass
- else:
- magic = data[:4]
- raw_timestamp = data[4:8]
- if (len(magic) == 4 and len(raw_timestamp) == 4 and
- magic == imp.get_magic() and
- marshal._r_long(raw_timestamp) == source_mtime):
- return marshal.loads(data[8:])
- source_bytes = self.get_data(source_path)
- code_object = compile(source_bytes, source_path, 'exec',
- dont_inherit=True)
- if (not sys.dont_write_bytecode and bytecode_path is not None and
- source_mtime is not None):
- # If e.g. Jython ever implements imp.cache_from_source to have
- # their own cached file format, this block of code will most likely
- # throw an exception.
- data = bytearray(imp.get_magic())
- data.extend(marshal._w_long(source_mtime))
- data.extend(marshal.dumps(code_object))
- try:
- self.set_data(bytecode_path, data)
- except (NotImplementedError, IOError):
- pass
- return code_object
-
- @util.module_for_loader
- def load_module(self, module):
- """Concrete implementation of Loader.load_module.
-
- Requires ExecutionLoader.get_filename and ResourceLoader.get_data to be
- implemented to load source code. Use of bytecode is dictated by whether
- get_code uses/writes bytecode.
-
- """
- name = module.__name__
- code_object = self.get_code(name)
- module.__file__ = self.get_filename(name)
- module.__cached__ = imp.cache_from_source(module.__file__)
- module.__package__ = name
- is_package = self.is_package(name)
- if is_package:
- module.__path__ = [os.path.dirname(module.__file__)]
- else:
- module.__package__ = module.__package__.rpartition('.')[0]
- module.__loader__ = self
- exec(code_object, module.__dict__)
- return module
-
class PyLoader(SourceLoader):
diff --git a/Lib/importlib/test/source/test_abc_loader.py b/Lib/importlib/test/source/test_abc_loader.py
index 62729b4..6bdaadb 100644
--- a/Lib/importlib/test/source/test_abc_loader.py
+++ b/Lib/importlib/test/source/test_abc_loader.py
@@ -748,10 +748,9 @@ class SourceLoaderBytecodeTests(SourceLoaderTestHarness):
return closure
self.setUp(magic=b'0000')
- for exc in (NotImplementedError, IOError):
- self.loader.set_data = raise_exception(exc)
- code_object = self.loader.get_code(self.name)
- self.verify_code(code_object)
+ self.loader.set_data = raise_exception(NotImplementedError)
+ code_object = self.loader.get_code(self.name)
+ self.verify_code(code_object)
class AbstractMethodImplTests(unittest.TestCase):