From 87255be6964979b5abdc4b9dcf81cdcfdad6e753 Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Tue, 7 Apr 2020 23:11:49 +0200 Subject: bpo-40089: Add _at_fork_reinit() method to locks (GH-19195) Add a private _at_fork_reinit() method to _thread.Lock, _thread.RLock, threading.RLock and threading.Condition classes: reinitialize the lock after fork in the child process; reset the lock to the unlocked state. Rename also the private _reset_internal_locks() method of threading.Event to _at_fork_reinit(). * Add _PyThread_at_fork_reinit() private function. It is excluded from the limited C API. * threading.Thread._reset_internal_locks() now calls _at_fork_reinit() on self._tstate_lock rather than creating a new Python lock object. --- Include/pythread.h | 9 +++ Lib/test/lock_tests.py | 30 +++++++++- Lib/threading.py | 20 +++++-- .../2020-03-27-16-54-29.bpo-40089.VTq_8s.rst | 6 ++ Modules/_threadmodule.c | 67 +++++++++++++++++----- Modules/posixmodule.c | 3 +- Python/thread_pthread.h | 20 +++++++ 7 files changed, 133 insertions(+), 22 deletions(-) create mode 100644 Misc/NEWS.d/next/Library/2020-03-27-16-54-29.bpo-40089.VTq_8s.rst diff --git a/Include/pythread.h b/Include/pythread.h index 1cf83b7..bb9d864 100644 --- a/Include/pythread.h +++ b/Include/pythread.h @@ -36,6 +36,15 @@ PyAPI_FUNC(int) PyThread_acquire_lock(PyThread_type_lock, int); #define WAIT_LOCK 1 #define NOWAIT_LOCK 0 +#ifndef Py_LIMITED_API +#ifdef HAVE_FORK +/* Private function to reinitialize a lock at fork in the child process. + Reset the lock to the unlocked state. + Return 0 on success, return -1 on error. */ +PyAPI_FUNC(int) _PyThread_at_fork_reinit(PyThread_type_lock *lock); +#endif /* HAVE_FORK */ +#endif /* !Py_LIMITED_API */ + /* PY_TIMEOUT_T is the integral type used to specify timeouts when waiting on a lock (see PyThread_acquire_lock_timed() below). PY_TIMEOUT_MAX is the highest usable value (in microseconds) of that diff --git a/Lib/test/lock_tests.py b/Lib/test/lock_tests.py index cd1155d..b397525 100644 --- a/Lib/test/lock_tests.py +++ b/Lib/test/lock_tests.py @@ -2,6 +2,7 @@ Various tests for synchronization primitives. """ +import os import sys import time from _thread import start_new_thread, TIMEOUT_MAX @@ -12,6 +13,11 @@ import weakref from test import support +requires_fork = unittest.skipUnless(hasattr(os, 'fork'), + "platform doesn't support fork " + "(no _at_fork_reinit method)") + + def _wait(): # A crude wait/yield function not relying on synchronization primitives. time.sleep(0.01) @@ -265,6 +271,25 @@ class LockTests(BaseLockTests): self.assertFalse(lock.locked()) self.assertTrue(lock.acquire(blocking=False)) + @requires_fork + def test_at_fork_reinit(self): + def use_lock(lock): + # make sure that the lock still works normally + # after _at_fork_reinit() + lock.acquire() + lock.release() + + # unlocked + lock = self.locktype() + lock._at_fork_reinit() + use_lock(lock) + + # locked: _at_fork_reinit() resets the lock to the unlocked state + lock2 = self.locktype() + lock2.acquire() + lock2._at_fork_reinit() + use_lock(lock2) + class RLockTests(BaseLockTests): """ @@ -417,12 +442,13 @@ class EventTests(BaseTestCase): b.wait_for_finished() self.assertEqual(results, [True] * N) - def test_reset_internal_locks(self): + @requires_fork + def test_at_fork_reinit(self): # ensure that condition is still using a Lock after reset evt = self.eventtype() with evt._cond: self.assertFalse(evt._cond.acquire(False)) - evt._reset_internal_locks() + evt._at_fork_reinit() with evt._cond: self.assertFalse(evt._cond.acquire(False)) diff --git a/Lib/threading.py b/Lib/threading.py index 6b25e7a..5424db3 100644 --- a/Lib/threading.py +++ b/Lib/threading.py @@ -123,6 +123,11 @@ class _RLock: hex(id(self)) ) + def _at_fork_reinit(self): + self._block._at_fork_reinit() + self._owner = None + self._count = 0 + def acquire(self, blocking=True, timeout=-1): """Acquire a lock, blocking or non-blocking. @@ -245,6 +250,10 @@ class Condition: pass self._waiters = _deque() + def _at_fork_reinit(self): + self._lock._at_fork_reinit() + self._waiters.clear() + def __enter__(self): return self._lock.__enter__() @@ -514,9 +523,9 @@ class Event: self._cond = Condition(Lock()) self._flag = False - def _reset_internal_locks(self): - # private! called by Thread._reset_internal_locks by _after_fork() - self._cond.__init__(Lock()) + def _at_fork_reinit(self): + # Private method called by Thread._reset_internal_locks() + self._cond._at_fork_reinit() def is_set(self): """Return true if and only if the internal flag is true.""" @@ -816,9 +825,10 @@ class Thread: def _reset_internal_locks(self, is_alive): # private! Called by _after_fork() to reset our internal locks as # they may be in an invalid state leading to a deadlock or crash. - self._started._reset_internal_locks() + self._started._at_fork_reinit() if is_alive: - self._set_tstate_lock() + self._tstate_lock._at_fork_reinit() + self._tstate_lock.acquire() else: # The thread isn't alive after fork: it doesn't have a tstate # anymore. diff --git a/Misc/NEWS.d/next/Library/2020-03-27-16-54-29.bpo-40089.VTq_8s.rst b/Misc/NEWS.d/next/Library/2020-03-27-16-54-29.bpo-40089.VTq_8s.rst new file mode 100644 index 0000000..3948852 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2020-03-27-16-54-29.bpo-40089.VTq_8s.rst @@ -0,0 +1,6 @@ +Add a private ``_at_fork_reinit()`` method to :class:`_thread.Lock`, +:class:`_thread.RLock`, :class:`threading.RLock` and +:class:`threading.Condition` classes: reinitialize the lock at fork in the +child process, reset the lock to the unlocked state. +Rename also the private ``_reset_internal_locks()`` method of +:class:`threading.Event` to ``_at_fork_reinit()``. diff --git a/Modules/_threadmodule.c b/Modules/_threadmodule.c index bd8a40f..addef3e 100644 --- a/Modules/_threadmodule.c +++ b/Modules/_threadmodule.c @@ -213,6 +213,22 @@ lock_repr(lockobject *self) self->locked ? "locked" : "unlocked", Py_TYPE(self)->tp_name, self); } +#ifdef HAVE_FORK +static PyObject * +lock__at_fork_reinit(lockobject *self, PyObject *Py_UNUSED(args)) +{ + if (_PyThread_at_fork_reinit(&self->lock_lock) < 0) { + PyErr_SetString(ThreadError, "failed to reinitialize lock at fork"); + return NULL; + } + + self->locked = 0; + + Py_RETURN_NONE; +} +#endif /* HAVE_FORK */ + + static PyMethodDef lock_methods[] = { {"acquire_lock", (PyCFunction)(void(*)(void))lock_PyThread_acquire_lock, METH_VARARGS | METH_KEYWORDS, acquire_doc}, @@ -230,6 +246,10 @@ static PyMethodDef lock_methods[] = { METH_VARARGS | METH_KEYWORDS, acquire_doc}, {"__exit__", (PyCFunction)lock_PyThread_release_lock, METH_VARARGS, release_doc}, +#ifdef HAVE_FORK + {"_at_fork_reinit", (PyCFunction)lock__at_fork_reinit, + METH_NOARGS, NULL}, +#endif {NULL, NULL} /* sentinel */ }; @@ -446,22 +466,20 @@ For internal use by `threading.Condition`."); static PyObject * rlock_new(PyTypeObject *type, PyObject *args, PyObject *kwds) { - rlockobject *self; - - self = (rlockobject *) type->tp_alloc(type, 0); - if (self != NULL) { - self->in_weakreflist = NULL; - self->rlock_owner = 0; - self->rlock_count = 0; - - self->rlock_lock = PyThread_allocate_lock(); - if (self->rlock_lock == NULL) { - Py_DECREF(self); - PyErr_SetString(ThreadError, "can't allocate lock"); - return NULL; - } + rlockobject *self = (rlockobject *) type->tp_alloc(type, 0); + if (self == NULL) { + return NULL; } + self->in_weakreflist = NULL; + self->rlock_owner = 0; + self->rlock_count = 0; + self->rlock_lock = PyThread_allocate_lock(); + if (self->rlock_lock == NULL) { + Py_DECREF(self); + PyErr_SetString(ThreadError, "can't allocate lock"); + return NULL; + } return (PyObject *) self; } @@ -475,6 +493,23 @@ rlock_repr(rlockobject *self) } +#ifdef HAVE_FORK +static PyObject * +rlock__at_fork_reinit(rlockobject *self, PyObject *Py_UNUSED(args)) +{ + if (_PyThread_at_fork_reinit(&self->rlock_lock) < 0) { + PyErr_SetString(ThreadError, "failed to reinitialize lock at fork"); + return NULL; + } + + self->rlock_owner = 0; + self->rlock_count = 0; + + Py_RETURN_NONE; +} +#endif /* HAVE_FORK */ + + static PyMethodDef rlock_methods[] = { {"acquire", (PyCFunction)(void(*)(void))rlock_acquire, METH_VARARGS | METH_KEYWORDS, rlock_acquire_doc}, @@ -490,6 +525,10 @@ static PyMethodDef rlock_methods[] = { METH_VARARGS | METH_KEYWORDS, rlock_acquire_doc}, {"__exit__", (PyCFunction)rlock_release, METH_VARARGS, rlock_release_doc}, +#ifdef HAVE_FORK + {"_at_fork_reinit", (PyCFunction)rlock__at_fork_reinit, + METH_NOARGS, NULL}, +#endif {NULL, NULL} /* sentinel */ }; diff --git a/Modules/posixmodule.c b/Modules/posixmodule.c index 345798d..4264bf1 100644 --- a/Modules/posixmodule.c +++ b/Modules/posixmodule.c @@ -491,7 +491,8 @@ register_at_forker(PyObject **lst, PyObject *func) } return PyList_Append(*lst, func); } -#endif +#endif /* HAVE_FORK */ + /* Legacy wrapper */ void diff --git a/Python/thread_pthread.h b/Python/thread_pthread.h index 40e2e11..e3497e7 100644 --- a/Python/thread_pthread.h +++ b/Python/thread_pthread.h @@ -694,6 +694,26 @@ PyThread_release_lock(PyThread_type_lock lock) #endif /* USE_SEMAPHORES */ int +_PyThread_at_fork_reinit(PyThread_type_lock *lock) +{ + PyThread_type_lock new_lock = PyThread_allocate_lock(); + if (new_lock == NULL) { + return -1; + } + + /* bpo-6721, bpo-40089: The old lock can be in an inconsistent state. + fork() can be called in the middle of an operation on the lock done by + another thread. So don't call PyThread_free_lock(*lock). + + Leak memory on purpose. Don't release the memory either since the + address of a mutex is relevant. Putting two mutexes at the same address + can lead to problems. */ + + *lock = new_lock; + return 0; +} + +int PyThread_acquire_lock(PyThread_type_lock lock, int waitflag) { return PyThread_acquire_lock_timed(lock, waitflag ? -1 : 0, /*intr_flag=*/0); -- cgit v0.12