summaryrefslogtreecommitdiffstats
path: root/Lib/dos-8x3/cgihttps.py
blob: 837f7c2ea6bc78dce1199269819a29ed77db2de7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
"""CGI-savvy HTTP Server.

This module builds on SimpleHTTPServer by implementing GET and POST
requests to cgi-bin scripts.

"""


__version__ = "0.3"


import os
import sys
import time
import socket
import string
import urllib
import BaseHTTPServer
import SimpleHTTPServer


class CGIHTTPRequestHandler(SimpleHTTPServer.SimpleHTTPRequestHandler):

    """Complete HTTP server with GET, HEAD and POST commands.

    GET and HEAD also support running CGI scripts.

    The POST command is *only* implemented for CGI scripts.

    """

    def do_POST(self):
	"""Serve a POST request.

	This is only implemented for CGI scripts.

	"""

	if self.is_cgi():
	    self.run_cgi()
	else:
	    self.send_error(501, "Can only POST to CGI scripts")

    def send_head(self):
	"""Version of send_head that support CGI scripts"""
	if self.is_cgi():
	    return self.run_cgi()
	else:
	    return SimpleHTTPServer.SimpleHTTPRequestHandler.send_head(self)

    def is_cgi(self):
	"""test whether PATH corresponds to a CGI script.

	Return a tuple (dir, rest) if PATH requires running a
	CGI script, None if not.  Note that rest begins with a
	slash if it is not empty.

	The default implementation tests whether the path
	begins with one of the strings in the list
	self.cgi_directories (and the next character is a '/'
	or the end of the string).

	"""

	path = self.path

	for x in self.cgi_directories:
	    i = len(x)
	    if path[:i] == x and (not path[i:] or path[i] == '/'):
		self.cgi_info = path[:i], path[i+1:]
		return 1
	return 0

    cgi_directories = ['/cgi-bin', '/htbin']

    def run_cgi(self):
	"""Execute a CGI script."""
	dir, rest = self.cgi_info
	i = string.rfind(rest, '?')
	if i >= 0:
	    rest, query = rest[:i], rest[i+1:]
	else:
	    query = ''
	i = string.find(rest, '/')
	if i >= 0:
	    script, rest = rest[:i], rest[i:]
	else:
	    script, rest = rest, ''
	scriptname = dir + '/' + script
	scriptfile = self.translate_path(scriptname)
	if not os.path.exists(scriptfile):
	    self.send_error(404, "No such CGI script (%s)" % `scriptname`)
	    return
	if not os.path.isfile(scriptfile):
	    self.send_error(403, "CGI script is not a plain file (%s)" %
			    `scriptname`)
	    return
	if not executable(scriptfile):
	    self.send_error(403, "CGI script is not executable (%s)" %
			    `scriptname`)
	    return
	nobody = nobody_uid()
	self.send_response(200, "Script output follows")
	self.wfile.flush() # Always flush before forking
	pid = os.fork()
	if pid != 0:
	    # Parent
	    pid, sts = os.waitpid(pid, 0)
	    if sts:
		self.log_error("CGI script exit status x%x" % sts)
	    return
	# Child
	try:
	    # Reference: http://hoohoo.ncsa.uiuc.edu/cgi/env.html
	    # XXX Much of the following could be prepared ahead of time!
	    env = {}
	    env['SERVER_SOFTWARE'] = self.version_string()
	    env['SERVER_NAME'] = self.server.server_name
	    env['GATEWAY_INTERFACE'] = 'CGI/1.1'
	    env['SERVER_PROTOCOL'] = self.protocol_version
	    env['SERVER_PORT'] = str(self.server.server_port)
	    env['REQUEST_METHOD'] = self.command
	    uqrest = urllib.unquote(rest)
	    env['PATH_INFO'] = uqrest
	    env['PATH_TRANSLATED'] = self.translate_path(uqrest)
	    env['SCRIPT_NAME'] = scriptname
	    if query:
		env['QUERY_STRING'] = query
	    host = self.address_string()
	    if host != self.client_address[0]:
		env['REMOTE_HOST'] = host
	    env['REMOTE_ADDR'] = self.client_address[0]
	    # AUTH_TYPE
	    # REMOTE_USER
	    # REMOTE_IDENT
	    env['CONTENT_TYPE'] = self.headers.type
	    length = self.headers.getheader('content-length')
	    if length:
		env['CONTENT_LENGTH'] = length
	    accept = []
	    for line in self.headers.getallmatchingheaders('accept'):
		if line[:1] in string.whitespace:
		    accept.append(string.strip(line))
		else:
		    accept = accept + string.split(line[7:])
	    env['HTTP_ACCEPT'] = string.joinfields(accept, ',')
	    ua = self.headers.getheader('user-agent')
	    if ua:
		env['HTTP_USER_AGENT'] = ua
	    # XXX Other HTTP_* headers
	    import regsub
	    decoded_query = regsub.gsub('+', ' ', query)
	    try:
		os.setuid(nobody)
	    except os.error:
		pass
	    os.dup2(self.rfile.fileno(), 0)
	    os.dup2(self.wfile.fileno(), 1)
	    print scriptfile, script, decoded_query
	    os.execve(scriptfile,
		      [script, decoded_query],
		      env)
	except:
	    self.server.handle_error(self.request, self.client_address)
	    os._exit(127)


nobody = None

def nobody_uid():
    """Internal routine to get nobody's uid"""
    global nobody
    if nobody:
	return nobody
    import pwd
    try:
	nobody = pwd.getpwnam('nobody')[2]
    except pwd.error:
	nobody = 1 + max(map(lambda x: x[2], pwd.getpwall()))
    return nobody


def executable(path):
    """Test for executable file."""
    try:
	st = os.stat(path)
    except os.error:
	return 0
    return st[0] & 0111 != 0


def test(HandlerClass = CGIHTTPRequestHandler,
	 ServerClass = BaseHTTPServer.HTTPServer):
    import sys
    if sys.argv[1:2] == ['-r']:
	db = MyArchive()
	db.regenindices()
	return
    SimpleHTTPServer.test(HandlerClass, ServerClass)


if __name__ == '__main__':
    test()