This lesson is a brief introduction to TOAST and its data representations. The next cell just initializes a few things for the notebook.
# Load common tools for all lessons
import sys
sys.path.insert(0, "..")
from lesson_tools import (
    fake_focalplane
)
# Capture C++ output in the jupyter cells
%load_ext wurlitzer
You can get the current TOAST runtime configuration from the "Environment" class.
import toast
env = toast.Environment.get()
print(env)
Before using TOAST for simulation or analysis, it is important to discuss how data is stored in memory and how that data can be distributed among many processes to parallelize large workflows.
First, let's create a fake focalplane of detectors to use throughout this example.
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
# Generate a fake focalplane with 7 pixels, each with 2 detectors.
fp = fake_focalplane()
# Make a plot of this focalplane layout.
detnames = list(sorted(fp.keys()))
detquat = {x: fp[x]["quat"] for x in detnames}
detfwhm = {x: fp[x]["fwhm_arcmin"] for x in detnames}
detlabels = {x: x for x in detnames}
detpolcol = {x: "red" if i % 2 == 0 else "blue" for i, x in enumerate(detnames)}
toast.tod.plot_focalplane(
    detquat, 4.0, 4.0, None, fwhm=detfwhm, polcolor=detpolcol, labels=detlabels
)
TOAST works with data organized into observations. Each observation is independent of any other observation. An observation consists of co-sampled detectors for some span of time. The intrinsic detector noise is assumed to be stationary within an observation. Typically there are other quantities which are constant for an observation (e.g. elevation, weather conditions, satellite spin axis, etc).
An observation is just a dictionary with at least one member ("tod"), which is an instance of a class that derives from the toast.TOD base class.
The inputs to a TOD class constructor are at least: the MPI communicator to use for this observation (or None when not using MPI), the list of detector names, and the total number of samples.
The TOD class can act as a storage container for different "flavors" of timestreams as well as a source and sink for the observation data (with the read*() and write*() methods):
import toast.qarray as qa
nsamples = 1000
obs = dict()
obs["name"] = "20191014_000"
# The type of TOD class is usually specific to the data processing job.
# For example it might be one of the simulation classes or it might be
# a class that loads experiment data. Here we just use a simple class
# that is only used for testing and which reads / writes data to internal memory
# buffers.
tod = toast.tod.TODCache(None, detnames, nsamples, detquats=detquat)
obs["tod"] = tod
# Print the tod to get summary info:
print(tod)
# The TOD class has methods to get information about the data:
print("TOD has detectors {}".format(", ".join(tod.detectors)))
print("TOD has {} total samples for each detector".format(tod.total_samples))
# Write some data. Not every TOD derived class supports writing (for example,
# TOD classes that represent simulations).
def fill_tod(tod, fp):
    detnames = tod.detectors
    nsamp = tod.total_samples
    t_delta = 1.0 / fp[detnames[0]]["rate"]
    # Scale an integer range by the sample spacing, so that floating
    # point round-off cannot change the number of samples.
    tod.write_times(stamps=t_delta * np.arange(nsamp))
    tod.write_boresight(
        data=qa.from_angles(
            (np.pi / 2) * np.ones(nsamp),
            (2 * np.pi / nsamp) * np.arange(nsamp),
            np.zeros(nsamp)
        )
    )
    tod.write_position(pos=np.zeros((nsamp, 3), dtype=np.float64))
    tod.write_velocity(vel=np.zeros((nsamp, 3), dtype=np.float64))
    tod.write_common_flags(flags=np.zeros(nsamp, dtype=np.uint8))
    for d in detnames:
        tod.write(
            detector=d,
            data=np.random.normal(scale=fp[d]["NET"], size=nsamp)
        )
        tod.write_flags(
            detector=d, flags=np.zeros(nsamp, dtype=np.uint8)
        )
fill_tod(tod, fp)
# Read it back
print("TOD timestamps = {} ...".format(tod.read_times()[:5]))
print("TOD boresight = \n{} ...".format(tod.read_boresight()[:5,:]))
for d in detnames:
    print("TOD detector {} = {} ...".format(d, tod.read(detector=d, n=5)))
    print("TOD detector {} flags = {} ...".format(d, tod.read_flags(detector=d, n=5)))
# Store some data in the cache. The "cache" member variable looks like a dictionary of
# numpy arrays, but the memory used is allocated in C, so that we can actually clear
# these buffers when needed.
for d in detnames:
    processed = tod.read(detector=d)
    processed /= 2.0
    # By convention, we usually name buffers in the cache by <prefix>_<detector>
    tod.cache.put("processed_{}".format(d), processed)
print("TOD cache now contains {} bytes".format(tod.cache.report(silent=True)))
One common use pattern is to "read and cache" data. This happens if we want to keep the data in memory to re-use later. The TOD class has a set of methods that start with the string "local_" that perform this action.
# Get data from cache or read and cache
print("TOD timestamps = {} ...".format(tod.local_times()[:2]))
for d in detnames:
    print("TOD detector {} = {} ...".format(d, tod.local_signal(d)[:2]))
    print("TOD detector {} pointing = {} ...".format(d, tod.local_pointing(d)[:2, :]))
    print("TOD detector {} flags = {} ...".format(d, tod.local_flags(d)[:2]))
A toast.Comm instance takes the global number of processes available (MPI.COMM_WORLD) and divides them into groups. Each process group is assigned one or more observations. Since observations are independent, this means that different groups can be independently working on separate observations in parallel. It also means that inter-process communication needed when working on a single observation can occur with a smaller set of processes.
At NERSC, this notebook is running on a login node, so we cannot use MPI. When MPI is disabled, constructing a default toast.Comm just produces a single group with one process. See the parallel example at the end of this notebook for a case with multiple groups.
comm = toast.Comm()
print(comm)
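When MPI is available, the world communicator can be split into groups of a fixed size. A minimal sketch, assuming the groupsize keyword of the toast.Comm constructor and an MPI runtime (so it is left commented out here):
# Split MPI.COMM_WORLD into groups of 2 processes each (requires an
# MPI runtime, so this is not executed in this notebook):
#
# from mpi4py import MPI
# comm = toast.Comm(world=MPI.COMM_WORLD, groupsize=2)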
A toast.Data instance is mainly just a list of observations. However remember that each process group will have a different set of observations. Since we have only one group of one process, this example is not so interesting. See the MPI example.
data = toast.Data(comm)
data.obs.append(obs)
Recapping previous sections: we have some groups of processes, each of which has a set of observations. Within a single process group, the detector data is distributed across the processes of the group. That distribution is controlled by the size of the communicator passed to the TOD class and by the detranks parameter of its constructor. The detranks value sets the dimension of the process grid in the detector direction, and must divide evenly into the number of processes in the communicator. For example, a value of 1 means that every process has all detectors for some span of time; a value equal to the size of the communicator means that every process has some subset of detectors for the entire observation.
As a concrete example, imagine that MPI.COMM_WORLD has 4 processes. We split this into 2 groups of 2 processes. There are 3 observations of varying lengths, and every group has one or two observations. Here is the starting point of our data distribution:
Next we split the processes into 2 groups:
Then we assign our observations to the two groups:
When we create the TOD class in each observation, we specify how the data is distributed within each observation. If the detranks parameter is 1, then the dimension of the process grid in the detector direction is one:
If detranks is instead set to the size of the group, then we get:
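As a hedged sketch (assuming an MPI group of 2 processes, and that TODCache forwards the detranks argument to the TOD base class), the second layout would be requested like this. With the single process in this notebook, detranks can only be 1, so the sketch is left commented out:
# Each of the 2 processes in the group holds half of the detectors
# for the full observation (assumes TODCache accepts detranks):
#
# tod = toast.tod.TODCache(
#     comm.comm_group, detnames, nsamples, detquats=detquat, detranks=2
# )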
Once we have our distributed data set up, we usually feed it through a pipeline. There will be a lesson on pipelines later. Here we will create an entire fake dataset and work with it. The MPI introduction notebook goes into more detail about working with distributed data.
comm = toast.Comm()
data = toast.Data(comm)
# Create 3 observations, each containing a TODCache instance. We'll
# use the same focalplane and number of samples for each observation,
# but this is not required: each observation is independent.
for i in range(3):
    obsname = "observation_{:02d}".format(i)
    obs = dict()
    obs["name"] = obsname
    obs["id"] = "{:02d}".format(i)
    obs["tod"] = toast.tod.TODCache(
        comm.comm_group, detnames, nsamples, detquats=detquat
    )
    fill_tod(obs["tod"], fp)
    data.obs.append(obs)
# What does our distributed data look like now?
data.info()
Here we see a dump of the distributed data. In this case the TODCache class is storing everything "under the hood", which is why the data shows up both in the dump of the cache and when calling the normal TOD access methods.
Next, we can dump this data to a TIDAS volume (directories of HDF5 files in this case).
import os
import shutil
if toast.tod.tidas_available:
    import toast.tod.tidas as tt
    datapath = "intro_data"
    if os.path.isdir(datapath):
        shutil.rmtree(datapath)
    exporter = tt.OpTidasExport(
        datapath,
        tt.TODTidas,
        backend="hdf5",
        comp="none",
        use_todchunks=True,
    )
    exporter.exec(data)
And then we can load it back in...
if toast.tod.tidas_available:
    data = tt.load_tidas(
        comm,
        comm.group_size,
        datapath,
        "r",
        "detectors",
        tt.TODTidas,
        distintervals="chunks",
        group_dets="detectors",
    )
    data.info()
There are many utilities in the TOAST package that use compiled code internally. These include:
toast.rng: Streamed random number generation, with support for generating random samples from any location within a stream.
toast.qarray: Vectorized quaternion operations.
toast.fft: An API wrapper around different vendor FFT packages.
toast.cache: A class providing a dictionary of C-allocated numpy arrays.
toast.healpix: A subset of pixel projection routines, SIMD vectorized and threaded.
toast.timing: Simple serial timers, global named timers per process, a decorator to time calls to functions, and MPI tools to gather timing statistics from multiple processes (a short timing sketch follows this list).
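As a small illustration of the timing tools, here is a sketch assuming the Timer class in toast.timing with start(), stop(), and seconds() methods:
from toast.timing import Timer

tmr = Timer()
tmr.start()
# Some work to time (numpy was imported earlier in this notebook):
_ = np.sum(np.random.random(1000000))
tmr.stop()
print("Elapsed: {:0.4f} seconds".format(tmr.seconds()))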
Here is a quick example of threaded generation of random numbers drawn from a unit-variance Gaussian distribution. Note that the "key" pair of uint64 values and the first value of the "counter" pair determine the stream, and the second value of the counter pair is effectively the sample index within that stream. We can draw randoms from anywhere in the stream in a reproducible fashion (i.e. this random generator is stateless). Under the hood, this uses the Random123 package on each thread.
import toast.rng as rng
# Number of random samples
nrng = 10
# Draw randoms from the beginning of a stream
rng1 = rng.random(
    nrng, key=[12, 34], counter=[56, 0], sampler="gaussian", threads=True
)
# Draw randoms from some later starting point in the stream
rng2 = rng.random(
    nrng, key=[12, 34], counter=[56, 4], sampler="gaussian", threads=True
)
# The returned objects are buffer providers, so can be used like a numpy array.
print("Returned RNG buffers:")
print(rng1)
print(rng2)
# Compare the elements. Note how the overlapping sample indices match. The
# randoms drawn for any given sample agree regardless of the starting sample.
print("------ rng1 ------")
for i in range(nrng):
    print("rng1 {}: {}".format(i, rng1[i]))
print("------ rng2 ------")
for i in range(nrng):
    print("rng2 {}: {}".format(i + 4, rng2[i]))
The quaternion manipulation functions internally attempt to improve performance using OpenMP SIMD directives and threading in cases where it makes sense. The Python API is modelled after the quaternionarray package (https://github.com/zonca/quaternionarray/). There are functions for common operations like multiplying quaternion arrays, rotating arrays of vectors, converting to and from angle representations, SLERP, etc.
import toast.qarray as qa
# Number of points for this example
nqa = 5
# Make some fake rotation data by sweeping through theta / phi / pa angles
theta = np.linspace(0.0, np.pi, num=nqa)
phi = np.linspace(0.0, 2 * np.pi, num=nqa)
pa = np.zeros(nqa)
print("----- input angles -----")
print("theta = ", theta)
print("phi = ", phi)
print("pa = ", pa)
# Convert to quaternions
quat = qa.from_angles(theta, phi, pa)
print("\n----- output quaternions -----")
print(quat)
# Use these to rotate a vector
zaxis = np.array([0.0, 0.0, 1.0])
zrot = qa.rotate(quat, zaxis)
print("\n---- Z-axis rotated by quaternions ----")
print(zrot)
# Rotate a different vector by each quaternion
zout = qa.rotate(quat, zrot)
print("\n---- Arbitrary vectors rotated by quaternions ----")
print(zout)
# Multiply two quaternion arrays
qcopy = np.array(quat)
qout = qa.mult(quat, qcopy)
print("\n---- Product of two quaternion arrays ----")
print(qout)
# SLERP quaternions
qtime = 3.0 * np.arange(nqa)
qtargettime = np.arange(3.0 * (nqa - 1) + 1)
qslerped = qa.slerp(qtargettime, qtime, quat)
print("\n---- SLERP input ----")
for t, q in zip(qtime, quat):
    print("t = {} : {}".format(t, q))
print("\n---- SLERP output ----")
for t, q in zip(qtargettime, qslerped):
    print("t = {} : {}".format(t, q))
The internal FFT functions in TOAST are very limited and focus only on batched 1D Real FFTs. These are used for simulated noise generation and timestream filtering. Internally the compiled code can use either FFTW or MKL for the backend calculation.
# Number of batched FFTs
nbatch = 5
# FFT length
nfft = 65536
# Create some fake data
infft = np.zeros((nbatch, nfft), dtype=np.float64)
for b in range(nbatch):
    infft[b, :] = rng.random(nfft, key=[0, 0], counter=[b, 0], sampler="gaussian")
print("----- FFT input -----")
print(infft)
# Forward FFT
outfft = toast.fft.r1d_forward(infft)
print("\n----- FFT output -----")
print(outfft)
# Reverse FFT
backfft = toast.fft.r1d_backward(outfft)
print("\n----- FFT inverse output -----")
print(backfft)
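As a quick sanity check (assuming the backward transform is normalized so that forward followed by backward is the identity), the round trip should recover the original input:
# Compare the round-trip result with the input
print("\nRound trip matches input: ", np.allclose(infft, backfft))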
The Cache class provides a mechanism to work around the Python memory pool. There are times when we want to allocate memory and explicitly free it without waiting for garbage collection. Every instance of a toast.Cache acts as a dictionary of numpy arrays. Internally, the memory of each entry is a flat-packed std::vector with a custom allocator that ensures aligned memory allocation. Aligned memory is required for SIMD operations both in TOAST and in external libraries, and buffers in a Cache instance can be used directly for such operations.
from toast.cache import Cache
# Example array dimensions
cnames = ["c1", "c2"]
cshapes = {
    "c1": (20,),
    "c2": (2, 3, 2)
}
ctyps = {
    "c1": np.float64,
    "c2": np.uint16
}
# A cache instance
cache = Cache()
# Create some empty arrays in the cache
for cn in cnames:
    cache.create(cn, ctyps[cn], cshapes[cn])
print("---- Cache object ----")
print(cache)
print("\n---- Now contains ----")
for cn in cnames:
    print("{}: {}".format(cn, cache.reference(cn)))
print("Size = ", cache.report(silent=True), " bytes")
# Fill existing buffers
# Get a reference to the buffer
cdata = cache.reference("c1")
# Assign elements.
cdata[:] = np.random.random(cshapes["c1"])
# Delete the reference
del cdata
cdata = cache.reference("c2")
idx = 0
for x in range(cshapes["c2"][0]):
    for y in range(cshapes["c2"][1]):
        for z in range(cshapes["c2"][2]):
            cdata[x, y, z] = idx
            idx += 1
del cdata
print("\n---- Contents after filling ----")
for cn in cnames:
    print("{}: {}".format(cn, cache.reference(cn)))
print("Size = ", cache.report(silent=True), " bytes")
# We can also "put" existing numpy arrays which will then be copied into
# the cache
np1 = np.random.normal(size=10)
np2 = np.random.randint(0, high=255, dtype=np.uint16, size=12).reshape((2, 3, 2))
cache.put("p1", np1)
cache.put("p2", np2)
print("\n---- Contents after putting numpy arrays ----")
for cn in list(cache.keys()):
    print("{}: {}".format(cn, cache.reference(cn)))
print("Size = ", cache.report(silent=True), " bytes")
TOAST includes extensive tests built into the package. Running all of them takes some time, but you can also run just one test by specifying the name of the file in the toast/tests directory (without the ".py" extension):
import toast.tests
# Run just a couple simple tests in toast/tests/env.py
toast.tests.run("env")
# Now run **ALL** the (serial) tests
# toast.tests.run()