From 97cd94f0b632601c1c9a56adf40542f1bc0e35aa Mon Sep 17 00:00:00 2001 From: lykos98 <francy273998@gmail.com> Date: Mon, 6 May 2024 16:01:37 +0200 Subject: [PATCH] refactored code to not use criticals, implemented request of neighbors of neighbors --- src/common/common.c | 48 ++++--- src/common/common.h | 20 +++ src/tree/tree.c | 324 +++++++++++++++++++++++++++++++++++++++----- src/tree/tree.h | 11 -- 4 files changed, 339 insertions(+), 64 deletions(-) diff --git a/src/common/common.c b/src/common/common.c index 5da59fa..d678aba 100644 --- a/src/common/common.c +++ b/src/common/common.c @@ -2,6 +2,8 @@ #include "mpi.h" #include <time.h> +#define FREE_NOT_NULL(x) if(x){free(x); x = NULL;} + void get_context(global_context_t* ctx) { MPI_Comm_size(ctx -> mpi_communicator, &(ctx -> world_size)); @@ -12,29 +14,41 @@ void get_context(global_context_t* ctx) ctx -> ub_box = NULL; ctx -> rank_n_points = (int*)malloc(ctx -> world_size * sizeof(int)); ctx -> rank_idx_start = (int*)malloc(ctx -> world_size * sizeof(int)); + ctx -> idx_halo_points_recv = NULL; + ctx -> idx_halo_points_send = NULL; + ctx -> n_halo_points_recv = NULL; + ctx -> n_halo_points_send = NULL; + ctx -> halo_datapoints = NULL; } void free_context(global_context_t* ctx) { - if(ctx -> local_data) - { - free(ctx -> local_data); - ctx -> local_data = NULL; - } - if(ctx -> ub_box) - { - free(ctx -> ub_box); - ctx -> ub_box = NULL; - } + FREE_NOT_NULL(ctx -> local_data); + FREE_NOT_NULL(ctx -> ub_box); + FREE_NOT_NULL(ctx -> lb_box); - if(ctx -> lb_box) - { - free(ctx -> lb_box); - ctx -> lb_box = NULL; - } - free(ctx -> rank_n_points); - free(ctx -> rank_idx_start); + if(ctx -> halo_datapoints) + { + for(int i = 0; i < ctx -> world_size; ++i) FREE_NOT_NULL(ctx -> halo_datapoints[i]); + } + FREE_NOT_NULL(ctx -> halo_datapoints); + + if(ctx -> idx_halo_points_recv) + { + for(int i = 0; i < ctx -> world_size; ++i) FREE_NOT_NULL(ctx -> idx_halo_points_recv[i]); + } + FREE_NOT_NULL(ctx -> idx_halo_points_recv); + + if(ctx -> idx_halo_points_send) + { + for(int i = 0; i < ctx -> world_size; ++i) FREE_NOT_NULL(ctx -> idx_halo_points_send[i]); + } + FREE_NOT_NULL(ctx -> idx_halo_points_send); + FREE_NOT_NULL(ctx -> n_halo_points_recv); + FREE_NOT_NULL(ctx -> n_halo_points_send); + FREE_NOT_NULL(ctx -> rank_n_points); + FREE_NOT_NULL(ctx -> rank_idx_start); } void free_pointset(pointset_t* ps) diff --git a/src/common/common.h b/src/common/common.h index 6b84cde..1ca7b7f 100644 --- a/src/common/common.h +++ b/src/common/common.h @@ -5,8 +5,21 @@ #include <mpi.h> #include <stdint.h> #include <time.h> +#include "../tree/heap.h" //#include <stdarg.h> +typedef struct datapoint_info_t { + idx_t array_idx; + heap ngbh; + float_t g; + float_t log_rho; + float_t log_rho_c; + float_t log_rho_err; + idx_t kstar; + int is_center; + int cluster_idx; +} datapoint_info_t; + #define MAX(A,B) ((A) > (B) ? (A) : (B)) #define MIN(A,B) ((A) < (B) ? (A) : (B)) @@ -98,9 +111,16 @@ struct global_context_t float_t* local_data; float_t* lb_box; float_t* ub_box; + int* n_halo_points_recv; + int* n_halo_points_send; + idx_t** idx_halo_points_recv; + idx_t** idx_halo_points_send; size_t n_points; size_t idx_start; size_t local_n_points; + datapoint_info_t* local_datapoints; + datapoint_info_t** halo_datapoints; + heap_node* __recieved_heap_data; uint32_t dims; int* rank_idx_start; int* rank_n_points; diff --git a/src/tree/tree.c b/src/tree/tree.c index bb5a5d4..7c7d94b 100644 --- a/src/tree/tree.c +++ b/src/tree/tree.c @@ -2039,7 +2039,7 @@ void ordered_data_to_file(global_context_t* ctx) static inline int foreign_owner(global_context_t* ctx, idx_t idx) { - int owner; + int owner = ctx -> mpi_rank; if( idx >= ctx -> idx_start && idx < ctx -> idx_start + ctx -> local_n_points) { return ctx -> mpi_rank; @@ -2053,38 +2053,27 @@ static inline int foreign_owner(global_context_t* ctx, idx_t idx) return owner; } -static inline void push_on_foreign_idx_list(idx_t element, int owner, int* counts, int* capacities, idx_t** lists) +static inline void append_foreign_idx_list(idx_t element, int owner, int* counts, idx_t** lists) { - if(capacities[owner] == counts[owner]) - { - int new_cap = capacities[owner] * 1.1; - lists[owner] = (idx_t*)realloc(lists[owner], new_cap * sizeof(idx_t)); - capacities[owner] = new_cap; - } - /* find the plausible place */ - int idx_to_insert = counts[owner]; - - int flag = 1; - /* check if point is already inserted */ - for(int i = 0; i < idx_to_insert; ++i) - { - flag = flag && lists[owner][i] != element; - } + int idx_to_insert; - /* if this is the case insert it */ + #pragma omp atomic capture + idx_to_insert = counts[owner]++; - if(flag) - { - counts[owner]++; - lists[owner][idx_to_insert] = element; - } + lists[owner][idx_to_insert] = element; +} +int cmp_idx(const void* a, const void* b) +{ + idx_t aa = *((idx_t*)a); + idx_t bb = *((idx_t*)b); + return (aa > bb) - (aa < bb); } -void find_foreign_nodes(global_context_t* ctx, datapoint_info_t* dp) +void find_foreign_nodes(global_context_t* ctx, datapoint_info_t* dp, datapoint_info_t** foreign_dp) { int k = dp[0].ngbh.count; @@ -2092,12 +2081,14 @@ void find_foreign_nodes(global_context_t* ctx, datapoint_info_t* dp) int* count_to_request = (int*)malloc(ctx -> world_size * sizeof(int)); int* capacities = (int*)malloc(ctx -> world_size * sizeof(int)); + for(int i = 0; i < ctx -> world_size; ++i) { - array_indexes_to_request[i] = (idx_t*)malloc(100 * sizeof(idx_t)); - capacities[i] = 100; + capacities[i] = 0; count_to_request[i] = 0; } + + /* count them */ #pragma omp parallel for for(uint32_t i = 0; i < ctx -> local_n_points; ++i) @@ -2111,27 +2102,272 @@ void find_foreign_nodes(global_context_t* ctx, datapoint_info_t* dp) if(owner != ctx -> mpi_rank) { //DB_PRINT("Hehe"); - #pragma omp critical - { - push_on_foreign_idx_list(element, owner, count_to_request, capacities, array_indexes_to_request); - } + /* TODO: change this to a series of locks */ + #pragma omp atomic update + capacities[owner]++; } } } + + /* alloc */ + + for(int i = 0; i < ctx -> world_size; ++i) + { + array_indexes_to_request[i] = (idx_t*)malloc(capacities[i] * sizeof(idx_t)); + } + + /* append them */ + + #pragma omp parallel for + for(uint32_t i = 0; i < ctx -> local_n_points; ++i) + { + for(int j = 0; j < k; ++j) + { + idx_t element = dp[i].ngbh.data[j].array_idx; + int owner = foreign_owner(ctx, element); + //DB_PRINT("%lu %d\n", element, owner); + + if(owner != ctx -> mpi_rank) + { + //DB_PRINT("Hehe"); + /* TODO: change this to a series of locks */ + append_foreign_idx_list(element, owner, count_to_request, array_indexes_to_request); + } + } + } + + /* prune them */ + int* unique_count = (int*)malloc(ctx -> world_size * sizeof(int)); + + /* + if(I_AM_MASTER) + { + FILE* f = fopen("uniq","w"); + fwrite(array_indexes_to_request[1], sizeof(idx_t), capacities[1],f); + fclose(f); + } + */ + + #pragma omp paralell for + for(int i = 0; i < ctx -> world_size; ++i) + { + unique_count[i] = capacities[i] > 0; //init unique count + qsort(array_indexes_to_request[i], capacities[i], sizeof(idx_t), cmp_idx); + uint32_t prev = array_indexes_to_request[i][0]; + for(int el = 1; el < capacities[i]; ++el) + { + int flag = prev == array_indexes_to_request[i][el]; + if(!flag) + { + /* in place subsitution + * if a different element is found then + * copy in at the next free place + * */ + array_indexes_to_request[i][unique_count[i]] = array_indexes_to_request[i][el]; + unique_count[i]++; + } + prev = array_indexes_to_request[i][el]; + } + } + + for(int i = 0; i < ctx -> world_size; ++i) + { + array_indexes_to_request[i] = (idx_t*)realloc(array_indexes_to_request[i], unique_count[i] * sizeof(idx_t)); + } + + DB_PRINT("[RANK %d elements to request]", ctx -> mpi_rank); for(int i = 0; i < ctx -> world_size; ++i) { - DB_PRINT("%d\t", count_to_request[i]); + DB_PRINT("%d\t", unique_count[i]); } DB_PRINT("\n"); + /* + if(I_AM_MASTER) + { + FILE* f = fopen("uniq2","w"); + fwrite(array_indexes_to_request[1], sizeof(idx_t), unique_count[1],f); + fclose(f); + } + */ + + for(int i = 0; i < ctx -> world_size; ++i) + { + foreign_dp[i] = (datapoint_info_t*)calloc(sizeof(datapoint_info_t), unique_count[i]); + } + + /* alias for unique counts */ + int* n_heap_to_recv = unique_count; + int* n_heap_to_send = (int*)malloc(ctx -> world_size * sizeof(int)); + + /* exchange how many to recv how many to send */ + + MPI_Alltoall(n_heap_to_recv, 1, MPI_INT, n_heap_to_send, 1, MPI_INT , ctx -> mpi_communicator); + /* compute displacements and yada yada */ + int* sdispls = (int*)calloc(ctx -> world_size , sizeof(int)); + int* rdispls = (int*)calloc(ctx -> world_size , sizeof(int)); - for(int i = 0; i < ctx -> world_size; ++i) free(array_indexes_to_request[i]); - free(array_indexes_to_request); + int tot_count_send = n_heap_to_send[0]; + int tot_count_recv = n_heap_to_recv[0]; + for(int i = 1; i < ctx -> world_size; ++i) + { + sdispls[i] = sdispls[i - 1] + n_heap_to_send[i - 1]; + rdispls[i] = rdispls[i - 1] + n_heap_to_recv[i - 1]; + + tot_count_send += n_heap_to_send[i]; + tot_count_recv += n_heap_to_recv[i]; + } + idx_t* idx_buffer_to_send = (idx_t*)malloc(tot_count_send * sizeof(idx_t)); + idx_t* idx_buffer_to_recv = (idx_t*)malloc(tot_count_recv * sizeof(idx_t)); + + /* copy indexes on send buffer */ + for(int i = 0; i < ctx -> world_size; ++i) + { + memcpy(idx_buffer_to_recv + rdispls[i], array_indexes_to_request[i], n_heap_to_recv[i] * sizeof(idx_t)); + } + + MPI_Alltoallv(idx_buffer_to_recv, n_heap_to_recv, rdispls, MPI_UINT64_T, idx_buffer_to_send, n_heap_to_send, sdispls, MPI_UINT64_T, ctx -> mpi_communicator); + + + /* allocate foreign dp */ + heap_node* heap_buffer_to_send = (heap_node*)malloc(tot_count_send * k * sizeof(heap_node)); + heap_node* heap_buffer_to_recv = (heap_node*)malloc(tot_count_recv * k * sizeof(heap_node)); + + for(int i = 0; i < tot_count_send; ++i) + { + idx_t idx = idx_buffer_to_send[i] - ctx -> idx_start; + memcpy(heap_buffer_to_send + i * k, dp[idx].ngbh.data, k * sizeof(heap_node)); + } + /* exchange heaps */ + + + MPI_Barrier(ctx -> mpi_communicator); + int default_msg_len = MAX_MSG_SIZE / (k * sizeof(heap_node)); + + int* already_sent_points = (int*)malloc(ctx -> world_size * sizeof(int)); + int* already_rcvd_points = (int*)malloc(ctx -> world_size * sizeof(int)); + + /* allocate a request array to keep track of all requests going out*/ + MPI_Request* req_array; + int req_num = 0; + for(int i = 0; i < ctx -> world_size; ++i) + { + req_num += n_heap_to_send[i] > 0 ? n_heap_to_send[i]/default_msg_len + 1 : 0; + } + + req_array = (MPI_Request*)malloc(req_num * sizeof(MPI_Request)); + + for(int i = 0; i < ctx -> world_size; ++i) + { + already_sent_points[i] = 0; + already_rcvd_points[i] = 0; + } + + int req_idx = 0; + + MPI_Datatype MPI_my_heap; + MPI_Type_contiguous(k * sizeof(heap_node), MPI_CHAR, &MPI_my_heap); + MPI_Barrier(ctx -> mpi_communicator); + MPI_Type_commit(&MPI_my_heap); + + for(int i = 0; i < ctx -> world_size; ++i) + { + int count = 0; + if(n_heap_to_send[i] > 0) + { + while(already_sent_points[i] < n_heap_to_send[i]) + { + MPI_Request request; + count = MIN(default_msg_len, n_heap_to_send[i] - already_sent_points[i] ); + MPI_Isend( heap_buffer_to_send + k * (already_sent_points[i] + sdispls[i]), count, + MPI_my_heap, i, 0, ctx -> mpi_communicator, &request); + already_sent_points[i] += count; + req_array[req_idx] = request; + ++req_idx; + } + } + } + int flag; + MPI_Status status; + MPI_Barrier(ctx -> mpi_communicator); + MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, ctx -> mpi_communicator, &flag, &status); + //DB_PRINT("%d %p %p\n",ctx -> mpi_rank, &flag, &status); + while(flag) + { + MPI_Request request; + int count; + int source = status.MPI_SOURCE; + MPI_Get_count(&status, MPI_my_heap, &count); + /* recieve each slice */ + + MPI_Recv(heap_buffer_to_recv + k * (already_rcvd_points[source] + rdispls[source]), + count, MPI_my_heap, source, MPI_ANY_TAG, ctx -> mpi_communicator, &status); + + already_rcvd_points[source] += count; + MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, ctx -> mpi_communicator, &flag, &status); + + } + MPI_Barrier(ctx -> mpi_communicator); + + MPI_Type_free(&MPI_my_heap); + + MPI_Testall(req_num, req_array, &flag, MPI_STATUSES_IGNORE); + + if(flag == 0) + { + DB_PRINT("[!!!] Rank %d has unfinished communications\n", ctx -> mpi_rank); + exit(1); + } + free(req_array); + free(already_sent_points); + free(already_rcvd_points); + + + /* copy results where needed */ + for(int i = 0; i < ctx -> world_size; ++i) + { + for(int j = 0; j < n_heap_to_recv[i]; ++j) + { + foreign_dp[i][j].array_idx = array_indexes_to_request[i][j]; + init_heap(&(foreign_dp[i][j].ngbh)); + allocate_heap(&(foreign_dp[i][j].ngbh), k); + foreign_dp[i][j].ngbh.N = k; + foreign_dp[i][j].ngbh.count = k; + memcpy(foreign_dp[i][j].ngbh.data, heap_buffer_to_recv + k * (j + rdispls[i]), k * sizeof(heap_node)); + + if(foreign_dp[i][j].ngbh.data[0].array_idx != array_indexes_to_request[i][j]) + { + printf("Chahah %lu\n",array_indexes_to_request[i][j]); + } + } + } + + + /* put back indexes in the context */ + + ctx -> idx_halo_points_recv = array_indexes_to_request; + ctx -> n_halo_points_recv = n_heap_to_recv; + + ctx -> n_halo_points_send = n_heap_to_send; + ctx -> idx_halo_points_send = (idx_t**)malloc(ctx -> world_size * sizeof(idx_t*)); + for(int i = 0; i < ctx -> world_size; ++i) ctx -> idx_halo_points_send[i] = NULL; + + for(int i = 0; i < ctx -> world_size; ++i) + { + ctx -> idx_halo_points_send[i] = (idx_t*)malloc( n_heap_to_send[i] * sizeof(idx_t)); + memcpy(ctx -> idx_halo_points_send[i], idx_buffer_to_send + sdispls[i], n_heap_to_send[i] * sizeof(idx_t) ); + } + free(count_to_request); free(capacities); + + free(heap_buffer_to_recv); + free(heap_buffer_to_send); + free(idx_buffer_to_send); + free(idx_buffer_to_recv); } void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx) @@ -2168,7 +2404,7 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx) //ctx -> n_points = 48*5*2000; ctx->n_points = ctx->n_points / ctx->dims; - ctx->n_points = (ctx->n_points * 6) / 10; + ctx->n_points = (ctx->n_points * 0.1) / 10; // ctx -> n_points = ctx -> world_size * 1000; //ctx -> n_points = 10000000 * ctx -> world_size; @@ -2271,7 +2507,12 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx) elapsed_time = TIME_STOP; LOG_WRITE("Total time for all knn search", elapsed_time) - //find_foreign_nodes(ctx, dp_info); + TIME_START; + + datapoint_info_t** foreign_dp_info = (datapoint_info_t**)malloc(ctx -> world_size * sizeof(datapoint_info_t*)); + find_foreign_nodes(ctx, dp_info, foreign_dp_info); + elapsed_time = TIME_STOP; + LOG_WRITE("Finding points to request the ngbh", elapsed_time) #if defined (WRITE_NGBH) @@ -2282,8 +2523,19 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx) { free(dp_info[i].ngbh.data); } - + for(int i = 0; i < ctx -> world_size; ++i) + { + for(int j = 0; j < ctx -> n_halo_points_recv[i]; ++j) + { + free(foreign_dp_info[i][j].ngbh.data); + } + free(foreign_dp_info[i]); + } + + free(foreign_dp_info); + + top_tree_free(ctx, &tree); kdtree_v2_free(&local_tree); diff --git a/src/tree/tree.h b/src/tree/tree.h index 39086a2..a30f014 100644 --- a/src/tree/tree.h +++ b/src/tree/tree.h @@ -82,17 +82,6 @@ typedef struct top_kdtree_t -typedef struct datapoint_info_t { - idx_t array_idx; - heap ngbh; - float_t g; - float_t log_rho; - float_t log_rho_c; - float_t log_rho_err; - idx_t kstar; - int is_center; - int cluster_idx; -} datapoint_info_t; -- GitLab