summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_wsgiref.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_wsgiref.py')
-rw-r--r--Lib/test/test_wsgiref.py149
1 files changed, 107 insertions, 42 deletions
diff --git a/Lib/test/test_wsgiref.py b/Lib/test/test_wsgiref.py
index b7d4c86..08f8d9a 100644
--- a/Lib/test/test_wsgiref.py
+++ b/Lib/test/test_wsgiref.py
@@ -7,13 +7,13 @@ from wsgiref import util
from wsgiref.validate import validator
from wsgiref.simple_server import WSGIServer, WSGIRequestHandler, demo_app
from wsgiref.simple_server import make_server
-from StringIO import StringIO
-from SocketServer import BaseServer
+from io import StringIO, BytesIO, BufferedReader
+from socketserver import BaseServer
import os
import re
import sys
-from test import test_support
+from test import support
class MockServer(WSGIServer):
"""Non-socket HTTP server"""
@@ -44,21 +44,22 @@ def hello_app(environ,start_response):
('Content-Type','text/plain'),
('Date','Mon, 05 Jun 2006 18:49:54 GMT')
])
- return ["Hello, world!"]
+ return [b"Hello, world!"]
-def run_amock(app=hello_app, data="GET / HTTP/1.0\n\n"):
+def run_amock(app=hello_app, data=b"GET / HTTP/1.0\n\n"):
server = make_server("", 80, app, MockServer, MockHandler)
- inp, out, err, olderr = StringIO(data), StringIO(), StringIO(), sys.stderr
- sys.stderr = err
+ inp = BufferedReader(BytesIO(data))
+ out = BytesIO()
+ olderr = sys.stderr
+ err = sys.stderr = StringIO()
try:
- server.finish_request((inp,out), ("127.0.0.1",8888))
+ server.finish_request((inp, out), ("127.0.0.1",8888))
finally:
sys.stderr = olderr
return out.getvalue(), err.getvalue()
-
def compare_generic_iter(make_it,match):
"""Utility to compare a generic 2.1/2.2+ iterator with an iterable
@@ -87,26 +88,26 @@ def compare_generic_iter(make_it,match):
it = make_it()
if not iter(it) is it: raise AssertionError
for item in match:
- if not it.next()==item: raise AssertionError
+ if not next(it) == item: raise AssertionError
try:
- it.next()
+ next(it)
except StopIteration:
pass
else:
- raise AssertionError("Too many items from .next()",it)
+ raise AssertionError("Too many items from .__next__()", it)
class IntegrationTests(TestCase):
def check_hello(self, out, has_length=True):
self.assertEqual(out,
- "HTTP/1.0 200 OK\r\n"
- "Server: WSGIServer/0.1 Python/"+sys.version.split()[0]+"\r\n"
+ ("HTTP/1.0 200 OK\r\n"
+ "Server: WSGIServer/0.2 Python/"+sys.version.split()[0]+"\r\n"
"Content-Type: text/plain\r\n"
"Date: Mon, 05 Jun 2006 18:49:54 GMT\r\n" +
(has_length and "Content-Length: 13\r\n" or "") +
"\r\n"
- "Hello, world!"
+ "Hello, world!").encode("iso-8859-1")
)
def test_plain_hello(self):
@@ -124,14 +125,46 @@ class IntegrationTests(TestCase):
return ["Hello, world!"]
out, err = run_amock(validator(bad_app))
self.assertTrue(out.endswith(
- "A server error occurred. Please contact the administrator."
+ b"A server error occurred. Please contact the administrator."
))
self.assertEqual(
err.splitlines()[-2],
"AssertionError: Headers (('Content-Type', 'text/plain')) must"
- " be of type list: <type 'tuple'>"
+ " be of type list: <class 'tuple'>"
)
+ def test_wsgi_input(self):
+ def bad_app(e,s):
+ e["wsgi.input"].read()
+ s("200 OK", [("Content-Type", "text/plain; charset=utf-8")])
+ return [b"data"]
+ out, err = run_amock(validator(bad_app))
+ self.assertTrue(out.endswith(
+ b"A server error occurred. Please contact the administrator."
+ ))
+ self.assertEqual(
+ err.splitlines()[-2], "AssertionError"
+ )
+
+ def test_bytes_validation(self):
+ def app(e, s):
+ s("200 OK", [
+ ("Content-Type", "text/plain; charset=utf-8"),
+ ("Date", "Wed, 24 Dec 2008 13:29:32 GMT"),
+ ])
+ return [b"data"]
+ out, err = run_amock(validator(app))
+ self.assertTrue(err.endswith('"GET / HTTP/1.0" 200 4\n'))
+ ver = sys.version.split()[0].encode('ascii')
+ self.assertEqual(
+ b"HTTP/1.0 200 OK\r\n"
+ b"Server: WSGIServer/0.2 Python/" + ver + b"\r\n"
+ b"Content-Type: text/plain; charset=utf-8\r\n"
+ b"Date: Wed, 24 Dec 2008 13:29:32 GMT\r\n"
+ b"\r\n"
+ b"data",
+ out)
+
class UtilityTests(TestCase):
@@ -149,6 +182,8 @@ class UtilityTests(TestCase):
util.setup_testing_defaults(env)
if isinstance(value, StringIO):
self.assertIsInstance(env[key], StringIO)
+ elif isinstance(value,BytesIO):
+ self.assertIsInstance(env[key],BytesIO)
else:
self.assertEqual(env[key], value)
@@ -220,7 +255,7 @@ class UtilityTests(TestCase):
('wsgi.run_once', 0),
('wsgi.multithread', 0),
('wsgi.multiprocess', 0),
- ('wsgi.input', StringIO("")),
+ ('wsgi.input', BytesIO()),
('wsgi.errors', StringIO()),
('wsgi.url_scheme','http'),
]:
@@ -245,6 +280,7 @@ class UtilityTests(TestCase):
def testAppURIs(self):
self.checkAppURI("http://127.0.0.1/")
self.checkAppURI("http://127.0.0.1/spam", SCRIPT_NAME="/spam")
+ self.checkAppURI("http://127.0.0.1/sp%C3%A4m", SCRIPT_NAME="/späm")
self.checkAppURI("http://spam.example.com:2071/",
HTTP_HOST="spam.example.com:2071", SERVER_PORT="2071")
self.checkAppURI("http://spam.example.com/",
@@ -258,6 +294,7 @@ class UtilityTests(TestCase):
def testReqURIs(self):
self.checkReqURI("http://127.0.0.1/")
self.checkReqURI("http://127.0.0.1/spam", SCRIPT_NAME="/spam")
+ self.checkReqURI("http://127.0.0.1/sp%C3%A4m", SCRIPT_NAME="/späm")
self.checkReqURI("http://127.0.0.1/spammity/spam",
SCRIPT_NAME="/spammity", PATH_INFO="/spam")
self.checkReqURI("http://127.0.0.1/spammity/spam;ham",
@@ -302,7 +339,7 @@ class HeaderTests(TestCase):
del h['foo'] # should not raise an error
h['Foo'] = 'bar'
- for m in h.has_key, h.__contains__, h.get, h.get_all, h.__getitem__:
+ for m in h.__contains__, h.get, h.get_all, h.__getitem__:
self.assertTrue(m('foo'))
self.assertTrue(m('Foo'))
self.assertTrue(m('FOO'))
@@ -342,7 +379,6 @@ class HeaderTests(TestCase):
'\r\n'
)
-
class ErrorHandler(BaseCGIHandler):
"""Simple handler subclass for testing BaseHandler"""
@@ -354,7 +390,7 @@ class ErrorHandler(BaseCGIHandler):
def __init__(self,**kw):
setup_testing_defaults(kw)
BaseCGIHandler.__init__(
- self, StringIO(''), StringIO(), StringIO(), kw,
+ self, BytesIO(), BytesIO(), StringIO(), kw,
multithread=True, multiprocess=True
)
@@ -418,12 +454,16 @@ class HandlerTests(TestCase):
def trivial_app1(e,s):
s('200 OK',[])
- return [e['wsgi.url_scheme']]
+ return [e['wsgi.url_scheme'].encode('iso-8859-1')]
def trivial_app2(e,s):
- s('200 OK',[])(e['wsgi.url_scheme'])
+ s('200 OK',[])(e['wsgi.url_scheme'].encode('iso-8859-1'))
return []
+ def trivial_app3(e,s):
+ s('200 OK',[])
+ return ['\u0442\u0435\u0441\u0442'.encode("utf-8")]
+
def trivial_app4(e,s):
# Simulate a response to a HEAD request
s('200 OK',[('Content-Length', '12345')])
@@ -432,18 +472,25 @@ class HandlerTests(TestCase):
h = TestHandler()
h.run(trivial_app1)
self.assertEqual(h.stdout.getvalue(),
- "Status: 200 OK\r\n"
+ ("Status: 200 OK\r\n"
"Content-Length: 4\r\n"
"\r\n"
- "http")
+ "http").encode("iso-8859-1"))
h = TestHandler()
h.run(trivial_app2)
self.assertEqual(h.stdout.getvalue(),
- "Status: 200 OK\r\n"
+ ("Status: 200 OK\r\n"
"\r\n"
- "http")
+ "http").encode("iso-8859-1"))
+ h = TestHandler()
+ h.run(trivial_app3)
+ self.assertEqual(h.stdout.getvalue(),
+ b'Status: 200 OK\r\n'
+ b'Content-Length: 8\r\n'
+ b'\r\n'
+ b'\xd1\x82\xd0\xb5\xd1\x81\xd1\x82')
h = TestHandler()
h.run(trivial_app4)
@@ -464,23 +511,24 @@ class HandlerTests(TestCase):
h = ErrorHandler()
h.run(non_error_app)
self.assertEqual(h.stdout.getvalue(),
- "Status: 200 OK\r\n"
+ ("Status: 200 OK\r\n"
"Content-Length: 0\r\n"
- "\r\n")
+ "\r\n").encode("iso-8859-1"))
self.assertEqual(h.stderr.getvalue(),"")
h = ErrorHandler()
h.run(error_app)
self.assertEqual(h.stdout.getvalue(),
- "Status: %s\r\n"
+ ("Status: %s\r\n"
"Content-Type: text/plain\r\n"
"Content-Length: %d\r\n"
- "\r\n%s" % (h.error_status,len(h.error_body),h.error_body))
+ "\r\n" % (h.error_status,len(h.error_body))).encode('iso-8859-1')
+ + h.error_body)
- self.assertNotEqual(h.stderr.getvalue().find("AssertionError"), -1)
+ self.assertIn("AssertionError", h.stderr.getvalue())
def testErrorAfterOutput(self):
- MSG = "Some output has been sent"
+ MSG = b"Some output has been sent"
def error_app(e,s):
s("200 OK",[])(MSG)
raise AssertionError("This should be caught by handler")
@@ -488,9 +536,9 @@ class HandlerTests(TestCase):
h = ErrorHandler()
h.run(error_app)
self.assertEqual(h.stdout.getvalue(),
- "Status: 200 OK\r\n"
- "\r\n"+MSG)
- self.assertNotEqual(h.stderr.getvalue().find("AssertionError"), -1)
+ ("Status: 200 OK\r\n"
+ "\r\n".encode("iso-8859-1")+MSG))
+ self.assertIn("AssertionError", h.stderr.getvalue())
def testHeaderFormats(self):
@@ -505,7 +553,7 @@ class HandlerTests(TestCase):
)
shortpat = (
"Status: 200 OK\r\n" "Content-Length: 0\r\n" "\r\n"
- )
+ ).encode("iso-8859-1")
for ssw in "FooBar/1.0", None:
sw = ssw and "Server: %s\r\n" % ssw or ""
@@ -526,13 +574,31 @@ class HandlerTests(TestCase):
h.server_software = ssw
h.run(non_error_app)
if proto=="HTTP/0.9":
- self.assertEqual(h.stdout.getvalue(),"")
+ self.assertEqual(h.stdout.getvalue(),b"")
else:
self.assertTrue(
- re.match(stdpat%(version,sw), h.stdout.getvalue()),
- (stdpat%(version,sw), h.stdout.getvalue())
+ re.match((stdpat%(version,sw)).encode("iso-8859-1"),
+ h.stdout.getvalue()),
+ ((stdpat%(version,sw)).encode("iso-8859-1"),
+ h.stdout.getvalue())
)
+ def testBytesData(self):
+ def app(e, s):
+ s("200 OK", [
+ ("Content-Type", "text/plain; charset=utf-8"),
+ ])
+ return [b"data"]
+
+ h = TestHandler()
+ h.run(app)
+ self.assertEqual(b"Status: 200 OK\r\n"
+ b"Content-Type: text/plain; charset=utf-8\r\n"
+ b"Content-Length: 4\r\n"
+ b"\r\n"
+ b"data",
+ h.stdout.getvalue())
+
def testCloseOnError(self):
side_effects = {'close_called': False}
MSG = b"Some output has been sent"
@@ -543,7 +609,6 @@ class HandlerTests(TestCase):
while True:
yield b'blah'
raise AssertionError("This should be caught by handler")
-
def close(self):
side_effects['close_called'] = True
return CrashyIterable()
@@ -554,7 +619,7 @@ class HandlerTests(TestCase):
def test_main():
- test_support.run_unittest(__name__)
+ support.run_unittest(__name__)
if __name__ == "__main__":
test_main()