| Classes | Job Modules | Data Objects | Services | Algorithms | Tools | Packages | Directories | Tracs |

In This Package:

Manager::Manager Class Reference

List of all members.

Public Member Functions

def __init__
def initializeOptions
def parseOptions
def run
def runOneJob
def runBatch
def runCommand
def sequencesInRun
def processSequence
def addSequenceStats
def generateFigures
def updateRunIndex
def updateFullIndex
def clearSequence
def clearSequenceStats
def clearSummary
def printState
def resolveRunSequenceFile
def makeJobCommand
def ensureDirectories
def setLock
def isLocked
def removeLock
def lockStats
def statsLocked
def unlockStats
def lockRun
def runLocked
def unlockRun
def lockRunIndex
def runIndexLocked
def unlockRunIndex
def getState
def setState
def setStateInFile
def getStateFromFile
def lastLine

Detailed Description

Class which manages NuWa jobs

Definition at line 14 of file Manager.py.


Member Function Documentation

def Manager::Manager::__init__ (   self  ) 

Constructor

Definition at line 16 of file Manager.py.

00016                       :
00017         """Constructor"""
00018         self.jobConfig = None   # set from jobConfigs[job name] by parseOptions()
00019         self.siteConfig = None  # set from siteConfigs[cluster] by parseOptions()
00020         self.stateSep=','       # NOTE(review): presumably the field separator of state-file lines (state helpers not shown)
00021         self.stateNParts=2      # NOTE(review): presumably the number of fields per state line; confirm in getStateFromFile
00022         self.initializeOptions()  # builds self.opts_parser; the arguments are parsed later by parseOptions()
00023         return
00024 
    def initializeOptions(self):

def Manager::Manager::initializeOptions (   self  ) 

Add options to command line parser 

Definition at line 25 of file Manager.py.

00025                                :
00026         """ Add options to command line parser """
00027         from optparse import OptionParser
00028         from DybPython.hostid import hostid  # NOTE(review): 'hostid' is not used in this method
00029         parser = OptionParser(usage=self.__doc__)  # usage text is the class docstring
00030 
00031         parser.add_option("-S", "--summarize-run",action="store_true",  # expanded into the three summary actions by parseOptions()
00032                           default=False, help="Summarize run histograms")
00033 
00034         parser.add_option("", "--print-state",action="store_true",
00035                           default=False, help="Print the job state of a run")
00036 
00037         parser.add_option("", "--clear-sequence",action="store_true",
00038                           default=False, help="Clear NuWa Job state for reprocessing")
00039 
00040         parser.add_option("", "--clear-sequence-stats",action="store_true",
00041                           default=False, help="Clear Add Stats state for readding")
00042 
00043         parser.add_option("", "--clear-summary",action="store_true",
00044                           default=False, help="Clear run summary for reprocessing")
00045 
00046         parser.add_option("", "--run-nuwa",action="store_true",
00047                           default=False, help="Process NuWa Job")
00048 
00049         parser.add_option("", "--add-stats",action="store_true",
00050                           default=False, help="Add root file to run total")
00051 
00052         parser.add_option("", "--make-figures",action="store_true",
00053                           default=False, help="Make figures for this job/run")
00054 
00055         parser.add_option("", "--xml-index",action="store_true",
00056                           default=False, help="Make XML index for this job/run")
00057 
00058         parser.add_option("", "--xml-add-run",action="store_true",
00059                           default=False, help="Add run to site XML index")
00060 
00061         parser.add_option("-j", "--job-name",default=None,type="string",  # must be a key of jobConfigs (checked in parseOptions)
00062                           help="Name of NuWa Job")
00063 
00064         parser.add_option("-r", "--run-number",default=None,type="int",
00065                           help="Run Number for NuWa Job")
00066 
00067         parser.add_option("-A", "--all-sequences",action="store_true",
00068                           default=False, help="Process all sequences in run")
00069 
00070         parser.add_option("-s", "--sequence",default=None,type="int",
00071                           help="File Sequence Number for NuWa Job")
00072 
00073         parser.add_option("-f", "--filename",default=None,type="string",
00074                           help="Data file name for current job")
00075 
00076         parser.add_option("-l", "--log-level",default=3,type="int",
00077                           help="Set output log level")
00078 
00079         parser.add_option("-b", "--batch",action="store_true",default=False,
00080                           help="Send this job to the batch queue")
00081 
00082         parser.add_option("-c", "--cluster",default="pdsf",type="string",  # must be a key of siteConfigs (checked in parseOptions)
00083                           help="Use the configuration for this cluster")
00084 
00085         parser.add_option("-d", "--dry-run",action="store_true",default=False,  # runCommand only prints commands when set
00086                           help="Debug run; don't run any actual jobs")
00087 
00088         self.opts_parser = parser  # consumed by parseOptions()
00089 
00090         return
00091 
00092 
    def parseOptions(self, argv):

def Manager::Manager::parseOptions (   self,
  argv 
)

Parse command line options given to Manager

Definition at line 93 of file Manager.py.

