summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_logging.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_logging.py')
-rw-r--r--Lib/test/test_logging.py1406
1 files changed, 825 insertions, 581 deletions
diff --git a/Lib/test/test_logging.py b/Lib/test/test_logging.py
index d96d703..27eeb27 100644
--- a/Lib/test/test_logging.py
+++ b/Lib/test/test_logging.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python
#
-# Copyright 2001-2004 by Vinay Sajip. All Rights Reserved.
+# Copyright 2001-2010 by Vinay Sajip. All Rights Reserved.
#
# Permission to use, copy, modify, and distribute this software and its
# documentation for any purpose and without fee is hereby granted,
@@ -15,193 +15,288 @@
# ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER
# IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
-#
-# This file is part of the Python logging distribution. See
-# http://www.red-dove.com/python_logging.html
-#
+
"""Test harness for the logging module. Run all tests.
-Copyright (C) 2001-2002 Vinay Sajip. All Rights Reserved.
+Copyright (C) 2001-2010 Vinay Sajip. All Rights Reserved.
"""
+import logging
+import logging.handlers
+import logging.config
+
+import codecs
+import copy
+import cPickle
+import cStringIO
+import gc
+import os
+import re
import select
-import os, sys, string, struct, types, cPickle, cStringIO
-import socket, tempfile, threading, time
-import logging, logging.handlers, logging.config
-from test.test_support import run_with_locale
-
-BANNER = "-- %-10s %-6s ---------------------------------------------------\n"
-
-FINISH_UP = "Finish up, it's closing time. Messages should bear numbers 0 through 24."
-
-#----------------------------------------------------------------------------
-# Log receiver
-#----------------------------------------------------------------------------
-
-TIMEOUT = 10
-
+import socket
from SocketServer import ThreadingTCPServer, StreamRequestHandler
+import string
+import struct
+import sys
+import tempfile
+from test.test_support import captured_stdout, run_with_locale, run_unittest
+import textwrap
+import threading
+import time
+import types
+import unittest
+import weakref
+
+
+class BaseTest(unittest.TestCase):
+
+ """Base class for logging tests."""
+
+ log_format = "%(name)s -> %(levelname)s: %(message)s"
+ expected_log_pat = r"^([\w.]+) -> ([\w]+): ([\d]+)$"
+ message_num = 0
+
+ def setUp(self):
+ """Setup the default logging stream to an internal StringIO instance,
+ so that we can examine log output as we want."""
+ logger_dict = logging.getLogger().manager.loggerDict
+ logging._acquireLock()
+ try:
+ self.saved_handlers = logging._handlers.copy()
+ self.saved_handler_list = logging._handlerList[:]
+ self.saved_loggers = logger_dict.copy()
+ self.saved_level_names = logging._levelNames.copy()
+ finally:
+ logging._releaseLock()
-class LogRecordStreamHandler(StreamRequestHandler):
- """
- Handler for a streaming logging request. It basically logs the record
- using whatever logging policy is configured locally.
- """
-
- def handle(self):
- """
- Handle multiple requests - each expected to be a 4-byte length,
- followed by the LogRecord in pickle format. Logs the record
- according to whatever policy is configured locally.
- """
- while 1:
- try:
- chunk = self.connection.recv(4)
- if len(chunk) < 4:
- break
- slen = struct.unpack(">L", chunk)[0]
- chunk = self.connection.recv(slen)
- while len(chunk) < slen:
- chunk = chunk + self.connection.recv(slen - len(chunk))
- obj = self.unPickle(chunk)
- record = logging.makeLogRecord(obj)
- self.handleLogRecord(record)
- except:
- raise
-
- def unPickle(self, data):
- return cPickle.loads(data)
-
- def handleLogRecord(self, record):
- logname = "logrecv.tcp." + record.name
- #If the end-of-messages sentinel is seen, tell the server to terminate
- if record.msg == FINISH_UP:
- self.server.abort = 1
- record.msg = record.msg + " (via " + logname + ")"
- logger = logging.getLogger(logname)
- logger.handle(record)
-
-# The server sets socketDataProcessed when it's done.
-socketDataProcessed = threading.Event()
-
-class LogRecordSocketReceiver(ThreadingTCPServer):
- """
- A simple-minded TCP socket-based logging receiver suitable for test
- purposes.
- """
-
- allow_reuse_address = 1
-
- def __init__(self, host='localhost',
- port=logging.handlers.DEFAULT_TCP_LOGGING_PORT,
- handler=LogRecordStreamHandler):
- ThreadingTCPServer.__init__(self, (host, port), handler)
- self.abort = 0
- self.timeout = 1
+ # Set two unused loggers: one non-ASCII and one Unicode.
+ # This is to test correct operation when sorting existing
+ # loggers in the configuration code. See issues 8201, 9310.
+ logging.getLogger("\xab\xd7\xbb")
+ logging.getLogger(u"\u013f\u00d6\u0047")
+
+ self.root_logger = logging.getLogger("")
+ self.original_logging_level = self.root_logger.getEffectiveLevel()
+
+ self.stream = cStringIO.StringIO()
+ self.root_logger.setLevel(logging.DEBUG)
+ self.root_hdlr = logging.StreamHandler(self.stream)
+ self.root_formatter = logging.Formatter(self.log_format)
+ self.root_hdlr.setFormatter(self.root_formatter)
+ self.root_logger.addHandler(self.root_hdlr)
+
+ def tearDown(self):
+ """Remove our logging stream, and restore the original logging
+ level."""
+ self.stream.close()
+ self.root_logger.removeHandler(self.root_hdlr)
+ self.root_logger.setLevel(self.original_logging_level)
+ logging._acquireLock()
+ try:
+ logging._levelNames.clear()
+ logging._levelNames.update(self.saved_level_names)
+ logging._handlers.clear()
+ logging._handlers.update(self.saved_handlers)
+ logging._handlerList[:] = self.saved_handler_list
+ loggerDict = logging.getLogger().manager.loggerDict
+ loggerDict.clear()
+ loggerDict.update(self.saved_loggers)
+ finally:
+ logging._releaseLock()
- def serve_until_stopped(self):
- while not self.abort:
- rd, wr, ex = select.select([self.socket.fileno()], [], [],
- self.timeout)
- if rd:
- self.handle_request()
- #notify the main thread that we're about to exit
- socketDataProcessed.set()
- # close the listen socket
- self.server_close()
+ def assert_log_lines(self, expected_values, stream=None):
+ """Match the collected log lines against the regular expression
+ self.expected_log_pat, and compare the extracted group values to
+ the expected_values list of tuples."""
+ stream = stream or self.stream
+ pat = re.compile(self.expected_log_pat)
+ try:
+ stream.reset()
+ actual_lines = stream.readlines()
+ except AttributeError:
+ # StringIO.StringIO lacks a reset() method.
+ actual_lines = stream.getvalue().splitlines()
+ self.assertEquals(len(actual_lines), len(expected_values))
+ for actual, expected in zip(actual_lines, expected_values):
+ match = pat.search(actual)
+ if not match:
+ self.fail("Log line does not match expected pattern:\n" +
+ actual)
+ self.assertEquals(tuple(match.groups()), expected)
+ s = stream.read()
+ if s:
+ self.fail("Remaining output at end of log stream:\n" + s)
+
+ def next_message(self):
+ """Generate a message consisting solely of an auto-incrementing
+ integer."""
+ self.message_num += 1
+ return "%d" % self.message_num
+
+
+class BuiltinLevelsTest(BaseTest):
+ """Test builtin levels and their inheritance."""
+
+ def test_flat(self):
+ #Logging levels in a flat logger namespace.
+ m = self.next_message
+
+ ERR = logging.getLogger("ERR")
+ ERR.setLevel(logging.ERROR)
+ INF = logging.getLogger("INF")
+ INF.setLevel(logging.INFO)
+ DEB = logging.getLogger("DEB")
+ DEB.setLevel(logging.DEBUG)
+
+ # These should log.
+ ERR.log(logging.CRITICAL, m())
+ ERR.error(m())
+
+ INF.log(logging.CRITICAL, m())
+ INF.error(m())
+ INF.warn(m())
+ INF.info(m())
+
+ DEB.log(logging.CRITICAL, m())
+ DEB.error(m())
+ DEB.warn (m())
+ DEB.info (m())
+ DEB.debug(m())
+
+ # These should not log.
+ ERR.warn(m())
+ ERR.info(m())
+ ERR.debug(m())
+
+ INF.debug(m())
+
+ self.assert_log_lines([
+ ('ERR', 'CRITICAL', '1'),
+ ('ERR', 'ERROR', '2'),
+ ('INF', 'CRITICAL', '3'),
+ ('INF', 'ERROR', '4'),
+ ('INF', 'WARNING', '5'),
+ ('INF', 'INFO', '6'),
+ ('DEB', 'CRITICAL', '7'),
+ ('DEB', 'ERROR', '8'),
+ ('DEB', 'WARNING', '9'),
+ ('DEB', 'INFO', '10'),
+ ('DEB', 'DEBUG', '11'),
+ ])
+
+ def test_nested_explicit(self):
+ # Logging levels in a nested namespace, all explicitly set.
+ m = self.next_message
+
+ INF = logging.getLogger("INF")
+ INF.setLevel(logging.INFO)
+ INF_ERR = logging.getLogger("INF.ERR")
+ INF_ERR.setLevel(logging.ERROR)
+
+ # These should log.
+ INF_ERR.log(logging.CRITICAL, m())
+ INF_ERR.error(m())
+
+ # These should not log.
+ INF_ERR.warn(m())
+ INF_ERR.info(m())
+ INF_ERR.debug(m())
+
+ self.assert_log_lines([
+ ('INF.ERR', 'CRITICAL', '1'),
+ ('INF.ERR', 'ERROR', '2'),
+ ])
+
+ def test_nested_inherited(self):
+ #Logging levels in a nested namespace, inherited from parent loggers.
+ m = self.next_message
+
+ INF = logging.getLogger("INF")
+ INF.setLevel(logging.INFO)
+ INF_ERR = logging.getLogger("INF.ERR")
+ INF_ERR.setLevel(logging.ERROR)
+ INF_UNDEF = logging.getLogger("INF.UNDEF")
+ INF_ERR_UNDEF = logging.getLogger("INF.ERR.UNDEF")
+ UNDEF = logging.getLogger("UNDEF")
+
+ # These should log.
+ INF_UNDEF.log(logging.CRITICAL, m())
+ INF_UNDEF.error(m())
+ INF_UNDEF.warn(m())
+ INF_UNDEF.info(m())
+ INF_ERR_UNDEF.log(logging.CRITICAL, m())
+ INF_ERR_UNDEF.error(m())
+
+ # These should not log.
+ INF_UNDEF.debug(m())
+ INF_ERR_UNDEF.warn(m())
+ INF_ERR_UNDEF.info(m())
+ INF_ERR_UNDEF.debug(m())
+
+ self.assert_log_lines([
+ ('INF.UNDEF', 'CRITICAL', '1'),
+ ('INF.UNDEF', 'ERROR', '2'),
+ ('INF.UNDEF', 'WARNING', '3'),
+ ('INF.UNDEF', 'INFO', '4'),
+ ('INF.ERR.UNDEF', 'CRITICAL', '5'),
+ ('INF.ERR.UNDEF', 'ERROR', '6'),
+ ])
+
+ def test_nested_with_virtual_parent(self):
+ # Logging levels when some parent does not exist yet.
+ m = self.next_message
+
+ INF = logging.getLogger("INF")
+ GRANDCHILD = logging.getLogger("INF.BADPARENT.UNDEF")
+ CHILD = logging.getLogger("INF.BADPARENT")
+ INF.setLevel(logging.INFO)
+
+ # These should log.
+ GRANDCHILD.log(logging.FATAL, m())
+ GRANDCHILD.info(m())
+ CHILD.log(logging.FATAL, m())
+ CHILD.info(m())
+
+ # These should not log.
+ GRANDCHILD.debug(m())
+ CHILD.debug(m())
+
+ self.assert_log_lines([
+ ('INF.BADPARENT.UNDEF', 'CRITICAL', '1'),
+ ('INF.BADPARENT.UNDEF', 'INFO', '2'),
+ ('INF.BADPARENT', 'CRITICAL', '3'),
+ ('INF.BADPARENT', 'INFO', '4'),
+ ])
+
+
+class BasicFilterTest(BaseTest):
+
+ """Test the bundled Filter class."""
+
+ def test_filter(self):
+ # Only messages satisfying the specified criteria pass through the
+ # filter.
+ filter_ = logging.Filter("spam.eggs")
+ handler = self.root_logger.handlers[0]
+ try:
+ handler.addFilter(filter_)
+ spam = logging.getLogger("spam")
+ spam_eggs = logging.getLogger("spam.eggs")
+ spam_eggs_fish = logging.getLogger("spam.eggs.fish")
+ spam_bakedbeans = logging.getLogger("spam.bakedbeans")
+
+ spam.info(self.next_message())
+ spam_eggs.info(self.next_message()) # Good.
+ spam_eggs_fish.info(self.next_message()) # Good.
+ spam_bakedbeans.info(self.next_message())
+
+ self.assert_log_lines([
+ ('spam.eggs', 'INFO', '2'),
+ ('spam.eggs.fish', 'INFO', '3'),
+ ])
+ finally:
+ handler.removeFilter(filter_)
- def process_request(self, request, client_address):
- #import threading
- t = threading.Thread(target = self.finish_request,
- args = (request, client_address))
- t.start()
-
-def runTCP(tcpserver):
- tcpserver.serve_until_stopped()
-
-#----------------------------------------------------------------------------
-# Test 0
-#----------------------------------------------------------------------------
-
-msgcount = 0
-
-def nextmessage():
- global msgcount
- rv = "Message %d" % msgcount
- msgcount = msgcount + 1
- return rv
-
-def test0():
- ERR = logging.getLogger("ERR")
- ERR.setLevel(logging.ERROR)
- INF = logging.getLogger("INF")
- INF.setLevel(logging.INFO)
- INF_ERR = logging.getLogger("INF.ERR")
- INF_ERR.setLevel(logging.ERROR)
- DEB = logging.getLogger("DEB")
- DEB.setLevel(logging.DEBUG)
-
- INF_UNDEF = logging.getLogger("INF.UNDEF")
- INF_ERR_UNDEF = logging.getLogger("INF.ERR.UNDEF")
- UNDEF = logging.getLogger("UNDEF")
-
- GRANDCHILD = logging.getLogger("INF.BADPARENT.UNDEF")
- CHILD = logging.getLogger("INF.BADPARENT")
-
- #These should log
- ERR.log(logging.FATAL, nextmessage())
- ERR.error(nextmessage())
-
- INF.log(logging.FATAL, nextmessage())
- INF.error(nextmessage())
- INF.warn(nextmessage())
- INF.info(nextmessage())
-
- INF_UNDEF.log(logging.FATAL, nextmessage())
- INF_UNDEF.error(nextmessage())
- INF_UNDEF.warn (nextmessage())
- INF_UNDEF.info (nextmessage())
-
- INF_ERR.log(logging.FATAL, nextmessage())
- INF_ERR.error(nextmessage())
-
- INF_ERR_UNDEF.log(logging.FATAL, nextmessage())
- INF_ERR_UNDEF.error(nextmessage())
-
- DEB.log(logging.FATAL, nextmessage())
- DEB.error(nextmessage())
- DEB.warn (nextmessage())
- DEB.info (nextmessage())
- DEB.debug(nextmessage())
-
- UNDEF.log(logging.FATAL, nextmessage())
- UNDEF.error(nextmessage())
- UNDEF.warn (nextmessage())
- UNDEF.info (nextmessage())
-
- GRANDCHILD.log(logging.FATAL, nextmessage())
- CHILD.log(logging.FATAL, nextmessage())
-
- #These should not log
- ERR.warn(nextmessage())
- ERR.info(nextmessage())
- ERR.debug(nextmessage())
-
- INF.debug(nextmessage())
- INF_UNDEF.debug(nextmessage())
-
- INF_ERR.warn(nextmessage())
- INF_ERR.info(nextmessage())
- INF_ERR.debug(nextmessage())
- INF_ERR_UNDEF.warn(nextmessage())
- INF_ERR_UNDEF.info(nextmessage())
- INF_ERR_UNDEF.debug(nextmessage())
-
- INF.info(FINISH_UP)
-
-#----------------------------------------------------------------------------
-# Test 1
-#----------------------------------------------------------------------------
#
# First, we define our levels. There can be as many as you want - the only
@@ -211,16 +306,16 @@ def test0():
# mapping dictionary to convert between your application levels and the
# logging system.
#
-SILENT = 10
-TACITURN = 9
-TERSE = 8
-EFFUSIVE = 7
-SOCIABLE = 6
-VERBOSE = 5
-TALKATIVE = 4
-GARRULOUS = 3
-CHATTERBOX = 2
-BORING = 1
+SILENT = 120
+TACITURN = 119
+TERSE = 118
+EFFUSIVE = 117
+SOCIABLE = 116
+VERBOSE = 115
+TALKATIVE = 114
+GARRULOUS = 113
+CHATTERBOX = 112
+BORING = 111
LEVEL_RANGE = range(BORING, SILENT + 1)
@@ -241,445 +336,594 @@ my_logging_levels = {
BORING : 'Boring',
}
-#
-# Now, to demonstrate filtering: suppose for some perverse reason we only
-# want to print out all except GARRULOUS messages. Let's create a filter for
-# this purpose...
-#
-class SpecificLevelFilter(logging.Filter):
- def __init__(self, lvl):
- self.level = lvl
+class GarrulousFilter(logging.Filter):
- def filter(self, record):
- return self.level != record.levelno
+ """A filter which blocks garrulous messages."""
-class GarrulousFilter(SpecificLevelFilter):
- def __init__(self):
- SpecificLevelFilter.__init__(self, GARRULOUS)
+ def filter(self, record):
+ return record.levelno != GARRULOUS
-#
-# Now, let's demonstrate filtering at the logger. This time, use a filter
-# which excludes SOCIABLE and TACITURN messages. Note that GARRULOUS events
-# are still excluded.
-#
class VerySpecificFilter(logging.Filter):
+
+ """A filter which blocks sociable and taciturn messages."""
+
def filter(self, record):
return record.levelno not in [SOCIABLE, TACITURN]
-def message(s):
- sys.stdout.write("%s\n" % s)
-SHOULD1 = "This should only be seen at the '%s' logging level (or lower)"
+class CustomLevelsAndFiltersTest(BaseTest):
-def test1():
-#
-# Now, tell the logging system to associate names with our levels.
-#
- for lvl in my_logging_levels.keys():
- logging.addLevelName(lvl, my_logging_levels[lvl])
+ """Test various filtering possibilities with custom logging levels."""
-#
-# Now, define a test function which logs an event at each of our levels.
-#
+ # Skip the logger name group.
+ expected_log_pat = r"^[\w.]+ -> ([\w]+): ([\d]+)$"
- def doLog(log):
+ def setUp(self):
+ BaseTest.setUp(self)
+ for k, v in my_logging_levels.items():
+ logging.addLevelName(k, v)
+
+ def log_at_all_levels(self, logger):
for lvl in LEVEL_RANGE:
- log.log(lvl, SHOULD1, logging.getLevelName(lvl))
+ logger.log(lvl, self.next_message())
+
+ def test_logger_filter(self):
+ # Filter at logger level.
+ self.root_logger.setLevel(VERBOSE)
+ # Levels >= 'Verbose' are good.
+ self.log_at_all_levels(self.root_logger)
+ self.assert_log_lines([
+ ('Verbose', '5'),
+ ('Sociable', '6'),
+ ('Effusive', '7'),
+ ('Terse', '8'),
+ ('Taciturn', '9'),
+ ('Silent', '10'),
+ ])
+
+ def test_handler_filter(self):
+ # Filter at handler level.
+ self.root_logger.handlers[0].setLevel(SOCIABLE)
+ try:
+ # Levels >= 'Sociable' are good.
+ self.log_at_all_levels(self.root_logger)
+ self.assert_log_lines([
+ ('Sociable', '6'),
+ ('Effusive', '7'),
+ ('Terse', '8'),
+ ('Taciturn', '9'),
+ ('Silent', '10'),
+ ])
+ finally:
+ self.root_logger.handlers[0].setLevel(logging.NOTSET)
+
+ def test_specific_filters(self):
+ # Set a specific filter object on the handler, and then add another
+ # filter object on the logger itself.
+ handler = self.root_logger.handlers[0]
+ specific_filter = None
+ garr = GarrulousFilter()
+ handler.addFilter(garr)
+ try:
+ self.log_at_all_levels(self.root_logger)
+ first_lines = [
+ # Notice how 'Garrulous' is missing
+ ('Boring', '1'),
+ ('Chatterbox', '2'),
+ ('Talkative', '4'),
+ ('Verbose', '5'),
+ ('Sociable', '6'),
+ ('Effusive', '7'),
+ ('Terse', '8'),
+ ('Taciturn', '9'),
+ ('Silent', '10'),
+ ]
+ self.assert_log_lines(first_lines)
+
+ specific_filter = VerySpecificFilter()
+ self.root_logger.addFilter(specific_filter)
+ self.log_at_all_levels(self.root_logger)
+ self.assert_log_lines(first_lines + [
+ # Not only 'Garrulous' is still missing, but also 'Sociable'
+ # and 'Taciturn'
+ ('Boring', '11'),
+ ('Chatterbox', '12'),
+ ('Talkative', '14'),
+ ('Verbose', '15'),
+ ('Effusive', '17'),
+ ('Terse', '18'),
+ ('Silent', '20'),
+ ])
+ finally:
+ if specific_filter:
+ self.root_logger.removeFilter(specific_filter)
+ handler.removeFilter(garr)
+
+
+class MemoryHandlerTest(BaseTest):
+
+ """Tests for the MemoryHandler."""
+
+ # Do not bother with a logger name group.
+ expected_log_pat = r"^[\w.]+ -> ([\w]+): ([\d]+)$"
+
+ def setUp(self):
+ BaseTest.setUp(self)
+ self.mem_hdlr = logging.handlers.MemoryHandler(10, logging.WARNING,
+ self.root_hdlr)
+ self.mem_logger = logging.getLogger('mem')
+ self.mem_logger.propagate = 0
+ self.mem_logger.addHandler(self.mem_hdlr)
+
+ def tearDown(self):
+ self.mem_hdlr.close()
+ BaseTest.tearDown(self)
+
+ def test_flush(self):
+ # The memory handler flushes to its target handler based on specific
+ # criteria (message count and message level).
+ self.mem_logger.debug(self.next_message())
+ self.assert_log_lines([])
+ self.mem_logger.info(self.next_message())
+ self.assert_log_lines([])
+ # This will flush because the level is >= logging.WARNING
+ self.mem_logger.warn(self.next_message())
+ lines = [
+ ('DEBUG', '1'),
+ ('INFO', '2'),
+ ('WARNING', '3'),
+ ]
+ self.assert_log_lines(lines)
+ for n in (4, 14):
+ for i in range(9):
+ self.mem_logger.debug(self.next_message())
+ self.assert_log_lines(lines)
+ # This will flush because it's the 10th message since the last
+ # flush.
+ self.mem_logger.debug(self.next_message())
+ lines = lines + [('DEBUG', str(i)) for i in range(n, n + 10)]
+ self.assert_log_lines(lines)
+
+ self.mem_logger.debug(self.next_message())
+ self.assert_log_lines(lines)
+
+
+class ExceptionFormatter(logging.Formatter):
+ """A special exception formatter."""
+ def formatException(self, ei):
+ return "Got a [%s]" % ei[0].__name__
- log = logging.getLogger("")
- hdlr = log.handlers[0]
-#
-# Set the logging level to each different value and call the utility
-# function to log events.
-# In the output, you should see that each time round the loop, the number of
-# logging events which are actually output decreases.
-#
- for lvl in LEVEL_RANGE:
- message("-- setting logging level to '%s' -----" %
- logging.getLevelName(lvl))
- log.setLevel(lvl)
- doLog(log)
- #
- # Now, we demonstrate level filtering at the handler level. Tell the
- # handler defined above to filter at level 'SOCIABLE', and repeat the
- # above loop. Compare the output from the two runs.
- #
- hdlr.setLevel(SOCIABLE)
- message("-- Filtering at handler level to SOCIABLE --")
- for lvl in LEVEL_RANGE:
- message("-- setting logging level to '%s' -----" %
- logging.getLevelName(lvl))
- log.setLevel(lvl)
- doLog(log)
-
- hdlr.setLevel(0) #turn off level filtering at the handler
-
- garr = GarrulousFilter()
- hdlr.addFilter(garr)
- message("-- Filtering using GARRULOUS filter --")
- for lvl in LEVEL_RANGE:
- message("-- setting logging level to '%s' -----" %
- logging.getLevelName(lvl))
- log.setLevel(lvl)
- doLog(log)
- spec = VerySpecificFilter()
- log.addFilter(spec)
- message("-- Filtering using specific filter for SOCIABLE, TACITURN --")
- for lvl in LEVEL_RANGE:
- message("-- setting logging level to '%s' -----" %
- logging.getLevelName(lvl))
- log.setLevel(lvl)
- doLog(log)
-
- log.removeFilter(spec)
- hdlr.removeFilter(garr)
- #Undo the one level which clashes...for regression tests
- logging.addLevelName(logging.DEBUG, "DEBUG")
-
-#----------------------------------------------------------------------------
-# Test 2
-#----------------------------------------------------------------------------
-
-MSG = "-- logging %d at INFO, messages should be seen every 10 events --"
-def test2():
- logger = logging.getLogger("")
- sh = logger.handlers[0]
- sh.close()
- logger.removeHandler(sh)
- mh = logging.handlers.MemoryHandler(10,logging.WARNING, sh)
- logger.setLevel(logging.DEBUG)
- logger.addHandler(mh)
- message("-- logging at DEBUG, nothing should be seen yet --")
- logger.debug("Debug message")
- message("-- logging at INFO, nothing should be seen yet --")
- logger.info("Info message")
- message("-- logging at WARNING, 3 messages should be seen --")
- logger.warn("Warn message")
- for i in xrange(102):
- message(MSG % i)
- logger.info("Info index = %d", i)
- mh.close()
- logger.removeHandler(mh)
- logger.addHandler(sh)
-
-#----------------------------------------------------------------------------
-# Test 3
-#----------------------------------------------------------------------------
-
-FILTER = "a.b"
-
-def doLog3():
- logging.getLogger("a").info("Info 1")
- logging.getLogger("a.b").info("Info 2")
- logging.getLogger("a.c").info("Info 3")
- logging.getLogger("a.b.c").info("Info 4")
- logging.getLogger("a.b.c.d").info("Info 5")
- logging.getLogger("a.bb.c").info("Info 6")
- logging.getLogger("b").info("Info 7")
- logging.getLogger("b.a").info("Info 8")
- logging.getLogger("c.a.b").info("Info 9")
- logging.getLogger("a.bb").info("Info 10")
-
-def test3():
- root = logging.getLogger()
- root.setLevel(logging.DEBUG)
- hand = root.handlers[0]
- message("Unfiltered...")
- doLog3()
- message("Filtered with '%s'..." % FILTER)
- filt = logging.Filter(FILTER)
- hand.addFilter(filt)
- doLog3()
- hand.removeFilter(filt)
-
-#----------------------------------------------------------------------------
-# Test 4
-#----------------------------------------------------------------------------
-
-# config0 is a standard configuration.
-config0 = """
-[loggers]
-keys=root
-
-[handlers]
-keys=hand1
-
-[formatters]
-keys=form1
-
-[logger_root]
-level=NOTSET
-handlers=hand1
-
-[handler_hand1]
-class=StreamHandler
-level=NOTSET
-formatter=form1
-args=(sys.stdout,)
-
-[formatter_form1]
-format=%(levelname)s:%(name)s:%(message)s
-datefmt=
-"""
-# config1 adds a little to the standard configuration.
-config1 = """
-[loggers]
-keys=root,parser
+class ConfigFileTest(BaseTest):
-[handlers]
-keys=hand1
+ """Reading logging config from a .ini-style config file."""
-[formatters]
-keys=form1
+ expected_log_pat = r"^([\w]+) \+\+ ([\w]+)$"
-[logger_root]
-level=NOTSET
-handlers=hand1
+ # config0 is a standard configuration.
+ config0 = """
+ [loggers]
+ keys=root
-[logger_parser]
-level=DEBUG
-handlers=hand1
-propagate=1
-qualname=compiler.parser
+ [handlers]
+ keys=hand1
-[handler_hand1]
-class=StreamHandler
-level=NOTSET
-formatter=form1
-args=(sys.stdout,)
+ [formatters]
+ keys=form1
-[formatter_form1]
-format=%(levelname)s:%(name)s:%(message)s
-datefmt=
-"""
+ [logger_root]
+ level=WARNING
+ handlers=hand1
+
+ [handler_hand1]
+ class=StreamHandler
+ level=NOTSET
+ formatter=form1
+ args=(sys.stdout,)
-# config2 has a subtle configuration error that should be reported
-config2 = string.replace(config1, "sys.stdout", "sys.stbout")
+ [formatter_form1]
+ format=%(levelname)s ++ %(message)s
+ datefmt=
+ """
-# config3 has a less subtle configuration error
-config3 = string.replace(
- config1, "formatter=form1", "formatter=misspelled_name")
+ # config1 adds a little to the standard configuration.
+ config1 = """
+ [loggers]
+ keys=root,parser
-def test4():
- for i in range(4):
- conf = globals()['config%d' % i]
- sys.stdout.write('config%d: ' % i)
- loggerDict = logging.getLogger().manager.loggerDict
- logging._acquireLock()
- try:
- saved_handlers = logging._handlers.copy()
- saved_handler_list = logging._handlerList[:]
- saved_loggers = loggerDict.copy()
- finally:
- logging._releaseLock()
+ [handlers]
+ keys=hand1
+
+ [formatters]
+ keys=form1
+
+ [logger_root]
+ level=WARNING
+ handlers=
+
+ [logger_parser]
+ level=DEBUG
+ handlers=hand1
+ propagate=1
+ qualname=compiler.parser
+
+ [handler_hand1]
+ class=StreamHandler
+ level=NOTSET
+ formatter=form1
+ args=(sys.stdout,)
+
+ [formatter_form1]
+ format=%(levelname)s ++ %(message)s
+ datefmt=
+ """
+
+ # config2 has a subtle configuration error that should be reported
+ config2 = config1.replace("sys.stdout", "sys.stbout")
+
+ # config3 has a less subtle configuration error
+ config3 = config1.replace("formatter=form1", "formatter=misspelled_name")
+
+ # config4 specifies a custom formatter class to be loaded
+ config4 = """
+ [loggers]
+ keys=root
+
+ [handlers]
+ keys=hand1
+
+ [formatters]
+ keys=form1
+
+ [logger_root]
+ level=NOTSET
+ handlers=hand1
+
+ [handler_hand1]
+ class=StreamHandler
+ level=NOTSET
+ formatter=form1
+ args=(sys.stdout,)
+
+ [formatter_form1]
+ class=""" + __name__ + """.ExceptionFormatter
+ format=%(levelname)s:%(name)s:%(message)s
+ datefmt=
+ """
+
+ # config5 specifies a custom handler class to be loaded
+ config5 = config1.replace('class=StreamHandler', 'class=logging.StreamHandler')
+
+ # config6 uses ', ' delimiters in the handlers and formatters sections
+ config6 = """
+ [loggers]
+ keys=root,parser
+
+ [handlers]
+ keys=hand1, hand2
+
+ [formatters]
+ keys=form1, form2
+
+ [logger_root]
+ level=WARNING
+ handlers=
+
+ [logger_parser]
+ level=DEBUG
+ handlers=hand1
+ propagate=1
+ qualname=compiler.parser
+
+ [handler_hand1]
+ class=StreamHandler
+ level=NOTSET
+ formatter=form1
+ args=(sys.stdout,)
+
+ [handler_hand2]
+ class=StreamHandler
+ level=NOTSET
+ formatter=form1
+ args=(sys.stderr,)
+
+ [formatter_form1]
+ format=%(levelname)s ++ %(message)s
+ datefmt=
+
+ [formatter_form2]
+ format=%(message)s
+ datefmt=
+ """
+
+ def apply_config(self, conf):
try:
fn = tempfile.mktemp(".ini")
f = open(fn, "w")
- f.write(conf)
+ f.write(textwrap.dedent(conf))
f.close()
- try:
- logging.config.fileConfig(fn)
- #call again to make sure cleanup is correct
- logging.config.fileConfig(fn)
- except:
- t = sys.exc_info()[0]
- message(str(t))
- else:
- message('ok.')
- os.remove(fn)
+ logging.config.fileConfig(fn)
finally:
- logging._acquireLock()
+ os.remove(fn)
+
+ def test_config0_ok(self):
+ # A simple config file which overrides the default settings.
+ with captured_stdout() as output:
+ self.apply_config(self.config0)
+ logger = logging.getLogger()
+ # Won't output anything
+ logger.info(self.next_message())
+ # Outputs a message
+ logger.error(self.next_message())
+ self.assert_log_lines([
+ ('ERROR', '2'),
+ ], stream=output)
+ # Original logger output is empty.
+ self.assert_log_lines([])
+
+ def test_config1_ok(self, config=config1):
+ # A config file defining a sub-parser as well.
+ with captured_stdout() as output:
+ self.apply_config(config)
+ logger = logging.getLogger("compiler.parser")
+ # Both will output a message
+ logger.info(self.next_message())
+ logger.error(self.next_message())
+ self.assert_log_lines([
+ ('INFO', '1'),
+ ('ERROR', '2'),
+ ], stream=output)
+ # Original logger output is empty.
+ self.assert_log_lines([])
+
+ def test_config2_failure(self):
+ # A simple config file which overrides the default settings.
+ self.assertRaises(StandardError, self.apply_config, self.config2)
+
+ def test_config3_failure(self):
+ # A simple config file which overrides the default settings.
+ self.assertRaises(StandardError, self.apply_config, self.config3)
+
+ def test_config4_ok(self):
+ # A config file specifying a custom formatter class.
+ with captured_stdout() as output:
+ self.apply_config(self.config4)
+ logger = logging.getLogger()
try:
- logging._handlers.clear()
- logging._handlers.update(saved_handlers)
- logging._handlerList[:] = saved_handler_list
- loggerDict = logging.getLogger().manager.loggerDict
- loggerDict.clear()
- loggerDict.update(saved_loggers)
- finally:
- logging._releaseLock()
+ raise RuntimeError()
+ except RuntimeError:
+ logging.exception("just testing")
+ sys.stdout.seek(0)
+ self.assertEquals(output.getvalue(),
+ "ERROR:root:just testing\nGot a [RuntimeError]\n")
+ # Original logger output is empty
+ self.assert_log_lines([])
-#----------------------------------------------------------------------------
-# Test 5
-#----------------------------------------------------------------------------
+ def test_config5_ok(self):
+ self.test_config1_ok(config=self.config5)
-test5_config = """
-[loggers]
-keys=root
+ def test_config6_ok(self):
+ self.test_config1_ok(config=self.config6)
-[handlers]
-keys=hand1
+class LogRecordStreamHandler(StreamRequestHandler):
-[formatters]
-keys=form1
+ """Handler for a streaming logging request. It saves the log message in the
+ TCP server's 'log_output' attribute."""
-[logger_root]
-level=NOTSET
-handlers=hand1
+ TCP_LOG_END = "!!!END!!!"
-[handler_hand1]
-class=StreamHandler
-level=NOTSET
-formatter=form1
-args=(sys.stdout,)
+ def handle(self):
+ """Handle multiple requests - each expected to be of 4-byte length,
+ followed by the LogRecord in pickle format. Logs the record
+ according to whatever policy is configured locally."""
+ while True:
+ chunk = self.connection.recv(4)
+ if len(chunk) < 4:
+ break
+ slen = struct.unpack(">L", chunk)[0]
+ chunk = self.connection.recv(slen)
+ while len(chunk) < slen:
+ chunk = chunk + self.connection.recv(slen - len(chunk))
+ obj = self.unpickle(chunk)
+ record = logging.makeLogRecord(obj)
+ self.handle_log_record(record)
+
+ def unpickle(self, data):
+ return cPickle.loads(data)
-[formatter_form1]
-class=test.test_logging.FriendlyFormatter
-format=%(levelname)s:%(name)s:%(message)s
-datefmt=
-"""
+ def handle_log_record(self, record):
+ # If the end-of-messages sentinel is seen, tell the server to
+ # terminate.
+ if self.TCP_LOG_END in record.msg:
+ self.server.abort = 1
+ return
+ self.server.log_output += record.msg + "\n"
-class FriendlyFormatter (logging.Formatter):
- def formatException(self, ei):
- return "%s... Don't panic!" % str(ei[0])
-
-
-def test5():
- loggerDict = logging.getLogger().manager.loggerDict
- logging._acquireLock()
- try:
- saved_handlers = logging._handlers.copy()
- saved_handler_list = logging._handlerList[:]
- saved_loggers = loggerDict.copy()
- finally:
- logging._releaseLock()
- try:
- fn = tempfile.mktemp(".ini")
- f = open(fn, "w")
- f.write(test5_config)
- f.close()
- logging.config.fileConfig(fn)
- try:
- raise KeyError
- except KeyError:
- logging.exception("just testing")
- os.remove(fn)
- hdlr = logging.getLogger().handlers[0]
- logging.getLogger().handlers.remove(hdlr)
- finally:
- logging._acquireLock()
- try:
- logging._handlers.clear()
- logging._handlers.update(saved_handlers)
- logging._handlerList[:] = saved_handler_list
- loggerDict = logging.getLogger().manager.loggerDict
- loggerDict.clear()
- loggerDict.update(saved_loggers)
- finally:
- logging._releaseLock()
+class LogRecordSocketReceiver(ThreadingTCPServer):
+
+ """A simple-minded TCP socket-based logging receiver suitable for test
+ purposes."""
-#----------------------------------------------------------------------------
-# Test Harness
-#----------------------------------------------------------------------------
-def banner(nm, typ):
- sep = BANNER % (nm, typ)
- sys.stdout.write(sep)
- sys.stdout.flush()
-
-def test_main_inner():
- rootLogger = logging.getLogger("")
- rootLogger.setLevel(logging.DEBUG)
- hdlr = logging.StreamHandler(sys.stdout)
- fmt = logging.Formatter(logging.BASIC_FORMAT)
- hdlr.setFormatter(fmt)
- rootLogger.addHandler(hdlr)
-
- # Find an unused port number
- port = logging.handlers.DEFAULT_TCP_LOGGING_PORT
- while port < logging.handlers.DEFAULT_TCP_LOGGING_PORT+100:
+ allow_reuse_address = 1
+ log_output = ""
+
+ def __init__(self, host='localhost',
+ port=logging.handlers.DEFAULT_TCP_LOGGING_PORT,
+ handler=LogRecordStreamHandler):
+ ThreadingTCPServer.__init__(self, (host, port), handler)
+ self.abort = False
+ self.timeout = 0.1
+ self.finished = threading.Event()
+
+ def serve_until_stopped(self):
+ while not self.abort:
+ rd, wr, ex = select.select([self.socket.fileno()], [], [],
+ self.timeout)
+ if rd:
+ self.handle_request()
+ # Notify the main thread that we're about to exit
+ self.finished.set()
+ # close the listen socket
+ self.server_close()
+
+
+class SocketHandlerTest(BaseTest):
+
+ """Test for SocketHandler objects."""
+
+ def setUp(self):
+ """Set up a TCP server to receive log messages, and a SocketHandler
+ pointing to that server's address and port."""
+ BaseTest.setUp(self)
+ self.tcpserver = LogRecordSocketReceiver(port=0)
+ self.port = self.tcpserver.socket.getsockname()[1]
+ self.threads = [
+ threading.Thread(target=self.tcpserver.serve_until_stopped)]
+ for thread in self.threads:
+ thread.start()
+
+ self.sock_hdlr = logging.handlers.SocketHandler('localhost', self.port)
+ self.sock_hdlr.setFormatter(self.root_formatter)
+ self.root_logger.removeHandler(self.root_logger.handlers[0])
+ self.root_logger.addHandler(self.sock_hdlr)
+
+ def tearDown(self):
+ """Shutdown the TCP server."""
try:
- tcpserver = LogRecordSocketReceiver(port=port)
- except socket.error:
- port += 1
- else:
- break
- else:
- raise ImportError, "Could not find unused port"
-
-
- #Set up a handler such that all events are sent via a socket to the log
- #receiver (logrecv).
- #The handler will only be added to the rootLogger for some of the tests
- shdlr = logging.handlers.SocketHandler('localhost', port)
-
- #Configure the logger for logrecv so events do not propagate beyond it.
- #The sockLogger output is buffered in memory until the end of the test,
- #and printed at the end.
- sockOut = cStringIO.StringIO()
- sockLogger = logging.getLogger("logrecv")
- sockLogger.setLevel(logging.DEBUG)
- sockhdlr = logging.StreamHandler(sockOut)
- sockhdlr.setFormatter(logging.Formatter(
- "%(name)s -> %(levelname)s: %(message)s"))
- sockLogger.addHandler(sockhdlr)
- sockLogger.propagate = 0
-
- #Set up servers
- threads = []
- #sys.stdout.write("About to start TCP server...\n")
- threads.append(threading.Thread(target=runTCP, args=(tcpserver,)))
-
- for thread in threads:
- thread.start()
- try:
- banner("log_test0", "begin")
-
- rootLogger.addHandler(shdlr)
- test0()
- # XXX(nnorwitz): Try to fix timing related test failures.
- # This sleep gives us some extra time to read messages.
- # The test generally only fails on Solaris without this sleep.
- time.sleep(2.0)
- shdlr.close()
- rootLogger.removeHandler(shdlr)
-
- banner("log_test0", "end")
-
- for t in range(1,6):
- banner("log_test%d" % t, "begin")
- globals()['test%d' % t]()
- banner("log_test%d" % t, "end")
-
- finally:
- #wait for TCP receiver to terminate
- socketDataProcessed.wait()
- # ensure the server dies
- tcpserver.abort = 1
- for thread in threads:
- thread.join(2.0)
- banner("logrecv output", "begin")
- sys.stdout.write(sockOut.getvalue())
- sockOut.close()
- sockLogger.removeHandler(sockhdlr)
- sockhdlr.close()
- banner("logrecv output", "end")
- sys.stdout.flush()
+ self.tcpserver.abort = True
+ del self.tcpserver
+ self.root_logger.removeHandler(self.sock_hdlr)
+ self.sock_hdlr.close()
+ for thread in self.threads:
+ thread.join(2.0)
+ finally:
+ BaseTest.tearDown(self)
+
+ def get_output(self):
+ """Get the log output as received by the TCP server."""
+ # Signal the TCP receiver and wait for it to terminate.
+ self.root_logger.critical(LogRecordStreamHandler.TCP_LOG_END)
+ self.tcpserver.finished.wait(2.0)
+ return self.tcpserver.log_output
+
+ def test_output(self):
+ # The log message sent to the SocketHandler is properly received.
+ logger = logging.getLogger("tcp")
+ logger.error("spam")
+ logger.debug("eggs")
+ self.assertEquals(self.get_output(), "spam\neggs\n")
+
+
+class MemoryTest(BaseTest):
+
+ """Test memory persistence of logger objects."""
+
+ def setUp(self):
+ """Create a dict to remember potentially destroyed objects."""
+ BaseTest.setUp(self)
+ self._survivors = {}
+
+ def _watch_for_survival(self, *args):
+ """Watch the given objects for survival, by creating weakrefs to
+ them."""
+ for obj in args:
+ key = id(obj), repr(obj)
+ self._survivors[key] = weakref.ref(obj)
+
+ def _assert_survival(self):
+ """Assert that all objects watched for survival have survived."""
+ # Trigger cycle breaking.
+ gc.collect()
+ dead = []
+ for (id_, repr_), ref in self._survivors.items():
+ if ref() is None:
+ dead.append(repr_)
+ if dead:
+ self.fail("%d objects should have survived "
+ "but have been destroyed: %s" % (len(dead), ", ".join(dead)))
+
+ def test_persistent_loggers(self):
+ # Logger objects are persistent and retain their configuration, even
+ # if visible references are destroyed.
+ self.root_logger.setLevel(logging.INFO)
+ foo = logging.getLogger("foo")
+ self._watch_for_survival(foo)
+ foo.setLevel(logging.DEBUG)
+ self.root_logger.debug(self.next_message())
+ foo.debug(self.next_message())
+ self.assert_log_lines([
+ ('foo', 'DEBUG', '2'),
+ ])
+ del foo
+ # foo has survived.
+ self._assert_survival()
+ # foo has retained its settings.
+ bar = logging.getLogger("foo")
+ bar.debug(self.next_message())
+ self.assert_log_lines([
+ ('foo', 'DEBUG', '2'),
+ ('foo', 'DEBUG', '3'),
+ ])
+
+
+class EncodingTest(BaseTest):
+ def test_encoding_plain_file(self):
+ # In Python 2.x, a plain file object is treated as having no encoding.
+ log = logging.getLogger("test")
+ fn = tempfile.mktemp(".log")
+ # the non-ascii data we write to the log.
+ data = "foo\x80"
try:
- hdlr.close()
- except:
- pass
- rootLogger.removeHandler(hdlr)
+ handler = logging.FileHandler(fn)
+ log.addHandler(handler)
+ try:
+ # write non-ascii data to the log.
+ log.warning(data)
+ finally:
+ log.removeHandler(handler)
+ handler.close()
+ # check we wrote exactly those bytes, ignoring trailing \n etc
+ f = open(fn)
+ try:
+ self.failUnlessEqual(f.read().rstrip(), data)
+ finally:
+ f.close()
+ finally:
+ if os.path.isfile(fn):
+ os.remove(fn)
+
+ def test_encoding_cyrillic_unicode(self):
+ log = logging.getLogger("test")
+ #Get a message in Unicode: Do svidanya in Cyrillic (meaning goodbye)
+ message = u'\u0434\u043e \u0441\u0432\u0438\u0434\u0430\u043d\u0438\u044f'
+ #Ensure it's written in a Cyrillic encoding
+ writer_class = codecs.getwriter('cp1251')
+ writer_class.encoding = 'cp1251'
+ stream = cStringIO.StringIO()
+ writer = writer_class(stream, 'strict')
+ handler = logging.StreamHandler(writer)
+ log.addHandler(handler)
+ try:
+ log.warning(message)
+ finally:
+ log.removeHandler(handler)
+ handler.close()
+ # check we wrote exactly those bytes, ignoring trailing \n etc
+ s = stream.getvalue()
+ #Compare against what the data should be when encoded in CP-1251
+ self.assertEqual(s, '\xe4\xee \xf1\xe2\xe8\xe4\xe0\xed\xe8\xff\n')
+
# Set the locale to the platform-dependent default. I have no idea
# why the test does this, but in any case we save the current locale
# first and restore it at the end.
@run_with_locale('LC_ALL', '')
def test_main():
- # Save and restore the original root logger level across the tests.
- # Otherwise, e.g., if any test using cookielib runs after test_logging,
- # cookielib's debug-level logger tries to log messages, leading to
- # confusing:
- # No handlers could be found for logger "cookielib"
- # output while the tests are running.
- root_logger = logging.getLogger("")
- original_logging_level = root_logger.getEffectiveLevel()
- try:
- test_main_inner()
- finally:
- root_logger.setLevel(original_logging_level)
+ run_unittest(BuiltinLevelsTest, BasicFilterTest,
+ CustomLevelsAndFiltersTest, MemoryHandlerTest,
+ ConfigFileTest, SocketHandlerTest, MemoryTest,
+ EncodingTest)
if __name__ == "__main__":
- sys.stdout.write("test_logging\n")
test_main()