"""Tests for the perf trampoline (``-X perf`` / ``sys.activate_stack_trampoline``).

The tests run small scripts in subprocesses and inspect the
``/tmp/perf-<pid>.map`` files those processes leave behind.  The
``TestPerfProfiler`` class additionally drives the real ``perf`` binary.
"""

import unittest
import string
import subprocess
import sys
import sysconfig
import os
import pathlib
from test import support
from test.support.script_helper import (
    make_script,
    assert_python_failure,
    assert_python_ok,
)
from test.support.os_helper import temp_dir


if not support.has_subprocess_support:
    raise unittest.SkipTest("test module requires subprocess")

if support.check_sanitizer(address=True, memory=True, ub=True):
    # gh-109580: Skip the test because it does crash randomly if Python is
    # built with ASAN.
    raise unittest.SkipTest("test crash randomly on ASAN/MSAN/UBSAN build")


# Directory in which the tests look for ``perf-<pid>.map`` files.
_PERF_MAP_DIR = pathlib.Path("/tmp")


def _perf_map_file(pid):
    """Return the path of the perf map file expected for process *pid*."""
    return _PERF_MAP_DIR / f"perf-{pid}.map"


def supports_trampoline_profiling():
    """Return True if the build config sets PY_HAVE_PERF_TRAMPOLINE to 1."""
    perf_trampoline = sysconfig.get_config_var("PY_HAVE_PERF_TRAMPOLINE")
    if not perf_trampoline:
        return False
    return int(perf_trampoline) == 1


if not supports_trampoline_profiling():
    raise unittest.SkipTest("perf trampoline profiling not supported")


class _PerfMapCleanupMixin:
    """Delete the perf map files that appeared while a test was running.

    ``setUp`` snapshots the ``perf-*.map`` files already present and
    ``tearDown`` removes every file matching that pattern that is not in
    the snapshot.
    """

    def setUp(self):
        super().setUp()
        self.perf_files = set(_PERF_MAP_DIR.glob("perf-*.map"))

    def tearDown(self) -> None:
        super().tearDown()
        files_to_delete = set(_PERF_MAP_DIR.glob("perf-*.map")) - self.perf_files
        for file in files_to_delete:
            # The glob is not restricted to files created by this test, so
            # something else may have removed the file in the meantime.
            file.unlink(missing_ok=True)


