diff options
author | Alexandre Vassalotti <alexandre@peadrop.com> | 2009-07-22 03:24:36 (GMT) |
---|---|---|
committer | Alexandre Vassalotti <alexandre@peadrop.com> | 2009-07-22 03:24:36 (GMT) |
commit | cf76e1ac927cdc14ee321a363e161be24bfff059 (patch) | |
tree | d99bc3cf44f52bed75c23d18275c74e3fe38177e /Lib | |
parent | d2bb18b28165ccc6e4678e8046abe6ea7c9677b3 (diff) | |
download | cpython-cf76e1ac927cdc14ee321a363e161be24bfff059.zip cpython-cf76e1ac927cdc14ee321a363e161be24bfff059.tar.gz cpython-cf76e1ac927cdc14ee321a363e161be24bfff059.tar.bz2 |
Issue #6218: Make io.BytesIO and io.StringIO picklable.
Diffstat (limited to 'Lib')
-rw-r--r-- | Lib/_pyio.py | 5 | ||||
-rw-r--r-- | Lib/test/test_memoryio.py | 154 |
2 files changed, 150 insertions, 9 deletions
diff --git a/Lib/_pyio.py b/Lib/_pyio.py index 5458f99..04aed8d 100644 --- a/Lib/_pyio.py +++ b/Lib/_pyio.py @@ -765,6 +765,11 @@ class BytesIO(BufferedIOBase): self._buffer = buf self._pos = 0 + def __getstate__(self): + if self.closed: + raise ValueError("__getstate__ on closed file") + return self.__dict__.copy() + def getvalue(self): """Return the bytes value (contents) of the buffer """ diff --git a/Lib/test/test_memoryio.py b/Lib/test/test_memoryio.py index 670dab9..7dd1658 100644 --- a/Lib/test/test_memoryio.py +++ b/Lib/test/test_memoryio.py @@ -9,6 +9,7 @@ from test import support import io import _pyio as pyio import sys +import pickle class MemorySeekTestMixin: @@ -346,6 +347,42 @@ class MemoryTestMixin: memio = self.ioclass() memio.foo = 1 + def test_pickling(self): + buf = self.buftype("1234567890") + memio = self.ioclass(buf) + memio.foo = 42 + memio.seek(2) + + class PickleTestMemIO(self.ioclass): + def __init__(me, initvalue, foo): + self.ioclass.__init__(me, initvalue) + me.foo = foo + # __getnewargs__ is undefined on purpose. This checks that PEP 307 + # is used to provide pickling support. + + # Pickle expects the class to be on the module level. Here we use a + # little hack to allow the PickleTestMemIO class to derive from + # self.ioclass without having to define all combinations explictly on + # the module-level. + import __main__ + PickleTestMemIO.__module__ = '__main__' + __main__.PickleTestMemIO = PickleTestMemIO + submemio = PickleTestMemIO(buf, 80) + submemio.seek(2) + + # We only support pickle protocol 2 and onward since we use extended + # __reduce__ API of PEP 307 to provide pickling support. + for proto in range(2, pickle.HIGHEST_PROTOCOL): + for obj in (memio, submemio): + obj2 = pickle.loads(pickle.dumps(obj, protocol=proto)) + self.assertEqual(obj.getvalue(), obj2.getvalue()) + self.assertEqual(obj.__class__, obj2.__class__) + self.assertEqual(obj.foo, obj2.foo) + self.assertEqual(obj.tell(), obj2.tell()) + obj.close() + self.assertRaises(ValueError, pickle.dumps, obj, proto) + del __main__.PickleTestMemIO + class PyBytesIOTest(MemoryTestMixin, MemorySeekTestMixin, unittest.TestCase): @@ -425,13 +462,26 @@ class PyBytesIOTest(MemoryTestMixin, MemorySeekTestMixin, unittest.TestCase): self.assertEqual(memio.getvalue(), buf) -class PyStringIOTest(MemoryTestMixin, MemorySeekTestMixin, unittest.TestCase): - buftype = str - ioclass = pyio.StringIO - UnsupportedOperation = pyio.UnsupportedOperation - EOF = "" +class TextIOTestMixin: - # TextIO-specific behaviour. + def test_relative_seek(self): + memio = self.ioclass() + + self.assertRaises(IOError, memio.seek, -1, 1) + self.assertRaises(IOError, memio.seek, 3, 1) + self.assertRaises(IOError, memio.seek, -3, 1) + self.assertRaises(IOError, memio.seek, -1, 2) + self.assertRaises(IOError, memio.seek, 1, 1) + self.assertRaises(IOError, memio.seek, 1, 2) + + def test_textio_properties(self): + memio = self.ioclass() + + # These are just dummy values but we nevertheless check them for fear + # of unexpected breakage. + self.assertTrue(memio.encoding is None) + self.assertEqual(memio.errors, "strict") + self.assertEqual(memio.line_buffering, False) def test_newlines_property(self): memio = self.ioclass(newline=None) @@ -513,7 +563,6 @@ class PyStringIOTest(MemoryTestMixin, MemorySeekTestMixin, unittest.TestCase): def test_newline_cr(self): # newline="\r" memio = self.ioclass("a\nb\r\nc\rd", newline="\r") - memio.seek(0) self.assertEqual(memio.read(), "a\rb\r\rc\rd") memio.seek(0) self.assertEqual(list(memio), ["a\r", "b\r", "\r", "c\r", "d"]) @@ -521,7 +570,6 @@ class PyStringIOTest(MemoryTestMixin, MemorySeekTestMixin, unittest.TestCase): def test_newline_crlf(self): # newline="\r\n" memio = self.ioclass("a\nb\r\nc\rd", newline="\r\n") - memio.seek(0) self.assertEqual(memio.read(), "a\r\nb\r\r\nc\rd") memio.seek(0) self.assertEqual(list(memio), ["a\r\n", "b\r\r\n", "c\rd"]) @@ -539,10 +587,59 @@ class PyStringIOTest(MemoryTestMixin, MemorySeekTestMixin, unittest.TestCase): self.ioclass(newline=newline) +class PyStringIOTest(MemoryTestMixin, MemorySeekTestMixin, + TextIOTestMixin, unittest.TestCase): + buftype = str + ioclass = pyio.StringIO + UnsupportedOperation = pyio.UnsupportedOperation + EOF = "" + + +class PyStringIOPickleTest(TextIOTestMixin, unittest.TestCase): + """Test if pickle restores properly the internal state of StringIO. + """ + buftype = str + UnsupportedOperation = pyio.UnsupportedOperation + EOF = "" + + class ioclass(pyio.StringIO): + def __new__(cls, *args, **kwargs): + return pickle.loads(pickle.dumps(pyio.StringIO(*args, **kwargs))) + def __init__(self, *args, **kwargs): + pass + + class CBytesIOTest(PyBytesIOTest): ioclass = io.BytesIO UnsupportedOperation = io.UnsupportedOperation + def test_getstate(self): + memio = self.ioclass() + state = memio.__getstate__() + self.assertEqual(len(state), 3) + bytearray(state[0]) # Check if state[0] supports the buffer interface. + self.assert_(isinstance(state[1], int)) + self.assert_(isinstance(state[2], dict) or state[2] is None) + memio.close() + self.assertRaises(ValueError, memio.__getstate__) + + def test_setstate(self): + # This checks whether __setstate__ does proper input validation. + memio = self.ioclass() + memio.__setstate__((b"no error", 0, None)) + memio.__setstate__((bytearray(b"no error"), 0, None)) + memio.__setstate__((b"no error", 0, {'spam': 3})) + self.assertRaises(ValueError, memio.__setstate__, (b"", -1, None)) + self.assertRaises(TypeError, memio.__setstate__, ("unicode", 0, None)) + self.assertRaises(TypeError, memio.__setstate__, (b"", 0.0, None)) + self.assertRaises(TypeError, memio.__setstate__, (b"", 0, 0)) + self.assertRaises(TypeError, memio.__setstate__, (b"len-test", 0)) + self.assertRaises(TypeError, memio.__setstate__) + self.assertRaises(TypeError, memio.__setstate__, 0) + memio.close() + self.assertRaises(ValueError, memio.__setstate__, (b"closed", 0, None)) + + class CStringIOTest(PyStringIOTest): ioclass = io.StringIO UnsupportedOperation = io.UnsupportedOperation @@ -561,9 +658,48 @@ class CStringIOTest(PyStringIOTest): self.assertEqual(memio.tell(), len(buf) * 2) self.assertEqual(memio.getvalue(), buf + buf) + def test_getstate(self): + memio = self.ioclass() + state = memio.__getstate__() + self.assertEqual(len(state), 4) + self.assert_(isinstance(state[0], str)) + self.assert_(isinstance(state[1], str)) + self.assert_(isinstance(state[2], int)) + self.assert_(isinstance(state[3], dict) or state[3] is None) + memio.close() + self.assertRaises(ValueError, memio.__getstate__) + + def test_setstate(self): + # This checks whether __setstate__ does proper input validation. + memio = self.ioclass() + memio.__setstate__(("no error", "\n", 0, None)) + memio.__setstate__(("no error", "", 0, {'spam': 3})) + self.assertRaises(ValueError, memio.__setstate__, ("", "f", 0, None)) + self.assertRaises(ValueError, memio.__setstate__, ("", "", -1, None)) + self.assertRaises(TypeError, memio.__setstate__, (b"", "", 0, None)) + self.assertRaises(TypeError, memio.__setstate__, ("", b"", 0, None)) + self.assertRaises(TypeError, memio.__setstate__, ("", "", 0.0, None)) + self.assertRaises(TypeError, memio.__setstate__, ("", "", 0, 0)) + self.assertRaises(TypeError, memio.__setstate__, ("len-test", 0)) + self.assertRaises(TypeError, memio.__setstate__) + self.assertRaises(TypeError, memio.__setstate__, 0) + memio.close() + self.assertRaises(ValueError, memio.__setstate__, ("closed", "", 0, None)) + + +class CStringIOPickleTest(PyStringIOPickleTest): + UnsupportedOperation = io.UnsupportedOperation + + class ioclass(io.StringIO): + def __new__(cls, *args, **kwargs): + return pickle.loads(pickle.dumps(io.StringIO(*args, **kwargs))) + def __init__(self, *args, **kwargs): + pass + def test_main(): - tests = [PyBytesIOTest, PyStringIOTest, CBytesIOTest, CStringIOTest] + tests = [PyBytesIOTest, PyStringIOTest, CBytesIOTest, CStringIOTest, + PyStringIOPickleTest, CStringIOPickleTest] support.run_unittest(*tests) if __name__ == '__main__': |