From 98fe1a0c3bacdc51071d960d8d76b3b9f5b0d8c6 Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Fri, 27 May 2011 01:51:18 +0200 Subject: Issue #8796: codecs.open() calls the builtin open() function instead of using StreamReaderWriter. Deprecate StreamReader, StreamWriter, StreamReaderWriter, StreamRecoder and EncodedFile() of the codec module. Use the builtin open() function or io.TextIOWrapper instead. --- Doc/library/codecs.rst | 25 ++++++++ Lib/codecs.py | 25 ++++---- Lib/test/test_codecs.py | 152 +++++++++++++++++++++++++++++++++--------------- Misc/NEWS | 5 ++ 4 files changed, 148 insertions(+), 59 deletions(-) diff --git a/Doc/library/codecs.rst b/Doc/library/codecs.rst index 4d5058e..b58e410 100644 --- a/Doc/library/codecs.rst +++ b/Doc/library/codecs.rst @@ -85,6 +85,9 @@ It defines the following functions: In case a search function cannot find a given encoding, it should return ``None``. + .. deprecated:: 3.3 + *streamreader* and *streamwriter* attributes are now deprecated. + .. function:: lookup(encoding) @@ -139,6 +142,8 @@ functions which use :func:`lookup` for the codec lookup: Raises a :exc:`LookupError` in case the encoding cannot be found. + .. deprecated:: 3.3 + .. function:: getwriter(encoding) @@ -147,6 +152,8 @@ functions which use :func:`lookup` for the codec lookup: Raises a :exc:`LookupError` in case the encoding cannot be found. + .. deprecated:: 3.3 + .. function:: register_error(name, error_handler) @@ -217,6 +224,11 @@ utility functions: .. note:: + This function is kept for backward compatibility with Python 2, the + builtin :func:`open` function should be used instead. + + .. note:: + The wrapped version's methods will accept and return strings only. Bytes arguments will be rejected. @@ -251,6 +263,8 @@ utility functions: ``'strict'``, which causes :exc:`ValueError` to be raised in case an encoding error occurs. + .. deprecated:: 3.3 + .. 
function:: iterencode(iterator, encoding, errors='strict', **kwargs) @@ -563,6 +577,9 @@ The :class:`StreamWriter` class is a subclass of :class:`Codec` and defines the following methods which every stream writer must define in order to be compatible with the Python codec registry. +.. deprecated:: 3.3 + Use the :class:`io.TextIOWrapper` class. + .. class:: StreamWriter(stream[, errors]) @@ -628,6 +645,9 @@ The :class:`StreamReader` class is a subclass of :class:`Codec` and defines the following methods which every stream reader must define in order to be compatible with the Python codec registry. +.. deprecated:: 3.3 + Use the :class:`io.TextIOWrapper` class. + .. class:: StreamReader(stream[, errors]) @@ -728,6 +748,9 @@ and write modes. The design is such that one can use the factory functions returned by the :func:`lookup` function to construct the instance. +.. deprecated:: 3.3 + Use the :class:`io.TextIOWrapper` class. + .. class:: StreamReaderWriter(stream, Reader, Writer, errors) @@ -752,6 +775,8 @@ which is sometimes useful when dealing with different encoding environments. The design is such that one can use the factory functions returned by the :func:`lookup` function to construct the instance. +.. deprecated:: 3.3 + .. class:: StreamRecoder(stream, encode, decode, Reader, Writer, errors) diff --git a/Lib/codecs.py b/Lib/codecs.py index b150d64..ec7879f 100644 --- a/Lib/codecs.py +++ b/Lib/codecs.py @@ -345,6 +345,8 @@ class StreamWriter(Codec): The set of allowed parameter values can be extended via register_error. """ + import warnings + warnings.warn('use io.TextIOWrapper', DeprecationWarning, stacklevel=2) self.stream = stream self.errors = errors @@ -416,6 +418,8 @@ class StreamReader(Codec): The set of allowed parameter values can be extended via register_error. 
""" + import warnings + warnings.warn('use io.TextIOWrapper', DeprecationWarning, stacklevel=2) self.stream = stream self.errors = errors self.bytebuffer = b"" @@ -846,7 +850,7 @@ class StreamRecoder: ### Shortcuts -def open(filename, mode='rb', encoding=None, errors='strict', buffering=1): +def open(filename, mode='r', encoding=None, errors=None, buffering=1): """ Open an encoded file using the given mode and return a wrapped version providing transparent encoding/decoding. @@ -877,18 +881,13 @@ def open(filename, mode='rb', encoding=None, errors='strict', buffering=1): parameter. """ - if encoding is not None and \ - 'b' not in mode: - # Force opening of the file in binary mode - mode = mode + 'b' - file = builtins.open(filename, mode, buffering) - if encoding is None: - return file - info = lookup(encoding) - srw = StreamReaderWriter(file, info.streamreader, info.streamwriter, errors) - # Add attributes to simplify introspection - srw.encoding = encoding - return srw + if encoding is not None: + return builtins.open(filename, mode, buffering, + encoding, errors, newline='') + else: + if 'b' not in mode: + mode = mode + 'b' + return builtins.open(filename, mode, buffering, encoding, errors) def EncodedFile(file, data_encoding, file_encoding=None, errors='strict'): diff --git a/Lib/test/test_codecs.py b/Lib/test/test_codecs.py index c0450e7..1f46560 100644 --- a/Lib/test/test_codecs.py +++ b/Lib/test/test_codecs.py @@ -1,7 +1,10 @@ from test import support -import unittest +import _testcapi import codecs -import sys, _testcapi, io +import io +import sys +import unittest +import warnings class Queue(object): """ @@ -63,7 +66,9 @@ class ReadTest(unittest.TestCase, MixInCheckStateHandling): # the StreamReader and check that the results equal the appropriate # entries from partialresults. 
q = Queue(b"") - r = codecs.getreader(self.encoding)(q) + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + r = codecs.getreader(self.encoding)(q) result = "" for (c, partialresult) in zip(input.encode(self.encoding), partialresults): q.write(bytes([c])) @@ -106,7 +111,9 @@ class ReadTest(unittest.TestCase, MixInCheckStateHandling): return codecs.getreader(self.encoding)(stream) def readalllines(input, keepends=True, size=None): - reader = getreader(input) + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + reader = getreader(input) lines = [] while True: line = reader.readline(size=size, keepends=keepends) @@ -215,14 +222,18 @@ class ReadTest(unittest.TestCase, MixInCheckStateHandling): ' \r\n', ] stream = io.BytesIO("".join(s).encode(self.encoding)) - reader = codecs.getreader(self.encoding)(stream) + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + reader = codecs.getreader(self.encoding)(stream) for (i, line) in enumerate(reader): self.assertEqual(line, s[i]) def test_readlinequeue(self): q = Queue(b"") - writer = codecs.getwriter(self.encoding)(q) - reader = codecs.getreader(self.encoding)(q) + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + writer = codecs.getwriter(self.encoding)(q) + reader = codecs.getreader(self.encoding)(q) # No lineends writer.write("foo\r") @@ -253,7 +264,9 @@ class ReadTest(unittest.TestCase, MixInCheckStateHandling): s = (s1+s2+s3).encode(self.encoding) stream = io.BytesIO(s) - reader = codecs.getreader(self.encoding)(stream) + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + reader = codecs.getreader(self.encoding)(stream) self.assertEqual(reader.readline(), s1) self.assertEqual(reader.readline(), s2) self.assertEqual(reader.readline(), s3) @@ -268,7 +281,9 @@ class ReadTest(unittest.TestCase, MixInCheckStateHandling): s = 
(s1+s2+s3+s4+s5).encode(self.encoding) stream = io.BytesIO(s) - reader = codecs.getreader(self.encoding)(stream) + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + reader = codecs.getreader(self.encoding)(stream) self.assertEqual(reader.readline(), s1) self.assertEqual(reader.readline(), s2) self.assertEqual(reader.readline(), s3) @@ -290,7 +305,9 @@ class UTF32Test(ReadTest): _,_,reader,writer = codecs.lookup(self.encoding) # encode some stream s = io.BytesIO() - f = writer(s) + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + f = writer(s) f.write("spam") f.write("spam") d = s.getvalue() @@ -298,16 +315,22 @@ class UTF32Test(ReadTest): self.assertTrue(d == self.spamle or d == self.spambe) # try to read it back s = io.BytesIO(d) - f = reader(s) + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + f = reader(s) self.assertEqual(f.read(), "spamspam") def test_badbom(self): s = io.BytesIO(4*b"\xff") - f = codecs.getreader(self.encoding)(s) + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + f = codecs.getreader(self.encoding)(s) self.assertRaises(UnicodeError, f.read) s = io.BytesIO(8*b"\xff") - f = codecs.getreader(self.encoding)(s) + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + f = codecs.getreader(self.encoding)(s) self.assertRaises(UnicodeError, f.read) def test_partial(self): @@ -454,7 +477,9 @@ class UTF16Test(ReadTest): _,_,reader,writer = codecs.lookup(self.encoding) # encode some stream s = io.BytesIO() - f = writer(s) + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + f = writer(s) f.write("spam") f.write("spam") d = s.getvalue() @@ -462,16 +487,22 @@ class UTF16Test(ReadTest): self.assertTrue(d == self.spamle or d == self.spambe) # try to read it back s = io.BytesIO(d) - f = reader(s) + with warnings.catch_warnings(): + 
warnings.simplefilter("ignore", DeprecationWarning) + f = reader(s) self.assertEqual(f.read(), "spamspam") def test_badbom(self): s = io.BytesIO(b"\xff\xff") - f = codecs.getreader(self.encoding)(s) + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + f = codecs.getreader(self.encoding)(s) self.assertRaises(UnicodeError, f.read) s = io.BytesIO(b"\xff\xff\xff\xff") - f = codecs.getreader(self.encoding)(s) + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + f = codecs.getreader(self.encoding)(s) self.assertRaises(UnicodeError, f.read) def test_partial(self): @@ -517,7 +548,8 @@ class UTF16Test(ReadTest): self.addCleanup(support.unlink, support.TESTFN) with open(support.TESTFN, 'wb') as fp: fp.write(s) - with codecs.open(support.TESTFN, 'U', encoding=self.encoding) as reader: + with codecs.open(support.TESTFN, 'U', + encoding=self.encoding) as reader: self.assertEqual(reader.read(), s1) class UTF16LETest(ReadTest): @@ -705,7 +737,9 @@ class UTF8SigTest(ReadTest): reader = codecs.getreader("utf-8-sig") for sizehint in [None] + list(range(1, 11)) + \ [64, 128, 256, 512, 1024]: - istream = reader(io.BytesIO(bytestring)) + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + istream = reader(io.BytesIO(bytestring)) ostream = io.StringIO() while 1: if sizehint is not None: @@ -727,7 +761,9 @@ class UTF8SigTest(ReadTest): reader = codecs.getreader("utf-8-sig") for sizehint in [None] + list(range(1, 11)) + \ [64, 128, 256, 512, 1024]: - istream = reader(io.BytesIO(bytestring)) + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + istream = reader(io.BytesIO(bytestring)) ostream = io.StringIO() while 1: if sizehint is not None: @@ -749,7 +785,9 @@ class EscapeDecodeTest(unittest.TestCase): class RecodingTest(unittest.TestCase): def test_recoding(self): f = io.BytesIO() - f2 = codecs.EncodedFile(f, "unicode_internal", "utf-8") + with 
warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + f2 = codecs.EncodedFile(f, "unicode_internal", "utf-8") f2.write("a") f2.close() # Python used to crash on this at exit because of a refcount @@ -1126,7 +1164,9 @@ class IDNACodecTest(unittest.TestCase): self.assertEqual("pyth\xf6n.org.".encode("idna"), b"xn--pythn-mua.org.") def test_stream(self): - r = codecs.getreader("idna")(io.BytesIO(b"abc")) + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + r = codecs.getreader("idna")(io.BytesIO(b"abc")) r.read(3) self.assertEqual(r.read(), "") @@ -1233,18 +1273,24 @@ class CodecsModuleTest(unittest.TestCase): class StreamReaderTest(unittest.TestCase): def setUp(self): - self.reader = codecs.getreader('utf-8') + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + self.reader = codecs.getreader('utf-8') self.stream = io.BytesIO(b'\xed\x95\x9c\n\xea\xb8\x80') def test_readlines(self): - f = self.reader(self.stream) + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + f = self.reader(self.stream) self.assertEqual(f.readlines(), ['\ud55c\n', '\uae00']) class EncodedFileTest(unittest.TestCase): def test_basic(self): f = io.BytesIO(b'\xed\x95\x9c\n\xea\xb8\x80') - ef = codecs.EncodedFile(f, 'utf-16-le', 'utf-8') + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + ef = codecs.EncodedFile(f, 'utf-16-le', 'utf-8') self.assertEqual(ef.read(), b'\\\xd5\n\x00\x00\xae') f = io.BytesIO() @@ -1388,7 +1434,9 @@ class BasicUnicodeTest(unittest.TestCase, MixInCheckStateHandling): if encoding not in broken_unicode_with_streams: # check stream reader/writer q = Queue(b"") - writer = codecs.getwriter(encoding)(q) + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + writer = codecs.getwriter(encoding)(q) encodedresult = b"" for c in s: writer.write(c) @@ -1396,7 +1444,9 @@ class 
BasicUnicodeTest(unittest.TestCase, MixInCheckStateHandling): self.assertTrue(type(chunk) is bytes, type(chunk)) encodedresult += chunk q = Queue(b"") - reader = codecs.getreader(encoding)(q) + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + reader = codecs.getreader(encoding)(q) decodedresult = "" for c in encodedresult: q.write(bytes([c])) @@ -1470,7 +1520,9 @@ class BasicUnicodeTest(unittest.TestCase, MixInCheckStateHandling): continue if encoding in broken_unicode_with_streams: continue - reader = codecs.getreader(encoding)(io.BytesIO(s.encode(encoding))) + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + reader = codecs.getreader(encoding)(io.BytesIO(s.encode(encoding))) for t in range(5): # Test that calling seek resets the internal codec state and buffers reader.seek(0, 0) @@ -1539,15 +1591,19 @@ class CharmapTest(unittest.TestCase): class WithStmtTest(unittest.TestCase): def test_encodedfile(self): f = io.BytesIO(b"\xc3\xbc") - with codecs.EncodedFile(f, "latin-1", "utf-8") as ef: - self.assertEqual(ef.read(), b"\xfc") + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + with codecs.EncodedFile(f, "latin-1", "utf-8") as ef: + self.assertEqual(ef.read(), b"\xfc") def test_streamreaderwriter(self): f = io.BytesIO(b"\xc3\xbc") info = codecs.lookup("utf-8") - with codecs.StreamReaderWriter(f, info.streamreader, - info.streamwriter, 'strict') as srw: - self.assertEqual(srw.read(), "\xfc") + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + with codecs.StreamReaderWriter(f, info.streamreader, + info.streamwriter, 'strict') as srw: + self.assertEqual(srw.read(), "\xfc") class TypesTest(unittest.TestCase): def test_decode_unicode(self): @@ -1644,15 +1700,15 @@ class BomTest(unittest.TestCase): # (StreamWriter) Check that the BOM is written after a seek(0) with codecs.open(support.TESTFN, 'w+', encoding=encoding) 
as f: - f.writer.write(data[0]) - self.assertNotEqual(f.writer.tell(), 0) - f.writer.seek(0) - f.writer.write(data) + f.write(data[0]) + self.assertNotEqual(f.tell(), 0) + f.seek(0) + f.write(data) f.seek(0) self.assertEqual(f.read(), data) - # Check that the BOM is not written after a seek() at a position - # different than the start + # Check that the BOM is not written after a seek() at a + # position different than the start with codecs.open(support.TESTFN, 'w+', encoding=encoding) as f: f.write(data) f.seek(f.tell()) @@ -1660,12 +1716,12 @@ class BomTest(unittest.TestCase): f.seek(0) self.assertEqual(f.read(), data * 2) - # (StreamWriter) Check that the BOM is not written after a seek() - # at a position different than the start + # (StreamWriter) Check that the BOM is not written after a + # seek() at a position different than the start with codecs.open(support.TESTFN, 'w+', encoding=encoding) as f: - f.writer.write(data) - f.writer.seek(f.writer.tell()) - f.writer.write(data) + f.write(data) + f.seek(f.tell()) + f.write(data) f.seek(0) self.assertEqual(f.read(), data * 2) @@ -1704,7 +1760,9 @@ class TransformCodecTest(unittest.TestCase): def test_read(self): for encoding in bytes_transform_encodings: sin = codecs.encode(b"\x80", encoding) - reader = codecs.getreader(encoding)(io.BytesIO(sin)) + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + reader = codecs.getreader(encoding)(io.BytesIO(sin)) sout = reader.read() self.assertEqual(sout, b"\x80") @@ -1713,7 +1771,9 @@ class TransformCodecTest(unittest.TestCase): if encoding in ['uu_codec', 'zlib_codec']: continue sin = codecs.encode(b"\x80", encoding) - reader = codecs.getreader(encoding)(io.BytesIO(sin)) + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + reader = codecs.getreader(encoding)(io.BytesIO(sin)) sout = reader.readline() self.assertEqual(sout, b"\x80") diff --git a/Misc/NEWS b/Misc/NEWS index f3143ab..715539d 100644 --- 
a/Misc/NEWS +++ b/Misc/NEWS @@ -161,6 +161,11 @@ Core and Builtins Library ------- +- Issue #8796: codecs.open() calls the builtin open() function instead of using + StreamReaderWriter. Deprecate StreamReader, StreamWriter, StreamReaderWriter, + StreamRecoder and EncodedFile() of the codecs module. Use the builtin open() + function or io.TextIOWrapper instead. + - Issue #12175: BufferedReader.read(-1) now calls raw.readall() if available. - Issue #12175: FileIO.readall() now only reads the file position and size -- cgit v0.12