diff options
author | Stefan Radomski <radomski@tk.informatik.tu-darmstadt.de> | 2014-07-16 21:54:17 (GMT) |
---|---|---|
committer | Stefan Radomski <radomski@tk.informatik.tu-darmstadt.de> | 2014-07-16 21:54:17 (GMT) |
commit | d8dc46cc000d81d031c5f08806dc26a3b433e1bf (patch) | |
tree | b4bc051b80a47ac849631a5da6da5fdc3bbd8d4d | |
parent | b9fcc8d669491725d8dc37583958eadd31a46456 (diff) | |
download | uscxml-d8dc46cc000d81d031c5f08806dc26a3b433e1bf.zip uscxml-d8dc46cc000d81d031c5f08806dc26a3b433e1bf.tar.gz uscxml-d8dc46cc000d81d031c5f08806dc26a3b433e1bf.tar.bz2 |
Generic socket support
19 files changed, 657 insertions, 11 deletions
diff --git a/src/bindings/swig/java/CMakeLists.txt b/src/bindings/swig/java/CMakeLists.txt index 86f2fd6..eb51f83 100644 --- a/src/bindings/swig/java/CMakeLists.txt +++ b/src/bindings/swig/java/CMakeLists.txt @@ -31,6 +31,7 @@ set_target_properties(uscxmlNativeJava PROPERTIES COMPILE_FLAGS "-DSWIG") swig_link_libraries(uscxmlNativeJava uscxml) +FIND_PROGRAM(ANT_EXECUTABLE ant PATHS $ENV{ANT_HOME}/bin ENV PATH ) if (ANT_EXECUTABLE) set(USCXML_LANGUAGE_BINDINGS "java ${USCXML_LANGUAGE_BINDINGS}") diff --git a/src/uscxml/Interpreter.cpp b/src/uscxml/Interpreter.cpp index 8a9ba63..90f00c4 100644 --- a/src/uscxml/Interpreter.cpp +++ b/src/uscxml/Interpreter.cpp @@ -29,7 +29,7 @@ #include "uscxml/plugins/invoker/http/HTTPServletInvoker.h" #include "uscxml/server/InterpreterServlet.h" -#include "uscxml/concurrency/eventqueue/DelayedEventQueue.h" +#include "uscxml/concurrency/DelayedEventQueue.h" #include <DOM/Simple/DOMImplementation.hpp> #include <SAX/helpers/InputSourceResolver.hpp> diff --git a/src/uscxml/concurrency/eventqueue/DelayedEventQueue.cpp b/src/uscxml/concurrency/DelayedEventQueue.cpp index 642c4a0..642c4a0 100644 --- a/src/uscxml/concurrency/eventqueue/DelayedEventQueue.cpp +++ b/src/uscxml/concurrency/DelayedEventQueue.cpp diff --git a/src/uscxml/concurrency/eventqueue/DelayedEventQueue.h b/src/uscxml/concurrency/DelayedEventQueue.h index 2248c47..2248c47 100644 --- a/src/uscxml/concurrency/eventqueue/DelayedEventQueue.h +++ b/src/uscxml/concurrency/DelayedEventQueue.h diff --git a/src/uscxml/concurrency/EventBase.cpp b/src/uscxml/concurrency/EventBase.cpp new file mode 100644 index 0000000..b5c1308 --- /dev/null +++ b/src/uscxml/concurrency/EventBase.cpp @@ -0,0 +1,74 @@ +/** + * @file + * @author 2012-2014 Stefan Radomski (stefan.radomski@cs.tu-darmstadt.de) + * @copyright Simplified BSD + * + * @cond + * This program is free software: you can redistribute it and/or modify + * it under the terms of the FreeBSD license as published by the FreeBSD + * project. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the FreeBSD license along with this + * program. If not, see <http://www.opensource.org/licenses/bsd-license>. + * @endcond + */ + +#include "EventBase.h" + +namespace uscxml { + +std::map<std::string, boost::weak_ptr<EventBase> > EventBase::_eventBases; +tthread::recursive_mutex EventBase::_instanceMutex; + +boost::shared_ptr<EventBase> EventBase::get(const std::string& name) { + tthread::lock_guard<tthread::recursive_mutex> lock(_instanceMutex); + + std::map<std::string, boost::weak_ptr<EventBase> >::const_iterator instIter = _eventBases.begin(); + while(instIter != _eventBases.end()) { + if (!instIter->second.lock()) { + _eventBases.erase(instIter++); + } else { + instIter++; + } + } + + instIter = _eventBases.find(name); + boost::shared_ptr<EventBase> instance = instIter->second.lock(); + if (instance) + return instance; + + instance = boost::shared_ptr<EventBase>(new EventBase()); + _eventBases.insert(std::make_pair(name, instance)); + + return instance; +} + +EventBase::EventBase() { + base = event_base_new(); + _isStarted = true; + _thread = new tthread::thread(EventBase::run, this); +} + +void EventBase::run(void* arg) { + EventBase* INSTANCE = (EventBase*)arg; + int result; + + while(INSTANCE->_isStarted) { + result = event_base_loop(INSTANCE->base, EVLOOP_NO_EXIT_ON_EMPTY); + (void)result; + } +} + +EventBase::~EventBase() { + _isStarted = false; + event_base_loopbreak(base); + _thread->join(); + event_base_free(base); + delete _thread; +} + +}
\ No newline at end of file diff --git a/src/uscxml/concurrency/EventBase.h b/src/uscxml/concurrency/EventBase.h new file mode 100644 index 0000000..22bd664 --- /dev/null +++ b/src/uscxml/concurrency/EventBase.h @@ -0,0 +1,61 @@ +/** + * @file + * @author 2012-2014 Stefan Radomski (stefan.radomski@cs.tu-darmstadt.de) + * @copyright Simplified BSD + * + * @cond + * This program is free software: you can redistribute it and/or modify + * it under the terms of the FreeBSD license as published by the FreeBSD + * project. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the FreeBSD license along with this + * program. If not, see <http://www.opensource.org/licenses/bsd-license>. + * @endcond + */ + +#ifndef EVENTBASE_H_C479DA74 +#define EVENTBASE_H_C479DA74 + +#include "uscxml/Common.h" +#include "uscxml/concurrency/tinythread.h" + +extern "C" { +#include <event2/event.h> +#include <event2/buffer.h> +#include <event2/bufferevent.h> +} + +#include <boost/shared_ptr.hpp> +#include <boost/weak_ptr.hpp> +#include <map> +#include <string> + +namespace uscxml { + +class USCXML_API EventBase { +public: + EventBase(); + virtual ~EventBase(); + + static boost::shared_ptr<EventBase> get(const std::string& name); + struct event_base* base; + +protected: + + static void run(void*); + + tthread::thread* _thread; + bool _isStarted; + + static std::map<std::string, boost::weak_ptr<EventBase> > _eventBases; + static tthread::recursive_mutex _instanceMutex; + +}; + +} + +#endif /* end of include guard: EVENTBASE_H_C479DA74 */ diff --git a/src/uscxml/interpreter/InterpreterDraft6.cpp b/src/uscxml/interpreter/InterpreterDraft6.cpp index e2ba95e..7569fdb 100644 --- a/src/uscxml/interpreter/InterpreterDraft6.cpp +++ b/src/uscxml/interpreter/InterpreterDraft6.cpp @@ -18,7 +18,7 @@ */ #include "InterpreterDraft6.h" -#include "uscxml/concurrency/eventqueue/DelayedEventQueue.h" +#include "uscxml/concurrency/DelayedEventQueue.h" #include <glog/logging.h> #include "uscxml/UUID.h" diff --git a/src/uscxml/interpreter/InterpreterRC.cpp b/src/uscxml/interpreter/InterpreterRC.cpp index 3d17c87..24b9003 100644 --- a/src/uscxml/interpreter/InterpreterRC.cpp +++ b/src/uscxml/interpreter/InterpreterRC.cpp @@ -20,7 +20,7 @@ #include "InterpreterRC.h" #include "uscxml/Factory.h" -#include "uscxml/concurrency/eventqueue/DelayedEventQueue.h" +#include "uscxml/concurrency/DelayedEventQueue.h" #include <glog/logging.h> #include "uscxml/UUID.h" diff --git a/src/uscxml/plugins/invoker/calendar/CalendarInvoker.cpp b/src/uscxml/plugins/invoker/calendar/CalendarInvoker.cpp index 849845e..8a4ea3d 100644 --- a/src/uscxml/plugins/invoker/calendar/CalendarInvoker.cpp +++ b/src/uscxml/plugins/invoker/calendar/CalendarInvoker.cpp @@ -20,7 +20,7 @@ #include <boost/algorithm/string.hpp> #include "CalendarInvoker.h" -#include "uscxml/concurrency/eventqueue/DelayedEventQueue.h" +#include "uscxml/concurrency/DelayedEventQueue.h" #include <glog/logging.h> diff --git a/src/uscxml/plugins/invoker/expect/ExpectInvoker.cpp b/src/uscxml/plugins/invoker/expect/ExpectInvoker.cpp index 22c7942..69ad686 100644 --- a/src/uscxml/plugins/invoker/expect/ExpectInvoker.cpp +++ b/src/uscxml/plugins/invoker/expect/ExpectInvoker.cpp @@ -20,7 +20,7 @@ #include "ExpectInvoker.h" #include <glog/logging.h> -#include "uscxml/concurrency/eventqueue/DelayedEventQueue.h" +#include "uscxml/concurrency/DelayedEventQueue.h" #ifdef BUILD_AS_PLUGINS #include <Pluma/Connector.hpp> diff --git a/src/uscxml/plugins/invoker/heartbeat/HeartbeatInvoker.h b/src/uscxml/plugins/invoker/heartbeat/HeartbeatInvoker.h index ba4e9ef..f0bc32c 100644 --- a/src/uscxml/plugins/invoker/heartbeat/HeartbeatInvoker.h +++ b/src/uscxml/plugins/invoker/heartbeat/HeartbeatInvoker.h @@ -21,7 +21,7 @@ #define HEARTBEATINVOKER_H_W09J90F0 #include <uscxml/Interpreter.h> -#include "uscxml/concurrency/eventqueue/DelayedEventQueue.h" +#include "uscxml/concurrency/DelayedEventQueue.h" #ifdef BUILD_AS_PLUGINS #include "uscxml/plugins/Plugins.h" diff --git a/src/uscxml/plugins/invoker/im/IMInvoker.cpp b/src/uscxml/plugins/invoker/im/IMInvoker.cpp index 08d6a03..2bbb855 100644 --- a/src/uscxml/plugins/invoker/im/IMInvoker.cpp +++ b/src/uscxml/plugins/invoker/im/IMInvoker.cpp @@ -23,7 +23,7 @@ #include "uscxml/DOMUtils.h" #include <boost/algorithm/string.hpp> -#include "uscxml/concurrency/eventqueue/DelayedEventQueue.h" +#include "uscxml/concurrency/DelayedEventQueue.h" #ifdef BUILD_AS_PLUGINS #include <Pluma/Connector.hpp> diff --git a/src/uscxml/plugins/ioprocessor/basichttp/BasicHTTPIOProcessor.h b/src/uscxml/plugins/ioprocessor/basichttp/BasicHTTPIOProcessor.h index 29bc208..3a43e49 100644 --- a/src/uscxml/plugins/ioprocessor/basichttp/BasicHTTPIOProcessor.h +++ b/src/uscxml/plugins/ioprocessor/basichttp/BasicHTTPIOProcessor.h @@ -35,7 +35,7 @@ extern "C" { # define USCXML_PLUGIN_API #endif -#include "uscxml/concurrency/eventqueue/DelayedEventQueue.h" +#include "uscxml/concurrency/DelayedEventQueue.h" #include "uscxml/server/HTTPServer.h" #include "uscxml/Interpreter.h" #include "uscxml/Factory.h" diff --git a/src/uscxml/plugins/ioprocessor/comet/CometIOProcessor.h b/src/uscxml/plugins/ioprocessor/comet/CometIOProcessor.h index e30bdb2..0a57d70 100644 --- a/src/uscxml/plugins/ioprocessor/comet/CometIOProcessor.h +++ b/src/uscxml/plugins/ioprocessor/comet/CometIOProcessor.h @@ -20,7 +20,7 @@ #ifndef COMETIOPROCESSOR_H_2CUY93KU #define COMETIOPROCESSOR_H_2CUY93KU -#include "uscxml/concurrency/eventqueue/DelayedEventQueue.h" +#include "uscxml/concurrency/DelayedEventQueue.h" #include "uscxml/server/HTTPServer.h" #include "uscxml/Interpreter.h" #include "uscxml/Factory.h" diff --git a/src/uscxml/server/Socket.cpp b/src/uscxml/server/Socket.cpp new file mode 100644 index 0000000..9c844e5 --- /dev/null +++ b/src/uscxml/server/Socket.cpp @@ -0,0 +1,291 @@ +/** + * @file + * @author 2012-2014 Stefan Radomski (stefan.radomski@cs.tu-darmstadt.de) + * @copyright Simplified BSD + * + * @cond + * This program is free software: you can redistribute it and/or modify + * it under the terms of the FreeBSD license as published by the FreeBSD + * project. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the FreeBSD license along with this + * program. If not, see <http://www.opensource.org/licenses/bsd-license>. + * @endcond + */ + +#include "Socket.h" + +#include "uscxml/Message.h" // for Data, Event +#include "uscxml/config.h" // for OPENSSL_FOUND + +#ifndef _WIN32 +#include <sys/socket.h> /* For socket functions */ +#include <arpa/inet.h> // inet_addr +#endif + +#include <fcntl.h> /* For fcntl */ +#include <iostream> + +namespace uscxml { + +// see: http://codepad.org/XRJAVg5m +Socket::Socket(int domain, int type, int protocol) { + + _base = EventBase::get("sockets"); + _blockSizeRead = 1024; + + if (!_base) + throw std::runtime_error("Cannot get eventbase"); + + _sin.sin_family = domain; + _socketFD = socket(domain, type, protocol); + + if (_socketFD == -1) + throw std::runtime_error(std::string("socket: ") + strerror(errno)); +#ifndef WIN32 + { + int one = 1; + if (setsockopt(_socketFD, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) != 0) { + throw std::runtime_error(std::string("setsockopt: ") + strerror(errno)); + } + } +#endif + +} + +Socket::~Socket() { + if (_socketFD > 0) +#ifdef WIN32 + closesocket(_socketFD); +#else + close(_socketFD); +#endif +} + +void Socket::setupSockAddr(const std::string& address, int port) { + if (address == "*") { + _sin.sin_addr.s_addr = 0; + } else { + _sin.sin_addr.s_addr = inet_addr(address.c_str()); + if (_sin.sin_addr.s_addr == INADDR_NONE) + throw std::runtime_error(std::string("inet_addr: ") + strerror(errno)); + } + + _sin.sin_port = htons(port); +} + +void Socket::setBlockSizeRead(size_t size) { +// tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); + _blockSizeRead = size; +} + +ClientSocket::ClientSocket(int domain, int type, int protocol) : Socket(domain, type, protocol), _clientEvent(NULL) { +} + + +ClientSocket::~ClientSocket() { + if (_clientEvent) { + bufferevent_enable(_clientEvent, 0); + bufferevent_free(_clientEvent); + } +} + +void ClientSocket::errorCallback(struct bufferevent *bev, short error, void *ctx) { + ClientSocket* instance = (ClientSocket*)ctx; + // tthread::lock_guard<tthread::recursive_mutex> lock(instance->_mutex); + + if (error & BEV_EVENT_READING) { + std::cout << "ClientSocket: error encountered while reading" << std::endl; + } else if (error & BEV_EVENT_WRITING) { + std::cout << "ClientSocket: error encountered while writing" << std::endl; + } else if (error & BEV_EVENT_EOF) { + std::cout << "ClientSocket: eof file reached" << std::endl; + } else if (error & BEV_EVENT_ERROR) { + std::cout << "ClientSocket: unrecoverable error encountered" << std::endl; + } else if (error & BEV_EVENT_TIMEOUT) { + std::cout << "ClientSocket: user-specified timeout reached" << std::endl; + } else if (error & BEV_EVENT_CONNECTED) { + std::cout << "ClientSocket: connect operation finished" << std::endl; + } + + // bufferevent_free(bev); +} + +void ClientSocket::connect(const std::string& address, int port) { +// tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); + + setupSockAddr(address, port); + if(::connect(_socketFD, (struct sockaddr *)&_sin, sizeof _sin) != 0) { + throw std::runtime_error(std::string("connect: ") + strerror(errno)); + } + + _clientEvent = bufferevent_socket_new(_base->base, _socketFD, 0); //BEV_OPT_THREADSAFE); + bufferevent_setcb(_clientEvent, ClientSocket::readCallback, NULL, ClientSocket::errorCallback, this); + bufferevent_enable(_clientEvent, EV_READ|EV_WRITE); +} + +int ClientSocket::write(const char* data, size_t size) { +// tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); + bufferevent_write(_clientEvent, data, size); + return size; +} + +void ClientSocket::readCallback(struct bufferevent *bev, void *ctx) { + ClientSocket* instance = (ClientSocket*)ctx; +// tthread::lock_guard<tthread::recursive_mutex> lock(instance->_mutex); + + size_t n; + struct evbuffer* input; + char* data = (char*)malloc(instance->_blockSizeRead); + + input = bufferevent_get_input(bev); + n = evbuffer_remove(input, data, instance->_blockSizeRead); + + instance->readCallback(data, n); + free(data); +} + +std::set<ServerSocket*> ServerSocket::_instances; + +ServerSocket::ServerSocket(int domain, int type, int protocol) : Socket(domain, type, protocol), _listenerEvent(NULL) { + _instances.insert(this); +} + +ServerSocket::~ServerSocket() { + tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); + + std::map<struct bufferevent*, Connection>::iterator connIter = _connections.begin(); + while(connIter != _connections.end()) { + bufferevent_enable(connIter->second.bufferEvent, 0); + bufferevent_setcb(connIter->second.bufferEvent, NULL, NULL, NULL, 0); + + bufferevent_free(connIter->second.bufferEvent); +#ifdef WIN32 + closesocket(connIter->second.fd); +#else + close(connIter->second.fd); +#endif + + connIter++; + } + + if (_listenerEvent) { + event_del(_listenerEvent); + event_free(_listenerEvent); + } + + _instances.erase(this); + +} + +void ServerSocket::errorCallback(struct bufferevent *bev, short error, void *ctx) { + ServerSocket* instance = (ServerSocket*)ctx; + tthread::lock_guard<tthread::recursive_mutex> lock(instance->_mutex); + + if (_instances.find(instance) == _instances.end()) + return; + + if (error & BEV_EVENT_READING || error & BEV_EVENT_WRITING) { + // remote end close the connection + tthread::lock_guard<tthread::recursive_mutex> lock(instance->_mutex); + std::map<struct bufferevent*, Connection>::iterator conn = instance->_connections.find(bev); + if (conn != instance->_connections.end()) { + bufferevent_enable(conn->second.bufferEvent, 0); + bufferevent_free(conn->second.bufferEvent); +#ifdef WIN32 + closesocket(conn->second.fd); +#else + close(conn->second.fd); +#endif + + instance->_connections.erase(conn); + } + } else if (error & BEV_EVENT_EOF) { + std::cout << "ServerSocket: eof file reached" << std::endl; + } else if (error & BEV_EVENT_ERROR) { + std::cout << "ServerSocket: unrecoverable error encountered" << std::endl; + } else if (error & BEV_EVENT_TIMEOUT) { + std::cout << "ServerSocket: user-specified timeout reached" << std::endl; + } else if (error & BEV_EVENT_CONNECTED) { + std::cout << "ServerSocket: connect operation finished" << std::endl; + } + // bufferevent_free(bev); +} + +void ServerSocket::readCallback(struct bufferevent *bev, void *ctx) { + ServerSocket* instance = (ServerSocket*)ctx; + tthread::lock_guard<tthread::recursive_mutex> lock(instance->_mutex); + + // instance is already gone + if (_instances.find(instance) == _instances.end()) + return; + + size_t n; + struct evbuffer* input; + char* data = (char*)malloc(instance->_blockSizeRead); + + input = bufferevent_get_input(bev); + n = evbuffer_remove(input, data, instance->_blockSizeRead); + + instance->readCallback(data, n, instance->_connections[bev]); + free(data); +} + +void ServerSocket::bind() { + if (::bind(_socketFD, (struct sockaddr*)&_sin, sizeof(_sin)) < 0) { + throw std::runtime_error(std::string("bind: ") + strerror(errno)); + } +} + +void ServerSocket::listen(const std::string& address, int port) { +// tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); + setupSockAddr(address, port); + bind(); + + _listenerEvent = event_new(_base->base, _socketFD, EV_READ|EV_PERSIST, acceptCallback, (void*)this); + /*XXX check it */ + event_add(_listenerEvent, NULL); + + if (::listen(_socketFD, 16)<0) { + throw std::runtime_error(std::string("listen: ") + strerror(errno)); + } +} + +void ServerSocket::acceptCallback(evutil_socket_t listener, short event, void *ctx) { + ServerSocket* instance = (ServerSocket*)ctx; +// tthread::lock_guard<tthread::recursive_mutex> lock(instance->_mutex); + + struct sockaddr_storage ss; + socklen_t slen = sizeof(ss); + int fd = accept(listener, (struct sockaddr*)&ss, &slen); + if (fd < 0) { + throw std::runtime_error(std::string("accept: ") + strerror(errno)); + } else if (fd > FD_SETSIZE) { +#ifdef WIN32 + closesocket(fd); +#else + close(fd); +#endif + + throw std::runtime_error(std::string("accept: ") + strerror(errno)); + } else { + struct bufferevent *bev; + evutil_make_socket_nonblocking(fd); + bev = bufferevent_socket_new(instance->_base->base, fd, BEV_OPT_THREADSAFE); + bufferevent_setcb(bev, ServerSocket::readCallback, NULL, ServerSocket::errorCallback, ctx); + bufferevent_enable(bev, EV_READ|EV_WRITE); + + instance->_connections[bev].bufferEvent = bev; + instance->_connections[bev].fd = fd; + } +} + +void ServerSocket::Connection::reply(const char* data, size_t size) { + bufferevent_write(bufferEvent, data, size); +} + +} diff --git a/src/uscxml/server/Socket.h b/src/uscxml/server/Socket.h new file mode 100644 index 0000000..fcaada4 --- /dev/null +++ b/src/uscxml/server/Socket.h @@ -0,0 +1,116 @@ +/** + * @file + * @author 2012-2014 Stefan Radomski (stefan.radomski@cs.tu-darmstadt.de) + * @copyright Simplified BSD + * + * @cond + * This program is free software: you can redistribute it and/or modify + * it under the terms of the FreeBSD license as published by the FreeBSD + * project. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the FreeBSD license along with this + * program. If not, see <http://www.opensource.org/licenses/bsd-license>. + * @endcond + */ + +#ifndef SOCKETCLIENT_H_9A0B2A88 +#define SOCKETCLIENT_H_9A0B2A88 + +#include "uscxml/Common.h" // for USCXML_API +#include "uscxml/concurrency/EventBase.h" +#include <string> +#include <map> +#include <set> + +#ifdef _WIN32 +# include <winsock2.h> +#else +# include <netinet/in.h> /* For sockaddr_in */ +#endif + +#include "uscxml/concurrency/tinythread.h" // for recursive_mutex, etc + +extern "C" { +#include <event2/event.h> +#include <event2/buffer.h> +#include <event2/bufferevent.h> +} + +namespace uscxml { + +class USCXML_API Socket { +public: + Socket(int domain, int type, int protocol); + virtual ~Socket(); + + void setBlockSizeRead(size_t size); + +protected: + + void setupSockAddr(const std::string& address, int port); + + evutil_socket_t _socketFD; + + tthread::recursive_mutex _mutex; + size_t _blockSizeRead; + struct sockaddr_in _sin; + + boost::shared_ptr<EventBase> _base; +}; + +class USCXML_API ServerSocket : public Socket { +public: + class Connection { + public: + struct bufferevent* bufferEvent; + int fd; + + void reply(const char* data, size_t size); + }; + + ServerSocket(int domain, int type, int protocol); + virtual ~ServerSocket(); + + void listen(const std::string& address, int port); + virtual void readCallback(const char* data, size_t size, Connection& conn) {}; + + +protected: + void bind(); + static void acceptCallback(evutil_socket_t listener, short event, void *ctx); + static void errorCallback(struct bufferevent *bev, short error, void *ctx); + static void readCallback(struct bufferevent *bev, void *ctx); + + std::map<struct bufferevent*, Connection> _connections; + struct event* _listenerEvent; + + static std::set<ServerSocket*> _instances; + +}; + +class USCXML_API ClientSocket : public Socket { +public: + ClientSocket(int domain, int type, int protocol); + virtual ~ClientSocket(); + + virtual void readCallback(const char* data, size_t size) {}; + void connect(const std::string& address, int port); + int write(const char* data, size_t size); + + +protected: + static void readCallback(struct bufferevent *bev, void *ctx); + static void errorCallback(struct bufferevent *bev, short error, void *ctx); + + struct bufferevent* _clientEvent; + +}; + + +} + +#endif /* end of include guard: SOCKETCLIENT_H_9A0B2A88 */ diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 10c1213..62b8749 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -117,7 +117,7 @@ set_target_properties(test-url PROPERTIES FOLDER "Tests") add_executable(test-cmdline-parsing src/test-cmdline-parsing.cpp) target_link_libraries(test-cmdline-parsing uscxml) # add_test(test-cmdline-parsing ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/test-cmdline-parsing) -# set_target_properties(test-cmdline-parsing PROPERTIES FOLDER "Tests") +set_target_properties(test-cmdline-parsing PROPERTIES FOLDER "Tests") # add_executable(test-initial-config src/test-initial-config.cpp) # target_link_libraries(test-initial-config uscxml) @@ -129,6 +129,11 @@ target_link_libraries(test-datamodel uscxml) add_test(test-datamodel ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/test-datamodel) set_target_properties(test-datamodel PROPERTIES FOLDER "Tests") +add_executable(test-sockets src/test-sockets.cpp) +target_link_libraries(test-sockets uscxml) +# add_test(test-datamodel ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/test-sockets) +set_target_properties(test-sockets PROPERTIES FOLDER "Tests") + # if (NOT WIN32) # add_executable(test-mmi src/test-mmi.cpp) # target_link_libraries(test-mmi uscxml) diff --git a/test/src/test-eventdelay.cpp b/test/src/test-eventdelay.cpp index 12cc751..ce6c923 100644 --- a/test/src/test-eventdelay.cpp +++ b/test/src/test-eventdelay.cpp @@ -1,4 +1,4 @@ -#include "uscxml/concurrency/eventqueue/DelayedEventQueue.h" +#include "uscxml/concurrency/DelayedEventQueue.h" #include <iostream> int eventCalled = 0; diff --git a/test/src/test-sockets.cpp b/test/src/test-sockets.cpp new file mode 100644 index 0000000..a712da0 --- /dev/null +++ b/test/src/test-sockets.cpp @@ -0,0 +1,98 @@ +#include "uscxml/config.h" +#include "uscxml/server/Socket.h" +#include <iostream> + +#include <event2/event.h> +#include "event2/thread.h" + +#ifdef HAS_SIGNAL_H +#include <signal.h> +#endif + +#include "uscxml/concurrency/tinythread.h" + +using namespace uscxml; + +class TestServer : public ServerSocket { +public: + TestServer(int domain, int type, int protocol) : ServerSocket(domain, type, protocol) {} + virtual void readCallback(const char* data, size_t size, Connection& conn) { + std::string content(data, size); +// std::cout << "Server got: " << content << std::endl; + std::string urghs("hi!"); + conn.reply(urghs.data(), urghs.size()); + }; +}; + +class TestClient : public ClientSocket { +public: + TestClient(int domain, int type, int protocol) : ClientSocket(domain, type, protocol) {} + virtual void readCallback(const char* data, size_t size) { + std::string content(data, size); + }; +}; + +int main(int argc, char** argv) { + +#if defined(HAS_SIGNAL_H) && !defined(WIN32) + signal(SIGPIPE, SIG_IGN); +#endif + +#ifndef _WIN32 + evthread_use_pthreads(); +#else + evthread_use_windows_threads(); +#endif + + if (0) { + // start server socket and connect + int iterations = 100; + + TestServer server(PF_INET, SOCK_STREAM, 0); + try { + server.listen("*", 1234); + + while(iterations--) { + std::cout << iterations << std::endl; + TestClient client(PF_INET, SOCK_STREAM, 0); + client.connect("127.0.0.1", 1234); + + std::string hello("hello"); + client.write(hello.data(), hello.size()); + + tthread::this_thread::sleep_for(tthread::chrono::milliseconds(20)); + } + + } catch (std::runtime_error e) { + std::cout << e.what() << std::endl; + } + } + + { + // connect client to server and kill server + int iterations = 100; + + try { + + while(iterations--) { + std::cout << iterations << std::endl; + TestServer* server = new TestServer(PF_INET, SOCK_STREAM, 0); + server->listen("*", 1236 + iterations); + + TestClient client(PF_INET, SOCK_STREAM, 0); + client.connect("127.0.0.1", 1236 + iterations); + + std::string hello("hello"); + client.write(hello.data(), hello.size()); + + delete server; + + tthread::this_thread::sleep_for(tthread::chrono::milliseconds(20)); + } + + } catch (std::runtime_error e) { + std::cout << e.what() << std::endl; + } + + } +}
\ No newline at end of file |