diff options
author | Stefan Radomski <radomski@tk.informatik.tu-darmstadt.de> | 2012-12-20 21:34:09 (GMT) |
---|---|---|
committer | Stefan Radomski <radomski@tk.informatik.tu-darmstadt.de> | 2012-12-20 21:34:09 (GMT) |
commit | 498f6f80e9ca01236ca1491596875ab7eb4cd8c3 (patch) | |
tree | e627ae19475bb93a98dfa50db1950f6e3403f569 /src/uscxml/concurrency/eventqueue/DelayedEventQueue.cpp | |
parent | d779abe6ff76a78f92d229fcf1f006f5cf1f9295 (diff) | |
download | uscxml-498f6f80e9ca01236ca1491596875ab7eb4cd8c3.zip uscxml-498f6f80e9ca01236ca1491596875ab7eb4cd8c3.tar.gz uscxml-498f6f80e9ca01236ca1491596875ab7eb4cd8c3.tar.bz2 |
Refactoring finished
Support datamodels, invokers and ioprocessors as plugins
Comply to HTTP1.1 by sending host header field
Started prolog datamodel
Diffstat (limited to 'src/uscxml/concurrency/eventqueue/DelayedEventQueue.cpp')
-rw-r--r-- | src/uscxml/concurrency/eventqueue/DelayedEventQueue.cpp | 157 |
1 files changed, 79 insertions, 78 deletions
diff --git a/src/uscxml/concurrency/eventqueue/DelayedEventQueue.cpp b/src/uscxml/concurrency/eventqueue/DelayedEventQueue.cpp index e2a89b2..e582c13 100644 --- a/src/uscxml/concurrency/eventqueue/DelayedEventQueue.cpp +++ b/src/uscxml/concurrency/eventqueue/DelayedEventQueue.cpp @@ -4,88 +4,89 @@ namespace uscxml { - DelayedEventQueue::DelayedEventQueue() { +DelayedEventQueue::DelayedEventQueue() { #ifndef _WIN32 - evthread_use_pthreads(); + evthread_use_pthreads(); #else - evthread_use_windows_threads(); + evthread_use_windows_threads(); #endif - _eventLoop = event_base_new(); - _thread = NULL; - } + _eventLoop = event_base_new(); + _thread = NULL; +} - DelayedEventQueue::~DelayedEventQueue() { +DelayedEventQueue::~DelayedEventQueue() { // std::cout << "Deleting DelayedEventQueue" << std::endl; - stop(); - if (_thread) - _thread->join(); - if(_eventLoop) - event_base_free(_eventLoop); - } - - void DelayedEventQueue::run(void* instance) { - DelayedEventQueue* THIS = (DelayedEventQueue*)instance; - int result; - while(THIS->_isStarted) { - { - //result = event_base_dispatch(THIS->_eventLoop); - result = event_base_loop(THIS->_eventLoop, EVLOOP_NO_EXIT_ON_EMPTY); - } - } - } - - void DelayedEventQueue::addEvent(std::string eventId, void (*callback)(void*, const std::string eventId), uint32_t delayMs, void* userData) { - if(_callbackData.find(eventId) != _callbackData.end()) { - cancelEvent(eventId); - } - - struct timeval delay = {delayMs / 1000, (delayMs % 1000) * 1000}; - struct event* event = event_new(_eventLoop, -1, 0, DelayedEventQueue::timerCallback, &_callbackData[eventId]); - - _callbackData[eventId].eventId = eventId; - _callbackData[eventId].userData = userData; - _callbackData[eventId].eventQueue = this; - _callbackData[eventId].callback = callback; - _callbackData[eventId].event = event; - - event_add(event, &delay); - } - - void DelayedEventQueue::cancelEvent(std::string eventId) { - tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); - - if(_callbackData.find(eventId) != _callbackData.end()) { - event_del(_callbackData[eventId].event); - event_free(_callbackData[eventId].event); - _callbackData.erase(eventId); - } - } - - void DelayedEventQueue::start() { - _isStarted = true; - _thread = new tthread::thread(DelayedEventQueue::run, this); - } - - void DelayedEventQueue::stop() { - if (_isStarted) { - _isStarted = false; - event_base_loopbreak(_eventLoop); - _thread->join(); - delete _thread; - } - } - - void DelayedEventQueue::dummyCallback(evutil_socket_t fd, short what, void *arg) { - } - - void DelayedEventQueue::timerCallback(evutil_socket_t fd, short what, void *arg) { - struct callbackData *data = (struct callbackData*)arg; - tthread::lock_guard<tthread::recursive_mutex> lock(data->eventQueue->_mutex); - - std::string eventId = data->eventId; // copy eventId - event_free(data->event); - data->callback(data->userData, eventId); - data->eventQueue->_callbackData.erase(data->eventId); - } + stop(); + if (_thread) + _thread->join(); + if(_eventLoop) + event_base_free(_eventLoop); +} + +void DelayedEventQueue::run(void* instance) { + DelayedEventQueue* THIS = (DelayedEventQueue*)instance; + int result; + while(THIS->_isStarted) { + { + //result = event_base_dispatch(THIS->_eventLoop); + result = event_base_loop(THIS->_eventLoop, EVLOOP_NO_EXIT_ON_EMPTY); + (void)result; + } + } +} + +void DelayedEventQueue::addEvent(std::string eventId, void (*callback)(void*, const std::string eventId), uint32_t delayMs, void* userData) { + if(_callbackData.find(eventId) != _callbackData.end()) { + cancelEvent(eventId); + } + + struct timeval delay = {delayMs / 1000, (delayMs % 1000) * 1000}; + struct event* event = event_new(_eventLoop, -1, 0, DelayedEventQueue::timerCallback, &_callbackData[eventId]); + + _callbackData[eventId].eventId = eventId; + _callbackData[eventId].userData = userData; + _callbackData[eventId].eventQueue = this; + _callbackData[eventId].callback = callback; + _callbackData[eventId].event = event; + + event_add(event, &delay); +} + +void DelayedEventQueue::cancelEvent(std::string eventId) { + tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); + + if(_callbackData.find(eventId) != _callbackData.end()) { + event_del(_callbackData[eventId].event); + event_free(_callbackData[eventId].event); + _callbackData.erase(eventId); + } +} + +void DelayedEventQueue::start() { + _isStarted = true; + _thread = new tthread::thread(DelayedEventQueue::run, this); +} + +void DelayedEventQueue::stop() { + if (_isStarted) { + _isStarted = false; + event_base_loopbreak(_eventLoop); + _thread->join(); + delete _thread; + } +} + +void DelayedEventQueue::dummyCallback(evutil_socket_t fd, short what, void *arg) { +} + +void DelayedEventQueue::timerCallback(evutil_socket_t fd, short what, void *arg) { + struct callbackData *data = (struct callbackData*)arg; + tthread::lock_guard<tthread::recursive_mutex> lock(data->eventQueue->_mutex); + + std::string eventId = data->eventId; // copy eventId + event_free(data->event); + data->callback(data->userData, eventId); + data->eventQueue->_callbackData.erase(data->eventId); +} }
\ No newline at end of file |