summaryrefslogtreecommitdiffstats
path: root/Lib/test
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test')
-rw-r--r--Lib/test/pickletester.py423
-rw-r--r--Lib/test/test_inspect.py11
-rw-r--r--Lib/test/test_pickle.py12
-rw-r--r--Lib/test/test_picklebuffer.py154
-rw-r--r--Lib/test/test_pickletools.py17
-rw-r--r--Lib/test/test_pyclbr.py2
6 files changed, 586 insertions, 33 deletions
diff --git a/Lib/test/pickletester.py b/Lib/test/pickletester.py
index 4f8c294..f6fda9e 100644
--- a/Lib/test/pickletester.py
+++ b/Lib/test/pickletester.py
@@ -16,6 +16,16 @@ import weakref
from textwrap import dedent
from http.cookies import SimpleCookie
+try:
+ import _testbuffer
+except ImportError:
+ _testbuffer = None
+
+try:
+ import numpy as np
+except ImportError:
+ np = None
+
from test import support
from test.support import (
TestFailed, TESTFN, run_with_locale, no_tracing,
@@ -162,6 +172,139 @@ def create_dynamic_class(name, bases):
result.reduce_args = (name, bases)
return result
+
+class ZeroCopyBytes(bytes):
+ readonly = True
+ c_contiguous = True
+ f_contiguous = True
+ zero_copy_reconstruct = True
+
+ def __reduce_ex__(self, protocol):
+ if protocol >= 5:
+ return type(self)._reconstruct, (pickle.PickleBuffer(self),), None
+ else:
+ return type(self)._reconstruct, (bytes(self),)
+
+ def __repr__(self):
+ return "{}({!r})".format(self.__class__.__name__, bytes(self))
+
+ __str__ = __repr__
+
+ @classmethod
+ def _reconstruct(cls, obj):
+ with memoryview(obj) as m:
+ obj = m.obj
+ if type(obj) is cls:
+ # Zero-copy
+ return obj
+ else:
+ return cls(obj)
+
+
+class ZeroCopyBytearray(bytearray):
+ readonly = False
+ c_contiguous = True
+ f_contiguous = True
+ zero_copy_reconstruct = True
+
+ def __reduce_ex__(self, protocol):
+ if protocol >= 5:
+ return type(self)._reconstruct, (pickle.PickleBuffer(self),), None
+ else:
+ return type(self)._reconstruct, (bytes(self),)
+
+ def __repr__(self):
+ return "{}({!r})".format(self.__class__.__name__, bytes(self))
+
+ __str__ = __repr__
+
+ @classmethod
+ def _reconstruct(cls, obj):
+ with memoryview(obj) as m:
+ obj = m.obj
+ if type(obj) is cls:
+ # Zero-copy
+ return obj
+ else:
+ return cls(obj)
+
+
+if _testbuffer is not None:
+
+ class PicklableNDArray:
+ # A not-really-zero-copy picklable ndarray, as the ndarray()
+ # constructor doesn't allow for it
+
+ zero_copy_reconstruct = False
+
+ def __init__(self, *args, **kwargs):
+ self.array = _testbuffer.ndarray(*args, **kwargs)
+
+ def __getitem__(self, idx):
+ cls = type(self)
+ new = cls.__new__(cls)
+ new.array = self.array[idx]
+ return new
+
+ @property
+ def readonly(self):
+ return self.array.readonly
+
+ @property
+ def c_contiguous(self):
+ return self.array.c_contiguous
+
+ @property
+ def f_contiguous(self):
+ return self.array.f_contiguous
+
+ def __eq__(self, other):
+ if not isinstance(other, PicklableNDArray):
+ return NotImplemented
+ return (other.array.format == self.array.format and
+ other.array.shape == self.array.shape and
+ other.array.strides == self.array.strides and
+ other.array.readonly == self.array.readonly and
+ other.array.tobytes() == self.array.tobytes())
+
+ def __ne__(self, other):
+ if not isinstance(other, PicklableNDArray):
+ return NotImplemented
+ return not (self == other)
+
+ def __repr__(self):
+ return (f"{type(self)}(shape={self.array.shape},"
+ f"strides={self.array.strides}, "
+ f"bytes={self.array.tobytes()})")
+
+ def __reduce_ex__(self, protocol):
+ if not self.array.contiguous:
+ raise NotImplementedError("Reconstructing a non-contiguous "
+ "ndarray does not seem possible")
+ ndarray_kwargs = {"shape": self.array.shape,
+ "strides": self.array.strides,
+ "format": self.array.format,
+ "flags": (0 if self.readonly
+ else _testbuffer.ND_WRITABLE)}
+ pb = pickle.PickleBuffer(self.array)
+ if protocol >= 5:
+ return (type(self)._reconstruct,
+ (pb, ndarray_kwargs))
+ else:
+ # Need to serialize the bytes in physical order
+ with pb.raw() as m:
+ return (type(self)._reconstruct,
+ (m.tobytes(), ndarray_kwargs))
+
+ @classmethod
+ def _reconstruct(cls, obj, kwargs):
+ with memoryview(obj) as m:
+ # For some reason, ndarray() wants a list of integers...
+ # XXX This only works if format == 'B'
+ items = list(m.tobytes())
+ return cls(items, **kwargs)
+
+
# DATA0 .. DATA4 are the pickles we expect under the various protocols, for
# the object returned by create_data().
@@ -888,6 +1031,10 @@ class AbstractUnpickleTests(unittest.TestCase):
dumped = b'\x80\x04\x8d\4\0\0\0\0\0\0\0\xe2\x82\xac\x00.'
self.assertEqual(self.loads(dumped), '\u20ac\x00')
+ def test_bytearray8(self):
+ dumped = b'\x80\x05\x96\x03\x00\x00\x00\x00\x00\x00\x00xxx.'
+ self.assertEqual(self.loads(dumped), bytearray(b'xxx'))
+
@requires_32b
def test_large_32b_binbytes8(self):
dumped = b'\x80\x04\x8e\4\0\0\0\1\0\0\0\xe2\x82\xac\x00.'
@@ -895,6 +1042,12 @@ class AbstractUnpickleTests(unittest.TestCase):
dumped)
@requires_32b
+ def test_large_32b_bytearray8(self):
+ dumped = b'\x80\x05\x96\4\0\0\0\1\0\0\0\xe2\x82\xac\x00.'
+ self.check_unpickling_error((pickle.UnpicklingError, OverflowError),
+ dumped)
+
+ @requires_32b
def test_large_32b_binunicode8(self):
dumped = b'\x80\x04\x8d\4\0\0\0\1\0\0\0\xe2\x82\xac\x00.'
self.check_unpickling_error((pickle.UnpicklingError, OverflowError),
@@ -1171,6 +1324,10 @@ class AbstractUnpickleTests(unittest.TestCase):
b'\x8e\x03\x00\x00\x00\x00\x00\x00',
b'\x8e\x03\x00\x00\x00\x00\x00\x00\x00',
b'\x8e\x03\x00\x00\x00\x00\x00\x00\x00ab',
+ b'\x96', # BYTEARRAY8
+ b'\x96\x03\x00\x00\x00\x00\x00\x00',
+ b'\x96\x03\x00\x00\x00\x00\x00\x00\x00',
+ b'\x96\x03\x00\x00\x00\x00\x00\x00\x00ab',
b'\x95', # FRAME
b'\x95\x02\x00\x00\x00\x00\x00\x00',
b'\x95\x02\x00\x00\x00\x00\x00\x00\x00',
@@ -1482,6 +1639,25 @@ class AbstractPickleTests(unittest.TestCase):
p = self.dumps(s, proto)
self.assert_is_copy(s, self.loads(p))
+ def test_bytearray(self):
+ for proto in protocols:
+ for s in b'', b'xyz', b'xyz'*100:
+ b = bytearray(s)
+ p = self.dumps(b, proto)
+ bb = self.loads(p)
+ self.assertIsNot(bb, b)
+ self.assert_is_copy(b, bb)
+ if proto <= 3:
+ # bytearray is serialized using a global reference
+ self.assertIn(b'bytearray', p)
+ self.assertTrue(opcode_in_pickle(pickle.GLOBAL, p))
+ elif proto == 4:
+ self.assertIn(b'bytearray', p)
+ self.assertTrue(opcode_in_pickle(pickle.STACK_GLOBAL, p))
+ elif proto == 5:
+ self.assertNotIn(b'bytearray', p)
+ self.assertTrue(opcode_in_pickle(pickle.BYTEARRAY8, p))
+
def test_ints(self):
for proto in protocols:
n = sys.maxsize
@@ -2114,7 +2290,8 @@ class AbstractPickleTests(unittest.TestCase):
the following consistency check.
"""
frame_end = frameless_start = None
- frameless_opcodes = {'BINBYTES', 'BINUNICODE', 'BINBYTES8', 'BINUNICODE8'}
+ frameless_opcodes = {'BINBYTES', 'BINUNICODE', 'BINBYTES8',
+ 'BINUNICODE8', 'BYTEARRAY8'}
for op, arg, pos in pickletools.genops(pickled):
if frame_end is not None:
self.assertLessEqual(pos, frame_end)
@@ -2225,19 +2402,20 @@ class AbstractPickleTests(unittest.TestCase):
num_frames = 20
# Large byte objects (dict values) intermittent with small objects
# (dict keys)
- obj = {i: bytes([i]) * frame_size for i in range(num_frames)}
+ for bytes_type in (bytes, bytearray):
+ obj = {i: bytes_type([i]) * frame_size for i in range(num_frames)}
- for proto in range(4, pickle.HIGHEST_PROTOCOL + 1):
- pickled = self.dumps(obj, proto)
+ for proto in range(4, pickle.HIGHEST_PROTOCOL + 1):
+ pickled = self.dumps(obj, proto)
- frameless_pickle = remove_frames(pickled)
- self.assertEqual(count_opcode(pickle.FRAME, frameless_pickle), 0)
- self.assertEqual(obj, self.loads(frameless_pickle))
+ frameless_pickle = remove_frames(pickled)
+ self.assertEqual(count_opcode(pickle.FRAME, frameless_pickle), 0)
+ self.assertEqual(obj, self.loads(frameless_pickle))
- some_frames_pickle = remove_frames(pickled, lambda i: i % 2)
- self.assertLess(count_opcode(pickle.FRAME, some_frames_pickle),
- count_opcode(pickle.FRAME, pickled))
- self.assertEqual(obj, self.loads(some_frames_pickle))
+ some_frames_pickle = remove_frames(pickled, lambda i: i % 2)
+ self.assertLess(count_opcode(pickle.FRAME, some_frames_pickle),
+ count_opcode(pickle.FRAME, pickled))
+ self.assertEqual(obj, self.loads(some_frames_pickle))
def test_framed_write_sizes_with_delayed_writer(self):
class ChunkAccumulator:
@@ -2452,6 +2630,186 @@ class AbstractPickleTests(unittest.TestCase):
with self.assertRaises((AttributeError, pickle.PicklingError)):
pickletools.dis(self.dumps(f, proto))
+ #
+ # PEP 574 tests below
+ #
+
+ def buffer_like_objects(self):
+ # Yield buffer-like objects with the bytestring "abcdef" in them
+ bytestring = b"abcdefgh"
+ yield ZeroCopyBytes(bytestring)
+ yield ZeroCopyBytearray(bytestring)
+ if _testbuffer is not None:
+ items = list(bytestring)
+ value = int.from_bytes(bytestring, byteorder='little')
+ for flags in (0, _testbuffer.ND_WRITABLE):
+ # 1-D, contiguous
+ yield PicklableNDArray(items, format='B', shape=(8,),
+ flags=flags)
+ # 2-D, C-contiguous
+ yield PicklableNDArray(items, format='B', shape=(4, 2),
+ strides=(2, 1), flags=flags)
+ # 2-D, Fortran-contiguous
+ yield PicklableNDArray(items, format='B',
+ shape=(4, 2), strides=(1, 4),
+ flags=flags)
+
+ def test_in_band_buffers(self):
+ # Test in-band buffers (PEP 574)
+ for obj in self.buffer_like_objects():
+ for proto in range(0, pickle.HIGHEST_PROTOCOL + 1):
+ data = self.dumps(obj, proto)
+ if obj.c_contiguous and proto >= 5:
+ # The raw memory bytes are serialized in physical order
+ self.assertIn(b"abcdefgh", data)
+ self.assertEqual(count_opcode(pickle.NEXT_BUFFER, data), 0)
+ if proto >= 5:
+ self.assertEqual(count_opcode(pickle.SHORT_BINBYTES, data),
+ 1 if obj.readonly else 0)
+ self.assertEqual(count_opcode(pickle.BYTEARRAY8, data),
+ 0 if obj.readonly else 1)
+ # Return a true value from buffer_callback should have
+ # the same effect
+ def buffer_callback(obj):
+ return True
+ data2 = self.dumps(obj, proto,
+ buffer_callback=buffer_callback)
+ self.assertEqual(data2, data)
+
+ new = self.loads(data)
+ # It's a copy
+ self.assertIsNot(new, obj)
+ self.assertIs(type(new), type(obj))
+ self.assertEqual(new, obj)
+
+ # XXX Unfortunately cannot test non-contiguous array
+ # (see comment in PicklableNDArray.__reduce_ex__)
+
+ def test_oob_buffers(self):
+ # Test out-of-band buffers (PEP 574)
+ for obj in self.buffer_like_objects():
+ for proto in range(0, 5):
+ # Need protocol >= 5 for buffer_callback
+ with self.assertRaises(ValueError):
+ self.dumps(obj, proto,
+ buffer_callback=[].append)
+ for proto in range(5, pickle.HIGHEST_PROTOCOL + 1):
+ buffers = []
+ buffer_callback = lambda pb: buffers.append(pb.raw())
+ data = self.dumps(obj, proto,
+ buffer_callback=buffer_callback)
+ self.assertNotIn(b"abcdefgh", data)
+ self.assertEqual(count_opcode(pickle.SHORT_BINBYTES, data), 0)
+ self.assertEqual(count_opcode(pickle.BYTEARRAY8, data), 0)
+ self.assertEqual(count_opcode(pickle.NEXT_BUFFER, data), 1)
+ self.assertEqual(count_opcode(pickle.READONLY_BUFFER, data),
+ 1 if obj.readonly else 0)
+
+ if obj.c_contiguous:
+ self.assertEqual(bytes(buffers[0]), b"abcdefgh")
+ # Need buffers argument to unpickle properly
+ with self.assertRaises(pickle.UnpicklingError):
+ self.loads(data)
+
+ new = self.loads(data, buffers=buffers)
+ if obj.zero_copy_reconstruct:
+ # Zero-copy achieved
+ self.assertIs(new, obj)
+ else:
+ self.assertIs(type(new), type(obj))
+ self.assertEqual(new, obj)
+ # Non-sequence buffers accepted too
+ new = self.loads(data, buffers=iter(buffers))
+ if obj.zero_copy_reconstruct:
+ # Zero-copy achieved
+ self.assertIs(new, obj)
+ else:
+ self.assertIs(type(new), type(obj))
+ self.assertEqual(new, obj)
+
+ def test_oob_buffers_writable_to_readonly(self):
+ # Test reconstructing readonly object from writable buffer
+ obj = ZeroCopyBytes(b"foobar")
+ for proto in range(5, pickle.HIGHEST_PROTOCOL + 1):
+ buffers = []
+ buffer_callback = buffers.append
+ data = self.dumps(obj, proto, buffer_callback=buffer_callback)
+
+ buffers = map(bytearray, buffers)
+ new = self.loads(data, buffers=buffers)
+ self.assertIs(type(new), type(obj))
+ self.assertEqual(new, obj)
+
+ def test_picklebuffer_error(self):
+ # PickleBuffer forbidden with protocol < 5
+ pb = pickle.PickleBuffer(b"foobar")
+ for proto in range(0, 5):
+ with self.assertRaises(pickle.PickleError):
+ self.dumps(pb, proto)
+
+ def test_buffer_callback_error(self):
+ def buffer_callback(buffers):
+ 1/0
+ pb = pickle.PickleBuffer(b"foobar")
+ with self.assertRaises(ZeroDivisionError):
+ self.dumps(pb, 5, buffer_callback=buffer_callback)
+
+ def test_buffers_error(self):
+ pb = pickle.PickleBuffer(b"foobar")
+ for proto in range(5, pickle.HIGHEST_PROTOCOL + 1):
+ data = self.dumps(pb, proto, buffer_callback=[].append)
+ # Non iterable buffers
+ with self.assertRaises(TypeError):
+ self.loads(data, buffers=object())
+ # Buffer iterable exhausts too early
+ with self.assertRaises(pickle.UnpicklingError):
+ self.loads(data, buffers=[])
+
+ @unittest.skipIf(np is None, "Test needs Numpy")
+ def test_buffers_numpy(self):
+ def check_no_copy(x, y):
+ np.testing.assert_equal(x, y)
+ self.assertEqual(x.ctypes.data, y.ctypes.data)
+
+ def check_copy(x, y):
+ np.testing.assert_equal(x, y)
+ self.assertNotEqual(x.ctypes.data, y.ctypes.data)
+
+ def check_array(arr):
+ # In-band
+ for proto in range(0, pickle.HIGHEST_PROTOCOL + 1):
+ data = self.dumps(arr, proto)
+ new = self.loads(data)
+ check_copy(arr, new)
+ for proto in range(5, pickle.HIGHEST_PROTOCOL + 1):
+ buffer_callback = lambda _: True
+ data = self.dumps(arr, proto, buffer_callback=buffer_callback)
+ new = self.loads(data)
+ check_copy(arr, new)
+ # Out-of-band
+ for proto in range(5, pickle.HIGHEST_PROTOCOL + 1):
+ buffers = []
+ buffer_callback = buffers.append
+ data = self.dumps(arr, proto, buffer_callback=buffer_callback)
+ new = self.loads(data, buffers=buffers)
+ if arr.flags.c_contiguous or arr.flags.f_contiguous:
+ check_no_copy(arr, new)
+ else:
+ check_copy(arr, new)
+
+ # 1-D
+ arr = np.arange(6)
+ check_array(arr)
+ # 1-D, non-contiguous
+ check_array(arr[::2])
+ # 2-D, C-contiguous
+ arr = np.arange(12).reshape((3, 4))
+ check_array(arr)
+ # 2-D, F-contiguous
+ check_array(arr.T)
+ # 2-D, non-contiguous
+ check_array(arr[::2])
+
class BigmemPickleTests(unittest.TestCase):
@@ -2736,7 +3094,7 @@ class AbstractPickleModuleTests(unittest.TestCase):
def test_highest_protocol(self):
# Of course this needs to be changed when HIGHEST_PROTOCOL changes.
- self.assertEqual(pickle.HIGHEST_PROTOCOL, 4)
+ self.assertEqual(pickle.HIGHEST_PROTOCOL, 5)
def test_callapi(self):
f = io.BytesIO()
@@ -2760,6 +3118,47 @@ class AbstractPickleModuleTests(unittest.TestCase):
self.assertRaises(pickle.PicklingError, BadPickler().dump, 0)
self.assertRaises(pickle.UnpicklingError, BadUnpickler().load)
+ def check_dumps_loads_oob_buffers(self, dumps, loads):
+ # No need to do the full gamut of tests here, just enough to
+ # check that dumps() and loads() redirect their arguments
+ # to the underlying Pickler and Unpickler, respectively.
+ obj = ZeroCopyBytes(b"foo")
+
+ for proto in range(0, 5):
+ # Need protocol >= 5 for buffer_callback
+ with self.assertRaises(ValueError):
+ dumps(obj, protocol=proto,
+ buffer_callback=[].append)
+ for proto in range(5, pickle.HIGHEST_PROTOCOL + 1):
+ buffers = []
+ buffer_callback = buffers.append
+ data = dumps(obj, protocol=proto,
+ buffer_callback=buffer_callback)
+ self.assertNotIn(b"foo", data)
+ self.assertEqual(bytes(buffers[0]), b"foo")
+ # Need buffers argument to unpickle properly
+ with self.assertRaises(pickle.UnpicklingError):
+ loads(data)
+ new = loads(data, buffers=buffers)
+ self.assertIs(new, obj)
+
+ def test_dumps_loads_oob_buffers(self):
+ # Test out-of-band buffers (PEP 574) with top-level dumps() and loads()
+ self.check_dumps_loads_oob_buffers(self.dumps, self.loads)
+
+ def test_dump_load_oob_buffers(self):
+ # Test out-of-band buffers (PEP 574) with top-level dump() and load()
+ def dumps(obj, **kwargs):
+ f = io.BytesIO()
+ self.dump(obj, f, **kwargs)
+ return f.getvalue()
+
+ def loads(data, **kwargs):
+ f = io.BytesIO(data)
+ return self.load(f, **kwargs)
+
+ self.check_dumps_loads_oob_buffers(dumps, loads)
+
class AbstractPersistentPicklerTests(unittest.TestCase):
diff --git a/Lib/test/test_inspect.py b/Lib/test/test_inspect.py
index be52b38..b3aae3a 100644
--- a/Lib/test/test_inspect.py
+++ b/Lib/test/test_inspect.py
@@ -2894,16 +2894,15 @@ class TestSignatureObject(unittest.TestCase):
@unittest.skipIf(MISSING_C_DOCSTRINGS,
"Signature information for builtins requires docstrings")
def test_signature_on_builtin_class(self):
- self.assertEqual(str(inspect.signature(_pickle.Pickler)),
- '(file, protocol=None, fix_imports=True)')
+ expected = ('(file, protocol=None, fix_imports=True, '
+ 'buffer_callback=None)')
+ self.assertEqual(str(inspect.signature(_pickle.Pickler)), expected)
class P(_pickle.Pickler): pass
class EmptyTrait: pass
class P2(EmptyTrait, P): pass
- self.assertEqual(str(inspect.signature(P)),
- '(file, protocol=None, fix_imports=True)')
- self.assertEqual(str(inspect.signature(P2)),
- '(file, protocol=None, fix_imports=True)')
+ self.assertEqual(str(inspect.signature(P)), expected)
+ self.assertEqual(str(inspect.signature(P2)), expected)
class P3(P2):
def __init__(self, spam):
diff --git a/Lib/test/test_pickle.py b/Lib/test/test_pickle.py
index 435c248..5f7a879 100644
--- a/Lib/test/test_pickle.py
+++ b/Lib/test/test_pickle.py
@@ -57,9 +57,9 @@ class PyPicklerTests(AbstractPickleTests):
pickler = pickle._Pickler
unpickler = pickle._Unpickler
- def dumps(self, arg, proto=None):
+ def dumps(self, arg, proto=None, **kwargs):
f = io.BytesIO()
- p = self.pickler(f, proto)
+ p = self.pickler(f, proto, **kwargs)
p.dump(arg)
f.seek(0)
return bytes(f.read())
@@ -78,8 +78,8 @@ class InMemoryPickleTests(AbstractPickleTests, AbstractUnpickleTests,
AttributeError, ValueError,
struct.error, IndexError, ImportError)
- def dumps(self, arg, protocol=None):
- return pickle.dumps(arg, protocol)
+ def dumps(self, arg, protocol=None, **kwargs):
+ return pickle.dumps(arg, protocol, **kwargs)
def loads(self, buf, **kwds):
return pickle.loads(buf, **kwds)
@@ -271,7 +271,7 @@ if has_c_implementation:
check_sizeof = support.check_sizeof
def test_pickler(self):
- basesize = support.calcobjsize('6P2n3i2n3i2P')
+ basesize = support.calcobjsize('7P2n3i2n3i2P')
p = _pickle.Pickler(io.BytesIO())
self.assertEqual(object.__sizeof__(p), basesize)
MT_size = struct.calcsize('3nP0n')
@@ -288,7 +288,7 @@ if has_c_implementation:
0) # Write buffer is cleared after every dump().
def test_unpickler(self):
- basesize = support.calcobjsize('2P2n2P 2P2n2i5P 2P3n6P2n2i')
+ basesize = support.calcobjsize('2P2n2P 2P2n2i5P 2P3n8P2n2i')
unpickler = _pickle.Unpickler
P = struct.calcsize('P') # Size of memo table entry.
n = struct.calcsize('n') # Size of mark table entry.
diff --git a/Lib/test/test_picklebuffer.py b/Lib/test/test_picklebuffer.py
new file mode 100644
index 0000000..7e72157
--- /dev/null
+++ b/Lib/test/test_picklebuffer.py
@@ -0,0 +1,154 @@
+"""Unit tests for the PickleBuffer object.
+
+Pickling tests themselves are in pickletester.py.
+"""
+
+import gc
+from pickle import PickleBuffer
+import sys
+import weakref
+import unittest
+
+from test import support
+
+
+class B(bytes):
+ pass
+
+
+class PickleBufferTest(unittest.TestCase):
+
+ def check_memoryview(self, pb, equiv):
+ with memoryview(pb) as m:
+ with memoryview(equiv) as expected:
+ self.assertEqual(m.nbytes, expected.nbytes)
+ self.assertEqual(m.readonly, expected.readonly)
+ self.assertEqual(m.itemsize, expected.itemsize)
+ self.assertEqual(m.shape, expected.shape)
+ self.assertEqual(m.strides, expected.strides)
+ self.assertEqual(m.c_contiguous, expected.c_contiguous)
+ self.assertEqual(m.f_contiguous, expected.f_contiguous)
+ self.assertEqual(m.format, expected.format)
+ self.assertEqual(m.tobytes(), expected.tobytes())
+
+ def test_constructor_failure(self):
+ with self.assertRaises(TypeError):
+ PickleBuffer()
+ with self.assertRaises(TypeError):
+ PickleBuffer("foo")
+ # Released memoryview fails taking a buffer
+ m = memoryview(b"foo")
+ m.release()
+ with self.assertRaises(ValueError):
+ PickleBuffer(m)
+
+ def test_basics(self):
+ pb = PickleBuffer(b"foo")
+ self.assertEqual(b"foo", bytes(pb))
+ with memoryview(pb) as m:
+ self.assertTrue(m.readonly)
+
+ pb = PickleBuffer(bytearray(b"foo"))
+ self.assertEqual(b"foo", bytes(pb))
+ with memoryview(pb) as m:
+ self.assertFalse(m.readonly)
+ m[0] = 48
+ self.assertEqual(b"0oo", bytes(pb))
+
+ def test_release(self):
+ pb = PickleBuffer(b"foo")
+ pb.release()
+ with self.assertRaises(ValueError) as raises:
+ memoryview(pb)
+ self.assertIn("operation forbidden on released PickleBuffer object",
+ str(raises.exception))
+ # Idempotency
+ pb.release()
+
+ def test_cycle(self):
+ b = B(b"foo")
+ pb = PickleBuffer(b)
+ b.cycle = pb
+ wpb = weakref.ref(pb)
+ del b, pb
+ gc.collect()
+ self.assertIsNone(wpb())
+
+ def test_ndarray_2d(self):
+ # C-contiguous
+ ndarray = support.import_module("_testbuffer").ndarray
+ arr = ndarray(list(range(12)), shape=(4, 3), format='<i')
+ self.assertTrue(arr.c_contiguous)
+ self.assertFalse(arr.f_contiguous)
+ pb = PickleBuffer(arr)
+ self.check_memoryview(pb, arr)
+ # Non-contiguous
+ arr = arr[::2]
+ self.assertFalse(arr.c_contiguous)
+ self.assertFalse(arr.f_contiguous)
+ pb = PickleBuffer(arr)
+ self.check_memoryview(pb, arr)
+ # F-contiguous
+ arr = ndarray(list(range(12)), shape=(3, 4), strides=(4, 12), format='<i')
+ self.assertTrue(arr.f_contiguous)
+ self.assertFalse(arr.c_contiguous)
+ pb = PickleBuffer(arr)
+ self.check_memoryview(pb, arr)
+
+ # Tests for PickleBuffer.raw()
+
+ def check_raw(self, obj, equiv):
+ pb = PickleBuffer(obj)
+ with pb.raw() as m:
+ self.assertIsInstance(m, memoryview)
+ self.check_memoryview(m, equiv)
+
+ def test_raw(self):
+ for obj in (b"foo", bytearray(b"foo")):
+ with self.subTest(obj=obj):
+ self.check_raw(obj, obj)
+
+ def test_raw_ndarray(self):
+ # 1-D, contiguous
+ ndarray = support.import_module("_testbuffer").ndarray
+ arr = ndarray(list(range(3)), shape=(3,), format='<h')
+ equiv = b"\x00\x00\x01\x00\x02\x00"
+ self.check_raw(arr, equiv)
+ # 2-D, C-contiguous
+ arr = ndarray(list(range(6)), shape=(2, 3), format='<h')
+ equiv = b"\x00\x00\x01\x00\x02\x00\x03\x00\x04\x00\x05\x00"
+ self.check_raw(arr, equiv)
+ # 2-D, F-contiguous
+ arr = ndarray(list(range(6)), shape=(2, 3), strides=(2, 4),
+ format='<h')
+ # Note this is different from arr.tobytes()
+ equiv = b"\x00\x00\x01\x00\x02\x00\x03\x00\x04\x00\x05\x00"
+ self.check_raw(arr, equiv)
+ # 0-D
+ arr = ndarray(456, shape=(), format='<i')
+ equiv = b'\xc8\x01\x00\x00'
+ self.check_raw(arr, equiv)
+
+ def check_raw_non_contiguous(self, obj):
+ pb = PickleBuffer(obj)
+ with self.assertRaisesRegex(BufferError, "non-contiguous"):
+ pb.raw()
+
+ def test_raw_non_contiguous(self):
+ # 1-D
+ ndarray = support.import_module("_testbuffer").ndarray
+ arr = ndarray(list(range(6)), shape=(6,), format='<i')[::2]
+ self.check_raw_non_contiguous(arr)
+ # 2-D
+ arr = ndarray(list(range(12)), shape=(4, 3), format='<i')[::2]
+ self.check_raw_non_contiguous(arr)
+
+ def test_raw_released(self):
+ pb = PickleBuffer(b"foo")
+ pb.release()
+ with self.assertRaises(ValueError) as raises:
+ pb.raw()
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/Lib/test/test_pickletools.py b/Lib/test/test_pickletools.py
index e40a958..8cc6ca5 100644
--- a/Lib/test/test_pickletools.py
+++ b/Lib/test/test_pickletools.py
@@ -6,8 +6,8 @@ import unittest
class OptimizedPickleTests(AbstractPickleTests):
- def dumps(self, arg, proto=None):
- return pickletools.optimize(pickle.dumps(arg, proto))
+ def dumps(self, arg, proto=None, **kwargs):
+ return pickletools.optimize(pickle.dumps(arg, proto, **kwargs))
def loads(self, buf, **kwds):
return pickle.loads(buf, **kwds)
@@ -71,23 +71,24 @@ class MiscTestCase(unittest.TestCase):
'read_uint8', 'read_stringnl', 'read_stringnl_noescape',
'read_stringnl_noescape_pair', 'read_string1',
'read_string4', 'read_bytes1', 'read_bytes4',
- 'read_bytes8', 'read_unicodestringnl',
+ 'read_bytes8', 'read_bytearray8', 'read_unicodestringnl',
'read_unicodestring1', 'read_unicodestring4',
'read_unicodestring8', 'read_decimalnl_short',
'read_decimalnl_long', 'read_floatnl', 'read_float8',
'read_long1', 'read_long4',
'uint1', 'uint2', 'int4', 'uint4', 'uint8', 'stringnl',
'stringnl_noescape', 'stringnl_noescape_pair', 'string1',
- 'string4', 'bytes1', 'bytes4', 'bytes8',
+ 'string4', 'bytes1', 'bytes4', 'bytes8', 'bytearray8',
'unicodestringnl', 'unicodestring1', 'unicodestring4',
'unicodestring8', 'decimalnl_short', 'decimalnl_long',
'floatnl', 'float8', 'long1', 'long4',
'StackObject',
'pyint', 'pylong', 'pyinteger_or_bool', 'pybool', 'pyfloat',
- 'pybytes_or_str', 'pystring', 'pybytes', 'pyunicode',
- 'pynone', 'pytuple', 'pylist', 'pydict', 'pyset',
- 'pyfrozenset', 'anyobject', 'markobject', 'stackslice',
- 'OpcodeInfo', 'opcodes', 'code2op',
+ 'pybytes_or_str', 'pystring', 'pybytes', 'pybytearray',
+ 'pyunicode', 'pynone', 'pytuple', 'pylist', 'pydict',
+ 'pyset', 'pyfrozenset', 'pybuffer', 'anyobject',
+ 'markobject', 'stackslice', 'OpcodeInfo', 'opcodes',
+ 'code2op',
}
support.check__all__(self, pickletools, blacklist=blacklist)
diff --git a/Lib/test/test_pyclbr.py b/Lib/test/test_pyclbr.py
index 839c58f..0b3934f 100644
--- a/Lib/test/test_pyclbr.py
+++ b/Lib/test/test_pyclbr.py
@@ -224,7 +224,7 @@ class PyclbrTest(TestCase):
# These were once about the 10 longest modules
cm('random', ignore=('Random',)) # from _random import Random as CoreGenerator
cm('cgi', ignore=('log',)) # set with = in module
- cm('pickle', ignore=('partial',))
+ cm('pickle', ignore=('partial', 'PickleBuffer'))
# TODO(briancurtin): openfp is deprecated as of 3.7.
# Update this once it has been removed.
cm('aifc', ignore=('openfp', '_aifc_params')) # set with = in module