20 throw Exception(
"SequenceItem: got a nullptr for processor" ) ;
30 throw Exception(
"SequenceItem: got a nullptr for processor" ) ;
38 std::lock_guard<std::mutex> lock( *
_mutex ) ;
51 std::lock_guard<std::mutex> lock( *
_mutex ) ;
56 clock::time_difference<clock::seconds>(start, end),
57 clock::time_difference<clock::seconds>(start2, end)) ;
64 clock::time_difference<clock::seconds>(start, end),
65 clock::time_difference<clock::seconds>(start, end)) ;
85 return std::make_shared<SequenceItem>(
processor, lock ) ;
91 auto iter = std::find_if(_items.begin(), _items.end(), [&](std::shared_ptr<SequenceItem> i){
92 return (i->name() == item->name()) ;
94 if( _items.end() != iter ) {
95 throw Exception(
"Sequence::addItem: processor '" + item->name() +
"' already in sequence" ) ;
97 _items.push_back( item ) ;
104 return _items.at( index ) ;
110 return _items.size() ;
116 for (
auto item : _items ) {
117 item->processRunHeader( rhdr ) ;
126 for (
auto item : _items ) {
127 if ( not extension->check( item->name() ) ) {
130 auto clockMeas = item->processEvent( event ) ;
131 auto iter = _clockMeasures.find( item->name() ) ;
132 iter->second._appClock += clockMeas.first ;
133 iter->second._procClock += clockMeas.second ;
134 iter->second._counter ++ ;
137 catch ( SkipEventException& e ) {
138 auto iter = _skipEventMap.find( e.what() ) ;
139 if ( _skipEventMap.end() == iter ) {
140 _skipEventMap.insert( SkippedEventMap::value_type( e.what() , 1 ) ) ;
152 for (
auto t : _clockMeasures ) {
153 summary.
_appClock += t.second._appClock ;
154 summary._procClock += t.second._procClock ;
155 summary._counter += t.second._counter ;
163 return _clockMeasures ;
169 return _skipEventMap ;
177 throw Exception(
"SuperSequence: number of sequences must be > 0" ) ;
179 _sequences.resize(nseqs) ;
180 for( std::size_t i=0 ; i<nseqs ; ++i ) {
181 _sequences.at(i) = std::make_shared<Sequence>() ;
188 for(
auto item : _uniqueItems ) {
189 item->processor()->setup( app ) ;
196 return _sequences.at( index ) ;
203 const bool cloneSet = parameters.
hasParameter(
"ProcessorClone" ) ;
204 const bool criticalSet = parameters.
hasParameter(
"ProcessorCritical" ) ;
205 bool clone = parameters.
parameter<
bool>(
"ProcessorClone", true ) ;
206 bool critical = parameters.
parameter<
bool>(
"ProcessorCritical", false ) ;
207 auto type = parameters.
parameter<std::string>(
"ProcessorType" ) ;
208 auto name = parameters.
parameter<std::string>(
"ProcessorName" ) ;
211 if(
nullptr == processor ) {
212 throw Exception(
"Processor of type '" + type +
"' doesn't exists !" ) ;
216 if( cloneOpt.has_value() ) {
217 if( cloneSet and (cloneOpt.value() != clone) ) {
220 "' clone option forced to " +
221 (cloneOpt.value() ?
"true" :
"false") +
224 clone = cloneOpt.value() ;
226 if( criticalOpt.has_value() ) {
227 if( criticalSet and (criticalOpt.value() != critical) ) {
230 "' critical option forced to " +
231 (criticalOpt.value() ?
"true" :
"false") +
234 critical = criticalOpt.value() ;
236 processor->setName( name ) ;
237 processor->setParameters( parameters ) ;
238 std::shared_ptr<std::mutex> lock = critical ? std::make_shared<std::mutex>() :
nullptr ;
241 auto item = _sequences.at(0)->createItem( processor, lock ) ;
242 _sequences.at(0)->addItem( item ) ;
243 _uniqueItems.insert( item ) ;
244 for(
SizeType i=1 ; i<size() ; ++i ) {
245 processor = pluginMgr.create<
Processor>( type ) ;
247 processor->setParameters( parameters ) ;
248 item = _sequences.at(i)->createItem( processor, lock ) ;
249 _sequences.at(i)->addItem( item ) ;
250 _uniqueItems.insert( item ) ;
255 auto item = _sequences.at(0)->createItem( processor, lock ) ;
256 _sequences.at(0)->addItem( item ) ;
257 _uniqueItems.insert( item ) ;
258 for(
SizeType i=1 ; i<size() ; ++i ) {
259 _sequences.at(i)->addItem( item ) ;
267 return _sequences.size() ;
273 for(
auto item : _uniqueItems ) {
274 item->processRunHeader( rhdr ) ;
281 for(
auto item : _uniqueItems ) {
282 item->processor()->end() ;
292 for(
unsigned int i=0 ; i<size() ; ++i ) {
293 auto skipped = sequence(i)->skippedEvents() ;
294 auto clocks = sequence(i)->clockMeasures() ;
296 for(
auto sk : skipped ) {
297 auto iter = skippedEvents.find( sk.first ) ;
298 if( skippedEvents.end() != iter ) {
299 iter->second += sk.second ;
302 skippedEvents.insert( sk ) ;
306 for(
auto clk : clocks ) {
307 auto iter = clockMeasures.find( clk.first ) ;
308 if( clockMeasures.end() != iter ) {
309 iter->second._appClock += clk.second._appClock ;
310 iter->second._procClock += clk.second._procClock ;
311 iter->second._counter += clk.second._counter ;
314 clockMeasures.insert( clk ) ;
318 logger->log<MESSAGE>() <<
"--------------------------------------------------------- " << std::endl ;
319 logger->log<MESSAGE>() <<
"-- Events skipped by processors : " << std::endl ;
320 unsigned int nSkipped = 0 ;
321 for(
auto skip : skippedEvents ) {
322 logger->log<MESSAGE>() <<
"-- " << skip.first <<
": \t" << skip.second << std::endl ;
323 nSkipped += skip.second ;
325 logger->log<MESSAGE>() <<
"-- Total: " << nSkipped << std::endl ;
326 logger->log<MESSAGE>() <<
"--------------------------------------------------------- " << std::endl
328 logger->log<MESSAGE>() <<
"--------------------------------------------------------- " << std::endl
329 <<
" Time used by processors ( in processEvent() ) : " << std::endl
331 std::list<Sequence::ClockMeasureMap::value_type> clockList( clockMeasures.begin() , clockMeasures.end() ) ;
332 typedef std::list<Sequence::ClockMeasureMap::value_type>::value_type elt ;
333 clockList.sort( [](
const elt &lhs,
const elt &rhs) {
334 return ( lhs.second._procClock > rhs.second._procClock ) ;
336 double clockTotal = 0.0 ;
338 for(
auto clockMeasure : clockList ) {
339 std::string procName = clockMeasure.first ;
340 procName.resize(40,
' ') ;
341 clockTotal += clockMeasure.second._procClock ;
342 int lockTimeFraction =
static_cast<int>(((clockMeasure.second._appClock - clockMeasure.second._procClock) / clockMeasure.second._appClock) * 100.) ;
343 if( clockMeasure.second._counter > eventTotal ){
344 eventTotal = clockMeasure.second._counter ;
346 std::stringstream ss ;
347 if ( clockMeasure.second._counter > 0 ) {
348 ss << clockMeasure.second._procClock /
static_cast<float>(clockMeasure.second._counter) ;
353 std::stringstream lockPrint ;
355 lockPrint <<
"(lock: " << lockTimeFraction <<
" %)";
357 logger->log<MESSAGE>()
359 << std::setw(12) << std::scientific << clockMeasure.second._procClock <<
" s " 360 <<
"in " << std::setw(12) << clockMeasure.second._counter
362 << std::setw(12) << std::scientific << ss.str() <<
" [ s/evt.] " 364 << std::endl << std::endl ;
366 std::stringstream ss ;
367 if ( eventTotal > 0 ) {
368 ss << clockTotal / eventTotal ;
373 logger->log<MESSAGE>()
375 << std::setw(12) << std::scientific << clockTotal <<
" s in " 376 << std::setw(12) << eventTotal <<
" events ==> " 377 << std::setw(12) << std::scientific << ss.str() <<
" [ s/evt.] " 378 << std::endl << std::endl ;
379 logger->log<MESSAGE>() <<
"--------------------------------------------------------- " << std::endl ;
void end()
Call Processor::end() for all processors.
const SkippedEventMap & skippedEvents() const
Get all the skipped events of the sequence.
T parameter(const std::string &n) const
Get a parameter value as type T.
std::map< std::string, int > SkippedEventMap
std::map< std::string, ClockMeasure > ClockMeasureMap
void processRunHeader(std::shared_ptr< RunHeader > rhdr)
Process the run header.
SizeType size() const
Get the number of items in the sequence.
Sequences::size_type SizeType
Sequences::size_type Index
ProcessorConditionsExtension class Event extension providing access to processor runtime conditions (...
void addItem(std::shared_ptr< SequenceItem > item)
Add an item to the sequence.
clock::pair processEvent(std::shared_ptr< EventStore > event)
Call Processor::processEvent.
void setName(const std::string &n)
Set the component name.
void processRunHeader(std::shared_ptr< RunHeader > rhdr)
Process the run header.
std::shared_ptr< Processor > processor() const
Call Processor::modifyEvent.
Container::size_type SizeType
std::shared_ptr< SequenceItem > at(Index index) const
Get a sequence item at the specified index.
std::shared_ptr< Processor > _processor
< The processor instance
static time_point now()
Get the current time.
void addProcessor(const ConfigSection ¶meters)
Add a processor using the input parameters.
bool hasParameter(const std::string &n) const
Whether the parameter exists.
Application class Base application interface for running a Marlin application.
std::shared_ptr< SequenceItem > createItem(std::shared_ptr< Processor > processor, std::shared_ptr< std::mutex > lock) const
Create a sequence item.
void processRunHeader(std::shared_ptr< RunHeader > rhdr)
Process the run header.
const std::string & name() const
Get the processor name.
ClockMeasure clockMeasureSummary() const
Generate a clock measure summary of all items.
clock::duration_rep _appClock
The total time spent by the application on processEvent() and modifyEvent() calls.
Container::size_type Index
Whether the processor has to be executed in a critical section.
SequenceItem(std::shared_ptr< Processor > proc)
Constructor.
const ClockMeasureMap & clockMeasures() const
Get all the clock measurements of the sequence.
std::shared_ptr< streamlog::logstreamT< mutex_type > > Logger
void init(Application *app)
Call Processor::baseInit(app) for all processors.
ConfigSection class Holds a set of parameters and subsection.
void printStatistics(Logging::Logger logger) const
Print statistics at end of application.
std::shared_ptr< Sequence > sequence(Index index) const
Get the sequence at the given index.
void processEvent(std::shared_ptr< EventStore > event)
Process the event.
static PluginManager & instance()
Get the plugin manager instance.
std::pair< duration_rep, duration_rep > pair
ClockMeasure struct Holds clock measurement data for processors.
std::shared_ptr< std::mutex > _mutex
SizeType size() const
Get the number of sequences.