00093                                 :
00094         """Parse command line options given to Manager"""
00095         self.argv = argv  # kept so runBatch can resubmit the same command line
00096         (options, args) = self.opts_parser.parse_args(args=argv)
00097         self.opts = options
00098         self.args = args
00099 
00100         # Translate compound option: -S implies --make-figures, --xml-index and --xml-add-run
00101         if self.opts.summarize_run:
00102             self.opts.make_figures = True
00103             self.opts.xml_index = True
00104             self.opts.xml_add_run = True
00105 
00106         if self.opts.filename!=None or (self.opts.run_number!=None
00107                                         and self.opts.sequence!=None):
00108             # Check for consistency of file, run, sequence
00109             # and fill out any unset parts if possible
00110             [status, run, seq, fname] = self.resolveRunSequenceFile(
00111                 self.opts.run_number,
00112                 self.opts.sequence,
00113                 self.opts.filename)
00114             if status != Status.SUCCESS: return status
00115             self.opts.run_number = run
00116             self.opts.sequence = seq
00117             self.opts.filename = fname
00118 
00119         # Check consistency of command line arguments
00120         if (self.opts.run_nuwa  # every action needs a run number
00121             or self.opts.add_stats
00122             or self.opts.make_figures
00123             or self.opts.xml_index
00124             or self.opts.xml_add_run
00125             or self.opts.clear_sequence
00126             or self.opts.clear_sequence_stats
00127             or self.opts.clear_summary
00128             or self.opts.print_state):
00129             if self.opts.run_number==None:
00130                 print "Error: No run number has been found or provided"
00131                 return Status.FAILURE
00132         if (self.opts.run_nuwa  # every action except --xml-add-run also needs a known job name
00133             or self.opts.add_stats
00134             or self.opts.make_figures
00135             or self.opts.xml_index
00136             or self.opts.clear_sequence
00137             or self.opts.clear_sequence_stats
00138             or self.opts.clear_summary
00139             or self.opts.print_state):            
00140             if self.opts.job_name==None:
00141                 print "Error: A job name must be provided: use '-j'"
00142                 return Status.FAILURE
00143             if jobConfigs.has_key(self.opts.job_name):  # NOTE(review): has_key is Python 2 only; jobConfigs is defined outside this listing
00144                 self.jobConfig = jobConfigs[self.opts.job_name]
00145             else:
00146                 print "Error: Unknown job name '%s'" % self.opts.job_name
00147                 return Status.FAILURE
00148         if (self.opts.run_nuwa  # per-sequence actions need a sequence and a file unless -A is given
00149             or self.opts.clear_sequence
00150             or self.opts.clear_sequence_stats) and not self.opts.all_sequences:
00151             if self.opts.sequence==None:
00152                 print "Error: No sequence number has been found or provided"
00153                 return Status.FAILURE
00154             if self.opts.filename==None:
00155                 print "Error: No data filename has been found or provided"
00156                 return Status.FAILURE
00157 
00158         if self.opts.all_sequences:  # -A excludes -s/-f and requires a per-sequence action
00159             if self.opts.sequence!=None:
00160                 print "Error: Do not set sequence when processing all sequences"
00161                 return Status.FAILURE
00162             if self.opts.filename!=None:
00163                 print "Error: Do not set filename when processing all sequences"
00164                 return Status.FAILURE
00165             if not (self.opts.run_nuwa
00166                     or self.opts.clear_sequence
00167                     or self.opts.clear_sequence_stats):
00168                 print "Error: Requesting 'all-sequences' but not 'run-nuwa', 'clear-sequence', or 'clear-sequence-stats'?"
00169                 return Status.FAILURE
00170 
00171         if siteConfigs.has_key(self.opts.cluster):  # siteConfigs is defined outside this listing
00172             self.siteConfig = siteConfigs[self.opts.cluster]
00173         else:
00174             print "Error: Unknown computer cluster '%s'; add to SiteConfigs" % self.opts.cluster
00175             return Status.FAILURE
00176 
00177         return Status.SUCCESS
00178 
    def run(self):

def Manager::Manager::run (   self  ) 

Run the manager 

Definition at line 179 of file Manager.py.

00179                  :
00180         """ Run the manager """
00181         import os
00182         os.umask(2)  # files/dirs created from here on are group-writable (umask 002)
00183         if self.opts.all_sequences:
00184             # Loop over all sequences in run, submit one job for each
00185             import DybPython.Catalog as Catalog
00186             fileList = Catalog.runs[self.opts.run_number]  # NOTE(review): assumes Catalog.runs maps run number -> file names
00187             for fname in fileList:
00188                 fileDesc = FileDescription(fname)
00189                 if not fileDesc.isValid:
00190                     print "Skipping invalid file: ",fname
00191                     continue
00192                 self.opts.filename = fname  # runOneJob/runBatch read the file and sequence from self.opts
00193                 self.opts.sequence = fileDesc.sequence
00194                 status = self.runOneJob()
00195                 self.opts.filename = None  # reset the per-file options after each job
00196                 self.opts.sequence = None
00197                 if status != Status.SUCCESS: return status
00198         else:
00199             status = self.runOneJob()
00200             if status != Status.SUCCESS: return status
00201         return Status.SUCCESS
00202 
    def runOneJob(self):

def Manager::Manager::runOneJob (   self  ) 

Run a single job 

Definition at line 203 of file Manager.py.

