| Classes | Job Modules | Data Objects | Services | Algorithms | Tools | Packages | Directories | Tracs |

In This Package:

job.py

Go to the documentation of this file.
00001 #!/home/dyb/dybsw/NuWa/external/Python/2.5.4/i686-slc4-gcc34-dbg/bin/python2.5
00002 
00003 import os, sys, time, string
00004 import MySQLdb
00005 
00006 # Configure database
00007 #os.environ.setdefault("DBCONF","onsitedb")
00008 from DybPython import DBConf
# Database connection settings, read from the 'onsitedb' DBConf entry
dbcnf = DBConf('onsitedb')
dbhost, database = dbcnf['host'], dbcnf['database']
dbuser, dbpasswd = dbcnf['user'], dbcnf['password']

# Sequence number to start from, as stored in ./cur_seq_num
f_start_seqnum = open('./cur_seq_num')
seqnum_start = int(f_start_seqnum.read())
f_start_seqnum.close()

# Directory holding the raw data files
filepath = '/dyb/rawdata/spade/'

# Time formats: human-readable (log lines) and compact (log file name)
ISOTIMEFORMAT = '%Y-%m-%d %X'
ISOTIMEFORMAT1 = '%Y%m%d%H%M%S'

# Output folders: png files, logs, histograms and history information
pngpath = '../HistLog/'
logpath = pngpath + 'log/'
histopath = pngpath + 'histogram/'
historypath = pngpath + 'testdb/'

# Create any output folder that is missing (parent first)
for _outdir in (pngpath, logpath, histopath, historypath):
    if not os.path.exists(_outdir):
        os.mkdir(_outdir)
del _outdir

# Log file named after the job start time; history file for run-by-run information
logfile = logpath + 'log' + time.strftime(ISOTIMEFORMAT1, time.localtime())
adinfofile = historypath + 'ADInfo.txt'

# Flag file 'hadd_finish': main() waits for it before merging histograms
if not os.path.exists('hadd_finish'):
    os.system('touch hadd_finish')
00052 
00053 # Get maximum sequence number from database
def getmaxseq():
    """Return the largest seqno stored in table DaqRawDataFileInfo.

    Returns:
        The seqno of the newest row, or 0 when the table holds no rows
        (assumes real sequence numbers are positive -- TODO confirm).

    Exits the process with status 1 (after printing the error) when the
    database reports a MySQLdb.Error.
    """
    connect = None
    cursor = None
    try:
        connect = MySQLdb.connect(host=dbhost, user=dbuser, passwd=dbpasswd, db=database)
        cursor = connect.cursor()

        cursor.execute(" SELECT seqno, filename FROM DaqRawDataFileInfo ORDER BY seqno DESC LIMIT 1 ")
        row = cursor.fetchone()

    except MySQLdb.Error:
        # sys.exc_info() keeps this handler valid on both old and new Python syntax
        err = sys.exc_info()[1]
        if len(err.args) >= 2:
            sys.stdout.write("Error %d: %s\n" % (err.args[0], err.args[1]))
        else:
            # Some MySQLdb errors carry only a message
            sys.stdout.write("Error: %s\n" % (err,))
        sys.exit(1)

    finally:
        # Release the cursor and connection on every path, including errors
        if cursor is not None:
            cursor.close()
        if connect is not None:
            connect.close()

    # fetchone() yields None for an empty table; avoid indexing None
    if row is None:
        return 0
    return row[0]
