diff options
Diffstat (limited to 'Lib/io.py')
-rw-r--r-- | Lib/io.py | 288 |
1 files changed, 278 insertions, 10 deletions
@@ -1769,20 +1769,20 @@ class TextIOWrapper(TextIOBase): def newlines(self): return self._decoder.newlines if self._decoder else None -class StringIO(TextIOWrapper): - """An in-memory stream for text. The initial_value argument sets the - value of object. The other arguments are like those of TextIOWrapper's - constructor. +class _StringIO(TextIOWrapper): + """Text I/O implementation using an in-memory buffer. + + The initial_value argument sets the value of object. The newline + argument is like the one of TextIOWrapper's constructor. """ # XXX This is really slow, but fully functional - def __init__(self, initial_value="", encoding="utf-8", - errors="strict", newline="\n"): - super(StringIO, self).__init__(BytesIO(), - encoding=encoding, - errors=errors, - newline=newline) + def __init__(self, initial_value="", newline="\n"): + super(_StringIO, self).__init__(BytesIO(), + encoding="utf-8", + errors="strict", + newline=newline) if initial_value: if not isinstance(initial_value, str): initial_value = str(initial_value) @@ -1792,3 +1792,271 @@ class StringIO(TextIOWrapper): def getvalue(self): self.flush() return self.buffer.getvalue().decode(self._encoding, self._errors) + +try: + import _stringio + + # This subclass is a reimplementation of the TextIOWrapper + # interface without any of its text decoding facilities. All the + # stored data is manipulated with the efficient + # _stringio._StringIO extension type. Also, the newline decoding + # mechanism of IncrementalNewlineDecoder is reimplemented here for + # efficiency. Doing otherwise, would require us to implement a + # fake decoder which would add an additional and unnecessary layer + # on top of the _StringIO methods. + + class StringIO(_stringio._StringIO, TextIOBase): + """Text I/O implementation using an in-memory buffer. + + The initial_value argument sets the value of object. The newline + argument is like the one of TextIOWrapper's constructor. + """ + + _CHUNK_SIZE = 4096 + + def __init__(self, initial_value="", newline="\n"): + if newline not in (None, "", "\n", "\r", "\r\n"): + raise ValueError("illegal newline value: %r" % (newline,)) + + self._readuniversal = not newline + self._readtranslate = newline is None + self._readnl = newline + self._writetranslate = newline != "" + self._writenl = newline or os.linesep + self._pending = "" + self._seennl = 0 + + # Reset the buffer first, in case __init__ is called + # multiple times. + self.truncate(0) + if initial_value is None: + initial_value = "" + self.write(initial_value) + self.seek(0) + + @property + def buffer(self): + raise UnsupportedOperation("%s.buffer attribute is unsupported" % + self.__class__.__name__) + + def _decode_newlines(self, input, final=False): + # decode input (with the eventual \r from a previous pass) + if self._pending: + input = self._pending + input + + # retain last \r even when not translating data: + # then readline() is sure to get \r\n in one pass + if input.endswith("\r") and not final: + input = input[:-1] + self._pending = "\r" + else: + self._pending = "" + + # Record which newlines are read + crlf = input.count('\r\n') + cr = input.count('\r') - crlf + lf = input.count('\n') - crlf + self._seennl |= (lf and self._LF) | (cr and self._CR) \ + | (crlf and self._CRLF) + + if self._readtranslate: + if crlf: + output = input.replace("\r\n", "\n") + if cr: + output = input.replace("\r", "\n") + else: + output = input + + return output + + def writable(self): + return True + + def readable(self): + return True + + def seekable(self): + return True + + _read = _stringio._StringIO.read + _write = _stringio._StringIO.write + _tell = _stringio._StringIO.tell + _seek = _stringio._StringIO.seek + _truncate = _stringio._StringIO.truncate + _getvalue = _stringio._StringIO.getvalue + + def getvalue(self) -> str: + """Retrieve the entire contents of the object.""" + if self.closed: + raise ValueError("read on closed file") + return self._getvalue() + + def write(self, s: str) -> int: + """Write string s to file. + + Returns the number of characters written. + """ + if self.closed: + raise ValueError("write to closed file") + if not isinstance(s, str): + raise TypeError("can't write %s to text stream" % + s.__class__.__name__) + length = len(s) + if self._writetranslate and self._writenl != "\n": + s = s.replace("\n", self._writenl) + self._pending = "" + self._write(s) + return length + + def read(self, n: int = None) -> str: + """Read at most n characters, returned as a string. + + If the argument is negative or omitted, read until EOF + is reached. Return an empty string at EOF. + """ + if self.closed: + raise ValueError("read to closed file") + if n is None: + n = -1 + res = self._pending + if n < 0: + res += self._decode_newlines(self._read(), True) + self._pending = "" + return res + else: + res = self._decode_newlines(self._read(n), True) + self._pending = res[n:] + return res[:n] + + def tell(self) -> int: + """Tell the current file position.""" + if self.closed: + raise ValueError("tell from closed file") + if self._pending: + return self._tell() - len(self._pending) + else: + return self._tell() + + def seek(self, pos: int = None, whence: int = 0) -> int: + """Change stream position. + + Seek to character offset pos relative to position indicated by whence: + 0 Start of stream (the default). pos should be >= 0; + 1 Current position - pos must be 0; + 2 End of stream - pos must be 0. + Returns the new absolute position. + """ + if self.closed: + raise ValueError("seek from closed file") + self._pending = "" + return self._seek(pos, whence) + + def truncate(self, pos: int = None) -> int: + """Truncate size to pos. + + The pos argument defaults to the current file position, as + returned by tell(). Imply an absolute seek to pos. + Returns the new absolute position. + """ + if self.closed: + raise ValueError("truncate from closed file") + self._pending = "" + return self._truncate(pos) + + def readline(self, limit: int = None) -> str: + if self.closed: + raise ValueError("read from closed file") + if limit is None: + limit = -1 + if limit >= 0: + # XXX: Hack to support limit argument, for backwards + # XXX compatibility + line = self.readline() + if len(line) <= limit: + return line + line, self._pending = line[:limit], line[limit:] + self._pending + return line + + line = self._pending + self._pending = "" + + start = 0 + pos = endpos = None + while True: + if self._readtranslate: + # Newlines are already translated, only search for \n + pos = line.find('\n', start) + if pos >= 0: + endpos = pos + 1 + break + else: + start = len(line) + + elif self._readuniversal: + # Universal newline search. Find any of \r, \r\n, \n + # The decoder ensures that \r\n are not split in two pieces + + # In C we'd look for these in parallel of course. + nlpos = line.find("\n", start) + crpos = line.find("\r", start) + if crpos == -1: + if nlpos == -1: + # Nothing found + start = len(line) + else: + # Found \n + endpos = nlpos + 1 + break + elif nlpos == -1: + # Found lone \r + endpos = crpos + 1 + break + elif nlpos < crpos: + # Found \n + endpos = nlpos + 1 + break + elif nlpos == crpos + 1: + # Found \r\n + endpos = crpos + 2 + break + else: + # Found \r + endpos = crpos + 1 + break + else: + # non-universal + pos = line.find(self._readnl) + if pos >= 0: + endpos = pos + len(self._readnl) + break + + # No line ending seen yet - get more data + more_line = self.read(self._CHUNK_SIZE) + if more_line: + line += more_line + else: + # end of file + return line + + self._pending = line[endpos:] + return line[:endpos] + + _LF = 1 + _CR = 2 + _CRLF = 4 + + @property + def newlines(self): + return (None, + "\n", + "\r", + ("\r", "\n"), + "\r\n", + ("\n", "\r\n"), + ("\r", "\r\n"), + ("\r", "\n", "\r\n") + )[self._seennl] + + +except ImportError: + StringIO = _StringIO |