summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Lib/io.py102
-rw-r--r--Lib/test/test_cmd_line.py16
-rw-r--r--Lib/test/test_io.py79
-rw-r--r--Misc/NEWS3
4 files changed, 159 insertions, 41 deletions
diff --git a/Lib/io.py b/Lib/io.py
index ab59ddc..3e48eb1 100644
--- a/Lib/io.py
+++ b/Lib/io.py
@@ -61,6 +61,7 @@ import sys
import codecs
import _fileio
import warnings
+import threading
# open() uses st_blksize whenever we can
DEFAULT_BUFFER_SIZE = 8 * 1024 # bytes
@@ -895,6 +896,7 @@ class BufferedReader(_BufferedIOMixin):
_BufferedIOMixin.__init__(self, raw)
self.buffer_size = buffer_size
self._reset_read_buf()
+ self._read_lock = threading.Lock()
def _reset_read_buf(self):
self._read_buf = b""
@@ -908,6 +910,10 @@ class BufferedReader(_BufferedIOMixin):
mode. If n is negative, read until EOF or until read() would
block.
"""
+ with self._read_lock:
+ return self._read_unlocked(n)
+
+ def _read_unlocked(self, n=None):
nodata_val = b""
empty_values = (b"", None)
buf = self._read_buf
@@ -960,6 +966,10 @@ class BufferedReader(_BufferedIOMixin):
do at most one raw read to satisfy it. We never return more
than self.buffer_size.
"""
+ with self._read_lock:
+ return self._peek_unlocked(n)
+
+ def _peek_unlocked(self, n=0):
want = min(n, self.buffer_size)
have = len(self._read_buf) - self._read_pos
if have < want:
@@ -976,18 +986,21 @@ class BufferedReader(_BufferedIOMixin):
# only return buffered bytes. Otherwise, we do one raw read.
if n <= 0:
return b""
- self.peek(1)
- return self.read(min(n, len(self._read_buf) - self._read_pos))
+ with self._read_lock:
+ self._peek_unlocked(1)
+ return self._read_unlocked(
+ min(n, len(self._read_buf) - self._read_pos))
def tell(self):
return self.raw.tell() - len(self._read_buf) + self._read_pos
def seek(self, pos, whence=0):
- if whence == 1:
- pos -= len(self._read_buf) - self._read_pos
- pos = self.raw.seek(pos, whence)
- self._reset_read_buf()
- return pos
+ with self._read_lock:
+ if whence == 1:
+ pos -= len(self._read_buf) - self._read_pos
+ pos = self.raw.seek(pos, whence)
+ self._reset_read_buf()
+ return pos
class BufferedWriter(_BufferedIOMixin):
@@ -1009,43 +1022,51 @@ class BufferedWriter(_BufferedIOMixin):
if max_buffer_size is None
else max_buffer_size)
self._write_buf = bytearray()
+ self._write_lock = threading.Lock()
def write(self, b):
if self.closed:
raise ValueError("write to closed file")
if isinstance(b, str):
raise TypeError("can't write str to binary stream")
- # XXX we can implement some more tricks to try and avoid partial writes
- if len(self._write_buf) > self.buffer_size:
- # We're full, so let's pre-flush the buffer
- try:
- self.flush()
- except BlockingIOError as e:
- # We can't accept anything else.
- # XXX Why not just let the exception pass through?
- raise BlockingIOError(e.errno, e.strerror, 0)
- before = len(self._write_buf)
- self._write_buf.extend(b)
- written = len(self._write_buf) - before
- if len(self._write_buf) > self.buffer_size:
- try:
- self.flush()
- except BlockingIOError as e:
- if (len(self._write_buf) > self.max_buffer_size):
- # We've hit max_buffer_size. We have to accept a partial
- # write and cut back our buffer.
- overage = len(self._write_buf) - self.max_buffer_size
- self._write_buf = self._write_buf[:self.max_buffer_size]
- raise BlockingIOError(e.errno, e.strerror, overage)
- return written
+ with self._write_lock:
+ # XXX we can implement some more tricks to try and avoid
+ # partial writes
+ if len(self._write_buf) > self.buffer_size:
+ # We're full, so let's pre-flush the buffer
+ try:
+ self._flush_unlocked()
+ except BlockingIOError as e:
+ # We can't accept anything else.
+ # XXX Why not just let the exception pass through?
+ raise BlockingIOError(e.errno, e.strerror, 0)
+ before = len(self._write_buf)
+ self._write_buf.extend(b)
+ written = len(self._write_buf) - before
+ if len(self._write_buf) > self.buffer_size:
+ try:
+ self._flush_unlocked()
+ except BlockingIOError as e:
+ if len(self._write_buf) > self.max_buffer_size:
+ # We've hit max_buffer_size. We have to accept a
+ # partial write and cut back our buffer.
+ overage = len(self._write_buf) - self.max_buffer_size
+ self._write_buf = self._write_buf[:self.max_buffer_size]
+ raise BlockingIOError(e.errno, e.strerror, overage)
+ return written
def truncate(self, pos=None):
- self.flush()
- if pos is None:
- pos = self.raw.tell()
- return self.raw.truncate(pos)
+ with self._write_lock:
+ self._flush_unlocked()
+ if pos is None:
+ pos = self.raw.tell()
+ return self.raw.truncate(pos)
def flush(self):
+ with self._write_lock:
+ self._flush_unlocked()
+
+ def _flush_unlocked(self):
if self.closed:
raise ValueError("flush of closed file")
written = 0
@@ -1064,8 +1085,9 @@ class BufferedWriter(_BufferedIOMixin):
return self.raw.tell() + len(self._write_buf)
def seek(self, pos, whence=0):
- self.flush()
- return self.raw.seek(pos, whence)
+ with self._write_lock:
+ self._flush_unlocked()
+ return self.raw.seek(pos, whence)
class BufferedRWPair(BufferedIOBase):
@@ -1155,7 +1177,8 @@ class BufferedRandom(BufferedWriter, BufferedReader):
# First do the raw seek, then empty the read buffer, so that
# if the raw seek fails, we don't lose buffered data forever.
pos = self.raw.seek(pos, whence)
- self._reset_read_buf()
+ with self._read_lock:
+ self._reset_read_buf()
return pos
def tell(self):
@@ -1192,8 +1215,9 @@ class BufferedRandom(BufferedWriter, BufferedReader):
def write(self, b):
if self._read_buf:
# Undo readahead
- self.raw.seek(self._read_pos - len(self._read_buf), 1)
- self._reset_read_buf()
+ with self._read_lock:
+ self.raw.seek(self._read_pos - len(self._read_buf), 1)
+ self._reset_read_buf()
return BufferedWriter.write(self, b)
diff --git a/Lib/test/test_cmd_line.py b/Lib/test/test_cmd_line.py
index 9ccf3f5..d63dfa1 100644
--- a/Lib/test/test_cmd_line.py
+++ b/Lib/test/test_cmd_line.py
@@ -3,11 +3,15 @@
# See test_cmd_line_script.py for testing of script execution
import test.support, unittest
+import os
import sys
import subprocess
def _spawn_python(*args):
- cmd_line = [sys.executable, '-E']
+ cmd_line = [sys.executable]
+ # When testing -S, we need PYTHONPATH to work (see test_site_flag())
+ if '-S' not in args:
+ cmd_line.append('-E')
cmd_line.extend(args)
return subprocess.Popen(cmd_line, stdin=subprocess.PIPE,
stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
@@ -59,6 +63,16 @@ class CmdLineTest(unittest.TestCase):
self.verify_valid_flag('-Qwarnall')
def test_site_flag(self):
+ if os.name == 'posix':
+ # Workaround bug #586680 by adding the extension dir to PYTHONPATH
+ from distutils.util import get_platform
+ s = "./build/lib.%s-%.3s" % (get_platform(), sys.version)
+ if hasattr(sys, 'gettotalrefcount'):
+ s += '-pydebug'
+ p = os.environ.get('PYTHONPATH', '')
+ if p:
+ p += ':'
+ os.environ['PYTHONPATH'] = p + s
self.verify_valid_flag('-S')
def test_usage(self):
diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py
index 320c7c3..bcb37d7 100644
--- a/Lib/test/test_io.py
+++ b/Lib/test/test_io.py
@@ -4,8 +4,10 @@ import os
import sys
import time
import array
+import threading
+import random
import unittest
-from itertools import chain
+from itertools import chain, cycle
from test import support
import codecs
@@ -390,6 +392,49 @@ class BufferedReaderTest(unittest.TestCase):
# this test. Else, write it.
pass
+ def testThreads(self):
+ try:
+ # Write out many bytes with exactly the same number of 0's,
+ # 1's... 255's. This will help us check that concurrent reading
+ # doesn't duplicate or forget contents.
+ N = 1000
+ l = list(range(256)) * N
+ random.shuffle(l)
+ s = bytes(bytearray(l))
+ with io.open(support.TESTFN, "wb") as f:
+ f.write(s)
+ with io.open(support.TESTFN, "rb", buffering=0) as raw:
+ bufio = io.BufferedReader(raw, 8)
+ errors = []
+ results = []
+ def f():
+ try:
+ # Intra-buffer read then buffer-flushing read
+ for n in cycle([1, 19]):
+ s = bufio.read(n)
+ if not s:
+ break
+ # list.append() is atomic
+ results.append(s)
+ except Exception as e:
+ errors.append(e)
+ raise
+ threads = [threading.Thread(target=f) for x in range(20)]
+ for t in threads:
+ t.start()
+ time.sleep(0.02) # yield
+ for t in threads:
+ t.join()
+ self.assertFalse(errors,
+ "the following exceptions were caught: %r" % errors)
+ s = b''.join(results)
+ for i in range(256):
+ c = bytes(bytearray([i]))
+ self.assertEqual(s.count(c), N)
+ finally:
+ support.unlink(support.TESTFN)
+
+
class BufferedWriterTest(unittest.TestCase):
@@ -446,6 +491,38 @@ class BufferedWriterTest(unittest.TestCase):
self.assertEquals(b"abc", writer._write_stack[0])
+ def testThreads(self):
+ # BufferedWriter should not raise exceptions or crash
+ # when called from multiple threads.
+ try:
+ # We use a real file object because it allows us to
+ # exercise situations where the GIL is released before
+ # writing the buffer to the raw streams. This is in addition
+ # to concurrency issues due to switching threads in the middle
+ # of Python code.
+ with io.open(support.TESTFN, "wb", buffering=0) as raw:
+ bufio = io.BufferedWriter(raw, 8)
+ errors = []
+ def f():
+ try:
+ # Write enough bytes to flush the buffer
+ s = b"a" * 19
+ for i in range(50):
+ bufio.write(s)
+ except Exception as e:
+ errors.append(e)
+ raise
+ threads = [threading.Thread(target=f) for x in range(20)]
+ for t in threads:
+ t.start()
+ time.sleep(0.02) # yield
+ for t in threads:
+ t.join()
+ self.assertFalse(errors,
+ "the following exceptions were caught: %r" % errors)
+ finally:
+ support.unlink(support.TESTFN)
+
class BufferedRWPairTest(unittest.TestCase):
diff --git a/Misc/NEWS b/Misc/NEWS
index dfa1a2f..afe6211 100644
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -30,6 +30,9 @@ Core and Builtins
Library
-------
+- Issue #3476: binary buffered reading through the new "io" library is now
+ thread-safe.
+
- Issue #1342811: Fix leak in Tkinter.Menu.delete. Commands associated to
menu entries were not deleted.