summaryrefslogtreecommitdiffstats
path: root/xpa/python
diff options
context:
space:
mode:
Diffstat (limited to 'xpa/python')
-rw-r--r--xpa/python/PythonXPA/README25
-rw-r--r--xpa/python/PythonXPA/client/XPAClient.py98
-rwxr-xr-xxpa/python/PythonXPA/client/test_XPAC.py104
-rw-r--r--xpa/python/PythonXPA/server/XPAServer.py125
-rwxr-xr-xxpa/python/PythonXPA/server/test_XPAS.py91
5 files changed, 443 insertions, 0 deletions
diff --git a/xpa/python/PythonXPA/README b/xpa/python/PythonXPA/README
new file mode 100644
index 0000000..9c803db
--- /dev/null
+++ b/xpa/python/PythonXPA/README
@@ -0,0 +1,25 @@
+Hi Eric,
+
+it's me again. Hope everything is ok with you.
+
+I started working with python and because python 2.5 comes with ctypes
+included I started to wrap some XPA functionality. In the web I only
+found PyXPA 0.2 which is very alpha. And with ctypes, we don't need any
+c-code linked in, python package is enough.
+
+Although I have wrapped only the functionality I need for now, I think
+it is already usable. And, of course, when you or anyone is interested
+in it, I can easily add more functionality.
+
+There is no installation needed. Just adjust the path to your libxpa.so
+in XPAClient.py and XPAServer.py and optionally adjust NXPA in
+XCPAClient.py.
+
+Then you can start stest and run test_XPAC.py or run test_XPAS.py and
+issue some xpainfo/get/set commands.
+
+
+
+Regards,
+Bernhard
+
diff --git a/xpa/python/PythonXPA/client/XPAClient.py b/xpa/python/PythonXPA/client/XPAClient.py
new file mode 100644
index 0000000..29e39eb
--- /dev/null
+++ b/xpa/python/PythonXPA/client/XPAClient.py
@@ -0,0 +1,98 @@
+
+import sys
+from ctypes import *
+
+libxpa=cdll.LoadLibrary("../../libxpa.so.1.0")
+libc=cdll.LoadLibrary(None);
+
+
+#xpa_t = c_void_p
+#xpa = xpa_t(None)
+
+c_byte_p = POINTER(c_byte)
+
+
+
+##############
+
+NXPA = 10
+
+##############
+
+
+def free_bufs(p_arr, len):
+ for i in range(len):
+ if p_arr[i]:
+ print "freeing [", i, "] ", p_arr[i][0], p_arr[i][1]
+ libc.free(p_arr[i])
+
+
+
+
+
+## XPA XPAOpen(char *mode);
+
+XPAOpen = libxpa.XPAOpen
+libxpa.XPAOpen.restype = c_void_p
+libxpa.XPAOpen.argtypes = [c_char_p]
+
+## void XPAClose(XPA xpa);
+XPAClose = libxpa.XPAClose
+libxpa.XPAClose.argtypes = [c_void_p]
+
+
+## int XPAGet(XPA xpa, char *template, char *paramlist, char *mode,
+## char **bufs, int *lens, char **names, char **messages, int n);
+
+XPAGet = libxpa.XPAGet
+libxpa.XPAGet.restype = c_int
+libxpa.XPAGet.argtypes = [c_void_p, c_char_p, c_char_p, c_char_p, \
+ c_byte_p*NXPA, c_int*NXPA, c_byte_p*NXPA, c_byte_p*NXPA, \
+ c_int]
+
+
+## when you want to send something like "hello\0x00 you" use this
+
+## int XPASet(XPA xpa,
+## char *template, char *paramlist, char *mode,
+## char *buf, int len, char **names, char **messages,
+## int n);
+
+#libxpa.XPASet.restype = c_int
+#libxpa.XPASet.argtypes = [c_void_p, c_char_p, c_char_p, c_char_p, \
+# c_byte_p, c_int, c_char_p*NXPA, c_char_p*NXPA, \
+# c_int]
+
+## when you know, you will send strings only, use this one to avoid conversions
+
+XPASet = libxpa.XPASet
+libxpa.XPASet.restype = c_int
+libxpa.XPASet.argtypes = [c_void_p, c_char_p, c_char_p, c_char_p, \
+ c_char_p, c_int, c_byte_p*NXPA, c_byte_p*NXPA, \
+ c_int]
+
+## int XPAInfo(XPA xpa,
+## char *template, char *paramlist, char *mode,
+## char **names, char **messages, int n);
+
+XPAInfo = libxpa.XPAInfo
+libxpa.XPAInfo.restype = c_int
+libxpa.XPAInfo.argtypes = [c_void_p, c_char_p, c_char_p, c_char_p, \
+ c_byte_p*NXPA, c_byte_p*NXPA, \
+ c_int]
+
+## int XPAAccess(XPA xpa,
+## char *template, char *paramlist, char *mode,
+## char **names, char **messages, int n);
+
+XPAAccess = libxpa.XPAAccess
+libxpa.XPAAccess.restype = c_int
+libxpa.XPAAccess.argtypes = [c_void_p, c_char_p, c_char_p, c_char_p, \
+ c_byte_p*NXPA, c_byte_p*NXPA, \
+ c_int]
+
+#del libxpa
+
+
+
+
diff --git a/xpa/python/PythonXPA/client/test_XPAC.py b/xpa/python/PythonXPA/client/test_XPAC.py
new file mode 100755
index 0000000..7111a98
--- /dev/null
+++ b/xpa/python/PythonXPA/client/test_XPAC.py
@@ -0,0 +1,104 @@
+#!/usr/bin/python
+
+################################
+####### testing: #
+####### please start "stest" #
+################################
+
+from XPAClient import *
+
+import time
+
+xpa = None
+print xpa
+xpa = XPAOpen("ack=false")
+############################
+print xpa
+
+buff_t = c_byte_p*NXPA
+
+buffs = buff_t()
+names = buff_t()
+messages = buff_t()
+
+
+
+int_t = c_int*NXPA
+lens = int_t()
+
+ok = True
+while ok:
+ #ok = False
+ print "--- XPAGet Test 1"
+ n = XPAGet(xpa, "XPA:xpa*", "help", "ack=false", buffs, lens, names, \
+ messages, NXPA)
+ #########################################################################
+ print n
+ print buffs[0][0:lens[0]], "\n", buffs[1][0:lens[1]]
+ print string_at(names[0]), string_at(names[1])
+
+ for j in range(n):
+ ascii_text = string_at(buffs[j], lens[j])
+ #ascii_text = ''.join(chr(buffs[j][i]) for i in range(0, lens[j]))
+ print "---", j, "---\n>>>\n" + ascii_text + "\n<<<"
+
+ free_bufs(buffs, NXPA)
+ free_bufs(names, NXPA)
+ free_bufs(messages, NXPA)
+
+
+ print "--- XPASet Test 1"
+ txt = "Hey, you!"
+ n = XPASet(xpa, "XPA:xpa", "", "", txt, len(txt), names, messages, NXPA)
+ #########################################################################
+ print string_at(names[0])
+ free_bufs(names, NXPA)
+ free_bufs(messages, NXPA)
+
+ #sys.exit(0)
+
+ print "--- XPAGet Test 2"
+ n = XPAGet(xpa, "XPA:xpa[12]", "error", "", buffs, lens, names, \
+ messages, NXPA)
+ #########################################################################
+ print n
+ print ":::: "
+ if names[0]:
+ print "\t names[0]: ", string_at(names[0])
+ if names[1]:
+ print "\t names[1]: ", string_at(names[1])
+
+ free_bufs(buffs, NXPA)
+ free_bufs(names, NXPA)
+ free_bufs(messages, NXPA)
+
+ print "--- XPAInfo Test 1"
+ n = XPAInfo(xpa, "XPA:i_xpa", "hey there", "", names, messages, NXPA)
+ #########################################################################
+ print n
+ if names:
+ free_bufs(names, NXPA)
+ if messages:
+ free_bufs(messages, NXPA)
+
+ n = XPAAccess(xpa, "XPA:*", "", "", names, messages, NXPA)
+ #########################################################################
+ print n
+ for j in range(n):
+ print "access ---", j, "--- >>>" + string_at(names[j]) + "<<<"
+
+ if names:
+ free_bufs(names, NXPA)
+ if messages:
+ free_bufs(messages, NXPA)
+
+
+
+ #time.sleep(0.010)
+
+
+
+XPAClose(xpa)
+###############
+xpa = None
+
diff --git a/xpa/python/PythonXPA/server/XPAServer.py b/xpa/python/PythonXPA/server/XPAServer.py
new file mode 100644
index 0000000..4c46ed3
--- /dev/null
+++ b/xpa/python/PythonXPA/server/XPAServer.py
@@ -0,0 +1,125 @@
+
+from ctypes import *
+
+x=cdll.LoadLibrary("../libxpa.so.1.0")
+
+
+c_byte_p = POINTER(c_byte)
+
+
+## int XPAPoll(int msec, int maxreq);
+
+XPAPoll = x.XPAPoll
+x.XPAPoll.restype = c_int
+x.XPAPoll.argtypes = [c_int, c_int]
+
+
+
+
+## typedef struct xparec{
+## /* xpa version */
+## char *version;
+## /* status of this xpa */
+## int status;
+## /* "g", "s", "i" are server types; "c" for client */
+## char *type;
+## /*
+## * THE SERVER SIDE
+## */
+## struct xparec *next;
+## char *xclass;
+## char *name;
+## char *help;
+
+##
+## /* and so on */
+##
+## } *XPA;
+
+# ctypes wrapper
+class xparec(Structure):
+ _fields_ = [ ("version", c_char_p), \
+ ("status", c_int), \
+ ("type", c_char_p), \
+ ("next", c_void_p), \
+ ("xclass", c_char_p), \
+ ("name", c_char_p), \
+ ("help", c_char_p) \
+ ]
+
+XPA = POINTER(xparec)
+
+
+
+
+
+## int XPAFree(XPA xpa);
+
+XPAFree = x.XPAFree
+x.XPAFree.restype = c_void_p
+x.XPAFree.argtypes = [XPA]
+
+
+
+## info callback:
+## int info_cb(void *info_data, void *call_data, char *paramlist)
+
+
+
+INFOCBFUNC = CFUNCTYPE(c_int, c_char_p, XPA, c_char_p)
+
+### implement like this:
+#def py_infocb_func(a, b, c):
+# print "py_cmp_func", a, b
+# return 0
+#
+#infocb_func = INFOCBFUNC(py_infocb_func)
+
+
+
+## XPA XPAInfoNew(char *class, char *name,
+## int (*info_callback)(),
+## void *info_data, char *info_mode);
+
+XPAInfoNew = x.XPAInfoNew
+x.XPAInfoNew.restype = XPA
+x.XPAInfoNew.argtypes = [c_char_p, c_char_p, INFOCBFUNC, c_char_p, c_char_p]
+
+
+
+
+
+## int send_callback(void *send_data, void *call_data,
+## char *paramlist, char **buf, int *len)
+
+SENDCBFUNC = CFUNCTYPE(c_int, c_char_p, XPA, c_char_p, POINTER(c_byte_p), POINTER(c_int))
+
+### implement like this:
+#def py_sendcb_func(a, data, call_data, param, buf, len):
+# print "inside send_callback"
+# return 0
+#
+#sendcb_func = SENDCBFUNC(py_sendcb_func)
+
+
+## int receive_callback(void *receive_data, void *call_data,
+## char *paramlist, char *buf, int len)
+
+RCVCBFUNC = CFUNCTYPE(c_int, c_char_p, XPA, c_char_p, c_byte_p, c_int)
+
+
+
+## XPA XPANew(char *class, char *name, char *help,
+## int (*send_callback)(),
+## void *send_data, char *send_mode,
+## int (*rec_callback)(),
+## void *rec_data, char *rec_mode);
+
+XPANew = x.XPANew
+x.XPANew.restype = XPA
+x.XPANew.argtypes = [c_char_p, c_char_p, c_char_p, \
+ SENDCBFUNC, \
+ c_char_p, c_char_p, \
+ RCVCBFUNC, \
+ c_char_p, c_char_p \
+ ]
diff --git a/xpa/python/PythonXPA/server/test_XPAS.py b/xpa/python/PythonXPA/server/test_XPAS.py
new file mode 100755
index 0000000..fe4f1a2
--- /dev/null
+++ b/xpa/python/PythonXPA/server/test_XPAS.py
@@ -0,0 +1,91 @@
+#!/usr/bin/python
+
+################################
+####### testing: #
+################################
+
+from XPAServer import *
+
+import time
+
+print "Poll"
+n = x.XPAPoll(10000, 0)
+print "got %d" % n
+
+EXIT_FLAG = False
+
+
+##############################
+# setting up Info Point #
+##############################
+
+def py_infocb_func(a, b, c):
+ global EXIT_FLAG
+ print ">>> INFO:"
+ print "params:", c
+ print "info:", a
+ print "XPA.name", b[0].name
+ print "<<<"
+ if (c[0:4] == "exit"):
+ print "setting EXIT_FLAG"
+ EXIT_FLAG = True
+ return 0
+
+infocb_func = INFOCBFUNC(py_infocb_func)
+
+xpa = XPAInfoNew("XPA", "i_test", infocb_func, "my info", "")
+
+##############################
+
+
+
+
+#######################################
+# setting up AccessPoint Get/Set #
+#######################################
+
+def py_sendcb_func(data, call_data, param, buf, len):
+ print "inside send_callback"
+ print "param:", param
+ print "buf:", string_at(buf)
+ buf[0] = cast("this is test only\n", c_byte_p)
+ len[0] = 19
+ return 0
+
+sendcb_func = SENDCBFUNC(py_sendcb_func)
+
+
+def py_rcvcb_func(data, call_data, param, buf, len):
+ print "inside rcv_callback"
+ print "param:", param
+ print "got %d bytes" % len
+ print "buf:", string_at(buf)
+ return 0
+
+rcvcb_func = RCVCBFUNC(py_rcvcb_func)
+
+
+xpa2 = XPANew("XPA", "myxpa", "this is great help",
+ sendcb_func,
+ "SEND_DATA", "freebuf=false",
+ rcvcb_func,
+ "", "")
+
+
+
+
+
+##############################
+print "Entering loop"
+
+while EXIT_FLAG == False:
+ n = XPAPoll(1000, 1)
+ print "got:", n
+
+print "loop finished"
+##############################
+
+
+print "calling XPAFree"
+XPAFree(xpa)
+XPAFree(xpa2)