diff options
author | Antoine Pitrou <solipsis@pitrou.net> | 2015-04-10 22:31:01 (GMT) |
---|---|---|
committer | Antoine Pitrou <solipsis@pitrou.net> | 2015-04-10 22:31:01 (GMT) |
commit | 2dbc6e6bce0a29757acddd8000d55f7c844295a2 (patch) | |
tree | f1510e3a93b2527308dd6400a8b0544607e072db /Lib/test/test_bz2.py | |
parent | 2ce11d296cee8d71d2bf2451c7dba4ffa119d9d3 (diff) | |
download | cpython-2dbc6e6bce0a29757acddd8000d55f7c844295a2.zip cpython-2dbc6e6bce0a29757acddd8000d55f7c844295a2.tar.gz cpython-2dbc6e6bce0a29757acddd8000d55f7c844295a2.tar.bz2 |
Issue #23529: Limit the size of decompressed data when reading from
GzipFile, BZ2File or LZMAFile. This defeats denial of service attacks
using compressed bombs (i.e. compressed payloads which decompress to a huge
size).
Patch by Martin Panter and Nikolaus Rath.
Diffstat (limited to 'Lib/test/test_bz2.py')
-rw-r--r-- | Lib/test/test_bz2.py | 30 |
1 files changed, 21 insertions, 9 deletions
diff --git a/Lib/test/test_bz2.py b/Lib/test/test_bz2.py index bf9887b..a1e4b8d 100644 --- a/Lib/test/test_bz2.py +++ b/Lib/test/test_bz2.py @@ -2,7 +2,7 @@ from test import support from test.support import bigmemtest, _4G import unittest -from io import BytesIO +from io import BytesIO, DEFAULT_BUFFER_SIZE import os import pickle import glob @@ -10,6 +10,7 @@ import random import subprocess import sys from test.support import unlink +import _compression try: import threading @@ -110,7 +111,7 @@ class BZ2FileTest(BaseTest): def testRead(self): self.createTempFile() with BZ2File(self.filename) as bz2f: - self.assertRaises(TypeError, bz2f.read, None) + self.assertRaises(TypeError, bz2f.read, float()) self.assertEqual(bz2f.read(), self.TEXT) def testReadBadFile(self): @@ -121,21 +122,21 @@ class BZ2FileTest(BaseTest): def testReadMultiStream(self): self.createTempFile(streams=5) with BZ2File(self.filename) as bz2f: - self.assertRaises(TypeError, bz2f.read, None) + self.assertRaises(TypeError, bz2f.read, float()) self.assertEqual(bz2f.read(), self.TEXT * 5) def testReadMonkeyMultiStream(self): # Test BZ2File.read() on a multi-stream archive where a stream # boundary coincides with the end of the raw read buffer. - buffer_size = bz2._BUFFER_SIZE - bz2._BUFFER_SIZE = len(self.DATA) + buffer_size = _compression.BUFFER_SIZE + _compression.BUFFER_SIZE = len(self.DATA) try: self.createTempFile(streams=5) with BZ2File(self.filename) as bz2f: - self.assertRaises(TypeError, bz2f.read, None) + self.assertRaises(TypeError, bz2f.read, float()) self.assertEqual(bz2f.read(), self.TEXT * 5) finally: - bz2._BUFFER_SIZE = buffer_size + _compression.BUFFER_SIZE = buffer_size def testReadTrailingJunk(self): self.createTempFile(suffix=self.BAD_DATA) @@ -150,7 +151,7 @@ class BZ2FileTest(BaseTest): def testRead0(self): self.createTempFile() with BZ2File(self.filename) as bz2f: - self.assertRaises(TypeError, bz2f.read, None) + self.assertRaises(TypeError, bz2f.read, float()) self.assertEqual(bz2f.read(0), b"") def testReadChunk10(self): @@ -559,13 +560,24 @@ class BZ2FileTest(BaseTest): with BZ2File(str_filename, "rb") as f: self.assertEqual(f.read(), self.DATA) + def testDecompressLimited(self): + """Decompressed data buffering should be limited""" + bomb = bz2.compress(bytes(int(2e6)), compresslevel=9) + self.assertLess(len(bomb), _compression.BUFFER_SIZE) + + decomp = BZ2File(BytesIO(bomb)) + self.assertEqual(bytes(1), decomp.read(1)) + max_decomp = 1 + DEFAULT_BUFFER_SIZE + self.assertLessEqual(decomp._buffer.raw.tell(), max_decomp, + "Excessive amount of data was decompressed") + # Tests for a BZ2File wrapping another file object: def testReadBytesIO(self): with BytesIO(self.DATA) as bio: with BZ2File(bio) as bz2f: - self.assertRaises(TypeError, bz2f.read, None) + self.assertRaises(TypeError, bz2f.read, float()) self.assertEqual(bz2f.read(), self.TEXT) self.assertFalse(bio.closed) |