# Source code for polaris.job

import importlib.resources as imp_res
import os as os

import numpy as np
from jinja2 import Template as Template
from mache.parallel import get_parallel_system
from mache.parallel.pbs import PbsSystem
from mache.parallel.slurm import SlurmSystem


def write_job_script(
    config,
    machine,
    work_dir,
    nodes=None,
    target_cores=None,
    min_cores=None,
    target_gpus=None,
    min_gpus=None,
    suite='',
    script_filename=None,
    run_command=None,
):
    """
    Render a scheduler job script from a template and write it to disk.

    A script is only written when the ``system`` option in the ``parallel``
    config section is ``slurm`` or ``pbs``; for any other value (or when the
    option is absent) the function returns without writing anything.

    Parameters
    ----------
    config : polaris.config.PolarisConfigParser
        Configuration options for this test case, a combination of user
        configs and the defaults for the machine and component

    machine : {str, None}
        The name of the machine; only used in the message printed when the
        scheduler adjusts the node count

    work_dir : str
        The work directory where the job script should be written

    nodes : int, optional
        The number of nodes for the job. If not provided, it is estimated
        from ``target_gpus`` and ``min_gpus`` when the machine has GPUs and
        a positive GPU request was made, otherwise from ``target_cores`` and
        ``min_cores``.

    target_cores : int, optional
        The target number of cores for the job to use if ``nodes`` not
        provided

    min_cores : int, optional
        The minimum number of cores for the job to use if ``nodes`` not
        provided

    target_gpus : int, optional
        The target number of GPUs for the job to use if ``nodes`` not
        provided

    min_gpus : int, optional
        The minimum number of GPUs for the job to use if ``nodes`` not
        provided

    suite : str, optional
        The name of the suite

    script_filename : str, optional
        The name of the job script file to write. If not provided, defaults
        to 'job_script.sh' or 'job_script.{suite}.sh' if suite is specified.

    run_command : str, optional
        The command(s) to run in the job script. If not provided, defaults
        to 'polaris serial {suite}' (or 'polaris serial' without a suite).
        A ``source load_polaris_env.sh`` line is placed ahead of the
        command(s) in the rendered script.

    Raises
    ------
    ValueError
        If ``nodes`` is not given and either ``target_cores`` or
        ``min_cores`` is missing, or if the node count has to be derived
        from CPU resources but ``cores_per_node`` is not configured.
    """
    if config.combined is None:
        config.combine()
    assert config.combined is not None

    parallel_system = get_parallel_system(config.combined)

    account = (
        config.get('parallel', 'account')
        if config.has_option('parallel', 'account')
        else ''
    )

    cores_per_node = parallel_system.get_config_int('cores_per_node')
    gpus_per_node = parallel_system.get_config_int('gpus_per_node', default=0)

    if nodes is None:
        if target_cores is None or min_cores is None:
            raise ValueError(
                'If nodes is not provided, both target_cores and min_cores '
                'must be provided.'
            )

        size_by_gpus = (
            gpus_per_node > 0
            and target_gpus is not None
            and min_gpus is not None
            and max(target_gpus, min_gpus) > 0
        )
        if size_by_gpus:
            # Guaranteed by the condition above; kept for static checkers.
            assert target_gpus is not None
            assert min_gpus is not None
            resource_mean = np.sqrt(target_gpus * min_gpus)
            per_node = gpus_per_node
        else:
            if cores_per_node is None:
                raise ValueError(
                    'cores_per_node must be set when computing nodes from '
                    'CPU resources'
                )
            resource_mean = np.sqrt(target_cores * min_cores)
            per_node = cores_per_node

        # Geometric mean of target and minimum, rounded up to whole nodes,
        # and never fewer than one node.
        nodes = max(int(np.ceil(resource_mean / per_node)), 1)

    # Node count before the scheduler has had a chance to adjust it: either
    # what the caller passed in or the estimate computed above.
    requested_nodes = nodes

    min_nodes_allowed = _get_min_nodes_allowed(
        cores_per_node=cores_per_node,
        gpus_per_node=gpus_per_node,
        min_cores=min_cores,
        min_gpus=min_gpus,
    )

    # Determine parallel system type
    if config.has_option('parallel', 'system'):
        system = config.get('parallel', 'system')
    else:
        system = 'single_node'

    desired_wall_time = config.get('job', 'wall_time')

    if system not in ('slurm', 'pbs'):
        # Do not write a job script for other systems
        return

    if system == 'slurm':
        (
            partition,
            qos,
            constraint,
            gpus_per_node,
            max_wallclock,
            nodes,
        ) = SlurmSystem.get_slurm_options(
            config=config.combined,
            nodes=nodes,
            min_nodes_allowed=min_nodes_allowed,
        )
        template_name = 'job_script.slurm.template'
        render_kwargs = {
            'partition': partition,
            'qos': qos,
            'constraint': constraint,
            'gpus_per_node': gpus_per_node,
            'wall_time': _cap_wall_time(desired_wall_time, max_wallclock),
        }
    else:
        (
            queue,
            constraint,
            gpus_per_node,
            max_wallclock,
            filesystems,
            nodes,
        ) = PbsSystem.get_pbs_options(
            config=config.combined,
            nodes=nodes,
            min_nodes_allowed=min_nodes_allowed,
        )
        template_name = 'job_script.pbs.template'
        render_kwargs = {
            'queue': queue,
            'constraint': constraint,
            'gpus_per_node': gpus_per_node,
            'wall_time': _cap_wall_time(desired_wall_time, max_wallclock),
            'filesystems': filesystems,
        }

    job_name = config.get('job', 'job_name')
    if job_name == '<<<default>>>':
        job_name = f'polaris_{suite}' if suite else 'polaris'

    if requested_nodes is not None and requested_nodes != nodes:
        print(
            f'Adjusted node count from {requested_nodes} to {nodes} for '
            f'machine {machine} based on scheduler node limits.'
        )

    template_path = imp_res.files('polaris.job').joinpath(template_name)
    template = Template(template_path.read_text())

    if run_command is None:
        if suite:
            run_command = f'polaris serial {suite}'
        else:
            run_command = 'polaris serial'
    # The environment-loading line goes ahead of the command(s), whether they
    # are the default or supplied by the caller.
    run_command = f'source load_polaris_env.sh\n{run_command}'

    render_kwargs.update(
        job_name=job_name,
        account=account,
        nodes=f'{nodes}',
        suite=suite,
        run_command=run_command,
    )
    text = template.render(**render_kwargs)

    if script_filename is None:
        script_filename = f'job_script.{suite}.sh' if suite else 'job_script.sh'
    script_filename = os.path.join(work_dir, script_filename)
    with open(script_filename, 'w') as handle:
        handle.write(text)
