#!/usr/bin/env python # # Copyright 2001-2004 by Vinay Sajip. All Rights Reserved. # # Permission to use, copy, modify, and distribute this software and its # documentation for any purpose and without fee is hereby granted, # provided that the above copyright notice appear in all copies and that # both that copyright notice and this permission notice appear in # supporting documentation, and that the name of Vinay Sajip # not be used in advertising or publicity pertaining to distribution # of the software without specific, written prior permission. # VINAY SAJIP DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING # ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL # VINAY SAJIP BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR # ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER # IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT # OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. # # This file is part of the Python logging distribution. See # http://www.red-dove.com/python_logging.html # """Test harness for the logging module. Run all tests. Copyright (C) 2001-2002 Vinay Sajip. All Rights Reserved. """ import select import os, sys, struct, types, pickle, cStringIO import socket, tempfile, threading, time import logging, logging.handlers, logging.config from test.test_support import run_with_locale BANNER = "-- %-10s %-6s ---------------------------------------------------\n" FINISH_UP = "Finish up, it's closing time. Messages should bear numbers 0 through 24." #---------------------------------------------------------------------------- # Log receiver #---------------------------------------------------------------------------- TIMEOUT = 10 from SocketServer import ThreadingTCPServer, StreamRequestHandler class LogRecordStreamHandler(StreamRequestHandler): """ Handler for a streaming logging request. It basically logs the record using whatever logging policy is configured locally. """ def handle(self): """ Handle multiple requests - each expected to be a 4-byte length, followed by the LogRecord in pickle format. Logs the record according to whatever policy is configured locally. """ while 1: try: chunk = self.connection.recv(4) if len(chunk) < 4: break slen = struct.unpack(">L", chunk)[0] chunk = self.connection.recv(slen) while len(chunk) < slen: chunk = chunk + self.connection.recv(slen - len(chunk)) obj = self.unPickle(chunk) record = logging.makeLogRecord(obj) self.handleLogRecord(record) except: raise def unPickle(self, data): return pickle.loads(data) def handleLogRecord(self, record): logname = "logrecv.tcp." + record.name #If the end-of-messages sentinel is seen, tell the server to terminate if record.msg == FINISH_UP: self.server.abort = 1 record.msg = record.msg + " (via " + logname + ")" logger = logging.getLogger(logname) logger.handle(record) # The server sets socketDataProcessed when it's done. socketDataProcessed = threading.Event() class LogRecordSocketReceiver(ThreadingTCPServer): """ A simple-minded TCP socket-based logging receiver suitable for test purposes. """ allow_reuse_address = 1 def __init__(self, host='localhost', port=logging.handlers.DEFAULT_TCP_LOGGING_PORT, handler=LogRecordStreamHandler): ThreadingTCPServer.__init__(self, (host, port), handler) self.abort = 0 self.timeout = 1 def serve_until_stopped(self): while not self.abort: rd, wr, ex = select.select([self.socket.fileno()], [], [], self.timeout) if rd: self.handle_request() #notify the main thread that we're about to exit socketDataProcessed.set() # close the listen socket self.server_close() def process_request(self, request, client_address): #import threading t = threading.Thread(target = self.finish_request, args = (request, client_address)) t.start() def runTCP(tcpserver): tcpserver.serve_until_stopped() #---------------------------------------------------------------------------- # Test 0 #---------------------------------------------------------------------------- msgcount = 0 def nextmessage(): global msgcount rv = "Message %d" % msgcount msgcount = msgcount + 1 return rv def test0(): ERR = logging.getLogger("ERR") ERR.setLevel(logging.ERROR) INF = logging.getLogger("INF") INF.setLevel(logging.INFO) INF_ERR = logging.getLogger("INF.ERR") INF_ERR.setLevel(logging.ERROR) DEB = logging.getLogger("DEB") DEB.setLevel(logging.DEBUG) INF_UNDEF = logging.getLogger("INF.UNDEF") INF_ERR_UNDEF = logging.getLogger("INF.ERR.UNDEF") UNDEF = logging.getLogger("UNDEF") GRANDCHILD = logging.getLogger("INF.BADPARENT.UNDEF") CHILD = logging.getLogger("INF.BADPARENT") #These should log ERR.log(logging.FATAL, nextmessage()) ERR.error(nextmessage()) INF.log(logging.FATAL, nextmessage()) INF.error(nextmessage()) INF.warn(nextmessage()) INF.info(nextmessage()) INF_UNDEF.log(logging.FATAL, nextmessage()) INF_UNDEF.error(nextmessage()) INF_UNDEF.warn (nextmessage()) INF_UNDEF.info (nextmessage()) INF_ERR.log(logging.FATAL, nextmessage()) INF_ERR.error(nextmessage()) INF_ERR_UNDEF.log(logging.FATAL, nextmessage()) INF_ERR_UNDEF.error(nextmessage()) DEB.log(logging.FATAL, nextmessage()) DEB.error(nextmessage()) DEB.warn (nextmessage()) DEB.info (nextmessage()) DEB.debug(nextmessage()) UNDEF.log(logging.FATAL, nextmessage()) UNDEF.error(nextmessage()) UNDEF.warn (nextmessage()) UNDEF.info (nextmessage()) GRANDCHILD.log(logging.FATAL, nextmessage()) CHILD.log(logging.FATAL, nextmessage()) #These should not log ERR.warn(nextmessage()) ERR.info(nextmessage()) ERR.debug(nextmessage()) INF.debug(nextmessage()) INF_UNDEF.debug(nextmessage()) INF_ERR.warn(nextmessage()) INF_ERR.info(nextmessage()) INF_ERR.debug(nextmessage()) INF_ERR_UNDEF.warn(nextmessage()) INF_ERR_UNDEF.info(nextmessage()) INF_ERR_UNDEF.debug(nextmessage()) INF.info(FINISH_UP) #---------------------------------------------------------------------------- # Test 1 #---------------------------------------------------------------------------- # # First, we define our levels. There can be as many as you want - the only # limitations are that they should be integers, the lowest should be > 0 and # larger values mean less information being logged. If you need specific # level values which do not fit into these limitations, you can use a # mapping dictionary to convert between your application levels and the # logging system. # SILENT = 10 TACITURN = 9 TERSE = 8 EFFUSIVE = 7 SOCIABLE = 6 VERBOSE = 5 TALKATIVE = 4 GARRULOUS = 3 CHATTERBOX = 2 BORING = 1 LEVEL_RANGE = range(BORING, SILENT + 1) # # Next, we define names for our levels. You don't need to do this - in which # case the system will use "Level n" to denote the text for the level. # my_logging_levels = { SILENT : 'Silent', TACITURN : 'Taciturn', TERSE : 'Terse', EFFUSIVE : 'Effusive', SOCIABLE : 'Sociable', VERBOSE : 'Verbose', TALKATIVE : 'Talkative', GARRULOUS : 'Garrulous', CHATTERBOX : 'Chatterbox', BORING : 'Boring', } # # Now, to demonstrate filtering: suppose for some perverse reason we only # want to print out all except GARRULOUS messages. Let's create a filter for # this purpose... # class SpecificLevelFilter(logging.Filter): def __init__(self, lvl): self.level = lvl def filter(self, record): return self.level != record.levelno class GarrulousFilter(SpecificLevelFilter): def __init__(self): SpecificLevelFilter.__init__(self, GARRULOUS) # # Now, let's demonstrate filtering at the logger. This time, use a filter # which excludes SOCIABLE and TACITURN messages. Note that GARRULOUS events # are still excluded. # class VerySpecificFilter(logging.Filter): def filter(self, record): return record.levelno not in [SOCIABLE, TACITURN] def message(s): sys.stdout.write("%s\n" % s) SHOULD1 = "This should only be seen at the '%s' logging level (or lower)" def test1(): # # Now, tell the logging system to associate names with our levels. # for lvl in my_logging_levels.keys(): logging.addLevelName(lvl, my_logging_levels[lvl]) # # Now, define a test function which logs an event at each of our levels. # def doLog(log): for lvl in LEVEL_RANGE: log.log(lvl, SHOULD1, logging.getLevelName(lvl)) log = logging.getLogger("") hdlr = log.handlers[0] # # Set the logging level to each different value and call the utility # function to log events. # In the output, you should see that each time round the loop, the number of # logging events which are actually output decreases. # for lvl in LEVEL_RANGE: message("-- setting logging level to '%s' -----" % logging.getLevelName(lvl)) log.setLevel(lvl) doLog(log) # # Now, we demonstrate level filtering at the handler level. Tell the # handler defined above to filter at level 'SOCIABLE', and repeat the # above loop. Compare the output from the two runs. # hdlr.setLevel(SOCIABLE) message("-- Filtering at handler level to SOCIABLE --") for lvl in LEVEL_RANGE: message("-- setting logging level to '%s' -----" % logging.getLevelName(lvl)) log.setLevel(lvl) doLog(log) hdlr.setLevel(0) #turn off level filtering at the handler garr = GarrulousFilter() hdlr.addFilter(garr) message("-- Filtering using GARRULOUS filter --") for lvl in LEVEL_RANGE: message("-- setting logging level to '%s' -----" % logging.getLevelName(lvl)) log.setLevel(lvl) doLog(log) spec = VerySpecificFilter() log.addFilter(spec) message("-- Filtering using specific filter for SOCIABLE, TACITURN --") for lvl in LEVEL_RANGE: message("-- setting logging level to '%s' -----" % logging.getLevelName(lvl)) log.setLevel(lvl) doLog(log) log.removeFilter(spec) hdlr.removeFilter(garr) #Undo the one level which clashes...for regression tests logging.addLevelName(logging.DEBUG, "DEBUG") #---------------------------------------------------------------------------- # Test 2 #---------------------------------------------------------------------------- MSG = "-- logging %d at INFO, messages should be seen every 10 events --" def test2(): logger = logging.getLogger("") sh = logger.handlers[0] sh.close() logger.removeHandler(sh) mh = logging.handlers.MemoryHandler(10,logging.WARNING, sh) logger.setLevel(logging.DEBUG) logger.addHandler(mh) message("-- logging at DEBUG, nothing should be seen yet --") logger.debug("Debug message") message("-- logging at INFO, nothing should be seen yet --") logger.info("Info message") message("-- logging at WARNING, 3 messages should be seen --") logger.warn("Warn message") for i in xrange(102): message(MSG % i) logger.info("Info index = %d", i) mh.close() logger.removeHandler(mh) logger.addHandler(sh) #---------------------------------------------------------------------------- # Test 3 #---------------------------------------------------------------------------- FILTER = "a.b" def doLog3(): logging.getLogger("a").info("Info 1") logging.getLogger("a.b").info("Info 2") logging.getLogger("a.c").info("Info 3") logging.getLogger("a.b.c").info("Info 4") logging.getLogger("a.b.c.d").info("Info 5") logging.getLogger("a.bb.c").info("Info 6") logging.getLogger("b").info("Info 7") logging.getLogger("b.a").info("Info 8") logging.getLogger("c.a.b").info("Info 9") logging.getLogger("a.bb").info("Info 10") def test3(): root = logging.getLogger() root.setLevel(logging.DEBUG) hand = root.handlers[0] message("Unfiltered...") doLog3() message("Filtered with '%s'..." % FILTER) filt = logging.Filter(FILTER) hand.addFilter(filt) doLog3() hand.removeFilter(filt) #---------------------------------------------------------------------------- # Test 4 #---------------------------------------------------------------------------- # config0 is a standard configuration. config0 = """ [loggers] keys=root [handlers] keys=hand1 [formatters] keys=form1 [logger_root] level=NOTSET handlers=hand1 [handler_hand1] class=StreamHandler level=NOTSET formatter=form1 args=(sys.stdout,) [formatter_form1] format=%(levelname)s:%(name)s:%(message)s datefmt= """ # config1 adds a little to the standard configuration. config1 = """ [loggers] keys=root,parser [handlers] keys=hand1 [formatters] keys=form1 [logger_root] level=NOTSET handlers=hand1 [logger_parser] level=DEBUG handlers=hand1 propagate=1 qualname=compiler.parser [handler_hand1] class=StreamHandler level=NOTSET formatter=form1 args=(sys.stdout,) [formatter_form1] format=%(levelname)s:%(name)s:%(message)s datefmt= """ # config2 has a subtle configuration error that should be reported config2 = config1.replace("sys.stdout", "sys.stbout") # config3 has a less subtle configuration error config3 = config1.replace("formatter=form1", "formatter=misspelled_name") def test4(): for i in range(4): conf = globals()['config%d' % i] sys.stdout.write('config%d: ' % i) loggerDict = logging.getLogger().manager.loggerDict logging._acquireLock() try: saved_handlers = logging._handlers.copy() saved_handler_list = logging._handlerList[:] saved_loggers = loggerDict.copy() finally: logging._releaseLock() try: fn = tempfile.mktemp(".ini") f = open(fn, "w") f.write(conf) f.close() try: logging.config.fileConfig(fn) #call again to make sure cleanup is correct logging.config.fileConfig(fn) except: t = sys.exc_info()[0] message(str(t)) else: message('ok.') os.remove(fn) finally: logging._acquireLock() try: logging._handlers.clear() logging._handlers.update(saved_handlers) logging._handlerList[:] = saved_handler_list loggerDict = logging.getLogger().manager.loggerDict loggerDict.clear() loggerDict.update(saved_loggers) finally: logging._releaseLock() #---------------------------------------------------------------------------- # Test 5 #---------------------------------------------------------------------------- test5_config = """ [loggers] keys=root [handlers] keys=hand1 [formatters] keys=form1 [logger_root] level=NOTSET handlers=hand1 [handler_hand1] class=StreamHandler level=NOTSET formatter=form1 args=(sys.stdout,) [formatter_form1] class=test.test_logging.FriendlyFormatter format=%(levelname)s:%(name)s:%(message)s datefmt= """ class FriendlyFormatter (logging.Formatter): def formatException(self, ei): return "%s... Don't panic!" % str(ei[0]) def test5(): loggerDict = logging.getLogger().manager.loggerDict logging._acquireLock() try: saved_handlers = logging._handlers.copy() saved_handler_list = logging._handlerList[:] saved_loggers = loggerDict.copy() finally: logging._releaseLock() try: fn = tempfile.mktemp(".ini") f = open(fn, "w") f.write(test5_config) f.close() logging.config.fileConfig(fn) try: raise KeyError except KeyError: logging.exception("just testing") os.remove(fn) hdlr = logging.getLogger().handlers[0] logging.getLogger().handlers.remove(hdlr) finally: logging._acquireLock() try: logging._handlers.clear() logging._handlers.update(saved_handlers) logging._handlerList[:] = saved_handler_list loggerDict = logging.getLogger().manager.loggerDict loggerDict.clear() loggerDict.update(saved_loggers) finally: logging._releaseLock() #---------------------------------------------------------------------------- # Test Harness #---------------------------------------------------------------------------- def banner(nm, typ): sep = BANNER % (nm, typ) sys.stdout.write(sep) sys.stdout.flush() def test_main_inner(): rootLogger = logging.getLogger("") rootLogger.setLevel(logging.DEBUG) hdlr = logging.StreamHandler(sys.stdout) fmt = logging.Formatter(logging.BASIC_FORMAT) hdlr.setFormatter(fmt) rootLogger.addHandler(hdlr) # Find an unused port number port = logging.handlers.DEFAULT_TCP_LOGGING_PORT while port < logging.handlers.DEFAULT_TCP_LOGGING_PORT+100: try: tcpserver = LogRecordSocketReceiver(port=port) except socket.error: port += 1 else: break else: raise ImportError, "Could not find unused port" #Set up a handler such that all events are sent via a socket to the log #receiver (logrecv). #The handler will only be added to the rootLogger for some of the tests shdlr = logging.handlers.SocketHandler('localhost', port) #Configure the logger for logrecv so events do not propagate beyond it. #The sockLogger output is buffered in memory until the end of the test, #and printed at the end. sockOut = cStringIO.StringIO() sockLogger = logging.getLogger("logrecv") sockLogger.setLevel(logging.DEBUG) sockhdlr = logging.StreamHandler(sockOut) sockhdlr.setFormatter(logging.Formatter( "%(name)s -> %(levelname)s: %(message)s")) sockLogger.addHandler(sockhdlr) sockLogger.propagate = 0 #Set up servers threads = [] #sys.stdout.write("About to start TCP server...\n") threads.append(threading.Thread(target=runTCP, args=(tcpserver,))) for thread in threads: thread.start() try: banner("log_test0", "begin") rootLogger.addHandler(shdlr) test0() # XXX(nnorwitz): Try to fix timing related test failures. # This sleep gives us some extra time to read messages. # The test generally only fails on Solaris without this sleep. time.sleep(2.0) shdlr.close() rootLogger.removeHandler(shdlr) banner("log_test0", "end") for t in range(1,6): banner("log_test%d" % t, "begin") globals()['test%d' % t]() banner("log_test%d" % t, "end") finally: #wait for TCP receiver to terminate socketDataProcessed.wait() # ensure the server dies tcpserver.abort = 1 for thread in threads: thread.join(2.0) banner("logrecv output", "begin") sys.stdout.write(sockOut.getvalue()) sockOut.close() sockLogger.removeHandler(sockhdlr) sockhdlr.close() banner("logrecv output", "end") sys.stdout.flush() try: hdlr.close() except: pass rootLogger.removeHandler(hdlr) # Set the locale to the platform-dependent default. I have no idea # why the test does this, but in any case we save the current locale # first and restore it at the end. @run_with_locale('LC_ALL', '') def test_main(): # Save and restore the original root logger level across the tests. # Otherwise, e.g., if any test using cookielib runs after test_logging, # cookielib's debug-level logger tries to log messages, leading to # confusing: # No handlers could be found for logger "cookielib" # output while the tests are running. root_logger = logging.getLogger("") original_logging_level = root_logger.getEffectiveLevel() try: test_main_inner() finally: root_logger.setLevel(original_logging_level) if __name__ == "__main__": sys.stdout.write("test_logging\n") test_main()