diff options
Diffstat (limited to 'Lib/test/test_tracemalloc.py')
| -rw-r--r-- | Lib/test/test_tracemalloc.py | 219 | 
1 files changed, 192 insertions, 27 deletions
| diff --git a/Lib/test/test_tracemalloc.py b/Lib/test/test_tracemalloc.py index da89a9a..790ab7e 100644 --- a/Lib/test/test_tracemalloc.py +++ b/Lib/test/test_tracemalloc.py @@ -11,9 +11,15 @@ try:      import threading  except ImportError:      threading = None +try: +    import _testcapi +except ImportError: +    _testcapi = None +  EMPTY_STRING_SIZE = sys.getsizeof(b'') +  def get_frames(nframe, lineno_delta):      frames = []      frame = sys._getframe(1) @@ -37,28 +43,31 @@ def allocate_bytes(size):  def create_snapshots():      traceback_limit = 2 +    # _tracemalloc._get_traces() returns a list of (domain, size, +    # traceback_frames) tuples. traceback_frames is a tuple of (filename, +    # line_number) tuples.      raw_traces = [ -        (10, (('a.py', 2), ('b.py', 4))), -        (10, (('a.py', 2), ('b.py', 4))), -        (10, (('a.py', 2), ('b.py', 4))), +        (0, 10, (('a.py', 2), ('b.py', 4))), +        (0, 10, (('a.py', 2), ('b.py', 4))), +        (0, 10, (('a.py', 2), ('b.py', 4))), -        (2, (('a.py', 5), ('b.py', 4))), +        (1, 2, (('a.py', 5), ('b.py', 4))), -        (66, (('b.py', 1),)), +        (2, 66, (('b.py', 1),)), -        (7, (('<unknown>', 0),)), +        (3, 7, (('<unknown>', 0),)),      ]      snapshot = tracemalloc.Snapshot(raw_traces, traceback_limit)      raw_traces2 = [ -        (10, (('a.py', 2), ('b.py', 4))), -        (10, (('a.py', 2), ('b.py', 4))), -        (10, (('a.py', 2), ('b.py', 4))), +        (0, 10, (('a.py', 2), ('b.py', 4))), +        (0, 10, (('a.py', 2), ('b.py', 4))), +        (0, 10, (('a.py', 2), ('b.py', 4))), -        (2, (('a.py', 5), ('b.py', 4))), -        (5000, (('a.py', 5), ('b.py', 4))), +        (2, 2, (('a.py', 5), ('b.py', 4))), +        (2, 5000, (('a.py', 5), ('b.py', 4))), -        (400, (('c.py', 578),)), +        (4, 400, (('c.py', 578),)),      ]      snapshot2 = tracemalloc.Snapshot(raw_traces2, traceback_limit) @@ -126,7 +135,7 @@ class TestTracemallocEnabled(unittest.TestCase):      def find_trace(self, traces, traceback):          for trace in traces: -            if trace[1] == traceback._frames: +            if trace[2] == traceback._frames:                  return trace          self.fail("trace not found") @@ -140,7 +149,7 @@ class TestTracemallocEnabled(unittest.TestCase):          trace = self.find_trace(traces, obj_traceback)          self.assertIsInstance(trace, tuple) -        size, traceback = trace +        domain, size, traceback = trace          self.assertEqual(size, obj_size)          self.assertEqual(traceback, obj_traceback._frames) @@ -167,9 +176,8 @@ class TestTracemallocEnabled(unittest.TestCase):          trace1 = self.find_trace(traces, obj1_traceback)          trace2 = self.find_trace(traces, obj2_traceback) -        size1, traceback1 = trace1 -        size2, traceback2 = trace2 -        self.assertEqual(traceback2, traceback1) +        domain1, size1, traceback1 = trace1 +        domain2, size2, traceback2 = trace2          self.assertIs(traceback2, traceback1)      def test_get_traced_memory(self): @@ -292,7 +300,7 @@ class TestSnapshot(unittest.TestCase):      maxDiff = 4000      def test_create_snapshot(self): -        raw_traces = [(5, (('a.py', 2),))] +        raw_traces = [(0, 5, (('a.py', 2),))]          with contextlib.ExitStack() as stack:              stack.enter_context(patch.object(tracemalloc, 'is_tracing', @@ -322,11 +330,11 @@ class TestSnapshot(unittest.TestCase):          # exclude b.py          snapshot3 = snapshot.filter_traces((filter1,))          self.assertEqual(snapshot3.traces._traces, [ -            (10, (('a.py', 2), ('b.py', 4))), -            (10, (('a.py', 2), ('b.py', 4))), -            (10, (('a.py', 2), ('b.py', 4))), -            (2, (('a.py', 5), ('b.py', 4))), -            (7, (('<unknown>', 0),)), +            (0, 10, (('a.py', 2), ('b.py', 4))), +            (0, 10, (('a.py', 2), ('b.py', 4))), +            (0, 10, (('a.py', 2), ('b.py', 4))), +            (1, 2, (('a.py', 5), ('b.py', 4))), +            (3, 7, (('<unknown>', 0),)),          ])          # filter_traces() must not touch the original snapshot @@ -335,10 +343,10 @@ class TestSnapshot(unittest.TestCase):          # only include two lines of a.py          snapshot4 = snapshot3.filter_traces((filter2, filter3))          self.assertEqual(snapshot4.traces._traces, [ -            (10, (('a.py', 2), ('b.py', 4))), -            (10, (('a.py', 2), ('b.py', 4))), -            (10, (('a.py', 2), ('b.py', 4))), -            (2, (('a.py', 5), ('b.py', 4))), +            (0, 10, (('a.py', 2), ('b.py', 4))), +            (0, 10, (('a.py', 2), ('b.py', 4))), +            (0, 10, (('a.py', 2), ('b.py', 4))), +            (1, 2, (('a.py', 5), ('b.py', 4))),          ])          # No filter: just duplicate the snapshot @@ -349,6 +357,54 @@ class TestSnapshot(unittest.TestCase):          self.assertRaises(TypeError, snapshot.filter_traces, filter1) +    def test_filter_traces_domain(self): +        snapshot, snapshot2 = create_snapshots() +        filter1 = tracemalloc.Filter(False, "a.py", domain=1) +        filter2 = tracemalloc.Filter(True, "a.py", domain=1) + +        original_traces = list(snapshot.traces._traces) + +        # exclude a.py of domain 1 +        snapshot3 = snapshot.filter_traces((filter1,)) +        self.assertEqual(snapshot3.traces._traces, [ +            (0, 10, (('a.py', 2), ('b.py', 4))), +            (0, 10, (('a.py', 2), ('b.py', 4))), +            (0, 10, (('a.py', 2), ('b.py', 4))), +            (2, 66, (('b.py', 1),)), +            (3, 7, (('<unknown>', 0),)), +        ]) + +        # include domain 1 +        snapshot3 = snapshot.filter_traces((filter1,)) +        self.assertEqual(snapshot3.traces._traces, [ +            (0, 10, (('a.py', 2), ('b.py', 4))), +            (0, 10, (('a.py', 2), ('b.py', 4))), +            (0, 10, (('a.py', 2), ('b.py', 4))), +            (2, 66, (('b.py', 1),)), +            (3, 7, (('<unknown>', 0),)), +        ]) + +    def test_filter_traces_domain_filter(self): +        snapshot, snapshot2 = create_snapshots() +        filter1 = tracemalloc.DomainFilter(False, domain=3) +        filter2 = tracemalloc.DomainFilter(True, domain=3) + +        # exclude domain 2 +        snapshot3 = snapshot.filter_traces((filter1,)) +        self.assertEqual(snapshot3.traces._traces, [ +            (0, 10, (('a.py', 2), ('b.py', 4))), +            (0, 10, (('a.py', 2), ('b.py', 4))), +            (0, 10, (('a.py', 2), ('b.py', 4))), +            (1, 2, (('a.py', 5), ('b.py', 4))), +            (2, 66, (('b.py', 1),)), +        ]) + +        # include domain 2 +        snapshot3 = snapshot.filter_traces((filter2,)) +        self.assertEqual(snapshot3.traces._traces, [ +            (3, 7, (('<unknown>', 0),)), +        ]) +      def test_snapshot_group_by_line(self):          snapshot, snapshot2 = create_snapshots()          tb_0 = traceback_lineno('<unknown>', 0) @@ -816,12 +872,121 @@ class TestCommandLine(unittest.TestCase):          assert_python_ok('-X', 'tracemalloc', '-c', code) +@unittest.skipIf(_testcapi is None, 'need _testcapi') +class TestCAPI(unittest.TestCase): +    maxDiff = 80 * 20 + +    def setUp(self): +        if tracemalloc.is_tracing(): +            self.skipTest("tracemalloc must be stopped before the test") + +        self.domain = 5 +        self.size = 123 +        self.obj = allocate_bytes(self.size)[0] + +        # for the type "object", id(obj) is the address of its memory block. +        # This type is not tracked by the garbage collector +        self.ptr = id(self.obj) + +    def tearDown(self): +        tracemalloc.stop() + +    def get_traceback(self): +        frames = _testcapi.tracemalloc_get_traceback(self.domain, self.ptr) +        if frames is not None: +            return tracemalloc.Traceback(frames) +        else: +            return None + +    def track(self, release_gil=False, nframe=1): +        frames = get_frames(nframe, 2) +        _testcapi.tracemalloc_track(self.domain, self.ptr, self.size, +                                    release_gil) +        return frames + +    def untrack(self): +        _testcapi.tracemalloc_untrack(self.domain, self.ptr) + +    def get_traced_memory(self): +        # Get the traced size in the domain +        snapshot = tracemalloc.take_snapshot() +        domain_filter = tracemalloc.DomainFilter(True, self.domain) +        snapshot = snapshot.filter_traces([domain_filter]) +        return sum(trace.size for trace in snapshot.traces) + +    def check_track(self, release_gil): +        nframe = 5 +        tracemalloc.start(nframe) + +        size = tracemalloc.get_traced_memory()[0] + +        frames = self.track(release_gil, nframe) +        self.assertEqual(self.get_traceback(), +                         tracemalloc.Traceback(frames)) + +        self.assertEqual(self.get_traced_memory(), self.size) + +    def test_track(self): +        self.check_track(False) + +    def test_track_without_gil(self): +        # check that calling _PyTraceMalloc_Track() without holding the GIL +        # works too +        self.check_track(True) + +    def test_track_already_tracked(self): +        nframe = 5 +        tracemalloc.start(nframe) + +        # track a first time +        self.track() + +        # calling _PyTraceMalloc_Track() must remove the old trace and add +        # a new trace with the new traceback +        frames = self.track(nframe=nframe) +        self.assertEqual(self.get_traceback(), +                         tracemalloc.Traceback(frames)) + +    def test_untrack(self): +        tracemalloc.start() + +        self.track() +        self.assertIsNotNone(self.get_traceback()) +        self.assertEqual(self.get_traced_memory(), self.size) + +        # untrack must remove the trace +        self.untrack() +        self.assertIsNone(self.get_traceback()) +        self.assertEqual(self.get_traced_memory(), 0) + +        # calling _PyTraceMalloc_Untrack() multiple times must not crash +        self.untrack() +        self.untrack() + +    def test_stop_track(self): +        tracemalloc.start() +        tracemalloc.stop() + +        with self.assertRaises(RuntimeError): +            self.track() +        self.assertIsNone(self.get_traceback()) + +    def test_stop_untrack(self): +        tracemalloc.start() +        self.track() + +        tracemalloc.stop() +        with self.assertRaises(RuntimeError): +            self.untrack() + +  def test_main():      support.run_unittest(          TestTracemallocEnabled,          TestSnapshot,          TestFilters,          TestCommandLine, +        TestCAPI,      )  if __name__ == "__main__": | 
