summaryrefslogtreecommitdiffstats
path: root/src/uscxml/plugins/invoker/umundo
diff options
context:
space:
mode:
Diffstat (limited to 'src/uscxml/plugins/invoker/umundo')
-rw-r--r--src/uscxml/plugins/invoker/umundo/UmundoInvoker.cpp120
-rw-r--r--src/uscxml/plugins/invoker/umundo/UmundoInvoker.h11
2 files changed, 63 insertions, 68 deletions
diff --git a/src/uscxml/plugins/invoker/umundo/UmundoInvoker.cpp b/src/uscxml/plugins/invoker/umundo/UmundoInvoker.cpp
index f81f47f..a09f3af 100644
--- a/src/uscxml/plugins/invoker/umundo/UmundoInvoker.cpp
+++ b/src/uscxml/plugins/invoker/umundo/UmundoInvoker.cpp
@@ -37,19 +37,20 @@ bool pluginConnect(pluma::Host& host) {
}
#endif
-UmundoInvoker::UmundoInvoker() : _pub(NULL), _sub(NULL) {
+UmundoInvoker::UmundoInvoker() : _pub(NULL), _sub(NULL), _node(NULL) {
}
UmundoInvoker::~UmundoInvoker() {
if (_node) {
-// if (_sub) {
-// _node->removeSubscriber(*_sub);
-// delete _sub;
-// }
-// if (_pub) {
-// _node->removePublisher(*_pub);
-// delete _pub;
-// }
+ if (_sub) {
+ _node->removeSubscriber(*_sub);
+ delete _sub;
+ }
+ if (_pub) {
+ _node->removePublisher(*_pub);
+ delete _pub;
+ }
+ delete(_node);
}
};
@@ -172,33 +173,40 @@ void UmundoInvoker::invoke(const InvokeRequest& req) {
if (req.params.find("domain") != req.params.end()) {
domain = req.params.find("domain")->second.atom;
}
- _node = getNode(_interpreter, domain);
+ _node = new umundo::Node();
+ umundo::MDNSDiscoveryOptions discOpts;
+ _discovery = new umundo::Discovery(umundo::Discovery::MDNS, &discOpts);
+
+ _discovery->add(*_node);
+
// add type from .proto or .desc files
- if (req.params.find("type") != req.params.end()) {
- std::pair<InvokeRequest::params_t::const_iterator, InvokeRequest::params_t::const_iterator> typeRange = req.params.equal_range("types");
- for (InvokeRequest::params_t::const_iterator it = typeRange.first; it != typeRange.second; it++) {
- URL typeURI(it->second.atom);
- if (typeURI.toAbsolute(_interpreter->getBaseURI())) {
- std::string filename = typeURI.asLocalFile(".proto");
- umundo::PBSerializer::addProto(filename);
- } else {
- LOG(ERROR) << "umundo invoker has relative type src but nor baseURI set with interpreter.";
- }
+ std::list<std::string> type;
+ Event::getParam(req.params, "type", type);
+ std::list<std::string>::const_iterator typeIter = type.begin();
+ while(typeIter != type.end()) {
+ URL typeURI(*typeIter);
+ if (typeURI.toAbsolute(_interpreter->getBaseURI())) {
+ std::string filename = typeURI.asLocalFile(".proto");
+ umundo::PBSerializer::addProto(filename);
+ } else {
+ LOG(ERROR) << "umundo invoker has relative type src but nor baseURI set with interpreter.";
}
+ typeIter++;
}
-
+
// add directory with .proto or .desc files
- if (req.params.find("types") != req.params.end()) {
- std::pair<InvokeRequest::params_t::const_iterator, InvokeRequest::params_t::const_iterator> typeRange = req.params.equal_range("types");
- for (InvokeRequest::params_t::const_iterator it = typeRange.first; it != typeRange.second; it++) {
- URL typeURI(it->second.atom);
- if (typeURI.toAbsolute(_interpreter->getBaseURI()) && typeURI.scheme().compare("file") == 0) {
- umundo::PBSerializer::addProto(typeURI.path());
- } else {
- LOG(ERROR) << "invoke element has relative src URI with no baseURI set.";
- }
+ std::list<std::string> types;
+ Event::getParam(req.params, "type", types);
+ std::list<std::string>::const_iterator typesIter = types.begin();
+ while(typesIter != types.end()) {
+ URL typeURI(*typesIter);
+ if (typeURI.toAbsolute(_interpreter->getBaseURI())) {
+ umundo::PBSerializer::addProto(typeURI.path());
+ } else {
+ LOG(ERROR) << "invoke element has relative src URI with no baseURI set.";
}
+ typesIter++;
}
if (!_isService) {
@@ -220,23 +228,21 @@ void UmundoInvoker::invoke(const InvokeRequest& req) {
}
}
-void UmundoInvoker::welcome(umundo::TypedPublisher pub, const std::string& nodeId, const std::string& subId) {
+void UmundoInvoker::welcome(umundo::TypedPublisher atPub, const umundo::SubscriberStub& sub) {
Event event;
event.name = "umundo.sub.added";
- event.data.compound["nodeId"] = Data(nodeId, Data::VERBATIM);
- event.data.compound["subId"] = Data(subId, Data::VERBATIM);
- event.data.compound["channel"] = Data(pub.getChannelName(), Data::VERBATIM);
- event.data.compound["totalSubs"] = Data(toStr(pub.waitForSubscribers(0)), Data::VERBATIM);
+ event.data.compound["subId"] = Data(sub.getUUID(), Data::VERBATIM);
+ event.data.compound["channel"] = Data(atPub.getChannelName(), Data::VERBATIM);
+ event.data.compound["totalSubs"] = Data(toStr(atPub.waitForSubscribers(0)), Data::VERBATIM);
returnEvent(event);
}
-void UmundoInvoker::farewell(umundo::TypedPublisher pub, const std::string& nodeId, const std::string& subId) {
+void UmundoInvoker::farewell(umundo::TypedPublisher fromPub, const umundo::SubscriberStub& sub) {
Event event;
event.name = "umundo.sub.removed";
- event.data.compound["nodeId"] = Data(nodeId, Data::VERBATIM);
- event.data.compound["subId"] = Data(subId, Data::VERBATIM);
- event.data.compound["channel"] = Data(pub.getChannelName(), Data::VERBATIM);
- event.data.compound["totalSubs"] = Data(toStr(pub.waitForSubscribers(0)), Data::VERBATIM);
+ event.data.compound["subId"] = Data(sub.getUUID(), Data::VERBATIM);
+ event.data.compound["channel"] = Data(fromPub.getChannelName(), Data::VERBATIM);
+ event.data.compound["totalSubs"] = Data(toStr(fromPub.waitForSubscribers(0)), Data::VERBATIM);
returnEvent(event);
}
@@ -253,9 +259,6 @@ void UmundoInvoker::receive(void* object, umundo::Message* msg) {
event.origintype = "umundo";
event.eventType = Event::EXTERNAL;
-// if (msg->getMeta().find("um.s11n.type") != msg->getMeta().end())
-// event.compound["class"] = msg->getMeta("um.s11n.type");
-
if (object != NULL) {
if (msg->getMeta().find("um.s11n.type") != msg->getMeta().end() &&
boost::equals(msg->getMeta().find("um.s11n.type")->second, "JSON")) {
@@ -276,6 +279,10 @@ void UmundoInvoker::receive(void* object, umundo::Message* msg) {
metaIter++;
}
+ if (msg->size() > 0) {
+ event.data.compound["protobuf"] = Data(msg->data(), msg->size(), "application/x-protobuf");
+ }
+
returnEvent(event);
}
@@ -330,23 +337,6 @@ void UmundoInvoker::removed(umundo::ServiceDescription desc) {
void UmundoInvoker::changed(umundo::ServiceDescription desc) {
}
-std::multimap<std::string, std::pair<std::string, boost::weak_ptr<umundo::Node> > > UmundoInvoker::_nodes;
-boost::shared_ptr<umundo::Node> UmundoInvoker::getNode(InterpreterImpl* interpreter, const std::string& domain) {
- std::pair<_nodes_t::iterator, _nodes_t::iterator> range = _nodes.equal_range(interpreter->getName());
- for (_nodes_t::iterator it = range.first; it != range.second; it++) {
- if (it->second.first.compare(domain) == 0) {
- boost::shared_ptr<umundo::Node> node = it->second.second.lock();
- if (node)
- return node;
- }
- }
- // create a new node
- boost::shared_ptr<umundo::Node> node = boost::shared_ptr<umundo::Node>(new umundo::Node(domain));
- std::pair<std::string, std::pair<std::string, boost::weak_ptr<umundo::Node> > > pair = std::make_pair(interpreter->getName(), std::make_pair(domain, node));
- _nodes.insert(pair);
- return node;
-}
-
bool UmundoInvoker::jsonbufToData(Data& data, const JSONProto& json) {
if (json.compound_size() > 0) {
if (json.compound(0).key().size() > 0) {
@@ -418,7 +408,15 @@ bool UmundoInvoker::protobufToData(Data& data, const google::protobuf::Message&
}
break;
case google::protobuf::FieldDescriptor::TYPE_ENUM:
- LOG(ERROR) << "TYPE_ENUM is unimplemented" << std::endl;
+ if (fieldDesc->is_repeated()) {
+ for (int j = 0; j < reflect->FieldSize(msg, fieldDesc); j++) {
+ const google::protobuf::EnumValueDescriptor* enumDesc = reflect->GetRepeatedEnum(msg, fieldDesc, j);
+ data.compound[key].array.push_back(Data(toStr(enumDesc->name()), Data::VERBATIM));
+ }
+ } else {
+ const google::protobuf::EnumValueDescriptor* enumDesc = reflect->GetEnum(msg, fieldDesc);
+ data.compound[key] = Data(enumDesc->name(), Data::VERBATIM);
+ }
break;
case google::protobuf::FieldDescriptor::TYPE_FIXED32:
case google::protobuf::FieldDescriptor::TYPE_UINT32:
diff --git a/src/uscxml/plugins/invoker/umundo/UmundoInvoker.h b/src/uscxml/plugins/invoker/umundo/UmundoInvoker.h
index 4b28bfe..9c64886 100644
--- a/src/uscxml/plugins/invoker/umundo/UmundoInvoker.h
+++ b/src/uscxml/plugins/invoker/umundo/UmundoInvoker.h
@@ -62,8 +62,8 @@ public:
virtual void removed(umundo::ServiceDescription);
virtual void changed(umundo::ServiceDescription);
- virtual void welcome(umundo::TypedPublisher, const std::string& nodeId, const std::string& subId);
- virtual void farewell(umundo::TypedPublisher, const std::string& nodeId, const std::string& subId);
+ virtual void welcome(umundo::TypedPublisher atPub, const umundo::SubscriberStub& sub);
+ virtual void farewell(umundo::TypedPublisher fromPub, const umundo::SubscriberStub& sub);
protected:
bool _isService;
@@ -74,17 +74,14 @@ protected:
bool jsonbufToData(Data& data, const JSONProto& json);
bool protobufToData(Data& data, const google::protobuf::Message& msg);
- boost::shared_ptr<umundo::Node> _node;
+ umundo::Node* _node;
+ umundo::Discovery* _discovery;
umundo::TypedPublisher* _pub;
umundo::TypedSubscriber* _sub;
umundo::ServiceFilter* _svcFilter;
umundo::ServiceManager* _svcMgr;
std::map<umundo::ServiceDescription, umundo::ServiceStub*> _svcs;
-
- static std::multimap<std::string, std::pair<std::string, boost::weak_ptr<umundo::Node> > > _nodes;
- typedef std::multimap<std::string, std::pair<std::string, boost::weak_ptr<umundo::Node> > > _nodes_t;
- static boost::shared_ptr<umundo::Node> getNode(InterpreterImpl* interpreter, const std::string& domain);
};
#ifdef BUILD_AS_PLUGINS