summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_logging.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_logging.py')
-rw-r--r--Lib/test/test_logging.py1315
1 files changed, 1289 insertions, 26 deletions
diff --git a/Lib/test/test_logging.py b/Lib/test/test_logging.py
index 9b36350..b62545c 100644
--- a/Lib/test/test_logging.py
+++ b/Lib/test/test_logging.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python
#
-# Copyright 2001-2010 by Vinay Sajip. All Rights Reserved.
+# Copyright 2001-2011 by Vinay Sajip. All Rights Reserved.
#
# Permission to use, copy, modify, and distribute this software and its
# documentation for any purpose and without fee is hereby granted,
@@ -18,7 +18,7 @@
"""Test harness for the logging module. Run all tests.
-Copyright (C) 2001-2010 Vinay Sajip. All Rights Reserved.
+Copyright (C) 2001-2011 Vinay Sajip. All Rights Reserved.
"""
import logging
@@ -26,27 +26,29 @@ import logging.handlers
import logging.config
import codecs
-import copy
+import datetime
import pickle
import io
import gc
+import json
import os
+import queue
import re
import select
import socket
from socketserver import ThreadingTCPServer, StreamRequestHandler
-import string
import struct
import sys
import tempfile
from test.support import captured_stdout, run_with_locale, run_unittest
import textwrap
-import threading
-import time
-import types
import unittest
import warnings
import weakref
+try:
+ import threading
+except ImportError:
+ threading = None
class BaseTest(unittest.TestCase):
@@ -65,11 +67,21 @@ class BaseTest(unittest.TestCase):
try:
self.saved_handlers = logging._handlers.copy()
self.saved_handler_list = logging._handlerList[:]
- self.saved_loggers = logger_dict.copy()
+ self.saved_loggers = saved_loggers = logger_dict.copy()
self.saved_level_names = logging._levelNames.copy()
+ self.logger_states = logger_states = {}
+ for name in saved_loggers:
+ logger_states[name] = getattr(saved_loggers[name],
+ 'disabled', None)
finally:
logging._releaseLock()
+ # Set two unused loggers: one non-ASCII and one Unicode.
+ # This is to test correct operation when sorting existing
+ # loggers in the configuration code. See issue 8201.
+ self.logger1 = logging.getLogger("\xab\xd7\xbb")
+ self.logger2 = logging.getLogger("\u013f\u00d6\u0047")
+
self.root_logger = logging.getLogger("")
self.original_logging_level = self.root_logger.getEffectiveLevel()
@@ -78,13 +90,25 @@ class BaseTest(unittest.TestCase):
self.root_hdlr = logging.StreamHandler(self.stream)
self.root_formatter = logging.Formatter(self.log_format)
self.root_hdlr.setFormatter(self.root_formatter)
+ if self.logger1.hasHandlers():
+ hlist = self.logger1.handlers + self.root_logger.handlers
+ raise AssertionError('Unexpected handlers: %s' % hlist)
+ if self.logger2.hasHandlers():
+ hlist = self.logger2.handlers + self.root_logger.handlers
+ raise AssertionError('Unexpected handlers: %s' % hlist)
self.root_logger.addHandler(self.root_hdlr)
+ self.assertTrue(self.logger1.hasHandlers())
+ self.assertTrue(self.logger2.hasHandlers())
def tearDown(self):
"""Remove our logging stream, and restore the original logging
level."""
self.stream.close()
self.root_logger.removeHandler(self.root_hdlr)
+ while self.root_logger.handlers:
+ h = self.root_logger.handlers[0]
+ self.root_logger.removeHandler(h)
+ h.close()
self.root_logger.setLevel(self.original_logging_level)
logging._acquireLock()
try:
@@ -96,6 +120,10 @@ class BaseTest(unittest.TestCase):
loggerDict = logging.getLogger().manager.loggerDict
loggerDict.clear()
loggerDict.update(self.saved_loggers)
+ logger_states = self.logger_states
+ for name in self.logger_states:
+ if logger_states[name] is not None:
+ self.saved_loggers[name].disabled = logger_states[name]
finally:
logging._releaseLock()
@@ -111,7 +139,8 @@ class BaseTest(unittest.TestCase):
except AttributeError:
# StringIO.StringIO lacks a reset() method.
actual_lines = stream.getvalue().splitlines()
- self.assertEqual(len(actual_lines), len(expected_values))
+ self.assertEqual(len(actual_lines), len(expected_values),
+ '%s vs. %s' % (actual_lines, expected_values))
for actual, expected in zip(actual_lines, expected_values):
match = pat.search(actual)
if not match:
@@ -138,7 +167,7 @@ class BuiltinLevelsTest(BaseTest):
ERR = logging.getLogger("ERR")
ERR.setLevel(logging.ERROR)
- INF = logging.getLogger("INF")
+ INF = logging.LoggerAdapter(logging.getLogger("INF"), {})
INF.setLevel(logging.INFO)
DEB = logging.getLogger("DEB")
DEB.setLevel(logging.DEBUG)
@@ -292,6 +321,35 @@ class BasicFilterTest(BaseTest):
finally:
handler.removeFilter(filter_)
+ def test_callable_filter(self):
+ # Only messages satisfying the specified criteria pass through the
+ # filter.
+
+ def filterfunc(record):
+ parts = record.name.split('.')
+ prefix = '.'.join(parts[:2])
+ return prefix == 'spam.eggs'
+
+ handler = self.root_logger.handlers[0]
+ try:
+ handler.addFilter(filterfunc)
+ spam = logging.getLogger("spam")
+ spam_eggs = logging.getLogger("spam.eggs")
+ spam_eggs_fish = logging.getLogger("spam.eggs.fish")
+ spam_bakedbeans = logging.getLogger("spam.bakedbeans")
+
+ spam.info(self.next_message())
+ spam_eggs.info(self.next_message()) # Good.
+ spam_eggs_fish.info(self.next_message()) # Good.
+ spam_bakedbeans.info(self.next_message())
+
+ self.assert_log_lines([
+ ('spam.eggs', 'INFO', '2'),
+ ('spam.eggs.fish', 'INFO', '3'),
+ ])
+ finally:
+ handler.removeFilter(filterfunc)
+
#
# First, we define our levels. There can be as many as you want - the only
@@ -355,7 +413,7 @@ class CustomLevelsAndFiltersTest(BaseTest):
def setUp(self):
BaseTest.setUp(self)
- for k, v in list(my_logging_levels.items()):
+ for k, v in my_logging_levels.items():
logging.addLevelName(k, v)
def log_at_all_levels(self, logger):
@@ -702,14 +760,8 @@ class ConfigFileTest(BaseTest):
"""
def apply_config(self, conf):
- try:
- fn = tempfile.mktemp(".ini")
- f = open(fn, "w")
- f.write(textwrap.dedent(conf))
- f.close()
- logging.config.fileConfig(fn)
- finally:
- os.remove(fn)
+ file = io.StringIO(textwrap.dedent(conf))
+ logging.config.fileConfig(file)
def test_config0_ok(self):
# A simple config file which overrides the default settings.
@@ -876,6 +928,7 @@ class LogRecordSocketReceiver(ThreadingTCPServer):
self.server_close()
+@unittest.skipUnless(threading, 'Threading required for this test.')
class SocketHandlerTest(BaseTest):
"""Test for SocketHandler objects."""
@@ -944,7 +997,7 @@ class MemoryTest(BaseTest):
# Trigger cycle breaking.
gc.collect()
dead = []
- for (id_, repr_), ref in list(self._survivors.items()):
+ for (id_, repr_), ref in self._survivors.items():
if ref() is None:
dead.append(repr_)
if dead:
@@ -979,11 +1032,12 @@ class EncodingTest(BaseTest):
def test_encoding_plain_file(self):
# In Python 2.x, a plain file object is treated as having no encoding.
log = logging.getLogger("test")
- fn = tempfile.mktemp(".log")
+ fd, fn = tempfile.mkstemp(".log", "test_logging-1-")
+ os.close(fd)
# the non-ascii data we write to the log.
data = "foo\x80"
try:
- handler = logging.FileHandler(fn, encoding="utf8")
+ handler = logging.FileHandler(fn, encoding="utf-8")
log.addHandler(handler)
try:
# write non-ascii data to the log.
@@ -992,7 +1046,7 @@ class EncodingTest(BaseTest):
log.removeHandler(handler)
handler.close()
# check we wrote exactly those bytes, ignoring trailing \n etc
- f = open(fn, encoding="utf8")
+ f = open(fn, encoding="utf-8")
try:
self.assertEqual(f.read().rstrip(), data)
finally:
@@ -1051,15 +1105,1224 @@ class WarningsTest(BaseTest):
finally:
logging.captureWarnings(False)
+
+def formatFunc(format, datefmt=None):
+ return logging.Formatter(format, datefmt)
+
+def handlerFunc():
+ return logging.StreamHandler()
+
+class CustomHandler(logging.StreamHandler):
+ pass
+
+class ConfigDictTest(BaseTest):
+
+ """Reading logging config from a dictionary."""
+
+ expected_log_pat = r"^([\w]+) \+\+ ([\w]+)$"
+
+ # config0 is a standard configuration.
+ config0 = {
+ 'version': 1,
+ 'formatters': {
+ 'form1' : {
+ 'format' : '%(levelname)s ++ %(message)s',
+ },
+ },
+ 'handlers' : {
+ 'hand1' : {
+ 'class' : 'logging.StreamHandler',
+ 'formatter' : 'form1',
+ 'level' : 'NOTSET',
+ 'stream' : 'ext://sys.stdout',
+ },
+ },
+ 'root' : {
+ 'level' : 'WARNING',
+ 'handlers' : ['hand1'],
+ },
+ }
+
+ # config1 adds a little to the standard configuration.
+ config1 = {
+ 'version': 1,
+ 'formatters': {
+ 'form1' : {
+ 'format' : '%(levelname)s ++ %(message)s',
+ },
+ },
+ 'handlers' : {
+ 'hand1' : {
+ 'class' : 'logging.StreamHandler',
+ 'formatter' : 'form1',
+ 'level' : 'NOTSET',
+ 'stream' : 'ext://sys.stdout',
+ },
+ },
+ 'loggers' : {
+ 'compiler.parser' : {
+ 'level' : 'DEBUG',
+ 'handlers' : ['hand1'],
+ },
+ },
+ 'root' : {
+ 'level' : 'WARNING',
+ },
+ }
+
+ # config1a moves the handler to the root. Used with config8a
+ config1a = {
+ 'version': 1,
+ 'formatters': {
+ 'form1' : {
+ 'format' : '%(levelname)s ++ %(message)s',
+ },
+ },
+ 'handlers' : {
+ 'hand1' : {
+ 'class' : 'logging.StreamHandler',
+ 'formatter' : 'form1',
+ 'level' : 'NOTSET',
+ 'stream' : 'ext://sys.stdout',
+ },
+ },
+ 'loggers' : {
+ 'compiler.parser' : {
+ 'level' : 'DEBUG',
+ },
+ },
+ 'root' : {
+ 'level' : 'WARNING',
+ 'handlers' : ['hand1'],
+ },
+ }
+
+ # config2 has a subtle configuration error that should be reported
+ config2 = {
+ 'version': 1,
+ 'formatters': {
+ 'form1' : {
+ 'format' : '%(levelname)s ++ %(message)s',
+ },
+ },
+ 'handlers' : {
+ 'hand1' : {
+ 'class' : 'logging.StreamHandler',
+ 'formatter' : 'form1',
+ 'level' : 'NOTSET',
+ 'stream' : 'ext://sys.stdbout',
+ },
+ },
+ 'loggers' : {
+ 'compiler.parser' : {
+ 'level' : 'DEBUG',
+ 'handlers' : ['hand1'],
+ },
+ },
+ 'root' : {
+ 'level' : 'WARNING',
+ },
+ }
+
+ #As config1 but with a misspelt level on a handler
+ config2a = {
+ 'version': 1,
+ 'formatters': {
+ 'form1' : {
+ 'format' : '%(levelname)s ++ %(message)s',
+ },
+ },
+ 'handlers' : {
+ 'hand1' : {
+ 'class' : 'logging.StreamHandler',
+ 'formatter' : 'form1',
+ 'level' : 'NTOSET',
+ 'stream' : 'ext://sys.stdout',
+ },
+ },
+ 'loggers' : {
+ 'compiler.parser' : {
+ 'level' : 'DEBUG',
+ 'handlers' : ['hand1'],
+ },
+ },
+ 'root' : {
+ 'level' : 'WARNING',
+ },
+ }
+
+
+ #As config1 but with a misspelt level on a logger
+ config2b = {
+ 'version': 1,
+ 'formatters': {
+ 'form1' : {
+ 'format' : '%(levelname)s ++ %(message)s',
+ },
+ },
+ 'handlers' : {
+ 'hand1' : {
+ 'class' : 'logging.StreamHandler',
+ 'formatter' : 'form1',
+ 'level' : 'NOTSET',
+ 'stream' : 'ext://sys.stdout',
+ },
+ },
+ 'loggers' : {
+ 'compiler.parser' : {
+ 'level' : 'DEBUG',
+ 'handlers' : ['hand1'],
+ },
+ },
+ 'root' : {
+ 'level' : 'WRANING',
+ },
+ }
+
+ # config3 has a less subtle configuration error
+ config3 = {
+ 'version': 1,
+ 'formatters': {
+ 'form1' : {
+ 'format' : '%(levelname)s ++ %(message)s',
+ },
+ },
+ 'handlers' : {
+ 'hand1' : {
+ 'class' : 'logging.StreamHandler',
+ 'formatter' : 'misspelled_name',
+ 'level' : 'NOTSET',
+ 'stream' : 'ext://sys.stdout',
+ },
+ },
+ 'loggers' : {
+ 'compiler.parser' : {
+ 'level' : 'DEBUG',
+ 'handlers' : ['hand1'],
+ },
+ },
+ 'root' : {
+ 'level' : 'WARNING',
+ },
+ }
+
+ # config4 specifies a custom formatter class to be loaded
+ config4 = {
+ 'version': 1,
+ 'formatters': {
+ 'form1' : {
+ '()' : __name__ + '.ExceptionFormatter',
+ 'format' : '%(levelname)s:%(name)s:%(message)s',
+ },
+ },
+ 'handlers' : {
+ 'hand1' : {
+ 'class' : 'logging.StreamHandler',
+ 'formatter' : 'form1',
+ 'level' : 'NOTSET',
+ 'stream' : 'ext://sys.stdout',
+ },
+ },
+ 'root' : {
+ 'level' : 'NOTSET',
+ 'handlers' : ['hand1'],
+ },
+ }
+
+ # As config4 but using an actual callable rather than a string
+ config4a = {
+ 'version': 1,
+ 'formatters': {
+ 'form1' : {
+ '()' : ExceptionFormatter,
+ 'format' : '%(levelname)s:%(name)s:%(message)s',
+ },
+ 'form2' : {
+ '()' : __name__ + '.formatFunc',
+ 'format' : '%(levelname)s:%(name)s:%(message)s',
+ },
+ 'form3' : {
+ '()' : formatFunc,
+ 'format' : '%(levelname)s:%(name)s:%(message)s',
+ },
+ },
+ 'handlers' : {
+ 'hand1' : {
+ 'class' : 'logging.StreamHandler',
+ 'formatter' : 'form1',
+ 'level' : 'NOTSET',
+ 'stream' : 'ext://sys.stdout',
+ },
+ 'hand2' : {
+ '()' : handlerFunc,
+ },
+ },
+ 'root' : {
+ 'level' : 'NOTSET',
+ 'handlers' : ['hand1'],
+ },
+ }
+
+ # config5 specifies a custom handler class to be loaded
+ config5 = {
+ 'version': 1,
+ 'formatters': {
+ 'form1' : {
+ 'format' : '%(levelname)s ++ %(message)s',
+ },
+ },
+ 'handlers' : {
+ 'hand1' : {
+ 'class' : __name__ + '.CustomHandler',
+ 'formatter' : 'form1',
+ 'level' : 'NOTSET',
+ 'stream' : 'ext://sys.stdout',
+ },
+ },
+ 'loggers' : {
+ 'compiler.parser' : {
+ 'level' : 'DEBUG',
+ 'handlers' : ['hand1'],
+ },
+ },
+ 'root' : {
+ 'level' : 'WARNING',
+ },
+ }
+
+ # config6 specifies a custom handler class to be loaded
+ # but has bad arguments
+ config6 = {
+ 'version': 1,
+ 'formatters': {
+ 'form1' : {
+ 'format' : '%(levelname)s ++ %(message)s',
+ },
+ },
+ 'handlers' : {
+ 'hand1' : {
+ 'class' : __name__ + '.CustomHandler',
+ 'formatter' : 'form1',
+ 'level' : 'NOTSET',
+ 'stream' : 'ext://sys.stdout',
+ '9' : 'invalid parameter name',
+ },
+ },
+ 'loggers' : {
+ 'compiler.parser' : {
+ 'level' : 'DEBUG',
+ 'handlers' : ['hand1'],
+ },
+ },
+ 'root' : {
+ 'level' : 'WARNING',
+ },
+ }
+
+ #config 7 does not define compiler.parser but defines compiler.lexer
+ #so compiler.parser should be disabled after applying it
+ config7 = {
+ 'version': 1,
+ 'formatters': {
+ 'form1' : {
+ 'format' : '%(levelname)s ++ %(message)s',
+ },
+ },
+ 'handlers' : {
+ 'hand1' : {
+ 'class' : 'logging.StreamHandler',
+ 'formatter' : 'form1',
+ 'level' : 'NOTSET',
+ 'stream' : 'ext://sys.stdout',
+ },
+ },
+ 'loggers' : {
+ 'compiler.lexer' : {
+ 'level' : 'DEBUG',
+ 'handlers' : ['hand1'],
+ },
+ },
+ 'root' : {
+ 'level' : 'WARNING',
+ },
+ }
+
+ # config8 defines both compiler and compiler.lexer
+ # so compiler.parser should not be disabled (since
+ # compiler is defined)
+ config8 = {
+ 'version': 1,
+ 'disable_existing_loggers' : False,
+ 'formatters': {
+ 'form1' : {
+ 'format' : '%(levelname)s ++ %(message)s',
+ },
+ },
+ 'handlers' : {
+ 'hand1' : {
+ 'class' : 'logging.StreamHandler',
+ 'formatter' : 'form1',
+ 'level' : 'NOTSET',
+ 'stream' : 'ext://sys.stdout',
+ },
+ },
+ 'loggers' : {
+ 'compiler' : {
+ 'level' : 'DEBUG',
+ 'handlers' : ['hand1'],
+ },
+ 'compiler.lexer' : {
+ },
+ },
+ 'root' : {
+ 'level' : 'WARNING',
+ },
+ }
+
+ # config8a disables existing loggers
+ config8a = {
+ 'version': 1,
+ 'disable_existing_loggers' : True,
+ 'formatters': {
+ 'form1' : {
+ 'format' : '%(levelname)s ++ %(message)s',
+ },
+ },
+ 'handlers' : {
+ 'hand1' : {
+ 'class' : 'logging.StreamHandler',
+ 'formatter' : 'form1',
+ 'level' : 'NOTSET',
+ 'stream' : 'ext://sys.stdout',
+ },
+ },
+ 'loggers' : {
+ 'compiler' : {
+ 'level' : 'DEBUG',
+ 'handlers' : ['hand1'],
+ },
+ 'compiler.lexer' : {
+ },
+ },
+ 'root' : {
+ 'level' : 'WARNING',
+ },
+ }
+
+ config9 = {
+ 'version': 1,
+ 'formatters': {
+ 'form1' : {
+ 'format' : '%(levelname)s ++ %(message)s',
+ },
+ },
+ 'handlers' : {
+ 'hand1' : {
+ 'class' : 'logging.StreamHandler',
+ 'formatter' : 'form1',
+ 'level' : 'WARNING',
+ 'stream' : 'ext://sys.stdout',
+ },
+ },
+ 'loggers' : {
+ 'compiler.parser' : {
+ 'level' : 'WARNING',
+ 'handlers' : ['hand1'],
+ },
+ },
+ 'root' : {
+ 'level' : 'NOTSET',
+ },
+ }
+
+ config9a = {
+ 'version': 1,
+ 'incremental' : True,
+ 'handlers' : {
+ 'hand1' : {
+ 'level' : 'WARNING',
+ },
+ },
+ 'loggers' : {
+ 'compiler.parser' : {
+ 'level' : 'INFO',
+ },
+ },
+ }
+
+ config9b = {
+ 'version': 1,
+ 'incremental' : True,
+ 'handlers' : {
+ 'hand1' : {
+ 'level' : 'INFO',
+ },
+ },
+ 'loggers' : {
+ 'compiler.parser' : {
+ 'level' : 'INFO',
+ },
+ },
+ }
+
+ #As config1 but with a filter added
+ config10 = {
+ 'version': 1,
+ 'formatters': {
+ 'form1' : {
+ 'format' : '%(levelname)s ++ %(message)s',
+ },
+ },
+ 'filters' : {
+ 'filt1' : {
+ 'name' : 'compiler.parser',
+ },
+ },
+ 'handlers' : {
+ 'hand1' : {
+ 'class' : 'logging.StreamHandler',
+ 'formatter' : 'form1',
+ 'level' : 'NOTSET',
+ 'stream' : 'ext://sys.stdout',
+ 'filters' : ['filt1'],
+ },
+ },
+ 'loggers' : {
+ 'compiler.parser' : {
+ 'level' : 'DEBUG',
+ 'filters' : ['filt1'],
+ },
+ },
+ 'root' : {
+ 'level' : 'WARNING',
+ 'handlers' : ['hand1'],
+ },
+ }
+
+ #As config1 but using cfg:// references
+ config11 = {
+ 'version': 1,
+ 'true_formatters': {
+ 'form1' : {
+ 'format' : '%(levelname)s ++ %(message)s',
+ },
+ },
+ 'handler_configs': {
+ 'hand1' : {
+ 'class' : 'logging.StreamHandler',
+ 'formatter' : 'form1',
+ 'level' : 'NOTSET',
+ 'stream' : 'ext://sys.stdout',
+ },
+ },
+ 'formatters' : 'cfg://true_formatters',
+ 'handlers' : {
+ 'hand1' : 'cfg://handler_configs[hand1]',
+ },
+ 'loggers' : {
+ 'compiler.parser' : {
+ 'level' : 'DEBUG',
+ 'handlers' : ['hand1'],
+ },
+ },
+ 'root' : {
+ 'level' : 'WARNING',
+ },
+ }
+
+ #As config11 but missing the version key
+ config12 = {
+ 'true_formatters': {
+ 'form1' : {
+ 'format' : '%(levelname)s ++ %(message)s',
+ },
+ },
+ 'handler_configs': {
+ 'hand1' : {
+ 'class' : 'logging.StreamHandler',
+ 'formatter' : 'form1',
+ 'level' : 'NOTSET',
+ 'stream' : 'ext://sys.stdout',
+ },
+ },
+ 'formatters' : 'cfg://true_formatters',
+ 'handlers' : {
+ 'hand1' : 'cfg://handler_configs[hand1]',
+ },
+ 'loggers' : {
+ 'compiler.parser' : {
+ 'level' : 'DEBUG',
+ 'handlers' : ['hand1'],
+ },
+ },
+ 'root' : {
+ 'level' : 'WARNING',
+ },
+ }
+
+ #As config11 but using an unsupported version
+ config13 = {
+ 'version': 2,
+ 'true_formatters': {
+ 'form1' : {
+ 'format' : '%(levelname)s ++ %(message)s',
+ },
+ },
+ 'handler_configs': {
+ 'hand1' : {
+ 'class' : 'logging.StreamHandler',
+ 'formatter' : 'form1',
+ 'level' : 'NOTSET',
+ 'stream' : 'ext://sys.stdout',
+ },
+ },
+ 'formatters' : 'cfg://true_formatters',
+ 'handlers' : {
+ 'hand1' : 'cfg://handler_configs[hand1]',
+ },
+ 'loggers' : {
+ 'compiler.parser' : {
+ 'level' : 'DEBUG',
+ 'handlers' : ['hand1'],
+ },
+ },
+ 'root' : {
+ 'level' : 'WARNING',
+ },
+ }
+
+ def apply_config(self, conf):
+ logging.config.dictConfig(conf)
+
+ def test_config0_ok(self):
+ # A simple config which overrides the default settings.
+ with captured_stdout() as output:
+ self.apply_config(self.config0)
+ logger = logging.getLogger()
+ # Won't output anything
+ logger.info(self.next_message())
+ # Outputs a message
+ logger.error(self.next_message())
+ self.assert_log_lines([
+ ('ERROR', '2'),
+ ], stream=output)
+ # Original logger output is empty.
+ self.assert_log_lines([])
+
+ def test_config1_ok(self, config=config1):
+ # A config defining a sub-parser as well.
+ with captured_stdout() as output:
+ self.apply_config(config)
+ logger = logging.getLogger("compiler.parser")
+ # Both will output a message
+ logger.info(self.next_message())
+ logger.error(self.next_message())
+ self.assert_log_lines([
+ ('INFO', '1'),
+ ('ERROR', '2'),
+ ], stream=output)
+ # Original logger output is empty.
+ self.assert_log_lines([])
+
+ def test_config2_failure(self):
+ # A simple config which overrides the default settings.
+ self.assertRaises(Exception, self.apply_config, self.config2)
+
+ def test_config2a_failure(self):
+ # A simple config which overrides the default settings.
+ self.assertRaises(Exception, self.apply_config, self.config2a)
+
+ def test_config2b_failure(self):
+ # A simple config which overrides the default settings.
+ self.assertRaises(Exception, self.apply_config, self.config2b)
+
+ def test_config3_failure(self):
+ # A simple config which overrides the default settings.
+ self.assertRaises(Exception, self.apply_config, self.config3)
+
+ def test_config4_ok(self):
+ # A config specifying a custom formatter class.
+ with captured_stdout() as output:
+ self.apply_config(self.config4)
+ #logger = logging.getLogger()
+ try:
+ raise RuntimeError()
+ except RuntimeError:
+ logging.exception("just testing")
+ sys.stdout.seek(0)
+ self.assertEqual(output.getvalue(),
+ "ERROR:root:just testing\nGot a [RuntimeError]\n")
+ # Original logger output is empty
+ self.assert_log_lines([])
+
+ def test_config4a_ok(self):
+ # A config specifying a custom formatter class.
+ with captured_stdout() as output:
+ self.apply_config(self.config4a)
+ #logger = logging.getLogger()
+ try:
+ raise RuntimeError()
+ except RuntimeError:
+ logging.exception("just testing")
+ sys.stdout.seek(0)
+ self.assertEqual(output.getvalue(),
+ "ERROR:root:just testing\nGot a [RuntimeError]\n")
+ # Original logger output is empty
+ self.assert_log_lines([])
+
+ def test_config5_ok(self):
+ self.test_config1_ok(config=self.config5)
+
+ def test_config6_failure(self):
+ self.assertRaises(Exception, self.apply_config, self.config6)
+
+ def test_config7_ok(self):
+ with captured_stdout() as output:
+ self.apply_config(self.config1)
+ logger = logging.getLogger("compiler.parser")
+ # Both will output a message
+ logger.info(self.next_message())
+ logger.error(self.next_message())
+ self.assert_log_lines([
+ ('INFO', '1'),
+ ('ERROR', '2'),
+ ], stream=output)
+ # Original logger output is empty.
+ self.assert_log_lines([])
+ with captured_stdout() as output:
+ self.apply_config(self.config7)
+ logger = logging.getLogger("compiler.parser")
+ self.assertTrue(logger.disabled)
+ logger = logging.getLogger("compiler.lexer")
+ # Both will output a message
+ logger.info(self.next_message())
+ logger.error(self.next_message())
+ self.assert_log_lines([
+ ('INFO', '3'),
+ ('ERROR', '4'),
+ ], stream=output)
+ # Original logger output is empty.
+ self.assert_log_lines([])
+
+ #Same as test_config_7_ok but don't disable old loggers.
+ def test_config_8_ok(self):
+ with captured_stdout() as output:
+ self.apply_config(self.config1)
+ logger = logging.getLogger("compiler.parser")
+ # All will output a message
+ logger.info(self.next_message())
+ logger.error(self.next_message())
+ self.assert_log_lines([
+ ('INFO', '1'),
+ ('ERROR', '2'),
+ ], stream=output)
+ # Original logger output is empty.
+ self.assert_log_lines([])
+ with captured_stdout() as output:
+ self.apply_config(self.config8)
+ logger = logging.getLogger("compiler.parser")
+ self.assertFalse(logger.disabled)
+ # Both will output a message
+ logger.info(self.next_message())
+ logger.error(self.next_message())
+ logger = logging.getLogger("compiler.lexer")
+ # Both will output a message
+ logger.info(self.next_message())
+ logger.error(self.next_message())
+ self.assert_log_lines([
+ ('INFO', '3'),
+ ('ERROR', '4'),
+ ('INFO', '5'),
+ ('ERROR', '6'),
+ ], stream=output)
+ # Original logger output is empty.
+ self.assert_log_lines([])
+
+ def test_config_8a_ok(self):
+ with captured_stdout() as output:
+ self.apply_config(self.config1a)
+ logger = logging.getLogger("compiler.parser")
+ # See issue #11424. compiler-hyphenated sorts
+ # between compiler and compiler.xyz and this
+ # was preventing compiler.xyz from being included
+ # in the child loggers of compiler because of an
+ # overzealous loop termination condition.
+ hyphenated = logging.getLogger('compiler-hyphenated')
+ # All will output a message
+ logger.info(self.next_message())
+ logger.error(self.next_message())
+ hyphenated.critical(self.next_message())
+ self.assert_log_lines([
+ ('INFO', '1'),
+ ('ERROR', '2'),
+ ('CRITICAL', '3'),
+ ], stream=output)
+ # Original logger output is empty.
+ self.assert_log_lines([])
+ with captured_stdout() as output:
+ self.apply_config(self.config8a)
+ logger = logging.getLogger("compiler.parser")
+ self.assertFalse(logger.disabled)
+ # Both will output a message
+ logger.info(self.next_message())
+ logger.error(self.next_message())
+ logger = logging.getLogger("compiler.lexer")
+ # Both will output a message
+ logger.info(self.next_message())
+ logger.error(self.next_message())
+ # Will not appear
+ hyphenated.critical(self.next_message())
+ self.assert_log_lines([
+ ('INFO', '4'),
+ ('ERROR', '5'),
+ ('INFO', '6'),
+ ('ERROR', '7'),
+ ], stream=output)
+ # Original logger output is empty.
+ self.assert_log_lines([])
+
+ def test_config_9_ok(self):
+ with captured_stdout() as output:
+ self.apply_config(self.config9)
+ logger = logging.getLogger("compiler.parser")
+ #Nothing will be output since both handler and logger are set to WARNING
+ logger.info(self.next_message())
+ self.assert_log_lines([], stream=output)
+ self.apply_config(self.config9a)
+ #Nothing will be output since both handler is still set to WARNING
+ logger.info(self.next_message())
+ self.assert_log_lines([], stream=output)
+ self.apply_config(self.config9b)
+ #Message should now be output
+ logger.info(self.next_message())
+ self.assert_log_lines([
+ ('INFO', '3'),
+ ], stream=output)
+
+ def test_config_10_ok(self):
+ with captured_stdout() as output:
+ self.apply_config(self.config10)
+ logger = logging.getLogger("compiler.parser")
+ logger.warning(self.next_message())
+ logger = logging.getLogger('compiler')
+ #Not output, because filtered
+ logger.warning(self.next_message())
+ logger = logging.getLogger('compiler.lexer')
+ #Not output, because filtered
+ logger.warning(self.next_message())
+ logger = logging.getLogger("compiler.parser.codegen")
+ #Output, as not filtered
+ logger.error(self.next_message())
+ self.assert_log_lines([
+ ('WARNING', '1'),
+ ('ERROR', '4'),
+ ], stream=output)
+
+ def test_config11_ok(self):
+ self.test_config1_ok(self.config11)
+
+ def test_config12_failure(self):
+ self.assertRaises(Exception, self.apply_config, self.config12)
+
+ def test_config13_failure(self):
+ self.assertRaises(Exception, self.apply_config, self.config13)
+
+ @unittest.skipUnless(threading, 'listen() needs threading to work')
+ def setup_via_listener(self, text):
+ text = text.encode("utf-8")
+ # Ask for a randomly assigned port (by using port 0)
+ t = logging.config.listen(0)
+ t.start()
+ t.ready.wait()
+ # Now get the port allocated
+ port = t.port
+ t.ready.clear()
+ try:
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.settimeout(2.0)
+ sock.connect(('localhost', port))
+
+ slen = struct.pack('>L', len(text))
+ s = slen + text
+ sentsofar = 0
+ left = len(s)
+ while left > 0:
+ sent = sock.send(s[sentsofar:])
+ sentsofar += sent
+ left -= sent
+ sock.close()
+ finally:
+ t.ready.wait(2.0)
+ logging.config.stopListening()
+ t.join(2.0)
+
+ def test_listen_config_10_ok(self):
+ with captured_stdout() as output:
+ self.setup_via_listener(json.dumps(self.config10))
+ logger = logging.getLogger("compiler.parser")
+ logger.warning(self.next_message())
+ logger = logging.getLogger('compiler')
+ #Not output, because filtered
+ logger.warning(self.next_message())
+ logger = logging.getLogger('compiler.lexer')
+ #Not output, because filtered
+ logger.warning(self.next_message())
+ logger = logging.getLogger("compiler.parser.codegen")
+ #Output, as not filtered
+ logger.error(self.next_message())
+ self.assert_log_lines([
+ ('WARNING', '1'),
+ ('ERROR', '4'),
+ ], stream=output)
+
+ def test_listen_config_1_ok(self):
+ with captured_stdout() as output:
+ self.setup_via_listener(textwrap.dedent(ConfigFileTest.config1))
+ logger = logging.getLogger("compiler.parser")
+ # Both will output a message
+ logger.info(self.next_message())
+ logger.error(self.next_message())
+ self.assert_log_lines([
+ ('INFO', '1'),
+ ('ERROR', '2'),
+ ], stream=output)
+ # Original logger output is empty.
+ self.assert_log_lines([])
+
+
+class ManagerTest(BaseTest):
+ def test_manager_loggerclass(self):
+ logged = []
+
+ class MyLogger(logging.Logger):
+ def _log(self, level, msg, args, exc_info=None, extra=None):
+ logged.append(msg)
+
+ man = logging.Manager(None)
+ self.assertRaises(TypeError, man.setLoggerClass, int)
+ man.setLoggerClass(MyLogger)
+ logger = man.getLogger('test')
+ logger.warning('should appear in logged')
+ logging.warning('should not appear in logged')
+
+ self.assertEqual(logged, ['should appear in logged'])
+
+
+class ChildLoggerTest(BaseTest):
+ def test_child_loggers(self):
+ r = logging.getLogger()
+ l1 = logging.getLogger('abc')
+ l2 = logging.getLogger('def.ghi')
+ c1 = r.getChild('xyz')
+ c2 = r.getChild('uvw.xyz')
+ self.assertTrue(c1 is logging.getLogger('xyz'))
+ self.assertTrue(c2 is logging.getLogger('uvw.xyz'))
+ c1 = l1.getChild('def')
+ c2 = c1.getChild('ghi')
+ c3 = l1.getChild('def.ghi')
+ self.assertTrue(c1 is logging.getLogger('abc.def'))
+ self.assertTrue(c2 is logging.getLogger('abc.def.ghi'))
+ self.assertTrue(c2 is c3)
+
+
+class DerivedLogRecord(logging.LogRecord):
+ pass
+
+class LogRecordFactoryTest(BaseTest):
+
+ def setUp(self):
+ class CheckingFilter(logging.Filter):
+ def __init__(self, cls):
+ self.cls = cls
+
+ def filter(self, record):
+ t = type(record)
+ if t is not self.cls:
+ msg = 'Unexpected LogRecord type %s, expected %s' % (t,
+ self.cls)
+ raise TypeError(msg)
+ return True
+
+ BaseTest.setUp(self)
+ self.filter = CheckingFilter(DerivedLogRecord)
+ self.root_logger.addFilter(self.filter)
+ self.orig_factory = logging.getLogRecordFactory()
+
+ def tearDown(self):
+ self.root_logger.removeFilter(self.filter)
+ BaseTest.tearDown(self)
+ logging.setLogRecordFactory(self.orig_factory)
+
+ def test_logrecord_class(self):
+ self.assertRaises(TypeError, self.root_logger.warning,
+ self.next_message())
+ logging.setLogRecordFactory(DerivedLogRecord)
+ self.root_logger.error(self.next_message())
+ self.assert_log_lines([
+ ('root', 'ERROR', '2'),
+ ])
+
+
+class QueueHandlerTest(BaseTest):
+ # Do not bother with a logger name group.
+ expected_log_pat = r"^[\w.]+ -> ([\w]+): ([\d]+)$"
+
+ def setUp(self):
+ BaseTest.setUp(self)
+ self.queue = queue.Queue(-1)
+ self.que_hdlr = logging.handlers.QueueHandler(self.queue)
+ self.que_logger = logging.getLogger('que')
+ self.que_logger.propagate = False
+ self.que_logger.setLevel(logging.WARNING)
+ self.que_logger.addHandler(self.que_hdlr)
+
+ def tearDown(self):
+ self.que_hdlr.close()
+ BaseTest.tearDown(self)
+
+ def test_queue_handler(self):
+ self.que_logger.debug(self.next_message())
+ self.assertRaises(queue.Empty, self.queue.get_nowait)
+ self.que_logger.info(self.next_message())
+ self.assertRaises(queue.Empty, self.queue.get_nowait)
+ msg = self.next_message()
+ self.que_logger.warning(msg)
+ data = self.queue.get_nowait()
+ self.assertTrue(isinstance(data, logging.LogRecord))
+ self.assertEqual(data.name, self.que_logger.name)
+ self.assertEqual((data.msg, data.args), (msg, None))
+
+class FormatterTest(unittest.TestCase):
+ def setUp(self):
+ self.common = {
+ 'name': 'formatter.test',
+ 'level': logging.DEBUG,
+ 'pathname': os.path.join('path', 'to', 'dummy.ext'),
+ 'lineno': 42,
+ 'exc_info': None,
+ 'func': None,
+ 'msg': 'Message with %d %s',
+ 'args': (2, 'placeholders'),
+ }
+ self.variants = {
+ }
+
+ def get_record(self, name=None):
+ result = dict(self.common)
+ if name is not None:
+ result.update(self.variants[name])
+ return logging.makeLogRecord(result)
+
+ def test_percent(self):
+ # Test %-formatting
+ r = self.get_record()
+ f = logging.Formatter('${%(message)s}')
+ self.assertEqual(f.format(r), '${Message with 2 placeholders}')
+ f = logging.Formatter('%(random)s')
+ self.assertRaises(KeyError, f.format, r)
+ self.assertFalse(f.usesTime())
+ f = logging.Formatter('%(asctime)s')
+ self.assertTrue(f.usesTime())
+ f = logging.Formatter('%(asctime)-15s')
+ self.assertTrue(f.usesTime())
+ f = logging.Formatter('asctime')
+ self.assertFalse(f.usesTime())
+
+ def test_braces(self):
+ # Test {}-formatting
+ r = self.get_record()
+ f = logging.Formatter('$%{message}%$', style='{')
+ self.assertEqual(f.format(r), '$%Message with 2 placeholders%$')
+ f = logging.Formatter('{random}', style='{')
+ self.assertRaises(KeyError, f.format, r)
+ self.assertFalse(f.usesTime())
+ f = logging.Formatter('{asctime}', style='{')
+ self.assertTrue(f.usesTime())
+ f = logging.Formatter('{asctime!s:15}', style='{')
+ self.assertTrue(f.usesTime())
+ f = logging.Formatter('{asctime:15}', style='{')
+ self.assertTrue(f.usesTime())
+ f = logging.Formatter('asctime', style='{')
+ self.assertFalse(f.usesTime())
+
+ def test_dollars(self):
+ # Test $-formatting
+ r = self.get_record()
+ f = logging.Formatter('$message', style='$')
+ self.assertEqual(f.format(r), 'Message with 2 placeholders')
+ f = logging.Formatter('$$%${message}%$$', style='$')
+ self.assertEqual(f.format(r), '$%Message with 2 placeholders%$')
+ f = logging.Formatter('${random}', style='$')
+ self.assertRaises(KeyError, f.format, r)
+ self.assertFalse(f.usesTime())
+ f = logging.Formatter('${asctime}', style='$')
+ self.assertTrue(f.usesTime())
+ f = logging.Formatter('${asctime', style='$')
+ self.assertFalse(f.usesTime())
+ f = logging.Formatter('$asctime', style='$')
+ self.assertTrue(f.usesTime())
+ f = logging.Formatter('asctime', style='$')
+ self.assertFalse(f.usesTime())
+
+class LastResortTest(BaseTest):
+ def test_last_resort(self):
+ # Test the last resort handler
+ root = self.root_logger
+ root.removeHandler(self.root_hdlr)
+ old_stderr = sys.stderr
+ old_lastresort = logging.lastResort
+ old_raise_exceptions = logging.raiseExceptions
+ try:
+ sys.stderr = sio = io.StringIO()
+ root.warning('This is your final chance!')
+ self.assertEqual(sio.getvalue(), 'This is your final chance!\n')
+ #No handlers and no last resort, so 'No handlers' message
+ logging.lastResort = None
+ sys.stderr = sio = io.StringIO()
+ root.warning('This is your final chance!')
+ self.assertEqual(sio.getvalue(), 'No handlers could be found for logger "root"\n')
+ # 'No handlers' message only printed once
+ sys.stderr = sio = io.StringIO()
+ root.warning('This is your final chance!')
+ self.assertEqual(sio.getvalue(), '')
+ root.manager.emittedNoHandlerWarning = False
+ #If raiseExceptions is False, no message is printed
+ logging.raiseExceptions = False
+ sys.stderr = sio = io.StringIO()
+ root.warning('This is your final chance!')
+ self.assertEqual(sio.getvalue(), '')
+ finally:
+ sys.stderr = old_stderr
+ root.addHandler(self.root_hdlr)
+ logging.lastResort = old_lastresort
+ logging.raiseExceptions = old_raise_exceptions
+
+
+class BaseFileTest(BaseTest):
+ "Base class for handler tests that write log files"
+
+ def setUp(self):
+ BaseTest.setUp(self)
+ fd, self.fn = tempfile.mkstemp(".log", "test_logging-2-")
+ os.close(fd)
+ self.rmfiles = []
+
+ def tearDown(self):
+ for fn in self.rmfiles:
+ os.unlink(fn)
+ if os.path.exists(self.fn):
+ os.unlink(self.fn)
+ BaseTest.tearDown(self)
+
+ def assertLogFile(self, filename):
+ "Assert a log file is there and register it for deletion"
+ self.assertTrue(os.path.exists(filename),
+ msg="Log file %r does not exist")
+ self.rmfiles.append(filename)
+
+
+class RotatingFileHandlerTest(BaseFileTest):
+ def next_rec(self):
+ return logging.LogRecord('n', logging.DEBUG, 'p', 1,
+ self.next_message(), None, None, None)
+
+ def test_should_not_rollover(self):
+ # If maxbytes is zero rollover never occurs
+ rh = logging.handlers.RotatingFileHandler(self.fn, maxBytes=0)
+ self.assertFalse(rh.shouldRollover(None))
+ rh.close()
+
+ def test_should_rollover(self):
+ rh = logging.handlers.RotatingFileHandler(self.fn, maxBytes=1)
+ self.assertTrue(rh.shouldRollover(self.next_rec()))
+ rh.close()
+
+ def test_file_created(self):
+ # checks that the file is created and assumes it was created
+ # by us
+ rh = logging.handlers.RotatingFileHandler(self.fn)
+ rh.emit(self.next_rec())
+ self.assertLogFile(self.fn)
+ rh.close()
+
+ def test_rollover_filenames(self):
+ rh = logging.handlers.RotatingFileHandler(
+ self.fn, backupCount=2, maxBytes=1)
+ rh.emit(self.next_rec())
+ self.assertLogFile(self.fn)
+ rh.emit(self.next_rec())
+ self.assertLogFile(self.fn + ".1")
+ rh.emit(self.next_rec())
+ self.assertLogFile(self.fn + ".2")
+ self.assertFalse(os.path.exists(self.fn + ".3"))
+ rh.close()
+
+class TimedRotatingFileHandlerTest(BaseFileTest):
+ # test methods added below
+ pass
+
+def secs(**kw):
+ return datetime.timedelta(**kw) // datetime.timedelta(seconds=1)
+
+for when, exp in (('S', 1),
+ ('M', 60),
+ ('H', 60 * 60),
+ ('D', 60 * 60 * 24),
+ ('MIDNIGHT', 60 * 60 * 24),
+ # current time (epoch start) is a Thursday, W0 means Monday
+ ('W0', secs(days=4, hours=24)),
+ ):
+ def test_compute_rollover(self, when=when, exp=exp):
+ rh = logging.handlers.TimedRotatingFileHandler(
+ self.fn, when=when, interval=1, backupCount=0, utc=True)
+ currentTime = 0.0
+ actual = rh.computeRollover(currentTime)
+ if exp != actual:
+ # Failures occur on some systems for MIDNIGHT and W0.
+ # Print detailed calculation for MIDNIGHT so we can try to see
+ # what's going on
+ import time
+ if when == 'MIDNIGHT':
+ try:
+ if rh.utc:
+ t = time.gmtime(currentTime)
+ else:
+ t = time.localtime(currentTime)
+ currentHour = t[3]
+ currentMinute = t[4]
+ currentSecond = t[5]
+ # r is the number of seconds left between now and midnight
+ r = logging.handlers._MIDNIGHT - ((currentHour * 60 +
+ currentMinute) * 60 +
+ currentSecond)
+ result = currentTime + r
+ print('t: %s (%s)' % (t, rh.utc), file=sys.stderr)
+ print('currentHour: %s' % currentHour, file=sys.stderr)
+ print('currentMinute: %s' % currentMinute, file=sys.stderr)
+ print('currentSecond: %s' % currentSecond, file=sys.stderr)
+ print('r: %s' % r, file=sys.stderr)
+ print('result: %s' % result, file=sys.stderr)
+ except Exception:
+ print('exception in diagnostic code: %s' % sys.exc_info()[1], file=sys.stderr)
+ self.assertEqual(exp, actual)
+ rh.close()
+ setattr(TimedRotatingFileHandlerTest, "test_compute_rollover_%s" % when, test_compute_rollover)
+
# Set the locale to the platform-dependent default. I have no idea
# why the test does this, but in any case we save the current locale
# first and restore it at the end.
@run_with_locale('LC_ALL', '')
def test_main():
run_unittest(BuiltinLevelsTest, BasicFilterTest,
- CustomLevelsAndFiltersTest, MemoryHandlerTest,
- ConfigFileTest, SocketHandlerTest, MemoryTest,
- EncodingTest, WarningsTest)
+ CustomLevelsAndFiltersTest, MemoryHandlerTest,
+ ConfigFileTest, SocketHandlerTest, MemoryTest,
+ EncodingTest, WarningsTest, ConfigDictTest, ManagerTest,
+ FormatterTest,
+ LogRecordFactoryTest, ChildLoggerTest, QueueHandlerTest,
+ RotatingFileHandlerTest,
+ LastResortTest,
+ TimedRotatingFileHandlerTest
+ )
if __name__ == "__main__":
test_main()