Functions | |
def | getmaxseq |
def | getfile |
def | getfiletime |
def | listfile |
def | generatepng |
def | checkexit |
def | main |
Variables | |
tuple | dbcnf = DBConf('onsitedb') |
list | dbhost = dbcnf['host'] |
list | database = dbcnf['database'] |
list | dbuser = dbcnf['user'] |
list | dbpasswd = dbcnf['password'] |
tuple | f_start_seqnum = open('./cur_seq_num') |
tuple | seqnum_start = int(f_start_seqnum.read()) |
string | filepath = '/dyb/rawdata/spade/' |
string | ISOTIMEFORMAT = '%Y-%m-%d %X' |
string | ISOTIMEFORMAT1 = '%Y%m%d%H%M%S' |
string | pngpath = '../HistLog/' |
string | logpath = 'log/' |
string | logfile = 'log' |
string | histopath = 'histogram/' |
string | historypath = 'testdb/' |
string | adinfofile = 'ADInfo.txt' |
def job::getmaxseq | ( | ) |
Definition at line 54 of file job.py.
00054 : 00055 try: 00056 connect = MySQLdb.connect(host=dbhost, user=dbuser, passwd=dbpasswd, db=database) 00057 cursor = connect.cursor() 00058 00059 cursor.execute(" SELECT seqno, filename FROM DaqRawDataFileInfo ORDER BY seqno DESC LIMIT 1 ") 00060 row = cursor.fetchone() 00061 00062 except MySQLdb.Error, e: 00063 print "Error %d: %s" % (e.args[0], e.args[1]) 00064 sys.exit (1) 00065 00066 cursor.close() 00067 connect.close() 00068 00069 return row[0] 00070 00071 # Get file name and run number according to sequence number from database def getfile(seqno):
def job::getfile | ( | seqno | ) |
Definition at line 72 of file job.py.
00072 : 00073 try: 00074 connect = MySQLdb.connect(host=dbhost, user=dbuser, passwd=dbpasswd, db=database) 00075 cursor = connect.cursor() 00076 00077 cursor.execute(" SELECT seqno, filename, runno, transferState FROM DaqRawDataFileInfo where seqno = " + str(seqno)) 00078 row = cursor.fetchone() 00079 00080 except MySQLdb.Error, e: 00081 print "Error %d: %s" % (e.args[0], e.args[1]) 00082 sys.exit (1) 00083 00084 cursor.close() 00085 connect.close() 00086 00087 return row 00088 def getfiletime(seqno):
def job::getfiletime | ( | seqno | ) |
Definition at line 89 of file job.py.
00089 : 00090 try: 00091 connect = MySQLdb.connect(host=dbhost, user=dbuser, passwd=dbpasswd, db=database) 00092 cursor = connect.cursor() 00093 00094 cursor.execute(" SELECT timestart FROM DaqRawDataFileInfoVld where seqno = " + str(seqno)) 00095 row = cursor.fetchone() 00096 00097 except MySQLdb.Error, e: 00098 print "Error %d: %s" % (e.args[0], e.args[1]) 00099 sys.exit (1) 00100 00101 cursor.close() 00102 connect.close() 00103 00104 return row[0] 00105 00106 # Set plots structure for ODM def listfile(path, runno, f):
def job::listfile | ( | path, | ||
runno, | ||||
f | ||||
) |
Definition at line 107 of file job.py.
00107 : 00108 for i in os.listdir(path): 00109 newpath = path + '/' + i 00110 if os.path.isfile(newpath): 00111 if i.find('small') < 0 and i.find('png') > -1: 00112 spath = newpath.split('/') 00113 spath[0] = 'HistLog/run' + str(runno) 00114 newpath = '/'.join(spath) 00115 plot0 = spath[4].split('.') 00116 plot1 = plot0[0].split('_') 00117 f.write(plot1[2] + '\t' + newpath + '\n') 00118 if os.path.isdir(newpath): 00119 listfile(newpath, runno, f) 00120 00121 # Generate png files def generatepng(f,run,strhist,strfile): #strfile means some file information to be displayed
def job::generatepng | ( | f, | ||
run, | ||||
strhist, | ||||
strfile | ||||
) |
Definition at line 122 of file job.py.
00122 : #strfile means some file information to be displayed 00123 f.write('Generate png files.' + time.strftime(ISOTIMEFORMAT,time.localtime()) + '\n') 00124 00125 os.system('unbuffer root -b -l -q daya_style.C dump.C\(\\"' + strhist + '\\"\)') 00126 # os.system('unbuffer root -b -l -q HistAna.C\(\\"' + strhist + '\\"\)') 00127 00128 rootpath = 'stats' 00129 ff = open('stats/available_plots.txt', 'w') 00130 listfile(rootpath,run,ff) 00131 ff.close() 00132 00133 newpath = pngpath + 'run' + str(run) 00134 if os.path.exists(newpath): #this is not a new run 00135 os.system('rm -rf ' + newpath) 00136 if os.path.exists('stats'): 00137 os.system('mv stats ' + newpath) 00138 # if not os.path.exists(newpath+'/Raw/'): 00139 # os.mkdir(newpath+'/Raw/') 00140 # if not os.path.exists(newpath+'/Calib/'): 00141 # os.mkdir(newpath+'/Calib/') 00142 # os.system('mv Raw/*.* ' + newpath + '/Raw/') 00143 # os.system('rm Raw -rf') 00144 # if os.path.exists('Calib'): 00145 # os.system('mv Calib/*.* ' + newpath + '/Calib/') 00146 # os.system('rm Calib -rf') 00147 # else : #this is a new run 00148 # os.system('mv stats ' + newpath) 00149 # os.mkdir(newpath) 00150 # os.system('mv Raw ' + newpath) 00151 # if os.path.exists('Calib'): 00152 # os.system('mv Calib ' + newpath) 00153 00154 #temp = '\"current_hist_' + str(run) + '.root\"' 00155 00156 #Set current run number 00157 fcrn = open(pngpath + 'current_run_number', 'w') 00158 fcrn.write(str(run)) 00159 fcrn.close() 00160 00161 if strfile != '': 00162 fstrf = open(pngpath + 'current_file_name', 'w') 00163 fstrf.write(strfile) 00164 fstrf.close() 00165 00166 #Set aviabale run number 00167 if not os.path.exists('temp'): 00168 farn = open('temp', 'w') 00169 farn.write(str(run) + '\n') 00170 farn.close() 00171 os.system('cat ' + pngpath + 'available_run_number >> temp') 00172 os.system('cp temp ' + pngpath + 'available_run_number') 00173 00174 else : #delete the last line in ADInfo.txt 00175 os.system('sed -i \'$d\' ' + adinfofile) 00176 00177 #Set run by run information 00178 if os.path.exists('stat.dat'): 00179 fin = open('./stat.dat', 'r') 00180 strstat = fin.read() 00181 fadin = open(pngpath + adinfofile, 'a') 00182 fadin.write(str(run) + '#' + strstat) 00183 os.system('mv stat.dat ' + newpath) 00184 00185 # If stop flag exists, rename root file and exit
def job::checkexit | ( | f, | ||
srcfile = '' , |
||||
desfile = '' | ||||
) |
Definition at line 186 of file job.py.
00186 : 00187 if os.path.exists('stopjob'): 00188 if srcfile != '' and desfile != '' and os.path.exists(srcfile): 00189 os.system('mv ' + srcfile + ' ' + histopath + desfile) 00190 if os.path.exists('temp'): 00191 os.system('rm temp') 00192 f.write('Roger stop message, then exit.\n') 00193 f.close() 00194 sys.exit(1) 00195 f.close() 00196 def main():
def job::main | ( | ) |
Definition at line 197 of file job.py.
00197 : 00198 00199 # Open a log file 00200 f = open(logfile, 'w') 00201 f.write('########### Job start at %s ###########\n' % time.strftime(ISOTIMEFORMAT,time.localtime())) 00202 00203 # Record file transfer time, if a file is not transferred for 15 minutes, go to next record 00204 file_trans_time = 0 00205 # If a file is tagged as transferred but doesn't created for 15 minutes, go to next record 00206 file_create_time = 0 00207 waitnum = 0 00208 currentrun = -9 00209 seqnum = seqnum_start 00210 maxseq = getmaxseq() 00211 f.write('Max sequence number = %s\n' %maxseq) 00212 f.close() 00213 00214 while 1: 00215 00216 # Save current sequence number 00217 fseqnum = open('./cur_seq_num', 'w') 00218 fseqnum.write(str(seqnum)) 00219 fseqnum.close() 00220 00221 # Open log file 00222 f = open(logfile, 'aw') 00223 if waitnum == 0: 00224 f.write('--- ' + time.strftime(ISOTIMEFORMAT,time.localtime()) + ' ---\n') 00225 f.write('Current sequence number = %s\n' %seqnum) 00226 00227 # Get file name and run number according to sequence number from database 00228 fileinfo = getfile(seqnum) 00229 if not fileinfo: 00230 # Empty record 00231 if seqnum <= maxseq: 00232 f.write('Empty record, go to next\n') 00233 seqnum = seqnum + 1 00234 checkexit(f) 00235 continue 00236 00237 # Reach the latest record, wait 10 seconds then requery database 00238 # Print a log message every 10 minutes to show I'm still alive 00239 if waitnum == 0: 00240 f.write('No more record, wait ...\n') 00241 waitnum = waitnum + 1 00242 if waitnum == 60: 00243 waitnum = 0 00244 time.sleep(10) 00245 checkexit(f, 'current_hist_'+str(currentrun)+'.root', 'hist_'+str(currentrun)+'.root') 00246 continue 00247 00248 waitnum = 0 00249 00250 # Check transfer state 00251 if fileinfo[3] != 'TRANSFERRED': 00252 if file_trans_time == 90: 00253 f.write('Raw data still not transferred after 15 minutes, go to next record.\n') 00254 file_trans_time = 0 00255 seqnum = seqnum + 1 00256 else: 00257 f.write('Raw data not transferred, wait 10 seconds ...\n') 00258 time.sleep(10) 00259 file_trans_time = file_trans_time + 1 00260 checkexit(f, 'current_hist_'+str(currentrun)+'.root', 'hist_'+str(currentrun)+'.root') 00261 continue 00262 00263 filename = fileinfo[1] 00264 runnum = fileinfo[2] 00265 file = filepath + filename 00266 f.write(file+'\n') 00267 00268 # Get file index 00269 namelist = filename.split('.') 00270 fileindex = namelist[6] 00271 00272 # Check file exists or not 00273 if not os.path.isfile(file): 00274 if file_create_time == 90: 00275 f.write('Raw data still not found after 15 minutes, go to next record.\n') 00276 file_create_time = 0 00277 seqnum = seqnum + 1 00278 else: 00279 f.write('Raw data is set to transferred but can not found, wait 10 seconds ...\n') 00280 time.sleep(10) 00281 file_create_time = file_create_time + 1 00282 checkexit(f) 00283 continue 00284 00285 f.write('###########' + time.strftime(ISOTIMEFORMAT,time.localtime()) + '###########\n') 00286 f.write('Start NuWa job\n') 00287 00288 # Run PQM 00289 os.system('echo Start NuWa job: ' + file) 00290 # os.system('nuwa.py --history=off -A none -l 4 -n -1 RawReading.py ' + file) 00291 os.system('nuwa.py --history=off -A none -n -1 -l 4 --daq=on --output-stats="{\'file1\':\'hist_test.root\'}" DaqReading.py ' + file) 00292 00293 # Check if a new run 00294 if runnum == currentrun: #not a new run 00295 00296 generatepng(f,currentrun, 'hist_test.root', 'file' + fileindex + ' ' + str(getfiletime(seqnum)) + '\n') 00297 # Rename histogram file for merging 00298 os.system('mv hist_test.root hist_test1.root') 00299 00300 while 1: 00301 if not os.path.exists('hadd_finish'): 00302 f.write('wait for histogram merging\n') 00303 time.sleep(10) 00304 else: 00305 os.system('rm hadd_finish') 00306 break 00307 00308 # Merge root files for same run 00309 f.write('Merge root files for same run: current_hist_' + str(runnum) + '.root' + time.strftime(ISOTIMEFORMAT,time.localtime()) + '\n') 00310 00311 fadd = open('hadd.sh', 'w') 00312 fadd.write('#!/bin/bash\n') 00313 #fadd.write('source ~/.nuwa.bash\n') 00314 fadd.write('unbuffer hadd temp_hist.root current_hist_' + str(runnum) + '.root hist_test1.root 2>&1 > /dev/null\n') 00315 fadd.write('mv temp_hist.root current_hist_' + str(runnum) + '.root\n') 00316 fadd.write('rm hist_test1.root\n') 00317 fadd.write('echo finish merging\n') 00318 fadd.write('touch hadd_finish\n') 00319 fadd.write('echo finish touch\n') 00320 fadd.close() 00321 os.system('chmod 755 hadd.sh') 00322 os.system('./hadd.sh ') 00323 # os.system('touch hadd_finish') 00324 00325 # os.system('hadd temp_hist.root current_hist_' + str(runnum) + '.root hist_test.root') 00326 # os.system('mv temp_hist.root current_hist_' + str(runnum) + '.root') 00327 # os.system('rm hist_test.root') 00328 00329 else: 00330 f.write('This is a new run: %d\n' %runnum) 00331 while 1: 00332 if not os.path.exists('hadd_finish'): 00333 f.write('wait for histogram merging\n') 00334 time.sleep(10) 00335 else: 00336 break 00337 00338 # if not os.path.exists('hadd_finish'): 00339 # os.system('touch hadd_finish') 00340 if os.path.exists('temp'): 00341 os.system('rm temp') 00342 00343 # Rename root file of previous run 00344 if os.path.exists('current_hist_' + str(currentrun) + '.root'): 00345 f.write('Save root file for previous run:' + str(currentrun) + '.root\n') 00346 # os.system('mv current_hist_'+str(currentrun)+'.root '+'hist_test.root') 00347 generatepng(f,currentrun,'current_hist_'+str(currentrun)+'.root','') 00348 os.system('mv current_hist_'+str(currentrun)+'.root '+histopath+'hist_'+str(currentrun)+'.root') 00349 00350 currentrun = runnum 00351 generatepng(f,currentrun, 'hist_test.root', 'file' + fileindex + ' ' + str(getfiletime(seqnum)) + '\n') 00352 f.write('Save root file : current_hist_' + str(currentrun) + '.root\n') 00353 os.system('mv hist_test.root current_hist_' + str(currentrun) + '.root') 00354 00355 # Next record 00356 seqnum = seqnum + 1 00357 00358 # Doesn't reach latest record, go to next without waiting 00359 if seqnum <= maxseq: 00360 f.write('Next record\n') 00361 checkexit(f, 'current_hist_'+str(currentrun)+'.root', 'hist_'+str(currentrun)+'.root') 00362 00363 continue 00364 00365 # Reach latest record, so refresh maximum sequence number 00366 else: 00367 maxseq = getmaxseq() 00368 00369 checkexit(f, 'current_hist_'+str(currentrun)+'.root', 'hist_'+str(currentrun)+'.root') 00370 00371 main() main()
tuple job::dbcnf = DBConf('onsitedb') [static] |
list job::dbhost = dbcnf['host'] [static] |
list job::database = dbcnf['database'] [static] |
list job::dbuser = dbcnf['user'] [static] |
list job::dbpasswd = dbcnf['password'] [static] |
tuple job::f_start_seqnum = open('./cur_seq_num') [static] |
tuple job::seqnum_start = int(f_start_seqnum.read()) [static] |
string job::filepath = '/dyb/rawdata/spade/' [static] |
string job::ISOTIMEFORMAT = '%Y-%m-%d %X' [static] |
string job::ISOTIMEFORMAT1 = '%Y%m%d%H%M%S' [static] |
string job::pngpath = '../HistLog/' [static] |
string job::logpath = 'log/' [static] |
string job::logfile = 'log' [static] |
string job::histopath = 'histogram/' [static] |
string job::historypath = 'testdb/' [static] |
string job::adinfofile = 'ADInfo.txt' [static] |