diff options
Diffstat (limited to 'Lib/test/test_gc.py')
-rw-r--r-- | Lib/test/test_gc.py | 69 |
1 files changed, 69 insertions, 0 deletions
diff --git a/Lib/test/test_gc.py b/Lib/test/test_gc.py index d35f9ed..c59b72e 100644 --- a/Lib/test/test_gc.py +++ b/Lib/test/test_gc.py @@ -2,9 +2,15 @@ import unittest from test.support import (verbose, refcount_test, run_unittest, strip_python_stderr) import sys +import time import gc import weakref +try: + import threading +except ImportError: + threading = None + ### Support code ############################################################################### @@ -328,6 +334,69 @@ class GCTests(unittest.TestCase): v = {1: v, 2: Ouch()} gc.disable() + @unittest.skipUnless(threading, "test meaningless on builds without threads") + def test_trashcan_threads(self): + # Issue #13992: trashcan mechanism should be thread-safe + NESTING = 60 + N_THREADS = 2 + + def sleeper_gen(): + """A generator that releases the GIL when closed or dealloc'ed.""" + try: + yield + finally: + time.sleep(0.000001) + + class C(list): + # Appending to a list is atomic, which avoids the use of a lock. + inits = [] + dels = [] + def __init__(self, alist): + self[:] = alist + C.inits.append(None) + def __del__(self): + # This __del__ is called by subtype_dealloc(). + C.dels.append(None) + # `g` will release the GIL when garbage-collected. This + # helps assert subtype_dealloc's behaviour when threads + # switch in the middle of it. + g = sleeper_gen() + next(g) + # Now that __del__ is finished, subtype_dealloc will proceed + # to call list_dealloc, which also uses the trashcan mechanism. + + def make_nested(): + """Create a sufficiently nested container object so that the + trashcan mechanism is invoked when deallocating it.""" + x = C([]) + for i in range(NESTING): + x = [C([x])] + del x + + def run_thread(): + """Exercise make_nested() in a loop.""" + while not exit: + make_nested() + + old_switchinterval = sys.getswitchinterval() + sys.setswitchinterval(1e-5) + try: + exit = False + threads = [] + for i in range(N_THREADS): + t = threading.Thread(target=run_thread) + threads.append(t) + for t in threads: + t.start() + time.sleep(1.0) + exit = True + for t in threads: + t.join() + finally: + sys.setswitchinterval(old_switchinterval) + gc.collect() + self.assertEqual(len(C.inits), len(C.dels)) + def test_boom(self): class Boom: def __getattr__(self, someattribute): |