MarlinMT  0.1.0
marlinmt::concurrency::PEPScheduler Class Reference

PEPScheduler class Parallel Event Processing Scheduler. More...

#include <PEPScheduler.h>

Inherits marlinmt::IScheduler.

Public Types

using ConditionsMap = std::map< std::string, std::string >
 
using InputType = std::shared_ptr< EventStore >
 
using OutputType = WorkerOutput
 
using WorkerPool = ThreadPool< InputType, OutputType >
 
using ProcessorSequence = std::shared_ptr< SuperSequence >
 
using PushResultList = std::vector< WorkerPool::PushResult >
 
using EventList = std::vector< std::shared_ptr< EventStore > >
 
using Clock = std::chrono::steady_clock
 
using TimePoint = std::chrono::steady_clock::time_point
 
- Public Types inherited from marlinmt::Component
using LoggerPtr = Logging::Logger
 
- Public Types inherited from marlinmt::Configurable
using ParameterMap = std::map< std::string, std::shared_ptr< ParameterImpl > >
 
using iterator = ParameterMap::iterator
 
using const_iterator = ParameterMap::const_iterator
 

Public Member Functions

 PEPScheduler ()
 Constructor. More...
 
void initialize () override
 Initialize the scheduler. More...
 
void end () override
 Terminate the scheduler activities. Cleanup memory, etc ... More...
 
void processRunHeader (std::shared_ptr< RunHeader > rhdr) override
 Process a run header. More...
 
void pushEvent (std::shared_ptr< EventStore > event) override
 Push a new event to the scheduler for processing. More...
 
void popFinishedEvents (std::vector< std::shared_ptr< EventStore >> &events) override
 Retrieve finished events from the scheduler. More...
 
std::size_t freeSlots () const override
 Get the number of free event slots. More...
 
- Public Member Functions inherited from marlinmt::IScheduler
 IScheduler ()
 Constructor. More...
 
virtual ~IScheduler ()=default
 
- Public Member Functions inherited from marlinmt::Component
 Component ()=delete
 No default constructor. More...
 
 Component (const Component &)=delete
 No copy or assignment. More...
 
Component & operator= (const Component &)=delete
 
virtual ~Component ()=default
 Default destructor. More...
 
 Component (const std::string &type)
 Constructor with component type. More...
 
const std::string & type () const
 Get the component type. More...
 
const std::string & name () const
 Get the component name. More...
 
void setName (const std::string &n)
 Set the component name. More...
 
const std::string & description () const
 Get the component description. More...
 
void setDescription (const std::string &desc)
 Set the component description. More...
 
const Application & application () const
 Get the application in which the component is registered. More...
 
Application & application ()
 Get the application in which the component is registered. More...
 
template<class T >
Logging::StreamType log () const
 Log a message with specific log level. More...
 
Logging::StreamType debug () const
 Shortcut for log<DEBUG>() More...
 
Logging::StreamType message () const
 Shortcut for log<MESSAGE>() More...
 
Logging::StreamType warning () const
 Shortcut for log<WARNING>() More...
 
Logging::StreamType error () const
 Shortcut for log<ERROR>() More...
 
void setVerbosity (const std::string &level)
 Set the verbosity level. More...
 
const std::string & verbosity () const
 Get the verbosity level. More...
 
bool isInitialized () const
 Whether the component has been initialized. More...
 
void setup (Application *app)
 Setup the component. More...
 
void printParameters () const
 Print the component parameters. More...
 
template<class T >
void printParameters () const
 Print the component parameters at specific verbosity. More...
 
void setParameters (const ConfigSection &section, bool throwIfNotFound=false)
 Set the parameters from the configuration section. More...
 
void getParameters (ConfigSection &section, const std::set< std::string > &exclude={}) const
 Get the parameters from the configurable object and populate the config section with them. More...
 
- Public Member Functions inherited from marlinmt::Configurable
 Configurable ()=default
 
virtual ~Configurable ()=default
 
template<typename T >
std::shared_ptr< ParameterImpl > addParameter (EParameterType paramType, const std::string &name, const std::string &desc, std::shared_ptr< T > value)
 Add a parameter. More...
 
template<typename T >
std::shared_ptr< ParameterImpl > addParameter (EParameterType paramType, const std::string &name, const std::string &desc, std::shared_ptr< T > value, T defVal)
 Add a parameter. More...
 
