summaryrefslogtreecommitdiffstats
path: root/Lib
diff options
context:
space:
mode:
Diffstat (limited to 'Lib')
-rw-r--r--Lib/_pyio.py18
-rw-r--r--Lib/test/test_io.py116
2 files changed, 114 insertions, 20 deletions
diff --git a/Lib/_pyio.py b/Lib/_pyio.py
index 37157d5..a467ddd 100644
--- a/Lib/_pyio.py
+++ b/Lib/_pyio.py
@@ -390,7 +390,7 @@ class IOBase(metaclass=abc.ABCMeta):
def seekable(self):
"""Return a bool indicating whether object supports random access.
- If False, seek(), tell() and truncate() will raise UnsupportedOperation.
+ If False, seek(), tell() and truncate() will raise OSError.
This method may need to do a test seek().
"""
return False
@@ -405,7 +405,7 @@ class IOBase(metaclass=abc.ABCMeta):
def readable(self):
"""Return a bool indicating whether object was opened for reading.
- If False, read() will raise UnsupportedOperation.
+ If False, read() will raise OSError.
"""
return False
@@ -419,7 +419,7 @@ class IOBase(metaclass=abc.ABCMeta):
def writable(self):
"""Return a bool indicating whether object was opened for writing.
- If False, write() and truncate() will raise UnsupportedOperation.
+ If False, write() and truncate() will raise OSError.
"""
return False
@@ -787,12 +787,6 @@ class _BufferedIOMixin(BufferedIOBase):
def seekable(self):
return self.raw.seekable()
- def readable(self):
- return self.raw.readable()
-
- def writable(self):
- return self.raw.writable()
-
@property
def raw(self):
return self._raw
@@ -982,6 +976,9 @@ class BufferedReader(_BufferedIOMixin):
self._reset_read_buf()
self._read_lock = Lock()
+ def readable(self):
+ return self.raw.readable()
+
def _reset_read_buf(self):
self._read_buf = b""
self._read_pos = 0
@@ -1170,6 +1167,9 @@ class BufferedWriter(_BufferedIOMixin):
self._write_buf = bytearray()
self._write_lock = Lock()
+ def writable(self):
+ return self.raw.writable()
+
def write(self, b):
if self.closed:
raise ValueError("write to closed file")
diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py
index 86440c5..51c250b 100644
--- a/Lib/test/test_io.py
+++ b/Lib/test/test_io.py
@@ -203,6 +203,9 @@ class MockUnseekableIO:
def tell(self, *args):
raise self.UnsupportedOperation("not seekable")
+ def truncate(self, *args):
+ raise self.UnsupportedOperation("not seekable")
+
class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
UnsupportedOperation = io.UnsupportedOperation
@@ -361,6 +364,107 @@ class IOTest(unittest.TestCase):
self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
+ def test_optional_abilities(self):
+ # Test for OSError when optional APIs are not supported
+ # The purpose of this test is to try fileno(), reading, writing and
+ # seeking operations with various objects that indicate they do not
+ # support these operations.
+
+ def pipe_reader():
+ [r, w] = os.pipe()
+ os.close(w) # So that read() is harmless
+ return self.FileIO(r, "r")
+
+ def pipe_writer():
+ [r, w] = os.pipe()
+ self.addCleanup(os.close, r)
+ # Guarantee that we can write into the pipe without blocking
+ thread = threading.Thread(target=os.read, args=(r, 100))
+ thread.start()
+ self.addCleanup(thread.join)
+ return self.FileIO(w, "w")
+
+ def buffered_reader():
+ return self.BufferedReader(self.MockUnseekableIO())
+
+ def buffered_writer():
+ return self.BufferedWriter(self.MockUnseekableIO())
+
+ def buffered_random():
+ return self.BufferedRandom(self.BytesIO())
+
+ def buffered_rw_pair():
+ return self.BufferedRWPair(self.MockUnseekableIO(),
+ self.MockUnseekableIO())
+
+ def text_reader():
+ class UnseekableReader(self.MockUnseekableIO):
+ writable = self.BufferedIOBase.writable
+ write = self.BufferedIOBase.write
+ return self.TextIOWrapper(UnseekableReader(), "ascii")
+
+ def text_writer():
+ class UnseekableWriter(self.MockUnseekableIO):
+ readable = self.BufferedIOBase.readable
+ read = self.BufferedIOBase.read
+ return self.TextIOWrapper(UnseekableWriter(), "ascii")
+
+ tests = (
+ (pipe_reader, "fr"), (pipe_writer, "fw"),
+ (buffered_reader, "r"), (buffered_writer, "w"),
+ (buffered_random, "rws"), (buffered_rw_pair, "rw"),
+ (text_reader, "r"), (text_writer, "w"),
+ (self.BytesIO, "rws"), (self.StringIO, "rws"),
+ )
+ for [test, abilities] in tests:
+ if test is pipe_writer and not threading:
+ continue # Skip subtest that uses a background thread
+ with self.subTest(test), test() as obj:
+ readable = "r" in abilities
+ self.assertEqual(obj.readable(), readable)
+ writable = "w" in abilities
+ self.assertEqual(obj.writable(), writable)
+ seekable = "s" in abilities
+ self.assertEqual(obj.seekable(), seekable)
+
+ if isinstance(obj, self.TextIOBase):
+ data = "3"
+ elif isinstance(obj, (self.BufferedIOBase, self.RawIOBase)):
+ data = b"3"
+ else:
+ self.fail("Unknown base class")
+
+ if "f" in abilities:
+ obj.fileno()
+ else:
+ self.assertRaises(OSError, obj.fileno)
+
+ if readable:
+ obj.read(1)
+ obj.read()
+ else:
+ self.assertRaises(OSError, obj.read, 1)
+ self.assertRaises(OSError, obj.read)
+
+ if writable:
+ obj.write(data)
+ else:
+ self.assertRaises(OSError, obj.write, data)
+
+ if seekable:
+ obj.tell()
+ obj.seek(0)
+ else:
+ self.assertRaises(OSError, obj.tell)
+ self.assertRaises(OSError, obj.seek, 0)
+
+ if writable and seekable:
+ obj.truncate()
+ obj.truncate(0)
+ else:
+ self.assertRaises(OSError, obj.truncate)
+ self.assertRaises(OSError, obj.truncate, 0)
+
def test_open_handles_NUL_chars(self):
fn_with_NUL = 'foo\0bar'
self.assertRaises(ValueError, self.open, fn_with_NUL, 'w')
@@ -747,12 +851,6 @@ class CommonBufferedTests:
self.assertEqual(42, bufio.fileno())
- @unittest.skip('test having existential crisis')
- def test_no_fileno(self):
- # XXX will we always have fileno() function? If so, kill
- # this test. Else, write it.
- pass
-
def test_invalid_args(self):
rawio = self.MockRawIO()
bufio = self.tp(rawio)
@@ -780,13 +878,9 @@ class CommonBufferedTests:
super().flush()
rawio = self.MockRawIO()
bufio = MyBufferedIO(rawio)
- writable = bufio.writable()
del bufio
support.gc_collect()
- if writable:
- self.assertEqual(record, [1, 2, 3])
- else:
- self.assertEqual(record, [1, 2])
+ self.assertEqual(record, [1, 2, 3])
def test_context_manager(self):
# Test usability as a context manager