MarlinMT  0.1.0
ThreadPool.h
Go to the documentation of this file.
1 #ifndef MARLINMT_CONCURRENCY_THREADPOOL_h
2 #define MARLINMT_CONCURRENCY_THREADPOOL_h 1
3 
4 // -- std headers
5 #include <functional>
6 #include <thread>
7 #include <vector>
8 #include <atomic>
9 #include <mutex>
10 #include <utility>
11 #include <memory>
12 #include <future>
13 #include <condition_variable>
14 
15 // -- marlinmt headers
16 #include "marlinmt/Exceptions.h"
19 
20 namespace marlinmt {
21 
22  namespace concurrency {
23 
24  template <typename IN, typename OUT>
25  class Worker ;
26 
/// ThreadPool class.
/// Dispatches input data of type IN to a set of Worker<IN,OUT> instances.
/// Each push() hands back a promise/future pair on the OUT type.
/// The class is neither copyable nor movable.
///
/// NOTE(review): the implementation uses `_pool` and `_queue` members that are
/// not visible in this listing (the member index places them at source lines
/// 159 and 161) -- confirm against the original header.
32  template <typename IN, typename OUT>
33  class ThreadPool {
34  public:
/// Container type holding the workers
36  using PoolType = std::vector<std::shared_ptr<Worker<IN,OUT>>> ;
/// Shared pointer on the promise associated with a pushed element
37  using Promise = std::shared_ptr<std::promise<OUT>> ;
/// Future associated with the promise of a pushed element
38  using Future = std::future<OUT> ;
/// Result of push(): the promise and its future
39  using PushResult = std::pair<Promise,Future> ;
40  friend class Worker<IN,OUT> ;
41 
42  public:
/// Policy applied by push() when the queue is full.
/// NOTE(review): a second enumerator (source line 48) is missing from this
/// listing; per the member index it makes push() throw an exception if the
/// queue is full -- confirm its name against the original header.
46  enum class PushPolicy {
/// Block until a slot is free in the queue
47  Blocking,
49  };
50 
51  public:
52  ThreadPool() = default ;
53  ThreadPool(const ThreadPool &) = delete ;
54  ThreadPool(ThreadPool &&) = delete ;
55  ThreadPool& operator=(const ThreadPool &) = delete ;
56  ThreadPool& operator=(ThreadPool &&) = delete ;
57 
/// Destructor. Calls stop(true).
61  ~ThreadPool() ;
62 
/// Add a new worker thread.
/// A WORKER instance is constructed from the arguments and wrapped in a
/// Worker<IN,OUT>. Throws an Exception if the pool is already running.
68  template <typename WORKER, typename ...Args>
69  void addWorker(Args &&...args) ;
70 
/// Start the worker threads. Throws an Exception if already running.
74  void start() ;
75 
/// Get the thread pool size
79  std::size_t size() const ;
80 
/// Get the number of waiting threads
84  std::size_t nWaiting() const ;
85 
/// Get the number of threads currently handling a task
89  std::size_t nRunning() const ;
90 
/// Get the number of free slots in the task queue
94  std::size_t freeSlots() const ;
95 
/// Whether the queue is empty
99  bool isQueueEmpty() const ;
100 
/// Clear the queue
104  void clearQueue() ;
105 
/// Set the maximum queue size
111  void setMaxQueueSize( std::size_t maxQueueSize ) ;
112 
/// Set whether the thread pool accepts data push
118  void setAcceptPush( bool accept ) ;
119 
/// Whether the thread pool accepts data push
123  bool acceptPush() const ;
124 
/// Whether the thread pool is active, meaning that the queue is not empty
/// or at least one worker is not waiting
129  bool active() const ;
130 
/// Stop the thread pool.
/// With clear = true the workers are asked to stop and the queue is cleared
/// first; with clear = false only the "done" flag is raised. In both cases
/// the waiting threads are notified, all workers are joined and the queue
/// and the pool are emptied.
141  void stop( bool clear = true ) ;
142 
/// Push a new task in the task queue.
/// Throws an Exception if the pool is not running, if push is not accepted,
/// or if the queue is full and the policy is not PushPolicy::Blocking.
/// Returns the promise/future pair associated with the pushed element.
151  PushResult push( PushPolicy policy, IN && input ) ;
152 
153  private:
/// The synchronization mutex
155  std::mutex _mutex {} ;
/// The condition variable notified by push() and stop()
157  std::condition_variable _conditionVariable {} ;
/// Flag raised by stop(false) to tell the waiting threads to finish
163  std::atomic<bool> _isDone {false} ;
/// The thread pool stop flag, raised by stop(true)
165  std::atomic<bool> _isStop {false} ;
/// Whether the thread pool is running
167  std::atomic<bool> _isRunning {false} ;
/// Whether the thread pool accepts push action
169  std::atomic<bool> _acceptPush {true} ;
170  };
171 
172  }
173 
174 }
175 
176 // template implementation
178 
179 namespace marlinmt {
180 
181  namespace concurrency {
182 
183  template <typename IN, typename OUT>
185  stop(true) ;
186  }
187 
188  //--------------------------------------------------------------------------
189 
190  template <typename IN, typename OUT>
191  template <typename WORKER, typename ...Args>
192  inline void ThreadPool<IN,OUT>::addWorker(Args &&...args) {
193  if( _isRunning ) {
194  throw Exception( "ThreadPool::addWorker: thread pool is running, can't add a worker!" ) ;
195  }
196  std::unique_ptr<WORKER> impl( new WORKER(args...) ) ;
197  auto worker = std::make_shared<Worker<IN,OUT>>( *this, std::move( impl ) ) ;
198  _pool.push_back( worker ) ;
199  }
200 
201  //--------------------------------------------------------------------------
202 
203  template <typename IN, typename OUT>
205  if( _isRunning ) {
206  throw Exception( "ThreadPool::start: already running!" ) ;
207  }
208  // Start the worker threads
209  for (size_t i=0 ; i<_pool.size() ; i++) {
210  _pool[i]->start() ;
211  }
212  _isRunning = true ;
213  }
214 
215  //--------------------------------------------------------------------------
216 
217  template <typename IN, typename OUT>
218  inline std::size_t ThreadPool<IN,OUT>::size() const {
219  return _pool.size() ;
220  }
221 
222  //--------------------------------------------------------------------------
223 
224  template <typename IN, typename OUT>
225  inline std::size_t ThreadPool<IN,OUT>::nWaiting() const {
226  std::size_t count = 0 ;
227  for( auto &worker : _pool ) {
228  if( worker->waiting() ) {
229  ++count ;
230  }
231  }
232  return count ;
233  }
234 
235  //--------------------------------------------------------------------------
236 
237  template <typename IN, typename OUT>
238  inline std::size_t ThreadPool<IN,OUT>::nRunning() const {
239  return ( _pool.size() - nWaiting() ) ;
240  }
241 
242  //--------------------------------------------------------------------------
243 
244  template <typename IN, typename OUT>
245  inline std::size_t ThreadPool<IN,OUT>::freeSlots() const {
246  return _queue.freeSlots() ;
247  }
248 
249  //--------------------------------------------------------------------------
250 
251  template <typename IN, typename OUT>
252  inline bool ThreadPool<IN,OUT>::isQueueEmpty() const {
253  return _queue.empty() ;
254  }
255 
256  //--------------------------------------------------------------------------
257 
258  template <typename IN, typename OUT>
259  inline void ThreadPool<IN,OUT>::setMaxQueueSize( std::size_t maxQueueSize ) {
260  _queue.setMaxSize( maxQueueSize ) ;
261  }
262 
263  //--------------------------------------------------------------------------
264 
265  template <typename IN, typename OUT>
267  _queue.clear() ;
268  }
269 
270  //--------------------------------------------------------------------------
271 
272  template <typename IN, typename OUT>
273  inline void ThreadPool<IN,OUT>::setAcceptPush( bool accept ) {
274  _acceptPush = accept ;
275  }
276 
277  //--------------------------------------------------------------------------
278 
279  template <typename IN, typename OUT>
280  inline bool ThreadPool<IN,OUT>::acceptPush() const {
281  return _acceptPush.load() ;
282  }
283 
284  //--------------------------------------------------------------------------
285 
286  template <typename IN, typename OUT>
287  inline bool ThreadPool<IN,OUT>::active() const {
288  if( not _queue.empty() ) {
289  return true ;
290  }
291  for( auto &worker : _pool ) {
292  if( not worker->waiting() ) {
293  return true ;
294  }
295  }
296  return false ;
297  }
298 
299  //--------------------------------------------------------------------------
300 
301  template <typename IN, typename OUT>
302  inline void ThreadPool<IN,OUT>::stop( bool clear ) {
303  if ( clear ) {
304  if (_isStop) {
305  return ;
306  }
307  _isStop = true ;
308  for( auto &worker : _pool ) {
309  worker->stop() ;
310  }
311  _queue.clear() ;
312  }
313  else {
314  if (_isDone or _isStop) {
315  return ;
316  }
317  _isDone = true ; // give the waiting threads a command to finish
318  }
319  {
320  std::unique_lock<std::mutex> lock(_mutex);
321  _conditionVariable.notify_all(); // stop all waiting threads
322  }
323  for (auto &worker : _pool) { // wait for the computing threads to finish
324  worker->join() ;
325  }
326  _queue.clear() ;
327  _pool.clear() ;
328  _isRunning = false ;
329  }
330 
331  //--------------------------------------------------------------------------
332 
333  template <typename IN, typename OUT>
334  template <class>
335  inline typename ThreadPool<IN,OUT>::PushResult ThreadPool<IN,OUT>::push(PushPolicy policy, IN && queueData) {
336  if( not _isRunning.load() ) {
337  throw Exception( "ThreadPool::push: pool not running yet!" ) ;
338  }
339  if( not _acceptPush.load() ) {
340  throw Exception( "ThreadPool::push: not allowed to push in pool!" ) ;
341  }
342  QueueElement<IN,OUT> element( std::move(queueData) ) ;
343  PushResult result ;
344  result.first = element.promise() ;
345  result.second = result.first->get_future() ;
346  if(policy == PushPolicy::Blocking) {
347  // this is dirty yet
348  // TODO find a proper implementation ...
349  while( _queue.isFull() ) {
350  std::this_thread::sleep_for( std::chrono::microseconds(10) ) ;
351  }
352  // this is not really safe
353  _queue.push(element) ;
354  }
355  else {
356  if( _queue.isFull() ) {
357  throw Exception( "ThreadPool::push: queue is full!" ) ;
358  }
359  _queue.push(element) ;
360  }
361  std::unique_lock<std::mutex> lock(_mutex) ;
362  _conditionVariable.notify_one() ;
363  return std::move(result) ;
364  }
365 
366  } // end namespace concurrency
367 
368 } // end namespace marlinmt
369 
370 #endif
void setAcceptPush(bool accept)
Set whether the thread pool accepts data push.
Definition: ThreadPool.h:273
std::atomic< bool > _acceptPush
Definition: ThreadPool.h:169
void stop(bool clear=true)
Stop the thread pool.
Definition: ThreadPool.h:302
ThreadPool & operator=(const ThreadPool &)=delete
std::size_t setMaxSize(std::size_t maxsize)
Set the maximum queue size.
Definition: Queue.h:93
std::size_t freeSlots() const
Get the number of free slots in the task queue.
Definition: ThreadPool.h:245
void setMaxQueueSize(std::size_t maxQueueSize)
Set the maximum queue size.
Definition: ThreadPool.h:259
std::atomic< bool > _isDone
Runtime flag raised by stop(false) to tell the waiting threads to finish.
Definition: ThreadPool.h:163
void start()
Start the worker threads.
Definition: ThreadPool.h:204
void clear()
Clear the queue.
Definition: Queue.h:110
Block until a slot is free in the queue.
constexpr unsigned long long value(const Flag_t &flag)
Definition: Flags.h:106
bool isFull() const
Check whether the queue has reached the maximum allowed size.
Definition: Queue.h:102
bool isQueueEmpty() const
Whether the queue is empty.
Definition: ThreadPool.h:252
Throw an exception if the queue is full.
std::size_t freeSlots() const
Get the number of free slots in the queue.
Definition: Queue.h:120
PushResult push(PushPolicy policy, IN &&input)
Push a new task in the task queue.
Definition: ThreadPool.h:335
std::atomic< bool > _isRunning
Whether the thread pool is running.
Definition: ThreadPool.h:167
std::size_t nWaiting() const
Get the number of waiting threads.
Definition: ThreadPool.h:225
std::size_t size() const
Get the thread pool size.
Definition: ThreadPool.h:218
std::atomic< bool > _isStop
The thread pool stop flag.
Definition: ThreadPool.h:165
std::condition_variable _conditionVariable
The condition variable used to notify the waiting worker threads.
Definition: ThreadPool.h:157
std::shared_ptr< std::promise< OutputType > > Promise
Definition: ThreadPool.h:37
ThreadPool class The template parameter T is the type of data to enqueue and process in worker thread...
Definition: ThreadPool.h:33
std::vector< std::shared_ptr< Worker< InputType, OutputType > >> PoolType
Definition: ThreadPool.h:36
std::mutex _mutex
The synchronization mutex.
Definition: ThreadPool.h:155
bool push(T &value)
Push a value to the queue.
Definition: Queue.h:45
bool active() const
Whether the thread pool is active, meaning that the queue is not empty or at least one worker is acti...
Definition: ThreadPool.h:287
QueueType _queue
The input element queue.
Definition: ThreadPool.h:161
void addWorker(Args &&...args)
Add a new worker thread.
Definition: ThreadPool.h:192
std::shared_ptr< std::promise< OUT > > promise() const
Get the output promise.
Definition: QueueElement.h:56
std::size_t nRunning() const
Get the number of threads currently handling a task.
Definition: ThreadPool.h:238
PoolType _pool
The actual thread pool.
Definition: ThreadPool.h:159
bool empty() const
Whether the queue is empty.
Definition: Queue.h:73
bool acceptPush() const
Whether the thread pool accepts data push.
Definition: ThreadPool.h:280
void clearQueue()
Clear the queue.
Definition: ThreadPool.h:266
Exception class.
Definition: Exceptions.h:60
QueueElement class A template queue element used in the thread pool.
Definition: QueueElement.h:20