summaryrefslogtreecommitdiffstats
path: root/Lib/test/pickletester.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/pickletester.py')
-rw-r--r--Lib/test/pickletester.py242
1 files changed, 226 insertions, 16 deletions
diff --git a/Lib/test/pickletester.py b/Lib/test/pickletester.py
index 06789cd..4d491b0 100644
--- a/Lib/test/pickletester.py
+++ b/Lib/test/pickletester.py
@@ -2,10 +2,14 @@ import io
import unittest
import pickle
import pickletools
+import sys
import copyreg
from http.cookies import SimpleCookie
-from test.support import TestFailed, TESTFN, run_with_locale
+from test.support import (
+ TestFailed, TESTFN, run_with_locale,
+ _2G, _4G, bigmemtest,
+ )
from pickle import bytes_types
@@ -14,6 +18,8 @@ from pickle import bytes_types
# kind of outer loop.
protocols = range(pickle.HIGHEST_PROTOCOL + 1)
+character_size = 4 if sys.maxunicode > 0xFFFF else 2
+
# Return True if opcode code appears in the pickle, else False.
def opcode_in_pickle(code, pickle):
@@ -30,6 +36,21 @@ def count_opcode(code, pickle):
n += 1
return n
+
+class UnseekableIO(io.BytesIO):
+ def peek(self, *args):
+ raise NotImplementedError
+
+ def seekable(self):
+ return False
+
+ def seek(self, *args):
+ raise io.UnsupportedOperation
+
+ def tell(self):
+ raise io.UnsupportedOperation
+
+
# We can't very well test the extension registry without putting known stuff
# in it, but we have to be careful to restore its original state. Code
# should do this:
@@ -100,6 +121,19 @@ class metaclass(type):
class use_metaclass(object, metaclass=metaclass):
pass
+class pickling_metaclass(type):
+ def __eq__(self, other):
+ return (type(self) == type(other) and
+ self.reduce_args == other.reduce_args)
+
+ def __reduce__(self):
+ return (create_dynamic_class, self.reduce_args)
+
+def create_dynamic_class(name, bases):
+ result = pickling_metaclass(name, bases, dict())
+ result.reduce_args = (name, bases)
+ return result
+
# DATA0 .. DATA2 are the pickles we expect under the various protocols, for
# the object returned by create_data().
@@ -602,9 +636,15 @@ class AbstractPickleTests(unittest.TestCase):
def test_bytes(self):
for proto in protocols:
- for u in b'', b'xyz', b'xyz'*100:
- p = self.dumps(u)
- self.assertEqual(self.loads(p), u)
+ for s in b'', b'xyz', b'xyz'*100:
+ p = self.dumps(s)
+ self.assertEqual(self.loads(p), s)
+ for s in [bytes([i]) for i in range(256)]:
+ p = self.dumps(s)
+ self.assertEqual(self.loads(p), s)
+ for s in [bytes([i, i]) for i in range(256)]:
+ p = self.dumps(s)
+ self.assertEqual(self.loads(p), s)
def test_ints(self):
import sys
@@ -667,6 +707,11 @@ class AbstractPickleTests(unittest.TestCase):
def test_getinitargs(self):
pass
+ def test_pop_empty_stack(self):
+ # Test issue7455
+ s = b'0'
+ self.assertRaises((pickle.UnpicklingError, IndexError), self.loads, s)
+
def test_metaclass(self):
a = use_metaclass()
for proto in protocols:
@@ -674,6 +719,14 @@ class AbstractPickleTests(unittest.TestCase):
b = self.loads(s)
self.assertEqual(a.__class__, b.__class__)
+ def test_dynamic_class(self):
+ a = create_dynamic_class("my_dynamic_class", (object,))
+ copyreg.pickle(pickling_metaclass, pickling_metaclass.__reduce__)
+ for proto in protocols:
+ s = self.dumps(a, proto)
+ b = self.loads(s)
+ self.assertEqual(a, b)
+
def test_structseq(self):
import time
import os
@@ -841,8 +894,8 @@ class AbstractPickleTests(unittest.TestCase):
# Dump using protocol 1 for comparison.
s1 = self.dumps(x, 1)
- self.assertTrue(__name__.encode("utf-8") in s1)
- self.assertTrue(b"MyList" in s1)
+ self.assertIn(__name__.encode("utf-8"), s1)
+ self.assertIn(b"MyList", s1)
self.assertEqual(opcode_in_pickle(opcode, s1), False)
y = self.loads(s1)
@@ -851,8 +904,8 @@ class AbstractPickleTests(unittest.TestCase):
# Dump using protocol 2 for test.
s2 = self.dumps(x, 2)
- self.assertTrue(__name__.encode("utf-8") not in s2)
- self.assertTrue(b"MyList" not in s2)
+ self.assertNotIn(__name__.encode("utf-8"), s2)
+ self.assertNotIn(b"MyList", s2)
self.assertEqual(opcode_in_pickle(opcode, s2), True, repr(s2))
y = self.loads(s2)
@@ -903,7 +956,7 @@ class AbstractPickleTests(unittest.TestCase):
x = dict.fromkeys(range(n))
for proto in protocols:
s = self.dumps(x, proto)
- assert isinstance(s, bytes_types)
+ self.assertIsInstance(s, bytes_types)
y = self.loads(s)
self.assertEqual(x, y)
num_setitems = count_opcode(pickle.SETITEMS, s)
@@ -1068,6 +1121,131 @@ class AbstractPickleTests(unittest.TestCase):
dumped = self.dumps(set([3]), 2)
self.assertEqual(dumped, DATA6)
+ def test_large_pickles(self):
+ # Test the correctness of internal buffering routines when handling
+ # large data.
+ for proto in protocols:
+ data = (1, min, b'xy' * (30 * 1024), len)
+ dumped = self.dumps(data, proto)
+ loaded = self.loads(dumped)
+ self.assertEqual(len(loaded), len(data))
+ self.assertEqual(loaded, data)
+
+ def test_empty_bytestring(self):
+ # issue 11286
+ empty = self.loads(b'\x80\x03U\x00q\x00.', encoding='koi8-r')
+ self.assertEqual(empty, '')
+
+ def check_negative_32b_binXXX(self, dumped):
+ if sys.maxsize > 2**32:
+ self.skipTest("test is only meaningful on 32-bit builds")
+ # XXX Pure Python pickle reads lengths as signed and passes
+ # them directly to read() (hence the EOFError)
+ with self.assertRaises((pickle.UnpicklingError, EOFError,
+ ValueError, OverflowError)):
+ self.loads(dumped)
+
+ def test_negative_32b_binbytes(self):
+ # On 32-bit builds, a BINBYTES of 2**31 or more is refused
+ self.check_negative_32b_binXXX(b'\x80\x03B\xff\xff\xff\xffxyzq\x00.')
+
+ def test_negative_32b_binunicode(self):
+ # On 32-bit builds, a BINUNICODE of 2**31 or more is refused
+ self.check_negative_32b_binXXX(b'\x80\x03X\xff\xff\xff\xffxyzq\x00.')
+
+ def test_negative_put(self):
+ # Issue #12847
+ dumped = b'Va\np-1\n.'
+ self.assertRaises(ValueError, self.loads, dumped)
+
+ def test_negative_32b_binput(self):
+ # Issue #12847
+ if sys.maxsize > 2**32:
+ self.skipTest("test is only meaningful on 32-bit builds")
+ dumped = b'\x80\x03X\x01\x00\x00\x00ar\xff\xff\xff\xff.'
+ self.assertRaises(ValueError, self.loads, dumped)
+
+
+class BigmemPickleTests(unittest.TestCase):
+
+ # Binary protocols can serialize longs of up to 2GB-1
+
+ @bigmemtest(size=_2G, memuse=1 + 1, dry_run=False)
+ def test_huge_long_32b(self, size):
+ data = 1 << (8 * size)
+ try:
+ for proto in protocols:
+ if proto < 2:
+ continue
+ with self.assertRaises((ValueError, OverflowError)):
+ self.dumps(data, protocol=proto)
+ finally:
+ data = None
+
+ # Protocol 3 can serialize up to 4GB-1 as a bytes object
+ # (older protocols don't have a dedicated opcode for bytes and are
+ # too inefficient)
+
+ @bigmemtest(size=_2G, memuse=1 + 1, dry_run=False)
+ def test_huge_bytes_32b(self, size):
+ data = b"abcd" * (size // 4)
+ try:
+ for proto in protocols:
+ if proto < 3:
+ continue
+ try:
+ pickled = self.dumps(data, protocol=proto)
+ self.assertTrue(b"abcd" in pickled[:15])
+ self.assertTrue(b"abcd" in pickled[-15:])
+ finally:
+ pickled = None
+ finally:
+ data = None
+
+ @bigmemtest(size=_4G, memuse=1 + 1, dry_run=False)
+ def test_huge_bytes_64b(self, size):
+ data = b"a" * size
+ try:
+ for proto in protocols:
+ if proto < 3:
+ continue
+ with self.assertRaises((ValueError, OverflowError)):
+ self.dumps(data, protocol=proto)
+ finally:
+ data = None
+
+ # All protocols use 1-byte per printable ASCII character; we add another
+ # byte because the encoded form has to be copied into the internal buffer.
+
+ @bigmemtest(size=_2G, memuse=2 + character_size, dry_run=False)
+ def test_huge_str_32b(self, size):
+ data = "abcd" * (size // 4)
+ try:
+ for proto in protocols:
+ try:
+ pickled = self.dumps(data, protocol=proto)
+ self.assertTrue(b"abcd" in pickled[:15])
+ self.assertTrue(b"abcd" in pickled[-15:])
+ finally:
+ pickled = None
+ finally:
+ data = None
+
+ # BINUNICODE (protocols 1, 2 and 3) cannot carry more than
+ # 2**32 - 1 bytes of utf-8 encoded unicode.
+
+ @bigmemtest(size=_4G, memuse=1 + character_size, dry_run=False)
+ def test_huge_str_64b(self, size):
+ data = "a" * size
+ try:
+ for proto in protocols:
+ if proto == 0:
+ continue
+ with self.assertRaises((ValueError, OverflowError)):
+ self.dumps(data, protocol=proto)
+ finally:
+ data = None
+
# Test classes for reduce_ex
@@ -1112,9 +1290,6 @@ class REX_five(object):
class MyInt(int):
sample = 1
-class MyLong(int):
- sample = 1
-
class MyFloat(float):
sample = 1.0
@@ -1136,7 +1311,7 @@ class MyList(list):
class MyDict(dict):
sample = {"a": 1, "b": 2}
-myclasses = [MyInt, MyLong, MyFloat,
+myclasses = [MyInt, MyFloat,
MyComplex,
MyStr, MyUnicode,
MyTuple, MyList, MyDict]
@@ -1213,9 +1388,6 @@ class AbstractPickleModuleTests(unittest.TestCase):
# Test issue4298
s = bytes([0x58, 0, 0, 0, 0x54])
self.assertRaises(EOFError, pickle.loads, s)
- # Test issue7455
- s = b'0'
- self.assertRaises(pickle.UnpicklingError, pickle.loads, s)
class AbstractPersistentPicklerTests(unittest.TestCase):
@@ -1367,6 +1539,44 @@ class AbstractPicklerUnpicklerObjectTests(unittest.TestCase):
f.seek(0)
self.assertEqual(unpickler.load(), data2)
+ def _check_multiple_unpicklings(self, ioclass):
+ for proto in protocols:
+ data1 = [(x, str(x)) for x in range(2000)] + [b"abcde", len]
+ f = ioclass()
+ pickler = self.pickler_class(f, protocol=proto)
+ pickler.dump(data1)
+ pickled = f.getvalue()
+
+ N = 5
+ f = ioclass(pickled * N)
+ unpickler = self.unpickler_class(f)
+ for i in range(N):
+ if f.seekable():
+ pos = f.tell()
+ self.assertEqual(unpickler.load(), data1)
+ if f.seekable():
+ self.assertEqual(f.tell(), pos + len(pickled))
+ self.assertRaises(EOFError, unpickler.load)
+
+ def test_multiple_unpicklings_seekable(self):
+ self._check_multiple_unpicklings(io.BytesIO)
+
+ def test_multiple_unpicklings_unseekable(self):
+ self._check_multiple_unpicklings(UnseekableIO)
+
+ def test_unpickling_buffering_readline(self):
+ # Issue #12687: the unpickler's buffering logic could fail with
+ # text mode opcodes.
+ data = list(range(10))
+ for proto in protocols:
+ for buf_size in range(1, 11):
+ f = io.BufferedRandom(io.BytesIO(), buffer_size=buf_size)
+ pickler = self.pickler_class(f, protocol=proto)
+ pickler.dump(data)
+ f.seek(0)
+ unpickler = self.unpickler_class(f)
+ self.assertEqual(unpickler.load(), data)
+
if __name__ == "__main__":
# Print some stuff that can be used to rewrite DATA{0,1,2}