"""CGI-savvy HTTP Server. This module builds on SimpleHTTPServer by implementing GET and POST requests to cgi-bin scripts. If the os.fork() function is not present, this module will not work; SystemError will be raised instead. """ __version__ = "0.3" import os import string import urllib import BaseHTTPServer import SimpleHTTPServer try: os.fork except NameError: raise SystemError, __name__ + " requires os.fork()" class CGIHTTPRequestHandler(SimpleHTTPServer.SimpleHTTPRequestHandler): """Complete HTTP server with GET, HEAD and POST commands. GET and HEAD also support running CGI scripts. The POST command is *only* implemented for CGI scripts. """ def do_POST(self): """Serve a POST request. This is only implemented for CGI scripts. """ if self.is_cgi(): self.run_cgi() else: self.send_error(501, "Can only POST to CGI scripts") def send_head(self): """Version of send_head that support CGI scripts""" if self.is_cgi(): return self.run_cgi() else: return SimpleHTTPServer.SimpleHTTPRequestHandler.send_head(self) def is_cgi(self): """test whether PATH corresponds to a CGI script. Return a tuple (dir, rest) if PATH requires running a CGI script, None if not. Note that rest begins with a slash if it is not empty. The default implementation tests whether the path begins with one of the strings in the list self.cgi_directories (and the next character is a '/' or the end of the string). """ path = self.path for x in self.cgi_directories: i = len(x) if path[:i] == x and (not path[i:] or path[i] == '/'): self.cgi_info = path[:i], path[i+1:] return 1 return 0 cgi_directories = ['/cgi-bin', '/htbin'] def run_cgi(self): """Execute a CGI script.""" dir, rest = self.cgi_info i = string.rfind(rest, '?') if i >= 0: rest, query = rest[:i], rest[i+1:] else: query = '' i = string.find(rest, '/') if i >= 0: script, rest = rest[:i], rest[i:] else: script, rest = rest, '' scriptname = dir + '/' + script scriptfile = self.translate_path(scriptname) if not os.path.exists(scriptfile): self.send_error(404, "No such CGI script (%s)" % `scriptname`) return if not os.path.isfile(scriptfile): self.send_error(403, "CGI script is not a plain file (%s)" % `scriptname`) return if not executable(scriptfile): self.send_error(403, "CGI script is not executable (%s)" % `scriptname`) return nobody = nobody_uid() self.send_response(200, "Script output follows") self.wfile.flush() # Always flush before forking pid = os.fork() if pid != 0: # Parent pid, sts = os.waitpid(pid, 0) if sts: self.log_error("CGI script exit status x%x" % sts) return # Child try: # Reference: http://hoohoo.ncsa.uiuc.edu/cgi/env.html # XXX Much of the following could be prepared ahead of time! env = {} env['SERVER_SOFTWARE'] = self.version_string() env['SERVER_NAME'] = self.server.server_name env['GATEWAY_INTERFACE'] = 'CGI/1.1' env['SERVER_PROTOCOL'] = self.protocol_version env['SERVER_PORT'] = str(self.server.server_port) env['REQUEST_METHOD'] = self.command uqrest = urllib.unquote(rest) env['PATH_INFO'] = uqrest env['PATH_TRANSLATED'] = self.translate_path(uqrest) env['SCRIPT_NAME'] = scriptname if query: env['QUERY_STRING'] = query host = self.address_string() if host != self.client_address[0]: env['REMOTE_HOST'] = host env['REMOTE_ADDR'] = self.client_address[0] # AUTH_TYPE # REMOTE_USER # REMOTE_IDENT if self.headers.typeheader is None: env['CONTENT_TYPE'] = self.headers.type else: env['CONTENT_TYPE'] = self.headers.typeheader length = self.headers.getheader('content-length') if length: env['CONTENT_LENGTH'] = length accept = [] for line in self.headers.getallmatchingheaders('accept'): if line[:1] in string.whitespace: accept.append(string.strip(line)) else: accept = accept + string.split(line[7:], ',') env['HTTP_ACCEPT'] = string.joinfields(accept, ',') ua = self.headers.getheader('user-agent') if ua: env['HTTP_USER_AGENT'] = ua co = filter(None, self.headers.getheaders('cookie')) if co: env['HTTP_COOKIE'] = string.join(co, ', ') # XXX Other HTTP_* headers decoded_query = string.replace(query, '+', ' ') try: os.setuid(nobody) except os.error: pass os.dup2(self.rfile.fileno(), 0) os.dup2(self.wfile.fileno(), 1) print scriptfile, script, decoded_query os.execve(scriptfile, [script, decoded_query], env) except: self.server.handle_error(self.request, self.client_address) os._exit(127) nobody = None def nobody_uid(): """Internal routine to get nobody's uid""" global nobody if nobody: return nobody import pwd try: nobody = pwd.getpwnam('nobody')[2] except KeyError: nobody = 1 + max(map(lambda x: x[2], pwd.getpwall())) return nobody def executable(path): """Test for executable file.""" try: st = os.stat(path) except os.error: return 0 return st[0] & 0111 != 0 def test(HandlerClass = CGIHTTPRequestHandler, ServerClass = BaseHTTPServer.HTTPServer): SimpleHTTPServer.test(HandlerClass, ServerClass) if __name__ == '__main__': test()