summaryrefslogtreecommitdiffstats
path: root/Lib/io.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/io.py')
-rw-r--r--Lib/io.py288
1 files changed, 278 insertions, 10 deletions
diff --git a/Lib/io.py b/Lib/io.py
index 0c993b1..f543e20 100644
--- a/Lib/io.py
+++ b/Lib/io.py
@@ -1769,20 +1769,20 @@ class TextIOWrapper(TextIOBase):
def newlines(self):
return self._decoder.newlines if self._decoder else None
-class StringIO(TextIOWrapper):
- """An in-memory stream for text. The initial_value argument sets the
- value of object. The other arguments are like those of TextIOWrapper's
- constructor.
+class _StringIO(TextIOWrapper):
+ """Text I/O implementation using an in-memory buffer.
+
+ The initial_value argument sets the value of object. The newline
+ argument is like the one of TextIOWrapper's constructor.
"""
# XXX This is really slow, but fully functional
- def __init__(self, initial_value="", encoding="utf-8",
- errors="strict", newline="\n"):
- super(StringIO, self).__init__(BytesIO(),
- encoding=encoding,
- errors=errors,
- newline=newline)
+ def __init__(self, initial_value="", newline="\n"):
+ super(_StringIO, self).__init__(BytesIO(),
+ encoding="utf-8",
+ errors="strict",
+ newline=newline)
if initial_value:
if not isinstance(initial_value, str):
initial_value = str(initial_value)
@@ -1792,3 +1792,271 @@ class StringIO(TextIOWrapper):
def getvalue(self):
self.flush()
return self.buffer.getvalue().decode(self._encoding, self._errors)
+
+try:
+ import _stringio
+
+ # This subclass is a reimplementation of the TextIOWrapper
+ # interface without any of its text decoding facilities. All the
+ # stored data is manipulated with the efficient
+ # _stringio._StringIO extension type. Also, the newline decoding
+ # mechanism of IncrementalNewlineDecoder is reimplemented here for
+ # efficiency. Doing otherwise, would require us to implement a
+ # fake decoder which would add an additional and unnecessary layer
+ # on top of the _StringIO methods.
+
+ class StringIO(_stringio._StringIO, TextIOBase):
+ """Text I/O implementation using an in-memory buffer.
+
+ The initial_value argument sets the value of object. The newline
+ argument is like the one of TextIOWrapper's constructor.
+ """
+
+ _CHUNK_SIZE = 4096
+
+ def __init__(self, initial_value="", newline="\n"):
+ if newline not in (None, "", "\n", "\r", "\r\n"):
+ raise ValueError("illegal newline value: %r" % (newline,))
+
+ self._readuniversal = not newline
+ self._readtranslate = newline is None
+ self._readnl = newline
+ self._writetranslate = newline != ""
+ self._writenl = newline or os.linesep
+ self._pending = ""
+ self._seennl = 0
+
+ # Reset the buffer first, in case __init__ is called
+ # multiple times.
+ self.truncate(0)
+ if initial_value is None:
+ initial_value = ""
+ self.write(initial_value)
+ self.seek(0)
+
+ @property
+ def buffer(self):
+ raise UnsupportedOperation("%s.buffer attribute is unsupported" %
+ self.__class__.__name__)
+
+ def _decode_newlines(self, input, final=False):
+ # decode input (with the eventual \r from a previous pass)
+ if self._pending:
+ input = self._pending + input
+
+ # retain last \r even when not translating data:
+ # then readline() is sure to get \r\n in one pass
+ if input.endswith("\r") and not final:
+ input = input[:-1]
+ self._pending = "\r"
+ else:
+ self._pending = ""
+
+ # Record which newlines are read
+ crlf = input.count('\r\n')
+ cr = input.count('\r') - crlf
+ lf = input.count('\n') - crlf
+ self._seennl |= (lf and self._LF) | (cr and self._CR) \
+ | (crlf and self._CRLF)
+
+ if self._readtranslate:
+ if crlf:
+ output = input.replace("\r\n", "\n")
+ if cr:
+ output = input.replace("\r", "\n")
+ else:
+ output = input
+
+ return output
+
+ def writable(self):
+ return True
+
+ def readable(self):
+ return True
+
+ def seekable(self):
+ return True
+
+ _read = _stringio._StringIO.read
+ _write = _stringio._StringIO.write
+ _tell = _stringio._StringIO.tell
+ _seek = _stringio._StringIO.seek
+ _truncate = _stringio._StringIO.truncate
+ _getvalue = _stringio._StringIO.getvalue
+
+ def getvalue(self) -> str:
+ """Retrieve the entire contents of the object."""
+ if self.closed:
+ raise ValueError("read on closed file")
+ return self._getvalue()
+
+ def write(self, s: str) -> int:
+ """Write string s to file.
+
+ Returns the number of characters written.
+ """
+ if self.closed:
+ raise ValueError("write to closed file")
+ if not isinstance(s, str):
+ raise TypeError("can't write %s to text stream" %
+ s.__class__.__name__)
+ length = len(s)
+ if self._writetranslate and self._writenl != "\n":
+ s = s.replace("\n", self._writenl)
+ self._pending = ""
+ self._write(s)
+ return length
+
+ def read(self, n: int = None) -> str:
+ """Read at most n characters, returned as a string.
+
+ If the argument is negative or omitted, read until EOF
+ is reached. Return an empty string at EOF.
+ """
+ if self.closed:
+ raise ValueError("read to closed file")
+ if n is None:
+ n = -1
+ res = self._pending
+ if n < 0:
+ res += self._decode_newlines(self._read(), True)
+ self._pending = ""
+ return res
+ else:
+ res = self._decode_newlines(self._read(n), True)
+ self._pending = res[n:]
+ return res[:n]
+
+ def tell(self) -> int:
+ """Tell the current file position."""
+ if self.closed:
+ raise ValueError("tell from closed file")
+ if self._pending:
+ return self._tell() - len(self._pending)
+ else:
+ return self._tell()
+
+ def seek(self, pos: int = None, whence: int = 0) -> int:
+ """Change stream position.
+
+ Seek to character offset pos relative to position indicated by whence:
+ 0 Start of stream (the default). pos should be >= 0;
+ 1 Current position - pos must be 0;
+ 2 End of stream - pos must be 0.
+ Returns the new absolute position.
+ """
+ if self.closed:
+ raise ValueError("seek from closed file")
+ self._pending = ""
+ return self._seek(pos, whence)
+
+ def truncate(self, pos: int = None) -> int:
+ """Truncate size to pos.
+
+ The pos argument defaults to the current file position, as
+ returned by tell(). Imply an absolute seek to pos.
+ Returns the new absolute position.
+ """
+ if self.closed:
+ raise ValueError("truncate from closed file")
+ self._pending = ""
+ return self._truncate(pos)
+
+ def readline(self, limit: int = None) -> str:
+ if self.closed:
+ raise ValueError("read from closed file")
+ if limit is None:
+ limit = -1
+ if limit >= 0:
+ # XXX: Hack to support limit argument, for backwards
+ # XXX compatibility
+ line = self.readline()
+ if len(line) <= limit:
+ return line
+ line, self._pending = line[:limit], line[limit:] + self._pending
+ return line
+
+ line = self._pending
+ self._pending = ""
+
+ start = 0
+ pos = endpos = None
+ while True:
+ if self._readtranslate:
+ # Newlines are already translated, only search for \n
+ pos = line.find('\n', start)
+ if pos >= 0:
+ endpos = pos + 1
+ break
+ else:
+ start = len(line)
+
+ elif self._readuniversal:
+ # Universal newline search. Find any of \r, \r\n, \n
+ # The decoder ensures that \r\n are not split in two pieces
+
+ # In C we'd look for these in parallel of course.
+ nlpos = line.find("\n", start)
+ crpos = line.find("\r", start)
+ if crpos == -1:
+ if nlpos == -1:
+ # Nothing found
+ start = len(line)
+ else:
+ # Found \n
+ endpos = nlpos + 1
+ break
+ elif nlpos == -1:
+ # Found lone \r
+ endpos = crpos + 1
+ break
+ elif nlpos < crpos:
+ # Found \n
+ endpos = nlpos + 1
+ break
+ elif nlpos == crpos + 1:
+ # Found \r\n
+ endpos = crpos + 2
+ break
+ else:
+ # Found \r
+ endpos = crpos + 1
+ break
+ else:
+ # non-universal
+ pos = line.find(self._readnl)
+ if pos >= 0:
+ endpos = pos + len(self._readnl)
+ break
+
+ # No line ending seen yet - get more data
+ more_line = self.read(self._CHUNK_SIZE)
+ if more_line:
+ line += more_line
+ else:
+ # end of file
+ return line
+
+ self._pending = line[endpos:]
+ return line[:endpos]
+
+ _LF = 1
+ _CR = 2
+ _CRLF = 4
+
+ @property
+ def newlines(self):
+ return (None,
+ "\n",
+ "\r",
+ ("\r", "\n"),
+ "\r\n",
+ ("\n", "\r\n"),
+ ("\r", "\r\n"),
+ ("\r", "\n", "\r\n")
+ )[self._seennl]
+
+
+except ImportError:
+ StringIO = _StringIO