diff options
Diffstat (limited to 'Lib/test/test_thread_local_bytecode.py')
-rw-r--r-- | Lib/test/test_thread_local_bytecode.py | 198 |
1 files changed, 198 insertions, 0 deletions
diff --git a/Lib/test/test_thread_local_bytecode.py b/Lib/test/test_thread_local_bytecode.py new file mode 100644 index 0000000..7a8809c --- /dev/null +++ b/Lib/test/test_thread_local_bytecode.py @@ -0,0 +1,198 @@ +"""Tests for thread-local bytecode.""" +import dis +import textwrap +import unittest + +from test import support +from test.support import cpython_only, import_helper, requires_specialization_ft +from test.support.script_helper import assert_python_ok +from test.support.threading_helper import requires_working_threading + +# Skip this test if the _testinternalcapi module isn't available +_testinternalcapi = import_helper.import_module("_testinternalcapi") + + +@cpython_only +@requires_working_threading() +@unittest.skipUnless(support.Py_GIL_DISABLED, "only in free-threaded builds") +class TLBCTests(unittest.TestCase): + @requires_specialization_ft + def test_new_threads_start_with_unspecialized_code(self): + code = textwrap.dedent(""" + import dis + import queue + import threading + + from _testinternalcapi import get_tlbc + + def all_opnames(bc): + return {i.opname for i in dis._get_instructions_bytes(bc)} + + def f(a, b, q=None): + if q is not None: + q.put(get_tlbc(f)) + return a + b + + for _ in range(100): + # specialize + f(1, 2) + + q = queue.Queue() + t = threading.Thread(target=f, args=('a', 'b', q)) + t.start() + t.join() + + assert "BINARY_OP_ADD_INT" in all_opnames(get_tlbc(f)) + assert "BINARY_OP_ADD_INT" not in all_opnames(q.get()) + """) + assert_python_ok("-X", "tlbc=1", "-c", code) + + @requires_specialization_ft + def test_threads_specialize_independently(self): + code = textwrap.dedent(""" + import dis + import queue + import threading + + from _testinternalcapi import get_tlbc + + def all_opnames(bc): + return {i.opname for i in dis._get_instructions_bytes(bc)} + + def f(a, b): + return a + b + + def g(a, b, q=None): + for _ in range(100): + f(a, b) + if q is not None: + q.put(get_tlbc(f)) + + # specialize in main thread + g(1, 2) + + # specialize in other thread + q = queue.Queue() + t = threading.Thread(target=g, args=('a', 'b', q)) + t.start() + t.join() + + assert "BINARY_OP_ADD_INT" in all_opnames(get_tlbc(f)) + t_opnames = all_opnames(q.get()) + assert "BINARY_OP_ADD_INT" not in t_opnames + assert "BINARY_OP_ADD_UNICODE" in t_opnames + """) + assert_python_ok("-X", "tlbc=1", "-c", code) + + def test_reuse_tlbc_across_threads_different_lifetimes(self): + code = textwrap.dedent(""" + import queue + import threading + + from _testinternalcapi import get_tlbc_id + + def f(a, b, q=None): + if q is not None: + q.put(get_tlbc_id(f)) + return a + b + + q = queue.Queue() + tlbc_ids = [] + for _ in range(3): + t = threading.Thread(target=f, args=('a', 'b', q)) + t.start() + t.join() + tlbc_ids.append(q.get()) + + assert tlbc_ids[0] == tlbc_ids[1] + assert tlbc_ids[1] == tlbc_ids[2] + """) + assert_python_ok("-X", "tlbc=1", "-c", code) + + def test_no_copies_if_tlbc_disabled(self): + code = textwrap.dedent(""" + import queue + import threading + + from _testinternalcapi import get_tlbc_id + + def f(a, b, q=None): + if q is not None: + q.put(get_tlbc_id(f)) + return a + b + + q = queue.Queue() + threads = [] + for _ in range(3): + t = threading.Thread(target=f, args=('a', 'b', q)) + t.start() + threads.append(t) + + tlbc_ids = [] + for t in threads: + t.join() + tlbc_ids.append(q.get()) + + main_tlbc_id = get_tlbc_id(f) + assert main_tlbc_id is not None + assert tlbc_ids[0] == main_tlbc_id + assert tlbc_ids[1] == main_tlbc_id + assert tlbc_ids[2] == main_tlbc_id + """) + assert_python_ok("-X", "tlbc=0", "-c", code) + + def test_no_specialization_if_tlbc_disabled(self): + code = textwrap.dedent(""" + import dis + import queue + import threading + + from _testinternalcapi import get_tlbc + + def all_opnames(f): + bc = get_tlbc(f) + return {i.opname for i in dis._get_instructions_bytes(bc)} + + def f(a, b): + return a + b + + for _ in range(100): + f(1, 2) + + assert "BINARY_OP_ADD_INT" not in all_opnames(f) + """) + assert_python_ok("-X", "tlbc=0", "-c", code) + + def test_generator_throw(self): + code = textwrap.dedent(""" + import queue + import threading + + from _testinternalcapi import get_tlbc_id + + def g(): + try: + yield + except: + yield get_tlbc_id(g) + + def f(q): + gen = g() + next(gen) + q.put(gen.throw(ValueError)) + + q = queue.Queue() + t = threading.Thread(target=f, args=(q,)) + t.start() + t.join() + + gen = g() + next(gen) + main_id = gen.throw(ValueError) + assert main_id != q.get() + """) + assert_python_ok("-X", "tlbc=1", "-c", code) + + +if __name__ == "__main__": + unittest.main() |