00070     
00071 # Get file name and run number according to sequence number from database
def getfile(seqno):
    """Look up the DaqRawDataFileInfo row for one sequence number.

    Parameters:
        seqno -- sequence number of the record to fetch.

    Returns:
        The row as (seqno, filename, runno, transferState), or None when
        no row has that sequence number.

    Exits the process with status 1 (after printing the error) when the
    database reports a MySQLdb.Error.
    """
    connect = None
    cursor = None
    try:
        connect = MySQLdb.connect(host=dbhost, user=dbuser, passwd=dbpasswd, db=database)
        cursor = connect.cursor()

        # Let the driver quote the value instead of pasting it into the SQL text
        cursor.execute(" SELECT seqno, filename, runno, transferState FROM DaqRawDataFileInfo where seqno = %s", (seqno,))
        row = cursor.fetchone()

    except MySQLdb.Error:
        # sys.exc_info() keeps this handler valid on both old and new Python syntax
        err = sys.exc_info()[1]
        if len(err.args) >= 2:
            sys.stdout.write("Error %d: %s\n" % (err.args[0], err.args[1]))
        else:
            # Some MySQLdb errors carry only a message
            sys.stdout.write("Error: %s\n" % (err,))
        sys.exit(1)

    finally:
        # Release the cursor and connection on every path, including errors
        if cursor is not None:
            cursor.close()
        if connect is not None:
            connect.close()

    return row
00088 
def getfiletime(seqno):
    """Return the timestart value stored for one sequence number.

    Parameters:
        seqno -- sequence number to look up in DaqRawDataFileInfoVld.

    Returns:
        The timestart column of the matching row, or None when the table
        has no row for that sequence number.

    Exits the process with status 1 (after printing the error) when the
    database reports a MySQLdb.Error.
    """
    connect = None
    cursor = None
    try:
        connect = MySQLdb.connect(host=dbhost, user=dbuser, passwd=dbpasswd, db=database)
        cursor = connect.cursor()

        # Let the driver quote the value instead of pasting it into the SQL text
        cursor.execute(" SELECT timestart FROM DaqRawDataFileInfoVld where seqno = %s", (seqno,))
        row = cursor.fetchone()

    except MySQLdb.Error:
        # sys.exc_info() keeps this handler valid on both old and new Python syntax
        err = sys.exc_info()[1]
        if len(err.args) >= 2:
            sys.stdout.write("Error %d: %s\n" % (err.args[0], err.args[1]))
        else:
            # Some MySQLdb errors carry only a message
            sys.stdout.write("Error: %s\n" % (err,))
        sys.exit(1)

    finally:
        # Release the cursor and connection on every path, including errors
        if cursor is not None:
            cursor.close()
        if connect is not None:
            connect.close()

    # A missing validity row must not crash the job after the data was processed
    if row is None:
        return None
    return row[0]
00105 
00106 # Set plots structure for ODM
def listfile(path, runno, f):
    """Recursively list the full-size png plots below path into f.

    For every regular file whose name contains 'png' but not 'small', one
    line '<plot name>\\t<display path>\\n' is written, where
      * <plot name> is the third '_'-separated field of the file name
        with everything from the first '.' removed, and
      * <display path> is the file path with its first '/'-separated
        component replaced by 'HistLog/run<runno>'.

    Parameters:
        path  -- directory to scan (sub-directories are scanned as well).
        runno -- run number used in the display path.
        f     -- writable file-like object receiving the lines.

    Raises:
        IndexError -- if a matching file name has fewer than three
                      '_'-separated fields.
    """
    for entry in os.listdir(path):
        fullpath = path + '/' + entry
        if os.path.isdir(fullpath):
            # Test the real path: the display path built below does not exist on disk
            listfile(fullpath, runno, f)
        elif os.path.isfile(fullpath):
            if entry.find('small') < 0 and entry.find('png') > -1:
                parts = fullpath.split('/')
                parts[0] = 'HistLog/run' + str(runno)
                displaypath = '/'.join(parts)
                # Use the file's own name, so the nesting depth does not matter
                stem = entry.split('.')[0]
                f.write(stem.split('_')[2] + '\t' + displaypath + '\n')
