summaryrefslogtreecommitdiffstats
path: root/Lib
diff options
context:
space:
mode:
authorSerhiy Storchaka <storchaka@gmail.com>2024-08-31 09:30:05 (GMT)
committerGitHub <noreply@github.com>2024-08-31 09:30:05 (GMT)
commite5a567b0a721c26c79530249d9aa159afbd11955 (patch)
treecbd2daeb31da31e8a65757d357b9dc29cba7bc71 /Lib
parent5332d989af45378e6ae99aeda72bfa82042b8659 (diff)
downloadcpython-e5a567b0a721c26c79530249d9aa159afbd11955.zip
cpython-e5a567b0a721c26c79530249d9aa159afbd11955.tar.gz
cpython-e5a567b0a721c26c79530249d9aa159afbd11955.tar.bz2
gh-123309: Add more tests for the pickletools module (GH-123355)
Add tests for genops() and dis().
Diffstat (limited to 'Lib')
-rw-r--r--Lib/test/test_pickletools.py310
1 files changed, 310 insertions, 0 deletions
diff --git a/Lib/test/test_pickletools.py b/Lib/test/test_pickletools.py
index d37af79..8cb1f6d 100644
--- a/Lib/test/test_pickletools.py
+++ b/Lib/test/test_pickletools.py
@@ -1,3 +1,4 @@
+import io
import pickle
import pickletools
from test import support
@@ -62,6 +63,315 @@ class OptimizedPickleTests(AbstractPickleTests, unittest.TestCase):
self.assertNotIn(pickle.BINPUT, pickled2)
+class SimpleReader:
+ def __init__(self, data):
+ self.data = data
+ self.pos = 0
+
+ def read(self, n):
+ data = self.data[self.pos: self.pos + n]
+ self.pos += n
+ return data
+
+ def readline(self):
+ nl = self.data.find(b'\n', self.pos) + 1
+ if not nl:
+ nl = len(self.data)
+ data = self.data[self.pos: nl]
+ self.pos = nl
+ return data
+
+
+class GenopsTests(unittest.TestCase):
+ def test_genops(self):
+ it = pickletools.genops(b'(I123\nK\x12J\x12\x34\x56\x78t.')
+ self.assertEqual([(item[0].name,) + item[1:] for item in it], [
+ ('MARK', None, 0),
+ ('INT', 123, 1),
+ ('BININT1', 0x12, 6),
+ ('BININT', 0x78563412, 8),
+ ('TUPLE', None, 13),
+ ('STOP', None, 14),
+ ])
+
+ def test_from_file(self):
+ f = io.BytesIO(b'prefix(I123\nK\x12J\x12\x34\x56\x78t.suffix')
+ self.assertEqual(f.read(6), b'prefix')
+ it = pickletools.genops(f)
+ self.assertEqual([(item[0].name,) + item[1:] for item in it], [
+ ('MARK', None, 6),
+ ('INT', 123, 7),
+ ('BININT1', 0x12, 12),
+ ('BININT', 0x78563412, 14),
+ ('TUPLE', None, 19),
+ ('STOP', None, 20),
+ ])
+ self.assertEqual(f.read(), b'suffix')
+
+ def test_without_pos(self):
+ f = SimpleReader(b'(I123\nK\x12J\x12\x34\x56\x78t.')
+ it = pickletools.genops(f)
+ self.assertEqual([(item[0].name,) + item[1:] for item in it], [
+ ('MARK', None, None),
+ ('INT', 123, None),
+ ('BININT1', 0x12, None),
+ ('BININT', 0x78563412, None),
+ ('TUPLE', None, None),
+ ('STOP', None, None),
+ ])
+
+ def test_no_stop(self):
+ it = pickletools.genops(b'N')
+ item = next(it)
+ self.assertEqual(item[0].name, 'NONE')
+ with self.assertRaisesRegex(ValueError,
+ 'pickle exhausted before seeing STOP'):
+ next(it)
+
+ def test_truncated_data(self):
+ it = pickletools.genops(b'I123')
+ with self.assertRaisesRegex(ValueError,
+ 'no newline found when trying to read stringnl'):
+ next(it)
+ it = pickletools.genops(b'J\x12\x34')
+ with self.assertRaisesRegex(ValueError,
+ 'not enough data in stream to read int4'):
+ next(it)
+
+ def test_unknown_opcode(self):
+ it = pickletools.genops(b'N\xff')
+ item = next(it)
+ self.assertEqual(item[0].name, 'NONE')
+ with self.assertRaisesRegex(ValueError,
+ r"at position 1, opcode b'\\xff' unknown"):
+ next(it)
+
+ def test_unknown_opcode_without_pos(self):
+ f = SimpleReader(b'N\xff')
+ it = pickletools.genops(f)
+ item = next(it)
+ self.assertEqual(item[0].name, 'NONE')
+ with self.assertRaisesRegex(ValueError,
+ r"at position <unknown>, opcode b'\\xff' unknown"):
+ next(it)
+
+
+class DisTests(unittest.TestCase):
+ maxDiff = None
+
+ def check_dis(self, data, expected, **kwargs):
+ out = io.StringIO()
+ pickletools.dis(data, out=out, **kwargs)
+ self.assertEqual(out.getvalue(), expected)
+
+ def check_dis_error(self, data, expected, expected_error, **kwargs):
+ out = io.StringIO()
+ with self.assertRaisesRegex(ValueError, expected_error):
+ pickletools.dis(data, out=out, **kwargs)
+ self.assertEqual(out.getvalue(), expected)
+
+ def test_mark(self):
+ self.check_dis(b'(N(tl.', '''\
+ 0: ( MARK
+ 1: N NONE
+ 2: ( MARK
+ 3: t TUPLE (MARK at 2)
+ 4: l LIST (MARK at 0)
+ 5: . STOP
+highest protocol among opcodes = 0
+''')
+
+ def test_indentlevel(self):
+ self.check_dis(b'(N(tl.', '''\
+ 0: ( MARK
+ 1: N NONE
+ 2: ( MARK
+ 3: t TUPLE (MARK at 2)
+ 4: l LIST (MARK at 0)
+ 5: . STOP
+highest protocol among opcodes = 0
+''', indentlevel=2)
+
+ def test_mark_without_pos(self):
+ self.check_dis(SimpleReader(b'(N(tl.'), '''\
+( MARK
+N NONE
+( MARK
+t TUPLE (MARK at unknown opcode offset)
+l LIST (MARK at unknown opcode offset)
+. STOP
+highest protocol among opcodes = 0
+''')
+
+ def test_no_mark(self):
+ self.check_dis_error(b'Nt.', '''\
+ 0: N NONE
+ 1: t TUPLE no MARK exists on stack
+''', 'no MARK exists on stack')
+
+ def test_put(self):
+ self.check_dis(b'Np0\nq\x01r\x02\x00\x00\x00\x94.', '''\
+ 0: N NONE
+ 1: p PUT 0
+ 4: q BINPUT 1
+ 6: r LONG_BINPUT 2
+ 11: \\x94 MEMOIZE (as 3)
+ 12: . STOP
+highest protocol among opcodes = 4
+''')
+
+ def test_put_redefined(self):
+ self.check_dis_error(b'Np1\np1\n.', '''\
+ 0: N NONE
+ 1: p PUT 1
+ 4: p PUT 1
+''', 'memo key 1 already defined')
+ self.check_dis_error(b'Np1\nq\x01.', '''\
+ 0: N NONE
+ 1: p PUT 1
+ 4: q BINPUT 1
+''', 'memo key 1 already defined')
+ self.check_dis_error(b'Np1\nr\x01\x00\x00\x00.', '''\
+ 0: N NONE
+ 1: p PUT 1
+ 4: r LONG_BINPUT 1
+''', 'memo key 1 already defined')
+ self.check_dis_error(b'Np1\n\x94.', '''\
+ 0: N NONE
+ 1: p PUT 1
+ 4: \\x94 MEMOIZE (as 1)
+''', 'memo key None already defined')
+
+ def test_put_empty_stack(self):
+ self.check_dis_error(b'p0\n', '''\
+ 0: p PUT 0
+''', "stack is empty -- can't store into memo")
+
+ def test_put_markobject(self):
+ self.check_dis_error(b'(p0\n', '''\
+ 0: ( MARK
+ 1: p PUT 0
+''', "can't store markobject in the memo")
+
+ def test_get(self):
+ self.check_dis(b'(Np1\ng1\nh\x01j\x01\x00\x00\x00t.', '''\
+ 0: ( MARK
+ 1: N NONE
+ 2: p PUT 1
+ 5: g GET 1
+ 8: h BINGET 1
+ 10: j LONG_BINGET 1
+ 15: t TUPLE (MARK at 0)
+ 16: . STOP
+highest protocol among opcodes = 1
+''')
+
+ def test_get_without_put(self):
+ self.check_dis_error(b'g1\n.', '''\
+ 0: g GET 1
+''', 'memo key 1 has never been stored into')
+ self.check_dis_error(b'h\x01.', '''\
+ 0: h BINGET 1
+''', 'memo key 1 has never been stored into')
+ self.check_dis_error(b'j\x01\x00\x00\x00.', '''\
+ 0: j LONG_BINGET 1
+''', 'memo key 1 has never been stored into')
+
+ def test_memo(self):
+ memo = {}
+ self.check_dis(b'Np1\n.', '''\
+ 0: N NONE
+ 1: p PUT 1
+ 4: . STOP
+highest protocol among opcodes = 0
+''', memo=memo)
+ self.check_dis(b'g1\n.', '''\
+ 0: g GET 1
+ 3: . STOP
+highest protocol among opcodes = 0
+''', memo=memo)
+
+ def test_mark_pop(self):
+ self.check_dis(b'(N00N.', '''\
+ 0: ( MARK
+ 1: N NONE
+ 2: 0 POP
+ 3: 0 POP (MARK at 0)
+ 4: N NONE
+ 5: . STOP
+highest protocol among opcodes = 0
+''')
+
+ def test_too_small_stack(self):
+ self.check_dis_error(b'a', '''\
+ 0: a APPEND
+''', 'tries to pop 2 items from stack with only 0 items')
+ self.check_dis_error(b']a', '''\
+ 0: ] EMPTY_LIST
+ 1: a APPEND
+''', 'tries to pop 2 items from stack with only 1 items')
+
+ def test_no_stop(self):
+ self.check_dis_error(b'N', '''\
+ 0: N NONE
+''', 'pickle exhausted before seeing STOP')
+
+ def test_truncated_data(self):
+ self.check_dis_error(b'NI123', '''\
+ 0: N NONE
+''', 'no newline found when trying to read stringnl')
+ self.check_dis_error(b'NJ\x12\x34', '''\
+ 0: N NONE
+''', 'not enough data in stream to read int4')
+
+ def test_unknown_opcode(self):
+ self.check_dis_error(b'N\xff', '''\
+ 0: N NONE
+''', r"at position 1, opcode b'\\xff' unknown")
+
+ def test_stop_not_empty_stack(self):
+ self.check_dis_error(b']N.', '''\
+ 0: ] EMPTY_LIST
+ 1: N NONE
+ 2: . STOP
+highest protocol among opcodes = 1
+''', r'stack not empty after STOP: \[list\]')
+
+ def test_annotate(self):
+ self.check_dis(b'(Nt.', '''\
+ 0: ( MARK Push markobject onto the stack.
+ 1: N NONE Push None on the stack.
+ 2: t TUPLE (MARK at 0) Build a tuple out of the topmost stack slice, after markobject.
+ 3: . STOP Stop the unpickling machine.
+highest protocol among opcodes = 0
+''', annotate=1)
+ self.check_dis(b'(Nt.', '''\
+ 0: ( MARK Push markobject onto the stack.
+ 1: N NONE Push None on the stack.
+ 2: t TUPLE (MARK at 0) Build a tuple out of the topmost stack slice, after markobject.
+ 3: . STOP Stop the unpickling machine.
+highest protocol among opcodes = 0
+''', annotate=20)
+ self.check_dis(b'(((((((ttttttt.', '''\
+ 0: ( MARK Push markobject onto the stack.
+ 1: ( MARK Push markobject onto the stack.
+ 2: ( MARK Push markobject onto the stack.
+ 3: ( MARK Push markobject onto the stack.
+ 4: ( MARK Push markobject onto the stack.
+ 5: ( MARK Push markobject onto the stack.
+ 6: ( MARK Push markobject onto the stack.
+ 7: t TUPLE (MARK at 6) Build a tuple out of the topmost stack slice, after markobject.
+ 8: t TUPLE (MARK at 5) Build a tuple out of the topmost stack slice, after markobject.
+ 9: t TUPLE (MARK at 4) Build a tuple out of the topmost stack slice, after markobject.
+ 10: t TUPLE (MARK at 3) Build a tuple out of the topmost stack slice, after markobject.
+ 11: t TUPLE (MARK at 2) Build a tuple out of the topmost stack slice, after markobject.
+ 12: t TUPLE (MARK at 1) Build a tuple out of the topmost stack slice, after markobject.
+ 13: t TUPLE (MARK at 0) Build a tuple out of the topmost stack slice, after markobject.
+ 14: . STOP Stop the unpickling machine.
+highest protocol among opcodes = 0
+''', annotate=20)
+
+
class MiscTestCase(unittest.TestCase):
def test__all__(self):
not_exported = {