00203                        :
00204         """ Run a single job """
00205         if self.opts.batch:
00206             #  Catch batch submissions, and route to batch queue
00207             return self.runBatch(self.opts.job_name,
00208                                  self.opts.run_number,
00209                                  self.opts.sequence)
00210         # Not a batch job, so start working
00211         if self.opts.print_state:
00212             # Print the state for this job/run/seq
00213             status = self.printState(self.opts.job_name,
00214                                      self.opts.run_number,
00215                                      self.opts.sequence)
00216             if status != Status.SUCCESS: return status
00217         if self.opts.clear_sequence:
00218             # Clear one sequence state for reprocessing
00219             status = self.clearSequence(self.opts.job_name,
00220                                         self.opts.run_number,
00221                                         self.opts.sequence)
00222             if status != Status.SUCCESS: return status
00223             if self.jobConfig.hasStats:
00224                 status = self.clearSequenceStats(self.opts.job_name,
00225                                                  self.opts.run_number,
00226                                                  self.opts.sequence)
00227                 if status != Status.SUCCESS: return status
00228         if self.opts.clear_sequence_stats:
00229             # Clear one sequence stats for reprocessing
00230             if self.jobConfig.hasStats:
00231                 status = self.clearSequenceStats(self.opts.job_name,
00232                                                  self.opts.run_number,
00233                                                  self.opts.sequence)
00234             if status != Status.SUCCESS: return status
00235         if self.opts.clear_summary:
00236             # Clear summary for reprocessing
00237             status = self.clearSummary(self.opts.job_name, self.opts.run_number)
00238             if status != Status.SUCCESS: return status
00239         if self.opts.run_nuwa:
00240             # Process one nuwa job
00241             status = self.processSequence(self.opts.job_name,
00242                                           self.opts.run_number,
00243                                           self.opts.sequence,
00244                                           self.opts.filename)
00245             if status != Status.SUCCESS: return status
00246         if self.opts.add_stats:
00247             # Add root file to run total
00248             status = self.addSequenceStats(self.opts.job_name,
00249                                            self.opts.run_number,
00250                                            self.opts.sequence)
00251             if status != Status.SUCCESS: return status
00252         if self.opts.make_figures:
00253             # Generate current figures for this job
00254             status = self.generateFigures(self.opts.job_name,
00255                                           self.opts.run_number)
00256             if status != Status.SUCCESS: return status
00257         if self.opts.xml_index:
00258             # Update run xml index for this job
00259             status = self.updateRunIndex(self.opts.job_name,
00260                                          self.opts.run_number)
00261             if status != Status.SUCCESS: return status
00262         if self.opts.xml_add_run:
00263             # Add this run to the total index if needed
00264             status = self.updateFullIndex(self.opts.run_number)
00265             if status != Status.SUCCESS: return status
00266         return Status.SUCCESS
00267 
    def runBatch(self, jobName, runNumber, seqNumber):

def Manager::Manager::runBatch (   self,
  jobName,
  runNumber,
  seqNumber 
)

Submit current command to the batch queue

Definition at line 268 of file Manager.py.

00268                                                      :
00269         """Submit current command to the batch queue"""
00270         batchArgs = self.argv[:]
00271         for arg in ['--batch','-b','--all-sequences']:
00272             while arg in batchArgs:
00273                 batchArgs.remove(arg)
00274         if self.opts.all_sequences:
00275             # Insert current sequence number in batch command line
00276             batchArgs += ['-s %d'%self.opts.sequence]
00277         requiredDirectories = [ self.siteConfig.batchLogDirectory(runNumber) ]
00278         status = self.ensureDirectories( requiredDirectories )
00279         if status!=Status.SUCCESS:
00280             return status
00281         # Run current job in batch mode
00282         batchCommand = self.siteConfig.batchCommand(batchArgs, jobName,
00283                                                     runNumber, seqNumber)
00284         self.runCommand( batchCommand )
00285         return Status.SUCCESS
00286 
    def runCommand(self, command):

def Manager::Manager::runCommand (   self,
  command 
)

Run a single shell command

Definition at line 287 of file Manager.py.

00287                                  :
00288         """Run a single shell command"""
00289         print "[Running: ]", command
00290         if not self.opts.dry_run:  # --dry-run: print the command but do not execute it
00291             import subprocess
00292             process = subprocess.Popen(command, shell=True)  # command is a full shell string ($VARS, cp, mv, rm)
00293             return process.wait()  # NOTE(review): callers compare this exit code to Status.SUCCESS, so Status.SUCCESS is assumed to be 0
00294         return Status.SUCCESS
00295 
    def sequencesInRun(self, runNumber):

def Manager::Manager::sequencesInRun (   self,
  runNumber 
)

Get list of sequences in run from Catalog

Definition at line 296 of file Manager.py.

00296                                        :
00297         """Get list of sequences in run from Catalog"""
00298         seqList = []
00299         import DybPython.Catalog as Catalog
00300         fileList = Catalog.runs[self.opts.run_number]
00301         for fname in fileList:
00302             fileDesc = FileDescription(fname)
00303             if not fileDesc.isValid:
00304                 print "Skipping invalid file: ",fname
00305                 continue
00306             seqList.append( fileDesc.sequence )
00307         return seqList
00308 
    def processSequence(self, jobName, runNumber, seqNumber, fileName):

def Manager::Manager::processSequence (   self,
  jobName,
  runNumber,
  seqNumber,
  fileName 
)

Process the NuWa job with the given name for one file

Definition at line 309 of file Manager.py.

00309                                                                       :
00310         """Process the NuWa job with the given name for one file"""
00311         # Run one sequence
00312         print "Processing: %s_%07d_%04d (%s)" % (jobName, runNumber,
00313                                                  seqNumber, fileName)
00314         seqState = self.getState( JobType.RUN, jobName, runNumber, seqNumber)
00315         if seqState.state==State.RUN_DONE:
00316             # Skipping processed sequence
00317             print "processSequence: Run %d seq %d already processed." % (runNumber,
00318                                                                          seqNumber)
00319             return Status.SUCCESS
00320         if seqState.state==State.RUN_FAILED:  # stays skipped until --clear-sequence resets it to RUN_READY
00321             # Skipping problematic sequence
00322             print "processSequence: Run %d seq %d has problems." % (runNumber,
00323                                                                     seqNumber)
00324             return Status.SUCCESS
00325         self.setState( State.RUN_PROCESSING, JobType.RUN, jobName, runNumber, seqNumber)  # mark in progress before running the job
00326         requiredDirectories = [ self.siteConfig.logDirectory(runNumber) ]
00327         if self.jobConfig.hasStats:
00328             # Make sure output directory exists
00329             requiredDirectories += [ self.siteConfig.statsDirectory(runNumber) ]
00330         status = self.ensureDirectories( requiredDirectories )
00331         if status!=Status.SUCCESS:
00332             self.setState( State.RUN_FAILED, JobType.RUN, jobName, runNumber, seqNumber)
00333             return status
00334         command = self.makeJobCommand(jobName, runNumber, seqNumber, fileName)
00335         # Run Command
00336         status = self.runCommand( command )
00337         if status!=Status.SUCCESS:
00338             self.setState( State.RUN_FAILED, JobType.RUN, jobName, runNumber, seqNumber)
00339             return status
00340         self.setState( State.RUN_DONE, JobType.RUN, jobName, runNumber, seqNumber)
00341         return Status.SUCCESS
00342 
    def addSequenceStats(self, jobName, runNumber, seqNumber):