class TestPerfTrampoline(_PerfMapCleanupMixin, unittest.TestCase):
    def test_trampoline_works(self):
        # Running with -Xperf must produce a map file that lists every
        # Python function of the script with a bare hexadecimal address.
        code = """if 1:
                def foo():
                    pass

                def bar():
                    foo()

                def baz():
                    bar()

                baz()
                """
        with temp_dir() as script_dir:
            script = make_script(script_dir, "perftest", code)
            with subprocess.Popen(
                [sys.executable, "-Xperf", script],
                text=True,
                stderr=subprocess.PIPE,
                stdout=subprocess.PIPE,
            ) as process:
                stdout, stderr = process.communicate()

        self.assertEqual(stderr, "")
        self.assertEqual(stdout, "")

        perf_file = _perf_map_file(process.pid)
        self.assertTrue(perf_file.exists())
        perf_file_contents = perf_file.read_text()
        perf_lines = perf_file_contents.splitlines()
        expected_symbols = [
            f"py::foo:{script}",
            f"py::bar:{script}",
            f"py::baz:{script}",
        ]
        for expected_symbol in expected_symbols:
            perf_line = next(
                (line for line in perf_lines if expected_symbol in line), None
            )
            self.assertIsNotNone(
                perf_line, f"Could not find {expected_symbol} in perf file"
            )
            # The address is the first space-separated field of the line.
            perf_addr = perf_line.split(" ")[0]
            self.assertFalse(
                perf_addr.startswith("0x"),
                "Address should not be prefixed with 0x",
            )
            self.assertTrue(
                set(perf_addr).issubset(string.hexdigits),
                "Address should contain only hex characters",
            )

    def test_trampoline_works_with_forks(self):
        # The parent exits with the child's exit code.  The raw status from
        # os.waitpid() must be converted first: an ordinary exit code N is
        # encoded as N << 8, and passing that straight to sys.exit() would
        # be truncated to 0 by the OS, hiding a failing child.
        code = """if 1:
                import os, sys

                def foo_fork():
                    pass

                def bar_fork():
                    foo_fork()

                def baz_fork():
                    bar_fork()

                def foo():
                    pid = os.fork()
                    if pid == 0:
                        print(os.getpid())
                        baz_fork()
                    else:
                        _, status = os.waitpid(-1, 0)
                        sys.exit(os.waitstatus_to_exitcode(status))

                def bar():
                    foo()

                def baz():
                    bar()

                baz()
                """
        with temp_dir() as script_dir:
            script = make_script(script_dir, "perftest", code)
            with subprocess.Popen(
                [sys.executable, "-Xperf", script],
                text=True,
                stderr=subprocess.PIPE,
                stdout=subprocess.PIPE,
            ) as process:
                stdout, stderr = process.communicate()

        self.assertEqual(process.returncode, 0)
        self.assertEqual(stderr, "")
        # The child prints its own pid; that is the only expected output.
        child_pid = int(stdout.strip())
        perf_file = _perf_map_file(process.pid)
        perf_child_file = _perf_map_file(child_pid)
        self.assertTrue(perf_file.exists())
        self.assertTrue(perf_child_file.exists())

        perf_file_contents = perf_file.read_text()
        self.assertIn(f"py::foo:{script}", perf_file_contents)
        self.assertIn(f"py::bar:{script}", perf_file_contents)
        self.assertIn(f"py::baz:{script}", perf_file_contents)

        child_perf_file_contents = perf_child_file.read_text()
        self.assertIn(f"py::foo_fork:{script}", child_perf_file_contents)
        self.assertIn(f"py::bar_fork:{script}", child_perf_file_contents)
        self.assertIn(f"py::baz_fork:{script}", child_perf_file_contents)

    def test_sys_api(self):
        # foo() is called while the trampoline is deactivated and must be
        # absent from the map file; the other functions must be present.
        code = """if 1:
                import sys
                def foo():
                    pass

                def spam():
                    pass

                def bar():
                    sys.deactivate_stack_trampoline()
                    foo()
                    sys.activate_stack_trampoline("perf")
                    spam()

                def baz():
                    bar()

                sys.activate_stack_trampoline("perf")
                baz()
                """
        with temp_dir() as script_dir:
            script = make_script(script_dir, "perftest", code)
            with subprocess.Popen(
                [sys.executable, script],
                text=True,
                stderr=subprocess.PIPE,
                stdout=subprocess.PIPE,
            ) as process:
                stdout, stderr = process.communicate()

        self.assertEqual(stderr, "")
        self.assertEqual(stdout, "")

        perf_file = _perf_map_file(process.pid)
        self.assertTrue(perf_file.exists())
        perf_file_contents = perf_file.read_text()
        self.assertNotIn(f"py::foo:{script}", perf_file_contents)
        self.assertIn(f"py::spam:{script}", perf_file_contents)
        self.assertIn(f"py::bar:{script}", perf_file_contents)
        self.assertIn(f"py::baz:{script}", perf_file_contents)

    def test_sys_api_with_existing_trampoline(self):
        # Activating the same backend twice must succeed.
        code = """if 1:
                import sys
                sys.activate_stack_trampoline("perf")
                sys.activate_stack_trampoline("perf")
                """
        assert_python_ok("-c", code)

    def test_sys_api_with_invalid_trampoline(self):
        code = """if 1:
                import sys
                sys.activate_stack_trampoline("invalid")
                """
        rc, out, err = assert_python_failure("-c", code)
        self.assertIn("invalid backend: invalid", err.decode())

    def test_sys_api_get_status(self):
        code = """if 1:
                import sys
                sys.activate_stack_trampoline("perf")
                assert sys.is_stack_trampoline_active() is True
                sys.deactivate_stack_trampoline()
                assert sys.is_stack_trampoline_active() is False
                """
        assert_python_ok("-c", code)


