summaryrefslogtreecommitdiffstats
path: root/src/threadpool.h
blob: 3acba4e84e277d28c08975d388f7bc5342e7a39d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
#ifndef THREADPOOL_H
#define THREADPOOL_H

#include <condition_variable>
#include <deque>
#include <functional>
#include <future>
#include <mutex>
#include <thread>
#include <type_traits>
#include <utility>
#include <vector>

/// Class managing a pool of worker threads.
/// Work can be queued by passing a callable to queue(). When the work is
/// done, the result of the callable is passed back via a std::future.
///
/// All workers take their tasks from one shared FIFO queue. The pool is
/// neither copyable nor movable: the worker threads capture `this`.
class ThreadPool
{
  public:
    /// Starts N worker threads in the thread pool.
    /// If starting a worker fails, the workers that were already started are
    /// shut down again before the exception is passed on to the caller.
    ThreadPool(std::size_t N=1)
    {
      try
      {
        // Reserve up front so push_back() below never has to reallocate:
        // if it threw, the temporary future returned by std::async would be
        // destroyed during unwinding and block on a worker that never ends.
        m_finished.reserve(N);
        for (std::size_t i = 0; i < N; ++i)
        {
          // each worker is a std::async task running threadTask():
          m_finished.push_back(
              std::async(std::launch::async, [this]{ threadTask(); }));
        }
      }
      catch (...)
      {
        // Stop the workers started so far; otherwise destroying m_finished
        // would wait forever for threads blocked on the empty queue.
        finish();
        throw;
      }
    }

    /// Destroys the thread pool after all workers have finished, see finish().
    ~ThreadPool()
    {
      finish();
    }

    // The std::mutex member already makes the class non-copyable; spell it
    // out so the intent is visible in the interface.
    ThreadPool(const ThreadPool&) = delete;
    ThreadPool& operator=(const ThreadPool&) = delete;

    /// Queues the callable 'f' for execution by one of the worker threads.
    /// @param f  callable taking no arguments; it is copied or moved into
    ///           the queue.
    /// @return   a future for the value returned by 'f'. An exception thrown
    ///           by 'f' is stored in the future and rethrown by get().
    /// @note     Tasks queued after finish() are not executed, because no
    ///           worker is left to pick them up.
    template<class F, class R=std::result_of_t<F&()> >
    std::future<R> queue(F&& f)
    {
      // wrap the callable into a packaged task, splitting
      // execution from the return value:
      std::packaged_task<R()> task(std::forward<F>(f));

      // obtain the future before the task is handed off to the queue
      std::future<R> result = task.get_future();
      {
        std::lock_guard<std::mutex> lock(m_mutex);
        // the task<R()> is stored type-erased as a task<void()>
        m_work.emplace_back(std::move(task));
        // wake one worker to process the task
        m_cond.notify_one();
      }

      return result;
    }

    /// Enqueues one "stop the thread" message per worker, then waits for all
    /// workers to finish. The stop messages are appended at the back of the
    /// queue, so tasks queued earlier are taken from the queue first.
    /// Calling finish() again afterwards has no effect.
    /// m_finished itself is not guarded by the mutex, so finish() must not be
    /// called from several threads at the same time.
    void finish()
    {
      {
        std::lock_guard<std::mutex> lock(m_mutex);
        // an empty (invalid) task is the stop message; one per worker
        for (std::size_t i = 0; i < m_finished.size(); ++i)
        {
          m_work.emplace_back();
        }
      }
      m_cond.notify_all();
      // destroying a future obtained from std::async blocks until its
      // thread has returned, so this joins all workers
      m_finished.clear();
    }

  private:

    /// The loop executed by every worker thread: take the next task from the
    /// queue and run it, until a stop message (an invalid task) is received.
    void threadTask()
    {
      while (true)
      {
        std::packaged_task<void()> task;
        {
          std::unique_lock<std::mutex> lock(m_mutex);
          // the predicate is checked before blocking, so a non-empty queue
          // is processed without waiting and spurious wakeups are ignored
          m_cond.wait(lock, [this]{ return !m_work.empty(); });
          task = std::move(m_work.front());
          m_work.pop_front();
        }
        // an invalid task means we are asked to stop
        if (!task.valid()) return;
        // otherwise run the task, outside of the lock
        task();
      }
    }

    // the mutex, condition variable and deque form a single
    // thread-safe triggered queue of tasks:
    std::mutex m_mutex;
    std::condition_variable m_cond;
    // note that a packaged_task<void()> can store a packaged_task<R()>:
    std::deque< std::packaged_task<void()> > m_work;

    // this holds futures representing the worker threads being done:
    std::vector< std::future<void> > m_finished;
};

#endif