Pipelines

TOAST pipelines are standalone Python scripts that apply one or more TOAST operators to existing or simulated data. They typically divide into two major parts:

  1. Creating the TOAST data object
  2. Applying TOAST operators to the data

Experiments and simulations typically require specialized methods for creating the data object. This part is hard to generalize. For the subsequent data processing TOAST provides a number of convenience functions are kept in toast.pipeline_tools.

In [ ]:
# Are you using a special reservation for a workshop?
# If so, set it here:
nersc_reservation = None

# Load common tools for all lessons
import sys
sys.path.insert(0, "..")
from lesson_tools import (
    check_nersc,
    fake_focalplane
)
nersc_host, nersc_repo = check_nersc(reservation=nersc_reservation)
if nersc_host is not None:
    %reload_ext slurm_magic

Brief example

We demonstrate these concepts by writing a very simple pipeline that creates its own observations, fills them with noise and applies a polynomial filter.

In [ ]:
%%writefile my_first_pipeline.py

import argparse

import numpy as np

# import lesson_tools

import toast
from toast.mpi import MPI
import toast.pipeline_tools


# Fake focaplane is copied here from lesson_tools to avoid import error

def fake_focalplane(
    samplerate=20,
    epsilon=0,
    net=1,
    fmin=0,
    alpha=1,
    fknee=0.05,
    fwhm=30,
    npix=7,
    fov=3.0
):
    """Create a set of fake detectors.

    This generates 7 pixels (14 dets) in a hexagon layout at the boresight
    and with a made up polarization orientation.

    Args:
        None

    Returns:
        (dict):  dictionary of detectors and their properties.

    """
    from toast.tod import hex_pol_angles_radial, hex_pol_angles_qu, hex_layout

    zaxis = np.array([0, 0, 1.0])
    
    pol_A = hex_pol_angles_qu(npix)
    pol_B = hex_pol_angles_qu(npix, offset=90.0)
    
    dets_A = hex_layout(npix, fov, "", "", pol_A)
    dets_B = hex_layout(npix, fov, "", "", pol_B)
    
    dets = dict()
    for p in range(npix):
        pstr = "{:01d}".format(p)
        for d, layout in zip(["A", "B"], [dets_A, dets_B]):
            props = dict()
            props["quat"] = layout[pstr]["quat"]
            props["epsilon"] = epsilon
            props["rate"] = samplerate
            props["alpha"] = alpha
            props["NET"] = net
            props["fmin"] = fmin
            props["fknee"] = fknee
            props["fwhm_arcmin"] = fwhm
            dname = "{}{}".format(pstr, d)
            dets[dname] = props
    return dets


def parse_arguments(comm):
    """ Declare and parse command line arguments
    
    """
    parser = argparse.ArgumentParser(fromfile_prefix_chars="@")
    
    # pipeline_tools provides arguments to supported operators
    toast.pipeline_tools.add_noise_args(parser)
    toast.pipeline_tools.add_polyfilter_args(parser)
    
    # The pipeline may add its own arguments
    parser.add_argument(
        "--nsample", 
        required=False, 
        default=10000,
        type=np.int,
        help="Length of the simulation",
    )
    
    args = parser.parse_args()
    return args


def create_observations(comm, args):
    """ Create the TOAST data object using `args`
    
    This method will look very different for different pipelines.
    """
    focalplane = fake_focalplane(samplerate=args.sample_rate, fknee=1.0)
    detnames = list(sorted(focalplane.keys()))
    detquats = {x: focalplane[x]["quat"] for x in detnames}
    tod = toast.tod.TODCache(None, detnames, args.nsample, detquats=detquats)
    
    # Write some auxiliary data
    tod.write_times(stamps=np.arange(args.nsample) / args.sample_rate)
    tod.write_common_flags(flags=np.zeros(args.nsample, dtype=np.uint8))
    for d in detnames:
        tod.write_flags(detector=d, flags=np.zeros(args.nsample, dtype=np.uint8))
    
    noise = toast.pipeline_tools.get_analytic_noise(args, comm, focalplane)
    
    observation = {"tod" : tod, "noise" : noise, "name" : "obs-000", "id" : 0}
    
    data = toast.Data(comm)
    data.obs.append(observation)
    
    return data


