diff options
Diffstat (limited to 'Lib/test/test_logging.py')
| -rw-r--r-- | Lib/test/test_logging.py | 576 |
1 files changed, 403 insertions, 173 deletions
diff --git a/Lib/test/test_logging.py b/Lib/test/test_logging.py index f34172a..b17f5e5 100644 --- a/Lib/test/test_logging.py +++ b/Lib/test/test_logging.py @@ -24,6 +24,7 @@ import logging.handlers import logging.config import codecs +import configparser import datetime import pickle import io @@ -38,8 +39,10 @@ import socket import struct import sys import tempfile +from test.script_helper import assert_python_ok from test.support import (captured_stdout, run_with_locale, run_unittest, - patch, requires_zlib, TestHandler, Matcher) + patch, requires_zlib, TestHandler, Matcher, HOST, + swap_attr) import textwrap import time import unittest @@ -56,30 +59,27 @@ try: import smtpd from urllib.parse import urlparse, parse_qs from socketserver import (ThreadingUDPServer, DatagramRequestHandler, - ThreadingTCPServer, StreamRequestHandler) + ThreadingTCPServer, StreamRequestHandler, + ThreadingUnixStreamServer, + ThreadingUnixDatagramServer) except ImportError: threading = None try: - import win32evtlog + import win32evtlog, win32evtlogutil, pywintypes except ImportError: - win32evtlog = None -try: - import win32evtlogutil -except ImportError: - win32evtlogutil = None - win32evtlog = None + win32evtlog = win32evtlogutil = pywintypes = None + try: import zlib except ImportError: pass - class BaseTest(unittest.TestCase): """Base class for logging tests.""" log_format = "%(name)s -> %(levelname)s: %(message)s" - expected_log_pat = r"^([\w.]+) -> ([\w]+): ([\d]+)$" + expected_log_pat = r"^([\w.]+) -> (\w+): (\d+)$" message_num = 0 def setUp(self): @@ -91,7 +91,8 @@ class BaseTest(unittest.TestCase): self.saved_handlers = logging._handlers.copy() self.saved_handler_list = logging._handlerList[:] self.saved_loggers = saved_loggers = logger_dict.copy() - self.saved_level_names = logging._levelNames.copy() + self.saved_name_to_level = logging._nameToLevel.copy() + self.saved_level_to_name = logging._levelToName.copy() self.logger_states = logger_states = {} for name in saved_loggers: logger_states[name] = getattr(saved_loggers[name], @@ -133,8 +134,10 @@ class BaseTest(unittest.TestCase): self.root_logger.setLevel(self.original_logging_level) logging._acquireLock() try: - logging._levelNames.clear() - logging._levelNames.update(self.saved_level_names) + logging._levelToName.clear() + logging._levelToName.update(self.saved_level_to_name) + logging._nameToLevel.clear() + logging._nameToLevel.update(self.saved_name_to_level) logging._handlers.clear() logging._handlers.update(self.saved_handlers) logging._handlerList[:] = self.saved_handler_list @@ -148,12 +151,12 @@ class BaseTest(unittest.TestCase): finally: logging._releaseLock() - def assert_log_lines(self, expected_values, stream=None): + def assert_log_lines(self, expected_values, stream=None, pat=None): """Match the collected log lines against the regular expression self.expected_log_pat, and compare the extracted group values to the expected_values list of tuples.""" stream = stream or self.stream - pat = re.compile(self.expected_log_pat) + pat = re.compile(pat or self.expected_log_pat) actual_lines = stream.getvalue().splitlines() self.assertEqual(len(actual_lines), len(expected_values)) for actual, expected in zip(actual_lines, expected_values): @@ -307,6 +310,10 @@ class BuiltinLevelsTest(BaseTest): ('INF.BADPARENT', 'INFO', '4'), ]) + def test_regression_22386(self): + """See issue #22386 for more information.""" + self.assertEqual(logging.getLevelName('INFO'), logging.INFO) + self.assertEqual(logging.getLevelName(logging.INFO), 'INFO') class BasicFilterTest(BaseTest): @@ -428,7 +435,7 @@ class CustomLevelsAndFiltersTest(BaseTest): """Test various filtering possibilities with custom logging levels.""" # Skip the logger name group. - expected_log_pat = r"^[\w.]+ -> ([\w]+): ([\d]+)$" + expected_log_pat = r"^[\w.]+ -> (\w+): (\d+)$" def setUp(self): BaseTest.setUp(self) @@ -558,7 +565,7 @@ class HandlerTest(BaseTest): self.assertEqual(h.facility, h.LOG_USER) self.assertTrue(h.unixsocket) h.close() - except socket.error: # syslogd might not be available + except OSError: # syslogd might not be available pass for method in ('GET', 'POST', 'PUT'): if method == 'PUT': @@ -583,6 +590,7 @@ class HandlerTest(BaseTest): for _ in range(tries): try: os.unlink(fname) + self.deletion_time = time.time() except OSError: pass time.sleep(0.004 * random.randint(0, 4)) @@ -590,6 +598,9 @@ class HandlerTest(BaseTest): del_count = 500 log_count = 500 + self.handle_time = None + self.deletion_time = None + for delay in (False, True): fd, fn = tempfile.mkstemp('.log', 'test_logging-3-') os.close(fd) @@ -603,7 +614,14 @@ class HandlerTest(BaseTest): for _ in range(log_count): time.sleep(0.005) r = logging.makeLogRecord({'msg': 'testing' }) - h.handle(r) + try: + self.handle_time = time.time() + h.handle(r) + except Exception: + print('Deleted at %s, ' + 'opened at %s' % (self.deletion_time, + self.handle_time)) + raise finally: remover.join() h.close() @@ -645,41 +663,6 @@ class StreamHandlerTest(BaseTest): # -- if it proves to be of wider utility than just test_logging if threading: - class TestSMTPChannel(smtpd.SMTPChannel): - """ - This derived class has had to be created because smtpd does not - support use of custom channel maps, although they are allowed by - asyncore's design. Issue #11959 has been raised to address this, - and if resolved satisfactorily, some of this code can be removed. - """ - def __init__(self, server, conn, addr, sockmap): - asynchat.async_chat.__init__(self, conn, sockmap) - self.smtp_server = server - self.conn = conn - self.addr = addr - self.data_size_limit = None - self.received_lines = [] - self.smtp_state = self.COMMAND - self.seen_greeting = '' - self.mailfrom = None - self.rcpttos = [] - self.received_data = '' - self.fqdn = socket.getfqdn() - self.num_bytes = 0 - try: - self.peer = conn.getpeername() - except socket.error as err: - # a race condition may occur if the other end is closing - # before we can get the peername - self.close() - if err.args[0] != errno.ENOTCONN: - raise - return - self.push('220 %s %s' % (self.fqdn, smtpd.__version__)) - self.set_terminator(b'\r\n') - self.extended_smtp = False - - class TestSMTPServer(smtpd.SMTPServer): """ This class implements a test SMTP server. @@ -700,37 +683,14 @@ if threading: :func:`asyncore.loop`. This avoids changing the :mod:`asyncore` module's global state. """ - channel_class = TestSMTPChannel def __init__(self, addr, handler, poll_interval, sockmap): - self._localaddr = addr - self._remoteaddr = None - self.data_size_limit = None - self.sockmap = sockmap - asyncore.dispatcher.__init__(self, map=sockmap) - try: - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.setblocking(0) - self.set_socket(sock, map=sockmap) - # try to re-use a server port if possible - self.set_reuse_addr() - self.bind(addr) - self.port = sock.getsockname()[1] - self.listen(5) - except: - self.close() - raise + smtpd.SMTPServer.__init__(self, addr, None, map=sockmap) + self.port = self.socket.getsockname()[1] self._handler = handler self._thread = None self.poll_interval = poll_interval - def handle_accepted(self, conn, addr): - """ - Redefined only because the base class does not pass in a - map, forcing use of a global in :mod:`asyncore`. - """ - channel = self.channel_class(self, conn, addr, self.sockmap) - def process_message(self, peer, mailfrom, rcpttos, data): """ Delegates to the handler passed in to the server's constructor. @@ -761,8 +721,8 @@ if threading: :func:`asyncore.loop`. """ try: - asyncore.loop(poll_interval, map=self.sockmap) - except select.error: + asyncore.loop(poll_interval, map=self._map) + except OSError: # On FreeBSD 8, closing the server repeatably # raises this error. We swallow it if the # server has been closed. @@ -869,7 +829,7 @@ if threading: sock, addr = self.socket.accept() if self.sslctx: sock = self.sslctx.wrap_socket(sock, server_side=True) - except socket.error as e: + except OSError as e: # socket errors are silenced by the caller, print them here sys.stderr.write("Got an error:\n%s\n" % e) raise @@ -935,7 +895,7 @@ if threading: if data: try: super(DelegatingUDPRequestHandler, self).finish() - except socket.error: + except OSError: if not self.server._closed: raise @@ -953,6 +913,13 @@ if threading: super(TestUDPServer, self).server_close() self._closed = True + if hasattr(socket, "AF_UNIX"): + class TestUnixStreamServer(TestTCPServer): + address_family = socket.AF_UNIX + + class TestUnixDatagramServer(TestUDPServer): + address_family = socket.AF_UNIX + # - end of server_helper section @unittest.skipUnless(threading, 'Threading required for this test.') @@ -960,15 +927,15 @@ class SMTPHandlerTest(BaseTest): TIMEOUT = 8.0 def test_basic(self): sockmap = {} - server = TestSMTPServer(('localhost', 0), self.process_message, 0.001, + server = TestSMTPServer((HOST, 0), self.process_message, 0.001, sockmap) server.start() - addr = ('localhost', server.port) + addr = (HOST, server.port) h = logging.handlers.SMTPHandler(addr, 'me', 'you', 'Log', timeout=self.TIMEOUT) self.assertEqual(h.toaddrs, ['you']) self.messages = [] - r = logging.makeLogRecord({'msg': 'Hello'}) + r = logging.makeLogRecord({'msg': 'Hello \u2713'}) self.handled = threading.Event() h.handle(r) self.handled.wait(self.TIMEOUT) # 14314: don't wait forever @@ -979,7 +946,7 @@ class SMTPHandlerTest(BaseTest): self.assertEqual(mailfrom, 'me') self.assertEqual(rcpttos, ['you']) self.assertIn('\nSubject: Log\n', data) - self.assertTrue(data.endswith('\n\nHello')) + self.assertTrue(data.endswith('\n\nHello \u2713')) h.close() def process_message(self, *args): @@ -991,7 +958,7 @@ class MemoryHandlerTest(BaseTest): """Tests for the MemoryHandler.""" # Do not bother with a logger name group. - expected_log_pat = r"^[\w.]+ -> ([\w]+): ([\d]+)$" + expected_log_pat = r"^[\w.]+ -> (\w+): (\d+)$" def setUp(self): BaseTest.setUp(self) @@ -1044,7 +1011,7 @@ class ConfigFileTest(BaseTest): """Reading logging config from a .ini-style config file.""" - expected_log_pat = r"^([\w]+) \+\+ ([\w]+)$" + expected_log_pat = r"^(\w+) \+\+ (\w+)$" # config0 is a standard configuration. config0 = """ @@ -1292,6 +1259,24 @@ class ConfigFileTest(BaseTest): # Original logger output is empty. self.assert_log_lines([]) + def test_config0_using_cp_ok(self): + # A simple config file which overrides the default settings. + with captured_stdout() as output: + file = io.StringIO(textwrap.dedent(self.config0)) + cp = configparser.ConfigParser() + cp.read_file(file) + logging.config.fileConfig(cp) + logger = logging.getLogger() + # Won't output anything + logger.info(self.next_message()) + # Outputs a message + logger.error(self.next_message()) + self.assert_log_lines([ + ('ERROR', '2'), + ], stream=output) + # Original logger output is empty. + self.assert_log_lines([]) + def test_config1_ok(self, config=config1): # A config file defining a sub-parser as well. with captured_stdout() as output: @@ -1381,7 +1366,7 @@ class ConfigFileTest(BaseTest): def test_logger_disabling(self): self.apply_config(self.disable_test) - logger = logging.getLogger('foo') + logger = logging.getLogger('some_pristine_logger') self.assertFalse(logger.disabled) self.apply_config(self.disable_test) self.assertTrue(logger.disabled) @@ -1394,17 +1379,23 @@ class SocketHandlerTest(BaseTest): """Test for SocketHandler objects.""" + if threading: + server_class = TestTCPServer + address = ('localhost', 0) + def setUp(self): """Set up a TCP server to receive log messages, and a SocketHandler pointing to that server's address and port.""" BaseTest.setUp(self) - addr = ('localhost', 0) - self.server = server = TestTCPServer(addr, self.handle_socket, - 0.01) + self.server = server = self.server_class(self.address, + self.handle_socket, 0.01) server.start() server.ready.wait() - self.sock_hdlr = logging.handlers.SocketHandler('localhost', - server.port) + hcls = logging.handlers.SocketHandler + if isinstance(server.server_address, tuple): + self.sock_hdlr = hcls('localhost', server.port) + else: + self.sock_hdlr = hcls(server.server_address, None) self.log_output = '' self.root_logger.removeHandler(self.root_logger.handlers[0]) self.root_logger.addHandler(self.sock_hdlr) @@ -1444,35 +1435,70 @@ class SocketHandlerTest(BaseTest): self.assertEqual(self.log_output, "spam\neggs\n") def test_noserver(self): + # Avoid timing-related failures due to SocketHandler's own hard-wired + # one-second timeout on socket.create_connection() (issue #16264). + self.sock_hdlr.retryStart = 2.5 # Kill the server self.server.stop(2.0) - #The logging call should try to connect, which should fail + # The logging call should try to connect, which should fail try: raise RuntimeError('Deliberate mistake') except RuntimeError: self.root_logger.exception('Never sent') self.root_logger.error('Never sent, either') now = time.time() - self.assertTrue(self.sock_hdlr.retryTime > now) + self.assertGreater(self.sock_hdlr.retryTime, now) time.sleep(self.sock_hdlr.retryTime - now + 0.001) self.root_logger.error('Nor this') +def _get_temp_domain_socket(): + fd, fn = tempfile.mkstemp(prefix='test_logging_', suffix='.sock') + os.close(fd) + # just need a name - file can't be present, or we'll get an + # 'address already in use' error. + os.remove(fn) + return fn + +@unittest.skipUnless(hasattr(socket, "AF_UNIX"), "Unix sockets required") +@unittest.skipUnless(threading, 'Threading required for this test.') +class UnixSocketHandlerTest(SocketHandlerTest): + + """Test for SocketHandler with unix sockets.""" + + if threading and hasattr(socket, "AF_UNIX"): + server_class = TestUnixStreamServer + + def setUp(self): + # override the definition in the base class + self.address = _get_temp_domain_socket() + SocketHandlerTest.setUp(self) + + def tearDown(self): + SocketHandlerTest.tearDown(self) + os.remove(self.address) @unittest.skipUnless(threading, 'Threading required for this test.') class DatagramHandlerTest(BaseTest): """Test for DatagramHandler.""" + if threading: + server_class = TestUDPServer + address = ('localhost', 0) + def setUp(self): """Set up a UDP server to receive log messages, and a DatagramHandler pointing to that server's address and port.""" BaseTest.setUp(self) - addr = ('localhost', 0) - self.server = server = TestUDPServer(addr, self.handle_datagram, 0.01) + self.server = server = self.server_class(self.address, + self.handle_datagram, 0.01) server.start() server.ready.wait() - self.sock_hdlr = logging.handlers.DatagramHandler('localhost', - server.port) + hcls = logging.handlers.DatagramHandler + if isinstance(server.server_address, tuple): + self.sock_hdlr = hcls('localhost', server.port) + else: + self.sock_hdlr = hcls(server.server_address, None) self.log_output = '' self.root_logger.removeHandler(self.root_logger.handlers[0]) self.root_logger.addHandler(self.sock_hdlr) @@ -1505,23 +1531,46 @@ class DatagramHandlerTest(BaseTest): self.handled.wait() self.assertEqual(self.log_output, "spam\neggs\n") +@unittest.skipUnless(hasattr(socket, "AF_UNIX"), "Unix sockets required") +@unittest.skipUnless(threading, 'Threading required for this test.') +class UnixDatagramHandlerTest(DatagramHandlerTest): + + """Test for DatagramHandler using Unix sockets.""" + + if threading and hasattr(socket, "AF_UNIX"): + server_class = TestUnixDatagramServer + + def setUp(self): + # override the definition in the base class + self.address = _get_temp_domain_socket() + DatagramHandlerTest.setUp(self) + + def tearDown(self): + DatagramHandlerTest.tearDown(self) + os.remove(self.address) @unittest.skipUnless(threading, 'Threading required for this test.') class SysLogHandlerTest(BaseTest): """Test for SysLogHandler using UDP.""" + if threading: + server_class = TestUDPServer + address = ('localhost', 0) + def setUp(self): """Set up a UDP server to receive log messages, and a SysLogHandler pointing to that server's address and port.""" BaseTest.setUp(self) - addr = ('localhost', 0) - self.server = server = TestUDPServer(addr, self.handle_datagram, - 0.01) + self.server = server = self.server_class(self.address, + self.handle_datagram, 0.01) server.start() server.ready.wait() - self.sl_hdlr = logging.handlers.SysLogHandler(('localhost', - server.port)) + hcls = logging.handlers.SysLogHandler + if isinstance(server.server_address, tuple): + self.sl_hdlr = hcls(('localhost', server.port)) + else: + self.sl_hdlr = hcls(server.server_address) self.log_output = '' self.root_logger.removeHandler(self.root_logger.handlers[0]) self.root_logger.addHandler(self.sl_hdlr) @@ -1557,41 +1606,28 @@ class SysLogHandlerTest(BaseTest): self.handled.wait() self.assertEqual(self.log_output, b'<11>h\xc3\xa4m-sp\xc3\xa4m') +@unittest.skipUnless(hasattr(socket, "AF_UNIX"), "Unix sockets required") +@unittest.skipUnless(threading, 'Threading required for this test.') +class UnixSysLogHandlerTest(SysLogHandlerTest): + + """Test for SysLogHandler with Unix sockets.""" + + if threading and hasattr(socket, "AF_UNIX"): + server_class = TestUnixDatagramServer + + def setUp(self): + # override the definition in the base class + self.address = _get_temp_domain_socket() + SysLogHandlerTest.setUp(self) + + def tearDown(self): + SysLogHandlerTest.tearDown(self) + os.remove(self.address) @unittest.skipUnless(threading, 'Threading required for this test.') class HTTPHandlerTest(BaseTest): """Test for HTTPHandler.""" - PEMFILE = """-----BEGIN RSA PRIVATE KEY----- -MIICXQIBAAKBgQDGT4xS5r91rbLJQK2nUDenBhBG6qFk+bVOjuAGC/LSHlAoBnvG -zQG3agOG+e7c5z2XT8m2ktORLqG3E4mYmbxgyhDrzP6ei2Anc+pszmnxPoK3Puh5 -aXV+XKt0bU0C1m2+ACmGGJ0t3P408art82nOxBw8ZHgIg9Dtp6xIUCyOqwIDAQAB -AoGBAJFTnFboaKh5eUrIzjmNrKsG44jEyy+vWvHN/FgSC4l103HxhmWiuL5Lv3f7 -0tMp1tX7D6xvHwIG9VWvyKb/Cq9rJsDibmDVIOslnOWeQhG+XwJyitR0pq/KlJIB -5LjORcBw795oKWOAi6RcOb1ON59tysEFYhAGQO9k6VL621gRAkEA/Gb+YXULLpbs -piXN3q4zcHzeaVANo69tUZ6TjaQqMeTxE4tOYM0G0ZoSeHEdaP59AOZGKXXNGSQy -2z/MddcYGQJBAMkjLSYIpOLJY11ja8OwwswFG2hEzHe0cS9bzo++R/jc1bHA5R0Y -i6vA5iPi+wopPFvpytdBol7UuEBe5xZrxWMCQQCWxELRHiP2yWpEeLJ3gGDzoXMN -PydWjhRju7Bx3AzkTtf+D6lawz1+eGTuEss5i0JKBkMEwvwnN2s1ce+EuF4JAkBb -E96h1lAzkVW5OAfYOPY8RCPA90ZO/hoyg7PpSxR0ECuDrgERR8gXIeYUYfejBkEa -rab4CfRoVJKKM28Yq/xZAkBvuq670JRCwOgfUTdww7WpdOQBYPkzQccsKNCslQW8 -/DyW6y06oQusSENUvynT6dr3LJxt/NgZPhZX2+k1eYDV ------END RSA PRIVATE KEY----- ------BEGIN CERTIFICATE----- -MIICGzCCAYSgAwIBAgIJAIq84a2Q/OvlMA0GCSqGSIb3DQEBBQUAMBQxEjAQBgNV -BAMTCWxvY2FsaG9zdDAeFw0xMTA1MjExMDIzMzNaFw03NTAzMjEwMzU1MTdaMBQx -EjAQBgNVBAMTCWxvY2FsaG9zdDCBnzANBgkqhkiG9w0BAQEFAAOBjQAwgYkCgYEA -xk+MUua/da2yyUCtp1A3pwYQRuqhZPm1To7gBgvy0h5QKAZ7xs0Bt2oDhvnu3Oc9 -l0/JtpLTkS6htxOJmJm8YMoQ68z+notgJ3PqbM5p8T6Ctz7oeWl1flyrdG1NAtZt -vgAphhidLdz+NPGq7fNpzsQcPGR4CIPQ7aesSFAsjqsCAwEAAaN1MHMwHQYDVR0O -BBYEFLWaUPO6N7efGiuoS9i3DVYcUwn0MEQGA1UdIwQ9MDuAFLWaUPO6N7efGiuo -S9i3DVYcUwn0oRikFjAUMRIwEAYDVQQDEwlsb2NhbGhvc3SCCQCKvOGtkPzr5TAM -BgNVHRMEBTADAQH/MA0GCSqGSIb3DQEBBQUAA4GBAMK5whPjLNQK1Ivvk88oqJqq -4f889OwikGP0eUhOBhbFlsZs+jq5YZC2UzHz+evzKBlgAP1u4lP/cB85CnjvWqM+ -1c/lywFHQ6HOdDeQ1L72tSYMrNOG4XNmLn0h7rx6GoTU7dcFRfseahBCq8mv0IDt -IRbTpvlHWPjsSvHz0ZOH ------END CERTIFICATE-----""" - def setUp(self): """Set up an HTTP server to receive log messages, and a HTTPHandler pointing to that server's address and port.""" @@ -1621,17 +1657,18 @@ IRbTpvlHWPjsSvHz0ZOH if secure: try: import ssl - fd, fn = tempfile.mkstemp() - os.close(fd) - with open(fn, 'w') as f: - f.write(self.PEMFILE) - sslctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) - sslctx.load_cert_chain(fn) - os.unlink(fn) except ImportError: sslctx = None + else: + here = os.path.dirname(__file__) + localhost_cert = os.path.join(here, "keycert.pem") + sslctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + sslctx.load_cert_chain(localhost_cert) + + context = ssl.create_default_context(cafile=localhost_cert) else: sslctx = None + context = None self.server = server = TestHTTPServer(addr, self.handle_request, 0.01, sslctx=sslctx) server.start() @@ -1639,7 +1676,8 @@ IRbTpvlHWPjsSvHz0ZOH host = 'localhost:%d' % server.server_port secure_client = secure and sslctx self.h_hdlr = logging.handlers.HTTPHandler(host, '/frob', - secure=secure_client) + secure=secure_client, + context=context) self.log_data = None root_logger.addHandler(self.h_hdlr) @@ -1779,7 +1817,7 @@ class WarningsTest(BaseTest): logger.removeHandler(h) s = stream.getvalue() h.close() - self.assertTrue(s.find("UserWarning: I'm warning you...\n") > 0) + self.assertGreater(s.find("UserWarning: I'm warning you...\n"), 0) #See if an explicit file uses the original implementation a_file = io.StringIO() @@ -1817,7 +1855,7 @@ class ConfigDictTest(BaseTest): """Reading logging config from a dictionary.""" - expected_log_pat = r"^([\w]+) \+\+ ([\w]+)$" + expected_log_pat = r"^(\w+) \+\+ (\w+)$" # config0 is a standard configuration. config0 = { @@ -2389,6 +2427,32 @@ class ConfigDictTest(BaseTest): }, } + # As config0, but with properties + config14 = { + 'version': 1, + 'formatters': { + 'form1' : { + 'format' : '%(levelname)s ++ %(message)s', + }, + }, + 'handlers' : { + 'hand1' : { + 'class' : 'logging.StreamHandler', + 'formatter' : 'form1', + 'level' : 'NOTSET', + 'stream' : 'ext://sys.stdout', + '.': { + 'foo': 'bar', + 'terminator': '!\n', + } + }, + }, + 'root' : { + 'level' : 'WARNING', + 'handlers' : ['hand1'], + }, + } + out_of_order = { "version": 1, "formatters": { @@ -2656,11 +2720,20 @@ class ConfigDictTest(BaseTest): def test_config13_failure(self): self.assertRaises(Exception, self.apply_config, self.config13) + def test_config14_ok(self): + with captured_stdout() as output: + self.apply_config(self.config14) + h = logging._handlers['hand1'] + self.assertEqual(h.foo, 'bar') + self.assertEqual(h.terminator, '!\n') + logging.warning('Exclamation') + self.assertTrue(output.getvalue().endswith('Exclamation!\n')) + @unittest.skipUnless(threading, 'listen() needs threading to work') - def setup_via_listener(self, text): + def setup_via_listener(self, text, verify=None): text = text.encode("utf-8") # Ask for a randomly assigned port (by using port 0) - t = logging.config.listen(0) + t = logging.config.listen(0, verify) t.start() t.ready.wait() # Now get the port allocated @@ -2720,6 +2793,69 @@ class ConfigDictTest(BaseTest): # Original logger output is empty. self.assert_log_lines([]) + @unittest.skipUnless(threading, 'Threading required for this test.') + def test_listen_verify(self): + + def verify_fail(stuff): + return None + + def verify_reverse(stuff): + return stuff[::-1] + + logger = logging.getLogger("compiler.parser") + to_send = textwrap.dedent(ConfigFileTest.config1) + # First, specify a verification function that will fail. + # We expect to see no output, since our configuration + # never took effect. + with captured_stdout() as output: + self.setup_via_listener(to_send, verify_fail) + # Both will output a message + logger.info(self.next_message()) + logger.error(self.next_message()) + self.assert_log_lines([], stream=output) + # Original logger output has the stuff we logged. + self.assert_log_lines([ + ('INFO', '1'), + ('ERROR', '2'), + ], pat=r"^[\w.]+ -> (\w+): (\d+)$") + + # Now, perform no verification. Our configuration + # should take effect. + + with captured_stdout() as output: + self.setup_via_listener(to_send) # no verify callable specified + logger = logging.getLogger("compiler.parser") + # Both will output a message + logger.info(self.next_message()) + logger.error(self.next_message()) + self.assert_log_lines([ + ('INFO', '3'), + ('ERROR', '4'), + ], stream=output) + # Original logger output still has the stuff we logged before. + self.assert_log_lines([ + ('INFO', '1'), + ('ERROR', '2'), + ], pat=r"^[\w.]+ -> (\w+): (\d+)$") + + # Now, perform verification which transforms the bytes. + + with captured_stdout() as output: + self.setup_via_listener(to_send[::-1], verify_reverse) + logger = logging.getLogger("compiler.parser") + # Both will output a message + logger.info(self.next_message()) + logger.error(self.next_message()) + self.assert_log_lines([ + ('INFO', '5'), + ('ERROR', '6'), + ], stream=output) + # Original logger output still has the stuff we logged before. + self.assert_log_lines([ + ('INFO', '1'), + ('ERROR', '2'), + ], pat=r"^[\w.]+ -> (\w+): (\d+)$") + def test_out_of_order(self): self.apply_config(self.out_of_order) handler = logging.getLogger('mymodule').handlers[0] @@ -2779,14 +2915,14 @@ class ChildLoggerTest(BaseTest): l2 = logging.getLogger('def.ghi') c1 = r.getChild('xyz') c2 = r.getChild('uvw.xyz') - self.assertTrue(c1 is logging.getLogger('xyz')) - self.assertTrue(c2 is logging.getLogger('uvw.xyz')) + self.assertIs(c1, logging.getLogger('xyz')) + self.assertIs(c2, logging.getLogger('uvw.xyz')) c1 = l1.getChild('def') c2 = c1.getChild('ghi') c3 = l1.getChild('def.ghi') - self.assertTrue(c1 is logging.getLogger('abc.def')) - self.assertTrue(c2 is logging.getLogger('abc.def.ghi')) - self.assertTrue(c2 is c3) + self.assertIs(c1, logging.getLogger('abc.def')) + self.assertIs(c2, logging.getLogger('abc.def.ghi')) + self.assertIs(c2, c3) class DerivedLogRecord(logging.LogRecord): @@ -2829,7 +2965,7 @@ class LogRecordFactoryTest(BaseTest): class QueueHandlerTest(BaseTest): # Do not bother with a logger name group. - expected_log_pat = r"^[\w.]+ -> ([\w]+): ([\d]+)$" + expected_log_pat = r"^[\w.]+ -> (\w+): (\d+)$" def setUp(self): BaseTest.setUp(self) @@ -3123,13 +3259,13 @@ class ShutdownTest(BaseTest): self.assertEqual('0 - release', self.called[-1]) def test_with_ioerror_in_acquire(self): - self._test_with_failure_in_method('acquire', IOError) + self._test_with_failure_in_method('acquire', OSError) def test_with_ioerror_in_flush(self): - self._test_with_failure_in_method('flush', IOError) + self._test_with_failure_in_method('flush', OSError) def test_with_ioerror_in_close(self): - self._test_with_failure_in_method('close', IOError) + self._test_with_failure_in_method('close', OSError) def test_with_valueerror_in_acquire(self): self._test_with_failure_in_method('acquire', ValueError) @@ -3235,6 +3371,25 @@ class ModuleLevelMiscTest(BaseTest): logging.setLoggerClass(logging.Logger) self.assertEqual(logging.getLoggerClass(), logging.Logger) + def test_logging_at_shutdown(self): + # Issue #20037 + code = """if 1: + import logging + + class A: + def __del__(self): + try: + raise ValueError("some error") + except Exception: + logging.exception("exception in __del__") + + a = A()""" + rc, out, err = assert_python_ok("-c", code) + err = err.decode() + self.assertIn("exception in __del__", err) + self.assertIn("ValueError: some error", err) + + class LogRecordTest(BaseTest): def test_str_rep(self): r = logging.makeLogRecord({}) @@ -3352,6 +3507,12 @@ class BasicConfigTest(unittest.TestCase): "ERROR:root:Log an error") def test_filename(self): + + def cleanup(h1, h2, fn): + h1.close() + h2.close() + os.remove(fn) + logging.basicConfig(filename='test.log') self.assertEqual(len(logging.root.handlers), 1) @@ -3359,17 +3520,23 @@ class BasicConfigTest(unittest.TestCase): self.assertIsInstance(handler, logging.FileHandler) expected = logging.FileHandler('test.log', 'a') - self.addCleanup(expected.close) self.assertEqual(handler.stream.mode, expected.stream.mode) self.assertEqual(handler.stream.name, expected.stream.name) + self.addCleanup(cleanup, handler, expected, 'test.log') def test_filemode(self): + + def cleanup(h1, h2, fn): + h1.close() + h2.close() + os.remove(fn) + logging.basicConfig(filename='test.log', filemode='wb') handler = logging.root.handlers[0] expected = logging.FileHandler('test.log', 'wb') - self.addCleanup(expected.close) self.assertEqual(handler.stream.mode, expected.stream.mode) + self.addCleanup(cleanup, handler, expected, 'test.log') def test_stream(self): stream = io.StringIO() @@ -3419,6 +3586,10 @@ class BasicConfigTest(unittest.TestCase): handlers=handlers) assertRaises(ValueError, logging.basicConfig, stream=stream, handlers=handlers) + # Issue 23207: test for invalid kwargs + assertRaises(ValueError, logging.basicConfig, loglevel=logging.INFO) + # Should pop both filename and filemode even if filename is None + logging.basicConfig(filename=None, filemode='a') def test_handlers(self): handlers = [ @@ -3574,18 +3745,12 @@ class LoggerTest(BaseTest): (exc.__class__, exc, exc.__traceback__)) def test_log_invalid_level_with_raise(self): - old_raise = logging.raiseExceptions - self.addCleanup(setattr, logging, 'raiseExecptions', old_raise) - - logging.raiseExceptions = True - self.assertRaises(TypeError, self.logger.log, '10', 'test message') + with swap_attr(logging, 'raiseExceptions', True): + self.assertRaises(TypeError, self.logger.log, '10', 'test message') def test_log_invalid_level_no_raise(self): - old_raise = logging.raiseExceptions - self.addCleanup(setattr, logging, 'raiseExecptions', old_raise) - - logging.raiseExceptions = False - self.logger.log('10', 'test message') # no exception happens + with swap_attr(logging, 'raiseExceptions', False): + self.logger.log('10', 'test message') # no exception happens def test_find_caller_with_stack_info(self): called = [] @@ -3825,6 +3990,63 @@ class TimedRotatingFileHandlerTest(BaseFileTest): assertRaises(ValueError, logging.handlers.TimedRotatingFileHandler, self.fn, 'W7', delay=True) + def test_compute_rollover_daily_attime(self): + currentTime = 0 + atTime = datetime.time(12, 0, 0) + rh = logging.handlers.TimedRotatingFileHandler( + self.fn, when='MIDNIGHT', interval=1, backupCount=0, utc=True, + atTime=atTime) + try: + actual = rh.computeRollover(currentTime) + self.assertEqual(actual, currentTime + 12 * 60 * 60) + + actual = rh.computeRollover(currentTime + 13 * 60 * 60) + self.assertEqual(actual, currentTime + 36 * 60 * 60) + finally: + rh.close() + + #@unittest.skipIf(True, 'Temporarily skipped while failures investigated.') + def test_compute_rollover_weekly_attime(self): + currentTime = int(time.time()) + today = currentTime - currentTime % 86400 + + atTime = datetime.time(12, 0, 0) + + wday = time.gmtime(today).tm_wday + for day in range(7): + rh = logging.handlers.TimedRotatingFileHandler( + self.fn, when='W%d' % day, interval=1, backupCount=0, utc=True, + atTime=atTime) + try: + if wday > day: + # The rollover day has already passed this week, so we + # go over into next week + expected = (7 - wday + day) + else: + expected = (day - wday) + # At this point expected is in days from now, convert to seconds + expected *= 24 * 60 * 60 + # Add in the rollover time + expected += 12 * 60 * 60 + # Add in adjustment for today + expected += today + actual = rh.computeRollover(today) + if actual != expected: + print('failed in timezone: %d' % time.timezone) + print('local vars: %s' % locals()) + self.assertEqual(actual, expected) + if day == wday: + # goes into following week + expected += 7 * 24 * 60 * 60 + actual = rh.computeRollover(today + 13 * 60 * 60) + if actual != expected: + print('failed in timezone: %d' % time.timezone) + print('local vars: %s' % locals()) + self.assertEqual(actual, expected) + finally: + rh.close() + + def secs(**kw): return datetime.timedelta(**kw) // datetime.timedelta(seconds=1) @@ -3872,18 +4094,25 @@ for when, exp in (('S', 1), setattr(TimedRotatingFileHandlerTest, "test_compute_rollover_%s" % when, test_compute_rollover) -@unittest.skipUnless(win32evtlog, 'win32evtlog/win32evtlogutil required for this test.') +@unittest.skipUnless(win32evtlog, 'win32evtlog/win32evtlogutil/pywintypes required for this test.') class NTEventLogHandlerTest(BaseTest): def test_basic(self): logtype = 'Application' elh = win32evtlog.OpenEventLog(None, logtype) num_recs = win32evtlog.GetNumberOfEventLogRecords(elh) - h = logging.handlers.NTEventLogHandler('test_logging') + + try: + h = logging.handlers.NTEventLogHandler('test_logging') + except pywintypes.error as e: + if e.winerror == 5: # access denied + raise unittest.SkipTest('Insufficient privileges to run test') + raise + r = logging.makeLogRecord({'msg': 'Test Log Message'}) h.handle(r) h.close() # Now see if the event is recorded - self.assertTrue(num_recs < win32evtlog.GetNumberOfEventLogRecords(elh)) + self.assertLess(num_recs, win32evtlog.GetNumberOfEventLogRecords(elh)) flags = win32evtlog.EVENTLOG_BACKWARDS_READ | \ win32evtlog.EVENTLOG_SEQUENTIAL_READ found = False @@ -3916,7 +4145,8 @@ def test_main(): SMTPHandlerTest, FileHandlerTest, RotatingFileHandlerTest, LastResortTest, LogRecordTest, ExceptionTest, SysLogHandlerTest, HTTPHandlerTest, NTEventLogHandlerTest, - TimedRotatingFileHandlerTest + TimedRotatingFileHandlerTest, UnixSocketHandlerTest, + UnixDatagramHandlerTest, UnixSysLogHandlerTest ) if __name__ == "__main__": |
