blob: 3acba4e84e277d28c08975d388f7bc5342e7a39d (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
|
#ifndef THREADPOOL_H
#define THREADPOOL_H
#include <condition_variable>
#include <deque>
#include <functional>
#include <future>
#include <mutex>
#include <thread>
#include <type_traits>
#include <utility>
#include <vector>
/// Class managing a pool of worker threads.
///
/// Work is queued by passing a callable to queue(). The value the callable
/// returns (or the exception it throws) is delivered to the caller through
/// the std::future returned by queue().
///
/// The pool is neither copyable nor movable: the worker threads hold a
/// pointer to the pool object they were started from.
class ThreadPool
{
public:
    /// Start N threads in the thread pool.
    ///
    /// With N == 0 no worker exists, so queued tasks are never executed.
    /// If starting a worker fails, the workers that were already started are
    /// stopped again and the exception is rethrown.
    ThreadPool(std::size_t N=1)
    {
        // Reserve up front so that push_back() below cannot throw while it
        // holds a freshly created future: destroying a future obtained from
        // std::async blocks until its thread is done, which would never
        // happen for a worker that is still waiting for work.
        m_finished.reserve(N);
        try
        {
            for (std::size_t i = 0; i < N; ++i)
            {
                // each worker is a std::async running threadTask():
                m_finished.push_back(
                    std::async(std::launch::async, [this]{ threadTask(); }));
            }
        }
        catch (...)
        {
            // Without this, the destructor of m_finished would wait forever
            // on the workers that did start.
            finish();
            throw;
        }
    }

    /// Deletes the thread pool by finishing all threads.
    ~ThreadPool()
    {
        finish();
    }

    // The workers capture 'this', so the pool must stay where it is.
    ThreadPool(const ThreadPool&) = delete;
    ThreadPool& operator=(const ThreadPool&) = delete;

    /// Queue the callable 'f' for the worker threads to execute.
    ///
    /// @tparam F  type of a callable that can be invoked without arguments
    /// @tparam R  return type of the callable (deduced, do not specify)
    /// @param  f  the work to execute on one of the worker threads
    /// @return a future that receives the return value of 'f', or the
    ///         exception 'f' exited with.
    ///
    /// Tasks queued after finish() has been called are not executed, since
    /// no worker thread is left to pick them up.
    template<class F, class R=decltype(std::declval<F&>()())>
    std::future<R> queue(F&& f)
    {
        // wrap the function object into a packaged task, splitting
        // execution from the return value:
        std::packaged_task<R()> task(std::forward<F>(f));
        // get the future before we hand off the task
        auto result = task.get_future();
        {
            std::unique_lock<std::mutex> lock(m_mutex);
            // store the task<R()> as a task<void()>
            m_work.emplace_back(std::move(task));
            // wake a thread to work on the task
            m_cond.notify_one();
        }
        return result;
    }

    /// Enqueues a "stop the thread" message for every worker thread, then
    /// waits for them to finish.
    ///
    /// The stop messages are appended behind the tasks that are already
    /// queued, so those tasks are still executed first. Calling finish()
    /// again afterwards has no further effect.
    void finish()
    {
        {
            std::unique_lock<std::mutex> lock(m_mutex);
            // one default-constructed (invalid) task per worker is the stop message
            for (std::size_t i = 0; i < m_finished.size(); ++i)
            {
                m_work.emplace_back();
            }
        }
        m_cond.notify_all();
        // destroying the futures returned by std::async waits for the workers
        m_finished.clear();
    }

private:
    // The work that a worker thread does: take tasks from the queue and run
    // them until a stop message (an invalid task) is received.
    void threadTask()
    {
        while (true)
        {
            // pop a task off the queue:
            std::packaged_task<void()> task;
            {
                std::unique_lock<std::mutex> lock(m_mutex);
                // the predicate is checked before blocking, so no separate
                // emptiness test is needed
                m_cond.wait(lock, [this]{ return !m_work.empty(); });
                task = std::move(m_work.front());
                m_work.pop_front();
            }
            // if the task is invalid, it means we are asked to stop
            if (!task.valid()) return;
            // otherwise, run the task outside of the lock
            task();
        }
    }

    // the mutex, condition variable and deque form a single
    // thread-safe triggered queue of tasks:
    std::mutex m_mutex;
    std::condition_variable m_cond;
    // note that a packaged_task<void()> can store a packaged_task<R()>:
    std::deque< std::packaged_task<void()> > m_work;
    // this holds futures representing the worker threads being done:
    std::vector< std::future<void> > m_finished;
};
#endif
|