import os
import subprocess
import sys
[docs]
def write(work_dir, tasks, config=None, machine=None, baseline_dir=None):
"""
Write a file with provenance, such as the git version, conda packages,
command, and tasks, to the work directory.
This function overwrites any existing provenance file in the work
directory rather than appending, so the provenance reflects the most
recent call to Polaris that used this work directory.
Parameters
----------
work_dir : str
The path to the work directory where the tasks will be set up
tasks : dict
A dictionary describing all of the tasks and their steps
config : polaris.config.PolarisConfigParser, optional
Configuration options for this task, a combination of user configs
and the defaults for the machine and component
machine : str, optional
The machine on which Polaris is being run
baseline_dir : str, optional
The path to the baseline work directory, if any
"""
polaris_git_version = None
if os.path.exists('.git'):
try:
args = ['git', 'describe', '--tags', '--dirty', '--always']
polaris_git_version = subprocess.check_output(args).decode('utf-8')
polaris_git_version = polaris_git_version.strip('\n')
except subprocess.CalledProcessError:
pass
if config is None:
# this is a call to clean and we don't need to document the component
# version
component_git_version = None
else:
component_git_version = _get_component_git_version(config)
try:
args = ['pixi', 'list']
pixi_list = subprocess.check_output(args).decode('utf-8')
except subprocess.CalledProcessError:
pixi_list = None
calling_command = ' '.join(sys.argv)
try:
os.makedirs(work_dir)
except OSError:
pass
provenance_path = f'{work_dir}/provenance'
# Always overwrite to ensure provenance reflects the latest setup/suite
provenance_file = open(provenance_path, 'w')
provenance_file.write(
'**************************************************'
'*********************\n'
)
if polaris_git_version is not None:
provenance_file.write(
f'polaris git version: {polaris_git_version}\n\n'
)
if component_git_version is not None:
provenance_file.write(
f'component git version: {component_git_version}\n\n'
)
provenance_file.write(f'command: {calling_command}\n\n')
# Add readily parsable, PR-friendly metadata discovered at setup time
_write_meta(provenance_file, 'machine', machine)
_write_scheduler_metadata(provenance_file, config)
_write_meta(provenance_file, 'compiler', _get_compiler(config))
_write_meta(provenance_file, 'work directory', work_dir)
_write_meta(provenance_file, 'build directory', _get_build_dir(config))
_write_meta(provenance_file, 'baseline work directory', baseline_dir)
provenance_file.write('tasks:\n')
for _, task in tasks.items():
prefix = ' '
lines = list()
to_print = {
'path': task.path,
'name': task.name,
'component': task.component.name,
'subdir': task.subdir,
}
for key in to_print:
key_string = f'{key}: '.ljust(15)
lines.append(f'{prefix}{key_string}{to_print[key]}')
lines.append(f'{prefix}steps:')
for step in task.steps.values():
if step.name == step.subdir:
lines.append(f'{prefix} - {step.name}')
else:
lines.append(f'{prefix} - {step.name}: {step.subdir}')
lines.append('')
print_string = '\n'.join(lines)
provenance_file.write(f'{print_string}\n')
if pixi_list is not None:
provenance_file.write('pixi list:\n')
provenance_file.write(f'{pixi_list}\n')
provenance_file.write(
'**************************************************'
'*********************\n'
)
provenance_file.close()
def _get_component_git_version(config):
if config.has_option('build', 'branch'):
branch = config.get('build', 'branch')
else:
branch = None
if branch is None or not os.path.exists(branch):
return None
cwd = os.getcwd()
os.chdir(branch)
try:
args = ['git', 'describe', '--tags', '--dirty', '--always']
component_git_version = subprocess.check_output(args).decode('utf-8')
component_git_version = component_git_version.strip('\n')
except subprocess.CalledProcessError:
component_git_version = None
os.chdir(cwd)
return component_git_version
def _get_system(config):
if config is None:
return None
if config.has_option('parallel', 'system'):
return config.get('parallel', 'system')
return None
def _resolve_scheduler_fields(config, system):
"""Resolve partition/qos/constraint (slurm) or just constraint (pbs)."""
partition = None
qos = None
constraint = None
if config is None:
return partition, qos, constraint
if system == 'pbs':
# PBS: only constraint is relevant for our purposes
cons_val = (
config.get('job', 'constraint')
if config.has_option('job', 'constraint')
else None
)
if cons_val and cons_val != '<<<default>>>':
constraint = cons_val
elif config.has_option('parallel', 'constraints'):
cons_list = config.getlist('parallel', 'constraints')
if cons_list:
constraint = cons_list[0]
return partition, qos, constraint
# Default to slurm-like resolution
part_val = (
config.get('job', 'partition')
if config.has_option('job', 'partition')
else None
)
if part_val and part_val != '<<<default>>>':
partition = part_val
elif config.has_option('parallel', 'partitions'):
parts = config.getlist('parallel', 'partitions')
if parts:
partition = parts[0]
qos_val = (
config.get('job', 'qos') if config.has_option('job', 'qos') else None
)
if qos_val and qos_val != '<<<default>>>':
qos = qos_val
elif config.has_option('parallel', 'qos'):
qos_list = config.getlist('parallel', 'qos')
if qos_list:
qos = qos_list[0]
cons_val = (
config.get('job', 'constraint')
if config.has_option('job', 'constraint')
else None
)
if cons_val and cons_val != '<<<default>>>':
constraint = cons_val
elif config.has_option('parallel', 'constraints'):
cons_list = config.getlist('parallel', 'constraints')
if cons_list:
constraint = cons_list[0]
return partition, qos, constraint
def _get_compiler(config):
if config is None:
return None
if config.has_option('deploy', 'compiler'):
val = config.get('deploy', 'compiler')
return val or None
return None
def _get_build_dir(config):
if config is None:
return None
if config.has_option('paths', 'component_path'):
val = config.get('paths', 'component_path')
return val or None
return None
def _write_meta(provenance_file, label, value):
"""Write a simple 'label: value' line if value is provided."""
if value is None:
return
if isinstance(value, str) and value.strip() == '':
return
provenance_file.write(f'{label}: {value}\n\n')
def _write_scheduler_metadata(provenance_file, config):
"""Write partition/qos/constraint metadata when available."""
system = _get_system(config)
partition, qos, constraint = _resolve_scheduler_fields(config, system)
_write_meta(provenance_file, 'partition', partition)
_write_meta(provenance_file, 'qos', qos)
_write_meta(provenance_file, 'constraint', constraint)
# No return value; this function only writes metadata lines