summaryrefslogtreecommitdiffstats
path: root/Lib
diff options
context:
space:
mode:
authorGuido van Rossum <guido@python.org>2007-08-18 21:39:55 (GMT)
committerGuido van Rossum <guido@python.org>2007-08-18 21:39:55 (GMT)
commit8358db22faa3d8fa9ac0ef2f2c1ff1770a843996 (patch)
treed3be700f72499f59816b5472ff3a03ae0150ac46 /Lib
parentb99f762f10edb2646a634c2290ecb064bd52e5c7 (diff)
downloadcpython-8358db22faa3d8fa9ac0ef2f2c1ff1770a843996.zip
cpython-8358db22faa3d8fa9ac0ef2f2c1ff1770a843996.tar.gz
cpython-8358db22faa3d8fa9ac0ef2f2c1ff1770a843996.tar.bz2
New I/O code from Tony Lownds implements the newline feature correctly,
and implements the .newlines attribute in a 2.x-compatible fashion.
Diffstat (limited to 'Lib')
-rw-r--r--Lib/io.py210
-rw-r--r--Lib/test/test_io.py97
-rw-r--r--Lib/test/test_univnewlines.py14
3 files changed, 248 insertions, 73 deletions
diff --git a/Lib/io.py b/Lib/io.py
index 2f2d3c5..40ea687 100644
--- a/Lib/io.py
+++ b/Lib/io.py
@@ -61,10 +61,26 @@ def open(file, mode="r", buffering=None, encoding=None, newline=None):
can be: 0 = unbuffered, 1 = line buffered,
larger = fully buffered.
encoding: optional string giving the text encoding.
- newline: optional newlines specifier; must be None, '\n' or '\r\n';
- specifies the line ending expected on input and written on
- output. If None, use universal newlines on input and
- use os.linesep on output.
+ newline: optional newlines specifier; must be None, '', '\n', '\r'
+ or '\r\n'; all other values are illegal. It controls the
+ handling of line endings. It works as follows:
+
+ * On input, if `newline` is `None`, universal newlines
+ mode is enabled. Lines in the input can end in `'\n'`,
+ `'\r'`, or `'\r\n'`, and these are translated into
+ `'\n'` before being returned to the caller. If it is
+ `''`, universal newline mode is enabled, but line endings
+ are returned to the caller untranslated. If it has any of
+ the other legal values, input lines are only terminated by
+ the given string, and the line ending is returned to the
+ caller untranslated.
+
+ * On output, if `newline` is `None`, any `'\n'`
+ characters written are translated to the system default
+ line separator, `os.linesep`. If `newline` is `''`,
+ no translation takes place. If `newline` is any of the
+ other legal values, any `'\n'` characters written are
+ translated to the given string.
(*) If a file descriptor is given, it is closed when the returned
I/O object is closed. If you don't want this to happen, use
@@ -958,6 +974,17 @@ class TextIOBase(IOBase):
"""Subclasses should override."""
return None
@property
def newlines(self):
    """Line endings translated so far: None, a str, or a tuple of str.

    Only line endings translated while reading are taken into account.
    This base implementation always reports None; subclasses should
    override it.
    """
    return None
+
class TextIOWrapper(TextIOBase):
@@ -969,7 +996,7 @@ class TextIOWrapper(TextIOBase):
_CHUNK_SIZE = 128
def __init__(self, buffer, encoding=None, newline=None):
- if newline not in (None, "\n", "\r\n"):
+ if newline not in (None, "", "\n", "\r", "\r\n"):
raise ValueError("illegal newline value: %r" % (newline,))
if encoding is None:
try:
@@ -987,8 +1014,12 @@ class TextIOWrapper(TextIOBase):
self.buffer = buffer
self._encoding = encoding
- self._newline = newline or os.linesep
- self._fix_newlines = newline is None
+ self._readuniversal = not newline
+ self._readtranslate = newline is None
+ self._readnl = newline
+ self._writetranslate = newline != ''
+ self._writenl = newline or os.linesep
+ self._seennl = 0
self._decoder = None
self._pending = ""
self._snapshot = None
@@ -1032,13 +1063,15 @@ class TextIOWrapper(TextIOBase):
def write(self, s: str):
    """Encode *s* and write it to the underlying buffer.

    When write translation is enabled and the configured output line
    ending is not "\\n", every "\\n" in *s* is replaced by that line
    ending before encoding.  If *s* contained a "\\n" and the stream is
    a tty, the stream is flushed.  Any decoder/snapshot state kept for
    reading is discarded.

    Returns the number of characters in *s* as passed by the caller
    (i.e. before newline translation).

    Raises ValueError if the file is closed.
    """
    if self.closed:
        raise ValueError("write to closed file")
    # Measure before translating: the caller asked us to write len(s)
    # characters, and expanding "\n" to e.g. "\r\n" must not inflate
    # the reported count.
    length = len(s)
    haslf = "\n" in s
    if haslf and self._writetranslate and self._writenl != "\n":
        s = s.replace("\n", self._writenl)
    # XXX What if we were just reading?
    b = s.encode(self._encoding)
    if isinstance(b, str):
        b = bytes(b)
    self.buffer.write(b)
    if haslf and self.isatty():
        self.flush()
    self._snapshot = self._decoder = None
    return length
