1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
|
import asyncore
import socket
import threading
import smtpd
import smtplib
import StringIO
import sys
import time
import select
from unittest import TestCase
from test import test_support
HOST = "localhost"
PORT = 54328
def server(evt, buf):
serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serv.settimeout(3)
serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
serv.bind(("", PORT))
serv.listen(5)
try:
conn, addr = serv.accept()
except socket.timeout:
pass
else:
n = 200
while buf and n > 0:
r, w, e = select.select([], [conn], [])
if w:
sent = conn.send(buf)
buf = buf[sent:]
n -= 1
time.sleep(0.01)
conn.close()
finally:
serv.close()
evt.set()
class GeneralTests(TestCase):
def setUp(self):
self.evt = threading.Event()
servargs = (self.evt, "220 Hola mundo\n")
threading.Thread(target=server, args=servargs).start()
time.sleep(.1)
def tearDown(self):
self.evt.wait()
def testBasic1(self):
# connects
smtp = smtplib.SMTP(HOST, PORT)
smtp.sock.close()
def testBasic2(self):
# connects, include port in host name
smtp = smtplib.SMTP("%s:%s" % (HOST, PORT))
smtp.sock.close()
def testLocalHostName(self):
# check that supplied local_hostname is used
smtp = smtplib.SMTP(HOST, PORT, local_hostname="testhost")
self.assertEqual(smtp.local_hostname, "testhost")
smtp.sock.close()
def testNonnumericPort(self):
# check that non-numeric port raises ValueError
self.assertRaises(socket.error, smtplib.SMTP, "localhost", "bogus")
def testTimeoutDefault(self):
# default
smtp = smtplib.SMTP(HOST, PORT)
self.assertTrue(smtp.sock.gettimeout() is None)
smtp.sock.close()
def testTimeoutValue(self):
# a value
smtp = smtplib.SMTP(HOST, PORT, timeout=30)
self.assertEqual(smtp.sock.gettimeout(), 30)
smtp.sock.close()
def testTimeoutNone(self):
# None, having other default
previous = socket.getdefaulttimeout()
socket.setdefaulttimeout(30)
try:
smtp = smtplib.SMTP(HOST, PORT, timeout=None)
finally:
socket.setdefaulttimeout(previous)
self.assertEqual(smtp.sock.gettimeout(), 30)
smtp.sock.close()
# Test server using smtpd.DebuggingServer
def debugging_server(evt):
serv = smtpd.DebuggingServer(("", PORT), ('nowhere', -1))
try:
asyncore.loop(timeout=.01, count=300)
except socket.timeout:
pass
finally:
# allow some time for the client to read the result
time.sleep(0.5)
asyncore.close_all()
evt.set()
MSG_BEGIN = '---------- MESSAGE FOLLOWS ----------\n'
MSG_END = '------------ END MESSAGE ------------\n'
# Test behavior of smtpd.DebuggingServer
class DebuggingServerTests(TestCase):
def setUp(self):
self.old_stdout = sys.stdout
self.output = StringIO.StringIO()
sys.stdout = self.output
self.evt = threading.Event()
threading.Thread(target=debugging_server, args=(self.evt,)).start()
time.sleep(.5)
def tearDown(self):
self.evt.wait()
sys.stdout = self.old_stdout
def testBasic(self):
# connect
smtp = smtplib.SMTP(HOST, PORT)
smtp.sock.close()
def testEHLO(self):
smtp = smtplib.SMTP(HOST, PORT)
self.assertEqual(smtp.ehlo(), (502, 'Error: command "EHLO" not implemented'))
smtp.sock.close()
def testHELP(self):
smtp = smtplib.SMTP(HOST, PORT)
self.assertEqual(smtp.help(), 'Error: command "HELP" not implemented')
smtp.sock.close()
def testSend(self):
# connect and send mail
m = 'A test message'
smtp = smtplib.SMTP(HOST, PORT)
smtp.sendmail('John', 'Sally', m)
smtp.sock.close()
self.evt.wait()
self.output.flush()
mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END)
self.assertEqual(self.output.getvalue(), mexpect)
class BadHELOServerTests(TestCase):
def setUp(self):
self.old_stdout = sys.stdout
self.output = StringIO.StringIO()
sys.stdout = self.output
self.evt = threading.Event()
servargs = (self.evt, "199 no hello for you!\n")
threading.Thread(target=server, args=servargs).start()
time.sleep(.5)
def tearDown(self):
self.evt.wait()
sys.stdout = self.old_stdout
def testFailingHELO(self):
self.assertRaises(smtplib.SMTPConnectError, smtplib.SMTP, HOST, PORT)
def test_main(verbose=None):
test_support.run_unittest(GeneralTests, DebuggingServerTests, BadHELOServerTests)
if __name__ == '__main__':
test_main()
|