summaryrefslogtreecommitdiffstats
path: root/src/uscxml/concurrency/eventqueue/libevent/DelayedEventQueue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/uscxml/concurrency/eventqueue/libevent/DelayedEventQueue.cpp')
-rw-r--r--src/uscxml/concurrency/eventqueue/libevent/DelayedEventQueue.cpp87
1 files changed, 87 insertions, 0 deletions
diff --git a/src/uscxml/concurrency/eventqueue/libevent/DelayedEventQueue.cpp b/src/uscxml/concurrency/eventqueue/libevent/DelayedEventQueue.cpp
new file mode 100644
index 0000000..ce42af7
--- /dev/null
+++ b/src/uscxml/concurrency/eventqueue/libevent/DelayedEventQueue.cpp
@@ -0,0 +1,87 @@
+#include "uscxml/concurrency/eventqueue/libevent/DelayedEventQueue.h"
+#include <assert.h>
+#include <event2/event.h>
+
+namespace uscxml {
+
+ DelayedEventQueue::DelayedEventQueue() {
+ evthread_use_pthreads();
+ _eventLoop = event_base_new();
+ _thread = NULL;
+ }
+
+ DelayedEventQueue::~DelayedEventQueue() {
+ std::cout << "Deleting DelayedEventQueue" << std::endl;
+ if(_eventLoop)
+ event_base_loopbreak(_eventLoop);
+ if (_thread)
+ _thread->join();
+ if(_eventLoop)
+ event_base_free(_eventLoop);
+ }
+
+ void DelayedEventQueue::run(void* instance) {
+ DelayedEventQueue* THIS = (DelayedEventQueue*)instance;
+ int result;
+ while(THIS->_isStarted) {
+ {
+ //result = event_base_dispatch(THIS->_eventLoop);
+ result = event_base_loop(THIS->_eventLoop, EVLOOP_NO_EXIT_ON_EMPTY);
+ }
+ }
+ }
+
+ void DelayedEventQueue::addEvent(std::string eventId, void (*callback)(void*, const std::string eventId), uint32_t delayMs, void* userData) {
+ if(_callbackData.find(eventId) != _callbackData.end()) {
+ cancelEvent(eventId);
+ }
+
+ struct timeval delay = {delayMs / 1000, (delayMs % 1000) * 1000};
+ struct event* event = event_new(_eventLoop, -1, 0, DelayedEventQueue::timerCallback, &_callbackData[eventId]);
+
+ _callbackData[eventId].eventId = eventId;
+ _callbackData[eventId].userData = userData;
+ _callbackData[eventId].eventQueue = this;
+ _callbackData[eventId].callback = callback;
+ _callbackData[eventId].event = event;
+
+ event_add(event, &delay);
+ }
+
+ void DelayedEventQueue::cancelEvent(std::string eventId) {
+ tthread::lock_guard<tthread::recursive_mutex> lock(_mutex);
+
+ if(_callbackData.find(eventId) != _callbackData.end()) {
+ event_del(_callbackData[eventId].event);
+ event_free(_callbackData[eventId].event);
+ _callbackData.erase(eventId);
+ }
+ }
+
+ void DelayedEventQueue::start() {
+ _isStarted = true;
+ _thread = new tthread::thread(DelayedEventQueue::run, this);
+ }
+
+ void DelayedEventQueue::stop() {
+ if (_isStarted) {
+ _isStarted = false;
+ _thread->join();
+ delete _thread;
+ }
+ }
+
+ void DelayedEventQueue::dummyCallback(evutil_socket_t fd, short what, void *arg) {
+ }
+
+ void DelayedEventQueue::timerCallback(evutil_socket_t fd, short what, void *arg) {
+ struct callbackData *data = (struct callbackData*)arg;
+ tthread::lock_guard<tthread::recursive_mutex> lock(data->eventQueue->_mutex);
+
+ std::string eventId = data->eventId; // copy eventId
+ event_free(data->event);
+ data->eventQueue->_callbackData.erase(data->eventId);
+ data->callback(data->userData, eventId);
+ }
+
+} \ No newline at end of file