summaryrefslogtreecommitdiffstats
path: root/Lib
diff options
context:
space:
mode:
authorJosiah Carlson <josiah.carlson@gmail.com>2008-06-10 05:00:08 (GMT)
committerJosiah Carlson <josiah.carlson@gmail.com>2008-06-10 05:00:08 (GMT)
commit1a72d88abf90edd72a9baf0fd6eebea2cded89c5 (patch)
tree3f7fe414a21a6021b9fbeaa8c514672302718d6f /Lib
parent602d8db2bc5ddc9a2de2843df92db53365478b3d (diff)
downloadcpython-1a72d88abf90edd72a9baf0fd6eebea2cded89c5.zip
cpython-1a72d88abf90edd72a9baf0fd6eebea2cded89c5.tar.gz
cpython-1a72d88abf90edd72a9baf0fd6eebea2cded89c5.tar.bz2
Applying updated patch from Issue 1736190, which addresses partial
issues in: 909005 and 17361001, as well as completely as possible issues 539444, 760475, 777588, 889153, 953599, 1025525, 1063924, and 658749. This patch also includes doc and test updates as necessary.
Diffstat (limited to 'Lib')
-rw-r--r--Lib/asynchat.py128
-rw-r--r--Lib/asyncore.py159
-rw-r--r--Lib/test/test_asyncore.py3
3 files changed, 187 insertions, 103 deletions
diff --git a/Lib/asynchat.py b/Lib/asynchat.py
index 6f99ba1..1d5fb7f 100644
--- a/Lib/asynchat.py
+++ b/Lib/asynchat.py
@@ -60,16 +60,35 @@ class async_chat (asyncore.dispatcher):
ac_out_buffer_size = 4096
def __init__ (self, conn=None):
+ # for string terminator matching
self.ac_in_buffer = ''
- self.ac_out_buffer = ''
- self.producer_fifo = fifo()
+
+ # we use a list here rather than cStringIO for a few reasons...
+ # del lst[:] is faster than sio.truncate(0)
+ # lst = [] is faster than sio.truncate(0)
+ # cStringIO will be gaining unicode support in py3k, which
+ # will negatively affect the performance of bytes compared to
+ # a ''.join() equivalent
+ self.incoming = []
+
+ # we toss the use of the "simple producer" and replace it with
+ # a pure deque, which the original fifo was a wrapping of
+ self.producer_fifo = deque()
asyncore.dispatcher.__init__ (self, conn)
def collect_incoming_data(self, data):
- raise NotImplementedError, "must be implemented in subclass"
+ raise NotImplementedError("must be implemented in subclass")
+
+ def _collect_incoming_data(self, data):
+ self.incoming.append(data)
+
+ def _get_data(self):
+ d = ''.join(self.incoming)
+ del self.incoming[:]
+ return d
def found_terminator(self):
- raise NotImplementedError, "must be implemented in subclass"
+ raise NotImplementedError("must be implemented in subclass")
def set_terminator (self, term):
"Set the input delimiter. Can be a fixed string of any length, an integer, or None"
@@ -96,7 +115,7 @@ class async_chat (asyncore.dispatcher):
# Continue to search for self.terminator in self.ac_in_buffer,
# while calling self.collect_incoming_data. The while loop
# is necessary because we might read several data+terminator
- # combos with a single recv(1024).
+ # combos with a single recv(4096).
while self.ac_in_buffer:
lb = len(self.ac_in_buffer)
@@ -150,87 +169,82 @@ class async_chat (asyncore.dispatcher):
self.ac_in_buffer = ''
def handle_write (self):
- self.initiate_send ()
+ self.initiate_send()
def handle_close (self):
self.close()
def push (self, data):
- self.producer_fifo.push (simple_producer (data))
+ sabs = self.ac_out_buffer_size
+ if len(data) > sabs:
+ for i in xrange(0, len(data), sabs):
+ self.producer_fifo.append(data[i:i+sabs])
+ else:
+ self.producer_fifo.append(data)
self.initiate_send()
def push_with_producer (self, producer):
- self.producer_fifo.push (producer)
+ self.producer_fifo.append(producer)
self.initiate_send()
def readable (self):
"predicate for inclusion in the readable for select()"
- return (len(self.ac_in_buffer) <= self.ac_in_buffer_size)
+ # cannot use the old predicate, it violates the claim of the
+ # set_terminator method.
+
+ # return (len(self.ac_in_buffer) <= self.ac_in_buffer_size)
+ return 1
def writable (self):
"predicate for inclusion in the writable for select()"
- # return len(self.ac_out_buffer) or len(self.producer_fifo) or (not self.connected)
- # this is about twice as fast, though not as clear.
- return not (
- (self.ac_out_buffer == '') and
- self.producer_fifo.is_empty() and
- self.connected
- )
+ return self.producer_fifo or (not self.connected)
def close_when_done (self):
"automatically close this channel once the outgoing queue is empty"
- self.producer_fifo.push (None)
-
- # refill the outgoing buffer by calling the more() method
- # of the first producer in the queue
- def refill_buffer (self):
- while 1:
- if len(self.producer_fifo):
- p = self.producer_fifo.first()
- # a 'None' in the producer fifo is a sentinel,
- # telling us to close the channel.
- if p is None:
- if not self.ac_out_buffer:
- self.producer_fifo.pop()
- self.close()
- return
- elif isinstance(p, str):
- self.producer_fifo.pop()
- self.ac_out_buffer = self.ac_out_buffer + p
+ self.producer_fifo.append(None)
+
+ def initiate_send(self):
+ while self.producer_fifo and self.connected:
+ first = self.producer_fifo[0]
+ # handle empty string/buffer or None entry
+ if not first:
+ del self.producer_fifo[0]
+ if first is None:
+ self.handle_close()
return
- data = p.more()
+
+ # handle classic producer behavior
+ obs = self.ac_out_buffer_size
+ try:
+ data = buffer(first, 0, obs)
+ except TypeError:
+ data = first.more()
if data:
- self.ac_out_buffer = self.ac_out_buffer + data
- return
+ self.producer_fifo.appendleft(data)
else:
- self.producer_fifo.pop()
- else:
- return
+ del self.producer_fifo[0]
+ continue
- def initiate_send (self):
- obs = self.ac_out_buffer_size
- # try to refill the buffer
- if (len (self.ac_out_buffer) < obs):
- self.refill_buffer()
-
- if self.ac_out_buffer and self.connected:
- # try to send the buffer
+ # send the data
try:
- num_sent = self.send (self.ac_out_buffer[:obs])
- if num_sent:
- self.ac_out_buffer = self.ac_out_buffer[num_sent:]
-
- except socket.error, why:
+ num_sent = self.send(data)
+ except socket.error:
self.handle_error()
return
+ if num_sent:
+ if num_sent < len(data) or obs < len(first):
+ self.producer_fifo[0] = first[num_sent:]
+ else:
+ del self.producer_fifo[0]
+ # we tried to send some actual data
+ return
+
def discard_buffers (self):
# Emergencies only!
self.ac_in_buffer = ''
- self.ac_out_buffer = ''
- while self.producer_fifo:
- self.producer_fifo.pop()
-
+ del self.incoming[:]
+ self.producer_fifo.clear()
class simple_producer:
diff --git a/Lib/asyncore.py b/Lib/asyncore.py
index 886c845..184c993 100644
--- a/Lib/asyncore.py
+++ b/Lib/asyncore.py
@@ -53,20 +53,26 @@ import time
import os
from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, \
- ENOTCONN, ESHUTDOWN, EINTR, EISCONN, errorcode
+ ENOTCONN, ESHUTDOWN, EINTR, EISCONN, EBADF, ECONNABORTED, errorcode
try:
socket_map
except NameError:
socket_map = {}
+def _strerror(err):
+ res = os.strerror(err)
+ if res == 'Unknown error':
+ res = errorcode[err]
+ return res
+
class ExitNow(Exception):
pass
def read(obj):
try:
obj.handle_read_event()
- except ExitNow:
+ except (ExitNow, KeyboardInterrupt, SystemExit):
raise
except:
obj.handle_error()
@@ -74,15 +80,15 @@ def read(obj):
def write(obj):
try:
obj.handle_write_event()
- except ExitNow:
+ except (ExitNow, KeyboardInterrupt, SystemExit):
raise
except:
obj.handle_error()
-def _exception (obj):
+def _exception(obj):
try:
obj.handle_expt_event()
- except ExitNow:
+ except (ExitNow, KeyboardInterrupt, SystemExit):
raise
except:
obj.handle_error()
@@ -95,7 +101,7 @@ def readwrite(obj, flags):
obj.handle_write_event()
if flags & (select.POLLERR | select.POLLHUP | select.POLLNVAL):
obj.handle_expt_event()
- except ExitNow:
+ except (ExitNow, KeyboardInterrupt, SystemExit):
raise
except:
obj.handle_error()
@@ -116,14 +122,15 @@ def poll(timeout=0.0, map=None):
e.append(fd)
if [] == r == w == e:
time.sleep(timeout)
- else:
- try:
- r, w, e = select.select(r, w, e, timeout)
- except select.error, err:
- if err[0] != EINTR:
- raise
- else:
- return
+ return
+
+ try:
+ r, w, e = select.select(r, w, e, timeout)
+ except select.error, err:
+ if err[0] != EINTR:
+ raise
+ else:
+ return
for fd in r:
obj = map.get(fd)
@@ -209,18 +216,29 @@ class dispatcher:
else:
self._map = map
+ self._fileno = None
+
if sock:
+ # Set to nonblocking just to make sure for cases where we
+ # get a socket from a blocking source.
+ sock.setblocking(0)
self.set_socket(sock, map)
- # I think it should inherit this anyway
- self.socket.setblocking(0)
self.connected = True
- # XXX Does the constructor require that the socket passed
- # be connected?
+ # The constructor no longer requires that the socket
+ # passed be connected.
try:
self.addr = sock.getpeername()
except socket.error:
- # The addr isn't crucial
- pass
+ if err[0] == ENOTCONN:
+ # To handle the case where we got an unconnected
+ # socket.
+ self.connected = False
+ else:
+ # The socket is broken in some unknown way, alert
+ # the user and remove it from the map (to prevent
+ # polling of broken sockets).
+ self.del_channel(map)
+ raise
else:
self.socket = None
@@ -254,10 +272,9 @@ class dispatcher:
def create_socket(self, family, type):
self.family_and_type = family, type
- self.socket = socket.socket(family, type)
- self.socket.setblocking(0)
- self._fileno = self.socket.fileno()
- self.add_channel()
+ sock = socket.socket(family, type)
+ sock.setblocking(0)
+ self.set_socket(sock)
def set_socket(self, sock, map=None):
self.socket = sock
@@ -295,7 +312,7 @@ class dispatcher:
def listen(self, num):
self.accepting = True
if os.name == 'nt' and num > 5:
- num = 1
+ num = 5
return self.socket.listen(num)
def bind(self, addr):
@@ -310,10 +327,9 @@ class dispatcher:
return
if err in (0, EISCONN):
self.addr = address
- self.connected = True
- self.handle_connect()
+ self.handle_connect_event()
else:
- raise socket.error, (err, errorcode[err])
+ raise socket.error(err, errorcode[err])
def accept(self):
# XXX can return either an address pair or None
@@ -333,9 +349,11 @@ class dispatcher:
except socket.error, why:
if why[0] == EWOULDBLOCK:
return 0
+ elif why[0] in (ECONNRESET, ENOTCONN, ESHUTDOWN, ECONNABORTED):
+ self.handle_close()
+ return 0
else:
raise
- return 0
def recv(self, buffer_size):
try:
@@ -349,15 +367,21 @@ class dispatcher:
return data
except socket.error, why:
# winsock sometimes throws ENOTCONN
- if why[0] in [ECONNRESET, ENOTCONN, ESHUTDOWN]:
+ if why[0] in [ECONNRESET, ENOTCONN, ESHUTDOWN, ECONNABORTED]:
self.handle_close()
return ''
else:
raise
def close(self):
+ self.connected = False
+ self.accepting = False
self.del_channel()
- self.socket.close()
+ try:
+ self.socket.close()
+ except socket.error, why:
+ if why[0] not in (ENOTCONN, EBADF):
+ raise
# cheap inheritance, used to pass all other attribute
# references to the underlying socket object.
@@ -377,27 +401,53 @@ class dispatcher:
def handle_read_event(self):
if self.accepting:
- # for an accepting socket, getting a read implies
- # that we are connected
- if not self.connected:
- self.connected = True
+ # accepting sockets are never connected, they "spawn" new
+ # sockets that are connected
self.handle_accept()
elif not self.connected:
- self.handle_connect()
- self.connected = True
+ self.handle_connect_event()
self.handle_read()
else:
self.handle_read()
+ def handle_connect_event(self):
+ self.connected = True
+ self.handle_connect()
+
def handle_write_event(self):
- # getting a write implies that we are connected
+ if self.accepting:
+ # Accepting sockets shouldn't get a write event.
+ # We will pretend it didn't happen.
+ return
+
if not self.connected:
- self.handle_connect()
- self.connected = True
+ #check for errors
+ err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
+ if err != 0:
+ raise socket.error(err, strerror(err))
+
+ self.handle_connect_event()
self.handle_write()
def handle_expt_event(self):
- self.handle_expt()
+ # if the handle_expt is the same default worthless method,
+ # we'll not even bother calling it, we'll instead generate
+ # a useful error
+ x = True
+ try:
+ y1 = self.__class__.handle_expt.im_func
+ y2 = dispatcher.handle_expt.im_func
+ x = y1 is y2
+ except AttributeError:
+ pass
+
+ if x:
+ err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
+ msg = _strerror(err)
+
+ raise socket.error(err, msg)
+ else:
+ self.handle_expt()
def handle_error(self):
nil, t, v, tbinfo = compact_traceback()
@@ -473,7 +523,8 @@ class dispatcher_with_send(dispatcher):
def compact_traceback():
t, v, tb = sys.exc_info()
tbinfo = []
- assert tb # Must have a traceback
+ if not tb: # Must have a traceback
+ raise AssertionError("traceback does not exist")
while tb:
tbinfo.append((
tb.tb_frame.f_code.co_filename,
@@ -489,11 +540,22 @@ def compact_traceback():
info = ' '.join(['[%s|%s|%s]' % x for x in tbinfo])
return (file, function, line), t, v, info
-def close_all(map=None):
+def close_all(map=None, ignore_all=False):
if map is None:
map = socket_map
for x in map.values():
- x.socket.close()
+ try:
+ x.close()
+ except OSError, x:
+ if x[0] == EBADF:
+ pass
+ elif not ignore_all:
+ raise
+ except (ExitNow, KeyboardInterrupt, SystemExit):
+ raise
+ except:
+ if not ignore_all:
+ raise
map.clear()
# Asynchronous File I/O:
@@ -513,11 +575,12 @@ if os.name == 'posix':
import fcntl
class file_wrapper:
- # here we override just enough to make a file
+ # Here we override just enough to make a file
# look like a socket for the purposes of asyncore.
+ # The passed fd is automatically os.dup()'d
def __init__(self, fd):
- self.fd = fd
+ self.fd = os.dup(fd)
def recv(self, *args):
return os.read(self.fd, *args)
@@ -539,6 +602,10 @@ if os.name == 'posix':
def __init__(self, fd, map=None):
dispatcher.__init__(self, None, map)
self.connected = True
+ try:
+ fd = fd.fileno()
+ except AttributeError:
+ pass
self.set_file(fd)
# set it to non-blocking mode
flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0)
diff --git a/Lib/test/test_asyncore.py b/Lib/test/test_asyncore.py
index 5fa7555..eead905 100644
--- a/Lib/test/test_asyncore.py
+++ b/Lib/test/test_asyncore.py
@@ -27,6 +27,9 @@ class dummychannel:
def __init__(self):
self.socket = dummysocket()
+ def close(self):
+ self.socket.close()
+
class exitingdummy:
def __init__(self):
pass