summaryrefslogtreecommitdiffstats
path: root/src/uscxml/plugins/invoker/umundo/UmundoInvoker.cpp
diff options
context:
space:
mode:
authorStefan Radomski <radomski@tk.informatik.tu-darmstadt.de>2013-07-04 19:46:07 (GMT)
committerStefan Radomski <radomski@tk.informatik.tu-darmstadt.de>2013-07-04 19:46:07 (GMT)
commit7bd0256239f247ed01ee6c673e31283c794bb3d0 (patch)
treea8c9b03374d995c6fd4b23cac2f5be282344bba3 /src/uscxml/plugins/invoker/umundo/UmundoInvoker.cpp
parent096f7df1137dd62871cacd371bf023e39d6b30e5 (diff)
downloaduscxml-7bd0256239f247ed01ee6c673e31283c794bb3d0.zip
uscxml-7bd0256239f247ed01ee6c673e31283c794bb3d0.tar.gz
uscxml-7bd0256239f247ed01ee6c673e31283c794bb3d0.tar.bz2
Reactiveated umundo invoker
Diffstat (limited to 'src/uscxml/plugins/invoker/umundo/UmundoInvoker.cpp')
-rw-r--r--src/uscxml/plugins/invoker/umundo/UmundoInvoker.cpp228
1 files changed, 174 insertions, 54 deletions
diff --git a/src/uscxml/plugins/invoker/umundo/UmundoInvoker.cpp b/src/uscxml/plugins/invoker/umundo/UmundoInvoker.cpp
index 641d713..2b3377e 100644
--- a/src/uscxml/plugins/invoker/umundo/UmundoInvoker.cpp
+++ b/src/uscxml/plugins/invoker/umundo/UmundoInvoker.cpp
@@ -16,15 +16,19 @@ bool connect(pluma::Host& host) {
}
#endif
-UmundoInvoker::UmundoInvoker() : _node(NULL), _pub(NULL), _sub(NULL) {
+UmundoInvoker::UmundoInvoker() : _pub(NULL), _sub(NULL) {
}
UmundoInvoker::~UmundoInvoker() {
if (_node) {
- if (_sub)
+ if (_sub) {
_node->removeSubscriber(*_sub);
- if (_pub)
+ delete _sub;
+ }
+ if (_pub) {
_node->removePublisher(*_pub);
+ delete _pub;
+ }
}
};
@@ -40,64 +44,88 @@ Data UmundoInvoker::getDataModelVariables() {
}
void UmundoInvoker::send(const SendRequest& req) {
- umundo::Message* msg = new umundo::Message();
+ umundo::Message msg;
if (req.name.length() > 0) {
- msg->putMeta("event", req.name);
+ msg.putMeta("event", req.name);
} else {
- msg->putMeta("event", "umundo");
+ msg.putMeta("event", "umundo");
}
- if (req.params.find("type") != req.params.end()) {
- // assume JSON in content to transform to protobuf object
- if (req.content.length() > 0 && _interpreter->getDataModel()) {
- std::string type;
- std::multimap<std::string, std::string>::const_iterator typeIter = req.params.find("type");
- if (typeIter != req.params.end())
- type = typeIter->second;
- const google::protobuf::Message* protoMsg = umundo::PBSerializer::getProto(type);
- if (protoMsg == NULL) {
- LOG(ERROR) << "No type " << type << " is known, pass a directory with proto .desc files via types param when invoking";
+ if (req.content.length()) {
+ try {
+ Data data = _interpreter->getDataModel().getStringAsData(req.content);
+ if (!data) {
+ LOG(ERROR) << "Cannot transform content to data object per datamodel";
return;
}
- try {
- Data data = _interpreter->getDataModel().getStringAsData(req.content);
+
+ std::string type;
+ if (req.params.find("type") != req.params.end()) {
+ // we are supposed to build a typed object
+ type = req.params.find("type")->second;
+
+ const google::protobuf::Message* protoMsg = umundo::PBSerializer::getProto(type);
+ if (protoMsg == NULL) {
+ LOG(ERROR) << "No type '" << type << "' is known, pass a directory with proto .desc files via types param when invoking";
+ return;
+ }
+
google::protobuf::Message* pbMsg = protoMsg->New();
if (!dataToProtobuf(pbMsg, data)) {
LOG(ERROR) << "Cannot create message from JSON - not sending";
- } else {
+ return;
+ }
+
+ if (!_isService) {
// add all s11n properties
- if (!_isService) {
- _pub->prepareMsg(msg, type, pbMsg);
- _pub->send(msg);
- } else {
- std::map<umundo::ServiceDescription, umundo::ServiceStub*>::iterator svcIter = _svcs.begin();
- while(svcIter != _svcs.end()) {
- umundo::ServiceStub* stub = svcIter->second;
- Event event;
- void* rv = NULL;
- stub->callStubMethod(req.name, pbMsg, type, rv, "");
- protobufToData(event.data, *(const google::protobuf::Message*)rv);
-
- event.name = _invokeId + ".reply." + req.name;
- event.origin = msg->getMeta("um.channel");
- event.origintype = "umundo";
- event.type = Event::EXTERNAL;
-
- returnEvent(event);
- svcIter++;
- }
+ _pub->prepareMsg(&msg, type, pbMsg);
+ _pub->send(&msg);
+ } else {
+ // invoke as service
+ std::map<umundo::ServiceDescription, umundo::ServiceStub*>::iterator svcIter = _svcs.begin();
+ while(svcIter != _svcs.end()) {
+ umundo::ServiceStub* stub = svcIter->second;
+ Event event;
+ void* rv = NULL;
+ stub->callStubMethod(req.name, pbMsg, type, rv, "");
+ protobufToData(event.data, *(const google::protobuf::Message*)rv);
+
+ event.name = _invokeId + ".reply." + req.name;
+ event.origin = msg.getMeta("um.channel");
+ event.origintype = "umundo";
+ event.type = Event::EXTERNAL;
+
+ returnEvent(event);
+ svcIter++;
}
}
- } catch (Event e) {
- LOG(ERROR) << "Syntax error when invoking umundo:" << std::endl << e << std::endl;
- return;
+ } else {
+ // just encode JSON
+ JSONProto* jsonProtoMsg = new JSONProto();
+ if (!dataToJSONbuf(jsonProtoMsg, data)) {
+ LOG(ERROR) << "Cannot create message from JSON - not sending";
+ return;
+ }
+
+ if (!_isService) {
+ // add all s11n properties
+ _pub->prepareMsg(&msg, "JSON", jsonProtoMsg);
+ _pub->send(&msg);
+ } else {
+ LOG(ERROR) << "Cannot invoke services with untyped JSON";
+ return;
+ }
+
}
- } else {
- LOG(ERROR) << "Required JSON object in content" << std::endl;
+ } catch (Event e) {
+ LOG(ERROR) << "Syntax error when invoking umundo:" << std::endl << e << std::endl;
return;
}
- }
+ } else {
+ LOG(ERROR) << "Required JSON object in content" << std::endl;
+ return;
+ }
}
void UmundoInvoker::cancel(const std::string sendId) {
@@ -157,6 +185,9 @@ void UmundoInvoker::invoke(const InvokeRequest& req) {
_pub = new umundo::TypedPublisher(channelName);
_sub = new umundo::TypedSubscriber(channelName, this);
+ _pub->setGreeter(this);
+ _sub->registerType("JSON", new JSONProto());
+
_node->addPublisher(*_pub);
_node->addSubscriber(*_sub);
@@ -168,6 +199,26 @@ void UmundoInvoker::invoke(const InvokeRequest& req) {
}
}
+void UmundoInvoker::welcome(umundo::TypedPublisher pub, const std::string& nodeId, const std::string& subId) {
+ Event event;
+ event.name = "umundo.sub.added";
+ event.data.compound["nodeId"] = Data(nodeId, Data::VERBATIM);
+ event.data.compound["subId"] = Data(subId, Data::VERBATIM);
+ event.data.compound["channel"] = Data(pub.getChannelName(), Data::VERBATIM);
+ event.data.compound["totalSubs"] = Data(toStr(pub.waitForSubscribers(0)), Data::VERBATIM);
+ returnEvent(event);
+}
+
+void UmundoInvoker::farewell(umundo::TypedPublisher pub, const std::string& nodeId, const std::string& subId) {
+ Event event;
+ event.name = "umundo.sub.removed";
+ event.data.compound["nodeId"] = Data(nodeId, Data::VERBATIM);
+ event.data.compound["subId"] = Data(subId, Data::VERBATIM);
+ event.data.compound["channel"] = Data(pub.getChannelName(), Data::VERBATIM);
+ event.data.compound["totalSubs"] = Data(toStr(pub.waitForSubscribers(0)), Data::VERBATIM);
+ returnEvent(event);
+}
+
void UmundoInvoker::receive(void* object, umundo::Message* msg) {
uscxml::Event event;
if (msg->getMeta().find("event") != msg->getMeta().end()) {
@@ -184,14 +235,19 @@ void UmundoInvoker::receive(void* object, umundo::Message* msg) {
// if (msg->getMeta().find("um.s11n.type") != msg->getMeta().end())
// event.compound["class"] = msg->getMeta("um.s11n.type");
- if (object != NULL)
- protobufToData(event.data, *(const google::protobuf::Message*)object);
+ if (object != NULL) {
+ if (msg->getMeta().find("um.s11n.type") != msg->getMeta().end() &&
+ boost::equals(msg->getMeta().find("um.s11n.type")->second, "JSON")) {
+ jsonbufToData(event.data, *(JSONProto*)object);
+ } else {
+ protobufToData(event.data, *(const google::protobuf::Message*)object);
+ }
+ }
// get meta fields into event
std::map<std::string, std::string>::const_iterator metaIter = msg->getMeta().begin();
while(metaIter != msg->getMeta().end()) {
- if (metaIter->first.substr(0,3).compare("um.") != 0)
- event.data.compound[metaIter->first] = Data(metaIter->second, Data::VERBATIM);
+ event.data.compound[metaIter->first] = Data(metaIter->second, Data::VERBATIM);
metaIter++;
}
@@ -249,19 +305,46 @@ void UmundoInvoker::removed(umundo::ServiceDescription desc) {
void UmundoInvoker::changed(umundo::ServiceDescription desc) {
}
-std::multimap<std::string, std::pair<std::string, umundo::Node*> > UmundoInvoker::_nodes;
-umundo::Node* UmundoInvoker::getNode(InterpreterImpl* interpreter, const std::string& domain) {
+std::multimap<std::string, std::pair<std::string, boost::weak_ptr<umundo::Node> > > UmundoInvoker::_nodes;
+boost::shared_ptr<umundo::Node> UmundoInvoker::getNode(InterpreterImpl* interpreter, const std::string& domain) {
std::pair<_nodes_t::iterator, _nodes_t::iterator> range = _nodes.equal_range(interpreter->getName());
for (_nodes_t::iterator it = range.first; it != range.second; it++) {
if (it->second.first.compare(domain) == 0)
- return it->second.second;
+ return it->second.second.lock();
}
- umundo::Node* node = new umundo::Node(domain);
- std::pair<std::string, std::pair<std::string, umundo::Node*> > pair = std::make_pair(interpreter->getName(), std::make_pair(domain, node));
+ boost::shared_ptr<umundo::Node> node = boost::shared_ptr<umundo::Node>(new umundo::Node(domain));
+ std::pair<std::string, std::pair<std::string, boost::weak_ptr<umundo::Node> > > pair = std::make_pair(interpreter->getName(), std::make_pair(domain, node));
_nodes.insert(pair);
return node;
}
+bool UmundoInvoker::jsonbufToData(Data& data, const JSONProto& json) {
+ if (json.compound_size() > 0) {
+ if (json.compound(0).key().size() > 0) {
+ // compound
+ for (int i = 0; i < json.compound_size(); i++) {
+ jsonbufToData(data.compound[json.compound(i).key()], json.compound(i));
+ }
+ } else {
+ // array
+ for (int i = 0; i < json.compound_size(); i++) {
+ Data arrayData;
+ data.array.push_back(arrayData);
+ jsonbufToData(data.array.back(), json.compound(i));
+ }
+ }
+ } else if (json.atom().size() > 0) {
+ data.atom = json.atom();
+ if (json.verbatim()) {
+ data.type = Data::VERBATIM;
+ } else {
+ data.type = Data::INTERPRETED;
+ }
+ }
+
+ return true;
+}
+
bool UmundoInvoker::protobufToData(Data& data, const google::protobuf::Message& msg) {
const google::protobuf::Descriptor* desc = msg.GetDescriptor();
const google::protobuf::Reflection* reflect = msg.GetReflection();
@@ -377,6 +460,43 @@ bool UmundoInvoker::protobufToData(Data& data, const google::protobuf::Message&
return true;
}
+bool UmundoInvoker::dataToJSONbuf(JSONProto* msg, Data& data) {
+ const google::protobuf::Descriptor* desc = msg->GetDescriptor();
+ const google::protobuf::Reflection* reflect = msg->GetReflection();
+
+ if (!data.compound.empty()) {
+ const google::protobuf::FieldDescriptor* fieldDesc = desc->FindFieldByName("compound");
+
+ std::map<std::string, Data>::iterator compoundIter = data.compound.begin();
+ while(compoundIter != data.compound.end()) {
+ JSONProto* compoundMsg = (JSONProto*)reflect->AddMessage(msg, fieldDesc);
+ dataToJSONbuf(compoundMsg, compoundIter->second);
+ compoundMsg->set_key(compoundIter->first);
+ compoundIter++;
+ }
+ } else if (!data.array.empty()) {
+ const google::protobuf::FieldDescriptor* fieldDesc = desc->FindFieldByName("compound");
+
+ std::list<Data>::iterator arrayIter = data.array.begin();
+ while(arrayIter != data.array.end()) {
+ JSONProto* arrayMsg = (JSONProto*)reflect->AddMessage(msg, fieldDesc);
+ dataToJSONbuf(arrayMsg, *arrayIter);
+ arrayIter++;
+ }
+ } else if (!data.atom.empty()) {
+ const google::protobuf::FieldDescriptor* atomDesc = desc->FindFieldByName("atom");
+ const google::protobuf::FieldDescriptor* verbDesc = desc->FindFieldByName("verbatim");
+
+ if (data.type == Data::VERBATIM) {
+ reflect->SetBool(msg, verbDesc, true);
+ } else {
+ reflect->SetBool(msg, verbDesc, false);
+ }
+ reflect->SetString(msg, atomDesc, data.atom);
+ }
+ return true;
+}
+
bool UmundoInvoker::dataToProtobuf(google::protobuf::Message* msg, Data& data) {
const google::protobuf::Descriptor* desc = msg->GetDescriptor();
const google::protobuf::Reflection* reflect = msg->GetReflection();