diff --git a/src/adp/adp.c b/src/adp/adp.c index 064fd7583c94c60077384d9110a0e74f6e0b72d0..616e8301d3786a2f4b4b1b3018ae5d40dfb25f76 100644 --- a/src/adp/adp.c +++ b/src/adp/adp.c @@ -1320,15 +1320,15 @@ void Heuristic2(global_context_t* ctx, clusters_t* cluster) #define RECV 0 #define DO_NOTHING -1 - MPI_Barrier(ctx -> mpi_communicator); - while(ranks > 1) { int dp = ranks % 2; ranks = ranks / 2 + dp; - int send_rcv = ctx -> mpi_rank >= ranks; + int send_rcv = (ctx -> mpi_rank >= ranks); + + MPI_Barrier(ctx -> mpi_communicator); - if(dp && ctx -> mpi_rank == ranks - 1) send_rcv = DO_NOTHING; + if(dp && ctx -> mpi_rank == (ranks - 1)) send_rcv = DO_NOTHING; switch (send_rcv) { @@ -1392,9 +1392,9 @@ void Heuristic2(global_context_t* ctx, clusters_t* cluster) #endif break; } - MPI_Barrier(ctx -> mpi_communicator); #if defined(PRINT_H2_COMM_SCHEME) MPI_DB_PRINT("-----------------\n"); + MPI_Barrier(ctx -> mpi_communicator); #endif } diff --git a/src/common/common.h b/src/common/common.h index 6fa6856975b5249e2e0c83eb3a870496824d9f04..7f64c295c8f612152540ec98b926aa1a270417d6 100644 --- a/src/common/common.h +++ b/src/common/common.h @@ -8,6 +8,16 @@ #include "../tree/heap.h" //#include <stdarg.h> +//#define WRITE_NGBH +//#define WRITE_TOP_NODES +#define WRITE_DENSITY +#define WRITE_CLUSTER_ASSIGN_H1 +//#define WRITE_BORDERS +//#define WRITE_MERGING_TABLE +#define WRITE_FINAL_ASSIGNMENT + +#define PRINT_NGBH_EXCHANGE_SCHEME + typedef struct datapoint_info_t { idx_t array_idx; heap ngbh; @@ -183,3 +193,4 @@ void lu_dynamic_array_init(lu_dynamic_array_t * a); void print_error_code(int err); void ordered_data_to_file(global_context_t* ctx); +void ordered_buffer_to_file(global_context_t* ctx, void* buffer, size_t el_size, uint64_t n, const char* fname); diff --git a/src/main/main.c b/src/main/main.c index 243683164014e9f6a22ea7ce4e51d6a32aad8114..f84d1a5922855e14e7d94d8705b7c6d8894e6984 100644 --- a/src/main/main.c +++ b/src/main/main.c @@ -5,6 +5,7 @@ #include "../tree/tree.h" #include "../adp/adp.h" + // #ifdef THREAD_FUNNELED @@ -58,9 +59,9 @@ int main(int argc, char** argv) { #endif #if defined (THREAD_FUNNELED) - mpi_printf(&ctx,"/!\\ Code build with MPI_THREAD_FUNNELED level\n"); + mpi_printf(&ctx,"/!\\ Code built with MPI_THREAD_FUNNELED level\n"); #else - mpi_printf(&ctx,"/!\\ Code build with MPI_THREAD_MULTIPLE level\n"); + mpi_printf(&ctx,"/!\\ Code built with MPI_THREAD_MULTIPLE level\n"); #endif /* @@ -125,7 +126,7 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx) //data = read_data_file(ctx,"../norm_data/std_g1212639_091_0001",MY_TRUE); ctx->dims = 5; - // ctx -> n_points = 5 * 100000; + //ctx -> n_points = 5 * 100000; ctx->n_points = ctx->n_points / ctx->dims; //ctx->n_points = (ctx->n_points * 5) / 10; // ctx -> n_points = ctx -> world_size * 1000; diff --git a/src/tree/tree.c b/src/tree/tree.c index 2ecac4eb5ad28f94170771af0803967f1bd73ead..2dd7c8b303a92ac71b4ae56898f20bfcdeb276b3 100644 --- a/src/tree/tree.c +++ b/src/tree/tree.c @@ -19,13 +19,6 @@ #include <omp.h> #include <sys/sysinfo.h> -//#define WRITE_NGBH -//#define WRITE_TOP_NODES -#define WRITE_DENSITY -//#define WRITE_CLUSTER_ASSIGN_H1 -//#define WRITE_BORDERS -//#define WRITE_MERGING_TABLE -#define WRITE_FINAL_ASSIGNMENT /* @@ -37,7 +30,7 @@ //#define MAX_MSG_SIZE 4294967296 /* Used slices of 10 mb ? Really good? 
Maybe at the cause of TID thing */ -#define MAX_MSG_SIZE 1000000000 +#define MAX_MSG_SIZE (10000 * k * sizeof(heap_node)) #define TOP_TREE_RCH 1 @@ -1571,10 +1564,10 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre TIME_START; /* find if a points needs a refine on the global tree */ - float_t** data_to_send_per_proc = (float_t**)MY_MALLOC(ctx -> world_size * sizeof(float_t*)); - int** local_idx_of_the_point = (int**)MY_MALLOC(ctx -> world_size * sizeof(int*)); - int* point_to_snd_count = (int*)MY_MALLOC(ctx -> world_size * sizeof(int)); - int* point_to_snd_capacity = (int*)MY_MALLOC(ctx -> world_size * sizeof(int)); + float_t** data_to_send_per_proc = (float_t**)MY_MALLOC(ctx -> world_size * sizeof(float_t*)); + int** local_idx_of_the_point = (int**)MY_MALLOC(ctx -> world_size * sizeof(int*)); + int* point_to_snd_count = (int*)MY_MALLOC(ctx -> world_size * sizeof(int)); + int* point_to_snd_capacity = (int*)MY_MALLOC(ctx -> world_size * sizeof(int)); for(int i = 0; i < ctx -> world_size; ++i) { @@ -1591,23 +1584,6 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre point_to_snd_count[i] = 0; } - /* for each point walk the tree and find to which proc send data */ - /* actually compute intersection of ngbh radius of each point to node box */ - - /* OLD VERSION SINGLE TREE WALK */ - /* - #pragma omp parallel for - for(int i = 0; i < ctx -> local_n_points; ++i) - { - float_t max_dist = dp_info[i].ngbh.data[0].value; - float_t* point = ctx -> local_data + (i * ctx -> dims); - - tree_walk(ctx, top_tree -> root, i, max_dist, - point, data_to_send_per_proc, local_idx_of_the_point, - point_to_snd_count, point_to_snd_capacity); - } - */ - /* NEW VERSION double tree walk */ #pragma omp parallel for for(int i = 0; i < ctx -> local_n_points; ++i) @@ -1709,14 +1685,6 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre /* Note that I then have to recieve an equal number of heaps as the one I sent out before */ heap_node* __heap_batches_to_snd = (heap_node*)MY_MALLOC((uint64_t)k * (uint64_t)tot_points_rcv * sizeof(heap_node)); - heap_node* __heap_batches_to_rcv = (heap_node*)MY_MALLOC((uint64_t)k * (uint64_t)tot_points_snd * sizeof(heap_node)); - - - if( __heap_batches_to_rcv == NULL) - { - DB_PRINT("Rank %d failed to allocate rcv_heaps %luB required\n",ctx -> mpi_rank, (uint64_t)k * (uint64_t)tot_points_rcv * sizeof(heap_node)); - exit(1); - } if( __heap_batches_to_snd == NULL) { @@ -1805,11 +1773,6 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre MPI_Barrier(ctx -> mpi_communicator); MPI_Type_commit(&MPI_my_heap); - heap_node** rcv_heap_batches = (heap_node**)MY_MALLOC(ctx -> world_size * sizeof(heap_node*)); - for(int i = 0; i < ctx -> world_size; ++i) - { - rcv_heap_batches[i] = __heap_batches_to_rcv + snd_displ[i] * k; - } /* ------------------------------------- * ALTERNATIVE TO ALL TO ALL FOR BIG MSG @@ -1842,86 +1805,226 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre int req_idx = 0; + /* ---------------------------------------------------- */ + // FROM HERE + //heap_node* __heap_batches_to_rcv = (heap_node*)MY_MALLOC((uint64_t)k * (uint64_t)tot_points_snd * sizeof(heap_node)); + //if( __heap_batches_to_rcv == NULL) + //{ + // DB_PRINT("Rank %d failed to allocate rcv_heaps %luB required\n",ctx -> mpi_rank, (uint64_t)k * (uint64_t)tot_points_rcv * sizeof(heap_node)); + // exit(1); + //} + + //heap_node** rcv_heap_batches = 
(heap_node**)MY_MALLOC(ctx -> world_size * sizeof(heap_node*)); + //for(int i = 0; i < ctx -> world_size; ++i) + //{ + // rcv_heap_batches[i] = __heap_batches_to_rcv + snd_displ[i] * k; + //} + + //HERE + + //for(int i = 0; i < ctx -> world_size; ++i) + //{ + // int count = 0; + // if(ngbh_to_send[i] > 0) + // { + // while(already_sent_points[i] < ngbh_to_send[i]) + // { + // MPI_Request request; + // count = MIN(default_msg_len, ngbh_to_send[i] - already_sent_points[i] ); + // MPI_Isend( heap_batches_per_node[i] + k * already_sent_points[i], count, + // MPI_my_heap, i, 0, ctx -> mpi_communicator, &request); + // already_sent_points[i] += count; + // req_array[req_idx] = request; + // ++req_idx; + // } + // } + //} + + ///* Here it breaks for six nodes */ + + //HERE; + // + //MPI_Barrier(ctx -> mpi_communicator); + //MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, ctx -> mpi_communicator, &flag, &status); + ////DB_PRINT("%d %p %p\n",ctx -> mpi_rank, &flag, &status); + //while(flag) + //{ + // MPI_Request request; + // int count; + // int source = status.MPI_SOURCE; + // MPI_Get_count(&status, MPI_my_heap, &count); + // /* recieve each slice */ + + // MPI_Recv(rcv_heap_batches[source] + k * already_rcvd_points[source], + // count, MPI_my_heap, source, MPI_ANY_TAG, ctx -> mpi_communicator, &status); + + // already_rcvd_points[source] += count; + // MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, ctx -> mpi_communicator, &flag, &status); + + //} + //MPI_Barrier(ctx -> mpi_communicator); + + //MPI_Testall(req_num, req_array, &flag, MPI_STATUSES_IGNORE); + + //if(flag == 0) + //{ + // DB_PRINT("[!!!] Rank %d has unfinished communications\n", ctx -> mpi_rank); + // exit(1); + //} + //free(req_array); + //free(already_sent_points); + //free(already_rcvd_points); + + //elapsed_time = TIME_STOP; + //LOG_WRITE("Sending results to other proc", elapsed_time); + + ///* merge old with new heaps */ + + //MPI_Barrier(ctx -> mpi_communicator); + + //TIME_START; + + //for(int i = 0; i < ctx -> world_size; ++i) + //{ + // #pragma omp paralell for + // for(int b = 0; b < ngbh_to_recv[i]; ++b) + // { + // int idx = local_idx_of_the_point[i][b]; + // /* retrieve the heap */ + // heap H; + // H.count = k; + // H.N = k; + // H.data = rcv_heap_batches[i] + k*b; + // /* insert the points into the heap */ + // for(int j = 0; j < k; ++j) + // { + // insert_max_heap(&(dp_info[idx].ngbh), H.data[j].value, H.data[j].array_idx); + // } + // } + //} + /* ----------- TO HERE ---------------------------- */ + + // find the maximum number of points to send */ + + + idx_t max_n_recv = 0; for(int i = 0; i < ctx -> world_size; ++i) { - int count = 0; - if(ngbh_to_send[i] > 0) - { - while(already_sent_points[i] < ngbh_to_send[i]) - { - MPI_Request request; - count = MIN(default_msg_len, ngbh_to_send[i] - already_sent_points[i] ); - MPI_Isend( heap_batches_per_node[i] + k * already_sent_points[i], count, - MPI_my_heap, i, 0, ctx -> mpi_communicator, &request); - already_sent_points[i] += count; - req_array[req_idx] = request; - ++req_idx; - } - } + max_n_recv = MAX(max_n_recv, (idx_t)ngbh_to_recv[i]); } - + MPI_DB_PRINT("Using default message lenght %lu\n", default_msg_len); + + heap_node* __heap_batches_to_rcv = (heap_node*)MY_MALLOC((uint64_t)k * (uint64_t)max_n_recv * sizeof(heap_node)); + if( __heap_batches_to_rcv == NULL) + { + DB_PRINT("Rank %d failed to allocate rcv_heaps %luB required\n",ctx -> mpi_rank, (uint64_t)k * (uint64_t)max_n_recv* sizeof(heap_node)); + exit(1); + } + + /* make a ring */ + MPI_Barrier(ctx -> mpi_communicator); - 
MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, ctx -> mpi_communicator, &flag, &status); - //DB_PRINT("%d %p %p\n",ctx -> mpi_rank, &flag, &status); - while(flag) + for(int i = 1; i < ctx -> world_size; ++i) { - MPI_Request request; - int count; - int source = status.MPI_SOURCE; - MPI_Get_count(&status, MPI_my_heap, &count); - /* recieve each slice */ + int rank_to_send = (ctx -> mpi_rank + i) % (ctx -> world_size); + int rank_to_recv = (ctx -> world_size + ctx -> mpi_rank - i) % (ctx -> world_size); - MPI_Recv(rcv_heap_batches[source] + k * already_rcvd_points[source], - count, MPI_my_heap, source, MPI_ANY_TAG, ctx -> mpi_communicator, &status); + /* do things */ - already_rcvd_points[source] += count; - MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, ctx -> mpi_communicator, &flag, &status); + /* send out one batch */ - } - MPI_Barrier(ctx -> mpi_communicator); + #ifdef PRINT_NGBH_EXCHANGE_SCHEME + MPI_DB_PRINT("[--- ROUND %d ----]\n", i); + MPI_Barrier(ctx -> mpi_communicator); + DB_PRINT("[RANK %d] sending to %d tot: %d [%luB]---- recieving from %d %d\n", ctx -> mpi_rank, + rank_to_send, ngbh_to_send[rank_to_send], ngbh_to_send[rank_to_send]*sizeof(heap_node), rank_to_recv, ngbh_to_recv[rank_to_recv]); + #endif + if(ngbh_to_send[rank_to_send] > 0) + { + int count_send = 0; + while(already_sent_points[rank_to_send] < ngbh_to_send[rank_to_send]) + { + MPI_Request request; + count_send = MIN((int)default_msg_len, (int)(ngbh_to_send[rank_to_send] - already_sent_points[rank_to_send] )); + MPI_Isend( heap_batches_per_node[rank_to_send] + k * already_sent_points[rank_to_send], count_send, + MPI_my_heap, rank_to_send, 0, ctx -> mpi_communicator, &request); - MPI_Testall(req_num, req_array, &flag, MPI_STATUSES_IGNORE); + already_sent_points[rank_to_send] += count_send; + req_array[req_idx] = request; + ++req_idx; + } + } - if(flag == 0) - { - DB_PRINT("[!!!] 
Rank %d has unfinished communications\n", ctx -> mpi_rank); - exit(1); - } - free(req_array); - free(already_sent_points); - free(already_rcvd_points); + if(ngbh_to_send[rank_to_send] != already_sent_points[rank_to_send] || point_to_rcv_count[rank_to_send] != already_sent_points[rank_to_send]) + { + DB_PRINT("Madonnina del mare send [rank %d] %d %d %d\n", ctx -> mpi_rank, ngbh_to_send[rank_to_send], already_sent_points[rank_to_send], point_to_snd_count[rank_to_send]); - elapsed_time = TIME_STOP; - LOG_WRITE("Sending results to other proc", elapsed_time); + } + + MPI_Barrier(ctx -> mpi_communicator); - /* merge old with new heaps */ + if(ngbh_to_recv[rank_to_recv] > 0) + { + flag = 0; + while(already_rcvd_points[rank_to_recv] < ngbh_to_recv[rank_to_recv]) + { + MPI_Probe(rank_to_recv, MPI_ANY_TAG, ctx -> mpi_communicator, &status); + MPI_Request request; + int count_recv; + int source = status.MPI_SOURCE; + MPI_Get_count(&status, MPI_my_heap, &count_recv); + /* recieve each slice */ - MPI_Barrier(ctx -> mpi_communicator); + MPI_Recv(__heap_batches_to_rcv + k * already_rcvd_points[rank_to_recv], + count_recv, MPI_my_heap, source, MPI_ANY_TAG, ctx -> mpi_communicator, &status); - TIME_START; + already_rcvd_points[rank_to_recv] += count_recv; + } + } - for(int i = 0; i < ctx -> world_size; ++i) - { + if(ngbh_to_recv[rank_to_recv] != already_rcvd_points[rank_to_recv] || point_to_snd_count[rank_to_recv] != already_rcvd_points[rank_to_recv]) + { + DB_PRINT("Madonnina del mare [rank %d] %d %d %d\n", ctx -> mpi_rank, ngbh_to_recv[rank_to_recv], already_rcvd_points[rank_to_recv], point_to_snd_count[rank_to_recv]); + + } + /* merge lists */ #pragma omp paralell for - for(int b = 0; b < ngbh_to_recv[i]; ++b) + for(int b = 0; b < ngbh_to_recv[rank_to_recv]; ++b) { - int idx = local_idx_of_the_point[i][b]; + int idx = local_idx_of_the_point[rank_to_recv][b]; /* retrieve the heap */ heap H; H.count = k; H.N = k; - H.data = rcv_heap_batches[i] + k*b; + H.data = __heap_batches_to_rcv + k*b; /* insert the points into the heap */ for(int j = 0; j < k; ++j) { insert_max_heap(&(dp_info[idx].ngbh), H.data[j].value, H.data[j].array_idx); } } + + + MPI_Barrier(ctx -> mpi_communicator); } + + MPI_Testall(req_idx, req_array, &flag, MPI_STATUSES_IGNORE); + + if(flag == 0) + { + DB_PRINT("[!!!] Rank %d has unfinished communications\n", ctx -> mpi_rank); + exit(1); + } + free(req_array); + free(already_sent_points); + free(already_rcvd_points); + + + /* -------------------------------------------------------- */ /* heapsort them */ #pragma omp parallel for @@ -1964,7 +2067,7 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre free(data_to_send_per_proc); free(local_idx_of_the_point); free(heap_batches_per_node); - free(rcv_heap_batches); + //free(rcv_heap_batches); free(rcv_work_batches); free(point_to_rcv_count); free(point_to_snd_count);
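
Reviewer note on the tree.c change above: the old Iprobe/any-source exchange is replaced by a ring schedule in which, at round i, each rank posts its batches to (mpi_rank + i) % world_size and then drains everything owed by (world_size + mpi_rank - i) % world_size, chunked at default_msg_len heaps per message. Below is a minimal standalone sketch of just that communication pattern, assuming illustrative names (CHUNK, N_PER_RANK, send_buf, recv_buf) and a plain MPI_DOUBLE payload instead of the committed MPI_my_heap type; it is not the code in the patch, only the schedule in isolation.

/*
 * Sketch of the ring exchange used in the new mpi_ngbh_search path:
 * in round i every rank sends to (rank + i) % P and receives from
 * (rank - i + P) % P, splitting large payloads into fixed-size chunks.
 * CHUNK, N_PER_RANK and the buffer names are illustrative only.
 */
#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>

#define CHUNK      4    /* elements per message, stands in for default_msg_len */
#define N_PER_RANK 10   /* elements each rank owes to every other rank         */

int main(int argc, char** argv)
{
    MPI_Init(&argc, &argv);

    int rank, world;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &world);

    /* payload this rank sends to everybody: its own rank id repeated */
    double* send_buf = malloc(N_PER_RANK * sizeof(double));
    double* recv_buf = malloc(N_PER_RANK * sizeof(double));
    for (int j = 0; j < N_PER_RANK; ++j) send_buf[j] = (double)rank;

    MPI_Request* reqs = malloc((size_t)world * (N_PER_RANK / CHUNK + 1) * sizeof(MPI_Request));
    int n_reqs = 0;

    for (int i = 1; i < world; ++i)
    {
        int dst = (rank + i) % world;           /* who I send to this round */
        int src = (rank - i + world) % world;   /* who I receive from       */

        /* post all chunks going to dst without blocking */
        int sent = 0;
        while (sent < N_PER_RANK)
        {
            int count = N_PER_RANK - sent < CHUNK ? N_PER_RANK - sent : CHUNK;
            MPI_Isend(send_buf + sent, count, MPI_DOUBLE, dst, 0,
                      MPI_COMM_WORLD, &reqs[n_reqs++]);
            sent += count;
        }

        /* drain everything src owes me, chunk by chunk */
        int got = 0;
        while (got < N_PER_RANK)
        {
            MPI_Status st;
            int count;
            MPI_Probe(src, MPI_ANY_TAG, MPI_COMM_WORLD, &st);
            MPI_Get_count(&st, MPI_DOUBLE, &count);
            MPI_Recv(recv_buf + got, count, MPI_DOUBLE, src, MPI_ANY_TAG,
                     MPI_COMM_WORLD, MPI_STATUS_IGNORE);
            got += count;
        }

        /* the real code merges the received heaps here, before recv_buf is reused */
        printf("rank %d round %d: got %d elements from %d\n", rank, i, got, src);

        /* mirrors the per-round barrier in the patch */
        MPI_Barrier(MPI_COMM_WORLD);
    }

    MPI_Waitall(n_reqs, reqs, MPI_STATUSES_IGNORE);

    free(reqs);
    free(send_buf);
    free(recv_buf);
    MPI_Finalize();
    return 0;
}

The payoff visible in the diff is the receive-side footprint: the old path allocated k * tot_points_snd heap nodes up front and received from any source, while the ring path allocates k * max_n_recv (the largest single peer's batch) once and reuses it every round, merging into the local heaps before the next iteration.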