summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_weakref.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_weakref.py')
-rw-r--r--Lib/test/test_weakref.py299
1 files changed, 196 insertions, 103 deletions
diff --git a/Lib/test/test_weakref.py b/Lib/test/test_weakref.py
index 3d86cb7..571e33f 100644
--- a/Lib/test/test_weakref.py
+++ b/Lib/test/test_weakref.py
@@ -1,11 +1,13 @@
import gc
import sys
import unittest
-import UserList
+import collections
import weakref
import operator
+import contextlib
+import copy
-from test import test_support
+from test import support
# Used in ReferencesTestCase.test_ref_created_during_del() .
ref_from_del = None
@@ -29,9 +31,6 @@ def create_function():
def create_bound_method():
return C().method
-def create_unbound_method():
- return C.method
-
class Object:
def __init__(self, arg):
@@ -42,9 +41,9 @@ class Object:
if isinstance(other, Object):
return self.arg == other.arg
return NotImplemented
- def __ne__(self, other):
+ def __lt__(self, other):
if isinstance(other, Object):
- return self.arg != other.arg
+ return self.arg < other.arg
return NotImplemented
def __hash__(self):
return hash(self.arg)
@@ -69,7 +68,6 @@ class ReferencesTestCase(TestBase):
self.check_basic_ref(C)
self.check_basic_ref(create_function)
self.check_basic_ref(create_bound_method)
- self.check_basic_ref(create_unbound_method)
# Just make sure the tp_repr handler doesn't raise an exception.
# Live reference:
@@ -84,7 +82,6 @@ class ReferencesTestCase(TestBase):
self.check_basic_callback(C)
self.check_basic_callback(create_function)
self.check_basic_callback(create_bound_method)
- self.check_basic_callback(create_unbound_method)
def test_multiple_callbacks(self):
o = C()
@@ -126,10 +123,10 @@ class ReferencesTestCase(TestBase):
def check(proxy):
proxy.bar
- self.assertRaises(weakref.ReferenceError, check, ref1)
- self.assertRaises(weakref.ReferenceError, check, ref2)
- self.assertRaises(weakref.ReferenceError, bool, weakref.proxy(C()))
- self.assertTrue(self.cbcalled == 2)
+ self.assertRaises(ReferenceError, check, ref1)
+ self.assertRaises(ReferenceError, check, ref2)
+ self.assertRaises(ReferenceError, bool, weakref.proxy(C()))
+ self.assertEqual(self.cbcalled, 2)
def check_basic_ref(self, factory):
o = factory()
@@ -184,42 +181,40 @@ class ReferencesTestCase(TestBase):
o = C()
self.check_proxy(o, weakref.proxy(o))
- L = UserList.UserList()
+ L = collections.UserList()
p = weakref.proxy(L)
self.assertFalse(p, "proxy for empty UserList should be false")
p.append(12)
self.assertEqual(len(L), 1)
self.assertTrue(p, "proxy for non-empty UserList should be true")
- with test_support.check_py3k_warnings():
- p[:] = [2, 3]
+ p[:] = [2, 3]
self.assertEqual(len(L), 2)
self.assertEqual(len(p), 2)
self.assertIn(3, p, "proxy didn't support __contains__() properly")
p[1] = 5
self.assertEqual(L[1], 5)
self.assertEqual(p[1], 5)
- L2 = UserList.UserList(L)
+ L2 = collections.UserList(L)
p2 = weakref.proxy(L2)
self.assertEqual(p, p2)
## self.assertEqual(repr(L2), repr(p2))
- L3 = UserList.UserList(range(10))
+ L3 = collections.UserList(range(10))
p3 = weakref.proxy(L3)
- with test_support.check_py3k_warnings():
- self.assertEqual(L3[:], p3[:])
- self.assertEqual(L3[5:], p3[5:])
- self.assertEqual(L3[:5], p3[:5])
- self.assertEqual(L3[2:5], p3[2:5])
+ self.assertEqual(L3[:], p3[:])
+ self.assertEqual(L3[5:], p3[5:])
+ self.assertEqual(L3[:5], p3[:5])
+ self.assertEqual(L3[2:5], p3[2:5])
def test_proxy_unicode(self):
# See bug 5037
class C(object):
def __str__(self):
return "string"
- def __unicode__(self):
- return u"unicode"
+ def __bytes__(self):
+ return b"bytes"
instance = C()
- self.assertIn("__unicode__", dir(weakref.proxy(instance)))
- self.assertEqual(unicode(weakref.proxy(instance)), u"unicode")
+ self.assertIn("__bytes__", dir(weakref.proxy(instance)))
+ self.assertEqual(bytes(weakref.proxy(instance)), b"bytes")
def test_proxy_index(self):
class C:
@@ -707,12 +702,9 @@ class ReferencesTestCase(TestBase):
gc.collect()
def test_classes(self):
- # Check that both old-style classes and new-style classes
- # are weakrefable.
+ # Check that classes are weakrefable.
class A(object):
pass
- class B:
- pass
l = []
weakref.ref(int)
a = weakref.ref(A, l.append)
@@ -720,11 +712,6 @@ class ReferencesTestCase(TestBase):
gc.collect()
self.assertEqual(a(), None)
self.assertEqual(l, [a])
- b = weakref.ref(B, l.append)
- B = None
- gc.collect()
- self.assertEqual(b(), None)
- self.assertEqual(l, [a, b])
def test_equality(self):
# Alive weakrefs defer equality testing to their underlying object.
@@ -759,6 +746,21 @@ class ReferencesTestCase(TestBase):
self.assertEqual(a == d, a is d)
self.assertEqual(a != d, a is not d)
+ def test_ordering(self):
+ # weakrefs cannot be ordered, even if the underlying objects can.
+ ops = [operator.lt, operator.gt, operator.le, operator.ge]
+ x = Object(1)
+ y = Object(1)
+ a = weakref.ref(x)
+ b = weakref.ref(y)
+ for op in ops:
+ self.assertRaises(TypeError, op, a, b)
+ # Same when dead.
+ del x, y
+ gc.collect()
+ for op in ops:
+ self.assertRaises(TypeError, op, a, b)
+
def test_hashing(self):
# Alive weakrefs hash the same as the underlying object
x = Object(42)
@@ -779,7 +781,7 @@ class ReferencesTestCase(TestBase):
# deallocation chain, the trashcan mechanism could delay clearing
# of the weakref and make the target object visible from outside
# code even though its refcount had dropped to 0. A crash ensued.
- class C(object):
+ class C:
def __init__(self, parent):
if not parent:
return
@@ -802,10 +804,10 @@ class SubclassableWeakrefTestCase(TestBase):
class MyRef(weakref.ref):
def __init__(self, ob, callback=None, value=42):
self.value = value
- super(MyRef, self).__init__(ob, callback)
+ super().__init__(ob, callback)
def __call__(self):
self.called = True
- return super(MyRef, self).__call__()
+ return super().__call__()
o = Object("foo")
mr = MyRef(o, value=24)
self.assertTrue(mr() is o)
@@ -908,7 +910,7 @@ class MappingTestCase(TestBase):
items = [RefCycle() for i in range(N)]
dct = dict_type(cons(o) for o in items)
# Keep an iterator alive
- it = dct.iteritems()
+ it = dct.items()
try:
next(it)
except StopIteration:
@@ -940,7 +942,7 @@ class MappingTestCase(TestBase):
dct = dict_type(cons(o) for o in items)
del items
# All items will be collected at next garbage collection pass
- it = dct.iteritems()
+ it = dct.items()
try:
next(it)
except StopIteration:
@@ -965,23 +967,22 @@ class MappingTestCase(TestBase):
#
dict, objects = self.make_weak_valued_dict()
for o in objects:
- self.assertTrue(weakref.getweakrefcount(o) == 1,
- "wrong number of weak references to %r!" % o)
+ self.assertEqual(weakref.getweakrefcount(o), 1)
self.assertTrue(o is dict[o.arg],
"wrong object returned by weak dict!")
- items1 = dict.items()
- items2 = dict.copy().items()
+ items1 = list(dict.items())
+ items2 = list(dict.copy().items())
items1.sort()
items2.sort()
- self.assertTrue(items1 == items2,
+ self.assertEqual(items1, items2,
"cloning of weak-valued dictionary did not work!")
del items1, items2
- self.assertTrue(len(dict) == self.COUNT)
+ self.assertEqual(len(dict), self.COUNT)
del objects[0]
- self.assertTrue(len(dict) == (self.COUNT - 1),
+ self.assertEqual(len(dict), self.COUNT - 1,
"deleting object did not cause dictionary update")
del objects, o
- self.assertTrue(len(dict) == 0,
+ self.assertEqual(len(dict), 0,
"deleting the values did not clear the dictionary")
# regression on SF bug #447152:
dict = weakref.WeakValueDictionary()
@@ -992,7 +993,7 @@ class MappingTestCase(TestBase):
def test_weak_keys(self):
#
# This exercises d.copy(), d.items(), d[] = v, d[], del d[],
- # len(d), in d.
+ # len(d), k in d.
#
dict, objects = self.make_weak_keyed_dict()
for o in objects:
@@ -1002,10 +1003,10 @@ class MappingTestCase(TestBase):
"wrong object returned by weak dict!")
items1 = dict.items()
items2 = dict.copy().items()
- self.assertTrue(set(items1) == set(items2),
+ self.assertEqual(set(items1), set(items2),
"cloning of weak-keyed dictionary did not work!")
del items1, items2
- self.assertTrue(len(dict) == self.COUNT)
+ self.assertEqual(len(dict), self.COUNT)
del objects[0]
self.assertTrue(len(dict) == (self.COUNT - 1),
"deleting object did not cause dictionary update")
@@ -1028,16 +1029,18 @@ class MappingTestCase(TestBase):
for wr in refs:
ob = wr()
self.assertIn(ob, dict)
+ self.assertIn(ob, dict)
self.assertEqual(ob.arg, dict[ob])
objects2.remove(ob)
self.assertEqual(len(objects2), 0)
# Test iterkeyrefs()
objects2 = list(objects)
- self.assertEqual(len(list(dict.iterkeyrefs())), len(objects))
- for wr in dict.iterkeyrefs():
+ self.assertEqual(len(list(dict.keyrefs())), len(objects))
+ for wr in dict.keyrefs():
ob = wr()
self.assertIn(ob, dict)
+ self.assertIn(ob, dict)
self.assertEqual(ob.arg, dict[ob])
objects2.remove(ob)
self.assertEqual(len(objects2), 0)
@@ -1069,51 +1072,143 @@ class MappingTestCase(TestBase):
def check_iters(self, dict):
# item iterator:
- items = dict.items()
- for item in dict.iteritems():
+ items = list(dict.items())
+ for item in dict.items():
items.remove(item)
- self.assertTrue(len(items) == 0, "iteritems() did not touch all items")
+ self.assertFalse(items, "items() did not touch all items")
# key iterator, via __iter__():
- keys = dict.keys()
+ keys = list(dict.keys())
for k in dict:
keys.remove(k)
- self.assertTrue(len(keys) == 0, "__iter__() did not touch all keys")
+ self.assertFalse(keys, "__iter__() did not touch all keys")
# key iterator, via iterkeys():
- keys = dict.keys()
- for k in dict.iterkeys():
+ keys = list(dict.keys())
+ for k in dict.keys():
keys.remove(k)
- self.assertTrue(len(keys) == 0, "iterkeys() did not touch all keys")
+ self.assertFalse(keys, "iterkeys() did not touch all keys")
# value iterator:
- values = dict.values()
- for v in dict.itervalues():
+ values = list(dict.values())
+ for v in dict.values():
values.remove(v)
- self.assertTrue(len(values) == 0,
+ self.assertFalse(values,
"itervalues() did not touch all values")
+ def check_weak_destroy_while_iterating(self, dict, objects, iter_name):
+ n = len(dict)
+ it = iter(getattr(dict, iter_name)())
+ next(it) # Trigger internal iteration
+ # Destroy an object
+ del objects[-1]
+ gc.collect() # just in case
+ # We have removed either the first consumed object, or another one
+ self.assertIn(len(list(it)), [len(objects), len(objects) - 1])
+ del it
+ # The removal has been committed
+ self.assertEqual(len(dict), n - 1)
+
+ def check_weak_destroy_and_mutate_while_iterating(self, dict, testcontext):
+ # Check that we can explicitly mutate the weak dict without
+ # interfering with delayed removal.
+ # `testcontext` should create an iterator, destroy one of the
+ # weakref'ed objects and then return a new key/value pair corresponding
+ # to the destroyed object.
+ with testcontext() as (k, v):
+ self.assertNotIn(k, dict)
+ with testcontext() as (k, v):
+ self.assertRaises(KeyError, dict.__delitem__, k)
+ self.assertNotIn(k, dict)
+ with testcontext() as (k, v):
+ self.assertRaises(KeyError, dict.pop, k)
+ self.assertNotIn(k, dict)
+ with testcontext() as (k, v):
+ dict[k] = v
+ self.assertEqual(dict[k], v)
+ ddict = copy.copy(dict)
+ with testcontext() as (k, v):
+ dict.update(ddict)
+ self.assertEqual(dict, ddict)
+ with testcontext() as (k, v):
+ dict.clear()
+ self.assertEqual(len(dict), 0)
+
+ def test_weak_keys_destroy_while_iterating(self):
+ # Issue #7105: iterators shouldn't crash when a key is implicitly removed
+ dict, objects = self.make_weak_keyed_dict()
+ self.check_weak_destroy_while_iterating(dict, objects, 'keys')
+ self.check_weak_destroy_while_iterating(dict, objects, 'items')
+ self.check_weak_destroy_while_iterating(dict, objects, 'values')
+ self.check_weak_destroy_while_iterating(dict, objects, 'keyrefs')
+ dict, objects = self.make_weak_keyed_dict()
+ @contextlib.contextmanager
+ def testcontext():
+ try:
+ it = iter(dict.items())
+ next(it)
+ # Schedule a key/value for removal and recreate it
+ v = objects.pop().arg
+ gc.collect() # just in case
+ yield Object(v), v
+ finally:
+ it = None # should commit all removals
+ self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext)
+
+ def test_weak_values_destroy_while_iterating(self):
+ # Issue #7105: iterators shouldn't crash when a key is implicitly removed
+ dict, objects = self.make_weak_valued_dict()
+ self.check_weak_destroy_while_iterating(dict, objects, 'keys')
+ self.check_weak_destroy_while_iterating(dict, objects, 'items')
+ self.check_weak_destroy_while_iterating(dict, objects, 'values')
+ self.check_weak_destroy_while_iterating(dict, objects, 'itervaluerefs')
+ self.check_weak_destroy_while_iterating(dict, objects, 'valuerefs')
+ dict, objects = self.make_weak_valued_dict()
+ @contextlib.contextmanager
+ def testcontext():
+ try:
+ it = iter(dict.items())
+ next(it)
+ # Schedule a key/value for removal and recreate it
+ k = objects.pop().arg
+ gc.collect() # just in case
+ yield k, Object(k)
+ finally:
+ it = None # should commit all removals
+ self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext)
+
def test_make_weak_keyed_dict_from_dict(self):
o = Object(3)
dict = weakref.WeakKeyDictionary({o:364})
- self.assertTrue(dict[o] == 364)
+ self.assertEqual(dict[o], 364)
def test_make_weak_keyed_dict_from_weak_keyed_dict(self):
o = Object(3)
dict = weakref.WeakKeyDictionary({o:364})
dict2 = weakref.WeakKeyDictionary(dict)
- self.assertTrue(dict[o] == 364)
+ self.assertEqual(dict[o], 364)
def make_weak_keyed_dict(self):
dict = weakref.WeakKeyDictionary()
- objects = map(Object, range(self.COUNT))
+ objects = list(map(Object, range(self.COUNT)))
for o in objects:
dict[o] = o.arg
return dict, objects
+ def test_make_weak_valued_dict_from_dict(self):
+ o = Object(3)
+ dict = weakref.WeakValueDictionary({364:o})
+ self.assertEqual(dict[364], o)
+
+ def test_make_weak_valued_dict_from_weak_valued_dict(self):
+ o = Object(3)
+ dict = weakref.WeakValueDictionary({364:o})
+ dict2 = weakref.WeakValueDictionary(dict)
+ self.assertEqual(dict[364], o)
+
def make_weak_valued_dict(self):
dict = weakref.WeakValueDictionary()
- objects = map(Object, range(self.COUNT))
+ objects = list(map(Object, range(self.COUNT)))
for o in objects:
dict[o.arg] = o
return dict, objects
@@ -1122,15 +1217,15 @@ class MappingTestCase(TestBase):
weakdict = klass()
weakdict[key1] = value1
weakdict[key2] = value2
- self.assertTrue(len(weakdict) == 2)
+ self.assertEqual(len(weakdict), 2)
k, v = weakdict.popitem()
- self.assertTrue(len(weakdict) == 1)
+ self.assertEqual(len(weakdict), 1)
if k is key1:
self.assertTrue(v is value1)
else:
self.assertTrue(v is value2)
k, v = weakdict.popitem()
- self.assertTrue(len(weakdict) == 0)
+ self.assertEqual(len(weakdict), 0)
if k is key1:
self.assertTrue(v is value1)
else:
@@ -1150,16 +1245,16 @@ class MappingTestCase(TestBase):
" -- value parameters must be distinct objects")
weakdict = klass()
o = weakdict.setdefault(key, value1)
- self.assertIs(o, value1)
+ self.assertTrue(o is value1)
self.assertIn(key, weakdict)
- self.assertIs(weakdict.get(key), value1)
- self.assertIs(weakdict[key], value1)
+ self.assertTrue(weakdict.get(key) is value1)
+ self.assertTrue(weakdict[key] is value1)
o = weakdict.setdefault(key, value2)
- self.assertIs(o, value1)
+ self.assertTrue(o is value1)
self.assertIn(key, weakdict)
- self.assertIs(weakdict.get(key), value1)
- self.assertIs(weakdict[key], value1)
+ self.assertTrue(weakdict.get(key) is value1)
+ self.assertTrue(weakdict[key] is value1)
def test_weak_valued_dict_setdefault(self):
self.check_setdefault(weakref.WeakValueDictionary,
@@ -1171,24 +1266,22 @@ class MappingTestCase(TestBase):
def check_update(self, klass, dict):
#
- # This exercises d.update(), len(d), d.keys(), in d,
+ # This exercises d.update(), len(d), d.keys(), k in d,
# d.get(), d[].
#
weakdict = klass()
weakdict.update(dict)
self.assertEqual(len(weakdict), len(dict))
for k in weakdict.keys():
- self.assertIn(k, dict,
- "mysterious new key appeared in weak dict")
+ self.assertIn(k, dict, "mysterious new key appeared in weak dict")
v = dict.get(k)
- self.assertIs(v, weakdict[k])
- self.assertIs(v, weakdict.get(k))
+ self.assertTrue(v is weakdict[k])
+ self.assertTrue(v is weakdict.get(k))
for k in dict.keys():
- self.assertIn(k, weakdict,
- "original key disappeared in weak dict")
+ self.assertIn(k, weakdict, "original key disappeared in weak dict")
v = dict[k]
- self.assertIs(v, weakdict[k])
- self.assertIs(v, weakdict.get(k))
+ self.assertTrue(v is weakdict[k])
+ self.assertTrue(v is weakdict.get(k))
def test_weak_valued_dict_update(self):
self.check_update(weakref.WeakValueDictionary,
@@ -1204,10 +1297,10 @@ class MappingTestCase(TestBase):
o2 = Object('2')
d[o1] = 'something'
d[o2] = 'something'
- self.assertTrue(len(d) == 2)
+ self.assertEqual(len(d), 2)
del d[o1]
- self.assertTrue(len(d) == 1)
- self.assertTrue(d.keys() == [o2])
+ self.assertEqual(len(d), 1)
+ self.assertEqual(list(d.keys()), [o2])
def test_weak_valued_delitem(self):
d = weakref.WeakValueDictionary()
@@ -1215,10 +1308,10 @@ class MappingTestCase(TestBase):
o2 = Object('2')
d['something'] = o1
d['something else'] = o2
- self.assertTrue(len(d) == 2)
+ self.assertEqual(len(d), 2)
del d['something']
- self.assertTrue(len(d) == 1)
- self.assertTrue(d.items() == [('something else', o2)])
+ self.assertEqual(len(d), 1)
+ self.assertTrue(list(d.items()) == [('something else', o2)])
def test_weak_keyed_bad_delitem(self):
d = weakref.WeakKeyDictionary()
@@ -1259,7 +1352,7 @@ class MappingTestCase(TestBase):
d[o] = o.value
del o # now the only strong references to keys are in objs
# Find the order in which iterkeys sees the keys.
- objs = d.keys()
+ objs = list(d.keys())
# Reverse it, so that the iteration implementation of __delitem__
# has to keep looping to find the first object we delete.
objs.reverse()
@@ -1303,7 +1396,7 @@ libreftest = """ Doctest for examples in the library reference: weakref.rst
...
>>> obj = Dict(red=1, green=2, blue=3) # this object is weak referencable
>>> r = weakref.ref(obj)
->>> print r() is obj
+>>> print(r() is obj)
True
>>> import weakref
@@ -1316,21 +1409,21 @@ True
>>> o is o2
True
>>> del o, o2
->>> print r()
+>>> print(r())
None
>>> import weakref
>>> class ExtendedRef(weakref.ref):
... def __init__(self, ob, callback=None, **annotations):
-... super(ExtendedRef, self).__init__(ob, callback)
+... super().__init__(ob, callback)
... self.__counter = 0
-... for k, v in annotations.iteritems():
+... for k, v in annotations.items():
... setattr(self, k, v)
... def __call__(self):
... '''Return a pair containing the referent and the number of
... times the reference has been called.
... '''
-... ob = super(ExtendedRef, self).__call__()
+... ob = super().__call__()
... if ob is not None:
... self.__counter += 1
... ob = (ob, self.__counter)
@@ -1371,9 +1464,9 @@ True
>>> try:
... id2obj(a_id)
... except KeyError:
-... print 'OK'
+... print('OK')
... else:
-... print 'WeakValueDictionary error'
+... print('WeakValueDictionary error')
OK
"""
@@ -1381,14 +1474,14 @@ OK
__test__ = {'libreftest' : libreftest}
def test_main():
- test_support.run_unittest(
+ support.run_unittest(
ReferencesTestCase,
MappingTestCase,
WeakValueDictionaryTestCase,
WeakKeyDictionaryTestCase,
SubclassableWeakrefTestCase,
)
- test_support.run_doctest(sys.modules[__name__])
+ support.run_doctest(sys.modules[__name__])
if __name__ == "__main__":