diff options
-rw-r--r-- | Lib/test/test_threading.py | 138 | ||||
-rw-r--r-- | Lib/threading.py | 59 |
2 files changed, 30 insertions, 167 deletions
diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py index 58b0b4e..75ae247 100644 --- a/Lib/test/test_threading.py +++ b/Lib/test/test_threading.py @@ -647,144 +647,8 @@ class ThreadJoinOnShutdown(BaseTestCase): """ self._run_and_join(script) - def assertScriptHasOutput(self, script, expected_output): - rc, out, err = assert_python_ok("-c", script) - data = out.decode().replace('\r', '') - self.assertEqual(data, expected_output) - - @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()") - @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug") - def test_4_joining_across_fork_in_worker_thread(self): - # There used to be a possible deadlock when forking from a child - # thread. See http://bugs.python.org/issue6643. - - # The script takes the following steps: - # - The main thread in the parent process starts a new thread and then - # tries to join it. - # - The join operation acquires the Lock inside the thread's _block - # Condition. (See threading.py:Thread.join().) - # - We stub out the acquire method on the condition to force it to wait - # until the child thread forks. (See LOCK ACQUIRED HERE) - # - The child thread forks. (See LOCK HELD and WORKER THREAD FORKS - # HERE) - # - The main thread of the parent process enters Condition.wait(), - # which releases the lock on the child thread. - # - The child process returns. Without the necessary fix, when the - # main thread of the child process (which used to be the child thread - # in the parent process) attempts to exit, it will try to acquire the - # lock in the Thread._block Condition object and hang, because the - # lock was held across the fork. - - script = """if 1: - import os, time, threading - - finish_join = False - start_fork = False - - def worker(): - # Wait until this thread's lock is acquired before forking to - # create the deadlock. - global finish_join - while not start_fork: - time.sleep(0.01) - # LOCK HELD: Main thread holds lock across this call. - childpid = os.fork() - finish_join = True - if childpid != 0: - # Parent process just waits for child. - os.waitpid(childpid, 0) - # Child process should just return. - - w = threading.Thread(target=worker) - - # Stub out the private condition variable's lock acquire method. - # This acquires the lock and then waits until the child has forked - # before returning, which will release the lock soon after. If - # someone else tries to fix this test case by acquiring this lock - # before forking instead of resetting it, the test case will - # deadlock when it shouldn't. - condition = w._stopped._cond - orig_acquire = condition.acquire - call_count_lock = threading.Lock() - call_count = 0 - def my_acquire(): - global call_count - global start_fork - orig_acquire() # LOCK ACQUIRED HERE - start_fork = True - if call_count == 0: - while not finish_join: - time.sleep(0.01) # WORKER THREAD FORKS HERE - with call_count_lock: - call_count += 1 - condition.acquire = my_acquire - - w.start() - w.join() - print('end of main') - """ - self.assertScriptHasOutput(script, "end of main\n") - - @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()") - @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug") - def test_5_clear_waiter_locks_to_avoid_crash(self): - # Check that a spawned thread that forks doesn't segfault on certain - # platforms, namely OS X. This used to happen if there was a waiter - # lock in the thread's condition variable's waiters list. Even though - # we know the lock will be held across the fork, it is not safe to - # release locks held across forks on all platforms, so releasing the - # waiter lock caused a segfault on OS X. Furthermore, since locks on - # OS X are (as of this writing) implemented with a mutex + condition - # variable instead of a semaphore, while we know that the Python-level - # lock will be acquired, we can't know if the internal mutex will be - # acquired at the time of the fork. - - script = """if True: - import os, time, threading - - start_fork = False - - def worker(): - # Wait until the main thread has attempted to join this thread - # before continuing. - while not start_fork: - time.sleep(0.01) - childpid = os.fork() - if childpid != 0: - # Parent process just waits for child. - (cpid, rc) = os.waitpid(childpid, 0) - assert cpid == childpid - assert rc == 0 - print('end of worker thread') - else: - # Child process should just return. - pass - - w = threading.Thread(target=worker) - - # Stub out the private condition variable's _release_save method. - # This releases the condition's lock and flips the global that - # causes the worker to fork. At this point, the problematic waiter - # lock has been acquired once by the waiter and has been put onto - # the waiters list. - condition = w._stopped._cond - orig_release_save = condition._release_save - def my_release_save(): - global start_fork - orig_release_save() - # Waiter lock held here, condition lock released. - start_fork = True - condition._release_save = my_release_save - - w.start() - w.join() - print('end of main thread') - """ - output = "end of worker thread\nend of main thread\n" - self.assertScriptHasOutput(script, output) - @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug") - def test_6_daemon_threads(self): + def test_4_daemon_threads(self): # Check that a daemon thread cannot crash the interpreter on shutdown # by manipulating internal structures that are being disposed of in # the main thread. diff --git a/Lib/threading.py b/Lib/threading.py index b4b73a8..1921ee3 100644 --- a/Lib/threading.py +++ b/Lib/threading.py @@ -549,7 +549,7 @@ class Thread: self._ident = None self._tstate_lock = None self._started = Event() - self._stopped = Event() + self._is_stopped = False self._initialized = True # sys.stderr is not stored in the class like # sys.exc_info since it can be changed between instances @@ -561,7 +561,6 @@ class Thread: # private! Called by _after_fork() to reset our internal locks as # they may be in an invalid state leading to a deadlock or crash. self._started._reset_internal_locks() - self._stopped._reset_internal_locks() if is_alive: self._set_tstate_lock() else: @@ -574,7 +573,7 @@ class Thread: status = "initial" if self._started.is_set(): status = "started" - if self._stopped.is_set(): + if self._is_stopped: status = "stopped" if self._daemonic: status += " daemon" @@ -696,7 +695,6 @@ class Thread: pass finally: with _active_limbo_lock: - self._stop() try: # We don't call self._delete() because it also # grabs _active_limbo_lock. @@ -705,7 +703,8 @@ class Thread: pass def _stop(self): - self._stopped.set() + self._is_stopped = True + self._tstate_lock = None def _delete(self): "Remove current thread from the dict of currently running threads." @@ -749,29 +748,24 @@ class Thread: raise RuntimeError("cannot join thread before it is started") if self is current_thread(): raise RuntimeError("cannot join current thread") - if not self.is_alive(): - return - self._stopped.wait(timeout) - if self._stopped.is_set(): - self._wait_for_tstate_lock(timeout is None) + if timeout is None: + self._wait_for_tstate_lock() + else: + self._wait_for_tstate_lock(timeout=timeout) - def _wait_for_tstate_lock(self, block): + def _wait_for_tstate_lock(self, block=True, timeout=-1): # Issue #18808: wait for the thread state to be gone. - # When self._stopped is set, the Python part of the thread is done, - # but the thread's tstate has not yet been destroyed. The C code - # releases self._tstate_lock when the C part of the thread is done - # (the code at the end of the thread's life to remove all knowledge - # of the thread from the C data structures). - # This method waits to acquire _tstate_lock if `block` is True, or - # sees whether it can be acquired immediately if `block` is False. - # If it does acquire the lock, the C code is done, and _tstate_lock - # is set to None. + # At the end of the thread's life, after all knowledge of the thread + # is removed from C data structures, C code releases our _tstate_lock. + # This method passes its arguments to _tstate_lock.aquire(). + # If the lock is acquired, the C code is done, and self._stop() is + # called. That sets ._is_stopped to True, and ._tstate_lock to None. lock = self._tstate_lock - if lock is None: - return # already determined that the C code is done - if lock.acquire(block): + if lock is None: # already determined that the C code is done + assert self._is_stopped + elif lock.acquire(block, timeout): lock.release() - self._tstate_lock = None + self._stop() @property def name(self): @@ -790,14 +784,10 @@ class Thread: def is_alive(self): assert self._initialized, "Thread.__init__() not called" - if not self._started.is_set(): + if self._is_stopped or not self._started.is_set(): return False - if not self._stopped.is_set(): - return True - # The Python part of the thread is done, but the C part may still be - # waiting to run. self._wait_for_tstate_lock(False) - return self._tstate_lock is not None + return not self._is_stopped isAlive = is_alive @@ -861,6 +851,7 @@ class _MainThread(Thread): def __init__(self): Thread.__init__(self, name="MainThread", daemon=False) + self._set_tstate_lock() self._started.set() self._set_ident() with _active_limbo_lock: @@ -925,6 +916,14 @@ from _thread import stack_size _main_thread = _MainThread() def _shutdown(): + # Obscure: other threads may be waiting to join _main_thread. That's + # dubious, but some code does it. We can't wait for C code to release + # the main thread's tstate_lock - that won't happen until the interpreter + # is nearly dead. So we release it here. Note that just calling _stop() + # isn't enough: other threads may already be waiting on _tstate_lock. + assert _main_thread._tstate_lock is not None + assert _main_thread._tstate_lock.locked() + _main_thread._tstate_lock.release() _main_thread._stop() t = _pickSomeNonDaemonThread() while t: |