How to use parallelism in Python

Since you are running at NERSC you may be interested in parallelizing your Python code and/or its I/O. This is a detailed topic but we will provide a short overview of several options.

If you intend to run your code at scale, please see our discussion here, which gives a brief overview of filesystem best practices for scaling up.

Multiprocessing

Python's standard library provides a multiprocessing package that supports spawning processes. Multiprocessing can be used to achieve some level of parallelism within a single compute node, but it cannot be used to achieve parallelism across compute nodes. For more information, please see the official Python multiprocessing docs.
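
As a minimal sketch of this single-node pattern, a pool of worker processes can map a function over a list of inputs. The function names square and parallel_squares and the pool size of 4 are illustrative assumptions, not NERSC recommendations:

```python
import multiprocessing


def square(x):
    """Work done independently for each input item."""
    return x * x


def parallel_squares(values, processes=4):
    """Map square() over values using worker processes on one node."""
    with multiprocessing.Pool(processes=processes) as pool:
        return pool.map(square, values)


if __name__ == "__main__":
    # The guard keeps worker processes from re-running this block when
    # they import the module (required by the spawn start method).
    print(parallel_squares(range(8)))
```

All workers run on the node where the script was started, which is why this approach does not extend across compute nodes.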

If your multiprocessing code calls a threaded library, such as NumPy with threaded MKL support, you need to consider thread oversubscription. While process affinity can be controlled to some degree in certain contexts (e.g. Python distributions that implement os.sched_{get,set}affinity), it is generally easier to reduce the number of threads used by each process. In most cases we advise limiting each process to a single thread. For OpenMP in particular:

export OMP_NUM_THREADS=1

Furthermore, when using Python multiprocessing on KNL you are advised to set:

export KMP_AFFINITY=disabled
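
The same thread limit can also be applied from inside a Python script. The sketch below is an illustration under stated assumptions, not an official NERSC recipe: the helper available_cpus is hypothetical, and the approach relies on the OpenMP runtime reading OMP_NUM_THREADS when the threaded library is first loaded, so the variable is set before any such import.

```python
import os


def available_cpus():
    """Number of CPUs this process may run on (hypothetical helper)."""
    if hasattr(os, "sched_getaffinity"):  # not available on every platform
        return len(os.sched_getaffinity(0))
    return os.cpu_count() or 1


# Assumption: this runs before NumPy or any other threaded library is imported.
os.environ["OMP_NUM_THREADS"] = "1"

# One single-threaded worker process per CPU the job is allowed to use.
n_workers = available_cpus()
print(f"{n_workers} worker processes, "
      f"OMP_NUM_THREADS={os.environ['OMP_NUM_THREADS']}")
```

The value of n_workers could then be passed as the process count when creating a multiprocessing pool.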

mpi4py

mpi4py provides bindings of the MPI standard for the Python programming language. Documentation on mpi4py is available here.

Here is an example of how to use mpi4py on Cori:

#!/usr/bin/env python
from mpi4py import MPI
mpi_rank = MPI.COMM_WORLD.Get_rank()
mpi_size = MPI.COMM_WORLD.Get_size()
print(mpi_rank, mpi_size)

This program will initialize MPI, find each MPI task's rank in the global communicator, find the total number of ranks in the global communicator, print out these two results, and exit. Finalizing MPI with mpi4py is not necessary; it happens automatically when the program exits.

Suppose we put this program into a file called "mympi.py". To run it on the Haswell nodes on Cori, we could create the following batch script, which we call "myjob.sh", in the same directory as our Python script:

#!/bin/bash
#SBATCH --constraint=haswell
#SBATCH --nodes=3
#SBATCH --time=5

module load python
srun -n 96 -c 2 python mympi.py

To run "mympi.py" in batch on Cori, we submit the batch script from the command line using sbatch, and wait for it to run:

$ sbatch myjob.sh
Submitted batch job 987654321

After the job finishes, the output will be found in the file "slurm-987654321.out":

$ cat slurm-987654321.out
...
91 96
44 96
31 96
...
0 96
...
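
Beyond printing ranks, a typical next step is to divide work among them. The following is only a sketch: the helper local_range, the problem size of 1000, and the serial fallback used when mpi4py cannot be imported are illustrative assumptions. Each rank sums its own share of the integers, and rank 0 collects the total with a reduction:

```python
try:
    from mpi4py import MPI
except ImportError:
    MPI = None  # assumption: fall back to serial so the helper can be tried anywhere


def local_range(n_items, size, rank):
    """Return the half-open [start, stop) slice of n_items owned by rank."""
    base, extra = divmod(n_items, size)
    # The first `extra` ranks each take one additional item.
    start = rank * base + min(rank, extra)
    stop = start + base + (1 if rank < extra else 0)
    return start, stop


if __name__ == "__main__":
    n_items = 1000
    if MPI is None:
        print(sum(range(*local_range(n_items, 1, 0))))
    else:
        comm = MPI.COMM_WORLD
        start, stop = local_range(n_items, comm.Get_size(), comm.Get_rank())
        # Combine the per-rank partial sums on rank 0.
        total = comm.reduce(sum(range(start, stop)), op=MPI.SUM, root=0)
        if comm.Get_rank() == 0:
            print(total)
```

It could be launched with the same srun line as above, replacing mympi.py with whatever name you save this sketch under.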

mpi4py in your custom conda environment

Do NOT conda/pip install mpi4py at NERSC

You can install mpi4py using these tools without any warnings, but your mpi4py programs just won't work at NERSC. To use Cori's Cray MPICH, you'll need to build mpi4py yourself using the Cray compiler wrappers, which link in the Cray MPICH libraries.

We offer two separate options to use mpi4py in a conda environment at NERSC:

  • build mpi4py in your custom conda environment:

    module load python
    conda create -n my_mpi4py_env python=3.8
    source activate my_mpi4py_env
    module swap PrgEnv-${PE_ENV,,} PrgEnv-gnu
    MPICC="cc -shared" pip install --no-binary=mpi4py mpi4py
    
  • use the lazy-mpi4py conda environment that we provide, built against PrgEnv-gnu, which you can clone if you want to add more packages:

    module load python
    conda create --name my_mpi4py_env --clone lazy-mpi4py
    

Using mpi4py in a Shifter container

When a large number of Python tasks are simultaneously launched with mpi4py, the result is many tasks trying to open the same files at the same time, causing filesystem contention and performance degradation. mpi4py applications running at the scale of a few hundred or a thousand tasks may take an unacceptable amount of time simply starting up.

Using mpi4py in a Shifter container is our recommended solution to this problem. We provide Python in Shifter examples and a specific mpi4py in Shifter example.

Dask

Dask is a task-based system in which a scheduler assigns work to workers. It is robust to failure and provides a nice bokeh-based application dashboard. It can be used to scale to multinode CPU and GPU systems. You can find more information about using Dask at NERSC here.

Parallel I/O with h5py

You can use h5py for either serial or parallel I/O.

For more general information about HDF5 at NERSC please see this page.

If you would like to use h5py for parallel I/O, you have two choices:

  • build h5py against mpi4py in your custom conda environment;
  • use the lazy-h5py conda environment that we provide, or clone it if you want to add other packages.

Both options are based on packages built against the GNU compiler and Cray MPICH, so if you want a build with a different compiler or MPI library, you'll have to recompile both mpi4py and h5py from scratch.

Pre-built h5py conda environment

To use the conda environment that we provide:

module load python
source activate lazy-h5py

You are now good to go; you can test the environment with the smoketest below.

This conda environment is read-only, so you cannot install other packages into it, but you can clone it and work in the clone:

module load python
conda create --name h5pyenv --clone lazy-h5py
source activate h5pyenv

conda install ...

Building h5py from source

If you prefer instead to build an h5py that is capable of running in parallel at NERSC, you can follow the directions from the official documentation.

You will first need a conda environment with mpi4py built from source for the NERSC environment, as shown in our directions above, or you can clone our lazy-mpi4py conda environment where we have already built mpi4py against PrgEnv-gnu:

module load python
conda create -n h5pyenv --clone lazy-mpi4py

Activate the environment:

source activate h5pyenv

Load and configure your modules:

module load cray-hdf5-parallel
module swap PrgEnv-${PE_ENV,,} PrgEnv-gnu

Install an MKL-optimized numpy from the defaults conda channel; we override the channel selection to avoid installing a less-optimized numpy, e.g. from conda-forge (see this note):

conda install -c defaults --override-channels numpy

You can also find some tips for performance optimization here if you are running MKL-optimized code on AMD CPUs, such as those in Perlmutter.

Finally, use pip to build the parallel h5py from source:

HDF5_MPI=ON CC=cc pip install --no-deps --no-binary=h5py h5py

Smoketest for h5py

To test a parallel build of h5py we need to use the compute nodes, since mpi4py doesn't work on login nodes. Let's get 2 interactive compute nodes with 2 tasks each, and activate the h5py environment we just cloned or built:

salloc -N 2 --ntasks-per-node 2 -t 10 -C knl -q interactive
module load python
source activate h5pyenv

We'll use this test program, described in the h5py docs, and save it as test_h5py.py:

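from mpi4py import MPI
import h5py

rank = MPI.COMM_WORLD.rank  # The process ID (integer 0-3 for a 4-process job)

# Open the file collectively; every rank passes the same communicator
with h5py.File('test.h5', 'w', driver='mpio', comm=MPI.COMM_WORLD) as f:
    # Dataset creation is collective: all ranks make the same call
    dset = f.create_dataset('test', (4,), dtype='i')
    # Each rank writes only its own element
    dset[rank] = rank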

We can run this test with the 4 MPI ranks requested:

srun python test_h5py.py

If we now inspect the file we wrote by running h5dump test.h5, the output should look like this:

HDF5 "test.h5" {
GROUP "/" {
   DATASET "test" {
      DATATYPE  H5T_STD_I32LE
      DATASPACE  SIMPLE { ( 4 ) / ( 4 ) }
      DATA {
      (0): 0, 1, 2, 3
      }
   }
}
}

Great! Each of our 4 MPI ranks wrote its part of the HDF5 file.
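
The same check can be done from Python instead of h5dump. This sketch assumes test.h5 is in the current directory and was written by the smoketest above; the helper ranks_written_in_order is hypothetical. h5py is imported only when the file exists, and the file is opened serially for reading:

```python
import os


def ranks_written_in_order(values):
    """True if element i holds the value i, as the smoketest should produce."""
    return list(values) == list(range(len(values)))


if __name__ == "__main__":
    if os.path.exists("test.h5"):
        import h5py  # imported lazily; only needed when there is a file to read

        # Read the whole 'test' dataset into a plain Python list.
        with h5py.File("test.h5", "r") as f:
            values = f["test"][...].tolist()
        print(values, ranks_written_in_order(values))
```

Since this environment's h5py is built against mpi4py, it is safest to run the check on a compute node as well.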