Public Member Functions | |
def | __init__ |
def | initializeOptions |
def | parseOptions |
def | run |
def | runOneJob |
def | runBatch |
def | runCommand |
def | sequencesInRun |
def | processSequence |
def | addSequenceStats |
def | generateFigures |
def | updateRunIndex |
def | updateFullIndex |
def | clearSequence |
def | clearSequenceStats |
def | clearSummary |
def | printState |
def | resolveRunSequenceFile |
def | makeJobCommand |
def | ensureDirectories |
def | setLock |
def | isLocked |
def | removeLock |
def | lockStats |
def | statsLocked |
def | unlockStats |
def | lockRun |
def | runLocked |
def | unlockRun |
def | lockRunIndex |
def | runIndexLocked |
def | unlockRunIndex |
def | getState |
def | setState |
def | setStateInFile |
def | getStateFromFile |
def | lastLine |
Class which manages NuWa jobs
Definition at line 14 of file Manager.py.
def Manager::Manager::__init__ | ( | self | ) |
Constructor
Definition at line 16 of file Manager.py.
00016 : 00017 """Constructor""" 00018 self.jobConfig = None 00019 self.siteConfig = None 00020 self.stateSep=',' 00021 self.stateNParts=2 00022 self.initializeOptions() 00023 return 00024 def initializeOptions(self):
def Manager::Manager::initializeOptions | ( | self | ) |
Add options to command line parser
Definition at line 25 of file Manager.py.
00025 : 00026 """ Add options to command line parser """ 00027 from optparse import OptionParser 00028 from DybPython.hostid import hostid 00029 parser = OptionParser(usage=self.__doc__) 00030 00031 parser.add_option("-S", "--summarize-run",action="store_true", 00032 default=False, help="Summarize run histograms") 00033 00034 parser.add_option("", "--print-state",action="store_true", 00035 default=False, help="Print the job state of a run") 00036 00037 parser.add_option("", "--clear-sequence",action="store_true", 00038 default=False, help="Clear NuWa Job state for reprocessing") 00039 00040 parser.add_option("", "--clear-sequence-stats",action="store_true", 00041 default=False, help="Clear Add Stats state for readding") 00042 00043 parser.add_option("", "--clear-summary",action="store_true", 00044 default=False, help="Clear run summary for reprocessing") 00045 00046 parser.add_option("", "--run-nuwa",action="store_true", 00047 default=False, help="Process NuWa Job") 00048 00049 parser.add_option("", "--add-stats",action="store_true", 00050 default=False, help="Add root file to run total") 00051 00052 parser.add_option("", "--make-figures",action="store_true", 00053 default=False, help="Make figures for this job/run") 00054 00055 parser.add_option("", "--xml-index",action="store_true", 00056 default=False, help="Make XML index for this job/run") 00057 00058 parser.add_option("", "--xml-add-run",action="store_true", 00059 default=False, help="Add run to site XML index") 00060 00061 parser.add_option("-j", "--job-name",default=None,type="string", 00062 help="Name of NuWa Job") 00063 00064 parser.add_option("-r", "--run-number",default=None,type="int", 00065 help="Run Number for NuWa Job") 00066 00067 parser.add_option("-A", "--all-sequences",action="store_true", 00068 default=False, help="Process all sequences in run") 00069 00070 parser.add_option("-s", "--sequence",default=None,type="int", 00071 help="File Sequence Number for NuWa Job") 00072 00073 
parser.add_option("-f", "--filename",default=None,type="string", 00074 help="Data file name for current job") 00075 00076 parser.add_option("-l", "--log-level",default=3,type="int", 00077 help="Set output log level") 00078 00079 parser.add_option("-b", "--batch",action="store_true",default=False, 00080 help="Send this job to the batch queue") 00081 00082 parser.add_option("-c", "--cluster",default="pdsf",type="string", 00083 help="Use the configuration for this cluster") 00084 00085 parser.add_option("-d", "--dry-run",action="store_true",default=False, 00086 help="Debug run; don't run any actual jobs") 00087 00088 self.opts_parser = parser 00089 00090 return 00091 00092 def parseOptions(self, argv):
def Manager::Manager::parseOptions | ( | self, | ||
argv | ||||
) |
Parse the command line options given to Manager
Definition at line 93 of file Manager.py.
00093 : 00094 """Parse command line options to given to Manager""" 00095 self.argv = argv 00096 (options, args) = self.opts_parser.parse_args(args=argv) 00097 self.opts = options 00098 self.args = args 00099 00100 # Translate compound option 00101 if self.opts.summarize_run: 00102 self.opts.make_figures = True 00103 self.opts.xml_index = True 00104 self.opts.xml_add_run = True 00105 00106 if self.opts.filename!=None or (self.opts.run_number!=None 00107 and self.opts.sequence!=None): 00108 # Check for consistency of file, run, sequence 00109 # and fill out any unset parts if possible 00110 [status, run, seq, fname] = self.resolveRunSequenceFile( 00111 self.opts.run_number, 00112 self.opts.sequence, 00113 self.opts.filename) 00114 if status != Status.SUCCESS: return status 00115 self.opts.run_number = run 00116 self.opts.sequence = seq 00117 self.opts.filename = fname 00118 00119 # Check consistentcy of command line arguments 00120 if (self.opts.run_nuwa 00121 or self.opts.add_stats 00122 or self.opts.make_figures 00123 or self.opts.xml_index 00124 or self.opts.xml_add_run 00125 or self.opts.clear_sequence 00126 or self.opts.clear_sequence_stats 00127 or self.opts.clear_summary 00128 or self.opts.print_state): 00129 if self.opts.run_number==None: 00130 print "Error: No run number has been found or provided" 00131 return Status.FAILURE 00132 if (self.opts.run_nuwa 00133 or self.opts.add_stats 00134 or self.opts.make_figures 00135 or self.opts.xml_index 00136 or self.opts.clear_sequence 00137 or self.opts.clear_sequence_stats 00138 or self.opts.clear_summary 00139 or self.opts.print_state): 00140 if self.opts.job_name==None: 00141 print "Error: A job name must be provided: use '-j'" 00142 return Status.FAILURE 00143 if jobConfigs.has_key(self.opts.job_name): 00144 self.jobConfig = jobConfigs[self.opts.job_name] 00145 else: 00146 print "Error: Unknown job name '%s'" % self.opts.job_name 00147 return Status.FAILURE 00148 if (self.opts.run_nuwa 00149 or 
self.opts.clear_sequence 00150 or self.opts.clear_sequence_stats) and not self.opts.all_sequences: 00151 if self.opts.sequence==None: 00152 print "Error: No sequence number has been found or provided" 00153 return Status.FAILURE 00154 if self.opts.filename==None: 00155 print "Error: No data filename has been found or provided" 00156 return Status.FAILURE 00157 00158 if self.opts.all_sequences: 00159 if self.opts.sequence!=None: 00160 print "Error: Do not set sequence when processing all sequences" 00161 return Status.FAILURE 00162 if self.opts.filename!=None: 00163 print "Error: Do not set filename when processing all sequences" 00164 return Status.FAILURE 00165 if not (self.opts.run_nuwa 00166 or self.opts.clear_sequence 00167 or self.opts.clear_sequence_stats): 00168 print "Error: Requesting 'all-sequences' but not 'run-nuwa', 'clear-sequence', or 'clear-sequence-stats'?" 00169 return Status.FAILURE 00170 00171 if siteConfigs.has_key(self.opts.cluster): 00172 self.siteConfig = siteConfigs[self.opts.cluster] 00173 else: 00174 print "Error: Unknown computer cluster '%s'; add to SiteConfigs" % self.opts.cluster 00175 return Status.FAILURE 00176 00177 return Status.SUCCESS 00178 def run(self):
def Manager::Manager::run | ( | self | ) |
Run the manager
Definition at line 179 of file Manager.py.
00179 : 00180 """ Run the manager """ 00181 import os 00182 os.umask(2) 00183 if self.opts.all_sequences: 00184 # Loop over all sequences in run, submit one job for each 00185 import DybPython.Catalog as Catalog 00186 fileList = Catalog.runs[self.opts.run_number] 00187 for fname in fileList: 00188 fileDesc = FileDescription(fname) 00189 if not fileDesc.isValid: 00190 print "Skipping invalid file: ",fname 00191 continue 00192 self.opts.filename = fname 00193 self.opts.sequence = fileDesc.sequence 00194 status = self.runOneJob() 00195 self.opts.filename = None 00196 self.opts.sequence = None 00197 if status != Status.SUCCESS: return status 00198 else: 00199 status = self.runOneJob() 00200 if status != Status.SUCCESS: return status 00201 return Status.SUCCESS 00202 def runOneJob(self):
def Manager::Manager::runOneJob | ( | self | ) |
Run a single job
Definition at line 203 of file Manager.py.
00203 : 00204 """ Run a single job """ 00205 if self.opts.batch: 00206 # Catch batch submissions, and route to batch queue 00207 return self.runBatch(self.opts.job_name, 00208 self.opts.run_number, 00209 self.opts.sequence) 00210 # Not a batch job, so start working 00211 if self.opts.print_state: 00212 # Print the state for this job/run/seq 00213 status = self.printState(self.opts.job_name, 00214 self.opts.run_number, 00215 self.opts.sequence) 00216 if status != Status.SUCCESS: return status 00217 if self.opts.clear_sequence: 00218 # Clear one sequence state for reprocessing 00219 status = self.clearSequence(self.opts.job_name, 00220 self.opts.run_number, 00221 self.opts.sequence) 00222 if status != Status.SUCCESS: return status 00223 if self.jobConfig.hasStats: 00224 status = self.clearSequenceStats(self.opts.job_name, 00225 self.opts.run_number, 00226 self.opts.sequence) 00227 if status != Status.SUCCESS: return status 00228 if self.opts.clear_sequence_stats: 00229 # Clear one sequence stats for reprocessing 00230 if self.jobConfig.hasStats: 00231 status = self.clearSequenceStats(self.opts.job_name, 00232 self.opts.run_number, 00233 self.opts.sequence) 00234 if status != Status.SUCCESS: return status 00235 if self.opts.clear_summary: 00236 # Clear summary for reprocessing 00237 status = self.clearSummary(self.opts.job_name, self.opts.run_number) 00238 if status != Status.SUCCESS: return status 00239 if self.opts.run_nuwa: 00240 # Process one nuwa job 00241 status = self.processSequence(self.opts.job_name, 00242 self.opts.run_number, 00243 self.opts.sequence, 00244 self.opts.filename) 00245 if status != Status.SUCCESS: return status 00246 if self.opts.add_stats: 00247 # Add root file to run total 00248 status = self.addSequenceStats(self.opts.job_name, 00249 self.opts.run_number, 00250 self.opts.sequence) 00251 if status != Status.SUCCESS: return status 00252 if self.opts.make_figures: 00253 # Generate current figures for this job 00254 status = 
self.generateFigures(self.opts.job_name, 00255 self.opts.run_number) 00256 if status != Status.SUCCESS: return status 00257 if self.opts.xml_index: 00258 # Update run xml index for this job 00259 status = self.updateRunIndex(self.opts.job_name, 00260 self.opts.run_number) 00261 if status != Status.SUCCESS: return status 00262 if self.opts.xml_add_run: 00263 # Add this run to the total index if needed 00264 status = self.updateFullIndex(self.opts.run_number) 00265 if status != Status.SUCCESS: return status 00266 return Status.SUCCESS 00267 def runBatch(self, jobName, runNumber, seqNumber):
def Manager::Manager::runBatch | ( | self, | ||
jobName, | ||||
runNumber, | ||||
seqNumber | ||||
) |
Submit current command to the batch queue
Definition at line 268 of file Manager.py.
00268 : 00269 """Submit current command to the batch queue""" 00270 batchArgs = self.argv[:] 00271 for arg in ['--batch','-b','--all-sequences']: 00272 while arg in batchArgs: 00273 batchArgs.remove(arg) 00274 if self.opts.all_sequences: 00275 # Insert current sequence number in batch command line 00276 batchArgs += ['-s %d'%self.opts.sequence] 00277 requiredDirectories = [ self.siteConfig.batchLogDirectory(runNumber) ] 00278 status = self.ensureDirectories( requiredDirectories ) 00279 if status!=Status.SUCCESS: 00280 return status 00281 # Run current job in batch mode 00282 batchCommand = self.siteConfig.batchCommand(batchArgs, jobName, 00283 runNumber, seqNumber) 00284 self.runCommand( batchCommand ) 00285 return Status.SUCCESS 00286 def runCommand(self, command):
def Manager::Manager::runCommand | ( | self, | ||
command | ||||
) |
Run a single shell command
Definition at line 287 of file Manager.py.
00287 : 00288 """Run a single shell command""" 00289 print "[Running: ]", command 00290 if not self.opts.dry_run: 00291 import subprocess 00292 process = subprocess.Popen(command, shell=True) 00293 return process.wait() 00294 return Status.SUCCESS 00295 def sequencesInRun(self, runNumber):
def Manager::Manager::sequencesInRun | ( | self, | ||
runNumber | ||||
) |
Get list of sequences in run from Catalog
Definition at line 296 of file Manager.py.
00296 : 00297 """Get list of sequences in run from Catalog""" 00298 seqList = [] 00299 import DybPython.Catalog as Catalog 00300 fileList = Catalog.runs[self.opts.run_number] 00301 for fname in fileList: 00302 fileDesc = FileDescription(fname) 00303 if not fileDesc.isValid: 00304 print "Skipping invalid file: ",fname 00305 continue 00306 seqList.append( fileDesc.sequence ) 00307 return seqList 00308 def processSequence(self, jobName, runNumber, seqNumber, fileName):
def Manager::Manager::processSequence | ( | self, | ||
jobName, | ||||
runNumber, | ||||
seqNumber, | ||||
fileName | ||||
) |
Process the NuWa job with the given name for one file
Definition at line 309 of file Manager.py.
00309 : 00310 """Process the NuWa job with the given name for one file""" 00311 # Run one sequence 00312 print "Processing: %s_%07d_%04d (%s)" % (jobName, runNumber, 00313 seqNumber, fileName) 00314 seqState = self.getState( JobType.RUN, jobName, runNumber, seqNumber) 00315 if seqState.state==State.RUN_DONE: 00316 # Skipping processed sequence 00317 print "processSequence: Run %d seq %d already processed." % (runNumber, 00318 seqNumber) 00319 return Status.SUCCESS 00320 if seqState.state==State.RUN_FAILED: 00321 # Skipping problematic sequence 00322 print "processSequence: Run %d seq %d has problems." % (runNumber, 00323 seqNumber) 00324 return Status.SUCCESS 00325 self.setState( State.RUN_PROCESSING, JobType.RUN, jobName, runNumber, seqNumber) 00326 requiredDirectories = [ self.siteConfig.logDirectory(runNumber) ] 00327 if self.jobConfig.hasStats: 00328 # Make sure output directory exists 00329 requiredDirectories += [ self.siteConfig.statsDirectory(runNumber) ] 00330 status = self.ensureDirectories( requiredDirectories ) 00331 if status!=Status.SUCCESS: 00332 self.setState( State.RUN_FAILED, JobType.RUN, jobName, runNumber, seqNumber) 00333 return status 00334 command = self.makeJobCommand(jobName, runNumber, seqNumber, fileName) 00335 # Run Command 00336 status = self.runCommand( command ) 00337 if status!=Status.SUCCESS: 00338 self.setState( State.RUN_FAILED, JobType.RUN, jobName, runNumber, seqNumber) 00339 return status 00340 self.setState( State.RUN_DONE, JobType.RUN, jobName, runNumber, seqNumber) 00341 return Status.SUCCESS 00342 def addSequenceStats(self, jobName, runNumber, seqNumber):
def Manager::Manager::addSequenceStats | ( | self, | ||
jobName, | ||||
runNumber, | ||||
seqNumber | ||||
) |
Add the sequence histograms to run total
Definition at line 343 of file Manager.py.
00343 : 00344 """Add the sequence histograms to run total""" 00345 # Add ROOT files 00346 status = self.lockStats(jobName, runNumber) 00347 if status!=Status.SUCCESS: 00348 self.setState( State.STATS_FAILED, JobType.ADDSTATS, jobName, runNumber, 00349 seqNumber) 00350 return status 00351 currentSequences = [] 00352 if seqNumber != None: 00353 # Add only the requested sequence 00354 currentSequences.append(seqNumber) 00355 else: 00356 # Loop over all sequences for this run 00357 currentSequences += self.sequencesInRun(runNumber) 00358 print "Add Sequence Stats: %s_%07d: %s" % (jobName, runNumber, 00359 str(currentSequences)) 00360 import os 00361 statsSumFile = self.siteConfig.statsSumFile(jobName, runNumber) 00362 statsTempFile = "%s.tmp.%d" % (statsSumFile, os.getpid()) 00363 # Loop over sequences ROOT output, and add to run sum 00364 status=Status.SUCCESS 00365 for sequence in currentSequences: 00366 seqState = self.getState( JobType.ADDSTATS, jobName, runNumber, sequence) 00367 if seqState.state==State.STATS_DONE: 00368 # Skipping processed sequence 00369 print "AddStats: Run %d seq %d already added." % (runNumber, sequence) 00370 continue 00371 if seqState.state==State.STATS_FAILED: 00372 # Skipping problematic sequence 00373 print "AddStats: Run %d seq %d has problems." 
% (runNumber, sequence) 00374 continue 00375 self.setState( State.STATS_ADDING, JobType.ADDSTATS, jobName, runNumber, 00376 sequence) 00377 command = None 00378 statsSeqFile = self.siteConfig.statsSeqFile(jobName, runNumber, 00379 sequence) 00380 if os.path.isfile(statsSumFile): 00381 command = '$RUNDIAGNOSTICSROOT/share/mergeFiles.py -o %(statsTempFile)s %(statsSumFile)s %(statsSeqFile)s' % { 00382 "statsTempFile":statsTempFile, 00383 "statsSumFile":statsSumFile, 00384 "statsSeqFile":statsSeqFile 00385 } 00386 else: 00387 command = "cp %s %s" % (statsSeqFile, statsTempFile) 00388 # Run Command 00389 # Set job status to processing 00390 status = self.runCommand( command ) 00391 if status!=Status.SUCCESS: 00392 self.setState( State.STATS_FAILED, JobType.ADDSTATS, jobName, runNumber, 00393 sequence) 00394 break 00395 # Move temp file to replace total 00396 command = "mv %s %s" % (statsTempFile, statsSumFile) 00397 status = self.runCommand( command ) 00398 if status!=Status.SUCCESS: 00399 self.setState( State.STATS_FAILED, JobType.ADDSTATS, jobName, runNumber, 00400 sequence) 00401 break 00402 self.setState( State.STATS_DONE, JobType.ADDSTATS, jobName, runNumber, 00403 sequence) 00404 if os.path.isfile(statsTempFile): 00405 command = "rm %s" % (statsTempFile) 00406 self.runCommand( command ) 00407 self.unlockStats(jobName, runNumber) 00408 return status 00409 def generateFigures(self, jobName, runNumber):
def Manager::Manager::generateFigures | ( | self, | ||
jobName, | ||||
runNumber | ||||
) |
Generate figures for this run
Definition at line 410 of file Manager.py.
00410 : 00411 """Generate figures for this run""" 00412 print "Making figures: %s_%07d" % (jobName, runNumber) 00413 if self.opts.add_stats: 00414 # Add a short break to allow other add-stats jobs to proceed 00415 import time 00416 time.sleep(10) 00417 requiredDirectories = [ self.siteConfig.figuresDirectory(runNumber) ] 00418 status = self.ensureDirectories( requiredDirectories ) 00419 if status!=Status.SUCCESS: 00420 return status 00421 if self.statsLocked(jobName, runNumber): 00422 # Postpone figure generation for a later time 00423 print "Postponing figure generation" 00424 return Status.SUCCESS 00425 status = self.lockStats(jobName, runNumber) 00426 if status!=Status.SUCCESS: 00427 return status 00428 self.setState( State.SUMMARY_PRINTING, JobType.SUMMARY, jobName, runNumber) 00429 statsSumFile = self.siteConfig.statsSumFile(jobName, runNumber) 00430 figuresDir = self.siteConfig.figuresDirectory(runNumber) 00431 command = "$RUNDIAGNOSTICSROOT/share/figures.py -o %(figuresDir)s -r stats/diagnostics/run_%(runNumber)07d %(statsSumFile)s" % { 00432 "figuresDir":figuresDir, 00433 "runNumber":runNumber, 00434 "statsSumFile":statsSumFile 00435 } 00436 # Run Command 00437 status = self.runCommand( command ) 00438 if status!=Status.SUCCESS: 00439 self.setState( State.SUMMARY_FAILED_PRINTING, JobType.SUMMARY, 00440 jobName, runNumber) 00441 status = self.unlockStats(jobName, runNumber) 00442 return status 00443 self.setState( State.SUMMARY_PRINTED, JobType.SUMMARY, jobName, runNumber) 00444 status = self.unlockStats(jobName, runNumber) 00445 return status 00446 def updateRunIndex(self, jobName, runNumber):
def Manager::Manager::updateRunIndex | ( | self, | ||
jobName, | ||||
runNumber | ||||
) |
Update/Create XML index for this run, with results from Job Name
Definition at line 447 of file Manager.py.
00447 : 00448 """Update/Create XML index for this run, with results from Job Name""" 00449 import os 00450 print "Updating Run XML: %s_%07d" % (jobName, runNumber) 00451 requiredDirectories = [ self.siteConfig.xmlDirectory(runNumber) ] 00452 status = self.ensureDirectories( requiredDirectories ) 00453 if status!=Status.SUCCESS: 00454 return status 00455 if self.statsLocked(jobName, runNumber) or self.runLocked(runNumber): 00456 # Postpone run summary for a later time 00457 print "Postponing update of run index" 00458 return Status.SUCCESS 00459 status = self.lockStats(jobName, runNumber) 00460 if status!=Status.SUCCESS: 00461 return status 00462 status = self.lockRun(runNumber) 00463 if status!=Status.SUCCESS: 00464 self.unlockStats(jobName, runNumber) 00465 return status 00466 statsSumFile = self.siteConfig.statsSumFile(jobName, runNumber) 00467 statsInstallPath = self.siteConfig.statsInstallPath(jobName, runNumber) 00468 xmlIndexFile = self.siteConfig.xmlIndexFile(runNumber) 00469 indexTempFile = "%s.tmp.%d" % (xmlIndexFile, os.getpid()) 00470 figPathReplace = self.siteConfig.figuresPathReplace(runNumber) 00471 command = "$RUNDIAGNOSTICSROOT/share/xmlIndex.py -f %(statsSumFile)s -o %(indexTempFile)s -p %(statsInstallPath)s -r %(figPathReplace)s" % { 00472 "statsSumFile":statsSumFile, 00473 "indexTempFile":indexTempFile, 00474 "statsInstallPath":statsInstallPath, 00475 "figPathReplace":figPathReplace 00476 } 00477 # Run Command 00478 self.setState( State.SUMMARY_INDEXING, JobType.SUMMARY, jobName, runNumber) 00479 status = self.runCommand( command ) 00480 if status!=Status.SUCCESS: 00481 self.setState( State.SUMMARY_FAILED_INDEXING, JobType.SUMMARY, 00482 jobName, runNumber) 00483 self.unlockStats(jobName, runNumber) 00484 self.unlockRun(runNumber) 00485 return status 00486 import os 00487 if os.path.isfile(xmlIndexFile): 00488 # Merge new results into current index 00489 mergedFile = "%s.merge" % indexTempFile 00490 command = 
"$RUNDIAGNOSTICSROOT/share/xmlIndex.py -o %(mergedFile)s -i %(xmlIndexFile)s -u %(indexTempFile)s" % { 00491 "mergedFile":mergedFile, 00492 "xmlIndexFile":xmlIndexFile, 00493 "indexTempFile":indexTempFile 00494 } 00495 status = self.runCommand( command ) 00496 if status!=Status.SUCCESS: 00497 self.setState( State.SUMMARY_FAILED_INDEXING, JobType.SUMMARY, 00498 jobName, runNumber) 00499 self.unlockStats(jobName, runNumber) 00500 self.unlockRun(runNumber) 00501 return status 00502 # Move temp file to replace index 00503 command = "mv %s %s" % (mergedFile, indexTempFile) 00504 status = self.runCommand( command ) 00505 if status!=Status.SUCCESS: 00506 self.setState( State.SUMMARY_FAILED_INDEXING, JobType.SUMMARY, 00507 jobName, runNumber) 00508 self.unlockStats(jobName, runNumber) 00509 self.unlockRun(runNumber) 00510 return status 00511 # Move temp file to replace index 00512 command = "mv %s %s" % (indexTempFile, xmlIndexFile) 00513 status = self.runCommand( command ) 00514 if status!=Status.SUCCESS: 00515 self.setState( State.SUMMARY_FAILED_INDEXING, JobType.SUMMARY, 00516 jobName, runNumber) 00517 self.unlockStats(jobName, runNumber) 00518 self.unlockRun(runNumber) 00519 return status 00520 self.setState( State.SUMMARY_DONE, JobType.SUMMARY, jobName, runNumber) 00521 self.unlockStats(jobName, runNumber) 00522 self.unlockRun(runNumber) 00523 return Status.SUCCESS 00524 def updateFullIndex(self, runNumber):
def Manager::Manager::updateFullIndex | ( | self, | ||
runNumber | ||||
) |
Make sure this run is present in, or is added to, the index of all runs
Definition at line 525 of file Manager.py.
00525 : 00526 """Make sure this run is present/added to index of all runs""" 00527 # Add/Verify Run is in index of all runs 00528 import os 00529 print "Updating Run List XML: %07d" % (runNumber) 00530 status = self.lockRunIndex() 00531 if status!=Status.SUCCESS: 00532 self.unlockRun(runNumber) 00533 return status 00534 xmlIndexFile = self.siteConfig.xmlIndexFile(runNumber) 00535 mainIndexFile = self.siteConfig.runIndexFile() 00536 mainFileDir = self.siteConfig.runIndexDir() 00537 mainTempFile = "%s.tmp.%d" % (mainIndexFile, os.getpid()) 00538 xmlRelativeDir = self.siteConfig.xmlRelativeDir(runNumber) 00539 requiredFiles = ['runs.xml','index.html','runs.css','runs.js'] 00540 for filename in requiredFiles: 00541 fullFilePath = mainFileDir+ '/' + filename 00542 if not os.path.isfile(fullFilePath): 00543 # Make main index file 00544 command = "cp $RUNDIAGNOSTICSROOT/output/%s %s" % ( 00545 filename, 00546 fullFilePath 00547 ) 00548 status = self.runCommand( command ) 00549 if status!=Status.SUCCESS: 00550 self.unlockRunIndex() 00551 return status 00552 # Run Command 00553 command = "$RUNDIAGNOSTICSROOT/share/xmlCollect.py -o %(mainTempFile)s -p %(xmlRelativeDir)s %(xmlIndexFile)s %(mainIndexFile)s" % { 00554 "mainTempFile":mainTempFile, 00555 "xmlIndexFile":xmlIndexFile, 00556 "mainIndexFile":mainIndexFile, 00557 "xmlRelativeDir":xmlRelativeDir 00558 } 00559 status = self.runCommand( command ) 00560 if status!=Status.SUCCESS: 00561 self.unlockRunIndex() 00562 return status 00563 command = "mv %s %s" % (mainTempFile, mainIndexFile) 00564 status = self.runCommand( command ) 00565 if status!=Status.SUCCESS: 00566 self.unlockRunIndex() 00567 return status 00568 self.unlockRunIndex() 00569 return Status.SUCCESS 00570 def clearSequence(self, jobName, runNumber, seqNumber):
def Manager::Manager::clearSequence | ( | self, | ||
jobName, | ||||
runNumber, | ||||
seqNumber | ||||
) |
Clear the job result and state to allow reprocessing
Definition at line 571 of file Manager.py.
00571 : 00572 """Clear the job result and state to allow reprocessing""" 00573 import os 00574 statsSeqFile = self.siteConfig.statsSeqFile(jobName, runNumber, 00575 seqNumber) 00576 if os.path.isfile(statsSeqFile): 00577 command = "rm -f %s" % statsSeqFile 00578 status = self.runCommand( command ) 00579 if status!=Status.SUCCESS: 00580 self.setState( State.STATS_FAILED, JobType.ADDSTATS, jobName, 00581 runNumber, seqNumber) 00582 return status 00583 return self.setState( State.RUN_READY, JobType.RUN, jobName, runNumber, 00584 seqNumber) 00585 def clearSequenceStats(self, jobName, runNumber, seqNumber):
def Manager::Manager::clearSequenceStats | ( | self, | ||
jobName, | ||||
runNumber, | ||||
seqNumber | ||||
) |
Clear the job state to allow reprocessing
Definition at line 586 of file Manager.py.
00586 : 00587 """Clear the job state to allow reprocessing""" 00588 return self.setState( State.STATS_READY, JobType.ADDSTATS, jobName, 00589 runNumber, seqNumber) 00590 def clearSummary(self, jobName, runNumber):
def Manager::Manager::clearSummary | ( | self, | ||
jobName, | ||||
runNumber | ||||
) |
Clear the job summary and state to allow reprocessing
Definition at line 591 of file Manager.py.
00591 : 00592 """Clear the job summary and state to allow reprocessing""" 00593 import os 00594 statsSumFile = self.siteConfig.statsSumFile(jobName, runNumber) 00595 if os.path.isfile(statsSumFile): 00596 command = "rm -f %s" % statsSumFile 00597 status = self.runCommand( command ) 00598 if status!=Status.SUCCESS: 00599 return status 00600 return self.setState( State.SUMMARY_READY, JobType.ADDSTATS, jobName, 00601 runNumber) 00602 def printState(self, jobName, runNumber, seqNumber):
def Manager::Manager::printState | ( | self, | ||
jobName, | ||||
runNumber, | ||||
seqNumber | ||||
) |
Print State information for this job/run/sequence
Definition at line 603 of file Manager.py.
00603 : 00604 """Print State information for this job/run/sequence""" 00605 summaryState = self.getState(JobType.SUMMARY, jobName, runNumber) 00606 print "Run %07d: %s" % (runNumber, State.getName(summaryState.state)) 00607 currentSequences = [] 00608 if seqNumber != None: 00609 # Add only the requested sequence 00610 currentSequences.append(seqNumber) 00611 else: 00612 # Loop over all sequences for this run 00613 currentSequences += self.sequencesInRun(runNumber) 00614 for sequence in currentSequences: 00615 runState = self.getState(JobType.RUN, jobName, runNumber, sequence) 00616 statsState = self.getState(JobType.ADDSTATS, jobName, runNumber, 00617 sequence) 00618 print " Seq %04d: %s %s" % (sequence, 00619 State.getName(runState.state), 00620 State.getName(statsState.state)) 00621 return Status.SUCCESS 00622 def resolveRunSequenceFile(self, runNumber, seqNumber, fileName):
def Manager::Manager::resolveRunSequenceFile | ( | self, | ||
runNumber, | ||||
seqNumber, | ||||
fileName | ||||
) |
Resolve the Run Number, Sequence Number, and Filename, given a subset of the information.
Definition at line 623 of file Manager.py.
00623 : 00624 """Resolve the Run Number, Sequence Number, and Filename, 00625 given a subset of the information. 00626 """ 00627 badResolve = [Status.FAILURE, None, None, None] 00628 if fileName!=None: 00629 # Check if filename is consistent with supplied run/seq numbers 00630 fileDesc = FileDescription(fileName) 00631 if not fileDesc.isValid: 00632 print "Invalid file: ",fileName 00633 return badResolve 00634 if runNumber!=None and runNumber!=fileDesc.runNumber: 00635 print "Error: runNumber %d invalid for file %s" 00636 return badResolve 00637 if seqNumber!=None and seqNumber!=fileDesc.sequence: 00638 print "Error: runNumber %d invalid for file %s" 00639 return badResolve 00640 return [Status.SUCCESS, fileDesc.runNumber, 00641 fileDesc.sequence, fileName] 00642 else: 00643 # Lookup filename from catalog 00644 if runNumber==None or seqNumber==None: 00645 print "Error: Either Filename or Run/Seq must be specified." 00646 return badResolve 00647 import DybPython.Catalog as Catalog 00648 fileList = Catalog.runs[runNumber] 00649 for fname in fileList: 00650 fileDesc = FileDescription(fname) 00651 if not fileDesc.isValid: 00652 print "Skipping invalid file from Catalog: ",fname 00653 continue 00654 if fileDesc.sequence==seqNumber: 00655 # Found a file match for run/seq number 00656 return [Status.SUCCESS, runNumber, seqNumber, fname] 00657 return badResolve 00658 def makeJobCommand(self, jobName, runNumber, seqNumber, fileName):
def Manager::Manager::makeJobCommand | ( | self, | ||
jobName, | ||||
runNumber, | ||||
seqNumber, | ||||
fileName | ||||
) |
Make the command to be run for this job
Definition at line 659 of file Manager.py.
00659 : 00660 """ Make the command to be run for this job """ 00661 # Prepare Job Command 00662 commandArgs = {} 00663 commandArgs["dataFile"] = fileName 00664 commandArgs["outLog"] = self.siteConfig.logOutFile(jobName, 00665 runNumber, 00666 seqNumber) 00667 commandArgs["errLog"] = self.siteConfig.logErrFile(jobName, 00668 runNumber, 00669 seqNumber) 00670 statsFile = None 00671 if self.jobConfig.hasStats: 00672 statsFile = self.siteConfig.statsSeqFile(jobName, runNumber, 00673 seqNumber) 00674 commandArgs["statsFile"] = statsFile 00675 command = self.jobConfig.command % commandArgs 00676 return command 00677 def ensureDirectories(self, directories):
def Manager::Manager::ensureDirectories | ( | self, | ||
directories | ||||
) |
Check if directories exist, and create them if necessary
Definition at line 678 of file Manager.py.
00678 : 00679 """Check if directories exist, and make if necessary""" 00680 import os 00681 for directory in directories: 00682 if not os.path.exists(directory): 00683 command = "mkdir -p %s" % directory 00684 status = self.runCommand( command ) 00685 if status!=Status.SUCCESS: 00686 return status 00687 return Status.SUCCESS 00688 ### Functions for handling job locks #################################
def Manager::Manager::setLock | ( | self, | ||
lockFile | ||||
) |
Request a lockfile, and wait for lock
Definition at line 691 of file Manager.py.
00691 : 00692 """ Request a lockfile, and wait for lock """ 00693 command = "lockfile %s" % lockFile 00694 return self.runCommand(command) 00695 def isLocked(self, lockFile):
def Manager::Manager::isLocked | ( | self, | ||
lockFile | ||||
) |
Check whether a lockfile exists
Definition at line 696 of file Manager.py.
00696 : 00697 """ Check whether a lockfile exists """ 00698 import os 00699 return os.path.isfile(lockFile) 00700 def removeLock(self, lockFile):
def Manager::Manager::removeLock | ( | self, | ||
lockFile | ||||
) |
Remove a lockfile
Definition at line 701 of file Manager.py.
00701 : 00702 """ Remove a lockfile """ 00703 command = "rm -f %s" % lockFile 00704 return self.runCommand(command) 00705 def lockStats(self, jobName, runNumber):
def Manager::Manager::lockStats | ( | self, | ||
jobName, | ||||
runNumber | ||||
) |
Lock statistics
Definition at line 706 of file Manager.py.
00706 : 00707 """ Lock statistics """ 00708 statsLockFile = self.siteConfig.statsLockFile(jobName, runNumber) 00709 return self.setLock( statsLockFile ) 00710 def statsLocked(self, jobName, runNumber):
def Manager::Manager::statsLocked | ( | self, | ||
jobName, | ||||
runNumber | ||||
) |
Are statistics locked?
Definition at line 711 of file Manager.py.
00711 : 00712 """ Are statistics locked? """ 00713 statsLockFile = self.siteConfig.statsLockFile(jobName, runNumber) 00714 return self.isLocked( statsLockFile ) 00715 def unlockStats(self, jobName, runNumber):
def Manager::Manager::unlockStats | ( | self, | ||
jobName, | ||||
runNumber | ||||
) |
Unlock statistics
Definition at line 716 of file Manager.py.
00716 : 00717 """ Unlock statistics """ 00718 statsLockFile = self.siteConfig.statsLockFile(jobName, runNumber) 00719 return self.removeLock( statsLockFile ) 00720 def lockRun(self, runNumber):
def Manager::Manager::lockRun | ( | self, | ||
runNumber | ||||
) |
Lock run summary
Definition at line 721 of file Manager.py.
00721 : 00722 """ Lock run summary """ 00723 runLockFile = self.siteConfig.runLockFile(runNumber) 00724 return self.setLock( runLockFile ) 00725 def runLocked(self, runNumber):
def Manager::Manager::runLocked | ( | self, | ||
runNumber | ||||
) |
Is run summary locked?
Definition at line 726 of file Manager.py.
00726 : 00727 """ Is run summary locked? """ 00728 runLockFile = self.siteConfig.runLockFile(runNumber) 00729 return self.isLocked( runLockFile ) 00730 def unlockRun(self, runNumber):
def Manager::Manager::unlockRun | ( | self, | ||
runNumber | ||||
) |
Unlock run summary
Definition at line 731 of file Manager.py.
00731 : 00732 """ Unlock run summary """ 00733 runLockFile = self.siteConfig.runLockFile(runNumber) 00734 return self.removeLock( runLockFile ) 00735 def lockRunIndex(self):
def Manager::Manager::lockRunIndex | ( | self | ) |
Lock index of runs
Definition at line 736 of file Manager.py.
00736 : 00737 """ Lock index of runs """ 00738 lockFile = self.siteConfig.runIndexLockFile() 00739 return self.setLock( lockFile ) 00740 def runIndexLocked(self):
def Manager::Manager::runIndexLocked | ( | self | ) |
Is the index of runs locked?
Definition at line 741 of file Manager.py.
00741 : 00742 """ Is run summary locked? """ 00743 lockFile = self.siteConfig.runIndexLockFile() 00744 return self.isLocked( lockFile ) 00745 def unlockRunIndex(self):
def Manager::Manager::unlockRunIndex | ( | self | ) |
Unlock index of runs
Definition at line 746 of file Manager.py.
00746 : 00747 """ Unlock run summary """ 00748 lockFile = self.siteConfig.runIndexLockFile() 00749 return self.removeLock( lockFile ) 00750 ### Functions for handling job state #################################
def Manager::Manager::getState | ( | self, | ||
jobType, | ||||
jobName, | ||||
runNumber, | ||||
seqNumber = None | ||||
) |
Check the state of a job
Definition at line 753 of file Manager.py.
00753 : 00754 """ Check the state of a job """ 00755 import os 00756 stateFile = self.siteConfig.stateFile(jobType, jobName, runNumber, seqNumber) 00757 if not os.path.isfile( stateFile ): 00758 # Job is unknown 00759 return JobState() 00760 return self.getStateFromFile( stateFile ) 00761 def setState(self, newState, jobType, jobName, runNumber, seqNumber=None):
def Manager::Manager::setState | ( | self, | ||
newState, | ||||
jobType, | ||||
jobName, | ||||
runNumber, | ||||
seqNumber = None | ||||
) |
Set the state of a job
Definition at line 762 of file Manager.py.
00762 : 00763 """ Set the state of a job """ 00764 stateFile = self.siteConfig.stateFile(jobType, jobName, runNumber, seqNumber) 00765 return self.setStateInFile(stateFile, newState) 00766 def setStateInFile(self, stateFile, newState):
def Manager::Manager::setStateInFile | ( | self, | ||
stateFile, | ||||
newState | ||||
) |
Set the job state in the given state file
Definition at line 767 of file Manager.py.
00767 : 00768 """Set the job state in the given state file""" 00769 stateName = State.getName( newState ) 00770 import time, datetime 00771 stateTimeStr = time.ctime() # Note: this is localtime 00772 stateLogStr = self.stateSep.join([stateName, stateTimeStr]) 00773 import os 00774 status = self.ensureDirectories( [os.path.dirname( stateFile )] ) 00775 if status != Status.SUCCESS: 00776 return status 00777 command = "echo '%s' >> %s" % (stateLogStr, stateFile) 00778 return self.runCommand( command ) 00779 def getStateFromFile(self, stateFile):
def Manager::Manager::getStateFromFile | ( | self, | ||
stateFile | ||||
) |
Get state from file (Internal function)
Definition at line 780 of file Manager.py.
00780 : 00781 """ Get state from file (Internal function) """ 00782 lastLine = self.lastLine(stateFile) 00783 if len(lastLine)==0: 00784 return JobState() 00785 stateParts = lastLine.split(self.stateSep) 00786 if len(stateParts) != self.stateNParts: 00787 return JobState() 00788 # State 00789 stateStr = stateParts[0].strip() 00790 state = State.getStateByName(stateStr) 00791 # State time 00792 stateTimeStr = stateParts[1].strip() 00793 import time 00794 ttuple = time.strptime( stateTimeStr ) 00795 stateTime = time.mktime( ttuple ) 00796 return JobState(state, stateTime) 00797 def lastLine(self, filename):
def Manager::Manager::lastLine | ( | self, | ||
filename | ||||
) |
Get the last line from a file
Definition at line 798 of file Manager.py.
00798 : 00799 """Get the last line from a file""" 00800 import subprocess 00801 tailProc = subprocess.Popen("tail -n 1 %s" % filename, 00802 stdout=subprocess.PIPE, 00803 shell=True) 00804 (out, err) = tailProc.communicate() 00805 status = tailProc.wait() 00806 if status!=Status.SUCCESS or type(out)!=str: 00807 return "" 00808 return out.strip() 00809