summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_logging.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_logging.py')
-rw-r--r--Lib/test/test_logging.py534
1 files changed, 256 insertions, 278 deletions
diff --git a/Lib/test/test_logging.py b/Lib/test/test_logging.py
index f4aef9f..9c3816a 100644
--- a/Lib/test/test_logging.py
+++ b/Lib/test/test_logging.py
@@ -42,22 +42,19 @@ import tempfile
from test.support.script_helper import assert_python_ok
from test import support
import textwrap
+import threading
import time
import unittest
import warnings
import weakref
-try:
- import threading
- # The following imports are needed only for tests which
- # require threading
- import asyncore
- from http.server import HTTPServer, BaseHTTPRequestHandler
- import smtpd
- from urllib.parse import urlparse, parse_qs
- from socketserver import (ThreadingUDPServer, DatagramRequestHandler,
- ThreadingTCPServer, StreamRequestHandler)
-except ImportError:
- threading = None
+
+import asyncore
+from http.server import HTTPServer, BaseHTTPRequestHandler
+import smtpd
+from urllib.parse import urlparse, parse_qs
+from socketserver import (ThreadingUDPServer, DatagramRequestHandler,
+ ThreadingTCPServer, StreamRequestHandler)
+
try:
import win32evtlog, win32evtlogutil, pywintypes
except ImportError:
@@ -625,7 +622,6 @@ class HandlerTest(BaseTest):
os.unlink(fn)
@unittest.skipIf(os.name == 'nt', 'WatchedFileHandler not appropriate for Windows.')
- @unittest.skipUnless(threading, 'Threading required for this test.')
def test_race(self):
# Issue #14632 refers.
def remove_loop(fname, tries):
@@ -719,276 +715,274 @@ class StreamHandlerTest(BaseTest):
# -- The following section could be moved into a server_helper.py module
# -- if it proves to be of wider utility than just test_logging
-if threading:
- class TestSMTPServer(smtpd.SMTPServer):
+class TestSMTPServer(smtpd.SMTPServer):
+ """
+ This class implements a test SMTP server.
+
+ :param addr: A (host, port) tuple which the server listens on.
+ You can specify a port value of zero: the server's
+ *port* attribute will hold the actual port number
+ used, which can be used in client connections.
+ :param handler: A callable which will be called to process
+ incoming messages. The handler will be passed
+ the client address tuple, who the message is from,
+ a list of recipients and the message data.
+ :param poll_interval: The interval, in seconds, used in the underlying
+ :func:`select` or :func:`poll` call by
+ :func:`asyncore.loop`.
+ :param sockmap: A dictionary which will be used to hold
+ :class:`asyncore.dispatcher` instances used by
+ :func:`asyncore.loop`. This avoids changing the
+ :mod:`asyncore` module's global state.
+ """
+
+ def __init__(self, addr, handler, poll_interval, sockmap):
+ smtpd.SMTPServer.__init__(self, addr, None, map=sockmap,
+ decode_data=True)
+ self.port = self.socket.getsockname()[1]
+ self._handler = handler
+ self._thread = None
+ self.poll_interval = poll_interval
+
+ def process_message(self, peer, mailfrom, rcpttos, data):
+ """
+ Delegates to the handler passed in to the server's constructor.
+
+ Typically, this will be a test case method.
+ :param peer: The client (host, port) tuple.
+ :param mailfrom: The address of the sender.
+ :param rcpttos: The addresses of the recipients.
+ :param data: The message.
"""
- This class implements a test SMTP server.
-
- :param addr: A (host, port) tuple which the server listens on.
- You can specify a port value of zero: the server's
- *port* attribute will hold the actual port number
- used, which can be used in client connections.
- :param handler: A callable which will be called to process
- incoming messages. The handler will be passed
- the client address tuple, who the message is from,
- a list of recipients and the message data.
+ self._handler(peer, mailfrom, rcpttos, data)
+
+ def start(self):
+ """
+ Start the server running on a separate daemon thread.
+ """
+ self._thread = t = threading.Thread(target=self.serve_forever,
+ args=(self.poll_interval,))
+ t.setDaemon(True)
+ t.start()
+
+ def serve_forever(self, poll_interval):
+ """
+ Run the :mod:`asyncore` loop until normal termination
+ conditions arise.
:param poll_interval: The interval, in seconds, used in the underlying
:func:`select` or :func:`poll` call by
:func:`asyncore.loop`.
- :param sockmap: A dictionary which will be used to hold
- :class:`asyncore.dispatcher` instances used by
- :func:`asyncore.loop`. This avoids changing the
- :mod:`asyncore` module's global state.
"""
+ try:
+ asyncore.loop(poll_interval, map=self._map)
+ except OSError:
+ # On FreeBSD 8, closing the server repeatably
+ # raises this error. We swallow it if the
+ # server has been closed.
+ if self.connected or self.accepting:
+ raise
- def __init__(self, addr, handler, poll_interval, sockmap):
- smtpd.SMTPServer.__init__(self, addr, None, map=sockmap,
- decode_data=True)
- self.port = self.socket.getsockname()[1]
- self._handler = handler
- self._thread = None
- self.poll_interval = poll_interval
+ def stop(self, timeout=None):
+ """
+ Stop the thread by closing the server instance.
+ Wait for the server thread to terminate.
- def process_message(self, peer, mailfrom, rcpttos, data):
- """
- Delegates to the handler passed in to the server's constructor.
+ :param timeout: How long to wait for the server thread
+ to terminate.
+ """
+ self.close()
+ self._thread.join(timeout)
+ asyncore.close_all(map=self._map, ignore_all=True)
- Typically, this will be a test case method.
- :param peer: The client (host, port) tuple.
- :param mailfrom: The address of the sender.
- :param rcpttos: The addresses of the recipients.
- :param data: The message.
- """
- self._handler(peer, mailfrom, rcpttos, data)
+ alive = self._thread.is_alive()
+ self._thread = None
+ if alive:
+ self.fail("join() timed out")
- def start(self):
- """
- Start the server running on a separate daemon thread.
- """
- self._thread = t = threading.Thread(target=self.serve_forever,
- args=(self.poll_interval,))
- t.setDaemon(True)
- t.start()
+class ControlMixin(object):
+ """
+ This mixin is used to start a server on a separate thread, and
+ shut it down programmatically. Request handling is simplified - instead
+ of needing to derive a suitable RequestHandler subclass, you just
+ provide a callable which will be passed each received request to be
+ processed.
+
+ :param handler: A handler callable which will be called with a
+ single parameter - the request - in order to
+ process the request. This handler is called on the
+ server thread, effectively meaning that requests are
+ processed serially. While not quite Web scale ;-),
+ this should be fine for testing applications.
+ :param poll_interval: The polling interval in seconds.
+ """
+ def __init__(self, handler, poll_interval):
+ self._thread = None
+ self.poll_interval = poll_interval
+ self._handler = handler
+ self.ready = threading.Event()
- def serve_forever(self, poll_interval):
- """
- Run the :mod:`asyncore` loop until normal termination
- conditions arise.
- :param poll_interval: The interval, in seconds, used in the underlying
- :func:`select` or :func:`poll` call by
- :func:`asyncore.loop`.
- """
- try:
- asyncore.loop(poll_interval, map=self._map)
- except OSError:
- # On FreeBSD 8, closing the server repeatably
- # raises this error. We swallow it if the
- # server has been closed.
- if self.connected or self.accepting:
- raise
-
- def stop(self, timeout=None):
- """
- Stop the thread by closing the server instance.
- Wait for the server thread to terminate.
+ def start(self):
+ """
+ Create a daemon thread to run the server, and start it.
+ """
+ self._thread = t = threading.Thread(target=self.serve_forever,
+ args=(self.poll_interval,))
+ t.setDaemon(True)
+ t.start()
- :param timeout: How long to wait for the server thread
- to terminate.
- """
- self.close()
- self._thread.join(timeout)
- asyncore.close_all(map=self._map, ignore_all=True)
+ def serve_forever(self, poll_interval):
+ """
+ Run the server. Set the ready flag before entering the
+ service loop.
+ """
+ self.ready.set()
+ super(ControlMixin, self).serve_forever(poll_interval)
+ def stop(self, timeout=None):
+ """
+ Tell the server thread to stop, and wait for it to do so.
+
+ :param timeout: How long to wait for the server thread
+ to terminate.
+ """
+ self.shutdown()
+ if self._thread is not None:
+ self._thread.join(timeout)
alive = self._thread.is_alive()
self._thread = None
if alive:
self.fail("join() timed out")
+ self.server_close()
+ self.ready.clear()
- class ControlMixin(object):
- """
- This mixin is used to start a server on a separate thread, and
- shut it down programmatically. Request handling is simplified - instead
- of needing to derive a suitable RequestHandler subclass, you just
- provide a callable which will be passed each received request to be
- processed.
-
- :param handler: A handler callable which will be called with a
- single parameter - the request - in order to
- process the request. This handler is called on the
- server thread, effectively meaning that requests are
- processed serially. While not quite Web scale ;-),
- this should be fine for testing applications.
- :param poll_interval: The polling interval in seconds.
- """
- def __init__(self, handler, poll_interval):
- self._thread = None
- self.poll_interval = poll_interval
- self._handler = handler
- self.ready = threading.Event()
+class TestHTTPServer(ControlMixin, HTTPServer):
+ """
+ An HTTP server which is controllable using :class:`ControlMixin`.
+
+ :param addr: A tuple with the IP address and port to listen on.
+ :param handler: A handler callable which will be called with a
+ single parameter - the request - in order to
+ process the request.
+ :param poll_interval: The polling interval in seconds.
+ :param log: Pass ``True`` to enable log messages.
+ """
+ def __init__(self, addr, handler, poll_interval=0.5,
+ log=False, sslctx=None):
+ class DelegatingHTTPRequestHandler(BaseHTTPRequestHandler):
+ def __getattr__(self, name, default=None):
+ if name.startswith('do_'):
+ return self.process_request
+ raise AttributeError(name)
+
+ def process_request(self):
+ self.server._handler(self)
+
+ def log_message(self, format, *args):
+ if log:
+ super(DelegatingHTTPRequestHandler,
+ self).log_message(format, *args)
+ HTTPServer.__init__(self, addr, DelegatingHTTPRequestHandler)
+ ControlMixin.__init__(self, handler, poll_interval)
+ self.sslctx = sslctx
+
+ def get_request(self):
+ try:
+ sock, addr = self.socket.accept()
+ if self.sslctx:
+ sock = self.sslctx.wrap_socket(sock, server_side=True)
+ except OSError as e:
+ # socket errors are silenced by the caller, print them here
+ sys.stderr.write("Got an error:\n%s\n" % e)
+ raise
+ return sock, addr
- def start(self):
- """
- Create a daemon thread to run the server, and start it.
- """
- self._thread = t = threading.Thread(target=self.serve_forever,
- args=(self.poll_interval,))
- t.setDaemon(True)
- t.start()
+class TestTCPServer(ControlMixin, ThreadingTCPServer):
+ """
+ A TCP server which is controllable using :class:`ControlMixin`.
+
+ :param addr: A tuple with the IP address and port to listen on.
+ :param handler: A handler callable which will be called with a single
+ parameter - the request - in order to process the request.
+ :param poll_interval: The polling interval in seconds.
+ :bind_and_activate: If True (the default), binds the server and starts it
+ listening. If False, you need to call
+ :meth:`server_bind` and :meth:`server_activate` at
+ some later time before calling :meth:`start`, so that
+ the server will set up the socket and listen on it.
+ """
- def serve_forever(self, poll_interval):
- """
- Run the server. Set the ready flag before entering the
- service loop.
- """
- self.ready.set()
- super(ControlMixin, self).serve_forever(poll_interval)
+ allow_reuse_address = True
- def stop(self, timeout=None):
- """
- Tell the server thread to stop, and wait for it to do so.
+ def __init__(self, addr, handler, poll_interval=0.5,
+ bind_and_activate=True):
+ class DelegatingTCPRequestHandler(StreamRequestHandler):
- :param timeout: How long to wait for the server thread
- to terminate.
- """
- self.shutdown()
- if self._thread is not None:
- self._thread.join(timeout)
- alive = self._thread.is_alive()
- self._thread = None
- if alive:
- self.fail("join() timed out")
- self.server_close()
- self.ready.clear()
-
- class TestHTTPServer(ControlMixin, HTTPServer):
- """
- An HTTP server which is controllable using :class:`ControlMixin`.
-
- :param addr: A tuple with the IP address and port to listen on.
- :param handler: A handler callable which will be called with a
- single parameter - the request - in order to
- process the request.
- :param poll_interval: The polling interval in seconds.
- :param log: Pass ``True`` to enable log messages.
- """
- def __init__(self, addr, handler, poll_interval=0.5,
- log=False, sslctx=None):
- class DelegatingHTTPRequestHandler(BaseHTTPRequestHandler):
- def __getattr__(self, name, default=None):
- if name.startswith('do_'):
- return self.process_request
- raise AttributeError(name)
-
- def process_request(self):
- self.server._handler(self)
-
- def log_message(self, format, *args):
- if log:
- super(DelegatingHTTPRequestHandler,
- self).log_message(format, *args)
- HTTPServer.__init__(self, addr, DelegatingHTTPRequestHandler)
- ControlMixin.__init__(self, handler, poll_interval)
- self.sslctx = sslctx
-
- def get_request(self):
- try:
- sock, addr = self.socket.accept()
- if self.sslctx:
- sock = self.sslctx.wrap_socket(sock, server_side=True)
- except OSError as e:
- # socket errors are silenced by the caller, print them here
- sys.stderr.write("Got an error:\n%s\n" % e)
- raise
- return sock, addr
+ def handle(self):
+ self.server._handler(self)
+ ThreadingTCPServer.__init__(self, addr, DelegatingTCPRequestHandler,
+ bind_and_activate)
+ ControlMixin.__init__(self, handler, poll_interval)
- class TestTCPServer(ControlMixin, ThreadingTCPServer):
- """
- A TCP server which is controllable using :class:`ControlMixin`.
-
- :param addr: A tuple with the IP address and port to listen on.
- :param handler: A handler callable which will be called with a single
- parameter - the request - in order to process the request.
- :param poll_interval: The polling interval in seconds.
- :bind_and_activate: If True (the default), binds the server and starts it
- listening. If False, you need to call
- :meth:`server_bind` and :meth:`server_activate` at
- some later time before calling :meth:`start`, so that
- the server will set up the socket and listen on it.
- """
+ def server_bind(self):
+ super(TestTCPServer, self).server_bind()
+ self.port = self.socket.getsockname()[1]
- allow_reuse_address = True
+class TestUDPServer(ControlMixin, ThreadingUDPServer):
+ """
+ A UDP server which is controllable using :class:`ControlMixin`.
+
+ :param addr: A tuple with the IP address and port to listen on.
+ :param handler: A handler callable which will be called with a
+ single parameter - the request - in order to
+ process the request.
+ :param poll_interval: The polling interval for shutdown requests,
+ in seconds.
+ :bind_and_activate: If True (the default), binds the server and
+ starts it listening. If False, you need to
+ call :meth:`server_bind` and
+ :meth:`server_activate` at some later time
+ before calling :meth:`start`, so that the server will
+ set up the socket and listen on it.
+ """
+ def __init__(self, addr, handler, poll_interval=0.5,
+ bind_and_activate=True):
+ class DelegatingUDPRequestHandler(DatagramRequestHandler):
- def __init__(self, addr, handler, poll_interval=0.5,
- bind_and_activate=True):
- class DelegatingTCPRequestHandler(StreamRequestHandler):
+ def handle(self):
+ self.server._handler(self)
- def handle(self):
- self.server._handler(self)
- ThreadingTCPServer.__init__(self, addr, DelegatingTCPRequestHandler,
- bind_and_activate)
- ControlMixin.__init__(self, handler, poll_interval)
+ def finish(self):
+ data = self.wfile.getvalue()
+ if data:
+ try:
+ super(DelegatingUDPRequestHandler, self).finish()
+ except OSError:
+ if not self.server._closed:
+ raise
- def server_bind(self):
- super(TestTCPServer, self).server_bind()
- self.port = self.socket.getsockname()[1]
+ ThreadingUDPServer.__init__(self, addr,
+ DelegatingUDPRequestHandler,
+ bind_and_activate)
+ ControlMixin.__init__(self, handler, poll_interval)
+ self._closed = False
- class TestUDPServer(ControlMixin, ThreadingUDPServer):
- """
- A UDP server which is controllable using :class:`ControlMixin`.
-
- :param addr: A tuple with the IP address and port to listen on.
- :param handler: A handler callable which will be called with a
- single parameter - the request - in order to
- process the request.
- :param poll_interval: The polling interval for shutdown requests,
- in seconds.
- :bind_and_activate: If True (the default), binds the server and
- starts it listening. If False, you need to
- call :meth:`server_bind` and
- :meth:`server_activate` at some later time
- before calling :meth:`start`, so that the server will
- set up the socket and listen on it.
- """
- def __init__(self, addr, handler, poll_interval=0.5,
- bind_and_activate=True):
- class DelegatingUDPRequestHandler(DatagramRequestHandler):
-
- def handle(self):
- self.server._handler(self)
-
- def finish(self):
- data = self.wfile.getvalue()
- if data:
- try:
- super(DelegatingUDPRequestHandler, self).finish()
- except OSError:
- if not self.server._closed:
- raise
-
- ThreadingUDPServer.__init__(self, addr,
- DelegatingUDPRequestHandler,
- bind_and_activate)
- ControlMixin.__init__(self, handler, poll_interval)
- self._closed = False
-
- def server_bind(self):
- super(TestUDPServer, self).server_bind()
- self.port = self.socket.getsockname()[1]
-
- def server_close(self):
- super(TestUDPServer, self).server_close()
- self._closed = True
+ def server_bind(self):
+ super(TestUDPServer, self).server_bind()
+ self.port = self.socket.getsockname()[1]
- if hasattr(socket, "AF_UNIX"):
- class TestUnixStreamServer(TestTCPServer):
- address_family = socket.AF_UNIX
+ def server_close(self):
+ super(TestUDPServer, self).server_close()
+ self._closed = True
- class TestUnixDatagramServer(TestUDPServer):
- address_family = socket.AF_UNIX
+if hasattr(socket, "AF_UNIX"):
+ class TestUnixStreamServer(TestTCPServer):
+ address_family = socket.AF_UNIX
+
+ class TestUnixDatagramServer(TestUDPServer):
+ address_family = socket.AF_UNIX
# - end of server_helper section
-@unittest.skipUnless(threading, 'Threading required for this test.')
class SMTPHandlerTest(BaseTest):
TIMEOUT = 8.0
@@ -1472,14 +1466,12 @@ class ConfigFileTest(BaseTest):
@unittest.skipIf(True, "FIXME: bpo-30830")
-@unittest.skipUnless(threading, 'Threading required for this test.')
class SocketHandlerTest(BaseTest):
"""Test for SocketHandler objects."""
- if threading:
- server_class = TestTCPServer
- address = ('localhost', 0)
+ server_class = TestTCPServer
+ address = ('localhost', 0)
def setUp(self):
"""Set up a TCP server to receive log messages, and a SocketHandler
@@ -1573,12 +1565,11 @@ def _get_temp_domain_socket():
@unittest.skipIf(True, "FIXME: bpo-30830")
@unittest.skipUnless(hasattr(socket, "AF_UNIX"), "Unix sockets required")
-@unittest.skipUnless(threading, 'Threading required for this test.')
class UnixSocketHandlerTest(SocketHandlerTest):
"""Test for SocketHandler with unix sockets."""
- if threading and hasattr(socket, "AF_UNIX"):
+ if hasattr(socket, "AF_UNIX"):
server_class = TestUnixStreamServer
def setUp(self):
@@ -1591,14 +1582,12 @@ class UnixSocketHandlerTest(SocketHandlerTest):
support.unlink(self.address)
@unittest.skipIf(True, "FIXME: bpo-30830")
-@unittest.skipUnless(threading, 'Threading required for this test.')
class DatagramHandlerTest(BaseTest):
"""Test for DatagramHandler."""
- if threading:
- server_class = TestUDPServer
- address = ('localhost', 0)
+ server_class = TestUDPServer
+ address = ('localhost', 0)
def setUp(self):
"""Set up a UDP server to receive log messages, and a DatagramHandler
@@ -1659,12 +1648,11 @@ class DatagramHandlerTest(BaseTest):
@unittest.skipIf(True, "FIXME: bpo-30830")
@unittest.skipUnless(hasattr(socket, "AF_UNIX"), "Unix sockets required")
-@unittest.skipUnless(threading, 'Threading required for this test.')
class UnixDatagramHandlerTest(DatagramHandlerTest):
"""Test for DatagramHandler using Unix sockets."""
- if threading and hasattr(socket, "AF_UNIX"):
+ if hasattr(socket, "AF_UNIX"):
server_class = TestUnixDatagramServer
def setUp(self):
@@ -1676,14 +1664,12 @@ class UnixDatagramHandlerTest(DatagramHandlerTest):
DatagramHandlerTest.tearDown(self)
support.unlink(self.address)
-@unittest.skipUnless(threading, 'Threading required for this test.')
class SysLogHandlerTest(BaseTest):
"""Test for SysLogHandler using UDP."""
- if threading:
- server_class = TestUDPServer
- address = ('localhost', 0)
+ server_class = TestUDPServer
+ address = ('localhost', 0)
def setUp(self):
"""Set up a UDP server to receive log messages, and a SysLogHandler
@@ -1747,12 +1733,11 @@ class SysLogHandlerTest(BaseTest):
@unittest.skipIf(True, "FIXME: bpo-30830")
@unittest.skipUnless(hasattr(socket, "AF_UNIX"), "Unix sockets required")
-@unittest.skipUnless(threading, 'Threading required for this test.')
class UnixSysLogHandlerTest(SysLogHandlerTest):
"""Test for SysLogHandler with Unix sockets."""
- if threading and hasattr(socket, "AF_UNIX"):
+ if hasattr(socket, "AF_UNIX"):
server_class = TestUnixDatagramServer
def setUp(self):
@@ -1767,7 +1752,6 @@ class UnixSysLogHandlerTest(SysLogHandlerTest):
@unittest.skipIf(True, "FIXME: bpo-30830")
@unittest.skipUnless(support.IPV6_ENABLED,
'IPv6 support required for this test.')
-@unittest.skipUnless(threading, 'Threading required for this test.')
class IPv6SysLogHandlerTest(SysLogHandlerTest):
"""Test for SysLogHandler with IPv6 host."""
@@ -1783,7 +1767,6 @@ class IPv6SysLogHandlerTest(SysLogHandlerTest):
self.server_class.address_family = socket.AF_INET
super(IPv6SysLogHandlerTest, self).tearDown()
-@unittest.skipUnless(threading, 'Threading required for this test.')
class HTTPHandlerTest(BaseTest):
"""Test for HTTPHandler."""
@@ -2892,7 +2875,6 @@ class ConfigDictTest(BaseTest):
# listen() uses ConfigSocketReceiver which is based
# on socketserver.ThreadingTCPServer
@unittest.skipIf(True, "FIXME: bpo-30830")
- @unittest.skipUnless(threading, 'listen() needs threading to work')
def setup_via_listener(self, text, verify=None):
text = text.encode("utf-8")
# Ask for a randomly assigned port (by using port 0)
@@ -2923,7 +2905,6 @@ class ConfigDictTest(BaseTest):
if t.is_alive():
self.fail("join() timed out")
- @unittest.skipUnless(threading, 'Threading required for this test.')
def test_listen_config_10_ok(self):
with support.captured_stdout() as output:
self.setup_via_listener(json.dumps(self.config10))
@@ -2943,7 +2924,6 @@ class ConfigDictTest(BaseTest):
('ERROR', '4'),
], stream=output)
- @unittest.skipUnless(threading, 'Threading required for this test.')
def test_listen_config_1_ok(self):
with support.captured_stdout() as output:
self.setup_via_listener(textwrap.dedent(ConfigFileTest.config1))
@@ -2958,7 +2938,6 @@ class ConfigDictTest(BaseTest):
# Original logger output is empty.
self.assert_log_lines([])
- @unittest.skipUnless(threading, 'Threading required for this test.')
def test_listen_verify(self):
def verify_fail(stuff):
@@ -3713,9 +3692,8 @@ class LogRecordTest(BaseTest):
def test_optional(self):
r = logging.makeLogRecord({})
NOT_NONE = self.assertIsNotNone
- if threading:
- NOT_NONE(r.thread)
- NOT_NONE(r.threadName)
+ NOT_NONE(r.thread)
+ NOT_NONE(r.threadName)
NOT_NONE(r.process)
NOT_NONE(r.processName)
log_threads = logging.logThreads