def Manager::Manager::addSequenceStats (   self,
  jobName,
  runNumber,
  seqNumber 
)

Add the sequence histograms to run total

Definition at line 343 of file Manager.py.

00343                                                              :
00344         """Add the sequence histograms to run total"""
00345         # Add ROOT files
00346         status = self.lockStats(jobName, runNumber)  # serialize updates of the run-total stats file
00347         if status!=Status.SUCCESS:
00348             self.setState( State.STATS_FAILED, JobType.ADDSTATS, jobName, runNumber,  # NOTE(review): lock contention marks the sequence failed; seqNumber may be None here
00349                            seqNumber)
00350             return status
00351         currentSequences = []
00352         if seqNumber != None:
00353             # Add only the requested sequence
00354             currentSequences.append(seqNumber)
00355         else:
00356             # Loop over all sequences for this run
00357             currentSequences += self.sequencesInRun(runNumber)
00358         print "Add Sequence Stats: %s_%07d: %s" % (jobName, runNumber,
00359                                                    str(currentSequences))
00360         import os
00361         statsSumFile = self.siteConfig.statsSumFile(jobName, runNumber)
00362         statsTempFile = "%s.tmp.%d" % (statsSumFile, os.getpid())  # per-process temp file, moved over the total on success
00363         # Loop over sequences ROOT output, and add to run sum
00364         status=Status.SUCCESS
00365         for sequence in currentSequences:
00366             seqState = self.getState( JobType.ADDSTATS, jobName, runNumber, sequence)
00367             if seqState.state==State.STATS_DONE:
00368                 # Skipping processed sequence
00369                 print "AddStats: Run %d seq %d already added." % (runNumber, sequence)
00370                 continue
00371             if seqState.state==State.STATS_FAILED:
00372                 # Skipping problematic sequence
00373                 print "AddStats: Run %d seq %d has problems." % (runNumber, sequence)
00374                 continue
00375             self.setState( State.STATS_ADDING, JobType.ADDSTATS, jobName, runNumber,
00376                            sequence)
00377             command = None
00378             statsSeqFile = self.siteConfig.statsSeqFile(jobName, runNumber,
00379                                                         sequence)
00380             if os.path.isfile(statsSumFile):  # merge into the existing total; otherwise this sequence becomes the first total
00381                 command = '$RUNDIAGNOSTICSROOT/share/mergeFiles.py -o %(statsTempFile)s %(statsSumFile)s %(statsSeqFile)s' % {
00382                     "statsTempFile":statsTempFile,
00383                     "statsSumFile":statsSumFile,
00384                     "statsSeqFile":statsSeqFile
00385                     }
00386             else:
00387                 command = "cp %s %s" % (statsSeqFile, statsTempFile)
00388             # Run Command
00389             # Merge/copy into the temp file (state was already set to STATS_ADDING above)
00390             status = self.runCommand( command )
00391             if status!=Status.SUCCESS:
00392                 self.setState( State.STATS_FAILED, JobType.ADDSTATS, jobName, runNumber,
00393                                sequence)
00394                 break
00395             # Move temp file to replace total
00396             command = "mv %s %s" % (statsTempFile, statsSumFile)
00397             status = self.runCommand( command )
00398             if status!=Status.SUCCESS:
00399                 self.setState( State.STATS_FAILED, JobType.ADDSTATS, jobName, runNumber,
00400                                sequence)
00401                 break
00402             self.setState( State.STATS_DONE, JobType.ADDSTATS, jobName, runNumber,
00403                            sequence)
00404         if os.path.isfile(statsTempFile):  # remove a temp file left behind by a failed step
00405             command = "rm %s" % (statsTempFile)
00406             self.runCommand( command )
00407         self.unlockStats(jobName, runNumber)
00408         return status
00409 
    def generateFigures(self, jobName, runNumber):

def Manager::Manager::generateFigures (   self,
  jobName,
  runNumber 
)

Generate figures for this run

Definition at line 410 of file Manager.py.

