diff options
Diffstat (limited to 'Lib/test/test_httpservers.py')
| -rw-r--r-- | Lib/test/test_httpservers.py | 240 |
1 files changed, 204 insertions, 36 deletions
diff --git a/Lib/test/test_httpservers.py b/Lib/test/test_httpservers.py index 5c48671..a7752d9 100644 --- a/Lib/test/test_httpservers.py +++ b/Lib/test/test_httpservers.py @@ -7,18 +7,23 @@ Josip Dzolonga, and Michael Otteneder for the 2007/08 GHOP contest. from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer from SimpleHTTPServer import SimpleHTTPRequestHandler from CGIHTTPServer import CGIHTTPRequestHandler +import CGIHTTPServer import os import sys +import re import base64 import shutil import urllib import httplib import tempfile -import threading import unittest + +from StringIO import StringIO + from test import test_support +threading = test_support.import_module('threading') class NoLogRequestHandler: @@ -26,6 +31,21 @@ class NoLogRequestHandler: # don't write log messages to stderr pass +class SocketlessRequestHandler(SimpleHTTPRequestHandler): + def __init__(self): + self.get_called = False + self.protocol_version = "HTTP/1.1" + + def do_GET(self): + self.get_called = True + self.send_response(200) + self.send_header('Content-Type', 'text/html') + self.end_headers() + self.wfile.write(b'<html><body>Data</body></html>\r\n') + + def log_message(self, format, *args): + pass + class TestServerThread(threading.Thread): def __init__(self, test_object, request_handler): @@ -49,6 +69,8 @@ class TestServerThread(threading.Thread): class BaseTestCase(unittest.TestCase): def setUp(self): + self._threads = test_support.threading_setup() + os.environ = test_support.EnvironmentVarGuard() self.server_started = threading.Event() self.thread = TestServerThread(self, self.request_handler) self.thread.start() @@ -56,12 +78,78 @@ class BaseTestCase(unittest.TestCase): def tearDown(self): self.thread.stop() + os.environ.__exit__() + test_support.threading_cleanup(*self._threads) def request(self, uri, method='GET', body=None, headers={}): self.connection = httplib.HTTPConnection('localhost', self.PORT) self.connection.request(method, uri, body, headers) return self.connection.getresponse() +class BaseHTTPRequestHandlerTestCase(unittest.TestCase): + """Test the functionality of the BaseHTTPServer focussing on + BaseHTTPRequestHandler. + """ + + HTTPResponseMatch = re.compile('HTTP/1.[0-9]+ 200 OK') + + def setUp (self): + self.handler = SocketlessRequestHandler() + + def send_typical_request(self, message): + input = StringIO(message) + output = StringIO() + self.handler.rfile = input + self.handler.wfile = output + self.handler.handle_one_request() + output.seek(0) + return output.readlines() + + def verify_get_called(self): + self.assertTrue(self.handler.get_called) + + def verify_expected_headers(self, headers): + for fieldName in 'Server: ', 'Date: ', 'Content-Type: ': + self.assertEqual(sum(h.startswith(fieldName) for h in headers), 1) + + def verify_http_server_response(self, response): + match = self.HTTPResponseMatch.search(response) + self.assertTrue(match is not None) + + def test_http_1_1(self): + result = self.send_typical_request('GET / HTTP/1.1\r\n\r\n') + self.verify_http_server_response(result[0]) + self.verify_expected_headers(result[1:-1]) + self.verify_get_called() + self.assertEqual(result[-1], '<html><body>Data</body></html>\r\n') + + def test_http_1_0(self): + result = self.send_typical_request('GET / HTTP/1.0\r\n\r\n') + self.verify_http_server_response(result[0]) + self.verify_expected_headers(result[1:-1]) + self.verify_get_called() + self.assertEqual(result[-1], '<html><body>Data</body></html>\r\n') + + def test_http_0_9(self): + result = self.send_typical_request('GET / HTTP/0.9\r\n\r\n') + self.assertEqual(len(result), 1) + self.assertEqual(result[0], '<html><body>Data</body></html>\r\n') + self.verify_get_called() + + def test_with_continue_1_0(self): + result = self.send_typical_request('GET / HTTP/1.0\r\nExpect: 100-continue\r\n\r\n') + self.verify_http_server_response(result[0]) + self.verify_expected_headers(result[1:-1]) + self.verify_get_called() + self.assertEqual(result[-1], '<html><body>Data</body></html>\r\n') + + def test_request_length(self): + # Issue #10714: huge request lines are discarded, to avoid Denial + # of Service attacks. + result = self.send_typical_request(b'GET ' + b'x' * 65537) + self.assertEqual(result[0], b'HTTP/1.1 414 Request-URI Too Long\r\n') + self.assertFalse(self.handler.get_called) + class BaseHTTPServerTestCase(BaseTestCase): class request_handler(NoLogRequestHandler, BaseHTTPRequestHandler): @@ -97,42 +185,42 @@ class BaseHTTPServerTestCase(BaseTestCase): def test_command(self): self.con.request('GET', '/') res = self.con.getresponse() - self.assertEquals(res.status, 501) + self.assertEqual(res.status, 501) def test_request_line_trimming(self): self.con._http_vsn_str = 'HTTP/1.1\n' self.con.putrequest('GET', '/') self.con.endheaders() res = self.con.getresponse() - self.assertEquals(res.status, 501) + self.assertEqual(res.status, 501) def test_version_bogus(self): self.con._http_vsn_str = 'FUBAR' self.con.putrequest('GET', '/') self.con.endheaders() res = self.con.getresponse() - self.assertEquals(res.status, 400) + self.assertEqual(res.status, 400) def test_version_digits(self): self.con._http_vsn_str = 'HTTP/9.9.9' self.con.putrequest('GET', '/') self.con.endheaders() res = self.con.getresponse() - self.assertEquals(res.status, 400) + self.assertEqual(res.status, 400) def test_version_none_get(self): self.con._http_vsn_str = '' self.con.putrequest('GET', '/') self.con.endheaders() res = self.con.getresponse() - self.assertEquals(res.status, 501) + self.assertEqual(res.status, 501) def test_version_none(self): self.con._http_vsn_str = '' self.con.putrequest('PUT', '/') self.con.endheaders() res = self.con.getresponse() - self.assertEquals(res.status, 400) + self.assertEqual(res.status, 400) def test_version_invalid(self): self.con._http_vsn = 99 @@ -140,21 +228,21 @@ class BaseHTTPServerTestCase(BaseTestCase): self.con.putrequest('GET', '/') self.con.endheaders() res = self.con.getresponse() - self.assertEquals(res.status, 505) + self.assertEqual(res.status, 505) def test_send_blank(self): self.con._http_vsn_str = '' self.con.putrequest('', '') self.con.endheaders() res = self.con.getresponse() - self.assertEquals(res.status, 400) + self.assertEqual(res.status, 400) def test_header_close(self): self.con.putrequest('GET', '/') self.con.putheader('Connection', 'close') self.con.endheaders() res = self.con.getresponse() - self.assertEquals(res.status, 501) + self.assertEqual(res.status, 501) def test_head_keep_alive(self): self.con._http_vsn_str = 'HTTP/1.1' @@ -162,28 +250,29 @@ class BaseHTTPServerTestCase(BaseTestCase): self.con.putheader('Connection', 'keep-alive') self.con.endheaders() res = self.con.getresponse() - self.assertEquals(res.status, 501) + self.assertEqual(res.status, 501) def test_handler(self): self.con.request('TEST', '/') res = self.con.getresponse() - self.assertEquals(res.status, 204) + self.assertEqual(res.status, 204) def test_return_header_keep_alive(self): self.con.request('KEEP', '/') res = self.con.getresponse() - self.assertEquals(res.getheader('Connection'), 'keep-alive') + self.assertEqual(res.getheader('Connection'), 'keep-alive') self.con.request('TEST', '/') + self.addCleanup(self.con.close) def test_internal_key_error(self): self.con.request('KEYERROR', '/') res = self.con.getresponse() - self.assertEquals(res.status, 999) + self.assertEqual(res.status, 999) def test_return_custom_status(self): self.con.request('CUSTOM', '/') res = self.con.getresponse() - self.assertEquals(res.status, 999) + self.assertEqual(res.status, 999) class SimpleHTTPServerTestCase(BaseTestCase): @@ -214,9 +303,9 @@ class SimpleHTTPServerTestCase(BaseTestCase): def check_status_and_reason(self, response, status, data=None): body = response.read() - self.assert_(response) - self.assertEquals(response.status, status) - self.assert_(response.reason != None) + self.assertTrue(response) + self.assertEqual(response.status, status) + self.assertIsNotNone(response.reason) if data: self.assertEqual(data, body) @@ -235,8 +324,10 @@ class SimpleHTTPServerTestCase(BaseTestCase): f = open(os.path.join(self.tempdir_name, 'index.html'), 'w') response = self.request('/' + self.tempdir_name + '/') self.check_status_and_reason(response, 200) - if os.name == 'posix': - # chmod won't work as expected on Windows platforms + + # chmod() doesn't work as expected on Windows, and filesystem + # permissions are ignored by root on Unix. + if os.name == 'posix' and os.geteuid() != 0: os.chmod(self.tempdir, 0) response = self.request(self.tempdir_name + '/') self.check_status_and_reason(response, 404) @@ -277,10 +368,13 @@ print "Content-type: text/html" print form = cgi.FieldStorage() -print "%%s, %%s, %%s" %% (form.getfirst("spam"), form.getfirst("eggs"),\ - form.getfirst("bacon")) +print "%%s, %%s, %%s" %% (form.getfirst("spam"), form.getfirst("eggs"), + form.getfirst("bacon")) """ + +@unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0, + "This test can't be run reliably as root (issue #13308).") class CGIHTTPServerTestCase(BaseTestCase): class request_handler(NoLogRequestHandler, CGIHTTPRequestHandler): pass @@ -324,43 +418,117 @@ class CGIHTTPServerTestCase(BaseTestCase): finally: BaseTestCase.tearDown(self) + def test_url_collapse_path_split(self): + test_vectors = { + '': ('/', ''), + '..': IndexError, + '/.//..': IndexError, + '/': ('/', ''), + '//': ('/', ''), + '/\\': ('/', '\\'), + '/.//': ('/', ''), + 'cgi-bin/file1.py': ('/cgi-bin', 'file1.py'), + '/cgi-bin/file1.py': ('/cgi-bin', 'file1.py'), + '/cgi-bin/file1.py/PATH-INFO': ('/cgi-bin', 'file1.py/PATH-INFO'), + 'a': ('/', 'a'), + '/a': ('/', 'a'), + '//a': ('/', 'a'), + './a': ('/', 'a'), + './C:/': ('/C:', ''), + '/a/b': ('/a', 'b'), + '/a/b/': ('/a/b', ''), + '/a/b/c/..': ('/a/b', ''), + '/a/b/c/../d': ('/a/b', 'd'), + '/a/b/c/../d/e/../f': ('/a/b/d', 'f'), + '/a/b/c/../d/e/../../f': ('/a/b', 'f'), + '/a/b/c/../d/e/.././././..//f': ('/a/b', 'f'), + '../a/b/c/../d/e/.././././..//f': IndexError, + '/a/b/c/../d/e/../../../f': ('/a', 'f'), + '/a/b/c/../d/e/../../../../f': ('/', 'f'), + '/a/b/c/../d/e/../../../../../f': IndexError, + '/a/b/c/../d/e/../../../../f/..': ('/', ''), + } + for path, expected in test_vectors.iteritems(): + if isinstance(expected, type) and issubclass(expected, Exception): + self.assertRaises(expected, + CGIHTTPServer._url_collapse_path_split, path) + else: + actual = CGIHTTPServer._url_collapse_path_split(path) + self.assertEqual(expected, actual, + msg='path = %r\nGot: %r\nWanted: %r' % + (path, actual, expected)) + def test_headers_and_content(self): res = self.request('/cgi-bin/file1.py') - self.assertEquals(('Hello World\n', 'text/html', 200), \ - (res.read(), res.getheader('Content-type'), res.status)) + self.assertEqual(('Hello World\n', 'text/html', 200), + (res.read(), res.getheader('Content-type'), res.status)) def test_post(self): params = urllib.urlencode({'spam' : 1, 'eggs' : 'python', 'bacon' : 123456}) headers = {'Content-type' : 'application/x-www-form-urlencoded'} res = self.request('/cgi-bin/file2.py', 'POST', params, headers) - self.assertEquals(res.read(), '1, python, 123456\n') + self.assertEqual(res.read(), '1, python, 123456\n') def test_invaliduri(self): res = self.request('/cgi-bin/invalid') res.read() - self.assertEquals(res.status, 404) + self.assertEqual(res.status, 404) def test_authorization(self): - headers = {'Authorization' : 'Basic %s' % \ - base64.b64encode('username:pass')} + headers = {'Authorization' : 'Basic %s' % + base64.b64encode('username:pass')} res = self.request('/cgi-bin/file1.py', 'GET', headers=headers) - self.assertEquals(('Hello World\n', 'text/html', 200), \ + self.assertEqual(('Hello World\n', 'text/html', 200), + (res.read(), res.getheader('Content-type'), res.status)) + + def test_no_leading_slash(self): + # http://bugs.python.org/issue2254 + res = self.request('cgi-bin/file1.py') + self.assertEqual(('Hello World\n', 'text/html', 200), (res.read(), res.getheader('Content-type'), res.status)) + def test_os_environ_is_not_altered(self): + signature = "Test CGI Server" + os.environ['SERVER_SOFTWARE'] = signature + res = self.request('/cgi-bin/file1.py') + self.assertEqual((b'Hello World\n', 'text/html', 200), + (res.read(), res.getheader('Content-type'), res.status)) + self.assertEqual(os.environ['SERVER_SOFTWARE'], signature) + + +class SimpleHTTPRequestHandlerTestCase(unittest.TestCase): + """ Test url parsing """ + def setUp(self): + self.translated = os.getcwd() + self.translated = os.path.join(self.translated, 'filename') + self.handler = SocketlessRequestHandler() + + def test_query_arguments(self): + path = self.handler.translate_path('/filename') + self.assertEqual(path, self.translated) + path = self.handler.translate_path('/filename?foo=bar') + self.assertEqual(path, self.translated) + path = self.handler.translate_path('/filename?a=b&spam=eggs#zot') + self.assertEqual(path, self.translated) + + def test_start_with_double_slash(self): + path = self.handler.translate_path('//filename') + self.assertEqual(path, self.translated) + path = self.handler.translate_path('//filename?foo=bar') + self.assertEqual(path, self.translated) + def test_main(verbose=None): - cwd = os.getcwd() - env = os.environ.copy() try: - test_support.run_unittest(BaseHTTPServerTestCase, + cwd = os.getcwd() + test_support.run_unittest(BaseHTTPRequestHandlerTestCase, + SimpleHTTPRequestHandlerTestCase, + BaseHTTPServerTestCase, SimpleHTTPServerTestCase, CGIHTTPServerTestCase - ) + ) finally: - test_support.reap_children() - os.environ.clear() - os.environ.update(env) os.chdir(cwd) if __name__ == '__main__': |
