diff options
Diffstat (limited to 'Lib/test/test_gc.py')
-rw-r--r-- | Lib/test/test_gc.py | 143 |
1 files changed, 141 insertions, 2 deletions
diff --git a/Lib/test/test_gc.py b/Lib/test/test_gc.py index 100c767..d35f9ed 100644 --- a/Lib/test/test_gc.py +++ b/Lib/test/test_gc.py @@ -1,5 +1,6 @@ import unittest -from test.support import verbose, run_unittest, strip_python_stderr +from test.support import (verbose, refcount_test, run_unittest, + strip_python_stderr) import sys import gc import weakref @@ -31,6 +32,20 @@ class GC_Detector(object): # gc collects it. self.wr = weakref.ref(C1055820(666), it_happened) +class Uncollectable(object): + """Create a reference cycle with multiple __del__ methods. + + An object in a reference cycle will never have zero references, + and so must be garbage collected. If one or more objects in the + cycle have __del__ methods, the gc refuses to guess an order, + and leaves the cycle uncollected.""" + def __init__(self, partner=None): + if partner is None: + self.partner = Uncollectable(partner=self) + else: + self.partner = partner + def __del__(self): + pass ### Tests ############################################################################### @@ -175,6 +190,7 @@ class GCTests(unittest.TestCase): del d self.assertEqual(gc.collect(), 2) + @refcount_test def test_frame(self): def f(): frame = sys._getframe() @@ -242,6 +258,7 @@ class GCTests(unittest.TestCase): # For example, disposed tuples are not freed, but reused. # To minimize variations, though, we first store the get_count() results # and check them at the end. + @refcount_test def test_get_count(self): gc.collect() a, b, c = gc.get_count() @@ -255,6 +272,7 @@ class GCTests(unittest.TestCase): # created (the list). self.assertGreater(d, a) + @refcount_test def test_collect_generations(self): gc.collect() # This object will "trickle" into generation N + 1 after @@ -524,6 +542,127 @@ class GCTests(unittest.TestCase): self.assertNotIn(b"uncollectable objects at shutdown", stderr) +class GCCallbackTests(unittest.TestCase): + def setUp(self): + # Save gc state and disable it. + self.enabled = gc.isenabled() + gc.disable() + self.debug = gc.get_debug() + gc.set_debug(0) + gc.callbacks.append(self.cb1) + gc.callbacks.append(self.cb2) + self.othergarbage = [] + + def tearDown(self): + # Restore gc state + del self.visit + gc.callbacks.remove(self.cb1) + gc.callbacks.remove(self.cb2) + gc.set_debug(self.debug) + if self.enabled: + gc.enable() + # destroy any uncollectables + gc.collect() + for obj in gc.garbage: + if isinstance(obj, Uncollectable): + obj.partner = None + del gc.garbage[:] + del self.othergarbage + gc.collect() + + def preclean(self): + # Remove all fluff from the system. Invoke this function + # manually rather than through self.setUp() for maximum + # safety. + self.visit = [] + gc.collect() + garbage, gc.garbage[:] = gc.garbage[:], [] + self.othergarbage.append(garbage) + self.visit = [] + + def cb1(self, phase, info): + self.visit.append((1, phase, dict(info))) + + def cb2(self, phase, info): + self.visit.append((2, phase, dict(info))) + if phase == "stop" and hasattr(self, "cleanup"): + # Clean Uncollectable from garbage + uc = [e for e in gc.garbage if isinstance(e, Uncollectable)] + gc.garbage[:] = [e for e in gc.garbage + if not isinstance(e, Uncollectable)] + for e in uc: + e.partner = None + + def test_collect(self): + self.preclean() + gc.collect() + # Algorithmically verify the contents of self.visit + # because it is long and tortuous. + + # Count the number of visits to each callback + n = [v[0] for v in self.visit] + n1 = [i for i in n if i == 1] + n2 = [i for i in n if i == 2] + self.assertEqual(n1, [1]*2) + self.assertEqual(n2, [2]*2) + + # Count that we got the right number of start and stop callbacks. + n = [v[1] for v in self.visit] + n1 = [i for i in n if i == "start"] + n2 = [i for i in n if i == "stop"] + self.assertEqual(n1, ["start"]*2) + self.assertEqual(n2, ["stop"]*2) + + # Check that we got the right info dict for all callbacks + for v in self.visit: + info = v[2] + self.assertTrue("generation" in info) + self.assertTrue("collected" in info) + self.assertTrue("uncollectable" in info) + + def test_collect_generation(self): + self.preclean() + gc.collect(2) + for v in self.visit: + info = v[2] + self.assertEqual(info["generation"], 2) + + def test_collect_garbage(self): + self.preclean() + # Each of these cause four objects to be garbage: Two + # Uncolectables and their instance dicts. + Uncollectable() + Uncollectable() + C1055820(666) + gc.collect() + for v in self.visit: + if v[1] != "stop": + continue + info = v[2] + self.assertEqual(info["collected"], 2) + self.assertEqual(info["uncollectable"], 8) + + # We should now have the Uncollectables in gc.garbage + self.assertEqual(len(gc.garbage), 4) + for e in gc.garbage: + self.assertIsInstance(e, Uncollectable) + + # Now, let our callback handle the Uncollectable instances + self.cleanup=True + self.visit = [] + gc.garbage[:] = [] + gc.collect() + for v in self.visit: + if v[1] != "stop": + continue + info = v[2] + self.assertEqual(info["collected"], 0) + self.assertEqual(info["uncollectable"], 4) + + # Uncollectables should be gone + self.assertEqual(len(gc.garbage), 0) + + class GCTogglingTests(unittest.TestCase): def setUp(self): gc.enable() @@ -677,7 +816,7 @@ def test_main(): try: gc.collect() # Delete 2nd generation garbage - run_unittest(GCTests, GCTogglingTests) + run_unittest(GCTests, GCTogglingTests, GCCallbackTests) finally: gc.set_debug(debug) # test gc.enable() even if GC is disabled by default |