diff options
author | Antoine Pitrou <solipsis@pitrou.net> | 2016-12-19 10:12:58 (GMT) |
---|---|---|
committer | Antoine Pitrou <solipsis@pitrou.net> | 2016-12-19 10:12:58 (GMT) |
commit | 805f283aa311043a498fecc29cf7bc13e4311fd6 (patch) | |
tree | 69429d7b935fd7f4d1ea2fb039826ecbdbbfb9dd /Lib/test/test_weakref.py | |
parent | 88e420645612cbec172820c2c80a3f1fe68312c7 (diff) | |
download | cpython-805f283aa311043a498fecc29cf7bc13e4311fd6.zip cpython-805f283aa311043a498fecc29cf7bc13e4311fd6.tar.gz cpython-805f283aa311043a498fecc29cf7bc13e4311fd6.tar.bz2 |
Issue #19542: Fix bugs in WeakValueDictionary.setdefault() and WeakValueDictionary.pop()
when a GC collection happens in another thread.
Original patch and report by Armin Rigo.
Diffstat (limited to 'Lib/test/test_weakref.py')
-rw-r--r-- | Lib/test/test_weakref.py | 44 |
1 files changed, 44 insertions, 0 deletions
diff --git a/Lib/test/test_weakref.py b/Lib/test/test_weakref.py index 4073d49..779a9b3 100644 --- a/Lib/test/test_weakref.py +++ b/Lib/test/test_weakref.py @@ -6,6 +6,7 @@ import weakref import operator import contextlib import copy +import time from test import test_support @@ -56,6 +57,32 @@ class RefCycle: self.cycle = self +@contextlib.contextmanager +def collect_in_thread(period=0.0001): + """ + Ensure GC collections happen in a different thread, at a high frequency. + """ + threading = test_support.import_module('threading') + please_stop = False + + def collect(): + while not please_stop: + time.sleep(period) + gc.collect() + + with test_support.disable_gc(): + old_interval = sys.getcheckinterval() + sys.setcheckinterval(20) + t = threading.Thread(target=collect) + t.start() + try: + yield + finally: + please_stop = True + t.join() + sys.setcheckinterval(old_interval) + + class TestBase(unittest.TestCase): def setUp(self): @@ -1394,6 +1421,23 @@ class MappingTestCase(TestBase): self.assertEqual(len(d), 0) self.assertEqual(count, 2) + def test_threaded_weak_valued_setdefault(self): + d = weakref.WeakValueDictionary() + with collect_in_thread(): + for i in range(50000): + x = d.setdefault(10, RefCycle()) + self.assertIsNot(x, None) # we never put None in there! + del x + + def test_threaded_weak_valued_pop(self): + d = weakref.WeakValueDictionary() + with collect_in_thread(): + for i in range(50000): + d[10] = RefCycle() + x = d.pop(10, 10) + self.assertIsNot(x, None) # we never put None in there! + + from test import mapping_tests class WeakValueDictionaryTestCase(mapping_tests.BasicTestMappingProtocol): |