def main():
    """ The `main` will instantiate and process the data.
    
    """
    mpiworld, procs, rank, comm = toast.pipeline_tools.get_comm()
    args = parse_arguments(comm)
    data = create_observations(comm, args)
    
    mc = 0
    name = "signal"
    tod = data.obs[0]["tod"]
    det = tod.local_dets[0]
    
    # Simulate noise, write out one detector
    toast.pipeline_tools.simulate_noise(args, comm, data, mc, cache_prefix=name)
    tod.local_signal(det, name).tofile("noise.npy")
    
    # Apply polyfilter, write the filtered data
    toast.pipeline_tools.apply_polyfilter(args, comm, data, cache_name=name)
    tod.local_signal(det, name).tofile("filtered.npy")
    

if __name__ == "__main__":
    main()
In [ ]:
! python3 my_first_pipeline.py --help
In [ ]:
! python3 my_first_pipeline.py --simulate-noise --polyfilter --poly-order 6 --nsample 10000
In [ ]:
import matplotlib.pyplot as plt
%matplotlib inline
import numpy as np

noise = np.fromfile("noise.npy")
filtered = np.fromfile("filtered.npy")
plt.plot(noise, '.', label="noise")
plt.plot(noise - filtered, '-', label="polynomial")
plt.legend()

Satellite simulation pipeline

pipelines/toast_satellite_sim.py

In [ ]:
! toast_satellite_sim.py --help

main without the profiling commands:

def main():
    env = Environment.get()

    mpiworld, procs, rank, comm = get_comm()
    args, comm, groupsize = parse_arguments(comm, procs)

    # Parse options                                                                                          

    if comm.world_rank == 0:
        os.makedirs(args.outdir, exist_ok=True)

    focalplane, gain, detweights = load_focalplane(args, comm)

    data = create_observations(args, comm, focalplane, groupsize)

    expand_pointing(args, comm, data)

    localpix, localsm, subnpix = get_submaps(args, comm, data)

    signalname = None
    skyname = simulate_sky_signal(
        args, comm, data, [focalplane], subnpix, localsm, "signal"
    )
    if skyname is not None:
        signalname = skyname

    diponame = simulate_dipole(args, comm, data, "signal")
    if diponame is not None:
        signalname = diponame

    # Mapmaking                                                                                         

    if not args.use_madam:
        # THIS BRANCH WILL SOON BE REPLACED WITH THE NATIVE MAPMAKER
    else:
        # Initialize madam parameters                                                                                          

        madampars = setup_madam(args)

        # Loop over Monte Carlos                                                                                         

        firstmc = args.MC_start
        nmc = args.MC_count

        for mc in range(firstmc, firstmc + nmc):
            # create output directory for this realization
            outpath = os.path.join(args.outdir, "mc_{:03d}".format(mc))

            simulate_noise(args, comm, data, mc, "tot_signal", overwrite=True)

            # add sky signal
            add_signal(args, comm, data, "tot_signal", signalname)

            if gain is not None:
                op_apply_gain = OpApplyGain(gain, name="tot_signal")
                op_apply_gain.exec(data)

            apply_madam(args, comm, data, madampars, outpath, detweights, "tot_signal")

Ground simulation pipeline

pipelines/toast_ground_sim.py

In [ ]:
! toast_ground_sim.py --help

main without profiling

