summaryrefslogtreecommitdiffstats
path: root/src/uscxml/concurrency/eventqueue/DelayedEventQueue.cpp
blob: cbb82eb6a4fc4d1f869d244332dd3f69a6427bd4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
#include "DelayedEventQueue.h"
#include <assert.h>
#include <event2/event.h>
#include <sstream>

namespace uscxml {

DelayedEventQueue::DelayedEventQueue() {
#ifndef _WIN32
	evthread_use_pthreads();
#else
	evthread_use_windows_threads();
#endif
	_eventLoop = event_base_new();
	_thread = NULL;
	_isStarted = false;
}

DelayedEventQueue::~DelayedEventQueue() {
//    std::cout << "Deleting DelayedEventQueue" << std::endl;
	stop();
	if (_thread && _isStarted)
		_thread->join();
	if(_eventLoop)
		event_base_free(_eventLoop);
}

void DelayedEventQueue::run(void* instance) {
	DelayedEventQueue* THIS = (DelayedEventQueue*)instance;
	int result;
	while(THIS->_isStarted) {
		{
			//result = event_base_dispatch(THIS->_eventLoop);
			result = event_base_loop(THIS->_eventLoop, EVLOOP_NO_EXIT_ON_EMPTY);
			(void)result;
		}
	}
}

void DelayedEventQueue::addEvent(std::string eventId, int fd, short opMask, void (*callback)(void*, const std::string eventId), void* userData, bool persist) {
	if(_callbackData.find(eventId) != _callbackData.end()) {
		cancelEvent(eventId);
	}

	if (persist)
		opMask |= EV_PERSIST;

	struct event* event = event_new(_eventLoop, fd, opMask, DelayedEventQueue::fileCallback, &_callbackData[eventId]);

	_callbackData[eventId].eventId = eventId;
	_callbackData[eventId].userData = userData;
	_callbackData[eventId].eventQueue = this;
	_callbackData[eventId].callback = callback;
	_callbackData[eventId].event = event;
	_callbackData[eventId].persist = false;
	
	event_add(event, NULL);

}

void DelayedEventQueue::addEvent(std::string eventId, void (*callback)(void*, const std::string eventId), uint32_t delayMs, void* userData, bool persist) {
	if(_callbackData.find(eventId) != _callbackData.end()) {
		cancelEvent(eventId);
	}

	struct timeval delay = {delayMs / 1000, (delayMs % 1000) * 1000};
	struct event* event = event_new(_eventLoop, -1, (persist ? EV_PERSIST : 0), DelayedEventQueue::timerCallback, &_callbackData[eventId]);

	_callbackData[eventId].eventId = eventId;
	_callbackData[eventId].userData = userData;
	_callbackData[eventId].eventQueue = this;
	_callbackData[eventId].callback = callback;
	_callbackData[eventId].event = event;
	_callbackData[eventId].persist = persist;

	event_add(event, &delay);
}

void DelayedEventQueue::cancelEvent(std::string eventId) {
	tthread::lock_guard<tthread::recursive_mutex> lock(_mutex);

	if(_callbackData.find(eventId) != _callbackData.end()) {
		event_del(_callbackData[eventId].event);
		event_free(_callbackData[eventId].event);
		_callbackData.erase(eventId);
	}
}

void DelayedEventQueue::start() {
	_isStarted = true;
	_thread = new tthread::thread(DelayedEventQueue::run, this);
}

void DelayedEventQueue::stop() {
	if (_isStarted) {
		_isStarted = false;
		event_base_loopbreak(_eventLoop);
	}
	if (_thread) {
		_thread->join();
		delete _thread;
	}
}

void DelayedEventQueue::dummyCallback(evutil_socket_t fd, short what, void *arg) {
}

void DelayedEventQueue::fileCallback(evutil_socket_t fd, short what, void *arg) {
	struct callbackData *data = (struct callbackData*)arg;
	tthread::lock_guard<tthread::recursive_mutex> lock(data->eventQueue->_mutex);
	std::string eventId = data->eventId; // copy eventId
	data->callback(data->userData, eventId);
}

void DelayedEventQueue::timerCallback(evutil_socket_t fd, short what, void *arg) {
	struct callbackData *data = (struct callbackData*)arg;
	tthread::lock_guard<tthread::recursive_mutex> lock(data->eventQueue->_mutex);

	std::string eventId = data->eventId; // copy eventId
	data->callback(data->userData, eventId);
	if (!data->persist) {
		event_free(data->event);
		data->eventQueue->_callbackData.erase(data->eventId);
	}
}

}