MarlinMT  0.1.0
Worker.h
Go to the documentation of this file.
1 #ifndef MARLINMT_CONCURRENCY_WORKER_h
2 #define MARLINMT_CONCURRENCY_WORKER_h 1
3 
4 // -- std headers
5 #include <functional>
6 #include <thread>
7 #include <vector>
8 #include <atomic>
9 #include <mutex>
10 #include <utility>
11 #include <memory>
12 #include <future>
13 #include <queue>
14 #include <condition_variable>
15 
16 // -- marlinmt headers
17 #include "marlinmt/Exceptions.h"
19 
20 namespace marlinmt {
21 
22  namespace concurrency {
23 
// Forward declarations: the worker classes below only need these names
// (friend declarations, references); the definitions live elsewhere.
template <typename IN, typename OUT> class ThreadPool ;
template <typename IN, typename OUT> class Worker ;
28 
36  template <typename IN, typename OUT>
37  class WorkerBase {
38  friend class Worker<IN,OUT> ;
39  public:
40  using Input = IN ;
41  using Output = OUT ;
42 
43  public:
44  virtual ~WorkerBase() = default ;
45 
51  virtual OUT process( IN && data ) = 0 ;
52 
53  protected:
59  void processElement( QueueElement<IN,OUT> &element ) ;
60  };
61 
62  //--------------------------------------------------------------------------
63  //--------------------------------------------------------------------------
64 
65  template <typename OUT>
66  class WorkerBase<void,OUT> {
67  friend class Worker<void,OUT> ;
68  public:
69  virtual ~WorkerBase() = default ;
70  virtual OUT process() = 0 ;
71  protected:
72  void processElement( QueueElement<void,OUT> &element ) ;
73  };
74 
75  //--------------------------------------------------------------------------
76 
77  template <typename IN>
78  class WorkerBase<IN,void> {
79  friend class Worker<IN,void> ;
80  public:
81  virtual ~WorkerBase() = default ;
82  virtual void process( IN && data ) = 0 ;
83  protected:
84  void processElement( QueueElement<IN,void> &element ) ;
85  };
86 
87  //--------------------------------------------------------------------------
88 
89  template <>
90  class WorkerBase<void,void> {
91  friend class Worker<void,void> ;
92  public:
93  virtual ~WorkerBase() = default ;
94  virtual void process() = 0 ;
95  protected:
96  void processElement( QueueElement<void,void> &element ) ;
97  };
98 
99  //--------------------------------------------------------------------------
100 
101  template <typename IN, typename OUT>
103  element.setValue( std::move( this->process( std::move(element.takeInput()) ) ) ) ;
104  }
105 
106  template <typename IN>
108  this->process( std::move(element.takeInput()) ) ;
109  element.setValue() ;
110  }
111 
112  template <typename OUT>
114  element.setValue( std::move( this->process() ) ) ;
115  }
116 
117  // template <>
119  this->process() ;
120  element.setValue() ;
121  }
122 
123  //--------------------------------------------------------------------------
124  //--------------------------------------------------------------------------
125 
/**
 *  @brief  Worker class.
 *  Runs run() in its own std::thread: queue elements are popped from the
 *  parent thread pool and handed to the worker implementation.
 *
 *  NOTE(review): this listing does not show the declarations of `Pool` and
 *  `Impl`, the template header of the constructor (`IMPL`), nor the
 *  `_threadPool` member used by the implementation -- confirm against the
 *  actual header.
 */
