From 69d34f5598974dddeab14e0fe9169108362622ec Mon Sep 17 00:00:00 2001 From: Stefan Radomski Date: Sat, 16 Nov 2013 01:44:33 +0100 Subject: WebSockets! --- CMakeLists.txt | 6 +- apps/samples/websockets/websockets.html | 146 ++++----- apps/samples/websockets/websockets.scxml | 37 ++- apps/uscxml-browser.cpp | 2 +- src/uscxml/CMakeLists.txt | 7 +- src/uscxml/Factory.cpp | 6 +- src/uscxml/Interpreter.cpp | 14 +- src/uscxml/Interpreter.h | 15 +- src/uscxml/Message.h | 8 +- .../plugins/element/respond/RespondElement.cpp | 4 +- src/uscxml/server/HTTPServer.cpp | 353 +++++++++++++++------ src/uscxml/server/HTTPServer.h | 71 ++++- src/uscxml/server/InterpreterServlet.cpp | 136 +++++++- src/uscxml/server/InterpreterServlet.h | 58 +++- src/uscxml/server/WebSocketServer.cpp | 0 src/uscxml/server/WebSocketServer.h | 8 - 16 files changed, 639 insertions(+), 232 deletions(-) delete mode 100644 src/uscxml/server/WebSocketServer.cpp delete mode 100644 src/uscxml/server/WebSocketServer.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 32b1700..0454f4c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -254,6 +254,7 @@ if (WIN32) include_directories(${PROJECT_SOURCE_DIR}/contrib/src/inttypes) endif() include_directories(${PROJECT_SOURCE_DIR}/contrib/src/jsmn) +include_directories(${PROJECT_SOURCE_DIR}/contrib/src/evws) #include_directories(${PROJECT_SOURCE_DIR}/contrib/src/google-url) ############################################################ @@ -913,7 +914,10 @@ endif() # Header Files ############################################################ -file(GLOB_RECURSE USCXML_HEADERS ${PROJECT_SOURCE_DIR}/src/*.h ${CMAKE_BINARY_DIR}/*.h) +file(GLOB_RECURSE USCXML_HEADERS + ${PROJECT_SOURCE_DIR}/src/*.h + ${PROJECT_SOURCE_DIR}/src/*.hpp + ${CMAKE_BINARY_DIR}/*.h) INSTALL_HEADERS(HEADERS ${USCXML_HEADERS} COMPONENT headers) ############################################################ diff --git a/apps/samples/websockets/websockets.html b/apps/samples/websockets/websockets.html index 9a21f46..c90f728 100644 --- a/apps/samples/websockets/websockets.html +++ b/apps/samples/websockets/websockets.html @@ -1,77 +1,79 @@ - + - - + + + + WebSocket Test + + + var littlePacket = "0123456789ABCDEF"; + var mediumPacket = ""; + var hugePacket = ""; + for (var i = 0; i < 16; i++) { + mediumPacket += littlePacket; + } + for (var i = 0; i < 16; i++) { + hugePacket += mediumPacket; + } - - - - - - - - - -
-
-
-
- + function init() { + output = document.getElementById("output"); + testWebSocket(); + } + function testWebSocket() { + websocket = new WebSocket(wsUri); + websocket.onopen = function(evt) { + onOpen(evt) + }; + websocket.onclose = function(evt) { + onClose(evt) + }; + websocket.onmessage = function(evt) { + onMessage(evt) + }; + websocket.onerror = function(evt) { + onError(evt) + }; + } + function onOpen(evt) { + writeToScreen("CONNECTED"); + doSend(littlePacket); + // doSend(mediumPacket); + // doSend(hugePacket); + } + function onClose(evt) { + writeToScreen("DISCONNECTED"); + } + function onMessage(evt) { + var value = evt.data.replace(/&/g,'&').replace(//g,'>'); + writeToScreen('RESPONSE:
' + value + '
<\/span>'); +// websocket.close(); + } + function onError(evt) { + writeToScreen('ERROR:<\/span> ' + evt.data); + } + function doSend(message) { + writeToScreen("SENT: " + message); + websocket.send(message); + } + function writeToScreen(message) { + var pre = document.createElement("p"); + pre.style.wordWrap = "break-word"; + pre.innerHTML = message; + output.appendChild(pre); + } + window.addEventListener("load", init, false); + + + +

+ WebSocket Test +

+
+ diff --git a/apps/samples/websockets/websockets.scxml b/apps/samples/websockets/websockets.scxml index 0050091..25fdb7a 100644 --- a/apps/samples/websockets/websockets.scxml +++ b/apps/samples/websockets/websockets.scxml @@ -1,16 +1,39 @@ + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/apps/uscxml-browser.cpp b/apps/uscxml-browser.cpp index 8708e09..4188132 100644 --- a/apps/uscxml-browser.cpp +++ b/apps/uscxml-browser.cpp @@ -140,7 +140,7 @@ int main(int argc, char** argv) { sslConf->port = options.httpsPort; } - HTTPServer::getInstance(options.httpPort, sslConf); + HTTPServer::getInstance(options.httpPort, options.wsPort, sslConf); // instantiate and configure interpreters std::list interpreters; diff --git a/src/uscxml/CMakeLists.txt b/src/uscxml/CMakeLists.txt index b32eb81..7f53d51 100644 --- a/src/uscxml/CMakeLists.txt +++ b/src/uscxml/CMakeLists.txt @@ -10,7 +10,9 @@ source_group("Interpreter" FILES ${USCXML_SERVER}) list (APPEND USCXML_FILES ${USCXML_SERVER}) file(GLOB_RECURSE USCXML_UTIL - util/*.cpp + util/*.cpp + util/*.hpp + util/*.c util/*.h ) source_group("Interpreter" FILES ${USCXML_UTIL}) @@ -46,7 +48,8 @@ if(APPLE OR IOS) endif() file(GLOB USCXML_CORE - ${CMAKE_SOURCE_DIR}/contrib/src/jsmn/jsmn.c + ${CMAKE_SOURCE_DIR}/contrib/src/jsmn/jsmn.c + ${CMAKE_SOURCE_DIR}/contrib/src/evws/*.c *.cpp *.h ) diff --git a/src/uscxml/Factory.cpp b/src/uscxml/Factory.cpp index c836582..7677fd4 100644 --- a/src/uscxml/Factory.cpp +++ b/src/uscxml/Factory.cpp @@ -295,7 +295,11 @@ Factory::Factory() { registerIOProcessor(ioProcessor); } { - InterpreterServlet* ioProcessor = new InterpreterServlet(); + InterpreterHTTPServlet* ioProcessor = new InterpreterHTTPServlet(); + registerIOProcessor(ioProcessor); + } + { + InterpreterWebSocketServlet* ioProcessor = new InterpreterWebSocketServlet(); registerIOProcessor(ioProcessor); } { diff --git a/src/uscxml/Interpreter.cpp b/src/uscxml/Interpreter.cpp index d14fae4..1314986 100644 --- a/src/uscxml/Interpreter.cpp +++ b/src/uscxml/Interpreter.cpp @@ -102,6 +102,7 @@ void InterpreterOptions::printUsageAndExit(const char* progName) { printf("\t-lN : Set loglevel to N\n"); printf("\t-tN : port for HTTP server\n"); printf("\t-sN : port for HTTPS server\n"); + printf("\t-wN : port for WebSocket server\n"); printf("\n"); exit(1); } @@ -122,6 +123,7 @@ InterpreterOptions InterpreterOptions::fromCmdLine(int argc, char** argv) { {"dot", no_argument, 0, 'd'}, {"port", required_argument, 0, 't'}, {"ssl-port", required_argument, 0, 's'}, + {"ws-port", required_argument, 0, 'w'}, {"certificate", required_argument, 0, 'c'}, {"private-key", required_argument, 0, 0}, {"public-key", required_argument, 0, 0}, @@ -143,7 +145,7 @@ InterpreterOptions InterpreterOptions::fromCmdLine(int argc, char** argv) { int optionInd = 0; int option; for (;;) { - option = getopt_long_only(argc, argv, "+vdt:s:c:p:l:", longOptions, &optionInd); + option = getopt_long_only(argc, argv, "+vdt:s:w:c:p:l:", longOptions, &optionInd); if (option == -1) { if (optind == argc) // we are done with parsing @@ -192,6 +194,9 @@ InterpreterOptions InterpreterOptions::fromCmdLine(int argc, char** argv) { case 's': currOptions->httpsPort = strTo(optarg); break; + case 'w': + currOptions->wsPort = strTo(optarg); + break; case 'v': currOptions->verbose = true; break; @@ -1957,7 +1962,12 @@ void InterpreterImpl::setupIOProcessors() { if (iequals(ioProcIter->first, "http")) { // this is somewhat ugly - _httpServlet = static_cast(_ioProcessors[ioProcIter->first]._impl.get()); + _httpServlet = static_cast(_ioProcessors[ioProcIter->first]._impl.get()); + } + + if (iequals(ioProcIter->first, "websocket")) { + // this is somewhat ugly + _wsServlet = static_cast(_ioProcessors[ioProcIter->first]._impl.get()); } // register aliases diff --git a/src/uscxml/Interpreter.h b/src/uscxml/Interpreter.h index 4bccf5b..f192152 100644 --- a/src/uscxml/Interpreter.h +++ b/src/uscxml/Interpreter.h @@ -90,9 +90,11 @@ public: bool verbose; bool withHTTP; bool withHTTPS; + bool withWS; int logLevel; unsigned short httpPort; unsigned short httpsPort; + unsigned short wsPort; std::string pluginPath; std::string certificate; std::string privateKey; @@ -116,9 +118,11 @@ protected: verbose(false), withHTTP(true), withHTTPS(true), + withWS(true), logLevel(0), - httpPort(8080), - httpsPort(8443) + httpPort(0), + httpsPort(0), + wsPort(0) {} }; @@ -166,7 +170,7 @@ public: return _cmdLineOptions; } - InterpreterServlet* getHTTPServlet() { + InterpreterHTTPServlet* getHTTPServlet() { return _httpServlet; } @@ -352,7 +356,8 @@ protected: Event _currEvent; Factory* _factory; - InterpreterServlet* _httpServlet; + InterpreterHTTPServlet* _httpServlet; + InterpreterWebSocketServlet* _wsServlet; std::set _monitors; void executeContent(const Arabica::DOM::Node& content, bool rethrow = false); @@ -477,7 +482,7 @@ public: return _impl->getCmdLineOptions(); } - InterpreterServlet* getHTTPServlet() { + InterpreterHTTPServlet* getHTTPServlet() { return _impl->getHTTPServlet(); } diff --git a/src/uscxml/Message.h b/src/uscxml/Message.h index 44358be..5de9a56 100644 --- a/src/uscxml/Message.h +++ b/src/uscxml/Message.h @@ -37,8 +37,8 @@ #include "uscxml/Convenience.h" -#include "uscxml/util/MD5.h" -#include "uscxml/util/Base64.h" +#include "uscxml/util/MD5.hpp" +#include "uscxml/util/Base64.hpp" namespace uscxml { @@ -56,11 +56,11 @@ public: } std::string base64() { - return base64_encode((char* const)data, size); + return base64Encode((char* const)data, size); } Blob* fromBase64(const std::string base64) { - std::string decoded = base64_decode(base64); + std::string decoded = base64Decode(base64); return new Blob((void*)decoded.c_str(), decoded.length(), mimeType); } }; diff --git a/src/uscxml/plugins/element/respond/RespondElement.cpp b/src/uscxml/plugins/element/respond/RespondElement.cpp index a88d9c6..f415b19 100644 --- a/src/uscxml/plugins/element/respond/RespondElement.cpp +++ b/src/uscxml/plugins/element/respond/RespondElement.cpp @@ -55,7 +55,7 @@ void RespondElement::enterElement(const Arabica::DOM::Node& node) { std::string requestId = _interpreter->getDataModel().evalAsString(ATTR(node, "to")); // try to get the request object - InterpreterServlet* servlet = _interpreter->getHTTPServlet(); + InterpreterHTTPServlet* servlet = _interpreter->getHTTPServlet(); tthread::lock_guard lock(servlet->getMutex()); if (servlet->getRequests().find(requestId) == servlet->getRequests().end()) { @@ -65,7 +65,7 @@ void RespondElement::enterElement(const Arabica::DOM::Node& node) { assert(servlet->getRequests().find(requestId) != servlet->getRequests().end()); HTTPServer::Request httpReq = servlet->getRequests()[requestId]; - assert(httpReq.curlReq != NULL); + assert(httpReq.evhttpReq != NULL); HTTPServer::Reply httpReply(httpReq); servlet->getRequests().erase(requestId); diff --git a/src/uscxml/server/HTTPServer.cpp b/src/uscxml/server/HTTPServer.cpp index 6065740..04a831e 100644 --- a/src/uscxml/server/HTTPServer.cpp +++ b/src/uscxml/server/HTTPServer.cpp @@ -68,11 +68,16 @@ extern "C" { namespace uscxml { -HTTPServer::HTTPServer(unsigned short port, SSLConfig* sslConf) { +HTTPServer::HTTPServer(unsigned short port, unsigned short wsPort, SSLConfig* sslConf) { _port = port; _base = event_base_new(); _http = evhttp_new(_base); + _evws = evws_new(_base); _thread = NULL; + _httpHandle = NULL; + _wsHandle = NULL; + + determineAddress(); unsigned int allowedMethods = EVHTTP_REQ_GET | @@ -86,17 +91,22 @@ HTTPServer::HTTPServer(unsigned short port, SSLConfig* sslConf) { EVHTTP_REQ_PATCH; evhttp_set_allowed_methods(_http, allowedMethods); // allow all methods - - _handle = NULL; - while((_handle = evhttp_bind_socket_with_handle(_http, INADDR_ANY, _port)) == NULL) { - _port++; - } - determineAddress(); + + if (_port > 0) + _httpHandle = evhttp_bind_socket_with_handle(_http, INADDR_ANY, _port); + if (_httpHandle) + LOG(INFO) << "HTTP server listening on tcp/" << _port; + + _wsPort = wsPort; + if (_wsPort > 0) + _wsHandle = evws_bind_socket(_evws, _wsPort); + if (_wsHandle) + LOG(INFO) << "WebSocket server listening on tcp/" << _wsPort; #if (defined EVENT_SSL_FOUND && defined OPENSSL_FOUND && defined OPENSSL_HAS_ELIPTIC_CURVES) + _sslHandle = NULL; + _https = NULL; if (!sslConf) { - _https = NULL; - _sslHandle = NULL; _sslPort = 0; } else { _sslPort = sslConf->port; @@ -126,10 +136,11 @@ HTTPServer::HTTPServer(unsigned short port, SSLConfig* sslConf) { evhttp_set_bevcb(_https, sslBufferEventCallback, ctx); evhttp_set_gencb(_https, sslGeneralBufferEventCallback, NULL); - _sslHandle = NULL; - while((_sslHandle = evhttp_bind_socket_with_handle(_https, INADDR_ANY, _sslPort)) == NULL) { - _sslPort++; - } + if (_sslPort > 0) + _sslHandle = evhttp_bind_socket_with_handle(_https, INADDR_ANY, _sslPort); + if (_sslHandle) + LOG(INFO) << "HTTPS server listening on tcp/" << _wsPort; + } #endif @@ -137,6 +148,7 @@ HTTPServer::HTTPServer(unsigned short port, SSLConfig* sslConf) { // generic callback evhttp_set_gencb(_http, HTTPServer::httpRecvReqCallback, NULL); + evws_set_gencb(_evws, HTTPServer::wsRecvReqCallback, NULL); } HTTPServer::~HTTPServer() { @@ -148,7 +160,7 @@ HTTPServer::~HTTPServer() { HTTPServer* HTTPServer::_instance = NULL; tthread::recursive_mutex HTTPServer::_instanceMutex; -HTTPServer* HTTPServer::getInstance(unsigned short port, SSLConfig* sslConf) { +HTTPServer* HTTPServer::getInstance(unsigned short port, unsigned short wsPort, SSLConfig* sslConf) { // tthread::lock_guard lock(_instanceMutex); if (_instance == NULL) { #ifdef _WIN32 @@ -160,90 +172,60 @@ HTTPServer* HTTPServer::getInstance(unsigned short port, SSLConfig* sslConf) { #else evthread_use_windows_threads(); #endif - _instance = new HTTPServer(port, sslConf); + _instance = new HTTPServer(port, wsPort, sslConf); _instance->start(); } return _instance; } -#if (defined EVENT_SSL_FOUND && defined OPENSSL_FOUND && defined OPENSSL_HAS_ELIPTIC_CURVES) -// see https://github.com/ppelleti/https-example/blob/master/https-server.c -struct bufferevent* HTTPServer::sslBufferEventCallback(struct event_base *base, void *arg) { - struct bufferevent* r; - SSL_CTX *ctx = (SSL_CTX *) arg; - r = bufferevent_openssl_socket_new (base, - -1, - SSL_new (ctx), - BUFFEREVENT_SSL_ACCEPTING, - BEV_OPT_CLOSE_ON_FREE); - return r; -} - - -void HTTPServer::sslGeneralBufferEventCallback (struct evhttp_request *req, void *arg) { - struct evbuffer *evb = NULL; - const char *uri = evhttp_request_get_uri (req); - struct evhttp_uri *decoded = NULL; +/** + * This callback is registered for all data received on websockets + */ +void HTTPServer::wsRecvReqCallback(struct evws_connection *conn, struct evws_frame *frame, void *callbackData) { + WSFrame wsFrame; + wsFrame.evwsConn = conn; - /* We only handle POST requests. */ - if (evhttp_request_get_command (req) != EVHTTP_REQ_POST) { - evhttp_send_reply (req, 200, "OK", NULL); - return; + struct evws_header *header; + TAILQ_FOREACH(header, &conn->headers, next) { + wsFrame.data.compound["header"].compound[header->key] = Data(header->value, Data::VERBATIM); } - printf ("Got a POST request for <%s>\n", uri); - - /* Decode the URI */ - decoded = evhttp_uri_parse (uri); - if (! decoded) { - printf ("It's not a good URI. Sending BADREQUEST\n"); - evhttp_send_error (req, HTTP_BADREQUEST, 0); - return; + switch (frame->opcode) { + case EVWS_CONTINUATION_FRAME: + wsFrame.data.compound["type"] = Data("continuation", Data::VERBATIM); + wsFrame.data.compound["content"] = Data(std::string(frame->data, frame->size), Data::VERBATIM); + break; + case EVWS_TEXT_FRAME: + wsFrame.data.compound["type"] = Data("text", Data::VERBATIM); + wsFrame.data.compound["content"] = Data(std::string(frame->data, frame->size), Data::VERBATIM); + break; + case EVWS_BINARY_FRAME: + wsFrame.data.compound["type"] = Data("binary", Data::VERBATIM); + wsFrame.data.compound["content"] = Data(frame->data, frame->size, "application/octet-stream"); + break; + case EVWS_CONNECTION_CLOSE: + wsFrame.data.compound["type"] = Data("close", Data::VERBATIM); + break; + case EVWS_PING: + wsFrame.data.compound["type"] = Data("ping", Data::VERBATIM); + break; + case EVWS_PONG: + wsFrame.data.compound["type"] = Data("ping", Data::VERBATIM); + break; } - /* Decode the payload */ - struct evkeyvalq kv; - memset (&kv, 0, sizeof (kv)); - struct evbuffer *buf = evhttp_request_get_input_buffer (req); - evbuffer_add (buf, "", 1); /* NUL-terminate the buffer */ - char *payload = (char *) evbuffer_pullup (buf, -1); - if (0 != evhttp_parse_query_str (payload, &kv)) { - printf ("Malformed payload. Sending BADREQUEST\n"); - evhttp_send_error (req, HTTP_BADREQUEST, 0); - return; - } + wsFrame.data.compound["uri"] = Data(HTTPServer::getBaseURL(WebSockets) + conn->uri, Data::VERBATIM); + wsFrame.data.compound["path"] = Data(conn->uri, Data::VERBATIM); + + // try with the handler registered for path first + bool answered = false; + if (callbackData != NULL) + answered = ((WebSocketServlet*)callbackData)->wsRecvRequest(conn, wsFrame); + + if (!answered) + HTTPServer::getInstance()->processByMatchingServlet(conn, wsFrame); - /* Determine peer */ - char *peer_addr; - ev_uint16_t peer_port; - struct evhttp_connection *con = evhttp_request_get_connection (req); - evhttp_connection_get_peer (con, &peer_addr, &peer_port); - - /* Extract passcode */ - const char *passcode = evhttp_find_header (&kv, "passcode"); - char response[256]; - evutil_snprintf (response, sizeof (response), - "Hi %s! I %s your passcode.\n", peer_addr, - (0 == strcmp (passcode, "R23") - ? "liked" - : "didn't like")); - evhttp_clear_headers (&kv); /* to free memory held by kv */ - - /* This holds the content we're sending. */ - evb = evbuffer_new (); - - evhttp_add_header (evhttp_request_get_output_headers (req), - "Content-Type", "application/x-yaml"); - evbuffer_add (evb, response, strlen (response)); - - evhttp_send_reply (req, 200, "OK", evb); - - if (decoded) - evhttp_uri_free (decoded); - if (evb) - evbuffer_free (evb); } -#endif /** * This callback is registered for all HTTP requests @@ -251,9 +233,23 @@ void HTTPServer::sslGeneralBufferEventCallback (struct evhttp_request *req, void void HTTPServer::httpRecvReqCallback(struct evhttp_request *req, void *callbackData) { std::stringstream raw; + struct evkeyvalq *headers; + headers = evhttp_request_get_input_headers(req); + +#if 0 + // first of all, see whether this is a websocket request + const char* wsUpgrade = evhttp_find_header(headers, "Upgrade"); + const char* wsConnection = evhttp_find_header(headers, "Connection"); + if (wsUpgrade && wsConnection) { + if (iequals(wsUpgrade, "websocket") && iequals(wsConnection, "Upgrade")) { + // this is a websocket request! .. but we do not know how to decouple form evhttp + } + } +#endif + evhttp_request_own(req); Request request; - request.curlReq = req; + request.evhttpReq = req; switch (evhttp_request_get_command(req)) { case EVHTTP_REQ_GET: @@ -307,12 +303,10 @@ void HTTPServer::httpRecvReqCallback(struct evhttp_request *req, void *callbackD raw << " HTTP/" << request.data.compound["httpMajor"].atom << "." << request.data.compound["httpMinor"].atom; raw << std::endl; - struct evkeyvalq *headers; struct evkeyval *header; struct evbuffer *buf; // insert headers to event data - headers = evhttp_request_get_input_headers(req); for (header = headers->tqh_first; header; header = header->next.tqe_next) { request.data.compound["header"].compound[header->key] = Data(header->value, Data::VERBATIM); raw << header->key << ": " << header->value << std::endl; @@ -415,12 +409,12 @@ void HTTPServer::httpRecvReqCallback(struct evhttp_request *req, void *callbackD void HTTPServer::processByMatchingServlet(const Request& request) { tthread::lock_guard lock(_mutex); - servlet_iter_t servletIter = _servlets.begin(); + http_servlet_iter_t servletIter = _httpServlets.begin(); std::string actualPath = request.data.compound.at("path").atom; std::map matches; - while(servletIter != _servlets.end()) { + while(servletIter != _httpServlets.end()) { // is the servlet path a prefix of the actual path? std::string servletPath = "/" + servletIter->first; if (iequals(actualPath.substr(0, servletPath.length()), servletPath) && // servlet path is a prefix @@ -440,7 +434,35 @@ void HTTPServer::processByMatchingServlet(const Request& request) { } LOG(INFO) << "Got an HTTP request at " << actualPath << " but no servlet is registered there or at a prefix"; - evhttp_send_error(request.curlReq, 404, NULL); + evhttp_send_error(request.evhttpReq, 404, NULL); +} + +void HTTPServer::processByMatchingServlet(evws_connection* conn, const WSFrame& frame) { + tthread::lock_guard lock(_mutex); + + ws_servlet_iter_t servletIter = _wsServlets.begin(); + + std::string actualPath = frame.data.compound.at("path").atom; + std::map matches; + + while(servletIter != _wsServlets.end()) { + // is the servlet path a prefix of the actual path? + std::string servletPath = "/" + servletIter->first; + if (iequals(actualPath.substr(0, servletPath.length()), servletPath) && // servlet path is a prefix + iequals(actualPath.substr(servletPath.length(), 1), "/")) { // and next character is a '/' + matches.insert(std::make_pair(servletPath, servletIter->second)); + } + servletIter++; + } + + // process by best matching servlet until someone feels responsible + std::map::iterator matchesIter = matches.begin(); + while(matchesIter != matches.end()) { + if (matchesIter->second->wsRecvRequest(conn, frame)) { + return; + } + matchesIter++; + } } void HTTPServer::reply(const Reply& reply) { @@ -459,12 +481,12 @@ void HTTPServer::replyCallback(evutil_socket_t fd, short what, void *arg) { std::map::const_iterator headerIter = reply->headers.begin(); while(headerIter != reply->headers.end()) { - evhttp_add_header(evhttp_request_get_output_headers(reply->curlReq), headerIter->first.c_str(), headerIter->second.c_str()); + evhttp_add_header(evhttp_request_get_output_headers(reply->evhttpReq), headerIter->first.c_str(), headerIter->second.c_str()); headerIter++; } if (reply->status >= 400) { - evhttp_send_error(reply->curlReq, reply->status, NULL); + evhttp_send_error(reply->evhttpReq, reply->status, NULL); return; } @@ -475,7 +497,7 @@ void HTTPServer::replyCallback(evutil_socket_t fd, short what, void *arg) { evbuffer_add(evb, reply->content.data(), reply->content.size()); } - evhttp_send_reply(reply->curlReq, reply->status, NULL, evb); + evhttp_send_reply(reply->evhttpReq, reply->status, NULL, evb); if (evb != NULL) evbuffer_free(evb); @@ -497,7 +519,7 @@ bool HTTPServer::registerServlet(const std::string& path, HTTPServlet* servlet) // if this servlet allows to adapt the path, do so int i = 2; - while(INSTANCE->_servlets.find(suffixedPath) != INSTANCE->_servlets.end()) { + while(INSTANCE->_httpServlets.find(suffixedPath) != INSTANCE->_httpServlets.end()) { if (!servlet->canAdaptPath()) return false; std::stringstream ss; @@ -509,7 +531,7 @@ bool HTTPServer::registerServlet(const std::string& path, HTTPServlet* servlet) servletURL << "http://" << INSTANCE->_address << ":" << INSTANCE->_port << "/" << suffixedPath; servlet->setURL(servletURL.str()); - INSTANCE->_servlets[suffixedPath] = servlet; + INSTANCE->_httpServlets[suffixedPath] = servlet; // LOG(INFO) << "HTTP Servlet listening at: " << servletURL.str() << std::endl; @@ -519,21 +541,68 @@ bool HTTPServer::registerServlet(const std::string& path, HTTPServlet* servlet) return true; } -std::string HTTPServer::getBaseURL() { +bool HTTPServer::registerServlet(const std::string& path, WebSocketServlet* servlet) { HTTPServer* INSTANCE = getInstance(); + tthread::lock_guard lock(INSTANCE->_mutex); + + // remove trailing and leading slash + std::string actualPath = path; + if (boost::ends_with(actualPath, "/")) + actualPath = actualPath.substr(0, actualPath.size() - 1); + if (boost::starts_with(actualPath, "/")) + actualPath = actualPath.substr(1); + std::string suffixedPath = actualPath; + + // if this servlet allows to adapt the path, do so + int i = 2; + while(INSTANCE->_wsServlets.find(suffixedPath) != INSTANCE->_wsServlets.end()) { + if (!servlet->canAdaptPath()) + return false; + std::stringstream ss; + ss << actualPath << i++; + suffixedPath = ss.str(); + } + std::stringstream servletURL; - servletURL << "http://" << INSTANCE->_address << ":" << INSTANCE->_port; + servletURL << "ws://" << INSTANCE->_address << ":" << INSTANCE->_wsPort << "/" << suffixedPath; + servlet->setURL(servletURL.str()); + + INSTANCE->_wsServlets[suffixedPath] = servlet; + + // LOG(INFO) << "HTTP Servlet listening at: " << servletURL.str() << std::endl; + + // register callback + evws_set_cb(INSTANCE->_evws, ("/" + suffixedPath).c_str(), HTTPServer::wsRecvReqCallback, NULL, servlet); + + return true; +} + +std::string HTTPServer::getBaseURL(ServerType type) { + HTTPServer* INSTANCE = getInstance(); + std::stringstream servletURL; + + switch (type) { + case HTTP: + servletURL << "http://" << INSTANCE->_address << ":" << INSTANCE->_port; + break; + case HTTPS: + servletURL << "https://" << INSTANCE->_address << ":" << INSTANCE->_sslPort; + break; + case WebSockets: + servletURL << "ws://" << INSTANCE->_address << ":" << INSTANCE->_wsPort; + break; + } return servletURL.str(); } void HTTPServer::unregisterServlet(HTTPServlet* servlet) { HTTPServer* INSTANCE = getInstance(); tthread::lock_guard lock(INSTANCE->_mutex); - servlet_iter_t servletIter = INSTANCE->_servlets.begin(); - while(servletIter != INSTANCE->_servlets.end()) { + http_servlet_iter_t servletIter = INSTANCE->_httpServlets.begin(); + while(servletIter != INSTANCE->_httpServlets.end()) { if (servletIter->second == servlet) { evhttp_del_cb(INSTANCE->_http, std::string("/" + servletIter->first).c_str()); - INSTANCE->_servlets.erase(servletIter); + INSTANCE->_httpServlets.erase(servletIter); break; } servletIter++; @@ -559,4 +628,84 @@ void HTTPServer::determineAddress() { _address = std::string(hostname); } + +#if (defined EVENT_SSL_FOUND && defined OPENSSL_FOUND && defined OPENSSL_HAS_ELIPTIC_CURVES) + // see https://github.com/ppelleti/https-example/blob/master/https-server.c + struct bufferevent* HTTPServer::sslBufferEventCallback(struct event_base *base, void *arg) { + struct bufferevent* r; + SSL_CTX *ctx = (SSL_CTX *) arg; + r = bufferevent_openssl_socket_new (base, + -1, + SSL_new (ctx), + BUFFEREVENT_SSL_ACCEPTING, + BEV_OPT_CLOSE_ON_FREE); + return r; + } + + + void HTTPServer::sslGeneralBufferEventCallback (struct evhttp_request *req, void *arg) { + struct evbuffer *evb = NULL; + const char *uri = evhttp_request_get_uri (req); + struct evhttp_uri *decoded = NULL; + + /* We only handle POST requests. */ + if (evhttp_request_get_command (req) != EVHTTP_REQ_POST) { + evhttp_send_reply (req, 200, "OK", NULL); + return; + } + + printf ("Got a POST request for <%s>\n", uri); + + /* Decode the URI */ + decoded = evhttp_uri_parse (uri); + if (! decoded) { + printf ("It's not a good URI. Sending BADREQUEST\n"); + evhttp_send_error (req, HTTP_BADREQUEST, 0); + return; + } + + /* Decode the payload */ + struct evkeyvalq kv; + memset (&kv, 0, sizeof (kv)); + struct evbuffer *buf = evhttp_request_get_input_buffer (req); + evbuffer_add (buf, "", 1); /* NUL-terminate the buffer */ + char *payload = (char *) evbuffer_pullup (buf, -1); + if (0 != evhttp_parse_query_str (payload, &kv)) { + printf ("Malformed payload. Sending BADREQUEST\n"); + evhttp_send_error (req, HTTP_BADREQUEST, 0); + return; + } + + /* Determine peer */ + char *peer_addr; + ev_uint16_t peer_port; + struct evhttp_connection *con = evhttp_request_get_connection (req); + evhttp_connection_get_peer (con, &peer_addr, &peer_port); + + /* Extract passcode */ + const char *passcode = evhttp_find_header (&kv, "passcode"); + char response[256]; + evutil_snprintf (response, sizeof (response), + "Hi %s! I %s your passcode.\n", peer_addr, + (0 == strcmp (passcode, "R23") + ? "liked" + : "didn't like")); + evhttp_clear_headers (&kv); /* to free memory held by kv */ + + /* This holds the content we're sending. */ + evb = evbuffer_new (); + + evhttp_add_header (evhttp_request_get_output_headers (req), + "Content-Type", "application/x-yaml"); + evbuffer_add (evb, response, strlen (response)); + + evhttp_send_reply (req, 200, "OK", evb); + + if (decoded) + evhttp_uri_free (decoded); + if (evb) + evbuffer_free (evb); + } +#endif + } \ No newline at end of file diff --git a/src/uscxml/server/HTTPServer.h b/src/uscxml/server/HTTPServer.h index 049abc4..6f3c792 100644 --- a/src/uscxml/server/HTTPServer.h +++ b/src/uscxml/server/HTTPServer.h @@ -28,6 +28,7 @@ extern "C" { #include "event2/util.h" // for evutil_socket_t #include // for evhttp_request +#include } #include "uscxml/Common.h" // for USCXML_API @@ -38,20 +39,28 @@ extern "C" { namespace uscxml { class HTTPServlet; - +class WebSocketServlet; + class USCXML_API HTTPServer { public: class Request : public Event { public: - Request() : curlReq(NULL) {} + Request() : evhttpReq(NULL) {} std::string content; - struct evhttp_request* curlReq; + struct evhttp_request* evhttpReq; operator bool() { - return curlReq != NULL; + return evhttpReq != NULL; } }; + class WSFrame : public Event { + public: + WSFrame() : evwsConn(NULL) {} + std::string content; + struct evws_connection* evwsConn; + }; + class SSLConfig { public: SSLConfig() : port(8443) {} @@ -62,12 +71,12 @@ public: class Reply { public: - Reply(Request req) : status(200), type(req.data.compound["type"].atom), curlReq(req.curlReq) {} + Reply(Request req) : status(200), type(req.data.compound["type"].atom), evhttpReq(req.evhttpReq) {} int status; std::string type; std::map headers; std::string content; - struct evhttp_request* curlReq; + struct evhttp_request* evhttpReq; }; struct CallbackData { @@ -75,13 +84,26 @@ public: evhttp_request* httpReq; }; - static HTTPServer* getInstance(unsigned short port = 8080, SSLConfig* sslConf = NULL); - static std::string getBaseURL(); + enum ServerType { + HTTPS, + HTTP, + WebSockets + }; + + static HTTPServer* getInstance(unsigned short port, unsigned short wsPort, SSLConfig* sslConf = NULL); + static HTTPServer* getInstance() { + return getInstance(0, 0, NULL); + } + + static std::string getBaseURL(ServerType type = HTTP); static void reply(const Reply& reply); static bool registerServlet(const std::string& path, HTTPServlet* servlet); ///< Register a servlet, returns false if path is already taken static void unregisterServlet(HTTPServlet* servlet); + static bool registerServlet(const std::string& path, WebSocketServlet* servlet); ///< Register a servlet, returns false if path is already taken + static void unregisterServlet(WebSocketServlet* servlet); + private: struct comp_strsize_less { bool operator()(std::string const& l, std::string const& r) const { @@ -92,7 +114,7 @@ private: }; }; - HTTPServer(unsigned short port, SSLConfig* sslConf = NULL); + HTTPServer(unsigned short port, unsigned short wsPort, SSLConfig* sslConf = NULL); ~HTTPServer(); void start(); @@ -103,17 +125,27 @@ private: static void replyCallback(evutil_socket_t fd, short what, void *arg); static void httpRecvReqCallback(struct evhttp_request *req, void *callbackData); + static void wsRecvReqCallback(struct evws_connection *conn, struct evws_frame *, void *callbackData); + void processByMatchingServlet(const Request& request); + void processByMatchingServlet(evws_connection* conn, const WSFrame& frame); static std::map mimeTypes; - std::map _servlets; - typedef std::map::iterator servlet_iter_t; + std::map _httpServlets; + typedef std::map::iterator http_servlet_iter_t; + + std::map _wsServlets; + typedef std::map::iterator ws_servlet_iter_t; struct event_base* _base; struct evhttp* _http; - struct evhttp_bound_socket* _handle; - + struct evws* _evws; + + struct evhttp_bound_socket* _httpHandle; + evutil_socket_t _wsHandle; + unsigned short _port; + unsigned short _wsPort; std::string _address; static HTTPServer* _instance; @@ -124,6 +156,7 @@ private: bool _isRunning; friend class HTTPServlet; + friend class WebSocketServlet; #if (defined EVENT_SSL_FOUND && defined OPENSSL_FOUND && defined OPENSSL_HAS_ELIPTIC_CURVES) struct evhttp* _https; @@ -144,6 +177,18 @@ public: } }; +class USCXML_API WebSocketServlet { +public: + virtual bool wsRecvRequest(struct evws_connection *conn, const HTTPServer::WSFrame& frame) = 0; + virtual void setURL(const std::string& url) = 0; /// Called by the server with the actual URL + virtual bool canAdaptPath() { + return true; + } + struct evws* getWSBase() { + return HTTPServer::getInstance()->_evws; + } +}; + } #endif /* end of include guard: HTTPSERVER_H_AIH108EG */ diff --git a/src/uscxml/server/InterpreterServlet.cpp b/src/uscxml/server/InterpreterServlet.cpp index beaa364..8218669 100644 --- a/src/uscxml/server/InterpreterServlet.cpp +++ b/src/uscxml/server/InterpreterServlet.cpp @@ -23,7 +23,7 @@ namespace uscxml { -InterpreterServlet::InterpreterServlet(InterpreterImpl* interpreter) { +InterpreterHTTPServlet::InterpreterHTTPServlet(InterpreterImpl* interpreter) { _interpreter = interpreter; std::stringstream path; @@ -37,23 +37,23 @@ InterpreterServlet::InterpreterServlet(InterpreterImpl* interpreter) { _path = path.str(); } -boost::shared_ptr InterpreterServlet::create(InterpreterImpl* interpreter) { +boost::shared_ptr InterpreterHTTPServlet::create(InterpreterImpl* interpreter) { // we instantiate directly in Interpreter - boost::shared_ptr io = boost::shared_ptr(new InterpreterServlet(interpreter)); + boost::shared_ptr io = boost::shared_ptr(new InterpreterHTTPServlet(interpreter)); return io; } -bool InterpreterServlet::httpRecvRequest(const HTTPServer::Request& req) { +bool InterpreterHTTPServlet::httpRecvRequest(const HTTPServer::Request& req) { tthread::lock_guard lock(_mutex); // evhttp_request_own(req.curlReq); - _requests[toStr((uintptr_t)req.curlReq)] = req; + _requests[toStr((uintptr_t)req.evhttpReq)] = req; Event event = req; event.name = "http." + event.data.compound["type"].atom; - event.origin = toStr((uintptr_t)req.curlReq); + event.origin = toStr((uintptr_t)req.evhttpReq); if (event.data.compound["content"]) { if (event.data.compound["content"].compound.size() > 0) { @@ -74,15 +74,135 @@ bool InterpreterServlet::httpRecvRequest(const HTTPServer::Request& req) { return true; } -Data InterpreterServlet::getDataModelVariables() { +Data InterpreterHTTPServlet::getDataModelVariables() { Data data; assert(_url.length() > 0); data.compound["location"] = Data(_url, Data::VERBATIM); return data; } -void InterpreterServlet::send(const SendRequest& req) { +void InterpreterHTTPServlet::send(const SendRequest& req) { LOG(ERROR) << "send not supported by http iorprocessor, use the fetch element"; } + + +InterpreterWebSocketServlet::InterpreterWebSocketServlet(InterpreterImpl* interpreter) { + _interpreter = interpreter; + + std::stringstream path; + path << _interpreter->getName(); + int i = 2; + while(!HTTPServer::registerServlet(path.str(), this)) { + path.clear(); + path.str(); + path << _interpreter->getName() << i++; + } + _path = path.str(); +} + +boost::shared_ptr InterpreterWebSocketServlet::create(InterpreterImpl* interpreter) { + // we instantiate directly in Interpreter + boost::shared_ptr io = boost::shared_ptr(new InterpreterWebSocketServlet(interpreter)); + return io; +} + +bool InterpreterWebSocketServlet::wsRecvRequest(struct evws_connection *conn, const HTTPServer::WSFrame& frame) { + tthread::lock_guard lock(_mutex); + + // evhttp_request_own(req.curlReq); + + _requests[toStr((uintptr_t)conn)] = conn; + + Event event = frame; + + event.name = "ws." + event.data.compound["type"].atom; + event.origin = toStr((uintptr_t)conn); + + if (event.data.compound["type"].atom.compare("text") == 0 && event.data.compound["content"]) { + if (event.data.compound["content"].compound.size() > 0) { + std::map::iterator compoundIter = event.data.compound["content"].compound.begin(); + while(compoundIter != event.data.compound["content"].compound.end()) { + Data json = Data::fromJSON(compoundIter->second.atom); + if (json) { + compoundIter->second = json; + } + compoundIter++; + } + } + } + + _interpreter->receive(event); + return true; +} + +Data InterpreterWebSocketServlet::getDataModelVariables() { + Data data; + assert(_url.length() > 0); + data.compound["location"] = Data(_url, Data::VERBATIM); + return data; +} + +void InterpreterWebSocketServlet::send(const SendRequest& req) { + + if (!req.data) { + LOG(WARNING) << "No content given to send on websocket!"; + return; + } + + if (_requests.find(req.target) != _requests.end()) { + // send data to the given connection + if (false) { + } else if (req.data.binary) { + evws_send_data(_requests[req.target], + EVWS_BINARY_FRAME, + req.data.binary->data, + req.data.binary->size); + } else if (req.data.node) { + std::stringstream ssXML; + ssXML << req.data.node; + std::string data = ssXML.str(); + evws_send_data(_requests[req.target], + EVWS_TEXT_FRAME, + data.c_str(), + data.length()); + } else if (req.data) { + std::string data = Data::toJSON(req.data); + evws_send_data(_requests[req.target], + EVWS_TEXT_FRAME, + data.c_str(), + data.length()); + } else { + LOG(WARNING) << "Not sure what to make off content given to send on websocket!"; + } + } else if(req.target.size() && req.target.compare(0, 1, "/") == 0) { + // broadcast to the given path + if (false) { + } else if (req.data.binary) { + evws_broadcast(getWSBase(), req.target.c_str(), + EVWS_BINARY_FRAME, + req.data.binary->data, + req.data.binary->size); + } else if (req.data.node) { + std::stringstream ssXML; + ssXML << req.data.node; + std::string data = ssXML.str(); + evws_broadcast(getWSBase(), req.target.c_str(), + EVWS_TEXT_FRAME, + data.c_str(), + data.length()); + } else if (req.data) { + std::string data = Data::toJSON(req.data); + evws_broadcast(getWSBase(), req.target.c_str(), + EVWS_TEXT_FRAME, + data.c_str(), + data.length()); + } else { + LOG(WARNING) << "Not sure what to make off content given to broadcast on websocket!"; + } + } else { + LOG(WARNING) << "Invalid target for websocket"; + } +} + } \ No newline at end of file diff --git a/src/uscxml/server/InterpreterServlet.h b/src/uscxml/server/InterpreterServlet.h index 55b97bd..46cc737 100644 --- a/src/uscxml/server/InterpreterServlet.h +++ b/src/uscxml/server/InterpreterServlet.h @@ -27,11 +27,11 @@ namespace uscxml { class Interpreter; -class InterpreterServlet : public HTTPServlet, public IOProcessorImpl { +class InterpreterHTTPServlet : public HTTPServlet, public IOProcessorImpl { public: - InterpreterServlet() {}; - InterpreterServlet(InterpreterImpl* interpreter); - virtual ~InterpreterServlet() {} + InterpreterHTTPServlet() {}; + InterpreterHTTPServlet(InterpreterImpl* interpreter); + virtual ~InterpreterHTTPServlet() {} virtual boost::shared_ptr create(InterpreterImpl* interpreter); @@ -78,6 +78,56 @@ protected: }; +class InterpreterWebSocketServlet : public WebSocketServlet, public IOProcessorImpl { +public: + InterpreterWebSocketServlet() {}; + InterpreterWebSocketServlet(InterpreterImpl* interpreter); + virtual ~InterpreterWebSocketServlet() {} + + virtual boost::shared_ptr create(InterpreterImpl* interpreter); + + virtual std::set getNames() { + std::set names; + names.insert("websocket"); + names.insert("http://www.w3.org/TR/scxml/#WebSocketEventProcessor"); + return names; + } + + Data getDataModelVariables(); + virtual void send(const SendRequest& req); + + virtual bool wsRecvRequest(struct evws_connection *conn, const HTTPServer::WSFrame& frame); + + std::string getPath() { + return _path; + } + std::string getURL() { + return _url; + } + void setURL(const std::string& url) { + _url = url; + } + bool canAdaptPath() { + return false; + } + + std::map& getRequests() { + return _requests; + } + tthread::recursive_mutex& getMutex() { + return _mutex; + } + +protected: + InterpreterImpl* _interpreter; + + tthread::recursive_mutex _mutex; + std::map _requests; + std::string _path; + std::string _url; + +}; + } diff --git a/src/uscxml/server/WebSocketServer.cpp b/src/uscxml/server/WebSocketServer.cpp deleted file mode 100644 index e69de29..0000000 diff --git a/src/uscxml/server/WebSocketServer.h b/src/uscxml/server/WebSocketServer.h deleted file mode 100644 index 438b932..0000000 --- a/src/uscxml/server/WebSocketServer.h +++ /dev/null @@ -1,8 +0,0 @@ -#ifndef WEBSOCKETSERVER_H_FG7908O3 -#define WEBSOCKETSERVER_H_FG7908O3 - -namespace uscxml { - -} - -#endif /* end of include guard: WEBSOCKETSERVER_H_FG7908O3 */ -- cgit v0.12