1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
|
import httplib
import io
import sys
import socket
from unittest import TestCase
from test import test_support
class FakeSocket:
def __init__(self, text, fileclass=io.BytesIO):
if isinstance(text, str):
text = text.encode("ascii")
self.text = text
self.fileclass = fileclass
self.data = b''
def sendall(self, data):
self.data += data
def makefile(self, mode, bufsize=None):
if mode != 'r' and mode != 'rb':
raise httplib.UnimplementedFileMode()
return self.fileclass(self.text)
class NoEOFStringIO(io.BytesIO):
"""Like StringIO, but raises AssertionError on EOF.
This is used below to test that httplib doesn't try to read
more from the underlying file than it should.
"""
def read(self, n=-1):
data = io.BytesIO.read(self, n)
if data == b'':
raise AssertionError('caller tried to read past EOF')
return data
def readline(self, length=None):
data = io.BytesIO.readline(self, length)
if data == b'':
raise AssertionError('caller tried to read past EOF')
return data
class HeaderTests(TestCase):
def test_auto_headers(self):
# Some headers are added automatically, but should not be added by
# .request() if they are explicitly set.
import httplib
class HeaderCountingBuffer(list):
def __init__(self):
self.count = {}
def append(self, item):
kv = item.split(b':')
if len(kv) > 1:
# item is a 'Key: Value' header string
lcKey = kv[0].decode('ascii').lower()
self.count.setdefault(lcKey, 0)
self.count[lcKey] += 1
list.append(self, item)
for explicit_header in True, False:
for header in 'Content-length', 'Host', 'Accept-encoding':
conn = httplib.HTTPConnection('example.com')
conn.sock = FakeSocket('blahblahblah')
conn._buffer = HeaderCountingBuffer()
body = 'spamspamspam'
headers = {}
if explicit_header:
headers[header] = str(len(body))
conn.request('POST', '/', body, headers)
self.assertEqual(conn._buffer.count[header.lower()], 1)
class BasicTest(TestCase):
def test_status_lines(self):
# Test HTTP status lines
body = "HTTP/1.1 200 Ok\r\n\r\nText"
sock = FakeSocket(body)
resp = httplib.HTTPResponse(sock)
resp.begin()
self.assertEqual(resp.read(), b"Text")
resp.close()
body = "HTTP/1.1 400.100 Not Ok\r\n\r\nText"
sock = FakeSocket(body)
resp = httplib.HTTPResponse(sock)
self.assertRaises(httplib.BadStatusLine, resp.begin)
def test_host_port(self):
# Check invalid host_port
for hp in ("www.python.org:abc", "www.python.org:"):
self.assertRaises(httplib.InvalidURL, httplib.HTTPConnection, hp)
for hp, h, p in (("[fe80::207:e9ff:fe9b]:8000",
"fe80::207:e9ff:fe9b", 8000),
("www.python.org:80", "www.python.org", 80),
("www.python.org", "www.python.org", 80),
("[fe80::207:e9ff:fe9b]", "fe80::207:e9ff:fe9b", 80)):
c = httplib.HTTPConnection(hp)
self.assertEqual(h, c.host)
self.assertEqual(p, c.port)
def test_response_headers(self):
# test response with multiple message headers with the same field name.
text = ('HTTP/1.1 200 OK\r\n'
'Set-Cookie: Customer="WILE_E_COYOTE"; '
'Version="1"; Path="/acme"\r\n'
'Set-Cookie: Part_Number="Rocket_Launcher_0001"; Version="1";'
' Path="/acme"\r\n'
'\r\n'
'No body\r\n')
hdr = ('Customer="WILE_E_COYOTE"; Version="1"; Path="/acme"'
', '
'Part_Number="Rocket_Launcher_0001"; Version="1"; Path="/acme"')
s = FakeSocket(text)
r = httplib.HTTPResponse(s)
r.begin()
cookies = r.getheader("Set-Cookie")
self.assertEqual(cookies, hdr)
def test_read_head(self):
# Test that the library doesn't attempt to read any data
# from a HEAD request. (Tickles SF bug #622042.)
sock = FakeSocket(
'HTTP/1.1 200 OK\r\n'
'Content-Length: 14432\r\n'
'\r\n',
NoEOFStringIO)
resp = httplib.HTTPResponse(sock, method="HEAD")
resp.begin()
if resp.read() != "":
self.fail("Did not expect response from HEAD request")
resp.close()
def test_send_file(self):
expected = (b'GET /foo HTTP/1.1\r\nHost: example.com\r\n'
b'Accept-Encoding: identity\r\nContent-Length:')
body = open(__file__, 'rb')
conn = httplib.HTTPConnection('example.com')
sock = FakeSocket(body)
conn.sock = sock
conn.request('GET', '/foo', body)
self.assertTrue(sock.data.startswith(expected))
class OfflineTest(TestCase):
def test_responses(self):
self.assertEquals(httplib.responses[httplib.NOT_FOUND], "Not Found")
PORT = 50003
HOST = "localhost"
class TimeoutTest(TestCase):
def setUp(self):
self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
global PORT
PORT = test_support.bind_port(self.serv, HOST, PORT)
self.serv.listen(5)
def tearDown(self):
self.serv.close()
self.serv = None
def testTimeoutAttribute(self):
# This will prove that the timeout gets through HTTPConnection
# and into the socket.
# default
httpConn = httplib.HTTPConnection(HOST, PORT)
httpConn.connect()
self.assertTrue(httpConn.sock.gettimeout() is None)
httpConn.close()
# a value
httpConn = httplib.HTTPConnection(HOST, PORT, timeout=30)
httpConn.connect()
self.assertEqual(httpConn.sock.gettimeout(), 30)
httpConn.close()
# None, having other default
previous = socket.getdefaulttimeout()
socket.setdefaulttimeout(30)
try:
httpConn = httplib.HTTPConnection(HOST, PORT, timeout=None)
httpConn.connect()
finally:
socket.setdefaulttimeout(previous)
self.assertEqual(httpConn.sock.gettimeout(), 30)
httpConn.close()
class HTTPSTimeoutTest(TestCase):
# XXX Here should be tests for HTTPS, there isn't any right now!
def test_attributes(self):
# simple test to check it's storing it
h = httplib.HTTPSConnection(HOST, PORT, timeout=30)
self.assertEqual(h.timeout, 30)
def test_main(verbose=None):
test_support.run_unittest(HeaderTests, OfflineTest, BasicTest, TimeoutTest, HTTPSTimeoutTest)
if __name__ == '__main__':
test_main()
|