summaryrefslogtreecommitdiffstats
path: root/Lib
diff options
context:
space:
mode:
authorGuido van Rossum <guido@python.org>1996-08-19 22:26:43 (GMT)
committerGuido van Rossum <guido@python.org>1996-08-19 22:26:43 (GMT)
commit731630b5bf785ccff76ac623278b9b174ca13fe9 (patch)
tree584dad208258c75bdffe265022775458fa4c24df /Lib
parentbf66c64ffbb1f91a26c6f67e896fa75479aba83e (diff)
downloadcpython-731630b5bf785ccff76ac623278b9b174ca13fe9.zip
cpython-731630b5bf785ccff76ac623278b9b174ca13fe9.tar.gz
cpython-731630b5bf785ccff76ac623278b9b174ca13fe9.tar.bz2
XDR interface, evolved from Demo/rpc/xdr.py by Barry.
Diffstat (limited to 'Lib')
-rw-r--r--Lib/xdrlib.py324
1 files changed, 324 insertions, 0 deletions
diff --git a/Lib/xdrlib.py b/Lib/xdrlib.py
new file mode 100644
index 0000000..901a05b
--- /dev/null
+++ b/Lib/xdrlib.py
@@ -0,0 +1,324 @@
+"""Implements (a subset of) Sun XDR -- eXternal Data Representation.
+
+See: RFC 1014
+
+This module will conditionally use the _xdrmodule.so module to get
+support for those representations we can't do much with from Python.
+
+"""
+
+import struct
+from types import LongType
+
+# workaround Python 1.4b2 bug
+import sys
+sys.path[0] = '.'
+
+# use C layer XDR libraries for some data types if available
+try:
+ import _xdr
+except ImportError:
+ _xdr = None
+
+# this test is done to see if machine representation is the same as
+# network representation. if so, we can use module struct for packing
+# some data types
+__USE_MACHINE_REP = (struct.pack('l', 1) == '\0\0\0\1')
+
+# exceptions
+class Error:
+ """Exception class for this module. Use:
+
+ except xdrlib.Error, var:
+ # var has the Error instance for the exception
+
+ Public ivars:
+ msg -- contains the message
+
+ """
+ def __init__(self, msg):
+ self.msg = msg
+ def __repr__(self):
+ return repr(self.msg)
+ def __str__(self):
+ return str(self.msg)
+
+
+class ConversionError(Error):
+ pass
+
+
+
+class Packer:
+ """Pack various data representations into a buffer."""
+
+ def __init__(self):
+ self.reset()
+
+ def reset(self):
+ self.__buf = ''
+
+ def get_buffer(self):
+ return self.__buf
+ # backwards compatibility
+ get_buf = get_buffer
+
+ def pack_uint(self, x):
+ self.__buf = self.__buf + \
+ (chr(int(x>>24 & 0xff)) + chr(int(x>>16 & 0xff)) + \
+ chr(int(x>>8 & 0xff)) + chr(int(x & 0xff)))
+ if __USE_MACHINE_REP:
+ def pack_uint(self, x):
+ if type(x) == LongType:
+ x = int((x + 0x80000000L) % 0x100000000L - 0x80000000L)
+ self.__buf = self.__buf + struct.pack('l', x)
+
+ pack_int = pack_uint
+ pack_enum = pack_int
+
+ def pack_bool(self, x):
+ if x: self.__buf = self.__buf + '\0\0\0\1'
+ else: self.__buf = self.__buf + '\0\0\0\0'
+
+ def pack_uhyper(self, x):
+ self.pack_uint(int(x>>32 & 0xffffffff))
+ self.pack_uint(int(x & 0xffffffff))
+
+ pack_hyper = pack_uhyper
+
+ def pack_float(self, x):
+ raise ConversionError('Not supported')
+ def pack_double(self, x):
+ raise ConversionError('Not supported')
+ # get these from the C layer if available
+ if _xdr:
+ def pack_float(self, x):
+ try: self.__buf = self.__buf + _xdr.pack_float(x)
+ except _xdr.error, msg:
+ raise ConversionError(msg)
+ def pack_double(self, x):
+ try: self.__buf = self.__buf + _xdr.pack_double(x)
+ except _xdr.error, msg:
+ raise ConversionError(msg)
+
+ def pack_fstring(self, n, s):
+ if n < 0:
+ raise ValueError, 'fstring size must be nonnegative'
+ n = ((n+3)/4)*4
+ data = s[:n]
+ data = data + (n - len(data)) * '\0'
+ self.__buf = self.__buf + data
+
+ pack_fopaque = pack_fstring
+
+ def pack_string(self, s):
+ n = len(s)
+ self.pack_uint(n)
+ self.pack_fstring(n, s)
+
+ pack_opaque = pack_string
+ pack_bytes = pack_string
+
+ def pack_list(self, list, pack_item):
+ for item in list:
+ self.pack_uint(1)
+ pack_item(item)
+ self.pack_uint(0)
+
+ def pack_farray(self, n, list, pack_item):
+ if len(list) <> n:
+ raise ValueError, 'wrong array size'
+ for item in list:
+ pack_item(item)
+
+ def pack_array(self, list, pack_item):
+ n = len(list)
+ self.pack_uint(n)
+ self.pack_farray(n, list, pack_item)
+
+
+
+class Unpacker:
+ """Unpacks various data representations from the given buffer."""
+
+ def __init__(self, data):
+ self.reset(data)
+
+ def reset(self, data):
+ self.__buf = data
+ self.__pos = 0
+
+ def get_position(self):
+ return self.__pos
+
+ def set_position(self, position):
+ self.__pos = position
+
+ def done(self):
+ if self.__pos < len(self.__buf):
+ raise Error('unextracted data remains')
+
+ def unpack_uint(self):
+ i = self.__pos
+ self.__pos = j = i+4
+ data = self.__buf[i:j]
+ if len(data) < 4:
+ raise EOFError
+ x = long(ord(data[0]))<<24 | ord(data[1])<<16 | \
+ ord(data[2])<<8 | ord(data[3])
+ # Return a Python long only if the value is not representable
+ # as a nonnegative Python int
+ if x < 0x80000000L:
+ x = int(x)
+ return x
+ if __USE_MACHINE_REP:
+ def unpack_uint(self):
+ i = self.__pos
+ self.__pos = j = i+4
+ data = self.__buf[i:j]
+ if len(data) < 4:
+ raise EOFError
+ return struct.unpack('l', data)[0]
+
+ def unpack_int(self):
+ x = self.unpack_uint()
+ if x >= 0x80000000L:
+ x = x - 0x100000000L
+ return int(x)
+
+ unpack_enum = unpack_int
+ unpack_bool = unpack_int
+
+ def unpack_uhyper(self):
+ hi = self.unpack_uint()
+ lo = self.unpack_uint()
+ return long(hi)<<32 | lo
+
+ def unpack_hyper(self):
+ x = self.unpack_uhyper()
+ if x >= 0x8000000000000000L:
+ x = x - 0x10000000000000000L
+ return x
+
+ def unpack_float(self):
+ raise ConversionError('Not supported')
+ def unpack_double(self):
+ raise ConversionError('Not supported')
+ # get these from the C layer if available
+ if _xdr:
+ def unpack_float(self):
+ i = self.__pos
+ self.__pos = j = i+4
+ data = self.__buf[i:j]
+ if len(data) < 4:
+ raise EOFError
+ try: return _xdr.unpack_float(data)
+ except _xdr.error, msg:
+ raise ConversionError(msg)
+
+ def unpack_double(self):
+ i = self.__pos
+ self.__pos = j = i+8
+ data = self.__buf[i:j]
+ if len(data) < 8:
+ raise EOFError
+ try: return _xdr.unpack_double(data)
+ except _xdr.error, msg:
+ raise ConversionError(msg)
+
+ def unpack_fstring(self, n):
+ if n < 0:
+ raise ValueError, 'fstring size must be nonnegative'
+ i = self.__pos
+ j = i + (n+3)/4*4
+ if j > len(self.__buf):
+ raise EOFError
+ self.__pos = j
+ return self.__buf[i:i+n]
+
+ unpack_fopaque = unpack_fstring
+
+ def unpack_string(self):
+ n = self.unpack_uint()
+ return self.unpack_fstring(n)
+
+ unpack_opaque = unpack_string
+ unpack_bytes = unpack_string
+
+ def unpack_list(self, unpack_item):
+ list = []
+ while 1:
+ x = self.unpack_uint()
+ if x == 0: break
+ if x <> 1:
+ raise ConversionError('0 or 1 expected, got ' + `x`)
+ item = unpack_item()
+ list.append(item)
+ return list
+
+ def unpack_farray(self, n, unpack_item):
+ list = []
+ for i in range(n):
+ list.append(unpack_item())
+ return list
+
+ def unpack_array(self, unpack_item):
+ n = self.unpack_uint()
+ return self.unpack_farray(n, unpack_item)
+
+
+# test suite
+def __test():
+ p = Packer()
+ packtest = [
+ (p.pack_uint, (9,)),
+ (p.pack_bool, (None,)),
+ (p.pack_bool, ('hello',)),
+ (p.pack_uhyper, (45L,)),
+ (p.pack_float, (1.9,)),
+ (p.pack_double, (1.9,)),
+ (p.pack_string, ('hello world',)),
+ (p.pack_list, (range(5), p.pack_uint)),
+ (p.pack_array, (['what', 'is', 'hapnin', 'doctor'], p.pack_string)),
+ ]
+ succeedlist = [1] * len(packtest)
+ count = 0
+ for method, args in packtest:
+ print 'pack test', count,
+ try:
+ apply(method, args)
+ print 'succeeded'
+ except ConversionError, var:
+ print 'ConversionError:', var.msg
+ succeedlist[count] = 0
+ count = count + 1
+ data = p.get_buffer()
+ # now verify
+ up = Unpacker(data)
+ unpacktest = [
+ (up.unpack_uint, (), lambda x: x == 9),
+ (up.unpack_bool, (), lambda x: not x),
+ (up.unpack_bool, (), lambda x: x),
+ (up.unpack_uhyper, (), lambda x: x == 45L),
+ (up.unpack_float, (), lambda x: 1.89 < x < 1.91),
+ (up.unpack_double, (), lambda x: 1.89 < x < 1.91),
+ (up.unpack_string, (), lambda x: x == 'hello world'),
+ (up.unpack_list, (up.unpack_uint,), lambda x: x == range(5)),
+ (up.unpack_array, (up.unpack_string,),
+ lambda x: x == ['what', 'is', 'hapnin', 'doctor']),
+ ]
+ count = 0
+ for method, args, pred in unpacktest:
+ print 'unpack test', count,
+ try:
+ if succeedlist[count]:
+ x = apply(method, args)
+ print pred(x) and 'succeeded' or 'failed', ':', x
+ else:
+ print 'skipping'
+ except ConversionError, var:
+ print 'ConversionError:', var.msg
+ count = count + 1
+
+if __name__ == '__main__':
+ __test()