diff options
author | INADA Naoki <methane@users.noreply.github.com> | 2017-12-21 00:59:53 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-12-21 00:59:53 (GMT) |
commit | 507434fd504f3ebc1da72aa77544edc0d73f136e (patch) | |
tree | 521c4ca2eeff16a7cee8594cbca095d9bdec9181 /Lib | |
parent | 31e99080f6f8cf7faaba9fe3a4e0996e49163317 (diff) | |
download | cpython-507434fd504f3ebc1da72aa77544edc0d73f136e.zip cpython-507434fd504f3ebc1da72aa77544edc0d73f136e.tar.gz cpython-507434fd504f3ebc1da72aa77544edc0d73f136e.tar.bz2 |
bpo-15216: io: TextIOWrapper.reconfigure() accepts encoding, errors and newline (GH-2343)
Diffstat (limited to 'Lib')
-rw-r--r-- | Lib/_pyio.py | 76 |
-rw-r--r-- | Lib/test/test_io.py | 117 |
2 files changed, 173 insertions, 20 deletions
diff --git a/Lib/_pyio.py b/Lib/_pyio.py index b59a650..c91a647 100644 --- a/Lib/_pyio.py +++ b/Lib/_pyio.py @@ -1938,10 +1938,7 @@ class TextIOWrapper(TextIOBase): # so that the signature can match the signature of the C version. def __init__(self, buffer, encoding=None, errors=None, newline=None, line_buffering=False, write_through=False): - if newline is not None and not isinstance(newline, str): - raise TypeError("illegal newline type: %r" % (type(newline),)) - if newline not in (None, "", "\n", "\r", "\r\n"): - raise ValueError("illegal newline value: %r" % (newline,)) + self._check_newline(newline) if encoding is None: try: encoding = os.device_encoding(buffer.fileno()) @@ -1971,22 +1968,38 @@ class TextIOWrapper(TextIOBase): raise ValueError("invalid errors: %r" % errors) self._buffer = buffer + self._decoded_chars = '' # buffer for text returned from decoder + self._decoded_chars_used = 0 # offset into _decoded_chars for read() + self._snapshot = None # info for reconstructing decoder state + self._seekable = self._telling = self.buffer.seekable() + self._has_read1 = hasattr(self.buffer, 'read1') + self._configure(encoding, errors, newline, + line_buffering, write_through) + + def _check_newline(self, newline): + if newline is not None and not isinstance(newline, str): + raise TypeError("illegal newline type: %r" % (type(newline),)) + if newline not in (None, "", "\n", "\r", "\r\n"): + raise ValueError("illegal newline value: %r" % (newline,)) + + def _configure(self, encoding=None, errors=None, newline=None, + line_buffering=False, write_through=False): self._encoding = encoding self._errors = errors + self._encoder = None + self._decoder = None + self._b2cratio = 0.0 + self._readuniversal = not newline self._readtranslate = newline is None self._readnl = newline self._writetranslate = newline != '' self._writenl = newline or os.linesep - self._encoder = None - self._decoder = None - self._decoded_chars = '' # buffer for text returned from decoder - 
self._decoded_chars_used = 0 # offset into _decoded_chars for read() - self._snapshot = None # info for reconstructing decoder state - self._seekable = self._telling = self.buffer.seekable() - self._has_read1 = hasattr(self.buffer, 'read1') - self._b2cratio = 0.0 + self._line_buffering = line_buffering + self._write_through = write_through + + # don't write a BOM in the middle of a file if self._seekable and self.writable(): position = self.buffer.tell() if position != 0: @@ -1996,12 +2009,6 @@ class TextIOWrapper(TextIOBase): # Sometimes the encoder doesn't exist pass - self._configure(line_buffering, write_through) - - def _configure(self, line_buffering=False, write_through=False): - self._line_buffering = line_buffering - self._write_through = write_through - # self._snapshot is either None, or a tuple (dec_flags, next_input) # where dec_flags is the second (integer) item of the decoder state # and next_input is the chunk of input bytes that comes next after the @@ -2048,17 +2055,46 @@ class TextIOWrapper(TextIOBase): def buffer(self): return self._buffer - def reconfigure(self, *, line_buffering=None, write_through=None): + def reconfigure(self, *, + encoding=None, errors=None, newline=Ellipsis, + line_buffering=None, write_through=None): """Reconfigure the text stream with new parameters. This also flushes the stream. 
""" + if (self._decoder is not None + and (encoding is not None or errors is not None + or newline is not Ellipsis)): + raise UnsupportedOperation( + "It is not possible to set the encoding or newline of stream " + "after the first read") + + if errors is None: + if encoding is None: + errors = self._errors + else: + errors = 'strict' + elif not isinstance(errors, str): + raise TypeError("invalid errors: %r" % errors) + + if encoding is None: + encoding = self._encoding + else: + if not isinstance(encoding, str): + raise TypeError("invalid encoding: %r" % encoding) + + if newline is Ellipsis: + newline = self._readnl + self._check_newline(newline) + if line_buffering is None: line_buffering = self.line_buffering if write_through is None: write_through = self.write_through + self.flush() - self._configure(line_buffering, write_through) + self._configure(encoding, errors, newline, + line_buffering, write_through) def seekable(self): if self.closed: diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py index 9bfe4b0..3aee5f1 100644 --- a/Lib/test/test_io.py +++ b/Lib/test/test_io.py @@ -3408,6 +3408,123 @@ class TextIOWrapperTest(unittest.TestCase): F.tell = lambda x: 0 t = self.TextIOWrapper(F(), encoding='utf-8') + def test_reconfigure_encoding_read(self): + # latin1 -> utf8 + # (latin1 can decode utf-8 encoded string) + data = 'abc\xe9\n'.encode('latin1') + 'd\xe9f\n'.encode('utf8') + raw = self.BytesIO(data) + txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n') + self.assertEqual(txt.readline(), 'abc\xe9\n') + with self.assertRaises(self.UnsupportedOperation): + txt.reconfigure(encoding='utf-8') + with self.assertRaises(self.UnsupportedOperation): + txt.reconfigure(newline=None) + + def test_reconfigure_write_fromascii(self): + # ascii has a specific encodefunc in the C implementation, + # but utf-8-sig has not. Make sure that we get rid of the + # cached encodefunc when we switch encoders. 
+ raw = self.BytesIO() + txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n') + txt.write('foo\n') + txt.reconfigure(encoding='utf-8-sig') + txt.write('\xe9\n') + txt.flush() + self.assertEqual(raw.getvalue(), b'foo\n\xc3\xa9\n') + + def test_reconfigure_write(self): + # latin -> utf8 + raw = self.BytesIO() + txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n') + txt.write('abc\xe9\n') + txt.reconfigure(encoding='utf-8') + self.assertEqual(raw.getvalue(), b'abc\xe9\n') + txt.write('d\xe9f\n') + txt.flush() + self.assertEqual(raw.getvalue(), b'abc\xe9\nd\xc3\xa9f\n') + + # ascii -> utf-8-sig: ensure that no BOM is written in the middle of + # the file + raw = self.BytesIO() + txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n') + txt.write('abc\n') + txt.reconfigure(encoding='utf-8-sig') + txt.write('d\xe9f\n') + txt.flush() + self.assertEqual(raw.getvalue(), b'abc\nd\xc3\xa9f\n') + + def test_reconfigure_write_non_seekable(self): + raw = self.BytesIO() + raw.seekable = lambda: False + raw.seek = None + txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n') + txt.write('abc\n') + txt.reconfigure(encoding='utf-8-sig') + txt.write('d\xe9f\n') + txt.flush() + + # If the raw stream is not seekable, there'll be a BOM + self.assertEqual(raw.getvalue(), b'abc\n\xef\xbb\xbfd\xc3\xa9f\n') + + def test_reconfigure_defaults(self): + txt = self.TextIOWrapper(self.BytesIO(), 'ascii', 'replace', '\n') + txt.reconfigure(encoding=None) + self.assertEqual(txt.encoding, 'ascii') + self.assertEqual(txt.errors, 'replace') + txt.write('LF\n') + + txt.reconfigure(newline='\r\n') + self.assertEqual(txt.encoding, 'ascii') + self.assertEqual(txt.errors, 'replace') + + txt.reconfigure(errors='ignore') + self.assertEqual(txt.encoding, 'ascii') + self.assertEqual(txt.errors, 'ignore') + txt.write('CRLF\n') + + txt.reconfigure(encoding='utf-8', newline=None) + self.assertEqual(txt.errors, 'strict') + txt.seek(0) + self.assertEqual(txt.read(), 'LF\nCRLF\n') + + 
self.assertEqual(txt.detach().getvalue(), b'LF\nCRLF\r\n') + + def test_reconfigure_newline(self): + raw = self.BytesIO(b'CR\rEOF') + txt = self.TextIOWrapper(raw, 'ascii', newline='\n') + txt.reconfigure(newline=None) + self.assertEqual(txt.readline(), 'CR\n') + raw = self.BytesIO(b'CR\rEOF') + txt = self.TextIOWrapper(raw, 'ascii', newline='\n') + txt.reconfigure(newline='') + self.assertEqual(txt.readline(), 'CR\r') + raw = self.BytesIO(b'CR\rLF\nEOF') + txt = self.TextIOWrapper(raw, 'ascii', newline='\r') + txt.reconfigure(newline='\n') + self.assertEqual(txt.readline(), 'CR\rLF\n') + raw = self.BytesIO(b'LF\nCR\rEOF') + txt = self.TextIOWrapper(raw, 'ascii', newline='\n') + txt.reconfigure(newline='\r') + self.assertEqual(txt.readline(), 'LF\nCR\r') + raw = self.BytesIO(b'CR\rCRLF\r\nEOF') + txt = self.TextIOWrapper(raw, 'ascii', newline='\r') + txt.reconfigure(newline='\r\n') + self.assertEqual(txt.readline(), 'CR\rCRLF\r\n') + + txt = self.TextIOWrapper(self.BytesIO(), 'ascii', newline='\r') + txt.reconfigure(newline=None) + txt.write('linesep\n') + txt.reconfigure(newline='') + txt.write('LF\n') + txt.reconfigure(newline='\n') + txt.write('LF\n') + txt.reconfigure(newline='\r') + txt.write('CR\n') + txt.reconfigure(newline='\r\n') + txt.write('CRLF\n') + expected = 'linesep' + os.linesep + 'LF\nLF\nCR\rCRLF\r\n' + self.assertEqual(txt.detach().getvalue().decode('ascii'), expected) + class MemviewBytesIO(io.BytesIO): '''A BytesIO object whose read method returns memoryviews |