diff options
author | Martin v. Löwis <martin@v.loewis.de> | 2012-05-13 08:06:36 (GMT) |
---|---|---|
committer | Martin v. Löwis <martin@v.loewis.de> | 2012-05-13 08:06:36 (GMT) |
commit | 7fb79fcb64ad9832c2d616b1b6fe5a93d2ff3288 (patch) | |
tree | 0dce33ace1abe53028aa3722e9cb9db15ddea2ca /Lib | |
parent | bb54b33cec2d83e8b5f6100361ff57cb22b5105a (diff) | |
download | cpython-7fb79fcb64ad9832c2d616b1b6fe5a93d2ff3288.zip cpython-7fb79fcb64ad9832c2d616b1b6fe5a93d2ff3288.tar.gz cpython-7fb79fcb64ad9832c2d616b1b6fe5a93d2ff3288.tar.bz2 |
Issue #14366: Support lzma compression in zip files.
Patch by Serhiy Storchaka.
Diffstat (limited to 'Lib')
-rw-r--r-- | Lib/test/support.py | 9 | ||||
-rw-r--r-- | Lib/test/test_zipfile.py | 125 | ||||
-rw-r--r-- | Lib/zipfile.py | 121 |
3 files changed, 237 insertions, 18 deletions
diff --git a/Lib/test/support.py b/Lib/test/support.py index c92fa00..4568e54 100644 --- a/Lib/test/support.py +++ b/Lib/test/support.py @@ -45,6 +45,11 @@ try: except ImportError: bz2 = None +try: + import lzma +except ImportError: + lzma = None + __all__ = [ "Error", "TestFailed", "ResourceDenied", "import_module", "verbose", "use_resources", "max_memuse", "record_original_stdout", @@ -62,7 +67,7 @@ __all__ = [ "get_attribute", "swap_item", "swap_attr", "requires_IEEE_754", "TestHandler", "Matcher", "can_symlink", "skip_unless_symlink", "import_fresh_module", "requires_zlib", "PIPE_MAX_SIZE", "failfast", - "anticipate_failure", "run_with_tz", "requires_bz2" + "anticipate_failure", "run_with_tz", "requires_bz2", "requires_lzma" ] class Error(Exception): @@ -513,6 +518,8 @@ requires_zlib = unittest.skipUnless(zlib, 'requires zlib') requires_bz2 = unittest.skipUnless(bz2, 'requires bz2') +requires_lzma = unittest.skipUnless(lzma, 'requires lzma') + is_jython = sys.platform.startswith('java') # Filename used for testing diff --git a/Lib/test/test_zipfile.py b/Lib/test/test_zipfile.py index 80d45f5..0772236 100644 --- a/Lib/test/test_zipfile.py +++ b/Lib/test/test_zipfile.py @@ -13,7 +13,7 @@ from tempfile import TemporaryFile from random import randint, random from unittest import skipUnless -from test.support import TESTFN, run_unittest, findfile, unlink, requires_zlib, requires_bz2 +from test.support import TESTFN, run_unittest, findfile, unlink, requires_zlib, requires_bz2, requires_lzma TESTFN2 = TESTFN + "2" TESTFNDIR = TESTFN + "d" @@ -361,6 +361,55 @@ class TestsWithSourceFile(unittest.TestCase): self.assertEqual(openobj.read(1), b'1') self.assertEqual(openobj.read(1), b'2') + @requires_lzma + def test_lzma(self): + for f in (TESTFN2, TemporaryFile(), io.BytesIO()): + self.zip_test(f, zipfile.ZIP_LZMA) + + @requires_lzma + def test_open_lzma(self): + for f in (TESTFN2, TemporaryFile(), io.BytesIO()): + self.zip_open_test(f, zipfile.ZIP_LZMA) + + @requires_lzma + def test_random_open_lzma(self): + for f in (TESTFN2, TemporaryFile(), io.BytesIO()): + self.zip_random_open_test(f, zipfile.ZIP_LZMA) + + @requires_lzma + def test_readline_read_lzma(self): + # Issue #7610: calls to readline() interleaved with calls to read(). + for f in (TESTFN2, TemporaryFile(), io.BytesIO()): + self.zip_readline_read_test(f, zipfile.ZIP_LZMA) + + @requires_lzma + def test_readline_lzma(self): + for f in (TESTFN2, TemporaryFile(), io.BytesIO()): + self.zip_readline_test(f, zipfile.ZIP_LZMA) + + @requires_lzma + def test_readlines_lzma(self): + for f in (TESTFN2, TemporaryFile(), io.BytesIO()): + self.zip_readlines_test(f, zipfile.ZIP_LZMA) + + @requires_lzma + def test_iterlines_lzma(self): + for f in (TESTFN2, TemporaryFile(), io.BytesIO()): + self.zip_iterlines_test(f, zipfile.ZIP_LZMA) + + @requires_lzma + def test_low_compression_lzma(self): + """Check for cases where compressed data is larger than original.""" + # Create the ZIP archive + with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_LZMA) as zipfp: + zipfp.writestr("strfile", '12') + + # Get an open object for strfile + with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_LZMA) as zipfp: + with zipfp.open("strfile") as openobj: + self.assertEqual(openobj.read(1), b'1') + self.assertEqual(openobj.read(1), b'2') + def test_absolute_arcnames(self): with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: zipfp.write(TESTFN, "/absolute") @@ -508,6 +557,13 @@ class TestsWithSourceFile(unittest.TestCase): info = zipfp.getinfo('b.txt') self.assertEqual(info.compress_type, zipfile.ZIP_BZIP2) + @requires_lzma + def test_writestr_compression_lzma(self): + zipfp = zipfile.ZipFile(TESTFN2, "w") + zipfp.writestr("b.txt", "hello world", compress_type=zipfile.ZIP_LZMA) + info = zipfp.getinfo('b.txt') + self.assertEqual(info.compress_type, zipfile.ZIP_LZMA) + def zip_test_writestr_permissions(self, f, compression): # Make sure that writestr creates files with mode 0600, # when it is passed a name rather than a ZipInfo instance. @@ -686,6 +742,11 @@ class TestZip64InSmallFiles(unittest.TestCase): for f in (TESTFN2, TemporaryFile(), io.BytesIO()): self.zip_test(f, zipfile.ZIP_BZIP2) + @requires_lzma + def test_lzma(self): + for f in (TESTFN2, TemporaryFile(), io.BytesIO()): + self.zip_test(f, zipfile.ZIP_LZMA) + def test_absolute_arcnames(self): with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED, allowZip64=True) as zipfp: @@ -826,6 +887,16 @@ class OtherTests(unittest.TestCase): b'\x00 \x80\x80\x81\x00\x00\x00\x00afilePK' b'\x05\x06\x00\x00\x00\x00\x01\x00\x01\x003\x00\x00\x00[\x00' b'\x00\x00\x00\x00'), + zipfile.ZIP_LZMA: ( + b'PK\x03\x04\x14\x03\x00\x00\x0e\x00nu\x0c=FA' + b'KE\x1b\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af' + b'ile\t\x04\x05\x00]\x00\x00\x00\x04\x004\x19I' + b'\xee\x8d\xe9\x17\x89:3`\tq!.8\x00PK' + b'\x01\x02\x14\x03\x14\x03\x00\x00\x0e\x00nu\x0c=FA' + b'KE\x1b\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00\x00\x00' + b'\x00\x00\x00\x00 \x80\x80\x81\x00\x00\x00\x00afil' + b'ePK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x003\x00\x00' + b'\x00>\x00\x00\x00\x00\x00'), } def test_unsupported_version(self): @@ -1104,6 +1175,10 @@ class OtherTests(unittest.TestCase): def test_testzip_with_bad_crc_bzip2(self): self.check_testzip_with_bad_crc(zipfile.ZIP_BZIP2) + @requires_lzma + def test_testzip_with_bad_crc_lzma(self): + self.check_testzip_with_bad_crc(zipfile.ZIP_LZMA) + def check_read_with_bad_crc(self, compression): """Tests that files with bad CRCs raise a BadZipFile exception when read.""" zipdata = self.zips_with_bad_crc[compression] @@ -1136,6 +1211,10 @@ class OtherTests(unittest.TestCase): def test_read_with_bad_crc_bzip2(self): self.check_read_with_bad_crc(zipfile.ZIP_BZIP2) + @requires_lzma + def test_read_with_bad_crc_lzma(self): + self.check_read_with_bad_crc(zipfile.ZIP_LZMA) + def check_read_return_size(self, compression): # Issue #9837: ZipExtFile.read() shouldn't return more bytes # than requested. @@ -1160,6 +1239,10 @@ class OtherTests(unittest.TestCase): def test_read_return_size_bzip2(self): self.check_read_return_size(zipfile.ZIP_BZIP2) + @requires_lzma + def test_read_return_size_lzma(self): + self.check_read_return_size(zipfile.ZIP_LZMA) + def test_empty_zipfile(self): # Check that creating a file in 'w' or 'a' mode and closing without # adding any files to the archives creates a valid empty ZIP file @@ -1306,6 +1389,11 @@ class TestsWithRandomBinaryFiles(unittest.TestCase): for f in (TESTFN2, TemporaryFile(), io.BytesIO()): self.zip_test(f, zipfile.ZIP_BZIP2) + @requires_lzma + def test_lzma(self): + for f in (TESTFN2, TemporaryFile(), io.BytesIO()): + self.zip_test(f, zipfile.ZIP_LZMA) + def zip_open_test(self, f, compression): self.make_test_archive(f, compression) @@ -1351,6 +1439,11 @@ class TestsWithRandomBinaryFiles(unittest.TestCase): for f in (TESTFN2, TemporaryFile(), io.BytesIO()): self.zip_open_test(f, zipfile.ZIP_BZIP2) + @requires_lzma + def test_open_lzma(self): + for f in (TESTFN2, TemporaryFile(), io.BytesIO()): + self.zip_open_test(f, zipfile.ZIP_LZMA) + def zip_random_open_test(self, f, compression): self.make_test_archive(f, compression) @@ -1384,6 +1477,11 @@ class TestsWithRandomBinaryFiles(unittest.TestCase): for f in (TESTFN2, TemporaryFile(), io.BytesIO()): self.zip_random_open_test(f, zipfile.ZIP_BZIP2) + @requires_lzma + def test_random_open_lzma(self): + for f in (TESTFN2, TemporaryFile(), io.BytesIO()): + self.zip_random_open_test(f, zipfile.ZIP_LZMA) + @requires_zlib class TestsWithMultipleOpens(unittest.TestCase): @@ -1628,6 +1726,31 @@ class UniversalNewlineTests(unittest.TestCase): for f in (TESTFN2, TemporaryFile(), io.BytesIO()): self.iterlines_test(f, zipfile.ZIP_BZIP2) + @requires_lzma + def test_read_lzma(self): + for f in (TESTFN2, TemporaryFile(), io.BytesIO()): + self.read_test(f, zipfile.ZIP_LZMA) + + @requires_lzma + def test_readline_read_lzma(self): + for f in (TESTFN2, TemporaryFile(), io.BytesIO()): + self.readline_read_test(f, zipfile.ZIP_LZMA) + + @requires_lzma + def test_readline_lzma(self): + for f in (TESTFN2, TemporaryFile(), io.BytesIO()): + self.readline_test(f, zipfile.ZIP_LZMA) + + @requires_lzma + def test_readlines_lzma(self): + for f in (TESTFN2, TemporaryFile(), io.BytesIO()): + self.readlines_test(f, zipfile.ZIP_LZMA) + + @requires_lzma + def test_iterlines_lzma(self): + for f in (TESTFN2, TemporaryFile(), io.BytesIO()): + self.iterlines_test(f, zipfile.ZIP_LZMA) + def tearDown(self): for sep, fn in self.arcfiles.items(): os.remove(fn) diff --git a/Lib/zipfile.py b/Lib/zipfile.py index c53b127..c127c2c 100644 --- a/Lib/zipfile.py +++ b/Lib/zipfile.py @@ -27,8 +27,13 @@ try: except ImportError: bz2 = None +try: + import lzma # We may need its compression method +except ImportError: + lzma = None + __all__ = ["BadZipFile", "BadZipfile", "error", - "ZIP_STORED", "ZIP_DEFLATED", "ZIP_BZIP2", + "ZIP_STORED", "ZIP_DEFLATED", "ZIP_BZIP2", "ZIP_LZMA", "is_zipfile", "ZipInfo", "ZipFile", "PyZipFile", "LargeZipFile"] class BadZipFile(Exception): @@ -52,13 +57,15 @@ ZIP_MAX_COMMENT = (1 << 16) - 1 ZIP_STORED = 0 ZIP_DEFLATED = 8 ZIP_BZIP2 = 12 +ZIP_LZMA = 14 # Other ZIP compression methods not supported DEFAULT_VERSION = 20 ZIP64_VERSION = 45 BZIP2_VERSION = 46 +LZMA_VERSION = 63 # we recognize (but not necessarily support) all features up to that version -MAX_EXTRACT_VERSION = 46 +MAX_EXTRACT_VERSION = 63 # Below are some formats and associated data for reading/writing headers using # the struct module. The names and structures of headers/records are those used @@ -367,6 +374,8 @@ class ZipInfo (object): if self.compress_type == ZIP_BZIP2: min_version = max(BZIP2_VERSION, min_version) + elif self.compress_type == ZIP_LZMA: + min_version = max(LZMA_VERSION, min_version) self.extract_version = max(min_version, self.extract_version) self.create_version = max(min_version, self.create_version) @@ -480,6 +489,77 @@ class _ZipDecrypter: return c +class LZMACompressor: + + def __init__(self): + self._comp = None + + def _init(self): + props = lzma.encode_filter_properties({'id': lzma.FILTER_LZMA1}) + self._comp = lzma.LZMACompressor(lzma.FORMAT_RAW, filters=[ + lzma.decode_filter_properties(lzma.FILTER_LZMA1, props) + ]) + return struct.pack('<BBH', 9, 4, len(props)) + props + + def compress(self, data): + if self._comp is None: + return self._init() + self._comp.compress(data) + return self._comp.compress(data) + + def flush(self): + if self._comp is None: + return self._init() + self._comp.flush() + return self._comp.flush() + + +class LZMADecompressor: + + def __init__(self): + self._decomp = None + self._unconsumed = b'' + self.eof = False + + def decompress(self, data): + if self._decomp is None: + self._unconsumed += data + if len(self._unconsumed) <= 4: + return b'' + psize, = struct.unpack('<H', self._unconsumed[2:4]) + if len(self._unconsumed) <= 4 + psize: + return b'' + + self._decomp = lzma.LZMADecompressor(lzma.FORMAT_RAW, filters=[ + lzma.decode_filter_properties(lzma.FILTER_LZMA1, + self._unconsumed[4:4 + psize]) + ]) + data = self._unconsumed[4 + psize:] + del self._unconsumed + + result = self._decomp.decompress(data) + self.eof = self._decomp.eof + return result + + +compressor_names = { + 0: 'store', + 1: 'shrink', + 2: 'reduce', + 3: 'reduce', + 4: 'reduce', + 5: 'reduce', + 6: 'implode', + 7: 'tokenize', + 8: 'deflate', + 9: 'deflate64', + 10: 'implode', + 12: 'bzip2', + 14: 'lzma', + 18: 'terse', + 19: 'lz77', + 97: 'wavpack', + 98: 'ppmd', +} + def _check_compression(compression): if compression == ZIP_STORED: pass @@ -491,6 +571,10 @@ def _check_compression(compression): if not bz2: raise RuntimeError( "Compression requires the (missing) bz2 module") + elif compression == ZIP_LZMA: + if not lzma: + raise RuntimeError( + "Compression requires the (missing) lzma module") else: raise RuntimeError("That compression method is not supported") @@ -501,6 +585,8 @@ def _get_compressor(compress_type): zlib.DEFLATED, -15) elif compress_type == ZIP_BZIP2: return bz2.BZ2Compressor() + elif compress_type == ZIP_LZMA: + return LZMACompressor() else: return None @@ -512,19 +598,10 @@ def _get_decompressor(compress_type): return zlib.decompressobj(-15) elif compress_type == ZIP_BZIP2: return bz2.BZ2Decompressor() + elif compress_type == ZIP_LZMA: + return LZMADecompressor() else: - unknown_compressors = { - 1: 'shrink', - 2: 'reduce', - 3: 'reduce', - 4: 'reduce', - 5: 'reduce', - 6: 'implode', - 9: 'enhanced deflate', - 10: 'implode', - 14: 'lzma', - } - descr = unknown_compressors.get(compress_type) + descr = compressor_names.get(compress_type) if descr: raise NotImplementedError("compression type %d (%s)" % (compress_type, descr)) else: @@ -781,8 +858,8 @@ class ZipFile: file: Either the path to the file, or a file-like object. If it is a path, the file will be opened and closed by ZipFile. mode: The mode can be either read "r", write "w" or append "a". - compression: ZIP_STORED (no compression), ZIP_DEFLATED (requires zlib) or - ZIP_BZIP2 (requires bz2). + compression: ZIP_STORED (no compression), ZIP_DEFLATED (requires zlib), + ZIP_BZIP2 (requires bz2) or ZIP_LZMA (requires lzma). allowZip64: if True ZipFile will create files with ZIP64 extensions when needed, otherwise it will raise an exception when this would be necessary. @@ -1062,6 +1139,10 @@ class ZipFile: # Zip 2.7: compressed patched data raise NotImplementedError("compressed patched data (flag bit 5)") + if zinfo.flag_bits & 0x40: + # strong encryption + raise NotImplementedError("strong encryption (flag bit 6)") + if zinfo.flag_bits & 0x800: # UTF-8 filename fname_str = fname.decode("utf-8") @@ -1220,6 +1301,9 @@ class ZipFile: zinfo.file_size = st.st_size zinfo.flag_bits = 0x00 zinfo.header_offset = self.fp.tell() # Start of header bytes + if zinfo.compress_type == ZIP_LZMA: + # Compressed data includes an end-of-stream (EOS) marker + zinfo.flag_bits |= 0x02 self._writecheck(zinfo) self._didModify = True @@ -1292,6 +1376,9 @@ class ZipFile: zinfo.header_offset = self.fp.tell() # Start of header data if compress_type is not None: zinfo.compress_type = compress_type + if zinfo.compress_type == ZIP_LZMA: + # Compressed data includes an end-of-stream (EOS) marker + zinfo.flag_bits |= 0x02 self._writecheck(zinfo) self._didModify = True @@ -1360,6 +1447,8 @@ class ZipFile: if zinfo.compress_type == ZIP_BZIP2: min_version = max(BZIP2_VERSION, min_version) + elif zinfo.compress_type == ZIP_LZMA: + min_version = max(LZMA_VERSION, min_version) extract_version = max(min_version, zinfo.extract_version) create_version = max(min_version, zinfo.create_version) |