1 #ifndef MARLINMT_CONCURRENCY_THREADPOOL_h 2 #define MARLINMT_CONCURRENCY_THREADPOOL_h 1 13 #include <condition_variable> 22 namespace concurrency {
24 template <
typename IN,
typename OUT>
32 template <
typename IN,
typename OUT>
36 using PoolType = std::vector<std::shared_ptr<Worker<IN,OUT>>> ;
37 using Promise = std::shared_ptr<std::promise<OUT>> ;
68 template <
typename WORKER,
typename ...Args>
79 std::size_t
size()
const ;
141 void stop(
bool clear =
true ) ;
181 namespace concurrency {
183 template <
typename IN,
typename OUT>
190 template <
typename IN,
typename OUT>
191 template <
typename WORKER,
typename ...Args>
194 throw Exception(
"ThreadPool::addWorker: thread pool is running, can't add a worker!" ) ;
196 std::unique_ptr<WORKER> impl(
new WORKER(args...) ) ;
197 auto worker = std::make_shared<Worker<IN,OUT>>( *
this, std::move( impl ) ) ;
198 _pool.push_back( worker ) ;
203 template <
typename IN,
typename OUT>
206 throw Exception(
"ThreadPool::start: already running!" ) ;
209 for (
size_t i=0 ; i<
_pool.size() ; i++) {
217 template <
typename IN,
typename OUT>
219 return _pool.size() ;
224 template <
typename IN,
typename OUT>
226 std::size_t count = 0 ;
227 for(
auto &worker :
_pool ) {
228 if( worker->waiting() ) {
237 template <
typename IN,
typename OUT>
244 template <
typename IN,
typename OUT>
251 template <
typename IN,
typename OUT>
258 template <
typename IN,
typename OUT>
265 template <
typename IN,
typename OUT>
272 template <
typename IN,
typename OUT>
279 template <
typename IN,
typename OUT>
286 template <
typename IN,
typename OUT>
291 for(
auto &worker :
_pool ) {
292 if( not worker->waiting() ) {
301 template <
typename IN,
typename OUT>
308 for(
auto &worker :
_pool ) {
320 std::unique_lock<std::mutex> lock(
_mutex);
323 for (
auto &worker :
_pool) {
333 template <
typename IN,
typename OUT>
337 throw Exception(
"ThreadPool::push: pool not running yet!" ) ;
340 throw Exception(
"ThreadPool::push: not allowed to push in pool!" ) ;
344 result.first = element.
promise() ;
345 result.second = result.first->get_future() ;
350 std::this_thread::sleep_for( std::chrono::microseconds(10) ) ;
357 throw Exception(
"ThreadPool::push: queue is full!" ) ;
361 std::unique_lock<std::mutex> lock(
_mutex) ;
363 return std::move(result) ;
void setAcceptPush(bool accept)
Set whether the thread pool accepts data push.
std::atomic< bool > _acceptPush
void stop(bool clear=true)
Stop the thread pool.
PushPolicy
PushPolicy enumerator.
ThreadPool & operator=(const ThreadPool &)=delete
std::size_t setMaxSize(std::size_t maxsize)
Set the maximum queue size.
std::size_t freeSlots() const
Get the number of free slots in the task queue.
void setMaxQueueSize(std::size_t maxQueueSize)
Set the maximum queue size.
std::atomic< bool > _isDone
The thread pool stop flag.
void start()
Start the worker threads.
void clear()
Clear the queue.
Block until a slot is free in the queue.
constexpr unsigned long long value(const Flag_t &flag)
bool isFull() const
Check whether the queue has reached the maximum allowed size.
std::pair< Promise, Future > PushResult
bool isQueueEmpty() const
Whether the queue is empty.
Throw an exception if the queue is full.
std::size_t freeSlots() const
Get the number of free slots in the queue.
PushResult push(PushPolicy policy, IN &&input)
Push a new task in the task queue.
std::atomic< bool > _isRunning
Whether the thread pool is running.
std::size_t nWaiting() const
Get the number of waiting threads.
std::future< OutputType > Future
std::size_t size() const
Get the thread pool size.
std::atomic< bool > _isStop
The thread pool stop flag.
std::condition_variable _conditionVariable
The synchronization condition variable.
std::shared_ptr< std::promise< OutputType > > Promise
ThreadPool class. The template parameter T is the type of data to enqueue and process in worker thread...
std::vector< std::shared_ptr< Worker< InputType, OutputType > >> PoolType
std::mutex _mutex
The synchronization mutex.
bool push(T &value)
Push a value to the queue.
bool active() const
Whether the thread pool is active, meaning that the queue is not empty or at least one worker is acti...
QueueType _queue
The input element queue.
void addWorker(Args &&...args)
Add a new worker thread.
std::shared_ptr< std::promise< OUT > > promise() const
Get the output promise.
std::size_t nRunning() const
Get the number of threads currently handling a task.
PoolType _pool
The actual thread pool.
bool empty() const
Whether the queue is empty.
bool acceptPush() const
Whether the thread pool accepts data push.
void clearQueue()
Clear the queue.
QueueElement class. A template queue element used in the thread pool.