diff options
Diffstat (limited to 'Lib/test/test_audit.py')
-rw-r--r-- | Lib/test/test_audit.py | 260 |
1 files changed, 260 insertions, 0 deletions
diff --git a/Lib/test/test_audit.py b/Lib/test/test_audit.py new file mode 100644 index 0000000..5b33d97 --- /dev/null +++ b/Lib/test/test_audit.py @@ -0,0 +1,260 @@ +"""Tests for sys.audit and sys.addaudithook +""" + +import os +import subprocess +import sys +import unittest +from test import support + +if not hasattr(sys, "addaudithook") or not hasattr(sys, "audit"): + raise unittest.SkipTest("test only relevant when sys.audit is available") + + +class TestHook: + """Used in standard hook tests to collect any logged events. + + Should be used in a with block to ensure that it has no impact + after the test completes. Audit hooks cannot be removed, so the + best we can do for the test run is disable it by calling close(). + """ + + def __init__(self, raise_on_events=None, exc_type=RuntimeError): + self.raise_on_events = raise_on_events or () + self.exc_type = exc_type + self.seen = [] + self.closed = False + + def __enter__(self, *a): + sys.addaudithook(self) + return self + + def __exit__(self, *a): + self.close() + + def close(self): + self.closed = True + + @property + def seen_events(self): + return [i[0] for i in self.seen] + + def __call__(self, event, args): + if self.closed: + return + self.seen.append((event, args)) + if event in self.raise_on_events: + raise self.exc_type("saw event " + event) + + +class TestFinalizeHook: + """Used in the test_finalize_hooks function to ensure that hooks + are correctly cleaned up, that they are notified about the cleanup, + and are unable to prevent it. + """ + + def __init__(self): + print("Created", id(self), file=sys.stderr, flush=True) + + def __call__(self, event, args): + # Avoid recursion when we call id() below + if event == "builtins.id": + return + + print(event, id(self), file=sys.stderr, flush=True) + + if event == "cpython._PySys_ClearAuditHooks": + raise RuntimeError("Should be ignored") + elif event == "cpython.PyInterpreterState_Clear": + raise RuntimeError("Should be ignored") + + +def run_finalize_test(): + """Called by test_finalize_hooks in a subprocess.""" + sys.addaudithook(TestFinalizeHook()) + + +class AuditTest(unittest.TestCase): + def test_basic(self): + with TestHook() as hook: + sys.audit("test_event", 1, 2, 3) + self.assertEqual(hook.seen[0][0], "test_event") + self.assertEqual(hook.seen[0][1], (1, 2, 3)) + + def test_block_add_hook(self): + # Raising an exception should prevent a new hook from being added, + # but will not propagate out. + with TestHook(raise_on_events="sys.addaudithook") as hook1: + with TestHook() as hook2: + sys.audit("test_event") + self.assertIn("test_event", hook1.seen_events) + self.assertNotIn("test_event", hook2.seen_events) + + def test_block_add_hook_baseexception(self): + # Raising BaseException will propagate out when adding a hook + with self.assertRaises(BaseException): + with TestHook( + raise_on_events="sys.addaudithook", exc_type=BaseException + ) as hook1: + # Adding this next hook should raise BaseException + with TestHook() as hook2: + pass + + def test_finalize_hooks(self): + events = [] + with subprocess.Popen( + [ + sys.executable, + "-c", + "import test.test_audit; test.test_audit.run_finalize_test()", + ], + encoding="utf-8", + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) as p: + p.wait() + for line in p.stderr: + events.append(line.strip().partition(" ")) + firstId = events[0][2] + self.assertSequenceEqual( + [ + ("Created", " ", firstId), + ("cpython._PySys_ClearAuditHooks", " ", firstId), + ], + events, + ) + + def test_pickle(self): + pickle = support.import_module("pickle") + + class PicklePrint: + def __reduce_ex__(self, p): + return str, ("Pwned!",) + + payload_1 = pickle.dumps(PicklePrint()) + payload_2 = pickle.dumps(("a", "b", "c", 1, 2, 3)) + + # Before we add the hook, ensure our malicious pickle loads + self.assertEqual("Pwned!", pickle.loads(payload_1)) + + with TestHook(raise_on_events="pickle.find_class") as hook: + with self.assertRaises(RuntimeError): + # With the hook enabled, loading globals is not allowed + pickle.loads(payload_1) + # pickles with no globals are okay + pickle.loads(payload_2) + + def test_monkeypatch(self): + class A: + pass + + class B: + pass + + class C(A): + pass + + a = A() + + with TestHook() as hook: + # Catch name changes + C.__name__ = "X" + # Catch type changes + C.__bases__ = (B,) + # Ensure bypassing __setattr__ is still caught + type.__dict__["__bases__"].__set__(C, (B,)) + # Catch attribute replacement + C.__init__ = B.__init__ + # Catch attribute addition + C.new_attr = 123 + # Catch class changes + a.__class__ = B + + actual = [(a[0], a[1]) for e, a in hook.seen if e == "object.__setattr__"] + self.assertSequenceEqual( + [(C, "__name__"), (C, "__bases__"), (C, "__bases__"), (a, "__class__")], + actual, + ) + + def test_open(self): + # SSLContext.load_dh_params uses _Py_fopen_obj rather than normal open() + try: + import ssl + + load_dh_params = ssl.create_default_context().load_dh_params + except ImportError: + load_dh_params = None + + # Try a range of "open" functions. + # All of them should fail + with TestHook(raise_on_events={"open"}) as hook: + for fn, *args in [ + (open, support.TESTFN, "r"), + (open, sys.executable, "rb"), + (open, 3, "wb"), + (open, support.TESTFN, "w", -1, None, None, None, False, lambda *a: 1), + (load_dh_params, support.TESTFN), + ]: + if not fn: + continue + self.assertRaises(RuntimeError, fn, *args) + + actual_mode = [(a[0], a[1]) for e, a in hook.seen if e == "open" and a[1]] + actual_flag = [(a[0], a[2]) for e, a in hook.seen if e == "open" and not a[1]] + self.assertSequenceEqual( + [ + i + for i in [ + (support.TESTFN, "r"), + (sys.executable, "r"), + (3, "w"), + (support.TESTFN, "w"), + (support.TESTFN, "rb") if load_dh_params else None, + ] + if i is not None + ], + actual_mode, + ) + self.assertSequenceEqual([], actual_flag) + + def test_cantrace(self): + traced = [] + + def trace(frame, event, *args): + if frame.f_code == TestHook.__call__.__code__: + traced.append(event) + + old = sys.settrace(trace) + try: + with TestHook() as hook: + # No traced call + eval("1") + + # No traced call + hook.__cantrace__ = False + eval("2") + + # One traced call + hook.__cantrace__ = True + eval("3") + + # Two traced calls (writing to private member, eval) + hook.__cantrace__ = 1 + eval("4") + + # One traced call (writing to private member) + hook.__cantrace__ = 0 + finally: + sys.settrace(old) + + self.assertSequenceEqual(["call"] * 4, traced) + + +if __name__ == "__main__": + if len(sys.argv) >= 2 and sys.argv[1] == "spython_test": + # Doesn't matter what we add - it will be blocked + sys.addaudithook(None) + + sys.exit(0) + + unittest.main() |