diff options
Diffstat (limited to 'Lib/test/test_logging.py')
| -rw-r--r-- | Lib/test/test_logging.py | 1212 | 
1 files changed, 1186 insertions, 26 deletions
diff --git a/Lib/test/test_logging.py b/Lib/test/test_logging.py index 05a7f9e..b29d400 100644 --- a/Lib/test/test_logging.py +++ b/Lib/test/test_logging.py @@ -1,6 +1,6 @@  #!/usr/bin/env python  # -# Copyright 2001-2010 by Vinay Sajip. All Rights Reserved. +# Copyright 2001-2011 by Vinay Sajip. All Rights Reserved.  #  # Permission to use, copy, modify, and distribute this software and its  # documentation for any purpose and without fee is hereby granted, @@ -18,7 +18,7 @@  """Test harness for the logging module. Run all tests. -Copyright (C) 2001-2010 Vinay Sajip. All Rights Reserved. +Copyright (C) 2001-2011 Vinay Sajip. All Rights Reserved.  """  import logging @@ -26,27 +26,29 @@ import logging.handlers  import logging.config  import codecs -import copy +import datetime  import pickle  import io  import gc +import json  import os +import queue  import re  import select  import socket  from socketserver import ThreadingTCPServer, StreamRequestHandler -import string  import struct  import sys  import tempfile  from test.support import captured_stdout, run_with_locale, run_unittest  import textwrap -import threading -import time -import types  import unittest  import warnings  import weakref +try: +    import threading +except ImportError: +    threading = None  class BaseTest(unittest.TestCase): @@ -65,11 +67,21 @@ class BaseTest(unittest.TestCase):          try:              self.saved_handlers = logging._handlers.copy()              self.saved_handler_list = logging._handlerList[:] -            self.saved_loggers = logger_dict.copy() +            self.saved_loggers = saved_loggers = logger_dict.copy()              self.saved_level_names = logging._levelNames.copy() +            self.logger_states = logger_states = {} +            for name in saved_loggers: +                logger_states[name] = getattr(saved_loggers[name], +                                              'disabled', None)          finally:              logging._releaseLock() +        # Set two unused loggers: one non-ASCII and one Unicode. +        # This is to test correct operation when sorting existing +        # loggers in the configuration code. See issue 8201. +        self.logger1 = logging.getLogger("\xab\xd7\xbb") +        self.logger2 = logging.getLogger("\u013f\u00d6\u0047") +          self.root_logger = logging.getLogger("")          self.original_logging_level = self.root_logger.getEffectiveLevel() @@ -78,13 +90,25 @@ class BaseTest(unittest.TestCase):          self.root_hdlr = logging.StreamHandler(self.stream)          self.root_formatter = logging.Formatter(self.log_format)          self.root_hdlr.setFormatter(self.root_formatter) +        if self.logger1.hasHandlers(): +            hlist = self.logger1.handlers + self.root_logger.handlers +            raise AssertionError('Unexpected handlers: %s' % hlist) +        if self.logger2.hasHandlers(): +            hlist = self.logger2.handlers + self.root_logger.handlers +            raise AssertionError('Unexpected handlers: %s' % hlist)          self.root_logger.addHandler(self.root_hdlr) +        self.assertTrue(self.logger1.hasHandlers()) +        self.assertTrue(self.logger2.hasHandlers())      def tearDown(self):          """Remove our logging stream, and restore the original logging          level."""          self.stream.close()          self.root_logger.removeHandler(self.root_hdlr) +        while self.root_logger.handlers: +            h = self.root_logger.handlers[0] +            self.root_logger.removeHandler(h) +            h.close()          self.root_logger.setLevel(self.original_logging_level)          logging._acquireLock()          try: @@ -96,6 +120,10 @@ class BaseTest(unittest.TestCase):              loggerDict = logging.getLogger().manager.loggerDict              loggerDict.clear()              loggerDict.update(self.saved_loggers) +            logger_states = self.logger_states +            for name in self.logger_states: +                if logger_states[name] is not None: +                    self.saved_loggers[name].disabled = logger_states[name]          finally:              logging._releaseLock() @@ -111,7 +139,8 @@ class BaseTest(unittest.TestCase):          except AttributeError:              # StringIO.StringIO lacks a reset() method.              actual_lines = stream.getvalue().splitlines() -        self.assertEqual(len(actual_lines), len(expected_values)) +        self.assertEqual(len(actual_lines), len(expected_values), +                          '%s vs. %s' % (actual_lines, expected_values))          for actual, expected in zip(actual_lines, expected_values):              match = pat.search(actual)              if not match: @@ -138,7 +167,7 @@ class BuiltinLevelsTest(BaseTest):          ERR = logging.getLogger("ERR")          ERR.setLevel(logging.ERROR) -        INF = logging.getLogger("INF") +        INF = logging.LoggerAdapter(logging.getLogger("INF"), {})          INF.setLevel(logging.INFO)          DEB = logging.getLogger("DEB")          DEB.setLevel(logging.DEBUG) @@ -292,6 +321,35 @@ class BasicFilterTest(BaseTest):          finally:              handler.removeFilter(filter_) +    def test_callable_filter(self): +        # Only messages satisfying the specified criteria pass through the +        #  filter. + +        def filterfunc(record): +            parts = record.name.split('.') +            prefix = '.'.join(parts[:2]) +            return prefix == 'spam.eggs' + +        handler = self.root_logger.handlers[0] +        try: +            handler.addFilter(filterfunc) +            spam = logging.getLogger("spam") +            spam_eggs = logging.getLogger("spam.eggs") +            spam_eggs_fish = logging.getLogger("spam.eggs.fish") +            spam_bakedbeans = logging.getLogger("spam.bakedbeans") + +            spam.info(self.next_message()) +            spam_eggs.info(self.next_message())  # Good. +            spam_eggs_fish.info(self.next_message())  # Good. +            spam_bakedbeans.info(self.next_message()) + +            self.assert_log_lines([ +                ('spam.eggs', 'INFO', '2'), +                ('spam.eggs.fish', 'INFO', '3'), +            ]) +        finally: +            handler.removeFilter(filterfunc) +  #  #   First, we define our levels. There can be as many as you want - the only @@ -355,7 +413,7 @@ class CustomLevelsAndFiltersTest(BaseTest):      def setUp(self):          BaseTest.setUp(self) -        for k, v in list(my_logging_levels.items()): +        for k, v in my_logging_levels.items():              logging.addLevelName(k, v)      def log_at_all_levels(self, logger): @@ -632,14 +690,8 @@ class ConfigFileTest(BaseTest):      """      def apply_config(self, conf): -        try: -            fn = tempfile.mktemp(".ini") -            f = open(fn, "w") -            f.write(textwrap.dedent(conf)) -            f.close() -            logging.config.fileConfig(fn) -        finally: -            os.remove(fn) +        file = io.StringIO(textwrap.dedent(conf)) +        logging.config.fileConfig(file)      def test_config0_ok(self):          # A simple config file which overrides the default settings. @@ -763,6 +815,7 @@ class LogRecordSocketReceiver(ThreadingTCPServer):          self.server_close() +@unittest.skipUnless(threading, 'Threading required for this test.')  class SocketHandlerTest(BaseTest):      """Test for SocketHandler objects.""" @@ -831,7 +884,7 @@ class MemoryTest(BaseTest):          # Trigger cycle breaking.          gc.collect()          dead = [] -        for (id_, repr_), ref in list(self._survivors.items()): +        for (id_, repr_), ref in self._survivors.items():              if ref() is None:                  dead.append(repr_)          if dead: @@ -866,11 +919,12 @@ class EncodingTest(BaseTest):      def test_encoding_plain_file(self):          # In Python 2.x, a plain file object is treated as having no encoding.          log = logging.getLogger("test") -        fn = tempfile.mktemp(".log") +        fd, fn = tempfile.mkstemp(".log", "test_logging-1-") +        os.close(fd)          # the non-ascii data we write to the log.          data = "foo\x80"          try: -            handler = logging.FileHandler(fn, encoding="utf8") +            handler = logging.FileHandler(fn, encoding="utf-8")              log.addHandler(handler)              try:                  # write non-ascii data to the log. @@ -879,7 +933,7 @@ class EncodingTest(BaseTest):                  log.removeHandler(handler)                  handler.close()              # check we wrote exactly those bytes, ignoring trailing \n etc -            f = open(fn, encoding="utf8") +            f = open(fn, encoding="utf-8")              try:                  self.assertEqual(f.read().rstrip(), data)              finally: @@ -938,15 +992,1121 @@ class WarningsTest(BaseTest):              finally:                  logging.captureWarnings(False) + +def formatFunc(format, datefmt=None): +    return logging.Formatter(format, datefmt) + +def handlerFunc(): +    return logging.StreamHandler() + +class CustomHandler(logging.StreamHandler): +    pass + +class ConfigDictTest(BaseTest): + +    """Reading logging config from a dictionary.""" + +    expected_log_pat = r"^([\w]+) \+\+ ([\w]+)$" + +    # config0 is a standard configuration. +    config0 = { +        'version': 1, +        'formatters': { +            'form1' : { +                'format' : '%(levelname)s ++ %(message)s', +            }, +        }, +        'handlers' : { +            'hand1' : { +                'class' : 'logging.StreamHandler', +                'formatter' : 'form1', +                'level' : 'NOTSET', +                'stream'  : 'ext://sys.stdout', +            }, +        }, +        'root' : { +            'level' : 'WARNING', +            'handlers' : ['hand1'], +        }, +    } + +    # config1 adds a little to the standard configuration. +    config1 = { +        'version': 1, +        'formatters': { +            'form1' : { +                'format' : '%(levelname)s ++ %(message)s', +            }, +        }, +        'handlers' : { +            'hand1' : { +                'class' : 'logging.StreamHandler', +                'formatter' : 'form1', +                'level' : 'NOTSET', +                'stream'  : 'ext://sys.stdout', +            }, +        }, +        'loggers' : { +            'compiler.parser' : { +                'level' : 'DEBUG', +                'handlers' : ['hand1'], +            }, +        }, +        'root' : { +            'level' : 'WARNING', +        }, +    } + +    # config2 has a subtle configuration error that should be reported +    config2 = { +        'version': 1, +        'formatters': { +            'form1' : { +                'format' : '%(levelname)s ++ %(message)s', +            }, +        }, +        'handlers' : { +            'hand1' : { +                'class' : 'logging.StreamHandler', +                'formatter' : 'form1', +                'level' : 'NOTSET', +                'stream'  : 'ext://sys.stdbout', +            }, +        }, +        'loggers' : { +            'compiler.parser' : { +                'level' : 'DEBUG', +                'handlers' : ['hand1'], +            }, +        }, +        'root' : { +            'level' : 'WARNING', +        }, +    } + +    #As config1 but with a misspelt level on a handler +    config2a = { +        'version': 1, +        'formatters': { +            'form1' : { +                'format' : '%(levelname)s ++ %(message)s', +            }, +        }, +        'handlers' : { +            'hand1' : { +                'class' : 'logging.StreamHandler', +                'formatter' : 'form1', +                'level' : 'NTOSET', +                'stream'  : 'ext://sys.stdout', +            }, +        }, +        'loggers' : { +            'compiler.parser' : { +                'level' : 'DEBUG', +                'handlers' : ['hand1'], +            }, +        }, +        'root' : { +            'level' : 'WARNING', +        }, +    } + + +    #As config1 but with a misspelt level on a logger +    config2b = { +        'version': 1, +        'formatters': { +            'form1' : { +                'format' : '%(levelname)s ++ %(message)s', +            }, +        }, +        'handlers' : { +            'hand1' : { +                'class' : 'logging.StreamHandler', +                'formatter' : 'form1', +                'level' : 'NOTSET', +                'stream'  : 'ext://sys.stdout', +            }, +        }, +        'loggers' : { +            'compiler.parser' : { +                'level' : 'DEBUG', +                'handlers' : ['hand1'], +            }, +        }, +        'root' : { +            'level' : 'WRANING', +        }, +    } + +    # config3 has a less subtle configuration error +    config3 = { +        'version': 1, +        'formatters': { +            'form1' : { +                'format' : '%(levelname)s ++ %(message)s', +            }, +        }, +        'handlers' : { +            'hand1' : { +                'class' : 'logging.StreamHandler', +                'formatter' : 'misspelled_name', +                'level' : 'NOTSET', +                'stream'  : 'ext://sys.stdout', +            }, +        }, +        'loggers' : { +            'compiler.parser' : { +                'level' : 'DEBUG', +                'handlers' : ['hand1'], +            }, +        }, +        'root' : { +            'level' : 'WARNING', +        }, +    } + +    # config4 specifies a custom formatter class to be loaded +    config4 = { +        'version': 1, +        'formatters': { +            'form1' : { +                '()' : __name__ + '.ExceptionFormatter', +                'format' : '%(levelname)s:%(name)s:%(message)s', +            }, +        }, +        'handlers' : { +            'hand1' : { +                'class' : 'logging.StreamHandler', +                'formatter' : 'form1', +                'level' : 'NOTSET', +                'stream'  : 'ext://sys.stdout', +            }, +        }, +        'root' : { +            'level' : 'NOTSET', +                'handlers' : ['hand1'], +        }, +    } + +    # As config4 but using an actual callable rather than a string +    config4a = { +        'version': 1, +        'formatters': { +            'form1' : { +                '()' : ExceptionFormatter, +                'format' : '%(levelname)s:%(name)s:%(message)s', +            }, +            'form2' : { +                '()' : __name__ + '.formatFunc', +                'format' : '%(levelname)s:%(name)s:%(message)s', +            }, +            'form3' : { +                '()' : formatFunc, +                'format' : '%(levelname)s:%(name)s:%(message)s', +            }, +        }, +        'handlers' : { +            'hand1' : { +                'class' : 'logging.StreamHandler', +                'formatter' : 'form1', +                'level' : 'NOTSET', +                'stream'  : 'ext://sys.stdout', +            }, +            'hand2' : { +                '()' : handlerFunc, +            }, +        }, +        'root' : { +            'level' : 'NOTSET', +                'handlers' : ['hand1'], +        }, +    } + +    # config5 specifies a custom handler class to be loaded +    config5 = { +        'version': 1, +        'formatters': { +            'form1' : { +                'format' : '%(levelname)s ++ %(message)s', +            }, +        }, +        'handlers' : { +            'hand1' : { +                'class' : __name__ + '.CustomHandler', +                'formatter' : 'form1', +                'level' : 'NOTSET', +                'stream'  : 'ext://sys.stdout', +            }, +        }, +        'loggers' : { +            'compiler.parser' : { +                'level' : 'DEBUG', +                'handlers' : ['hand1'], +            }, +        }, +        'root' : { +            'level' : 'WARNING', +        }, +    } + +    # config6 specifies a custom handler class to be loaded +    # but has bad arguments +    config6 = { +        'version': 1, +        'formatters': { +            'form1' : { +                'format' : '%(levelname)s ++ %(message)s', +            }, +        }, +        'handlers' : { +            'hand1' : { +                'class' : __name__ + '.CustomHandler', +                'formatter' : 'form1', +                'level' : 'NOTSET', +                'stream'  : 'ext://sys.stdout', +                '9' : 'invalid parameter name', +            }, +        }, +        'loggers' : { +            'compiler.parser' : { +                'level' : 'DEBUG', +                'handlers' : ['hand1'], +            }, +        }, +        'root' : { +            'level' : 'WARNING', +        }, +    } + +    #config 7 does not define compiler.parser but defines compiler.lexer +    #so compiler.parser should be disabled after applying it +    config7 = { +        'version': 1, +        'formatters': { +            'form1' : { +                'format' : '%(levelname)s ++ %(message)s', +            }, +        }, +        'handlers' : { +            'hand1' : { +                'class' : 'logging.StreamHandler', +                'formatter' : 'form1', +                'level' : 'NOTSET', +                'stream'  : 'ext://sys.stdout', +            }, +        }, +        'loggers' : { +            'compiler.lexer' : { +                'level' : 'DEBUG', +                'handlers' : ['hand1'], +            }, +        }, +        'root' : { +            'level' : 'WARNING', +        }, +    } + +    config8 = { +        'version': 1, +        'disable_existing_loggers' : False, +        'formatters': { +            'form1' : { +                'format' : '%(levelname)s ++ %(message)s', +            }, +        }, +        'handlers' : { +            'hand1' : { +                'class' : 'logging.StreamHandler', +                'formatter' : 'form1', +                'level' : 'NOTSET', +                'stream'  : 'ext://sys.stdout', +            }, +        }, +        'loggers' : { +            'compiler' : { +                'level' : 'DEBUG', +                'handlers' : ['hand1'], +            }, +            'compiler.lexer' : { +            }, +        }, +        'root' : { +            'level' : 'WARNING', +        }, +    } + +    config9 = { +        'version': 1, +        'formatters': { +            'form1' : { +                'format' : '%(levelname)s ++ %(message)s', +            }, +        }, +        'handlers' : { +            'hand1' : { +                'class' : 'logging.StreamHandler', +                'formatter' : 'form1', +                'level' : 'WARNING', +                'stream'  : 'ext://sys.stdout', +            }, +        }, +        'loggers' : { +            'compiler.parser' : { +                'level' : 'WARNING', +                'handlers' : ['hand1'], +            }, +        }, +        'root' : { +            'level' : 'NOTSET', +        }, +    } + +    config9a = { +        'version': 1, +        'incremental' : True, +        'handlers' : { +            'hand1' : { +                'level' : 'WARNING', +            }, +        }, +        'loggers' : { +            'compiler.parser' : { +                'level' : 'INFO', +            }, +        }, +    } + +    config9b = { +        'version': 1, +        'incremental' : True, +        'handlers' : { +            'hand1' : { +                'level' : 'INFO', +            }, +        }, +        'loggers' : { +            'compiler.parser' : { +                'level' : 'INFO', +            }, +        }, +    } + +    #As config1 but with a filter added +    config10 = { +        'version': 1, +        'formatters': { +            'form1' : { +                'format' : '%(levelname)s ++ %(message)s', +            }, +        }, +        'filters' : { +            'filt1' : { +                'name' : 'compiler.parser', +            }, +        }, +        'handlers' : { +            'hand1' : { +                'class' : 'logging.StreamHandler', +                'formatter' : 'form1', +                'level' : 'NOTSET', +                'stream'  : 'ext://sys.stdout', +                'filters' : ['filt1'], +            }, +        }, +        'loggers' : { +            'compiler.parser' : { +                'level' : 'DEBUG', +                'filters' : ['filt1'], +            }, +        }, +        'root' : { +            'level' : 'WARNING', +            'handlers' : ['hand1'], +        }, +    } + +    #As config1 but using cfg:// references +    config11 = { +        'version': 1, +        'true_formatters': { +            'form1' : { +                'format' : '%(levelname)s ++ %(message)s', +            }, +        }, +        'handler_configs': { +            'hand1' : { +                'class' : 'logging.StreamHandler', +                'formatter' : 'form1', +                'level' : 'NOTSET', +                'stream'  : 'ext://sys.stdout', +            }, +        }, +        'formatters' : 'cfg://true_formatters', +        'handlers' : { +            'hand1' : 'cfg://handler_configs[hand1]', +        }, +        'loggers' : { +            'compiler.parser' : { +                'level' : 'DEBUG', +                'handlers' : ['hand1'], +            }, +        }, +        'root' : { +            'level' : 'WARNING', +        }, +    } + +    #As config11 but missing the version key +    config12 = { +        'true_formatters': { +            'form1' : { +                'format' : '%(levelname)s ++ %(message)s', +            }, +        }, +        'handler_configs': { +            'hand1' : { +                'class' : 'logging.StreamHandler', +                'formatter' : 'form1', +                'level' : 'NOTSET', +                'stream'  : 'ext://sys.stdout', +            }, +        }, +        'formatters' : 'cfg://true_formatters', +        'handlers' : { +            'hand1' : 'cfg://handler_configs[hand1]', +        }, +        'loggers' : { +            'compiler.parser' : { +                'level' : 'DEBUG', +                'handlers' : ['hand1'], +            }, +        }, +        'root' : { +            'level' : 'WARNING', +        }, +    } + +    #As config11 but using an unsupported version +    config13 = { +        'version': 2, +        'true_formatters': { +            'form1' : { +                'format' : '%(levelname)s ++ %(message)s', +            }, +        }, +        'handler_configs': { +            'hand1' : { +                'class' : 'logging.StreamHandler', +                'formatter' : 'form1', +                'level' : 'NOTSET', +                'stream'  : 'ext://sys.stdout', +            }, +        }, +        'formatters' : 'cfg://true_formatters', +        'handlers' : { +            'hand1' : 'cfg://handler_configs[hand1]', +        }, +        'loggers' : { +            'compiler.parser' : { +                'level' : 'DEBUG', +                'handlers' : ['hand1'], +            }, +        }, +        'root' : { +            'level' : 'WARNING', +        }, +    } + +    def apply_config(self, conf): +        logging.config.dictConfig(conf) + +    def test_config0_ok(self): +        # A simple config which overrides the default settings. +        with captured_stdout() as output: +            self.apply_config(self.config0) +            logger = logging.getLogger() +            # Won't output anything +            logger.info(self.next_message()) +            # Outputs a message +            logger.error(self.next_message()) +            self.assert_log_lines([ +                ('ERROR', '2'), +            ], stream=output) +            # Original logger output is empty. +            self.assert_log_lines([]) + +    def test_config1_ok(self, config=config1): +        # A config defining a sub-parser as well. +        with captured_stdout() as output: +            self.apply_config(config) +            logger = logging.getLogger("compiler.parser") +            # Both will output a message +            logger.info(self.next_message()) +            logger.error(self.next_message()) +            self.assert_log_lines([ +                ('INFO', '1'), +                ('ERROR', '2'), +            ], stream=output) +            # Original logger output is empty. +            self.assert_log_lines([]) + +    def test_config2_failure(self): +        # A simple config which overrides the default settings. +        self.assertRaises(Exception, self.apply_config, self.config2) + +    def test_config2a_failure(self): +        # A simple config which overrides the default settings. +        self.assertRaises(Exception, self.apply_config, self.config2a) + +    def test_config2b_failure(self): +        # A simple config which overrides the default settings. +        self.assertRaises(Exception, self.apply_config, self.config2b) + +    def test_config3_failure(self): +        # A simple config which overrides the default settings. +        self.assertRaises(Exception, self.apply_config, self.config3) + +    def test_config4_ok(self): +        # A config specifying a custom formatter class. +        with captured_stdout() as output: +            self.apply_config(self.config4) +            #logger = logging.getLogger() +            try: +                raise RuntimeError() +            except RuntimeError: +                logging.exception("just testing") +            sys.stdout.seek(0) +            self.assertEqual(output.getvalue(), +                "ERROR:root:just testing\nGot a [RuntimeError]\n") +            # Original logger output is empty +            self.assert_log_lines([]) + +    def test_config4a_ok(self): +        # A config specifying a custom formatter class. +        with captured_stdout() as output: +            self.apply_config(self.config4a) +            #logger = logging.getLogger() +            try: +                raise RuntimeError() +            except RuntimeError: +                logging.exception("just testing") +            sys.stdout.seek(0) +            self.assertEqual(output.getvalue(), +                "ERROR:root:just testing\nGot a [RuntimeError]\n") +            # Original logger output is empty +            self.assert_log_lines([]) + +    def test_config5_ok(self): +        self.test_config1_ok(config=self.config5) + +    def test_config6_failure(self): +        self.assertRaises(Exception, self.apply_config, self.config6) + +    def test_config7_ok(self): +        with captured_stdout() as output: +            self.apply_config(self.config1) +            logger = logging.getLogger("compiler.parser") +            # Both will output a message +            logger.info(self.next_message()) +            logger.error(self.next_message()) +            self.assert_log_lines([ +                ('INFO', '1'), +                ('ERROR', '2'), +            ], stream=output) +            # Original logger output is empty. +            self.assert_log_lines([]) +        with captured_stdout() as output: +            self.apply_config(self.config7) +            logger = logging.getLogger("compiler.parser") +            self.assertTrue(logger.disabled) +            logger = logging.getLogger("compiler.lexer") +            # Both will output a message +            logger.info(self.next_message()) +            logger.error(self.next_message()) +            self.assert_log_lines([ +                ('INFO', '3'), +                ('ERROR', '4'), +            ], stream=output) +            # Original logger output is empty. +            self.assert_log_lines([]) + +    #Same as test_config_7_ok but don't disable old loggers. +    def test_config_8_ok(self): +        with captured_stdout() as output: +            self.apply_config(self.config1) +            logger = logging.getLogger("compiler.parser") +            # Both will output a message +            logger.info(self.next_message()) +            logger.error(self.next_message()) +            self.assert_log_lines([ +                ('INFO', '1'), +                ('ERROR', '2'), +            ], stream=output) +            # Original logger output is empty. +            self.assert_log_lines([]) +        with captured_stdout() as output: +            self.apply_config(self.config8) +            logger = logging.getLogger("compiler.parser") +            self.assertFalse(logger.disabled) +            # Both will output a message +            logger.info(self.next_message()) +            logger.error(self.next_message()) +            logger = logging.getLogger("compiler.lexer") +            # Both will output a message +            logger.info(self.next_message()) +            logger.error(self.next_message()) +            self.assert_log_lines([ +                ('INFO', '3'), +                ('ERROR', '4'), +                ('INFO', '5'), +                ('ERROR', '6'), +            ], stream=output) +            # Original logger output is empty. +            self.assert_log_lines([]) + +    def test_config_9_ok(self): +        with captured_stdout() as output: +            self.apply_config(self.config9) +            logger = logging.getLogger("compiler.parser") +            #Nothing will be output since both handler and logger are set to WARNING +            logger.info(self.next_message()) +            self.assert_log_lines([], stream=output) +            self.apply_config(self.config9a) +            #Nothing will be output since both handler is still set to WARNING +            logger.info(self.next_message()) +            self.assert_log_lines([], stream=output) +            self.apply_config(self.config9b) +            #Message should now be output +            logger.info(self.next_message()) +            self.assert_log_lines([ +                ('INFO', '3'), +            ], stream=output) + +    def test_config_10_ok(self): +        with captured_stdout() as output: +            self.apply_config(self.config10) +            logger = logging.getLogger("compiler.parser") +            logger.warning(self.next_message()) +            logger = logging.getLogger('compiler') +            #Not output, because filtered +            logger.warning(self.next_message()) +            logger = logging.getLogger('compiler.lexer') +            #Not output, because filtered +            logger.warning(self.next_message()) +            logger = logging.getLogger("compiler.parser.codegen") +            #Output, as not filtered +            logger.error(self.next_message()) +            self.assert_log_lines([ +                ('WARNING', '1'), +                ('ERROR', '4'), +            ], stream=output) + +    def test_config11_ok(self): +        self.test_config1_ok(self.config11) + +    def test_config12_failure(self): +        self.assertRaises(Exception, self.apply_config, self.config12) + +    def test_config13_failure(self): +        self.assertRaises(Exception, self.apply_config, self.config13) + +    @unittest.skipUnless(threading, 'listen() needs threading to work') +    def setup_via_listener(self, text): +        text = text.encode("utf-8") +        # Ask for a randomly assigned port (by using port 0) +        t = logging.config.listen(0) +        t.start() +        t.ready.wait() +        # Now get the port allocated +        port = t.port +        t.ready.clear() +        try: +            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) +            sock.settimeout(2.0) +            sock.connect(('localhost', port)) + +            slen = struct.pack('>L', len(text)) +            s = slen + text +            sentsofar = 0 +            left = len(s) +            while left > 0: +                sent = sock.send(s[sentsofar:]) +                sentsofar += sent +                left -= sent +            sock.close() +        finally: +            t.ready.wait(2.0) +            logging.config.stopListening() +            t.join(2.0) + +    def test_listen_config_10_ok(self): +        with captured_stdout() as output: +            self.setup_via_listener(json.dumps(self.config10)) +            logger = logging.getLogger("compiler.parser") +            logger.warning(self.next_message()) +            logger = logging.getLogger('compiler') +            #Not output, because filtered +            logger.warning(self.next_message()) +            logger = logging.getLogger('compiler.lexer') +            #Not output, because filtered +            logger.warning(self.next_message()) +            logger = logging.getLogger("compiler.parser.codegen") +            #Output, as not filtered +            logger.error(self.next_message()) +            self.assert_log_lines([ +                ('WARNING', '1'), +                ('ERROR', '4'), +            ], stream=output) + +    def test_listen_config_1_ok(self): +        with captured_stdout() as output: +            self.setup_via_listener(textwrap.dedent(ConfigFileTest.config1)) +            logger = logging.getLogger("compiler.parser") +            # Both will output a message +            logger.info(self.next_message()) +            logger.error(self.next_message()) +            self.assert_log_lines([ +                ('INFO', '1'), +                ('ERROR', '2'), +            ], stream=output) +            # Original logger output is empty. +            self.assert_log_lines([]) + + +class ManagerTest(BaseTest): +    def test_manager_loggerclass(self): +        logged = [] + +        class MyLogger(logging.Logger): +            def _log(self, level, msg, args, exc_info=None, extra=None): +                logged.append(msg) + +        man = logging.Manager(None) +        self.assertRaises(TypeError, man.setLoggerClass, int) +        man.setLoggerClass(MyLogger) +        logger = man.getLogger('test') +        logger.warning('should appear in logged') +        logging.warning('should not appear in logged') + +        self.assertEqual(logged, ['should appear in logged']) + + +class ChildLoggerTest(BaseTest): +    def test_child_loggers(self): +        r = logging.getLogger() +        l1 = logging.getLogger('abc') +        l2 = logging.getLogger('def.ghi') +        c1 = r.getChild('xyz') +        c2 = r.getChild('uvw.xyz') +        self.assertTrue(c1 is logging.getLogger('xyz')) +        self.assertTrue(c2 is logging.getLogger('uvw.xyz')) +        c1 = l1.getChild('def') +        c2 = c1.getChild('ghi') +        c3 = l1.getChild('def.ghi') +        self.assertTrue(c1 is logging.getLogger('abc.def')) +        self.assertTrue(c2 is logging.getLogger('abc.def.ghi')) +        self.assertTrue(c2 is c3) + + +class DerivedLogRecord(logging.LogRecord): +    pass + +class LogRecordFactoryTest(BaseTest): + +    def setUp(self): +        class CheckingFilter(logging.Filter): +            def __init__(self, cls): +                self.cls = cls + +            def filter(self, record): +                t = type(record) +                if t is not self.cls: +                    msg = 'Unexpected LogRecord type %s, expected %s' % (t, +                            self.cls) +                    raise TypeError(msg) +                return True + +        BaseTest.setUp(self) +        self.filter = CheckingFilter(DerivedLogRecord) +        self.root_logger.addFilter(self.filter) +        self.orig_factory = logging.getLogRecordFactory() + +    def tearDown(self): +        self.root_logger.removeFilter(self.filter) +        BaseTest.tearDown(self) +        logging.setLogRecordFactory(self.orig_factory) + +    def test_logrecord_class(self): +        self.assertRaises(TypeError, self.root_logger.warning, +                          self.next_message()) +        logging.setLogRecordFactory(DerivedLogRecord) +        self.root_logger.error(self.next_message()) +        self.assert_log_lines([ +           ('root', 'ERROR', '2'), +        ]) + + +class QueueHandlerTest(BaseTest): +    # Do not bother with a logger name group. +    expected_log_pat = r"^[\w.]+ -> ([\w]+): ([\d]+)$" + +    def setUp(self): +        BaseTest.setUp(self) +        self.queue = queue.Queue(-1) +        self.que_hdlr = logging.handlers.QueueHandler(self.queue) +        self.que_logger = logging.getLogger('que') +        self.que_logger.propagate = False +        self.que_logger.setLevel(logging.WARNING) +        self.que_logger.addHandler(self.que_hdlr) + +    def tearDown(self): +        self.que_hdlr.close() +        BaseTest.tearDown(self) + +    def test_queue_handler(self): +        self.que_logger.debug(self.next_message()) +        self.assertRaises(queue.Empty, self.queue.get_nowait) +        self.que_logger.info(self.next_message()) +        self.assertRaises(queue.Empty, self.queue.get_nowait) +        msg = self.next_message() +        self.que_logger.warning(msg) +        data = self.queue.get_nowait() +        self.assertTrue(isinstance(data, logging.LogRecord)) +        self.assertEqual(data.name, self.que_logger.name) +        self.assertEqual((data.msg, data.args), (msg, None)) + +class FormatterTest(unittest.TestCase): +    def setUp(self): +        self.common = { +            'name': 'formatter.test', +            'level': logging.DEBUG, +            'pathname': os.path.join('path', 'to', 'dummy.ext'), +            'lineno': 42, +            'exc_info': None, +            'func': None, +            'msg': 'Message with %d %s', +            'args': (2, 'placeholders'), +        } +        self.variants = { +        } + +    def get_record(self, name=None): +        result = dict(self.common) +        if name is not None: +            result.update(self.variants[name]) +        return logging.makeLogRecord(result) + +    def test_percent(self): +        # Test %-formatting +        r = self.get_record() +        f = logging.Formatter('${%(message)s}') +        self.assertEqual(f.format(r), '${Message with 2 placeholders}') +        f = logging.Formatter('%(random)s') +        self.assertRaises(KeyError, f.format, r) +        self.assertFalse(f.usesTime()) +        f = logging.Formatter('%(asctime)s') +        self.assertTrue(f.usesTime()) +        f = logging.Formatter('%(asctime)-15s') +        self.assertTrue(f.usesTime()) +        f = logging.Formatter('asctime') +        self.assertFalse(f.usesTime()) + +    def test_braces(self): +        # Test {}-formatting +        r = self.get_record() +        f = logging.Formatter('$%{message}%$', style='{') +        self.assertEqual(f.format(r), '$%Message with 2 placeholders%$') +        f = logging.Formatter('{random}', style='{') +        self.assertRaises(KeyError, f.format, r) +        self.assertFalse(f.usesTime()) +        f = logging.Formatter('{asctime}', style='{') +        self.assertTrue(f.usesTime()) +        f = logging.Formatter('{asctime!s:15}', style='{') +        self.assertTrue(f.usesTime()) +        f = logging.Formatter('{asctime:15}', style='{') +        self.assertTrue(f.usesTime()) +        f = logging.Formatter('asctime', style='{') +        self.assertFalse(f.usesTime()) + +    def test_dollars(self): +        # Test $-formatting +        r = self.get_record() +        f = logging.Formatter('$message', style='$') +        self.assertEqual(f.format(r), 'Message with 2 placeholders') +        f = logging.Formatter('$$%${message}%$$', style='$') +        self.assertEqual(f.format(r), '$%Message with 2 placeholders%$') +        f = logging.Formatter('${random}', style='$') +        self.assertRaises(KeyError, f.format, r) +        self.assertFalse(f.usesTime()) +        f = logging.Formatter('${asctime}', style='$') +        self.assertTrue(f.usesTime()) +        f = logging.Formatter('${asctime', style='$') +        self.assertFalse(f.usesTime()) +        f = logging.Formatter('$asctime', style='$') +        self.assertTrue(f.usesTime()) +        f = logging.Formatter('asctime', style='$') +        self.assertFalse(f.usesTime()) + +class LastResortTest(BaseTest): +    def test_last_resort(self): +        # Test the last resort handler +        root = self.root_logger +        root.removeHandler(self.root_hdlr) +        old_stderr = sys.stderr +        old_lastresort = logging.lastResort +        old_raise_exceptions = logging.raiseExceptions +        try: +            sys.stderr = sio = io.StringIO() +            root.warning('This is your final chance!') +            self.assertEqual(sio.getvalue(), 'This is your final chance!\n') +            #No handlers and no last resort, so 'No handlers' message +            logging.lastResort = None +            sys.stderr = sio = io.StringIO() +            root.warning('This is your final chance!') +            self.assertEqual(sio.getvalue(), 'No handlers could be found for logger "root"\n') +            # 'No handlers' message only printed once +            sys.stderr = sio = io.StringIO() +            root.warning('This is your final chance!') +            self.assertEqual(sio.getvalue(), '') +            root.manager.emittedNoHandlerWarning = False +            #If raiseExceptions is False, no message is printed +            logging.raiseExceptions = False +            sys.stderr = sio = io.StringIO() +            root.warning('This is your final chance!') +            self.assertEqual(sio.getvalue(), '') +        finally: +            sys.stderr = old_stderr +            root.addHandler(self.root_hdlr) +            logging.lastResort = old_lastresort +            logging.raiseExceptions = old_raise_exceptions + + +class BaseFileTest(BaseTest): +    "Base class for handler tests that write log files" + +    def setUp(self): +        BaseTest.setUp(self) +        fd, self.fn = tempfile.mkstemp(".log", "test_logging-2-") +        os.close(fd) +        self.rmfiles = [] + +    def tearDown(self): +        for fn in self.rmfiles: +            os.unlink(fn) +        if os.path.exists(self.fn): +            os.unlink(self.fn) +        BaseTest.tearDown(self) + +    def assertLogFile(self, filename): +        "Assert a log file is there and register it for deletion" +        self.assertTrue(os.path.exists(filename), +                        msg="Log file %r does not exist") +        self.rmfiles.append(filename) + + +class RotatingFileHandlerTest(BaseFileTest): +    def next_rec(self): +        return logging.LogRecord('n', logging.DEBUG, 'p', 1, +                                 self.next_message(), None, None, None) + +    def test_should_not_rollover(self): +        # If maxbytes is zero rollover never occurs +        rh = logging.handlers.RotatingFileHandler(self.fn, maxBytes=0) +        self.assertFalse(rh.shouldRollover(None)) +        rh.close() + +    def test_should_rollover(self): +        rh = logging.handlers.RotatingFileHandler(self.fn, maxBytes=1) +        self.assertTrue(rh.shouldRollover(self.next_rec())) +        rh.close() + +    def test_file_created(self): +        # checks that the file is created and assumes it was created +        # by us +        rh = logging.handlers.RotatingFileHandler(self.fn) +        rh.emit(self.next_rec()) +        self.assertLogFile(self.fn) +        rh.close() + +    def test_rollover_filenames(self): +        rh = logging.handlers.RotatingFileHandler( +            self.fn, backupCount=2, maxBytes=1) +        rh.emit(self.next_rec()) +        self.assertLogFile(self.fn) +        rh.emit(self.next_rec()) +        self.assertLogFile(self.fn + ".1") +        rh.emit(self.next_rec()) +        self.assertLogFile(self.fn + ".2") +        self.assertFalse(os.path.exists(self.fn + ".3")) +        rh.close() + +class TimedRotatingFileHandlerTest(BaseFileTest): +    # test methods added below +    pass + +def secs(**kw): +    return datetime.timedelta(**kw) // datetime.timedelta(seconds=1) + +for when, exp in (('S', 1), +                  ('M', 60), +                  ('H', 60 * 60), +                  ('D', 60 * 60 * 24), +                  ('MIDNIGHT', 60 * 60 * 24), +                  # current time (epoch start) is a Thursday, W0 means Monday +                  ('W0', secs(days=4, hours=24)), +                 ): +    def test_compute_rollover(self, when=when, exp=exp): +        rh = logging.handlers.TimedRotatingFileHandler( +            self.fn, when=when, interval=1, backupCount=0, utc=True) +        currentTime = 0.0 +        actual = rh.computeRollover(currentTime) +        if exp != actual: +            # Failures occur on some systems for MIDNIGHT and W0. +            # Print detailed calculation for MIDNIGHT so we can try to see +            # what's going on +            import time +            if when == 'MIDNIGHT': +                try: +                    if rh.utc: +                        t = time.gmtime(currentTime) +                    else: +                        t = time.localtime(currentTime) +                    currentHour = t[3] +                    currentMinute = t[4] +                    currentSecond = t[5] +                    # r is the number of seconds left between now and midnight +                    r = logging.handlers._MIDNIGHT - ((currentHour * 60 + +                                                       currentMinute) * 60 + +                            currentSecond) +                    result = currentTime + r +                    print('t: %s (%s)' % (t, rh.utc), file=sys.stderr) +                    print('currentHour: %s' % currentHour, file=sys.stderr) +                    print('currentMinute: %s' % currentMinute, file=sys.stderr) +                    print('currentSecond: %s' % currentSecond, file=sys.stderr) +                    print('r: %s' % r, file=sys.stderr) +                    print('result: %s' % result, file=sys.stderr) +                except Exception: +                    print('exception in diagnostic code: %s' % sys.exc_info()[1], file=sys.stderr) +        self.assertEqual(exp, actual) +        rh.close() +    setattr(TimedRotatingFileHandlerTest, "test_compute_rollover_%s" % when, test_compute_rollover) +  # Set the locale to the platform-dependent default.  I have no idea  # why the test does this, but in any case we save the current locale  # first and restore it at the end.  @run_with_locale('LC_ALL', '')  def test_main():      run_unittest(BuiltinLevelsTest, BasicFilterTest, -                    CustomLevelsAndFiltersTest, MemoryHandlerTest, -                    ConfigFileTest, SocketHandlerTest, MemoryTest, -                    EncodingTest, WarningsTest) +                 CustomLevelsAndFiltersTest, MemoryHandlerTest, +                 ConfigFileTest, SocketHandlerTest, MemoryTest, +                 EncodingTest, WarningsTest, ConfigDictTest, ManagerTest, +                 FormatterTest, +                 LogRecordFactoryTest, ChildLoggerTest, QueueHandlerTest, +                 RotatingFileHandlerTest, +                 LastResortTest, +                 TimedRotatingFileHandlerTest +                )  if __name__ == "__main__":      test_main()  | 
