From 1f365d60e99d53c660c860e6e60a89a317b19484 Mon Sep 17 00:00:00 2001 From: lykos98 Date: Fri, 22 Mar 2024 10:37:39 +0100 Subject: [PATCH] fixed bug on big messages --- src/tree/kdtreeV2.c | 2 +- src/tree/tree.c | 101 ++++++++++++++++++++++++++++---------------- 2 files changed, 65 insertions(+), 38 deletions(-) diff --git a/src/tree/kdtreeV2.c b/src/tree/kdtreeV2.c index a097c23..aa0f21e 100644 --- a/src/tree/kdtreeV2.c +++ b/src/tree/kdtreeV2.c @@ -284,7 +284,7 @@ void knn_sub_tree_search_kdtree_v2(FLOAT_TYPE* point, kdnode_v2* root, heap * H) for(size_t i = 0; i < root -> node_list.count; ++i) { kdnode_v2* n = root -> node_list.data[i]; - __builtin_prefetch(root -> node_list.data + i + 1, 0, 3); + //__builtin_prefetch(root -> node_list.data + i + 1, 0, 3); FLOAT_TYPE distance = eud_kdtree_v2(point, n -> data); insert_max_heap(H, distance,n -> array_idx); } diff --git a/src/tree/tree.c b/src/tree/tree.c index 7d35b6a..73bf9af 100644 --- a/src/tree/tree.c +++ b/src/tree/tree.c @@ -1194,17 +1194,22 @@ void tree_walk( int owner = root -> owner; int idx = point_to_send_count[owner]; int capacity = point_to_send_capacity[owner]; - int len = 1 + ctx -> dims; + //int len = 1 + ctx -> dims; + int len = ctx -> dims; if(idx == capacity) { - data_to_send_per_proc[owner] = realloc(data_to_send_per_proc[owner], (capacity * 1.1) * (1 + ctx -> dims) * sizeof(float_t)); + //data_to_send_per_proc[owner] = realloc(data_to_send_per_proc[owner], (capacity * 1.1) * (1 + ctx -> dims) * sizeof(float_t)); + data_to_send_per_proc[owner] = realloc(data_to_send_per_proc[owner], (capacity * 1.1) * (ctx -> dims) * sizeof(float_t)); local_idx_of_the_point[owner] = realloc(local_idx_of_the_point[owner], (capacity * 1.1) * sizeof(int)); point_to_send_capacity[owner] = capacity * 1.1; } float_t* base = data_to_send_per_proc[owner] + (len * idx); + /* base[0] = max_dist; memcpy(base + 1, point, ctx -> dims * sizeof(float_t)); + */ + memcpy(base, point, ctx -> dims * sizeof(float_t)); local_idx_of_the_point[owner][idx] = point_idx; point_to_send_count[owner]++; @@ -1356,7 +1361,8 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre for(int i = 0; i < ctx -> world_size; ++i) { - data_to_send_per_proc[i] = (float_t*)malloc(100 * (1 + ctx -> dims) * sizeof(float_t)); + //data_to_send_per_proc[i] = (float_t*)malloc(100 * (1 + ctx -> dims) * sizeof(float_t)); + data_to_send_per_proc[i] = (float_t*)malloc(100 * (ctx -> dims) * sizeof(float_t)); local_idx_of_the_point[i] = (int*)malloc(100 * sizeof(int)); point_to_snd_capacity[i] = 100; point_to_snd_count[i] = 0; @@ -1399,8 +1405,14 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre /*compute counts and displs*/ rcv_displ[0] = 0; snd_displ[0] = 0; + + /* rcv_count[0] = point_to_rcv_count[0] * (1 + ctx -> dims); snd_count[0] = point_to_snd_count[0] * (1 + ctx -> dims); + */ + + rcv_count[0] = point_to_rcv_count[0] * (ctx -> dims); + snd_count[0] = point_to_snd_count[0] * (ctx -> dims); int tot_points_rcv = point_to_rcv_count[0]; int tot_points_snd = point_to_snd_count[0]; @@ -1408,8 +1420,12 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre for(int i = 1; i < ctx -> world_size; ++i) { + /* rcv_count[i] = point_to_rcv_count[i] * (1 + ctx -> dims); snd_count[i] = point_to_snd_count[i] * (1 + ctx -> dims); + */ + rcv_count[i] = point_to_rcv_count[i] * (ctx -> dims); + snd_count[i] = point_to_snd_count[i] * (ctx -> dims); tot_count += rcv_count[i]; tot_points_rcv += point_to_rcv_count[i]; @@ -1419,8 +1435,12 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre snd_displ[i] = snd_displ[i - 1] + snd_count[i - 1]; } + /* float_t* __rcv_points = (float_t*)malloc(tot_points_rcv * (1 + ctx -> dims) * sizeof(float_t)); float_t* __snd_points = (float_t*)malloc(tot_points_snd * (1 + ctx -> dims) * sizeof(float_t)); + */ + float_t* __rcv_points = (float_t*)malloc(tot_points_rcv * (ctx -> dims) * sizeof(float_t)); + float_t* __snd_points = (float_t*)malloc(tot_points_snd * (ctx -> dims) * sizeof(float_t)); @@ -1445,11 +1465,13 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre int flag; /* prepare heap batches */ - int work_batch_stride = 1 + ctx -> dims; + //int work_batch_stride = 1 + ctx -> dims; + int work_batch_stride = ctx -> dims; heap_node* __heap_batches_to_snd = (heap_node*)malloc((uint64_t)k * (uint64_t)tot_points_rcv * sizeof(heap_node)); heap_node* __heap_batches_to_rcv = (heap_node*)malloc((uint64_t)k * (uint64_t)tot_points_snd * sizeof(heap_node)); + /* for(int i = 0; i < ctx -> world_size; ++i) { @@ -1462,15 +1484,18 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre } } */ + if( __heap_batches_to_rcv == NULL) { DB_PRINT("Rank %d failed to allocate rcv_heaps %luB required\n",ctx -> mpi_rank, (uint64_t)k * (uint64_t)tot_points_rcv * sizeof(heap_node)); + exit(5); } if( __heap_batches_to_snd == NULL) { DB_PRINT("Rank %d failed to allocate snd_heaps %luB required\n",ctx -> mpi_rank, (uint64_t)k * (uint64_t)tot_points_snd * sizeof(heap_node)); + exit(5); } MPI_Barrier(ctx -> mpi_communicator); @@ -1492,10 +1517,11 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre snd_displ[i] = snd_displ[i - 1] + snd_count[i - 1]; } + heap_node** heap_batches_per_node = (heap_node**)malloc(ctx -> world_size * sizeof(heap_node*)); for(int p = 0; p < ctx -> world_size; ++p) { - heap_batches_per_node[p] = __heap_batches_to_snd + rcv_displ[p] * k; + heap_batches_per_node[p] = __heap_batches_to_snd + (uint64_t)rcv_displ[p] * (uint64_t)k; } TIME_STOP @@ -1503,10 +1529,24 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre MPI_DB_PRINT("[MASTER] Working on recieved points\n"); MPI_Barrier(ctx -> mpi_communicator); + /* + for(int i = 0; i < ctx -> world_size; ++i) + { + if(i == ctx -> mpi_rank) + { + DB_PRINT("[RANK %d]\t", ctx -> mpi_rank); + for(int i = 0; i < ctx -> world_size; ++i) + //DB_PRINT("%d\t",point_to_rcv_count[i]); + DB_PRINT("%d\t",point_to_rcv_count[i]); + DB_PRINT("\n"); + } + } + */ + TIME_START for(int p = 0; p < ctx -> world_size; ++p) { - if(point_to_rcv_count[p] > 0) + if(point_to_rcv_count[p] > 0 && p != ctx -> mpi_rank) //if(count_rcv_work_batches[p] > 0) { //heap_batches_per_node[p] = (heap_node*)malloc(k * point_to_rcv_count[p] * sizeof(heap_node)); @@ -1516,8 +1556,10 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre heap H; H.count = 0; H.N = k; - H.data = heap_batches_per_node[p] + k * batch; - float_t* point = rcv_work_batches[p] + batch * work_batch_stride + 1; + H.data = heap_batches_per_node[p] + (uint64_t)k * (uint64_t)batch; + init_heap(&H); + //float_t* point = rcv_work_batches[p] + batch * work_batch_stride + 1; + float_t* point = rcv_work_batches[p] + (uint64_t)batch * (uint64_t)work_batch_stride; knn_sub_tree_search_kdtree_v2(point, local_tree -> root, &H); convert_heap_idx_to_global(ctx, &H); } @@ -1529,24 +1571,13 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre * counts are inverted since I have to recieve as many batches as points I * Have originally sended */ + MPI_Barrier(ctx -> mpi_communicator); TIME_STOP MPI_DB_PRINT("[MASTER] Sending out results\n"); - MPI_Barrier(ctx -> mpi_communicator); TIME_START - /* - for(int i = 0; i < ctx -> world_size; ++i) - { - if(i == ctx -> mpi_rank) - { - DB_PRINT("[RANK %d]\t", ctx -> mpi_rank); - for(int i = 0; i < ctx -> world_size; ++i) - //DB_PRINT("%d\t",point_to_rcv_count[i]); - DB_PRINT("%d\t",point_to_rcv_count[i]); - DB_PRINT("\n"); - } - } - */ + + MPI_Datatype MPI_my_heap; @@ -1565,6 +1596,7 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre */ //MPI_Type_free(&MPI_my_heap); + heap_node** rcv_heap_batches = (heap_node**)malloc(ctx -> world_size * sizeof(heap_node*)); for(int i = 0; i < ctx -> world_size; ++i) { @@ -1633,20 +1665,7 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre * HERE IT BREAKS * ------------------------------------- */ - MPI_Barrier(ctx -> mpi_communicator); - - - /* - heap_node** rcv_heap_batches = (heap_node**)malloc(ctx -> world_size * sizeof(heap_node*)); - for(int i = 0; i < ctx -> world_size; ++i) - { - //rcv_heap_batches[i] = NULL; - //rcv_heap_batches[i] = __heap_batches_to_rcv + snd_displ[i] / sizeof(heap_node); - rcv_heap_batches[i] = __heap_batches_to_rcv + snd_displ[i] * k; - } - */ - uint32_t n_points_threshold = 4294967296 / (k * sizeof(heap_node)); int default_msg_len = n_points_threshold / 2; @@ -1704,6 +1723,7 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre /* merge old with new heaps */ MPI_DB_PRINT("[MASTER] Merging resutls\n"); + MPI_Barrier(ctx -> mpi_communicator); TIME_START for(int i = 0; i < ctx -> world_size; ++i) @@ -1891,13 +1911,17 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx) //ctx->dims = 2; //data = read_data_file(ctx, "../norm_data/50_blobs.npy", MY_TRUE); // std_g0163178_Me14_091_0000 + // + // 190M points + // std_g2980844_091_0000 + data = read_data_file(ctx,"../norm_data/std_g2980844_091_0000",MY_TRUE); /* 10^6 points ca.*/ //data = read_data_file(ctx,"../norm_data/std_LR_091_0001",MY_TRUE); /* 10^7 ~ 8M points */ - data = read_data_file(ctx,"../norm_data/std_g0144846_Me14_091_0001",MY_TRUE); + //data = read_data_file(ctx,"../norm_data/std_g0144846_Me14_091_0001",MY_TRUE); //88M BREAKS //data = read_data_file(ctx,"../norm_data/std_g5503149_091_0000",MY_TRUE); @@ -1985,10 +2009,13 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx) kdtree_v2 local_tree; kdtree_v2_init( &local_tree, ctx -> local_data, ctx -> local_n_points, (unsigned int)ctx -> dims); - int k = 400; + int k = 300; datapoint_info_t* dp_info = (datapoint_info_t*)malloc(ctx -> local_n_points * sizeof(datapoint_info_t)); + + TIME_START build_local_tree(ctx, &local_tree); + TIME_STOP mpi_ngbh_search(ctx, dp_info, &tree, &local_tree, ctx -> local_data, k); -- GitLab