diff options
Diffstat (limited to 'Lib/test/test_weakref.py')
-rw-r--r-- | Lib/test/test_weakref.py | 775 |
1 files changed, 121 insertions, 654 deletions
diff --git a/Lib/test/test_weakref.py b/Lib/test/test_weakref.py index 228bc17..418481d 100644 --- a/Lib/test/test_weakref.py +++ b/Lib/test/test_weakref.py @@ -1,25 +1,18 @@ import gc import sys import unittest -import collections +import UserList import weakref import operator import contextlib import copy -import threading import time -import random -from test import support -from test.support import script_helper, ALWAYS_EQ +from test import test_support # Used in ReferencesTestCase.test_ref_created_during_del() . ref_from_del = None -# Used by FinalizeTestCase as a global that may be replaced by None -# when the interpreter shuts down. -_global_var = 'foobar' - class C: def method(self): pass @@ -39,6 +32,9 @@ def create_function(): def create_bound_method(): return C().method +def create_unbound_method(): + return C.method + class Object: def __init__(self, arg): @@ -49,37 +45,24 @@ class Object: if isinstance(other, Object): return self.arg == other.arg return NotImplemented - def __lt__(self, other): + def __ne__(self, other): if isinstance(other, Object): - return self.arg < other.arg + return self.arg != other.arg return NotImplemented def __hash__(self): return hash(self.arg) - def some_method(self): - return 4 - def other_method(self): - return 5 - class RefCycle: def __init__(self): self.cycle = self -class TestBase(unittest.TestCase): - - def setUp(self): - self.cbcalled = 0 - - def callback(self, ref): - self.cbcalled += 1 - - @contextlib.contextmanager -def collect_in_thread(period=0.0001): +def collect_in_thread(period=0.001): """ Ensure GC collections happen in a different thread, at a high frequency. """ + threading = test_support.import_module('threading') please_stop = False def collect(): @@ -87,7 +70,9 @@ def collect_in_thread(period=0.0001): time.sleep(period) gc.collect() - with support.disable_gc(): + with test_support.disable_gc(): + old_interval = sys.getcheckinterval() + sys.setcheckinterval(20) t = threading.Thread(target=collect) t.start() try: @@ -95,6 +80,16 @@ def collect_in_thread(period=0.0001): finally: please_stop = True t.join() + sys.setcheckinterval(old_interval) + + +class TestBase(unittest.TestCase): + + def setUp(self): + self.cbcalled = 0 + + def callback(self, ref): + self.cbcalled += 1 class ReferencesTestCase(TestBase): @@ -103,6 +98,7 @@ class ReferencesTestCase(TestBase): self.check_basic_ref(C) self.check_basic_ref(create_function) self.check_basic_ref(create_bound_method) + self.check_basic_ref(create_unbound_method) # Just make sure the tp_repr handler doesn't raise an exception. # Live reference: @@ -117,18 +113,7 @@ class ReferencesTestCase(TestBase): self.check_basic_callback(C) self.check_basic_callback(create_function) self.check_basic_callback(create_bound_method) - - @support.cpython_only - def test_cfunction(self): - import _testcapi - create_cfunction = _testcapi.create_cfunction - f = create_cfunction() - wr = weakref.ref(f) - self.assertIs(wr(), f) - del f - self.assertIsNone(wr()) - self.check_basic_ref(create_cfunction) - self.check_basic_callback(create_cfunction) + self.check_basic_callback(create_unbound_method) def test_multiple_callbacks(self): o = C() @@ -172,9 +157,9 @@ class ReferencesTestCase(TestBase): def check(proxy): proxy.bar - self.assertRaises(ReferenceError, check, ref1) - self.assertRaises(ReferenceError, check, ref2) - self.assertRaises(ReferenceError, bool, weakref.proxy(C())) + self.assertRaises(weakref.ReferenceError, check, ref1) + self.assertRaises(weakref.ReferenceError, check, ref2) + self.assertRaises(weakref.ReferenceError, bool, weakref.proxy(C())) self.assertEqual(self.cbcalled, 2) def check_basic_ref(self, factory): @@ -230,40 +215,42 @@ class ReferencesTestCase(TestBase): o = C() self.check_proxy(o, weakref.proxy(o)) - L = collections.UserList() + L = UserList.UserList() p = weakref.proxy(L) self.assertFalse(p, "proxy for empty UserList should be false") p.append(12) self.assertEqual(len(L), 1) self.assertTrue(p, "proxy for non-empty UserList should be true") - p[:] = [2, 3] + with test_support.check_py3k_warnings(): + p[:] = [2, 3] self.assertEqual(len(L), 2) self.assertEqual(len(p), 2) self.assertIn(3, p, "proxy didn't support __contains__() properly") p[1] = 5 self.assertEqual(L[1], 5) self.assertEqual(p[1], 5) - L2 = collections.UserList(L) + L2 = UserList.UserList(L) p2 = weakref.proxy(L2) self.assertEqual(p, p2) ## self.assertEqual(repr(L2), repr(p2)) - L3 = collections.UserList(range(10)) + L3 = UserList.UserList(range(10)) p3 = weakref.proxy(L3) - self.assertEqual(L3[:], p3[:]) - self.assertEqual(L3[5:], p3[5:]) - self.assertEqual(L3[:5], p3[:5]) - self.assertEqual(L3[2:5], p3[2:5]) + with test_support.check_py3k_warnings(): + self.assertEqual(L3[:], p3[:]) + self.assertEqual(L3[5:], p3[5:]) + self.assertEqual(L3[:5], p3[:5]) + self.assertEqual(L3[2:5], p3[2:5]) def test_proxy_unicode(self): # See bug 5037 class C(object): def __str__(self): return "string" - def __bytes__(self): - return b"bytes" + def __unicode__(self): + return u"unicode" instance = C() - self.assertIn("__bytes__", dir(weakref.proxy(instance))) - self.assertEqual(bytes(weakref.proxy(instance)), b"bytes") + self.assertIn("__unicode__", dir(weakref.proxy(instance))) + self.assertEqual(unicode(weakref.proxy(instance)), u"unicode") def test_proxy_index(self): class C: @@ -285,21 +272,6 @@ class ReferencesTestCase(TestBase): p //= 5 self.assertEqual(p, 21) - def test_proxy_matmul(self): - class C: - def __matmul__(self, other): - return 1729 - def __rmatmul__(self, other): - return -163 - def __imatmul__(self, other): - return 561 - o = C() - p = weakref.proxy(o) - self.assertEqual(p @ 5, 1729) - self.assertEqual(5 @ p, -163) - p @= 5 - self.assertEqual(p, 561) - # The PyWeakref_* C API is documented as allowing either NULL or # None as the value for the callback, where either means "no # callback". The "no callback" ref and proxy objects are supposed @@ -391,26 +363,6 @@ class ReferencesTestCase(TestBase): lyst = List() self.assertEqual(bool(weakref.proxy(lyst)), bool(lyst)) - def test_proxy_iter(self): - # Test fails with a debug build of the interpreter - # (see bpo-38395). - - obj = None - - class MyObj: - def __iter__(self): - nonlocal obj - del obj - return NotImplemented - - obj = MyObj() - p = weakref.proxy(obj) - with self.assertRaises(TypeError): - # "blech" in p calls MyObj.__iter__ through the proxy, - # without keeping a reference to the real object, so it - # can be killed in the middle of the call - "blech" in p - def test_getweakrefcount(self): o = C() ref1 = weakref.ref(o) @@ -649,7 +601,7 @@ class ReferencesTestCase(TestBase): del c1, c2, C, D gc.collect() - @support.requires_type_collecting + @test_support.requires_type_collecting def test_callback_in_cycle_resurrection(self): import gc @@ -786,9 +738,12 @@ class ReferencesTestCase(TestBase): gc.collect() def test_classes(self): - # Check that classes are weakrefable. + # Check that both old-style classes and new-style classes + # are weakrefable. class A(object): pass + class B: + pass l = [] weakref.ref(int) a = weakref.ref(A, l.append) @@ -796,6 +751,11 @@ class ReferencesTestCase(TestBase): gc.collect() self.assertEqual(a(), None) self.assertEqual(l, [a]) + b = weakref.ref(B, l.append) + B = None + gc.collect() + self.assertEqual(b(), None) + self.assertEqual(l, [a, b]) def test_equality(self): # Alive weakrefs defer equality testing to their underlying object. @@ -814,10 +774,6 @@ class ReferencesTestCase(TestBase): self.assertTrue(a != c) self.assertTrue(a == d) self.assertFalse(a != d) - self.assertFalse(a == x) - self.assertTrue(a != x) - self.assertTrue(a == ALWAYS_EQ) - self.assertFalse(a != ALWAYS_EQ) del x, y, z gc.collect() for r in a, b, c: @@ -834,21 +790,6 @@ class ReferencesTestCase(TestBase): self.assertEqual(a == d, a is d) self.assertEqual(a != d, a is not d) - def test_ordering(self): - # weakrefs cannot be ordered, even if the underlying objects can. - ops = [operator.lt, operator.gt, operator.le, operator.ge] - x = Object(1) - y = Object(1) - a = weakref.ref(x) - b = weakref.ref(y) - for op in ops: - self.assertRaises(TypeError, op, a, b) - # Same when dead. - del x, y - gc.collect() - for op in ops: - self.assertRaises(TypeError, op, a, b) - def test_hashing(self): # Alive weakrefs hash the same as the underlying object x = Object(42) @@ -869,7 +810,7 @@ class ReferencesTestCase(TestBase): # deallocation chain, the trashcan mechanism could delay clearing # of the weakref and make the target object visible from outside # code even though its refcount had dropped to 0. A crash ensued. - class C: + class C(object): def __init__(self, parent): if not parent: return @@ -885,38 +826,6 @@ class ReferencesTestCase(TestBase): del root gc.collect() - def test_callback_attribute(self): - x = Object(1) - callback = lambda ref: None - ref1 = weakref.ref(x, callback) - self.assertIs(ref1.__callback__, callback) - - ref2 = weakref.ref(x) - self.assertIsNone(ref2.__callback__) - - def test_callback_attribute_after_deletion(self): - x = Object(1) - ref = weakref.ref(x, self.callback) - self.assertIsNotNone(ref.__callback__) - del x - support.gc_collect() - self.assertIsNone(ref.__callback__) - - def test_set_callback_attribute(self): - x = Object(1) - callback = lambda ref: None - ref1 = weakref.ref(x, callback) - with self.assertRaises(AttributeError): - ref1.__callback__ = lambda ref: None - - def test_callback_gcs(self): - class ObjectWithDel(Object): - def __del__(self): pass - x = ObjectWithDel(1) - ref1 = weakref.ref(x, lambda ref: support.gc_collect()) - del x - support.gc_collect() - class SubclassableWeakrefTestCase(TestBase): @@ -924,10 +833,10 @@ class SubclassableWeakrefTestCase(TestBase): class MyRef(weakref.ref): def __init__(self, ob, callback=None, value=42): self.value = value - super().__init__(ob, callback) + super(MyRef, self).__init__(ob, callback) def __call__(self): self.called = True - return super().__call__() + return super(MyRef, self).__call__() o = Object("foo") mr = MyRef(o, value=24) self.assertIs(mr(), o) @@ -983,7 +892,7 @@ class SubclassableWeakrefTestCase(TestBase): self.assertFalse(hasattr(r, "__dict__")) def test_subclass_refs_with_cycle(self): - """Confirm https://bugs.python.org/issue3100 is fixed.""" + # Bug #3110 # An instance of a weakref subclass can have attributes. # If such a weakref holds the only strong reference to the object, # deleting the weakref will delete the object. In this case, @@ -1021,143 +930,6 @@ class SubclassableWeakrefTestCase(TestBase): self.assertEqual(self.cbcalled, 0) -class WeakMethodTestCase(unittest.TestCase): - - def _subclass(self): - """Return an Object subclass overriding `some_method`.""" - class C(Object): - def some_method(self): - return 6 - return C - - def test_alive(self): - o = Object(1) - r = weakref.WeakMethod(o.some_method) - self.assertIsInstance(r, weakref.ReferenceType) - self.assertIsInstance(r(), type(o.some_method)) - self.assertIs(r().__self__, o) - self.assertIs(r().__func__, o.some_method.__func__) - self.assertEqual(r()(), 4) - - def test_object_dead(self): - o = Object(1) - r = weakref.WeakMethod(o.some_method) - del o - gc.collect() - self.assertIs(r(), None) - - def test_method_dead(self): - C = self._subclass() - o = C(1) - r = weakref.WeakMethod(o.some_method) - del C.some_method - gc.collect() - self.assertIs(r(), None) - - def test_callback_when_object_dead(self): - # Test callback behaviour when object dies first. - C = self._subclass() - calls = [] - def cb(arg): - calls.append(arg) - o = C(1) - r = weakref.WeakMethod(o.some_method, cb) - del o - gc.collect() - self.assertEqual(calls, [r]) - # Callback is only called once. - C.some_method = Object.some_method - gc.collect() - self.assertEqual(calls, [r]) - - def test_callback_when_method_dead(self): - # Test callback behaviour when method dies first. - C = self._subclass() - calls = [] - def cb(arg): - calls.append(arg) - o = C(1) - r = weakref.WeakMethod(o.some_method, cb) - del C.some_method - gc.collect() - self.assertEqual(calls, [r]) - # Callback is only called once. - del o - gc.collect() - self.assertEqual(calls, [r]) - - @support.cpython_only - def test_no_cycles(self): - # A WeakMethod doesn't create any reference cycle to itself. - o = Object(1) - def cb(_): - pass - r = weakref.WeakMethod(o.some_method, cb) - wr = weakref.ref(r) - del r - self.assertIs(wr(), None) - - def test_equality(self): - def _eq(a, b): - self.assertTrue(a == b) - self.assertFalse(a != b) - def _ne(a, b): - self.assertTrue(a != b) - self.assertFalse(a == b) - x = Object(1) - y = Object(1) - a = weakref.WeakMethod(x.some_method) - b = weakref.WeakMethod(y.some_method) - c = weakref.WeakMethod(x.other_method) - d = weakref.WeakMethod(y.other_method) - # Objects equal, same method - _eq(a, b) - _eq(c, d) - # Objects equal, different method - _ne(a, c) - _ne(a, d) - _ne(b, c) - _ne(b, d) - # Objects unequal, same or different method - z = Object(2) - e = weakref.WeakMethod(z.some_method) - f = weakref.WeakMethod(z.other_method) - _ne(a, e) - _ne(a, f) - _ne(b, e) - _ne(b, f) - # Compare with different types - _ne(a, x.some_method) - _eq(a, ALWAYS_EQ) - del x, y, z - gc.collect() - # Dead WeakMethods compare by identity - refs = a, b, c, d, e, f - for q in refs: - for r in refs: - self.assertEqual(q == r, q is r) - self.assertEqual(q != r, q is not r) - - def test_hashing(self): - # Alive WeakMethods are hashable if the underlying object is - # hashable. - x = Object(1) - y = Object(1) - a = weakref.WeakMethod(x.some_method) - b = weakref.WeakMethod(y.some_method) - c = weakref.WeakMethod(y.other_method) - # Since WeakMethod objects are equal, the hashes should be equal. - self.assertEqual(hash(a), hash(b)) - ha = hash(a) - # Dead WeakMethods retain their old hash value - del x, y - gc.collect() - self.assertEqual(hash(a), ha) - self.assertEqual(hash(b), ha) - # If it wasn't hashed when alive, a dead WeakMethod cannot be hashed. - self.assertRaises(TypeError, hash, c) - - class MappingTestCase(TestBase): COUNT = 10 @@ -1165,9 +937,9 @@ class MappingTestCase(TestBase): def check_len_cycles(self, dict_type, cons): N = 20 items = [RefCycle() for i in range(N)] - dct = dict_type(cons(o) for o in items) + dct = dict_type(cons(i, o) for i, o in enumerate(items)) # Keep an iterator alive - it = dct.items() + it = dct.iteritems() try: next(it) except StopIteration: @@ -1175,18 +947,23 @@ class MappingTestCase(TestBase): del items gc.collect() n1 = len(dct) + list(it) del it gc.collect() n2 = len(dct) - # one item may be kept alive inside the iterator - self.assertIn(n1, (0, 1)) + # iteration should prevent garbage collection here + # Note that this is a test on an implementation detail. The requirement + # is only to provide stable iteration, not that the size of the container + # stay fixed. + self.assertEqual(n1, 20) + #self.assertIn(n1, (0, 1)) self.assertEqual(n2, 0) def test_weak_keyed_len_cycles(self): - self.check_len_cycles(weakref.WeakKeyDictionary, lambda k: (k, 1)) + self.check_len_cycles(weakref.WeakKeyDictionary, lambda n, k: (k, n)) def test_weak_valued_len_cycles(self): - self.check_len_cycles(weakref.WeakValueDictionary, lambda k: (1, k)) + self.check_len_cycles(weakref.WeakValueDictionary, lambda n, k: (n, k)) def check_len_race(self, dict_type, cons): # Extended sanity checks for len() in the face of cyclic collection @@ -1199,7 +976,7 @@ class MappingTestCase(TestBase): dct = dict_type(cons(o) for o in items) del items # All items will be collected at next garbage collection pass - it = dct.items() + it = dct.iteritems() try: next(it) except StopIteration: @@ -1224,11 +1001,12 @@ class MappingTestCase(TestBase): # dict, objects = self.make_weak_valued_dict() for o in objects: - self.assertEqual(weakref.getweakrefcount(o), 1) + self.assertEqual(weakref.getweakrefcount(o), 1, + "wrong number of weak references to %r!" % o) self.assertIs(o, dict[o.arg], "wrong object returned by weak dict!") - items1 = list(dict.items()) - items2 = list(dict.copy().items()) + items1 = dict.items() + items2 = dict.copy().items() items1.sort() items2.sort() self.assertEqual(items1, items2, @@ -1236,7 +1014,7 @@ class MappingTestCase(TestBase): del items1, items2 self.assertEqual(len(dict), self.COUNT) del objects[0] - self.assertEqual(len(dict), self.COUNT - 1, + self.assertEqual(len(dict), (self.COUNT - 1), "deleting object did not cause dictionary update") del objects, o self.assertEqual(len(dict), 0, @@ -1250,7 +1028,7 @@ class MappingTestCase(TestBase): def test_weak_keys(self): # # This exercises d.copy(), d.items(), d[] = v, d[], del d[], - # len(d), k in d. + # len(d), in d. # dict, objects = self.make_weak_keyed_dict() for o in objects: @@ -1286,18 +1064,16 @@ class MappingTestCase(TestBase): for wr in refs: ob = wr() self.assertIn(ob, dict) - self.assertIn(ob, dict) self.assertEqual(ob.arg, dict[ob]) objects2.remove(ob) self.assertEqual(len(objects2), 0) # Test iterkeyrefs() objects2 = list(objects) - self.assertEqual(len(list(dict.keyrefs())), len(objects)) - for wr in dict.keyrefs(): + self.assertEqual(len(list(dict.iterkeyrefs())), len(objects)) + for wr in dict.iterkeyrefs(): ob = wr() self.assertIn(ob, dict) - self.assertIn(ob, dict) self.assertEqual(ob.arg, dict[ob]) objects2.remove(ob) self.assertEqual(len(objects2), 0) @@ -1329,28 +1105,28 @@ class MappingTestCase(TestBase): def check_iters(self, dict): # item iterator: - items = list(dict.items()) - for item in dict.items(): + items = dict.items() + for item in dict.iteritems(): items.remove(item) - self.assertFalse(items, "items() did not touch all items") + self.assertEqual(len(items), 0, "iteritems() did not touch all items") # key iterator, via __iter__(): - keys = list(dict.keys()) + keys = dict.keys() for k in dict: keys.remove(k) - self.assertFalse(keys, "__iter__() did not touch all keys") + self.assertEqual(len(keys), 0, "__iter__() did not touch all keys") # key iterator, via iterkeys(): - keys = list(dict.keys()) - for k in dict.keys(): + keys = dict.keys() + for k in dict.iterkeys(): keys.remove(k) - self.assertFalse(keys, "iterkeys() did not touch all keys") + self.assertEqual(len(keys), 0, "iterkeys() did not touch all keys") # value iterator: - values = list(dict.values()) - for v in dict.values(): + values = dict.values() + for v in dict.itervalues(): values.remove(v) - self.assertFalse(values, + self.assertEqual(len(values), 0, "itervalues() did not touch all values") def check_weak_destroy_while_iterating(self, dict, objects, iter_name): @@ -1373,13 +1149,13 @@ class MappingTestCase(TestBase): # weakref'ed objects and then return a new key/value pair corresponding # to the destroyed object. with testcontext() as (k, v): - self.assertNotIn(k, dict) + self.assertFalse(k in dict) with testcontext() as (k, v): self.assertRaises(KeyError, dict.__delitem__, k) - self.assertNotIn(k, dict) + self.assertFalse(k in dict) with testcontext() as (k, v): self.assertRaises(KeyError, dict.pop, k) - self.assertNotIn(k, dict) + self.assertFalse(k in dict) with testcontext() as (k, v): dict[k] = v self.assertEqual(dict[k], v) @@ -1391,51 +1167,18 @@ class MappingTestCase(TestBase): dict.clear() self.assertEqual(len(dict), 0) - def check_weak_del_and_len_while_iterating(self, dict, testcontext): - # Check that len() works when both iterating and removing keys - # explicitly through various means (.pop(), .clear()...), while - # implicit mutation is deferred because an iterator is alive. - # (each call to testcontext() should schedule one item for removal - # for this test to work properly) - o = Object(123456) - with testcontext(): - n = len(dict) - # Since underlaying dict is ordered, first item is popped - dict.pop(next(dict.keys())) - self.assertEqual(len(dict), n - 1) - dict[o] = o - self.assertEqual(len(dict), n) - # last item in objects is removed from dict in context shutdown - with testcontext(): - self.assertEqual(len(dict), n - 1) - # Then, (o, o) is popped - dict.popitem() - self.assertEqual(len(dict), n - 2) - with testcontext(): - self.assertEqual(len(dict), n - 3) - del dict[next(dict.keys())] - self.assertEqual(len(dict), n - 4) - with testcontext(): - self.assertEqual(len(dict), n - 5) - dict.popitem() - self.assertEqual(len(dict), n - 6) - with testcontext(): - dict.clear() - self.assertEqual(len(dict), 0) - self.assertEqual(len(dict), 0) - def test_weak_keys_destroy_while_iterating(self): # Issue #7105: iterators shouldn't crash when a key is implicitly removed dict, objects = self.make_weak_keyed_dict() - self.check_weak_destroy_while_iterating(dict, objects, 'keys') - self.check_weak_destroy_while_iterating(dict, objects, 'items') - self.check_weak_destroy_while_iterating(dict, objects, 'values') - self.check_weak_destroy_while_iterating(dict, objects, 'keyrefs') + self.check_weak_destroy_while_iterating(dict, objects, 'iterkeys') + self.check_weak_destroy_while_iterating(dict, objects, 'iteritems') + self.check_weak_destroy_while_iterating(dict, objects, 'itervalues') + self.check_weak_destroy_while_iterating(dict, objects, 'iterkeyrefs') dict, objects = self.make_weak_keyed_dict() @contextlib.contextmanager def testcontext(): try: - it = iter(dict.items()) + it = iter(dict.iteritems()) next(it) # Schedule a key/value for removal and recreate it v = objects.pop().arg @@ -1445,24 +1188,19 @@ class MappingTestCase(TestBase): it = None # should commit all removals gc.collect() self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext) - # Issue #21173: len() fragile when keys are both implicitly and - # explicitly removed. - dict, objects = self.make_weak_keyed_dict() - self.check_weak_del_and_len_while_iterating(dict, testcontext) def test_weak_values_destroy_while_iterating(self): # Issue #7105: iterators shouldn't crash when a key is implicitly removed dict, objects = self.make_weak_valued_dict() - self.check_weak_destroy_while_iterating(dict, objects, 'keys') - self.check_weak_destroy_while_iterating(dict, objects, 'items') - self.check_weak_destroy_while_iterating(dict, objects, 'values') + self.check_weak_destroy_while_iterating(dict, objects, 'iterkeys') + self.check_weak_destroy_while_iterating(dict, objects, 'iteritems') + self.check_weak_destroy_while_iterating(dict, objects, 'itervalues') self.check_weak_destroy_while_iterating(dict, objects, 'itervaluerefs') - self.check_weak_destroy_while_iterating(dict, objects, 'valuerefs') dict, objects = self.make_weak_valued_dict() @contextlib.contextmanager def testcontext(): try: - it = iter(dict.items()) + it = iter(dict.iteritems()) next(it) # Schedule a key/value for removal and recreate it k = objects.pop().arg @@ -1472,8 +1210,6 @@ class MappingTestCase(TestBase): it = None # should commit all removals gc.collect() self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext) - dict, objects = self.make_weak_valued_dict() - self.check_weak_del_and_len_while_iterating(dict, testcontext) def test_make_weak_keyed_dict_from_dict(self): o = Object(3) @@ -1488,22 +1224,11 @@ class MappingTestCase(TestBase): def make_weak_keyed_dict(self): dict = weakref.WeakKeyDictionary() - objects = list(map(Object, range(self.COUNT))) + objects = map(Object, range(self.COUNT)) for o in objects: dict[o] = o.arg return dict, objects - def test_make_weak_valued_dict_from_dict(self): - o = Object(3) - dict = weakref.WeakValueDictionary({364:o}) - self.assertEqual(dict[364], o) - - def test_make_weak_valued_dict_from_weak_valued_dict(self): - o = Object(3) - dict = weakref.WeakValueDictionary({364:o}) - dict2 = weakref.WeakValueDictionary(dict) - self.assertEqual(dict[364], o) - def test_make_weak_valued_dict_misc(self): # errors self.assertRaises(TypeError, weakref.WeakValueDictionary.__init__) @@ -1511,14 +1236,14 @@ class MappingTestCase(TestBase): self.assertRaises(TypeError, weakref.WeakValueDictionary, (), ()) # special keyword arguments o = Object(3) - for kw in 'self', 'dict', 'other', 'iterable': + for kw in 'self', 'other', 'iterable': d = weakref.WeakValueDictionary(**{kw: o}) self.assertEqual(list(d.keys()), [kw]) self.assertEqual(d[kw], o) def make_weak_valued_dict(self): dict = weakref.WeakValueDictionary() - objects = list(map(Object, range(self.COUNT))) + objects = map(Object, range(self.COUNT)) for o in objects: dict[o.arg] = o return dict, objects @@ -1576,19 +1301,21 @@ class MappingTestCase(TestBase): def check_update(self, klass, dict): # - # This exercises d.update(), len(d), d.keys(), k in d, + # This exercises d.update(), len(d), d.keys(), in d, # d.get(), d[]. # weakdict = klass() weakdict.update(dict) self.assertEqual(len(weakdict), len(dict)) for k in weakdict.keys(): - self.assertIn(k, dict, "mysterious new key appeared in weak dict") + self.assertIn(k, dict, + "mysterious new key appeared in weak dict") v = dict.get(k) self.assertIs(v, weakdict[k]) self.assertIs(v, weakdict.get(k)) for k in dict.keys(): - self.assertIn(k, weakdict, "original key disappeared in weak dict") + self.assertIn(k, weakdict, + "original key disappeared in weak dict") v = dict[k] self.assertIs(v, weakdict[k]) self.assertIs(v, weakdict.get(k)) @@ -1623,7 +1350,7 @@ class MappingTestCase(TestBase): self.assertEqual(len(d), 2) del d[o1] self.assertEqual(len(d), 1) - self.assertEqual(list(d.keys()), [o2]) + self.assertEqual(d.keys(), [o2]) def test_weak_valued_delitem(self): d = weakref.WeakValueDictionary() @@ -1634,7 +1361,7 @@ class MappingTestCase(TestBase): self.assertEqual(len(d), 2) del d['something'] self.assertEqual(len(d), 1) - self.assertEqual(list(d.items()), [('something else', o2)]) + self.assertEqual(d.items(), [('something else', o2)]) def test_weak_keyed_bad_delitem(self): d = weakref.WeakKeyDictionary() @@ -1675,12 +1402,12 @@ class MappingTestCase(TestBase): d[o] = o.value del o # now the only strong references to keys are in objs # Find the order in which iterkeys sees the keys. - objs = list(d.keys()) + objs = d.keys() # Reverse it, so that the iteration implementation of __delitem__ # has to keep looping to find the first object we delete. objs.reverse() - # Turn on mutation in C.__eq__. The first time through the loop, + # Turn on mutation in C.__eq__. The first time thru the loop, # under the iterkeys() business the first comparison will delete # the last item iterkeys() would see, and that causes a # RuntimeError: dictionary changed size during iteration @@ -1695,18 +1422,10 @@ class MappingTestCase(TestBase): self.assertEqual(len(d), 0) self.assertEqual(count, 2) - def test_make_weak_valued_dict_repr(self): - dict = weakref.WeakValueDictionary() - self.assertRegex(repr(dict), '<WeakValueDictionary at 0x.*>') - - def test_make_weak_keyed_dict_repr(self): - dict = weakref.WeakKeyDictionary() - self.assertRegex(repr(dict), '<WeakKeyDictionary at 0x.*>') - def test_threaded_weak_valued_setdefault(self): d = weakref.WeakValueDictionary() with collect_in_thread(): - for i in range(100000): + for i in range(50000): x = d.setdefault(10, RefCycle()) self.assertIsNot(x, None) # we never put None in there! del x @@ -1714,7 +1433,7 @@ class MappingTestCase(TestBase): def test_threaded_weak_valued_pop(self): d = weakref.WeakValueDictionary() with collect_in_thread(): - for i in range(100000): + for i in range(50000): d[10] = RefCycle() x = d.pop(10, 10) self.assertIsNot(x, None) # we never put None in there! @@ -1731,92 +1450,6 @@ class MappingTestCase(TestBase): self.assertEqual(len(d), 1) o = None # lose ref - def check_threaded_weak_dict_copy(self, type_, deepcopy): - # `type_` should be either WeakKeyDictionary or WeakValueDictionary. - # `deepcopy` should be either True or False. - exc = [] - - class DummyKey: - def __init__(self, ctr): - self.ctr = ctr - - class DummyValue: - def __init__(self, ctr): - self.ctr = ctr - - def dict_copy(d, exc): - try: - if deepcopy is True: - _ = copy.deepcopy(d) - else: - _ = d.copy() - except Exception as ex: - exc.append(ex) - - def pop_and_collect(lst): - gc_ctr = 0 - while lst: - i = random.randint(0, len(lst) - 1) - gc_ctr += 1 - lst.pop(i) - if gc_ctr % 10000 == 0: - gc.collect() # just in case - - self.assertIn(type_, (weakref.WeakKeyDictionary, weakref.WeakValueDictionary)) - - d = type_() - keys = [] - values = [] - # Initialize d with many entries - for i in range(70000): - k, v = DummyKey(i), DummyValue(i) - keys.append(k) - values.append(v) - d[k] = v - del k - del v - - t_copy = threading.Thread(target=dict_copy, args=(d, exc,)) - if type_ is weakref.WeakKeyDictionary: - t_collect = threading.Thread(target=pop_and_collect, args=(keys,)) - else: # weakref.WeakValueDictionary - t_collect = threading.Thread(target=pop_and_collect, args=(values,)) - - t_copy.start() - t_collect.start() - - t_copy.join() - t_collect.join() - - # Test exceptions - if exc: - raise exc[0] - - def test_threaded_weak_key_dict_copy(self): - # Issue #35615: Weakref keys or values getting GC'ed during dict - # copying should not result in a crash. - self.check_threaded_weak_dict_copy(weakref.WeakKeyDictionary, False) - - def test_threaded_weak_key_dict_deepcopy(self): - # Issue #35615: Weakref keys or values getting GC'ed during dict - # copying should not result in a crash. - self.check_threaded_weak_dict_copy(weakref.WeakKeyDictionary, True) - - def test_threaded_weak_value_dict_copy(self): - # Issue #35615: Weakref keys or values getting GC'ed during dict - # copying should not result in a crash. - self.check_threaded_weak_dict_copy(weakref.WeakValueDictionary, False) - - def test_threaded_weak_value_dict_deepcopy(self): - # Issue #35615: Weakref keys or values getting GC'ed during dict - # copying should not result in a crash. - self.check_threaded_weak_dict_copy(weakref.WeakValueDictionary, True) - - @support.cpython_only - def test_remove_closure(self): - d = weakref.WeakValueDictionary() - self.assertIsNone(d._remove.__closure__) - from test import mapping_tests @@ -1834,170 +1467,6 @@ class WeakKeyDictionaryTestCase(mapping_tests.BasicTestMappingProtocol): def _reference(self): return self.__ref.copy() - -class FinalizeTestCase(unittest.TestCase): - - class A: - pass - - def _collect_if_necessary(self): - # we create no ref-cycles so in CPython no gc should be needed - if sys.implementation.name != 'cpython': - support.gc_collect() - - def test_finalize(self): - def add(x,y,z): - res.append(x + y + z) - return x + y + z - - a = self.A() - - res = [] - f = weakref.finalize(a, add, 67, 43, z=89) - self.assertEqual(f.alive, True) - self.assertEqual(f.peek(), (a, add, (67,43), {'z':89})) - self.assertEqual(f(), 199) - self.assertEqual(f(), None) - self.assertEqual(f(), None) - self.assertEqual(f.peek(), None) - self.assertEqual(f.detach(), None) - self.assertEqual(f.alive, False) - self.assertEqual(res, [199]) - - res = [] - f = weakref.finalize(a, add, 67, 43, 89) - self.assertEqual(f.peek(), (a, add, (67,43,89), {})) - self.assertEqual(f.detach(), (a, add, (67,43,89), {})) - self.assertEqual(f(), None) - self.assertEqual(f(), None) - self.assertEqual(f.peek(), None) - self.assertEqual(f.detach(), None) - self.assertEqual(f.alive, False) - self.assertEqual(res, []) - - res = [] - f = weakref.finalize(a, add, x=67, y=43, z=89) - del a - self._collect_if_necessary() - self.assertEqual(f(), None) - self.assertEqual(f(), None) - self.assertEqual(f.peek(), None) - self.assertEqual(f.detach(), None) - self.assertEqual(f.alive, False) - self.assertEqual(res, [199]) - - def test_arg_errors(self): - def fin(*args, **kwargs): - res.append((args, kwargs)) - - a = self.A() - - res = [] - f = weakref.finalize(a, fin, 1, 2, func=3, obj=4) - self.assertEqual(f.peek(), (a, fin, (1, 2), {'func': 3, 'obj': 4})) - f() - self.assertEqual(res, [((1, 2), {'func': 3, 'obj': 4})]) - - with self.assertRaises(TypeError): - weakref.finalize(a, func=fin, arg=1) - with self.assertRaises(TypeError): - weakref.finalize(obj=a, func=fin, arg=1) - self.assertRaises(TypeError, weakref.finalize, a) - self.assertRaises(TypeError, weakref.finalize) - - def test_order(self): - a = self.A() - res = [] - - f1 = weakref.finalize(a, res.append, 'f1') - f2 = weakref.finalize(a, res.append, 'f2') - f3 = weakref.finalize(a, res.append, 'f3') - f4 = weakref.finalize(a, res.append, 'f4') - f5 = weakref.finalize(a, res.append, 'f5') - - # make sure finalizers can keep themselves alive - del f1, f4 - - self.assertTrue(f2.alive) - self.assertTrue(f3.alive) - self.assertTrue(f5.alive) - - self.assertTrue(f5.detach()) - self.assertFalse(f5.alive) - - f5() # nothing because previously unregistered - res.append('A') - f3() # => res.append('f3') - self.assertFalse(f3.alive) - res.append('B') - f3() # nothing because previously called - res.append('C') - del a - self._collect_if_necessary() - # => res.append('f4') - # => res.append('f2') - # => res.append('f1') - self.assertFalse(f2.alive) - res.append('D') - f2() # nothing because previously called by gc - - expected = ['A', 'f3', 'B', 'C', 'f4', 'f2', 'f1', 'D'] - self.assertEqual(res, expected) - - def test_all_freed(self): - # we want a weakrefable subclass of weakref.finalize - class MyFinalizer(weakref.finalize): - pass - - a = self.A() - res = [] - def callback(): - res.append(123) - f = MyFinalizer(a, callback) - - wr_callback = weakref.ref(callback) - wr_f = weakref.ref(f) - del callback, f - - self.assertIsNotNone(wr_callback()) - self.assertIsNotNone(wr_f()) - - del a - self._collect_if_necessary() - - self.assertIsNone(wr_callback()) - self.assertIsNone(wr_f()) - self.assertEqual(res, [123]) - - @classmethod - def run_in_child(cls): - def error(): - # Create an atexit finalizer from inside a finalizer called - # at exit. This should be the next to be run. - g1 = weakref.finalize(cls, print, 'g1') - print('f3 error') - 1/0 - - # cls should stay alive till atexit callbacks run - f1 = weakref.finalize(cls, print, 'f1', _global_var) - f2 = weakref.finalize(cls, print, 'f2', _global_var) - f3 = weakref.finalize(cls, error) - f4 = weakref.finalize(cls, print, 'f4', _global_var) - - assert f1.atexit == True - f2.atexit = False - assert f3.atexit == True - assert f4.atexit == True - - def test_atexit(self): - prog = ('from test.test_weakref import FinalizeTestCase;'+ - 'FinalizeTestCase.run_in_child()') - rc, out, err = script_helper.assert_python_ok('-c', prog) - out = out.decode('ascii').splitlines() - self.assertEqual(out, ['f4 foobar', 'f3 error', 'g1', 'f1 foobar']) - self.assertTrue(b'ZeroDivisionError' in err) - - libreftest = """ Doctest for examples in the library reference: weakref.rst >>> import weakref @@ -2006,7 +1475,7 @@ libreftest = """ Doctest for examples in the library reference: weakref.rst ... >>> obj = Dict(red=1, green=2, blue=3) # this object is weak referencable >>> r = weakref.ref(obj) ->>> print(r() is obj) +>>> print r() is obj True >>> import weakref @@ -2019,21 +1488,21 @@ True >>> o is o2 True >>> del o, o2 ->>> print(r()) +>>> print r() None >>> import weakref >>> class ExtendedRef(weakref.ref): ... def __init__(self, ob, callback=None, **annotations): -... super().__init__(ob, callback) +... super(ExtendedRef, self).__init__(ob, callback) ... self.__counter = 0 -... for k, v in annotations.items(): +... for k, v in annotations.iteritems(): ... setattr(self, k, v) ... def __call__(self): ... '''Return a pair containing the referent and the number of ... times the reference has been called. ... ''' -... ob = super().__call__() +... ob = super(ExtendedRef, self).__call__() ... if ob is not None: ... self.__counter += 1 ... ob = (ob, self.__counter) @@ -2074,9 +1543,9 @@ True >>> try: ... id2obj(a_id) ... except KeyError: -... print('OK') +... print 'OK' ... else: -... print('WeakValueDictionary error') +... print 'WeakValueDictionary error' OK """ @@ -2084,16 +1553,14 @@ OK __test__ = {'libreftest' : libreftest} def test_main(): - support.run_unittest( + test_support.run_unittest( ReferencesTestCase, - WeakMethodTestCase, MappingTestCase, WeakValueDictionaryTestCase, WeakKeyDictionaryTestCase, SubclassableWeakrefTestCase, - FinalizeTestCase, ) - support.run_doctest(sys.modules[__name__]) + test_support.run_doctest(sys.modules[__name__]) if __name__ == "__main__": |