TOAST pipelines are standalone Python scripts that apply one or more TOAST operators to existing or simulated data. They typically divide into two major parts: creating the data object, and processing the data object.
Experiments and simulations typically require specialized methods for creating the data object, so this part is hard to generalize. For the subsequent data processing, TOAST provides a number of convenience functions in toast.pipeline_tools.
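Schematically, the two parts fit together as in the sketch below. This is only an outline, not part of the lesson code: create_my_observations stands in for whatever experiment-specific code builds the observations, and the complete, runnable version of this structure is written out in my_first_pipeline.py later in this lesson.
# Sketch of the overall pipeline structure (outline only).
# create_my_observations() is a hypothetical, experiment-specific helper.
import argparse
import toast.pipeline_tools

def main():
    # Part 1: build the TOAST data object (experiment / simulation specific)
    mpiworld, procs, rank, comm = toast.pipeline_tools.get_comm()
    parser = argparse.ArgumentParser()
    toast.pipeline_tools.add_noise_args(parser)
    args = parser.parse_args()
    data = create_my_observations(comm, args)  # hypothetical helper
    # Part 2: apply operators through the pipeline_tools wrappers
    toast.pipeline_tools.simulate_noise(args, comm, data, 0, cache_prefix="signal")

if __name__ == "__main__":
    main()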
# Are you using a special reservation for a workshop?
# If so, set it here:
nersc_reservation = None
# Load common tools for all lessons
import sys
sys.path.insert(0, "..")
from lesson_tools import (
    check_nersc,
    fake_focalplane,
)
nersc_host, nersc_repo = check_nersc(reservation=nersc_reservation)
if nersc_host is not None:
    %reload_ext slurm_magic
We demonstrate these concepts by writing a very simple pipeline that creates its own observations, fills them with noise and applies a polynomial filter.
%%writefile my_first_pipeline.py
import argparse
import numpy as np
# import lesson_tools
import toast
from toast.mpi import MPI
import toast.pipeline_tools
# fake_focalplane is copied here from lesson_tools to avoid an import error
def fake_focalplane(
    samplerate=20,
    epsilon=0,
    net=1,
    fmin=0,
    alpha=1,
    fknee=0.05,
    fwhm=30,
    npix=7,
    fov=3.0
):
    """Create a set of fake detectors.

    This generates 7 pixels (14 dets) in a hexagon layout at the boresight
    and with a made up polarization orientation.

    Args:
        None

    Returns:
        (dict): dictionary of detectors and their properties.
    """
    from toast.tod import hex_pol_angles_radial, hex_pol_angles_qu, hex_layout

    zaxis = np.array([0, 0, 1.0])
    pol_A = hex_pol_angles_qu(npix)
    pol_B = hex_pol_angles_qu(npix, offset=90.0)
    dets_A = hex_layout(npix, fov, "", "", pol_A)
    dets_B = hex_layout(npix, fov, "", "", pol_B)
    dets = dict()
    for p in range(npix):
        pstr = "{:01d}".format(p)
        for d, layout in zip(["A", "B"], [dets_A, dets_B]):
            props = dict()
            props["quat"] = layout[pstr]["quat"]
            props["epsilon"] = epsilon
            props["rate"] = samplerate
            props["alpha"] = alpha
            props["NET"] = net
            props["fmin"] = fmin
            props["fknee"] = fknee
            props["fwhm_arcmin"] = fwhm
            dname = "{}{}".format(pstr, d)
            dets[dname] = props
    return dets
def parse_arguments(comm):
    """Declare and parse command line arguments."""
    parser = argparse.ArgumentParser(fromfile_prefix_chars="@")
    # pipeline_tools provides arguments to supported operators
    toast.pipeline_tools.add_noise_args(parser)
    toast.pipeline_tools.add_polyfilter_args(parser)
    # The pipeline may add its own arguments
    parser.add_argument(
        "--nsample",
        required=False,
        default=10000,
        type=int,
        help="Length of the simulation",
    )
    args = parser.parse_args()
    return args
def create_observations(comm, args):
    """Create the TOAST data object using `args`.

    This method will look very different for different pipelines.
    """
    focalplane = fake_focalplane(samplerate=args.sample_rate, fknee=1.0)
    detnames = list(sorted(focalplane.keys()))
    detquats = {x: focalplane[x]["quat"] for x in detnames}
    tod = toast.tod.TODCache(None, detnames, args.nsample, detquats=detquats)
    # Write some auxiliary data
    tod.write_times(stamps=np.arange(args.nsample) / args.sample_rate)
    tod.write_common_flags(flags=np.zeros(args.nsample, dtype=np.uint8))
    for d in detnames:
        tod.write_flags(detector=d, flags=np.zeros(args.nsample, dtype=np.uint8))
    noise = toast.pipeline_tools.get_analytic_noise(args, comm, focalplane)
    observation = {"tod": tod, "noise": noise, "name": "obs-000", "id": 0}
    data = toast.Data(comm)
    data.obs.append(observation)
    return data
def main():
    """Instantiate the data object and process it."""
    mpiworld, procs, rank, comm = toast.pipeline_tools.get_comm()
    args = parse_arguments(comm)
    data = create_observations(comm, args)
    mc = 0
    name = "signal"
    tod = data.obs[0]["tod"]
    det = tod.local_dets[0]
    # Simulate noise, write out one detector
    toast.pipeline_tools.simulate_noise(args, comm, data, mc, cache_prefix=name)
    tod.local_signal(det, name).tofile("noise.npy")
    # Apply polyfilter, write the filtered data
    toast.pipeline_tools.apply_polyfilter(args, comm, data, cache_name=name)
    tod.local_signal(det, name).tofile("filtered.npy")


if __name__ == "__main__":
    main()
! python3 my_first_pipeline.py --help
! python3 my_first_pipeline.py --simulate-noise --polyfilter --poly-order 6 --nsample 10000
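Because the parser was created with fromfile_prefix_chars="@", the same options can also be read from a parameter file, one argument token per line, and passed with an @ prefix. The file name params.txt below is just an illustration:
%%writefile params.txt
--simulate-noise
--polyfilter
--poly-order
6
--nsample
10000
! python3 my_first_pipeline.py @params.txt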
import matplotlib.pyplot as plt
%matplotlib inline
import numpy as np
noise = np.fromfile("noise.npy")
filtered = np.fromfile("filtered.npy")
plt.plot(noise, '.', label="noise")
plt.plot(noise - filtered, '-', label="polynomial")
plt.legend()
! toast_satellite_sim.py --help
Here is the main function of toast_satellite_sim.py, without the profiling commands:
def main():
    env = Environment.get()
    mpiworld, procs, rank, comm = get_comm()
    args, comm, groupsize = parse_arguments(comm, procs)
    # Parse options
    if comm.world_rank == 0:
        os.makedirs(args.outdir, exist_ok=True)
    focalplane, gain, detweights = load_focalplane(args, comm)
    data = create_observations(args, comm, focalplane, groupsize)
    expand_pointing(args, comm, data)
    localpix, localsm, subnpix = get_submaps(args, comm, data)
    signalname = None
    skyname = simulate_sky_signal(
        args, comm, data, [focalplane], subnpix, localsm, "signal"
    )
    if skyname is not None:
        signalname = skyname
    diponame = simulate_dipole(args, comm, data, "signal")
    if diponame is not None:
        signalname = diponame
    # Mapmaking
    if not args.use_madam:
        # THIS BRANCH WILL SOON BE REPLACED WITH THE NATIVE MAPMAKER
        pass
    else:
        # Initialize madam parameters
        madampars = setup_madam(args)
        # Loop over Monte Carlos
        firstmc = args.MC_start
        nmc = args.MC_count
        for mc in range(firstmc, firstmc + nmc):
            # create output directory for this realization
            outpath = os.path.join(args.outdir, "mc_{:03d}".format(mc))
            simulate_noise(args, comm, data, mc, "tot_signal", overwrite=True)
            # add sky signal
            add_signal(args, comm, data, "tot_signal", signalname)
            if gain is not None:
                op_apply_gain = OpApplyGain(gain, name="tot_signal")
                op_apply_gain.exec(data)
            apply_madam(args, comm, data, madampars, outpath, detweights, "tot_signal")
! toast_ground_sim.py --help
And here is the main function of toast_ground_sim.py, without the profiling commands:
def main():
    mpiworld, procs, rank, comm = get_comm()
    args, comm = parse_arguments(comm)
    # Initialize madam parameters
    madampars = setup_madam(args)
    # Load and broadcast the schedule file
    schedules = load_schedule(args, comm)
    # Load the weather and append to schedules
    load_weather(args, comm, schedules)
    # load or simulate the focalplane
    detweights = load_focalplanes(args, comm, schedules)
    # Create the TOAST data object to match the schedule. This will
    # include simulating the boresight pointing
    data, telescope_data = create_observations(args, comm, schedules)
    # Split the communicator for day and season mapmaking
    time_comms = get_time_communicators(args, comm, data)
    # Expand boresight quaternions into detector pointing weights and
    # pixel numbers
    expand_pointing(args, comm, data)
    # Purge the pointing if we are NOT going to export the
    # data to a TIDAS volume
    if (args.tidas is None) and (args.spt3g is None):
        for ob in data.obs:
            tod = ob["tod"]
            tod.free_radec_quats()
    # Prepare auxiliary information for distributed map objects
    _, localsm, subnpix = get_submaps(args, comm, data)
    if args.pysm_model:
        focalplanes = [s.telescope.focalplane.detector_data for s in schedules]
        signalname = simulate_sky_signal(
            args, comm, data, focalplanes, subnpix, localsm, "signal"
        )
    else:
        signalname = scan_sky_signal(args, comm, data, localsm, subnpix, "signal")
    # Set up objects to take copies of the TOD at appropriate times
    totalname, totalname_freq = setup_sigcopy(args)
    # Loop over Monte Carlos
    firstmc = args.MC_start
    nsimu = args.MC_count
    freqs = [float(freq) for freq in args.freq.split(",")]
    nfreq = len(freqs)
    for mc in range(firstmc, firstmc + nsimu):
        simulate_atmosphere(args, comm, data, mc, totalname)
        # Loop over frequencies with identical focal planes and identical
        # atmospheric noise
        for ifreq, freq in enumerate(freqs):
            # Make a copy of the atmosphere so we can scramble the gains and apply
            # frequency-dependent scaling
            copy_signal(args, comm, data, totalname, totalname_freq)
            scale_atmosphere_by_frequency(
                args, comm, data, freq=freq, mc=mc, cache_name=totalname_freq
            )
            update_atmospheric_noise_weights(args, comm, data, freq, mc)
            # Add previously simulated sky signal to the atmospheric noise
            add_signal(args, comm, data, totalname_freq, signalname, purge=(nsimu == 1))
            mcoffset = ifreq * 1000000
            simulate_noise(args, comm, data, mc + mcoffset, totalname_freq)
            simulate_sss(args, comm, data, mc + mcoffset, totalname_freq)
            scramble_gains(args, comm, data, mc + mcoffset, totalname_freq)
            if (mc == firstmc) and (ifreq == 0):
                # For the first realization and frequency, optionally
                # export the timestream data
                output_tidas(args, comm, data, totalname)
                output_spt3g(args, comm, data, totalname)
            outpath = setup_output(args, comm, mc + mcoffset, freq)
            # Bin and destripe maps
            apply_madam(
                args,
                comm,
                data,
                madampars,
                outpath,
                detweights,
                totalname_freq,
                freq=freq,
                time_comms=time_comms,
                telescope_data=telescope_data,
                first_call=(mc == firstmc),
            )
            if args.apply_polyfilter or args.apply_groundfilter:
                # Filter signal
                apply_polyfilter(args, comm, data, totalname_freq)
                apply_groundfilter(args, comm, data, totalname_freq)
                # Bin filtered maps
                apply_madam(
                    args,
                    comm,
                    data,
                    madampars,
                    outpath,
                    detweights,
                    totalname_freq,
                    freq=freq,
                    time_comms=time_comms,
                    telescope_data=telescope_data,
                    first_call=False,
                    extra_prefix="filtered",
                    bin_only=True,
                )