From 2621837706802987fb59d0e497b1bb616ee729a0 Mon Sep 17 00:00:00 2001 From: lykos98 Date: Thu, 14 Mar 2024 22:42:05 +0100 Subject: [PATCH] Working tree --- src/tree/tree.c | 116 ++++++++++++++++++++++++++++++++++-------------- 1 file changed, 82 insertions(+), 34 deletions(-) diff --git a/src/tree/tree.c b/src/tree/tree.c index bb8a364..6183f7e 100644 --- a/src/tree/tree.c +++ b/src/tree/tree.c @@ -18,8 +18,8 @@ #include #include -//#define WRITE_NGBH -//#define WRITE_TOP_NODES +#define WRITE_NGBH +#define WRITE_TOP_NODES #ifdef USE_FLOAT32 #define MPI_MY_FLOAT MPI_FLOAT @@ -1380,6 +1380,7 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre float_t* __rcv_points = (float_t*)malloc(tot_points_rcv * (1 + ctx -> dims) * sizeof(float_t)); float_t* __snd_points = (float_t*)malloc(tot_points_snd * (1 + ctx -> dims) * sizeof(float_t)); + /* copy data to send in contiguous memory */ for(int i = 0; i < ctx -> world_size; ++i) { @@ -1406,10 +1407,6 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre heap_node* __heap_batches_to_snd = (heap_node*)malloc(k * tot_points_rcv * sizeof(heap_node)); heap_node* __heap_batches_to_rcv = (heap_node*)malloc(k * tot_points_snd * sizeof(heap_node)); - /* - * need sizes in bytes - */ - rcv_displ[0] = 0; snd_displ[0] = 0; /* @@ -1471,14 +1468,24 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre * Have originally sended */ MPI_DB_PRINT("[MASTER] Sending out results\n"); + for(int i = 0; i < ctx -> world_size; ++i) + { + if(i == ctx -> mpi_rank) + { + DB_PRINT("[RANK %d]\t", ctx -> mpi_rank); + for(int i = 0; i < ctx -> world_size; ++i) + //DB_PRINT("%d\t",point_to_rcv_count[i]); + DB_PRINT("%d\t",point_to_rcv_count[i]); + DB_PRINT("\n"); + } + } - - MPI_Datatype MPI_my_heap; MPI_Type_contiguous(k * sizeof(heap_node), MPI_CHAR, &MPI_my_heap); MPI_Barrier(ctx -> mpi_communicator); MPI_Type_commit(&MPI_my_heap); + /* //MPI_Alltoallv(__heap_batches_to_snd, rcv_count, rcv_displ, MPI_CHAR, // __heap_batches_to_rcv, snd_count, snd_displ, MPI_CHAR, ctx -> mpi_communicator ); @@ -1487,8 +1494,9 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre MPI_Alltoallv(__heap_batches_to_snd, rcv_count, rcv_displ, MPI_my_heap, __heap_batches_to_rcv, snd_count, snd_displ, MPI_my_heap, ctx -> mpi_communicator ); + */ - MPI_Type_free(&MPI_my_heap); + //MPI_Type_free(&MPI_my_heap); heap_node** rcv_heap_batches = (heap_node**)malloc(ctx -> world_size * sizeof(heap_node*)); for(int i = 0; i < ctx -> world_size; ++i) { @@ -1496,22 +1504,72 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre //rcv_heap_batches[i] = __heap_batches_to_rcv + snd_displ[i] / sizeof(heap_node); rcv_heap_batches[i] = __heap_batches_to_rcv + snd_displ[i] * k; } - - - /* - * send out heaps - * and rcv counterparts + /* + * DIFFERENT APPROACH */ + /* + uint32_t n_points_threshold = 4294967296 / (k * sizeof(heap_node)); + int default_msg_len = n_points_threshold / 2; + + int* already_sent_points = (int*)malloc(ctx -> world_size * sizeof(int)); + int* already_rcvd_points = (int*)malloc(ctx -> world_size * sizeof(int)); + + + + for(int i = 0; i < ctx -> world_size; ++i) + { + if(i == ctx -> mpi_rank) + { + for(int j = 0; j < ctx -> world_size; ++j) + { + int count = 0; + while(count < point_to_snd_count[i]) + { + int count; + MPI_Status status; + MPI_Probe(j, MPI_ANY_TAG, ctx -> mpi_communicator, &status); + MPI_Get_count(&status, MPI_my_heap, &count); + + MPI_Recv(rcv_heap_batches[j] + k * already_rcvd_points[j], + count, MPI_my_heap, j, MPI_ANY_TAG, ctx -> mpi_communicator, &status); + + already_rcvd_points[j] += count; + //MPI_Recv(rcv_heap_batches[j], point_to_snd_count[j], MPI_my_heap, j, MPI_ANY_TAG, ctx -> mpi_communicator, &status); + } + } + } + else + { + + int count = 0; + if(point_to_rcv_count[i] > 0) + { + while(already_sent_points[i] < point_to_rcv_count[i]) + { + MPI_Request request; + //MPI_Isend(const void *buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm, MPI_Request *request) + count = MIN(default_msg_len, point_to_rcv_count[i] - already_sent_points[i] ); + MPI_Send(heap_batches_per_node[i] + k * already_sent_points[i], count, MPI_my_heap, i, 0, ctx -> mpi_communicator); + already_sent_points[i] += count; + } + } + } + } + */ + + /* ------------------------------------- * ALTERNATIVE TO ALL TO ALL FOR BIG MSG + * HERE IT BREAKS * ------------------------------------- */ - /* + MPI_Barrier(ctx -> mpi_communicator); + /* heap_node** rcv_heap_batches = (heap_node**)malloc(ctx -> world_size * sizeof(heap_node*)); for(int i = 0; i < ctx -> world_size; ++i) { @@ -1519,6 +1577,7 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre //rcv_heap_batches[i] = __heap_batches_to_rcv + snd_displ[i] / sizeof(heap_node); rcv_heap_batches[i] = __heap_batches_to_rcv + snd_displ[i] * k; } + */ uint32_t n_points_threshold = 4294967296 / (k * sizeof(heap_node)); int default_msg_len = n_points_threshold / 2; @@ -1526,17 +1585,6 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre int* already_sent_points = (int*)malloc(ctx -> world_size * sizeof(int)); int* already_rcvd_points = (int*)malloc(ctx -> world_size * sizeof(int)); - for(int i = 0; i < ctx -> world_size; ++i) - { - if(i == ctx -> mpi_rank) - { - DB_PRINT("[RANK %d]\t", ctx -> mpi_rank); - for(int i = 0; i < ctx -> world_size; ++i) - //DB_PRINT("%d\t",point_to_rcv_count[i]); - DB_PRINT("%d\t",point_to_rcv_count[i]/default_msg_len); - DB_PRINT("\n"); - } - } for(int i = 0; i < ctx -> world_size; ++i) { @@ -1549,7 +1597,7 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre int count = 0; if(point_to_rcv_count[i] > 0) { - while(count < point_to_rcv_count[i]) + while(already_sent_points[i] < point_to_rcv_count[i]) { MPI_Request request; //MPI_Isend(const void *buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm, MPI_Request *request) @@ -1561,8 +1609,6 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre } } - - MPI_Barrier(ctx -> mpi_communicator); MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, ctx -> mpi_communicator, &flag, &status); //DB_PRINT("%d %p %p\n",ctx -> mpi_rank, &flag, &status); @@ -1584,12 +1630,8 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre free(already_sent_points); free(already_rcvd_points); - */ - - - /* merge old with new heaps */ MPI_DB_PRINT("[MASTER] Merging resutls\n"); @@ -1777,12 +1819,18 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx) //ctx->dims = 2; //data = read_data_file(ctx, "../norm_data/50_blobs.npy", MY_TRUE); // std_g0163178_Me14_091_0000 - data = read_data_file(ctx,"../norm_data/std_LR_091_0001",MY_TRUE); + + /* 10^6 points ca.*/ + //data = read_data_file(ctx,"../norm_data/std_LR_091_0001",MY_TRUE); + + /* 10^7 ~ 8M points */ //data = read_data_file(ctx,"../norm_data/std_g0163178_Me14_091_0001",MY_TRUE); + data = read_data_file(ctx,"../norm_data/std_g5503149_091_0001",MY_TRUE); ctx->dims = 5; // ctx -> n_points = 48*5*2000; ctx->n_points = ctx->n_points / ctx->dims; + ctx->n_points = ctx->n_points / 2; //ctx -> n_points = ctx -> world_size * 1000; mpi_printf(ctx, "Read %lu points in %u dims\n", ctx->n_points, ctx->dims); } -- GitLab