template<typename T >
T parameter (const std::string &name) const
 Get a parameter value. More...
 
template<typename T >
T parameter (const std::string &name, const T &fallback) const
 Get a parameter value. More...
 
void checkParameter (const std::string &name) const
 Check if the parameter has been registered. More...
 
bool exists (const std::string &name) const
 Return true if the parameter has been registered. More...
 
bool isSet (const std::string &name) const
 Returns true if the parameter exists and is set, false otherwise. More...
 
void clear ()
 Remove all parameters. More...
 
void unset ()
 Unset all registered parameters. More...
 
iterator begin ()
 
const_iterator begin () const
 
iterator end ()
 
const_iterator end () const
 

Private Member Functions

void preConfigure ()
 
void configureProcessors ()
 
void configurePool ()
 

Private Attributes

WorkerPool _pool {}
 The worker thread pool. More...
 
ProcessorSequence _superSequence {nullptr}
 The processor super sequence. More...
 
PushResultList _pushResults {}
 The list of worker output promises. More...
 
clock::time_point _startTime {}
 The start time. More...
 
clock::time_point _endTime {}
 The end time. More...
 
clock::duration_rep _runHeaderTime {0}
 The total time spent on processing run headers. More...
 
clock::duration_rep _lockingTime {0}
 The total time spent on locking on thread pool queue access. More...
 
clock::duration_rep _popTime {0}
 The total time spent on popping events from the output event pool. More...
 
UIntParameter _queueSize {*this, "EventQueueSize", "The input event queue size (default 2*nthreads)"}
 The scheduler event queue size. More...
 

Additional Inherited Members

- Protected Attributes inherited from marlinmt::Component
std::string _type {}
 The component type. More...
 
std::string _name {}
 The component name. More...
 
std::string _description {"No description"}
 The component description. More...
 
Application * _application {nullptr}
 The application in which the component has been registered. More...
 
LoggerPtr _logger {nullptr}
 The logger instance. More...
 
StringParameter _verbosity { *this, "Verbosity", "The component verbosity level", "MESSAGE" }
 The verbosity level of the logger (parameter) More...
 
- Protected Attributes inherited from marlinmt::Configurable
ParameterMap _parameters {}
 The parameter map. More...
 

Detailed Description

PEPScheduler class Parallel Event Processing Scheduler.

Implements the scheduling of parallel inter-event processing.

A set of N worker threads is allocated at startup within a thread pool. Every time a new event is pushed to the scheduler, the event is queued in the thread pool for further processing. Note that this operation can fail if the thread pool queue is full. Use freeSlots() to know how many slots are free in the thread pool queue and avoid unexpected exceptions.

Definition at line 45 of file PEPScheduler.h.

Member Typedef Documentation

◆ Clock

using marlinmt::concurrency::PEPScheduler::Clock = std::chrono::steady_clock

Definition at line 54 of file PEPScheduler.h.

◆ ConditionsMap

using marlinmt::concurrency::PEPScheduler::ConditionsMap = std::map<std::string, std::string>

Definition at line 47 of file PEPScheduler.h.

◆ EventList

using marlinmt::concurrency::PEPScheduler::EventList = std::vector<std::shared_ptr<EventStore> >

Definition at line 53 of file PEPScheduler.h.

◆ InputType

Definition at line 48 of file PEPScheduler.h.

◆ OutputType

◆ ProcessorSequence

Definition at line 51 of file PEPScheduler.h.

◆ PushResultList

◆ TimePoint

using marlinmt::concurrency::PEPScheduler::TimePoint = std::chrono::steady_clock::time_point

Definition at line 55 of file PEPScheduler.h.

◆ WorkerPool

Constructor & Destructor Documentation

◆ PEPScheduler()

marlinmt::concurrency::PEPScheduler::PEPScheduler ( )

Constructor.

Definition at line 76 of file PEPScheduler.cc.

References marlinmt::Component::setName().

Member Function Documentation

◆ configurePool()

◆ configureProcessors()

void marlinmt::concurrency::PEPScheduler::configureProcessors ( )
private

◆ end()

◆ freeSlots()

std::size_t marlinmt::concurrency::PEPScheduler::freeSlots ( ) const
overridevirtual

