From fa6bc29987ca0df75a31615b0468d657ff41a747 Mon Sep 17 00:00:00 2001 From: Serhiy Storchaka Date: Mon, 22 Jul 2013 21:00:11 +0300 Subject: Issue #17944: test_zipfile now discoverable and uses subclassing to generate tests for different compression types. Fixed a bug with skipping some tests due to use of exhausted iterators. --- Lib/test/test_zipfile.py | 1555 +++++++++++++++++++--------------------------- Misc/NEWS | 4 + 2 files changed, 638 insertions(+), 921 deletions(-) diff --git a/Lib/test/test_zipfile.py b/Lib/test/test_zipfile.py index 5e837cd..fb866d8 100644 --- a/Lib/test/test_zipfile.py +++ b/Lib/test/test_zipfile.py @@ -10,10 +10,9 @@ import unittest from tempfile import TemporaryFile -from random import randint, random -from unittest import skipUnless +from random import randint, random, getrandbits -from test.support import (TESTFN, run_unittest, findfile, unlink, +from test.support import (TESTFN, findfile, unlink, requires_zlib, requires_bz2, requires_lzma, captured_stdout) @@ -27,14 +26,24 @@ SMALL_TEST_DATA = [('_ziptest1', '1q2w3e4r5t'), ('ziptest2dir/ziptest3dir/_ziptest3', 'azsxdcfvgb'), ('ziptest2dir/ziptest3dir/ziptest4dir/_ziptest3', '6y7u8i9o0p')] +def get_files(test): + yield TESTFN2 + with TemporaryFile() as f: + yield f + test.assertFalse(f.closed) + with io.BytesIO() as f: + yield f + test.assertFalse(f.closed) + +class AbstractTestsWithSourceFile: + @classmethod + def setUpClass(cls): + cls.line_gen = [bytes("Zipfile test line %d. random float: %f\n" % + (i, random()), "ascii") + for i in range(FIXEDTEST_SIZE)] + cls.data = b''.join(cls.line_gen) -class TestsWithSourceFile(unittest.TestCase): def setUp(self): - self.line_gen = (bytes("Zipfile test line %d. random float: %f" % - (i, random()), "ascii") - for i in range(FIXEDTEST_SIZE)) - self.data = b'\n'.join(self.line_gen) + b'\n' - # Make a source file with some lines with open(TESTFN, "wb") as fp: fp.write(self.data) @@ -97,12 +106,10 @@ class TestsWithSourceFile(unittest.TestCase): # Check that testzip doesn't raise an exception zipfp.testzip() - if not isinstance(f, str): - f.close() - def test_stored(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_test(f, zipfile.ZIP_STORED) + def test_basic(self): + for f in get_files(self): + self.zip_test(f, self.compression) def zip_open_test(self, f, compression): self.make_test_archive(f, compression) @@ -127,30 +134,10 @@ class TestsWithSourceFile(unittest.TestCase): self.assertEqual(b''.join(zipdata1), self.data) self.assertEqual(b''.join(zipdata2), self.data) - if not isinstance(f, str): - f.close() - - def test_open_stored(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_open_test(f, zipfile.ZIP_STORED) - - def test_open_via_zip_info(self): - # Create the ZIP archive - with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: - zipfp.writestr("name", "foo") - zipfp.writestr("name", "bar") - with zipfile.ZipFile(TESTFN2, "r") as zipfp: - infos = zipfp.infolist() - data = b"" - for info in infos: - with zipfp.open(info) as zipopen: - data += zipopen.read() - self.assertTrue(data == b"foobar" or data == b"barfoo") - data = b"" - for info in infos: - data += zipfp.read(info) - self.assertTrue(data == b"foobar" or data == b"barfoo") + def test_open(self): + for f in get_files(self): + self.zip_open_test(f, self.compression) def zip_random_open_test(self, f, compression): self.make_test_archive(f, compression) @@ -166,36 +153,17 @@ class TestsWithSourceFile(unittest.TestCase): zipdata1.append(read_data) self.assertEqual(b''.join(zipdata1), self.data) - if not isinstance(f, str): - f.close() - - def test_random_open_stored(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_random_open_test(f, zipfile.ZIP_STORED) - - def test_univeral_readaheads(self): - f = io.BytesIO() - - data = b'a\r\n' * 16 * 1024 - zipfp = zipfile.ZipFile(f, 'w', zipfile.ZIP_STORED) - zipfp.writestr(TESTFN, data) - zipfp.close() - - data2 = b'' - zipfp = zipfile.ZipFile(f, 'r') - with zipfp.open(TESTFN, 'rU') as zipopen: - for line in zipopen: - data2 += line - zipfp.close() - self.assertEqual(data, data2.replace(b'\n', b'\r\n')) + def test_random_open(self): + for f in get_files(self): + self.zip_random_open_test(f, self.compression) def zip_readline_read_test(self, f, compression): self.make_test_archive(f, compression) # Read the ZIP archive - zipfp = zipfile.ZipFile(f, "r") - with zipfp.open(TESTFN) as zipopen: + with zipfile.ZipFile(f, "r") as zipfp, \ + zipfp.open(TESTFN) as zipopen: data = b'' while True: read = zipopen.readline() @@ -209,9 +177,11 @@ class TestsWithSourceFile(unittest.TestCase): data += read self.assertEqual(data, self.data) - zipfp.close() - if not isinstance(f, str): - f.close() + + def test_readline_read(self): + # Issue #7610: calls to readline() interleaved with calls to read(). + for f in get_files(self): + self.zip_readline_read_test(f, self.compression) def zip_readline_test(self, f, compression): self.make_test_archive(f, compression) @@ -221,9 +191,11 @@ class TestsWithSourceFile(unittest.TestCase): with zipfp.open(TESTFN) as zipopen: for line in self.line_gen: linedata = zipopen.readline() - self.assertEqual(linedata, line + '\n') - if not isinstance(f, str): - f.close() + self.assertEqual(linedata, line) + + def test_readline(self): + for f in get_files(self): + self.zip_readline_test(f, self.compression) def zip_readlines_test(self, f, compression): self.make_test_archive(f, compression) @@ -233,9 +205,11 @@ class TestsWithSourceFile(unittest.TestCase): with zipfp.open(TESTFN) as zipopen: ziplines = zipopen.readlines() for line, zipline in zip(self.line_gen, ziplines): - self.assertEqual(zipline, line + '\n') - if not isinstance(f, str): - f.close() + self.assertEqual(zipline, line) + + def test_readlines(self): + for f in get_files(self): + self.zip_readlines_test(f, self.compression) def zip_iterlines_test(self, f, compression): self.make_test_archive(f, compression) @@ -244,173 +218,64 @@ class TestsWithSourceFile(unittest.TestCase): with zipfile.ZipFile(f, "r") as zipfp: with zipfp.open(TESTFN) as zipopen: for line, zipline in zip(self.line_gen, zipopen): - self.assertEqual(zipline, line + '\n') - if not isinstance(f, str): - f.close() - - def test_readline_read_stored(self): - # Issue #7610: calls to readline() interleaved with calls to read(). - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_readline_read_test(f, zipfile.ZIP_STORED) - - def test_readline_stored(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_readline_test(f, zipfile.ZIP_STORED) - - def test_readlines_stored(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_readlines_test(f, zipfile.ZIP_STORED) - - def test_iterlines_stored(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_iterlines_test(f, zipfile.ZIP_STORED) - - @requires_zlib - def test_deflated(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_test(f, zipfile.ZIP_DEFLATED) - - - @requires_zlib - def test_open_deflated(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_open_test(f, zipfile.ZIP_DEFLATED) - - @requires_zlib - def test_random_open_deflated(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_random_open_test(f, zipfile.ZIP_DEFLATED) - - @requires_zlib - def test_readline_read_deflated(self): - # Issue #7610: calls to readline() interleaved with calls to read(). - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_readline_read_test(f, zipfile.ZIP_DEFLATED) - - @requires_zlib - def test_readline_deflated(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_readline_test(f, zipfile.ZIP_DEFLATED) + self.assertEqual(zipline, line) - @requires_zlib - def test_readlines_deflated(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_readlines_test(f, zipfile.ZIP_DEFLATED) - - @requires_zlib - def test_iterlines_deflated(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_iterlines_test(f, zipfile.ZIP_DEFLATED) + def test_iterlines(self): + for f in get_files(self): + self.zip_iterlines_test(f, self.compression) - @requires_zlib def test_low_compression(self): """Check for cases where compressed data is larger than original.""" # Create the ZIP archive - with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_DEFLATED) as zipfp: + with zipfile.ZipFile(TESTFN2, "w", self.compression) as zipfp: zipfp.writestr("strfile", '12') # Get an open object for strfile - with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_DEFLATED) as zipfp: + with zipfile.ZipFile(TESTFN2, "r", self.compression) as zipfp: with zipfp.open("strfile") as openobj: self.assertEqual(openobj.read(1), b'1') self.assertEqual(openobj.read(1), b'2') - @requires_bz2 - def test_bzip2(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_test(f, zipfile.ZIP_BZIP2) - - @requires_bz2 - def test_open_bzip2(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_open_test(f, zipfile.ZIP_BZIP2) - - @requires_bz2 - def test_random_open_bzip2(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_random_open_test(f, zipfile.ZIP_BZIP2) - - @requires_bz2 - def test_readline_read_bzip2(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_readline_read_test(f, zipfile.ZIP_BZIP2) - - @requires_bz2 - def test_readline_bzip2(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_readline_test(f, zipfile.ZIP_BZIP2) - - @requires_bz2 - def test_readlines_bzip2(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_readlines_test(f, zipfile.ZIP_BZIP2) - - @requires_bz2 - def test_iterlines_bzip2(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_iterlines_test(f, zipfile.ZIP_BZIP2) - - @requires_bz2 - def test_low_compression_bzip2(self): - """Check for cases where compressed data is larger than original.""" - # Create the ZIP archive - with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_BZIP2) as zipfp: - zipfp.writestr("strfile", '12') + def test_writestr_compression(self): + zipfp = zipfile.ZipFile(TESTFN2, "w") + zipfp.writestr("b.txt", "hello world", compress_type=self.compression) + info = zipfp.getinfo('b.txt') + self.assertEqual(info.compress_type, self.compression) - # Get an open object for strfile - with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_BZIP2) as zipfp: - with zipfp.open("strfile") as openobj: - self.assertEqual(openobj.read(1), b'1') - self.assertEqual(openobj.read(1), b'2') + def test_read_return_size(self): + # Issue #9837: ZipExtFile.read() shouldn't return more bytes + # than requested. + for test_size in (1, 4095, 4096, 4097, 16384): + file_size = test_size + 1 + junk = getrandbits(8 * file_size).to_bytes(file_size, 'little') + with zipfile.ZipFile(io.BytesIO(), "w", self.compression) as zipf: + zipf.writestr('foo', junk) + with zipf.open('foo', 'r') as fp: + buf = fp.read(test_size) + self.assertEqual(len(buf), test_size) - @requires_lzma - def test_lzma(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_test(f, zipfile.ZIP_LZMA) + def tearDown(self): + unlink(TESTFN) + unlink(TESTFN2) - @requires_lzma - def test_open_lzma(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_open_test(f, zipfile.ZIP_LZMA) - @requires_lzma - def test_random_open_lzma(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_random_open_test(f, zipfile.ZIP_LZMA) +class StoredTestsWithSourceFile(AbstractTestsWithSourceFile, + unittest.TestCase): + compression = zipfile.ZIP_STORED + test_low_compression = None - @requires_lzma - def test_readline_read_lzma(self): - # Issue #7610: calls to readline() interleaved with calls to read(). - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_readline_read_test(f, zipfile.ZIP_LZMA) - - @requires_lzma - def test_readline_lzma(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_readline_test(f, zipfile.ZIP_LZMA) - - @requires_lzma - def test_readlines_lzma(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_readlines_test(f, zipfile.ZIP_LZMA) - - @requires_lzma - def test_iterlines_lzma(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_iterlines_test(f, zipfile.ZIP_LZMA) - - @requires_lzma - def test_low_compression_lzma(self): - """Check for cases where compressed data is larger than original.""" - # Create the ZIP archive - with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_LZMA) as zipfp: - zipfp.writestr("strfile", '12') + def zip_test_writestr_permissions(self, f, compression): + # Make sure that writestr creates files with mode 0600, + # when it is passed a name rather than a ZipInfo instance. - # Get an open object for strfile - with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_LZMA) as zipfp: - with zipfp.open("strfile") as openobj: - self.assertEqual(openobj.read(1), b'1') - self.assertEqual(openobj.read(1), b'2') + self.make_test_archive(f, compression) + with zipfile.ZipFile(f, "r") as zipfp: + zinfo = zipfp.getinfo('strfile') + self.assertEqual(zinfo.external_attr, 0o600 << 16) + + def test_writestr_permissions(self): + for f in get_files(self): + self.zip_test_writestr_permissions(f, zipfile.ZIP_STORED) def test_absolute_arcnames(self): with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: @@ -470,7 +335,26 @@ class TestsWithSourceFile(unittest.TestCase): with open(TESTFN, "rb") as f: self.assertEqual(zipfp.read(TESTFN), f.read()) - @requires_zlib + def test_write_to_readonly(self): + """Check that trying to call write() on a readonly ZipFile object + raises a RuntimeError.""" + with zipfile.ZipFile(TESTFN2, mode="w") as zipfp: + zipfp.writestr("somefile.txt", "bogus") + + with zipfile.ZipFile(TESTFN2, mode="r") as zipfp: + self.assertRaises(RuntimeError, zipfp.write, TESTFN) + + def test_add_file_before_1980(self): + # Set atime and mtime to 1970-01-01 + os.utime(TESTFN, (0, 0)) + with zipfile.ZipFile(TESTFN2, "w") as zipfp: + self.assertRaises(ValueError, zipfp.write, TESTFN) + +@requires_zlib +class DeflateTestsWithSourceFile(AbstractTestsWithSourceFile, + unittest.TestCase): + compression = zipfile.ZIP_DEFLATED + def test_per_file_compression(self): """Check that files within a Zip archive can have different compression options.""" @@ -482,274 +366,105 @@ class TestsWithSourceFile(unittest.TestCase): self.assertEqual(sinfo.compress_type, zipfile.ZIP_STORED) self.assertEqual(dinfo.compress_type, zipfile.ZIP_DEFLATED) - def test_write_to_readonly(self): - """Check that trying to call write() on a readonly ZipFile object - raises a RuntimeError.""" - with zipfile.ZipFile(TESTFN2, mode="w") as zipfp: - zipfp.writestr("somefile.txt", "bogus") +@requires_bz2 +class Bzip2TestsWithSourceFile(AbstractTestsWithSourceFile, + unittest.TestCase): + compression = zipfile.ZIP_BZIP2 - with zipfile.ZipFile(TESTFN2, mode="r") as zipfp: - self.assertRaises(RuntimeError, zipfp.write, TESTFN) +@requires_lzma +class LzmaTestsWithSourceFile(AbstractTestsWithSourceFile, + unittest.TestCase): + compression = zipfile.ZIP_LZMA - def test_extract(self): - with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: - for fpath, fdata in SMALL_TEST_DATA: - zipfp.writestr(fpath, fdata) - with zipfile.ZipFile(TESTFN2, "r") as zipfp: - for fpath, fdata in SMALL_TEST_DATA: - writtenfile = zipfp.extract(fpath) +class AbstractTestZip64InSmallFiles: + # These tests test the ZIP64 functionality without using large files, + # see test_zipfile64 for proper tests. - # make sure it was written to the right place - correctfile = os.path.join(os.getcwd(), fpath) - correctfile = os.path.normpath(correctfile) + @classmethod + def setUpClass(cls): + line_gen = (bytes("Test of zipfile line %d." % i, "ascii") + for i in range(0, FIXEDTEST_SIZE)) + cls.data = b'\n'.join(line_gen) - self.assertEqual(writtenfile, correctfile) + def setUp(self): + self._limit = zipfile.ZIP64_LIMIT + zipfile.ZIP64_LIMIT = 5 - # make sure correct data is in correct file - with open(writtenfile, "rb") as f: - self.assertEqual(fdata.encode(), f.read()) + # Make a source file with some lines + with open(TESTFN, "wb") as fp: + fp.write(self.data) - os.remove(writtenfile) + def zip_test(self, f, compression): + # Create the ZIP archive + with zipfile.ZipFile(f, "w", compression, allowZip64=True) as zipfp: + zipfp.write(TESTFN, "another.name") + zipfp.write(TESTFN, TESTFN) + zipfp.writestr("strfile", self.data) - # remove the test file subdirectories - shutil.rmtree(os.path.join(os.getcwd(), 'ziptest2dir')) + # Read the ZIP archive + with zipfile.ZipFile(f, "r", compression) as zipfp: + self.assertEqual(zipfp.read(TESTFN), self.data) + self.assertEqual(zipfp.read("another.name"), self.data) + self.assertEqual(zipfp.read("strfile"), self.data) - def test_extract_all(self): - with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: - for fpath, fdata in SMALL_TEST_DATA: - zipfp.writestr(fpath, fdata) + # Print the ZIP directory + fp = io.StringIO() + zipfp.printdir(fp) - with zipfile.ZipFile(TESTFN2, "r") as zipfp: - zipfp.extractall() - for fpath, fdata in SMALL_TEST_DATA: - outfile = os.path.join(os.getcwd(), fpath) + directory = fp.getvalue() + lines = directory.splitlines() + self.assertEqual(len(lines), 4) # Number of files + header - with open(outfile, "rb") as f: - self.assertEqual(fdata.encode(), f.read()) + self.assertIn('File Name', lines[0]) + self.assertIn('Modified', lines[0]) + self.assertIn('Size', lines[0]) - os.remove(outfile) + fn, date, time_, size = lines[1].split() + self.assertEqual(fn, 'another.name') + self.assertTrue(time.strptime(date, '%Y-%m-%d')) + self.assertTrue(time.strptime(time_, '%H:%M:%S')) + self.assertEqual(size, str(len(self.data))) - # remove the test file subdirectories - shutil.rmtree(os.path.join(os.getcwd(), 'ziptest2dir')) + # Check the namelist + names = zipfp.namelist() + self.assertEqual(len(names), 3) + self.assertIn(TESTFN, names) + self.assertIn("another.name", names) + self.assertIn("strfile", names) - def check_file(self, filename, content): - self.assertTrue(os.path.isfile(filename)) - with open(filename, 'rb') as f: - self.assertEqual(f.read(), content) + # Check infolist + infos = zipfp.infolist() + names = [i.filename for i in infos] + self.assertEqual(len(names), 3) + self.assertIn(TESTFN, names) + self.assertIn("another.name", names) + self.assertIn("strfile", names) + for i in infos: + self.assertEqual(i.file_size, len(self.data)) - def test_sanitize_windows_name(self): - san = zipfile.ZipFile._sanitize_windows_name - # Passing pathsep in allows this test to work regardless of platform. - self.assertEqual(san(r',,?,C:,foo,bar/z', ','), r'_,C_,foo,bar/z') - self.assertEqual(san(r'a\b,ce|f"g?h*i', ','), r'a\b,c_d_e_f_g_h_i') - self.assertEqual(san('../../foo../../ba..r', '/'), r'foo/ba..r') + # check getinfo + for nm in (TESTFN, "another.name", "strfile"): + info = zipfp.getinfo(nm) + self.assertEqual(info.filename, nm) + self.assertEqual(info.file_size, len(self.data)) - def test_extract_hackers_arcnames_common_cases(self): - common_hacknames = [ - ('../foo/bar', 'foo/bar'), - ('foo/../bar', 'foo/bar'), - ('foo/../../bar', 'foo/bar'), - ('foo/bar/..', 'foo/bar'), - ('./../foo/bar', 'foo/bar'), - ('/foo/bar', 'foo/bar'), - ('/foo/../bar', 'foo/bar'), - ('/foo/../../bar', 'foo/bar'), - ] - self._test_extract_hackers_arcnames(common_hacknames) + # Check that testzip doesn't raise an exception + zipfp.testzip() - @unittest.skipIf(os.path.sep != '\\', 'Requires \\ as path separator.') - def test_extract_hackers_arcnames_windows_only(self): - """Test combination of path fixing and windows name sanitization.""" - windows_hacknames = [ - (r'..\foo\bar', 'foo/bar'), - (r'..\/foo\/bar', 'foo/bar'), - (r'foo/\..\/bar', 'foo/bar'), - (r'foo\/../\bar', 'foo/bar'), - (r'C:foo/bar', 'foo/bar'), - (r'C:/foo/bar', 'foo/bar'), - (r'C://foo/bar', 'foo/bar'), - (r'C:\foo\bar', 'foo/bar'), - (r'//conky/mountpoint/foo/bar', 'foo/bar'), - (r'\\conky\mountpoint\foo\bar', 'foo/bar'), - (r'///conky/mountpoint/foo/bar', 'conky/mountpoint/foo/bar'), - (r'\\\conky\mountpoint\foo\bar', 'conky/mountpoint/foo/bar'), - (r'//conky//mountpoint/foo/bar', 'conky/mountpoint/foo/bar'), - (r'\\conky\\mountpoint\foo\bar', 'conky/mountpoint/foo/bar'), - (r'//?/C:/foo/bar', 'foo/bar'), - (r'\\?\C:\foo\bar', 'foo/bar'), - (r'C:/../C:/foo/bar', 'C_/foo/bar'), - (r'a:b\ce|f"g?h*i', 'b/c_d_e_f_g_h_i'), - ('../../foo../../ba..r', 'foo/ba..r'), - ] - self._test_extract_hackers_arcnames(windows_hacknames) - - @unittest.skipIf(os.path.sep != '/', r'Requires / as path separator.') - def test_extract_hackers_arcnames_posix_only(self): - posix_hacknames = [ - ('//foo/bar', 'foo/bar'), - ('../../foo../../ba..r', 'foo../ba..r'), - (r'foo/..\bar', r'foo/..\bar'), - ] - self._test_extract_hackers_arcnames(posix_hacknames) - - def _test_extract_hackers_arcnames(self, hacknames): - for arcname, fixedname in hacknames: - content = b'foobar' + arcname.encode() - with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_STORED) as zipfp: - zinfo = zipfile.ZipInfo() - # preserve backslashes - zinfo.filename = arcname - zinfo.external_attr = 0o600 << 16 - zipfp.writestr(zinfo, content) - - arcname = arcname.replace(os.sep, "/") - targetpath = os.path.join('target', 'subdir', 'subsub') - correctfile = os.path.join(targetpath, *fixedname.split('/')) - - with zipfile.ZipFile(TESTFN2, 'r') as zipfp: - writtenfile = zipfp.extract(arcname, targetpath) - self.assertEqual(writtenfile, correctfile, - msg='extract %r: %r != %r' % - (arcname, writtenfile, correctfile)) - self.check_file(correctfile, content) - shutil.rmtree('target') - - with zipfile.ZipFile(TESTFN2, 'r') as zipfp: - zipfp.extractall(targetpath) - self.check_file(correctfile, content) - shutil.rmtree('target') - - correctfile = os.path.join(os.getcwd(), *fixedname.split('/')) - - with zipfile.ZipFile(TESTFN2, 'r') as zipfp: - writtenfile = zipfp.extract(arcname) - self.assertEqual(writtenfile, correctfile, - msg="extract %r" % arcname) - self.check_file(correctfile, content) - shutil.rmtree(fixedname.split('/')[0]) - - with zipfile.ZipFile(TESTFN2, 'r') as zipfp: - zipfp.extractall() - self.check_file(correctfile, content) - shutil.rmtree(fixedname.split('/')[0]) - - os.remove(TESTFN2) - - def test_writestr_compression_stored(self): - zipfp = zipfile.ZipFile(TESTFN2, "w") - zipfp.writestr("a.txt", "hello world", compress_type=zipfile.ZIP_STORED) - info = zipfp.getinfo('a.txt') - self.assertEqual(info.compress_type, zipfile.ZIP_STORED) - - @requires_zlib - def test_writestr_compression_deflated(self): - zipfp = zipfile.ZipFile(TESTFN2, "w") - zipfp.writestr("b.txt", "hello world", compress_type=zipfile.ZIP_DEFLATED) - info = zipfp.getinfo('b.txt') - self.assertEqual(info.compress_type, zipfile.ZIP_DEFLATED) - - @requires_bz2 - def test_writestr_compression_bzip2(self): - zipfp = zipfile.ZipFile(TESTFN2, "w") - zipfp.writestr("b.txt", "hello world", compress_type=zipfile.ZIP_BZIP2) - info = zipfp.getinfo('b.txt') - self.assertEqual(info.compress_type, zipfile.ZIP_BZIP2) - - @requires_lzma - def test_writestr_compression_lzma(self): - zipfp = zipfile.ZipFile(TESTFN2, "w") - zipfp.writestr("b.txt", "hello world", compress_type=zipfile.ZIP_LZMA) - info = zipfp.getinfo('b.txt') - self.assertEqual(info.compress_type, zipfile.ZIP_LZMA) - - def zip_test_writestr_permissions(self, f, compression): - # Make sure that writestr creates files with mode 0600, - # when it is passed a name rather than a ZipInfo instance. - - self.make_test_archive(f, compression) - with zipfile.ZipFile(f, "r") as zipfp: - zinfo = zipfp.getinfo('strfile') - self.assertEqual(zinfo.external_attr, 0o600 << 16) - if not isinstance(f, str): - f.close() - - def test_writestr_permissions(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_test_writestr_permissions(f, zipfile.ZIP_STORED) - - def test_writestr_extended_local_header_issue1202(self): - with zipfile.ZipFile(TESTFN2, 'w') as orig_zip: - for data in 'abcdefghijklmnop': - zinfo = zipfile.ZipInfo(data) - zinfo.flag_bits |= 0x08 # Include an extended local header. - orig_zip.writestr(zinfo, data) - - def test_close(self): - """Check that the zipfile is closed after the 'with' block.""" - with zipfile.ZipFile(TESTFN2, "w") as zipfp: - for fpath, fdata in SMALL_TEST_DATA: - zipfp.writestr(fpath, fdata) - self.assertTrue(zipfp.fp is not None, 'zipfp is not open') - self.assertTrue(zipfp.fp is None, 'zipfp is not closed') - - with zipfile.ZipFile(TESTFN2, "r") as zipfp: - self.assertTrue(zipfp.fp is not None, 'zipfp is not open') - self.assertTrue(zipfp.fp is None, 'zipfp is not closed') - - def test_close_on_exception(self): - """Check that the zipfile is closed if an exception is raised in the - 'with' block.""" - with zipfile.ZipFile(TESTFN2, "w") as zipfp: - for fpath, fdata in SMALL_TEST_DATA: - zipfp.writestr(fpath, fdata) - - try: - with zipfile.ZipFile(TESTFN2, "r") as zipfp2: - raise zipfile.BadZipFile() - except zipfile.BadZipFile: - self.assertTrue(zipfp2.fp is None, 'zipfp is not closed') - - def test_add_file_before_1980(self): - # Set atime and mtime to 1970-01-01 - os.utime(TESTFN, (0, 0)) - with zipfile.ZipFile(TESTFN2, "w") as zipfp: - self.assertRaises(ValueError, zipfp.write, TESTFN) - - - - - - - - @requires_zlib - def test_unicode_filenames(self): - # bug #10801 - fname = findfile('zip_cp437_header.zip') - with zipfile.ZipFile(fname) as zipfp: - for name in zipfp.namelist(): - zipfp.open(name).close() + def test_basic(self): + for f in get_files(self): + self.zip_test(f, self.compression) def tearDown(self): + zipfile.ZIP64_LIMIT = self._limit unlink(TESTFN) unlink(TESTFN2) -class TestZip64InSmallFiles(unittest.TestCase): - # These tests test the ZIP64 functionality without using large files, - # see test_zipfile64 for proper tests. - - def setUp(self): - self._limit = zipfile.ZIP64_LIMIT - zipfile.ZIP64_LIMIT = 5 - - line_gen = (bytes("Test of zipfile line %d." % i, "ascii") - for i in range(0, FIXEDTEST_SIZE)) - self.data = b'\n'.join(line_gen) - - # Make a source file with some lines - with open(TESTFN, "wb") as fp: - fp.write(self.data) +class StoredTestZip64InSmallFiles(AbstractTestZip64InSmallFiles, + unittest.TestCase): + compression = zipfile.ZIP_STORED def large_file_exception_test(self, f, compression): with zipfile.ZipFile(f, "w", compression) as zipfp: @@ -760,92 +475,12 @@ class TestZip64InSmallFiles(unittest.TestCase): with zipfile.ZipFile(f, "w", compression) as zipfp: self.assertRaises(zipfile.LargeZipFile, zipfp.writestr, "another.name", self.data) - if not isinstance(f, str): - f.close() def test_large_file_exception(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): + for f in get_files(self): self.large_file_exception_test(f, zipfile.ZIP_STORED) self.large_file_exception_test2(f, zipfile.ZIP_STORED) - def zip_test(self, f, compression): - # Create the ZIP archive - with zipfile.ZipFile(f, "w", compression, allowZip64=True) as zipfp: - zipfp.write(TESTFN, "another.name") - zipfp.write(TESTFN, TESTFN) - zipfp.writestr("strfile", self.data) - - # Read the ZIP archive - with zipfile.ZipFile(f, "r", compression) as zipfp: - self.assertEqual(zipfp.read(TESTFN), self.data) - self.assertEqual(zipfp.read("another.name"), self.data) - self.assertEqual(zipfp.read("strfile"), self.data) - - # Print the ZIP directory - fp = io.StringIO() - zipfp.printdir(fp) - - directory = fp.getvalue() - lines = directory.splitlines() - self.assertEqual(len(lines), 4) # Number of files + header - - self.assertIn('File Name', lines[0]) - self.assertIn('Modified', lines[0]) - self.assertIn('Size', lines[0]) - - fn, date, time_, size = lines[1].split() - self.assertEqual(fn, 'another.name') - self.assertTrue(time.strptime(date, '%Y-%m-%d')) - self.assertTrue(time.strptime(time_, '%H:%M:%S')) - self.assertEqual(size, str(len(self.data))) - - # Check the namelist - names = zipfp.namelist() - self.assertEqual(len(names), 3) - self.assertIn(TESTFN, names) - self.assertIn("another.name", names) - self.assertIn("strfile", names) - - # Check infolist - infos = zipfp.infolist() - names = [i.filename for i in infos] - self.assertEqual(len(names), 3) - self.assertIn(TESTFN, names) - self.assertIn("another.name", names) - self.assertIn("strfile", names) - for i in infos: - self.assertEqual(i.file_size, len(self.data)) - - # check getinfo - for nm in (TESTFN, "another.name", "strfile"): - info = zipfp.getinfo(nm) - self.assertEqual(info.filename, nm) - self.assertEqual(info.file_size, len(self.data)) - - # Check that testzip doesn't raise an exception - zipfp.testzip() - if not isinstance(f, str): - f.close() - - def test_stored(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_test(f, zipfile.ZIP_STORED) - - @requires_zlib - def test_deflated(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_test(f, zipfile.ZIP_DEFLATED) - - @requires_bz2 - def test_bzip2(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_test(f, zipfile.ZIP_BZIP2) - - @requires_lzma - def test_lzma(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_test(f, zipfile.ZIP_LZMA) - def test_absolute_arcnames(self): with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED, allowZip64=True) as zipfp: @@ -854,13 +489,27 @@ class TestZip64InSmallFiles(unittest.TestCase): with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) as zipfp: self.assertEqual(zipfp.namelist(), ["absolute"]) - def tearDown(self): - zipfile.ZIP64_LIMIT = self._limit - unlink(TESTFN) - unlink(TESTFN2) +@requires_zlib +class DeflateTestZip64InSmallFiles(AbstractTestZip64InSmallFiles, + unittest.TestCase): + compression = zipfile.ZIP_DEFLATED + +@requires_bz2 +class Bzip2TestZip64InSmallFiles(AbstractTestZip64InSmallFiles, + unittest.TestCase): + compression = zipfile.ZIP_BZIP2 + +@requires_lzma +class LzmaTestZip64InSmallFiles(AbstractTestZip64InSmallFiles, + unittest.TestCase): + compression = zipfile.ZIP_LZMA class PyZipFileTests(unittest.TestCase): + def assertCompiledIn(self, name, namelist): + if name + 'o' not in namelist: + self.assertIn(name + 'c', namelist) + def test_write_pyfile(self): with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: fn = __file__ @@ -877,8 +526,7 @@ class PyZipFileTests(unittest.TestCase): bn = os.path.basename(fn) self.assertNotIn(bn, zipfp.namelist()) - self.assertTrue(bn + 'o' in zipfp.namelist() or - bn + 'c' in zipfp.namelist()) + self.assertCompiledIn(bn, zipfp.namelist()) with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: fn = __file__ @@ -889,8 +537,7 @@ class PyZipFileTests(unittest.TestCase): bn = "%s/%s" % ("testpackage", os.path.basename(fn)) self.assertNotIn(bn, zipfp.namelist()) - self.assertTrue(bn + 'o' in zipfp.namelist() or - bn + 'c' in zipfp.namelist()) + self.assertCompiledIn(bn, zipfp.namelist()) def test_write_python_package(self): import email @@ -902,10 +549,8 @@ class PyZipFileTests(unittest.TestCase): # Check for a couple of modules at different levels of the # hierarchy names = zipfp.namelist() - self.assertTrue('email/__init__.pyo' in names or - 'email/__init__.pyc' in names) - self.assertTrue('email/mime/text.pyo' in names or - 'email/mime/text.pyc' in names) + self.assertCompiledIn('email/__init__.py', names) + self.assertCompiledIn('email/mime/text.py', names) def test_write_with_optimization(self): import email @@ -938,87 +583,257 @@ class PyZipFileTests(unittest.TestCase): with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: zipfp.writepy(TESTFN2) - names = zipfp.namelist() - self.assertTrue('mod1.pyc' in names or 'mod1.pyo' in names) - self.assertTrue('mod2.pyc' in names or 'mod2.pyo' in names) - self.assertNotIn('mod2.txt', names) + names = zipfp.namelist() + self.assertCompiledIn('mod1.py', names) + self.assertCompiledIn('mod2.py', names) + self.assertNotIn('mod2.txt', names) + + finally: + shutil.rmtree(TESTFN2) + + def test_write_non_pyfile(self): + with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: + with open(TESTFN, 'w') as f: + f.write('most definitely not a python file') + self.assertRaises(RuntimeError, zipfp.writepy, TESTFN) + os.remove(TESTFN) + + def test_write_pyfile_bad_syntax(self): + os.mkdir(TESTFN2) + try: + with open(os.path.join(TESTFN2, "mod1.py"), "w") as fp: + fp.write("Bad syntax in python file\n") + + with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: + # syntax errors are printed to stdout + with captured_stdout() as s: + zipfp.writepy(os.path.join(TESTFN2, "mod1.py")) + + self.assertIn("SyntaxError", s.getvalue()) + + # as it will not have compiled the python file, it will + # include the .py file not .pyc or .pyo + names = zipfp.namelist() + self.assertIn('mod1.py', names) + self.assertNotIn('mod1.pyc', names) + self.assertNotIn('mod1.pyo', names) + + finally: + shutil.rmtree(TESTFN2) + + +class ExtractTests(unittest.TestCase): + def test_extract(self): + with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: + for fpath, fdata in SMALL_TEST_DATA: + zipfp.writestr(fpath, fdata) + + with zipfile.ZipFile(TESTFN2, "r") as zipfp: + for fpath, fdata in SMALL_TEST_DATA: + writtenfile = zipfp.extract(fpath) + + # make sure it was written to the right place + correctfile = os.path.join(os.getcwd(), fpath) + correctfile = os.path.normpath(correctfile) + + self.assertEqual(writtenfile, correctfile) + + # make sure correct data is in correct file + with open(writtenfile, "rb") as f: + self.assertEqual(fdata.encode(), f.read()) + + os.remove(writtenfile) + + # remove the test file subdirectories + shutil.rmtree(os.path.join(os.getcwd(), 'ziptest2dir')) + + def test_extract_all(self): + with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: + for fpath, fdata in SMALL_TEST_DATA: + zipfp.writestr(fpath, fdata) + + with zipfile.ZipFile(TESTFN2, "r") as zipfp: + zipfp.extractall() + for fpath, fdata in SMALL_TEST_DATA: + outfile = os.path.join(os.getcwd(), fpath) + + with open(outfile, "rb") as f: + self.assertEqual(fdata.encode(), f.read()) + + os.remove(outfile) + + # remove the test file subdirectories + shutil.rmtree(os.path.join(os.getcwd(), 'ziptest2dir')) + + def check_file(self, filename, content): + self.assertTrue(os.path.isfile(filename)) + with open(filename, 'rb') as f: + self.assertEqual(f.read(), content) + + def test_sanitize_windows_name(self): + san = zipfile.ZipFile._sanitize_windows_name + # Passing pathsep in allows this test to work regardless of platform. + self.assertEqual(san(r',,?,C:,foo,bar/z', ','), r'_,C_,foo,bar/z') + self.assertEqual(san(r'a\b,ce|f"g?h*i', ','), r'a\b,c_d_e_f_g_h_i') + self.assertEqual(san('../../foo../../ba..r', '/'), r'foo/ba..r') + + def test_extract_hackers_arcnames_common_cases(self): + common_hacknames = [ + ('../foo/bar', 'foo/bar'), + ('foo/../bar', 'foo/bar'), + ('foo/../../bar', 'foo/bar'), + ('foo/bar/..', 'foo/bar'), + ('./../foo/bar', 'foo/bar'), + ('/foo/bar', 'foo/bar'), + ('/foo/../bar', 'foo/bar'), + ('/foo/../../bar', 'foo/bar'), + ] + self._test_extract_hackers_arcnames(common_hacknames) + + @unittest.skipIf(os.path.sep != '\\', 'Requires \\ as path separator.') + def test_extract_hackers_arcnames_windows_only(self): + """Test combination of path fixing and windows name sanitization.""" + windows_hacknames = [ + (r'..\foo\bar', 'foo/bar'), + (r'..\/foo\/bar', 'foo/bar'), + (r'foo/\..\/bar', 'foo/bar'), + (r'foo\/../\bar', 'foo/bar'), + (r'C:foo/bar', 'foo/bar'), + (r'C:/foo/bar', 'foo/bar'), + (r'C://foo/bar', 'foo/bar'), + (r'C:\foo\bar', 'foo/bar'), + (r'//conky/mountpoint/foo/bar', 'foo/bar'), + (r'\\conky\mountpoint\foo\bar', 'foo/bar'), + (r'///conky/mountpoint/foo/bar', 'conky/mountpoint/foo/bar'), + (r'\\\conky\mountpoint\foo\bar', 'conky/mountpoint/foo/bar'), + (r'//conky//mountpoint/foo/bar', 'conky/mountpoint/foo/bar'), + (r'\\conky\\mountpoint\foo\bar', 'conky/mountpoint/foo/bar'), + (r'//?/C:/foo/bar', 'foo/bar'), + (r'\\?\C:\foo\bar', 'foo/bar'), + (r'C:/../C:/foo/bar', 'C_/foo/bar'), + (r'a:b\ce|f"g?h*i', 'b/c_d_e_f_g_h_i'), + ('../../foo../../ba..r', 'foo/ba..r'), + ] + self._test_extract_hackers_arcnames(windows_hacknames) + + @unittest.skipIf(os.path.sep != '/', r'Requires / as path separator.') + def test_extract_hackers_arcnames_posix_only(self): + posix_hacknames = [ + ('//foo/bar', 'foo/bar'), + ('../../foo../../ba..r', 'foo../ba..r'), + (r'foo/..\bar', r'foo/..\bar'), + ] + self._test_extract_hackers_arcnames(posix_hacknames) + + def _test_extract_hackers_arcnames(self, hacknames): + for arcname, fixedname in hacknames: + content = b'foobar' + arcname.encode() + with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_STORED) as zipfp: + zinfo = zipfile.ZipInfo() + # preserve backslashes + zinfo.filename = arcname + zinfo.external_attr = 0o600 << 16 + zipfp.writestr(zinfo, content) + + arcname = arcname.replace(os.sep, "/") + targetpath = os.path.join('target', 'subdir', 'subsub') + correctfile = os.path.join(targetpath, *fixedname.split('/')) + + with zipfile.ZipFile(TESTFN2, 'r') as zipfp: + writtenfile = zipfp.extract(arcname, targetpath) + self.assertEqual(writtenfile, correctfile, + msg='extract %r: %r != %r' % + (arcname, writtenfile, correctfile)) + self.check_file(correctfile, content) + shutil.rmtree('target') + + with zipfile.ZipFile(TESTFN2, 'r') as zipfp: + zipfp.extractall(targetpath) + self.check_file(correctfile, content) + shutil.rmtree('target') + + correctfile = os.path.join(os.getcwd(), *fixedname.split('/')) + + with zipfile.ZipFile(TESTFN2, 'r') as zipfp: + writtenfile = zipfp.extract(arcname) + self.assertEqual(writtenfile, correctfile, + msg="extract %r" % arcname) + self.check_file(correctfile, content) + shutil.rmtree(fixedname.split('/')[0]) + + with zipfile.ZipFile(TESTFN2, 'r') as zipfp: + zipfp.extractall() + self.check_file(correctfile, content) + shutil.rmtree(fixedname.split('/')[0]) + + os.remove(TESTFN2) + + +class OtherTests(unittest.TestCase): + def test_open_via_zip_info(self): + # Create the ZIP archive + with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: + zipfp.writestr("name", "foo") + zipfp.writestr("name", "bar") + + with zipfile.ZipFile(TESTFN2, "r") as zipfp: + infos = zipfp.infolist() + data = b"" + for info in infos: + with zipfp.open(info) as zipopen: + data += zipopen.read() + self.assertIn(data, {b"foobar", b"barfoo"}) + data = b"" + for info in infos: + data += zipfp.read(info) + self.assertIn(data, {b"foobar", b"barfoo"}) + + def test_universal_readaheads(self): + f = io.BytesIO() - finally: - shutil.rmtree(TESTFN2) + data = b'a\r\n' * 16 * 1024 + with zipfile.ZipFile(f, 'w', zipfile.ZIP_STORED) as zipfp: + zipfp.writestr(TESTFN, data) - def test_write_non_pyfile(self): - with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: - with open(TESTFN, 'w') as f: - f.write('most definitely not a python file') - self.assertRaises(RuntimeError, zipfp.writepy, TESTFN) - os.remove(TESTFN) + data2 = b'' + with zipfile.ZipFile(f, 'r') as zipfp, \ + zipfp.open(TESTFN, 'rU') as zipopen: + for line in zipopen: + data2 += line - def test_write_pyfile_bad_syntax(self): - os.mkdir(TESTFN2) - try: - with open(os.path.join(TESTFN2, "mod1.py"), "w") as fp: - fp.write("Bad syntax in python file\n") + self.assertEqual(data, data2.replace(b'\n', b'\r\n')) - with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: - # syntax errors are printed to stdout - with captured_stdout() as s: - zipfp.writepy(os.path.join(TESTFN2, "mod1.py")) + def test_writestr_extended_local_header_issue1202(self): + with zipfile.ZipFile(TESTFN2, 'w') as orig_zip: + for data in 'abcdefghijklmnop': + zinfo = zipfile.ZipInfo(data) + zinfo.flag_bits |= 0x08 # Include an extended local header. + orig_zip.writestr(zinfo, data) - self.assertIn("SyntaxError", s.getvalue()) + def test_close(self): + """Check that the zipfile is closed after the 'with' block.""" + with zipfile.ZipFile(TESTFN2, "w") as zipfp: + for fpath, fdata in SMALL_TEST_DATA: + zipfp.writestr(fpath, fdata) + self.assertIsNotNone(zipfp.fp, 'zipfp is not open') + self.assertIsNone(zipfp.fp, 'zipfp is not closed') - # as it will not have compiled the python file, it will - # include the .py file not .pyc or .pyo - names = zipfp.namelist() - self.assertIn('mod1.py', names) - self.assertNotIn('mod1.pyc', names) - self.assertNotIn('mod1.pyo', names) + with zipfile.ZipFile(TESTFN2, "r") as zipfp: + self.assertIsNotNone(zipfp.fp, 'zipfp is not open') + self.assertIsNone(zipfp.fp, 'zipfp is not closed') - finally: - shutil.rmtree(TESTFN2) + def test_close_on_exception(self): + """Check that the zipfile is closed if an exception is raised in the + 'with' block.""" + with zipfile.ZipFile(TESTFN2, "w") as zipfp: + for fpath, fdata in SMALL_TEST_DATA: + zipfp.writestr(fpath, fdata) -class OtherTests(unittest.TestCase): - zips_with_bad_crc = { - zipfile.ZIP_STORED: ( - b'PK\003\004\024\0\0\0\0\0 \213\212;:r' - b'\253\377\f\0\0\0\f\0\0\0\005\0\0\000af' - b'ilehello,AworldP' - b'K\001\002\024\003\024\0\0\0\0\0 \213\212;:' - b'r\253\377\f\0\0\0\f\0\0\0\005\0\0\0\0' - b'\0\0\0\0\0\0\0\200\001\0\0\0\000afi' - b'lePK\005\006\0\0\0\0\001\0\001\0003\000' - b'\0\0/\0\0\0\0\0'), - zipfile.ZIP_DEFLATED: ( - b'PK\x03\x04\x14\x00\x00\x00\x08\x00n}\x0c=FA' - b'KE\x10\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af' - b'ile\xcbH\xcd\xc9\xc9W(\xcf/\xcaI\xc9\xa0' - b'=\x13\x00PK\x01\x02\x14\x03\x14\x00\x00\x00\x08\x00n' - b'}\x0c=FAKE\x10\x00\x00\x00n\x00\x00\x00\x05' - b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01\x00\x00\x00' - b'\x00afilePK\x05\x06\x00\x00\x00\x00\x01\x00' - b'\x01\x003\x00\x00\x003\x00\x00\x00\x00\x00'), - zipfile.ZIP_BZIP2: ( - b'PK\x03\x04\x14\x03\x00\x00\x0c\x00nu\x0c=FA' - b'KE8\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af' - b'ileBZh91AY&SY\xd4\xa8\xca' - b'\x7f\x00\x00\x0f\x11\x80@\x00\x06D\x90\x80 \x00 \xa5' - b'P\xd9!\x03\x03\x13\x13\x13\x89\xa9\xa9\xc2u5:\x9f' - b'\x8b\xb9"\x9c(HjTe?\x80PK\x01\x02\x14' - b'\x03\x14\x03\x00\x00\x0c\x00nu\x0c=FAKE8' - b'\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00\x00\x00\x00\x00\x00' - b'\x00 \x80\x80\x81\x00\x00\x00\x00afilePK' - b'\x05\x06\x00\x00\x00\x00\x01\x00\x01\x003\x00\x00\x00[\x00' - b'\x00\x00\x00\x00'), - zipfile.ZIP_LZMA: ( - b'PK\x03\x04\x14\x03\x00\x00\x0e\x00nu\x0c=FA' - b'KE\x1b\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af' - b'ile\t\x04\x05\x00]\x00\x00\x00\x04\x004\x19I' - b'\xee\x8d\xe9\x17\x89:3`\tq!.8\x00PK' - b'\x01\x02\x14\x03\x14\x03\x00\x00\x0e\x00nu\x0c=FA' - b'KE\x1b\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00\x00\x00' - b'\x00\x00\x00\x00 \x80\x80\x81\x00\x00\x00\x00afil' - b'ePK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x003\x00\x00' - b'\x00>\x00\x00\x00\x00\x00'), - } + try: + with zipfile.ZipFile(TESTFN2, "r") as zipfp2: + raise zipfile.BadZipFile() + except zipfile.BadZipFile: + self.assertIsNone(zipfp2.fp, 'zipfp is not closed') def test_unsupported_version(self): # File has an extract_version of 120 @@ -1027,10 +842,19 @@ class OtherTests(unittest.TestCase): b'\x00!p\xa1@\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00' b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01\x00\x00\x00\x00xPK\x05\x06' b'\x00\x00\x00\x00\x01\x00\x01\x00/\x00\x00\x00\x1f\x00\x00\x00\x00\x00') + self.assertRaises(NotImplementedError, zipfile.ZipFile, io.BytesIO(data), 'r') - def test_unicode_filenames(self): + @requires_zlib + def test_read_unicode_filenames(self): + # bug #10801 + fname = findfile('zip_cp437_header.zip') + with zipfile.ZipFile(fname) as zipfp: + for name in zipfp.namelist(): + zipfp.open(name).close() + + def test_write_unicode_filenames(self): with zipfile.ZipFile(TESTFN, "w") as zf: zf.writestr("foo.txt", "Test for unicode filename") zf.writestr("\xf6.txt", "Test for unicode filename") @@ -1078,20 +902,16 @@ class OtherTests(unittest.TestCase): # - passing a filename with open(TESTFN, "w") as fp: fp.write("this is not a legal zip file\n") - chk = zipfile.is_zipfile(TESTFN) - self.assertFalse(chk) + self.assertFalse(zipfile.is_zipfile(TESTFN)) # - passing a file object with open(TESTFN, "rb") as fp: - chk = zipfile.is_zipfile(fp) - self.assertTrue(not chk) + self.assertFalse(zipfile.is_zipfile(fp)) # - passing a file-like object fp = io.BytesIO() fp.write(b"this is not a legal zip file\n") - chk = zipfile.is_zipfile(fp) - self.assertTrue(not chk) + self.assertFalse(zipfile.is_zipfile(fp)) fp.seek(0, 0) - chk = zipfile.is_zipfile(fp) - self.assertTrue(not chk) + self.assertFalse(zipfile.is_zipfile(fp)) def test_damaged_zipfile(self): """Check that zipfiles with missing bytes at the end raise BadZipFile.""" @@ -1113,22 +933,18 @@ class OtherTests(unittest.TestCase): with zipfile.ZipFile(TESTFN, mode="w") as zipf: zipf.writestr("foo.txt", b"O, for a Muse of Fire!") - chk = zipfile.is_zipfile(TESTFN) - self.assertTrue(chk) + self.assertTrue(zipfile.is_zipfile(TESTFN)) # - passing a file object with open(TESTFN, "rb") as fp: - chk = zipfile.is_zipfile(fp) - self.assertTrue(chk) + self.assertTrue(zipfile.is_zipfile(fp)) fp.seek(0, 0) zip_contents = fp.read() # - passing a file-like object fp = io.BytesIO() fp.write(zip_contents) - chk = zipfile.is_zipfile(fp) - self.assertTrue(chk) + self.assertTrue(zipfile.is_zipfile(fp)) fp.seek(0, 0) - chk = zipfile.is_zipfile(fp) - self.assertTrue(chk) + self.assertTrue(zipfile.is_zipfile(fp)) def test_non_existent_file_raises_IOError(self): # make sure we don't raise an AttributeError when a partially-constructed @@ -1309,93 +1125,6 @@ class OtherTests(unittest.TestCase): with zipfile.ZipFile(TESTFN, "r") as zipf: self.assertEqual(zipf.comment, b"this is a comment") - def check_testzip_with_bad_crc(self, compression): - """Tests that files with bad CRCs return their name from testzip.""" - zipdata = self.zips_with_bad_crc[compression] - - with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: - # testzip returns the name of the first corrupt file, or None - self.assertEqual('afile', zipf.testzip()) - - def test_testzip_with_bad_crc_stored(self): - self.check_testzip_with_bad_crc(zipfile.ZIP_STORED) - - @requires_zlib - def test_testzip_with_bad_crc_deflated(self): - self.check_testzip_with_bad_crc(zipfile.ZIP_DEFLATED) - - @requires_bz2 - def test_testzip_with_bad_crc_bzip2(self): - self.check_testzip_with_bad_crc(zipfile.ZIP_BZIP2) - - @requires_lzma - def test_testzip_with_bad_crc_lzma(self): - self.check_testzip_with_bad_crc(zipfile.ZIP_LZMA) - - def check_read_with_bad_crc(self, compression): - """Tests that files with bad CRCs raise a BadZipFile exception when read.""" - zipdata = self.zips_with_bad_crc[compression] - - # Using ZipFile.read() - with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: - self.assertRaises(zipfile.BadZipFile, zipf.read, 'afile') - - # Using ZipExtFile.read() - with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: - with zipf.open('afile', 'r') as corrupt_file: - self.assertRaises(zipfile.BadZipFile, corrupt_file.read) - - # Same with small reads (in order to exercise the buffering logic) - with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: - with zipf.open('afile', 'r') as corrupt_file: - corrupt_file.MIN_READ_SIZE = 2 - with self.assertRaises(zipfile.BadZipFile): - while corrupt_file.read(2): - pass - - def test_read_with_bad_crc_stored(self): - self.check_read_with_bad_crc(zipfile.ZIP_STORED) - - @requires_zlib - def test_read_with_bad_crc_deflated(self): - self.check_read_with_bad_crc(zipfile.ZIP_DEFLATED) - - @requires_bz2 - def test_read_with_bad_crc_bzip2(self): - self.check_read_with_bad_crc(zipfile.ZIP_BZIP2) - - @requires_lzma - def test_read_with_bad_crc_lzma(self): - self.check_read_with_bad_crc(zipfile.ZIP_LZMA) - - def check_read_return_size(self, compression): - # Issue #9837: ZipExtFile.read() shouldn't return more bytes - # than requested. - for test_size in (1, 4095, 4096, 4097, 16384): - file_size = test_size + 1 - junk = b''.join(struct.pack('B', randint(0, 255)) - for x in range(file_size)) - with zipfile.ZipFile(io.BytesIO(), "w", compression) as zipf: - zipf.writestr('foo', junk) - with zipf.open('foo', 'r') as fp: - buf = fp.read(test_size) - self.assertEqual(len(buf), test_size) - - def test_read_return_size_stored(self): - self.check_read_return_size(zipfile.ZIP_STORED) - - @requires_zlib - def test_read_return_size_deflated(self): - self.check_read_return_size(zipfile.ZIP_DEFLATED) - - @requires_bz2 - def test_read_return_size_bzip2(self): - self.check_read_return_size(zipfile.ZIP_BZIP2) - - @requires_lzma - def test_read_return_size_lzma(self): - self.check_read_return_size(zipfile.ZIP_LZMA) - def test_empty_zipfile(self): # Check that creating a file in 'w' or 'a' mode and closing without # adding any files to the archives creates a valid empty ZIP file @@ -1430,6 +1159,93 @@ class OtherTests(unittest.TestCase): unlink(TESTFN2) +class AbstractBadCrcTests: + def test_testzip_with_bad_crc(self): + """Tests that files with bad CRCs return their name from testzip.""" + zipdata = self.zip_with_bad_crc + + with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: + # testzip returns the name of the first corrupt file, or None + self.assertEqual('afile', zipf.testzip()) + + def test_read_with_bad_crc(self): + """Tests that files with bad CRCs raise a BadZipFile exception when read.""" + zipdata = self.zip_with_bad_crc + + # Using ZipFile.read() + with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: + self.assertRaises(zipfile.BadZipFile, zipf.read, 'afile') + + # Using ZipExtFile.read() + with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: + with zipf.open('afile', 'r') as corrupt_file: + self.assertRaises(zipfile.BadZipFile, corrupt_file.read) + + # Same with small reads (in order to exercise the buffering logic) + with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: + with zipf.open('afile', 'r') as corrupt_file: + corrupt_file.MIN_READ_SIZE = 2 + with self.assertRaises(zipfile.BadZipFile): + while corrupt_file.read(2): + pass + + +class StoredBadCrcTests(AbstractBadCrcTests, unittest.TestCase): + compression = zipfile.ZIP_STORED + zip_with_bad_crc = ( + b'PK\003\004\024\0\0\0\0\0 \213\212;:r' + b'\253\377\f\0\0\0\f\0\0\0\005\0\0\000af' + b'ilehello,AworldP' + b'K\001\002\024\003\024\0\0\0\0\0 \213\212;:' + b'r\253\377\f\0\0\0\f\0\0\0\005\0\0\0\0' + b'\0\0\0\0\0\0\0\200\001\0\0\0\000afi' + b'lePK\005\006\0\0\0\0\001\0\001\0003\000' + b'\0\0/\0\0\0\0\0') + +@requires_zlib +class DeflateBadCrcTests(AbstractBadCrcTests, unittest.TestCase): + compression = zipfile.ZIP_DEFLATED + zip_with_bad_crc = ( + b'PK\x03\x04\x14\x00\x00\x00\x08\x00n}\x0c=FA' + b'KE\x10\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af' + b'ile\xcbH\xcd\xc9\xc9W(\xcf/\xcaI\xc9\xa0' + b'=\x13\x00PK\x01\x02\x14\x03\x14\x00\x00\x00\x08\x00n' + b'}\x0c=FAKE\x10\x00\x00\x00n\x00\x00\x00\x05' + b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01\x00\x00\x00' + b'\x00afilePK\x05\x06\x00\x00\x00\x00\x01\x00' + b'\x01\x003\x00\x00\x003\x00\x00\x00\x00\x00') + +@requires_bz2 +class Bzip2BadCrcTests(AbstractBadCrcTests, unittest.TestCase): + compression = zipfile.ZIP_BZIP2 + zip_with_bad_crc = ( + b'PK\x03\x04\x14\x03\x00\x00\x0c\x00nu\x0c=FA' + b'KE8\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af' + b'ileBZh91AY&SY\xd4\xa8\xca' + b'\x7f\x00\x00\x0f\x11\x80@\x00\x06D\x90\x80 \x00 \xa5' + b'P\xd9!\x03\x03\x13\x13\x13\x89\xa9\xa9\xc2u5:\x9f' + b'\x8b\xb9"\x9c(HjTe?\x80PK\x01\x02\x14' + b'\x03\x14\x03\x00\x00\x0c\x00nu\x0c=FAKE8' + b'\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00\x00\x00\x00\x00\x00' + b'\x00 \x80\x80\x81\x00\x00\x00\x00afilePK' + b'\x05\x06\x00\x00\x00\x00\x01\x00\x01\x003\x00\x00\x00[\x00' + b'\x00\x00\x00\x00') + +@requires_lzma +class LzmaBadCrcTests(AbstractBadCrcTests, unittest.TestCase): + compression = zipfile.ZIP_LZMA + zip_with_bad_crc = ( + b'PK\x03\x04\x14\x03\x00\x00\x0e\x00nu\x0c=FA' + b'KE\x1b\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af' + b'ile\t\x04\x05\x00]\x00\x00\x00\x04\x004\x19I' + b'\xee\x8d\xe9\x17\x89:3`\tq!.8\x00PK' + b'\x01\x02\x14\x03\x14\x03\x00\x00\x0e\x00nu\x0c=FA' + b'KE\x1b\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00\x00\x00' + b'\x00\x00\x00\x00 \x80\x80\x81\x00\x00\x00\x00afil' + b'ePK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x003\x00\x00' + b'\x00>\x00\x00\x00\x00\x00') + + class DecryptionTests(unittest.TestCase): """Check that ZIP decryption works. Since the library does not support encryption at the moment, we use a pre-generated encrypted @@ -1495,13 +1311,14 @@ class DecryptionTests(unittest.TestCase): self.assertRaises(TypeError, self.zip.open, "test.txt", pwd="python") self.assertRaises(TypeError, self.zip.extract, "test.txt", pwd="python") - -class TestsWithRandomBinaryFiles(unittest.TestCase): - def setUp(self): +class AbstractTestsWithRandomBinaryFiles: + @classmethod + def setUpClass(cls): datacount = randint(16, 64)*1024 + randint(1, 1024) - self.data = b''.join(struct.pack('