/*
 * This file derives from SFMT 1.3.3
 * (http://www.math.sci.hiroshima-u.ac.jp/~m-mat/MT/SFMT/index.html), which was
 * released under the terms of the following license:
 *
 * Copyright (c) 2006,2007 Mutsuo Saito, Makoto Matsumoto and Hiroshima
 * University. All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 *       copyright notice, this list of conditions and the following
 *       disclaimer in the documentation and/or other materials provided
 *       with the distribution.
 *     * Neither the name of the Hiroshima University nor the names of
 *       its contributors may be used to endorse or promote products
 *       derived from this software without specific prior written
 *       permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#ifndef SFMT_PARAMS2281_H
#define SFMT_PARAMS2281_H

#define POS1 12
#define SL1 19
#define SL2 1
#define SR1 5
#define SR2 1
#define MSK1 0xbff7ffbfU
#define MSK2 0xfdfffffeU
#define MSK3 0xf7ffef7fU
#define MSK4 0xf2f7cbbfU
#define PARITY1 0x00000001U
#define PARITY2 0x00000000U
#define PARITY3 0x00000000U
#define PARITY4 0x41dfa600U

/* PARAMETERS FOR ALTIVEC */
#if defined(__APPLE__) /* For OSX */
    #define ALTI_SL1 (vector unsigned int)(SL1, SL1, SL1, SL1)
    #define ALTI_SR1 (vector unsigned int)(SR1, SR1, SR1, SR1)
    #define ALTI_MSK (vector unsigned int)(MSK1, MSK2, MSK3, MSK4)
    #define ALTI_MSK64 \
        (vector unsigned int)(MSK2, MSK1, MSK4, MSK3)
    #define ALTI_SL2_PERM \
        (vector unsigned char)(1,2,3,23,5,6,7,0,9,10,11,4,13,14,15,8)
    #define ALTI_SL2_PERM64 \
        (vector unsigned char)(1,2,3,4,5,6,7,31,9,10,11,12,13,14,15,0)
    #define ALTI_SR2_PERM \
        (vector unsigned char)(7,0,1,2,11,4,5,6,15,8,9,10,17,12,13,14)
    #define ALTI_SR2_PERM64 \
        (vector unsigned char)(15,0,1,2,3,4,5,6,17,8,9,10,11,12,13,14)
#else /* For OTHER OSs(Linux?) */
    #define ALTI_SL1 {SL1, SL1, SL1, SL1}
    #define ALTI_SR1 {SR1, SR1, SR1, SR1}
    #define ALTI_MSK {MSK1, MSK2, MSK3, MSK4}
    #define ALTI_MSK64 {MSK2, MSK1, MSK4, MSK3}
    #define ALTI_SL2_PERM {1,2,3,23,5,6,7,0,9,10,11,4,13,14,15,8}
    #define ALTI_SL2_PERM64 {1,2,3,4,5,6,7,31,9,10,11,12,13,14,15,0}
    #define ALTI_SR2_PERM {7,0,1,2,11,4,5,6,15,8,9,10,17,12,13,14}
    #define ALTI_SR2_PERM64 {15,0,1,2,3,4,5,6,17,8,9,10,11,12,13,14}
#endif /* For OSX */
#define IDSTR "SFMT-2281:12-19-1-5-1:bff7ffbf-fdfffffe-f7ffef7f-f2f7cbbf"

#endif /* SFMT_PARAMS2281_H */

"""Tests for the threading module."""

import test.support
from test.support import verbose, strip_python_stderr, import_module, cpython_only
from test.script_helper import assert_python_ok, assert_python_failure

import random
import re
import sys
_thread = import_module('_thread')
threading = import_module('threading')
import time
import unittest
import weakref
import os
import subprocess

from test import lock_tests


# Between fork() and exec(), only async-safe functions are allowed (issues
# #12316 and #11870), and fork() from a worker thread is known to trigger
# problems with some operating systems (issue #3863): skip problematic tests
# on platforms known to behave badly.
platforms_to_skip = ('freebsd4', 'freebsd5', 'freebsd6', 'netbsd5',
                     'hp-ux11')


# A trivial mutable counter.
class Counter(object):
    def __init__(self):
        self.value = 0
    def inc(self):
        self.value += 1
    def dec(self):
        self.value -= 1
    def get(self):
        return self.value

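# Note: Counter itself does no locking; TestThread below only calls inc()/dec()
# while holding the shared mutex, which is what keeps the count consistent
# across threads.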
class TestThread(threading.Thread):
    def __init__(self, name, testcase, sema, mutex, nrunning):
        threading.Thread.__init__(self, name=name)
        self.testcase = testcase
        self.sema = sema
        self.mutex = mutex
        self.nrunning = nrunning

    def run(self):
        delay = random.random() / 10000.0
        if verbose:
            print('task %s will run for %.1f usec' %
                  (self.name, delay * 1e6))

        with self.sema:
            with self.mutex:
                self.nrunning.inc()
                if verbose:
                    print(self.nrunning.get(), 'tasks are running')
                self.testcase.assertTrue(self.nrunning.get() <= 3)

            time.sleep(delay)
            if verbose:
                print('task', self.name, 'done')

            with self.mutex:
                self.nrunning.dec()
                self.testcase.assertTrue(self.nrunning.get() >= 0)
                if verbose:
                    print('%s is finished. %d tasks are running' %
                          (self.name, self.nrunning.get()))


class BaseTestCase(unittest.TestCase):
    def setUp(self):
        self._threads = test.support.threading_setup()

    def tearDown(self):
        test.support.threading_cleanup(*self._threads)
        test.support.reap_children()


class ThreadTests(BaseTestCase):

    # Create a bunch of threads, let each do some work, wait until all are
    # done.
    def test_various_ops(self):
        # This takes about n/3 seconds to run (about n/3 clumps of tasks,
        # times about 1 second per clump).
        NUMTASKS = 10

        # no more than 3 of the 10 can run at once
        sema = threading.BoundedSemaphore(value=3)
        mutex = threading.RLock()
        numrunning = Counter()

        threads = []
        for i in range(NUMTASKS):
            t = TestThread("<thread %d>" % i, self, sema, mutex, numrunning)
            threads.append(t)
            self.assertEqual(t.ident, None)
            self.assertTrue(re.match('<TestThread\(.*, initial\)>', repr(t)))
            t.start()

        if verbose:
            print('waiting for all tasks to complete')
        for t in threads:
            t.join()
            self.assertTrue(not t.is_alive())
            self.assertNotEqual(t.ident, 0)
            self.assertFalse(t.ident is None)
            self.assertTrue(re.match('<TestThread\(.*, stopped -?\d+\)>',
                                     repr(t)))
        if verbose:
            print('all tasks done')
        self.assertEqual(numrunning.get(), 0)

    def test_ident_of_no_threading_threads(self):
        # The ident still must work for the main thread and dummy threads.
        self.assertFalse(threading.currentThread().ident is None)

        def f():
            ident.append(threading.currentThread().ident)
            done.set()
        done = threading.Event()
        ident = []
        _thread.start_new_thread(f, ())
        done.wait()
        self.assertFalse(ident[0] is None)
        # Kill the "immortal" _DummyThread
        del threading._active[ident[0]]

    # run with a small(ish) thread stack size (256kB)
    def test_various_ops_small_stack(self):
        if verbose:
            print('with 256kB thread stack size...')
        try:
            threading.stack_size(262144)
        except _thread.error:
            raise unittest.SkipTest(
                'platform does not support changing thread stack size')
        self.test_various_ops()
        threading.stack_size(0)

    # run with a large thread stack size (1MB)
    def test_various_ops_large_stack(self):
        if verbose:
            print('with 1MB thread stack size...')
        try:
            threading.stack_size(0x100000)
        except _thread.error:
            raise unittest.SkipTest(
                'platform does not support changing thread stack size')
        self.test_various_ops()
        threading.stack_size(0)

    def test_foreign_thread(self):
        # Check that a "foreign" thread can use the threading module.
        def f(mutex):
            # Calling current_thread() forces an entry for the foreign
            # thread to get made in the threading._active map.
            threading.current_thread()
            mutex.release()

        mutex = threading.Lock()
        mutex.acquire()
        tid = _thread.start_new_thread(f, (mutex,))
        # Wait for the thread to finish.
        mutex.acquire()
        self.assertIn(tid, threading._active)
        self.assertIsInstance(threading._active[tid], threading._DummyThread)
        del threading._active[tid]

    # PyThreadState_SetAsyncExc() is a CPython-only gimmick, not (currently)
    # exposed at the Python level.  This test relies on ctypes to get at it.
    def test_PyThreadState_SetAsyncExc(self):
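        # PyThreadState_SetAsyncExc(tid, exc) reports how many thread states it
        # modified: 0 when the thread id is unknown, 1 on success.  That return
        # value is what the assertions below check.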
        ctypes = import_module("ctypes")

        set_async_exc = ctypes.pythonapi.PyThreadState_SetAsyncExc

        class AsyncExc(Exception):
            pass

        exception = ctypes.py_object(AsyncExc)

        # First check it works when setting the exception from the same thread.
        tid = threading.get_ident()

        try:
            result = set_async_exc(ctypes.c_long(tid), exception)
            # The exception is async, so we might have to keep the VM busy until
            # it notices.
            while True:
                pass
        except AsyncExc:
            pass
        else:
            # This code is unreachable but it reflects the intent. If we wanted
            # to be smarter the above loop wouldn't be infinite.
            self.fail("AsyncExc not raised")

        try:
            self.assertEqual(result, 1)  # one thread state modified
        except UnboundLocalError:
            # The exception was raised too quickly for us to get the result.
            pass

        # `worker_started` is set by the thread when it's inside a try/except
        # block waiting to catch the asynchronously set AsyncExc exception.
        # `worker_saw_exception` is set by the thread upon catching that
        # exception.
        worker_started = threading.Event()
        worker_saw_exception = threading.Event()

        class Worker(threading.Thread):
            def run(self):
                self.id = threading.get_ident()
                self.finished = False

                try:
                    while True:
                        worker_started.set()
                        time.sleep(0.1)
                except AsyncExc:
                    self.finished = True
                    worker_saw_exception.set()

        t = Worker()
        t.daemon = True  # so if this fails, we don't hang Python at shutdown
        t.start()
        if verbose:
            print(" started worker thread")

        # Try a thread id that doesn't make sense.
        if verbose:
            print(" trying nonsensical thread id")
        result = set_async_exc(ctypes.c_long(-1), exception)
        self.assertEqual(result, 0)  # no thread states modified

        # Now raise an exception in the worker thread.
        if verbose:
            print(" waiting for worker thread to get started")
        ret = worker_started.wait()
        self.assertTrue(ret)
        if verbose:
            print(" verifying worker hasn't exited")
        self.assertTrue(not t.finished)
        if verbose:
            print(" attempting to raise asynch exception in worker")
        result = set_async_exc(ctypes.c_long(t.id), exception)
        self.assertEqual(result, 1)  # one thread state modified
        if verbose:
            print(" waiting for worker to say it caught the exception")
        worker_saw_exception.wait(timeout=10)
        self.assertTrue(t.finished)
        if verbose:
            print(" all OK -- joining worker")
        if t.finished:
            t.join()
        # else the thread is still running, and we have no way to kill it

    def test_limbo_cleanup(self):
        # Issue 7481: Failure to start thread should cleanup the limbo map.
        def fail_new_thread(*args):
            raise threading.ThreadError()
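        # Swap in a start function that always fails, so Thread.start() raises
        # and we can check that the thread was removed from the _limbo map; the
        # real function is restored in the finally block.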
        _start_new_thread = threading._start_new_thread
        threading._start_new_thread = fail_new_thread
        try:
            t = threading.Thread(target=lambda: None)
            self.assertRaises(threading.ThreadError, t.start)
            self.assertFalse(
                t in threading._limbo,
                "Failed to cleanup _limbo map on failure of Thread.start().")
        finally:
            threading._start_new_thread = _start_new_thread

    def test_finalize_running_thread(self):
        # Issue 1402: the PyGILState_Ensure / _Release functions may be called
        # very late on python exit: on deallocation of a running thread for
        # example.
        import_module("ctypes")

        rc, out, err = assert_python_failure("-c", """if 1:
            import ctypes, sys, time, _thread

            # This lock is used as a simple event variable.
            ready = _thread.allocate_lock()
            ready.acquire()

            # Module globals are cleared before __del__ is run
            # So we save the functions in class dict
            class C:
                ensure = ctypes.pythonapi.PyGILState_Ensure
                release = ctypes.pythonapi.PyGILState_Release
                def __del__(self):
                    state = self.ensure()
                    self.release(state)

            def waitingThread():
                x = C()
                ready.release()
                time.sleep(100)

            _thread.start_new_thread(waitingThread, ())
            ready.acquire()  # Be sure the other thread is waiting.
            sys.exit(42)
            """)
        self.assertEqual(rc, 42)

    def test_finalize_with_trace(self):
        # Issue 1733757
        # Avoid a deadlock when sys.settrace steps into threading._shutdown
        assert_python_ok("-c", """if 1:
            import sys, threading

            # A deadlock-killer, to prevent the
            # testsuite from hanging forever
            def killer():
                import os, time
                time.sleep(2)
                print('program blocked; aborting')
                os._exit(2)
            t = threading.Thread(target=killer)
            t.daemon = True
            t.start()

            # This is the trace function
            def func(frame, event, arg):
                threading.current_thread()
                return func

            sys.settrace(func)
            """)

    def test_join_nondaemon_on_shutdown(self):
        # Issue 1722344
        # Raising SystemExit skipped threading._shutdown
        rc, out, err = assert_python_ok("-c", """if 1:
            import threading
            from time import sleep

            def child():
                sleep(1)
                # As a non-daemon thread we SHOULD wake up and nothing
                # should be torn down yet
                print("Woke up, sleep function is:", sleep)

            threading.Thread(target=child).start()
            raise SystemExit
            """)
        self.assertEqual(out.strip(),
            b"Woke up, sleep function is: <built-in function sleep>")
        self.assertEqual(err, b"")

    def test_enumerate_after_join(self):
        # Try hard to trigger #1703448: a thread is still returned in
        # threading.enumerate() after it has been join()ed.
        enum = threading.enumerate
        old_interval = sys.getswitchinterval()
        try:
            for i in range(1, 100):
                sys.setswitchinterval(i * 0.0002)
                t = threading.Thread(target=lambda: None)
                t.start()
                t.join()
                l = enum()
                self.assertNotIn(t, l,
                    "#1703448 triggered after %d trials: %s" % (i, l))
        finally:
            sys.setswitchinterval(old_interval)

    def test_no_refcycle_through_target(self):
        class RunSelfFunction(object):
            def __init__(self, should_raise):
                # The links in this refcycle from Thread back to self
                # should be cleaned up when the thread completes.
                self.should_raise = should_raise
                self.thread = threading.Thread(target=self._run,
                                               args=(self,),
                                               kwargs={'yet_another': self})
                self.thread.start()

            def _run(self, other_ref, yet_another):
                if self.should_raise:
                    raise SystemExit

        cyclic_object = RunSelfFunction(should_raise=False)
        weak_cyclic_object = weakref.ref(cyclic_object)
        cyclic_object.thread.join()
        del cyclic_object
        self.assertIsNone(weak_cyclic_object(),
                          msg=('%d references still around' %
                               sys.getrefcount(weak_cyclic_object())))

        raising_cyclic_object = RunSelfFunction(should_raise=True)
        weak_raising_cyclic_object = weakref.ref(raising_cyclic_object)
        raising_cyclic_object.thread.join()
        del raising_cyclic_object
        self.assertIsNone(weak_raising_cyclic_object(),
                          msg=('%d references still around' %
                               sys.getrefcount(weak_raising_cyclic_object())))

    def test_old_threading_api(self):
        # Just a quick sanity check to make sure the old method names are
        # still present
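        # (These camelCase names are the legacy aliases of the PEP 8 style
        # methods used elsewhere in this file.)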
        t = threading.Thread()
        t.isDaemon()
        t.setDaemon(True)
        t.getName()
        t.setName("name")
        t.isAlive()
        e = threading.Event()
        e.isSet()
        threading.activeCount()

    def test_repr_daemon(self):
        t = threading.Thread()
        self.assertFalse('daemon' in repr(t))
        t.daemon = True
        self.assertTrue('daemon' in repr(t))

    def test_daemon_param(self):
        t = threading.Thread()
        self.assertFalse(t.daemon)
        t = threading.Thread(daemon=False)
        self.assertFalse(t.daemon)
        t = threading.Thread(daemon=True)
        self.assertTrue(t.daemon)

    @unittest.skipUnless(hasattr(os, 'fork'), 'test needs fork()')
    def test_dummy_thread_after_fork(self):
        # Issue #14308: a dummy thread in the active list doesn't mess up
        # the after-fork mechanism.
        code = """if 1:
            import _thread, threading, os, time

            def background_thread(evt):
                # Creates and registers the _DummyThread instance
                threading.current_thread()
                evt.set()
                time.sleep(10)

            evt = threading.Event()
            _thread.start_new_thread(background_thread, (evt,))
            evt.wait()
            assert threading.active_count() == 2, threading.active_count()
            if os.fork() == 0:
                assert threading.active_count() == 1, threading.active_count()
                os._exit(0)
            else:
                os.wait()
            """
        _, out, err = assert_python_ok("-c", code)
        self.assertEqual(out, b'')
        self.assertEqual(err, b'')

    @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
    def test_is_alive_after_fork(self):
        # Try hard to trigger #18418: is_alive() could sometimes be True on
        # threads that vanished after a fork.
        old_interval = sys.getswitchinterval()
        self.addCleanup(sys.setswitchinterval, old_interval)

        # Make the bug more likely to manifest.
        sys.setswitchinterval(1e-6)

        for i in range(20):
            t = threading.Thread(target=lambda: None)
            t.start()
            self.addCleanup(t.join)
            pid = os.fork()
            if pid == 0:
                os._exit(1 if t.is_alive() else 0)
            else:
                pid, status = os.waitpid(pid, 0)
                self.assertEqual(0, status)

    def test_main_thread(self):
        main = threading.main_thread()
        self.assertEqual(main.name, 'MainThread')
        self.assertEqual(main.ident, threading.current_thread().ident)
        self.assertEqual(main.ident, threading.get_ident())

        def f():
            self.assertNotEqual(threading.main_thread().ident,
                                threading.current_thread().ident)
        th = threading.Thread(target=f)
        th.start()
        th.join()

    @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()")
    @unittest.skipUnless(hasattr(os, 'waitpid'), "test needs os.waitpid()")
    def test_main_thread_after_fork(self):
        code = """if 1:
            import os, threading

            pid = os.fork()
            if pid == 0:
                main = threading.main_thread()
                print(main.name)
                print(main.ident == threading.current_thread().ident)
                print(main.ident == threading.get_ident())
            else:
                os.waitpid(pid, 0)
            """
        _, out, err = assert_python_ok("-c", code)
        data = out.decode().replace('\r', '')
        self.assertEqual(err, b"")
        self.assertEqual(data, "MainThread\nTrue\nTrue\n")

    @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
    @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()")
    @unittest.skipUnless(hasattr(os, 'waitpid'), "test needs os.waitpid()")
    def test_main_thread_after_fork_from_nonmain_thread(self):
        code = """if 1:
            import os, threading, sys

            def f():
                pid = os.fork()
                if pid == 0:
                    main = threading.main_thread()
                    print(main.name)
                    print(main.ident == threading.current_thread().ident)
                    print(main.ident == threading.get_ident())
                    # stdout is fully buffered because not a tty,
                    # we have to flush before exit.
                    sys.stdout.flush()
                else:
                    os.waitpid(pid, 0)

            th = threading.Thread(target=f)
            th.start()
            th.join()
            """
        _, out, err = assert_python_ok("-c", code)
        data = out.decode().replace('\r', '')
        self.assertEqual(err, b"")
        self.assertEqual(data, "Thread-1\nTrue\nTrue\n")

    def test_tstate_lock(self):
        # Test an implementation detail of Thread objects.
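        # Two raw _thread locks act as one-shot events: the worker releases
        # `started` once it is running and blocks on `finish` until the main
        # thread lets it exit.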
        started = _thread.allocate_lock()
        finish = _thread.allocate_lock()
        started.acquire()
        finish.acquire()

        def f():
            started.release()
            finish.acquire()
            time.sleep(0.01)

        # The tstate lock is None until the thread is started
        t = threading.Thread(target=f)
        self.assertIs(t._tstate_lock, None)
        t.start()
        started.acquire()
        self.assertTrue(t.is_alive())
        # The tstate lock can't be acquired when the thread is running
        # (or suspended).
        tstate_lock = t._tstate_lock
        self.assertFalse(tstate_lock.acquire(timeout=0), False)
        finish.release()
        # When the thread ends, the state_lock can be successfully
        # acquired.
        self.assertTrue(tstate_lock.acquire(timeout=5), False)
        # But is_alive() is still True: we hold _tstate_lock now, which
        # prevents is_alive() from knowing the thread's end-of-life C code
        # is done.
        self.assertTrue(t.is_alive())
        # Let is_alive() find out the C code is done.
        tstate_lock.release()
        self.assertFalse(t.is_alive())
        # And verify the thread disposed of _tstate_lock.
        self.assertTrue(t._tstate_lock is None)

    def test_repr_stopped(self):
        # Verify that "stopped" shows up in repr(Thread) appropriately.
        started = _thread.allocate_lock()
        finish = _thread.allocate_lock()
        started.acquire()
        finish.acquire()

        def f():
            started.release()
            finish.acquire()

        t = threading.Thread(target=f)
        t.start()
        started.acquire()
        self.assertIn("started", repr(t))
        finish.release()
        # "stopped" should appear in the repr in a reasonable amount of time.
        # Implementation detail: as of this writing, that's trivially true
        # if .join() is called, and almost trivially true if .is_alive() is
        # called.  The detail we're testing here is that "stopped" shows up
        # "all on its own".
        LOOKING_FOR = "stopped"
        for i in range(500):
            if LOOKING_FOR in repr(t):
                break
            time.sleep(0.01)
        self.assertIn(LOOKING_FOR, repr(t))  # we waited at least 5 seconds

    def test_BoundedSemaphore_limit(self):
        # BoundedSemaphore should raise ValueError if released too often.
        for limit in range(1, 10):
            bs = threading.BoundedSemaphore(limit)
            threads = [threading.Thread(target=bs.acquire)
                       for _ in range(limit)]
            for t in threads:
                t.start()
            for t in threads:
                t.join()
            threads = [threading.Thread(target=bs.release)
                       for _ in range(limit)]
            for t in threads:
                t.start()
            for t in threads:
                t.join()
            self.assertRaises(ValueError, bs.release)

    @cpython_only
    def test_frame_tstate_tracing(self):
        # Issue #14432: Crash when a generator is created in a C thread that is
        # destroyed while the generator is still used. The issue was that a
        # generator contains a frame, and the frame kept a reference to the
        # Python state of the destroyed C thread. The crash occurs when a trace
        # function is setup.

        def noop_trace(frame, event, arg):
            # no operation
            return noop_trace
        def generator():
            while 1:
                yield "generator"

        def callback():
            if callback.gen is None:
                callback.gen = generator()
            return next(callback.gen)
        callback.gen = None

        old_trace = sys.gettrace()
        sys.settrace(noop_trace)
        try:
            # Install a trace function
            threading.settrace(noop_trace)

            # Create a generator in a C thread which exits after the call
            import _testcapi
            _testcapi.call_in_temporary_c_thread(callback)

            # Call the generator in a different Python thread, check that the
            # generator didn't keep a reference to the destroyed thread state
            for test in range(3):
                # The trace function is still called here
                callback()
        finally:
            sys.settrace(old_trace)


class ThreadJoinOnShutdown(BaseTestCase):

    def _run_and_join(self, script):
        script = """if 1:
            import sys, os, time, threading

            # a thread, which waits for the main program to terminate
            def joiningfunc(mainthread):
                mainthread.join()
                print('end of thread')
                # stdout is fully buffered because not a tty, we have to flush
                # before exit.
                sys.stdout.flush()
        \n""" + script

        rc, out, err = assert_python_ok("-c", script)
        data = out.decode().replace('\r', '')
        self.assertEqual(data, "end of main\nend of thread\n")

    def test_1_join_on_shutdown(self):
        # The usual case: on exit, wait for a non-daemon thread
        script = """if 1:
            import os
            t = threading.Thread(target=joiningfunc,
                                 args=(threading.current_thread(),))
            t.start()
            time.sleep(0.1)
            print('end of main')
            """
        self._run_and_join(script)

    @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
    @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
    def test_2_join_in_forked_process(self):
        # Like the test above, but from a forked interpreter
        script = """if 1:
            childpid = os.fork()
            if childpid != 0:
                os.waitpid(childpid, 0)
                sys.exit(0)

            t = threading.Thread(target=joiningfunc,
                                 args=(threading.current_thread(),))
            t.start()
            print('end of main')
            """
        self._run_and_join(script)

    @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
    @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
    def test_3_join_in_forked_from_thread(self):
        # Like the test above, but fork() was called from a worker thread
        # In the forked process, the main Thread object must be marked as stopped.
        script = """if 1:
            main_thread = threading.current_thread()
            def worker():
                childpid = os.fork()
                if childpid != 0:
                    os.waitpid(childpid, 0)
                    sys.exit(0)

                t = threading.Thread(target=joiningfunc,
                                     args=(main_thread,))
                print('end of main')
                t.start()
                t.join()  # Should not block: main_thread is already stopped

            w = threading.Thread(target=worker)
            w.start()
            """
        self._run_and_join(script)

    @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
    def test_4_daemon_threads(self):
        # Check that a daemon thread cannot crash the interpreter on shutdown
        # by manipulating internal structures that are being disposed of in
        # the main thread.
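        # Each daemon thread loops doing a little I/O and sleeping; the only
        # thing being checked is that process shutdown (sys.exit() in the
        # script's main()) does not crash while they are still running.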
        script = """if True:
            import os
            import random
            import sys
            import time
            import threading

            thread_has_run = set()

            def random_io():
                '''Loop for a while sleeping random tiny amounts and doing some I/O.'''
                while True:
                    in_f = open(os.__file__, 'rb')
                    stuff = in_f.read(200)
                    null_f = open(os.devnull, 'wb')
                    null_f.write(stuff)
                    time.sleep(random.random() / 1995)
                    null_f.close()
                    in_f.close()
                    thread_has_run.add(threading.current_thread())

            def main():
                count = 0
                for _ in range(40):
                    new_thread = threading.Thread(target=random_io)
                    new_thread.daemon = True
                    new_thread.start()
                    count += 1
                while len(thread_has_run) < count:
                    time.sleep(0.001)
                # Trigger process shutdown
                sys.exit(0)

            main()
            """
        rc, out, err = assert_python_ok('-c', script)
        self.assertFalse(err)

    @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
    @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
    def test_reinit_tls_after_fork(self):
        # Issue #13817: fork() would deadlock in a multithreaded program with
        # the ad-hoc TLS implementation.

        def do_fork_and_wait():
            # just fork a child process and wait for it
            pid = os.fork()
            if pid > 0:
                os.waitpid(pid, 0)
            else:
                os._exit(0)

        # start a bunch of threads that will fork() child processes
        threads = []
        for i in range(16):
            t = threading.Thread(target=do_fork_and_wait)
            threads.append(t)
            t.start()

        for t in threads:
            t.join()

    @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
    def test_clear_threads_states_after_fork(self):
        # Issue #17094: check that threads states are cleared after fork()

        # start a bunch of threads
        threads = []
        for i in range(16):
            t = threading.Thread(target=lambda: time.sleep(0.3))
            threads.append(t)
            t.start()

        pid = os.fork()
        if pid == 0:
            # check that threads states have been cleared
            if len(sys._current_frames()) == 1:
                os._exit(0)
            else:
                os._exit(1)
        else:
            _, status = os.waitpid(pid, 0)
            self.assertEqual(0, status)

        for t in threads:
            t.join()


class SubinterpThreadingTests(BaseTestCase):

    def test_threads_join(self):
        # Non-daemon threads should be joined at subinterpreter shutdown
        # (issue #18808)
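        # The pipe is the only channel out of the subinterpreter: the spawned
        # thread writes one byte when it finishes, so being able to read it
        # back here shows the thread ran to completion.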
        r, w = os.pipe()
        self.addCleanup(os.close, r)
        self.addCleanup(os.close, w)
        code = r"""if 1:
            import os
            import threading
            import time

            def f():
                # Sleep a bit so that the thread is still running when
                # Py_EndInterpreter is called.
                time.sleep(0.05)
                os.write(%d, b"x")
            threading.Thread(target=f).start()
            """ % (w,)
        ret = test.support.run_in_subinterp(code)
        self.assertEqual(ret, 0)
        # The thread was joined properly.
        self.assertEqual(os.read(r, 1), b"x")

    def test_threads_join_2(self):
        # Same as above, but a delay gets introduced after the thread's
        # Python code returned but before the thread state is deleted.
        # To achieve this, we register a thread-local object which sleeps
        # a bit when deallocated.
        r, w = os.pipe()
        self.addCleanup(os.close, r)
        self.addCleanup(os.close, w)
        code = r"""if 1:
            import os
            import threading
            import time

            class Sleeper:
                def __del__(self):
                    time.sleep(0.05)

            tls = threading.local()

            def f():
                # Sleep a bit so that the thread is still running when
                # Py_EndInterpreter is called.
                time.sleep(0.05)
                tls.x = Sleeper()
                os.write(%d, b"x")
            threading.Thread(target=f).start()
            """ % (w,)
        ret = test.support.run_in_subinterp(code)
        self.assertEqual(ret, 0)
        # The thread was joined properly.
        self.assertEqual(os.read(r, 1), b"x")

    @cpython_only
    def test_daemon_threads_fatal_error(self):
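        # Ending a subinterpreter while one of its daemon threads is still
        # running is a fatal error; the test only checks that the expected
        # error message is printed.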
        subinterp_code = r"""if 1:
            import os
            import threading
            import time

            def f():
                # Make sure the daemon thread is still running when
                # Py_EndInterpreter is called.
                time.sleep(10)
            threading.Thread(target=f, daemon=True).start()
            """
        script = r"""if 1:
            import _testcapi

            _testcapi.run_in_subinterp(%r)
            """ % (subinterp_code,)
        with test.support.SuppressCrashReport():
            rc, out, err = assert_python_failure("-c", script)
        self.assertIn("Fatal Python error: Py_EndInterpreter: "
                      "not the last thread", err.decode())


class ThreadingExceptionTests(BaseTestCase):
    # A RuntimeError should be raised if Thread.start() is called
    # multiple times.
    def test_start_thread_again(self):
        thread = threading.Thread()
        thread.start()
        self.assertRaises(RuntimeError, thread.start)

    def test_joining_current_thread(self):
        current_thread = threading.current_thread()
        self.assertRaises(RuntimeError, current_thread.join)

    def test_joining_inactive_thread(self):
        thread = threading.Thread()
        self.assertRaises(RuntimeError, thread.join)

    def test_daemonize_active_thread(self):
        thread = threading.Thread()
        thread.start()
        self.assertRaises(RuntimeError, setattr, thread, "daemon", True)

    def test_releasing_unacquired_lock(self):
        lock = threading.Lock()
        self.assertRaises(RuntimeError, lock.release)

    @unittest.skipUnless(sys.platform == 'darwin' and test.support.python_is_optimized(),
                         'test macosx problem')
    def test_recursion_limit(self):
        # Issue 9670
        # test that excessive recursion within a non-main thread causes
        # an exception rather than crashing the interpreter on platforms
        # like Mac OS X or FreeBSD which have small default stack sizes
        # for threads
        script = """if True:
            import threading

            def recurse():
                return recurse()

            def outer():
                try:
                    recurse()
                except RuntimeError:
                    pass

            w = threading.Thread(target=outer)
            w.start()
            w.join()
            print('end of main thread')
            """
        expected_output = "end of main thread\n"
        p = subprocess.Popen([sys.executable, "-c", script],
                             stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        stdout, stderr = p.communicate()
        data = stdout.decode().replace('\r', '')
        self.assertEqual(p.returncode, 0,
                         "Unexpected error: " + stderr.decode())
        self.assertEqual(data, expected_output)

    def test_print_exception(self):
        script = r"""if True:
            import threading
            import time

            running = False
            def run():
                global running
                running = True
                while running:
                    time.sleep(0.01)
                1/0
            t = threading.Thread(target=run)
            t.start()
            while not running:
                time.sleep(0.01)
            running = False
            t.join()
            """
        rc, out, err = assert_python_ok("-c", script)
        self.assertEqual(out, b'')
        err = err.decode()
        self.assertIn("Exception in thread", err)
        self.assertIn("Traceback (most recent call last):", err)
        self.assertIn("ZeroDivisionError", err)
        self.assertNotIn("Unhandled exception", err)

    def test_print_exception_stderr_is_none_1(self):
        script = r"""if True:
            import sys
            import threading
            import time

            running = False
            def run():
                global running
                running = True
                while running:
                    time.sleep(0.01)
                1/0
            t = threading.Thread(target=run)
            t.start()
            while not running:
                time.sleep(0.01)
            sys.stderr = None
            running = False
            t.join()
            """
        rc, out, err = assert_python_ok("-c", script)
        self.assertEqual(out, b'')
        err = err.decode()
        self.assertIn("Exception in thread", err)
        self.assertIn("Traceback (most recent call last):", err)
        self.assertIn("ZeroDivisionError", err)
        self.assertNotIn("Unhandled exception", err)

    def test_print_exception_stderr_is_none_2(self):
        script = r"""if True:
            import sys
            import threading
            import time

            running = False
            def run():
                global running
                running = True
                while running:
                    time.sleep(0.01)
                1/0
            sys.stderr = None
            t = threading.Thread(target=run)
            t.start()
            while not running:
                time.sleep(0.01)
            running = False
            t.join()
            """
        rc, out, err = assert_python_ok("-c", script)
        self.assertEqual(out, b'')
        self.assertNotIn("Unhandled exception", err.decode())


class TimerTests(BaseTestCase):

    def setUp(self):
        BaseTestCase.setUp(self)
        self.callback_args = []
        self.callback_event = threading.Event()

    def test_init_immutable_default_args(self):
        # Issue 17435: constructor defaults were mutable objects, they could be
        # mutated via the object attributes and affect other Timer objects.
        timer1 = threading.Timer(0.01, self._callback_spy)
        timer1.start()
        self.callback_event.wait()
        timer1.args.append("blah")
        timer1.kwargs["foo"] = "bar"
        self.callback_event.clear()

        timer2 = threading.Timer(0.01, self._callback_spy)
        timer2.start()
        self.callback_event.wait()

        self.assertEqual(len(self.callback_args), 2)
        self.assertEqual(self.callback_args, [((), {}), ((), {})])

    def _callback_spy(self, *args, **kwargs):
        self.callback_args.append((args[:], kwargs.copy()))
        self.callback_event.set()


class LockTests(lock_tests.LockTests):
    locktype = staticmethod(threading.Lock)

class PyRLockTests(lock_tests.RLockTests):
    locktype = staticmethod(threading._PyRLock)

@unittest.skipIf(threading._CRLock is None, 'RLock not implemented in C')
class CRLockTests(lock_tests.RLockTests):
    locktype = staticmethod(threading._CRLock)

class EventTests(lock_tests.EventTests):
    eventtype = staticmethod(threading.Event)

class ConditionAsRLockTests(lock_tests.RLockTests):
    # A Condition uses an RLock by default and exports its API.
    locktype = staticmethod(threading.Condition)

class ConditionTests(lock_tests.ConditionTests):
    condtype = staticmethod(threading.Condition)

class SemaphoreTests(lock_tests.SemaphoreTests):
    semtype = staticmethod(threading.Semaphore)

class BoundedSemaphoreTests(lock_tests.BoundedSemaphoreTests):
    semtype = staticmethod(threading.BoundedSemaphore)

class BarrierTests(lock_tests.BarrierTests):
    barriertype = staticmethod(threading.Barrier)

if __name__ == "__main__":
    unittest.main()