diff options
Diffstat (limited to 'Lib/test')
-rw-r--r-- | Lib/test/_test_multiprocessing.py | 3 | ||||
-rw-r--r-- | Lib/test/audit-tests.py | 3 | ||||
-rw-r--r-- | Lib/test/test_audit.py | 2 | ||||
-rw-r--r-- | Lib/test/test_concurrent_futures/test_process_pool.py | 6 | ||||
-rw-r--r-- | Lib/test/test_thread.py | 126 | ||||
-rw-r--r-- | Lib/test/test_threading.py | 47 |
6 files changed, 181 insertions, 6 deletions
diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index bf87a3e..ec003d8 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -2693,6 +2693,9 @@ class _TestPool(BaseTestCase): p.join() def test_terminate(self): + if self.TYPE == 'threads': + self.skipTest("Threads cannot be terminated") + # Simulate slow tasks which take "forever" to complete p = self.Pool(3) args = [support.LONG_TIMEOUT for i in range(10_000)] diff --git a/Lib/test/audit-tests.py b/Lib/test/audit-tests.py index 89f407d..ce4a11b 100644 --- a/Lib/test/audit-tests.py +++ b/Lib/test/audit-tests.py @@ -455,6 +455,9 @@ def test_threading(): i = _thread.start_new_thread(test_func(), ()) lock.acquire() + handle = _thread.start_joinable_thread(test_func()) + handle.join() + def test_threading_abort(): # Ensures that aborting PyThreadState_New raises the correct exception diff --git a/Lib/test/test_audit.py b/Lib/test/test_audit.py index 47e5832..cd0a4e2 100644 --- a/Lib/test/test_audit.py +++ b/Lib/test/test_audit.py @@ -209,6 +209,8 @@ class AuditTest(unittest.TestCase): expected = [ ("_thread.start_new_thread", "(<test_func>, (), None)"), ("test.test_func", "()"), + ("_thread.start_joinable_thread", "(<test_func>,)"), + ("test.test_func", "()"), ] self.assertEqual(actual, expected) diff --git a/Lib/test/test_concurrent_futures/test_process_pool.py b/Lib/test/test_concurrent_futures/test_process_pool.py index c73c2da..3e61b0c 100644 --- a/Lib/test/test_concurrent_futures/test_process_pool.py +++ b/Lib/test/test_concurrent_futures/test_process_pool.py @@ -194,11 +194,11 @@ class ProcessPoolExecutorTest(ExecutorTest): context = self.get_context() - # gh-109047: Mock the threading.start_new_thread() function to inject + # gh-109047: Mock the threading.start_joinable_thread() function to inject # RuntimeError: simulate the error raised during Python finalization. # Block the second creation: create _ExecutorManagerThread, but block # QueueFeederThread. - orig_start_new_thread = threading._start_new_thread + orig_start_new_thread = threading._start_joinable_thread nthread = 0 def mock_start_new_thread(func, *args): nonlocal nthread @@ -208,7 +208,7 @@ class ProcessPoolExecutorTest(ExecutorTest): nthread += 1 return orig_start_new_thread(func, *args) - with support.swap_attr(threading, '_start_new_thread', + with support.swap_attr(threading, '_start_joinable_thread', mock_start_new_thread): executor = self.executor_type(max_workers=2, mp_context=context) with executor: diff --git a/Lib/test/test_thread.py b/Lib/test/test_thread.py index 831aaf5..931cb4b 100644 --- a/Lib/test/test_thread.py +++ b/Lib/test/test_thread.py @@ -160,6 +160,132 @@ class ThreadRunningTests(BasicThreadTest): f"Exception ignored in thread started by {task!r}") self.assertIsNotNone(cm.unraisable.exc_traceback) + def test_join_thread(self): + finished = [] + + def task(): + time.sleep(0.05) + finished.append(thread.get_ident()) + + with threading_helper.wait_threads_exit(): + handle = thread.start_joinable_thread(task) + handle.join() + self.assertEqual(len(finished), 1) + self.assertEqual(handle.ident, finished[0]) + + def test_join_thread_already_exited(self): + def task(): + pass + + with threading_helper.wait_threads_exit(): + handle = thread.start_joinable_thread(task) + time.sleep(0.05) + handle.join() + + def test_join_several_times(self): + def task(): + pass + + with threading_helper.wait_threads_exit(): + handle = thread.start_joinable_thread(task) + handle.join() + with self.assertRaisesRegex(ValueError, "not joinable"): + handle.join() + + def test_joinable_not_joined(self): + handle_destroyed = thread.allocate_lock() + handle_destroyed.acquire() + + def task(): + handle_destroyed.acquire() + + with threading_helper.wait_threads_exit(): + handle = thread.start_joinable_thread(task) + del handle + handle_destroyed.release() + + def test_join_from_self(self): + errors = [] + handles = [] + start_joinable_thread_returned = thread.allocate_lock() + start_joinable_thread_returned.acquire() + task_tried_to_join = thread.allocate_lock() + task_tried_to_join.acquire() + + def task(): + start_joinable_thread_returned.acquire() + try: + handles[0].join() + except Exception as e: + errors.append(e) + finally: + task_tried_to_join.release() + + with threading_helper.wait_threads_exit(): + handle = thread.start_joinable_thread(task) + handles.append(handle) + start_joinable_thread_returned.release() + # Can still join after joining failed in other thread + task_tried_to_join.acquire() + handle.join() + + assert len(errors) == 1 + with self.assertRaisesRegex(RuntimeError, "Cannot join current thread"): + raise errors[0] + + def test_detach_from_self(self): + errors = [] + handles = [] + start_joinable_thread_returned = thread.allocate_lock() + start_joinable_thread_returned.acquire() + thread_detached = thread.allocate_lock() + thread_detached.acquire() + + def task(): + start_joinable_thread_returned.acquire() + try: + handles[0].detach() + except Exception as e: + errors.append(e) + finally: + thread_detached.release() + + with threading_helper.wait_threads_exit(): + handle = thread.start_joinable_thread(task) + handles.append(handle) + start_joinable_thread_returned.release() + thread_detached.acquire() + with self.assertRaisesRegex(ValueError, "not joinable"): + handle.join() + + assert len(errors) == 0 + + def test_detach_then_join(self): + lock = thread.allocate_lock() + lock.acquire() + + def task(): + lock.acquire() + + with threading_helper.wait_threads_exit(): + handle = thread.start_joinable_thread(task) + # detach() returns even though the thread is blocked on lock + handle.detach() + # join() then cannot be called anymore + with self.assertRaisesRegex(ValueError, "not joinable"): + handle.join() + lock.release() + + def test_join_then_detach(self): + def task(): + pass + + with threading_helper.wait_threads_exit(): + handle = thread.start_joinable_thread(task) + handle.join() + with self.assertRaisesRegex(ValueError, "not joinable"): + handle.detach() + class Barrier: def __init__(self, num_threads): diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py index 00a6437..146e2db 100644 --- a/Lib/test/test_threading.py +++ b/Lib/test/test_threading.py @@ -376,8 +376,8 @@ class ThreadTests(BaseTestCase): # Issue 7481: Failure to start thread should cleanup the limbo map. def fail_new_thread(*args): raise threading.ThreadError() - _start_new_thread = threading._start_new_thread - threading._start_new_thread = fail_new_thread + _start_joinable_thread = threading._start_joinable_thread + threading._start_joinable_thread = fail_new_thread try: t = threading.Thread(target=lambda: None) self.assertRaises(threading.ThreadError, t.start) @@ -385,7 +385,7 @@ class ThreadTests(BaseTestCase): t in threading._limbo, "Failed to cleanup _limbo map on failure of Thread.start().") finally: - threading._start_new_thread = _start_new_thread + threading._start_joinable_thread = _start_joinable_thread def test_finalize_running_thread(self): # Issue 1402: the PyGILState_Ensure / _Release functions may be called @@ -482,6 +482,47 @@ class ThreadTests(BaseTestCase): finally: sys.setswitchinterval(old_interval) + def test_join_from_multiple_threads(self): + # Thread.join() should be thread-safe + errors = [] + + def worker(): + time.sleep(0.005) + + def joiner(thread): + try: + thread.join() + except Exception as e: + errors.append(e) + + for N in range(2, 20): + threads = [threading.Thread(target=worker)] + for i in range(N): + threads.append(threading.Thread(target=joiner, + args=(threads[0],))) + for t in threads: + t.start() + time.sleep(0.01) + for t in threads: + t.join() + if errors: + raise errors[0] + + def test_join_with_timeout(self): + lock = _thread.allocate_lock() + lock.acquire() + + def worker(): + lock.acquire() + + thread = threading.Thread(target=worker) + thread.start() + thread.join(timeout=0.01) + assert thread.is_alive() + lock.release() + thread.join() + assert not thread.is_alive() + def test_no_refcycle_through_target(self): class RunSelfFunction(object): def __init__(self, should_raise): |