diff options
Diffstat (limited to 'Lib')
-rw-r--r-- | Lib/_pyio.py | 21 | ||||
-rw-r--r-- | Lib/os.py | 1 | ||||
-rw-r--r-- | Lib/test/test_io.py | 2 | ||||
-rw-r--r-- | Lib/test/test_posix.py | 20 |
4 files changed, 35 insertions, 9 deletions
diff --git a/Lib/_pyio.py b/Lib/_pyio.py index b684a9f..c06f4b8 100644 --- a/Lib/_pyio.py +++ b/Lib/_pyio.py @@ -16,6 +16,11 @@ except ImportError: import io from io import (__all__, SEEK_SET, SEEK_CUR, SEEK_END) +valid_seek_flags = {0, 1, 2} # Hardwired values +if hasattr(os, 'SEEK_HOLE') : + valid_seek_flags.add(os.SEEK_HOLE) + valid_seek_flags.add(os.SEEK_DATA) + # open() uses st_blksize whenever we can DEFAULT_BUFFER_SIZE = 8 * 1024 # bytes @@ -306,6 +311,7 @@ class IOBase(metaclass=abc.ABCMeta): * 0 -- start of stream (the default); offset should be zero or positive * 1 -- current stream position; offset may be negative * 2 -- end of stream; offset is usually negative + Some operating systems / file systems could provide additional values. Return an int indicating the new absolute position. """ @@ -866,7 +872,7 @@ class BytesIO(BufferedIOBase): elif whence == 2: self._pos = max(0, len(self._buffer) + pos) else: - raise ValueError("invalid whence value") + raise ValueError("unsupported whence value") return self._pos def tell(self): @@ -1041,7 +1047,7 @@ class BufferedReader(_BufferedIOMixin): return _BufferedIOMixin.tell(self) - len(self._read_buf) + self._read_pos def seek(self, pos, whence=0): - if not (0 <= whence <= 2): + if whence not in valid_seek_flags: raise ValueError("invalid whence value") with self._read_lock: if whence == 1: @@ -1138,8 +1144,8 @@ class BufferedWriter(_BufferedIOMixin): return _BufferedIOMixin.tell(self) + len(self._write_buf) def seek(self, pos, whence=0): - if not (0 <= whence <= 2): - raise ValueError("invalid whence") + if whence not in valid_seek_flags: + raise ValueError("invalid whence value") with self._write_lock: self._flush_unlocked() return _BufferedIOMixin.seek(self, pos, whence) @@ -1235,8 +1241,8 @@ class BufferedRandom(BufferedWriter, BufferedReader): BufferedWriter.__init__(self, raw, buffer_size, max_buffer_size) def seek(self, pos, whence=0): - if not (0 <= whence <= 2): - raise ValueError("invalid whence") + if whence not in valid_seek_flags: + raise ValueError("invalid whence value") self.flush() if self._read_buf: # Undo read ahead. @@ -1852,8 +1858,7 @@ class TextIOWrapper(TextIOBase): self._decoder.reset() return position if whence != 0: - raise ValueError("invalid whence (%r, should be 0, 1 or 2)" % - (whence,)) + raise ValueError("unsupported whence (%r)" % (whence,)) if cookie < 0: raise ValueError("negative seek position %r" % (cookie,)) self.flush() @@ -121,6 +121,7 @@ del _names # Python uses fixed values for the SEEK_ constants; they are mapped # to native constants if necessary in posixmodule.c +# Other possible SEEK values are directly imported from posixmodule.c SEEK_SET = 0 SEEK_CUR = 1 SEEK_END = 2 diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py index 1951a06..54ba179 100644 --- a/Lib/test/test_io.py +++ b/Lib/test/test_io.py @@ -706,7 +706,7 @@ class CommonBufferedTests: bufio = self.tp(rawio) # Invalid whence self.assertRaises(ValueError, bufio.seek, 0, -1) - self.assertRaises(ValueError, bufio.seek, 0, 3) + self.assertRaises(ValueError, bufio.seek, 0, 9) def test_override_destructor(self): tp = self.tp diff --git a/Lib/test/test_posix.py b/Lib/test/test_posix.py index 21473fc..ae46866 100644 --- a/Lib/test/test_posix.py +++ b/Lib/test/test_posix.py @@ -1015,6 +1015,26 @@ class PosixTester(unittest.TestCase): posix.RTLD_GLOBAL posix.RTLD_LOCAL + @unittest.skipUnless(hasattr(os, 'SEEK_HOLE'), + "test needs an OS that reports file holes") + def test_fs_holes(self) : + # Even if the filesystem doesn't report holes, + # if the OS supports it the SEEK_* constants + # will be defined and will have a consistent + # behaviour: + # os.SEEK_DATA = current position + # os.SEEK_HOLE = end of file position + with open(support.TESTFN, 'r+b') as fp : + fp.write(b"hello") + fp.flush() + size = fp.tell() + fno = fp.fileno() + for i in range(size) : + self.assertEqual(i, os.lseek(fno, i, os.SEEK_DATA)) + self.assertLessEqual(size, os.lseek(fno, i, os.SEEK_HOLE)) + self.assertRaises(OSError, os.lseek, fno, size, os.SEEK_DATA) + self.assertRaises(OSError, os.lseek, fno, size, os.SEEK_HOLE) + class PosixGroupsTester(unittest.TestCase): def setUp(self): |