"""Tests for --subprocesses subprocess profiling support.""" import argparse import io import os import signal import subprocess import sys import tempfile import threading import time import unittest from test.support import ( SHORT_TIMEOUT, reap_children, requires_remote_subprocess_debugging, ) from .helpers import _cleanup_process # String to check for in stderr when profiler lacks permissions (e.g., macOS) _PERMISSION_ERROR_MSG = "Permission Error" def _readline_with_timeout(file_obj, timeout): # Thread-based readline with timeout - works across all platforms # including Windows where select() doesn't work with pipes. # Returns the line read, or None if timeout occurred. result = [None] exception = [None] def reader(): try: result[0] = file_obj.readline() except Exception as e: exception[0] = e thread = threading.Thread(target=reader, daemon=True) thread.start() thread.join(timeout=timeout) if thread.is_alive(): return None if exception[0] is not None: raise exception[0] return result[0] def _wait_for_process_ready(proc, timeout): # Wait for a subprocess to be ready using polling instead of fixed sleep. # Returns True if process is ready, False if it exited or timeout. deadline = time.time() + timeout poll_interval = 0.01 while time.time() < deadline: if proc.poll() is not None: return False try: if sys.platform == "linux": if os.path.exists(f"/proc/{proc.pid}/exe"): return True else: return True except OSError: pass time.sleep(poll_interval) poll_interval = min(poll_interval * 2, 0.1) return proc.poll() is None @requires_remote_subprocess_debugging() class TestGetChildPids(unittest.TestCase): """Tests for the get_child_pids function.""" def setUp(self): reap_children() def tearDown(self): reap_children() def test_get_child_pids_from_remote_debugging(self): """Test get_child_pids from _remote_debugging module.""" try: import _remote_debugging # Test that the function exists self.assertTrue(hasattr(_remote_debugging, "get_child_pids")) # Test with current process (should return empty or have children if any) result = _remote_debugging.get_child_pids(os.getpid()) self.assertIsInstance(result, list) except (ImportError, AttributeError): self.skipTest("_remote_debugging.get_child_pids not available") def test_get_child_pids_fallback(self): """Test the fallback implementation for get_child_pids.""" from profiling.sampling._child_monitor import get_child_pids # Test with current process result = get_child_pids(os.getpid()) self.assertIsInstance(result, list) @unittest.skipUnless(sys.platform == "linux", "Linux only") def test_discover_child_process_linux(self): """Test that we can discover child processes on Linux.""" from profiling.sampling._child_monitor import get_child_pids # Create a child process proc = subprocess.Popen( [sys.executable, "-c", "import time; time.sleep(10)"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, ) try: # Poll until child appears deadline = time.time() + SHORT_TIMEOUT children = [] while time.time() < deadline: children = get_child_pids(os.getpid()) if proc.pid in children: break time.sleep(0.05) self.assertIn( proc.pid, children, f"Child PID {proc.pid} not discovered within {SHORT_TIMEOUT}s. " f"Found PIDs: {children}", ) finally: _cleanup_process(proc) def test_recursive_child_discovery(self): """Test that recursive=True finds grandchildren.""" from profiling.sampling._child_monitor import get_child_pids # Create a child that spawns a grandchild and keeps a reference to it # so we can clean it up via the child process code = """ import subprocess import sys import threading grandchild = subprocess.Popen([sys.executable, '-c', 'import time; time.sleep(60)']) print(grandchild.pid, flush=True) # Wait for parent to send signal byte (cross-platform) # Using threading with timeout so test doesn't hang if something goes wrong # Timeout is 60s (2x test timeout) to ensure child outlives test in worst case def wait_for_signal(): try: sys.stdin.buffer.read(1) except: pass t = threading.Thread(target=wait_for_signal, daemon=True) t.start() t.join(timeout=60) # Clean up grandchild before exiting if grandchild.poll() is None: grandchild.terminate() try: grandchild.wait(timeout=2) except subprocess.TimeoutExpired: grandchild.kill() try: grandchild.wait(timeout=2) except subprocess.TimeoutExpired: grandchild.wait() """ proc = subprocess.Popen( [sys.executable, "-c", code], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, ) grandchild_pid = None try: # Read grandchild PID with thread-based timeout # This prevents indefinite blocking on all platforms grandchild_pid_line = _readline_with_timeout( proc.stdout, SHORT_TIMEOUT ) if grandchild_pid_line is None: self.fail( f"Timeout waiting for grandchild PID from child process " f"(child PID: {proc.pid})" ) if not grandchild_pid_line: self.fail( f"Child process {proc.pid} closed stdout without printing " f"grandchild PID" ) grandchild_pid = int(grandchild_pid_line.strip()) # Poll until grandchild is visible deadline = time.time() + SHORT_TIMEOUT pids_recursive = [] while time.time() < deadline: pids_recursive = get_child_pids(os.getpid(), recursive=True) if grandchild_pid in pids_recursive: break time.sleep(0.05) self.assertIn( proc.pid, pids_recursive, f"Child PID {proc.pid} not found in recursive discovery. " f"Found: {pids_recursive}", ) self.assertIn( grandchild_pid, pids_recursive, f"Grandchild PID {grandchild_pid} not found in recursive discovery. " f"Found: {pids_recursive}", ) # Non-recursive should find only direct child pids_direct = get_child_pids(os.getpid(), recursive=False) self.assertIn( proc.pid, pids_direct, f"Child PID {proc.pid} not found in non-recursive discovery. " f"Found: {pids_direct}", ) self.assertNotIn( grandchild_pid, pids_direct, f"Grandchild PID {grandchild_pid} should NOT be in non-recursive " f"discovery. Found: {pids_direct}", ) finally: # Send signal byte to child to trigger cleanup, then close stdin try: proc.stdin.write(b"x") proc.stdin.flush() proc.stdin.close() except OSError: pass proc.stdout.close() _cleanup_process(proc) # The grandchild may not have been cleaned up by the child process # (e.g., if the child was killed). Explicitly terminate the # grandchild to prevent PermissionError on Windows when removing # temp directories. if grandchild_pid is not None: try: os.kill(grandchild_pid, signal.SIGTERM) except (OSError, ProcessLookupError): pass # Process already exited def test_nonexistent_pid_returns_empty(self): """Test that nonexistent PID returns empty list.""" from profiling.sampling._child_monitor import get_child_pids # Use a very high PID that's unlikely to exist result = get_child_pids(999999999) self.assertEqual(result, []) @requires_remote_subprocess_debugging() class TestChildProcessMonitor(unittest.TestCase): """Tests for the ChildProcessMonitor class.""" def setUp(self): reap_children() def tearDown(self): reap_children() def test_monitor_creation(self): """Test that ChildProcessMonitor can be created.""" from profiling.sampling._child_monitor import ChildProcessMonitor monitor = ChildProcessMonitor( pid=os.getpid(), cli_args=["-i", "100", "-d", "5"], output_pattern="test_{pid}.pstats", ) self.assertEqual(monitor.parent_pid, os.getpid()) self.assertEqual(monitor.cli_args, ["-i", "100", "-d", "5"]) self.assertEqual(monitor.output_pattern, "test_{pid}.pstats") def test_monitor_lifecycle(self): """Test monitor lifecycle via context manager.""" from profiling.sampling._child_monitor import ChildProcessMonitor monitor = ChildProcessMonitor( pid=os.getpid(), cli_args=[], output_pattern=None ) # Before entering context, thread should not exist self.assertIsNone(monitor._monitor_thread) with monitor: # Inside context, thread should be running self.assertIsNotNone(monitor._monitor_thread) self.assertTrue(monitor._monitor_thread.is_alive()) # After exiting context, thread should be stopped self.assertFalse(monitor._monitor_thread.is_alive()) def test_spawned_profilers_property(self): """Test that spawned_profilers returns a copy of the list.""" from profiling.sampling._child_monitor import ChildProcessMonitor monitor = ChildProcessMonitor( pid=os.getpid(), cli_args=[], output_pattern=None ) # Should return empty list initially profilers = monitor.spawned_profilers self.assertEqual(profilers, []) self.assertIsNot(profilers, monitor._spawned_profilers) def test_context_manager(self): """Test that ChildProcessMonitor works as a context manager.""" from profiling.sampling._child_monitor import ChildProcessMonitor with ChildProcessMonitor( pid=os.getpid(), cli_args=[], output_pattern=None ) as monitor: self.assertIsNotNone(monitor._monitor_thread) self.assertTrue(monitor._monitor_thread.is_alive()) # After exiting context, thread should be stopped self.assertFalse(monitor._monitor_thread.is_alive()) @requires_remote_subprocess_debugging() class TestCLIChildrenFlag(unittest.TestCase): """Tests for the --subprocesses CLI flag.""" def setUp(self): reap_children() def tearDown(self): reap_children() def test_subprocesses_flag_parsed(self): """Test that --subprocesses flag is recognized.""" from profiling.sampling.cli import _add_sampling_options parser = argparse.ArgumentParser() _add_sampling_options(parser) # Parse with --subprocesses args = parser.parse_args(["--subprocesses"]) self.assertTrue(args.subprocesses) # Parse without --subprocesses args = parser.parse_args([]) self.assertFalse(args.subprocesses) def test_subprocesses_incompatible_with_live(self): """Test that --subprocesses is incompatible with --live.""" from profiling.sampling.cli import _validate_args # Create mock args with both subprocesses and live args = argparse.Namespace( subprocesses=True, live=True, async_aware=False, format="pstats", mode="wall", sort=None, limit=None, no_summary=False, opcodes=False, ) parser = argparse.ArgumentParser() with self.assertRaises(SystemExit): _validate_args(args, parser) def test_build_child_profiler_args(self): """Test building CLI args for child profilers.""" from profiling.sampling.cli import _build_child_profiler_args args = argparse.Namespace( interval=200, duration=15, all_threads=True, realtime_stats=False, native=True, gc=True, opcodes=False, async_aware=False, mode="cpu", format="flamegraph", ) child_args = _build_child_profiler_args(args) # Verify flag-value pairs are correctly paired (flag followed by value) def assert_flag_value_pair(flag, value): self.assertIn( flag, child_args, f"Flag '{flag}' not found in args: {child_args}", ) flag_index = child_args.index(flag) self.assertGreater( len(child_args), flag_index + 1, f"No value after flag '{flag}' in args: {child_args}", ) self.assertEqual( child_args[flag_index + 1], str(value), f"Flag '{flag}' should be followed by '{value}', got " f"'{child_args[flag_index + 1]}' in args: {child_args}", ) assert_flag_value_pair("-i", 200) assert_flag_value_pair("-d", 15) assert_flag_value_pair("--mode", "cpu") # Verify standalone flags are present self.assertIn( "-a", child_args, f"Flag '-a' not found in args: {child_args}" ) self.assertIn( "--native", child_args, f"Flag '--native' not found in args: {child_args}", ) self.assertIn( "--flamegraph", child_args, f"Flag '--flamegraph' not found in args: {child_args}", ) def test_build_child_profiler_args_no_gc(self): """Test building CLI args with --no-gc.""" from profiling.sampling.cli import _build_child_profiler_args args = argparse.Namespace( interval=100, duration=5, all_threads=False, realtime_stats=False, native=False, gc=False, # Explicitly disabled opcodes=False, async_aware=False, mode="wall", format="pstats", ) child_args = _build_child_profiler_args(args) self.assertIn( "--no-gc", child_args, f"Flag '--no-gc' not found when gc=False. Args: {child_args}", ) def test_build_output_pattern_with_outfile(self): """Test output pattern generation with user-specified output.""" from profiling.sampling.cli import _build_output_pattern # With extension args = argparse.Namespace(outfile="output.html", format="flamegraph") pattern = _build_output_pattern(args) self.assertEqual(pattern, "output_{pid}.html") # Without extension args = argparse.Namespace(outfile="output", format="pstats") pattern = _build_output_pattern(args) self.assertEqual(pattern, "output_{pid}") def test_build_output_pattern_default(self): """Test output pattern generation with default output.""" from profiling.sampling.cli import _build_output_pattern # Flamegraph format args = argparse.Namespace(outfile=None, format="flamegraph") pattern = _build_output_pattern(args) self.assertIn("{pid}", pattern) self.assertIn("flamegraph", pattern) self.assertTrue(pattern.endswith(".html")) # Heatmap format args = argparse.Namespace(outfile=None, format="heatmap") pattern = _build_output_pattern(args) self.assertEqual(pattern, "heatmap_{pid}") @requires_remote_subprocess_debugging() class TestChildrenIntegration(unittest.TestCase): """Integration tests for --subprocesses functionality.""" def setUp(self): reap_children() def tearDown(self): reap_children() def test_setup_child_monitor(self): """Test setting up a child monitor from args.""" from profiling.sampling.cli import _setup_child_monitor args = argparse.Namespace( interval=100, duration=5, all_threads=False, realtime_stats=False, native=False, gc=True, opcodes=False, async_aware=False, mode="wall", format="pstats", outfile=None, ) monitor = _setup_child_monitor(args, os.getpid()) # Use addCleanup to ensure monitor is properly cleaned up even if # assertions fail self.addCleanup(monitor.__exit__, None, None, None) self.assertIsNotNone(monitor) self.assertEqual( monitor.parent_pid, os.getpid(), f"Monitor parent_pid should be {os.getpid()}, got {monitor.parent_pid}", ) @requires_remote_subprocess_debugging() class TestIsPythonProcess(unittest.TestCase): """Tests for the is_python_process function.""" def setUp(self): reap_children() def tearDown(self): reap_children() def test_is_python_process_current_process(self): """Test that current process is detected as Python.""" from profiling.sampling._child_monitor import is_python_process # Current process should be Python result = is_python_process(os.getpid()) self.assertTrue( result, f"Current process (PID {os.getpid()}) should be detected as Python", ) def test_is_python_process_python_subprocess(self): """Test that a Python subprocess is detected as Python.""" from profiling.sampling._child_monitor import is_python_process # Start a Python subprocess proc = subprocess.Popen( [sys.executable, "-c", "import time; time.sleep(10)"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, ) try: # Poll until Python runtime structures are initialized # (is_python_process probes for runtime structures which take # time to initialize after process start) deadline = time.time() + SHORT_TIMEOUT detected = False while time.time() < deadline: if proc.poll() is not None: self.fail(f"Process {proc.pid} exited unexpectedly") if is_python_process(proc.pid): detected = True break time.sleep(0.05) self.assertTrue( detected, f"Python subprocess (PID {proc.pid}) should be detected as Python " f"within {SHORT_TIMEOUT}s", ) finally: _cleanup_process(proc) @unittest.skipUnless(sys.platform == "linux", "Linux only test") def test_is_python_process_non_python_subprocess(self): """Test that a non-Python subprocess is not detected as Python.""" from profiling.sampling._child_monitor import is_python_process # Start a non-Python subprocess (sleep command) proc = subprocess.Popen( ["sleep", "10"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, ) try: # Wait for process to be ready using polling self.assertTrue( _wait_for_process_ready(proc, SHORT_TIMEOUT), f"Process {proc.pid} should be ready within {SHORT_TIMEOUT}s", ) self.assertFalse( is_python_process(proc.pid), f"Non-Python subprocess 'sleep' (PID {proc.pid}) should NOT be " f"detected as Python", ) finally: _cleanup_process(proc) def test_is_python_process_nonexistent_pid(self): """Test that nonexistent PID returns False.""" from profiling.sampling._child_monitor import is_python_process # Use a very high PID that's unlikely to exist result = is_python_process(999999999) self.assertFalse( result, "Nonexistent PID 999999999 should return False", ) def test_is_python_process_exited_process(self): """Test handling of a process that exits quickly.""" from profiling.sampling._child_monitor import is_python_process # Start a process that exits immediately proc = subprocess.Popen( [sys.executable, "-c", "pass"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, ) # Wait for it to exit proc.wait(timeout=SHORT_TIMEOUT) # Should return False for exited process (not raise) result = is_python_process(proc.pid) self.assertFalse( result, f"Exited process (PID {proc.pid}) should return False" ) @requires_remote_subprocess_debugging() class TestMaxChildProfilersLimit(unittest.TestCase): """Tests for the _MAX_CHILD_PROFILERS limit.""" def setUp(self): reap_children() def tearDown(self): reap_children() def test_max_profilers_constant_exists(self): """Test that _MAX_CHILD_PROFILERS constant is defined.""" from profiling.sampling._child_monitor import _MAX_CHILD_PROFILERS self.assertEqual( _MAX_CHILD_PROFILERS, 100, f"_MAX_CHILD_PROFILERS should be 100, got {_MAX_CHILD_PROFILERS}", ) def test_cleanup_interval_constant_exists(self): """Test that _CLEANUP_INTERVAL_CYCLES constant is defined.""" from profiling.sampling._child_monitor import _CLEANUP_INTERVAL_CYCLES self.assertEqual( _CLEANUP_INTERVAL_CYCLES, 10, f"_CLEANUP_INTERVAL_CYCLES should be 10, got {_CLEANUP_INTERVAL_CYCLES}", ) def test_monitor_respects_max_limit(self): """Test that monitor refuses to spawn more than _MAX_CHILD_PROFILERS.""" from profiling.sampling._child_monitor import ( ChildProcessMonitor, _MAX_CHILD_PROFILERS, ) from unittest.mock import MagicMock, patch # Create a monitor monitor = ChildProcessMonitor( pid=os.getpid(), cli_args=["-i", "100", "-d", "5"], output_pattern="test_{pid}.pstats", ) # Manually fill up the profilers list to the limit mock_profilers = [MagicMock() for _ in range(_MAX_CHILD_PROFILERS)] for mock_proc in mock_profilers: mock_proc.poll.return_value = None # Simulate running process monitor._spawned_profilers = mock_profilers # Try to spawn another profiler - should be rejected stderr_capture = io.StringIO() with patch("sys.stderr", stderr_capture): monitor._spawn_profiler_for_child(99999) # Verify warning was printed stderr_output = stderr_capture.getvalue() self.assertIn( "Max child profilers", stderr_output, f"Expected warning about max profilers, got: {stderr_output}", ) self.assertIn( str(_MAX_CHILD_PROFILERS), stderr_output, f"Warning should mention limit ({_MAX_CHILD_PROFILERS}): {stderr_output}", ) # Verify no new profiler was added self.assertEqual( len(monitor._spawned_profilers), _MAX_CHILD_PROFILERS, f"Should still have {_MAX_CHILD_PROFILERS} profilers, got " f"{len(monitor._spawned_profilers)}", ) @requires_remote_subprocess_debugging() class TestWaitForProfilers(unittest.TestCase): """Tests for the wait_for_profilers method.""" def setUp(self): reap_children() def tearDown(self): reap_children() def test_wait_for_profilers_empty_list(self): """Test that wait_for_profilers returns immediately with no profilers.""" from profiling.sampling._child_monitor import ChildProcessMonitor monitor = ChildProcessMonitor( pid=os.getpid(), cli_args=[], output_pattern=None ) # Should return immediately without printing anything stderr_capture = io.StringIO() with unittest.mock.patch("sys.stderr", stderr_capture): start = time.time() monitor.wait_for_profilers(timeout=10.0) elapsed = time.time() - start # Should complete very quickly (less than 1 second) self.assertLess( elapsed, 1.0, f"wait_for_profilers with empty list took {elapsed:.2f}s, expected < 1s", ) # No "Waiting for..." message should be printed self.assertNotIn( "Waiting for", stderr_capture.getvalue(), "Should not print waiting message when no profilers", ) def test_wait_for_profilers_with_completed_process(self): """Test waiting for profilers that complete quickly.""" from profiling.sampling._child_monitor import ChildProcessMonitor monitor = ChildProcessMonitor( pid=os.getpid(), cli_args=[], output_pattern=None ) # Start a process that exits quickly proc = subprocess.Popen( [sys.executable, "-c", "pass"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, ) # Add to spawned profilers monitor._spawned_profilers.append(proc) try: stderr_capture = io.StringIO() with unittest.mock.patch("sys.stderr", stderr_capture): start = time.time() monitor.wait_for_profilers(timeout=SHORT_TIMEOUT) elapsed = time.time() - start # Should complete quickly since process exits fast self.assertLess( elapsed, 5.0, f"wait_for_profilers took {elapsed:.2f}s for quick process", ) # Should print waiting message self.assertIn( "Waiting for 1 child profiler", stderr_capture.getvalue(), "Should print waiting message", ) finally: _cleanup_process(proc) def test_wait_for_profilers_timeout(self): """Test that wait_for_profilers respects timeout.""" from profiling.sampling._child_monitor import ChildProcessMonitor monitor = ChildProcessMonitor( pid=os.getpid(), cli_args=[], output_pattern=None ) # Start a process that runs for a long time proc = subprocess.Popen( [sys.executable, "-c", "import time; time.sleep(60)"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, ) # Add to spawned profilers monitor._spawned_profilers.append(proc) try: stderr_capture = io.StringIO() with unittest.mock.patch("sys.stderr", stderr_capture): start = time.time() # Use short timeout monitor.wait_for_profilers(timeout=0.5) elapsed = time.time() - start # Should timeout after approximately 0.5 seconds self.assertGreater( elapsed, 0.4, f"wait_for_profilers returned too quickly ({elapsed:.2f}s)", ) self.assertLess( elapsed, 2.0, f"wait_for_profilers took too long ({elapsed:.2f}s), timeout not respected", ) finally: _cleanup_process(proc) def test_wait_for_profilers_multiple(self): """Test waiting for multiple profilers.""" from profiling.sampling._child_monitor import ChildProcessMonitor monitor = ChildProcessMonitor( pid=os.getpid(), cli_args=[], output_pattern=None ) # Start multiple processes procs = [] for _ in range(3): proc = subprocess.Popen( [sys.executable, "-c", "import time; time.sleep(0.1)"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, ) procs.append(proc) monitor._spawned_profilers.append(proc) try: stderr_capture = io.StringIO() with unittest.mock.patch("sys.stderr", stderr_capture): monitor.wait_for_profilers(timeout=SHORT_TIMEOUT) # Should report correct count self.assertIn( "Waiting for 3 child profiler", stderr_capture.getvalue(), "Should report correct profiler count", ) finally: for proc in procs: _cleanup_process(proc) @requires_remote_subprocess_debugging() class TestEndToEndChildrenCLI(unittest.TestCase): """End-to-end tests for --subprocesses CLI flag.""" def setUp(self): reap_children() def tearDown(self): reap_children() def test_subprocesses_flag_spawns_child_and_creates_output(self): """Test that --subprocesses flag works end-to-end with actual subprocesses.""" # Create a temporary directory for output files with tempfile.TemporaryDirectory() as tmpdir: # Create a script that spawns a child Python process parent_script = f""" import subprocess import sys import time # Spawn a child that does some work child = subprocess.Popen([ sys.executable, '-c', 'import time; [i**2 for i in range(1000)]; time.sleep(2)' ]) # Do some work in parent for i in range(1000): _ = i ** 2 time.sleep(2) child.wait() """ script_file = os.path.join(tmpdir, "parent_script.py") with open(script_file, "w") as f: f.write(parent_script) output_file = os.path.join(tmpdir, "profile.pstats") # Run the profiler with --subprocesses flag result = subprocess.run( [ sys.executable, "-m", "profiling.sampling", "run", "--subprocesses", "-d", "3", "-i", "10000", "-o", output_file, script_file, ], capture_output=True, text=True, timeout=SHORT_TIMEOUT, ) # Check that parent output file was created self.assertTrue( os.path.exists(output_file), f"Parent profile output not created. " f"stdout: {result.stdout}, stderr: {result.stderr}", ) # Check for child profiler output files (pattern: profile_{pid}.pstats) output_files = os.listdir(tmpdir) child_profiles = [ f for f in output_files if f.startswith("profile_") and f.endswith(".pstats") ] # Note: Child profiling is best-effort; the child may exit before # profiler attaches, or the process may not be detected as Python. # We just verify the mechanism doesn't crash. if result.returncode != 0: self.fail( f"Profiler exited with code {result.returncode}. " f"stdout: {result.stdout}, stderr: {result.stderr}" ) def test_subprocesses_flag_with_flamegraph_output(self): """Test --subprocesses with flamegraph output format.""" with tempfile.TemporaryDirectory() as tmpdir: # Simple parent that spawns a child parent_script = f""" import subprocess import sys import time child = subprocess.Popen([sys.executable, '-c', 'import time; time.sleep(1)']) time.sleep(1) child.wait() """ script_file = os.path.join(tmpdir, "parent.py") with open(script_file, "w") as f: f.write(parent_script) output_file = os.path.join(tmpdir, "flame.html") result = subprocess.run( [ sys.executable, "-m", "profiling.sampling", "run", "--subprocesses", "-d", "2", "-i", "10000", "--flamegraph", "-o", output_file, script_file, ], capture_output=True, text=True, timeout=SHORT_TIMEOUT, ) self.assertTrue( os.path.exists(output_file), f"Flamegraph output not created. stderr: {result.stderr}", ) # Verify it's valid HTML with open(output_file, "r") as f: content = f.read() self.assertIn( "