def is_unwinding_reliable():
    """Return True if PY_CORE_CFLAGS mentions ``no-omit-frame-pointer``."""
    cflags = sysconfig.get_config_var("PY_CORE_CFLAGS")
    if not cflags:
        return False
    return "no-omit-frame-pointer" in cflags


def perf_command_works():
    """Return True if a usable ``perf`` binary is available.

    Two probes are made: ``perf --help`` must succeed and mention
    "perf.data", and a trivial ``perf record`` of a Python one-liner must
    succeed and let the one-liner's output through.
    """
    try:
        cmd = ["perf", "--help"]
        stdout = subprocess.check_output(cmd, text=True)
    except (subprocess.SubprocessError, OSError):
        return False

    # perf version does not return a version number on Fedora. Use presence
    # of "perf.data" in help as indicator that it's perf from Linux tools.
    if "perf.data" not in stdout:
        return False

    # Check that we can run a simple perf run
    with temp_dir() as script_dir:
        try:
            output_file = os.path.join(script_dir, "perf_output.perf")
            cmd = (
                "perf",
                "record",
                "-g",
                "--call-graph=fp",
                "-o",
                output_file,
                "--",
                sys.executable,
                "-c",
                'print("hello")',
            )
            stdout = subprocess.check_output(
                cmd, cwd=script_dir, text=True, stderr=subprocess.STDOUT
            )
        except (subprocess.SubprocessError, OSError):
            return False

        if "hello" not in stdout:
            return False

    return True


def run_perf(cwd, *args, **env_vars):
    """Record *args* under ``perf record`` and return ``perf script`` output.

    *cwd* is only used as the directory for the ``perf_output.perf`` data
    file; it is not passed to the subprocesses as their working directory.
    *env_vars*, if given, are added on top of a copy of ``os.environ`` for
    both perf invocations.

    Return a ``(stdout, stderr)`` tuple of the ``perf script`` run, decoded
    as UTF-8 with replacement of undecodable bytes.

    Raise ValueError if ``perf record`` exits with a non-zero status, and
    subprocess.CalledProcessError if ``perf script`` does.
    """
    if env_vars:
        env = os.environ.copy()
        env.update(env_vars)
    else:
        env = None
    output_file = os.path.join(cwd, "perf_output.perf")
    base_cmd = ("perf", "record", "-g", "--call-graph=fp", "-o", output_file, "--")
    proc = subprocess.run(
        base_cmd + args,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        env=env,
    )
    if proc.returncode:
        # Show perf's own diagnostics as text rather than as a bytes repr.
        print(proc.stderr.decode("utf-8", "replace"), file=sys.stderr)
        raise ValueError(f"Perf failed with return code {proc.returncode}")

    proc = subprocess.run(
        ("perf", "script", "-i", output_file),
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        env=env,
        check=True,
    )
    return proc.stdout.decode("utf-8", "replace"), proc.stderr.decode(
        "utf-8", "replace"
    )


