diff options
author | Antoine Pitrou <solipsis@pitrou.net> | 2011-10-04 09:51:23 (GMT) |
---|---|---|
committer | Antoine Pitrou <solipsis@pitrou.net> | 2011-10-04 09:51:23 (GMT) |
commit | 75e78b6c775dbb791530af4fe4871e0ddeb2d880 (patch) | |
tree | b17e40b5f7ab26c9766c2786582d09056780aefe | |
parent | 031487eb3b30ea940ce8ee732c0b1e965d3229c0 (diff) | |
download | cpython-75e78b6c775dbb791530af4fe4871e0ddeb2d880.zip cpython-75e78b6c775dbb791530af4fe4871e0ddeb2d880.tar.gz cpython-75e78b6c775dbb791530af4fe4871e0ddeb2d880.tar.bz2 |
Use the faulthandler module's infrastructure to write a GIL-less
memory watchdog for timely stats collection.
-rw-r--r-- | Lib/test/support.py | 109 | ||||
-rw-r--r-- | Modules/faulthandler.c | 183 |
2 files changed, 249 insertions, 43 deletions
diff --git a/Lib/test/support.py b/Lib/test/support.py index 62ad606..cce04aa 100644 --- a/Lib/test/support.py +++ b/Lib/test/support.py @@ -23,6 +23,7 @@ import time import sysconfig import fnmatch import logging.handlers +import struct try: import _thread, threading @@ -34,6 +35,10 @@ try: except ImportError: multiprocessing = None +try: + import faulthandler +except ImportError: + faulthandler = None try: import zlib @@ -1133,41 +1138,66 @@ def set_memlimit(limit): raise ValueError('Memory limit %r too low to be useful' % (limit,)) max_memuse = memlimit -def _memory_watchdog(start_evt, finish_evt, period=10.0): - """A function which periodically watches the process' memory consumption +class _MemoryWatchdog: + """An object which periodically watches the process' memory consumption and prints it out. """ - # XXX: because of the GIL, and because the very long operations tested - # in most bigmem tests are uninterruptible, the loop below gets woken up - # much less often than expected. - # The polling code should be rewritten in raw C, without holding the GIL, - # and push results onto an anonymous pipe. - try: - page_size = os.sysconf('SC_PAGESIZE') - except (ValueError, AttributeError): + + def __init__(self): + self.procfile = '/proc/{pid}/statm'.format(pid=os.getpid()) + self.started = False + self.thread = None try: - page_size = os.sysconf('SC_PAGE_SIZE') + self.page_size = os.sysconf('SC_PAGESIZE') except (ValueError, AttributeError): - page_size = 4096 - procfile = '/proc/{pid}/statm'.format(pid=os.getpid()) - try: - f = open(procfile, 'rb') - except IOError as e: - warnings.warn('/proc not available for stats: {}'.format(e), - RuntimeWarning) - sys.stderr.flush() - return - with f: - start_evt.set() - old_data = -1 - while not finish_evt.wait(period): - f.seek(0) - statm = f.read().decode('ascii') - data = int(statm.split()[5]) - if data != old_data: - old_data = data + try: + self.page_size = os.sysconf('SC_PAGE_SIZE') + except (ValueError, AttributeError): + self.page_size = 4096 + + def consumer(self, fd): + HEADER = "l" + header_size = struct.calcsize(HEADER) + try: + while True: + header = os.read(fd, header_size) + if len(header) < header_size: + # Pipe closed on other end + break + data_len, = struct.unpack(HEADER, header) + data = os.read(fd, data_len) + statm = data.decode('ascii') + data = int(statm.split()[5]) print(" ... process data size: {data:.1f}G" - .format(data=data * page_size / (1024 ** 3))) + .format(data=data * self.page_size / (1024 ** 3))) + finally: + os.close(fd) + + def start(self): + if not faulthandler or not hasattr(faulthandler, '_file_watchdog'): + return + try: + rfd = os.open(self.procfile, os.O_RDONLY) + except OSError as e: + warnings.warn('/proc not available for stats: {}'.format(e), + RuntimeWarning) + sys.stderr.flush() + return + pipe_fd, wfd = os.pipe() + # _file_watchdog() doesn't take the GIL in its child thread, and + # therefore collects statistics timely + faulthandler._file_watchdog(rfd, wfd, 3.0) + self.started = True + self.thread = threading.Thread(target=self.consumer, args=(pipe_fd,)) + self.thread.daemon = True + self.thread.start() + + def stop(self): + if not self.started: + return + faulthandler._cancel_file_watchdog() + self.thread.join() + def bigmemtest(size, memuse, dry_run=True): """Decorator for bigmem tests. @@ -1194,27 +1224,20 @@ def bigmemtest(size, memuse, dry_run=True): "not enough memory: %.1fG minimum needed" % (size * memuse / (1024 ** 3))) - if real_max_memuse and verbose and threading: + if real_max_memuse and verbose and faulthandler and threading: print() print(" ... expected peak memory use: {peak:.1f}G" .format(peak=size * memuse / (1024 ** 3))) - sys.stdout.flush() - start_evt = threading.Event() - finish_evt = threading.Event() - t = threading.Thread(target=_memory_watchdog, - args=(start_evt, finish_evt, 0.5)) - t.daemon = True - t.start() - start_evt.set() + watchdog = _MemoryWatchdog() + watchdog.start() else: - t = None + watchdog = None try: return f(self, maxsize) finally: - if t: - finish_evt.set() - t.join() + if watchdog: + watchdog.stop() wrapper.size = size wrapper.memuse = memuse diff --git a/Modules/faulthandler.c b/Modules/faulthandler.c index 8ee0630..3002840 100644 --- a/Modules/faulthandler.c +++ b/Modules/faulthandler.c @@ -13,6 +13,7 @@ #ifdef WITH_THREAD # define FAULTHANDLER_LATER +# define FAULTHANDLER_WATCHDOG #endif #ifndef MS_WINDOWS @@ -65,6 +66,20 @@ static struct { } thread; #endif +#ifdef FAULTHANDLER_WATCHDOG +static struct { + int rfd; + int wfd; + PY_TIMEOUT_T period_us; /* period in microseconds */ + /* The main thread always holds this lock. It is only released when + faulthandler_watchdog() is interrupted before this thread exits, or at + Python exit. */ + PyThread_type_lock cancel_event; + /* released by child thread when joined */ + PyThread_type_lock running; +} watchdog; +#endif + #ifdef FAULTHANDLER_USER typedef struct { int enabled; @@ -587,6 +602,138 @@ faulthandler_cancel_dump_tracebacks_later_py(PyObject *self) } #endif /* FAULTHANDLER_LATER */ +#ifdef FAULTHANDLER_WATCHDOG + +static void +file_watchdog(void *unused) +{ + PyLockStatus st; + PY_TIMEOUT_T timeout; + + const int MAXDATA = 1024; + char buf1[MAXDATA], buf2[MAXDATA]; + char *data = buf1, *old_data = buf2; + Py_ssize_t data_len, old_data_len = -1; + +#if defined(HAVE_PTHREAD_SIGMASK) && !defined(HAVE_BROKEN_PTHREAD_SIGMASK) + sigset_t set; + + /* we don't want to receive any signal */ + sigfillset(&set); + pthread_sigmask(SIG_SETMASK, &set, NULL); +#endif + + /* On first pass, feed file contents immediately */ + timeout = 0; + do { + st = PyThread_acquire_lock_timed(watchdog.cancel_event, + timeout, 0); + timeout = watchdog.period_us; + if (st == PY_LOCK_ACQUIRED) { + PyThread_release_lock(watchdog.cancel_event); + break; + } + /* Timeout => read and write data */ + assert(st == PY_LOCK_FAILURE); + + if (lseek(watchdog.rfd, 0, SEEK_SET) < 0) { + break; + } + data_len = read(watchdog.rfd, data, MAXDATA); + if (data_len < 0) { + break; + } + if (data_len != old_data_len || memcmp(data, old_data, data_len)) { + char *tdata; + Py_ssize_t tlen; + /* Contents changed, feed them to wfd */ + long x = (long) data_len; + /* We can't do anything if the consumer is too slow, just bail out */ + if (write(watchdog.wfd, (void *) &x, sizeof(x)) < sizeof(x)) + break; + if (write(watchdog.wfd, data, data_len) < data_len) + break; + tdata = data; + data = old_data; + old_data = tdata; + tlen = data_len; + data_len = old_data_len; + old_data_len = tlen; + } + } while (1); + + close(watchdog.rfd); + close(watchdog.wfd); + + /* The only way out */ + PyThread_release_lock(watchdog.running); +} + +static void +cancel_file_watchdog(void) +{ + /* Notify cancellation */ + PyThread_release_lock(watchdog.cancel_event); + + /* Wait for thread to join */ + PyThread_acquire_lock(watchdog.running, 1); + PyThread_release_lock(watchdog.running); + + /* The main thread should always hold the cancel_event lock */ + PyThread_acquire_lock(watchdog.cancel_event, 1); +} + +static PyObject* +faulthandler_file_watchdog(PyObject *self, + PyObject *args, PyObject *kwargs) +{ + static char *kwlist[] = {"rfd", "wfd", "period", NULL}; + double period; + PY_TIMEOUT_T period_us; + int rfd, wfd; + + if (!PyArg_ParseTupleAndKeywords(args, kwargs, + "iid:_file_watchdog", kwlist, + &rfd, &wfd, &period)) + return NULL; + if ((period * 1e6) >= (double) PY_TIMEOUT_MAX) { + PyErr_SetString(PyExc_OverflowError, "period value is too large"); + return NULL; + } + period_us = (PY_TIMEOUT_T)(period * 1e6); + if (period_us <= 0) { + PyErr_SetString(PyExc_ValueError, "period must be greater than 0"); + return NULL; + } + + /* Cancel previous thread, if running */ + cancel_file_watchdog(); + + watchdog.rfd = rfd; + watchdog.wfd = wfd; + watchdog.period_us = period_us; + + /* Arm these locks to serve as events when released */ + PyThread_acquire_lock(watchdog.running, 1); + + if (PyThread_start_new_thread(file_watchdog, NULL) == -1) { + PyThread_release_lock(watchdog.running); + PyErr_SetString(PyExc_RuntimeError, + "unable to start file watchdog thread"); + return NULL; + } + + Py_RETURN_NONE; +} + +static PyObject* +faulthandler_cancel_file_watchdog(PyObject *self) +{ + cancel_file_watchdog(); + Py_RETURN_NONE; +} +#endif /* FAULTHANDLER_WATCHDOG */ + #ifdef FAULTHANDLER_USER static int faulthandler_register(int signum, int chain, _Py_sighandler_t *p_previous) @@ -973,6 +1120,18 @@ static PyMethodDef module_methods[] = { "to dump_tracebacks_later().")}, #endif +#ifdef FAULTHANDLER_WATCHDOG + {"_file_watchdog", + (PyCFunction)faulthandler_file_watchdog, METH_VARARGS|METH_KEYWORDS, + PyDoc_STR("_file_watchdog(rfd, wfd, period):\n" + "feed the contents of 'rfd' to 'wfd', if changed,\n" + "every 'period seconds'.")}, + {"_cancel_file_watchdog", + (PyCFunction)faulthandler_cancel_file_watchdog, METH_NOARGS, + PyDoc_STR("_cancel_file_watchdog():\ncancel the previous call " + "to _file_watchdog().")}, +#endif + #ifdef FAULTHANDLER_USER {"register", (PyCFunction)faulthandler_register_py, METH_VARARGS|METH_KEYWORDS, @@ -1097,6 +1256,16 @@ int _PyFaulthandler_Init(void) } PyThread_acquire_lock(thread.cancel_event, 1); #endif +#ifdef FAULTHANDLER_WATCHDOG + watchdog.cancel_event = PyThread_allocate_lock(); + watchdog.running = PyThread_allocate_lock(); + if (!watchdog.cancel_event || !watchdog.running) { + PyErr_SetString(PyExc_RuntimeError, + "could not allocate locks for faulthandler"); + return -1; + } + PyThread_acquire_lock(watchdog.cancel_event, 1); +#endif return faulthandler_env_options(); } @@ -1121,6 +1290,20 @@ void _PyFaulthandler_Fini(void) } #endif +#ifdef FAULTHANDLER_WATCHDOG + /* file watchdog */ + cancel_file_watchdog(); + if (watchdog.cancel_event) { + PyThread_release_lock(watchdog.cancel_event); + PyThread_free_lock(watchdog.cancel_event); + watchdog.cancel_event = NULL; + } + if (watchdog.running) { + PyThread_free_lock(watchdog.running); + watchdog.running = NULL; + } +#endif + #ifdef FAULTHANDLER_USER /* user */ if (user_signals != NULL) { |