diff options
author | Vinay Sajip <vinay_sajip@yahoo.co.uk> | 2011-04-30 20:52:48 (GMT) |
---|---|---|
committer | Vinay Sajip <vinay_sajip@yahoo.co.uk> | 2011-04-30 20:52:48 (GMT) |
commit | a463d259304616e3e4051cdace0d7c5d5cc6075f (patch) | |
tree | 5ed1e174832b083c86ca286ddb68fb0ad9f728ba /Lib/test/test_logging.py | |
parent | de19e08de4a844f617c3990cf46d6bb8ccb9a8ed (diff) | |
download | cpython-a463d259304616e3e4051cdace0d7c5d5cc6075f.zip cpython-a463d259304616e3e4051cdace0d7c5d5cc6075f.tar.gz cpython-a463d259304616e3e4051cdace0d7c5d5cc6075f.tar.bz2 |
Improved test coverage.
Diffstat (limited to 'Lib/test/test_logging.py')
-rw-r--r-- | Lib/test/test_logging.py | 109 |
1 files changed, 105 insertions, 4 deletions
diff --git a/Lib/test/test_logging.py b/Lib/test/test_logging.py index 06dd6d2..54ed734 100644 --- a/Lib/test/test_logging.py +++ b/Lib/test/test_logging.py @@ -25,8 +25,11 @@ import logging import logging.handlers import logging.config +import asynchat +import asyncore import codecs import datetime +import errno import pickle import io import gc @@ -35,6 +38,7 @@ import os import queue import re import select +import smtpd import socket from socketserver import ThreadingTCPServer, StreamRequestHandler import struct @@ -547,9 +551,6 @@ class HandlerTest(BaseTest): h.close() except socket.error: # syslogd might not be available pass - h = logging.handlers.SMTPHandler('localhost', 'me', 'you', 'Log') - self.assertEqual(h.toaddrs, ['you']) - h.close() for method in ('GET', 'POST', 'PUT'): if method == 'PUT': self.assertRaises(ValueError, logging.handlers.HTTPHandler, @@ -595,6 +596,106 @@ class StreamHandlerTest(BaseTest): logging.raiseExceptions = old_raise sys.stderr = old_stderr + +TEST_SMTP_PORT = 9025 + + +class TestSMTPChannel(smtpd.SMTPChannel): + def __init__(self, server, conn, addr, sockmap): + asynchat.async_chat.__init__(self, conn, sockmap) + self.smtp_server = server + self.conn = conn + self.addr = addr + self.received_lines = [] + self.smtp_state = self.COMMAND + self.seen_greeting = '' + self.mailfrom = None + self.rcpttos = [] + self.received_data = '' + self.fqdn = socket.getfqdn() + self.num_bytes = 0 + try: + self.peer = conn.getpeername() + except socket.error as err: + # a race condition may occur if the other end is closing + # before we can get the peername + self.close() + if err.args[0] != errno.ENOTCONN: + raise + return + self.push('220 %s %s' % (self.fqdn, smtpd.__version__)) + self.set_terminator(b'\r\n') + + +class TestSMTPServer(smtpd.SMTPServer): + channel_class = TestSMTPChannel + + def __init__(self, addr, handler, poll_interval, sockmap): + self._localaddr = addr + self._remoteaddr = None + self.sockmap = sockmap + asyncore.dispatcher.__init__(self, map=sockmap) + try: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.setblocking(0) + self.set_socket(sock, map=sockmap) + # try to re-use a server port if possible + self.set_reuse_addr() + self.bind(addr) + self.listen(5) + except: + self.close() + raise + self._handler = handler + self._thread = None + self.poll_interval = poll_interval + + def handle_accepted(self, conn, addr): + print('Incoming connection from %s' % repr(addr), file=smtpd.DEBUGSTREAM) + channel = self.channel_class(self, conn, addr, self.sockmap) + + def process_message(self, peer, mailfrom, rcpttos, data): + self._handler(peer, mailfrom, rcpttos, data) + + def start(self): + self._thread = t = threading.Thread(target=self.serve_forever, + args=(self.poll_interval,)) + t.setDaemon(True) + t.start() + + def serve_forever(self, poll_interval): + asyncore.loop(poll_interval, map=self.sockmap) + + def stop(self): + self.close() + self._thread.join() + self._thread = None + + +class SMTPHandlerTest(BaseTest): + def test_basic(self): + addr = ('localhost', TEST_SMTP_PORT) + h = logging.handlers.SMTPHandler(addr, 'me', 'you', 'Log') + self.assertEqual(h.toaddrs, ['you']) + self.messages = [] + sockmap = {} + server = TestSMTPServer(addr, self.process_message, 0.001, sockmap) + server.start() + r = logging.makeLogRecord({'msg': 'Hello'}) + h.handle(r) + server.stop() + self.assertEqual(len(self.messages), 1) + peer, mailfrom, rcpttos, data = self.messages[0] + self.assertEqual(mailfrom, 'me') + self.assertEqual(rcpttos, ['you']) + self.assertTrue('\nSubject: Log\n' in data) + self.assertTrue(data.endswith('\n\nHello')) + h.close() + + def process_message(self, *args): + self.messages.append(args) + + class MemoryHandlerTest(BaseTest): """Tests for the MemoryHandler.""" @@ -3142,7 +3243,7 @@ def test_main(): FormatterTest, BufferingFormatterTest, StreamHandlerTest, LogRecordFactoryTest, ChildLoggerTest, QueueHandlerTest, ShutdownTest, ModuleLevelMiscTest, BasicConfigTest, - LoggerAdapterTest, LoggerTest, + LoggerAdapterTest, LoggerTest, SMTPHandlerTest, FileHandlerTest, RotatingFileHandlerTest, LastResortTest, LogRecordTest, ExceptionTest, TimedRotatingFileHandlerTest |