summaryrefslogtreecommitdiffstats
path: root/src/uscxml/plugins/ioprocessor/basichttp/libevent/EventIOProcessor.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/uscxml/plugins/ioprocessor/basichttp/libevent/EventIOProcessor.cpp')
-rw-r--r--src/uscxml/plugins/ioprocessor/basichttp/libevent/EventIOProcessor.cpp395
1 files changed, 80 insertions, 315 deletions
diff --git a/src/uscxml/plugins/ioprocessor/basichttp/libevent/EventIOProcessor.cpp b/src/uscxml/plugins/ioprocessor/basichttp/libevent/EventIOProcessor.cpp
index 98d08b3..ed51ef8 100644
--- a/src/uscxml/plugins/ioprocessor/basichttp/libevent/EventIOProcessor.cpp
+++ b/src/uscxml/plugins/ioprocessor/basichttp/libevent/EventIOProcessor.cpp
@@ -40,32 +40,25 @@ EventIOProcessor::EventIOProcessor() {
}
EventIOProcessor::~EventIOProcessor() {
- _asyncQueue.stop();
- evdns_base_free(_dns, 1);
- EventIOServer* httpServer = EventIOServer::getInstance();
- httpServer->unregisterProcessor(this);
+ HTTPServer* httpServer = HTTPServer::getInstance();
+ httpServer->unregisterServlet(this);
}
+
boost::shared_ptr<IOProcessorImpl> EventIOProcessor::create(Interpreter* interpreter) {
boost::shared_ptr<EventIOProcessor> io = boost::shared_ptr<EventIOProcessor>(new EventIOProcessor());
io->_interpreter = interpreter;
- io->_dns = evdns_base_new(io->_asyncQueue._eventLoop, 1);
- assert(io->_dns);
- assert(evdns_base_count_nameservers(io->_dns) > 0);
-
// register at http server
- EventIOServer* httpServer = EventIOServer::getInstance();
- httpServer->registerProcessor(io.get());
+ std::string path = interpreter->getName();
+ path += "/basichttp";
+ if (!HTTPServer::registerServlet(path, io.get())) {
+ LOG(ERROR) << "Cannot register basichttp ioprocessor at " << path << ": " << " already taken";
+ }
- io->start();
return io;
}
-void EventIOProcessor::start() {
- _asyncQueue.start();
-}
-
Data EventIOProcessor::getDataModelVariables() {
Data data;
assert(_url.length() > 0);
@@ -73,335 +66,107 @@ Data EventIOProcessor::getDataModelVariables() {
return data;
}
-
-void EventIOProcessor::send(const SendRequest& req) {
-
- _sendData[req.sendid] = new SendData();
- _sendData[req.sendid]->scxmlReq = req;
- _sendData[req.sendid]->ioProcessor = this;
-
- int err = 0;
- char uriBuf[1024];
-
- struct evhttp_uri* targetURI = evhttp_uri_parse(_sendData[req.sendid]->scxmlReq.target.c_str());
- if (evhttp_uri_get_port(targetURI) < 0)
- evhttp_uri_set_port(targetURI, 80);
- const char* hostName = evhttp_uri_get_host(targetURI);
-
- // use synchronous dns resolving for multicast dns
- if(hostName && strlen(hostName) >= strlen(".local")) {
- if(strcmp(hostName + strlen(hostName) - strlen(".local"), ".local") == 0) {
- evhttp_uri_set_host(targetURI, EventIOServer::syncResolve(hostName).c_str()) && LOG(ERROR) << "evhttp_uri_set_host: " << strerror(errno);
- }
+void EventIOProcessor::httpRecvRequest(const HTTPServer::Request& req) {
+ Event reqEvent;
+ reqEvent.type = Event::EXTERNAL;
+ bool scxmlStructFound = false;
+
+ std::map<std::string, std::string>::const_iterator headerIter = req.headers.begin();
+ while(headerIter != req.headers.end()) {
+ if (boost::iequals("_scxmleventstruct", headerIter->first)) {
+ reqEvent = Event::fromXML(evhttp_decode_uri(headerIter->second.c_str()));
+ scxmlStructFound = true;
+ break;
+ } else if (boost::iequals("_scxmleventname", headerIter->first)) {
+ reqEvent.name = evhttp_decode_uri(headerIter->second.c_str());
+ } else {
+ reqEvent.data.compound[headerIter->first] = Data(evhttp_decode_uri(headerIter->second.c_str()), Data::VERBATIM);
+ }
+ headerIter++;
+ }
+
+ if (reqEvent.name.length() == 0)
+ reqEvent.name = req.type;
+
+ if (!scxmlStructFound) {
+ // get content into event
+ reqEvent.data.compound["content"] = Data(req.content, Data::VERBATIM);
}
- evhttp_uri_join(targetURI, uriBuf, 1024) || LOG(ERROR) << "evhttp_uri_join: " << strerror(errno);
- LOG(INFO) << "URI for send request: " << uriBuf << std::endl;
-
- int port = evhttp_uri_get_port(targetURI);
- if (port <= 0)
- port = 80;
-
- std::stringstream ssEndPoint;
- ssEndPoint << evhttp_uri_get_host(targetURI) << ":" << port;
- std::string endPoint = ssEndPoint.str();
-
- std::stringstream ssLocalURI;
- ssLocalURI << evhttp_uri_get_path(targetURI) << evhttp_uri_get_fragment(targetURI);
- std::string localURI = ssLocalURI.str();
+
+ returnEvent(reqEvent);
+ evhttp_send_reply(req.curlReq, 200, "OK", NULL);
+}
- if (_httpConnections.find(endPoint) == _httpConnections.end()) {
- struct evhttp_connection* conn = evhttp_connection_base_new(_asyncQueue._eventLoop, _dns, evhttp_uri_get_host(targetURI), evhttp_uri_get_port(targetURI));
- evhttp_connection_set_retries(conn, 3);
- _httpConnections[endPoint] = conn;
- }
+void EventIOProcessor::send(const SendRequest& req) {
- struct evhttp_connection* httpConn = _httpConnections[endPoint];
- struct evhttp_request* httpReq = evhttp_request_new(EventIOServer::httpSendReqDoneCallback, _sendData[req.sendid]);
+ std::string target = req.target;
+ URL targetURL(target);
- // event name
+ // event name
if (req.name.size() > 0) {
- evhttp_add_header(evhttp_request_get_output_headers(httpReq), "_scxmleventname", evhttp_encode_uri(req.name.c_str())) && LOG(ERROR) << "evhttp_add_header: " << strerror(errno);
+ targetURL.addOutHeader("_scxmleventname", evhttp_encode_uri(req.name.c_str()));
}
- // event namelist
+ // event namelist
if (req.namelist.size() > 0) {
std::map<std::string, std::string>::const_iterator namelistIter = req.namelist.begin();
while (namelistIter != req.namelist.end()) {
- evhttp_add_header(evhttp_request_get_output_headers(httpReq),
- namelistIter->first.c_str(),
- evhttp_encode_uri(namelistIter->second.c_str()))
- && LOG(ERROR) << "evhttp_add_header: " << strerror(errno);
+ targetURL.addOutHeader(namelistIter->first, namelistIter->second);
namelistIter++;
}
}
- // event params
+ // event params
if (req.params.size() > 0) {
std::multimap<std::string, std::string>::const_iterator paramIter = req.params.begin();
while (paramIter != req.params.end()) {
-// LOG(INFO) << paramIter->first << " = " << paramIter->second << std::endl;
- evhttp_add_header(evhttp_request_get_output_headers(httpReq),
- paramIter->first.c_str(),
- evhttp_encode_uri(paramIter->second.c_str()))
- && LOG(ERROR) << "evhttp_add_header: " << strerror(errno);
+ targetURL.addOutHeader(paramIter->first, paramIter->second);
paramIter++;
}
}
-
+
// content
if (req.content.size() > 0)
- evbuffer_add(evhttp_request_get_output_buffer(httpReq), req.content.c_str(), req.content.size()) && LOG(ERROR) << "evbuffer_add: " << strerror(errno);
-
-#if 0
- evhttp_add_header(evhttp_request_get_output_headers(httpReq), "_scxmleventstruct", evhttp_encode_uri(req.toXMLString().c_str()));
-#endif
- // required as per http 1.1 RFC2616 section 14.23
- evhttp_add_header(evhttp_request_get_output_headers(httpReq), "Host", evhttp_uri_get_host(targetURI)) && LOG(ERROR) << "evhttp_add_header: " << strerror(errno);
-
- _httpRequests[req.sendid] = httpReq;
- err = evhttp_make_request(httpConn,
- httpReq,
- EVHTTP_REQ_POST, localURI.c_str());
- if (err) {
- LOG(ERROR) << "Could not make http request to " << req.target;
- }
-}
+ targetURL.setOutContent(req.content);
-void EventIOProcessor::httpSendReqDone(struct SendData* sendData) {
- if (sendData->httpReq == NULL || evhttp_request_get_response_code(sendData->httpReq) != 200) {
- Event failureEvent;
- failureEvent.name = "error.communication";
- sendData->ioProcessor->returnEvent(failureEvent);
+ targetURL.setRequestType("post");
+ targetURL.addMonitor(this);
+
+ _sendRequests[req.sendid] = std::make_pair(targetURL, req);
+ URLFetcher::fetchURL(targetURL);
+}
+
+void EventIOProcessor::downloadStarted(const URL& url) {}
+
+void EventIOProcessor::downloadCompleted(const URL& url) {
+ std::map<std::string, std::pair<URL, SendRequest> >::iterator reqIter = _sendRequests.begin();
+ while(reqIter != _sendRequests.end()) {
+ if (reqIter->second.first == url) {
+ _sendRequests.erase(reqIter);
+ return;
+ }
+ reqIter++;
}
- delete _sendData[sendData->scxmlReq.sendid];
-}
-
-void EventIOProcessor::httpRecvReq(struct evhttp_request *req) {
-
- const char *cmdtype;
- struct evkeyvalq *headers;
- struct evkeyval *header;
- struct evbuffer *buf;
-
- switch (evhttp_request_get_command(req)) {
- case EVHTTP_REQ_GET:
- cmdtype = "GET";
- break;
- case EVHTTP_REQ_POST:
- cmdtype = "POST";
- break;
- case EVHTTP_REQ_HEAD:
- cmdtype = "HEAD";
- break;
- case EVHTTP_REQ_PUT:
- cmdtype = "PUT";
- break;
- case EVHTTP_REQ_DELETE:
- cmdtype = "DELETE";
- break;
- case EVHTTP_REQ_OPTIONS:
- cmdtype = "OPTIONS";
- break;
- case EVHTTP_REQ_TRACE:
- cmdtype = "TRACE";
- break;
- case EVHTTP_REQ_CONNECT:
- cmdtype = "CONNECT";
- break;
- case EVHTTP_REQ_PATCH:
- cmdtype = "PATCH";
- break;
- default:
- cmdtype = "unknown";
- break;
- }
-
- Event reqEvent;
- reqEvent.type = Event::EXTERNAL;
- bool scxmlStructFound = false;
-
- // map headers to event structure
- headers = evhttp_request_get_input_headers(req);
- for (header = headers->tqh_first; header;
- header = header->next.tqe_next) {
-// std::cout << "Header: " << header->key << std::endl;
-// std::cout << "Value: " << evhttp_decode_uri(header->value) << std::endl;
- if (boost::iequals("_scxmleventstruct", header->key)) {
- reqEvent = Event::fromXML(evhttp_decode_uri(header->value));
- scxmlStructFound = true;
- break;
- } else if (boost::iequals("_scxmleventname", header->key)) {
- reqEvent.name = evhttp_decode_uri(header->value);
- } else {
- reqEvent.compound[header->key] = Data(evhttp_decode_uri(header->value), Data::VERBATIM);
- }
- }
-
- if (reqEvent.name.length() == 0)
- reqEvent.name = cmdtype;
-
- if (!scxmlStructFound) {
- // get content into event
- std::string content;
- buf = evhttp_request_get_input_buffer(req);
- while (evbuffer_get_length(buf)) {
- int n;
- char cbuf[128];
- n = evbuffer_remove(buf, cbuf, sizeof(buf)-1);
- if (n > 0) {
- content.append(cbuf, n);
- }
- }
- reqEvent.compound["content"] = Data(content, Data::VERBATIM);
- }
-
- returnEvent(reqEvent);
- evhttp_send_reply(req, 200, "OK", NULL);
-}
-
-EventIOServer::EventIOServer(unsigned short port) {
- _port = port;
- _base = event_base_new();
- _http = evhttp_new(_base);
- _handle = NULL;
- while((_handle = evhttp_bind_socket_with_handle(_http, INADDR_ANY, _port)) == NULL) {
- _port++;
- }
- determineAddress();
-}
-
-EventIOServer::~EventIOServer() {
+ assert(false);
}
-EventIOServer* EventIOServer::_instance = NULL;
-tthread::recursive_mutex EventIOServer::_instanceMutex;
-
-EventIOServer* EventIOServer::getInstance() {
- tthread::lock_guard<tthread::recursive_mutex> lock(_instanceMutex);
- if (_instance == NULL) {
- _instance = new EventIOServer(8080);
- _instance->start();
- }
- return _instance;
-}
-
-void EventIOServer::registerProcessor(EventIOProcessor* processor) {
- EventIOServer* INSTANCE = getInstance();
- tthread::lock_guard<tthread::recursive_mutex> lock(INSTANCE->_mutex);
-
- /**
- * Determine path for interpreter.
- *
- * If the interpreter has a name and it is not yet taken, choose it as the path
- * for requests. If the interpreters name path is already taken, append digits
- * until we have an available path.
- *
- * If the interpreter does not specify a name, take its sessionid.
- */
-
- std::string path = processor->getPath();
- if (path.size() == 0) {
- path = processor->_interpreter->getSessionId();
- }
- assert(path.size() > 0);
-
- std::stringstream actualPath(path);
- int i = 1;
- while(INSTANCE->_processors.find(actualPath.str()) != INSTANCE->_processors.end()) {
- actualPath.str(std::string());
- actualPath.clear();
- actualPath << path << ++i;
- }
-
- std::stringstream processorURL;
- processorURL << "http://" << INSTANCE->_address << ":" << INSTANCE->_port << "/" << actualPath.str();
-
- INSTANCE->_processors[actualPath.str()] = processor;
- processor->setURL(processorURL.str());
-
- LOG(INFO) << "SCXML listening at: " << processorURL.str() << std::endl;
-
- evhttp_set_cb(INSTANCE->_http, ("/" + actualPath.str()).c_str(), EventIOServer::httpRecvReqCallback, processor);
+void EventIOProcessor::downloadFailed(const URL& url, int errorCode) {
-// evhttp_set_cb(THIS->_http, "/", EventIOProcessor::httpRecvReq, processor);
-// evhttp_set_gencb(THIS->_http, EventIOProcessor::httpRecvReq, NULL);
-}
-
-void EventIOServer::unregisterProcessor(EventIOProcessor* processor) {
- EventIOServer* INSTANCE = getInstance();
- tthread::lock_guard<tthread::recursive_mutex> lock(INSTANCE->_mutex);
- evhttp_del_cb(INSTANCE->_http, processor->_url.c_str());
-}
-
-void EventIOServer::start() {
- _isRunning = true;
- _thread = new tthread::thread(EventIOServer::run, this);
-}
-
-void EventIOServer::run(void* instance) {
- EventIOServer* INSTANCE = (EventIOServer*)instance;
- while(INSTANCE->_isRunning) {
- event_base_dispatch(INSTANCE->_base);
- }
- LOG(INFO) << "HTTP Server stopped" << std::endl;
-}
-
-std::string EventIOServer::syncResolve(const std::string& hostname) {
- struct hostent *he;
- struct in_addr **addr_list;
- int i;
+ std::map<std::string, std::pair<URL, SendRequest> >::iterator reqIter = _sendRequests.begin();
+ while(reqIter != _sendRequests.end()) {
+ if (reqIter->second.first == url) {
+ Event failEvent;
+ failEvent.name = "error.communication";
+ returnEvent(failEvent);
+
+ _sendRequests.erase(reqIter);
+ return;
+ }
+ reqIter++;
+ }
+ assert(false);
- if ( (he = gethostbyname( hostname.c_str() ) ) != NULL) {
- addr_list = (struct in_addr **) he->h_addr_list;
- for(i = 0; addr_list[i] != NULL; i++) {
- return std::string(inet_ntoa(*addr_list[i]));
- }
- }
- return "";
}
-void EventIOServer::determineAddress() {
-
- char hostname[1024];
- gethostname(hostname, 1024);
- _address = std::string(hostname);
-
-#if 0
- struct sockaddr_storage ss;
- evutil_socket_t fd;
- ev_socklen_t socklen = sizeof(ss);
- char addrbuf[128];
-
- void *inaddr;
- const char *addr;
- int got_port = -1;
- fd = evhttp_bound_socket_get_fd(_handle);
- memset(&ss, 0, sizeof(ss));
- if (getsockname(fd, (struct sockaddr *)&ss, &socklen)) {
- perror("getsockname() failed");
- return;
- }
-
- if (ss.ss_family == AF_INET) {
- got_port = ntohs(((struct sockaddr_in*)&ss)->sin_port);
- inaddr = &((struct sockaddr_in*)&ss)->sin_addr;
- } else if (ss.ss_family == AF_INET6) {
- got_port = ntohs(((struct sockaddr_in6*)&ss)->sin6_port);
- inaddr = &((struct sockaddr_in6*)&ss)->sin6_addr;
- } else {
- fprintf(stderr, "Weird address family %d\n",
- ss.ss_family);
- return;
- }
- addr = evutil_inet_ntop(ss.ss_family, inaddr, addrbuf,
- sizeof(addrbuf));
- if (addr) {
- _address = std::string(addr);
- } else {
- fprintf(stderr, "evutil_inet_ntop failed\n");
- return;
- }
-#endif
-}
} \ No newline at end of file