diff options
author | Antoine Pitrou <solipsis@pitrou.net> | 2008-08-14 22:44:29 (GMT) |
---|---|---|
committer | Antoine Pitrou <solipsis@pitrou.net> | 2008-08-14 22:44:29 (GMT) |
commit | 8769576477b88b05e6c857e6a63bed506eae014e (patch) | |
tree | 598f15f2d89e3fd8cab2d30052bd8be59b4be30c | |
parent | 74bbea7ed7a75a180d02846e93b53e24ef6ca651 (diff) | |
download | cpython-8769576477b88b05e6c857e6a63bed506eae014e.zip cpython-8769576477b88b05e6c857e6a63bed506eae014e.tar.gz cpython-8769576477b88b05e6c857e6a63bed506eae014e.tar.bz2 |
Merged revisions 65686 via svnmerge from
svn+ssh://pythondev@svn.python.org/python/trunk
........
r65686 | antoine.pitrou | 2008-08-14 23:04:30 +0200 (jeu., 14 août 2008) | 3 lines
Issue #3476: make BufferedReader and BufferedWriter thread-safe
........
-rw-r--r-- | Lib/io.py | 102 |
-rw-r--r-- | Lib/test/test_cmd_line.py | 16 |
-rw-r--r-- | Lib/test/test_io.py | 79 |
-rw-r--r-- | Misc/NEWS | 3 |
4 files changed, 159 insertions, 41 deletions
@@ -61,6 +61,7 @@ import sys import codecs import _fileio import warnings +import threading # open() uses st_blksize whenever we can DEFAULT_BUFFER_SIZE = 8 * 1024 # bytes @@ -895,6 +896,7 @@ class BufferedReader(_BufferedIOMixin): _BufferedIOMixin.__init__(self, raw) self.buffer_size = buffer_size self._reset_read_buf() + self._read_lock = threading.Lock() def _reset_read_buf(self): self._read_buf = b"" @@ -908,6 +910,10 @@ class BufferedReader(_BufferedIOMixin): mode. If n is negative, read until EOF or until read() would block. """ + with self._read_lock: + return self._read_unlocked(n) + + def _read_unlocked(self, n=None): nodata_val = b"" empty_values = (b"", None) buf = self._read_buf @@ -960,6 +966,10 @@ class BufferedReader(_BufferedIOMixin): do at most one raw read to satisfy it. We never return more than self.buffer_size. """ + with self._read_lock: + return self._peek_unlocked(n) + + def _peek_unlocked(self, n=0): want = min(n, self.buffer_size) have = len(self._read_buf) - self._read_pos if have < want: @@ -976,18 +986,21 @@ class BufferedReader(_BufferedIOMixin): # only return buffered bytes. Otherwise, we do one raw read. 
if n <= 0: return b"" - self.peek(1) - return self.read(min(n, len(self._read_buf) - self._read_pos)) + with self._read_lock: + self._peek_unlocked(1) + return self._read_unlocked( + min(n, len(self._read_buf) - self._read_pos)) def tell(self): return self.raw.tell() - len(self._read_buf) + self._read_pos def seek(self, pos, whence=0): - if whence == 1: - pos -= len(self._read_buf) - self._read_pos - pos = self.raw.seek(pos, whence) - self._reset_read_buf() - return pos + with self._read_lock: + if whence == 1: + pos -= len(self._read_buf) - self._read_pos + pos = self.raw.seek(pos, whence) + self._reset_read_buf() + return pos class BufferedWriter(_BufferedIOMixin): @@ -1009,43 +1022,51 @@ class BufferedWriter(_BufferedIOMixin): if max_buffer_size is None else max_buffer_size) self._write_buf = bytearray() + self._write_lock = threading.Lock() def write(self, b): if self.closed: raise ValueError("write to closed file") if isinstance(b, str): raise TypeError("can't write str to binary stream") - # XXX we can implement some more tricks to try and avoid partial writes - if len(self._write_buf) > self.buffer_size: - # We're full, so let's pre-flush the buffer - try: - self.flush() - except BlockingIOError as e: - # We can't accept anything else. - # XXX Why not just let the exception pass through? - raise BlockingIOError(e.errno, e.strerror, 0) - before = len(self._write_buf) - self._write_buf.extend(b) - written = len(self._write_buf) - before - if len(self._write_buf) > self.buffer_size: - try: - self.flush() - except BlockingIOError as e: - if (len(self._write_buf) > self.max_buffer_size): - # We've hit max_buffer_size. We have to accept a partial - # write and cut back our buffer. 
- overage = len(self._write_buf) - self.max_buffer_size - self._write_buf = self._write_buf[:self.max_buffer_size] - raise BlockingIOError(e.errno, e.strerror, overage) - return written + with self._write_lock: + # XXX we can implement some more tricks to try and avoid + # partial writes + if len(self._write_buf) > self.buffer_size: + # We're full, so let's pre-flush the buffer + try: + self._flush_unlocked() + except BlockingIOError as e: + # We can't accept anything else. + # XXX Why not just let the exception pass through? + raise BlockingIOError(e.errno, e.strerror, 0) + before = len(self._write_buf) + self._write_buf.extend(b) + written = len(self._write_buf) - before + if len(self._write_buf) > self.buffer_size: + try: + self._flush_unlocked() + except BlockingIOError as e: + if len(self._write_buf) > self.max_buffer_size: + # We've hit max_buffer_size. We have to accept a + # partial write and cut back our buffer. + overage = len(self._write_buf) - self.max_buffer_size + self._write_buf = self._write_buf[:self.max_buffer_size] + raise BlockingIOError(e.errno, e.strerror, overage) + return written def truncate(self, pos=None): - self.flush() - if pos is None: - pos = self.raw.tell() - return self.raw.truncate(pos) + with self._write_lock: + self._flush_unlocked() + if pos is None: + pos = self.raw.tell() + return self.raw.truncate(pos) def flush(self): + with self._write_lock: + self._flush_unlocked() + + def _flush_unlocked(self): if self.closed: raise ValueError("flush of closed file") written = 0 @@ -1064,8 +1085,9 @@ class BufferedWriter(_BufferedIOMixin): return self.raw.tell() + len(self._write_buf) def seek(self, pos, whence=0): - self.flush() - return self.raw.seek(pos, whence) + with self._write_lock: + self._flush_unlocked() + return self.raw.seek(pos, whence) class BufferedRWPair(BufferedIOBase): @@ -1155,7 +1177,8 @@ class BufferedRandom(BufferedWriter, BufferedReader): # First do the raw seek, then empty the read buffer, so that # if the raw seek 
fails, we don't lose buffered data forever. pos = self.raw.seek(pos, whence) - self._reset_read_buf() + with self._read_lock: + self._reset_read_buf() return pos def tell(self): @@ -1192,8 +1215,9 @@ class BufferedRandom(BufferedWriter, BufferedReader): def write(self, b): if self._read_buf: # Undo readahead - self.raw.seek(self._read_pos - len(self._read_buf), 1) - self._reset_read_buf() + with self._read_lock: + self.raw.seek(self._read_pos - len(self._read_buf), 1) + self._reset_read_buf() return BufferedWriter.write(self, b) diff --git a/Lib/test/test_cmd_line.py b/Lib/test/test_cmd_line.py index 9ccf3f5..d63dfa1 100644 --- a/Lib/test/test_cmd_line.py +++ b/Lib/test/test_cmd_line.py @@ -3,11 +3,15 @@ # See test_cmd_line_script.py for testing of script execution import test.support, unittest +import os import sys import subprocess def _spawn_python(*args): - cmd_line = [sys.executable, '-E'] + cmd_line = [sys.executable] + # When testing -S, we need PYTHONPATH to work (see test_site_flag()) + if '-S' not in args: + cmd_line.append('-E') cmd_line.extend(args) return subprocess.Popen(cmd_line, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) @@ -59,6 +63,16 @@ class CmdLineTest(unittest.TestCase): self.verify_valid_flag('-Qwarnall') def test_site_flag(self): + if os.name == 'posix': + # Workaround bug #586680 by adding the extension dir to PYTHONPATH + from distutils.util import get_platform + s = "./build/lib.%s-%.3s" % (get_platform(), sys.version) + if hasattr(sys, 'gettotalrefcount'): + s += '-pydebug' + p = os.environ.get('PYTHONPATH', '') + if p: + p += ':' + os.environ['PYTHONPATH'] = p + s self.verify_valid_flag('-S') def test_usage(self): diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py index 320c7c3..bcb37d7 100644 --- a/Lib/test/test_io.py +++ b/Lib/test/test_io.py @@ -4,8 +4,10 @@ import os import sys import time import array +import threading +import random import unittest -from itertools import chain +from itertools 
import chain, cycle from test import support import codecs @@ -390,6 +392,49 @@ class BufferedReaderTest(unittest.TestCase): # this test. Else, write it. pass + def testThreads(self): + try: + # Write out many bytes with exactly the same number of 0's, + # 1's... 255's. This will help us check that concurrent reading + # doesn't duplicate or forget contents. + N = 1000 + l = list(range(256)) * N + random.shuffle(l) + s = bytes(bytearray(l)) + with io.open(support.TESTFN, "wb") as f: + f.write(s) + with io.open(support.TESTFN, "rb", buffering=0) as raw: + bufio = io.BufferedReader(raw, 8) + errors = [] + results = [] + def f(): + try: + # Intra-buffer read then buffer-flushing read + for n in cycle([1, 19]): + s = bufio.read(n) + if not s: + break + # list.append() is atomic + results.append(s) + except Exception as e: + errors.append(e) + raise + threads = [threading.Thread(target=f) for x in range(20)] + for t in threads: + t.start() + time.sleep(0.02) # yield + for t in threads: + t.join() + self.assertFalse(errors, + "the following exceptions were caught: %r" % errors) + s = b''.join(results) + for i in range(256): + c = bytes(bytearray([i])) + self.assertEqual(s.count(c), N) + finally: + support.unlink(support.TESTFN) + + class BufferedWriterTest(unittest.TestCase): @@ -446,6 +491,38 @@ class BufferedWriterTest(unittest.TestCase): self.assertEquals(b"abc", writer._write_stack[0]) + def testThreads(self): + # BufferedWriter should not raise exceptions or crash + # when called from multiple threads. + try: + # We use a real file object because it allows us to + # exercise situations where the GIL is released before + # writing the buffer to the raw streams. This is in addition + # to concurrency issues due to switching threads in the middle + # of Python code. 
+ with io.open(support.TESTFN, "wb", buffering=0) as raw: + bufio = io.BufferedWriter(raw, 8) + errors = [] + def f(): + try: + # Write enough bytes to flush the buffer + s = b"a" * 19 + for i in range(50): + bufio.write(s) + except Exception as e: + errors.append(e) + raise + threads = [threading.Thread(target=f) for x in range(20)] + for t in threads: + t.start() + time.sleep(0.02) # yield + for t in threads: + t.join() + self.assertFalse(errors, + "the following exceptions were caught: %r" % errors) + finally: + support.unlink(support.TESTFN) + class BufferedRWPairTest(unittest.TestCase): @@ -30,6 +30,9 @@ Core and Builtins Library ------- +- Issue #3476: binary buffered reading through the new "io" library is now + thread-safe. + - Issue #1342811: Fix leak in Tkinter.Menu.delete. Commands associated to menu entries were not deleted. |