@@ -1159,7 +1192,7 @@ class TextIOWrapper(TextIOBase):
res += decoder.decode(self.buffer.read(), True)
self._pending = ""
self._snapshot = None
- return res.replace("\r\n", "\n")
+ return self._replacenl(res)
else:
while len(res) < n:
readahead, pending = self._read_chunk()
@@ -1167,7 +1200,7 @@ class TextIOWrapper(TextIOBase):
if not readahead:
break
self._pending = res[n:]
- return res[:n].replace("\r\n", "\n")
+ return self._replacenl(res[:n])
def __next__(self):
self._telling = False
@@ -1189,59 +1222,136 @@ class TextIOWrapper(TextIOBase):
line = self._pending
start = 0
+ cr_eof = False
decoder = self._decoder or self._get_decoder()
+ pos = endpos = None
+ ending = None
while True:
- # In C we'd look for these in parallel of course.
- nlpos = line.find("\n", start)
- crpos = line.find("\r", start)
- if nlpos >= 0 and crpos >= 0:
- endpos = min(nlpos, crpos)
- else:
- endpos = nlpos if nlpos >= 0 else crpos
-
- if endpos != -1:
- endc = line[endpos]
- if endc == "\n":
- ending = "\n"
- break
-
- # We've seen \r - is it standalone, \r\n or \r at end of line?
- if endpos + 1 < len(line):
- if line[endpos+1] == "\n":
- ending = "\r\n"
+ if self._readuniversal:
+ # Universal newline search. Find any of \r, \r\n, \n
+
+ # In C we'd look for these in parallel of course.
+ nlpos = line.find("\n", start)
+ crpos = line.find("\r", start)
+ if crpos == -1:
+ if nlpos == -1:
+ start = len(line)
else:
- ending = "\r"
+ # Found \n
+ pos = nlpos
+ endpos = pos + 1
+ ending = self._LF
+ break
+ elif nlpos == -1:
+ if crpos == len(line) - 1:
+ # Found \r at end of buffer, must keep reading
+ start = crpos
+ cr_eof = True
+ else:
+ # Found lone \r
+ ending = self._CR
+ pos = crpos
+ endpos = pos + 1
+ break
+ elif nlpos < crpos:
+ # Found \n
+ pos = nlpos
+ endpos = pos + 1
+ ending = self._LF
+ break
+ elif nlpos == crpos + 1:
+ # Found \r\n
+ ending = self._CRLF
+ pos = crpos
+ endpos = pos + 2
+ break
+ else:
+ # Found \r
+ pos = crpos
+ endpos = pos + 1
+ ending = self._CR
break
- # There might be a following \n in the next block of data ...
- start = endpos
else:
- start = len(line)
+ # non-universal
+ pos = line.find(self._readnl)
+ if pos >= 0:
+ endpos = pos+len(self._readnl)
+ ending = self._nlflag(self._readnl)
+ break
# No line ending seen yet - get more data
+ more_line = ''
while True:
readahead, pending = self._read_chunk()
more_line = pending
if more_line or not readahead:
break
+ if more_line:
+ line += more_line
+ else:
+ # end of file
+ self._pending = ''
+ self._snapshot = None
+ if cr_eof:
+ self._seennl |= self._CR
+ return line[:-1] + '\n'
+ else:
+ return line
- if not more_line:
- ending = ""
- endpos = len(line)
- break
-
- line += more_line
-
- nextpos = endpos + len(ending)
- self._pending = line[nextpos:]
-
- # XXX Update self.newlines here if we want to support that
-
- if self._fix_newlines and ending not in ("\n", ""):
- return line[:endpos] + "\n"
+ self._pending = line[endpos:]
+ if self._readtranslate:
+ self._seennl |= ending
+ if ending != self._LF:
+ return line[:pos] + '\n'
+ else:
+ return line[:endpos]
else:
- return line[:nextpos]
-
+ return line[:endpos]
+
def _replacenl(self, data):
    """Translate line endings in *data* to "\\n" and record which were seen.

    Returns *data* unchanged when read translation is off.  In universal
    mode, "\\r\\n" and lone "\\r" are both replaced by "\\n", and the
    _LF/_CR/_CRLF bits for every kind of ending present are OR-ed into
    self._seennl.  Otherwise only the configured self._readnl is looked
    for (and replaced by "\\n" if it is not already "\\n").
    """
    if not self._readtranslate:
        return data
    if self._readuniversal:
        crlf = data.count('\r\n')
        # Every "\r\n" also contributes one "\r" and one "\n" to the
        # plain counts, so subtract it to get the lone occurrences.
        cr = data.count('\r') - crlf
        lf = data.count('\n') - crlf
        if lf:
            self._seennl |= self._LF
        if cr:
            self._seennl |= self._CR
        if crlf:
            self._seennl |= self._CRLF
            # Must run before the lone-"\r" replacement below.
            data = data.replace("\r\n", "\n")
        if cr:
            data = data.replace("\r", "\n")
    # NOTE(review): the constructor in this file enables _readtranslate
    # only when newline is None, which also enables universal mode, so
    # the two branches below look unreachable today -- confirm before
    # relying on them.
    elif self._readnl == '\n':
        # Nothing to replace; only need to detect if \n was seen.
        if '\n' in data:
            self._seennl |= self._LF
    elif self._readnl in data:
        # Test for presence explicitly instead of comparing the result
        # of replace() by identity, which depends on the interpreter
        # returning the very same object when nothing matched.
        self._seennl |= self._nlflag(self._readnl)
        data = data.replace(self._readnl, '\n')
    return data
