From ca8a8d0b3fe228c0c3c0d84c09775630581edf25 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Walter=20D=C3=B6rwald?= Date: Fri, 4 May 2007 13:05:09 +0000 Subject: Make the BOM constants in codecs.py bytes. Make the buffered input for decoders a bytes object. Fix some of the codec tests. --- Lib/codecs.py | 22 ++++---- Lib/test/test_codecs.py | 145 ++++++++++++++++++++++++------------------------ 2 files changed, 83 insertions(+), 84 deletions(-) diff --git a/Lib/codecs.py b/Lib/codecs.py index 909a651..982c282 100644 --- a/Lib/codecs.py +++ b/Lib/codecs.py @@ -33,19 +33,19 @@ __all__ = ["register", "lookup", "open", "EncodedFile", "BOM", "BOM_BE", # # UTF-8 -BOM_UTF8 = '\xef\xbb\xbf' +BOM_UTF8 = b'\xef\xbb\xbf' # UTF-16, little endian -BOM_LE = BOM_UTF16_LE = '\xff\xfe' +BOM_LE = BOM_UTF16_LE = b'\xff\xfe' # UTF-16, big endian -BOM_BE = BOM_UTF16_BE = '\xfe\xff' +BOM_BE = BOM_UTF16_BE = b'\xfe\xff' # UTF-32, little endian -BOM_UTF32_LE = '\xff\xfe\x00\x00' +BOM_UTF32_LE = b'\xff\xfe\x00\x00' # UTF-32, big endian -BOM_UTF32_BE = '\x00\x00\xfe\xff' +BOM_UTF32_BE = b'\x00\x00\xfe\xff' if sys.byteorder == 'little': @@ -261,7 +261,7 @@ class IncrementalDecoder(object): Return the current state of the decoder. This must be a (buffered_input, additional_state_info) tuple. """ - return ("", 0) + return (b"", 0) def setstate(self, state): """ @@ -278,7 +278,7 @@ class BufferedIncrementalDecoder(IncrementalDecoder): def __init__(self, errors='strict'): IncrementalDecoder.__init__(self, errors) # undecoded input that is kept between calls to decode() - self.buffer = "" + self.buffer = b"" def _buffer_decode(self, input, errors, final): # Overwrite this method in subclasses: It must decode input @@ -295,7 +295,7 @@ class BufferedIncrementalDecoder(IncrementalDecoder): def reset(self): IncrementalDecoder.reset(self) - self.buffer = "" + self.buffer = b"" def getstate(self): # additional state info is always 0 @@ -402,7 +402,7 @@ class StreamReader(Codec): """ self.stream = stream self.errors = errors - self.bytebuffer = "" + self.bytebuffer = b"" # For str->str decoding this will stay a str # For str->unicode decoding the first read will promote it to unicode self.charbuffer = "" @@ -588,7 +588,7 @@ class StreamReader(Codec): from decoding errors. """ - self.bytebuffer = "" + self.bytebuffer = b"" self.charbuffer = "" self.linebuffer = None @@ -1005,7 +1005,7 @@ def iterdecode(iterator, encoding, errors='strict', **kwargs): output = decoder.decode(input) if output: yield output - output = decoder.decode("", True) + output = decoder.decode(b"", True) if output: yield output diff --git a/Lib/test/test_codecs.py b/Lib/test/test_codecs.py index 991b44a..10e4c36 100644 --- a/Lib/test/test_codecs.py +++ b/Lib/test/test_codecs.py @@ -1,14 +1,14 @@ from test import test_support import unittest import codecs -import sys, StringIO, _testcapi +import sys, cStringIO, _testcapi class Queue(object): """ queue: write bytes at one end, read bytes from the other end """ - def __init__(self): - self._buffer = "" + def __init__(self, buffer): + self._buffer = buffer def write(self, chars): self._buffer += chars @@ -16,7 +16,7 @@ class Queue(object): def read(self, size=-1): if size<0: s = self._buffer - self._buffer = "" + self._buffer = self._buffer[:0] # make empty return s else: s = self._buffer[:size] @@ -62,48 +62,48 @@ class ReadTest(unittest.TestCase, MixInCheckStateHandling): # of input to the reader byte by byte. Read every available from # the StreamReader and check that the results equal the appropriate # entries from partialresults. - q = Queue() + q = Queue(b"") r = codecs.getreader(self.encoding)(q) result = "" for (c, partialresult) in zip(input.encode(self.encoding), partialresults): - q.write(c) + q.write(bytes([c])) result += r.read() self.assertEqual(result, partialresult) # check that there's nothing left in the buffers self.assertEqual(r.read(), "") - self.assertEqual(r.bytebuffer, "") + self.assertEqual(r.bytebuffer, b"") self.assertEqual(r.charbuffer, "") # do the check again, this time using a incremental decoder d = codecs.getincrementaldecoder(self.encoding)() result = "" for (c, partialresult) in zip(input.encode(self.encoding), partialresults): - result += d.decode(c) + result += d.decode(bytes([c])) self.assertEqual(result, partialresult) # check that there's nothing left in the buffers - self.assertEqual(d.decode("", True), "") - self.assertEqual(d.buffer, "") + self.assertEqual(d.decode(b"", True), "") + self.assertEqual(d.buffer, b"") - # Check whether the rest method works properly + # Check whether the reset method works properly d.reset() result = "" for (c, partialresult) in zip(input.encode(self.encoding), partialresults): - result += d.decode(c) + result += d.decode(bytes([c])) self.assertEqual(result, partialresult) # check that there's nothing left in the buffers - self.assertEqual(d.decode("", True), "") - self.assertEqual(d.buffer, "") + self.assertEqual(d.decode(b"", True), "") + self.assertEqual(d.buffer, b"") # check iterdecode() encoded = input.encode(self.encoding) self.assertEqual( input, - "".join(codecs.iterdecode(encoded, self.encoding)) + "".join(codecs.iterdecode([bytes([c]) for c in encoded], self.encoding)) ) def test_readline(self): def getreader(input): - stream = StringIO.StringIO(input.encode(self.encoding)) + stream = cStringIO.StringIO(input.encode(self.encoding)) return codecs.getreader(self.encoding)(stream) def readalllines(input, keepends=True, size=None): @@ -215,13 +215,13 @@ class ReadTest(unittest.TestCase, MixInCheckStateHandling): ' break\r\n', ' \r\n', ] - stream = StringIO.StringIO("".join(s).encode(self.encoding)) + stream = cStringIO.StringIO("".join(s).encode(self.encoding)) reader = codecs.getreader(self.encoding)(stream) for (i, line) in enumerate(reader): self.assertEqual(line, s[i]) def test_readlinequeue(self): - q = Queue() + q = Queue(b"") writer = codecs.getwriter(self.encoding)(q) reader = codecs.getreader(self.encoding)(q) @@ -253,7 +253,7 @@ class ReadTest(unittest.TestCase, MixInCheckStateHandling): s3 = "next line.\r\n" s = (s1+s2+s3).encode(self.encoding) - stream = StringIO.StringIO(s) + stream = cStringIO.StringIO(s) reader = codecs.getreader(self.encoding)(stream) self.assertEqual(reader.readline(), s1) self.assertEqual(reader.readline(), s2) @@ -268,7 +268,7 @@ class ReadTest(unittest.TestCase, MixInCheckStateHandling): s5 = "againokay.\r\n" s = (s1+s2+s3+s4+s5).encode(self.encoding) - stream = StringIO.StringIO(s) + stream = cStringIO.StringIO(s) reader = codecs.getreader(self.encoding)(stream) self.assertEqual(reader.readline(), s1) self.assertEqual(reader.readline(), s2) @@ -280,13 +280,13 @@ class ReadTest(unittest.TestCase, MixInCheckStateHandling): class UTF16Test(ReadTest): encoding = "utf-16" - spamle = '\xff\xfes\x00p\x00a\x00m\x00s\x00p\x00a\x00m\x00' - spambe = '\xfe\xff\x00s\x00p\x00a\x00m\x00s\x00p\x00a\x00m' + spamle = b'\xff\xfes\x00p\x00a\x00m\x00s\x00p\x00a\x00m\x00' + spambe = b'\xfe\xff\x00s\x00p\x00a\x00m\x00s\x00p\x00a\x00m' def test_only_one_bom(self): _,_,reader,writer = codecs.lookup(self.encoding) # encode some stream - s = StringIO.StringIO() + s = cStringIO.StringIO() f = writer(s) f.write("spam") f.write("spam") @@ -294,16 +294,16 @@ class UTF16Test(ReadTest): # check whether there is exactly one BOM in it self.assert_(d == self.spamle or d == self.spambe) # try to read it back - s = StringIO.StringIO(d) + s = cStringIO.StringIO(d) f = reader(s) self.assertEquals(f.read(), "spamspam") def test_badbom(self): - s = StringIO.StringIO("\xff\xff") + s = cStringIO.StringIO(b"\xff\xff") f = codecs.getreader(self.encoding)(s) self.assertRaises(UnicodeError, f.read) - s = StringIO.StringIO("\xff\xff\xff\xff") + s = cStringIO.StringIO(b"\xff\xff\xff\xff") f = codecs.getreader(self.encoding)(s) self.assertRaises(UnicodeError, f.read) @@ -326,7 +326,7 @@ class UTF16Test(ReadTest): def test_errors(self): self.assertRaises(UnicodeDecodeError, codecs.utf_16_decode, - "\xff", "strict", True) + b"\xff", "strict", True) def test_decoder_state(self): self.check_state_handling_decode(self.encoding, @@ -354,7 +354,7 @@ class UTF16LETest(ReadTest): def test_errors(self): self.assertRaises(UnicodeDecodeError, codecs.utf_16_le_decode, - "\xff", "strict", True) + b"\xff", "strict", True) class UTF16BETest(ReadTest): encoding = "utf-16-be" @@ -376,7 +376,7 @@ class UTF16BETest(ReadTest): def test_errors(self): self.assertRaises(UnicodeDecodeError, codecs.utf_16_be_decode, - "\xff", "strict", True) + b"\xff", "strict", True) class UTF8Test(ReadTest): encoding = "utf-8" @@ -412,7 +412,7 @@ class UTF7Test(ReadTest): class UTF16ExTest(unittest.TestCase): def test_errors(self): - self.assertRaises(UnicodeDecodeError, codecs.utf_16_ex_decode, "\xff", "strict", 0, True) + self.assertRaises(UnicodeDecodeError, codecs.utf_16_ex_decode, b"\xff", "strict", 0, True) def test_bad_args(self): self.assertRaises(TypeError, codecs.utf_16_ex_decode) @@ -474,7 +474,7 @@ class UTF8SigTest(ReadTest): def test_bug1601501(self): # SF bug #1601501: check that the codec works with a buffer - str("\xef\xbb\xbf", "utf-8-sig") + str(b"\xef\xbb\xbf", "utf-8-sig") def test_bom(self): d = codecs.getincrementaldecoder("utf-8-sig")() @@ -492,7 +492,7 @@ class EscapeDecodeTest(unittest.TestCase): class RecodingTest(unittest.TestCase): def test_recoding(self): - f = StringIO.StringIO() + f = cStringIO.StringIO() f2 = codecs.EncodedFile(f, "unicode_internal", "utf-8") f2.write("a") f2.close() @@ -856,8 +856,7 @@ class IDNACodecTest(unittest.TestCase): self.assertEquals("pyth\xf6n.org.".encode("idna"), "xn--pythn-mua.org.") def test_stream(self): - import StringIO - r = codecs.getreader("idna")(StringIO.StringIO("abc")) + r = codecs.getreader("idna")(cStringIO.StringIO(b"abc")) r.read(3) self.assertEquals(r.read(), "") @@ -922,18 +921,18 @@ class IDNACodecTest(unittest.TestCase): class CodecsModuleTest(unittest.TestCase): def test_decode(self): - self.assertEquals(codecs.decode('\xe4\xf6\xfc', 'latin-1'), + self.assertEquals(codecs.decode(b'\xe4\xf6\xfc', 'latin-1'), '\xe4\xf6\xfc') self.assertRaises(TypeError, codecs.decode) - self.assertEquals(codecs.decode('abc'), 'abc') - self.assertRaises(UnicodeDecodeError, codecs.decode, '\xff', 'ascii') + self.assertEquals(codecs.decode(b'abc'), 'abc') + self.assertRaises(UnicodeDecodeError, codecs.decode, b'\xff', 'ascii') def test_encode(self): self.assertEquals(codecs.encode('\xe4\xf6\xfc', 'latin-1'), - '\xe4\xf6\xfc') + b'\xe4\xf6\xfc') self.assertRaises(TypeError, codecs.encode) self.assertRaises(LookupError, codecs.encode, "foo", "__spam__") - self.assertEquals(codecs.encode('abc'), 'abc') + self.assertEquals(codecs.encode('abc'), b'abc') self.assertRaises(UnicodeEncodeError, codecs.encode, '\xffff', 'ascii') def test_register(self): @@ -965,7 +964,7 @@ class StreamReaderTest(unittest.TestCase): def setUp(self): self.reader = codecs.getreader('utf-8') - self.stream = StringIO.StringIO('\xed\x95\x9c\n\xea\xb8\x80') + self.stream = cStringIO.StringIO(b'\xed\x95\x9c\n\xea\xb8\x80') def test_readlines(self): f = self.reader(self.stream) @@ -974,27 +973,27 @@ class StreamReaderTest(unittest.TestCase): class EncodedFileTest(unittest.TestCase): def test_basic(self): - f = StringIO.StringIO('\xed\x95\x9c\n\xea\xb8\x80') + f = cStringIO.StringIO(b'\xed\x95\x9c\n\xea\xb8\x80') ef = codecs.EncodedFile(f, 'utf-16-le', 'utf-8') - self.assertEquals(ef.read(), '\\\xd5\n\x00\x00\xae') + self.assertEquals(ef.read(), b'\\\xd5\n\x00\x00\xae') - f = StringIO.StringIO() + f = cStringIO.StringIO() ef = codecs.EncodedFile(f, 'utf-8', 'latin1') - ef.write('\xc3\xbc') - self.assertEquals(f.getvalue(), '\xfc') + ef.write(b'\xc3\xbc') + self.assertEquals(f.getvalue(), b'\xfc') class Str2StrTest(unittest.TestCase): def test_read(self): sin = "\x80".encode("base64_codec") - reader = codecs.getreader("base64_codec")(StringIO.StringIO(sin)) + reader = codecs.getreader("base64_codec")(cStringIO.StringIO(sin)) sout = reader.read() self.assertEqual(sout, "\x80") self.assert_(isinstance(sout, str)) def test_readline(self): sin = "\x80".encode("base64_codec") - reader = codecs.getreader("base64_codec")(StringIO.StringIO(sin)) + reader = codecs.getreader("base64_codec")(cStringIO.StringIO(sin)) sout = reader.readline() self.assertEqual(sout, "\x80") self.assert_(isinstance(sout, str)) @@ -1162,25 +1161,25 @@ class BasicUnicodeTest(unittest.TestCase, MixInCheckStateHandling): elif encoding == "latin_1": name = "latin_1" self.assertEqual(encoding.replace("_", "-"), name.replace("_", "-")) - (bytes, size) = codecs.getencoder(encoding)(s) + (b, size) = codecs.getencoder(encoding)(s) if encoding != "unicode_internal": self.assertEqual(size, len(s), "%r != %r (encoding=%r)" % (size, len(s), encoding)) - (chars, size) = codecs.getdecoder(encoding)(bytes) + (chars, size) = codecs.getdecoder(encoding)(b) self.assertEqual(chars, s, "%r != %r (encoding=%r)" % (chars, s, encoding)) if encoding not in broken_unicode_with_streams: # check stream reader/writer - q = Queue() + q = Queue(b"") writer = codecs.getwriter(encoding)(q) - encodedresult = "" + encodedresult = b"" for c in s: writer.write(c) encodedresult += q.read() - q = Queue() + q = Queue(b"") reader = codecs.getreader(encoding)(q) decodedresult = "" for c in encodedresult: - q.write(c) + q.write(bytes([c])) decodedresult += reader.read() self.assertEqual(decodedresult, s, "%r != %r (encoding=%r)" % (decodedresult, s, encoding)) @@ -1194,26 +1193,26 @@ class BasicUnicodeTest(unittest.TestCase, MixInCheckStateHandling): pass else: # check incremental decoder/encoder - encodedresult = "" + encodedresult = b"" for c in s: encodedresult += encoder.encode(c) encodedresult += encoder.encode("", True) decoder = codecs.getincrementaldecoder(encoding)() decodedresult = "" for c in encodedresult: - decodedresult += decoder.decode(c) + decodedresult += decoder.decode(bytes([c])) decodedresult += decoder.decode("", True) self.assertEqual(decodedresult, s, "%r != %r (encoding=%r)" % (decodedresult, s, encoding)) # check C API - encodedresult = "" + encodedresult = b"" for c in s: encodedresult += cencoder.encode(c) encodedresult += cencoder.encode("", True) cdecoder = _testcapi.codec_incrementaldecoder(encoding) decodedresult = "" for c in encodedresult: - decodedresult += cdecoder.decode(c) + decodedresult += cdecoder.decode(bytes([c])) decodedresult += cdecoder.decode("", True) self.assertEqual(decodedresult, s, "%r != %r (encoding=%r)" % (decodedresult, s, encoding)) @@ -1233,14 +1232,14 @@ class BasicUnicodeTest(unittest.TestCase, MixInCheckStateHandling): except LookupError: # no IncrementalEncoder pass else: - encodedresult = "".join(encoder.encode(c) for c in s) + encodedresult = b"".join(encoder.encode(c) for c in s) decoder = codecs.getincrementaldecoder(encoding)("ignore") - decodedresult = "".join(decoder.decode(c) for c in encodedresult) + decodedresult = "".join(decoder.decode(bytes([c])) for c in encodedresult) self.assertEqual(decodedresult, s, "%r != %r (encoding=%r)" % (decodedresult, s, encoding)) - encodedresult = "".join(cencoder.encode(c) for c in s) + encodedresult = b"".join(cencoder.encode(c) for c in s) cdecoder = _testcapi.codec_incrementaldecoder(encoding, "ignore") - decodedresult = "".join(cdecoder.decode(c) for c in encodedresult) + decodedresult = "".join(cdecoder.decode(bytes([c])) for c in encodedresult) self.assertEqual(decodedresult, s, "%r != %r (encoding=%r)" % (decodedresult, s, encoding)) def test_seek(self): @@ -1251,7 +1250,7 @@ class BasicUnicodeTest(unittest.TestCase, MixInCheckStateHandling): continue if encoding in broken_unicode_with_streams: continue - reader = codecs.getreader(encoding)(StringIO.StringIO(s.encode(encoding))) + reader = codecs.getreader(encoding)(cStringIO.StringIO(s.encode(encoding))) for t in xrange(5): # Test that calling seek resets the internal codec state and buffers reader.seek(0, 0) @@ -1288,39 +1287,39 @@ class BasicStrTest(unittest.TestCase): def test_basics(self): s = "abc123" for encoding in all_string_encodings: - (bytes, size) = codecs.getencoder(encoding)(s) + (encoded, size) = codecs.getencoder(encoding)(s) self.assertEqual(size, len(s)) - (chars, size) = codecs.getdecoder(encoding)(bytes) + (chars, size) = codecs.getdecoder(encoding)(encoded) self.assertEqual(chars, s, "%r != %r (encoding=%r)" % (chars, s, encoding)) class CharmapTest(unittest.TestCase): def test_decode_with_string_map(self): self.assertEquals( - codecs.charmap_decode("\x00\x01\x02", "strict", "abc"), + codecs.charmap_decode(b"\x00\x01\x02", "strict", "abc"), ("abc", 3) ) self.assertEquals( - codecs.charmap_decode("\x00\x01\x02", "replace", "ab"), + codecs.charmap_decode(b"\x00\x01\x02", "replace", "ab"), ("ab\ufffd", 3) ) self.assertEquals( - codecs.charmap_decode("\x00\x01\x02", "replace", "ab\ufffe"), + codecs.charmap_decode(b"\x00\x01\x02", "replace", "ab\ufffe"), ("ab\ufffd", 3) ) self.assertEquals( - codecs.charmap_decode("\x00\x01\x02", "ignore", "ab"), + codecs.charmap_decode(b"\x00\x01\x02", "ignore", "ab"), ("ab", 3) ) self.assertEquals( - codecs.charmap_decode("\x00\x01\x02", "ignore", "ab\ufffe"), + codecs.charmap_decode(b"\x00\x01\x02", "ignore", "ab\ufffe"), ("ab", 3) ) - allbytes = "".join(chr(i) for i in xrange(256)) + allbytes = bytes(xrange(256)) self.assertEquals( codecs.charmap_decode(allbytes, "ignore", ""), ("", len(allbytes)) @@ -1328,12 +1327,12 @@ class CharmapTest(unittest.TestCase): class WithStmtTest(unittest.TestCase): def test_encodedfile(self): - f = StringIO.StringIO("\xc3\xbc") + f = cStringIO.StringIO(b"\xc3\xbc") with codecs.EncodedFile(f, "latin-1", "utf-8") as ef: - self.assertEquals(ef.read(), "\xfc") + self.assertEquals(ef.read(), b"\xfc") def test_streamreaderwriter(self): - f = StringIO.StringIO("\xc3\xbc") + f = cStringIO.StringIO(b"\xc3\xbc") info = codecs.lookup("utf-8") with codecs.StreamReaderWriter(f, info.streamreader, info.streamwriter, 'strict') as srw: -- cgit v0.12