00410                                                  :
00411         """Generate figures for this run"""
00412         print "Making figures: %s_%07d" % (jobName, runNumber)
00413         if self.opts.add_stats:
00414             # Add a short break to allow other add-stats jobs to proceed
00415             import time
00416             time.sleep(10)
00417         requiredDirectories = [ self.siteConfig.figuresDirectory(runNumber) ]
00418         status = self.ensureDirectories( requiredDirectories )
00419         if status!=Status.SUCCESS:
00420             return status
00421         if self.statsLocked(jobName, runNumber):
00422             # Postpone figure generation for a later time
00423             print "Postponing figure generation"
00424             return Status.SUCCESS
00425         status = self.lockStats(jobName, runNumber)
00426         if status!=Status.SUCCESS:
00427             return status
00428         self.setState( State.SUMMARY_PRINTING, JobType.SUMMARY, jobName, runNumber)
00429         statsSumFile = self.siteConfig.statsSumFile(jobName, runNumber)
00430         figuresDir = self.siteConfig.figuresDirectory(runNumber)
00431         command = "$RUNDIAGNOSTICSROOT/share/figures.py -o %(figuresDir)s -r stats/diagnostics/run_%(runNumber)07d %(statsSumFile)s" % {
00432             "figuresDir":figuresDir,
00433             "runNumber":runNumber,
00434             "statsSumFile":statsSumFile
00435             }
00436         # Run Command
00437         status = self.runCommand( command )
00438         if status!=Status.SUCCESS:
00439             self.setState( State.SUMMARY_FAILED_PRINTING, JobType.SUMMARY,
00440                            jobName, runNumber)
00441             status = self.unlockStats(jobName, runNumber)
00442             return status
00443         self.setState( State.SUMMARY_PRINTED, JobType.SUMMARY, jobName, runNumber)
00444         status = self.unlockStats(jobName, runNumber)
00445         return status
00446 
    def updateRunIndex(self, jobName, runNumber):

def Manager::Manager::updateRunIndex (   self,
  jobName,
  runNumber 
)

Update/Create XML index for this run, with results from Job Name

Definition at line 447 of file Manager.py.

00447                                                 :
00448         """Update/Create XML index for this run, with results from Job Name"""
00449         import os
00450         print "Updating Run XML: %s_%07d" % (jobName, runNumber)
00451         requiredDirectories = [ self.siteConfig.xmlDirectory(runNumber) ]
00452         status = self.ensureDirectories( requiredDirectories )
00453         if status!=Status.SUCCESS:
00454             return status        
00455         if self.statsLocked(jobName, runNumber) or self.runLocked(runNumber):
00456             # Postpone run summary for a later time
00457             print "Postponing update of run index"
00458             return Status.SUCCESS
00459         status = self.lockStats(jobName, runNumber)  # both the stats lock and the run lock are held until the end
00460         if status!=Status.SUCCESS:
00461             return status
00462         status = self.lockRun(runNumber)
00463         if status!=Status.SUCCESS:
00464             self.unlockStats(jobName, runNumber)
00465             return status
00466         statsSumFile = self.siteConfig.statsSumFile(jobName, runNumber)
00467         statsInstallPath = self.siteConfig.statsInstallPath(jobName, runNumber)
00468         xmlIndexFile = self.siteConfig.xmlIndexFile(runNumber)
00469         indexTempFile = "%s.tmp.%d" % (xmlIndexFile, os.getpid())  # per-process temp index
00470         figPathReplace = self.siteConfig.figuresPathReplace(runNumber)
00471         command = "$RUNDIAGNOSTICSROOT/share/xmlIndex.py -f %(statsSumFile)s -o %(indexTempFile)s -p %(statsInstallPath)s -r %(figPathReplace)s" % {
00472             "statsSumFile":statsSumFile,
00473             "indexTempFile":indexTempFile,
00474             "statsInstallPath":statsInstallPath,
00475             "figPathReplace":figPathReplace
00476             }
00477         # Run Command
00478         self.setState( State.SUMMARY_INDEXING, JobType.SUMMARY, jobName, runNumber)
00479         status = self.runCommand( command )
00480         if status!=Status.SUCCESS:  # NOTE(review): temp files are not removed on any failure path
00481             self.setState( State.SUMMARY_FAILED_INDEXING, JobType.SUMMARY,
00482                            jobName, runNumber)
00483             self.unlockStats(jobName, runNumber)
00484             self.unlockRun(runNumber)
00485             return status
00486         import os  # redundant: os is already imported at the start of this method
00487         if os.path.isfile(xmlIndexFile):
00488             # Merge new results into current index
00489             mergedFile = "%s.merge" % indexTempFile
00490             command = "$RUNDIAGNOSTICSROOT/share/xmlIndex.py -o %(mergedFile)s -i %(xmlIndexFile)s -u %(indexTempFile)s" % {
00491             "mergedFile":mergedFile,
00492             "xmlIndexFile":xmlIndexFile,
00493             "indexTempFile":indexTempFile
00494             }
00495             status = self.runCommand( command )
00496             if status!=Status.SUCCESS:
00497                 self.setState( State.SUMMARY_FAILED_INDEXING, JobType.SUMMARY,
00498                                jobName, runNumber)
00499                 self.unlockStats(jobName, runNumber)
00500                 self.unlockRun(runNumber)
00501                 return status
00502             # Replace the temp index with the merged index
00503             command = "mv %s %s" % (mergedFile, indexTempFile)
00504             status = self.runCommand( command )
00505             if status!=Status.SUCCESS:
00506                 self.setState( State.SUMMARY_FAILED_INDEXING, JobType.SUMMARY,
00507                                jobName, runNumber)
00508                 self.unlockStats(jobName, runNumber)
00509                 self.unlockRun(runNumber)
00510                 return status
00511         # Move temp file to replace index
00512         command = "mv %s %s" % (indexTempFile, xmlIndexFile)
00513         status = self.runCommand( command )
00514         if status!=Status.SUCCESS:
00515             self.setState( State.SUMMARY_FAILED_INDEXING, JobType.SUMMARY,
00516                            jobName, runNumber)
00517             self.unlockStats(jobName, runNumber)
00518             self.unlockRun(runNumber)
00519             return status        
00520         self.setState( State.SUMMARY_DONE, JobType.SUMMARY, jobName, runNumber)
00521         self.unlockStats(jobName, runNumber)
00522         self.unlockRun(runNumber)
00523         return Status.SUCCESS
00524 
    def updateFullIndex(self, runNumber):

def Manager::Manager::updateFullIndex (   self,
  runNumber 
)

