From a0013f1457494b00f2caca1eb4f23a3475be2259 Mon Sep 17 00:00:00 2001 From: Stefan Radomski Date: Tue, 15 Jan 2013 15:18:20 +0100 Subject: Fixed race condition when setting up ioprocessors --- src/uscxml/Factory.cpp | 25 ++++++--- src/uscxml/Interpreter.cpp | 61 +++++++++++++++------- src/uscxml/Interpreter.h | 1 + .../basichttp/libevent/EventIOProcessor.cpp | 17 +++--- 4 files changed, 70 insertions(+), 34 deletions(-) diff --git a/src/uscxml/Factory.cpp b/src/uscxml/Factory.cpp index 3220225..d90ab46 100644 --- a/src/uscxml/Factory.cpp +++ b/src/uscxml/Factory.cpp @@ -3,6 +3,7 @@ #include "uscxml/Factory.h" #include "uscxml/Message.h" +#include #ifdef BUILD_AS_PLUGINS # include "uscxml/plugins/Plugins.h" @@ -168,36 +169,48 @@ void Factory::registerInvoker(InvokerImpl* invoker) { boost::shared_ptr Factory::createInvoker(const std::string& type, Interpreter* interpreter) { Factory* factory = getInstance(); - if (factory->_invokerAliases.find(type) == factory->_invokerAliases.end()) + if (factory->_invokerAliases.find(type) == factory->_invokerAliases.end()) { + LOG(ERROR) << "No " << type << " Invoker known"; return boost::shared_ptr(); + } std::string canonicalName = factory->_invokerAliases[type]; - if (factory->_invokers.find(canonicalName) == factory->_invokers.end()) + if (factory->_invokers.find(canonicalName) == factory->_invokers.end()) { + LOG(ERROR) << "Invoker " << type << " known as " << canonicalName << " but not prototype is available in factory"; return boost::shared_ptr(); + } return boost::static_pointer_cast(factory->_invokers[canonicalName]->create(interpreter)); } boost::shared_ptr Factory::createDataModel(const std::string& type, Interpreter* interpreter) { Factory* factory = getInstance(); - if (factory->_dataModelAliases.find(type) == factory->_dataModelAliases.end()) + if (factory->_dataModelAliases.find(type) == factory->_dataModelAliases.end()) { + LOG(ERROR) << "No " << type << " DataModel known"; return boost::shared_ptr(); + } std::string canonicalName = factory->_dataModelAliases[type]; - if (factory->_dataModels.find(canonicalName) == factory->_dataModels.end()) + if (factory->_dataModels.find(canonicalName) == factory->_dataModels.end()) { + LOG(ERROR) << "DataModel " << type << " known as " << canonicalName << " but not prototype is available in factory"; return boost::shared_ptr(); + } return factory->_dataModels[canonicalName]->create(interpreter); } boost::shared_ptr Factory::createIOProcessor(const std::string& type, Interpreter* interpreter) { Factory* factory = getInstance(); - if (factory->_ioProcessorAliases.find(type) == factory->_ioProcessorAliases.end()) + if (factory->_ioProcessorAliases.find(type) == factory->_ioProcessorAliases.end()) { + LOG(ERROR) << "No " << type << " IOProcessor known"; return boost::shared_ptr(); + } std::string canonicalName = factory->_ioProcessorAliases[type]; - if (factory->_ioProcessors.find(canonicalName) == factory->_ioProcessors.end()) + if (factory->_ioProcessors.find(canonicalName) == factory->_ioProcessors.end()) { + LOG(ERROR) << "IOProcessor " << type << " known as " << canonicalName << " but not prototype is available in factory"; return boost::shared_ptr(); + } return factory->_ioProcessors[canonicalName]->create(interpreter); } diff --git a/src/uscxml/Interpreter.cpp b/src/uscxml/Interpreter.cpp index b46d60d..5449272 100644 --- a/src/uscxml/Interpreter.cpp +++ b/src/uscxml/Interpreter.cpp @@ -98,7 +98,7 @@ bool Interpreter::toAbsoluteURI(URL& uri) { return true; if (_baseURI.asString().size() > 0) { - if (uri.toAbsolute(_baseURI)); + if (uri.toAbsolute(_baseURI)) return true; return false; } @@ -183,6 +183,7 @@ bool Interpreter::runOnMainThread(int fps, bool blocking) { if (fps > 0) { uint64_t nextRun = _lastRunOnMainThread + (1000 / fps); if (blocking) { + tthread::lock_guard lock(_mutex); while(nextRun > tthread::timeStamp()) { tthread::this_thread::sleep_for(tthread::chrono::milliseconds(nextRun - tthread::timeStamp())); } @@ -231,32 +232,34 @@ void Interpreter::interpret() { setupIOProcessors(); - // executeGlobalScriptElements - NodeSet globalScriptElems = _xpath.evaluate("/" + _nsPrefix + "scxml/" + _nsPrefix + "script", _document).asNodeSet(); - for (unsigned int i = 0; i < globalScriptElems.size(); i++) { -// std::cout << globalScriptElems[i].getFirstChild().getNodeValue() << std::endl; - if (_dataModel) - executeContent(globalScriptElems[i]); - } - _running = true; - - std::string binding = _xpath.evaluate("/" + _nsPrefix + "scxml/@binding", _document).asString(); - _binding = (boost::iequals(binding, "late") ? LATE : EARLY); - - // initialize all data elements + _binding = (HAS_ATTR(_scxml, "binding") && boost::iequals(ATTR(_scxml, "binding"), "late") ? LATE : EARLY); + + // @TODO: Reread http://www.w3.org/TR/scxml/#DataBinding + if (_dataModel && _binding == EARLY) { + // initialize all data elements NodeSet dataElems = _xpath.evaluate("//" + _nsPrefix + "data", _document).asNodeSet(); for (unsigned int i = 0; i < dataElems.size(); i++) { initializeData(dataElems[i]); } } else if(_dataModel) { - NodeSet topDataElems = _xpath.evaluate("/" + _nsPrefix + "scxml/" + _nsPrefix + "datamodel/" + _nsPrefix + "data", _document).asNodeSet(); + // initialize current data elements + NodeSet topDataElems = filterChildElements("data", filterChildElements("datamodel", _scxml)); + // NodeSet topDataElems = _xpath.evaluate("/" + _nsPrefix + "scxml/" + _nsPrefix + "datamodel/" + _nsPrefix + "data", _document).asNodeSet(); for (unsigned int i = 0; i < topDataElems.size(); i++) { initializeData(topDataElems[i]); } } + // executeGlobalScriptElements + NodeSet globalScriptElems = _xpath.evaluate("/" + _nsPrefix + "scxml/" + _nsPrefix + "script", _document).asNodeSet(); + for (unsigned int i = 0; i < globalScriptElems.size(); i++) { + // std::cout << globalScriptElems[i].getFirstChild().getNodeValue() << std::endl; + if (_dataModel) + executeContent(globalScriptElems[i]); + } + // initial transition might be implict NodeSet initialTransitions = _xpath.evaluate("/" + _nsPrefix + "scxml/" + _nsPrefix + "initial/" + _nsPrefix + "transition", _document).asNodeSet(); if (initialTransitions.size() == 0) { @@ -286,17 +289,23 @@ void Interpreter::initializeData(const Arabica::DOM::Node& data) { return; } try { - if (!HAS_ATTR(data, "id")) + if (!HAS_ATTR(data, "id")) { + LOG(ERROR) << "Data element has no id!"; return; + } if (HAS_ATTR(data, "expr")) { std::string value = ATTR(data, "expr"); _dataModel.assign(ATTR(data, "id"), value); } else if (HAS_ATTR(data, "src")) { - Arabica::SAX::InputSourceResolver resolver(Arabica::SAX::InputSource(ATTR(data, "src")), - Arabica::default_string_adaptor()); - std::string value = std::string(std::istreambuf_iterator(*resolver.resolve()), std::istreambuf_iterator()); - _dataModel.assign(ATTR(data, "id"), value); + URL srcURL(ATTR(data, "src")); + if (!srcURL.isAbsolute()) + toAbsoluteURI(srcURL); + + std::stringstream ss; + ss << srcURL; + _dataModel.assign(ATTR(data, "id"), ss.str()); + } else if (data.hasChildNodes()) { // search for the text node with the actual script NodeList dataChilds = data.getChildNodes(); @@ -674,6 +683,7 @@ void Interpreter::delayedSend(void* userdata, std::string eventName) { // send to invoker std::string invokeId = sendReq.target.substr(2, sendReq.target.length() - 2); if (INSTANCE->_invokers.find(invokeId) != INSTANCE->_invokers.end()) { + tthread::lock_guard lock(INSTANCE->_mutex); INSTANCE->_invokers[invokeId].send(sendReq); } else { LOG(ERROR) << "Can not send to invoked component '" << invokeId << "', no such invokeId" << std::endl; @@ -778,6 +788,7 @@ void Interpreter::invoke(const Arabica::DOM::Node& element) { Invoker invoker(Factory::createInvoker(invokeReq.type, this)); if (invoker) { + tthread::lock_guard lock(_mutex); _invokers[invokeReq.invokeid] = invoker; LOG(INFO) << "Added invoker " << invokeReq.type << " at " << invokeReq.invokeid; invoker.invoke(invokeReq); @@ -1636,6 +1647,14 @@ std::vector Interpreter::tokenizeIdRefs(const std::string& idRefs) return ids; } +NodeSet Interpreter::filterChildElements(const std::string& tagName, const NodeSet& nodeSet) { + NodeSet filteredChildElems; + for (unsigned int i = 0; i < nodeSet.size(); i++) { + filteredChildElems.push_back(filterChildElements(tagName, nodeSet[i])); + } + return filteredChildElems; +} + NodeSet Interpreter::filterChildElements(const std::string& tagName, const Node& node) { NodeSet filteredChildElems; NodeList childs = node.getChildNodes(); @@ -1797,6 +1816,7 @@ bool Interpreter::isCompound(const Arabica::DOM::Node& state) { } void Interpreter::setupIOProcessors() { + tthread::lock_guard lock(_mutex); std::map::iterator ioProcIter = Factory::getInstance()->_ioProcessors.begin(); while(ioProcIter != Factory::getInstance()->_ioProcessors.end()) { _ioProcessors[ioProcIter->first] = Factory::createIOProcessor(ioProcIter->first, this); @@ -1814,6 +1834,7 @@ void Interpreter::setupIOProcessors() { } IOProcessor Interpreter::getIOProcessor(const std::string& type) { + tthread::lock_guard lock(_mutex); if (_ioProcessors.find(type) == _ioProcessors.end()) { LOG(ERROR) << "No ioProcessor known for type " << type; return IOProcessor(); diff --git a/src/uscxml/Interpreter.h b/src/uscxml/Interpreter.h index 33204b5..ab8cf6d 100644 --- a/src/uscxml/Interpreter.h +++ b/src/uscxml/Interpreter.h @@ -146,6 +146,7 @@ public: Arabica::XPath::NodeSet getTargetStates(const Arabica::DOM::Node& transition); static Arabica::XPath::NodeSet filterChildElements(const std::string& tagname, const Arabica::DOM::Node& node); + static Arabica::XPath::NodeSet filterChildElements(const std::string& tagName, const Arabica::XPath::NodeSet& nodeSet); static const std::string getUUID(); protected: diff --git a/src/uscxml/plugins/ioprocessor/basichttp/libevent/EventIOProcessor.cpp b/src/uscxml/plugins/ioprocessor/basichttp/libevent/EventIOProcessor.cpp index 2b6273c..7b90951 100644 --- a/src/uscxml/plugins/ioprocessor/basichttp/libevent/EventIOProcessor.cpp +++ b/src/uscxml/plugins/ioprocessor/basichttp/libevent/EventIOProcessor.cpp @@ -90,11 +90,10 @@ void EventIOProcessor::send(const SendRequest& req) { // use synchronous dns resolving for multicast dns if(hostName && strlen(hostName) >= strlen(".local")) { if(strcmp(hostName + strlen(hostName) - strlen(".local"), ".local") == 0) { - evhttp_uri_set_host(targetURI, EventIOServer::syncResolve(hostName).c_str()); + evhttp_uri_set_host(targetURI, EventIOServer::syncResolve(hostName).c_str()) && LOG(ERROR) << "evhttp_uri_set_host: " << strerror(errno); } } - evhttp_uri_join(targetURI, uriBuf, 1024); - + evhttp_uri_join(targetURI, uriBuf, 1024) || LOG(ERROR) << "evhttp_uri_join: " << strerror(errno); LOG(INFO) << "URI for send request: " << uriBuf << std::endl; int port = evhttp_uri_get_port(targetURI); @@ -117,7 +116,7 @@ void EventIOProcessor::send(const SendRequest& req) { // event name if (req.name.size() > 0) { - evhttp_add_header(evhttp_request_get_output_headers(httpReq), "_scxmleventname", evhttp_encode_uri(req.name.c_str())); + evhttp_add_header(evhttp_request_get_output_headers(httpReq), "_scxmleventname", evhttp_encode_uri(req.name.c_str())) && LOG(ERROR) << "evhttp_add_header: " << strerror(errno); } // event namelist @@ -126,7 +125,8 @@ void EventIOProcessor::send(const SendRequest& req) { while (namelistIter != req.namelist.end()) { evhttp_add_header(evhttp_request_get_output_headers(httpReq), namelistIter->first.c_str(), - evhttp_encode_uri(namelistIter->second.c_str())); + evhttp_encode_uri(namelistIter->second.c_str())) + && LOG(ERROR) << "evhttp_add_header: " << strerror(errno); namelistIter++; } } @@ -138,20 +138,21 @@ void EventIOProcessor::send(const SendRequest& req) { // LOG(INFO) << paramIter->first << " = " << paramIter->second << std::endl; evhttp_add_header(evhttp_request_get_output_headers(httpReq), paramIter->first.c_str(), - evhttp_encode_uri(paramIter->second.c_str())); + evhttp_encode_uri(paramIter->second.c_str())) + && LOG(ERROR) << "evhttp_add_header: " << strerror(errno); paramIter++; } } // content if (req.content.size() > 0) - evbuffer_add(evhttp_request_get_output_buffer(httpReq), req.content.c_str(), req.content.size()); + evbuffer_add(evhttp_request_get_output_buffer(httpReq), req.content.c_str(), req.content.size()) && LOG(ERROR) << "evbuffer_add: " << strerror(errno); #if 0 evhttp_add_header(evhttp_request_get_output_headers(httpReq), "_scxmleventstruct", evhttp_encode_uri(req.toXMLString().c_str())); #endif // required as per http 1.1 RFC2616 section 14.23 - evhttp_add_header(evhttp_request_get_output_headers(httpReq), "Host", evhttp_uri_get_host(targetURI)); + evhttp_add_header(evhttp_request_get_output_headers(httpReq), "Host", evhttp_uri_get_host(targetURI)) && LOG(ERROR) << "evhttp_add_header: " << strerror(errno); _httpRequests[req.sendid] = httpReq; err = evhttp_make_request(httpConn, -- cgit v0.12