def _get_min_nodes_allowed( cores_per_node, gpus_per_node, min_cores, min_gpus, ): """Compute the minimum feasible nodes from minimum requested resources.""" minima = [] if ( min_cores is not None and cores_per_node is not None and cores_per_node > 0 ): minima.append(max(int(np.ceil(min_cores / cores_per_node)), 1)) if ( min_gpus is not None and gpus_per_node is not None and gpus_per_node > 0 ): minima.append(max(int(np.ceil(min_gpus / gpus_per_node)), 1)) if len(minima) == 0: return None return max(minima) def _wallclock_to_seconds(wallclock): """Convert HH:MM:SS wall-clock string to total seconds, or None.""" parts = wallclock.split(':') if len(parts) != 3: return None try: return int(parts[0]) * 3600 + int(parts[1]) * 60 + int(parts[2]) except ValueError: return None def _cap_wall_time(desired, max_wallclock): """Return desired wall time, capped at max_wallclock if it is smaller. Defaults to desired if max_wallclock is empty or cannot be parsed. """ if not max_wallclock: return desired desired_secs = _wallclock_to_seconds(desired) max_secs = _wallclock_to_seconds(max_wallclock) if desired_secs is None or max_secs is None: return desired if desired_secs <= max_secs: return desired return max_wallclock