import sys
import os
import marshal
import glob
import importlib
import importlib.util
import re
import struct
import time
import unittest
import unittest.mock
import warnings

from test import support
from test.support import import_helper
from test.support import os_helper

from zipfile import ZipFile, ZipInfo, ZIP_STORED, ZIP_DEFLATED

import zipimport
import linecache
import doctest
import inspect
import io
from traceback import extract_tb, extract_stack, print_tb
try:
    import zlib
except ImportError:
    zlib = None

# Module source that the tests below store in an archive and import back.
test_src = """\
def get_name():
    return __name__
def get_file():
    return __file__
"""
test_co = compile(test_src, "", "exec")
# One-line module whose only function raises; used by the traceback test.
raise_src = 'def do_raise(): raise TypeError\n'


def make_pyc(co, mtime, size):
    data = marshal.dumps(co)
    # NOTE(review): this copy of the file is damaged at this point.  The
    # struct.pack() call below has an empty format string and is fused with
    # what looks like the tail of a test method (it uses `self`, `src`,
    # `NOW`, `TESTMOD` and `pyc_ext`, none of which are defined in the
    # visible text).  The rest of make_pyc(), the module constants and the
    # header and first methods of the test-case class appear to be missing;
    # restore them from the upstream file before running anything here.
    # The surviving tokens are reproduced unchanged.
    pyc = (importlib.util.MAGIC_NUMBER +
           struct.pack("", "exec"), NOW, len(src))
    files = {TESTMOD + pyc_ext: (NOW, pyc),
             "some.data": (NOW, "some data")}
    self.doTest(pyc_ext, files, TESTMOD)

    # NOTE(review): the methods from here down to testZip64LargeFile take
    # `self` and read `self.compression`, so they presumably belong to the
    # UncompressedZipImportTestCase class that CompressedZipImportTestCase
    # subclasses further down.  Its `class` statement is not present in this
    # copy, so they are laid out at method indentation directly after the
    # fragment above.

    def testDefaultOptimizationLevel(self):
        # zipimport should use the default optimization level (#28131)
        src = """if 1:  # indent hack
        def test(val):
            assert(val)
            return val\n"""
        files = {TESTMOD + '.py': (NOW, src)}
        self.makeZip(files)
        sys.path.insert(0, TEMP_ZIP)
        mod = importlib.import_module(TESTMOD)
        self.assertEqual(mod.test(1), 1)
        # The assert inside test() only fires when asserts are compiled in.
        if __debug__:
            self.assertRaises(AssertionError, mod.test, False)
        else:
            self.assertEqual(mod.test(0), 0)

    def testImport_WithStuff(self):
        # try importing from a zipfile which contains additional
        # stuff at the beginning of the file
        files = {TESTMOD + ".py": (NOW, test_src)}
        self.doTest(".py", files, TESTMOD,
                    stuff=b"Some Stuff"*31)

    def assertModuleSource(self, module):
        # Callback: the source reported for the module must equal test_src.
        self.assertEqual(inspect.getsource(module), test_src)

    def testGetSource(self):
        files = {TESTMOD + ".py": (NOW, test_src)}
        self.doTest(".py", files, TESTMOD, call=self.assertModuleSource)

    def testGetCompiledSource(self):
        # Archive holds both the .py and a matching pyc built from it.
        pyc = make_pyc(compile(test_src, "", "exec"), NOW, len(test_src))
        files = {TESTMOD + ".py": (NOW, test_src),
                 TESTMOD + pyc_ext: (NOW, pyc)}
        self.doTest(pyc_ext, files, TESTMOD, call=self.assertModuleSource)

    def runDoctest(self, callback):
        # Shared driver: archive with the module plus a one-line doctest
        # file, then hand the imported module to `callback`.
        files = {TESTMOD + ".py": (NOW, test_src),
                 "xyz.txt": (NOW, ">>> log.append(True)\n")}
        self.doTest(".py", files, TESTMOD, call=callback)

    def doDoctestFile(self, module):
        # `log` reaches the doctest through globs=locals(); the doctest
        # appends True to it.
        log = []
        # Swap out doctest.master for the duration of the run and put the
        # previous value back afterwards.
        old_master, doctest.master = doctest.master, None
        try:
            doctest.testfile(
                'xyz.txt', package=module, module_relative=True,
                globs=locals()
            )
        finally:
            doctest.master = old_master
        self.assertEqual(log,[True])

    def testDoctestFile(self):
        self.runDoctest(self.doDoctestFile)

    def doDoctestSuite(self, module):
        # Same check as doDoctestFile, going through DocFileTest instead.
        log = []
        doctest.DocFileTest(
            'xyz.txt', package=module, module_relative=True,
            globs=locals()
        ).run()
        self.assertEqual(log,[True])

    def testDoctestSuite(self):
        self.runDoctest(self.doDoctestSuite)

    def doTraceback(self, module):
        try:
            module.do_raise()
        except Exception as e:
            # Skip this frame; the next one is do_raise() inside the module.
            tb = e.__traceback__.tb_next

            _, _, _, line = extract_tb(tb, 1)[0]
            self.assertEqual(line, raise_src.strip())

            _, _, _, line = extract_stack(tb.tb_frame, 1)[0]
            self.assertEqual(line, raise_src.strip())

            s = io.StringIO()
            print_tb(tb, 1, s)
            # The original spelled this as  'src\n' '' if ... else 'carets\n'.
            # Adjacent literals are joined before the conditional expression
            # is evaluated, so that always meant "source line OR caret line",
            # never their concatenation.  Written out explicitly here.
            if support.has_no_debug_ranges():
                expected_tail = ' def do_raise(): raise TypeError\n'
            else:
                expected_tail = ' ^^^^^^^^^^^^^^^\n'
            self.assertTrue(s.getvalue().endswith(expected_tail))
        else:
            raise AssertionError("This ought to be impossible")

    def testTraceback(self):
        files = {TESTMOD + ".py": (NOW, raise_src)}
        self.doTest(None, files, TESTMOD, call=self.doTraceback)

    @unittest.skipIf(os_helper.TESTFN_UNENCODABLE is None,
                     "need an unencodable filename")
    def testUnencodable(self):
        filename = os_helper.TESTFN_UNENCODABLE + ".zip"
        self.addCleanup(os_helper.unlink, filename)
        with ZipFile(filename, "w") as z:
            zinfo = ZipInfo(TESTMOD + ".py", time.localtime(NOW))
            zinfo.compress_type = self.compression
            z.writestr(zinfo, test_src)
        # Finding and executing the module must not raise.
        spec = zipimport.zipimporter(filename).find_spec(TESTMOD)
        mod = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(mod)

    def testBytesPath(self):
        filename = os_helper.TESTFN + ".zip"
        self.addCleanup(os_helper.unlink, filename)
        with ZipFile(filename, "w") as z:
            zinfo = ZipInfo(TESTMOD + ".py", time.localtime(NOW))
            zinfo.compress_type = self.compression
            z.writestr(zinfo, test_src)

        # A str path is accepted; bytes-like paths must raise TypeError.
        zipimport.zipimporter(filename)
        with self.assertRaises(TypeError):
            zipimport.zipimporter(os.fsencode(filename))
        with self.assertRaises(TypeError):
            zipimport.zipimporter(bytearray(os.fsencode(filename)))
        with self.assertRaises(TypeError):
            zipimport.zipimporter(memoryview(os.fsencode(filename)))

    def testComment(self):
        files = {TESTMOD + ".py": (NOW, test_src)}
        self.doTest(".py", files, TESTMOD, comment=b"comment")

    def testBeginningCruftAndComment(self):
        files = {TESTMOD + ".py": (NOW, test_src)}
        self.doTest(".py", files, TESTMOD, stuff=b"cruft" * 64, comment=b"hi")

    def testLargestPossibleComment(self):
        # (1 << 16) - 1 == 65535 bytes of comment.
        files = {TESTMOD + ".py": (NOW, test_src)}
        self.doTest(".py", files, TESTMOD, comment=b"c" * ((1 << 16) - 1))

    def testZip64(self):
        files = self.getZip64Files()
        self.doTest(".py", files, "f6")

    def testZip64CruftAndComment(self):
        files = self.getZip64Files()
        self.doTest(".py", files, "f65536", comment=b"c" * ((1 << 16) - 1))

    def testZip64LargeFile(self):
        support.requires(
            "largefile",
            f"test generates files >{0xFFFFFFFF} bytes and takes a long time "
            "to run"
        )

        # N.B.: We do a lot of gymnastics below in the ZIP_STORED case to save
        # and reconstruct a sparse zip on systems that support sparse files.
        # Instead of creating a ~8GB zip file mainly consisting of null bytes
        # for every run of the test, we create the zip once and save off the
        # non-null portions of the resulting file as data blobs with offsets
        # that allow re-creating the zip file sparsely. This drops disk space
        # usage to ~9KB for the ZIP_STORED case and drops that test time by ~2
        # orders of magnitude. For the ZIP_DEFLATED case, however, we bite the
        # bullet. The resulting zip file is ~8MB of non-null data; so the sparse
        # trick doesn't work and would result in that full ~8MB zip data file
        # being checked in to source control.
        parts_glob = f"sparse-zip64-c{self.compression:d}-0x*.part"
        full_parts_glob = os.path.join(TEST_DATA_DIR, parts_glob)
        pre_built_zip_parts = glob.glob(full_parts_glob)

        self.addCleanup(os_helper.unlink, TEMP_ZIP)
        if not pre_built_zip_parts:
            # No saved parts: build the real archive from scratch.
            if self.compression != ZIP_STORED:
                support.requires(
                    "cpu",
                    "test requires a lot of CPU for compression."
                )
            self.addCleanup(os_helper.unlink, os_helper.TESTFN)
            with open(os_helper.TESTFN, "wb") as f:
                f.write(b"data")
                f.write(os.linesep.encode())
                # Seek forward 0xffff_ffff bytes before the final write so
                # the file is longer than 0xffff_ffff bytes.
                f.seek(0xffff_ffff, os.SEEK_CUR)
                f.write(os.linesep.encode())
            os.utime(os_helper.TESTFN, (0.0, 0.0))
            with ZipFile(
                TEMP_ZIP,
                "w",
                compression=self.compression,
                strict_timestamps=False
            ) as z:
                z.write(os_helper.TESTFN, "data1")
                z.writestr(
                    ZipInfo("module.py", (1980, 1, 1, 0, 0, 0)), test_src
                )
                z.write(os_helper.TESTFN, "data2")

            # This "works" but relies on the zip format having a non-empty
            # final page due to the trailing central directory to wind up with
            # the correct length file.
            def make_sparse_zip_parts(name):
                # Copy each run of non-zero 4096-byte pages of `name` into
                # its own ".part" file, named after the run's start offset.
                empty_page = b"\0" * 4096
                with open(name, "rb") as f:
                    part = None
                    try:
                        while True:
                            offset = f.tell()
                            data = f.read(len(empty_page))
                            if not data:
                                break
                            if data != empty_page:
                                if not part:
                                    part_fullname = os.path.join(
                                        TEST_DATA_DIR,
                                        f"sparse-zip64-c{self.compression:d}-"
                                        f"{offset:#011x}.part",
                                    )
                                    os.makedirs(
                                        os.path.dirname(part_fullname),
                                        exist_ok=True
                                    )
                                    part = open(part_fullname, "wb")
                                    print("Created", part_fullname)
                                part.write(data)
                            else:
                                # An all-zero page ends the current run.
                                if part:
                                    part.close()
                                part = None
                    finally:
                        if part:
                            part.close()

            if self.compression == ZIP_STORED:
                print(f"Creating sparse parts to check in into {TEST_DATA_DIR}:")
                make_sparse_zip_parts(TEMP_ZIP)

        else:
            def extract_offset(name):
                # Recover the hex offset embedded in a part file's name.
                if m := re.search(r"-(0x[0-9a-f]{9})\.part$", name):
                    return int(m.group(1), base=16)
                raise ValueError(f"{name=} does not fit expected pattern.")
            offset_parts = [(extract_offset(n), n) for n in pre_built_zip_parts]
            with open(TEMP_ZIP, "wb") as f:
                for offset, part_fn in sorted(offset_parts):
                    with open(part_fn, "rb") as part:
                        f.seek(offset, os.SEEK_SET)
                        f.write(part.read())
            # Confirm that the reconstructed zip file works and looks right.
            with ZipFile(TEMP_ZIP, "r") as z:
                self.assertEqual(
                    z.getinfo("module.py").date_time, (1980, 1, 1, 0, 0, 0)
                )
                self.assertEqual(
                    z.read("module.py"), test_src.encode(),
                    msg=f"Recreate {full_parts_glob}, unexpected contents."
                )
                def assertDataEntry(name):
                    zinfo = z.getinfo(name)
                    self.assertEqual(zinfo.date_time, (1980, 1, 1, 0, 0, 0))
                    self.assertGreater(zinfo.file_size, 0xffff_ffff)
                assertDataEntry("data1")
                assertDataEntry("data2")

        self.doTestWithPreBuiltZip(".py", "module")


