diff options
Diffstat (limited to 'Lib/test')
-rw-r--r-- | Lib/test/support/__init__.py | 19 | ||||
-rw-r--r-- | Lib/test/test_atexit.py | 4 | ||||
-rw-r--r-- | Lib/test/test_capi.py | 2 | ||||
-rw-r--r-- | Lib/test/test_threading.py | 4 | ||||
-rw-r--r-- | Lib/test/test_tracemalloc.py | 797 |
5 files changed, 821 insertions, 5 deletions
diff --git a/Lib/test/support/__init__.py b/Lib/test/support/__init__.py index 87c039a..a242a47 100644 --- a/Lib/test/support/__init__.py +++ b/Lib/test/support/__init__.py @@ -2154,3 +2154,22 @@ def patch(test_instance, object_to_patch, attr_name, new_value): # actually override the attribute setattr(object_to_patch, attr_name, new_value) + + +def run_in_subinterp(code): + """ + Run code in a subinterpreter. Raise unittest.SkipTest if the tracemalloc + module is enabled. + """ + # Issue #10915, #15751: PyGILState_*() functions don't work with + # sub-interpreters, the tracemalloc module uses these functions internally + try: + import tracemalloc + except ImportError: + pass + else: + if tracemalloc.is_tracing(): + raise unittest.SkipTest("run_in_subinterp() cannot be used " + "if tracemalloc module is tracing " + "memory allocations") + return _testcapi.run_in_subinterp(code) diff --git a/Lib/test/test_atexit.py b/Lib/test/test_atexit.py index b641015..84644f1 100644 --- a/Lib/test/test_atexit.py +++ b/Lib/test/test_atexit.py @@ -158,7 +158,7 @@ class SubinterpreterTest(unittest.TestCase): atexit.register(f) del atexit """ - ret = _testcapi.run_in_subinterp(code) + ret = support.run_in_subinterp(code) self.assertEqual(ret, 0) self.assertEqual(atexit._ncallbacks(), n) @@ -173,7 +173,7 @@ class SubinterpreterTest(unittest.TestCase): atexit.register(f) atexit.__atexit = atexit """ - ret = _testcapi.run_in_subinterp(code) + ret = support.run_in_subinterp(code) self.assertEqual(ret, 0) self.assertEqual(atexit._ncallbacks(), n) diff --git a/Lib/test/test_capi.py b/Lib/test/test_capi.py index d37c057..000079e 100644 --- a/Lib/test/test_capi.py +++ b/Lib/test/test_capi.py @@ -205,7 +205,7 @@ class SubinterpreterTest(unittest.TestCase): pickle.dump(id(builtins), f) """.format(w) with open(r, "rb") as f: - ret = _testcapi.run_in_subinterp(code) + ret = support.run_in_subinterp(code) self.assertEqual(ret, 0) self.assertNotEqual(pickle.load(f), id(sys.modules)) self.assertNotEqual(pickle.load(f), id(builtins)) diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py index 66eace0..a84577c 100644 --- a/Lib/test/test_threading.py +++ b/Lib/test/test_threading.py @@ -853,7 +853,7 @@ class SubinterpThreadingTests(BaseTestCase): os.write(%d, b"x") threading.Thread(target=f).start() """ % (w,) - ret = _testcapi.run_in_subinterp(code) + ret = test.support.run_in_subinterp(code) self.assertEqual(ret, 0) # The thread was joined properly. self.assertEqual(os.read(r, 1), b"x") @@ -885,7 +885,7 @@ class SubinterpThreadingTests(BaseTestCase): os.write(%d, b"x") threading.Thread(target=f).start() """ % (w,) - ret = _testcapi.run_in_subinterp(code) + ret = test.support.run_in_subinterp(code) self.assertEqual(ret, 0) # The thread was joined properly. self.assertEqual(os.read(r, 1), b"x") diff --git a/Lib/test/test_tracemalloc.py b/Lib/test/test_tracemalloc.py new file mode 100644 index 0000000..ecb5aee --- /dev/null +++ b/Lib/test/test_tracemalloc.py @@ -0,0 +1,797 @@ +import _tracemalloc +import contextlib +import datetime +import os +import sys +import tracemalloc +import unittest +from unittest.mock import patch +from test.script_helper import assert_python_ok, assert_python_failure +from test import support +try: + import threading +except ImportError: + threading = None + +EMPTY_STRING_SIZE = sys.getsizeof(b'') + +def get_frames(nframe, lineno_delta): + frames = [] + frame = sys._getframe(1) + for index in range(nframe): + code = frame.f_code + lineno = frame.f_lineno + lineno_delta + frames.append((code.co_filename, lineno)) + lineno_delta = 0 + frame = frame.f_back + if frame is None: + break + return tuple(frames) + +def allocate_bytes(size): + nframe = tracemalloc.get_traceback_limit() + bytes_len = (size - EMPTY_STRING_SIZE) + frames = get_frames(nframe, 1) + data = b'x' * bytes_len + return data, tracemalloc.Traceback(frames) + +def create_snapshots(): + traceback_limit = 2 + + raw_traces = [ + (10, (('a.py', 2), ('b.py', 4))), + (10, (('a.py', 2), ('b.py', 4))), + (10, (('a.py', 2), ('b.py', 4))), + + (2, (('a.py', 5), ('b.py', 4))), + + (66, (('b.py', 1),)), + + (7, (('<unknown>', 0),)), + ] + snapshot = tracemalloc.Snapshot(raw_traces, traceback_limit) + + raw_traces2 = [ + (10, (('a.py', 2), ('b.py', 4))), + (10, (('a.py', 2), ('b.py', 4))), + (10, (('a.py', 2), ('b.py', 4))), + + (2, (('a.py', 5), ('b.py', 4))), + (5000, (('a.py', 5), ('b.py', 4))), + + (400, (('c.py', 578),)), + ] + snapshot2 = tracemalloc.Snapshot(raw_traces2, traceback_limit) + + return (snapshot, snapshot2) + +def frame(filename, lineno): + return tracemalloc._Frame((filename, lineno)) + +def traceback(*frames): + return tracemalloc.Traceback(frames) + +def traceback_lineno(filename, lineno): + return traceback((filename, lineno)) + +def traceback_filename(filename): + return traceback_lineno(filename, 0) + + +class TestTracemallocEnabled(unittest.TestCase): + def setUp(self): + if tracemalloc.is_tracing(): + self.skipTest("tracemalloc must be stopped before the test") + + tracemalloc.set_traceback_limit(1) + tracemalloc.start() + + def tearDown(self): + tracemalloc.stop() + + def test_get_tracemalloc_memory(self): + data = [allocate_bytes(123) for count in range(1000)] + size = tracemalloc.get_tracemalloc_memory() + self.assertGreaterEqual(size, 0) + + tracemalloc.clear_traces() + size2 = tracemalloc.get_tracemalloc_memory() + self.assertGreaterEqual(size2, 0) + self.assertLessEqual(size2, size) + + def test_get_object_traceback(self): + tracemalloc.clear_traces() + obj_size = 12345 + obj, obj_traceback = allocate_bytes(obj_size) + traceback = tracemalloc.get_object_traceback(obj) + self.assertEqual(traceback, obj_traceback) + + def test_set_traceback_limit(self): + obj_size = 10 + + nframe = tracemalloc.get_traceback_limit() + self.addCleanup(tracemalloc.set_traceback_limit, nframe) + + self.assertRaises(ValueError, tracemalloc.set_traceback_limit, -1) + + tracemalloc.clear_traces() + tracemalloc.set_traceback_limit(10) + obj2, obj2_traceback = allocate_bytes(obj_size) + traceback = tracemalloc.get_object_traceback(obj2) + self.assertEqual(len(traceback), 10) + self.assertEqual(traceback, obj2_traceback) + + tracemalloc.clear_traces() + tracemalloc.set_traceback_limit(1) + obj, obj_traceback = allocate_bytes(obj_size) + traceback = tracemalloc.get_object_traceback(obj) + self.assertEqual(len(traceback), 1) + self.assertEqual(traceback, obj_traceback) + + + def find_trace(self, traces, traceback): + for trace in traces: + if trace[1] == traceback._frames: + return trace + + self.fail("trace not found") + + def test_get_traces(self): + tracemalloc.clear_traces() + obj_size = 12345 + obj, obj_traceback = allocate_bytes(obj_size) + + traces = tracemalloc._get_traces() + trace = self.find_trace(traces, obj_traceback) + + self.assertIsInstance(trace, tuple) + size, traceback = trace + self.assertEqual(size, obj_size) + self.assertEqual(traceback, obj_traceback._frames) + + tracemalloc.stop() + self.assertEqual(tracemalloc._get_traces(), []) + + + def test_get_traces_intern_traceback(self): + # dummy wrappers to get more useful and identical frames in the traceback + def allocate_bytes2(size): + return allocate_bytes(size) + def allocate_bytes3(size): + return allocate_bytes2(size) + def allocate_bytes4(size): + return allocate_bytes3(size) + + # Ensure that two identical tracebacks are not duplicated + tracemalloc.clear_traces() + tracemalloc.set_traceback_limit(4) + obj_size = 123 + obj1, obj1_traceback = allocate_bytes4(obj_size) + obj2, obj2_traceback = allocate_bytes4(obj_size) + + traces = tracemalloc._get_traces() + + trace1 = self.find_trace(traces, obj1_traceback) + trace2 = self.find_trace(traces, obj2_traceback) + size1, traceback1 = trace1 + size2, traceback2 = trace2 + self.assertEqual(traceback2, traceback1) + self.assertIs(traceback2, traceback1) + + def test_get_traced_memory(self): + # Python allocates some internals objects, so the test must tolerate + # a small difference between the expected size and the real usage + max_error = 2048 + + # allocate one object + obj_size = 1024 * 1024 + tracemalloc.clear_traces() + obj, obj_traceback = allocate_bytes(obj_size) + size, max_size = tracemalloc.get_traced_memory() + self.assertGreaterEqual(size, obj_size) + self.assertGreaterEqual(max_size, size) + + self.assertLessEqual(size - obj_size, max_error) + self.assertLessEqual(max_size - size, max_error) + + # destroy the object + obj = None + size2, max_size2 = tracemalloc.get_traced_memory() + self.assertLess(size2, size) + self.assertGreaterEqual(size - size2, obj_size - max_error) + self.assertGreaterEqual(max_size2, max_size) + + # clear_traces() must reset traced memory counters + tracemalloc.clear_traces() + self.assertEqual(tracemalloc.get_traced_memory(), (0, 0)) + + # allocate another object + obj, obj_traceback = allocate_bytes(obj_size) + size, max_size = tracemalloc.get_traced_memory() + self.assertGreater(size, 0) + + # stop() rests also traced memory counters + tracemalloc.stop() + self.assertEqual(tracemalloc.get_traced_memory(), (0, 0)) + + def test_clear_traces(self): + obj, obj_traceback = allocate_bytes(123) + traceback = tracemalloc.get_object_traceback(obj) + self.assertIsNotNone(traceback) + + tracemalloc.clear_traces() + traceback2 = tracemalloc.get_object_traceback(obj) + self.assertIsNone(traceback2) + + def test_is_tracing(self): + tracemalloc.stop() + self.assertFalse(tracemalloc.is_tracing()) + + tracemalloc.start() + self.assertTrue(tracemalloc.is_tracing()) + + def test_snapshot(self): + obj, source = allocate_bytes(123) + + # take a snapshot + snapshot = tracemalloc.take_snapshot() + + # write on disk + snapshot.dump(support.TESTFN) + self.addCleanup(support.unlink, support.TESTFN) + + # load from disk + snapshot2 = tracemalloc.Snapshot.load(support.TESTFN) + self.assertEqual(snapshot2.traces, snapshot.traces) + + # tracemalloc must be tracing memory allocations to take a snapshot + tracemalloc.stop() + with self.assertRaises(RuntimeError) as cm: + tracemalloc.take_snapshot() + self.assertEqual(str(cm.exception), + "the tracemalloc module must be tracing memory " + "allocations to take a snapshot") + + def test_snapshot_save_attr(self): + # take a snapshot with a new attribute + snapshot = tracemalloc.take_snapshot() + snapshot.test_attr = "new" + snapshot.dump(support.TESTFN) + self.addCleanup(support.unlink, support.TESTFN) + + # load() should recreates the attribute + snapshot2 = tracemalloc.Snapshot.load(support.TESTFN) + self.assertEqual(snapshot2.test_attr, "new") + + def fork_child(self): + if not tracemalloc.is_tracing(): + return 2 + + obj_size = 12345 + obj, obj_traceback = allocate_bytes(obj_size) + traceback = tracemalloc.get_object_traceback(obj) + if traceback is None: + return 3 + + # everything is fine + return 0 + + @unittest.skipUnless(hasattr(os, 'fork'), 'need os.fork()') + def test_fork(self): + # check that tracemalloc is still working after fork + pid = os.fork() + if not pid: + # child + exitcode = 1 + try: + exitcode = self.fork_child() + finally: + os._exit(exitcode) + else: + pid2, status = os.waitpid(pid, 0) + self.assertTrue(os.WIFEXITED(status)) + exitcode = os.WEXITSTATUS(status) + self.assertEqual(exitcode, 0) + + +class TestSnapshot(unittest.TestCase): + maxDiff = 4000 + + def test_create_snapshot(self): + raw_traces = [(5, (('a.py', 2),))] + + with contextlib.ExitStack() as stack: + stack.enter_context(patch.object(tracemalloc, 'is_tracing', + return_value=True)) + stack.enter_context(patch.object(tracemalloc, 'get_traceback_limit', + return_value=5)) + stack.enter_context(patch.object(tracemalloc, '_get_traces', + return_value=raw_traces)) + + snapshot = tracemalloc.take_snapshot() + self.assertEqual(snapshot.traceback_limit, 5) + self.assertEqual(len(snapshot.traces), 1) + trace = snapshot.traces[0] + self.assertEqual(trace.size, 5) + self.assertEqual(len(trace.traceback), 1) + self.assertEqual(trace.traceback[0].filename, 'a.py') + self.assertEqual(trace.traceback[0].lineno, 2) + + def test_filter_traces(self): + snapshot, snapshot2 = create_snapshots() + filter1 = tracemalloc.Filter(False, "b.py") + filter2 = tracemalloc.Filter(True, "a.py", 2) + filter3 = tracemalloc.Filter(True, "a.py", 5) + + original_traces = list(snapshot.traces._traces) + + # exclude b.py + snapshot3 = snapshot.filter_traces((filter1,)) + self.assertEqual(snapshot3.traces._traces, [ + (10, (('a.py', 2), ('b.py', 4))), + (10, (('a.py', 2), ('b.py', 4))), + (10, (('a.py', 2), ('b.py', 4))), + (2, (('a.py', 5), ('b.py', 4))), + (7, (('<unknown>', 0),)), + ]) + + # filter_traces() must not touch the original snapshot + self.assertEqual(snapshot.traces._traces, original_traces) + + # only include two lines of a.py + snapshot4 = snapshot3.filter_traces((filter2, filter3)) + self.assertEqual(snapshot4.traces._traces, [ + (10, (('a.py', 2), ('b.py', 4))), + (10, (('a.py', 2), ('b.py', 4))), + (10, (('a.py', 2), ('b.py', 4))), + (2, (('a.py', 5), ('b.py', 4))), + ]) + + # No filter: just duplicate the snapshot + snapshot5 = snapshot.filter_traces(()) + self.assertIsNot(snapshot5, snapshot) + self.assertIsNot(snapshot5.traces, snapshot.traces) + self.assertEqual(snapshot5.traces, snapshot.traces) + + def test_snapshot_group_by_line(self): + snapshot, snapshot2 = create_snapshots() + tb_0 = traceback_lineno('<unknown>', 0) + tb_a_2 = traceback_lineno('a.py', 2) + tb_a_5 = traceback_lineno('a.py', 5) + tb_b_1 = traceback_lineno('b.py', 1) + tb_c_578 = traceback_lineno('c.py', 578) + + # stats per file and line + stats1 = snapshot.statistics('lineno') + self.assertEqual(stats1, [ + tracemalloc.Statistic(tb_b_1, 66, 1), + tracemalloc.Statistic(tb_a_2, 30, 3), + tracemalloc.Statistic(tb_0, 7, 1), + tracemalloc.Statistic(tb_a_5, 2, 1), + ]) + + # stats per file and line (2) + stats2 = snapshot2.statistics('lineno') + self.assertEqual(stats2, [ + tracemalloc.Statistic(tb_a_5, 5002, 2), + tracemalloc.Statistic(tb_c_578, 400, 1), + tracemalloc.Statistic(tb_a_2, 30, 3), + ]) + + # stats diff per file and line + statistics = snapshot2.compare_to(snapshot, 'lineno') + self.assertEqual(statistics, [ + tracemalloc.StatisticDiff(tb_a_5, 5002, 5000, 2, 1), + tracemalloc.StatisticDiff(tb_c_578, 400, 400, 1, 1), + tracemalloc.StatisticDiff(tb_b_1, 0, -66, 0, -1), + tracemalloc.StatisticDiff(tb_0, 0, -7, 0, -1), + tracemalloc.StatisticDiff(tb_a_2, 30, 0, 3, 0), + ]) + + def test_snapshot_group_by_file(self): + snapshot, snapshot2 = create_snapshots() + tb_0 = traceback_filename('<unknown>') + tb_a = traceback_filename('a.py') + tb_b = traceback_filename('b.py') + tb_c = traceback_filename('c.py') + + # stats per file + stats1 = snapshot.statistics('filename') + self.assertEqual(stats1, [ + tracemalloc.Statistic(tb_b, 66, 1), + tracemalloc.Statistic(tb_a, 32, 4), + tracemalloc.Statistic(tb_0, 7, 1), + ]) + + # stats per file (2) + stats2 = snapshot2.statistics('filename') + self.assertEqual(stats2, [ + tracemalloc.Statistic(tb_a, 5032, 5), + tracemalloc.Statistic(tb_c, 400, 1), + ]) + + # stats diff per file + diff = snapshot2.compare_to(snapshot, 'filename') + self.assertEqual(diff, [ + tracemalloc.StatisticDiff(tb_a, 5032, 5000, 5, 1), + tracemalloc.StatisticDiff(tb_c, 400, 400, 1, 1), + tracemalloc.StatisticDiff(tb_b, 0, -66, 0, -1), + tracemalloc.StatisticDiff(tb_0, 0, -7, 0, -1), + ]) + + def test_snapshot_group_by_traceback(self): + snapshot, snapshot2 = create_snapshots() + + # stats per file + tb1 = traceback(('a.py', 2), ('b.py', 4)) + tb2 = traceback(('a.py', 5), ('b.py', 4)) + tb3 = traceback(('b.py', 1)) + tb4 = traceback(('<unknown>', 0)) + stats1 = snapshot.statistics('traceback') + self.assertEqual(stats1, [ + tracemalloc.Statistic(tb3, 66, 1), + tracemalloc.Statistic(tb1, 30, 3), + tracemalloc.Statistic(tb4, 7, 1), + tracemalloc.Statistic(tb2, 2, 1), + ]) + + # stats per file (2) + tb5 = traceback(('c.py', 578)) + stats2 = snapshot2.statistics('traceback') + self.assertEqual(stats2, [ + tracemalloc.Statistic(tb2, 5002, 2), + tracemalloc.Statistic(tb5, 400, 1), + tracemalloc.Statistic(tb1, 30, 3), + ]) + + # stats diff per file + diff = snapshot2.compare_to(snapshot, 'traceback') + self.assertEqual(diff, [ + tracemalloc.StatisticDiff(tb2, 5002, 5000, 2, 1), + tracemalloc.StatisticDiff(tb5, 400, 400, 1, 1), + tracemalloc.StatisticDiff(tb3, 0, -66, 0, -1), + tracemalloc.StatisticDiff(tb4, 0, -7, 0, -1), + tracemalloc.StatisticDiff(tb1, 30, 0, 3, 0), + ]) + + self.assertRaises(ValueError, + snapshot.statistics, 'traceback', cumulative=True) + + def test_snapshot_group_by_cumulative(self): + snapshot, snapshot2 = create_snapshots() + tb_0 = traceback_filename('<unknown>') + tb_a = traceback_filename('a.py') + tb_b = traceback_filename('b.py') + tb_a_2 = traceback_lineno('a.py', 2) + tb_a_5 = traceback_lineno('a.py', 5) + tb_b_1 = traceback_lineno('b.py', 1) + tb_b_4 = traceback_lineno('b.py', 4) + + # per file + stats = snapshot.statistics('filename', True) + self.assertEqual(stats, [ + tracemalloc.Statistic(tb_b, 98, 5), + tracemalloc.Statistic(tb_a, 32, 4), + tracemalloc.Statistic(tb_0, 7, 1), + ]) + + # per line + stats = snapshot.statistics('lineno', True) + self.assertEqual(stats, [ + tracemalloc.Statistic(tb_b_1, 66, 1), + tracemalloc.Statistic(tb_b_4, 32, 4), + tracemalloc.Statistic(tb_a_2, 30, 3), + tracemalloc.Statistic(tb_0, 7, 1), + tracemalloc.Statistic(tb_a_5, 2, 1), + ]) + + def test_trace_format(self): + snapshot, snapshot2 = create_snapshots() + trace = snapshot.traces[0] + self.assertEqual(str(trace), 'a.py:2: 10 B') + traceback = trace.traceback + self.assertEqual(str(traceback), 'a.py:2') + frame = traceback[0] + self.assertEqual(str(frame), 'a.py:2') + + def test_statistic_format(self): + snapshot, snapshot2 = create_snapshots() + stats = snapshot.statistics('lineno') + stat = stats[0] + self.assertEqual(str(stat), + 'b.py:1: size=66 B, count=1, average=66 B') + + def test_statistic_diff_format(self): + snapshot, snapshot2 = create_snapshots() + stats = snapshot2.compare_to(snapshot, 'lineno') + stat = stats[0] + self.assertEqual(str(stat), + 'a.py:5: size=5002 B (+5000 B), count=2 (+1), average=2501 B') + + + +class TestFilters(unittest.TestCase): + maxDiff = 2048 + + def test_filter_attributes(self): + # test default values + f = tracemalloc.Filter(True, "abc") + self.assertEqual(f.inclusive, True) + self.assertEqual(f.filename_pattern, "abc") + self.assertIsNone(f.lineno) + self.assertEqual(f.all_frames, False) + + # test custom values + f = tracemalloc.Filter(False, "test.py", 123, True) + self.assertEqual(f.inclusive, False) + self.assertEqual(f.filename_pattern, "test.py") + self.assertEqual(f.lineno, 123) + self.assertEqual(f.all_frames, True) + + # parameters passed by keyword + f = tracemalloc.Filter(inclusive=False, filename_pattern="test.py", lineno=123, all_frames=True) + self.assertEqual(f.inclusive, False) + self.assertEqual(f.filename_pattern, "test.py") + self.assertEqual(f.lineno, 123) + self.assertEqual(f.all_frames, True) + + # read-only attribute + self.assertRaises(AttributeError, setattr, f, "filename_pattern", "abc") + + def test_filter_match(self): + # filter without line number + f = tracemalloc.Filter(True, "abc") + self.assertTrue(f._match_frame("abc", 0)) + self.assertTrue(f._match_frame("abc", 5)) + self.assertTrue(f._match_frame("abc", 10)) + self.assertFalse(f._match_frame("12356", 0)) + self.assertFalse(f._match_frame("12356", 5)) + self.assertFalse(f._match_frame("12356", 10)) + + f = tracemalloc.Filter(False, "abc") + self.assertFalse(f._match_frame("abc", 0)) + self.assertFalse(f._match_frame("abc", 5)) + self.assertFalse(f._match_frame("abc", 10)) + self.assertTrue(f._match_frame("12356", 0)) + self.assertTrue(f._match_frame("12356", 5)) + self.assertTrue(f._match_frame("12356", 10)) + + # filter with line number > 0 + f = tracemalloc.Filter(True, "abc", 5) + self.assertFalse(f._match_frame("abc", 0)) + self.assertTrue(f._match_frame("abc", 5)) + self.assertFalse(f._match_frame("abc", 10)) + self.assertFalse(f._match_frame("12356", 0)) + self.assertFalse(f._match_frame("12356", 5)) + self.assertFalse(f._match_frame("12356", 10)) + + f = tracemalloc.Filter(False, "abc", 5) + self.assertTrue(f._match_frame("abc", 0)) + self.assertFalse(f._match_frame("abc", 5)) + self.assertTrue(f._match_frame("abc", 10)) + self.assertTrue(f._match_frame("12356", 0)) + self.assertTrue(f._match_frame("12356", 5)) + self.assertTrue(f._match_frame("12356", 10)) + + # filter with line number 0 + f = tracemalloc.Filter(True, "abc", 0) + self.assertTrue(f._match_frame("abc", 0)) + self.assertFalse(f._match_frame("abc", 5)) + self.assertFalse(f._match_frame("abc", 10)) + self.assertFalse(f._match_frame("12356", 0)) + self.assertFalse(f._match_frame("12356", 5)) + self.assertFalse(f._match_frame("12356", 10)) + + f = tracemalloc.Filter(False, "abc", 0) + self.assertFalse(f._match_frame("abc", 0)) + self.assertTrue(f._match_frame("abc", 5)) + self.assertTrue(f._match_frame("abc", 10)) + self.assertTrue(f._match_frame("12356", 0)) + self.assertTrue(f._match_frame("12356", 5)) + self.assertTrue(f._match_frame("12356", 10)) + + def test_filter_match_filename(self): + def fnmatch(inclusive, filename, pattern): + f = tracemalloc.Filter(inclusive, pattern) + return f._match_frame(filename, 0) + + self.assertTrue(fnmatch(True, "abc", "abc")) + self.assertFalse(fnmatch(True, "12356", "abc")) + self.assertFalse(fnmatch(True, "<unknown>", "abc")) + + self.assertFalse(fnmatch(False, "abc", "abc")) + self.assertTrue(fnmatch(False, "12356", "abc")) + self.assertTrue(fnmatch(False, "<unknown>", "abc")) + + def test_filter_match_filename_joker(self): + def fnmatch(filename, pattern): + filter = tracemalloc.Filter(True, pattern) + return filter._match_frame(filename, 0) + + # empty string + self.assertFalse(fnmatch('abc', '')) + self.assertFalse(fnmatch('', 'abc')) + self.assertTrue(fnmatch('', '')) + self.assertTrue(fnmatch('', '*')) + + # no * + self.assertTrue(fnmatch('abc', 'abc')) + self.assertFalse(fnmatch('abc', 'abcd')) + self.assertFalse(fnmatch('abc', 'def')) + + # a* + self.assertTrue(fnmatch('abc', 'a*')) + self.assertTrue(fnmatch('abc', 'abc*')) + self.assertFalse(fnmatch('abc', 'b*')) + self.assertFalse(fnmatch('abc', 'abcd*')) + + # a*b + self.assertTrue(fnmatch('abc', 'a*c')) + self.assertTrue(fnmatch('abcdcx', 'a*cx')) + self.assertFalse(fnmatch('abb', 'a*c')) + self.assertFalse(fnmatch('abcdce', 'a*cx')) + + # a*b*c + self.assertTrue(fnmatch('abcde', 'a*c*e')) + self.assertTrue(fnmatch('abcbdefeg', 'a*bd*eg')) + self.assertFalse(fnmatch('abcdd', 'a*c*e')) + self.assertFalse(fnmatch('abcbdefef', 'a*bd*eg')) + + # replace .pyc and .pyo suffix with .py + self.assertTrue(fnmatch('a.pyc', 'a.py')) + self.assertTrue(fnmatch('a.pyo', 'a.py')) + self.assertTrue(fnmatch('a.py', 'a.pyc')) + self.assertTrue(fnmatch('a.py', 'a.pyo')) + + if os.name == 'nt': + # case insensitive + self.assertTrue(fnmatch('aBC', 'ABc')) + self.assertTrue(fnmatch('aBcDe', 'Ab*dE')) + + self.assertTrue(fnmatch('a.pyc', 'a.PY')) + self.assertTrue(fnmatch('a.PYO', 'a.py')) + self.assertTrue(fnmatch('a.py', 'a.PYC')) + self.assertTrue(fnmatch('a.PY', 'a.pyo')) + else: + # case sensitive + self.assertFalse(fnmatch('aBC', 'ABc')) + self.assertFalse(fnmatch('aBcDe', 'Ab*dE')) + + self.assertFalse(fnmatch('a.pyc', 'a.PY')) + self.assertFalse(fnmatch('a.PYO', 'a.py')) + self.assertFalse(fnmatch('a.py', 'a.PYC')) + self.assertFalse(fnmatch('a.PY', 'a.pyo')) + + if os.name == 'nt': + # normalize alternate separator "/" to the standard separator "\" + self.assertTrue(fnmatch(r'a/b', r'a\b')) + self.assertTrue(fnmatch(r'a\b', r'a/b')) + self.assertTrue(fnmatch(r'a/b\c', r'a\b/c')) + self.assertTrue(fnmatch(r'a/b/c', r'a\b\c')) + else: + # there is no alternate separator + self.assertFalse(fnmatch(r'a/b', r'a\b')) + self.assertFalse(fnmatch(r'a\b', r'a/b')) + self.assertFalse(fnmatch(r'a/b\c', r'a\b/c')) + self.assertFalse(fnmatch(r'a/b/c', r'a\b\c')) + + def test_filter_match_trace(self): + t1 = (("a.py", 2), ("b.py", 3)) + t2 = (("b.py", 4), ("b.py", 5)) + t3 = (("c.py", 5), ('<unknown>', 0)) + unknown = (('<unknown>', 0),) + + f = tracemalloc.Filter(True, "b.py", all_frames=True) + self.assertTrue(f._match_traceback(t1)) + self.assertTrue(f._match_traceback(t2)) + self.assertFalse(f._match_traceback(t3)) + self.assertFalse(f._match_traceback(unknown)) + + f = tracemalloc.Filter(True, "b.py", all_frames=False) + self.assertFalse(f._match_traceback(t1)) + self.assertTrue(f._match_traceback(t2)) + self.assertFalse(f._match_traceback(t3)) + self.assertFalse(f._match_traceback(unknown)) + + f = tracemalloc.Filter(False, "b.py", all_frames=True) + self.assertFalse(f._match_traceback(t1)) + self.assertFalse(f._match_traceback(t2)) + self.assertTrue(f._match_traceback(t3)) + self.assertTrue(f._match_traceback(unknown)) + + f = tracemalloc.Filter(False, "b.py", all_frames=False) + self.assertTrue(f._match_traceback(t1)) + self.assertFalse(f._match_traceback(t2)) + self.assertTrue(f._match_traceback(t3)) + self.assertTrue(f._match_traceback(unknown)) + + f = tracemalloc.Filter(False, "<unknown>", all_frames=False) + self.assertTrue(f._match_traceback(t1)) + self.assertTrue(f._match_traceback(t2)) + self.assertTrue(f._match_traceback(t3)) + self.assertFalse(f._match_traceback(unknown)) + + f = tracemalloc.Filter(True, "<unknown>", all_frames=True) + self.assertFalse(f._match_traceback(t1)) + self.assertFalse(f._match_traceback(t2)) + self.assertTrue(f._match_traceback(t3)) + self.assertTrue(f._match_traceback(unknown)) + + f = tracemalloc.Filter(False, "<unknown>", all_frames=True) + self.assertTrue(f._match_traceback(t1)) + self.assertTrue(f._match_traceback(t2)) + self.assertFalse(f._match_traceback(t3)) + self.assertFalse(f._match_traceback(unknown)) + + +class TestCommandLine(unittest.TestCase): + def test_env_var(self): + # not tracing by default + code = 'import tracemalloc; print(tracemalloc.is_tracing())' + ok, stdout, stderr = assert_python_ok('-c', code) + stdout = stdout.rstrip() + self.assertEqual(stdout, b'False') + + # PYTHON* environment varibles must be ignored when -E option is + # present + code = 'import tracemalloc; print(tracemalloc.is_tracing())' + ok, stdout, stderr = assert_python_ok('-E', '-c', code, PYTHONTRACEMALLOC='1') + stdout = stdout.rstrip() + self.assertEqual(stdout, b'False') + + # tracing at startup + code = 'import tracemalloc; print(tracemalloc.is_tracing())' + ok, stdout, stderr = assert_python_ok('-c', code, PYTHONTRACEMALLOC='1') + stdout = stdout.rstrip() + self.assertEqual(stdout, b'True') + + # start and set the number of frames + code = 'import tracemalloc; print(tracemalloc.get_traceback_limit())' + ok, stdout, stderr = assert_python_ok('-c', code, PYTHONTRACEMALLOC='10') + stdout = stdout.rstrip() + self.assertEqual(stdout, b'10') + + def test_env_var_invalid(self): + for nframe in (-1, 0, 5000): + with self.subTest(nframe=nframe): + with support.SuppressCrashReport(): + ok, stdout, stderr = assert_python_failure( + '-c', 'pass', + PYTHONTRACEMALLOC=str(nframe)) + self.assertIn(b'PYTHONTRACEMALLOC must be an integer ' + b'in range [1; 100]', + stderr) + + def test_sys_xoptions(self): + for xoptions, nframe in ( + ('tracemalloc', 1), + ('tracemalloc=1', 1), + ('tracemalloc=15', 15), + ): + with self.subTest(xoptions=xoptions, nframe=nframe): + code = 'import tracemalloc; print(tracemalloc.get_traceback_limit())' + ok, stdout, stderr = assert_python_ok('-X', xoptions, '-c', code) + stdout = stdout.rstrip() + self.assertEqual(stdout, str(nframe).encode('ascii')) + + def test_sys_xoptions_invalid(self): + for nframe in (-1, 0, 5000): + with self.subTest(nframe=nframe): + with support.SuppressCrashReport(): + args = ('-X', 'tracemalloc=%s' % nframe, '-c', 'pass') + ok, stdout, stderr = assert_python_failure(*args) + self.assertIn(b'-X tracemalloc=NFRAME: number of frame must ' + b'be an integer in range [1; 100]', + stderr) + + +def test_main(): + support.run_unittest( + TestTracemallocEnabled, + TestSnapshot, + TestFilters, + TestCommandLine, + ) + +if __name__ == "__main__": + test_main() |