diff options
Diffstat (limited to 'src/uscxml/server')
-rw-r--r-- | src/uscxml/server/HTTPServer.cpp | 67 | ||||
-rw-r--r-- | src/uscxml/server/HTTPServer.h | 35 | ||||
-rw-r--r-- | src/uscxml/server/InterpreterServlet.cpp | 215 | ||||
-rw-r--r-- | src/uscxml/server/InterpreterServlet.h | 135 | ||||
-rw-r--r-- | src/uscxml/server/Socket.cpp | 389 | ||||
-rw-r--r-- | src/uscxml/server/Socket.h | 139 |
6 files changed, 48 insertions, 932 deletions
diff --git a/src/uscxml/server/HTTPServer.cpp b/src/uscxml/server/HTTPServer.cpp index 6f2347c..cb80ec4 100644 --- a/src/uscxml/server/HTTPServer.cpp +++ b/src/uscxml/server/HTTPServer.cpp @@ -25,9 +25,8 @@ #include <windows.h> #endif -#include "uscxml/server/HTTPServer.h" -#include "uscxml/dom/DOMUtils.h" -#include "uscxml/dom/NameSpacingParser.h" +#include "HTTPServer.h" +#include "uscxml/util/DOM.h" #include <string> #include <iostream> @@ -41,7 +40,7 @@ extern "C" { #include <event2/thread.h> } -#include <glog/logging.h> +#include <easylogging++.h> #include <boost/algorithm/string.hpp> #ifndef _WIN32 @@ -53,16 +52,8 @@ extern "C" { //#include <arpa/inet.h> #endif -#if (defined EVENT_SSL_FOUND && defined OPENSSL_FOUND && defined OPENSSL_HAS_ELIPTIC_CURVES) -#include <openssl/ssl.h> -#include <openssl/bio.h> -#include <openssl/err.h> -#include <openssl/pem.h> -#include <event2/bufferevent_ssl.h> -#endif -#include "uscxml/Message.h" -#include "uscxml/Convenience.h" // for toStr +#include "uscxml/util/Convenience.h" // for toStr #ifdef BUILD_AS_PLUGINS #include <Pluma/Connector.hpp> @@ -100,9 +91,9 @@ HTTPServer::HTTPServer(unsigned short port, unsigned short wsPort, SSLConfig* ss evhttp_set_allowed_methods(_http, allowedMethods); // allow all methods if (_port > 0) { - _httpHandle = evhttp_bind_socket_with_handle(_http, INADDR_ANY, _port); + _httpHandle = evhttp_bind_socket_with_handle(_http, NULL, _port); if (_httpHandle) { - DLOG(INFO) << "HTTP server listening on tcp/" << _port; + LOG(INFO) << "HTTP server listening on tcp/" << _port; } else { LOG(ERROR) << "HTTP server cannot bind to tcp/" << _port; } @@ -112,7 +103,7 @@ HTTPServer::HTTPServer(unsigned short port, unsigned short wsPort, SSLConfig* ss if (_wsPort > 0) { _wsHandle = evws_bind_socket(_evws, _wsPort); if (_wsHandle) { - DLOG(INFO) << "WebSocket server listening on tcp/" << _wsPort; + LOG(INFO) << "WebSocket server listening on tcp/" << _wsPort; } else { LOG(ERROR) << "WebSocket server cannot bind to tcp/" << _wsPort; } @@ -178,10 +169,10 @@ HTTPServer::~HTTPServer() { } HTTPServer* HTTPServer::_instance = NULL; -tthread::recursive_mutex HTTPServer::_instanceMutex; +std::recursive_mutex HTTPServer::_instanceMutex; HTTPServer* HTTPServer::getInstance(unsigned short port, unsigned short wsPort, SSLConfig* sslConf) { -// tthread::lock_guard<tthread::recursive_mutex> lock(_instanceMutex); +// std::lock_guard<std::recursive_mutex> lock(_instanceMutex); if (_instance == NULL) { #ifdef _WIN32 WSADATA wsaData; @@ -247,7 +238,7 @@ void HTTPServer::wsRecvReqCallback(struct evws_connection *conn, struct evws_fra // try with the handler registered for path first bool answered = false; if (callbackData != NULL) - answered = ((WebSocketServlet*)callbackData)->wsRecvRequest(conn, wsFrame); + answered = ((WebSocketServlet*)callbackData)->requestFromWS(conn, wsFrame); if (!answered) HTTPServer::getInstance()->processByMatchingServlet(conn, wsFrame); @@ -415,12 +406,13 @@ void HTTPServer::httpRecvReqCallback(struct evhttp_request *req, void *callbackD request.data.compound["content"] = json; } } else if (iequals(contentType.substr(0, 15), "application/xml")) { - NameSpacingParser parser = NameSpacingParser::fromXML(request.data.compound["content"].atom); - if (parser.errorsReported()) { - LOG(ERROR) << "Cannot parse contents of HTTP request as XML"; - } else { - request.data.compound["content"].node = parser.getDocument().getDocumentElement(); - } + assert(0); +// NameSpacingParser parser = NameSpacingParser::fromXML(request.data.compound["content"].atom); +// if (parser.errorsReported()) { +// LOG(ERROR) << "Cannot parse contents of HTTP request as XML"; +// } else { +// request.data.compound["content"].node = parser.getDocument().getDocumentElement(); +// } } } @@ -429,7 +421,7 @@ void HTTPServer::httpRecvReqCallback(struct evhttp_request *req, void *callbackD // try with the handler registered for path first bool answered = false; if (callbackData != NULL) - answered = ((HTTPServlet*)callbackData)->httpRecvRequest(request); + answered = ((HTTPServlet*)callbackData)->requestFromHTTP(request); if (!answered) HTTPServer::getInstance()->processByMatchingServlet(request); @@ -437,7 +429,7 @@ void HTTPServer::httpRecvReqCallback(struct evhttp_request *req, void *callbackD void HTTPServer::processByMatchingServlet(const Request& request) { - tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); + std::lock_guard<std::recursive_mutex> lock(_mutex); http_servlet_iter_t servletIter = _httpServlets.begin(); @@ -459,7 +451,7 @@ void HTTPServer::processByMatchingServlet(const Request& request) { // process by best matching servlet until someone feels responsible std::map<std::string, HTTPServlet*, comp_strsize_less>::iterator matchesIter = matches.begin(); while(matchesIter != matches.end()) { - if (matchesIter->second->httpRecvRequest(request)) { + if (matchesIter->second->requestFromHTTP(request)) { return; } matchesIter++; @@ -470,7 +462,7 @@ void HTTPServer::processByMatchingServlet(const Request& request) { } void HTTPServer::processByMatchingServlet(evws_connection* conn, const WSFrame& frame) { - tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); + std::lock_guard<std::recursive_mutex> lock(_mutex); ws_servlet_iter_t servletIter = _wsServlets.begin(); @@ -490,7 +482,7 @@ void HTTPServer::processByMatchingServlet(evws_connection* conn, const WSFrame& // process by best matching servlet until someone feels responsible std::map<std::string, WebSocketServlet*, comp_strsize_less>::iterator matchesIter = matches.begin(); while(matchesIter != matches.end()) { - if (matchesIter->second->wsRecvRequest(conn, frame)) { + if (matchesIter->second->requestFromWS(conn, frame)) { return; } matchesIter++; @@ -568,10 +560,11 @@ bool HTTPServer::registerServlet(const std::string& path, HTTPServlet* servlet) HTTPServer* INSTANCE = getInstance(); if (!INSTANCE->_httpHandle) { + LOG(INFO) << "Registering at unstarted HTTP Server"; return true; // this is the culprit! } - tthread::lock_guard<tthread::recursive_mutex> lock(INSTANCE->_mutex); + std::lock_guard<std::recursive_mutex> lock(INSTANCE->_mutex); // remove trailing and leading slash std::string actualPath = path; @@ -596,7 +589,7 @@ bool HTTPServer::registerServlet(const std::string& path, HTTPServlet* servlet) servlet->setURL(servletURL.str()); INSTANCE->_httpServlets[suffixedPath] = servlet; - DLOG(INFO) << "HTTP Servlet listening at: " << servletURL.str() << std::endl; +// LOG(INFO) << "HTTP Servlet listening at: " << servletURL.str(); // register callback evhttp_set_cb(INSTANCE->_http, ("/" + suffixedPath).c_str(), HTTPServer::httpRecvReqCallback, servlet); @@ -606,7 +599,7 @@ bool HTTPServer::registerServlet(const std::string& path, HTTPServlet* servlet) void HTTPServer::unregisterServlet(HTTPServlet* servlet) { HTTPServer* INSTANCE = getInstance(); - tthread::lock_guard<tthread::recursive_mutex> lock(INSTANCE->_mutex); + std::lock_guard<std::recursive_mutex> lock(INSTANCE->_mutex); http_servlet_iter_t servletIter = INSTANCE->_httpServlets.begin(); while(servletIter != INSTANCE->_httpServlets.end()) { if (servletIter->second == servlet) { @@ -624,7 +617,7 @@ bool HTTPServer::registerServlet(const std::string& path, WebSocketServlet* serv if (!INSTANCE->_wsHandle) return true; - tthread::lock_guard<tthread::recursive_mutex> lock(INSTANCE->_mutex); + std::lock_guard<std::recursive_mutex> lock(INSTANCE->_mutex); // remove trailing and leading slash std::string actualPath = path; @@ -660,7 +653,7 @@ bool HTTPServer::registerServlet(const std::string& path, WebSocketServlet* serv void HTTPServer::unregisterServlet(WebSocketServlet* servlet) { HTTPServer* INSTANCE = getInstance(); - tthread::lock_guard<tthread::recursive_mutex> lock(INSTANCE->_mutex); + std::lock_guard<std::recursive_mutex> lock(INSTANCE->_mutex); ws_servlet_iter_t servletIter = INSTANCE->_wsServlets.begin(); while(servletIter != INSTANCE->_wsServlets.end()) { if (servletIter->second == servlet) { @@ -696,7 +689,7 @@ std::string HTTPServer::getBaseURL(ServerType type) { void HTTPServer::start() { _isRunning = true; - _thread = new tthread::thread(HTTPServer::run, this); + _thread = new std::thread(HTTPServer::run, this); } void HTTPServer::run(void* instance) { @@ -704,7 +697,7 @@ void HTTPServer::run(void* instance) { while(INSTANCE->_isRunning) { event_base_dispatch(INSTANCE->_base); } - LOG(INFO) << "HTTP Server stopped" << std::endl; + LOG(INFO) << "HTTP Server stopped"; } void HTTPServer::determineAddress() { diff --git a/src/uscxml/server/HTTPServer.h b/src/uscxml/server/HTTPServer.h index 7fedd83..a584360 100644 --- a/src/uscxml/server/HTTPServer.h +++ b/src/uscxml/server/HTTPServer.h @@ -24,6 +24,8 @@ #include <map> // for map, map<>::iterator, etc #include <string> // for string, operator< +#include <thread> +#include <mutex> extern "C" { #include "event2/util.h" // for evutil_socket_t @@ -32,8 +34,7 @@ extern "C" { } #include "uscxml/Common.h" // for USCXML_API -#include "uscxml/Message.h" // for Data, Event -#include "uscxml/concurrency/tinythread.h" // for recursive_mutex, etc +#include "uscxml/messages/Event.h" // for Data, Event #include "uscxml/config.h" // for OPENSSL_FOUND namespace uscxml { @@ -54,14 +55,7 @@ public: } }; - class WSFrame : public Event { - public: - WSFrame() : evwsConn(NULL) {} - std::string content; - struct evws_connection* evwsConn; - }; - - class SSLConfig { + class USCXML_API SSLConfig { public: SSLConfig() : port(8443) {} std::string privateKey; @@ -69,7 +63,14 @@ public: unsigned short port; }; - class Reply { + class WSFrame : public Event { + public: + WSFrame() : evwsConn(NULL) {} + std::string content; + struct evws_connection* evwsConn; + }; + + class USCXML_API Reply { public: Reply() : status(200), type("get"), evhttpReq(NULL) {} Reply(Request req) : status(200), type(req.data.compound["type"].atom), evhttpReq(req.evhttpReq) {} @@ -140,7 +141,7 @@ private: }; }; - HTTPServer(unsigned short port, unsigned short wsPort, SSLConfig* sslConf = NULL); + HTTPServer(unsigned short port, unsigned short wsPort, SSLConfig* sslConf); virtual ~HTTPServer(); void start(); @@ -178,9 +179,9 @@ private: static HTTPServer* _instance; - static tthread::recursive_mutex _instanceMutex; - tthread::thread* _thread; - tthread::recursive_mutex _mutex; + static std::recursive_mutex _instanceMutex; + std::thread* _thread; + std::recursive_mutex _mutex; bool _isRunning; friend class HTTPServlet; @@ -199,7 +200,7 @@ private: class USCXML_API HTTPServlet { public: virtual ~HTTPServlet() {} - virtual bool httpRecvRequest(const HTTPServer::Request& request) = 0; + virtual bool requestFromHTTP(const HTTPServer::Request& request) = 0; virtual void setURL(const std::string& url) = 0; /// Called by the server with the actual URL virtual bool canAdaptPath() { return true; @@ -209,7 +210,7 @@ public: class USCXML_API WebSocketServlet { public: virtual ~WebSocketServlet() {} - virtual bool wsRecvRequest(struct evws_connection *conn, const HTTPServer::WSFrame& frame) = 0; + virtual bool requestFromWS(struct evws_connection *conn, const HTTPServer::WSFrame& frame) = 0; virtual void setURL(const std::string& url) = 0; /// Called by the server with the actual URL virtual bool canAdaptPath() { return true; diff --git a/src/uscxml/server/InterpreterServlet.cpp b/src/uscxml/server/InterpreterServlet.cpp deleted file mode 100644 index 285add9..0000000 --- a/src/uscxml/server/InterpreterServlet.cpp +++ /dev/null @@ -1,215 +0,0 @@ -/** - * @file - * @author 2012-2013 Stefan Radomski (stefan.radomski@cs.tu-darmstadt.de) - * @copyright Simplified BSD - * - * @cond - * This program is free software: you can redistribute it and/or modify - * it under the terms of the FreeBSD license as published by the FreeBSD - * project. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the FreeBSD license along with this - * program. If not, see <http://www.opensource.org/licenses/bsd-license>. - * @endcond - */ - -#include "InterpreterServlet.h" -#include "uscxml/Interpreter.h" -#include <glog/logging.h> -#include <DOM/io/Stream.hpp> - -namespace uscxml { - -InterpreterHTTPServlet::~InterpreterHTTPServlet() { - HTTPServer::unregisterServlet(this); -} - -InterpreterHTTPServlet::InterpreterHTTPServlet(InterpreterImpl* interpreter) { - _interpreter = interpreter; - - std::stringstream path; - path << _interpreter->getName(); - int i = 2; - while(!HTTPServer::registerServlet(path.str(), this)) { - path.clear(); - path.str(); - path << _interpreter->getName() << i++; - } - _path = path.str(); -} - -boost::shared_ptr<IOProcessorImpl> InterpreterHTTPServlet::create(InterpreterImpl* interpreter) { - // we instantiate directly in Interpreter - boost::shared_ptr<IOProcessorImpl> io = boost::shared_ptr<InterpreterHTTPServlet>(new InterpreterHTTPServlet(interpreter)); - return io; -} - -bool InterpreterHTTPServlet::httpRecvRequest(const HTTPServer::Request& req) { - tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); - - // evhttp_request_own(req.curlReq); - - _requests[toStr((uintptr_t)req.evhttpReq)] = req; - - Event event = req; - - event.name = "http." + event.data.compound["type"].atom; - event.origin = toStr((uintptr_t)req.evhttpReq); - - if (!event.data.compound["content"].empty()) { - if (event.data.compound["content"].compound.size() > 0) { - std::map<std::string, Data>::iterator compoundIter = event.data.compound["content"].compound.begin(); - while(compoundIter != event.data.compound["content"].compound.end()) { -// std::cout << compoundIter->second.atom << std::endl; - Data json = Data::fromJSON(compoundIter->second.atom); - if (!json.empty()) { -// std::cout << Data::toJSON(json) << std::endl; - compoundIter->second = json; - } - compoundIter++; - } - } - } - - _interpreter->receive(event); - return true; -} - -Data InterpreterHTTPServlet::getDataModelVariables() { - Data data; - if(_url.length() > 0) - data.compound["location"] = Data(_url, Data::VERBATIM); - return data; -} - -void InterpreterHTTPServlet::send(const SendRequest& req) { - LOG(ERROR) << "send not supported by http iorprocessor, use the fetch element"; -} - -InterpreterWebSocketServlet::~InterpreterWebSocketServlet() { - HTTPServer::unregisterServlet(this); -} - -InterpreterWebSocketServlet::InterpreterWebSocketServlet(InterpreterImpl* interpreter) { - _interpreter = interpreter; - - std::stringstream path; - path << _interpreter->getName(); - int i = 2; - while(!HTTPServer::registerServlet(path.str(), this)) { - path.clear(); - path.str(); - path << _interpreter->getName() << i++; - } - _path = path.str(); -} - -boost::shared_ptr<IOProcessorImpl> InterpreterWebSocketServlet::create(InterpreterImpl* interpreter) { - // we instantiate directly in Interpreter - boost::shared_ptr<IOProcessorImpl> io = boost::shared_ptr<InterpreterWebSocketServlet>(new InterpreterWebSocketServlet(interpreter)); - return io; -} - -bool InterpreterWebSocketServlet::wsRecvRequest(struct evws_connection *conn, const HTTPServer::WSFrame& frame) { - tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); - - // evhttp_request_own(req.curlReq); - - _requests[toStr((uintptr_t)conn)] = conn; - - Event event = frame; - - event.name = "ws." + event.data.compound["type"].atom; - event.origin = toStr((uintptr_t)conn); - - if (event.data.compound["type"].atom.compare("text") == 0 && !event.data.compound["content"].empty()) { - if (event.data.compound["content"].compound.size() > 0) { - std::map<std::string, Data>::iterator compoundIter = event.data.compound["content"].compound.begin(); - while(compoundIter != event.data.compound["content"].compound.end()) { - Data json = Data::fromJSON(compoundIter->second.atom); - if (!json.empty()) { - compoundIter->second = json; - } - compoundIter++; - } - } - } - - _interpreter->receive(event); - return true; -} - -Data InterpreterWebSocketServlet::getDataModelVariables() { - Data data; - if(_url.length() > 0) - data.compound["location"] = Data(_url, Data::VERBATIM); - return data; -} - -void InterpreterWebSocketServlet::send(const SendRequest& req) { - - if (req.data.empty()) { - LOG(WARNING) << "No content given to send on websocket!"; - return; - } - - if (_requests.find(req.target) != _requests.end()) { - // send data to the given connection - if (false) { - } else if (req.data.binary) { - HTTPServer::wsSend(_requests[req.target], - EVWS_BINARY_FRAME, - req.data.binary.getData(), - req.data.binary.getSize()); - } else if (req.data.node) { - std::stringstream ssXML; - ssXML << req.data.node; - std::string data = ssXML.str(); - HTTPServer::wsSend(_requests[req.target], - EVWS_TEXT_FRAME, - data.c_str(), - data.length()); - } else if (!req.data.empty()) { - std::string data = Data::toJSON(req.data); - HTTPServer::wsSend(_requests[req.target], - EVWS_TEXT_FRAME, - data.c_str(), - data.length()); - } else { - LOG(WARNING) << "Not sure what to make off content given to send on websocket!"; - } - } else if(req.target.size() && req.target.compare(0, 1, "/") == 0) { - // broadcast to the given path - if (false) { - } else if (req.data.binary) { - HTTPServer::wsBroadcast(req.target.c_str(), - EVWS_BINARY_FRAME, - req.data.binary.getData(), - req.data.binary.getSize()); - } else if (req.data.node) { - std::stringstream ssXML; - ssXML << req.data.node; - std::string data = ssXML.str(); - HTTPServer::wsBroadcast(req.target.c_str(), - EVWS_TEXT_FRAME, - data.c_str(), - data.length()); - } else if (!req.data.empty()) { - std::string data = Data::toJSON(req.data); - HTTPServer::wsBroadcast(req.target.c_str(), - EVWS_TEXT_FRAME, - data.c_str(), - data.length()); - } else { - LOG(WARNING) << "Not sure what to make off content given to broadcast on websocket!"; - } - } else { - LOG(WARNING) << "Invalid target for websocket"; - } -} - -}
\ No newline at end of file diff --git a/src/uscxml/server/InterpreterServlet.h b/src/uscxml/server/InterpreterServlet.h deleted file mode 100644 index 436574b..0000000 --- a/src/uscxml/server/InterpreterServlet.h +++ /dev/null @@ -1,135 +0,0 @@ -/** - * @file - * @author 2012-2013 Stefan Radomski (stefan.radomski@cs.tu-darmstadt.de) - * @copyright Simplified BSD - * - * @cond - * This program is free software: you can redistribute it and/or modify - * it under the terms of the FreeBSD license as published by the FreeBSD - * project. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the FreeBSD license along with this - * program. If not, see <http://www.opensource.org/licenses/bsd-license>. - * @endcond - */ - -#ifndef INTERPRETERSERVLET_H_XQLWNMH4 -#define INTERPRETERSERVLET_H_XQLWNMH4 - -#include "HTTPServer.h" -#include "uscxml/messages/SendRequest.h" // for SendRequest -#include "uscxml/plugins/IOProcessor.h" // for IOProcessorImpl - -namespace uscxml { - -class InterpreterImpl; - -class InterpreterHTTPServlet : public HTTPServlet, public IOProcessorImpl { -public: - InterpreterHTTPServlet() {}; - InterpreterHTTPServlet(InterpreterImpl* interpreter); - virtual ~InterpreterHTTPServlet(); - - virtual boost::shared_ptr<IOProcessorImpl> create(InterpreterImpl* interpreter); - - virtual std::list<std::string> getNames() { - std::list<std::string> names; - names.push_back("http"); - names.push_back("http://www.w3.org/TR/scxml/#HTTPEventProcessor"); - return names; - } - - Data getDataModelVariables(); - virtual void send(const SendRequest& req); - - virtual bool httpRecvRequest(const HTTPServer::Request& req); - - std::string getPath() { - return _path; - } - std::string getURL() { - return _url; - } - void setURL(const std::string& url) { - _url = url; - } - bool canAdaptPath() { - return false; - } - - - std::map<std::string, HTTPServer::Request>& getRequests() { - return _requests; - } - tthread::recursive_mutex& getMutex() { - return _mutex; - } - -protected: - InterpreterImpl* _interpreter; - - tthread::recursive_mutex _mutex; - std::map<std::string, HTTPServer::Request> _requests; - std::string _path; - std::string _url; - -}; - -class InterpreterWebSocketServlet : public WebSocketServlet, public IOProcessorImpl { -public: - InterpreterWebSocketServlet() {}; - InterpreterWebSocketServlet(InterpreterImpl* interpreter); - virtual ~InterpreterWebSocketServlet(); - - virtual boost::shared_ptr<IOProcessorImpl> create(InterpreterImpl* interpreter); - - virtual std::list<std::string> getNames() { - std::list<std::string> names; - names.push_back("websocket"); - names.push_back("http://www.w3.org/TR/scxml/#WebSocketEventProcessor"); - return names; - } - - Data getDataModelVariables(); - virtual void send(const SendRequest& req); - - virtual bool wsRecvRequest(struct evws_connection *conn, const HTTPServer::WSFrame& frame); - - std::string getPath() { - return _path; - } - std::string getURL() { - return _url; - } - void setURL(const std::string& url) { - _url = url; - } - bool canAdaptPath() { - return false; - } - - std::map<std::string, struct evws_connection*>& getRequests() { - return _requests; - } - tthread::recursive_mutex& getMutex() { - return _mutex; - } - -protected: - InterpreterImpl* _interpreter; - - tthread::recursive_mutex _mutex; - std::map<std::string, struct evws_connection*> _requests; - std::string _path; - std::string _url; - -}; - -} - - -#endif /* end of include guard: INTERPRETERSERVLET_H_XQLWNMH4 */ diff --git a/src/uscxml/server/Socket.cpp b/src/uscxml/server/Socket.cpp deleted file mode 100644 index 35b416e..0000000 --- a/src/uscxml/server/Socket.cpp +++ /dev/null @@ -1,389 +0,0 @@ -/** - * @file - * @author 2012-2014 Stefan Radomski (stefan.radomski@cs.tu-darmstadt.de) - * @copyright Simplified BSD - * - * @cond - * This program is free software: you can redistribute it and/or modify - * it under the terms of the FreeBSD license as published by the FreeBSD - * project. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the FreeBSD license along with this - * program. If not, see <http://www.opensource.org/licenses/bsd-license>. - * @endcond - */ - -#include "Socket.h" - -#include "uscxml/Common.h" // for Data, Event -#include "uscxml/Message.h" // for Data, Event -#include "uscxml/config.h" // for OPENSSL_FOUND - -#ifndef _WIN32 -#include <sys/socket.h> /* For socket functions */ -#include <arpa/inet.h> // inet_addr -#endif - -#include <fcntl.h> /* For fcntl */ -#include <iostream> - -namespace uscxml { - -// see: http://codepad.org/XRJAVg5m -Socket::Socket(int domain, int type, int protocol) { - - _base = EventBase::get("sockets"); - _blockSizeRead = 1024; - - if (!_base) - throw std::runtime_error("Cannot get eventbase"); - - _sin.sin_family = domain; - _socketFD = socket(domain, type, protocol); - - if (_socketFD == -1) - throw std::runtime_error(std::string("socket: ") + strerror(errno)); - -} - -Socket::~Socket() { - if (_socketFD > 0) -#ifdef WIN32 - closesocket(_socketFD); -#else - close(_socketFD); -#endif -} - -void Socket::setupSockAddr(const std::string& address, int port) { - - if (address == "*") { - _sin.sin_addr.s_addr = 0; - } else { - struct hostent *he = NULL; - if ( (he = gethostbyname(address.c_str()) ) != NULL ) { - memcpy(&_sin.sin_addr, he->h_addr_list[0], he->h_length); - } else { - _sin.sin_addr.s_addr = inet_addr(address.c_str()); - } - if (_sin.sin_addr.s_addr == INADDR_NONE) - throw std::runtime_error(std::string("inet_addr: ") + strerror(errno)); - } - - _sin.sin_port = htons(port); -} - -void Socket::setBlockSizeRead(size_t size) { -// tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); - _blockSizeRead = size; -} - -void Socket::parseAddress(const std::string& address, std::string& protocol, std::string& hostName, uint16_t& port) { - // tcp://hostname:port - size_t protEnd = address.find("://"); - if (protEnd != std::string::npos) { - protocol = address.substr(0, protEnd); - protEnd += 3; - } else { - protocol = "tcp"; - protEnd = 0; - } - - size_t hostEnd = address.find(":", protEnd); - if (hostEnd != std::string::npos) { - hostName = address.substr(protEnd, hostEnd - protEnd); - hostEnd += 1; - } else { - hostName = "127.0.0.1"; - hostEnd = protEnd; - } - - if (hostEnd < address.size()) { - port = strTo<uint16_t>(address.substr(hostEnd)); - } else { - port = 0; - } -} - - -ClientSocket::ClientSocket(int domain, int type, int protocol) : Socket(domain, type, protocol), _clientEvent(NULL) { -} - - -ClientSocket::~ClientSocket() { - if (_clientEvent) { - bufferevent_enable(_clientEvent, 0); - bufferevent_free(_clientEvent); - } -} - -void ClientSocket::errorCallback(struct bufferevent *bev, short error, void *ctx) { -// ClientSocket* instance = (ClientSocket*)ctx; - // tthread::lock_guard<tthread::recursive_mutex> lock(instance->_mutex); - - if (error & BEV_EVENT_READING) { - std::cout << "ClientSocket: error encountered while reading" << std::endl; - } else if (error & BEV_EVENT_WRITING) { - std::cout << "ClientSocket: error encountered while writing" << std::endl; - } else if (error & BEV_EVENT_EOF) { - std::cout << "ClientSocket: eof file reached" << std::endl; - } else if (error & BEV_EVENT_ERROR) { - std::cout << "ClientSocket: unrecoverable error encountered" << std::endl; - } else if (error & BEV_EVENT_TIMEOUT) { - std::cout << "ClientSocket: user-specified timeout reached" << std::endl; - } else if (error & BEV_EVENT_CONNECTED) { - std::cout << "ClientSocket: connect operation finished" << std::endl; - } - - // bufferevent_free(bev); -} - -void ClientSocket::connect(const std::string& address) { - std::string _prot; - std::string _address; - uint16_t _port; - parseAddress(address, _prot, _address, _port); - connect(_address, _port); -} - -void ClientSocket::connect(const std::string& address, int port) { -// tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); - - setupSockAddr(address, port); - if(::connect(_socketFD, (struct sockaddr *)&_sin, sizeof _sin) != 0) { - throw std::runtime_error(std::string("connect: ") + strerror(errno)); - } - - _clientEvent = bufferevent_socket_new(_base->base, _socketFD, BEV_OPT_THREADSAFE); //BEV_OPT_THREADSAFE); - bufferevent_setcb(_clientEvent, ClientSocket::readCallback, NULL, ClientSocket::errorCallback, this); - bufferevent_enable(_clientEvent, EV_READ|EV_WRITE); -} - -int ClientSocket::write(const std::string& data) { - return write(data.data(), data.size()); -} - -int ClientSocket::write(const char* data, size_t size) { -// tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); - bufferevent_write(_clientEvent, data, size); - return size; -} - -void ClientSocket::readCallback(struct bufferevent *bev, void *ctx) { - ClientSocket* instance = (ClientSocket*)ctx; -// tthread::lock_guard<tthread::recursive_mutex> lock(instance->_mutex); - - int n; - struct evbuffer* input; - char* data = (char*)malloc(instance->_blockSizeRead); - - input = bufferevent_get_input(bev); - - while((n = evbuffer_remove(input, data, instance->_blockSizeRead)) > 0) { - instance->readCallback(data, n); - } - free(data); -} - -std::set<ServerSocket*> ServerSocket::_instances; - -ServerSocket::ServerSocket(int domain, int type, int protocol) : Socket(domain, type, protocol), _listenerEvent(NULL) { - _instances.insert(this); -} - -ServerSocket::~ServerSocket() { - tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); - - std::map<struct bufferevent*, Connection>::iterator connIter = _connections.begin(); - while(connIter != _connections.end()) { - bufferevent_enable(connIter->second.bufferEvent, 0); - bufferevent_setcb(connIter->second.bufferEvent, NULL, NULL, NULL, 0); - - bufferevent_free(connIter->second.bufferEvent); -#ifdef WIN32 - closesocket(connIter->second.fd); -#else - close(connIter->second.fd); -#endif - - connIter++; - } - - if (_listenerEvent) { - event_del(_listenerEvent); - event_free(_listenerEvent); - } - - _instances.erase(this); - -} - -void ServerSocket::errorCallback(struct bufferevent *bev, short error, void *ctx) { - ServerSocket* instance = (ServerSocket*)ctx; - tthread::lock_guard<tthread::recursive_mutex> lock(instance->_mutex); - - if (_instances.find(instance) == _instances.end()) - return; - - if (error & BEV_EVENT_READING || error & BEV_EVENT_WRITING) { - // remote end close the connection - tthread::lock_guard<tthread::recursive_mutex> lock(instance->_mutex); - std::map<struct bufferevent*, Connection>::iterator conn = instance->_connections.find(bev); - if (conn != instance->_connections.end()) { - bufferevent_enable(conn->second.bufferEvent, 0); - bufferevent_free(conn->second.bufferEvent); -#ifdef WIN32 - closesocket(conn->second.fd); -#else - close(conn->second.fd); -#endif - instance->_connections.erase(conn); - } - } else if (error & BEV_EVENT_EOF) { - std::cout << "ServerSocket: eof file reached" << std::endl; - } else if (error & BEV_EVENT_ERROR) { - std::cout << "ServerSocket: unrecoverable error encountered" << std::endl; - } else if (error & BEV_EVENT_TIMEOUT) { - std::cout << "ServerSocket: user-specified timeout reached" << std::endl; - } else if (error & BEV_EVENT_CONNECTED) { - std::cout << "ServerSocket: connect operation finished" << std::endl; - } - // bufferevent_free(bev); -} - -void ServerSocket::readCallback(struct bufferevent *bev, void *ctx) { - ServerSocket* instance = (ServerSocket*)ctx; - tthread::lock_guard<tthread::recursive_mutex> lock(instance->_mutex); - - // instance is already gone - if (_instances.find(instance) == _instances.end()) - return; - - size_t n; - struct evbuffer* input; - char* data = (char*)malloc(instance->_blockSizeRead); - - input = bufferevent_get_input(bev); - while((n = evbuffer_remove(input, data, instance->_blockSizeRead)) > 0) { - instance->readCallback(data, n, instance->_connections[bev]); - } - free(data); -} - -void ServerSocket::bind() { - if (::bind(_socketFD, (struct sockaddr*)&_sin, sizeof(_sin)) < 0) { - throw std::runtime_error(std::string("bind: ") + strerror(errno)); - } -} - -void ServerSocket::listen(const std::string& address) { - std::string _prot; - std::string _address; - uint16_t _port; - parseAddress(address, _prot, _address, _port); - listen(_address, _port); -} - -void ServerSocket::listen(const std::string& address, int port) { -// tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); - setupSockAddr(address, port); - bind(); - - int one = 1; - if (setsockopt(_socketFD, SOL_SOCKET, SO_REUSEADDR, (const char*)&one, sizeof(one)) != 0) { - throw std::runtime_error(std::string("setsockopt: ") + strerror(errno)); - } - -#ifndef _WIN32 - int flags = fcntl(_socketFD, F_GETFL); - if (flags >= 0) { - flags |= O_NONBLOCK; - if (fcntl(_socketFD, F_SETFL, flags) < 0) { - // could not set to non-block - } - } -#else - unsigned long on = 1; - if (ioctlsocket(_socketFD, FIONBIO, &on) != 0) { - // could not set to non-block - } -#endif - - _listenerEvent = event_new(_base->base, _socketFD, EV_READ|EV_PERSIST, acceptCallback, (void*)this); - /*XXX check it */ - event_add(_listenerEvent, NULL); - - if (::listen(_socketFD, 16)<0) { - throw std::runtime_error(std::string("listen: ") + strerror(errno)); - } -} - -void ServerSocket::acceptCallback(evutil_socket_t listener, short event, void *ctx) { - ServerSocket* instance = (ServerSocket*)ctx; -// tthread::lock_guard<tthread::recursive_mutex> lock(instance->_mutex); - - struct sockaddr_storage ss; - socklen_t slen = sizeof(ss); - int fd = accept(listener, (struct sockaddr*)&ss, &slen); - if (fd < 0) { - throw std::runtime_error(std::string("accept: ") + strerror(errno)); - } else if (fd > FD_SETSIZE) { -#ifdef WIN32 - closesocket(fd); -#else - close(fd); -#endif - - throw std::runtime_error(std::string("accept: ") + strerror(errno)); - } else { - struct bufferevent *bev; - evutil_make_socket_nonblocking(fd); - bev = bufferevent_socket_new(instance->_base->base, fd, BEV_OPT_THREADSAFE); //BEV_OPT_THREADSAFE - bufferevent_setcb(bev, ServerSocket::readCallback, NULL, ServerSocket::errorCallback, ctx); - bufferevent_enable(bev, EV_READ|EV_WRITE); - - instance->_connections[bev].bufferEvent = bev; - instance->_connections[bev].fd = fd; - } -} - -void ServerSocket::Connection::reply(const char* data, size_t size) { - bufferevent_write(bufferEvent, data, size); -} - -PacketServerSocket::~PacketServerSocket() { - for(std::map<Connection, std::stringstream*>::iterator fragIter = _fragments.begin(); - fragIter != _fragments.end(); - fragIter++) { - delete fragIter->second; - } -} - -void PacketServerSocket::readCallback(const char* data, size_t size, Connection& conn) { - if (_fragments.find(conn) == _fragments.end()) - _fragments[conn] = new std::stringstream(); - - std::stringstream* fragment = _fragments[conn]; - *fragment << std::string(data, size); - - size_t startPos = 0; - size_t endPos; - const std::string& buffer = fragment->str(); - while((endPos = buffer.find(_sep, startPos)) != std::string::npos) { -// std::cout << ">" << buffer.substr(startPos, endPos - startPos) << "<" << std::endl; - readCallback(buffer.substr(startPos, endPos - startPos), conn); - startPos = endPos + _sep.size(); - } - if (startPos != 0 && startPos < buffer.size() + 1) { - std::string rest = buffer.substr(startPos); - fragment->str(std::string()); - fragment->clear(); - *fragment << rest; - } -} - -} diff --git a/src/uscxml/server/Socket.h b/src/uscxml/server/Socket.h deleted file mode 100644 index 01e91b2..0000000 --- a/src/uscxml/server/Socket.h +++ /dev/null @@ -1,139 +0,0 @@ -/** - * @file - * @author 2012-2014 Stefan Radomski (stefan.radomski@cs.tu-darmstadt.de) - * @copyright Simplified BSD - * - * @cond - * This program is free software: you can redistribute it and/or modify - * it under the terms of the FreeBSD license as published by the FreeBSD - * project. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the FreeBSD license along with this - * program. If not, see <http://www.opensource.org/licenses/bsd-license>. - * @endcond - */ - -#ifndef SOCKETCLIENT_H_9A0B2A88 -#define SOCKETCLIENT_H_9A0B2A88 - -#include "uscxml/Common.h" // for USCXML_API -#include "uscxml/concurrency/EventBase.h" -#include <string> -#include <sstream> -#include <map> -#include <set> - -#ifdef _WIN32 -# include <winsock2.h> -#else -# include <netinet/in.h> /* For sockaddr_in */ -#endif -#include <cerrno> - -#include "uscxml/concurrency/tinythread.h" // for recursive_mutex, etc - -extern "C" { -#include <event2/event.h> -#include <event2/buffer.h> -#include <event2/bufferevent.h> -} - -namespace uscxml { - -class USCXML_API Socket { -public: - Socket(int domain, int type, int protocol); - virtual ~Socket(); - - void setBlockSizeRead(size_t size); - static void parseAddress(const std::string& address, std::string& protocol, std::string& hostName, uint16_t& port); - -protected: - - void setupSockAddr(const std::string& address, int port); - - evutil_socket_t _socketFD; - - tthread::recursive_mutex _mutex; - size_t _blockSizeRead; - struct sockaddr_in _sin; - - boost::shared_ptr<EventBase> _base; -}; - -class USCXML_API ServerSocket : public Socket { -public: - class Connection { - public: - bool operator<(const Connection& other) const { - return bufferEvent < other.bufferEvent; - } - - struct bufferevent* bufferEvent; - int fd; - - void reply(const char* data, size_t size); - }; - - ServerSocket(int domain, int type, int protocol); - virtual ~ServerSocket(); - - void listen(const std::string& address, int port); - void listen(const std::string& address); - virtual void readCallback(const char* data, size_t size, Connection& conn) {}; - - -protected: - void bind(); - static void acceptCallback(evutil_socket_t listener, short event, void *ctx); - static void errorCallback(struct bufferevent *bev, short error, void *ctx); - static void readCallback(struct bufferevent *bev, void *ctx); - - std::map<struct bufferevent*, Connection> _connections; - struct event* _listenerEvent; - - static std::set<ServerSocket*> _instances; - -}; - -class USCXML_API PacketServerSocket : public ServerSocket { -public: - PacketServerSocket(int domain, int type, int protocol, const std::string& sep) : ServerSocket(domain, type, protocol), _sep(sep) {} - virtual ~PacketServerSocket(); - - void readCallback(const char* data, size_t size, Connection& conn); - virtual void readCallback(const std::string& packet, Connection& conn) = 0; - -protected: - std::string _sep; - std::map<Connection, std::stringstream*> _fragments; -}; - -class USCXML_API ClientSocket : public Socket { -public: - ClientSocket(int domain, int type, int protocol); - virtual ~ClientSocket(); - - virtual void readCallback(const char* data, size_t size) {}; - void connect(const std::string& address, int port); - void connect(const std::string& address); - int write(const std::string& data); - int write(const char* data, size_t size); - - -protected: - static void readCallback(struct bufferevent *bev, void *ctx); - static void errorCallback(struct bufferevent *bev, short error, void *ctx); - - struct bufferevent* _clientEvent; - -}; - - -} - -#endif /* end of include guard: SOCKETCLIENT_H_9A0B2A88 */ |