summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_thread.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_thread.py')
-rw-r--r--Lib/test/test_thread.py97
1 files changed, 94 insertions, 3 deletions
diff --git a/Lib/test/test_thread.py b/Lib/test/test_thread.py
index a86f86b..a191e15 100644
--- a/Lib/test/test_thread.py
+++ b/Lib/test/test_thread.py
@@ -2,8 +2,10 @@ import os
import unittest
import random
from test import support
-import _thread as thread
+thread = support.import_module('_thread')
import time
+import sys
+import weakref
from test import lock_tests
@@ -61,7 +63,7 @@ class ThreadRunningTests(BasicThreadTest):
def test_stack_size(self):
# Various stack size tests.
- self.assertEqual(thread.stack_size(), 0, "intial stack size is not 0")
+ self.assertEqual(thread.stack_size(), 0, "initial stack size is not 0")
thread.stack_size(0)
self.assertEqual(thread.stack_size(), 0, "stack_size not reset to default")
@@ -100,6 +102,55 @@ class ThreadRunningTests(BasicThreadTest):
thread.stack_size(0)
+ def test__count(self):
+ # Test the _count() function.
+ orig = thread._count()
+ mut = thread.allocate_lock()
+ mut.acquire()
+ started = []
+ def task():
+ started.append(None)
+ mut.acquire()
+ mut.release()
+ thread.start_new_thread(task, ())
+ while not started:
+ time.sleep(0.01)
+ self.assertEqual(thread._count(), orig + 1)
+ # Allow the task to finish.
+ mut.release()
+ # The only reliable way to be sure that the thread ended from the
+ # interpreter's point of view is to wait for the function object to be
+ # destroyed.
+ done = []
+ wr = weakref.ref(task, lambda _: done.append(None))
+ del task
+ while not done:
+ time.sleep(0.01)
+ self.assertEqual(thread._count(), orig)
+
+ def test_save_exception_state_on_error(self):
+ # See issue #14474
+ def task():
+ started.release()
+ raise SyntaxError
+ def mywrite(self, *args):
+ try:
+ raise ValueError
+ except ValueError:
+ pass
+ real_write(self, *args)
+ c = thread._count()
+ started = thread.allocate_lock()
+ with support.captured_output("stderr") as stderr:
+ real_write = stderr.write
+ stderr.write = mywrite
+ started.acquire()
+ thread.start_new_thread(task, ())
+ started.acquire()
+ while thread._count() > c:
+ time.sleep(0.01)
+ self.assertIn("Traceback", stderr.getvalue())
+
class Barrier:
def __init__(self, num_threads):
@@ -166,8 +217,48 @@ class LockTests(lock_tests.LockTests):
locktype = thread.allocate_lock
+class TestForkInThread(unittest.TestCase):
+ def setUp(self):
+ self.read_fd, self.write_fd = os.pipe()
+
+ @unittest.skipIf(sys.platform.startswith('win'),
+ "This test is only appropriate for POSIX-like systems.")
+ @support.reap_threads
+ def test_forkinthread(self):
+ def thread1():
+ try:
+ pid = os.fork() # fork in a thread
+ except RuntimeError:
+ os._exit(1) # exit the child
+
+ if pid == 0: # child
+ try:
+ os.close(self.read_fd)
+ os.write(self.write_fd, b"OK")
+ finally:
+ os._exit(0)
+ else: # parent
+ os.close(self.write_fd)
+
+ thread.start_new_thread(thread1, ())
+ self.assertEqual(os.read(self.read_fd, 2), b"OK",
+ "Unable to fork() in thread")
+
+ def tearDown(self):
+ try:
+ os.close(self.read_fd)
+ except OSError:
+ pass
+
+ try:
+ os.close(self.write_fd)
+ except OSError:
+ pass
+
+
def test_main():
- support.run_unittest(ThreadRunningTests, BarrierTest, LockTests)
+ support.run_unittest(ThreadRunningTests, BarrierTest, LockTests,
+ TestForkInThread)
if __name__ == "__main__":
test_main()