diff options
Diffstat (limited to 'Lib/test/test_logging.py')
-rw-r--r-- | Lib/test/test_logging.py | 459 |
1 files changed, 459 insertions, 0 deletions
diff --git a/Lib/test/test_logging.py b/Lib/test/test_logging.py new file mode 100644 index 0000000..360d3ac --- /dev/null +++ b/Lib/test/test_logging.py @@ -0,0 +1,459 @@ +#!/usr/bin/env python +# +# Copyright 2001-2002 by Vinay Sajip. All Rights Reserved. +# +# Permission to use, copy, modify, and distribute this software and its +# documentation for any purpose and without fee is hereby granted, +# provided that the above copyright notice appear in all copies and that +# both that copyright notice and this permission notice appear in +# supporting documentation, and that the name of Vinay Sajip +# not be used in advertising or publicity pertaining to distribution +# of the software without specific, written prior permission. +# VINAY SAJIP DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING +# ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL +# VINAY SAJIP BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR +# ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER +# IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT +# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +# +# This file is part of the Python logging distribution. See +# http://www.red-dove.com/python_logging.html +# +"""Test harness for the logging module. Run all tests. + +Copyright (C) 2001-2002 Vinay Sajip. All Rights Reserved. +""" + +from select import select +import os, sys, string, struct, types, cPickle, cStringIO +import socket, threading, time, locale +import logging, logging.handlers, logging.config + +locale.setlocale(locale.LC_ALL, '') + +BANNER = "-- %-10s %-6s ---------------------------------------------------\n" + +#---------------------------------------------------------------------------- +# Log receiver +#---------------------------------------------------------------------------- + +TIMEOUT = 10 + +from SocketServer import ThreadingTCPServer, StreamRequestHandler + +class LogRecordStreamHandler(StreamRequestHandler): + """ + Handler for a streaming logging request. It basically logs the record + using whatever logging policy is configured locally. + """ + + def handle(self): + """ + Handle multiple requests - each expected to be a 4-byte length, + followed by the LogRecord in pickle format. Logs the record + according to whatever policy is configured locally. + """ + while 1: + try: + chunk = self.connection.recv(4) + if len(chunk) < 4: + break + slen = struct.unpack(">L", chunk)[0] + #print slen + chunk = self.connection.recv(slen) + while len(chunk) < slen: + chunk = chunk + self.connection.recv(slen - len(chunk)) + obj = self.unPickle(chunk) + record = logging.LogRecord(None, None, "", 0, "", (), None) + record.__dict__.update(obj) + self.handleLogRecord(record) + except: + raise + + def unPickle(self, data): + return cPickle.loads(data) + + def handleLogRecord(self, record): + logname = "logrecv.tcp." + record.name + record.msg = record.msg + " (via " + logname + ")" + logger = logging.getLogger(logname) + logger.handle(record) + +class LogRecordSocketReceiver(ThreadingTCPServer): + """ + A simple-minded TCP socket-based logging receiver suitable for test + purposes. + """ + + allow_reuse_address = 1 + + def __init__(self, host='localhost', + port=logging.handlers.DEFAULT_TCP_LOGGING_PORT, + handler=LogRecordStreamHandler): + ThreadingTCPServer.__init__(self, (host, port), handler) + self.abort = 0 + self.timeout = 1 + + def serve_until_stopped(self): + abort = 0 + while not abort: + rd, wr, ex = select([self.socket.fileno()], + [], [], + self.timeout) + if rd: + self.handle_request() + abort = self.abort + +def runTCP(tcpserver): + tcpserver.serve_until_stopped() + +#---------------------------------------------------------------------------- +# Test 0 +#---------------------------------------------------------------------------- + +msgcount = 0 + +def nextmessage(): + global msgcount + rv = "Message %d" % msgcount + msgcount = msgcount + 1 + return rv + +def test0(): + ERR = logging.getLogger("ERR") + ERR.setLevel(logging.ERROR) + INF = logging.getLogger("INF") + INF.setLevel(logging.INFO) + INF_ERR = logging.getLogger("INF.ERR") + INF_ERR.setLevel(logging.ERROR) + DEB = logging.getLogger("DEB") + DEB.setLevel(logging.DEBUG) + + INF_UNDEF = logging.getLogger("INF.UNDEF") + INF_ERR_UNDEF = logging.getLogger("INF.ERR.UNDEF") + UNDEF = logging.getLogger("UNDEF") + + GRANDCHILD = logging.getLogger("INF.BADPARENT.UNDEF") + CHILD = logging.getLogger("INF.BADPARENT") + + #These should log + ERR.log(logging.FATAL, nextmessage()) + ERR.error(nextmessage()) + + INF.log(logging.FATAL, nextmessage()) + INF.error(nextmessage()) + INF.warn(nextmessage()) + INF.info(nextmessage()) + + INF_UNDEF.log(logging.FATAL, nextmessage()) + INF_UNDEF.error(nextmessage()) + INF_UNDEF.warn (nextmessage()) + INF_UNDEF.info (nextmessage()) + + INF_ERR.log(logging.FATAL, nextmessage()) + INF_ERR.error(nextmessage()) + + INF_ERR_UNDEF.log(logging.FATAL, nextmessage()) + INF_ERR_UNDEF.error(nextmessage()) + + DEB.log(logging.FATAL, nextmessage()) + DEB.error(nextmessage()) + DEB.warn (nextmessage()) + DEB.info (nextmessage()) + DEB.debug(nextmessage()) + + UNDEF.log(logging.FATAL, nextmessage()) + UNDEF.error(nextmessage()) + UNDEF.warn (nextmessage()) + UNDEF.info (nextmessage()) + + GRANDCHILD.log(logging.FATAL, nextmessage()) + CHILD.log(logging.FATAL, nextmessage()) + + #These should not log + ERR.warn(nextmessage()) + ERR.info(nextmessage()) + ERR.debug(nextmessage()) + + INF.debug(nextmessage()) + INF_UNDEF.debug(nextmessage()) + + INF_ERR.warn(nextmessage()) + INF_ERR.info(nextmessage()) + INF_ERR.debug(nextmessage()) + INF_ERR_UNDEF.warn(nextmessage()) + INF_ERR_UNDEF.info(nextmessage()) + INF_ERR_UNDEF.debug(nextmessage()) + + INF.info("Messages should bear numbers 0 through 24.") + +#---------------------------------------------------------------------------- +# Test 1 +#---------------------------------------------------------------------------- + +# +# First, we define our levels. There can be as many as you want - the only +# limitations are that they should be integers, the lowest should be > 0 and +# larger values mean less information being logged. If you need specific +# level values which do not fit into these limitations, you can use a +# mapping dictionary to convert between your application levels and the +# logging system. +# +SILENT = 10 +TACITURN = 9 +TERSE = 8 +EFFUSIVE = 7 +SOCIABLE = 6 +VERBOSE = 5 +TALKATIVE = 4 +GARRULOUS = 3 +CHATTERBOX = 2 +BORING = 1 + +LEVEL_RANGE = range(BORING, SILENT + 1) + +# +# Next, we define names for our levels. You don't need to do this - in which +# case the system will use "Level n" to denote the text for the level. +# +my_logging_levels = { + SILENT : 'Silent', + TACITURN : 'Taciturn', + TERSE : 'Terse', + EFFUSIVE : 'Effusive', + SOCIABLE : 'Sociable', + VERBOSE : 'Verbose', + TALKATIVE : 'Talkative', + GARRULOUS : 'Garrulous', + CHATTERBOX : 'Chatterbox', + BORING : 'Boring', +} + +# +# Now, to demonstrate filtering: suppose for some perverse reason we only +# want to print out all except GARRULOUS messages. Let's create a filter for +# this purpose... +# +class SpecificLevelFilter(logging.Filter): + def __init__(self, lvl): + self.level = lvl + + def filter(self, record): + return self.level != record.levelno + +class GarrulousFilter(SpecificLevelFilter): + def __init__(self): + SpecificLevelFilter.__init__(self, GARRULOUS) + +# +# Now, let's demonstrate filtering at the logger. This time, use a filter +# which excludes SOCIABLE and TACITURN messages. Note that GARRULOUS events +# are still excluded. +# +class VerySpecificFilter(logging.Filter): + def filter(self, record): + return record.levelno not in [SOCIABLE, TACITURN] + +def message(s): + sys.stdout.write("%s\n" % s) + +SHOULD1 = "This should only be seen at the '%s' logging level (or lower)" + +def test1(): +# +# Now, tell the logging system to associate names with our levels. +# + for lvl in my_logging_levels.keys(): + logging.addLevelName(lvl, my_logging_levels[lvl]) + +# +# Now, define a test function which logs an event at each of our levels. +# + + def doLog(log): + for lvl in LEVEL_RANGE: + log.log(lvl, SHOULD1, logging.getLevelName(lvl)) + + log = logging.getLogger("") + hdlr = log.handlers[0] +# +# Set the logging level to each different value and call the utility +# function to log events. +# In the output, you should see that each time round the loop, the number of +# logging events which are actually output decreases. +# + for lvl in LEVEL_RANGE: + message("-- setting logging level to '%s' -----" % + logging.getLevelName(lvl)) + log.setLevel(lvl) + doLog(log) + # + # Now, we demonstrate level filtering at the handler level. Tell the + # handler defined above to filter at level 'SOCIABLE', and repeat the + # above loop. Compare the output from the two runs. + # + hdlr.setLevel(SOCIABLE) + message("-- Filtering at handler level to SOCIABLE --") + for lvl in LEVEL_RANGE: + message("-- setting logging level to '%s' -----" % + logging.getLevelName(lvl)) + log.setLevel(lvl) + doLog(log) + + hdlr.setLevel(0) #turn off level filtering at the handler + + garr = GarrulousFilter() + hdlr.addFilter(garr) + message("-- Filtering using GARRULOUS filter --") + for lvl in LEVEL_RANGE: + message("-- setting logging level to '%s' -----" % + logging.getLevelName(lvl)) + log.setLevel(lvl) + doLog(log) + spec = VerySpecificFilter() + log.addFilter(spec) + message("-- Filtering using specific filter for SOCIABLE, TACITURN --") + for lvl in LEVEL_RANGE: + message("-- setting logging level to '%s' -----" % + logging.getLevelName(lvl)) + log.setLevel(lvl) + doLog(log) + + log.removeFilter(spec) + hdlr.removeFilter(garr) + #Undo the one level which clashes...for regression tests + logging.addLevelName(logging.DEBUG, "DEBUG") + +#---------------------------------------------------------------------------- +# Test 2 +#---------------------------------------------------------------------------- + +MSG = "-- logging %d at INFO, messages should be seen every 10 events --" +def test2(): + logger = logging.getLogger("") + sh = logger.handlers[0] + logger.removeHandler(sh) + mh = logging.handlers.MemoryHandler(10,logging.WARN, sh) + logger.setLevel(logging.DEBUG) + logger.addHandler(mh) + message("-- logging at DEBUG, nothing should be seen yet --") + logger.debug("Debug message") + message("-- logging at INFO, nothing should be seen yet --") + logger.info("Info message") + message("-- logging at WARN, 3 messages should be seen --") + logger.warn("Warn message") + for i in xrange(102): + message(MSG % i) + logger.info("Info index = %d", i) + mh.close() + logger.removeHandler(mh) + logger.addHandler(sh) + +#---------------------------------------------------------------------------- +# Test 3 +#---------------------------------------------------------------------------- + +FILTER = "a.b" + +def doLog3(): + logging.getLogger("a").info("Info 1") + logging.getLogger("a.b").info("Info 2") + logging.getLogger("a.c").info("Info 3") + logging.getLogger("a.b.c").info("Info 4") + logging.getLogger("a.b.c.d").info("Info 5") + logging.getLogger("a.bb.c").info("Info 6") + logging.getLogger("b").info("Info 7") + logging.getLogger("b.a").info("Info 8") + logging.getLogger("c.a.b").info("Info 9") + logging.getLogger("a.bb").info("Info 10") + +def test3(): + root = logging.getLogger() + root.setLevel(logging.DEBUG) + hand = root.handlers[0] + message("Unfiltered...") + doLog3() + message("Filtered with '%s'..." % FILTER) + filt = logging.Filter(FILTER) + hand.addFilter(filt) + doLog3() + hand.removeFilter(filt) + +#---------------------------------------------------------------------------- +# Test Harness +#---------------------------------------------------------------------------- +def banner(nm, typ): + sep = BANNER % (nm, typ) + sys.stdout.write(sep) + sys.stdout.flush() + +def test_main(): + rootLogger = logging.getLogger("") + rootLogger.setLevel(logging.DEBUG) + hdlr = logging.StreamHandler(sys.stdout) + fmt = logging.Formatter(logging.BASIC_FORMAT) + hdlr.setFormatter(fmt) + rootLogger.addHandler(hdlr) + + #Set up a handler such that all events are sent via a socket to the log + #receiver (logrecv). + #The handler will only be added to the rootLogger for some of the tests + hdlr = logging.handlers.SocketHandler('localhost', + logging.handlers.DEFAULT_TCP_LOGGING_PORT) + + #Configure the logger for logrecv so events do not propagate beyond it. + #The sockLogger output is buffered in memory until the end of the test, + #and printed at the end. + sockOut = cStringIO.StringIO() + sockLogger = logging.getLogger("logrecv") + sockLogger.setLevel(logging.DEBUG) + sockhdlr = logging.StreamHandler(sockOut) + sockhdlr.setFormatter(logging.Formatter( + "%(name)s -> %(levelname)s: %(message)s")) + sockLogger.addHandler(sockhdlr) + sockLogger.propagate = 0 + + #Set up servers + threads = [] + tcpserver = LogRecordSocketReceiver() + sys.stdout.write("About to start TCP server...\n") + threads.append(threading.Thread(target=runTCP, args=(tcpserver,))) + + for thread in threads: + thread.start() + try: + banner("log_test0", "begin") + + rootLogger.addHandler(hdlr) + test0() + rootLogger.removeHandler(hdlr) + + banner("log_test0", "end") + + banner("log_test1", "begin") + test1() + banner("log_test1", "end") + + banner("log_test2", "begin") + test2() + banner("log_test2", "end") + + banner("log_test3", "begin") + test3() + banner("log_test3", "end") + + banner("logrecv output", "begin") + sys.stdout.write(sockOut.getvalue()) + sockOut.close() + banner("logrecv output", "end") + + finally: + #shut down server + tcpserver.abort = 1 + for thread in threads: + thread.join() + +if __name__ == "__main__": + sys.stdout.write("test_logging\n") + test_main() + sys.stdout.flush() |