diff options
Diffstat (limited to 'Lib/test/test_weakref.py')
| -rw-r--r-- | Lib/test/test_weakref.py | 299 |
1 files changed, 196 insertions, 103 deletions
diff --git a/Lib/test/test_weakref.py b/Lib/test/test_weakref.py index 3d86cb7..571e33f 100644 --- a/Lib/test/test_weakref.py +++ b/Lib/test/test_weakref.py @@ -1,11 +1,13 @@ import gc import sys import unittest -import UserList +import collections import weakref import operator +import contextlib +import copy -from test import test_support +from test import support # Used in ReferencesTestCase.test_ref_created_during_del() . ref_from_del = None @@ -29,9 +31,6 @@ def create_function(): def create_bound_method(): return C().method -def create_unbound_method(): - return C.method - class Object: def __init__(self, arg): @@ -42,9 +41,9 @@ class Object: if isinstance(other, Object): return self.arg == other.arg return NotImplemented - def __ne__(self, other): + def __lt__(self, other): if isinstance(other, Object): - return self.arg != other.arg + return self.arg < other.arg return NotImplemented def __hash__(self): return hash(self.arg) @@ -69,7 +68,6 @@ class ReferencesTestCase(TestBase): self.check_basic_ref(C) self.check_basic_ref(create_function) self.check_basic_ref(create_bound_method) - self.check_basic_ref(create_unbound_method) # Just make sure the tp_repr handler doesn't raise an exception. # Live reference: @@ -84,7 +82,6 @@ class ReferencesTestCase(TestBase): self.check_basic_callback(C) self.check_basic_callback(create_function) self.check_basic_callback(create_bound_method) - self.check_basic_callback(create_unbound_method) def test_multiple_callbacks(self): o = C() @@ -126,10 +123,10 @@ class ReferencesTestCase(TestBase): def check(proxy): proxy.bar - self.assertRaises(weakref.ReferenceError, check, ref1) - self.assertRaises(weakref.ReferenceError, check, ref2) - self.assertRaises(weakref.ReferenceError, bool, weakref.proxy(C())) - self.assertTrue(self.cbcalled == 2) + self.assertRaises(ReferenceError, check, ref1) + self.assertRaises(ReferenceError, check, ref2) + self.assertRaises(ReferenceError, bool, weakref.proxy(C())) + self.assertEqual(self.cbcalled, 2) def check_basic_ref(self, factory): o = factory() @@ -184,42 +181,40 @@ class ReferencesTestCase(TestBase): o = C() self.check_proxy(o, weakref.proxy(o)) - L = UserList.UserList() + L = collections.UserList() p = weakref.proxy(L) self.assertFalse(p, "proxy for empty UserList should be false") p.append(12) self.assertEqual(len(L), 1) self.assertTrue(p, "proxy for non-empty UserList should be true") - with test_support.check_py3k_warnings(): - p[:] = [2, 3] + p[:] = [2, 3] self.assertEqual(len(L), 2) self.assertEqual(len(p), 2) self.assertIn(3, p, "proxy didn't support __contains__() properly") p[1] = 5 self.assertEqual(L[1], 5) self.assertEqual(p[1], 5) - L2 = UserList.UserList(L) + L2 = collections.UserList(L) p2 = weakref.proxy(L2) self.assertEqual(p, p2) ## self.assertEqual(repr(L2), repr(p2)) - L3 = UserList.UserList(range(10)) + L3 = collections.UserList(range(10)) p3 = weakref.proxy(L3) - with test_support.check_py3k_warnings(): - self.assertEqual(L3[:], p3[:]) - self.assertEqual(L3[5:], p3[5:]) - self.assertEqual(L3[:5], p3[:5]) - self.assertEqual(L3[2:5], p3[2:5]) + self.assertEqual(L3[:], p3[:]) + self.assertEqual(L3[5:], p3[5:]) + self.assertEqual(L3[:5], p3[:5]) + self.assertEqual(L3[2:5], p3[2:5]) def test_proxy_unicode(self): # See bug 5037 class C(object): def __str__(self): return "string" - def __unicode__(self): - return u"unicode" + def __bytes__(self): + return b"bytes" instance = C() - self.assertIn("__unicode__", dir(weakref.proxy(instance))) - self.assertEqual(unicode(weakref.proxy(instance)), u"unicode") + self.assertIn("__bytes__", dir(weakref.proxy(instance))) + self.assertEqual(bytes(weakref.proxy(instance)), b"bytes") def test_proxy_index(self): class C: @@ -707,12 +702,9 @@ class ReferencesTestCase(TestBase): gc.collect() def test_classes(self): - # Check that both old-style classes and new-style classes - # are weakrefable. + # Check that classes are weakrefable. class A(object): pass - class B: - pass l = [] weakref.ref(int) a = weakref.ref(A, l.append) @@ -720,11 +712,6 @@ class ReferencesTestCase(TestBase): gc.collect() self.assertEqual(a(), None) self.assertEqual(l, [a]) - b = weakref.ref(B, l.append) - B = None - gc.collect() - self.assertEqual(b(), None) - self.assertEqual(l, [a, b]) def test_equality(self): # Alive weakrefs defer equality testing to their underlying object. @@ -759,6 +746,21 @@ class ReferencesTestCase(TestBase): self.assertEqual(a == d, a is d) self.assertEqual(a != d, a is not d) + def test_ordering(self): + # weakrefs cannot be ordered, even if the underlying objects can. + ops = [operator.lt, operator.gt, operator.le, operator.ge] + x = Object(1) + y = Object(1) + a = weakref.ref(x) + b = weakref.ref(y) + for op in ops: + self.assertRaises(TypeError, op, a, b) + # Same when dead. + del x, y + gc.collect() + for op in ops: + self.assertRaises(TypeError, op, a, b) + def test_hashing(self): # Alive weakrefs hash the same as the underlying object x = Object(42) @@ -779,7 +781,7 @@ class ReferencesTestCase(TestBase): # deallocation chain, the trashcan mechanism could delay clearing # of the weakref and make the target object visible from outside # code even though its refcount had dropped to 0. A crash ensued. - class C(object): + class C: def __init__(self, parent): if not parent: return @@ -802,10 +804,10 @@ class SubclassableWeakrefTestCase(TestBase): class MyRef(weakref.ref): def __init__(self, ob, callback=None, value=42): self.value = value - super(MyRef, self).__init__(ob, callback) + super().__init__(ob, callback) def __call__(self): self.called = True - return super(MyRef, self).__call__() + return super().__call__() o = Object("foo") mr = MyRef(o, value=24) self.assertTrue(mr() is o) @@ -908,7 +910,7 @@ class MappingTestCase(TestBase): items = [RefCycle() for i in range(N)] dct = dict_type(cons(o) for o in items) # Keep an iterator alive - it = dct.iteritems() + it = dct.items() try: next(it) except StopIteration: @@ -940,7 +942,7 @@ class MappingTestCase(TestBase): dct = dict_type(cons(o) for o in items) del items # All items will be collected at next garbage collection pass - it = dct.iteritems() + it = dct.items() try: next(it) except StopIteration: @@ -965,23 +967,22 @@ class MappingTestCase(TestBase): # dict, objects = self.make_weak_valued_dict() for o in objects: - self.assertTrue(weakref.getweakrefcount(o) == 1, - "wrong number of weak references to %r!" % o) + self.assertEqual(weakref.getweakrefcount(o), 1) self.assertTrue(o is dict[o.arg], "wrong object returned by weak dict!") - items1 = dict.items() - items2 = dict.copy().items() + items1 = list(dict.items()) + items2 = list(dict.copy().items()) items1.sort() items2.sort() - self.assertTrue(items1 == items2, + self.assertEqual(items1, items2, "cloning of weak-valued dictionary did not work!") del items1, items2 - self.assertTrue(len(dict) == self.COUNT) + self.assertEqual(len(dict), self.COUNT) del objects[0] - self.assertTrue(len(dict) == (self.COUNT - 1), + self.assertEqual(len(dict), self.COUNT - 1, "deleting object did not cause dictionary update") del objects, o - self.assertTrue(len(dict) == 0, + self.assertEqual(len(dict), 0, "deleting the values did not clear the dictionary") # regression on SF bug #447152: dict = weakref.WeakValueDictionary() @@ -992,7 +993,7 @@ class MappingTestCase(TestBase): def test_weak_keys(self): # # This exercises d.copy(), d.items(), d[] = v, d[], del d[], - # len(d), in d. + # len(d), k in d. # dict, objects = self.make_weak_keyed_dict() for o in objects: @@ -1002,10 +1003,10 @@ class MappingTestCase(TestBase): "wrong object returned by weak dict!") items1 = dict.items() items2 = dict.copy().items() - self.assertTrue(set(items1) == set(items2), + self.assertEqual(set(items1), set(items2), "cloning of weak-keyed dictionary did not work!") del items1, items2 - self.assertTrue(len(dict) == self.COUNT) + self.assertEqual(len(dict), self.COUNT) del objects[0] self.assertTrue(len(dict) == (self.COUNT - 1), "deleting object did not cause dictionary update") @@ -1028,16 +1029,18 @@ class MappingTestCase(TestBase): for wr in refs: ob = wr() self.assertIn(ob, dict) + self.assertIn(ob, dict) self.assertEqual(ob.arg, dict[ob]) objects2.remove(ob) self.assertEqual(len(objects2), 0) # Test iterkeyrefs() objects2 = list(objects) - self.assertEqual(len(list(dict.iterkeyrefs())), len(objects)) - for wr in dict.iterkeyrefs(): + self.assertEqual(len(list(dict.keyrefs())), len(objects)) + for wr in dict.keyrefs(): ob = wr() self.assertIn(ob, dict) + self.assertIn(ob, dict) self.assertEqual(ob.arg, dict[ob]) objects2.remove(ob) self.assertEqual(len(objects2), 0) @@ -1069,51 +1072,143 @@ class MappingTestCase(TestBase): def check_iters(self, dict): # item iterator: - items = dict.items() - for item in dict.iteritems(): + items = list(dict.items()) + for item in dict.items(): items.remove(item) - self.assertTrue(len(items) == 0, "iteritems() did not touch all items") + self.assertFalse(items, "items() did not touch all items") # key iterator, via __iter__(): - keys = dict.keys() + keys = list(dict.keys()) for k in dict: keys.remove(k) - self.assertTrue(len(keys) == 0, "__iter__() did not touch all keys") + self.assertFalse(keys, "__iter__() did not touch all keys") # key iterator, via iterkeys(): - keys = dict.keys() - for k in dict.iterkeys(): + keys = list(dict.keys()) + for k in dict.keys(): keys.remove(k) - self.assertTrue(len(keys) == 0, "iterkeys() did not touch all keys") + self.assertFalse(keys, "iterkeys() did not touch all keys") # value iterator: - values = dict.values() - for v in dict.itervalues(): + values = list(dict.values()) + for v in dict.values(): values.remove(v) - self.assertTrue(len(values) == 0, + self.assertFalse(values, "itervalues() did not touch all values") + def check_weak_destroy_while_iterating(self, dict, objects, iter_name): + n = len(dict) + it = iter(getattr(dict, iter_name)()) + next(it) # Trigger internal iteration + # Destroy an object + del objects[-1] + gc.collect() # just in case + # We have removed either the first consumed object, or another one + self.assertIn(len(list(it)), [len(objects), len(objects) - 1]) + del it + # The removal has been committed + self.assertEqual(len(dict), n - 1) + + def check_weak_destroy_and_mutate_while_iterating(self, dict, testcontext): + # Check that we can explicitly mutate the weak dict without + # interfering with delayed removal. + # `testcontext` should create an iterator, destroy one of the + # weakref'ed objects and then return a new key/value pair corresponding + # to the destroyed object. + with testcontext() as (k, v): + self.assertNotIn(k, dict) + with testcontext() as (k, v): + self.assertRaises(KeyError, dict.__delitem__, k) + self.assertNotIn(k, dict) + with testcontext() as (k, v): + self.assertRaises(KeyError, dict.pop, k) + self.assertNotIn(k, dict) + with testcontext() as (k, v): + dict[k] = v + self.assertEqual(dict[k], v) + ddict = copy.copy(dict) + with testcontext() as (k, v): + dict.update(ddict) + self.assertEqual(dict, ddict) + with testcontext() as (k, v): + dict.clear() + self.assertEqual(len(dict), 0) + + def test_weak_keys_destroy_while_iterating(self): + # Issue #7105: iterators shouldn't crash when a key is implicitly removed + dict, objects = self.make_weak_keyed_dict() + self.check_weak_destroy_while_iterating(dict, objects, 'keys') + self.check_weak_destroy_while_iterating(dict, objects, 'items') + self.check_weak_destroy_while_iterating(dict, objects, 'values') + self.check_weak_destroy_while_iterating(dict, objects, 'keyrefs') + dict, objects = self.make_weak_keyed_dict() + @contextlib.contextmanager + def testcontext(): + try: + it = iter(dict.items()) + next(it) + # Schedule a key/value for removal and recreate it + v = objects.pop().arg + gc.collect() # just in case + yield Object(v), v + finally: + it = None # should commit all removals + self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext) + + def test_weak_values_destroy_while_iterating(self): + # Issue #7105: iterators shouldn't crash when a key is implicitly removed + dict, objects = self.make_weak_valued_dict() + self.check_weak_destroy_while_iterating(dict, objects, 'keys') + self.check_weak_destroy_while_iterating(dict, objects, 'items') + self.check_weak_destroy_while_iterating(dict, objects, 'values') + self.check_weak_destroy_while_iterating(dict, objects, 'itervaluerefs') + self.check_weak_destroy_while_iterating(dict, objects, 'valuerefs') + dict, objects = self.make_weak_valued_dict() + @contextlib.contextmanager + def testcontext(): + try: + it = iter(dict.items()) + next(it) + # Schedule a key/value for removal and recreate it + k = objects.pop().arg + gc.collect() # just in case + yield k, Object(k) + finally: + it = None # should commit all removals + self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext) + def test_make_weak_keyed_dict_from_dict(self): o = Object(3) dict = weakref.WeakKeyDictionary({o:364}) - self.assertTrue(dict[o] == 364) + self.assertEqual(dict[o], 364) def test_make_weak_keyed_dict_from_weak_keyed_dict(self): o = Object(3) dict = weakref.WeakKeyDictionary({o:364}) dict2 = weakref.WeakKeyDictionary(dict) - self.assertTrue(dict[o] == 364) + self.assertEqual(dict[o], 364) def make_weak_keyed_dict(self): dict = weakref.WeakKeyDictionary() - objects = map(Object, range(self.COUNT)) + objects = list(map(Object, range(self.COUNT))) for o in objects: dict[o] = o.arg return dict, objects + def test_make_weak_valued_dict_from_dict(self): + o = Object(3) + dict = weakref.WeakValueDictionary({364:o}) + self.assertEqual(dict[364], o) + + def test_make_weak_valued_dict_from_weak_valued_dict(self): + o = Object(3) + dict = weakref.WeakValueDictionary({364:o}) + dict2 = weakref.WeakValueDictionary(dict) + self.assertEqual(dict[364], o) + def make_weak_valued_dict(self): dict = weakref.WeakValueDictionary() - objects = map(Object, range(self.COUNT)) + objects = list(map(Object, range(self.COUNT))) for o in objects: dict[o.arg] = o return dict, objects @@ -1122,15 +1217,15 @@ class MappingTestCase(TestBase): weakdict = klass() weakdict[key1] = value1 weakdict[key2] = value2 - self.assertTrue(len(weakdict) == 2) + self.assertEqual(len(weakdict), 2) k, v = weakdict.popitem() - self.assertTrue(len(weakdict) == 1) + self.assertEqual(len(weakdict), 1) if k is key1: self.assertTrue(v is value1) else: self.assertTrue(v is value2) k, v = weakdict.popitem() - self.assertTrue(len(weakdict) == 0) + self.assertEqual(len(weakdict), 0) if k is key1: self.assertTrue(v is value1) else: @@ -1150,16 +1245,16 @@ class MappingTestCase(TestBase): " -- value parameters must be distinct objects") weakdict = klass() o = weakdict.setdefault(key, value1) - self.assertIs(o, value1) + self.assertTrue(o is value1) self.assertIn(key, weakdict) - self.assertIs(weakdict.get(key), value1) - self.assertIs(weakdict[key], value1) + self.assertTrue(weakdict.get(key) is value1) + self.assertTrue(weakdict[key] is value1) o = weakdict.setdefault(key, value2) - self.assertIs(o, value1) + self.assertTrue(o is value1) self.assertIn(key, weakdict) - self.assertIs(weakdict.get(key), value1) - self.assertIs(weakdict[key], value1) + self.assertTrue(weakdict.get(key) is value1) + self.assertTrue(weakdict[key] is value1) def test_weak_valued_dict_setdefault(self): self.check_setdefault(weakref.WeakValueDictionary, @@ -1171,24 +1266,22 @@ class MappingTestCase(TestBase): def check_update(self, klass, dict): # - # This exercises d.update(), len(d), d.keys(), in d, + # This exercises d.update(), len(d), d.keys(), k in d, # d.get(), d[]. # weakdict = klass() weakdict.update(dict) self.assertEqual(len(weakdict), len(dict)) for k in weakdict.keys(): - self.assertIn(k, dict, - "mysterious new key appeared in weak dict") + self.assertIn(k, dict, "mysterious new key appeared in weak dict") v = dict.get(k) - self.assertIs(v, weakdict[k]) - self.assertIs(v, weakdict.get(k)) + self.assertTrue(v is weakdict[k]) + self.assertTrue(v is weakdict.get(k)) for k in dict.keys(): - self.assertIn(k, weakdict, - "original key disappeared in weak dict") + self.assertIn(k, weakdict, "original key disappeared in weak dict") v = dict[k] - self.assertIs(v, weakdict[k]) - self.assertIs(v, weakdict.get(k)) + self.assertTrue(v is weakdict[k]) + self.assertTrue(v is weakdict.get(k)) def test_weak_valued_dict_update(self): self.check_update(weakref.WeakValueDictionary, @@ -1204,10 +1297,10 @@ class MappingTestCase(TestBase): o2 = Object('2') d[o1] = 'something' d[o2] = 'something' - self.assertTrue(len(d) == 2) + self.assertEqual(len(d), 2) del d[o1] - self.assertTrue(len(d) == 1) - self.assertTrue(d.keys() == [o2]) + self.assertEqual(len(d), 1) + self.assertEqual(list(d.keys()), [o2]) def test_weak_valued_delitem(self): d = weakref.WeakValueDictionary() @@ -1215,10 +1308,10 @@ class MappingTestCase(TestBase): o2 = Object('2') d['something'] = o1 d['something else'] = o2 - self.assertTrue(len(d) == 2) + self.assertEqual(len(d), 2) del d['something'] - self.assertTrue(len(d) == 1) - self.assertTrue(d.items() == [('something else', o2)]) + self.assertEqual(len(d), 1) + self.assertTrue(list(d.items()) == [('something else', o2)]) def test_weak_keyed_bad_delitem(self): d = weakref.WeakKeyDictionary() @@ -1259,7 +1352,7 @@ class MappingTestCase(TestBase): d[o] = o.value del o # now the only strong references to keys are in objs # Find the order in which iterkeys sees the keys. - objs = d.keys() + objs = list(d.keys()) # Reverse it, so that the iteration implementation of __delitem__ # has to keep looping to find the first object we delete. objs.reverse() @@ -1303,7 +1396,7 @@ libreftest = """ Doctest for examples in the library reference: weakref.rst ... >>> obj = Dict(red=1, green=2, blue=3) # this object is weak referencable >>> r = weakref.ref(obj) ->>> print r() is obj +>>> print(r() is obj) True >>> import weakref @@ -1316,21 +1409,21 @@ True >>> o is o2 True >>> del o, o2 ->>> print r() +>>> print(r()) None >>> import weakref >>> class ExtendedRef(weakref.ref): ... def __init__(self, ob, callback=None, **annotations): -... super(ExtendedRef, self).__init__(ob, callback) +... super().__init__(ob, callback) ... self.__counter = 0 -... for k, v in annotations.iteritems(): +... for k, v in annotations.items(): ... setattr(self, k, v) ... def __call__(self): ... '''Return a pair containing the referent and the number of ... times the reference has been called. ... ''' -... ob = super(ExtendedRef, self).__call__() +... ob = super().__call__() ... if ob is not None: ... self.__counter += 1 ... ob = (ob, self.__counter) @@ -1371,9 +1464,9 @@ True >>> try: ... id2obj(a_id) ... except KeyError: -... print 'OK' +... print('OK') ... else: -... print 'WeakValueDictionary error' +... print('WeakValueDictionary error') OK """ @@ -1381,14 +1474,14 @@ OK __test__ = {'libreftest' : libreftest} def test_main(): - test_support.run_unittest( + support.run_unittest( ReferencesTestCase, MappingTestCase, WeakValueDictionaryTestCase, WeakKeyDictionaryTestCase, SubclassableWeakrefTestCase, ) - test_support.run_doctest(sys.modules[__name__]) + support.run_doctest(sys.modules[__name__]) if __name__ == "__main__": |
