summaryrefslogtreecommitdiffstats
path: root/Lib
diff options
context:
space:
mode:
authorVictor Stinner <victor.stinner@gmail.com>2013-11-23 11:27:24 (GMT)
committerVictor Stinner <victor.stinner@gmail.com>2013-11-23 11:27:24 (GMT)
commited3b0bca3ef9d7bdbb8bd8e67e60e85f5a336da0 (patch)
treefd4390855d293372f73048fdf4b3e6b4a7cdf440 /Lib
parent0fb6072fad411eba171b53037bcc04d07c7b0770 (diff)
downloadcpython-ed3b0bca3ef9d7bdbb8bd8e67e60e85f5a336da0.zip
cpython-ed3b0bca3ef9d7bdbb8bd8e67e60e85f5a336da0.tar.gz
cpython-ed3b0bca3ef9d7bdbb8bd8e67e60e85f5a336da0.tar.bz2
Issue #18874: Implement the PEP 454 (tracemalloc)
Diffstat (limited to 'Lib')
-rw-r--r--Lib/test/support/__init__.py19
-rw-r--r--Lib/test/test_atexit.py4
-rw-r--r--Lib/test/test_capi.py2
-rw-r--r--Lib/test/test_threading.py4
-rw-r--r--Lib/test/test_tracemalloc.py797
-rw-r--r--Lib/tracemalloc.py464
6 files changed, 1285 insertions, 5 deletions
diff --git a/Lib/test/support/__init__.py b/Lib/test/support/__init__.py
index 87c039a..a242a47 100644
--- a/Lib/test/support/__init__.py
+++ b/Lib/test/support/__init__.py
@@ -2154,3 +2154,22 @@ def patch(test_instance, object_to_patch, attr_name, new_value):
# actually override the attribute
setattr(object_to_patch, attr_name, new_value)
+
+
+def run_in_subinterp(code):
+ """
+ Run code in a subinterpreter. Raise unittest.SkipTest if the tracemalloc
+ module is enabled.
+ """
+ # Issue #10915, #15751: PyGILState_*() functions don't work with
+ # sub-interpreters, the tracemalloc module uses these functions internally
+ try:
+ import tracemalloc
+ except ImportError:
+ pass
+ else:
+ if tracemalloc.is_tracing():
+ raise unittest.SkipTest("run_in_subinterp() cannot be used "
+ "if tracemalloc module is tracing "
+ "memory allocations")
+ return _testcapi.run_in_subinterp(code)
diff --git a/Lib/test/test_atexit.py b/Lib/test/test_atexit.py
index b641015..84644f1 100644
--- a/Lib/test/test_atexit.py
+++ b/Lib/test/test_atexit.py
@@ -158,7 +158,7 @@ class SubinterpreterTest(unittest.TestCase):
atexit.register(f)
del atexit
"""
- ret = _testcapi.run_in_subinterp(code)
+ ret = support.run_in_subinterp(code)
self.assertEqual(ret, 0)
self.assertEqual(atexit._ncallbacks(), n)
@@ -173,7 +173,7 @@ class SubinterpreterTest(unittest.TestCase):
atexit.register(f)
atexit.__atexit = atexit
"""
- ret = _testcapi.run_in_subinterp(code)
+ ret = support.run_in_subinterp(code)
self.assertEqual(ret, 0)
self.assertEqual(atexit._ncallbacks(), n)
diff --git a/Lib/test/test_capi.py b/Lib/test/test_capi.py
index d37c057..000079e 100644
--- a/Lib/test/test_capi.py
+++ b/Lib/test/test_capi.py
@@ -205,7 +205,7 @@ class SubinterpreterTest(unittest.TestCase):
pickle.dump(id(builtins), f)
""".format(w)
with open(r, "rb") as f:
- ret = _testcapi.run_in_subinterp(code)
+ ret = support.run_in_subinterp(code)
self.assertEqual(ret, 0)
self.assertNotEqual(pickle.load(f), id(sys.modules))
self.assertNotEqual(pickle.load(f), id(builtins))
diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py
index 66eace0..a84577c 100644
--- a/Lib/test/test_threading.py
+++ b/Lib/test/test_threading.py
@@ -853,7 +853,7 @@ class SubinterpThreadingTests(BaseTestCase):
os.write(%d, b"x")
threading.Thread(target=f).start()
""" % (w,)
- ret = _testcapi.run_in_subinterp(code)
+ ret = test.support.run_in_subinterp(code)
self.assertEqual(ret, 0)
# The thread was joined properly.
self.assertEqual(os.read(r, 1), b"x")
@@ -885,7 +885,7 @@ class SubinterpThreadingTests(BaseTestCase):
os.write(%d, b"x")
threading.Thread(target=f).start()
""" % (w,)
- ret = _testcapi.run_in_subinterp(code)
+ ret = test.support.run_in_subinterp(code)
self.assertEqual(ret, 0)
# The thread was joined properly.
self.assertEqual(os.read(r, 1), b"x")
diff --git a/Lib/test/test_tracemalloc.py b/Lib/test/test_tracemalloc.py
new file mode 100644
index 0000000..ecb5aee
--- /dev/null
+++ b/Lib/test/test_tracemalloc.py
@@ -0,0 +1,797 @@
+import _tracemalloc
+import contextlib
+import datetime
+import os
+import sys
+import tracemalloc
+import unittest
+from unittest.mock import patch
+from test.script_helper import assert_python_ok, assert_python_failure
+from test import support
+try:
+ import threading
+except ImportError:
+ threading = None
+
+EMPTY_STRING_SIZE = sys.getsizeof(b'')
+
+def get_frames(nframe, lineno_delta):
+ frames = []
+ frame = sys._getframe(1)
+ for index in range(nframe):
+ code = frame.f_code
+ lineno = frame.f_lineno + lineno_delta
+ frames.append((code.co_filename, lineno))
+ lineno_delta = 0
+ frame = frame.f_back
+ if frame is None:
+ break
+ return tuple(frames)
+
+def allocate_bytes(size):
+ nframe = tracemalloc.get_traceback_limit()
+ bytes_len = (size - EMPTY_STRING_SIZE)
+ frames = get_frames(nframe, 1)
+ data = b'x' * bytes_len
+ return data, tracemalloc.Traceback(frames)
+
+def create_snapshots():
+ traceback_limit = 2
+
+ raw_traces = [
+ (10, (('a.py', 2), ('b.py', 4))),
+ (10, (('a.py', 2), ('b.py', 4))),
+ (10, (('a.py', 2), ('b.py', 4))),
+
+ (2, (('a.py', 5), ('b.py', 4))),
+
+ (66, (('b.py', 1),)),
+
+ (7, (('<unknown>', 0),)),
+ ]
+ snapshot = tracemalloc.Snapshot(raw_traces, traceback_limit)
+
+ raw_traces2 = [
+ (10, (('a.py', 2), ('b.py', 4))),
+ (10, (('a.py', 2), ('b.py', 4))),
+ (10, (('a.py', 2), ('b.py', 4))),
+
+ (2, (('a.py', 5), ('b.py', 4))),
+ (5000, (('a.py', 5), ('b.py', 4))),
+
+ (400, (('c.py', 578),)),
+ ]
+ snapshot2 = tracemalloc.Snapshot(raw_traces2, traceback_limit)
+
+ return (snapshot, snapshot2)
+
+def frame(filename, lineno):
+ return tracemalloc._Frame((filename, lineno))
+
+def traceback(*frames):
+ return tracemalloc.Traceback(frames)
+
+def traceback_lineno(filename, lineno):
+ return traceback((filename, lineno))
+
+def traceback_filename(filename):
+ return traceback_lineno(filename, 0)
+
+
+class TestTracemallocEnabled(unittest.TestCase):
+ def setUp(self):
+ if tracemalloc.is_tracing():
+ self.skipTest("tracemalloc must be stopped before the test")
+
+ tracemalloc.set_traceback_limit(1)
+ tracemalloc.start()
+
+ def tearDown(self):
+ tracemalloc.stop()
+
+ def test_get_tracemalloc_memory(self):
+ data = [allocate_bytes(123) for count in range(1000)]
+ size = tracemalloc.get_tracemalloc_memory()
+ self.assertGreaterEqual(size, 0)
+
+ tracemalloc.clear_traces()
+ size2 = tracemalloc.get_tracemalloc_memory()
+ self.assertGreaterEqual(size2, 0)
+ self.assertLessEqual(size2, size)
+
+ def test_get_object_traceback(self):
+ tracemalloc.clear_traces()
+ obj_size = 12345
+ obj, obj_traceback = allocate_bytes(obj_size)
+ traceback = tracemalloc.get_object_traceback(obj)
+ self.assertEqual(traceback, obj_traceback)
+
+ def test_set_traceback_limit(self):
+ obj_size = 10
+
+ nframe = tracemalloc.get_traceback_limit()
+ self.addCleanup(tracemalloc.set_traceback_limit, nframe)
+
+ self.assertRaises(ValueError, tracemalloc.set_traceback_limit, -1)
+
+ tracemalloc.clear_traces()
+ tracemalloc.set_traceback_limit(10)
+ obj2, obj2_traceback = allocate_bytes(obj_size)
+ traceback = tracemalloc.get_object_traceback(obj2)
+ self.assertEqual(len(traceback), 10)
+ self.assertEqual(traceback, obj2_traceback)
+
+ tracemalloc.clear_traces()
+ tracemalloc.set_traceback_limit(1)
+ obj, obj_traceback = allocate_bytes(obj_size)
+ traceback = tracemalloc.get_object_traceback(obj)
+ self.assertEqual(len(traceback), 1)
+ self.assertEqual(traceback, obj_traceback)
+
+
+ def find_trace(self, traces, traceback):
+ for trace in traces:
+ if trace[1] == traceback._frames:
+ return trace
+
+ self.fail("trace not found")
+
+ def test_get_traces(self):
+ tracemalloc.clear_traces()
+ obj_size = 12345
+ obj, obj_traceback = allocate_bytes(obj_size)
+
+ traces = tracemalloc._get_traces()
+ trace = self.find_trace(traces, obj_traceback)
+
+ self.assertIsInstance(trace, tuple)
+ size, traceback = trace
+ self.assertEqual(size, obj_size)
+ self.assertEqual(traceback, obj_traceback._frames)
+
+ tracemalloc.stop()
+ self.assertEqual(tracemalloc._get_traces(), [])
+
+
+ def test_get_traces_intern_traceback(self):
+ # dummy wrappers to get more useful and identical frames in the traceback
+ def allocate_bytes2(size):
+ return allocate_bytes(size)
+ def allocate_bytes3(size):
+ return allocate_bytes2(size)
+ def allocate_bytes4(size):
+ return allocate_bytes3(size)
+
+ # Ensure that two identical tracebacks are not duplicated
+ tracemalloc.clear_traces()
+ tracemalloc.set_traceback_limit(4)
+ obj_size = 123
+ obj1, obj1_traceback = allocate_bytes4(obj_size)
+ obj2, obj2_traceback = allocate_bytes4(obj_size)
+
+ traces = tracemalloc._get_traces()
+
+ trace1 = self.find_trace(traces, obj1_traceback)
+ trace2 = self.find_trace(traces, obj2_traceback)
+ size1, traceback1 = trace1
+ size2, traceback2 = trace2
+ self.assertEqual(traceback2, traceback1)
+ self.assertIs(traceback2, traceback1)
+
+ def test_get_traced_memory(self):
+ # Python allocates some internals objects, so the test must tolerate
+ # a small difference between the expected size and the real usage
+ max_error = 2048
+
+ # allocate one object
+ obj_size = 1024 * 1024
+ tracemalloc.clear_traces()
+ obj, obj_traceback = allocate_bytes(obj_size)
+ size, max_size = tracemalloc.get_traced_memory()
+ self.assertGreaterEqual(size, obj_size)
+ self.assertGreaterEqual(max_size, size)
+
+ self.assertLessEqual(size - obj_size, max_error)
+ self.assertLessEqual(max_size - size, max_error)
+
+ # destroy the object
+ obj = None
+ size2, max_size2 = tracemalloc.get_traced_memory()
+ self.assertLess(size2, size)
+ self.assertGreaterEqual(size - size2, obj_size - max_error)
+ self.assertGreaterEqual(max_size2, max_size)
+
+ # clear_traces() must reset traced memory counters
+ tracemalloc.clear_traces()
+ self.assertEqual(tracemalloc.get_traced_memory(), (0, 0))
+
+ # allocate another object
+ obj, obj_traceback = allocate_bytes(obj_size)
+ size, max_size = tracemalloc.get_traced_memory()
+ self.assertGreater(size, 0)
+
+ # stop() rests also traced memory counters
+ tracemalloc.stop()
+ self.assertEqual(tracemalloc.get_traced_memory(), (0, 0))
+
+ def test_clear_traces(self):
+ obj, obj_traceback = allocate_bytes(123)
+ traceback = tracemalloc.get_object_traceback(obj)
+ self.assertIsNotNone(traceback)
+
+ tracemalloc.clear_traces()
+ traceback2 = tracemalloc.get_object_traceback(obj)
+ self.assertIsNone(traceback2)
+
+ def test_is_tracing(self):
+ tracemalloc.stop()
+ self.assertFalse(tracemalloc.is_tracing())
+
+ tracemalloc.start()
+ self.assertTrue(tracemalloc.is_tracing())
+
+ def test_snapshot(self):
+ obj, source = allocate_bytes(123)
+
+ # take a snapshot
+ snapshot = tracemalloc.take_snapshot()
+
+ # write on disk
+ snapshot.dump(support.TESTFN)
+ self.addCleanup(support.unlink, support.TESTFN)
+
+ # load from disk
+ snapshot2 = tracemalloc.Snapshot.load(support.TESTFN)
+ self.assertEqual(snapshot2.traces, snapshot.traces)
+
+ # tracemalloc must be tracing memory allocations to take a snapshot
+ tracemalloc.stop()
+ with self.assertRaises(RuntimeError) as cm:
+ tracemalloc.take_snapshot()
+ self.assertEqual(str(cm.exception),
+ "the tracemalloc module must be tracing memory "
+ "allocations to take a snapshot")
+
+ def test_snapshot_save_attr(self):
+ # take a snapshot with a new attribute
+ snapshot = tracemalloc.take_snapshot()
+ snapshot.test_attr = "new"
+ snapshot.dump(support.TESTFN)
+ self.addCleanup(support.unlink, support.TESTFN)
+
+ # load() should recreates the attribute
+ snapshot2 = tracemalloc.Snapshot.load(support.TESTFN)
+ self.assertEqual(snapshot2.test_attr, "new")
+
+ def fork_child(self):
+ if not tracemalloc.is_tracing():
+ return 2
+
+ obj_size = 12345
+ obj, obj_traceback = allocate_bytes(obj_size)
+ traceback = tracemalloc.get_object_traceback(obj)
+ if traceback is None:
+ return 3
+
+ # everything is fine
+ return 0
+
+ @unittest.skipUnless(hasattr(os, 'fork'), 'need os.fork()')
+ def test_fork(self):
+ # check that tracemalloc is still working after fork
+ pid = os.fork()
+ if not pid:
+ # child
+ exitcode = 1
+ try:
+ exitcode = self.fork_child()
+ finally:
+ os._exit(exitcode)
+ else:
+ pid2, status = os.waitpid(pid, 0)
+ self.assertTrue(os.WIFEXITED(status))
+ exitcode = os.WEXITSTATUS(status)
+ self.assertEqual(exitcode, 0)
+
+
+class TestSnapshot(unittest.TestCase):
+ maxDiff = 4000
+
+ def test_create_snapshot(self):
+ raw_traces = [(5, (('a.py', 2),))]
+
+ with contextlib.ExitStack() as stack:
+ stack.enter_context(patch.object(tracemalloc, 'is_tracing',
+ return_value=True))
+ stack.enter_context(patch.object(tracemalloc, 'get_traceback_limit',
+ return_value=5))
+ stack.enter_context(patch.object(tracemalloc, '_get_traces',
+ return_value=raw_traces))
+
+ snapshot = tracemalloc.take_snapshot()
+ self.assertEqual(snapshot.traceback_limit, 5)
+ self.assertEqual(len(snapshot.traces), 1)
+ trace = snapshot.traces[0]
+ self.assertEqual(trace.size, 5)
+ self.assertEqual(len(trace.traceback), 1)
+ self.assertEqual(trace.traceback[0].filename, 'a.py')
+ self.assertEqual(trace.traceback[0].lineno, 2)
+
+ def test_filter_traces(self):
+ snapshot, snapshot2 = create_snapshots()
+ filter1 = tracemalloc.Filter(False, "b.py")
+ filter2 = tracemalloc.Filter(True, "a.py", 2)
+ filter3 = tracemalloc.Filter(True, "a.py", 5)
+
+ original_traces = list(snapshot.traces._traces)
+
+ # exclude b.py
+ snapshot3 = snapshot.filter_traces((filter1,))
+ self.assertEqual(snapshot3.traces._traces, [
+ (10, (('a.py', 2), ('b.py', 4))),
+ (10, (('a.py', 2), ('b.py', 4))),
+ (10, (('a.py', 2), ('b.py', 4))),
+ (2, (('a.py', 5), ('b.py', 4))),
+ (7, (('<unknown>', 0),)),
+ ])
+
+ # filter_traces() must not touch the original snapshot
+ self.assertEqual(snapshot.traces._traces, original_traces)
+
+ # only include two lines of a.py
+ snapshot4 = snapshot3.filter_traces((filter2, filter3))
+ self.assertEqual(snapshot4.traces._traces, [
+ (10, (('a.py', 2), ('b.py', 4))),
+ (10, (('a.py', 2), ('b.py', 4))),
+ (10, (('a.py', 2), ('b.py', 4))),
+ (2, (('a.py', 5), ('b.py', 4))),
+ ])
+
+ # No filter: just duplicate the snapshot
+ snapshot5 = snapshot.filter_traces(())
+ self.assertIsNot(snapshot5, snapshot)
+ self.assertIsNot(snapshot5.traces, snapshot.traces)
+ self.assertEqual(snapshot5.traces, snapshot.traces)
+
+ def test_snapshot_group_by_line(self):
+ snapshot, snapshot2 = create_snapshots()
+ tb_0 = traceback_lineno('<unknown>', 0)
+ tb_a_2 = traceback_lineno('a.py', 2)
+ tb_a_5 = traceback_lineno('a.py', 5)
+ tb_b_1 = traceback_lineno('b.py', 1)
+ tb_c_578 = traceback_lineno('c.py', 578)
+
+ # stats per file and line
+ stats1 = snapshot.statistics('lineno')
+ self.assertEqual(stats1, [
+ tracemalloc.Statistic(tb_b_1, 66, 1),
+ tracemalloc.Statistic(tb_a_2, 30, 3),
+ tracemalloc.Statistic(tb_0, 7, 1),
+ tracemalloc.Statistic(tb_a_5, 2, 1),
+ ])
+
+ # stats per file and line (2)
+ stats2 = snapshot2.statistics('lineno')
+ self.assertEqual(stats2, [
+ tracemalloc.Statistic(tb_a_5, 5002, 2),
+ tracemalloc.Statistic(tb_c_578, 400, 1),
+ tracemalloc.Statistic(tb_a_2, 30, 3),
+ ])
+
+ # stats diff per file and line
+ statistics = snapshot2.compare_to(snapshot, 'lineno')
+ self.assertEqual(statistics, [
+ tracemalloc.StatisticDiff(tb_a_5, 5002, 5000, 2, 1),
+ tracemalloc.StatisticDiff(tb_c_578, 400, 400, 1, 1),
+ tracemalloc.StatisticDiff(tb_b_1, 0, -66, 0, -1),
+ tracemalloc.StatisticDiff(tb_0, 0, -7, 0, -1),
+ tracemalloc.StatisticDiff(tb_a_2, 30, 0, 3, 0),
+ ])
+
+ def test_snapshot_group_by_file(self):
+ snapshot, snapshot2 = create_snapshots()
+ tb_0 = traceback_filename('<unknown>')
+ tb_a = traceback_filename('a.py')
+ tb_b = traceback_filename('b.py')
+ tb_c = traceback_filename('c.py')
+
+ # stats per file
+ stats1 = snapshot.statistics('filename')
+ self.assertEqual(stats1, [
+ tracemalloc.Statistic(tb_b, 66, 1),
+ tracemalloc.Statistic(tb_a, 32, 4),
+ tracemalloc.Statistic(tb_0, 7, 1),
+ ])
+
+ # stats per file (2)
+ stats2 = snapshot2.statistics('filename')
+ self.assertEqual(stats2, [
+ tracemalloc.Statistic(tb_a, 5032, 5),
+ tracemalloc.Statistic(tb_c, 400, 1),
+ ])
+
+ # stats diff per file
+ diff = snapshot2.compare_to(snapshot, 'filename')
+ self.assertEqual(diff, [
+ tracemalloc.StatisticDiff(tb_a, 5032, 5000, 5, 1),
+ tracemalloc.StatisticDiff(tb_c, 400, 400, 1, 1),
+ tracemalloc.StatisticDiff(tb_b, 0, -66, 0, -1),
+ tracemalloc.StatisticDiff(tb_0, 0, -7, 0, -1),
+ ])
+
+ def test_snapshot_group_by_traceback(self):
+ snapshot, snapshot2 = create_snapshots()
+
+ # stats per file
+ tb1 = traceback(('a.py', 2), ('b.py', 4))
+ tb2 = traceback(('a.py', 5), ('b.py', 4))
+ tb3 = traceback(('b.py', 1))
+ tb4 = traceback(('<unknown>', 0))
+ stats1 = snapshot.statistics('traceback')
+ self.assertEqual(stats1, [
+ tracemalloc.Statistic(tb3, 66, 1),
+ tracemalloc.Statistic(tb1, 30, 3),
+ tracemalloc.Statistic(tb4, 7, 1),
+ tracemalloc.Statistic(tb2, 2, 1),
+ ])
+
+ # stats per file (2)
+ tb5 = traceback(('c.py', 578))
+ stats2 = snapshot2.statistics('traceback')
+ self.assertEqual(stats2, [
+ tracemalloc.Statistic(tb2, 5002, 2),
+ tracemalloc.Statistic(tb5, 400, 1),
+ tracemalloc.Statistic(tb1, 30, 3),
+ ])
+
+ # stats diff per file
+ diff = snapshot2.compare_to(snapshot, 'traceback')
+ self.assertEqual(diff, [
+ tracemalloc.StatisticDiff(tb2, 5002, 5000, 2, 1),
+ tracemalloc.StatisticDiff(tb5, 400, 400, 1, 1),
+ tracemalloc.StatisticDiff(tb3, 0, -66, 0, -1),
+ tracemalloc.StatisticDiff(tb4, 0, -7, 0, -1),
+ tracemalloc.StatisticDiff(tb1, 30, 0, 3, 0),
+ ])
+
+ self.assertRaises(ValueError,
+ snapshot.statistics, 'traceback', cumulative=True)
+
+ def test_snapshot_group_by_cumulative(self):
+ snapshot, snapshot2 = create_snapshots()
+ tb_0 = traceback_filename('<unknown>')
+ tb_a = traceback_filename('a.py')
+ tb_b = traceback_filename('b.py')
+ tb_a_2 = traceback_lineno('a.py', 2)
+ tb_a_5 = traceback_lineno('a.py', 5)
+ tb_b_1 = traceback_lineno('b.py', 1)
+ tb_b_4 = traceback_lineno('b.py', 4)
+
+ # per file
+ stats = snapshot.statistics('filename', True)
+ self.assertEqual(stats, [
+ tracemalloc.Statistic(tb_b, 98, 5),
+ tracemalloc.Statistic(tb_a, 32, 4),
+ tracemalloc.Statistic(tb_0, 7, 1),
+ ])
+
+ # per line
+ stats = snapshot.statistics('lineno', True)
+ self.assertEqual(stats, [
+ tracemalloc.Statistic(tb_b_1, 66, 1),
+ tracemalloc.Statistic(tb_b_4, 32, 4),
+ tracemalloc.Statistic(tb_a_2, 30, 3),
+ tracemalloc.Statistic(tb_0, 7, 1),
+ tracemalloc.Statistic(tb_a_5, 2, 1),
+ ])
+
+ def test_trace_format(self):
+ snapshot, snapshot2 = create_snapshots()
+ trace = snapshot.traces[0]
+ self.assertEqual(str(trace), 'a.py:2: 10 B')
+ traceback = trace.traceback
+ self.assertEqual(str(traceback), 'a.py:2')
+ frame = traceback[0]
+ self.assertEqual(str(frame), 'a.py:2')
+
+ def test_statistic_format(self):
+ snapshot, snapshot2 = create_snapshots()
+ stats = snapshot.statistics('lineno')
+ stat = stats[0]
+ self.assertEqual(str(stat),
+ 'b.py:1: size=66 B, count=1, average=66 B')
+
+ def test_statistic_diff_format(self):
+ snapshot, snapshot2 = create_snapshots()
+ stats = snapshot2.compare_to(snapshot, 'lineno')
+ stat = stats[0]
+ self.assertEqual(str(stat),
+ 'a.py:5: size=5002 B (+5000 B), count=2 (+1), average=2501 B')
+
+
+
+class TestFilters(unittest.TestCase):
+ maxDiff = 2048
+
+ def test_filter_attributes(self):
+ # test default values
+ f = tracemalloc.Filter(True, "abc")
+ self.assertEqual(f.inclusive, True)
+ self.assertEqual(f.filename_pattern, "abc")
+ self.assertIsNone(f.lineno)
+ self.assertEqual(f.all_frames, False)
+
+ # test custom values
+ f = tracemalloc.Filter(False, "test.py", 123, True)
+ self.assertEqual(f.inclusive, False)
+ self.assertEqual(f.filename_pattern, "test.py")
+ self.assertEqual(f.lineno, 123)
+ self.assertEqual(f.all_frames, True)
+
+ # parameters passed by keyword
+ f = tracemalloc.Filter(inclusive=False, filename_pattern="test.py", lineno=123, all_frames=True)
+ self.assertEqual(f.inclusive, False)
+ self.assertEqual(f.filename_pattern, "test.py")
+ self.assertEqual(f.lineno, 123)
+ self.assertEqual(f.all_frames, True)
+
+ # read-only attribute
+ self.assertRaises(AttributeError, setattr, f, "filename_pattern", "abc")
+
+ def test_filter_match(self):
+ # filter without line number
+ f = tracemalloc.Filter(True, "abc")
+ self.assertTrue(f._match_frame("abc", 0))
+ self.assertTrue(f._match_frame("abc", 5))
+ self.assertTrue(f._match_frame("abc", 10))
+ self.assertFalse(f._match_frame("12356", 0))
+ self.assertFalse(f._match_frame("12356", 5))
+ self.assertFalse(f._match_frame("12356", 10))
+
+ f = tracemalloc.Filter(False, "abc")
+ self.assertFalse(f._match_frame("abc", 0))
+ self.assertFalse(f._match_frame("abc", 5))
+ self.assertFalse(f._match_frame("abc", 10))
+ self.assertTrue(f._match_frame("12356", 0))
+ self.assertTrue(f._match_frame("12356", 5))
+ self.assertTrue(f._match_frame("12356", 10))
+
+ # filter with line number > 0
+ f = tracemalloc.Filter(True, "abc", 5)
+ self.assertFalse(f._match_frame("abc", 0))
+ self.assertTrue(f._match_frame("abc", 5))
+ self.assertFalse(f._match_frame("abc", 10))
+ self.assertFalse(f._match_frame("12356", 0))
+ self.assertFalse(f._match_frame("12356", 5))
+ self.assertFalse(f._match_frame("12356", 10))
+
+ f = tracemalloc.Filter(False, "abc", 5)
+ self.assertTrue(f._match_frame("abc", 0))
+ self.assertFalse(f._match_frame("abc", 5))
+ self.assertTrue(f._match_frame("abc", 10))
+ self.assertTrue(f._match_frame("12356", 0))
+ self.assertTrue(f._match_frame("12356", 5))
+ self.assertTrue(f._match_frame("12356", 10))
+
+ # filter with line number 0
+ f = tracemalloc.Filter(True, "abc", 0)
+ self.assertTrue(f._match_frame("abc", 0))
+ self.assertFalse(f._match_frame("abc", 5))
+ self.assertFalse(f._match_frame("abc", 10))
+ self.assertFalse(f._match_frame("12356", 0))
+ self.assertFalse(f._match_frame("12356", 5))
+ self.assertFalse(f._match_frame("12356", 10))
+
+ f = tracemalloc.Filter(False, "abc", 0)
+ self.assertFalse(f._match_frame("abc", 0))
+ self.assertTrue(f._match_frame("abc", 5))
+ self.assertTrue(f._match_frame("abc", 10))
+ self.assertTrue(f._match_frame("12356", 0))
+ self.assertTrue(f._match_frame("12356", 5))
+ self.assertTrue(f._match_frame("12356", 10))
+
+ def test_filter_match_filename(self):
+ def fnmatch(inclusive, filename, pattern):
+ f = tracemalloc.Filter(inclusive, pattern)
+ return f._match_frame(filename, 0)
+
+ self.assertTrue(fnmatch(True, "abc", "abc"))
+ self.assertFalse(fnmatch(True, "12356", "abc"))
+ self.assertFalse(fnmatch(True, "<unknown>", "abc"))
+
+ self.assertFalse(fnmatch(False, "abc", "abc"))
+ self.assertTrue(fnmatch(False, "12356", "abc"))
+ self.assertTrue(fnmatch(False, "<unknown>", "abc"))
+
+ def test_filter_match_filename_joker(self):
+ def fnmatch(filename, pattern):
+ filter = tracemalloc.Filter(True, pattern)
+ return filter._match_frame(filename, 0)
+
+ # empty string
+ self.assertFalse(fnmatch('abc', ''))
+ self.assertFalse(fnmatch('', 'abc'))
+ self.assertTrue(fnmatch('', ''))
+ self.assertTrue(fnmatch('', '*'))
+
+ # no *
+ self.assertTrue(fnmatch('abc', 'abc'))
+ self.assertFalse(fnmatch('abc', 'abcd'))
+ self.assertFalse(fnmatch('abc', 'def'))
+
+ # a*
+ self.assertTrue(fnmatch('abc', 'a*'))
+ self.assertTrue(fnmatch('abc', 'abc*'))
+ self.assertFalse(fnmatch('abc', 'b*'))
+ self.assertFalse(fnmatch('abc', 'abcd*'))
+
+ # a*b
+ self.assertTrue(fnmatch('abc', 'a*c'))
+ self.assertTrue(fnmatch('abcdcx', 'a*cx'))
+ self.assertFalse(fnmatch('abb', 'a*c'))
+ self.assertFalse(fnmatch('abcdce', 'a*cx'))
+
+ # a*b*c
+ self.assertTrue(fnmatch('abcde', 'a*c*e'))
+ self.assertTrue(fnmatch('abcbdefeg', 'a*bd*eg'))
+ self.assertFalse(fnmatch('abcdd', 'a*c*e'))
+ self.assertFalse(fnmatch('abcbdefef', 'a*bd*eg'))
+
+ # replace .pyc and .pyo suffix with .py
+ self.assertTrue(fnmatch('a.pyc', 'a.py'))
+ self.assertTrue(fnmatch('a.pyo', 'a.py'))
+ self.assertTrue(fnmatch('a.py', 'a.pyc'))
+ self.assertTrue(fnmatch('a.py', 'a.pyo'))
+
+ if os.name == 'nt':
+ # case insensitive
+ self.assertTrue(fnmatch('aBC', 'ABc'))
+ self.assertTrue(fnmatch('aBcDe', 'Ab*dE'))
+
+ self.assertTrue(fnmatch('a.pyc', 'a.PY'))
+ self.assertTrue(fnmatch('a.PYO', 'a.py'))
+ self.assertTrue(fnmatch('a.py', 'a.PYC'))
+ self.assertTrue(fnmatch('a.PY', 'a.pyo'))
+ else:
+ # case sensitive
+ self.assertFalse(fnmatch('aBC', 'ABc'))
+ self.assertFalse(fnmatch('aBcDe', 'Ab*dE'))
+
+ self.assertFalse(fnmatch('a.pyc', 'a.PY'))
+ self.assertFalse(fnmatch('a.PYO', 'a.py'))
+ self.assertFalse(fnmatch('a.py', 'a.PYC'))
+ self.assertFalse(fnmatch('a.PY', 'a.pyo'))
+
+ if os.name == 'nt':
+ # normalize alternate separator "/" to the standard separator "\"
+ self.assertTrue(fnmatch(r'a/b', r'a\b'))
+ self.assertTrue(fnmatch(r'a\b', r'a/b'))
+ self.assertTrue(fnmatch(r'a/b\c', r'a\b/c'))
+ self.assertTrue(fnmatch(r'a/b/c', r'a\b\c'))
+ else:
+ # there is no alternate separator
+ self.assertFalse(fnmatch(r'a/b', r'a\b'))
+ self.assertFalse(fnmatch(r'a\b', r'a/b'))
+ self.assertFalse(fnmatch(r'a/b\c', r'a\b/c'))
+ self.assertFalse(fnmatch(r'a/b/c', r'a\b\c'))
+
+ def test_filter_match_trace(self):
+ t1 = (("a.py", 2), ("b.py", 3))
+ t2 = (("b.py", 4), ("b.py", 5))
+ t3 = (("c.py", 5), ('<unknown>', 0))
+ unknown = (('<unknown>', 0),)
+
+ f = tracemalloc.Filter(True, "b.py", all_frames=True)
+ self.assertTrue(f._match_traceback(t1))
+ self.assertTrue(f._match_traceback(t2))
+ self.assertFalse(f._match_traceback(t3))
+ self.assertFalse(f._match_traceback(unknown))
+
+ f = tracemalloc.Filter(True, "b.py", all_frames=False)
+ self.assertFalse(f._match_traceback(t1))
+ self.assertTrue(f._match_traceback(t2))
+ self.assertFalse(f._match_traceback(t3))
+ self.assertFalse(f._match_traceback(unknown))
+
+ f = tracemalloc.Filter(False, "b.py", all_frames=True)
+ self.assertFalse(f._match_traceback(t1))
+ self.assertFalse(f._match_traceback(t2))
+ self.assertTrue(f._match_traceback(t3))
+ self.assertTrue(f._match_traceback(unknown))
+
+ f = tracemalloc.Filter(False, "b.py", all_frames=False)
+ self.assertTrue(f._match_traceback(t1))
+ self.assertFalse(f._match_traceback(t2))
+ self.assertTrue(f._match_traceback(t3))
+ self.assertTrue(f._match_traceback(unknown))
+
+ f = tracemalloc.Filter(False, "<unknown>", all_frames=False)
+ self.assertTrue(f._match_traceback(t1))
+ self.assertTrue(f._match_traceback(t2))
+ self.assertTrue(f._match_traceback(t3))
+ self.assertFalse(f._match_traceback(unknown))
+
+ f = tracemalloc.Filter(True, "<unknown>", all_frames=True)
+ self.assertFalse(f._match_traceback(t1))
+ self.assertFalse(f._match_traceback(t2))
+ self.assertTrue(f._match_traceback(t3))
+ self.assertTrue(f._match_traceback(unknown))
+
+ f = tracemalloc.Filter(False, "<unknown>", all_frames=True)
+ self.assertTrue(f._match_traceback(t1))
+ self.assertTrue(f._match_traceback(t2))
+ self.assertFalse(f._match_traceback(t3))
+ self.assertFalse(f._match_traceback(unknown))
+
+
+class TestCommandLine(unittest.TestCase):
+ def test_env_var(self):
+ # not tracing by default
+ code = 'import tracemalloc; print(tracemalloc.is_tracing())'
+ ok, stdout, stderr = assert_python_ok('-c', code)
+ stdout = stdout.rstrip()
+ self.assertEqual(stdout, b'False')
+
+ # PYTHON* environment varibles must be ignored when -E option is
+ # present
+ code = 'import tracemalloc; print(tracemalloc.is_tracing())'
+ ok, stdout, stderr = assert_python_ok('-E', '-c', code, PYTHONTRACEMALLOC='1')
+ stdout = stdout.rstrip()
+ self.assertEqual(stdout, b'False')
+
+ # tracing at startup
+ code = 'import tracemalloc; print(tracemalloc.is_tracing())'
+ ok, stdout, stderr = assert_python_ok('-c', code, PYTHONTRACEMALLOC='1')
+ stdout = stdout.rstrip()
+ self.assertEqual(stdout, b'True')
+
+ # start and set the number of frames
+ code = 'import tracemalloc; print(tracemalloc.get_traceback_limit())'
+ ok, stdout, stderr = assert_python_ok('-c', code, PYTHONTRACEMALLOC='10')
+ stdout = stdout.rstrip()
+ self.assertEqual(stdout, b'10')
+
+ def test_env_var_invalid(self):
+ for nframe in (-1, 0, 5000):
+ with self.subTest(nframe=nframe):
+ with support.SuppressCrashReport():
+ ok, stdout, stderr = assert_python_failure(
+ '-c', 'pass',
+ PYTHONTRACEMALLOC=str(nframe))
+ self.assertIn(b'PYTHONTRACEMALLOC must be an integer '
+ b'in range [1; 100]',
+ stderr)
+
+ def test_sys_xoptions(self):
+ for xoptions, nframe in (
+ ('tracemalloc', 1),
+ ('tracemalloc=1', 1),
+ ('tracemalloc=15', 15),
+ ):
+ with self.subTest(xoptions=xoptions, nframe=nframe):
+ code = 'import tracemalloc; print(tracemalloc.get_traceback_limit())'
+ ok, stdout, stderr = assert_python_ok('-X', xoptions, '-c', code)
+ stdout = stdout.rstrip()
+ self.assertEqual(stdout, str(nframe).encode('ascii'))
+
+ def test_sys_xoptions_invalid(self):
+ for nframe in (-1, 0, 5000):
+ with self.subTest(nframe=nframe):
+ with support.SuppressCrashReport():
+ args = ('-X', 'tracemalloc=%s' % nframe, '-c', 'pass')
+ ok, stdout, stderr = assert_python_failure(*args)
+ self.assertIn(b'-X tracemalloc=NFRAME: number of frame must '
+ b'be an integer in range [1; 100]',
+ stderr)
+
+
+def test_main():
+ support.run_unittest(
+ TestTracemallocEnabled,
+ TestSnapshot,
+ TestFilters,
+ TestCommandLine,
+ )
+
+if __name__ == "__main__":
+ test_main()
diff --git a/Lib/tracemalloc.py b/Lib/tracemalloc.py
new file mode 100644
index 0000000..7780eca
--- /dev/null
+++ b/Lib/tracemalloc.py
@@ -0,0 +1,464 @@
+from collections import Sequence
+from functools import total_ordering
+import fnmatch
+import os.path
+import pickle
+
+# Import types and functions implemented in C
+from _tracemalloc import *
+from _tracemalloc import _get_object_traceback, _get_traces
+
+
+def _format_size(size, sign):
+ for unit in ('B', 'KiB', 'MiB', 'GiB', 'TiB'):
+ if abs(size) < 100 and unit != 'B':
+ # 3 digits (xx.x UNIT)
+ if sign:
+ return "%+.1f %s" % (size, unit)
+ else:
+ return "%.1f %s" % (size, unit)
+ if abs(size) < 10 * 1024 or unit == 'TiB':
+ # 4 or 5 digits (xxxx UNIT)
+ if sign:
+ return "%+.0f %s" % (size, unit)
+ else:
+ return "%.0f %s" % (size, unit)
+ size /= 1024
+
+
+class Statistic:
+ """
+ Statistic difference on memory allocations between two Snapshot instance.
+ """
+
+ __slots__ = ('traceback', 'size', 'count')
+
+ def __init__(self, traceback, size, count):
+ self.traceback = traceback
+ self.size = size
+ self.count = count
+
+ def __hash__(self):
+ return (self.traceback, self.size, self.count)
+
+ def __eq__(self, other):
+ return (self.traceback == other.traceback
+ and self.size == other.size
+ and self.count == other.count)
+
+ def __str__(self):
+ text = ("%s: size=%s, count=%i"
+ % (self.traceback,
+ _format_size(self.size, False),
+ self.count))
+ if self.count:
+ average = self.size / self.count
+ text += ", average=%s" % _format_size(average, False)
+ return text
+
+ def __repr__(self):
+ return ('<Statistic traceback=%r size=%i count=%i>'
+ % (self.traceback, self.size, self.count))
+
+ def _sort_key(self):
+ return (self.size, self.count, self.traceback)
+
+
+class StatisticDiff:
+ """
+ Statistic difference on memory allocations between an old and a new
+ Snapshot instance.
+ """
+ __slots__ = ('traceback', 'size', 'size_diff', 'count', 'count_diff')
+
+ def __init__(self, traceback, size, size_diff, count, count_diff):
+ self.traceback = traceback
+ self.size = size
+ self.size_diff = size_diff
+ self.count = count
+ self.count_diff = count_diff
+
+ def __hash__(self):
+ return (self.traceback, self.size, self.size_diff,
+ self.count, self.count_diff)
+
+ def __eq__(self, other):
+ return (self.traceback == other.traceback
+ and self.size == other.size
+ and self.size_diff == other.size_diff
+ and self.count == other.count
+ and self.count_diff == other.count_diff)
+
+ def __str__(self):
+ text = ("%s: size=%s (%s), count=%i (%+i)"
+ % (self.traceback,
+ _format_size(self.size, False),
+ _format_size(self.size_diff, True),
+ self.count,
+ self.count_diff))
+ if self.count:
+ average = self.size / self.count
+ text += ", average=%s" % _format_size(average, False)
+ return text
+
+ def __repr__(self):
+ return ('<StatisticDiff traceback=%r size=%i (%+i) count=%i (%+i)>'
+ % (self.traceback, self.size, self.size_diff,
+
+ self.count, self.count_diff))
+
+ def _sort_key(self):
+ return (abs(self.size_diff), self.size,
+ abs(self.count_diff), self.count,
+ self.traceback)
+
+
+def _compare_grouped_stats(old_group, new_group):
+ statistics = []
+ for traceback, stat in new_group.items():
+ previous = old_group.pop(traceback, None)
+ if previous is not None:
+ stat = StatisticDiff(traceback,
+ stat.size, stat.size - previous.size,
+ stat.count, stat.count - previous.count)
+ else:
+ stat = StatisticDiff(traceback,
+ stat.size, stat.size,
+ stat.count, stat.count)
+ statistics.append(stat)
+
+ for traceback, stat in old_group.items():
+ stat = StatisticDiff(traceback, 0, -stat.size, 0, -stat.count)
+ statistics.append(stat)
+ return statistics
+
+
+@total_ordering
+class Frame:
+ """
+ Frame of a traceback.
+ """
+ __slots__ = ("_frame",)
+
+ def __init__(self, frame):
+ self._frame = frame
+
+ @property
+ def filename(self):
+ return self._frame[0]
+
+ @property
+ def lineno(self):
+ return self._frame[1]
+
+ def __eq__(self, other):
+ return (self._frame == other._frame)
+
+ def __lt__(self, other):
+ return (self._frame < other._frame)
+
+ def __hash__(self):
+ return hash(self._frame)
+
+ def __str__(self):
+ return "%s:%s" % (self.filename, self.lineno)
+
+ def __repr__(self):
+ return "<Frame filename=%r lineno=%r>" % (self.filename, self.lineno)
+
+
+@total_ordering
+class Traceback(Sequence):
+ """
+ Sequence of Frame instances sorted from the most recent frame
+ to the oldest frame.
+ """
+ __slots__ = ("_frames",)
+
+ def __init__(self, frames):
+ Sequence.__init__(self)
+ self._frames = frames
+
+ def __len__(self):
+ return len(self._frames)
+
+ def __getitem__(self, index):
+ trace = self._frames[index]
+ return Frame(trace)
+
+ def __contains__(self, frame):
+ return frame._frame in self._frames
+
+ def __hash__(self):
+ return hash(self._frames)
+
+ def __eq__(self, other):
+ return (self._frames == other._frames)
+
+ def __lt__(self, other):
+ return (self._frames < other._frames)
+
+ def __str__(self):
+ return str(self[0])
+
+ def __repr__(self):
+ return "<Traceback %r>" % (tuple(self),)
+
+
+def get_object_traceback(obj):
+ """
+ Get the traceback where the Python object *obj* was allocated.
+ Return a Traceback instance.
+
+ Return None if the tracemalloc module is not tracing memory allocations or
+ did not trace the allocation of the object.
+ """
+ frames = _get_object_traceback(obj)
+ if frames is not None:
+ return Traceback(frames)
+ else:
+ return None
+
+
+class Trace:
+ """
+ Trace of a memory block.
+ """
+ __slots__ = ("_trace",)
+
+ def __init__(self, trace):
+ self._trace = trace
+
+ @property
+ def size(self):
+ return self._trace[0]
+
+ @property
+ def traceback(self):
+ return Traceback(self._trace[1])
+
+ def __eq__(self, other):
+ return (self._trace == other._trace)
+
+ def __hash__(self):
+ return hash(self._trace)
+
+ def __str__(self):
+ return "%s: %s" % (self.traceback, _format_size(self.size, False))
+
+ def __repr__(self):
+ return ("<Trace size=%s, traceback=%r>"
+ % (_format_size(self.size, False), self.traceback))
+
+
+class _Traces(Sequence):
+ def __init__(self, traces):
+ Sequence.__init__(self)
+ self._traces = traces
+
+ def __len__(self):
+ return len(self._traces)
+
+ def __getitem__(self, index):
+ trace = self._traces[index]
+ return Trace(trace)
+
+ def __contains__(self, trace):
+ return trace._trace in self._traces
+
+ def __eq__(self, other):
+ return (self._traces == other._traces)
+
+ def __repr__(self):
+ return "<Traces len=%s>" % len(self)
+
+
+def _normalize_filename(filename):
+ filename = os.path.normcase(filename)
+ if filename.endswith(('.pyc', '.pyo')):
+ filename = filename[:-1]
+ return filename
+
+
+class Filter:
+ def __init__(self, inclusive, filename_pattern,
+ lineno=None, all_frames=False):
+ self.inclusive = inclusive
+ self._filename_pattern = _normalize_filename(filename_pattern)
+ self.lineno = lineno
+ self.all_frames = all_frames
+
+ @property
+ def filename_pattern(self):
+ return self._filename_pattern
+
+ def __match_frame(self, filename, lineno):
+ filename = _normalize_filename(filename)
+ if not fnmatch.fnmatch(filename, self._filename_pattern):
+ return False
+ if self.lineno is None:
+ return True
+ else:
+ return (lineno == self.lineno)
+
+ def _match_frame(self, filename, lineno):
+ return self.__match_frame(filename, lineno) ^ (not self.inclusive)
+
+ def _match_traceback(self, traceback):
+ if self.all_frames:
+ if any(self.__match_frame(filename, lineno)
+ for filename, lineno in traceback):
+ return self.inclusive
+ else:
+ return (not self.inclusive)
+ else:
+ filename, lineno = traceback[0]
+ return self._match_frame(filename, lineno)
+
+
+class Snapshot:
+ """
+ Snapshot of traces of memory blocks allocated by Python.
+ """
+
+ def __init__(self, traces, traceback_limit):
+ self.traces = _Traces(traces)
+ self.traceback_limit = traceback_limit
+
+ def dump(self, filename):
+ """
+ Write the snapshot into a file.
+ """
+ with open(filename, "wb") as fp:
+ pickle.dump(self, fp, pickle.HIGHEST_PROTOCOL)
+
+ @staticmethod
+ def load(filename):
+ """
+ Load a snapshot from a file.
+ """
+ with open(filename, "rb") as fp:
+ return pickle.load(fp)
+
+ def _filter_trace(self, include_filters, exclude_filters, trace):
+ traceback = trace[1]
+ if include_filters:
+ if not any(trace_filter._match_traceback(traceback)
+ for trace_filter in include_filters):
+ return False
+ if exclude_filters:
+ if any(not trace_filter._match_traceback(traceback)
+ for trace_filter in exclude_filters):
+ return False
+ return True
+
+ def filter_traces(self, filters):
+ """
+ Create a new Snapshot instance with a filtered traces sequence, filters
+ is a list of Filter instances. If filters is an empty list, return a
+ new Snapshot instance with a copy of the traces.
+ """
+ if filters:
+ include_filters = []
+ exclude_filters = []
+ for trace_filter in filters:
+ if trace_filter.inclusive:
+ include_filters.append(trace_filter)
+ else:
+ exclude_filters.append(trace_filter)
+ new_traces = [trace for trace in self.traces._traces
+ if self._filter_trace(include_filters,
+ exclude_filters,
+ trace)]
+ else:
+ new_traces = self.traces._traces.copy()
+ return Snapshot(new_traces, self.traceback_limit)
+
+ def _group_by(self, key_type, cumulative):
+ if key_type not in ('traceback', 'filename', 'lineno'):
+ raise ValueError("unknown key_type: %r" % (key_type,))
+ if cumulative and key_type not in ('lineno', 'filename'):
+ raise ValueError("cumulative mode cannot by used "
+ "with key type %r" % key_type)
+ if cumulative and self.traceback_limit < 2:
+ raise ValueError("cumulative mode needs tracebacks with at least "
+ "2 frames, traceback limit is %s"
+ % self.traceback_limit)
+
+ stats = {}
+ tracebacks = {}
+ if not cumulative:
+ for trace in self.traces._traces:
+ size, trace_traceback = trace
+ try:
+ traceback = tracebacks[trace_traceback]
+ except KeyError:
+ if key_type == 'traceback':
+ frames = trace_traceback
+ elif key_type == 'lineno':
+ frames = trace_traceback[:1]
+ else: # key_type == 'filename':
+ frames = ((trace_traceback[0][0], 0),)
+ traceback = Traceback(frames)
+ tracebacks[trace_traceback] = traceback
+ try:
+ stat = stats[traceback]
+ stat.size += size
+ stat.count += 1
+ except KeyError:
+ stats[traceback] = Statistic(traceback, size, 1)
+ else:
+ # cumulative statistics
+ for trace in self.traces._traces:
+ size, trace_traceback = trace
+ for frame in trace_traceback:
+ try:
+ traceback = tracebacks[frame]
+ except KeyError:
+ if key_type == 'lineno':
+ frames = (frame,)
+ else: # key_type == 'filename':
+ frames = ((frame[0], 0),)
+ traceback = Traceback(frames)
+ tracebacks[frame] = traceback
+ try:
+ stat = stats[traceback]
+ stat.size += size
+ stat.count += 1
+ except KeyError:
+ stats[traceback] = Statistic(traceback, size, 1)
+ return stats
+
+ def statistics(self, key_type, cumulative=False):
+ """
+ Group statistics by key_type. Return a sorted list of Statistic
+ instances.
+ """
+ grouped = self._group_by(key_type, cumulative)
+ statistics = list(grouped.values())
+ statistics.sort(reverse=True, key=Statistic._sort_key)
+ return statistics
+
+ def compare_to(self, old_snapshot, key_type, cumulative=False):
+ """
+ Compute the differences with an old snapshot old_snapshot. Get
+ statistics as a sorted list of StatisticDiff instances, grouped by
+ group_by.
+ """
+ new_group = self._group_by(key_type, cumulative)
+ old_group = old_snapshot._group_by(key_type, cumulative)
+ statistics = _compare_grouped_stats(old_group, new_group)
+ statistics.sort(reverse=True, key=StatisticDiff._sort_key)
+ return statistics
+
+
+def take_snapshot():
+ """
+ Take a snapshot of traces of memory blocks allocated by Python.
+ """
+ if not is_tracing():
+ raise RuntimeError("the tracemalloc module must be tracing memory "
+ "allocations to take a snapshot")
+ traces = _get_traces()
+ traceback_limit = get_traceback_limit()
+ return Snapshot(traces, traceback_limit)