diff options
Diffstat (limited to 'src/uscxml/ioprocessor/basichttp')
7 files changed, 598 insertions, 0 deletions
diff --git a/src/uscxml/ioprocessor/basichttp/README.md b/src/uscxml/ioprocessor/basichttp/README.md new file mode 100644 index 0000000..de89944 --- /dev/null +++ b/src/uscxml/ioprocessor/basichttp/README.md @@ -0,0 +1,2 @@ +Only the libevent basichttp ioprocessor is supported. Mongoose seemed somewhat +unmaintained and pion comes with too many dependencies.
\ No newline at end of file diff --git a/src/uscxml/ioprocessor/basichttp/libevent/EventIOProcessor.cpp b/src/uscxml/ioprocessor/basichttp/libevent/EventIOProcessor.cpp new file mode 100644 index 0000000..c06c7e8 --- /dev/null +++ b/src/uscxml/ioprocessor/basichttp/libevent/EventIOProcessor.cpp @@ -0,0 +1,361 @@ +#include "uscxml/ioprocessor/basichttp/libevent/EventIOProcessor.h" +#include "uscxml/Message.h" +#include <iostream> +#include <event2/dns.h> +#include <event2/buffer.h> +#include <event2/keyvalq_struct.h> + +#include <string.h> + +#include <io/uri.hpp> +#include <glog/logging.h> + +#include <netdb.h> +#include <arpa/inet.h> + +namespace uscxml { +namespace io { +namespace libevent { + +// see http://www.w3.org/TR/scxml/#BasicHTTPEventProcessor + +EventIOProcessor::EventIOProcessor() { +} + +EventIOProcessor::~EventIOProcessor() { + _eventQueue.stop(); + EventIOServer* httpServer = EventIOServer::getInstance(); + httpServer->unregisterProcessor(this); +} + +IOProcessor* EventIOProcessor::create(Interpreter* interpreter) { + EventIOProcessor* io = new EventIOProcessor(); + io->_interpreter = interpreter; + + io->_dns = evdns_base_new(io->_eventQueue._eventLoop, 1); + assert(io->_dns); + assert(evdns_base_count_nameservers(io->_dns) > 0); + + // register at http server + EventIOServer* httpServer = EventIOServer::getInstance(); + httpServer->registerProcessor(io); + + io->start(); + return io; +} + +void EventIOProcessor::start() { + _eventQueue.start(); +} + +void EventIOProcessor::send(SendRequest& req) { + // I cant figure out how to copy the reference into the struct :( + _sendData[req.sendid].req = req; + _sendData[req.sendid].ioProcessor = this; + + if (req.delayMs > 0) { + LOG(INFO) << "Enqueing HTTP send request"; + _eventQueue.addEvent(req.sendid, EventIOProcessor::httpMakeSendReq, req.delayMs, &_sendData[req.sendid]); + } else { + LOG(INFO) << "Sending HTTP send request"; + EventIOProcessor::httpMakeSendReq(&_sendData[req.sendid], req.sendid); + } +} + +void EventIOProcessor::httpMakeSendReq(void* userdata, std::string eventName) { + SendData* sendData = ((SendData*)userdata); + EventIOProcessor* THIS = sendData->ioProcessor; + int err = 0; + char uriBuf[1024]; + + struct evhttp_uri* targetURI = evhttp_uri_parse(sendData->req.target.c_str()); + if (evhttp_uri_get_port(targetURI) == 0) + evhttp_uri_set_port(targetURI, 80); + const char* hostName = evhttp_uri_get_host(targetURI); + + // use synchronous dns resolving for multicast dns + if(strlen(hostName) >= strlen(".local")) { + if(strcmp(hostName + strlen(hostName) - strlen(".local"), ".local") == 0) { + evhttp_uri_set_host(targetURI, EventIOServer::syncResolve(hostName).c_str()); + } + } + evhttp_uri_join(targetURI, uriBuf, 1024); + + LOG(INFO) << "URI for send request: " << uriBuf << std::endl; + + std::stringstream ssEndPoint; + ssEndPoint << evhttp_uri_get_host(targetURI) << ":" << evhttp_uri_get_port(targetURI); + std::string endPoint = ssEndPoint.str(); + + std::stringstream ssLocalURI; + ssLocalURI << evhttp_uri_get_path(targetURI) << evhttp_uri_get_fragment(targetURI); + std::string localURI = ssLocalURI.str(); + + if (THIS->_httpConnections.find(endPoint) == THIS->_httpConnections.end()) + THIS->_httpConnections[endPoint] = evhttp_connection_base_new(THIS->_eventQueue._eventLoop, THIS->_dns, evhttp_uri_get_host(targetURI), evhttp_uri_get_port(targetURI)); + + struct evhttp_connection* httpConn = THIS->_httpConnections[endPoint]; + struct evhttp_request* httpReq = evhttp_request_new(EventIOProcessor::httpSendReqDone, userdata); + +#if 0 + // event name + if (sendData->req.event.size() > 0) { + evhttp_add_header(evhttp_request_get_output_headers(httpReq), "_scxmleventname", evhttp_encode_uri(sendData->req.event.c_str())); + } + + // event namelist + if (sendData->req.namelist.size() > 0) { + std::map<std::string, std::string>::iterator namelistIter = sendData->req.namelist.begin(); + while (namelistIter != sendData->req.namelist.end()) { + evhttp_add_header(evhttp_request_get_output_headers(httpReq), + namelistIter->first.c_str(), + evhttp_encode_uri(namelistIter->second.c_str())); + namelistIter++; + } + } + + // event params + if (sendData->req.params.size() > 0) { + std::map<std::string, std::string>::iterator paramIter = sendData->req.params.begin(); + while (paramIter != sendData->req.params.end()) { + evhttp_add_header(evhttp_request_get_output_headers(httpReq), + paramIter->first.c_str(), + evhttp_encode_uri(paramIter->second.c_str())); + paramIter++; + } + } + + // content + if (sendData->req.content.size() > 0) + evbuffer_add(evhttp_request_get_output_buffer(httpReq), sendData->req.content.c_str(), sendData->req.content.size()); +#endif + + evhttp_add_header(evhttp_request_get_output_headers(httpReq), "_scxmleventstruct", evhttp_encode_uri(sendData->req.toXMLString().c_str())); + + + THIS->_httpRequests[sendData->req.sendid] = httpReq; + err = evhttp_make_request(httpConn, + httpReq, + EVHTTP_REQ_POST, localURI.c_str()); + if (err) { + LOG(ERROR) << "Could not make http request to " << sendData->req.target; + } +} + +void EventIOProcessor::httpRecvReq(struct evhttp_request *req, void *arg) { + + const char *cmdtype; + struct evkeyvalq *headers; + struct evkeyval *header; + struct evbuffer *buf; + + switch (evhttp_request_get_command(req)) { + case EVHTTP_REQ_GET: cmdtype = "GET"; break; + case EVHTTP_REQ_POST: cmdtype = "POST"; break; + case EVHTTP_REQ_HEAD: cmdtype = "HEAD"; break; + case EVHTTP_REQ_PUT: cmdtype = "PUT"; break; + case EVHTTP_REQ_DELETE: cmdtype = "DELETE"; break; + case EVHTTP_REQ_OPTIONS: cmdtype = "OPTIONS"; break; + case EVHTTP_REQ_TRACE: cmdtype = "TRACE"; break; + case EVHTTP_REQ_CONNECT: cmdtype = "CONNECT"; break; + case EVHTTP_REQ_PATCH: cmdtype = "PATCH"; break; + default: cmdtype = "unknown"; break; + } + + Event reqEvent; + reqEvent.type = Event::EXTERNAL; + + // map headers to event structure + headers = evhttp_request_get_input_headers(req); + for (header = headers->tqh_first; header; + header = header->next.tqe_next) { +// std::cout << "Header: " << header->key << std::endl; +// std::cout << "Value: " << evhttp_decode_uri(header->value) << std::endl; + if (boost::iequals("_scxmleventstruct", header->key)) { + reqEvent = Event::fromXML(evhttp_decode_uri(header->value)); + break; + } else if (boost::iequals("_scxmleventname", header->key)) { + reqEvent.name = evhttp_decode_uri(header->value); + } else { + reqEvent.compound[header->key] = Data(evhttp_decode_uri(header->value), Data::VERBATIM); + } + } + + // get content into event + std::string content; + buf = evhttp_request_get_input_buffer(req); + while (evbuffer_get_length(buf)) { + int n; + char cbuf[128]; + n = evbuffer_remove(buf, cbuf, sizeof(buf)-1); + if (n > 0) { + content.append(cbuf, n); + } + } + reqEvent.compound["content"] = Data(content, Data::VERBATIM); + + EventIOProcessor* THIS = (EventIOProcessor*)arg; + THIS->_interpreter->receive(reqEvent); + + evhttp_send_reply(req, 200, "OK", NULL); +} + +void EventIOProcessor::httpSendReqDone(struct evhttp_request *req, void *cb_arg) { + if (req) { + LOG(INFO) << "got return code " << evhttp_request_get_response_code(req) << std::endl; + } +} + +void EventIOProcessor::invoke(InvokeRequest& req) { + +} +void EventIOProcessor::cancel(const std::string sendId) { + +} + +EventIOServer::EventIOServer(unsigned short port) { + _port = port; + _base = event_base_new(); + _http = evhttp_new(_base); + _handle = NULL; + while((_handle = evhttp_bind_socket_with_handle(_http, INADDR_ANY, _port)) == NULL) { + _port++; + } + determineAddress(); +} + +EventIOServer::~EventIOServer() { +} + +EventIOServer* EventIOServer::_instance = NULL; +tthread::recursive_mutex EventIOServer::_instanceMutex; + +EventIOServer* EventIOServer::getInstance() { + tthread::lock_guard<tthread::recursive_mutex> lock(_instanceMutex); + if (_instance == NULL) { + _instance = new EventIOServer(8080); + _instance->start(); + } + return _instance; +} + +void EventIOServer::registerProcessor(EventIOProcessor* processor) { + EventIOServer* THIS = getInstance(); + tthread::lock_guard<tthread::recursive_mutex> lock(THIS->_mutex); + + /** + * Determine path for interpreter. + * + * If the interpreter has a name and it is not yet taken, choose it as the path + * for requests. If the interpreters name path is already taken, append digits + * until we have an available path. + * + * If the interpreter does not specify a name, take its sessionid. + */ + + std::string path = processor->_interpreter->getName(); + if (path.size() == 0) { + path = processor->_interpreter->getSessionId(); + } + assert(path.size() > 0); + + std::stringstream actualPath(path); + int i = 1; + while(THIS->_processors.find(actualPath.str()) != THIS->_processors.end()) { + actualPath.str(std::string()); + actualPath.clear(); + actualPath << path << ++i; + } + + std::stringstream processorURL; + processorURL << "http://" << THIS->_address << ":" << THIS->_port << "/" << actualPath.str(); + + THIS->_processors[actualPath.str()] = processor; + processor->setURL(processorURL.str()); + + evhttp_set_cb(THIS->_http, ("/" + actualPath.str()).c_str(), EventIOProcessor::httpRecvReq, processor); +// evhttp_set_cb(THIS->_http, "/", EventIOProcessor::httpRecvReq, processor); +// evhttp_set_gencb(THIS->_http, EventIOProcessor::httpRecvReq, NULL); +} + +void EventIOServer::unregisterProcessor(EventIOProcessor* processor) { + EventIOServer* THIS = getInstance(); + tthread::lock_guard<tthread::recursive_mutex> lock(THIS->_mutex); + evhttp_del_cb(THIS->_http, processor->getURL().c_str()); +} + +void EventIOServer::start() { + _isRunning = true; + _thread = new tthread::thread(EventIOServer::run, this); +} + +void EventIOServer::run(void* instance) { + EventIOServer* THIS = (EventIOServer*)instance; + while(THIS->_isRunning) { + LOG(INFO) << "Dispatching HTTP Server" << std::endl; + event_base_dispatch(THIS->_base); + } + LOG(INFO) << "HTTP Server stopped" << std::endl; +} + +std::string EventIOServer::syncResolve(const std::string& hostname) { + struct hostent *he; + struct in_addr **addr_list; + int i; + + if ( (he = gethostbyname( hostname.c_str() ) ) != NULL) { + addr_list = (struct in_addr **) he->h_addr_list; + for(i = 0; addr_list[i] != NULL; i++) { + return std::string(inet_ntoa(*addr_list[i])); + } + } + return ""; +} + +void EventIOServer::determineAddress() { + + char hostname[1024]; + gethostname(hostname, 1024); + _address = std::string(hostname); + +#if 0 + struct sockaddr_storage ss; + evutil_socket_t fd; + ev_socklen_t socklen = sizeof(ss); + char addrbuf[128]; + + void *inaddr; + const char *addr; + int got_port = -1; + fd = evhttp_bound_socket_get_fd(_handle); + memset(&ss, 0, sizeof(ss)); + if (getsockname(fd, (struct sockaddr *)&ss, &socklen)) { + perror("getsockname() failed"); + return; + } + + if (ss.ss_family == AF_INET) { + got_port = ntohs(((struct sockaddr_in*)&ss)->sin_port); + inaddr = &((struct sockaddr_in*)&ss)->sin_addr; + } else if (ss.ss_family == AF_INET6) { + got_port = ntohs(((struct sockaddr_in6*)&ss)->sin6_port); + inaddr = &((struct sockaddr_in6*)&ss)->sin6_addr; + } else { + fprintf(stderr, "Weird address family %d\n", + ss.ss_family); + return; + } + addr = evutil_inet_ntop(ss.ss_family, inaddr, addrbuf, + sizeof(addrbuf)); + if (addr) { + _address = std::string(addr); + } else { + fprintf(stderr, "evutil_inet_ntop failed\n"); + return; + } +#endif +} + +} +} +}
\ No newline at end of file diff --git a/src/uscxml/ioprocessor/basichttp/libevent/EventIOProcessor.h b/src/uscxml/ioprocessor/basichttp/libevent/EventIOProcessor.h new file mode 100644 index 0000000..7e8eaa9 --- /dev/null +++ b/src/uscxml/ioprocessor/basichttp/libevent/EventIOProcessor.h @@ -0,0 +1,96 @@ +#ifndef EVENTIOPROCESSOR_H_2CUY93KU +#define EVENTIOPROCESSOR_H_2CUY93KU + +#include "uscxml/concurrency/eventqueue/libevent/DelayedEventQueue.h" +#include "uscxml/Interpreter.h" +#include "uscxml/Factory.h" +#include <sys/time.h> + +#include <event2/http.h> +#include <event2/http_struct.h> + +namespace uscxml { +namespace io { +namespace libevent { + +class EventIOServer; + +class EventIOProcessor : public uscxml::IOProcessor { +public: + struct SendData { + EventIOProcessor* ioProcessor; + uscxml::SendRequest req; + }; + + EventIOProcessor(); + virtual ~EventIOProcessor(); + virtual IOProcessor* create(uscxml::Interpreter* interpreter); + + virtual void send(uscxml::SendRequest& req); + virtual void invoke(uscxml::InvokeRequest& req); + virtual void cancel(const std::string sendId); + + std::string getURL() { return _url; } + void setURL(const std::string& url) { _url = url; } + + void start(); + static void run(void* instance); + + static void httpMakeSendReq(void* userdata, std::string eventName); + static void httpSendReqDone(struct evhttp_request *req, void *cb_arg); + static void httpRecvReq(struct evhttp_request *req, void *arg); + +protected: + std::map<std::string, SendData> _sendData; + + std::string _url; + + uscxml::DelayedEventQueue _eventQueue; + uscxml::Interpreter* _interpreter; + std::map<std::string, struct evhttp_connection*> _httpConnections; + std::map<std::string, struct evhttp_request*> _httpRequests; + struct evdns_base* _dns; + + friend class EventIOServer; +}; + +class EventIOServer { +private: + static EventIOServer* getInstance(); + EventIOServer(unsigned short port); + ~EventIOServer(); + + void start(); + void stop(); + static void run(void* instance); + + void determineAddress(); + static std::string syncResolve(const std::string& hostname); + + static void registerProcessor(EventIOProcessor* processor); + static void unregisterProcessor(EventIOProcessor* processor); + + + std::map<std::string, EventIOProcessor*> _processors; + + struct event_base* _base; + struct evhttp* _http; + struct evhttp_bound_socket* _handle; + + unsigned short _port; + std::string _address; + + static EventIOServer* _instance; + static tthread::recursive_mutex _instanceMutex; + tthread::thread* _thread; + tthread::recursive_mutex _mutex; + bool _isRunning; + + friend class EventIOProcessor; +}; + +} +} +} + +#endif /* end of include guard: EVENTIOPROCESSOR_H_2CUY93KU */
\ No newline at end of file diff --git a/src/uscxml/ioprocessor/basichttp/mongoose/MongooseIOProcessor.cpp b/src/uscxml/ioprocessor/basichttp/mongoose/MongooseIOProcessor.cpp new file mode 100644 index 0000000..a62fefc --- /dev/null +++ b/src/uscxml/ioprocessor/basichttp/mongoose/MongooseIOProcessor.cpp @@ -0,0 +1,3 @@ +#include "uscxml/ioprocessor/basichttp/mongoose/MongooseIOProcessor.h" +#include "uscxml/Message.h" + diff --git a/src/uscxml/ioprocessor/basichttp/mongoose/MongooseIOProcessor.h b/src/uscxml/ioprocessor/basichttp/mongoose/MongooseIOProcessor.h new file mode 100644 index 0000000..bb7a0fc --- /dev/null +++ b/src/uscxml/ioprocessor/basichttp/mongoose/MongooseIOProcessor.h @@ -0,0 +1,15 @@ +#ifndef MONGOOSEIOPROCESSOR_H_JS0GMSFO +#define MONGOOSEIOPROCESSOR_H_JS0GMSFO + +#include "uscxml/Interpreter.h" +#include "uscxml/Factory.h" + +namespace uscxml { + +class MongooseIOProcessor : public IOProcessor { + +}; + +} + +#endif /* end of include guard: MONGOOSEIOPROCESSOR_H_JS0GMSFO */ diff --git a/src/uscxml/ioprocessor/basichttp/pion/PionIOProcessor.cpp b/src/uscxml/ioprocessor/basichttp/pion/PionIOProcessor.cpp new file mode 100644 index 0000000..7aa9169 --- /dev/null +++ b/src/uscxml/ioprocessor/basichttp/pion/PionIOProcessor.cpp @@ -0,0 +1,74 @@ +#include "uscxml/ioprocessor/basichttp/pion/PionIOProcessor.h" +#include "uscxml/Message.h" + +#include <pion/http/request.hpp> +#include <pion/http/message.hpp> + +namespace uscxml { + +using namespace pion; + +PionIOProcessor::PionIOProcessor() { +} + +PionIOProcessor::~PionIOProcessor() { + +} + +IOProcessor* PionIOProcessor::create(Interpreter* interpreter) { + PionIOProcessor* io = new PionIOProcessor(); + io->_interpreter = interpreter; + io->_ioServer = PionIOServer::getInstance(); + return io; +} + +void handle_connection(pion::tcp::connection_ptr& tcp_conn) { +} + +void PionIOProcessor::send(SendRequest& req) { + + boost::system::error_code error_code; + boost::asio::io_service io_service; + + pion::tcp::connection tcp_conn(io_service, 0); + error_code = tcp_conn.connect("localhost", 8080); + if (error_code) throw error_code; // connection failed + + + + http::request httpReq; + httpReq.set_method("POST"); + if (req.event.size() > 0) + httpReq.add_header("_scxmleventname", req.event); + + httpReq.send(tcp_conn, error_code); + +// http::request_writer writer; +// writer. + +} +void PionIOProcessor::invoke(InvokeRequest& req) { + +} +void PionIOProcessor::cancel(const std::string sendId) { + +} + +PionIOServer::PionIOServer() : pion::tcp::server(0) { +} + +PionIOServer::~PionIOServer() { +} + +void PionIOServer::handle_connection(pion::tcp::connection_ptr& tcp_conn) { +} + +PionIOServer* PionIOServer::_instance = NULL; +PionIOServer* PionIOServer::getInstance() { + if (_instance == NULL) { + _instance = new PionIOServer(); + } + return _instance; +} + +}
\ No newline at end of file diff --git a/src/uscxml/ioprocessor/basichttp/pion/PionIOProcessor.h b/src/uscxml/ioprocessor/basichttp/pion/PionIOProcessor.h new file mode 100644 index 0000000..154acdb --- /dev/null +++ b/src/uscxml/ioprocessor/basichttp/pion/PionIOProcessor.h @@ -0,0 +1,47 @@ +#ifndef PIONIOPROCESSOR_H_VAITDNCN +#define PIONIOPROCESSOR_H_VAITDNCN + +#include "uscxml/Interpreter.h" +#include "uscxml/Factory.h" +#include "uscxml/concurrency/DelayedEventQueue.h" + +#include <pion/http/request_writer.hpp> +#include <pion/http/request_writer.hpp> +#include <pion/tcp/server.hpp> + +namespace uscxml { + +class PionIOServer : public pion::tcp::server { +public: + PionIOServer(); + virtual ~PionIOServer(); + DelayedEventQueue _eventQueue; + + virtual void handle_connection(pion::tcp::connection_ptr& tcp_conn); + + static PionIOServer* getInstance(); + static PionIOServer* _instance; + + pion::http::request_writer_ptr _writer; + pion::tcp::connection_ptr _conn; + +}; + +class PionIOProcessor : public IOProcessor { +public: + PionIOProcessor(); + virtual ~PionIOProcessor(); + virtual IOProcessor* create(Interpreter* interpreter); + + virtual void send(SendRequest& req); + virtual void invoke(InvokeRequest& req); + virtual void cancel(const std::string sendId); + +protected: + Interpreter* _interpreter; + PionIOServer* _ioServer; +}; + +} + +#endif /* end of include guard: PIONIOPROCESSOR_H_VAITDNCN */ |