Make sure this run is present/added to index of all runs

Definition at line 525 of file Manager.py.

00525                                         :
00526         """Make sure this run is present/added to index of all runs"""
00527         # Add/Verify Run is in index of all runs
00528         import os
00529         print "Updating Run List XML: %07d" % (runNumber)
00530         status = self.lockRunIndex()
00531         if status!=Status.SUCCESS:
00532             self.unlockRun(runNumber)
00533             return status
00534         xmlIndexFile = self.siteConfig.xmlIndexFile(runNumber)
00535         mainIndexFile = self.siteConfig.runIndexFile()
00536         mainFileDir = self.siteConfig.runIndexDir()
00537         mainTempFile = "%s.tmp.%d" % (mainIndexFile, os.getpid())
00538         xmlRelativeDir = self.siteConfig.xmlRelativeDir(runNumber)
00539         requiredFiles = ['runs.xml','index.html','runs.css','runs.js']
00540         for filename in requiredFiles:
00541             fullFilePath = mainFileDir+ '/' + filename
00542             if not os.path.isfile(fullFilePath):
00543                 # Make main index file
00544                 command = "cp $RUNDIAGNOSTICSROOT/output/%s %s" % (
00545                     filename,
00546                     fullFilePath
00547                     )
00548                 status = self.runCommand( command )
00549                 if status!=Status.SUCCESS:
00550                     self.unlockRunIndex()
00551                     return status
00552         # Run Command
00553         command = "$RUNDIAGNOSTICSROOT/share/xmlCollect.py -o %(mainTempFile)s -p %(xmlRelativeDir)s %(xmlIndexFile)s  %(mainIndexFile)s" % {
00554             "mainTempFile":mainTempFile,
00555             "xmlIndexFile":xmlIndexFile,
00556             "mainIndexFile":mainIndexFile,
00557             "xmlRelativeDir":xmlRelativeDir
00558             }
00559         status = self.runCommand( command )
00560         if status!=Status.SUCCESS:
00561             self.unlockRunIndex()
00562             return status
00563         command = "mv %s %s" % (mainTempFile, mainIndexFile)
00564         status = self.runCommand( command )
00565         if status!=Status.SUCCESS:
00566             self.unlockRunIndex()
00567             return status
00568         self.unlockRunIndex()
00569         return Status.SUCCESS
00570 
    def clearSequence(self, jobName, runNumber, seqNumber):

def Manager::Manager::clearSequence (   self,
  jobName,
  runNumber,
  seqNumber 
)

Clear the job result and state to allow reprocessing

Definition at line 571 of file Manager.py.

00571                                                           :
00572         """Clear the job result and state to allow reprocessing"""
00573         import os
00574         statsSeqFile = self.siteConfig.statsSeqFile(jobName, runNumber,
00575                                                     seqNumber)
00576         if os.path.isfile(statsSeqFile):  # remove this sequence's stats output, if present
00577             command = "rm -f %s" % statsSeqFile
00578             status = self.runCommand( command )
00579             if status!=Status.SUCCESS:  # a failed delete is recorded as an add-stats failure
00580                 self.setState( State.STATS_FAILED, JobType.ADDSTATS, jobName,
00581                               runNumber, seqNumber) 
00582                 return status
00583         return self.setState( State.RUN_READY, JobType.RUN, jobName, runNumber,
00584                               seqNumber) 
00585 
    def clearSequenceStats(self, jobName, runNumber, seqNumber):

def Manager::Manager::clearSequenceStats (   self,
  jobName,
  runNumber,
  seqNumber 
)

Clear the statistics (ADDSTATS) job state for this sequence to allow reprocessing

Definition at line 586 of file Manager.py.

00586                                                                :
00587         """Clear the job state to allow reprocessing"""
00588         return self.setState( State.STATS_READY, JobType.ADDSTATS, jobName,
00589                               runNumber, seqNumber) 
00590 
    def clearSummary(self, jobName, runNumber):

def Manager::Manager::clearSummary (   self,
  jobName,
  runNumber 
)

Clear the job summary and state to allow reprocessing

Definition at line 591 of file Manager.py.

00591                                               :
00592         """Clear the job summary and state to allow reprocessing"""
00593         import os
00594         statsSumFile = self.siteConfig.statsSumFile(jobName, runNumber)
00595         if os.path.isfile(statsSumFile):
00596             command = "rm -f %s" % statsSumFile
00597             status = self.runCommand( command )
00598             if status!=Status.SUCCESS:
00599                 return status
00600         return self.setState( State.SUMMARY_READY, JobType.ADDSTATS, jobName,
00601                               runNumber)
00602 
    def printState(self, jobName, runNumber, seqNumber):

def Manager::Manager::printState (   self,
  jobName,
  runNumber,
  seqNumber 
)

Print state information for this job/run/sequence

Definition at line 603 of file Manager.py.

00603                                                        :
00604         """Print State information for this job/run/sequence"""
00605         summaryState = self.getState(JobType.SUMMARY, jobName, runNumber)
00606         print "Run %07d: %s" % (runNumber, State.getName(summaryState.state))
00607         currentSequences = []
00608         if seqNumber != None:
00609             # Add only the requested sequence
00610             currentSequences.append(seqNumber)
00611         else:
00612             # Loop over all sequences for this run
00613             currentSequences += self.sequencesInRun(runNumber)
00614         for sequence in currentSequences:
00615             runState = self.getState(JobType.RUN, jobName, runNumber, sequence)
00616             statsState = self.getState(JobType.ADDSTATS, jobName, runNumber,
00617                                        sequence)
00618             print " Seq %04d: %s %s" % (sequence,
00619                                         State.getName(runState.state),
00620                                         State.getName(statsState.state)) 
00621         return Status.SUCCESS
00622 
    def resolveRunSequenceFile(self, runNumber, seqNumber, fileName):