@unittest.skipUnless(perf_command_works(), "perf command doesn't work")
@unittest.skipUnless(is_unwinding_reliable(), "Unwinding is unreliable")
class TestPerfProfiler(_PerfMapCleanupMixin, unittest.TestCase):
    def test_python_calls_appear_in_the_stack_if_perf_activated(self):
        with temp_dir() as script_dir:
            code = """if 1:
                def foo(n):
                    x = 0
                    for i in range(n):
                        x += i

                def bar(n):
                    foo(n)

                def baz(n):
                    bar(n)

                baz(10000000)
                """
            script = make_script(script_dir, "perftest", code)
            stdout, stderr = run_perf(script_dir, sys.executable, "-Xperf", script)
            self.assertEqual(stderr, "")

            self.assertIn(f"py::foo:{script}", stdout)
            self.assertIn(f"py::bar:{script}", stdout)
            self.assertIn(f"py::baz:{script}", stdout)

    def test_python_calls_do_not_appear_in_the_stack_if_perf_activated(self):
        # NOTE: despite the method name, the script is run *without* -Xperf
        # here, i.e. this checks the case where perf support is not
        # activated.  The name is kept so existing test IDs stay valid.
        with temp_dir() as script_dir:
            code = """if 1:
                def foo(n):
                    x = 0
                    for i in range(n):
                        x += i

                def bar(n):
                    foo(n)

                def baz(n):
                    bar(n)

                baz(10000000)
                """
            script = make_script(script_dir, "perftest", code)
            stdout, stderr = run_perf(script_dir, sys.executable, script)
            self.assertEqual(stderr, "")

            self.assertNotIn(f"py::foo:{script}", stdout)
            self.assertNotIn(f"py::bar:{script}", stdout)
            self.assertNotIn(f"py::baz:{script}", stdout)

    def test_pre_fork_compile(self):
        # Trampolines are compiled for every function before forking, with
        # persistence after fork enabled, so the fork-side functions must
        # show up in both the parent's and the child's map file.
        code = """if 1:
                import sys
                import os
                import sysconfig
                from _testinternalcapi import (
                    compile_perf_trampoline_entry,
                    perf_trampoline_set_persist_after_fork,
                )

                def foo_fork():
                    pass

                def bar_fork():
                    foo_fork()

                def foo():
                    pass

                def bar():
                    foo()

                def compile_trampolines_for_all_functions():
                    perf_trampoline_set_persist_after_fork(1)
                    for _, obj in globals().items():
                        if callable(obj) and hasattr(obj, '__code__'):
                            compile_perf_trampoline_entry(obj.__code__)

                if __name__ == "__main__":
                    compile_trampolines_for_all_functions()
                    pid = os.fork()
                    if pid == 0:
                        print(os.getpid())
                        bar_fork()
                    else:
                        bar()
                """

        with temp_dir() as script_dir:
            script = make_script(script_dir, "perftest", code)
            with subprocess.Popen(
                [sys.executable, "-Xperf", script],
                text=True,
                stderr=subprocess.PIPE,
                stdout=subprocess.PIPE,
            ) as process:
                stdout, stderr = process.communicate()

        self.assertEqual(process.returncode, 0)
        self.assertNotIn("Error:", stderr)
        child_pid = int(stdout.strip())
        perf_file = _perf_map_file(process.pid)
        perf_child_file = _perf_map_file(child_pid)
        self.assertTrue(perf_file.exists())
        self.assertTrue(perf_child_file.exists())

        perf_file_contents = perf_file.read_text()
        self.assertIn(f"py::foo:{script}", perf_file_contents)
        self.assertIn(f"py::bar:{script}", perf_file_contents)
        self.assertIn(f"py::foo_fork:{script}", perf_file_contents)
        self.assertIn(f"py::bar_fork:{script}", perf_file_contents)

        child_perf_file_contents = perf_child_file.read_text()
        self.assertIn(f"py::foo_fork:{script}", child_perf_file_contents)
        self.assertIn(f"py::bar_fork:{script}", child_perf_file_contents)

        # Pre-compiled perf-map entries of a forked process must be
        # identical in both the parent and child perf-map files.
        perf_file_lines = perf_file_contents.split("\n")
        for line in perf_file_lines:
            if (
                f"py::foo_fork:{script}" in line
                or f"py::bar_fork:{script}" in line
            ):
                self.assertIn(line, child_perf_file_contents)


if __name__ == "__main__":
    unittest.main()