"""Functions that read and write gzipped files. The user of the file doesn't have to worry about the compression, but random access is not allowed.""" # based on Andrew Kuchling's minigzip.py distributed with the zlib module import struct, sys, time, os import zlib import builtins import io __all__ = ["GzipFile", "open", "compress", "decompress"] FTEXT, FHCRC, FEXTRA, FNAME, FCOMMENT = 1, 2, 4, 8, 16 READ, WRITE = 1, 2 def U32(i): """Return i as an unsigned integer, assuming it fits in 32 bits. If it's >= 2GB when viewed as a 32-bit unsigned int, return a long. """ if i < 0: i += 1 << 32 return i def LOWU32(i): """Return the low-order 32 bits, as a non-negative int""" return i & 0xFFFFFFFF def write32u(output, value): # The L format writes the bit pattern correctly whether signed # or unsigned. output.write(struct.pack("' def _check_closed(self): """Raises a ValueError if the underlying file object has been closed. """ if self.closed: raise ValueError('I/O operation on closed file.') def _init_write(self, filename): self.name = filename self.crc = zlib.crc32(b"") & 0xffffffff self.size = 0 self.writebuf = [] self.bufsize = 0 def _write_gzip_header(self): self.fileobj.write(b'\037\213') # magic header self.fileobj.write(b'\010') # compression method try: # RFC 1952 requires the FNAME field to be Latin-1. Do not # include filenames that cannot be represented that way. fname = os.path.basename(self.name) if not isinstance(fname, bytes): fname = fname.encode('latin-1') if fname.endswith(b'.gz'): fname = fname[:-3] except UnicodeEncodeError: fname = b'' flags = 0 if fname: flags = FNAME self.fileobj.write(chr(flags).encode('latin-1')) mtime = self.mtime if mtime is None: mtime = time.time() write32u(self.fileobj, int(mtime)) self.fileobj.write(b'\002') self.fileobj.write(b'\377') if fname: self.fileobj.write(fname + b'\000') def _init_read(self): self.crc = zlib.crc32(b"") & 0xffffffff self.size = 0 def _read_exact(self, n): data = self.fileobj.read(n) while len(data) < n: b = self.fileobj.read(n - len(data)) if not b: raise EOFError("Compressed file ended before the " "end-of-stream marker was reached") data += b return data def _read_gzip_header(self): magic = self.fileobj.read(2) if magic == b'': return False if magic != b'\037\213': raise IOError('Not a gzipped file') method, flag, self.mtime = struct.unpack(" 0: self.size = self.size + len(data) self.crc = zlib.crc32(data, self.crc) & 0xffffffff self.fileobj.write( self.compress.compress(data) ) self.offset += len(data) return len(data) def read(self, size=-1): self._check_closed() if self.mode != READ: import errno raise IOError(errno.EBADF, "read() on write-only GzipFile object") if self.extrasize <= 0 and self.fileobj is None: return b'' readsize = 1024 if size < 0: # get the whole thing while self._read(readsize): readsize = min(self.max_read_chunk, readsize * 2) size = self.extrasize else: # just get some more of it while size > self.extrasize: if not self._read(readsize): if size > self.extrasize: size = self.extrasize break readsize = min(self.max_read_chunk, readsize * 2) offset = self.offset - self.extrastart chunk = self.extrabuf[offset: offset + size] self.extrasize = self.extrasize - size self.offset += size return chunk def peek(self, n): if self.mode != READ: import errno raise IOError(errno.EBADF, "peek() on write-only GzipFile object") # Do not return ridiculously small buffers, for one common idiom # is to call peek(1) and expect more bytes in return. if n < 100: n = 100 if self.extrasize == 0: if self.fileobj is None: return b'' # Ensure that we don't return b"" if we haven't reached EOF. # 1024 is the same buffering heuristic used in read() while self.extrasize == 0 and self._read(max(n, 1024)): pass offset = self.offset - self.extrastart remaining = self.extrasize assert remaining == len(self.extrabuf) - offset return self.extrabuf[offset:offset + n] def _unread(self, buf): self.extrasize = len(buf) + self.extrasize self.offset -= len(buf) def _read(self, size=1024): if self.fileobj is None: return False if self._new_member: # If the _new_member flag is set, we have to # jump to the next member, if there is one. self._init_read() if not self._read_gzip_header(): return False self.decompress = zlib.decompressobj(-zlib.MAX_WBITS) self._new_member = False # Read a chunk of data from the file buf = self.fileobj.read(size) # If the EOF has been reached, flush the decompression object # and mark this object as finished. if buf == b"": uncompress = self.decompress.flush() # Prepend the already read bytes to the fileobj to they can be # seen by _read_eof() self.fileobj.prepend(self.decompress.unused_data, True) self._read_eof() self._add_read_data( uncompress ) return False uncompress = self.decompress.decompress(buf) self._add_read_data( uncompress ) if self.decompress.unused_data != b"": # Ending case: we've come to the end of a member in the file, # so seek back to the start of the unused data, finish up # this member, and read a new gzip header. # Prepend the already read bytes to the fileobj to they can be # seen by _read_eof() and _read_gzip_header() self.fileobj.prepend(self.decompress.unused_data, True) # Check the CRC and file size, and set the flag so we read # a new member on the next call self._read_eof() self._new_member = True return True def _add_read_data(self, data): self.crc = zlib.crc32(data, self.crc) & 0xffffffff offset = self.offset - self.extrastart self.extrabuf = self.extrabuf[offset:] + data self.extrasize = self.extrasize + len(data) self.extrastart = self.offset self.size = self.size + len(data) def _read_eof(self): # We've read to the end of the file # We check the that the computed CRC and size of the # uncompressed data matches the stored values. Note that the size # stored is the true file size mod 2**32. crc32, isize = struct.unpack(" 0: self.extrasize -= i - offset self.offset += i - offset return self.extrabuf[offset: i] size = sys.maxsize readsize = self.min_readsize else: readsize = size bufs = [] while size != 0: c = self.read(readsize) i = c.find(b'\n') # We set i=size to break out of the loop under two # conditions: 1) there's no newline, and the chunk is # larger than size, or 2) there is a newline, but the # resulting line would be longer than 'size'. if (size <= i) or (i == -1 and len(c) > size): i = size - 1 if i >= 0 or c == b'': bufs.append(c[:i + 1]) # Add portion of last chunk self._unread(c[i + 1:]) # Push back rest of chunk break # Append chunk to list, decrease 'size', bufs.append(c) size = size - len(c) readsize = min(size, readsize * 2) if readsize > self.min_readsize: self.min_readsize = min(readsize, self.min_readsize * 2, 512) return b''.join(bufs) # Return resulting line def compress(data, compresslevel=9): """Compress data in one shot and return the compressed string. Optional argument is the compression level, in range of 0-9. """ buf = io.BytesIO() with GzipFile(fileobj=buf, mode='wb', compresslevel=compresslevel) as f: f.write(data) return buf.getvalue() def decompress(data): """Decompress a gzip compressed string in one shot. Return the decompressed string. """ with GzipFile(fileobj=io.BytesIO(data)) as f: return f.read() def _test(): # Act like gzip; with -d, act like gunzip. # The input file is not deleted, however, nor are any other gzip # options or features supported. args = sys.argv[1:] decompress = args and args[0] == "-d" if decompress: args = args[1:] if not args: args = ["-"] for arg in args: if decompress: if arg == "-": f = GzipFile(filename="", mode="rb", fileobj=sys.stdin.buffer) g = sys.stdout.buffer else: if arg[-3:] != ".gz": print("filename doesn't end in .gz:", repr(arg)) continue f = open(arg, "rb") g = builtins.open(arg[:-3], "wb") else: if arg == "-": f = sys.stdin.buffer g = GzipFile(filename="", mode="wb", fileobj=sys.stdout.buffer) else: f = builtins.open(arg, "rb") g = open(arg + ".gz", "wb") while True: chunk = f.read(1024) if not chunk: break g.write(chunk) if g is not sys.stdout.buffer: g.close() if f is not sys.stdin.buffer: f.close() if __name__ == '__main__': _test()