summaryrefslogtreecommitdiffstats
path: root/src/uscxml/concurrency/eventqueue/DelayedEventQueue.cpp
diff options
context:
space:
mode:
authorStefan Radomski <radomski@tk.informatik.tu-darmstadt.de>2012-12-15 19:10:50 (GMT)
committerStefan Radomski <radomski@tk.informatik.tu-darmstadt.de>2012-12-15 19:10:50 (GMT)
commitf1700edcd08d6215888e226618555ba43b5324ec (patch)
tree738f30de64f699c3f56d2e15963537c9493a24b4 /src/uscxml/concurrency/eventqueue/DelayedEventQueue.cpp
parent2855a9ff7b423140237c9e988252fde0cbacd0a1 (diff)
downloaduscxml-f1700edcd08d6215888e226618555ba43b5324ec.zip
uscxml-f1700edcd08d6215888e226618555ba43b5324ec.tar.gz
uscxml-f1700edcd08d6215888e226618555ba43b5324ec.tar.bz2
Refactoring and plugin support
Diffstat (limited to 'src/uscxml/concurrency/eventqueue/DelayedEventQueue.cpp')
-rw-r--r--src/uscxml/concurrency/eventqueue/DelayedEventQueue.cpp91
1 files changed, 91 insertions, 0 deletions
diff --git a/src/uscxml/concurrency/eventqueue/DelayedEventQueue.cpp b/src/uscxml/concurrency/eventqueue/DelayedEventQueue.cpp
new file mode 100644
index 0000000..e2a89b2
--- /dev/null
+++ b/src/uscxml/concurrency/eventqueue/DelayedEventQueue.cpp
@@ -0,0 +1,91 @@
+#include "DelayedEventQueue.h"
+#include <assert.h>
+#include <event2/event.h>
+
+namespace uscxml {
+
+ DelayedEventQueue::DelayedEventQueue() {
+#ifndef _WIN32
+ evthread_use_pthreads();
+#else
+ evthread_use_windows_threads();
+#endif
+ _eventLoop = event_base_new();
+ _thread = NULL;
+ }
+
+ DelayedEventQueue::~DelayedEventQueue() {
+// std::cout << "Deleting DelayedEventQueue" << std::endl;
+ stop();
+ if (_thread)
+ _thread->join();
+ if(_eventLoop)
+ event_base_free(_eventLoop);
+ }
+
+ void DelayedEventQueue::run(void* instance) {
+ DelayedEventQueue* THIS = (DelayedEventQueue*)instance;
+ int result;
+ while(THIS->_isStarted) {
+ {
+ //result = event_base_dispatch(THIS->_eventLoop);
+ result = event_base_loop(THIS->_eventLoop, EVLOOP_NO_EXIT_ON_EMPTY);
+ }
+ }
+ }
+
+ void DelayedEventQueue::addEvent(std::string eventId, void (*callback)(void*, const std::string eventId), uint32_t delayMs, void* userData) {
+ if(_callbackData.find(eventId) != _callbackData.end()) {
+ cancelEvent(eventId);
+ }
+
+ struct timeval delay = {delayMs / 1000, (delayMs % 1000) * 1000};
+ struct event* event = event_new(_eventLoop, -1, 0, DelayedEventQueue::timerCallback, &_callbackData[eventId]);
+
+ _callbackData[eventId].eventId = eventId;
+ _callbackData[eventId].userData = userData;
+ _callbackData[eventId].eventQueue = this;
+ _callbackData[eventId].callback = callback;
+ _callbackData[eventId].event = event;
+
+ event_add(event, &delay);
+ }
+
+ void DelayedEventQueue::cancelEvent(std::string eventId) {
+ tthread::lock_guard<tthread::recursive_mutex> lock(_mutex);
+
+ if(_callbackData.find(eventId) != _callbackData.end()) {
+ event_del(_callbackData[eventId].event);
+ event_free(_callbackData[eventId].event);
+ _callbackData.erase(eventId);
+ }
+ }
+
+ void DelayedEventQueue::start() {
+ _isStarted = true;
+ _thread = new tthread::thread(DelayedEventQueue::run, this);
+ }
+
+ void DelayedEventQueue::stop() {
+ if (_isStarted) {
+ _isStarted = false;
+ event_base_loopbreak(_eventLoop);
+ _thread->join();
+ delete _thread;
+ }
+ }
+
+ void DelayedEventQueue::dummyCallback(evutil_socket_t fd, short what, void *arg) {
+ }
+
+ void DelayedEventQueue::timerCallback(evutil_socket_t fd, short what, void *arg) {
+ struct callbackData *data = (struct callbackData*)arg;
+ tthread::lock_guard<tthread::recursive_mutex> lock(data->eventQueue->_mutex);
+
+ std::string eventId = data->eventId; // copy eventId
+ event_free(data->event);
+ data->callback(data->userData, eventId);
+ data->eventQueue->_callbackData.erase(data->eventId);
+ }
+
+} \ No newline at end of file