summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_gc.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_gc.py')
-rw-r--r--Lib/test/test_gc.py69
1 files changed, 69 insertions, 0 deletions
diff --git a/Lib/test/test_gc.py b/Lib/test/test_gc.py
index d35f9ed..c59b72e 100644
--- a/Lib/test/test_gc.py
+++ b/Lib/test/test_gc.py
@@ -2,9 +2,15 @@ import unittest
from test.support import (verbose, refcount_test, run_unittest,
strip_python_stderr)
import sys
+import time
import gc
import weakref
+try:
+ import threading
+except ImportError:
+ threading = None
+
### Support code
###############################################################################
@@ -328,6 +334,69 @@ class GCTests(unittest.TestCase):
v = {1: v, 2: Ouch()}
gc.disable()
+ @unittest.skipUnless(threading, "test meaningless on builds without threads")
+ def test_trashcan_threads(self):
+ # Issue #13992: trashcan mechanism should be thread-safe
+ NESTING = 60
+ N_THREADS = 2
+
+ def sleeper_gen():
+ """A generator that releases the GIL when closed or dealloc'ed."""
+ try:
+ yield
+ finally:
+ time.sleep(0.000001)
+
+ class C(list):
+ # Appending to a list is atomic, which avoids the use of a lock.
+ inits = []
+ dels = []
+ def __init__(self, alist):
+ self[:] = alist
+ C.inits.append(None)
+ def __del__(self):
+ # This __del__ is called by subtype_dealloc().
+ C.dels.append(None)
+ # `g` will release the GIL when garbage-collected. This
+ # helps assert subtype_dealloc's behaviour when threads
+ # switch in the middle of it.
+ g = sleeper_gen()
+ next(g)
+ # Now that __del__ is finished, subtype_dealloc will proceed
+ # to call list_dealloc, which also uses the trashcan mechanism.
+
+ def make_nested():
+ """Create a sufficiently nested container object so that the
+ trashcan mechanism is invoked when deallocating it."""
+ x = C([])
+ for i in range(NESTING):
+ x = [C([x])]
+ del x
+
+ def run_thread():
+ """Exercise make_nested() in a loop."""
+ while not exit:
+ make_nested()
+
+ old_switchinterval = sys.getswitchinterval()
+ sys.setswitchinterval(1e-5)
+ try:
+ exit = False
+ threads = []
+ for i in range(N_THREADS):
+ t = threading.Thread(target=run_thread)
+ threads.append(t)
+ for t in threads:
+ t.start()
+ time.sleep(1.0)
+ exit = True
+ for t in threads:
+ t.join()
+ finally:
+ sys.setswitchinterval(old_switchinterval)
+ gc.collect()
+ self.assertEqual(len(C.inits), len(C.dels))
+
def test_boom(self):
class Boom:
def __getattr__(self, someattribute):