This tutorial uses a couple of basic operators to demonstrate the upcoming data and processing APIs.
import os
import tempfile
import numpy as np
import toast
from toast import qarray as qa
from toast import (
    Telescope,
    Focalplane,
    Observation,
    load_config,
    dump_config,
    default_config,
    create,
)
from toast.future_ops import (
    Pipeline,
    SimSatellite,
    SimNoise,
    DefaultNoiseModel,
)
The basic data model is a set of Observation instances, each of which is associated with a Focalplane on a Telescope. Note that a Focalplane instance typically contains just a subset of the detectors on the actual physical focalplane. These detectors must be co-sampled and likely have other things in common (for example, they are on the same wafer or are correlated in some other way). For this example we will create these objects manually, but usually they will be loaded or created by some experiment-specific function.
MPI is optional in TOAST, although it is required to achieve good parallel performance on traditional CPU systems. In this section we show how interactive use of TOAST can be done without any reference to MPI. In a later section we show how to make use of distributed data and operations.
# help(Observation)
# help(Focalplane)
# help(Telescope)
sample_rate = 10  # Hz
detector_names = [
    "det_00_A",
    "det_00_B",
]
zaxis = np.array([0.0, 0.0, 1.0])
detector_properties = {
    "det_00_A": {
        "fsample": sample_rate,
        "NET": 50.0,
        "quat": qa.rotation(zaxis, 0.0),
        "fmin": 1.0e-5,
        "fknee": 0.05,
        "alpha": 1.0,
        "pol_leakage": 0.0,
    },
    "det_00_B": {
        "fsample": sample_rate,
        "NET": 50.0,
        "quat": qa.rotation(zaxis, np.pi / 2),
        "fmin": 1.0e-5,
        "fknee": 0.05,
        "alpha": 1.0,
        "pol_leakage": 0.0,
    },
}
focalplane = Focalplane(
    detector_data=detector_properties,
    sample_rate=sample_rate,
)
telescope = Telescope(name="fake", focalplane=focalplane)
# Make an empty observation
samples = 10
obs = Observation(telescope, name="2020-07-31_A", samples=samples)
print(obs)
By default, the observation is empty. You can add arbitrary metadata to the observation; it acts just like a dictionary.
hk = {
    "Temperature 1": np.array([1.0, 2.0, 3.0]),
    "Other Sensor": 1.2345,
}
obs["housekeeping"] = hk
print(obs)
Now we can add some Time Ordered Data to this observation. There are basically two types of data: timestreams of information that all detectors have in common (telescope boresight, etc.) and timestreams of detector data (signals and flags). Although an Observation acts like a dictionary that can hold arbitrary keys, there are some standard built-in names for TOD quantities that are used by the Operator classes. You can also create other custom types of data. To see the built-in names, you can do:
print(obs.keynames)
These underlying names can be overridden at construction time if you like.
Detector data has some unique properties that we often want to leverage in our analyses. Each process has some detectors and some time slice of the observation. In the case of a single process like this example, all the data is local. Before using data we need to create it within the empty Observation. Here we create the default SIGNAL data:
obs.create_signal()
print(obs.signal())
This special DetectorData class is a table that can be indexed either by name or by index. You can set and get values as usual:
sig = obs.signal()
sig["det_00_A"] = np.arange(samples, dtype=np.float64)
sig[1] = 10.0 * np.arange(samples, dtype=np.float64)
print(obs.signal())
print(sig[:])
print(sig["det_00_A", "det_00_B"])
print(sig[1][1:5])
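The indexing behavior above can be sketched with a small pure-NumPy stand-in. Note that the class `DetTable` below is invented purely for illustration; it is not the real DetectorData implementation:

```python
import numpy as np

class DetTable:
    """Toy stand-in for DetectorData: rows indexable by detector name,
    integer index, tuple of names, or slice (illustration only)."""

    def __init__(self, names, samples, dtype=np.float64):
        self.names = list(names)
        self._data = np.zeros((len(self.names), samples), dtype=dtype)

    def _row(self, key):
        # Map a detector name to its row index; pass other keys through.
        return self.names.index(key) if isinstance(key, str) else key

    def __getitem__(self, key):
        if isinstance(key, tuple) and all(isinstance(k, str) for k in key):
            # Multiple detector names select multiple rows at once.
            return self._data[[self.names.index(k) for k in key]]
        return self._data[self._row(key)]

    def __setitem__(self, key, value):
        self._data[self._row(key)] = value

sig = DetTable(["det_00_A", "det_00_B"], samples=10)
sig["det_00_A"] = np.arange(10, dtype=np.float64)
sig[1] = 10.0 * np.arange(10, dtype=np.float64)
```

With this sketch, `sig[:]` returns the full `(n_detectors, n_samples)` array and `sig["det_00_A", "det_00_B"]` stacks the named rows, matching the access patterns shown above.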
This showed the creation of the default "SIGNAL" detector data, but you can create other types of data. For example, let's say you wanted to create some detector pointing matrix values consisting of a 64-bit integer pixel number and three 32-bit floats for the I/Q/U weights:
obs.create_detector_data("pixels", shape=(samples,), dtype=np.int64)
obs.create_detector_data("weights", shape=(samples, 3), dtype=np.float32)
print(obs["pixels"])
print(obs["weights"])
weights = obs["weights"]
for d in obs.detectors:
    for s in range(samples):
        weights[d][s] = [1.0, 0.5, 0.5]
print(obs["weights"])
There are some types of timestreams which all detectors have in common within the observation. These include things like telescope pointing, timestamps, and other quantities. We want all processes to have access to these quantities. However, this type of data is usually stored once and then read many times. We use shared memory on the node to store this data to avoid duplicating it for every process. For this simple serial example, the details are not important. The main thing is to use a special method when creating these buffers in the observation. For example:
obs.create_times()
obs.create_boresight_radec()
obs.create_common_flags()
# This accesses the timestamps regardless of the underlying
# dictionary key and checks that the underlying buffer has
# the right dimensions.
print(obs.times())
times = obs.times()
print(times[0:5])
The create_*() methods create these shared memory objects of the correct default dimensions and type. You can also create completely custom timestream data (see advanced topics below).
After creating the shared buffer, we used the observation method times() to return the timestamps. There are similar methods for all the "standard" observation data products (boresight_radec(), signal(), etc). The benefit to using these methods instead of accessing the internal dictionary key directly is that there are checks on the shapes of the underlying objects to ensure consistency. Also, an operator does not have to know the name of the underlying dictionary key, which might be different between experiments.
These shared data objects have a set() method used to write to them. This is more important when using MPI. In the serial case, you can just do:
times.set(np.arange(samples, dtype=np.float64))
print(times[:])
The Observation instances discussed previously are usually stored as a list inside a top-level container class called Data. This class also stores the TOAST MPI communicator information. For this serial example you can just instantiate an empty Data class and add things to the observation list:
data = toast.Data()
print(data)
print(data.obs)
Obviously this Data object has no observations yet. We'll fix that in the next section!
The Operator class defines the interfaces for operators working on data. Each operator has methods that describe what valid options it takes and observation dictionary keys it uses for input and output. An operator has an exec() method that works with Data objects. We will start by looking at the SimSatellite operator to simulate fake telescope scan strategies for a generic satellite. We can always see the options and default values by using the class method called help():
SimSatellite.help()
These options above are passed as a dictionary into the constructor of the class. The dictionary can be created manually or loaded from a file (see config file section below). For now, we can start by getting a dictionary of default options:
simsat_defaults = SimSatellite.defaults()
print(simsat_defaults)
This special class, ObjectConfig, is basically just a dictionary where every key has a corresponding help string. We can set values in this while leaving the help string alone. The help string is written out when dumping to config files. The only value we are going to modify is the Telescope instance that we need to pass in. For now, we'll generate one using the helper functions from the toast unit tests:
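The idea behind ObjectConfig can be sketched in a few lines of plain Python. The class `HelpDict` here is hypothetical, written only to illustrate the "value plus help string" behavior described above:

```python
class HelpDict(dict):
    """Toy illustration of a config mapping where every key carries a help
    string alongside its value (not the real ObjectConfig class)."""

    def __init__(self):
        super().__init__()
        self.help = {}

    def set(self, key, value, help_text=None):
        self[key] = value
        if help_text is not None:
            self.help[key] = help_text

cfg = HelpDict()
cfg.set("observation_time", 24.0, "Length of an observation in hours")
cfg.set("observation_time", 48.0)  # value changes; help string is untouched
```

When such a config is dumped to a file, the stored help strings can be written out as comments next to each value.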
# We should make this more general and put it in the main package...
from toast.tests._helpers import boresight_focalplane
fp = boresight_focalplane(4, future=True)
tele = Telescope("fake", focalplane=fp)
Ok, now we can add this to our config and create our operator:
simsat_defaults["telescope"] = tele
simsat = SimSatellite(simsat_defaults)
print(simsat)
And now we have an Operator that is ready to use. This particular operator creates observations from scratch with telescope properties generated and stored. We can create an empty Data object and then run this operator on it:
data = toast.Data()
simsat.exec(data)
print(data)
data.info()
print("There are {} observations".format(len(data.obs)))
print(data.obs[0])
for ob in data.obs:
    print(ob.times()[:5])
    print(ob.boresight_radec()[:5])
So we see that our SimSatellite operator has created just one observation of 360 samples in the Data object. We can feed this tiny dataset to further operators to simulate signals or process the data. Let's now simulate some noise timestreams for our detectors. First we need to create a "noise model" for our detectors. We can bootstrap this process by making a noise model from the nominal detector properties in the focalplane:
DefaultNoiseModel.help()
noise_model_config = DefaultNoiseModel.defaults()
print(noise_model_config)
noise_model = DefaultNoiseModel(noise_model_config)
noise_model.exec(data)
Now we are ready to use the SimNoise operator to simulate some timestreams:
SimNoise.help()
In this case, we can just use all the default options. The default noise model was created by the SimSatellite operator and if we don't specify the out key this operator just writes to the default detector data ("SIGNAL").
noise_config = SimNoise.defaults()
print(noise_config)
# Create the operator
sim_noise = SimNoise(noise_config)
# Run it on the data
sim_noise.exec(data)
data.info()
Notice that the observation now has some signal. Let's look at that:
# print(data.obs[0].signal())
# Just look at few samples from one detector in the first observation
print(data.obs[0].signal()["d03"][:5])
There is a special Operator class called Pipeline which serves as a way to group other operators together and run them in sequence (possibly running them on only a few detectors at a time). The default is to run the list of operators on the full Data object in one shot. The Pipeline class has the usual way of getting the defaults:
Pipeline.help()
We'll see more about this Operator below.
We saw above how operators are constructed with a dictionary of parameters. You can do everything by passing parameters when constructing operators. Configuration files are completely optional, but they do allow easy sharing of complicated pipeline setups.
These parameters can be loaded from one or more files and used to automatically construct operators for use. When doing this, each instance of an operator is given a "name" that can be used to reference it later. This way you can have multiple operators of the same class doing different things within your pipeline. If you have a script where you know which operators you are going to be using, you can get the defaults for the whole list at once:
ops = {
    "sim_satellite": SimSatellite,
    "noise_model": DefaultNoiseModel,
    "sim_noise": SimNoise,
}
conf = default_config(operators=ops)
print(conf)
We can dump this to a file and look at it:
tmpdir = tempfile.mkdtemp()
conf_file = os.path.join(tmpdir, "test.toml")
dump_config(conf_file, conf)
!cat {conf_file}
... and we can also load it back in:
newconf = load_config(conf_file)
print(newconf)
What if we wanted to add a Pipeline to this configuration that references the other operators by name? We can do that using a special syntax which consists of @config: followed by a UNIX-style path to the object we are referencing. For example:
# Get the default Pipeline config
sim_pipe_config = Pipeline.defaults()
print(sim_pipe_config)
# Add references to the operators
sim_pipe_config["operators"] = [
    "@config:/operators/sim_satellite",
    "@config:/operators/noise_model",
    "@config:/operators/sim_noise",
]
# Add the pipeline config to the main config
newconf["operators"]["sim_pipe"] = sim_pipe_config
print(newconf)
Now we could dump this to a file for later use. What if we wanted to create operators from this configuration? We could loop through each key in the "operators" dictionary and instantiate each class with its config values, but there is a helper function that does this for us. Before using it, we need to add a Telescope to the config for the satellite simulation. Normally this would be done by some experiment-specific script that creates a more customized telescope / focalplane.
newconf["operators"]["sim_satellite"]["telescope"] = tele
Now instantiate all the operators in one go:
run = create(newconf)
print(run)
Although the result looks similar, look more closely. Our dictionary of configuration options is now actually a dictionary of instantiated classes. We can now run these operators directly:
data = toast.Data()
# Run the Pipeline operator, which in turn runs 2 other operators
run["operators"]["sim_pipe"].exec(data)
data.info()
# Print the first 5 samples of one detector in the first observation.
data.obs[0].signal()["d00"][:5]
The previous sections covered the Observation container and its interfaces, and how to create and run Operators on a Data object containing a list of observations. The new data model also has features that improve memory use and performance in larger runs.
Earlier we saw how the MPI shared memory objects created in an Observation are used to store things that are common to all detectors (boresight pointing, telescope velocity, etc). These quantities have defaults for the shape, dtype, and the communicator used. In the case of these common properties, the "grid column communicator" is used. This includes all processes in the observation that share a common slice of time. The result is that only one copy of these common data objects exist on each node, regardless of how many processes are running on the node.
However, we can create completely custom shared memory objects. Imagine that every single process needed some common telescope data to be able to work with its local signal. We could create a shared object on the group communicator used for the whole observation:
samples = 10
obs = Observation(telescope, name="2020-07-31_A", samples=samples)
# This is the same for every process, regardless of location in the process grid
obs.create_shared_data(
    "same_for_every_process",
    shape=(100, 100),
    dtype=np.float64,
    comm=obs.mpicomm,
)
In another example, suppose that we had some detector-specific quantities (beams, bandpasses, etc) shared by all processes with data from a given detector. We can store that in shared memory using the "grid row communicator" of the process grid, so that we only have one copy of those products per node:
# This is the same for every process along a row of the grid
# so these processes all have the same detectors.
obs.create_shared_data(
    "detector_aux_data",
    shape=(len(obs.local_detectors), 100),
    dtype=np.float64,
    comm=obs.grid_comm_row,
)
The use of MPI shared memory should greatly reduce our memory footprint when running many MPI processes per node.
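As a loose analogy (this is not TOAST code), Python's standard-library shared memory shows the same pattern: one physical buffer on the node that multiple handles map without copying:

```python
from multiprocessing import shared_memory
import numpy as np

# Illustration only: POSIX shared memory from the Python standard library
# serves the same purpose here as TOAST's MPI shared memory -- one physical
# copy of a buffer per node, mapped into every process on that node.
shm = shared_memory.SharedMemory(create=True, size=100 * 8)
buf = np.ndarray((100,), dtype=np.float64, buffer=shm.buf)
buf[:] = np.arange(100.0)

# A second handle attaching by name sees the same single copy of the data.
other = shared_memory.SharedMemory(name=shm.name)
view = np.ndarray((100,), dtype=np.float64, buffer=other.buf)
value = float(view[42])

# Clean up: close all handles, then unlink the segment.
other.close()
shm.close()
shm.unlink()
print(value)  # prints 42.0
```

In TOAST the same effect is achieved with MPI shared memory windows, so that only one copy of each common data object exists per node regardless of the number of processes.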
The Pipeline operator is used to chain other operators together and can internally feed data to those sub-operators in sets of detectors. This is a work in progress, but the workflow code that creates the Pipeline will be able to specify the sets of detectors to process at once, and these sets may differ between processes. The special strings "ALL" and "SINGLE" either work with all detectors in one shot (the current TOAST default) or loop over detectors individually, running all operators on each detector before moving on to the next.
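A pure-Python sketch of that batching logic (the function name and argument below are invented for illustration; the real Pipeline interface may differ):

```python
def detector_batches(detectors, detector_sets="ALL"):
    """Group detectors into batches for a pipeline to process in turn."""
    if detector_sets == "ALL":
        # Run all operators on every detector in one shot.
        return [list(detectors)]
    if detector_sets == "SINGLE":
        # Run all operators on one detector before moving to the next.
        return [[d] for d in detectors]
    # Otherwise, treat detector_sets as an explicit custom grouping.
    return [list(group) for group in detector_sets]

dets = ["d00", "d01", "d02"]
batches = detector_batches(dets, detector_sets="SINGLE")
```

Here `batches` becomes `[["d00"], ["d01"], ["d02"]]`, so the pipeline would run its full operator list three times, once per detector.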
The rest of this section covers planned features.
For each supported architecture, if all operators in a pipeline support that hardware then the pipeline can use observation methods to copy select data to the accelerator at the beginning and back from the accelerator at the end. Operators individually have methods that specify the observation keys they "require" and also the observation keys they "provide". This allows logic in the pipeline operator to determine which intermediate data products of the operators are only on the accelerator and do not need to be moved back to the host system. A Pipeline running on an accelerator will likely process only a few detectors at a time due to memory constraints.
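The "requires" / "provides" bookkeeping described above can be sketched as follows. Each operator is modeled here as a plain dict with "requires" and "provides" key sets; the actual TOAST interface for declaring these may differ:

```python
def plan_device_transfers(ops, host_needs):
    """Sketch: decide which observation keys must be staged to the
    accelerator and which must be copied back to the host afterward."""
    provided = set()
    copy_in = set()
    for op in ops:
        # Inputs not produced by an earlier operator in the pipeline
        # must be staged from the host to the accelerator.
        copy_in |= set(op["requires"]) - provided
        provided |= set(op["provides"])
    # Only products the host actually needs afterward come back; the
    # rest are intermediates that can stay on the accelerator.
    copy_back = provided & set(host_needs)
    return copy_in, copy_back

ops = [
    {"requires": {"boresight"}, "provides": {"pixels", "weights"}},
    {"requires": {"pixels", "weights", "signal"}, "provides": {"map"}},
]
copy_in, copy_back = plan_device_transfers(ops, host_needs={"map"})
```

In this hypothetical two-operator pipeline, "pixels" and "weights" are produced and consumed entirely on the accelerator, so only "boresight" and "signal" move in and only "map" moves back out.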
Observations already make use of MPI shared memory that is replicated across nodes. Each node will have some number of accelerators. We can assign each process to a particular accelerator and compute the minimal set of shared memory objects that need to be staged to each accelerator.
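One simple assignment strategy would be round-robin over node-local ranks. This is an assumption for illustration, not necessarily what TOAST will implement:

```python
def assign_accelerator(local_rank, n_accel):
    """Round-robin assignment of node-local process ranks to accelerators
    (a common strategy; shown here only as a sketch)."""
    return local_rank % n_accel

# With 8 processes per node and 4 accelerators, each device is shared
# by two processes.
owners = [assign_accelerator(r, 4) for r in range(8)]
```

Processes assigned to the same accelerator could then coordinate so that each shared memory object is staged to that device only once.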