diff options
Diffstat (limited to 'src/uscxml/plugins/invoker/umundo')
-rw-r--r-- | src/uscxml/plugins/invoker/umundo/UmundoInvoker.cpp | 120 | ||||
-rw-r--r-- | src/uscxml/plugins/invoker/umundo/UmundoInvoker.h | 11 |
2 files changed, 63 insertions, 68 deletions
diff --git a/src/uscxml/plugins/invoker/umundo/UmundoInvoker.cpp b/src/uscxml/plugins/invoker/umundo/UmundoInvoker.cpp index f81f47f..a09f3af 100644 --- a/src/uscxml/plugins/invoker/umundo/UmundoInvoker.cpp +++ b/src/uscxml/plugins/invoker/umundo/UmundoInvoker.cpp @@ -37,19 +37,20 @@ bool pluginConnect(pluma::Host& host) { } #endif -UmundoInvoker::UmundoInvoker() : _pub(NULL), _sub(NULL) { +UmundoInvoker::UmundoInvoker() : _pub(NULL), _sub(NULL), _node(NULL) { } UmundoInvoker::~UmundoInvoker() { if (_node) { -// if (_sub) { -// _node->removeSubscriber(*_sub); -// delete _sub; -// } -// if (_pub) { -// _node->removePublisher(*_pub); -// delete _pub; -// } + if (_sub) { + _node->removeSubscriber(*_sub); + delete _sub; + } + if (_pub) { + _node->removePublisher(*_pub); + delete _pub; + } + delete(_node); } }; @@ -172,33 +173,40 @@ void UmundoInvoker::invoke(const InvokeRequest& req) { if (req.params.find("domain") != req.params.end()) { domain = req.params.find("domain")->second.atom; } - _node = getNode(_interpreter, domain); + _node = new umundo::Node(); + umundo::MDNSDiscoveryOptions discOpts; + _discovery = new umundo::Discovery(umundo::Discovery::MDNS, &discOpts); + + _discovery->add(*_node); + // add type from .proto or .desc files - if (req.params.find("type") != req.params.end()) { - std::pair<InvokeRequest::params_t::const_iterator, InvokeRequest::params_t::const_iterator> typeRange = req.params.equal_range("types"); - for (InvokeRequest::params_t::const_iterator it = typeRange.first; it != typeRange.second; it++) { - URL typeURI(it->second.atom); - if (typeURI.toAbsolute(_interpreter->getBaseURI())) { - std::string filename = typeURI.asLocalFile(".proto"); - umundo::PBSerializer::addProto(filename); - } else { - LOG(ERROR) << "umundo invoker has relative type src but nor baseURI set with interpreter."; - } + std::list<std::string> type; + Event::getParam(req.params, "type", type); + std::list<std::string>::const_iterator typeIter = type.begin(); + while(typeIter != type.end()) { + URL typeURI(*typeIter); + if (typeURI.toAbsolute(_interpreter->getBaseURI())) { + std::string filename = typeURI.asLocalFile(".proto"); + umundo::PBSerializer::addProto(filename); + } else { + LOG(ERROR) << "umundo invoker has relative type src but nor baseURI set with interpreter."; } + typeIter++; } - + // add directory with .proto or .desc files - if (req.params.find("types") != req.params.end()) { - std::pair<InvokeRequest::params_t::const_iterator, InvokeRequest::params_t::const_iterator> typeRange = req.params.equal_range("types"); - for (InvokeRequest::params_t::const_iterator it = typeRange.first; it != typeRange.second; it++) { - URL typeURI(it->second.atom); - if (typeURI.toAbsolute(_interpreter->getBaseURI()) && typeURI.scheme().compare("file") == 0) { - umundo::PBSerializer::addProto(typeURI.path()); - } else { - LOG(ERROR) << "invoke element has relative src URI with no baseURI set."; - } + std::list<std::string> types; + Event::getParam(req.params, "type", types); + std::list<std::string>::const_iterator typesIter = types.begin(); + while(typesIter != types.end()) { + URL typeURI(*typesIter); + if (typeURI.toAbsolute(_interpreter->getBaseURI())) { + umundo::PBSerializer::addProto(typeURI.path()); + } else { + LOG(ERROR) << "invoke element has relative src URI with no baseURI set."; } + typesIter++; } if (!_isService) { @@ -220,23 +228,21 @@ void UmundoInvoker::invoke(const InvokeRequest& req) { } } -void UmundoInvoker::welcome(umundo::TypedPublisher pub, const std::string& nodeId, const std::string& subId) { +void UmundoInvoker::welcome(umundo::TypedPublisher atPub, const umundo::SubscriberStub& sub) { Event event; event.name = "umundo.sub.added"; - event.data.compound["nodeId"] = Data(nodeId, Data::VERBATIM); - event.data.compound["subId"] = Data(subId, Data::VERBATIM); - event.data.compound["channel"] = Data(pub.getChannelName(), Data::VERBATIM); - event.data.compound["totalSubs"] = Data(toStr(pub.waitForSubscribers(0)), Data::VERBATIM); + event.data.compound["subId"] = Data(sub.getUUID(), Data::VERBATIM); + event.data.compound["channel"] = Data(atPub.getChannelName(), Data::VERBATIM); + event.data.compound["totalSubs"] = Data(toStr(atPub.waitForSubscribers(0)), Data::VERBATIM); returnEvent(event); } -void UmundoInvoker::farewell(umundo::TypedPublisher pub, const std::string& nodeId, const std::string& subId) { +void UmundoInvoker::farewell(umundo::TypedPublisher fromPub, const umundo::SubscriberStub& sub) { Event event; event.name = "umundo.sub.removed"; - event.data.compound["nodeId"] = Data(nodeId, Data::VERBATIM); - event.data.compound["subId"] = Data(subId, Data::VERBATIM); - event.data.compound["channel"] = Data(pub.getChannelName(), Data::VERBATIM); - event.data.compound["totalSubs"] = Data(toStr(pub.waitForSubscribers(0)), Data::VERBATIM); + event.data.compound["subId"] = Data(sub.getUUID(), Data::VERBATIM); + event.data.compound["channel"] = Data(fromPub.getChannelName(), Data::VERBATIM); + event.data.compound["totalSubs"] = Data(toStr(fromPub.waitForSubscribers(0)), Data::VERBATIM); returnEvent(event); } @@ -253,9 +259,6 @@ void UmundoInvoker::receive(void* object, umundo::Message* msg) { event.origintype = "umundo"; event.eventType = Event::EXTERNAL; -// if (msg->getMeta().find("um.s11n.type") != msg->getMeta().end()) -// event.compound["class"] = msg->getMeta("um.s11n.type"); - if (object != NULL) { if (msg->getMeta().find("um.s11n.type") != msg->getMeta().end() && boost::equals(msg->getMeta().find("um.s11n.type")->second, "JSON")) { @@ -276,6 +279,10 @@ void UmundoInvoker::receive(void* object, umundo::Message* msg) { metaIter++; } + if (msg->size() > 0) { + event.data.compound["protobuf"] = Data(msg->data(), msg->size(), "application/x-protobuf"); + } + returnEvent(event); } @@ -330,23 +337,6 @@ void UmundoInvoker::removed(umundo::ServiceDescription desc) { void UmundoInvoker::changed(umundo::ServiceDescription desc) { } -std::multimap<std::string, std::pair<std::string, boost::weak_ptr<umundo::Node> > > UmundoInvoker::_nodes; -boost::shared_ptr<umundo::Node> UmundoInvoker::getNode(InterpreterImpl* interpreter, const std::string& domain) { - std::pair<_nodes_t::iterator, _nodes_t::iterator> range = _nodes.equal_range(interpreter->getName()); - for (_nodes_t::iterator it = range.first; it != range.second; it++) { - if (it->second.first.compare(domain) == 0) { - boost::shared_ptr<umundo::Node> node = it->second.second.lock(); - if (node) - return node; - } - } - // create a new node - boost::shared_ptr<umundo::Node> node = boost::shared_ptr<umundo::Node>(new umundo::Node(domain)); - std::pair<std::string, std::pair<std::string, boost::weak_ptr<umundo::Node> > > pair = std::make_pair(interpreter->getName(), std::make_pair(domain, node)); - _nodes.insert(pair); - return node; -} - bool UmundoInvoker::jsonbufToData(Data& data, const JSONProto& json) { if (json.compound_size() > 0) { if (json.compound(0).key().size() > 0) { @@ -418,7 +408,15 @@ bool UmundoInvoker::protobufToData(Data& data, const google::protobuf::Message& } break; case google::protobuf::FieldDescriptor::TYPE_ENUM: - LOG(ERROR) << "TYPE_ENUM is unimplemented" << std::endl; + if (fieldDesc->is_repeated()) { + for (int j = 0; j < reflect->FieldSize(msg, fieldDesc); j++) { + const google::protobuf::EnumValueDescriptor* enumDesc = reflect->GetRepeatedEnum(msg, fieldDesc, j); + data.compound[key].array.push_back(Data(toStr(enumDesc->name()), Data::VERBATIM)); + } + } else { + const google::protobuf::EnumValueDescriptor* enumDesc = reflect->GetEnum(msg, fieldDesc); + data.compound[key] = Data(enumDesc->name(), Data::VERBATIM); + } break; case google::protobuf::FieldDescriptor::TYPE_FIXED32: case google::protobuf::FieldDescriptor::TYPE_UINT32: diff --git a/src/uscxml/plugins/invoker/umundo/UmundoInvoker.h b/src/uscxml/plugins/invoker/umundo/UmundoInvoker.h index 4b28bfe..9c64886 100644 --- a/src/uscxml/plugins/invoker/umundo/UmundoInvoker.h +++ b/src/uscxml/plugins/invoker/umundo/UmundoInvoker.h @@ -62,8 +62,8 @@ public: virtual void removed(umundo::ServiceDescription); virtual void changed(umundo::ServiceDescription); - virtual void welcome(umundo::TypedPublisher, const std::string& nodeId, const std::string& subId); - virtual void farewell(umundo::TypedPublisher, const std::string& nodeId, const std::string& subId); + virtual void welcome(umundo::TypedPublisher atPub, const umundo::SubscriberStub& sub); + virtual void farewell(umundo::TypedPublisher fromPub, const umundo::SubscriberStub& sub); protected: bool _isService; @@ -74,17 +74,14 @@ protected: bool jsonbufToData(Data& data, const JSONProto& json); bool protobufToData(Data& data, const google::protobuf::Message& msg); - boost::shared_ptr<umundo::Node> _node; + umundo::Node* _node; + umundo::Discovery* _discovery; umundo::TypedPublisher* _pub; umundo::TypedSubscriber* _sub; umundo::ServiceFilter* _svcFilter; umundo::ServiceManager* _svcMgr; std::map<umundo::ServiceDescription, umundo::ServiceStub*> _svcs; - - static std::multimap<std::string, std::pair<std::string, boost::weak_ptr<umundo::Node> > > _nodes; - typedef std::multimap<std::string, std::pair<std::string, boost::weak_ptr<umundo::Node> > > _nodes_t; - static boost::shared_ptr<umundo::Node> getNode(InterpreterImpl* interpreter, const std::string& domain); }; #ifdef BUILD_AS_PLUGINS |