This example demonstrates receive-side partial completion notification using more than one partition per receive-side thread. It uses a naive flag-based method to test whether the partitions assigned to each thread have completed. Note that a thread may therefore busy-poll for the completion of its assigned partitions even while other partitions, assigned to different threads, have already arrived and could be worked on; more advanced work-stealing methods could be employed for greater efficiency (a sketch follows the example). Like previous examples, it also demonstrates send-side production of input to parts of an overall buffer. This example additionally uses different send-side and receive-side partitioning: the sender marks 64 partitions of 16 doubles each ready, while the receiver tests 128 partitions of 8 doubles each, so send partition i corresponds to receive partitions 2i and 2i+1.
Example
Partitioned communication with receive-side partial completion.
#include <stdlib.h>
#include "mpi.h"

#define NUM_THREADS 64
#define PARTITIONS NUM_THREADS
#define PARTLENGTH 16
#define MESSAGE_LENGTH (PARTITIONS*PARTLENGTH)

int main(int argc, char *argv[]) /* send-side partitioning */
{
   double message[MESSAGE_LENGTH];
   int send_partitions = PARTITIONS,
       send_partlength = PARTLENGTH,
       recv_partitions = PARTITIONS*2,
       recv_partlength = PARTLENGTH/2;
   int source = 0, dest = 1, tag = 1, flag = 0;
   int myrank;
   int provided;
   MPI_Request request;
   MPI_Info info = MPI_INFO_NULL;
   MPI_Datatype send_type;

   MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
   if (provided < MPI_THREAD_MULTIPLE)
      MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
   MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
   MPI_Type_contiguous(send_partlength, MPI_DOUBLE, &send_type);
   MPI_Type_commit(&send_type);

   if (myrank == 0)
   {
      MPI_Psend_init(message, send_partitions, 1, send_type, dest, tag,
                     MPI_COMM_WORLD, info, &request);
      MPI_Start(&request);
      #pragma omp parallel for shared(request) \
              firstprivate(send_partitions)    \
              num_threads(NUM_THREADS)
      for (int i = 0; i < send_partitions; i++)
      {
         /* compute and fill partition #i, then mark ready: */
         MPI_Pready(i, request);
      }
      while (!flag)
      {
         /* Do useful work */
         MPI_Test(&request, &flag, MPI_STATUS_IGNORE);
         /* Do useful work */
      }
      MPI_Request_free(&request);
   }
   else if (myrank == 1)
   {
      MPI_Precv_init(message, recv_partitions, recv_partlength, MPI_DOUBLE,
                     source, tag, MPI_COMM_WORLD, info, &request);
      MPI_Start(&request);
      #pragma omp parallel for shared(request) \
              firstprivate(recv_partitions)    \
              num_threads(NUM_THREADS)
      for (int j = 0; j < recv_partitions; j += 2)
      {
         int part_flag = 0;
         int part1_complete = 0;
         int part2_complete = 0;
         while (part1_complete == 0 || part2_complete == 0)
         {
            /* test partition #j and #j+1 */
            MPI_Parrived(request, j, &part_flag);
            if (part_flag && part1_complete == 0)
            {
               part1_complete++;
               /* Do work using partition j data */
            }
            MPI_Parrived(request, j+1, &part_flag);
            if (part_flag && part2_complete == 0)
            {
               part2_complete++;
               /* Do work using partition j+1 */
            }
         }
      }
      while (!flag)
      {
         /* Do useful work */
         MPI_Test(&request, &flag, MPI_STATUS_IGNORE);
         /* Do useful work */
      }
      MPI_Request_free(&request);
   }
   MPI_Finalize();
   return 0;
}
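The static assignment above can leave a thread spinning on its two partitions while partitions assigned to other threads have already arrived. As one possible sketch of the more advanced work stealing mentioned in the introduction (not part of the original example), the receive-side parallel loop could instead let every thread claim any arrived partition through an atomic test-and-set. The names claimed and remaining below are illustrative; the fragment assumes the same declarations as the example above (request started by MPI_Precv_init, recv_partitions, NUM_THREADS) and an MPI_THREAD_MULTIPLE library.

      /* Hypothetical replacement for the receive-side parallel loop:
         every thread scans all partitions and claims any that has
         arrived, so no thread waits on in-flight partitions while
         completed ones sit idle elsewhere. */
      int claimed[PARTITIONS*2] = {0}; /* 0 = unclaimed, 1 = claimed  */
      int remaining = recv_partitions; /* partitions not yet processed */

      #pragma omp parallel num_threads(NUM_THREADS)       \
              shared(request, claimed, remaining)         \
              firstprivate(recv_partitions)
      {
         for (;;)
         {
            int left;
            #pragma omp atomic read
            left = remaining;
            if (left <= 0)
               break; /* every partition has been claimed and processed */

            for (int p = 0; p < recv_partitions; p++)
            {
               int arrived = 0;
               MPI_Parrived(request, p, &arrived);
               if (!arrived)
                  continue;
               int prev;
               #pragma omp atomic capture
               { prev = claimed[p]; claimed[p] = 1; }
               if (prev == 0)
               {
                  /* This thread now owns partition #p: consume its data,
                     then record that one fewer partition remains. */
                  #pragma omp atomic update
                  remaining--;
               }
            }
         }
      }

A production version would interleave useful work or back off between scans rather than spin, and could start each thread's scan at a different offset to reduce contention on the claimed array.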