summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_zipimport.py
diff options
context:
space:
mode:
authorGregory P. Smith <greg@krypto.org>2014-01-08 02:30:07 (GMT)
committerGregory P. Smith <greg@krypto.org>2014-01-08 02:30:07 (GMT)
commit2bcbc141173e5cd2ed36693a71bed8d5a1a54dd4 (patch)
tree0a48d6c6c345b49f1d341f716c9be9ce84a336a7 /Lib/test/test_zipimport.py
parenta21acb5d95841e7d2c8491f40189a853cb7b5df4 (diff)
downloadcpython-2bcbc141173e5cd2ed36693a71bed8d5a1a54dd4.zip
cpython-2bcbc141173e5cd2ed36693a71bed8d5a1a54dd4.tar.gz
cpython-2bcbc141173e5cd2ed36693a71bed8d5a1a54dd4.tar.bz2
Fixes Issue #19081: When a zipimport .zip file in sys.path being imported from
is modified during the lifetime of the Python process after zipimport has already cached the zip's table of contents we detect this and recover rather than read bad data from the .zip (causing odd import errors).
Diffstat (limited to 'Lib/test/test_zipimport.py')
-rw-r--r--Lib/test/test_zipimport.py104
1 files changed, 85 insertions, 19 deletions
diff --git a/Lib/test/test_zipimport.py b/Lib/test/test_zipimport.py
index 37603b9..0459596 100644
--- a/Lib/test/test_zipimport.py
+++ b/Lib/test/test_zipimport.py
@@ -46,6 +46,27 @@ pyc_file = imp.cache_from_source(TESTMOD + '.py')
pyc_ext = ('.pyc' if __debug__ else '.pyo')
+def _write_zip_package(zipname, files,
+ data_to_prepend=b"", compression=ZIP_STORED):
+ z = ZipFile(zipname, "w")
+ try:
+ for name, (mtime, data) in files.items():
+ zinfo = ZipInfo(name, time.localtime(mtime))
+ zinfo.compress_type = compression
+ z.writestr(zinfo, data)
+ finally:
+ z.close()
+
+ if data_to_prepend:
+ # Prepend data to the start of the zipfile
+ with open(zipname, "rb") as f:
+ zip_data = f.read()
+
+ with open(zipname, "wb") as f:
+ f.write(data_to_prepend)
+ f.write(zip_data)
+
+
class UncompressedZipImportTestCase(ImportHooksBaseTestCase):
compression = ZIP_STORED
@@ -58,23 +79,9 @@ class UncompressedZipImportTestCase(ImportHooksBaseTestCase):
ImportHooksBaseTestCase.setUp(self)
def doTest(self, expected_ext, files, *modules, **kw):
- z = ZipFile(TEMP_ZIP, "w")
+ _write_zip_package(TEMP_ZIP, files, data_to_prepend=kw.get("stuff"),
+ compression=self.compression)
try:
- for name, (mtime, data) in files.items():
- zinfo = ZipInfo(name, time.localtime(mtime))
- zinfo.compress_type = self.compression
- z.writestr(zinfo, data)
- z.close()
-
- stuff = kw.get("stuff", None)
- if stuff is not None:
- # Prepend 'stuff' to the start of the zipfile
- with open(TEMP_ZIP, "rb") as f:
- data = f.read()
- with open(TEMP_ZIP, "wb") as f:
- f.write(stuff)
- f.write(data)
-
sys.path.insert(0, TEMP_ZIP)
mod = __import__(".".join(modules), globals(), locals(),
@@ -89,7 +96,8 @@ class UncompressedZipImportTestCase(ImportHooksBaseTestCase):
self.assertEqual(file, os.path.join(TEMP_ZIP,
*modules) + expected_ext)
finally:
- z.close()
+ while TEMP_ZIP in sys.path:
+ sys.path.remove(TEMP_ZIP)
os.remove(TEMP_ZIP)
def testAFakeZlib(self):
@@ -395,10 +403,67 @@ class CompressedZipImportTestCase(UncompressedZipImportTestCase):
compression = ZIP_DEFLATED
+class ZipFileModifiedAfterImportTestCase(ImportHooksBaseTestCase):
+ def setUp(self):
+ zipimport._zip_directory_cache.clear()
+ zipimport._zip_stat_cache.clear()
+ ImportHooksBaseTestCase.setUp(self)
+
+ def tearDown(self):
+ ImportHooksBaseTestCase.tearDown(self)
+ if os.path.exists(TEMP_ZIP):
+ os.remove(TEMP_ZIP)
+
+ def testZipFileChangesAfterFirstImport(self):
+ """Alter the zip file after caching its index and try an import."""
+ packdir = TESTPACK + os.sep
+ files = {packdir + "__init__" + pyc_ext: (NOW, test_pyc),
+ packdir + TESTMOD + ".py": (NOW, "test_value = 38\n"),
+ "ziptest_a.py": (NOW, "test_value = 23\n"),
+ "ziptest_b.py": (NOW, "test_value = 42\n"),
+ "ziptest_c.py": (NOW, "test_value = 1337\n")}
+ zipfile_path = TEMP_ZIP
+ _write_zip_package(zipfile_path, files)
+ self.assertTrue(os.path.exists(zipfile_path))
+ sys.path.insert(0, zipfile_path)
+
+ # Import something out of the zipfile and confirm it is correct.
+ testmod = __import__(TESTPACK + "." + TESTMOD,
+ globals(), locals(), ["__dummy__"])
+ self.assertEqual(testmod.test_value, 38)
+ # Import something else out of the zipfile and confirm it is correct.
+ ziptest_b = __import__("ziptest_b", globals(), locals(), ["test_value"])
+ self.assertEqual(ziptest_b.test_value, 42)
+
+ # Truncate and fill the zip file with non-zip garbage.
+ with open(zipfile_path, "rb") as orig_zip_file:
+ orig_zip_file_contents = orig_zip_file.read()
+ with open(zipfile_path, "wb") as byebye_valid_zip_file:
+ byebye_valid_zip_file.write(b"Tear down this wall!\n"*1987)
+ # Now that the zipfile has been replaced, import something else from it
+ # which should fail as the file contents are now garbage.
+ with self.assertRaises(ImportError):
+ ziptest_a = __import__("ziptest_a", globals(), locals(),
+ ["test_value"])
+ self.assertEqual(ziptest_a.test_value, 23)
+
+ # Now lets make it a valid zipfile that has some garbage at the start.
+ # This alters all of the offsets within the file
+ with open(zipfile_path, "wb") as new_zip_file:
+ new_zip_file.write(b"X"*1991) # The year Python was created.
+ new_zip_file.write(orig_zip_file_contents)
+
+ # Now that the zip file has been "restored" to a valid but different
+ # zipfile the zipimporter should *successfully* re-read the new zip
+ # file's end of file central index and be able to import from it again.
+ ziptest_c = __import__("ziptest_c", globals(), locals(), ["test_value"])
+ self.assertEqual(ziptest_c.test_value, 1337)
+
+
class BadFileZipImportTestCase(unittest.TestCase):
def assertZipFailure(self, filename):
- self.assertRaises(zipimport.ZipImportError,
- zipimport.zipimporter, filename)
+ with self.assertRaises(zipimport.ZipImportError):
+ zipimport.zipimporter(filename)
def testNoFile(self):
self.assertZipFailure('AdfjdkFJKDFJjdklfjs')
@@ -472,6 +537,7 @@ def test_main():
UncompressedZipImportTestCase,
CompressedZipImportTestCase,
BadFileZipImportTestCase,
+ ZipFileModifiedAfterImportTestCase,
)
finally:
support.unlink(TESTMOD)