KaliVeda  1.13/01
Heavy-Ion Analysis Toolkit
KVOnlineReconDataAnalyser.cpp
Go to the documentation of this file.
1 //Created by KVClassFactory on Mon Apr 1 10:16:46 2019
2 //Author: eindra
3 
5 #include "KVMultiDetArray.h"
6 #include "KVDataSet.h"
7 #include "KVReconEventSelector.h"
8 
9 #ifdef WITH_DATAFLOW
10 #include "dataflowports.h"
11 
12 #include <KVZMQMessage.h>
13 #endif
14 
16 
17 
18 
19 
24 {
25  // Default constructor.
    // Sets the status update interval to 2.5 through SetStatusUpdateInterval().
    // The value is expressed in seconds, as stated in the comment of
    // CheckStatusUpdateInterval(); SubmitTask() compares the elapsed time
    // against GetStatusUpdateInterval() to decide when to report.
26  SetStatusUpdateInterval(2.5);
27 }
28 
29 
30 
31 
34 
36 {
37  // Destructor.
    // The body is empty: nothing is released here. Within this file the only
    // visible deletion of fSelector happens at the end of SubmitTask().
38 }
39 
40 
41 
42 
47 
49 {
50  // Run the online analysis.
    //
    // Outline of what the visible code does:
    //   1. activate the dataset and read the user class options,
    //   2. validate fSelector and run the per-run initialisation hooks,
    //   3. open a ZeroMQ SUB socket on tcp://<dataflowhost>:PORT_EVENT_PUB,
    //   4. loop forever receiving messages, handing each reconstructed event
    //      to fSelector->Analysis() and periodically printing the event rate,
    //   5. when Analysis() returns false, leave the loop, call EndRun() and
    //      delete the selector.
51 
52  //make the chosen dataset the active dataset ( = gDataSet; note this also opens database
53  //and positions gDataBase & gExpDB).
54  GetDataSet()->cd();
55 
56  TString option = GetUserClassOptions();
57 
59 
    // Abort if there is no selector or it is not a TSelector.
60  if (!fSelector || !fSelector->InheritsFrom("TSelector")) {
61  std::cout << "The selector \"" << GetUserClass() << "\" is not valid." << std::endl;
62  std::cout << "Process aborted." << std::endl;
64  return;
65  }
66 
69 
    // 'dataflowhost' is read below to build the publisher address; a missing
    // option only produces a warning, processing continues.
72  if (!fSelector->IsOptGiven("dataflowhost")) {
73  Warning("SubmitTask", "Did you forget to give option 'dataflowhost' in InitAnalysis()?");
74  }
76 
    // Per-run initialisation: analyser hook, user selector, analyser hook.
77  preInitRun();
78  fSelector->InitRun();
79  postInitRun();
80 
81  zmq::context_t context(1); // for ZeroMQ communications
    // Address of the event publisher: host from the 'dataflowhost' option,
    // port from the PORT_EVENT_PUB constant.
82  std::string zmq_spy_port = Form("tcp://%s:%d", fSelector->GetOpt("dataflowhost").Data(), PORT_EVENT_PUB);
    // Despite its name, 'pub' is a subscriber (ZMQ_SUB) socket.
83  zmq::socket_t pub(context, ZMQ_SUB);
84  int timeout = 500; //milliseconds
    // Receive timeout: presumably makes recv() below return without a message
    // after 500 ms, so the rate report still runs when no events arrive
    // - TODO confirm against the cppzmq version in use.
85  pub.setsockopt(ZMQ_RCVTIMEO, &timeout, sizeof(int));
86  try {
87  pub.connect(zmq_spy_port.c_str());
88  }
89  catch (zmq::error_t& e) {
    // NOTE(review): a connection failure is only reported; execution falls
    // through, the "Connected to ..." message below is still printed and the
    // receive loop is entered anyway.
90  Error("SubmitTask", "failed to connect socket: %s", e.what());
91  }
92 
93  std::cout << "Connected to EventPublisher " << zmq_spy_port << std::endl;
    // Empty subscription filter (zero-length prefix): per the ZeroMQ
    // documentation this subscribes to every incoming message.
94  pub.setsockopt(ZMQ_SUBSCRIBE, "", 0);
95 
96  zmq::message_t event;
97 
    // nev  : events analysed since the last rate report (reset on each report)
    // nevt : events analysed since the start (never reset)
98  int nev = 0;
99  int nevt = 0;
100  TDatime fTimeStamp;
    // NOTE(review): TDatime::Convert() returns UInt_t; presumably whole
    // seconds, in which case a fractional interval such as 2.5 only triggers
    // once 3 s have elapsed - TODO confirm.
101  Double_t fStartTime = fTimeStamp.Convert();
102  fUpdate = false;
103 
    // Endless loop: the only exit visible here is the 'break' taken when
    // fSelector->Analysis() returns false.
104  while (1) {
105 
106  if (pub.recv(&event)) {
107 
108  KVZMQMessage mess_event(event);
    // NOTE(review): in the lines visible here 'event' is the zmq::message_t
    // declared above; the line preceding this 'if' is not shown in this
    // listing, so the real target of the assignment must be checked against
    // the original file.
110  if ((event = mess_event.GetObject<KVReconstructedEvent>())) {
111  ++nev;
112  ++nevt;
    // Analyser hooks bracket the user analysis; when Analysis() returns
    // false the loop is left immediately and postAnalysis() is skipped.
114  preAnalysis();
115  if (!fSelector->Analysis()) break;
116  postAnalysis();
117  }
118  }
    // Rate report: executed on every pass of the loop, whether or not a
    // message was received.
119  fTimeStamp.Set();
120  Double_t time = fTimeStamp.Convert();
121  if ((time - fStartTime) >= GetStatusUpdateInterval()) {
122  std::cout << "~" << (int)(nev / (time - fStartTime)) << " events/s. tot = " << nevt << std::endl;
123  fStartTime = time;
124  nev = 0;
    // Raise the flag consumed (and cleared) by CheckStatusUpdateInterval().
125  fUpdate = true;
126  }
127 
128  }
129  fSelector->EndRun();
130 
    // NOTE(review): fSelector is deleted here; no reset of the pointer is
    // visible in this listing - confirm it is not reused afterwards.
132  delete fSelector;
133 }
134 
135 
136 
137 
139 
141 {
143 }
144 
145 
146 
149 
151 {
152  // Set minimum (trigger) multiplicity for array.
    // Called from SubmitTask() for every received event, immediately before
    // fSelector->Analysis().
    // NOTE(review): the statement doing the work is not visible in this
    // listing - confirm against the original file.
153 
155 }
156 
157 
158 
166 
168 {
169  // This method returns true every fStatusUpdateInterval seconds, according to the check performed in SubmitTask.
170  // You can change this value in your analysis class by calling
171  //
172  //     gDataAnalyser->SetStatusUpdateInterval(...);
173  //
174  // with the required value in seconds before the processing loop starts.
    //
    // Implementation: SubmitTask() sets fUpdate to true each time the interval
    // has elapsed. The flag is one-shot: the first call after that returns
    // true and clears it, so further calls return false until SubmitTask()
    // raises it again.
175 
176  if (fUpdate) {
177  fUpdate = false;
178  return true;
179  }
180  return false;
181 }
182 
183 
184 
int Int_t
KVMultiDetArray * gMultiDetArray
ClassImp(KVPartitionList) void KVPartitionList
Initialisation.
#define SafeDelete(p)
#define e(i)
bool Bool_t
double Double_t
char * Form(const char *fmt,...)
Double_t GetStatusUpdateInterval() const
virtual void postAnalysis()
const KVString & GetUserClassOptions() const
const Char_t * GetUserClass()
virtual void postInitRun()
virtual void postInitAnalysis()
TObject * GetInstanceOfUserClass(const KVString &alternative_base_class="")
const KVDataSet * GetDataSet() const
void cd() const
Definition: KVDataSet.cpp:736
virtual void InitAnalysis()
virtual void EndRun()
virtual Bool_t Analysis()
Bool_t IsOptGiven(const Char_t *option)
TString GetOpt(const Char_t *option) const
virtual void EndAnalysis()
virtual void ParseOptions()
void SetEvent(KVEvent *e)
virtual void InitRun()
virtual void SetMinimumOKMultiplicity(KVEvent *) const
static KVMultiDetArray * MakeMultiDetector(const Char_t *dataset_name, Int_t run=-1, TString classname="KVMultiDetArray")
Online analysis of reconstructed data.
void preAnalysis()
Set minimum (trigger) multiplicity for array.
virtual ~KVOnlineReconDataAnalyser()
Destructor.
Bool_t CheckStatusUpdateInterval(Int_t) const
Manages user analysis of reconstructed experimental data.
KVReconEventSelector * fSelector
the data analysis class
Base class for user analysis of reconstructed data.
KVReconstructedEvent * GetEvent() const
Event containing KVReconstructedNucleus nuclei reconstructed from hits in detectors.
Allow to send/receive ROOT/KV objects between ZeroMQ sockets.
Definition: KVZMQMessage.h:45
TObject * GetObject()
Definition: KVZMQMessage.h:81
void Set()
UInt_t Convert(Bool_t toGMT=kFALSE) const
virtual const char * GetName() const
virtual void Warning(const char *method, const char *msgfmt,...) const
virtual Bool_t InheritsFrom(const char *classname) const
virtual void Error(const char *method, const char *msgfmt,...) const
virtual void SetOption(const char *option)
const char * Data() const