summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Lib/test/test_threading.py138
-rw-r--r--Lib/threading.py59
2 files changed, 30 insertions, 167 deletions
diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py
index 58b0b4e..75ae247 100644
--- a/Lib/test/test_threading.py
+++ b/Lib/test/test_threading.py
@@ -647,144 +647,8 @@ class ThreadJoinOnShutdown(BaseTestCase):
"""
self._run_and_join(script)
- def assertScriptHasOutput(self, script, expected_output):
- rc, out, err = assert_python_ok("-c", script)
- data = out.decode().replace('\r', '')
- self.assertEqual(data, expected_output)
-
- @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
- @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
- def test_4_joining_across_fork_in_worker_thread(self):
- # There used to be a possible deadlock when forking from a child
- # thread. See http://bugs.python.org/issue6643.
-
- # The script takes the following steps:
- # - The main thread in the parent process starts a new thread and then
- # tries to join it.
- # - The join operation acquires the Lock inside the thread's _block
- # Condition. (See threading.py:Thread.join().)
- # - We stub out the acquire method on the condition to force it to wait
- # until the child thread forks. (See LOCK ACQUIRED HERE)
- # - The child thread forks. (See LOCK HELD and WORKER THREAD FORKS
- # HERE)
- # - The main thread of the parent process enters Condition.wait(),
- # which releases the lock on the child thread.
- # - The child process returns. Without the necessary fix, when the
- # main thread of the child process (which used to be the child thread
- # in the parent process) attempts to exit, it will try to acquire the
- # lock in the Thread._block Condition object and hang, because the
- # lock was held across the fork.
-
- script = """if 1:
- import os, time, threading
-
- finish_join = False
- start_fork = False
-
- def worker():
- # Wait until this thread's lock is acquired before forking to
- # create the deadlock.
- global finish_join
- while not start_fork:
- time.sleep(0.01)
- # LOCK HELD: Main thread holds lock across this call.
- childpid = os.fork()
- finish_join = True
- if childpid != 0:
- # Parent process just waits for child.
- os.waitpid(childpid, 0)
- # Child process should just return.
-
- w = threading.Thread(target=worker)
-
- # Stub out the private condition variable's lock acquire method.
- # This acquires the lock and then waits until the child has forked
- # before returning, which will release the lock soon after. If
- # someone else tries to fix this test case by acquiring this lock
- # before forking instead of resetting it, the test case will
- # deadlock when it shouldn't.
- condition = w._stopped._cond
- orig_acquire = condition.acquire
- call_count_lock = threading.Lock()
- call_count = 0
- def my_acquire():
- global call_count
- global start_fork
- orig_acquire() # LOCK ACQUIRED HERE
- start_fork = True
- if call_count == 0:
- while not finish_join:
- time.sleep(0.01) # WORKER THREAD FORKS HERE
- with call_count_lock:
- call_count += 1
- condition.acquire = my_acquire
-
- w.start()
- w.join()
- print('end of main')
- """
- self.assertScriptHasOutput(script, "end of main\n")
-
- @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
- @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
- def test_5_clear_waiter_locks_to_avoid_crash(self):
- # Check that a spawned thread that forks doesn't segfault on certain
- # platforms, namely OS X. This used to happen if there was a waiter
- # lock in the thread's condition variable's waiters list. Even though
- # we know the lock will be held across the fork, it is not safe to
- # release locks held across forks on all platforms, so releasing the
- # waiter lock caused a segfault on OS X. Furthermore, since locks on
- # OS X are (as of this writing) implemented with a mutex + condition
- # variable instead of a semaphore, while we know that the Python-level
- # lock will be acquired, we can't know if the internal mutex will be
- # acquired at the time of the fork.
-
- script = """if True:
- import os, time, threading
-
- start_fork = False
-
- def worker():
- # Wait until the main thread has attempted to join this thread
- # before continuing.
- while not start_fork:
- time.sleep(0.01)
- childpid = os.fork()
- if childpid != 0:
- # Parent process just waits for child.
- (cpid, rc) = os.waitpid(childpid, 0)
- assert cpid == childpid
- assert rc == 0
- print('end of worker thread')
- else:
- # Child process should just return.
- pass
-
- w = threading.Thread(target=worker)
-
- # Stub out the private condition variable's _release_save method.
- # This releases the condition's lock and flips the global that
- # causes the worker to fork. At this point, the problematic waiter
- # lock has been acquired once by the waiter and has been put onto
- # the waiters list.
- condition = w._stopped._cond
- orig_release_save = condition._release_save
- def my_release_save():
- global start_fork
- orig_release_save()
- # Waiter lock held here, condition lock released.
- start_fork = True
- condition._release_save = my_release_save
-
- w.start()
- w.join()
- print('end of main thread')
- """
- output = "end of worker thread\nend of main thread\n"
- self.assertScriptHasOutput(script, output)
-
@unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
- def test_6_daemon_threads(self):
+ def test_4_daemon_threads(self):
# Check that a daemon thread cannot crash the interpreter on shutdown
# by manipulating internal structures that are being disposed of in
# the main thread.
diff --git a/Lib/threading.py b/Lib/threading.py
index b4b73a8..1921ee3 100644
--- a/Lib/threading.py
+++ b/Lib/threading.py
@@ -549,7 +549,7 @@ class Thread:
self._ident = None
self._tstate_lock = None
self._started = Event()
- self._stopped = Event()
+ self._is_stopped = False
self._initialized = True
# sys.stderr is not stored in the class like
# sys.exc_info since it can be changed between instances
@@ -561,7 +561,6 @@ class Thread:
# private! Called by _after_fork() to reset our internal locks as
# they may be in an invalid state leading to a deadlock or crash.
self._started._reset_internal_locks()
- self._stopped._reset_internal_locks()
if is_alive:
self._set_tstate_lock()
else:
@@ -574,7 +573,7 @@ class Thread:
status = "initial"
if self._started.is_set():
status = "started"
- if self._stopped.is_set():
+ if self._is_stopped:
status = "stopped"
if self._daemonic:
status += " daemon"
@@ -696,7 +695,6 @@ class Thread:
pass
finally:
with _active_limbo_lock:
- self._stop()
try:
# We don't call self._delete() because it also
# grabs _active_limbo_lock.
@@ -705,7 +703,8 @@ class Thread:
pass
def _stop(self):
- self._stopped.set()
+ self._is_stopped = True
+ self._tstate_lock = None
def _delete(self):
"Remove current thread from the dict of currently running threads."
@@ -749,29 +748,24 @@ class Thread:
raise RuntimeError("cannot join thread before it is started")
if self is current_thread():
raise RuntimeError("cannot join current thread")
- if not self.is_alive():
- return
- self._stopped.wait(timeout)
- if self._stopped.is_set():
- self._wait_for_tstate_lock(timeout is None)
+ if timeout is None:
+ self._wait_for_tstate_lock()
+ else:
+ self._wait_for_tstate_lock(timeout=timeout)
- def _wait_for_tstate_lock(self, block):
+ def _wait_for_tstate_lock(self, block=True, timeout=-1):
# Issue #18808: wait for the thread state to be gone.
- # When self._stopped is set, the Python part of the thread is done,
- # but the thread's tstate has not yet been destroyed. The C code
- # releases self._tstate_lock when the C part of the thread is done
- # (the code at the end of the thread's life to remove all knowledge
- # of the thread from the C data structures).
- # This method waits to acquire _tstate_lock if `block` is True, or
- # sees whether it can be acquired immediately if `block` is False.
- # If it does acquire the lock, the C code is done, and _tstate_lock
- # is set to None.
+ # At the end of the thread's life, after all knowledge of the thread
+ # is removed from C data structures, C code releases our _tstate_lock.
+ # This method passes its arguments to _tstate_lock.aquire().
+ # If the lock is acquired, the C code is done, and self._stop() is
+ # called. That sets ._is_stopped to True, and ._tstate_lock to None.
lock = self._tstate_lock
- if lock is None:
- return # already determined that the C code is done
- if lock.acquire(block):
+ if lock is None: # already determined that the C code is done
+ assert self._is_stopped
+ elif lock.acquire(block, timeout):
lock.release()
- self._tstate_lock = None
+ self._stop()
@property
def name(self):
@@ -790,14 +784,10 @@ class Thread:
def is_alive(self):
assert self._initialized, "Thread.__init__() not called"
- if not self._started.is_set():
+ if self._is_stopped or not self._started.is_set():
return False
- if not self._stopped.is_set():
- return True
- # The Python part of the thread is done, but the C part may still be
- # waiting to run.
self._wait_for_tstate_lock(False)
- return self._tstate_lock is not None
+ return not self._is_stopped
isAlive = is_alive
@@ -861,6 +851,7 @@ class _MainThread(Thread):
def __init__(self):
Thread.__init__(self, name="MainThread", daemon=False)
+ self._set_tstate_lock()
self._started.set()
self._set_ident()
with _active_limbo_lock:
@@ -925,6 +916,14 @@ from _thread import stack_size
_main_thread = _MainThread()
def _shutdown():
+ # Obscure: other threads may be waiting to join _main_thread. That's
+ # dubious, but some code does it. We can't wait for C code to release
+ # the main thread's tstate_lock - that won't happen until the interpreter
+ # is nearly dead. So we release it here. Note that just calling _stop()
+ # isn't enough: other threads may already be waiting on _tstate_lock.
+ assert _main_thread._tstate_lock is not None
+ assert _main_thread._tstate_lock.locked()
+ _main_thread._tstate_lock.release()
_main_thread._stop()
t = _pickSomeNonDaemonThread()
while t: