Source code for run_mpas_analysis

#!/usr/bin/env python
# Copyright (c) 2017, Los Alamos National Security, LLC (LANS)
# and the University Corporation for Atmospheric Research (UCAR).
#
# Unless noted otherwise source code is licensed under the BSD license.
# Additional copyright and license information can be found in the LICENSE file
# distributed with this code, or at http://mpas-dev.github.com/license.html
#

"""
Runs MPAS-Analysis via a configuration file (e.g. `config.analysis`)
specifying analysis options.
"""
# Authors
# -------
# Xylar Asay-Davis, Phillip J. Wolfram
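
# Example invocations (a minimal sketch: the config file name follows the
# docstring above, and the flags are defined by the argument parser at the
# bottom of this script; the generate values shown are hypothetical):
#
#     python run_mpas_analysis config.analysis
#     python run_mpas_analysis --list
#     python run_mpas_analysis -g all_timeSeries,no_index config.analysis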

from __future__ import absolute_import, division, print_function, \
    unicode_literals

import matplotlib as mpl
import argparse
import traceback
import sys
import pkg_resources
import shutil
import os
from collections import OrderedDict

from mpas_analysis.configuration import MpasAnalysisConfigParser

from mpas_analysis.shared.io.utility import build_config_full_path, \
    make_directories

from mpas_analysis.shared.html import generate_html

from mpas_analysis.shared import AnalysisTask


def build_analysis_list(config, refConfig):  # {{{
    """
    Build a list of analysis tasks. New tasks should be added here, following
    the approach used for existing analysis tasks.

    Parameters
    ----------
    config : ``MpasAnalysisConfigParser`` object
        contains config options

    refConfig : ``MpasAnalysisConfigParser`` object
        contains config options for a reference run, or ``None`` if no config
        file for a reference run was specified

    Returns
    -------
    analyses : list of ``AnalysisTask`` objects
        A list of all analysis tasks
    """
    # Authors
    # -------
    # Xylar Asay-Davis

    # choose the right rendering backend, depending on whether we're
    # displaying to the screen
    if not config.getboolean('plot', 'displayToScreen'):
        mpl.use('Agg')

    # analysis can only be imported after the right MPL renderer is selected
    from mpas_analysis import ocean
    from mpas_analysis import sea_ice
    from mpas_analysis.shared.climatology import MpasClimatologyTask, \
        RefYearMpasClimatologyTask
    from mpas_analysis.shared.time_series import MpasTimeSeriesTask

    analyses = []

    # Ocean Analyses
    oceanClimatologyTask = MpasClimatologyTask(config=config,
                                               componentName='ocean')
    oceanTimeSeriesTask = MpasTimeSeriesTask(config=config,
                                             componentName='ocean')
    oceanIndexTask = MpasTimeSeriesTask(config=config,
                                        componentName='ocean',
                                        section='index')

    oceanRefYearClimatologyTask = RefYearMpasClimatologyTask(
        config=config, componentName='ocean')

    analyses.append(oceanClimatologyTask)
    analyses.append(oceanRefYearClimatologyTask)

    analyses.append(ocean.ClimatologyMapMLD(config, oceanClimatologyTask,
                                            refConfig))
    analyses.append(ocean.ClimatologyMapSST(config, oceanClimatologyTask,
                                            refConfig))
    analyses.append(ocean.ClimatologyMapSSS(config, oceanClimatologyTask,
                                            refConfig))
    analyses.append(ocean.ClimatologyMapSSH(config, oceanClimatologyTask,
                                            refConfig))
    analyses.append(ocean.ClimatologyMapOHCAnomaly(
        config, oceanClimatologyTask, oceanRefYearClimatologyTask, refConfig))
    analyses.append(ocean.ClimatologyMapSoseTemperature(
        config, oceanClimatologyTask, refConfig))
    analyses.append(ocean.ClimatologyMapSoseSalinity(
        config, oceanClimatologyTask, refConfig))
    analyses.append(ocean.ClimatologyMapSoseMLD(
        config, oceanClimatologyTask, refConfig))
    analyses.append(ocean.ClimatologyMapSoseZonalVel(
        config, oceanClimatologyTask, refConfig))
    analyses.append(ocean.ClimatologyMapSoseMeridVel(
        config, oceanClimatologyTask, refConfig))
    analyses.append(ocean.ClimatologyMapSoseVelMag(
        config, oceanClimatologyTask, refConfig))
    analyses.append(ocean.ClimatologyMapArgoTemperature(
        config, oceanClimatologyTask, refConfig))
    analyses.append(ocean.ClimatologyMapArgoSalinity(
        config, oceanClimatologyTask, refConfig))
    analyses.append(ocean.ClimatologyMapSchmidtko(
        config, oceanClimatologyTask, refConfig))

    analyses.append(ocean.ClimatologyMapAntarcticMelt(config,
                                                      oceanClimatologyTask,
                                                      refConfig))
    analyses.append(ocean.TimeSeriesAntarcticMelt(config, oceanTimeSeriesTask,
                                                  refConfig))

    analyses.append(ocean.TimeSeriesTemperatureAnomaly(config,
                                                       oceanTimeSeriesTask))
    analyses.append(ocean.TimeSeriesSalinityAnomaly(config,
                                                    oceanTimeSeriesTask))
    analyses.append(ocean.TimeSeriesOHCAnomaly(config, oceanTimeSeriesTask,
                                               refConfig))
    analyses.append(ocean.TimeSeriesSST(config, oceanTimeSeriesTask,
                                        refConfig))
    analyses.append(ocean.MeridionalHeatTransport(config,
                                                  oceanClimatologyTask,
                                                  refConfig))
    analyses.append(ocean.StreamfunctionMOC(config, oceanClimatologyTask,
                                            refConfig))
    analyses.append(ocean.IndexNino34(config, oceanIndexTask, refConfig))

    # Sea Ice Analyses
    seaIceClimatologyTask = MpasClimatologyTask(config=config,
                                                componentName='seaIce')
    seaIceTimeSeriesTask = MpasTimeSeriesTask(config=config,
                                              componentName='seaIce')

    analyses.append(seaIceClimatologyTask)
    analyses.append(sea_ice.ClimatologyMapSeaIceConc(
        config=config, mpasClimatologyTask=seaIceClimatologyTask,
        hemisphere='NH', refConfig=refConfig))
    analyses.append(sea_ice.ClimatologyMapSeaIceThick(
        config=config, mpasClimatologyTask=seaIceClimatologyTask,
        hemisphere='NH', refConfig=refConfig))
    analyses.append(sea_ice.ClimatologyMapSeaIceConc(
        config=config, mpasClimatologyTask=seaIceClimatologyTask,
        hemisphere='SH', refConfig=refConfig))
    analyses.append(sea_ice.ClimatologyMapSeaIceThick(
        config=config, mpasClimatologyTask=seaIceClimatologyTask,
        hemisphere='SH', refConfig=refConfig))
    analyses.append(seaIceTimeSeriesTask)
    analyses.append(sea_ice.TimeSeriesSeaIce(config, seaIceTimeSeriesTask,
                                             refConfig))

    return analyses  # }}}
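
# A sketch of how a new analysis task would be registered in
# build_analysis_list(), following the pattern above
# (ocean.ClimatologyMapNewField is a hypothetical class name used purely
# for illustration):
#
#     analyses.append(ocean.ClimatologyMapNewField(config,
#                                                  oceanClimatologyTask,
#                                                  refConfig))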


def determine_analyses_to_generate(analyses):  # {{{
    """
    Build an ordered dictionary of analysis tasks to run based on the
    'generate' config option (or command-line flag) and the prerequisites and
    subtasks of each requested task.  Each task's ``setup_and_check`` method
    is called in the process.

    Parameters
    ----------
    analyses : list of ``AnalysisTask`` objects
        A list of all analysis tasks

    Returns
    -------
    analysesToGenerate : ``OrderedDict`` of ``AnalysisTask`` objects
        A dictionary of analysis tasks to run
    """
    # Authors
    # -------
    # Xylar Asay-Davis

    analysesToGenerate = OrderedDict()
    # check which analysis we actually want to generate and only keep those
    for analysisTask in analyses:
        # update the dictionary with this task and perhaps its subtasks
        add_task_and_subtasks(analysisTask, analysesToGenerate)

    return analysesToGenerate  # }}}
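
# The result is keyed by (taskName, subtaskName) tuples (see
# add_task_and_subtasks() below).  A minimal, hypothetical sketch of
# inspecting what will be generated:
#
#     analysesToGenerate = determine_analyses_to_generate(analyses)
#     for (taskName, subtaskName), task in analysesToGenerate.items():
#         print(taskName, subtaskName)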


def add_task_and_subtasks(analysisTask, analysesToGenerate,
                          callCheckGenerate=True):  # {{{
    """
    If a task has been requested through the generate config option or
    if it is a prerequisite of a requested task, add it to the dictionary of
    tasks to generate.

    Parameters
    ----------
    analysisTask : ``AnalysisTask``
        A task to be added

    analysesToGenerate : ``OrderedDict`` of ``AnalysisTask``
        The dictionary of analysis tasks to be generated, which this call may
        update to include this task and its subtasks

    callCheckGenerate : bool
        Whether the ``check_generate`` method should be called for this task
        to see if it has been requested.  We skip this for subtasks and
        prerequisites, since they are needed by another task regardless of
        whether the user specifically requested them.
    """
    # Authors
    # -------
    # Xylar Asay-Davis

    key = (analysisTask.taskName, analysisTask.subtaskName)
    if key in analysesToGenerate.keys():
        # the task was already added
        assert(analysisTask._setupStatus == 'success')
        return

    # for each analysis task, check if we want to generate this task
    # and if the analysis task has a valid configuration
    taskTitle = analysisTask.printTaskName

    if callCheckGenerate and not analysisTask.check_generate():
        # we don't need to add this task -- it wasn't requested
        return

    # first, we should try to add the prerequisites of this task and its
    # subtasks (if they aren't also subtasks for this task)
    prereqs = list(analysisTask.runAfterTasks)
    for subtask in analysisTask.subtasks:
        for prereq in subtask.runAfterTasks:
            if prereq not in analysisTask.subtasks:
                prereqs.append(prereq)

    for prereq in prereqs:
        add_task_and_subtasks(prereq, analysesToGenerate,
                              callCheckGenerate=False)
        if prereq._setupStatus != 'success':
            # a prereq failed setup_and_check
            print("ERROR: prerequisite task {} of analysis task {}"
                  " failed during check,\n"
                  "       so this task will not be run".format(
                      prereq.printTaskName, taskTitle))
            analysisTask._setupStatus = 'fail'
            return

    # make sure all prereqs have been set up successfully before trying to
    # set up this task -- this task's setup may depend on setup in the prereqs
    try:
        analysisTask.setup_and_check()
    except (Exception, BaseException):
        traceback.print_exc(file=sys.stdout)
        print("ERROR: analysis task {} failed during check and "
              "will not be run".format(taskTitle))
        analysisTask._setupStatus = 'fail'
        return

    # next, we should try to add the subtasks.  This is done after the
    # current analysis task has been set up in case subtasks depend on
    # information from the parent task
    for subtask in analysisTask.subtasks:
        add_task_and_subtasks(subtask, analysesToGenerate,
                              callCheckGenerate=False)
        if subtask._setupStatus != 'success':
            # a subtask failed setup_and_check
            print("ERROR: a subtask of analysis task {}"
                  " failed during check,\n"
                  "       so this task will not be run".format(taskTitle))
            analysisTask._setupStatus = 'fail'
            return

    analysesToGenerate[key] = analysisTask
    analysisTask._setupStatus = 'success'
    # }}}
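
# To summarize add_task_and_subtasks(): a task lands in analysesToGenerate
# only if each of its prerequisites was added recursively and set up
# successfully, its own setup_and_check() succeeded, and each of its
# subtasks was added and set up successfully; any failure along the way
# sets _setupStatus to 'fail' and excludes the task (and whatever
# requested it) from the run.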


def update_generate(config, generate):  # {{{
    """
    Update the 'generate' config option using a string from the command line.

    Parameters
    ----------
    config : ``MpasAnalysisConfigParser`` object
        contains config options

    generate : str
        a comma-separated string of generate flags: either names of analysis
        tasks or commands of the form ``all_<tag>`` or ``no_<tag>``
        indicating that analysis with a given tag should be included or
        excluded
    """
    # Authors
    # -------
    # Xylar Asay-Davis

    # overwrite the 'generate' in config with a string that parses to
    # a list of strings
    generateList = generate.split(',')
    generateString = ', '.join(["'{}'".format(element)
                                for element in generateList])
    generateString = '[{}]'.format(generateString)
    config.set('output', 'generate', generateString)  # }}}
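
# For example, the command-line string 'all_timeSeries,no_index'
# (hypothetical flag values) would be stored in the [output]/generate
# config option as the string:
#
#     ['all_timeSeries', 'no_index']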


def run_analysis(config, analyses):  # {{{
    """
    Run all the tasks, either in serial or in parallel

    Parameters
    ----------
    config : ``MpasAnalysisConfigParser`` object
        contains config options

    analyses : OrderedDict of ``AnalysisTask`` objects
        A dictionary of analysis tasks to run with (task, subtask) names as
        keys
    """
    # Authors
    # -------
    # Xylar Asay-Davis

    # write the config file to the log directory
    logsDirectory = build_config_full_path(config, 'output',
                                           'logsSubdirectory')

    mainRunName = config.get('runs', 'mainRunName')

    configFileName = '{}/config.{}'.format(logsDirectory, mainRunName)

    configFile = open(configFileName, 'w')
    config.write(configFile)
    configFile.close()

    taskCount = config.getWithDefault('execute', 'parallelTaskCount',
                                      default=1)

    isParallel = taskCount > 1 and len(analyses) > 1

    for analysisTask in analyses.values():
        if not analysisTask.runAfterTasks and not analysisTask.subtasks:
            analysisTask._runStatus.value = AnalysisTask.READY
        else:
            analysisTask._runStatus.value = AnalysisTask.BLOCKED

    tasksWithErrors = []
    runningTasks = {}

    # run each analysis task while we still have tasks to run
    while True:
        # unblock any tasks whose prerequisites have finished
        for analysisTask in analyses.values():
            if analysisTask._runStatus.value == AnalysisTask.BLOCKED:
                prereqs = analysisTask.runAfterTasks + analysisTask.subtasks
                prereqStatus = [prereq._runStatus.value
                                for prereq in prereqs]
                if any([runStatus == AnalysisTask.FAIL
                        for runStatus in prereqStatus]):
                    # a prerequisite failed so this task cannot succeed
                    analysisTask._runStatus.value = AnalysisTask.FAIL
                if all([runStatus == AnalysisTask.SUCCESS
                        for runStatus in prereqStatus]):
                    # no unfinished prerequisites so we can run this task
                    analysisTask._runStatus.value = AnalysisTask.READY

        unfinishedCount = 0
        for analysisTask in analyses.values():
            if analysisTask._runStatus.value not in [AnalysisTask.SUCCESS,
                                                     AnalysisTask.FAIL]:
                unfinishedCount += 1

        if unfinishedCount <= 0:
            # we're done
            break

        # launch new tasks
        for key, analysisTask in analyses.items():
            if analysisTask._runStatus.value == AnalysisTask.READY:
                if isParallel:
                    print('Running {}'.format(analysisTask.printTaskName))
                    analysisTask._runStatus.value = AnalysisTask.RUNNING
                    analysisTask.start()
                    runningTasks[key] = analysisTask
                    if len(runningTasks.keys()) >= taskCount:
                        break
                else:
                    analysisTask.run(writeLogFile=False)

        if isParallel:
            # wait for a task to finish
            analysisTask = wait_for_task(runningTasks)
            key = (analysisTask.taskName, analysisTask.subtaskName)
            runningTasks.pop(key)

            taskTitle = analysisTask.printTaskName

            if analysisTask._runStatus.value == AnalysisTask.SUCCESS:
                print("   Task {} has finished successfully.".format(
                    taskTitle))
            elif analysisTask._runStatus.value == AnalysisTask.FAIL:
                print("ERROR in task {}.  See log file {} for details".format(
                    taskTitle, analysisTask._logFileName))
                tasksWithErrors.append(taskTitle)
            else:
                print("Unexpected status for task {}.  This may be a "
                      "bug.".format(taskTitle))
        else:
            if analysisTask._runStatus.value == AnalysisTask.FAIL:
                sys.exit(1)

    if not isParallel and config.getboolean('plot', 'displayToScreen'):
        import matplotlib.pyplot as plt
        plt.show()

    # exit with an error code if any tasks failed
    errorCount = len(tasksWithErrors)
    if errorCount == 1:
        print("There were errors in task {}".format(tasksWithErrors[0]))
        sys.exit(1)
    elif errorCount > 0:
        print("There were errors in {} tasks: {}".format(
            errorCount, ', '.join(tasksWithErrors)))
        sys.exit(1)
    # }}}
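
# The degree of parallelism in run_analysis() comes from the config file;
# a minimal sketch of the relevant section (4 is an arbitrary example
# value; with the default of 1, tasks run serially):
#
#     [execute]
#     parallelTaskCount = 4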


def wait_for_task(runningTasks, timeout=0.1):  # {{{
    """
    Wait for one of the currently running analysis tasks to finish and
    return it.

    Parameters
    ----------
    runningTasks : dict of ``AnalysisTask``
        The tasks that are currently running, with (task, subtask) names as
        keys

    timeout : float, optional
        The number of seconds to wait on each task before polling the next

    Returns
    -------
    analysisTask : ``AnalysisTask``
        A task that finished
    """
    # Authors
    # -------
    # Xylar Asay-Davis

    # necessary to have a timeout so we can kill the whole thing
    # with a keyboard interrupt
    while True:
        for analysisTask in runningTasks.values():
            analysisTask.join(timeout=timeout)
            if not analysisTask.is_alive():
                return analysisTask  # }}}
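
# A sketch of how run_analysis() above uses this helper once a batch of
# tasks has been started:
#
#     analysisTask = wait_for_task(runningTasks)
#     key = (analysisTask.taskName, analysisTask.subtaskName)
#     runningTasks.pop(key)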


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawTextHelpFormatter)
    parser.add_argument("--setup_only", dest="setup_only",
                        action='store_true',
                        help="If only the setup phase, not the run or HTML "
                             "generation phases, should be executed.")
    parser.add_argument("--html_only", dest="html_only", action='store_true',
                        help="If only the setup and HTML generation phases, "
                             "not the run phase, should be executed.")
    parser.add_argument("-g", "--generate", dest="generate",
                        help="A list of analysis modules to generate "
                             "(nearly identical to the 'generate' option in "
                             "the config file).",
                        metavar="ANALYSIS1[,ANALYSIS2,ANALYSIS3,...]")
    parser.add_argument("-l", "--list", dest="list", action='store_true',
                        help="List the available analysis tasks")
    parser.add_argument("-p", "--purge", dest="purge", action='store_true',
                        help="Purge the analysis by deleting the output "
                             "directory before running")
    parser.add_argument('configFiles', metavar='CONFIG', type=str, nargs='*',
                        help='config file')
    args = parser.parse_args()

    for configFile in args.configFiles:
        if not os.path.exists(configFile):
            raise OSError('Config file {} not found.'.format(configFile))

    # add config.default to cover defaults not included in the config files
    # provided on the command line
    if pkg_resources.resource_exists('mpas_analysis', 'config.default'):
        defaultConfig = pkg_resources.resource_filename('mpas_analysis',
                                                        'config.default')
        configFiles = [defaultConfig] + args.configFiles
    else:
        print('WARNING: Did not find config.default.  Assuming other config '
              'file(s) contain a\n'
              'full set of configuration options.')
        defaultConfig = None
        configFiles = args.configFiles

    config = MpasAnalysisConfigParser()
    config.read(configFiles)

    if args.list:
        analyses = build_analysis_list(config, refConfig=None)
        for analysisTask in analyses:
            print('task: {}'.format(analysisTask.taskName))
            print('    component: {}'.format(analysisTask.componentName))
            print('    tags: {}'.format(', '.join(analysisTask.tags)))
        sys.exit(0)

    if config.has_option('runs', 'referenceRunConfigFile'):
        refConfigFile = config.get('runs', 'referenceRunConfigFile')
        if not os.path.exists(refConfigFile):
            raise OSError('A reference config file {} was specified but the '
                          'file does not exist'.format(refConfigFile))
        refConfigFiles = [refConfigFile]
        if defaultConfig is not None:
            refConfigFiles = [defaultConfig] + refConfigFiles
        refConfig = MpasAnalysisConfigParser()
        refConfig.read(refConfigFiles)

        # replace the log directory so log files get written to this run's
        # log directory, not the reference run's
        logsDirectory = build_config_full_path(config, 'output',
                                               'logsSubdirectory')

        refConfig.set('output', 'logsSubdirectory', logsDirectory)

        print('Comparing to reference run {} rather than observations.\n'
              'Make sure that MPAS-Analysis has been run previously with the '
              'ref config file.'.format(refConfig.get('runs', 'mainRunName')))
    else:
        refConfig = None

    if args.purge:
        outputDirectory = config.get('output', 'baseDirectory')
        if not os.path.exists(outputDirectory):
            print('Output directory {} does not exist.\n'
                  'No purge necessary.'.format(outputDirectory))
        else:
            print('Deleting contents of {}'.format(outputDirectory))
            shutil.rmtree(outputDirectory)

    if args.generate:
        update_generate(config, args.generate)

    if refConfig is not None:
        # we want to use the "generate" option from the current run, not
        # the reference config file
        refConfig.set('output', 'generate', config.get('output', 'generate'))

    logsDirectory = build_config_full_path(config, 'output',
                                           'logsSubdirectory')
    make_directories(logsDirectory)

    analyses = build_analysis_list(config, refConfig)
    analyses = determine_analyses_to_generate(analyses)

    if not args.setup_only and not args.html_only:
        run_analysis(config, analyses)

    if not args.setup_only:
        generate_html(config, analyses, refConfig)

# vim: foldmethod=marker ai ts=4 sts=4 et sw=4 ft=python