summaryrefslogtreecommitdiffstats
path: root/Lib/asynchat.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/asynchat.py')
-rw-r--r--Lib/asynchat.py164
1 files changed, 98 insertions, 66 deletions
diff --git a/Lib/asynchat.py b/Lib/asynchat.py
index 0e2457f..ae82cfa 100644
--- a/Lib/asynchat.py
+++ b/Lib/asynchat.py
@@ -45,12 +45,23 @@ command will be accumulated (using your own 'collect_incoming_data'
method) up to the terminator, and then control will be returned to
you - by calling your self.found_terminator() method.
"""
-
-import sys
import socket
import asyncore
from collections import deque
+def buffer(obj, start=None, stop=None):
+ # if memoryview objects gain slicing semantics,
+ # this function will change for the better
+ # memoryview used for the TypeError
+ memoryview(obj)
+ if start == None:
+ start = 0
+ if stop == None:
+ stop = len(obj)
+ x = obj[start:stop]
+ ## print("buffer type is: %s"%(type(x),))
+ return x
+
class async_chat (asyncore.dispatcher):
"""This is an abstract class. You must derive from this class, and add
the two methods collect_incoming_data() and found_terminator()"""
@@ -60,20 +71,47 @@ class async_chat (asyncore.dispatcher):
ac_in_buffer_size = 4096
ac_out_buffer_size = 4096
+ # we don't want to enable the use of encoding by default, because that is a
+ # sign of an application bug that we don't want to pass silently
+
+ use_encoding = 0
+ encoding = 'latin1'
+
def __init__ (self, conn=None):
+ # for string terminator matching
self.ac_in_buffer = b''
- self.ac_out_buffer = b''
- self.producer_fifo = fifo()
+
+ # we use a list here rather than cStringIO for a few reasons...
+ # del lst[:] is faster than sio.truncate(0)
+ # lst = [] is faster than sio.truncate(0)
+ # cStringIO will be gaining unicode support in py3k, which
+ # will negatively affect the performance of bytes compared to
+ # a ''.join() equivalent
+ self.incoming = []
+
+ # we toss the use of the "simple producer" and replace it with
+ # a pure deque, which the original fifo was a wrapping of
+ self.producer_fifo = deque()
asyncore.dispatcher.__init__ (self, conn)
def collect_incoming_data(self, data):
raise NotImplementedError("must be implemented in subclass")
+ def _collect_incoming_data(self, data):
+ self.incoming.append(data)
+
+ def _get_data(self):
+ d = b''.join(self.incoming)
+ del self.incoming[:]
+ return d
+
def found_terminator(self):
raise NotImplementedError("must be implemented in subclass")
def set_terminator (self, term):
"Set the input delimiter. Can be a fixed string of any length, an integer, or None"
+ if isinstance(term, str) and self.use_encoding:
+ term = bytes(term, self.encoding)
self.terminator = term
def get_terminator (self):
@@ -92,14 +130,14 @@ class async_chat (asyncore.dispatcher):
self.handle_error()
return
- if isinstance(data, str):
- data = data.encode('ascii')
- self.ac_in_buffer = self.ac_in_buffer + bytes(data)
+ if isinstance(data, str) and self.use_encoding:
+ data = bytes(str, self.encoding)
+ self.ac_in_buffer = self.ac_in_buffer + data
# Continue to search for self.terminator in self.ac_in_buffer,
# while calling self.collect_incoming_data. The while loop
# is necessary because we might read several data+terminator
- # combos with a single recv(1024).
+ # combos with a single recv(4096).
while self.ac_in_buffer:
lb = len(self.ac_in_buffer)
@@ -108,7 +146,7 @@ class async_chat (asyncore.dispatcher):
# no terminator, collect it all
self.collect_incoming_data (self.ac_in_buffer)
self.ac_in_buffer = b''
- elif isinstance(terminator, int) or isinstance(terminator, int):
+ elif isinstance(terminator, int):
# numeric terminator
n = terminator
if lb < n:
@@ -129,8 +167,6 @@ class async_chat (asyncore.dispatcher):
# 3) end of buffer does not match any prefix:
# collect data
terminator_len = len(terminator)
- if isinstance(terminator, str):
- terminator = terminator.encode('ascii')
index = self.ac_in_buffer.find(terminator)
if index != -1:
# we found the terminator
@@ -155,91 +191,87 @@ class async_chat (asyncore.dispatcher):
self.ac_in_buffer = b''
def handle_write (self):
- self.initiate_send ()
+ self.initiate_send()
def handle_close (self):
self.close()
def push (self, data):
- self.producer_fifo.push (simple_producer (data))
+ sabs = self.ac_out_buffer_size
+ if len(data) > sabs:
+ for i in range(0, len(data), sabs):
+ self.producer_fifo.append(data[i:i+sabs])
+ else:
+ self.producer_fifo.append(data)
self.initiate_send()
def push_with_producer (self, producer):
- self.producer_fifo.push (producer)
+ self.producer_fifo.append(producer)
self.initiate_send()
def readable (self):
"predicate for inclusion in the readable for select()"
- return (len(self.ac_in_buffer) <= self.ac_in_buffer_size)
+ # cannot use the old predicate, it violates the claim of the
+ # set_terminator method.
+
+ # return (len(self.ac_in_buffer) <= self.ac_in_buffer_size)
+ return 1
def writable (self):
"predicate for inclusion in the writable for select()"
- # return len(self.ac_out_buffer) or len(self.producer_fifo) or (not self.connected)
- # this is about twice as fast, though not as clear.
- return not (
- (self.ac_out_buffer == b'') and
- self.producer_fifo.is_empty() and
- self.connected
- )
+ return self.producer_fifo or (not self.connected)
def close_when_done (self):
"automatically close this channel once the outgoing queue is empty"
- self.producer_fifo.push (None)
-
- # refill the outgoing buffer by calling the more() method
- # of the first producer in the queue
- def refill_buffer (self):
- while 1:
- if len(self.producer_fifo):
- p = self.producer_fifo.first()
- # a 'None' in the producer fifo is a sentinel,
- # telling us to close the channel.
- if p is None:
- if not self.ac_out_buffer:
- self.producer_fifo.pop()
- self.close()
- return
- elif isinstance(p, str) or isinstance(p, bytes):
- if isinstance(p, str):
- p = p.encode('ascii')
- self.producer_fifo.pop()
- self.ac_out_buffer = self.ac_out_buffer + p
+ self.producer_fifo.append(None)
+
+ def initiate_send(self):
+ while self.producer_fifo and self.connected:
+ first = self.producer_fifo[0]
+ # handle empty string/buffer or None entry
+ if not first:
+ del self.producer_fifo[0]
+ if first is None:
+ ## print("first is None")
+ self.handle_close()
return
- data = p.more()
+ ## print("first is not None")
+
+ # handle classic producer behavior
+ obs = self.ac_out_buffer_size
+ try:
+ data = buffer(first, 0, obs)
+ except TypeError:
+ data = first.more()
if data:
- if isinstance(data, str):
- data = data.encode('ascii')
- self.ac_out_buffer = self.ac_out_buffer + bytes(data)
- return
+ self.producer_fifo.appendleft(data)
else:
- self.producer_fifo.pop()
- else:
- return
+ del self.producer_fifo[0]
+ continue
- def initiate_send (self):
- obs = self.ac_out_buffer_size
- # try to refill the buffer
- if (len (self.ac_out_buffer) < obs):
- self.refill_buffer()
+ if isinstance(data, str) and self.use_encoding:
+ data = bytes(data, self.encoding)
- if self.ac_out_buffer and self.connected:
- # try to send the buffer
+ # send the data
try:
- num_sent = self.send (self.ac_out_buffer[:obs])
- if num_sent:
- self.ac_out_buffer = self.ac_out_buffer[num_sent:]
-
- except socket.error as why:
+ num_sent = self.send(data)
+ except socket.error:
self.handle_error()
return
+ if num_sent:
+ if num_sent < len(data) or obs < len(first):
+ self.producer_fifo[0] = first[num_sent:]
+ else:
+ del self.producer_fifo[0]
+ # we tried to send some actual data
+ return
+
def discard_buffers (self):
# Emergencies only!
self.ac_in_buffer = b''
- self.ac_out_buffer = b''
- while self.producer_fifo:
- self.producer_fifo.pop()
-
+ del self.incoming[:]
+ self.producer_fifo.clear()
class simple_producer: