diff options
author | Kurt B. Kaiser <kbk@shore.net> | 2003-05-08 20:26:55 (GMT) |
---|---|---|
committer | Kurt B. Kaiser <kbk@shore.net> | 2003-05-08 20:26:55 (GMT) |
commit | a00050f209acf2201b2382f9d534a2595bacf5c3 (patch) | |
tree | c3b85c94b6edeaca2174e6a96e1fb6af421ecfd1 /Lib/idlelib/rpc.py | |
parent | c4607dadce95073361a091d2c4cef42de5be44a3 (diff) | |
download | cpython-a00050f209acf2201b2382f9d534a2595bacf5c3.zip cpython-a00050f209acf2201b2382f9d534a2595bacf5c3.tar.gz cpython-a00050f209acf2201b2382f9d534a2595bacf5c3.tar.bz2 |
1. Implement processing of user code in subprocess MainThread. Pass loop
is now interruptable on Windows.
2. Tweak signal.signal() wait parameters as called by various methods
to improve I/O response, especially on Windows.
3. Debugger is disabled at this check-in pending further development.
M NEWS.txt
M PyShell.py
M rpc.py
M run.py
Diffstat (limited to 'Lib/idlelib/rpc.py')
-rw-r--r-- | Lib/idlelib/rpc.py | 199 |
1 files changed, 146 insertions, 53 deletions
diff --git a/Lib/idlelib/rpc.py b/Lib/idlelib/rpc.py index 15946a6..4c3ef3e 100644 --- a/Lib/idlelib/rpc.py +++ b/Lib/idlelib/rpc.py @@ -28,17 +28,21 @@ accomplished in Idle. """ import sys +import os import socket import select import SocketServer import struct import cPickle as pickle import threading +import Queue import traceback import copy_reg import types import marshal +import interrupt + def unpickle_code(ms): co = marshal.loads(ms) assert isinstance(co, types.CodeType) @@ -98,8 +102,6 @@ class RPCServer(SocketServer.TCPServer): raise except SystemExit: raise - except EOFError: - pass except: erf = sys.__stderr__ print>>erf, '\n' + '-'*40 @@ -110,28 +112,29 @@ class RPCServer(SocketServer.TCPServer): traceback.print_exc(file=erf) print>>erf, '\n*** Unrecoverable, server exiting!' print>>erf, '-'*40 - import os os._exit(0) +#----------------- end class RPCServer -------------------- objecttable = {} +request_queue = Queue.Queue(0) +response_queue = Queue.Queue(0) + class SocketIO: nextseq = 0 def __init__(self, sock, objtable=None, debugging=None): - self.mainthread = threading.currentThread() + self.sockthread = threading.currentThread() if debugging is not None: self.debugging = debugging self.sock = sock if objtable is None: objtable = objecttable self.objtable = objtable - self.cvar = threading.Condition() self.responses = {} self.cvars = {} - self.interrupted = False def close(self): sock = self.sock @@ -139,6 +142,10 @@ class SocketIO: if sock is not None: sock.close() + def exithook(self): + "override for specific exit action" + os._exit() + def debug(self, *args): if not self.debugging: return @@ -156,13 +163,12 @@ class SocketIO: except KeyError: pass - def localcall(self, request): + def localcall(self, seq, request): self.debug("localcall:", request) try: how, (oid, methodname, args, kwargs) = request except TypeError: return ("ERROR", "Bad request format") - assert how == "call" if not self.objtable.has_key(oid): return ("ERROR", "Unknown object id: %s" % `oid`) obj = self.objtable[oid] @@ -178,14 +184,20 @@ class SocketIO: return ("ERROR", "Unsupported method name: %s" % `methodname`) method = getattr(obj, methodname) try: - ret = method(*args, **kwargs) - if isinstance(ret, RemoteObject): - ret = remoteref(ret) - return ("OK", ret) + if how == 'CALL': + ret = method(*args, **kwargs) + if isinstance(ret, RemoteObject): + ret = remoteref(ret) + return ("OK", ret) + elif how == 'QUEUE': + request_queue.put((seq, (method, args, kwargs))) + return("QUEUED", None) + else: + return ("ERROR", "Unsupported message type: %s" % how) except SystemExit: raise except socket.error: - pass + raise except: self.debug("localcall:EXCEPTION") traceback.print_exc(file=sys.__stderr__) @@ -193,24 +205,37 @@ class SocketIO: def remotecall(self, oid, methodname, args, kwargs): self.debug("remotecall:asynccall: ", oid, methodname) - # XXX KBK 06Feb03 self.interrupted logic may not be necessary if - # subprocess is threaded. - if self.interrupted: - self.interrupted = False - raise KeyboardInterrupt seq = self.asynccall(oid, methodname, args, kwargs) return self.asyncreturn(seq) + def remotequeue(self, oid, methodname, args, kwargs): + self.debug("remotequeue:asyncqueue: ", oid, methodname) + seq = self.asyncqueue(oid, methodname, args, kwargs) + return self.asyncreturn(seq) + def asynccall(self, oid, methodname, args, kwargs): - request = ("call", (oid, methodname, args, kwargs)) + request = ("CALL", (oid, methodname, args, kwargs)) seq = self.newseq() + if threading.currentThread() != self.sockthread: + cvar = threading.Condition() + self.cvars[seq] = cvar self.debug(("asynccall:%d:" % seq), oid, methodname, args, kwargs) self.putmessage((seq, request)) return seq + def asyncqueue(self, oid, methodname, args, kwargs): + request = ("QUEUE", (oid, methodname, args, kwargs)) + seq = self.newseq() + if threading.currentThread() != self.sockthread: + cvar = threading.Condition() + self.cvars[seq] = cvar + self.debug(("asyncqueue:%d:" % seq), oid, methodname, args, kwargs) + self.putmessage((seq, request)) + return seq + def asyncreturn(self, seq): self.debug("asyncreturn:%d:call getresponse(): " % seq) - response = self.getresponse(seq, wait=None) + response = self.getresponse(seq, wait=0.05) self.debug(("asyncreturn:%d:response: " % seq), response) return self.decoderesponse(response) @@ -218,25 +243,36 @@ class SocketIO: how, what = response if how == "OK": return what + if how == "QUEUED": + return None if how == "EXCEPTION": self.debug("decoderesponse: EXCEPTION") return None + if how == "EOF": + self.debug("decoderesponse: EOF") + self.decode_interrupthook() + return None if how == "ERROR": self.debug("decoderesponse: Internal ERROR:", what) raise RuntimeError, what raise SystemError, (how, what) + def decode_interrupthook(self): + "" + raise EOFError + def mainloop(self): """Listen on socket until I/O not ready or EOF - Main thread pollresponse() will loop looking for seq number None, which + pollresponse() will loop looking for seq number None, which never comes, and exit on EOFError. """ try: - self.getresponse(myseq=None, wait=None) + self.getresponse(myseq=None, wait=0.05) except EOFError: - pass + self.debug("mainloop:return") + return def getresponse(self, myseq, wait): response = self._getresponse(myseq, wait) @@ -256,23 +292,24 @@ class SocketIO: def _getresponse(self, myseq, wait): self.debug("_getresponse:myseq:", myseq) - if threading.currentThread() is self.mainthread: - # Main thread: does all reading of requests or responses - # Loop here, blocking each time until socket is ready. + if threading.currentThread() is self.sockthread: + # this thread does all reading of requests or responses while 1: response = self.pollresponse(myseq, wait) if response is not None: return response else: - # Auxiliary thread: wait for notification from main thread - self.cvar.acquire() - self.cvars[myseq] = self.cvar + # wait for notification from socket handling thread + cvar = self.cvars[myseq] + cvar.acquire() while not self.responses.has_key(myseq): - self.cvar.wait() + cvar.wait() response = self.responses[myseq] + self.debug("_getresponse:%s: thread woke up: response: %s" % + (myseq, response)) del self.responses[myseq] del self.cvars[myseq] - self.cvar.release() + cvar.release() return response def newseq(self): @@ -283,7 +320,7 @@ class SocketIO: self.debug("putmessage:%d:" % message[0]) try: s = pickle.dumps(message) - except: + except pickle.UnpicklingError: print >>sys.__stderr__, "Cannot pickle:", `message` raise s = struct.pack("<i", len(s)) + s @@ -293,10 +330,13 @@ class SocketIO: except AttributeError: # socket was closed raise IOError + except socket.error: + self.debug("putmessage:socketerror:pid:%s" % os.getpid()) + os._exit(0) else: s = s[n:] - def ioready(self, wait=0.0): + def ioready(self, wait): r, w, x = select.select([self.sock.fileno()], [], [], wait) return len(r) @@ -304,7 +344,7 @@ class SocketIO: bufneed = 4 bufstate = 0 # meaning: 0 => reading count; 1 => reading data - def pollpacket(self, wait=0.0): + def pollpacket(self, wait): self._stage0() if len(self.buffer) < self.bufneed: if not self.ioready(wait): @@ -334,7 +374,7 @@ class SocketIO: self.bufstate = 0 return packet - def pollmessage(self, wait=0.0): + def pollmessage(self, wait): packet = self.pollpacket(wait) if packet is None: return None @@ -348,45 +388,97 @@ class SocketIO: raise return message - def pollresponse(self, myseq, wait=0.0): + def pollresponse(self, myseq, wait): """Handle messages received on the socket. - Some messages received may be asynchronous 'call' commands, and - some may be responses intended for other threads. + Some messages received may be asynchronous 'call' or 'queue' requests, + and some may be responses for other threads. + + 'call' requests are passed to self.localcall() with the expectation of + immediate execution, during which time the socket is not serviced. - Loop until message with myseq sequence number is received. Save others - in self.responses and notify the owning thread, except that 'call' - commands are handed off to localcall() and the response sent back - across the link with the appropriate sequence number. + 'queue' requests are used for tasks (which may block or hang) to be + processed in a different thread. These requests are fed into + request_queue by self.localcall(). Responses to queued requests are + taken from response_queue and sent across the link with the associated + sequence numbers. Messages in the queues are (sequence_number, + request/response) tuples and code using this module removing messages + from the request_queue is responsible for returning the correct + sequence number in the response_queue. + + pollresponse() will loop until a response message with the myseq + sequence number is received, and will save other responses in + self.responses and notify the owning thread. """ while 1: - message = self.pollmessage(wait) - if message is None: # socket not ready + # send queued response if there is one available + try: + qmsg = response_queue.get(0) + except Queue.Empty: + pass + else: + seq, response = qmsg + message = (seq, ('OK', response)) + self.putmessage(message) + # poll for message on link + try: + message = self.pollmessage(wait) + if message is None: # socket not ready + return None + except EOFError: + self.handle_EOF() + return None + except AttributeError: return None - #wait = 0.0 # poll on subsequent passes instead of blocking seq, resq = message + how = resq[0] self.debug("pollresponse:%d:myseq:%s" % (seq, myseq)) - if resq[0] == "call": + # process or queue a request + if how in ("CALL", "QUEUE"): self.debug("pollresponse:%d:localcall:call:" % seq) - response = self.localcall(resq) + response = self.localcall(seq, resq) self.debug("pollresponse:%d:localcall:response:%s" % (seq, response)) - self.putmessage((seq, response)) + if how == "CALL": + self.putmessage((seq, response)) + elif how == "QUEUE": + # don't acknowledge the 'queue' request! + pass continue + # return if completed message transaction elif seq == myseq: return resq + # must be a response for a different thread: else: - self.cvar.acquire() - cv = self.cvars.get(seq) + cv = self.cvars.get(seq, None) # response involving unknown sequence number is discarded, - # probably intended for prior incarnation + # probably intended for prior incarnation of server if cv is not None: + cv.acquire() self.responses[seq] = resq cv.notify() - self.cvar.release() + cv.release() continue + def handle_EOF(self): + "action taken upon link being closed by peer" + self.EOFhook() + self.debug("handle_EOF") + for key in self.cvars: + cv = self.cvars[key] + cv.acquire() + self.responses[key] = ('EOF', None) + cv.notify() + cv.release() + interrupt.interrupt_main() + # call our (possibly overridden) exit function + self.exithook() + + def EOFhook(self): + "Classes using rpc client/server can override to augment EOF action" + pass + #----------------- end class SocketIO -------------------- class RemoteObject: @@ -465,7 +557,8 @@ class RPCProxy: self.__getattributes() if not self.__attributes.has_key(name): raise AttributeError, name - __getattr__.DebuggerStepThrough=1 + + __getattr__.DebuggerStepThrough = 1 def __getattributes(self): self.__attributes = self.sockio.remotecall(self.oid, |