+
# Bit flags OR-ed into self._seennl, one per kind of line ending.
_LF = 1
_CR = 2
_CRLF = 4

@property
def newlines(self):
    """Line endings recorded in self._seennl.

    Returns None when nothing has been recorded, a single string when
    exactly one kind of ending has been recorded, and a tuple of strings
    otherwise.
    """
    cr, lf, crlf = "\r", "\n", "\r\n"
    # Indexed by the _seennl bitmask (_LF=1, _CR=2, _CRLF=4).
    by_mask = (
        None,            # 0: nothing seen
        lf,              # 1: LF
        cr,              # 2: CR
        (cr, lf),        # 3: CR + LF
        crlf,            # 4: CRLF
        (lf, crlf),      # 5: LF + CRLF
        (cr, crlf),      # 6: CR + CRLF
        (cr, lf, crlf),  # 7: all three
    )
    return by_mask[self._seennl]
+
def _nlflag(self, nlstr):
    r"""Return the _seennl bit flag for the line ending *nlstr*.

    "\n" maps to self._LF, "\r" to self._CR and "\r\n" to self._CRLF.

    Raises ValueError for anything else (including None), instead of
    relying on the position of the string in a hand-built list whose
    indexes had to mirror the flag values.
    """
    flags = {"\n": self._LF, "\r": self._CR, "\r\n": self._CRLF}
    try:
        return flags[nlstr]
    except (KeyError, TypeError):
        # TypeError covers unhashable arguments.
        raise ValueError("not a line ending: %r" % (nlstr,))
