MarlinMT  0.1.0
Sequence.cc
Go to the documentation of this file.
1 #include <marlinmt/Sequence.h>
2 
3 // -- marlinmt headers
4 #include <marlinmt/Processor.h>
5 #include <marlinmt/Exceptions.h>
9 
10 // -- std headers
11 #include <algorithm>
12 #include <iomanip>
13 
14 namespace marlinmt {
15 
16  SequenceItem::SequenceItem( std::shared_ptr<Processor> proc ) :
17  _processor(proc),
18  _mutex(nullptr) {
19  if( nullptr == _processor ) {
20  throw Exception( "SequenceItem: got a nullptr for processor" ) ;
21  }
22  }
23 
24  //--------------------------------------------------------------------------
25 
26  SequenceItem::SequenceItem( std::shared_ptr<Processor> proc, std::shared_ptr<std::mutex> lock ) :
27  _processor(proc),
28  _mutex(lock) {
29  if( nullptr == _processor ) {
30  throw Exception( "SequenceItem: got a nullptr for processor" ) ;
31  }
32  }
33 
34  //--------------------------------------------------------------------------
35 
36  void SequenceItem::processRunHeader( std::shared_ptr<RunHeader> rhdr ) {
37  if( nullptr != _mutex ) {
38  std::lock_guard<std::mutex> lock( *_mutex ) ;
39  _processor->processRunHeader( rhdr.get() ) ;
40  }
41  else {
42  _processor->processRunHeader( rhdr.get() ) ;
43  }
44  }
45 
46  //--------------------------------------------------------------------------
47 
48  clock::pair SequenceItem::processEvent( std::shared_ptr<EventStore> event ) {
49  if( nullptr != _mutex ) {
50  auto start = clock::now() ;
51  std::lock_guard<std::mutex> lock( *_mutex ) ;
52  auto start2 = clock::now() ;
53  _processor->processEvent( event.get() ) ;
54  auto end = clock::now() ;
55  return clock::pair(
56  clock::time_difference<clock::seconds>(start, end),
57  clock::time_difference<clock::seconds>(start2, end)) ;
58  }
59  else {
60  auto start = clock::now() ;
61  _processor->processEvent( event.get() ) ;
62  auto end = clock::now() ;
63  return clock::pair(
64  clock::time_difference<clock::seconds>(start, end),
65  clock::time_difference<clock::seconds>(start, end)) ;
66  }
67  }
68 
69  //--------------------------------------------------------------------------
70 
71  std::shared_ptr<Processor> SequenceItem::processor() const {
72  return _processor ;
73  }
74 
75  //--------------------------------------------------------------------------
76 
77  const std::string &SequenceItem::name() const {
78  return _processor->name() ;
79  }
80 
81  //--------------------------------------------------------------------------
82  //--------------------------------------------------------------------------
83 
84  std::shared_ptr<SequenceItem> Sequence::createItem( std::shared_ptr<Processor> processor, std::shared_ptr<std::mutex> lock ) const {
85  return std::make_shared<SequenceItem>( processor, lock ) ;
86  }
87 
88  //--------------------------------------------------------------------------
89 
90  void Sequence::addItem( std::shared_ptr<SequenceItem> item ) {
91  auto iter = std::find_if(_items.begin(), _items.end(), [&](std::shared_ptr<SequenceItem> i){
92  return (i->name() == item->name()) ;
93  });
94  if( _items.end() != iter ) {
95  throw Exception( "Sequence::addItem: processor '" + item->name() + "' already in sequence" ) ;
96  }
97  _items.push_back( item ) ;
98  _clockMeasures[item->name()] = ClockMeasure() ;
99  }
100 
101  //--------------------------------------------------------------------------
102 
103  std::shared_ptr<SequenceItem> Sequence::at( Index index ) const {
104  return _items.at( index ) ;
105  }
106 
107  //--------------------------------------------------------------------------
108 
110  return _items.size() ;
111  }
112 
113  //--------------------------------------------------------------------------
114 
115  void Sequence::processRunHeader( std::shared_ptr<RunHeader> rhdr ) {
116  for ( auto item : _items ) {
117  item->processRunHeader( rhdr ) ;
118  }
119  }
120 
121  //--------------------------------------------------------------------------
122 
123  void Sequence::processEvent( std::shared_ptr<EventStore> event ) {
124  try {
125  auto extension = event->extensions().get<extensions::ProcessorConditions, ProcessorConditionsExtension>() ;
126  for ( auto item : _items ) {
127  if ( not extension->check( item->name() ) ) {
128  continue ;
129  }
130  auto clockMeas = item->processEvent( event ) ;
131  auto iter = _clockMeasures.find( item->name() ) ;
132  iter->second._appClock += clockMeas.first ;
133  iter->second._procClock += clockMeas.second ;
134  iter->second._counter ++ ;
135  }
136  }
137  catch ( SkipEventException& e ) {
138  auto iter = _skipEventMap.find( e.what() ) ;
139  if ( _skipEventMap.end() == iter ) {
140  _skipEventMap.insert( SkippedEventMap::value_type( e.what() , 1 ) ) ;
141  }
142  else {
143  iter->second ++;
144  }
145  }
146  }
147 
148  //--------------------------------------------------------------------------
149 
151  ClockMeasure summary {} ;
152  for ( auto t : _clockMeasures ) {
153  summary._appClock += t.second._appClock ;
154  summary._procClock += t.second._procClock ;
155  summary._counter += t.second._counter ;
156  }
157  return summary ;
158  }
159 
160  //--------------------------------------------------------------------------
161 
163  return _clockMeasures ;
164  }
165 
166  //--------------------------------------------------------------------------
167 
169  return _skipEventMap ;
170  }
171 
172  //--------------------------------------------------------------------------
173  //--------------------------------------------------------------------------
174 
175  SuperSequence::SuperSequence( std::size_t nseqs ) {
176  if( 0 == nseqs ) {
177  throw Exception( "SuperSequence: number of sequences must be > 0" ) ;
178  }
179  _sequences.resize(nseqs) ;
180  for( std::size_t i=0 ; i<nseqs ; ++i ) {
181  _sequences.at(i) = std::make_shared<Sequence>() ;
182  }
183  }
184 
185  //--------------------------------------------------------------------------
186 
188  for( auto item : _uniqueItems ) {
189  item->processor()->setup( app ) ;
190  }
191  }
192 
193  //--------------------------------------------------------------------------
194 
195  std::shared_ptr<Sequence> SuperSequence::sequence( Index index ) const {
196  return _sequences.at( index ) ;
197  }
198 
199  //--------------------------------------------------------------------------
200 
201  void SuperSequence::addProcessor( const ConfigSection &parameters ) {
202 
203  const bool cloneSet = parameters.hasParameter( "ProcessorClone" ) ;
204  const bool criticalSet = parameters.hasParameter( "ProcessorCritical" ) ;
205  bool clone = parameters.parameter<bool>( "ProcessorClone", true ) ;
206  bool critical = parameters.parameter<bool>( "ProcessorCritical", false ) ;
207  auto type = parameters.parameter<std::string>( "ProcessorType" ) ;
208  auto name = parameters.parameter<std::string>( "ProcessorName" ) ;
209  auto &pluginMgr = PluginManager::instance() ;
210  std::shared_ptr<Processor> processor = pluginMgr.create<Processor>( type ) ;
211  if( nullptr == processor ) {
212  throw Exception( "Processor of type '" + type + "' doesn't exists !" ) ;
213  }
214  auto cloneOpt = processor->runtimeOption( Processor::ERuntimeOption::eClone ) ;
215  auto criticalOpt = processor->runtimeOption( Processor::ERuntimeOption::eCritical ) ;
216  if( cloneOpt.has_value() ) {
217  if( cloneSet and (cloneOpt.value() != clone) ) {
218  throw Exception( "Processor '" +
219  type +
220  "' clone option forced to " +
221  (cloneOpt.value() ? "true" : "false") +
222  "!") ;
223  }
224  clone = cloneOpt.value() ;
225  }
226  if( criticalOpt.has_value() ) {
227  if( criticalSet and (criticalOpt.value() != critical) ) {
228  throw Exception( "Processor '" +
229  type +
230  "' critical option forced to " +
231  (criticalOpt.value() ? "true" : "false") +
232  "!") ;
233  }
234  critical = criticalOpt.value() ;
235  }
236  processor->setName( name ) ;
237  processor->setParameters( parameters ) ;
238  std::shared_ptr<std::mutex> lock = critical ? std::make_shared<std::mutex>() : nullptr ;
239  if( clone ) {
240  // add the first but then create new processor instances and add them
241  auto item = _sequences.at(0)->createItem( processor, lock ) ;
242  _sequences.at(0)->addItem( item ) ;
243  _uniqueItems.insert( item ) ;
244  for( SizeType i=1 ; i<size() ; ++i ) {
245  processor = pluginMgr.create<Processor>( type ) ;
246  processor->setName( name ) ;
247  processor->setParameters( parameters ) ;
248  item = _sequences.at(i)->createItem( processor, lock ) ;
249  _sequences.at(i)->addItem( item ) ;
250  _uniqueItems.insert( item ) ;
251  }
252  }
253  else {
254  // add the first and re-use the same item
255  auto item = _sequences.at(0)->createItem( processor, lock ) ;
256  _sequences.at(0)->addItem( item ) ;
257  _uniqueItems.insert( item ) ;
258  for( SizeType i=1 ; i<size() ; ++i ) {
259  _sequences.at(i)->addItem( item ) ;
260  }
261  }
262  }
263 
264  //--------------------------------------------------------------------------
265 
267  return _sequences.size() ;
268  }
269 
270  //--------------------------------------------------------------------------
271 
272  void SuperSequence::processRunHeader( std::shared_ptr<RunHeader> rhdr ) {
273  for( auto item : _uniqueItems ) {
274  item->processRunHeader( rhdr ) ;
275  }
276  }
277 
278  //--------------------------------------------------------------------------
279 
281  for( auto item : _uniqueItems ) {
282  item->processor()->end() ;
283  }
284  }
285 
286  //--------------------------------------------------------------------------
287 
289  // first merge measurements from the different sequences
290  Sequence::SkippedEventMap skippedEvents {} ;
291  Sequence::ClockMeasureMap clockMeasures {} ;
292  for( unsigned int i=0 ; i<size() ; ++i ) {
293  auto skipped = sequence(i)->skippedEvents() ;
294  auto clocks = sequence(i)->clockMeasures() ;
295  // merge skipped events stats
296  for( auto sk : skipped ) {
297  auto iter = skippedEvents.find( sk.first ) ;
298  if( skippedEvents.end() != iter ) {
299  iter->second += sk.second ;
300  }
301  else {
302  skippedEvents.insert( sk ) ;
303  }
304  }
305  // merge clocks stats
306  for( auto clk : clocks ) {
307  auto iter = clockMeasures.find( clk.first ) ;
308  if( clockMeasures.end() != iter ) {
309  iter->second._appClock += clk.second._appClock ;
310  iter->second._procClock += clk.second._procClock ;
311  iter->second._counter += clk.second._counter ;
312  }
313  else {
314  clockMeasures.insert( clk ) ;
315  }
316  }
317  }
318  logger->log<MESSAGE>() << "--------------------------------------------------------- " << std::endl ;
319  logger->log<MESSAGE>() << "-- Events skipped by processors : " << std::endl ;
320  unsigned int nSkipped = 0 ;
321  for( auto skip : skippedEvents ) {
322  logger->log<MESSAGE>() << "-- " << skip.first << ": \t" << skip.second << std::endl ;
323  nSkipped += skip.second ;
324  }
325  logger->log<MESSAGE>() << "-- Total: " << nSkipped << std::endl ;
326  logger->log<MESSAGE>() << "--------------------------------------------------------- " << std::endl
327  << std::endl ;
328  logger->log<MESSAGE>() << "--------------------------------------------------------- " << std::endl
329  << " Time used by processors ( in processEvent() ) : " << std::endl
330  << std::endl ;
331  std::list<Sequence::ClockMeasureMap::value_type> clockList( clockMeasures.begin() , clockMeasures.end() ) ;
332  typedef std::list<Sequence::ClockMeasureMap::value_type>::value_type elt ;
333  clockList.sort( [](const elt &lhs, const elt &rhs) {
334  return ( lhs.second._procClock > rhs.second._procClock ) ;
335  }) ;
336  double clockTotal = 0.0 ;
337  int eventTotal = 0 ;
338  for( auto clockMeasure : clockList ) {
339  std::string procName = clockMeasure.first ;
340  procName.resize(40, ' ') ;
341  clockTotal += clockMeasure.second._procClock ;
342  int lockTimeFraction = static_cast<int>(((clockMeasure.second._appClock - clockMeasure.second._procClock) / clockMeasure.second._appClock) * 100.) ;
343  if( clockMeasure.second._counter > eventTotal ){
344  eventTotal = clockMeasure.second._counter ;
345  }
346  std::stringstream ss ;
347  if ( clockMeasure.second._counter > 0 ) {
348  ss << clockMeasure.second._procClock / static_cast<float>(clockMeasure.second._counter) ;
349  }
350  else {
351  ss << "NaN" ;
352  }
353  std::stringstream lockPrint ;
354  if( size() > 1 ) {
355  lockPrint << "(lock: " << lockTimeFraction << " %)";
356  }
357  logger->log<MESSAGE>()
358  << procName
359  << std::setw(12) << std::scientific << clockMeasure.second._procClock << " s "
360  << "in " << std::setw(12) << clockMeasure.second._counter
361  << " events ==> "
362  << std::setw(12) << std::scientific << ss.str() << " [ s/evt.] "
363  << lockPrint.str()
364  << std::endl << std::endl ;
365  }
366  std::stringstream ss ;
367  if ( eventTotal > 0 ) {
368  ss << clockTotal / eventTotal ;
369  }
370  else {
371  ss << "NaN" ;
372  }
373  logger->log<MESSAGE>()
374  << "Total: "
375  << std::setw(12) << std::scientific << clockTotal << " s in "
376  << std::setw(12) << eventTotal << " events ==> "
377  << std::setw(12) << std::scientific << ss.str() << " [ s/evt.] "
378  << std::endl << std::endl ;
379  logger->log<MESSAGE>() << "--------------------------------------------------------- " << std::endl ;
380  }
381 
382 }
void end()
Call Processor::end() for all processors.
Definition: Sequence.cc:280
const SkippedEventMap & skippedEvents() const
Get all the skipped events of the sequence.
Definition: Sequence.cc:168
T parameter(const std::string &n) const
Get a parameter value as type T.
std::map< std::string, int > SkippedEventMap
Definition: Sequence.h:134
std::map< std::string, ClockMeasure > ClockMeasureMap
Definition: Sequence.h:133
void processRunHeader(std::shared_ptr< RunHeader > rhdr)
Process the run header.
Definition: Sequence.cc:115
SizeType size() const
Get the number of items in the sequence.
Definition: Sequence.cc:109
Sequences::size_type SizeType
Definition: Sequence.h:227
Sequences::size_type Index
Definition: Sequence.h:226
ProcessorConditionsExtension class Event extension providing access to processor runtime conditions (...
void addItem(std::shared_ptr< SequenceItem > item)
Add an item to the sequence.
Definition: Sequence.cc:90
clock::pair processEvent(std::shared_ptr< EventStore > event)
Call Processor::processEvent.
Definition: Sequence.cc:48
void setName(const std::string &n)
Set the component name.
Definition: Component.cc:29
void processRunHeader(std::shared_ptr< RunHeader > rhdr)
Process the run header.
Definition: Sequence.cc:272
std::shared_ptr< Processor > processor() const
Call Processor::modifyEvent.
Definition: Sequence.cc:71
Container::size_type SizeType
Definition: Sequence.h:132
std::shared_ptr< SequenceItem > at(Index index) const
Get a sequence item at the specified index.
Definition: Sequence.cc:103
std::shared_ptr< Processor > _processor
< The processor instance
Definition: Sequence.h:92
static time_point now()
Get the current time.
Definition: Utils.h:50
void addProcessor(const ConfigSection &parameters)
Add a processor using the input parameters.
Definition: Sequence.cc:201
bool hasParameter(const std::string &n) const
Whether the parameter exists.
Application class Base application interface for running a Marlin application.
Definition: Application.h:25
std::shared_ptr< SequenceItem > createItem(std::shared_ptr< Processor > processor, std::shared_ptr< std::mutex > lock) const
Create a sequence item.
Definition: Sequence.cc:84
void processRunHeader(std::shared_ptr< RunHeader > rhdr)
Process the run header.
Definition: Sequence.cc:36
const std::string & name() const
Get the processor name.
Definition: Sequence.cc:77
ClockMeasure clockMeasureSummary() const
Generate a clock measure summary of all items.
Definition: Sequence.cc:150
clock::duration_rep _appClock
The total time spent by the application on processEvent() and modifyEvent() calls.
Definition: Sequence.h:106
Container::size_type Index
Definition: Sequence.h:131
Whether the processor has to be executed in a critical section.
SequenceItem(std::shared_ptr< Processor > proc)
Constructor.
Definition: Sequence.cc:16
const ClockMeasureMap & clockMeasures() const
Get all the clock measurements of the sequence.
Definition: Sequence.cc:162
std::shared_ptr< streamlog::logstreamT< mutex_type > > Logger
Definition: Logging.h:73
Processor class.
Definition: Processor.h:43
void init(Application *app)
Call Processor::baseInit(app) for all processors.
Definition: Sequence.cc:187
ConfigSection class Holds a set of parameters and subsection.
Definition: Configuration.h:20
void printStatistics(Logging::Logger logger) const
Print statistics at end of application.
Definition: Sequence.cc:288
std::shared_ptr< Sequence > sequence(Index index) const
Get the sequence at the given index.
Definition: Sequence.cc:195
void processEvent(std::shared_ptr< EventStore > event)
Process the event.
Definition: Sequence.cc:123
static PluginManager & instance()
Get the plugin manager instance.
Exception class.
Definition: Exceptions.h:60
std::pair< duration_rep, duration_rep > pair
Definition: Utils.h:33
ClockMeasure struct Holds clock measurement data for processors.
Definition: Sequence.h:104
std::shared_ptr< std::mutex > _mutex
Definition: Sequence.h:94
SizeType size() const
Get the number of sequences.
Definition: Sequence.cc:266