From 7b4769937fb612d576b6829c3b834f3dd31752f1 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Sat, 7 Sep 2013 23:38:37 +0200 Subject: Issue #18808: Thread.join() now waits for the underlying thread state to be destroyed before returning. This prevents unpredictable aborts in Py_EndInterpreter() when some non-daemon threads are still running. --- Include/pystate.h | 26 ++++++++++++++ Lib/_dummy_thread.py | 4 +++ Lib/test/test_threading.py | 70 +++++++++++++++++++++++++++++++++++-- Lib/threading.py | 87 +++++++++++++++++++++++++++++----------------- Misc/NEWS | 4 +++ Modules/_threadmodule.c | 62 +++++++++++++++++++++++++++++++++ Python/pystate.c | 5 +++ 7 files changed, 224 insertions(+), 34 deletions(-) diff --git a/Include/pystate.h b/Include/pystate.h index e41fe4c..ddc6892 100644 --- a/Include/pystate.h +++ b/Include/pystate.h @@ -118,6 +118,32 @@ typedef struct _ts { int trash_delete_nesting; PyObject *trash_delete_later; + /* Called when a thread state is deleted normally, but not when it + * is destroyed after fork(). + * Pain: to prevent rare but fatal shutdown errors (issue 18808), + * Thread.join() must wait for the join'ed thread's tstate to be unlinked + * from the tstate chain. That happens at the end of a thread's life, + * in pystate.c. + * The obvious way doesn't quite work: create a lock which the tstate + * unlinking code releases, and have Thread.join() wait to acquire that + * lock. The problem is that we _are_ at the end of the thread's life: + * if the thread holds the last reference to the lock, decref'ing the + * lock will delete the lock, and that may trigger arbitrary Python code + * if there's a weakref, with a callback, to the lock. But by this time + * _PyThreadState_Current is already NULL, so only the simplest of C code + * can be allowed to run (in particular it must not be possible to + * release the GIL). + * So instead of holding the lock directly, the tstate holds a weakref to + * the lock: that's the value of on_delete_data below. Decref'ing a + * weakref is harmless. + * on_delete points to _threadmodule.c's static release_sentinel() function. + * After the tstate is unlinked, release_sentinel is called with the + * weakref-to-lock (on_delete_data) argument, and release_sentinel releases + * the indirectly held lock. + */ + void (*on_delete)(void *); + void *on_delete_data; + /* XXX signal handlers should also be here */ } PyThreadState; diff --git a/Lib/_dummy_thread.py b/Lib/_dummy_thread.py index 13b1f26..b67cfb9 100644 --- a/Lib/_dummy_thread.py +++ b/Lib/_dummy_thread.py @@ -81,6 +81,10 @@ def stack_size(size=None): raise error("setting thread stack size not supported") return 0 +def _set_sentinel(): + """Dummy implementation of _thread._set_sentinel().""" + return LockType() + class LockType(object): """Class implementing dummy implementation of _thread.LockType. diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py index 971a635..79b10ed 100644 --- a/Lib/test/test_threading.py +++ b/Lib/test/test_threading.py @@ -539,6 +539,40 @@ class ThreadTests(BaseTestCase): self.assertEqual(err, b"") self.assertEqual(data, "Thread-1\nTrue\nTrue\n") + def test_tstate_lock(self): + # Test an implementation detail of Thread objects. + started = _thread.allocate_lock() + finish = _thread.allocate_lock() + started.acquire() + finish.acquire() + def f(): + started.release() + finish.acquire() + time.sleep(0.01) + # The tstate lock is None until the thread is started + t = threading.Thread(target=f) + self.assertIs(t._tstate_lock, None) + t.start() + started.acquire() + self.assertTrue(t.is_alive()) + # The tstate lock can't be acquired when the thread is running + # (or suspended). + tstate_lock = t._tstate_lock + self.assertFalse(tstate_lock.acquire(timeout=0), False) + finish.release() + # When the thread ends, the state_lock can be successfully + # acquired. + self.assertTrue(tstate_lock.acquire(timeout=5), False) + # But is_alive() is still True: we hold _tstate_lock now, which + # prevents is_alive() from knowing the thread's end-of-life C code + # is done. + self.assertTrue(t.is_alive()) + # Let is_alive() find out the C code is done. + tstate_lock.release() + self.assertFalse(t.is_alive()) + # And verify the thread disposed of _tstate_lock. + self.assertTrue(t._tstate_lock is None) + class ThreadJoinOnShutdown(BaseTestCase): @@ -669,7 +703,7 @@ class ThreadJoinOnShutdown(BaseTestCase): # someone else tries to fix this test case by acquiring this lock # before forking instead of resetting it, the test case will # deadlock when it shouldn't. - condition = w._block + condition = w._stopped._cond orig_acquire = condition.acquire call_count_lock = threading.Lock() call_count = 0 @@ -733,7 +767,7 @@ class ThreadJoinOnShutdown(BaseTestCase): # causes the worker to fork. At this point, the problematic waiter # lock has been acquired once by the waiter and has been put onto # the waiters list. - condition = w._block + condition = w._stopped._cond orig_release_save = condition._release_save def my_release_save(): global start_fork @@ -867,6 +901,38 @@ class SubinterpThreadingTests(BaseTestCase): # The thread was joined properly. self.assertEqual(os.read(r, 1), b"x") + def test_threads_join_2(self): + # Same as above, but a delay gets introduced after the thread's + # Python code returned but before the thread state is deleted. + # To achieve this, we register a thread-local object which sleeps + # a bit when deallocated. + r, w = os.pipe() + self.addCleanup(os.close, r) + self.addCleanup(os.close, w) + code = r"""if 1: + import os + import threading + import time + + class Sleeper: + def __del__(self): + time.sleep(0.05) + + tls = threading.local() + + def f(): + # Sleep a bit so that the thread is still running when + # Py_EndInterpreter is called. + time.sleep(0.05) + tls.x = Sleeper() + os.write(%d, b"x") + threading.Thread(target=f).start() + """ % (w,) + ret = _testcapi.run_in_subinterp(code) + self.assertEqual(ret, 0) + # The thread was joined properly. + self.assertEqual(os.read(r, 1), b"x") + def test_daemon_threads_fatal_error(self): subinterp_code = r"""if 1: import os diff --git a/Lib/threading.py b/Lib/threading.py index b6d19d5..d49be0b 100644 --- a/Lib/threading.py +++ b/Lib/threading.py @@ -33,6 +33,7 @@ __all__ = ['active_count', 'Condition', 'current_thread', 'enumerate', 'Event', # Rename some stuff so "from threading import *" is safe _start_new_thread = _thread.start_new_thread _allocate_lock = _thread.allocate_lock +_set_sentinel = _thread._set_sentinel get_ident = _thread.get_ident ThreadError = _thread.error try: @@ -548,28 +549,33 @@ class Thread: else: self._daemonic = current_thread().daemon self._ident = None + self._tstate_lock = None self._started = Event() - self._stopped = False - self._block = Condition(Lock()) + self._stopped = Event() self._initialized = True # sys.stderr is not stored in the class like # sys.exc_info since it can be changed between instances self._stderr = _sys.stderr _dangling.add(self) - def _reset_internal_locks(self): + def _reset_internal_locks(self, is_alive): # private! Called by _after_fork() to reset our internal locks as # they may be in an invalid state leading to a deadlock or crash. - if hasattr(self, '_block'): # DummyThread deletes _block - self._block.__init__() self._started._reset_internal_locks() + self._stopped._reset_internal_locks() + if is_alive: + self._set_tstate_lock() + else: + # The thread isn't alive after fork: it doesn't have a tstate + # anymore. + self._tstate_lock = None def __repr__(self): assert self._initialized, "Thread.__init__() was not called" status = "initial" if self._started.is_set(): status = "started" - if self._stopped: + if self._stopped.is_set(): status = "stopped" if self._daemonic: status += " daemon" @@ -625,9 +631,18 @@ class Thread: def _set_ident(self): self._ident = get_ident() + def _set_tstate_lock(self): + """ + Set a lock object which will be released by the interpreter when + the underlying thread state (see pystate.h) gets deleted. + """ + self._tstate_lock = _set_sentinel() + self._tstate_lock.acquire() + def _bootstrap_inner(self): try: self._set_ident() + self._set_tstate_lock() self._started.set() with _active_limbo_lock: _active[self._ident] = self @@ -691,10 +706,7 @@ class Thread: pass def _stop(self): - self._block.acquire() - self._stopped = True - self._block.notify_all() - self._block.release() + self._stopped.set() def _delete(self): "Remove current thread from the dict of currently running threads." @@ -738,21 +750,29 @@ class Thread: raise RuntimeError("cannot join thread before it is started") if self is current_thread(): raise RuntimeError("cannot join current thread") - - self._block.acquire() - try: - if timeout is None: - while not self._stopped: - self._block.wait() - else: - deadline = _time() + timeout - while not self._stopped: - delay = deadline - _time() - if delay <= 0: - break - self._block.wait(delay) - finally: - self._block.release() + if not self.is_alive(): + return + self._stopped.wait(timeout) + if self._stopped.is_set(): + self._wait_for_tstate_lock(timeout is None) + + def _wait_for_tstate_lock(self, block): + # Issue #18808: wait for the thread state to be gone. + # When self._stopped is set, the Python part of the thread is done, + # but the thread's tstate has not yet been destroyed. The C code + # releases self._tstate_lock when the C part of the thread is done + # (the code at the end of the thread's life to remove all knowledge + # of the thread from the C data structures). + # This method waits to acquire _tstate_lock if `block` is True, or + # sees whether it can be acquired immediately if `block` is False. + # If it does acquire the lock, the C code is done, and _tstate_lock + # is set to None. + lock = self._tstate_lock + if lock is None: + return # already determined that the C code is done + if lock.acquire(block): + lock.release() + self._tstate_lock = None @property def name(self): @@ -771,7 +791,14 @@ class Thread: def is_alive(self): assert self._initialized, "Thread.__init__() not called" - return self._started.is_set() and not self._stopped + if not self._started.is_set(): + return False + if not self._stopped.is_set(): + return True + # The Python part of the thread is done, but the C part may still be + # waiting to run. + self._wait_for_tstate_lock(False) + return self._tstate_lock is not None isAlive = is_alive @@ -854,11 +881,6 @@ class _DummyThread(Thread): def __init__(self): Thread.__init__(self, name=_newname("Dummy-%d"), daemon=True) - # Thread._block consumes an OS-level locking primitive, which - # can never be used by a _DummyThread. Since a _DummyThread - # instance is immortal, that's bad, so release this resource. - del self._block - self._started.set() self._set_ident() with _active_limbo_lock: @@ -952,15 +974,16 @@ def _after_fork(): for thread in _enumerate(): # Any lock/condition variable may be currently locked or in an # invalid state, so we reinitialize them. - thread._reset_internal_locks() if thread is current: # There is only one active thread. We reset the ident to # its new value since it can have changed. + thread._reset_internal_locks(True) ident = get_ident() thread._ident = ident new_active[ident] = thread else: # All the others are already stopped. + thread._reset_internal_locks(False) thread._stop() _limbo.clear() diff --git a/Misc/NEWS b/Misc/NEWS index 757ac89..d375a3a 100644 --- a/Misc/NEWS +++ b/Misc/NEWS @@ -56,6 +56,10 @@ Core and Builtins Library ------- +- Issue #18808: Thread.join() now waits for the underlying thread state to + be destroyed before returning. This prevents unpredictable aborts in + Py_EndInterpreter() when some non-daemon threads are still running. + - Issue #18458: Prevent crashes with newer versions of libedit. Its readline emulation has changed from 0-based indexing to 1-based like gnu readline. diff --git a/Modules/_threadmodule.c b/Modules/_threadmodule.c index cbb2901..d83d117 100644 --- a/Modules/_threadmodule.c +++ b/Modules/_threadmodule.c @@ -1172,6 +1172,66 @@ yet finished.\n\ This function is meant for internal and specialized purposes only.\n\ In most applications `threading.enumerate()` should be used instead."); +static void +release_sentinel(void *wr) +{ + /* Tricky: this function is called when the current thread state + is being deleted. Therefore, only simple C code can safely + execute here. */ + PyObject *obj = PyWeakref_GET_OBJECT(wr); + lockobject *lock; + if (obj != Py_None) { + assert(Py_TYPE(obj) == &Locktype); + lock = (lockobject *) obj; + if (lock->locked) { + PyThread_release_lock(lock->lock_lock); + lock->locked = 0; + } + } + /* Deallocating a weakref with a NULL callback only calls + PyObject_GC_Del(), which can't call any Python code. */ + Py_DECREF(wr); +} + +static PyObject * +thread__set_sentinel(PyObject *self) +{ + PyObject *wr; + PyThreadState *tstate = PyThreadState_Get(); + lockobject *lock; + + if (tstate->on_delete_data != NULL) { + /* We must support the re-creation of the lock from a + fork()ed child. */ + assert(tstate->on_delete == &release_sentinel); + wr = (PyObject *) tstate->on_delete_data; + tstate->on_delete = NULL; + tstate->on_delete_data = NULL; + Py_DECREF(wr); + } + lock = newlockobject(); + if (lock == NULL) + return NULL; + /* The lock is owned by whoever called _set_sentinel(), but the weakref + hangs to the thread state. */ + wr = PyWeakref_NewRef((PyObject *) lock, NULL); + if (wr == NULL) { + Py_DECREF(lock); + return NULL; + } + tstate->on_delete_data = (void *) wr; + tstate->on_delete = &release_sentinel; + return (PyObject *) lock; +} + +PyDoc_STRVAR(_set_sentinel_doc, +"_set_sentinel() -> lock\n\ +\n\ +Set a sentinel lock that will be released when the current thread\n\ +state is finalized (after it is untied from the interpreter).\n\ +\n\ +This is a private API for the threading module."); + static PyObject * thread_stack_size(PyObject *self, PyObject *args) { @@ -1247,6 +1307,8 @@ static PyMethodDef thread_methods[] = { METH_NOARGS, _count_doc}, {"stack_size", (PyCFunction)thread_stack_size, METH_VARARGS, stack_size_doc}, + {"_set_sentinel", (PyCFunction)thread__set_sentinel, + METH_NOARGS, _set_sentinel_doc}, {NULL, NULL} /* sentinel */ }; diff --git a/Python/pystate.c b/Python/pystate.c index 924b6a2..ecd00ce 100644 --- a/Python/pystate.c +++ b/Python/pystate.c @@ -208,6 +208,8 @@ new_threadstate(PyInterpreterState *interp, int init) tstate->trash_delete_nesting = 0; tstate->trash_delete_later = NULL; + tstate->on_delete = NULL; + tstate->on_delete_data = NULL; if (init) _PyThreadState_Init(tstate); @@ -390,6 +392,9 @@ tstate_delete_common(PyThreadState *tstate) if (tstate->next) tstate->next->prev = tstate->prev; HEAD_UNLOCK(); + if (tstate->on_delete != NULL) { + tstate->on_delete(tstate->on_delete_data); + } PyMem_RawFree(tstate); } -- cgit v0.12