summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_smtplib.py
blob: df0bf4c1b009bf1d6cf0679bf2dd5a3c8025591f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
import asyncore
import socket
import threading
import smtpd
import smtplib
import StringIO
import sys
import time
import select

from unittest import TestCase
from test import test_support

HOST = "localhost"
PORT = 54328

def server(evt, buf):
    serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    serv.settimeout(3)
    serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    serv.bind(("", PORT))
    serv.listen(5)
    try:
        conn, addr = serv.accept()
    except socket.timeout:
        pass
    else:
        n = 200
        while buf and n > 0:
            r, w, e = select.select([], [conn], [])
            if w:
                sent = conn.send(buf)
                buf = buf[sent:]

            n -= 1
            time.sleep(0.01)

        conn.close()
    finally:
        serv.close()
        evt.set()

class GeneralTests(TestCase):

    def setUp(self):
        self.evt = threading.Event()
        servargs = (self.evt, "220 Hola mundo\n")
        threading.Thread(target=server, args=servargs).start()
        time.sleep(.1)

    def tearDown(self):
        self.evt.wait()

    def testBasic1(self):
        # connects
        smtp = smtplib.SMTP(HOST, PORT)
        smtp.sock.close()

    def testBasic2(self):
        # connects, include port in host name
        smtp = smtplib.SMTP("%s:%s" % (HOST, PORT))
        smtp.sock.close()

    def testLocalHostName(self):
        # check that supplied local_hostname is used
        smtp = smtplib.SMTP(HOST, PORT, local_hostname="testhost")
        self.assertEqual(smtp.local_hostname, "testhost")
        smtp.sock.close()

    def testNonnumericPort(self):
        # check that non-numeric port raises ValueError
        self.assertRaises(socket.error, smtplib.SMTP, "localhost", "bogus")

    def testTimeoutDefault(self):
        # default
        smtp = smtplib.SMTP(HOST, PORT)
        self.assertTrue(smtp.sock.gettimeout() is None)
        smtp.sock.close()

    def testTimeoutValue(self):
        # a value
        smtp = smtplib.SMTP(HOST, PORT, timeout=30)
        self.assertEqual(smtp.sock.gettimeout(), 30)
        smtp.sock.close()

    def testTimeoutNone(self):
        # None, having other default
        previous = socket.getdefaulttimeout()
        socket.setdefaulttimeout(30)
        try:
            smtp = smtplib.SMTP(HOST, PORT, timeout=None)
        finally:
            socket.setdefaulttimeout(previous)
        self.assertEqual(smtp.sock.gettimeout(), 30)
        smtp.sock.close()


# Test server using smtpd.DebuggingServer
def debugging_server(evt):
    serv = smtpd.DebuggingServer(("", PORT), ('nowhere', -1))

    try:
        asyncore.loop(timeout=.01, count=300)
    except socket.timeout:
        pass
    finally:
        # allow some time for the client to read the result
        time.sleep(0.5)
        asyncore.close_all()
        evt.set()

MSG_BEGIN = '---------- MESSAGE FOLLOWS ----------\n'
MSG_END = '------------ END MESSAGE ------------\n'

# Test behavior of smtpd.DebuggingServer
class DebuggingServerTests(TestCase):

    def setUp(self):
        self.old_stdout = sys.stdout
        self.output = StringIO.StringIO()
        sys.stdout = self.output

        self.evt = threading.Event()
        threading.Thread(target=debugging_server, args=(self.evt,)).start()
        time.sleep(.5)

    def tearDown(self):
        self.evt.wait()
        sys.stdout = self.old_stdout

    def testBasic(self):
        # connect
        smtp = smtplib.SMTP(HOST, PORT)
        smtp.sock.close()

    def testEHLO(self):
        smtp = smtplib.SMTP(HOST, PORT)
        self.assertEqual(smtp.ehlo(), (502, 'Error: command "EHLO" not implemented'))
        smtp.sock.close()

    def testHELP(self):
        smtp = smtplib.SMTP(HOST, PORT)
        self.assertEqual(smtp.help(), 'Error: command "HELP" not implemented')
        smtp.sock.close()

    def testSend(self):
        # connect and send mail
        m = 'A test message'
        smtp = smtplib.SMTP(HOST, PORT)
        smtp.sendmail('John', 'Sally', m)
        smtp.sock.close()

        self.evt.wait()
        self.output.flush()
        mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END)
        self.assertEqual(self.output.getvalue(), mexpect)


class BadHELOServerTests(TestCase):

    def setUp(self):
        self.old_stdout = sys.stdout
        self.output = StringIO.StringIO()
        sys.stdout = self.output

        self.evt = threading.Event()
        servargs = (self.evt, "199 no hello for you!\n")
        threading.Thread(target=server, args=servargs).start()
        time.sleep(.5)

    def tearDown(self):
        self.evt.wait()
        sys.stdout = self.old_stdout

    def testFailingHELO(self):
        self.assertRaises(smtplib.SMTPConnectError, smtplib.SMTP, HOST, PORT)

def test_main(verbose=None):
    test_support.run_unittest(GeneralTests, DebuggingServerTests, BadHELOServerTests)

if __name__ == '__main__':
    test_main()