1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
|
"""Tests for the heatmap collector (profiling.sampling)."""
import os
import shutil
import tempfile
import unittest
from collections import namedtuple
from pathlib import Path
# Matches the C structseq LocationInfo from _remote_debugging
LocationInfo = namedtuple('LocationInfo', ['lineno', 'end_lineno', 'col_offset', 'end_col_offset'])
from profiling.sampling.heatmap_collector import (
HeatmapCollector,
get_python_path_info,
extract_module_name,
)
from test.support import captured_stdout, captured_stderr
# =============================================================================
# Unit Tests for Public Helper Functions
# =============================================================================
class TestPathInfoFunctions(unittest.TestCase):
"""Test public helper functions for path information."""
def test_get_python_path_info_returns_dict(self):
"""Test that get_python_path_info returns a dictionary with expected keys."""
path_info = get_python_path_info()
self.assertIsInstance(path_info, dict)
self.assertIn('stdlib', path_info)
self.assertIn('site_packages', path_info)
self.assertIn('sys_path', path_info)
def test_get_python_path_info_stdlib_is_path_or_none(self):
"""Test that stdlib is either a Path object or None."""
path_info = get_python_path_info()
if path_info['stdlib'] is not None:
self.assertIsInstance(path_info['stdlib'], Path)
def test_get_python_path_info_site_packages_is_list(self):
"""Test that site_packages is a list."""
path_info = get_python_path_info()
self.assertIsInstance(path_info['site_packages'], list)
for item in path_info['site_packages']:
self.assertIsInstance(item, Path)
def test_get_python_path_info_sys_path_is_list(self):
"""Test that sys_path is a list of Path objects."""
path_info = get_python_path_info()
self.assertIsInstance(path_info['sys_path'], list)
for item in path_info['sys_path']:
self.assertIsInstance(item, Path)
def test_extract_module_name_with_none(self):
"""Test extract_module_name with None filename."""
path_info = get_python_path_info()
module_name, module_type = extract_module_name(None, path_info)
self.assertEqual(module_name, 'unknown')
self.assertEqual(module_type, 'other')
def test_extract_module_name_with_empty_string(self):
"""Test extract_module_name with empty filename."""
path_info = get_python_path_info()
module_name, module_type = extract_module_name('', path_info)
self.assertEqual(module_name, 'unknown')
self.assertEqual(module_type, 'other')
def test_extract_module_name_with_stdlib_file(self):
"""Test extract_module_name with a standard library file."""
path_info = get_python_path_info()
# Use os module as a known stdlib file
if path_info['stdlib']:
stdlib_file = str(path_info['stdlib'] / 'os.py')
module_name, module_type = extract_module_name(stdlib_file, path_info)
self.assertEqual(module_type, 'stdlib')
self.assertIn('os', module_name)
def test_extract_module_name_with_project_file(self):
"""Test extract_module_name with a project file."""
path_info = get_python_path_info()
# Create a mock project file path
if path_info['sys_path']:
# Use current directory as project path
project_file = '/some/project/path/mymodule.py'
module_name, module_type = extract_module_name(project_file, path_info)
# Should classify as 'other' if not in sys.path
self.assertIn(module_type, ['project', 'other'])
def test_extract_module_name_removes_py_extension(self):
"""Test that .py extension is removed from module names."""
path_info = get_python_path_info()
# Test with a simple .py file
module_name, module_type = extract_module_name('/path/to/test.py', path_info)
# Module name should not contain .py
self.assertNotIn('.py', module_name)
def test_extract_module_name_with_special_files(self):
"""Test extract_module_name with special filenames like <string>."""
path_info = get_python_path_info()
special_files = ['<string>', '<stdin>', '[eval]']
for special_file in special_files:
module_name, module_type = extract_module_name(special_file, path_info)
self.assertEqual(module_type, 'other')
# =============================================================================
# Unit Tests for HeatmapCollector Public API
# =============================================================================
class TestHeatmapCollectorInit(unittest.TestCase):
"""Test HeatmapCollector initialization."""
def test_init_creates_empty_data_structures(self):
"""Test that __init__ creates empty data structures."""
collector = HeatmapCollector(sample_interval_usec=100)
# Check that data structures are initialized
self.assertIsInstance(collector.line_samples, dict)
self.assertIsInstance(collector.file_samples, dict)
self.assertIsInstance(collector.line_self_samples, dict)
self.assertIsInstance(collector.file_self_samples, dict)
self.assertIsInstance(collector.call_graph, dict)
self.assertIsInstance(collector.callers_graph, dict)
self.assertIsInstance(collector.function_definitions, dict)
self.assertIsInstance(collector.edge_samples, dict)
# Check that they're empty
self.assertEqual(len(collector.line_samples), 0)
self.assertEqual(len(collector.file_samples), 0)
self.assertEqual(len(collector.line_self_samples), 0)
self.assertEqual(len(collector.file_self_samples), 0)
def test_init_sets_total_samples_to_zero(self):
"""Test that total samples starts at zero."""
collector = HeatmapCollector(sample_interval_usec=100)
self.assertEqual(collector._total_samples, 0)
def test_init_gets_path_info(self):
"""Test that path info is retrieved during init."""
collector = HeatmapCollector(sample_interval_usec=100)
self.assertIsNotNone(collector._path_info)
self.assertIn('stdlib', collector._path_info)
class TestHeatmapCollectorSetStats(unittest.TestCase):
"""Test HeatmapCollector.set_stats() method."""
def test_set_stats_stores_all_parameters(self):
"""Test that set_stats stores all provided parameters."""
collector = HeatmapCollector(sample_interval_usec=100)
collector.set_stats(
sample_interval_usec=500,
duration_sec=10.5,
sample_rate=99.5,
error_rate=0.5
)
self.assertEqual(collector.stats['sample_interval_usec'], 500)
self.assertEqual(collector.stats['duration_sec'], 10.5)
self.assertEqual(collector.stats['sample_rate'], 99.5)
self.assertEqual(collector.stats['error_rate'], 0.5)
def test_set_stats_includes_system_info(self):
"""Test that set_stats includes Python and platform info."""
collector = HeatmapCollector(sample_interval_usec=100)
collector.set_stats(sample_interval_usec=100, duration_sec=1.0, sample_rate=100.0)
self.assertIn('python_version', collector.stats)
self.assertIn('python_implementation', collector.stats)
self.assertIn('platform', collector.stats)
def test_set_stats_accepts_kwargs(self):
"""Test that set_stats accepts additional kwargs."""
collector = HeatmapCollector(sample_interval_usec=100)
collector.set_stats(
sample_interval_usec=100,
duration_sec=1.0,
sample_rate=100.0,
custom_key='custom_value',
another_key=42
)
self.assertEqual(collector.stats['custom_key'], 'custom_value')
self.assertEqual(collector.stats['another_key'], 42)
def test_set_stats_with_none_error_rate(self):
"""Test set_stats with error_rate=None."""
collector = HeatmapCollector(sample_interval_usec=100)
collector.set_stats(sample_interval_usec=100, duration_sec=1.0, sample_rate=100.0)
self.assertIn('error_rate', collector.stats)
self.assertIsNone(collector.stats['error_rate'])
class TestHeatmapCollectorProcessFrames(unittest.TestCase):
"""Test HeatmapCollector.process_frames() method."""
def test_process_frames_increments_total_samples(self):
"""Test that process_frames increments total samples count."""
collector = HeatmapCollector(sample_interval_usec=100)
initial_count = collector._total_samples
frames = [('file.py', (10, 10, -1, -1), 'func', None)]
collector.process_frames(frames, thread_id=1)
self.assertEqual(collector._total_samples, initial_count + 1)
def test_process_frames_records_line_samples(self):
"""Test that process_frames records line samples."""
collector = HeatmapCollector(sample_interval_usec=100)
frames = [('test.py', (5, 5, -1, -1), 'test_func', None)]
collector.process_frames(frames, thread_id=1)
# Check that line was recorded
self.assertIn(('test.py', 5), collector.line_samples)
self.assertEqual(collector.line_samples[('test.py', 5)], 1)
def test_process_frames_records_multiple_lines_in_stack(self):
"""Test that process_frames records all lines in a stack."""
collector = HeatmapCollector(sample_interval_usec=100)
frames = [
('file1.py', (10, 10, -1, -1), 'func1', None),
('file2.py', (20, 20, -1, -1), 'func2', None),
('file3.py', (30, 30, -1, -1), 'func3', None)
]
collector.process_frames(frames, thread_id=1)
# All frames should be recorded
self.assertIn(('file1.py', 10), collector.line_samples)
self.assertIn(('file2.py', 20), collector.line_samples)
self.assertIn(('file3.py', 30), collector.line_samples)
def test_process_frames_distinguishes_self_samples(self):
"""Test that process_frames distinguishes self (leaf) samples."""
collector = HeatmapCollector(sample_interval_usec=100)
frames = [
('leaf.py', (5, 5, -1, -1), 'leaf_func', None), # This is the leaf (top of stack)
('caller.py', (10, 10, -1, -1), 'caller_func', None)
]
collector.process_frames(frames, thread_id=1)
# Leaf should have self sample
self.assertIn(('leaf.py', 5), collector.line_self_samples)
self.assertEqual(collector.line_self_samples[('leaf.py', 5)], 1)
# Caller should NOT have self sample
self.assertNotIn(('caller.py', 10), collector.line_self_samples)
def test_process_frames_accumulates_samples(self):
"""Test that multiple calls accumulate samples."""
collector = HeatmapCollector(sample_interval_usec=100)
frames = [('file.py', (10, 10, -1, -1), 'func', None)]
collector.process_frames(frames, thread_id=1)
collector.process_frames(frames, thread_id=1)
collector.process_frames(frames, thread_id=1)
self.assertEqual(collector.line_samples[('file.py', 10)], 3)
self.assertEqual(collector._total_samples, 3)
def test_process_frames_ignores_invalid_frames(self):
"""Test that process_frames ignores invalid frames."""
collector = HeatmapCollector(sample_interval_usec=100)
# These should be ignored
invalid_frames = [
('<string>', (1, 1, -1, -1), 'test', None),
('[eval]', (1, 1, -1, -1), 'test', None),
('', (1, 1, -1, -1), 'test', None),
(None, (1, 1, -1, -1), 'test', None),
('__init__', (0, 0, -1, -1), 'test', None), # Special invalid frame
]
for frame in invalid_frames:
collector.process_frames([frame], thread_id=1)
# Should not record these invalid frames
for frame in invalid_frames:
if frame[0]:
self.assertNotIn((frame[0], frame[1][0]), collector.line_samples)
def test_process_frames_builds_call_graph(self):
"""Test that process_frames builds call graph relationships."""
collector = HeatmapCollector(sample_interval_usec=100)
frames = [
('callee.py', (5, 5, -1, -1), 'callee_func', None),
('caller.py', (10, 10, -1, -1), 'caller_func', None)
]
collector.process_frames(frames, thread_id=1)
# Check that call relationship was recorded
caller_key = ('caller.py', 10)
self.assertIn(caller_key, collector.call_graph)
# Check callers graph
callee_key = ('callee.py', 5)
self.assertIn(callee_key, collector.callers_graph)
def test_process_frames_records_function_definitions(self):
"""Test that process_frames records function definition locations."""
collector = HeatmapCollector(sample_interval_usec=100)
frames = [('module.py', (42, 42, -1, -1), 'my_function', None)]
collector.process_frames(frames, thread_id=1)
self.assertIn(('module.py', 'my_function'), collector.function_definitions)
self.assertEqual(collector.function_definitions[('module.py', 'my_function')], 42)
def test_process_frames_tracks_edge_samples(self):
"""Test that process_frames tracks edge sample counts."""
collector = HeatmapCollector(sample_interval_usec=100)
frames = [
('callee.py', (5, 5, -1, -1), 'callee', None),
('caller.py', (10, 10, -1, -1), 'caller', None)
]
# Process same call stack multiple times
collector.process_frames(frames, thread_id=1)
collector.process_frames(frames, thread_id=1)
# Check that edge count is tracked
self.assertGreater(len(collector.edge_samples), 0)
def test_process_frames_handles_empty_frames(self):
"""Test that process_frames handles empty frame list."""
collector = HeatmapCollector(sample_interval_usec=100)
initial_count = collector._total_samples
collector.process_frames([], thread_id=1)
# Should still increment total samples
self.assertEqual(collector._total_samples, initial_count + 1)
def test_process_frames_with_file_samples_dict(self):
"""Test that file_samples dict is properly populated."""
collector = HeatmapCollector(sample_interval_usec=100)
frames = [('test.py', (10, 10, -1, -1), 'func', None)]
collector.process_frames(frames, thread_id=1)
self.assertIn('test.py', collector.file_samples)
self.assertIn(10, collector.file_samples['test.py'])
self.assertEqual(collector.file_samples['test.py'][10], 1)
class TestHeatmapCollectorExport(unittest.TestCase):
"""Test HeatmapCollector.export() method."""
def setUp(self):
"""Set up test directory."""
self.test_dir = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, self.test_dir)
def test_export_creates_output_directory(self):
"""Test that export creates the output directory."""
collector = HeatmapCollector(sample_interval_usec=100)
# Add some data
frames = [('test.py', (10, 10, -1, -1), 'func', None)]
collector.process_frames(frames, thread_id=1)
output_path = os.path.join(self.test_dir, 'heatmap_output')
with captured_stdout(), captured_stderr():
collector.export(output_path)
self.assertTrue(os.path.exists(output_path))
self.assertTrue(os.path.isdir(output_path))
def test_export_creates_index_html(self):
"""Test that export creates index.html."""
collector = HeatmapCollector(sample_interval_usec=100)
frames = [('test.py', (10, 10, -1, -1), 'func', None)]
collector.process_frames(frames, thread_id=1)
output_path = os.path.join(self.test_dir, 'heatmap_output')
with captured_stdout(), captured_stderr():
collector.export(output_path)
index_path = os.path.join(output_path, 'index.html')
self.assertTrue(os.path.exists(index_path))
def test_export_creates_file_htmls(self):
"""Test that export creates individual file HTMLs."""
collector = HeatmapCollector(sample_interval_usec=100)
frames = [('test.py', (10, 10, -1, -1), 'func', None)]
collector.process_frames(frames, thread_id=1)
output_path = os.path.join(self.test_dir, 'heatmap_output')
with captured_stdout(), captured_stderr():
collector.export(output_path)
# Check for file_XXXX.html files
html_files = [f for f in os.listdir(output_path)
if f.startswith('file_') and f.endswith('.html')]
self.assertGreater(len(html_files), 0)
def test_export_with_empty_data(self):
"""Test export with no data collected."""
collector = HeatmapCollector(sample_interval_usec=100)
output_path = os.path.join(self.test_dir, 'empty_output')
# Should handle empty data gracefully
with captured_stdout(), captured_stderr():
collector.export(output_path)
def test_export_handles_html_suffix(self):
"""Test that export handles .html suffix in output path."""
collector = HeatmapCollector(sample_interval_usec=100)
frames = [('test.py', (10, 10, -1, -1), 'func', None)]
collector.process_frames(frames, thread_id=1)
# Path with .html suffix should be stripped
output_path = os.path.join(self.test_dir, 'output.html')
with captured_stdout(), captured_stderr():
collector.export(output_path)
# Should create directory without .html
expected_dir = os.path.join(self.test_dir, 'output')
self.assertTrue(os.path.exists(expected_dir))
def test_export_with_multiple_files(self):
"""Test export with multiple files."""
collector = HeatmapCollector(sample_interval_usec=100)
# Add samples for multiple files
collector.process_frames([('file1.py', (10, 10, -1, -1), 'func1', None)], thread_id=1)
collector.process_frames([('file2.py', (20, 20, -1, -1), 'func2', None)], thread_id=1)
collector.process_frames([('file3.py', (30, 30, -1, -1), 'func3', None)], thread_id=1)
output_path = os.path.join(self.test_dir, 'multi_file')
with captured_stdout(), captured_stderr():
collector.export(output_path)
# Should create HTML for each file
html_files = [f for f in os.listdir(output_path)
if f.startswith('file_') and f.endswith('.html')]
self.assertGreaterEqual(len(html_files), 3)
def test_export_index_contains_file_references(self):
"""Test that index.html contains references to profiled files."""
collector = HeatmapCollector(sample_interval_usec=100)
collector.set_stats(sample_interval_usec=100, duration_sec=1.0, sample_rate=100.0)
frames = [('mytest.py', (10, 10, -1, -1), 'my_func', None)]
collector.process_frames(frames, thread_id=1)
output_path = os.path.join(self.test_dir, 'test_output')
with captured_stdout(), captured_stderr():
collector.export(output_path)
index_path = os.path.join(output_path, 'index.html')
with open(index_path, 'r', encoding='utf-8') as f:
content = f.read()
# Should contain reference to the file
self.assertIn('mytest', content)
def test_export_file_html_has_line_numbers(self):
"""Test that exported file HTML contains line numbers."""
collector = HeatmapCollector(sample_interval_usec=100)
# Create a temporary Python file
temp_file = os.path.join(self.test_dir, 'temp_source.py')
with open(temp_file, 'w') as f:
f.write('def test():\n pass\n')
frames = [(temp_file, (1, 1, -1, -1), 'test', None)]
collector.process_frames(frames, thread_id=1)
output_path = os.path.join(self.test_dir, 'line_test')
with captured_stdout(), captured_stderr():
collector.export(output_path)
# Find the generated file HTML
html_files = [f for f in os.listdir(output_path)
if f.startswith('file_') and f.endswith('.html')]
if html_files:
with open(os.path.join(output_path, html_files[0]), 'r', encoding='utf-8') as f:
content = f.read()
# Should have line-related content
self.assertIn('line-', content)
class MockFrameInfo:
"""Mock FrameInfo for testing.
Frame format: (filename, location, funcname, opcode) where:
- location is a tuple (lineno, end_lineno, col_offset, end_col_offset)
- opcode is an int or None
"""
def __init__(self, filename, lineno, funcname, opcode=None):
self.filename = filename
self.funcname = funcname
self.opcode = opcode
self.location = (lineno, lineno, -1, -1)
def __iter__(self):
return iter((self.filename, self.location, self.funcname, self.opcode))
def __getitem__(self, index):
return (self.filename, self.location, self.funcname, self.opcode)[index]
def __len__(self):
return 4
def __repr__(self):
return f"MockFrameInfo('{self.filename}', {self.location}, '{self.funcname}', {self.opcode})"
class MockThreadInfo:
"""Mock ThreadInfo for testing since the real one isn't accessible."""
def __init__(self, thread_id, frame_info, status=0):
self.thread_id = thread_id
self.frame_info = frame_info
self.status = status # Thread status flags
def __repr__(self):
return f"MockThreadInfo(thread_id={self.thread_id}, frame_info={self.frame_info})"
class MockInterpreterInfo:
"""Mock InterpreterInfo for testing since the real one isn't accessible."""
def __init__(self, interpreter_id, threads):
self.interpreter_id = interpreter_id
self.threads = threads
def __repr__(self):
return f"MockInterpreterInfo(interpreter_id={self.interpreter_id}, threads={self.threads})"
class TestHeatmapCollector(unittest.TestCase):
"""Tests for HeatmapCollector functionality."""
def test_heatmap_collector_basic(self):
"""Test basic HeatmapCollector functionality."""
collector = HeatmapCollector(sample_interval_usec=100)
# Test empty state
self.assertEqual(len(collector.file_samples), 0)
self.assertEqual(len(collector.line_samples), 0)
# Test collecting sample data - frames are 4-tuples: (filename, location, funcname, opcode)
test_frames = [
MockInterpreterInfo(
0,
[MockThreadInfo(
1,
[MockFrameInfo("file.py", 10, "func1"), MockFrameInfo("file.py", 20, "func2")],
)]
)
]
collector.collect(test_frames)
# Should have recorded samples for the file
self.assertGreater(len(collector.line_samples), 0)
self.assertIn("file.py", collector.file_samples)
# Check that line samples were recorded
file_data = collector.file_samples["file.py"]
self.assertGreater(len(file_data), 0)
def test_heatmap_collector_export(self):
"""Test heatmap HTML export functionality."""
heatmap_dir = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, heatmap_dir)
collector = HeatmapCollector(sample_interval_usec=100)
# Create test data with multiple files using MockFrameInfo
test_frames1 = [
MockInterpreterInfo(
0,
[MockThreadInfo(1, [MockFrameInfo("file.py", 10, "func1"), MockFrameInfo("file.py", 20, "func2")])],
)
]
test_frames2 = [
MockInterpreterInfo(
0,
[MockThreadInfo(1, [MockFrameInfo("file.py", 10, "func1"), MockFrameInfo("file.py", 20, "func2")])],
)
] # Same stack
test_frames3 = [
MockInterpreterInfo(0, [MockThreadInfo(1, [MockFrameInfo("other.py", 5, "other_func")])])
]
collector.collect(test_frames1)
collector.collect(test_frames2)
collector.collect(test_frames3)
# Export heatmap
with (captured_stdout(), captured_stderr()):
collector.export(heatmap_dir)
# Verify index.html was created
index_path = os.path.join(heatmap_dir, "index.html")
self.assertTrue(os.path.exists(index_path))
self.assertGreater(os.path.getsize(index_path), 0)
# Check index contains HTML content
with open(index_path, "r", encoding="utf-8") as f:
content = f.read()
# Should be valid HTML
self.assertIn("<!doctype html>", content.lower())
self.assertIn("<html", content)
self.assertIn("Tachyon Profiler", content)
# Should contain file references
self.assertIn("file.py", content)
self.assertIn("other.py", content)
# Verify individual file HTMLs were created
file_htmls = [f for f in os.listdir(heatmap_dir) if f.startswith("file_") and f.endswith(".html")]
self.assertGreater(len(file_htmls), 0)
# Check one of the file HTMLs
file_html_path = os.path.join(heatmap_dir, file_htmls[0])
with open(file_html_path, "r", encoding="utf-8") as f:
file_content = f.read()
# Should contain heatmap styling and JavaScript
self.assertIn("line-sample", file_content)
self.assertIn("nav-btn", file_content)
class TestHeatmapCollectorLocation(unittest.TestCase):
"""Tests for HeatmapCollector location handling."""
def test_heatmap_with_full_location_info(self):
"""Test HeatmapCollector uses full location tuple."""
collector = HeatmapCollector(sample_interval_usec=1000)
# Frame with full location: (lineno, end_lineno, col_offset, end_col_offset)
frame = MockFrameInfo("test.py", 10, "func")
# Override with full location info
frame.location = LocationInfo(10, 15, 4, 20)
frames = [
MockInterpreterInfo(
0,
[MockThreadInfo(1, [frame])]
)
]
collector.collect(frames)
# Verify data was collected with location info
# HeatmapCollector uses file_samples dict with filename -> Counter of linenos
self.assertIn("test.py", collector.file_samples)
# Line 10 should have samples
self.assertIn(10, collector.file_samples["test.py"])
def test_heatmap_with_none_location(self):
"""Test HeatmapCollector handles None location gracefully."""
collector = HeatmapCollector(sample_interval_usec=1000)
# Synthetic frame with None location
frame = MockFrameInfo("~", 0, "<native>")
frame.location = None
frames = [
MockInterpreterInfo(
0,
[MockThreadInfo(1, [frame])]
)
]
# Should not raise
collector.collect(frames)
def test_heatmap_export_with_location_data(self):
"""Test HeatmapCollector export includes location info."""
tmp_dir = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, tmp_dir)
collector = HeatmapCollector(sample_interval_usec=1000)
frame = MockFrameInfo("test.py", 10, "process")
frame.location = LocationInfo(10, 12, 0, 30)
frames = [
MockInterpreterInfo(
0,
[MockThreadInfo(1, [frame])]
)
]
collector.collect(frames)
# Export should work
with (captured_stdout(), captured_stderr()):
collector.export(tmp_dir)
self.assertTrue(os.path.exists(os.path.join(tmp_dir, "index.html")))
def test_heatmap_collector_frame_format(self):
"""Test HeatmapCollector with 4-element frame format."""
collector = HeatmapCollector(sample_interval_usec=1000)
frames = [
MockInterpreterInfo(
0,
[
MockThreadInfo(
1,
[
MockFrameInfo("app.py", 100, "main", opcode=90),
MockFrameInfo("utils.py", 50, "helper", opcode=100),
MockFrameInfo("lib.py", 25, "process", opcode=None),
],
)
],
)
]
collector.collect(frames)
# Should have recorded data for the files
self.assertIn("app.py", collector.file_samples)
self.assertIn("utils.py", collector.file_samples)
self.assertIn("lib.py", collector.file_samples)
if __name__ == "__main__":
unittest.main()
|