summary | refs | log | tree | commit | diff | stats
path: root/Lib
diff options
context:
space:
mode:
author: Lars Gustäbel <lars@gustaebel.de>, 2011-12-10 19:38:14 (GMT)
committer: Lars Gustäbel <lars@gustaebel.de>, 2011-12-10 19:38:14 (GMT)
commit: 0a9dd2f11db2a52fbc2cabaf0755aa33ad9372e5 (patch)
tree: e54c4c65ea1eb07113171b700f7c9ea11573f6bc /Lib
parent: ce2af335622a1371481fe7f410819d9df9f2ef5d (diff)
download: cpython-0a9dd2f11db2a52fbc2cabaf0755aa33ad9372e5.zip
cpython-0a9dd2f11db2a52fbc2cabaf0755aa33ad9372e5.tar.gz
cpython-0a9dd2f11db2a52fbc2cabaf0755aa33ad9372e5.tar.bz2
Issue #5689: Add support for lzma compression to the tarfile module.
Diffstat (limited to 'Lib')
-rw-r--r-- Lib/tarfile.py 66
-rw-r--r-- Lib/test/test_tarfile.py 78
2 files changed, 128 insertions, 16 deletions
diff --git a/Lib/tarfile.py b/Lib/tarfile.py
index 1789828..39dc1f1 100644
--- a/Lib/tarfile.py
+++ b/Lib/tarfile.py
@@ -420,10 +420,11 @@ class _Stream:
self.crc = zlib.crc32(b"")
if mode == "r":
self._init_read_gz()
+ self.exception = zlib.error
else:
self._init_write_gz()
- if comptype == "bz2":
+ elif comptype == "bz2":
try:
import bz2
except ImportError:
@@ -431,8 +432,25 @@ class _Stream:
if mode == "r":
self.dbuf = b""
self.cmp = bz2.BZ2Decompressor()
+ self.exception = IOError
else:
self.cmp = bz2.BZ2Compressor()
+
+ elif comptype == "xz":
+ try:
+ import lzma
+ except ImportError:
+ raise CompressionError("lzma module is not available")
+ if mode == "r":
+ self.dbuf = b""
+ self.cmp = lzma.LZMADecompressor()
+ self.exception = lzma.LZMAError
+ else:
+ self.cmp = lzma.LZMACompressor()
+
+ elif comptype != "tar":
+ raise CompressionError("unknown compression type %r" % comptype)
+
except:
if not self._extfileobj:
self.fileobj.close()
@@ -584,7 +602,7 @@ class _Stream:
break
try:
buf = self.cmp.decompress(buf)
- except IOError:
+ except self.exception:
raise ReadError("invalid compressed data")
self.dbuf += buf
c += len(buf)
@@ -622,11 +640,14 @@ class _StreamProxy(object):
return self.buf
def getcomptype(self):
- if self.buf.startswith(b"\037\213\010"):
+ if self.buf.startswith(b"\x1f\x8b\x08"):
return "gz"
- if self.buf[0:3] == b"BZh" and self.buf[4:10] == b"1AY&SY":
+ elif self.buf[0:3] == b"BZh" and self.buf[4:10] == b"1AY&SY":
return "bz2"
- return "tar"
+ elif self.buf.startswith((b"\x5d\x00\x00\x80", b"\xfd7zXZ")):
+ return "xz"
+ else:
+ return "tar"
def close(self):
self.fileobj.close()
@@ -1651,18 +1672,22 @@ class TarFile(object):
'r:' open for reading exclusively uncompressed
'r:gz' open for reading with gzip compression
'r:bz2' open for reading with bzip2 compression
+ 'r:xz' open for reading with lzma compression
'a' or 'a:' open for appending, creating the file if necessary
'w' or 'w:' open for writing without compression
'w:gz' open for writing with gzip compression
'w:bz2' open for writing with bzip2 compression
+ 'w:xz' open for writing with lzma compression
'r|*' open a stream of tar blocks with transparent compression
'r|' open an uncompressed stream of tar blocks for reading
'r|gz' open a gzip compressed stream of tar blocks
'r|bz2' open a bzip2 compressed stream of tar blocks
+ 'r|xz' open an lzma compressed stream of tar blocks
'w|' open an uncompressed stream for writing
'w|gz' open a gzip compressed stream for writing
'w|bz2' open a bzip2 compressed stream for writing
+ 'w|xz' open an lzma compressed stream for writing
"""
if not name and not fileobj:
@@ -1780,11 +1805,40 @@ class TarFile(object):
t._extfileobj = False
return t
+ @classmethod
+ def xzopen(cls, name, mode="r", fileobj=None, preset=9, **kwargs):
+ """Open lzma compressed tar archive name for reading or writing.
+ Appending is not allowed.
+ """
+ if mode not in ("r", "w"):
+ raise ValueError("mode must be 'r' or 'w'")
+
+ try:
+ import lzma
+ except ImportError:
+ raise CompressionError("lzma module is not available")
+
+ if mode == "r":
+ # LZMAFile complains about a preset argument in read mode.
+ preset = None
+
+ fileobj = lzma.LZMAFile(filename=name if fileobj is None else None,
+ mode=mode, fileobj=fileobj, preset=preset)
+
+ try:
+ t = cls.taropen(name, mode, fileobj, **kwargs)
+ except (lzma.LZMAError, EOFError):
+ fileobj.close()
+ raise ReadError("not an lzma file")
+ t._extfileobj = False
+ return t
+
# All *open() methods are registered here.
OPEN_METH = {
"tar": "taropen", # uncompressed tar
"gz": "gzopen", # gzip compressed tar
- "bz2": "bz2open" # bzip2 compressed tar
+ "bz2": "bz2open", # bzip2 compressed tar
+ "xz": "xzopen" # lzma compressed tar
}
#--------------------------------------------------------------------------
diff --git a/Lib/test/test_tarfile.py b/Lib/test/test_tarfile.py
index a904e32..ce543df 100644
--- a/Lib/test/test_tarfile.py
+++ b/Lib/test/test_tarfile.py
@@ -21,6 +21,10 @@ try:
import bz2
except ImportError:
bz2 = None
+try:
+ import lzma
+except ImportError:
+ lzma = None
def md5sum(data):
return md5(data).hexdigest()
@@ -29,6 +33,7 @@ TEMPDIR = os.path.abspath(support.TESTFN) + "-tardir"
tarname = support.findfile("testtar.tar")
gzipname = os.path.join(TEMPDIR, "testtar.tar.gz")
bz2name = os.path.join(TEMPDIR, "testtar.tar.bz2")
+xzname = os.path.join(TEMPDIR, "testtar.tar.xz")
tmpname = os.path.join(TEMPDIR, "tmp.tar")
md5_regtype = "65f477c818ad9e15f7feab0c6d37742f"
@@ -201,13 +206,15 @@ class CommonReadTest(ReadTest):
_open = gzip.GzipFile
elif self.mode.endswith(":bz2"):
_open = bz2.BZ2File
+ elif self.mode.endswith(":xz"):
+ _open = lzma.LZMAFile
else:
- _open = open
+ _open = io.FileIO
for char in (b'\0', b'a'):
# Test if EOFHeaderError ('\0') and InvalidHeaderError ('a')
# are ignored correctly.
- with _open(tmpname, "wb") as fobj:
+ with _open(tmpname, "w") as fobj:
fobj.write(char * 1024)
fobj.write(tarfile.TarInfo("foo").tobuf())
@@ -222,9 +229,10 @@ class CommonReadTest(ReadTest):
class MiscReadTest(CommonReadTest):
def test_no_name_argument(self):
- if self.mode.endswith("bz2"):
- # BZ2File has no name attribute.
- return
+ if self.mode.endswith(("bz2", "xz")):
+ # BZ2File and LZMAFile have no name attribute.
+ self.skipTest("no name attribute")
+
with open(self.tarname, "rb") as fobj:
tar = tarfile.open(fileobj=fobj, mode=self.mode)
self.assertEqual(tar.name, os.path.abspath(fobj.name))
@@ -265,10 +273,12 @@ class MiscReadTest(CommonReadTest):
_open = gzip.GzipFile
elif self.mode.endswith(":bz2"):
_open = bz2.BZ2File
+ elif self.mode.endswith(":xz"):
+ _open = lzma.LZMAFile
else:
- _open = open
- fobj = _open(self.tarname, "rb")
- try:
+ _open = io.FileIO
+
+ with _open(self.tarname) as fobj:
fobj.seek(offset)
# Test if the tarfile starts with the second member.
@@ -281,8 +291,6 @@ class MiscReadTest(CommonReadTest):
self.assertEqual(tar.extractfile(t).read(), data,
"seek back did not work")
tar.close()
- finally:
- fobj.close()
def test_fail_comp(self):
# For Gzip and Bz2 Tests: fail with a ReadError on an uncompressed file.
@@ -526,6 +534,18 @@ class DetectReadTest(unittest.TestCase):
testfunc(bz2name, "r|*")
testfunc(bz2name, "r|bz2")
+ if lzma:
+ self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r:xz")
+ self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r|xz")
+ self.assertRaises(tarfile.ReadError, tarfile.open, xzname, mode="r:")
+ self.assertRaises(tarfile.ReadError, tarfile.open, xzname, mode="r|")
+
+ testfunc(xzname, "r")
+ testfunc(xzname, "r:*")
+ testfunc(xzname, "r:xz")
+ testfunc(xzname, "r|*")
+ testfunc(xzname, "r|xz")
+
def test_detect_file(self):
self._test_modes(self._testfunc_file)
@@ -1096,6 +1116,9 @@ class StreamWriteTest(WriteTestBase):
data = dec.decompress(data)
self.assertTrue(len(dec.unused_data) == 0,
"found trailing data")
+ elif self.mode.endswith("xz"):
+ with lzma.LZMAFile(tmpname) as fobj:
+ data = fobj.read()
else:
with open(tmpname, "rb") as fobj:
data = fobj.read()
@@ -1510,6 +1533,12 @@ class AppendTest(unittest.TestCase):
self._create_testtar("w:bz2")
self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a")
+ def test_append_lzma(self):
+ if lzma is None:
+ self.skipTest("lzma module not available")
+ self._create_testtar("w:xz")
+ self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a")
+
# Append mode is supposed to fail if the tarfile to append to
# does not end with a zero block.
def _test_error(self, data):
@@ -1788,6 +1817,21 @@ class Bz2PartialReadTest(unittest.TestCase):
self._test_partial_input("r:bz2")
+class LzmaMiscReadTest(MiscReadTest):
+ tarname = xzname
+ mode = "r:xz"
+class LzmaUstarReadTest(UstarReadTest):
+ tarname = xzname
+ mode = "r:xz"
+class LzmaStreamReadTest(StreamReadTest):
+ tarname = xzname
+ mode = "r|xz"
+class LzmaWriteTest(WriteTest):
+ mode = "w:xz"
+class LzmaStreamWriteTest(StreamWriteTest):
+ mode = "w|xz"
+
+
def test_main():
support.unlink(TEMPDIR)
os.makedirs(TEMPDIR)
@@ -1850,6 +1894,20 @@ def test_main():
Bz2PartialReadTest,
]
+ if lzma:
+ # Create testtar.tar.xz and add lzma-specific tests.
+ support.unlink(xzname)
+ with lzma.LZMAFile(xzname, "w") as tar:
+ tar.write(data)
+
+ tests += [
+ LzmaMiscReadTest,
+ LzmaUstarReadTest,
+ LzmaStreamReadTest,
+ LzmaWriteTest,
+ LzmaStreamWriteTest,
+ ]
+
try:
support.run_unittest(*tests)
finally: