summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_logging.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_logging.py')
-rw-r--r--Lib/test/test_logging.py576
1 files changed, 403 insertions, 173 deletions
diff --git a/Lib/test/test_logging.py b/Lib/test/test_logging.py
index f34172a..b17f5e5 100644
--- a/Lib/test/test_logging.py
+++ b/Lib/test/test_logging.py
@@ -24,6 +24,7 @@ import logging.handlers
import logging.config
import codecs
+import configparser
import datetime
import pickle
import io
@@ -38,8 +39,10 @@ import socket
import struct
import sys
import tempfile
+from test.script_helper import assert_python_ok
from test.support import (captured_stdout, run_with_locale, run_unittest,
- patch, requires_zlib, TestHandler, Matcher)
+ patch, requires_zlib, TestHandler, Matcher, HOST,
+ swap_attr)
import textwrap
import time
import unittest
@@ -56,30 +59,27 @@ try:
import smtpd
from urllib.parse import urlparse, parse_qs
from socketserver import (ThreadingUDPServer, DatagramRequestHandler,
- ThreadingTCPServer, StreamRequestHandler)
+ ThreadingTCPServer, StreamRequestHandler,
+ ThreadingUnixStreamServer,
+ ThreadingUnixDatagramServer)
except ImportError:
threading = None
try:
- import win32evtlog
+ import win32evtlog, win32evtlogutil, pywintypes
except ImportError:
- win32evtlog = None
-try:
- import win32evtlogutil
-except ImportError:
- win32evtlogutil = None
- win32evtlog = None
+ win32evtlog = win32evtlogutil = pywintypes = None
+
try:
import zlib
except ImportError:
pass
-
class BaseTest(unittest.TestCase):
"""Base class for logging tests."""
log_format = "%(name)s -> %(levelname)s: %(message)s"
- expected_log_pat = r"^([\w.]+) -> ([\w]+): ([\d]+)$"
+ expected_log_pat = r"^([\w.]+) -> (\w+): (\d+)$"
message_num = 0
def setUp(self):
@@ -91,7 +91,8 @@ class BaseTest(unittest.TestCase):
self.saved_handlers = logging._handlers.copy()
self.saved_handler_list = logging._handlerList[:]
self.saved_loggers = saved_loggers = logger_dict.copy()
- self.saved_level_names = logging._levelNames.copy()
+ self.saved_name_to_level = logging._nameToLevel.copy()
+ self.saved_level_to_name = logging._levelToName.copy()
self.logger_states = logger_states = {}
for name in saved_loggers:
logger_states[name] = getattr(saved_loggers[name],
@@ -133,8 +134,10 @@ class BaseTest(unittest.TestCase):
self.root_logger.setLevel(self.original_logging_level)
logging._acquireLock()
try:
- logging._levelNames.clear()
- logging._levelNames.update(self.saved_level_names)
+ logging._levelToName.clear()
+ logging._levelToName.update(self.saved_level_to_name)
+ logging._nameToLevel.clear()
+ logging._nameToLevel.update(self.saved_name_to_level)
logging._handlers.clear()
logging._handlers.update(self.saved_handlers)
logging._handlerList[:] = self.saved_handler_list
@@ -148,12 +151,12 @@ class BaseTest(unittest.TestCase):
finally:
logging._releaseLock()
- def assert_log_lines(self, expected_values, stream=None):
+ def assert_log_lines(self, expected_values, stream=None, pat=None):
"""Match the collected log lines against the regular expression
self.expected_log_pat, and compare the extracted group values to
the expected_values list of tuples."""
stream = stream or self.stream
- pat = re.compile(self.expected_log_pat)
+ pat = re.compile(pat or self.expected_log_pat)
actual_lines = stream.getvalue().splitlines()
self.assertEqual(len(actual_lines), len(expected_values))
for actual, expected in zip(actual_lines, expected_values):
@@ -307,6 +310,10 @@ class BuiltinLevelsTest(BaseTest):
('INF.BADPARENT', 'INFO', '4'),
])
+ def test_regression_22386(self):
+ """See issue #22386 for more information."""
+ self.assertEqual(logging.getLevelName('INFO'), logging.INFO)
+ self.assertEqual(logging.getLevelName(logging.INFO), 'INFO')
class BasicFilterTest(BaseTest):
@@ -428,7 +435,7 @@ class CustomLevelsAndFiltersTest(BaseTest):
"""Test various filtering possibilities with custom logging levels."""
# Skip the logger name group.
- expected_log_pat = r"^[\w.]+ -> ([\w]+): ([\d]+)$"
+ expected_log_pat = r"^[\w.]+ -> (\w+): (\d+)$"
def setUp(self):
BaseTest.setUp(self)
@@ -558,7 +565,7 @@ class HandlerTest(BaseTest):
self.assertEqual(h.facility, h.LOG_USER)
self.assertTrue(h.unixsocket)
h.close()
- except socket.error: # syslogd might not be available
+ except OSError: # syslogd might not be available
pass
for method in ('GET', 'POST', 'PUT'):
if method == 'PUT':
@@ -583,6 +590,7 @@ class HandlerTest(BaseTest):
for _ in range(tries):
try:
os.unlink(fname)
+ self.deletion_time = time.time()
except OSError:
pass
time.sleep(0.004 * random.randint(0, 4))
@@ -590,6 +598,9 @@ class HandlerTest(BaseTest):
del_count = 500
log_count = 500
+ self.handle_time = None
+ self.deletion_time = None
+
for delay in (False, True):
fd, fn = tempfile.mkstemp('.log', 'test_logging-3-')
os.close(fd)
@@ -603,7 +614,14 @@ class HandlerTest(BaseTest):
for _ in range(log_count):
time.sleep(0.005)
r = logging.makeLogRecord({'msg': 'testing' })
- h.handle(r)
+ try:
+ self.handle_time = time.time()
+ h.handle(r)
+ except Exception:
+ print('Deleted at %s, '
+ 'opened at %s' % (self.deletion_time,
+ self.handle_time))
+ raise
finally:
remover.join()
h.close()
@@ -645,41 +663,6 @@ class StreamHandlerTest(BaseTest):
# -- if it proves to be of wider utility than just test_logging
if threading:
- class TestSMTPChannel(smtpd.SMTPChannel):
- """
- This derived class has had to be created because smtpd does not
- support use of custom channel maps, although they are allowed by
- asyncore's design. Issue #11959 has been raised to address this,
- and if resolved satisfactorily, some of this code can be removed.
- """
- def __init__(self, server, conn, addr, sockmap):
- asynchat.async_chat.__init__(self, conn, sockmap)
- self.smtp_server = server
- self.conn = conn
- self.addr = addr
- self.data_size_limit = None
- self.received_lines = []
- self.smtp_state = self.COMMAND
- self.seen_greeting = ''
- self.mailfrom = None
- self.rcpttos = []
- self.received_data = ''
- self.fqdn = socket.getfqdn()
- self.num_bytes = 0
- try:
- self.peer = conn.getpeername()
- except socket.error as err:
- # a race condition may occur if the other end is closing
- # before we can get the peername
- self.close()
- if err.args[0] != errno.ENOTCONN:
- raise
- return
- self.push('220 %s %s' % (self.fqdn, smtpd.__version__))
- self.set_terminator(b'\r\n')
- self.extended_smtp = False
-
-
class TestSMTPServer(smtpd.SMTPServer):
"""
This class implements a test SMTP server.
@@ -700,37 +683,14 @@ if threading:
:func:`asyncore.loop`. This avoids changing the
:mod:`asyncore` module's global state.
"""
- channel_class = TestSMTPChannel
def __init__(self, addr, handler, poll_interval, sockmap):
- self._localaddr = addr
- self._remoteaddr = None
- self.data_size_limit = None
- self.sockmap = sockmap
- asyncore.dispatcher.__init__(self, map=sockmap)
- try:
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sock.setblocking(0)
- self.set_socket(sock, map=sockmap)
- # try to re-use a server port if possible
- self.set_reuse_addr()
- self.bind(addr)
- self.port = sock.getsockname()[1]
- self.listen(5)
- except:
- self.close()
- raise
+ smtpd.SMTPServer.__init__(self, addr, None, map=sockmap)
+ self.port = self.socket.getsockname()[1]
self._handler = handler
self._thread = None
self.poll_interval = poll_interval
- def handle_accepted(self, conn, addr):
- """
- Redefined only because the base class does not pass in a
- map, forcing use of a global in :mod:`asyncore`.
- """
- channel = self.channel_class(self, conn, addr, self.sockmap)
-
def process_message(self, peer, mailfrom, rcpttos, data):
"""
Delegates to the handler passed in to the server's constructor.
@@ -761,8 +721,8 @@ if threading:
:func:`asyncore.loop`.
"""
try:
- asyncore.loop(poll_interval, map=self.sockmap)
- except select.error:
+ asyncore.loop(poll_interval, map=self._map)
+ except OSError:
# On FreeBSD 8, closing the server repeatably
# raises this error. We swallow it if the
# server has been closed.
@@ -869,7 +829,7 @@ if threading:
sock, addr = self.socket.accept()
if self.sslctx:
sock = self.sslctx.wrap_socket(sock, server_side=True)
- except socket.error as e:
+ except OSError as e:
# socket errors are silenced by the caller, print them here
sys.stderr.write("Got an error:\n%s\n" % e)
raise
@@ -935,7 +895,7 @@ if threading:
if data:
try:
super(DelegatingUDPRequestHandler, self).finish()
- except socket.error:
+ except OSError:
if not self.server._closed:
raise
@@ -953,6 +913,13 @@ if threading:
super(TestUDPServer, self).server_close()
self._closed = True
+ if hasattr(socket, "AF_UNIX"):
+ class TestUnixStreamServer(TestTCPServer):
+ address_family = socket.AF_UNIX
+
+ class TestUnixDatagramServer(TestUDPServer):
+ address_family = socket.AF_UNIX
+
# - end of server_helper section
@unittest.skipUnless(threading, 'Threading required for this test.')
@@ -960,15 +927,15 @@ class SMTPHandlerTest(BaseTest):
TIMEOUT = 8.0
def test_basic(self):
sockmap = {}
- server = TestSMTPServer(('localhost', 0), self.process_message, 0.001,
+ server = TestSMTPServer((HOST, 0), self.process_message, 0.001,
sockmap)
server.start()
- addr = ('localhost', server.port)
+ addr = (HOST, server.port)
h = logging.handlers.SMTPHandler(addr, 'me', 'you', 'Log',
timeout=self.TIMEOUT)
self.assertEqual(h.toaddrs, ['you'])
self.messages = []
- r = logging.makeLogRecord({'msg': 'Hello'})
+ r = logging.makeLogRecord({'msg': 'Hello \u2713'})
self.handled = threading.Event()
h.handle(r)
self.handled.wait(self.TIMEOUT) # 14314: don't wait forever
@@ -979,7 +946,7 @@ class SMTPHandlerTest(BaseTest):
self.assertEqual(mailfrom, 'me')
self.assertEqual(rcpttos, ['you'])
self.assertIn('\nSubject: Log\n', data)
- self.assertTrue(data.endswith('\n\nHello'))
+ self.assertTrue(data.endswith('\n\nHello \u2713'))
h.close()
def process_message(self, *args):
@@ -991,7 +958,7 @@ class MemoryHandlerTest(BaseTest):
"""Tests for the MemoryHandler."""
# Do not bother with a logger name group.
- expected_log_pat = r"^[\w.]+ -> ([\w]+): ([\d]+)$"
+ expected_log_pat = r"^[\w.]+ -> (\w+): (\d+)$"
def setUp(self):
BaseTest.setUp(self)
@@ -1044,7 +1011,7 @@ class ConfigFileTest(BaseTest):
"""Reading logging config from a .ini-style config file."""
- expected_log_pat = r"^([\w]+) \+\+ ([\w]+)$"
+ expected_log_pat = r"^(\w+) \+\+ (\w+)$"
# config0 is a standard configuration.
config0 = """
@@ -1292,6 +1259,24 @@ class ConfigFileTest(BaseTest):
# Original logger output is empty.
self.assert_log_lines([])
+ def test_config0_using_cp_ok(self):
+ # A simple config file which overrides the default settings.
+ with captured_stdout() as output:
+ file = io.StringIO(textwrap.dedent(self.config0))
+ cp = configparser.ConfigParser()
+ cp.read_file(file)
+ logging.config.fileConfig(cp)
+ logger = logging.getLogger()
+ # Won't output anything
+ logger.info(self.next_message())
+ # Outputs a message
+ logger.error(self.next_message())
+ self.assert_log_lines([
+ ('ERROR', '2'),
+ ], stream=output)
+ # Original logger output is empty.
+ self.assert_log_lines([])
+
def test_config1_ok(self, config=config1):
# A config file defining a sub-parser as well.
with captured_stdout() as output:
@@ -1381,7 +1366,7 @@ class ConfigFileTest(BaseTest):
def test_logger_disabling(self):
self.apply_config(self.disable_test)
- logger = logging.getLogger('foo')
+ logger = logging.getLogger('some_pristine_logger')
self.assertFalse(logger.disabled)
self.apply_config(self.disable_test)
self.assertTrue(logger.disabled)
@@ -1394,17 +1379,23 @@ class SocketHandlerTest(BaseTest):
"""Test for SocketHandler objects."""
+ if threading:
+ server_class = TestTCPServer
+ address = ('localhost', 0)
+
def setUp(self):
"""Set up a TCP server to receive log messages, and a SocketHandler
pointing to that server's address and port."""
BaseTest.setUp(self)
- addr = ('localhost', 0)
- self.server = server = TestTCPServer(addr, self.handle_socket,
- 0.01)
+ self.server = server = self.server_class(self.address,
+ self.handle_socket, 0.01)
server.start()
server.ready.wait()
- self.sock_hdlr = logging.handlers.SocketHandler('localhost',
- server.port)
+ hcls = logging.handlers.SocketHandler
+ if isinstance(server.server_address, tuple):
+ self.sock_hdlr = hcls('localhost', server.port)
+ else:
+ self.sock_hdlr = hcls(server.server_address, None)
self.log_output = ''
self.root_logger.removeHandler(self.root_logger.handlers[0])
self.root_logger.addHandler(self.sock_hdlr)
@@ -1444,35 +1435,70 @@ class SocketHandlerTest(BaseTest):
self.assertEqual(self.log_output, "spam\neggs\n")
def test_noserver(self):
+ # Avoid timing-related failures due to SocketHandler's own hard-wired
+ # one-second timeout on socket.create_connection() (issue #16264).
+ self.sock_hdlr.retryStart = 2.5
# Kill the server
self.server.stop(2.0)
- #The logging call should try to connect, which should fail
+ # The logging call should try to connect, which should fail
try:
raise RuntimeError('Deliberate mistake')
except RuntimeError:
self.root_logger.exception('Never sent')
self.root_logger.error('Never sent, either')
now = time.time()
- self.assertTrue(self.sock_hdlr.retryTime > now)
+ self.assertGreater(self.sock_hdlr.retryTime, now)
time.sleep(self.sock_hdlr.retryTime - now + 0.001)
self.root_logger.error('Nor this')
+def _get_temp_domain_socket():
+ fd, fn = tempfile.mkstemp(prefix='test_logging_', suffix='.sock')
+ os.close(fd)
+ # just need a name - file can't be present, or we'll get an
+ # 'address already in use' error.
+ os.remove(fn)
+ return fn
+
+@unittest.skipUnless(hasattr(socket, "AF_UNIX"), "Unix sockets required")
+@unittest.skipUnless(threading, 'Threading required for this test.')
+class UnixSocketHandlerTest(SocketHandlerTest):
+
+ """Test for SocketHandler with unix sockets."""
+
+ if threading and hasattr(socket, "AF_UNIX"):
+ server_class = TestUnixStreamServer
+
+ def setUp(self):
+ # override the definition in the base class
+ self.address = _get_temp_domain_socket()
+ SocketHandlerTest.setUp(self)
+
+ def tearDown(self):
+ SocketHandlerTest.tearDown(self)
+ os.remove(self.address)
@unittest.skipUnless(threading, 'Threading required for this test.')
class DatagramHandlerTest(BaseTest):
"""Test for DatagramHandler."""
+ if threading:
+ server_class = TestUDPServer
+ address = ('localhost', 0)
+
def setUp(self):
"""Set up a UDP server to receive log messages, and a DatagramHandler
pointing to that server's address and port."""
BaseTest.setUp(self)
- addr = ('localhost', 0)
- self.server = server = TestUDPServer(addr, self.handle_datagram, 0.01)
+ self.server = server = self.server_class(self.address,
+ self.handle_datagram, 0.01)
server.start()
server.ready.wait()
- self.sock_hdlr = logging.handlers.DatagramHandler('localhost',
- server.port)
+ hcls = logging.handlers.DatagramHandler
+ if isinstance(server.server_address, tuple):
+ self.sock_hdlr = hcls('localhost', server.port)
+ else:
+ self.sock_hdlr = hcls(server.server_address, None)
self.log_output = ''
self.root_logger.removeHandler(self.root_logger.handlers[0])
self.root_logger.addHandler(self.sock_hdlr)
@@ -1505,23 +1531,46 @@ class DatagramHandlerTest(BaseTest):
self.handled.wait()
self.assertEqual(self.log_output, "spam\neggs\n")
+@unittest.skipUnless(hasattr(socket, "AF_UNIX"), "Unix sockets required")
+@unittest.skipUnless(threading, 'Threading required for this test.')
+class UnixDatagramHandlerTest(DatagramHandlerTest):
+
+ """Test for DatagramHandler using Unix sockets."""
+
+ if threading and hasattr(socket, "AF_UNIX"):
+ server_class = TestUnixDatagramServer
+
+ def setUp(self):
+ # override the definition in the base class
+ self.address = _get_temp_domain_socket()
+ DatagramHandlerTest.setUp(self)
+
+ def tearDown(self):
+ DatagramHandlerTest.tearDown(self)
+ os.remove(self.address)
@unittest.skipUnless(threading, 'Threading required for this test.')
class SysLogHandlerTest(BaseTest):
"""Test for SysLogHandler using UDP."""
+ if threading:
+ server_class = TestUDPServer
+ address = ('localhost', 0)
+
def setUp(self):
"""Set up a UDP server to receive log messages, and a SysLogHandler
pointing to that server's address and port."""
BaseTest.setUp(self)
- addr = ('localhost', 0)
- self.server = server = TestUDPServer(addr, self.handle_datagram,
- 0.01)
+ self.server = server = self.server_class(self.address,
+ self.handle_datagram, 0.01)
server.start()
server.ready.wait()
- self.sl_hdlr = logging.handlers.SysLogHandler(('localhost',
- server.port))
+ hcls = logging.handlers.SysLogHandler
+ if isinstance(server.server_address, tuple):
+ self.sl_hdlr = hcls(('localhost', server.port))
+ else:
+ self.sl_hdlr = hcls(server.server_address)
self.log_output = ''
self.root_logger.removeHandler(self.root_logger.handlers[0])
self.root_logger.addHandler(self.sl_hdlr)
@@ -1557,41 +1606,28 @@ class SysLogHandlerTest(BaseTest):
self.handled.wait()
self.assertEqual(self.log_output, b'<11>h\xc3\xa4m-sp\xc3\xa4m')
+@unittest.skipUnless(hasattr(socket, "AF_UNIX"), "Unix sockets required")
+@unittest.skipUnless(threading, 'Threading required for this test.')
+class UnixSysLogHandlerTest(SysLogHandlerTest):
+
+ """Test for SysLogHandler with Unix sockets."""
+
+ if threading and hasattr(socket, "AF_UNIX"):
+ server_class = TestUnixDatagramServer
+
+ def setUp(self):
+ # override the definition in the base class
+ self.address = _get_temp_domain_socket()
+ SysLogHandlerTest.setUp(self)
+
+ def tearDown(self):
+ SysLogHandlerTest.tearDown(self)
+ os.remove(self.address)
@unittest.skipUnless(threading, 'Threading required for this test.')
class HTTPHandlerTest(BaseTest):
"""Test for HTTPHandler."""
- PEMFILE = """-----BEGIN RSA PRIVATE KEY-----
-MIICXQIBAAKBgQDGT4xS5r91rbLJQK2nUDenBhBG6qFk+bVOjuAGC/LSHlAoBnvG
-zQG3agOG+e7c5z2XT8m2ktORLqG3E4mYmbxgyhDrzP6ei2Anc+pszmnxPoK3Puh5
-aXV+XKt0bU0C1m2+ACmGGJ0t3P408art82nOxBw8ZHgIg9Dtp6xIUCyOqwIDAQAB
-AoGBAJFTnFboaKh5eUrIzjmNrKsG44jEyy+vWvHN/FgSC4l103HxhmWiuL5Lv3f7
-0tMp1tX7D6xvHwIG9VWvyKb/Cq9rJsDibmDVIOslnOWeQhG+XwJyitR0pq/KlJIB
-5LjORcBw795oKWOAi6RcOb1ON59tysEFYhAGQO9k6VL621gRAkEA/Gb+YXULLpbs
-piXN3q4zcHzeaVANo69tUZ6TjaQqMeTxE4tOYM0G0ZoSeHEdaP59AOZGKXXNGSQy
-2z/MddcYGQJBAMkjLSYIpOLJY11ja8OwwswFG2hEzHe0cS9bzo++R/jc1bHA5R0Y
-i6vA5iPi+wopPFvpytdBol7UuEBe5xZrxWMCQQCWxELRHiP2yWpEeLJ3gGDzoXMN
-PydWjhRju7Bx3AzkTtf+D6lawz1+eGTuEss5i0JKBkMEwvwnN2s1ce+EuF4JAkBb
-E96h1lAzkVW5OAfYOPY8RCPA90ZO/hoyg7PpSxR0ECuDrgERR8gXIeYUYfejBkEa
-rab4CfRoVJKKM28Yq/xZAkBvuq670JRCwOgfUTdww7WpdOQBYPkzQccsKNCslQW8
-/DyW6y06oQusSENUvynT6dr3LJxt/NgZPhZX2+k1eYDV
------END RSA PRIVATE KEY-----
------BEGIN CERTIFICATE-----
-MIICGzCCAYSgAwIBAgIJAIq84a2Q/OvlMA0GCSqGSIb3DQEBBQUAMBQxEjAQBgNV
-BAMTCWxvY2FsaG9zdDAeFw0xMTA1MjExMDIzMzNaFw03NTAzMjEwMzU1MTdaMBQx
-EjAQBgNVBAMTCWxvY2FsaG9zdDCBnzANBgkqhkiG9w0BAQEFAAOBjQAwgYkCgYEA
-xk+MUua/da2yyUCtp1A3pwYQRuqhZPm1To7gBgvy0h5QKAZ7xs0Bt2oDhvnu3Oc9
-l0/JtpLTkS6htxOJmJm8YMoQ68z+notgJ3PqbM5p8T6Ctz7oeWl1flyrdG1NAtZt
-vgAphhidLdz+NPGq7fNpzsQcPGR4CIPQ7aesSFAsjqsCAwEAAaN1MHMwHQYDVR0O
-BBYEFLWaUPO6N7efGiuoS9i3DVYcUwn0MEQGA1UdIwQ9MDuAFLWaUPO6N7efGiuo
-S9i3DVYcUwn0oRikFjAUMRIwEAYDVQQDEwlsb2NhbGhvc3SCCQCKvOGtkPzr5TAM
-BgNVHRMEBTADAQH/MA0GCSqGSIb3DQEBBQUAA4GBAMK5whPjLNQK1Ivvk88oqJqq
-4f889OwikGP0eUhOBhbFlsZs+jq5YZC2UzHz+evzKBlgAP1u4lP/cB85CnjvWqM+
-1c/lywFHQ6HOdDeQ1L72tSYMrNOG4XNmLn0h7rx6GoTU7dcFRfseahBCq8mv0IDt
-IRbTpvlHWPjsSvHz0ZOH
------END CERTIFICATE-----"""
-
def setUp(self):
"""Set up an HTTP server to receive log messages, and a HTTPHandler
pointing to that server's address and port."""
@@ -1621,17 +1657,18 @@ IRbTpvlHWPjsSvHz0ZOH
if secure:
try:
import ssl
- fd, fn = tempfile.mkstemp()
- os.close(fd)
- with open(fn, 'w') as f:
- f.write(self.PEMFILE)
- sslctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
- sslctx.load_cert_chain(fn)
- os.unlink(fn)
except ImportError:
sslctx = None
+ else:
+ here = os.path.dirname(__file__)
+ localhost_cert = os.path.join(here, "keycert.pem")
+ sslctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+ sslctx.load_cert_chain(localhost_cert)
+
+ context = ssl.create_default_context(cafile=localhost_cert)
else:
sslctx = None
+ context = None
self.server = server = TestHTTPServer(addr, self.handle_request,
0.01, sslctx=sslctx)
server.start()
@@ -1639,7 +1676,8 @@ IRbTpvlHWPjsSvHz0ZOH
host = 'localhost:%d' % server.server_port
secure_client = secure and sslctx
self.h_hdlr = logging.handlers.HTTPHandler(host, '/frob',
- secure=secure_client)
+ secure=secure_client,
+ context=context)
self.log_data = None
root_logger.addHandler(self.h_hdlr)
@@ -1779,7 +1817,7 @@ class WarningsTest(BaseTest):
logger.removeHandler(h)
s = stream.getvalue()
h.close()
- self.assertTrue(s.find("UserWarning: I'm warning you...\n") > 0)
+ self.assertGreater(s.find("UserWarning: I'm warning you...\n"), 0)
#See if an explicit file uses the original implementation
a_file = io.StringIO()
@@ -1817,7 +1855,7 @@ class ConfigDictTest(BaseTest):
"""Reading logging config from a dictionary."""
- expected_log_pat = r"^([\w]+) \+\+ ([\w]+)$"
+ expected_log_pat = r"^(\w+) \+\+ (\w+)$"
# config0 is a standard configuration.
config0 = {
@@ -2389,6 +2427,32 @@ class ConfigDictTest(BaseTest):
},
}
+ # As config0, but with properties
+ config14 = {
+ 'version': 1,
+ 'formatters': {
+ 'form1' : {
+ 'format' : '%(levelname)s ++ %(message)s',
+ },
+ },
+ 'handlers' : {
+ 'hand1' : {
+ 'class' : 'logging.StreamHandler',
+ 'formatter' : 'form1',
+ 'level' : 'NOTSET',
+ 'stream' : 'ext://sys.stdout',
+ '.': {
+ 'foo': 'bar',
+ 'terminator': '!\n',
+ }
+ },
+ },
+ 'root' : {
+ 'level' : 'WARNING',
+ 'handlers' : ['hand1'],
+ },
+ }
+
out_of_order = {
"version": 1,
"formatters": {
@@ -2656,11 +2720,20 @@ class ConfigDictTest(BaseTest):
def test_config13_failure(self):
self.assertRaises(Exception, self.apply_config, self.config13)
+ def test_config14_ok(self):
+ with captured_stdout() as output:
+ self.apply_config(self.config14)
+ h = logging._handlers['hand1']
+ self.assertEqual(h.foo, 'bar')
+ self.assertEqual(h.terminator, '!\n')
+ logging.warning('Exclamation')
+ self.assertTrue(output.getvalue().endswith('Exclamation!\n'))
+
@unittest.skipUnless(threading, 'listen() needs threading to work')
- def setup_via_listener(self, text):
+ def setup_via_listener(self, text, verify=None):
text = text.encode("utf-8")
# Ask for a randomly assigned port (by using port 0)
- t = logging.config.listen(0)
+ t = logging.config.listen(0, verify)
t.start()
t.ready.wait()
# Now get the port allocated
@@ -2720,6 +2793,69 @@ class ConfigDictTest(BaseTest):
# Original logger output is empty.
self.assert_log_lines([])
+ @unittest.skipUnless(threading, 'Threading required for this test.')
+ def test_listen_verify(self):
+
+ def verify_fail(stuff):
+ return None
+
+ def verify_reverse(stuff):
+ return stuff[::-1]
+
+ logger = logging.getLogger("compiler.parser")
+ to_send = textwrap.dedent(ConfigFileTest.config1)
+ # First, specify a verification function that will fail.
+ # We expect to see no output, since our configuration
+ # never took effect.
+ with captured_stdout() as output:
+ self.setup_via_listener(to_send, verify_fail)
+ # Both will output a message
+ logger.info(self.next_message())
+ logger.error(self.next_message())
+ self.assert_log_lines([], stream=output)
+ # Original logger output has the stuff we logged.
+ self.assert_log_lines([
+ ('INFO', '1'),
+ ('ERROR', '2'),
+ ], pat=r"^[\w.]+ -> (\w+): (\d+)$")
+
+ # Now, perform no verification. Our configuration
+ # should take effect.
+
+ with captured_stdout() as output:
+ self.setup_via_listener(to_send) # no verify callable specified
+ logger = logging.getLogger("compiler.parser")
+ # Both will output a message
+ logger.info(self.next_message())
+ logger.error(self.next_message())
+ self.assert_log_lines([
+ ('INFO', '3'),
+ ('ERROR', '4'),
+ ], stream=output)
+ # Original logger output still has the stuff we logged before.
+ self.assert_log_lines([
+ ('INFO', '1'),
+ ('ERROR', '2'),
+ ], pat=r"^[\w.]+ -> (\w+): (\d+)$")
+
+ # Now, perform verification which transforms the bytes.
+
+ with captured_stdout() as output:
+ self.setup_via_listener(to_send[::-1], verify_reverse)
+ logger = logging.getLogger("compiler.parser")
+ # Both will output a message
+ logger.info(self.next_message())
+ logger.error(self.next_message())
+ self.assert_log_lines([
+ ('INFO', '5'),
+ ('ERROR', '6'),
+ ], stream=output)
+ # Original logger output still has the stuff we logged before.
+ self.assert_log_lines([
+ ('INFO', '1'),
+ ('ERROR', '2'),
+ ], pat=r"^[\w.]+ -> (\w+): (\d+)$")
+
def test_out_of_order(self):
self.apply_config(self.out_of_order)
handler = logging.getLogger('mymodule').handlers[0]
@@ -2779,14 +2915,14 @@ class ChildLoggerTest(BaseTest):
l2 = logging.getLogger('def.ghi')
c1 = r.getChild('xyz')
c2 = r.getChild('uvw.xyz')
- self.assertTrue(c1 is logging.getLogger('xyz'))
- self.assertTrue(c2 is logging.getLogger('uvw.xyz'))
+ self.assertIs(c1, logging.getLogger('xyz'))
+ self.assertIs(c2, logging.getLogger('uvw.xyz'))
c1 = l1.getChild('def')
c2 = c1.getChild('ghi')
c3 = l1.getChild('def.ghi')
- self.assertTrue(c1 is logging.getLogger('abc.def'))
- self.assertTrue(c2 is logging.getLogger('abc.def.ghi'))
- self.assertTrue(c2 is c3)
+ self.assertIs(c1, logging.getLogger('abc.def'))
+ self.assertIs(c2, logging.getLogger('abc.def.ghi'))
+ self.assertIs(c2, c3)
class DerivedLogRecord(logging.LogRecord):
@@ -2829,7 +2965,7 @@ class LogRecordFactoryTest(BaseTest):
class QueueHandlerTest(BaseTest):
# Do not bother with a logger name group.
- expected_log_pat = r"^[\w.]+ -> ([\w]+): ([\d]+)$"
+ expected_log_pat = r"^[\w.]+ -> (\w+): (\d+)$"
def setUp(self):
BaseTest.setUp(self)
@@ -3123,13 +3259,13 @@ class ShutdownTest(BaseTest):
self.assertEqual('0 - release', self.called[-1])
def test_with_ioerror_in_acquire(self):
- self._test_with_failure_in_method('acquire', IOError)
+ self._test_with_failure_in_method('acquire', OSError)
def test_with_ioerror_in_flush(self):
- self._test_with_failure_in_method('flush', IOError)
+ self._test_with_failure_in_method('flush', OSError)
def test_with_ioerror_in_close(self):
- self._test_with_failure_in_method('close', IOError)
+ self._test_with_failure_in_method('close', OSError)
def test_with_valueerror_in_acquire(self):
self._test_with_failure_in_method('acquire', ValueError)
@@ -3235,6 +3371,25 @@ class ModuleLevelMiscTest(BaseTest):
logging.setLoggerClass(logging.Logger)
self.assertEqual(logging.getLoggerClass(), logging.Logger)
+ def test_logging_at_shutdown(self):
+ # Issue #20037
+ code = """if 1:
+ import logging
+
+ class A:
+ def __del__(self):
+ try:
+ raise ValueError("some error")
+ except Exception:
+ logging.exception("exception in __del__")
+
+ a = A()"""
+ rc, out, err = assert_python_ok("-c", code)
+ err = err.decode()
+ self.assertIn("exception in __del__", err)
+ self.assertIn("ValueError: some error", err)
+
+
class LogRecordTest(BaseTest):
def test_str_rep(self):
r = logging.makeLogRecord({})
@@ -3352,6 +3507,12 @@ class BasicConfigTest(unittest.TestCase):
"ERROR:root:Log an error")
def test_filename(self):
+
+ def cleanup(h1, h2, fn):
+ h1.close()
+ h2.close()
+ os.remove(fn)
+
logging.basicConfig(filename='test.log')
self.assertEqual(len(logging.root.handlers), 1)
@@ -3359,17 +3520,23 @@ class BasicConfigTest(unittest.TestCase):
self.assertIsInstance(handler, logging.FileHandler)
expected = logging.FileHandler('test.log', 'a')
- self.addCleanup(expected.close)
self.assertEqual(handler.stream.mode, expected.stream.mode)
self.assertEqual(handler.stream.name, expected.stream.name)
+ self.addCleanup(cleanup, handler, expected, 'test.log')
def test_filemode(self):
+
+ def cleanup(h1, h2, fn):
+ h1.close()
+ h2.close()
+ os.remove(fn)
+
logging.basicConfig(filename='test.log', filemode='wb')
handler = logging.root.handlers[0]
expected = logging.FileHandler('test.log', 'wb')
- self.addCleanup(expected.close)
self.assertEqual(handler.stream.mode, expected.stream.mode)
+ self.addCleanup(cleanup, handler, expected, 'test.log')
def test_stream(self):
stream = io.StringIO()
@@ -3419,6 +3586,10 @@ class BasicConfigTest(unittest.TestCase):
handlers=handlers)
assertRaises(ValueError, logging.basicConfig, stream=stream,
handlers=handlers)
+ # Issue 23207: test for invalid kwargs
+ assertRaises(ValueError, logging.basicConfig, loglevel=logging.INFO)
+ # Should pop both filename and filemode even if filename is None
+ logging.basicConfig(filename=None, filemode='a')
def test_handlers(self):
handlers = [
@@ -3574,18 +3745,12 @@ class LoggerTest(BaseTest):
(exc.__class__, exc, exc.__traceback__))
def test_log_invalid_level_with_raise(self):
- old_raise = logging.raiseExceptions
- self.addCleanup(setattr, logging, 'raiseExecptions', old_raise)
-
- logging.raiseExceptions = True
- self.assertRaises(TypeError, self.logger.log, '10', 'test message')
+ with swap_attr(logging, 'raiseExceptions', True):
+ self.assertRaises(TypeError, self.logger.log, '10', 'test message')
def test_log_invalid_level_no_raise(self):
- old_raise = logging.raiseExceptions
- self.addCleanup(setattr, logging, 'raiseExecptions', old_raise)
-
- logging.raiseExceptions = False
- self.logger.log('10', 'test message') # no exception happens
+ with swap_attr(logging, 'raiseExceptions', False):
+ self.logger.log('10', 'test message') # no exception happens
def test_find_caller_with_stack_info(self):
called = []
@@ -3825,6 +3990,63 @@ class TimedRotatingFileHandlerTest(BaseFileTest):
assertRaises(ValueError, logging.handlers.TimedRotatingFileHandler,
self.fn, 'W7', delay=True)
+ def test_compute_rollover_daily_attime(self):
+ currentTime = 0
+ atTime = datetime.time(12, 0, 0)
+ rh = logging.handlers.TimedRotatingFileHandler(
+ self.fn, when='MIDNIGHT', interval=1, backupCount=0, utc=True,
+ atTime=atTime)
+ try:
+ actual = rh.computeRollover(currentTime)
+ self.assertEqual(actual, currentTime + 12 * 60 * 60)
+
+ actual = rh.computeRollover(currentTime + 13 * 60 * 60)
+ self.assertEqual(actual, currentTime + 36 * 60 * 60)
+ finally:
+ rh.close()
+
+ #@unittest.skipIf(True, 'Temporarily skipped while failures investigated.')
+ def test_compute_rollover_weekly_attime(self):
+ currentTime = int(time.time())
+ today = currentTime - currentTime % 86400
+
+ atTime = datetime.time(12, 0, 0)
+
+ wday = time.gmtime(today).tm_wday
+ for day in range(7):
+ rh = logging.handlers.TimedRotatingFileHandler(
+ self.fn, when='W%d' % day, interval=1, backupCount=0, utc=True,
+ atTime=atTime)
+ try:
+ if wday > day:
+ # The rollover day has already passed this week, so we
+ # go over into next week
+ expected = (7 - wday + day)
+ else:
+ expected = (day - wday)
+ # At this point expected is in days from now, convert to seconds
+ expected *= 24 * 60 * 60
+ # Add in the rollover time
+ expected += 12 * 60 * 60
+ # Add in adjustment for today
+ expected += today
+ actual = rh.computeRollover(today)
+ if actual != expected:
+ print('failed in timezone: %d' % time.timezone)
+ print('local vars: %s' % locals())
+ self.assertEqual(actual, expected)
+ if day == wday:
+ # goes into following week
+ expected += 7 * 24 * 60 * 60
+ actual = rh.computeRollover(today + 13 * 60 * 60)
+ if actual != expected:
+ print('failed in timezone: %d' % time.timezone)
+ print('local vars: %s' % locals())
+ self.assertEqual(actual, expected)
+ finally:
+ rh.close()
+
+
def secs(**kw):
return datetime.timedelta(**kw) // datetime.timedelta(seconds=1)
@@ -3872,18 +4094,25 @@ for when, exp in (('S', 1),
setattr(TimedRotatingFileHandlerTest, "test_compute_rollover_%s" % when, test_compute_rollover)
-@unittest.skipUnless(win32evtlog, 'win32evtlog/win32evtlogutil required for this test.')
+@unittest.skipUnless(win32evtlog, 'win32evtlog/win32evtlogutil/pywintypes required for this test.')
class NTEventLogHandlerTest(BaseTest):
def test_basic(self):
logtype = 'Application'
elh = win32evtlog.OpenEventLog(None, logtype)
num_recs = win32evtlog.GetNumberOfEventLogRecords(elh)
- h = logging.handlers.NTEventLogHandler('test_logging')
+
+ try:
+ h = logging.handlers.NTEventLogHandler('test_logging')
+ except pywintypes.error as e:
+ if e.winerror == 5: # access denied
+ raise unittest.SkipTest('Insufficient privileges to run test')
+ raise
+
r = logging.makeLogRecord({'msg': 'Test Log Message'})
h.handle(r)
h.close()
# Now see if the event is recorded
- self.assertTrue(num_recs < win32evtlog.GetNumberOfEventLogRecords(elh))
+ self.assertLess(num_recs, win32evtlog.GetNumberOfEventLogRecords(elh))
flags = win32evtlog.EVENTLOG_BACKWARDS_READ | \
win32evtlog.EVENTLOG_SEQUENTIAL_READ
found = False
@@ -3916,7 +4145,8 @@ def test_main():
SMTPHandlerTest, FileHandlerTest, RotatingFileHandlerTest,
LastResortTest, LogRecordTest, ExceptionTest,
SysLogHandlerTest, HTTPHandlerTest, NTEventLogHandlerTest,
- TimedRotatingFileHandlerTest
+ TimedRotatingFileHandlerTest, UnixSocketHandlerTest,
+ UnixDatagramHandlerTest, UnixSysLogHandlerTest
)
if __name__ == "__main__":