diff options
Diffstat (limited to 'Lib/test/test_wsgiref.py')
| -rw-r--r-- | Lib/test/test_wsgiref.py | 149 |
1 files changed, 107 insertions, 42 deletions
diff --git a/Lib/test/test_wsgiref.py b/Lib/test/test_wsgiref.py index b7d4c86..08f8d9a 100644 --- a/Lib/test/test_wsgiref.py +++ b/Lib/test/test_wsgiref.py @@ -7,13 +7,13 @@ from wsgiref import util from wsgiref.validate import validator from wsgiref.simple_server import WSGIServer, WSGIRequestHandler, demo_app from wsgiref.simple_server import make_server -from StringIO import StringIO -from SocketServer import BaseServer +from io import StringIO, BytesIO, BufferedReader +from socketserver import BaseServer import os import re import sys -from test import test_support +from test import support class MockServer(WSGIServer): """Non-socket HTTP server""" @@ -44,21 +44,22 @@ def hello_app(environ,start_response): ('Content-Type','text/plain'), ('Date','Mon, 05 Jun 2006 18:49:54 GMT') ]) - return ["Hello, world!"] + return [b"Hello, world!"] -def run_amock(app=hello_app, data="GET / HTTP/1.0\n\n"): +def run_amock(app=hello_app, data=b"GET / HTTP/1.0\n\n"): server = make_server("", 80, app, MockServer, MockHandler) - inp, out, err, olderr = StringIO(data), StringIO(), StringIO(), sys.stderr - sys.stderr = err + inp = BufferedReader(BytesIO(data)) + out = BytesIO() + olderr = sys.stderr + err = sys.stderr = StringIO() try: - server.finish_request((inp,out), ("127.0.0.1",8888)) + server.finish_request((inp, out), ("127.0.0.1",8888)) finally: sys.stderr = olderr return out.getvalue(), err.getvalue() - def compare_generic_iter(make_it,match): """Utility to compare a generic 2.1/2.2+ iterator with an iterable @@ -87,26 +88,26 @@ def compare_generic_iter(make_it,match): it = make_it() if not iter(it) is it: raise AssertionError for item in match: - if not it.next()==item: raise AssertionError + if not next(it) == item: raise AssertionError try: - it.next() + next(it) except StopIteration: pass else: - raise AssertionError("Too many items from .next()",it) + raise AssertionError("Too many items from .__next__()", it) class IntegrationTests(TestCase): def check_hello(self, out, has_length=True): self.assertEqual(out, - "HTTP/1.0 200 OK\r\n" - "Server: WSGIServer/0.1 Python/"+sys.version.split()[0]+"\r\n" + ("HTTP/1.0 200 OK\r\n" + "Server: WSGIServer/0.2 Python/"+sys.version.split()[0]+"\r\n" "Content-Type: text/plain\r\n" "Date: Mon, 05 Jun 2006 18:49:54 GMT\r\n" + (has_length and "Content-Length: 13\r\n" or "") + "\r\n" - "Hello, world!" + "Hello, world!").encode("iso-8859-1") ) def test_plain_hello(self): @@ -124,14 +125,46 @@ class IntegrationTests(TestCase): return ["Hello, world!"] out, err = run_amock(validator(bad_app)) self.assertTrue(out.endswith( - "A server error occurred. Please contact the administrator." + b"A server error occurred. Please contact the administrator." )) self.assertEqual( err.splitlines()[-2], "AssertionError: Headers (('Content-Type', 'text/plain')) must" - " be of type list: <type 'tuple'>" + " be of type list: <class 'tuple'>" ) + def test_wsgi_input(self): + def bad_app(e,s): + e["wsgi.input"].read() + s("200 OK", [("Content-Type", "text/plain; charset=utf-8")]) + return [b"data"] + out, err = run_amock(validator(bad_app)) + self.assertTrue(out.endswith( + b"A server error occurred. Please contact the administrator." + )) + self.assertEqual( + err.splitlines()[-2], "AssertionError" + ) + + def test_bytes_validation(self): + def app(e, s): + s("200 OK", [ + ("Content-Type", "text/plain; charset=utf-8"), + ("Date", "Wed, 24 Dec 2008 13:29:32 GMT"), + ]) + return [b"data"] + out, err = run_amock(validator(app)) + self.assertTrue(err.endswith('"GET / HTTP/1.0" 200 4\n')) + ver = sys.version.split()[0].encode('ascii') + self.assertEqual( + b"HTTP/1.0 200 OK\r\n" + b"Server: WSGIServer/0.2 Python/" + ver + b"\r\n" + b"Content-Type: text/plain; charset=utf-8\r\n" + b"Date: Wed, 24 Dec 2008 13:29:32 GMT\r\n" + b"\r\n" + b"data", + out) + class UtilityTests(TestCase): @@ -149,6 +182,8 @@ class UtilityTests(TestCase): util.setup_testing_defaults(env) if isinstance(value, StringIO): self.assertIsInstance(env[key], StringIO) + elif isinstance(value,BytesIO): + self.assertIsInstance(env[key],BytesIO) else: self.assertEqual(env[key], value) @@ -220,7 +255,7 @@ class UtilityTests(TestCase): ('wsgi.run_once', 0), ('wsgi.multithread', 0), ('wsgi.multiprocess', 0), - ('wsgi.input', StringIO("")), + ('wsgi.input', BytesIO()), ('wsgi.errors', StringIO()), ('wsgi.url_scheme','http'), ]: @@ -245,6 +280,7 @@ class UtilityTests(TestCase): def testAppURIs(self): self.checkAppURI("http://127.0.0.1/") self.checkAppURI("http://127.0.0.1/spam", SCRIPT_NAME="/spam") + self.checkAppURI("http://127.0.0.1/sp%C3%A4m", SCRIPT_NAME="/späm") self.checkAppURI("http://spam.example.com:2071/", HTTP_HOST="spam.example.com:2071", SERVER_PORT="2071") self.checkAppURI("http://spam.example.com/", @@ -258,6 +294,7 @@ class UtilityTests(TestCase): def testReqURIs(self): self.checkReqURI("http://127.0.0.1/") self.checkReqURI("http://127.0.0.1/spam", SCRIPT_NAME="/spam") + self.checkReqURI("http://127.0.0.1/sp%C3%A4m", SCRIPT_NAME="/späm") self.checkReqURI("http://127.0.0.1/spammity/spam", SCRIPT_NAME="/spammity", PATH_INFO="/spam") self.checkReqURI("http://127.0.0.1/spammity/spam;ham", @@ -302,7 +339,7 @@ class HeaderTests(TestCase): del h['foo'] # should not raise an error h['Foo'] = 'bar' - for m in h.has_key, h.__contains__, h.get, h.get_all, h.__getitem__: + for m in h.__contains__, h.get, h.get_all, h.__getitem__: self.assertTrue(m('foo')) self.assertTrue(m('Foo')) self.assertTrue(m('FOO')) @@ -342,7 +379,6 @@ class HeaderTests(TestCase): '\r\n' ) - class ErrorHandler(BaseCGIHandler): """Simple handler subclass for testing BaseHandler""" @@ -354,7 +390,7 @@ class ErrorHandler(BaseCGIHandler): def __init__(self,**kw): setup_testing_defaults(kw) BaseCGIHandler.__init__( - self, StringIO(''), StringIO(), StringIO(), kw, + self, BytesIO(), BytesIO(), StringIO(), kw, multithread=True, multiprocess=True ) @@ -418,12 +454,16 @@ class HandlerTests(TestCase): def trivial_app1(e,s): s('200 OK',[]) - return [e['wsgi.url_scheme']] + return [e['wsgi.url_scheme'].encode('iso-8859-1')] def trivial_app2(e,s): - s('200 OK',[])(e['wsgi.url_scheme']) + s('200 OK',[])(e['wsgi.url_scheme'].encode('iso-8859-1')) return [] + def trivial_app3(e,s): + s('200 OK',[]) + return ['\u0442\u0435\u0441\u0442'.encode("utf-8")] + def trivial_app4(e,s): # Simulate a response to a HEAD request s('200 OK',[('Content-Length', '12345')]) @@ -432,18 +472,25 @@ class HandlerTests(TestCase): h = TestHandler() h.run(trivial_app1) self.assertEqual(h.stdout.getvalue(), - "Status: 200 OK\r\n" + ("Status: 200 OK\r\n" "Content-Length: 4\r\n" "\r\n" - "http") + "http").encode("iso-8859-1")) h = TestHandler() h.run(trivial_app2) self.assertEqual(h.stdout.getvalue(), - "Status: 200 OK\r\n" + ("Status: 200 OK\r\n" "\r\n" - "http") + "http").encode("iso-8859-1")) + h = TestHandler() + h.run(trivial_app3) + self.assertEqual(h.stdout.getvalue(), + b'Status: 200 OK\r\n' + b'Content-Length: 8\r\n' + b'\r\n' + b'\xd1\x82\xd0\xb5\xd1\x81\xd1\x82') h = TestHandler() h.run(trivial_app4) @@ -464,23 +511,24 @@ class HandlerTests(TestCase): h = ErrorHandler() h.run(non_error_app) self.assertEqual(h.stdout.getvalue(), - "Status: 200 OK\r\n" + ("Status: 200 OK\r\n" "Content-Length: 0\r\n" - "\r\n") + "\r\n").encode("iso-8859-1")) self.assertEqual(h.stderr.getvalue(),"") h = ErrorHandler() h.run(error_app) self.assertEqual(h.stdout.getvalue(), - "Status: %s\r\n" + ("Status: %s\r\n" "Content-Type: text/plain\r\n" "Content-Length: %d\r\n" - "\r\n%s" % (h.error_status,len(h.error_body),h.error_body)) + "\r\n" % (h.error_status,len(h.error_body))).encode('iso-8859-1') + + h.error_body) - self.assertNotEqual(h.stderr.getvalue().find("AssertionError"), -1) + self.assertIn("AssertionError", h.stderr.getvalue()) def testErrorAfterOutput(self): - MSG = "Some output has been sent" + MSG = b"Some output has been sent" def error_app(e,s): s("200 OK",[])(MSG) raise AssertionError("This should be caught by handler") @@ -488,9 +536,9 @@ class HandlerTests(TestCase): h = ErrorHandler() h.run(error_app) self.assertEqual(h.stdout.getvalue(), - "Status: 200 OK\r\n" - "\r\n"+MSG) - self.assertNotEqual(h.stderr.getvalue().find("AssertionError"), -1) + ("Status: 200 OK\r\n" + "\r\n".encode("iso-8859-1")+MSG)) + self.assertIn("AssertionError", h.stderr.getvalue()) def testHeaderFormats(self): @@ -505,7 +553,7 @@ class HandlerTests(TestCase): ) shortpat = ( "Status: 200 OK\r\n" "Content-Length: 0\r\n" "\r\n" - ) + ).encode("iso-8859-1") for ssw in "FooBar/1.0", None: sw = ssw and "Server: %s\r\n" % ssw or "" @@ -526,13 +574,31 @@ class HandlerTests(TestCase): h.server_software = ssw h.run(non_error_app) if proto=="HTTP/0.9": - self.assertEqual(h.stdout.getvalue(),"") + self.assertEqual(h.stdout.getvalue(),b"") else: self.assertTrue( - re.match(stdpat%(version,sw), h.stdout.getvalue()), - (stdpat%(version,sw), h.stdout.getvalue()) + re.match((stdpat%(version,sw)).encode("iso-8859-1"), + h.stdout.getvalue()), + ((stdpat%(version,sw)).encode("iso-8859-1"), + h.stdout.getvalue()) ) + def testBytesData(self): + def app(e, s): + s("200 OK", [ + ("Content-Type", "text/plain; charset=utf-8"), + ]) + return [b"data"] + + h = TestHandler() + h.run(app) + self.assertEqual(b"Status: 200 OK\r\n" + b"Content-Type: text/plain; charset=utf-8\r\n" + b"Content-Length: 4\r\n" + b"\r\n" + b"data", + h.stdout.getvalue()) + def testCloseOnError(self): side_effects = {'close_called': False} MSG = b"Some output has been sent" @@ -543,7 +609,6 @@ class HandlerTests(TestCase): while True: yield b'blah' raise AssertionError("This should be caught by handler") - def close(self): side_effects['close_called'] = True return CrashyIterable() @@ -554,7 +619,7 @@ class HandlerTests(TestCase): def test_main(): - test_support.run_unittest(__name__) + support.run_unittest(__name__) if __name__ == "__main__": test_main() |
