diff options
author | Guido van Rossum <guido@python.org> | 2007-08-18 21:39:55 (GMT) |
---|---|---|
committer | Guido van Rossum <guido@python.org> | 2007-08-18 21:39:55 (GMT) |
commit | 8358db22faa3d8fa9ac0ef2f2c1ff1770a843996 (patch) | |
tree | d3be700f72499f59816b5472ff3a03ae0150ac46 /Lib | |
parent | b99f762f10edb2646a634c2290ecb064bd52e5c7 (diff) | |
download | cpython-8358db22faa3d8fa9ac0ef2f2c1ff1770a843996.zip cpython-8358db22faa3d8fa9ac0ef2f2c1ff1770a843996.tar.gz cpython-8358db22faa3d8fa9ac0ef2f2c1ff1770a843996.tar.bz2 |
New I/O code from Tony Lownds implements the newline feature correctly,
and implements the .newlines attribute in a 2.x-compatible fashion.
Diffstat (limited to 'Lib')
-rw-r--r-- | Lib/io.py | 210 | ||||
-rw-r--r-- | Lib/test/test_io.py | 97 | ||||
-rw-r--r-- | Lib/test/test_univnewlines.py | 14 |
3 files changed, 248 insertions, 73 deletions
@@ -61,10 +61,26 @@ def open(file, mode="r", buffering=None, encoding=None, newline=None): can be: 0 = unbuffered, 1 = line buffered, larger = fully buffered. encoding: optional string giving the text encoding. - newline: optional newlines specifier; must be None, '\n' or '\r\n'; - specifies the line ending expected on input and written on - output. If None, use universal newlines on input and - use os.linesep on output. + newline: optional newlines specifier; must be None, '', '\n', '\r' + or '\r\n'; all other values are illegal. It controls the + handling of line endings. It works as follows: + + * On input, if `newline` is `None`, universal newlines + mode is enabled. Lines in the input can end in `'\n'`, + `'\r'`, or `'\r\n'`, and these are translated into + `'\n'` before being returned to the caller. If it is + `''`, universal newline mode is enabled, but line endings + are returned to the caller untranslated. If it has any of + the other legal values, input lines are only terminated by + the given string, and the line ending is returned to the + caller untranslated. + + * On output, if `newline` is `None`, any `'\n'` + characters written are translated to the system default + line separator, `os.linesep`. If `newline` is `''`, + no translation takes place. If `newline` is any of the + other legal values, any `'\n'` characters written are + translated to the given string. (*) If a file descriptor is given, it is closed when the returned I/O object is closed. If you don't want this to happen, use @@ -958,6 +974,17 @@ class TextIOBase(IOBase): """Subclasses should override.""" return None + @property + def newlines(self): + """newlines -> None | str | tuple of str. Line endings translated + so far. + + Only line endings translated during reading are considered. + + Subclasses should override. 
+ """ + return None + class TextIOWrapper(TextIOBase): @@ -969,7 +996,7 @@ class TextIOWrapper(TextIOBase): _CHUNK_SIZE = 128 def __init__(self, buffer, encoding=None, newline=None): - if newline not in (None, "\n", "\r\n"): + if newline not in (None, "", "\n", "\r", "\r\n"): raise ValueError("illegal newline value: %r" % (newline,)) if encoding is None: try: @@ -987,8 +1014,12 @@ class TextIOWrapper(TextIOBase): self.buffer = buffer self._encoding = encoding - self._newline = newline or os.linesep - self._fix_newlines = newline is None + self._readuniversal = not newline + self._readtranslate = newline is None + self._readnl = newline + self._writetranslate = newline != '' + self._writenl = newline or os.linesep + self._seennl = 0 self._decoder = None self._pending = "" self._snapshot = None @@ -1032,13 +1063,15 @@ class TextIOWrapper(TextIOBase): def write(self, s: str): if self.closed: raise ValueError("write to closed file") + haslf = "\n" in s + if haslf and self._writetranslate and self._writenl != "\n": + s = s.replace("\n", self._writenl) # XXX What if we were just reading? 
b = s.encode(self._encoding) if isinstance(b, str): b = bytes(b) - n = self.buffer.write(b) - if "\n" in s: - # XXX only if isatty + self.buffer.write(b) + if haslf and self.isatty(): self.flush() self._snapshot = self._decoder = None return len(s) @@ -1159,7 +1192,7 @@ class TextIOWrapper(TextIOBase): res += decoder.decode(self.buffer.read(), True) self._pending = "" self._snapshot = None - return res.replace("\r\n", "\n") + return self._replacenl(res) else: while len(res) < n: readahead, pending = self._read_chunk() @@ -1167,7 +1200,7 @@ class TextIOWrapper(TextIOBase): if not readahead: break self._pending = res[n:] - return res[:n].replace("\r\n", "\n") + return self._replacenl(res[:n]) def __next__(self): self._telling = False @@ -1189,59 +1222,136 @@ class TextIOWrapper(TextIOBase): line = self._pending start = 0 + cr_eof = False decoder = self._decoder or self._get_decoder() + pos = endpos = None + ending = None while True: - # In C we'd look for these in parallel of course. - nlpos = line.find("\n", start) - crpos = line.find("\r", start) - if nlpos >= 0 and crpos >= 0: - endpos = min(nlpos, crpos) - else: - endpos = nlpos if nlpos >= 0 else crpos - - if endpos != -1: - endc = line[endpos] - if endc == "\n": - ending = "\n" - break - - # We've seen \r - is it standalone, \r\n or \r at end of line? - if endpos + 1 < len(line): - if line[endpos+1] == "\n": - ending = "\r\n" + if self._readuniversal: + # Universal newline search. Find any of \r, \r\n, \n + + # In C we'd look for these in parallel of course. 
+ nlpos = line.find("\n", start) + crpos = line.find("\r", start) + if crpos == -1: + if nlpos == -1: + start = len(line) else: - ending = "\r" + # Found \n + pos = nlpos + endpos = pos + 1 + ending = self._LF + break + elif nlpos == -1: + if crpos == len(line) - 1: + # Found \r at end of buffer, must keep reading + start = crpos + cr_eof = True + else: + # Found lone \r + ending = self._CR + pos = crpos + endpos = pos + 1 + break + elif nlpos < crpos: + # Found \n + pos = nlpos + endpos = pos + 1 + ending = self._LF + break + elif nlpos == crpos + 1: + # Found \r\n + ending = self._CRLF + pos = crpos + endpos = pos + 2 + break + else: + # Found \r + pos = crpos + endpos = pos + 1 + ending = self._CR break - # There might be a following \n in the next block of data ... - start = endpos else: - start = len(line) + # non-universal + pos = line.find(self._readnl) + if pos >= 0: + endpos = pos+len(self._readnl) + ending = self._nlflag(self._readnl) + break # No line ending seen yet - get more data + more_line = '' while True: readahead, pending = self._read_chunk() more_line = pending if more_line or not readahead: break + if more_line: + line += more_line + else: + # end of file + self._pending = '' + self._snapshot = None + if cr_eof: + self._seennl |= self._CR + return line[:-1] + '\n' + else: + return line - if not more_line: - ending = "" - endpos = len(line) - break - - line += more_line - - nextpos = endpos + len(ending) - self._pending = line[nextpos:] - - # XXX Update self.newlines here if we want to support that - - if self._fix_newlines and ending not in ("\n", ""): - return line[:endpos] + "\n" + self._pending = line[endpos:] + if self._readtranslate: + self._seennl |= ending + if ending != self._LF: + return line[:pos] + '\n' + else: + return line[:endpos] else: - return line[:nextpos] - + return line[:endpos] + + def _replacenl(self, data): + # Replace newlines in data as needed and record that they have + # been seen. 
+ if not self._readtranslate: + return data + if self._readuniversal: + crlf = data.count('\r\n') + cr = data.count('\r') - crlf + lf = data.count('\n') - crlf + self._seennl |= (lf and self._LF) | (cr and self._CR) \ + | (crlf and self._CRLF) + if crlf: + data = data.replace("\r\n", "\n") + if cr: + data = data.replace("\r", "\n") + elif self._readnl == '\n': + # Only need to detect if \n was seen. + if data.count('\n'): + self._seennl |= self._LF + else: + newdata = data.replace(self._readnl, '\n') + if newdata is not data: + self._seennl |= self._nlflag(self._readnl) + data = newdata + return data + + _LF = 1 + _CR = 2 + _CRLF = 4 + @property + def newlines(self): + return (None, + "\n", + "\r", + ("\r", "\n"), + "\r\n", + ("\n", "\r\n"), + ("\r", "\r\n"), + ("\r", "\n", "\r\n") + )[self._seennl] + + def _nlflag(self, nlstr): + return [None, "\n", "\r", None, "\r\n"].index(nlstr) class StringIO(TextIOWrapper): diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py index c555623..92e1567 100644 --- a/Lib/test/test_io.py +++ b/Lib/test/test_io.py @@ -1,5 +1,6 @@ """Unit tests for io.py.""" +import os import sys import time import array @@ -481,30 +482,61 @@ class TextIOWrapperTest(unittest.TestCase): def tearDown(self): test_support.unlink(test_support.TESTFN) + def testNewlinesInput(self): + testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG" + normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n") + for newline, expected in [ + (None, normalized.decode("ASCII").splitlines(True)), + ("", testdata.decode("ASCII").splitlines(True)), + ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]), + ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]), + ("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]), + ]: + buf = io.BytesIO(testdata) + txt = io.TextIOWrapper(buf, encoding="ASCII", newline=newline) + self.assertEquals(txt.readlines(), expected) + txt.seek(0) + self.assertEquals(txt.read(), "".join(expected)) + + def 
testNewlinesOutput(self): + testdict = { + "": b"AAA\nBBB\nCCC\nX\rY\r\nZ", + "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ", + "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ", + "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ", + } + tests = [(None, testdict[os.linesep])] + sorted(testdict.items()) + for newline, expected in tests: + buf = io.BytesIO() + txt = io.TextIOWrapper(buf, encoding="ASCII", newline=newline) + txt.write("AAA\nB") + txt.write("BB\nCCC\n") + txt.write("X\rY\r\nZ") + txt.flush() + self.assertEquals(buf.getvalue(), expected) + def testNewlines(self): input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ] tests = [ [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ], - [ '\n', input_lines ], - [ '\r\n', input_lines ], + [ '', input_lines ], + [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ], + [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ], + [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ], ] encodings = ('utf-8', 'latin-1') - # Try a range of pad sizes to test the case where \r is the last + # Try a range of buffer sizes to test the case where \r is the last # character in TextIOWrapper._pending_line. for encoding in encodings: + # XXX: str.encode() should return bytes + data = bytes(''.join(input_lines).encode(encoding)) for do_reads in (False, True): - for padlen in chain(range(10), range(50, 60)): - pad = '.' 
* padlen - data_lines = [ pad + line for line in input_lines ] - # XXX: str.encode() should return bytes - data = bytes(''.join(data_lines).encode(encoding)) - - for newline, exp_line_ends in tests: - exp_lines = [ pad + line for line in exp_line_ends ] - bufio = io.BufferedReader(io.BytesIO(data)) + for bufsize in range(1, 10): + for newline, exp_lines in tests: + bufio = io.BufferedReader(io.BytesIO(data), bufsize) textio = io.TextIOWrapper(bufio, newline=newline, encoding=encoding) if do_reads: @@ -522,6 +554,47 @@ class TextIOWrapperTest(unittest.TestCase): self.assertEquals(got_line, exp_line) self.assertEquals(len(got_lines), len(exp_lines)) + def testNewlinesInput(self): + testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG" + normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n") + for newline, expected in [ + (None, normalized.decode("ASCII").splitlines(True)), + ("", testdata.decode("ASCII").splitlines(True)), + ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]), + ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]), + ("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]), + ]: + buf = io.BytesIO(testdata) + txt = io.TextIOWrapper(buf, encoding="ASCII", newline=newline) + self.assertEquals(txt.readlines(), expected) + txt.seek(0) + self.assertEquals(txt.read(), "".join(expected)) + + def testNewlinesOutput(self): + import os + orig_linesep = os.linesep + data = "AAA\nBBB\rCCC\n" + data_lf = b"AAA\nBBB\rCCC\n" + data_cr = b"AAA\rBBB\rCCC\r" + data_crlf = b"AAA\r\nBBB\rCCC\r\n" + for os.linesep, newline, expected in [ + ("\n", None, data_lf), + ("\r\n", None, data_crlf), + ("\n", "", data_lf), + ("\r\n", "", data_lf), + ("\n", "\n", data_lf), + ("\r\n", "\n", data_lf), + ("\n", "\r", data_cr), + ("\r\n", "\r", data_cr), + ("\n", "\r\n", data_crlf), + ("\r\n", "\r\n", data_crlf), + ]: + buf = io.BytesIO() + txt = io.TextIOWrapper(buf, encoding="ASCII", newline=newline) + txt.write(data) + txt.close() + 
self.assertEquals(buf.getvalue(), expected) + # Systematic tests of the text I/O API def testBasicIO(self): diff --git a/Lib/test/test_univnewlines.py b/Lib/test/test_univnewlines.py index 7810cae..350bad3 100644 --- a/Lib/test/test_univnewlines.py +++ b/Lib/test/test_univnewlines.py @@ -12,9 +12,8 @@ FATX = 'x' * (2**14) DATA_TEMPLATE = [ "line1=1", - "line2='this is a very long line designed to go past the magic " + - "hundred character limit that is inside fileobject.c and which " + - "is meant to speed up the common case, but we also want to test " + + "line2='this is a very long line designed to go past any default " + + "buffer limits that exist in io.py but we also want to test " + "the uncommon case, naturally.'", "def line3():pass", "line4 = '%s'" % FATX, @@ -32,7 +31,7 @@ DATA_SPLIT = [x + "\n" for x in DATA_TEMPLATE] class TestGenericUnivNewlines(unittest.TestCase): # use a class variable DATA to define the data to write to the file # and a class variable NEWLINE to set the expected newlines value - READMODE = 'U' + READMODE = 'r' WRITEMODE = 'wb' def setUp(self): @@ -79,12 +78,6 @@ class TestGenericUnivNewlines(unittest.TestCase): self.assertEqual(data, DATA_SPLIT[1:]) -class TestNativeNewlines(TestGenericUnivNewlines): - NEWLINE = None - DATA = DATA_LF - READMODE = 'r' - WRITEMODE = 'w' - class TestCRNewlines(TestGenericUnivNewlines): NEWLINE = '\r' DATA = DATA_CR @@ -104,7 +97,6 @@ class TestMixedNewlines(TestGenericUnivNewlines): def test_main(): test_support.run_unittest( - TestNativeNewlines, TestCRNewlines, TestLFNewlines, TestCRLFNewlines, |