author     Antoine Pitrou <solipsis@pitrou.net>    2011-10-04 09:51:23 (GMT)
committer  Antoine Pitrou <solipsis@pitrou.net>    2011-10-04 09:51:23 (GMT)
commit     75e78b6c775dbb791530af4fe4871e0ddeb2d880 (patch)
tree       b17e40b5f7ab26c9766c2786582d09056780aefe
parent     031487eb3b30ea940ce8ee732c0b1e965d3229c0 (diff)
Use the faulthandler module's infrastructure to write a GIL-less
memory watchdog for timely stats collection.
-rw-r--r--  Lib/test/support.py     | 109
-rw-r--r--  Modules/faulthandler.c  | 183
2 files changed, 249 insertions, 43 deletions
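The patch adds a private helper pair to faulthandler, _file_watchdog(rfd, wfd, period) and _cancel_file_watchdog(), and rebuilds test.support's memory watchdog on top of them: a C thread re-reads rfd every 'period' seconds without holding the GIL and, whenever the contents change, writes a native-long length header followed by the file's bytes to wfd, where an ordinary Python thread can pick them up. A minimal usage sketch in the spirit of the new _MemoryWatchdog class below (Linux-only, since it watches /proc/self/statm; the watch_statm helper and the 1.0 s period are illustrative, not part of the patch, and real code should guard on hasattr(faulthandler, '_file_watchdog') as test.support does):

import faulthandler, os, struct, threading

def watch_statm(period=1.0):
    # The C thread polls 'rfd' every 'period' seconds without taking the GIL
    # and, when the contents change, writes a native-long length header
    # followed by the file's bytes to 'wfd'.
    rfd = os.open('/proc/self/statm', os.O_RDONLY)
    pipe_rfd, wfd = os.pipe()
    faulthandler._file_watchdog(rfd, wfd, period)

    def consume():
        header_size = struct.calcsize('l')
        try:
            while True:
                header = os.read(pipe_rfd, header_size)
                if len(header) < header_size:
                    break  # write end closed by the C thread
                length, = struct.unpack('l', header)
                statm = os.read(pipe_rfd, length).decode('ascii')
                print('data pages:', statm.split()[5])
        finally:
            os.close(pipe_rfd)

    t = threading.Thread(target=consume)
    t.daemon = True
    t.start()
    return t

watcher = watch_statm()
# ... run the memory-hungry workload here ...
faulthandler._cancel_file_watchdog()   # the C thread closes rfd and wfd itself
watcher.join()

Cancellation is symmetric: _cancel_file_watchdog() wakes the C thread, which closes both descriptors before exiting, so the Python side only has to join its consumer thread.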
diff --git a/Lib/test/support.py b/Lib/test/support.py
index 62ad606..cce04aa 100644
--- a/Lib/test/support.py
+++ b/Lib/test/support.py
@@ -23,6 +23,7 @@ import time
import sysconfig
import fnmatch
import logging.handlers
+import struct
try:
import _thread, threading
@@ -34,6 +35,10 @@ try:
except ImportError:
multiprocessing = None
+try:
+ import faulthandler
+except ImportError:
+ faulthandler = None
try:
import zlib
@@ -1133,41 +1138,66 @@ def set_memlimit(limit):
raise ValueError('Memory limit %r too low to be useful' % (limit,))
max_memuse = memlimit
-def _memory_watchdog(start_evt, finish_evt, period=10.0):
- """A function which periodically watches the process' memory consumption
+class _MemoryWatchdog:
+ """An object which periodically watches the process' memory consumption
and prints it out.
"""
- # XXX: because of the GIL, and because the very long operations tested
- # in most bigmem tests are uninterruptible, the loop below gets woken up
- # much less often than expected.
- # The polling code should be rewritten in raw C, without holding the GIL,
- # and push results onto an anonymous pipe.
- try:
- page_size = os.sysconf('SC_PAGESIZE')
- except (ValueError, AttributeError):
+
+ def __init__(self):
+ self.procfile = '/proc/{pid}/statm'.format(pid=os.getpid())
+ self.started = False
+ self.thread = None
try:
- page_size = os.sysconf('SC_PAGE_SIZE')
+ self.page_size = os.sysconf('SC_PAGESIZE')
except (ValueError, AttributeError):
- page_size = 4096
- procfile = '/proc/{pid}/statm'.format(pid=os.getpid())
- try:
- f = open(procfile, 'rb')
- except IOError as e:
- warnings.warn('/proc not available for stats: {}'.format(e),
- RuntimeWarning)
- sys.stderr.flush()
- return
- with f:
- start_evt.set()
- old_data = -1
- while not finish_evt.wait(period):
- f.seek(0)
- statm = f.read().decode('ascii')
- data = int(statm.split()[5])
- if data != old_data:
- old_data = data
+ try:
+ self.page_size = os.sysconf('SC_PAGE_SIZE')
+ except (ValueError, AttributeError):
+ self.page_size = 4096
+
+ def consumer(self, fd):
+ HEADER = "l"
+ header_size = struct.calcsize(HEADER)
+ try:
+ while True:
+ header = os.read(fd, header_size)
+ if len(header) < header_size:
+ # Pipe closed on other end
+ break
+ data_len, = struct.unpack(HEADER, header)
+ data = os.read(fd, data_len)
+ statm = data.decode('ascii')
+ data = int(statm.split()[5])
print(" ... process data size: {data:.1f}G"
- .format(data=data * page_size / (1024 ** 3)))
+ .format(data=data * self.page_size / (1024 ** 3)))
+ finally:
+ os.close(fd)
+
+ def start(self):
+ if not faulthandler or not hasattr(faulthandler, '_file_watchdog'):
+ return
+ try:
+ rfd = os.open(self.procfile, os.O_RDONLY)
+ except OSError as e:
+ warnings.warn('/proc not available for stats: {}'.format(e),
+ RuntimeWarning)
+ sys.stderr.flush()
+ return
+ pipe_fd, wfd = os.pipe()
+ # _file_watchdog() doesn't take the GIL in its child thread, and
+ # therefore collects statistics in a timely fashion
+ faulthandler._file_watchdog(rfd, wfd, 3.0)
+ self.started = True
+ self.thread = threading.Thread(target=self.consumer, args=(pipe_fd,))
+ self.thread.daemon = True
+ self.thread.start()
+
+ def stop(self):
+ if not self.started:
+ return
+ faulthandler._cancel_file_watchdog()
+ self.thread.join()
+
def bigmemtest(size, memuse, dry_run=True):
"""Decorator for bigmem tests.
@@ -1194,27 +1224,20 @@ def bigmemtest(size, memuse, dry_run=True):
"not enough memory: %.1fG minimum needed"
% (size * memuse / (1024 ** 3)))
- if real_max_memuse and verbose and threading:
+ if real_max_memuse and verbose and faulthandler and threading:
print()
print(" ... expected peak memory use: {peak:.1f}G"
.format(peak=size * memuse / (1024 ** 3)))
- sys.stdout.flush()
- start_evt = threading.Event()
- finish_evt = threading.Event()
- t = threading.Thread(target=_memory_watchdog,
- args=(start_evt, finish_evt, 0.5))
- t.daemon = True
- t.start()
- start_evt.set()
+ watchdog = _MemoryWatchdog()
+ watchdog.start()
else:
- t = None
+ watchdog = None
try:
return f(self, maxsize)
finally:
- if t:
- finish_evt.set()
- t.join()
+ if watchdog:
+ watchdog.stop()
wrapper.size = size
wrapper.memuse = memuse
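The wire format between the C producer added to Modules/faulthandler.c (next diff) and the consumer() thread above is deliberately minimal: each message is one native C long carrying the payload length, followed by the raw contents of the watched file. A round-trip sketch of that framing (the statm-like sample line is fabricated for illustration):

import struct

HEADER = 'l'                             # native long, matching the C side's "long x"
HEADER_SIZE = struct.calcsize(HEADER)

def pack_frame(payload):
    # what file_watchdog() writes to the pipe: length header, then the bytes
    return struct.pack(HEADER, len(payload)) + payload

def unpack_frame(buf):
    # what consumer() reads back out of the pipe
    length, = struct.unpack(HEADER, buf[:HEADER_SIZE])
    return buf[HEADER_SIZE:HEADER_SIZE + length]

sample = b'7462 2341 1523 9 0 1275 0\n'  # made-up /proc/<pid>/statm contents
assert unpack_frame(pack_frame(sample)) == sample

Because producer and consumer always live in the same process, native struct sizes and byte order are guaranteed to agree, which is what lets the C thread write a bare long without ever touching Python objects or the GIL.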
diff --git a/Modules/faulthandler.c b/Modules/faulthandler.c
index 8ee0630..3002840 100644
--- a/Modules/faulthandler.c
+++ b/Modules/faulthandler.c
@@ -13,6 +13,7 @@
#ifdef WITH_THREAD
# define FAULTHANDLER_LATER
+# define FAULTHANDLER_WATCHDOG
#endif
#ifndef MS_WINDOWS
@@ -65,6 +66,20 @@ static struct {
} thread;
#endif
+#ifdef FAULTHANDLER_WATCHDOG
+static struct {
+ int rfd;
+ int wfd;
+ PY_TIMEOUT_T period_us; /* period in microseconds */
+ /* The main thread always holds this lock. It is only released when
+ file_watchdog() is interrupted before this thread exits, or at
+ Python exit. */
+ PyThread_type_lock cancel_event;
+ /* released by child thread when joined */
+ PyThread_type_lock running;
+} watchdog;
+#endif
+
#ifdef FAULTHANDLER_USER
typedef struct {
int enabled;
@@ -587,6 +602,138 @@ faulthandler_cancel_dump_tracebacks_later_py(PyObject *self)
}
#endif /* FAULTHANDLER_LATER */
+#ifdef FAULTHANDLER_WATCHDOG
+
+static void
+file_watchdog(void *unused)
+{
+ PyLockStatus st;
+ PY_TIMEOUT_T timeout;
+
+ const int MAXDATA = 1024;
+ char buf1[MAXDATA], buf2[MAXDATA];
+ char *data = buf1, *old_data = buf2;
+ Py_ssize_t data_len, old_data_len = -1;
+
+#if defined(HAVE_PTHREAD_SIGMASK) && !defined(HAVE_BROKEN_PTHREAD_SIGMASK)
+ sigset_t set;
+
+ /* we don't want to receive any signal */
+ sigfillset(&set);
+ pthread_sigmask(SIG_SETMASK, &set, NULL);
+#endif
+
+ /* On first pass, feed file contents immediately */
+ timeout = 0;
+ do {
+ st = PyThread_acquire_lock_timed(watchdog.cancel_event,
+ timeout, 0);
+ timeout = watchdog.period_us;
+ if (st == PY_LOCK_ACQUIRED) {
+ PyThread_release_lock(watchdog.cancel_event);
+ break;
+ }
+ /* Timeout => read and write data */
+ assert(st == PY_LOCK_FAILURE);
+
+ if (lseek(watchdog.rfd, 0, SEEK_SET) < 0) {
+ break;
+ }
+ data_len = read(watchdog.rfd, data, MAXDATA);
+ if (data_len < 0) {
+ break;
+ }
+ if (data_len != old_data_len || memcmp(data, old_data, data_len)) {
+ char *tdata;
+ Py_ssize_t tlen;
+ /* Contents changed, feed them to wfd */
+ long x = (long) data_len;
+ /* We can't do anything if the consumer is too slow, just bail out */
+ if (write(watchdog.wfd, (void *) &x, sizeof(x)) < sizeof(x))
+ break;
+ if (write(watchdog.wfd, data, data_len) < data_len)
+ break;
+ tdata = data;
+ data = old_data;
+ old_data = tdata;
+ tlen = data_len;
+ data_len = old_data_len;
+ old_data_len = tlen;
+ }
+ } while (1);
+
+ close(watchdog.rfd);
+ close(watchdog.wfd);
+
+ /* The only way out */
+ PyThread_release_lock(watchdog.running);
+}
+
+static void
+cancel_file_watchdog(void)
+{
+ /* Notify cancellation */
+ PyThread_release_lock(watchdog.cancel_event);
+
+ /* Wait for thread to join */
+ PyThread_acquire_lock(watchdog.running, 1);
+ PyThread_release_lock(watchdog.running);
+
+ /* The main thread should always hold the cancel_event lock */
+ PyThread_acquire_lock(watchdog.cancel_event, 1);
+}
+
+static PyObject*
+faulthandler_file_watchdog(PyObject *self,
+ PyObject *args, PyObject *kwargs)
+{
+ static char *kwlist[] = {"rfd", "wfd", "period", NULL};
+ double period;
+ PY_TIMEOUT_T period_us;
+ int rfd, wfd;
+
+ if (!PyArg_ParseTupleAndKeywords(args, kwargs,
+ "iid:_file_watchdog", kwlist,
+ &rfd, &wfd, &period))
+ return NULL;
+ if ((period * 1e6) >= (double) PY_TIMEOUT_MAX) {
+ PyErr_SetString(PyExc_OverflowError, "period value is too large");
+ return NULL;
+ }
+ period_us = (PY_TIMEOUT_T)(period * 1e6);
+ if (period_us <= 0) {
+ PyErr_SetString(PyExc_ValueError, "period must be greater than 0");
+ return NULL;
+ }
+
+ /* Cancel previous thread, if running */
+ cancel_file_watchdog();
+
+ watchdog.rfd = rfd;
+ watchdog.wfd = wfd;
+ watchdog.period_us = period_us;
+
+ /* Arm these locks to serve as events when released */
+ PyThread_acquire_lock(watchdog.running, 1);
+
+ if (PyThread_start_new_thread(file_watchdog, NULL) == -1) {
+ PyThread_release_lock(watchdog.running);
+ PyErr_SetString(PyExc_RuntimeError,
+ "unable to start file watchdog thread");
+ return NULL;
+ }
+
+ Py_RETURN_NONE;
+}
+
+static PyObject*
+faulthandler_cancel_file_watchdog(PyObject *self)
+{
+ cancel_file_watchdog();
+ Py_RETURN_NONE;
+}
+#endif /* FAULTHANDLER_WATCHDOG */
+
#ifdef FAULTHANDLER_USER
static int
faulthandler_register(int signum, int chain, _Py_sighandler_t *p_previous)
@@ -973,6 +1120,18 @@ static PyMethodDef module_methods[] = {
"to dump_tracebacks_later().")},
#endif
+#ifdef FAULTHANDLER_WATCHDOG
+ {"_file_watchdog",
+ (PyCFunction)faulthandler_file_watchdog, METH_VARARGS|METH_KEYWORDS,
+ PyDoc_STR("_file_watchdog(rfd, wfd, period):\n"
+ "feed the contents of 'rfd' to 'wfd', if changed,\n"
+ "every 'period' seconds.")},
+ {"_cancel_file_watchdog",
+ (PyCFunction)faulthandler_cancel_file_watchdog, METH_NOARGS,
+ PyDoc_STR("_cancel_file_watchdog():\ncancel the previous call "
+ "to _file_watchdog().")},
+#endif
+
#ifdef FAULTHANDLER_USER
{"register",
(PyCFunction)faulthandler_register_py, METH_VARARGS|METH_KEYWORDS,
@@ -1097,6 +1256,16 @@ int _PyFaulthandler_Init(void)
}
PyThread_acquire_lock(thread.cancel_event, 1);
#endif
+#ifdef FAULTHANDLER_WATCHDOG
+ watchdog.cancel_event = PyThread_allocate_lock();
+ watchdog.running = PyThread_allocate_lock();
+ if (!watchdog.cancel_event || !watchdog.running) {
+ PyErr_SetString(PyExc_RuntimeError,
+ "could not allocate locks for faulthandler");
+ return -1;
+ }
+ PyThread_acquire_lock(watchdog.cancel_event, 1);
+#endif
return faulthandler_env_options();
}
@@ -1121,6 +1290,20 @@ void _PyFaulthandler_Fini(void)
}
#endif
+#ifdef FAULTHANDLER_WATCHDOG
+ /* file watchdog */
+ cancel_file_watchdog();
+ if (watchdog.cancel_event) {
+ PyThread_release_lock(watchdog.cancel_event);
+ PyThread_free_lock(watchdog.cancel_event);
+ watchdog.cancel_event = NULL;
+ }
+ if (watchdog.running) {
+ PyThread_free_lock(watchdog.running);
+ watchdog.running = NULL;
+ }
+#endif
+
#ifdef FAULTHANDLER_USER
/* user */
if (user_signals != NULL) {