diff options
Diffstat (limited to 'Lib/test/test_logging.py')
| -rw-r--r-- | Lib/test/test_logging.py | 1315 |
1 files changed, 1289 insertions, 26 deletions
diff --git a/Lib/test/test_logging.py b/Lib/test/test_logging.py index 9b36350..b62545c 100644 --- a/Lib/test/test_logging.py +++ b/Lib/test/test_logging.py @@ -1,6 +1,6 @@ #!/usr/bin/env python # -# Copyright 2001-2010 by Vinay Sajip. All Rights Reserved. +# Copyright 2001-2011 by Vinay Sajip. All Rights Reserved. # # Permission to use, copy, modify, and distribute this software and its # documentation for any purpose and without fee is hereby granted, @@ -18,7 +18,7 @@ """Test harness for the logging module. Run all tests. -Copyright (C) 2001-2010 Vinay Sajip. All Rights Reserved. +Copyright (C) 2001-2011 Vinay Sajip. All Rights Reserved. """ import logging @@ -26,27 +26,29 @@ import logging.handlers import logging.config import codecs -import copy +import datetime import pickle import io import gc +import json import os +import queue import re import select import socket from socketserver import ThreadingTCPServer, StreamRequestHandler -import string import struct import sys import tempfile from test.support import captured_stdout, run_with_locale, run_unittest import textwrap -import threading -import time -import types import unittest import warnings import weakref +try: + import threading +except ImportError: + threading = None class BaseTest(unittest.TestCase): @@ -65,11 +67,21 @@ class BaseTest(unittest.TestCase): try: self.saved_handlers = logging._handlers.copy() self.saved_handler_list = logging._handlerList[:] - self.saved_loggers = logger_dict.copy() + self.saved_loggers = saved_loggers = logger_dict.copy() self.saved_level_names = logging._levelNames.copy() + self.logger_states = logger_states = {} + for name in saved_loggers: + logger_states[name] = getattr(saved_loggers[name], + 'disabled', None) finally: logging._releaseLock() + # Set two unused loggers: one non-ASCII and one Unicode. + # This is to test correct operation when sorting existing + # loggers in the configuration code. See issue 8201. + self.logger1 = logging.getLogger("\xab\xd7\xbb") + self.logger2 = logging.getLogger("\u013f\u00d6\u0047") + self.root_logger = logging.getLogger("") self.original_logging_level = self.root_logger.getEffectiveLevel() @@ -78,13 +90,25 @@ class BaseTest(unittest.TestCase): self.root_hdlr = logging.StreamHandler(self.stream) self.root_formatter = logging.Formatter(self.log_format) self.root_hdlr.setFormatter(self.root_formatter) + if self.logger1.hasHandlers(): + hlist = self.logger1.handlers + self.root_logger.handlers + raise AssertionError('Unexpected handlers: %s' % hlist) + if self.logger2.hasHandlers(): + hlist = self.logger2.handlers + self.root_logger.handlers + raise AssertionError('Unexpected handlers: %s' % hlist) self.root_logger.addHandler(self.root_hdlr) + self.assertTrue(self.logger1.hasHandlers()) + self.assertTrue(self.logger2.hasHandlers()) def tearDown(self): """Remove our logging stream, and restore the original logging level.""" self.stream.close() self.root_logger.removeHandler(self.root_hdlr) + while self.root_logger.handlers: + h = self.root_logger.handlers[0] + self.root_logger.removeHandler(h) + h.close() self.root_logger.setLevel(self.original_logging_level) logging._acquireLock() try: @@ -96,6 +120,10 @@ class BaseTest(unittest.TestCase): loggerDict = logging.getLogger().manager.loggerDict loggerDict.clear() loggerDict.update(self.saved_loggers) + logger_states = self.logger_states + for name in self.logger_states: + if logger_states[name] is not None: + self.saved_loggers[name].disabled = logger_states[name] finally: logging._releaseLock() @@ -111,7 +139,8 @@ class BaseTest(unittest.TestCase): except AttributeError: # StringIO.StringIO lacks a reset() method. actual_lines = stream.getvalue().splitlines() - self.assertEqual(len(actual_lines), len(expected_values)) + self.assertEqual(len(actual_lines), len(expected_values), + '%s vs. %s' % (actual_lines, expected_values)) for actual, expected in zip(actual_lines, expected_values): match = pat.search(actual) if not match: @@ -138,7 +167,7 @@ class BuiltinLevelsTest(BaseTest): ERR = logging.getLogger("ERR") ERR.setLevel(logging.ERROR) - INF = logging.getLogger("INF") + INF = logging.LoggerAdapter(logging.getLogger("INF"), {}) INF.setLevel(logging.INFO) DEB = logging.getLogger("DEB") DEB.setLevel(logging.DEBUG) @@ -292,6 +321,35 @@ class BasicFilterTest(BaseTest): finally: handler.removeFilter(filter_) + def test_callable_filter(self): + # Only messages satisfying the specified criteria pass through the + # filter. + + def filterfunc(record): + parts = record.name.split('.') + prefix = '.'.join(parts[:2]) + return prefix == 'spam.eggs' + + handler = self.root_logger.handlers[0] + try: + handler.addFilter(filterfunc) + spam = logging.getLogger("spam") + spam_eggs = logging.getLogger("spam.eggs") + spam_eggs_fish = logging.getLogger("spam.eggs.fish") + spam_bakedbeans = logging.getLogger("spam.bakedbeans") + + spam.info(self.next_message()) + spam_eggs.info(self.next_message()) # Good. + spam_eggs_fish.info(self.next_message()) # Good. + spam_bakedbeans.info(self.next_message()) + + self.assert_log_lines([ + ('spam.eggs', 'INFO', '2'), + ('spam.eggs.fish', 'INFO', '3'), + ]) + finally: + handler.removeFilter(filterfunc) + # # First, we define our levels. There can be as many as you want - the only @@ -355,7 +413,7 @@ class CustomLevelsAndFiltersTest(BaseTest): def setUp(self): BaseTest.setUp(self) - for k, v in list(my_logging_levels.items()): + for k, v in my_logging_levels.items(): logging.addLevelName(k, v) def log_at_all_levels(self, logger): @@ -702,14 +760,8 @@ class ConfigFileTest(BaseTest): """ def apply_config(self, conf): - try: - fn = tempfile.mktemp(".ini") - f = open(fn, "w") - f.write(textwrap.dedent(conf)) - f.close() - logging.config.fileConfig(fn) - finally: - os.remove(fn) + file = io.StringIO(textwrap.dedent(conf)) + logging.config.fileConfig(file) def test_config0_ok(self): # A simple config file which overrides the default settings. @@ -876,6 +928,7 @@ class LogRecordSocketReceiver(ThreadingTCPServer): self.server_close() +@unittest.skipUnless(threading, 'Threading required for this test.') class SocketHandlerTest(BaseTest): """Test for SocketHandler objects.""" @@ -944,7 +997,7 @@ class MemoryTest(BaseTest): # Trigger cycle breaking. gc.collect() dead = [] - for (id_, repr_), ref in list(self._survivors.items()): + for (id_, repr_), ref in self._survivors.items(): if ref() is None: dead.append(repr_) if dead: @@ -979,11 +1032,12 @@ class EncodingTest(BaseTest): def test_encoding_plain_file(self): # In Python 2.x, a plain file object is treated as having no encoding. log = logging.getLogger("test") - fn = tempfile.mktemp(".log") + fd, fn = tempfile.mkstemp(".log", "test_logging-1-") + os.close(fd) # the non-ascii data we write to the log. data = "foo\x80" try: - handler = logging.FileHandler(fn, encoding="utf8") + handler = logging.FileHandler(fn, encoding="utf-8") log.addHandler(handler) try: # write non-ascii data to the log. @@ -992,7 +1046,7 @@ class EncodingTest(BaseTest): log.removeHandler(handler) handler.close() # check we wrote exactly those bytes, ignoring trailing \n etc - f = open(fn, encoding="utf8") + f = open(fn, encoding="utf-8") try: self.assertEqual(f.read().rstrip(), data) finally: @@ -1051,15 +1105,1224 @@ class WarningsTest(BaseTest): finally: logging.captureWarnings(False) + +def formatFunc(format, datefmt=None): + return logging.Formatter(format, datefmt) + +def handlerFunc(): + return logging.StreamHandler() + +class CustomHandler(logging.StreamHandler): + pass + +class ConfigDictTest(BaseTest): + + """Reading logging config from a dictionary.""" + + expected_log_pat = r"^([\w]+) \+\+ ([\w]+)$" + + # config0 is a standard configuration. + config0 = { + 'version': 1, + 'formatters': { + 'form1' : { + 'format' : '%(levelname)s ++ %(message)s', + }, + }, + 'handlers' : { + 'hand1' : { + 'class' : 'logging.StreamHandler', + 'formatter' : 'form1', + 'level' : 'NOTSET', + 'stream' : 'ext://sys.stdout', + }, + }, + 'root' : { + 'level' : 'WARNING', + 'handlers' : ['hand1'], + }, + } + + # config1 adds a little to the standard configuration. + config1 = { + 'version': 1, + 'formatters': { + 'form1' : { + 'format' : '%(levelname)s ++ %(message)s', + }, + }, + 'handlers' : { + 'hand1' : { + 'class' : 'logging.StreamHandler', + 'formatter' : 'form1', + 'level' : 'NOTSET', + 'stream' : 'ext://sys.stdout', + }, + }, + 'loggers' : { + 'compiler.parser' : { + 'level' : 'DEBUG', + 'handlers' : ['hand1'], + }, + }, + 'root' : { + 'level' : 'WARNING', + }, + } + + # config1a moves the handler to the root. Used with config8a + config1a = { + 'version': 1, + 'formatters': { + 'form1' : { + 'format' : '%(levelname)s ++ %(message)s', + }, + }, + 'handlers' : { + 'hand1' : { + 'class' : 'logging.StreamHandler', + 'formatter' : 'form1', + 'level' : 'NOTSET', + 'stream' : 'ext://sys.stdout', + }, + }, + 'loggers' : { + 'compiler.parser' : { + 'level' : 'DEBUG', + }, + }, + 'root' : { + 'level' : 'WARNING', + 'handlers' : ['hand1'], + }, + } + + # config2 has a subtle configuration error that should be reported + config2 = { + 'version': 1, + 'formatters': { + 'form1' : { + 'format' : '%(levelname)s ++ %(message)s', + }, + }, + 'handlers' : { + 'hand1' : { + 'class' : 'logging.StreamHandler', + 'formatter' : 'form1', + 'level' : 'NOTSET', + 'stream' : 'ext://sys.stdbout', + }, + }, + 'loggers' : { + 'compiler.parser' : { + 'level' : 'DEBUG', + 'handlers' : ['hand1'], + }, + }, + 'root' : { + 'level' : 'WARNING', + }, + } + + #As config1 but with a misspelt level on a handler + config2a = { + 'version': 1, + 'formatters': { + 'form1' : { + 'format' : '%(levelname)s ++ %(message)s', + }, + }, + 'handlers' : { + 'hand1' : { + 'class' : 'logging.StreamHandler', + 'formatter' : 'form1', + 'level' : 'NTOSET', + 'stream' : 'ext://sys.stdout', + }, + }, + 'loggers' : { + 'compiler.parser' : { + 'level' : 'DEBUG', + 'handlers' : ['hand1'], + }, + }, + 'root' : { + 'level' : 'WARNING', + }, + } + + + #As config1 but with a misspelt level on a logger + config2b = { + 'version': 1, + 'formatters': { + 'form1' : { + 'format' : '%(levelname)s ++ %(message)s', + }, + }, + 'handlers' : { + 'hand1' : { + 'class' : 'logging.StreamHandler', + 'formatter' : 'form1', + 'level' : 'NOTSET', + 'stream' : 'ext://sys.stdout', + }, + }, + 'loggers' : { + 'compiler.parser' : { + 'level' : 'DEBUG', + 'handlers' : ['hand1'], + }, + }, + 'root' : { + 'level' : 'WRANING', + }, + } + + # config3 has a less subtle configuration error + config3 = { + 'version': 1, + 'formatters': { + 'form1' : { + 'format' : '%(levelname)s ++ %(message)s', + }, + }, + 'handlers' : { + 'hand1' : { + 'class' : 'logging.StreamHandler', + 'formatter' : 'misspelled_name', + 'level' : 'NOTSET', + 'stream' : 'ext://sys.stdout', + }, + }, + 'loggers' : { + 'compiler.parser' : { + 'level' : 'DEBUG', + 'handlers' : ['hand1'], + }, + }, + 'root' : { + 'level' : 'WARNING', + }, + } + + # config4 specifies a custom formatter class to be loaded + config4 = { + 'version': 1, + 'formatters': { + 'form1' : { + '()' : __name__ + '.ExceptionFormatter', + 'format' : '%(levelname)s:%(name)s:%(message)s', + }, + }, + 'handlers' : { + 'hand1' : { + 'class' : 'logging.StreamHandler', + 'formatter' : 'form1', + 'level' : 'NOTSET', + 'stream' : 'ext://sys.stdout', + }, + }, + 'root' : { + 'level' : 'NOTSET', + 'handlers' : ['hand1'], + }, + } + + # As config4 but using an actual callable rather than a string + config4a = { + 'version': 1, + 'formatters': { + 'form1' : { + '()' : ExceptionFormatter, + 'format' : '%(levelname)s:%(name)s:%(message)s', + }, + 'form2' : { + '()' : __name__ + '.formatFunc', + 'format' : '%(levelname)s:%(name)s:%(message)s', + }, + 'form3' : { + '()' : formatFunc, + 'format' : '%(levelname)s:%(name)s:%(message)s', + }, + }, + 'handlers' : { + 'hand1' : { + 'class' : 'logging.StreamHandler', + 'formatter' : 'form1', + 'level' : 'NOTSET', + 'stream' : 'ext://sys.stdout', + }, + 'hand2' : { + '()' : handlerFunc, + }, + }, + 'root' : { + 'level' : 'NOTSET', + 'handlers' : ['hand1'], + }, + } + + # config5 specifies a custom handler class to be loaded + config5 = { + 'version': 1, + 'formatters': { + 'form1' : { + 'format' : '%(levelname)s ++ %(message)s', + }, + }, + 'handlers' : { + 'hand1' : { + 'class' : __name__ + '.CustomHandler', + 'formatter' : 'form1', + 'level' : 'NOTSET', + 'stream' : 'ext://sys.stdout', + }, + }, + 'loggers' : { + 'compiler.parser' : { + 'level' : 'DEBUG', + 'handlers' : ['hand1'], + }, + }, + 'root' : { + 'level' : 'WARNING', + }, + } + + # config6 specifies a custom handler class to be loaded + # but has bad arguments + config6 = { + 'version': 1, + 'formatters': { + 'form1' : { + 'format' : '%(levelname)s ++ %(message)s', + }, + }, + 'handlers' : { + 'hand1' : { + 'class' : __name__ + '.CustomHandler', + 'formatter' : 'form1', + 'level' : 'NOTSET', + 'stream' : 'ext://sys.stdout', + '9' : 'invalid parameter name', + }, + }, + 'loggers' : { + 'compiler.parser' : { + 'level' : 'DEBUG', + 'handlers' : ['hand1'], + }, + }, + 'root' : { + 'level' : 'WARNING', + }, + } + + #config 7 does not define compiler.parser but defines compiler.lexer + #so compiler.parser should be disabled after applying it + config7 = { + 'version': 1, + 'formatters': { + 'form1' : { + 'format' : '%(levelname)s ++ %(message)s', + }, + }, + 'handlers' : { + 'hand1' : { + 'class' : 'logging.StreamHandler', + 'formatter' : 'form1', + 'level' : 'NOTSET', + 'stream' : 'ext://sys.stdout', + }, + }, + 'loggers' : { + 'compiler.lexer' : { + 'level' : 'DEBUG', + 'handlers' : ['hand1'], + }, + }, + 'root' : { + 'level' : 'WARNING', + }, + } + + # config8 defines both compiler and compiler.lexer + # so compiler.parser should not be disabled (since + # compiler is defined) + config8 = { + 'version': 1, + 'disable_existing_loggers' : False, + 'formatters': { + 'form1' : { + 'format' : '%(levelname)s ++ %(message)s', + }, + }, + 'handlers' : { + 'hand1' : { + 'class' : 'logging.StreamHandler', + 'formatter' : 'form1', + 'level' : 'NOTSET', + 'stream' : 'ext://sys.stdout', + }, + }, + 'loggers' : { + 'compiler' : { + 'level' : 'DEBUG', + 'handlers' : ['hand1'], + }, + 'compiler.lexer' : { + }, + }, + 'root' : { + 'level' : 'WARNING', + }, + } + + # config8a disables existing loggers + config8a = { + 'version': 1, + 'disable_existing_loggers' : True, + 'formatters': { + 'form1' : { + 'format' : '%(levelname)s ++ %(message)s', + }, + }, + 'handlers' : { + 'hand1' : { + 'class' : 'logging.StreamHandler', + 'formatter' : 'form1', + 'level' : 'NOTSET', + 'stream' : 'ext://sys.stdout', + }, + }, + 'loggers' : { + 'compiler' : { + 'level' : 'DEBUG', + 'handlers' : ['hand1'], + }, + 'compiler.lexer' : { + }, + }, + 'root' : { + 'level' : 'WARNING', + }, + } + + config9 = { + 'version': 1, + 'formatters': { + 'form1' : { + 'format' : '%(levelname)s ++ %(message)s', + }, + }, + 'handlers' : { + 'hand1' : { + 'class' : 'logging.StreamHandler', + 'formatter' : 'form1', + 'level' : 'WARNING', + 'stream' : 'ext://sys.stdout', + }, + }, + 'loggers' : { + 'compiler.parser' : { + 'level' : 'WARNING', + 'handlers' : ['hand1'], + }, + }, + 'root' : { + 'level' : 'NOTSET', + }, + } + + config9a = { + 'version': 1, + 'incremental' : True, + 'handlers' : { + 'hand1' : { + 'level' : 'WARNING', + }, + }, + 'loggers' : { + 'compiler.parser' : { + 'level' : 'INFO', + }, + }, + } + + config9b = { + 'version': 1, + 'incremental' : True, + 'handlers' : { + 'hand1' : { + 'level' : 'INFO', + }, + }, + 'loggers' : { + 'compiler.parser' : { + 'level' : 'INFO', + }, + }, + } + + #As config1 but with a filter added + config10 = { + 'version': 1, + 'formatters': { + 'form1' : { + 'format' : '%(levelname)s ++ %(message)s', + }, + }, + 'filters' : { + 'filt1' : { + 'name' : 'compiler.parser', + }, + }, + 'handlers' : { + 'hand1' : { + 'class' : 'logging.StreamHandler', + 'formatter' : 'form1', + 'level' : 'NOTSET', + 'stream' : 'ext://sys.stdout', + 'filters' : ['filt1'], + }, + }, + 'loggers' : { + 'compiler.parser' : { + 'level' : 'DEBUG', + 'filters' : ['filt1'], + }, + }, + 'root' : { + 'level' : 'WARNING', + 'handlers' : ['hand1'], + }, + } + + #As config1 but using cfg:// references + config11 = { + 'version': 1, + 'true_formatters': { + 'form1' : { + 'format' : '%(levelname)s ++ %(message)s', + }, + }, + 'handler_configs': { + 'hand1' : { + 'class' : 'logging.StreamHandler', + 'formatter' : 'form1', + 'level' : 'NOTSET', + 'stream' : 'ext://sys.stdout', + }, + }, + 'formatters' : 'cfg://true_formatters', + 'handlers' : { + 'hand1' : 'cfg://handler_configs[hand1]', + }, + 'loggers' : { + 'compiler.parser' : { + 'level' : 'DEBUG', + 'handlers' : ['hand1'], + }, + }, + 'root' : { + 'level' : 'WARNING', + }, + } + + #As config11 but missing the version key + config12 = { + 'true_formatters': { + 'form1' : { + 'format' : '%(levelname)s ++ %(message)s', + }, + }, + 'handler_configs': { + 'hand1' : { + 'class' : 'logging.StreamHandler', + 'formatter' : 'form1', + 'level' : 'NOTSET', + 'stream' : 'ext://sys.stdout', + }, + }, + 'formatters' : 'cfg://true_formatters', + 'handlers' : { + 'hand1' : 'cfg://handler_configs[hand1]', + }, + 'loggers' : { + 'compiler.parser' : { + 'level' : 'DEBUG', + 'handlers' : ['hand1'], + }, + }, + 'root' : { + 'level' : 'WARNING', + }, + } + + #As config11 but using an unsupported version + config13 = { + 'version': 2, + 'true_formatters': { + 'form1' : { + 'format' : '%(levelname)s ++ %(message)s', + }, + }, + 'handler_configs': { + 'hand1' : { + 'class' : 'logging.StreamHandler', + 'formatter' : 'form1', + 'level' : 'NOTSET', + 'stream' : 'ext://sys.stdout', + }, + }, + 'formatters' : 'cfg://true_formatters', + 'handlers' : { + 'hand1' : 'cfg://handler_configs[hand1]', + }, + 'loggers' : { + 'compiler.parser' : { + 'level' : 'DEBUG', + 'handlers' : ['hand1'], + }, + }, + 'root' : { + 'level' : 'WARNING', + }, + } + + def apply_config(self, conf): + logging.config.dictConfig(conf) + + def test_config0_ok(self): + # A simple config which overrides the default settings. + with captured_stdout() as output: + self.apply_config(self.config0) + logger = logging.getLogger() + # Won't output anything + logger.info(self.next_message()) + # Outputs a message + logger.error(self.next_message()) + self.assert_log_lines([ + ('ERROR', '2'), + ], stream=output) + # Original logger output is empty. + self.assert_log_lines([]) + + def test_config1_ok(self, config=config1): + # A config defining a sub-parser as well. + with captured_stdout() as output: + self.apply_config(config) + logger = logging.getLogger("compiler.parser") + # Both will output a message + logger.info(self.next_message()) + logger.error(self.next_message()) + self.assert_log_lines([ + ('INFO', '1'), + ('ERROR', '2'), + ], stream=output) + # Original logger output is empty. + self.assert_log_lines([]) + + def test_config2_failure(self): + # A simple config which overrides the default settings. + self.assertRaises(Exception, self.apply_config, self.config2) + + def test_config2a_failure(self): + # A simple config which overrides the default settings. + self.assertRaises(Exception, self.apply_config, self.config2a) + + def test_config2b_failure(self): + # A simple config which overrides the default settings. + self.assertRaises(Exception, self.apply_config, self.config2b) + + def test_config3_failure(self): + # A simple config which overrides the default settings. + self.assertRaises(Exception, self.apply_config, self.config3) + + def test_config4_ok(self): + # A config specifying a custom formatter class. + with captured_stdout() as output: + self.apply_config(self.config4) + #logger = logging.getLogger() + try: + raise RuntimeError() + except RuntimeError: + logging.exception("just testing") + sys.stdout.seek(0) + self.assertEqual(output.getvalue(), + "ERROR:root:just testing\nGot a [RuntimeError]\n") + # Original logger output is empty + self.assert_log_lines([]) + + def test_config4a_ok(self): + # A config specifying a custom formatter class. + with captured_stdout() as output: + self.apply_config(self.config4a) + #logger = logging.getLogger() + try: + raise RuntimeError() + except RuntimeError: + logging.exception("just testing") + sys.stdout.seek(0) + self.assertEqual(output.getvalue(), + "ERROR:root:just testing\nGot a [RuntimeError]\n") + # Original logger output is empty + self.assert_log_lines([]) + + def test_config5_ok(self): + self.test_config1_ok(config=self.config5) + + def test_config6_failure(self): + self.assertRaises(Exception, self.apply_config, self.config6) + + def test_config7_ok(self): + with captured_stdout() as output: + self.apply_config(self.config1) + logger = logging.getLogger("compiler.parser") + # Both will output a message + logger.info(self.next_message()) + logger.error(self.next_message()) + self.assert_log_lines([ + ('INFO', '1'), + ('ERROR', '2'), + ], stream=output) + # Original logger output is empty. + self.assert_log_lines([]) + with captured_stdout() as output: + self.apply_config(self.config7) + logger = logging.getLogger("compiler.parser") + self.assertTrue(logger.disabled) + logger = logging.getLogger("compiler.lexer") + # Both will output a message + logger.info(self.next_message()) + logger.error(self.next_message()) + self.assert_log_lines([ + ('INFO', '3'), + ('ERROR', '4'), + ], stream=output) + # Original logger output is empty. + self.assert_log_lines([]) + + #Same as test_config_7_ok but don't disable old loggers. + def test_config_8_ok(self): + with captured_stdout() as output: + self.apply_config(self.config1) + logger = logging.getLogger("compiler.parser") + # All will output a message + logger.info(self.next_message()) + logger.error(self.next_message()) + self.assert_log_lines([ + ('INFO', '1'), + ('ERROR', '2'), + ], stream=output) + # Original logger output is empty. + self.assert_log_lines([]) + with captured_stdout() as output: + self.apply_config(self.config8) + logger = logging.getLogger("compiler.parser") + self.assertFalse(logger.disabled) + # Both will output a message + logger.info(self.next_message()) + logger.error(self.next_message()) + logger = logging.getLogger("compiler.lexer") + # Both will output a message + logger.info(self.next_message()) + logger.error(self.next_message()) + self.assert_log_lines([ + ('INFO', '3'), + ('ERROR', '4'), + ('INFO', '5'), + ('ERROR', '6'), + ], stream=output) + # Original logger output is empty. + self.assert_log_lines([]) + + def test_config_8a_ok(self): + with captured_stdout() as output: + self.apply_config(self.config1a) + logger = logging.getLogger("compiler.parser") + # See issue #11424. compiler-hyphenated sorts + # between compiler and compiler.xyz and this + # was preventing compiler.xyz from being included + # in the child loggers of compiler because of an + # overzealous loop termination condition. + hyphenated = logging.getLogger('compiler-hyphenated') + # All will output a message + logger.info(self.next_message()) + logger.error(self.next_message()) + hyphenated.critical(self.next_message()) + self.assert_log_lines([ + ('INFO', '1'), + ('ERROR', '2'), + ('CRITICAL', '3'), + ], stream=output) + # Original logger output is empty. + self.assert_log_lines([]) + with captured_stdout() as output: + self.apply_config(self.config8a) + logger = logging.getLogger("compiler.parser") + self.assertFalse(logger.disabled) + # Both will output a message + logger.info(self.next_message()) + logger.error(self.next_message()) + logger = logging.getLogger("compiler.lexer") + # Both will output a message + logger.info(self.next_message()) + logger.error(self.next_message()) + # Will not appear + hyphenated.critical(self.next_message()) + self.assert_log_lines([ + ('INFO', '4'), + ('ERROR', '5'), + ('INFO', '6'), + ('ERROR', '7'), + ], stream=output) + # Original logger output is empty. + self.assert_log_lines([]) + + def test_config_9_ok(self): + with captured_stdout() as output: + self.apply_config(self.config9) + logger = logging.getLogger("compiler.parser") + #Nothing will be output since both handler and logger are set to WARNING + logger.info(self.next_message()) + self.assert_log_lines([], stream=output) + self.apply_config(self.config9a) + #Nothing will be output since both handler is still set to WARNING + logger.info(self.next_message()) + self.assert_log_lines([], stream=output) + self.apply_config(self.config9b) + #Message should now be output + logger.info(self.next_message()) + self.assert_log_lines([ + ('INFO', '3'), + ], stream=output) + + def test_config_10_ok(self): + with captured_stdout() as output: + self.apply_config(self.config10) + logger = logging.getLogger("compiler.parser") + logger.warning(self.next_message()) + logger = logging.getLogger('compiler') + #Not output, because filtered + logger.warning(self.next_message()) + logger = logging.getLogger('compiler.lexer') + #Not output, because filtered + logger.warning(self.next_message()) + logger = logging.getLogger("compiler.parser.codegen") + #Output, as not filtered + logger.error(self.next_message()) + self.assert_log_lines([ + ('WARNING', '1'), + ('ERROR', '4'), + ], stream=output) + + def test_config11_ok(self): + self.test_config1_ok(self.config11) + + def test_config12_failure(self): + self.assertRaises(Exception, self.apply_config, self.config12) + + def test_config13_failure(self): + self.assertRaises(Exception, self.apply_config, self.config13) + + @unittest.skipUnless(threading, 'listen() needs threading to work') + def setup_via_listener(self, text): + text = text.encode("utf-8") + # Ask for a randomly assigned port (by using port 0) + t = logging.config.listen(0) + t.start() + t.ready.wait() + # Now get the port allocated + port = t.port + t.ready.clear() + try: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.settimeout(2.0) + sock.connect(('localhost', port)) + + slen = struct.pack('>L', len(text)) + s = slen + text + sentsofar = 0 + left = len(s) + while left > 0: + sent = sock.send(s[sentsofar:]) + sentsofar += sent + left -= sent + sock.close() + finally: + t.ready.wait(2.0) + logging.config.stopListening() + t.join(2.0) + + def test_listen_config_10_ok(self): + with captured_stdout() as output: + self.setup_via_listener(json.dumps(self.config10)) + logger = logging.getLogger("compiler.parser") + logger.warning(self.next_message()) + logger = logging.getLogger('compiler') + #Not output, because filtered + logger.warning(self.next_message()) + logger = logging.getLogger('compiler.lexer') + #Not output, because filtered + logger.warning(self.next_message()) + logger = logging.getLogger("compiler.parser.codegen") + #Output, as not filtered + logger.error(self.next_message()) + self.assert_log_lines([ + ('WARNING', '1'), + ('ERROR', '4'), + ], stream=output) + + def test_listen_config_1_ok(self): + with captured_stdout() as output: + self.setup_via_listener(textwrap.dedent(ConfigFileTest.config1)) + logger = logging.getLogger("compiler.parser") + # Both will output a message + logger.info(self.next_message()) + logger.error(self.next_message()) + self.assert_log_lines([ + ('INFO', '1'), + ('ERROR', '2'), + ], stream=output) + # Original logger output is empty. + self.assert_log_lines([]) + + +class ManagerTest(BaseTest): + def test_manager_loggerclass(self): + logged = [] + + class MyLogger(logging.Logger): + def _log(self, level, msg, args, exc_info=None, extra=None): + logged.append(msg) + + man = logging.Manager(None) + self.assertRaises(TypeError, man.setLoggerClass, int) + man.setLoggerClass(MyLogger) + logger = man.getLogger('test') + logger.warning('should appear in logged') + logging.warning('should not appear in logged') + + self.assertEqual(logged, ['should appear in logged']) + + +class ChildLoggerTest(BaseTest): + def test_child_loggers(self): + r = logging.getLogger() + l1 = logging.getLogger('abc') + l2 = logging.getLogger('def.ghi') + c1 = r.getChild('xyz') + c2 = r.getChild('uvw.xyz') + self.assertTrue(c1 is logging.getLogger('xyz')) + self.assertTrue(c2 is logging.getLogger('uvw.xyz')) + c1 = l1.getChild('def') + c2 = c1.getChild('ghi') + c3 = l1.getChild('def.ghi') + self.assertTrue(c1 is logging.getLogger('abc.def')) + self.assertTrue(c2 is logging.getLogger('abc.def.ghi')) + self.assertTrue(c2 is c3) + + +class DerivedLogRecord(logging.LogRecord): + pass + +class LogRecordFactoryTest(BaseTest): + + def setUp(self): + class CheckingFilter(logging.Filter): + def __init__(self, cls): + self.cls = cls + + def filter(self, record): + t = type(record) + if t is not self.cls: + msg = 'Unexpected LogRecord type %s, expected %s' % (t, + self.cls) + raise TypeError(msg) + return True + + BaseTest.setUp(self) + self.filter = CheckingFilter(DerivedLogRecord) + self.root_logger.addFilter(self.filter) + self.orig_factory = logging.getLogRecordFactory() + + def tearDown(self): + self.root_logger.removeFilter(self.filter) + BaseTest.tearDown(self) + logging.setLogRecordFactory(self.orig_factory) + + def test_logrecord_class(self): + self.assertRaises(TypeError, self.root_logger.warning, + self.next_message()) + logging.setLogRecordFactory(DerivedLogRecord) + self.root_logger.error(self.next_message()) + self.assert_log_lines([ + ('root', 'ERROR', '2'), + ]) + + +class QueueHandlerTest(BaseTest): + # Do not bother with a logger name group. + expected_log_pat = r"^[\w.]+ -> ([\w]+): ([\d]+)$" + + def setUp(self): + BaseTest.setUp(self) + self.queue = queue.Queue(-1) + self.que_hdlr = logging.handlers.QueueHandler(self.queue) + self.que_logger = logging.getLogger('que') + self.que_logger.propagate = False + self.que_logger.setLevel(logging.WARNING) + self.que_logger.addHandler(self.que_hdlr) + + def tearDown(self): + self.que_hdlr.close() + BaseTest.tearDown(self) + + def test_queue_handler(self): + self.que_logger.debug(self.next_message()) + self.assertRaises(queue.Empty, self.queue.get_nowait) + self.que_logger.info(self.next_message()) + self.assertRaises(queue.Empty, self.queue.get_nowait) + msg = self.next_message() + self.que_logger.warning(msg) + data = self.queue.get_nowait() + self.assertTrue(isinstance(data, logging.LogRecord)) + self.assertEqual(data.name, self.que_logger.name) + self.assertEqual((data.msg, data.args), (msg, None)) + +class FormatterTest(unittest.TestCase): + def setUp(self): + self.common = { + 'name': 'formatter.test', + 'level': logging.DEBUG, + 'pathname': os.path.join('path', 'to', 'dummy.ext'), + 'lineno': 42, + 'exc_info': None, + 'func': None, + 'msg': 'Message with %d %s', + 'args': (2, 'placeholders'), + } + self.variants = { + } + + def get_record(self, name=None): + result = dict(self.common) + if name is not None: + result.update(self.variants[name]) + return logging.makeLogRecord(result) + + def test_percent(self): + # Test %-formatting + r = self.get_record() + f = logging.Formatter('${%(message)s}') + self.assertEqual(f.format(r), '${Message with 2 placeholders}') + f = logging.Formatter('%(random)s') + self.assertRaises(KeyError, f.format, r) + self.assertFalse(f.usesTime()) + f = logging.Formatter('%(asctime)s') + self.assertTrue(f.usesTime()) + f = logging.Formatter('%(asctime)-15s') + self.assertTrue(f.usesTime()) + f = logging.Formatter('asctime') + self.assertFalse(f.usesTime()) + + def test_braces(self): + # Test {}-formatting + r = self.get_record() + f = logging.Formatter('$%{message}%$', style='{') + self.assertEqual(f.format(r), '$%Message with 2 placeholders%$') + f = logging.Formatter('{random}', style='{') + self.assertRaises(KeyError, f.format, r) + self.assertFalse(f.usesTime()) + f = logging.Formatter('{asctime}', style='{') + self.assertTrue(f.usesTime()) + f = logging.Formatter('{asctime!s:15}', style='{') + self.assertTrue(f.usesTime()) + f = logging.Formatter('{asctime:15}', style='{') + self.assertTrue(f.usesTime()) + f = logging.Formatter('asctime', style='{') + self.assertFalse(f.usesTime()) + + def test_dollars(self): + # Test $-formatting + r = self.get_record() + f = logging.Formatter('$message', style='$') + self.assertEqual(f.format(r), 'Message with 2 placeholders') + f = logging.Formatter('$$%${message}%$$', style='$') + self.assertEqual(f.format(r), '$%Message with 2 placeholders%$') + f = logging.Formatter('${random}', style='$') + self.assertRaises(KeyError, f.format, r) + self.assertFalse(f.usesTime()) + f = logging.Formatter('${asctime}', style='$') + self.assertTrue(f.usesTime()) + f = logging.Formatter('${asctime', style='$') + self.assertFalse(f.usesTime()) + f = logging.Formatter('$asctime', style='$') + self.assertTrue(f.usesTime()) + f = logging.Formatter('asctime', style='$') + self.assertFalse(f.usesTime()) + +class LastResortTest(BaseTest): + def test_last_resort(self): + # Test the last resort handler + root = self.root_logger + root.removeHandler(self.root_hdlr) + old_stderr = sys.stderr + old_lastresort = logging.lastResort + old_raise_exceptions = logging.raiseExceptions + try: + sys.stderr = sio = io.StringIO() + root.warning('This is your final chance!') + self.assertEqual(sio.getvalue(), 'This is your final chance!\n') + #No handlers and no last resort, so 'No handlers' message + logging.lastResort = None + sys.stderr = sio = io.StringIO() + root.warning('This is your final chance!') + self.assertEqual(sio.getvalue(), 'No handlers could be found for logger "root"\n') + # 'No handlers' message only printed once + sys.stderr = sio = io.StringIO() + root.warning('This is your final chance!') + self.assertEqual(sio.getvalue(), '') + root.manager.emittedNoHandlerWarning = False + #If raiseExceptions is False, no message is printed + logging.raiseExceptions = False + sys.stderr = sio = io.StringIO() + root.warning('This is your final chance!') + self.assertEqual(sio.getvalue(), '') + finally: + sys.stderr = old_stderr + root.addHandler(self.root_hdlr) + logging.lastResort = old_lastresort + logging.raiseExceptions = old_raise_exceptions + + +class BaseFileTest(BaseTest): + "Base class for handler tests that write log files" + + def setUp(self): + BaseTest.setUp(self) + fd, self.fn = tempfile.mkstemp(".log", "test_logging-2-") + os.close(fd) + self.rmfiles = [] + + def tearDown(self): + for fn in self.rmfiles: + os.unlink(fn) + if os.path.exists(self.fn): + os.unlink(self.fn) + BaseTest.tearDown(self) + + def assertLogFile(self, filename): + "Assert a log file is there and register it for deletion" + self.assertTrue(os.path.exists(filename), + msg="Log file %r does not exist") + self.rmfiles.append(filename) + + +class RotatingFileHandlerTest(BaseFileTest): + def next_rec(self): + return logging.LogRecord('n', logging.DEBUG, 'p', 1, + self.next_message(), None, None, None) + + def test_should_not_rollover(self): + # If maxbytes is zero rollover never occurs + rh = logging.handlers.RotatingFileHandler(self.fn, maxBytes=0) + self.assertFalse(rh.shouldRollover(None)) + rh.close() + + def test_should_rollover(self): + rh = logging.handlers.RotatingFileHandler(self.fn, maxBytes=1) + self.assertTrue(rh.shouldRollover(self.next_rec())) + rh.close() + + def test_file_created(self): + # checks that the file is created and assumes it was created + # by us + rh = logging.handlers.RotatingFileHandler(self.fn) + rh.emit(self.next_rec()) + self.assertLogFile(self.fn) + rh.close() + + def test_rollover_filenames(self): + rh = logging.handlers.RotatingFileHandler( + self.fn, backupCount=2, maxBytes=1) + rh.emit(self.next_rec()) + self.assertLogFile(self.fn) + rh.emit(self.next_rec()) + self.assertLogFile(self.fn + ".1") + rh.emit(self.next_rec()) + self.assertLogFile(self.fn + ".2") + self.assertFalse(os.path.exists(self.fn + ".3")) + rh.close() + +class TimedRotatingFileHandlerTest(BaseFileTest): + # test methods added below + pass + +def secs(**kw): + return datetime.timedelta(**kw) // datetime.timedelta(seconds=1) + +for when, exp in (('S', 1), + ('M', 60), + ('H', 60 * 60), + ('D', 60 * 60 * 24), + ('MIDNIGHT', 60 * 60 * 24), + # current time (epoch start) is a Thursday, W0 means Monday + ('W0', secs(days=4, hours=24)), + ): + def test_compute_rollover(self, when=when, exp=exp): + rh = logging.handlers.TimedRotatingFileHandler( + self.fn, when=when, interval=1, backupCount=0, utc=True) + currentTime = 0.0 + actual = rh.computeRollover(currentTime) + if exp != actual: + # Failures occur on some systems for MIDNIGHT and W0. + # Print detailed calculation for MIDNIGHT so we can try to see + # what's going on + import time + if when == 'MIDNIGHT': + try: + if rh.utc: + t = time.gmtime(currentTime) + else: + t = time.localtime(currentTime) + currentHour = t[3] + currentMinute = t[4] + currentSecond = t[5] + # r is the number of seconds left between now and midnight + r = logging.handlers._MIDNIGHT - ((currentHour * 60 + + currentMinute) * 60 + + currentSecond) + result = currentTime + r + print('t: %s (%s)' % (t, rh.utc), file=sys.stderr) + print('currentHour: %s' % currentHour, file=sys.stderr) + print('currentMinute: %s' % currentMinute, file=sys.stderr) + print('currentSecond: %s' % currentSecond, file=sys.stderr) + print('r: %s' % r, file=sys.stderr) + print('result: %s' % result, file=sys.stderr) + except Exception: + print('exception in diagnostic code: %s' % sys.exc_info()[1], file=sys.stderr) + self.assertEqual(exp, actual) + rh.close() + setattr(TimedRotatingFileHandlerTest, "test_compute_rollover_%s" % when, test_compute_rollover) + # Set the locale to the platform-dependent default. I have no idea # why the test does this, but in any case we save the current locale # first and restore it at the end. @run_with_locale('LC_ALL', '') def test_main(): run_unittest(BuiltinLevelsTest, BasicFilterTest, - CustomLevelsAndFiltersTest, MemoryHandlerTest, - ConfigFileTest, SocketHandlerTest, MemoryTest, - EncodingTest, WarningsTest) + CustomLevelsAndFiltersTest, MemoryHandlerTest, + ConfigFileTest, SocketHandlerTest, MemoryTest, + EncodingTest, WarningsTest, ConfigDictTest, ManagerTest, + FormatterTest, + LogRecordFactoryTest, ChildLoggerTest, QueueHandlerTest, + RotatingFileHandlerTest, + LastResortTest, + TimedRotatingFileHandlerTest + ) if __name__ == "__main__": test_main() |