00120 
00121 # Generate png files
def generatepng(f,run,strhist,strfile):
    """Produce the png plots for one run and publish them below pngpath.

    Steps, in order:
      1. run ROOT in batch mode on daya_style.C and dump.C with strhist as
         argument (expected to leave its output in ./stats -- the code
         below writes into that directory right afterwards);
      2. write the plot index stats/available_plots.txt via listfile();
      3. replace <pngpath>run<run> with the fresh ./stats directory;
      4. record the run number in 'current_run_number' and, when strfile
         is not empty, strfile in 'current_file_name';
      5. if the marker file ./temp does not exist, create it and put the
         run number at the top of 'available_run_number'; otherwise drop
         the last line of the ADInfo file;
      6. if ./stat.dat exists, append '<run>#<contents>' to the ADInfo
         file and move stat.dat into the run folder.

    Parameters:
        f       -- open, writable log file object; one line is logged.
        run     -- run number the plots belong to.
        strhist -- name of the ROOT histogram file passed to dump.C.
        strfile -- file information to be displayed; '' to leave
                   'current_file_name' untouched.
    """
    f.write('Generate png files.' + time.strftime(ISOTIMEFORMAT,time.localtime()) + '\n')

    # Backslashes are written out explicitly; the shell command is unchanged
    os.system('unbuffer root -b -l -q daya_style.C dump.C\\(\\"' + strhist + '\\"\\)')

    rootpath = 'stats'
    ff = open('stats/available_plots.txt', 'w')
    try:
        listfile(rootpath,run,ff)
    finally:
        ff.close()

    newpath = pngpath + 'run' + str(run)
    if os.path.exists(newpath): # this is not a new run
        os.system('rm -rf ' + newpath)
    if os.path.exists('stats'):
        os.system('mv stats ' + newpath)

    # Set current run number
    fcrn = open(pngpath + 'current_run_number', 'w')
    try:
        fcrn.write(str(run))
    finally:
        fcrn.close()

    if strfile != '':
        fstrf = open(pngpath + 'current_file_name', 'w')
        try:
            fstrf.write(strfile)
        finally:
            fstrf.close()

    # Set available run number
    if not os.path.exists('temp'):
        farn = open('temp', 'w')
        try:
            farn.write(str(run) + '\n')
        finally:
            farn.close()
        os.system('cat ' + pngpath + 'available_run_number >> temp')
        os.system('cp temp ' + pngpath + 'available_run_number')

    else:  # delete the last line in ADInfo.txt
        os.system("sed -i '$d' " + adinfofile)

    # Set run by run information
    if os.path.exists('stat.dat'):
        fin = open('./stat.dat', 'r')
        try:
            strstat = fin.read()
        finally:
            fin.close()
        # adinfofile already starts with pngpath; use it as the sed call above does
        fadin = open(adinfofile, 'a')
        try:
            fadin.write(str(run) + '#' + strstat)
        finally:
            # Close (and flush) before anything else touches the history file
            fadin.close()
        os.system('mv stat.dat ' + newpath)
00184 
00185 # If stop flag exists, rename root file and exit
def checkexit(f, srcfile='', desfile=''):
    """Close the log file and stop the job if the flag file 'stopjob' exists.

    Parameters:
        f       -- open log file object; it is always closed by this call.
        srcfile -- optional ROOT file to save before exiting.
        desfile -- name srcfile gets inside histopath.

    Without the flag file only f is closed. With it, srcfile (when both
    names are given and srcfile exists) is moved to histopath + desfile,
    the marker file 'temp' is removed, a message is logged and the process
    exits with status 1.
    """
    stop_requested = os.path.exists('stopjob')
    if not stop_requested:
        f.close()
        return

    names_given = srcfile != '' and desfile != ''
    if names_given and os.path.exists(srcfile):
        os.system('mv ' + srcfile + ' ' + histopath + desfile)

    if os.path.exists('temp'):
        os.system('rm temp')

    f.write('Roger stop message, then exit.\n')
    f.close()
    sys.exit(1)
