diff options
author | Stefan Radomski <radomski@tk.informatik.tu-darmstadt.de> | 2013-07-04 19:46:07 (GMT) |
---|---|---|
committer | Stefan Radomski <radomski@tk.informatik.tu-darmstadt.de> | 2013-07-04 19:46:07 (GMT) |
commit | 7bd0256239f247ed01ee6c673e31283c794bb3d0 (patch) | |
tree | a8c9b03374d995c6fd4b23cac2f5be282344bba3 /src/uscxml/plugins/invoker/umundo/UmundoInvoker.cpp | |
parent | 096f7df1137dd62871cacd371bf023e39d6b30e5 (diff) | |
download | uscxml-7bd0256239f247ed01ee6c673e31283c794bb3d0.zip uscxml-7bd0256239f247ed01ee6c673e31283c794bb3d0.tar.gz uscxml-7bd0256239f247ed01ee6c673e31283c794bb3d0.tar.bz2 |
Reactiveated umundo invoker
Diffstat (limited to 'src/uscxml/plugins/invoker/umundo/UmundoInvoker.cpp')
-rw-r--r-- | src/uscxml/plugins/invoker/umundo/UmundoInvoker.cpp | 228 |
1 files changed, 174 insertions, 54 deletions
diff --git a/src/uscxml/plugins/invoker/umundo/UmundoInvoker.cpp b/src/uscxml/plugins/invoker/umundo/UmundoInvoker.cpp index 641d713..2b3377e 100644 --- a/src/uscxml/plugins/invoker/umundo/UmundoInvoker.cpp +++ b/src/uscxml/plugins/invoker/umundo/UmundoInvoker.cpp @@ -16,15 +16,19 @@ bool connect(pluma::Host& host) { } #endif -UmundoInvoker::UmundoInvoker() : _node(NULL), _pub(NULL), _sub(NULL) { +UmundoInvoker::UmundoInvoker() : _pub(NULL), _sub(NULL) { } UmundoInvoker::~UmundoInvoker() { if (_node) { - if (_sub) + if (_sub) { _node->removeSubscriber(*_sub); - if (_pub) + delete _sub; + } + if (_pub) { _node->removePublisher(*_pub); + delete _pub; + } } }; @@ -40,64 +44,88 @@ Data UmundoInvoker::getDataModelVariables() { } void UmundoInvoker::send(const SendRequest& req) { - umundo::Message* msg = new umundo::Message(); + umundo::Message msg; if (req.name.length() > 0) { - msg->putMeta("event", req.name); + msg.putMeta("event", req.name); } else { - msg->putMeta("event", "umundo"); + msg.putMeta("event", "umundo"); } - if (req.params.find("type") != req.params.end()) { - // assume JSON in content to transform to protobuf object - if (req.content.length() > 0 && _interpreter->getDataModel()) { - std::string type; - std::multimap<std::string, std::string>::const_iterator typeIter = req.params.find("type"); - if (typeIter != req.params.end()) - type = typeIter->second; - const google::protobuf::Message* protoMsg = umundo::PBSerializer::getProto(type); - if (protoMsg == NULL) { - LOG(ERROR) << "No type " << type << " is known, pass a directory with proto .desc files via types param when invoking"; + if (req.content.length()) { + try { + Data data = _interpreter->getDataModel().getStringAsData(req.content); + if (!data) { + LOG(ERROR) << "Cannot transform content to data object per datamodel"; return; } - try { - Data data = _interpreter->getDataModel().getStringAsData(req.content); + + std::string type; + if (req.params.find("type") != req.params.end()) { + // we are supposed to build a typed object + type = req.params.find("type")->second; + + const google::protobuf::Message* protoMsg = umundo::PBSerializer::getProto(type); + if (protoMsg == NULL) { + LOG(ERROR) << "No type '" << type << "' is known, pass a directory with proto .desc files via types param when invoking"; + return; + } + google::protobuf::Message* pbMsg = protoMsg->New(); if (!dataToProtobuf(pbMsg, data)) { LOG(ERROR) << "Cannot create message from JSON - not sending"; - } else { + return; + } + + if (!_isService) { // add all s11n properties - if (!_isService) { - _pub->prepareMsg(msg, type, pbMsg); - _pub->send(msg); - } else { - std::map<umundo::ServiceDescription, umundo::ServiceStub*>::iterator svcIter = _svcs.begin(); - while(svcIter != _svcs.end()) { - umundo::ServiceStub* stub = svcIter->second; - Event event; - void* rv = NULL; - stub->callStubMethod(req.name, pbMsg, type, rv, ""); - protobufToData(event.data, *(const google::protobuf::Message*)rv); - - event.name = _invokeId + ".reply." + req.name; - event.origin = msg->getMeta("um.channel"); - event.origintype = "umundo"; - event.type = Event::EXTERNAL; - - returnEvent(event); - svcIter++; - } + _pub->prepareMsg(&msg, type, pbMsg); + _pub->send(&msg); + } else { + // invoke as service + std::map<umundo::ServiceDescription, umundo::ServiceStub*>::iterator svcIter = _svcs.begin(); + while(svcIter != _svcs.end()) { + umundo::ServiceStub* stub = svcIter->second; + Event event; + void* rv = NULL; + stub->callStubMethod(req.name, pbMsg, type, rv, ""); + protobufToData(event.data, *(const google::protobuf::Message*)rv); + + event.name = _invokeId + ".reply." + req.name; + event.origin = msg.getMeta("um.channel"); + event.origintype = "umundo"; + event.type = Event::EXTERNAL; + + returnEvent(event); + svcIter++; } } - } catch (Event e) { - LOG(ERROR) << "Syntax error when invoking umundo:" << std::endl << e << std::endl; - return; + } else { + // just encode JSON + JSONProto* jsonProtoMsg = new JSONProto(); + if (!dataToJSONbuf(jsonProtoMsg, data)) { + LOG(ERROR) << "Cannot create message from JSON - not sending"; + return; + } + + if (!_isService) { + // add all s11n properties + _pub->prepareMsg(&msg, "JSON", jsonProtoMsg); + _pub->send(&msg); + } else { + LOG(ERROR) << "Cannot invoke services with untyped JSON"; + return; + } + } - } else { - LOG(ERROR) << "Required JSON object in content" << std::endl; + } catch (Event e) { + LOG(ERROR) << "Syntax error when invoking umundo:" << std::endl << e << std::endl; return; } - } + } else { + LOG(ERROR) << "Required JSON object in content" << std::endl; + return; + } } void UmundoInvoker::cancel(const std::string sendId) { @@ -157,6 +185,9 @@ void UmundoInvoker::invoke(const InvokeRequest& req) { _pub = new umundo::TypedPublisher(channelName); _sub = new umundo::TypedSubscriber(channelName, this); + _pub->setGreeter(this); + _sub->registerType("JSON", new JSONProto()); + _node->addPublisher(*_pub); _node->addSubscriber(*_sub); @@ -168,6 +199,26 @@ void UmundoInvoker::invoke(const InvokeRequest& req) { } } +void UmundoInvoker::welcome(umundo::TypedPublisher pub, const std::string& nodeId, const std::string& subId) { + Event event; + event.name = "umundo.sub.added"; + event.data.compound["nodeId"] = Data(nodeId, Data::VERBATIM); + event.data.compound["subId"] = Data(subId, Data::VERBATIM); + event.data.compound["channel"] = Data(pub.getChannelName(), Data::VERBATIM); + event.data.compound["totalSubs"] = Data(toStr(pub.waitForSubscribers(0)), Data::VERBATIM); + returnEvent(event); +} + +void UmundoInvoker::farewell(umundo::TypedPublisher pub, const std::string& nodeId, const std::string& subId) { + Event event; + event.name = "umundo.sub.removed"; + event.data.compound["nodeId"] = Data(nodeId, Data::VERBATIM); + event.data.compound["subId"] = Data(subId, Data::VERBATIM); + event.data.compound["channel"] = Data(pub.getChannelName(), Data::VERBATIM); + event.data.compound["totalSubs"] = Data(toStr(pub.waitForSubscribers(0)), Data::VERBATIM); + returnEvent(event); +} + void UmundoInvoker::receive(void* object, umundo::Message* msg) { uscxml::Event event; if (msg->getMeta().find("event") != msg->getMeta().end()) { @@ -184,14 +235,19 @@ void UmundoInvoker::receive(void* object, umundo::Message* msg) { // if (msg->getMeta().find("um.s11n.type") != msg->getMeta().end()) // event.compound["class"] = msg->getMeta("um.s11n.type"); - if (object != NULL) - protobufToData(event.data, *(const google::protobuf::Message*)object); + if (object != NULL) { + if (msg->getMeta().find("um.s11n.type") != msg->getMeta().end() && + boost::equals(msg->getMeta().find("um.s11n.type")->second, "JSON")) { + jsonbufToData(event.data, *(JSONProto*)object); + } else { + protobufToData(event.data, *(const google::protobuf::Message*)object); + } + } // get meta fields into event std::map<std::string, std::string>::const_iterator metaIter = msg->getMeta().begin(); while(metaIter != msg->getMeta().end()) { - if (metaIter->first.substr(0,3).compare("um.") != 0) - event.data.compound[metaIter->first] = Data(metaIter->second, Data::VERBATIM); + event.data.compound[metaIter->first] = Data(metaIter->second, Data::VERBATIM); metaIter++; } @@ -249,19 +305,46 @@ void UmundoInvoker::removed(umundo::ServiceDescription desc) { void UmundoInvoker::changed(umundo::ServiceDescription desc) { } -std::multimap<std::string, std::pair<std::string, umundo::Node*> > UmundoInvoker::_nodes; -umundo::Node* UmundoInvoker::getNode(InterpreterImpl* interpreter, const std::string& domain) { +std::multimap<std::string, std::pair<std::string, boost::weak_ptr<umundo::Node> > > UmundoInvoker::_nodes; +boost::shared_ptr<umundo::Node> UmundoInvoker::getNode(InterpreterImpl* interpreter, const std::string& domain) { std::pair<_nodes_t::iterator, _nodes_t::iterator> range = _nodes.equal_range(interpreter->getName()); for (_nodes_t::iterator it = range.first; it != range.second; it++) { if (it->second.first.compare(domain) == 0) - return it->second.second; + return it->second.second.lock(); } - umundo::Node* node = new umundo::Node(domain); - std::pair<std::string, std::pair<std::string, umundo::Node*> > pair = std::make_pair(interpreter->getName(), std::make_pair(domain, node)); + boost::shared_ptr<umundo::Node> node = boost::shared_ptr<umundo::Node>(new umundo::Node(domain)); + std::pair<std::string, std::pair<std::string, boost::weak_ptr<umundo::Node> > > pair = std::make_pair(interpreter->getName(), std::make_pair(domain, node)); _nodes.insert(pair); return node; } +bool UmundoInvoker::jsonbufToData(Data& data, const JSONProto& json) { + if (json.compound_size() > 0) { + if (json.compound(0).key().size() > 0) { + // compound + for (int i = 0; i < json.compound_size(); i++) { + jsonbufToData(data.compound[json.compound(i).key()], json.compound(i)); + } + } else { + // array + for (int i = 0; i < json.compound_size(); i++) { + Data arrayData; + data.array.push_back(arrayData); + jsonbufToData(data.array.back(), json.compound(i)); + } + } + } else if (json.atom().size() > 0) { + data.atom = json.atom(); + if (json.verbatim()) { + data.type = Data::VERBATIM; + } else { + data.type = Data::INTERPRETED; + } + } + + return true; +} + bool UmundoInvoker::protobufToData(Data& data, const google::protobuf::Message& msg) { const google::protobuf::Descriptor* desc = msg.GetDescriptor(); const google::protobuf::Reflection* reflect = msg.GetReflection(); @@ -377,6 +460,43 @@ bool UmundoInvoker::protobufToData(Data& data, const google::protobuf::Message& return true; } +bool UmundoInvoker::dataToJSONbuf(JSONProto* msg, Data& data) { + const google::protobuf::Descriptor* desc = msg->GetDescriptor(); + const google::protobuf::Reflection* reflect = msg->GetReflection(); + + if (!data.compound.empty()) { + const google::protobuf::FieldDescriptor* fieldDesc = desc->FindFieldByName("compound"); + + std::map<std::string, Data>::iterator compoundIter = data.compound.begin(); + while(compoundIter != data.compound.end()) { + JSONProto* compoundMsg = (JSONProto*)reflect->AddMessage(msg, fieldDesc); + dataToJSONbuf(compoundMsg, compoundIter->second); + compoundMsg->set_key(compoundIter->first); + compoundIter++; + } + } else if (!data.array.empty()) { + const google::protobuf::FieldDescriptor* fieldDesc = desc->FindFieldByName("compound"); + + std::list<Data>::iterator arrayIter = data.array.begin(); + while(arrayIter != data.array.end()) { + JSONProto* arrayMsg = (JSONProto*)reflect->AddMessage(msg, fieldDesc); + dataToJSONbuf(arrayMsg, *arrayIter); + arrayIter++; + } + } else if (!data.atom.empty()) { + const google::protobuf::FieldDescriptor* atomDesc = desc->FindFieldByName("atom"); + const google::protobuf::FieldDescriptor* verbDesc = desc->FindFieldByName("verbatim"); + + if (data.type == Data::VERBATIM) { + reflect->SetBool(msg, verbDesc, true); + } else { + reflect->SetBool(msg, verbDesc, false); + } + reflect->SetString(msg, atomDesc, data.atom); + } + return true; +} + bool UmundoInvoker::dataToProtobuf(google::protobuf::Message* msg, Data& data) { const google::protobuf::Descriptor* desc = msg->GetDescriptor(); const google::protobuf::Reflection* reflect = msg->GetReflection(); |