summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Lib/test/test_zipfile.py79
-rw-r--r--Lib/zipfile.py25
-rw-r--r--Misc/NEWS4
3 files changed, 106 insertions, 2 deletions
diff --git a/Lib/test/test_zipfile.py b/Lib/test/test_zipfile.py
index e9a90e5..82b4061 100644
--- a/Lib/test/test_zipfile.py
+++ b/Lib/test/test_zipfile.py
@@ -662,6 +662,27 @@ class PyZipFileTests(unittest.TestCase):
class OtherTests(unittest.TestCase):
+ zips_with_bad_crc = {
+ zipfile.ZIP_STORED: (
+ b'PK\003\004\024\0\0\0\0\0 \213\212;:r'
+ b'\253\377\f\0\0\0\f\0\0\0\005\0\0\000af'
+ b'ilehello,AworldP'
+ b'K\001\002\024\003\024\0\0\0\0\0 \213\212;:'
+ b'r\253\377\f\0\0\0\f\0\0\0\005\0\0\0\0'
+ b'\0\0\0\0\0\0\0\200\001\0\0\0\000afi'
+ b'lePK\005\006\0\0\0\0\001\0\001\0003\000'
+ b'\0\0/\0\0\0\0\0'),
+ zipfile.ZIP_DEFLATED: (
+ b'PK\x03\x04\x14\x00\x00\x00\x08\x00n}\x0c=FA'
+ b'KE\x10\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af'
+ b'ile\xcbH\xcd\xc9\xc9W(\xcf/\xcaI\xc9\xa0'
+ b'=\x13\x00PK\x01\x02\x14\x03\x14\x00\x00\x00\x08\x00n'
+ b'}\x0c=FAKE\x10\x00\x00\x00n\x00\x00\x00\x05'
+ b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01\x00\x00\x00'
+ b'\x00afilePK\x05\x06\x00\x00\x00\x00\x01\x00'
+ b'\x01\x003\x00\x00\x003\x00\x00\x00\x00\x00'),
+ }
+
def test_unicode_filenames(self):
with zipfile.ZipFile(TESTFN, "w") as zf:
zf.writestr("foo.txt", "Test for unicode filename")
@@ -875,6 +896,49 @@ class OtherTests(unittest.TestCase):
with zipfile.ZipFile(TESTFN, mode="r") as zipfr:
self.assertEqual(zipfr.comment, comment2)
+ def check_testzip_with_bad_crc(self, compression):
+ """Tests that files with bad CRCs return their name from testzip."""
+ zipdata = self.zips_with_bad_crc[compression]
+
+ with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
+ # testzip returns the name of the first corrupt file, or None
+ self.assertEqual('afile', zipf.testzip())
+
+ def test_testzip_with_bad_crc_stored(self):
+ self.check_testzip_with_bad_crc(zipfile.ZIP_STORED)
+
+ @skipUnless(zlib, "requires zlib")
+ def test_testzip_with_bad_crc_deflated(self):
+ self.check_testzip_with_bad_crc(zipfile.ZIP_DEFLATED)
+
+ def check_read_with_bad_crc(self, compression):
+ """Tests that files with bad CRCs raise a BadZipfile exception when read."""
+ zipdata = self.zips_with_bad_crc[compression]
+
+ # Using ZipFile.read()
+ with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
+ self.assertRaises(zipfile.BadZipfile, zipf.read, 'afile')
+
+ # Using ZipExtFile.read()
+ with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
+ with zipf.open('afile', 'r') as corrupt_file:
+ self.assertRaises(zipfile.BadZipfile, corrupt_file.read)
+
+ # Same with small reads (in order to exercise the buffering logic)
+ with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
+ with zipf.open('afile', 'r') as corrupt_file:
+ corrupt_file.MIN_READ_SIZE = 2
+ with self.assertRaises(zipfile.BadZipfile):
+ while corrupt_file.read(2):
+ pass
+
+ def test_read_with_bad_crc_stored(self):
+ self.check_read_with_bad_crc(zipfile.ZIP_STORED)
+
+ @skipUnless(zlib, "requires zlib")
+ def test_read_with_bad_crc_deflated(self):
+ self.check_read_with_bad_crc(zipfile.ZIP_DEFLATED)
+
def tearDown(self):
unlink(TESTFN)
unlink(TESTFN2)
@@ -974,6 +1038,11 @@ class TestsWithRandomBinaryFiles(unittest.TestCase):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.zip_test(f, zipfile.ZIP_STORED)
+ @skipUnless(zlib, "requires zlib")
+ def test_deflated(self):
+ for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
+ self.zip_test(f, zipfile.ZIP_DEFLATED)
+
def zip_open_test(self, f, compression):
self.make_test_archive(f, compression)
@@ -1007,6 +1076,11 @@ class TestsWithRandomBinaryFiles(unittest.TestCase):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.zip_open_test(f, zipfile.ZIP_STORED)
+ @skipUnless(zlib, "requires zlib")
+ def test_open_deflated(self):
+ for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
+ self.zip_open_test(f, zipfile.ZIP_DEFLATED)
+
def zip_random_open_test(self, f, compression):
self.make_test_archive(f, compression)
@@ -1028,6 +1102,11 @@ class TestsWithRandomBinaryFiles(unittest.TestCase):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.zip_random_open_test(f, zipfile.ZIP_STORED)
+ @skipUnless(zlib, "requires zlib")
+ def test_random_open_deflated(self):
+ for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
+ self.zip_random_open_test(f, zipfile.ZIP_DEFLATED)
+
@skipUnless(zlib, "requires zlib")
class TestsWithMultipleOpens(unittest.TestCase):
diff --git a/Lib/zipfile.py b/Lib/zipfile.py
index f81cc8b..c47c3cc 100644
--- a/Lib/zipfile.py
+++ b/Lib/zipfile.py
@@ -492,6 +492,12 @@ class ZipExtFile(io.BufferedIOBase):
self.mode = mode
self.name = zipinfo.filename
+ if hasattr(zipinfo, 'CRC'):
+ self._expected_crc = zipinfo.CRC
+ self._running_crc = crc32(b'') & 0xffffffff
+ else:
+ self._expected_crc = None
+
def readline(self, limit=-1):
"""Read and return a line from the stream.
@@ -569,6 +575,16 @@ class ZipExtFile(io.BufferedIOBase):
return buf
+ def _update_crc(self, newdata, eof):
+ # Update the CRC using the given data.
+ if self._expected_crc is None:
+ # No need to compute the CRC if we don't have a reference value
+ return
+ self._running_crc = crc32(newdata, self._running_crc) & 0xffffffff
+ # Check the CRC if we're at the end of the file
+ if eof and self._running_crc != self._expected_crc:
+ raise BadZipfile("Bad CRC-32 for file %r" % self.name)
+
def read1(self, n):
"""Read up to n bytes with at most one read() system call."""
@@ -592,6 +608,7 @@ class ZipExtFile(io.BufferedIOBase):
data = bytes(map(self._decrypter, data))
if self._compress_type == ZIP_STORED:
+ self._update_crc(data, eof=(self._compress_left==0))
self._readbuffer = self._readbuffer[self._offset:] + data
self._offset = 0
else:
@@ -607,9 +624,11 @@ class ZipExtFile(io.BufferedIOBase):
)
self._unconsumed = self._decompressor.unconsumed_tail
- if len(self._unconsumed) == 0 and self._compress_left == 0:
+ eof = len(self._unconsumed) == 0 and self._compress_left == 0
+ if eof:
data += self._decompressor.flush()
+ self._update_crc(data, eof=eof)
self._readbuffer = self._readbuffer[self._offset:] + data
self._offset = 0
@@ -1380,7 +1399,9 @@ def main(args = None):
print(USAGE)
sys.exit(1)
zf = ZipFile(args[1], 'r')
- zf.testzip()
+ badfile = zf.testzip()
+ if badfile:
+ print("The following enclosed file is corrupted: {!r}".format(badfile))
print("Done testing")
elif args[0] == '-e':
diff --git a/Misc/NEWS b/Misc/NEWS
index 418f8dd..9b5c35e 100644
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -72,6 +72,10 @@ Extensions
Library
-------
+- Issue #7467: when reading a file from a ZIP archive, its CRC is checked and a
+ BadZipfile error is raised if it doesn't match (as used to be the
+ case in Python 2.5 and earlier).
+
- Issue #9550: a BufferedReader could issue an additional read when the
original read request had been satisfied, which could block indefinitely
when the underlying raw IO channel was e.g. a socket. Report and original