summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_weakref.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_weakref.py')
-rw-r--r--Lib/test/test_weakref.py775
1 files changed, 121 insertions, 654 deletions
diff --git a/Lib/test/test_weakref.py b/Lib/test/test_weakref.py
index 228bc17..418481d 100644
--- a/Lib/test/test_weakref.py
+++ b/Lib/test/test_weakref.py
@@ -1,25 +1,18 @@
import gc
import sys
import unittest
-import collections
+import UserList
import weakref
import operator
import contextlib
import copy
-import threading
import time
-import random
-from test import support
-from test.support import script_helper, ALWAYS_EQ
+from test import test_support
# Used in ReferencesTestCase.test_ref_created_during_del() .
ref_from_del = None
-# Used by FinalizeTestCase as a global that may be replaced by None
-# when the interpreter shuts down.
-_global_var = 'foobar'
-
class C:
def method(self):
pass
@@ -39,6 +32,9 @@ def create_function():
def create_bound_method():
return C().method
+def create_unbound_method():
+ return C.method
+
class Object:
def __init__(self, arg):
@@ -49,37 +45,24 @@ class Object:
if isinstance(other, Object):
return self.arg == other.arg
return NotImplemented
- def __lt__(self, other):
+ def __ne__(self, other):
if isinstance(other, Object):
- return self.arg < other.arg
+ return self.arg != other.arg
return NotImplemented
def __hash__(self):
return hash(self.arg)
- def some_method(self):
- return 4
- def other_method(self):
- return 5
-
class RefCycle:
def __init__(self):
self.cycle = self
-class TestBase(unittest.TestCase):
-
- def setUp(self):
- self.cbcalled = 0
-
- def callback(self, ref):
- self.cbcalled += 1
-
-
@contextlib.contextmanager
-def collect_in_thread(period=0.0001):
+def collect_in_thread(period=0.001):
"""
Ensure GC collections happen in a different thread, at a high frequency.
"""
+ threading = test_support.import_module('threading')
please_stop = False
def collect():
@@ -87,7 +70,9 @@ def collect_in_thread(period=0.0001):
time.sleep(period)
gc.collect()
- with support.disable_gc():
+ with test_support.disable_gc():
+ old_interval = sys.getcheckinterval()
+ sys.setcheckinterval(20)
t = threading.Thread(target=collect)
t.start()
try:
@@ -95,6 +80,16 @@ def collect_in_thread(period=0.0001):
finally:
please_stop = True
t.join()
+ sys.setcheckinterval(old_interval)
+
+
+class TestBase(unittest.TestCase):
+
+ def setUp(self):
+ self.cbcalled = 0
+
+ def callback(self, ref):
+ self.cbcalled += 1
class ReferencesTestCase(TestBase):
@@ -103,6 +98,7 @@ class ReferencesTestCase(TestBase):
self.check_basic_ref(C)
self.check_basic_ref(create_function)
self.check_basic_ref(create_bound_method)
+ self.check_basic_ref(create_unbound_method)
# Just make sure the tp_repr handler doesn't raise an exception.
# Live reference:
@@ -117,18 +113,7 @@ class ReferencesTestCase(TestBase):
self.check_basic_callback(C)
self.check_basic_callback(create_function)
self.check_basic_callback(create_bound_method)
-
- @support.cpython_only
- def test_cfunction(self):
- import _testcapi
- create_cfunction = _testcapi.create_cfunction
- f = create_cfunction()
- wr = weakref.ref(f)
- self.assertIs(wr(), f)
- del f
- self.assertIsNone(wr())
- self.check_basic_ref(create_cfunction)
- self.check_basic_callback(create_cfunction)
+ self.check_basic_callback(create_unbound_method)
def test_multiple_callbacks(self):
o = C()
@@ -172,9 +157,9 @@ class ReferencesTestCase(TestBase):
def check(proxy):
proxy.bar
- self.assertRaises(ReferenceError, check, ref1)
- self.assertRaises(ReferenceError, check, ref2)
- self.assertRaises(ReferenceError, bool, weakref.proxy(C()))
+ self.assertRaises(weakref.ReferenceError, check, ref1)
+ self.assertRaises(weakref.ReferenceError, check, ref2)
+ self.assertRaises(weakref.ReferenceError, bool, weakref.proxy(C()))
self.assertEqual(self.cbcalled, 2)
def check_basic_ref(self, factory):
@@ -230,40 +215,42 @@ class ReferencesTestCase(TestBase):
o = C()
self.check_proxy(o, weakref.proxy(o))
- L = collections.UserList()
+ L = UserList.UserList()
p = weakref.proxy(L)
self.assertFalse(p, "proxy for empty UserList should be false")
p.append(12)
self.assertEqual(len(L), 1)
self.assertTrue(p, "proxy for non-empty UserList should be true")
- p[:] = [2, 3]
+ with test_support.check_py3k_warnings():
+ p[:] = [2, 3]
self.assertEqual(len(L), 2)
self.assertEqual(len(p), 2)
self.assertIn(3, p, "proxy didn't support __contains__() properly")
p[1] = 5
self.assertEqual(L[1], 5)
self.assertEqual(p[1], 5)
- L2 = collections.UserList(L)
+ L2 = UserList.UserList(L)
p2 = weakref.proxy(L2)
self.assertEqual(p, p2)
## self.assertEqual(repr(L2), repr(p2))
- L3 = collections.UserList(range(10))
+ L3 = UserList.UserList(range(10))
p3 = weakref.proxy(L3)
- self.assertEqual(L3[:], p3[:])
- self.assertEqual(L3[5:], p3[5:])
- self.assertEqual(L3[:5], p3[:5])
- self.assertEqual(L3[2:5], p3[2:5])
+ with test_support.check_py3k_warnings():
+ self.assertEqual(L3[:], p3[:])
+ self.assertEqual(L3[5:], p3[5:])
+ self.assertEqual(L3[:5], p3[:5])
+ self.assertEqual(L3[2:5], p3[2:5])
def test_proxy_unicode(self):
# See bug 5037
class C(object):
def __str__(self):
return "string"
- def __bytes__(self):
- return b"bytes"
+ def __unicode__(self):
+ return u"unicode"
instance = C()
- self.assertIn("__bytes__", dir(weakref.proxy(instance)))
- self.assertEqual(bytes(weakref.proxy(instance)), b"bytes")
+ self.assertIn("__unicode__", dir(weakref.proxy(instance)))
+ self.assertEqual(unicode(weakref.proxy(instance)), u"unicode")
def test_proxy_index(self):
class C:
@@ -285,21 +272,6 @@ class ReferencesTestCase(TestBase):
p //= 5
self.assertEqual(p, 21)
- def test_proxy_matmul(self):
- class C:
- def __matmul__(self, other):
- return 1729
- def __rmatmul__(self, other):
- return -163
- def __imatmul__(self, other):
- return 561
- o = C()
- p = weakref.proxy(o)
- self.assertEqual(p @ 5, 1729)
- self.assertEqual(5 @ p, -163)
- p @= 5
- self.assertEqual(p, 561)
-
# The PyWeakref_* C API is documented as allowing either NULL or
# None as the value for the callback, where either means "no
# callback". The "no callback" ref and proxy objects are supposed
@@ -391,26 +363,6 @@ class ReferencesTestCase(TestBase):
lyst = List()
self.assertEqual(bool(weakref.proxy(lyst)), bool(lyst))
- def test_proxy_iter(self):
- # Test fails with a debug build of the interpreter
- # (see bpo-38395).
-
- obj = None
-
- class MyObj:
- def __iter__(self):
- nonlocal obj
- del obj
- return NotImplemented
-
- obj = MyObj()
- p = weakref.proxy(obj)
- with self.assertRaises(TypeError):
- # "blech" in p calls MyObj.__iter__ through the proxy,
- # without keeping a reference to the real object, so it
- # can be killed in the middle of the call
- "blech" in p
-
def test_getweakrefcount(self):
o = C()
ref1 = weakref.ref(o)
@@ -649,7 +601,7 @@ class ReferencesTestCase(TestBase):
del c1, c2, C, D
gc.collect()
- @support.requires_type_collecting
+ @test_support.requires_type_collecting
def test_callback_in_cycle_resurrection(self):
import gc
@@ -786,9 +738,12 @@ class ReferencesTestCase(TestBase):
gc.collect()
def test_classes(self):
- # Check that classes are weakrefable.
+ # Check that both old-style classes and new-style classes
+ # are weakrefable.
class A(object):
pass
+ class B:
+ pass
l = []
weakref.ref(int)
a = weakref.ref(A, l.append)
@@ -796,6 +751,11 @@ class ReferencesTestCase(TestBase):
gc.collect()
self.assertEqual(a(), None)
self.assertEqual(l, [a])
+ b = weakref.ref(B, l.append)
+ B = None
+ gc.collect()
+ self.assertEqual(b(), None)
+ self.assertEqual(l, [a, b])
def test_equality(self):
# Alive weakrefs defer equality testing to their underlying object.
@@ -814,10 +774,6 @@ class ReferencesTestCase(TestBase):
self.assertTrue(a != c)
self.assertTrue(a == d)
self.assertFalse(a != d)
- self.assertFalse(a == x)
- self.assertTrue(a != x)
- self.assertTrue(a == ALWAYS_EQ)
- self.assertFalse(a != ALWAYS_EQ)
del x, y, z
gc.collect()
for r in a, b, c:
@@ -834,21 +790,6 @@ class ReferencesTestCase(TestBase):
self.assertEqual(a == d, a is d)
self.assertEqual(a != d, a is not d)
- def test_ordering(self):
- # weakrefs cannot be ordered, even if the underlying objects can.
- ops = [operator.lt, operator.gt, operator.le, operator.ge]
- x = Object(1)
- y = Object(1)
- a = weakref.ref(x)
- b = weakref.ref(y)
- for op in ops:
- self.assertRaises(TypeError, op, a, b)
- # Same when dead.
- del x, y
- gc.collect()
- for op in ops:
- self.assertRaises(TypeError, op, a, b)
-
def test_hashing(self):
# Alive weakrefs hash the same as the underlying object
x = Object(42)
@@ -869,7 +810,7 @@ class ReferencesTestCase(TestBase):
# deallocation chain, the trashcan mechanism could delay clearing
# of the weakref and make the target object visible from outside
# code even though its refcount had dropped to 0. A crash ensued.
- class C:
+ class C(object):
def __init__(self, parent):
if not parent:
return
@@ -885,38 +826,6 @@ class ReferencesTestCase(TestBase):
del root
gc.collect()
- def test_callback_attribute(self):
- x = Object(1)
- callback = lambda ref: None
- ref1 = weakref.ref(x, callback)
- self.assertIs(ref1.__callback__, callback)
-
- ref2 = weakref.ref(x)
- self.assertIsNone(ref2.__callback__)
-
- def test_callback_attribute_after_deletion(self):
- x = Object(1)
- ref = weakref.ref(x, self.callback)
- self.assertIsNotNone(ref.__callback__)
- del x
- support.gc_collect()
- self.assertIsNone(ref.__callback__)
-
- def test_set_callback_attribute(self):
- x = Object(1)
- callback = lambda ref: None
- ref1 = weakref.ref(x, callback)
- with self.assertRaises(AttributeError):
- ref1.__callback__ = lambda ref: None
-
- def test_callback_gcs(self):
- class ObjectWithDel(Object):
- def __del__(self): pass
- x = ObjectWithDel(1)
- ref1 = weakref.ref(x, lambda ref: support.gc_collect())
- del x
- support.gc_collect()
-
class SubclassableWeakrefTestCase(TestBase):
@@ -924,10 +833,10 @@ class SubclassableWeakrefTestCase(TestBase):
class MyRef(weakref.ref):
def __init__(self, ob, callback=None, value=42):
self.value = value
- super().__init__(ob, callback)
+ super(MyRef, self).__init__(ob, callback)
def __call__(self):
self.called = True
- return super().__call__()
+ return super(MyRef, self).__call__()
o = Object("foo")
mr = MyRef(o, value=24)
self.assertIs(mr(), o)
@@ -983,7 +892,7 @@ class SubclassableWeakrefTestCase(TestBase):
self.assertFalse(hasattr(r, "__dict__"))
def test_subclass_refs_with_cycle(self):
- """Confirm https://bugs.python.org/issue3100 is fixed."""
+ # Bug #3110
# An instance of a weakref subclass can have attributes.
# If such a weakref holds the only strong reference to the object,
# deleting the weakref will delete the object. In this case,
@@ -1021,143 +930,6 @@ class SubclassableWeakrefTestCase(TestBase):
self.assertEqual(self.cbcalled, 0)
-class WeakMethodTestCase(unittest.TestCase):
-
- def _subclass(self):
- """Return an Object subclass overriding `some_method`."""
- class C(Object):
- def some_method(self):
- return 6
- return C
-
- def test_alive(self):
- o = Object(1)
- r = weakref.WeakMethod(o.some_method)
- self.assertIsInstance(r, weakref.ReferenceType)
- self.assertIsInstance(r(), type(o.some_method))
- self.assertIs(r().__self__, o)
- self.assertIs(r().__func__, o.some_method.__func__)
- self.assertEqual(r()(), 4)
-
- def test_object_dead(self):
- o = Object(1)
- r = weakref.WeakMethod(o.some_method)
- del o
- gc.collect()
- self.assertIs(r(), None)
-
- def test_method_dead(self):
- C = self._subclass()
- o = C(1)
- r = weakref.WeakMethod(o.some_method)
- del C.some_method
- gc.collect()
- self.assertIs(r(), None)
-
- def test_callback_when_object_dead(self):
- # Test callback behaviour when object dies first.
- C = self._subclass()
- calls = []
- def cb(arg):
- calls.append(arg)
- o = C(1)
- r = weakref.WeakMethod(o.some_method, cb)
- del o
- gc.collect()
- self.assertEqual(calls, [r])
- # Callback is only called once.
- C.some_method = Object.some_method
- gc.collect()
- self.assertEqual(calls, [r])
-
- def test_callback_when_method_dead(self):
- # Test callback behaviour when method dies first.
- C = self._subclass()
- calls = []
- def cb(arg):
- calls.append(arg)
- o = C(1)
- r = weakref.WeakMethod(o.some_method, cb)
- del C.some_method
- gc.collect()
- self.assertEqual(calls, [r])
- # Callback is only called once.
- del o
- gc.collect()
- self.assertEqual(calls, [r])
-
- @support.cpython_only
- def test_no_cycles(self):
- # A WeakMethod doesn't create any reference cycle to itself.
- o = Object(1)
- def cb(_):
- pass
- r = weakref.WeakMethod(o.some_method, cb)
- wr = weakref.ref(r)
- del r
- self.assertIs(wr(), None)
-
- def test_equality(self):
- def _eq(a, b):
- self.assertTrue(a == b)
- self.assertFalse(a != b)
- def _ne(a, b):
- self.assertTrue(a != b)
- self.assertFalse(a == b)
- x = Object(1)
- y = Object(1)
- a = weakref.WeakMethod(x.some_method)
- b = weakref.WeakMethod(y.some_method)
- c = weakref.WeakMethod(x.other_method)
- d = weakref.WeakMethod(y.other_method)
- # Objects equal, same method
- _eq(a, b)
- _eq(c, d)
- # Objects equal, different method
- _ne(a, c)
- _ne(a, d)
- _ne(b, c)
- _ne(b, d)
- # Objects unequal, same or different method
- z = Object(2)
- e = weakref.WeakMethod(z.some_method)
- f = weakref.WeakMethod(z.other_method)
- _ne(a, e)
- _ne(a, f)
- _ne(b, e)
- _ne(b, f)
- # Compare with different types
- _ne(a, x.some_method)
- _eq(a, ALWAYS_EQ)
- del x, y, z
- gc.collect()
- # Dead WeakMethods compare by identity
- refs = a, b, c, d, e, f
- for q in refs:
- for r in refs:
- self.assertEqual(q == r, q is r)
- self.assertEqual(q != r, q is not r)
-
- def test_hashing(self):
- # Alive WeakMethods are hashable if the underlying object is
- # hashable.
- x = Object(1)
- y = Object(1)
- a = weakref.WeakMethod(x.some_method)
- b = weakref.WeakMethod(y.some_method)
- c = weakref.WeakMethod(y.other_method)
- # Since WeakMethod objects are equal, the hashes should be equal.
- self.assertEqual(hash(a), hash(b))
- ha = hash(a)
- # Dead WeakMethods retain their old hash value
- del x, y
- gc.collect()
- self.assertEqual(hash(a), ha)
- self.assertEqual(hash(b), ha)
- # If it wasn't hashed when alive, a dead WeakMethod cannot be hashed.
- self.assertRaises(TypeError, hash, c)
-
-
class MappingTestCase(TestBase):
COUNT = 10
@@ -1165,9 +937,9 @@ class MappingTestCase(TestBase):
def check_len_cycles(self, dict_type, cons):
N = 20
items = [RefCycle() for i in range(N)]
- dct = dict_type(cons(o) for o in items)
+ dct = dict_type(cons(i, o) for i, o in enumerate(items))
# Keep an iterator alive
- it = dct.items()
+ it = dct.iteritems()
try:
next(it)
except StopIteration:
@@ -1175,18 +947,23 @@ class MappingTestCase(TestBase):
del items
gc.collect()
n1 = len(dct)
+ list(it)
del it
gc.collect()
n2 = len(dct)
- # one item may be kept alive inside the iterator
- self.assertIn(n1, (0, 1))
+ # iteration should prevent garbage collection here
+ # Note that this is a test on an implementation detail. The requirement
+ # is only to provide stable iteration, not that the size of the container
+ # stay fixed.
+ self.assertEqual(n1, 20)
+ #self.assertIn(n1, (0, 1))
self.assertEqual(n2, 0)
def test_weak_keyed_len_cycles(self):
- self.check_len_cycles(weakref.WeakKeyDictionary, lambda k: (k, 1))
+ self.check_len_cycles(weakref.WeakKeyDictionary, lambda n, k: (k, n))
def test_weak_valued_len_cycles(self):
- self.check_len_cycles(weakref.WeakValueDictionary, lambda k: (1, k))
+ self.check_len_cycles(weakref.WeakValueDictionary, lambda n, k: (n, k))
def check_len_race(self, dict_type, cons):
# Extended sanity checks for len() in the face of cyclic collection
@@ -1199,7 +976,7 @@ class MappingTestCase(TestBase):
dct = dict_type(cons(o) for o in items)
del items
# All items will be collected at next garbage collection pass
- it = dct.items()
+ it = dct.iteritems()
try:
next(it)
except StopIteration:
@@ -1224,11 +1001,12 @@ class MappingTestCase(TestBase):
#
dict, objects = self.make_weak_valued_dict()
for o in objects:
- self.assertEqual(weakref.getweakrefcount(o), 1)
+ self.assertEqual(weakref.getweakrefcount(o), 1,
+ "wrong number of weak references to %r!" % o)
self.assertIs(o, dict[o.arg],
"wrong object returned by weak dict!")
- items1 = list(dict.items())
- items2 = list(dict.copy().items())
+ items1 = dict.items()
+ items2 = dict.copy().items()
items1.sort()
items2.sort()
self.assertEqual(items1, items2,
@@ -1236,7 +1014,7 @@ class MappingTestCase(TestBase):
del items1, items2
self.assertEqual(len(dict), self.COUNT)
del objects[0]
- self.assertEqual(len(dict), self.COUNT - 1,
+ self.assertEqual(len(dict), (self.COUNT - 1),
"deleting object did not cause dictionary update")
del objects, o
self.assertEqual(len(dict), 0,
@@ -1250,7 +1028,7 @@ class MappingTestCase(TestBase):
def test_weak_keys(self):
#
# This exercises d.copy(), d.items(), d[] = v, d[], del d[],
- # len(d), k in d.
+ # len(d), in d.
#
dict, objects = self.make_weak_keyed_dict()
for o in objects:
@@ -1286,18 +1064,16 @@ class MappingTestCase(TestBase):
for wr in refs:
ob = wr()
self.assertIn(ob, dict)
- self.assertIn(ob, dict)
self.assertEqual(ob.arg, dict[ob])
objects2.remove(ob)
self.assertEqual(len(objects2), 0)
# Test iterkeyrefs()
objects2 = list(objects)
- self.assertEqual(len(list(dict.keyrefs())), len(objects))
- for wr in dict.keyrefs():
+ self.assertEqual(len(list(dict.iterkeyrefs())), len(objects))
+ for wr in dict.iterkeyrefs():
ob = wr()
self.assertIn(ob, dict)
- self.assertIn(ob, dict)
self.assertEqual(ob.arg, dict[ob])
objects2.remove(ob)
self.assertEqual(len(objects2), 0)
@@ -1329,28 +1105,28 @@ class MappingTestCase(TestBase):
def check_iters(self, dict):
# item iterator:
- items = list(dict.items())
- for item in dict.items():
+ items = dict.items()
+ for item in dict.iteritems():
items.remove(item)
- self.assertFalse(items, "items() did not touch all items")
+ self.assertEqual(len(items), 0, "iteritems() did not touch all items")
# key iterator, via __iter__():
- keys = list(dict.keys())
+ keys = dict.keys()
for k in dict:
keys.remove(k)
- self.assertFalse(keys, "__iter__() did not touch all keys")
+ self.assertEqual(len(keys), 0, "__iter__() did not touch all keys")
# key iterator, via iterkeys():
- keys = list(dict.keys())
- for k in dict.keys():
+ keys = dict.keys()
+ for k in dict.iterkeys():
keys.remove(k)
- self.assertFalse(keys, "iterkeys() did not touch all keys")
+ self.assertEqual(len(keys), 0, "iterkeys() did not touch all keys")
# value iterator:
- values = list(dict.values())
- for v in dict.values():
+ values = dict.values()
+ for v in dict.itervalues():
values.remove(v)
- self.assertFalse(values,
+ self.assertEqual(len(values), 0,
"itervalues() did not touch all values")
def check_weak_destroy_while_iterating(self, dict, objects, iter_name):
@@ -1373,13 +1149,13 @@ class MappingTestCase(TestBase):
# weakref'ed objects and then return a new key/value pair corresponding
# to the destroyed object.
with testcontext() as (k, v):
- self.assertNotIn(k, dict)
+ self.assertFalse(k in dict)
with testcontext() as (k, v):
self.assertRaises(KeyError, dict.__delitem__, k)
- self.assertNotIn(k, dict)
+ self.assertFalse(k in dict)
with testcontext() as (k, v):
self.assertRaises(KeyError, dict.pop, k)
- self.assertNotIn(k, dict)
+ self.assertFalse(k in dict)
with testcontext() as (k, v):
dict[k] = v
self.assertEqual(dict[k], v)
@@ -1391,51 +1167,18 @@ class MappingTestCase(TestBase):
dict.clear()
self.assertEqual(len(dict), 0)
- def check_weak_del_and_len_while_iterating(self, dict, testcontext):
- # Check that len() works when both iterating and removing keys
- # explicitly through various means (.pop(), .clear()...), while
- # implicit mutation is deferred because an iterator is alive.
- # (each call to testcontext() should schedule one item for removal
- # for this test to work properly)
- o = Object(123456)
- with testcontext():
- n = len(dict)
- # Since underlaying dict is ordered, first item is popped
- dict.pop(next(dict.keys()))
- self.assertEqual(len(dict), n - 1)
- dict[o] = o
- self.assertEqual(len(dict), n)
- # last item in objects is removed from dict in context shutdown
- with testcontext():
- self.assertEqual(len(dict), n - 1)
- # Then, (o, o) is popped
- dict.popitem()
- self.assertEqual(len(dict), n - 2)
- with testcontext():
- self.assertEqual(len(dict), n - 3)
- del dict[next(dict.keys())]
- self.assertEqual(len(dict), n - 4)
- with testcontext():
- self.assertEqual(len(dict), n - 5)
- dict.popitem()
- self.assertEqual(len(dict), n - 6)
- with testcontext():
- dict.clear()
- self.assertEqual(len(dict), 0)
- self.assertEqual(len(dict), 0)
-
def test_weak_keys_destroy_while_iterating(self):
# Issue #7105: iterators shouldn't crash when a key is implicitly removed
dict, objects = self.make_weak_keyed_dict()
- self.check_weak_destroy_while_iterating(dict, objects, 'keys')
- self.check_weak_destroy_while_iterating(dict, objects, 'items')
- self.check_weak_destroy_while_iterating(dict, objects, 'values')
- self.check_weak_destroy_while_iterating(dict, objects, 'keyrefs')
+ self.check_weak_destroy_while_iterating(dict, objects, 'iterkeys')
+ self.check_weak_destroy_while_iterating(dict, objects, 'iteritems')
+ self.check_weak_destroy_while_iterating(dict, objects, 'itervalues')
+ self.check_weak_destroy_while_iterating(dict, objects, 'iterkeyrefs')
dict, objects = self.make_weak_keyed_dict()
@contextlib.contextmanager
def testcontext():
try:
- it = iter(dict.items())
+ it = iter(dict.iteritems())
next(it)
# Schedule a key/value for removal and recreate it
v = objects.pop().arg
@@ -1445,24 +1188,19 @@ class MappingTestCase(TestBase):
it = None # should commit all removals
gc.collect()
self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext)
- # Issue #21173: len() fragile when keys are both implicitly and
- # explicitly removed.
- dict, objects = self.make_weak_keyed_dict()
- self.check_weak_del_and_len_while_iterating(dict, testcontext)
def test_weak_values_destroy_while_iterating(self):
# Issue #7105: iterators shouldn't crash when a key is implicitly removed
dict, objects = self.make_weak_valued_dict()
- self.check_weak_destroy_while_iterating(dict, objects, 'keys')
- self.check_weak_destroy_while_iterating(dict, objects, 'items')
- self.check_weak_destroy_while_iterating(dict, objects, 'values')
+ self.check_weak_destroy_while_iterating(dict, objects, 'iterkeys')
+ self.check_weak_destroy_while_iterating(dict, objects, 'iteritems')
+ self.check_weak_destroy_while_iterating(dict, objects, 'itervalues')
self.check_weak_destroy_while_iterating(dict, objects, 'itervaluerefs')
- self.check_weak_destroy_while_iterating(dict, objects, 'valuerefs')
dict, objects = self.make_weak_valued_dict()
@contextlib.contextmanager
def testcontext():
try:
- it = iter(dict.items())
+ it = iter(dict.iteritems())
next(it)
# Schedule a key/value for removal and recreate it
k = objects.pop().arg
@@ -1472,8 +1210,6 @@ class MappingTestCase(TestBase):
it = None # should commit all removals
gc.collect()
self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext)
- dict, objects = self.make_weak_valued_dict()
- self.check_weak_del_and_len_while_iterating(dict, testcontext)
def test_make_weak_keyed_dict_from_dict(self):
o = Object(3)
@@ -1488,22 +1224,11 @@ class MappingTestCase(TestBase):
def make_weak_keyed_dict(self):
dict = weakref.WeakKeyDictionary()
- objects = list(map(Object, range(self.COUNT)))
+ objects = map(Object, range(self.COUNT))
for o in objects:
dict[o] = o.arg
return dict, objects
- def test_make_weak_valued_dict_from_dict(self):
- o = Object(3)
- dict = weakref.WeakValueDictionary({364:o})
- self.assertEqual(dict[364], o)
-
- def test_make_weak_valued_dict_from_weak_valued_dict(self):
- o = Object(3)
- dict = weakref.WeakValueDictionary({364:o})
- dict2 = weakref.WeakValueDictionary(dict)
- self.assertEqual(dict[364], o)
-
def test_make_weak_valued_dict_misc(self):
# errors
self.assertRaises(TypeError, weakref.WeakValueDictionary.__init__)
@@ -1511,14 +1236,14 @@ class MappingTestCase(TestBase):
self.assertRaises(TypeError, weakref.WeakValueDictionary, (), ())
# special keyword arguments
o = Object(3)
- for kw in 'self', 'dict', 'other', 'iterable':
+ for kw in 'self', 'other', 'iterable':
d = weakref.WeakValueDictionary(**{kw: o})
self.assertEqual(list(d.keys()), [kw])
self.assertEqual(d[kw], o)
def make_weak_valued_dict(self):
dict = weakref.WeakValueDictionary()
- objects = list(map(Object, range(self.COUNT)))
+ objects = map(Object, range(self.COUNT))
for o in objects:
dict[o.arg] = o
return dict, objects
@@ -1576,19 +1301,21 @@ class MappingTestCase(TestBase):
def check_update(self, klass, dict):
#
- # This exercises d.update(), len(d), d.keys(), k in d,
+ # This exercises d.update(), len(d), d.keys(), in d,
# d.get(), d[].
#
weakdict = klass()
weakdict.update(dict)
self.assertEqual(len(weakdict), len(dict))
for k in weakdict.keys():
- self.assertIn(k, dict, "mysterious new key appeared in weak dict")
+ self.assertIn(k, dict,
+ "mysterious new key appeared in weak dict")
v = dict.get(k)
self.assertIs(v, weakdict[k])
self.assertIs(v, weakdict.get(k))
for k in dict.keys():
- self.assertIn(k, weakdict, "original key disappeared in weak dict")
+ self.assertIn(k, weakdict,
+ "original key disappeared in weak dict")
v = dict[k]
self.assertIs(v, weakdict[k])
self.assertIs(v, weakdict.get(k))
@@ -1623,7 +1350,7 @@ class MappingTestCase(TestBase):
self.assertEqual(len(d), 2)
del d[o1]
self.assertEqual(len(d), 1)
- self.assertEqual(list(d.keys()), [o2])
+ self.assertEqual(d.keys(), [o2])
def test_weak_valued_delitem(self):
d = weakref.WeakValueDictionary()
@@ -1634,7 +1361,7 @@ class MappingTestCase(TestBase):
self.assertEqual(len(d), 2)
del d['something']
self.assertEqual(len(d), 1)
- self.assertEqual(list(d.items()), [('something else', o2)])
+ self.assertEqual(d.items(), [('something else', o2)])
def test_weak_keyed_bad_delitem(self):
d = weakref.WeakKeyDictionary()
@@ -1675,12 +1402,12 @@ class MappingTestCase(TestBase):
d[o] = o.value
del o # now the only strong references to keys are in objs
# Find the order in which iterkeys sees the keys.
- objs = list(d.keys())
+ objs = d.keys()
# Reverse it, so that the iteration implementation of __delitem__
# has to keep looping to find the first object we delete.
objs.reverse()
- # Turn on mutation in C.__eq__. The first time through the loop,
+ # Turn on mutation in C.__eq__. The first time thru the loop,
# under the iterkeys() business the first comparison will delete
# the last item iterkeys() would see, and that causes a
# RuntimeError: dictionary changed size during iteration
@@ -1695,18 +1422,10 @@ class MappingTestCase(TestBase):
self.assertEqual(len(d), 0)
self.assertEqual(count, 2)
- def test_make_weak_valued_dict_repr(self):
- dict = weakref.WeakValueDictionary()
- self.assertRegex(repr(dict), '<WeakValueDictionary at 0x.*>')
-
- def test_make_weak_keyed_dict_repr(self):
- dict = weakref.WeakKeyDictionary()
- self.assertRegex(repr(dict), '<WeakKeyDictionary at 0x.*>')
-
def test_threaded_weak_valued_setdefault(self):
d = weakref.WeakValueDictionary()
with collect_in_thread():
- for i in range(100000):
+ for i in range(50000):
x = d.setdefault(10, RefCycle())
self.assertIsNot(x, None) # we never put None in there!
del x
@@ -1714,7 +1433,7 @@ class MappingTestCase(TestBase):
def test_threaded_weak_valued_pop(self):
d = weakref.WeakValueDictionary()
with collect_in_thread():
- for i in range(100000):
+ for i in range(50000):
d[10] = RefCycle()
x = d.pop(10, 10)
self.assertIsNot(x, None) # we never put None in there!
@@ -1731,92 +1450,6 @@ class MappingTestCase(TestBase):
self.assertEqual(len(d), 1)
o = None # lose ref
- def check_threaded_weak_dict_copy(self, type_, deepcopy):
- # `type_` should be either WeakKeyDictionary or WeakValueDictionary.
- # `deepcopy` should be either True or False.
- exc = []
-
- class DummyKey:
- def __init__(self, ctr):
- self.ctr = ctr
-
- class DummyValue:
- def __init__(self, ctr):
- self.ctr = ctr
-
- def dict_copy(d, exc):
- try:
- if deepcopy is True:
- _ = copy.deepcopy(d)
- else:
- _ = d.copy()
- except Exception as ex:
- exc.append(ex)
-
- def pop_and_collect(lst):
- gc_ctr = 0
- while lst:
- i = random.randint(0, len(lst) - 1)
- gc_ctr += 1
- lst.pop(i)
- if gc_ctr % 10000 == 0:
- gc.collect() # just in case
-
- self.assertIn(type_, (weakref.WeakKeyDictionary, weakref.WeakValueDictionary))
-
- d = type_()
- keys = []
- values = []
- # Initialize d with many entries
- for i in range(70000):
- k, v = DummyKey(i), DummyValue(i)
- keys.append(k)
- values.append(v)
- d[k] = v
- del k
- del v
-
- t_copy = threading.Thread(target=dict_copy, args=(d, exc,))
- if type_ is weakref.WeakKeyDictionary:
- t_collect = threading.Thread(target=pop_and_collect, args=(keys,))
- else: # weakref.WeakValueDictionary
- t_collect = threading.Thread(target=pop_and_collect, args=(values,))
-
- t_copy.start()
- t_collect.start()
-
- t_copy.join()
- t_collect.join()
-
- # Test exceptions
- if exc:
- raise exc[0]
-
- def test_threaded_weak_key_dict_copy(self):
- # Issue #35615: Weakref keys or values getting GC'ed during dict
- # copying should not result in a crash.
- self.check_threaded_weak_dict_copy(weakref.WeakKeyDictionary, False)
-
- def test_threaded_weak_key_dict_deepcopy(self):
- # Issue #35615: Weakref keys or values getting GC'ed during dict
- # copying should not result in a crash.
- self.check_threaded_weak_dict_copy(weakref.WeakKeyDictionary, True)
-
- def test_threaded_weak_value_dict_copy(self):
- # Issue #35615: Weakref keys or values getting GC'ed during dict
- # copying should not result in a crash.
- self.check_threaded_weak_dict_copy(weakref.WeakValueDictionary, False)
-
- def test_threaded_weak_value_dict_deepcopy(self):
- # Issue #35615: Weakref keys or values getting GC'ed during dict
- # copying should not result in a crash.
- self.check_threaded_weak_dict_copy(weakref.WeakValueDictionary, True)
-
- @support.cpython_only
- def test_remove_closure(self):
- d = weakref.WeakValueDictionary()
- self.assertIsNone(d._remove.__closure__)
-
from test import mapping_tests
@@ -1834,170 +1467,6 @@ class WeakKeyDictionaryTestCase(mapping_tests.BasicTestMappingProtocol):
def _reference(self):
return self.__ref.copy()
-
-class FinalizeTestCase(unittest.TestCase):
-
- class A:
- pass
-
- def _collect_if_necessary(self):
- # we create no ref-cycles so in CPython no gc should be needed
- if sys.implementation.name != 'cpython':
- support.gc_collect()
-
- def test_finalize(self):
- def add(x,y,z):
- res.append(x + y + z)
- return x + y + z
-
- a = self.A()
-
- res = []
- f = weakref.finalize(a, add, 67, 43, z=89)
- self.assertEqual(f.alive, True)
- self.assertEqual(f.peek(), (a, add, (67,43), {'z':89}))
- self.assertEqual(f(), 199)
- self.assertEqual(f(), None)
- self.assertEqual(f(), None)
- self.assertEqual(f.peek(), None)
- self.assertEqual(f.detach(), None)
- self.assertEqual(f.alive, False)
- self.assertEqual(res, [199])
-
- res = []
- f = weakref.finalize(a, add, 67, 43, 89)
- self.assertEqual(f.peek(), (a, add, (67,43,89), {}))
- self.assertEqual(f.detach(), (a, add, (67,43,89), {}))
- self.assertEqual(f(), None)
- self.assertEqual(f(), None)
- self.assertEqual(f.peek(), None)
- self.assertEqual(f.detach(), None)
- self.assertEqual(f.alive, False)
- self.assertEqual(res, [])
-
- res = []
- f = weakref.finalize(a, add, x=67, y=43, z=89)
- del a
- self._collect_if_necessary()
- self.assertEqual(f(), None)
- self.assertEqual(f(), None)
- self.assertEqual(f.peek(), None)
- self.assertEqual(f.detach(), None)
- self.assertEqual(f.alive, False)
- self.assertEqual(res, [199])
-
- def test_arg_errors(self):
- def fin(*args, **kwargs):
- res.append((args, kwargs))
-
- a = self.A()
-
- res = []
- f = weakref.finalize(a, fin, 1, 2, func=3, obj=4)
- self.assertEqual(f.peek(), (a, fin, (1, 2), {'func': 3, 'obj': 4}))
- f()
- self.assertEqual(res, [((1, 2), {'func': 3, 'obj': 4})])
-
- with self.assertRaises(TypeError):
- weakref.finalize(a, func=fin, arg=1)
- with self.assertRaises(TypeError):
- weakref.finalize(obj=a, func=fin, arg=1)
- self.assertRaises(TypeError, weakref.finalize, a)
- self.assertRaises(TypeError, weakref.finalize)
-
- def test_order(self):
- a = self.A()
- res = []
-
- f1 = weakref.finalize(a, res.append, 'f1')
- f2 = weakref.finalize(a, res.append, 'f2')
- f3 = weakref.finalize(a, res.append, 'f3')
- f4 = weakref.finalize(a, res.append, 'f4')
- f5 = weakref.finalize(a, res.append, 'f5')
-
- # make sure finalizers can keep themselves alive
- del f1, f4
-
- self.assertTrue(f2.alive)
- self.assertTrue(f3.alive)
- self.assertTrue(f5.alive)
-
- self.assertTrue(f5.detach())
- self.assertFalse(f5.alive)
-
- f5() # nothing because previously unregistered
- res.append('A')
- f3() # => res.append('f3')
- self.assertFalse(f3.alive)
- res.append('B')
- f3() # nothing because previously called
- res.append('C')
- del a
- self._collect_if_necessary()
- # => res.append('f4')
- # => res.append('f2')
- # => res.append('f1')
- self.assertFalse(f2.alive)
- res.append('D')
- f2() # nothing because previously called by gc
-
- expected = ['A', 'f3', 'B', 'C', 'f4', 'f2', 'f1', 'D']
- self.assertEqual(res, expected)
-
- def test_all_freed(self):
- # we want a weakrefable subclass of weakref.finalize
- class MyFinalizer(weakref.finalize):
- pass
-
- a = self.A()
- res = []
- def callback():
- res.append(123)
- f = MyFinalizer(a, callback)
-
- wr_callback = weakref.ref(callback)
- wr_f = weakref.ref(f)
- del callback, f
-
- self.assertIsNotNone(wr_callback())
- self.assertIsNotNone(wr_f())
-
- del a
- self._collect_if_necessary()
-
- self.assertIsNone(wr_callback())
- self.assertIsNone(wr_f())
- self.assertEqual(res, [123])
-
- @classmethod
- def run_in_child(cls):
- def error():
- # Create an atexit finalizer from inside a finalizer called
- # at exit. This should be the next to be run.
- g1 = weakref.finalize(cls, print, 'g1')
- print('f3 error')
- 1/0
-
- # cls should stay alive till atexit callbacks run
- f1 = weakref.finalize(cls, print, 'f1', _global_var)
- f2 = weakref.finalize(cls, print, 'f2', _global_var)
- f3 = weakref.finalize(cls, error)
- f4 = weakref.finalize(cls, print, 'f4', _global_var)
-
- assert f1.atexit == True
- f2.atexit = False
- assert f3.atexit == True
- assert f4.atexit == True
-
- def test_atexit(self):
- prog = ('from test.test_weakref import FinalizeTestCase;'+
- 'FinalizeTestCase.run_in_child()')
- rc, out, err = script_helper.assert_python_ok('-c', prog)
- out = out.decode('ascii').splitlines()
- self.assertEqual(out, ['f4 foobar', 'f3 error', 'g1', 'f1 foobar'])
- self.assertTrue(b'ZeroDivisionError' in err)
-
-
libreftest = """ Doctest for examples in the library reference: weakref.rst
>>> import weakref
@@ -2006,7 +1475,7 @@ libreftest = """ Doctest for examples in the library reference: weakref.rst
...
>>> obj = Dict(red=1, green=2, blue=3) # this object is weak referencable
>>> r = weakref.ref(obj)
->>> print(r() is obj)
+>>> print r() is obj
True
>>> import weakref
@@ -2019,21 +1488,21 @@ True
>>> o is o2
True
>>> del o, o2
->>> print(r())
+>>> print r()
None
>>> import weakref
>>> class ExtendedRef(weakref.ref):
... def __init__(self, ob, callback=None, **annotations):
-... super().__init__(ob, callback)
+... super(ExtendedRef, self).__init__(ob, callback)
... self.__counter = 0
-... for k, v in annotations.items():
+... for k, v in annotations.iteritems():
... setattr(self, k, v)
... def __call__(self):
... '''Return a pair containing the referent and the number of
... times the reference has been called.
... '''
-... ob = super().__call__()
+... ob = super(ExtendedRef, self).__call__()
... if ob is not None:
... self.__counter += 1
... ob = (ob, self.__counter)
@@ -2074,9 +1543,9 @@ True
>>> try:
... id2obj(a_id)
... except KeyError:
-... print('OK')
+... print 'OK'
... else:
-... print('WeakValueDictionary error')
+... print 'WeakValueDictionary error'
OK
"""
@@ -2084,16 +1553,14 @@ OK
__test__ = {'libreftest' : libreftest}
def test_main():
- support.run_unittest(
+ test_support.run_unittest(
ReferencesTestCase,
- WeakMethodTestCase,
MappingTestCase,
WeakValueDictionaryTestCase,
WeakKeyDictionaryTestCase,
SubclassableWeakrefTestCase,
- FinalizeTestCase,
)
- support.run_doctest(sys.modules[__name__])
+ test_support.run_doctest(sys.modules[__name__])
if __name__ == "__main__":