summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_weakset.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_weakset.py')
-rw-r--r--Lib/test/test_weakset.py50
1 files changed, 50 insertions, 0 deletions
diff --git a/Lib/test/test_weakset.py b/Lib/test/test_weakset.py
index 651efe2..4e0aa38 100644
--- a/Lib/test/test_weakset.py
+++ b/Lib/test/test_weakset.py
@@ -10,6 +10,8 @@ import sys
import warnings
import collections
from collections import UserString as ustr
+import gc
+import contextlib
class Foo:
@@ -307,6 +309,54 @@ class TestWeakSet(unittest.TestCase):
self.assertFalse(self.s == WeakSet([Foo]))
self.assertFalse(self.s == 1)
+ def test_weak_destroy_while_iterating(self):
+ # Issue #7105: iterators shouldn't crash when a key is implicitly removed
+ # Create new items to be sure no-one else holds a reference
+ items = [ustr(c) for c in ('a', 'b', 'c')]
+ s = WeakSet(items)
+ it = iter(s)
+ next(it) # Trigger internal iteration
+ # Destroy an item
+ del items[-1]
+ gc.collect() # just in case
+ # We have removed either the first consumed items, or another one
+ self.assertIn(len(list(it)), [len(items), len(items) - 1])
+ del it
+ # The removal has been committed
+ self.assertEqual(len(s), len(items))
+
+ def test_weak_destroy_and_mutate_while_iterating(self):
+ # Issue #7105: iterators shouldn't crash when a key is implicitly removed
+ items = [ustr(c) for c in string.ascii_letters]
+ s = WeakSet(items)
+ @contextlib.contextmanager
+ def testcontext():
+ try:
+ it = iter(s)
+ next(it)
+ # Schedule an item for removal and recreate it
+ u = ustr(str(items.pop()))
+ gc.collect() # just in case
+ yield u
+ finally:
+ it = None # should commit all removals
+
+ with testcontext() as u:
+ self.assertFalse(u in s)
+ with testcontext() as u:
+ self.assertRaises(KeyError, s.remove, u)
+ self.assertFalse(u in s)
+ with testcontext() as u:
+ s.add(u)
+ self.assertTrue(u in s)
+ t = s.copy()
+ with testcontext() as u:
+ s.update(t)
+ self.assertEqual(len(s), len(t))
+ with testcontext() as u:
+ s.clear()
+ self.assertEqual(len(s), 0)
+
def test_main(verbose=None):
support.run_unittest(TestWeakSet)