MarlinMT  0.1.0
PEPScheduler.cc
Go to the documentation of this file.
2 
3 // -- marlinmt headers
4 #include <marlinmt/Application.h>
5 #include <marlinmt/Utils.h>
6 #include <marlinmt/Sequence.h>
7 #include <marlinmt/Processor.h>
9 #include <marlinmt/EventStore.h>
10 #include <marlinmt/RunHeader.h>
11 
12 // -- std headers
13 #include <exception>
14 #include <algorithm>
15 #include <iomanip>
16 #include <set>
17 
18 namespace marlinmt {
19 
20  namespace concurrency {
21 
// ProcessorSequenceWorker class.
// Worker type used by the PEPScheduler thread pool: it owns a shared pointer to
// one processor sequence and runs that sequence on every event handed to process().
25  class ProcessorSequenceWorker : public WorkerBase<PEPScheduler::InputType,PEPScheduler::OutputType> {
26  public:
// NOTE(review): `Base` is not declared in the lines visible here (listing line 27
// is absent); presumably an alias of the WorkerBase specialisation - confirm
// against the original file.
28  using Input = Base::Input ;
29  using Output = Base::Output ;
30 
31  public:
32  ~ProcessorSequenceWorker() = default ;
33 
34  public:
// Constructor. Stores the processor sequence this worker will run.
40  ProcessorSequenceWorker( std::shared_ptr<Sequence> sequence ) ;
41 
42  private:
43  // from WorkerBase<IN,OUT>
// Runs the sequence on the event. The definition in this file returns an Output
// carrying the event and, if processing threw, the captured exception.
44  Output process( Input && event ) override ;
45 
46  private:
// The processor sequence to run in the worker thread
48  std::shared_ptr<Sequence> _sequence {nullptr} ;
49  };
50 
51  //--------------------------------------------------------------------------
52  //--------------------------------------------------------------------------
53 
54  ProcessorSequenceWorker::ProcessorSequenceWorker( std::shared_ptr<Sequence> sequence ) :
55  _sequence(sequence) {
56  /* nop */
57  }
58 
59  //--------------------------------------------------------------------------
60 
62  Output output {} ;
63  output._event = event ;
64  try {
65  _sequence->processEvent( event ) ;
66  }
67  catch(...) {
68  output._exception = std::current_exception() ;
69  }
70  return output ;
71  }
72 
73  //--------------------------------------------------------------------------
74  //--------------------------------------------------------------------------
75 
77  IScheduler() {
78  setName( "PEPScheduler" ) ;
79  }
80 
81  //--------------------------------------------------------------------------
82 
84  // base init
86  preConfigure() ;
88  configurePool() ;
90  }
91 
92  //--------------------------------------------------------------------------
93 
95  _pool.stop(false) ;
96  EventList events ;
97  popFinishedEvents( events ) ;
98  if( not _pushResults.empty() ) {
99  error() << "This should never happen !!" << std::endl ;
100  }
101  message() << "Terminating application" << std::endl ;
102  _endTime = clock::now() ;
103  _superSequence->end() ;
104  // print some statistics
105  _superSequence->printStatistics( _logger ) ;
106  // print additional threading summary
107  const auto parallelTime = clock::time_difference( _startTime, _endTime ) - _runHeaderTime ;
108  double totalProcessorClock {0.0} ;
109  double totalApplicationClock {0.0} ;
110  for ( unsigned int i=0 ; i<_superSequence->size() ; ++i ) {
111  auto summary = _superSequence->sequence(i)->clockMeasureSummary() ;
112  totalProcessorClock += summary._procClock ;
113  totalApplicationClock += summary._appClock ;
114  }
115  const double speedup = totalProcessorClock / parallelTime ;
116  const double lockTimeFraction = ((totalApplicationClock - totalProcessorClock) / totalApplicationClock) * 100. ;
117  message() << "---------------------------------------------------" << std::endl ;
118  message() << "-- Threading summary" << std::endl ;
119  message() << "-- N threads: " << _superSequence->size() << std::endl ;
120  message() << "-- Speedup (serial/parallel): " << totalProcessorClock << " / " << parallelTime << " = " << speedup << std::endl ;
121  if( _superSequence->size() > 1 ) {
122  double speedupPercent = (speedup - 1) * 100 / static_cast<double>( _superSequence->size() - 1 ) ;
123  if( speedupPercent < 0 ) {
124  speedupPercent = 0. ;
125  }
126  message() << "-- Speedup percentage: " << speedupPercent << " " << '%' << std::endl ;
127  }
128  message() << "-- Queue lock time: " << _lockingTime << " ms" << std::endl ;
129  message() << "-- Pop event time: " << _popTime << " ms" << std::endl ;
130  message() << "-- Lock time fraction: " << lockTimeFraction << " %" << std::endl ;
131  message() << "---------------------------------------------------" << std::endl ;
132  }
133 
134  //--------------------------------------------------------------------------
135 
137  // create processor super sequence
138  unsigned int nthreads = application().cmdLineParseResult()._nthreads ;
139  _superSequence = std::make_shared<SuperSequence>(nthreads) ;
140  }
141 
142  //--------------------------------------------------------------------------
143 
145  log<DEBUG5>() << "PEPScheduler configureProcessors ..." << std::endl ;
146  auto &execSection = application().configuration().section("execute") ;
147  auto &procsSection = application().configuration().section("processors") ;
148  // create list of active processors
149  auto activeProcessors = execSection.parameterNames() ;
150  // auto activeProcessors = app->activeProcessors() ;
151  if ( activeProcessors.empty() ) {
152  MARLINMT_THROW( "Active processor list is empty !" ) ;
153  }
154  // populate processor sequences
155  for ( size_t i=0 ; i<activeProcessors.size() ; ++i ) {
156  auto procName = activeProcessors[ i ] ;
157  log<DEBUG5>() << "Active processor " << procName << std::endl ;
158  auto &procSection = procsSection.section( procName ) ;
159  // if ( nullptr == processorParameters ) {
160  // throw Exception( "PEPScheduler::configureProcessors: undefined processor '" + procName + "'" ) ;
161  // }
162  _superSequence->addProcessor( procSection ) ;
163  }
164  _superSequence->init( &application() ) ;
165  log<DEBUG5>() << "configureProcessors ... DONE" << std::endl ;
166  }
167 
168  //--------------------------------------------------------------------------
169 
171  // create N workers for N processor sequences
172  log<DEBUG5>() << "configurePool ..." << std::endl ;
173  log<DEBUG5>() << "Number of workers: " << _superSequence->size() << std::endl ;
174  for( unsigned int i=0 ; i<_superSequence->size() ; ++i ) {
175  log<DEBUG>() << "Adding worker ..." << std::endl ;
177  }
178  log<DEBUG5>() << "starting thread pool" << std::endl ;
179  unsigned int queueSize = _queueSize.isSet() ?
180  _queueSize.get() :
181  static_cast<unsigned int>(2 * _superSequence->size()) ;
182  _pool.setMaxQueueSize( queueSize ) ;
183  _pool.start() ;
184  _pool.setAcceptPush( true ) ;
185  log<DEBUG5>() << "configurePool ... DONE" << std::endl ;
186  }
187 
188  //--------------------------------------------------------------------------
189 
190  void PEPScheduler::processRunHeader( std::shared_ptr<RunHeader> rhdr ) {
191  // Current way to process run header:
192  // - Stop accepting event in thread pool
193  // - Wait for current events processing to finish
194  // - Process run header
195  // - Resume pool access for new event push
196  _pool.setAcceptPush( false ) ;
197  // need to wait for all current tasks to finish
198  // and then process run header
199  while( _pool.active() ) {
200  std::this_thread::sleep_for( std::chrono::microseconds(10) ) ;
201  }
202  auto rhdrStart = clock::now() ;
203  _superSequence->processRunHeader( rhdr ) ;
204  auto rhdrEnd = clock::now() ;
205  _runHeaderTime += clock::time_difference<clock::seconds>( rhdrStart, rhdrEnd ) ;
206  _pool.setAcceptPush( true ) ;
207  }
208 
209  //--------------------------------------------------------------------------
210 
211  void PEPScheduler::pushEvent( std::shared_ptr<EventStore> event ) {
212  // push event to thread pool queue. It might throw !
213  auto start = clock::now() ;
214  _pushResults.push_back( _pool.push( WorkerPool::PushPolicy::ThrowIfFull, std::move(event) ) ) ;
215  _lockingTime += clock::elapsed_since<clock::milliseconds>( start ) ;
216  }
217 
218  //--------------------------------------------------------------------------
219 
220  void PEPScheduler::popFinishedEvents( std::vector<std::shared_ptr<EventStore>> &events ) {
221  auto start = clock::now() ;
222  auto iter = _pushResults.begin() ;
223  while( iter != _pushResults.end() ) {
224  const bool finished = (iter->second.wait_for(std::chrono::seconds(0)) == std::future_status::ready) ;
225  if( finished ) {
226  auto output = iter->second.get() ;
227  // if an exception was raised during processing rethrow it there !
228  if( nullptr != output._exception ) {
229  std::rethrow_exception( output._exception ) ;
230  }
231  message() << "Finished event uid " << output._event->uid() << std::endl ;
232  events.push_back( output._event ) ;
233  iter = _pushResults.erase( iter ) ;
234  continue;
235  }
236  ++iter ;
237  }
238  _popTime += clock::elapsed_since<clock::milliseconds>( start ) ;
239  }
240 
241  //--------------------------------------------------------------------------
242 
243  std::size_t PEPScheduler::freeSlots() const {
244  return _pool.freeSlots() ;
245  }
246 
247  }
248 
249 } // namespace marlinmt
void setAcceptPush(bool accept)
Set whether the thread pool accept data push.
Definition: ThreadPool.h:273
void stop(bool clear=true)
Stop the thread pool.
Definition: ThreadPool.h:302
Logging::StreamType error() const
Shortcut for log<ERROR>()
Definition: Component.cc:65
clock::duration_rep _lockingTime
The total time spent on locking on thread pool queue access.
Definition: PEPScheduler.h:88
WorkerPool _pool
The worker thread pool.
Definition: PEPScheduler.h:76
ProcessorSequenceWorker class.
Definition: PEPScheduler.cc:25
ProcessorSequenceWorker(std::shared_ptr< Sequence > sequence)
Constructor.
Definition: PEPScheduler.cc:54
std::size_t freeSlots() const
Get the number of free slots in the task queue.
Definition: ThreadPool.h:245
IScheduler interface: interface for implementing a scheduling algorithm for event processing.
Definition: IScheduler.h:23
unsigned int nthreads(const std::string &str)
Definition: Utils.h:580
std::vector< std::string > parameterNames() const
Get the list of parameter names.
void setMaxQueueSize(std::size_t maxQueueSize)
Set the maximum queue size.
Definition: ThreadPool.h:259
virtual void initialize() override
Initialize the scheduler.
Definition: IScheduler.cc:15
void start()
Start the worker threads.
Definition: ThreadPool.h:204
void setName(const std::string &n)
Set the component name.
Definition: Component.cc:29
std::size_t freeSlots() const override
Get the number of free event slots.
LoggerPtr _logger
The logger instance.
Definition: Component.h:173
const CmdLineParseResult & cmdLineParseResult() const
Get the command line parsing result (after init)
Definition: Application.cc:165
Logging::StreamType message() const
Shortcut for log<MESSAGE>()
Definition: Component.cc:53
static duration_rep time_difference(const time_point &older, const time_point &ealier)
Get the time difference between two time points.
Definition: Utils.h:88
void end() override
Terminate the scheduler activities: clean up memory, etc.
Definition: PEPScheduler.cc:94
Output process(Input &&event) override
Definition: PEPScheduler.cc:61
WorkerBase class Base class to implement processing of task data (so called queued-element) pushed in...
Definition: Worker.h:37
static time_point now()
Get the current time.
Definition: Utils.h:50
void processRunHeader(std::shared_ptr< RunHeader > rhdr) override
Process a run header.
std::shared_ptr< Sequence > _sequence
The processor sequence to run in the worker thread.
Definition: PEPScheduler.cc:48
#define MARLINMT_THROW(message)
Definition: Exceptions.h:8
PushResult push(PushPolicy policy, IN &&input)
Push a new task in the task queue.
Definition: ThreadPool.h:335
unsigned int _nthreads
The number of threads.
Definition: CmdLineParser.h:29
void popFinishedEvents(std::vector< std::shared_ptr< EventStore >> &events) override
Retrieve finished events from the scheduler.
ConfigSection & section(const std::string &sn)
Get a section by name.
PushResultList _pushResults
The list of worker output promises.
Definition: PEPScheduler.h:80
clock::time_point _startTime
The start time.
Definition: PEPScheduler.h:82
const Application & application() const
Get the application in which the component is registered.
Definition: Component.cc:83
UIntParameter _queueSize
The scheduler event queue size.
Definition: PEPScheduler.h:92
T get() const
Get the parameter value.
Definition: Parameter.h:556
bool active() const
Whether the thread pool is active, meaning that the queue is not empty or at least one worker is acti...
Definition: ThreadPool.h:287
void addWorker(Args &&...args)
Add a new worker thread.
Definition: ThreadPool.h:192
ProcessorSequence _superSequence
The processor super sequence.
Definition: PEPScheduler.h:78
clock::time_point _endTime
The end time.
Definition: PEPScheduler.h:84
void pushEvent(std::shared_ptr< EventStore > event) override
Push a new event to the scheduler for processing.
std::vector< std::shared_ptr< EventStore > > EventList
Definition: PEPScheduler.h:53
void initialize() override
Initialize the scheduler.
Definition: PEPScheduler.cc:83
clock::duration_rep _runHeaderTime
The total time spent on processing run headers.
Definition: PEPScheduler.h:86
const Configuration & configuration() const
Get the main application configuration object.
Definition: Application.cc:278
bool isSet() const
Whether the parameter has been set.
Definition: Parameter.h:506