summaryrefslogtreecommitdiffstats
path: root/Lib
diff options
context:
space:
mode:
authorGuido van Rossum <guido@python.org>2007-02-27 05:47:44 (GMT)
committerGuido van Rossum <guido@python.org>2007-02-27 05:47:44 (GMT)
commit28524c7f10beb2821cae57a165ef920abbf76b77 (patch)
treecdfa308617e0a0b2ce2ee983859aa8d1144e1a32 /Lib
parenteff072c66c23b287d2fa77e2ef9114cb97d988dd (diff)
downloadcpython-28524c7f10beb2821cae57a165ef920abbf76b77.zip
cpython-28524c7f10beb2821cae57a165ef920abbf76b77.tar.gz
cpython-28524c7f10beb2821cae57a165ef920abbf76b77.tar.bz2
Checkpoint for new I/O library.
Diffstat (limited to 'Lib')
-rw-r--r--Lib/io.py264
-rw-r--r--Lib/test/test_io.py62
2 files changed, 326 insertions, 0 deletions
diff --git a/Lib/io.py b/Lib/io.py
new file mode 100644
index 0000000..c714c6b
--- /dev/null
+++ b/Lib/io.py
@@ -0,0 +1,264 @@
+# Copyright 2006 Google, Inc. All Rights Reserved.
+# Licensed to PSF under a Contributor Agreement.
+
+"""New I/O library.
+
+See PEP XXX; for now: http://docs.google.com/Doc?id=dfksfvqd_1cn5g5m
+"""
+
+__author__ = "Guido van Rossum <guido@python.org>"
+
+__all__ = ["open", "RawIOBase", "FileIO", "SocketIO", "BytesIO"]
+
+import os
+
+def open(filename, mode="r", buffering=None, *, encoding=None):
+ """Replacement for the built-in open function, with encoding parameter."""
+ assert isinstance(filename, str)
+ assert isinstance(mode, str)
+ assert buffering is None or isinstance(buffering, int)
+ assert encoding is None or isinstance(encoding, str)
+ modes = set(mode)
+ if modes - set("arwb+t") or len(mode) > len(modes):
+ raise ValueError("invalid mode: %r" % mode)
+ reading = "r" in modes
+ writing = "w" in modes or "a" in modes
+ binary = "b" in modes
+ appending = "a" in modes
+ updating = "+" in modes
+ text = "t" in modes or not binary
+ if text and binary:
+ raise ValueError("can't have text and binary mode at once")
+ if reading + writing + appending > 1:
+ raise ValueError("can't have read/write/append mode at once")
+ if not (reading or writing or appending):
+ raise ValueError("must have exactly one of read/write/append mode")
+ if binary and encoding is not None:
+ raise ValueError("binary mode doesn't take an encoding")
+ raw = FileIO(filename,
+ (reading and "r" or "") +
+ (writing and "w" or "") +
+ (appending and "a" or "") +
+ (updating and "+" or ""))
+ if buffering is None:
+ buffering = 8*1024 # International standard buffer size
+ if buffering < 0:
+ raise ValueError("invalid buffering size")
+ if buffering == 0:
+ if binary:
+ return raw
+ raise ValueError("can't have unbuffered text I/O")
+ if updating:
+ buffer = BufferedRandom(raw, buffering)
+ elif writing:
+ buffer = BufferedWriter(raw, buffering)
+ else:
+ assert reading
+ buffer = BufferedReader(raw, buffering)
+ if binary:
+ return buffer
+ assert text
+ textio = TextIOWrapper(buffer) # Universal newlines default to on
+ return textio
+
+
+class RawIOBase:
+
+ """Base class for raw binary I/O."""
+
+ def read(self, n):
+ b = bytes(n.__index__())
+ self.readinto(b)
+ return b
+
+ def readinto(self, b):
+ raise IOError(".readinto() not supported")
+
+ def write(self, b):
+ raise IOError(".write() not supported")
+
+ def seek(self, pos, whence=0):
+ raise IOError(".seek() not supported")
+
+ def tell(self):
+ raise IOError(".tell() not supported")
+
+ def truncate(self, pos=None):
+ raise IOError(".truncate() not supported")
+
+ def close(self):
+ pass
+
+ def seekable(self):
+ return False
+
+ def readable(self):
+ return False
+
+ def writable(self):
+ return False
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, *args):
+ self.close()
+
+ def fileno(self):
+ raise IOError(".fileno() not supported")
+
+
+class FileIO(RawIOBase):
+
+ """Raw I/O implementation for OS files."""
+
+ def __init__(self, filename, mode):
+ self._seekable = None
+ self._mode = mode
+ if mode == "r":
+ flags = os.O_RDONLY
+ elif mode == "w":
+ flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
+ self._writable = True
+ elif mode == "r+":
+ flags = os.O_RDWR
+ else:
+ assert 0, "unsupported mode %r (for now)" % mode
+ if hasattr(os, "O_BINARY"):
+ flags |= os.O_BINARY
+ self._fd = os.open(filename, flags)
+
+ def readinto(self, b):
+ # XXX We really should have os.readinto()
+ b[:] = os.read(self._fd, len(b))
+ return len(b)
+
+ def write(self, b):
+ return os.write(self._fd, b)
+
+ def seek(self, pos, whence=0):
+ os.lseek(self._fd, pos, whence)
+
+ def tell(self):
+ return os.lseek(self._fd, 0, 1)
+
+ def truncate(self, pos=None):
+ if pos is None:
+ pos = self.tell()
+ os.ftruncate(self._fd, pos)
+
+ def close(self):
+ os.close(self._fd)
+
+ def readable(self):
+ return "r" in self._mode or "+" in self._mode
+
+ def writable(self):
+ return "w" in self._mode or "+" in self._mode or "a" in self._mode
+
+ def seekable(self):
+ if self._seekable is None:
+ try:
+ os.lseek(self._fd, 0, 1)
+ except os.error:
+ self._seekable = False
+ else:
+ self._seekable = True
+ return self._seekable
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, *args):
+ self.close()
+
+ def fileno(self):
+ return self._fd
+
+
+class SocketIO(RawIOBase):
+
+ """Raw I/O implementation for stream sockets."""
+
+ def __init__(self, sock, mode):
+ assert mode in ("r", "w", "rw")
+ self._sock = sock
+ self._mode = mode
+ self._readable = "r" in mode
+ self._writable = "w" in mode
+ self._seekable = False
+
+ def readinto(self, b):
+ return self._sock.recv_into(b)
+
+ def write(self, b):
+ return self._sock.send(b)
+
+ def close(self):
+ self._sock.close()
+
+ def readable(self):
+ return "r" in self._mode
+
+ def writable(self):
+ return "w" in self._mode
+
+
+class BytesIO(RawIOBase):
+
+ """Raw I/O implementation for bytes, like StringIO."""
+
+ def __init__(self, inital_bytes=None):
+ self._buffer = b""
+ self._pos = 0
+ if inital_bytes is not None:
+ self._buffer += inital_bytes
+
+ def getvalue(self):
+ return self._buffer
+
+ def read(self, n):
+ assert n >= 0
+ newpos = min(len(self._buffer), self._pos + n)
+ b = self._buffer[self._pos : newpos]
+ self._pos = newpos
+ return b
+
+ def readinto(self, b):
+ b[:] = self.read(len(b))
+
+ def write(self, b):
+ n = len(b)
+ newpos = self._pos + n
+ self._buffer[self._pos:newpos] = b
+ self._pos = newpos
+ return n
+
+ def seek(self, pos, whence=0):
+ if whence == 0:
+ self._pos = max(0, pos)
+ elif whence == 1:
+ self._pos = max(0, self._pos + pos)
+ elif whence == 2:
+ self._pos = max(0, len(self._buffer) + pos)
+ else:
+ raise IOError("invalid whence value")
+
+ def tell(self):
+ return self._pos
+
+ def truncate(self, pos=None):
+ if pos is None:
+ pos = self._pos
+ else:
+ self._pos = max(0, pos)
+ del self._buffer[pos:]
+
+ def readable(self):
+ return True
+
+ def writable(self):
+ return True
+
+ def seekable(self):
+ return True
diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py
new file mode 100644
index 0000000..0e3b03d
--- /dev/null
+++ b/Lib/test/test_io.py
@@ -0,0 +1,62 @@
+import unittest
+from test import test_support
+
+import io
+
+class IOTest(unittest.TestCase):
+
+ def write_ops(self, f):
+ f.write(b"blah.")
+ f.seek(0)
+ f.write(b"Hello.")
+ self.assertEqual(f.tell(), 6)
+ f.seek(-1, 1)
+ self.assertEqual(f.tell(), 5)
+ f.write(" world\n\n\n")
+ f.seek(0)
+ f.write("h")
+ f.seek(-2, 2)
+ f.truncate()
+
+ def read_ops(self, f):
+ data = f.read(5)
+ self.assertEqual(data, b"hello")
+ f.readinto(data)
+ self.assertEqual(data, b" worl")
+ f.readinto(data)
+ self.assertEqual(data, b"d\n")
+ f.seek(0)
+ self.assertEqual(f.read(20), b"hello world\n")
+ f.seek(-6, 2)
+ self.assertEqual(f.read(5), b"world")
+ f.seek(-6, 1)
+ self.assertEqual(f.read(5), b" worl")
+ self.assertEqual(f.tell(), 10)
+
+ def test_raw_file_io(self):
+ f = io.open(test_support.TESTFN, "wb", buffering=0)
+ self.assertEqual(f.readable(), False)
+ self.assertEqual(f.writable(), True)
+ self.assertEqual(f.seekable(), True)
+ self.write_ops(f)
+ f.close()
+ f = io.open(test_support.TESTFN, "rb", buffering=0)
+ self.assertEqual(f.readable(), True)
+ self.assertEqual(f.writable(), False)
+ self.assertEqual(f.seekable(), True)
+ self.read_ops(f)
+ f.close()
+
+ def test_raw_bytes_io(self):
+ f = io.BytesIO()
+ self.write_ops(f)
+ data = f.getvalue()
+ self.assertEqual(data, b"hello world\n")
+ f = io.BytesIO(data)
+ self.read_ops(f)
+
+def test_main():
+ test_support.run_unittest(IOTest)
+
+if __name__ == "__main__":
+ test_main()