MarlinMT  0.1.0
PEPScheduler.h
Go to the documentation of this file.
1 #ifndef MARLINMT_CONCURRENCY_PEPSCHEDULER_h
2 #define MARLINMT_CONCURRENCY_PEPSCHEDULER_h 1
3 
4 // -- marlinmt headers
5 #include <marlinmt/IScheduler.h>
6 #include <marlinmt/Logging.h>
7 #include <marlinmt/Utils.h>
9 
10 // -- std headers
11 #include <unordered_set>
12 
13 namespace marlinmt {
14 
15  class SuperSequence ;
16  class Sequence ;
17 
18  namespace concurrency {
19 
24  struct WorkerOutput {
26  std::shared_ptr<EventStore> _event {nullptr} ;
28  std::exception_ptr _exception {nullptr} ;
29  };
30 
31  //--------------------------------------------------------------------------
32  //--------------------------------------------------------------------------
33 
45  class PEPScheduler : public IScheduler {
46  public:
47  using ConditionsMap = std::map<std::string, std::string> ;
48  using InputType = std::shared_ptr<EventStore> ;
51  using ProcessorSequence = std::shared_ptr<SuperSequence> ;
52  using PushResultList = std::vector<WorkerPool::PushResult> ;
53  using EventList = std::vector<std::shared_ptr<EventStore>> ;
54  using Clock = std::chrono::steady_clock ;
55  using TimePoint = std::chrono::steady_clock::time_point ;
56 
57  public:
59  PEPScheduler() ;
60 
61  // from IScheduler interface
62  void initialize() override ;
63  void end() override ;
64  void processRunHeader( std::shared_ptr<RunHeader> rhdr ) override ;
65  void pushEvent( std::shared_ptr<EventStore> event ) override ;
66  void popFinishedEvents( std::vector<std::shared_ptr<EventStore>> &events ) override ;
67  std::size_t freeSlots() const override ;
68 
69  private:
70  void preConfigure() ;
71  void configureProcessors() ;
72  void configurePool() ;
73 
74  private:
76  WorkerPool _pool {} ;
78  ProcessorSequence _superSequence {nullptr} ;
80  PushResultList _pushResults {} ;
82  clock::time_point _startTime {} ;
84  clock::time_point _endTime {} ;
86  clock::duration_rep _runHeaderTime {0} ;
88  clock::duration_rep _lockingTime {0} ;
90  clock::duration_rep _popTime {0} ;
92  UIntParameter _queueSize {*this, "EventQueueSize", "The input event queue size (default 2*nthreads)"} ;
93  };
94 
95  }
96 
97 } // end namespace marlinmt
98 
99 #endif
std::shared_ptr< SuperSequence > ProcessorSequence
Definition: PEPScheduler.h:51
IScheduler interface Interface for implementing a scheduling algorithm for event processing.
Definition: IScheduler.h:23
std::chrono::steady_clock Clock
Definition: PEPScheduler.h:54
std::vector< WorkerPool::PushResult > PushResultList
Definition: PEPScheduler.h:52
clock_type::time_point time_point
Definition: Utils.h:31
float duration_rep
Definition: Utils.h:32
std::shared_ptr< EventStore > _event
< The input event
Definition: PEPScheduler.h:26
std::shared_ptr< EventStore > InputType
Definition: PEPScheduler.h:48
WorkerOutput struct Stores the output of a processor sequence call.
Definition: PEPScheduler.h:24
std::map< std::string, std::string > ConditionsMap
Definition: PEPScheduler.h:47
PEPScheduler class Parallel Event Processing Scheduler.
Definition: PEPScheduler.h:45
std::vector< std::shared_ptr< EventStore > > EventList
Definition: PEPScheduler.h:53
std::chrono::steady_clock::time_point TimePoint
Definition: PEPScheduler.h:55