summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorStefan Radomski <radomski@tk.informatik.tu-darmstadt.de>2013-11-16 00:44:33 (GMT)
committerStefan Radomski <radomski@tk.informatik.tu-darmstadt.de>2013-11-16 00:44:33 (GMT)
commit69d34f5598974dddeab14e0fe9169108362622ec (patch)
tree17fbdbfc28dd11bc8852b697a262d63b9c76df38 /src
parentf33e8148fc6e214886a8b24fc9d743ee25da7d61 (diff)
downloaduscxml-69d34f5598974dddeab14e0fe9169108362622ec.zip
uscxml-69d34f5598974dddeab14e0fe9169108362622ec.tar.gz
uscxml-69d34f5598974dddeab14e0fe9169108362622ec.tar.bz2
WebSockets!
Diffstat (limited to 'src')
-rw-r--r--src/uscxml/CMakeLists.txt7
-rw-r--r--src/uscxml/Factory.cpp6
-rw-r--r--src/uscxml/Interpreter.cpp14
-rw-r--r--src/uscxml/Interpreter.h15
-rw-r--r--src/uscxml/Message.h8
-rw-r--r--src/uscxml/plugins/element/respond/RespondElement.cpp4
-rw-r--r--src/uscxml/server/HTTPServer.cpp353
-rw-r--r--src/uscxml/server/HTTPServer.h71
-rw-r--r--src/uscxml/server/InterpreterServlet.cpp136
-rw-r--r--src/uscxml/server/InterpreterServlet.h58
-rw-r--r--src/uscxml/server/WebSocketServer.cpp0
-rw-r--r--src/uscxml/server/WebSocketServer.h8
12 files changed, 529 insertions, 151 deletions
diff --git a/src/uscxml/CMakeLists.txt b/src/uscxml/CMakeLists.txt
index b32eb81..7f53d51 100644
--- a/src/uscxml/CMakeLists.txt
+++ b/src/uscxml/CMakeLists.txt
@@ -10,7 +10,9 @@ source_group("Interpreter" FILES ${USCXML_SERVER})
list (APPEND USCXML_FILES ${USCXML_SERVER})
file(GLOB_RECURSE USCXML_UTIL
- util/*.cpp
+ util/*.cpp
+ util/*.hpp
+ util/*.c
util/*.h
)
source_group("Interpreter" FILES ${USCXML_UTIL})
@@ -46,7 +48,8 @@ if(APPLE OR IOS)
endif()
file(GLOB USCXML_CORE
- ${CMAKE_SOURCE_DIR}/contrib/src/jsmn/jsmn.c
+ ${CMAKE_SOURCE_DIR}/contrib/src/jsmn/jsmn.c
+ ${CMAKE_SOURCE_DIR}/contrib/src/evws/*.c
*.cpp
*.h
)
diff --git a/src/uscxml/Factory.cpp b/src/uscxml/Factory.cpp
index c836582..7677fd4 100644
--- a/src/uscxml/Factory.cpp
+++ b/src/uscxml/Factory.cpp
@@ -295,7 +295,11 @@ Factory::Factory() {
registerIOProcessor(ioProcessor);
}
{
- InterpreterServlet* ioProcessor = new InterpreterServlet();
+ InterpreterHTTPServlet* ioProcessor = new InterpreterHTTPServlet();
+ registerIOProcessor(ioProcessor);
+ }
+ {
+ InterpreterWebSocketServlet* ioProcessor = new InterpreterWebSocketServlet();
registerIOProcessor(ioProcessor);
}
{
diff --git a/src/uscxml/Interpreter.cpp b/src/uscxml/Interpreter.cpp
index d14fae4..1314986 100644
--- a/src/uscxml/Interpreter.cpp
+++ b/src/uscxml/Interpreter.cpp
@@ -102,6 +102,7 @@ void InterpreterOptions::printUsageAndExit(const char* progName) {
printf("\t-lN : Set loglevel to N\n");
printf("\t-tN : port for HTTP server\n");
printf("\t-sN : port for HTTPS server\n");
+ printf("\t-wN : port for WebSocket server\n");
printf("\n");
exit(1);
}
@@ -122,6 +123,7 @@ InterpreterOptions InterpreterOptions::fromCmdLine(int argc, char** argv) {
{"dot", no_argument, 0, 'd'},
{"port", required_argument, 0, 't'},
{"ssl-port", required_argument, 0, 's'},
+ {"ws-port", required_argument, 0, 'w'},
{"certificate", required_argument, 0, 'c'},
{"private-key", required_argument, 0, 0},
{"public-key", required_argument, 0, 0},
@@ -143,7 +145,7 @@ InterpreterOptions InterpreterOptions::fromCmdLine(int argc, char** argv) {
int optionInd = 0;
int option;
for (;;) {
- option = getopt_long_only(argc, argv, "+vdt:s:c:p:l:", longOptions, &optionInd);
+ option = getopt_long_only(argc, argv, "+vdt:s:w:c:p:l:", longOptions, &optionInd);
if (option == -1) {
if (optind == argc)
// we are done with parsing
@@ -192,6 +194,9 @@ InterpreterOptions InterpreterOptions::fromCmdLine(int argc, char** argv) {
case 's':
currOptions->httpsPort = strTo<unsigned short>(optarg);
break;
+ case 'w':
+ currOptions->wsPort = strTo<unsigned short>(optarg);
+ break;
case 'v':
currOptions->verbose = true;
break;
@@ -1957,7 +1962,12 @@ void InterpreterImpl::setupIOProcessors() {
if (iequals(ioProcIter->first, "http")) {
// this is somewhat ugly
- _httpServlet = static_cast<InterpreterServlet*>(_ioProcessors[ioProcIter->first]._impl.get());
+ _httpServlet = static_cast<InterpreterHTTPServlet*>(_ioProcessors[ioProcIter->first]._impl.get());
+ }
+
+ if (iequals(ioProcIter->first, "websocket")) {
+ // this is somewhat ugly
+ _wsServlet = static_cast<InterpreterWebSocketServlet*>(_ioProcessors[ioProcIter->first]._impl.get());
}
// register aliases
diff --git a/src/uscxml/Interpreter.h b/src/uscxml/Interpreter.h
index 4bccf5b..f192152 100644
--- a/src/uscxml/Interpreter.h
+++ b/src/uscxml/Interpreter.h
@@ -90,9 +90,11 @@ public:
bool verbose;
bool withHTTP;
bool withHTTPS;
+ bool withWS;
int logLevel;
unsigned short httpPort;
unsigned short httpsPort;
+ unsigned short wsPort;
std::string pluginPath;
std::string certificate;
std::string privateKey;
@@ -116,9 +118,11 @@ protected:
verbose(false),
withHTTP(true),
withHTTPS(true),
+ withWS(true),
logLevel(0),
- httpPort(8080),
- httpsPort(8443)
+ httpPort(0),
+ httpsPort(0),
+ wsPort(0)
{}
};
@@ -166,7 +170,7 @@ public:
return _cmdLineOptions;
}
- InterpreterServlet* getHTTPServlet() {
+ InterpreterHTTPServlet* getHTTPServlet() {
return _httpServlet;
}
@@ -352,7 +356,8 @@ protected:
Event _currEvent;
Factory* _factory;
- InterpreterServlet* _httpServlet;
+ InterpreterHTTPServlet* _httpServlet;
+ InterpreterWebSocketServlet* _wsServlet;
std::set<InterpreterMonitor*> _monitors;
void executeContent(const Arabica::DOM::Node<std::string>& content, bool rethrow = false);
@@ -477,7 +482,7 @@ public:
return _impl->getCmdLineOptions();
}
- InterpreterServlet* getHTTPServlet() {
+ InterpreterHTTPServlet* getHTTPServlet() {
return _impl->getHTTPServlet();
}
diff --git a/src/uscxml/Message.h b/src/uscxml/Message.h
index 44358be..5de9a56 100644
--- a/src/uscxml/Message.h
+++ b/src/uscxml/Message.h
@@ -37,8 +37,8 @@
#include "uscxml/Convenience.h"
-#include "uscxml/util/MD5.h"
-#include "uscxml/util/Base64.h"
+#include "uscxml/util/MD5.hpp"
+#include "uscxml/util/Base64.hpp"
namespace uscxml {
@@ -56,11 +56,11 @@ public:
}
std::string base64() {
- return base64_encode((char* const)data, size);
+ return base64Encode((char* const)data, size);
}
Blob* fromBase64(const std::string base64) {
- std::string decoded = base64_decode(base64);
+ std::string decoded = base64Decode(base64);
return new Blob((void*)decoded.c_str(), decoded.length(), mimeType);
}
};
diff --git a/src/uscxml/plugins/element/respond/RespondElement.cpp b/src/uscxml/plugins/element/respond/RespondElement.cpp
index a88d9c6..f415b19 100644
--- a/src/uscxml/plugins/element/respond/RespondElement.cpp
+++ b/src/uscxml/plugins/element/respond/RespondElement.cpp
@@ -55,7 +55,7 @@ void RespondElement::enterElement(const Arabica::DOM::Node<std::string>& node) {
std::string requestId = _interpreter->getDataModel().evalAsString(ATTR(node, "to"));
// try to get the request object
- InterpreterServlet* servlet = _interpreter->getHTTPServlet();
+ InterpreterHTTPServlet* servlet = _interpreter->getHTTPServlet();
tthread::lock_guard<tthread::recursive_mutex> lock(servlet->getMutex());
if (servlet->getRequests().find(requestId) == servlet->getRequests().end()) {
@@ -65,7 +65,7 @@ void RespondElement::enterElement(const Arabica::DOM::Node<std::string>& node) {
assert(servlet->getRequests().find(requestId) != servlet->getRequests().end());
HTTPServer::Request httpReq = servlet->getRequests()[requestId];
- assert(httpReq.curlReq != NULL);
+ assert(httpReq.evhttpReq != NULL);
HTTPServer::Reply httpReply(httpReq);
servlet->getRequests().erase(requestId);
diff --git a/src/uscxml/server/HTTPServer.cpp b/src/uscxml/server/HTTPServer.cpp
index 6065740..04a831e 100644
--- a/src/uscxml/server/HTTPServer.cpp
+++ b/src/uscxml/server/HTTPServer.cpp
@@ -68,11 +68,16 @@ extern "C" {
namespace uscxml {
-HTTPServer::HTTPServer(unsigned short port, SSLConfig* sslConf) {
+HTTPServer::HTTPServer(unsigned short port, unsigned short wsPort, SSLConfig* sslConf) {
_port = port;
_base = event_base_new();
_http = evhttp_new(_base);
+ _evws = evws_new(_base);
_thread = NULL;
+ _httpHandle = NULL;
+ _wsHandle = NULL;
+
+ determineAddress();
unsigned int allowedMethods =
EVHTTP_REQ_GET |
@@ -86,17 +91,22 @@ HTTPServer::HTTPServer(unsigned short port, SSLConfig* sslConf) {
EVHTTP_REQ_PATCH;
evhttp_set_allowed_methods(_http, allowedMethods); // allow all methods
-
- _handle = NULL;
- while((_handle = evhttp_bind_socket_with_handle(_http, INADDR_ANY, _port)) == NULL) {
- _port++;
- }
- determineAddress();
+
+ if (_port > 0)
+ _httpHandle = evhttp_bind_socket_with_handle(_http, INADDR_ANY, _port);
+ if (_httpHandle)
+ LOG(INFO) << "HTTP server listening on tcp/" << _port;
+
+ _wsPort = wsPort;
+ if (_wsPort > 0)
+ _wsHandle = evws_bind_socket(_evws, _wsPort);
+ if (_wsHandle)
+ LOG(INFO) << "WebSocket server listening on tcp/" << _wsPort;
#if (defined EVENT_SSL_FOUND && defined OPENSSL_FOUND && defined OPENSSL_HAS_ELIPTIC_CURVES)
+ _sslHandle = NULL;
+ _https = NULL;
if (!sslConf) {
- _https = NULL;
- _sslHandle = NULL;
_sslPort = 0;
} else {
_sslPort = sslConf->port;
@@ -126,10 +136,11 @@ HTTPServer::HTTPServer(unsigned short port, SSLConfig* sslConf) {
evhttp_set_bevcb(_https, sslBufferEventCallback, ctx);
evhttp_set_gencb(_https, sslGeneralBufferEventCallback, NULL);
- _sslHandle = NULL;
- while((_sslHandle = evhttp_bind_socket_with_handle(_https, INADDR_ANY, _sslPort)) == NULL) {
- _sslPort++;
- }
+ if (_sslPort > 0)
+ _sslHandle = evhttp_bind_socket_with_handle(_https, INADDR_ANY, _sslPort);
+ if (_sslHandle)
+ LOG(INFO) << "HTTPS server listening on tcp/" << _wsPort;
+
}
#endif
@@ -137,6 +148,7 @@ HTTPServer::HTTPServer(unsigned short port, SSLConfig* sslConf) {
// generic callback
evhttp_set_gencb(_http, HTTPServer::httpRecvReqCallback, NULL);
+ evws_set_gencb(_evws, HTTPServer::wsRecvReqCallback, NULL);
}
HTTPServer::~HTTPServer() {
@@ -148,7 +160,7 @@ HTTPServer::~HTTPServer() {
HTTPServer* HTTPServer::_instance = NULL;
tthread::recursive_mutex HTTPServer::_instanceMutex;
-HTTPServer* HTTPServer::getInstance(unsigned short port, SSLConfig* sslConf) {
+HTTPServer* HTTPServer::getInstance(unsigned short port, unsigned short wsPort, SSLConfig* sslConf) {
// tthread::lock_guard<tthread::recursive_mutex> lock(_instanceMutex);
if (_instance == NULL) {
#ifdef _WIN32
@@ -160,90 +172,60 @@ HTTPServer* HTTPServer::getInstance(unsigned short port, SSLConfig* sslConf) {
#else
evthread_use_windows_threads();
#endif
- _instance = new HTTPServer(port, sslConf);
+ _instance = new HTTPServer(port, wsPort, sslConf);
_instance->start();
}
return _instance;
}
-#if (defined EVENT_SSL_FOUND && defined OPENSSL_FOUND && defined OPENSSL_HAS_ELIPTIC_CURVES)
-// see https://github.com/ppelleti/https-example/blob/master/https-server.c
-struct bufferevent* HTTPServer::sslBufferEventCallback(struct event_base *base, void *arg) {
- struct bufferevent* r;
- SSL_CTX *ctx = (SSL_CTX *) arg;
- r = bufferevent_openssl_socket_new (base,
- -1,
- SSL_new (ctx),
- BUFFEREVENT_SSL_ACCEPTING,
- BEV_OPT_CLOSE_ON_FREE);
- return r;
-}
-
-
-void HTTPServer::sslGeneralBufferEventCallback (struct evhttp_request *req, void *arg) {
- struct evbuffer *evb = NULL;
- const char *uri = evhttp_request_get_uri (req);
- struct evhttp_uri *decoded = NULL;
+/**
+ * This callback is registered for all data received on websockets
+ */
+void HTTPServer::wsRecvReqCallback(struct evws_connection *conn, struct evws_frame *frame, void *callbackData) {
+ WSFrame wsFrame;
+ wsFrame.evwsConn = conn;
- /* We only handle POST requests. */
- if (evhttp_request_get_command (req) != EVHTTP_REQ_POST) {
- evhttp_send_reply (req, 200, "OK", NULL);
- return;
+ struct evws_header *header;
+ TAILQ_FOREACH(header, &conn->headers, next) {
+ wsFrame.data.compound["header"].compound[header->key] = Data(header->value, Data::VERBATIM);
}
- printf ("Got a POST request for <%s>\n", uri);
-
- /* Decode the URI */
- decoded = evhttp_uri_parse (uri);
- if (! decoded) {
- printf ("It's not a good URI. Sending BADREQUEST\n");
- evhttp_send_error (req, HTTP_BADREQUEST, 0);
- return;
+ switch (frame->opcode) {
+ case EVWS_CONTINUATION_FRAME:
+ wsFrame.data.compound["type"] = Data("continuation", Data::VERBATIM);
+ wsFrame.data.compound["content"] = Data(std::string(frame->data, frame->size), Data::VERBATIM);
+ break;
+ case EVWS_TEXT_FRAME:
+ wsFrame.data.compound["type"] = Data("text", Data::VERBATIM);
+ wsFrame.data.compound["content"] = Data(std::string(frame->data, frame->size), Data::VERBATIM);
+ break;
+ case EVWS_BINARY_FRAME:
+ wsFrame.data.compound["type"] = Data("binary", Data::VERBATIM);
+ wsFrame.data.compound["content"] = Data(frame->data, frame->size, "application/octet-stream");
+ break;
+ case EVWS_CONNECTION_CLOSE:
+ wsFrame.data.compound["type"] = Data("close", Data::VERBATIM);
+ break;
+ case EVWS_PING:
+ wsFrame.data.compound["type"] = Data("ping", Data::VERBATIM);
+ break;
+ case EVWS_PONG:
+ wsFrame.data.compound["type"] = Data("ping", Data::VERBATIM);
+ break;
}
- /* Decode the payload */
- struct evkeyvalq kv;
- memset (&kv, 0, sizeof (kv));
- struct evbuffer *buf = evhttp_request_get_input_buffer (req);
- evbuffer_add (buf, "", 1); /* NUL-terminate the buffer */
- char *payload = (char *) evbuffer_pullup (buf, -1);
- if (0 != evhttp_parse_query_str (payload, &kv)) {
- printf ("Malformed payload. Sending BADREQUEST\n");
- evhttp_send_error (req, HTTP_BADREQUEST, 0);
- return;
- }
+ wsFrame.data.compound["uri"] = Data(HTTPServer::getBaseURL(WebSockets) + conn->uri, Data::VERBATIM);
+ wsFrame.data.compound["path"] = Data(conn->uri, Data::VERBATIM);
+
+ // try with the handler registered for path first
+ bool answered = false;
+ if (callbackData != NULL)
+ answered = ((WebSocketServlet*)callbackData)->wsRecvRequest(conn, wsFrame);
+
+ if (!answered)
+ HTTPServer::getInstance()->processByMatchingServlet(conn, wsFrame);
- /* Determine peer */
- char *peer_addr;
- ev_uint16_t peer_port;
- struct evhttp_connection *con = evhttp_request_get_connection (req);
- evhttp_connection_get_peer (con, &peer_addr, &peer_port);
-
- /* Extract passcode */
- const char *passcode = evhttp_find_header (&kv, "passcode");
- char response[256];
- evutil_snprintf (response, sizeof (response),
- "Hi %s! I %s your passcode.\n", peer_addr,
- (0 == strcmp (passcode, "R23")
- ? "liked"
- : "didn't like"));
- evhttp_clear_headers (&kv); /* to free memory held by kv */
-
- /* This holds the content we're sending. */
- evb = evbuffer_new ();
-
- evhttp_add_header (evhttp_request_get_output_headers (req),
- "Content-Type", "application/x-yaml");
- evbuffer_add (evb, response, strlen (response));
-
- evhttp_send_reply (req, 200, "OK", evb);
-
- if (decoded)
- evhttp_uri_free (decoded);
- if (evb)
- evbuffer_free (evb);
}
-#endif
/**
* This callback is registered for all HTTP requests
@@ -251,9 +233,23 @@ void HTTPServer::sslGeneralBufferEventCallback (struct evhttp_request *req, void
void HTTPServer::httpRecvReqCallback(struct evhttp_request *req, void *callbackData) {
std::stringstream raw;
+ struct evkeyvalq *headers;
+ headers = evhttp_request_get_input_headers(req);
+
+#if 0
+ // first of all, see whether this is a websocket request
+ const char* wsUpgrade = evhttp_find_header(headers, "Upgrade");
+ const char* wsConnection = evhttp_find_header(headers, "Connection");
+ if (wsUpgrade && wsConnection) {
+ if (iequals(wsUpgrade, "websocket") && iequals(wsConnection, "Upgrade")) {
+ // this is a websocket request! .. but we do not know how to decouple form evhttp
+ }
+ }
+#endif
+
evhttp_request_own(req);
Request request;
- request.curlReq = req;
+ request.evhttpReq = req;
switch (evhttp_request_get_command(req)) {
case EVHTTP_REQ_GET:
@@ -307,12 +303,10 @@ void HTTPServer::httpRecvReqCallback(struct evhttp_request *req, void *callbackD
raw << " HTTP/" << request.data.compound["httpMajor"].atom << "." << request.data.compound["httpMinor"].atom;
raw << std::endl;
- struct evkeyvalq *headers;
struct evkeyval *header;
struct evbuffer *buf;
// insert headers to event data
- headers = evhttp_request_get_input_headers(req);
for (header = headers->tqh_first; header; header = header->next.tqe_next) {
request.data.compound["header"].compound[header->key] = Data(header->value, Data::VERBATIM);
raw << header->key << ": " << header->value << std::endl;
@@ -415,12 +409,12 @@ void HTTPServer::httpRecvReqCallback(struct evhttp_request *req, void *callbackD
void HTTPServer::processByMatchingServlet(const Request& request) {
tthread::lock_guard<tthread::recursive_mutex> lock(_mutex);
- servlet_iter_t servletIter = _servlets.begin();
+ http_servlet_iter_t servletIter = _httpServlets.begin();
std::string actualPath = request.data.compound.at("path").atom;
std::map<std::string, HTTPServlet*, comp_strsize_less> matches;
- while(servletIter != _servlets.end()) {
+ while(servletIter != _httpServlets.end()) {
// is the servlet path a prefix of the actual path?
std::string servletPath = "/" + servletIter->first;
if (iequals(actualPath.substr(0, servletPath.length()), servletPath) && // servlet path is a prefix
@@ -440,7 +434,35 @@ void HTTPServer::processByMatchingServlet(const Request& request) {
}
LOG(INFO) << "Got an HTTP request at " << actualPath << " but no servlet is registered there or at a prefix";
- evhttp_send_error(request.curlReq, 404, NULL);
+ evhttp_send_error(request.evhttpReq, 404, NULL);
+}
+
+void HTTPServer::processByMatchingServlet(evws_connection* conn, const WSFrame& frame) {
+ tthread::lock_guard<tthread::recursive_mutex> lock(_mutex);
+
+ ws_servlet_iter_t servletIter = _wsServlets.begin();
+
+ std::string actualPath = frame.data.compound.at("path").atom;
+ std::map<std::string, WebSocketServlet*, comp_strsize_less> matches;
+
+ while(servletIter != _wsServlets.end()) {
+ // is the servlet path a prefix of the actual path?
+ std::string servletPath = "/" + servletIter->first;
+ if (iequals(actualPath.substr(0, servletPath.length()), servletPath) && // servlet path is a prefix
+ iequals(actualPath.substr(servletPath.length(), 1), "/")) { // and next character is a '/'
+ matches.insert(std::make_pair(servletPath, servletIter->second));
+ }
+ servletIter++;
+ }
+
+ // process by best matching servlet until someone feels responsible
+ std::map<std::string, WebSocketServlet*, comp_strsize_less>::iterator matchesIter = matches.begin();
+ while(matchesIter != matches.end()) {
+ if (matchesIter->second->wsRecvRequest(conn, frame)) {
+ return;
+ }
+ matchesIter++;
+ }
}
void HTTPServer::reply(const Reply& reply) {
@@ -459,12 +481,12 @@ void HTTPServer::replyCallback(evutil_socket_t fd, short what, void *arg) {
std::map<std::string, std::string>::const_iterator headerIter = reply->headers.begin();
while(headerIter != reply->headers.end()) {
- evhttp_add_header(evhttp_request_get_output_headers(reply->curlReq), headerIter->first.c_str(), headerIter->second.c_str());
+ evhttp_add_header(evhttp_request_get_output_headers(reply->evhttpReq), headerIter->first.c_str(), headerIter->second.c_str());
headerIter++;
}
if (reply->status >= 400) {
- evhttp_send_error(reply->curlReq, reply->status, NULL);
+ evhttp_send_error(reply->evhttpReq, reply->status, NULL);
return;
}
@@ -475,7 +497,7 @@ void HTTPServer::replyCallback(evutil_socket_t fd, short what, void *arg) {
evbuffer_add(evb, reply->content.data(), reply->content.size());
}
- evhttp_send_reply(reply->curlReq, reply->status, NULL, evb);
+ evhttp_send_reply(reply->evhttpReq, reply->status, NULL, evb);
if (evb != NULL)
evbuffer_free(evb);
@@ -497,7 +519,7 @@ bool HTTPServer::registerServlet(const std::string& path, HTTPServlet* servlet)
// if this servlet allows to adapt the path, do so
int i = 2;
- while(INSTANCE->_servlets.find(suffixedPath) != INSTANCE->_servlets.end()) {
+ while(INSTANCE->_httpServlets.find(suffixedPath) != INSTANCE->_httpServlets.end()) {
if (!servlet->canAdaptPath())
return false;
std::stringstream ss;
@@ -509,7 +531,7 @@ bool HTTPServer::registerServlet(const std::string& path, HTTPServlet* servlet)
servletURL << "http://" << INSTANCE->_address << ":" << INSTANCE->_port << "/" << suffixedPath;
servlet->setURL(servletURL.str());
- INSTANCE->_servlets[suffixedPath] = servlet;
+ INSTANCE->_httpServlets[suffixedPath] = servlet;
// LOG(INFO) << "HTTP Servlet listening at: " << servletURL.str() << std::endl;
@@ -519,21 +541,68 @@ bool HTTPServer::registerServlet(const std::string& path, HTTPServlet* servlet)
return true;
}
-std::string HTTPServer::getBaseURL() {
+bool HTTPServer::registerServlet(const std::string& path, WebSocketServlet* servlet) {
HTTPServer* INSTANCE = getInstance();
+ tthread::lock_guard<tthread::recursive_mutex> lock(INSTANCE->_mutex);
+
+ // remove trailing and leading slash
+ std::string actualPath = path;
+ if (boost::ends_with(actualPath, "/"))
+ actualPath = actualPath.substr(0, actualPath.size() - 1);
+ if (boost::starts_with(actualPath, "/"))
+ actualPath = actualPath.substr(1);
+ std::string suffixedPath = actualPath;
+
+ // if this servlet allows to adapt the path, do so
+ int i = 2;
+ while(INSTANCE->_wsServlets.find(suffixedPath) != INSTANCE->_wsServlets.end()) {
+ if (!servlet->canAdaptPath())
+ return false;
+ std::stringstream ss;
+ ss << actualPath << i++;
+ suffixedPath = ss.str();
+ }
+
std::stringstream servletURL;
- servletURL << "http://" << INSTANCE->_address << ":" << INSTANCE->_port;
+ servletURL << "ws://" << INSTANCE->_address << ":" << INSTANCE->_wsPort << "/" << suffixedPath;
+ servlet->setURL(servletURL.str());
+
+ INSTANCE->_wsServlets[suffixedPath] = servlet;
+
+ // LOG(INFO) << "HTTP Servlet listening at: " << servletURL.str() << std::endl;
+
+ // register callback
+ evws_set_cb(INSTANCE->_evws, ("/" + suffixedPath).c_str(), HTTPServer::wsRecvReqCallback, NULL, servlet);
+
+ return true;
+}
+
+std::string HTTPServer::getBaseURL(ServerType type) {
+ HTTPServer* INSTANCE = getInstance();
+ std::stringstream servletURL;
+
+ switch (type) {
+ case HTTP:
+ servletURL << "http://" << INSTANCE->_address << ":" << INSTANCE->_port;
+ break;
+ case HTTPS:
+ servletURL << "https://" << INSTANCE->_address << ":" << INSTANCE->_sslPort;
+ break;
+ case WebSockets:
+ servletURL << "ws://" << INSTANCE->_address << ":" << INSTANCE->_wsPort;
+ break;
+ }
return servletURL.str();
}
void HTTPServer::unregisterServlet(HTTPServlet* servlet) {
HTTPServer* INSTANCE = getInstance();
tthread::lock_guard<tthread::recursive_mutex> lock(INSTANCE->_mutex);
- servlet_iter_t servletIter = INSTANCE->_servlets.begin();
- while(servletIter != INSTANCE->_servlets.end()) {
+ http_servlet_iter_t servletIter = INSTANCE->_httpServlets.begin();
+ while(servletIter != INSTANCE->_httpServlets.end()) {
if (servletIter->second == servlet) {
evhttp_del_cb(INSTANCE->_http, std::string("/" + servletIter->first).c_str());
- INSTANCE->_servlets.erase(servletIter);
+ INSTANCE->_httpServlets.erase(servletIter);
break;
}
servletIter++;
@@ -559,4 +628,84 @@ void HTTPServer::determineAddress() {
_address = std::string(hostname);
}
+
+#if (defined EVENT_SSL_FOUND && defined OPENSSL_FOUND && defined OPENSSL_HAS_ELIPTIC_CURVES)
+ // see https://github.com/ppelleti/https-example/blob/master/https-server.c
+ struct bufferevent* HTTPServer::sslBufferEventCallback(struct event_base *base, void *arg) {
+ struct bufferevent* r;
+ SSL_CTX *ctx = (SSL_CTX *) arg;
+ r = bufferevent_openssl_socket_new (base,
+ -1,
+ SSL_new (ctx),
+ BUFFEREVENT_SSL_ACCEPTING,
+ BEV_OPT_CLOSE_ON_FREE);
+ return r;
+ }
+
+
+ void HTTPServer::sslGeneralBufferEventCallback (struct evhttp_request *req, void *arg) {
+ struct evbuffer *evb = NULL;
+ const char *uri = evhttp_request_get_uri (req);
+ struct evhttp_uri *decoded = NULL;
+
+ /* We only handle POST requests. */
+ if (evhttp_request_get_command (req) != EVHTTP_REQ_POST) {
+ evhttp_send_reply (req, 200, "OK", NULL);
+ return;
+ }
+
+ printf ("Got a POST request for <%s>\n", uri);
+
+ /* Decode the URI */
+ decoded = evhttp_uri_parse (uri);
+ if (! decoded) {
+ printf ("It's not a good URI. Sending BADREQUEST\n");
+ evhttp_send_error (req, HTTP_BADREQUEST, 0);
+ return;
+ }
+
+ /* Decode the payload */
+ struct evkeyvalq kv;
+ memset (&kv, 0, sizeof (kv));
+ struct evbuffer *buf = evhttp_request_get_input_buffer (req);
+ evbuffer_add (buf, "", 1); /* NUL-terminate the buffer */
+ char *payload = (char *) evbuffer_pullup (buf, -1);
+ if (0 != evhttp_parse_query_str (payload, &kv)) {
+ printf ("Malformed payload. Sending BADREQUEST\n");
+ evhttp_send_error (req, HTTP_BADREQUEST, 0);
+ return;
+ }
+
+ /* Determine peer */
+ char *peer_addr;
+ ev_uint16_t peer_port;
+ struct evhttp_connection *con = evhttp_request_get_connection (req);
+ evhttp_connection_get_peer (con, &peer_addr, &peer_port);
+
+ /* Extract passcode */
+ const char *passcode = evhttp_find_header (&kv, "passcode");
+ char response[256];
+ evutil_snprintf (response, sizeof (response),
+ "Hi %s! I %s your passcode.\n", peer_addr,
+ (0 == strcmp (passcode, "R23")
+ ? "liked"
+ : "didn't like"));
+ evhttp_clear_headers (&kv); /* to free memory held by kv */
+
+ /* This holds the content we're sending. */
+ evb = evbuffer_new ();
+
+ evhttp_add_header (evhttp_request_get_output_headers (req),
+ "Content-Type", "application/x-yaml");
+ evbuffer_add (evb, response, strlen (response));
+
+ evhttp_send_reply (req, 200, "OK", evb);
+
+ if (decoded)
+ evhttp_uri_free (decoded);
+ if (evb)
+ evbuffer_free (evb);
+ }
+#endif
+
} \ No newline at end of file
diff --git a/src/uscxml/server/HTTPServer.h b/src/uscxml/server/HTTPServer.h
index 049abc4..6f3c792 100644
--- a/src/uscxml/server/HTTPServer.h
+++ b/src/uscxml/server/HTTPServer.h
@@ -28,6 +28,7 @@
extern "C" {
#include "event2/util.h" // for evutil_socket_t
#include <event2/http.h> // for evhttp_request
+#include <evws.h>
}
#include "uscxml/Common.h" // for USCXML_API
@@ -38,20 +39,28 @@ extern "C" {
namespace uscxml {
class HTTPServlet;
-
+class WebSocketServlet;
+
class USCXML_API HTTPServer {
public:
class Request : public Event {
public:
- Request() : curlReq(NULL) {}
+ Request() : evhttpReq(NULL) {}
std::string content;
- struct evhttp_request* curlReq;
+ struct evhttp_request* evhttpReq;
operator bool() {
- return curlReq != NULL;
+ return evhttpReq != NULL;
}
};
+ class WSFrame : public Event {
+ public:
+ WSFrame() : evwsConn(NULL) {}
+ std::string content;
+ struct evws_connection* evwsConn;
+ };
+
class SSLConfig {
public:
SSLConfig() : port(8443) {}
@@ -62,12 +71,12 @@ public:
class Reply {
public:
- Reply(Request req) : status(200), type(req.data.compound["type"].atom), curlReq(req.curlReq) {}
+ Reply(Request req) : status(200), type(req.data.compound["type"].atom), evhttpReq(req.evhttpReq) {}
int status;
std::string type;
std::map<std::string, std::string> headers;
std::string content;
- struct evhttp_request* curlReq;
+ struct evhttp_request* evhttpReq;
};
struct CallbackData {
@@ -75,13 +84,26 @@ public:
evhttp_request* httpReq;
};
- static HTTPServer* getInstance(unsigned short port = 8080, SSLConfig* sslConf = NULL);
- static std::string getBaseURL();
+ enum ServerType {
+ HTTPS,
+ HTTP,
+ WebSockets
+ };
+
+ static HTTPServer* getInstance(unsigned short port, unsigned short wsPort, SSLConfig* sslConf = NULL);
+ static HTTPServer* getInstance() {
+ return getInstance(0, 0, NULL);
+ }
+
+ static std::string getBaseURL(ServerType type = HTTP);
static void reply(const Reply& reply);
static bool registerServlet(const std::string& path, HTTPServlet* servlet); ///< Register a servlet, returns false if path is already taken
static void unregisterServlet(HTTPServlet* servlet);
+ static bool registerServlet(const std::string& path, WebSocketServlet* servlet); ///< Register a servlet, returns false if path is already taken
+ static void unregisterServlet(WebSocketServlet* servlet);
+
private:
struct comp_strsize_less {
bool operator()(std::string const& l, std::string const& r) const {
@@ -92,7 +114,7 @@ private:
};
};
- HTTPServer(unsigned short port, SSLConfig* sslConf = NULL);
+ HTTPServer(unsigned short port, unsigned short wsPort, SSLConfig* sslConf = NULL);
~HTTPServer();
void start();
@@ -103,17 +125,27 @@ private:
static void replyCallback(evutil_socket_t fd, short what, void *arg);
static void httpRecvReqCallback(struct evhttp_request *req, void *callbackData);
+ static void wsRecvReqCallback(struct evws_connection *conn, struct evws_frame *, void *callbackData);
+
void processByMatchingServlet(const Request& request);
+ void processByMatchingServlet(evws_connection* conn, const WSFrame& frame);
static std::map<std::string, std::string> mimeTypes;
- std::map<std::string, HTTPServlet*> _servlets;
- typedef std::map<std::string, HTTPServlet*>::iterator servlet_iter_t;
+ std::map<std::string, HTTPServlet*> _httpServlets;
+ typedef std::map<std::string, HTTPServlet*>::iterator http_servlet_iter_t;
+
+ std::map<std::string, WebSocketServlet*> _wsServlets;
+ typedef std::map<std::string, WebSocketServlet*>::iterator ws_servlet_iter_t;
struct event_base* _base;
struct evhttp* _http;
- struct evhttp_bound_socket* _handle;
-
+ struct evws* _evws;
+
+ struct evhttp_bound_socket* _httpHandle;
+ evutil_socket_t _wsHandle;
+
unsigned short _port;
+ unsigned short _wsPort;
std::string _address;
static HTTPServer* _instance;
@@ -124,6 +156,7 @@ private:
bool _isRunning;
friend class HTTPServlet;
+ friend class WebSocketServlet;
#if (defined EVENT_SSL_FOUND && defined OPENSSL_FOUND && defined OPENSSL_HAS_ELIPTIC_CURVES)
struct evhttp* _https;
@@ -144,6 +177,18 @@ public:
}
};
+class USCXML_API WebSocketServlet {
+public:
+ virtual bool wsRecvRequest(struct evws_connection *conn, const HTTPServer::WSFrame& frame) = 0;
+ virtual void setURL(const std::string& url) = 0; /// Called by the server with the actual URL
+ virtual bool canAdaptPath() {
+ return true;
+ }
+ struct evws* getWSBase() {
+ return HTTPServer::getInstance()->_evws;
+ }
+};
+
}
#endif /* end of include guard: HTTPSERVER_H_AIH108EG */
diff --git a/src/uscxml/server/InterpreterServlet.cpp b/src/uscxml/server/InterpreterServlet.cpp
index beaa364..8218669 100644
--- a/src/uscxml/server/InterpreterServlet.cpp
+++ b/src/uscxml/server/InterpreterServlet.cpp
@@ -23,7 +23,7 @@
namespace uscxml {
-InterpreterServlet::InterpreterServlet(InterpreterImpl* interpreter) {
+InterpreterHTTPServlet::InterpreterHTTPServlet(InterpreterImpl* interpreter) {
_interpreter = interpreter;
std::stringstream path;
@@ -37,23 +37,23 @@ InterpreterServlet::InterpreterServlet(InterpreterImpl* interpreter) {
_path = path.str();
}
-boost::shared_ptr<IOProcessorImpl> InterpreterServlet::create(InterpreterImpl* interpreter) {
+boost::shared_ptr<IOProcessorImpl> InterpreterHTTPServlet::create(InterpreterImpl* interpreter) {
// we instantiate directly in Interpreter
- boost::shared_ptr<IOProcessorImpl> io = boost::shared_ptr<InterpreterServlet>(new InterpreterServlet(interpreter));
+ boost::shared_ptr<IOProcessorImpl> io = boost::shared_ptr<InterpreterHTTPServlet>(new InterpreterHTTPServlet(interpreter));
return io;
}
-bool InterpreterServlet::httpRecvRequest(const HTTPServer::Request& req) {
+bool InterpreterHTTPServlet::httpRecvRequest(const HTTPServer::Request& req) {
tthread::lock_guard<tthread::recursive_mutex> lock(_mutex);
// evhttp_request_own(req.curlReq);
- _requests[toStr((uintptr_t)req.curlReq)] = req;
+ _requests[toStr((uintptr_t)req.evhttpReq)] = req;
Event event = req;
event.name = "http." + event.data.compound["type"].atom;
- event.origin = toStr((uintptr_t)req.curlReq);
+ event.origin = toStr((uintptr_t)req.evhttpReq);
if (event.data.compound["content"]) {
if (event.data.compound["content"].compound.size() > 0) {
@@ -74,15 +74,135 @@ bool InterpreterServlet::httpRecvRequest(const HTTPServer::Request& req) {
return true;
}
-Data InterpreterServlet::getDataModelVariables() {
+Data InterpreterHTTPServlet::getDataModelVariables() {
Data data;
assert(_url.length() > 0);
data.compound["location"] = Data(_url, Data::VERBATIM);
return data;
}
-void InterpreterServlet::send(const SendRequest& req) {
+void InterpreterHTTPServlet::send(const SendRequest& req) {
LOG(ERROR) << "send not supported by http iorprocessor, use the fetch element";
}
+
+
+InterpreterWebSocketServlet::InterpreterWebSocketServlet(InterpreterImpl* interpreter) {
+ _interpreter = interpreter;
+
+ std::stringstream path;
+ path << _interpreter->getName();
+ int i = 2;
+ while(!HTTPServer::registerServlet(path.str(), this)) {
+ path.clear();
+ path.str();
+ path << _interpreter->getName() << i++;
+ }
+ _path = path.str();
+}
+
+boost::shared_ptr<IOProcessorImpl> InterpreterWebSocketServlet::create(InterpreterImpl* interpreter) {
+ // we instantiate directly in Interpreter
+ boost::shared_ptr<IOProcessorImpl> io = boost::shared_ptr<InterpreterWebSocketServlet>(new InterpreterWebSocketServlet(interpreter));
+ return io;
+}
+
+bool InterpreterWebSocketServlet::wsRecvRequest(struct evws_connection *conn, const HTTPServer::WSFrame& frame) {
+ tthread::lock_guard<tthread::recursive_mutex> lock(_mutex);
+
+ // evhttp_request_own(req.curlReq);
+
+ _requests[toStr((uintptr_t)conn)] = conn;
+
+ Event event = frame;
+
+ event.name = "ws." + event.data.compound["type"].atom;
+ event.origin = toStr((uintptr_t)conn);
+
+ if (event.data.compound["type"].atom.compare("text") == 0 && event.data.compound["content"]) {
+ if (event.data.compound["content"].compound.size() > 0) {
+ std::map<std::string, Data>::iterator compoundIter = event.data.compound["content"].compound.begin();
+ while(compoundIter != event.data.compound["content"].compound.end()) {
+ Data json = Data::fromJSON(compoundIter->second.atom);
+ if (json) {
+ compoundIter->second = json;
+ }
+ compoundIter++;
+ }
+ }
+ }
+
+ _interpreter->receive(event);
+ return true;
+}
+
+Data InterpreterWebSocketServlet::getDataModelVariables() {
+ Data data;
+ assert(_url.length() > 0);
+ data.compound["location"] = Data(_url, Data::VERBATIM);
+ return data;
+}
+
+void InterpreterWebSocketServlet::send(const SendRequest& req) {
+
+ if (!req.data) {
+ LOG(WARNING) << "No content given to send on websocket!";
+ return;
+ }
+
+ if (_requests.find(req.target) != _requests.end()) {
+ // send data to the given connection
+ if (false) {
+ } else if (req.data.binary) {
+ evws_send_data(_requests[req.target],
+ EVWS_BINARY_FRAME,
+ req.data.binary->data,
+ req.data.binary->size);
+ } else if (req.data.node) {
+ std::stringstream ssXML;
+ ssXML << req.data.node;
+ std::string data = ssXML.str();
+ evws_send_data(_requests[req.target],
+ EVWS_TEXT_FRAME,
+ data.c_str(),
+ data.length());
+ } else if (req.data) {
+ std::string data = Data::toJSON(req.data);
+ evws_send_data(_requests[req.target],
+ EVWS_TEXT_FRAME,
+ data.c_str(),
+ data.length());
+ } else {
+ LOG(WARNING) << "Not sure what to make off content given to send on websocket!";
+ }
+ } else if(req.target.size() && req.target.compare(0, 1, "/") == 0) {
+ // broadcast to the given path
+ if (false) {
+ } else if (req.data.binary) {
+ evws_broadcast(getWSBase(), req.target.c_str(),
+ EVWS_BINARY_FRAME,
+ req.data.binary->data,
+ req.data.binary->size);
+ } else if (req.data.node) {
+ std::stringstream ssXML;
+ ssXML << req.data.node;
+ std::string data = ssXML.str();
+ evws_broadcast(getWSBase(), req.target.c_str(),
+ EVWS_TEXT_FRAME,
+ data.c_str(),
+ data.length());
+ } else if (req.data) {
+ std::string data = Data::toJSON(req.data);
+ evws_broadcast(getWSBase(), req.target.c_str(),
+ EVWS_TEXT_FRAME,
+ data.c_str(),
+ data.length());
+ } else {
+ LOG(WARNING) << "Not sure what to make off content given to broadcast on websocket!";
+ }
+ } else {
+ LOG(WARNING) << "Invalid target for websocket";
+ }
+}
+
} \ No newline at end of file
diff --git a/src/uscxml/server/InterpreterServlet.h b/src/uscxml/server/InterpreterServlet.h
index 55b97bd..46cc737 100644
--- a/src/uscxml/server/InterpreterServlet.h
+++ b/src/uscxml/server/InterpreterServlet.h
@@ -27,11 +27,11 @@ namespace uscxml {
class Interpreter;
-class InterpreterServlet : public HTTPServlet, public IOProcessorImpl {
+class InterpreterHTTPServlet : public HTTPServlet, public IOProcessorImpl {
public:
- InterpreterServlet() {};
- InterpreterServlet(InterpreterImpl* interpreter);
- virtual ~InterpreterServlet() {}
+ InterpreterHTTPServlet() {};
+ InterpreterHTTPServlet(InterpreterImpl* interpreter);
+ virtual ~InterpreterHTTPServlet() {}
virtual boost::shared_ptr<IOProcessorImpl> create(InterpreterImpl* interpreter);
@@ -78,6 +78,56 @@ protected:
};
+class InterpreterWebSocketServlet : public WebSocketServlet, public IOProcessorImpl {
+public:
+ InterpreterWebSocketServlet() {};
+ InterpreterWebSocketServlet(InterpreterImpl* interpreter);
+ virtual ~InterpreterWebSocketServlet() {}
+
+ virtual boost::shared_ptr<IOProcessorImpl> create(InterpreterImpl* interpreter);
+
+ virtual std::set<std::string> getNames() {
+ std::set<std::string> names;
+ names.insert("websocket");
+ names.insert("http://www.w3.org/TR/scxml/#WebSocketEventProcessor");
+ return names;
+ }
+
+ Data getDataModelVariables();
+ virtual void send(const SendRequest& req);
+
+ virtual bool wsRecvRequest(struct evws_connection *conn, const HTTPServer::WSFrame& frame);
+
+ std::string getPath() {
+ return _path;
+ }
+ std::string getURL() {
+ return _url;
+ }
+ void setURL(const std::string& url) {
+ _url = url;
+ }
+ bool canAdaptPath() {
+ return false;
+ }
+
+ std::map<std::string, struct evws_connection*>& getRequests() {
+ return _requests;
+ }
+ tthread::recursive_mutex& getMutex() {
+ return _mutex;
+ }
+
+protected:
+ InterpreterImpl* _interpreter;
+
+ tthread::recursive_mutex _mutex;
+ std::map<std::string, struct evws_connection*> _requests;
+ std::string _path;
+ std::string _url;
+
+};
+
}
diff --git a/src/uscxml/server/WebSocketServer.cpp b/src/uscxml/server/WebSocketServer.cpp
deleted file mode 100644
index e69de29..0000000
--- a/src/uscxml/server/WebSocketServer.cpp
+++ /dev/null
diff --git a/src/uscxml/server/WebSocketServer.h b/src/uscxml/server/WebSocketServer.h
deleted file mode 100644
index 438b932..0000000
--- a/src/uscxml/server/WebSocketServer.h
+++ /dev/null
@@ -1,8 +0,0 @@
-#ifndef WEBSOCKETSERVER_H_FG7908O3
-#define WEBSOCKETSERVER_H_FG7908O3
-
-namespace uscxml {
-
-}
-
-#endif /* end of include guard: WEBSOCKETSERVER_H_FG7908O3 */