From d8dc46cc000d81d031c5f08806dc26a3b433e1bf Mon Sep 17 00:00:00 2001 From: Stefan Radomski Date: Wed, 16 Jul 2014 23:54:17 +0200 Subject: Generic socket support --- src/bindings/swig/java/CMakeLists.txt | 1 + src/uscxml/Interpreter.cpp | 2 +- src/uscxml/concurrency/DelayedEventQueue.cpp | 156 +++++++++++ src/uscxml/concurrency/DelayedEventQueue.h | 85 ++++++ src/uscxml/concurrency/EventBase.cpp | 74 ++++++ src/uscxml/concurrency/EventBase.h | 61 +++++ .../concurrency/eventqueue/DelayedEventQueue.cpp | 156 ----------- .../concurrency/eventqueue/DelayedEventQueue.h | 85 ------ src/uscxml/interpreter/InterpreterDraft6.cpp | 2 +- src/uscxml/interpreter/InterpreterRC.cpp | 2 +- .../plugins/invoker/calendar/CalendarInvoker.cpp | 2 +- .../plugins/invoker/expect/ExpectInvoker.cpp | 2 +- .../plugins/invoker/heartbeat/HeartbeatInvoker.h | 2 +- src/uscxml/plugins/invoker/im/IMInvoker.cpp | 2 +- .../ioprocessor/basichttp/BasicHTTPIOProcessor.h | 2 +- .../plugins/ioprocessor/comet/CometIOProcessor.h | 2 +- src/uscxml/server/Socket.cpp | 291 +++++++++++++++++++++ src/uscxml/server/Socket.h | 116 ++++++++ test/CMakeLists.txt | 7 +- test/src/test-eventdelay.cpp | 2 +- test/src/test-sockets.cpp | 98 +++++++ 21 files changed, 898 insertions(+), 252 deletions(-) create mode 100644 src/uscxml/concurrency/DelayedEventQueue.cpp create mode 100644 src/uscxml/concurrency/DelayedEventQueue.h create mode 100644 src/uscxml/concurrency/EventBase.cpp create mode 100644 src/uscxml/concurrency/EventBase.h delete mode 100644 src/uscxml/concurrency/eventqueue/DelayedEventQueue.cpp delete mode 100644 src/uscxml/concurrency/eventqueue/DelayedEventQueue.h create mode 100644 src/uscxml/server/Socket.cpp create mode 100644 src/uscxml/server/Socket.h create mode 100644 test/src/test-sockets.cpp diff --git a/src/bindings/swig/java/CMakeLists.txt b/src/bindings/swig/java/CMakeLists.txt index 86f2fd6..eb51f83 100644 --- a/src/bindings/swig/java/CMakeLists.txt +++ b/src/bindings/swig/java/CMakeLists.txt @@ -31,6 +31,7 @@ set_target_properties(uscxmlNativeJava PROPERTIES COMPILE_FLAGS "-DSWIG") swig_link_libraries(uscxmlNativeJava uscxml) +FIND_PROGRAM(ANT_EXECUTABLE ant PATHS $ENV{ANT_HOME}/bin ENV PATH ) if (ANT_EXECUTABLE) set(USCXML_LANGUAGE_BINDINGS "java ${USCXML_LANGUAGE_BINDINGS}") diff --git a/src/uscxml/Interpreter.cpp b/src/uscxml/Interpreter.cpp index 8a9ba63..90f00c4 100644 --- a/src/uscxml/Interpreter.cpp +++ b/src/uscxml/Interpreter.cpp @@ -29,7 +29,7 @@ #include "uscxml/plugins/invoker/http/HTTPServletInvoker.h" #include "uscxml/server/InterpreterServlet.h" -#include "uscxml/concurrency/eventqueue/DelayedEventQueue.h" +#include "uscxml/concurrency/DelayedEventQueue.h" #include #include diff --git a/src/uscxml/concurrency/DelayedEventQueue.cpp b/src/uscxml/concurrency/DelayedEventQueue.cpp new file mode 100644 index 0000000..642c4a0 --- /dev/null +++ b/src/uscxml/concurrency/DelayedEventQueue.cpp @@ -0,0 +1,156 @@ +/** + * @file + * @author 2012-2013 Stefan Radomski (stefan.radomski@cs.tu-darmstadt.de) + * @copyright Simplified BSD + * + * @cond + * This program is free software: you can redistribute it and/or modify + * it under the terms of the FreeBSD license as published by the FreeBSD + * project. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the FreeBSD license along with this + * program. If not, see . + * @endcond + */ + +#include "DelayedEventQueue.h" +#include "uscxml/messages/Event.h" + +#include + +#include +#include "event2/thread.h" + +namespace uscxml { + +DelayedEventQueue::DelayedEventQueue() { +#ifndef _WIN32 + evthread_use_pthreads(); +#else + evthread_use_windows_threads(); +#endif + _eventLoop = event_base_new(); + _thread = NULL; + _isStarted = false; +} + +DelayedEventQueue::~DelayedEventQueue() { +// std::cout << "Deleting DelayedEventQueue" << std::endl; + stop(); + if (_thread && _isStarted) + _thread->join(); + if(_eventLoop) + event_base_free(_eventLoop); +} + +void DelayedEventQueue::run(void* instance) { + DelayedEventQueue* INSTANCE = (DelayedEventQueue*)instance; + int result; + while(INSTANCE->_isStarted) { +// #ifndef EVLOOP_NO_EXIT_ON_EMPTY +// result = event_base_dispatch(INSTANCE->_eventLoop); +// #else + result = event_base_loop(INSTANCE->_eventLoop, EVLOOP_NO_EXIT_ON_EMPTY); +//#endif + (void)result; + } +} + +void DelayedEventQueue::addEvent(std::string eventId, int fd, short opMask, void (*callback)(void*, const std::string eventId), void* userData, bool persist) { + if(_callbackData.find(eventId) != _callbackData.end()) { + cancelEvent(eventId); + } + + if (persist) + opMask |= EV_PERSIST; + + struct event* event = event_new(_eventLoop, fd, opMask, DelayedEventQueue::fileCallback, &_callbackData[eventId]); + + _callbackData[eventId].eventId = eventId; + _callbackData[eventId].userData = userData; + _callbackData[eventId].eventQueue = this; + _callbackData[eventId].callback = callback; + _callbackData[eventId].event = event; + _callbackData[eventId].persist = false; + + event_add(event, NULL); + +} + +void DelayedEventQueue::addEvent(std::string eventId, void (*callback)(void*, const std::string eventId), uint32_t delayMs, void* userData, bool persist) { + if(_callbackData.find(eventId) != _callbackData.end()) { + cancelEvent(eventId); + } + + struct timeval delay = {delayMs / 1000, (delayMs % 1000) * 1000}; + struct event* event = event_new(_eventLoop, -1, (persist ? EV_PERSIST : 0), DelayedEventQueue::timerCallback, &_callbackData[eventId]); + + _callbackData[eventId].eventId = eventId; + _callbackData[eventId].userData = userData; + _callbackData[eventId].eventQueue = this; + _callbackData[eventId].callback = callback; + _callbackData[eventId].event = event; + _callbackData[eventId].persist = persist; + + event_add(event, &delay); +} + +void DelayedEventQueue::cancelEvent(std::string eventId) { + tthread::lock_guard lock(_mutex); + + if(_callbackData.find(eventId) != _callbackData.end()) { + event_del(_callbackData[eventId].event); + event_free(_callbackData[eventId].event); + _callbackData.erase(eventId); + } +} + +void DelayedEventQueue::start() { + _isStarted = true; + _thread = new tthread::thread(DelayedEventQueue::run, this); +} + +void DelayedEventQueue::stop() { + if (_isStarted) { + _isStarted = false; + event_base_loopbreak(_eventLoop); + } + if (_thread) { + _thread->join(); + delete _thread; + } +} + +void DelayedEventQueue::dummyCallback(evutil_socket_t fd, short what, void *arg) { +} + +void DelayedEventQueue::fileCallback(evutil_socket_t fd, short what, void *arg) { + struct callbackData *data = (struct callbackData*)arg; + tthread::lock_guard lock(data->eventQueue->_mutex); + std::string eventId = data->eventId; // copy eventId + data->callback(data->userData, eventId); +} + +void DelayedEventQueue::timerCallback(evutil_socket_t fd, short what, void *arg) { + struct callbackData *data = (struct callbackData*)arg; + tthread::lock_guard lock(data->eventQueue->_mutex); + + std::string eventId = data->eventId; // copy eventId + try { + data->callback(data->userData, eventId); + } catch (Event e) { + LOG(ERROR) << "Exception thrown when executing delayed event:" << std::endl << e << std::endl; + } catch (...) { + LOG(ERROR) << "Exception thrown when executing delayed event" << std::endl; + } + if (!data->persist) { + event_free(data->event); + data->eventQueue->_callbackData.erase(data->eventId); + } +} + +} \ No newline at end of file diff --git a/src/uscxml/concurrency/DelayedEventQueue.h b/src/uscxml/concurrency/DelayedEventQueue.h new file mode 100644 index 0000000..2248c47 --- /dev/null +++ b/src/uscxml/concurrency/DelayedEventQueue.h @@ -0,0 +1,85 @@ +/** + * @file + * @author 2012-2013 Stefan Radomski (stefan.radomski@cs.tu-darmstadt.de) + * @copyright Simplified BSD + * + * @cond + * This program is free software: you can redistribute it and/or modify + * it under the terms of the FreeBSD license as published by the FreeBSD + * project. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the FreeBSD license along with this + * program. If not, see . + * @endcond + */ + +#ifndef DELAYEDEVENTQUEUE_H_JA6WRBVP +#define DELAYEDEVENTQUEUE_H_JA6WRBVP + +#include "uscxml/concurrency/tinythread.h" + + +#include "uscxml/Common.h" +#include "event2/util.h" // for evutil_socket_t +#include + +#include + +#include +#include + +namespace uscxml { + +class USCXML_API DelayedEventQueue { +public: + + enum OpMask { + DEQ_READ = EV_READ, + DEQ_WRITE = EV_WRITE, + DEQ_SIGNAL = EV_SIGNAL + }; + + struct callbackData { + void *userData; + void (*callback)(void*, const std::string eventId); + std::string eventId; + bool persist; + struct event *event; + DelayedEventQueue* eventQueue; + }; + + DelayedEventQueue(); + virtual ~DelayedEventQueue(); + + void addEvent(std::string eventId, int fd, short opMask, void (*callback)(void*, const std::string eventId), void* userData, bool persist = true); + void addEvent(std::string eventId, void (*callback)(void*, const std::string eventId), uint32_t delayMs, void* userData, bool persist = false); + void cancelEvent(std::string eventId); + + void start(); + void stop(); + static void run(void*); + + bool isEmpty() { + return _callbackData.empty(); + } + + static void timerCallback(evutil_socket_t fd, short what, void *arg); + static void fileCallback(evutil_socket_t fd, short what, void *arg); + static void dummyCallback(evutil_socket_t fd, short what, void *arg); + + bool _isStarted; + tthread::thread* _thread; + tthread::recursive_mutex _mutex; + + std::map _callbackData; + struct event_base* _eventLoop; +}; + +} + + +#endif /* end of include guard: DELAYEDEVENTQUEUE_H_JA6WRBVP */ diff --git a/src/uscxml/concurrency/EventBase.cpp b/src/uscxml/concurrency/EventBase.cpp new file mode 100644 index 0000000..b5c1308 --- /dev/null +++ b/src/uscxml/concurrency/EventBase.cpp @@ -0,0 +1,74 @@ +/** + * @file + * @author 2012-2014 Stefan Radomski (stefan.radomski@cs.tu-darmstadt.de) + * @copyright Simplified BSD + * + * @cond + * This program is free software: you can redistribute it and/or modify + * it under the terms of the FreeBSD license as published by the FreeBSD + * project. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the FreeBSD license along with this + * program. If not, see . + * @endcond + */ + +#include "EventBase.h" + +namespace uscxml { + +std::map > EventBase::_eventBases; +tthread::recursive_mutex EventBase::_instanceMutex; + +boost::shared_ptr EventBase::get(const std::string& name) { + tthread::lock_guard lock(_instanceMutex); + + std::map >::const_iterator instIter = _eventBases.begin(); + while(instIter != _eventBases.end()) { + if (!instIter->second.lock()) { + _eventBases.erase(instIter++); + } else { + instIter++; + } + } + + instIter = _eventBases.find(name); + boost::shared_ptr instance = instIter->second.lock(); + if (instance) + return instance; + + instance = boost::shared_ptr(new EventBase()); + _eventBases.insert(std::make_pair(name, instance)); + + return instance; +} + +EventBase::EventBase() { + base = event_base_new(); + _isStarted = true; + _thread = new tthread::thread(EventBase::run, this); +} + +void EventBase::run(void* arg) { + EventBase* INSTANCE = (EventBase*)arg; + int result; + + while(INSTANCE->_isStarted) { + result = event_base_loop(INSTANCE->base, EVLOOP_NO_EXIT_ON_EMPTY); + (void)result; + } +} + +EventBase::~EventBase() { + _isStarted = false; + event_base_loopbreak(base); + _thread->join(); + event_base_free(base); + delete _thread; +} + +} \ No newline at end of file diff --git a/src/uscxml/concurrency/EventBase.h b/src/uscxml/concurrency/EventBase.h new file mode 100644 index 0000000..22bd664 --- /dev/null +++ b/src/uscxml/concurrency/EventBase.h @@ -0,0 +1,61 @@ +/** + * @file + * @author 2012-2014 Stefan Radomski (stefan.radomski@cs.tu-darmstadt.de) + * @copyright Simplified BSD + * + * @cond + * This program is free software: you can redistribute it and/or modify + * it under the terms of the FreeBSD license as published by the FreeBSD + * project. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the FreeBSD license along with this + * program. If not, see . + * @endcond + */ + +#ifndef EVENTBASE_H_C479DA74 +#define EVENTBASE_H_C479DA74 + +#include "uscxml/Common.h" +#include "uscxml/concurrency/tinythread.h" + +extern "C" { +#include +#include +#include +} + +#include +#include +#include +#include + +namespace uscxml { + +class USCXML_API EventBase { +public: + EventBase(); + virtual ~EventBase(); + + static boost::shared_ptr get(const std::string& name); + struct event_base* base; + +protected: + + static void run(void*); + + tthread::thread* _thread; + bool _isStarted; + + static std::map > _eventBases; + static tthread::recursive_mutex _instanceMutex; + +}; + +} + +#endif /* end of include guard: EVENTBASE_H_C479DA74 */ diff --git a/src/uscxml/concurrency/eventqueue/DelayedEventQueue.cpp b/src/uscxml/concurrency/eventqueue/DelayedEventQueue.cpp deleted file mode 100644 index 642c4a0..0000000 --- a/src/uscxml/concurrency/eventqueue/DelayedEventQueue.cpp +++ /dev/null @@ -1,156 +0,0 @@ -/** - * @file - * @author 2012-2013 Stefan Radomski (stefan.radomski@cs.tu-darmstadt.de) - * @copyright Simplified BSD - * - * @cond - * This program is free software: you can redistribute it and/or modify - * it under the terms of the FreeBSD license as published by the FreeBSD - * project. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the FreeBSD license along with this - * program. If not, see . - * @endcond - */ - -#include "DelayedEventQueue.h" -#include "uscxml/messages/Event.h" - -#include - -#include -#include "event2/thread.h" - -namespace uscxml { - -DelayedEventQueue::DelayedEventQueue() { -#ifndef _WIN32 - evthread_use_pthreads(); -#else - evthread_use_windows_threads(); -#endif - _eventLoop = event_base_new(); - _thread = NULL; - _isStarted = false; -} - -DelayedEventQueue::~DelayedEventQueue() { -// std::cout << "Deleting DelayedEventQueue" << std::endl; - stop(); - if (_thread && _isStarted) - _thread->join(); - if(_eventLoop) - event_base_free(_eventLoop); -} - -void DelayedEventQueue::run(void* instance) { - DelayedEventQueue* INSTANCE = (DelayedEventQueue*)instance; - int result; - while(INSTANCE->_isStarted) { -// #ifndef EVLOOP_NO_EXIT_ON_EMPTY -// result = event_base_dispatch(INSTANCE->_eventLoop); -// #else - result = event_base_loop(INSTANCE->_eventLoop, EVLOOP_NO_EXIT_ON_EMPTY); -//#endif - (void)result; - } -} - -void DelayedEventQueue::addEvent(std::string eventId, int fd, short opMask, void (*callback)(void*, const std::string eventId), void* userData, bool persist) { - if(_callbackData.find(eventId) != _callbackData.end()) { - cancelEvent(eventId); - } - - if (persist) - opMask |= EV_PERSIST; - - struct event* event = event_new(_eventLoop, fd, opMask, DelayedEventQueue::fileCallback, &_callbackData[eventId]); - - _callbackData[eventId].eventId = eventId; - _callbackData[eventId].userData = userData; - _callbackData[eventId].eventQueue = this; - _callbackData[eventId].callback = callback; - _callbackData[eventId].event = event; - _callbackData[eventId].persist = false; - - event_add(event, NULL); - -} - -void DelayedEventQueue::addEvent(std::string eventId, void (*callback)(void*, const std::string eventId), uint32_t delayMs, void* userData, bool persist) { - if(_callbackData.find(eventId) != _callbackData.end()) { - cancelEvent(eventId); - } - - struct timeval delay = {delayMs / 1000, (delayMs % 1000) * 1000}; - struct event* event = event_new(_eventLoop, -1, (persist ? EV_PERSIST : 0), DelayedEventQueue::timerCallback, &_callbackData[eventId]); - - _callbackData[eventId].eventId = eventId; - _callbackData[eventId].userData = userData; - _callbackData[eventId].eventQueue = this; - _callbackData[eventId].callback = callback; - _callbackData[eventId].event = event; - _callbackData[eventId].persist = persist; - - event_add(event, &delay); -} - -void DelayedEventQueue::cancelEvent(std::string eventId) { - tthread::lock_guard lock(_mutex); - - if(_callbackData.find(eventId) != _callbackData.end()) { - event_del(_callbackData[eventId].event); - event_free(_callbackData[eventId].event); - _callbackData.erase(eventId); - } -} - -void DelayedEventQueue::start() { - _isStarted = true; - _thread = new tthread::thread(DelayedEventQueue::run, this); -} - -void DelayedEventQueue::stop() { - if (_isStarted) { - _isStarted = false; - event_base_loopbreak(_eventLoop); - } - if (_thread) { - _thread->join(); - delete _thread; - } -} - -void DelayedEventQueue::dummyCallback(evutil_socket_t fd, short what, void *arg) { -} - -void DelayedEventQueue::fileCallback(evutil_socket_t fd, short what, void *arg) { - struct callbackData *data = (struct callbackData*)arg; - tthread::lock_guard lock(data->eventQueue->_mutex); - std::string eventId = data->eventId; // copy eventId - data->callback(data->userData, eventId); -} - -void DelayedEventQueue::timerCallback(evutil_socket_t fd, short what, void *arg) { - struct callbackData *data = (struct callbackData*)arg; - tthread::lock_guard lock(data->eventQueue->_mutex); - - std::string eventId = data->eventId; // copy eventId - try { - data->callback(data->userData, eventId); - } catch (Event e) { - LOG(ERROR) << "Exception thrown when executing delayed event:" << std::endl << e << std::endl; - } catch (...) { - LOG(ERROR) << "Exception thrown when executing delayed event" << std::endl; - } - if (!data->persist) { - event_free(data->event); - data->eventQueue->_callbackData.erase(data->eventId); - } -} - -} \ No newline at end of file diff --git a/src/uscxml/concurrency/eventqueue/DelayedEventQueue.h b/src/uscxml/concurrency/eventqueue/DelayedEventQueue.h deleted file mode 100644 index 2248c47..0000000 --- a/src/uscxml/concurrency/eventqueue/DelayedEventQueue.h +++ /dev/null @@ -1,85 +0,0 @@ -/** - * @file - * @author 2012-2013 Stefan Radomski (stefan.radomski@cs.tu-darmstadt.de) - * @copyright Simplified BSD - * - * @cond - * This program is free software: you can redistribute it and/or modify - * it under the terms of the FreeBSD license as published by the FreeBSD - * project. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the FreeBSD license along with this - * program. If not, see . - * @endcond - */ - -#ifndef DELAYEDEVENTQUEUE_H_JA6WRBVP -#define DELAYEDEVENTQUEUE_H_JA6WRBVP - -#include "uscxml/concurrency/tinythread.h" - - -#include "uscxml/Common.h" -#include "event2/util.h" // for evutil_socket_t -#include - -#include - -#include -#include - -namespace uscxml { - -class USCXML_API DelayedEventQueue { -public: - - enum OpMask { - DEQ_READ = EV_READ, - DEQ_WRITE = EV_WRITE, - DEQ_SIGNAL = EV_SIGNAL - }; - - struct callbackData { - void *userData; - void (*callback)(void*, const std::string eventId); - std::string eventId; - bool persist; - struct event *event; - DelayedEventQueue* eventQueue; - }; - - DelayedEventQueue(); - virtual ~DelayedEventQueue(); - - void addEvent(std::string eventId, int fd, short opMask, void (*callback)(void*, const std::string eventId), void* userData, bool persist = true); - void addEvent(std::string eventId, void (*callback)(void*, const std::string eventId), uint32_t delayMs, void* userData, bool persist = false); - void cancelEvent(std::string eventId); - - void start(); - void stop(); - static void run(void*); - - bool isEmpty() { - return _callbackData.empty(); - } - - static void timerCallback(evutil_socket_t fd, short what, void *arg); - static void fileCallback(evutil_socket_t fd, short what, void *arg); - static void dummyCallback(evutil_socket_t fd, short what, void *arg); - - bool _isStarted; - tthread::thread* _thread; - tthread::recursive_mutex _mutex; - - std::map _callbackData; - struct event_base* _eventLoop; -}; - -} - - -#endif /* end of include guard: DELAYEDEVENTQUEUE_H_JA6WRBVP */ diff --git a/src/uscxml/interpreter/InterpreterDraft6.cpp b/src/uscxml/interpreter/InterpreterDraft6.cpp index e2ba95e..7569fdb 100644 --- a/src/uscxml/interpreter/InterpreterDraft6.cpp +++ b/src/uscxml/interpreter/InterpreterDraft6.cpp @@ -18,7 +18,7 @@ */ #include "InterpreterDraft6.h" -#include "uscxml/concurrency/eventqueue/DelayedEventQueue.h" +#include "uscxml/concurrency/DelayedEventQueue.h" #include #include "uscxml/UUID.h" diff --git a/src/uscxml/interpreter/InterpreterRC.cpp b/src/uscxml/interpreter/InterpreterRC.cpp index 3d17c87..24b9003 100644 --- a/src/uscxml/interpreter/InterpreterRC.cpp +++ b/src/uscxml/interpreter/InterpreterRC.cpp @@ -20,7 +20,7 @@ #include "InterpreterRC.h" #include "uscxml/Factory.h" -#include "uscxml/concurrency/eventqueue/DelayedEventQueue.h" +#include "uscxml/concurrency/DelayedEventQueue.h" #include #include "uscxml/UUID.h" diff --git a/src/uscxml/plugins/invoker/calendar/CalendarInvoker.cpp b/src/uscxml/plugins/invoker/calendar/CalendarInvoker.cpp index 849845e..8a4ea3d 100644 --- a/src/uscxml/plugins/invoker/calendar/CalendarInvoker.cpp +++ b/src/uscxml/plugins/invoker/calendar/CalendarInvoker.cpp @@ -20,7 +20,7 @@ #include #include "CalendarInvoker.h" -#include "uscxml/concurrency/eventqueue/DelayedEventQueue.h" +#include "uscxml/concurrency/DelayedEventQueue.h" #include diff --git a/src/uscxml/plugins/invoker/expect/ExpectInvoker.cpp b/src/uscxml/plugins/invoker/expect/ExpectInvoker.cpp index 22c7942..69ad686 100644 --- a/src/uscxml/plugins/invoker/expect/ExpectInvoker.cpp +++ b/src/uscxml/plugins/invoker/expect/ExpectInvoker.cpp @@ -20,7 +20,7 @@ #include "ExpectInvoker.h" #include -#include "uscxml/concurrency/eventqueue/DelayedEventQueue.h" +#include "uscxml/concurrency/DelayedEventQueue.h" #ifdef BUILD_AS_PLUGINS #include diff --git a/src/uscxml/plugins/invoker/heartbeat/HeartbeatInvoker.h b/src/uscxml/plugins/invoker/heartbeat/HeartbeatInvoker.h index ba4e9ef..f0bc32c 100644 --- a/src/uscxml/plugins/invoker/heartbeat/HeartbeatInvoker.h +++ b/src/uscxml/plugins/invoker/heartbeat/HeartbeatInvoker.h @@ -21,7 +21,7 @@ #define HEARTBEATINVOKER_H_W09J90F0 #include -#include "uscxml/concurrency/eventqueue/DelayedEventQueue.h" +#include "uscxml/concurrency/DelayedEventQueue.h" #ifdef BUILD_AS_PLUGINS #include "uscxml/plugins/Plugins.h" diff --git a/src/uscxml/plugins/invoker/im/IMInvoker.cpp b/src/uscxml/plugins/invoker/im/IMInvoker.cpp index 08d6a03..2bbb855 100644 --- a/src/uscxml/plugins/invoker/im/IMInvoker.cpp +++ b/src/uscxml/plugins/invoker/im/IMInvoker.cpp @@ -23,7 +23,7 @@ #include "uscxml/DOMUtils.h" #include -#include "uscxml/concurrency/eventqueue/DelayedEventQueue.h" +#include "uscxml/concurrency/DelayedEventQueue.h" #ifdef BUILD_AS_PLUGINS #include diff --git a/src/uscxml/plugins/ioprocessor/basichttp/BasicHTTPIOProcessor.h b/src/uscxml/plugins/ioprocessor/basichttp/BasicHTTPIOProcessor.h index 29bc208..3a43e49 100644 --- a/src/uscxml/plugins/ioprocessor/basichttp/BasicHTTPIOProcessor.h +++ b/src/uscxml/plugins/ioprocessor/basichttp/BasicHTTPIOProcessor.h @@ -35,7 +35,7 @@ extern "C" { # define USCXML_PLUGIN_API #endif -#include "uscxml/concurrency/eventqueue/DelayedEventQueue.h" +#include "uscxml/concurrency/DelayedEventQueue.h" #include "uscxml/server/HTTPServer.h" #include "uscxml/Interpreter.h" #include "uscxml/Factory.h" diff --git a/src/uscxml/plugins/ioprocessor/comet/CometIOProcessor.h b/src/uscxml/plugins/ioprocessor/comet/CometIOProcessor.h index e30bdb2..0a57d70 100644 --- a/src/uscxml/plugins/ioprocessor/comet/CometIOProcessor.h +++ b/src/uscxml/plugins/ioprocessor/comet/CometIOProcessor.h @@ -20,7 +20,7 @@ #ifndef COMETIOPROCESSOR_H_2CUY93KU #define COMETIOPROCESSOR_H_2CUY93KU -#include "uscxml/concurrency/eventqueue/DelayedEventQueue.h" +#include "uscxml/concurrency/DelayedEventQueue.h" #include "uscxml/server/HTTPServer.h" #include "uscxml/Interpreter.h" #include "uscxml/Factory.h" diff --git a/src/uscxml/server/Socket.cpp b/src/uscxml/server/Socket.cpp new file mode 100644 index 0000000..9c844e5 --- /dev/null +++ b/src/uscxml/server/Socket.cpp @@ -0,0 +1,291 @@ +/** + * @file + * @author 2012-2014 Stefan Radomski (stefan.radomski@cs.tu-darmstadt.de) + * @copyright Simplified BSD + * + * @cond + * This program is free software: you can redistribute it and/or modify + * it under the terms of the FreeBSD license as published by the FreeBSD + * project. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the FreeBSD license along with this + * program. If not, see . + * @endcond + */ + +#include "Socket.h" + +#include "uscxml/Message.h" // for Data, Event +#include "uscxml/config.h" // for OPENSSL_FOUND + +#ifndef _WIN32 +#include /* For socket functions */ +#include // inet_addr +#endif + +#include /* For fcntl */ +#include + +namespace uscxml { + +// see: http://codepad.org/XRJAVg5m +Socket::Socket(int domain, int type, int protocol) { + + _base = EventBase::get("sockets"); + _blockSizeRead = 1024; + + if (!_base) + throw std::runtime_error("Cannot get eventbase"); + + _sin.sin_family = domain; + _socketFD = socket(domain, type, protocol); + + if (_socketFD == -1) + throw std::runtime_error(std::string("socket: ") + strerror(errno)); +#ifndef WIN32 + { + int one = 1; + if (setsockopt(_socketFD, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) != 0) { + throw std::runtime_error(std::string("setsockopt: ") + strerror(errno)); + } + } +#endif + +} + +Socket::~Socket() { + if (_socketFD > 0) +#ifdef WIN32 + closesocket(_socketFD); +#else + close(_socketFD); +#endif +} + +void Socket::setupSockAddr(const std::string& address, int port) { + if (address == "*") { + _sin.sin_addr.s_addr = 0; + } else { + _sin.sin_addr.s_addr = inet_addr(address.c_str()); + if (_sin.sin_addr.s_addr == INADDR_NONE) + throw std::runtime_error(std::string("inet_addr: ") + strerror(errno)); + } + + _sin.sin_port = htons(port); +} + +void Socket::setBlockSizeRead(size_t size) { +// tthread::lock_guard lock(_mutex); + _blockSizeRead = size; +} + +ClientSocket::ClientSocket(int domain, int type, int protocol) : Socket(domain, type, protocol), _clientEvent(NULL) { +} + + +ClientSocket::~ClientSocket() { + if (_clientEvent) { + bufferevent_enable(_clientEvent, 0); + bufferevent_free(_clientEvent); + } +} + +void ClientSocket::errorCallback(struct bufferevent *bev, short error, void *ctx) { + ClientSocket* instance = (ClientSocket*)ctx; + // tthread::lock_guard lock(instance->_mutex); + + if (error & BEV_EVENT_READING) { + std::cout << "ClientSocket: error encountered while reading" << std::endl; + } else if (error & BEV_EVENT_WRITING) { + std::cout << "ClientSocket: error encountered while writing" << std::endl; + } else if (error & BEV_EVENT_EOF) { + std::cout << "ClientSocket: eof file reached" << std::endl; + } else if (error & BEV_EVENT_ERROR) { + std::cout << "ClientSocket: unrecoverable error encountered" << std::endl; + } else if (error & BEV_EVENT_TIMEOUT) { + std::cout << "ClientSocket: user-specified timeout reached" << std::endl; + } else if (error & BEV_EVENT_CONNECTED) { + std::cout << "ClientSocket: connect operation finished" << std::endl; + } + + // bufferevent_free(bev); +} + +void ClientSocket::connect(const std::string& address, int port) { +// tthread::lock_guard lock(_mutex); + + setupSockAddr(address, port); + if(::connect(_socketFD, (struct sockaddr *)&_sin, sizeof _sin) != 0) { + throw std::runtime_error(std::string("connect: ") + strerror(errno)); + } + + _clientEvent = bufferevent_socket_new(_base->base, _socketFD, 0); //BEV_OPT_THREADSAFE); + bufferevent_setcb(_clientEvent, ClientSocket::readCallback, NULL, ClientSocket::errorCallback, this); + bufferevent_enable(_clientEvent, EV_READ|EV_WRITE); +} + +int ClientSocket::write(const char* data, size_t size) { +// tthread::lock_guard lock(_mutex); + bufferevent_write(_clientEvent, data, size); + return size; +} + +void ClientSocket::readCallback(struct bufferevent *bev, void *ctx) { + ClientSocket* instance = (ClientSocket*)ctx; +// tthread::lock_guard lock(instance->_mutex); + + size_t n; + struct evbuffer* input; + char* data = (char*)malloc(instance->_blockSizeRead); + + input = bufferevent_get_input(bev); + n = evbuffer_remove(input, data, instance->_blockSizeRead); + + instance->readCallback(data, n); + free(data); +} + +std::set ServerSocket::_instances; + +ServerSocket::ServerSocket(int domain, int type, int protocol) : Socket(domain, type, protocol), _listenerEvent(NULL) { + _instances.insert(this); +} + +ServerSocket::~ServerSocket() { + tthread::lock_guard lock(_mutex); + + std::map::iterator connIter = _connections.begin(); + while(connIter != _connections.end()) { + bufferevent_enable(connIter->second.bufferEvent, 0); + bufferevent_setcb(connIter->second.bufferEvent, NULL, NULL, NULL, 0); + + bufferevent_free(connIter->second.bufferEvent); +#ifdef WIN32 + closesocket(connIter->second.fd); +#else + close(connIter->second.fd); +#endif + + connIter++; + } + + if (_listenerEvent) { + event_del(_listenerEvent); + event_free(_listenerEvent); + } + + _instances.erase(this); + +} + +void ServerSocket::errorCallback(struct bufferevent *bev, short error, void *ctx) { + ServerSocket* instance = (ServerSocket*)ctx; + tthread::lock_guard lock(instance->_mutex); + + if (_instances.find(instance) == _instances.end()) + return; + + if (error & BEV_EVENT_READING || error & BEV_EVENT_WRITING) { + // remote end close the connection + tthread::lock_guard lock(instance->_mutex); + std::map::iterator conn = instance->_connections.find(bev); + if (conn != instance->_connections.end()) { + bufferevent_enable(conn->second.bufferEvent, 0); + bufferevent_free(conn->second.bufferEvent); +#ifdef WIN32 + closesocket(conn->second.fd); +#else + close(conn->second.fd); +#endif + + instance->_connections.erase(conn); + } + } else if (error & BEV_EVENT_EOF) { + std::cout << "ServerSocket: eof file reached" << std::endl; + } else if (error & BEV_EVENT_ERROR) { + std::cout << "ServerSocket: unrecoverable error encountered" << std::endl; + } else if (error & BEV_EVENT_TIMEOUT) { + std::cout << "ServerSocket: user-specified timeout reached" << std::endl; + } else if (error & BEV_EVENT_CONNECTED) { + std::cout << "ServerSocket: connect operation finished" << std::endl; + } + // bufferevent_free(bev); +} + +void ServerSocket::readCallback(struct bufferevent *bev, void *ctx) { + ServerSocket* instance = (ServerSocket*)ctx; + tthread::lock_guard lock(instance->_mutex); + + // instance is already gone + if (_instances.find(instance) == _instances.end()) + return; + + size_t n; + struct evbuffer* input; + char* data = (char*)malloc(instance->_blockSizeRead); + + input = bufferevent_get_input(bev); + n = evbuffer_remove(input, data, instance->_blockSizeRead); + + instance->readCallback(data, n, instance->_connections[bev]); + free(data); +} + +void ServerSocket::bind() { + if (::bind(_socketFD, (struct sockaddr*)&_sin, sizeof(_sin)) < 0) { + throw std::runtime_error(std::string("bind: ") + strerror(errno)); + } +} + +void ServerSocket::listen(const std::string& address, int port) { +// tthread::lock_guard lock(_mutex); + setupSockAddr(address, port); + bind(); + + _listenerEvent = event_new(_base->base, _socketFD, EV_READ|EV_PERSIST, acceptCallback, (void*)this); + /*XXX check it */ + event_add(_listenerEvent, NULL); + + if (::listen(_socketFD, 16)<0) { + throw std::runtime_error(std::string("listen: ") + strerror(errno)); + } +} + +void ServerSocket::acceptCallback(evutil_socket_t listener, short event, void *ctx) { + ServerSocket* instance = (ServerSocket*)ctx; +// tthread::lock_guard lock(instance->_mutex); + + struct sockaddr_storage ss; + socklen_t slen = sizeof(ss); + int fd = accept(listener, (struct sockaddr*)&ss, &slen); + if (fd < 0) { + throw std::runtime_error(std::string("accept: ") + strerror(errno)); + } else if (fd > FD_SETSIZE) { +#ifdef WIN32 + closesocket(fd); +#else + close(fd); +#endif + + throw std::runtime_error(std::string("accept: ") + strerror(errno)); + } else { + struct bufferevent *bev; + evutil_make_socket_nonblocking(fd); + bev = bufferevent_socket_new(instance->_base->base, fd, BEV_OPT_THREADSAFE); + bufferevent_setcb(bev, ServerSocket::readCallback, NULL, ServerSocket::errorCallback, ctx); + bufferevent_enable(bev, EV_READ|EV_WRITE); + + instance->_connections[bev].bufferEvent = bev; + instance->_connections[bev].fd = fd; + } +} + +void ServerSocket::Connection::reply(const char* data, size_t size) { + bufferevent_write(bufferEvent, data, size); +} + +} diff --git a/src/uscxml/server/Socket.h b/src/uscxml/server/Socket.h new file mode 100644 index 0000000..fcaada4 --- /dev/null +++ b/src/uscxml/server/Socket.h @@ -0,0 +1,116 @@ +/** + * @file + * @author 2012-2014 Stefan Radomski (stefan.radomski@cs.tu-darmstadt.de) + * @copyright Simplified BSD + * + * @cond + * This program is free software: you can redistribute it and/or modify + * it under the terms of the FreeBSD license as published by the FreeBSD + * project. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the FreeBSD license along with this + * program. If not, see . + * @endcond + */ + +#ifndef SOCKETCLIENT_H_9A0B2A88 +#define SOCKETCLIENT_H_9A0B2A88 + +#include "uscxml/Common.h" // for USCXML_API +#include "uscxml/concurrency/EventBase.h" +#include +#include +#include + +#ifdef _WIN32 +# include +#else +# include /* For sockaddr_in */ +#endif + +#include "uscxml/concurrency/tinythread.h" // for recursive_mutex, etc + +extern "C" { +#include +#include +#include +} + +namespace uscxml { + +class USCXML_API Socket { +public: + Socket(int domain, int type, int protocol); + virtual ~Socket(); + + void setBlockSizeRead(size_t size); + +protected: + + void setupSockAddr(const std::string& address, int port); + + evutil_socket_t _socketFD; + + tthread::recursive_mutex _mutex; + size_t _blockSizeRead; + struct sockaddr_in _sin; + + boost::shared_ptr _base; +}; + +class USCXML_API ServerSocket : public Socket { +public: + class Connection { + public: + struct bufferevent* bufferEvent; + int fd; + + void reply(const char* data, size_t size); + }; + + ServerSocket(int domain, int type, int protocol); + virtual ~ServerSocket(); + + void listen(const std::string& address, int port); + virtual void readCallback(const char* data, size_t size, Connection& conn) {}; + + +protected: + void bind(); + static void acceptCallback(evutil_socket_t listener, short event, void *ctx); + static void errorCallback(struct bufferevent *bev, short error, void *ctx); + static void readCallback(struct bufferevent *bev, void *ctx); + + std::map _connections; + struct event* _listenerEvent; + + static std::set _instances; + +}; + +class USCXML_API ClientSocket : public Socket { +public: + ClientSocket(int domain, int type, int protocol); + virtual ~ClientSocket(); + + virtual void readCallback(const char* data, size_t size) {}; + void connect(const std::string& address, int port); + int write(const char* data, size_t size); + + +protected: + static void readCallback(struct bufferevent *bev, void *ctx); + static void errorCallback(struct bufferevent *bev, short error, void *ctx); + + struct bufferevent* _clientEvent; + +}; + + +} + +#endif /* end of include guard: SOCKETCLIENT_H_9A0B2A88 */ diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 10c1213..62b8749 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -117,7 +117,7 @@ set_target_properties(test-url PROPERTIES FOLDER "Tests") add_executable(test-cmdline-parsing src/test-cmdline-parsing.cpp) target_link_libraries(test-cmdline-parsing uscxml) # add_test(test-cmdline-parsing ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/test-cmdline-parsing) -# set_target_properties(test-cmdline-parsing PROPERTIES FOLDER "Tests") +set_target_properties(test-cmdline-parsing PROPERTIES FOLDER "Tests") # add_executable(test-initial-config src/test-initial-config.cpp) # target_link_libraries(test-initial-config uscxml) @@ -129,6 +129,11 @@ target_link_libraries(test-datamodel uscxml) add_test(test-datamodel ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/test-datamodel) set_target_properties(test-datamodel PROPERTIES FOLDER "Tests") +add_executable(test-sockets src/test-sockets.cpp) +target_link_libraries(test-sockets uscxml) +# add_test(test-datamodel ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/test-sockets) +set_target_properties(test-sockets PROPERTIES FOLDER "Tests") + # if (NOT WIN32) # add_executable(test-mmi src/test-mmi.cpp) # target_link_libraries(test-mmi uscxml) diff --git a/test/src/test-eventdelay.cpp b/test/src/test-eventdelay.cpp index 12cc751..ce6c923 100644 --- a/test/src/test-eventdelay.cpp +++ b/test/src/test-eventdelay.cpp @@ -1,4 +1,4 @@ -#include "uscxml/concurrency/eventqueue/DelayedEventQueue.h" +#include "uscxml/concurrency/DelayedEventQueue.h" #include int eventCalled = 0; diff --git a/test/src/test-sockets.cpp b/test/src/test-sockets.cpp new file mode 100644 index 0000000..a712da0 --- /dev/null +++ b/test/src/test-sockets.cpp @@ -0,0 +1,98 @@ +#include "uscxml/config.h" +#include "uscxml/server/Socket.h" +#include + +#include +#include "event2/thread.h" + +#ifdef HAS_SIGNAL_H +#include +#endif + +#include "uscxml/concurrency/tinythread.h" + +using namespace uscxml; + +class TestServer : public ServerSocket { +public: + TestServer(int domain, int type, int protocol) : ServerSocket(domain, type, protocol) {} + virtual void readCallback(const char* data, size_t size, Connection& conn) { + std::string content(data, size); +// std::cout << "Server got: " << content << std::endl; + std::string urghs("hi!"); + conn.reply(urghs.data(), urghs.size()); + }; +}; + +class TestClient : public ClientSocket { +public: + TestClient(int domain, int type, int protocol) : ClientSocket(domain, type, protocol) {} + virtual void readCallback(const char* data, size_t size) { + std::string content(data, size); + }; +}; + +int main(int argc, char** argv) { + +#if defined(HAS_SIGNAL_H) && !defined(WIN32) + signal(SIGPIPE, SIG_IGN); +#endif + +#ifndef _WIN32 + evthread_use_pthreads(); +#else + evthread_use_windows_threads(); +#endif + + if (0) { + // start server socket and connect + int iterations = 100; + + TestServer server(PF_INET, SOCK_STREAM, 0); + try { + server.listen("*", 1234); + + while(iterations--) { + std::cout << iterations << std::endl; + TestClient client(PF_INET, SOCK_STREAM, 0); + client.connect("127.0.0.1", 1234); + + std::string hello("hello"); + client.write(hello.data(), hello.size()); + + tthread::this_thread::sleep_for(tthread::chrono::milliseconds(20)); + } + + } catch (std::runtime_error e) { + std::cout << e.what() << std::endl; + } + } + + { + // connect client to server and kill server + int iterations = 100; + + try { + + while(iterations--) { + std::cout << iterations << std::endl; + TestServer* server = new TestServer(PF_INET, SOCK_STREAM, 0); + server->listen("*", 1236 + iterations); + + TestClient client(PF_INET, SOCK_STREAM, 0); + client.connect("127.0.0.1", 1236 + iterations); + + std::string hello("hello"); + client.write(hello.data(), hello.size()); + + delete server; + + tthread::this_thread::sleep_for(tthread::chrono::milliseconds(20)); + } + + } catch (std::runtime_error e) { + std::cout << e.what() << std::endl; + } + + } +} \ No newline at end of file -- cgit v0.12