diff options
Diffstat (limited to 'Lib/test/test_tracemalloc.py')
-rw-r--r-- | Lib/test/test_tracemalloc.py | 219 |
1 files changed, 192 insertions, 27 deletions
diff --git a/Lib/test/test_tracemalloc.py b/Lib/test/test_tracemalloc.py index da89a9a..790ab7e 100644 --- a/Lib/test/test_tracemalloc.py +++ b/Lib/test/test_tracemalloc.py @@ -11,9 +11,15 @@ try: import threading except ImportError: threading = None +try: + import _testcapi +except ImportError: + _testcapi = None + EMPTY_STRING_SIZE = sys.getsizeof(b'') + def get_frames(nframe, lineno_delta): frames = [] frame = sys._getframe(1) @@ -37,28 +43,31 @@ def allocate_bytes(size): def create_snapshots(): traceback_limit = 2 + # _tracemalloc._get_traces() returns a list of (domain, size, + # traceback_frames) tuples. traceback_frames is a tuple of (filename, + # line_number) tuples. raw_traces = [ - (10, (('a.py', 2), ('b.py', 4))), - (10, (('a.py', 2), ('b.py', 4))), - (10, (('a.py', 2), ('b.py', 4))), + (0, 10, (('a.py', 2), ('b.py', 4))), + (0, 10, (('a.py', 2), ('b.py', 4))), + (0, 10, (('a.py', 2), ('b.py', 4))), - (2, (('a.py', 5), ('b.py', 4))), + (1, 2, (('a.py', 5), ('b.py', 4))), - (66, (('b.py', 1),)), + (2, 66, (('b.py', 1),)), - (7, (('<unknown>', 0),)), + (3, 7, (('<unknown>', 0),)), ] snapshot = tracemalloc.Snapshot(raw_traces, traceback_limit) raw_traces2 = [ - (10, (('a.py', 2), ('b.py', 4))), - (10, (('a.py', 2), ('b.py', 4))), - (10, (('a.py', 2), ('b.py', 4))), + (0, 10, (('a.py', 2), ('b.py', 4))), + (0, 10, (('a.py', 2), ('b.py', 4))), + (0, 10, (('a.py', 2), ('b.py', 4))), - (2, (('a.py', 5), ('b.py', 4))), - (5000, (('a.py', 5), ('b.py', 4))), + (2, 2, (('a.py', 5), ('b.py', 4))), + (2, 5000, (('a.py', 5), ('b.py', 4))), - (400, (('c.py', 578),)), + (4, 400, (('c.py', 578),)), ] snapshot2 = tracemalloc.Snapshot(raw_traces2, traceback_limit) @@ -126,7 +135,7 @@ class TestTracemallocEnabled(unittest.TestCase): def find_trace(self, traces, traceback): for trace in traces: - if trace[1] == traceback._frames: + if trace[2] == traceback._frames: return trace self.fail("trace not found") @@ -140,7 +149,7 @@ class TestTracemallocEnabled(unittest.TestCase): trace = self.find_trace(traces, obj_traceback) self.assertIsInstance(trace, tuple) - size, traceback = trace + domain, size, traceback = trace self.assertEqual(size, obj_size) self.assertEqual(traceback, obj_traceback._frames) @@ -167,9 +176,8 @@ class TestTracemallocEnabled(unittest.TestCase): trace1 = self.find_trace(traces, obj1_traceback) trace2 = self.find_trace(traces, obj2_traceback) - size1, traceback1 = trace1 - size2, traceback2 = trace2 - self.assertEqual(traceback2, traceback1) + domain1, size1, traceback1 = trace1 + domain2, size2, traceback2 = trace2 self.assertIs(traceback2, traceback1) def test_get_traced_memory(self): @@ -292,7 +300,7 @@ class TestSnapshot(unittest.TestCase): maxDiff = 4000 def test_create_snapshot(self): - raw_traces = [(5, (('a.py', 2),))] + raw_traces = [(0, 5, (('a.py', 2),))] with contextlib.ExitStack() as stack: stack.enter_context(patch.object(tracemalloc, 'is_tracing', @@ -322,11 +330,11 @@ class TestSnapshot(unittest.TestCase): # exclude b.py snapshot3 = snapshot.filter_traces((filter1,)) self.assertEqual(snapshot3.traces._traces, [ - (10, (('a.py', 2), ('b.py', 4))), - (10, (('a.py', 2), ('b.py', 4))), - (10, (('a.py', 2), ('b.py', 4))), - (2, (('a.py', 5), ('b.py', 4))), - (7, (('<unknown>', 0),)), + (0, 10, (('a.py', 2), ('b.py', 4))), + (0, 10, (('a.py', 2), ('b.py', 4))), + (0, 10, (('a.py', 2), ('b.py', 4))), + (1, 2, (('a.py', 5), ('b.py', 4))), + (3, 7, (('<unknown>', 0),)), ]) # filter_traces() must not touch the original snapshot @@ -335,10 +343,10 @@ class TestSnapshot(unittest.TestCase): # only include two lines of a.py snapshot4 = snapshot3.filter_traces((filter2, filter3)) self.assertEqual(snapshot4.traces._traces, [ - (10, (('a.py', 2), ('b.py', 4))), - (10, (('a.py', 2), ('b.py', 4))), - (10, (('a.py', 2), ('b.py', 4))), - (2, (('a.py', 5), ('b.py', 4))), + (0, 10, (('a.py', 2), ('b.py', 4))), + (0, 10, (('a.py', 2), ('b.py', 4))), + (0, 10, (('a.py', 2), ('b.py', 4))), + (1, 2, (('a.py', 5), ('b.py', 4))), ]) # No filter: just duplicate the snapshot @@ -349,6 +357,54 @@ class TestSnapshot(unittest.TestCase): self.assertRaises(TypeError, snapshot.filter_traces, filter1) + def test_filter_traces_domain(self): + snapshot, snapshot2 = create_snapshots() + filter1 = tracemalloc.Filter(False, "a.py", domain=1) + filter2 = tracemalloc.Filter(True, "a.py", domain=1) + + original_traces = list(snapshot.traces._traces) + + # exclude a.py of domain 1 + snapshot3 = snapshot.filter_traces((filter1,)) + self.assertEqual(snapshot3.traces._traces, [ + (0, 10, (('a.py', 2), ('b.py', 4))), + (0, 10, (('a.py', 2), ('b.py', 4))), + (0, 10, (('a.py', 2), ('b.py', 4))), + (2, 66, (('b.py', 1),)), + (3, 7, (('<unknown>', 0),)), + ]) + + # include domain 1 + snapshot3 = snapshot.filter_traces((filter1,)) + self.assertEqual(snapshot3.traces._traces, [ + (0, 10, (('a.py', 2), ('b.py', 4))), + (0, 10, (('a.py', 2), ('b.py', 4))), + (0, 10, (('a.py', 2), ('b.py', 4))), + (2, 66, (('b.py', 1),)), + (3, 7, (('<unknown>', 0),)), + ]) + + def test_filter_traces_domain_filter(self): + snapshot, snapshot2 = create_snapshots() + filter1 = tracemalloc.DomainFilter(False, domain=3) + filter2 = tracemalloc.DomainFilter(True, domain=3) + + # exclude domain 2 + snapshot3 = snapshot.filter_traces((filter1,)) + self.assertEqual(snapshot3.traces._traces, [ + (0, 10, (('a.py', 2), ('b.py', 4))), + (0, 10, (('a.py', 2), ('b.py', 4))), + (0, 10, (('a.py', 2), ('b.py', 4))), + (1, 2, (('a.py', 5), ('b.py', 4))), + (2, 66, (('b.py', 1),)), + ]) + + # include domain 2 + snapshot3 = snapshot.filter_traces((filter2,)) + self.assertEqual(snapshot3.traces._traces, [ + (3, 7, (('<unknown>', 0),)), + ]) + def test_snapshot_group_by_line(self): snapshot, snapshot2 = create_snapshots() tb_0 = traceback_lineno('<unknown>', 0) @@ -816,12 +872,121 @@ class TestCommandLine(unittest.TestCase): assert_python_ok('-X', 'tracemalloc', '-c', code) +@unittest.skipIf(_testcapi is None, 'need _testcapi') +class TestCAPI(unittest.TestCase): + maxDiff = 80 * 20 + + def setUp(self): + if tracemalloc.is_tracing(): + self.skipTest("tracemalloc must be stopped before the test") + + self.domain = 5 + self.size = 123 + self.obj = allocate_bytes(self.size)[0] + + # for the type "object", id(obj) is the address of its memory block. + # This type is not tracked by the garbage collector + self.ptr = id(self.obj) + + def tearDown(self): + tracemalloc.stop() + + def get_traceback(self): + frames = _testcapi.tracemalloc_get_traceback(self.domain, self.ptr) + if frames is not None: + return tracemalloc.Traceback(frames) + else: + return None + + def track(self, release_gil=False, nframe=1): + frames = get_frames(nframe, 2) + _testcapi.tracemalloc_track(self.domain, self.ptr, self.size, + release_gil) + return frames + + def untrack(self): + _testcapi.tracemalloc_untrack(self.domain, self.ptr) + + def get_traced_memory(self): + # Get the traced size in the domain + snapshot = tracemalloc.take_snapshot() + domain_filter = tracemalloc.DomainFilter(True, self.domain) + snapshot = snapshot.filter_traces([domain_filter]) + return sum(trace.size for trace in snapshot.traces) + + def check_track(self, release_gil): + nframe = 5 + tracemalloc.start(nframe) + + size = tracemalloc.get_traced_memory()[0] + + frames = self.track(release_gil, nframe) + self.assertEqual(self.get_traceback(), + tracemalloc.Traceback(frames)) + + self.assertEqual(self.get_traced_memory(), self.size) + + def test_track(self): + self.check_track(False) + + def test_track_without_gil(self): + # check that calling _PyTraceMalloc_Track() without holding the GIL + # works too + self.check_track(True) + + def test_track_already_tracked(self): + nframe = 5 + tracemalloc.start(nframe) + + # track a first time + self.track() + + # calling _PyTraceMalloc_Track() must remove the old trace and add + # a new trace with the new traceback + frames = self.track(nframe=nframe) + self.assertEqual(self.get_traceback(), + tracemalloc.Traceback(frames)) + + def test_untrack(self): + tracemalloc.start() + + self.track() + self.assertIsNotNone(self.get_traceback()) + self.assertEqual(self.get_traced_memory(), self.size) + + # untrack must remove the trace + self.untrack() + self.assertIsNone(self.get_traceback()) + self.assertEqual(self.get_traced_memory(), 0) + + # calling _PyTraceMalloc_Untrack() multiple times must not crash + self.untrack() + self.untrack() + + def test_stop_track(self): + tracemalloc.start() + tracemalloc.stop() + + with self.assertRaises(RuntimeError): + self.track() + self.assertIsNone(self.get_traceback()) + + def test_stop_untrack(self): + tracemalloc.start() + self.track() + + tracemalloc.stop() + with self.assertRaises(RuntimeError): + self.untrack() + + def test_main(): support.run_unittest( TestTracemallocEnabled, TestSnapshot, TestFilters, TestCommandLine, + TestCAPI, ) if __name__ == "__main__": |