summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_logging.py
diff options
context:
space:
mode:
authorVinay Sajip <vinay_sajip@yahoo.co.uk>2011-04-30 20:52:48 (GMT)
committerVinay Sajip <vinay_sajip@yahoo.co.uk>2011-04-30 20:52:48 (GMT)
commita463d259304616e3e4051cdace0d7c5d5cc6075f (patch)
tree5ed1e174832b083c86ca286ddb68fb0ad9f728ba /Lib/test/test_logging.py
parentde19e08de4a844f617c3990cf46d6bb8ccb9a8ed (diff)
downloadcpython-a463d259304616e3e4051cdace0d7c5d5cc6075f.zip
cpython-a463d259304616e3e4051cdace0d7c5d5cc6075f.tar.gz
cpython-a463d259304616e3e4051cdace0d7c5d5cc6075f.tar.bz2
Improved test coverage.
Diffstat (limited to 'Lib/test/test_logging.py')
-rw-r--r--Lib/test/test_logging.py109
1 files changed, 105 insertions, 4 deletions
diff --git a/Lib/test/test_logging.py b/Lib/test/test_logging.py
index 06dd6d2..54ed734 100644
--- a/Lib/test/test_logging.py
+++ b/Lib/test/test_logging.py
@@ -25,8 +25,11 @@ import logging
import logging.handlers
import logging.config
+import asynchat
+import asyncore
import codecs
import datetime
+import errno
import pickle
import io
import gc
@@ -35,6 +38,7 @@ import os
import queue
import re
import select
+import smtpd
import socket
from socketserver import ThreadingTCPServer, StreamRequestHandler
import struct
@@ -547,9 +551,6 @@ class HandlerTest(BaseTest):
h.close()
except socket.error: # syslogd might not be available
pass
- h = logging.handlers.SMTPHandler('localhost', 'me', 'you', 'Log')
- self.assertEqual(h.toaddrs, ['you'])
- h.close()
for method in ('GET', 'POST', 'PUT'):
if method == 'PUT':
self.assertRaises(ValueError, logging.handlers.HTTPHandler,
@@ -595,6 +596,106 @@ class StreamHandlerTest(BaseTest):
logging.raiseExceptions = old_raise
sys.stderr = old_stderr
+
+TEST_SMTP_PORT = 9025
+
+
+class TestSMTPChannel(smtpd.SMTPChannel):
+ def __init__(self, server, conn, addr, sockmap):
+ asynchat.async_chat.__init__(self, conn, sockmap)
+ self.smtp_server = server
+ self.conn = conn
+ self.addr = addr
+ self.received_lines = []
+ self.smtp_state = self.COMMAND
+ self.seen_greeting = ''
+ self.mailfrom = None
+ self.rcpttos = []
+ self.received_data = ''
+ self.fqdn = socket.getfqdn()
+ self.num_bytes = 0
+ try:
+ self.peer = conn.getpeername()
+ except socket.error as err:
+ # a race condition may occur if the other end is closing
+ # before we can get the peername
+ self.close()
+ if err.args[0] != errno.ENOTCONN:
+ raise
+ return
+ self.push('220 %s %s' % (self.fqdn, smtpd.__version__))
+ self.set_terminator(b'\r\n')
+
+
+class TestSMTPServer(smtpd.SMTPServer):
+ channel_class = TestSMTPChannel
+
+ def __init__(self, addr, handler, poll_interval, sockmap):
+ self._localaddr = addr
+ self._remoteaddr = None
+ self.sockmap = sockmap
+ asyncore.dispatcher.__init__(self, map=sockmap)
+ try:
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.setblocking(0)
+ self.set_socket(sock, map=sockmap)
+ # try to re-use a server port if possible
+ self.set_reuse_addr()
+ self.bind(addr)
+ self.listen(5)
+ except:
+ self.close()
+ raise
+ self._handler = handler
+ self._thread = None
+ self.poll_interval = poll_interval
+
+ def handle_accepted(self, conn, addr):
+ print('Incoming connection from %s' % repr(addr), file=smtpd.DEBUGSTREAM)
+ channel = self.channel_class(self, conn, addr, self.sockmap)
+
+ def process_message(self, peer, mailfrom, rcpttos, data):
+ self._handler(peer, mailfrom, rcpttos, data)
+
+ def start(self):
+ self._thread = t = threading.Thread(target=self.serve_forever,
+ args=(self.poll_interval,))
+ t.setDaemon(True)
+ t.start()
+
+ def serve_forever(self, poll_interval):
+ asyncore.loop(poll_interval, map=self.sockmap)
+
+ def stop(self):
+ self.close()
+ self._thread.join()
+ self._thread = None
+
+
+class SMTPHandlerTest(BaseTest):
+ def test_basic(self):
+ addr = ('localhost', TEST_SMTP_PORT)
+ h = logging.handlers.SMTPHandler(addr, 'me', 'you', 'Log')
+ self.assertEqual(h.toaddrs, ['you'])
+ self.messages = []
+ sockmap = {}
+ server = TestSMTPServer(addr, self.process_message, 0.001, sockmap)
+ server.start()
+ r = logging.makeLogRecord({'msg': 'Hello'})
+ h.handle(r)
+ server.stop()
+ self.assertEqual(len(self.messages), 1)
+ peer, mailfrom, rcpttos, data = self.messages[0]
+ self.assertEqual(mailfrom, 'me')
+ self.assertEqual(rcpttos, ['you'])
+ self.assertTrue('\nSubject: Log\n' in data)
+ self.assertTrue(data.endswith('\n\nHello'))
+ h.close()
+
+ def process_message(self, *args):
+ self.messages.append(args)
+
+
class MemoryHandlerTest(BaseTest):
"""Tests for the MemoryHandler."""
@@ -3142,7 +3243,7 @@ def test_main():
FormatterTest, BufferingFormatterTest, StreamHandlerTest,
LogRecordFactoryTest, ChildLoggerTest, QueueHandlerTest,
ShutdownTest, ModuleLevelMiscTest, BasicConfigTest,
- LoggerAdapterTest, LoggerTest,
+ LoggerAdapterTest, LoggerTest, SMTPHandlerTest,
FileHandlerTest, RotatingFileHandlerTest,
LastResortTest, LogRecordTest, ExceptionTest,
TimedRotatingFileHandlerTest