From 7e29928535ed6bfb2e0ffb7d1da982d3499e218b Mon Sep 17 00:00:00 2001
From: lykos98
Date: Thu, 14 Mar 2024 17:03:53 +0100
Subject: [PATCH] Cleaned up code, trying to resolve copying back results from
 other nodes

---
 check.py             |  15 ++---
 src/main/main.c      |   8 ++-
 src/main/main/main.c |  44 -------------
 src/tree/tree.c      | 152 +++++++++++++++++++++++++++++++++++++++----
 4 files changed, 150 insertions(+), 69 deletions(-)
 delete mode 100644 src/main/main/main.c

diff --git a/check.py b/check.py
index 219ae92..cd2e2f7 100644
--- a/check.py
+++ b/check.py
@@ -5,11 +5,11 @@ import matplotlib.pyplot as plt
 import numpy as np
 from sklearn.neighbors import NearestNeighbors
 
-ndims = 2
-k = 100
-p = 12
+ndims = 5
+k = 500
+p = 2
 
-with open("bb/nodes_50_blobs_more_var.csv","r") as f:
+with open("bb/top_nodes.csv","r") as f:
     l = f.readlines()
 
 def parse_lines(l,n_dims):
@@ -105,11 +105,6 @@ if __name__ == "__main__":
             if not np.isclose(d1,d2):
                 abs_errors += 1
                 same_dist += 1
-                print(" Found error in ", w[0], d1, d2)
+                #print(" Found error in ", w[0], d1, d2)
 
     print(f"Found {abs_errors} errors")
-
-
-
-
-
diff --git a/src/main/main.c b/src/main/main.c
index 56d3763..710657b 100644
--- a/src/main/main.c
+++ b/src/main/main.c
@@ -6,7 +6,6 @@ int main(int argc, char** argv) {
 
 #if defined (_OPENMP)
-    printf("Running Hybrid (Openmp + MPI) code\n");
     int mpi_provided_thread_level;
     MPI_Init_thread( &argc, &argv, MPI_THREAD_FUNNELED, &mpi_provided_thread_level);
     if ( mpi_provided_thread_level < MPI_THREAD_FUNNELED )
@@ -16,7 +15,6 @@ int main(int argc, char** argv) {
         exit( 1 );
     }
 #else
-    printf("Running pure MPI code\n");
     MPI_Init(NULL, NULL);
 #endif
 
@@ -28,6 +26,12 @@ int main(int argc, char** argv) {
 
     ctx.mpi_communicator = MPI_COMM_WORLD;
     get_context(&ctx);
+
+    #if defined (_OPENMP)
+        mpi_printf(&ctx,"Running Hybrid (Openmp + MPI) code\n");
+    #else
+        mpi_printf(&ctx,"Running pure MPI code\n");
+    #endif
 
     /*
      * Mock reading some files, one for each processor
diff --git a/src/main/main/main.c b/src/main/main/main.c
deleted file mode 100644
index e018f07..0000000
--- a/src/main/main/main.c
+++ /dev/null
@@ -1,44 +0,0 @@
-#include 
-#include 
-#include "../common/common.h"
-#include "../tree/tree.h"
-
-int main(int argc, char** argv) {
-    MPI_Init(NULL, NULL);
-
-    char processor_name[MPI_MAX_PROCESSOR_NAME];
-    int name_len;
-    MPI_Get_processor_name(processor_name, &name_len);
-
-    global_context_t ctx;
-
-    ctx.mpi_communicator = MPI_COMM_WORLD;
-    get_context(&ctx);
-
-    /*
-     * Mock reading some files, one for each processor
-     */
-
-    int d = 5;
-
-    float_t* data;
-
-    /*
-     * Generate a random matrix of lenght of some kind
-     */
-    if(ctx.mpi_rank == 0)
-    {
-        simulate_master_read_and_scatter(5, 1000000, &ctx);
-    }
-    else
-    {
-        simulate_master_read_and_scatter(0, 0, &ctx);
-    }
-
-    //free(data);
-    free_context(&ctx);
-    MPI_Finalize();
-}
-
-
-
diff --git a/src/tree/tree.c b/src/tree/tree.c
index 119cfa6..bb8a364 100644
--- a/src/tree/tree.c
+++ b/src/tree/tree.c
@@ -18,8 +18,8 @@
 #include 
 #include 
 
-#define WRITE_NGBH
-#define WRITE_TOP_NODES
+//#define WRITE_NGBH
+//#define WRITE_TOP_NODES
 
 #ifdef USE_FLOAT32
 #define MPI_MY_FLOAT MPI_FLOAT
@@ -962,7 +962,7 @@ void build_top_kdtree(global_context_t *ctx, pointset_t *og_pointset, top_kdtree
     if(I_AM_MASTER)
     {
         //tree_print(ctx, tree -> root);
-        write_nodes_to_file(ctx, tree, "bb/nodes_50_blobs_more_var.csv");
+        write_nodes_to_file(ctx, tree, "bb/top_nodes.csv");
     }
 #endif
 
@@ -1289,10 +1289,11 @@ void convert_heap_idx_to_global(global_context_t* ctx, heap* H)
     }
 }
+
 void 
 mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtree_t* top_tree, kdtree_v2* local_tree, float_t* data, int k)
 {
     /* local search */
-    MPI_DB_PRINT("Ngbh search\n");
+    MPI_DB_PRINT("[MASTER] Local ngbh search ");
     #pragma omp parallel for
     for(int p = 0; p < ctx -> local_n_points; ++p)
     {
@@ -1304,9 +1305,10 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre
         dp_info[idx].is_center = 0;
         dp_info[idx].array_idx = idx;
     }
-    MPI_DB_PRINT("Done\n");
+    MPI_DB_PRINT("---> Done\n");
 
+    MPI_DB_PRINT("[MASTER] Finding points to send to other procs\n");
     /* find if a points needs a refine on the global tree */
     float_t** data_to_send_per_proc = (float_t**)malloc(ctx -> world_size * sizeof(float_t*));
     int** local_idx_of_the_point = (int**)malloc(ctx -> world_size * sizeof(int*));
 
@@ -1340,6 +1342,8 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre
                 point_to_snd_count, point_to_snd_capacity);
     }
 
+    MPI_DB_PRINT("[MASTER] Sending points to other procs\n");
+
     int* point_to_rcv_count = (int*)malloc(ctx -> world_size * sizeof(int));
 
     /* exchange points to work on*/
@@ -1408,14 +1412,23 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre
     rcv_displ[0] = 0;
     snd_displ[0] = 0;
 
+    /*
     rcv_count[0] = point_to_rcv_count[0] * k * sizeof(heap_node);
     snd_count[0] = point_to_snd_count[0] * k * sizeof(heap_node);
+    */
+    rcv_count[0] = point_to_rcv_count[0];
+    snd_count[0] = point_to_snd_count[0];
 
     for(int i = 1; i < ctx -> world_size; ++i)
     {
+        /*
        rcv_count[i] = point_to_rcv_count[i] * k * sizeof(heap_node);
        snd_count[i] = point_to_snd_count[i] * k * sizeof(heap_node);
+        */
+
+        rcv_count[i] = point_to_rcv_count[i];
+        snd_count[i] = point_to_snd_count[i];
 
        rcv_displ[i] = rcv_displ[i - 1] + rcv_count[i - 1];
        snd_displ[i] = snd_displ[i - 1] + snd_count[i - 1];
 
@@ -1425,18 +1438,20 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre
     for(int p = 0; p < ctx -> world_size; ++p)
     {
         //heap_batches_per_node[p] = NULL;
-        heap_batches_per_node[p] = __heap_batches_to_snd + rcv_displ[p] / sizeof(heap_node);
+        //heap_batches_per_node[p] = __heap_batches_to_snd + rcv_displ[p] / sizeof(heap_node);
+        heap_batches_per_node[p] = __heap_batches_to_snd + rcv_displ[p] * k;
     }
 
     /* compute everything */
+    MPI_DB_PRINT("[MASTER] Working on received points\n");
 
-    #pragma omp parallel for
     for(int p = 0; p < ctx -> world_size; ++p)
     {
         if(point_to_rcv_count[p] > 0)
         //if(count_rcv_work_batches[p] > 0)
         {
             //heap_batches_per_node[p] = (heap_node*)malloc(k * point_to_rcv_count[p] * sizeof(heap_node));
+            #pragma omp parallel for
             for(int batch = 0; batch < point_to_rcv_count[p]; ++batch)
             {
                 heap H;
@@ -1455,23 +1470,130 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre
      * counts are inverted since I have to recieve as many batches as points I 
      * Have originally sended 
      */
-    MPI_Alltoallv(__heap_batches_to_snd, rcv_count, rcv_displ, MPI_CHAR, 
-                  __heap_batches_to_rcv, snd_count, snd_displ, MPI_CHAR, ctx -> mpi_communicator );
+    MPI_DB_PRINT("[MASTER] Sending out results\n");
+
+
+
+
+    MPI_Datatype MPI_my_heap;
+    MPI_Type_contiguous(k * sizeof(heap_node), MPI_CHAR, &MPI_my_heap);
+    MPI_Barrier(ctx -> mpi_communicator);
+    MPI_Type_commit(&MPI_my_heap);
+
+    //MPI_Alltoallv(__heap_batches_to_snd, rcv_count, rcv_displ, MPI_CHAR, 
+    //              __heap_batches_to_rcv, snd_count, snd_displ, MPI_CHAR, ctx -> mpi_communicator );
+
+
+
+    MPI_Alltoallv(__heap_batches_to_snd, rcv_count, rcv_displ, MPI_my_heap, 
+                  __heap_batches_to_rcv, snd_count, snd_displ, MPI_my_heap, ctx -> mpi_communicator );
+
+    MPI_Type_free(&MPI_my_heap);
+
     heap_node** rcv_heap_batches = (heap_node**)malloc(ctx -> world_size * sizeof(heap_node*));
+    for(int i = 0; i < ctx -> world_size; ++i)
+    {
+        //rcv_heap_batches[i] = NULL;
+        //rcv_heap_batches[i] = __heap_batches_to_rcv + snd_displ[i] / sizeof(heap_node);
+        rcv_heap_batches[i] = __heap_batches_to_rcv + snd_displ[i] * k;
+    }
+
+
     /*
      * send out heaps
      * and rcv counterparts */
 
+    /* -------------------------------------
+     * ALTERNATIVE TO ALL TO ALL FOR BIG MSG
+     * ------------------------------------- */
+
+    /*
+    MPI_Barrier(ctx -> mpi_communicator);
+
+    heap_node** rcv_heap_batches = (heap_node**)malloc(ctx -> world_size * sizeof(heap_node*));
     for(int i = 0; i < ctx -> world_size; ++i)
     {
         //rcv_heap_batches[i] = NULL;
-        rcv_heap_batches[i] = __heap_batches_to_rcv + snd_displ[i] / sizeof(heap_node);
+        //rcv_heap_batches[i] = __heap_batches_to_rcv + snd_displ[i] / sizeof(heap_node);
+        rcv_heap_batches[i] = __heap_batches_to_rcv + snd_displ[i] * k;
     }
 
+    uint32_t n_points_threshold = 4294967296 / (k * sizeof(heap_node));
+    int default_msg_len = n_points_threshold / 2;
+
+    int* already_sent_points = (int*)malloc(ctx -> world_size * sizeof(int));
+    int* already_rcvd_points = (int*)malloc(ctx -> world_size * sizeof(int));
+
+    for(int i = 0; i < ctx -> world_size; ++i)
+    {
+        if(i == ctx -> mpi_rank)
+        {
+            DB_PRINT("[RANK %d]\t", ctx -> mpi_rank);
+            for(int i = 0; i < ctx -> world_size; ++i)
+                //DB_PRINT("%d\t",point_to_rcv_count[i]);
+                DB_PRINT("%d\t",point_to_rcv_count[i]/default_msg_len);
+            DB_PRINT("\n");
+        }
+    }
+
+    for(int i = 0; i < ctx -> world_size; ++i)
+    {
+        already_sent_points[i] = 0;
+        already_rcvd_points[i] = 0;
+    }
+
+    for(int i = 0; i < ctx -> world_size; ++i)
+    {
+        int count = 0;
+        if(point_to_rcv_count[i] > 0)
+        {
+            while(count < point_to_rcv_count[i])
+            {
+                MPI_Request request;
+                //MPI_Isend(const void *buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm, MPI_Request *request)
+                count = MIN(default_msg_len, point_to_rcv_count[i] - already_sent_points[i] );
+                MPI_Isend( heap_batches_per_node[i] + k * already_sent_points[i], count,
+                           MPI_my_heap, i, 0, ctx -> mpi_communicator, &request);
+                already_sent_points[i] += count;
+            }
+        }
+    }
+
+
+
+    MPI_Barrier(ctx -> mpi_communicator);
+    MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, ctx -> mpi_communicator, &flag, &status);
+    //DB_PRINT("%d %p %p\n",ctx -> mpi_rank, &flag, &status);
+    while(flag)
+    {
+        MPI_Request request;
+        int count;
+        int source = status.MPI_SOURCE;
+        MPI_Get_count(&status, MPI_my_heap, &count);
+
+        MPI_Recv(rcv_heap_batches[source] + k * already_rcvd_points[source],
+                 count, MPI_my_heap, source, MPI_ANY_TAG, ctx -> mpi_communicator, &status);
+
+        already_rcvd_points[source] += count;
+        MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, ctx -> mpi_communicator, &flag, &status);
+
+    }
+    MPI_Barrier(ctx -> mpi_communicator);
+
+    free(already_sent_points);
+    free(already_rcvd_points);
+    */
+
+
+
+
     /* merge old with new heaps */
+    MPI_DB_PRINT("[MASTER] Merging results\n");
+
     for(int i = 0; i < ctx -> world_size; ++i)
     {
         #pragma omp paralell for
@@ -1656,11 +1778,12 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)
         //data = read_data_file(ctx, "../norm_data/50_blobs.npy", MY_TRUE);
 
         // std_g0163178_Me14_091_0000
         data = read_data_file(ctx,"../norm_data/std_LR_091_0001",MY_TRUE);
+        //data = read_data_file(ctx,"../norm_data/std_g0163178_Me14_091_0001",MY_TRUE);
 
         ctx->dims = 5;
         // ctx -> n_points = 48*5*2000;
         ctx->n_points = ctx->n_points / ctx->dims;
-        //ctx -> n_points = 6 * 500;
+        //ctx -> n_points = ctx -> world_size * 1000;
 
         mpi_printf(ctx, "Read %lu points in %u dims\n", ctx->n_points, ctx->dims);
     }
@@ -1729,7 +1852,7 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)
     kdtree_v2 local_tree;
     kdtree_v2_init( &local_tree, ctx -> local_data, ctx -> local_n_points, (unsigned int)ctx -> dims);
 
-    int k = 100;
+    int k = 500;
 
     datapoint_info_t* dp_info = (datapoint_info_t*)malloc(ctx -> local_n_points * sizeof(datapoint_info_t));
     build_local_tree(ctx, &local_tree);
@@ -1737,7 +1860,10 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)
 
     mpi_ngbh_search(ctx, dp_info, &tree, &local_tree, ctx -> local_data, k);
 
-    ordered_data_to_file(ctx);
+    #if defined (WRITE_NGBH)
+        ordered_data_to_file(ctx);
+    #endif
+
     top_tree_free(ctx, &tree);
     kdtree_v2_free(&local_tree);
 
-- 
GitLab
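
Note on the tree.c change above: replacing the byte-based MPI_CHAR counts (point_to_*_count[i] * k * sizeof(heap_node)) with a committed MPI_Type_contiguous datatype lets MPI_Alltoallv count whole k-sized heap batches, so the count and displacement arrays stay at "number of points" scale instead of growing with k * sizeof(heap_node) toward the limit of the int arguments MPI expects. The standalone sketch below shows the same pattern in isolation, under stated assumptions: node_t, K, and the fixed one-batch-per-rank counts are illustrative stand-ins, not the project's heap_node, k, or real send/receive volumes.

/*
 * Minimal sketch (not project code): exchanging fixed-size batches with
 * MPI_Alltoallv through a contiguous derived datatype, so counts and
 * displacements are expressed in batches rather than bytes.
 */
#include <mpi.h>
#include <stdlib.h>

typedef struct { double value; long idx; } node_t;   /* stand-in for heap_node */
#define K 4                                          /* stand-in for k */

int main(int argc, char** argv)
{
    MPI_Init(&argc, &argv);

    int rank, size;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    /* exactly one batch (K nodes) for every rank, including ourselves */
    int* snd_count = malloc(size * sizeof(int));
    int* rcv_count = malloc(size * sizeof(int));
    int* snd_displ = malloc(size * sizeof(int));
    int* rcv_displ = malloc(size * sizeof(int));
    for(int i = 0; i < size; ++i)
    {
        snd_count[i] = 1;            /* counts are in batches, not bytes */
        rcv_count[i] = 1;
        snd_displ[i] = i;            /* displacements are in datatype extents */
        rcv_displ[i] = i;
    }

    node_t* snd_buf = malloc(size * K * sizeof(node_t));
    node_t* rcv_buf = malloc(size * K * sizeof(node_t));
    for(int i = 0; i < size * K; ++i)
    {
        snd_buf[i].value = (double)rank;   /* payload tagged with the sender rank */
        snd_buf[i].idx   = i;
    }

    /* one element of MPI_batch == one batch of K node_t, described as raw bytes */
    MPI_Datatype MPI_batch;
    MPI_Type_contiguous(K * sizeof(node_t), MPI_CHAR, &MPI_batch);
    MPI_Type_commit(&MPI_batch);

    MPI_Alltoallv(snd_buf, snd_count, snd_displ, MPI_batch,
                  rcv_buf, rcv_count, rcv_displ, MPI_batch, MPI_COMM_WORLD);

    MPI_Type_free(&MPI_batch);

    free(snd_buf); free(rcv_buf);
    free(snd_count); free(rcv_count); free(snd_displ); free(rcv_displ);
    MPI_Finalize();
    return 0;
}

With this layout the MPI_Alltoallv count for a destination is simply its number of points, which is what the modified rcv_count/snd_count arrays in the patch now hold; the Isend/Iprobe path kept in the commented-out "ALTERNATIVE TO ALL TO ALL FOR BIG MSG" block addresses the same message-size limit by chunking transfers, at the cost of extra bookkeeping.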