diff options
Diffstat (limited to 'Lib')
-rw-r--r-- | Lib/encodings/aliases.py | 10 | ||||
-rw-r--r-- | Lib/encodings/utf_32.py | 144 | ||||
-rw-r--r-- | Lib/encodings/utf_32_be.py | 37 | ||||
-rw-r--r-- | Lib/encodings/utf_32_le.py | 37 | ||||
-rw-r--r-- | Lib/test/test_codeccallbacks.py | 7 | ||||
-rw-r--r-- | Lib/test/test_codecs.py | 140 |
6 files changed, 373 insertions, 2 deletions
diff --git a/Lib/encodings/aliases.py b/Lib/encodings/aliases.py index cefb2ed..c6f5aeb 100644 --- a/Lib/encodings/aliases.py +++ b/Lib/encodings/aliases.py @@ -490,6 +490,16 @@ aliases = { 'unicodelittleunmarked' : 'utf_16_le', 'utf_16le' : 'utf_16_le', + # utf_32 codec + 'u32' : 'utf_32', + 'utf32' : 'utf_32', + + # utf_32_be codec + 'utf_32be' : 'utf_32_be', + + # utf_32_le codec + 'utf_32le' : 'utf_32_le', + # utf_7 codec 'u7' : 'utf_7', 'utf7' : 'utf_7', diff --git a/Lib/encodings/utf_32.py b/Lib/encodings/utf_32.py new file mode 100644 index 0000000..622f84b --- /dev/null +++ b/Lib/encodings/utf_32.py @@ -0,0 +1,144 @@ +""" +Python 'utf-32' Codec +""" +import codecs, sys + +### Codec APIs + +encode = codecs.utf_32_encode + +def decode(input, errors='strict'): + return codecs.utf_32_decode(input, errors, True) + +class IncrementalEncoder(codecs.IncrementalEncoder): + def __init__(self, errors='strict'): + codecs.IncrementalEncoder.__init__(self, errors) + self.encoder = None + + def encode(self, input, final=False): + if self.encoder is None: + result = codecs.utf_32_encode(input, self.errors)[0] + if sys.byteorder == 'little': + self.encoder = codecs.utf_32_le_encode + else: + self.encoder = codecs.utf_32_be_encode + return result + return self.encoder(input, self.errors)[0] + + def reset(self): + codecs.IncrementalEncoder.reset(self) + self.encoder = None + + def getstate(self): + # state info we return to the caller: + # 0: stream is in natural order for this platform + # 2: endianness hasn't been determined yet + # (we're never writing in unnatural order) + return (2 if self.encoder is None else 0) + + def setstate(self, state): + if state: + self.encoder = None + else: + if sys.byteorder == 'little': + self.encoder = codecs.utf_32_le_encode + else: + self.encoder = codecs.utf_32_be_encode + +class IncrementalDecoder(codecs.BufferedIncrementalDecoder): + def __init__(self, errors='strict'): + codecs.BufferedIncrementalDecoder.__init__(self, errors) + self.decoder = None + + def _buffer_decode(self, input, errors, final): + if self.decoder is None: + (output, consumed, byteorder) = \ + codecs.utf_32_ex_decode(input, errors, 0, final) + if byteorder == -1: + self.decoder = codecs.utf_32_le_decode + elif byteorder == 1: + self.decoder = codecs.utf_32_be_decode + elif consumed >= 4: + raise UnicodeError("UTF-32 stream does not start with BOM") + return (output, consumed) + return self.decoder(input, self.errors, final) + + def reset(self): + codecs.BufferedIncrementalDecoder.reset(self) + self.decoder = None + + def getstate(self): + # additonal state info from the base class must be None here, + # as it isn't passed along to the caller + state = codecs.BufferedIncrementalDecoder.getstate(self)[0] + # additional state info we pass to the caller: + # 0: stream is in natural order for this platform + # 1: stream is in unnatural order + # 2: endianness hasn't been determined yet + if self.decoder is None: + return (state, 2) + addstate = int((sys.byteorder == "big") != + (self.decoder is codecs.utf_32_be_decode)) + return (state, addstate) + + def setstate(self, state): + # state[1] will be ignored by BufferedIncrementalDecoder.setstate() + codecs.BufferedIncrementalDecoder.setstate(self, state) + state = state[1] + if state == 0: + self.decoder = (codecs.utf_32_be_decode + if sys.byteorder == "big" + else codecs.utf_32_le_decode) + elif state == 1: + self.decoder = (codecs.utf_32_le_decode + if sys.byteorder == "big" + else codecs.utf_32_be_decode) + else: + self.decoder = None + +class StreamWriter(codecs.StreamWriter): + def __init__(self, stream, errors='strict'): + self.bom_written = False + codecs.StreamWriter.__init__(self, stream, errors) + + def encode(self, input, errors='strict'): + self.bom_written = True + result = codecs.utf_32_encode(input, errors) + if sys.byteorder == 'little': + self.encode = codecs.utf_32_le_encode + else: + self.encode = codecs.utf_32_be_encode + return result + +class StreamReader(codecs.StreamReader): + + def reset(self): + codecs.StreamReader.reset(self) + try: + del self.decode + except AttributeError: + pass + + def decode(self, input, errors='strict'): + (object, consumed, byteorder) = \ + codecs.utf_32_ex_decode(input, errors, 0, False) + if byteorder == -1: + self.decode = codecs.utf_32_le_decode + elif byteorder == 1: + self.decode = codecs.utf_32_le_decode + elif consumed>=4: + raise UnicodeError,"UTF-32 stream does not start with BOM" + return (object, consumed) + +### encodings module API + +def getregentry(): + return codecs.CodecInfo( + name='utf-32', + encode=encode, + decode=decode, + incrementalencoder=IncrementalEncoder, + incrementaldecoder=IncrementalDecoder, + streamreader=StreamReader, + streamwriter=StreamWriter, + ) diff --git a/Lib/encodings/utf_32_be.py b/Lib/encodings/utf_32_be.py new file mode 100644 index 0000000..fe272b5 --- /dev/null +++ b/Lib/encodings/utf_32_be.py @@ -0,0 +1,37 @@ +""" +Python 'utf-32-be' Codec +""" +import codecs + +### Codec APIs + +encode = codecs.utf_32_be_encode + +def decode(input, errors='strict'): + return codecs.utf_32_be_decode(input, errors, True) + +class IncrementalEncoder(codecs.IncrementalEncoder): + def encode(self, input, final=False): + return codecs.utf_32_be_encode(input, self.errors)[0] + +class IncrementalDecoder(codecs.BufferedIncrementalDecoder): + _buffer_decode = codecs.utf_32_be_decode + +class StreamWriter(codecs.StreamWriter): + encode = codecs.utf_32_be_encode + +class StreamReader(codecs.StreamReader): + decode = codecs.utf_32_be_decode + +### encodings module API + +def getregentry(): + return codecs.CodecInfo( + name='utf-32-be', + encode=encode, + decode=decode, + incrementalencoder=IncrementalEncoder, + incrementaldecoder=IncrementalDecoder, + streamreader=StreamReader, + streamwriter=StreamWriter, + ) diff --git a/Lib/encodings/utf_32_le.py b/Lib/encodings/utf_32_le.py new file mode 100644 index 0000000..9e48210 --- /dev/null +++ b/Lib/encodings/utf_32_le.py @@ -0,0 +1,37 @@ +""" +Python 'utf-32-le' Codec +""" +import codecs + +### Codec APIs + +encode = codecs.utf_32_le_encode + +def decode(input, errors='strict'): + return codecs.utf_32_le_decode(input, errors, True) + +class IncrementalEncoder(codecs.IncrementalEncoder): + def encode(self, input, final=False): + return codecs.utf_32_le_encode(input, self.errors)[0] + +class IncrementalDecoder(codecs.BufferedIncrementalDecoder): + _buffer_decode = codecs.utf_32_le_decode + +class StreamWriter(codecs.StreamWriter): + encode = codecs.utf_32_le_encode + +class StreamReader(codecs.StreamReader): + decode = codecs.utf_32_le_decode + +### encodings module API + +def getregentry(): + return codecs.CodecInfo( + name='utf-32-le', + encode=encode, + decode=decode, + incrementalencoder=IncrementalEncoder, + incrementaldecoder=IncrementalDecoder, + streamreader=StreamReader, + streamwriter=StreamWriter, + ) diff --git a/Lib/test/test_codeccallbacks.py b/Lib/test/test_codeccallbacks.py index f76ec65..9b731d5 100644 --- a/Lib/test/test_codeccallbacks.py +++ b/Lib/test/test_codeccallbacks.py @@ -285,7 +285,8 @@ class CodecCallbackTest(unittest.TestCase): def test_longstrings(self): # test long strings to check for memory overflow problems - errors = [ "strict", "ignore", "replace", "xmlcharrefreplace", "backslashreplace"] + errors = [ "strict", "ignore", "replace", "xmlcharrefreplace", + "backslashreplace"] # register the handlers under different names, # to prevent the codec from recognizing the name for err in errors: @@ -293,7 +294,8 @@ class CodecCallbackTest(unittest.TestCase): l = 1000 errors += [ "test." + err for err in errors ] for uni in [ s*l for s in ("x", "\u3042", "a\xe4") ]: - for enc in ("ascii", "latin-1", "iso-8859-1", "iso-8859-15", "utf-8", "utf-7", "utf-16"): + for enc in ("ascii", "latin-1", "iso-8859-1", "iso-8859-15", + "utf-8", "utf-7", "utf-16", "utf-32"): for err in errors: try: uni.encode(enc, err) @@ -812,6 +814,7 @@ class CodecCallbackTest(unittest.TestCase): ("utf-7", b"++"), ("utf-8", b"\xff"), ("utf-16", b"\xff"), + ("utf-32", b"\xff"), ("unicode-escape", b"\\u123g"), ("raw-unicode-escape", b"\\u123g"), ("unicode-internal", b"\xff"), diff --git a/Lib/test/test_codecs.py b/Lib/test/test_codecs.py index 89a3473..f2ee524 100644 --- a/Lib/test/test_codecs.py +++ b/Lib/test/test_codecs.py @@ -277,6 +277,143 @@ class ReadTest(unittest.TestCase, MixInCheckStateHandling): self.assertEqual(reader.readline(), s5) self.assertEqual(reader.readline(), "") +class UTF32Test(ReadTest): + encoding = "utf-32" + + spamle = (b'\xff\xfe\x00\x00' + b's\x00\x00\x00p\x00\x00\x00a\x00\x00\x00m\x00\x00\x00' + b's\x00\x00\x00p\x00\x00\x00a\x00\x00\x00m\x00\x00\x00') + spambe = (b'\x00\x00\xfe\xff' + b'\x00\x00\x00s\x00\x00\x00p\x00\x00\x00a\x00\x00\x00m' + b'\x00\x00\x00s\x00\x00\x00p\x00\x00\x00a\x00\x00\x00m') + + def test_only_one_bom(self): + _,_,reader,writer = codecs.lookup(self.encoding) + # encode some stream + s = io.BytesIO() + f = writer(s) + f.write("spam") + f.write("spam") + d = s.getvalue() + # check whether there is exactly one BOM in it + self.assert_(d == self.spamle or d == self.spambe) + # try to read it back + s = io.BytesIO(d) + f = reader(s) + self.assertEquals(f.read(), "spamspam") + + def test_badbom(self): + s = io.BytesIO(4*b"\xff") + f = codecs.getreader(self.encoding)(s) + self.assertRaises(UnicodeError, f.read) + + s = io.BytesIO(8*b"\xff") + f = codecs.getreader(self.encoding)(s) + self.assertRaises(UnicodeError, f.read) + + def test_partial(self): + self.check_partial( + "\x00\xff\u0100\uffff", + [ + "", # first byte of BOM read + "", # second byte of BOM read + "", # third byte of BOM read + "", # fourth byte of BOM read => byteorder known + "", + "", + "", + "\x00", + "\x00", + "\x00", + "\x00", + "\x00\xff", + "\x00\xff", + "\x00\xff", + "\x00\xff", + "\x00\xff\u0100", + "\x00\xff\u0100", + "\x00\xff\u0100", + "\x00\xff\u0100", + "\x00\xff\u0100\uffff", + ] + ) + + def test_errors(self): + self.assertRaises(UnicodeDecodeError, codecs.utf_32_decode, + b"\xff", "strict", True) + + def test_decoder_state(self): + self.check_state_handling_decode(self.encoding, + "spamspam", self.spamle) + self.check_state_handling_decode(self.encoding, + "spamspam", self.spambe) + +class UTF32LETest(ReadTest): + encoding = "utf-32-le" + + def test_partial(self): + self.check_partial( + "\x00\xff\u0100\uffff", + [ + "", + "", + "", + "\x00", + "\x00", + "\x00", + "\x00", + "\x00\xff", + "\x00\xff", + "\x00\xff", + "\x00\xff", + "\x00\xff\u0100", + "\x00\xff\u0100", + "\x00\xff\u0100", + "\x00\xff\u0100", + "\x00\xff\u0100\uffff", + ] + ) + + def test_simple(self): + self.assertEqual("\U00010203".encode(self.encoding), b"\x03\x02\x01\x00") + + def test_errors(self): + self.assertRaises(UnicodeDecodeError, codecs.utf_32_le_decode, + b"\xff", "strict", True) + +class UTF32BETest(ReadTest): + encoding = "utf-32-be" + + def test_partial(self): + self.check_partial( + "\x00\xff\u0100\uffff", + [ + "", + "", + "", + "\x00", + "\x00", + "\x00", + "\x00", + "\x00\xff", + "\x00\xff", + "\x00\xff", + "\x00\xff", + "\x00\xff\u0100", + "\x00\xff\u0100", + "\x00\xff\u0100", + "\x00\xff\u0100", + "\x00\xff\u0100\uffff", + ] + ) + + def test_simple(self): + self.assertEqual("\U00010203".encode(self.encoding), b"\x00\x01\x02\x03") + + def test_errors(self): + self.assertRaises(UnicodeDecodeError, codecs.utf_32_be_decode, + b"\xff", "strict", True) + class UTF16Test(ReadTest): encoding = "utf-16" @@ -1284,6 +1421,9 @@ class WithStmtTest(unittest.TestCase): def test_main(): test_support.run_unittest( + UTF32Test, + UTF32LETest, + UTF32BETest, UTF16Test, UTF16LETest, UTF16BETest, |