MarlinMT
0.1.0
|
PEPScheduler class: Parallel Event Processing Scheduler. More...
#include <PEPScheduler.h>
Inherits marlinmt::IScheduler.
Public Types | |
using | ConditionsMap = std::map< std::string, std::string > |
using | InputType = std::shared_ptr< EventStore > |
using | OutputType = WorkerOutput |
using | WorkerPool = ThreadPool< InputType, OutputType > |
using | ProcessorSequence = std::shared_ptr< SuperSequence > |
using | PushResultList = std::vector< WorkerPool::PushResult > |
using | EventList = std::vector< std::shared_ptr< EventStore > > |
using | Clock = std::chrono::steady_clock |
using | TimePoint = std::chrono::steady_clock::time_point |
Public Types inherited from marlinmt::Component | |
using | LoggerPtr = Logging::Logger |
Public Types inherited from marlinmt::Configurable | |
using | ParameterMap = std::map< std::string, std::shared_ptr< ParameterImpl > > |
using | iterator = ParameterMap::iterator |
using | const_iterator = ParameterMap::const_iterator |
Public Member Functions | |
PEPScheduler () | |
Constructor. More... | |
void | initialize () override |
Initialize the scheduler. More... | |
void | end () override |
Terminate the scheduler activities: clean up memory, etc. More... | |
void | processRunHeader (std::shared_ptr< RunHeader > rhdr) override |
Process a run header. More... | |
void | pushEvent (std::shared_ptr< EventStore > event) override |
Push a new event to the scheduler for processing. More... | |
void | popFinishedEvents (std::vector< std::shared_ptr< EventStore >> &events) override |
Retrieve finished events from the scheduler. More... | |
std::size_t | freeSlots () const override |
Get the number of free event slots. More... | |
Public Member Functions inherited from marlinmt::IScheduler | |
IScheduler () | |
Constructor. More... | |
virtual | ~IScheduler ()=default |
Public Member Functions inherited from marlinmt::Component | |
Component ()=delete | |
No default constructor. More... | |
Component (const Component &)=delete | |
No copy or assignment. More... | |
Component & | operator= (const Component &)=delete |
virtual | ~Component ()=default |
Default destructor. More... | |
Component (const std::string &type) | |
Constructor with component type. More... | |
const std::string & | type () const |
Get the component type. More... | |
const std::string & | name () const |
Get the component name. More... | |
void | setName (const std::string &n) |
Set the component name. More... | |
const std::string & | description () const |
Get the component description. More... | |
void | setDescription (const std::string &desc) |
Set the component description. More... | |
const Application & | application () const |
Get the application in which the component is registered. More... | |
Application & | application () |
Get the application in which the component is registered. More... | |
template<class T > | |
Logging::StreamType | log () const |
Log a message with specific log level. More... | |
Logging::StreamType | debug () const |
Shortcut for log<DEBUG>() More... | |
Logging::StreamType | message () const |
Shortcut for log<MESSAGE>() More... | |
Logging::StreamType | warning () const |
Shortcut for log<WARNING>() More... | |
Logging::StreamType | error () const |
Shortcut for log<ERROR>() More... | |
void | setVerbosity (const std::string &level) |
Set the verbosity level. More... | |
const std::string & | verbosity () const |
Get the verbosity level. More... | |
bool | isInitialized () const |
Whether the component has been initialized. More... | |
void | setup (Application *app) |
Setup the component. More... | |
void | printParameters () const |
Print the component parameters. More... | |
template<class T > | |
void | printParameters () const |
Print the component parameters at specific verbosity. More... | |
void | setParameters (const ConfigSection &section, bool throwIfNotFound=false) |
Set the parameters from the configuration section. More... | |
void | getParameters (ConfigSection &section, const std::set< std::string > &exclude={}) const |
Get the parameters from the configurable object and populate the config section with them. More... | |
Public Member Functions inherited from marlinmt::Configurable | |
Configurable ()=default | |
virtual | ~Configurable ()=default |
template<typename T > | |
std::shared_ptr< ParameterImpl > | addParameter (EParameterType paramType, const std::string &name, const std::string &desc, std::shared_ptr< T > value) |
Add a parameter. More... | |
template<typename T > | |
std::shared_ptr< ParameterImpl > | addParameter (EParameterType paramType, const std::string &name, const std::string &desc, std::shared_ptr< T > value, T defVal) |
Add a parameter. More... | |
template<typename T > | |
T | parameter (const std::string &name) const |
Get a parameter value. More... | |
template<typename T > | |
T | parameter (const std::string &name, const T &fallback) const |
Get a parameter value. More... | |
void | checkParameter (const std::string &name) const |
Check if the parameter has been registered. More... | |
bool | exists (const std::string &name) const |
Return true if the parameter has been registered. More... | |
bool | isSet (const std::string &name) const |
Returns true if the parameter exists and is set, false otherwise. More... | |
void | clear () |
Remove all parameters. More... | |
void | unset () |
Unset all registered parameters. More... | |
iterator | begin () |
const_iterator | begin () const |
iterator | end () |
const_iterator | end () const |
Private Member Functions | |
void | preConfigure () |
void | configureProcessors () |
void | configurePool () |
Private Attributes | |
WorkerPool | _pool {} |
The worker thread pool. More... |
ProcessorSequence | _superSequence {nullptr} |
The processor super sequence. More... |
PushResultList | _pushResults {} |
The list of worker output promises. More... |
clock::time_point | _startTime {} |
The start time. More... |
clock::time_point | _endTime {} |
The end time. More... |
clock::duration_rep | _runHeaderTime {0} |
The total time spent on processing run headers. More... |
clock::duration_rep | _lockingTime {0} |
The total time spent on locking on thread pool queue access. More... |
clock::duration_rep | _popTime {0} |
The total time spent on popping events from the output event pool. More... |
UIntParameter | _queueSize {*this, "EventQueueSize", "The input event queue size (default 2*nthreads)"} |
The scheduler event queue size. More... | |
Additional Inherited Members | |
Protected Attributes inherited from marlinmt::Component | |
std::string | _type {} |
The component type. More... | |
std::string | _name {} |
The component name. More... | |
std::string | _description {"No description"} |
The component description. More... | |
Application * | _application {nullptr} |
The application in which the component has been registered. More... | |
LoggerPtr | _logger {nullptr} |
The logger instance. More... | |
StringParameter | _verbosity { *this, "Verbosity", "The component verbosity level", "MESSAGE" } |
The verbosity level of the logger (parameter) More... | |
Protected Attributes inherited from marlinmt::Configurable | |
ParameterMap | _parameters {} |
The parameter map. More... | |
PEPScheduler class: Parallel Event Processing Scheduler.
Implements the scheduling of parallel inter-event processing.
A set of N worker threads is allocated at startup within a thread pool. Every time a new event is pushed to the scheduler, the event is queued in the thread pool for further processing. Note that this operation can fail if the thread pool queue is full. Use freeSlots() to find out how many slots are free in the thread pool queue and avoid unexpected exceptions.
Definition at line 45 of file PEPScheduler.h.
using marlinmt::concurrency::PEPScheduler::Clock = std::chrono::steady_clock |
Definition at line 54 of file PEPScheduler.h.
using marlinmt::concurrency::PEPScheduler::ConditionsMap = std::map<std::string, std::string> |
Definition at line 47 of file PEPScheduler.h.
using marlinmt::concurrency::PEPScheduler::EventList = std::vector<std::shared_ptr<EventStore> > |
Definition at line 53 of file PEPScheduler.h.
using marlinmt::concurrency::PEPScheduler::InputType = std::shared_ptr<EventStore> |
Definition at line 48 of file PEPScheduler.h.
using marlinmt::concurrency::PEPScheduler::OutputType = WorkerOutput |
Definition at line 49 of file PEPScheduler.h.
using marlinmt::concurrency::PEPScheduler::ProcessorSequence = std::shared_ptr<SuperSequence> |
Definition at line 51 of file PEPScheduler.h.
using marlinmt::concurrency::PEPScheduler::PushResultList = std::vector<WorkerPool::PushResult> |
Definition at line 52 of file PEPScheduler.h.
using marlinmt::concurrency::PEPScheduler::TimePoint = std::chrono::steady_clock::time_point |
Definition at line 55 of file PEPScheduler.h.
using marlinmt::concurrency::PEPScheduler::WorkerPool = ThreadPool<InputType, OutputType> |
Definition at line 50 of file PEPScheduler.h.
marlinmt::concurrency::PEPScheduler::PEPScheduler | ( | ) |
Constructor.
Definition at line 76 of file PEPScheduler.cc.
References marlinmt::Component::setName().
|
private |
Definition at line 170 of file PEPScheduler.cc.
References _pool, _queueSize, _superSequence, marlinmt::concurrency::ThreadPool< IN, OUT >::addWorker(), marlinmt::ParameterBase< T >::get(), marlinmt::ParameterBase< T >::isSet(), marlinmt::concurrency::ThreadPool< IN, OUT >::setAcceptPush(), marlinmt::concurrency::ThreadPool< IN, OUT >::setMaxQueueSize(), and marlinmt::concurrency::ThreadPool< IN, OUT >::start().
Referenced by initialize().
|
private |
Definition at line 144 of file PEPScheduler.cc.
References _superSequence, marlinmt::Component::application(), marlinmt::Application::configuration(), MARLINMT_THROW, marlinmt::ConfigSection::parameterNames(), and marlinmt::Configuration::section().
Referenced by initialize().
|
overridevirtual |
Terminate the scheduler activities: clean up memory, etc.
Implements marlinmt::IScheduler.
Definition at line 94 of file PEPScheduler.cc.
References _endTime, _lockingTime, marlinmt::Component::_logger, _pool, _popTime, _pushResults, _runHeaderTime, _startTime, _superSequence, marlinmt::Component::error(), marlinmt::Component::message(), marlinmt::clock::now(), popFinishedEvents(), marlinmt::concurrency::ThreadPool< IN, OUT >::stop(), and marlinmt::clock::time_difference().
|
overridevirtual |
Get the number of free event slots.
Implements marlinmt::IScheduler.
Definition at line 243 of file PEPScheduler.cc.
References _pool, and marlinmt::concurrency::ThreadPool< IN, OUT >::freeSlots().
|
overridevirtual |
Initialize the scheduler.
Read the config section from the configuration
Reimplemented from marlinmt::IScheduler.
Definition at line 83 of file PEPScheduler.cc.
References _startTime, configurePool(), configureProcessors(), marlinmt::IScheduler::initialize(), marlinmt::clock::now(), and preConfigure().
|
overridevirtual |
Retrieve finished events from the scheduler.
events | the list of events to retrieve |
Implements marlinmt::IScheduler.
Definition at line 220 of file PEPScheduler.cc.
References _popTime, _pushResults, marlinmt::Component::message(), and marlinmt::clock::now().
Referenced by end().
|
private |
Definition at line 136 of file PEPScheduler.cc.
References marlinmt::CmdLineParser::ParseResult::_nthreads, _superSequence, marlinmt::Component::application(), marlinmt::Application::cmdLineParseResult(), and marlinmt::details::nthreads().
Referenced by initialize().
|
overridevirtual |
Process a run header.
rhdr | the run header to process |
Implements marlinmt::IScheduler.
Definition at line 190 of file PEPScheduler.cc.
References _pool, _runHeaderTime, _superSequence, marlinmt::concurrency::ThreadPool< IN, OUT >::active(), marlinmt::clock::now(), and marlinmt::concurrency::ThreadPool< IN, OUT >::setAcceptPush().
|
overridevirtual |
Push a new event to the scheduler for processing.
event | the event to push |
Implements marlinmt::IScheduler.
Definition at line 211 of file PEPScheduler.cc.
References _lockingTime, _pool, _pushResults, marlinmt::clock::now(), and marlinmt::concurrency::ThreadPool< IN, OUT >::push().
|
private |
The end time.
Definition at line 84 of file PEPScheduler.h.
Referenced by end().
|
private |
The total time spent on locking on thread pool queue access.
Definition at line 88 of file PEPScheduler.h.
Referenced by end(), and pushEvent().
|
private |
The worker thread pool.
Definition at line 76 of file PEPScheduler.h.
Referenced by configurePool(), end(), freeSlots(), processRunHeader(), and pushEvent().
|
private |
The total time spent on popping events from the output event pool.
Definition at line 90 of file PEPScheduler.h.
Referenced by end(), and popFinishedEvents().
|
private |
The list of worker output promises.
Definition at line 80 of file PEPScheduler.h.
Referenced by end(), popFinishedEvents(), and pushEvent().
|
private |
The scheduler event queue size.
Definition at line 92 of file PEPScheduler.h.
Referenced by configurePool().
|
private |
The total time spent on processing run headers.
Definition at line 86 of file PEPScheduler.h.
Referenced by end(), and processRunHeader().
|
private |
|
private |
The processor super sequence.
Definition at line 78 of file PEPScheduler.h.
Referenced by configurePool(), configureProcessors(), end(), preConfigure(), and processRunHeader().