summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorStefan Radomski <radomski@tk.informatik.tu-darmstadt.de>2014-07-16 21:54:17 (GMT)
committerStefan Radomski <radomski@tk.informatik.tu-darmstadt.de>2014-07-16 21:54:17 (GMT)
commitd8dc46cc000d81d031c5f08806dc26a3b433e1bf (patch)
treeb4bc051b80a47ac849631a5da6da5fdc3bbd8d4d
parentb9fcc8d669491725d8dc37583958eadd31a46456 (diff)
downloaduscxml-d8dc46cc000d81d031c5f08806dc26a3b433e1bf.zip
uscxml-d8dc46cc000d81d031c5f08806dc26a3b433e1bf.tar.gz
uscxml-d8dc46cc000d81d031c5f08806dc26a3b433e1bf.tar.bz2
Generic socket support
-rw-r--r--src/bindings/swig/java/CMakeLists.txt1
-rw-r--r--src/uscxml/Interpreter.cpp2
-rw-r--r--src/uscxml/concurrency/DelayedEventQueue.cpp (renamed from src/uscxml/concurrency/eventqueue/DelayedEventQueue.cpp)0
-rw-r--r--src/uscxml/concurrency/DelayedEventQueue.h (renamed from src/uscxml/concurrency/eventqueue/DelayedEventQueue.h)0
-rw-r--r--src/uscxml/concurrency/EventBase.cpp74
-rw-r--r--src/uscxml/concurrency/EventBase.h61
-rw-r--r--src/uscxml/interpreter/InterpreterDraft6.cpp2
-rw-r--r--src/uscxml/interpreter/InterpreterRC.cpp2
-rw-r--r--src/uscxml/plugins/invoker/calendar/CalendarInvoker.cpp2
-rw-r--r--src/uscxml/plugins/invoker/expect/ExpectInvoker.cpp2
-rw-r--r--src/uscxml/plugins/invoker/heartbeat/HeartbeatInvoker.h2
-rw-r--r--src/uscxml/plugins/invoker/im/IMInvoker.cpp2
-rw-r--r--src/uscxml/plugins/ioprocessor/basichttp/BasicHTTPIOProcessor.h2
-rw-r--r--src/uscxml/plugins/ioprocessor/comet/CometIOProcessor.h2
-rw-r--r--src/uscxml/server/Socket.cpp291
-rw-r--r--src/uscxml/server/Socket.h116
-rw-r--r--test/CMakeLists.txt7
-rw-r--r--test/src/test-eventdelay.cpp2
-rw-r--r--test/src/test-sockets.cpp98
19 files changed, 657 insertions, 11 deletions
diff --git a/src/bindings/swig/java/CMakeLists.txt b/src/bindings/swig/java/CMakeLists.txt
index 86f2fd6..eb51f83 100644
--- a/src/bindings/swig/java/CMakeLists.txt
+++ b/src/bindings/swig/java/CMakeLists.txt
@@ -31,6 +31,7 @@ set_target_properties(uscxmlNativeJava PROPERTIES COMPILE_FLAGS "-DSWIG")
swig_link_libraries(uscxmlNativeJava uscxml)
+FIND_PROGRAM(ANT_EXECUTABLE ant PATHS $ENV{ANT_HOME}/bin ENV PATH )
if (ANT_EXECUTABLE)
set(USCXML_LANGUAGE_BINDINGS "java ${USCXML_LANGUAGE_BINDINGS}")
diff --git a/src/uscxml/Interpreter.cpp b/src/uscxml/Interpreter.cpp
index 8a9ba63..90f00c4 100644
--- a/src/uscxml/Interpreter.cpp
+++ b/src/uscxml/Interpreter.cpp
@@ -29,7 +29,7 @@
#include "uscxml/plugins/invoker/http/HTTPServletInvoker.h"
#include "uscxml/server/InterpreterServlet.h"
-#include "uscxml/concurrency/eventqueue/DelayedEventQueue.h"
+#include "uscxml/concurrency/DelayedEventQueue.h"
#include <DOM/Simple/DOMImplementation.hpp>
#include <SAX/helpers/InputSourceResolver.hpp>
diff --git a/src/uscxml/concurrency/eventqueue/DelayedEventQueue.cpp b/src/uscxml/concurrency/DelayedEventQueue.cpp
index 642c4a0..642c4a0 100644
--- a/src/uscxml/concurrency/eventqueue/DelayedEventQueue.cpp
+++ b/src/uscxml/concurrency/DelayedEventQueue.cpp
diff --git a/src/uscxml/concurrency/eventqueue/DelayedEventQueue.h b/src/uscxml/concurrency/DelayedEventQueue.h
index 2248c47..2248c47 100644
--- a/src/uscxml/concurrency/eventqueue/DelayedEventQueue.h
+++ b/src/uscxml/concurrency/DelayedEventQueue.h
diff --git a/src/uscxml/concurrency/EventBase.cpp b/src/uscxml/concurrency/EventBase.cpp
new file mode 100644
index 0000000..b5c1308
--- /dev/null
+++ b/src/uscxml/concurrency/EventBase.cpp
@@ -0,0 +1,74 @@
+/**
+ * @file
+ * @author 2012-2014 Stefan Radomski (stefan.radomski@cs.tu-darmstadt.de)
+ * @copyright Simplified BSD
+ *
+ * @cond
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the FreeBSD license as published by the FreeBSD
+ * project.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the FreeBSD license along with this
+ * program. If not, see <http://www.opensource.org/licenses/bsd-license>.
+ * @endcond
+ */
+
+#include "EventBase.h"
+
+namespace uscxml {
+
+std::map<std::string, boost::weak_ptr<EventBase> > EventBase::_eventBases;
+tthread::recursive_mutex EventBase::_instanceMutex;
+
+boost::shared_ptr<EventBase> EventBase::get(const std::string& name) {
+ tthread::lock_guard<tthread::recursive_mutex> lock(_instanceMutex);
+
+ std::map<std::string, boost::weak_ptr<EventBase> >::const_iterator instIter = _eventBases.begin();
+ while(instIter != _eventBases.end()) {
+ if (!instIter->second.lock()) {
+ _eventBases.erase(instIter++);
+ } else {
+ instIter++;
+ }
+ }
+
+ instIter = _eventBases.find(name);
+ boost::shared_ptr<EventBase> instance = instIter->second.lock();
+ if (instance)
+ return instance;
+
+ instance = boost::shared_ptr<EventBase>(new EventBase());
+ _eventBases.insert(std::make_pair(name, instance));
+
+ return instance;
+}
+
+EventBase::EventBase() {
+ base = event_base_new();
+ _isStarted = true;
+ _thread = new tthread::thread(EventBase::run, this);
+}
+
+void EventBase::run(void* arg) {
+ EventBase* INSTANCE = (EventBase*)arg;
+ int result;
+
+ while(INSTANCE->_isStarted) {
+ result = event_base_loop(INSTANCE->base, EVLOOP_NO_EXIT_ON_EMPTY);
+ (void)result;
+ }
+}
+
+EventBase::~EventBase() {
+ _isStarted = false;
+ event_base_loopbreak(base);
+ _thread->join();
+ event_base_free(base);
+ delete _thread;
+}
+
+} \ No newline at end of file
diff --git a/src/uscxml/concurrency/EventBase.h b/src/uscxml/concurrency/EventBase.h
new file mode 100644
index 0000000..22bd664
--- /dev/null
+++ b/src/uscxml/concurrency/EventBase.h
@@ -0,0 +1,61 @@
+/**
+ * @file
+ * @author 2012-2014 Stefan Radomski (stefan.radomski@cs.tu-darmstadt.de)
+ * @copyright Simplified BSD
+ *
+ * @cond
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the FreeBSD license as published by the FreeBSD
+ * project.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the FreeBSD license along with this
+ * program. If not, see <http://www.opensource.org/licenses/bsd-license>.
+ * @endcond
+ */
+
+#ifndef EVENTBASE_H_C479DA74
+#define EVENTBASE_H_C479DA74
+
+#include "uscxml/Common.h"
+#include "uscxml/concurrency/tinythread.h"
+
+extern "C" {
+#include <event2/event.h>
+#include <event2/buffer.h>
+#include <event2/bufferevent.h>
+}
+
+#include <boost/shared_ptr.hpp>
+#include <boost/weak_ptr.hpp>
+#include <map>
+#include <string>
+
+namespace uscxml {
+
+class USCXML_API EventBase {
+public:
+ EventBase();
+ virtual ~EventBase();
+
+ static boost::shared_ptr<EventBase> get(const std::string& name);
+ struct event_base* base;
+
+protected:
+
+ static void run(void*);
+
+ tthread::thread* _thread;
+ bool _isStarted;
+
+ static std::map<std::string, boost::weak_ptr<EventBase> > _eventBases;
+ static tthread::recursive_mutex _instanceMutex;
+
+};
+
+}
+
+#endif /* end of include guard: EVENTBASE_H_C479DA74 */
diff --git a/src/uscxml/interpreter/InterpreterDraft6.cpp b/src/uscxml/interpreter/InterpreterDraft6.cpp
index e2ba95e..7569fdb 100644
--- a/src/uscxml/interpreter/InterpreterDraft6.cpp
+++ b/src/uscxml/interpreter/InterpreterDraft6.cpp
@@ -18,7 +18,7 @@
*/
#include "InterpreterDraft6.h"
-#include "uscxml/concurrency/eventqueue/DelayedEventQueue.h"
+#include "uscxml/concurrency/DelayedEventQueue.h"
#include <glog/logging.h>
#include "uscxml/UUID.h"
diff --git a/src/uscxml/interpreter/InterpreterRC.cpp b/src/uscxml/interpreter/InterpreterRC.cpp
index 3d17c87..24b9003 100644
--- a/src/uscxml/interpreter/InterpreterRC.cpp
+++ b/src/uscxml/interpreter/InterpreterRC.cpp
@@ -20,7 +20,7 @@
#include "InterpreterRC.h"
#include "uscxml/Factory.h"
-#include "uscxml/concurrency/eventqueue/DelayedEventQueue.h"
+#include "uscxml/concurrency/DelayedEventQueue.h"
#include <glog/logging.h>
#include "uscxml/UUID.h"
diff --git a/src/uscxml/plugins/invoker/calendar/CalendarInvoker.cpp b/src/uscxml/plugins/invoker/calendar/CalendarInvoker.cpp
index 849845e..8a4ea3d 100644
--- a/src/uscxml/plugins/invoker/calendar/CalendarInvoker.cpp
+++ b/src/uscxml/plugins/invoker/calendar/CalendarInvoker.cpp
@@ -20,7 +20,7 @@
#include <boost/algorithm/string.hpp>
#include "CalendarInvoker.h"
-#include "uscxml/concurrency/eventqueue/DelayedEventQueue.h"
+#include "uscxml/concurrency/DelayedEventQueue.h"
#include <glog/logging.h>
diff --git a/src/uscxml/plugins/invoker/expect/ExpectInvoker.cpp b/src/uscxml/plugins/invoker/expect/ExpectInvoker.cpp
index 22c7942..69ad686 100644
--- a/src/uscxml/plugins/invoker/expect/ExpectInvoker.cpp
+++ b/src/uscxml/plugins/invoker/expect/ExpectInvoker.cpp
@@ -20,7 +20,7 @@
#include "ExpectInvoker.h"
#include <glog/logging.h>
-#include "uscxml/concurrency/eventqueue/DelayedEventQueue.h"
+#include "uscxml/concurrency/DelayedEventQueue.h"
#ifdef BUILD_AS_PLUGINS
#include <Pluma/Connector.hpp>
diff --git a/src/uscxml/plugins/invoker/heartbeat/HeartbeatInvoker.h b/src/uscxml/plugins/invoker/heartbeat/HeartbeatInvoker.h
index ba4e9ef..f0bc32c 100644
--- a/src/uscxml/plugins/invoker/heartbeat/HeartbeatInvoker.h
+++ b/src/uscxml/plugins/invoker/heartbeat/HeartbeatInvoker.h
@@ -21,7 +21,7 @@
#define HEARTBEATINVOKER_H_W09J90F0
#include <uscxml/Interpreter.h>
-#include "uscxml/concurrency/eventqueue/DelayedEventQueue.h"
+#include "uscxml/concurrency/DelayedEventQueue.h"
#ifdef BUILD_AS_PLUGINS
#include "uscxml/plugins/Plugins.h"
diff --git a/src/uscxml/plugins/invoker/im/IMInvoker.cpp b/src/uscxml/plugins/invoker/im/IMInvoker.cpp
index 08d6a03..2bbb855 100644
--- a/src/uscxml/plugins/invoker/im/IMInvoker.cpp
+++ b/src/uscxml/plugins/invoker/im/IMInvoker.cpp
@@ -23,7 +23,7 @@
#include "uscxml/DOMUtils.h"
#include <boost/algorithm/string.hpp>
-#include "uscxml/concurrency/eventqueue/DelayedEventQueue.h"
+#include "uscxml/concurrency/DelayedEventQueue.h"
#ifdef BUILD_AS_PLUGINS
#include <Pluma/Connector.hpp>
diff --git a/src/uscxml/plugins/ioprocessor/basichttp/BasicHTTPIOProcessor.h b/src/uscxml/plugins/ioprocessor/basichttp/BasicHTTPIOProcessor.h
index 29bc208..3a43e49 100644
--- a/src/uscxml/plugins/ioprocessor/basichttp/BasicHTTPIOProcessor.h
+++ b/src/uscxml/plugins/ioprocessor/basichttp/BasicHTTPIOProcessor.h
@@ -35,7 +35,7 @@ extern "C" {
# define USCXML_PLUGIN_API
#endif
-#include "uscxml/concurrency/eventqueue/DelayedEventQueue.h"
+#include "uscxml/concurrency/DelayedEventQueue.h"
#include "uscxml/server/HTTPServer.h"
#include "uscxml/Interpreter.h"
#include "uscxml/Factory.h"
diff --git a/src/uscxml/plugins/ioprocessor/comet/CometIOProcessor.h b/src/uscxml/plugins/ioprocessor/comet/CometIOProcessor.h
index e30bdb2..0a57d70 100644
--- a/src/uscxml/plugins/ioprocessor/comet/CometIOProcessor.h
+++ b/src/uscxml/plugins/ioprocessor/comet/CometIOProcessor.h
@@ -20,7 +20,7 @@
#ifndef COMETIOPROCESSOR_H_2CUY93KU
#define COMETIOPROCESSOR_H_2CUY93KU
-#include "uscxml/concurrency/eventqueue/DelayedEventQueue.h"
+#include "uscxml/concurrency/DelayedEventQueue.h"
#include "uscxml/server/HTTPServer.h"
#include "uscxml/Interpreter.h"
#include "uscxml/Factory.h"
diff --git a/src/uscxml/server/Socket.cpp b/src/uscxml/server/Socket.cpp
new file mode 100644
index 0000000..9c844e5
--- /dev/null
+++ b/src/uscxml/server/Socket.cpp
@@ -0,0 +1,291 @@
+/**
+ * @file
+ * @author 2012-2014 Stefan Radomski (stefan.radomski@cs.tu-darmstadt.de)
+ * @copyright Simplified BSD
+ *
+ * @cond
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the FreeBSD license as published by the FreeBSD
+ * project.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the FreeBSD license along with this
+ * program. If not, see <http://www.opensource.org/licenses/bsd-license>.
+ * @endcond
+ */
+
+#include "Socket.h"
+
+#include "uscxml/Message.h" // for Data, Event
+#include "uscxml/config.h" // for OPENSSL_FOUND
+
+#ifndef _WIN32
+#include <sys/socket.h> /* For socket functions */
+#include <arpa/inet.h> // inet_addr
+#endif
+
+#include <fcntl.h> /* For fcntl */
+#include <iostream>
+
+namespace uscxml {
+
+// see: http://codepad.org/XRJAVg5m
+Socket::Socket(int domain, int type, int protocol) {
+
+ _base = EventBase::get("sockets");
+ _blockSizeRead = 1024;
+
+ if (!_base)
+ throw std::runtime_error("Cannot get eventbase");
+
+ _sin.sin_family = domain;
+ _socketFD = socket(domain, type, protocol);
+
+ if (_socketFD == -1)
+ throw std::runtime_error(std::string("socket: ") + strerror(errno));
+#ifndef WIN32
+ {
+ int one = 1;
+ if (setsockopt(_socketFD, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) != 0) {
+ throw std::runtime_error(std::string("setsockopt: ") + strerror(errno));
+ }
+ }
+#endif
+
+}
+
+Socket::~Socket() {
+ if (_socketFD > 0)
+#ifdef WIN32
+ closesocket(_socketFD);
+#else
+ close(_socketFD);
+#endif
+}
+
+void Socket::setupSockAddr(const std::string& address, int port) {
+ if (address == "*") {
+ _sin.sin_addr.s_addr = 0;
+ } else {
+ _sin.sin_addr.s_addr = inet_addr(address.c_str());
+ if (_sin.sin_addr.s_addr == INADDR_NONE)
+ throw std::runtime_error(std::string("inet_addr: ") + strerror(errno));
+ }
+
+ _sin.sin_port = htons(port);
+}
+
+void Socket::setBlockSizeRead(size_t size) {
+// tthread::lock_guard<tthread::recursive_mutex> lock(_mutex);
+ _blockSizeRead = size;
+}
+
+ClientSocket::ClientSocket(int domain, int type, int protocol) : Socket(domain, type, protocol), _clientEvent(NULL) {
+}
+
+
+ClientSocket::~ClientSocket() {
+ if (_clientEvent) {
+ bufferevent_enable(_clientEvent, 0);
+ bufferevent_free(_clientEvent);
+ }
+}
+
+void ClientSocket::errorCallback(struct bufferevent *bev, short error, void *ctx) {
+ ClientSocket* instance = (ClientSocket*)ctx;
+ // tthread::lock_guard<tthread::recursive_mutex> lock(instance->_mutex);
+
+ if (error & BEV_EVENT_READING) {
+ std::cout << "ClientSocket: error encountered while reading" << std::endl;
+ } else if (error & BEV_EVENT_WRITING) {
+ std::cout << "ClientSocket: error encountered while writing" << std::endl;
+ } else if (error & BEV_EVENT_EOF) {
+ std::cout << "ClientSocket: eof file reached" << std::endl;
+ } else if (error & BEV_EVENT_ERROR) {
+ std::cout << "ClientSocket: unrecoverable error encountered" << std::endl;
+ } else if (error & BEV_EVENT_TIMEOUT) {
+ std::cout << "ClientSocket: user-specified timeout reached" << std::endl;
+ } else if (error & BEV_EVENT_CONNECTED) {
+ std::cout << "ClientSocket: connect operation finished" << std::endl;
+ }
+
+ // bufferevent_free(bev);
+}
+
+void ClientSocket::connect(const std::string& address, int port) {
+// tthread::lock_guard<tthread::recursive_mutex> lock(_mutex);
+
+ setupSockAddr(address, port);
+ if(::connect(_socketFD, (struct sockaddr *)&_sin, sizeof _sin) != 0) {
+ throw std::runtime_error(std::string("connect: ") + strerror(errno));
+ }
+
+ _clientEvent = bufferevent_socket_new(_base->base, _socketFD, 0); //BEV_OPT_THREADSAFE);
+ bufferevent_setcb(_clientEvent, ClientSocket::readCallback, NULL, ClientSocket::errorCallback, this);
+ bufferevent_enable(_clientEvent, EV_READ|EV_WRITE);
+}
+
+int ClientSocket::write(const char* data, size_t size) {
+// tthread::lock_guard<tthread::recursive_mutex> lock(_mutex);
+ bufferevent_write(_clientEvent, data, size);
+ return size;
+}
+
+void ClientSocket::readCallback(struct bufferevent *bev, void *ctx) {
+ ClientSocket* instance = (ClientSocket*)ctx;
+// tthread::lock_guard<tthread::recursive_mutex> lock(instance->_mutex);
+
+ size_t n;
+ struct evbuffer* input;
+ char* data = (char*)malloc(instance->_blockSizeRead);
+
+ input = bufferevent_get_input(bev);
+ n = evbuffer_remove(input, data, instance->_blockSizeRead);
+
+ instance->readCallback(data, n);
+ free(data);
+}
+
+std::set<ServerSocket*> ServerSocket::_instances;
+
+ServerSocket::ServerSocket(int domain, int type, int protocol) : Socket(domain, type, protocol), _listenerEvent(NULL) {
+ _instances.insert(this);
+}
+
+ServerSocket::~ServerSocket() {
+ tthread::lock_guard<tthread::recursive_mutex> lock(_mutex);
+
+ std::map<struct bufferevent*, Connection>::iterator connIter = _connections.begin();
+ while(connIter != _connections.end()) {
+ bufferevent_enable(connIter->second.bufferEvent, 0);
+ bufferevent_setcb(connIter->second.bufferEvent, NULL, NULL, NULL, 0);
+
+ bufferevent_free(connIter->second.bufferEvent);
+#ifdef WIN32
+ closesocket(connIter->second.fd);
+#else
+ close(connIter->second.fd);
+#endif
+
+ connIter++;
+ }
+
+ if (_listenerEvent) {
+ event_del(_listenerEvent);
+ event_free(_listenerEvent);
+ }
+
+ _instances.erase(this);
+
+}
+
+void ServerSocket::errorCallback(struct bufferevent *bev, short error, void *ctx) {
+ ServerSocket* instance = (ServerSocket*)ctx;
+ tthread::lock_guard<tthread::recursive_mutex> lock(instance->_mutex);
+
+ if (_instances.find(instance) == _instances.end())
+ return;
+
+ if (error & BEV_EVENT_READING || error & BEV_EVENT_WRITING) {
+ // remote end close the connection
+ tthread::lock_guard<tthread::recursive_mutex> lock(instance->_mutex);
+ std::map<struct bufferevent*, Connection>::iterator conn = instance->_connections.find(bev);
+ if (conn != instance->_connections.end()) {
+ bufferevent_enable(conn->second.bufferEvent, 0);
+ bufferevent_free(conn->second.bufferEvent);
+#ifdef WIN32
+ closesocket(conn->second.fd);
+#else
+ close(conn->second.fd);
+#endif
+
+ instance->_connections.erase(conn);
+ }
+ } else if (error & BEV_EVENT_EOF) {
+ std::cout << "ServerSocket: eof file reached" << std::endl;
+ } else if (error & BEV_EVENT_ERROR) {
+ std::cout << "ServerSocket: unrecoverable error encountered" << std::endl;
+ } else if (error & BEV_EVENT_TIMEOUT) {
+ std::cout << "ServerSocket: user-specified timeout reached" << std::endl;
+ } else if (error & BEV_EVENT_CONNECTED) {
+ std::cout << "ServerSocket: connect operation finished" << std::endl;
+ }
+ // bufferevent_free(bev);
+}
+
+void ServerSocket::readCallback(struct bufferevent *bev, void *ctx) {
+ ServerSocket* instance = (ServerSocket*)ctx;
+ tthread::lock_guard<tthread::recursive_mutex> lock(instance->_mutex);
+
+ // instance is already gone
+ if (_instances.find(instance) == _instances.end())
+ return;
+
+ size_t n;
+ struct evbuffer* input;
+ char* data = (char*)malloc(instance->_blockSizeRead);
+
+ input = bufferevent_get_input(bev);
+ n = evbuffer_remove(input, data, instance->_blockSizeRead);
+
+ instance->readCallback(data, n, instance->_connections[bev]);
+ free(data);
+}
+
+void ServerSocket::bind() {
+ if (::bind(_socketFD, (struct sockaddr*)&_sin, sizeof(_sin)) < 0) {
+ throw std::runtime_error(std::string("bind: ") + strerror(errno));
+ }
+}
+
+void ServerSocket::listen(const std::string& address, int port) {
+// tthread::lock_guard<tthread::recursive_mutex> lock(_mutex);
+ setupSockAddr(address, port);
+ bind();
+
+ _listenerEvent = event_new(_base->base, _socketFD, EV_READ|EV_PERSIST, acceptCallback, (void*)this);
+ /*XXX check it */
+ event_add(_listenerEvent, NULL);
+
+ if (::listen(_socketFD, 16)<0) {
+ throw std::runtime_error(std::string("listen: ") + strerror(errno));
+ }
+}
+
+void ServerSocket::acceptCallback(evutil_socket_t listener, short event, void *ctx) {
+ ServerSocket* instance = (ServerSocket*)ctx;
+// tthread::lock_guard<tthread::recursive_mutex> lock(instance->_mutex);
+
+ struct sockaddr_storage ss;
+ socklen_t slen = sizeof(ss);
+ int fd = accept(listener, (struct sockaddr*)&ss, &slen);
+ if (fd < 0) {
+ throw std::runtime_error(std::string("accept: ") + strerror(errno));
+ } else if (fd > FD_SETSIZE) {
+#ifdef WIN32
+ closesocket(fd);
+#else
+ close(fd);
+#endif
+
+ throw std::runtime_error(std::string("accept: ") + strerror(errno));
+ } else {
+ struct bufferevent *bev;
+ evutil_make_socket_nonblocking(fd);
+ bev = bufferevent_socket_new(instance->_base->base, fd, BEV_OPT_THREADSAFE);
+ bufferevent_setcb(bev, ServerSocket::readCallback, NULL, ServerSocket::errorCallback, ctx);
+ bufferevent_enable(bev, EV_READ|EV_WRITE);
+
+ instance->_connections[bev].bufferEvent = bev;
+ instance->_connections[bev].fd = fd;
+ }
+}
+
+void ServerSocket::Connection::reply(const char* data, size_t size) {
+ bufferevent_write(bufferEvent, data, size);
+}
+
+}
diff --git a/src/uscxml/server/Socket.h b/src/uscxml/server/Socket.h
new file mode 100644
index 0000000..fcaada4
--- /dev/null
+++ b/src/uscxml/server/Socket.h
@@ -0,0 +1,116 @@
+/**
+ * @file
+ * @author 2012-2014 Stefan Radomski (stefan.radomski@cs.tu-darmstadt.de)
+ * @copyright Simplified BSD
+ *
+ * @cond
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the FreeBSD license as published by the FreeBSD
+ * project.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the FreeBSD license along with this
+ * program. If not, see <http://www.opensource.org/licenses/bsd-license>.
+ * @endcond
+ */
+
+#ifndef SOCKETCLIENT_H_9A0B2A88
+#define SOCKETCLIENT_H_9A0B2A88
+
+#include "uscxml/Common.h" // for USCXML_API
+#include "uscxml/concurrency/EventBase.h"
+#include <string>
+#include <map>
+#include <set>
+
+#ifdef _WIN32
+# include <winsock2.h>
+#else
+# include <netinet/in.h> /* For sockaddr_in */
+#endif
+
+#include "uscxml/concurrency/tinythread.h" // for recursive_mutex, etc
+
+extern "C" {
+#include <event2/event.h>
+#include <event2/buffer.h>
+#include <event2/bufferevent.h>
+}
+
+namespace uscxml {
+
+class USCXML_API Socket {
+public:
+ Socket(int domain, int type, int protocol);
+ virtual ~Socket();
+
+ void setBlockSizeRead(size_t size);
+
+protected:
+
+ void setupSockAddr(const std::string& address, int port);
+
+ evutil_socket_t _socketFD;
+
+ tthread::recursive_mutex _mutex;
+ size_t _blockSizeRead;
+ struct sockaddr_in _sin;
+
+ boost::shared_ptr<EventBase> _base;
+};
+
+class USCXML_API ServerSocket : public Socket {
+public:
+ class Connection {
+ public:
+ struct bufferevent* bufferEvent;
+ int fd;
+
+ void reply(const char* data, size_t size);
+ };
+
+ ServerSocket(int domain, int type, int protocol);
+ virtual ~ServerSocket();
+
+ void listen(const std::string& address, int port);
+ virtual void readCallback(const char* data, size_t size, Connection& conn) {};
+
+
+protected:
+ void bind();
+ static void acceptCallback(evutil_socket_t listener, short event, void *ctx);
+ static void errorCallback(struct bufferevent *bev, short error, void *ctx);
+ static void readCallback(struct bufferevent *bev, void *ctx);
+
+ std::map<struct bufferevent*, Connection> _connections;
+ struct event* _listenerEvent;
+
+ static std::set<ServerSocket*> _instances;
+
+};
+
+class USCXML_API ClientSocket : public Socket {
+public:
+ ClientSocket(int domain, int type, int protocol);
+ virtual ~ClientSocket();
+
+ virtual void readCallback(const char* data, size_t size) {};
+ void connect(const std::string& address, int port);
+ int write(const char* data, size_t size);
+
+
+protected:
+ static void readCallback(struct bufferevent *bev, void *ctx);
+ static void errorCallback(struct bufferevent *bev, short error, void *ctx);
+
+ struct bufferevent* _clientEvent;
+
+};
+
+
+}
+
+#endif /* end of include guard: SOCKETCLIENT_H_9A0B2A88 */
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index 10c1213..62b8749 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -117,7 +117,7 @@ set_target_properties(test-url PROPERTIES FOLDER "Tests")
add_executable(test-cmdline-parsing src/test-cmdline-parsing.cpp)
target_link_libraries(test-cmdline-parsing uscxml)
# add_test(test-cmdline-parsing ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/test-cmdline-parsing)
-# set_target_properties(test-cmdline-parsing PROPERTIES FOLDER "Tests")
+set_target_properties(test-cmdline-parsing PROPERTIES FOLDER "Tests")
# add_executable(test-initial-config src/test-initial-config.cpp)
# target_link_libraries(test-initial-config uscxml)
@@ -129,6 +129,11 @@ target_link_libraries(test-datamodel uscxml)
add_test(test-datamodel ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/test-datamodel)
set_target_properties(test-datamodel PROPERTIES FOLDER "Tests")
+add_executable(test-sockets src/test-sockets.cpp)
+target_link_libraries(test-sockets uscxml)
+# add_test(test-datamodel ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/test-sockets)
+set_target_properties(test-sockets PROPERTIES FOLDER "Tests")
+
# if (NOT WIN32)
# add_executable(test-mmi src/test-mmi.cpp)
# target_link_libraries(test-mmi uscxml)
diff --git a/test/src/test-eventdelay.cpp b/test/src/test-eventdelay.cpp
index 12cc751..ce6c923 100644
--- a/test/src/test-eventdelay.cpp
+++ b/test/src/test-eventdelay.cpp
@@ -1,4 +1,4 @@
-#include "uscxml/concurrency/eventqueue/DelayedEventQueue.h"
+#include "uscxml/concurrency/DelayedEventQueue.h"
#include <iostream>
int eventCalled = 0;
diff --git a/test/src/test-sockets.cpp b/test/src/test-sockets.cpp
new file mode 100644
index 0000000..a712da0
--- /dev/null
+++ b/test/src/test-sockets.cpp
@@ -0,0 +1,98 @@
+#include "uscxml/config.h"
+#include "uscxml/server/Socket.h"
+#include <iostream>
+
+#include <event2/event.h>
+#include "event2/thread.h"
+
+#ifdef HAS_SIGNAL_H
+#include <signal.h>
+#endif
+
+#include "uscxml/concurrency/tinythread.h"
+
+using namespace uscxml;
+
+class TestServer : public ServerSocket {
+public:
+ TestServer(int domain, int type, int protocol) : ServerSocket(domain, type, protocol) {}
+ virtual void readCallback(const char* data, size_t size, Connection& conn) {
+ std::string content(data, size);
+// std::cout << "Server got: " << content << std::endl;
+ std::string urghs("hi!");
+ conn.reply(urghs.data(), urghs.size());
+ };
+};
+
+class TestClient : public ClientSocket {
+public:
+ TestClient(int domain, int type, int protocol) : ClientSocket(domain, type, protocol) {}
+ virtual void readCallback(const char* data, size_t size) {
+ std::string content(data, size);
+ };
+};
+
+int main(int argc, char** argv) {
+
+#if defined(HAS_SIGNAL_H) && !defined(WIN32)
+ signal(SIGPIPE, SIG_IGN);
+#endif
+
+#ifndef _WIN32
+ evthread_use_pthreads();
+#else
+ evthread_use_windows_threads();
+#endif
+
+ if (0) {
+ // start server socket and connect
+ int iterations = 100;
+
+ TestServer server(PF_INET, SOCK_STREAM, 0);
+ try {
+ server.listen("*", 1234);
+
+ while(iterations--) {
+ std::cout << iterations << std::endl;
+ TestClient client(PF_INET, SOCK_STREAM, 0);
+ client.connect("127.0.0.1", 1234);
+
+ std::string hello("hello");
+ client.write(hello.data(), hello.size());
+
+ tthread::this_thread::sleep_for(tthread::chrono::milliseconds(20));
+ }
+
+ } catch (std::runtime_error e) {
+ std::cout << e.what() << std::endl;
+ }
+ }
+
+ {
+ // connect client to server and kill server
+ int iterations = 100;
+
+ try {
+
+ while(iterations--) {
+ std::cout << iterations << std::endl;
+ TestServer* server = new TestServer(PF_INET, SOCK_STREAM, 0);
+ server->listen("*", 1236 + iterations);
+
+ TestClient client(PF_INET, SOCK_STREAM, 0);
+ client.connect("127.0.0.1", 1236 + iterations);
+
+ std::string hello("hello");
+ client.write(hello.data(), hello.size());
+
+ delete server;
+
+ tthread::this_thread::sleep_for(tthread::chrono::milliseconds(20));
+ }
+
+ } catch (std::runtime_error e) {
+ std::cout << e.what() << std::endl;
+ }
+
+ }
+} \ No newline at end of file