diff options
-rw-r--r-- | Lib/test/test_zipfile.py | 1404 | ||||
-rw-r--r-- | Misc/NEWS | 4 |
2 files changed, 565 insertions, 843 deletions
diff --git a/Lib/test/test_zipfile.py b/Lib/test/test_zipfile.py index d3a359f..26dd09b 100644 --- a/Lib/test/test_zipfile.py +++ b/Lib/test/test_zipfile.py @@ -10,10 +10,9 @@ import unittest from tempfile import TemporaryFile -from random import randint, random -from unittest import skipUnless +from random import randint, random, getrandbits -from test.support import (TESTFN, run_unittest, findfile, unlink, +from test.support import (TESTFN, findfile, unlink, requires_zlib, requires_bz2, requires_lzma, captured_stdout) @@ -27,14 +26,24 @@ SMALL_TEST_DATA = [('_ziptest1', '1q2w3e4r5t'), ('ziptest2dir/ziptest3dir/_ziptest3', 'azsxdcfvgb'), ('ziptest2dir/ziptest3dir/ziptest4dir/_ziptest3', '6y7u8i9o0p')] +def get_files(test): + yield TESTFN2 + with TemporaryFile() as f: + yield f + test.assertFalse(f.closed) + with io.BytesIO() as f: + yield f + test.assertFalse(f.closed) + +class AbstractTestsWithSourceFile: + @classmethod + def setUpClass(cls): + cls.line_gen = [bytes("Zipfile test line %d. random float: %f\n" % + (i, random()), "ascii") + for i in range(FIXEDTEST_SIZE)] + cls.data = b''.join(cls.line_gen) -class TestsWithSourceFile(unittest.TestCase): def setUp(self): - self.line_gen = (bytes("Zipfile test line %d. random float: %f" % - (i, random()), "ascii") - for i in range(FIXEDTEST_SIZE)) - self.data = b'\n'.join(self.line_gen) + b'\n' - # Make a source file with some lines with open(TESTFN, "wb") as fp: fp.write(self.data) @@ -97,12 +106,10 @@ class TestsWithSourceFile(unittest.TestCase): # Check that testzip doesn't raise an exception zipfp.testzip() - if not isinstance(f, str): - f.close() - def test_stored(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_test(f, zipfile.ZIP_STORED) + def test_basic(self): + for f in get_files(self): + self.zip_test(f, self.compression) def zip_open_test(self, f, compression): self.make_test_archive(f, compression) @@ -127,30 +134,10 @@ class TestsWithSourceFile(unittest.TestCase): self.assertEqual(b''.join(zipdata1), self.data) self.assertEqual(b''.join(zipdata2), self.data) - if not isinstance(f, str): - f.close() - - def test_open_stored(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_open_test(f, zipfile.ZIP_STORED) - - def test_open_via_zip_info(self): - # Create the ZIP archive - with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: - zipfp.writestr("name", "foo") - zipfp.writestr("name", "bar") - with zipfile.ZipFile(TESTFN2, "r") as zipfp: - infos = zipfp.infolist() - data = b"" - for info in infos: - with zipfp.open(info) as zipopen: - data += zipopen.read() - self.assertTrue(data == b"foobar" or data == b"barfoo") - data = b"" - for info in infos: - data += zipfp.read(info) - self.assertTrue(data == b"foobar" or data == b"barfoo") + def test_open(self): + for f in get_files(self): + self.zip_open_test(f, self.compression) def zip_random_open_test(self, f, compression): self.make_test_archive(f, compression) @@ -166,36 +153,17 @@ class TestsWithSourceFile(unittest.TestCase): zipdata1.append(read_data) self.assertEqual(b''.join(zipdata1), self.data) - if not isinstance(f, str): - f.close() - - def test_random_open_stored(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_random_open_test(f, zipfile.ZIP_STORED) - - def test_univeral_readaheads(self): - f = io.BytesIO() - - data = b'a\r\n' * 16 * 1024 - zipfp = zipfile.ZipFile(f, 'w', zipfile.ZIP_STORED) - zipfp.writestr(TESTFN, data) - zipfp.close() - - data2 = b'' - zipfp = zipfile.ZipFile(f, 'r') - with zipfp.open(TESTFN, 'rU') as zipopen: - for line in zipopen: - data2 += line - zipfp.close() - self.assertEqual(data, data2.replace(b'\n', b'\r\n')) + def test_random_open(self): + for f in get_files(self): + self.zip_random_open_test(f, self.compression) def zip_readline_read_test(self, f, compression): self.make_test_archive(f, compression) # Read the ZIP archive - zipfp = zipfile.ZipFile(f, "r") - with zipfp.open(TESTFN) as zipopen: + with zipfile.ZipFile(f, "r") as zipfp, \ + zipfp.open(TESTFN) as zipopen: data = b'' while True: read = zipopen.readline() @@ -209,9 +177,11 @@ class TestsWithSourceFile(unittest.TestCase): data += read self.assertEqual(data, self.data) - zipfp.close() - if not isinstance(f, str): - f.close() + + def test_readline_read(self): + # Issue #7610: calls to readline() interleaved with calls to read(). + for f in get_files(self): + self.zip_readline_read_test(f, self.compression) def zip_readline_test(self, f, compression): self.make_test_archive(f, compression) @@ -221,9 +191,11 @@ class TestsWithSourceFile(unittest.TestCase): with zipfp.open(TESTFN) as zipopen: for line in self.line_gen: linedata = zipopen.readline() - self.assertEqual(linedata, line + '\n') - if not isinstance(f, str): - f.close() + self.assertEqual(linedata, line) + + def test_readline(self): + for f in get_files(self): + self.zip_readline_test(f, self.compression) def zip_readlines_test(self, f, compression): self.make_test_archive(f, compression) @@ -233,9 +205,11 @@ class TestsWithSourceFile(unittest.TestCase): with zipfp.open(TESTFN) as zipopen: ziplines = zipopen.readlines() for line, zipline in zip(self.line_gen, ziplines): - self.assertEqual(zipline, line + '\n') - if not isinstance(f, str): - f.close() + self.assertEqual(zipline, line) + + def test_readlines(self): + for f in get_files(self): + self.zip_readlines_test(f, self.compression) def zip_iterlines_test(self, f, compression): self.make_test_archive(f, compression) @@ -244,173 +218,64 @@ class TestsWithSourceFile(unittest.TestCase): with zipfile.ZipFile(f, "r") as zipfp: with zipfp.open(TESTFN) as zipopen: for line, zipline in zip(self.line_gen, zipopen): - self.assertEqual(zipline, line + '\n') - if not isinstance(f, str): - f.close() - - def test_readline_read_stored(self): - # Issue #7610: calls to readline() interleaved with calls to read(). - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_readline_read_test(f, zipfile.ZIP_STORED) - - def test_readline_stored(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_readline_test(f, zipfile.ZIP_STORED) - - def test_readlines_stored(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_readlines_test(f, zipfile.ZIP_STORED) - - def test_iterlines_stored(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_iterlines_test(f, zipfile.ZIP_STORED) + self.assertEqual(zipline, line) - @requires_zlib - def test_deflated(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_test(f, zipfile.ZIP_DEFLATED) - - - @requires_zlib - def test_open_deflated(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_open_test(f, zipfile.ZIP_DEFLATED) - - @requires_zlib - def test_random_open_deflated(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_random_open_test(f, zipfile.ZIP_DEFLATED) + def test_iterlines(self): + for f in get_files(self): + self.zip_iterlines_test(f, self.compression) - @requires_zlib - def test_readline_read_deflated(self): - # Issue #7610: calls to readline() interleaved with calls to read(). - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_readline_read_test(f, zipfile.ZIP_DEFLATED) - - @requires_zlib - def test_readline_deflated(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_readline_test(f, zipfile.ZIP_DEFLATED) - - @requires_zlib - def test_readlines_deflated(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_readlines_test(f, zipfile.ZIP_DEFLATED) - - @requires_zlib - def test_iterlines_deflated(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_iterlines_test(f, zipfile.ZIP_DEFLATED) - - @requires_zlib def test_low_compression(self): """Check for cases where compressed data is larger than original.""" # Create the ZIP archive - with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_DEFLATED) as zipfp: + with zipfile.ZipFile(TESTFN2, "w", self.compression) as zipfp: zipfp.writestr("strfile", '12') # Get an open object for strfile - with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_DEFLATED) as zipfp: + with zipfile.ZipFile(TESTFN2, "r", self.compression) as zipfp: with zipfp.open("strfile") as openobj: self.assertEqual(openobj.read(1), b'1') self.assertEqual(openobj.read(1), b'2') - @requires_bz2 - def test_bzip2(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_test(f, zipfile.ZIP_BZIP2) - - @requires_bz2 - def test_open_bzip2(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_open_test(f, zipfile.ZIP_BZIP2) - - @requires_bz2 - def test_random_open_bzip2(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_random_open_test(f, zipfile.ZIP_BZIP2) - - @requires_bz2 - def test_readline_read_bzip2(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_readline_read_test(f, zipfile.ZIP_BZIP2) - - @requires_bz2 - def test_readline_bzip2(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_readline_test(f, zipfile.ZIP_BZIP2) - - @requires_bz2 - def test_readlines_bzip2(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_readlines_test(f, zipfile.ZIP_BZIP2) - - @requires_bz2 - def test_iterlines_bzip2(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_iterlines_test(f, zipfile.ZIP_BZIP2) - - @requires_bz2 - def test_low_compression_bzip2(self): - """Check for cases where compressed data is larger than original.""" - # Create the ZIP archive - with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_BZIP2) as zipfp: - zipfp.writestr("strfile", '12') + def test_writestr_compression(self): + zipfp = zipfile.ZipFile(TESTFN2, "w") + zipfp.writestr("b.txt", "hello world", compress_type=self.compression) + info = zipfp.getinfo('b.txt') + self.assertEqual(info.compress_type, self.compression) - # Get an open object for strfile - with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_BZIP2) as zipfp: - with zipfp.open("strfile") as openobj: - self.assertEqual(openobj.read(1), b'1') - self.assertEqual(openobj.read(1), b'2') + def test_read_return_size(self): + # Issue #9837: ZipExtFile.read() shouldn't return more bytes + # than requested. + for test_size in (1, 4095, 4096, 4097, 16384): + file_size = test_size + 1 + junk = getrandbits(8 * file_size).to_bytes(file_size, 'little') + with zipfile.ZipFile(io.BytesIO(), "w", self.compression) as zipf: + zipf.writestr('foo', junk) + with zipf.open('foo', 'r') as fp: + buf = fp.read(test_size) + self.assertEqual(len(buf), test_size) - @requires_lzma - def test_lzma(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_test(f, zipfile.ZIP_LZMA) + def tearDown(self): + unlink(TESTFN) + unlink(TESTFN2) - @requires_lzma - def test_open_lzma(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_open_test(f, zipfile.ZIP_LZMA) - @requires_lzma - def test_random_open_lzma(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_random_open_test(f, zipfile.ZIP_LZMA) +class StoredTestsWithSourceFile(AbstractTestsWithSourceFile, + unittest.TestCase): + compression = zipfile.ZIP_STORED + test_low_compression = None - @requires_lzma - def test_readline_read_lzma(self): - # Issue #7610: calls to readline() interleaved with calls to read(). - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_readline_read_test(f, zipfile.ZIP_LZMA) - - @requires_lzma - def test_readline_lzma(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_readline_test(f, zipfile.ZIP_LZMA) - - @requires_lzma - def test_readlines_lzma(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_readlines_test(f, zipfile.ZIP_LZMA) - - @requires_lzma - def test_iterlines_lzma(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_iterlines_test(f, zipfile.ZIP_LZMA) - - @requires_lzma - def test_low_compression_lzma(self): - """Check for cases where compressed data is larger than original.""" - # Create the ZIP archive - with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_LZMA) as zipfp: - zipfp.writestr("strfile", '12') + def zip_test_writestr_permissions(self, f, compression): + # Make sure that writestr creates files with mode 0600, + # when it is passed a name rather than a ZipInfo instance. - # Get an open object for strfile - with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_LZMA) as zipfp: - with zipfp.open("strfile") as openobj: - self.assertEqual(openobj.read(1), b'1') - self.assertEqual(openobj.read(1), b'2') + self.make_test_archive(f, compression) + with zipfile.ZipFile(f, "r") as zipfp: + zinfo = zipfp.getinfo('strfile') + self.assertEqual(zinfo.external_attr, 0o600 << 16) + + def test_writestr_permissions(self): + for f in get_files(self): + self.zip_test_writestr_permissions(f, zipfile.ZIP_STORED) def test_absolute_arcnames(self): with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: @@ -470,18 +335,6 @@ class TestsWithSourceFile(unittest.TestCase): with open(TESTFN, "rb") as f: self.assertEqual(zipfp.read(TESTFN), f.read()) - @requires_zlib - def test_per_file_compression(self): - """Check that files within a Zip archive can have different - compression options.""" - with zipfile.ZipFile(TESTFN2, "w") as zipfp: - zipfp.write(TESTFN, 'storeme', zipfile.ZIP_STORED) - zipfp.write(TESTFN, 'deflateme', zipfile.ZIP_DEFLATED) - sinfo = zipfp.getinfo('storeme') - dinfo = zipfp.getinfo('deflateme') - self.assertEqual(sinfo.compress_type, zipfile.ZIP_STORED) - self.assertEqual(dinfo.compress_type, zipfile.ZIP_DEFLATED) - def test_write_to_readonly(self): """Check that trying to call write() on a readonly ZipFile object raises a RuntimeError.""" @@ -491,283 +344,57 @@ class TestsWithSourceFile(unittest.TestCase): with zipfile.ZipFile(TESTFN2, mode="r") as zipfp: self.assertRaises(RuntimeError, zipfp.write, TESTFN) - def test_extract(self): - with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: - for fpath, fdata in SMALL_TEST_DATA: - zipfp.writestr(fpath, fdata) - - with zipfile.ZipFile(TESTFN2, "r") as zipfp: - for fpath, fdata in SMALL_TEST_DATA: - writtenfile = zipfp.extract(fpath) - - # make sure it was written to the right place - correctfile = os.path.join(os.getcwd(), fpath) - correctfile = os.path.normpath(correctfile) - - self.assertEqual(writtenfile, correctfile) - - # make sure correct data is in correct file - with open(writtenfile, "rb") as f: - self.assertEqual(fdata.encode(), f.read()) - - os.remove(writtenfile) - - # remove the test file subdirectories - shutil.rmtree(os.path.join(os.getcwd(), 'ziptest2dir')) - - def test_extract_all(self): - with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: - for fpath, fdata in SMALL_TEST_DATA: - zipfp.writestr(fpath, fdata) - - with zipfile.ZipFile(TESTFN2, "r") as zipfp: - zipfp.extractall() - for fpath, fdata in SMALL_TEST_DATA: - outfile = os.path.join(os.getcwd(), fpath) - - with open(outfile, "rb") as f: - self.assertEqual(fdata.encode(), f.read()) - - os.remove(outfile) - - # remove the test file subdirectories - shutil.rmtree(os.path.join(os.getcwd(), 'ziptest2dir')) - - def check_file(self, filename, content): - self.assertTrue(os.path.isfile(filename)) - with open(filename, 'rb') as f: - self.assertEqual(f.read(), content) - - def test_sanitize_windows_name(self): - san = zipfile.ZipFile._sanitize_windows_name - # Passing pathsep in allows this test to work regardless of platform. - self.assertEqual(san(r',,?,C:,foo,bar/z', ','), r'_,C_,foo,bar/z') - self.assertEqual(san(r'a\b,c<d>e|f"g?h*i', ','), r'a\b,c_d_e_f_g_h_i') - self.assertEqual(san('../../foo../../ba..r', '/'), r'foo/ba..r') - - def test_extract_hackers_arcnames_common_cases(self): - common_hacknames = [ - ('../foo/bar', 'foo/bar'), - ('foo/../bar', 'foo/bar'), - ('foo/../../bar', 'foo/bar'), - ('foo/bar/..', 'foo/bar'), - ('./../foo/bar', 'foo/bar'), - ('/foo/bar', 'foo/bar'), - ('/foo/../bar', 'foo/bar'), - ('/foo/../../bar', 'foo/bar'), - ] - self._test_extract_hackers_arcnames(common_hacknames) - - @unittest.skipIf(os.path.sep != '\\', 'Requires \\ as path separator.') - def test_extract_hackers_arcnames_windows_only(self): - """Test combination of path fixing and windows name sanitization.""" - windows_hacknames = [ - (r'..\foo\bar', 'foo/bar'), - (r'..\/foo\/bar', 'foo/bar'), - (r'foo/\..\/bar', 'foo/bar'), - (r'foo\/../\bar', 'foo/bar'), - (r'C:foo/bar', 'foo/bar'), - (r'C:/foo/bar', 'foo/bar'), - (r'C://foo/bar', 'foo/bar'), - (r'C:\foo\bar', 'foo/bar'), - (r'//conky/mountpoint/foo/bar', 'foo/bar'), - (r'\\conky\mountpoint\foo\bar', 'foo/bar'), - (r'///conky/mountpoint/foo/bar', 'conky/mountpoint/foo/bar'), - (r'\\\conky\mountpoint\foo\bar', 'conky/mountpoint/foo/bar'), - (r'//conky//mountpoint/foo/bar', 'conky/mountpoint/foo/bar'), - (r'\\conky\\mountpoint\foo\bar', 'conky/mountpoint/foo/bar'), - (r'//?/C:/foo/bar', 'foo/bar'), - (r'\\?\C:\foo\bar', 'foo/bar'), - (r'C:/../C:/foo/bar', 'C_/foo/bar'), - (r'a:b\c<d>e|f"g?h*i', 'b/c_d_e_f_g_h_i'), - ('../../foo../../ba..r', 'foo/ba..r'), - ] - self._test_extract_hackers_arcnames(windows_hacknames) - - @unittest.skipIf(os.path.sep != '/', r'Requires / as path separator.') - def test_extract_hackers_arcnames_posix_only(self): - posix_hacknames = [ - ('//foo/bar', 'foo/bar'), - ('../../foo../../ba..r', 'foo../ba..r'), - (r'foo/..\bar', r'foo/..\bar'), - ] - self._test_extract_hackers_arcnames(posix_hacknames) - - def _test_extract_hackers_arcnames(self, hacknames): - for arcname, fixedname in hacknames: - content = b'foobar' + arcname.encode() - with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_STORED) as zipfp: - zinfo = zipfile.ZipInfo() - # preserve backslashes - zinfo.filename = arcname - zinfo.external_attr = 0o600 << 16 - zipfp.writestr(zinfo, content) - - arcname = arcname.replace(os.sep, "/") - targetpath = os.path.join('target', 'subdir', 'subsub') - correctfile = os.path.join(targetpath, *fixedname.split('/')) - - with zipfile.ZipFile(TESTFN2, 'r') as zipfp: - writtenfile = zipfp.extract(arcname, targetpath) - self.assertEqual(writtenfile, correctfile, - msg='extract %r: %r != %r' % - (arcname, writtenfile, correctfile)) - self.check_file(correctfile, content) - shutil.rmtree('target') - - with zipfile.ZipFile(TESTFN2, 'r') as zipfp: - zipfp.extractall(targetpath) - self.check_file(correctfile, content) - shutil.rmtree('target') - - correctfile = os.path.join(os.getcwd(), *fixedname.split('/')) - - with zipfile.ZipFile(TESTFN2, 'r') as zipfp: - writtenfile = zipfp.extract(arcname) - self.assertEqual(writtenfile, correctfile, - msg="extract %r" % arcname) - self.check_file(correctfile, content) - shutil.rmtree(fixedname.split('/')[0]) - - with zipfile.ZipFile(TESTFN2, 'r') as zipfp: - zipfp.extractall() - self.check_file(correctfile, content) - shutil.rmtree(fixedname.split('/')[0]) - - os.remove(TESTFN2) - - def test_writestr_compression_stored(self): - zipfp = zipfile.ZipFile(TESTFN2, "w") - zipfp.writestr("a.txt", "hello world", compress_type=zipfile.ZIP_STORED) - info = zipfp.getinfo('a.txt') - self.assertEqual(info.compress_type, zipfile.ZIP_STORED) - - @requires_zlib - def test_writestr_compression_deflated(self): - zipfp = zipfile.ZipFile(TESTFN2, "w") - zipfp.writestr("b.txt", "hello world", compress_type=zipfile.ZIP_DEFLATED) - info = zipfp.getinfo('b.txt') - self.assertEqual(info.compress_type, zipfile.ZIP_DEFLATED) - - @requires_bz2 - def test_writestr_compression_bzip2(self): - zipfp = zipfile.ZipFile(TESTFN2, "w") - zipfp.writestr("b.txt", "hello world", compress_type=zipfile.ZIP_BZIP2) - info = zipfp.getinfo('b.txt') - self.assertEqual(info.compress_type, zipfile.ZIP_BZIP2) - - @requires_lzma - def test_writestr_compression_lzma(self): - zipfp = zipfile.ZipFile(TESTFN2, "w") - zipfp.writestr("b.txt", "hello world", compress_type=zipfile.ZIP_LZMA) - info = zipfp.getinfo('b.txt') - self.assertEqual(info.compress_type, zipfile.ZIP_LZMA) - - def zip_test_writestr_permissions(self, f, compression): - # Make sure that writestr creates files with mode 0600, - # when it is passed a name rather than a ZipInfo instance. - - self.make_test_archive(f, compression) - with zipfile.ZipFile(f, "r") as zipfp: - zinfo = zipfp.getinfo('strfile') - self.assertEqual(zinfo.external_attr, 0o600 << 16) - if not isinstance(f, str): - f.close() - - def test_writestr_permissions(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_test_writestr_permissions(f, zipfile.ZIP_STORED) - - def test_writestr_extended_local_header_issue1202(self): - with zipfile.ZipFile(TESTFN2, 'w') as orig_zip: - for data in 'abcdefghijklmnop': - zinfo = zipfile.ZipInfo(data) - zinfo.flag_bits |= 0x08 # Include an extended local header. - orig_zip.writestr(zinfo, data) - - def test_close(self): - """Check that the zipfile is closed after the 'with' block.""" - with zipfile.ZipFile(TESTFN2, "w") as zipfp: - for fpath, fdata in SMALL_TEST_DATA: - zipfp.writestr(fpath, fdata) - self.assertTrue(zipfp.fp is not None, 'zipfp is not open') - self.assertTrue(zipfp.fp is None, 'zipfp is not closed') - - with zipfile.ZipFile(TESTFN2, "r") as zipfp: - self.assertTrue(zipfp.fp is not None, 'zipfp is not open') - self.assertTrue(zipfp.fp is None, 'zipfp is not closed') - - def test_close_on_exception(self): - """Check that the zipfile is closed if an exception is raised in the - 'with' block.""" - with zipfile.ZipFile(TESTFN2, "w") as zipfp: - for fpath, fdata in SMALL_TEST_DATA: - zipfp.writestr(fpath, fdata) - - try: - with zipfile.ZipFile(TESTFN2, "r") as zipfp2: - raise zipfile.BadZipFile() - except zipfile.BadZipFile: - self.assertTrue(zipfp2.fp is None, 'zipfp is not closed') - def test_add_file_before_1980(self): # Set atime and mtime to 1970-01-01 os.utime(TESTFN, (0, 0)) with zipfile.ZipFile(TESTFN2, "w") as zipfp: self.assertRaises(ValueError, zipfp.write, TESTFN) +@requires_zlib +class DeflateTestsWithSourceFile(AbstractTestsWithSourceFile, + unittest.TestCase): + compression = zipfile.ZIP_DEFLATED + def test_per_file_compression(self): + """Check that files within a Zip archive can have different + compression options.""" + with zipfile.ZipFile(TESTFN2, "w") as zipfp: + zipfp.write(TESTFN, 'storeme', zipfile.ZIP_STORED) + zipfp.write(TESTFN, 'deflateme', zipfile.ZIP_DEFLATED) + sinfo = zipfp.getinfo('storeme') + dinfo = zipfp.getinfo('deflateme') + self.assertEqual(sinfo.compress_type, zipfile.ZIP_STORED) + self.assertEqual(dinfo.compress_type, zipfile.ZIP_DEFLATED) +@requires_bz2 +class Bzip2TestsWithSourceFile(AbstractTestsWithSourceFile, + unittest.TestCase): + compression = zipfile.ZIP_BZIP2 +@requires_lzma +class LzmaTestsWithSourceFile(AbstractTestsWithSourceFile, + unittest.TestCase): + compression = zipfile.ZIP_LZMA - - @requires_zlib - def test_unicode_filenames(self): - # bug #10801 - fname = findfile('zip_cp437_header.zip') - with zipfile.ZipFile(fname) as zipfp: - for name in zipfp.namelist(): - zipfp.open(name).close() - - def tearDown(self): - unlink(TESTFN) - unlink(TESTFN2) - - -class TestZip64InSmallFiles(unittest.TestCase): +class AbstractTestZip64InSmallFiles: # These tests test the ZIP64 functionality without using large files, # see test_zipfile64 for proper tests. + @classmethod + def setUpClass(cls): + line_gen = (bytes("Test of zipfile line %d." % i, "ascii") + for i in range(0, FIXEDTEST_SIZE)) + cls.data = b'\n'.join(line_gen) + def setUp(self): self._limit = zipfile.ZIP64_LIMIT zipfile.ZIP64_LIMIT = 5 - line_gen = (bytes("Test of zipfile line %d." % i, "ascii") - for i in range(0, FIXEDTEST_SIZE)) - self.data = b'\n'.join(line_gen) - # Make a source file with some lines with open(TESTFN, "wb") as fp: fp.write(self.data) - def large_file_exception_test(self, f, compression): - with zipfile.ZipFile(f, "w", compression) as zipfp: - self.assertRaises(zipfile.LargeZipFile, - zipfp.write, TESTFN, "another.name") - - def large_file_exception_test2(self, f, compression): - with zipfile.ZipFile(f, "w", compression) as zipfp: - self.assertRaises(zipfile.LargeZipFile, - zipfp.writestr, "another.name", self.data) - if not isinstance(f, str): - f.close() - - def test_large_file_exception(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.large_file_exception_test(f, zipfile.ZIP_STORED) - self.large_file_exception_test2(f, zipfile.ZIP_STORED) - def zip_test(self, f, compression): # Create the ZIP archive with zipfile.ZipFile(f, "w", compression, allowZip64=True) as zipfp: @@ -824,27 +451,35 @@ class TestZip64InSmallFiles(unittest.TestCase): # Check that testzip doesn't raise an exception zipfp.testzip() - if not isinstance(f, str): - f.close() - def test_stored(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_test(f, zipfile.ZIP_STORED) + def test_basic(self): + for f in get_files(self): + self.zip_test(f, self.compression) - @requires_zlib - def test_deflated(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_test(f, zipfile.ZIP_DEFLATED) + def tearDown(self): + zipfile.ZIP64_LIMIT = self._limit + unlink(TESTFN) + unlink(TESTFN2) - @requires_bz2 - def test_bzip2(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_test(f, zipfile.ZIP_BZIP2) - @requires_lzma - def test_lzma(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_test(f, zipfile.ZIP_LZMA) +class StoredTestZip64InSmallFiles(AbstractTestZip64InSmallFiles, + unittest.TestCase): + compression = zipfile.ZIP_STORED + + def large_file_exception_test(self, f, compression): + with zipfile.ZipFile(f, "w", compression) as zipfp: + self.assertRaises(zipfile.LargeZipFile, + zipfp.write, TESTFN, "another.name") + + def large_file_exception_test2(self, f, compression): + with zipfile.ZipFile(f, "w", compression) as zipfp: + self.assertRaises(zipfile.LargeZipFile, + zipfp.writestr, "another.name", self.data) + + def test_large_file_exception(self): + for f in get_files(self): + self.large_file_exception_test(f, zipfile.ZIP_STORED) + self.large_file_exception_test2(f, zipfile.ZIP_STORED) def test_absolute_arcnames(self): with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED, @@ -854,13 +489,27 @@ class TestZip64InSmallFiles(unittest.TestCase): with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) as zipfp: self.assertEqual(zipfp.namelist(), ["absolute"]) - def tearDown(self): - zipfile.ZIP64_LIMIT = self._limit - unlink(TESTFN) - unlink(TESTFN2) +@requires_zlib +class DeflateTestZip64InSmallFiles(AbstractTestZip64InSmallFiles, + unittest.TestCase): + compression = zipfile.ZIP_DEFLATED + +@requires_bz2 +class Bzip2TestZip64InSmallFiles(AbstractTestZip64InSmallFiles, + unittest.TestCase): + compression = zipfile.ZIP_BZIP2 + +@requires_lzma +class LzmaTestZip64InSmallFiles(AbstractTestZip64InSmallFiles, + unittest.TestCase): + compression = zipfile.ZIP_LZMA class PyZipFileTests(unittest.TestCase): + def assertCompiledIn(self, name, namelist): + if name + 'o' not in namelist: + self.assertIn(name + 'c', namelist) + def test_write_pyfile(self): with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: fn = __file__ @@ -877,8 +526,7 @@ class PyZipFileTests(unittest.TestCase): bn = os.path.basename(fn) self.assertNotIn(bn, zipfp.namelist()) - self.assertTrue(bn + 'o' in zipfp.namelist() or - bn + 'c' in zipfp.namelist()) + self.assertCompiledIn(bn, zipfp.namelist()) with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: fn = __file__ @@ -889,8 +537,7 @@ class PyZipFileTests(unittest.TestCase): bn = "%s/%s" % ("testpackage", os.path.basename(fn)) self.assertNotIn(bn, zipfp.namelist()) - self.assertTrue(bn + 'o' in zipfp.namelist() or - bn + 'c' in zipfp.namelist()) + self.assertCompiledIn(bn, zipfp.namelist()) def test_write_python_package(self): import email @@ -902,10 +549,8 @@ class PyZipFileTests(unittest.TestCase): # Check for a couple of modules at different levels of the # hierarchy names = zipfp.namelist() - self.assertTrue('email/__init__.pyo' in names or - 'email/__init__.pyc' in names) - self.assertTrue('email/mime/text.pyo' in names or - 'email/mime/text.pyc' in names) + self.assertCompiledIn('email/__init__.py', names) + self.assertCompiledIn('email/mime/text.py', names) def test_write_with_optimization(self): import email @@ -939,8 +584,8 @@ class PyZipFileTests(unittest.TestCase): zipfp.writepy(TESTFN2) names = zipfp.namelist() - self.assertTrue('mod1.pyc' in names or 'mod1.pyo' in names) - self.assertTrue('mod2.pyc' in names or 'mod2.pyo' in names) + self.assertCompiledIn('mod1.py', names) + self.assertCompiledIn('mod2.py', names) self.assertNotIn('mod2.txt', names) finally: @@ -976,49 +621,219 @@ class PyZipFileTests(unittest.TestCase): finally: shutil.rmtree(TESTFN2) + +class ExtractTests(unittest.TestCase): + def test_extract(self): + with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: + for fpath, fdata in SMALL_TEST_DATA: + zipfp.writestr(fpath, fdata) + + with zipfile.ZipFile(TESTFN2, "r") as zipfp: + for fpath, fdata in SMALL_TEST_DATA: + writtenfile = zipfp.extract(fpath) + + # make sure it was written to the right place + correctfile = os.path.join(os.getcwd(), fpath) + correctfile = os.path.normpath(correctfile) + + self.assertEqual(writtenfile, correctfile) + + # make sure correct data is in correct file + with open(writtenfile, "rb") as f: + self.assertEqual(fdata.encode(), f.read()) + + os.remove(writtenfile) + + # remove the test file subdirectories + shutil.rmtree(os.path.join(os.getcwd(), 'ziptest2dir')) + + def test_extract_all(self): + with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: + for fpath, fdata in SMALL_TEST_DATA: + zipfp.writestr(fpath, fdata) + + with zipfile.ZipFile(TESTFN2, "r") as zipfp: + zipfp.extractall() + for fpath, fdata in SMALL_TEST_DATA: + outfile = os.path.join(os.getcwd(), fpath) + + with open(outfile, "rb") as f: + self.assertEqual(fdata.encode(), f.read()) + + os.remove(outfile) + + # remove the test file subdirectories + shutil.rmtree(os.path.join(os.getcwd(), 'ziptest2dir')) + + def check_file(self, filename, content): + self.assertTrue(os.path.isfile(filename)) + with open(filename, 'rb') as f: + self.assertEqual(f.read(), content) + + def test_sanitize_windows_name(self): + san = zipfile.ZipFile._sanitize_windows_name + # Passing pathsep in allows this test to work regardless of platform. + self.assertEqual(san(r',,?,C:,foo,bar/z', ','), r'_,C_,foo,bar/z') + self.assertEqual(san(r'a\b,c<d>e|f"g?h*i', ','), r'a\b,c_d_e_f_g_h_i') + self.assertEqual(san('../../foo../../ba..r', '/'), r'foo/ba..r') + + def test_extract_hackers_arcnames_common_cases(self): + common_hacknames = [ + ('../foo/bar', 'foo/bar'), + ('foo/../bar', 'foo/bar'), + ('foo/../../bar', 'foo/bar'), + ('foo/bar/..', 'foo/bar'), + ('./../foo/bar', 'foo/bar'), + ('/foo/bar', 'foo/bar'), + ('/foo/../bar', 'foo/bar'), + ('/foo/../../bar', 'foo/bar'), + ] + self._test_extract_hackers_arcnames(common_hacknames) + + @unittest.skipIf(os.path.sep != '\\', 'Requires \\ as path separator.') + def test_extract_hackers_arcnames_windows_only(self): + """Test combination of path fixing and windows name sanitization.""" + windows_hacknames = [ + (r'..\foo\bar', 'foo/bar'), + (r'..\/foo\/bar', 'foo/bar'), + (r'foo/\..\/bar', 'foo/bar'), + (r'foo\/../\bar', 'foo/bar'), + (r'C:foo/bar', 'foo/bar'), + (r'C:/foo/bar', 'foo/bar'), + (r'C://foo/bar', 'foo/bar'), + (r'C:\foo\bar', 'foo/bar'), + (r'//conky/mountpoint/foo/bar', 'foo/bar'), + (r'\\conky\mountpoint\foo\bar', 'foo/bar'), + (r'///conky/mountpoint/foo/bar', 'conky/mountpoint/foo/bar'), + (r'\\\conky\mountpoint\foo\bar', 'conky/mountpoint/foo/bar'), + (r'//conky//mountpoint/foo/bar', 'conky/mountpoint/foo/bar'), + (r'\\conky\\mountpoint\foo\bar', 'conky/mountpoint/foo/bar'), + (r'//?/C:/foo/bar', 'foo/bar'), + (r'\\?\C:\foo\bar', 'foo/bar'), + (r'C:/../C:/foo/bar', 'C_/foo/bar'), + (r'a:b\c<d>e|f"g?h*i', 'b/c_d_e_f_g_h_i'), + ('../../foo../../ba..r', 'foo/ba..r'), + ] + self._test_extract_hackers_arcnames(windows_hacknames) + + @unittest.skipIf(os.path.sep != '/', r'Requires / as path separator.') + def test_extract_hackers_arcnames_posix_only(self): + posix_hacknames = [ + ('//foo/bar', 'foo/bar'), + ('../../foo../../ba..r', 'foo../ba..r'), + (r'foo/..\bar', r'foo/..\bar'), + ] + self._test_extract_hackers_arcnames(posix_hacknames) + + def _test_extract_hackers_arcnames(self, hacknames): + for arcname, fixedname in hacknames: + content = b'foobar' + arcname.encode() + with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_STORED) as zipfp: + zinfo = zipfile.ZipInfo() + # preserve backslashes + zinfo.filename = arcname + zinfo.external_attr = 0o600 << 16 + zipfp.writestr(zinfo, content) + + arcname = arcname.replace(os.sep, "/") + targetpath = os.path.join('target', 'subdir', 'subsub') + correctfile = os.path.join(targetpath, *fixedname.split('/')) + + with zipfile.ZipFile(TESTFN2, 'r') as zipfp: + writtenfile = zipfp.extract(arcname, targetpath) + self.assertEqual(writtenfile, correctfile, + msg='extract %r: %r != %r' % + (arcname, writtenfile, correctfile)) + self.check_file(correctfile, content) + shutil.rmtree('target') + + with zipfile.ZipFile(TESTFN2, 'r') as zipfp: + zipfp.extractall(targetpath) + self.check_file(correctfile, content) + shutil.rmtree('target') + + correctfile = os.path.join(os.getcwd(), *fixedname.split('/')) + + with zipfile.ZipFile(TESTFN2, 'r') as zipfp: + writtenfile = zipfp.extract(arcname) + self.assertEqual(writtenfile, correctfile, + msg="extract %r" % arcname) + self.check_file(correctfile, content) + shutil.rmtree(fixedname.split('/')[0]) + + with zipfile.ZipFile(TESTFN2, 'r') as zipfp: + zipfp.extractall() + self.check_file(correctfile, content) + shutil.rmtree(fixedname.split('/')[0]) + + os.remove(TESTFN2) + + class OtherTests(unittest.TestCase): - zips_with_bad_crc = { - zipfile.ZIP_STORED: ( - b'PK\003\004\024\0\0\0\0\0 \213\212;:r' - b'\253\377\f\0\0\0\f\0\0\0\005\0\0\000af' - b'ilehello,AworldP' - b'K\001\002\024\003\024\0\0\0\0\0 \213\212;:' - b'r\253\377\f\0\0\0\f\0\0\0\005\0\0\0\0' - b'\0\0\0\0\0\0\0\200\001\0\0\0\000afi' - b'lePK\005\006\0\0\0\0\001\0\001\0003\000' - b'\0\0/\0\0\0\0\0'), - zipfile.ZIP_DEFLATED: ( - b'PK\x03\x04\x14\x00\x00\x00\x08\x00n}\x0c=FA' - b'KE\x10\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af' - b'ile\xcbH\xcd\xc9\xc9W(\xcf/\xcaI\xc9\xa0' - b'=\x13\x00PK\x01\x02\x14\x03\x14\x00\x00\x00\x08\x00n' - b'}\x0c=FAKE\x10\x00\x00\x00n\x00\x00\x00\x05' - b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01\x00\x00\x00' - b'\x00afilePK\x05\x06\x00\x00\x00\x00\x01\x00' - b'\x01\x003\x00\x00\x003\x00\x00\x00\x00\x00'), - zipfile.ZIP_BZIP2: ( - b'PK\x03\x04\x14\x03\x00\x00\x0c\x00nu\x0c=FA' - b'KE8\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af' - b'ileBZh91AY&SY\xd4\xa8\xca' - b'\x7f\x00\x00\x0f\x11\x80@\x00\x06D\x90\x80 \x00 \xa5' - b'P\xd9!\x03\x03\x13\x13\x13\x89\xa9\xa9\xc2u5:\x9f' - b'\x8b\xb9"\x9c(HjTe?\x80PK\x01\x02\x14' - b'\x03\x14\x03\x00\x00\x0c\x00nu\x0c=FAKE8' - b'\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00\x00\x00\x00\x00\x00' - b'\x00 \x80\x80\x81\x00\x00\x00\x00afilePK' - b'\x05\x06\x00\x00\x00\x00\x01\x00\x01\x003\x00\x00\x00[\x00' - b'\x00\x00\x00\x00'), - zipfile.ZIP_LZMA: ( - b'PK\x03\x04\x14\x03\x00\x00\x0e\x00nu\x0c=FA' - b'KE\x1b\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af' - b'ile\t\x04\x05\x00]\x00\x00\x00\x04\x004\x19I' - b'\xee\x8d\xe9\x17\x89:3`\tq!.8\x00PK' - b'\x01\x02\x14\x03\x14\x03\x00\x00\x0e\x00nu\x0c=FA' - b'KE\x1b\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00\x00\x00' - b'\x00\x00\x00\x00 \x80\x80\x81\x00\x00\x00\x00afil' - b'ePK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x003\x00\x00' - b'\x00>\x00\x00\x00\x00\x00'), - } + def test_open_via_zip_info(self): + # Create the ZIP archive + with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: + zipfp.writestr("name", "foo") + zipfp.writestr("name", "bar") + + with zipfile.ZipFile(TESTFN2, "r") as zipfp: + infos = zipfp.infolist() + data = b"" + for info in infos: + with zipfp.open(info) as zipopen: + data += zipopen.read() + self.assertIn(data, {b"foobar", b"barfoo"}) + data = b"" + for info in infos: + data += zipfp.read(info) + self.assertIn(data, {b"foobar", b"barfoo"}) + + def test_universal_readaheads(self): + f = io.BytesIO() + + data = b'a\r\n' * 16 * 1024 + with zipfile.ZipFile(f, 'w', zipfile.ZIP_STORED) as zipfp: + zipfp.writestr(TESTFN, data) + + data2 = b'' + with zipfile.ZipFile(f, 'r') as zipfp, \ + zipfp.open(TESTFN, 'rU') as zipopen: + for line in zipopen: + data2 += line + + self.assertEqual(data, data2.replace(b'\n', b'\r\n')) + + def test_writestr_extended_local_header_issue1202(self): + with zipfile.ZipFile(TESTFN2, 'w') as orig_zip: + for data in 'abcdefghijklmnop': + zinfo = zipfile.ZipInfo(data) + zinfo.flag_bits |= 0x08 # Include an extended local header. + orig_zip.writestr(zinfo, data) + + def test_close(self): + """Check that the zipfile is closed after the 'with' block.""" + with zipfile.ZipFile(TESTFN2, "w") as zipfp: + for fpath, fdata in SMALL_TEST_DATA: + zipfp.writestr(fpath, fdata) + self.assertIsNotNone(zipfp.fp, 'zipfp is not open') + self.assertIsNone(zipfp.fp, 'zipfp is not closed') + + with zipfile.ZipFile(TESTFN2, "r") as zipfp: + self.assertIsNotNone(zipfp.fp, 'zipfp is not open') + self.assertIsNone(zipfp.fp, 'zipfp is not closed') + + def test_close_on_exception(self): + """Check that the zipfile is closed if an exception is raised in the + 'with' block.""" + with zipfile.ZipFile(TESTFN2, "w") as zipfp: + for fpath, fdata in SMALL_TEST_DATA: + zipfp.writestr(fpath, fdata) + + try: + with zipfile.ZipFile(TESTFN2, "r") as zipfp2: + raise zipfile.BadZipFile() + except zipfile.BadZipFile: + self.assertIsNone(zipfp2.fp, 'zipfp is not closed') def test_unsupported_version(self): # File has an extract_version of 120 @@ -1027,10 +842,19 @@ class OtherTests(unittest.TestCase): b'\x00!p\xa1@\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00' b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01\x00\x00\x00\x00xPK\x05\x06' b'\x00\x00\x00\x00\x01\x00\x01\x00/\x00\x00\x00\x1f\x00\x00\x00\x00\x00') + self.assertRaises(NotImplementedError, zipfile.ZipFile, io.BytesIO(data), 'r') - def test_unicode_filenames(self): + @requires_zlib + def test_read_unicode_filenames(self): + # bug #10801 + fname = findfile('zip_cp437_header.zip') + with zipfile.ZipFile(fname) as zipfp: + for name in zipfp.namelist(): + zipfp.open(name).close() + + def test_write_unicode_filenames(self): with zipfile.ZipFile(TESTFN, "w") as zf: zf.writestr("foo.txt", "Test for unicode filename") zf.writestr("\xf6.txt", "Test for unicode filename") @@ -1078,20 +902,16 @@ class OtherTests(unittest.TestCase): # - passing a filename with open(TESTFN, "w") as fp: fp.write("this is not a legal zip file\n") - chk = zipfile.is_zipfile(TESTFN) - self.assertFalse(chk) + self.assertFalse(zipfile.is_zipfile(TESTFN)) # - passing a file object with open(TESTFN, "rb") as fp: - chk = zipfile.is_zipfile(fp) - self.assertTrue(not chk) + self.assertFalse(zipfile.is_zipfile(fp)) # - passing a file-like object fp = io.BytesIO() fp.write(b"this is not a legal zip file\n") - chk = zipfile.is_zipfile(fp) - self.assertTrue(not chk) + self.assertFalse(zipfile.is_zipfile(fp)) fp.seek(0, 0) - chk = zipfile.is_zipfile(fp) - self.assertTrue(not chk) + self.assertFalse(zipfile.is_zipfile(fp)) def test_damaged_zipfile(self): """Check that zipfiles with missing bytes at the end raise BadZipFile.""" @@ -1113,22 +933,18 @@ class OtherTests(unittest.TestCase): with zipfile.ZipFile(TESTFN, mode="w") as zipf: zipf.writestr("foo.txt", b"O, for a Muse of Fire!") - chk = zipfile.is_zipfile(TESTFN) - self.assertTrue(chk) + self.assertTrue(zipfile.is_zipfile(TESTFN)) # - passing a file object with open(TESTFN, "rb") as fp: - chk = zipfile.is_zipfile(fp) - self.assertTrue(chk) + self.assertTrue(zipfile.is_zipfile(fp)) fp.seek(0, 0) zip_contents = fp.read() # - passing a file-like object fp = io.BytesIO() fp.write(zip_contents) - chk = zipfile.is_zipfile(fp) - self.assertTrue(chk) + self.assertTrue(zipfile.is_zipfile(fp)) fp.seek(0, 0) - chk = zipfile.is_zipfile(fp) - self.assertTrue(chk) + self.assertTrue(zipfile.is_zipfile(fp)) def test_non_existent_file_raises_OSError(self): # make sure we don't raise an AttributeError when a partially-constructed @@ -1309,93 +1125,6 @@ class OtherTests(unittest.TestCase): with zipfile.ZipFile(TESTFN, "r") as zipf: self.assertEqual(zipf.comment, b"this is a comment") - def check_testzip_with_bad_crc(self, compression): - """Tests that files with bad CRCs return their name from testzip.""" - zipdata = self.zips_with_bad_crc[compression] - - with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: - # testzip returns the name of the first corrupt file, or None - self.assertEqual('afile', zipf.testzip()) - - def test_testzip_with_bad_crc_stored(self): - self.check_testzip_with_bad_crc(zipfile.ZIP_STORED) - - @requires_zlib - def test_testzip_with_bad_crc_deflated(self): - self.check_testzip_with_bad_crc(zipfile.ZIP_DEFLATED) - - @requires_bz2 - def test_testzip_with_bad_crc_bzip2(self): - self.check_testzip_with_bad_crc(zipfile.ZIP_BZIP2) - - @requires_lzma - def test_testzip_with_bad_crc_lzma(self): - self.check_testzip_with_bad_crc(zipfile.ZIP_LZMA) - - def check_read_with_bad_crc(self, compression): - """Tests that files with bad CRCs raise a BadZipFile exception when read.""" - zipdata = self.zips_with_bad_crc[compression] - - # Using ZipFile.read() - with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: - self.assertRaises(zipfile.BadZipFile, zipf.read, 'afile') - - # Using ZipExtFile.read() - with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: - with zipf.open('afile', 'r') as corrupt_file: - self.assertRaises(zipfile.BadZipFile, corrupt_file.read) - - # Same with small reads (in order to exercise the buffering logic) - with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: - with zipf.open('afile', 'r') as corrupt_file: - corrupt_file.MIN_READ_SIZE = 2 - with self.assertRaises(zipfile.BadZipFile): - while corrupt_file.read(2): - pass - - def test_read_with_bad_crc_stored(self): - self.check_read_with_bad_crc(zipfile.ZIP_STORED) - - @requires_zlib - def test_read_with_bad_crc_deflated(self): - self.check_read_with_bad_crc(zipfile.ZIP_DEFLATED) - - @requires_bz2 - def test_read_with_bad_crc_bzip2(self): - self.check_read_with_bad_crc(zipfile.ZIP_BZIP2) - - @requires_lzma - def test_read_with_bad_crc_lzma(self): - self.check_read_with_bad_crc(zipfile.ZIP_LZMA) - - def check_read_return_size(self, compression): - # Issue #9837: ZipExtFile.read() shouldn't return more bytes - # than requested. - for test_size in (1, 4095, 4096, 4097, 16384): - file_size = test_size + 1 - junk = b''.join(struct.pack('B', randint(0, 255)) - for x in range(file_size)) - with zipfile.ZipFile(io.BytesIO(), "w", compression) as zipf: - zipf.writestr('foo', junk) - with zipf.open('foo', 'r') as fp: - buf = fp.read(test_size) - self.assertEqual(len(buf), test_size) - - def test_read_return_size_stored(self): - self.check_read_return_size(zipfile.ZIP_STORED) - - @requires_zlib - def test_read_return_size_deflated(self): - self.check_read_return_size(zipfile.ZIP_DEFLATED) - - @requires_bz2 - def test_read_return_size_bzip2(self): - self.check_read_return_size(zipfile.ZIP_BZIP2) - - @requires_lzma - def test_read_return_size_lzma(self): - self.check_read_return_size(zipfile.ZIP_LZMA) - def test_empty_zipfile(self): # Check that creating a file in 'w' or 'a' mode and closing without # adding any files to the archives creates a valid empty ZIP file @@ -1430,6 +1159,93 @@ class OtherTests(unittest.TestCase): unlink(TESTFN2) +class AbstractBadCrcTests: + def test_testzip_with_bad_crc(self): + """Tests that files with bad CRCs return their name from testzip.""" + zipdata = self.zip_with_bad_crc + + with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: + # testzip returns the name of the first corrupt file, or None + self.assertEqual('afile', zipf.testzip()) + + def test_read_with_bad_crc(self): + """Tests that files with bad CRCs raise a BadZipFile exception when read.""" + zipdata = self.zip_with_bad_crc + + # Using ZipFile.read() + with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: + self.assertRaises(zipfile.BadZipFile, zipf.read, 'afile') + + # Using ZipExtFile.read() + with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: + with zipf.open('afile', 'r') as corrupt_file: + self.assertRaises(zipfile.BadZipFile, corrupt_file.read) + + # Same with small reads (in order to exercise the buffering logic) + with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: + with zipf.open('afile', 'r') as corrupt_file: + corrupt_file.MIN_READ_SIZE = 2 + with self.assertRaises(zipfile.BadZipFile): + while corrupt_file.read(2): + pass + + +class StoredBadCrcTests(AbstractBadCrcTests, unittest.TestCase): + compression = zipfile.ZIP_STORED + zip_with_bad_crc = ( + b'PK\003\004\024\0\0\0\0\0 \213\212;:r' + b'\253\377\f\0\0\0\f\0\0\0\005\0\0\000af' + b'ilehello,AworldP' + b'K\001\002\024\003\024\0\0\0\0\0 \213\212;:' + b'r\253\377\f\0\0\0\f\0\0\0\005\0\0\0\0' + b'\0\0\0\0\0\0\0\200\001\0\0\0\000afi' + b'lePK\005\006\0\0\0\0\001\0\001\0003\000' + b'\0\0/\0\0\0\0\0') + +@requires_zlib +class DeflateBadCrcTests(AbstractBadCrcTests, unittest.TestCase): + compression = zipfile.ZIP_DEFLATED + zip_with_bad_crc = ( + b'PK\x03\x04\x14\x00\x00\x00\x08\x00n}\x0c=FA' + b'KE\x10\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af' + b'ile\xcbH\xcd\xc9\xc9W(\xcf/\xcaI\xc9\xa0' + b'=\x13\x00PK\x01\x02\x14\x03\x14\x00\x00\x00\x08\x00n' + b'}\x0c=FAKE\x10\x00\x00\x00n\x00\x00\x00\x05' + b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01\x00\x00\x00' + b'\x00afilePK\x05\x06\x00\x00\x00\x00\x01\x00' + b'\x01\x003\x00\x00\x003\x00\x00\x00\x00\x00') + +@requires_bz2 +class Bzip2BadCrcTests(AbstractBadCrcTests, unittest.TestCase): + compression = zipfile.ZIP_BZIP2 + zip_with_bad_crc = ( + b'PK\x03\x04\x14\x03\x00\x00\x0c\x00nu\x0c=FA' + b'KE8\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af' + b'ileBZh91AY&SY\xd4\xa8\xca' + b'\x7f\x00\x00\x0f\x11\x80@\x00\x06D\x90\x80 \x00 \xa5' + b'P\xd9!\x03\x03\x13\x13\x13\x89\xa9\xa9\xc2u5:\x9f' + b'\x8b\xb9"\x9c(HjTe?\x80PK\x01\x02\x14' + b'\x03\x14\x03\x00\x00\x0c\x00nu\x0c=FAKE8' + b'\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00\x00\x00\x00\x00\x00' + b'\x00 \x80\x80\x81\x00\x00\x00\x00afilePK' + b'\x05\x06\x00\x00\x00\x00\x01\x00\x01\x003\x00\x00\x00[\x00' + b'\x00\x00\x00\x00') + +@requires_lzma +class LzmaBadCrcTests(AbstractBadCrcTests, unittest.TestCase): + compression = zipfile.ZIP_LZMA + zip_with_bad_crc = ( + b'PK\x03\x04\x14\x03\x00\x00\x0e\x00nu\x0c=FA' + b'KE\x1b\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af' + b'ile\t\x04\x05\x00]\x00\x00\x00\x04\x004\x19I' + b'\xee\x8d\xe9\x17\x89:3`\tq!.8\x00PK' + b'\x01\x02\x14\x03\x14\x03\x00\x00\x0e\x00nu\x0c=FA' + b'KE\x1b\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00\x00\x00' + b'\x00\x00\x00\x00 \x80\x80\x81\x00\x00\x00\x00afil' + b'ePK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x003\x00\x00' + b'\x00>\x00\x00\x00\x00\x00') + + class DecryptionTests(unittest.TestCase): """Check that ZIP decryption works. Since the library does not support encryption at the moment, we use a pre-generated encrypted @@ -1495,13 +1311,14 @@ class DecryptionTests(unittest.TestCase): self.assertRaises(TypeError, self.zip.open, "test.txt", pwd="python") self.assertRaises(TypeError, self.zip.extract, "test.txt", pwd="python") - -class TestsWithRandomBinaryFiles(unittest.TestCase): - def setUp(self): +class AbstractTestsWithRandomBinaryFiles: + @classmethod + def setUpClass(cls): datacount = randint(16, 64)*1024 + randint(1, 1024) - self.data = b''.join(struct.pack('<f', random()*randint(-1000, 1000)) - for i in range(datacount)) + cls.data = b''.join(struct.pack('<f', random()*randint(-1000, 1000)) + for i in range(datacount)) + def setUp(self): # Make a source file with some lines with open(TESTFN, "wb") as fp: fp.write(self.data) @@ -1525,27 +1342,10 @@ class TestsWithRandomBinaryFiles(unittest.TestCase): self.assertEqual(len(testdata), len(self.data)) self.assertEqual(testdata, self.data) self.assertEqual(zipfp.read("another.name"), self.data) - if not isinstance(f, str): - f.close() - def test_stored(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_test(f, zipfile.ZIP_STORED) - - @requires_zlib - def test_deflated(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_test(f, zipfile.ZIP_DEFLATED) - - @requires_bz2 - def test_bzip2(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_test(f, zipfile.ZIP_BZIP2) - - @requires_lzma - def test_lzma(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_test(f, zipfile.ZIP_LZMA) + def test_read(self): + for f in get_files(self): + self.zip_test(f, self.compression) def zip_open_test(self, f, compression): self.make_test_archive(f, compression) @@ -1575,27 +1375,10 @@ class TestsWithRandomBinaryFiles(unittest.TestCase): testdata2 = b''.join(zipdata2) self.assertEqual(len(testdata2), len(self.data)) self.assertEqual(testdata2, self.data) - if not isinstance(f, str): - f.close() - - def test_open_stored(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_open_test(f, zipfile.ZIP_STORED) - - @requires_zlib - def test_open_deflated(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_open_test(f, zipfile.ZIP_DEFLATED) - - @requires_bz2 - def test_open_bzip2(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_open_test(f, zipfile.ZIP_BZIP2) - @requires_lzma - def test_open_lzma(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_open_test(f, zipfile.ZIP_LZMA) + def test_open(self): + for f in get_files(self): + self.zip_open_test(f, self.compression) def zip_random_open_test(self, f, compression): self.make_test_archive(f, compression) @@ -1613,27 +1396,30 @@ class TestsWithRandomBinaryFiles(unittest.TestCase): testdata = b''.join(zipdata1) self.assertEqual(len(testdata), len(self.data)) self.assertEqual(testdata, self.data) - if not isinstance(f, str): - f.close() - def test_random_open_stored(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_random_open_test(f, zipfile.ZIP_STORED) + def test_random_open(self): + for f in get_files(self): + self.zip_random_open_test(f, self.compression) - @requires_zlib - def test_random_open_deflated(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_random_open_test(f, zipfile.ZIP_DEFLATED) - @requires_bz2 - def test_random_open_bzip2(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_random_open_test(f, zipfile.ZIP_BZIP2) +class StoredTestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles, + unittest.TestCase): + compression = zipfile.ZIP_STORED + +@requires_zlib +class DeflateTestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles, + unittest.TestCase): + compression = zipfile.ZIP_DEFLATED + +@requires_bz2 +class Bzip2TestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles, + unittest.TestCase): + compression = zipfile.ZIP_BZIP2 - @requires_lzma - def test_random_open_lzma(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.zip_random_open_test(f, zipfile.ZIP_LZMA) +@requires_lzma +class LzmaTestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles, + unittest.TestCase): + compression = zipfile.ZIP_LZMA @requires_zlib @@ -1711,21 +1497,22 @@ class TestWithDirectory(unittest.TestCase): unlink(TESTFN) -class UniversalNewlineTests(unittest.TestCase): +class AbstractUniversalNewlineTests: + @classmethod + def setUpClass(cls): + cls.line_gen = [bytes("Test of zipfile line %d." % i, "ascii") + for i in range(FIXEDTEST_SIZE)] + cls.seps = (b'\r', b'\r\n', b'\n') + cls.arcdata = {} + for n, s in enumerate(cls.seps): + cls.arcdata[s] = s.join(cls.line_gen) + s + def setUp(self): - self.line_gen = [bytes("Test of zipfile line %d." % i, "ascii") - for i in range(FIXEDTEST_SIZE)] - self.seps = ('\r', '\r\n', '\n') - self.arcdata, self.arcfiles = {}, {} + self.arcfiles = {} for n, s in enumerate(self.seps): - b = s.encode("ascii") - self.arcdata[s] = b.join(self.line_gen) + b self.arcfiles[s] = '%s-%d' % (TESTFN, n) - f = open(self.arcfiles[s], "wb") - try: + with open(self.arcfiles[s], "wb") as f: f.write(self.arcdata[s]) - finally: - f.close() def make_test_archive(self, f, compression): # Create the ZIP archive @@ -1742,8 +1529,10 @@ class UniversalNewlineTests(unittest.TestCase): with zipfp.open(fn, "rU") as fp: zipdata = fp.read() self.assertEqual(self.arcdata[sep], zipdata) - if not isinstance(f, str): - f.close() + + def test_read(self): + for f in get_files(self): + self.read_test(f, self.compression) def readline_read_test(self, f, compression): self.make_test_archive(f, compression) @@ -1764,10 +1553,11 @@ class UniversalNewlineTests(unittest.TestCase): break data += read - self.assertEqual(data, self.arcdata['\n']) + self.assertEqual(data, self.arcdata[b'\n']) - if not isinstance(f, str): - f.close() + def test_readline_read(self): + for f in get_files(self): + self.readline_read_test(f, self.compression) def readline_test(self, f, compression): self.make_test_archive(f, compression) @@ -1779,8 +1569,10 @@ class UniversalNewlineTests(unittest.TestCase): for line in self.line_gen: linedata = zipopen.readline() self.assertEqual(linedata, line + b'\n') - if not isinstance(f, str): - f.close() + + def test_readline(self): + for f in get_files(self): + self.readline_test(f, self.compression) def readlines_test(self, f, compression): self.make_test_archive(f, compression) @@ -1792,8 +1584,10 @@ class UniversalNewlineTests(unittest.TestCase): ziplines = fp.readlines() for line, zipline in zip(self.line_gen, ziplines): self.assertEqual(zipline, line + b'\n') - if not isinstance(f, str): - f.close() + + def test_readlines(self): + for f in get_files(self): + self.readlines_test(f, self.compression) def iterlines_test(self, f, compression): self.make_test_archive(f, compression) @@ -1804,105 +1598,10 @@ class UniversalNewlineTests(unittest.TestCase): with zipfp.open(fn, "rU") as fp: for line, zipline in zip(self.line_gen, fp): self.assertEqual(zipline, line + b'\n') - if not isinstance(f, str): - f.close() - - def test_read_stored(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.read_test(f, zipfile.ZIP_STORED) - - def test_readline_read_stored(self): - # Issue #7610: calls to readline() interleaved with calls to read(). - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.readline_read_test(f, zipfile.ZIP_STORED) - - def test_readline_stored(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.readline_test(f, zipfile.ZIP_STORED) - - def test_readlines_stored(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.readlines_test(f, zipfile.ZIP_STORED) - def test_iterlines_stored(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.iterlines_test(f, zipfile.ZIP_STORED) - - @requires_zlib - def test_read_deflated(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.read_test(f, zipfile.ZIP_DEFLATED) - - @requires_zlib - def test_readline_read_deflated(self): - # Issue #7610: calls to readline() interleaved with calls to read(). - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.readline_read_test(f, zipfile.ZIP_DEFLATED) - - @requires_zlib - def test_readline_deflated(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.readline_test(f, zipfile.ZIP_DEFLATED) - - @requires_zlib - def test_readlines_deflated(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.readlines_test(f, zipfile.ZIP_DEFLATED) - - @requires_zlib - def test_iterlines_deflated(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.iterlines_test(f, zipfile.ZIP_DEFLATED) - - @requires_bz2 - def test_read_bzip2(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.read_test(f, zipfile.ZIP_BZIP2) - - @requires_bz2 - def test_readline_read_bzip2(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.readline_read_test(f, zipfile.ZIP_BZIP2) - - @requires_bz2 - def test_readline_bzip2(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.readline_test(f, zipfile.ZIP_BZIP2) - - @requires_bz2 - def test_readlines_bzip2(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.readlines_test(f, zipfile.ZIP_BZIP2) - - @requires_bz2 - def test_iterlines_bzip2(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.iterlines_test(f, zipfile.ZIP_BZIP2) - - @requires_lzma - def test_read_lzma(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.read_test(f, zipfile.ZIP_LZMA) - - @requires_lzma - def test_readline_read_lzma(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.readline_read_test(f, zipfile.ZIP_LZMA) - - @requires_lzma - def test_readline_lzma(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.readline_test(f, zipfile.ZIP_LZMA) - - @requires_lzma - def test_readlines_lzma(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.readlines_test(f, zipfile.ZIP_LZMA) - - @requires_lzma - def test_iterlines_lzma(self): - for f in (TESTFN2, TemporaryFile(), io.BytesIO()): - self.iterlines_test(f, zipfile.ZIP_LZMA) + def test_iterlines(self): + for f in get_files(self): + self.iterlines_test(f, self.compression) def tearDown(self): for sep, fn in self.arcfiles.items(): @@ -1911,5 +1610,24 @@ class UniversalNewlineTests(unittest.TestCase): unlink(TESTFN2) +class StoredUniversalNewlineTests(AbstractUniversalNewlineTests, + unittest.TestCase): + compression = zipfile.ZIP_STORED + +@requires_zlib +class DeflateUniversalNewlineTests(AbstractUniversalNewlineTests, + unittest.TestCase): + compression = zipfile.ZIP_DEFLATED + +@requires_bz2 +class Bzip2UniversalNewlineTests(AbstractUniversalNewlineTests, + unittest.TestCase): + compression = zipfile.ZIP_BZIP2 + +@requires_lzma +class LzmaUniversalNewlineTests(AbstractUniversalNewlineTests, + unittest.TestCase): + compression = zipfile.ZIP_LZMA + if __name__ == "__main__": unittest.main() @@ -548,6 +548,10 @@ Library Tests ----- +- Issue #17944: test_zipfile now discoverable and uses subclassing to + generate tests for different compression types. Fixed a bug with skipping + some tests due to use of exhausted iterators. + - Issue #18266: test_largefile now works with unittest test discovery and supports running only selected tests. Patch by Zachary Ware. |