import argparse
import importlib.resources as imp_res
from collections import OrderedDict
from typing import Dict
from jinja2 import Template
from lxml import etree
from ruamel.yaml import YAML
[docs]class PolarisYaml:
"""
A class for reading writing and combining config files in yaml format
(e.g. as used in OMEGA).
Attributes
----------
configs : dict
Nested dictionaries containing config options
streams : dict
Nested dictionaries containing data about streams
model : str
The name of the E3SM component
"""
[docs] def __init__(self):
"""
Create a yaml config object
"""
self.configs = dict()
self.streams = dict()
self.model = None
[docs] @classmethod
def read(cls, filename, package=None, replacements=None):
"""
Add config options from a yaml file
Parameters
----------
filename : str
A template yaml config file
package : str, optional
The name of a package the filename is found in
replacements : dict, optional
A dictionary of replacements, which, if provided, is used to
replace Jinja variables and the yaml file is assumed do be a Jinja
template
Returns
-------
yaml : polaris.yaml.PolarisYaml
A yaml object read in from the given file (and optionally package)
"""
# read the text from a file (possibly in a package)
if package is not None:
text = imp_res.files(package).joinpath(filename).read_text()
else:
with open(filename, 'r') as infile:
text = infile.read()
# if this is a jinja template, render the template with the
# replacements
if replacements is not None:
template = Template(text)
text = template.render(**replacements)
yaml = cls()
yaml_data = YAML(typ='rt')
configs = yaml_data.load(text)
keys = list(configs)
if len(keys) > 1:
raise ValueError(
f'Config yaml file contains unexpected sections:\n '
f'{keys[1:]}')
yaml.model = keys[0]
yaml.streams = dict()
configs = configs[yaml.model]
if 'streams' in configs:
yaml.streams = configs['streams']
configs = dict(configs)
configs.pop('streams')
yaml.configs = configs
return yaml
[docs] def update(self, configs, quiet=True):
"""
Add config options from a dictionary
Parameters
----------
configs : dict
A nested dictionary of config sections, options and values
quiet : bool, optional
Whether or not to print the updated config options as they are
replaced
"""
if self.model in configs:
# we want one layer deeper
configs = configs[self.model]
_update_section(configs, self.configs, quiet)
[docs] def write(self, filename):
"""
Write config options to a yaml file
Parameters
----------
filename : str
A yaml config file
"""
yaml = YAML(typ='rt')
configs = dict(self.configs)
if self.streams:
configs['streams'] = self.streams
model_configs = dict()
model_configs[self.model] = configs
with open(filename, 'w') as outfile:
yaml.dump(model_configs, outfile)
def _add_stream(self, stream_name, stream):
"""
Add stream from a dictionary
"""
self.streams[stream_name] = stream
[docs]def mpas_namelist_and_streams_to_yaml(model, namelist_template=None,
namelist=None,
streams=None):
"""
Add config options from a yaml file
Parameters
----------
model : str
The name of the model
namelist_template : str
An MPAS namelist template file
namelist : str, optional
An MPAS namelist file
streams : str, optional
An MPAS streams file
Returns
-------
yaml : polaris.yaml.PolarisYaml
A yaml object with the namelists and streams
"""
yaml = PolarisYaml()
yaml.model = model
if namelist is not None:
yaml.configs = _read_namelist(namelist_template, namelist)
if streams is not None:
yaml.streams = _streams_xml_to_dict(streams)
return yaml
[docs]def main_mpas_to_yaml():
parser = argparse.ArgumentParser(
description='Convert a namelist and/or streams file to yaml')
parser.add_argument("-n", "--namelist", dest="namelist",
required=False,
help="MPAS namelist file")
parser.add_argument("-s", "--streams", dest="streams",
required=False,
help="MPAS streams file")
parser.add_argument("-t", "--namelist_template", dest="namelist_template",
required=False,
help="MPAS namelist template file (with all namelist "
"options). For MPAS-Ocean, this will typically be"
" ${PATH_TO_MPASO}/default_inputs/"
"namelist.forward.ocean")
parser.add_argument("-y", "--yaml", dest="yaml",
required=True,
help="Output yaml file")
parser.add_argument("-m", "--model", dest="model", default='omega',
help="Model name for the yaml")
args = parser.parse_args()
yaml = mpas_namelist_and_streams_to_yaml(
model=args.model,
namelist_template=args.namelist_template,
namelist=args.namelist,
streams=args.streams)
yaml.write(args.yaml)
[docs]def yaml_to_mpas_streams(processed_registry_filename, yaml):
"""
Add config options from a yaml file
Parameters
----------
processed_registry_filename : str
The processed registry file, used to determine the types of variables
each steam (since the yaml format doesn't supply that information).
yaml : polaris.yaml.PolarisYaml
A yaml object with the namelists and streams
Returns
-------
tree : lxml.etree
A tree of XML data describing MPAS i/o streams with the content from
the streams in the yaml file
"""
with open(processed_registry_filename, 'r') as reg_file:
registry_string = reg_file.read()
registry_string = registry_string.lstrip('\n')
registry = etree.fromstring(registry_string)
root = etree.Element('streams')
for stream in yaml.streams:
# find out if stream or immutable_stream
tag = _get_stream_tag(registry, stream)
attrs = dict(yaml.streams[stream])
contents = None
if 'contents' in attrs:
contents = attrs.pop('contents')
attrs['name'] = stream
child = etree.SubElement(root, tag, attrib=attrs)
if contents is not None:
for var in contents:
# find out what type it has
tag = _get_var_tag(registry, var)
etree.SubElement(child, tag, attrib=dict(name=var))
tree = etree.ElementTree(element=root)
return tree
def _update_section(src, dst, quiet, print_section=None):
"""
Recursively update config options in a section of a config from a source
section to the associate destination dictionary
"""
for name in src:
if isinstance(src[name], (dict, OrderedDict)):
if print_section is not None:
print_subsection = f'{print_section}: {name}'
else:
print_subsection = name
if name not in dst:
raise ValueError(
f'Attempting to modify config options to a '
f'nonexistent config\n'
f'(sub)section: {print_subsection}')
# this is a subsection
src_sub = src[name]
dst_sub = dst[name]
_update_section(src_sub, dst_sub, quiet, print_subsection)
else:
if name not in dst:
raise ValueError(
f'Attempting to modify a nonexistent config '
f'options:{print_section}: {name}')
if not quiet:
print(f' {print_section}: {name} = {src[name]}')
dst[name] = src[name]
def _read_namelist(namelist_template, namelist_filename):
""" Read the defaults file """
record_map = _read_namelist_template(namelist_template)
with open(namelist_filename, 'r') as f:
lines = f.readlines()
namelist: Dict[str, Dict[str, int | float | bool | str]] = dict()
for line in lines:
_, opt, value = _read_namelist_line(line)
if opt is not None and value is not None:
record = record_map[opt]
if record not in namelist:
namelist[record] = dict()
namelist[record][opt] = value
return namelist
def _read_namelist_template(namelist_template):
""" Read the defaults file """
with open(namelist_template, 'r') as f:
lines = f.readlines()
record_map: Dict[str, str] = dict()
record = None
for line in lines:
new_record, opt, _ = _read_namelist_line(line)
if new_record is not None:
record = new_record
elif opt is not None and record is not None:
record_map[opt] = record
return record_map
def _read_namelist_line(line):
record = None
opt = None
value: int | float | bool | str | None = None
if '&' in line:
record = line.strip('&').strip('\n').strip()
elif '=' in line:
opt, val = line.strip('\n').split('=')
opt = opt.strip()
str_value = \
val.strip().strip('\"').strip('\'').strip()
try:
value = int(str_value)
except ValueError:
try:
value = float(str_value)
except ValueError:
if str_value in ['true', '.true.']:
value = True
elif str_value in ['false', '.false.']:
value = False
if value is None:
value = str_value
return record, opt, value
def _streams_xml_to_dict(streams_filename):
""" Convert a streams XML file to nested dictionaries """
streams: Dict[str, Dict[str, str | list]] = dict()
tree = etree.parse(streams_filename)
xml_streams = next(tree.iter('streams'))
for child in xml_streams:
if child.tag not in ['stream', 'immutable_stream']:
raise ValueError(f'Unexpected tag {child.tag} instead of stream or'
f'immutable stream')
stream_name = child.attrib['name']
streams[stream_name] = dict()
for attr, value in child.attrib.items():
if attr != 'name':
streams[stream_name][attr] = value
contents = list()
for grandchild in child:
if grandchild.tag == 'var':
contents.append(grandchild.attrib['name'])
elif grandchild.tag == 'var_struct':
contents.append(grandchild.attrib['name'])
elif grandchild.tag == 'var_array':
contents.append(grandchild.attrib['name'])
elif grandchild.tag == 'stream':
contents.append(grandchild.attrib['name'])
else:
raise ValueError(f'Unexpected tag {grandchild.tag}')
if len(contents) > 0:
streams[stream_name]['contents'] = contents
return streams
def _get_stream_tag(registry, stream):
""" Get the xml tag, 'stream' or 'immutable_stream' for a given stream """
streams = next(next(registry.iter('registry')).iter('streams'))
# if we don't find the stream, it can't be an immutable stream
tag = 'stream'
for child in streams:
if child.tag == 'stream' and child.attrib['name'] == stream:
if 'immutable' in child.attrib and \
child.attrib['immutable'] == 'true':
tag = 'immutable_stream'
break
return tag
def _get_var_tag(registry, variable):
"""
Get the xml tag -- 'stream', 'var_struct', 'var_array' or 'var' -- for a
variable
"""
tag = None
streams = next(next(registry.iter('registry')).iter('streams'))
for child in streams:
if child.tag == 'stream' and child.attrib['name'] == variable:
return 'stream'
tree = next(registry.iter('registry'))
for child in tree:
if child.tag == 'var_struct':
if child.attrib['name'] == variable:
return 'var_struct'
for grandchild in child:
if grandchild.tag in ['var_struct', 'var_array', 'var'] and \
grandchild.attrib['name'] == variable:
return grandchild.tag
if grandchild.tag in ['var_struct', 'var_array']:
for greatgrand in grandchild:
if greatgrand.tag in ['var_array', 'var'] and \
greatgrand.attrib['name'] == variable:
return greatgrand.tag
if tag is None:
raise ValueError(f'Could not find {variable} in preprocessed registry')
return tag