summaryrefslogtreecommitdiffstats
path: root/Lib
diff options
context:
space:
mode:
authorGuido van Rossum <guido@python.org>1999-06-08 13:20:05 (GMT)
committerGuido van Rossum <guido@python.org>1999-06-08 13:20:05 (GMT)
commita8d0f4fd2d10a1f5e05d31e048e52a1192d84321 (patch)
treedf9bf52847f089314545a5374afff5b0709198ea /Lib
parentcf09a3924f80ec7dfc706c49f8b7c60c990e594b (diff)
downloadcpython-a8d0f4fd2d10a1f5e05d31e048e52a1192d84321.zip
cpython-a8d0f4fd2d10a1f5e05d31e048e52a1192d84321.tar.gz
cpython-a8d0f4fd2d10a1f5e05d31e048e52a1192d84321.tar.bz2
Sam's latest versions
Diffstat (limited to 'Lib')
-rw-r--r--Lib/asynchat.py100
-rw-r--r--Lib/asyncore.py96
2 files changed, 109 insertions, 87 deletions
diff --git a/Lib/asynchat.py b/Lib/asynchat.py
index 5486419..f04d2fa 100644
--- a/Lib/asynchat.py
+++ b/Lib/asynchat.py
@@ -63,11 +63,8 @@ class async_chat (asyncore.dispatcher):
asyncore.dispatcher.__init__ (self, conn)
def set_terminator (self, term):
- "Set the input delimiter. Can be a fixed string of any length, or None"
- if term is None:
- self.terminator = ''
- else:
- self.terminator = term
+ "Set the input delimiter. Can be a fixed string of any length, an integer, or None"
+ self.terminator = term
def get_terminator (self):
return self.terminator
@@ -82,8 +79,7 @@ class async_chat (asyncore.dispatcher):
try:
data = self.recv (self.ac_in_buffer_size)
except socket.error, why:
- import sys
- self.handle_error (sys.exc_type, sys.exc_value, sys.exc_traceback)
+ self.handle_error()
return
self.ac_in_buffer = self.ac_in_buffer + data
@@ -94,17 +90,33 @@ class async_chat (asyncore.dispatcher):
# combos with a single recv(1024).
while self.ac_in_buffer:
+ lb = len(self.ac_in_buffer)
terminator = self.get_terminator()
- terminator_len = len(terminator)
- # 4 cases:
- # 1) end of buffer matches terminator exactly:
- # collect data, transition
- # 2) end of buffer matches some prefix:
- # collect data to the prefix
- # 3) end of buffer does not match any prefix:
- # collect data
- # 4) no terminator, just collect the data
- if terminator:
+ if terminator is None:
+ # no terminator, collect it all
+ self.collect_incoming_data (self.ac_in_buffer)
+ self.ac_in_buffer = ''
+ elif type(terminator) == type(0):
+ # numeric terminator
+ n = terminator
+ if lb < n:
+ self.collect_incoming_data (self.ac_in_buffer)
+ self.ac_in_buffer = ''
+ self.terminator = self.terminator - lb
+ else:
+ self.collect_incoming_data (self.ac_in_buffer[:n])
+ self.ac_in_buffer = self.ac_in_buffer[n:]
+ self.terminator = 0
+ self.found_terminator()
+ else:
+ # 3 cases:
+ # 1) end of buffer matches terminator exactly:
+ # collect data, transition
+ # 2) end of buffer matches some prefix:
+ # collect data to the prefix
+ # 3) end of buffer does not match any prefix:
+ # collect data
+ terminator_len = len(terminator)
index = string.find (self.ac_in_buffer, terminator)
if index != -1:
# we found the terminator
@@ -116,18 +128,15 @@ class async_chat (asyncore.dispatcher):
# check for a prefix of the terminator
index = find_prefix_at_end (self.ac_in_buffer, terminator)
if index:
- # we found a prefix, collect up to the prefix
- self.collect_incoming_data (self.ac_in_buffer[:-index])
- self.ac_in_buffer = self.ac_in_buffer[-index:]
+ if index != lb:
+ # we found a prefix, collect up to the prefix
+ self.collect_incoming_data (self.ac_in_buffer[:-index])
+ self.ac_in_buffer = self.ac_in_buffer[-index:]
break
else:
# no prefix, collect it all
self.collect_incoming_data (self.ac_in_buffer)
self.ac_in_buffer = ''
- else:
- # no terminator, collect it all
- self.collect_incoming_data (self.ac_in_buffer)
- self.ac_in_buffer = ''
def handle_write (self):
self.initiate_send ()
@@ -144,17 +153,27 @@ class async_chat (asyncore.dispatcher):
self.initiate_send()
def readable (self):
+ "predicate for inclusion in the readable for select()"
return (len(self.ac_in_buffer) <= self.ac_in_buffer_size)
def writable (self):
- return len(self.ac_out_buffer) or len(self.producer_fifo) or (not self.connected)
+ "predicate for inclusion in the writable for select()"
+ # return len(self.ac_out_buffer) or len(self.producer_fifo) or (not self.connected)
+ # this is about twice as fast, though not as clear.
+ return not (
+ (self.ac_out_buffer is '') and
+ self.producer_fifo.is_empty() and
+ self.connected
+ )
def close_when_done (self):
+ "automatically close this channel once the outgoing queue is empty"
self.producer_fifo.push (None)
# refill the outgoing buffer by calling the more() method
# of the first producer in the queue
def refill_buffer (self):
+ _string_type = type('')
while 1:
if len(self.producer_fifo):
p = self.producer_fifo.first()
@@ -165,6 +184,10 @@ class async_chat (asyncore.dispatcher):
self.producer_fifo.pop()
self.close()
return
+ elif type(p) is _string_type:
+ self.producer_fifo.pop()
+ self.ac_out_buffer = self.ac_out_buffer + p
+ return
data = p.more()
if data:
self.ac_out_buffer = self.ac_out_buffer + data
@@ -177,14 +200,19 @@ class async_chat (asyncore.dispatcher):
def initiate_send (self):
obs = self.ac_out_buffer_size
# try to refill the buffer
- if (not self._push_mode) and (len (self.ac_out_buffer) < obs):
+ if (len (self.ac_out_buffer) < obs):
self.refill_buffer()
if self.ac_out_buffer and self.connected:
# try to send the buffer
- num_sent = self.send (self.ac_out_buffer[:obs])
- if num_sent:
- self.ac_out_buffer = self.ac_out_buffer[num_sent:]
+ try:
+ num_sent = self.send (self.ac_out_buffer[:obs])
+ if num_sent:
+ self.ac_out_buffer = self.ac_out_buffer[num_sent:]
+
+ except socket.error, why:
+ self.handle_error()
+ return
def discard_buffers (self):
# Emergencies only!
@@ -193,17 +221,8 @@ class async_chat (asyncore.dispatcher):
while self.producer_fifo:
self.producer_fifo.pop()
- # ==================================================
- # support for push mode.
- # ==================================================
- _push_mode = 0
- def push_mode (self, boolean):
- self._push_mode = boolean
-
- def writable_push (self):
- return self.connected and len(self.ac_out_buffer)
-
class simple_producer:
+
def __init__ (self, data, buffer_size=512):
self.data = data
self.buffer_size = buffer_size
@@ -228,6 +247,9 @@ class fifo:
def __len__ (self):
return len(self.list)
+ def is_empty (self):
+ return self.list == []
+
def first (self):
return self.list[0]
diff --git a/Lib/asyncore.py b/Lib/asyncore.py
index c9b39a3..69becac 100644
--- a/Lib/asyncore.py
+++ b/Lib/asyncore.py
@@ -37,38 +37,33 @@ if os.name == 'nt':
EALREADY = 10037
ECONNRESET = 10054
ENOTCONN = 10057
+ ESHUTDOWN = 10058
else:
- from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, ENOTCONN
+ from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, ENOTCONN, ESHUTDOWN
socket_map = {}
-def poll (timeout=0.0, ignore_expt=1):
+def poll (timeout=0.0):
if socket_map:
- sockets = socket_map.keys()
- r = filter (lambda x: x.readable(), sockets)
- w = filter (lambda x: x.writable(), sockets)
- if ignore_expt:
- e = []
- else:
- e = sockets[:]
+ r = []; w = []; e = []
+ for s in socket_map.keys():
+ if s.readable():
+ r.append (s)
+ if s.writable():
+ w.append (s)
(r,w,e) = select.select (r,w,e, timeout)
- for x in e:
- try:
- x.handle_expt_event()
- except:
- x.handle_error (sys.exc_type, sys.exc_value, sys.exc_traceback)
for x in r:
try:
x.handle_read_event()
except:
- x.handle_error (sys.exc_type, sys.exc_value, sys.exc_traceback)
+ x.handle_error()
for x in w:
try:
x.handle_write_event()
except:
- x.handle_error (sys.exc_type, sys.exc_value, sys.exc_traceback)
+ x.handle_error()
def poll2 (timeout=0.0):
import poll
@@ -88,18 +83,17 @@ def poll2 (timeout=0.0):
if flags:
l.append (fd, flags)
r = poll.poll (l, timeout)
- print r
for fd, flags in r:
s = fd_map[fd]
try:
if (flags & poll.POLLIN):
- s.handle_read_event()
+ s.handle_read_event()
if (flags & poll.POLLOUT):
- s.handle_write_event()
+ s.handle_write_event()
if (flags & poll.POLLERR):
- s.handle_expt_event()
+ s.handle_expt_event()
except:
- apply (s.handle_error, sys.exc_info())
+ s.handle_error()
def loop (timeout=30.0, use_poll=0):
@@ -149,12 +143,14 @@ class dispatcher:
return '<__repr__ (self) failed for object at %x (addr=%s)>' % (id(self),ar)
def add_channel (self):
- self.log ('adding channel %s' % self)
+ if __debug__:
+ self.log ('adding channel %s' % self)
socket_map [self] = 1
def del_channel (self):
if socket_map.has_key (self):
- self.log ('closing channel %d:%s' % (self.fileno(), self))
+ if __debug__:
+ self.log ('closing channel %d:%s' % (self.fileno(), self))
del socket_map [self]
def create_socket (self, family, type):
@@ -164,7 +160,8 @@ class dispatcher:
self.add_channel()
def set_socket (self, socket):
- self.socket = socket
+ # This is done so we can be called safely from __init__
+ self.__dict__['socket'] = socket
self.add_channel()
def set_reuse_addr (self):
@@ -210,6 +207,7 @@ class dispatcher:
return self.socket.bind (addr)
def connect (self, address):
+ self.connected = 0
try:
self.socket.connect (address)
except socket.error, why:
@@ -253,7 +251,7 @@ class dispatcher:
return data
except socket.error, why:
# winsock sometimes throws ENOTCONN
- if why[0] in [ECONNRESET, ENOTCONN]:
+ if why[0] in [ECONNRESET, ENOTCONN, ESHUTDOWN]:
self.handle_close()
return ''
else:
@@ -262,15 +260,12 @@ class dispatcher:
def close (self):
self.del_channel()
self.socket.close()
- self.connected = 0
# cheap inheritance, used to pass all other attribute
# references to the underlying socket object.
+ # NOTE: this may be removed soon for performance reasons.
def __getattr__ (self, attr):
- if attr != 'socket':
- return getattr (self.socket, attr)
- else:
- raise AttributeError, attr
+ return getattr (self.socket, attr)
def log (self, message):
print 'log:', message
@@ -299,9 +294,8 @@ class dispatcher:
def handle_expt_event (self):
self.handle_expt()
- def handle_error (self, *info):
- (t,v,tb) = info
- (file,fun,line), tbinfo = compact_traceback (t,v,tb)
+ def handle_error (self):
+ (file,fun,line), t, v, tbinfo = compact_traceback()
# sometimes a user repr method will crash.
try:
@@ -312,34 +306,36 @@ class dispatcher:
print (
'uncaptured python exception, closing channel %s (%s:%s %s)' % (
self_repr,
- str(t),
- str(v),
+ t,
+ v,
tbinfo
)
)
- del t,v,tb
self.close()
def handle_expt (self):
- self.log ('unhandled exception')
+ if __debug__:
+ self.log ('unhandled exception')
def handle_read (self):
- self.log ('unhandled read event')
+ if __debug__:
+ self.log ('unhandled read event')
def handle_write (self):
- self.log ('unhandled write event')
+ if __debug__:
+ self.log ('unhandled write event')
def handle_connect (self):
- self.log ('unhandled connect event')
-
- def handle_oob (self):
- self.log ('unhandled out-of-band event')
+ if __debug__:
+ self.log ('unhandled connect event')
def handle_accept (self):
- self.log ('unhandled accept event')
+ if __debug__:
+ self.log ('unhandled accept event')
def handle_close (self):
- self.log ('unhandled close event')
+ if __debug__:
+ self.log ('unhandled close event')
self.close()
# ---------------------------------------------------------------------------
@@ -373,7 +369,8 @@ class dispatcher_with_send (dispatcher):
# used for debugging.
# ---------------------------------------------------------------------------
-def compact_traceback (t,v,tb):
+def compact_traceback ():
+ t,v,tb = sys.exc_info()
tbinfo = []
while 1:
tbinfo.append (
@@ -385,6 +382,9 @@ def compact_traceback (t,v,tb):
if not tb:
break
+ # just to be safe
+ del tb
+
file, function, line = tbinfo[-1]
info = '[' + string.join (
map (
@@ -393,7 +393,7 @@ def compact_traceback (t,v,tb):
),
'] ['
) + ']'
- return (file, function, line), info
+ return (file, function, line), t, v, info
def close_all ():
global socket_map
@@ -450,4 +450,4 @@ if os.name == 'posix':
def set_file (self, fd):
self.socket = file_wrapper (fd)
self.add_channel()
-#not really
+