How to use parallelism in Python¶
Since you are running at NERSC you may be interested in parallelizing your Python code and/or its I/O. This is a detailed topic but we will provide a short overview of several options.
If you intend to to run your code at scale, please see our discussion here that provides a brief overview of best filesystem practices for scaling up.
Multiprocessing¶
Python's standard library provides a multiprocessing package that supports spawning of processes. Multiprocessing be used to achieve some level of parallelism within a single compute node. It cannot be used to achieve parallelism across compute nodes. For more information, please see the official Python multiprocessing docs.
If your multiprocessing code makes calls to a threaded library like NumPy with threaded MKL support then you need to consider oversubscription of threads. While process affinity can be controlled to some degrees in certain contexts (e.g. Python distributions that implement os.sched_{get,set}affinity)
it is generally easier to reduce the number of threads used by each process. Actually it is most advisable to set it to a single thread. In particular for OpenMP:
export OMP_NUM_THREADS=1
Furthermore, when using Python multiprocessing on KNL you are advised to set:
export KMP_AFFINITY=disabled
mpi4py¶
mpi4py provides MPI standard bindings to the Python programming language. Documentation on mpi4py is available here.
Here is an example of how to use mpi4py on Cori:
#!/usr/bin/env python
from mpi4py import MPI
mpi_rank = MPI.COMM_WORLD.Get_rank()
mpi_size = MPI.COMM_WORLD.Get_size()
print(mpi_rank, mpi_size)
This program will initialize MPI, find each MPI task's rank in the global communicator, find the total number of ranks in the global communicator, print out these two results, and exit. Finalizing MPI with mpi4py is not necessary; it happens automatically when the program exits.
Suppose we put this program into a file called "mympi.py." To run it on the Haswell nodes on Cori, we could create the following batch script in the same directory as our Python script, that we call "myjob.sh:"
#!/bin/bash
#SBATCH --constraint=haswell
#SBATCH --nodes=3
#SBATCH --time=5
module load python
srun -n 96 -c 2 python mympi.py
To run "mympi.py" in batch on Cori, we submit the batch script from the command line using sbatch, and wait for it to run:
$ sbatch myjob.sh
Submitted batch job 987654321
After the job finishes, the output will be found in the file "slurm-987654321.out:"
$ cat slurm-987654321.out
...
91 96
44 96
31 96
...
0 96
...
mpi4py
in your custom conda environment¶
Do NOT conda/pip install mpi4py
at NERSC
You can install mpi4py using these tools without any warnings, but your mpi4py programs just won't work at NERSC. To use Cori's MPICH MPI, you'll need to build it yourself using the Cray compiler wrappers that link in Cray MPICH libraries.
We offer two separate options to use mpi4py in a conda environment at NERSC:
-
build mpi4py in your custom conda environment:
module load python conda create -n my_mpi4py_env python=3.8 source activate my_mpi4py_env module swap PrgEnv-${PE_ENV,,} PrgEnv-gnu MPICC="cc -shared" pip install --no-binary=mpi4py mpi4py
-
use the
lazy-mpi4py
conda environment that we provide, built againstPrgEnv-gnu
, which you can clone if you want to add more packages:module load python conda create --name my_mpi4py_env --clone lazy-mpi4py
Using mpi4py
in a Shifter container¶
When a large number of Python tasks are simultaneously launched with mpi4py, the result is many tasks trying to open the same files at the same time, causing filesystem contention and performance degradation. mpi4py applications running at the scale of a few hundred or a thousand tasks may take an unacceptable amount of time simply starting up.
Using mpi4py in a Shifter container is our recommended solution to this problem. We provide Python in Shifter examples and a specific mpi4py in Shifter example.
Dask¶
Dask is a task-based system in which a scheduler assigns work to workers. It is robust to failure and provides a nice bokeh-based application dashboard. It can be used to scale to multinode CPU and GPU systems. You can find more information about using Dask at NERSC here.
Parallel I/O with h5py¶
You can use h5py for either serial or parallel I/O.
For more general information about HDF5 at NERSC please see this page.
If you would like to use h5py for parallel I/O, you have two choices:
- build h5py against mpi4py in your custom conda environment;
- use the
lazy-h5py
conda environment that we provide, or clone it if you want to add other packages.
Both options are based on packages built against the GNU compiler and Cray MPICH, so if you want a build with a different compiler or MPI library, you'll have to recompile both mpi4py and h5py from scratch.
Pre-built h5py conda environment¶
To use the conda environment that we provide:
module load python
source activate lazy-h5py
And you are good to go; you can test it with a smoketest, see below.
You cannot install other packages in that conda environment, though, since it's read-only, but you can clone it and work on it:
module load python
conda create --name h5pyenv --clone lazy-h5py
source activate h5pyenv
conda install ...
Building h5py from source¶
If you instead prefer to build a h5py that is capable of running in parallel at NERSC we can follow the directions from the official documentation.
You will first need a conda environment with mpi4py built from source for the NERSC environment, as shown in our directions above, or you can clone our lazy-mpi4py
conda environment where we have already built mpi4py against PrgEnv-gnu
:
module load python
conda create -n h5pyenv --clone lazy-mpi4py
Activate the environment:
source activate h5pyenv
Load and configure your modules:
module load cray-hdf5-parallel
module swap PrgEnv-${PE_ENV,,} PrgEnv-gnu
Install a MKL-optimized numpy from the defaults
conda channel; we override the channel selection to avoid installing a less-optimized numpy e.g. from conda-forge
(see this note):
conda install -c defaults --override-channels numpy
You can also find some tips for performance optimization here if you are running MKL-optimized code on AMD CPUs, such as Perlmutter.
And finally use pip
to build the parallel h5py from source:
HDF5_MPI=ON CC=cc pip install --no-deps --no-binary=h5py h5py
Smoketest for h5py¶
To test a parallel build of h5py we need to use the compute nodes, since mpi4py doesn't work on login nodes. Let's get 2 interactive compute nodes, with 2 processors each and activate the h5py environment we just cloned or built:
salloc -N 2 --ntasks-per-node 2 -t 10 -C knl -q interactive
module load python
source activate h5pyenv
We'll use this test program described in the h5py docs:
from mpi4py import MPI
import h5py
rank = MPI.COMM_WORLD.rank # The process ID (integer 0-3 for a 4-process job)
with h5py.File('test.h5', 'w', driver='mpio', comm=MPI.COMM_WORLD) as f:
dset = f.create_dataset('test', (4,), dtype='i')
dset[rank] = rank
We can run this test with the 4 mpi ranks requested:
srun python test_h5py.py
If we now look at the file we wrote with h5dump test.h5
it should look like this:
HDF5 "test.h5" {
GROUP "/" {
DATASET "test" {
DATATYPE H5T_STD_I32LE
DATASPACE SIMPLE { ( 4 ) / ( 4 ) }
DATA {
(0): 0, 1, 2, 3
}
}
}
}
Great! Our 4 mpi ranks each wrote part of the HDF5 file.