1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
|
"""protocol (David Scherer <dscherer@cmu.edu>)
This module implements a simple RPC or "distributed object" protocol.
I am probably the 100,000th person to write this in Python, but, hey,
it was fun.
Contents:
connectionLost is an exception that will be thrown by functions in
the protocol module or calls to remote methods that fail because
the remote program has closed the socket or because no connection
could be established in the first place.
Server( port=None, connection_hook=None ) creates a server on a
well-known port, to which clients can connect. When a client
connects, a Connection is created for it. If connection_hook
is defined, then connection_hook( socket.getpeername() ) is called
before a Connection is created, and if it returns false then the
connection is refused. connection_hook must be prepared to be
called from any thread.
Client( ip='127.0.0.1', port=None ) returns a Connection to a Server
object at a well-known address and port.
Connection( socket ) creates an RPC connection on an arbitrary socket,
which must already be connected to another program. You do not
need to use this directly if you are using Client() or Server().
publish( name, connect_function ) provides an object with the
specified name to some or all Connections. When another program
calls Connection.getobject() with the specified name, the
specified connect_function is called with the arguments
connect_function( conn, addr )
where conn is the Connection object to the requesting client and
addr is the address returned by socket.getpeername(). If that
function returns an object, that object becomes accessible to
the caller. If it returns None, the caller's request fails.
Connection objects:
.close() refuses additional RPC messages from the peer, and notifies
the peer that the connection has been closed. All pending remote
method calls in either program will fail with a connectionLost
exception. Further remote method calls on this connection will
also result in errors.
.getobject(name) returns a proxy for the remote object with the
specified name, if it exists and the peer permits us access.
Otherwise, it returns None. It may throw a connectionLost
exception. The returned proxy supports basic attribute access
and method calls, and its methods have an extra attribute,
.void, which is a function that has the same effect but always
returns None. This last capability is provided as a performance
hack: object.method.void(params) can return without waiting for
the remote process to respond, but object.method(params) needs
to wait for a return value or exception.
.rpc_loop(block=0) processes *incoming* messages for this connection.
If block=1, it continues processing until an exception or return
value is received, which is normally forever. Otherwise it
returns when all currently pending messages have been delivered.
It may throw a connectionLost exception.
.set_close_hook(f) specifies a function to be called when the remote
object closes the connection during a call to rpc_loop(). This
is a good way for servers to be notified when clients disconnect.
.set_shutdown_hook(f) specifies a function called *immediately* when
the receive loop detects that the connection has been lost. The
provided function must be prepared to run in any thread.
Server objects:
.rpc_loop() processes incoming messages on all connections, and
returns when all pending messages have been processed. It will
*not* throw connectionLost exceptions; the
Connection.set_close_hook() mechanism is much better for servers.
"""
import sys, os, string, types
import socket
from threading import Thread
from Queue import Queue, Empty
from cPickle import Pickler, Unpickler, PicklingError
class connectionLost:
def __init__(self, what=""): self.what = what
def __repr__(self): return self.what
def __str__(self): return self.what
def getmethods(cls):
"Returns a list of the names of the methods of a class."
methods = []
for b in cls.__bases__:
methods = methods + getmethods(b)
d = cls.__dict__
for k in d.keys():
if type(d[k])==types.FunctionType:
methods.append(k)
return methods
class methodproxy:
"Proxy for a method of a remote object."
def __init__(self, classp, name):
self.classp=classp
self.name=name
self.client = classp.client
def __call__(self, *args, **keywords):
return self.client.call( 'm', self.classp.name, self.name, args, keywords )
def void(self, *args, **keywords):
self.client.call_void( 'm', self.classp.name,self.name,args,keywords)
class classproxy:
"Proxy for a remote object."
def __init__(self, client, name, methods):
self.__dict__['client'] = client
self.__dict__['name'] = name
for m in methods:
prox = methodproxy( self, m )
self.__dict__[m] = prox
def __getattr__(self, attr):
return self.client.call( 'g', self.name, attr )
def __setattr__(self, attr, value):
self.client.call_void( 's', self.name, attr, value )
local_connect = {}
def publish(name, connect_function):
local_connect[name]=connect_function
class socketFile:
"File emulator based on a socket. Provides only blocking semantics for now."
def __init__(self, socket):
self.socket = socket
self.buffer = ''
def _recv(self,bytes):
try:
r=self.socket.recv(bytes)
except:
raise connectionLost()
if not r:
raise connectionLost()
return r
def write(self, string):
try:
self.socket.send( string )
except:
raise connectionLost()
def read(self,bytes):
x = bytes-len(self.buffer)
while x>0:
self.buffer=self.buffer+self._recv(x)
x = bytes-len(self.buffer)
s = self.buffer[:bytes]
self.buffer=self.buffer[bytes:]
return s
def readline(self):
while 1:
f = string.find(self.buffer,'\n')
if f>=0:
s = self.buffer[:f+1]
self.buffer=self.buffer[f+1:]
return s
self.buffer = self.buffer + self._recv(1024)
class Connection (Thread):
debug = 0
def __init__(self, socket):
self.local_objects = {}
self.socket = socket
self.name = socket.getpeername()
self.socketfile = socketFile(socket)
self.queue = Queue(-1)
self.refuse_messages = 0
self.cmds = { 'm': self.r_meth,
'g': self.r_get,
's': self.r_set,
'o': self.r_geto,
'e': self.r_exc,
#'r' handled by rpc_loop
}
Thread.__init__(self)
self.setDaemon(1)
self.start()
def getobject(self, name):
methods = self.call( 'o', name )
if methods is None: return None
return classproxy(self, name, methods)
# close_hook is called from rpc_loop(), like a normal remote method
# invocation
def set_close_hook(self,hook): self.close_hook = hook
# shutdown_hook is called directly from the run() thread, and needs
# to be "thread safe"
def set_shutdown_hook(self,hook): self.shutdown_hook = hook
close_hook = None
shutdown_hook = None
def close(self):
self._shutdown()
self.refuse_messages = 1
def call(self, c, *args):
self.send( (c, args, 1 ) )
return self.rpc_loop( block = 1 )
def call_void(self, c, *args):
try:
self.send( (c, args, 0 ) )
except:
pass
# the following methods handle individual RPC calls:
def r_geto(self, obj):
c = local_connect.get(obj)
if not c: return None
o = c(self, self.name)
if not o: return None
self.local_objects[obj] = o
return getmethods(o.__class__)
def r_meth(self, obj, name, args, keywords):
return apply( getattr(self.local_objects[obj],name), args, keywords)
def r_get(self, obj, name):
return getattr(self.local_objects[obj],name)
def r_set(self, obj, name, value):
setattr(self.local_objects[obj],name,value)
def r_exc(self, e, v):
raise e, v
def rpc_exec(self, cmd, arg, ret):
if self.refuse_messages: return
if self.debug: print cmd,arg,ret
if ret:
try:
r=apply(self.cmds.get(cmd), arg)
self.send( ('r', r, 0) )
except:
try:
self.send( ('e', sys.exc_info()[:2], 0) )
except PicklingError:
self.send( ('e', (TypeError, 'Unpicklable exception.'), 0 ) )
else:
# we cannot report exceptions to the caller, so
# we report them in this process.
r=apply(self.cmds.get(cmd), arg)
# the following methods implement the RPC and message loops:
def rpc_loop(self, block=0):
if self.refuse_messages: raise connectionLost('(already closed)')
try:
while 1:
try:
cmd, arg, ret = self.queue.get( block )
except Empty:
return None
if cmd=='r': return arg
self.rpc_exec(cmd,arg,ret)
except connectionLost:
if self.close_hook:
self.close_hook()
self.close_hook = None
raise
def run(self):
try:
while 1:
data = self.recv()
self.queue.put( data )
except:
self.queue.put( ('e', sys.exc_info()[:2], 0) )
# The following send raw pickled data to the peer
def send(self, data):
try:
Pickler(self.socketfile,1).dump( data )
except connectionLost:
self._shutdown()
if self.shutdown_hook: self.shutdown_hook()
raise
def recv(self):
try:
return Unpickler(self.socketfile).load()
except connectionLost:
self._shutdown()
if self.shutdown_hook: self.shutdown_hook()
raise
except:
raise
def _shutdown(self):
try:
self.socket.shutdown(1)
self.socket.close()
except:
pass
class Server (Thread):
default_port = 0x1D1E # "IDlE"
def __init__(self, port=None, connection_hook=None):
self.connections = []
self.port = port or self.default_port
self.connection_hook = connection_hook
try:
self.wellknown = s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind('', self.port)
s.listen(3)
except:
raise connectionLost
Thread.__init__(self)
self.setDaemon(1)
self.start()
def run(self):
s = self.wellknown
while 1:
conn, addr = s.accept()
if self.connection_hook and not self.connection_hook(addr):
try:
conn.shutdown(1)
except:
pass
continue
self.connections.append( Connection(conn) )
def rpc_loop(self):
cns = self.connections[:]
for c in cns:
try:
c.rpc_loop(block = 0)
except connectionLost:
if c in self.connections:
self.connections.remove(c)
def Client(ip='127.0.0.1', port=None):
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(ip,port or Server.default_port)
except socket.error, what:
raise connectionLost(str(what))
except:
raise connectionLost()
return Connection(s)
|