diff options
author | Guido van Rossum <guido@python.org> | 1999-06-08 13:20:05 (GMT) |
---|---|---|
committer | Guido van Rossum <guido@python.org> | 1999-06-08 13:20:05 (GMT) |
commit | a8d0f4fd2d10a1f5e05d31e048e52a1192d84321 (patch) | |
tree | df9bf52847f089314545a5374afff5b0709198ea /Lib | |
parent | cf09a3924f80ec7dfc706c49f8b7c60c990e594b (diff) | |
download | cpython-a8d0f4fd2d10a1f5e05d31e048e52a1192d84321.zip cpython-a8d0f4fd2d10a1f5e05d31e048e52a1192d84321.tar.gz cpython-a8d0f4fd2d10a1f5e05d31e048e52a1192d84321.tar.bz2 |
Sam's latest versions
Diffstat (limited to 'Lib')
-rw-r--r-- | Lib/asynchat.py | 100 | ||||
-rw-r--r-- | Lib/asyncore.py | 96 |
2 files changed, 109 insertions, 87 deletions
diff --git a/Lib/asynchat.py b/Lib/asynchat.py index 5486419..f04d2fa 100644 --- a/Lib/asynchat.py +++ b/Lib/asynchat.py @@ -63,11 +63,8 @@ class async_chat (asyncore.dispatcher): asyncore.dispatcher.__init__ (self, conn) def set_terminator (self, term): - "Set the input delimiter. Can be a fixed string of any length, or None" - if term is None: - self.terminator = '' - else: - self.terminator = term + "Set the input delimiter. Can be a fixed string of any length, an integer, or None" + self.terminator = term def get_terminator (self): return self.terminator @@ -82,8 +79,7 @@ class async_chat (asyncore.dispatcher): try: data = self.recv (self.ac_in_buffer_size) except socket.error, why: - import sys - self.handle_error (sys.exc_type, sys.exc_value, sys.exc_traceback) + self.handle_error() return self.ac_in_buffer = self.ac_in_buffer + data @@ -94,17 +90,33 @@ class async_chat (asyncore.dispatcher): # combos with a single recv(1024). while self.ac_in_buffer: + lb = len(self.ac_in_buffer) terminator = self.get_terminator() - terminator_len = len(terminator) - # 4 cases: - # 1) end of buffer matches terminator exactly: - # collect data, transition - # 2) end of buffer matches some prefix: - # collect data to the prefix - # 3) end of buffer does not match any prefix: - # collect data - # 4) no terminator, just collect the data - if terminator: + if terminator is None: + # no terminator, collect it all + self.collect_incoming_data (self.ac_in_buffer) + self.ac_in_buffer = '' + elif type(terminator) == type(0): + # numeric terminator + n = terminator + if lb < n: + self.collect_incoming_data (self.ac_in_buffer) + self.ac_in_buffer = '' + self.terminator = self.terminator - lb + else: + self.collect_incoming_data (self.ac_in_buffer[:n]) + self.ac_in_buffer = self.ac_in_buffer[n:] + self.terminator = 0 + self.found_terminator() + else: + # 3 cases: + # 1) end of buffer matches terminator exactly: + # collect data, transition + # 2) end of buffer matches some prefix: + # collect data to the prefix + # 3) end of buffer does not match any prefix: + # collect data + terminator_len = len(terminator) index = string.find (self.ac_in_buffer, terminator) if index != -1: # we found the terminator @@ -116,18 +128,15 @@ class async_chat (asyncore.dispatcher): # check for a prefix of the terminator index = find_prefix_at_end (self.ac_in_buffer, terminator) if index: - # we found a prefix, collect up to the prefix - self.collect_incoming_data (self.ac_in_buffer[:-index]) - self.ac_in_buffer = self.ac_in_buffer[-index:] + if index != lb: + # we found a prefix, collect up to the prefix + self.collect_incoming_data (self.ac_in_buffer[:-index]) + self.ac_in_buffer = self.ac_in_buffer[-index:] break else: # no prefix, collect it all self.collect_incoming_data (self.ac_in_buffer) self.ac_in_buffer = '' - else: - # no terminator, collect it all - self.collect_incoming_data (self.ac_in_buffer) - self.ac_in_buffer = '' def handle_write (self): self.initiate_send () @@ -144,17 +153,27 @@ class async_chat (asyncore.dispatcher): self.initiate_send() def readable (self): + "predicate for inclusion in the readable for select()" return (len(self.ac_in_buffer) <= self.ac_in_buffer_size) def writable (self): - return len(self.ac_out_buffer) or len(self.producer_fifo) or (not self.connected) + "predicate for inclusion in the writable for select()" + # return len(self.ac_out_buffer) or len(self.producer_fifo) or (not self.connected) + # this is about twice as fast, though not as clear. + return not ( + (self.ac_out_buffer is '') and + self.producer_fifo.is_empty() and + self.connected + ) def close_when_done (self): + "automatically close this channel once the outgoing queue is empty" self.producer_fifo.push (None) # refill the outgoing buffer by calling the more() method # of the first producer in the queue def refill_buffer (self): + _string_type = type('') while 1: if len(self.producer_fifo): p = self.producer_fifo.first() @@ -165,6 +184,10 @@ class async_chat (asyncore.dispatcher): self.producer_fifo.pop() self.close() return + elif type(p) is _string_type: + self.producer_fifo.pop() + self.ac_out_buffer = self.ac_out_buffer + p + return data = p.more() if data: self.ac_out_buffer = self.ac_out_buffer + data @@ -177,14 +200,19 @@ class async_chat (asyncore.dispatcher): def initiate_send (self): obs = self.ac_out_buffer_size # try to refill the buffer - if (not self._push_mode) and (len (self.ac_out_buffer) < obs): + if (len (self.ac_out_buffer) < obs): self.refill_buffer() if self.ac_out_buffer and self.connected: # try to send the buffer - num_sent = self.send (self.ac_out_buffer[:obs]) - if num_sent: - self.ac_out_buffer = self.ac_out_buffer[num_sent:] + try: + num_sent = self.send (self.ac_out_buffer[:obs]) + if num_sent: + self.ac_out_buffer = self.ac_out_buffer[num_sent:] + + except socket.error, why: + self.handle_error() + return def discard_buffers (self): # Emergencies only! @@ -193,17 +221,8 @@ class async_chat (asyncore.dispatcher): while self.producer_fifo: self.producer_fifo.pop() - # ================================================== - # support for push mode. - # ================================================== - _push_mode = 0 - def push_mode (self, boolean): - self._push_mode = boolean - - def writable_push (self): - return self.connected and len(self.ac_out_buffer) - class simple_producer: + def __init__ (self, data, buffer_size=512): self.data = data self.buffer_size = buffer_size @@ -228,6 +247,9 @@ class fifo: def __len__ (self): return len(self.list) + def is_empty (self): + return self.list == [] + def first (self): return self.list[0] diff --git a/Lib/asyncore.py b/Lib/asyncore.py index c9b39a3..69becac 100644 --- a/Lib/asyncore.py +++ b/Lib/asyncore.py @@ -37,38 +37,33 @@ if os.name == 'nt': EALREADY = 10037 ECONNRESET = 10054 ENOTCONN = 10057 + ESHUTDOWN = 10058 else: - from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, ENOTCONN + from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, ENOTCONN, ESHUTDOWN socket_map = {} -def poll (timeout=0.0, ignore_expt=1): +def poll (timeout=0.0): if socket_map: - sockets = socket_map.keys() - r = filter (lambda x: x.readable(), sockets) - w = filter (lambda x: x.writable(), sockets) - if ignore_expt: - e = [] - else: - e = sockets[:] + r = []; w = []; e = [] + for s in socket_map.keys(): + if s.readable(): + r.append (s) + if s.writable(): + w.append (s) (r,w,e) = select.select (r,w,e, timeout) - for x in e: - try: - x.handle_expt_event() - except: - x.handle_error (sys.exc_type, sys.exc_value, sys.exc_traceback) for x in r: try: x.handle_read_event() except: - x.handle_error (sys.exc_type, sys.exc_value, sys.exc_traceback) + x.handle_error() for x in w: try: x.handle_write_event() except: - x.handle_error (sys.exc_type, sys.exc_value, sys.exc_traceback) + x.handle_error() def poll2 (timeout=0.0): import poll @@ -88,18 +83,17 @@ def poll2 (timeout=0.0): if flags: l.append (fd, flags) r = poll.poll (l, timeout) - print r for fd, flags in r: s = fd_map[fd] try: if (flags & poll.POLLIN): - s.handle_read_event() + s.handle_read_event() if (flags & poll.POLLOUT): - s.handle_write_event() + s.handle_write_event() if (flags & poll.POLLERR): - s.handle_expt_event() + s.handle_expt_event() except: - apply (s.handle_error, sys.exc_info()) + s.handle_error() def loop (timeout=30.0, use_poll=0): @@ -149,12 +143,14 @@ class dispatcher: return '<__repr__ (self) failed for object at %x (addr=%s)>' % (id(self),ar) def add_channel (self): - self.log ('adding channel %s' % self) + if __debug__: + self.log ('adding channel %s' % self) socket_map [self] = 1 def del_channel (self): if socket_map.has_key (self): - self.log ('closing channel %d:%s' % (self.fileno(), self)) + if __debug__: + self.log ('closing channel %d:%s' % (self.fileno(), self)) del socket_map [self] def create_socket (self, family, type): @@ -164,7 +160,8 @@ class dispatcher: self.add_channel() def set_socket (self, socket): - self.socket = socket + # This is done so we can be called safely from __init__ + self.__dict__['socket'] = socket self.add_channel() def set_reuse_addr (self): @@ -210,6 +207,7 @@ class dispatcher: return self.socket.bind (addr) def connect (self, address): + self.connected = 0 try: self.socket.connect (address) except socket.error, why: @@ -253,7 +251,7 @@ class dispatcher: return data except socket.error, why: # winsock sometimes throws ENOTCONN - if why[0] in [ECONNRESET, ENOTCONN]: + if why[0] in [ECONNRESET, ENOTCONN, ESHUTDOWN]: self.handle_close() return '' else: @@ -262,15 +260,12 @@ class dispatcher: def close (self): self.del_channel() self.socket.close() - self.connected = 0 # cheap inheritance, used to pass all other attribute # references to the underlying socket object. + # NOTE: this may be removed soon for performance reasons. def __getattr__ (self, attr): - if attr != 'socket': - return getattr (self.socket, attr) - else: - raise AttributeError, attr + return getattr (self.socket, attr) def log (self, message): print 'log:', message @@ -299,9 +294,8 @@ class dispatcher: def handle_expt_event (self): self.handle_expt() - def handle_error (self, *info): - (t,v,tb) = info - (file,fun,line), tbinfo = compact_traceback (t,v,tb) + def handle_error (self): + (file,fun,line), t, v, tbinfo = compact_traceback() # sometimes a user repr method will crash. try: @@ -312,34 +306,36 @@ class dispatcher: print ( 'uncaptured python exception, closing channel %s (%s:%s %s)' % ( self_repr, - str(t), - str(v), + t, + v, tbinfo ) ) - del t,v,tb self.close() def handle_expt (self): - self.log ('unhandled exception') + if __debug__: + self.log ('unhandled exception') def handle_read (self): - self.log ('unhandled read event') + if __debug__: + self.log ('unhandled read event') def handle_write (self): - self.log ('unhandled write event') + if __debug__: + self.log ('unhandled write event') def handle_connect (self): - self.log ('unhandled connect event') - - def handle_oob (self): - self.log ('unhandled out-of-band event') + if __debug__: + self.log ('unhandled connect event') def handle_accept (self): - self.log ('unhandled accept event') + if __debug__: + self.log ('unhandled accept event') def handle_close (self): - self.log ('unhandled close event') + if __debug__: + self.log ('unhandled close event') self.close() # --------------------------------------------------------------------------- @@ -373,7 +369,8 @@ class dispatcher_with_send (dispatcher): # used for debugging. # --------------------------------------------------------------------------- -def compact_traceback (t,v,tb): +def compact_traceback (): + t,v,tb = sys.exc_info() tbinfo = [] while 1: tbinfo.append ( @@ -385,6 +382,9 @@ def compact_traceback (t,v,tb): if not tb: break + # just to be safe + del tb + file, function, line = tbinfo[-1] info = '[' + string.join ( map ( @@ -393,7 +393,7 @@ def compact_traceback (t,v,tb): ), '] [' ) + ']' - return (file, function, line), info + return (file, function, line), t, v, info def close_all (): global socket_map @@ -450,4 +450,4 @@ if os.name == 'posix': def set_file (self, fd): self.socket = file_wrapper (fd) self.add_channel() -#not really + |