summaryrefslogtreecommitdiffstats
path: root/Lib/test
diff options
context:
space:
mode:
authorVictor Stinner <victor.stinner@gmail.com>2013-11-23 11:27:24 (GMT)
committerVictor Stinner <victor.stinner@gmail.com>2013-11-23 11:27:24 (GMT)
commited3b0bca3ef9d7bdbb8bd8e67e60e85f5a336da0 (patch)
treefd4390855d293372f73048fdf4b3e6b4a7cdf440 /Lib/test
parent0fb6072fad411eba171b53037bcc04d07c7b0770 (diff)
downloadcpython-ed3b0bca3ef9d7bdbb8bd8e67e60e85f5a336da0.zip
cpython-ed3b0bca3ef9d7bdbb8bd8e67e60e85f5a336da0.tar.gz
cpython-ed3b0bca3ef9d7bdbb8bd8e67e60e85f5a336da0.tar.bz2
Issue #18874: Implement the PEP 454 (tracemalloc)
Diffstat (limited to 'Lib/test')
-rw-r--r--Lib/test/support/__init__.py19
-rw-r--r--Lib/test/test_atexit.py4
-rw-r--r--Lib/test/test_capi.py2
-rw-r--r--Lib/test/test_threading.py4
-rw-r--r--Lib/test/test_tracemalloc.py797
5 files changed, 821 insertions, 5 deletions
diff --git a/Lib/test/support/__init__.py b/Lib/test/support/__init__.py
index 87c039a..a242a47 100644
--- a/Lib/test/support/__init__.py
+++ b/Lib/test/support/__init__.py
@@ -2154,3 +2154,22 @@ def patch(test_instance, object_to_patch, attr_name, new_value):
# actually override the attribute
setattr(object_to_patch, attr_name, new_value)
+
+
+def run_in_subinterp(code):
+ """
+ Run code in a subinterpreter. Raise unittest.SkipTest if the tracemalloc
+ module is enabled.
+ """
+ # Issue #10915, #15751: PyGILState_*() functions don't work with
+ # sub-interpreters, the tracemalloc module uses these functions internally
+ try:
+ import tracemalloc
+ except ImportError:
+ pass
+ else:
+ if tracemalloc.is_tracing():
+ raise unittest.SkipTest("run_in_subinterp() cannot be used "
+ "if tracemalloc module is tracing "
+ "memory allocations")
+ return _testcapi.run_in_subinterp(code)
diff --git a/Lib/test/test_atexit.py b/Lib/test/test_atexit.py
index b641015..84644f1 100644
--- a/Lib/test/test_atexit.py
+++ b/Lib/test/test_atexit.py
@@ -158,7 +158,7 @@ class SubinterpreterTest(unittest.TestCase):
atexit.register(f)
del atexit
"""
- ret = _testcapi.run_in_subinterp(code)
+ ret = support.run_in_subinterp(code)
self.assertEqual(ret, 0)
self.assertEqual(atexit._ncallbacks(), n)
@@ -173,7 +173,7 @@ class SubinterpreterTest(unittest.TestCase):
atexit.register(f)
atexit.__atexit = atexit
"""
- ret = _testcapi.run_in_subinterp(code)
+ ret = support.run_in_subinterp(code)
self.assertEqual(ret, 0)
self.assertEqual(atexit._ncallbacks(), n)
diff --git a/Lib/test/test_capi.py b/Lib/test/test_capi.py
index d37c057..000079e 100644
--- a/Lib/test/test_capi.py
+++ b/Lib/test/test_capi.py
@@ -205,7 +205,7 @@ class SubinterpreterTest(unittest.TestCase):
pickle.dump(id(builtins), f)
""".format(w)
with open(r, "rb") as f:
- ret = _testcapi.run_in_subinterp(code)
+ ret = support.run_in_subinterp(code)
self.assertEqual(ret, 0)
self.assertNotEqual(pickle.load(f), id(sys.modules))
self.assertNotEqual(pickle.load(f), id(builtins))
diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py
index 66eace0..a84577c 100644
--- a/Lib/test/test_threading.py
+++ b/Lib/test/test_threading.py
@@ -853,7 +853,7 @@ class SubinterpThreadingTests(BaseTestCase):
os.write(%d, b"x")
threading.Thread(target=f).start()
""" % (w,)
- ret = _testcapi.run_in_subinterp(code)
+ ret = test.support.run_in_subinterp(code)
self.assertEqual(ret, 0)
# The thread was joined properly.
self.assertEqual(os.read(r, 1), b"x")
@@ -885,7 +885,7 @@ class SubinterpThreadingTests(BaseTestCase):
os.write(%d, b"x")
threading.Thread(target=f).start()
""" % (w,)
- ret = _testcapi.run_in_subinterp(code)
+ ret = test.support.run_in_subinterp(code)
self.assertEqual(ret, 0)
# The thread was joined properly.
self.assertEqual(os.read(r, 1), b"x")
diff --git a/Lib/test/test_tracemalloc.py b/Lib/test/test_tracemalloc.py
new file mode 100644
index 0000000..ecb5aee
--- /dev/null
+++ b/Lib/test/test_tracemalloc.py
@@ -0,0 +1,797 @@
+import _tracemalloc
+import contextlib
+import datetime
+import os
+import sys
+import tracemalloc
+import unittest
+from unittest.mock import patch
+from test.script_helper import assert_python_ok, assert_python_failure
+from test import support
+try:
+ import threading
+except ImportError:
+ threading = None
+
+EMPTY_STRING_SIZE = sys.getsizeof(b'')
+
+def get_frames(nframe, lineno_delta):
+ frames = []
+ frame = sys._getframe(1)
+ for index in range(nframe):
+ code = frame.f_code
+ lineno = frame.f_lineno + lineno_delta
+ frames.append((code.co_filename, lineno))
+ lineno_delta = 0
+ frame = frame.f_back
+ if frame is None:
+ break
+ return tuple(frames)
+
+def allocate_bytes(size):
+ nframe = tracemalloc.get_traceback_limit()
+ bytes_len = (size - EMPTY_STRING_SIZE)
+ frames = get_frames(nframe, 1)
+ data = b'x' * bytes_len
+ return data, tracemalloc.Traceback(frames)
+
+def create_snapshots():
+ traceback_limit = 2
+
+ raw_traces = [
+ (10, (('a.py', 2), ('b.py', 4))),
+ (10, (('a.py', 2), ('b.py', 4))),
+ (10, (('a.py', 2), ('b.py', 4))),
+
+ (2, (('a.py', 5), ('b.py', 4))),
+
+ (66, (('b.py', 1),)),
+
+ (7, (('<unknown>', 0),)),
+ ]
+ snapshot = tracemalloc.Snapshot(raw_traces, traceback_limit)
+
+ raw_traces2 = [
+ (10, (('a.py', 2), ('b.py', 4))),
+ (10, (('a.py', 2), ('b.py', 4))),
+ (10, (('a.py', 2), ('b.py', 4))),
+
+ (2, (('a.py', 5), ('b.py', 4))),
+ (5000, (('a.py', 5), ('b.py', 4))),
+
+ (400, (('c.py', 578),)),
+ ]
+ snapshot2 = tracemalloc.Snapshot(raw_traces2, traceback_limit)
+
+ return (snapshot, snapshot2)
+
+def frame(filename, lineno):
+ return tracemalloc._Frame((filename, lineno))
+
+def traceback(*frames):
+ return tracemalloc.Traceback(frames)
+
+def traceback_lineno(filename, lineno):
+ return traceback((filename, lineno))
+
+def traceback_filename(filename):
+ return traceback_lineno(filename, 0)
+
+
+class TestTracemallocEnabled(unittest.TestCase):
+ def setUp(self):
+ if tracemalloc.is_tracing():
+ self.skipTest("tracemalloc must be stopped before the test")
+
+ tracemalloc.set_traceback_limit(1)
+ tracemalloc.start()
+
+ def tearDown(self):
+ tracemalloc.stop()
+
+ def test_get_tracemalloc_memory(self):
+ data = [allocate_bytes(123) for count in range(1000)]
+ size = tracemalloc.get_tracemalloc_memory()
+ self.assertGreaterEqual(size, 0)
+
+ tracemalloc.clear_traces()
+ size2 = tracemalloc.get_tracemalloc_memory()
+ self.assertGreaterEqual(size2, 0)
+ self.assertLessEqual(size2, size)
+
+ def test_get_object_traceback(self):
+ tracemalloc.clear_traces()
+ obj_size = 12345
+ obj, obj_traceback = allocate_bytes(obj_size)
+ traceback = tracemalloc.get_object_traceback(obj)
+ self.assertEqual(traceback, obj_traceback)
+
+ def test_set_traceback_limit(self):
+ obj_size = 10
+
+ nframe = tracemalloc.get_traceback_limit()
+ self.addCleanup(tracemalloc.set_traceback_limit, nframe)
+
+ self.assertRaises(ValueError, tracemalloc.set_traceback_limit, -1)
+
+ tracemalloc.clear_traces()
+ tracemalloc.set_traceback_limit(10)
+ obj2, obj2_traceback = allocate_bytes(obj_size)
+ traceback = tracemalloc.get_object_traceback(obj2)
+ self.assertEqual(len(traceback), 10)
+ self.assertEqual(traceback, obj2_traceback)
+
+ tracemalloc.clear_traces()
+ tracemalloc.set_traceback_limit(1)
+ obj, obj_traceback = allocate_bytes(obj_size)
+ traceback = tracemalloc.get_object_traceback(obj)
+ self.assertEqual(len(traceback), 1)
+ self.assertEqual(traceback, obj_traceback)
+
+
+ def find_trace(self, traces, traceback):
+ for trace in traces:
+ if trace[1] == traceback._frames:
+ return trace
+
+ self.fail("trace not found")
+
+ def test_get_traces(self):
+ tracemalloc.clear_traces()
+ obj_size = 12345
+ obj, obj_traceback = allocate_bytes(obj_size)
+
+ traces = tracemalloc._get_traces()
+ trace = self.find_trace(traces, obj_traceback)
+
+ self.assertIsInstance(trace, tuple)
+ size, traceback = trace
+ self.assertEqual(size, obj_size)
+ self.assertEqual(traceback, obj_traceback._frames)
+
+ tracemalloc.stop()
+ self.assertEqual(tracemalloc._get_traces(), [])
+
+
+ def test_get_traces_intern_traceback(self):
+ # dummy wrappers to get more useful and identical frames in the traceback
+ def allocate_bytes2(size):
+ return allocate_bytes(size)
+ def allocate_bytes3(size):
+ return allocate_bytes2(size)
+ def allocate_bytes4(size):
+ return allocate_bytes3(size)
+
+ # Ensure that two identical tracebacks are not duplicated
+ tracemalloc.clear_traces()
+ tracemalloc.set_traceback_limit(4)
+ obj_size = 123
+ obj1, obj1_traceback = allocate_bytes4(obj_size)
+ obj2, obj2_traceback = allocate_bytes4(obj_size)
+
+ traces = tracemalloc._get_traces()
+
+ trace1 = self.find_trace(traces, obj1_traceback)
+ trace2 = self.find_trace(traces, obj2_traceback)
+ size1, traceback1 = trace1
+ size2, traceback2 = trace2
+ self.assertEqual(traceback2, traceback1)
+ self.assertIs(traceback2, traceback1)
+
+ def test_get_traced_memory(self):
+ # Python allocates some internals objects, so the test must tolerate
+ # a small difference between the expected size and the real usage
+ max_error = 2048
+
+ # allocate one object
+ obj_size = 1024 * 1024
+ tracemalloc.clear_traces()
+ obj, obj_traceback = allocate_bytes(obj_size)
+ size, max_size = tracemalloc.get_traced_memory()
+ self.assertGreaterEqual(size, obj_size)
+ self.assertGreaterEqual(max_size, size)
+
+ self.assertLessEqual(size - obj_size, max_error)
+ self.assertLessEqual(max_size - size, max_error)
+
+ # destroy the object
+ obj = None
+ size2, max_size2 = tracemalloc.get_traced_memory()
+ self.assertLess(size2, size)
+ self.assertGreaterEqual(size - size2, obj_size - max_error)
+ self.assertGreaterEqual(max_size2, max_size)
+
+ # clear_traces() must reset traced memory counters
+ tracemalloc.clear_traces()
+ self.assertEqual(tracemalloc.get_traced_memory(), (0, 0))
+
+ # allocate another object
+ obj, obj_traceback = allocate_bytes(obj_size)
+ size, max_size = tracemalloc.get_traced_memory()
+ self.assertGreater(size, 0)
+
+ # stop() rests also traced memory counters
+ tracemalloc.stop()
+ self.assertEqual(tracemalloc.get_traced_memory(), (0, 0))
+
+ def test_clear_traces(self):
+ obj, obj_traceback = allocate_bytes(123)
+ traceback = tracemalloc.get_object_traceback(obj)
+ self.assertIsNotNone(traceback)
+
+ tracemalloc.clear_traces()
+ traceback2 = tracemalloc.get_object_traceback(obj)
+ self.assertIsNone(traceback2)
+
+ def test_is_tracing(self):
+ tracemalloc.stop()
+ self.assertFalse(tracemalloc.is_tracing())
+
+ tracemalloc.start()
+ self.assertTrue(tracemalloc.is_tracing())
+
+ def test_snapshot(self):
+ obj, source = allocate_bytes(123)
+
+ # take a snapshot
+ snapshot = tracemalloc.take_snapshot()
+
+ # write on disk
+ snapshot.dump(support.TESTFN)
+ self.addCleanup(support.unlink, support.TESTFN)
+
+ # load from disk
+ snapshot2 = tracemalloc.Snapshot.load(support.TESTFN)
+ self.assertEqual(snapshot2.traces, snapshot.traces)
+
+ # tracemalloc must be tracing memory allocations to take a snapshot
+ tracemalloc.stop()
+ with self.assertRaises(RuntimeError) as cm:
+ tracemalloc.take_snapshot()
+ self.assertEqual(str(cm.exception),
+ "the tracemalloc module must be tracing memory "
+ "allocations to take a snapshot")
+
+ def test_snapshot_save_attr(self):
+ # take a snapshot with a new attribute
+ snapshot = tracemalloc.take_snapshot()
+ snapshot.test_attr = "new"
+ snapshot.dump(support.TESTFN)
+ self.addCleanup(support.unlink, support.TESTFN)
+
+ # load() should recreates the attribute
+ snapshot2 = tracemalloc.Snapshot.load(support.TESTFN)
+ self.assertEqual(snapshot2.test_attr, "new")
+
+ def fork_child(self):
+ if not tracemalloc.is_tracing():
+ return 2
+
+ obj_size = 12345
+ obj, obj_traceback = allocate_bytes(obj_size)
+ traceback = tracemalloc.get_object_traceback(obj)
+ if traceback is None:
+ return 3
+
+ # everything is fine
+ return 0
+
+ @unittest.skipUnless(hasattr(os, 'fork'), 'need os.fork()')
+ def test_fork(self):
+ # check that tracemalloc is still working after fork
+ pid = os.fork()
+ if not pid:
+ # child
+ exitcode = 1
+ try:
+ exitcode = self.fork_child()
+ finally:
+ os._exit(exitcode)
+ else:
+ pid2, status = os.waitpid(pid, 0)
+ self.assertTrue(os.WIFEXITED(status))
+ exitcode = os.WEXITSTATUS(status)
+ self.assertEqual(exitcode, 0)
+
+
+class TestSnapshot(unittest.TestCase):
+ maxDiff = 4000
+
+ def test_create_snapshot(self):
+ raw_traces = [(5, (('a.py', 2),))]
+
+ with contextlib.ExitStack() as stack:
+ stack.enter_context(patch.object(tracemalloc, 'is_tracing',
+ return_value=True))
+ stack.enter_context(patch.object(tracemalloc, 'get_traceback_limit',
+ return_value=5))
+ stack.enter_context(patch.object(tracemalloc, '_get_traces',
+ return_value=raw_traces))
+
+ snapshot = tracemalloc.take_snapshot()
+ self.assertEqual(snapshot.traceback_limit, 5)
+ self.assertEqual(len(snapshot.traces), 1)
+ trace = snapshot.traces[0]
+ self.assertEqual(trace.size, 5)
+ self.assertEqual(len(trace.traceback), 1)
+ self.assertEqual(trace.traceback[0].filename, 'a.py')
+ self.assertEqual(trace.traceback[0].lineno, 2)
+
+ def test_filter_traces(self):
+ snapshot, snapshot2 = create_snapshots()
+ filter1 = tracemalloc.Filter(False, "b.py")
+ filter2 = tracemalloc.Filter(True, "a.py", 2)
+ filter3 = tracemalloc.Filter(True, "a.py", 5)
+
+ original_traces = list(snapshot.traces._traces)
+
+ # exclude b.py
+ snapshot3 = snapshot.filter_traces((filter1,))
+ self.assertEqual(snapshot3.traces._traces, [
+ (10, (('a.py', 2), ('b.py', 4))),
+ (10, (('a.py', 2), ('b.py', 4))),
+ (10, (('a.py', 2), ('b.py', 4))),
+ (2, (('a.py', 5), ('b.py', 4))),
+ (7, (('<unknown>', 0),)),
+ ])
+
+ # filter_traces() must not touch the original snapshot
+ self.assertEqual(snapshot.traces._traces, original_traces)
+
+ # only include two lines of a.py
+ snapshot4 = snapshot3.filter_traces((filter2, filter3))
+ self.assertEqual(snapshot4.traces._traces, [
+ (10, (('a.py', 2), ('b.py', 4))),
+ (10, (('a.py', 2), ('b.py', 4))),
+ (10, (('a.py', 2), ('b.py', 4))),
+ (2, (('a.py', 5), ('b.py', 4))),
+ ])
+
+ # No filter: just duplicate the snapshot
+ snapshot5 = snapshot.filter_traces(())
+ self.assertIsNot(snapshot5, snapshot)
+ self.assertIsNot(snapshot5.traces, snapshot.traces)
+ self.assertEqual(snapshot5.traces, snapshot.traces)
+
+ def test_snapshot_group_by_line(self):
+ snapshot, snapshot2 = create_snapshots()
+ tb_0 = traceback_lineno('<unknown>', 0)
+ tb_a_2 = traceback_lineno('a.py', 2)
+ tb_a_5 = traceback_lineno('a.py', 5)
+ tb_b_1 = traceback_lineno('b.py', 1)
+ tb_c_578 = traceback_lineno('c.py', 578)
+
+ # stats per file and line
+ stats1 = snapshot.statistics('lineno')
+ self.assertEqual(stats1, [
+ tracemalloc.Statistic(tb_b_1, 66, 1),
+ tracemalloc.Statistic(tb_a_2, 30, 3),
+ tracemalloc.Statistic(tb_0, 7, 1),
+ tracemalloc.Statistic(tb_a_5, 2, 1),
+ ])
+
+ # stats per file and line (2)
+ stats2 = snapshot2.statistics('lineno')
+ self.assertEqual(stats2, [
+ tracemalloc.Statistic(tb_a_5, 5002, 2),
+ tracemalloc.Statistic(tb_c_578, 400, 1),
+ tracemalloc.Statistic(tb_a_2, 30, 3),
+ ])
+
+ # stats diff per file and line
+ statistics = snapshot2.compare_to(snapshot, 'lineno')
+ self.assertEqual(statistics, [
+ tracemalloc.StatisticDiff(tb_a_5, 5002, 5000, 2, 1),
+ tracemalloc.StatisticDiff(tb_c_578, 400, 400, 1, 1),
+ tracemalloc.StatisticDiff(tb_b_1, 0, -66, 0, -1),
+ tracemalloc.StatisticDiff(tb_0, 0, -7, 0, -1),
+ tracemalloc.StatisticDiff(tb_a_2, 30, 0, 3, 0),
+ ])
+
+ def test_snapshot_group_by_file(self):
+ snapshot, snapshot2 = create_snapshots()
+ tb_0 = traceback_filename('<unknown>')
+ tb_a = traceback_filename('a.py')
+ tb_b = traceback_filename('b.py')
+ tb_c = traceback_filename('c.py')
+
+ # stats per file
+ stats1 = snapshot.statistics('filename')
+ self.assertEqual(stats1, [
+ tracemalloc.Statistic(tb_b, 66, 1),
+ tracemalloc.Statistic(tb_a, 32, 4),
+ tracemalloc.Statistic(tb_0, 7, 1),
+ ])
+
+ # stats per file (2)
+ stats2 = snapshot2.statistics('filename')
+ self.assertEqual(stats2, [
+ tracemalloc.Statistic(tb_a, 5032, 5),
+ tracemalloc.Statistic(tb_c, 400, 1),
+ ])
+
+ # stats diff per file
+ diff = snapshot2.compare_to(snapshot, 'filename')
+ self.assertEqual(diff, [
+ tracemalloc.StatisticDiff(tb_a, 5032, 5000, 5, 1),
+ tracemalloc.StatisticDiff(tb_c, 400, 400, 1, 1),
+ tracemalloc.StatisticDiff(tb_b, 0, -66, 0, -1),
+ tracemalloc.StatisticDiff(tb_0, 0, -7, 0, -1),
+ ])
+
+ def test_snapshot_group_by_traceback(self):
+ snapshot, snapshot2 = create_snapshots()
+
+ # stats per file
+ tb1 = traceback(('a.py', 2), ('b.py', 4))
+ tb2 = traceback(('a.py', 5), ('b.py', 4))
+ tb3 = traceback(('b.py', 1))
+ tb4 = traceback(('<unknown>', 0))
+ stats1 = snapshot.statistics('traceback')
+ self.assertEqual(stats1, [
+ tracemalloc.Statistic(tb3, 66, 1),
+ tracemalloc.Statistic(tb1, 30, 3),
+ tracemalloc.Statistic(tb4, 7, 1),
+ tracemalloc.Statistic(tb2, 2, 1),
+ ])
+
+ # stats per file (2)
+ tb5 = traceback(('c.py', 578))
+ stats2 = snapshot2.statistics('traceback')
+ self.assertEqual(stats2, [
+ tracemalloc.Statistic(tb2, 5002, 2),
+ tracemalloc.Statistic(tb5, 400, 1),
+ tracemalloc.Statistic(tb1, 30, 3),
+ ])
+
+ # stats diff per file
+ diff = snapshot2.compare_to(snapshot, 'traceback')
+ self.assertEqual(diff, [
+ tracemalloc.StatisticDiff(tb2, 5002, 5000, 2, 1),
+ tracemalloc.StatisticDiff(tb5, 400, 400, 1, 1),
+ tracemalloc.StatisticDiff(tb3, 0, -66, 0, -1),
+ tracemalloc.StatisticDiff(tb4, 0, -7, 0, -1),
+ tracemalloc.StatisticDiff(tb1, 30, 0, 3, 0),
+ ])
+
+ self.assertRaises(ValueError,
+ snapshot.statistics, 'traceback', cumulative=True)
+
+ def test_snapshot_group_by_cumulative(self):
+ snapshot, snapshot2 = create_snapshots()
+ tb_0 = traceback_filename('<unknown>')
+ tb_a = traceback_filename('a.py')
+ tb_b = traceback_filename('b.py')
+ tb_a_2 = traceback_lineno('a.py', 2)
+ tb_a_5 = traceback_lineno('a.py', 5)
+ tb_b_1 = traceback_lineno('b.py', 1)
+ tb_b_4 = traceback_lineno('b.py', 4)
+
+ # per file
+ stats = snapshot.statistics('filename', True)
+ self.assertEqual(stats, [
+ tracemalloc.Statistic(tb_b, 98, 5),
+ tracemalloc.Statistic(tb_a, 32, 4),
+ tracemalloc.Statistic(tb_0, 7, 1),
+ ])
+
+ # per line
+ stats = snapshot.statistics('lineno', True)
+ self.assertEqual(stats, [
+ tracemalloc.Statistic(tb_b_1, 66, 1),
+ tracemalloc.Statistic(tb_b_4, 32, 4),
+ tracemalloc.Statistic(tb_a_2, 30, 3),
+ tracemalloc.Statistic(tb_0, 7, 1),
+ tracemalloc.Statistic(tb_a_5, 2, 1),
+ ])
+
+ def test_trace_format(self):
+ snapshot, snapshot2 = create_snapshots()
+ trace = snapshot.traces[0]
+ self.assertEqual(str(trace), 'a.py:2: 10 B')
+ traceback = trace.traceback
+ self.assertEqual(str(traceback), 'a.py:2')
+ frame = traceback[0]
+ self.assertEqual(str(frame), 'a.py:2')
+
+ def test_statistic_format(self):
+ snapshot, snapshot2 = create_snapshots()
+ stats = snapshot.statistics('lineno')
+ stat = stats[0]
+ self.assertEqual(str(stat),
+ 'b.py:1: size=66 B, count=1, average=66 B')
+
+ def test_statistic_diff_format(self):
+ snapshot, snapshot2 = create_snapshots()
+ stats = snapshot2.compare_to(snapshot, 'lineno')
+ stat = stats[0]
+ self.assertEqual(str(stat),
+ 'a.py:5: size=5002 B (+5000 B), count=2 (+1), average=2501 B')
+
+
+
+class TestFilters(unittest.TestCase):
+ maxDiff = 2048
+
+ def test_filter_attributes(self):
+ # test default values
+ f = tracemalloc.Filter(True, "abc")
+ self.assertEqual(f.inclusive, True)
+ self.assertEqual(f.filename_pattern, "abc")
+ self.assertIsNone(f.lineno)
+ self.assertEqual(f.all_frames, False)
+
+ # test custom values
+ f = tracemalloc.Filter(False, "test.py", 123, True)
+ self.assertEqual(f.inclusive, False)
+ self.assertEqual(f.filename_pattern, "test.py")
+ self.assertEqual(f.lineno, 123)
+ self.assertEqual(f.all_frames, True)
+
+ # parameters passed by keyword
+ f = tracemalloc.Filter(inclusive=False, filename_pattern="test.py", lineno=123, all_frames=True)
+ self.assertEqual(f.inclusive, False)
+ self.assertEqual(f.filename_pattern, "test.py")
+ self.assertEqual(f.lineno, 123)
+ self.assertEqual(f.all_frames, True)
+
+ # read-only attribute
+ self.assertRaises(AttributeError, setattr, f, "filename_pattern", "abc")
+
+ def test_filter_match(self):
+ # filter without line number
+ f = tracemalloc.Filter(True, "abc")
+ self.assertTrue(f._match_frame("abc", 0))
+ self.assertTrue(f._match_frame("abc", 5))
+ self.assertTrue(f._match_frame("abc", 10))
+ self.assertFalse(f._match_frame("12356", 0))
+ self.assertFalse(f._match_frame("12356", 5))
+ self.assertFalse(f._match_frame("12356", 10))
+
+ f = tracemalloc.Filter(False, "abc")
+ self.assertFalse(f._match_frame("abc", 0))
+ self.assertFalse(f._match_frame("abc", 5))
+ self.assertFalse(f._match_frame("abc", 10))
+ self.assertTrue(f._match_frame("12356", 0))
+ self.assertTrue(f._match_frame("12356", 5))
+ self.assertTrue(f._match_frame("12356", 10))
+
+ # filter with line number > 0
+ f = tracemalloc.Filter(True, "abc", 5)
+ self.assertFalse(f._match_frame("abc", 0))
+ self.assertTrue(f._match_frame("abc", 5))
+ self.assertFalse(f._match_frame("abc", 10))
+ self.assertFalse(f._match_frame("12356", 0))
+ self.assertFalse(f._match_frame("12356", 5))
+ self.assertFalse(f._match_frame("12356", 10))
+
+ f = tracemalloc.Filter(False, "abc", 5)
+ self.assertTrue(f._match_frame("abc", 0))
+ self.assertFalse(f._match_frame("abc", 5))
+ self.assertTrue(f._match_frame("abc", 10))
+ self.assertTrue(f._match_frame("12356", 0))
+ self.assertTrue(f._match_frame("12356", 5))
+ self.assertTrue(f._match_frame("12356", 10))
+
+ # filter with line number 0
+ f = tracemalloc.Filter(True, "abc", 0)
+ self.assertTrue(f._match_frame("abc", 0))
+ self.assertFalse(f._match_frame("abc", 5))
+ self.assertFalse(f._match_frame("abc", 10))
+ self.assertFalse(f._match_frame("12356", 0))
+ self.assertFalse(f._match_frame("12356", 5))
+ self.assertFalse(f._match_frame("12356", 10))
+
+ f = tracemalloc.Filter(False, "abc", 0)
+ self.assertFalse(f._match_frame("abc", 0))
+ self.assertTrue(f._match_frame("abc", 5))
+ self.assertTrue(f._match_frame("abc", 10))
+ self.assertTrue(f._match_frame("12356", 0))
+ self.assertTrue(f._match_frame("12356", 5))
+ self.assertTrue(f._match_frame("12356", 10))
+
+ def test_filter_match_filename(self):
+ def fnmatch(inclusive, filename, pattern):
+ f = tracemalloc.Filter(inclusive, pattern)
+ return f._match_frame(filename, 0)
+
+ self.assertTrue(fnmatch(True, "abc", "abc"))
+ self.assertFalse(fnmatch(True, "12356", "abc"))
+ self.assertFalse(fnmatch(True, "<unknown>", "abc"))
+
+ self.assertFalse(fnmatch(False, "abc", "abc"))
+ self.assertTrue(fnmatch(False, "12356", "abc"))
+ self.assertTrue(fnmatch(False, "<unknown>", "abc"))
+
+ def test_filter_match_filename_joker(self):
+ def fnmatch(filename, pattern):
+ filter = tracemalloc.Filter(True, pattern)
+ return filter._match_frame(filename, 0)
+
+ # empty string
+ self.assertFalse(fnmatch('abc', ''))
+ self.assertFalse(fnmatch('', 'abc'))
+ self.assertTrue(fnmatch('', ''))
+ self.assertTrue(fnmatch('', '*'))
+
+ # no *
+ self.assertTrue(fnmatch('abc', 'abc'))
+ self.assertFalse(fnmatch('abc', 'abcd'))
+ self.assertFalse(fnmatch('abc', 'def'))
+
+ # a*
+ self.assertTrue(fnmatch('abc', 'a*'))
+ self.assertTrue(fnmatch('abc', 'abc*'))
+ self.assertFalse(fnmatch('abc', 'b*'))
+ self.assertFalse(fnmatch('abc', 'abcd*'))
+
+ # a*b
+ self.assertTrue(fnmatch('abc', 'a*c'))
+ self.assertTrue(fnmatch('abcdcx', 'a*cx'))
+ self.assertFalse(fnmatch('abb', 'a*c'))
+ self.assertFalse(fnmatch('abcdce', 'a*cx'))
+
+ # a*b*c
+ self.assertTrue(fnmatch('abcde', 'a*c*e'))
+ self.assertTrue(fnmatch('abcbdefeg', 'a*bd*eg'))
+ self.assertFalse(fnmatch('abcdd', 'a*c*e'))
+ self.assertFalse(fnmatch('abcbdefef', 'a*bd*eg'))
+
+ # replace .pyc and .pyo suffix with .py
+ self.assertTrue(fnmatch('a.pyc', 'a.py'))
+ self.assertTrue(fnmatch('a.pyo', 'a.py'))
+ self.assertTrue(fnmatch('a.py', 'a.pyc'))
+ self.assertTrue(fnmatch('a.py', 'a.pyo'))
+
+ if os.name == 'nt':
+ # case insensitive
+ self.assertTrue(fnmatch('aBC', 'ABc'))
+ self.assertTrue(fnmatch('aBcDe', 'Ab*dE'))
+
+ self.assertTrue(fnmatch('a.pyc', 'a.PY'))
+ self.assertTrue(fnmatch('a.PYO', 'a.py'))
+ self.assertTrue(fnmatch('a.py', 'a.PYC'))
+ self.assertTrue(fnmatch('a.PY', 'a.pyo'))
+ else:
+ # case sensitive
+ self.assertFalse(fnmatch('aBC', 'ABc'))
+ self.assertFalse(fnmatch('aBcDe', 'Ab*dE'))
+
+ self.assertFalse(fnmatch('a.pyc', 'a.PY'))
+ self.assertFalse(fnmatch('a.PYO', 'a.py'))
+ self.assertFalse(fnmatch('a.py', 'a.PYC'))
+ self.assertFalse(fnmatch('a.PY', 'a.pyo'))
+
+ if os.name == 'nt':
+ # normalize alternate separator "/" to the standard separator "\"
+ self.assertTrue(fnmatch(r'a/b', r'a\b'))
+ self.assertTrue(fnmatch(r'a\b', r'a/b'))
+ self.assertTrue(fnmatch(r'a/b\c', r'a\b/c'))
+ self.assertTrue(fnmatch(r'a/b/c', r'a\b\c'))
+ else:
+ # there is no alternate separator
+ self.assertFalse(fnmatch(r'a/b', r'a\b'))
+ self.assertFalse(fnmatch(r'a\b', r'a/b'))
+ self.assertFalse(fnmatch(r'a/b\c', r'a\b/c'))
+ self.assertFalse(fnmatch(r'a/b/c', r'a\b\c'))
+
+ def test_filter_match_trace(self):
+ t1 = (("a.py", 2), ("b.py", 3))
+ t2 = (("b.py", 4), ("b.py", 5))
+ t3 = (("c.py", 5), ('<unknown>', 0))
+ unknown = (('<unknown>', 0),)
+
+ f = tracemalloc.Filter(True, "b.py", all_frames=True)
+ self.assertTrue(f._match_traceback(t1))
+ self.assertTrue(f._match_traceback(t2))
+ self.assertFalse(f._match_traceback(t3))
+ self.assertFalse(f._match_traceback(unknown))
+
+ f = tracemalloc.Filter(True, "b.py", all_frames=False)
+ self.assertFalse(f._match_traceback(t1))
+ self.assertTrue(f._match_traceback(t2))
+ self.assertFalse(f._match_traceback(t3))
+ self.assertFalse(f._match_traceback(unknown))
+
+ f = tracemalloc.Filter(False, "b.py", all_frames=True)
+ self.assertFalse(f._match_traceback(t1))
+ self.assertFalse(f._match_traceback(t2))
+ self.assertTrue(f._match_traceback(t3))
+ self.assertTrue(f._match_traceback(unknown))
+
+ f = tracemalloc.Filter(False, "b.py", all_frames=False)
+ self.assertTrue(f._match_traceback(t1))
+ self.assertFalse(f._match_traceback(t2))
+ self.assertTrue(f._match_traceback(t3))
+ self.assertTrue(f._match_traceback(unknown))
+
+ f = tracemalloc.Filter(False, "<unknown>", all_frames=False)
+ self.assertTrue(f._match_traceback(t1))
+ self.assertTrue(f._match_traceback(t2))
+ self.assertTrue(f._match_traceback(t3))
+ self.assertFalse(f._match_traceback(unknown))
+
+ f = tracemalloc.Filter(True, "<unknown>", all_frames=True)
+ self.assertFalse(f._match_traceback(t1))
+ self.assertFalse(f._match_traceback(t2))
+ self.assertTrue(f._match_traceback(t3))
+ self.assertTrue(f._match_traceback(unknown))
+
+ f = tracemalloc.Filter(False, "<unknown>", all_frames=True)
+ self.assertTrue(f._match_traceback(t1))
+ self.assertTrue(f._match_traceback(t2))
+ self.assertFalse(f._match_traceback(t3))
+ self.assertFalse(f._match_traceback(unknown))
+
+
+class TestCommandLine(unittest.TestCase):
+ def test_env_var(self):
+ # not tracing by default
+ code = 'import tracemalloc; print(tracemalloc.is_tracing())'
+ ok, stdout, stderr = assert_python_ok('-c', code)
+ stdout = stdout.rstrip()
+ self.assertEqual(stdout, b'False')
+
+ # PYTHON* environment varibles must be ignored when -E option is
+ # present
+ code = 'import tracemalloc; print(tracemalloc.is_tracing())'
+ ok, stdout, stderr = assert_python_ok('-E', '-c', code, PYTHONTRACEMALLOC='1')
+ stdout = stdout.rstrip()
+ self.assertEqual(stdout, b'False')
+
+ # tracing at startup
+ code = 'import tracemalloc; print(tracemalloc.is_tracing())'
+ ok, stdout, stderr = assert_python_ok('-c', code, PYTHONTRACEMALLOC='1')
+ stdout = stdout.rstrip()
+ self.assertEqual(stdout, b'True')
+
+ # start and set the number of frames
+ code = 'import tracemalloc; print(tracemalloc.get_traceback_limit())'
+ ok, stdout, stderr = assert_python_ok('-c', code, PYTHONTRACEMALLOC='10')
+ stdout = stdout.rstrip()
+ self.assertEqual(stdout, b'10')
+
+ def test_env_var_invalid(self):
+ for nframe in (-1, 0, 5000):
+ with self.subTest(nframe=nframe):
+ with support.SuppressCrashReport():
+ ok, stdout, stderr = assert_python_failure(
+ '-c', 'pass',
+ PYTHONTRACEMALLOC=str(nframe))
+ self.assertIn(b'PYTHONTRACEMALLOC must be an integer '
+ b'in range [1; 100]',
+ stderr)
+
+ def test_sys_xoptions(self):
+ for xoptions, nframe in (
+ ('tracemalloc', 1),
+ ('tracemalloc=1', 1),
+ ('tracemalloc=15', 15),
+ ):
+ with self.subTest(xoptions=xoptions, nframe=nframe):
+ code = 'import tracemalloc; print(tracemalloc.get_traceback_limit())'
+ ok, stdout, stderr = assert_python_ok('-X', xoptions, '-c', code)
+ stdout = stdout.rstrip()
+ self.assertEqual(stdout, str(nframe).encode('ascii'))
+
+ def test_sys_xoptions_invalid(self):
+ for nframe in (-1, 0, 5000):
+ with self.subTest(nframe=nframe):
+ with support.SuppressCrashReport():
+ args = ('-X', 'tracemalloc=%s' % nframe, '-c', 'pass')
+ ok, stdout, stderr = assert_python_failure(*args)
+ self.assertIn(b'-X tracemalloc=NFRAME: number of frame must '
+ b'be an integer in range [1; 100]',
+ stderr)
+
+
+def test_main():
+ support.run_unittest(
+ TestTracemallocEnabled,
+ TestSnapshot,
+ TestFilters,
+ TestCommandLine,
+ )
+
+if __name__ == "__main__":
+ test_main()