diff options
-rw-r--r-- | Lib/multiprocessing/util.py | 34 | ||||
-rw-r--r-- | Lib/test/_test_multiprocessing.py | 63 | ||||
-rw-r--r-- | Misc/NEWS | 2 |
3 files changed, 86 insertions, 13 deletions
diff --git a/Lib/multiprocessing/util.py b/Lib/multiprocessing/util.py index 0ce274c..b490caa 100644 --- a/Lib/multiprocessing/util.py +++ b/Lib/multiprocessing/util.py @@ -241,20 +241,28 @@ def _run_finalizers(minpriority=None): return if minpriority is None: - f = lambda p : p[0][0] is not None + f = lambda p : p[0] is not None else: - f = lambda p : p[0][0] is not None and p[0][0] >= minpriority - - items = [x for x in list(_finalizer_registry.items()) if f(x)] - items.sort(reverse=True) - - for key, finalizer in items: - sub_debug('calling %s', finalizer) - try: - finalizer() - except Exception: - import traceback - traceback.print_exc() + f = lambda p : p[0] is not None and p[0] >= minpriority + + # Careful: _finalizer_registry may be mutated while this function + # is running (either by a GC run or by another thread). + + # list(_finalizer_registry) should be atomic, while + # list(_finalizer_registry.items()) is not. + keys = [key for key in list(_finalizer_registry) if f(key)] + keys.sort(reverse=True) + + for key in keys: + finalizer = _finalizer_registry.get(key) + # key may have been removed from the registry + if finalizer is not None: + sub_debug('calling %s', finalizer) + try: + finalizer() + except Exception: + import traceback + traceback.print_exc() if minpriority is None: _finalizer_registry.clear() diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 70ecc54..d49e9c6 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -3110,6 +3110,14 @@ class _TestFinalize(BaseTestCase): ALLOWED_TYPES = ('processes',) + def setUp(self): + self.registry_backup = util._finalizer_registry.copy() + util._finalizer_registry.clear() + + def tearDown(self): + self.assertFalse(util._finalizer_registry) + util._finalizer_registry.update(self.registry_backup) + @classmethod def _test_finalize(cls, conn): class Foo(object): @@ -3159,6 +3167,61 @@ class _TestFinalize(BaseTestCase): result = [obj for obj in iter(conn.recv, 'STOP')] self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e']) + def test_thread_safety(self): + # bpo-24484: _run_finalizers() should be thread-safe + def cb(): + pass + + class Foo(object): + def __init__(self): + self.ref = self # create reference cycle + # insert finalizer at random key + util.Finalize(self, cb, exitpriority=random.randint(1, 100)) + + finish = False + exc = None + + def run_finalizers(): + nonlocal exc + while not finish: + time.sleep(random.random() * 1e-1) + try: + # A GC run will eventually happen during this, + # collecting stale Foo's and mutating the registry + util._run_finalizers() + except Exception as e: + exc = e + + def make_finalizers(): + nonlocal exc + d = {} + while not finish: + try: + # Old Foo's get gradually replaced and later + # collected by the GC (because of the cyclic ref) + d[random.getrandbits(5)] = {Foo() for i in range(10)} + except Exception as e: + exc = e + d.clear() + + old_interval = sys.getswitchinterval() + old_threshold = gc.get_threshold() + try: + sys.setswitchinterval(1e-6) + gc.set_threshold(5, 5, 5) + threads = [threading.Thread(target=run_finalizers), + threading.Thread(target=make_finalizers)] + with test.support.start_threads(threads): + time.sleep(4.0) # Wait a bit to trigger race condition + finish = True + if exc is not None: + raise exc + finally: + sys.setswitchinterval(old_interval) + gc.set_threshold(*old_threshold) + gc.collect() # Collect remaining Foo's + + # # Test that from ... import * works for each module # @@ -362,6 +362,8 @@ Extension Modules Library ------- +- bpo-24484: Avoid race condition in multiprocessing cleanup. + - bpo-30589: Fix multiprocessing.Process.exitcode to return the opposite of the signal number when the process is killed by a signal (instead of 255) when using the "forkserver" method. |