summaryrefslogtreecommitdiffstats
path: root/src/uscxml/concurrency
diff options
context:
space:
mode:
Diffstat (limited to 'src/uscxml/concurrency')
-rw-r--r--src/uscxml/concurrency/eventqueue/DelayedEventQueue.cpp29
-rw-r--r--src/uscxml/concurrency/eventqueue/DelayedEventQueue.h8
2 files changed, 37 insertions, 0 deletions
diff --git a/src/uscxml/concurrency/eventqueue/DelayedEventQueue.cpp b/src/uscxml/concurrency/eventqueue/DelayedEventQueue.cpp
index 14028fa..cbb82eb 100644
--- a/src/uscxml/concurrency/eventqueue/DelayedEventQueue.cpp
+++ b/src/uscxml/concurrency/eventqueue/DelayedEventQueue.cpp
@@ -1,6 +1,7 @@
#include "DelayedEventQueue.h"
#include <assert.h>
#include <event2/event.h>
+#include <sstream>
namespace uscxml {
@@ -36,6 +37,27 @@ void DelayedEventQueue::run(void* instance) {
}
}
+void DelayedEventQueue::addEvent(std::string eventId, int fd, short opMask, void (*callback)(void*, const std::string eventId), void* userData, bool persist) {
+ if(_callbackData.find(eventId) != _callbackData.end()) {
+ cancelEvent(eventId);
+ }
+
+ if (persist)
+ opMask |= EV_PERSIST;
+
+ struct event* event = event_new(_eventLoop, fd, opMask, DelayedEventQueue::fileCallback, &_callbackData[eventId]);
+
+ _callbackData[eventId].eventId = eventId;
+ _callbackData[eventId].userData = userData;
+ _callbackData[eventId].eventQueue = this;
+ _callbackData[eventId].callback = callback;
+ _callbackData[eventId].event = event;
+ _callbackData[eventId].persist = false;
+
+ event_add(event, NULL);
+
+}
+
void DelayedEventQueue::addEvent(std::string eventId, void (*callback)(void*, const std::string eventId), uint32_t delayMs, void* userData, bool persist) {
if(_callbackData.find(eventId) != _callbackData.end()) {
cancelEvent(eventId);
@@ -83,6 +105,13 @@ void DelayedEventQueue::stop() {
void DelayedEventQueue::dummyCallback(evutil_socket_t fd, short what, void *arg) {
}
+void DelayedEventQueue::fileCallback(evutil_socket_t fd, short what, void *arg) {
+ struct callbackData *data = (struct callbackData*)arg;
+ tthread::lock_guard<tthread::recursive_mutex> lock(data->eventQueue->_mutex);
+ std::string eventId = data->eventId; // copy eventId
+ data->callback(data->userData, eventId);
+}
+
void DelayedEventQueue::timerCallback(evutil_socket_t fd, short what, void *arg) {
struct callbackData *data = (struct callbackData*)arg;
tthread::lock_guard<tthread::recursive_mutex> lock(data->eventQueue->_mutex);
diff --git a/src/uscxml/concurrency/eventqueue/DelayedEventQueue.h b/src/uscxml/concurrency/eventqueue/DelayedEventQueue.h
index 7ce766b..0b72719 100644
--- a/src/uscxml/concurrency/eventqueue/DelayedEventQueue.h
+++ b/src/uscxml/concurrency/eventqueue/DelayedEventQueue.h
@@ -18,6 +18,12 @@ namespace uscxml {
class DelayedEventQueue {
public:
+ enum OpMask {
+ FD_READ = EV_READ,
+ FD_WRITE = EV_WRITE,
+ FD_SIGNAL = EV_SIGNAL
+ };
+
struct callbackData {
void *userData;
void (*callback)(void*, const std::string eventId);
@@ -30,6 +36,7 @@ public:
DelayedEventQueue();
virtual ~DelayedEventQueue();
+ void addEvent(std::string eventId, int fd, short opMask, void (*callback)(void*, const std::string eventId), void* userData, bool persist = true);
void addEvent(std::string eventId, void (*callback)(void*, const std::string eventId), uint32_t delayMs, void* userData, bool persist = false);
void cancelEvent(std::string eventId);
@@ -42,6 +49,7 @@ public:
}
static void timerCallback(evutil_socket_t fd, short what, void *arg);
+ static void fileCallback(evutil_socket_t fd, short what, void *arg);
static void dummyCallback(evutil_socket_t fd, short what, void *arg);
bool _isStarted;