From 5835143e86e0294c002a40b1a5b0f2be705955da Mon Sep 17 00:00:00 2001
From: Luca Tornatore <[...]>
Date: Wed, 30 Mar 2022 16:32:50 +0200
Subject: [PATCH] update on reduce: (i) added the mapping from global_rank to
 ranks in myHOST; (ii) added a ctrl window to synchronize reduce operations

---
 allvars.c           |   2 +-
 allvars.h           |   2 +-
 fourier_transform.c |  12 ++---
 gridding.c          | 122 ++++++++++++++++++++++++++++++++------------
 init.c              |  24 ++++-----
 main.c              |  26 +++++-----
 numa.c              |  76 +++++++++++++++++++--------
 numa.h              |  22 +++++---
 proto.h             |   2 +-
 result.c            |   4 +-
 10 files changed, 195 insertions(+), 97 deletions(-)

diff --git a/allvars.c b/allvars.c
index 8e67a3c..58a1c6b 100644
--- a/allvars.c
+++ b/allvars.c
@@ -14,7 +14,7 @@ struct fileData data;
 char filename[1000], buf[30], num_buf[30];
 char datapath[900];
 int xaxis, yaxis;
-int rank;
+int global_rank;
 int size;
 long nsectors;
 long startrow;
diff --git a/allvars.h b/allvars.h
index 2f66fc1..bdc1dbe 100644
--- a/allvars.h
+++ b/allvars.h
@@ -118,7 +118,7 @@ extern struct fileData
 extern char filename[1000], buf[30], num_buf[30];
 extern char datapath[900];
 extern int xaxis, yaxis;
-extern int rank;
+extern int global_rank;
 extern int size;
 extern long nsectors;
 extern long startrow;
diff --git a/fourier_transform.c b/fourier_transform.c
index a0af7fd..81d7fd8 100644
--- a/fourier_transform.c
+++ b/fourier_transform.c
@@ -6,7 +6,7 @@ void fftw_data(){
 
 #ifdef USE_FFTW
   // FFT transform the data (using distributed FFTW)
-  if(rank == 0)printf("PERFORMING FFT\n");
+  if(global_rank == 0)printf("PERFORMING FFT\n");
   clock_gettime(CLOCK_MONOTONIC, &begin);
   start = clock();
   fftw_plan plan;
@@ -84,7 +84,7 @@ void write_fftw_data(){
   MPI_Win_create(gridss, size_of_grid*sizeof(double), sizeof(double), MPI_INFO_NULL, MPI_COMM_WORLD, &writewin);
   MPI_Win_fence(0,writewin);
 #endif
-  if (rank == 0)
+  if (global_rank == 0)
   {
     printf("WRITING FFT TRANSFORMED DATA\n");
     file.pFilereal = fopen (out.fftfile2,"wb");
@@ -157,7 +157,7 @@ void write_fftw_data(){
   clock_gettime(CLOCK_MONOTONIC, &begin);
   start = clock();
 
-  if(rank == 0)printf("PHASE CORRECTION\n");
+  if(global_rank == 0)printf("PHASE CORRECTION\n");
   double* image_real = (double*) calloc(xaxis*yaxis,sizeof(double));
   double* image_imag = (double*) calloc(xaxis*yaxis,sizeof(double));
@@ -171,7 +171,7 @@ void write_fftw_data(){
 
 #ifdef WRITE_IMAGE
-  if(rank == 0)
+  if(global_rank == 0)
   {
     file.pFilereal = fopen (out.fftfile2,"wb");
     file.pFileimg = fopen (out.fftfile3,"wb");
@@ -181,13 +181,13 @@
 #ifdef USE_MPI
   MPI_Barrier(MPI_COMM_WORLD);
 #endif
-  if(rank == 0)printf("WRITING IMAGE\n");
+  if(global_rank == 0)printf("WRITING IMAGE\n");
   for (int isector=0; isector<[...]

[...]

diff --git a/gridding.c b/gridding.c
--- a/gridding.c
+++ b/gridding.c
[...]
       if (binphi > 0 && downdist >= 0.0) { sectorarray[binphi-1][counter[binphi-1]] = iphi; counter[binphi-1]++; };
     }
-
-  #ifdef PIPPO
-  long iiii = 0;
-  for (int j=0; j<nsectors; j++)
-    {
-      for (long iphi=histo_send[j]-1; iphi>=0; iphi--)
-        {
-          printf("%d %d %ld %ld %ld\n",rank,j,iiii,histo_send[j],sectorarray[j][iphi]);
-          iiii++;
-        }
-    }
-  #endif
-
+
   #ifdef VERBOSE
-  for (int iii=0; iii<[...]
[...]
-  if( ([...] > myHOST) && (Me.Rank[HOSTS] != -1))
-
-    MPI_Reduce(grid, grid, size_of_grid, MPI_DOUBLE, MPI_SUM, target_rank, *Me.COMM[HOSTS]);
-
-  // that can be avoided if shared window coincides with gridss
-  memcpy(grid, Me.swins[Me.Rank[myHOST]].ptr+isector*sizeof(gridss), sizeof(grid));
+  MPI_Barrier(MPI_COMM_WORLD);
+
+  int Im_in_the_new_communicator = MPI_UNDEFINED;
+  if( global_rank == target_rank )
+    Im_in_the_new_communicator = 1;
+  else
+    if( Me.Rank[HOSTS] == 0 )
+      {
+        if( Me.Ranks_to_host[ target_rank ] != Me.myhost )
+          Im_in_the_new_communicator = 1;
+      }
+
+  MPI_Comm Sector_Comm;
+  MPI_Comm_split( COMM[WORLD], Im_in_the_new_communicator, global_rank, &Sector_Comm );
+
+  if( Sector_Comm != MPI_COMM_NULL )
+    {
+      int sector_size;
+      int sector_rank = 0;
+      int sector_target;
+
+      MPI_Comm_size( Sector_Comm, &sector_size );
+      MPI_Comm_rank( Sector_Comm, &sector_rank );
+
+      if( global_rank == target_rank )
+        {
+          MPI_Send( &sector_rank, 1, MPI_INT, 0, 0, Sector_Comm );
+          memcpy(grid, Me.swins[Me.Rank[myHOST]].ptr+isector*sizeof(gridss), sizeof(grid));
+        }
+
+      if( sector_rank == 0 )
+        {
+          MPI_Status status;
+          MPI_Recv( &sector_target, 1, MPI_INT, MPI_ANY_SOURCE, 0, Sector_Comm, &status );
+        }
+
+      MPI_Bcast( &sector_target, 1, MPI_INT, 0, Sector_Comm );
+
+      // send and receive buffers may not alias: the root must use MPI_IN_PLACE
+      MPI_Reduce( (sector_rank == sector_target ? MPI_IN_PLACE : grid), grid,
+                  size_of_grid, MPI_DOUBLE, MPI_SUM, sector_target, Sector_Comm );
+
+      MPI_Comm_free( &Sector_Comm );
+    }
+
   //MPI_Put(gridss,size_of_grid,MPI_DOUBLE,target_rank,0,size_of_grid,MPI_DOUBLE,slabwin);
 #else
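
For reference, here is the split-and-reduce pattern of the hunk above as a self-contained sketch; every name in it is illustrative, not taken from the patch. Tasks that should participate pass color 1 to MPI_Comm_split, everyone else passes MPI_UNDEFINED and receives MPI_COMM_NULL. Two details matter: MPI forbids passing the same buffer as both send and receive argument of MPI_Reduce (the root must use MPI_IN_PLACE), and a standard-mode MPI_Send from the target to itself — which the hunk performs when the target happens to be rank 0 of the new communicator — is not guaranteed to complete without a previously posted receive. An MPI_Allreduce over a single int, as below, sidesteps both the handshake and the self-message.

/* sketch: rooted sum over an ad-hoc communicator (illustrative names) */
#include <mpi.h>
#include <stddef.h>

void sector_reduce(double *buffer, int count, int i_participate, int root_world_rank)
{
    int world_rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);

    /* color MPI_UNDEFINED -> task is left out and gets MPI_COMM_NULL */
    MPI_Comm sector_comm;
    MPI_Comm_split(MPI_COMM_WORLD, (i_participate ? 1 : MPI_UNDEFINED),
                   world_rank, &sector_comm);
    if (sector_comm == MPI_COMM_NULL)
        return;

    /* the split renumbers ranks: agree on the root's new rank without a
       Send/Recv handshake by max-reducing it over the new communicator */
    int sector_rank, sector_root, candidate;
    MPI_Comm_rank(sector_comm, &sector_rank);
    candidate = (world_rank == root_world_rank ? sector_rank : 0);
    MPI_Allreduce(&candidate, &sector_root, 1, MPI_INT, MPI_MAX, sector_comm);

    /* send and receive buffers may not alias: the root uses MPI_IN_PLACE */
    if (sector_rank == sector_root)
        MPI_Reduce(MPI_IN_PLACE, buffer, count, MPI_DOUBLE, MPI_SUM,
                   sector_root, sector_comm);
    else
        MPI_Reduce(buffer, NULL, count, MPI_DOUBLE, MPI_SUM,
                   sector_root, sector_comm);

    MPI_Comm_free(&sector_comm);
}
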
@@ -321,7 +344,7 @@ void write_grided_data()
 
 #ifdef WRITE_DATA
   // Write results
-  if (rank == 0)
+  if (global_rank == 0)
   {
     printf("WRITING GRIDDED DATA\n");
     file.pFilereal = fopen (out.outfile2,"wb");
@@ -399,24 +422,55 @@ void write_grided_data()
 }
 
-void reduce( int sector )
-{
+#define NSLEEP( T ) {struct timespec tsleep={0, (T)}; nanosleep(&tsleep, NULL); }
+void reduce( int sector, int target_rank )
+{
+  int local_rank = Me.Rank[myHOST];
+
+  if( Me.Ranks_to_host[ target_rank ] == Me.myhost )
+    {
+      // target_rank runs on my host: swap local ranks 0 and r so that
+      // the task hosting the target plays the role of local rank 0
+      int r = 0;
+      while( Me.Ranks_to_myhost[r] != target_rank )
+        r++;
+
+      if( r > 0 )
+        {
+          if( local_rank == 0 )
+            local_rank = r;
+          else if( local_rank == r )
+            local_rank = 0;
+        }
+    }
+
   int max_level = 0; while( (1 << (++max_level)) < Me.Ntasks[myHOST] );
+  *(int*)(Me.win_ctrl.ptr) = -1;
+
   for(int l = 0; l < max_level; l++)
     {
       int threshold = 1 << (1+l);
-
-      if( Me.Rank[myHOST] % threshold == 0)
-        {
-          int target = Me.Rank[myHOST] + (1<<l);
[...]
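
The body of the new loop is lost in this copy of the patch, but the surviving pieces — max_level, the per-level threshold, the control word primed to -1, and the NSLEEP macro — fit a binary-tree reduction over the host's shared windows in which each receiver polls its partner's control flag. The following is a hedged sketch of that scheme, not the committed code; ntasks, data, and flag stand in for Me.Ntasks[myHOST], the Me.swins data windows, and the Me.scwins control windows.

/* sketch: intra-host tree reduction synchronized through shared flags.
   data[r] / flag[r] point into task r's shared data / control windows. */
#include <time.h>

#define NSLEEP( T ) {struct timespec tsleep={0, (T)}; nanosleep(&tsleep, NULL); }

void tree_reduce(int local_rank, int ntasks, int count,
                 double **data, volatile int **flag)
{
    int max_level = 0;
    while( (1 << (++max_level)) < ntasks );   /* 2^max_level >= ntasks */

    *flag[local_rank] = -1;                   /* no level completed yet */

    for(int l = 0; l < max_level; l++)
    {
        int threshold = 1 << (1+l);
        if( local_rank % threshold == 0 )
        {
            int partner = local_rank + (1 << l);
            if( partner < ntasks )
            {
                /* wait until the partner has finished level l-1 */
                while( *flag[partner] < l-1 )
                    NSLEEP( 100 );
                for(int i = 0; i < count; i++)
                    data[local_rank][i] += data[partner][i];
            }
        }
        /* publish completion of this level; in real code the data store
           must be ordered before this flag store (MPI_Win_sync or C11
           atomics) — volatile alone is not a memory fence */
        *flag[local_rank] = l;
    }
    /* on exit, local rank 0 holds the host-wide sum */
}

With the rank swap performed above, "local rank 0" is the task that owns target_rank's window, so the host-wide sum lands directly where the cross-host reduce expects it.
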
diff --git a/init.c b/init.c
[...]

diff --git a/main.c b/main.c
[...]

diff --git a/numa.c b/numa.c
--- a/numa.c
+++ b/numa.c
@@ [...] @@ int init_numa( int Rank, int Size, MPI_Comm *MYWORLD, map_t *Me )
   int SHMEMl = Me->SHMEMl;
 
   MPI_Info winfo;
   MPI_Info_create(&winfo);
   MPI_Info_set(winfo, "alloc_shared_noncontig", "true");
+
+  // -----------------------------------
+  // initialize the flow control windows
+  // -----------------------------------
+  Me->win_ctrl.size = sizeof(int);
+  MPI_Win_allocate_shared(Me->win_ctrl.size, 1, winfo, *Me->COMM[SHMEMl],
+                          &(Me->win_ctrl.ptr), &(Me->win_ctrl.win));
+
+  MPI_Aint wsize = sizeof(int);
+  MPI_Win_allocate_shared(wsize, 1, winfo, *Me->COMM[SHMEMl],
+                          &win_ctrl_hostmaster_ptr, &win_ctrl_hostmaster);
+
+  Me->scwins = (win_t*)malloc(Me->Ntasks[SHMEMl]*sizeof(win_t));
+
+  // get the addresses of all the windows of my siblings
+  // at my shared-memory level
+  //
+  for( int t = 0; t < Me->Ntasks[SHMEMl]; t++ )
+    if( t != Me->Rank[SHMEMl] )
+      MPI_Win_shared_query( Me->win_ctrl.win, t, &(Me->scwins[t].size),
+                            &(Me->scwins[t].disp), &(Me->scwins[t].ptr) );
+
+  if( Me->Rank[SHMEMl] != 0 )
+    MPI_Win_shared_query( win_ctrl_hostmaster, 0, &(win_ctrl_hostmaster_size),
+                          &win_ctrl_hostmaster_disp, &win_ctrl_hostmaster_ptr );
+
+  // -----------------------------------
+  // initialize the data windows
+  // -----------------------------------
+  win_hostmaster_size = WIN_HOST_MASTER_SIZE_DFLT*1024*1024;
+  MPI_Aint win_host_size = WIN_HOST_SIZE_DFLT*1024*1024;
+
+  Me->win.size = win_host_size;
   MPI_Win_allocate_shared(Me->win.size, 1, winfo, *Me->COMM[SHMEMl], &(Me->win.ptr), &(Me->win.win));
 
-  MPI_Aint size = ( Me->Rank[SHMEMl] == 0 ? win_host_master_size : 0);
-  MPI_Win_allocate_shared(size, 1, winfo, *Me->COMM[SHMEMl], &win_host_master_ptr, &win_host_master);
+  wsize = ( Me->Rank[SHMEMl] == 0 ? win_hostmaster_size : 0);
+  MPI_Win_allocate_shared(wsize, 1, winfo, *Me->COMM[SHMEMl], &win_hostmaster_ptr, &win_hostmaster);
 
   Me->swins = (win_t*)malloc(Me->Ntasks[SHMEMl]*sizeof(win_t) );
   // Me->swins = (win_t*)malloc(Me->Ntasks[SHMEMl]*sizeof(win_t));
@@ -61,7 +94,7 @@ int init_numa( int Rank, int Size, MPI_Comm *MYWORLD, map_t *Me )
       MPI_Win_shared_query( Me->win.win, t, &(Me->swins[t].size), &(Me->swins[t].disp), &(Me->swins[t].ptr) );
 
   if( Me->Rank[SHMEMl] != 0 )
-    MPI_Win_shared_query( win_host_master, 0, &(win_host_master_size), &win_host_master_disp, &win_host_master_ptr );
+    MPI_Win_shared_query( win_hostmaster, 0, &(win_hostmaster_size), &win_hostmaster_disp, &win_hostmaster_ptr );
 }
 
@@ -196,7 +229,6 @@ int map_hostnames( MPI_Comm *MY_WORLD,   // the communicator to refer to
   // --------------------------------------------------
   // --- init some global vars
   me -> Ranks_to_host = (int*)malloc(Ntasks*sizeof(int));
-  //me -> Ranks_to_host = (int*)malloc(Ntasks*sizeof(int));
   me -> Nhosts = 0;
   me -> myhost = -1;
@@ -263,7 +295,11 @@ int map_hostnames( MPI_Comm *MY_WORLD,   // the communicator to refer to
 
   MPI_Allgather( &me->myhost, sizeof(me->myhost), MPI_BYTE,
                  me->Ranks_to_host, sizeof(me->myhost), MPI_BYTE, *MY_WORLD );
-
+
+  me -> Ranks_to_myhost = (int*)malloc(me->Ntasks[myHOST]*sizeof(int));
+  MPI_Allgather( &global_rank, sizeof(global_rank), MPI_BYTE,
+                 me->Ranks_to_myhost, sizeof(global_rank), MPI_BYTE, *me->COMM[myHOST]);
+
   free( alldata );
 
   return me->Nhosts;
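
The control-window setup above follows the standard shared-window recipe: each task allocates its slice with MPI_Win_allocate_shared, then uses MPI_Win_shared_query to obtain direct load/store pointers to every sibling's slice. A condensed, self-contained version of the pattern (the function name, the shm_comm argument, and the win_slice_t type are illustrative; the patch keeps the equivalents in map_t):

/* illustrative shared-window setup: each task allocates one int of
   flow-control state and obtains direct pointers to every sibling's
   slice. shm_comm must contain only tasks that share memory, e.g. the
   result of MPI_Comm_split_type(..., MPI_COMM_TYPE_SHARED, ...).      */
#include <mpi.h>
#include <stdlib.h>

typedef struct { MPI_Aint size; int disp; void *ptr; } win_slice_t;

void setup_ctrl_window(MPI_Comm shm_comm, MPI_Win *win, win_slice_t **slices)
{
    int rank, ntasks;
    MPI_Comm_rank(shm_comm, &rank);
    MPI_Comm_size(shm_comm, &ntasks);

    MPI_Info winfo;
    MPI_Info_create(&winfo);
    /* let MPI place each slice on memory local to its owner */
    MPI_Info_set(winfo, "alloc_shared_noncontig", "true");

    void *myptr;
    MPI_Win_allocate_shared(sizeof(int), 1, winfo, shm_comm, &myptr, win);

    *slices = (win_slice_t*)malloc(ntasks * sizeof(win_slice_t));
    for (int t = 0; t < ntasks; t++)
        if (t != rank)
            MPI_Win_shared_query(*win, t, &(*slices)[t].size,
                                 &(*slices)[t].disp, &(*slices)[t].ptr);
    (*slices)[rank].size = sizeof(int);
    (*slices)[rank].disp = 1;
    (*slices)[rank].ptr  = myptr;

    MPI_Info_free(&winfo);
}

The alloc_shared_noncontig hint lets the library back each slice with memory local to its owner instead of one contiguous segment, which is usually what you want on NUMA nodes.
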
diff --git a/numa.h b/numa.h
index 985d872..f9b9d02 100644
--- a/numa.h
+++ b/numa.h
@@ -23,7 +23,8 @@ typedef struct
   int  myhost;             // the host on which I'm running
   int  Nhosts;
   int  Ntasks[HLEVELS];
-  int *Ranks_to_host;      // check if it is needed
+  int *Ranks_to_host;      // the host to which each original global_rank belongs
+  int *Ranks_to_myhost;    // the original global_rank of each local rank in my host
   int  Rank[HLEVELS];
   int  MAXl;               // the maximum level of the hierarchy
   int  SHMEMl;             // the maximum hierarchy level that is in shared memory
@@ -32,8 +33,10 @@ typedef struct
   // not yet used
   // int mynode;            // the numa node on which I'm running
   // int ntasks_in_my_node;
-  win_t  win;              // my shared-memory window
-  win_t *swins;            // the shared-memory windows of the other tasks in my host
+  win_t  win_ctrl;         // my shared-memory window used for flow control
+  win_t  win;              // my shared-memory window used for data
+  win_t *scwins;           // the control shared-memory windows of the tasks in my host
+  win_t *swins;            // the data shared-memory windows of the tasks in my host
 } map_t;
@@ -45,8 +48,13 @@
 extern MPI_Comm COMM[HLEVELS];
 
 #define WIN_HOST_SIZE_DFLT        100   // in MB
 #define WIN_HOST_MASTER_SIZE_DFLT 100   // in MB
 
-extern MPI_Aint win_host_master_size;
-extern MPI_Win  win_host_master;
-extern int      win_host_master_disp;
-extern void    *win_host_master_ptr;
+extern MPI_Aint win_ctrl_hostmaster_size;
+extern MPI_Win  win_ctrl_hostmaster;
+extern int      win_ctrl_hostmaster_disp;
+extern void    *win_ctrl_hostmaster_ptr;
+
+extern MPI_Aint win_hostmaster_size;
+extern MPI_Win  win_hostmaster;
+extern int      win_hostmaster_disp;
+extern void    *win_hostmaster_ptr;
diff --git a/proto.h b/proto.h
index 160fee1..f00af2a 100644
--- a/proto.h
+++ b/proto.h
@@ -23,7 +23,7 @@ void gridding();
 void initialize_array();
 void gridding_data();
 void write_grided_data();
-void reduce( int );
+void reduce( int, int );
 
 /* fourier_transform.c */
diff --git a/result.c b/result.c
index 7a7607d..15d3100 100644
--- a/result.c
+++ b/result.c
@@ -10,7 +10,7 @@ void write_result() {
 
   timing.tot_time1  = (finish.tv_sec - begin0.tv_sec);
   timing.tot_time1 += (finish.tv_nsec - begin0.tv_nsec) / 1000000000.0;
-  if (rank == 0)
+  if (global_rank == 0)
   {
     printf("Setup time: %f sec\n",timing.setup_time);
     printf("Process time: %f sec\n",timing.process_time);
@@ -33,7 +33,7 @@ void write_result() {
     }
   }
 
-  if (rank == 0)
+  if (global_rank == 0)
   {
     file.pFile = fopen (out.timingfile,"w");
     if (param.num_threads == 1)
-- 
GitLab
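
For completeness, the two maps that the commit message refers to can be built in isolation as follows. This is a sketch with invented names (host_of, local_ranks), not the patch's map_hostnames: it elects the lowest global rank on each host as the host id instead of hashing hostnames, which is enough for the lookups reduce() performs.

/* builds: host_of[g]     -> host id of each global rank (Allgather on world)
           local_ranks[l] -> global rank of local task l on this host       */
#include <mpi.h>
#include <stdlib.h>

void build_rank_maps(MPI_Comm world, int **host_of, int **local_ranks,
                     MPI_Comm *host_comm)
{
    int grank, gsize;
    MPI_Comm_rank(world, &grank);
    MPI_Comm_size(world, &gsize);

    /* tasks that share memory = tasks on the same host */
    MPI_Comm_split_type(world, MPI_COMM_TYPE_SHARED, grank,
                        MPI_INFO_NULL, host_comm);

    int lsize;
    MPI_Comm_size(*host_comm, &lsize);

    /* host id: the lowest global rank present on this host */
    int host_id;
    MPI_Allreduce(&grank, &host_id, 1, MPI_INT, MPI_MIN, *host_comm);

    *host_of = (int*)malloc(gsize * sizeof(int));
    MPI_Allgather(&host_id, 1, MPI_INT, *host_of, 1, MPI_INT, world);

    *local_ranks = (int*)malloc(lsize * sizeof(int));
    MPI_Allgather(&grank, 1, MPI_INT, *local_ranks, 1, MPI_INT, *host_comm);
}

With these two arrays, the remapping in reduce() is a linear scan for the r with local_ranks[r] == target_rank — exactly the while loop the patch adds to gridding.c.
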