summaryrefslogtreecommitdiffstats
path: root/Lib
diff options
context:
space:
mode:
Diffstat (limited to 'Lib')
-rw-r--r--Lib/encodings/aliases.py10
-rw-r--r--Lib/encodings/utf_32.py144
-rw-r--r--Lib/encodings/utf_32_be.py37
-rw-r--r--Lib/encodings/utf_32_le.py37
-rw-r--r--Lib/test/test_codeccallbacks.py7
-rw-r--r--Lib/test/test_codecs.py140
6 files changed, 373 insertions, 2 deletions
diff --git a/Lib/encodings/aliases.py b/Lib/encodings/aliases.py
index cefb2ed..c6f5aeb 100644
--- a/Lib/encodings/aliases.py
+++ b/Lib/encodings/aliases.py
@@ -490,6 +490,16 @@ aliases = {
'unicodelittleunmarked' : 'utf_16_le',
'utf_16le' : 'utf_16_le',
+ # utf_32 codec
+ 'u32' : 'utf_32',
+ 'utf32' : 'utf_32',
+
+ # utf_32_be codec
+ 'utf_32be' : 'utf_32_be',
+
+ # utf_32_le codec
+ 'utf_32le' : 'utf_32_le',
+
# utf_7 codec
'u7' : 'utf_7',
'utf7' : 'utf_7',
diff --git a/Lib/encodings/utf_32.py b/Lib/encodings/utf_32.py
new file mode 100644
index 0000000..622f84b
--- /dev/null
+++ b/Lib/encodings/utf_32.py
@@ -0,0 +1,144 @@
+"""
+Python 'utf-32' Codec
+"""
+import codecs, sys
+
+### Codec APIs
+
+encode = codecs.utf_32_encode
+
+def decode(input, errors='strict'):
+ return codecs.utf_32_decode(input, errors, True)
+
+class IncrementalEncoder(codecs.IncrementalEncoder):
+ def __init__(self, errors='strict'):
+ codecs.IncrementalEncoder.__init__(self, errors)
+ self.encoder = None
+
+ def encode(self, input, final=False):
+ if self.encoder is None:
+ result = codecs.utf_32_encode(input, self.errors)[0]
+ if sys.byteorder == 'little':
+ self.encoder = codecs.utf_32_le_encode
+ else:
+ self.encoder = codecs.utf_32_be_encode
+ return result
+ return self.encoder(input, self.errors)[0]
+
+ def reset(self):
+ codecs.IncrementalEncoder.reset(self)
+ self.encoder = None
+
+ def getstate(self):
+ # state info we return to the caller:
+ # 0: stream is in natural order for this platform
+ # 2: endianness hasn't been determined yet
+ # (we're never writing in unnatural order)
+ return (2 if self.encoder is None else 0)
+
+ def setstate(self, state):
+ if state:
+ self.encoder = None
+ else:
+ if sys.byteorder == 'little':
+ self.encoder = codecs.utf_32_le_encode
+ else:
+ self.encoder = codecs.utf_32_be_encode
+
+class IncrementalDecoder(codecs.BufferedIncrementalDecoder):
+ def __init__(self, errors='strict'):
+ codecs.BufferedIncrementalDecoder.__init__(self, errors)
+ self.decoder = None
+
+ def _buffer_decode(self, input, errors, final):
+ if self.decoder is None:
+ (output, consumed, byteorder) = \
+ codecs.utf_32_ex_decode(input, errors, 0, final)
+ if byteorder == -1:
+ self.decoder = codecs.utf_32_le_decode
+ elif byteorder == 1:
+ self.decoder = codecs.utf_32_be_decode
+ elif consumed >= 4:
+ raise UnicodeError("UTF-32 stream does not start with BOM")
+ return (output, consumed)
+ return self.decoder(input, self.errors, final)
+
+ def reset(self):
+ codecs.BufferedIncrementalDecoder.reset(self)
+ self.decoder = None
+
+ def getstate(self):
+ # additonal state info from the base class must be None here,
+ # as it isn't passed along to the caller
+ state = codecs.BufferedIncrementalDecoder.getstate(self)[0]
+ # additional state info we pass to the caller:
+ # 0: stream is in natural order for this platform
+ # 1: stream is in unnatural order
+ # 2: endianness hasn't been determined yet
+ if self.decoder is None:
+ return (state, 2)
+ addstate = int((sys.byteorder == "big") !=
+ (self.decoder is codecs.utf_32_be_decode))
+ return (state, addstate)
+
+ def setstate(self, state):
+ # state[1] will be ignored by BufferedIncrementalDecoder.setstate()
+ codecs.BufferedIncrementalDecoder.setstate(self, state)
+ state = state[1]
+ if state == 0:
+ self.decoder = (codecs.utf_32_be_decode
+ if sys.byteorder == "big"
+ else codecs.utf_32_le_decode)
+ elif state == 1:
+ self.decoder = (codecs.utf_32_le_decode
+ if sys.byteorder == "big"
+ else codecs.utf_32_be_decode)
+ else:
+ self.decoder = None
+
+class StreamWriter(codecs.StreamWriter):
+ def __init__(self, stream, errors='strict'):
+ self.bom_written = False
+ codecs.StreamWriter.__init__(self, stream, errors)
+
+ def encode(self, input, errors='strict'):
+ self.bom_written = True
+ result = codecs.utf_32_encode(input, errors)
+ if sys.byteorder == 'little':
+ self.encode = codecs.utf_32_le_encode
+ else:
+ self.encode = codecs.utf_32_be_encode
+ return result
+
+class StreamReader(codecs.StreamReader):
+
+ def reset(self):
+ codecs.StreamReader.reset(self)
+ try:
+ del self.decode
+ except AttributeError:
+ pass
+
+ def decode(self, input, errors='strict'):
+ (object, consumed, byteorder) = \
+ codecs.utf_32_ex_decode(input, errors, 0, False)
+ if byteorder == -1:
+ self.decode = codecs.utf_32_le_decode
+ elif byteorder == 1:
+ self.decode = codecs.utf_32_le_decode
+ elif consumed>=4:
+ raise UnicodeError,"UTF-32 stream does not start with BOM"
+ return (object, consumed)
+
+### encodings module API
+
+def getregentry():
+ return codecs.CodecInfo(
+ name='utf-32',
+ encode=encode,
+ decode=decode,
+ incrementalencoder=IncrementalEncoder,
+ incrementaldecoder=IncrementalDecoder,
+ streamreader=StreamReader,
+ streamwriter=StreamWriter,
+ )
diff --git a/Lib/encodings/utf_32_be.py b/Lib/encodings/utf_32_be.py
new file mode 100644
index 0000000..fe272b5
--- /dev/null
+++ b/Lib/encodings/utf_32_be.py
@@ -0,0 +1,37 @@
+"""
+Python 'utf-32-be' Codec
+"""
+import codecs
+
+### Codec APIs
+
+encode = codecs.utf_32_be_encode
+
+def decode(input, errors='strict'):
+ return codecs.utf_32_be_decode(input, errors, True)
+
+class IncrementalEncoder(codecs.IncrementalEncoder):
+ def encode(self, input, final=False):
+ return codecs.utf_32_be_encode(input, self.errors)[0]
+
+class IncrementalDecoder(codecs.BufferedIncrementalDecoder):
+ _buffer_decode = codecs.utf_32_be_decode
+
+class StreamWriter(codecs.StreamWriter):
+ encode = codecs.utf_32_be_encode
+
+class StreamReader(codecs.StreamReader):
+ decode = codecs.utf_32_be_decode
+
+### encodings module API
+
+def getregentry():
+ return codecs.CodecInfo(
+ name='utf-32-be',
+ encode=encode,
+ decode=decode,
+ incrementalencoder=IncrementalEncoder,
+ incrementaldecoder=IncrementalDecoder,
+ streamreader=StreamReader,
+ streamwriter=StreamWriter,
+ )
diff --git a/Lib/encodings/utf_32_le.py b/Lib/encodings/utf_32_le.py
new file mode 100644
index 0000000..9e48210
--- /dev/null
+++ b/Lib/encodings/utf_32_le.py
@@ -0,0 +1,37 @@
+"""
+Python 'utf-32-le' Codec
+"""
+import codecs
+
+### Codec APIs
+
+encode = codecs.utf_32_le_encode
+
+def decode(input, errors='strict'):
+ return codecs.utf_32_le_decode(input, errors, True)
+
+class IncrementalEncoder(codecs.IncrementalEncoder):
+ def encode(self, input, final=False):
+ return codecs.utf_32_le_encode(input, self.errors)[0]
+
+class IncrementalDecoder(codecs.BufferedIncrementalDecoder):
+ _buffer_decode = codecs.utf_32_le_decode
+
+class StreamWriter(codecs.StreamWriter):
+ encode = codecs.utf_32_le_encode
+
+class StreamReader(codecs.StreamReader):
+ decode = codecs.utf_32_le_decode
+
+### encodings module API
+
+def getregentry():
+ return codecs.CodecInfo(
+ name='utf-32-le',
+ encode=encode,
+ decode=decode,
+ incrementalencoder=IncrementalEncoder,
+ incrementaldecoder=IncrementalDecoder,
+ streamreader=StreamReader,
+ streamwriter=StreamWriter,
+ )
diff --git a/Lib/test/test_codeccallbacks.py b/Lib/test/test_codeccallbacks.py
index f76ec65..9b731d5 100644
--- a/Lib/test/test_codeccallbacks.py
+++ b/Lib/test/test_codeccallbacks.py
@@ -285,7 +285,8 @@ class CodecCallbackTest(unittest.TestCase):
def test_longstrings(self):
# test long strings to check for memory overflow problems
- errors = [ "strict", "ignore", "replace", "xmlcharrefreplace", "backslashreplace"]
+ errors = [ "strict", "ignore", "replace", "xmlcharrefreplace",
+ "backslashreplace"]
# register the handlers under different names,
# to prevent the codec from recognizing the name
for err in errors:
@@ -293,7 +294,8 @@ class CodecCallbackTest(unittest.TestCase):
l = 1000
errors += [ "test." + err for err in errors ]
for uni in [ s*l for s in ("x", "\u3042", "a\xe4") ]:
- for enc in ("ascii", "latin-1", "iso-8859-1", "iso-8859-15", "utf-8", "utf-7", "utf-16"):
+ for enc in ("ascii", "latin-1", "iso-8859-1", "iso-8859-15",
+ "utf-8", "utf-7", "utf-16", "utf-32"):
for err in errors:
try:
uni.encode(enc, err)
@@ -812,6 +814,7 @@ class CodecCallbackTest(unittest.TestCase):
("utf-7", b"++"),
("utf-8", b"\xff"),
("utf-16", b"\xff"),
+ ("utf-32", b"\xff"),
("unicode-escape", b"\\u123g"),
("raw-unicode-escape", b"\\u123g"),
("unicode-internal", b"\xff"),
diff --git a/Lib/test/test_codecs.py b/Lib/test/test_codecs.py
index 89a3473..f2ee524 100644
--- a/Lib/test/test_codecs.py
+++ b/Lib/test/test_codecs.py
@@ -277,6 +277,143 @@ class ReadTest(unittest.TestCase, MixInCheckStateHandling):
self.assertEqual(reader.readline(), s5)
self.assertEqual(reader.readline(), "")
+class UTF32Test(ReadTest):
+ encoding = "utf-32"
+
+ spamle = (b'\xff\xfe\x00\x00'
+ b's\x00\x00\x00p\x00\x00\x00a\x00\x00\x00m\x00\x00\x00'
+ b's\x00\x00\x00p\x00\x00\x00a\x00\x00\x00m\x00\x00\x00')
+ spambe = (b'\x00\x00\xfe\xff'
+ b'\x00\x00\x00s\x00\x00\x00p\x00\x00\x00a\x00\x00\x00m'
+ b'\x00\x00\x00s\x00\x00\x00p\x00\x00\x00a\x00\x00\x00m')
+
+ def test_only_one_bom(self):
+ _,_,reader,writer = codecs.lookup(self.encoding)
+ # encode some stream
+ s = io.BytesIO()
+ f = writer(s)
+ f.write("spam")
+ f.write("spam")
+ d = s.getvalue()
+ # check whether there is exactly one BOM in it
+ self.assert_(d == self.spamle or d == self.spambe)
+ # try to read it back
+ s = io.BytesIO(d)
+ f = reader(s)
+ self.assertEquals(f.read(), "spamspam")
+
+ def test_badbom(self):
+ s = io.BytesIO(4*b"\xff")
+ f = codecs.getreader(self.encoding)(s)
+ self.assertRaises(UnicodeError, f.read)
+
+ s = io.BytesIO(8*b"\xff")
+ f = codecs.getreader(self.encoding)(s)
+ self.assertRaises(UnicodeError, f.read)
+
+ def test_partial(self):
+ self.check_partial(
+ "\x00\xff\u0100\uffff",
+ [
+ "", # first byte of BOM read
+ "", # second byte of BOM read
+ "", # third byte of BOM read
+ "", # fourth byte of BOM read => byteorder known
+ "",
+ "",
+ "",
+ "\x00",
+ "\x00",
+ "\x00",
+ "\x00",
+ "\x00\xff",
+ "\x00\xff",
+ "\x00\xff",
+ "\x00\xff",
+ "\x00\xff\u0100",
+ "\x00\xff\u0100",
+ "\x00\xff\u0100",
+ "\x00\xff\u0100",
+ "\x00\xff\u0100\uffff",
+ ]
+ )
+
+ def test_errors(self):
+ self.assertRaises(UnicodeDecodeError, codecs.utf_32_decode,
+ b"\xff", "strict", True)
+
+ def test_decoder_state(self):
+ self.check_state_handling_decode(self.encoding,
+ "spamspam", self.spamle)
+ self.check_state_handling_decode(self.encoding,
+ "spamspam", self.spambe)
+
+class UTF32LETest(ReadTest):
+ encoding = "utf-32-le"
+
+ def test_partial(self):
+ self.check_partial(
+ "\x00\xff\u0100\uffff",
+ [
+ "",
+ "",
+ "",
+ "\x00",
+ "\x00",
+ "\x00",
+ "\x00",
+ "\x00\xff",
+ "\x00\xff",
+ "\x00\xff",
+ "\x00\xff",
+ "\x00\xff\u0100",
+ "\x00\xff\u0100",
+ "\x00\xff\u0100",
+ "\x00\xff\u0100",
+ "\x00\xff\u0100\uffff",
+ ]
+ )
+
+ def test_simple(self):
+ self.assertEqual("\U00010203".encode(self.encoding), b"\x03\x02\x01\x00")
+
+ def test_errors(self):
+ self.assertRaises(UnicodeDecodeError, codecs.utf_32_le_decode,
+ b"\xff", "strict", True)
+
+class UTF32BETest(ReadTest):
+ encoding = "utf-32-be"
+
+ def test_partial(self):
+ self.check_partial(
+ "\x00\xff\u0100\uffff",
+ [
+ "",
+ "",
+ "",
+ "\x00",
+ "\x00",
+ "\x00",
+ "\x00",
+ "\x00\xff",
+ "\x00\xff",
+ "\x00\xff",
+ "\x00\xff",
+ "\x00\xff\u0100",
+ "\x00\xff\u0100",
+ "\x00\xff\u0100",
+ "\x00\xff\u0100",
+ "\x00\xff\u0100\uffff",
+ ]
+ )
+
+ def test_simple(self):
+ self.assertEqual("\U00010203".encode(self.encoding), b"\x00\x01\x02\x03")
+
+ def test_errors(self):
+ self.assertRaises(UnicodeDecodeError, codecs.utf_32_be_decode,
+ b"\xff", "strict", True)
+
class UTF16Test(ReadTest):
encoding = "utf-16"
@@ -1284,6 +1421,9 @@ class WithStmtTest(unittest.TestCase):
def test_main():
test_support.run_unittest(
+ UTF32Test,
+ UTF32LETest,
+ UTF32BETest,
UTF16Test,
UTF16LETest,
UTF16BETest,