From 95c3505e11d22f9a022647b0c7383364682d91de Mon Sep 17 00:00:00 2001 From: Stefan Radomski Date: Mon, 27 Jan 2014 20:30:19 +0100 Subject: Bug fixes and thread safety for web sockets --- contrib/src/evws/evws.c | 16 +++++++++-- contrib/src/evws/evws.h | 1 + src/uscxml/Interpreter.cpp | 2 +- src/uscxml/Message.cpp | 10 ++++++- src/uscxml/server/HTTPServer.cpp | 29 ++++++++++++++++++- src/uscxml/server/HTTPServer.h | 26 ++++++++++++++--- src/uscxml/server/InterpreterServlet.cpp | 48 ++++++++++++++++---------------- 7 files changed, 98 insertions(+), 34 deletions(-) diff --git a/contrib/src/evws/evws.c b/contrib/src/evws/evws.c index 8932b9f..a2cc992 100644 --- a/contrib/src/evws/evws.c +++ b/contrib/src/evws/evws.c @@ -143,6 +143,16 @@ void evws_broadcast(struct evws *ws, const char *uri, enum evws_opcode opcode, c } } +int evws_is_valid_connection(struct evws *ws, struct evws_connection *conn) { + struct evws_connection *ws_connection; + for ((ws_connection) = (&ws->connections)->tqh_first; ws_connection; ws_connection = ws_connection->next.tqe_next) { + if (ws_connection == conn) + return 1; + } + return 0; +} + + // Error callback static void cb_error(struct bufferevent *bev, short what, void *ctx) { struct evws_connection *conn = ctx; @@ -374,7 +384,7 @@ NEXT_FRAME: while(conn->frame->payload_bytes > 0 && // need to add more bytes to the payload length (size - (dataPtr - readbuf) > 0)) { // and bytes still available // length is in MSB network order - shift to left and add - conn->frame->size = (conn->frame->size << 8) + dataPtr[0]; + conn->frame->size = (conn->frame->size << 8) + (uint8_t)dataPtr[0]; conn->frame->payload_bytes--; dataPtr++; } @@ -427,9 +437,9 @@ NEXT_FRAME: // TAILQ_FOREACH for (ws_cb = ((&ws->callbacks))->tqh_first; ws_cb; ws_cb = (ws_cb->next.tqe_next)) { if (strcmp(ws_cb->uri, conn->uri) == 0) { - if(ws_cb->msg_cb != ((void*)0)) + if(ws_cb->msg_cb != ((void*)0)) { ws_cb->msg_cb(conn, conn->frame, ws_cb->cb_arg); - return; + } } } ws->gencb(conn, conn->frame, ws->gencb_arg); diff --git a/contrib/src/evws/evws.h b/contrib/src/evws/evws.h index f9ea567..d32c3e1 100644 --- a/contrib/src/evws/evws.h +++ b/contrib/src/evws/evws.h @@ -194,6 +194,7 @@ struct evws { struct evws_connection *evws_connection_new(struct evws *ws, evutil_socket_t fd); void evws_connection_free(struct evws_connection *conn); +int evws_is_valid_connection(struct evws *ws, struct evws_connection *conn); struct evws_frame *evws_frame_new(); void evws_frame_free(struct evws_frame *frame); diff --git a/src/uscxml/Interpreter.cpp b/src/uscxml/Interpreter.cpp index c3a85f9..fa3e227 100644 --- a/src/uscxml/Interpreter.cpp +++ b/src/uscxml/Interpreter.cpp @@ -92,7 +92,7 @@ void InterpreterOptions::printUsageAndExit(const char* progName) { #ifdef EVENT_SSL_FOUND printf(" [-sN] [--certificate=FILE | --private-key=FILE --public-key=FILE] "); #endif - printf(" \\\n\t\tURL1 [--disable-http] [--option1=value1 --option2=value2]"); + printf(" \\\n\t\t URL1 [--disable-http] [--option1=value1 --option2=value2]"); printf(" \\\n\t\t[URL2 [--disable-http] [--option3=value3 --option4=value4]]"); printf(" \\\n\t\t[URLN [--disable-http] [--optionN=valueN --optionM=valueM]]"); printf("\n"); diff --git a/src/uscxml/Message.cpp b/src/uscxml/Message.cpp index 4591484..b664fd1 100644 --- a/src/uscxml/Message.cpp +++ b/src/uscxml/Message.cpp @@ -682,7 +682,15 @@ std::string Data::toJSON(const Data& data) { os << "]"; } else if (data.atom.size() > 0) { if (data.type == Data::VERBATIM) { - os << "\"" << data.atom << "\""; + os << "\""; + for (int i = 0; i < data.atom.size(); i++) { + // escape string + if (data.atom[i] == '"') { + os << '\\'; + } + os << data.atom[i]; + } + os << "\""; } else { os << data.atom; } diff --git a/src/uscxml/server/HTTPServer.cpp b/src/uscxml/server/HTTPServer.cpp index 5066584..eddfe6e 100644 --- a/src/uscxml/server/HTTPServer.cpp +++ b/src/uscxml/server/HTTPServer.cpp @@ -489,7 +489,7 @@ void HTTPServer::processByMatchingServlet(evws_connection* conn, const WSFrame& } void HTTPServer::reply(const Reply& reply) { - // we need to reply from the thread calling event_base_dispatch, just add to ist base queue! + // we need to reply from the thread calling event_base_dispatch, just add to its base queue! Reply* replyCB = new Reply(reply); HTTPServer* INSTANCE = getInstance(); event_base_once(INSTANCE->_base, -1, EV_TIMEOUT, HTTPServer::replyCallback, replyCB, NULL); @@ -528,6 +528,33 @@ void HTTPServer::replyCallback(evutil_socket_t fd, short what, void *arg) { delete(reply); } + +void HTTPServer::wsSend(struct evws_connection *conn, enum evws_opcode opcode, const char *data, uint64_t length) { + HTTPServer* INSTANCE = getInstance(); + WSData* sendCB = new WSData(conn, NULL, opcode, data, length); + event_base_once(INSTANCE->_base, -1, EV_TIMEOUT, HTTPServer::wsSendCallback, sendCB, NULL); +} + +void HTTPServer::wsBroadcast(const char *uri, enum evws_opcode opcode, const char *data, uint64_t length) { + HTTPServer* INSTANCE = getInstance(); + WSData* sendCB = new WSData(NULL, uri, opcode, data, length); + event_base_once(INSTANCE->_base, -1, EV_TIMEOUT, HTTPServer::wsSendCallback, sendCB, NULL); + +} + +void HTTPServer::wsSendCallback(evutil_socket_t fd, short what, void *arg) { + WSData* wsSend = (WSData*)arg; + if (wsSend->uri.size() > 0) { + evws_broadcast(getInstance()->_evws, wsSend->uri.c_str(), wsSend->opcode, wsSend->data.data(), wsSend->data.length()); + } else { + if (evws_is_valid_connection(getInstance()->_evws, wsSend->conn) > 0) { + evws_send_data(wsSend->conn, wsSend->opcode, wsSend->data.data(), wsSend->data.length()); + } + } + + delete wsSend; +} + bool HTTPServer::registerServlet(const std::string& path, HTTPServlet* servlet) { HTTPServer* INSTANCE = getInstance(); diff --git a/src/uscxml/server/HTTPServer.h b/src/uscxml/server/HTTPServer.h index 7083a3c..2003130 100644 --- a/src/uscxml/server/HTTPServer.h +++ b/src/uscxml/server/HTTPServer.h @@ -98,6 +98,9 @@ public: static std::string getBaseURL(ServerType type = HTTP); static void reply(const Reply& reply); + static void wsSend(struct evws_connection *conn, enum evws_opcode opcode, const char *data, uint64_t length); + static void wsBroadcast(const char *uri, enum evws_opcode opcode, const char *data, uint64_t length); + static bool registerServlet(const std::string& path, HTTPServlet* servlet); ///< Register a servlet, returns false if path is already taken static void unregisterServlet(HTTPServlet* servlet); @@ -105,6 +108,22 @@ public: static void unregisterServlet(WebSocketServlet* servlet); private: + + class WSData { + public: + WSData(struct evws_connection *conn_, const char *uri_, enum evws_opcode opcode_, const char *data_, uint64_t length_) { + conn = conn_; + if (uri_) + uri = uri_; + opcode = opcode_; + data = std::string(data_, length_); + } + struct evws_connection *conn; + std::string data; + std::string uri; + evws_opcode opcode; + }; + struct comp_strsize_less { bool operator()(std::string const& l, std::string const& r) const { if (l.size() < r.size()) @@ -124,9 +143,11 @@ private: void determineAddress(); static void replyCallback(evutil_socket_t fd, short what, void *arg); + static void wsSendCallback(evutil_socket_t fd, short what, void *arg); + static void httpRecvReqCallback(struct evhttp_request *req, void *callbackData); static void wsRecvReqCallback(struct evws_connection *conn, struct evws_frame *, void *callbackData); - + void processByMatchingServlet(const Request& request); void processByMatchingServlet(evws_connection* conn, const WSFrame& frame); @@ -184,9 +205,6 @@ public: virtual bool canAdaptPath() { return true; } - struct evws* getWSBase() { - return HTTPServer::getInstance()->_evws; - } }; } diff --git a/src/uscxml/server/InterpreterServlet.cpp b/src/uscxml/server/InterpreterServlet.cpp index af56ba3..b956820 100644 --- a/src/uscxml/server/InterpreterServlet.cpp +++ b/src/uscxml/server/InterpreterServlet.cpp @@ -154,24 +154,24 @@ void InterpreterWebSocketServlet::send(const SendRequest& req) { // send data to the given connection if (false) { } else if (req.data.binary) { - evws_send_data(_requests[req.target], - EVWS_BINARY_FRAME, - req.data.binary->data, - req.data.binary->size); + HTTPServer::wsSend(_requests[req.target], + EVWS_BINARY_FRAME, + req.data.binary->data, + req.data.binary->size); } else if (req.data.node) { std::stringstream ssXML; ssXML << req.data.node; std::string data = ssXML.str(); - evws_send_data(_requests[req.target], - EVWS_TEXT_FRAME, - data.c_str(), - data.length()); + HTTPServer::wsSend(_requests[req.target], + EVWS_TEXT_FRAME, + data.c_str(), + data.length()); } else if (req.data) { std::string data = Data::toJSON(req.data); - evws_send_data(_requests[req.target], - EVWS_TEXT_FRAME, - data.c_str(), - data.length()); + HTTPServer::wsSend(_requests[req.target], + EVWS_TEXT_FRAME, + data.c_str(), + data.length()); } else { LOG(WARNING) << "Not sure what to make off content given to send on websocket!"; } @@ -179,24 +179,24 @@ void InterpreterWebSocketServlet::send(const SendRequest& req) { // broadcast to the given path if (false) { } else if (req.data.binary) { - evws_broadcast(getWSBase(), req.target.c_str(), - EVWS_BINARY_FRAME, - req.data.binary->data, - req.data.binary->size); + HTTPServer::wsBroadcast(req.target.c_str(), + EVWS_BINARY_FRAME, + req.data.binary->data, + req.data.binary->size); } else if (req.data.node) { std::stringstream ssXML; ssXML << req.data.node; std::string data = ssXML.str(); - evws_broadcast(getWSBase(), req.target.c_str(), - EVWS_TEXT_FRAME, - data.c_str(), - data.length()); + HTTPServer::wsBroadcast(req.target.c_str(), + EVWS_TEXT_FRAME, + data.c_str(), + data.length()); } else if (req.data) { std::string data = Data::toJSON(req.data); - evws_broadcast(getWSBase(), req.target.c_str(), - EVWS_TEXT_FRAME, - data.c_str(), - data.length()); + HTTPServer::wsBroadcast(req.target.c_str(), + EVWS_TEXT_FRAME, + data.c_str(), + data.length()); } else { LOG(WARNING) << "Not sure what to make off content given to broadcast on websocket!"; } -- cgit v0.12