diff options
Diffstat (limited to 'Lib/test/test_weakref.py')
-rw-r--r-- | Lib/test/test_weakref.py | 316 |
1 files changed, 315 insertions, 1 deletions
diff --git a/Lib/test/test_weakref.py b/Lib/test/test_weakref.py index 571e33f..551d95c 100644 --- a/Lib/test/test_weakref.py +++ b/Lib/test/test_weakref.py @@ -7,11 +7,15 @@ import operator import contextlib import copy -from test import support +from test import support, script_helper # Used in ReferencesTestCase.test_ref_created_during_del() . ref_from_del = None +# Used by FinalizeTestCase as a global that may be replaced by None +# when the interpreter shuts down. +_global_var = 'foobar' + class C: def method(self): pass @@ -47,6 +51,11 @@ class Object: return NotImplemented def __hash__(self): return hash(self.arg) + def some_method(self): + return 4 + def other_method(self): + return 5 + class RefCycle: def __init__(self): @@ -797,6 +806,30 @@ class ReferencesTestCase(TestBase): del root gc.collect() + def test_callback_attribute(self): + x = Object(1) + callback = lambda ref: None + ref1 = weakref.ref(x, callback) + self.assertIs(ref1.__callback__, callback) + + ref2 = weakref.ref(x) + self.assertIsNone(ref2.__callback__) + + def test_callback_attribute_after_deletion(self): + x = Object(1) + ref = weakref.ref(x, self.callback) + self.assertIsNotNone(ref.__callback__) + del x + support.gc_collect() + self.assertIsNone(ref.__callback__) + + def test_set_callback_attribute(self): + x = Object(1) + callback = lambda ref: None + ref1 = weakref.ref(x, callback) + with self.assertRaises(AttributeError): + ref1.__callback__ = lambda ref: None + class SubclassableWeakrefTestCase(TestBase): @@ -901,6 +934,140 @@ class SubclassableWeakrefTestCase(TestBase): self.assertEqual(self.cbcalled, 0) +class WeakMethodTestCase(unittest.TestCase): + + def _subclass(self): + """Return a Object subclass overriding `some_method`.""" + class C(Object): + def some_method(self): + return 6 + return C + + def test_alive(self): + o = Object(1) + r = weakref.WeakMethod(o.some_method) + self.assertIsInstance(r, weakref.ReferenceType) + self.assertIsInstance(r(), type(o.some_method)) + self.assertIs(r().__self__, o) + self.assertIs(r().__func__, o.some_method.__func__) + self.assertEqual(r()(), 4) + + def test_object_dead(self): + o = Object(1) + r = weakref.WeakMethod(o.some_method) + del o + gc.collect() + self.assertIs(r(), None) + + def test_method_dead(self): + C = self._subclass() + o = C(1) + r = weakref.WeakMethod(o.some_method) + del C.some_method + gc.collect() + self.assertIs(r(), None) + + def test_callback_when_object_dead(self): + # Test callback behaviour when object dies first. + C = self._subclass() + calls = [] + def cb(arg): + calls.append(arg) + o = C(1) + r = weakref.WeakMethod(o.some_method, cb) + del o + gc.collect() + self.assertEqual(calls, [r]) + # Callback is only called once. + C.some_method = Object.some_method + gc.collect() + self.assertEqual(calls, [r]) + + def test_callback_when_method_dead(self): + # Test callback behaviour when method dies first. + C = self._subclass() + calls = [] + def cb(arg): + calls.append(arg) + o = C(1) + r = weakref.WeakMethod(o.some_method, cb) + del C.some_method + gc.collect() + self.assertEqual(calls, [r]) + # Callback is only called once. + del o + gc.collect() + self.assertEqual(calls, [r]) + + @support.cpython_only + def test_no_cycles(self): + # A WeakMethod doesn't create any reference cycle to itself. + o = Object(1) + def cb(_): + pass + r = weakref.WeakMethod(o.some_method, cb) + wr = weakref.ref(r) + del r + self.assertIs(wr(), None) + + def test_equality(self): + def _eq(a, b): + self.assertTrue(a == b) + self.assertFalse(a != b) + def _ne(a, b): + self.assertTrue(a != b) + self.assertFalse(a == b) + x = Object(1) + y = Object(1) + a = weakref.WeakMethod(x.some_method) + b = weakref.WeakMethod(y.some_method) + c = weakref.WeakMethod(x.other_method) + d = weakref.WeakMethod(y.other_method) + # Objects equal, same method + _eq(a, b) + _eq(c, d) + # Objects equal, different method + _ne(a, c) + _ne(a, d) + _ne(b, c) + _ne(b, d) + # Objects unequal, same or different method + z = Object(2) + e = weakref.WeakMethod(z.some_method) + f = weakref.WeakMethod(z.other_method) + _ne(a, e) + _ne(a, f) + _ne(b, e) + _ne(b, f) + del x, y, z + gc.collect() + # Dead WeakMethods compare by identity + refs = a, b, c, d, e, f + for q in refs: + for r in refs: + self.assertEqual(q == r, q is r) + self.assertEqual(q != r, q is not r) + + def test_hashing(self): + # Alive WeakMethods are hashable if the underlying object is + # hashable. + x = Object(1) + y = Object(1) + a = weakref.WeakMethod(x.some_method) + b = weakref.WeakMethod(y.some_method) + c = weakref.WeakMethod(y.other_method) + # Since WeakMethod objects are equal, the hashes should be equal. + self.assertEqual(hash(a), hash(b)) + ha = hash(a) + # Dead WeakMethods retain their old hash value + del x, y + gc.collect() + self.assertEqual(hash(a), ha) + self.assertEqual(hash(b), ha) + # If it wasn't hashed when alive, a dead WeakMethod cannot be hashed. + self.assertRaises(TypeError, hash, c) + + class MappingTestCase(TestBase): COUNT = 10 @@ -1388,6 +1555,151 @@ class WeakKeyDictionaryTestCase(mapping_tests.BasicTestMappingProtocol): def _reference(self): return self.__ref.copy() + +class FinalizeTestCase(unittest.TestCase): + + class A: + pass + + def _collect_if_necessary(self): + # we create no ref-cycles so in CPython no gc should be needed + if sys.implementation.name != 'cpython': + support.gc_collect() + + def test_finalize(self): + def add(x,y,z): + res.append(x + y + z) + return x + y + z + + a = self.A() + + res = [] + f = weakref.finalize(a, add, 67, 43, z=89) + self.assertEqual(f.alive, True) + self.assertEqual(f.peek(), (a, add, (67,43), {'z':89})) + self.assertEqual(f(), 199) + self.assertEqual(f(), None) + self.assertEqual(f(), None) + self.assertEqual(f.peek(), None) + self.assertEqual(f.detach(), None) + self.assertEqual(f.alive, False) + self.assertEqual(res, [199]) + + res = [] + f = weakref.finalize(a, add, 67, 43, 89) + self.assertEqual(f.peek(), (a, add, (67,43,89), {})) + self.assertEqual(f.detach(), (a, add, (67,43,89), {})) + self.assertEqual(f(), None) + self.assertEqual(f(), None) + self.assertEqual(f.peek(), None) + self.assertEqual(f.detach(), None) + self.assertEqual(f.alive, False) + self.assertEqual(res, []) + + res = [] + f = weakref.finalize(a, add, x=67, y=43, z=89) + del a + self._collect_if_necessary() + self.assertEqual(f(), None) + self.assertEqual(f(), None) + self.assertEqual(f.peek(), None) + self.assertEqual(f.detach(), None) + self.assertEqual(f.alive, False) + self.assertEqual(res, [199]) + + def test_order(self): + a = self.A() + res = [] + + f1 = weakref.finalize(a, res.append, 'f1') + f2 = weakref.finalize(a, res.append, 'f2') + f3 = weakref.finalize(a, res.append, 'f3') + f4 = weakref.finalize(a, res.append, 'f4') + f5 = weakref.finalize(a, res.append, 'f5') + + # make sure finalizers can keep themselves alive + del f1, f4 + + self.assertTrue(f2.alive) + self.assertTrue(f3.alive) + self.assertTrue(f5.alive) + + self.assertTrue(f5.detach()) + self.assertFalse(f5.alive) + + f5() # nothing because previously unregistered + res.append('A') + f3() # => res.append('f3') + self.assertFalse(f3.alive) + res.append('B') + f3() # nothing because previously called + res.append('C') + del a + self._collect_if_necessary() + # => res.append('f4') + # => res.append('f2') + # => res.append('f1') + self.assertFalse(f2.alive) + res.append('D') + f2() # nothing because previously called by gc + + expected = ['A', 'f3', 'B', 'C', 'f4', 'f2', 'f1', 'D'] + self.assertEqual(res, expected) + + def test_all_freed(self): + # we want a weakrefable subclass of weakref.finalize + class MyFinalizer(weakref.finalize): + pass + + a = self.A() + res = [] + def callback(): + res.append(123) + f = MyFinalizer(a, callback) + + wr_callback = weakref.ref(callback) + wr_f = weakref.ref(f) + del callback, f + + self.assertIsNotNone(wr_callback()) + self.assertIsNotNone(wr_f()) + + del a + self._collect_if_necessary() + + self.assertIsNone(wr_callback()) + self.assertIsNone(wr_f()) + self.assertEqual(res, [123]) + + @classmethod + def run_in_child(cls): + def error(): + # Create an atexit finalizer from inside a finalizer called + # at exit. This should be the next to be run. + g1 = weakref.finalize(cls, print, 'g1') + print('f3 error') + 1/0 + + # cls should stay alive till atexit callbacks run + f1 = weakref.finalize(cls, print, 'f1', _global_var) + f2 = weakref.finalize(cls, print, 'f2', _global_var) + f3 = weakref.finalize(cls, error) + f4 = weakref.finalize(cls, print, 'f4', _global_var) + + assert f1.atexit == True + f2.atexit = False + assert f3.atexit == True + assert f4.atexit == True + + def test_atexit(self): + prog = ('from test.test_weakref import FinalizeTestCase;'+ + 'FinalizeTestCase.run_in_child()') + rc, out, err = script_helper.assert_python_ok('-c', prog) + out = out.decode('ascii').splitlines() + self.assertEqual(out, ['f4 foobar', 'f3 error', 'g1', 'f1 foobar']) + self.assertTrue(b'ZeroDivisionError' in err) + + libreftest = """ Doctest for examples in the library reference: weakref.rst >>> import weakref @@ -1476,10 +1788,12 @@ __test__ = {'libreftest' : libreftest} def test_main(): support.run_unittest( ReferencesTestCase, + WeakMethodTestCase, MappingTestCase, WeakValueDictionaryTestCase, WeakKeyDictionaryTestCase, SubclassableWeakrefTestCase, + FinalizeTestCase, ) support.run_doctest(sys.modules[__name__]) |