def Manager::Manager::resolveRunSequenceFile (   self,
  runNumber,
  seqNumber,
  fileName 
)

Resolve the Run Number, Sequence Number, and Filename,
given a subset of the information.

Definition at line 623 of file Manager.py.

00623                                                                     :
00624         """Resolve the Run Number, Sequence Number, and Filename,
00625         given a subset of the information.
00626         """
00627         badResolve = [Status.FAILURE, None, None, None]
00628         if fileName!=None:
00629             # Check if filename is consistent with supplied run/seq numbers
00630             fileDesc = FileDescription(fileName)
00631             if not fileDesc.isValid:
00632                 print "Invalid file: ",fileName
00633                 return badResolve
00634             if runNumber!=None and runNumber!=fileDesc.runNumber:
00635                 print "Error: runNumber %d invalid for file %s"
00636                 return badResolve
00637             if seqNumber!=None and seqNumber!=fileDesc.sequence:
00638                 print "Error: runNumber %d invalid for file %s"
00639                 return badResolve
00640             return [Status.SUCCESS, fileDesc.runNumber,
00641                     fileDesc.sequence, fileName] 
00642         else:
00643             # Lookup filename from catalog
00644             if runNumber==None or seqNumber==None:
00645                 print "Error: Either Filename or Run/Seq must be specified."
00646                 return badResolve
00647             import DybPython.Catalog as Catalog
00648             fileList = Catalog.runs[runNumber]
00649             for fname in fileList:
00650                 fileDesc = FileDescription(fname)
00651                 if not fileDesc.isValid:
00652                     print "Skipping invalid file from Catalog: ",fname
00653                     continue
00654                 if fileDesc.sequence==seqNumber:
00655                     # Found a file match for run/seq number
00656                     return [Status.SUCCESS, runNumber, seqNumber, fname]
00657         return badResolve
00658 
    def makeJobCommand(self, jobName, runNumber, seqNumber, fileName):

def Manager::Manager::makeJobCommand (   self,
  jobName,
  runNumber,
  seqNumber,
  fileName 
)

Make the command to be run for this job 

Definition at line 659 of file Manager.py.

00659                                                                      :
00660         """ Make the command to be run for this job """
00661         # Prepare Job Command
00662         commandArgs = {}
00663         commandArgs["dataFile"] = fileName
00664         commandArgs["outLog"] = self.siteConfig.logOutFile(jobName,
00665                                                            runNumber,
00666                                                            seqNumber)
00667         commandArgs["errLog"] = self.siteConfig.logErrFile(jobName,
00668                                                            runNumber,
00669                                                            seqNumber)
00670         statsFile = None
00671         if self.jobConfig.hasStats:
00672             statsFile = self.siteConfig.statsSeqFile(jobName, runNumber,
00673                                                      seqNumber)
00674             commandArgs["statsFile"] = statsFile
00675         command = self.jobConfig.command % commandArgs
00676         return command
00677 
    def ensureDirectories(self, directories):

def Manager::Manager::ensureDirectories (   self,
  directories 
)

Check whether the directories exist, and create any that are missing

Definition at line 678 of file Manager.py.

00678                                             :
00679         """Check if directories exist, and make if necessary"""
00680         import os
00681         for directory in directories:
00682             if not os.path.exists(directory):
00683                 command = "mkdir -p %s" % directory
00684                 status = self.runCommand( command )
00685                 if status!=Status.SUCCESS:
00686                     return status
00687         return Status.SUCCESS        
00688 
    ### Functions for handling job locks #################################

def Manager::Manager::setLock (   self,
  lockFile 
)

Request a lock file, and wait until the lock is obtained

Definition at line 691 of file Manager.py.

00691                                :
00692         """ Request a lockfile, and wait for lock """
00693         command = "lockfile %s" % lockFile
00694         return self.runCommand(command)
00695 
    def isLocked(self, lockFile):

def Manager::Manager::isLocked (   self,
  lockFile 
)

Check whether a lockfile exists 

Definition at line 696 of file Manager.py.

00696                                 :
00697         """ Check whether a lockfile exists """
00698         import os
00699         return os.path.isfile(lockFile)
00700 
    def removeLock(self, lockFile):

def Manager::Manager::removeLock (   self,
  lockFile 
)

Remove a lockfile 

Definition at line 701 of file Manager.py.

00701                                   :
00702         """ Remove a lockfile """
00703         command = "rm -f %s" % lockFile
00704         return self.runCommand(command)
00705 
    def lockStats(self, jobName, runNumber):

def Manager::Manager::lockStats (   self,
  jobName,
  runNumber 
)

Lock statistics 

Definition at line 706 of file Manager.py.

00706                                            :
00707         """ Lock statistics """
00708         statsLockFile = self.siteConfig.statsLockFile(jobName, runNumber)
00709         return self.setLock( statsLockFile )
00710 
    def statsLocked(self, jobName, runNumber):

def Manager::Manager::statsLocked (   self,
  jobName,
  runNumber 
)

Are statistics locked? 

Definition at line 711 of file Manager.py.

00711                                              :
00712         """ Are statistics locked? """
00713         statsLockFile = self.siteConfig.statsLockFile(jobName, runNumber)
00714         return self.isLocked( statsLockFile )
00715 
    def unlockStats(self, jobName, runNumber):

def Manager::Manager::unlockStats (   self,
  jobName,
  runNumber 
)

Unlock statistics 

Definition at line 716 of file Manager.py.

