diff options
Diffstat (limited to 'Lib/io.py')
-rw-r--r-- | Lib/io.py | 102 |
1 file changed, 63 insertions, 39 deletions
--- a/Lib/io.py
+++ b/Lib/io.py
@@ -63,6 +63,7 @@ import sys
 import codecs
 import _fileio
 import warnings
+import threading

 # open() uses st_blksize whenever we can
 DEFAULT_BUFFER_SIZE = 8 * 1024  # bytes
@@ -908,6 +909,7 @@ class BufferedReader(_BufferedIOMixin):
         _BufferedIOMixin.__init__(self, raw)
         self.buffer_size = buffer_size
         self._reset_read_buf()
+        self._read_lock = threading.Lock()

     def _reset_read_buf(self):
         self._read_buf = b""
@@ -921,6 +923,10 @@ class BufferedReader(_BufferedIOMixin):
         mode. If n is negative, read until EOF or until read() would
         block.
         """
+        with self._read_lock:
+            return self._read_unlocked(n)
+
+    def _read_unlocked(self, n=None):
         nodata_val = b""
         empty_values = (b"", None)
         buf = self._read_buf
@@ -973,6 +979,10 @@
         do at most one raw read to satisfy it. We never return more
         than self.buffer_size.
         """
+        with self._read_lock:
+            return self._peek_unlocked(n)
+
+    def _peek_unlocked(self, n=0):
         want = min(n, self.buffer_size)
         have = len(self._read_buf) - self._read_pos
         if have < want:
@@ -989,18 +999,21 @@
         # only return buffered bytes. Otherwise, we do one raw read.
         if n <= 0:
             return b""
-        self.peek(1)
-        return self.read(min(n, len(self._read_buf) - self._read_pos))
+        with self._read_lock:
+            self._peek_unlocked(1)
+            return self._read_unlocked(
+                min(n, len(self._read_buf) - self._read_pos))

     def tell(self):
         return self.raw.tell() - len(self._read_buf) + self._read_pos

     def seek(self, pos, whence=0):
-        if whence == 1:
-            pos -= len(self._read_buf) - self._read_pos
-        pos = self.raw.seek(pos, whence)
-        self._reset_read_buf()
-        return pos
+        with self._read_lock:
+            if whence == 1:
+                pos -= len(self._read_buf) - self._read_pos
+            pos = self.raw.seek(pos, whence)
+            self._reset_read_buf()
+            return pos


 class BufferedWriter(_BufferedIOMixin):
@@ -1022,43 +1035,51 @@
                                 if max_buffer_size is None
                                 else max_buffer_size)
         self._write_buf = bytearray()
+        self._write_lock = threading.Lock()

     def write(self, b):
         if self.closed:
             raise ValueError("write to closed file")
         if isinstance(b, unicode):
             raise TypeError("can't write unicode to binary stream")
-        # XXX we can implement some more tricks to try and avoid partial writes
-        if len(self._write_buf) > self.buffer_size:
-            # We're full, so let's pre-flush the buffer
-            try:
-                self.flush()
-            except BlockingIOError as e:
-                # We can't accept anything else.
-                # XXX Why not just let the exception pass through?
-                raise BlockingIOError(e.errno, e.strerror, 0)
-        before = len(self._write_buf)
-        self._write_buf.extend(b)
-        written = len(self._write_buf) - before
-        if len(self._write_buf) > self.buffer_size:
-            try:
-                self.flush()
-            except BlockingIOError as e:
-                if (len(self._write_buf) > self.max_buffer_size):
-                    # We've hit max_buffer_size. We have to accept a partial
-                    # write and cut back our buffer.
-                    overage = len(self._write_buf) - self.max_buffer_size
-                    self._write_buf = self._write_buf[:self.max_buffer_size]
-                    raise BlockingIOError(e.errno, e.strerror, overage)
-        return written
+        with self._write_lock:
+            # XXX we can implement some more tricks to try and avoid
+            # partial writes
+            if len(self._write_buf) > self.buffer_size:
+                # We're full, so let's pre-flush the buffer
+                try:
+                    self._flush_unlocked()
+                except BlockingIOError as e:
+                    # We can't accept anything else.
+                    # XXX Why not just let the exception pass through?
+                    raise BlockingIOError(e.errno, e.strerror, 0)
+            before = len(self._write_buf)
+            self._write_buf.extend(b)
+            written = len(self._write_buf) - before
+            if len(self._write_buf) > self.buffer_size:
+                try:
+                    self._flush_unlocked()
+                except BlockingIOError as e:
+                    if len(self._write_buf) > self.max_buffer_size:
+                        # We've hit max_buffer_size. We have to accept a
+                        # partial write and cut back our buffer.
+                        overage = len(self._write_buf) - self.max_buffer_size
+                        self._write_buf = self._write_buf[:self.max_buffer_size]
+                        raise BlockingIOError(e.errno, e.strerror, overage)
+            return written

     def truncate(self, pos=None):
-        self.flush()
-        if pos is None:
-            pos = self.raw.tell()
-        return self.raw.truncate(pos)
+        with self._write_lock:
+            self._flush_unlocked()
+            if pos is None:
+                pos = self.raw.tell()
+            return self.raw.truncate(pos)

     def flush(self):
+        with self._write_lock:
+            self._flush_unlocked()
+
+    def _flush_unlocked(self):
         if self.closed:
             raise ValueError("flush of closed file")
         written = 0
@@ -1077,8 +1098,9 @@ class BufferedWriter(_BufferedIOMixin):
         return self.raw.tell() + len(self._write_buf)

     def seek(self, pos, whence=0):
-        self.flush()
-        return self.raw.seek(pos, whence)
+        with self._write_lock:
+            self._flush_unlocked()
+            return self.raw.seek(pos, whence)


 class BufferedRWPair(BufferedIOBase):
@@ -1168,7 +1190,8 @@ class BufferedRandom(BufferedWriter, BufferedReader):
         # First do the raw seek, then empty the read buffer, so that
         # if the raw seek fails, we don't lose buffered data forever.
         pos = self.raw.seek(pos, whence)
-        self._reset_read_buf()
+        with self._read_lock:
+            self._reset_read_buf()
         return pos

     def tell(self):
@@ -1205,8 +1228,9 @@
     def write(self, b):
         if self._read_buf:
             # Undo readahead
-            self.raw.seek(self._read_pos - len(self._read_buf), 1)
-            self._reset_read_buf()
+            with self._read_lock:
+                self.raw.seek(self._read_pos - len(self._read_buf), 1)
+                self._reset_read_buf()
         return BufferedWriter.write(self, b)
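The pattern is the same throughout the patch: each public entry point (read(), peek(), read1() and seek() on BufferedReader; write(), truncate(), flush() and seek() on BufferedWriter) acquires the new _read_lock or _write_lock exactly once and then delegates to a private *_unlocked() helper. Code that already holds the lock only ever calls the unlocked helpers, never the public methods, because threading.Lock is not reentrant and re-acquiring it would deadlock. Below is a minimal, illustrative sketch of that structure, not the actual Lib/io.py code; ToyBufferedReader and its raw argument are hypothetical names chosen for the example.

import io
import threading


class ToyBufferedReader:
    """Stripped-down stand-in showing the lock + *_unlocked() pattern."""

    def __init__(self, raw, buffer_size=8 * 1024):
        self.raw = raw                      # any object with a read(n) method
        self.buffer_size = buffer_size
        self._read_buf = b""
        self._read_pos = 0
        self._read_lock = threading.Lock()  # guards _read_buf / _read_pos

    def peek(self, n=0):
        # Public method: take the lock, then delegate.
        with self._read_lock:
            return self._peek_unlocked(n)

    def read1(self, n):
        # Mirrors the shape of the patched BufferedReader.read1(): lock once
        # at the public entry point and call only the *_unlocked() helpers.
        # Calling self.peek() here instead would deadlock, since
        # threading.Lock is not reentrant.
        if n <= 0:
            return b""
        with self._read_lock:
            self._peek_unlocked(1)
            return self._read_unlocked(
                min(n, len(self._read_buf) - self._read_pos))

    def _peek_unlocked(self, n=0):
        # Caller must hold _read_lock.  Tops up the buffer with at most one
        # raw read and returns the buffered bytes without consuming them.
        want = min(n, self.buffer_size)
        have = len(self._read_buf) - self._read_pos
        if have < want:
            current = self.raw.read(self.buffer_size - have)
            if current:
                self._read_buf = self._read_buf[self._read_pos:] + current
                self._read_pos = 0
        return self._read_buf[self._read_pos:]

    def _read_unlocked(self, n):
        # Caller must hold _read_lock.  Consumes up to n buffered bytes.
        data = self._read_buf[self._read_pos:self._read_pos + n]
        self._read_pos += len(data)
        return data


if __name__ == "__main__":
    r = ToyBufferedReader(io.BytesIO(b"hello world"))
    print(r.read1(5))   # b'hello'
    print(r.read1(6))   # b' world'

Because each public method holds the lock for the whole buffered operation, two threads calling read1() concurrently are serialized per call and can no longer corrupt _read_buf/_read_pos between the buffer top-up and the copy-out.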