diff options
author | Alexandre Vassalotti <alexandre@peadrop.com> | 2008-05-12 02:31:37 (GMT) |
---|---|---|
committer | Alexandre Vassalotti <alexandre@peadrop.com> | 2008-05-12 02:31:37 (GMT) |
commit | ce261952e640b2d09ada1b160f0cfa37db32f928 (patch) | |
tree | 17b1c2129377435cbaf0169f702b78a61407b90f /Lib/socketserver.py | |
parent | 6f1e619b414a93084efbb7244034a3546899fa4b (diff) | |
download | cpython-ce261952e640b2d09ada1b160f0cfa37db32f928.zip cpython-ce261952e640b2d09ada1b160f0cfa37db32f928.tar.gz cpython-ce261952e640b2d09ada1b160f0cfa37db32f928.tar.bz2 |
Renamed the SocketServer module to 'socketserver'.
Merged revisions 63132 via svnmerge from
svn+ssh://pythondev@svn.python.org/python/trunk
........
r63132 | alexandre.vassalotti | 2008-05-11 22:11:22 -0400 (Sun, 11 May 2008) | 4 lines
Updated all import statements to use the new socketserver module name.
Renamed socketserver module in its own documentation.
Renamed documentation references.
........
Diffstat (limited to 'Lib/socketserver.py')
-rw-r--r-- | Lib/socketserver.py | 675 |
1 files changed, 675 insertions, 0 deletions
diff --git a/Lib/socketserver.py b/Lib/socketserver.py new file mode 100644 index 0000000..2a9ec8a --- /dev/null +++ b/Lib/socketserver.py @@ -0,0 +1,675 @@ +"""Generic socket server classes. + +This module tries to capture the various aspects of defining a server: + +For socket-based servers: + +- address family: + - AF_INET{,6}: IP (Internet Protocol) sockets (default) + - AF_UNIX: Unix domain sockets + - others, e.g. AF_DECNET are conceivable (see <socket.h> +- socket type: + - SOCK_STREAM (reliable stream, e.g. TCP) + - SOCK_DGRAM (datagrams, e.g. UDP) + +For request-based servers (including socket-based): + +- client address verification before further looking at the request + (This is actually a hook for any processing that needs to look + at the request before anything else, e.g. logging) +- how to handle multiple requests: + - synchronous (one request is handled at a time) + - forking (each request is handled by a new process) + - threading (each request is handled by a new thread) + +The classes in this module favor the server type that is simplest to +write: a synchronous TCP/IP server. This is bad class design, but +save some typing. (There's also the issue that a deep class hierarchy +slows down method lookups.) + +There are five classes in an inheritance diagram, four of which represent +synchronous servers of four types: + + +------------+ + | BaseServer | + +------------+ + | + v + +-----------+ +------------------+ + | TCPServer |------->| UnixStreamServer | + +-----------+ +------------------+ + | + v + +-----------+ +--------------------+ + | UDPServer |------->| UnixDatagramServer | + +-----------+ +--------------------+ + +Note that UnixDatagramServer derives from UDPServer, not from +UnixStreamServer -- the only difference between an IP and a Unix +stream server is the address family, which is simply repeated in both +unix server classes. + +Forking and threading versions of each type of server can be created +using the ForkingMixIn and ThreadingMixIn mix-in classes. For +instance, a threading UDP server class is created as follows: + + class ThreadingUDPServer(ThreadingMixIn, UDPServer): pass + +The Mix-in class must come first, since it overrides a method defined +in UDPServer! Setting the various member variables also changes +the behavior of the underlying server mechanism. + +To implement a service, you must derive a class from +BaseRequestHandler and redefine its handle() method. You can then run +various versions of the service by combining one of the server classes +with your request handler class. + +The request handler class must be different for datagram or stream +services. This can be hidden by using the request handler +subclasses StreamRequestHandler or DatagramRequestHandler. + +Of course, you still have to use your head! + +For instance, it makes no sense to use a forking server if the service +contains state in memory that can be modified by requests (since the +modifications in the child process would never reach the initial state +kept in the parent process and passed to each child). In this case, +you can use a threading server, but you will probably have to use +locks to avoid two requests that come in nearly simultaneous to apply +conflicting changes to the server state. + +On the other hand, if you are building e.g. an HTTP server, where all +data is stored externally (e.g. in the file system), a synchronous +class will essentially render the service "deaf" while one request is +being handled -- which may be for a very long time if a client is slow +to reqd all the data it has requested. Here a threading or forking +server is appropriate. + +In some cases, it may be appropriate to process part of a request +synchronously, but to finish processing in a forked child depending on +the request data. This can be implemented by using a synchronous +server and doing an explicit fork in the request handler class +handle() method. + +Another approach to handling multiple simultaneous requests in an +environment that supports neither threads nor fork (or where these are +too expensive or inappropriate for the service) is to maintain an +explicit table of partially finished requests and to use select() to +decide which request to work on next (or whether to handle a new +incoming request). This is particularly important for stream services +where each client can potentially be connected for a long time (if +threads or subprocesses cannot be used). + +Future work: +- Standard classes for Sun RPC (which uses either UDP or TCP) +- Standard mix-in classes to implement various authentication + and encryption schemes +- Standard framework for select-based multiplexing + +XXX Open problems: +- What to do with out-of-band data? + +BaseServer: +- split generic "request" functionality out into BaseServer class. + Copyright (C) 2000 Luke Kenneth Casson Leighton <lkcl@samba.org> + + example: read entries from a SQL database (requires overriding + get_request() to return a table entry from the database). + entry is processed by a RequestHandlerClass. + +""" + +# Author of the BaseServer patch: Luke Kenneth Casson Leighton + +# XXX Warning! +# There is a test suite for this module, but it cannot be run by the +# standard regression test. +# To run it manually, run Lib/test/test_socketserver.py. + +__version__ = "0.4" + + +import socket +import select +import sys +import os +try: + import threading +except ImportError: + import dummy_threading as threading + +__all__ = ["TCPServer","UDPServer","ForkingUDPServer","ForkingTCPServer", + "ThreadingUDPServer","ThreadingTCPServer","BaseRequestHandler", + "StreamRequestHandler","DatagramRequestHandler", + "ThreadingMixIn", "ForkingMixIn"] +if hasattr(socket, "AF_UNIX"): + __all__.extend(["UnixStreamServer","UnixDatagramServer", + "ThreadingUnixStreamServer", + "ThreadingUnixDatagramServer"]) + +class BaseServer: + + """Base class for server classes. + + Methods for the caller: + + - __init__(server_address, RequestHandlerClass) + - serve_forever(poll_interval=0.5) + - shutdown() + - handle_request() # if you do not use serve_forever() + - fileno() -> int # for select() + + Methods that may be overridden: + + - server_bind() + - server_activate() + - get_request() -> request, client_address + - handle_timeout() + - verify_request(request, client_address) + - server_close() + - process_request(request, client_address) + - close_request(request) + - handle_error() + + Methods for derived classes: + + - finish_request(request, client_address) + + Class variables that may be overridden by derived classes or + instances: + + - timeout + - address_family + - socket_type + - allow_reuse_address + + Instance variables: + + - RequestHandlerClass + - socket + + """ + + timeout = None + + def __init__(self, server_address, RequestHandlerClass): + """Constructor. May be extended, do not override.""" + self.server_address = server_address + self.RequestHandlerClass = RequestHandlerClass + self.__is_shut_down = threading.Event() + self.__serving = False + + def server_activate(self): + """Called by constructor to activate the server. + + May be overridden. + + """ + pass + + def serve_forever(self, poll_interval=0.5): + """Handle one request at a time until shutdown. + + Polls for shutdown every poll_interval seconds. Ignores + self.timeout. If you need to do periodic tasks, do them in + another thread. + """ + self.__serving = True + self.__is_shut_down.clear() + while self.__serving: + # XXX: Consider using another file descriptor or + # connecting to the socket to wake this up instead of + # polling. Polling reduces our responsiveness to a + # shutdown request and wastes cpu at all other times. + r, w, e = select.select([self], [], [], poll_interval) + if r: + self._handle_request_noblock() + self.__is_shut_down.set() + + def shutdown(self): + """Stops the serve_forever loop. + + Blocks until the loop has finished. This must be called while + serve_forever() is running in another thread, or it will + deadlock. + """ + self.__serving = False + self.__is_shut_down.wait() + + # The distinction between handling, getting, processing and + # finishing a request is fairly arbitrary. Remember: + # + # - handle_request() is the top-level call. It calls + # select, get_request(), verify_request() and process_request() + # - get_request() is different for stream or datagram sockets + # - process_request() is the place that may fork a new process + # or create a new thread to finish the request + # - finish_request() instantiates the request handler class; + # this constructor will handle the request all by itself + + def handle_request(self): + """Handle one request, possibly blocking. + + Respects self.timeout. + """ + # Support people who used socket.settimeout() to escape + # handle_request before self.timeout was available. + timeout = self.socket.gettimeout() + if timeout is None: + timeout = self.timeout + elif self.timeout is not None: + timeout = min(timeout, self.timeout) + fd_sets = select.select([self], [], [], timeout) + if not fd_sets[0]: + self.handle_timeout() + return + self._handle_request_noblock() + + def _handle_request_noblock(self): + """Handle one request, without blocking. + + I assume that select.select has returned that the socket is + readable before this function was called, so there should be + no risk of blocking in get_request(). + """ + try: + request, client_address = self.get_request() + except socket.error: + return + if self.verify_request(request, client_address): + try: + self.process_request(request, client_address) + except: + self.handle_error(request, client_address) + self.close_request(request) + + def handle_timeout(self): + """Called if no new request arrives within self.timeout. + + Overridden by ForkingMixIn. + """ + pass + + def verify_request(self, request, client_address): + """Verify the request. May be overridden. + + Return True if we should proceed with this request. + + """ + return True + + def process_request(self, request, client_address): + """Call finish_request. + + Overridden by ForkingMixIn and ThreadingMixIn. + + """ + self.finish_request(request, client_address) + self.close_request(request) + + def server_close(self): + """Called to clean-up the server. + + May be overridden. + + """ + pass + + def finish_request(self, request, client_address): + """Finish one request by instantiating RequestHandlerClass.""" + self.RequestHandlerClass(request, client_address, self) + + def close_request(self, request): + """Called to clean up an individual request.""" + pass + + def handle_error(self, request, client_address): + """Handle an error gracefully. May be overridden. + + The default is to print a traceback and continue. + + """ + print('-'*40) + print('Exception happened during processing of request from', end=' ') + print(client_address) + import traceback + traceback.print_exc() # XXX But this goes to stderr! + print('-'*40) + + +class TCPServer(BaseServer): + + """Base class for various socket-based server classes. + + Defaults to synchronous IP stream (i.e., TCP). + + Methods for the caller: + + - __init__(server_address, RequestHandlerClass, bind_and_activate=True) + - serve_forever(poll_interval=0.5) + - shutdown() + - handle_request() # if you don't use serve_forever() + - fileno() -> int # for select() + + Methods that may be overridden: + + - server_bind() + - server_activate() + - get_request() -> request, client_address + - handle_timeout() + - verify_request(request, client_address) + - process_request(request, client_address) + - close_request(request) + - handle_error() + + Methods for derived classes: + + - finish_request(request, client_address) + + Class variables that may be overridden by derived classes or + instances: + + - timeout + - address_family + - socket_type + - request_queue_size (only for stream sockets) + - allow_reuse_address + + Instance variables: + + - server_address + - RequestHandlerClass + - socket + + """ + + address_family = socket.AF_INET + + socket_type = socket.SOCK_STREAM + + request_queue_size = 5 + + allow_reuse_address = False + + def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True): + """Constructor. May be extended, do not override.""" + BaseServer.__init__(self, server_address, RequestHandlerClass) + self.socket = socket.socket(self.address_family, + self.socket_type) + if bind_and_activate: + self.server_bind() + self.server_activate() + + def server_bind(self): + """Called by constructor to bind the socket. + + May be overridden. + + """ + if self.allow_reuse_address: + self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.socket.bind(self.server_address) + self.server_address = self.socket.getsockname() + + def server_activate(self): + """Called by constructor to activate the server. + + May be overridden. + + """ + self.socket.listen(self.request_queue_size) + + def server_close(self): + """Called to clean-up the server. + + May be overridden. + + """ + self.socket.close() + + def fileno(self): + """Return socket file number. + + Interface required by select(). + + """ + return self.socket.fileno() + + def get_request(self): + """Get the request and client address from the socket. + + May be overridden. + + """ + return self.socket.accept() + + def close_request(self, request): + """Called to clean up an individual request.""" + request.close() + + +class UDPServer(TCPServer): + + """UDP server class.""" + + allow_reuse_address = False + + socket_type = socket.SOCK_DGRAM + + max_packet_size = 8192 + + def get_request(self): + data, client_addr = self.socket.recvfrom(self.max_packet_size) + return (data, self.socket), client_addr + + def server_activate(self): + # No need to call listen() for UDP. + pass + + def close_request(self, request): + # No need to close anything. + pass + +class ForkingMixIn: + + """Mix-in class to handle each request in a new process.""" + + timeout = 300 + active_children = None + max_children = 40 + + def collect_children(self): + """Internal routine to wait for children that have exited.""" + if self.active_children is None: return + while len(self.active_children) >= self.max_children: + # XXX: This will wait for any child process, not just ones + # spawned by this library. This could confuse other + # libraries that expect to be able to wait for their own + # children. + try: + pid, status = os.waitpid(0, options=0) + except os.error: + pid = None + if pid not in self.active_children: continue + self.active_children.remove(pid) + + # XXX: This loop runs more system calls than it ought + # to. There should be a way to put the active_children into a + # process group and then use os.waitpid(-pgid) to wait for any + # of that set, but I couldn't find a way to allocate pgids + # that couldn't collide. + for child in self.active_children: + try: + pid, status = os.waitpid(child, os.WNOHANG) + except os.error: + pid = None + if not pid: continue + try: + self.active_children.remove(pid) + except ValueError as e: + raise ValueError('%s. x=%d and list=%r' % (e.message, pid, + self.active_children)) + + def handle_timeout(self): + """Wait for zombies after self.timeout seconds of inactivity. + + May be extended, do not override. + """ + self.collect_children() + + def process_request(self, request, client_address): + """Fork a new subprocess to process the request.""" + self.collect_children() + pid = os.fork() + if pid: + # Parent process + if self.active_children is None: + self.active_children = [] + self.active_children.append(pid) + self.close_request(request) + return + else: + # Child process. + # This must never return, hence os._exit()! + try: + self.finish_request(request, client_address) + os._exit(0) + except: + try: + self.handle_error(request, client_address) + finally: + os._exit(1) + + +class ThreadingMixIn: + """Mix-in class to handle each request in a new thread.""" + + # Decides how threads will act upon termination of the + # main process + daemon_threads = False + + def process_request_thread(self, request, client_address): + """Same as in BaseServer but as a thread. + + In addition, exception handling is done here. + + """ + try: + self.finish_request(request, client_address) + self.close_request(request) + except: + self.handle_error(request, client_address) + self.close_request(request) + + def process_request(self, request, client_address): + """Start a new thread to process the request.""" + t = threading.Thread(target = self.process_request_thread, + args = (request, client_address)) + if self.daemon_threads: + t.setDaemon (1) + t.start() + + +class ForkingUDPServer(ForkingMixIn, UDPServer): pass +class ForkingTCPServer(ForkingMixIn, TCPServer): pass + +class ThreadingUDPServer(ThreadingMixIn, UDPServer): pass +class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass + +if hasattr(socket, 'AF_UNIX'): + + class UnixStreamServer(TCPServer): + address_family = socket.AF_UNIX + + class UnixDatagramServer(UDPServer): + address_family = socket.AF_UNIX + + class ThreadingUnixStreamServer(ThreadingMixIn, UnixStreamServer): pass + + class ThreadingUnixDatagramServer(ThreadingMixIn, UnixDatagramServer): pass + +class BaseRequestHandler: + + """Base class for request handler classes. + + This class is instantiated for each request to be handled. The + constructor sets the instance variables request, client_address + and server, and then calls the handle() method. To implement a + specific service, all you need to do is to derive a class which + defines a handle() method. + + The handle() method can find the request as self.request, the + client address as self.client_address, and the server (in case it + needs access to per-server information) as self.server. Since a + separate instance is created for each request, the handle() method + can define arbitrary other instance variariables. + + """ + + def __init__(self, request, client_address, server): + self.request = request + self.client_address = client_address + self.server = server + self.setup() + self.handle() + self.finish() + + def setup(self): + pass + + def handle(self): + pass + + def finish(self): + pass + + +# The following two classes make it possible to use the same service +# class for stream or datagram servers. +# Each class sets up these instance variables: +# - rfile: a file object from which receives the request is read +# - wfile: a file object to which the reply is written +# When the handle() method returns, wfile is flushed properly + + +class StreamRequestHandler(BaseRequestHandler): + + """Define self.rfile and self.wfile for stream sockets.""" + + # Default buffer sizes for rfile, wfile. + # We default rfile to buffered because otherwise it could be + # really slow for large data (a getc() call per byte); we make + # wfile unbuffered because (a) often after a write() we want to + # read and we need to flush the line; (b) big writes to unbuffered + # files are typically optimized by stdio even when big reads + # aren't. + rbufsize = -1 + wbufsize = 0 + + def setup(self): + self.connection = self.request + self.rfile = self.connection.makefile('rb', self.rbufsize) + self.wfile = self.connection.makefile('wb', self.wbufsize) + + def finish(self): + if not self.wfile.closed: + self.wfile.flush() + self.wfile.close() + self.rfile.close() + + +class DatagramRequestHandler(BaseRequestHandler): + + # XXX Regrettably, I cannot get this working on Linux; + # s.recvfrom() doesn't return a meaningful client address. + + """Define self.rfile and self.wfile for datagram sockets.""" + + def setup(self): + from io import BytesIO + self.packet, self.socket = self.request + self.rfile = BytesIO(self.packet) + self.wfile = BytesIO() + + def finish(self): + self.socket.sendto(self.wfile.getvalue(), self.client_address) |