diff options
Diffstat (limited to 'Lib/test/test_httpservers.py')
-rw-r--r-- | Lib/test/test_httpservers.py | 289 |
1 files changed, 221 insertions, 68 deletions
diff --git a/Lib/test/test_httpservers.py b/Lib/test/test_httpservers.py index 6e5f2db..72e6e08 100644 --- a/Lib/test/test_httpservers.py +++ b/Lib/test/test_httpservers.py @@ -6,12 +6,13 @@ Josip Dzolonga, and Michael Otteneder for the 2007/08 GHOP contest. from http.server import BaseHTTPRequestHandler, HTTPServer, \ SimpleHTTPRequestHandler, CGIHTTPRequestHandler -from http import server +from http import server, HTTPStatus import os import sys import re import base64 +import ntpath import shutil import urllib.parse import html @@ -79,13 +80,13 @@ class BaseHTTPServerTestCase(BaseTestCase): default_request_version = 'HTTP/1.1' def do_TEST(self): - self.send_response(204) + self.send_response(HTTPStatus.NO_CONTENT) self.send_header('Content-Type', 'text/html') self.send_header('Connection', 'close') self.end_headers() def do_KEEP(self): - self.send_response(204) + self.send_response(HTTPStatus.NO_CONTENT) self.send_header('Content-Type', 'text/html') self.send_header('Connection', 'keep-alive') self.end_headers() @@ -94,11 +95,11 @@ class BaseHTTPServerTestCase(BaseTestCase): self.send_error(999) def do_NOTFOUND(self): - self.send_error(404) + self.send_error(HTTPStatus.NOT_FOUND) def do_EXPLAINERROR(self): self.send_error(999, "Short Message", - "This is a long \n explaination") + "This is a long \n explanation") def do_CUSTOM(self): self.send_response(999) @@ -114,6 +115,12 @@ class BaseHTTPServerTestCase(BaseTestCase): body = self.headers['x-special-incoming'].encode('utf-8') self.wfile.write(body) + def do_SEND_ERROR(self): + self.send_error(int(self.path[1:])) + + def do_HEAD(self): + self.send_error(int(self.path[1:])) + def setUp(self): BaseTestCase.setUp(self) self.con = http.client.HTTPConnection(self.HOST, self.PORT) @@ -122,35 +129,35 @@ class BaseHTTPServerTestCase(BaseTestCase): def test_command(self): self.con.request('GET', '/') res = self.con.getresponse() - self.assertEqual(res.status, 501) + self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED) def test_request_line_trimming(self): self.con._http_vsn_str = 'HTTP/1.1\n' self.con.putrequest('XYZBOGUS', '/') self.con.endheaders() res = self.con.getresponse() - self.assertEqual(res.status, 501) + self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED) def test_version_bogus(self): self.con._http_vsn_str = 'FUBAR' self.con.putrequest('GET', '/') self.con.endheaders() res = self.con.getresponse() - self.assertEqual(res.status, 400) + self.assertEqual(res.status, HTTPStatus.BAD_REQUEST) def test_version_digits(self): self.con._http_vsn_str = 'HTTP/9.9.9' self.con.putrequest('GET', '/') self.con.endheaders() res = self.con.getresponse() - self.assertEqual(res.status, 400) + self.assertEqual(res.status, HTTPStatus.BAD_REQUEST) def test_version_none_get(self): self.con._http_vsn_str = '' self.con.putrequest('GET', '/') self.con.endheaders() res = self.con.getresponse() - self.assertEqual(res.status, 501) + self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED) def test_version_none(self): # Test that a valid method is rejected when not HTTP/1.x @@ -158,7 +165,7 @@ class BaseHTTPServerTestCase(BaseTestCase): self.con.putrequest('CUSTOM', '/') self.con.endheaders() res = self.con.getresponse() - self.assertEqual(res.status, 400) + self.assertEqual(res.status, HTTPStatus.BAD_REQUEST) def test_version_invalid(self): self.con._http_vsn = 99 @@ -166,21 +173,21 @@ class BaseHTTPServerTestCase(BaseTestCase): self.con.putrequest('GET', '/') self.con.endheaders() res = self.con.getresponse() - self.assertEqual(res.status, 505) + self.assertEqual(res.status, HTTPStatus.HTTP_VERSION_NOT_SUPPORTED) def test_send_blank(self): self.con._http_vsn_str = '' self.con.putrequest('', '') self.con.endheaders() res = self.con.getresponse() - self.assertEqual(res.status, 400) + self.assertEqual(res.status, HTTPStatus.BAD_REQUEST) def test_header_close(self): self.con.putrequest('GET', '/') self.con.putheader('Connection', 'close') self.con.endheaders() res = self.con.getresponse() - self.assertEqual(res.status, 501) + self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED) def test_head_keep_alive(self): self.con._http_vsn_str = 'HTTP/1.1' @@ -188,12 +195,12 @@ class BaseHTTPServerTestCase(BaseTestCase): self.con.putheader('Connection', 'keep-alive') self.con.endheaders() res = self.con.getresponse() - self.assertEqual(res.status, 501) + self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED) def test_handler(self): self.con.request('TEST', '/') res = self.con.getresponse() - self.assertEqual(res.status, 204) + self.assertEqual(res.status, HTTPStatus.NO_CONTENT) def test_return_header_keep_alive(self): self.con.request('KEEP', '/') @@ -230,10 +237,85 @@ class BaseHTTPServerTestCase(BaseTestCase): # Issue #16088: standard error responses should have a content-length self.con.request('NOTFOUND', '/') res = self.con.getresponse() - self.assertEqual(res.status, 404) + self.assertEqual(res.status, HTTPStatus.NOT_FOUND) + data = res.read() self.assertEqual(int(res.getheader('Content-Length')), len(data)) + def test_send_error(self): + allow_transfer_encoding_codes = (HTTPStatus.NOT_MODIFIED, + HTTPStatus.RESET_CONTENT) + for code in (HTTPStatus.NO_CONTENT, HTTPStatus.NOT_MODIFIED, + HTTPStatus.PROCESSING, HTTPStatus.RESET_CONTENT, + HTTPStatus.SWITCHING_PROTOCOLS): + self.con.request('SEND_ERROR', '/{}'.format(code)) + res = self.con.getresponse() + self.assertEqual(code, res.status) + self.assertEqual(None, res.getheader('Content-Length')) + self.assertEqual(None, res.getheader('Content-Type')) + if code not in allow_transfer_encoding_codes: + self.assertEqual(None, res.getheader('Transfer-Encoding')) + + data = res.read() + self.assertEqual(b'', data) + + def test_head_via_send_error(self): + allow_transfer_encoding_codes = (HTTPStatus.NOT_MODIFIED, + HTTPStatus.RESET_CONTENT) + for code in (HTTPStatus.OK, HTTPStatus.NO_CONTENT, + HTTPStatus.NOT_MODIFIED, HTTPStatus.RESET_CONTENT, + HTTPStatus.SWITCHING_PROTOCOLS): + self.con.request('HEAD', '/{}'.format(code)) + res = self.con.getresponse() + self.assertEqual(code, res.status) + if code == HTTPStatus.OK: + self.assertTrue(int(res.getheader('Content-Length')) > 0) + self.assertIn('text/html', res.getheader('Content-Type')) + else: + self.assertEqual(None, res.getheader('Content-Length')) + self.assertEqual(None, res.getheader('Content-Type')) + if code not in allow_transfer_encoding_codes: + self.assertEqual(None, res.getheader('Transfer-Encoding')) + + data = res.read() + self.assertEqual(b'', data) + + +class RequestHandlerLoggingTestCase(BaseTestCase): + class request_handler(BaseHTTPRequestHandler): + protocol_version = 'HTTP/1.1' + default_request_version = 'HTTP/1.1' + + def do_GET(self): + self.send_response(HTTPStatus.OK) + self.end_headers() + + def do_ERROR(self): + self.send_error(HTTPStatus.NOT_FOUND, 'File not found') + + def test_get(self): + self.con = http.client.HTTPConnection(self.HOST, self.PORT) + self.con.connect() + + with support.captured_stderr() as err: + self.con.request('GET', '/') + self.con.getresponse() + + self.assertTrue( + err.getvalue().endswith('"GET / HTTP/1.1" 200 -\n')) + + def test_err(self): + self.con = http.client.HTTPConnection(self.HOST, self.PORT) + self.con.connect() + + with support.captured_stderr() as err: + self.con.request('ERROR', '/') + self.con.getresponse() + + lines = err.getvalue().split('\n') + self.assertTrue(lines[0].endswith('code 404, message File not found')) + self.assertTrue(lines[1].endswith('"ERROR / HTTP/1.1" 404 -')) + class SimpleHTTPServerTestCase(BaseTestCase): class request_handler(NoLogRequestHandler, SimpleHTTPRequestHandler): @@ -247,6 +329,7 @@ class SimpleHTTPServerTestCase(BaseTestCase): self.data = b'We are the knights who say Ni!' self.tempdir = tempfile.mkdtemp(dir=basetempdir) self.tempdir_name = os.path.basename(self.tempdir) + self.base_url = '/' + self.tempdir_name with open(os.path.join(self.tempdir, 'test'), 'wb') as temp: temp.write(self.data) @@ -261,12 +344,28 @@ class SimpleHTTPServerTestCase(BaseTestCase): BaseTestCase.tearDown(self) def check_status_and_reason(self, response, status, data=None): + def close_conn(): + """Don't close reader yet so we can check if there was leftover + buffered input""" + nonlocal reader + reader = response.fp + response.fp = None + reader = None + response._close_conn = close_conn + body = response.read() self.assertTrue(response) self.assertEqual(response.status, status) self.assertIsNotNone(response.reason) if data: self.assertEqual(data, body) + # Ensure the server has not set up a persistent connection, and has + # not sent any extra data + self.assertEqual(response.version, 10) + self.assertEqual(response.msg.get("Connection", "close"), "close") + self.assertEqual(reader.read(30), b'', 'Connection should be closed') + + reader.close() return body @support.requires_mac_ver(10, 5) @@ -277,7 +376,7 @@ class SimpleHTTPServerTestCase(BaseTestCase): filename = os.fsdecode(support.TESTFN_UNDECODABLE) + '.txt' with open(os.path.join(self.tempdir, filename), 'wb') as f: f.write(support.TESTFN_UNDECODABLE) - response = self.request(self.tempdir_name + '/') + response = self.request(self.base_url + '/') if sys.platform == 'darwin': # On Mac OS the HFS+ filesystem replaces bytes that aren't valid # UTF-8 into a percent-encoded value. @@ -285,52 +384,58 @@ class SimpleHTTPServerTestCase(BaseTestCase): if name != 'test': # Ignore a filename created in setUp(). filename = name break - body = self.check_status_and_reason(response, 200) + body = self.check_status_and_reason(response, HTTPStatus.OK) quotedname = urllib.parse.quote(filename, errors='surrogatepass') self.assertIn(('href="%s"' % quotedname) .encode(enc, 'surrogateescape'), body) self.assertIn(('>%s<' % html.escape(filename)) .encode(enc, 'surrogateescape'), body) - response = self.request(self.tempdir_name + '/' + quotedname) - self.check_status_and_reason(response, 200, + response = self.request(self.base_url + '/' + quotedname) + self.check_status_and_reason(response, HTTPStatus.OK, data=support.TESTFN_UNDECODABLE) def test_get(self): #constructs the path relative to the root directory of the HTTPServer - response = self.request(self.tempdir_name + '/test') - self.check_status_and_reason(response, 200, data=self.data) + response = self.request(self.base_url + '/test') + self.check_status_and_reason(response, HTTPStatus.OK, data=self.data) # check for trailing "/" which should return 404. See Issue17324 - response = self.request(self.tempdir_name + '/test/') - self.check_status_and_reason(response, 404) - response = self.request(self.tempdir_name + '/') - self.check_status_and_reason(response, 200) - response = self.request(self.tempdir_name) - self.check_status_and_reason(response, 301) - response = self.request(self.tempdir_name + '/?hi=2') - self.check_status_and_reason(response, 200) - response = self.request(self.tempdir_name + '?hi=1') - self.check_status_and_reason(response, 301) + response = self.request(self.base_url + '/test/') + self.check_status_and_reason(response, HTTPStatus.NOT_FOUND) + response = self.request(self.base_url + '/') + self.check_status_and_reason(response, HTTPStatus.OK) + response = self.request(self.base_url) + self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY) + response = self.request(self.base_url + '/?hi=2') + self.check_status_and_reason(response, HTTPStatus.OK) + response = self.request(self.base_url + '?hi=1') + self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY) self.assertEqual(response.getheader("Location"), - self.tempdir_name + "/?hi=1") + self.base_url + "/?hi=1") response = self.request('/ThisDoesNotExist') - self.check_status_and_reason(response, 404) + self.check_status_and_reason(response, HTTPStatus.NOT_FOUND) response = self.request('/' + 'ThisDoesNotExist' + '/') - self.check_status_and_reason(response, 404) - with open(os.path.join(self.tempdir_name, 'index.html'), 'w') as f: - response = self.request('/' + self.tempdir_name + '/') - self.check_status_and_reason(response, 200) - # chmod() doesn't work as expected on Windows, and filesystem - # permissions are ignored by root on Unix. - if os.name == 'posix' and os.geteuid() != 0: - os.chmod(self.tempdir, 0) - response = self.request(self.tempdir_name + '/') - self.check_status_and_reason(response, 404) + self.check_status_and_reason(response, HTTPStatus.NOT_FOUND) + + data = b"Dummy index file\r\n" + with open(os.path.join(self.tempdir_name, 'index.html'), 'wb') as f: + f.write(data) + response = self.request(self.base_url + '/') + self.check_status_and_reason(response, HTTPStatus.OK, data) + + # chmod() doesn't work as expected on Windows, and filesystem + # permissions are ignored by root on Unix. + if os.name == 'posix' and os.geteuid() != 0: + os.chmod(self.tempdir, 0) + try: + response = self.request(self.base_url + '/') + self.check_status_and_reason(response, HTTPStatus.NOT_FOUND) + finally: os.chmod(self.tempdir, 0o755) def test_head(self): response = self.request( - self.tempdir_name + '/test', method='HEAD') - self.check_status_and_reason(response, 200) + self.base_url + '/test', method='HEAD') + self.check_status_and_reason(response, HTTPStatus.OK) self.assertEqual(response.getheader('content-length'), str(len(self.data))) self.assertEqual(response.getheader('content-type'), @@ -338,12 +443,28 @@ class SimpleHTTPServerTestCase(BaseTestCase): def test_invalid_requests(self): response = self.request('/', method='FOO') - self.check_status_and_reason(response, 501) + self.check_status_and_reason(response, HTTPStatus.NOT_IMPLEMENTED) # requests must be case sensitive,so this should fail too response = self.request('/', method='custom') - self.check_status_and_reason(response, 501) + self.check_status_and_reason(response, HTTPStatus.NOT_IMPLEMENTED) response = self.request('/', method='GETs') - self.check_status_and_reason(response, 501) + self.check_status_and_reason(response, HTTPStatus.NOT_IMPLEMENTED) + + def test_path_without_leading_slash(self): + response = self.request(self.tempdir_name + '/test') + self.check_status_and_reason(response, HTTPStatus.OK, data=self.data) + response = self.request(self.tempdir_name + '/test/') + self.check_status_and_reason(response, HTTPStatus.NOT_FOUND) + response = self.request(self.tempdir_name + '/') + self.check_status_and_reason(response, HTTPStatus.OK) + response = self.request(self.tempdir_name) + self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY) + response = self.request(self.tempdir_name + '/?hi=2') + self.check_status_and_reason(response, HTTPStatus.OK) + response = self.request(self.tempdir_name + '?hi=1') + self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY) + self.assertEqual(response.getheader("Location"), + self.tempdir_name + "/?hi=1") cgi_file1 = """\ @@ -508,12 +629,13 @@ class CGIHTTPServerTestCase(BaseTestCase): def test_headers_and_content(self): res = self.request('/cgi-bin/file1.py') - self.assertEqual((b'Hello World' + self.linesep, 'text/html', 200), - (res.read(), res.getheader('Content-type'), res.status)) + self.assertEqual( + (res.read(), res.getheader('Content-type'), res.status), + (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK)) def test_issue19435(self): res = self.request('///////////nocgi.py/../cgi-bin/nothere.sh') - self.assertEqual(res.status, 404) + self.assertEqual(res.status, HTTPStatus.NOT_FOUND) def test_post(self): params = urllib.parse.urlencode( @@ -526,50 +648,55 @@ class CGIHTTPServerTestCase(BaseTestCase): def test_invaliduri(self): res = self.request('/cgi-bin/invalid') res.read() - self.assertEqual(res.status, 404) + self.assertEqual(res.status, HTTPStatus.NOT_FOUND) def test_authorization(self): headers = {b'Authorization' : b'Basic ' + base64.b64encode(b'username:pass')} res = self.request('/cgi-bin/file1.py', 'GET', headers=headers) - self.assertEqual((b'Hello World' + self.linesep, 'text/html', 200), - (res.read(), res.getheader('Content-type'), res.status)) + self.assertEqual( + (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK), + (res.read(), res.getheader('Content-type'), res.status)) def test_no_leading_slash(self): # http://bugs.python.org/issue2254 res = self.request('cgi-bin/file1.py') - self.assertEqual((b'Hello World' + self.linesep, 'text/html', 200), - (res.read(), res.getheader('Content-type'), res.status)) + self.assertEqual( + (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK), + (res.read(), res.getheader('Content-type'), res.status)) def test_os_environ_is_not_altered(self): signature = "Test CGI Server" os.environ['SERVER_SOFTWARE'] = signature res = self.request('/cgi-bin/file1.py') - self.assertEqual((b'Hello World' + self.linesep, 'text/html', 200), - (res.read(), res.getheader('Content-type'), res.status)) + self.assertEqual( + (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK), + (res.read(), res.getheader('Content-type'), res.status)) self.assertEqual(os.environ['SERVER_SOFTWARE'], signature) def test_urlquote_decoding_in_cgi_check(self): res = self.request('/cgi-bin%2ffile1.py') - self.assertEqual((b'Hello World' + self.linesep, 'text/html', 200), - (res.read(), res.getheader('Content-type'), res.status)) + self.assertEqual( + (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK), + (res.read(), res.getheader('Content-type'), res.status)) def test_nested_cgi_path_issue21323(self): res = self.request('/cgi-bin/child-dir/file3.py') - self.assertEqual((b'Hello World' + self.linesep, 'text/html', 200), - (res.read(), res.getheader('Content-type'), res.status)) + self.assertEqual( + (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK), + (res.read(), res.getheader('Content-type'), res.status)) def test_query_with_multiple_question_mark(self): res = self.request('/cgi-bin/file4.py?a=b?c=d') self.assertEqual( - (b'a=b?c=d' + self.linesep, 'text/html', 200), + (b'a=b?c=d' + self.linesep, 'text/html', HTTPStatus.OK), (res.read(), res.getheader('Content-type'), res.status)) def test_query_with_continuous_slashes(self): res = self.request('/cgi-bin/file4.py?k=aa%2F%2Fbb&//q//p//=//a//b//') self.assertEqual( (b'k=aa%2F%2Fbb&//q//p//=//a//b//' + self.linesep, - 'text/html', 200), + 'text/html', HTTPStatus.OK), (res.read(), res.getheader('Content-type'), res.status)) @@ -580,7 +707,7 @@ class SocketlessRequestHandler(SimpleHTTPRequestHandler): def do_GET(self): self.get_called = True - self.send_response(200) + self.send_response(HTTPStatus.OK) self.send_header('Content-Type', 'text/html') self.end_headers() self.wfile.write(b'<html><body>Data</body></html>\r\n') @@ -590,7 +717,7 @@ class SocketlessRequestHandler(SimpleHTTPRequestHandler): class RejectingSocketlessRequestHandler(SocketlessRequestHandler): def handle_expect_100(self): - self.send_error(417) + self.send_error(HTTPStatus.EXPECTATION_FAILED) return False @@ -793,6 +920,13 @@ class BaseHTTPRequestHandlerTestCase(unittest.TestCase): self.assertFalse(self.handler.get_called) self.assertEqual(self.handler.requestline, 'GET / HTTP/1.1') + def test_too_many_headers(self): + result = self.send_typical_request( + b'GET / HTTP/1.1\r\n' + b'X-Foo: bar\r\n' * 101 + b'\r\n') + self.assertEqual(result[0], b'HTTP/1.1 431 Too many headers\r\n') + self.assertFalse(self.handler.get_called) + self.assertEqual(self.handler.requestline, 'GET / HTTP/1.1') + def test_close_connection(self): # handle_one_request() should be repeatedly called until # it sets close_connection @@ -829,6 +963,24 @@ class SimpleHTTPRequestHandlerTestCase(unittest.TestCase): path = self.handler.translate_path('//filename?foo=bar') self.assertEqual(path, self.translated) + def test_windows_colon(self): + with support.swap_attr(server.os, 'path', ntpath): + path = self.handler.translate_path('c:c:c:foo/filename') + path = path.replace(ntpath.sep, os.sep) + self.assertEqual(path, self.translated) + + path = self.handler.translate_path('\\c:../filename') + path = path.replace(ntpath.sep, os.sep) + self.assertEqual(path, self.translated) + + path = self.handler.translate_path('c:\\c:..\\foo/filename') + path = path.replace(ntpath.sep, os.sep) + self.assertEqual(path, self.translated) + + path = self.handler.translate_path('c:c:foo\\c:c:bar/filename') + path = path.replace(ntpath.sep, os.sep) + self.assertEqual(path, self.translated) + class MiscTestCase(unittest.TestCase): def test_all(self): @@ -847,6 +999,7 @@ def test_main(verbose=None): cwd = os.getcwd() try: support.run_unittest( + RequestHandlerLoggingTestCase, BaseHTTPRequestHandlerTestCase, BaseHTTPServerTestCase, SimpleHTTPServerTestCase, |