summary | refs | log | tree | commit | diff | stats
path: root/Lib
diff options
context:
space:
mode:
author: INADA Naoki <methane@users.noreply.github.com> 2017-12-21 00:59:53 (GMT)
committer: GitHub <noreply@github.com> 2017-12-21 00:59:53 (GMT)
commit: 507434fd504f3ebc1da72aa77544edc0d73f136e (patch)
tree: 521c4ca2eeff16a7cee8594cbca095d9bdec9181 /Lib
parent: 31e99080f6f8cf7faaba9fe3a4e0996e49163317 (diff)
download: cpython-507434fd504f3ebc1da72aa77544edc0d73f136e.zip
cpython-507434fd504f3ebc1da72aa77544edc0d73f136e.tar.gz
cpython-507434fd504f3ebc1da72aa77544edc0d73f136e.tar.bz2
bpo-15216: io: TextIOWrapper.reconfigure() accepts encoding, errors and newline (GH-2343)
Diffstat (limited to 'Lib')
-rw-r--r-- Lib/_pyio.py 76
-rw-r--r-- Lib/test/test_io.py 117
2 files changed, 173 insertions, 20 deletions
diff --git a/Lib/_pyio.py b/Lib/_pyio.py
index b59a650..c91a647 100644
--- a/Lib/_pyio.py
+++ b/Lib/_pyio.py
@@ -1938,10 +1938,7 @@ class TextIOWrapper(TextIOBase):
# so that the signature can match the signature of the C version.
def __init__(self, buffer, encoding=None, errors=None, newline=None,
line_buffering=False, write_through=False):
- if newline is not None and not isinstance(newline, str):
- raise TypeError("illegal newline type: %r" % (type(newline),))
- if newline not in (None, "", "\n", "\r", "\r\n"):
- raise ValueError("illegal newline value: %r" % (newline,))
+ self._check_newline(newline)
if encoding is None:
try:
encoding = os.device_encoding(buffer.fileno())
@@ -1971,22 +1968,38 @@ class TextIOWrapper(TextIOBase):
raise ValueError("invalid errors: %r" % errors)
self._buffer = buffer
+ self._decoded_chars = '' # buffer for text returned from decoder
+ self._decoded_chars_used = 0 # offset into _decoded_chars for read()
+ self._snapshot = None # info for reconstructing decoder state
+ self._seekable = self._telling = self.buffer.seekable()
+ self._has_read1 = hasattr(self.buffer, 'read1')
+ self._configure(encoding, errors, newline,
+ line_buffering, write_through)
+
+ def _check_newline(self, newline):
+ if newline is not None and not isinstance(newline, str):
+ raise TypeError("illegal newline type: %r" % (type(newline),))
+ if newline not in (None, "", "\n", "\r", "\r\n"):
+ raise ValueError("illegal newline value: %r" % (newline,))
+
+ def _configure(self, encoding=None, errors=None, newline=None,
+ line_buffering=False, write_through=False):
self._encoding = encoding
self._errors = errors
+ self._encoder = None
+ self._decoder = None
+ self._b2cratio = 0.0
+
self._readuniversal = not newline
self._readtranslate = newline is None
self._readnl = newline
self._writetranslate = newline != ''
self._writenl = newline or os.linesep
- self._encoder = None
- self._decoder = None
- self._decoded_chars = '' # buffer for text returned from decoder
- self._decoded_chars_used = 0 # offset into _decoded_chars for read()
- self._snapshot = None # info for reconstructing decoder state
- self._seekable = self._telling = self.buffer.seekable()
- self._has_read1 = hasattr(self.buffer, 'read1')
- self._b2cratio = 0.0
+ self._line_buffering = line_buffering
+ self._write_through = write_through
+
+ # don't write a BOM in the middle of a file
if self._seekable and self.writable():
position = self.buffer.tell()
if position != 0:
@@ -1996,12 +2009,6 @@ class TextIOWrapper(TextIOBase):
# Sometimes the encoder doesn't exist
pass
- self._configure(line_buffering, write_through)
-
- def _configure(self, line_buffering=False, write_through=False):
- self._line_buffering = line_buffering
- self._write_through = write_through
-
# self._snapshot is either None, or a tuple (dec_flags, next_input)
# where dec_flags is the second (integer) item of the decoder state
# and next_input is the chunk of input bytes that comes next after the
@@ -2048,17 +2055,46 @@ class TextIOWrapper(TextIOBase):
def buffer(self):
return self._buffer
- def reconfigure(self, *, line_buffering=None, write_through=None):
+ def reconfigure(self, *,
+ encoding=None, errors=None, newline=Ellipsis,
+ line_buffering=None, write_through=None):
"""Reconfigure the text stream with new parameters.
This also flushes the stream.
"""
+ if (self._decoder is not None
+ and (encoding is not None or errors is not None
+ or newline is not Ellipsis)):
+ raise UnsupportedOperation(
+ "It is not possible to set the encoding or newline of stream "
+ "after the first read")
+
+ if errors is None:
+ if encoding is None:
+ errors = self._errors
+ else:
+ errors = 'strict'
+ elif not isinstance(errors, str):
+ raise TypeError("invalid errors: %r" % errors)
+
+ if encoding is None:
+ encoding = self._encoding
+ else:
+ if not isinstance(encoding, str):
+ raise TypeError("invalid encoding: %r" % encoding)
+
+ if newline is Ellipsis:
+ newline = self._readnl
+ self._check_newline(newline)
+
if line_buffering is None:
line_buffering = self.line_buffering
if write_through is None:
write_through = self.write_through
+
self.flush()
- self._configure(line_buffering, write_through)
+ self._configure(encoding, errors, newline,
+ line_buffering, write_through)
def seekable(self):
if self.closed:
diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py
index 9bfe4b0..3aee5f1 100644
--- a/Lib/test/test_io.py
+++ b/Lib/test/test_io.py
@@ -3408,6 +3408,123 @@ class TextIOWrapperTest(unittest.TestCase):
F.tell = lambda x: 0
t = self.TextIOWrapper(F(), encoding='utf-8')
+ def test_reconfigure_encoding_read(self):
+ # latin1 -> utf8
+ # (latin1 can decode utf-8 encoded string)
+ data = 'abc\xe9\n'.encode('latin1') + 'd\xe9f\n'.encode('utf8')
+ raw = self.BytesIO(data)
+ txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
+ self.assertEqual(txt.readline(), 'abc\xe9\n')
+ with self.assertRaises(self.UnsupportedOperation):
+ txt.reconfigure(encoding='utf-8')
+ with self.assertRaises(self.UnsupportedOperation):
+ txt.reconfigure(newline=None)
+
+ def test_reconfigure_write_fromascii(self):
+ # ascii has a specific encodefunc in the C implementation,
+ # but utf-8-sig has not. Make sure that we get rid of the
+ # cached encodefunc when we switch encoders.
+ raw = self.BytesIO()
+ txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
+ txt.write('foo\n')
+ txt.reconfigure(encoding='utf-8-sig')
+ txt.write('\xe9\n')
+ txt.flush()
+ self.assertEqual(raw.getvalue(), b'foo\n\xc3\xa9\n')
+
+ def test_reconfigure_write(self):
+ # latin -> utf8
+ raw = self.BytesIO()
+ txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
+ txt.write('abc\xe9\n')
+ txt.reconfigure(encoding='utf-8')
+ self.assertEqual(raw.getvalue(), b'abc\xe9\n')
+ txt.write('d\xe9f\n')
+ txt.flush()
+ self.assertEqual(raw.getvalue(), b'abc\xe9\nd\xc3\xa9f\n')
+
+ # ascii -> utf-8-sig: ensure that no BOM is written in the middle of
+ # the file
+ raw = self.BytesIO()
+ txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
+ txt.write('abc\n')
+ txt.reconfigure(encoding='utf-8-sig')
+ txt.write('d\xe9f\n')
+ txt.flush()
+ self.assertEqual(raw.getvalue(), b'abc\nd\xc3\xa9f\n')
+
+ def test_reconfigure_write_non_seekable(self):
+ raw = self.BytesIO()
+ raw.seekable = lambda: False
+ raw.seek = None
+ txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
+ txt.write('abc\n')
+ txt.reconfigure(encoding='utf-8-sig')
+ txt.write('d\xe9f\n')
+ txt.flush()
+
+ # If the raw stream is not seekable, there'll be a BOM
+ self.assertEqual(raw.getvalue(), b'abc\n\xef\xbb\xbfd\xc3\xa9f\n')
+
+ def test_reconfigure_defaults(self):
+ txt = self.TextIOWrapper(self.BytesIO(), 'ascii', 'replace', '\n')
+ txt.reconfigure(encoding=None)
+ self.assertEqual(txt.encoding, 'ascii')
+ self.assertEqual(txt.errors, 'replace')
+ txt.write('LF\n')
+
+ txt.reconfigure(newline='\r\n')
+ self.assertEqual(txt.encoding, 'ascii')
+ self.assertEqual(txt.errors, 'replace')
+
+ txt.reconfigure(errors='ignore')
+ self.assertEqual(txt.encoding, 'ascii')
+ self.assertEqual(txt.errors, 'ignore')
+ txt.write('CRLF\n')
+
+ txt.reconfigure(encoding='utf-8', newline=None)
+ self.assertEqual(txt.errors, 'strict')
+ txt.seek(0)
+ self.assertEqual(txt.read(), 'LF\nCRLF\n')
+
+ self.assertEqual(txt.detach().getvalue(), b'LF\nCRLF\r\n')
+
+ def test_reconfigure_newline(self):
+ raw = self.BytesIO(b'CR\rEOF')
+ txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
+ txt.reconfigure(newline=None)
+ self.assertEqual(txt.readline(), 'CR\n')
+ raw = self.BytesIO(b'CR\rEOF')
+ txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
+ txt.reconfigure(newline='')
+ self.assertEqual(txt.readline(), 'CR\r')
+ raw = self.BytesIO(b'CR\rLF\nEOF')
+ txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
+ txt.reconfigure(newline='\n')
+ self.assertEqual(txt.readline(), 'CR\rLF\n')
+ raw = self.BytesIO(b'LF\nCR\rEOF')
+ txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
+ txt.reconfigure(newline='\r')
+ self.assertEqual(txt.readline(), 'LF\nCR\r')
+ raw = self.BytesIO(b'CR\rCRLF\r\nEOF')
+ txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
+ txt.reconfigure(newline='\r\n')
+ self.assertEqual(txt.readline(), 'CR\rCRLF\r\n')
+
+ txt = self.TextIOWrapper(self.BytesIO(), 'ascii', newline='\r')
+ txt.reconfigure(newline=None)
+ txt.write('linesep\n')
+ txt.reconfigure(newline='')
+ txt.write('LF\n')
+ txt.reconfigure(newline='\n')
+ txt.write('LF\n')
+ txt.reconfigure(newline='\r')
+ txt.write('CR\n')
+ txt.reconfigure(newline='\r\n')
+ txt.write('CRLF\n')
+ expected = 'linesep' + os.linesep + 'LF\nLF\nCR\rCRLF\r\n'
+ self.assertEqual(txt.detach().getvalue().decode('ascii'), expected)
+
class MemviewBytesIO(io.BytesIO):
'''A BytesIO object whose read method returns memoryviews