summaryrefslogtreecommitdiffstats
path: root/src/uscxml/concurrency/eventqueue/DelayedEventQueue.cpp
blob: e0b5a8d57d9b914a5f456927af14f8c1eb53b73f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
#include "DelayedEventQueue.h"
#include <assert.h>
#include <event2/event.h>

namespace uscxml {

/**
 * Creates an idle queue: selects libevent's threading backend for the
 * platform and allocates the event base. No dispatch thread exists until
 * start() is called.
 */
DelayedEventQueue::DelayedEventQueue() {
#ifndef _WIN32
	evthread_use_pthreads();
#else
	evthread_use_windows_threads();
#endif
	_eventLoop = event_base_new();
	_thread = NULL;
	// stop() and the destructor branch on this flag, so it must have a
	// defined value even if start() is never called
	_isStarted = false;
}

/**
 * Stops the dispatch thread if the queue is running, then frees the
 * libevent base.
 */
DelayedEventQueue::~DelayedEventQueue() {
	// stop() joins and deletes the dispatch thread when the queue is running
	// and always leaves _isStarted false, so a second join here could never
	// execute (and would touch an already deleted thread if it did)
	stop();
	if(_eventLoop)
		event_base_free(_eventLoop);
}

void DelayedEventQueue::run(void* instance) {
	DelayedEventQueue* THIS = (DelayedEventQueue*)instance;
	int result;
	while(THIS->_isStarted) {
		{
			//result = event_base_dispatch(THIS->_eventLoop);
			result = event_base_loop(THIS->_eventLoop, EVLOOP_NO_EXIT_ON_EMPTY);
			(void)result;
		}
	}
}

void DelayedEventQueue::addEvent(std::string eventId, void (*callback)(void*, const std::string eventId), uint32_t delayMs, void* userData, bool persist) {
	if(_callbackData.find(eventId) != _callbackData.end()) {
		cancelEvent(eventId);
	}

	struct timeval delay = {delayMs / 1000, (delayMs % 1000) * 1000};
	struct event* event = event_new(_eventLoop, -1, (persist ? EV_PERSIST : 0), DelayedEventQueue::timerCallback, &_callbackData[eventId]);

	_callbackData[eventId].eventId = eventId;
	_callbackData[eventId].userData = userData;
	_callbackData[eventId].eventQueue = this;
	_callbackData[eventId].callback = callback;
	_callbackData[eventId].event = event;
	_callbackData[eventId].persist = persist;

	event_add(event, &delay);
}

/**
 * Removes a scheduled event, if one is registered under the given id.
 *
 * @param eventId Key the event was registered with; unknown ids are ignored.
 */
void DelayedEventQueue::cancelEvent(std::string eventId) {
	tthread::lock_guard<tthread::recursive_mutex> lock(_mutex);

	// nothing registered under this id
	if (_callbackData.find(eventId) == _callbackData.end())
		return;

	// unschedule and release the libevent handle before dropping the entry
	struct event* pending = _callbackData[eventId].event;
	event_del(pending);
	event_free(pending);
	_callbackData.erase(eventId);
}

/**
 * Spawns the thread that dispatches the event base (see run()).
 */
void DelayedEventQueue::start() {
	// the flag must be set before the thread is created and must stay set:
	// run() loops on it and stop() only joins the thread while it is true
	_isStarted = true;
	_thread = new tthread::thread(DelayedEventQueue::run, this);
}

void DelayedEventQueue::stop() {
	if (_isStarted) {
		_isStarted = false;
		event_base_loopbreak(_eventLoop);
		_thread->join();
		delete _thread;
	}
}

/**
 * Event callback that intentionally does nothing; all arguments are ignored.
 */
void DelayedEventQueue::dummyCallback(evutil_socket_t, short, void*) {
}

/**
 * Timer handler registered with every event created by addEvent().
 *
 * Invokes the user callback while holding the owning queue's mutex; one-shot
 * events are freed and removed from the queue afterwards, persistent ones
 * are left in place.
 *
 * @param fd   Unused.
 * @param what Unused.
 * @param arg  Pointer to the callbackData entry of the event that fired.
 */
void DelayedEventQueue::timerCallback(evutil_socket_t fd, short what, void *arg) {
	struct callbackData* entry = static_cast<struct callbackData*>(arg);
	tthread::lock_guard<tthread::recursive_mutex> lock(entry->eventQueue->_mutex);

	// hand the callback its own copy of the id
	const std::string firedId = entry->eventId;
	entry->callback(entry->userData, firedId);

	// persistent events stay registered
	if (entry->persist)
		return;

	// NOTE(review): if the callback cancelled or re-added this id, the entry
	// may already have been erased by cancelEvent() - confirm callers never
	// do that
	event_free(entry->event);
	entry->eventQueue->_callbackData.erase(entry->eventId);
}

}