import importlib.resources as imp_res
import os
from typing import Dict, Tuple, Union
import xarray as xr
from mpas_tools.io import write_netcdf
from mpas_tools.vector.reconstruct import reconstruct_variable
from ruamel.yaml import YAML
from polaris import Component
from polaris.ocean.vertical.diagnostics import (
geom_thickness_from_ds,
pseudothickness_from_ds,
)
[docs]
class Ocean(Component):
"""
The collection of all test case for the MPAS-Ocean core
Attributes
----------
model : str
The ocean model being used, either 'mpas-ocean', 'omega', or
'unknown' if no OceanModelStep or OceanIOStep is present in any task
mpaso_to_omega_dim_map : dict
A map from MPAS-Ocean dimension names to their Omega equivalents
mpaso_to_omega_var_map : dict
A map from MPAS-Ocean variable names to their Omega equivalents
"""
[docs]
def __init__(self):
"""
Construct the collection of MPAS-Ocean test cases
"""
super().__init__(name='ocean')
self.model: Union[None, str] = None
self.mpaso_to_omega_dim_map: Union[None, Dict[str, str]] = None
self.mpaso_to_omega_var_map: Union[None, Dict[str, str]] = None
def configure(self, config, tasks):
"""
Configure the component
Parameters
----------
config : polaris.config.PolarisConfigParser
config options to modify
tasks : list of polaris.Task
The tasks to be set up for this component
"""
section = config['ocean']
model = section.get('model')
has_ocean_io_steps, has_ocean_model_steps = (
self._has_ocean_io_model_steps(tasks)
)
if not (has_ocean_model_steps or has_ocean_io_steps):
# No ocean I/O or model steps, so no model detection or build
# needed.
if model == 'detect':
model = 'unknown'
self.model = model
return
if model == 'detect':
model = self._detect_model(config)
print('Detected ocean model:', model)
config.set('ocean', 'model', model)
configs = {'mpas-ocean': 'mpas_ocean.cfg', 'omega': 'omega.cfg'}
if model not in configs:
raise ValueError(f'Unknown ocean model {model}')
config.add_from_package('polaris.ocean', configs[model])
component_path = config.get('paths', 'component_path')
if has_ocean_model_steps:
# we need to try to detect the model and build it if needed
if model == 'omega':
detected = self._detect_omega_build(component_path)
else:
detected = self._detect_mpas_ocean_build(component_path)
if not detected:
# looks like we need to build the model
build = config.getboolean('build', 'build')
if not build:
print(
f'Ocean model {model} not found in '
f'{component_path}, setting build option to True'
)
config.set('build', 'build', 'True', user=True)
if model == 'omega':
self._read_var_map()
self.model = model
[docs]
def map_to_native_model_vars(self, ds):
"""
If the model is Omega, rename dimensions and variables in a dataset
from their MPAS-Ocean names to the Omega equivalent (appropriate for
input datasets like an initial condition)
Parameters
----------
ds : xarray.Dataset
A dataset containing MPAS-Ocean variable names
Returns
-------
ds : xarray.Dataset
The same dataset with variables renamed as appropriate for the
ocean model being run
"""
model = self.model
if model == 'omega':
assert self.mpaso_to_omega_dim_map is not None
rename = {
k: v
for k, v in self.mpaso_to_omega_dim_map.items()
if k in ds.dims
}
assert self.mpaso_to_omega_var_map is not None
rename_vars = {
k: v for k, v in self.mpaso_to_omega_var_map.items() if k in ds
}
rename.update(rename_vars)
ds = ds.rename(rename)
return ds
[docs]
def map_var_list_to_native_model(self, var_list):
"""
If the model is Omega, rename variables from their MPAS-Ocean names to
the Omega equivalent (appropriate for validation variable lists)
Parameters
----------
var_list : list of str
A list of MPAS-Ocean variable names
Returns
-------
renamed_vars : list of str
The same list with variables renamed as appropriate for the
ocean model being run
"""
renamed_vars = var_list
model = self.model
if model == 'omega':
assert self.mpaso_to_omega_var_map is not None
renamed_vars = [
v
for k, v in self.mpaso_to_omega_var_map.items()
if k in var_list
]
return renamed_vars
[docs]
def write_model_dataset(self, ds, filename, config=None):
"""
Write out the given dataset, mapping dimension and variable names from
MPAS-Ocean to Omega names if appropriate
Parameters
----------
ds : xarray.Dataset
A dataset containing MPAS-Ocean variable names
filename : str
The path for the NetCDF file to write
"""
if (
self.model == 'omega'
and 'layerThickness' in ds.keys()
and 'PseudoThickness' not in ds.keys()
and config is not None
):
pseudothickness, spec_vol = pseudothickness_from_ds(
ds, config=config
)
if pseudothickness is not None and spec_vol is not None:
ds['PseudoThickness'] = pseudothickness
ds['SpecVol'] = spec_vol
ds['layerThickness'] = ds['PseudoThickness'].copy()
# After map_to_native_model_vars, the dataset contains
# LayerThickness and PseudoThickness which are both
# pseudo-thickness
ds = self.map_to_native_model_vars(ds)
write_netcdf(ds=ds, fileName=filename)
[docs]
def map_from_native_model_vars(self, ds):
"""
If the model is Omega, rename dimensions and variables in a dataset
from their Omega names to the MPAS-Ocean equivalent (appropriate for
datasets that are output from the model)
Parameters
----------
ds : xarray.Dataset
A dataset containing variable names native to either ocean model
Returns
-------
ds : xarray.Dataset
The same dataset with variables named as expected in MPAS-Ocean
"""
model = self.model
if model == 'omega':
# switch keys and values in mpaso_to_omega maps to get
# omega to mpaso maps
assert self.mpaso_to_omega_dim_map is not None
rename = {
k: v
for v, k in self.mpaso_to_omega_dim_map.items()
if k in ds.dims
}
assert self.mpaso_to_omega_var_map is not None
rename_vars = {
k: v for v, k in self.mpaso_to_omega_var_map.items() if k in ds
}
rename.update(rename_vars)
ds = ds.rename(rename)
return ds
[docs]
def map_var_list_from_native_model(self, var_list):
"""
If the model is Omega, rename variables from their Omega names to
the MPAS-Ocean equivalent
Parameters
----------
var_list : list of str
A list of MPAS-Ocean variable names
Returns
-------
renamed_vars : list of str
The same list with variables renamed as appropriate for the
ocean model being run
"""
renamed_vars = var_list
model = self.model
if model == 'omega':
# switch keys and values in mpaso_to_omega maps to get
# omega to mpaso maps
assert self.mpaso_to_omega_var_map is not None
renamed_vars = [
v
for v, k in self.mpaso_to_omega_var_map.items()
if k in var_list
]
return renamed_vars
[docs]
def open_model_dataset(
self,
filename,
config=None,
mesh_filename=None,
reconstruct_variables=None,
coeffs_filename=None,
**kwargs,
):
"""
Open the given dataset, mapping variable and dimension names from Omega
to MPAS-Ocean names if appropriate
Parameters
----------
filename : str
The path for the NetCDF file to open
config : polaris.Config, optional
The configuration object for the simulation.
mesh_filename : str, optional
Path to the mesh NetCDF file.
reconstruct_variables : list of str, optional
List of variable names to reconstruct in the dataset.
coeffs_reconstruct : xarray.DataArray, optional
Coefficients used for reconstructing variables.
coeffs_filename : str, optional
Path to the coefficients NetCDF file.
kwargs
keyword arguments passed to `xarray.open_dataset()`
Returns
-------
ds : xarray.Dataset
The dataset with variables named as expected in MPAS-Ocean
"""
ds = xr.open_dataset(filename, **kwargs)
if (
self.model == 'omega'
and 'LayerThickness' in ds.keys()
and 'SpecVol' in ds.keys()
and config is not None
):
# Omega's LayerThickness is actually pseudo-thickness
ds['PseudoThickness'] = ds.LayerThickness
ds['LayerThickness'] = geom_thickness_from_ds(ds, config=config)
ds = self.map_from_native_model_vars(ds)
ds = _add_reconstructed_variables_to_dataset(
ds, reconstruct_variables, mesh_filename, coeffs_filename
)
return ds
def _has_ocean_io_model_steps(self, tasks) -> Tuple[bool, bool]:
"""
Determine if any steps in this component descend from OceanIOStep or
OceanModelStep
"""
# local import to avoid circular imports
from polaris.ocean.model.ocean_io_step import OceanIOStep
from polaris.ocean.model.ocean_model_step import OceanModelStep
has_ocean_model_steps = any(
isinstance(step, OceanModelStep)
for task in tasks
for step in task.steps.values()
)
has_ocean_io_steps = any(
isinstance(step, OceanIOStep)
for task in tasks
for step in task.steps.values()
)
return has_ocean_io_steps, has_ocean_model_steps
def _read_var_map(self):
"""
Read the map from MPAS-Ocean to Omega dimension and variable names
"""
package = 'polaris.ocean.model'
filename = 'mpaso_to_omega.yaml'
text = imp_res.files(package).joinpath(filename).read_text()
yaml_data = YAML(typ='rt')
nested_dict = yaml_data.load(text)
self.mpaso_to_omega_dim_map = nested_dict['dimensions']
self.mpaso_to_omega_var_map = nested_dict['variables']
def _detect_model(self, config) -> str:
"""
Detect which ocean model to use
"""
# build config options for each model, so the default component_path
# can be read if it hasn't been overridden
omega_config = config.copy()
omega_config.add_from_package('polaris.ocean', 'omega.cfg')
omega_path = omega_config.get('paths', 'component_path')
mpas_ocean_config = config.copy()
mpas_ocean_config.add_from_package('polaris.ocean', 'mpas_ocean.cfg')
mpas_ocean_path = mpas_ocean_config.get('paths', 'component_path')
if self._detect_omega_build(omega_path):
return 'omega'
elif self._detect_mpas_ocean_build(mpas_ocean_path):
return 'mpas-ocean'
else:
raise ValueError(
f'Could not detect ocean model; neither MPAS-Ocean '
f'nor Omega appear to be available; '
f'searched {omega_path} and {mpas_ocean_path}.'
)
def _detect_omega_build(self, path) -> bool:
"""
Detect if Omega is available
"""
required_files = [
'configs/Default.yml',
'src/omega.exe',
]
path = os.path.abspath(path)
all_found = True
for required_file in required_files:
if not os.path.exists(os.path.join(path, required_file)):
all_found = False
break
return all_found
def _detect_mpas_ocean_build(self, path) -> bool:
"""
Detect if MPAS-Ocean is available
Returns
-------
is_mpas_ocean : bool
True if MPAS-Ocean appears to be available, False otherwise
"""
required_files = [
'default_inputs/namelist.ocean.forward',
'default_inputs/streams.ocean.forward',
'src/Registry_processed.xml',
'ocean_model',
]
path = os.path.abspath(path)
all_found = True
for required_file in required_files:
if not os.path.exists(os.path.join(path, required_file)):
all_found = False
break
return all_found
def _add_reconstructed_variables_to_dataset(
ds, reconstruct_variables, mesh_filename, coeffs_filename
):
"""
Add reconstructed vector variables to the dataset if requested.
Parameters
----------
ds : xarray.Dataset
The dataset to add reconstructed variables to.
reconstruct_variables : list of str or None
List of variable names to reconstruct.
mesh_filename : str
Path to the mesh NetCDF file.
coeffs_filename : str
Path to the coefficients NetCDF file.
Returns
-------
ds : xarray.Dataset
The dataset with reconstructed variables added.
"""
if reconstruct_variables is None:
return ds
if mesh_filename is None:
raise ValueError(
'mesh_filename must be provided to open_model_dataset for '
'variable reconstruction'
)
if coeffs_filename is None:
raise ValueError(
'coeffs_filename must be provided to open_model_dataset for '
'variable reconstruction'
)
for variable in reconstruct_variables:
if variable not in ds:
raise ValueError(
f"User requested vector reconstruction for '{variable}' "
"but it isn't present in the dataset."
)
out_var_name = (
variable.replace('normal', '').lower()
if 'normal' in variable
else variable
)
if f'{out_var_name}Zonal' in ds and f'{out_var_name}Meridional' in ds:
continue
ds_mesh = xr.open_dataset(mesh_filename)
ds_coeff = xr.open_dataset(coeffs_filename)
coeffs_reconstruct = ds_coeff.coeffs_reconstruct
if ds_coeff.sizes['nCells'] != ds_mesh.sizes['nCells']:
print(
f'The sizes of coefficient dataset do not match mesh '
f'dataset; exiting without reconstructing {out_var_name}'
)
continue
reconstruct_variable(
out_var_name,
ds[variable],
ds_mesh,
coeffs_reconstruct,
ds,
quiet=True,
)
return ds
# create a single module-level instance available to other components
ocean = Ocean()