diff --git a/src/tree/tree.c b/src/tree/tree.c
index 73bf9afc1d8cc51656efb554ae3dcc94ab012e6e..771b40a8e8c203116f130c529c01f8ca1e9bd641 100644
--- a/src/tree/tree.c
+++ b/src/tree/tree.c
@@ -1406,10 +1406,6 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre
     rcv_displ[0] = 0;
     snd_displ[0] = 0;
 
-    /*
-    rcv_count[0] = point_to_rcv_count[0] * (1 + ctx -> dims);
-    snd_count[0] = point_to_snd_count[0] * (1 + ctx -> dims);
-    */
     rcv_count[0] = point_to_rcv_count[0] * (ctx -> dims);
     snd_count[0] = point_to_snd_count[0] * (ctx -> dims);
 
@@ -1420,10 +1416,6 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre
 
     for(int i = 1; i < ctx -> world_size; ++i)
     {
-        /*
-        rcv_count[i] = point_to_rcv_count[i] * (1 + ctx -> dims);
-        snd_count[i] = point_to_snd_count[i] * (1 + ctx -> dims);
-        */
         rcv_count[i] = point_to_rcv_count[i] * (ctx -> dims);
         snd_count[i] = point_to_snd_count[i] * (ctx -> dims);
 
@@ -1435,10 +1427,6 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre
         snd_displ[i] = snd_displ[i - 1] + snd_count[i - 1];
     }
 
-    /*
-    float_t* __rcv_points = (float_t*)malloc(tot_points_rcv * (1 + ctx -> dims) * sizeof(float_t));
-    float_t* __snd_points = (float_t*)malloc(tot_points_snd * (1 + ctx -> dims) * sizeof(float_t));
-    */
     float_t* __rcv_points = (float_t*)malloc(tot_points_rcv * (ctx -> dims) * sizeof(float_t));
     float_t* __snd_points = (float_t*)malloc(tot_points_snd * (ctx -> dims) * sizeof(float_t));
 
@@ -1472,20 +1460,6 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre
 
     heap_node* __heap_batches_to_rcv = (heap_node*)malloc((uint64_t)k * (uint64_t)tot_points_snd * sizeof(heap_node));
 
-    /*
-    for(int i = 0; i < ctx -> world_size; ++i)
-    {
-        if(i == ctx -> mpi_rank)
-        {
-            DB_PRINT("[RANK %d]\t", ctx -> mpi_rank);
-            DB_PRINT("tot point rcv %d tot point send %d \t",tot_points_rcv, tot_points_snd);
-            DB_PRINT("bytes rcv %lu snd %lu", (uint64_t)k * (uint64_t)tot_points_rcv * sizeof(heap_node), (uint64_t)k * (uint64_t)tot_points_snd * sizeof(heap_node));
-            DB_PRINT("\n");
-        }
-    }
-    */
-
-
     if( __heap_batches_to_rcv == NULL)
     {
         DB_PRINT("Rank %d failed to allocate rcv_heaps %luB required\n",ctx -> mpi_rank, (uint64_t)k * (uint64_t)tot_points_rcv * sizeof(heap_node));
@@ -1529,19 +1503,6 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre
     MPI_DB_PRINT("[MASTER] Working on recieved points\n");
     MPI_Barrier(ctx -> mpi_communicator);
 
-    /*
-    for(int i = 0; i < ctx -> world_size; ++i)
-    {
-        if(i == ctx -> mpi_rank)
-        {
-            DB_PRINT("[RANK %d]\t", ctx -> mpi_rank);
-            for(int i = 0; i < ctx -> world_size; ++i)
-                //DB_PRINT("%d\t",point_to_rcv_count[i]);
-                DB_PRINT("%d\t",point_to_rcv_count[i]);
-            DB_PRINT("\n");
-        }
-    }
-    */
     TIME_START
 
     for(int p = 0; p < ctx -> world_size; ++p)
@@ -1584,85 +1545,17 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre
     MPI_Type_contiguous(k * sizeof(heap_node), MPI_CHAR, &MPI_my_heap);
     MPI_Barrier(ctx -> mpi_communicator);
     MPI_Type_commit(&MPI_my_heap);
-    /*
-
-    //MPI_Alltoallv(__heap_batches_to_snd, rcv_count, rcv_displ, MPI_CHAR,
-    //              __heap_batches_to_rcv, snd_count, snd_displ, MPI_CHAR, ctx -> mpi_communicator );
-
-
-
-    MPI_Alltoallv(__heap_batches_to_snd, rcv_count, rcv_displ, MPI_my_heap,
-                  __heap_batches_to_rcv, snd_count, snd_displ, MPI_my_heap, ctx -> mpi_communicator );
-    */
-
-    //MPI_Type_free(&MPI_my_heap);
 
     heap_node** rcv_heap_batches = (heap_node**)malloc(ctx -> world_size * sizeof(heap_node*));
     for(int i = 0; i < ctx -> world_size; ++i)
     {
-        //rcv_heap_batches[i] = NULL;
-        //rcv_heap_batches[i] = __heap_batches_to_rcv + snd_displ[i] / sizeof(heap_node);
         rcv_heap_batches[i] = __heap_batches_to_rcv + snd_displ[i] * k;
     }
 
-    /*
-     * DIFFERENT APPROACH
-     */
-
-    /*
-    uint32_t n_points_threshold = 4294967296 / (k * sizeof(heap_node));
-    int default_msg_len = n_points_threshold / 2;
-
-    int* already_sent_points = (int*)malloc(ctx -> world_size * sizeof(int));
-    int* already_rcvd_points = (int*)malloc(ctx -> world_size * sizeof(int));
-
-    for(int i = 0; i < ctx -> world_size; ++i)
-    {
-        if(i == ctx -> mpi_rank)
-        {
-            for(int j = 0; j < ctx -> world_size; ++j)
-            {
-                int count = 0;
-                while(count < point_to_snd_count[i])
-                {
-                    int count;
-                    MPI_Status status;
-                    MPI_Probe(j, MPI_ANY_TAG, ctx -> mpi_communicator, &status);
-                    MPI_Get_count(&status, MPI_my_heap, &count);
-
-                    MPI_Recv(rcv_heap_batches[j] + k * already_rcvd_points[j],
-                            count, MPI_my_heap, j, MPI_ANY_TAG, ctx -> mpi_communicator, &status);
-
-                    already_rcvd_points[j] += count;
-                    //MPI_Recv(rcv_heap_batches[j], point_to_snd_count[j], MPI_my_heap, j, MPI_ANY_TAG, ctx -> mpi_communicator, &status);
-                }
-            }
-        }
-        else
-        {
-            int count = 0;
-            if(point_to_rcv_count[i] > 0)
-            {
-                while(already_sent_points[i] < point_to_rcv_count[i])
-                {
-                    MPI_Request request;
-                    //MPI_Isend(const void *buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm, MPI_Request *request)
-                    count = MIN(default_msg_len, point_to_rcv_count[i] - already_sent_points[i] );
-                    MPI_Send(heap_batches_per_node[i] + k * already_sent_points[i], count, MPI_my_heap, i, 0, ctx -> mpi_communicator);
-                    already_sent_points[i] += count;
-                }
-            }
-        }
-    }
-    */
-
     /* -------------------------------------
      * ALTERNATIVE TO ALL TO ALL FOR BIG MSG
-     * HERE IT BREAKS
+     * HERE IT BREAKS: MPI cannot handle
+     * messages larger than 4GB
      * ------------------------------------- */
 
     MPI_Barrier(ctx -> mpi_communicator);
@@ -1687,7 +1580,6 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre
             while(already_sent_points[i] < point_to_rcv_count[i])
             {
                 MPI_Request request;
-                //MPI_Isend(const void *buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm, MPI_Request *request)
                 count = MIN(default_msg_len, point_to_rcv_count[i] - already_sent_points[i] );
                 MPI_Isend( heap_batches_per_node[i] + k * already_sent_points[i], count, MPI_my_heap, i, 0, ctx -> mpi_communicator, &request);
@@ -1705,6 +1597,7 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre
                 int count;
                 int source = status.MPI_SOURCE;
                 MPI_Get_count(&status, MPI_my_heap, &count);
+                /* receive each slice */
                 MPI_Recv(rcv_heap_batches[source] + k * already_rcvd_points[source],
                         count, MPI_my_heap, source, MPI_ANY_TAG, ctx -> mpi_communicator, &status);
@@ -1720,6 +1613,11 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre
 
     TIME_STOP
 
+    /*
+     * this could be slimmer: send back only the candidates that can be
+     * inserted, via an extra all-to-all specifying which points to insert
+     */
+
     /* merge old with new heaps */
     MPI_DB_PRINT("[MASTER] Merging resutls\n");
 
@@ -1732,7 +1630,6 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre
         for(int b = 0; b < point_to_snd_count[i]; ++b)
         {
             int idx = local_idx_of_the_point[i][b];
-            //MPI_DB_PRINT("%d %d %d\n",i,b,idx);
             /* retrieve the heap */
             heap H;
             H.count = k;
@@ -1741,7 +1638,6 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre
             /* insert the points into the heap */
             for(int j = 0; j < k; ++j)
             {
-                // MPI_DB_PRINT(" -- inserting %u %lf max_dist %lf \n", H.data[j].value, H.data[j].array_idx, dp_info[idx].ngbh.data[0].value);
                 insert_max_heap(&(dp_info[idx].ngbh), H.data[j].value, H.data[j].array_idx);
             }
         }
@@ -1782,11 +1678,8 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre
 
     for(int i = 0; i < ctx -> world_size; ++i)
     {
-        //if(heap_batches_per_node[i]) free(heap_batches_per_node[i]);
         if(data_to_send_per_proc[i]) free(data_to_send_per_proc[i]);
         if(local_idx_of_the_point[i]) free(local_idx_of_the_point[i]);
-        //if(rcv_work_batches[i]) free(rcv_work_batches[i]);
-        //if(rcv_heap_batches[i]) free(rcv_heap_batches[i]);
     }
 
     free(heap_batches_per_node);
@@ -1886,24 +1779,6 @@ void ordered_data_to_file(global_context_t* ctx)
 void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)
 {
     float_t *data;
-    /* generate random points */
-
-    /*
-    if(ctx -> mpi_rank == 0)
-    {
-        data = (float_t*)malloc(dims*n*sizeof(float_t));
-        //for(size_t i = 0; i < dims * n; ++i) data[i] = (float_t)rand()/(float_t)(RAND_MAX);
-        //for(size_t i = 0; i < n; ++i)
-        //    for(size_t j = 0; j < dims; ++j)
-        //        data[i*dims + j] = (float_t)i;
-
-        for(size_t i = 0; i < dims * n; ++i) data[i] = (float_t)i;
-        ctx -> dims = dims;
-        ctx -> n_points = n;
-    }
-    */
-    /* read from files */
-
     if (ctx->mpi_rank == 0)
     {
@@ -1911,19 +1786,19 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)
 
         //ctx->dims = 2;
         //data = read_data_file(ctx, "../norm_data/50_blobs.npy", MY_TRUE);
 
         // std_g0163178_Me14_091_0000
-        //
+        // 190M points
         // std_g2980844_091_0000
         data = read_data_file(ctx,"../norm_data/std_g2980844_091_0000",MY_TRUE);
 
-        /* 10^6 points ca.*/
+        /* 1M points ca.*/
        //data = read_data_file(ctx,"../norm_data/std_LR_091_0001",MY_TRUE);
 
-        /* 10^7 ~ 8M points */
+        /* 8M points */
        //data = read_data_file(ctx,"../norm_data/std_g0144846_Me14_091_0001",MY_TRUE);
 
-        //88M BREAKS
+        //88M
        //data = read_data_file(ctx,"../norm_data/std_g5503149_091_0000",MY_TRUE);
 
        //
@@ -1938,13 +1813,6 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)
         mpi_printf(ctx, "Read %lu points in %u dims\n", ctx->n_points, ctx->dims);
     }
 
-
-    /*
-    ctx -> dims = 2;
-    ctx -> n_points = 1000000;
-    generate_random_matrix(&data, ctx -> dims , ctx -> n_points, ctx);
-    */
-
     /* communicate the total number of points*/
     MPI_Bcast(&(ctx->dims), 1, MPI_UINT32_T, 0, ctx->mpi_communicator);
     MPI_Bcast(&(ctx->n_points), 1, MPI_UINT64_T, 0, ctx->mpi_communicator);
@@ -1969,12 +1837,6 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)
 
     ctx->local_n_points = send_counts[ctx->mpi_rank] / ctx->dims;
 
-    /*
-     * MPI_Scatterv(const void *sendbuf, const int *sendcounts, const int *displs,
-     *              MPI_Datatype sendtype, void *recvbuf, int recvcount,
-     *              MPI_Datatype recvtype, int root, MPI_Comm comm)
-     */
-
     float_t *pvt_data = (float_t *)malloc(send_counts[ctx->mpi_rank] * sizeof(float_t));
 
     MPI_Scatterv(data, send_counts, displacements, MPI_MY_FLOAT, pvt_data, send_counts[ctx->mpi_rank], MPI_MY_FLOAT, 0, ctx->mpi_communicator);
@@ -2024,9 +1886,10 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)
     ordered_data_to_file(ctx);
 #endif
 
-    top_tree_free(ctx, &tree);
+    top_tree_free(ctx, &tree);
     kdtree_v2_free(&local_tree);
 
+    free(send_counts);
     free(displacements);
     free(dp_info);
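
Note on the "ALTERNATIVE TO ALL TO ALL FOR BIG MSG" block kept by this patch: MPI count arguments are plain int, so a single exchange whose payload crosses the 2^31 boundary (about 4GB with the MPI_CHAR-based element used here) cannot be expressed in one MPI_Alltoallv call, which is why the heaps are streamed in bounded slices with Isend/Probe/Recv. The following is a minimal self-contained sketch of that pattern in isolation; the names item_t, CHUNK_ITEMS, send_chunked and recv_chunked are illustrative assumptions, not code from this repository.

#include <mpi.h>
#include <stdlib.h>

/* sketch only: stand-in for one point's payload, not the repo's heap_node */
typedef struct { double value; long array_idx; } item_t;

enum { CHUNK_ITEMS = 1 << 20 };   /* items per message, safely below INT_MAX */

/* sender side: stream `total` items to `dest` in bounded slices */
static void send_chunked(const item_t* buf, long total, int dest,
                         MPI_Datatype T, MPI_Comm comm)
{
    long sent = 0;
    while(sent < total)
    {
        long left  = total - sent;
        int  count = (int)(left < CHUNK_ITEMS ? left : CHUNK_ITEMS);
        MPI_Send(buf + sent, count, T, dest, 0, comm);
        sent += count;
    }
}

/* receiver side: probe each slice to learn its length, then receive it */
static void recv_chunked(item_t* buf, long total, int src,
                         MPI_Datatype T, MPI_Comm comm)
{
    long rcvd = 0;
    while(rcvd < total)
    {
        MPI_Status status;
        int count;
        MPI_Probe(src, 0, comm, &status);     /* wait for the next slice   */
        MPI_Get_count(&status, T, &count);    /* number of items it holds  */
        MPI_Recv(buf + rcvd, count, T, src, 0, comm, &status);
        rcvd += count;
    }
}

int main(int argc, char** argv)
{
    MPI_Init(&argc, &argv);
    int rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    /* one MPI element = one item_t, mirroring the patch's
       MPI_Type_contiguous(k * sizeof(heap_node), MPI_CHAR, ...) trick */
    MPI_Datatype T;
    MPI_Type_contiguous(sizeof(item_t), MPI_CHAR, &T);
    MPI_Type_commit(&T);

    long total = 3L * CHUNK_ITEMS + 123;      /* forces several slices */
    item_t* buf = (item_t*)calloc(total, sizeof(item_t));

    if(rank == 0)      send_chunked(buf, total, 1, T, MPI_COMM_WORLD);
    else if(rank == 1) recv_chunked(buf, total, 0, T, MPI_COMM_WORLD);

    free(buf);
    MPI_Type_free(&T);
    MPI_Finalize();
    return 0;
}

In the patch one element of MPI_my_heap packs a whole heap of k heap_nodes, so one count unit is one point's neighbour heap; that is why the buffer offsets above advance by k * already_sent_points[i] and k * already_rcvd_points[source].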
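Note on the merge loop: insert_max_heap keeps each point's neighbour heap bounded at k entries, with the root holding the current worst (largest) distance, so a received candidate enters only if it is closer. The repository's actual implementation is not part of this diff; the sketch below shows the standard bounded insert it presumably performs, assuming a root-at-index-0 array max-heap that is already full (all field and function names here are assumptions).

#include <stddef.h>

/* sketch only: stand-ins for the repo's heap_node / heap types */
typedef struct { double value; size_t array_idx; } node_t;
typedef struct { size_t count; node_t* data; } knn_heap_t;

static void knn_heap_insert(knn_heap_t* h, double value, size_t idx)
{
    if(value >= h -> data[0].value) return;   /* farther than current worst */

    /* overwrite the root, then sift down to restore the heap property */
    h -> data[0].value     = value;
    h -> data[0].array_idx = idx;

    size_t i = 0;
    for(;;)
    {
        size_t l = 2 * i + 1, r = 2 * i + 2, largest = i;
        if(l < h -> count && h -> data[l].value > h -> data[largest].value) largest = l;
        if(r < h -> count && h -> data[r].value > h -> data[largest].value) largest = r;
        if(largest == i) break;
        node_t tmp         = h -> data[i];
        h -> data[i]       = h -> data[largest];
        h -> data[largest] = tmp;
        i = largest;
    }
}

Inserting all k received candidates this way costs O(k log k) per point, which matches the inner loop over j in the merge step above.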
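Note on the MPI_Scatterv call and the new free(send_counts): send_counts and displacements are expressed in floats rather than points, so each rank must receive an exact multiple of ctx->dims for ctx->local_n_points = send_counts[rank] / ctx->dims to hold. The computation of those arrays happens outside this diff; a plausible sketch, under the assumption that points are split as evenly as possible:

#include <stdint.h>

/* split n_points rows of `dims` floats across world_size ranks, keeping
   whole points together; counts and displacements are in floats */
static void compute_scatter_counts(int world_size, uint64_t n_points,
                                   uint32_t dims, int* send_counts, int* displs)
{
    uint64_t base = n_points / world_size;   /* rows everyone gets         */
    uint64_t rem  = n_points % world_size;   /* first `rem` ranks get +1   */
    int offset = 0;
    for(int r = 0; r < world_size; ++r)
    {
        uint64_t rows  = base + (r < (int)rem ? 1 : 0);
        send_counts[r] = (int)(rows * dims);
        displs[r]      = offset;
        offset        += send_counts[r];
    }
}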