diff options
Diffstat (limited to 'xpa/python')
-rw-r--r-- | xpa/python/PythonXPA/README | 25 | ||||
-rw-r--r-- | xpa/python/PythonXPA/client/XPAClient.py | 98 | ||||
-rwxr-xr-x | xpa/python/PythonXPA/client/test_XPAC.py | 104 | ||||
-rw-r--r-- | xpa/python/PythonXPA/server/XPAServer.py | 125 | ||||
-rwxr-xr-x | xpa/python/PythonXPA/server/test_XPAS.py | 91 |
5 files changed, 443 insertions, 0 deletions
diff --git a/xpa/python/PythonXPA/README b/xpa/python/PythonXPA/README new file mode 100644 index 0000000..9c803db --- /dev/null +++ b/xpa/python/PythonXPA/README @@ -0,0 +1,25 @@ +Hi Eric, + +it's me again. Hope everything is ok with you. + +I started working with python and because python 2.5 comes with ctypes +included I started to wrap some XPA functionality. In the web I only +found PyXPA 0.2 which is very alpha. And with ctypes, we don't need any +c-code linked in, python package is enough. + +Although I have wrapped only the functionality I need for now, I think +it is already usable. And, of course, when you or anyone is interested +in it, I can easily add more functionality. + +There is no installation needed. Just adjust the path to your libxpa.so +in XPAClient.py and XPAServer.py and optionally adjust NXPA in +XCPAClient.py. + +Then you can start stest and run test_XPAC.py or run test_XPAS.py and +issue some xpainfo/get/set commands. + + + +Regards, +Bernhard + diff --git a/xpa/python/PythonXPA/client/XPAClient.py b/xpa/python/PythonXPA/client/XPAClient.py new file mode 100644 index 0000000..29e39eb --- /dev/null +++ b/xpa/python/PythonXPA/client/XPAClient.py @@ -0,0 +1,98 @@ + +import sys +from ctypes import * + +libxpa=cdll.LoadLibrary("../../libxpa.so.1.0") +libc=cdll.LoadLibrary(None); + + +#xpa_t = c_void_p +#xpa = xpa_t(None) + +c_byte_p = POINTER(c_byte) + + + +############## + +NXPA = 10 + +############## + + +def free_bufs(p_arr, len): + for i in range(len): + if p_arr[i]: + print "freeing [", i, "] ", p_arr[i][0], p_arr[i][1] + libc.free(p_arr[i]) + + + + + +## XPA XPAOpen(char *mode); + +XPAOpen = libxpa.XPAOpen +libxpa.XPAOpen.restype = c_void_p +libxpa.XPAOpen.argtypes = [c_char_p] + +## void XPAClose(XPA xpa); +XPAClose = libxpa.XPAClose +libxpa.XPAClose.argtypes = [c_void_p] + + +## int XPAGet(XPA xpa, char *template, char *paramlist, char *mode, +## char **bufs, int *lens, char **names, char **messages, int n); + +XPAGet = libxpa.XPAGet +libxpa.XPAGet.restype = c_int +libxpa.XPAGet.argtypes = [c_void_p, c_char_p, c_char_p, c_char_p, \ + c_byte_p*NXPA, c_int*NXPA, c_byte_p*NXPA, c_byte_p*NXPA, \ + c_int] + + +## when you want to send something like "hello\0x00 you" use this + +## int XPASet(XPA xpa, +## char *template, char *paramlist, char *mode, +## char *buf, int len, char **names, char **messages, +## int n); + +#libxpa.XPASet.restype = c_int +#libxpa.XPASet.argtypes = [c_void_p, c_char_p, c_char_p, c_char_p, \ +# c_byte_p, c_int, c_char_p*NXPA, c_char_p*NXPA, \ +# c_int] + +## when you know, you will send strings only, use this one to avoid conversions + +XPASet = libxpa.XPASet +libxpa.XPASet.restype = c_int +libxpa.XPASet.argtypes = [c_void_p, c_char_p, c_char_p, c_char_p, \ + c_char_p, c_int, c_byte_p*NXPA, c_byte_p*NXPA, \ + c_int] + +## int XPAInfo(XPA xpa, +## char *template, char *paramlist, char *mode, +## char **names, char **messages, int n); + +XPAInfo = libxpa.XPAInfo +libxpa.XPAInfo.restype = c_int +libxpa.XPAInfo.argtypes = [c_void_p, c_char_p, c_char_p, c_char_p, \ + c_byte_p*NXPA, c_byte_p*NXPA, \ + c_int] + +## int XPAAccess(XPA xpa, +## char *template, char *paramlist, char *mode, +## char **names, char **messages, int n); + +XPAAccess = libxpa.XPAAccess +libxpa.XPAAccess.restype = c_int +libxpa.XPAAccess.argtypes = [c_void_p, c_char_p, c_char_p, c_char_p, \ + c_byte_p*NXPA, c_byte_p*NXPA, \ + c_int] + +#del libxpa + + + + diff --git a/xpa/python/PythonXPA/client/test_XPAC.py b/xpa/python/PythonXPA/client/test_XPAC.py new file mode 100755 index 0000000..7111a98 --- /dev/null +++ b/xpa/python/PythonXPA/client/test_XPAC.py @@ -0,0 +1,104 @@ +#!/usr/bin/python + +################################ +####### testing: # +####### please start "stest" # +################################ + +from XPAClient import * + +import time + +xpa = None +print xpa +xpa = XPAOpen("ack=false") +############################ +print xpa + +buff_t = c_byte_p*NXPA + +buffs = buff_t() +names = buff_t() +messages = buff_t() + + + +int_t = c_int*NXPA +lens = int_t() + +ok = True +while ok: + #ok = False + print "--- XPAGet Test 1" + n = XPAGet(xpa, "XPA:xpa*", "help", "ack=false", buffs, lens, names, \ + messages, NXPA) + ######################################################################### + print n + print buffs[0][0:lens[0]], "\n", buffs[1][0:lens[1]] + print string_at(names[0]), string_at(names[1]) + + for j in range(n): + ascii_text = string_at(buffs[j], lens[j]) + #ascii_text = ''.join(chr(buffs[j][i]) for i in range(0, lens[j])) + print "---", j, "---\n>>>\n" + ascii_text + "\n<<<" + + free_bufs(buffs, NXPA) + free_bufs(names, NXPA) + free_bufs(messages, NXPA) + + + print "--- XPASet Test 1" + txt = "Hey, you!" + n = XPASet(xpa, "XPA:xpa", "", "", txt, len(txt), names, messages, NXPA) + ######################################################################### + print string_at(names[0]) + free_bufs(names, NXPA) + free_bufs(messages, NXPA) + + #sys.exit(0) + + print "--- XPAGet Test 2" + n = XPAGet(xpa, "XPA:xpa[12]", "error", "", buffs, lens, names, \ + messages, NXPA) + ######################################################################### + print n + print ":::: " + if names[0]: + print "\t names[0]: ", string_at(names[0]) + if names[1]: + print "\t names[1]: ", string_at(names[1]) + + free_bufs(buffs, NXPA) + free_bufs(names, NXPA) + free_bufs(messages, NXPA) + + print "--- XPAInfo Test 1" + n = XPAInfo(xpa, "XPA:i_xpa", "hey there", "", names, messages, NXPA) + ######################################################################### + print n + if names: + free_bufs(names, NXPA) + if messages: + free_bufs(messages, NXPA) + + n = XPAAccess(xpa, "XPA:*", "", "", names, messages, NXPA) + ######################################################################### + print n + for j in range(n): + print "access ---", j, "--- >>>" + string_at(names[j]) + "<<<" + + if names: + free_bufs(names, NXPA) + if messages: + free_bufs(messages, NXPA) + + + + #time.sleep(0.010) + + + +XPAClose(xpa) +############### +xpa = None + diff --git a/xpa/python/PythonXPA/server/XPAServer.py b/xpa/python/PythonXPA/server/XPAServer.py new file mode 100644 index 0000000..4c46ed3 --- /dev/null +++ b/xpa/python/PythonXPA/server/XPAServer.py @@ -0,0 +1,125 @@ + +from ctypes import * + +x=cdll.LoadLibrary("../libxpa.so.1.0") + + +c_byte_p = POINTER(c_byte) + + +## int XPAPoll(int msec, int maxreq); + +XPAPoll = x.XPAPoll +x.XPAPoll.restype = c_int +x.XPAPoll.argtypes = [c_int, c_int] + + + + +## typedef struct xparec{ +## /* xpa version */ +## char *version; +## /* status of this xpa */ +## int status; +## /* "g", "s", "i" are server types; "c" for client */ +## char *type; +## /* +## * THE SERVER SIDE +## */ +## struct xparec *next; +## char *xclass; +## char *name; +## char *help; + +## +## /* and so on */ +## +## } *XPA; + +# ctypes wrapper +class xparec(Structure): + _fields_ = [ ("version", c_char_p), \ + ("status", c_int), \ + ("type", c_char_p), \ + ("next", c_void_p), \ + ("xclass", c_char_p), \ + ("name", c_char_p), \ + ("help", c_char_p) \ + ] + +XPA = POINTER(xparec) + + + + + +## int XPAFree(XPA xpa); + +XPAFree = x.XPAFree +x.XPAFree.restype = c_void_p +x.XPAFree.argtypes = [XPA] + + + +## info callback: +## int info_cb(void *info_data, void *call_data, char *paramlist) + + + +INFOCBFUNC = CFUNCTYPE(c_int, c_char_p, XPA, c_char_p) + +### implement like this: +#def py_infocb_func(a, b, c): +# print "py_cmp_func", a, b +# return 0 +# +#infocb_func = INFOCBFUNC(py_infocb_func) + + + +## XPA XPAInfoNew(char *class, char *name, +## int (*info_callback)(), +## void *info_data, char *info_mode); + +XPAInfoNew = x.XPAInfoNew +x.XPAInfoNew.restype = XPA +x.XPAInfoNew.argtypes = [c_char_p, c_char_p, INFOCBFUNC, c_char_p, c_char_p] + + + + + +## int send_callback(void *send_data, void *call_data, +## char *paramlist, char **buf, int *len) + +SENDCBFUNC = CFUNCTYPE(c_int, c_char_p, XPA, c_char_p, POINTER(c_byte_p), POINTER(c_int)) + +### implement like this: +#def py_sendcb_func(a, data, call_data, param, buf, len): +# print "inside send_callback" +# return 0 +# +#sendcb_func = SENDCBFUNC(py_sendcb_func) + + +## int receive_callback(void *receive_data, void *call_data, +## char *paramlist, char *buf, int len) + +RCVCBFUNC = CFUNCTYPE(c_int, c_char_p, XPA, c_char_p, c_byte_p, c_int) + + + +## XPA XPANew(char *class, char *name, char *help, +## int (*send_callback)(), +## void *send_data, char *send_mode, +## int (*rec_callback)(), +## void *rec_data, char *rec_mode); + +XPANew = x.XPANew +x.XPANew.restype = XPA +x.XPANew.argtypes = [c_char_p, c_char_p, c_char_p, \ + SENDCBFUNC, \ + c_char_p, c_char_p, \ + RCVCBFUNC, \ + c_char_p, c_char_p \ + ] diff --git a/xpa/python/PythonXPA/server/test_XPAS.py b/xpa/python/PythonXPA/server/test_XPAS.py new file mode 100755 index 0000000..fe4f1a2 --- /dev/null +++ b/xpa/python/PythonXPA/server/test_XPAS.py @@ -0,0 +1,91 @@ +#!/usr/bin/python + +################################ +####### testing: # +################################ + +from XPAServer import * + +import time + +print "Poll" +n = x.XPAPoll(10000, 0) +print "got %d" % n + +EXIT_FLAG = False + + +############################## +# setting up Info Point # +############################## + +def py_infocb_func(a, b, c): + global EXIT_FLAG + print ">>> INFO:" + print "params:", c + print "info:", a + print "XPA.name", b[0].name + print "<<<" + if (c[0:4] == "exit"): + print "setting EXIT_FLAG" + EXIT_FLAG = True + return 0 + +infocb_func = INFOCBFUNC(py_infocb_func) + +xpa = XPAInfoNew("XPA", "i_test", infocb_func, "my info", "") + +############################## + + + + +####################################### +# setting up AccessPoint Get/Set # +####################################### + +def py_sendcb_func(data, call_data, param, buf, len): + print "inside send_callback" + print "param:", param + print "buf:", string_at(buf) + buf[0] = cast("this is test only\n", c_byte_p) + len[0] = 19 + return 0 + +sendcb_func = SENDCBFUNC(py_sendcb_func) + + +def py_rcvcb_func(data, call_data, param, buf, len): + print "inside rcv_callback" + print "param:", param + print "got %d bytes" % len + print "buf:", string_at(buf) + return 0 + +rcvcb_func = RCVCBFUNC(py_rcvcb_func) + + +xpa2 = XPANew("XPA", "myxpa", "this is great help", + sendcb_func, + "SEND_DATA", "freebuf=false", + rcvcb_func, + "", "") + + + + + +############################## +print "Entering loop" + +while EXIT_FLAG == False: + n = XPAPoll(1000, 1) + print "got:", n + +print "loop finished" +############################## + + +print "calling XPAFree" +XPAFree(xpa) +XPAFree(xpa2) |