summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVictor Stinner <victor.stinner@haypocalc.com>2011-05-26 23:51:18 (GMT)
committerVictor Stinner <victor.stinner@haypocalc.com>2011-05-26 23:51:18 (GMT)
commit98fe1a0c3bacdc51071d960d8d76b3b9f5b0d8c6 (patch)
tree9f3ade330a69ba55b615ccbfcf4680939377ab84
parentc556e10b94541d7bf20e908f8eca78e7f63fc28c (diff)
downloadcpython-98fe1a0c3bacdc51071d960d8d76b3b9f5b0d8c6.zip
cpython-98fe1a0c3bacdc51071d960d8d76b3b9f5b0d8c6.tar.gz
cpython-98fe1a0c3bacdc51071d960d8d76b3b9f5b0d8c6.tar.bz2
Issue #8796: codecs.open() calls the builtin open() function instead of using
StreamReaderWriter. Deprecate StreamReader, StreamWriter, StreamReaderWriter, StreamRecoder and EncodedFile() of the codec module. Use the builtin open() function or io.TextIOWrapper instead.
-rw-r--r--Doc/library/codecs.rst25
-rw-r--r--Lib/codecs.py25
-rw-r--r--Lib/test/test_codecs.py152
-rw-r--r--Misc/NEWS5
4 files changed, 148 insertions, 59 deletions
diff --git a/Doc/library/codecs.rst b/Doc/library/codecs.rst
index 4d5058e..b58e410 100644
--- a/Doc/library/codecs.rst
+++ b/Doc/library/codecs.rst
@@ -85,6 +85,9 @@ It defines the following functions:
In case a search function cannot find a given encoding, it should return
``None``.
+ .. deprecated:: 3.3
+ *streamreader* and *streamwriter* attributes are now deprecated.
+
.. function:: lookup(encoding)
@@ -139,6 +142,8 @@ functions which use :func:`lookup` for the codec lookup:
Raises a :exc:`LookupError` in case the encoding cannot be found.
+ .. deprecated:: 3.3
+
.. function:: getwriter(encoding)
@@ -147,6 +152,8 @@ functions which use :func:`lookup` for the codec lookup:
Raises a :exc:`LookupError` in case the encoding cannot be found.
+ .. deprecated:: 3.3
+
.. function:: register_error(name, error_handler)
@@ -217,6 +224,11 @@ utility functions:
.. note::
+ This function is kept for backward compatibility with Python 2, the
+ builtin :func:`open` function should be used instead.
+
+ .. note::
+
The wrapped version's methods will accept and return strings only. Bytes
arguments will be rejected.
@@ -251,6 +263,8 @@ utility functions:
``'strict'``, which causes :exc:`ValueError` to be raised in case an encoding
error occurs.
+ .. deprecated:: 3.3
+
.. function:: iterencode(iterator, encoding, errors='strict', **kwargs)
@@ -563,6 +577,9 @@ The :class:`StreamWriter` class is a subclass of :class:`Codec` and defines the
following methods which every stream writer must define in order to be
compatible with the Python codec registry.
+.. deprecated:: 3.3
+ Use the builtin the :class:`io.TextIOWrapper` class.
+
.. class:: StreamWriter(stream[, errors])
@@ -628,6 +645,9 @@ The :class:`StreamReader` class is a subclass of :class:`Codec` and defines the
following methods which every stream reader must define in order to be
compatible with the Python codec registry.
+.. deprecated:: 3.3
+ Use the builtin the :class:`io.TextIOWrapper` class.
+
.. class:: StreamReader(stream[, errors])
@@ -728,6 +748,9 @@ and write modes.
The design is such that one can use the factory functions returned by the
:func:`lookup` function to construct the instance.
+.. deprecated:: 3.3
+ Use the :class:`io.TextIOWrapper` class.
+
.. class:: StreamReaderWriter(stream, Reader, Writer, errors)
@@ -752,6 +775,8 @@ which is sometimes useful when dealing with different encoding environments.
The design is such that one can use the factory functions returned by the
:func:`lookup` function to construct the instance.
+.. deprecated:: 3.3
+
.. class:: StreamRecoder(stream, encode, decode, Reader, Writer, errors)
diff --git a/Lib/codecs.py b/Lib/codecs.py
index b150d64..ec7879f 100644
--- a/Lib/codecs.py
+++ b/Lib/codecs.py
@@ -345,6 +345,8 @@ class StreamWriter(Codec):
The set of allowed parameter values can be extended via
register_error.
"""
+ import warnings
+ warnings.warn('use io.TextIOWrapper', DeprecationWarning, stacklevel=2)
self.stream = stream
self.errors = errors
@@ -416,6 +418,8 @@ class StreamReader(Codec):
The set of allowed parameter values can be extended via
register_error.
"""
+ import warnings
+ warnings.warn('use io.TextIOWrapper', DeprecationWarning, stacklevel=2)
self.stream = stream
self.errors = errors
self.bytebuffer = b""
@@ -846,7 +850,7 @@ class StreamRecoder:
### Shortcuts
-def open(filename, mode='rb', encoding=None, errors='strict', buffering=1):
+def open(filename, mode='r', encoding=None, errors=None, buffering=1):
""" Open an encoded file using the given mode and return
a wrapped version providing transparent encoding/decoding.
@@ -877,18 +881,13 @@ def open(filename, mode='rb', encoding=None, errors='strict', buffering=1):
parameter.
"""
- if encoding is not None and \
- 'b' not in mode:
- # Force opening of the file in binary mode
- mode = mode + 'b'
- file = builtins.open(filename, mode, buffering)
- if encoding is None:
- return file
- info = lookup(encoding)
- srw = StreamReaderWriter(file, info.streamreader, info.streamwriter, errors)
- # Add attributes to simplify introspection
- srw.encoding = encoding
- return srw
+ if encoding is not None:
+ return builtins.open(filename, mode, buffering,
+ encoding, errors, newline='')
+ else:
+ if 'b' not in mode:
+ mode = mode + 'b'
+ return builtins.open(filename, mode, buffering, encoding, errors)
def EncodedFile(file, data_encoding, file_encoding=None, errors='strict'):
diff --git a/Lib/test/test_codecs.py b/Lib/test/test_codecs.py
index c0450e7..1f46560 100644
--- a/Lib/test/test_codecs.py
+++ b/Lib/test/test_codecs.py
@@ -1,7 +1,10 @@
from test import support
-import unittest
+import _testcapi
import codecs
-import sys, _testcapi, io
+import io
+import sys
+import unittest
+import warnings
class Queue(object):
"""
@@ -63,7 +66,9 @@ class ReadTest(unittest.TestCase, MixInCheckStateHandling):
# the StreamReader and check that the results equal the appropriate
# entries from partialresults.
q = Queue(b"")
- r = codecs.getreader(self.encoding)(q)
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore", DeprecationWarning)
+ r = codecs.getreader(self.encoding)(q)
result = ""
for (c, partialresult) in zip(input.encode(self.encoding), partialresults):
q.write(bytes([c]))
@@ -106,7 +111,9 @@ class ReadTest(unittest.TestCase, MixInCheckStateHandling):
return codecs.getreader(self.encoding)(stream)
def readalllines(input, keepends=True, size=None):
- reader = getreader(input)
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore", DeprecationWarning)
+ reader = getreader(input)
lines = []
while True:
line = reader.readline(size=size, keepends=keepends)
@@ -215,14 +222,18 @@ class ReadTest(unittest.TestCase, MixInCheckStateHandling):
' \r\n',
]
stream = io.BytesIO("".join(s).encode(self.encoding))
- reader = codecs.getreader(self.encoding)(stream)
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore", DeprecationWarning)
+ reader = codecs.getreader(self.encoding)(stream)
for (i, line) in enumerate(reader):
self.assertEqual(line, s[i])
def test_readlinequeue(self):
q = Queue(b"")
- writer = codecs.getwriter(self.encoding)(q)
- reader = codecs.getreader(self.encoding)(q)
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore", DeprecationWarning)
+ writer = codecs.getwriter(self.encoding)(q)
+ reader = codecs.getreader(self.encoding)(q)
# No lineends
writer.write("foo\r")
@@ -253,7 +264,9 @@ class ReadTest(unittest.TestCase, MixInCheckStateHandling):
s = (s1+s2+s3).encode(self.encoding)
stream = io.BytesIO(s)
- reader = codecs.getreader(self.encoding)(stream)
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore", DeprecationWarning)
+ reader = codecs.getreader(self.encoding)(stream)
self.assertEqual(reader.readline(), s1)
self.assertEqual(reader.readline(), s2)
self.assertEqual(reader.readline(), s3)
@@ -268,7 +281,9 @@ class ReadTest(unittest.TestCase, MixInCheckStateHandling):
s = (s1+s2+s3+s4+s5).encode(self.encoding)
stream = io.BytesIO(s)
- reader = codecs.getreader(self.encoding)(stream)
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore", DeprecationWarning)
+ reader = codecs.getreader(self.encoding)(stream)
self.assertEqual(reader.readline(), s1)
self.assertEqual(reader.readline(), s2)
self.assertEqual(reader.readline(), s3)
@@ -290,7 +305,9 @@ class UTF32Test(ReadTest):
_,_,reader,writer = codecs.lookup(self.encoding)
# encode some stream
s = io.BytesIO()
- f = writer(s)
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore", DeprecationWarning)
+ f = writer(s)
f.write("spam")
f.write("spam")
d = s.getvalue()
@@ -298,16 +315,22 @@ class UTF32Test(ReadTest):
self.assertTrue(d == self.spamle or d == self.spambe)
# try to read it back
s = io.BytesIO(d)
- f = reader(s)
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore", DeprecationWarning)
+ f = reader(s)
self.assertEqual(f.read(), "spamspam")
def test_badbom(self):
s = io.BytesIO(4*b"\xff")
- f = codecs.getreader(self.encoding)(s)
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore", DeprecationWarning)
+ f = codecs.getreader(self.encoding)(s)
self.assertRaises(UnicodeError, f.read)
s = io.BytesIO(8*b"\xff")
- f = codecs.getreader(self.encoding)(s)
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore", DeprecationWarning)
+ f = codecs.getreader(self.encoding)(s)
self.assertRaises(UnicodeError, f.read)
def test_partial(self):
@@ -454,7 +477,9 @@ class UTF16Test(ReadTest):
_,_,reader,writer = codecs.lookup(self.encoding)
# encode some stream
s = io.BytesIO()
- f = writer(s)
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore", DeprecationWarning)
+ f = writer(s)
f.write("spam")
f.write("spam")
d = s.getvalue()
@@ -462,16 +487,22 @@ class UTF16Test(ReadTest):
self.assertTrue(d == self.spamle or d == self.spambe)
# try to read it back
s = io.BytesIO(d)
- f = reader(s)
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore", DeprecationWarning)
+ f = reader(s)
self.assertEqual(f.read(), "spamspam")
def test_badbom(self):
s = io.BytesIO(b"\xff\xff")
- f = codecs.getreader(self.encoding)(s)
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore", DeprecationWarning)
+ f = codecs.getreader(self.encoding)(s)
self.assertRaises(UnicodeError, f.read)
s = io.BytesIO(b"\xff\xff\xff\xff")
- f = codecs.getreader(self.encoding)(s)
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore", DeprecationWarning)
+ f = codecs.getreader(self.encoding)(s)
self.assertRaises(UnicodeError, f.read)
def test_partial(self):
@@ -517,7 +548,8 @@ class UTF16Test(ReadTest):
self.addCleanup(support.unlink, support.TESTFN)
with open(support.TESTFN, 'wb') as fp:
fp.write(s)
- with codecs.open(support.TESTFN, 'U', encoding=self.encoding) as reader:
+ with codecs.open(support.TESTFN, 'U',
+ encoding=self.encoding) as reader:
self.assertEqual(reader.read(), s1)
class UTF16LETest(ReadTest):
@@ -705,7 +737,9 @@ class UTF8SigTest(ReadTest):
reader = codecs.getreader("utf-8-sig")
for sizehint in [None] + list(range(1, 11)) + \
[64, 128, 256, 512, 1024]:
- istream = reader(io.BytesIO(bytestring))
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore", DeprecationWarning)
+ istream = reader(io.BytesIO(bytestring))
ostream = io.StringIO()
while 1:
if sizehint is not None:
@@ -727,7 +761,9 @@ class UTF8SigTest(ReadTest):
reader = codecs.getreader("utf-8-sig")
for sizehint in [None] + list(range(1, 11)) + \
[64, 128, 256, 512, 1024]:
- istream = reader(io.BytesIO(bytestring))
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore", DeprecationWarning)
+ istream = reader(io.BytesIO(bytestring))
ostream = io.StringIO()
while 1:
if sizehint is not None:
@@ -749,7 +785,9 @@ class EscapeDecodeTest(unittest.TestCase):
class RecodingTest(unittest.TestCase):
def test_recoding(self):
f = io.BytesIO()
- f2 = codecs.EncodedFile(f, "unicode_internal", "utf-8")
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore", DeprecationWarning)
+ f2 = codecs.EncodedFile(f, "unicode_internal", "utf-8")
f2.write("a")
f2.close()
# Python used to crash on this at exit because of a refcount
@@ -1126,7 +1164,9 @@ class IDNACodecTest(unittest.TestCase):
self.assertEqual("pyth\xf6n.org.".encode("idna"), b"xn--pythn-mua.org.")
def test_stream(self):
- r = codecs.getreader("idna")(io.BytesIO(b"abc"))
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore", DeprecationWarning)
+ r = codecs.getreader("idna")(io.BytesIO(b"abc"))
r.read(3)
self.assertEqual(r.read(), "")
@@ -1233,18 +1273,24 @@ class CodecsModuleTest(unittest.TestCase):
class StreamReaderTest(unittest.TestCase):
def setUp(self):
- self.reader = codecs.getreader('utf-8')
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore", DeprecationWarning)
+ self.reader = codecs.getreader('utf-8')
self.stream = io.BytesIO(b'\xed\x95\x9c\n\xea\xb8\x80')
def test_readlines(self):
- f = self.reader(self.stream)
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore", DeprecationWarning)
+ f = self.reader(self.stream)
self.assertEqual(f.readlines(), ['\ud55c\n', '\uae00'])
class EncodedFileTest(unittest.TestCase):
def test_basic(self):
f = io.BytesIO(b'\xed\x95\x9c\n\xea\xb8\x80')
- ef = codecs.EncodedFile(f, 'utf-16-le', 'utf-8')
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore", DeprecationWarning)
+ ef = codecs.EncodedFile(f, 'utf-16-le', 'utf-8')
self.assertEqual(ef.read(), b'\\\xd5\n\x00\x00\xae')
f = io.BytesIO()
@@ -1388,7 +1434,9 @@ class BasicUnicodeTest(unittest.TestCase, MixInCheckStateHandling):
if encoding not in broken_unicode_with_streams:
# check stream reader/writer
q = Queue(b"")
- writer = codecs.getwriter(encoding)(q)
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore", DeprecationWarning)
+ writer = codecs.getwriter(encoding)(q)
encodedresult = b""
for c in s:
writer.write(c)
@@ -1396,7 +1444,9 @@ class BasicUnicodeTest(unittest.TestCase, MixInCheckStateHandling):
self.assertTrue(type(chunk) is bytes, type(chunk))
encodedresult += chunk
q = Queue(b"")
- reader = codecs.getreader(encoding)(q)
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore", DeprecationWarning)
+ reader = codecs.getreader(encoding)(q)
decodedresult = ""
for c in encodedresult:
q.write(bytes([c]))
@@ -1470,7 +1520,9 @@ class BasicUnicodeTest(unittest.TestCase, MixInCheckStateHandling):
continue
if encoding in broken_unicode_with_streams:
continue
- reader = codecs.getreader(encoding)(io.BytesIO(s.encode(encoding)))
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore", DeprecationWarning)
+ reader = codecs.getreader(encoding)(io.BytesIO(s.encode(encoding)))
for t in range(5):
# Test that calling seek resets the internal codec state and buffers
reader.seek(0, 0)
@@ -1539,15 +1591,19 @@ class CharmapTest(unittest.TestCase):
class WithStmtTest(unittest.TestCase):
def test_encodedfile(self):
f = io.BytesIO(b"\xc3\xbc")
- with codecs.EncodedFile(f, "latin-1", "utf-8") as ef:
- self.assertEqual(ef.read(), b"\xfc")
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore", DeprecationWarning)
+ with codecs.EncodedFile(f, "latin-1", "utf-8") as ef:
+ self.assertEqual(ef.read(), b"\xfc")
def test_streamreaderwriter(self):
f = io.BytesIO(b"\xc3\xbc")
info = codecs.lookup("utf-8")
- with codecs.StreamReaderWriter(f, info.streamreader,
- info.streamwriter, 'strict') as srw:
- self.assertEqual(srw.read(), "\xfc")
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore", DeprecationWarning)
+ with codecs.StreamReaderWriter(f, info.streamreader,
+ info.streamwriter, 'strict') as srw:
+ self.assertEqual(srw.read(), "\xfc")
class TypesTest(unittest.TestCase):
def test_decode_unicode(self):
@@ -1644,15 +1700,15 @@ class BomTest(unittest.TestCase):
# (StreamWriter) Check that the BOM is written after a seek(0)
with codecs.open(support.TESTFN, 'w+', encoding=encoding) as f:
- f.writer.write(data[0])
- self.assertNotEqual(f.writer.tell(), 0)
- f.writer.seek(0)
- f.writer.write(data)
+ f.write(data[0])
+ self.assertNotEqual(f.tell(), 0)
+ f.seek(0)
+ f.write(data)
f.seek(0)
self.assertEqual(f.read(), data)
- # Check that the BOM is not written after a seek() at a position
- # different than the start
+ # Check that the BOM is not written after a seek() at a
+ # position different than the start
with codecs.open(support.TESTFN, 'w+', encoding=encoding) as f:
f.write(data)
f.seek(f.tell())
@@ -1660,12 +1716,12 @@ class BomTest(unittest.TestCase):
f.seek(0)
self.assertEqual(f.read(), data * 2)
- # (StreamWriter) Check that the BOM is not written after a seek()
- # at a position different than the start
+ # (StreamWriter) Check that the BOM is not written after a
+ # seek() at a position different than the start
with codecs.open(support.TESTFN, 'w+', encoding=encoding) as f:
- f.writer.write(data)
- f.writer.seek(f.writer.tell())
- f.writer.write(data)
+ f.write(data)
+ f.seek(f.tell())
+ f.write(data)
f.seek(0)
self.assertEqual(f.read(), data * 2)
@@ -1704,7 +1760,9 @@ class TransformCodecTest(unittest.TestCase):
def test_read(self):
for encoding in bytes_transform_encodings:
sin = codecs.encode(b"\x80", encoding)
- reader = codecs.getreader(encoding)(io.BytesIO(sin))
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore", DeprecationWarning)
+ reader = codecs.getreader(encoding)(io.BytesIO(sin))
sout = reader.read()
self.assertEqual(sout, b"\x80")
@@ -1713,7 +1771,9 @@ class TransformCodecTest(unittest.TestCase):
if encoding in ['uu_codec', 'zlib_codec']:
continue
sin = codecs.encode(b"\x80", encoding)
- reader = codecs.getreader(encoding)(io.BytesIO(sin))
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore", DeprecationWarning)
+ reader = codecs.getreader(encoding)(io.BytesIO(sin))
sout = reader.readline()
self.assertEqual(sout, b"\x80")
diff --git a/Misc/NEWS b/Misc/NEWS
index f3143ab..715539d 100644
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -161,6 +161,11 @@ Core and Builtins
Library
-------
+- Issue #8796: codecs.open() calls the builtin open() function instead of using
+ StreamReaderWriter. Deprecate StreamReader, StreamWriter, StreamReaderWriter,
+ StreamRecoder and EncodedFile() of the codec module. Use the builtin open()
+ function or io.TextIOWrapper instead.
+
- Issue #12175: BufferedReader.read(-1) now calls raw.readall() if available.
- Issue #12175: FileIO.readall() now only reads the file position and size