diff options
Diffstat (limited to 'Lib/test/test_logging.py')
-rw-r--r-- | Lib/test/test_logging.py | 1787 |
1 files changed, 1622 insertions, 165 deletions
diff --git a/Lib/test/test_logging.py b/Lib/test/test_logging.py index 2d264f8..cb908fb 100644 --- a/Lib/test/test_logging.py +++ b/Lib/test/test_logging.py @@ -37,12 +37,11 @@ import random import re import select import socket -from socketserver import ThreadingTCPServer, StreamRequestHandler import struct import sys import tempfile -from test.support import captured_stdout, run_with_locale, run_unittest -from test.support import TestHandler, Matcher +from test.support import (captured_stdout, run_with_locale, run_unittest, + patch, requires_zlib, TestHandler, Matcher) import textwrap import time import unittest @@ -50,8 +49,31 @@ import warnings import weakref try: import threading + # The following imports are needed only for tests which + # require threading + import asynchat + import asyncore + import errno + from http.server import HTTPServer, BaseHTTPRequestHandler + import smtpd + from urllib.parse import urlparse, parse_qs + from socketserver import (ThreadingUDPServer, DatagramRequestHandler, + ThreadingTCPServer, StreamRequestHandler) except ImportError: threading = None +try: + import win32evtlog +except ImportError: + win32evtlog = None +try: + import win32evtlogutil +except ImportError: + win32evtlogutil = None + win32evtlog = None +try: + import zlib +except ImportError: + pass class BaseTest(unittest.TestCase): @@ -79,9 +101,7 @@ class BaseTest(unittest.TestCase): finally: logging._releaseLock() - # Set two unused loggers: one non-ASCII and one Unicode. - # This is to test correct operation when sorting existing - # loggers in the configuration code. See issue 8201. + # Set two unused loggers self.logger1 = logging.getLogger("\xab\xd7\xbb") self.logger2 = logging.getLogger("\u013f\u00d6\u0047") @@ -142,8 +162,7 @@ class BaseTest(unittest.TestCase): except AttributeError: # StringIO.StringIO lacks a reset() method. actual_lines = stream.getvalue().splitlines() - self.assertEqual(len(actual_lines), len(expected_values), - '%s vs. %s' % (actual_lines, expected_values)) + self.assertEqual(len(actual_lines), len(expected_values)) for actual, expected in zip(actual_lines, expected_values): match = pat.search(actual) if not match: @@ -181,17 +200,17 @@ class BuiltinLevelsTest(BaseTest): INF.log(logging.CRITICAL, m()) INF.error(m()) - INF.warn(m()) + INF.warning(m()) INF.info(m()) DEB.log(logging.CRITICAL, m()) DEB.error(m()) - DEB.warn (m()) - DEB.info (m()) + DEB.warning(m()) + DEB.info(m()) DEB.debug(m()) # These should not log. - ERR.warn(m()) + ERR.warning(m()) ERR.info(m()) ERR.debug(m()) @@ -225,7 +244,7 @@ class BuiltinLevelsTest(BaseTest): INF_ERR.error(m()) # These should not log. - INF_ERR.warn(m()) + INF_ERR.warning(m()) INF_ERR.info(m()) INF_ERR.debug(m()) @@ -249,14 +268,14 @@ class BuiltinLevelsTest(BaseTest): # These should log. INF_UNDEF.log(logging.CRITICAL, m()) INF_UNDEF.error(m()) - INF_UNDEF.warn(m()) + INF_UNDEF.warning(m()) INF_UNDEF.info(m()) INF_ERR_UNDEF.log(logging.CRITICAL, m()) INF_ERR_UNDEF.error(m()) # These should not log. INF_UNDEF.debug(m()) - INF_ERR_UNDEF.warn(m()) + INF_ERR_UNDEF.warning(m()) INF_ERR_UNDEF.info(m()) INF_ERR_UNDEF.debug(m()) @@ -295,8 +314,6 @@ class BuiltinLevelsTest(BaseTest): ('INF.BADPARENT', 'INFO', '4'), ]) - def test_invalid_name(self): - self.assertRaises(TypeError, logging.getLogger, any) class BasicFilterTest(BaseTest): @@ -355,6 +372,10 @@ class BasicFilterTest(BaseTest): finally: handler.removeFilter(filterfunc) + def test_empty_filter(self): + f = logging.Filter() + r = logging.makeLogRecord({'name': 'spam.eggs'}) + self.assertTrue(f.filter(r)) # # First, we define our levels. There can be as many as you want - the only @@ -498,6 +519,478 @@ class CustomLevelsAndFiltersTest(BaseTest): handler.removeFilter(garr) +class HandlerTest(BaseTest): + def test_name(self): + h = logging.Handler() + h.name = 'generic' + self.assertEqual(h.name, 'generic') + h.name = 'anothergeneric' + self.assertEqual(h.name, 'anothergeneric') + self.assertRaises(NotImplementedError, h.emit, None) + + def test_builtin_handlers(self): + # We can't actually *use* too many handlers in the tests, + # but we can try instantiating them with various options + if sys.platform in ('linux', 'darwin'): + for existing in (True, False): + fd, fn = tempfile.mkstemp() + os.close(fd) + if not existing: + os.unlink(fn) + h = logging.handlers.WatchedFileHandler(fn, delay=True) + if existing: + dev, ino = h.dev, h.ino + self.assertEqual(dev, -1) + self.assertEqual(ino, -1) + r = logging.makeLogRecord({'msg': 'Test'}) + h.handle(r) + # Now remove the file. + os.unlink(fn) + self.assertFalse(os.path.exists(fn)) + # The next call should recreate the file. + h.handle(r) + self.assertTrue(os.path.exists(fn)) + else: + self.assertEqual(h.dev, -1) + self.assertEqual(h.ino, -1) + h.close() + if existing: + os.unlink(fn) + if sys.platform == 'darwin': + sockname = '/var/run/syslog' + else: + sockname = '/dev/log' + try: + h = logging.handlers.SysLogHandler(sockname) + self.assertEqual(h.facility, h.LOG_USER) + self.assertTrue(h.unixsocket) + h.close() + except socket.error: # syslogd might not be available + pass + for method in ('GET', 'POST', 'PUT'): + if method == 'PUT': + self.assertRaises(ValueError, logging.handlers.HTTPHandler, + 'localhost', '/log', method) + else: + h = logging.handlers.HTTPHandler('localhost', '/log', method) + h.close() + h = logging.handlers.BufferingHandler(0) + r = logging.makeLogRecord({}) + self.assertTrue(h.shouldFlush(r)) + h.close() + h = logging.handlers.BufferingHandler(1) + self.assertFalse(h.shouldFlush(r)) + h.close() + + @unittest.skipIf(os.name == 'nt', 'WatchedFileHandler not appropriate for Windows.') + @unittest.skipUnless(threading, 'Threading required for this test.') + def test_race(self): + # Issue #14632 refers. + def remove_loop(fname, tries): + for _ in range(tries): + try: + os.unlink(fname) + except OSError: + pass + time.sleep(0.004 * random.randint(0, 4)) + + del_count = 500 + log_count = 500 + + for delay in (False, True): + fd, fn = tempfile.mkstemp('.log', 'test_logging-3-') + os.close(fd) + remover = threading.Thread(target=remove_loop, args=(fn, del_count)) + remover.daemon = True + remover.start() + h = logging.handlers.WatchedFileHandler(fn, delay=delay) + f = logging.Formatter('%(asctime)s: %(levelname)s: %(message)s') + h.setFormatter(f) + try: + for _ in range(log_count): + time.sleep(0.005) + r = logging.makeLogRecord({'msg': 'testing' }) + h.handle(r) + finally: + remover.join() + h.close() + if os.path.exists(fn): + os.unlink(fn) + + +class BadStream(object): + def write(self, data): + raise RuntimeError('deliberate mistake') + +class TestStreamHandler(logging.StreamHandler): + def handleError(self, record): + self.error_record = record + +class StreamHandlerTest(BaseTest): + def test_error_handling(self): + h = TestStreamHandler(BadStream()) + r = logging.makeLogRecord({}) + old_raise = logging.raiseExceptions + old_stderr = sys.stderr + try: + h.handle(r) + self.assertIs(h.error_record, r) + h = logging.StreamHandler(BadStream()) + sys.stderr = sio = io.StringIO() + h.handle(r) + self.assertIn('\nRuntimeError: deliberate mistake\n', + sio.getvalue()) + logging.raiseExceptions = False + sys.stderr = sio = io.StringIO() + h.handle(r) + self.assertEqual('', sio.getvalue()) + finally: + logging.raiseExceptions = old_raise + sys.stderr = old_stderr + +# -- The following section could be moved into a server_helper.py module +# -- if it proves to be of wider utility than just test_logging + +if threading: + class TestSMTPChannel(smtpd.SMTPChannel): + """ + This derived class has had to be created because smtpd does not + support use of custom channel maps, although they are allowed by + asyncore's design. Issue #11959 has been raised to address this, + and if resolved satisfactorily, some of this code can be removed. + """ + def __init__(self, server, conn, addr, sockmap): + asynchat.async_chat.__init__(self, conn, sockmap) + self.smtp_server = server + self.conn = conn + self.addr = addr + self.data_size_limit = None + self.received_lines = [] + self.smtp_state = self.COMMAND + self.seen_greeting = '' + self.mailfrom = None + self.rcpttos = [] + self.received_data = '' + self.fqdn = socket.getfqdn() + self.num_bytes = 0 + try: + self.peer = conn.getpeername() + except socket.error as err: + # a race condition may occur if the other end is closing + # before we can get the peername + self.close() + if err.args[0] != errno.ENOTCONN: + raise + return + self.push('220 %s %s' % (self.fqdn, smtpd.__version__)) + self.set_terminator(b'\r\n') + self.extended_smtp = False + + + class TestSMTPServer(smtpd.SMTPServer): + """ + This class implements a test SMTP server. + + :param addr: A (host, port) tuple which the server listens on. + You can specify a port value of zero: the server's + *port* attribute will hold the actual port number + used, which can be used in client connections. + :param handler: A callable which will be called to process + incoming messages. The handler will be passed + the client address tuple, who the message is from, + a list of recipients and the message data. + :param poll_interval: The interval, in seconds, used in the underlying + :func:`select` or :func:`poll` call by + :func:`asyncore.loop`. + :param sockmap: A dictionary which will be used to hold + :class:`asyncore.dispatcher` instances used by + :func:`asyncore.loop`. This avoids changing the + :mod:`asyncore` module's global state. + """ + channel_class = TestSMTPChannel + + def __init__(self, addr, handler, poll_interval, sockmap): + self._localaddr = addr + self._remoteaddr = None + self.data_size_limit = None + self.sockmap = sockmap + asyncore.dispatcher.__init__(self, map=sockmap) + try: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.setblocking(0) + self.set_socket(sock, map=sockmap) + # try to re-use a server port if possible + self.set_reuse_addr() + self.bind(addr) + self.port = sock.getsockname()[1] + self.listen(5) + except: + self.close() + raise + self._handler = handler + self._thread = None + self.poll_interval = poll_interval + + def handle_accepted(self, conn, addr): + """ + Redefined only because the base class does not pass in a + map, forcing use of a global in :mod:`asyncore`. + """ + channel = self.channel_class(self, conn, addr, self.sockmap) + + def process_message(self, peer, mailfrom, rcpttos, data): + """ + Delegates to the handler passed in to the server's constructor. + + Typically, this will be a test case method. + :param peer: The client (host, port) tuple. + :param mailfrom: The address of the sender. + :param rcpttos: The addresses of the recipients. + :param data: The message. + """ + self._handler(peer, mailfrom, rcpttos, data) + + def start(self): + """ + Start the server running on a separate daemon thread. + """ + self._thread = t = threading.Thread(target=self.serve_forever, + args=(self.poll_interval,)) + t.setDaemon(True) + t.start() + + def serve_forever(self, poll_interval): + """ + Run the :mod:`asyncore` loop until normal termination + conditions arise. + :param poll_interval: The interval, in seconds, used in the underlying + :func:`select` or :func:`poll` call by + :func:`asyncore.loop`. + """ + try: + asyncore.loop(poll_interval, map=self.sockmap) + except select.error: + # On FreeBSD 8, closing the server repeatably + # raises this error. We swallow it if the + # server has been closed. + if self.connected or self.accepting: + raise + + def stop(self, timeout=None): + """ + Stop the thread by closing the server instance. + Wait for the server thread to terminate. + + :param timeout: How long to wait for the server thread + to terminate. + """ + self.close() + self._thread.join(timeout) + self._thread = None + + class ControlMixin(object): + """ + This mixin is used to start a server on a separate thread, and + shut it down programmatically. Request handling is simplified - instead + of needing to derive a suitable RequestHandler subclass, you just + provide a callable which will be passed each received request to be + processed. + + :param handler: A handler callable which will be called with a + single parameter - the request - in order to + process the request. This handler is called on the + server thread, effectively meaning that requests are + processed serially. While not quite Web scale ;-), + this should be fine for testing applications. + :param poll_interval: The polling interval in seconds. + """ + def __init__(self, handler, poll_interval): + self._thread = None + self.poll_interval = poll_interval + self._handler = handler + self.ready = threading.Event() + + def start(self): + """ + Create a daemon thread to run the server, and start it. + """ + self._thread = t = threading.Thread(target=self.serve_forever, + args=(self.poll_interval,)) + t.setDaemon(True) + t.start() + + def serve_forever(self, poll_interval): + """ + Run the server. Set the ready flag before entering the + service loop. + """ + self.ready.set() + super(ControlMixin, self).serve_forever(poll_interval) + + def stop(self, timeout=None): + """ + Tell the server thread to stop, and wait for it to do so. + + :param timeout: How long to wait for the server thread + to terminate. + """ + self.shutdown() + if self._thread is not None: + self._thread.join(timeout) + self._thread = None + self.server_close() + self.ready.clear() + + class TestHTTPServer(ControlMixin, HTTPServer): + """ + An HTTP server which is controllable using :class:`ControlMixin`. + + :param addr: A tuple with the IP address and port to listen on. + :param handler: A handler callable which will be called with a + single parameter - the request - in order to + process the request. + :param poll_interval: The polling interval in seconds. + :param log: Pass ``True`` to enable log messages. + """ + def __init__(self, addr, handler, poll_interval=0.5, + log=False, sslctx=None): + class DelegatingHTTPRequestHandler(BaseHTTPRequestHandler): + def __getattr__(self, name, default=None): + if name.startswith('do_'): + return self.process_request + raise AttributeError(name) + + def process_request(self): + self.server._handler(self) + + def log_message(self, format, *args): + if log: + super(DelegatingHTTPRequestHandler, + self).log_message(format, *args) + HTTPServer.__init__(self, addr, DelegatingHTTPRequestHandler) + ControlMixin.__init__(self, handler, poll_interval) + self.sslctx = sslctx + + def get_request(self): + try: + sock, addr = self.socket.accept() + if self.sslctx: + sock = self.sslctx.wrap_socket(sock, server_side=True) + except socket.error as e: + # socket errors are silenced by the caller, print them here + sys.stderr.write("Got an error:\n%s\n" % e) + raise + return sock, addr + + class TestTCPServer(ControlMixin, ThreadingTCPServer): + """ + A TCP server which is controllable using :class:`ControlMixin`. + + :param addr: A tuple with the IP address and port to listen on. + :param handler: A handler callable which will be called with a single + parameter - the request - in order to process the request. + :param poll_interval: The polling interval in seconds. + :bind_and_activate: If True (the default), binds the server and starts it + listening. If False, you need to call + :meth:`server_bind` and :meth:`server_activate` at + some later time before calling :meth:`start`, so that + the server will set up the socket and listen on it. + """ + + allow_reuse_address = True + + def __init__(self, addr, handler, poll_interval=0.5, + bind_and_activate=True): + class DelegatingTCPRequestHandler(StreamRequestHandler): + + def handle(self): + self.server._handler(self) + ThreadingTCPServer.__init__(self, addr, DelegatingTCPRequestHandler, + bind_and_activate) + ControlMixin.__init__(self, handler, poll_interval) + + def server_bind(self): + super(TestTCPServer, self).server_bind() + self.port = self.socket.getsockname()[1] + + class TestUDPServer(ControlMixin, ThreadingUDPServer): + """ + A UDP server which is controllable using :class:`ControlMixin`. + + :param addr: A tuple with the IP address and port to listen on. + :param handler: A handler callable which will be called with a + single parameter - the request - in order to + process the request. + :param poll_interval: The polling interval for shutdown requests, + in seconds. + :bind_and_activate: If True (the default), binds the server and + starts it listening. If False, you need to + call :meth:`server_bind` and + :meth:`server_activate` at some later time + before calling :meth:`start`, so that the server will + set up the socket and listen on it. + """ + def __init__(self, addr, handler, poll_interval=0.5, + bind_and_activate=True): + class DelegatingUDPRequestHandler(DatagramRequestHandler): + + def handle(self): + self.server._handler(self) + + def finish(self): + data = self.wfile.getvalue() + if data: + try: + super(DelegatingUDPRequestHandler, self).finish() + except socket.error: + if not self.server._closed: + raise + + ThreadingUDPServer.__init__(self, addr, + DelegatingUDPRequestHandler, + bind_and_activate) + ControlMixin.__init__(self, handler, poll_interval) + self._closed = False + + def server_bind(self): + super(TestUDPServer, self).server_bind() + self.port = self.socket.getsockname()[1] + + def server_close(self): + super(TestUDPServer, self).server_close() + self._closed = True + +# - end of server_helper section + +@unittest.skipUnless(threading, 'Threading required for this test.') +class SMTPHandlerTest(BaseTest): + def test_basic(self): + sockmap = {} + server = TestSMTPServer(('localhost', 0), self.process_message, 0.001, + sockmap) + server.start() + addr = ('localhost', server.port) + h = logging.handlers.SMTPHandler(addr, 'me', 'you', 'Log', timeout=5.0) + self.assertEqual(h.toaddrs, ['you']) + self.messages = [] + r = logging.makeLogRecord({'msg': 'Hello'}) + self.handled = threading.Event() + h.handle(r) + self.handled.wait(5.0) # 14314: don't wait forever + server.stop() + self.assertTrue(self.handled.is_set()) + self.assertEqual(len(self.messages), 1) + peer, mailfrom, rcpttos, data = self.messages[0] + self.assertEqual(mailfrom, 'me') + self.assertEqual(rcpttos, ['you']) + self.assertIn('\nSubject: Log\n', data) + self.assertTrue(data.endswith('\n\nHello')) + h.close() + + def process_message(self, *args): + self.messages.append(args) + self.handled.set() + class MemoryHandlerTest(BaseTest): """Tests for the MemoryHandler.""" @@ -525,7 +1018,7 @@ class MemoryHandlerTest(BaseTest): self.mem_logger.info(self.next_message()) self.assert_log_lines([]) # This will flush because the level is >= logging.WARNING - self.mem_logger.warn(self.next_message()) + self.mem_logger.warning(self.next_message()) lines = [ ('DEBUG', '1'), ('INFO', '2'), @@ -870,116 +1363,280 @@ class ConfigFileTest(BaseTest): # Original logger output is empty. self.assert_log_lines([]) -class LogRecordStreamHandler(StreamRequestHandler): - """Handler for a streaming logging request. It saves the log message in the - TCP server's 'log_output' attribute.""" +@unittest.skipUnless(threading, 'Threading required for this test.') +class SocketHandlerTest(BaseTest): + + """Test for SocketHandler objects.""" + + def setUp(self): + """Set up a TCP server to receive log messages, and a SocketHandler + pointing to that server's address and port.""" + BaseTest.setUp(self) + addr = ('localhost', 0) + self.server = server = TestTCPServer(addr, self.handle_socket, + 0.01) + server.start() + server.ready.wait() + self.sock_hdlr = logging.handlers.SocketHandler('localhost', + server.port) + self.log_output = '' + self.root_logger.removeHandler(self.root_logger.handlers[0]) + self.root_logger.addHandler(self.sock_hdlr) + self.handled = threading.Semaphore(0) - TCP_LOG_END = "!!!END!!!" + def tearDown(self): + """Shutdown the TCP server.""" + try: + self.server.stop(2.0) + self.root_logger.removeHandler(self.sock_hdlr) + self.sock_hdlr.close() + finally: + BaseTest.tearDown(self) - def handle(self): - """Handle multiple requests - each expected to be of 4-byte length, - followed by the LogRecord in pickle format. Logs the record - according to whatever policy is configured locally.""" + def handle_socket(self, request): + conn = request.connection while True: - chunk = self.connection.recv(4) + chunk = conn.recv(4) if len(chunk) < 4: break slen = struct.unpack(">L", chunk)[0] - chunk = self.connection.recv(slen) + chunk = conn.recv(slen) while len(chunk) < slen: - chunk = chunk + self.connection.recv(slen - len(chunk)) - obj = self.unpickle(chunk) + chunk = chunk + conn.recv(slen - len(chunk)) + obj = pickle.loads(chunk) record = logging.makeLogRecord(obj) - self.handle_log_record(record) + self.log_output += record.msg + '\n' + self.handled.release() - def unpickle(self, data): - return pickle.loads(data) + def test_output(self): + # The log message sent to the SocketHandler is properly received. + logger = logging.getLogger("tcp") + logger.error("spam") + self.handled.acquire() + logger.debug("eggs") + self.handled.acquire() + self.assertEqual(self.log_output, "spam\neggs\n") - def handle_log_record(self, record): - # If the end-of-messages sentinel is seen, tell the server to - # terminate. - if self.TCP_LOG_END in record.msg: - self.server.abort = 1 - return - self.server.log_output += record.msg + "\n" + def test_noserver(self): + # Kill the server + self.server.stop(2.0) + #The logging call should try to connect, which should fail + try: + raise RuntimeError('Deliberate mistake') + except RuntimeError: + self.root_logger.exception('Never sent') + self.root_logger.error('Never sent, either') + now = time.time() + self.assertTrue(self.sock_hdlr.retryTime > now) + time.sleep(self.sock_hdlr.retryTime - now + 0.001) + self.root_logger.error('Nor this') -class LogRecordSocketReceiver(ThreadingTCPServer): +@unittest.skipUnless(threading, 'Threading required for this test.') +class DatagramHandlerTest(BaseTest): - """A simple-minded TCP socket-based logging receiver suitable for test - purposes.""" + """Test for DatagramHandler.""" - allow_reuse_address = 1 - log_output = "" + def setUp(self): + """Set up a UDP server to receive log messages, and a DatagramHandler + pointing to that server's address and port.""" + BaseTest.setUp(self) + addr = ('localhost', 0) + self.server = server = TestUDPServer(addr, self.handle_datagram, 0.01) + server.start() + server.ready.wait() + self.sock_hdlr = logging.handlers.DatagramHandler('localhost', + server.port) + self.log_output = '' + self.root_logger.removeHandler(self.root_logger.handlers[0]) + self.root_logger.addHandler(self.sock_hdlr) + self.handled = threading.Event() - def __init__(self, host='localhost', - port=logging.handlers.DEFAULT_TCP_LOGGING_PORT, - handler=LogRecordStreamHandler): - ThreadingTCPServer.__init__(self, (host, port), handler) - self.abort = False - self.timeout = 0.1 - self.finished = threading.Event() + def tearDown(self): + """Shutdown the UDP server.""" + try: + self.server.stop(2.0) + self.root_logger.removeHandler(self.sock_hdlr) + self.sock_hdlr.close() + finally: + BaseTest.tearDown(self) - def serve_until_stopped(self): - while not self.abort: - rd, wr, ex = select.select([self.socket.fileno()], [], [], - self.timeout) - if rd: - self.handle_request() - # Notify the main thread that we're about to exit - self.finished.set() - # close the listen socket - self.server_close() + def handle_datagram(self, request): + slen = struct.pack('>L', 0) # length of prefix + packet = request.packet[len(slen):] + obj = pickle.loads(packet) + record = logging.makeLogRecord(obj) + self.log_output += record.msg + '\n' + self.handled.set() + + def test_output(self): + # The log message sent to the DatagramHandler is properly received. + logger = logging.getLogger("udp") + logger.error("spam") + self.handled.wait() + self.handled.clear() + logger.error("eggs") + self.handled.wait() + self.assertEqual(self.log_output, "spam\neggs\n") @unittest.skipUnless(threading, 'Threading required for this test.') -class SocketHandlerTest(BaseTest): +class SysLogHandlerTest(BaseTest): - """Test for SocketHandler objects.""" + """Test for SysLogHandler using UDP.""" def setUp(self): - """Set up a TCP server to receive log messages, and a SocketHandler + """Set up a UDP server to receive log messages, and a SysLogHandler pointing to that server's address and port.""" BaseTest.setUp(self) - self.tcpserver = LogRecordSocketReceiver(port=0) - self.port = self.tcpserver.socket.getsockname()[1] - self.threads = [ - threading.Thread(target=self.tcpserver.serve_until_stopped)] - for thread in self.threads: - thread.start() - - self.sock_hdlr = logging.handlers.SocketHandler('localhost', self.port) - self.sock_hdlr.setFormatter(self.root_formatter) + addr = ('localhost', 0) + self.server = server = TestUDPServer(addr, self.handle_datagram, + 0.01) + server.start() + server.ready.wait() + self.sl_hdlr = logging.handlers.SysLogHandler(('localhost', + server.port)) + self.log_output = '' self.root_logger.removeHandler(self.root_logger.handlers[0]) - self.root_logger.addHandler(self.sock_hdlr) + self.root_logger.addHandler(self.sl_hdlr) + self.handled = threading.Event() def tearDown(self): - """Shutdown the TCP server.""" + """Shutdown the UDP server.""" try: - self.tcpserver.abort = True - del self.tcpserver - self.root_logger.removeHandler(self.sock_hdlr) - self.sock_hdlr.close() - for thread in self.threads: - thread.join(2.0) + self.server.stop(2.0) + self.root_logger.removeHandler(self.sl_hdlr) + self.sl_hdlr.close() finally: BaseTest.tearDown(self) - def get_output(self): - """Get the log output as received by the TCP server.""" - # Signal the TCP receiver and wait for it to terminate. - self.root_logger.critical(LogRecordStreamHandler.TCP_LOG_END) - self.tcpserver.finished.wait(2.0) - return self.tcpserver.log_output + def handle_datagram(self, request): + self.log_output = request.packet + self.handled.set() def test_output(self): - # The log message sent to the SocketHandler is properly received. - logger = logging.getLogger("tcp") - logger.error("spam") - logger.debug("eggs") - self.assertEqual(self.get_output(), "spam\neggs\n") + # The log message sent to the SysLogHandler is properly received. + logger = logging.getLogger("slh") + logger.error("sp\xe4m") + self.handled.wait() + self.assertEqual(self.log_output, b'<11>sp\xc3\xa4m\x00') + self.handled.clear() + self.sl_hdlr.append_nul = False + logger.error("sp\xe4m") + self.handled.wait() + self.assertEqual(self.log_output, b'<11>sp\xc3\xa4m') + self.handled.clear() + self.sl_hdlr.ident = "h\xe4m-" + logger.error("sp\xe4m") + self.handled.wait() + self.assertEqual(self.log_output, b'<11>h\xc3\xa4m-sp\xc3\xa4m') + + +@unittest.skipUnless(threading, 'Threading required for this test.') +class HTTPHandlerTest(BaseTest): + """Test for HTTPHandler.""" + + PEMFILE = """-----BEGIN RSA PRIVATE KEY----- +MIICXQIBAAKBgQDGT4xS5r91rbLJQK2nUDenBhBG6qFk+bVOjuAGC/LSHlAoBnvG +zQG3agOG+e7c5z2XT8m2ktORLqG3E4mYmbxgyhDrzP6ei2Anc+pszmnxPoK3Puh5 +aXV+XKt0bU0C1m2+ACmGGJ0t3P408art82nOxBw8ZHgIg9Dtp6xIUCyOqwIDAQAB +AoGBAJFTnFboaKh5eUrIzjmNrKsG44jEyy+vWvHN/FgSC4l103HxhmWiuL5Lv3f7 +0tMp1tX7D6xvHwIG9VWvyKb/Cq9rJsDibmDVIOslnOWeQhG+XwJyitR0pq/KlJIB +5LjORcBw795oKWOAi6RcOb1ON59tysEFYhAGQO9k6VL621gRAkEA/Gb+YXULLpbs +piXN3q4zcHzeaVANo69tUZ6TjaQqMeTxE4tOYM0G0ZoSeHEdaP59AOZGKXXNGSQy +2z/MddcYGQJBAMkjLSYIpOLJY11ja8OwwswFG2hEzHe0cS9bzo++R/jc1bHA5R0Y +i6vA5iPi+wopPFvpytdBol7UuEBe5xZrxWMCQQCWxELRHiP2yWpEeLJ3gGDzoXMN +PydWjhRju7Bx3AzkTtf+D6lawz1+eGTuEss5i0JKBkMEwvwnN2s1ce+EuF4JAkBb +E96h1lAzkVW5OAfYOPY8RCPA90ZO/hoyg7PpSxR0ECuDrgERR8gXIeYUYfejBkEa +rab4CfRoVJKKM28Yq/xZAkBvuq670JRCwOgfUTdww7WpdOQBYPkzQccsKNCslQW8 +/DyW6y06oQusSENUvynT6dr3LJxt/NgZPhZX2+k1eYDV +-----END RSA PRIVATE KEY----- +-----BEGIN CERTIFICATE----- +MIICGzCCAYSgAwIBAgIJAIq84a2Q/OvlMA0GCSqGSIb3DQEBBQUAMBQxEjAQBgNV +BAMTCWxvY2FsaG9zdDAeFw0xMTA1MjExMDIzMzNaFw03NTAzMjEwMzU1MTdaMBQx +EjAQBgNVBAMTCWxvY2FsaG9zdDCBnzANBgkqhkiG9w0BAQEFAAOBjQAwgYkCgYEA +xk+MUua/da2yyUCtp1A3pwYQRuqhZPm1To7gBgvy0h5QKAZ7xs0Bt2oDhvnu3Oc9 +l0/JtpLTkS6htxOJmJm8YMoQ68z+notgJ3PqbM5p8T6Ctz7oeWl1flyrdG1NAtZt +vgAphhidLdz+NPGq7fNpzsQcPGR4CIPQ7aesSFAsjqsCAwEAAaN1MHMwHQYDVR0O +BBYEFLWaUPO6N7efGiuoS9i3DVYcUwn0MEQGA1UdIwQ9MDuAFLWaUPO6N7efGiuo +S9i3DVYcUwn0oRikFjAUMRIwEAYDVQQDEwlsb2NhbGhvc3SCCQCKvOGtkPzr5TAM +BgNVHRMEBTADAQH/MA0GCSqGSIb3DQEBBQUAA4GBAMK5whPjLNQK1Ivvk88oqJqq +4f889OwikGP0eUhOBhbFlsZs+jq5YZC2UzHz+evzKBlgAP1u4lP/cB85CnjvWqM+ +1c/lywFHQ6HOdDeQ1L72tSYMrNOG4XNmLn0h7rx6GoTU7dcFRfseahBCq8mv0IDt +IRbTpvlHWPjsSvHz0ZOH +-----END CERTIFICATE-----""" + + def setUp(self): + """Set up an HTTP server to receive log messages, and a HTTPHandler + pointing to that server's address and port.""" + BaseTest.setUp(self) + self.handled = threading.Event() + def handle_request(self, request): + self.command = request.command + self.log_data = urlparse(request.path) + if self.command == 'POST': + try: + rlen = int(request.headers['Content-Length']) + self.post_data = request.rfile.read(rlen) + except: + self.post_data = None + request.send_response(200) + request.end_headers() + self.handled.set() + + def test_output(self): + # The log message sent to the HTTPHandler is properly received. + logger = logging.getLogger("http") + root_logger = self.root_logger + root_logger.removeHandler(self.root_logger.handlers[0]) + for secure in (False, True): + addr = ('localhost', 0) + if secure: + try: + import ssl + fd, fn = tempfile.mkstemp() + os.close(fd) + with open(fn, 'w') as f: + f.write(self.PEMFILE) + sslctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + sslctx.load_cert_chain(fn) + os.unlink(fn) + except ImportError: + sslctx = None + else: + sslctx = None + self.server = server = TestHTTPServer(addr, self.handle_request, + 0.01, sslctx=sslctx) + server.start() + server.ready.wait() + host = 'localhost:%d' % server.server_port + secure_client = secure and sslctx + self.h_hdlr = logging.handlers.HTTPHandler(host, '/frob', + secure=secure_client) + self.log_data = None + root_logger.addHandler(self.h_hdlr) + + for method in ('GET', 'POST'): + self.h_hdlr.method = method + self.handled.clear() + msg = "sp\xe4m" + logger.error(msg) + self.handled.wait() + self.assertEqual(self.log_data.path, '/frob') + self.assertEqual(self.command, method) + if method == 'GET': + d = parse_qs(self.log_data.query) + else: + d = parse_qs(self.post_data.decode('utf-8')) + self.assertEqual(d['name'], ['http']) + self.assertEqual(d['funcName'], ['test_output']) + self.assertEqual(d['msg'], [msg]) + + self.server.stop(2.0) + self.root_logger.removeHandler(self.h_hdlr) + self.h_hdlr.close() class MemoryTest(BaseTest): @@ -1087,28 +1744,39 @@ class WarningsTest(BaseTest): def test_warnings(self): with warnings.catch_warnings(): logging.captureWarnings(True) - try: - warnings.filterwarnings("always", category=UserWarning) - file = io.StringIO() - h = logging.StreamHandler(file) - logger = logging.getLogger("py.warnings") - logger.addHandler(h) - warnings.warn("I'm warning you...") - logger.removeHandler(h) - s = file.getvalue() - h.close() - self.assertTrue(s.find("UserWarning: I'm warning you...\n") > 0) - - #See if an explicit file uses the original implementation - file = io.StringIO() - warnings.showwarning("Explicit", UserWarning, "dummy.py", 42, - file, "Dummy line") - s = file.getvalue() - file.close() - self.assertEqual(s, - "dummy.py:42: UserWarning: Explicit\n Dummy line\n") - finally: - logging.captureWarnings(False) + self.addCleanup(logging.captureWarnings, False) + warnings.filterwarnings("always", category=UserWarning) + stream = io.StringIO() + h = logging.StreamHandler(stream) + logger = logging.getLogger("py.warnings") + logger.addHandler(h) + warnings.warn("I'm warning you...") + logger.removeHandler(h) + s = stream.getvalue() + h.close() + self.assertTrue(s.find("UserWarning: I'm warning you...\n") > 0) + + #See if an explicit file uses the original implementation + a_file = io.StringIO() + warnings.showwarning("Explicit", UserWarning, "dummy.py", 42, + a_file, "Dummy line") + s = a_file.getvalue() + a_file.close() + self.assertEqual(s, + "dummy.py:42: UserWarning: Explicit\n Dummy line\n") + + def test_warnings_no_handlers(self): + with warnings.catch_warnings(): + logging.captureWarnings(True) + self.addCleanup(logging.captureWarnings, False) + + # confirm our assumption: no loggers are set + logger = logging.getLogger("py.warnings") + self.assertEqual(logger.handlers, []) + + warnings.showwarning("Explicit", UserWarning, "dummy.py", 42) + self.assertEqual(len(logger.handlers), 1) + self.assertIsInstance(logger.handlers[0], logging.NullHandler) def formatFunc(format, datefmt=None): @@ -1961,6 +2629,7 @@ class ConfigDictTest(BaseTest): logging.config.stopListening() t.join(2.0) + @unittest.skipUnless(threading, 'Threading required for this test.') def test_listen_config_10_ok(self): with captured_stdout() as output: self.setup_via_listener(json.dumps(self.config10)) @@ -1980,6 +2649,7 @@ class ConfigDictTest(BaseTest): ('ERROR', '4'), ], stream=output) + @unittest.skipUnless(threading, 'Threading required for this test.') def test_listen_config_1_ok(self): with captured_stdout() as output: self.setup_via_listener(textwrap.dedent(ConfigFileTest.config1)) @@ -1994,6 +2664,27 @@ class ConfigDictTest(BaseTest): # Original logger output is empty. self.assert_log_lines([]) + def test_baseconfig(self): + d = { + 'atuple': (1, 2, 3), + 'alist': ['a', 'b', 'c'], + 'adict': {'d': 'e', 'f': 3 }, + 'nest1': ('g', ('h', 'i'), 'j'), + 'nest2': ['k', ['l', 'm'], 'n'], + 'nest3': ['o', 'cfg://alist', 'p'], + } + bc = logging.config.BaseConfigurator(d) + self.assertEqual(bc.convert('cfg://atuple[1]'), 2) + self.assertEqual(bc.convert('cfg://alist[1]'), 'b') + self.assertEqual(bc.convert('cfg://nest1[1][0]'), 'h') + self.assertEqual(bc.convert('cfg://nest2[1][1]'), 'm') + self.assertEqual(bc.convert('cfg://adict.d'), 'e') + self.assertEqual(bc.convert('cfg://adict[f]'), 3) + v = bc.convert('cfg://nest3') + self.assertEqual(v.pop(1), ['a', 'b', 'c']) + self.assertRaises(KeyError, bc.convert, 'cfg://nosuch') + self.assertRaises(ValueError, bc.convert, 'cfg://!') + self.assertRaises(KeyError, bc.convert, 'cfg://adict[2]') class ManagerTest(BaseTest): def test_manager_loggerclass(self): @@ -2012,6 +2703,11 @@ class ManagerTest(BaseTest): self.assertEqual(logged, ['should appear in logged']) + def test_set_log_record_factory(self): + man = logging.Manager(None) + expected = object() + man.setLogRecordFactory(expected) + self.assertEqual(man.logRecordFactory, expected) class ChildLoggerTest(BaseTest): def test_child_loggers(self): @@ -2113,6 +2809,18 @@ class QueueHandlerTest(BaseTest): self.assertTrue(handler.matches(levelno=logging.ERROR, message='2')) self.assertTrue(handler.matches(levelno=logging.CRITICAL, message='3')) +ZERO = datetime.timedelta(0) + +class UTC(datetime.tzinfo): + def utcoffset(self, dt): + return ZERO + + dst = utcoffset + + def tzname(self, dt): + return 'UTC' + +utc = UTC() class FormatterTest(unittest.TestCase): def setUp(self): @@ -2186,6 +2894,71 @@ class FormatterTest(unittest.TestCase): f = logging.Formatter('asctime', style='$') self.assertFalse(f.usesTime()) + def test_invalid_style(self): + self.assertRaises(ValueError, logging.Formatter, None, None, 'x') + + def test_time(self): + r = self.get_record() + dt = datetime.datetime(1993, 4, 21, 8, 3, 0, 0, utc) + # We use None to indicate we want the local timezone + # We're essentially converting a UTC time to local time + r.created = time.mktime(dt.astimezone(None).timetuple()) + r.msecs = 123 + f = logging.Formatter('%(asctime)s %(message)s') + f.converter = time.gmtime + self.assertEqual(f.formatTime(r), '1993-04-21 08:03:00,123') + self.assertEqual(f.formatTime(r, '%Y:%d'), '1993:21') + f.format(r) + self.assertEqual(r.asctime, '1993-04-21 08:03:00,123') + +class TestBufferingFormatter(logging.BufferingFormatter): + def formatHeader(self, records): + return '[(%d)' % len(records) + + def formatFooter(self, records): + return '(%d)]' % len(records) + +class BufferingFormatterTest(unittest.TestCase): + def setUp(self): + self.records = [ + logging.makeLogRecord({'msg': 'one'}), + logging.makeLogRecord({'msg': 'two'}), + ] + + def test_default(self): + f = logging.BufferingFormatter() + self.assertEqual('', f.format([])) + self.assertEqual('onetwo', f.format(self.records)) + + def test_custom(self): + f = TestBufferingFormatter() + self.assertEqual('[(2)onetwo(2)]', f.format(self.records)) + lf = logging.Formatter('<%(message)s>') + f = TestBufferingFormatter(lf) + self.assertEqual('[(2)<one><two>(2)]', f.format(self.records)) + +class ExceptionTest(BaseTest): + def test_formatting(self): + r = self.root_logger + h = RecordingHandler() + r.addHandler(h) + try: + raise RuntimeError('deliberate mistake') + except: + logging.exception('failed', stack_info=True) + r.removeHandler(h) + h.close() + r = h.records[0] + self.assertTrue(r.exc_text.startswith('Traceback (most recent ' + 'call last):\n')) + self.assertTrue(r.exc_text.endswith('\nRuntimeError: ' + 'deliberate mistake')) + self.assertTrue(r.stack_info.startswith('Stack (most recent ' + 'call last):\n')) + self.assertTrue(r.stack_info.endswith('logging.exception(\'failed\', ' + 'stack_info=True)')) + + class LastResortTest(BaseTest): def test_last_resort(self): # Test the last resort handler @@ -2196,6 +2969,8 @@ class LastResortTest(BaseTest): old_raise_exceptions = logging.raiseExceptions try: sys.stderr = sio = io.StringIO() + root.debug('This should not appear') + self.assertEqual(sio.getvalue(), '') root.warning('This is your final chance!') self.assertEqual(sio.getvalue(), 'This is your final chance!\n') #No handlers and no last resort, so 'No handlers' message @@ -2220,6 +2995,586 @@ class LastResortTest(BaseTest): logging.raiseExceptions = old_raise_exceptions +class FakeHandler: + + def __init__(self, identifier, called): + for method in ('acquire', 'flush', 'close', 'release'): + setattr(self, method, self.record_call(identifier, method, called)) + + def record_call(self, identifier, method_name, called): + def inner(): + called.append('{} - {}'.format(identifier, method_name)) + return inner + + +class RecordingHandler(logging.NullHandler): + + def __init__(self, *args, **kwargs): + super(RecordingHandler, self).__init__(*args, **kwargs) + self.records = [] + + def handle(self, record): + """Keep track of all the emitted records.""" + self.records.append(record) + + +class ShutdownTest(BaseTest): + + """Test suite for the shutdown method.""" + + def setUp(self): + super(ShutdownTest, self).setUp() + self.called = [] + + raise_exceptions = logging.raiseExceptions + self.addCleanup(setattr, logging, 'raiseExceptions', raise_exceptions) + + def raise_error(self, error): + def inner(): + raise error() + return inner + + def test_no_failure(self): + # create some fake handlers + handler0 = FakeHandler(0, self.called) + handler1 = FakeHandler(1, self.called) + handler2 = FakeHandler(2, self.called) + + # create live weakref to those handlers + handlers = map(logging.weakref.ref, [handler0, handler1, handler2]) + + logging.shutdown(handlerList=list(handlers)) + + expected = ['2 - acquire', '2 - flush', '2 - close', '2 - release', + '1 - acquire', '1 - flush', '1 - close', '1 - release', + '0 - acquire', '0 - flush', '0 - close', '0 - release'] + self.assertEqual(expected, self.called) + + def _test_with_failure_in_method(self, method, error): + handler = FakeHandler(0, self.called) + setattr(handler, method, self.raise_error(error)) + handlers = [logging.weakref.ref(handler)] + + logging.shutdown(handlerList=list(handlers)) + + self.assertEqual('0 - release', self.called[-1]) + + def test_with_ioerror_in_acquire(self): + self._test_with_failure_in_method('acquire', IOError) + + def test_with_ioerror_in_flush(self): + self._test_with_failure_in_method('flush', IOError) + + def test_with_ioerror_in_close(self): + self._test_with_failure_in_method('close', IOError) + + def test_with_valueerror_in_acquire(self): + self._test_with_failure_in_method('acquire', ValueError) + + def test_with_valueerror_in_flush(self): + self._test_with_failure_in_method('flush', ValueError) + + def test_with_valueerror_in_close(self): + self._test_with_failure_in_method('close', ValueError) + + def test_with_other_error_in_acquire_without_raise(self): + logging.raiseExceptions = False + self._test_with_failure_in_method('acquire', IndexError) + + def test_with_other_error_in_flush_without_raise(self): + logging.raiseExceptions = False + self._test_with_failure_in_method('flush', IndexError) + + def test_with_other_error_in_close_without_raise(self): + logging.raiseExceptions = False + self._test_with_failure_in_method('close', IndexError) + + def test_with_other_error_in_acquire_with_raise(self): + logging.raiseExceptions = True + self.assertRaises(IndexError, self._test_with_failure_in_method, + 'acquire', IndexError) + + def test_with_other_error_in_flush_with_raise(self): + logging.raiseExceptions = True + self.assertRaises(IndexError, self._test_with_failure_in_method, + 'flush', IndexError) + + def test_with_other_error_in_close_with_raise(self): + logging.raiseExceptions = True + self.assertRaises(IndexError, self._test_with_failure_in_method, + 'close', IndexError) + + +class ModuleLevelMiscTest(BaseTest): + + """Test suite for some module level methods.""" + + def test_disable(self): + old_disable = logging.root.manager.disable + # confirm our assumptions are correct + self.assertEqual(old_disable, 0) + self.addCleanup(logging.disable, old_disable) + + logging.disable(83) + self.assertEqual(logging.root.manager.disable, 83) + + def _test_log(self, method, level=None): + called = [] + patch(self, logging, 'basicConfig', + lambda *a, **kw: called.append((a, kw))) + + recording = RecordingHandler() + logging.root.addHandler(recording) + + log_method = getattr(logging, method) + if level is not None: + log_method(level, "test me: %r", recording) + else: + log_method("test me: %r", recording) + + self.assertEqual(len(recording.records), 1) + record = recording.records[0] + self.assertEqual(record.getMessage(), "test me: %r" % recording) + + expected_level = level if level is not None else getattr(logging, method.upper()) + self.assertEqual(record.levelno, expected_level) + + # basicConfig was not called! + self.assertEqual(called, []) + + def test_log(self): + self._test_log('log', logging.ERROR) + + def test_debug(self): + self._test_log('debug') + + def test_info(self): + self._test_log('info') + + def test_warning(self): + self._test_log('warning') + + def test_error(self): + self._test_log('error') + + def test_critical(self): + self._test_log('critical') + + def test_set_logger_class(self): + self.assertRaises(TypeError, logging.setLoggerClass, object) + + class MyLogger(logging.Logger): + pass + + logging.setLoggerClass(MyLogger) + self.assertEqual(logging.getLoggerClass(), MyLogger) + + logging.setLoggerClass(logging.Logger) + self.assertEqual(logging.getLoggerClass(), logging.Logger) + +class LogRecordTest(BaseTest): + def test_str_rep(self): + r = logging.makeLogRecord({}) + s = str(r) + self.assertTrue(s.startswith('<LogRecord: ')) + self.assertTrue(s.endswith('>')) + + def test_dict_arg(self): + h = RecordingHandler() + r = logging.getLogger() + r.addHandler(h) + d = {'less' : 'more' } + logging.warning('less is %(less)s', d) + self.assertIs(h.records[0].args, d) + self.assertEqual(h.records[0].message, 'less is more') + r.removeHandler(h) + h.close() + + def test_multiprocessing(self): + r = logging.makeLogRecord({}) + self.assertEqual(r.processName, 'MainProcess') + try: + import multiprocessing as mp + r = logging.makeLogRecord({}) + self.assertEqual(r.processName, mp.current_process().name) + except ImportError: + pass + + def test_optional(self): + r = logging.makeLogRecord({}) + NOT_NONE = self.assertIsNotNone + if threading: + NOT_NONE(r.thread) + NOT_NONE(r.threadName) + NOT_NONE(r.process) + NOT_NONE(r.processName) + log_threads = logging.logThreads + log_processes = logging.logProcesses + log_multiprocessing = logging.logMultiprocessing + try: + logging.logThreads = False + logging.logProcesses = False + logging.logMultiprocessing = False + r = logging.makeLogRecord({}) + NONE = self.assertIsNone + NONE(r.thread) + NONE(r.threadName) + NONE(r.process) + NONE(r.processName) + finally: + logging.logThreads = log_threads + logging.logProcesses = log_processes + logging.logMultiprocessing = log_multiprocessing + +class BasicConfigTest(unittest.TestCase): + + """Test suite for logging.basicConfig.""" + + def setUp(self): + super(BasicConfigTest, self).setUp() + self.handlers = logging.root.handlers + self.saved_handlers = logging._handlers.copy() + self.saved_handler_list = logging._handlerList[:] + self.original_logging_level = logging.root.level + self.addCleanup(self.cleanup) + logging.root.handlers = [] + + def tearDown(self): + for h in logging.root.handlers[:]: + logging.root.removeHandler(h) + h.close() + super(BasicConfigTest, self).tearDown() + + def cleanup(self): + setattr(logging.root, 'handlers', self.handlers) + logging._handlers.clear() + logging._handlers.update(self.saved_handlers) + logging._handlerList[:] = self.saved_handler_list + logging.root.level = self.original_logging_level + + def test_no_kwargs(self): + logging.basicConfig() + + # handler defaults to a StreamHandler to sys.stderr + self.assertEqual(len(logging.root.handlers), 1) + handler = logging.root.handlers[0] + self.assertIsInstance(handler, logging.StreamHandler) + self.assertEqual(handler.stream, sys.stderr) + + formatter = handler.formatter + # format defaults to logging.BASIC_FORMAT + self.assertEqual(formatter._style._fmt, logging.BASIC_FORMAT) + # datefmt defaults to None + self.assertIsNone(formatter.datefmt) + # style defaults to % + self.assertIsInstance(formatter._style, logging.PercentStyle) + + # level is not explicitly set + self.assertEqual(logging.root.level, self.original_logging_level) + + def test_filename(self): + logging.basicConfig(filename='test.log') + + self.assertEqual(len(logging.root.handlers), 1) + handler = logging.root.handlers[0] + self.assertIsInstance(handler, logging.FileHandler) + + expected = logging.FileHandler('test.log', 'a') + self.addCleanup(expected.close) + self.assertEqual(handler.stream.mode, expected.stream.mode) + self.assertEqual(handler.stream.name, expected.stream.name) + + def test_filemode(self): + logging.basicConfig(filename='test.log', filemode='wb') + + handler = logging.root.handlers[0] + expected = logging.FileHandler('test.log', 'wb') + self.addCleanup(expected.close) + self.assertEqual(handler.stream.mode, expected.stream.mode) + + def test_stream(self): + stream = io.StringIO() + self.addCleanup(stream.close) + logging.basicConfig(stream=stream) + + self.assertEqual(len(logging.root.handlers), 1) + handler = logging.root.handlers[0] + self.assertIsInstance(handler, logging.StreamHandler) + self.assertEqual(handler.stream, stream) + + def test_format(self): + logging.basicConfig(format='foo') + + formatter = logging.root.handlers[0].formatter + self.assertEqual(formatter._style._fmt, 'foo') + + def test_datefmt(self): + logging.basicConfig(datefmt='bar') + + formatter = logging.root.handlers[0].formatter + self.assertEqual(formatter.datefmt, 'bar') + + def test_style(self): + logging.basicConfig(style='$') + + formatter = logging.root.handlers[0].formatter + self.assertIsInstance(formatter._style, logging.StringTemplateStyle) + + def test_level(self): + old_level = logging.root.level + self.addCleanup(logging.root.setLevel, old_level) + + logging.basicConfig(level=57) + self.assertEqual(logging.root.level, 57) + # Test that second call has no effect + logging.basicConfig(level=58) + self.assertEqual(logging.root.level, 57) + + def test_incompatible(self): + assertRaises = self.assertRaises + handlers = [logging.StreamHandler()] + stream = sys.stderr + assertRaises(ValueError, logging.basicConfig, filename='test.log', + stream=stream) + assertRaises(ValueError, logging.basicConfig, filename='test.log', + handlers=handlers) + assertRaises(ValueError, logging.basicConfig, stream=stream, + handlers=handlers) + + def test_handlers(self): + handlers = [ + logging.StreamHandler(), + logging.StreamHandler(sys.stdout), + logging.StreamHandler(), + ] + f = logging.Formatter() + handlers[2].setFormatter(f) + logging.basicConfig(handlers=handlers) + self.assertIs(handlers[0], logging.root.handlers[0]) + self.assertIs(handlers[1], logging.root.handlers[1]) + self.assertIs(handlers[2], logging.root.handlers[2]) + self.assertIsNotNone(handlers[0].formatter) + self.assertIsNotNone(handlers[1].formatter) + self.assertIs(handlers[2].formatter, f) + self.assertIs(handlers[0].formatter, handlers[1].formatter) + + def _test_log(self, method, level=None): + # logging.root has no handlers so basicConfig should be called + called = [] + + old_basic_config = logging.basicConfig + def my_basic_config(*a, **kw): + old_basic_config() + old_level = logging.root.level + logging.root.setLevel(100) # avoid having messages in stderr + self.addCleanup(logging.root.setLevel, old_level) + called.append((a, kw)) + + patch(self, logging, 'basicConfig', my_basic_config) + + log_method = getattr(logging, method) + if level is not None: + log_method(level, "test me") + else: + log_method("test me") + + # basicConfig was called with no arguments + self.assertEqual(called, [((), {})]) + + def test_log(self): + self._test_log('log', logging.WARNING) + + def test_debug(self): + self._test_log('debug') + + def test_info(self): + self._test_log('info') + + def test_warning(self): + self._test_log('warning') + + def test_error(self): + self._test_log('error') + + def test_critical(self): + self._test_log('critical') + + +class LoggerAdapterTest(unittest.TestCase): + + def setUp(self): + super(LoggerAdapterTest, self).setUp() + old_handler_list = logging._handlerList[:] + + self.recording = RecordingHandler() + self.logger = logging.root + self.logger.addHandler(self.recording) + self.addCleanup(self.logger.removeHandler, self.recording) + self.addCleanup(self.recording.close) + + def cleanup(): + logging._handlerList[:] = old_handler_list + + self.addCleanup(cleanup) + self.addCleanup(logging.shutdown) + self.adapter = logging.LoggerAdapter(logger=self.logger, extra=None) + + def test_exception(self): + msg = 'testing exception: %r' + exc = None + try: + 1 / 0 + except ZeroDivisionError as e: + exc = e + self.adapter.exception(msg, self.recording) + + self.assertEqual(len(self.recording.records), 1) + record = self.recording.records[0] + self.assertEqual(record.levelno, logging.ERROR) + self.assertEqual(record.msg, msg) + self.assertEqual(record.args, (self.recording,)) + self.assertEqual(record.exc_info, + (exc.__class__, exc, exc.__traceback__)) + + def test_critical(self): + msg = 'critical test! %r' + self.adapter.critical(msg, self.recording) + + self.assertEqual(len(self.recording.records), 1) + record = self.recording.records[0] + self.assertEqual(record.levelno, logging.CRITICAL) + self.assertEqual(record.msg, msg) + self.assertEqual(record.args, (self.recording,)) + + def test_is_enabled_for(self): + old_disable = self.adapter.logger.manager.disable + self.adapter.logger.manager.disable = 33 + self.addCleanup(setattr, self.adapter.logger.manager, 'disable', + old_disable) + self.assertFalse(self.adapter.isEnabledFor(32)) + + def test_has_handlers(self): + self.assertTrue(self.adapter.hasHandlers()) + + for handler in self.logger.handlers: + self.logger.removeHandler(handler) + + self.assertFalse(self.logger.hasHandlers()) + self.assertFalse(self.adapter.hasHandlers()) + + +class LoggerTest(BaseTest): + + def setUp(self): + super(LoggerTest, self).setUp() + self.recording = RecordingHandler() + self.logger = logging.Logger(name='blah') + self.logger.addHandler(self.recording) + self.addCleanup(self.logger.removeHandler, self.recording) + self.addCleanup(self.recording.close) + self.addCleanup(logging.shutdown) + + def test_set_invalid_level(self): + self.assertRaises(TypeError, self.logger.setLevel, object()) + + def test_exception(self): + msg = 'testing exception: %r' + exc = None + try: + 1 / 0 + except ZeroDivisionError as e: + exc = e + self.logger.exception(msg, self.recording) + + self.assertEqual(len(self.recording.records), 1) + record = self.recording.records[0] + self.assertEqual(record.levelno, logging.ERROR) + self.assertEqual(record.msg, msg) + self.assertEqual(record.args, (self.recording,)) + self.assertEqual(record.exc_info, + (exc.__class__, exc, exc.__traceback__)) + + def test_log_invalid_level_with_raise(self): + old_raise = logging.raiseExceptions + self.addCleanup(setattr, logging, 'raiseExecptions', old_raise) + + logging.raiseExceptions = True + self.assertRaises(TypeError, self.logger.log, '10', 'test message') + + def test_log_invalid_level_no_raise(self): + old_raise = logging.raiseExceptions + self.addCleanup(setattr, logging, 'raiseExecptions', old_raise) + + logging.raiseExceptions = False + self.logger.log('10', 'test message') # no exception happens + + def test_find_caller_with_stack_info(self): + called = [] + patch(self, logging.traceback, 'print_stack', + lambda f, file: called.append(file.getvalue())) + + self.logger.findCaller(stack_info=True) + + self.assertEqual(len(called), 1) + self.assertEqual('Stack (most recent call last):\n', called[0]) + + def test_make_record_with_extra_overwrite(self): + name = 'my record' + level = 13 + fn = lno = msg = args = exc_info = func = sinfo = None + rv = logging._logRecordFactory(name, level, fn, lno, msg, args, + exc_info, func, sinfo) + + for key in ('message', 'asctime') + tuple(rv.__dict__.keys()): + extra = {key: 'some value'} + self.assertRaises(KeyError, self.logger.makeRecord, name, level, + fn, lno, msg, args, exc_info, + extra=extra, sinfo=sinfo) + + def test_make_record_with_extra_no_overwrite(self): + name = 'my record' + level = 13 + fn = lno = msg = args = exc_info = func = sinfo = None + extra = {'valid_key': 'some value'} + result = self.logger.makeRecord(name, level, fn, lno, msg, args, + exc_info, extra=extra, sinfo=sinfo) + self.assertIn('valid_key', result.__dict__) + + def test_has_handlers(self): + self.assertTrue(self.logger.hasHandlers()) + + for handler in self.logger.handlers: + self.logger.removeHandler(handler) + self.assertFalse(self.logger.hasHandlers()) + + def test_has_handlers_no_propagate(self): + child_logger = logging.getLogger('blah.child') + child_logger.propagate = False + self.assertFalse(child_logger.hasHandlers()) + + def test_is_enabled_for(self): + old_disable = self.logger.manager.disable + self.logger.manager.disable = 23 + self.addCleanup(setattr, self.logger.manager, 'disable', old_disable) + self.assertFalse(self.logger.isEnabledFor(22)) + + def test_root_logger_aliases(self): + root = logging.getLogger() + self.assertIs(root, logging.root) + self.assertIs(root, logging.getLogger(None)) + self.assertIs(root, logging.getLogger('')) + self.assertIs(root, logging.getLogger('foo').root) + self.assertIs(root, logging.getLogger('foo.bar').root) + self.assertIs(root, logging.getLogger('foo').parent) + + self.assertIsNot(root, logging.getLogger('\0')) + self.assertIsNot(root, logging.getLogger('foo.bar').parent) + + def test_invalid_names(self): + self.assertRaises(TypeError, logging.getLogger, any) + self.assertRaises(TypeError, logging.getLogger, b'foo') + + class BaseFileTest(BaseTest): "Base class for handler tests that write log files" @@ -2239,10 +3594,21 @@ class BaseFileTest(BaseTest): def assertLogFile(self, filename): "Assert a log file is there and register it for deletion" self.assertTrue(os.path.exists(filename), - msg="Log file %r does not exist") + msg="Log file %r does not exist" % filename) self.rmfiles.append(filename) +class FileHandlerTest(BaseFileTest): + def test_delay(self): + os.unlink(self.fn) + fh = logging.FileHandler(self.fn, delay=True) + self.assertIsNone(fh.stream) + self.assertFalse(os.path.exists(self.fn)) + fh.handle(logging.makeLogRecord({})) + self.assertIsNotNone(fh.stream) + self.assertTrue(os.path.exists(self.fn)) + fh.close() + class RotatingFileHandlerTest(BaseFileTest): def next_rec(self): return logging.LogRecord('n', logging.DEBUG, 'p', 1, @@ -2268,20 +3634,117 @@ class RotatingFileHandlerTest(BaseFileTest): rh.close() def test_rollover_filenames(self): + def namer(name): + return name + ".test" rh = logging.handlers.RotatingFileHandler( self.fn, backupCount=2, maxBytes=1) + rh.namer = namer rh.emit(self.next_rec()) self.assertLogFile(self.fn) rh.emit(self.next_rec()) - self.assertLogFile(self.fn + ".1") + self.assertLogFile(namer(self.fn + ".1")) rh.emit(self.next_rec()) - self.assertLogFile(self.fn + ".2") - self.assertFalse(os.path.exists(self.fn + ".3")) + self.assertLogFile(namer(self.fn + ".2")) + self.assertFalse(os.path.exists(namer(self.fn + ".3"))) + rh.close() + + @requires_zlib + def test_rotator(self): + def namer(name): + return name + ".gz" + + def rotator(source, dest): + with open(source, "rb") as sf: + data = sf.read() + compressed = zlib.compress(data, 9) + with open(dest, "wb") as df: + df.write(compressed) + os.remove(source) + + rh = logging.handlers.RotatingFileHandler( + self.fn, backupCount=2, maxBytes=1) + rh.rotator = rotator + rh.namer = namer + m1 = self.next_rec() + rh.emit(m1) + self.assertLogFile(self.fn) + m2 = self.next_rec() + rh.emit(m2) + fn = namer(self.fn + ".1") + self.assertLogFile(fn) + newline = os.linesep + with open(fn, "rb") as f: + compressed = f.read() + data = zlib.decompress(compressed) + self.assertEqual(data.decode("ascii"), m1.msg + newline) + rh.emit(self.next_rec()) + fn = namer(self.fn + ".2") + self.assertLogFile(fn) + with open(fn, "rb") as f: + compressed = f.read() + data = zlib.decompress(compressed) + self.assertEqual(data.decode("ascii"), m1.msg + newline) + rh.emit(self.next_rec()) + fn = namer(self.fn + ".2") + with open(fn, "rb") as f: + compressed = f.read() + data = zlib.decompress(compressed) + self.assertEqual(data.decode("ascii"), m2.msg + newline) + self.assertFalse(os.path.exists(namer(self.fn + ".3"))) rh.close() class TimedRotatingFileHandlerTest(BaseFileTest): - # test methods added below - pass + # other test methods added below + def test_rollover(self): + fh = logging.handlers.TimedRotatingFileHandler(self.fn, 'S', + backupCount=1) + fmt = logging.Formatter('%(asctime)s %(message)s') + fh.setFormatter(fmt) + r1 = logging.makeLogRecord({'msg': 'testing - initial'}) + fh.emit(r1) + self.assertLogFile(self.fn) + time.sleep(1.1) # a little over a second ... + r2 = logging.makeLogRecord({'msg': 'testing - after delay'}) + fh.emit(r2) + fh.close() + # At this point, we should have a recent rotated file which we + # can test for the existence of. However, in practice, on some + # machines which run really slowly, we don't know how far back + # in time to go to look for the log file. So, we go back a fair + # bit, and stop as soon as we see a rotated file. In theory this + # could of course still fail, but the chances are lower. + found = False + now = datetime.datetime.now() + GO_BACK = 5 * 60 # seconds + for secs in range(GO_BACK): + prev = now - datetime.timedelta(seconds=secs) + fn = self.fn + prev.strftime(".%Y-%m-%d_%H-%M-%S") + found = os.path.exists(fn) + if found: + self.rmfiles.append(fn) + break + msg = 'No rotated files found, went back %d seconds' % GO_BACK + if not found: + #print additional diagnostics + dn, fn = os.path.split(self.fn) + files = [f for f in os.listdir(dn) if f.startswith(fn)] + print('Test time: %s' % now.strftime("%Y-%m-%d %H-%M-%S"), file=sys.stderr) + print('The only matching files are: %s' % files, file=sys.stderr) + for f in files: + print('Contents of %s:' % f) + path = os.path.join(dn, f) + with open(path, 'r') as tf: + print(tf.read()) + self.assertTrue(found, msg=msg) + + def test_invalid(self): + assertRaises = self.assertRaises + assertRaises(ValueError, logging.handlers.TimedRotatingFileHandler, + self.fn, 'X', delay=True) + assertRaises(ValueError, logging.handlers.TimedRotatingFileHandler, + self.fn, 'W', delay=True) + assertRaises(ValueError, logging.handlers.TimedRotatingFileHandler, + self.fn, 'W7', delay=True) def secs(**kw): return datetime.timedelta(**kw) // datetime.timedelta(seconds=1) @@ -2329,43 +3792,34 @@ for when, exp in (('S', 1), rh.close() setattr(TimedRotatingFileHandlerTest, "test_compute_rollover_%s" % when, test_compute_rollover) -class HandlerTest(BaseTest): - - @unittest.skipIf(os.name == 'nt', 'WatchedFileHandler not appropriate for Windows.') - @unittest.skipUnless(threading, 'Threading required for this test.') - def test_race(self): - # Issue #14632 refers. - def remove_loop(fname, tries): - for _ in range(tries): - try: - os.unlink(fname) - except OSError: - pass - time.sleep(0.004 * random.randint(0, 4)) - - del_count = 500 - log_count = 500 - - for delay in (False, True): - fd, fn = tempfile.mkstemp('.log', 'test_logging-3-') - os.close(fd) - remover = threading.Thread(target=remove_loop, args=(fn, del_count)) - remover.daemon = True - remover.start() - h = logging.handlers.WatchedFileHandler(fn, delay=delay) - f = logging.Formatter('%(asctime)s: %(levelname)s: %(message)s') - h.setFormatter(f) - try: - for _ in range(log_count): - time.sleep(0.005) - r = logging.makeLogRecord({'msg': 'testing' }) - h.handle(r) - finally: - remover.join() - h.close() - if os.path.exists(fn): - os.unlink(fn) +@unittest.skipUnless(win32evtlog, 'win32evtlog/win32evtlogutil required for this test.') +class NTEventLogHandlerTest(BaseTest): + def test_basic(self): + logtype = 'Application' + elh = win32evtlog.OpenEventLog(None, logtype) + num_recs = win32evtlog.GetNumberOfEventLogRecords(elh) + h = logging.handlers.NTEventLogHandler('test_logging') + r = logging.makeLogRecord({'msg': 'Test Log Message'}) + h.handle(r) + h.close() + # Now see if the event is recorded + self.assertTrue(num_recs < win32evtlog.GetNumberOfEventLogRecords(elh)) + flags = win32evtlog.EVENTLOG_BACKWARDS_READ | \ + win32evtlog.EVENTLOG_SEQUENTIAL_READ + found = False + GO_BACK = 100 + events = win32evtlog.ReadEventLog(elh, flags, GO_BACK) + for e in events: + if e.SourceName != 'test_logging': + continue + msg = win32evtlogutil.SafeFormatMessage(e, logtype) + if msg != 'Test Log Message\r\n': + continue + found = True + break + msg = 'Record not found in event log, went back %d records' % GO_BACK + self.assertTrue(found, msg=msg) # Set the locale to the platform-dependent default. I have no idea # why the test does this, but in any case we save the current locale @@ -2373,14 +3827,17 @@ class HandlerTest(BaseTest): @run_with_locale('LC_ALL', '') def test_main(): run_unittest(BuiltinLevelsTest, BasicFilterTest, - CustomLevelsAndFiltersTest, MemoryHandlerTest, - ConfigFileTest, SocketHandlerTest, MemoryTest, - EncodingTest, WarningsTest, ConfigDictTest, ManagerTest, - FormatterTest, - LogRecordFactoryTest, ChildLoggerTest, QueueHandlerTest, - RotatingFileHandlerTest, - LastResortTest, - TimedRotatingFileHandlerTest, HandlerTest, + CustomLevelsAndFiltersTest, HandlerTest, MemoryHandlerTest, + ConfigFileTest, SocketHandlerTest, DatagramHandlerTest, + MemoryTest, EncodingTest, WarningsTest, ConfigDictTest, + ManagerTest, FormatterTest, BufferingFormatterTest, + StreamHandlerTest, LogRecordFactoryTest, ChildLoggerTest, + QueueHandlerTest, ShutdownTest, ModuleLevelMiscTest, + BasicConfigTest, LoggerAdapterTest, LoggerTest, + SMTPHandlerTest, FileHandlerTest, RotatingFileHandlerTest, + LastResortTest, LogRecordTest, ExceptionTest, + SysLogHandlerTest, HTTPHandlerTest, NTEventLogHandlerTest, + TimedRotatingFileHandlerTest ) if __name__ == "__main__": |