diff options
Diffstat (limited to 'Lib/test/test_zipfile.py')
| -rw-r--r-- | Lib/test/test_zipfile.py | 519 |
1 files changed, 387 insertions, 132 deletions
diff --git a/Lib/test/test_zipfile.py b/Lib/test/test_zipfile.py index 12a0f71..4bdf7d4 100644 --- a/Lib/test/test_zipfile.py +++ b/Lib/test/test_zipfile.py @@ -1,9 +1,9 @@ +import contextlib import io import os import sys -import imp +import importlib.util import time -import shutil import struct import zipfile import unittest @@ -12,9 +12,9 @@ import unittest from tempfile import TemporaryFile from random import randint, random, getrandbits -from test.support import (TESTFN, findfile, unlink, +from test.support import (TESTFN, findfile, unlink, rmtree, requires_zlib, requires_bz2, requires_lzma, - captured_stdout) + captured_stdout, check_warnings) TESTFN2 = TESTFN + "2" TESTFNDIR = TESTFN + "d" @@ -26,6 +26,9 @@ SMALL_TEST_DATA = [('_ziptest1', '1q2w3e4r5t'), ('ziptest2dir/ziptest3dir/_ziptest3', 'azsxdcfvgb'), ('ziptest2dir/ziptest3dir/ziptest4dir/_ziptest3', '6y7u8i9o0p')] +def getrandbytes(size): + return getrandbits(8 * size).to_bytes(size, 'little') + def get_files(test): yield TESTFN2 with TemporaryFile() as f: @@ -35,6 +38,10 @@ def get_files(test): yield f test.assertFalse(f.closed) +def openU(zipfp, fn): + with check_warnings(('', DeprecationWarning)): + return zipfp.open(fn, 'rU') + class AbstractTestsWithSourceFile: @classmethod def setUpClass(cls): @@ -286,7 +293,7 @@ class AbstractTestsWithSourceFile: # than requested. for test_size in (1, 4095, 4096, 4097, 16384): file_size = test_size + 1 - junk = getrandbits(8 * file_size).to_bytes(file_size, 'little') + junk = getrandbytes(file_size) with zipfile.ZipFile(io.BytesIO(), "w", self.compression) as zipf: zipf.writestr('foo', junk) with zipf.open('foo', 'r') as fp: @@ -459,7 +466,9 @@ class AbstractTestZip64InSmallFiles: def setUp(self): self._limit = zipfile.ZIP64_LIMIT - zipfile.ZIP64_LIMIT = 5 + self._filecount_limit = zipfile.ZIP_FILECOUNT_LIMIT + zipfile.ZIP64_LIMIT = 1000 + zipfile.ZIP_FILECOUNT_LIMIT = 9 # Make a source file with some lines with open(TESTFN, "wb") as fp: @@ -526,8 +535,67 @@ class AbstractTestZip64InSmallFiles: for f in get_files(self): self.zip_test(f, self.compression) + def test_too_many_files(self): + # This test checks that more than 64k files can be added to an archive, + # and that the resulting archive can be read properly by ZipFile + zipf = zipfile.ZipFile(TESTFN, "w", self.compression, + allowZip64=True) + zipf.debug = 100 + numfiles = 15 + for i in range(numfiles): + zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57)) + self.assertEqual(len(zipf.namelist()), numfiles) + zipf.close() + + zipf2 = zipfile.ZipFile(TESTFN, "r", self.compression) + self.assertEqual(len(zipf2.namelist()), numfiles) + for i in range(numfiles): + content = zipf2.read("foo%08d" % i).decode('ascii') + self.assertEqual(content, "%d" % (i**3 % 57)) + zipf2.close() + + def test_too_many_files_append(self): + zipf = zipfile.ZipFile(TESTFN, "w", self.compression, + allowZip64=False) + zipf.debug = 100 + numfiles = 9 + for i in range(numfiles): + zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57)) + self.assertEqual(len(zipf.namelist()), numfiles) + with self.assertRaises(zipfile.LargeZipFile): + zipf.writestr("foo%08d" % numfiles, b'') + self.assertEqual(len(zipf.namelist()), numfiles) + zipf.close() + + zipf = zipfile.ZipFile(TESTFN, "a", self.compression, + allowZip64=False) + zipf.debug = 100 + self.assertEqual(len(zipf.namelist()), numfiles) + with self.assertRaises(zipfile.LargeZipFile): + zipf.writestr("foo%08d" % numfiles, b'') + self.assertEqual(len(zipf.namelist()), numfiles) + zipf.close() + + zipf = zipfile.ZipFile(TESTFN, "a", self.compression, + allowZip64=True) + zipf.debug = 100 + self.assertEqual(len(zipf.namelist()), numfiles) + numfiles2 = 15 + for i in range(numfiles, numfiles2): + zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57)) + self.assertEqual(len(zipf.namelist()), numfiles2) + zipf.close() + + zipf2 = zipfile.ZipFile(TESTFN, "r", self.compression) + self.assertEqual(len(zipf2.namelist()), numfiles2) + for i in range(numfiles2): + content = zipf2.read("foo%08d" % i).decode('ascii') + self.assertEqual(content, "%d" % (i**3 % 57)) + zipf2.close() + def tearDown(self): zipfile.ZIP64_LIMIT = self._limit + zipfile.ZIP_FILECOUNT_LIMIT = self._filecount_limit unlink(TESTFN) unlink(TESTFN2) @@ -537,12 +605,12 @@ class StoredTestZip64InSmallFiles(AbstractTestZip64InSmallFiles, compression = zipfile.ZIP_STORED def large_file_exception_test(self, f, compression): - with zipfile.ZipFile(f, "w", compression) as zipfp: + with zipfile.ZipFile(f, "w", compression, allowZip64=False) as zipfp: self.assertRaises(zipfile.LargeZipFile, zipfp.write, TESTFN, "another.name") def large_file_exception_test2(self, f, compression): - with zipfile.ZipFile(f, "w", compression) as zipfp: + with zipfile.ZipFile(f, "w", compression, allowZip64=False) as zipfp: self.assertRaises(zipfile.LargeZipFile, zipfp.writestr, "another.name", self.data) @@ -580,7 +648,21 @@ class PyZipFileTests(unittest.TestCase): if name + 'o' not in namelist: self.assertIn(name + 'c', namelist) + def requiresWriteAccess(self, path): + # effective_ids unavailable on windows + if not os.access(path, os.W_OK, + effective_ids=os.access in os.supports_effective_ids): + self.skipTest('requires write access to the installed location') + filename = os.path.join(path, 'test_zipfile.try') + try: + fd = os.open(filename, os.O_WRONLY | os.O_CREAT) + os.close(fd) + except Exception: + self.skipTest('requires write access to the installed location') + unlink(filename) + def test_write_pyfile(self): + self.requiresWriteAccess(os.path.dirname(__file__)) with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: fn = __file__ if fn.endswith('.pyc') or fn.endswith('.pyo'): @@ -588,7 +670,7 @@ class PyZipFileTests(unittest.TestCase): if os.altsep is not None: path_split.extend(fn.split(os.altsep)) if '__pycache__' in path_split: - fn = imp.source_from_cache(fn) + fn = importlib.util.source_from_cache(fn) else: fn = fn[:-1] @@ -612,6 +694,7 @@ class PyZipFileTests(unittest.TestCase): def test_write_python_package(self): import email packagedir = os.path.dirname(email.__file__) + self.requiresWriteAccess(packagedir) with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: zipfp.writepy(packagedir) @@ -622,16 +705,47 @@ class PyZipFileTests(unittest.TestCase): self.assertCompiledIn('email/__init__.py', names) self.assertCompiledIn('email/mime/text.py', names) + def test_write_filtered_python_package(self): + import test + packagedir = os.path.dirname(test.__file__) + self.requiresWriteAccess(packagedir) + + with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: + + # first make sure that the test folder gives error messages + # (on the badsyntax_... files) + with captured_stdout() as reportSIO: + zipfp.writepy(packagedir) + reportStr = reportSIO.getvalue() + self.assertTrue('SyntaxError' in reportStr) + + # then check that the filter works on the whole package + with captured_stdout() as reportSIO: + zipfp.writepy(packagedir, filterfunc=lambda whatever: False) + reportStr = reportSIO.getvalue() + self.assertTrue('SyntaxError' not in reportStr) + + # then check that the filter works on individual files + def filter(path): + return not os.path.basename(path).startswith("bad") + with captured_stdout() as reportSIO, self.assertWarns(UserWarning): + zipfp.writepy(packagedir, filterfunc=filter) + reportStr = reportSIO.getvalue() + if reportStr: + print(reportStr) + self.assertTrue('SyntaxError' not in reportStr) + def test_write_with_optimization(self): import email packagedir = os.path.dirname(email.__file__) + self.requiresWriteAccess(packagedir) # use .pyc if running test in optimization mode, # use .pyo if running test in debug mode optlevel = 1 if __debug__ else 0 ext = '.pyo' if optlevel == 1 else '.pyc' with TemporaryFile() as t, \ - zipfile.PyZipFile(t, "w", optimize=optlevel) as zipfp: + zipfile.PyZipFile(t, "w", optimize=optlevel) as zipfp: zipfp.writepy(packagedir) names = zipfp.namelist() @@ -659,14 +773,34 @@ class PyZipFileTests(unittest.TestCase): self.assertNotIn('mod2.txt', names) finally: - shutil.rmtree(TESTFN2) + rmtree(TESTFN2) + + def test_write_python_directory_filtered(self): + os.mkdir(TESTFN2) + try: + with open(os.path.join(TESTFN2, "mod1.py"), "w") as fp: + fp.write("print(42)\n") + + with open(os.path.join(TESTFN2, "mod2.py"), "w") as fp: + fp.write("print(42 * 42)\n") + + with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: + zipfp.writepy(TESTFN2, filterfunc=lambda fn: + not fn.endswith('mod2.py')) + + names = zipfp.namelist() + self.assertCompiledIn('mod1.py', names) + self.assertNotIn('mod2.py', names) + + finally: + rmtree(TESTFN2) def test_write_non_pyfile(self): with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: with open(TESTFN, 'w') as f: f.write('most definitely not a python file') self.assertRaises(RuntimeError, zipfp.writepy, TESTFN) - os.remove(TESTFN) + unlink(TESTFN) def test_write_pyfile_bad_syntax(self): os.mkdir(TESTFN2) @@ -689,7 +823,7 @@ class PyZipFileTests(unittest.TestCase): self.assertNotIn('mod1.pyo', names) finally: - shutil.rmtree(TESTFN2) + rmtree(TESTFN2) class ExtractTests(unittest.TestCase): @@ -712,10 +846,10 @@ class ExtractTests(unittest.TestCase): with open(writtenfile, "rb") as f: self.assertEqual(fdata.encode(), f.read()) - os.remove(writtenfile) + unlink(writtenfile) # remove the test file subdirectories - shutil.rmtree(os.path.join(os.getcwd(), 'ziptest2dir')) + rmtree(os.path.join(os.getcwd(), 'ziptest2dir')) def test_extract_all(self): with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: @@ -730,10 +864,10 @@ class ExtractTests(unittest.TestCase): with open(outfile, "rb") as f: self.assertEqual(fdata.encode(), f.read()) - os.remove(outfile) + unlink(outfile) # remove the test file subdirectories - shutil.rmtree(os.path.join(os.getcwd(), 'ziptest2dir')) + rmtree(os.path.join(os.getcwd(), 'ziptest2dir')) def check_file(self, filename, content): self.assertTrue(os.path.isfile(filename)) @@ -764,25 +898,25 @@ class ExtractTests(unittest.TestCase): def test_extract_hackers_arcnames_windows_only(self): """Test combination of path fixing and windows name sanitization.""" windows_hacknames = [ - (r'..\foo\bar', 'foo/bar'), - (r'..\/foo\/bar', 'foo/bar'), - (r'foo/\..\/bar', 'foo/bar'), - (r'foo\/../\bar', 'foo/bar'), - (r'C:foo/bar', 'foo/bar'), - (r'C:/foo/bar', 'foo/bar'), - (r'C://foo/bar', 'foo/bar'), - (r'C:\foo\bar', 'foo/bar'), - (r'//conky/mountpoint/foo/bar', 'foo/bar'), - (r'\\conky\mountpoint\foo\bar', 'foo/bar'), - (r'///conky/mountpoint/foo/bar', 'conky/mountpoint/foo/bar'), - (r'\\\conky\mountpoint\foo\bar', 'conky/mountpoint/foo/bar'), - (r'//conky//mountpoint/foo/bar', 'conky/mountpoint/foo/bar'), - (r'\\conky\\mountpoint\foo\bar', 'conky/mountpoint/foo/bar'), - (r'//?/C:/foo/bar', 'foo/bar'), - (r'\\?\C:\foo\bar', 'foo/bar'), - (r'C:/../C:/foo/bar', 'C_/foo/bar'), - (r'a:b\c<d>e|f"g?h*i', 'b/c_d_e_f_g_h_i'), - ('../../foo../../ba..r', 'foo/ba..r'), + (r'..\foo\bar', 'foo/bar'), + (r'..\/foo\/bar', 'foo/bar'), + (r'foo/\..\/bar', 'foo/bar'), + (r'foo\/../\bar', 'foo/bar'), + (r'C:foo/bar', 'foo/bar'), + (r'C:/foo/bar', 'foo/bar'), + (r'C://foo/bar', 'foo/bar'), + (r'C:\foo\bar', 'foo/bar'), + (r'//conky/mountpoint/foo/bar', 'foo/bar'), + (r'\\conky\mountpoint\foo\bar', 'foo/bar'), + (r'///conky/mountpoint/foo/bar', 'conky/mountpoint/foo/bar'), + (r'\\\conky\mountpoint\foo\bar', 'conky/mountpoint/foo/bar'), + (r'//conky//mountpoint/foo/bar', 'conky/mountpoint/foo/bar'), + (r'\\conky\\mountpoint\foo\bar', 'conky/mountpoint/foo/bar'), + (r'//?/C:/foo/bar', 'foo/bar'), + (r'\\?\C:\foo\bar', 'foo/bar'), + (r'C:/../C:/foo/bar', 'C_/foo/bar'), + (r'a:b\c<d>e|f"g?h*i', 'b/c_d_e_f_g_h_i'), + ('../../foo../../ba..r', 'foo/ba..r'), ] self._test_extract_hackers_arcnames(windows_hacknames) @@ -815,12 +949,12 @@ class ExtractTests(unittest.TestCase): msg='extract %r: %r != %r' % (arcname, writtenfile, correctfile)) self.check_file(correctfile, content) - shutil.rmtree('target') + rmtree('target') with zipfile.ZipFile(TESTFN2, 'r') as zipfp: zipfp.extractall(targetpath) self.check_file(correctfile, content) - shutil.rmtree('target') + rmtree('target') correctfile = os.path.join(os.getcwd(), *fixedname.split('/')) @@ -829,14 +963,14 @@ class ExtractTests(unittest.TestCase): self.assertEqual(writtenfile, correctfile, msg="extract %r" % arcname) self.check_file(correctfile, content) - shutil.rmtree(fixedname.split('/')[0]) + rmtree(fixedname.split('/')[0]) with zipfile.ZipFile(TESTFN2, 'r') as zipfp: zipfp.extractall() self.check_file(correctfile, content) - shutil.rmtree(fixedname.split('/')[0]) + rmtree(fixedname.split('/')[0]) - os.remove(TESTFN2) + unlink(TESTFN2) class OtherTests(unittest.TestCase): @@ -860,6 +994,17 @@ class OtherTests(unittest.TestCase): data += zipfp.read(info) self.assertIn(data, {b"foobar", b"barfoo"}) + def test_universal_deprecation(self): + f = io.BytesIO() + with zipfile.ZipFile(f, "w") as zipfp: + zipfp.writestr('spam.txt', b'ababagalamaga') + + with zipfile.ZipFile(f, "r") as zipfp: + for mode in 'U', 'rU': + with self.assertWarns(DeprecationWarning): + zipopen = zipfp.open('spam.txt', mode) + zipopen.close() + def test_universal_readaheads(self): f = io.BytesIO() @@ -869,7 +1014,7 @@ class OtherTests(unittest.TestCase): data2 = b'' with zipfile.ZipFile(f, 'r') as zipfp, \ - zipfp.open(TESTFN, 'rU') as zipopen: + openU(zipfp, TESTFN) as zipopen: for line in zipopen: data2 += line @@ -910,10 +1055,10 @@ class OtherTests(unittest.TestCase): def test_unsupported_version(self): # File has an extract_version of 120 data = (b'PK\x03\x04x\x00\x00\x00\x00\x00!p\xa1@\x00\x00\x00\x00\x00\x00' - b'\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00xPK\x01\x02x\x03x\x00\x00\x00\x00' - b'\x00!p\xa1@\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00' - b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01\x00\x00\x00\x00xPK\x05\x06' - b'\x00\x00\x00\x00\x01\x00\x01\x00/\x00\x00\x00\x1f\x00\x00\x00\x00\x00') + b'\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00xPK\x01\x02x\x03x\x00\x00\x00\x00' + b'\x00!p\xa1@\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00' + b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01\x00\x00\x00\x00xPK\x05\x06' + b'\x00\x00\x00\x00\x01\x00\x01\x00/\x00\x00\x00\x1f\x00\x00\x00\x00\x00') self.assertRaises(NotImplementedError, zipfile.ZipFile, io.BytesIO(data), 'r') @@ -946,7 +1091,7 @@ class OtherTests(unittest.TestCase): try: with zipfile.ZipFile(TESTFN, 'a') as zf: zf.writestr(filename, content) - except IOError: + except OSError: self.fail('Could not append data to a non-existent zip file.') self.assertTrue(os.path.exists(TESTFN)) @@ -1018,7 +1163,7 @@ class OtherTests(unittest.TestCase): fp.seek(0, 0) self.assertTrue(zipfile.is_zipfile(fp)) - def test_non_existent_file_raises_IOError(self): + def test_non_existent_file_raises_OSError(self): # make sure we don't raise an AttributeError when a partially-constructed # ZipFile instance is finalized; this tests for regression on SF tracker # bug #403871. @@ -1030,7 +1175,7 @@ class OtherTests(unittest.TestCase): # it is ignored, but the user should be sufficiently annoyed by # the message on the output that regression will be noticed # quickly. - self.assertRaises(IOError, zipfile.ZipFile, TESTFN) + self.assertRaises(OSError, zipfile.ZipFile, TESTFN) def test_empty_file_raises_BadZipFile(self): f = open(TESTFN, 'w') @@ -1099,11 +1244,11 @@ class OtherTests(unittest.TestCase): def test_unsupported_compression(self): # data is declared as shrunk, but actually deflated data = (b'PK\x03\x04.\x00\x00\x00\x01\x00\xe4C\xa1@\x00\x00\x00' - b'\x00\x02\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00x\x03\x00PK\x01' - b'\x02.\x03.\x00\x00\x00\x01\x00\xe4C\xa1@\x00\x00\x00\x00\x02\x00\x00' - b'\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' - b'\x80\x01\x00\x00\x00\x00xPK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x00' - b'/\x00\x00\x00!\x00\x00\x00\x00\x00') + b'\x00\x02\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00x\x03\x00PK\x01' + b'\x02.\x03.\x00\x00\x00\x01\x00\xe4C\xa1@\x00\x00\x00\x00\x02\x00\x00' + b'\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + b'\x80\x01\x00\x00\x00\x00xPK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x00' + b'/\x00\x00\x00!\x00\x00\x00\x00\x00') with zipfile.ZipFile(io.BytesIO(data), 'r') as zipf: self.assertRaises(NotImplementedError, zipf.open, 'x') @@ -1218,7 +1363,7 @@ class OtherTests(unittest.TestCase): def test_open_empty_file(self): # Issue 1710703: Check that opening a file with less than 22 bytes # raises a BadZipFile exception (rather than the previously unhelpful - # IOError) + # OSError) f = open(TESTFN, 'w') f.close() self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN, 'r') @@ -1227,6 +1372,21 @@ class OtherTests(unittest.TestCase): self.assertRaises(ValueError, zipfile.ZipInfo, 'seventies', (1979, 1, 1, 0, 0, 0)) + def test_zipfile_with_short_extra_field(self): + """If an extra field in the header is less than 4 bytes, skip it.""" + zipdata = ( + b'PK\x03\x04\x14\x00\x00\x00\x00\x00\x93\x9b\xad@\x8b\x9e' + b'\xd9\xd3\x01\x00\x00\x00\x01\x00\x00\x00\x03\x00\x03\x00ab' + b'c\x00\x00\x00APK\x01\x02\x14\x03\x14\x00\x00\x00\x00' + b'\x00\x93\x9b\xad@\x8b\x9e\xd9\xd3\x01\x00\x00\x00\x01\x00\x00' + b'\x00\x03\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\xa4\x81\x00' + b'\x00\x00\x00abc\x00\x00PK\x05\x06\x00\x00\x00\x00' + b'\x01\x00\x01\x003\x00\x00\x00%\x00\x00\x00\x00\x00' + ) + with zipfile.ZipFile(io.BytesIO(zipdata), 'r') as zipf: + # testzip returns the name of the first corrupt file, or None + self.assertIsNone(zipf.testzip()) + def tearDown(self): unlink(TESTFN) unlink(TESTFN2) @@ -1266,57 +1426,57 @@ class AbstractBadCrcTests: class StoredBadCrcTests(AbstractBadCrcTests, unittest.TestCase): compression = zipfile.ZIP_STORED zip_with_bad_crc = ( - b'PK\003\004\024\0\0\0\0\0 \213\212;:r' - b'\253\377\f\0\0\0\f\0\0\0\005\0\0\000af' - b'ilehello,AworldP' - b'K\001\002\024\003\024\0\0\0\0\0 \213\212;:' - b'r\253\377\f\0\0\0\f\0\0\0\005\0\0\0\0' - b'\0\0\0\0\0\0\0\200\001\0\0\0\000afi' - b'lePK\005\006\0\0\0\0\001\0\001\0003\000' - b'\0\0/\0\0\0\0\0') + b'PK\003\004\024\0\0\0\0\0 \213\212;:r' + b'\253\377\f\0\0\0\f\0\0\0\005\0\0\000af' + b'ilehello,AworldP' + b'K\001\002\024\003\024\0\0\0\0\0 \213\212;:' + b'r\253\377\f\0\0\0\f\0\0\0\005\0\0\0\0' + b'\0\0\0\0\0\0\0\200\001\0\0\0\000afi' + b'lePK\005\006\0\0\0\0\001\0\001\0003\000' + b'\0\0/\0\0\0\0\0') @requires_zlib class DeflateBadCrcTests(AbstractBadCrcTests, unittest.TestCase): compression = zipfile.ZIP_DEFLATED zip_with_bad_crc = ( - b'PK\x03\x04\x14\x00\x00\x00\x08\x00n}\x0c=FA' - b'KE\x10\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af' - b'ile\xcbH\xcd\xc9\xc9W(\xcf/\xcaI\xc9\xa0' - b'=\x13\x00PK\x01\x02\x14\x03\x14\x00\x00\x00\x08\x00n' - b'}\x0c=FAKE\x10\x00\x00\x00n\x00\x00\x00\x05' - b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01\x00\x00\x00' - b'\x00afilePK\x05\x06\x00\x00\x00\x00\x01\x00' - b'\x01\x003\x00\x00\x003\x00\x00\x00\x00\x00') + b'PK\x03\x04\x14\x00\x00\x00\x08\x00n}\x0c=FA' + b'KE\x10\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af' + b'ile\xcbH\xcd\xc9\xc9W(\xcf/\xcaI\xc9\xa0' + b'=\x13\x00PK\x01\x02\x14\x03\x14\x00\x00\x00\x08\x00n' + b'}\x0c=FAKE\x10\x00\x00\x00n\x00\x00\x00\x05' + b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01\x00\x00\x00' + b'\x00afilePK\x05\x06\x00\x00\x00\x00\x01\x00' + b'\x01\x003\x00\x00\x003\x00\x00\x00\x00\x00') @requires_bz2 class Bzip2BadCrcTests(AbstractBadCrcTests, unittest.TestCase): compression = zipfile.ZIP_BZIP2 zip_with_bad_crc = ( - b'PK\x03\x04\x14\x03\x00\x00\x0c\x00nu\x0c=FA' - b'KE8\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af' - b'ileBZh91AY&SY\xd4\xa8\xca' - b'\x7f\x00\x00\x0f\x11\x80@\x00\x06D\x90\x80 \x00 \xa5' - b'P\xd9!\x03\x03\x13\x13\x13\x89\xa9\xa9\xc2u5:\x9f' - b'\x8b\xb9"\x9c(HjTe?\x80PK\x01\x02\x14' - b'\x03\x14\x03\x00\x00\x0c\x00nu\x0c=FAKE8' - b'\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00\x00\x00\x00\x00\x00' - b'\x00 \x80\x80\x81\x00\x00\x00\x00afilePK' - b'\x05\x06\x00\x00\x00\x00\x01\x00\x01\x003\x00\x00\x00[\x00' - b'\x00\x00\x00\x00') + b'PK\x03\x04\x14\x03\x00\x00\x0c\x00nu\x0c=FA' + b'KE8\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af' + b'ileBZh91AY&SY\xd4\xa8\xca' + b'\x7f\x00\x00\x0f\x11\x80@\x00\x06D\x90\x80 \x00 \xa5' + b'P\xd9!\x03\x03\x13\x13\x13\x89\xa9\xa9\xc2u5:\x9f' + b'\x8b\xb9"\x9c(HjTe?\x80PK\x01\x02\x14' + b'\x03\x14\x03\x00\x00\x0c\x00nu\x0c=FAKE8' + b'\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00\x00\x00\x00\x00\x00' + b'\x00 \x80\x80\x81\x00\x00\x00\x00afilePK' + b'\x05\x06\x00\x00\x00\x00\x01\x00\x01\x003\x00\x00\x00[\x00' + b'\x00\x00\x00\x00') @requires_lzma class LzmaBadCrcTests(AbstractBadCrcTests, unittest.TestCase): compression = zipfile.ZIP_LZMA zip_with_bad_crc = ( - b'PK\x03\x04\x14\x03\x00\x00\x0e\x00nu\x0c=FA' - b'KE\x1b\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af' - b'ile\t\x04\x05\x00]\x00\x00\x00\x04\x004\x19I' - b'\xee\x8d\xe9\x17\x89:3`\tq!.8\x00PK' - b'\x01\x02\x14\x03\x14\x03\x00\x00\x0e\x00nu\x0c=FA' - b'KE\x1b\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00\x00\x00' - b'\x00\x00\x00\x00 \x80\x80\x81\x00\x00\x00\x00afil' - b'ePK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x003\x00\x00' - b'\x00>\x00\x00\x00\x00\x00') + b'PK\x03\x04\x14\x03\x00\x00\x0e\x00nu\x0c=FA' + b'KE\x1b\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af' + b'ile\t\x04\x05\x00]\x00\x00\x00\x04\x004\x19I' + b'\xee\x8d\xe9\x17\x89:3`\tq!.8\x00PK' + b'\x01\x02\x14\x03\x14\x03\x00\x00\x0e\x00nu\x0c=FA' + b'KE\x1b\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00\x00\x00' + b'\x00\x00\x00\x00 \x80\x80\x81\x00\x00\x00\x00afil' + b'ePK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x003\x00\x00' + b'\x00>\x00\x00\x00\x00\x00') class DecryptionTests(unittest.TestCase): @@ -1325,22 +1485,22 @@ class DecryptionTests(unittest.TestCase): ZIP file.""" data = ( - b'PK\x03\x04\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00\x1a\x00' - b'\x00\x00\x08\x00\x00\x00test.txt\xfa\x10\xa0gly|\xfa-\xc5\xc0=\xf9y' - b'\x18\xe0\xa8r\xb3Z}Lg\xbc\xae\xf9|\x9b\x19\xe4\x8b\xba\xbb)\x8c\xb0\xdbl' - b'PK\x01\x02\x14\x00\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00' - b'\x1a\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x01\x00 \x00\xb6\x81' - b'\x00\x00\x00\x00test.txtPK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x006\x00' - b'\x00\x00L\x00\x00\x00\x00\x00' ) + b'PK\x03\x04\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00\x1a\x00' + b'\x00\x00\x08\x00\x00\x00test.txt\xfa\x10\xa0gly|\xfa-\xc5\xc0=\xf9y' + b'\x18\xe0\xa8r\xb3Z}Lg\xbc\xae\xf9|\x9b\x19\xe4\x8b\xba\xbb)\x8c\xb0\xdbl' + b'PK\x01\x02\x14\x00\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00' + b'\x1a\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x01\x00 \x00\xb6\x81' + b'\x00\x00\x00\x00test.txtPK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x006\x00' + b'\x00\x00L\x00\x00\x00\x00\x00' ) data2 = ( - b'PK\x03\x04\x14\x00\t\x00\x08\x00\xcf}38xu\xaa\xb2\x14\x00\x00\x00\x00\x02' - b'\x00\x00\x04\x00\x15\x00zeroUT\t\x00\x03\xd6\x8b\x92G\xda\x8b\x92GUx\x04' - b'\x00\xe8\x03\xe8\x03\xc7<M\xb5a\xceX\xa3Y&\x8b{oE\xd7\x9d\x8c\x98\x02\xc0' - b'PK\x07\x08xu\xaa\xb2\x14\x00\x00\x00\x00\x02\x00\x00PK\x01\x02\x17\x03' - b'\x14\x00\t\x00\x08\x00\xcf}38xu\xaa\xb2\x14\x00\x00\x00\x00\x02\x00\x00' - b'\x04\x00\r\x00\x00\x00\x00\x00\x00\x00\x00\x00\xa4\x81\x00\x00\x00\x00ze' - b'roUT\x05\x00\x03\xd6\x8b\x92GUx\x00\x00PK\x05\x06\x00\x00\x00\x00\x01' - b'\x00\x01\x00?\x00\x00\x00[\x00\x00\x00\x00\x00' ) + b'PK\x03\x04\x14\x00\t\x00\x08\x00\xcf}38xu\xaa\xb2\x14\x00\x00\x00\x00\x02' + b'\x00\x00\x04\x00\x15\x00zeroUT\t\x00\x03\xd6\x8b\x92G\xda\x8b\x92GUx\x04' + b'\x00\xe8\x03\xe8\x03\xc7<M\xb5a\xceX\xa3Y&\x8b{oE\xd7\x9d\x8c\x98\x02\xc0' + b'PK\x07\x08xu\xaa\xb2\x14\x00\x00\x00\x00\x02\x00\x00PK\x01\x02\x17\x03' + b'\x14\x00\t\x00\x08\x00\xcf}38xu\xaa\xb2\x14\x00\x00\x00\x00\x02\x00\x00' + b'\x04\x00\r\x00\x00\x00\x00\x00\x00\x00\x00\x00\xa4\x81\x00\x00\x00\x00ze' + b'roUT\x05\x00\x03\xd6\x8b\x92GUx\x00\x00PK\x05\x06\x00\x00\x00\x00\x01' + b'\x00\x01\x00?\x00\x00\x00[\x00\x00\x00\x00\x00' ) plain = b'zipfile.py encryption test' plain2 = b'\x00'*512 @@ -1497,46 +1657,105 @@ class LzmaTestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles, @requires_zlib class TestsWithMultipleOpens(unittest.TestCase): - def setUp(self): + @classmethod + def setUpClass(cls): + cls.data1 = b'111' + getrandbytes(10000) + cls.data2 = b'222' + getrandbytes(10000) + + def make_test_archive(self, f): # Create the ZIP archive - with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_DEFLATED) as zipfp: - zipfp.writestr('ones', '1'*FIXEDTEST_SIZE) - zipfp.writestr('twos', '2'*FIXEDTEST_SIZE) + with zipfile.ZipFile(f, "w", zipfile.ZIP_DEFLATED) as zipfp: + zipfp.writestr('ones', self.data1) + zipfp.writestr('twos', self.data2) def test_same_file(self): # Verify that (when the ZipFile is in control of creating file objects) # multiple open() calls can be made without interfering with each other. + self.make_test_archive(TESTFN2) with zipfile.ZipFile(TESTFN2, mode="r") as zipf: with zipf.open('ones') as zopen1, zipf.open('ones') as zopen2: data1 = zopen1.read(500) data2 = zopen2.read(500) - data1 += zopen1.read(500) - data2 += zopen2.read(500) + data1 += zopen1.read() + data2 += zopen2.read() self.assertEqual(data1, data2) + self.assertEqual(data1, self.data1) def test_different_file(self): # Verify that (when the ZipFile is in control of creating file objects) # multiple open() calls can be made without interfering with each other. + self.make_test_archive(TESTFN2) with zipfile.ZipFile(TESTFN2, mode="r") as zipf: with zipf.open('ones') as zopen1, zipf.open('twos') as zopen2: data1 = zopen1.read(500) data2 = zopen2.read(500) - data1 += zopen1.read(500) - data2 += zopen2.read(500) - self.assertEqual(data1, b'1'*FIXEDTEST_SIZE) - self.assertEqual(data2, b'2'*FIXEDTEST_SIZE) + data1 += zopen1.read() + data2 += zopen2.read() + self.assertEqual(data1, self.data1) + self.assertEqual(data2, self.data2) def test_interleaved(self): # Verify that (when the ZipFile is in control of creating file objects) # multiple open() calls can be made without interfering with each other. + self.make_test_archive(TESTFN2) with zipfile.ZipFile(TESTFN2, mode="r") as zipf: with zipf.open('ones') as zopen1, zipf.open('twos') as zopen2: data1 = zopen1.read(500) data2 = zopen2.read(500) - data1 += zopen1.read(500) - data2 += zopen2.read(500) - self.assertEqual(data1, b'1'*FIXEDTEST_SIZE) - self.assertEqual(data2, b'2'*FIXEDTEST_SIZE) + data1 += zopen1.read() + data2 += zopen2.read() + self.assertEqual(data1, self.data1) + self.assertEqual(data2, self.data2) + + def test_read_after_close(self): + self.make_test_archive(TESTFN2) + with contextlib.ExitStack() as stack: + with zipfile.ZipFile(TESTFN2, 'r') as zipf: + zopen1 = stack.enter_context(zipf.open('ones')) + zopen2 = stack.enter_context(zipf.open('twos')) + data1 = zopen1.read(500) + data2 = zopen2.read(500) + data1 += zopen1.read() + data2 += zopen2.read() + self.assertEqual(data1, self.data1) + self.assertEqual(data2, self.data2) + + def test_read_after_write(self): + with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_DEFLATED) as zipf: + zipf.writestr('ones', self.data1) + zipf.writestr('twos', self.data2) + with zipf.open('ones') as zopen1: + data1 = zopen1.read(500) + self.assertEqual(data1, self.data1[:500]) + with zipfile.ZipFile(TESTFN2, 'r') as zipf: + data1 = zipf.read('ones') + data2 = zipf.read('twos') + self.assertEqual(data1, self.data1) + self.assertEqual(data2, self.data2) + + def test_write_after_read(self): + with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_DEFLATED) as zipf: + zipf.writestr('ones', self.data1) + with zipf.open('ones') as zopen1: + zopen1.read(500) + zipf.writestr('twos', self.data2) + with zipfile.ZipFile(TESTFN2, 'r') as zipf: + data1 = zipf.read('ones') + data2 = zipf.read('twos') + self.assertEqual(data1, self.data1) + self.assertEqual(data2, self.data2) + + def test_many_opens(self): + # Verify that read() and open() promptly close the file descriptor, + # and don't rely on the garbage collector to free resources. + self.make_test_archive(TESTFN2) + with zipfile.ZipFile(TESTFN2, mode="r") as zipf: + for x in range(100): + zipf.read('ones') + with zipf.open('ones') as zopen1: + pass + with open(os.devnull) as f: + self.assertLess(f.fileno(), 100) def tearDown(self): unlink(TESTFN2) @@ -1558,14 +1777,51 @@ class TestWithDirectory(unittest.TestCase): os.mkdir(os.path.join(TESTFN2, "a")) self.test_extract_dir() - def test_store_dir(self): + def test_write_dir(self): + dirpath = os.path.join(TESTFN2, "x") + os.mkdir(dirpath) + mode = os.stat(dirpath).st_mode & 0xFFFF + with zipfile.ZipFile(TESTFN, "w") as zipf: + zipf.write(dirpath) + zinfo = zipf.filelist[0] + self.assertTrue(zinfo.filename.endswith("/x/")) + self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10) + zipf.write(dirpath, "y") + zinfo = zipf.filelist[1] + self.assertTrue(zinfo.filename, "y/") + self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10) + with zipfile.ZipFile(TESTFN, "r") as zipf: + zinfo = zipf.filelist[0] + self.assertTrue(zinfo.filename.endswith("/x/")) + self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10) + zinfo = zipf.filelist[1] + self.assertTrue(zinfo.filename, "y/") + self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10) + target = os.path.join(TESTFN2, "target") + os.mkdir(target) + zipf.extractall(target) + self.assertTrue(os.path.isdir(os.path.join(target, "y"))) + self.assertEqual(len(os.listdir(target)), 2) + + def test_writestr_dir(self): os.mkdir(os.path.join(TESTFN2, "x")) - zipf = zipfile.ZipFile(TESTFN, "w") - zipf.write(os.path.join(TESTFN2, "x"), "x") - self.assertTrue(zipf.filelist[0].filename.endswith("x/")) + with zipfile.ZipFile(TESTFN, "w") as zipf: + zipf.writestr("x/", b'') + zinfo = zipf.filelist[0] + self.assertEqual(zinfo.filename, "x/") + self.assertEqual(zinfo.external_attr, (0o40775 << 16) | 0x10) + with zipfile.ZipFile(TESTFN, "r") as zipf: + zinfo = zipf.filelist[0] + self.assertTrue(zinfo.filename.endswith("x/")) + self.assertEqual(zinfo.external_attr, (0o40775 << 16) | 0x10) + target = os.path.join(TESTFN2, "target") + os.mkdir(target) + zipf.extractall(target) + self.assertTrue(os.path.isdir(os.path.join(target, "x"))) + self.assertEqual(os.listdir(target), ["x"]) def tearDown(self): - shutil.rmtree(TESTFN2) + rmtree(TESTFN2) if os.path.exists(TESTFN): unlink(TESTFN) @@ -1599,7 +1855,7 @@ class AbstractUniversalNewlineTests: # Read the ZIP archive with zipfile.ZipFile(f, "r") as zipfp: for sep, fn in self.arcfiles.items(): - with zipfp.open(fn, "rU") as fp: + with openU(zipfp, fn) as fp: zipdata = fp.read() self.assertEqual(self.arcdata[sep], zipdata) @@ -1613,7 +1869,7 @@ class AbstractUniversalNewlineTests: # Read the ZIP archive with zipfile.ZipFile(f, "r") as zipfp: for sep, fn in self.arcfiles.items(): - with zipfp.open(fn, "rU") as zipopen: + with openU(zipfp, fn) as zipopen: data = b'' while True: read = zipopen.readline() @@ -1638,7 +1894,7 @@ class AbstractUniversalNewlineTests: # Read the ZIP archive with zipfile.ZipFile(f, "r") as zipfp: for sep, fn in self.arcfiles.items(): - with zipfp.open(fn, "rU") as zipopen: + with openU(zipfp, fn) as zipopen: for line in self.line_gen: linedata = zipopen.readline() self.assertEqual(linedata, line + b'\n') @@ -1653,7 +1909,7 @@ class AbstractUniversalNewlineTests: # Read the ZIP archive with zipfile.ZipFile(f, "r") as zipfp: for sep, fn in self.arcfiles.items(): - with zipfp.open(fn, "rU") as fp: + with openU(zipfp, fn) as fp: ziplines = fp.readlines() for line, zipline in zip(self.line_gen, ziplines): self.assertEqual(zipline, line + b'\n') @@ -1668,7 +1924,7 @@ class AbstractUniversalNewlineTests: # Read the ZIP archive with zipfile.ZipFile(f, "r") as zipfp: for sep, fn in self.arcfiles.items(): - with zipfp.open(fn, "rU") as fp: + with openU(zipfp, fn) as fp: for line, zipline in zip(self.line_gen, fp): self.assertEqual(zipline, line + b'\n') @@ -1678,7 +1934,7 @@ class AbstractUniversalNewlineTests: def tearDown(self): for sep, fn in self.arcfiles.items(): - os.remove(fn) + unlink(fn) unlink(TESTFN) unlink(TESTFN2) @@ -1702,6 +1958,5 @@ class LzmaUniversalNewlineTests(AbstractUniversalNewlineTests, unittest.TestCase): compression = zipfile.ZIP_LZMA - if __name__ == "__main__": unittest.main() |
