20 namespace concurrency {
63 output._event = event ;
68 output._exception = std::current_exception() ;
99 error() <<
"This should never happen !!" << std::endl ;
101 message() <<
"Terminating application" << std::endl ;
108 double totalProcessorClock {0.0} ;
109 double totalApplicationClock {0.0} ;
111 auto summary =
_superSequence->sequence(i)->clockMeasureSummary() ;
112 totalProcessorClock += summary._procClock ;
113 totalApplicationClock += summary._appClock ;
115 const double speedup = totalProcessorClock / parallelTime ;
116 const double lockTimeFraction = ((totalApplicationClock - totalProcessorClock) / totalApplicationClock) * 100. ;
117 message() <<
"---------------------------------------------------" << std::endl ;
118 message() <<
"-- Threading summary" << std::endl ;
120 message() <<
"-- Speedup (serial/parallel): " << totalProcessorClock <<
" / " << parallelTime <<
" = " << speedup << std::endl ;
122 double speedupPercent = (speedup - 1) * 100 / static_cast<double>(
_superSequence->size() - 1 ) ;
123 if( speedupPercent < 0 ) {
124 speedupPercent = 0. ;
126 message() <<
"-- Speedup percentage: " << speedupPercent <<
" " <<
'%' << std::endl ;
130 message() <<
"-- Lock time fraction: " << lockTimeFraction <<
" %" << std::endl ;
131 message() <<
"---------------------------------------------------" << std::endl ;
145 log<DEBUG5>() <<
"PEPScheduler configureProcessors ..." << std::endl ;
151 if ( activeProcessors.empty() ) {
155 for (
size_t i=0 ; i<activeProcessors.size() ; ++i ) {
156 auto procName = activeProcessors[ i ] ;
157 log<DEBUG5>() <<
"Active processor " << procName << std::endl ;
158 auto &procSection = procsSection.section( procName ) ;
165 log<DEBUG5>() <<
"configureProcessors ... DONE" << std::endl ;
172 log<DEBUG5>() <<
"configurePool ..." << std::endl ;
173 log<DEBUG5>() <<
"Number of workers: " <<
_superSequence->size() << std::endl ;
175 log<DEBUG>() <<
"Adding worker ..." << std::endl ;
178 log<DEBUG5>() <<
"starting thread pool" << std::endl ;
185 log<DEBUG5>() <<
"configurePool ... DONE" << std::endl ;
200 std::this_thread::sleep_for( std::chrono::microseconds(10) ) ;
205 _runHeaderTime += clock::time_difference<clock::seconds>( rhdrStart, rhdrEnd ) ;
215 _lockingTime += clock::elapsed_since<clock::milliseconds>( start ) ;
224 const bool finished = (iter->second.wait_for(std::chrono::seconds(0)) == std::future_status::ready) ;
226 auto output = iter->second.get() ;
228 if(
nullptr != output._exception ) {
229 std::rethrow_exception( output._exception ) ;
231 message() <<
"Finished event uid " << output._event->uid() << std::endl ;
232 events.push_back( output._event ) ;
238 _popTime += clock::elapsed_since<clock::milliseconds>( start ) ;
void setAcceptPush(bool accept)
Set whether the thread pool accept data push.
void stop(bool clear=true)
Stop the thread pool.
Logging::StreamType error() const
Shortcut for log<ERROR>()
clock::duration_rep _lockingTime
The total time spent on popping events from the output event pool.
WorkerPool _pool
< The worker thread pool
ProcessorSequenceWorker class.
ProcessorSequenceWorker(std::shared_ptr< Sequence > sequence)
Constructor.
std::size_t freeSlots() const
Get the number of free slots in the task queue.
IScheduler interface Interface for implementing a scheduling algorithm for event processing.
unsigned int nthreads(const std::string &str)
std::vector< std::string > parameterNames() const
Get the list of parameter names.
void setMaxQueueSize(std::size_t maxQueueSize)
Set the maximum queue size.
virtual void initialize() override
Initialize the scheduler.
void start()
Start the worker threads.
void setName(const std::string &n)
Set the component name.
std::size_t freeSlots() const override
Get the number of free event slots.
LoggerPtr _logger
The logger instance.
const CmdLineParseResult & cmdLineParseResult() const
Get the command line parsing result (after init)
Logging::StreamType message() const
Shortcut for log<MESSAGE>()
static duration_rep time_difference(const time_point &older, const time_point &ealier)
Get the time difference between two time points.
void end() override
Terminate the scheduler activites Cleanup memory, etc ...
PEPScheduler()
Constructor.
Output process(Input &&event) override
WorkerBase class Base class to implement processing of task data (so called queued-element) pushed in...
static time_point now()
Get the current time.
void processRunHeader(std::shared_ptr< RunHeader > rhdr) override
Process a run header.
~ProcessorSequenceWorker()=default
clock::duration_rep _popTime
std::shared_ptr< Sequence > _sequence
< The processor sequence to run in the worker thread
#define MARLINMT_THROW(message)
PushResult push(PushPolicy policy, IN &&input)
Push a new task in the task queue.
unsigned int _nthreads
The number of threads.
void popFinishedEvents(std::vector< std::shared_ptr< EventStore >> &events) override
Retrieve finished events from the scheduler.
ConfigSection & section(const std::string &sn)
Get a section by name.
PushResultList _pushResults
The start time.
clock::time_point _startTime
The end time.
void configureProcessors()
const Application & application() const
Get the application in which the component is registered.
UIntParameter _queueSize
The scheduler event queue size.
T get() const
Get the parameter value.
bool active() const
Whether the thread pool is active, meaning that the queue is not empty or at least one worker is acti...
void addWorker(Args &&...args)
Add a new worker thread.
PEPScheduler::InputType Input
ProcessorSequence _superSequence
The list of worker output promises.
PEPScheduler::OutputType Output
clock::time_point _endTime
The total time spent on processing run headers.
void pushEvent(std::shared_ptr< EventStore > event) override
Push a new event to the scheduler for processing.
std::vector< std::shared_ptr< EventStore > > EventList
void initialize() override
Initialize the scheduler.
clock::duration_rep _runHeaderTime
The total time spent on locking on thread pool queue access.
const Configuration & configuration() const
Get the main application configuration object.
bool isSet() const
Whether the parameter has been set.