KaliVeda  1.13/01
Heavy-Ion Analysis Toolkit
KV_CCIN2P3_GE.cpp
Go to the documentation of this file.
1 //Created by KVClassFactory on Wed Apr 27 15:43:08 CEST 2011
2 //Author: John Frankland
3 
4 #include "KV_CCIN2P3_GE.h"
5 #include "TSystem.h"
6 #include "TEnv.h"
7 #include "KVDataAnalyser.h"
8 #include "KVDataAnalysisTask.h"
9 #include "KVGEBatchJob.h"
10 #include "KVDataRepository.h"
11 #include "KVDataSetAnalyser.h"
12 #include "KVSimDirAnalyser.h"
13 
14 using namespace std;
15 
17 
18 
19 
24  : KVBatchSystem(name), fMultiJobs(kTRUE)
25 {
26  //Default constructor
27  //Sets default job time, memory and disk space as defined in $KVROOT/KVFiles/.kvrootrc
28 
29  fDefJobTime = gEnv->GetValue("GE.BatchSystem.DefaultJobTime", "5:00");
30  fDefJobMem = gEnv->GetValue("GE.BatchSystem.DefaultJobMemory", "2G");
31  fDefJobDisk = gEnv->GetValue("GE.BatchSystem.DefaultJobDisk", "50M");
32  fTimeSet = fDiskSet = fMemSet = kFALSE;
33  //default number of runs per job in multi jobs mode (default=1)
34  SetRunsPerJob(gEnv->GetValue("GE.BatchSystem.RunsPerJob", 1));
35 }
36 
37 
38 
39 
42 
44 {
45  //Clear previously set parameters in order to create a new job submission command
47  fTimeSet = fDiskSet = fMemSet = kFALSE;
48  fMultiJobs = kTRUE;
49 }
50 
51 
52 
53 
56 
57 KV_CCIN2P3_GE::~KV_CCIN2P3_GE()
58 {
59  //Destructor
60 }
61 
62 
63 
67 
69 {
70  //Set CPU time for batch job.
71  // SetJobTime() => use default time
72  KVString tmp(time);
73  if (tmp == "") tmp = fDefJobTime;
74  //time given as either "hh:mm:ss" or "ss" but NOT "mm:ss"!
75  if (tmp.GetNValues(":") == 2) tmp.Prepend("0:");
76  fParList.SetValue("-l ct=", tmp);
77  fTimeSet = kTRUE;
78 }
79 
80 
81 
86 
88 {
89  //Set maximum memory used by job.
90  //Include units in string, i.e. "100M", "1G" etc.
91  //If mem="", use default value
92  KVString tmp(mem);
93  if (tmp == "") tmp = fDefJobMem;
94  fParList.SetValue("-l vmem=", tmp);
95  fMemSet = kTRUE;
96 }
97 
98 
99 
104 
106 {
107  //Set maximum disk space used by job.
108  //Include units in string, i.e. "100M", "1G" etc.
109  //If diks="", use default value
110  KVString tmp(diks);
111  if (tmp == "") tmp = fDefJobDisk;
112  fParList.SetValue("-l fsize=", tmp);
113  fDiskSet = kTRUE;
114 }
115 
116 
117 
120 
122 {
123  //Print list of owner's jobs.
124  KVList* j = GetListOfJobs();
125  j->ls();
126  delete j;
127 }
128 
129 
130 
133 
135 {
136  // Checks the job and asks for any missing parameters
137 
139 
140  if (!fTimeSet) ChooseJobTime();
141 
142  if (!fDiskSet) ChooseJobDisk();
143 
144  if (!fMemSet) ChooseJobMemory();
145 
146  return kTRUE;
147 }
148 
149 
150 
152 
154 {
155  KVString tmp = "";
156  cout << "Enter max CPU time per job (ss/mn:ss/hh:mn:ss) ["
157  << fDefJobTime << "] : ";
158  cout.flush();
159  tmp.ReadToDelim(cin);
160  if (!tmp.Length()) {
161  SetJobTime();
162  return;
163  }
164  else
165  SetJobTime(tmp);
166 }
167 
168 
169 
171 
173 {
174  KVString tmp = "";
175  cout << "Enter max memory per job (xKB/xMB/xGB) ["
176  << fDefJobMem.Data() << "] : ";
177  cout.flush();
178  tmp.ReadToDelim(cin);
179  SetJobMemory(tmp.Data());
180 }
181 
182 
183 
185 
187 {
188  KVString tmp = "";
189  cout << "Enter max scratch disk per job (xKB/xMB/xGB) ["
190  << fDefJobDisk.Data() << "] : ";
191  cout.flush();
192  tmp.ReadToDelim(cin);
193  SetJobDisk(tmp.Data());
194 }
195 
196 
197 
200 
202 {
203 // returns the parameter string corresponding to the job CPU time
204  return fParList.GetStringValue("-l ct=");
205 }
206 
207 
208 
211 
213 {
214 // returns the parameter string corresponding to the job Memory
215  return fParList.GetStringValue("-l vmem=");
216 }
217 
218 
219 
222 
224 {
225 // returns the parameter string corresponding to the job Disk
226  return fParList.GetStringValue("-l fsize=");
227 }
228 
229 
230 
231 
235 
237 {
238  //Store any useful information on batch system in the TEnv
239  //(this method is used by KVDataAnalyser::WriteBatchEnvFile)
241  env->SetValue("BatchSystem.MultiJobs", MultiJobsMode());
242  if (MultiJobsMode()) env->SetValue("BatchSystem.CurrentRunList", fCurrJobRunList.AsString());
243  env->SetValue("BatchSystem.Time", GetJobTime());
244  env->SetValue("BatchSystem.Memory", GetJobMemory());
245  env->SetValue("BatchSystem.Disk", GetJobDisk());
246  // if analysis of simulated data is being used, we copy the files to analyse to the
247  // scratch disk of the batch job (make sure enough disk space is requested)
248  env->SetValue("SimDirAnalyser.CopyFilesToWorkingDirectory", true);
249 }
250 
251 
252 
253 
257 
259 {
260  //Read any useful information on batch system from the TEnv
261  //(this method is used by KVDataAnalyser::ReadBatchEnvFile)
263  SetMultiJobsMode(env->GetValue("BatchSystem.MultiJobs", kFALSE));
264  if (MultiJobsMode()) fCurrJobRunList.SetList(env->GetValue("BatchSystem.CurrentRunList", ""));
265  SetJobTime(env->GetValue("BatchSystem.Time", ""));
266  SetJobMemory(env->GetValue("BatchSystem.Memory", ""));
267  SetJobDisk(env->GetValue("BatchSystem.Disk", ""));
268 }
269 
270 
271 
272 
276 
277 void KV_CCIN2P3_GE::Print(Option_t* option) const
278 {
279  //if option="log", print infos for batch log file
280  //if option="all", print detailed info on batch system
281  if (!strcmp(option, "log")) {
282  KVBatchSystem::Print(option);
283  cout << "* DISK_REQ: " << GetJobDisk() << " *" << endl;
284  cout << "* MEM_REQ: " << GetJobMemory() << " *" << endl;
285  }
286  else
287  KVBatchSystem::Print(option);
288 }
289 
290 
291 
292 
306 
308 {
309  // PRIVATE method called by SubmitTask() at moment of job submission.
310  // Depending on the current environment, the default job submission options
311  // may be changed by this method.
312  //
313  // This method overrides and augments KVBatchSystem::ChangeDefJobOpt (which
314  // changes the options as a function of the type of analysis task).
315  // Here we add the CCIN2P3-specific case where the job is launched from a directory
316  // on the /sps/ semi-permanent storage facility, or if the data being analysed is
317  // stored in a repository on /sps/. In this case we need to add
318  // the option '-l u_sps_indra' to the 'qsub' command (if not already in the
319  // default job options)
320  //
322  KVString taskname = da->GetAnalysisTask()->GetName();
324  Bool_t repIsSPS = rootdir.BeginsWith("/sps/");
325 
326  KVString wrkdir(gSystem->WorkingDirectory());
327  KVString oldoptions(GetDefaultJobOptions());
328 
329  if (!oldoptions.Contains("sps")) {
330  Bool_t NeedToAddSPS = wrkdir.Contains("/sps/");
331  if ((NeedToAddSPS || repIsSPS)) {
332  oldoptions += " -l sps=1";
333  SetDefaultJobOptions(oldoptions.Data());
334  Info("ChangeDefJobOpt",
335  "Your job is being launched from /sps/... zone.\nTherefore the ressource 'sps' has been declared and the number of jobs which can be treated concurrently will be limited.");
336  }
337  }
338 }
339 
340 
341 
342 
349 
351 {
352  // Batch-system dependent sanitization of jobnames
353  // Grid Engine does not allow:
354  // :
355  // Any such character appearing in the current jobname will be replaced
356  // with '_'
357 
358  fCurrJobName.ReplaceAll(":", "_");
359 }
360 
361 
362 
366 
368 {
369  // Create and fill list with KVBatchJob objects describing current jobs
370  // Delete list after use
371 
372  KVList* list_of_jobs = new KVList;
373 
374  // use qstat -r to get list of job ids and jobnames
375  TString reply = gSystem->GetFromPipe("qstat -r");
376 
377  TObjArray* lines = reply.Tokenize("\n");
378  Int_t nlines = lines->GetEntries();
379  for (Int_t line_number = 0; line_number < nlines; line_number++) {
380  TString thisLine = ((TObjString*)(*lines)[line_number])->String();
381  if (thisLine.Contains("Full jobname:")) {
382  // previous line contains job-id and status
383  TString lastLine = ((TObjString*)(*lines)[line_number - 1])->String();
384  TObjArray* bits = lastLine.Tokenize(" ");
385  Int_t jobid = ((TObjString*)(*bits)[0])->String().Atoi();
386  TString status = ((TObjString*)(*bits)[4])->String();
387  // date & time jobs started (running job) or submitted (queued job)
388  TString sdate = ((TObjString*)(*bits)[5])->String();// mm/dd/yyyy
389  TString stime = ((TObjString*)(*bits)[6])->String();// hh:mm:ss
390  Int_t dd, MM, yyyy, hh, mm, ss;
391  sscanf(sdate.Data(), "%d/%d/%d", &MM, &dd, &yyyy);
392  sscanf(stime.Data(), "%d:%d:%d", &hh, &mm, &ss);
393  KVDatime submitted(yyyy, MM, dd, hh, mm, ss);
394  delete bits;
395  bits = thisLine.Tokenize(": ");
396  TString jobname = ((TObjString*)(*bits)[2])->String();
397  delete bits;
398 
399  KVGEBatchJob* job = new KVGEBatchJob();
400  job->SetName(jobname);
401  job->SetJobID(jobid);
402  job->SetStatus(status);
403  job->SetSubmitted(submitted);
404  list_of_jobs->Add(job);
405  }
406  }
407  delete lines;
408 
409  if (!list_of_jobs->GetEntries()) return list_of_jobs;
410 
411  // use qstat -j [jobid] to get cpu and memory used and also the resource requests
412  TIter next_job(list_of_jobs);
413  KVGEBatchJob* job;
414  while ((job = (KVGEBatchJob*)next_job())) {
415 
416  // for running jobs, read in from [jobname].status file
417  // the number of events read/to read, disk used
418  if (!strcmp(job->GetStatus(), "r")) job->UpdateDiskUsedEventsRead();
419 
420  reply = gSystem->GetFromPipe(Form("qstat -j %d", job->GetJobID()));
421  lines = reply.Tokenize("\n");
422  nlines = lines->GetEntries();
423  for (Int_t line_number = 0; line_number < nlines; line_number++) {
424  TString thisLine = ((TObjString*)(*lines)[line_number])->String();
425  if (thisLine.BeginsWith("usage")) {
426  TObjArray* bits = thisLine.Tokenize("=,");
427  TString stime = ((TObjString*)(*bits)[1])->String();// hh:mm:ss or d:hh:mm:ss
428  Int_t dd, hh, mm, ss;
429  TObjArray* tmp = stime.Tokenize(":");
430  dd = 0;
431  if (tmp->GetEntries() == 4) sscanf(stime.Data(), "%d:%2d:%2d:%2d", &dd, &hh, &mm, &ss);
432  else sscanf(stime.Data(), "%2d:%2d:%2d", &hh, &mm, &ss);
433  delete tmp;
434  job->SetCPUusage((dd * 24 + hh) * 3600 + mm * 60 + ss);
435  TString smem = ((TObjString*)(*bits)[7])->String();// xxx.xxxxM
436  job->SetMemUsed(smem);
437  delete bits;
438  }
439  else if (thisLine.BeginsWith("hard resource_list:")) {
440  TObjArray* bits = thisLine.Tokenize(": ");
441  TString res = ((TObjString*)(*bits)[2])->String();//os=sl5,xrootd=1,irods=1,s_vmem=1024M,s_fsize=50M,s_cpu=36000
442  res.ReplaceAll("s_vmem", "vmem");
443  res.ReplaceAll("s_fsize", "fsize");
444  res.ReplaceAll("s_cpu", "ct");
445  job->SetResources(res);
446  TObjArray* bbits = res.Tokenize(",");
447  TIter next_res(bbits);
448  TObjString* ss;
449  while ((ss = (TObjString*)next_res())) {
450  TString g = ss->String();
451  if (g.BeginsWith("ct=")) {
452  g.Remove(0, 3);
453  job->SetCPUmax(g.Atoi());
454  }
455  else if (g.BeginsWith("vmem=")) {
456  g.Remove(0, 5);
457  job->SetMemMax(g);
458  }
459  else if (g.BeginsWith("fsize=")) {
460  g.Remove(0, 6);
461  job->SetDiskMax(g);
462  }
463  }
464  delete bits;
465  delete bbits;
466  }
467  }
468  delete lines;
469  //}
470  }
471 
472  return list_of_jobs;
473 }
474 
475 
476 
479 
481 {
482  // add option to send mail when job starts
483  fParList.SetValue("-m b", "");
484 }
485 
486 
487 
490 
492 {
493  // add option to send mail when job ends
494  fParList.SetValue("-m e", "");
495 }
496 
497 
498 
501 
502 void KV_CCIN2P3_GE::SetSendMailAddress(const char* email)
503 {
504  // set email address for notifications
505  fParList.SetValue("-M ", email);
506 }
507 
508 
509 
514 
516 {
517  //Processes the job requests for the batch system.
518  //In normal mode, this submits one job for the data analyser fAnalyser
519  //In multijobs mode, this submits one job for each run in the runlist associated to fAnalyser
520 
521  if (!CheckJobParameters()) return;
522 
523  if (MultiJobsMode()) {
524  if (fAnalyser->InheritsFrom("KVDataSetAnalyser")) {
525  //submit jobs for every GetRunsPerJob() runs in runlist
526  KVDataSetAnalyser* ana = dynamic_cast<KVDataSetAnalyser*>(fAnalyser);
527  KVNumberList runs = ana->GetRunList();
528  runs.Begin();
529  Int_t remaining_runs = runs.GetNValues();
530  fCurrJobRunList.Clear();
531  while (remaining_runs && !runs.End()) {
532  Int_t run = runs.Next();
533  remaining_runs--;
534  fCurrJobRunList.Add(run);
535  if ((fCurrJobRunList.GetNValues() == GetRunsPerJob()) || runs.End()) {
536  // submit job for GetRunsPerJob() runs (or less if we have reached end of runlist 'runs')
537  ana->SetRuns(fCurrJobRunList, kFALSE);
538  ana->SetFullRunList(runs);
539  SubmitJob();
540  fCurrJobRunList.Clear();
541  }
542  }
543  ana->SetRuns(runs, kFALSE);
544  }
545  else if (fAnalyser->InheritsFrom("KVSimDirAnalyser")) {
546  // here we understand "run" to mean "file"
547  KVSimDirAnalyser* ana = dynamic_cast<KVSimDirAnalyser*>(fAnalyser);
548  TList* file_list = ana->GetFileList();
549  Int_t remaining_runs = ana->GetNumberOfFilesToAnalyse();
550  fCurrJobRunList.Clear();
551  TList cur_file_list;
552  TObject* of;
553  TIter it(file_list);
554  Int_t file_no = 1;
555  while ((of = it())) {
556  cur_file_list.Add(of);
557  fCurrJobRunList.Add(file_no);
558  remaining_runs--;
559  file_no++;
560  if ((fCurrJobRunList.GetNValues() == GetRunsPerJob()) || (remaining_runs == 0)) {
561  // submit job for GetRunsPerJob() files (or less if we have reached end of list)
562  ana->SetFileList(&cur_file_list);
563  SubmitJob();
564  fCurrJobRunList.Clear();
565  cur_file_list.Clear();
566  }
567  }
568  ana->SetFileList(file_list);
569  }
570  }
571  else {
572  SubmitJob();
573  }
574 
575 }
576 
577 
578 
592 
594 {
595  // Fill the list with all relevant parameters for batch system,
596  // set to their default values.
597  //
598  // Parameters defined here are:
599  // JobTime [string]
600  // JobMemory [string]
601  // JobDisk [string]
602  // MultiJobsMode [bool]
603  // RunsPerJob [int]
604  // EMailOnStart [bool]
605  // EMailOnEnd [bool]
606  // EMailAddress [string]
607 
609  nl.SetValue("JobTime", fDefJobTime);
610  nl.SetValue("JobMemory", fDefJobMem);
611  nl.SetValue("JobDisk", fDefJobDisk);
612  nl.SetValue("MultiJobsMode", MultiJobsMode());
613  nl.SetValue("RunsPerJob", fRunsPerJob);
614  nl.SetValue("EMailOnStart", kFALSE);
615  nl.SetValue("EMailOnEnd", kFALSE);
616  nl.SetValue("EMailAddress", "");
617 }
618 
619 
620 
623 
625 {
626  // Use the parameters in the list to set all relevant parameters for batch system.
627 
629  SetJobTime(nl.GetStringValue("JobTime"));
630  SetJobMemory(nl.GetStringValue("JobMemory"));
631  SetJobDisk(nl.GetStringValue("JobDisk"));
632  SetMultiJobsMode(nl.GetBoolValue("MultiJobsMode"));
633  SetRunsPerJob(nl.GetIntValue("RunsPerJob"));
634  if (nl.GetTStringValue("EMailAddress") != "") {
635  if (nl.GetBoolValue("EMailOnStart")) SetSendMailOnJobStart();
636  if (nl.GetBoolValue("EMailOnEnd")) SetSendMailOnJobEnd();
637  SetSendMailAddress(nl.GetStringValue("EMailAddress"));
638  }
639 }
640 
641 
int Int_t
ClassImp(KVPartitionList) void KVPartitionList
Initialisation.
char Char_t
const Bool_t kFALSE
bool Bool_t
const Bool_t kTRUE
const char Option_t
R__EXTERN TEnv * gEnv
const char rootdir[]
char * Form(const char *fmt,...)
R__EXTERN TSystem * gSystem
void SetCPUmax(Int_t c)
Definition: KVBatchJob.h:71
void SetSubmitted(KVDatime &m)
Definition: KVBatchJob.h:43
void SetJobID(Int_t n)
Definition: KVBatchJob.h:51
virtual void UpdateDiskUsedEventsRead()
Definition: KVBatchJob.cpp:53
const Char_t * GetStatus() const
Definition: KVBatchJob.h:31
void SetMemUsed(const Char_t *m)
Definition: KVBatchJob.h:63
Int_t GetJobID() const
Definition: KVBatchJob.h:47
void SetMemMax(const Char_t *m)
Definition: KVBatchJob.h:79
void SetCPUusage(Int_t m)
Definition: KVBatchJob.h:55
void SetDiskMax(const Char_t *m)
Definition: KVBatchJob.h:87
void SetStatus(const Char_t *s)
Definition: KVBatchJob.h:35
Base class for interface to a batch job management system.
Definition: KVBatchSystem.h:77
virtual void WriteBatchEnvFile(TEnv *)
virtual void Print(Option_t *="") const
virtual void ChangeDefJobOpt(KVDataAnalyser *da)
virtual void ReadBatchEnvFile(TEnv *)
virtual void SetBatchSystemParameters(const KVNameValueList &)
Use the parameters in the list to set all relevant parameters for batch system.
virtual void GetBatchSystemParameterList(KVNameValueList &)
virtual void Clear(Option_t *opt="")
virtual Bool_t CheckJobParameters()
Checks the job and asks for the job name if needed.
Manager class which sets up and runs data analysis tasks.
virtual KVString GetRootDirectoryOfDataToAnalyse() const
KVDataAnalysisTask * GetAnalysisTask() const
Pilots user analysis of experimental data.
void SetFullRunList(const KVNumberList &nl)
void SetRuns(const KVNumberList &nl, Bool_t check=kTRUE)
const KVNumberList & GetRunList() const
Extension of TDatime to handle various useful date formats.
Definition: KVDatime.h:32
Job handled by Grid Engine batch system at CC-IN2P3.
Definition: KVGEBatchJob.h:15
void SetResources(TString r)
Definition: KVGEBatchJob.h:24
Extended TList class which owns its objects by default.
Definition: KVList.h:27
Handles lists of named parameters with different types, a list of KVNamedParameter objects.
Int_t GetIntValue(const Char_t *name) const
void SetValue(const Char_t *name, value_type value)
Bool_t GetBoolValue(const Char_t *name) const
const Char_t * GetStringValue(const Char_t *name) const
TString GetTStringValue(const Char_t *name) const
Strings used to represent a set of ranges of values.
Definition: KVNumberList.h:83
Bool_t End(void) const
Definition: KVNumberList.h:197
Int_t GetNValues() const
void Begin(void) const
Int_t Next(void) const
virtual void Add(TObject *obj)
Class piloting analyses of simulated data.
Int_t GetNumberOfFilesToAnalyse() const
TList * GetFileList() const
void SetFileList(TList *l)
Extension of ROOT TString class which allows backwards compatibility with ROOT v3....
Definition: KVString.h:72
Int_t GetNValues(TString delim) const
Definition: KVString.cpp:886
Interface to CCIN2P3 Grid Engine batch job management system.
Definition: KV_CCIN2P3_GE.h:14
virtual KVList * GetListOfJobs()
virtual void GetBatchSystemParameterList(KVNameValueList &)
void SetSendMailOnJobEnd()
add option to send mail when job ends
const Char_t * GetJobTime(void) const
returns the parameter string corresponding to the job CPU time
void PrintJobs(Option_t *opt="")
Print list of owner's jobs.
void SetJobMemory(const Char_t *h="")
void ChooseJobDisk(void)
void SetJobDisk(const Char_t *h="")
virtual void WriteBatchEnvFile(TEnv *)
virtual void SanitizeJobName() const
virtual void Print(Option_t *="") const
virtual Bool_t CheckJobParameters()
Checks the job and asks for any missing parameters.
void SetSendMailAddress(const char *)
set email address for notifications
const Char_t * GetJobDisk(void) const
returns the parameter string corresponding to the job Disk
virtual void Clear(Option_t *opt="")
Clear previously set parameters in order to create a new job submission command.
void ChooseJobMemory(void)
void ChooseJobTime(void)
void SetJobTime(const Char_t *h="")
const Char_t * GetJobMemory(void) const
returns the parameter string corresponding to the job Memory
virtual void ChangeDefJobOpt(KVDataAnalyser *)
void SetSendMailOnJobStart()
add option to send mail when job starts
virtual void SetBatchSystemParameters(const KVNameValueList &)
Use the parameters in the list to set all relevant parameters for batch system.
virtual void ReadBatchEnvFile(TEnv *)
virtual void ls(Option_t *option="") const
virtual Int_t GetEntries() const
virtual const char * GetValue(const char *name, const char *dflt) const
virtual void SetValue(const char *name, const char *value, EEnvLevel level=kEnvChange, const char *type=nullptr)
virtual void Add(TObject *obj)
virtual void Clear(Option_t *option="")
virtual const char * GetName() const
virtual void SetName(const char *name)
Int_t GetEntries() const
TString & String()
Ssiz_t Length() const
std::istream & ReadToDelim(std::istream &str, char delim='\n')
TObjArray * Tokenize(const TString &delim) const
Bool_t BeginsWith(const char *s, ECaseCompare cmp=kExact) const
const char * Data() const
TString & Prepend(char c, Ssiz_t rep=1)
Bool_t Contains(const char *pat, ECaseCompare cmp=kExact) const
TString & ReplaceAll(const char *s1, const char *s2)
virtual TString GetFromPipe(const char *command)
virtual const char * WorkingDirectory()
const long double mm
Definition: KVUnits.h:69
const long double g
masses
Definition: KVUnits.h:72
void Info(const char *location, const char *va_(fmt),...)
UInt_t GetListOfJobs(TFile *file, TList &jobdirs)
const char * String