summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_logging.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_logging.py')
-rw-r--r--Lib/test/test_logging.py1696
1 files changed, 1569 insertions, 127 deletions
diff --git a/Lib/test/test_logging.py b/Lib/test/test_logging.py
index ab66596..e71a0fe 100644
--- a/Lib/test/test_logging.py
+++ b/Lib/test/test_logging.py
@@ -36,20 +36,40 @@ import queue
import re
import select
import socket
-from socketserver import ThreadingTCPServer, StreamRequestHandler
import struct
import sys
import tempfile
-from test.support import captured_stdout, run_with_locale, run_unittest
+from test.support import captured_stdout, run_with_locale, run_unittest, patch
from test.support import TestHandler, Matcher
import textwrap
+import time
import unittest
import warnings
import weakref
+import zlib
try:
import threading
+ # The following imports are needed only for tests which
+ # require threading
+ import asynchat
+ import asyncore
+ import errno
+ from http.server import HTTPServer, BaseHTTPRequestHandler
+ import smtpd
+ from urllib.parse import urlparse, parse_qs
+ from socketserver import (ThreadingUDPServer, DatagramRequestHandler,
+ ThreadingTCPServer, StreamRequestHandler)
except ImportError:
threading = None
+try:
+ import win32evtlog
+except ImportError:
+ win32evtlog = None
+try:
+ import win32evtlogutil
+except ImportError:
+ win32evtlogutil = None
+ win32evtlog = None
class BaseTest(unittest.TestCase):
@@ -77,9 +97,7 @@ class BaseTest(unittest.TestCase):
finally:
logging._releaseLock()
- # Set two unused loggers: one non-ASCII and one Unicode.
- # This is to test correct operation when sorting existing
- # loggers in the configuration code. See issue 8201.
+ # Set two unused loggers
self.logger1 = logging.getLogger("\xab\xd7\xbb")
self.logger2 = logging.getLogger("\u013f\u00d6\u0047")
@@ -140,8 +158,7 @@ class BaseTest(unittest.TestCase):
except AttributeError:
# StringIO.StringIO lacks a reset() method.
actual_lines = stream.getvalue().splitlines()
- self.assertEqual(len(actual_lines), len(expected_values),
- '%s vs. %s' % (actual_lines, expected_values))
+ self.assertEqual(len(actual_lines), len(expected_values))
for actual, expected in zip(actual_lines, expected_values):
match = pat.search(actual)
if not match:
@@ -179,17 +196,17 @@ class BuiltinLevelsTest(BaseTest):
INF.log(logging.CRITICAL, m())
INF.error(m())
- INF.warn(m())
+ INF.warning(m())
INF.info(m())
DEB.log(logging.CRITICAL, m())
DEB.error(m())
- DEB.warn (m())
- DEB.info (m())
+ DEB.warning(m())
+ DEB.info(m())
DEB.debug(m())
# These should not log.
- ERR.warn(m())
+ ERR.warning(m())
ERR.info(m())
ERR.debug(m())
@@ -223,7 +240,7 @@ class BuiltinLevelsTest(BaseTest):
INF_ERR.error(m())
# These should not log.
- INF_ERR.warn(m())
+ INF_ERR.warning(m())
INF_ERR.info(m())
INF_ERR.debug(m())
@@ -247,14 +264,14 @@ class BuiltinLevelsTest(BaseTest):
# These should log.
INF_UNDEF.log(logging.CRITICAL, m())
INF_UNDEF.error(m())
- INF_UNDEF.warn(m())
+ INF_UNDEF.warning(m())
INF_UNDEF.info(m())
INF_ERR_UNDEF.log(logging.CRITICAL, m())
INF_ERR_UNDEF.error(m())
# These should not log.
INF_UNDEF.debug(m())
- INF_ERR_UNDEF.warn(m())
+ INF_ERR_UNDEF.warning(m())
INF_ERR_UNDEF.info(m())
INF_ERR_UNDEF.debug(m())
@@ -293,8 +310,6 @@ class BuiltinLevelsTest(BaseTest):
('INF.BADPARENT', 'INFO', '4'),
])
- def test_invalid_name(self):
- self.assertRaises(TypeError, logging.getLogger, any)
class BasicFilterTest(BaseTest):
@@ -353,6 +368,10 @@ class BasicFilterTest(BaseTest):
finally:
handler.removeFilter(filterfunc)
+ def test_empty_filter(self):
+ f = logging.Filter()
+ r = logging.makeLogRecord({'name': 'spam.eggs'})
+ self.assertTrue(f.filter(r))
#
# First, we define our levels. There can be as many as you want - the only
@@ -496,6 +515,438 @@ class CustomLevelsAndFiltersTest(BaseTest):
handler.removeFilter(garr)
+class HandlerTest(BaseTest):
+ def test_name(self):
+ h = logging.Handler()
+ h.name = 'generic'
+ self.assertEqual(h.name, 'generic')
+ h.name = 'anothergeneric'
+ self.assertEqual(h.name, 'anothergeneric')
+ self.assertRaises(NotImplementedError, h.emit, None)
+
+ def test_builtin_handlers(self):
+ # We can't actually *use* too many handlers in the tests,
+ # but we can try instantiating them with various options
+ if sys.platform in ('linux', 'darwin'):
+ for existing in (True, False):
+ fd, fn = tempfile.mkstemp()
+ os.close(fd)
+ if not existing:
+ os.unlink(fn)
+ h = logging.handlers.WatchedFileHandler(fn, delay=True)
+ if existing:
+ dev, ino = h.dev, h.ino
+ self.assertNotEqual(dev, -1)
+ self.assertNotEqual(ino, -1)
+ r = logging.makeLogRecord({'msg': 'Test'})
+ h.handle(r)
+ # Now remove the file.
+ os.unlink(fn)
+ self.assertFalse(os.path.exists(fn))
+ # The next call should recreate the file.
+ h.handle(r)
+ self.assertTrue(os.path.exists(fn))
+ else:
+ self.assertEqual(h.dev, -1)
+ self.assertEqual(h.ino, -1)
+ h.close()
+ if existing:
+ os.unlink(fn)
+ if sys.platform == 'darwin':
+ sockname = '/var/run/syslog'
+ else:
+ sockname = '/dev/log'
+ try:
+ h = logging.handlers.SysLogHandler(sockname)
+ self.assertEqual(h.facility, h.LOG_USER)
+ self.assertTrue(h.unixsocket)
+ h.close()
+ except socket.error: # syslogd might not be available
+ pass
+ for method in ('GET', 'POST', 'PUT'):
+ if method == 'PUT':
+ self.assertRaises(ValueError, logging.handlers.HTTPHandler,
+ 'localhost', '/log', method)
+ else:
+ h = logging.handlers.HTTPHandler('localhost', '/log', method)
+ h.close()
+ h = logging.handlers.BufferingHandler(0)
+ r = logging.makeLogRecord({})
+ self.assertTrue(h.shouldFlush(r))
+ h.close()
+ h = logging.handlers.BufferingHandler(1)
+ self.assertFalse(h.shouldFlush(r))
+ h.close()
+
+class BadStream(object):
+ def write(self, data):
+ raise RuntimeError('deliberate mistake')
+
+class TestStreamHandler(logging.StreamHandler):
+ def handleError(self, record):
+ self.error_record = record
+
+class StreamHandlerTest(BaseTest):
+ def test_error_handling(self):
+ h = TestStreamHandler(BadStream())
+ r = logging.makeLogRecord({})
+ old_raise = logging.raiseExceptions
+ old_stderr = sys.stderr
+ try:
+ h.handle(r)
+ self.assertIs(h.error_record, r)
+ h = logging.StreamHandler(BadStream())
+ sys.stderr = sio = io.StringIO()
+ h.handle(r)
+ self.assertIn('\nRuntimeError: deliberate mistake\n',
+ sio.getvalue())
+ logging.raiseExceptions = False
+ sys.stderr = sio = io.StringIO()
+ h.handle(r)
+ self.assertEqual('', sio.getvalue())
+ finally:
+ logging.raiseExceptions = old_raise
+ sys.stderr = old_stderr
+
+# -- The following section could be moved into a server_helper.py module
+# -- if it proves to be of wider utility than just test_logging
+
+if threading:
+ class TestSMTPChannel(smtpd.SMTPChannel):
+ """
+ This derived class has had to be created because smtpd does not
+ support use of custom channel maps, although they are allowed by
+ asyncore's design. Issue #11959 has been raised to address this,
+ and if resolved satisfactorily, some of this code can be removed.
+ """
+ def __init__(self, server, conn, addr, sockmap):
+ asynchat.async_chat.__init__(self, conn, sockmap)
+ self.smtp_server = server
+ self.conn = conn
+ self.addr = addr
+ self.received_lines = []
+ self.smtp_state = self.COMMAND
+ self.seen_greeting = ''
+ self.mailfrom = None
+ self.rcpttos = []
+ self.received_data = ''
+ self.fqdn = socket.getfqdn()
+ self.num_bytes = 0
+ try:
+ self.peer = conn.getpeername()
+ except socket.error as err:
+ # a race condition may occur if the other end is closing
+ # before we can get the peername
+ self.close()
+ if err.args[0] != errno.ENOTCONN:
+ raise
+ return
+ self.push('220 %s %s' % (self.fqdn, smtpd.__version__))
+ self.set_terminator(b'\r\n')
+
+
+ class TestSMTPServer(smtpd.SMTPServer):
+ """
+ This class implements a test SMTP server.
+
+ :param addr: A (host, port) tuple which the server listens on.
+ You can specify a port value of zero: the server's
+ *port* attribute will hold the actual port number
+ used, which can be used in client connections.
+ :param handler: A callable which will be called to process
+ incoming messages. The handler will be passed
+ the client address tuple, who the message is from,
+ a list of recipients and the message data.
+ :param poll_interval: The interval, in seconds, used in the underlying
+ :func:`select` or :func:`poll` call by
+ :func:`asyncore.loop`.
+ :param sockmap: A dictionary which will be used to hold
+ :class:`asyncore.dispatcher` instances used by
+ :func:`asyncore.loop`. This avoids changing the
+ :mod:`asyncore` module's global state.
+ """
+ channel_class = TestSMTPChannel
+
+ def __init__(self, addr, handler, poll_interval, sockmap):
+ self._localaddr = addr
+ self._remoteaddr = None
+ self.sockmap = sockmap
+ asyncore.dispatcher.__init__(self, map=sockmap)
+ try:
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.setblocking(0)
+ self.set_socket(sock, map=sockmap)
+ # try to re-use a server port if possible
+ self.set_reuse_addr()
+ self.bind(addr)
+ self.port = sock.getsockname()[1]
+ self.listen(5)
+ except:
+ self.close()
+ raise
+ self._handler = handler
+ self._thread = None
+ self.poll_interval = poll_interval
+
+ def handle_accepted(self, conn, addr):
+ """
+ Redefined only because the base class does not pass in a
+ map, forcing use of a global in :mod:`asyncore`.
+ """
+ channel = self.channel_class(self, conn, addr, self.sockmap)
+
+ def process_message(self, peer, mailfrom, rcpttos, data):
+ """
+ Delegates to the handler passed in to the server's constructor.
+
+ Typically, this will be a test case method.
+ :param peer: The client (host, port) tuple.
+ :param mailfrom: The address of the sender.
+ :param rcpttos: The addresses of the recipients.
+ :param data: The message.
+ """
+ self._handler(peer, mailfrom, rcpttos, data)
+
+ def start(self):
+ """
+ Start the server running on a separate daemon thread.
+ """
+ self._thread = t = threading.Thread(target=self.serve_forever,
+ args=(self.poll_interval,))
+ t.setDaemon(True)
+ t.start()
+
+ def serve_forever(self, poll_interval):
+ """
+ Run the :mod:`asyncore` loop until normal termination
+ conditions arise.
+ :param poll_interval: The interval, in seconds, used in the underlying
+ :func:`select` or :func:`poll` call by
+ :func:`asyncore.loop`.
+ """
+ try:
+ asyncore.loop(poll_interval, map=self.sockmap)
+ except select.error:
+ # On FreeBSD 8, closing the server repeatably
+ # raises this error. We swallow it if the
+ # server has been closed.
+ if self.connected or self.accepting:
+ raise
+
+ def stop(self, timeout=None):
+ """
+ Stop the thread by closing the server instance.
+ Wait for the server thread to terminate.
+
+ :param timeout: How long to wait for the server thread
+ to terminate.
+ """
+ self.close()
+ self._thread.join(timeout)
+ self._thread = None
+
+ class ControlMixin(object):
+ """
+ This mixin is used to start a server on a separate thread, and
+ shut it down programmatically. Request handling is simplified - instead
+ of needing to derive a suitable RequestHandler subclass, you just
+ provide a callable which will be passed each received request to be
+ processed.
+
+ :param handler: A handler callable which will be called with a
+ single parameter - the request - in order to
+ process the request. This handler is called on the
+ server thread, effectively meaning that requests are
+ processed serially. While not quite Web scale ;-),
+ this should be fine for testing applications.
+ :param poll_interval: The polling interval in seconds.
+ """
+ def __init__(self, handler, poll_interval):
+ self._thread = None
+ self.poll_interval = poll_interval
+ self._handler = handler
+ self.ready = threading.Event()
+
+ def start(self):
+ """
+ Create a daemon thread to run the server, and start it.
+ """
+ self._thread = t = threading.Thread(target=self.serve_forever,
+ args=(self.poll_interval,))
+ t.setDaemon(True)
+ t.start()
+
+ def serve_forever(self, poll_interval):
+ """
+ Run the server. Set the ready flag before entering the
+ service loop.
+ """
+ self.ready.set()
+ super(ControlMixin, self).serve_forever(poll_interval)
+
+ def stop(self, timeout=None):
+ """
+ Tell the server thread to stop, and wait for it to do so.
+
+ :param timeout: How long to wait for the server thread
+ to terminate.
+ """
+ self.shutdown()
+ if self._thread is not None:
+ self._thread.join(timeout)
+ self._thread = None
+ self.server_close()
+ self.ready.clear()
+
+ class TestHTTPServer(ControlMixin, HTTPServer):
+ """
+ An HTTP server which is controllable using :class:`ControlMixin`.
+
+ :param addr: A tuple with the IP address and port to listen on.
+ :param handler: A handler callable which will be called with a
+ single parameter - the request - in order to
+ process the request.
+ :param poll_interval: The polling interval in seconds.
+ :param log: Pass ``True`` to enable log messages.
+ """
+ def __init__(self, addr, handler, poll_interval=0.5,
+ log=False, sslctx=None):
+ class DelegatingHTTPRequestHandler(BaseHTTPRequestHandler):
+ def __getattr__(self, name, default=None):
+ if name.startswith('do_'):
+ return self.process_request
+ raise AttributeError(name)
+
+ def process_request(self):
+ self.server._handler(self)
+
+ def log_message(self, format, *args):
+ if log:
+ super(DelegatingHTTPRequestHandler,
+ self).log_message(format, *args)
+ HTTPServer.__init__(self, addr, DelegatingHTTPRequestHandler)
+ ControlMixin.__init__(self, handler, poll_interval)
+ self.sslctx = sslctx
+
+ def get_request(self):
+ try:
+ sock, addr = self.socket.accept()
+ if self.sslctx:
+ sock = self.sslctx.wrap_socket(sock, server_side=True)
+ except socket.error as e:
+ # socket errors are silenced by the caller, print them here
+ sys.stderr.write("Got an error:\n%s\n" % e)
+ raise
+ return sock, addr
+
+ class TestTCPServer(ControlMixin, ThreadingTCPServer):
+ """
+ A TCP server which is controllable using :class:`ControlMixin`.
+
+ :param addr: A tuple with the IP address and port to listen on.
+ :param handler: A handler callable which will be called with a single
+ parameter - the request - in order to process the request.
+ :param poll_interval: The polling interval in seconds.
+ :bind_and_activate: If True (the default), binds the server and starts it
+ listening. If False, you need to call
+ :meth:`server_bind` and :meth:`server_activate` at
+ some later time before calling :meth:`start`, so that
+ the server will set up the socket and listen on it.
+ """
+
+ allow_reuse_address = True
+
+ def __init__(self, addr, handler, poll_interval=0.5,
+ bind_and_activate=True):
+ class DelegatingTCPRequestHandler(StreamRequestHandler):
+
+ def handle(self):
+ self.server._handler(self)
+ ThreadingTCPServer.__init__(self, addr, DelegatingTCPRequestHandler,
+ bind_and_activate)
+ ControlMixin.__init__(self, handler, poll_interval)
+
+ def server_bind(self):
+ super(TestTCPServer, self).server_bind()
+ self.port = self.socket.getsockname()[1]
+
+ class TestUDPServer(ControlMixin, ThreadingUDPServer):
+ """
+ A UDP server which is controllable using :class:`ControlMixin`.
+
+ :param addr: A tuple with the IP address and port to listen on.
+ :param handler: A handler callable which will be called with a
+ single parameter - the request - in order to
+ process the request.
+ :param poll_interval: The polling interval for shutdown requests,
+ in seconds.
+ :bind_and_activate: If True (the default), binds the server and
+ starts it listening. If False, you need to
+ call :meth:`server_bind` and
+ :meth:`server_activate` at some later time
+ before calling :meth:`start`, so that the server will
+ set up the socket and listen on it.
+ """
+ def __init__(self, addr, handler, poll_interval=0.5,
+ bind_and_activate=True):
+ class DelegatingUDPRequestHandler(DatagramRequestHandler):
+
+ def handle(self):
+ self.server._handler(self)
+
+ def finish(self):
+ data = self.wfile.getvalue()
+ if data:
+ try:
+ super(DelegatingUDPRequestHandler, self).finish()
+ except socket.error:
+ if not self.server._closed:
+ raise
+
+ ThreadingUDPServer.__init__(self, addr,
+ DelegatingUDPRequestHandler,
+ bind_and_activate)
+ ControlMixin.__init__(self, handler, poll_interval)
+ self._closed = False
+
+ def server_bind(self):
+ super(TestUDPServer, self).server_bind()
+ self.port = self.socket.getsockname()[1]
+
+ def server_close(self):
+ super(TestUDPServer, self).server_close()
+ self._closed = True
+
+# - end of server_helper section
+
+@unittest.skipUnless(threading, 'Threading required for this test.')
+class SMTPHandlerTest(BaseTest):
+ def test_basic(self):
+ sockmap = {}
+ server = TestSMTPServer(('localhost', 0), self.process_message, 0.001,
+ sockmap)
+ server.start()
+ addr = ('localhost', server.port)
+ h = logging.handlers.SMTPHandler(addr, 'me', 'you', 'Log')
+ self.assertEqual(h.toaddrs, ['you'])
+ self.messages = []
+ r = logging.makeLogRecord({'msg': 'Hello'})
+ self.handled = threading.Event()
+ h.handle(r)
+ self.handled.wait()
+ server.stop()
+ self.assertEqual(len(self.messages), 1)
+ peer, mailfrom, rcpttos, data = self.messages[0]
+ self.assertEqual(mailfrom, 'me')
+ self.assertEqual(rcpttos, ['you'])
+ self.assertIn('\nSubject: Log\n', data)
+ self.assertTrue(data.endswith('\n\nHello'))
+ h.close()
+
+ def process_message(self, *args):
+ self.messages.append(args)
+ self.handled.set()
+
class MemoryHandlerTest(BaseTest):
"""Tests for the MemoryHandler."""
@@ -523,7 +974,7 @@ class MemoryHandlerTest(BaseTest):
self.mem_logger.info(self.next_message())
self.assert_log_lines([])
# This will flush because the level is >= logging.WARNING
- self.mem_logger.warn(self.next_message())
+ self.mem_logger.warning(self.next_message())
lines = [
('DEBUG', '1'),
('INFO', '2'),
@@ -868,116 +1319,280 @@ class ConfigFileTest(BaseTest):
# Original logger output is empty.
self.assert_log_lines([])
-class LogRecordStreamHandler(StreamRequestHandler):
- """Handler for a streaming logging request. It saves the log message in the
- TCP server's 'log_output' attribute."""
+@unittest.skipUnless(threading, 'Threading required for this test.')
+class SocketHandlerTest(BaseTest):
- TCP_LOG_END = "!!!END!!!"
+ """Test for SocketHandler objects."""
- def handle(self):
- """Handle multiple requests - each expected to be of 4-byte length,
- followed by the LogRecord in pickle format. Logs the record
- according to whatever policy is configured locally."""
+ def setUp(self):
+ """Set up a TCP server to receive log messages, and a SocketHandler
+ pointing to that server's address and port."""
+ BaseTest.setUp(self)
+ addr = ('localhost', 0)
+ self.server = server = TestTCPServer(addr, self.handle_socket,
+ 0.01)
+ server.start()
+ server.ready.wait()
+ self.sock_hdlr = logging.handlers.SocketHandler('localhost',
+ server.port)
+ self.log_output = ''
+ self.root_logger.removeHandler(self.root_logger.handlers[0])
+ self.root_logger.addHandler(self.sock_hdlr)
+ self.handled = threading.Semaphore(0)
+
+ def tearDown(self):
+ """Shutdown the TCP server."""
+ try:
+ self.server.stop(2.0)
+ self.root_logger.removeHandler(self.sock_hdlr)
+ self.sock_hdlr.close()
+ finally:
+ BaseTest.tearDown(self)
+
+ def handle_socket(self, request):
+ conn = request.connection
while True:
- chunk = self.connection.recv(4)
+ chunk = conn.recv(4)
if len(chunk) < 4:
break
slen = struct.unpack(">L", chunk)[0]
- chunk = self.connection.recv(slen)
+ chunk = conn.recv(slen)
while len(chunk) < slen:
- chunk = chunk + self.connection.recv(slen - len(chunk))
- obj = self.unpickle(chunk)
+ chunk = chunk + conn.recv(slen - len(chunk))
+ obj = pickle.loads(chunk)
record = logging.makeLogRecord(obj)
- self.handle_log_record(record)
+ self.log_output += record.msg + '\n'
+ self.handled.release()
- def unpickle(self, data):
- return pickle.loads(data)
+ def test_output(self):
+ # The log message sent to the SocketHandler is properly received.
+ logger = logging.getLogger("tcp")
+ logger.error("spam")
+ self.handled.acquire()
+ logger.debug("eggs")
+ self.handled.acquire()
+ self.assertEqual(self.log_output, "spam\neggs\n")
- def handle_log_record(self, record):
- # If the end-of-messages sentinel is seen, tell the server to
- # terminate.
- if self.TCP_LOG_END in record.msg:
- self.server.abort = 1
- return
- self.server.log_output += record.msg + "\n"
+ def test_noserver(self):
+ # Kill the server
+ self.server.stop(2.0)
+ #The logging call should try to connect, which should fail
+ try:
+ raise RuntimeError('Deliberate mistake')
+ except RuntimeError:
+ self.root_logger.exception('Never sent')
+ self.root_logger.error('Never sent, either')
+ now = time.time()
+ self.assertTrue(self.sock_hdlr.retryTime > now)
+ time.sleep(self.sock_hdlr.retryTime - now + 0.001)
+ self.root_logger.error('Nor this')
-class LogRecordSocketReceiver(ThreadingTCPServer):
+@unittest.skipUnless(threading, 'Threading required for this test.')
+class DatagramHandlerTest(BaseTest):
- """A simple-minded TCP socket-based logging receiver suitable for test
- purposes."""
+ """Test for DatagramHandler."""
- allow_reuse_address = 1
- log_output = ""
+ def setUp(self):
+ """Set up a UDP server to receive log messages, and a DatagramHandler
+ pointing to that server's address and port."""
+ BaseTest.setUp(self)
+ addr = ('localhost', 0)
+ self.server = server = TestUDPServer(addr, self.handle_datagram, 0.01)
+ server.start()
+ server.ready.wait()
+ self.sock_hdlr = logging.handlers.DatagramHandler('localhost',
+ server.port)
+ self.log_output = ''
+ self.root_logger.removeHandler(self.root_logger.handlers[0])
+ self.root_logger.addHandler(self.sock_hdlr)
+ self.handled = threading.Event()
- def __init__(self, host='localhost',
- port=logging.handlers.DEFAULT_TCP_LOGGING_PORT,
- handler=LogRecordStreamHandler):
- ThreadingTCPServer.__init__(self, (host, port), handler)
- self.abort = False
- self.timeout = 0.1
- self.finished = threading.Event()
+ def tearDown(self):
+ """Shutdown the UDP server."""
+ try:
+ self.server.stop(2.0)
+ self.root_logger.removeHandler(self.sock_hdlr)
+ self.sock_hdlr.close()
+ finally:
+ BaseTest.tearDown(self)
- def serve_until_stopped(self):
- while not self.abort:
- rd, wr, ex = select.select([self.socket.fileno()], [], [],
- self.timeout)
- if rd:
- self.handle_request()
- # Notify the main thread that we're about to exit
- self.finished.set()
- # close the listen socket
- self.server_close()
+ def handle_datagram(self, request):
+ slen = struct.pack('>L', 0) # length of prefix
+ packet = request.packet[len(slen):]
+ obj = pickle.loads(packet)
+ record = logging.makeLogRecord(obj)
+ self.log_output += record.msg + '\n'
+ self.handled.set()
+
+ def test_output(self):
+ # The log message sent to the DatagramHandler is properly received.
+ logger = logging.getLogger("udp")
+ logger.error("spam")
+ self.handled.wait()
+ self.handled.clear()
+ logger.error("eggs")
+ self.handled.wait()
+ self.assertEqual(self.log_output, "spam\neggs\n")
@unittest.skipUnless(threading, 'Threading required for this test.')
-class SocketHandlerTest(BaseTest):
+class SysLogHandlerTest(BaseTest):
- """Test for SocketHandler objects."""
+ """Test for SysLogHandler using UDP."""
def setUp(self):
- """Set up a TCP server to receive log messages, and a SocketHandler
+ """Set up a UDP server to receive log messages, and a SysLogHandler
pointing to that server's address and port."""
BaseTest.setUp(self)
- self.tcpserver = LogRecordSocketReceiver(port=0)
- self.port = self.tcpserver.socket.getsockname()[1]
- self.threads = [
- threading.Thread(target=self.tcpserver.serve_until_stopped)]
- for thread in self.threads:
- thread.start()
-
- self.sock_hdlr = logging.handlers.SocketHandler('localhost', self.port)
- self.sock_hdlr.setFormatter(self.root_formatter)
+ addr = ('localhost', 0)
+ self.server = server = TestUDPServer(addr, self.handle_datagram,
+ 0.01)
+ server.start()
+ server.ready.wait()
+ self.sl_hdlr = logging.handlers.SysLogHandler(('localhost',
+ server.port))
+ self.log_output = ''
self.root_logger.removeHandler(self.root_logger.handlers[0])
- self.root_logger.addHandler(self.sock_hdlr)
+ self.root_logger.addHandler(self.sl_hdlr)
+ self.handled = threading.Event()
def tearDown(self):
- """Shutdown the TCP server."""
+ """Shutdown the UDP server."""
try:
- self.tcpserver.abort = True
- del self.tcpserver
- self.root_logger.removeHandler(self.sock_hdlr)
- self.sock_hdlr.close()
- for thread in self.threads:
- thread.join(2.0)
+ self.server.stop(2.0)
+ self.root_logger.removeHandler(self.sl_hdlr)
+ self.sl_hdlr.close()
finally:
BaseTest.tearDown(self)
- def get_output(self):
- """Get the log output as received by the TCP server."""
- # Signal the TCP receiver and wait for it to terminate.
- self.root_logger.critical(LogRecordStreamHandler.TCP_LOG_END)
- self.tcpserver.finished.wait(2.0)
- return self.tcpserver.log_output
+ def handle_datagram(self, request):
+ self.log_output = request.packet
+ self.handled.set()
def test_output(self):
- # The log message sent to the SocketHandler is properly received.
- logger = logging.getLogger("tcp")
- logger.error("spam")
- logger.debug("eggs")
- self.assertEqual(self.get_output(), "spam\neggs\n")
+ # The log message sent to the SysLogHandler is properly received.
+ logger = logging.getLogger("slh")
+ logger.error("sp\xe4m")
+ self.handled.wait()
+ self.assertEqual(self.log_output, b'<11>\xef\xbb\xbfsp\xc3\xa4m\x00')
+ self.handled.clear()
+ self.sl_hdlr.append_nul = False
+ logger.error("sp\xe4m")
+ self.handled.wait()
+ self.assertEqual(self.log_output, b'<11>\xef\xbb\xbfsp\xc3\xa4m')
+ self.handled.clear()
+ self.sl_hdlr.ident = "h\xe4m-"
+ logger.error("sp\xe4m")
+ self.handled.wait()
+ self.assertEqual(self.log_output, b'<11>\xef\xbb\xbfh\xc3\xa4m-sp\xc3\xa4m')
+
+
+@unittest.skipUnless(threading, 'Threading required for this test.')
+class HTTPHandlerTest(BaseTest):
+ """Test for HTTPHandler."""
+
+ PEMFILE = """-----BEGIN RSA PRIVATE KEY-----
+MIICXQIBAAKBgQDGT4xS5r91rbLJQK2nUDenBhBG6qFk+bVOjuAGC/LSHlAoBnvG
+zQG3agOG+e7c5z2XT8m2ktORLqG3E4mYmbxgyhDrzP6ei2Anc+pszmnxPoK3Puh5
+aXV+XKt0bU0C1m2+ACmGGJ0t3P408art82nOxBw8ZHgIg9Dtp6xIUCyOqwIDAQAB
+AoGBAJFTnFboaKh5eUrIzjmNrKsG44jEyy+vWvHN/FgSC4l103HxhmWiuL5Lv3f7
+0tMp1tX7D6xvHwIG9VWvyKb/Cq9rJsDibmDVIOslnOWeQhG+XwJyitR0pq/KlJIB
+5LjORcBw795oKWOAi6RcOb1ON59tysEFYhAGQO9k6VL621gRAkEA/Gb+YXULLpbs
+piXN3q4zcHzeaVANo69tUZ6TjaQqMeTxE4tOYM0G0ZoSeHEdaP59AOZGKXXNGSQy
+2z/MddcYGQJBAMkjLSYIpOLJY11ja8OwwswFG2hEzHe0cS9bzo++R/jc1bHA5R0Y
+i6vA5iPi+wopPFvpytdBol7UuEBe5xZrxWMCQQCWxELRHiP2yWpEeLJ3gGDzoXMN
+PydWjhRju7Bx3AzkTtf+D6lawz1+eGTuEss5i0JKBkMEwvwnN2s1ce+EuF4JAkBb
+E96h1lAzkVW5OAfYOPY8RCPA90ZO/hoyg7PpSxR0ECuDrgERR8gXIeYUYfejBkEa
+rab4CfRoVJKKM28Yq/xZAkBvuq670JRCwOgfUTdww7WpdOQBYPkzQccsKNCslQW8
+/DyW6y06oQusSENUvynT6dr3LJxt/NgZPhZX2+k1eYDV
+-----END RSA PRIVATE KEY-----
+-----BEGIN CERTIFICATE-----
+MIICGzCCAYSgAwIBAgIJAIq84a2Q/OvlMA0GCSqGSIb3DQEBBQUAMBQxEjAQBgNV
+BAMTCWxvY2FsaG9zdDAeFw0xMTA1MjExMDIzMzNaFw03NTAzMjEwMzU1MTdaMBQx
+EjAQBgNVBAMTCWxvY2FsaG9zdDCBnzANBgkqhkiG9w0BAQEFAAOBjQAwgYkCgYEA
+xk+MUua/da2yyUCtp1A3pwYQRuqhZPm1To7gBgvy0h5QKAZ7xs0Bt2oDhvnu3Oc9
+l0/JtpLTkS6htxOJmJm8YMoQ68z+notgJ3PqbM5p8T6Ctz7oeWl1flyrdG1NAtZt
+vgAphhidLdz+NPGq7fNpzsQcPGR4CIPQ7aesSFAsjqsCAwEAAaN1MHMwHQYDVR0O
+BBYEFLWaUPO6N7efGiuoS9i3DVYcUwn0MEQGA1UdIwQ9MDuAFLWaUPO6N7efGiuo
+S9i3DVYcUwn0oRikFjAUMRIwEAYDVQQDEwlsb2NhbGhvc3SCCQCKvOGtkPzr5TAM
+BgNVHRMEBTADAQH/MA0GCSqGSIb3DQEBBQUAA4GBAMK5whPjLNQK1Ivvk88oqJqq
+4f889OwikGP0eUhOBhbFlsZs+jq5YZC2UzHz+evzKBlgAP1u4lP/cB85CnjvWqM+
+1c/lywFHQ6HOdDeQ1L72tSYMrNOG4XNmLn0h7rx6GoTU7dcFRfseahBCq8mv0IDt
+IRbTpvlHWPjsSvHz0ZOH
+-----END CERTIFICATE-----"""
+
+ def setUp(self):
+ """Set up an HTTP server to receive log messages, and a HTTPHandler
+ pointing to that server's address and port."""
+ BaseTest.setUp(self)
+ self.handled = threading.Event()
+ def handle_request(self, request):
+ self.command = request.command
+ self.log_data = urlparse(request.path)
+ if self.command == 'POST':
+ try:
+ rlen = int(request.headers['Content-Length'])
+ self.post_data = request.rfile.read(rlen)
+ except:
+ self.post_data = None
+ request.send_response(200)
+ request.end_headers()
+ self.handled.set()
+
+ def test_output(self):
+ # The log message sent to the HTTPHandler is properly received.
+ logger = logging.getLogger("http")
+ root_logger = self.root_logger
+ root_logger.removeHandler(self.root_logger.handlers[0])
+ for secure in (False, True):
+ addr = ('localhost', 0)
+ if secure:
+ try:
+ import ssl
+ fd, fn = tempfile.mkstemp()
+ os.close(fd)
+ with open(fn, 'w') as f:
+ f.write(self.PEMFILE)
+ sslctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+ sslctx.load_cert_chain(fn)
+ os.unlink(fn)
+ except ImportError:
+ sslctx = None
+ else:
+ sslctx = None
+ self.server = server = TestHTTPServer(addr, self.handle_request,
+ 0.01, sslctx=sslctx)
+ server.start()
+ server.ready.wait()
+ host = 'localhost:%d' % server.server_port
+ secure_client = secure and sslctx
+ self.h_hdlr = logging.handlers.HTTPHandler(host, '/frob',
+ secure=secure_client)
+ self.log_data = None
+ root_logger.addHandler(self.h_hdlr)
+
+ for method in ('GET', 'POST'):
+ self.h_hdlr.method = method
+ self.handled.clear()
+ msg = "sp\xe4m"
+ logger.error(msg)
+ self.handled.wait()
+ self.assertEqual(self.log_data.path, '/frob')
+ self.assertEqual(self.command, method)
+ if method == 'GET':
+ d = parse_qs(self.log_data.query)
+ else:
+ d = parse_qs(self.post_data.decode('utf-8'))
+ self.assertEqual(d['name'], ['http'])
+ self.assertEqual(d['funcName'], ['test_output'])
+ self.assertEqual(d['msg'], [msg])
+
+ self.server.stop(2.0)
+ self.root_logger.removeHandler(self.h_hdlr)
+ self.h_hdlr.close()
class MemoryTest(BaseTest):
@@ -1085,28 +1700,39 @@ class WarningsTest(BaseTest):
def test_warnings(self):
with warnings.catch_warnings():
logging.captureWarnings(True)
- try:
- warnings.filterwarnings("always", category=UserWarning)
- file = io.StringIO()
- h = logging.StreamHandler(file)
- logger = logging.getLogger("py.warnings")
- logger.addHandler(h)
- warnings.warn("I'm warning you...")
- logger.removeHandler(h)
- s = file.getvalue()
- h.close()
- self.assertTrue(s.find("UserWarning: I'm warning you...\n") > 0)
-
- #See if an explicit file uses the original implementation
- file = io.StringIO()
- warnings.showwarning("Explicit", UserWarning, "dummy.py", 42,
- file, "Dummy line")
- s = file.getvalue()
- file.close()
- self.assertEqual(s,
- "dummy.py:42: UserWarning: Explicit\n Dummy line\n")
- finally:
- logging.captureWarnings(False)
+ self.addCleanup(logging.captureWarnings, False)
+ warnings.filterwarnings("always", category=UserWarning)
+ stream = io.StringIO()
+ h = logging.StreamHandler(stream)
+ logger = logging.getLogger("py.warnings")
+ logger.addHandler(h)
+ warnings.warn("I'm warning you...")
+ logger.removeHandler(h)
+ s = stream.getvalue()
+ h.close()
+ self.assertTrue(s.find("UserWarning: I'm warning you...\n") > 0)
+
+ #See if an explicit file uses the original implementation
+ a_file = io.StringIO()
+ warnings.showwarning("Explicit", UserWarning, "dummy.py", 42,
+ a_file, "Dummy line")
+ s = a_file.getvalue()
+ a_file.close()
+ self.assertEqual(s,
+ "dummy.py:42: UserWarning: Explicit\n Dummy line\n")
+
+ def test_warnings_no_handlers(self):
+ with warnings.catch_warnings():
+ logging.captureWarnings(True)
+ self.addCleanup(logging.captureWarnings, False)
+
+ # confirm our assumption: no loggers are set
+ logger = logging.getLogger("py.warnings")
+ self.assertEqual(logger.handlers, [])
+
+ warnings.showwarning("Explicit", UserWarning, "dummy.py", 42)
+ self.assertEqual(len(logger.handlers), 1)
+ self.assertIsInstance(logger.handlers[0], logging.NullHandler)
def formatFunc(format, datefmt=None):
@@ -1959,6 +2585,7 @@ class ConfigDictTest(BaseTest):
logging.config.stopListening()
t.join(2.0)
+ @unittest.skipUnless(threading, 'Threading required for this test.')
def test_listen_config_10_ok(self):
with captured_stdout() as output:
self.setup_via_listener(json.dumps(self.config10))
@@ -1978,6 +2605,7 @@ class ConfigDictTest(BaseTest):
('ERROR', '4'),
], stream=output)
+ @unittest.skipUnless(threading, 'Threading required for this test.')
def test_listen_config_1_ok(self):
with captured_stdout() as output:
self.setup_via_listener(textwrap.dedent(ConfigFileTest.config1))
@@ -1992,6 +2620,27 @@ class ConfigDictTest(BaseTest):
# Original logger output is empty.
self.assert_log_lines([])
+ def test_baseconfig(self):
+ d = {
+ 'atuple': (1, 2, 3),
+ 'alist': ['a', 'b', 'c'],
+ 'adict': {'d': 'e', 'f': 3 },
+ 'nest1': ('g', ('h', 'i'), 'j'),
+ 'nest2': ['k', ['l', 'm'], 'n'],
+ 'nest3': ['o', 'cfg://alist', 'p'],
+ }
+ bc = logging.config.BaseConfigurator(d)
+ self.assertEqual(bc.convert('cfg://atuple[1]'), 2)
+ self.assertEqual(bc.convert('cfg://alist[1]'), 'b')
+ self.assertEqual(bc.convert('cfg://nest1[1][0]'), 'h')
+ self.assertEqual(bc.convert('cfg://nest2[1][1]'), 'm')
+ self.assertEqual(bc.convert('cfg://adict.d'), 'e')
+ self.assertEqual(bc.convert('cfg://adict[f]'), 3)
+ v = bc.convert('cfg://nest3')
+ self.assertEqual(v.pop(1), ['a', 'b', 'c'])
+ self.assertRaises(KeyError, bc.convert, 'cfg://nosuch')
+ self.assertRaises(ValueError, bc.convert, 'cfg://!')
+ self.assertRaises(KeyError, bc.convert, 'cfg://adict[2]')
class ManagerTest(BaseTest):
def test_manager_loggerclass(self):
@@ -2010,6 +2659,11 @@ class ManagerTest(BaseTest):
self.assertEqual(logged, ['should appear in logged'])
+ def test_set_log_record_factory(self):
+ man = logging.Manager(None)
+ expected = object()
+ man.setLogRecordFactory(expected)
+ self.assertEqual(man.logRecordFactory, expected)
class ChildLoggerTest(BaseTest):
def test_child_loggers(self):
@@ -2111,6 +2765,18 @@ class QueueHandlerTest(BaseTest):
self.assertTrue(handler.matches(levelno=logging.ERROR, message='2'))
self.assertTrue(handler.matches(levelno=logging.CRITICAL, message='3'))
+ZERO = datetime.timedelta(0)
+
+class UTC(datetime.tzinfo):
+ def utcoffset(self, dt):
+ return ZERO
+
+ dst = utcoffset
+
+ def tzname(self, dt):
+ return 'UTC'
+
+utc = UTC()
class FormatterTest(unittest.TestCase):
def setUp(self):
@@ -2184,6 +2850,69 @@ class FormatterTest(unittest.TestCase):
f = logging.Formatter('asctime', style='$')
self.assertFalse(f.usesTime())
+ def test_invalid_style(self):
+ self.assertRaises(ValueError, logging.Formatter, None, None, 'x')
+
+ def test_time(self):
+ r = self.get_record()
+ dt = datetime.datetime(1993,4,21,8,3,0,0,utc)
+ r.created = time.mktime(dt.timetuple()) - time.timezone
+ r.msecs = 123
+ f = logging.Formatter('%(asctime)s %(message)s')
+ f.converter = time.gmtime
+ self.assertEqual(f.formatTime(r), '1993-04-21 08:03:00,123')
+ self.assertEqual(f.formatTime(r, '%Y:%d'), '1993:21')
+ f.format(r)
+ self.assertEqual(r.asctime, '1993-04-21 08:03:00,123')
+
+class TestBufferingFormatter(logging.BufferingFormatter):
+ def formatHeader(self, records):
+ return '[(%d)' % len(records)
+
+ def formatFooter(self, records):
+ return '(%d)]' % len(records)
+
+class BufferingFormatterTest(unittest.TestCase):
+ def setUp(self):
+ self.records = [
+ logging.makeLogRecord({'msg': 'one'}),
+ logging.makeLogRecord({'msg': 'two'}),
+ ]
+
+ def test_default(self):
+ f = logging.BufferingFormatter()
+ self.assertEqual('', f.format([]))
+ self.assertEqual('onetwo', f.format(self.records))
+
+ def test_custom(self):
+ f = TestBufferingFormatter()
+ self.assertEqual('[(2)onetwo(2)]', f.format(self.records))
+ lf = logging.Formatter('<%(message)s>')
+ f = TestBufferingFormatter(lf)
+ self.assertEqual('[(2)<one><two>(2)]', f.format(self.records))
+
+class ExceptionTest(BaseTest):
+ def test_formatting(self):
+ r = self.root_logger
+ h = RecordingHandler()
+ r.addHandler(h)
+ try:
+ raise RuntimeError('deliberate mistake')
+ except:
+ logging.exception('failed', stack_info=True)
+ r.removeHandler(h)
+ h.close()
+ r = h.records[0]
+ self.assertTrue(r.exc_text.startswith('Traceback (most recent '
+ 'call last):\n'))
+ self.assertTrue(r.exc_text.endswith('\nRuntimeError: '
+ 'deliberate mistake'))
+ self.assertTrue(r.stack_info.startswith('Stack (most recent '
+ 'call last):\n'))
+ self.assertTrue(r.stack_info.endswith('logging.exception(\'failed\', '
+ 'stack_info=True)'))
+
+
class LastResortTest(BaseTest):
def test_last_resort(self):
# Test the last resort handler
@@ -2194,6 +2923,8 @@ class LastResortTest(BaseTest):
old_raise_exceptions = logging.raiseExceptions
try:
sys.stderr = sio = io.StringIO()
+ root.debug('This should not appear')
+ self.assertEqual(sio.getvalue(), '')
root.warning('This is your final chance!')
self.assertEqual(sio.getvalue(), 'This is your final chance!\n')
#No handlers and no last resort, so 'No handlers' message
@@ -2218,6 +2949,586 @@ class LastResortTest(BaseTest):
logging.raiseExceptions = old_raise_exceptions
+class FakeHandler:
+
+ def __init__(self, identifier, called):
+ for method in ('acquire', 'flush', 'close', 'release'):
+ setattr(self, method, self.record_call(identifier, method, called))
+
+ def record_call(self, identifier, method_name, called):
+ def inner():
+ called.append('{} - {}'.format(identifier, method_name))
+ return inner
+
+
+class RecordingHandler(logging.NullHandler):
+
+ def __init__(self, *args, **kwargs):
+ super(RecordingHandler, self).__init__(*args, **kwargs)
+ self.records = []
+
+ def handle(self, record):
+ """Keep track of all the emitted records."""
+ self.records.append(record)
+
+
+class ShutdownTest(BaseTest):
+
+ """Test suite for the shutdown method."""
+
+ def setUp(self):
+ super(ShutdownTest, self).setUp()
+ self.called = []
+
+ raise_exceptions = logging.raiseExceptions
+ self.addCleanup(setattr, logging, 'raiseExceptions', raise_exceptions)
+
+ def raise_error(self, error):
+ def inner():
+ raise error()
+ return inner
+
+ def test_no_failure(self):
+ # create some fake handlers
+ handler0 = FakeHandler(0, self.called)
+ handler1 = FakeHandler(1, self.called)
+ handler2 = FakeHandler(2, self.called)
+
+ # create live weakref to those handlers
+ handlers = map(logging.weakref.ref, [handler0, handler1, handler2])
+
+ logging.shutdown(handlerList=list(handlers))
+
+ expected = ['2 - acquire', '2 - flush', '2 - close', '2 - release',
+ '1 - acquire', '1 - flush', '1 - close', '1 - release',
+ '0 - acquire', '0 - flush', '0 - close', '0 - release']
+ self.assertEqual(expected, self.called)
+
+ def _test_with_failure_in_method(self, method, error):
+ handler = FakeHandler(0, self.called)
+ setattr(handler, method, self.raise_error(error))
+ handlers = [logging.weakref.ref(handler)]
+
+ logging.shutdown(handlerList=list(handlers))
+
+ self.assertEqual('0 - release', self.called[-1])
+
+ def test_with_ioerror_in_acquire(self):
+ self._test_with_failure_in_method('acquire', IOError)
+
+ def test_with_ioerror_in_flush(self):
+ self._test_with_failure_in_method('flush', IOError)
+
+ def test_with_ioerror_in_close(self):
+ self._test_with_failure_in_method('close', IOError)
+
+ def test_with_valueerror_in_acquire(self):
+ self._test_with_failure_in_method('acquire', ValueError)
+
+ def test_with_valueerror_in_flush(self):
+ self._test_with_failure_in_method('flush', ValueError)
+
+ def test_with_valueerror_in_close(self):
+ self._test_with_failure_in_method('close', ValueError)
+
+ def test_with_other_error_in_acquire_without_raise(self):
+ logging.raiseExceptions = False
+ self._test_with_failure_in_method('acquire', IndexError)
+
+ def test_with_other_error_in_flush_without_raise(self):
+ logging.raiseExceptions = False
+ self._test_with_failure_in_method('flush', IndexError)
+
+ def test_with_other_error_in_close_without_raise(self):
+ logging.raiseExceptions = False
+ self._test_with_failure_in_method('close', IndexError)
+
+ def test_with_other_error_in_acquire_with_raise(self):
+ logging.raiseExceptions = True
+ self.assertRaises(IndexError, self._test_with_failure_in_method,
+ 'acquire', IndexError)
+
+ def test_with_other_error_in_flush_with_raise(self):
+ logging.raiseExceptions = True
+ self.assertRaises(IndexError, self._test_with_failure_in_method,
+ 'flush', IndexError)
+
+ def test_with_other_error_in_close_with_raise(self):
+ logging.raiseExceptions = True
+ self.assertRaises(IndexError, self._test_with_failure_in_method,
+ 'close', IndexError)
+
+
+class ModuleLevelMiscTest(BaseTest):
+
+ """Test suite for some module level methods."""
+
+ def test_disable(self):
+ old_disable = logging.root.manager.disable
+ # confirm our assumptions are correct
+ self.assertEqual(old_disable, 0)
+ self.addCleanup(logging.disable, old_disable)
+
+ logging.disable(83)
+ self.assertEqual(logging.root.manager.disable, 83)
+
+ def _test_log(self, method, level=None):
+ called = []
+ patch(self, logging, 'basicConfig',
+ lambda *a, **kw: called.append((a, kw)))
+
+ recording = RecordingHandler()
+ logging.root.addHandler(recording)
+
+ log_method = getattr(logging, method)
+ if level is not None:
+ log_method(level, "test me: %r", recording)
+ else:
+ log_method("test me: %r", recording)
+
+ self.assertEqual(len(recording.records), 1)
+ record = recording.records[0]
+ self.assertEqual(record.getMessage(), "test me: %r" % recording)
+
+ expected_level = level if level is not None else getattr(logging, method.upper())
+ self.assertEqual(record.levelno, expected_level)
+
+ # basicConfig was not called!
+ self.assertEqual(called, [])
+
+ def test_log(self):
+ self._test_log('log', logging.ERROR)
+
+ def test_debug(self):
+ self._test_log('debug')
+
+ def test_info(self):
+ self._test_log('info')
+
+ def test_warning(self):
+ self._test_log('warning')
+
+ def test_error(self):
+ self._test_log('error')
+
+ def test_critical(self):
+ self._test_log('critical')
+
+ def test_set_logger_class(self):
+ self.assertRaises(TypeError, logging.setLoggerClass, object)
+
+ class MyLogger(logging.Logger):
+ pass
+
+ logging.setLoggerClass(MyLogger)
+ self.assertEqual(logging.getLoggerClass(), MyLogger)
+
+ logging.setLoggerClass(logging.Logger)
+ self.assertEqual(logging.getLoggerClass(), logging.Logger)
+
+class LogRecordTest(BaseTest):
+ def test_str_rep(self):
+ r = logging.makeLogRecord({})
+ s = str(r)
+ self.assertTrue(s.startswith('<LogRecord: '))
+ self.assertTrue(s.endswith('>'))
+
+ def test_dict_arg(self):
+ h = RecordingHandler()
+ r = logging.getLogger()
+ r.addHandler(h)
+ d = {'less' : 'more' }
+ logging.warning('less is %(less)s', d)
+ self.assertIs(h.records[0].args, d)
+ self.assertEqual(h.records[0].message, 'less is more')
+ r.removeHandler(h)
+ h.close()
+
+ def test_multiprocessing(self):
+ r = logging.makeLogRecord({})
+ self.assertEqual(r.processName, 'MainProcess')
+ try:
+ import multiprocessing as mp
+ r = logging.makeLogRecord({})
+ self.assertEqual(r.processName, mp.current_process().name)
+ except ImportError:
+ pass
+
+ def test_optional(self):
+ r = logging.makeLogRecord({})
+ NOT_NONE = self.assertIsNotNone
+ if threading:
+ NOT_NONE(r.thread)
+ NOT_NONE(r.threadName)
+ NOT_NONE(r.process)
+ NOT_NONE(r.processName)
+ log_threads = logging.logThreads
+ log_processes = logging.logProcesses
+ log_multiprocessing = logging.logMultiprocessing
+ try:
+ logging.logThreads = False
+ logging.logProcesses = False
+ logging.logMultiprocessing = False
+ r = logging.makeLogRecord({})
+ NONE = self.assertIsNone
+ NONE(r.thread)
+ NONE(r.threadName)
+ NONE(r.process)
+ NONE(r.processName)
+ finally:
+ logging.logThreads = log_threads
+ logging.logProcesses = log_processes
+ logging.logMultiprocessing = log_multiprocessing
+
+class BasicConfigTest(unittest.TestCase):
+
+ """Test suite for logging.basicConfig."""
+
+ def setUp(self):
+ super(BasicConfigTest, self).setUp()
+ self.handlers = logging.root.handlers
+ self.saved_handlers = logging._handlers.copy()
+ self.saved_handler_list = logging._handlerList[:]
+ self.original_logging_level = logging.root.level
+ self.addCleanup(self.cleanup)
+ logging.root.handlers = []
+
+ def tearDown(self):
+ for h in logging.root.handlers[:]:
+ logging.root.removeHandler(h)
+ h.close()
+ super(BasicConfigTest, self).tearDown()
+
+ def cleanup(self):
+ setattr(logging.root, 'handlers', self.handlers)
+ logging._handlers.clear()
+ logging._handlers.update(self.saved_handlers)
+ logging._handlerList[:] = self.saved_handler_list
+ logging.root.level = self.original_logging_level
+
+ def test_no_kwargs(self):
+ logging.basicConfig()
+
+ # handler defaults to a StreamHandler to sys.stderr
+ self.assertEqual(len(logging.root.handlers), 1)
+ handler = logging.root.handlers[0]
+ self.assertIsInstance(handler, logging.StreamHandler)
+ self.assertEqual(handler.stream, sys.stderr)
+
+ formatter = handler.formatter
+ # format defaults to logging.BASIC_FORMAT
+ self.assertEqual(formatter._style._fmt, logging.BASIC_FORMAT)
+ # datefmt defaults to None
+ self.assertIsNone(formatter.datefmt)
+ # style defaults to %
+ self.assertIsInstance(formatter._style, logging.PercentStyle)
+
+ # level is not explicitly set
+ self.assertEqual(logging.root.level, self.original_logging_level)
+
+ def test_filename(self):
+ logging.basicConfig(filename='test.log')
+
+ self.assertEqual(len(logging.root.handlers), 1)
+ handler = logging.root.handlers[0]
+ self.assertIsInstance(handler, logging.FileHandler)
+
+ expected = logging.FileHandler('test.log', 'a')
+ self.addCleanup(expected.close)
+ self.assertEqual(handler.stream.mode, expected.stream.mode)
+ self.assertEqual(handler.stream.name, expected.stream.name)
+
+ def test_filemode(self):
+ logging.basicConfig(filename='test.log', filemode='wb')
+
+ handler = logging.root.handlers[0]
+ expected = logging.FileHandler('test.log', 'wb')
+ self.addCleanup(expected.close)
+ self.assertEqual(handler.stream.mode, expected.stream.mode)
+
+ def test_stream(self):
+ stream = io.StringIO()
+ self.addCleanup(stream.close)
+ logging.basicConfig(stream=stream)
+
+ self.assertEqual(len(logging.root.handlers), 1)
+ handler = logging.root.handlers[0]
+ self.assertIsInstance(handler, logging.StreamHandler)
+ self.assertEqual(handler.stream, stream)
+
+ def test_format(self):
+ logging.basicConfig(format='foo')
+
+ formatter = logging.root.handlers[0].formatter
+ self.assertEqual(formatter._style._fmt, 'foo')
+
+ def test_datefmt(self):
+ logging.basicConfig(datefmt='bar')
+
+ formatter = logging.root.handlers[0].formatter
+ self.assertEqual(formatter.datefmt, 'bar')
+
+ def test_style(self):
+ logging.basicConfig(style='$')
+
+ formatter = logging.root.handlers[0].formatter
+ self.assertIsInstance(formatter._style, logging.StringTemplateStyle)
+
+ def test_level(self):
+ old_level = logging.root.level
+ self.addCleanup(logging.root.setLevel, old_level)
+
+ logging.basicConfig(level=57)
+ self.assertEqual(logging.root.level, 57)
+ # Test that second call has no effect
+ logging.basicConfig(level=58)
+ self.assertEqual(logging.root.level, 57)
+
+ def test_incompatible(self):
+ assertRaises = self.assertRaises
+ handlers = [logging.StreamHandler()]
+ stream = sys.stderr
+ assertRaises(ValueError, logging.basicConfig, filename='test.log',
+ stream=stream)
+ assertRaises(ValueError, logging.basicConfig, filename='test.log',
+ handlers=handlers)
+ assertRaises(ValueError, logging.basicConfig, stream=stream,
+ handlers=handlers)
+
+ def test_handlers(self):
+ handlers = [
+ logging.StreamHandler(),
+ logging.StreamHandler(sys.stdout),
+ logging.StreamHandler(),
+ ]
+ f = logging.Formatter()
+ handlers[2].setFormatter(f)
+ logging.basicConfig(handlers=handlers)
+ self.assertIs(handlers[0], logging.root.handlers[0])
+ self.assertIs(handlers[1], logging.root.handlers[1])
+ self.assertIs(handlers[2], logging.root.handlers[2])
+ self.assertIsNotNone(handlers[0].formatter)
+ self.assertIsNotNone(handlers[1].formatter)
+ self.assertIs(handlers[2].formatter, f)
+ self.assertIs(handlers[0].formatter, handlers[1].formatter)
+
+ def _test_log(self, method, level=None):
+ # logging.root has no handlers so basicConfig should be called
+ called = []
+
+ old_basic_config = logging.basicConfig
+ def my_basic_config(*a, **kw):
+ old_basic_config()
+ old_level = logging.root.level
+ logging.root.setLevel(100) # avoid having messages in stderr
+ self.addCleanup(logging.root.setLevel, old_level)
+ called.append((a, kw))
+
+ patch(self, logging, 'basicConfig', my_basic_config)
+
+ log_method = getattr(logging, method)
+ if level is not None:
+ log_method(level, "test me")
+ else:
+ log_method("test me")
+
+ # basicConfig was called with no arguments
+ self.assertEqual(called, [((), {})])
+
+ def test_log(self):
+ self._test_log('log', logging.WARNING)
+
+ def test_debug(self):
+ self._test_log('debug')
+
+ def test_info(self):
+ self._test_log('info')
+
+ def test_warning(self):
+ self._test_log('warning')
+
+ def test_error(self):
+ self._test_log('error')
+
+ def test_critical(self):
+ self._test_log('critical')
+
+
+class LoggerAdapterTest(unittest.TestCase):
+
+ def setUp(self):
+ super(LoggerAdapterTest, self).setUp()
+ old_handler_list = logging._handlerList[:]
+
+ self.recording = RecordingHandler()
+ self.logger = logging.root
+ self.logger.addHandler(self.recording)
+ self.addCleanup(self.logger.removeHandler, self.recording)
+ self.addCleanup(self.recording.close)
+
+ def cleanup():
+ logging._handlerList[:] = old_handler_list
+
+ self.addCleanup(cleanup)
+ self.addCleanup(logging.shutdown)
+ self.adapter = logging.LoggerAdapter(logger=self.logger, extra=None)
+
+ def test_exception(self):
+ msg = 'testing exception: %r'
+ exc = None
+ try:
+ 1 / 0
+ except ZeroDivisionError as e:
+ exc = e
+ self.adapter.exception(msg, self.recording)
+
+ self.assertEqual(len(self.recording.records), 1)
+ record = self.recording.records[0]
+ self.assertEqual(record.levelno, logging.ERROR)
+ self.assertEqual(record.msg, msg)
+ self.assertEqual(record.args, (self.recording,))
+ self.assertEqual(record.exc_info,
+ (exc.__class__, exc, exc.__traceback__))
+
+ def test_critical(self):
+ msg = 'critical test! %r'
+ self.adapter.critical(msg, self.recording)
+
+ self.assertEqual(len(self.recording.records), 1)
+ record = self.recording.records[0]
+ self.assertEqual(record.levelno, logging.CRITICAL)
+ self.assertEqual(record.msg, msg)
+ self.assertEqual(record.args, (self.recording,))
+
+ def test_is_enabled_for(self):
+ old_disable = self.adapter.logger.manager.disable
+ self.adapter.logger.manager.disable = 33
+ self.addCleanup(setattr, self.adapter.logger.manager, 'disable',
+ old_disable)
+ self.assertFalse(self.adapter.isEnabledFor(32))
+
+ def test_has_handlers(self):
+ self.assertTrue(self.adapter.hasHandlers())
+
+ for handler in self.logger.handlers:
+ self.logger.removeHandler(handler)
+
+ self.assertFalse(self.logger.hasHandlers())
+ self.assertFalse(self.adapter.hasHandlers())
+
+
+class LoggerTest(BaseTest):
+
+ def setUp(self):
+ super(LoggerTest, self).setUp()
+ self.recording = RecordingHandler()
+ self.logger = logging.Logger(name='blah')
+ self.logger.addHandler(self.recording)
+ self.addCleanup(self.logger.removeHandler, self.recording)
+ self.addCleanup(self.recording.close)
+ self.addCleanup(logging.shutdown)
+
+ def test_set_invalid_level(self):
+ self.assertRaises(TypeError, self.logger.setLevel, object())
+
+ def test_exception(self):
+ msg = 'testing exception: %r'
+ exc = None
+ try:
+ 1 / 0
+ except ZeroDivisionError as e:
+ exc = e
+ self.logger.exception(msg, self.recording)
+
+ self.assertEqual(len(self.recording.records), 1)
+ record = self.recording.records[0]
+ self.assertEqual(record.levelno, logging.ERROR)
+ self.assertEqual(record.msg, msg)
+ self.assertEqual(record.args, (self.recording,))
+ self.assertEqual(record.exc_info,
+ (exc.__class__, exc, exc.__traceback__))
+
+ def test_log_invalid_level_with_raise(self):
+ old_raise = logging.raiseExceptions
+ self.addCleanup(setattr, logging, 'raiseExecptions', old_raise)
+
+ logging.raiseExceptions = True
+ self.assertRaises(TypeError, self.logger.log, '10', 'test message')
+
+ def test_log_invalid_level_no_raise(self):
+ old_raise = logging.raiseExceptions
+ self.addCleanup(setattr, logging, 'raiseExecptions', old_raise)
+
+ logging.raiseExceptions = False
+ self.logger.log('10', 'test message') # no exception happens
+
+ def test_find_caller_with_stack_info(self):
+ called = []
+ patch(self, logging.traceback, 'print_stack',
+ lambda f, file: called.append(file.getvalue()))
+
+ self.logger.findCaller(stack_info=True)
+
+ self.assertEqual(len(called), 1)
+ self.assertEqual('Stack (most recent call last):\n', called[0])
+
+ def test_make_record_with_extra_overwrite(self):
+ name = 'my record'
+ level = 13
+ fn = lno = msg = args = exc_info = func = sinfo = None
+ rv = logging._logRecordFactory(name, level, fn, lno, msg, args,
+ exc_info, func, sinfo)
+
+ for key in ('message', 'asctime') + tuple(rv.__dict__.keys()):
+ extra = {key: 'some value'}
+ self.assertRaises(KeyError, self.logger.makeRecord, name, level,
+ fn, lno, msg, args, exc_info,
+ extra=extra, sinfo=sinfo)
+
+ def test_make_record_with_extra_no_overwrite(self):
+ name = 'my record'
+ level = 13
+ fn = lno = msg = args = exc_info = func = sinfo = None
+ extra = {'valid_key': 'some value'}
+ result = self.logger.makeRecord(name, level, fn, lno, msg, args,
+ exc_info, extra=extra, sinfo=sinfo)
+ self.assertIn('valid_key', result.__dict__)
+
+ def test_has_handlers(self):
+ self.assertTrue(self.logger.hasHandlers())
+
+ for handler in self.logger.handlers:
+ self.logger.removeHandler(handler)
+ self.assertFalse(self.logger.hasHandlers())
+
+ def test_has_handlers_no_propagate(self):
+ child_logger = logging.getLogger('blah.child')
+ child_logger.propagate = False
+ self.assertFalse(child_logger.hasHandlers())
+
+ def test_is_enabled_for(self):
+ old_disable = self.logger.manager.disable
+ self.logger.manager.disable = 23
+ self.addCleanup(setattr, self.logger.manager, 'disable', old_disable)
+ self.assertFalse(self.logger.isEnabledFor(22))
+
+ def test_root_logger_aliases(self):
+ root = logging.getLogger()
+ self.assertIs(root, logging.root)
+ self.assertIs(root, logging.getLogger(None))
+ self.assertIs(root, logging.getLogger(''))
+ self.assertIs(root, logging.getLogger('foo').root)
+ self.assertIs(root, logging.getLogger('foo.bar').root)
+ self.assertIs(root, logging.getLogger('foo').parent)
+
+ self.assertIsNot(root, logging.getLogger('\0'))
+ self.assertIsNot(root, logging.getLogger('foo.bar').parent)
+
+ def test_invalid_names(self):
+ self.assertRaises(TypeError, logging.getLogger, any)
+ self.assertRaises(TypeError, logging.getLogger, b'foo')
+
+
class BaseFileTest(BaseTest):
"Base class for handler tests that write log files"
@@ -2237,10 +3548,21 @@ class BaseFileTest(BaseTest):
def assertLogFile(self, filename):
"Assert a log file is there and register it for deletion"
self.assertTrue(os.path.exists(filename),
- msg="Log file %r does not exist")
+ msg="Log file %r does not exist" % filename)
self.rmfiles.append(filename)
+class FileHandlerTest(BaseFileTest):
+ def test_delay(self):
+ os.unlink(self.fn)
+ fh = logging.FileHandler(self.fn, delay=True)
+ self.assertIsNone(fh.stream)
+ self.assertFalse(os.path.exists(self.fn))
+ fh.handle(logging.makeLogRecord({}))
+ self.assertIsNotNone(fh.stream)
+ self.assertTrue(os.path.exists(self.fn))
+ fh.close()
+
class RotatingFileHandlerTest(BaseFileTest):
def next_rec(self):
return logging.LogRecord('n', logging.DEBUG, 'p', 1,
@@ -2266,20 +3588,108 @@ class RotatingFileHandlerTest(BaseFileTest):
rh.close()
def test_rollover_filenames(self):
+ def namer(name):
+ return name + ".test"
rh = logging.handlers.RotatingFileHandler(
self.fn, backupCount=2, maxBytes=1)
+ rh.namer = namer
rh.emit(self.next_rec())
self.assertLogFile(self.fn)
rh.emit(self.next_rec())
- self.assertLogFile(self.fn + ".1")
+ self.assertLogFile(namer(self.fn + ".1"))
rh.emit(self.next_rec())
- self.assertLogFile(self.fn + ".2")
- self.assertFalse(os.path.exists(self.fn + ".3"))
+ self.assertLogFile(namer(self.fn + ".2"))
+ self.assertFalse(os.path.exists(namer(self.fn + ".3")))
+ rh.close()
+
+ def test_rotator(self):
+ def namer(name):
+ return name + ".gz"
+
+ def rotator(source, dest):
+ with open(source, "rb") as sf:
+ data = sf.read()
+ compressed = zlib.compress(data, 9)
+ with open(dest, "wb") as df:
+ df.write(compressed)
+ os.remove(source)
+
+ rh = logging.handlers.RotatingFileHandler(
+ self.fn, backupCount=2, maxBytes=1)
+ rh.rotator = rotator
+ rh.namer = namer
+ m1 = self.next_rec()
+ rh.emit(m1)
+ self.assertLogFile(self.fn)
+ m2 = self.next_rec()
+ rh.emit(m2)
+ fn = namer(self.fn + ".1")
+ self.assertLogFile(fn)
+ newline = os.linesep
+ with open(fn, "rb") as f:
+ compressed = f.read()
+ data = zlib.decompress(compressed)
+ self.assertEqual(data.decode("ascii"), m1.msg + newline)
+ rh.emit(self.next_rec())
+ fn = namer(self.fn + ".2")
+ self.assertLogFile(fn)
+ with open(fn, "rb") as f:
+ compressed = f.read()
+ data = zlib.decompress(compressed)
+ self.assertEqual(data.decode("ascii"), m1.msg + newline)
+ rh.emit(self.next_rec())
+ fn = namer(self.fn + ".2")
+ with open(fn, "rb") as f:
+ compressed = f.read()
+ data = zlib.decompress(compressed)
+ self.assertEqual(data.decode("ascii"), m2.msg + newline)
+ self.assertFalse(os.path.exists(namer(self.fn + ".3")))
rh.close()
class TimedRotatingFileHandlerTest(BaseFileTest):
- # test methods added below
- pass
+ # other test methods added below
+ def test_rollover(self):
+ fh = logging.handlers.TimedRotatingFileHandler(self.fn, 'S',
+ backupCount=1)
+ r = logging.makeLogRecord({'msg': 'testing'})
+ fh.emit(r)
+ self.assertLogFile(self.fn)
+ time.sleep(1.01) # just a little over a second ...
+ fh.emit(r)
+ fh.close()
+ # At this point, we should have a recent rotated file which we
+ # can test for the existence of. However, in practice, on some
+ # machines which run really slowly, we don't know how far back
+ # in time to go to look for the log file. So, we go back a fair
+ # bit, and stop as soon as we see a rotated file. In theory this
+ # could of course still fail, but the chances are lower.
+ found = False
+ now = datetime.datetime.now()
+ GO_BACK = 5 * 60 # seconds
+ for secs in range(GO_BACK):
+ prev = now - datetime.timedelta(seconds=secs)
+ fn = self.fn + prev.strftime(".%Y-%m-%d_%H-%M-%S")
+ found = os.path.exists(fn)
+ if found:
+ self.rmfiles.append(fn)
+ break
+ msg = 'No rotated files found, went back %d seconds' % GO_BACK
+ if not found:
+ #print additional diagnostics
+ dn, fn = os.path.split(self.fn)
+ files = [f for f in os.listdir(dn) if f.startswith(fn)]
+ print('Test time: %s' % now.strftime("%Y-%m-%d %H-%M-%S"), file=sys.stderr)
+ print('The only matching files are: %s' % files, file=sys.stderr)
+ self.assertTrue(found, msg=msg)
+
+ def test_invalid(self):
+ assertRaises = self.assertRaises
+ assertRaises(ValueError, logging.handlers.TimedRotatingFileHandler,
+ self.fn, 'X', delay=True)
+ assertRaises(ValueError, logging.handlers.TimedRotatingFileHandler,
+ self.fn, 'W', delay=True)
+ assertRaises(ValueError, logging.handlers.TimedRotatingFileHandler,
+ self.fn, 'W7', delay=True)
def secs(**kw):
return datetime.timedelta(**kw) // datetime.timedelta(seconds=1)
@@ -2328,19 +3738,51 @@ for when, exp in (('S', 1),
rh.close()
setattr(TimedRotatingFileHandlerTest, "test_compute_rollover_%s" % when, test_compute_rollover)
+
+@unittest.skipUnless(win32evtlog, 'win32evtlog/win32evtlogutil required for this test.')
+class NTEventLogHandlerTest(BaseTest):
+ def test_basic(self):
+ logtype = 'Application'
+ elh = win32evtlog.OpenEventLog(None, logtype)
+ num_recs = win32evtlog.GetNumberOfEventLogRecords(elh)
+ h = logging.handlers.NTEventLogHandler('test_logging')
+ r = logging.makeLogRecord({'msg': 'Test Log Message'})
+ h.handle(r)
+ h.close()
+ # Now see if the event is recorded
+ self.assertTrue(num_recs < win32evtlog.GetNumberOfEventLogRecords(elh))
+ flags = win32evtlog.EVENTLOG_BACKWARDS_READ | \
+ win32evtlog.EVENTLOG_SEQUENTIAL_READ
+ found = False
+ GO_BACK = 100
+ events = win32evtlog.ReadEventLog(elh, flags, GO_BACK)
+ for e in events:
+ if e.SourceName != 'test_logging':
+ continue
+ msg = win32evtlogutil.SafeFormatMessage(e, logtype)
+ if msg != 'Test Log Message\r\n':
+ continue
+ found = True
+ break
+ msg = 'Record not found in event log, went back %d records' % GO_BACK
+ self.assertTrue(found, msg=msg)
+
# Set the locale to the platform-dependent default. I have no idea
# why the test does this, but in any case we save the current locale
# first and restore it at the end.
@run_with_locale('LC_ALL', '')
def test_main():
run_unittest(BuiltinLevelsTest, BasicFilterTest,
- CustomLevelsAndFiltersTest, MemoryHandlerTest,
- ConfigFileTest, SocketHandlerTest, MemoryTest,
- EncodingTest, WarningsTest, ConfigDictTest, ManagerTest,
- FormatterTest,
- LogRecordFactoryTest, ChildLoggerTest, QueueHandlerTest,
- RotatingFileHandlerTest,
- LastResortTest,
+ CustomLevelsAndFiltersTest, HandlerTest, MemoryHandlerTest,
+ ConfigFileTest, SocketHandlerTest, DatagramHandlerTest,
+ MemoryTest, EncodingTest, WarningsTest, ConfigDictTest,
+ ManagerTest, FormatterTest, BufferingFormatterTest,
+ StreamHandlerTest, LogRecordFactoryTest, ChildLoggerTest,
+ QueueHandlerTest, ShutdownTest, ModuleLevelMiscTest,
+ BasicConfigTest, LoggerAdapterTest, LoggerTest,
+ SMTPHandlerTest, FileHandlerTest, RotatingFileHandlerTest,
+ LastResortTest, LogRecordTest, ExceptionTest,
+ SysLogHandlerTest, HTTPHandlerTest, NTEventLogHandlerTest,
TimedRotatingFileHandlerTest
)