diff options
Diffstat (limited to 'Lib/test/test_codecs.py')
| -rw-r--r-- | Lib/test/test_codecs.py | 543 |
1 files changed, 480 insertions, 63 deletions
diff --git a/Lib/test/test_codecs.py b/Lib/test/test_codecs.py index a8b3da0..8fe21fb 100644 --- a/Lib/test/test_codecs.py +++ b/Lib/test/test_codecs.py @@ -1,4 +1,5 @@ import codecs +import contextlib import io import locale import sys @@ -96,7 +97,7 @@ class ReadTest(MixInCheckStateHandling): self.assertEqual(r.read(), "") self.assertEqual(r.bytebuffer, b"") - # do the check again, this time using a incremental decoder + # do the check again, this time using an incremental decoder d = codecs.getincrementaldecoder(self.encoding)() result = "" for (c, partialresult) in zip(input.encode(self.encoding), partialresults): @@ -342,8 +343,46 @@ class ReadTest(MixInCheckStateHandling): self.assertEqual(reader.readline(), s5) self.assertEqual(reader.readline(), "") + ill_formed_sequence_replace = "\ufffd" + + def test_lone_surrogates(self): + self.assertRaises(UnicodeEncodeError, "\ud800".encode, self.encoding) + self.assertEqual("[\uDC80]".encode(self.encoding, "backslashreplace"), + "[\\udc80]".encode(self.encoding)) + self.assertEqual("[\uDC80]".encode(self.encoding, "xmlcharrefreplace"), + "[�]".encode(self.encoding)) + self.assertEqual("[\uDC80]".encode(self.encoding, "ignore"), + "[]".encode(self.encoding)) + self.assertEqual("[\uDC80]".encode(self.encoding, "replace"), + "[?]".encode(self.encoding)) + + bom = "".encode(self.encoding) + for before, after in [("\U00010fff", "A"), ("[", "]"), + ("A", "\U00010fff")]: + before_sequence = before.encode(self.encoding)[len(bom):] + after_sequence = after.encode(self.encoding)[len(bom):] + test_string = before + "\uDC80" + after + test_sequence = (bom + before_sequence + + self.ill_formed_sequence + after_sequence) + self.assertRaises(UnicodeDecodeError, test_sequence.decode, + self.encoding) + self.assertEqual(test_string.encode(self.encoding, + "surrogatepass"), + test_sequence) + self.assertEqual(test_sequence.decode(self.encoding, + "surrogatepass"), + test_string) + self.assertEqual(test_sequence.decode(self.encoding, "ignore"), + before + after) + self.assertEqual(test_sequence.decode(self.encoding, "replace"), + before + self.ill_formed_sequence_replace + after) + class UTF32Test(ReadTest, unittest.TestCase): encoding = "utf-32" + if sys.byteorder == 'little': + ill_formed_sequence = b"\x80\xdc\x00\x00" + else: + ill_formed_sequence = b"\x00\x00\xdc\x80" spamle = (b'\xff\xfe\x00\x00' b's\x00\x00\x00p\x00\x00\x00a\x00\x00\x00m\x00\x00\x00' @@ -435,6 +474,7 @@ class UTF32Test(ReadTest, unittest.TestCase): class UTF32LETest(ReadTest, unittest.TestCase): encoding = "utf-32-le" + ill_formed_sequence = b"\x80\xdc\x00\x00" def test_partial(self): self.check_partial( @@ -479,6 +519,7 @@ class UTF32LETest(ReadTest, unittest.TestCase): class UTF32BETest(ReadTest, unittest.TestCase): encoding = "utf-32-be" + ill_formed_sequence = b"\x00\x00\xdc\x80" def test_partial(self): self.check_partial( @@ -524,6 +565,10 @@ class UTF32BETest(ReadTest, unittest.TestCase): class UTF16Test(ReadTest, unittest.TestCase): encoding = "utf-16" + if sys.byteorder == 'little': + ill_formed_sequence = b"\x80\xdc" + else: + ill_formed_sequence = b"\xdc\x80" spamle = b'\xff\xfes\x00p\x00a\x00m\x00s\x00p\x00a\x00m\x00' spambe = b'\xfe\xff\x00s\x00p\x00a\x00m\x00s\x00p\x00a\x00m' @@ -599,11 +644,14 @@ class UTF16Test(ReadTest, unittest.TestCase): self.addCleanup(support.unlink, support.TESTFN) with open(support.TESTFN, 'wb') as fp: fp.write(s) - with codecs.open(support.TESTFN, 'U', encoding=self.encoding) as reader: + with support.check_warnings(('', DeprecationWarning)): + reader = codecs.open(support.TESTFN, 'U', encoding=self.encoding) + with reader: self.assertEqual(reader.read(), s1) class UTF16LETest(ReadTest, unittest.TestCase): encoding = "utf-16-le" + ill_formed_sequence = b"\x80\xdc" def test_partial(self): self.check_partial( @@ -647,6 +695,7 @@ class UTF16LETest(ReadTest, unittest.TestCase): class UTF16BETest(ReadTest, unittest.TestCase): encoding = "utf-16-be" + ill_formed_sequence = b"\xdc\x80" def test_partial(self): self.check_partial( @@ -690,6 +739,8 @@ class UTF16BETest(ReadTest, unittest.TestCase): class UTF8Test(ReadTest, unittest.TestCase): encoding = "utf-8" + ill_formed_sequence = b"\xed\xb2\x80" + ill_formed_sequence_replace = "\ufffd" * 3 def test_partial(self): self.check_partial( @@ -719,18 +770,11 @@ class UTF8Test(ReadTest, unittest.TestCase): u, u.encode(self.encoding)) def test_lone_surrogates(self): - self.assertRaises(UnicodeEncodeError, "\ud800".encode, "utf-8") - self.assertRaises(UnicodeDecodeError, b"\xed\xa0\x80".decode, "utf-8") - self.assertEqual("[\uDC80]".encode("utf-8", "backslashreplace"), - b'[\\udc80]') - self.assertEqual("[\uDC80]".encode("utf-8", "xmlcharrefreplace"), - b'[�]') - self.assertEqual("[\uDC80]".encode("utf-8", "surrogateescape"), + super().test_lone_surrogates() + # not sure if this is making sense for + # UTF-16 and UTF-32 + self.assertEqual("[\uDC80]".encode('utf-8', "surrogateescape"), b'[\x80]') - self.assertEqual("[\uDC80]".encode("utf-8", "ignore"), - b'[]') - self.assertEqual("[\uDC80]".encode("utf-8", "replace"), - b'[?]') def test_surrogatepass_handler(self): self.assertEqual("abc\ud800def".encode("utf-8", "surrogatepass"), @@ -854,6 +898,32 @@ class CP65001Test(ReadTest, unittest.TestCase): class UTF7Test(ReadTest, unittest.TestCase): encoding = "utf-7" + def test_ascii(self): + # Set D (directly encoded characters) + set_d = ('ABCDEFGHIJKLMNOPQRSTUVWXYZ' + 'abcdefghijklmnopqrstuvwxyz' + '0123456789' + '\'(),-./:?') + self.assertEqual(set_d.encode(self.encoding), set_d.encode('ascii')) + self.assertEqual(set_d.encode('ascii').decode(self.encoding), set_d) + # Set O (optional direct characters) + set_o = ' !"#$%&*;<=>@[]^_`{|}' + self.assertEqual(set_o.encode(self.encoding), set_o.encode('ascii')) + self.assertEqual(set_o.encode('ascii').decode(self.encoding), set_o) + # + + self.assertEqual('a+b'.encode(self.encoding), b'a+-b') + self.assertEqual(b'a+-b'.decode(self.encoding), 'a+b') + # White spaces + ws = ' \t\n\r' + self.assertEqual(ws.encode(self.encoding), ws.encode('ascii')) + self.assertEqual(ws.encode('ascii').decode(self.encoding), ws) + # Other ASCII characters + other_ascii = ''.join(sorted(set(bytes(range(0x80)).decode()) - + set(set_d + set_o + '+' + ws))) + self.assertEqual(other_ascii.encode(self.encoding), + b'+AAAAAQACAAMABAAFAAYABwAIAAsADAAOAA8AEAARABIAEwAU' + b'ABUAFgAXABgAGQAaABsAHAAdAB4AHwBcAH4Afw-') + def test_partial(self): self.check_partial( 'a+-b\x00c\x80d\u0100e\U00010000f', @@ -895,7 +965,9 @@ class UTF7Test(ReadTest, unittest.TestCase): def test_errors(self): tests = [ + (b'\xffb', '\ufffdb'), (b'a\xffb', 'a\ufffdb'), + (b'a\xff\xffb', 'a\ufffd\ufffdb'), (b'a+IK', 'a\ufffd'), (b'a+IK-b', 'a\ufffdb'), (b'a+IK,b', 'a\ufffdb'), @@ -911,16 +983,50 @@ class UTF7Test(ReadTest, unittest.TestCase): (b'a+//,+IKw-b', 'a\ufffd\u20acb'), (b'a+///,+IKw-b', 'a\uffff\ufffd\u20acb'), (b'a+////,+IKw-b', 'a\uffff\ufffd\u20acb'), + (b'a+IKw-b\xff', 'a\u20acb\ufffd'), + (b'a+IKw\xffb', 'a\u20ac\ufffdb'), ] for raw, expected in tests: - self.assertRaises(UnicodeDecodeError, codecs.utf_7_decode, - raw, 'strict', True) - self.assertEqual(raw.decode('utf-7', 'replace'), expected) + with self.subTest(raw=raw): + self.assertRaises(UnicodeDecodeError, codecs.utf_7_decode, + raw, 'strict', True) + self.assertEqual(raw.decode('utf-7', 'replace'), expected) def test_nonbmp(self): self.assertEqual('\U000104A0'.encode(self.encoding), b'+2AHcoA-') self.assertEqual('\ud801\udca0'.encode(self.encoding), b'+2AHcoA-') self.assertEqual(b'+2AHcoA-'.decode(self.encoding), '\U000104A0') + self.assertEqual(b'+2AHcoA'.decode(self.encoding), '\U000104A0') + self.assertEqual('\u20ac\U000104A0'.encode(self.encoding), b'+IKzYAdyg-') + self.assertEqual(b'+IKzYAdyg-'.decode(self.encoding), '\u20ac\U000104A0') + self.assertEqual(b'+IKzYAdyg'.decode(self.encoding), '\u20ac\U000104A0') + self.assertEqual('\u20ac\u20ac\U000104A0'.encode(self.encoding), + b'+IKwgrNgB3KA-') + self.assertEqual(b'+IKwgrNgB3KA-'.decode(self.encoding), + '\u20ac\u20ac\U000104A0') + self.assertEqual(b'+IKwgrNgB3KA'.decode(self.encoding), + '\u20ac\u20ac\U000104A0') + + def test_lone_surrogates(self): + tests = [ + (b'a+2AE-b', 'a\ud801b'), + (b'a+2AE\xffb', 'a\ufffdb'), + (b'a+2AE', 'a\ufffd'), + (b'a+2AEA-b', 'a\ufffdb'), + (b'a+2AH-b', 'a\ufffdb'), + (b'a+IKzYAQ-b', 'a\u20ac\ud801b'), + (b'a+IKzYAQ\xffb', 'a\u20ac\ufffdb'), + (b'a+IKzYAQA-b', 'a\u20ac\ufffdb'), + (b'a+IKzYAd-b', 'a\u20ac\ufffdb'), + (b'a+IKwgrNgB-b', 'a\u20ac\u20ac\ud801b'), + (b'a+IKwgrNgB\xffb', 'a\u20ac\u20ac\ufffdb'), + (b'a+IKwgrNgB', 'a\u20ac\u20ac\ufffd'), + (b'a+IKwgrNgBA-b', 'a\u20ac\u20ac\ufffdb'), + ] + for raw, expected in tests: + with self.subTest(raw=raw): + self.assertEqual(raw.decode('utf-7', 'replace'), expected) + class UTF16ExTest(unittest.TestCase): @@ -946,7 +1052,7 @@ class ReadBufferTest(unittest.TestCase): self.assertRaises(TypeError, codecs.readbuffer_encode) self.assertRaises(TypeError, codecs.readbuffer_encode, 42) -class UTF8SigTest(ReadTest, unittest.TestCase): +class UTF8SigTest(UTF8Test, unittest.TestCase): encoding = "utf-8-sig" def test_partial(self): @@ -1091,6 +1197,8 @@ class RecodingTest(unittest.TestCase): # Python used to crash on this at exit because of a refcount # bug in _codecsmodule.c + self.assertTrue(f.closed) + # From RFC 3492 punycode_testcases = [ # A Arabic (Egyptian): @@ -1543,6 +1651,16 @@ class IDNACodecTest(unittest.TestCase): self.assertEqual(encoder.encode("ample.org."), b"xn--xample-9ta.org.") self.assertEqual(encoder.encode("", True), b"") + def test_errors(self): + """Only supports "strict" error handler""" + "python.org".encode("idna", "strict") + b"python.org".decode("idna", "strict") + for errors in ("ignore", "replace", "backslashreplace", + "surrogateescape"): + self.assertRaises(Exception, "python.org".encode, "idna", errors) + self.assertRaises(Exception, + b"python.org".decode, "idna", errors) + class CodecsModuleTest(unittest.TestCase): def test_decode(self): @@ -1598,6 +1716,46 @@ class CodecsModuleTest(unittest.TestCase): c = codecs.lookup('ASCII') self.assertEqual(c.name, 'ascii') + def test_all(self): + api = ( + "encode", "decode", + "register", "CodecInfo", "Codec", "IncrementalEncoder", + "IncrementalDecoder", "StreamReader", "StreamWriter", "lookup", + "getencoder", "getdecoder", "getincrementalencoder", + "getincrementaldecoder", "getreader", "getwriter", + "register_error", "lookup_error", + "strict_errors", "replace_errors", "ignore_errors", + "xmlcharrefreplace_errors", "backslashreplace_errors", + "open", "EncodedFile", + "iterencode", "iterdecode", + "BOM", "BOM_BE", "BOM_LE", + "BOM_UTF8", "BOM_UTF16", "BOM_UTF16_BE", "BOM_UTF16_LE", + "BOM_UTF32", "BOM_UTF32_BE", "BOM_UTF32_LE", + "BOM32_BE", "BOM32_LE", "BOM64_BE", "BOM64_LE", # Undocumented + "StreamReaderWriter", "StreamRecoder", + ) + self.assertCountEqual(api, codecs.__all__) + for api in codecs.__all__: + getattr(codecs, api) + + def test_open(self): + self.addCleanup(support.unlink, support.TESTFN) + for mode in ('w', 'r', 'r+', 'w+', 'a', 'a+'): + with self.subTest(mode), \ + codecs.open(support.TESTFN, mode, 'ascii') as file: + self.assertIsInstance(file, codecs.StreamReaderWriter) + + def test_undefined(self): + self.assertRaises(UnicodeError, codecs.encode, 'abc', 'undefined') + self.assertRaises(UnicodeError, codecs.decode, b'abc', 'undefined') + self.assertRaises(UnicodeError, codecs.encode, '', 'undefined') + self.assertRaises(UnicodeError, codecs.decode, b'', 'undefined') + for errors in ('strict', 'ignore', 'replace', 'backslashreplace'): + self.assertRaises(UnicodeError, + codecs.encode, 'abc', 'undefined', errors) + self.assertRaises(UnicodeError, + codecs.decode, b'abc', 'undefined', errors) + class StreamReaderTest(unittest.TestCase): def setUp(self): @@ -1628,6 +1786,7 @@ all_unicode_encodings = [ "cp037", "cp1006", "cp1026", + "cp1125", "cp1140", "cp1250", "cp1251", @@ -1730,13 +1889,10 @@ if hasattr(codecs, "mbcs_encode"): # "undefined" # The following encodings don't work in stateful mode -broken_unicode_with_streams = [ +broken_unicode_with_stateful = [ "punycode", "unicode_internal" ] -broken_incremental_coders = broken_unicode_with_streams + [ - "idna", -] class BasicUnicodeTest(unittest.TestCase, MixInCheckStateHandling): def test_basics(self): @@ -1756,7 +1912,7 @@ class BasicUnicodeTest(unittest.TestCase, MixInCheckStateHandling): (chars, size) = codecs.getdecoder(encoding)(b) self.assertEqual(chars, s, "encoding=%r" % encoding) - if encoding not in broken_unicode_with_streams: + if encoding not in broken_unicode_with_stateful: # check stream reader/writer q = Queue(b"") writer = codecs.getwriter(encoding)(q) @@ -1774,7 +1930,7 @@ class BasicUnicodeTest(unittest.TestCase, MixInCheckStateHandling): decodedresult += reader.read() self.assertEqual(decodedresult, s, "encoding=%r" % encoding) - if encoding not in broken_incremental_coders: + if encoding not in broken_unicode_with_stateful: # check incremental decoder/encoder and iterencode()/iterdecode() try: encoder = codecs.getincrementalencoder(encoding)() @@ -1823,7 +1979,7 @@ class BasicUnicodeTest(unittest.TestCase, MixInCheckStateHandling): from _testcapi import codec_incrementalencoder, codec_incrementaldecoder s = "abc123" # all codecs should be able to encode these for encoding in all_unicode_encodings: - if encoding not in broken_incremental_coders: + if encoding not in broken_unicode_with_stateful: # check incremental decoder/encoder (fetched via the C API) try: cencoder = codec_incrementalencoder(encoding) @@ -1863,7 +2019,7 @@ class BasicUnicodeTest(unittest.TestCase, MixInCheckStateHandling): for encoding in all_unicode_encodings: if encoding == "idna": # FIXME: See SF bug #1163178 continue - if encoding in broken_unicode_with_streams: + if encoding in broken_unicode_with_stateful: continue reader = codecs.getreader(encoding)(io.BytesIO(s.encode(encoding))) for t in range(5): @@ -1896,7 +2052,7 @@ class BasicUnicodeTest(unittest.TestCase, MixInCheckStateHandling): # Check that getstate() and setstate() handle the state properly u = "abc123" for encoding in all_unicode_encodings: - if encoding not in broken_incremental_coders: + if encoding not in broken_unicode_with_stateful: self.check_state_handling_decode(encoding, u, u.encode(encoding)) self.check_state_handling_encode(encoding, u, u.encode(encoding)) @@ -2100,6 +2256,7 @@ class WithStmtTest(unittest.TestCase): f = io.BytesIO(b"\xc3\xbc") with codecs.EncodedFile(f, "latin-1", "utf-8") as ef: self.assertEqual(ef.read(), b"\xfc") + self.assertTrue(f.closed) def test_streamreaderwriter(self): f = io.BytesIO(b"\xc3\xbc") @@ -2370,60 +2527,93 @@ bytes_transform_encodings = [ "quopri_codec", "hex_codec", ] + +transform_aliases = { + "base64_codec": ["base64", "base_64"], + "uu_codec": ["uu"], + "quopri_codec": ["quopri", "quoted_printable", "quotedprintable"], + "hex_codec": ["hex"], + "rot_13": ["rot13"], +} + try: import zlib except ImportError: - pass + zlib = None else: bytes_transform_encodings.append("zlib_codec") + transform_aliases["zlib_codec"] = ["zip", "zlib"] try: import bz2 except ImportError: pass else: bytes_transform_encodings.append("bz2_codec") + transform_aliases["bz2_codec"] = ["bz2"] class TransformCodecTest(unittest.TestCase): def test_basics(self): binput = bytes(range(256)) for encoding in bytes_transform_encodings: - # generic codecs interface - (o, size) = codecs.getencoder(encoding)(binput) - self.assertEqual(size, len(binput)) - (i, size) = codecs.getdecoder(encoding)(o) - self.assertEqual(size, len(o)) - self.assertEqual(i, binput) + with self.subTest(encoding=encoding): + # generic codecs interface + (o, size) = codecs.getencoder(encoding)(binput) + self.assertEqual(size, len(binput)) + (i, size) = codecs.getdecoder(encoding)(o) + self.assertEqual(size, len(o)) + self.assertEqual(i, binput) def test_read(self): for encoding in bytes_transform_encodings: - sin = codecs.encode(b"\x80", encoding) - reader = codecs.getreader(encoding)(io.BytesIO(sin)) - sout = reader.read() - self.assertEqual(sout, b"\x80") + with self.subTest(encoding=encoding): + sin = codecs.encode(b"\x80", encoding) + reader = codecs.getreader(encoding)(io.BytesIO(sin)) + sout = reader.read() + self.assertEqual(sout, b"\x80") def test_readline(self): for encoding in bytes_transform_encodings: - sin = codecs.encode(b"\x80", encoding) - reader = codecs.getreader(encoding)(io.BytesIO(sin)) - sout = reader.readline() - self.assertEqual(sout, b"\x80") + with self.subTest(encoding=encoding): + sin = codecs.encode(b"\x80", encoding) + reader = codecs.getreader(encoding)(io.BytesIO(sin)) + sout = reader.readline() + self.assertEqual(sout, b"\x80") + + def test_buffer_api_usage(self): + # We check all the transform codecs accept memoryview input + # for encoding and decoding + # and also that they roundtrip correctly + original = b"12345\x80" + for encoding in bytes_transform_encodings: + with self.subTest(encoding=encoding): + data = original + view = memoryview(data) + data = codecs.encode(data, encoding) + view_encoded = codecs.encode(view, encoding) + self.assertEqual(view_encoded, data) + view = memoryview(data) + data = codecs.decode(data, encoding) + self.assertEqual(data, original) + view_decoded = codecs.decode(view, encoding) + self.assertEqual(view_decoded, data) def test_text_to_binary_blacklists_binary_transforms(self): # Check binary -> binary codecs give a good error for str input bad_input = "bad input type" for encoding in bytes_transform_encodings: - fmt = (r"{!r} is not a text encoding; " - r"use codecs.encode\(\) to handle arbitrary codecs") - msg = fmt.format(encoding) - with self.assertRaisesRegex(LookupError, msg) as failure: - bad_input.encode(encoding) - self.assertIsNone(failure.exception.__cause__) + with self.subTest(encoding=encoding): + fmt = ( "{!r} is not a text encoding; " + "use codecs.encode\(\) to handle arbitrary codecs") + msg = fmt.format(encoding) + with self.assertRaisesRegex(LookupError, msg) as failure: + bad_input.encode(encoding) + self.assertIsNone(failure.exception.__cause__) def test_text_to_binary_blacklists_text_transforms(self): # Check str.encode gives a good error message for str -> str codecs msg = (r"^'rot_13' is not a text encoding; " - r"use codecs.encode\(\) to handle arbitrary codecs") + "use codecs.encode\(\) to handle arbitrary codecs") with self.assertRaisesRegex(LookupError, msg): "just an example message".encode("rot_13") @@ -2432,23 +2622,250 @@ class TransformCodecTest(unittest.TestCase): # message for binary -> binary codecs data = b"encode first to ensure we meet any format restrictions" for encoding in bytes_transform_encodings: - encoded_data = codecs.encode(data, encoding) - fmt = (r"{!r} is not a text encoding; " - r"use codecs.decode\(\) to handle arbitrary codecs") - msg = fmt.format(encoding) - with self.assertRaisesRegex(LookupError, msg): - encoded_data.decode(encoding) - with self.assertRaisesRegex(LookupError, msg): - bytearray(encoded_data).decode(encoding) + with self.subTest(encoding=encoding): + encoded_data = codecs.encode(data, encoding) + fmt = (r"{!r} is not a text encoding; " + "use codecs.decode\(\) to handle arbitrary codecs") + msg = fmt.format(encoding) + with self.assertRaisesRegex(LookupError, msg): + encoded_data.decode(encoding) + with self.assertRaisesRegex(LookupError, msg): + bytearray(encoded_data).decode(encoding) def test_binary_to_text_blacklists_text_transforms(self): # Check str -> str codec gives a good error for binary input for bad_input in (b"immutable", bytearray(b"mutable")): - msg = (r"^'rot_13' is not a text encoding; " - r"use codecs.decode\(\) to handle arbitrary codecs") - with self.assertRaisesRegex(LookupError, msg) as failure: - bad_input.decode("rot_13") - self.assertIsNone(failure.exception.__cause__) + with self.subTest(bad_input=bad_input): + msg = (r"^'rot_13' is not a text encoding; " + "use codecs.decode\(\) to handle arbitrary codecs") + with self.assertRaisesRegex(LookupError, msg) as failure: + bad_input.decode("rot_13") + self.assertIsNone(failure.exception.__cause__) + + @unittest.skipUnless(zlib, "Requires zlib support") + def test_custom_zlib_error_is_wrapped(self): + # Check zlib codec gives a good error for malformed input + msg = "^decoding with 'zlib_codec' codec failed" + with self.assertRaisesRegex(Exception, msg) as failure: + codecs.decode(b"hello", "zlib_codec") + self.assertIsInstance(failure.exception.__cause__, + type(failure.exception)) + + def test_custom_hex_error_is_wrapped(self): + # Check hex codec gives a good error for malformed input + msg = "^decoding with 'hex_codec' codec failed" + with self.assertRaisesRegex(Exception, msg) as failure: + codecs.decode(b"hello", "hex_codec") + self.assertIsInstance(failure.exception.__cause__, + type(failure.exception)) + + # Unfortunately, the bz2 module throws OSError, which the codec + # machinery currently can't wrap :( + + # Ensure codec aliases from http://bugs.python.org/issue7475 work + def test_aliases(self): + for codec_name, aliases in transform_aliases.items(): + expected_name = codecs.lookup(codec_name).name + for alias in aliases: + with self.subTest(alias=alias): + info = codecs.lookup(alias) + self.assertEqual(info.name, expected_name) + + def test_quopri_stateless(self): + # Should encode with quotetabs=True + encoded = codecs.encode(b"space tab\teol \n", "quopri-codec") + self.assertEqual(encoded, b"space=20tab=09eol=20\n") + # But should still support unescaped tabs and spaces + unescaped = b"space tab eol\n" + self.assertEqual(codecs.decode(unescaped, "quopri-codec"), unescaped) + + def test_uu_invalid(self): + # Missing "begin" line + self.assertRaises(ValueError, codecs.decode, b"", "uu-codec") + + +# The codec system tries to wrap exceptions in order to ensure the error +# mentions the operation being performed and the codec involved. We +# currently *only* want this to happen for relatively stateless +# exceptions, where the only significant information they contain is their +# type and a single str argument. + +# Use a local codec registry to avoid appearing to leak objects when +# registering multiple seach functions +_TEST_CODECS = {} + +def _get_test_codec(codec_name): + return _TEST_CODECS.get(codec_name) +codecs.register(_get_test_codec) # Returns None, not usable as a decorator + +try: + # Issue #22166: Also need to clear the internal cache in CPython + from _codecs import _forget_codec +except ImportError: + def _forget_codec(codec_name): + pass + + +class ExceptionChainingTest(unittest.TestCase): + + def setUp(self): + # There's no way to unregister a codec search function, so we just + # ensure we render this one fairly harmless after the test + # case finishes by using the test case repr as the codec name + # The codecs module normalizes codec names, although this doesn't + # appear to be formally documented... + # We also make sure we use a truly unique id for the custom codec + # to avoid issues with the codec cache when running these tests + # multiple times (e.g. when hunting for refleaks) + unique_id = repr(self) + str(id(self)) + self.codec_name = encodings.normalize_encoding(unique_id).lower() + + # We store the object to raise on the instance because of a bad + # interaction between the codec caching (which means we can't + # recreate the codec entry) and regrtest refleak hunting (which + # runs the same test instance multiple times). This means we + # need to ensure the codecs call back in to the instance to find + # out which exception to raise rather than binding them in a + # closure to an object that may change on the next run + self.obj_to_raise = RuntimeError + + def tearDown(self): + _TEST_CODECS.pop(self.codec_name, None) + # Issue #22166: Also pop from caches to avoid appearance of ref leaks + encodings._cache.pop(self.codec_name, None) + try: + _forget_codec(self.codec_name) + except KeyError: + pass + + def set_codec(self, encode, decode): + codec_info = codecs.CodecInfo(encode, decode, + name=self.codec_name) + _TEST_CODECS[self.codec_name] = codec_info + + @contextlib.contextmanager + def assertWrapped(self, operation, exc_type, msg): + full_msg = r"{} with {!r} codec failed \({}: {}\)".format( + operation, self.codec_name, exc_type.__name__, msg) + with self.assertRaisesRegex(exc_type, full_msg) as caught: + yield caught + self.assertIsInstance(caught.exception.__cause__, exc_type) + self.assertIsNotNone(caught.exception.__cause__.__traceback__) + + def raise_obj(self, *args, **kwds): + # Helper to dynamically change the object raised by a test codec + raise self.obj_to_raise + + def check_wrapped(self, obj_to_raise, msg, exc_type=RuntimeError): + self.obj_to_raise = obj_to_raise + self.set_codec(self.raise_obj, self.raise_obj) + with self.assertWrapped("encoding", exc_type, msg): + "str_input".encode(self.codec_name) + with self.assertWrapped("encoding", exc_type, msg): + codecs.encode("str_input", self.codec_name) + with self.assertWrapped("decoding", exc_type, msg): + b"bytes input".decode(self.codec_name) + with self.assertWrapped("decoding", exc_type, msg): + codecs.decode(b"bytes input", self.codec_name) + + def test_raise_by_type(self): + self.check_wrapped(RuntimeError, "") + + def test_raise_by_value(self): + msg = "This should be wrapped" + self.check_wrapped(RuntimeError(msg), msg) + + def test_raise_grandchild_subclass_exact_size(self): + msg = "This should be wrapped" + class MyRuntimeError(RuntimeError): + __slots__ = () + self.check_wrapped(MyRuntimeError(msg), msg, MyRuntimeError) + + def test_raise_subclass_with_weakref_support(self): + msg = "This should be wrapped" + class MyRuntimeError(RuntimeError): + pass + self.check_wrapped(MyRuntimeError(msg), msg, MyRuntimeError) + + def check_not_wrapped(self, obj_to_raise, msg): + def raise_obj(*args, **kwds): + raise obj_to_raise + self.set_codec(raise_obj, raise_obj) + with self.assertRaisesRegex(RuntimeError, msg): + "str input".encode(self.codec_name) + with self.assertRaisesRegex(RuntimeError, msg): + codecs.encode("str input", self.codec_name) + with self.assertRaisesRegex(RuntimeError, msg): + b"bytes input".decode(self.codec_name) + with self.assertRaisesRegex(RuntimeError, msg): + codecs.decode(b"bytes input", self.codec_name) + + def test_init_override_is_not_wrapped(self): + class CustomInit(RuntimeError): + def __init__(self): + pass + self.check_not_wrapped(CustomInit, "") + + def test_new_override_is_not_wrapped(self): + class CustomNew(RuntimeError): + def __new__(cls): + return super().__new__(cls) + self.check_not_wrapped(CustomNew, "") + + def test_instance_attribute_is_not_wrapped(self): + msg = "This should NOT be wrapped" + exc = RuntimeError(msg) + exc.attr = 1 + self.check_not_wrapped(exc, "^{}$".format(msg)) + + def test_non_str_arg_is_not_wrapped(self): + self.check_not_wrapped(RuntimeError(1), "1") + + def test_multiple_args_is_not_wrapped(self): + msg_re = r"^\('a', 'b', 'c'\)$" + self.check_not_wrapped(RuntimeError('a', 'b', 'c'), msg_re) + + # http://bugs.python.org/issue19609 + def test_codec_lookup_failure_not_wrapped(self): + msg = "^unknown encoding: {}$".format(self.codec_name) + # The initial codec lookup should not be wrapped + with self.assertRaisesRegex(LookupError, msg): + "str input".encode(self.codec_name) + with self.assertRaisesRegex(LookupError, msg): + codecs.encode("str input", self.codec_name) + with self.assertRaisesRegex(LookupError, msg): + b"bytes input".decode(self.codec_name) + with self.assertRaisesRegex(LookupError, msg): + codecs.decode(b"bytes input", self.codec_name) + + def test_unflagged_non_text_codec_handling(self): + # The stdlib non-text codecs are now marked so they're + # pre-emptively skipped by the text model related methods + # However, third party codecs won't be flagged, so we still make + # sure the case where an inappropriate output type is produced is + # handled appropriately + def encode_to_str(*args, **kwds): + return "not bytes!", 0 + def decode_to_bytes(*args, **kwds): + return b"not str!", 0 + self.set_codec(encode_to_str, decode_to_bytes) + # No input or output type checks on the codecs module functions + encoded = codecs.encode(None, self.codec_name) + self.assertEqual(encoded, "not bytes!") + decoded = codecs.decode(None, self.codec_name) + self.assertEqual(decoded, b"not str!") + # Text model methods should complain + fmt = (r"^{!r} encoder returned 'str' instead of 'bytes'; " + "use codecs.encode\(\) to encode to arbitrary types$") + msg = fmt.format(self.codec_name) + with self.assertRaisesRegex(TypeError, msg): + "str_input".encode(self.codec_name) + fmt = (r"^{!r} decoder returned 'bytes' instead of 'str'; " + "use codecs.decode\(\) to decode to arbitrary types$") + msg = fmt.format(self.codec_name) + with self.assertRaisesRegex(TypeError, msg): + b"bytes input".decode(self.codec_name) + @unittest.skipUnless(sys.platform == 'win32', @@ -2460,8 +2877,8 @@ class CodePageTest(unittest.TestCase): def test_invalid_code_page(self): self.assertRaises(ValueError, codecs.code_page_encode, -1, 'a') self.assertRaises(ValueError, codecs.code_page_decode, -1, b'a') - self.assertRaises(WindowsError, codecs.code_page_encode, 123, 'a') - self.assertRaises(WindowsError, codecs.code_page_decode, 123, b'a') + self.assertRaises(OSError, codecs.code_page_encode, 123, 'a') + self.assertRaises(OSError, codecs.code_page_decode, 123, b'a') def test_code_page_name(self): self.assertRaisesRegex(UnicodeEncodeError, 'cp932', |
