00001
00002
00003
00004
00005
00006
00007
00008 import ProcessManager.Status as Status
00009 from ProcessManager.SiteConfig import siteConfigs
00010 from ProcessManager.FileDescription import FileDescription
00011 from ProcessManager.JobConfig import jobConfigs
00012 from ProcessManager.JobState import State, JobState, JobType
00013
class Manager:
    """Class which manages NuWa jobs"""
    # NOTE: the docstring above is handed to OptionParser as the usage text
    # (see initializeOptions), so it is user-visible at runtime.  Additional
    # documentation therefore lives in comments instead of the docstring.
    #
    # Usage pattern visible in this class: construct a Manager, call
    # parseOptions(argv), then run().  Methods report their outcome with the
    # constants of ProcessManager.Status instead of raising.
    #
    # Every shell command goes through runCommand(), which honours --dry-run.
    # Exit codes are compared directly with Status.SUCCESS, which presumably
    # equals 0 -- TODO confirm against ProcessManager.Status.

    def __init__(self):
        """Constructor"""
        # Job and site configuration objects; selected by parseOptions().
        self.jobConfig = None
        self.siteConfig = None
        # Layout of one state-file line: "<state name>,<time.ctime() stamp>".
        self.stateSep = ','
        self.stateNParts = 2
        self.initializeOptions()

    def initializeOptions(self):
        """Create the command line parser and store it in self.opts_parser."""
        # The unused function-level import of DybPython.hostid was dropped;
        # nothing in this method referenced it.
        from optparse import OptionParser
        parser = OptionParser(usage=self.__doc__)

        # --- Actions (may be combined; runOneJob fixes their order) ---
        parser.add_option("-S", "--summarize-run", action="store_true",
                          default=False, help="Summarize run histograms")
        parser.add_option("--print-state", action="store_true",
                          default=False, help="Print the job state of a run")
        parser.add_option("--clear-sequence", action="store_true",
                          default=False,
                          help="Clear NuWa Job state for reprocessing")
        parser.add_option("--clear-sequence-stats", action="store_true",
                          default=False,
                          help="Clear Add Stats state for readding")
        parser.add_option("--clear-summary", action="store_true",
                          default=False,
                          help="Clear run summary for reprocessing")
        parser.add_option("--run-nuwa", action="store_true",
                          default=False, help="Process NuWa Job")
        parser.add_option("--add-stats", action="store_true",
                          default=False, help="Add root file to run total")
        parser.add_option("--make-figures", action="store_true",
                          default=False, help="Make figures for this job/run")
        parser.add_option("--xml-index", action="store_true",
                          default=False,
                          help="Make XML index for this job/run")
        parser.add_option("--xml-add-run", action="store_true",
                          default=False, help="Add run to site XML index")

        # --- What to work on ---
        parser.add_option("-j", "--job-name", default=None, type="string",
                          help="Name of NuWa Job")
        parser.add_option("-r", "--run-number", default=None, type="int",
                          help="Run Number for NuWa Job")
        parser.add_option("-A", "--all-sequences", action="store_true",
                          default=False, help="Process all sequences in run")
        parser.add_option("-s", "--sequence", default=None, type="int",
                          help="File Sequence Number for NuWa Job")
        parser.add_option("-f", "--filename", default=None, type="string",
                          help="Data file name for current job")

        # --- How to run ---
        parser.add_option("-l", "--log-level", default=3, type="int",
                          help="Set output log level")
        parser.add_option("-b", "--batch", action="store_true", default=False,
                          help="Send this job to the batch queue")
        parser.add_option("-c", "--cluster", default="pdsf", type="string",
                          help="Use the configuration for this cluster")
        parser.add_option("-d", "--dry-run", action="store_true",
                          default=False,
                          help="Debug run; don't run any actual jobs")

        self.opts_parser = parser

    def parseOptions(self, argv):
        """Parse and validate the command line options given to Manager.

        Stores argv, the parsed options and the positional arguments on the
        instance, resolves run/sequence/filename from whichever subset was
        given, and selects self.jobConfig and self.siteConfig.

        Returns Status.SUCCESS, or a failure status after printing the
        reason.
        """
        self.argv = argv
        (options, args) = self.opts_parser.parse_args(args=argv)
        self.opts = options
        self.args = args
        opts = self.opts

        # --summarize-run is shorthand for the three summary actions.
        if opts.summarize_run:
            opts.make_figures = True
            opts.xml_index = True
            opts.xml_add_run = True

        # Fill in whichever of run / sequence / filename was left out.
        if opts.filename is not None or (opts.run_number is not None
                                         and opts.sequence is not None):
            [status, run, seq, fname] = self.resolveRunSequenceFile(
                opts.run_number, opts.sequence, opts.filename)
            if status != Status.SUCCESS:
                return status
            opts.run_number = run
            opts.sequence = seq
            opts.filename = fname

        # Group the actions by what information they require.
        perSequence = (opts.run_nuwa
                       or opts.clear_sequence
                       or opts.clear_sequence_stats)
        needsJobName = (perSequence
                        or opts.add_stats
                        or opts.make_figures
                        or opts.xml_index
                        or opts.clear_summary
                        or opts.print_state)
        needsRunNumber = needsJobName or opts.xml_add_run

        if needsRunNumber and opts.run_number is None:
            print("Error: No run number has been found or provided")
            return Status.FAILURE

        if needsJobName:
            if opts.job_name is None:
                print("Error: A job name must be provided: use '-j'")
                return Status.FAILURE
            if opts.job_name in jobConfigs:
                self.jobConfig = jobConfigs[opts.job_name]
            else:
                print("Error: Unknown job name '%s'" % opts.job_name)
                return Status.FAILURE

        if perSequence and not opts.all_sequences:
            if opts.sequence is None:
                print("Error: No sequence number has been found or provided")
                return Status.FAILURE
            if opts.filename is None:
                print("Error: No data filename has been found or provided")
                return Status.FAILURE

        if opts.all_sequences:
            if opts.sequence is not None:
                print("Error: Do not set sequence when processing all sequences")
                return Status.FAILURE
            if opts.filename is not None:
                print("Error: Do not set filename when processing all sequences")
                return Status.FAILURE
            if not perSequence:
                print("Error: Requesting 'all-sequences' but not 'run-nuwa', 'clear-sequence', or 'clear-sequence-stats'?")
                return Status.FAILURE

        if opts.cluster in siteConfigs:
            self.siteConfig = siteConfigs[opts.cluster]
        else:
            print("Error: Unknown computer cluster '%s'; add to SiteConfigs" % opts.cluster)
            return Status.FAILURE

        return Status.SUCCESS

    def run(self):
        """Run the manager: one job, or one job per catalogued file of the
        run when --all-sequences was given.

        Returns Status.SUCCESS, or the status of the first job that failed
        (remaining files are then not processed).
        """
        import os
        os.umask(2)
        if not self.opts.all_sequences:
            status = self.runOneJob()
            if status != Status.SUCCESS:
                return status
            return Status.SUCCESS

        import DybPython.Catalog as Catalog
        for fname in Catalog.runs[self.opts.run_number]:
            fileDesc = FileDescription(fname)
            if not fileDesc.isValid:
                print("Skipping invalid file:  %s" % (fname,))
                continue
            # runOneJob reads its target from self.opts, so point the options
            # at this file for the duration of the call only.
            self.opts.filename = fname
            self.opts.sequence = fileDesc.sequence
            try:
                status = self.runOneJob()
            finally:
                self.opts.filename = None
                self.opts.sequence = None
            if status != Status.SUCCESS:
                return status
        return Status.SUCCESS

    def runOneJob(self):
        """Run the requested actions for the run/sequence held in self.opts.

        With --batch the whole command is resubmitted to the batch queue
        instead.  Otherwise the actions run in a fixed order (print state,
        clear, process, add stats, figures, indices) and the first failure
        is returned.
        """
        opts = self.opts
        if opts.batch:
            return self.runBatch(opts.job_name, opts.run_number, opts.sequence)

        if opts.print_state:
            status = self.printState(opts.job_name, opts.run_number,
                                     opts.sequence)
            if status != Status.SUCCESS:
                return status
        if opts.clear_sequence:
            status = self.clearSequence(opts.job_name, opts.run_number,
                                        opts.sequence)
            if status != Status.SUCCESS:
                return status
            # Clearing the job output also invalidates its add-stats state.
            if self.jobConfig.hasStats:
                status = self.clearSequenceStats(opts.job_name,
                                                 opts.run_number,
                                                 opts.sequence)
                if status != Status.SUCCESS:
                    return status
        if opts.clear_sequence_stats:
            if self.jobConfig.hasStats:
                status = self.clearSequenceStats(opts.job_name,
                                                 opts.run_number,
                                                 opts.sequence)
                if status != Status.SUCCESS:
                    return status
        if opts.clear_summary:
            status = self.clearSummary(opts.job_name, opts.run_number)
            if status != Status.SUCCESS:
                return status
        if opts.run_nuwa:
            status = self.processSequence(opts.job_name, opts.run_number,
                                          opts.sequence, opts.filename)
            if status != Status.SUCCESS:
                return status
        if opts.add_stats:
            status = self.addSequenceStats(opts.job_name, opts.run_number,
                                           opts.sequence)
            if status != Status.SUCCESS:
                return status
        if opts.make_figures:
            status = self.generateFigures(opts.job_name, opts.run_number)
            if status != Status.SUCCESS:
                return status
        if opts.xml_index:
            status = self.updateRunIndex(opts.job_name, opts.run_number)
            if status != Status.SUCCESS:
                return status
        if opts.xml_add_run:
            status = self.updateFullIndex(opts.run_number)
            if status != Status.SUCCESS:
                return status
        return Status.SUCCESS

    def runBatch(self, jobName, runNumber, seqNumber):
        """Submit the current command line to the batch queue.

        The batch flag is stripped so the submitted job runs directly.  When
        iterating over all sequences, the all-sequences flag is replaced by
        an explicit '-s <seqNumber>'.

        Returns the status of the directory creation or of the submit
        command (Status.SUCCESS in dry-run mode).
        """
        # Both spellings of each flag must go: leaving '-A' behind would make
        # the submitted job see '-A' together with '-s' and be rejected by
        # parseOptions.  (Clustered short flags such as '-bA' are not
        # recognised here.)
        dropped = ('--batch', '-b', '--all-sequences', '-A')
        batchArgs = [arg for arg in self.argv if arg not in dropped]
        if self.opts.all_sequences:
            batchArgs += ['-s', '%d' % seqNumber]
        requiredDirectories = [self.siteConfig.batchLogDirectory(runNumber)]
        status = self.ensureDirectories(requiredDirectories)
        if status != Status.SUCCESS:
            return status
        batchCommand = self.siteConfig.batchCommand(batchArgs, jobName,
                                                    runNumber, seqNumber)
        # Report a failed submission instead of unconditionally succeeding.
        return self.runCommand(batchCommand)

    def runCommand(self, command):
        """Echo and run a single shell command.

        Returns the command's exit code, or Status.SUCCESS without running
        anything when --dry-run is set.
        """
        # NOTE(review): commands are interpolated strings run with
        # shell=True; paths containing shell metacharacters are not quoted.
        print("[Running: ] %s" % (command,))
        if self.opts.dry_run:
            return Status.SUCCESS
        import subprocess
        return subprocess.call(command, shell=True)

    def sequencesInRun(self, runNumber):
        """Return the sequence numbers of all valid catalogued files of
        runNumber, in catalog order."""
        import DybPython.Catalog as Catalog
        seqList = []
        # Use the argument (not self.opts.run_number) so the method answers
        # for the run it was asked about.
        for fname in Catalog.runs[runNumber]:
            fileDesc = FileDescription(fname)
            if not fileDesc.isValid:
                print("Skipping invalid file:  %s" % (fname,))
                continue
            seqList.append(fileDesc.sequence)
        return seqList

    def processSequence(self, jobName, runNumber, seqNumber, fileName):
        """Process the NuWa job with the given name for one file.

        Sequences already marked RUN_DONE or RUN_FAILED are skipped and
        reported as success; use --clear-sequence to reprocess them.
        Otherwise the state goes RUN_PROCESSING, then RUN_DONE or RUN_FAILED.
        """
        print("Processing: %s_%07d_%04d (%s)" % (jobName, runNumber,
                                                 seqNumber, fileName))
        seqState = self.getState(JobType.RUN, jobName, runNumber, seqNumber)
        if seqState.state == State.RUN_DONE:
            print("processSequence: Run %d seq %d already processed." % (runNumber,
                                                                         seqNumber))
            return Status.SUCCESS
        if seqState.state == State.RUN_FAILED:
            print("processSequence: Run %d seq %d has problems." % (runNumber,
                                                                    seqNumber))
            return Status.SUCCESS

        self.setState(State.RUN_PROCESSING, JobType.RUN, jobName, runNumber,
                      seqNumber)
        requiredDirectories = [self.siteConfig.logDirectory(runNumber)]
        if self.jobConfig.hasStats:
            requiredDirectories += [self.siteConfig.statsDirectory(runNumber)]
        status = self.ensureDirectories(requiredDirectories)
        if status == Status.SUCCESS:
            command = self.makeJobCommand(jobName, runNumber, seqNumber,
                                          fileName)
            status = self.runCommand(command)
        if status != Status.SUCCESS:
            self.setState(State.RUN_FAILED, JobType.RUN, jobName, runNumber,
                          seqNumber)
            return status
        self.setState(State.RUN_DONE, JobType.RUN, jobName, runNumber,
                      seqNumber)
        return Status.SUCCESS

    def addSequenceStats(self, jobName, runNumber, seqNumber):
        """Add sequence histograms to the run total.

        seqNumber selects one sequence; None means every sequence of the run.
        Sequences already marked STATS_DONE or STATS_FAILED are skipped.  The
        run total is rebuilt in a per-process temporary file and then moved
        into place, all under the statistics lock.

        Returns Status.SUCCESS, or the status of the first failing step
        (later sequences are then left untouched).
        """
        import os
        status = self.lockStats(jobName, runNumber)
        if status != Status.SUCCESS:
            self.setState(State.STATS_FAILED, JobType.ADDSTATS, jobName,
                          runNumber, seqNumber)
            return status
        try:
            if seqNumber is not None:
                currentSequences = [seqNumber]
            else:
                currentSequences = self.sequencesInRun(runNumber)
            print("Add Sequence Stats: %s_%07d: %s" % (jobName, runNumber,
                                                       str(currentSequences)))
            statsSumFile = self.siteConfig.statsSumFile(jobName, runNumber)
            statsTempFile = "%s.tmp.%d" % (statsSumFile, os.getpid())

            status = Status.SUCCESS
            for sequence in currentSequences:
                seqState = self.getState(JobType.ADDSTATS, jobName, runNumber,
                                         sequence)
                if seqState.state == State.STATS_DONE:
                    print("AddStats: Run %d seq %d already added." % (runNumber, sequence))
                    continue
                if seqState.state == State.STATS_FAILED:
                    print("AddStats: Run %d seq %d has problems." % (runNumber, sequence))
                    continue
                self.setState(State.STATS_ADDING, JobType.ADDSTATS, jobName,
                              runNumber, sequence)
                statsSeqFile = self.siteConfig.statsSeqFile(jobName, runNumber,
                                                            sequence)
                if os.path.isfile(statsSumFile):
                    # Merge this sequence into the existing total.
                    command = '$RUNDIAGNOSTICSROOT/share/mergeFiles.py -o %(statsTempFile)s %(statsSumFile)s %(statsSeqFile)s' % {
                        "statsTempFile": statsTempFile,
                        "statsSumFile": statsSumFile,
                        "statsSeqFile": statsSeqFile,
                    }
                else:
                    # First sequence of the run: it simply becomes the total.
                    command = "cp %s %s" % (statsSeqFile, statsTempFile)
                status = self.runCommand(command)
                if status == Status.SUCCESS:
                    status = self.runCommand("mv %s %s" % (statsTempFile,
                                                           statsSumFile))
                if status != Status.SUCCESS:
                    self.setState(State.STATS_FAILED, JobType.ADDSTATS,
                                  jobName, runNumber, sequence)
                    break
                self.setState(State.STATS_DONE, JobType.ADDSTATS, jobName,
                              runNumber, sequence)

            # A failed step may leave the temporary file behind.
            if os.path.isfile(statsTempFile):
                self.runCommand("rm %s" % (statsTempFile))
        finally:
            # Release the lock even if something above raised.
            self.unlockStats(jobName, runNumber)
        return status

    def generateFigures(self, jobName, runNumber):
        """Generate figures for this run from the run-total statistics file.

        If the statistics are currently locked the work is postponed and
        Status.SUCCESS is returned.  Returns the failing status when
        directory creation, locking or the figure command fails; on success
        returns the status of releasing the statistics lock.
        """
        print("Making figures: %s_%07d" % (jobName, runNumber))
        if self.opts.add_stats:
            # Fixed pause after an add-stats step in the same invocation;
            # the reason is not visible in this file.
            import time
            time.sleep(10)
        figuresDir = self.siteConfig.figuresDirectory(runNumber)
        status = self.ensureDirectories([figuresDir])
        if status != Status.SUCCESS:
            return status
        if self.statsLocked(jobName, runNumber):
            print("Postponing figure generation")
            return Status.SUCCESS
        status = self.lockStats(jobName, runNumber)
        if status != Status.SUCCESS:
            return status
        self.setState(State.SUMMARY_PRINTING, JobType.SUMMARY, jobName,
                      runNumber)
        statsSumFile = self.siteConfig.statsSumFile(jobName, runNumber)
        figuresDir = self.siteConfig.figuresDirectory(runNumber)
        command = "$RUNDIAGNOSTICSROOT/share/figures.py -o %(figuresDir)s -r stats/diagnostics/run_%(runNumber)07d %(statsSumFile)s" % {
            "figuresDir": figuresDir,
            "runNumber": runNumber,
            "statsSumFile": statsSumFile,
        }
        status = self.runCommand(command)
        if status != Status.SUCCESS:
            self.setState(State.SUMMARY_FAILED_PRINTING, JobType.SUMMARY,
                          jobName, runNumber)
            # Keep the command's failure status: returning the unlock result
            # here would report a failed figure run as success.
            self.unlockStats(jobName, runNumber)
            return status
        self.setState(State.SUMMARY_PRINTED, JobType.SUMMARY, jobName,
                      runNumber)
        return self.unlockStats(jobName, runNumber)

    def updateRunIndex(self, jobName, runNumber):
        """Update/Create XML index for this run, with results from Job Name.

        Postponed (returning Status.SUCCESS) while the statistics or the run
        are locked.  Otherwise both locks are held for the duration and the
        summary state ends as SUMMARY_DONE or SUMMARY_FAILED_INDEXING.
        """
        print("Updating Run XML: %s_%07d" % (jobName, runNumber))
        requiredDirectories = [self.siteConfig.xmlDirectory(runNumber)]
        status = self.ensureDirectories(requiredDirectories)
        if status != Status.SUCCESS:
            return status
        if self.statsLocked(jobName, runNumber) or self.runLocked(runNumber):
            print("Postponing update of run index")
            return Status.SUCCESS
        status = self.lockStats(jobName, runNumber)
        if status != Status.SUCCESS:
            return status
        status = self.lockRun(runNumber)
        if status != Status.SUCCESS:
            self.unlockStats(jobName, runNumber)
            return status
        try:
            status = self._writeRunIndex(jobName, runNumber)
            if status != Status.SUCCESS:
                self.setState(State.SUMMARY_FAILED_INDEXING, JobType.SUMMARY,
                              jobName, runNumber)
                return status
            self.setState(State.SUMMARY_DONE, JobType.SUMMARY, jobName,
                          runNumber)
            return Status.SUCCESS
        finally:
            self.unlockStats(jobName, runNumber)
            self.unlockRun(runNumber)

    def _writeRunIndex(self, jobName, runNumber):
        """Build this job's run index and move it into place.

        Helper of updateRunIndex; the caller holds the locks.  Returns
        Status.SUCCESS or the status of the first failing command.
        """
        import os
        statsSumFile = self.siteConfig.statsSumFile(jobName, runNumber)
        statsInstallPath = self.siteConfig.statsInstallPath(jobName, runNumber)
        xmlIndexFile = self.siteConfig.xmlIndexFile(runNumber)
        indexTempFile = "%s.tmp.%d" % (xmlIndexFile, os.getpid())
        figPathReplace = self.siteConfig.figuresPathReplace(runNumber)
        command = "$RUNDIAGNOSTICSROOT/share/xmlIndex.py -f %(statsSumFile)s -o %(indexTempFile)s -p %(statsInstallPath)s -r %(figPathReplace)s" % {
            "statsSumFile": statsSumFile,
            "indexTempFile": indexTempFile,
            "statsInstallPath": statsInstallPath,
            "figPathReplace": figPathReplace,
        }
        self.setState(State.SUMMARY_INDEXING, JobType.SUMMARY, jobName,
                      runNumber)
        status = self.runCommand(command)
        if status != Status.SUCCESS:
            return status

        if os.path.isfile(xmlIndexFile):
            # An index already exists for this run: merge the new entries
            # into it, then let the merged result replace the temp file.
            mergedFile = "%s.merge" % indexTempFile
            command = "$RUNDIAGNOSTICSROOT/share/xmlIndex.py -o %(mergedFile)s -i %(xmlIndexFile)s -u %(indexTempFile)s" % {
                "mergedFile": mergedFile,
                "xmlIndexFile": xmlIndexFile,
                "indexTempFile": indexTempFile,
            }
            status = self.runCommand(command)
            if status != Status.SUCCESS:
                return status
            status = self.runCommand("mv %s %s" % (mergedFile, indexTempFile))
            if status != Status.SUCCESS:
                return status

        return self.runCommand("mv %s %s" % (indexTempFile, xmlIndexFile))

    def updateFullIndex(self, runNumber):
        """Make sure this run is present/added to index of all runs.

        Copies any missing static index files into the index directory,
        collects this run's XML index into the main index via a temporary
        file, and moves the result into place, all under the run-index lock.
        """
        import os
        print("Updating Run List XML: %07d" % (runNumber))
        status = self.lockRunIndex()
        if status != Status.SUCCESS:
            # Nothing to release: this method never takes the per-run lock,
            # so removing it here could break a lock held by another process.
            return status
        try:
            xmlIndexFile = self.siteConfig.xmlIndexFile(runNumber)
            mainIndexFile = self.siteConfig.runIndexFile()
            mainFileDir = self.siteConfig.runIndexDir()
            mainTempFile = "%s.tmp.%d" % (mainIndexFile, os.getpid())
            xmlRelativeDir = self.siteConfig.xmlRelativeDir(runNumber)

            # Seed the index directory with its static files when absent.
            requiredFiles = ['runs.xml', 'index.html', 'runs.css', 'runs.js']
            for filename in requiredFiles:
                fullFilePath = mainFileDir + '/' + filename
                if os.path.isfile(fullFilePath):
                    continue
                command = "cp $RUNDIAGNOSTICSROOT/output/%s %s" % (
                    filename, fullFilePath)
                status = self.runCommand(command)
                if status != Status.SUCCESS:
                    return status

            command = "$RUNDIAGNOSTICSROOT/share/xmlCollect.py -o %(mainTempFile)s -p %(xmlRelativeDir)s %(xmlIndexFile)s %(mainIndexFile)s" % {
                "mainTempFile": mainTempFile,
                "xmlIndexFile": xmlIndexFile,
                "mainIndexFile": mainIndexFile,
                "xmlRelativeDir": xmlRelativeDir,
            }
            status = self.runCommand(command)
            if status != Status.SUCCESS:
                return status
            status = self.runCommand("mv %s %s" % (mainTempFile,
                                                   mainIndexFile))
            if status != Status.SUCCESS:
                return status
            return Status.SUCCESS
        finally:
            self.unlockRunIndex()

    def clearSequence(self, jobName, runNumber, seqNumber):
        """Clear the job result and state to allow reprocessing.

        Removes the sequence statistics file if present and appends
        RUN_READY to the RUN state file.
        """
        import os
        statsSeqFile = self.siteConfig.statsSeqFile(jobName, runNumber,
                                                    seqNumber)
        if os.path.isfile(statsSeqFile):
            status = self.runCommand("rm -f %s" % statsSeqFile)
            if status != Status.SUCCESS:
                self.setState(State.STATS_FAILED, JobType.ADDSTATS, jobName,
                              runNumber, seqNumber)
                return status
        return self.setState(State.RUN_READY, JobType.RUN, jobName, runNumber,
                             seqNumber)

    def clearSequenceStats(self, jobName, runNumber, seqNumber):
        """Reset the add-stats state of a sequence so it can be added again."""
        return self.setState(State.STATS_READY, JobType.ADDSTATS, jobName,
                             runNumber, seqNumber)

    def clearSummary(self, jobName, runNumber):
        """Clear the job summary and state to allow reprocessing.

        Removes the run-total statistics file if present and appends
        SUMMARY_READY to the summary state file.
        """
        import os
        statsSumFile = self.siteConfig.statsSumFile(jobName, runNumber)
        if os.path.isfile(statsSumFile):
            status = self.runCommand("rm -f %s" % statsSumFile)
            if status != Status.SUCCESS:
                return status
        # Summary states are kept under JobType.SUMMARY everywhere else
        # (generateFigures, updateRunIndex, printState); write the reset to
        # the same state file so it is actually seen.
        return self.setState(State.SUMMARY_READY, JobType.SUMMARY, jobName,
                             runNumber)

    def printState(self, jobName, runNumber, seqNumber):
        """Print the summary state of the run and the RUN / ADDSTATS state of
        one sequence (or of every sequence when seqNumber is None)."""
        summaryState = self.getState(JobType.SUMMARY, jobName, runNumber)
        print("Run %07d: %s" % (runNumber, State.getName(summaryState.state)))
        if seqNumber is not None:
            currentSequences = [seqNumber]
        else:
            currentSequences = self.sequencesInRun(runNumber)
        for sequence in currentSequences:
            runState = self.getState(JobType.RUN, jobName, runNumber, sequence)
            statsState = self.getState(JobType.ADDSTATS, jobName, runNumber,
                                       sequence)
            print(" Seq %04d: %s %s" % (sequence,
                                        State.getName(runState.state),
                                        State.getName(statsState.state)))
        return Status.SUCCESS

    def resolveRunSequenceFile(self, runNumber, seqNumber, fileName):
        """Resolve the Run Number, Sequence Number, and Filename,
        given a subset of the information.

        With a file name, run and sequence are read from the file
        description and checked against any values passed in.  Without one,
        the catalog entry for runNumber is searched for seqNumber.

        Returns the list [status, runNumber, seqNumber, fileName]; on failure
        the status is Status.FAILURE and the other entries are None.
        """
        badResolve = [Status.FAILURE, None, None, None]
        if fileName is not None:
            fileDesc = FileDescription(fileName)
            if not fileDesc.isValid:
                print("Invalid file:  %s" % (fileName,))
                return badResolve
            if runNumber is not None and runNumber != fileDesc.runNumber:
                print("Error: runNumber %d invalid for file %s" % (runNumber,
                                                                   fileName))
                return badResolve
            if seqNumber is not None and seqNumber != fileDesc.sequence:
                print("Error: sequence %d invalid for file %s" % (seqNumber,
                                                                  fileName))
                return badResolve
            return [Status.SUCCESS, fileDesc.runNumber,
                    fileDesc.sequence, fileName]

        if runNumber is None or seqNumber is None:
            print("Error: Either Filename or Run/Seq must be specified.")
            return badResolve
        import DybPython.Catalog as Catalog
        for fname in Catalog.runs[runNumber]:
            fileDesc = FileDescription(fname)
            if not fileDesc.isValid:
                print("Skipping invalid file from Catalog:  %s" % (fname,))
                continue
            if fileDesc.sequence == seqNumber:
                return [Status.SUCCESS, runNumber, seqNumber, fname]
        print("Error: No file found in Catalog for run %d sequence %d"
              % (runNumber, seqNumber))
        return badResolve

    def makeJobCommand(self, jobName, runNumber, seqNumber, fileName):
        """Make the command to be run for this job.

        Fills the job configuration's command template with the keys
        dataFile, outLog, errLog and statsFile (None for jobs without
        statistics) and returns the resulting string.
        """
        statsFile = None
        if self.jobConfig.hasStats:
            statsFile = self.siteConfig.statsSeqFile(jobName, runNumber,
                                                     seqNumber)
        commandArgs = {
            "dataFile": fileName,
            "outLog": self.siteConfig.logOutFile(jobName, runNumber,
                                                 seqNumber),
            "errLog": self.siteConfig.logErrFile(jobName, runNumber,
                                                 seqNumber),
            "statsFile": statsFile,
        }
        return self.jobConfig.command % commandArgs

    def ensureDirectories(self, directories):
        """Check if directories exist, and make if necessary.

        Returns Status.SUCCESS, or the status of the first failing mkdir.
        """
        import os
        for directory in directories:
            if os.path.exists(directory):
                continue
            # Goes through runCommand so --dry-run creates nothing.
            status = self.runCommand("mkdir -p %s" % directory)
            if status != Status.SUCCESS:
                return status
        return Status.SUCCESS

    # ---- Lock files -------------------------------------------------------

    def setLock(self, lockFile):
        """Request a lockfile with the external 'lockfile' command and return
        that command's status."""
        return self.runCommand("lockfile %s" % lockFile)

    def isLocked(self, lockFile):
        """Check whether a lockfile exists."""
        import os
        return os.path.isfile(lockFile)

    def removeLock(self, lockFile):
        """Remove a lockfile."""
        return self.runCommand("rm -f %s" % lockFile)

    def lockStats(self, jobName, runNumber):
        """Lock statistics."""
        return self.setLock(self.siteConfig.statsLockFile(jobName, runNumber))

    def statsLocked(self, jobName, runNumber):
        """Are statistics locked?"""
        return self.isLocked(self.siteConfig.statsLockFile(jobName, runNumber))

    def unlockStats(self, jobName, runNumber):
        """Unlock statistics."""
        return self.removeLock(self.siteConfig.statsLockFile(jobName,
                                                             runNumber))

    def lockRun(self, runNumber):
        """Lock run summary."""
        return self.setLock(self.siteConfig.runLockFile(runNumber))

    def runLocked(self, runNumber):
        """Is run summary locked?"""
        return self.isLocked(self.siteConfig.runLockFile(runNumber))

    def unlockRun(self, runNumber):
        """Unlock run summary."""
        return self.removeLock(self.siteConfig.runLockFile(runNumber))

    def lockRunIndex(self):
        """Lock index of runs."""
        return self.setLock(self.siteConfig.runIndexLockFile())

    def runIndexLocked(self):
        """Is the index of runs locked?"""
        return self.isLocked(self.siteConfig.runIndexLockFile())

    def unlockRunIndex(self):
        """Unlock index of runs."""
        return self.removeLock(self.siteConfig.runIndexLockFile())

    # ---- Job state files --------------------------------------------------

    def getState(self, jobType, jobName, runNumber, seqNumber=None):
        """Check the state of a job.

        Returns a default-constructed JobState when no state file exists.
        """
        import os
        stateFile = self.siteConfig.stateFile(jobType, jobName, runNumber,
                                              seqNumber)
        if not os.path.isfile(stateFile):
            return JobState()
        return self.getStateFromFile(stateFile)

    def setState(self, newState, jobType, jobName, runNumber, seqNumber=None):
        """Set the state of a job; returns the status of the write."""
        stateFile = self.siteConfig.stateFile(jobType, jobName, runNumber,
                                              seqNumber)
        return self.setStateInFile(stateFile, newState)

    def setStateInFile(self, stateFile, newState):
        """Append '<state name>,<current time>' to the given state file.

        The file is a log: the last line is the current state (see
        getStateFromFile).
        """
        import os
        import time
        stateLogStr = self.stateSep.join([State.getName(newState),
                                          time.ctime()])
        status = self.ensureDirectories([os.path.dirname(stateFile)])
        if status != Status.SUCCESS:
            return status
        # Written through runCommand so --dry-run leaves state untouched.
        return self.runCommand("echo '%s' >> %s" % (stateLogStr, stateFile))

    def getStateFromFile(self, stateFile):
        """Get state from file (Internal function).

        Parses the last line of the file.  An empty or malformed line yields
        a default-constructed JobState.  A time stamp that time.strptime
        cannot parse raises ValueError.
        """
        import time
        lastLine = self.lastLine(stateFile)
        if len(lastLine) == 0:
            return JobState()
        stateParts = lastLine.split(self.stateSep)
        if len(stateParts) != self.stateNParts:
            return JobState()
        state = State.getStateByName(stateParts[0].strip())
        # strptime's default format is the one time.ctime() writes.
        stateTime = time.mktime(time.strptime(stateParts[1].strip()))
        return JobState(state, stateTime)

    def lastLine(self, filename):
        """Get the last line from a file, stripped of surrounding whitespace.

        Returns "" when the file is empty or cannot be read.
        """
        # Read directly instead of shelling out to 'tail': no dependence on
        # an external tool and no shell interpretation of the file name.
        last = ""
        try:
            handle = open(filename)
            try:
                for line in handle:
                    last = line
            finally:
                handle.close()
        except (IOError, OSError):
            return ""
        return last.strip()