13. High Performance Computing (part II: Message Passing Interface - MPI)#

  • What is MPI and how it works

  • MPI program structure

  • Environment management routines

  • How to compile and run MPI program

  • MPI point-to-point communications: blocking and non-blocking send/receive routines

  • MPI collective communication routines: MPI_Reduce example

  • MPI lab exercises


13.1. Message Passing Interface (MPI)#

  • Processes run on network distributed hosts and communicate by exchanging messages.

  • A community of communicating processes is called a Communicator. MPI_COMM_WORLD is the default.

  • Inside the communicator, each process is assigned its rank.

  • Each process runs the same code (SPMD model)

_images/mpi_comm_world.gif

13.2. How an MPI Run Works#

  • Every process gets a copy of the same executable: Single Program, Multiple Data (SPMD).

  • They all start executing it.

  • Each process works completely independently of the other processes, except when communicating.

  • Each looks at its own rank to determine which part of the problem to work on.


13.3. MPI is SPMD#

To differentiate the roles of the processes running the same MPI code, you use if statements on the rank number. Often, the rank 0 process is referred to as the master and the others as workers.

if (my_rank == 0) {
       //do the master tasks
}

// similarly,

if (my_rank != 0) {
       //do the worker tasks
}

13.4. General MPI program structure#

  • MPI include header file: #include "mpi.h"

  • Initialize the MPI environment

  • Main code and message passing calls

  • Terminate the MPI environment


13.5. Major Environment Management Routines#

MPI_Init (&argc, &argv)

MPI_Comm_size (comm, &size)

MPI_Comm_rank (comm, &rank)

MPI_Abort (comm, errorcode)

MPI_Get_processor_name (&name, &resultlength)

MPI_Finalize ()
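
All of these routines except MPI_Abort appear in the code template in the next section. As a minimal sketch (not one of the lesson's examples), MPI_Abort is commonly used to terminate every task in the communicator when MPI_Init reports a failure:

#include "mpi.h"
#include <stdio.h>

int main(int argc, char *argv[]) {
   // check whether the MPI environment started correctly;
   // if not, terminate all tasks in MPI_COMM_WORLD
   int rc = MPI_Init(&argc, &argv);
   if (rc != MPI_SUCCESS) {
      printf("Error starting MPI program, terminating\n");
      MPI_Abort(MPI_COMM_WORLD, rc);
   }

   // ... normal work would go here ...

   MPI_Finalize();
   return 0;
}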


13.6. MPI code template without send/receive.#

Each process initializes the MPI environment, then prints out the number of processes, its rank, and the hostname

#include "mpi.h"
#include <stdio.h>

int main(int argc, char *argv[]) {
   int  numtasks, rank, len, rc; 
   char hostname[MPI_MAX_PROCESSOR_NAME];

          // initialize MPI  
   MPI_Init(&argc, &argv);
         
        // get MPI process rank, size, and processor name
   MPI_Comm_size(MPI_COMM_WORLD, &numtasks);
   MPI_Comm_rank(MPI_COMM_WORLD, &rank);
   MPI_Get_processor_name(hostname, &len); 
   printf ("Number of tasks= %d My rank= %d Running on %s\n", numtasks, rank, hostname);
        
        // Here we can do some computing and message passing ….  
        
        // done with MPI  
   MPI_Finalize(); 
}

13.7. MPI compilation and run examples#

To compile the code with the MPI compiler wrapper, mpicc:

mpicc  -o hello.x  hello.c

To run the executable with 8 tasks on the same node:

mpirun  -n 8   hello.x

To run the executable across 8 CPUs via SLURM on a cluster:

srun  -n 8   hello.x

To run the executable across 2 nodes (8 CPUs total) without a queue system:

mpirun  -n 8  -hostfile  nodes.txt  hello.x

The hostfile, nodes.txt, defines CPU allocation, for example:

nodes.txt

node01  slots=4
node02  slots=4

13.8. MPI point-to-point communications#

In two communicating MPI processes, one task is performing a send operation and the other task is performing a matching receive operation.

Blocking send/receive calls:

MPI_Send(&buffer, count, type, dest, tag, comm)
MPI_Recv(&buffer, count, type, source, tag, comm, status)

in MPI_Send

buffer – the data elements (variables) to send
count – the number of elements
type – the MPI data type
dest – the rank of the receiving process
tag – an integer number, a unique message id
comm – the communicator

in MPI_Recv

source – the rank of the sending process
status – information about the received message (source, tag)
(buffer, count, type, tag, and comm have the same meaning as in MPI_Send)

13.9. MPI basic data types#

MPI C/C++ data type       C/C++ data type
MPI_INT                   int
MPI_LONG                  long int
MPI_LONG_LONG             long long int
MPI_CHAR                  char
MPI_UNSIGNED_CHAR         unsigned char
MPI_UNSIGNED              unsigned int
MPI_UNSIGNED_LONG         unsigned long int
MPI_UNSIGNED_LONG_LONG    unsigned long long int
MPI_FLOAT                 float
MPI_DOUBLE                double
MPI_LONG_DOUBLE           long double

There are other data types. MPI also allows user defined data types.
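
As a minimal sketch (not one of the lesson's examples), the count and data type in a send/receive call describe the elements in the C buffer; here rank 0 sends an array of 5 doubles, so the count is 5 and the type is MPI_DOUBLE:

#include "mpi.h"
#include <stdio.h>

int main(int argc, char *argv[]) {
   int rank, i, tag = 1;
   double data[5];
   MPI_Status stat;

   MPI_Init(&argc, &argv);
   MPI_Comm_rank(MPI_COMM_WORLD, &rank);

   if (rank == 0) {
      for (i = 0; i < 5; i++) data[i] = 0.5 * i;               // fill the send buffer
      MPI_Send(data, 5, MPI_DOUBLE, 1, tag, MPI_COMM_WORLD);   // 5 elements of type MPI_DOUBLE
   } else if (rank == 1) {
      MPI_Recv(data, 5, MPI_DOUBLE, 0, tag, MPI_COMM_WORLD, &stat);
      printf("Rank 1 received %g %g %g %g %g\n", data[0], data[1], data[2], data[3], data[4]);
   }

   MPI_Finalize();
   return 0;
}

As with the other two-task examples, it would be run with mpirun -n 2.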


13.10. Blocking send/receive example#

Two processes with ranks 0 and 1 initialize variables and the MPI environment; then the process of rank 0 sends a message to the rank 1 process.

if (rank == 0) {
   //send a message to proc 1
   dest = 1;
   MPI_Send(&outmsg, 9, MPI_CHAR, dest, tag, MPI_COMM_WORLD);
}

if (rank == 1) {
   //receive a message from proc 0
   source = 0;
   MPI_Recv(&inmsg, 9, MPI_CHAR, source, tag, MPI_COMM_WORLD, &Stat);
}

13.11. Blocking MPI send/receive code example.#

Each process initializes the MPI environment; then the process with rank 0 sends “LCI_2019” to rank 1.

#include "mpi.h"
#include <stdio.h>
int main(int argc, char *argv[]) {
int numRanks, rank, dest, source, rc, count, tag=222;
char inmsg[9], outmsg[9]="LCI_2019";
MPI_Status Stat;
MPI_Init(&argc,&argv);
MPI_Comm_size(MPI_COMM_WORLD, &numRanks);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);

if (rank == 0) {
  dest = 1;
  MPI_Send(&outmsg, 9, MPI_CHAR, dest, tag, MPI_COMM_WORLD);
  }
else if (rank == 1) {
  source = 0;
  MPI_Recv(&inmsg, 9, MPI_CHAR, source, tag, MPI_COMM_WORLD, &Stat);
  printf("Rank %d Received %s from Rank %d \n", rank, inmsg, source);
}
MPI_Finalize();

13.12. Compiling and running the MPI blocking send/receive code#

mpicc  -o  b_send_receive.x  b_send_receive.c
mpirun -n 2  b_send_receive.x

output:

Rank 1 Received LCI_2019 from Rank 0 
Task 1: Received 9 char(s) from task 0 with tag 222 

13.13. MPI send/receive buffering#

The MPI implementation (not the MPI standard) decides what happens to data between send/receive. Typically, a system buffer area is reserved to hold data in transit.

_images/MPI_send_receive.png

13.14. The blocking send/receive work flow#

MPI_Send returns after it is safe to modify the send buffer.

MPI_Recv returns after the data has arrived in the receive buffer.

_images/MPI_blocking_send_receive.png

13.15. MPI non-blocking send/receive#

The process of rank 0 sends a message to the rank 1 process via MPI_Isend, then continues running.

The rank 1 process may retrieve the data from the buffer later via MPI_Irecv.

The send buffer must not be updated and the receive buffer must not be read until the processes make a synchronization call, MPI_Wait.

if (rank == 0) {
   //send a message to proc 1
   dest = 1;
   MPI_Isend(&outmsg, 9, MPI_CHAR, dest, tag, MPI_COMM_WORLD, &request);
}

if (rank == 1) {
   //receive a message from proc 0
   source = 0;
   MPI_Irecv(&inmsg, 9, MPI_CHAR, source, tag, MPI_COMM_WORLD, &request);
}

MPI_Wait(&request, &Stat);

13.16. The non-blocking send/receive work flow#

MPI_Isend returns right away, and the rank 0 process continues running. The rank 1 process may call MPI_Irecv later. The send buffer shouldn’t be updated and the receive buffer shouldn’t be read until MPI_Wait is called by both tasks.

_images/MPI_nonblocking_send_receive.png

13.17. Non-Blocking MPI send/receive code example.#

Each process initializes the MPI environment; then the process with rank 0 sends “LCI_2019” to rank 1.

#include "mpi.h"
#include <stdio.h>

int main(int argc, char *argv[]) {
int numRanks, rank, dest, source, rc, count, tag=222;
char inmsg[9], outmsg[9]="LCI_2019";
MPI_Status Stat;   MPI_Request request = MPI_REQUEST_NULL;

MPI_Init(&argc,&argv);
MPI_Comm_size(MPI_COMM_WORLD, &numRanks);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
if (rank == 0) {
  dest = 1;
  MPI_Isend(&outmsg, 9, MPI_CHAR, dest, tag, MPI_COMM_WORLD, &request);   }
else if (rank == 1) {
  source = 0;
  MPI_Irecv(&inmsg, 9, MPI_CHAR, source, tag, MPI_COMM_WORLD, &request);
          }
MPI_Wait(&request, &Stat);
if (rank == 1) { printf("Rank %d Received %s from Rank %d \n", rank, inmsg, source);  }
MPI_Finalize();     
}

13.18. Compiling and running the MPI non-blocking send/receive code#

mpicc  -o  nonb_send_receive.x   nonb_send_receive.c
mpirun -n 2  nonb_send_receive.x

output:

Rank 1 Received LCI_2019 from Rank 0

13.19. Send/Receive order and “fairness”#

  • Author: Blaise Barney, Livermore Computing.

  • Order:

    • MPI guarantees that messages will not overtake each other.

    • If a sender sends two messages (Message 1 and Message 2) in succession to the same destination, and both match the same receive, the receive operation will receive Message 1 before Message 2 (see the sketch after this list).

    • If a receiver posts two receives (Receive 1 and Receive 2), in succession, and both are looking for the same message, Receive 1 will receive the message before Receive 2.

    • Order rules do not apply if there are multiple threads of the same process participating in the communication operations.

  • Fairness:

    • MPI does not guarantee fairness - it’s up to the programmer to prevent “operation starvation”.

    • Example: task 0 sends a message to task 2. However, task 1 sends a competing message that matches task 2’s receive. Only one of the sends will complete.
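
A minimal sketch of the ordering rule above (not one of the lesson's examples): rank 0 sends two messages with the same tag, and rank 1's first receive always gets Message 1:

#include "mpi.h"
#include <stdio.h>

int main(int argc, char *argv[]) {
   int rank, tag = 7, msg1 = 1, msg2 = 2, first, second;
   MPI_Status stat;

   MPI_Init(&argc, &argv);
   MPI_Comm_rank(MPI_COMM_WORLD, &rank);

   if (rank == 0) {
      MPI_Send(&msg1, 1, MPI_INT, 1, tag, MPI_COMM_WORLD);   // Message 1
      MPI_Send(&msg2, 1, MPI_INT, 1, tag, MPI_COMM_WORLD);   // Message 2
   } else if (rank == 1) {
      // both receives match either message, but MPI guarantees
      // that Message 1 is delivered to the first posted receive
      MPI_Recv(&first,  1, MPI_INT, 0, tag, MPI_COMM_WORLD, &stat);
      MPI_Recv(&second, 1, MPI_INT, 0, tag, MPI_COMM_WORLD, &stat);
      printf("Received %d then %d\n", first, second);        // always prints "1 then 2"
   }

   MPI_Finalize();
   return 0;
}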


13.20. MPI collective communications#

  • So far, we have reviewed the basic point-to-point MPI calls, involving pairs of processes.

  • For efficient programming, there are collective communication functions that involve all processes in a communicator (for example, MPI_COMM_WORLD).

  • Some frequently used collective communication routines (a short sketch using two of them follows this list):

    • MPI_Barrier – blocks execution until all the tasks reach it

    • MPI_Bcast – broadcasts a message from one task to all other tasks

    • MPI_Scatter – distributes distinct chunks of data from one task to all tasks

    • MPI_Gather – collects data from all tasks into one task

    • MPI_Allgather – gathers data from all tasks and distributes the combined result to all tasks

    • MPI_Reduce – applies a reduction operation across all tasks and places the result in one task
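
A minimal sketch using two of these routines (not one of the lesson's examples): rank 0 broadcasts a value with MPI_Bcast, each task derives its own result from it, and MPI_Gather collects the results back on rank 0:

#include "mpi.h"
#include <stdio.h>
#include <stdlib.h>

int main(int argc, char *argv[]) {
   int rank, size, i, base = 0, mine;
   int *all = NULL;

   MPI_Init(&argc, &argv);
   MPI_Comm_rank(MPI_COMM_WORLD, &rank);
   MPI_Comm_size(MPI_COMM_WORLD, &size);

   if (rank == 0) base = 100;                        // only the root has the value initially
   MPI_Bcast(&base, 1, MPI_INT, 0, MPI_COMM_WORLD);  // now every task has base = 100

   mine = base + rank;                               // each task computes its own result

   if (rank == 0) all = malloc(size * sizeof(int));  // receive buffer is needed on the root only
   MPI_Gather(&mine, 1, MPI_INT, all, 1, MPI_INT, 0, MPI_COMM_WORLD);

   if (rank == 0) {
      for (i = 0; i < size; i++)
         printf("from rank %d: %d\n", i, all[i]);
      free(all);
   }

   MPI_Finalize();
   return 0;
}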


13.21. MPI Collective communications data flow.#

_images/MPI_collective.png

13.22. MPI_Reduce#

Applies a reduction operation, op, on all processes (tasks) in the communicator, comm, and places the result in one task, root.

MPI_Reduce (&sendbuf,&recvbuf,count,datatype,op,root,comm) 

13.23. Predefined MPI Reduction operations#

MPI reduction operation    Operation                 MPI C data types
MPI_MAX                    maximum                   integer, float
MPI_MIN                    minimum                   integer, float
MPI_SUM                    sum                       integer, float
MPI_PROD                   product                   integer, float
MPI_LAND                   logical AND               integer
MPI_BAND                   bit-wise AND              integer, MPI_BYTE
MPI_LOR                    logical OR                integer
MPI_BOR                    bit-wise OR               integer, MPI_BYTE
MPI_LXOR                   logical XOR               integer
MPI_BXOR                   bit-wise XOR              integer, MPI_BYTE
MPI_MAXLOC                 max value and location    float, double and long double
MPI_MINLOC                 min value and location    float, double and long double

Users can also define their own reduction functions by using the MPI_Op_create routine.
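
As a minimal sketch of MPI_Op_create (not part of the lesson's examples), the following defines an element-wise integer maximum and uses it in MPI_Reduce; the same result could, of course, be obtained with the predefined MPI_MAX:

#include "mpi.h"
#include <stdio.h>

// user-defined reduction: element-wise maximum of int vectors
// (the function must have the MPI_User_function signature)
void my_max(void *in, void *inout, int *len, MPI_Datatype *dtype) {
   int i;
   int *a = (int *)in, *b = (int *)inout;
   for (i = 0; i < *len; i++)
      if (a[i] > b[i]) b[i] = a[i];
}

int main(int argc, char *argv[]) {
   int rank, result;
   MPI_Op op;

   MPI_Init(&argc, &argv);
   MPI_Comm_rank(MPI_COMM_WORLD, &rank);

   MPI_Op_create(my_max, 1, &op);      // 1 means the operation is commutative
   MPI_Reduce(&rank, &result, 1, MPI_INT, op, 0, MPI_COMM_WORLD);

   if (rank == 0) printf("Largest rank = %d\n", result);

   MPI_Op_free(&op);
   MPI_Finalize();
   return 0;
}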


13.24. Example: MPI_Reduce for a scalar product#

Scalar product of two vectors, \(\vec{A}\) and \(\vec{B}\):

sum = \(\vec{A} \cdot \vec{B} = A_0 \cdot B_0 + A_1 \cdot B_1 + \cdots + A_{n-1} \cdot B_{n-1}\)

Have each task compute a partial sum on its chunk of data, then use MPI_Reduce on the partial sums and place the result in the task of rank 0:

int chunk = Array_Size/numprocs;    //how many elements each process gets in the chunk

for (i=rank*chunk; i < (rank+1)*chunk; ++i)
        part_sum += A[i] * B[i];              //Each process computes a partial sum on its chunk

MPI_Reduce(&part_sum, &sum, 1, MPI_FLOAT, MPI_SUM, 0, MPI_COMM_WORLD);  

if (rank == 0) {
        //print out the total sum
}

Each task has its own full copy of the arrays A[ ] and B[ ]. Array_Size is assumed to be a multiple of the number of processes, so the chunks cover the whole array.
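
The fragment above omits the declarations and setup. A complete, compilable sketch along the same lines is shown below (the array size and values are made up for illustration; the actual mpi_reduce.c used in the lab exercises may differ):

#include "mpi.h"
#include <stdio.h>

#define ARRAY_SIZE 1600        // assumed to be a multiple of the number of processes

int main(int argc, char *argv[]) {
   int rank, numprocs, i, chunk;
   float A[ARRAY_SIZE], B[ARRAY_SIZE];
   float part_sum = 0.0, sum = 0.0;

   MPI_Init(&argc, &argv);
   MPI_Comm_rank(MPI_COMM_WORLD, &rank);
   MPI_Comm_size(MPI_COMM_WORLD, &numprocs);

   // every task fills its own full copy of A and B
   for (i = 0; i < ARRAY_SIZE; i++) {
      A[i] = 1.0;
      B[i] = 2.0;
   }

   chunk = ARRAY_SIZE / numprocs;           // how many elements each process gets

   // each task computes a partial sum on its own chunk
   for (i = rank * chunk; i < (rank + 1) * chunk; i++)
      part_sum += A[i] * B[i];

   // combine the partial sums into sum on the rank 0 task
   MPI_Reduce(&part_sum, &sum, 1, MPI_FLOAT, MPI_SUM, 0, MPI_COMM_WORLD);

   if (rank == 0)
      printf("Scalar product = %f\n", sum);

   MPI_Finalize();
   return 0;
}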


13.25. Compiling and running the MPI_Reduce code#

mpicc   -o  mpi_reduce.x   mpi_reduce.c
mpirun  -n 4   mpi_reduce.x

13.26. Deploy a virtual cluster (Exercise)#

Download the VM appliances, master and n01, into the KVM directory:

cd KVM
wget http://capone.rutgers.edu/coursefiles/master.tgz
tar -zxvf master.tgz
rm master.tgz
wget http://capone.rutgers.edu/coursefiles/n01.tgz
tar -zxvf n01.tgz
rm n01.tgz

Deploy the VMs:

mv master.xml /etc/libvirt/qemu
mv n01.xml /etc/libvirt/qemu
virsh define /etc/libvirt/qemu/master.xml
virsh define /etc/libvirt/qemu/n01.xml

13.27. User accounts for MPI runs (Exercise)#

Start master and n01 VMs:

virsh start master
virsh start n01

Login to master and check if user account edward exists:

virsh console master
sudo -s
id edward

Login as user edward to VM master and generate SSH RSA public/private keys for passwordless authentication. The passphrase should be empty:

su - edward
whoami
ssh-keygen -t rsa
cd .ssh
cp id_rsa.pub  authorized_keys

As user edward, ssh to n01 from master. The user should be able to access n01 without a password.


13.28. Open MPI installation (Exercise)#

Login to master VM:

virsh console master

On master, install the Open MPI packages with apt-get:

apt-get update
apt-get install openmpi-common openmpi-bin mpi-default-dev g++

Perform the same package installation on n01 VM.


13.29. MPI code compilation (Exercise)#

Login to master as user edward.

Download and un-tar the source codes of the examples discussed, MPI.tgz, into a separate directory, MPI, as shown below:

wget http://linuxcourse.rutgers.edu/2024/html/lessons/HPC_2/MPI.tgz
mkdir MPI
cp MPI.tgz MPI; cd MPI
tar -zxvf MPI.tgz 

The MPI codes can be compiled with the mpicc command:

mpicc -o hello.x hello.c
mpicc -o b_send_receive.x b_send_receive.c 
mpicc -o nonb_send_receive.x nonb_send_receive.c 
mpicc -o mpi_reduce.x mpi_reduce.c 

Run hello.x code on 4 CPU slots:

mpirun -n 4 hello.x

Run the blocking send/receive code on 2 CPU slots:

mpirun -n 2 b_send_receive.x

Run the non-blocking send/receive code on 2 CPU slots:

mpirun -n 2 nonb_send_receive.x

Run the MPI Reduction code on 4 CPU slots:

mpirun -n 4 mpi_reduce.x 

Run the MPI Reduction code on 8 CPU slots:

mpirun -n 8 mpi_reduce.x

13.30. MPI run on a cluster (Exercise)#

To run MPI computations on a distributed system (a cluster), a shared file system and passwordless authentication between the compute nodes are required. Our master and n01 VMs, with an NFS shared file system and passwordless authentication, can work as a computational cluster for MPI.

Create the file nodes.txt with the following content using the nano editor:

nodes.txt

master slots=1
n01    slots=1

To test if MPI works across the two VMs, master and n01, run command uname -n via mpirun as follows:

mpirun -n 2 -hostfile nodes.txt uname -n

Similarly, run b_send_receive.x and nonb_send_receive.x:

mpirun -n 2 -hostfile nodes.txt b_send_receive.x
mpirun -n 2 -hostfile nodes.txt nonb_send_receive.x

Modify file nodes.txt by changing the number of CPU slots on the VMs:

nodes.txt

master slots=4
n01    slots=4

Run mpi_reduce.x on 8 CPUs across the two nodes:

mpirun -n 8 -hostfile nodes.txt mpi_reduce.x 

13.31. Concluding remarks#

  • Today, we have reviewed the basics of MPI programming, including MPI code structure, point-to-point and collective communication routines, and compiling and running executables with mpirun.

  • More advanced MPI routines and techniques, including user-defined data types, multiple communicators, and task topologies, can be found in the references.

  • Shortly, we’ll proceed with MPI lab exercises.


13.32. References#

LLNL MPI tutorial

Open MPI