00196 
def _wait_for_hadd(f):
    """Block until the flag file 'hadd_finish' exists, logging to f every 10 seconds."""
    while not os.path.exists('hadd_finish'):
        f.write('wait for histogram merging\n')
        time.sleep(10)


def main():
    """Process raw data files one sequence number at a time, forever.

    Each pass of the loop:
      * saves the current sequence number to ./cur_seq_num;
      * fetches the record for it with getfile(); a missing record is
        skipped when newer records exist, otherwise the job sleeps 10
        seconds and polls again;
      * waits (up to 90 polls of 10 seconds, i.e. 15 minutes, each) for the
        record to be flagged 'TRANSFERRED' and for the file to appear
        under filepath, then skips the record;
      * runs nuwa.py on the file, producing hist_test.root;
      * for a file of the current run, regenerates the plots and merges
        hist_test.root into current_hist_<run>.root through hadd.sh;
        for the first file of a new run, archives the previous run's
        histogram file under histopath and starts a new
        current_hist_<run>.root.

    The log file is reopened on every pass and closed by checkexit(),
    which also ends the process when the flag file 'stopjob' exists.
    """
    max_polls = 90  # 90 polls x 10 s = 15 minutes

    # Open a log file
    f = open(logfile, 'w')
    f.write('########### Job start at %s ###########\n' % time.strftime(ISOTIMEFORMAT,time.localtime()))

    # Number of polls spent waiting for the current record to be transferred
    file_trans_time = 0
    # Number of polls spent waiting for a transferred file to show up on disk
    file_create_time = 0
    waitnum = 0
    currentrun = -9
    seqnum = seqnum_start
    maxseq = getmaxseq()
    f.write('Max sequence number = %s\n' %maxseq)
    f.close()

    while True:

        # Save current sequence number
        fseqnum = open('./cur_seq_num', 'w')
        fseqnum.write(str(seqnum))
        fseqnum.close()

        # Open log file ('a' is the valid append mode; 'aw' is not a legal mode)
        f = open(logfile, 'a')
        if waitnum == 0:
            f.write('--- ' + time.strftime(ISOTIMEFORMAT,time.localtime()) + ' ---\n')
            f.write('Current sequence number = %s\n' %seqnum)

        # Get file name and run number according to sequence number from database
        fileinfo = getfile(seqnum)
        if not fileinfo:
            # Empty record
            if seqnum <= maxseq:
                f.write('Empty record, go to next\n')
                seqnum = seqnum + 1
                waitnum = 0
                checkexit(f)
                continue

            # Reach the latest record, wait 10 seconds then requery database
            # Print a log message every 10 minutes to show I'm still alive
            if waitnum == 0:
                f.write('No more record, wait ...\n')
            waitnum = waitnum + 1
            if waitnum == 60:
                waitnum = 0
            time.sleep(10)
            # Refresh the maximum while idle; otherwise a gap in the sequence
            # numbers right after the last processed record stalls the job forever
            maxseq = getmaxseq()
            checkexit(f, 'current_hist_'+str(currentrun)+'.root', 'hist_'+str(currentrun)+'.root')
            continue

        waitnum = 0

        # Check transfer state
        if fileinfo[3] != 'TRANSFERRED':
            if file_trans_time == max_polls:
                f.write('Raw data still not transferred after 15 minutes, go to next record.\n')
                file_trans_time = 0
                seqnum = seqnum + 1
            else:
                f.write('Raw data not transferred, wait 10 seconds ...\n')
                time.sleep(10)
                file_trans_time = file_trans_time + 1
            checkexit(f, 'current_hist_'+str(currentrun)+'.root', 'hist_'+str(currentrun)+'.root')
            continue
        # Transferred: the next record gets its own full 15 minutes
        file_trans_time = 0

        filename = fileinfo[1]
        runnum = fileinfo[2]
        rawfile = filepath + filename
        f.write(rawfile+'\n')

        # Get file index (seventh '.'-separated field of the file name)
        namelist = filename.split('.')
        fileindex = namelist[6]

        # Check file exists or not
        if not os.path.isfile(rawfile):
            if file_create_time == max_polls:
                f.write('Raw data still not found after 15 minutes, go to next record.\n')
                file_create_time = 0
                seqnum = seqnum + 1
            else:
                f.write('Raw data is set to transferred but can not found, wait 10 seconds ...\n')
                time.sleep(10)
                file_create_time = file_create_time + 1
            checkexit(f)
            continue
        # Found: the next record gets its own full 15 minutes
        file_create_time = 0

        f.write('###########' + time.strftime(ISOTIMEFORMAT,time.localtime()) + '###########\n')
        f.write('Start NuWa job\n')

        # Run PQM
        os.system('echo Start NuWa job: ' + rawfile)
        os.system('nuwa.py --history=off -A none -n -1 -l 4 --daq=on --output-stats="{\'file1\':\'hist_test.root\'}" DaqReading.py ' + rawfile)

        # Check if a new run
        if runnum == currentrun:  # not a new run

            generatepng(f,currentrun, 'hist_test.root', 'file' + fileindex + ' ' + str(getfiletime(seqnum)) + '\n')
            # Rename histogram file for merging
            os.system('mv hist_test.root hist_test1.root')

            # Take the merge flag; hadd.sh puts it back when it is done
            _wait_for_hadd(f)
            os.system('rm hadd_finish')

            # Merge root files for same run
            f.write('Merge root files for same run: current_hist_' + str(runnum) + '.root' + time.strftime(ISOTIMEFORMAT,time.localtime()) + '\n')

            fadd = open('hadd.sh', 'w')
            fadd.write('#!/bin/bash\n')
            fadd.write('unbuffer hadd temp_hist.root current_hist_' + str(runnum) + '.root hist_test1.root 2>&1 > /dev/null\n')
            fadd.write('mv temp_hist.root current_hist_' + str(runnum) + '.root\n')
            fadd.write('rm hist_test1.root\n')
            fadd.write('echo finish merging\n')
            fadd.write('touch hadd_finish\n')
            fadd.write('echo finish touch\n')
            fadd.close()
            os.system('chmod 755 hadd.sh')
            os.system('./hadd.sh ')

        else:
            f.write('This is a new run: %d\n' %runnum)
            _wait_for_hadd(f)

            # Remove the marker so generatepng() registers the new run
            if os.path.exists('temp'):
                os.system('rm temp')

            # Rename root file of previous run
            if os.path.exists('current_hist_' + str(currentrun) + '.root'):
                f.write('Save root file for previous run:' + str(currentrun) + '.root\n')
                generatepng(f,currentrun,'current_hist_'+str(currentrun)+'.root','')
                os.system('mv current_hist_'+str(currentrun)+'.root '+histopath+'hist_'+str(currentrun)+'.root')

            currentrun = runnum
            generatepng(f,currentrun, 'hist_test.root', 'file' + fileindex + ' ' + str(getfiletime(seqnum)) + '\n')
            f.write('Save root file : current_hist_' + str(currentrun) + '.root\n')
            os.system('mv hist_test.root current_hist_' + str(currentrun) + '.root')

        # Next record
        seqnum = seqnum + 1

        if seqnum <= maxseq:
            # Doesn't reach latest record, go to next without waiting
            f.write('Next record\n')
        else:
            # Reach latest record, so refresh maximum sequence number
            maxseq = getmaxseq()

        checkexit(f, 'current_hist_'+str(currentrun)+'.root', 'hist_'+str(currentrun)+'.root')

# Start the job as soon as the module is executed
main()
| Classes | Job Modules | Data Objects | Services | Algorithms | Tools | Packages | Directories | Tracs |

Generated on Mon Apr 11 20:26:19 2011 for DQMRawData by doxygen 1.4.7