diff options
Diffstat (limited to 'Lib/test/test_logging.py')
-rw-r--r-- | Lib/test/test_logging.py | 534 |
1 files changed, 256 insertions, 278 deletions
diff --git a/Lib/test/test_logging.py b/Lib/test/test_logging.py index f4aef9f..9c3816a 100644 --- a/Lib/test/test_logging.py +++ b/Lib/test/test_logging.py @@ -42,22 +42,19 @@ import tempfile from test.support.script_helper import assert_python_ok from test import support import textwrap +import threading import time import unittest import warnings import weakref -try: - import threading - # The following imports are needed only for tests which - # require threading - import asyncore - from http.server import HTTPServer, BaseHTTPRequestHandler - import smtpd - from urllib.parse import urlparse, parse_qs - from socketserver import (ThreadingUDPServer, DatagramRequestHandler, - ThreadingTCPServer, StreamRequestHandler) -except ImportError: - threading = None + +import asyncore +from http.server import HTTPServer, BaseHTTPRequestHandler +import smtpd +from urllib.parse import urlparse, parse_qs +from socketserver import (ThreadingUDPServer, DatagramRequestHandler, + ThreadingTCPServer, StreamRequestHandler) + try: import win32evtlog, win32evtlogutil, pywintypes except ImportError: @@ -625,7 +622,6 @@ class HandlerTest(BaseTest): os.unlink(fn) @unittest.skipIf(os.name == 'nt', 'WatchedFileHandler not appropriate for Windows.') - @unittest.skipUnless(threading, 'Threading required for this test.') def test_race(self): # Issue #14632 refers. def remove_loop(fname, tries): @@ -719,276 +715,274 @@ class StreamHandlerTest(BaseTest): # -- The following section could be moved into a server_helper.py module # -- if it proves to be of wider utility than just test_logging -if threading: - class TestSMTPServer(smtpd.SMTPServer): +class TestSMTPServer(smtpd.SMTPServer): + """ + This class implements a test SMTP server. + + :param addr: A (host, port) tuple which the server listens on. + You can specify a port value of zero: the server's + *port* attribute will hold the actual port number + used, which can be used in client connections. + :param handler: A callable which will be called to process + incoming messages. The handler will be passed + the client address tuple, who the message is from, + a list of recipients and the message data. + :param poll_interval: The interval, in seconds, used in the underlying + :func:`select` or :func:`poll` call by + :func:`asyncore.loop`. + :param sockmap: A dictionary which will be used to hold + :class:`asyncore.dispatcher` instances used by + :func:`asyncore.loop`. This avoids changing the + :mod:`asyncore` module's global state. + """ + + def __init__(self, addr, handler, poll_interval, sockmap): + smtpd.SMTPServer.__init__(self, addr, None, map=sockmap, + decode_data=True) + self.port = self.socket.getsockname()[1] + self._handler = handler + self._thread = None + self.poll_interval = poll_interval + + def process_message(self, peer, mailfrom, rcpttos, data): + """ + Delegates to the handler passed in to the server's constructor. + + Typically, this will be a test case method. + :param peer: The client (host, port) tuple. + :param mailfrom: The address of the sender. + :param rcpttos: The addresses of the recipients. + :param data: The message. """ - This class implements a test SMTP server. - - :param addr: A (host, port) tuple which the server listens on. - You can specify a port value of zero: the server's - *port* attribute will hold the actual port number - used, which can be used in client connections. - :param handler: A callable which will be called to process - incoming messages. The handler will be passed - the client address tuple, who the message is from, - a list of recipients and the message data. + self._handler(peer, mailfrom, rcpttos, data) + + def start(self): + """ + Start the server running on a separate daemon thread. + """ + self._thread = t = threading.Thread(target=self.serve_forever, + args=(self.poll_interval,)) + t.setDaemon(True) + t.start() + + def serve_forever(self, poll_interval): + """ + Run the :mod:`asyncore` loop until normal termination + conditions arise. :param poll_interval: The interval, in seconds, used in the underlying :func:`select` or :func:`poll` call by :func:`asyncore.loop`. - :param sockmap: A dictionary which will be used to hold - :class:`asyncore.dispatcher` instances used by - :func:`asyncore.loop`. This avoids changing the - :mod:`asyncore` module's global state. """ + try: + asyncore.loop(poll_interval, map=self._map) + except OSError: + # On FreeBSD 8, closing the server repeatably + # raises this error. We swallow it if the + # server has been closed. + if self.connected or self.accepting: + raise - def __init__(self, addr, handler, poll_interval, sockmap): - smtpd.SMTPServer.__init__(self, addr, None, map=sockmap, - decode_data=True) - self.port = self.socket.getsockname()[1] - self._handler = handler - self._thread = None - self.poll_interval = poll_interval + def stop(self, timeout=None): + """ + Stop the thread by closing the server instance. + Wait for the server thread to terminate. - def process_message(self, peer, mailfrom, rcpttos, data): - """ - Delegates to the handler passed in to the server's constructor. + :param timeout: How long to wait for the server thread + to terminate. + """ + self.close() + self._thread.join(timeout) + asyncore.close_all(map=self._map, ignore_all=True) - Typically, this will be a test case method. - :param peer: The client (host, port) tuple. - :param mailfrom: The address of the sender. - :param rcpttos: The addresses of the recipients. - :param data: The message. - """ - self._handler(peer, mailfrom, rcpttos, data) + alive = self._thread.is_alive() + self._thread = None + if alive: + self.fail("join() timed out") - def start(self): - """ - Start the server running on a separate daemon thread. - """ - self._thread = t = threading.Thread(target=self.serve_forever, - args=(self.poll_interval,)) - t.setDaemon(True) - t.start() +class ControlMixin(object): + """ + This mixin is used to start a server on a separate thread, and + shut it down programmatically. Request handling is simplified - instead + of needing to derive a suitable RequestHandler subclass, you just + provide a callable which will be passed each received request to be + processed. + + :param handler: A handler callable which will be called with a + single parameter - the request - in order to + process the request. This handler is called on the + server thread, effectively meaning that requests are + processed serially. While not quite Web scale ;-), + this should be fine for testing applications. + :param poll_interval: The polling interval in seconds. + """ + def __init__(self, handler, poll_interval): + self._thread = None + self.poll_interval = poll_interval + self._handler = handler + self.ready = threading.Event() - def serve_forever(self, poll_interval): - """ - Run the :mod:`asyncore` loop until normal termination - conditions arise. - :param poll_interval: The interval, in seconds, used in the underlying - :func:`select` or :func:`poll` call by - :func:`asyncore.loop`. - """ - try: - asyncore.loop(poll_interval, map=self._map) - except OSError: - # On FreeBSD 8, closing the server repeatably - # raises this error. We swallow it if the - # server has been closed. - if self.connected or self.accepting: - raise - - def stop(self, timeout=None): - """ - Stop the thread by closing the server instance. - Wait for the server thread to terminate. + def start(self): + """ + Create a daemon thread to run the server, and start it. + """ + self._thread = t = threading.Thread(target=self.serve_forever, + args=(self.poll_interval,)) + t.setDaemon(True) + t.start() - :param timeout: How long to wait for the server thread - to terminate. - """ - self.close() - self._thread.join(timeout) - asyncore.close_all(map=self._map, ignore_all=True) + def serve_forever(self, poll_interval): + """ + Run the server. Set the ready flag before entering the + service loop. + """ + self.ready.set() + super(ControlMixin, self).serve_forever(poll_interval) + def stop(self, timeout=None): + """ + Tell the server thread to stop, and wait for it to do so. + + :param timeout: How long to wait for the server thread + to terminate. + """ + self.shutdown() + if self._thread is not None: + self._thread.join(timeout) alive = self._thread.is_alive() self._thread = None if alive: self.fail("join() timed out") + self.server_close() + self.ready.clear() - class ControlMixin(object): - """ - This mixin is used to start a server on a separate thread, and - shut it down programmatically. Request handling is simplified - instead - of needing to derive a suitable RequestHandler subclass, you just - provide a callable which will be passed each received request to be - processed. - - :param handler: A handler callable which will be called with a - single parameter - the request - in order to - process the request. This handler is called on the - server thread, effectively meaning that requests are - processed serially. While not quite Web scale ;-), - this should be fine for testing applications. - :param poll_interval: The polling interval in seconds. - """ - def __init__(self, handler, poll_interval): - self._thread = None - self.poll_interval = poll_interval - self._handler = handler - self.ready = threading.Event() +class TestHTTPServer(ControlMixin, HTTPServer): + """ + An HTTP server which is controllable using :class:`ControlMixin`. + + :param addr: A tuple with the IP address and port to listen on. + :param handler: A handler callable which will be called with a + single parameter - the request - in order to + process the request. + :param poll_interval: The polling interval in seconds. + :param log: Pass ``True`` to enable log messages. + """ + def __init__(self, addr, handler, poll_interval=0.5, + log=False, sslctx=None): + class DelegatingHTTPRequestHandler(BaseHTTPRequestHandler): + def __getattr__(self, name, default=None): + if name.startswith('do_'): + return self.process_request + raise AttributeError(name) + + def process_request(self): + self.server._handler(self) + + def log_message(self, format, *args): + if log: + super(DelegatingHTTPRequestHandler, + self).log_message(format, *args) + HTTPServer.__init__(self, addr, DelegatingHTTPRequestHandler) + ControlMixin.__init__(self, handler, poll_interval) + self.sslctx = sslctx + + def get_request(self): + try: + sock, addr = self.socket.accept() + if self.sslctx: + sock = self.sslctx.wrap_socket(sock, server_side=True) + except OSError as e: + # socket errors are silenced by the caller, print them here + sys.stderr.write("Got an error:\n%s\n" % e) + raise + return sock, addr - def start(self): - """ - Create a daemon thread to run the server, and start it. - """ - self._thread = t = threading.Thread(target=self.serve_forever, - args=(self.poll_interval,)) - t.setDaemon(True) - t.start() +class TestTCPServer(ControlMixin, ThreadingTCPServer): + """ + A TCP server which is controllable using :class:`ControlMixin`. + + :param addr: A tuple with the IP address and port to listen on. + :param handler: A handler callable which will be called with a single + parameter - the request - in order to process the request. + :param poll_interval: The polling interval in seconds. + :bind_and_activate: If True (the default), binds the server and starts it + listening. If False, you need to call + :meth:`server_bind` and :meth:`server_activate` at + some later time before calling :meth:`start`, so that + the server will set up the socket and listen on it. + """ - def serve_forever(self, poll_interval): - """ - Run the server. Set the ready flag before entering the - service loop. - """ - self.ready.set() - super(ControlMixin, self).serve_forever(poll_interval) + allow_reuse_address = True - def stop(self, timeout=None): - """ - Tell the server thread to stop, and wait for it to do so. + def __init__(self, addr, handler, poll_interval=0.5, + bind_and_activate=True): + class DelegatingTCPRequestHandler(StreamRequestHandler): - :param timeout: How long to wait for the server thread - to terminate. - """ - self.shutdown() - if self._thread is not None: - self._thread.join(timeout) - alive = self._thread.is_alive() - self._thread = None - if alive: - self.fail("join() timed out") - self.server_close() - self.ready.clear() - - class TestHTTPServer(ControlMixin, HTTPServer): - """ - An HTTP server which is controllable using :class:`ControlMixin`. - - :param addr: A tuple with the IP address and port to listen on. - :param handler: A handler callable which will be called with a - single parameter - the request - in order to - process the request. - :param poll_interval: The polling interval in seconds. - :param log: Pass ``True`` to enable log messages. - """ - def __init__(self, addr, handler, poll_interval=0.5, - log=False, sslctx=None): - class DelegatingHTTPRequestHandler(BaseHTTPRequestHandler): - def __getattr__(self, name, default=None): - if name.startswith('do_'): - return self.process_request - raise AttributeError(name) - - def process_request(self): - self.server._handler(self) - - def log_message(self, format, *args): - if log: - super(DelegatingHTTPRequestHandler, - self).log_message(format, *args) - HTTPServer.__init__(self, addr, DelegatingHTTPRequestHandler) - ControlMixin.__init__(self, handler, poll_interval) - self.sslctx = sslctx - - def get_request(self): - try: - sock, addr = self.socket.accept() - if self.sslctx: - sock = self.sslctx.wrap_socket(sock, server_side=True) - except OSError as e: - # socket errors are silenced by the caller, print them here - sys.stderr.write("Got an error:\n%s\n" % e) - raise - return sock, addr + def handle(self): + self.server._handler(self) + ThreadingTCPServer.__init__(self, addr, DelegatingTCPRequestHandler, + bind_and_activate) + ControlMixin.__init__(self, handler, poll_interval) - class TestTCPServer(ControlMixin, ThreadingTCPServer): - """ - A TCP server which is controllable using :class:`ControlMixin`. - - :param addr: A tuple with the IP address and port to listen on. - :param handler: A handler callable which will be called with a single - parameter - the request - in order to process the request. - :param poll_interval: The polling interval in seconds. - :bind_and_activate: If True (the default), binds the server and starts it - listening. If False, you need to call - :meth:`server_bind` and :meth:`server_activate` at - some later time before calling :meth:`start`, so that - the server will set up the socket and listen on it. - """ + def server_bind(self): + super(TestTCPServer, self).server_bind() + self.port = self.socket.getsockname()[1] - allow_reuse_address = True +class TestUDPServer(ControlMixin, ThreadingUDPServer): + """ + A UDP server which is controllable using :class:`ControlMixin`. + + :param addr: A tuple with the IP address and port to listen on. + :param handler: A handler callable which will be called with a + single parameter - the request - in order to + process the request. + :param poll_interval: The polling interval for shutdown requests, + in seconds. + :bind_and_activate: If True (the default), binds the server and + starts it listening. If False, you need to + call :meth:`server_bind` and + :meth:`server_activate` at some later time + before calling :meth:`start`, so that the server will + set up the socket and listen on it. + """ + def __init__(self, addr, handler, poll_interval=0.5, + bind_and_activate=True): + class DelegatingUDPRequestHandler(DatagramRequestHandler): - def __init__(self, addr, handler, poll_interval=0.5, - bind_and_activate=True): - class DelegatingTCPRequestHandler(StreamRequestHandler): + def handle(self): + self.server._handler(self) - def handle(self): - self.server._handler(self) - ThreadingTCPServer.__init__(self, addr, DelegatingTCPRequestHandler, - bind_and_activate) - ControlMixin.__init__(self, handler, poll_interval) + def finish(self): + data = self.wfile.getvalue() + if data: + try: + super(DelegatingUDPRequestHandler, self).finish() + except OSError: + if not self.server._closed: + raise - def server_bind(self): - super(TestTCPServer, self).server_bind() - self.port = self.socket.getsockname()[1] + ThreadingUDPServer.__init__(self, addr, + DelegatingUDPRequestHandler, + bind_and_activate) + ControlMixin.__init__(self, handler, poll_interval) + self._closed = False - class TestUDPServer(ControlMixin, ThreadingUDPServer): - """ - A UDP server which is controllable using :class:`ControlMixin`. - - :param addr: A tuple with the IP address and port to listen on. - :param handler: A handler callable which will be called with a - single parameter - the request - in order to - process the request. - :param poll_interval: The polling interval for shutdown requests, - in seconds. - :bind_and_activate: If True (the default), binds the server and - starts it listening. If False, you need to - call :meth:`server_bind` and - :meth:`server_activate` at some later time - before calling :meth:`start`, so that the server will - set up the socket and listen on it. - """ - def __init__(self, addr, handler, poll_interval=0.5, - bind_and_activate=True): - class DelegatingUDPRequestHandler(DatagramRequestHandler): - - def handle(self): - self.server._handler(self) - - def finish(self): - data = self.wfile.getvalue() - if data: - try: - super(DelegatingUDPRequestHandler, self).finish() - except OSError: - if not self.server._closed: - raise - - ThreadingUDPServer.__init__(self, addr, - DelegatingUDPRequestHandler, - bind_and_activate) - ControlMixin.__init__(self, handler, poll_interval) - self._closed = False - - def server_bind(self): - super(TestUDPServer, self).server_bind() - self.port = self.socket.getsockname()[1] - - def server_close(self): - super(TestUDPServer, self).server_close() - self._closed = True + def server_bind(self): + super(TestUDPServer, self).server_bind() + self.port = self.socket.getsockname()[1] - if hasattr(socket, "AF_UNIX"): - class TestUnixStreamServer(TestTCPServer): - address_family = socket.AF_UNIX + def server_close(self): + super(TestUDPServer, self).server_close() + self._closed = True - class TestUnixDatagramServer(TestUDPServer): - address_family = socket.AF_UNIX +if hasattr(socket, "AF_UNIX"): + class TestUnixStreamServer(TestTCPServer): + address_family = socket.AF_UNIX + + class TestUnixDatagramServer(TestUDPServer): + address_family = socket.AF_UNIX # - end of server_helper section -@unittest.skipUnless(threading, 'Threading required for this test.') class SMTPHandlerTest(BaseTest): TIMEOUT = 8.0 @@ -1472,14 +1466,12 @@ class ConfigFileTest(BaseTest): @unittest.skipIf(True, "FIXME: bpo-30830") -@unittest.skipUnless(threading, 'Threading required for this test.') class SocketHandlerTest(BaseTest): """Test for SocketHandler objects.""" - if threading: - server_class = TestTCPServer - address = ('localhost', 0) + server_class = TestTCPServer + address = ('localhost', 0) def setUp(self): """Set up a TCP server to receive log messages, and a SocketHandler @@ -1573,12 +1565,11 @@ def _get_temp_domain_socket(): @unittest.skipIf(True, "FIXME: bpo-30830") @unittest.skipUnless(hasattr(socket, "AF_UNIX"), "Unix sockets required") -@unittest.skipUnless(threading, 'Threading required for this test.') class UnixSocketHandlerTest(SocketHandlerTest): """Test for SocketHandler with unix sockets.""" - if threading and hasattr(socket, "AF_UNIX"): + if hasattr(socket, "AF_UNIX"): server_class = TestUnixStreamServer def setUp(self): @@ -1591,14 +1582,12 @@ class UnixSocketHandlerTest(SocketHandlerTest): support.unlink(self.address) @unittest.skipIf(True, "FIXME: bpo-30830") -@unittest.skipUnless(threading, 'Threading required for this test.') class DatagramHandlerTest(BaseTest): """Test for DatagramHandler.""" - if threading: - server_class = TestUDPServer - address = ('localhost', 0) + server_class = TestUDPServer + address = ('localhost', 0) def setUp(self): """Set up a UDP server to receive log messages, and a DatagramHandler @@ -1659,12 +1648,11 @@ class DatagramHandlerTest(BaseTest): @unittest.skipIf(True, "FIXME: bpo-30830") @unittest.skipUnless(hasattr(socket, "AF_UNIX"), "Unix sockets required") -@unittest.skipUnless(threading, 'Threading required for this test.') class UnixDatagramHandlerTest(DatagramHandlerTest): """Test for DatagramHandler using Unix sockets.""" - if threading and hasattr(socket, "AF_UNIX"): + if hasattr(socket, "AF_UNIX"): server_class = TestUnixDatagramServer def setUp(self): @@ -1676,14 +1664,12 @@ class UnixDatagramHandlerTest(DatagramHandlerTest): DatagramHandlerTest.tearDown(self) support.unlink(self.address) -@unittest.skipUnless(threading, 'Threading required for this test.') class SysLogHandlerTest(BaseTest): """Test for SysLogHandler using UDP.""" - if threading: - server_class = TestUDPServer - address = ('localhost', 0) + server_class = TestUDPServer + address = ('localhost', 0) def setUp(self): """Set up a UDP server to receive log messages, and a SysLogHandler @@ -1747,12 +1733,11 @@ class SysLogHandlerTest(BaseTest): @unittest.skipIf(True, "FIXME: bpo-30830") @unittest.skipUnless(hasattr(socket, "AF_UNIX"), "Unix sockets required") -@unittest.skipUnless(threading, 'Threading required for this test.') class UnixSysLogHandlerTest(SysLogHandlerTest): """Test for SysLogHandler with Unix sockets.""" - if threading and hasattr(socket, "AF_UNIX"): + if hasattr(socket, "AF_UNIX"): server_class = TestUnixDatagramServer def setUp(self): @@ -1767,7 +1752,6 @@ class UnixSysLogHandlerTest(SysLogHandlerTest): @unittest.skipIf(True, "FIXME: bpo-30830") @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 support required for this test.') -@unittest.skipUnless(threading, 'Threading required for this test.') class IPv6SysLogHandlerTest(SysLogHandlerTest): """Test for SysLogHandler with IPv6 host.""" @@ -1783,7 +1767,6 @@ class IPv6SysLogHandlerTest(SysLogHandlerTest): self.server_class.address_family = socket.AF_INET super(IPv6SysLogHandlerTest, self).tearDown() -@unittest.skipUnless(threading, 'Threading required for this test.') class HTTPHandlerTest(BaseTest): """Test for HTTPHandler.""" @@ -2892,7 +2875,6 @@ class ConfigDictTest(BaseTest): # listen() uses ConfigSocketReceiver which is based # on socketserver.ThreadingTCPServer @unittest.skipIf(True, "FIXME: bpo-30830") - @unittest.skipUnless(threading, 'listen() needs threading to work') def setup_via_listener(self, text, verify=None): text = text.encode("utf-8") # Ask for a randomly assigned port (by using port 0) @@ -2923,7 +2905,6 @@ class ConfigDictTest(BaseTest): if t.is_alive(): self.fail("join() timed out") - @unittest.skipUnless(threading, 'Threading required for this test.') def test_listen_config_10_ok(self): with support.captured_stdout() as output: self.setup_via_listener(json.dumps(self.config10)) @@ -2943,7 +2924,6 @@ class ConfigDictTest(BaseTest): ('ERROR', '4'), ], stream=output) - @unittest.skipUnless(threading, 'Threading required for this test.') def test_listen_config_1_ok(self): with support.captured_stdout() as output: self.setup_via_listener(textwrap.dedent(ConfigFileTest.config1)) @@ -2958,7 +2938,6 @@ class ConfigDictTest(BaseTest): # Original logger output is empty. self.assert_log_lines([]) - @unittest.skipUnless(threading, 'Threading required for this test.') def test_listen_verify(self): def verify_fail(stuff): @@ -3713,9 +3692,8 @@ class LogRecordTest(BaseTest): def test_optional(self): r = logging.makeLogRecord({}) NOT_NONE = self.assertIsNotNone - if threading: - NOT_NONE(r.thread) - NOT_NONE(r.threadName) + NOT_NONE(r.thread) + NOT_NONE(r.threadName) NOT_NONE(r.process) NOT_NONE(r.processName) log_threads = logging.logThreads |