summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_audit.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_audit.py')
-rw-r--r--Lib/test/test_audit.py260
1 files changed, 260 insertions, 0 deletions
diff --git a/Lib/test/test_audit.py b/Lib/test/test_audit.py
new file mode 100644
index 0000000..5b33d97
--- /dev/null
+++ b/Lib/test/test_audit.py
@@ -0,0 +1,260 @@
+"""Tests for sys.audit and sys.addaudithook
+"""
+
+import os
+import subprocess
+import sys
+import unittest
+from test import support
+
+if not hasattr(sys, "addaudithook") or not hasattr(sys, "audit"):
+ raise unittest.SkipTest("test only relevant when sys.audit is available")
+
+
+class TestHook:
+ """Used in standard hook tests to collect any logged events.
+
+ Should be used in a with block to ensure that it has no impact
+ after the test completes. Audit hooks cannot be removed, so the
+ best we can do for the test run is disable it by calling close().
+ """
+
+ def __init__(self, raise_on_events=None, exc_type=RuntimeError):
+ self.raise_on_events = raise_on_events or ()
+ self.exc_type = exc_type
+ self.seen = []
+ self.closed = False
+
+ def __enter__(self, *a):
+ sys.addaudithook(self)
+ return self
+
+ def __exit__(self, *a):
+ self.close()
+
+ def close(self):
+ self.closed = True
+
+ @property
+ def seen_events(self):
+ return [i[0] for i in self.seen]
+
+ def __call__(self, event, args):
+ if self.closed:
+ return
+ self.seen.append((event, args))
+ if event in self.raise_on_events:
+ raise self.exc_type("saw event " + event)
+
+
+class TestFinalizeHook:
+ """Used in the test_finalize_hooks function to ensure that hooks
+ are correctly cleaned up, that they are notified about the cleanup,
+ and are unable to prevent it.
+ """
+
+ def __init__(self):
+ print("Created", id(self), file=sys.stderr, flush=True)
+
+ def __call__(self, event, args):
+ # Avoid recursion when we call id() below
+ if event == "builtins.id":
+ return
+
+ print(event, id(self), file=sys.stderr, flush=True)
+
+ if event == "cpython._PySys_ClearAuditHooks":
+ raise RuntimeError("Should be ignored")
+ elif event == "cpython.PyInterpreterState_Clear":
+ raise RuntimeError("Should be ignored")
+
+
+def run_finalize_test():
+ """Called by test_finalize_hooks in a subprocess."""
+ sys.addaudithook(TestFinalizeHook())
+
+
+class AuditTest(unittest.TestCase):
+ def test_basic(self):
+ with TestHook() as hook:
+ sys.audit("test_event", 1, 2, 3)
+ self.assertEqual(hook.seen[0][0], "test_event")
+ self.assertEqual(hook.seen[0][1], (1, 2, 3))
+
+ def test_block_add_hook(self):
+ # Raising an exception should prevent a new hook from being added,
+ # but will not propagate out.
+ with TestHook(raise_on_events="sys.addaudithook") as hook1:
+ with TestHook() as hook2:
+ sys.audit("test_event")
+ self.assertIn("test_event", hook1.seen_events)
+ self.assertNotIn("test_event", hook2.seen_events)
+
+ def test_block_add_hook_baseexception(self):
+ # Raising BaseException will propagate out when adding a hook
+ with self.assertRaises(BaseException):
+ with TestHook(
+ raise_on_events="sys.addaudithook", exc_type=BaseException
+ ) as hook1:
+ # Adding this next hook should raise BaseException
+ with TestHook() as hook2:
+ pass
+
+ def test_finalize_hooks(self):
+ events = []
+ with subprocess.Popen(
+ [
+ sys.executable,
+ "-c",
+ "import test.test_audit; test.test_audit.run_finalize_test()",
+ ],
+ encoding="utf-8",
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ ) as p:
+ p.wait()
+ for line in p.stderr:
+ events.append(line.strip().partition(" "))
+ firstId = events[0][2]
+ self.assertSequenceEqual(
+ [
+ ("Created", " ", firstId),
+ ("cpython._PySys_ClearAuditHooks", " ", firstId),
+ ],
+ events,
+ )
+
+ def test_pickle(self):
+ pickle = support.import_module("pickle")
+
+ class PicklePrint:
+ def __reduce_ex__(self, p):
+ return str, ("Pwned!",)
+
+ payload_1 = pickle.dumps(PicklePrint())
+ payload_2 = pickle.dumps(("a", "b", "c", 1, 2, 3))
+
+ # Before we add the hook, ensure our malicious pickle loads
+ self.assertEqual("Pwned!", pickle.loads(payload_1))
+
+ with TestHook(raise_on_events="pickle.find_class") as hook:
+ with self.assertRaises(RuntimeError):
+ # With the hook enabled, loading globals is not allowed
+ pickle.loads(payload_1)
+ # pickles with no globals are okay
+ pickle.loads(payload_2)
+
+ def test_monkeypatch(self):
+ class A:
+ pass
+
+ class B:
+ pass
+
+ class C(A):
+ pass
+
+ a = A()
+
+ with TestHook() as hook:
+ # Catch name changes
+ C.__name__ = "X"
+ # Catch type changes
+ C.__bases__ = (B,)
+ # Ensure bypassing __setattr__ is still caught
+ type.__dict__["__bases__"].__set__(C, (B,))
+ # Catch attribute replacement
+ C.__init__ = B.__init__
+ # Catch attribute addition
+ C.new_attr = 123
+ # Catch class changes
+ a.__class__ = B
+
+ actual = [(a[0], a[1]) for e, a in hook.seen if e == "object.__setattr__"]
+ self.assertSequenceEqual(
+ [(C, "__name__"), (C, "__bases__"), (C, "__bases__"), (a, "__class__")],
+ actual,
+ )
+
+ def test_open(self):
+ # SSLContext.load_dh_params uses _Py_fopen_obj rather than normal open()
+ try:
+ import ssl
+
+ load_dh_params = ssl.create_default_context().load_dh_params
+ except ImportError:
+ load_dh_params = None
+
+ # Try a range of "open" functions.
+ # All of them should fail
+ with TestHook(raise_on_events={"open"}) as hook:
+ for fn, *args in [
+ (open, support.TESTFN, "r"),
+ (open, sys.executable, "rb"),
+ (open, 3, "wb"),
+ (open, support.TESTFN, "w", -1, None, None, None, False, lambda *a: 1),
+ (load_dh_params, support.TESTFN),
+ ]:
+ if not fn:
+ continue
+ self.assertRaises(RuntimeError, fn, *args)
+
+ actual_mode = [(a[0], a[1]) for e, a in hook.seen if e == "open" and a[1]]
+ actual_flag = [(a[0], a[2]) for e, a in hook.seen if e == "open" and not a[1]]
+ self.assertSequenceEqual(
+ [
+ i
+ for i in [
+ (support.TESTFN, "r"),
+ (sys.executable, "r"),
+ (3, "w"),
+ (support.TESTFN, "w"),
+ (support.TESTFN, "rb") if load_dh_params else None,
+ ]
+ if i is not None
+ ],
+ actual_mode,
+ )
+ self.assertSequenceEqual([], actual_flag)
+
+ def test_cantrace(self):
+ traced = []
+
+ def trace(frame, event, *args):
+ if frame.f_code == TestHook.__call__.__code__:
+ traced.append(event)
+
+ old = sys.settrace(trace)
+ try:
+ with TestHook() as hook:
+ # No traced call
+ eval("1")
+
+ # No traced call
+ hook.__cantrace__ = False
+ eval("2")
+
+ # One traced call
+ hook.__cantrace__ = True
+ eval("3")
+
+ # Two traced calls (writing to private member, eval)
+ hook.__cantrace__ = 1
+ eval("4")
+
+ # One traced call (writing to private member)
+ hook.__cantrace__ = 0
+ finally:
+ sys.settrace(old)
+
+ self.assertSequenceEqual(["call"] * 4, traced)
+
+
+if __name__ == "__main__":
+ if len(sys.argv) >= 2 and sys.argv[1] == "spython_test":
+ # Doesn't matter what we add - it will be blocked
+ sys.addaudithook(None)
+
+ sys.exit(0)
+
+ unittest.main()