summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_weakref.py
diff options
context:
space:
mode:
authorAntoine Pitrou <solipsis@pitrou.net>2016-12-19 09:58:14 (GMT)
committerAntoine Pitrou <solipsis@pitrou.net>2016-12-19 09:58:14 (GMT)
commitd4580ecb8da130fd9061565defcdb73bc08aa311 (patch)
treeafe4516e0ddd29c057367e47f188a14e35af9aa4 /Lib/test/test_weakref.py
parent5bccb0e03db7fc3356b294a0d6c7a1687533ca38 (diff)
parentc1ee488962f0a20c2814b14a4c29d6082dd38add (diff)
downloadcpython-d4580ecb8da130fd9061565defcdb73bc08aa311.zip
cpython-d4580ecb8da130fd9061565defcdb73bc08aa311.tar.gz
cpython-d4580ecb8da130fd9061565defcdb73bc08aa311.tar.bz2
Issue #19542: Fix bugs in WeakValueDictionary.setdefault() and WeakValueDictionary.pop()
when a GC collection happens in another thread. Original patch and report by Armin Rigo.
Diffstat (limited to 'Lib/test/test_weakref.py')
-rw-r--r--Lib/test/test_weakref.py41
1 files changed, 41 insertions, 0 deletions
diff --git a/Lib/test/test_weakref.py b/Lib/test/test_weakref.py
index a474a07..9341c6d 100644
--- a/Lib/test/test_weakref.py
+++ b/Lib/test/test_weakref.py
@@ -6,6 +6,7 @@ import weakref
import operator
import contextlib
import copy
+import time
from test import support
from test.support import script_helper
@@ -72,6 +73,29 @@ class TestBase(unittest.TestCase):
self.cbcalled += 1
+@contextlib.contextmanager
+def collect_in_thread(period=0.0001):
+ """
+ Ensure GC collections happen in a different thread, at a high frequency.
+ """
+ threading = support.import_module('threading')
+ please_stop = False
+
+ def collect():
+ while not please_stop:
+ time.sleep(period)
+ gc.collect()
+
+ with support.disable_gc():
+ t = threading.Thread(target=collect)
+ t.start()
+ try:
+ yield
+ finally:
+ please_stop = True
+ t.join()
+
+
class ReferencesTestCase(TestBase):
def test_basic_ref(self):
@@ -1636,6 +1660,23 @@ class MappingTestCase(TestBase):
dict = weakref.WeakKeyDictionary()
self.assertRegex(repr(dict), '<WeakKeyDictionary at 0x.*>')
+ def test_threaded_weak_valued_setdefault(self):
+ d = weakref.WeakValueDictionary()
+ with collect_in_thread():
+ for i in range(100000):
+ x = d.setdefault(10, RefCycle())
+ self.assertIsNot(x, None) # we never put None in there!
+ del x
+
+ def test_threaded_weak_valued_pop(self):
+ d = weakref.WeakValueDictionary()
+ with collect_in_thread():
+ for i in range(100000):
+ d[10] = RefCycle()
+ x = d.pop(10, 10)
+ self.assertIsNot(x, None) # we never put None in there!
+
+
from test import mapping_tests
class WeakValueDictionaryTestCase(mapping_tests.BasicTestMappingProtocol):