def main():
    mpiworld, procs, rank, comm = get_comm()

    args, comm = parse_arguments(comm)

    # Initialize madam parameters                                                                                          

    madampars = setup_madam(args)

    # Load and broadcast the schedule file                                                                                         

    schedules = load_schedule(args, comm)

    # Load the weather and append to schedules                                                                                         

    load_weather(args, comm, schedules)

    # load or simulate the focalplane                                                                                          

    detweights = load_focalplanes(args, comm, schedules)

    # Create the TOAST data object to match the schedule.  This will                                                                                         
    # include simulating the boresight pointing                                                                                         

    data, telescope_data = create_observations(args, comm, schedules)

    # Split the communicator for day and season mapmaking                                                                                          

    time_comms = get_time_communicators(args, comm, data)

    # Expand boresight quaternions into detector pointing weights and                                                                                          
    # pixel numbers                                                                                          

    expand_pointing(args, comm, data)

    # Purge the pointing if we are NOT going to export the                                                                                         
    # data to a TIDAS volume                                                                                         
    if (args.tidas is None) and (args.spt3g is None):
        for ob in data.obs:
            tod = ob["tod"]
            tod.free_radec_quats()

    # Prepare auxiliary information for distributed map objects                                                                                          

    _, localsm, subnpix = get_submaps(args, comm, data)

    if args.pysm_model:
        focalplanes = [s.telescope.focalplane.detector_data for s in schedules]
        signalname = simulate_sky_signal(
            args, comm, data, focalplanes, subnpix, localsm, "signal"
        )
    else:
        signalname = scan_sky_signal(args, comm, data, localsm, subnpix, "signal")

    # Set up objects to take copies of the TOD at appropriate times                                                                                          

    totalname, totalname_freq = setup_sigcopy(args)

    # Loop over Monte Carlos

    firstmc = args.MC_start
    nsimu = args.MC_count

    freqs = [float(freq) for freq in args.freq.split(",")]
    nfreq = len(freqs)

    for mc in range(firstmc, firstmc + nsimu):

        simulate_atmosphere(args, comm, data, mc, totalname)

        # Loop over frequencies with identical focal planes and identical                                                                                          
        # atmospheric noise                                                                                         

        for ifreq, freq in enumerate(freqs):
            # Make a copy of the atmosphere so we can scramble the gains and apply                                                                                         
            # frequency-dependent scaling

            copy_signal(args, comm, data, totalname, totalname_freq)

            scale_atmosphere_by_frequency(
                args, comm, data, freq=freq, mc=mc, cache_name=totalname_freq
            )

            update_atmospheric_noise_weights(args, comm, data, freq, mc)

            # Add previously simulated sky signal to the atmospheric noise                                                                                          

            add_signal(args, comm, data, totalname_freq, signalname, purge=(nsimu == 1))

            mcoffset = ifreq * 1000000

            simulate_noise(args, comm, data, mc + mcoffset, totalname_freq)

            simulate_sss(args, comm, data, mc + mcoffset, totalname_freq)

            scramble_gains(args, comm, data, mc + mcoffset, totalname_freq)

            if (mc == firstmc) and (ifreq == 0):
                # For the first realization and frequency, optionally                                                                                                                                                                                                                 
                # export the timestream data

                output_tidas(args, comm, data, totalname)
                output_spt3g(args, comm, data, totalname)

            outpath = setup_output(args, comm, mc + mcoffset, freq)

            # Bin and destripe maps                                                                                          

            apply_madam(
                args,
                comm,
                data,
                madampars,
                outpath,
                detweights,
                totalname_freq,
                freq=freq,
                time_comms=time_comms,
                telescope_data=telescope_data,
                first_call=(mc == firstmc),
            )

            if args.apply_polyfilter or args.apply_groundfilter:

                # Filter signal                                                                                          

                apply_polyfilter(args, comm, data, totalname_freq)

                apply_groundfilter(args, comm, data, totalname_freq)

                # Bin filtered maps                                                                                          

                apply_madam(
                    args,
                    comm,
                    data,
                    madampars,
                    outpath,
                    detweights,
                    totalname_freq,
                    freq=freq,
                    time_comms=time_comms,
                    telescope_data=telescope_data,
                    first_call=False,
                    extra_prefix="filtered",
                    bin_only=True,
                )