diff options
author | Guido van Rossum <guido@python.org> | 1999-01-12 20:19:27 (GMT) |
---|---|---|
committer | Guido van Rossum <guido@python.org> | 1999-01-12 20:19:27 (GMT) |
commit | 0039d7b4e6f07411f788dbcb52cd05d26fc7fec5 (patch) | |
tree | 441cfeda8b9dfb4df9c3d35e46998c8e62df2f07 /Lib | |
parent | dd7610cac90368d5017dc4cb0769e4fe0b04cb55 (diff) | |
download | cpython-0039d7b4e6f07411f788dbcb52cd05d26fc7fec5.zip cpython-0039d7b4e6f07411f788dbcb52cd05d26fc7fec5.tar.gz cpython-0039d7b4e6f07411f788dbcb52cd05d26fc7fec5.tar.bz2 |
A gift from Sam Rushing - modules asyncore and asynchat for the
standard Python library. (Async socket support.)
Diffstat (limited to 'Lib')
-rw-r--r-- | Lib/asynchat.py | 290 | ||||
-rw-r--r-- | Lib/asyncore.py | 453 |
2 files changed, 743 insertions, 0 deletions
diff --git a/Lib/asynchat.py b/Lib/asynchat.py new file mode 100644 index 0000000..5486419 --- /dev/null +++ b/Lib/asynchat.py @@ -0,0 +1,290 @@ +# -*- Mode: Python; tab-width: 4 -*- +# $Id$ +# Author: Sam Rushing <rushing@nightmare.com> + +# ====================================================================== +# Copyright 1996 by Sam Rushing +# +# All Rights Reserved +# +# Permission to use, copy, modify, and distribute this software and +# its documentation for any purpose and without fee is hereby +# granted, provided that the above copyright notice appear in all +# copies and that both that copyright notice and this permission +# notice appear in supporting documentation, and that the name of Sam +# Rushing not be used in advertising or publicity pertaining to +# distribution of the software without specific, written prior +# permission. +# +# SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, +# INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN +# NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR +# CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS +# OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, +# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN +# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +# ====================================================================== + +import socket +import asyncore +import string + +# This class adds support for 'chat' style protocols - where one side +# sends a 'command', and the other sends a response (examples would be +# the common internet protocols - smtp, nntp, ftp, etc..). + +# The handle_read() method looks at the input stream for the current +# 'terminator' (usually '\r\n' for single-line responses, '\r\n.\r\n' +# for multi-line output), calling self.found_terminator() on its +# receipt. + +# for example: +# Say you build an async nntp client using this class. At the start +# of the connection, you'll have self.terminator set to '\r\n', in +# order to process the single-line greeting. Just before issuing a +# 'LIST' command you'll set it to '\r\n.\r\n'. The output of the LIST +# command will be accumulated (using your own 'collect_incoming_data' +# method) up to the terminator, and then control will be returned to +# you - by calling your self.found_terminator() method + +class async_chat (asyncore.dispatcher): + """This is an abstract class. You must derive from this class, and add + the two methods collect_incoming_data() and found_terminator()""" + + # these are overridable defaults + + ac_in_buffer_size = 4096 + ac_out_buffer_size = 4096 + + def __init__ (self, conn=None): + self.ac_in_buffer = '' + self.ac_out_buffer = '' + self.producer_fifo = fifo() + asyncore.dispatcher.__init__ (self, conn) + + def set_terminator (self, term): + "Set the input delimiter. Can be a fixed string of any length, or None" + if term is None: + self.terminator = '' + else: + self.terminator = term + + def get_terminator (self): + return self.terminator + + # grab some more data from the socket, + # throw it to the collector method, + # check for the terminator, + # if found, transition to the next state. + + def handle_read (self): + + try: + data = self.recv (self.ac_in_buffer_size) + except socket.error, why: + import sys + self.handle_error (sys.exc_type, sys.exc_value, sys.exc_traceback) + return + + self.ac_in_buffer = self.ac_in_buffer + data + + # Continue to search for self.terminator in self.ac_in_buffer, + # while calling self.collect_incoming_data. The while loop + # is necessary because we might read several data+terminator + # combos with a single recv(1024). + + while self.ac_in_buffer: + terminator = self.get_terminator() + terminator_len = len(terminator) + # 4 cases: + # 1) end of buffer matches terminator exactly: + # collect data, transition + # 2) end of buffer matches some prefix: + # collect data to the prefix + # 3) end of buffer does not match any prefix: + # collect data + # 4) no terminator, just collect the data + if terminator: + index = string.find (self.ac_in_buffer, terminator) + if index != -1: + # we found the terminator + self.collect_incoming_data (self.ac_in_buffer[:index]) + self.ac_in_buffer = self.ac_in_buffer[index+terminator_len:] + # This does the Right Thing if the terminator is changed here. + self.found_terminator() + else: + # check for a prefix of the terminator + index = find_prefix_at_end (self.ac_in_buffer, terminator) + if index: + # we found a prefix, collect up to the prefix + self.collect_incoming_data (self.ac_in_buffer[:-index]) + self.ac_in_buffer = self.ac_in_buffer[-index:] + break + else: + # no prefix, collect it all + self.collect_incoming_data (self.ac_in_buffer) + self.ac_in_buffer = '' + else: + # no terminator, collect it all + self.collect_incoming_data (self.ac_in_buffer) + self.ac_in_buffer = '' + + def handle_write (self): + self.initiate_send () + + def handle_close (self): + self.close() + + def push (self, data): + self.producer_fifo.push (simple_producer (data)) + self.initiate_send() + + def push_with_producer (self, producer): + self.producer_fifo.push (producer) + self.initiate_send() + + def readable (self): + return (len(self.ac_in_buffer) <= self.ac_in_buffer_size) + + def writable (self): + return len(self.ac_out_buffer) or len(self.producer_fifo) or (not self.connected) + + def close_when_done (self): + self.producer_fifo.push (None) + + # refill the outgoing buffer by calling the more() method + # of the first producer in the queue + def refill_buffer (self): + while 1: + if len(self.producer_fifo): + p = self.producer_fifo.first() + # a 'None' in the producer fifo is a sentinel, + # telling us to close the channel. + if p is None: + if not self.ac_out_buffer: + self.producer_fifo.pop() + self.close() + return + data = p.more() + if data: + self.ac_out_buffer = self.ac_out_buffer + data + return + else: + self.producer_fifo.pop() + else: + return + + def initiate_send (self): + obs = self.ac_out_buffer_size + # try to refill the buffer + if (not self._push_mode) and (len (self.ac_out_buffer) < obs): + self.refill_buffer() + + if self.ac_out_buffer and self.connected: + # try to send the buffer + num_sent = self.send (self.ac_out_buffer[:obs]) + if num_sent: + self.ac_out_buffer = self.ac_out_buffer[num_sent:] + + def discard_buffers (self): + # Emergencies only! + self.ac_in_buffer = '' + self.ac_out_buffer == '' + while self.producer_fifo: + self.producer_fifo.pop() + + # ================================================== + # support for push mode. + # ================================================== + _push_mode = 0 + def push_mode (self, boolean): + self._push_mode = boolean + + def writable_push (self): + return self.connected and len(self.ac_out_buffer) + +class simple_producer: + def __init__ (self, data, buffer_size=512): + self.data = data + self.buffer_size = buffer_size + + def more (self): + if len (self.data) > self.buffer_size: + result = self.data[:self.buffer_size] + self.data = self.data[self.buffer_size:] + return result + else: + result = self.data + self.data = '' + return result + +class fifo: + def __init__ (self, list=None): + if not list: + self.list = [] + else: + self.list = list + + def __len__ (self): + return len(self.list) + + def first (self): + return self.list[0] + + def push (self, data): + self.list.append (data) + + def pop (self): + if self.list: + result = self.list[0] + del self.list[0] + return (1, result) + else: + return (0, None) + +# Given 'haystack', see if any prefix of 'needle' is at its end. This +# assumes an exact match has already been checked. Return the number of +# characters matched. +# for example: +# f_p_a_e ("qwerty\r", "\r\n") => 1 +# f_p_a_e ("qwerty\r\n", "\r\n") => 2 +# f_p_a_e ("qwertydkjf", "\r\n") => 0 + +# this could maybe be made faster with a computed regex? + +##def find_prefix_at_end (haystack, needle): +## nl = len(needle) +## result = 0 +## for i in range (1,nl): +## if haystack[-(nl-i):] == needle[:(nl-i)]: +## result = nl-i +## break +## return result + +# yes, this is about twice as fast, but still seems +# to be neglible CPU. The previous could do about 290 +# searches/sec. the new one about 555/sec. + +import regex + +prefix_cache = {} + +def prefix_regex (needle): + if prefix_cache.has_key (needle): + return prefix_cache[needle] + else: + reg = needle[-1] + for i in range(1,len(needle)): + reg = '%c\(%s\)?' % (needle[-(i+1)], reg) + reg = regex.compile (reg+'$') + prefix_cache[needle] = reg, len(needle) + return reg, len(needle) + +def find_prefix_at_end (haystack, needle): + reg, length = prefix_regex (needle) + lh = len(haystack) + result = reg.search (haystack, max(0,lh-length)) + if result >= 0: + return (lh - result) + else: + return 0 diff --git a/Lib/asyncore.py b/Lib/asyncore.py new file mode 100644 index 0000000..c9b39a3 --- /dev/null +++ b/Lib/asyncore.py @@ -0,0 +1,453 @@ +# -*- Mode: Python; tab-width: 4 -*- +# $Id$ +# Author: Sam Rushing <rushing@nightmare.com> + +# ====================================================================== +# Copyright 1996 by Sam Rushing +# +# All Rights Reserved +# +# Permission to use, copy, modify, and distribute this software and +# its documentation for any purpose and without fee is hereby +# granted, provided that the above copyright notice appear in all +# copies and that both that copyright notice and this permission +# notice appear in supporting documentation, and that the name of Sam +# Rushing not be used in advertising or publicity pertaining to +# distribution of the software without specific, written prior +# permission. +# +# SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, +# INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN +# NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR +# CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS +# OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, +# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN +# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +# ====================================================================== + +import select +import socket +import string +import sys + +import os +if os.name == 'nt': + EWOULDBLOCK = 10035 + EINPROGRESS = 10036 + EALREADY = 10037 + ECONNRESET = 10054 + ENOTCONN = 10057 +else: + from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, ENOTCONN + +socket_map = {} + +def poll (timeout=0.0, ignore_expt=1): + if socket_map: + sockets = socket_map.keys() + r = filter (lambda x: x.readable(), sockets) + w = filter (lambda x: x.writable(), sockets) + if ignore_expt: + e = [] + else: + e = sockets[:] + + (r,w,e) = select.select (r,w,e, timeout) + + for x in e: + try: + x.handle_expt_event() + except: + x.handle_error (sys.exc_type, sys.exc_value, sys.exc_traceback) + for x in r: + try: + x.handle_read_event() + except: + x.handle_error (sys.exc_type, sys.exc_value, sys.exc_traceback) + for x in w: + try: + x.handle_write_event() + except: + x.handle_error (sys.exc_type, sys.exc_value, sys.exc_traceback) + +def poll2 (timeout=0.0): + import poll + # timeout is in milliseconds + timeout = int(timeout*1000) + if socket_map: + fd_map = {} + for s in socket_map.keys(): + fd_map[s.fileno()] = s + l = [] + for fd, s in fd_map.items(): + flags = 0 + if s.readable(): + flags = poll.POLLIN + if s.writable(): + flags = flags | poll.POLLOUT + if flags: + l.append (fd, flags) + r = poll.poll (l, timeout) + print r + for fd, flags in r: + s = fd_map[fd] + try: + if (flags & poll.POLLIN): + s.handle_read_event() + if (flags & poll.POLLOUT): + s.handle_write_event() + if (flags & poll.POLLERR): + s.handle_expt_event() + except: + apply (s.handle_error, sys.exc_info()) + + +def loop (timeout=30.0, use_poll=0): + + if use_poll: + poll_fun = poll2 + else: + poll_fun = poll + + while socket_map: + poll_fun (timeout) + +class dispatcher: + debug = 0 + connected = 0 + accepting = 0 + closing = 0 + addr = None + + def __init__ (self, sock=None): + if sock: + self.set_socket (sock) + # I think it should inherit this anyway + self.socket.setblocking (0) + self.connected = 1 + + def __repr__ (self): + try: + status = [] + if self.accepting and self.addr: + status.append ('listening') + elif self.connected: + status.append ('connected') + if self.addr: + status.append ('%s:%d' % self.addr) + return '<%s %s at %x>' % ( + self.__class__.__name__, + string.join (status, ' '), + id(self) + ) + except: + try: + ar = repr(self.addr) + except: + ar = 'no self.addr!' + + return '<__repr__ (self) failed for object at %x (addr=%s)>' % (id(self),ar) + + def add_channel (self): + self.log ('adding channel %s' % self) + socket_map [self] = 1 + + def del_channel (self): + if socket_map.has_key (self): + self.log ('closing channel %d:%s' % (self.fileno(), self)) + del socket_map [self] + + def create_socket (self, family, type): + self.family_and_type = family, type + self.socket = socket.socket (family, type) + self.socket.setblocking(0) + self.add_channel() + + def set_socket (self, socket): + self.socket = socket + self.add_channel() + + def set_reuse_addr (self): + # try to re-use a server port if possible + try: + self.socket.setsockopt ( + socket.SOL_SOCKET, socket.SO_REUSEADDR, + self.socket.getsockopt (socket.SOL_SOCKET, socket.SO_REUSEADDR) | 1 + ) + except: + pass + + # ================================================== + # predicates for select() + # these are used as filters for the lists of sockets + # to pass to select(). + # ================================================== + + def readable (self): + return 1 + + if os.name == 'mac': + # The macintosh will select a listening socket for + # write if you let it. What might this mean? + def writable (self): + return not self.accepting + else: + def writable (self): + return 1 + + # ================================================== + # socket object methods. + # ================================================== + + def listen (self, num): + self.accepting = 1 + if os.name == 'nt' and num > 5: + num = 1 + return self.socket.listen (num) + + def bind (self, addr): + self.addr = addr + return self.socket.bind (addr) + + def connect (self, address): + try: + self.socket.connect (address) + except socket.error, why: + if why[0] in (EINPROGRESS, EALREADY, EWOULDBLOCK): + return + else: + raise socket.error, why + self.connected = 1 + self.handle_connect() + + def accept (self): + try: + conn, addr = self.socket.accept() + return conn, addr + except socket.error, why: + if why[0] == EWOULDBLOCK: + pass + else: + raise socket.error, why + + def send (self, data): + try: + result = self.socket.send (data) + return result + except socket.error, why: + if why[0] == EWOULDBLOCK: + return 0 + else: + raise socket.error, why + return 0 + + def recv (self, buffer_size): + try: + data = self.socket.recv (buffer_size) + if not data: + # a closed connection is indicated by signaling + # a read condition, and having recv() return 0. + self.handle_close() + return '' + else: + return data + except socket.error, why: + # winsock sometimes throws ENOTCONN + if why[0] in [ECONNRESET, ENOTCONN]: + self.handle_close() + return '' + else: + raise socket.error, why + + def close (self): + self.del_channel() + self.socket.close() + self.connected = 0 + + # cheap inheritance, used to pass all other attribute + # references to the underlying socket object. + def __getattr__ (self, attr): + if attr != 'socket': + return getattr (self.socket, attr) + else: + raise AttributeError, attr + + def log (self, message): + print 'log:', message + + def handle_read_event (self): + if self.accepting: + # for an accepting socket, getting a read implies + # that we are connected + if not self.connected: + self.connected = 1 + self.handle_accept() + elif not self.connected: + self.handle_connect() + self.connected = 1 + self.handle_read() + else: + self.handle_read() + + def handle_write_event (self): + # getting a write implies that we are connected + if not self.connected: + self.handle_connect() + self.connected = 1 + self.handle_write() + + def handle_expt_event (self): + self.handle_expt() + + def handle_error (self, *info): + (t,v,tb) = info + (file,fun,line), tbinfo = compact_traceback (t,v,tb) + + # sometimes a user repr method will crash. + try: + self_repr = repr (self) + except: + self_repr = '<__repr__ (self) failed for object at %0x>' % id(self) + + print ( + 'uncaptured python exception, closing channel %s (%s:%s %s)' % ( + self_repr, + str(t), + str(v), + tbinfo + ) + ) + del t,v,tb + self.close() + + def handle_expt (self): + self.log ('unhandled exception') + + def handle_read (self): + self.log ('unhandled read event') + + def handle_write (self): + self.log ('unhandled write event') + + def handle_connect (self): + self.log ('unhandled connect event') + + def handle_oob (self): + self.log ('unhandled out-of-band event') + + def handle_accept (self): + self.log ('unhandled accept event') + + def handle_close (self): + self.log ('unhandled close event') + self.close() + +# --------------------------------------------------------------------------- +# adds simple buffered output capability, useful for simple clients. +# [for more sophisticated usage use asynchat.async_chat] +# --------------------------------------------------------------------------- + +class dispatcher_with_send (dispatcher): + def __init__ (self, sock=None): + dispatcher.__init__ (self, sock) + self.out_buffer = '' + + def initiate_send (self): + num_sent = 0 + num_sent = dispatcher.send (self, self.out_buffer[:512]) + self.out_buffer = self.out_buffer[num_sent:] + + def handle_write (self): + self.initiate_send() + + def writable (self): + return (not self.connected) or len(self.out_buffer) + + def send (self, data): + if self.debug: + self.log ('sending %s' % repr(data)) + self.out_buffer = self.out_buffer + data + self.initiate_send() + +# --------------------------------------------------------------------------- +# used for debugging. +# --------------------------------------------------------------------------- + +def compact_traceback (t,v,tb): + tbinfo = [] + while 1: + tbinfo.append ( + tb.tb_frame.f_code.co_filename, + tb.tb_frame.f_code.co_name, + str(tb.tb_lineno) + ) + tb = tb.tb_next + if not tb: + break + + file, function, line = tbinfo[-1] + info = '[' + string.join ( + map ( + lambda x: string.join (x, '|'), + tbinfo + ), + '] [' + ) + ']' + return (file, function, line), info + +def close_all (): + global socket_map + for x in socket_map.keys(): + x.socket.close() + socket_map.clear() + +# Asynchronous File I/O: +# +# After a little research (reading man pages on various unixen, and +# digging through the linux kernel), I've determined that select() +# isn't meant for doing doing asynchronous file i/o. +# Heartening, though - reading linux/mm/filemap.c shows that linux +# supports asynchronous read-ahead. So _MOST_ of the time, the data +# will be sitting in memory for us already when we go to read it. +# +# What other OS's (besides NT) support async file i/o? [VMS?] +# +# Regardless, this is useful for pipes, and stdin/stdout... + +import os +if os.name == 'posix': + import fcntl + import FCNTL + + class file_wrapper: + # here we override just enough to make a file + # look like a socket for the purposes of asyncore. + def __init__ (self, fd): + self.fd = fd + + def recv (self, *args): + return apply (os.read, (self.fd,)+args) + + def write (self, *args): + return apply (os.write, (self.fd,)+args) + + def close (self): + return os.close (self.fd) + + def fileno (self): + return self.fd + + class file_dispatcher (dispatcher): + def __init__ (self, fd): + dispatcher.__init__ (self) + self.connected = 1 + # set it to non-blocking mode + flags = fcntl.fcntl (fd, FCNTL.F_GETFL, 0) + flags = flags | FCNTL.O_NONBLOCK + fcntl.fcntl (fd, FCNTL.F_SETFL, flags) + self.set_file (fd) + + def set_file (self, fd): + self.socket = file_wrapper (fd) + self.add_channel() +#not really |