Get the number of free event slots.

Implements marlinmt::IScheduler.

Definition at line 243 of file PEPScheduler.cc.

References _pool, and marlinmt::concurrency::ThreadPool< IN, OUT >::freeSlots().

◆ initialize()

void marlinmt::concurrency::PEPScheduler::initialize ( )
overridevirtual

Initialize the scheduler.

Read the config section from the configuration

Reimplemented from marlinmt::IScheduler.

Definition at line 83 of file PEPScheduler.cc.

References _startTime, configurePool(), configureProcessors(), marlinmt::IScheduler::initialize(), marlinmt::clock::now(), and preConfigure().

◆ popFinishedEvents()

void marlinmt::concurrency::PEPScheduler::popFinishedEvents ( std::vector< std::shared_ptr< EventStore >> &  events)
overridevirtual

Retrieve finished events from the scheduler.

Parameters
events: the list of events to retrieve

Implements marlinmt::IScheduler.

Definition at line 220 of file PEPScheduler.cc.

References _popTime, _pushResults, marlinmt::Component::message(), and marlinmt::clock::now().

Referenced by end().

◆ preConfigure()

void marlinmt::concurrency::PEPScheduler::preConfigure ( )
private

◆ processRunHeader()

void marlinmt::concurrency::PEPScheduler::processRunHeader ( std::shared_ptr< RunHeader > rhdr)
overridevirtual

◆ pushEvent()

void marlinmt::concurrency::PEPScheduler::pushEvent ( std::shared_ptr< EventStore > event)
overridevirtual

Push a new event to the scheduler for processing.

Parameters
event: the event to push

Implements marlinmt::IScheduler.

Definition at line 211 of file PEPScheduler.cc.

References _lockingTime, _pool, _pushResults, marlinmt::clock::now(), and marlinmt::concurrency::ThreadPool< IN, OUT >::push().

Member Data Documentation

◆ _endTime

clock::time_point marlinmt::concurrency::PEPScheduler::_endTime {}
private

The end time.

Definition at line 84 of file PEPScheduler.h.

Referenced by end().

◆ _lockingTime

clock::duration_rep marlinmt::concurrency::PEPScheduler::_lockingTime {0}
private

The total time spent on locking on thread pool queue access.

Definition at line 88 of file PEPScheduler.h.

Referenced by end(), and pushEvent().

◆ _pool

WorkerPool marlinmt::concurrency::PEPScheduler::_pool {}
private

The worker thread pool.

Definition at line 76 of file PEPScheduler.h.

Referenced by configurePool(), end(), freeSlots(), processRunHeader(), and pushEvent().

◆ _popTime

clock::duration_rep marlinmt::concurrency::PEPScheduler::_popTime {0}
private

The total time spent on popping events from the output event pool.

Definition at line 90 of file PEPScheduler.h.

Referenced by end(), and popFinishedEvents().

◆ _pushResults

PushResultList marlinmt::concurrency::PEPScheduler::_pushResults {}
private

The list of worker output promises.

Definition at line 80 of file PEPScheduler.h.

Referenced by end(), popFinishedEvents(), and pushEvent().

◆ _queueSize

UIntParameter marlinmt::concurrency::PEPScheduler::_queueSize {*this, "EventQueueSize", "The input event queue size (default 2*nthreads)"}
private

The scheduler event queue size.

Definition at line 92 of file PEPScheduler.h.

Referenced by configurePool().

◆ _runHeaderTime

clock::duration_rep marlinmt::concurrency::PEPScheduler::_runHeaderTime {0}
private

The total time spent on processing run headers.

Definition at line 86 of file PEPScheduler.h.

Referenced by end(), and processRunHeader().

◆ _startTime

clock::time_point marlinmt::concurrency::PEPScheduler::_startTime {}
private

The start time.

Definition at line 82 of file PEPScheduler.h.

Referenced by end(), and initialize().

◆ _superSequence

ProcessorSequence marlinmt::concurrency::PEPScheduler::_superSequence {nullptr}
private

The processor super sequence.

Definition at line 78 of file PEPScheduler.h.

Referenced by configurePool(), configureProcessors(), end(), preConfigure(), and processRunHeader().


The documentation for this class was generated from the following files: