diff options
author | Nadeem Vawda <nadeem.vawda@gmail.com> | 2011-05-26 23:52:15 (GMT) |
---|---|---|
committer | Nadeem Vawda <nadeem.vawda@gmail.com> | 2011-05-26 23:52:15 (GMT) |
commit | 55b4338874ede31619a097c0b4a271b90e980472 (patch) | |
tree | 43120769e11327e0b9c4bb97d9711ba47167f731 /Lib/test/test_bz2.py | |
parent | c556e10b94541d7bf20e908f8eca78e7f63fc28c (diff) | |
download | cpython-55b4338874ede31619a097c0b4a271b90e980472.zip cpython-55b4338874ede31619a097c0b4a271b90e980472.tar.gz cpython-55b4338874ede31619a097c0b4a271b90e980472.tar.bz2 |
Issue #1625: BZ2File and bz2.decompress() now support multi-stream files.
Initial patch by Nir Aides.
Diffstat (limited to 'Lib/test/test_bz2.py')
-rw-r--r-- | Lib/test/test_bz2.py | 126 |
1 files changed, 124 insertions, 2 deletions
diff --git a/Lib/test/test_bz2.py b/Lib/test/test_bz2.py index 3567b36..4d66840 100644 --- a/Lib/test/test_bz2.py +++ b/Lib/test/test_bz2.py @@ -84,9 +84,9 @@ class BZ2FileTest(BaseTest): else: return self.DATA - def createTempFile(self, crlf=False): + def createTempFile(self, crlf=False, streams=1): with open(self.filename, "wb") as f: - f.write(self.getData(crlf)) + f.write(self.getData(crlf) * streams) def testRead(self): # "Test BZ2File.read()" @@ -95,6 +95,26 @@ class BZ2FileTest(BaseTest): self.assertRaises(TypeError, bz2f.read, None) self.assertEqual(bz2f.read(), self.TEXT) + def testReadMultiStream(self): + # "Test BZ2File.read() with a multi stream archive" + self.createTempFile(streams=5) + with BZ2File(self.filename) as bz2f: + self.assertRaises(TypeError, bz2f.read, None) + self.assertEqual(bz2f.read(), self.TEXT * 5) + + def testReadMonkeyMultiStream(self): + # "Test BZ2File.read() with a multi stream archive in which stream" + # "end is alined with internal buffer size" + buffer_size = bz2._BUFFER_SIZE + bz2._BUFFER_SIZE = len(self.DATA) + try: + self.createTempFile(streams=5) + with BZ2File(self.filename) as bz2f: + self.assertRaises(TypeError, bz2f.read, None) + self.assertEqual(bz2f.read(), self.TEXT * 5) + finally: + bz2._BUFFER_SIZE = buffer_size + def testRead0(self): # "Test BBZ2File.read(0)" self.createTempFile() @@ -114,6 +134,18 @@ class BZ2FileTest(BaseTest): text += str self.assertEqual(text, self.TEXT) + def testReadChunk10MultiStream(self): + # "Test BZ2File.read() in chunks of 10 bytes with a multi stream archive" + self.createTempFile(streams=5) + with BZ2File(self.filename) as bz2f: + text = b'' + while 1: + str = bz2f.read(10) + if not str: + break + text += str + self.assertEqual(text, self.TEXT * 5) + def testRead100(self): # "Test BZ2File.read(100)" self.createTempFile() @@ -151,6 +183,15 @@ class BZ2FileTest(BaseTest): for line in sio.readlines(): self.assertEqual(bz2f.readline(), line) + def testReadLineMultiStream(self): + # "Test BZ2File.readline() with a multi stream archive" + self.createTempFile(streams=5) + with BZ2File(self.filename) as bz2f: + self.assertRaises(TypeError, bz2f.readline, None) + sio = BytesIO(self.TEXT * 5) + for line in sio.readlines(): + self.assertEqual(bz2f.readline(), line) + def testReadLines(self): # "Test BZ2File.readlines()" self.createTempFile() @@ -159,6 +200,14 @@ class BZ2FileTest(BaseTest): sio = BytesIO(self.TEXT) self.assertEqual(bz2f.readlines(), sio.readlines()) + def testReadLinesMultiStream(self): + # "Test BZ2File.readlines() with a multi stream archive" + self.createTempFile(streams=5) + with BZ2File(self.filename) as bz2f: + self.assertRaises(TypeError, bz2f.readlines, None) + sio = BytesIO(self.TEXT * 5) + self.assertEqual(bz2f.readlines(), sio.readlines()) + def testIterator(self): # "Test iter(BZ2File)" self.createTempFile() @@ -166,6 +215,13 @@ class BZ2FileTest(BaseTest): sio = BytesIO(self.TEXT) self.assertEqual(list(iter(bz2f)), sio.readlines()) + def testIteratorMultiStream(self): + # "Test iter(BZ2File) with a multi stream archive" + self.createTempFile(streams=5) + with BZ2File(self.filename) as bz2f: + sio = BytesIO(self.TEXT * 5) + self.assertEqual(list(iter(bz2f)), sio.readlines()) + def testClosedIteratorDeadlock(self): # "Test that iteration on a closed bz2file releases the lock." # http://bugs.python.org/issue3309 @@ -217,6 +273,17 @@ class BZ2FileTest(BaseTest): self.assertRaises(IOError, bz2f.write, b"a") self.assertRaises(IOError, bz2f.writelines, [b"a"]) + def testAppend(self): + # "Test BZ2File.write()" + with BZ2File(self.filename, "w") as bz2f: + self.assertRaises(TypeError, bz2f.write) + bz2f.write(self.TEXT) + with BZ2File(self.filename, "a") as bz2f: + self.assertRaises(TypeError, bz2f.write) + bz2f.write(self.TEXT) + with open(self.filename, 'rb') as f: + self.assertEqual(self.decompress(f.read()), self.TEXT * 2) + def testSeekForward(self): # "Test BZ2File.seek(150, 0)" self.createTempFile() @@ -225,6 +292,14 @@ class BZ2FileTest(BaseTest): bz2f.seek(150) self.assertEqual(bz2f.read(), self.TEXT[150:]) + def testSeekForwardMultiStream(self): + # "Test BZ2File.seek(150, 0) across stream boundaries" + self.createTempFile(streams=2) + with BZ2File(self.filename) as bz2f: + self.assertRaises(TypeError, bz2f.seek) + bz2f.seek(len(self.TEXT) + 150) + self.assertEqual(bz2f.read(), self.TEXT[150:]) + def testSeekBackwards(self): # "Test BZ2File.seek(-150, 1)" self.createTempFile() @@ -233,6 +308,16 @@ class BZ2FileTest(BaseTest): bz2f.seek(-150, 1) self.assertEqual(bz2f.read(), self.TEXT[500-150:]) + def testSeekBackwardsMultiStream(self): + # "Test BZ2File.seek(-150, 1) across stream boundaries" + self.createTempFile(streams=2) + with BZ2File(self.filename) as bz2f: + readto = len(self.TEXT) + 100 + while readto > 0: + readto -= len(bz2f.read(readto)) + bz2f.seek(-150, 1) + self.assertEqual(bz2f.read(), self.TEXT[100-150:] + self.TEXT) + def testSeekBackwardsFromEnd(self): # "Test BZ2File.seek(-150, 2)" self.createTempFile() @@ -240,6 +325,13 @@ class BZ2FileTest(BaseTest): bz2f.seek(-150, 2) self.assertEqual(bz2f.read(), self.TEXT[len(self.TEXT)-150:]) + def testSeekBackwardsFromEndMultiStream(self): + # "Test BZ2File.seek(-1000, 2) across stream boundaries" + self.createTempFile(streams=2) + with BZ2File(self.filename) as bz2f: + bz2f.seek(-1000, 2) + self.assertEqual(bz2f.read(), (self.TEXT * 2)[-1000:]) + def testSeekPostEnd(self): # "Test BZ2File.seek(150000)" self.createTempFile() @@ -248,6 +340,14 @@ class BZ2FileTest(BaseTest): self.assertEqual(bz2f.tell(), len(self.TEXT)) self.assertEqual(bz2f.read(), b"") + def testSeekPostEndMultiStream(self): + # "Test BZ2File.seek(150000)" + self.createTempFile(streams=5) + with BZ2File(self.filename) as bz2f: + bz2f.seek(150000) + self.assertEqual(bz2f.tell(), len(self.TEXT) * 5) + self.assertEqual(bz2f.read(), b"") + def testSeekPostEndTwice(self): # "Test BZ2File.seek(150000) twice" self.createTempFile() @@ -257,6 +357,15 @@ class BZ2FileTest(BaseTest): self.assertEqual(bz2f.tell(), len(self.TEXT)) self.assertEqual(bz2f.read(), b"") + def testSeekPostEndTwiceMultiStream(self): + # "Test BZ2File.seek(150000) twice with a multi stream archive" + self.createTempFile(streams=5) + with BZ2File(self.filename) as bz2f: + bz2f.seek(150000) + bz2f.seek(150000) + self.assertEqual(bz2f.tell(), len(self.TEXT) * 5) + self.assertEqual(bz2f.read(), b"") + def testSeekPreStart(self): # "Test BZ2File.seek(-150, 0)" self.createTempFile() @@ -265,6 +374,14 @@ class BZ2FileTest(BaseTest): self.assertEqual(bz2f.tell(), 0) self.assertEqual(bz2f.read(), self.TEXT) + def testSeekPreStartMultiStream(self): + # "Test BZ2File.seek(-150, 0) with a multi stream archive" + self.createTempFile(streams=2) + with BZ2File(self.filename) as bz2f: + bz2f.seek(-150) + self.assertEqual(bz2f.tell(), 0) + self.assertEqual(bz2f.read(), self.TEXT * 2) + def testFileno(self): # "Test BZ2File.fileno()" self.createTempFile() @@ -510,6 +627,11 @@ class FuncTest(BaseTest): # "Test decompress() function with incomplete data" self.assertRaises(ValueError, bz2.decompress, self.DATA[:-10]) + def testDecompressMultiStream(self): + # "Test decompress() function for data with multiple streams" + text = bz2.decompress(self.DATA * 5) + self.assertEqual(text, self.TEXT * 5) + def test_main(): support.run_unittest( BZ2FileTest, |