From 2095cfea89279489e10a7f9772a025b49502143a Mon Sep 17 00:00:00 2001 From: Georg Brandl Date: Sat, 7 Jun 2008 19:01:03 +0000 Subject: Remove cl usage from aifc and use bytes throughout. This module needs a test suite! --- Lib/aifc.py | 224 +++++++++++++++++++++--------------------------------------- 1 file changed, 77 insertions(+), 147 deletions(-) diff --git a/Lib/aifc.py b/Lib/aifc.py index 9b41c35..26b0ff5 100644 --- a/Lib/aifc.py +++ b/Lib/aifc.py @@ -137,15 +137,15 @@ writeframesraw. import struct import builtins -__all__ = ["Error","open","openfp"] +__all__ = ["Error", "open", "openfp"] class Error(Exception): pass _AIFC_version = 0xA2805140 # Version 1 of AIFF-C -_skiplist = 'COMT', 'INST', 'MIDI', 'AESD', \ - 'APPL', 'NAME', 'AUTH', '(c) ', 'ANNO' +_skiplist = b'COMT', b'INST', b'MIDI', b'AESD', \ + b'APPL', b'NAME', b'AUTH', b'(c) ', b'ANNO' def _read_long(file): try: @@ -168,7 +168,7 @@ def _read_short(file): def _read_string(file): length = ord(file.read(1)) if length == 0: - data = '' + data = b'' else: data = file.read(length) if length & 1 == 0: @@ -203,10 +203,10 @@ def _write_long(f, x): def _write_string(f, s): if len(s) > 255: raise ValueError("string exceeds maximum pstring length") - f.write(chr(len(s))) + f.write(struct.pack('b', len(s))) f.write(s) if len(s) & 1 == 0: - f.write(chr(0)) + f.write(b'\x00') def _write_float(f, x): import math @@ -281,17 +281,16 @@ class Aifc_read: def initfp(self, file): self._version = 0 - self._decomp = None self._convert = None self._markers = [] self._soundpos = 0 self._file = Chunk(file) - if self._file.getname() != 'FORM': + if self._file.getname() != b'FORM': raise Error('file does not start with FORM id') formdata = self._file.read(4) - if formdata == 'AIFF': + if formdata == b'AIFF': self._aifc = 0 - elif formdata == 'AIFC': + elif formdata == b'AIFC': self._aifc = 1 else: raise Error('not an AIFF or AIFF-C file') @@ -303,39 +302,28 @@ class Aifc_read: except EOFError: break chunkname = chunk.getname() - if chunkname == 'COMM': + if chunkname == b'COMM': self._read_comm_chunk(chunk) self._comm_chunk_read = 1 - elif chunkname == 'SSND': + elif chunkname == b'SSND': self._ssnd_chunk = chunk dummy = chunk.read(8) self._ssnd_seek_needed = 0 - elif chunkname == 'FVER': + elif chunkname == b'FVER': self._version = _read_ulong(chunk) - elif chunkname == 'MARK': + elif chunkname == b'MARK': self._readmark(chunk) elif chunkname in _skiplist: pass else: - raise Error('unrecognized chunk type '+chunk.chunkname) + raise Error('unrecognized chunk type ' + + chunkname.decode('latin1')) chunk.skip() if not self._comm_chunk_read or not self._ssnd_chunk: raise Error('COMM chunk and/or SSND chunk missing') - if self._aifc and self._decomp: - import cl - params = [cl.ORIGINAL_FORMAT, 0, - cl.BITS_PER_COMPONENT, self._sampwidth * 8, - cl.FRAME_RATE, self._framerate] - if self._nchannels == 1: - params[1] = cl.MONO - elif self._nchannels == 2: - params[1] = cl.STEREO_INTERLEAVED - else: - raise Error('cannot compress more than 2 channels') - self._decomp.SetParams(params) def __init__(self, f): - if type(f) == type(''): + if isinstance(f, str): f = builtins.open(f, 'rb') # else, assume it is an open file object already self.initfp(f) @@ -351,9 +339,6 @@ class Aifc_read: self._soundpos = 0 def close(self): - if self._decomp: - self._decomp.CloseDecompressor() - self._decomp = None self._file = None def tell(self): @@ -394,7 +379,7 @@ class Aifc_read: for marker in self._markers: if id == marker[0]: return marker - raise Error('marker %r does not exist' % (id,)) + raise Error('marker {0!r} does not exist'.format(id)) def setpos(self, pos): if pos < 0 or pos > self._nframes: @@ -411,23 +396,21 @@ class Aifc_read: self._ssnd_chunk.seek(pos + 8) self._ssnd_seek_needed = 0 if nframes == 0: - return '' + return b'' data = self._ssnd_chunk.read(nframes * self._framesize) if self._convert and data: data = self._convert(data) - self._soundpos = self._soundpos + len(data) / (self._nchannels * self._sampwidth) + self._soundpos = self._soundpos + len(data) // (self._nchannels + * self._sampwidth) return data # # Internal methods. # - def _decomp_data(self, data): - import cl - dummy = self._decomp.SetParam(cl.FRAME_BUFFER_SIZE, - len(data) * 2) - return self._decomp.Decompress(len(data) / self._nchannels, - data) + def _alaw2lin(self, data): + import audioop + return audioop.alaw2lin(data, 2) def _ulaw2lin(self, data): import audioop @@ -438,14 +421,13 @@ class Aifc_read: if not hasattr(self, '_adpcmstate'): # first time self._adpcmstate = None - data, self._adpcmstate = audioop.adpcm2lin(data, 2, - self._adpcmstate) + data, self._adpcmstate = audioop.adpcm2lin(data, 2, self._adpcmstate) return data def _read_comm_chunk(self, chunk): self._nchannels = _read_short(chunk) self._nframes = _read_long(chunk) - self._sampwidth = (_read_short(chunk) + 7) / 8 + self._sampwidth = (_read_short(chunk) + 7) // 8 self._framerate = int(_read_float(chunk)) self._framesize = self._nchannels * self._sampwidth if self._aifc: @@ -466,42 +448,21 @@ class Aifc_read: chunk.file.seek(-1, 1) #DEBUG end self._compname = _read_string(chunk) - if self._comptype != 'NONE': - if self._comptype == 'G722': - try: - import audioop - except ImportError: - pass - else: - self._convert = self._adpcm2lin - self._framesize = self._framesize / 4 - return - # for ULAW and ALAW try Compression Library - try: - import cl - except ImportError: - if self._comptype == 'ULAW': - try: - import audioop - self._convert = self._ulaw2lin - self._framesize = self._framesize / 2 - return - except ImportError: - pass - raise Error('cannot read compressed AIFF-C files') - if self._comptype == 'ULAW': - scheme = cl.G711_ULAW - self._framesize = self._framesize / 2 - elif self._comptype == 'ALAW': - scheme = cl.G711_ALAW - self._framesize = self._framesize / 2 + if self._comptype != b'NONE': + if self._comptype == b'G722': + self._convert = self._adpcm2lin + self._framesize = self._framesize // 4 + elif self._comptype in (b'ulaw', b'ULAW'): + self._convert = self._ulaw2lin + self._framesize = self._framesize // 2 + elif self._comptype in (b'alaw', b'ALAW'): + self._convert = self._alaw2lin + self._framesize = self._framesize // 2 else: raise Error('unsupported compression type') - self._decomp = cl.OpenDecompressor(scheme) - self._convert = self._decomp_data else: - self._comptype = 'NONE' - self._compname = 'not compressed' + self._comptype = b'NONE' + self._compname = b'not compressed' def _readmark(self, chunk): nmarkers = _read_short(chunk) @@ -555,7 +516,7 @@ class Aifc_write: # _datawritten -- the size of the audio samples actually written def __init__(self, f): - if type(f) == type(''): + if isinstance(f, str): filename = f f = builtins.open(f, 'wb') else: @@ -570,9 +531,8 @@ class Aifc_write: def initfp(self, file): self._file = file self._version = _AIFC_version - self._comptype = 'NONE' - self._compname = 'not compressed' - self._comp = None + self._comptype = b'NONE' + self._compname = b'not compressed' self._convert = None self._nchannels = 0 self._sampwidth = 0 @@ -649,7 +609,8 @@ class Aifc_write: def setcomptype(self, comptype, compname): if self._nframeswritten: raise Error('cannot change parameters after starting to write') - if comptype not in ('NONE', 'ULAW', 'ALAW', 'G722'): + if comptype not in (b'NONE', b'ulaw', b'ULAW', + b'alaw', b'ALAW', b'G722'): raise Error('unsupported compression type') self._comptype = comptype self._compname = compname @@ -669,7 +630,8 @@ class Aifc_write: nchannels, sampwidth, framerate, nframes, comptype, compname = params if self._nframeswritten: raise Error('cannot change parameters after starting to write') - if comptype not in ('NONE', 'ULAW', 'ALAW', 'G722'): + if comptype not in (b'NONE', b'ulaw', b'ULAW', + b'alaw', b'ALAW', b'G722'): raise Error('unsupported compression type') self.setnchannels(nchannels) self.setsampwidth(sampwidth) @@ -688,7 +650,7 @@ class Aifc_write: raise Error('marker ID must be > 0') if pos < 0: raise Error('marker position must be >= 0') - if type(name) != type(''): + if not isinstance(name, str): raise Error('marker name must be a string') for i in range(len(self._markers)): if id == self._markers[i][0]: @@ -700,7 +662,7 @@ class Aifc_write: for marker in self._markers: if id == marker[0]: return marker - raise Error('marker %r does not exist' % (id,)) + raise Error('marker {0!r} does not exist'.format(id)) def getmarkers(self): if len(self._markers) == 0: @@ -712,7 +674,7 @@ class Aifc_write: def writeframesraw(self, data): self._ensure_header_written(len(data)) - nframes = len(data) / (self._sampwidth * self._nchannels) + nframes = len(data) // (self._sampwidth * self._nchannels) if self._convert: data = self._convert(data) self._file.write(data) @@ -729,16 +691,13 @@ class Aifc_write: self._ensure_header_written(0) if self._datawritten & 1: # quick pad to even size - self._file.write(chr(0)) + self._file.write(b'\x00') self._datawritten = self._datawritten + 1 self._writemarkers() if self._nframeswritten != self._nframes or \ self._datalength != self._datawritten or \ self._marklength: self._patchheader() - if self._comp: - self._comp.CloseCompressor() - self._comp = None self._file.flush() self._file = None @@ -746,11 +705,9 @@ class Aifc_write: # Internal methods. # - def _comp_data(self, data): - import cl - dummy = self._comp.SetParam(cl.FRAME_BUFFER_SIZE, len(data)) - dummy = self._comp.SetParam(cl.COMPRESSED_BUFFER_SIZE, len(data)) - return self._comp.Compress(self._nframes, data) + def _lin2alaw(self, data): + import audioop + return audioop.lin2alaw(data, 2) def _lin2ulaw(self, data): import audioop @@ -760,22 +717,23 @@ class Aifc_write: import audioop if not hasattr(self, '_adpcmstate'): self._adpcmstate = None - data, self._adpcmstate = audioop.lin2adpcm(data, 2, - self._adpcmstate) + data, self._adpcmstate = audioop.lin2adpcm(data, 2, self._adpcmstate) return data def _ensure_header_written(self, datasize): if not self._nframeswritten: - if self._comptype in ('ULAW', 'ALAW'): + if self._comptype in (b'ULAW', b'ALAW'): if not self._sampwidth: self._sampwidth = 2 if self._sampwidth != 2: - raise Error('sample width must be 2 when compressing with ULAW or ALAW') - if self._comptype == 'G722': + raise Error('sample width must be 2 when compressing ' + 'with ulaw/ULAW or alaw/ALAW') + if self._comptype == b'G722': if not self._sampwidth: self._sampwidth = 2 if self._sampwidth != 2: - raise Error('sample width must be 2 when compressing with G7.22 (ADPCM)') + raise Error('sample width must be 2 when compressing ' + 'with G7.22 (ADPCM)') if not self._nchannels: raise Error('# channels not specified') if not self._sampwidth: @@ -785,71 +743,43 @@ class Aifc_write: self._write_header(datasize) def _init_compression(self): - if self._comptype == 'G722': + if self._comptype == b'G722': self._convert = self._lin2adpcm - return - try: - import cl - except ImportError: - if self._comptype == 'ULAW': - try: - import audioop - self._convert = self._lin2ulaw - return - except ImportError: - pass - raise Error('cannot write compressed AIFF-C files') - if self._comptype == 'ULAW': - scheme = cl.G711_ULAW - elif self._comptype == 'ALAW': - scheme = cl.G711_ALAW + elif self._comptype in (b'ulaw', b'ULAW'): + self._convert = self._lin2ulaw + elif self._comptype in (b'alaw', b'ALAW'): + self._convert = self._lin2alaw else: raise Error('unsupported compression type') - self._comp = cl.OpenCompressor(scheme) - params = [cl.ORIGINAL_FORMAT, 0, - cl.BITS_PER_COMPONENT, self._sampwidth * 8, - cl.FRAME_RATE, self._framerate, - cl.FRAME_BUFFER_SIZE, 100, - cl.COMPRESSED_BUFFER_SIZE, 100] - if self._nchannels == 1: - params[1] = cl.MONO - elif self._nchannels == 2: - params[1] = cl.STEREO_INTERLEAVED - else: - raise Error('cannot compress more than 2 channels') - self._comp.SetParams(params) - # the compressor produces a header which we ignore - dummy = self._comp.Compress(0, '') - self._convert = self._comp_data def _write_header(self, initlength): - if self._aifc and self._comptype != 'NONE': + if self._aifc and self._comptype != b'NONE': self._init_compression() - self._file.write('FORM') + self._file.write(b'FORM') if not self._nframes: - self._nframes = initlength / (self._nchannels * self._sampwidth) + self._nframes = initlength // (self._nchannels * self._sampwidth) self._datalength = self._nframes * self._nchannels * self._sampwidth if self._datalength & 1: self._datalength = self._datalength + 1 if self._aifc: - if self._comptype in ('ULAW', 'ALAW'): - self._datalength = self._datalength / 2 + if self._comptype in (b'ulaw', b'ULAW', b'alaw', b'ALAW'): + self._datalength = self._datalength // 2 if self._datalength & 1: self._datalength = self._datalength + 1 - elif self._comptype == 'G722': - self._datalength = (self._datalength + 3) / 4 + elif self._comptype == b'G722': + self._datalength = (self._datalength + 3) // 4 if self._datalength & 1: self._datalength = self._datalength + 1 self._form_length_pos = self._file.tell() commlength = self._write_form_length(self._datalength) if self._aifc: - self._file.write('AIFC') - self._file.write('FVER') + self._file.write(b'AIFC') + self._file.write(b'FVER') _write_long(self._file, 4) _write_long(self._file, self._version) else: - self._file.write('AIFF') - self._file.write('COMM') + self._file.write(b'AIFF') + self._file.write(b'COMM') _write_long(self._file, commlength) _write_short(self._file, self._nchannels) self._nframes_pos = self._file.tell() @@ -859,7 +789,7 @@ class Aifc_write: if self._aifc: self._file.write(self._comptype) _write_string(self._file, self._compname) - self._file.write('SSND') + self._file.write(b'SSND') self._ssnd_length_pos = self._file.tell() _write_long(self._file, self._datalength + 8) _write_long(self._file, 0) @@ -882,7 +812,7 @@ class Aifc_write: curpos = self._file.tell() if self._datawritten & 1: datalength = self._datawritten + 1 - self._file.write(chr(0)) + self._file.write(b'\x00') else: datalength = self._datawritten if datalength == self._datalength and \ @@ -903,7 +833,7 @@ class Aifc_write: def _writemarkers(self): if len(self._markers) == 0: return - self._file.write('MARK') + self._file.write(b'MARK') length = 2 for marker in self._markers: id, pos, name = marker -- cgit v0.12