class StringIO(TextIOWrapper):
diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py
index c555623..92e1567 100644
--- a/Lib/test/test_io.py
+++ b/Lib/test/test_io.py
@@ -1,5 +1,6 @@
"""Unit tests for io.py."""
+import os
import sys
import time
import array
@@ -481,30 +482,61 @@ class TextIOWrapperTest(unittest.TestCase):
def tearDown(self):
test_support.unlink(test_support.TESTFN)
+ def testNewlinesInput(self):
+ testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
+ normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
+ for newline, expected in [
+ (None, normalized.decode("ASCII").splitlines(True)),
+ ("", testdata.decode("ASCII").splitlines(True)),
+ ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
+ ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
+ ("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
+ ]:
+ buf = io.BytesIO(testdata)
+ txt = io.TextIOWrapper(buf, encoding="ASCII", newline=newline)
+ self.assertEquals(txt.readlines(), expected)
+ txt.seek(0)
+ self.assertEquals(txt.read(), "".join(expected))
+
+ def testNewlinesOutput(self):
+ testdict = {
+ "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
+ "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
+ "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
+ "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
+ }
+ tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
+ for newline, expected in tests:
+ buf = io.BytesIO()
+ txt = io.TextIOWrapper(buf, encoding="ASCII", newline=newline)
+ txt.write("AAA\nB")
+ txt.write("BB\nCCC\n")
+ txt.write("X\rY\r\nZ")
+ txt.flush()
+ self.assertEquals(buf.getvalue(), expected)
+
def testNewlines(self):
input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
tests = [
[ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
- [ '\n', input_lines ],
- [ '\r\n', input_lines ],
+ [ '', input_lines ],
+ [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
+ [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
+ [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
]
encodings = ('utf-8', 'latin-1')
- # Try a range of pad sizes to test the case where \r is the last
+ # Try a range of buffer sizes to test the case where \r is the last
# character in TextIOWrapper._pending_line.
for encoding in encodings:
+ # XXX: str.encode() should return bytes
+ data = bytes(''.join(input_lines).encode(encoding))
for do_reads in (False, True):
- for padlen in chain(range(10), range(50, 60)):
- pad = '.' * padlen
- data_lines = [ pad + line for line in input_lines ]
- # XXX: str.encode() should return bytes
- data = bytes(''.join(data_lines).encode(encoding))
-
- for newline, exp_line_ends in tests:
- exp_lines = [ pad + line for line in exp_line_ends ]
- bufio = io.BufferedReader(io.BytesIO(data))
+ for bufsize in range(1, 10):
+ for newline, exp_lines in tests:
+ bufio = io.BufferedReader(io.BytesIO(data), bufsize)
textio = io.TextIOWrapper(bufio, newline=newline,
encoding=encoding)
if do_reads:
@@ -522,6 +554,47 @@ class TextIOWrapperTest(unittest.TestCase):
self.assertEquals(got_line, exp_line)
self.assertEquals(len(got_lines), len(exp_lines))
+ def testNewlinesInput(self):
+ testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
+ normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
+ for newline, expected in [
+ (None, normalized.decode("ASCII").splitlines(True)),
+ ("", testdata.decode("ASCII").splitlines(True)),
+ ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
+ ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
+ ("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
+ ]:
+ buf = io.BytesIO(testdata)
+ txt = io.TextIOWrapper(buf, encoding="ASCII", newline=newline)
+ self.assertEquals(txt.readlines(), expected)
+ txt.seek(0)
+ self.assertEquals(txt.read(), "".join(expected))
+
+ def testNewlinesOutput(self):
+ import os
+ orig_linesep = os.linesep
+ data = "AAA\nBBB\rCCC\n"
+ data_lf = b"AAA\nBBB\rCCC\n"
+ data_cr = b"AAA\rBBB\rCCC\r"
+ data_crlf = b"AAA\r\nBBB\rCCC\r\n"
+ for os.linesep, newline, expected in [
+ ("\n", None, data_lf),
+ ("\r\n", None, data_crlf),
+ ("\n", "", data_lf),
+ ("\r\n", "", data_lf),
+ ("\n", "\n", data_lf),
+ ("\r\n", "\n", data_lf),
+ ("\n", "\r", data_cr),
+ ("\r\n", "\r", data_cr),
+ ("\n", "\r\n", data_crlf),
+ ("\r\n", "\r\n", data_crlf),
+ ]:
+ buf = io.BytesIO()
+ txt = io.TextIOWrapper(buf, encoding="ASCII", newline=newline)
+ txt.write(data)
+ txt.close()
+ self.assertEquals(buf.getvalue(), expected)
+
# Systematic tests of the text I/O API
def testBasicIO(self):
diff --git a/Lib/test/test_univnewlines.py b/Lib/test/test_univnewlines.py
index 7810cae..350bad3 100644
--- a/Lib/test/test_univnewlines.py
+++ b/Lib/test/test_univnewlines.py
@@ -12,9 +12,8 @@ FATX = 'x' * (2**14)
DATA_TEMPLATE = [
"line1=1",
- "line2='this is a very long line designed to go past the magic " +
- "hundred character limit that is inside fileobject.c and which " +
- "is meant to speed up the common case, but we also want to test " +
+ "line2='this is a very long line designed to go past any default " +
+ "buffer limits that exist in io.py but we also want to test " +
"the uncommon case, naturally.'",
"def line3():pass",
"line4 = '%s'" % FATX,
@@ -32,7 +31,7 @@ DATA_SPLIT = [x + "\n" for x in DATA_TEMPLATE]
class TestGenericUnivNewlines(unittest.TestCase):
# use a class variable DATA to define the data to write to the file
# and a class variable NEWLINE to set the expected newlines value
- READMODE = 'U'
+ READMODE = 'r'
WRITEMODE = 'wb'
def setUp(self):
@@ -79,12 +78,6 @@ class TestGenericUnivNewlines(unittest.TestCase):
self.assertEqual(data, DATA_SPLIT[1:])
-class TestNativeNewlines(TestGenericUnivNewlines):
- NEWLINE = None
- DATA = DATA_LF
- READMODE = 'r'
- WRITEMODE = 'w'
-
class TestCRNewlines(TestGenericUnivNewlines):
NEWLINE = '\r'
DATA = DATA_CR
@@ -104,7 +97,6 @@ class TestMixedNewlines(TestGenericUnivNewlines):
def test_main():
test_support.run_unittest(
- TestNativeNewlines,
TestCRNewlines,
TestLFNewlines,
TestCRLFNewlines,