summaryrefslogtreecommitdiffstats
path: root/Lib/io.py
diff options
context:
space:
mode:
authorAmaury Forgeot d'Arc <amauryfa@gmail.com>2007-11-19 20:34:10 (GMT)
committerAmaury Forgeot d'Arc <amauryfa@gmail.com>2007-11-19 20:34:10 (GMT)
commit1ff9910f595c1f50e1590caf19e839a917657b36 (patch)
tree407d598b311197be59ae6828d55b3d5dee4d6037 /Lib/io.py
parent74c29c71b1c9a68030e6b3fda6af54ca52b2c6c3 (diff)
downloadcpython-1ff9910f595c1f50e1590caf19e839a917657b36.zip
cpython-1ff9910f595c1f50e1590caf19e839a917657b36.tar.gz
cpython-1ff9910f595c1f50e1590caf19e839a917657b36.tar.bz2
Issue1395: Universal mode used to duplicate newlines when using read(1).
"Universal newline" is now an incremental decoder wrapping the initial one, with its own additional buffer (if '\r' is seen at the end of the input). A decoder allows the tell() funtion to record the state of the translation. This also simplifies the readline() process. Now test_netrc passes on Windows, as well as many new tests in test_io.py
Diffstat (limited to 'Lib/io.py')
-rw-r--r--Lib/io.py205
1 files changed, 116 insertions, 89 deletions
diff --git a/Lib/io.py b/Lib/io.py
index 74076d3..d2d2fbc 100644
--- a/Lib/io.py
+++ b/Lib/io.py
@@ -1041,6 +1041,84 @@ class TextIOBase(IOBase):
return None
+class IncrementalNewlineDecoder(codecs.IncrementalDecoder):
+ """Codec used when reading a file in universal newlines mode.
+ It wraps another incremental decoder, translating \\r\\n and \\r into \\n.
+ It also records the types of newlines encountered.
+ When used with translate=False, it ensures that the newline sequence is
+ returned in one piece.
+ """
+ def __init__(self, decoder, translate, errors='strict'):
+ codecs.IncrementalDecoder.__init__(self, errors=errors)
+ self.buffer = b''
+ self.translate = translate
+ self.decoder = decoder
+ self.seennl = 0
+
+ def decode(self, input, final=False):
+ # decode input (with the eventual \r from a previous pass)
+ if self.buffer:
+ input = self.buffer + input
+
+ output = self.decoder.decode(input, final=final)
+
+ # retain last \r even when not translating data:
+ # then readline() is sure to get \r\n in one pass
+ if output.endswith("\r") and not final:
+ output = output[:-1]
+ self.buffer = b'\r'
+ else:
+ self.buffer = b''
+
+ # Record which newlines are read
+ crlf = output.count('\r\n')
+ cr = output.count('\r') - crlf
+ lf = output.count('\n') - crlf
+ self.seennl |= (lf and self._LF) | (cr and self._CR) \
+ | (crlf and self._CRLF)
+
+ if self.translate:
+ if crlf:
+ output = output.replace("\r\n", "\n")
+ if cr:
+ output = output.replace("\r", "\n")
+
+ return output
+
+ def getstate(self):
+ buf, flag = self.decoder.getstate()
+ return buf + self.buffer, flag
+
+ def setstate(self, state):
+ buf, flag = state
+ if buf.endswith(b'\r'):
+ self.buffer = b'\r'
+ buf = buf[:-1]
+ else:
+ self.buffer = b''
+ self.decoder.setstate((buf, flag))
+
+ def reset(self):
+ self.buffer = b''
+ self.decoder.reset()
+
+ _LF = 1
+ _CR = 2
+ _CRLF = 4
+
+ @property
+ def newlines(self):
+ return (None,
+ "\n",
+ "\r",
+ ("\r", "\n"),
+ "\r\n",
+ ("\n", "\r\n"),
+ ("\r", "\r\n"),
+ ("\r", "\n", "\r\n")
+ )[self.seennl]
+
+
class TextIOWrapper(TextIOBase):
"""Buffered text stream.
@@ -1077,7 +1155,6 @@ class TextIOWrapper(TextIOBase):
self._readnl = newline
self._writetranslate = newline != ''
self._writenl = newline or os.linesep
- self._seennl = 0
self._decoder = None
self._pending = ""
self._snapshot = None
@@ -1124,6 +1201,7 @@ class TextIOWrapper(TextIOBase):
if not isinstance(s, str):
raise TypeError("can't write %s to text stream" %
s.__class__.__name__)
+ length = len(s)
haslf = "\n" in s
if haslf and self._writetranslate and self._writenl != "\n":
s = s.replace("\n", self._writenl)
@@ -1132,15 +1210,20 @@ class TextIOWrapper(TextIOBase):
self.buffer.write(b)
if haslf and self.isatty():
self.flush()
- self._snapshot = self._decoder = None
- return len(s)
+ self._snapshot = None
+ if self._decoder:
+ self._decoder.reset()
+ return length
def _get_decoder(self):
make_decoder = codecs.getincrementaldecoder(self._encoding)
if make_decoder is None:
raise IOError("Can't find an incremental decoder for encoding %s" %
self._encoding)
- decoder = self._decoder = make_decoder() # XXX: errors
+ decoder = make_decoder() # XXX: errors
+ if self._readuniversal:
+ decoder = IncrementalNewlineDecoder(decoder, self._readtranslate)
+ self._decoder = decoder
return decoder
def _read_chunk(self):
@@ -1220,7 +1303,8 @@ class TextIOWrapper(TextIOBase):
pos = self.buffer.seek(0, 2)
self._snapshot = None
self._pending = ""
- self._decoder = None
+ if self._decoder:
+ self._decoder.reset()
return pos
if whence != 0:
raise ValueError("Invalid whence (%r, should be 0, 1 or 2)" %
@@ -1234,7 +1318,8 @@ class TextIOWrapper(TextIOBase):
self.buffer.seek(pos)
self._snapshot = None
self._pending = ""
- self._decoder = None
+ if self._decoder:
+ self._decoder.reset()
return pos
decoder = self._decoder or self._get_decoder()
decoder.set_state(("", ds))
@@ -1253,7 +1338,7 @@ class TextIOWrapper(TextIOBase):
res += decoder.decode(self.buffer.read(), True)
self._pending = ""
self._snapshot = None
- return self._replacenl(res)
+ return res
else:
while len(res) < n:
readahead, pending = self._read_chunk()
@@ -1261,7 +1346,7 @@ class TextIOWrapper(TextIOBase):
if not readahead:
break
self._pending = res[n:]
- return self._replacenl(res[:n])
+ return res[:n]
def __next__(self):
self._telling = False
@@ -1285,62 +1370,55 @@ class TextIOWrapper(TextIOBase):
line = self._pending
start = 0
- cr_eof = False
decoder = self._decoder or self._get_decoder()
pos = endpos = None
- ending = None
while True:
- if self._readuniversal:
+ if self._readtranslate:
+ # Newlines are already translated, only search for \n
+ pos = line.find('\n', start)
+ if pos >= 0:
+ endpos = pos + 1
+ break
+ else:
+ start = len(line)
+
+ elif self._readuniversal:
# Universal newline search. Find any of \r, \r\n, \n
+ # The decoder ensures that \r\n are not split in two pieces
# In C we'd look for these in parallel of course.
nlpos = line.find("\n", start)
crpos = line.find("\r", start)
if crpos == -1:
if nlpos == -1:
+ # Nothing found
start = len(line)
else:
# Found \n
- pos = nlpos
- endpos = pos + 1
- ending = self._LF
+ endpos = nlpos + 1
break
elif nlpos == -1:
- if crpos == len(line) - 1:
- # Found \r at end of buffer, must keep reading
- start = crpos
- cr_eof = True
- else:
- # Found lone \r
- ending = self._CR
- pos = crpos
- endpos = pos + 1
- break
+ # Found lone \r
+ endpos = crpos + 1
+ break
elif nlpos < crpos:
# Found \n
- pos = nlpos
- endpos = pos + 1
- ending = self._LF
+ endpos = nlpos + 1
break
elif nlpos == crpos + 1:
# Found \r\n
- ending = self._CRLF
- pos = crpos
- endpos = pos + 2
+ endpos = crpos + 2
break
else:
# Found \r
- pos = crpos
- endpos = pos + 1
- ending = self._CR
+ endpos = crpos + 1
break
else:
# non-universal
pos = line.find(self._readnl)
if pos >= 0:
- endpos = pos+len(self._readnl)
- ending = self._nlflag(self._readnl)
+ endpos = pos + len(self._readnl)
break
# No line ending seen yet - get more data
@@ -1356,65 +1434,14 @@ class TextIOWrapper(TextIOBase):
# end of file
self._pending = ''
self._snapshot = None
- if cr_eof:
- self._seennl |= self._CR
- return line[:-1] + '\n'
- else:
- return line
+ return line
self._pending = line[endpos:]
- if self._readtranslate:
- self._seennl |= ending
- if ending != self._LF:
- return line[:pos] + '\n'
- else:
- return line[:endpos]
- else:
- return line[:endpos]
+ return line[:endpos]
- def _replacenl(self, data):
- # Replace newlines in data as needed and record that they have
- # been seen.
- if not self._readtranslate:
- return data
- if self._readuniversal:
- crlf = data.count('\r\n')
- cr = data.count('\r') - crlf
- lf = data.count('\n') - crlf
- self._seennl |= (lf and self._LF) | (cr and self._CR) \
- | (crlf and self._CRLF)
- if crlf:
- data = data.replace("\r\n", "\n")
- if cr:
- data = data.replace("\r", "\n")
- elif self._readnl == '\n':
- # Only need to detect if \n was seen.
- if data.count('\n'):
- self._seennl |= self._LF
- else:
- newdata = data.replace(self._readnl, '\n')
- if newdata is not data:
- self._seennl |= self._nlflag(self._readnl)
- data = newdata
- return data
-
- _LF = 1
- _CR = 2
- _CRLF = 4
@property
def newlines(self):
- return (None,
- "\n",
- "\r",
- ("\r", "\n"),
- "\r\n",
- ("\n", "\r\n"),
- ("\r", "\r\n"),
- ("\r", "\n", "\r\n")
- )[self._seennl]
-
- def _nlflag(self, nlstr):
- return [None, "\n", "\r", None, "\r\n"].index(nlstr)
+ return self._decoder.newlines if self._decoder else None
class StringIO(TextIOWrapper):