diff options
Diffstat (limited to 'Lib/test/test_thread.py')
-rw-r--r-- | Lib/test/test_thread.py | 97 |
1 files changed, 94 insertions, 3 deletions
diff --git a/Lib/test/test_thread.py b/Lib/test/test_thread.py index a86f86b..a191e15 100644 --- a/Lib/test/test_thread.py +++ b/Lib/test/test_thread.py @@ -2,8 +2,10 @@ import os import unittest import random from test import support -import _thread as thread +thread = support.import_module('_thread') import time +import sys +import weakref from test import lock_tests @@ -61,7 +63,7 @@ class ThreadRunningTests(BasicThreadTest): def test_stack_size(self): # Various stack size tests. - self.assertEqual(thread.stack_size(), 0, "intial stack size is not 0") + self.assertEqual(thread.stack_size(), 0, "initial stack size is not 0") thread.stack_size(0) self.assertEqual(thread.stack_size(), 0, "stack_size not reset to default") @@ -100,6 +102,55 @@ class ThreadRunningTests(BasicThreadTest): thread.stack_size(0) + def test__count(self): + # Test the _count() function. + orig = thread._count() + mut = thread.allocate_lock() + mut.acquire() + started = [] + def task(): + started.append(None) + mut.acquire() + mut.release() + thread.start_new_thread(task, ()) + while not started: + time.sleep(0.01) + self.assertEqual(thread._count(), orig + 1) + # Allow the task to finish. + mut.release() + # The only reliable way to be sure that the thread ended from the + # interpreter's point of view is to wait for the function object to be + # destroyed. + done = [] + wr = weakref.ref(task, lambda _: done.append(None)) + del task + while not done: + time.sleep(0.01) + self.assertEqual(thread._count(), orig) + + def test_save_exception_state_on_error(self): + # See issue #14474 + def task(): + started.release() + raise SyntaxError + def mywrite(self, *args): + try: + raise ValueError + except ValueError: + pass + real_write(self, *args) + c = thread._count() + started = thread.allocate_lock() + with support.captured_output("stderr") as stderr: + real_write = stderr.write + stderr.write = mywrite + started.acquire() + thread.start_new_thread(task, ()) + started.acquire() + while thread._count() > c: + time.sleep(0.01) + self.assertIn("Traceback", stderr.getvalue()) + class Barrier: def __init__(self, num_threads): @@ -166,8 +217,48 @@ class LockTests(lock_tests.LockTests): locktype = thread.allocate_lock +class TestForkInThread(unittest.TestCase): + def setUp(self): + self.read_fd, self.write_fd = os.pipe() + + @unittest.skipIf(sys.platform.startswith('win'), + "This test is only appropriate for POSIX-like systems.") + @support.reap_threads + def test_forkinthread(self): + def thread1(): + try: + pid = os.fork() # fork in a thread + except RuntimeError: + os._exit(1) # exit the child + + if pid == 0: # child + try: + os.close(self.read_fd) + os.write(self.write_fd, b"OK") + finally: + os._exit(0) + else: # parent + os.close(self.write_fd) + + thread.start_new_thread(thread1, ()) + self.assertEqual(os.read(self.read_fd, 2), b"OK", + "Unable to fork() in thread") + + def tearDown(self): + try: + os.close(self.read_fd) + except OSError: + pass + + try: + os.close(self.write_fd) + except OSError: + pass + + def test_main(): - support.run_unittest(ThreadRunningTests, BarrierTest, LockTests) + support.run_unittest(ThreadRunningTests, BarrierTest, LockTests, + TestForkInThread) if __name__ == "__main__": test_main() |