summaryrefslogtreecommitdiffstats
path: root/src/threadpool.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/threadpool.h')
-rw-r--r--src/threadpool.h116
1 files changed, 116 insertions, 0 deletions
diff --git a/src/threadpool.h b/src/threadpool.h
new file mode 100644
index 0000000..3acba4e
--- /dev/null
+++ b/src/threadpool.h
@@ -0,0 +1,116 @@
+#ifndef THREADPOOL_H
+#define THREADPOOL_H
+
+#include <condition_variable>
+#include <deque>
+#include <functional>
+#include <future>
+#include <mutex>
+#include <thread>
+#include <type_traits>
+#include <utility>
+#include <vector>
+
+/// Class managing a pool of worker threads.
+/// Work can be queued by passing a function to queue(). When the
+/// work is done the result of the function will be passed back via a future.
+class ThreadPool
+{
+ public:
+ /// start N threads in the thread pool.
+ ThreadPool(std::size_t N=1)
+ {
+ for (std::size_t i = 0; i < N; ++i)
+ {
+ // each thread is a std::async running thread_task():
+ m_finished.push_back(
+ std::async(
+ std::launch::async,
+ [this]{ threadTask(); }
+ )
+ );
+ }
+ }
+ /// deletes the thread pool by finishing all threads
+ ~ThreadPool()
+ {
+ finish();
+ }
+
+ /// Queue the lambda 'task' for the threads to execute.
+ /// A future of the return type of the lambda is returned to capture the result.
+ template<class F, class R=std::result_of_t<F&()> >
+ std::future<R> queue(F&& f)
+ {
+ // wrap the function object into a packaged task, splitting
+ // execution from the return value:
+ std::packaged_task<R()> p(std::forward<F>(f));
+
+ auto r=p.get_future(); // get the return value before we hand off the task
+ {
+ std::unique_lock<std::mutex> l(m_mutex);
+ m_work.emplace_back(std::move(p)); // store the task<R()> as a task<void()>
+ m_cond.notify_one(); // wake a thread to work on the task
+ }
+
+ return r; // return the future result of the task
+ }
+
+ /// finish enques a "stop the thread" message for every thread,
+ /// then waits for them to finish
+ void finish()
+ {
+ {
+ std::unique_lock<std::mutex> l(m_mutex);
+ for(auto&& u : m_finished)
+ {
+ unused_variable(u);
+ m_work.push_back({});
+ }
+ }
+ m_cond.notify_all();
+ m_finished.clear();
+ }
+ private:
+
+ // helper to silence the compiler warning about unused variables
+ template <typename ...Args>
+ void unused_variable(Args&& ...args) { (void)(sizeof...(args)); }
+
+ // the work that a worker thread does:
+ void threadTask()
+ {
+ while(true)
+ {
+ // pop a task off the queue:
+ std::packaged_task<void()> f;
+ {
+ // usual thread-safe queue code:
+ std::unique_lock<std::mutex> l(m_mutex);
+ if (m_work.empty())
+ {
+ m_cond.wait(l,[&]{return !m_work.empty();});
+ }
+ f = std::move(m_work.front());
+ m_work.pop_front();
+ }
+ // if the task is invalid, it means we are asked to abort
+ if (!f.valid()) return;
+ // otherwise, run the task
+ f();
+ }
+ }
+
+ // the mutex, condition variable and deque form a single
+ // thread-safe triggered queue of tasks:
+ std::mutex m_mutex;
+ std::condition_variable m_cond;
+ // note that a packaged_task<void> can store a packaged_task<R>:
+ std::deque< std::packaged_task<void()> > m_work;
+
+ // this holds futures representing the worker threads being done:
+ std::vector< std::future<void> > m_finished;
+};
+
+#endif
+