129  template <typename IN, typename OUT>
130  class Worker {
131  public:
132  using Input = IN ;
133  using Output = OUT ;
136 
137  public:
     // Not default constructible, not copyable, not movable
138  Worker() = delete ;
139  Worker(const Worker&) = delete ;
140  Worker(Worker &&) = delete ;
141  Worker &operator=(const Worker&) = delete ;
142  Worker &operator=(Worker&&) = delete ;
143 
     /// Constructor
     /// @param pool the parent thread pool
     /// @param impl the worker implementation (ownership is transferred to the worker)
151  Worker( Pool &pool, std::unique_ptr<IMPL> impl ) ;
152 
     /// Start the worker thread
156  void start() ;
157 
     /// The method executing in the worker thread
161  void run() ;
162 
     /// Switch ON the stop flag, requesting the worker thread to stop
167  void stop() ;
168 
     /// Whether a thread is attached to this worker, i.e. started and not joined yet
172  bool running() const ;
173 
     /// Whether the worker is waiting
177  bool waiting() const ;
178 
     /// Join the worker thread (does nothing if the thread is not joinable)
182  void join() ;
183 
184  private:
     /// The worker thread; not joinable until start() is called
188  std::thread _thread {} ;
     /// The stop flag: set by stop(), checked by run()
190  std::atomic<bool> _stopFlag {false} ;
     /// Whether the worker thread is waiting for data
192  std::atomic<bool> _waitingFlag {false} ;
     /// The worker implementation, processing each queue element
194  std::unique_ptr<Impl> _impl {nullptr} ;
195  };
196 
197  }
198 
199 }
200 
201 // template implementation
203 
204 namespace marlinmt {
205 
206  namespace concurrency {
207 
208  template <typename IN, typename OUT>
209  template <typename IMPL, class>
210  inline Worker<IN,OUT>::Worker( Pool & pool, std::unique_ptr<IMPL> impl ) :
211  _threadPool(pool),
212  _impl(std::move(impl)) {
213  /* nop */
214  }
215 
216  //--------------------------------------------------------------------------
217 
218  template <typename IN, typename OUT>
219  inline void Worker<IN,OUT>::start() {
220  _thread = std::thread( &Worker::run, this ) ;
221  }
222 
223  //--------------------------------------------------------------------------
224 
225  template <typename IN, typename OUT>
226  inline void Worker<IN,OUT>::run() {
227  QueueElement<IN,OUT> element ;
228  bool isPop = _threadPool._queue.pop( element ) ;
229  while (true) {
230  // if there is anything in the queue
231  while (isPop) {
232  _impl->processElement( element ) ;
233  // the thread is wanted to stop, return even if the queue is not empty yet
234  if (_stopFlag.load())
235  return;
236  else
237  isPop = _threadPool._queue.pop( element ) ;
238  }
239  // the queue is empty here, wait for the next command
240  std::unique_lock<std::mutex> lock(_threadPool._mutex);
241  _waitingFlag = true ;
242  _threadPool._conditionVariable.wait(lock, [this, &element, &isPop](){
243  isPop = _threadPool._queue.pop( element ) ;
244  return isPop || _threadPool._isDone || _stopFlag ;
245  }) ;
246  _waitingFlag = false ;
247  // if the queue is empty and this->isDone == true or *flag then return
248  if ( not isPop ) {
249  return ;
250  }
251  }
252  }
253 
254  //--------------------------------------------------------------------------
255 
256  template <typename IN, typename OUT>
257  inline void Worker<IN,OUT>::stop() {
258  _stopFlag = true ;
259  }
260 
261  //--------------------------------------------------------------------------
262 
263  template <typename IN, typename OUT>
264  inline bool Worker<IN,OUT>::running() const {
265  return ( _thread.get_id() != std::thread::id() ) ;
266  }
267 
268  //--------------------------------------------------------------------------
269 
270  template <typename IN, typename OUT>
271  inline bool Worker<IN,OUT>::waiting() const {
272  return _waitingFlag.load() ;
273  }
274 
275  //--------------------------------------------------------------------------
276 
277  template <typename IN, typename OUT>
278  inline void Worker<IN,OUT>::join() {
279  if( _thread.joinable() ) {
280  _thread.join() ;
281  }
282  }
283 
284  } // end namespace concurrency
285 
286 } // end namespace marlinmt
287 
288 #endif
std::unique_ptr< Impl > _impl
The worker implementation.
Definition: Worker.h:194
virtual OUT process(IN &&data)=0
Process a single queued data item taken from the thread pool.
std::atomic< bool > _stopFlag
The stop flag.
Definition: Worker.h:190
void setValue(OUT &&output)
Set the value to be retrieved in the future variable.
Definition: QueueElement.h:65
bool running() const
Whether the worker thread is currently running.
Definition: Worker.h:264
std::atomic< bool > _isDone
The thread pool stop flag.
Definition: ThreadPool.h:163
Definition: EntryData.h:93
constexpr unsigned long long value(const Flag_t &flag)
Definition: Flags.h:106
void processElement(QueueElement< IN, OUT > &element)
Process queued element from the thread pool.
Definition: Worker.h:102
WorkerBase class Base class to implement processing of task data (so called queued-element) pushed in...
Definition: Worker.h:37
void run()
The method executing in the worker thread.
Definition: Worker.h:226
std::thread _thread
The worker thread.
Definition: Worker.h:188
Pool & _threadPool
The parent thread pool.
Definition: Worker.h:186
std::condition_variable _conditionVariable
The condition variable on which worker threads wait for new queue elements.
Definition: ThreadPool.h:157
ThreadPool class The template parameter T is the type of data to enqueue and process in worker thread...
Definition: ThreadPool.h:33
std::mutex _mutex
The synchronization mutex.
Definition: ThreadPool.h:155
bool waiting() const
Whether the worker is waiting.
Definition: Worker.h:271
void start()
Start the worker thread.
Definition: Worker.h:219
QueueType _queue
The queue of elements to be processed by the worker threads.
Definition: ThreadPool.h:161
void stop()
Switch ON the stop flag, requesting the worker thread to stop.
Definition: Worker.h:257
bool pop(T &value)
Pop and get the front element in the queue.
Definition: Queue.h:60
WorkerOutput struct Stores the output of a processor sequence call.
Definition: PEPScheduler.h:24
IN takeInput()
Take the input data.
Definition: QueueElement.h:73
std::atomic< bool > _waitingFlag
Whether the worker thread is waiting for data.
Definition: Worker.h:192
QueueElement class A template queue element used in the thread pool.
Definition: QueueElement.h:20
void join()
Join the worker thread.
Definition: Worker.h:278