import argparse
import glob
import os
import pickle
import sys
import time
from datetime import timedelta
import mpas_tools.io
from mpas_tools.logging import LoggingContext, check_call
from polaris import Task
from polaris.logging import log_function_call, log_method_call
from polaris.parallel import (
get_available_parallel_resources,
run_command,
set_cores_per_node,
)
from polaris.run import (
complete_step_run,
load_dependencies,
setup_config,
unpickle_suite,
)

# ANSI color codes for colorizing pass/fail and timing output:
# https://stackoverflow.com/a/287944/7728169
start_fail = '\033[91m'
start_pass = '\033[92m'
start_time_color = '\033[94m'
end_color = '\033[0m'
pass_str = f'{start_pass}PASS{end_color}'
success_str = f'{start_pass}SUCCESS{end_color}'
fail_str = f'{start_fail}FAIL{end_color}'
error_str = f'{start_fail}ERROR{end_color}'


def run_tasks(suite_name, quiet=False, is_task=False, steps_to_run=None,
              steps_to_skip=None):
"""
    Run the given suite or task

    Parameters
----------
suite_name : str
The name of the suite
    quiet : bool, optional
        Whether to suppress step names in the output as the suite
        progresses
    is_task : bool, optional
        Whether this is a task instead of a full suite
steps_to_run : list of str, optional
A list of the steps to run if this is a task, not a full suite.
The default behavior is to run the default steps unless they are in
``steps_to_skip``
steps_to_skip : list of str, optional
A list of steps not to run if this is a task, not a full suite.
Typically, these are steps to remove from the defaults
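
    Examples
    --------
    A sketch of typical usage (the suite name ``nightly`` is illustrative,
    not defined by this module):

    >>> run_tasks('nightly', quiet=True)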
"""
suite = unpickle_suite(suite_name)
# get the config file for the first task in the suite
task = next(iter(suite['tasks'].values()))
component = task.component
common_config = setup_config(task.base_work_dir,
component.name,
f'{component.name}.cfg')
available_resources = get_available_parallel_resources(common_config)
# start logging to stdout/stderr
with LoggingContext(suite_name) as stdout_logger:
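        # disable output buffering in python subprocesses so step output
        # shows up promptly in the logs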
os.environ['PYTHONUNBUFFERED'] = '1'
if not is_task:
try:
os.makedirs('case_outputs')
except OSError:
pass
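        # keep track of failures, runtimes and results across the suite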
failures = 0
cwd = os.getcwd()
suite_start = time.time()
task_times = dict()
result_strs = dict()
for task_name in suite['tasks']:
stdout_logger.info(f'{task_name}')
task = suite['tasks'][task_name]
if is_task:
log_filename = None
task_logger = stdout_logger
else:
task_prefix = task.path.replace('/', '_')
log_filename = f'{cwd}/case_outputs/{task_prefix}.log'
task_logger = None
result_str, success, task_time = _log_and_run_task(
task, stdout_logger, task_logger, quiet, log_filename,
is_task, steps_to_run, steps_to_skip,
available_resources)
result_strs[task_name] = result_str
if not success:
failures += 1
task_times[task_name] = task_time
suite_time = time.time() - suite_start
os.chdir(cwd)
_log_task_runtimes(stdout_logger, task_times, result_strs, suite_time,
failures)


def run_single_step(step_is_subprocess=False, quiet=False):
"""
    Used by the framework to run a step when ``polaris serial`` is called in
    the step's work directory

    Parameters
----------
step_is_subprocess : bool, optional
Whether the step is being run as a subprocess of a task or suite
    quiet : bool, optional
        Whether to suppress step names in the output as the step runs
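
    Examples
    --------
    A sketch of typical usage, assuming the current directory is a step work
    directory containing a ``step.pickle`` file:

    >>> run_single_step(quiet=True)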
"""
with open('step.pickle', 'rb') as handle:
step = pickle.load(handle)
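    # wrap the step in a dummy task so the framework can run it the same
    # way it runs the steps of a task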
task = Task(component=step.component, name='dummy_task')
task.add_step(step)
task.new_step_log_file = False
    # This prevents an infinite loop of subprocesses
if step_is_subprocess:
step.run_as_subprocess = False
config = setup_config(step.base_work_dir,
step.component.name,
step.config.filepath)
task.config = config
available_resources = get_available_parallel_resources(config)
set_cores_per_node(task.config, available_resources['cores_per_node'])
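    # take the default MPAS-Tools file format and engine from the config
    # options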
mpas_tools.io.default_format = config.get('io', 'format')
mpas_tools.io.default_engine = config.get('io', 'engine')
# start logging to stdout/stderr
logger_name = step.path.replace('/', '_')
with LoggingContext(name=logger_name) as stdout_logger:
task.logger = stdout_logger
if quiet:
task.stdout_logger = None
else:
task.stdout_logger = stdout_logger
log_function_call(function=_run_task, logger=stdout_logger)
stdout_logger.info('')
stdout_logger.info(f'Running step: {step.name}')
_run_task(task, available_resources)


def main():
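    """
    Entry point for ``polaris serial``: run a suite, task or step depending
    on the command-line arguments and the pickle files in the current
    directory
    """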
parser = argparse.ArgumentParser(
description='Run a suite, task or step',
prog='polaris serial')
parser.add_argument("suite", nargs='?',
help="The name of a suite to run. Can exclude "
"or include the .pickle filename suffix.")
parser.add_argument("--steps", dest="steps", nargs='+',
help="The steps of a task to run.")
parser.add_argument("--skip_steps", dest="skip_steps", nargs='+',
help="The steps of a task not to run, see "
"steps_to_run in the config file for defaults.")
parser.add_argument("-q", "--quiet", dest="quiet", action="store_true",
help="If set, step names are not included in the "
"output as the suite progresses. Has no "
"effect when running tasks or steps on "
"their own.")
parser.add_argument("--step_is_subprocess", dest="step_is_subprocess",
action="store_true",
help="Used internally by polaris to indicate that "
"a step is being run as a subprocess.")
args = parser.parse_args(sys.argv[2:])
if args.suite is not None:
# Running a specified suite from the base work directory
run_tasks(args.suite, quiet=args.quiet)
elif os.path.exists('task.pickle'):
# Running a task inside of its work directory
run_tasks(suite_name='task', quiet=args.quiet, is_task=True,
steps_to_run=args.steps, steps_to_skip=args.skip_steps)
elif os.path.exists('step.pickle'):
# Running a step inside of its work directory
run_single_step(step_is_subprocess=args.step_is_subprocess,
quiet=args.quiet)
else:
pickles = glob.glob('*.pickle')
if len(pickles) == 1:
# Running an unspecified suite from the base work directory
suite = os.path.splitext(os.path.basename(pickles[0]))[0]
run_tasks(suite, quiet=args.quiet)
elif len(pickles) == 0:
raise OSError('No pickle files were found. Are you sure this is '
'a polaris suite, task or step work directory?')
else:
raise ValueError('More than one suite was found. Please specify '
'which to run: polaris serial <suite>')


def _update_steps_to_run(task_name, steps_to_run, steps_to_skip, config,
                         steps):
    """
    Update the list of steps to run, validating requested and skipped steps
    against the steps defined in the task
    """
if steps_to_run is None:
step_str = config.get(task_name, 'steps_to_run').replace(',', ' ')
steps_to_run = step_str.split()
for step in steps_to_run:
if step not in steps:
raise ValueError(
f'A step "{step}" was requested but is not one of the steps '
f'in this task:'
f'\n{list(steps)}')
if steps_to_skip is not None:
for step in steps_to_skip:
if step not in steps:
raise ValueError(
f'A step "{step}" was flagged not to run but is not one '
f'of the steps in this task:'
f'\n{list(steps)}')
steps_to_run = [step for step in steps_to_run if step not in
steps_to_skip]
return steps_to_run


def _log_task_runtimes(stdout_logger, task_times, result_strs, suite_time,
                       failures):
"""
Log the runtimes for the task(s)
"""
stdout_logger.info('Task Runtimes:')
for task_name, task_time in task_times.items():
task_time_str = str(timedelta(seconds=round(task_time)))
stdout_logger.info(f'{task_time_str} '
f'{result_strs[task_name]} {task_name}')
suite_time_str = str(timedelta(seconds=round(suite_time)))
stdout_logger.info(f'Total runtime: {suite_time_str}')
if failures == 0:
stdout_logger.info('PASS: All passed successfully!')
else:
if failures == 1:
message = '1 task'
else:
message = f'{failures} tasks'
stdout_logger.error(f'FAIL: {message} failed, see above.')
sys.exit(1)


def _print_to_stdout(task, message):
"""
Write out a message to stdout if we're not running a single step
"""
if task.stdout_logger is not None:
task.stdout_logger.info(message)
if task.logger != task.stdout_logger:
# also write it to the log file
task.logger.info(message)


def _log_and_run_task(task, stdout_logger, task_logger, quiet,
                      log_filename, is_task, steps_to_run,
                      steps_to_skip, available_resources):
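    """
    Set up logging for the task, run its steps, and log pass/fail and
    baseline-comparison results, returning the result string, whether the
    task succeeded, and the task runtime in seconds
    """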
task_name = task.path.replace('/', '_')
with LoggingContext(task_name, logger=task_logger,
log_filename=log_filename) as task_logger:
if quiet:
# just log the step names and any failure messages to the
# log file
task.stdout_logger = task_logger
else:
# log steps to stdout
task.stdout_logger = stdout_logger
task.logger = task_logger
task.log_filename = log_filename
# If we are running a task on its own, we want a log file per step
# If we are running within a suite, we want a log file per task, with
# output from each of its steps
task.new_step_log_file = is_task
os.chdir(task.work_dir)
config = setup_config(task.base_work_dir,
task.component.name,
task.config.filepath)
task.config = config
set_cores_per_node(task.config,
available_resources['cores_per_node'])
mpas_tools.io.default_format = config.get('io', 'format')
mpas_tools.io.default_engine = config.get('io', 'engine')
task.steps_to_run = _update_steps_to_run(
task.name, steps_to_run, steps_to_skip, config, task.steps)
task_start = time.time()
log_function_call(function=_run_task, logger=task_logger)
task_logger.info('')
        step_list = ', '.join(task.steps_to_run)
        task_logger.info(f'Running steps: {step_list}')
try:
baselines_passed = _run_task(task, available_resources)
run_status = success_str
task_pass = True
except Exception:
run_status = error_str
task_pass = False
task_logger.exception('Exception raised while running '
'the steps of the task')
status = f' task execution: {run_status}'
if task_pass:
stdout_logger.info(status)
if baselines_passed is None:
result_str = pass_str
success = True
else:
if baselines_passed:
baseline_str = pass_str
result_str = pass_str
success = True
else:
baseline_str = fail_str
result_str = fail_str
success = False
status = f' baseline comp.: {baseline_str}'
stdout_logger.info(status)
else:
stdout_logger.error(status)
if not is_task:
stdout_logger.error(f' see: case_outputs/{task_name}.log')
result_str = fail_str
success = False
task_time = time.time() - task_start
task_time_str = str(timedelta(seconds=round(task_time)))
stdout_logger.info(f' task runtime: '
f'{start_time_color}{task_time_str}{end_color}')
return result_str, success, task_time


def _run_task(task, available_resources):
    """
    Run each step of the task, returning whether baseline comparisons
    passed (``None`` if no baseline comparisons were performed)
    """
logger = task.logger
cwd = os.getcwd()
baselines_passed = None
for step_name in task.steps_to_run:
step = task.steps[step_name]
complete_filename = os.path.join(step.work_dir,
'polaris_step_complete.log')
_print_to_stdout(task, f' * step: {step_name}')
if os.path.exists(complete_filename):
_print_to_stdout(task, ' already completed')
continue
if step.cached:
_print_to_stdout(task, ' cached')
continue
step_start = time.time()
step.config = setup_config(step.base_work_dir,
step.component.name,
step.config.filepath)
if task.log_filename is not None:
step_log_filename = task.log_filename
else:
step_log_filename = None
try:
if step.run_as_subprocess:
_run_step_as_subprocess(
logger, step, task.new_step_log_file)
else:
_run_step(task, step, task.new_step_log_file,
available_resources, step_log_filename)
except Exception:
_print_to_stdout(task,
f' execution: {error_str}')
raise
_print_to_stdout(task,
f' execution: {success_str}')
os.chdir(cwd)
step_time = time.time() - step_start
step_time_str = str(timedelta(seconds=round(step_time)))
compared, status = step.validate_baselines()
if compared:
if status:
baseline_str = pass_str
else:
baseline_str = fail_str
_print_to_stdout(task,
f' baseline comp.: {baseline_str}')
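            # the task passes baseline comparison only if every compared
            # step passed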
if baselines_passed is None:
baselines_passed = status
elif not status:
baselines_passed = False
_print_to_stdout(task,
f' runtime: '
f'{start_time_color}{step_time_str}{end_color}')
return baselines_passed


def _run_step(task, step, new_log_file, available_resources,
              step_log_filename):
    """
    Run the requested step, checking that its required input files exist
    beforehand and that its expected output files exist afterward
    """
logger = task.logger
cwd = os.getcwd()
missing_files = list()
for input_file in step.inputs:
if not os.path.exists(input_file):
missing_files.append(input_file)
if len(missing_files) > 0:
raise OSError(
f'input file(s) missing in step {step.name} in '
f'{step.component.name}/{step.subdir}: {missing_files}')
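    # reload pickled dependencies so attributes set when they ran are
    # available to this step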
load_dependencies(step)
# each logger needs a unique name
logger_name = step.path.replace('/', '_')
if new_log_file:
        # we want to create a new log file and point the step to that name
new_log_filename = f'{cwd}/{step.name}.log'
step_log_filename = new_log_filename
step_logger = None
else:
# either we don't want a log file at all or there is an existing one
# to use. Either way, we don't want a new log filename and we want
# to use the existing logger. The step log filename will be whatever
# is passed as a parameter
step_logger = logger
new_log_filename = None
step.log_filename = step_log_filename
with LoggingContext(name=logger_name, logger=step_logger,
log_filename=new_log_filename) as step_logger:
step.logger = step_logger
os.chdir(step.work_dir)
step_logger.info('')
log_method_call(method=step.constrain_resources, logger=step_logger)
step_logger.info('')
step.constrain_resources(available_resources)
# runtime_setup() will perform small tasks that require knowing the
# resources of the task before the step runs (such as creating
# graph partitions)
step_logger.info('')
log_method_call(method=step.runtime_setup, logger=step_logger)
step_logger.info('')
step.runtime_setup()
if step.args is not None:
step_logger.info('\nBypassing step\'s run() method and running '
'with command line args\n')
log_function_call(function=run_command, logger=step_logger)
step_logger.info('')
run_command(step.args, step.cpus_per_task, step.ntasks,
step.openmp_threads, step.config, step.logger)
else:
step_logger.info('')
log_method_call(method=step.run, logger=step_logger)
step_logger.info('')
step.run()
complete_step_run(step)
missing_files = list()
for output_file in step.outputs:
if not os.path.exists(output_file):
missing_files.append(output_file)
if len(missing_files) > 0:
# We want to indicate that the step failed by removing the pickle
try:
os.remove('step_after_run.pickle')
except FileNotFoundError:
pass
raise OSError(
f'output file(s) missing in step {step.name} in '
f'{step.component.name}/{step.subdir}: {missing_files}')


def _run_step_as_subprocess(logger, step, new_log_file):
    """
    Run the requested step as a subprocess
    """
cwd = os.getcwd()
logger_name = step.path.replace('/', '_')
if new_log_file:
log_filename = f'{cwd}/{step.name}.log'
step_logger = None
else:
step_logger = logger
log_filename = None
step.log_filename = log_filename
with LoggingContext(name=logger_name, logger=step_logger,
log_filename=log_filename) as step_logger:
os.chdir(step.work_dir)
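        # re-run ``polaris serial`` in the step's work directory; the
        # --step_is_subprocess flag prevents infinite recursion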
step_args = ['polaris', 'serial', '--step_is_subprocess']
check_call(step_args, step_logger)