@support.requires_zlib()
class CompressedZipImportTestCase(UncompressedZipImportTestCase):
    # Re-run the inherited tests with deflate compression.
    compression = ZIP_DEFLATED


class BadFileZipImportTestCase(unittest.TestCase):
    def assertZipFailure(self, filename):
        # Constructing a zipimporter for `filename` must raise ZipImportError.
        self.assertRaises(zipimport.ZipImportError,
                          zipimport.zipimporter, filename)

    def testNoFile(self):
        self.assertZipFailure('AdfjdkFJKDFJjdklfjs')

    def testEmptyFilename(self):
        self.assertZipFailure('')

    def testBadArgs(self):
        self.assertRaises(TypeError, zipimport.zipimporter, None)
        self.assertRaises(TypeError, zipimport.zipimporter, TESTMOD, kwd=None)
        self.assertRaises(TypeError, zipimport.zipimporter,
                          list(os.fsencode(TESTMOD)))

    def testFilenameTooLong(self):
        self.assertZipFailure('A' * 33000)

    def testEmptyFile(self):
        os_helper.unlink(TESTMOD)
        os_helper.create_empty_file(TESTMOD)
        self.assertZipFailure(TESTMOD)

    @unittest.skipIf(support.is_wasi, "mode 000 not supported.")
    def testFileUnreadable(self):
        os_helper.unlink(TESTMOD)
        fd = os.open(TESTMOD, os.O_CREAT, 000)
        try:
            os.close(fd)
            with self.assertRaises(zipimport.ZipImportError):
                zipimport.zipimporter(TESTMOD)
        finally:
            # If we leave "the read-only bit" set on Windows, nothing can
            # delete TESTMOD, and later tests suffer bogus failures.
            os.chmod(TESTMOD, 0o666)
            os_helper.unlink(TESTMOD)

    def testNotZipFile(self):
        os_helper.unlink(TESTMOD)
        # `with` closes the handle even if the write fails.
        with open(TESTMOD, 'w+') as fp:
            fp.write('a' * 22)
        self.assertZipFailure(TESTMOD)

    # XXX: disabled until this works on Big-endian machines
    def _testBogusZipFile(self):
        os_helper.unlink(TESTMOD)
        # struct.pack() returns bytes, so the file has to be opened in binary
        # mode; the previous text-mode handle would raise TypeError on the
        # first write.  '=I' packs in native byte order.
        with open(TESTMOD, 'wb') as fp:
            fp.write(struct.pack('=I', 0x06054B50))
            fp.write(b'a' * 18)
        z = zipimport.zipimporter(TESTMOD)

        try:
            with warnings.catch_warnings():
                warnings.simplefilter("ignore", DeprecationWarning)
                self.assertRaises(TypeError, z.load_module, None)
            self.assertRaises(TypeError, z.find_module, None)
            self.assertRaises(TypeError, z.find_spec, None)
            self.assertRaises(TypeError, z.exec_module, None)
            self.assertRaises(TypeError, z.is_package, None)
            self.assertRaises(TypeError, z.get_code, None)
            self.assertRaises(TypeError, z.get_data, None)
            self.assertRaises(TypeError, z.get_source, None)

            error = zipimport.ZipImportError
            self.assertIsNone(z.find_spec('abc'))

            with warnings.catch_warnings():
                warnings.simplefilter("ignore", DeprecationWarning)
                self.assertRaises(error, z.load_module, 'abc')
            self.assertRaises(error, z.get_code, 'abc')
            self.assertRaises(OSError, z.get_data, 'abc')
            self.assertRaises(error, z.get_source, 'abc')
            self.assertRaises(error, z.is_package, 'abc')
        finally:
            # Drop cached directory data so later tests start clean.
            zipimport._zip_directory_cache.clear()


def tearDownModule():
    # Remove the scratch file some tests create under the TESTMOD name.
    os_helper.unlink(TESTMOD)


if __name__ == "__main__":
    unittest.main()