00716                                              :
00717         """ Unlock statistics """
00718         statsLockFile = self.siteConfig.statsLockFile(jobName, runNumber)
00719         return self.removeLock( statsLockFile )
00720 
    def lockRun(self, runNumber):

def Manager::Manager::lockRun (   self,
  runNumber 
)

Lock run summary 

Definition at line 721 of file Manager.py.

00721                                 :
00722         """ Lock run summary """
00723         runLockFile = self.siteConfig.runLockFile(runNumber)
00724         return self.setLock( runLockFile )
00725 
    def runLocked(self, runNumber):

def Manager::Manager::runLocked (   self,
  runNumber 
)

Is run summary locked? 

Definition at line 726 of file Manager.py.

00726                                   :
00727         """ Is run summary locked? """
00728         runLockFile = self.siteConfig.runLockFile(runNumber)
00729         return self.isLocked( runLockFile )
00730 
    def unlockRun(self, runNumber):

def Manager::Manager::unlockRun (   self,
  runNumber 
)

Unlock run summary 

Definition at line 731 of file Manager.py.

00731                                   :
00732         """ Unlock run summary """
00733         runLockFile = self.siteConfig.runLockFile(runNumber)
00734         return self.removeLock( runLockFile )
00735 
    def lockRunIndex(self):

def Manager::Manager::lockRunIndex (   self  ) 

Lock index of runs 

Definition at line 736 of file Manager.py.

00736                           :
00737         """ Lock index of runs """
00738         lockFile = self.siteConfig.runIndexLockFile()
00739         return self.setLock( lockFile )
00740 
    def runIndexLocked(self):

def Manager::Manager::runIndexLocked (   self  ) 

Is the run index locked?

Definition at line 741 of file Manager.py.

00741                             :
00742         """ Is run summary locked? """
00743         lockFile = self.siteConfig.runIndexLockFile()
00744         return self.isLocked( lockFile )
00745     
    def unlockRunIndex(self):

def Manager::Manager::unlockRunIndex (   self  ) 

Unlock the run index

Definition at line 746 of file Manager.py.

00746                             :
00747         """ Unlock run summary """
00748         lockFile = self.siteConfig.runIndexLockFile()
00749         return self.removeLock( lockFile )
00750 
    ### Functions for handling job state #################################

def Manager::Manager::getState (   self,
  jobType,
  jobName,
  runNumber,
  seqNumber = None 
)

Check the state of a job 

Definition at line 753 of file Manager.py.

00753                                                                    :
00754         """ Check the state of a job """
00755         import os
00756         stateFile = self.siteConfig.stateFile(jobType, jobName, runNumber, seqNumber)
00757         if not os.path.isfile( stateFile ):
00758             # Job is unknown
00759             return JobState()
00760         return self.getStateFromFile( stateFile )
00761 
    def setState(self, newState, jobType, jobName, runNumber, seqNumber=None):

def Manager::Manager::setState (   self,
  newState,
  jobType,
  jobName,
  runNumber,
  seqNumber = None 
)

Set the state of a job 

Definition at line 762 of file Manager.py.

00762                                                                              :
00763         """ Set the state of a job """
00764         stateFile = self.siteConfig.stateFile(jobType, jobName, runNumber, seqNumber)
00765         return self.setStateInFile(stateFile, newState)
00766 
    def setStateInFile(self, stateFile, newState):

def Manager::Manager::setStateInFile (   self,
  stateFile,
  newState 
)

Set the job state in the given state file

Definition at line 767 of file Manager.py.

00767                                                  :
00768         """Set the job state in the given state file"""
00769         stateName = State.getName( newState )
00770         import time, datetime
00771         stateTimeStr = time.ctime() # Note: this is localtime
00772         stateLogStr = self.stateSep.join([stateName, stateTimeStr])
00773         import os
00774         status = self.ensureDirectories( [os.path.dirname( stateFile )] )
00775         if status != Status.SUCCESS:
00776             return status
00777         command = "echo '%s' >> %s" % (stateLogStr, stateFile)
00778         return self.runCommand( command )
00779 
    def getStateFromFile(self, stateFile):

def Manager::Manager::getStateFromFile (   self,
  stateFile 
)

Get state from file (Internal function) 

Definition at line 780 of file Manager.py.

00780                                          :
00781         """ Get state from file (Internal function) """
00782         lastLine = self.lastLine(stateFile)
00783         if len(lastLine)==0:
00784             return JobState()
00785         stateParts = lastLine.split(self.stateSep)
00786         if len(stateParts) != self.stateNParts:
00787             return JobState()
00788         # State
00789         stateStr = stateParts[0].strip()
00790         state = State.getStateByName(stateStr)
00791         # State time
00792         stateTimeStr = stateParts[1].strip()
00793         import time
00794         ttuple = time.strptime( stateTimeStr )
00795         stateTime = time.mktime( ttuple )
00796         return JobState(state, stateTime)
00797 
    def lastLine(self, filename):

def Manager::Manager::lastLine (   self,
  filename 
)

Get the last line from a file

Definition at line 798 of file Manager.py.

00798                                 :
00799         """Get the last line from a file"""
00800         import subprocess
00801         tailProc = subprocess.Popen("tail -n 1 %s" % filename,
00802                                     stdout=subprocess.PIPE,
00803                                     shell=True)
00804         (out, err) = tailProc.communicate()
00805         status = tailProc.wait()
00806         if status!=Status.SUCCESS or type(out)!=str:
00807             return ""
00808         return out.strip()
00809     
    


The documentation for this class was generated from the following file: Manager.py
| Classes | Job Modules | Data Objects | Services | Algorithms | Tools | Packages | Directories | Tracs |

Generated on Mon Apr 11 20:10:14 2011 for ProcessManager by doxygen 1.4.7