summaryrefslogtreecommitdiffstats
path: root/Lib
diff options
context:
space:
mode:
Diffstat (limited to 'Lib')
-rw-r--r--Lib/_pyio.py5
-rw-r--r--Lib/test/test_memoryio.py156
2 files changed, 152 insertions, 9 deletions
diff --git a/Lib/_pyio.py b/Lib/_pyio.py
index 5f18b12..b0da045 100644
--- a/Lib/_pyio.py
+++ b/Lib/_pyio.py
@@ -773,6 +773,11 @@ class BytesIO(BufferedIOBase):
self._buffer = buf
self._pos = 0
+ def __getstate__(self):
+ if self.closed:
+ raise ValueError("__getstate__ on closed file")
+ return self.__dict__.copy()
+
def getvalue(self):
"""Return the bytes value (contents) of the buffer
"""
diff --git a/Lib/test/test_memoryio.py b/Lib/test/test_memoryio.py
index 8972f79..d94d1a6 100644
--- a/Lib/test/test_memoryio.py
+++ b/Lib/test/test_memoryio.py
@@ -12,6 +12,7 @@ from test import test_support as support
import io
import _pyio as pyio
import sys
+import pickle
class MemorySeekTestMixin:
@@ -352,6 +353,42 @@ class MemoryTestMixin:
memio = self.ioclass()
memio.foo = 1
+ def test_pickling(self):
+ buf = self.buftype("1234567890")
+ memio = self.ioclass(buf)
+ memio.foo = 42
+ memio.seek(2)
+
+ class PickleTestMemIO(self.ioclass):
+ def __init__(me, initvalue, foo):
+ self.ioclass.__init__(me, initvalue)
+ me.foo = foo
+ # __getnewargs__ is undefined on purpose. This checks that PEP 307
+ # is used to provide pickling support.
+
+ # Pickle expects the class to be on the module level. Here we use a
+ # little hack to allow the PickleTestMemIO class to derive from
+ # self.ioclass without having to define all combinations explictly on
+ # the module-level.
+ import __main__
+ PickleTestMemIO.__module__ = '__main__'
+ __main__.PickleTestMemIO = PickleTestMemIO
+ submemio = PickleTestMemIO(buf, 80)
+ submemio.seek(2)
+
+ # We only support pickle protocol 2 and onward since we use extended
+ # __reduce__ API of PEP 307 to provide pickling support.
+ for proto in range(2, pickle.HIGHEST_PROTOCOL):
+ for obj in (memio, submemio):
+ obj2 = pickle.loads(pickle.dumps(obj, protocol=proto))
+ self.assertEqual(obj.getvalue(), obj2.getvalue())
+ self.assertEqual(obj.__class__, obj2.__class__)
+ self.assertEqual(obj.foo, obj2.foo)
+ self.assertEqual(obj.tell(), obj2.tell())
+ obj.close()
+ self.assertRaises(ValueError, pickle.dumps, obj, proto)
+ del __main__.PickleTestMemIO
+
class PyBytesIOTest(MemoryTestMixin, MemorySeekTestMixin, unittest.TestCase):
@@ -431,13 +468,26 @@ class PyBytesIOTest(MemoryTestMixin, MemorySeekTestMixin, unittest.TestCase):
self.assertEqual(memio.getvalue(), buf)
-class PyStringIOTest(MemoryTestMixin, MemorySeekTestMixin, unittest.TestCase):
- buftype = unicode
- ioclass = pyio.StringIO
- UnsupportedOperation = pyio.UnsupportedOperation
- EOF = ""
+class TextIOTestMixin:
- # TextIO-specific behaviour.
+ def test_relative_seek(self):
+ memio = self.ioclass()
+
+ self.assertRaises(IOError, memio.seek, -1, 1)
+ self.assertRaises(IOError, memio.seek, 3, 1)
+ self.assertRaises(IOError, memio.seek, -3, 1)
+ self.assertRaises(IOError, memio.seek, -1, 2)
+ self.assertRaises(IOError, memio.seek, 1, 1)
+ self.assertRaises(IOError, memio.seek, 1, 2)
+
+ def test_textio_properties(self):
+ memio = self.ioclass()
+
+ # These are just dummy values but we nevertheless check them for fear
+ # of unexpected breakage.
+ self.assertTrue(memio.encoding is None)
+ self.assertEqual(memio.errors, "strict")
+ self.assertEqual(memio.line_buffering, False)
def test_newlines_property(self):
memio = self.ioclass(newline=None)
@@ -519,7 +569,6 @@ class PyStringIOTest(MemoryTestMixin, MemorySeekTestMixin, unittest.TestCase):
def test_newline_cr(self):
# newline="\r"
memio = self.ioclass("a\nb\r\nc\rd", newline="\r")
- memio.seek(0)
self.assertEqual(memio.read(), "a\rb\r\rc\rd")
memio.seek(0)
self.assertEqual(list(memio), ["a\r", "b\r", "\r", "c\r", "d"])
@@ -527,7 +576,6 @@ class PyStringIOTest(MemoryTestMixin, MemorySeekTestMixin, unittest.TestCase):
def test_newline_crlf(self):
# newline="\r\n"
memio = self.ioclass("a\nb\r\nc\rd", newline="\r\n")
- memio.seek(0)
self.assertEqual(memio.read(), "a\r\nb\r\r\nc\rd")
memio.seek(0)
self.assertEqual(list(memio), ["a\r\n", "b\r\r\n", "c\rd"])
@@ -538,6 +586,28 @@ class PyStringIOTest(MemoryTestMixin, MemorySeekTestMixin, unittest.TestCase):
self.assertEqual(memio.read(5), "a\nb\n")
+class PyStringIOTest(MemoryTestMixin, MemorySeekTestMixin,
+ TextIOTestMixin, unittest.TestCase):
+ buftype = unicode
+ ioclass = pyio.StringIO
+ UnsupportedOperation = pyio.UnsupportedOperation
+ EOF = ""
+
+
+class PyStringIOPickleTest(TextIOTestMixin, unittest.TestCase):
+ """Test if pickle restores properly the internal state of StringIO.
+ """
+ buftype = unicode
+ UnsupportedOperation = pyio.UnsupportedOperation
+ EOF = ""
+
+ class ioclass(pyio.StringIO):
+ def __new__(cls, *args, **kwargs):
+ return pickle.loads(pickle.dumps(pyio.StringIO(*args, **kwargs)))
+ def __init__(self, *args, **kwargs):
+ pass
+
+
class CBytesIOTest(PyBytesIOTest):
ioclass = io.BytesIO
UnsupportedOperation = io.UnsupportedOperation
@@ -547,6 +617,33 @@ class CBytesIOTest(PyBytesIOTest):
)(PyBytesIOTest.test_bytes_array)
+ def test_getstate(self):
+ memio = self.ioclass()
+ state = memio.__getstate__()
+ self.assertEqual(len(state), 3)
+ bytearray(state[0]) # Check if state[0] supports the buffer interface.
+ self.assert_(isinstance(state[1], int))
+ self.assert_(isinstance(state[2], dict) or state[2] is None)
+ memio.close()
+ self.assertRaises(ValueError, memio.__getstate__)
+
+ def test_setstate(self):
+ # This checks whether __setstate__ does proper input validation.
+ memio = self.ioclass()
+ memio.__setstate__((b"no error", 0, None))
+ memio.__setstate__((bytearray(b"no error"), 0, None))
+ memio.__setstate__((b"no error", 0, {'spam': 3}))
+ self.assertRaises(ValueError, memio.__setstate__, (b"", -1, None))
+ self.assertRaises(TypeError, memio.__setstate__, ("unicode", 0, None))
+ self.assertRaises(TypeError, memio.__setstate__, (b"", 0.0, None))
+ self.assertRaises(TypeError, memio.__setstate__, (b"", 0, 0))
+ self.assertRaises(TypeError, memio.__setstate__, (b"len-test", 0))
+ self.assertRaises(TypeError, memio.__setstate__)
+ self.assertRaises(TypeError, memio.__setstate__, 0)
+ memio.close()
+ self.assertRaises(ValueError, memio.__setstate__, (b"closed", 0, None))
+
+
class CStringIOTest(PyStringIOTest):
ioclass = io.StringIO
UnsupportedOperation = io.UnsupportedOperation
@@ -565,9 +662,50 @@ class CStringIOTest(PyStringIOTest):
self.assertEqual(memio.tell(), len(buf) * 2)
self.assertEqual(memio.getvalue(), buf + buf)
+ def test_getstate(self):
+ memio = self.ioclass()
+ state = memio.__getstate__()
+ self.assertEqual(len(state), 4)
+ self.assert_(isinstance(state[0], unicode))
+ self.assert_(isinstance(state[1], str))
+ self.assert_(isinstance(state[2], int))
+ self.assert_(isinstance(state[3], dict) or state[3] is None)
+ memio.close()
+ self.assertRaises(ValueError, memio.__getstate__)
+
+ def test_setstate(self):
+ # This checks whether __setstate__ does proper input validation.
+ memio = self.ioclass()
+ memio.__setstate__(("no error", "\n", 0, None))
+ memio.__setstate__(("no error", "", 0, {'spam': 3}))
+ self.assertRaises(ValueError, memio.__setstate__, ("", "f", 0, None))
+ self.assertRaises(ValueError, memio.__setstate__, ("", "", -1, None))
+ self.assertRaises(TypeError, memio.__setstate__, (b"", "", 0, None))
+ # trunk is more tolerant than py3k on the type of the newline param
+ #self.assertRaises(TypeError, memio.__setstate__, ("", b"", 0, None))
+ self.assertRaises(TypeError, memio.__setstate__, ("", "", 0.0, None))
+ self.assertRaises(TypeError, memio.__setstate__, ("", "", 0, 0))
+ self.assertRaises(TypeError, memio.__setstate__, ("len-test", 0))
+ self.assertRaises(TypeError, memio.__setstate__)
+ self.assertRaises(TypeError, memio.__setstate__, 0)
+ memio.close()
+ self.assertRaises(ValueError, memio.__setstate__, ("closed", "", 0, None))
+
+
+class CStringIOPickleTest(PyStringIOPickleTest):
+ UnsupportedOperation = io.UnsupportedOperation
+
+ class ioclass(io.StringIO):
+ def __new__(cls, *args, **kwargs):
+ return pickle.loads(pickle.dumps(io.StringIO(*args, **kwargs),
+ protocol=2))
+ def __init__(self, *args, **kwargs):
+ pass
+
def test_main():
- tests = [PyBytesIOTest, PyStringIOTest, CBytesIOTest, CStringIOTest]
+ tests = [PyBytesIOTest, PyStringIOTest, CBytesIOTest, CStringIOTest,
+ PyStringIOPickleTest, CStringIOPickleTest]
support.run_unittest(*tests)
if __name__ == '__main__':