summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorErlend E. Aasland <erlend.aasland@protonmail.com>2022-11-16 19:13:32 (GMT)
committerGitHub <noreply@github.com>2022-11-16 19:13:32 (GMT)
commit51d10354f43c708ebbb7d13c14bf0ea52b28489e (patch)
treebce59a3bca311fcd9192403265c806e0f72baa68
parent9db1e17c80217d7b18a2f44297d72acd10ee73d6 (diff)
downloadcpython-51d10354f43c708ebbb7d13c14bf0ea52b28489e.zip
cpython-51d10354f43c708ebbb7d13c14bf0ea52b28489e.tar.gz
cpython-51d10354f43c708ebbb7d13c14bf0ea52b28489e.tar.bz2
gh-93649: Split watcher API tests from _testcapimodule.c (#99532)
-rw-r--r--Lib/test/test_capi/test_misc.py330
-rw-r--r--Lib/test/test_capi/test_watchers.py340
-rw-r--r--Modules/Setup.stdlib.in2
-rw-r--r--Modules/_testcapi/parts.h1
-rw-r--r--Modules/_testcapi/watchers.c302
-rw-r--r--Modules/_testcapimodule.c288
-rw-r--r--PCbuild/_testcapi.vcxproj1
-rw-r--r--PCbuild/_testcapi.vcxproj.filters3
8 files changed, 651 insertions, 616 deletions
diff --git a/Lib/test/test_capi/test_misc.py b/Lib/test/test_capi/test_misc.py
index ea4c9de..1d30ada 100644
--- a/Lib/test/test_capi/test_misc.py
+++ b/Lib/test/test_capi/test_misc.py
@@ -2,7 +2,6 @@
# these are all functions _testcapi exports whose name begins with 'test_'.
from collections import OrderedDict
-from contextlib import contextmanager, ExitStack
import _thread
import importlib.machinery
import importlib.util
@@ -20,7 +19,6 @@ import warnings
import weakref
from test import support
from test.support import MISSING_C_DOCSTRINGS
-from test.support import catch_unraisable_exception
from test.support import import_helper
from test.support import threading_helper
from test.support import warnings_helper
@@ -1705,333 +1703,5 @@ class Test_Pep523API(unittest.TestCase):
self.do_test(func2)
-class TestDictWatchers(unittest.TestCase):
- # types of watchers testcapimodule can add:
- EVENTS = 0 # appends dict events as strings to global event list
- ERROR = 1 # unconditionally sets and signals a RuntimeException
- SECOND = 2 # always appends "second" to global event list
-
- def add_watcher(self, kind=EVENTS):
- return _testcapi.add_dict_watcher(kind)
-
- def clear_watcher(self, watcher_id):
- _testcapi.clear_dict_watcher(watcher_id)
-
- @contextmanager
- def watcher(self, kind=EVENTS):
- wid = self.add_watcher(kind)
- try:
- yield wid
- finally:
- self.clear_watcher(wid)
-
- def assert_events(self, expected):
- actual = _testcapi.get_dict_watcher_events()
- self.assertEqual(actual, expected)
-
- def watch(self, wid, d):
- _testcapi.watch_dict(wid, d)
-
- def unwatch(self, wid, d):
- _testcapi.unwatch_dict(wid, d)
-
- def test_set_new_item(self):
- d = {}
- with self.watcher() as wid:
- self.watch(wid, d)
- d["foo"] = "bar"
- self.assert_events(["new:foo:bar"])
-
- def test_set_existing_item(self):
- d = {"foo": "bar"}
- with self.watcher() as wid:
- self.watch(wid, d)
- d["foo"] = "baz"
- self.assert_events(["mod:foo:baz"])
-
- def test_clone(self):
- d = {}
- d2 = {"foo": "bar"}
- with self.watcher() as wid:
- self.watch(wid, d)
- d.update(d2)
- self.assert_events(["clone"])
-
- def test_no_event_if_not_watched(self):
- d = {}
- with self.watcher() as wid:
- d["foo"] = "bar"
- self.assert_events([])
-
- def test_del(self):
- d = {"foo": "bar"}
- with self.watcher() as wid:
- self.watch(wid, d)
- del d["foo"]
- self.assert_events(["del:foo"])
-
- def test_pop(self):
- d = {"foo": "bar"}
- with self.watcher() as wid:
- self.watch(wid, d)
- d.pop("foo")
- self.assert_events(["del:foo"])
-
- def test_clear(self):
- d = {"foo": "bar"}
- with self.watcher() as wid:
- self.watch(wid, d)
- d.clear()
- self.assert_events(["clear"])
-
- def test_dealloc(self):
- d = {"foo": "bar"}
- with self.watcher() as wid:
- self.watch(wid, d)
- del d
- self.assert_events(["dealloc"])
-
- def test_unwatch(self):
- d = {}
- with self.watcher() as wid:
- self.watch(wid, d)
- d["foo"] = "bar"
- self.unwatch(wid, d)
- d["hmm"] = "baz"
- self.assert_events(["new:foo:bar"])
-
- def test_error(self):
- d = {}
- with self.watcher(kind=self.ERROR) as wid:
- self.watch(wid, d)
- with catch_unraisable_exception() as cm:
- d["foo"] = "bar"
- self.assertIs(cm.unraisable.object, d)
- self.assertEqual(str(cm.unraisable.exc_value), "boom!")
- self.assert_events([])
-
- def test_two_watchers(self):
- d1 = {}
- d2 = {}
- with self.watcher() as wid1:
- with self.watcher(kind=self.SECOND) as wid2:
- self.watch(wid1, d1)
- self.watch(wid2, d2)
- d1["foo"] = "bar"
- d2["hmm"] = "baz"
- self.assert_events(["new:foo:bar", "second"])
-
- def test_watch_non_dict(self):
- with self.watcher() as wid:
- with self.assertRaisesRegex(ValueError, r"Cannot watch non-dictionary"):
- self.watch(wid, 1)
-
- def test_watch_out_of_range_watcher_id(self):
- d = {}
- with self.assertRaisesRegex(ValueError, r"Invalid dict watcher ID -1"):
- self.watch(-1, d)
- with self.assertRaisesRegex(ValueError, r"Invalid dict watcher ID 8"):
- self.watch(8, d) # DICT_MAX_WATCHERS = 8
-
- def test_watch_unassigned_watcher_id(self):
- d = {}
- with self.assertRaisesRegex(ValueError, r"No dict watcher set for ID 1"):
- self.watch(1, d)
-
- def test_unwatch_non_dict(self):
- with self.watcher() as wid:
- with self.assertRaisesRegex(ValueError, r"Cannot watch non-dictionary"):
- self.unwatch(wid, 1)
-
- def test_unwatch_out_of_range_watcher_id(self):
- d = {}
- with self.assertRaisesRegex(ValueError, r"Invalid dict watcher ID -1"):
- self.unwatch(-1, d)
- with self.assertRaisesRegex(ValueError, r"Invalid dict watcher ID 8"):
- self.unwatch(8, d) # DICT_MAX_WATCHERS = 8
-
- def test_unwatch_unassigned_watcher_id(self):
- d = {}
- with self.assertRaisesRegex(ValueError, r"No dict watcher set for ID 1"):
- self.unwatch(1, d)
-
- def test_clear_out_of_range_watcher_id(self):
- with self.assertRaisesRegex(ValueError, r"Invalid dict watcher ID -1"):
- self.clear_watcher(-1)
- with self.assertRaisesRegex(ValueError, r"Invalid dict watcher ID 8"):
- self.clear_watcher(8) # DICT_MAX_WATCHERS = 8
-
- def test_clear_unassigned_watcher_id(self):
- with self.assertRaisesRegex(ValueError, r"No dict watcher set for ID 1"):
- self.clear_watcher(1)
-
-
-class TestTypeWatchers(unittest.TestCase):
- # types of watchers testcapimodule can add:
- TYPES = 0 # appends modified types to global event list
- ERROR = 1 # unconditionally sets and signals a RuntimeException
- WRAP = 2 # appends modified type wrapped in list to global event list
-
- # duplicating the C constant
- TYPE_MAX_WATCHERS = 8
-
- def add_watcher(self, kind=TYPES):
- return _testcapi.add_type_watcher(kind)
-
- def clear_watcher(self, watcher_id):
- _testcapi.clear_type_watcher(watcher_id)
-
- @contextmanager
- def watcher(self, kind=TYPES):
- wid = self.add_watcher(kind)
- try:
- yield wid
- finally:
- self.clear_watcher(wid)
-
- def assert_events(self, expected):
- actual = _testcapi.get_type_modified_events()
- self.assertEqual(actual, expected)
-
- def watch(self, wid, t):
- _testcapi.watch_type(wid, t)
-
- def unwatch(self, wid, t):
- _testcapi.unwatch_type(wid, t)
-
- def test_watch_type(self):
- class C: pass
- with self.watcher() as wid:
- self.watch(wid, C)
- C.foo = "bar"
- self.assert_events([C])
-
- def test_event_aggregation(self):
- class C: pass
- with self.watcher() as wid:
- self.watch(wid, C)
- C.foo = "bar"
- C.bar = "baz"
- # only one event registered for both modifications
- self.assert_events([C])
-
- def test_lookup_resets_aggregation(self):
- class C: pass
- with self.watcher() as wid:
- self.watch(wid, C)
- C.foo = "bar"
- # lookup resets type version tag
- self.assertEqual(C.foo, "bar")
- C.bar = "baz"
- # both events registered
- self.assert_events([C, C])
-
- def test_unwatch_type(self):
- class C: pass
- with self.watcher() as wid:
- self.watch(wid, C)
- C.foo = "bar"
- self.assertEqual(C.foo, "bar")
- self.assert_events([C])
- self.unwatch(wid, C)
- C.bar = "baz"
- self.assert_events([C])
-
- def test_clear_watcher(self):
- class C: pass
- # outer watcher is unused, it's just to keep events list alive
- with self.watcher() as _:
- with self.watcher() as wid:
- self.watch(wid, C)
- C.foo = "bar"
- self.assertEqual(C.foo, "bar")
- self.assert_events([C])
- C.bar = "baz"
- # Watcher on C has been cleared, no new event
- self.assert_events([C])
-
- def test_watch_type_subclass(self):
- class C: pass
- class D(C): pass
- with self.watcher() as wid:
- self.watch(wid, D)
- C.foo = "bar"
- self.assert_events([D])
-
- def test_error(self):
- class C: pass
- with self.watcher(kind=self.ERROR) as wid:
- self.watch(wid, C)
- with catch_unraisable_exception() as cm:
- C.foo = "bar"
- self.assertIs(cm.unraisable.object, C)
- self.assertEqual(str(cm.unraisable.exc_value), "boom!")
- self.assert_events([])
-
- def test_two_watchers(self):
- class C1: pass
- class C2: pass
- with self.watcher() as wid1:
- with self.watcher(kind=self.WRAP) as wid2:
- self.assertNotEqual(wid1, wid2)
- self.watch(wid1, C1)
- self.watch(wid2, C2)
- C1.foo = "bar"
- C2.hmm = "baz"
- self.assert_events([C1, [C2]])
-
- def test_watch_non_type(self):
- with self.watcher() as wid:
- with self.assertRaisesRegex(ValueError, r"Cannot watch non-type"):
- self.watch(wid, 1)
-
- def test_watch_out_of_range_watcher_id(self):
- class C: pass
- with self.assertRaisesRegex(ValueError, r"Invalid type watcher ID -1"):
- self.watch(-1, C)
- with self.assertRaisesRegex(ValueError, r"Invalid type watcher ID 8"):
- self.watch(self.TYPE_MAX_WATCHERS, C)
-
- def test_watch_unassigned_watcher_id(self):
- class C: pass
- with self.assertRaisesRegex(ValueError, r"No type watcher set for ID 1"):
- self.watch(1, C)
-
- def test_unwatch_non_type(self):
- with self.watcher() as wid:
- with self.assertRaisesRegex(ValueError, r"Cannot watch non-type"):
- self.unwatch(wid, 1)
-
- def test_unwatch_out_of_range_watcher_id(self):
- class C: pass
- with self.assertRaisesRegex(ValueError, r"Invalid type watcher ID -1"):
- self.unwatch(-1, C)
- with self.assertRaisesRegex(ValueError, r"Invalid type watcher ID 8"):
- self.unwatch(self.TYPE_MAX_WATCHERS, C)
-
- def test_unwatch_unassigned_watcher_id(self):
- class C: pass
- with self.assertRaisesRegex(ValueError, r"No type watcher set for ID 1"):
- self.unwatch(1, C)
-
- def test_clear_out_of_range_watcher_id(self):
- with self.assertRaisesRegex(ValueError, r"Invalid type watcher ID -1"):
- self.clear_watcher(-1)
- with self.assertRaisesRegex(ValueError, r"Invalid type watcher ID 8"):
- self.clear_watcher(self.TYPE_MAX_WATCHERS)
-
- def test_clear_unassigned_watcher_id(self):
- with self.assertRaisesRegex(ValueError, r"No type watcher set for ID 1"):
- self.clear_watcher(1)
-
- def test_no_more_ids_available(self):
- contexts = [self.watcher() for i in range(self.TYPE_MAX_WATCHERS)]
- with ExitStack() as stack:
- for ctx in contexts:
- stack.enter_context(ctx)
- with self.assertRaisesRegex(RuntimeError, r"no more type watcher IDs"):
- self.add_watcher()
-
-
if __name__ == "__main__":
unittest.main()
diff --git a/Lib/test/test_capi/test_watchers.py b/Lib/test/test_capi/test_watchers.py
new file mode 100644
index 0000000..f635c74
--- /dev/null
+++ b/Lib/test/test_capi/test_watchers.py
@@ -0,0 +1,340 @@
+import unittest
+
+from contextlib import contextmanager, ExitStack
+from test.support import catch_unraisable_exception, import_helper
+
+
+# Skip this test if the _testcapi module isn't available.
+_testcapi = import_helper.import_module('_testcapi')
+
+
+class TestDictWatchers(unittest.TestCase):
+ # types of watchers testcapimodule can add:
+ EVENTS = 0 # appends dict events as strings to global event list
+ ERROR = 1 # unconditionally sets and signals a RuntimeException
+ SECOND = 2 # always appends "second" to global event list
+
+ def add_watcher(self, kind=EVENTS):
+ return _testcapi.add_dict_watcher(kind)
+
+ def clear_watcher(self, watcher_id):
+ _testcapi.clear_dict_watcher(watcher_id)
+
+ @contextmanager
+ def watcher(self, kind=EVENTS):
+ wid = self.add_watcher(kind)
+ try:
+ yield wid
+ finally:
+ self.clear_watcher(wid)
+
+ def assert_events(self, expected):
+ actual = _testcapi.get_dict_watcher_events()
+ self.assertEqual(actual, expected)
+
+ def watch(self, wid, d):
+ _testcapi.watch_dict(wid, d)
+
+ def unwatch(self, wid, d):
+ _testcapi.unwatch_dict(wid, d)
+
+ def test_set_new_item(self):
+ d = {}
+ with self.watcher() as wid:
+ self.watch(wid, d)
+ d["foo"] = "bar"
+ self.assert_events(["new:foo:bar"])
+
+ def test_set_existing_item(self):
+ d = {"foo": "bar"}
+ with self.watcher() as wid:
+ self.watch(wid, d)
+ d["foo"] = "baz"
+ self.assert_events(["mod:foo:baz"])
+
+ def test_clone(self):
+ d = {}
+ d2 = {"foo": "bar"}
+ with self.watcher() as wid:
+ self.watch(wid, d)
+ d.update(d2)
+ self.assert_events(["clone"])
+
+ def test_no_event_if_not_watched(self):
+ d = {}
+ with self.watcher() as wid:
+ d["foo"] = "bar"
+ self.assert_events([])
+
+ def test_del(self):
+ d = {"foo": "bar"}
+ with self.watcher() as wid:
+ self.watch(wid, d)
+ del d["foo"]
+ self.assert_events(["del:foo"])
+
+ def test_pop(self):
+ d = {"foo": "bar"}
+ with self.watcher() as wid:
+ self.watch(wid, d)
+ d.pop("foo")
+ self.assert_events(["del:foo"])
+
+ def test_clear(self):
+ d = {"foo": "bar"}
+ with self.watcher() as wid:
+ self.watch(wid, d)
+ d.clear()
+ self.assert_events(["clear"])
+
+ def test_dealloc(self):
+ d = {"foo": "bar"}
+ with self.watcher() as wid:
+ self.watch(wid, d)
+ del d
+ self.assert_events(["dealloc"])
+
+ def test_unwatch(self):
+ d = {}
+ with self.watcher() as wid:
+ self.watch(wid, d)
+ d["foo"] = "bar"
+ self.unwatch(wid, d)
+ d["hmm"] = "baz"
+ self.assert_events(["new:foo:bar"])
+
+ def test_error(self):
+ d = {}
+ with self.watcher(kind=self.ERROR) as wid:
+ self.watch(wid, d)
+ with catch_unraisable_exception() as cm:
+ d["foo"] = "bar"
+ self.assertIs(cm.unraisable.object, d)
+ self.assertEqual(str(cm.unraisable.exc_value), "boom!")
+ self.assert_events([])
+
+ def test_two_watchers(self):
+ d1 = {}
+ d2 = {}
+ with self.watcher() as wid1:
+ with self.watcher(kind=self.SECOND) as wid2:
+ self.watch(wid1, d1)
+ self.watch(wid2, d2)
+ d1["foo"] = "bar"
+ d2["hmm"] = "baz"
+ self.assert_events(["new:foo:bar", "second"])
+
+ def test_watch_non_dict(self):
+ with self.watcher() as wid:
+ with self.assertRaisesRegex(ValueError, r"Cannot watch non-dictionary"):
+ self.watch(wid, 1)
+
+ def test_watch_out_of_range_watcher_id(self):
+ d = {}
+ with self.assertRaisesRegex(ValueError, r"Invalid dict watcher ID -1"):
+ self.watch(-1, d)
+ with self.assertRaisesRegex(ValueError, r"Invalid dict watcher ID 8"):
+ self.watch(8, d) # DICT_MAX_WATCHERS = 8
+
+ def test_watch_unassigned_watcher_id(self):
+ d = {}
+ with self.assertRaisesRegex(ValueError, r"No dict watcher set for ID 1"):
+ self.watch(1, d)
+
+ def test_unwatch_non_dict(self):
+ with self.watcher() as wid:
+ with self.assertRaisesRegex(ValueError, r"Cannot watch non-dictionary"):
+ self.unwatch(wid, 1)
+
+ def test_unwatch_out_of_range_watcher_id(self):
+ d = {}
+ with self.assertRaisesRegex(ValueError, r"Invalid dict watcher ID -1"):
+ self.unwatch(-1, d)
+ with self.assertRaisesRegex(ValueError, r"Invalid dict watcher ID 8"):
+ self.unwatch(8, d) # DICT_MAX_WATCHERS = 8
+
+ def test_unwatch_unassigned_watcher_id(self):
+ d = {}
+ with self.assertRaisesRegex(ValueError, r"No dict watcher set for ID 1"):
+ self.unwatch(1, d)
+
+ def test_clear_out_of_range_watcher_id(self):
+ with self.assertRaisesRegex(ValueError, r"Invalid dict watcher ID -1"):
+ self.clear_watcher(-1)
+ with self.assertRaisesRegex(ValueError, r"Invalid dict watcher ID 8"):
+ self.clear_watcher(8) # DICT_MAX_WATCHERS = 8
+
+ def test_clear_unassigned_watcher_id(self):
+ with self.assertRaisesRegex(ValueError, r"No dict watcher set for ID 1"):
+ self.clear_watcher(1)
+
+
+class TestTypeWatchers(unittest.TestCase):
+ # types of watchers testcapimodule can add:
+ TYPES = 0 # appends modified types to global event list
+ ERROR = 1 # unconditionally sets and signals a RuntimeException
+ WRAP = 2 # appends modified type wrapped in list to global event list
+
+ # duplicating the C constant
+ TYPE_MAX_WATCHERS = 8
+
+ def add_watcher(self, kind=TYPES):
+ return _testcapi.add_type_watcher(kind)
+
+ def clear_watcher(self, watcher_id):
+ _testcapi.clear_type_watcher(watcher_id)
+
+ @contextmanager
+ def watcher(self, kind=TYPES):
+ wid = self.add_watcher(kind)
+ try:
+ yield wid
+ finally:
+ self.clear_watcher(wid)
+
+ def assert_events(self, expected):
+ actual = _testcapi.get_type_modified_events()
+ self.assertEqual(actual, expected)
+
+ def watch(self, wid, t):
+ _testcapi.watch_type(wid, t)
+
+ def unwatch(self, wid, t):
+ _testcapi.unwatch_type(wid, t)
+
+ def test_watch_type(self):
+ class C: pass
+ with self.watcher() as wid:
+ self.watch(wid, C)
+ C.foo = "bar"
+ self.assert_events([C])
+
+ def test_event_aggregation(self):
+ class C: pass
+ with self.watcher() as wid:
+ self.watch(wid, C)
+ C.foo = "bar"
+ C.bar = "baz"
+ # only one event registered for both modifications
+ self.assert_events([C])
+
+ def test_lookup_resets_aggregation(self):
+ class C: pass
+ with self.watcher() as wid:
+ self.watch(wid, C)
+ C.foo = "bar"
+ # lookup resets type version tag
+ self.assertEqual(C.foo, "bar")
+ C.bar = "baz"
+ # both events registered
+ self.assert_events([C, C])
+
+ def test_unwatch_type(self):
+ class C: pass
+ with self.watcher() as wid:
+ self.watch(wid, C)
+ C.foo = "bar"
+ self.assertEqual(C.foo, "bar")
+ self.assert_events([C])
+ self.unwatch(wid, C)
+ C.bar = "baz"
+ self.assert_events([C])
+
+ def test_clear_watcher(self):
+ class C: pass
+ # outer watcher is unused, it's just to keep events list alive
+ with self.watcher() as _:
+ with self.watcher() as wid:
+ self.watch(wid, C)
+ C.foo = "bar"
+ self.assertEqual(C.foo, "bar")
+ self.assert_events([C])
+ C.bar = "baz"
+ # Watcher on C has been cleared, no new event
+ self.assert_events([C])
+
+ def test_watch_type_subclass(self):
+ class C: pass
+ class D(C): pass
+ with self.watcher() as wid:
+ self.watch(wid, D)
+ C.foo = "bar"
+ self.assert_events([D])
+
+ def test_error(self):
+ class C: pass
+ with self.watcher(kind=self.ERROR) as wid:
+ self.watch(wid, C)
+ with catch_unraisable_exception() as cm:
+ C.foo = "bar"
+ self.assertIs(cm.unraisable.object, C)
+ self.assertEqual(str(cm.unraisable.exc_value), "boom!")
+ self.assert_events([])
+
+ def test_two_watchers(self):
+ class C1: pass
+ class C2: pass
+ with self.watcher() as wid1:
+ with self.watcher(kind=self.WRAP) as wid2:
+ self.assertNotEqual(wid1, wid2)
+ self.watch(wid1, C1)
+ self.watch(wid2, C2)
+ C1.foo = "bar"
+ C2.hmm = "baz"
+ self.assert_events([C1, [C2]])
+
+ def test_watch_non_type(self):
+ with self.watcher() as wid:
+ with self.assertRaisesRegex(ValueError, r"Cannot watch non-type"):
+ self.watch(wid, 1)
+
+ def test_watch_out_of_range_watcher_id(self):
+ class C: pass
+ with self.assertRaisesRegex(ValueError, r"Invalid type watcher ID -1"):
+ self.watch(-1, C)
+ with self.assertRaisesRegex(ValueError, r"Invalid type watcher ID 8"):
+ self.watch(self.TYPE_MAX_WATCHERS, C)
+
+ def test_watch_unassigned_watcher_id(self):
+ class C: pass
+ with self.assertRaisesRegex(ValueError, r"No type watcher set for ID 1"):
+ self.watch(1, C)
+
+ def test_unwatch_non_type(self):
+ with self.watcher() as wid:
+ with self.assertRaisesRegex(ValueError, r"Cannot watch non-type"):
+ self.unwatch(wid, 1)
+
+ def test_unwatch_out_of_range_watcher_id(self):
+ class C: pass
+ with self.assertRaisesRegex(ValueError, r"Invalid type watcher ID -1"):
+ self.unwatch(-1, C)
+ with self.assertRaisesRegex(ValueError, r"Invalid type watcher ID 8"):
+ self.unwatch(self.TYPE_MAX_WATCHERS, C)
+
+ def test_unwatch_unassigned_watcher_id(self):
+ class C: pass
+ with self.assertRaisesRegex(ValueError, r"No type watcher set for ID 1"):
+ self.unwatch(1, C)
+
+ def test_clear_out_of_range_watcher_id(self):
+ with self.assertRaisesRegex(ValueError, r"Invalid type watcher ID -1"):
+ self.clear_watcher(-1)
+ with self.assertRaisesRegex(ValueError, r"Invalid type watcher ID 8"):
+ self.clear_watcher(self.TYPE_MAX_WATCHERS)
+
+ def test_clear_unassigned_watcher_id(self):
+ with self.assertRaisesRegex(ValueError, r"No type watcher set for ID 1"):
+ self.clear_watcher(1)
+
+ def test_no_more_ids_available(self):
+ contexts = [self.watcher() for i in range(self.TYPE_MAX_WATCHERS)]
+ with ExitStack() as stack:
+ for ctx in contexts:
+ stack.enter_context(ctx)
+ with self.assertRaisesRegex(RuntimeError, r"no more type watcher IDs"):
+ self.add_watcher()
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/Modules/Setup.stdlib.in b/Modules/Setup.stdlib.in
index 26e7ffc..c033dbc 100644
--- a/Modules/Setup.stdlib.in
+++ b/Modules/Setup.stdlib.in
@@ -169,7 +169,7 @@
@MODULE__XXTESTFUZZ_TRUE@_xxtestfuzz _xxtestfuzz/_xxtestfuzz.c _xxtestfuzz/fuzzer.c
@MODULE__TESTBUFFER_TRUE@_testbuffer _testbuffer.c
@MODULE__TESTINTERNALCAPI_TRUE@_testinternalcapi _testinternalcapi.c
-@MODULE__TESTCAPI_TRUE@_testcapi _testcapimodule.c _testcapi/vectorcall.c _testcapi/vectorcall_limited.c _testcapi/heaptype.c _testcapi/unicode.c _testcapi/getargs.c _testcapi/pytime.c _testcapi/datetime.c _testcapi/docstring.c _testcapi/mem.c
+@MODULE__TESTCAPI_TRUE@_testcapi _testcapimodule.c _testcapi/vectorcall.c _testcapi/vectorcall_limited.c _testcapi/heaptype.c _testcapi/unicode.c _testcapi/getargs.c _testcapi/pytime.c _testcapi/datetime.c _testcapi/docstring.c _testcapi/mem.c _testcapi/watchers.c
# Some testing modules MUST be built as shared libraries.
*shared*
diff --git a/Modules/_testcapi/parts.h b/Modules/_testcapi/parts.h
index a39007b..2bc2a0e 100644
--- a/Modules/_testcapi/parts.h
+++ b/Modules/_testcapi/parts.h
@@ -32,6 +32,7 @@ int _PyTestCapi_Init_PyTime(PyObject *module);
int _PyTestCapi_Init_DateTime(PyObject *module);
int _PyTestCapi_Init_Docstring(PyObject *module);
int _PyTestCapi_Init_Mem(PyObject *module);
+int _PyTestCapi_Init_Watchers(PyObject *module);
#ifdef LIMITED_API_AVAILABLE
int _PyTestCapi_Init_VectorcallLimited(PyObject *module);
diff --git a/Modules/_testcapi/watchers.c b/Modules/_testcapi/watchers.c
new file mode 100644
index 0000000..e0d489a
--- /dev/null
+++ b/Modules/_testcapi/watchers.c
@@ -0,0 +1,302 @@
+#include "parts.h"
+
+
+// Test dict watching
+static PyObject *g_dict_watch_events;
+static int g_dict_watchers_installed;
+
+static int
+dict_watch_callback(PyDict_WatchEvent event,
+ PyObject *dict,
+ PyObject *key,
+ PyObject *new_value)
+{
+ PyObject *msg;
+ switch (event) {
+ case PyDict_EVENT_CLEARED:
+ msg = PyUnicode_FromString("clear");
+ break;
+ case PyDict_EVENT_DEALLOCATED:
+ msg = PyUnicode_FromString("dealloc");
+ break;
+ case PyDict_EVENT_CLONED:
+ msg = PyUnicode_FromString("clone");
+ break;
+ case PyDict_EVENT_ADDED:
+ msg = PyUnicode_FromFormat("new:%S:%S", key, new_value);
+ break;
+ case PyDict_EVENT_MODIFIED:
+ msg = PyUnicode_FromFormat("mod:%S:%S", key, new_value);
+ break;
+ case PyDict_EVENT_DELETED:
+ msg = PyUnicode_FromFormat("del:%S", key);
+ break;
+ default:
+ msg = PyUnicode_FromString("unknown");
+ }
+ if (msg == NULL) {
+ return -1;
+ }
+ assert(PyList_Check(g_dict_watch_events));
+ if (PyList_Append(g_dict_watch_events, msg) < 0) {
+ Py_DECREF(msg);
+ return -1;
+ }
+ Py_DECREF(msg);
+ return 0;
+}
+
+static int
+dict_watch_callback_second(PyDict_WatchEvent event,
+ PyObject *dict,
+ PyObject *key,
+ PyObject *new_value)
+{
+ PyObject *msg = PyUnicode_FromString("second");
+ if (msg == NULL) {
+ return -1;
+ }
+ int rc = PyList_Append(g_dict_watch_events, msg);
+ Py_DECREF(msg);
+ if (rc < 0) {
+ return -1;
+ }
+ return 0;
+}
+
+static int
+dict_watch_callback_error(PyDict_WatchEvent event,
+ PyObject *dict,
+ PyObject *key,
+ PyObject *new_value)
+{
+ PyErr_SetString(PyExc_RuntimeError, "boom!");
+ return -1;
+}
+
+static PyObject *
+add_dict_watcher(PyObject *self, PyObject *kind)
+{
+ int watcher_id;
+ assert(PyLong_Check(kind));
+ long kind_l = PyLong_AsLong(kind);
+ if (kind_l == 2) {
+ watcher_id = PyDict_AddWatcher(dict_watch_callback_second);
+ }
+ else if (kind_l == 1) {
+ watcher_id = PyDict_AddWatcher(dict_watch_callback_error);
+ }
+ else {
+ watcher_id = PyDict_AddWatcher(dict_watch_callback);
+ }
+ if (watcher_id < 0) {
+ return NULL;
+ }
+ if (!g_dict_watchers_installed) {
+ assert(!g_dict_watch_events);
+ if (!(g_dict_watch_events = PyList_New(0))) {
+ return NULL;
+ }
+ }
+ g_dict_watchers_installed++;
+ return PyLong_FromLong(watcher_id);
+}
+
+static PyObject *
+clear_dict_watcher(PyObject *self, PyObject *watcher_id)
+{
+ if (PyDict_ClearWatcher(PyLong_AsLong(watcher_id))) {
+ return NULL;
+ }
+ g_dict_watchers_installed--;
+ if (!g_dict_watchers_installed) {
+ assert(g_dict_watch_events);
+ Py_CLEAR(g_dict_watch_events);
+ }
+ Py_RETURN_NONE;
+}
+
+static PyObject *
+watch_dict(PyObject *self, PyObject *args)
+{
+ PyObject *dict;
+ int watcher_id;
+ if (!PyArg_ParseTuple(args, "iO", &watcher_id, &dict)) {
+ return NULL;
+ }
+ if (PyDict_Watch(watcher_id, dict)) {
+ return NULL;
+ }
+ Py_RETURN_NONE;
+}
+
+static PyObject *
+unwatch_dict(PyObject *self, PyObject *args)
+{
+ PyObject *dict;
+ int watcher_id;
+ if (!PyArg_ParseTuple(args, "iO", &watcher_id, &dict)) {
+ return NULL;
+ }
+ if (PyDict_Unwatch(watcher_id, dict)) {
+ return NULL;
+ }
+ Py_RETURN_NONE;
+}
+
+static PyObject *
+get_dict_watcher_events(PyObject *self, PyObject *Py_UNUSED(args))
+{
+ if (!g_dict_watch_events) {
+ PyErr_SetString(PyExc_RuntimeError, "no watchers active");
+ return NULL;
+ }
+ return Py_NewRef(g_dict_watch_events);
+}
+
+// Test type watchers
+static PyObject *g_type_modified_events;
+static int g_type_watchers_installed;
+
+static int
+type_modified_callback(PyTypeObject *type)
+{
+ assert(PyList_Check(g_type_modified_events));
+ if(PyList_Append(g_type_modified_events, (PyObject *)type) < 0) {
+ return -1;
+ }
+ return 0;
+}
+
+static int
+type_modified_callback_wrap(PyTypeObject *type)
+{
+ assert(PyList_Check(g_type_modified_events));
+ PyObject *list = PyList_New(0);
+ if (list == NULL) {
+ return -1;
+ }
+ if (PyList_Append(list, (PyObject *)type) < 0) {
+ Py_DECREF(list);
+ return -1;
+ }
+ if (PyList_Append(g_type_modified_events, list) < 0) {
+ Py_DECREF(list);
+ return -1;
+ }
+ Py_DECREF(list);
+ return 0;
+}
+
+static int
+type_modified_callback_error(PyTypeObject *type)
+{
+ PyErr_SetString(PyExc_RuntimeError, "boom!");
+ return -1;
+}
+
+static PyObject *
+add_type_watcher(PyObject *self, PyObject *kind)
+{
+ int watcher_id;
+ assert(PyLong_Check(kind));
+ long kind_l = PyLong_AsLong(kind);
+ if (kind_l == 2) {
+ watcher_id = PyType_AddWatcher(type_modified_callback_wrap);
+ }
+ else if (kind_l == 1) {
+ watcher_id = PyType_AddWatcher(type_modified_callback_error);
+ }
+ else {
+ watcher_id = PyType_AddWatcher(type_modified_callback);
+ }
+ if (watcher_id < 0) {
+ return NULL;
+ }
+ if (!g_type_watchers_installed) {
+ assert(!g_type_modified_events);
+ if (!(g_type_modified_events = PyList_New(0))) {
+ return NULL;
+ }
+ }
+ g_type_watchers_installed++;
+ return PyLong_FromLong(watcher_id);
+}
+
+static PyObject *
+clear_type_watcher(PyObject *self, PyObject *watcher_id)
+{
+ if (PyType_ClearWatcher(PyLong_AsLong(watcher_id))) {
+ return NULL;
+ }
+ g_type_watchers_installed--;
+ if (!g_type_watchers_installed) {
+ assert(g_type_modified_events);
+ Py_CLEAR(g_type_modified_events);
+ }
+ Py_RETURN_NONE;
+}
+
+static PyObject *
+get_type_modified_events(PyObject *self, PyObject *Py_UNUSED(args))
+{
+ if (!g_type_modified_events) {
+ PyErr_SetString(PyExc_RuntimeError, "no watchers active");
+ return NULL;
+ }
+ return Py_NewRef(g_type_modified_events);
+}
+
+static PyObject *
+watch_type(PyObject *self, PyObject *args)
+{
+ PyObject *type;
+ int watcher_id;
+ if (!PyArg_ParseTuple(args, "iO", &watcher_id, &type)) {
+ return NULL;
+ }
+ if (PyType_Watch(watcher_id, type)) {
+ return NULL;
+ }
+ Py_RETURN_NONE;
+}
+
+static PyObject *
+unwatch_type(PyObject *self, PyObject *args)
+{
+ PyObject *type;
+ int watcher_id;
+ if (!PyArg_ParseTuple(args, "iO", &watcher_id, &type)) {
+ return NULL;
+ }
+ if (PyType_Unwatch(watcher_id, type)) {
+ return NULL;
+ }
+ Py_RETURN_NONE;
+}
+
+static PyMethodDef test_methods[] = {
+ // Dict watchers.
+ {"add_dict_watcher", add_dict_watcher, METH_O, NULL},
+ {"clear_dict_watcher", clear_dict_watcher, METH_O, NULL},
+ {"watch_dict", watch_dict, METH_VARARGS, NULL},
+ {"unwatch_dict", unwatch_dict, METH_VARARGS, NULL},
+ {"get_dict_watcher_events", get_dict_watcher_events, METH_NOARGS, NULL},
+
+ // Type watchers.
+ {"add_type_watcher", add_type_watcher, METH_O, NULL},
+ {"clear_type_watcher", clear_type_watcher, METH_O, NULL},
+ {"watch_type", watch_type, METH_VARARGS, NULL},
+ {"unwatch_type", unwatch_type, METH_VARARGS, NULL},
+ {"get_type_modified_events", get_type_modified_events, METH_NOARGS, NULL},
+ {NULL},
+};
+
+int
+_PyTestCapi_Init_Watchers(PyObject *mod)
+{
+ if (PyModule_AddFunctions(mod, test_methods) < 0) {
+ return -1;
+ }
+ return 0;
+}
diff --git a/Modules/_testcapimodule.c b/Modules/_testcapimodule.c
index 2c21782..01b3923 100644
--- a/Modules/_testcapimodule.c
+++ b/Modules/_testcapimodule.c
@@ -3390,159 +3390,6 @@ test_tstate_capi(PyObject *self, PyObject *Py_UNUSED(args))
Py_RETURN_NONE;
}
-
-// Test dict watching
-static PyObject *g_dict_watch_events;
-static int g_dict_watchers_installed;
-
-static int
-dict_watch_callback(PyDict_WatchEvent event,
- PyObject *dict,
- PyObject *key,
- PyObject *new_value)
-{
- PyObject *msg;
- switch(event) {
- case PyDict_EVENT_CLEARED:
- msg = PyUnicode_FromString("clear");
- break;
- case PyDict_EVENT_DEALLOCATED:
- msg = PyUnicode_FromString("dealloc");
- break;
- case PyDict_EVENT_CLONED:
- msg = PyUnicode_FromString("clone");
- break;
- case PyDict_EVENT_ADDED:
- msg = PyUnicode_FromFormat("new:%S:%S", key, new_value);
- break;
- case PyDict_EVENT_MODIFIED:
- msg = PyUnicode_FromFormat("mod:%S:%S", key, new_value);
- break;
- case PyDict_EVENT_DELETED:
- msg = PyUnicode_FromFormat("del:%S", key);
- break;
- default:
- msg = PyUnicode_FromString("unknown");
- }
- if (!msg) {
- return -1;
- }
- assert(PyList_Check(g_dict_watch_events));
- if (PyList_Append(g_dict_watch_events, msg) < 0) {
- Py_DECREF(msg);
- return -1;
- }
- Py_DECREF(msg);
- return 0;
-}
-
-static int
-dict_watch_callback_second(PyDict_WatchEvent event,
- PyObject *dict,
- PyObject *key,
- PyObject *new_value)
-{
- PyObject *msg = PyUnicode_FromString("second");
- if (!msg) {
- return -1;
- }
- if (PyList_Append(g_dict_watch_events, msg) < 0) {
- Py_DECREF(msg);
- return -1;
- }
- Py_DECREF(msg);
- return 0;
-}
-
-static int
-dict_watch_callback_error(PyDict_WatchEvent event,
- PyObject *dict,
- PyObject *key,
- PyObject *new_value)
-{
- PyErr_SetString(PyExc_RuntimeError, "boom!");
- return -1;
-}
-
-static PyObject *
-add_dict_watcher(PyObject *self, PyObject *kind)
-{
- int watcher_id;
- assert(PyLong_Check(kind));
- long kind_l = PyLong_AsLong(kind);
- if (kind_l == 2) {
- watcher_id = PyDict_AddWatcher(dict_watch_callback_second);
- } else if (kind_l == 1) {
- watcher_id = PyDict_AddWatcher(dict_watch_callback_error);
- } else {
- watcher_id = PyDict_AddWatcher(dict_watch_callback);
- }
- if (watcher_id < 0) {
- return NULL;
- }
- if (!g_dict_watchers_installed) {
- assert(!g_dict_watch_events);
- if (!(g_dict_watch_events = PyList_New(0))) {
- return NULL;
- }
- }
- g_dict_watchers_installed++;
- return PyLong_FromLong(watcher_id);
-}
-
-static PyObject *
-clear_dict_watcher(PyObject *self, PyObject *watcher_id)
-{
- if (PyDict_ClearWatcher(PyLong_AsLong(watcher_id))) {
- return NULL;
- }
- g_dict_watchers_installed--;
- if (!g_dict_watchers_installed) {
- assert(g_dict_watch_events);
- Py_CLEAR(g_dict_watch_events);
- }
- Py_RETURN_NONE;
-}
-
-static PyObject *
-watch_dict(PyObject *self, PyObject *args)
-{
- PyObject *dict;
- int watcher_id;
- if (!PyArg_ParseTuple(args, "iO", &watcher_id, &dict)) {
- return NULL;
- }
- if (PyDict_Watch(watcher_id, dict)) {
- return NULL;
- }
- Py_RETURN_NONE;
-}
-
-static PyObject *
-unwatch_dict(PyObject *self, PyObject *args)
-{
- PyObject *dict;
- int watcher_id;
- if (!PyArg_ParseTuple(args, "iO", &watcher_id, &dict)) {
- return NULL;
- }
- if (PyDict_Unwatch(watcher_id, dict)) {
- return NULL;
- }
- Py_RETURN_NONE;
-}
-
-static PyObject *
-get_dict_watcher_events(PyObject *self, PyObject *Py_UNUSED(args))
-{
- if (!g_dict_watch_events) {
- PyErr_SetString(PyExc_RuntimeError, "no watchers active");
- return NULL;
- }
- return Py_NewRef(g_dict_watch_events);
-}
-
-
// Test PyFloat_Pack2(), PyFloat_Pack4() and PyFloat_Pack8()
static PyObject *
test_float_pack(PyObject *self, PyObject *args)
@@ -3988,128 +3835,6 @@ function_set_kw_defaults(PyObject *self, PyObject *args)
Py_RETURN_NONE;
}
-
-// type watchers
-
-static PyObject *g_type_modified_events;
-static int g_type_watchers_installed;
-
-static int
-type_modified_callback(PyTypeObject *type)
-{
- assert(PyList_Check(g_type_modified_events));
- if(PyList_Append(g_type_modified_events, (PyObject *)type) < 0) {
- return -1;
- }
- return 0;
-}
-
-static int
-type_modified_callback_wrap(PyTypeObject *type)
-{
- assert(PyList_Check(g_type_modified_events));
- PyObject *list = PyList_New(0);
- if (!list) {
- return -1;
- }
- if (PyList_Append(list, (PyObject *)type) < 0) {
- Py_DECREF(list);
- return -1;
- }
- if (PyList_Append(g_type_modified_events, list) < 0) {
- Py_DECREF(list);
- return -1;
- }
- Py_DECREF(list);
- return 0;
-}
-
-static int
-type_modified_callback_error(PyTypeObject *type)
-{
- PyErr_SetString(PyExc_RuntimeError, "boom!");
- return -1;
-}
-
-static PyObject *
-add_type_watcher(PyObject *self, PyObject *kind)
-{
- int watcher_id;
- assert(PyLong_Check(kind));
- long kind_l = PyLong_AsLong(kind);
- if (kind_l == 2) {
- watcher_id = PyType_AddWatcher(type_modified_callback_wrap);
- } else if (kind_l == 1) {
- watcher_id = PyType_AddWatcher(type_modified_callback_error);
- } else {
- watcher_id = PyType_AddWatcher(type_modified_callback);
- }
- if (watcher_id < 0) {
- return NULL;
- }
- if (!g_type_watchers_installed) {
- assert(!g_type_modified_events);
- if (!(g_type_modified_events = PyList_New(0))) {
- return NULL;
- }
- }
- g_type_watchers_installed++;
- return PyLong_FromLong(watcher_id);
-}
-
-static PyObject *
-clear_type_watcher(PyObject *self, PyObject *watcher_id)
-{
- if (PyType_ClearWatcher(PyLong_AsLong(watcher_id))) {
- return NULL;
- }
- g_type_watchers_installed--;
- if (!g_type_watchers_installed) {
- assert(g_type_modified_events);
- Py_CLEAR(g_type_modified_events);
- }
- Py_RETURN_NONE;
-}
-
-static PyObject *
-get_type_modified_events(PyObject *self, PyObject *Py_UNUSED(args))
-{
- if (!g_type_modified_events) {
- PyErr_SetString(PyExc_RuntimeError, "no watchers active");
- return NULL;
- }
- return Py_NewRef(g_type_modified_events);
-}
-
-static PyObject *
-watch_type(PyObject *self, PyObject *args)
-{
- PyObject *type;
- int watcher_id;
- if (!PyArg_ParseTuple(args, "iO", &watcher_id, &type)) {
- return NULL;
- }
- if (PyType_Watch(watcher_id, type)) {
- return NULL;
- }
- Py_RETURN_NONE;
-}
-
-static PyObject *
-unwatch_type(PyObject *self, PyObject *args)
-{
- PyObject *type;
- int watcher_id;
- if (!PyArg_ParseTuple(args, "iO", &watcher_id, &type)) {
- return NULL;
- }
- if (PyType_Unwatch(watcher_id, type)) {
- return NULL;
- }
- Py_RETURN_NONE;
-}
-
-
static PyObject *test_buildvalue_issue38913(PyObject *, PyObject *);
static PyMethodDef TestMethods[] = {
@@ -4259,11 +3984,6 @@ static PyMethodDef TestMethods[] = {
{"settrace_to_record", settrace_to_record, METH_O, NULL},
{"test_macros", test_macros, METH_NOARGS, NULL},
{"clear_managed_dict", clear_managed_dict, METH_O, NULL},
- {"add_dict_watcher", add_dict_watcher, METH_O, NULL},
- {"clear_dict_watcher", clear_dict_watcher, METH_O, NULL},
- {"watch_dict", watch_dict, METH_VARARGS, NULL},
- {"unwatch_dict", unwatch_dict, METH_VARARGS, NULL},
- {"get_dict_watcher_events", get_dict_watcher_events, METH_NOARGS, NULL},
{"function_get_code", function_get_code, METH_O, NULL},
{"function_get_globals", function_get_globals, METH_O, NULL},
{"function_get_module", function_get_module, METH_O, NULL},
@@ -4271,11 +3991,6 @@ static PyMethodDef TestMethods[] = {
{"function_set_defaults", function_set_defaults, METH_VARARGS, NULL},
{"function_get_kw_defaults", function_get_kw_defaults, METH_O, NULL},
{"function_set_kw_defaults", function_set_kw_defaults, METH_VARARGS, NULL},
- {"add_type_watcher", add_type_watcher, METH_O, NULL},
- {"clear_type_watcher", clear_type_watcher, METH_O, NULL},
- {"watch_type", watch_type, METH_VARARGS, NULL},
- {"unwatch_type", unwatch_type, METH_VARARGS, NULL},
- {"get_type_modified_events", get_type_modified_events, METH_NOARGS, NULL},
{NULL, NULL} /* sentinel */
};
@@ -5096,6 +4811,9 @@ PyInit__testcapi(void)
if (_PyTestCapi_Init_Mem(m) < 0) {
return NULL;
}
+ if (_PyTestCapi_Init_Watchers(m) < 0) {
+ return NULL;
+ }
#ifndef LIMITED_API_AVAILABLE
PyModule_AddObjectRef(m, "LIMITED_API_AVAILABLE", Py_False);
diff --git a/PCbuild/_testcapi.vcxproj b/PCbuild/_testcapi.vcxproj
index 0151d85..42e7d30 100644
--- a/PCbuild/_testcapi.vcxproj
+++ b/PCbuild/_testcapi.vcxproj
@@ -103,6 +103,7 @@
<ClCompile Include="..\Modules\_testcapi\datetime.c" />
<ClCompile Include="..\Modules\_testcapi\docstring.c" />
<ClCompile Include="..\Modules\_testcapi\mem.c" />
+ <ClCompile Include="..\Modules\_testcapi\watchers.c" />
</ItemGroup>
<ItemGroup>
<ResourceCompile Include="..\PC\python_nt.rc" />
diff --git a/PCbuild/_testcapi.vcxproj.filters b/PCbuild/_testcapi.vcxproj.filters
index c30c41b..75652c3 100644
--- a/PCbuild/_testcapi.vcxproj.filters
+++ b/PCbuild/_testcapi.vcxproj.filters
@@ -39,6 +39,9 @@
<ClCompile Include="..\Modules\_testcapi\mem.c">
<Filter>Source Files</Filter>
</ClCompile>
+ <ClCompile Include="..\Modules\_testcapi\watchers.c">
+ <Filter>Source Files</Filter>
+ </ClCompile>
</ItemGroup>
<ItemGroup>
<ResourceCompile Include="..\PC\python_nt.rc">