summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_capi.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_capi.py')
-rw-r--r--Lib/test/test_capi.py169
1 files changed, 168 insertions, 1 deletions
diff --git a/Lib/test/test_capi.py b/Lib/test/test_capi.py
index a2183cf..364c607 100644
--- a/Lib/test/test_capi.py
+++ b/Lib/test/test_capi.py
@@ -2,7 +2,7 @@
# these are all functions _testcapi exports whose name begins with 'test_'.
from collections import OrderedDict
-from contextlib import contextmanager
+from contextlib import contextmanager, ExitStack
import _thread
import importlib.machinery
import importlib.util
@@ -1606,5 +1606,172 @@ class TestDictWatchers(unittest.TestCase):
self.clear_watcher(1)
+class TestTypeWatchers(unittest.TestCase):
+ # types of watchers testcapimodule can add:
+ TYPES = 0 # appends modified types to global event list
+ ERROR = 1 # unconditionally sets and signals a RuntimeException
+ WRAP = 2 # appends modified type wrapped in list to global event list
+
+ # duplicating the C constant
+ TYPE_MAX_WATCHERS = 8
+
+ def add_watcher(self, kind=TYPES):
+ return _testcapi.add_type_watcher(kind)
+
+ def clear_watcher(self, watcher_id):
+ _testcapi.clear_type_watcher(watcher_id)
+
+ @contextmanager
+ def watcher(self, kind=TYPES):
+ wid = self.add_watcher(kind)
+ try:
+ yield wid
+ finally:
+ self.clear_watcher(wid)
+
+ def assert_events(self, expected):
+ actual = _testcapi.get_type_modified_events()
+ self.assertEqual(actual, expected)
+
+ def watch(self, wid, t):
+ _testcapi.watch_type(wid, t)
+
+ def unwatch(self, wid, t):
+ _testcapi.unwatch_type(wid, t)
+
+ def test_watch_type(self):
+ class C: pass
+ with self.watcher() as wid:
+ self.watch(wid, C)
+ C.foo = "bar"
+ self.assert_events([C])
+
+ def test_event_aggregation(self):
+ class C: pass
+ with self.watcher() as wid:
+ self.watch(wid, C)
+ C.foo = "bar"
+ C.bar = "baz"
+ # only one event registered for both modifications
+ self.assert_events([C])
+
+ def test_lookup_resets_aggregation(self):
+ class C: pass
+ with self.watcher() as wid:
+ self.watch(wid, C)
+ C.foo = "bar"
+ # lookup resets type version tag
+ self.assertEqual(C.foo, "bar")
+ C.bar = "baz"
+ # both events registered
+ self.assert_events([C, C])
+
+ def test_unwatch_type(self):
+ class C: pass
+ with self.watcher() as wid:
+ self.watch(wid, C)
+ C.foo = "bar"
+ self.assertEqual(C.foo, "bar")
+ self.assert_events([C])
+ self.unwatch(wid, C)
+ C.bar = "baz"
+ self.assert_events([C])
+
+ def test_clear_watcher(self):
+ class C: pass
+ # outer watcher is unused, it's just to keep events list alive
+ with self.watcher() as _:
+ with self.watcher() as wid:
+ self.watch(wid, C)
+ C.foo = "bar"
+ self.assertEqual(C.foo, "bar")
+ self.assert_events([C])
+ C.bar = "baz"
+ # Watcher on C has been cleared, no new event
+ self.assert_events([C])
+
+ def test_watch_type_subclass(self):
+ class C: pass
+ class D(C): pass
+ with self.watcher() as wid:
+ self.watch(wid, D)
+ C.foo = "bar"
+ self.assert_events([D])
+
+ def test_error(self):
+ class C: pass
+ with self.watcher(kind=self.ERROR) as wid:
+ self.watch(wid, C)
+ with catch_unraisable_exception() as cm:
+ C.foo = "bar"
+ self.assertIs(cm.unraisable.object, C)
+ self.assertEqual(str(cm.unraisable.exc_value), "boom!")
+ self.assert_events([])
+
+ def test_two_watchers(self):
+ class C1: pass
+ class C2: pass
+ with self.watcher() as wid1:
+ with self.watcher(kind=self.WRAP) as wid2:
+ self.assertNotEqual(wid1, wid2)
+ self.watch(wid1, C1)
+ self.watch(wid2, C2)
+ C1.foo = "bar"
+ C2.hmm = "baz"
+ self.assert_events([C1, [C2]])
+
+ def test_watch_non_type(self):
+ with self.watcher() as wid:
+ with self.assertRaisesRegex(ValueError, r"Cannot watch non-type"):
+ self.watch(wid, 1)
+
+ def test_watch_out_of_range_watcher_id(self):
+ class C: pass
+ with self.assertRaisesRegex(ValueError, r"Invalid type watcher ID -1"):
+ self.watch(-1, C)
+ with self.assertRaisesRegex(ValueError, r"Invalid type watcher ID 8"):
+ self.watch(self.TYPE_MAX_WATCHERS, C)
+
+ def test_watch_unassigned_watcher_id(self):
+ class C: pass
+ with self.assertRaisesRegex(ValueError, r"No type watcher set for ID 1"):
+ self.watch(1, C)
+
+ def test_unwatch_non_type(self):
+ with self.watcher() as wid:
+ with self.assertRaisesRegex(ValueError, r"Cannot watch non-type"):
+ self.unwatch(wid, 1)
+
+ def test_unwatch_out_of_range_watcher_id(self):
+ class C: pass
+ with self.assertRaisesRegex(ValueError, r"Invalid type watcher ID -1"):
+ self.unwatch(-1, C)
+ with self.assertRaisesRegex(ValueError, r"Invalid type watcher ID 8"):
+ self.unwatch(self.TYPE_MAX_WATCHERS, C)
+
+ def test_unwatch_unassigned_watcher_id(self):
+ class C: pass
+ with self.assertRaisesRegex(ValueError, r"No type watcher set for ID 1"):
+ self.unwatch(1, C)
+
+ def test_clear_out_of_range_watcher_id(self):
+ with self.assertRaisesRegex(ValueError, r"Invalid type watcher ID -1"):
+ self.clear_watcher(-1)
+ with self.assertRaisesRegex(ValueError, r"Invalid type watcher ID 8"):
+ self.clear_watcher(self.TYPE_MAX_WATCHERS)
+
+ def test_clear_unassigned_watcher_id(self):
+ with self.assertRaisesRegex(ValueError, r"No type watcher set for ID 1"):
+ self.clear_watcher(1)
+
+ def test_no_more_ids_available(self):
+ contexts = [self.watcher() for i in range(self.TYPE_MAX_WATCHERS)]
+ with ExitStack() as stack:
+ for ctx in contexts:
+ stack.enter_context(ctx)
+ with self.assertRaisesRegex(RuntimeError, r"no more type watcher IDs"):
+ self.add_watcher()
+
+
if __name__ == "__main__":
unittest.main()