summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Lib/test/test_opcache.py512
1 files changed, 511 insertions, 1 deletions
diff --git a/Lib/test/test_opcache.py b/Lib/test/test_opcache.py
index 5281eb7..2f6f91d 100644
--- a/Lib/test/test_opcache.py
+++ b/Lib/test/test_opcache.py
@@ -1,3 +1,6 @@
+import dis
+import threading
+import types
import unittest
@@ -481,6 +484,513 @@ class TestCallCache(unittest.TestCase):
f()
+class TestRacesDoNotCrash(unittest.TestCase):
+ # Careful with these. Bigger numbers have a higher chance of catching bugs,
+ # but you can also burn through a *ton* of type/dict/function versions:
+ ITEMS = 1000
+ LOOPS = 4
+ WARMUPS = 2
+ WRITERS = 2
+
+ def assert_specialized(self, f, opname):
+ instructions = dis.get_instructions(f, adaptive=True)
+ opnames = {instruction.opname for instruction in instructions}
+ self.assertIn(opname, opnames)
+
+ def assert_races_do_not_crash(
+ self, opname, get_items, read, write, *, check_items=False
+ ):
+ # This might need a few dozen loops in some cases:
+ for _ in range(self.LOOPS):
+ items = get_items()
+ # Reset:
+ if check_items:
+ for item in items:
+ item.__code__ = item.__code__.replace()
+ else:
+ read.__code__ = read.__code__.replace()
+ # Specialize:
+ for _ in range(self.WARMUPS):
+ read(items)
+ if check_items:
+ for item in items:
+ self.assert_specialized(item, opname)
+ else:
+ self.assert_specialized(read, opname)
+ # Create writers:
+ writers = []
+ for _ in range(self.WRITERS):
+ writer = threading.Thread(target=write, args=[items])
+ writers.append(writer)
+ # Run:
+ for writer in writers:
+ writer.start()
+ read(items) # BOOM!
+ for writer in writers:
+ writer.join()
+
+ def test_binary_subscr_getitem(self):
+ def get_items():
+ class C:
+ __getitem__ = lambda self, item: None
+
+ items = []
+ for _ in range(self.ITEMS):
+ item = C()
+ items.append(item)
+ return items
+
+ def read(items):
+ for item in items:
+ try:
+ item[None]
+ except TypeError:
+ pass
+
+ def write(items):
+ for item in items:
+ try:
+ del item.__getitem__
+ except AttributeError:
+ pass
+ type(item).__getitem__ = lambda self, item: None
+
+ opname = "BINARY_SUBSCR_GETITEM"
+ self.assert_races_do_not_crash(opname, get_items, read, write)
+
+ def test_binary_subscr_list_int(self):
+ def get_items():
+ items = []
+ for _ in range(self.ITEMS):
+ item = [None]
+ items.append(item)
+ return items
+
+ def read(items):
+ for item in items:
+ try:
+ item[0]
+ except IndexError:
+ pass
+
+ def write(items):
+ for item in items:
+ item.clear()
+ item.append(None)
+
+ opname = "BINARY_SUBSCR_LIST_INT"
+ self.assert_races_do_not_crash(opname, get_items, read, write)
+
+ def test_for_iter_gen(self):
+ def get_items():
+ def g():
+ yield
+ yield
+
+ items = []
+ for _ in range(self.ITEMS):
+ item = g()
+ items.append(item)
+ return items
+
+ def read(items):
+ for item in items:
+ try:
+ for _ in item:
+ break
+ except ValueError:
+ pass
+
+ def write(items):
+ for item in items:
+ try:
+ for _ in item:
+ break
+ except ValueError:
+ pass
+
+ opname = "FOR_ITER_GEN"
+ self.assert_races_do_not_crash(opname, get_items, read, write)
+
+ def test_for_iter_list(self):
+ def get_items():
+ items = []
+ for _ in range(self.ITEMS):
+ item = [None]
+ items.append(item)
+ return items
+
+ def read(items):
+ for item in items:
+ for item in item:
+ break
+
+ def write(items):
+ for item in items:
+ item.clear()
+ item.append(None)
+
+ opname = "FOR_ITER_LIST"
+ self.assert_races_do_not_crash(opname, get_items, read, write)
+
+ def test_load_attr_class(self):
+ def get_items():
+ class C:
+ a = object()
+
+ items = []
+ for _ in range(self.ITEMS):
+ item = C
+ items.append(item)
+ return items
+
+ def read(items):
+ for item in items:
+ try:
+ item.a
+ except AttributeError:
+ pass
+
+ def write(items):
+ for item in items:
+ try:
+ del item.a
+ except AttributeError:
+ pass
+ item.a = object()
+
+ opname = "LOAD_ATTR_CLASS"
+ self.assert_races_do_not_crash(opname, get_items, read, write)
+
+ def test_load_attr_getattribute_overridden(self):
+ def get_items():
+ class C:
+ __getattribute__ = lambda self, name: None
+
+ items = []
+ for _ in range(self.ITEMS):
+ item = C()
+ items.append(item)
+ return items
+
+ def read(items):
+ for item in items:
+ try:
+ item.a
+ except AttributeError:
+ pass
+
+ def write(items):
+ for item in items:
+ try:
+ del item.__getattribute__
+ except AttributeError:
+ pass
+ type(item).__getattribute__ = lambda self, name: None
+
+ opname = "LOAD_ATTR_GETATTRIBUTE_OVERRIDDEN"
+ self.assert_races_do_not_crash(opname, get_items, read, write)
+
+ def test_load_attr_instance_value(self):
+ def get_items():
+ class C:
+ pass
+
+ items = []
+ for _ in range(self.ITEMS):
+ item = C()
+ item.a = None
+ items.append(item)
+ return items
+
+ def read(items):
+ for item in items:
+ item.a
+
+ def write(items):
+ for item in items:
+ item.__dict__[None] = None
+
+ opname = "LOAD_ATTR_INSTANCE_VALUE"
+ self.assert_races_do_not_crash(opname, get_items, read, write)
+
+ def test_load_attr_method_lazy_dict(self):
+ def get_items():
+ class C(Exception):
+ m = lambda self: None
+
+ items = []
+ for _ in range(self.ITEMS):
+ item = C()
+ items.append(item)
+ return items
+
+ def read(items):
+ for item in items:
+ try:
+ item.m()
+ except AttributeError:
+ pass
+
+ def write(items):
+ for item in items:
+ try:
+ del item.m
+ except AttributeError:
+ pass
+ type(item).m = lambda self: None
+
+ opname = "LOAD_ATTR_METHOD_LAZY_DICT"
+ self.assert_races_do_not_crash(opname, get_items, read, write)
+
+ def test_load_attr_method_no_dict(self):
+ def get_items():
+ class C:
+ __slots__ = ()
+ m = lambda self: None
+
+ items = []
+ for _ in range(self.ITEMS):
+ item = C()
+ items.append(item)
+ return items
+
+ def read(items):
+ for item in items:
+ try:
+ item.m()
+ except AttributeError:
+ pass
+
+ def write(items):
+ for item in items:
+ try:
+ del item.m
+ except AttributeError:
+ pass
+ type(item).m = lambda self: None
+
+ opname = "LOAD_ATTR_METHOD_NO_DICT"
+ self.assert_races_do_not_crash(opname, get_items, read, write)
+
+ def test_load_attr_method_with_values(self):
+ def get_items():
+ class C:
+ m = lambda self: None
+
+ items = []
+ for _ in range(self.ITEMS):
+ item = C()
+ items.append(item)
+ return items
+
+ def read(items):
+ for item in items:
+ try:
+ item.m()
+ except AttributeError:
+ pass
+
+ def write(items):
+ for item in items:
+ try:
+ del item.m
+ except AttributeError:
+ pass
+ type(item).m = lambda self: None
+
+ opname = "LOAD_ATTR_METHOD_WITH_VALUES"
+ self.assert_races_do_not_crash(opname, get_items, read, write)
+
+ def test_load_attr_module(self):
+ def get_items():
+ items = []
+ for _ in range(self.ITEMS):
+ item = types.ModuleType("<item>")
+ items.append(item)
+ return items
+
+ def read(items):
+ for item in items:
+ try:
+ item.__name__
+ except AttributeError:
+ pass
+
+ def write(items):
+ for item in items:
+ d = item.__dict__.copy()
+ item.__dict__.clear()
+ item.__dict__.update(d)
+
+ opname = "LOAD_ATTR_MODULE"
+ self.assert_races_do_not_crash(opname, get_items, read, write)
+
+ def test_load_attr_property(self):
+ def get_items():
+ class C:
+ a = property(lambda self: None)
+
+ items = []
+ for _ in range(self.ITEMS):
+ item = C()
+ items.append(item)
+ return items
+
+ def read(items):
+ for item in items:
+ try:
+ item.a
+ except AttributeError:
+ pass
+
+ def write(items):
+ for item in items:
+ try:
+ del type(item).a
+ except AttributeError:
+ pass
+ type(item).a = property(lambda self: None)
+
+ opname = "LOAD_ATTR_PROPERTY"
+ self.assert_races_do_not_crash(opname, get_items, read, write)
+
+ def test_load_attr_with_hint(self):
+ def get_items():
+ class C:
+ pass
+
+ items = []
+ for _ in range(self.ITEMS):
+ item = C()
+ item.__dict__
+ item.a = None
+ items.append(item)
+ return items
+
+ def read(items):
+ for item in items:
+ item.a
+
+ def write(items):
+ for item in items:
+ item.__dict__[None] = None
+
+ opname = "LOAD_ATTR_WITH_HINT"
+ self.assert_races_do_not_crash(opname, get_items, read, write)
+
+ def test_load_global_module(self):
+ def get_items():
+ items = []
+ for _ in range(self.ITEMS):
+ item = eval("lambda: x", {"x": None})
+ items.append(item)
+ return items
+
+ def read(items):
+ for item in items:
+ item()
+
+ def write(items):
+ for item in items:
+ item.__globals__[None] = None
+
+ opname = "LOAD_GLOBAL_MODULE"
+ self.assert_races_do_not_crash(
+ opname, get_items, read, write, check_items=True
+ )
+
+ def test_store_attr_instance_value(self):
+ def get_items():
+ class C:
+ pass
+
+ items = []
+ for _ in range(self.ITEMS):
+ item = C()
+ items.append(item)
+ return items
+
+ def read(items):
+ for item in items:
+ item.a = None
+
+ def write(items):
+ for item in items:
+ item.__dict__[None] = None
+
+ opname = "STORE_ATTR_INSTANCE_VALUE"
+ self.assert_races_do_not_crash(opname, get_items, read, write)
+
+ def test_store_attr_with_hint(self):
+ def get_items():
+ class C:
+ pass
+
+ items = []
+ for _ in range(self.ITEMS):
+ item = C()
+ item.__dict__
+ items.append(item)
+ return items
+
+ def read(items):
+ for item in items:
+ item.a = None
+
+ def write(items):
+ for item in items:
+ item.__dict__[None] = None
+
+ opname = "STORE_ATTR_WITH_HINT"
+ self.assert_races_do_not_crash(opname, get_items, read, write)
+
+ def test_store_subscr_list_int(self):
+ def get_items():
+ items = []
+ for _ in range(self.ITEMS):
+ item = [None]
+ items.append(item)
+ return items
+
+ def read(items):
+ for item in items:
+ try:
+ item[0] = None
+ except IndexError:
+ pass
+
+ def write(items):
+ for item in items:
+ item.clear()
+ item.append(None)
+
+ opname = "STORE_SUBSCR_LIST_INT"
+ self.assert_races_do_not_crash(opname, get_items, read, write)
+
+ def test_unpack_sequence_list(self):
+ def get_items():
+ items = []
+ for _ in range(self.ITEMS):
+ item = [None]
+ items.append(item)
+ return items
+
+ def read(items):
+ for item in items:
+ try:
+ [_] = item
+ except ValueError:
+ pass
+
+ def write(items):
+ for item in items:
+ item.clear()
+ item.append(None)
+
+ opname = "UNPACK_SEQUENCE_LIST"
+ self.assert_races_do_not_crash(opname, get_items, read, write)
+
+
if __name__ == "__main__":
- import unittest
unittest.main()