summaryrefslogtreecommitdiffstats
path: root/Lib/wsgiref
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/wsgiref')
-rw-r--r--Lib/wsgiref/handlers.py49
-rw-r--r--Lib/wsgiref/headers.py22
-rw-r--r--Lib/wsgiref/validate.py42
3 files changed, 55 insertions, 58 deletions
diff --git a/Lib/wsgiref/handlers.py b/Lib/wsgiref/handlers.py
index a87c32c..3e11219 100644
--- a/Lib/wsgiref/handlers.py
+++ b/Lib/wsgiref/handlers.py
@@ -46,7 +46,7 @@ class BaseHandler:
traceback_limit = None # Print entire traceback to self.get_stderr()
error_status = "500 Internal Server Error"
error_headers = [('Content-Type','text/plain')]
- error_body = "A server error occurred. Please contact the administrator."
+ error_body = b"A server error occurred. Please contact the administrator."
# State variables (don't mess with these)
status = result = None
@@ -137,7 +137,7 @@ class BaseHandler:
self.set_content_length()
def start_response(self, status, headers,exc_info=None):
- """'start_response()' callable as specified by PEP 333"""
+ """'start_response()' callable as specified by PEP 3333"""
if exc_info:
try:
@@ -149,49 +149,48 @@ class BaseHandler:
elif self.headers is not None:
raise AssertionError("Headers already set!")
+ self.status = status
+ self.headers = self.headers_class(headers)
status = self._convert_string_type(status, "Status")
assert len(status)>=4,"Status must be at least 4 characters"
assert int(status[:3]),"Status message must begin w/3-digit code"
assert status[3]==" ", "Status message must have a space after code"
- str_headers = []
- for name,val in headers:
- name = self._convert_string_type(name, "Header name")
- val = self._convert_string_type(val, "Header value")
- str_headers.append((name, val))
- assert not is_hop_by_hop(name),"Hop-by-hop headers not allowed"
+ if __debug__:
+ for name, val in headers:
+ name = self._convert_string_type(name, "Header name")
+ val = self._convert_string_type(val, "Header value")
+ assert not is_hop_by_hop(name),"Hop-by-hop headers not allowed"
- self.status = status
- self.headers = self.headers_class(str_headers)
return self.write
def _convert_string_type(self, value, title):
"""Convert/check value type."""
- if isinstance(value, str):
+ if type(value) is str:
return value
- assert isinstance(value, bytes), \
- "{0} must be a string or bytes object (not {1})".format(title, value)
- return str(value, "iso-8859-1")
+ raise AssertionError(
+ "{0} must be of type str (got {1})".format(title, repr(value))
+ )
def send_preamble(self):
"""Transmit version/status/date/server, via self._write()"""
if self.origin_server:
if self.client_is_modern():
- self._write('HTTP/%s %s\r\n' % (self.http_version,self.status))
+ self._write(('HTTP/%s %s\r\n' % (self.http_version,self.status)).encode('iso-8859-1'))
if 'Date' not in self.headers:
self._write(
- 'Date: %s\r\n' % format_date_time(time.time())
+ ('Date: %s\r\n' % format_date_time(time.time())).encode('iso-8859-1')
)
if self.server_software and 'Server' not in self.headers:
- self._write('Server: %s\r\n' % self.server_software)
+ self._write(('Server: %s\r\n' % self.server_software).encode('iso-8859-1'))
else:
- self._write('Status: %s\r\n' % self.status)
+ self._write(('Status: %s\r\n' % self.status).encode('iso-8859-1'))
def write(self, data):
- """'write()' callable as specified by PEP 333"""
+ """'write()' callable as specified by PEP 3333"""
- assert isinstance(data, (str, bytes)), \
- "write() argument must be a string or bytes"
+ assert type(data) is bytes, \
+ "write() argument must be a bytes instance"
if not self.status:
raise AssertionError("write() before start_response()")
@@ -256,7 +255,7 @@ class BaseHandler:
self.headers_sent = True
if not self.origin_server or self.client_is_modern():
self.send_preamble()
- self._write(str(self.headers))
+ self._write(bytes(self.headers))
def result_is_file(self):
@@ -376,12 +375,6 @@ class SimpleHandler(BaseHandler):
self.environ.update(self.base_env)
def _write(self,data):
- if isinstance(data, str):
- try:
- data = data.encode("iso-8859-1")
- except UnicodeEncodeError:
- raise ValueError("Unicode data must contain only code points"
- " representable in ISO-8859-1 encoding")
self.stdout.write(data)
def _flush(self):
diff --git a/Lib/wsgiref/headers.py b/Lib/wsgiref/headers.py
index cc01f5f..d939628 100644
--- a/Lib/wsgiref/headers.py
+++ b/Lib/wsgiref/headers.py
@@ -30,21 +30,20 @@ class Headers:
"""Manage a collection of HTTP response headers"""
def __init__(self,headers):
- if not isinstance(headers, list):
+ if type(headers) is not list:
raise TypeError("Headers must be a list of name/value tuples")
- self._headers = []
- for k, v in headers:
- k = self._convert_string_type(k)
- v = self._convert_string_type(v)
- self._headers.append((k, v))
+ self._headers = headers
+ if __debug__:
+ for k, v in headers:
+ self._convert_string_type(k)
+ self._convert_string_type(v)
def _convert_string_type(self, value):
"""Convert/check value type."""
- if isinstance(value, str):
+ if type(value) is str:
return value
- assert isinstance(value, bytes), ("Header names/values must be"
- " a string or bytes object (not {0})".format(value))
- return str(value, "iso-8859-1")
+ raise AssertionError("Header names/values must be"
+ " of type str (got {0})".format(repr(value)))
def __len__(self):
"""Return the total number of headers, including duplicates."""
@@ -139,6 +138,9 @@ class Headers:
suitable for direct HTTP transmission."""
return '\r\n'.join(["%s: %s" % kv for kv in self._headers]+['',''])
+ def __bytes__(self):
+ return str(self).encode('iso-8859-1')
+
def setdefault(self,name,value):
"""Return first matching header value for 'name', or 'value'
diff --git a/Lib/wsgiref/validate.py b/Lib/wsgiref/validate.py
index 2df3f9f..05a485d 100644
--- a/Lib/wsgiref/validate.py
+++ b/Lib/wsgiref/validate.py
@@ -128,11 +128,10 @@ def assert_(cond, *args):
raise AssertionError(*args)
def check_string_type(value, title):
- if isinstance(value, str):
+ if type (value) is str:
return value
- assert isinstance(value, bytes), \
- "{0} must be a string or bytes object (not {1})".format(title, value)
- return str(value, "iso-8859-1")
+ raise AssertionError(
+ "{0} must be of type str (got {1})".format(title, repr(value)))
def validator(application):
@@ -197,20 +196,21 @@ class InputWrapper:
def read(self, *args):
assert_(len(args) == 1)
v = self.input.read(*args)
- assert_(isinstance(v, bytes))
+ assert_(type(v) is bytes)
return v
- def readline(self):
- v = self.input.readline()
- assert_(isinstance(v, bytes))
+ def readline(self, *args):
+ assert_(len(args) <= 1)
+ v = self.input.readline(*args)
+ assert_(type(v) is bytes)
return v
def readlines(self, *args):
assert_(len(args) <= 1)
lines = self.input.readlines(*args)
- assert_(isinstance(lines, list))
+ assert_(type(lines) is list)
for line in lines:
- assert_(isinstance(line, bytes))
+ assert_(type(line) is bytes)
return lines
def __iter__(self):
@@ -229,7 +229,7 @@ class ErrorWrapper:
self.errors = wsgi_errors
def write(self, s):
- assert_(isinstance(s, str))
+ assert_(type(s) is str)
self.errors.write(s)
def flush(self):
@@ -248,7 +248,7 @@ class WriteWrapper:
self.writer = wsgi_writer
def __call__(self, s):
- assert_(isinstance(s, (str, bytes)))
+ assert_(type(s) is bytes)
self.writer(s)
class PartialIteratorWrapper:
@@ -275,6 +275,8 @@ class IteratorWrapper:
assert_(not self.closed,
"Iterator read after closed")
v = next(self.iterator)
+ if type(v) is not bytes:
+ assert_(False, "Iterator yielded non-bytestring (%r)" % (v,))
if self.check_start_response is not None:
assert_(self.check_start_response,
"The application returns and we started iterating over its body, but start_response has not yet been called")
@@ -294,7 +296,7 @@ class IteratorWrapper:
"Iterator garbage collected without being closed")
def check_environ(environ):
- assert_(isinstance(environ, dict),
+ assert_(type(environ) is dict,
"Environment is not of the right type: %r (environment: %r)"
% (type(environ), environ))
@@ -321,11 +323,11 @@ def check_environ(environ):
if '.' in key:
# Extension, we don't care about its type
continue
- assert_(isinstance(environ[key], str),
+ assert_(type(environ[key]) is str,
"Environmental variable %s is not a string: %r (value: %r)"
% (key, type(environ[key]), environ[key]))
- assert_(isinstance(environ['wsgi.version'], tuple),
+ assert_(type(environ['wsgi.version']) is tuple,
"wsgi.version should be a tuple (%r)" % (environ['wsgi.version'],))
assert_(environ['wsgi.url_scheme'] in ('http', 'https'),
"wsgi.url_scheme unknown: %r" % environ['wsgi.url_scheme'])
@@ -385,12 +387,12 @@ def check_status(status):
% status, WSGIWarning)
def check_headers(headers):
- assert_(isinstance(headers, list),
+ assert_(type(headers) is list,
"Headers (%r) must be of type list: %r"
% (headers, type(headers)))
header_names = {}
for item in headers:
- assert_(isinstance(item, tuple),
+ assert_(type(item) is tuple,
"Individual headers (%r) must be of type tuple: %r"
% (item, type(item)))
assert_(len(item) == 2)
@@ -428,14 +430,14 @@ def check_content_type(status, headers):
assert_(0, "No Content-Type header found in headers (%s)" % headers)
def check_exc_info(exc_info):
- assert_(exc_info is None or isinstance(exc_info, tuple),
+ assert_(exc_info is None or type(exc_info) is tuple,
"exc_info (%r) is not a tuple: %r" % (exc_info, type(exc_info)))
# More exc_info checks?
def check_iterator(iterator):
- # Technically a string is legal, which is why it's a really bad
+ # Technically a bytestring is legal, which is why it's a really bad
# idea, because it may cause the response to be returned
# character-by-character
assert_(not isinstance(iterator, (str, bytes)),
"You should not return a string as your application iterator, "
- "instead return a single-item list containing that string.")
+ "instead return a single-item list containing a bytestring.")