Source code for polaris.setup

import argparse
import os
import pickle
import shutil
import sys
import warnings
from typing import Dict, List

from polaris import Task, provenance
from polaris.components import get_components
from polaris.config import PolarisConfigParser
from polaris.io import symlink
from polaris.job import write_job_script
from polaris.machines import discover_machine


[docs]def setup_tasks(work_dir, task_list=None, numbers=None, config_file=None, machine=None, baseline_dir=None, component_path=None, suite_name='custom', cached=None, copy_executable=False, clean=False): """ Set up one or more tasks Parameters ---------- work_dir : str A directory that will serve as the base for creating task directories task_list : list of str, optional Relative paths for a tasks to set up numbers : list of str, optional Task numbers to setup, as listed from ``polaris list``, optionally with a suffix ``c`` to indicate that all steps in that task should be cached config_file : {str, None}, optional Configuration file with custom options for setting up and running tasks machine : str, optional The name of one of the machines with defined config options, which can be listed with ``polaris list --machines`` baseline_dir : str, optional Location of baselines that can be compared to component_path : str, optional The relative or absolute path to the location where the model and default namelists have been built suite_name : str, optional The name of the suite if tasks are being set up through a suite or ``'custom'`` if not cached : list of list of str, optional For each task in ``tasks``, which steps (if any) should be cached, or a list with "_all" as the first entry if all steps in the task should be cached copy_executable : bool, optional Whether to copy the model executable to the work directory clean : bool, optional Whether to delete the contents of the base work directory before setting up tasks Returns ------- tasks : dict of polaris.Task A dictionary of tasks, with the relative path in the work directory as keys """ machine = __get_machine_and_check_params(machine, config_file, task_list, numbers, cached) if work_dir is None: print('Warning: no base work directory was provided so setting up in ' 'the current directory.') work_dir = os.getcwd() work_dir = os.path.abspath(work_dir) components = get_components() all_tasks = dict() for component in components: for task in component.tasks.values(): all_tasks[task.path] = task tasks: Dict[str, Task] = dict() cached_steps: Dict[str, List[str]] = dict() _add_tasks_by_number(numbers, all_tasks, tasks, cached_steps) _add_tasks_by_name(task_list, all_tasks, cached, tasks, cached_steps) # get the component of the first task. We'll ensure that all tasks are # for this component first_path = next(iter(tasks)) component = tasks[first_path].component basic_config = _get_basic_config(config_file, machine, component_path, component) provenance.write(work_dir, tasks, config=basic_config) _expand_and_mark_cached_steps(tasks, cached_steps) if clean: print('') print('Cleaning task and step work directories:') _clean_tasks_and_steps(tasks, work_dir) print('') _setup_configs(component, tasks, work_dir, config_file, machine, component_path, copy_executable) print('Setting up tasks:') for path, task in tasks.items(): setup_task(path, task, machine, work_dir, baseline_dir, cached_steps=cached_steps[path]) _check_dependencies(tasks) suite = {'name': suite_name, 'tasks': tasks, 'work_dir': work_dir} # pickle the task or step dictionary for use at runtime pickle_file = os.path.join(suite['work_dir'], f'{suite_name}.pickle') with open(pickle_file, 'wb') as handle: pickle.dump(suite, handle, protocol=pickle.HIGHEST_PROTOCOL) _symlink_load_script(work_dir) max_cores, max_of_min_cores = _get_required_cores(tasks) print(f'target cores: {max_cores}') print(f'minimum cores: {max_of_min_cores}') if machine is not None: write_job_script(basic_config, machine, max_cores, max_of_min_cores, work_dir, suite=suite_name) return tasks
[docs]def setup_task(path, task, machine, work_dir, baseline_dir, cached_steps): """ Set up one or more tasks Parameters ---------- path : str Relative path for a tasks to set up task : polaris.Task A task to set up machine : str The name of one of the machines with defined config options, which can be listed with ``polaris list --machines`` work_dir : str A directory that will serve as the base for creating task directories baseline_dir : str Location of baselines that can be compared to cached_steps : list of str Which steps (if any) should be cached, identified by a list of subdirectories in the component """ print(f' {path}') task_dir = os.path.join(work_dir, path) try: os.makedirs(task_dir) except FileExistsError: pass task.work_dir = task_dir task.base_work_dir = work_dir # add the baseline directory for this task if baseline_dir is not None: task.baseline_dir = os.path.join(baseline_dir, path) if len(cached_steps) > 0: print_steps = ' '.join(cached_steps) print(f' steps with cached outputs: {print_steps}') # iterate over steps for step in task.steps.values(): _setup_step(task, step, work_dir, baseline_dir, task_dir) # wait until we've set up all the steps before pickling because steps may # need other steps to be set up for step in task.steps.values(): if step.setup_complete: # this is a shared step that has already been set up continue # pickle the task and step for use at runtime pickle_filename = os.path.join(step.work_dir, 'step.pickle') with open(pickle_filename, 'wb') as handle: pickle.dump(step, handle, protocol=pickle.HIGHEST_PROTOCOL) _symlink_load_script(step.work_dir) if machine is not None: cores = step.cpus_per_task * step.ntasks min_cores = step.min_cpus_per_task * step.min_tasks write_job_script(step.config, machine, cores, min_cores, step.work_dir) step.setup_complete = True # pickle the task and step for use at runtime pickle_filename = os.path.join(task.work_dir, 'task.pickle') with open(pickle_filename, 'wb') as handle: suite = {'name': 'task', 'tasks': {task.path: task}, 'work_dir': task.work_dir} pickle.dump(suite, handle, protocol=pickle.HIGHEST_PROTOCOL) _symlink_load_script(task_dir) if machine is not None: max_cores, max_of_min_cores = _get_required_cores({path: task}) write_job_script(task.config, machine, max_cores, max_of_min_cores, task_dir)
def main(): parser = argparse.ArgumentParser( description='Set up one or more tasks', prog='polaris setup') parser.add_argument("-t", "--tasks", nargs='+', dest="tasks", help="Relative path for a task(s) to set up.", metavar="PATH") parser.add_argument("-n", "--task_number", nargs='+', dest="task_num", type=str, help="Task number(s) to setup, as listed from " "'polaris list'. Can be a space-separated " "list of task numbers. A suffix 'c' indicates " "that all steps in the task should use cached " "outputs.", metavar="NUM") parser.add_argument("-f", "--config_file", dest="config_file", help="Configuration file for task setup.", metavar="FILE") parser.add_argument("-m", "--machine", dest="machine", help="The name of the machine for loading machine-" "related config options.", metavar="MACH") parser.add_argument("-w", "--work_dir", dest="work_dir", required=True, help="A base directory for setting up tasks.", metavar="PATH") parser.add_argument("-b", "--baseline_dir", dest="baseline_dir", help="Location of baselines that can be compared to.", metavar="PATH") parser.add_argument("-p", "--component_path", dest="component_path", help="The path where the component executable and " "default namelists have been built.", metavar="PATH") parser.add_argument("--suite_name", dest="suite_name", default="custom", help="The name to use for the 'custom' suite " "containing all setup tasks.", metavar="SUITE") parser.add_argument("--cached", dest="cached", nargs='+', help="A list of steps in a single task supplied with " "--tasks or --task_number that should use cached " "outputs, or '_all' if all steps should be " "cached.", metavar="STEP") parser.add_argument("--copy_executable", dest="copy_executable", action="store_true", help="If the model executable should be copied to the " "work directory.") parser.add_argument("--clean", dest="clean", action="store_true", help="If the base work directory should be deleted " "before setting up the tasks.") args = parser.parse_args(sys.argv[2:]) cached = None if args.cached is not None: if args.tasks is not None and len(args.tasks) != 1: raise ValueError('You can only cache steps for one task at at ' 'time.') if args.task_num is not None and len(args.task_num) != 1: raise ValueError('You can only cache steps for one task at at ' 'time.') # cached is a list of lists cached = [args.cached] setup_tasks(task_list=args.tasks, numbers=args.task_num, config_file=args.config_file, machine=args.machine, work_dir=args.work_dir, baseline_dir=args.baseline_dir, component_path=args.component_path, suite_name=args.suite_name, cached=cached, copy_executable=args.copy_executable, clean=args.clean) def _expand_and_mark_cached_steps(tasks, cached_steps): """ Mark any steps that will be cached. If any task asked for a step to be cached, it will be cached for all tasks that share the step. """ for path, task in tasks.items(): cached_names = cached_steps[path] if len(cached_names) > 0 and cached_names[0] == '_all': cached_steps[path] = list(task.steps.keys()) for step_name in cached_steps[path]: task.steps[step_name].cached = True def _setup_configs(component, tasks, work_dir, config_file, machine, component_path, copy_executable): """ Set up config parsers for this component """ common_config = _get_basic_config(config_file, machine, component_path, component) if copy_executable: common_config.set('setup', 'copy_executable', 'True') if 'POLARIS_BRANCH' in os.environ: polaris_branch = os.environ['POLARIS_BRANCH'] common_config.set('paths', 'polaris_branch', polaris_branch) else: common_config.set('paths', 'polaris_branch', os.getcwd()) initial_configs = _add_task_configs(component, tasks, common_config) # okay, we're finally ready to configure all the tasks and add configs # to the "owned" steps configs = _configure_tasks_and_add_step_configs(tasks, component, initial_configs, common_config) _write_configs(common_config, configs, component.name, work_dir) _symlink_configs(tasks, component.name, work_dir) def _add_task_configs(component, tasks, common_config): """ Add config parsers for tasks and steps that don't already have shared ones """ # get a list of shared steps and add config files for tasks to the # component configs = dict() for task in tasks.values(): if task.config.filepath is None: task.config_filename = f'{task.name}.cfg' task.config.filepath = os.path.join(task.subdir, task.config_filename) component.add_config(task.config) configs[task.config.filepath] = task.config # now go through all the configs and prepend the common config options, # then run the setup() method for each in case there is some customization for config in configs.values(): config.prepend(common_config) config.setup() return configs def _configure_tasks_and_add_step_configs(tasks, component, initial_configs, common_config): """ Call the configure() method for each task and add configs to "owned" steps """ for config in initial_configs.values(): for task in config.tasks: task.configure() config.set(section=f'{task.name}', option='steps_to_run', value=' '.join(task.steps_to_run), comment=f'A list of steps to include when running the ' f'{task.name} task') # add configs to steps after calling task.configure() on all tasks in case # new steps were added configs = dict() new_configs = dict() for task in tasks.values(): configs[task.config.filepath] = task.config for step in task.steps.values(): if step.has_shared_config: configs[step.config.filepath] = step.config if step.config.filepath is None: step.config_filename = f'{step.name}.cfg' step.config.filepath = os.path.join(step.subdir, step.config_filename) if step.config.filepath not in initial_configs: new_configs[step.config.filepath] = step.config component.add_config(step.config) else: step._set_config(task.config, link=task.config_filename) for config in new_configs.values(): config.prepend(common_config) config.setup() return configs def _write_configs(common_config, configs, component_name, work_dir): """ Write out all the config files """ # add the common config at the component level common_config.filepath = f'{component_name}.cfg' configs[common_config.filepath] = common_config # finally, write out the config files component_work_dir = os.path.join(work_dir, component_name) for config in configs.values(): config_filepath = os.path.join(component_work_dir, config.filepath) config_dir = os.path.dirname(config_filepath) try: os.makedirs(config_dir) except FileExistsError: pass with open(config_filepath, 'w') as f: config.write(f) def _symlink_configs(tasks, component_name, work_dir): """ Symlink config files for requested tasks and steps """ component_work_dir = os.path.join(work_dir, component_name) symlinks = dict() for task in tasks.values(): config = task.config config_filepath = os.path.join(component_work_dir, config.filepath) link_path = os.path.join(component_work_dir, task.subdir, task.config_filename) if not os.path.exists(link_path) and link_path not in symlinks: symlinks[link_path] = config_filepath for step in task.steps.values(): config = step.config config_filepath = os.path.join(component_work_dir, config.filepath) link_path = os.path.join(component_work_dir, step.subdir, step.config_filename) if not os.path.exists(link_path) and link_path not in symlinks: symlinks[link_path] = config_filepath for link_path, config_filepath in symlinks.items(): link_dir = os.path.dirname(link_path) try: os.makedirs(link_dir) except FileExistsError: pass symlink(config_filepath, link_path) def _clean_tasks_and_steps(tasks, base_work_dir): """ Remove contents of task and step work directories to start fresh """ print(f'{base_work_dir}:') for path, task in tasks.items(): task_work_dir = os.path.join(base_work_dir, path) try: shutil.rmtree(task_work_dir) print(f' {path}') except FileNotFoundError: pass for step in task.steps.values(): step_work_dir = os.path.join(base_work_dir, step.path) try: shutil.rmtree(step_work_dir) print(f' {step.path}') except FileNotFoundError: pass def _get_required_cores(tasks): """ Get the maximum number of target cores and the max of min cores """ max_cores = 0 max_of_min_cores = 0 for task in tasks.values(): for step_name in task.steps_to_run: step = task.steps[step_name] if step.ntasks is None: raise ValueError( f'The number of tasks (ntasks) was never set for ' f'{task.path} step {step_name}') if step.cpus_per_task is None: raise ValueError( f'The number of CPUs per task (cpus_per_task) was never ' f'set for {task.path} step {step_name}') cores = step.cpus_per_task * step.ntasks min_cores = step.min_cpus_per_task * step.min_tasks max_cores = max(max_cores, cores) max_of_min_cores = max(max_of_min_cores, min_cores) return max_cores, max_of_min_cores def __get_machine_and_check_params(machine, config_file, tasks, numbers, cached): if machine is None and 'POLARIS_MACHINE' in os.environ: machine = os.environ['POLARIS_MACHINE'] if machine is None: machine = discover_machine() if config_file is None and machine is None: raise ValueError('At least one of config_file and machine is needed.') if config_file is not None and not os.path.exists(config_file): raise FileNotFoundError( f'The user config file wasn\'t found: {config_file}') if tasks is None and numbers is None: raise ValueError('At least one of tasks or numbers is needed.') if cached is not None: if tasks is None: warnings.warn('Ignoring "cached" argument because "tasks" was ' 'not provided') elif len(cached) != len(tasks): raise ValueError('A list of cached steps must be provided for ' 'each task in "tasks"') return machine def _get_basic_config(config_file, machine, component_path, component): """ Get a base config parser for the machine and component but not a specific task """ config = PolarisConfigParser() if config_file is not None: config.add_user_config(config_file) # start with default polaris config options config.add_from_package('polaris', 'default.cfg') # add the E3SM config options from mache if machine is not None: config.add_from_package('mache.machines', f'{machine}.cfg', exception=False) # add the polaris machine config file if machine is None: machine = 'default' config.add_from_package('polaris.machines', f'{machine}.cfg') if 'POLARIS_BRANCH' in os.environ: polaris_branch = os.environ['POLARIS_BRANCH'] config.set('paths', 'polaris_branch', polaris_branch) else: config.set('paths', 'polaris_branch', os.getcwd()) # add the config options for the component config.add_from_package(f'polaris.{component.name}', f'{component.name}.cfg') component.configure(config) # set the component_path path from the command line if provided if component_path is not None: component_path = os.path.abspath(component_path) config.set('paths', 'component_path', component_path, user=True) return config def _add_tasks_by_number(numbers, all_tasks, tasks, cached_steps): if numbers is not None: keys = list(all_tasks) for number in numbers: cache_all = False if number.endswith('c'): cache_all = True number = int(number[:-1]) else: number = int(number) if number >= len(keys): raise ValueError(f'task number {number} is out of range. ' f'There are only {len(keys)} tasks.') path = keys[number] if cache_all: cached_steps[path] = ['_all'] else: cached_steps[path] = list() tasks[path] = all_tasks[path] def _add_tasks_by_name(task_list, all_tasks, cached, tasks, cached_steps): if task_list is not None: for index, path in enumerate(task_list): if path not in all_tasks: raise ValueError(f'Task with path {path} is not in ' f'tasks') if cached is not None: cached_steps[path] = cached[index] else: cached_steps[path] = list() tasks[path] = all_tasks[path] def _setup_step(task, step, work_dir, baseline_dir, task_dir): """ Set up a step in a task """ # make the step directory if it doesn't exist step_dir = os.path.join(work_dir, step.path) if step.name in task.step_symlinks: symlink(step_dir, os.path.join(task_dir, task.step_symlinks[step.name])) if step.setup_complete: # this is a shared step that has already been set up return try: os.makedirs(step_dir) except FileExistsError: pass step.work_dir = step_dir step.base_work_dir = work_dir # set up the step step.setup() # add the baseline directory for this step if baseline_dir is not None: step.baseline_dir = os.path.join(baseline_dir, step.path) # process input, output, namelist and streams files step.process_inputs_and_outputs() def _symlink_load_script(work_dir): """ make a symlink to the script for loading the polaris conda env. """ if 'LOAD_POLARIS_ENV' in os.environ: script_filename = os.environ['LOAD_POLARIS_ENV'] symlink(script_filename, os.path.join(work_dir, 'load_polaris_env.sh')) def _check_dependencies(tasks): for task in tasks.values(): for step in task.steps.values(): for name, dependency in step.dependencies.items(): if dependency.work_dir == '': raise ValueError(f'The dependency {name} of ' f'{task.path} step {step.name} was ' f'not set up.')