diff options
author | Guido van Rossum <guido@python.org> | 2007-04-10 21:06:59 (GMT) |
---|---|---|
committer | Guido van Rossum <guido@python.org> | 2007-04-10 21:06:59 (GMT) |
commit | 8742977b33f2af9b92265c1b332af0f6798bf2b6 (patch) | |
tree | 938cba4969aa85a38eaaf096b33c3a743b1bda2a | |
parent | 34d69e57e31763bdfcff77dc3689ec42a5228814 (diff) | |
download | cpython-8742977b33f2af9b92265c1b332af0f6798bf2b6.zip cpython-8742977b33f2af9b92265c1b332af0f6798bf2b6.tar.gz cpython-8742977b33f2af9b92265c1b332af0f6798bf2b6.tar.bz2 |
truncate() returns the new size and position.
write() returns the number of bytes/characters written/buffered.
FileIO.close() calls self.flush().
Implement readinto() for buffered readers.
Tests th check all these.
Test proper behavior of __enter__/__exit__.
-rw-r--r-- | Lib/io.py | 25 | ||||
-rw-r--r-- | Lib/test/test_io.py | 83 | ||||
-rw-r--r-- | Modules/_fileio.c | 14 |
3 files changed, 95 insertions, 27 deletions
@@ -177,10 +177,11 @@ class IOBase: """tell() -> int. Return current stream position.""" return self.seek(0, 1) - def truncate(self, pos: int = None) -> None: - """truncate(size: int = None) -> None. Truncate file to size bytes. + def truncate(self, pos: int = None) -> int: + """truncate(size: int = None) -> int. Truncate file to size bytes. Size defaults to the current IO position as reported by tell(). + Returns the new size. """ self._unsupported("truncate") @@ -329,6 +330,10 @@ class FileIO(_fileio._FileIO, RawIOBase): would be hard to do since _fileio.c is written in C). """ + def close(self): + _fileio._FileIO.close(self) + RawIOBase.close(self) + class SocketIO(RawIOBase): @@ -413,7 +418,10 @@ class BufferedIOBase(IOBase): Raises BlockingIOError if the underlying raw stream has no data at the moment. """ - self._unsupported("readinto") + data = self.read(len(b)) + n = len(data) + b[:n] = data + return n def write(self, b: bytes) -> int: """write(b: bytes) -> int. Write the given buffer to the IO stream. @@ -448,7 +456,7 @@ class _BufferedIOMixin(BufferedIOBase): return self.raw.tell() def truncate(self, pos=None): - self.raw.truncate(pos) + return self.raw.truncate(pos) ### Flush and close ### @@ -503,12 +511,6 @@ class _MemoryIOMixin(BufferedIOBase): self._pos = newpos return b - def readinto(self, b): - tmp = self.read(len(b)) - n = len(tmp) - b[:n] = tmp - return n - def write(self, b): n = len(b) newpos = self._pos + n @@ -536,6 +538,7 @@ class _MemoryIOMixin(BufferedIOBase): else: self._pos = max(0, pos) del self._buffer[pos:] + return pos def readable(self): return True @@ -652,7 +655,6 @@ class BufferedWriter(_BufferedIOMixin): def write(self, b): # XXX we can implement some more tricks to try and avoid partial writes - ##assert issubclass(type(b), bytes) if len(self._write_buf) > self.buffer_size: # We're full, so let's pre-flush the buffer try: @@ -672,6 +674,7 @@ class BufferedWriter(_BufferedIOMixin): overage = len(self._write_buf) - self.max_buffer_size self._write_buf = self._write_buf[:self.max_buffer_size] raise BlockingIOError(e.errno, e.strerror, overage) + return len(b) def flush(self): written = 0 diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py index 0a37fc2..27fd56f 100644 --- a/Lib/test/test_io.py +++ b/Lib/test/test_io.py @@ -79,17 +79,19 @@ class IOTest(unittest.TestCase): test_support.unlink(test_support.TESTFN) def write_ops(self, f): - f.write(b"blah.") - f.seek(0) - f.write(b"Hello.") + self.assertEqual(f.write(b"blah."), 5) + self.assertEqual(f.seek(0), 0) + self.assertEqual(f.write(b"Hello."), 6) self.assertEqual(f.tell(), 6) - f.seek(-1, 1) + self.assertEqual(f.seek(-1, 1), 5) self.assertEqual(f.tell(), 5) - f.write(" world\n\n\n") - f.seek(0) - f.write("h") - f.seek(-2, 2) - f.truncate() + self.assertEqual(f.write(" world\n\n\n"), 9) + self.assertEqual(f.seek(0), 0) + self.assertEqual(f.write("h"), 1) + self.assertEqual(f.seek(-1, 2), 13) + self.assertEqual(f.tell(), 13) + self.assertEqual(f.truncate(12), 12) + self.assertEqual(f.tell(), 12) LARGE = 2**31 @@ -101,10 +103,10 @@ class IOTest(unittest.TestCase): self.assertEqual(f.write(b"xxx"), 3) self.assertEqual(f.tell(), self.LARGE + 3) self.assertEqual(f.seek(-1, 1), self.LARGE + 2) - f.truncate() + self.assertEqual(f.truncate(), self.LARGE + 2) self.assertEqual(f.tell(), self.LARGE + 2) self.assertEqual(f.seek(0, 2), self.LARGE + 2) - f.truncate(self.LARGE + 1) + self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1) self.assertEqual(f.tell(), self.LARGE + 1) self.assertEqual(f.seek(0, 2), self.LARGE + 1) self.assertEqual(f.seek(-1, 2), self.LARGE) @@ -142,6 +144,20 @@ class IOTest(unittest.TestCase): self.read_ops(f) f.close() + def test_buffered_file_io(self): + f = io.open(test_support.TESTFN, "wb") + self.assertEqual(f.readable(), False) + self.assertEqual(f.writable(), True) + self.assertEqual(f.seekable(), True) + self.write_ops(f) + f.close() + f = io.open(test_support.TESTFN, "rb") + self.assertEqual(f.readable(), True) + self.assertEqual(f.writable(), False) + self.assertEqual(f.seekable(), True) + self.read_ops(f) + f.close() + def test_raw_bytes_io(self): f = io.BytesIO() self.write_ops(f) @@ -163,9 +179,52 @@ class IOTest(unittest.TestCase): print("Use 'regrtest.py -u largefile test_io' to run it.", file=sys.stderr) return - f = io.open(test_support.TESTFN, "w+b", buffering=0) + f = io.open(test_support.TESTFN, "w+b", 0) self.large_file_ops(f) f.close() + f = io.open(test_support.TESTFN, "w+b") + self.large_file_ops(f) + f.close() + + def test_with_open(self): + for bufsize in (0, 1, 100): + f = None + with open(test_support.TESTFN, "wb", bufsize) as f: + f.write("xxx") + self.assertEqual(f.closed, True) + f = None + try: + with open(test_support.TESTFN, "wb", bufsize) as f: + 1/0 + except ZeroDivisionError: + self.assertEqual(f.closed, True) + else: + self.fail("1/0 didn't raise an exception") + + def test_destructor(self): + record = [] + class MyFileIO(io.FileIO): + def __del__(self): + record.append(1) + io.FileIO.__del__(self) + def close(self): + record.append(2) + io.FileIO.close(self) + def flush(self): + record.append(3) + io.FileIO.flush(self) + f = MyFileIO(test_support.TESTFN, "w") + f.write("xxx") + del f + self.assertEqual(record, [1, 2, 3]) + + def test_close_flushes(self): + f = io.open(test_support.TESTFN, "wb") + f.write("xxx") + f.close() + f = io.open(test_support.TESTFN, "rb") + self.assertEqual(f.read(), b"xxx") + f.close() class MemorySeekTestMixin: diff --git a/Modules/_fileio.c b/Modules/_fileio.c index 985fa81..97b2199 100644 --- a/Modules/_fileio.c +++ b/Modules/_fileio.c @@ -530,6 +530,9 @@ fileio_truncate(PyFileIOObject *self, PyObject *args) if (!PyArg_ParseTuple(args, "|O", &posobj)) return NULL; + if (posobj == Py_None) + posobj = NULL; + if (posobj == NULL) whence = 1; else @@ -545,19 +548,22 @@ fileio_truncate(PyFileIOObject *self, PyObject *args) pos = PyLong_Check(posobj) ? PyLong_AsLongLong(posobj) : PyInt_AsLong(posobj); #endif - Py_DECREF(posobj); - if (PyErr_Occurred()) + if (PyErr_Occurred()) { + Py_DECREF(posobj); return NULL; + } Py_BEGIN_ALLOW_THREADS errno = 0; pos = ftruncate(fd, pos); Py_END_ALLOW_THREADS - if (errno < 0) + if (pos < 0) { + Py_DECREF(posobj); PyErr_SetFromErrno(PyExc_IOError); + } - Py_RETURN_NONE; + return posobj; } static char * |