diff --git a/.gitignore b/.gitignore
index e583d9e2f88513133a62ee2f54b9d6f61bb49760..a8635fedcfff415fe3e6a59a549cef8fe0c17a1b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,6 +1,7 @@
 main
 sync.sh
 leo_sync.sh
+lumi_sync.sh
 bb
 **.ipynb*
 scalability_results
diff --git a/src/adp/adp.c b/src/adp/adp.c
index d1839b7e3c9236cf172b09f081a1fe4b7100b926..656b09e8e9cdb64c449029075fc81ff600f5e8c5 100644
--- a/src/adp/adp.c
+++ b/src/adp/adp.c
@@ -817,20 +817,6 @@ clusters_t Heuristic1(global_context_t *ctx)
     MPI_Win_create(to_remove_mask, n * sizeof(heap_node), 1, MPI_INFO_NULL, ctx -> mpi_communicator, &win_to_remove_mask);
     MPI_Win_fence(0, win_to_remove_mask);
 
-    /*
-     * to remove
-     * and to reimplement it using rma
-     * for dp in datapoints:
-     *     for nghb in neighbors:
-     *         if ngbh its_mine
-     *             no_problems, do it as always
-     *         else:
-     *             ngbh is an external node
-     *             do rma things,
-     *             in particular, acquire a lock on the window, read and write things
-     *             then close
-     */
-
     #if defined(THREAD_FUNNELED)
     #else
         #pragma omp parallel for
@@ -851,10 +837,8 @@ clusters_t Heuristic1(global_context_t *ctx)
             {
                 /*
                  *
-                 * THIS PART CAN BE SUBSTITUTED
-                 * we have something like (value, idx) pairs and if we have less than
-                 * MAX_UINT32 particles we can manipulate value and idx to fit into a single
-                 * UINT64 and then perform a single atomic max operation
+                 * TODO: reimplement this with private locks instead:
+                 * use an array of locks, and compare-and-swap to gain control of a slot
                  *
                  */
                 int owner = foreign_owner(ctx, jidx);
@@ -1118,8 +1102,6 @@ clusters_t Heuristic1(global_context_t *ctx)
 void Heuristic2(global_context_t* ctx, clusters_t* cluster)
 {
     /*
-     * Port the current implementation to this using rma
-     * then perform the matrix reduction
     *
     * Each one computes its borders, then the borders are shared, and the matrix is
     * reduced to a single huge matrix of borders
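/*
 * Editor's note: a minimal sketch of the "array of locks + compare-and-swap"
 * idea mentioned in the Heuristic1 TODO above. It is not part of the patch;
 * N_MASK_LOCKS, lock_mask_slot and unlock_mask_slot are hypothetical names,
 * idx_t comes from the project's headers, and the stripe granularity is an
 * assumption.
 */
#include <stdatomic.h>

#define N_MASK_LOCKS 4096                       /* hypothetical number of stripe locks */
static atomic_int mask_locks[N_MASK_LOCKS];     /* 0 = free, 1 = taken; zero-initialised */

static inline void lock_mask_slot(idx_t i)
{
    int expected = 0;
    /* compare-and-swap until this thread owns the stripe guarding slot i */
    while (!atomic_compare_exchange_weak(&mask_locks[i % N_MASK_LOCKS], &expected, 1))
        expected = 0;
}

static inline void unlock_mask_slot(idx_t i)
{
    atomic_store(&mask_locks[i % N_MASK_LOCKS], 0);
}

/* usage sketch: lock_mask_slot(jidx); ...update to_remove_mask[jidx]...; unlock_mask_slot(jidx); */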
@@ -1659,63 +1641,59 @@ void master_finds_borders(global_context_t* ctx, clusters_t* cluster, float_t Z,
         #define src surviving_clusters[merging_table[m].source]
         #define trg surviving_clusters[merging_table[m].target]
 
-        //int re_check = ( (src != merging_table[m].source) || (trg != merging_table[m].target) );
-        //if(re_check)
-        {
-            /*
-             * Enforce a that in case of symmetric merging condition the lowest idx cluster
-             * is merged into the higher idx cluster, only to preserve compatibility with
-             * original ADP implementation
-             *
-             * Process each element in the merging table
-             */
-            idx_t new_src = (src < trg) ? src : trg;
-            idx_t new_trg = (src < trg) ? trg : src;
-
-            /*
-             * pick who am I and retrieve all needed data from the
-             * border matrices
-             */
-
-            float_t dens1     = centers_dp[new_src].log_rho_c;
-            float_t dens1_err = centers_dp[new_src].log_rho_err;
-            float_t dens2     = centers_dp[new_trg].log_rho_c;
-            float_t dens2_err = centers_dp[new_trg].log_rho_err;
-
-            //borders get
-            sparse_border_t b = sparse_border_get(cluster, new_src, new_trg);
-            float_t dens_border     = b.density;
-            float_t dens_border_err = b.error;
-
-            int i_have_to_merge = is_a_merging(dens1,dens1_err,dens2,dens2_err,dens_border,dens_border_err,Z);
-            switch (i_have_to_merge && src != trg)
-            {
-                case 1:
-                {
-                    int side = merging_roles(dens1,dens1_err,dens2,dens2_err,dens_border,dens_border_err);
-                    if(!side)
-                    {
-                        idx_t tmp;
-                        tmp = new_src;
-                        new_src = new_trg;
-                        new_trg = tmp;
-                    }
-
-                    /*
-                     * Perform the actual meriging,
-                     * first -> fix the borders, delete old ones and spawn new one in the correct position
-                     * second -> update the surviving_clusters buffer
-                     */
-                    fix_sparse_borders_A_into_B(new_src, new_trg, cluster);
-                    merge_A_into_B(surviving_clusters, new_src, new_trg, nclus );
-                }
-                break;
-
-                default:
-                break;
-            }
-        }
+        /*
+         * Enforce that, in case of a symmetric merging condition, the lowest idx cluster
+         * is merged into the higher idx cluster, only to preserve compatibility with the
+         * original ADP implementation
+         *
+         * Process each element in the merging table
+         */
+        idx_t new_src = (src < trg) ? src : trg;
+        idx_t new_trg = (src < trg) ? trg : src;
+
+        /*
+         * pick who am I and retrieve all needed data from the
+         * border matrices
+         */
+
+        float_t dens1     = centers_dp[new_src].log_rho_c;
+        float_t dens1_err = centers_dp[new_src].log_rho_err;
+        float_t dens2     = centers_dp[new_trg].log_rho_c;
+        float_t dens2_err = centers_dp[new_trg].log_rho_err;
+
+        //borders get
+        sparse_border_t b = sparse_border_get(cluster, new_src, new_trg);
+        float_t dens_border     = b.density;
+        float_t dens_border_err = b.error;
+
+        int i_have_to_merge = is_a_merging(dens1,dens1_err,dens2,dens2_err,dens_border,dens_border_err,Z);
+        switch (i_have_to_merge && src != trg)
+        {
+            case 1:
+            {
+                int side = merging_roles(dens1,dens1_err,dens2,dens2_err,dens_border,dens_border_err);
+                if(!side)
+                {
+                    idx_t tmp;
+                    tmp = new_src;
+                    new_src = new_trg;
+                    new_trg = tmp;
+                }
+
+                /*
+                 * Perform the actual merging,
+                 * first -> fix the borders, delete old ones and spawn new one in the correct position
+                 * second -> update the surviving_clusters buffer
+                 */
+                fix_sparse_borders_A_into_B(new_src, new_trg, cluster);
+                merge_A_into_B(surviving_clusters, new_src, new_trg, nclus );
+            }
+            break;
+
+            default:
+            break;
+        }
 
 #undef src
 #undef trg
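/*
 * Editor's note: an illustrative form of the merge test called above, not the
 * project's actual is_a_merging(). Following the ADP paper, two peaks are
 * merged when the density of their common border is compatible, within Z
 * combined errors, with at least one of the two peak densities. The function
 * name and the exact inequality are assumptions.
 */
static int is_a_merging_sketch(float_t rho1, float_t err1,
                               float_t rho2, float_t err2,
                               float_t rho_b, float_t err_b,
                               float_t Z)
{
    /* border not significantly lower than peak 1 or peak 2 */
    return (rho1 - rho_b < Z * (err1 + err_b)) ||
           (rho2 - rho_b < Z * (err2 + err_b));
}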
@@ -1814,14 +1792,6 @@ void Heuristic3(global_context_t* ctx, clusters_t* cluster, float_t Z, int halo)
     /*
      * Heurisitc 3, from paper of Errico, Facco, Laio & Rodriguez
      * ( https://doi.org/10.1016/j.ins.2021.01.010 )
-     * Dense implementation, makes use of a dense matrix to store
-     * borders between clusters, so it is more performant when the number of clusters is low
-     *
-     * args:
-     * - clusters* cluster       : cluster object storing border info and cluster centers
-     * - datapoint_info* dp_info : array of Datapoint structures
-     * - float_t Z               : parameter to assess density peak significance
-     * - halo                    : flag to set if you want to compute also the halo points
      */
 
 #define borders cluster->borders
@@ -1902,6 +1872,8 @@ void Heuristic3(global_context_t* ctx, clusters_t* cluster, float_t Z, int halo)
         clusters_reset(cluster);
     }
 
+    /* broadcast the number of elements on those lists */
+
     MPI_Bcast(&(cluster -> centers.count), 1, MPI_UINT64_T, 0, ctx -> mpi_communicator);
     MPI_Bcast(&(cluster -> centers.size), 1, MPI_UINT64_T, 0, ctx -> mpi_communicator);
@@ -1992,9 +1964,10 @@ void Heuristic3(global_context_t* ctx, clusters_t* cluster, float_t Z, int halo)
         {
             int cidx = dp_info[i].cluster_idx;
             int halo_flag = dp_info[i].log_rho_c < max_border_den_array[cidx];
-            //int halo_flag = max_border_den_array[cidx] > dp_info[i].log_rho_c ;
-            //changed_here
+
+            //this way a single field encodes both the cluster assignment and the
+            //halo membership, with no need to store extra info
             //halo points have cidx < 0 (old cidx = (c + 1) * -1 )
             dp_info[i].cluster_idx = halo_flag ? (cidx * (-1)) - 1 : cidx;
         }
@@ -2012,9 +1985,6 @@ void Heuristic3(global_context_t* ctx, clusters_t* cluster, float_t Z, int halo)
     free(old_to_new);
 
     /*free memory and put the correct arrays into place*/
-    //free(ipos.data);
-    //free(jpos.data);
-    //
 
 #ifdef WRITE_FINAL_ASSIGNMENT
     int* cl = (int*)MY_MALLOC(ctx -> local_n_points * sizeof(int));
diff --git a/src/common/common.c b/src/common/common.c
index bfff6d730553e6c915ed85639d0144ba00c0d681..e7b95923ea941490b752cb3fda71e948beea5077 100644
--- a/src/common/common.c
+++ b/src/common/common.c
@@ -17,13 +17,7 @@ void get_context(global_context_t* ctx)
     ctx -> ub_box = NULL;
     ctx -> rank_n_points = (int*)malloc(ctx -> world_size * sizeof(int));
     ctx -> rank_idx_start = (int*)malloc(ctx -> world_size * sizeof(int));
-    ctx -> idx_halo_points_recv = NULL;
-    ctx -> idx_halo_points_send = NULL;
-    ctx -> n_halo_points_recv = NULL;
-    ctx -> n_halo_points_send = NULL;
-    ctx -> halo_datapoints = NULL;
     ctx -> local_datapoints = NULL;
-    ctx -> __recv_heap_buffers = NULL;
     ctx -> __local_heap_buffers = NULL;
 }
@@ -175,35 +169,6 @@ void free_context(global_context_t* ctx)
     //}
 
     FREE_NOT_NULL(ctx -> local_datapoints);
-    if(ctx -> halo_datapoints)
-    {
-        for(int i = 0; i < ctx -> world_size; ++i)
-        {
-            /*
-            for(int j = 0; j < ctx -> n_halo_points_recv[i]; ++j)
-            {
-                FREE_NOT_NULL(ctx -> halo_datapoints[i][j].ngbh.data);
-            }
-            */
-            FREE_NOT_NULL(ctx -> halo_datapoints[i]);
-        }
-    }
-    FREE_NOT_NULL(ctx -> halo_datapoints);
-    FREE_NOT_NULL(ctx -> __recv_heap_buffers);
-
-    if(ctx -> idx_halo_points_recv)
-    {
-        for(int i = 0; i < ctx -> world_size; ++i) FREE_NOT_NULL(ctx -> idx_halo_points_recv[i]);
-    }
-    FREE_NOT_NULL(ctx -> idx_halo_points_recv);
-
-    if(ctx -> idx_halo_points_send)
-    {
-        for(int i = 0; i < ctx -> world_size; ++i) FREE_NOT_NULL(ctx -> idx_halo_points_send[i]);
-    }
-    FREE_NOT_NULL(ctx -> idx_halo_points_send);
-    FREE_NOT_NULL(ctx -> n_halo_points_recv);
-    FREE_NOT_NULL(ctx -> n_halo_points_send);
     FREE_NOT_NULL(ctx -> rank_n_points);
     FREE_NOT_NULL(ctx -> rank_idx_start);
 }
@@ -460,8 +425,8 @@ void big_ordered_buffer_to_file(global_context_t* ctx, void* buffer, size_t el_s
     {
         //MPI_Barrier(ctx -> mpi_communicator);
         MPI_DB_PRINT("[MASTER] writing to file %s\n", fname);
-        void* tmp_data; 
-        idx_t already_sent = 0; 
+        void* tmp_data;
+        idx_t already_sent = 0;
         idx_t* ppp;
         idx_t* displs;
         idx_t* already_recv;
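/*
 * Editor's note: sketch of the sign-packing used in Heuristic3 above. A halo
 * point stores (-cidx - 1) in cluster_idx, so the assignment and the halo flag
 * share one field. The helper names are hypothetical, not part of the patch.
 */
static inline int is_halo_point(int cluster_idx)
{
    return cluster_idx < 0;
}

static inline int decoded_cluster(int cluster_idx)
{
    /* inverse of: cluster_idx = (cidx * -1) - 1 */
    return cluster_idx < 0 ? -(cluster_idx + 1) : cluster_idx;
}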
diff --git a/src/common/common.h b/src/common/common.h
index 4130eef9bd363475401f692043c53b57c234a26f..b62bd6a3890c8a982a0a56a68c409746c6efecd9 100644
--- a/src/common/common.h
+++ b/src/common/common.h
@@ -21,17 +21,6 @@
 //#define PRINT_H1_CLUSTER_ASSIGN_COMPLETION
 //#define PRINT_ORDERED_BUFFER
 
-typedef struct datapoint_info_t {
-    heap ngbh;
-    int is_center;
-    int cluster_idx;
-    idx_t array_idx;
-    idx_t kstar;
-    float_t g;
-    float_t log_rho;
-    float_t log_rho_c;
-    float_t log_rho_err;
-} datapoint_info_t;
 
 #define MAX(A,B) ((A) > (B) ? (A) : (B))
 #define MIN(A,B) ((A) < (B) ? (A) : (B))
@@ -131,55 +120,69 @@
 #define LOG_END
 #endif
 
+typedef struct datapoint_info_t {
+    /*
+     * datapoint object
+     */
+    heap ngbh;            //heap object storing the nearest neighbors of the point
+    int is_center;        //flag signaling if a point is a cluster center
+    int cluster_idx;      //cluster index
+    idx_t array_idx;      //global index of the point in the dataset
+    idx_t kstar;          //kstar value required for the density computation
+    float_t g;            //density quantities, required by the ADP procedure
+    float_t log_rho;      //
+    float_t log_rho_c;    //
+    float_t log_rho_err;  //
+} datapoint_info_t;
 
-struct global_context_t
+
+typedef struct global_context_t
 {
-    int world_size;
-    int mpi_rank;
-    int __processor_name_len;
-    idx_t k;
-    float_t* local_data;
-    float_t* lb_box;
-    float_t* ub_box;
-    int* n_halo_points_recv;
-    int* n_halo_points_send;
-    idx_t** idx_halo_points_recv;
-    idx_t** idx_halo_points_send;
-    size_t n_points;
-    size_t idx_start;
-    size_t local_n_points;
-    datapoint_info_t* local_datapoints;
-    datapoint_info_t** halo_datapoints;
-    heap_node* __recieved_heap_data;
-    uint32_t dims;
-    int* rank_idx_start;
-    int* rank_n_points;
-    char processor_mame[MPI_MAX_PROCESSOR_NAME];
-    MPI_Comm mpi_communicator;
-    heap_node* __recv_heap_buffers;
-    heap_node* __local_heap_buffers;
-};
-
-struct pointset_t
+    /*
+     * context object to store info related to each
+     * MPI process
+     */
+    char processor_mame[MPI_MAX_PROCESSOR_NAME];  //processor name
+    int __processor_name_len;                     //processor name len
+    int world_size;
+    int mpi_rank;                                 //rank of the processor
+    uint32_t dims;                                //number of dimensions of the dataset
+    idx_t k;                                      //number of neighbors
+    float_t* local_data;                          //pointer to the dataset stored on the node
+    float_t* lb_box;                              //bounding box of the dataset
+    float_t* ub_box;                              //bounding box of the dataset
+    size_t n_points;                              //total number of points
+    size_t idx_start;                             //starting index of the points on the processor
+    size_t local_n_points;                        //number of points stored on the current processor
+    datapoint_info_t* local_datapoints;           //pointer to the datapoint properties
+    int* rank_idx_start;                          //starting index of the datapoints on each processor
+    int* rank_n_points;                           //number of points on each processor
+    heap_node* __local_heap_buffers;              //buffer that stores the nearest neighbors
+    MPI_Comm mpi_communicator;                    //mpi communicator
+} global_context_t;
+
+typedef struct pointset_t
 {
+    /*
+     * Helper object to handle top kdtree
+     * construction, it represents the dataset
+     * inside one node of the tree
+     */
     size_t n_points;
     size_t __capacity;
     uint32_t dims;
     float_t* data;
     float_t* lb_box;
     float_t* ub_box;
-};
+} pointset_t;
 
-struct lu_dynamic_array_t {
+typedef struct lu_dynamic_array_t {
     idx_t *data;
     idx_t size;
     idx_t count;
-};
+} lu_dynamic_array_t;
 
-typedef struct pointset_t pointset_t;
-typedef struct global_context_t global_context_t;
-typedef struct lu_dynamic_array_t lu_dynamic_array_t;
 
 void mpi_printf(global_context_t*, const char *fmt, ...);
 void get_context(global_context_t*);
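/*
 * Editor's note: illustrative helper (not part of the patch) showing how
 * rank_idx_start and rank_n_points in global_context_t above describe the
 * block distribution: rank r owns the global indices
 * [rank_idx_start[r], rank_idx_start[r] + rank_n_points[r]).
 * The name owner_of_point is hypothetical.
 */
static int owner_of_point(const global_context_t* ctx, idx_t global_idx)
{
    for (int r = 0; r < ctx->world_size; ++r)
    {
        idx_t end = (idx_t)ctx->rank_idx_start[r] + (idx_t)ctx->rank_n_points[r];
        if (global_idx < end) return r;   /* first block whose upper bound exceeds the index */
    }
    return ctx->world_size - 1;
}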
diff --git a/src/main/main.c b/src/main/main.c
index 71836341577a2232003816593c2501201922ac7a..eb75a32cf0d9828e0dc8776162ebe588218fff9b 100644
--- a/src/main/main.c
+++ b/src/main/main.c
@@ -11,12 +11,28 @@
     #define OUT_CLUSTER_ASSIGN "/beegfs/ftomba/phd/results/final_assignment.npy"
     #define OUT_HALO_FLAGS "/beegfs/ftomba/phd/results/halo_flags.npy"
     #define OUT_DATA "/beegfs/ftomba/phd/results/ordered_data.npy"
-#else
+#endif
+
+#ifdef LEONARDO
     #define OUT_CLUSTER_ASSIGN "/leonardo_scratch/large/userexternal/ftomba00/out_dadp/final_assignment.npy"
     #define OUT_HALO_FLAGS "/leonardo_scratch/large/userexternal/ftomba00/out_dadp/halo_flags.npy"
     #define OUT_DATA "/leonardo_scratch/large/userexternal/ftomba00/out_dadp/ordered_data.npy"
 #endif
 
+#ifdef LUMI
+    #define OUT_CLUSTER_ASSIGN "~/scratch_dadp/out_dadp/final_assignment.npy"
+    #define OUT_HALO_FLAGS "~/scratch_dadp/out_dadp/halo_flags.npy"
+    #define OUT_DATA "~/scratch_dadp/out_dadp/ordered_data.npy"
+#endif
+
+#ifndef OUT_CLUSTER_ASSIGN
+    #define OUT_CLUSTER_ASSIGN "final_assignment.npy"
+    #define OUT_HALO_FLAGS "halo_flags.npy"
+    #define OUT_DATA "ordered_data.npy"
+#endif
+
+
 //
 
 #ifdef THREAD_FUNNELED
@@ -25,6 +41,7 @@
 #define THREAD_LEVEL MPI_THREAD_MULTIPLE
 #endif
 
+
 int main(int argc, char** argv) {
 #if defined (_OPENMP)
     int mpi_provided_thread_level;
@@ -54,6 +71,7 @@ int main(int argc, char** argv) {
     MPI_Init(NULL, NULL);
 #endif
 
+
     char processor_name[MPI_MAX_PROCESSOR_NAME];
     int name_len;
     MPI_Get_processor_name(processor_name, &name_len);
@@ -147,6 +165,7 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)
     //1B points
     // data = read_data_file(ctx,"../norm_data/eds_box_acc_normalized",5,MY_FALSE);
 
+    //data = read_data_file(ctx,"../norm_data/eds_box_6d",6,MY_FALSE);
     // 190M points
     // std_g2980844_091_0000
@@ -160,7 +179,7 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)
     /* 8M points */
 
-    // data = read_data_file(ctx,"../norm_data/std_g0144846_Me14_091_0001",MY_TRUE);
+    // data = read_data_file(ctx,"../norm_data/std_g0144846_Me14_091_0001",5,MY_TRUE);
 
     //88M
     // data = read_data_file(ctx,"../norm_data/std_g5503149_091_0000",MY_TRUE);
@@ -171,7 +190,7 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)
 
     //for weak scalability
     // ctx->n_points = ctx->n_points / 2;
-    // ctx->n_points = (ctx->n_points / 32) * ctx -> world_size;
+    //ctx->n_points = (ctx->n_points / 32) * ctx -> world_size;
 
     get_dataset_diagnostics(ctx, data);
@@ -247,7 +266,7 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)
 
     ctx->local_data = pvt_data;
 
-    int k_local = 20;
+    int k_local  = 20;
     int k_global = 20;
 
     uint64_t *global_bin_counts_int = (uint64_t *)MY_MALLOC(k_global * sizeof(uint64_t));
diff --git a/src/tree/kdtreeV2.c b/src/tree/kdtreeV2.c
index 5f129ce77f060828d6da053dc655116da75b2ce0..b3b634a3db12cdbe0b43180b4161bf3ef203269a 100644
--- a/src/tree/kdtreeV2.c
+++ b/src/tree/kdtreeV2.c
@@ -325,8 +325,8 @@ void knn_sub_tree_search_kdtree_v2(FLOAT_TYPE* point, kdnode_v2* root, heap * H)
     FLOAT_TYPE max_d = H -> data[0].value;
     int c = max_d > (hp_distance * hp_distance);
 
-    //if(c || (H -> count) < (H -> N))
-    if(c)
+    if(c || (H -> count) < (H -> N))
+    //if(c)
     {
         switch (side)
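/*
 * Editor's note: the re-enabled condition above makes the kNN search descend
 * into the far child not only when the splitting plane is closer than the
 * current k-th neighbour, but also whenever the heap is not yet full, since
 * pruning is only safe once k candidates have been collected. Sketch of the
 * test with a hypothetical helper name, relying on the project's heap type:
 */
static inline int must_visit_far_child(const heap* H, FLOAT_TYPE worst_d2, FLOAT_TYPE hp_distance)
{
    return (worst_d2 > hp_distance * hp_distance)   /* plane closer than current k-th distance */
        || (H->count < H->N);                       /* heap not yet full: cannot prune          */
}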
diff --git a/src/tree/tree.c b/src/tree/tree.c
index 676b9f216d387219ab092e6a3c160413fcb47039..f66c6f2687c3e04d3e65d7e4fcff40e98114f404 100644
--- a/src/tree/tree.c
+++ b/src/tree/tree.c
@@ -51,7 +51,6 @@ int cmp_float_t(const void* a, const void* b)
 
 /* quickselect for an element along a dimension */
 
-
 void swap_data_element(float_t *a, float_t *b, int vec_len)
 {
     float_t tmp;
     for (int i = 0; i < vec_len; ++i)
@@ -413,16 +412,6 @@ void global_binning_check(global_context_t *ctx, float_t *data, int d, int k)
         }
 
         int cc = 0;
-        /*
-        MPI_DB_PRINT("Actual bins: [");
-        for(int bin_idx = 0; bin_idx < k; ++bin_idx)
-        {
-            MPI_DB_PRINT("%d ", counts[bin_idx]);
-            cc += counts[bin_idx];
-        }
-        MPI_DB_PRINT("] tot %d\n",cc);
-        */
-
         free(counts);
     }
 }
@@ -486,7 +475,7 @@ int partition_data_around_value(float_t *array, int vec_len, int compare_dim,
     // swap_data_element(array + (store_index)*vec_len , array + right*vec_len,
     // vec_len);
 
-    return store_index; // maybe, update, it works :)
+    return store_index;
 }
 
 guess_t refine_pure_binning(global_context_t *ctx, pointset_t *ps,
@@ -1011,11 +1000,8 @@ int partition_data_around_key(int* key, float_t *val, int vec_len, int ref_key ,
             store_index += 1;
         }
     }
-    /* Move pivot to its final place */
-    // swap_data_element(array + (store_index)*vec_len , array + right*vec_len,
-    // vec_len);
 
-    return store_index; // maybe, update, it works :)
+    return store_index;
 }
 
@@ -1063,9 +1049,6 @@ void exchange_points(global_context_t* ctx, top_kdtree_t* tree)
     float_t* rcvbuffer = NULL;
     int tot_count = 0;
 
-    //[-8.33416939 -8.22858047]
-
-
     MPI_Barrier(ctx -> mpi_communicator);
     MPI_Alltoall(send_count, 1, MPI_INT, rcv_count, 1, MPI_INT, ctx -> mpi_communicator);
@@ -1093,8 +1076,6 @@ void exchange_points(global_context_t* ctx, top_kdtree_t* tree)
 
     /*exchange points */
 
-    /* this need to change */
-
     MPI_Alltoallv(  ctx -> local_data, send_count, send_displs, MPI_MY_FLOAT,
                     rcvbuffer, rcv_count, rcv_displs, MPI_MY_FLOAT, ctx -> mpi_communicator);
@@ -1475,7 +1456,7 @@ void print_diagnositcs(global_context_t* ctx, int k)
         sysinfo(&info);
 
         if(memory_use > 0.5 * (float_t)info.freeram / 1e9)
-            MPI_DB_PRINT("/!\\ Projected memory usage is more than half of the node memory, may go into troubles while communicating ngbh\n");
+            MPI_DB_PRINT("/!\\ Projected memory usage is more than half of the node memory, may run into trouble while communicating ngbh\n");
 
         MPI_DB_PRINT("\n");
         MPI_Barrier(ctx -> mpi_communicator);
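/*
 * Editor's note: sketch of the projected-memory check in print_diagnositcs
 * above. The assumption that memory_use counts the k heap_node neighbour
 * entries per local point is the editor's, not stated by the patch; the helper
 * name is hypothetical.
 */
#include <sys/sysinfo.h>

static int ngbh_memory_warning(const global_context_t* ctx, int k)
{
    struct sysinfo info;
    sysinfo(&info);

    /* projected size of the neighbour heaps, in GB */
    float_t projected_gb = (float_t)(ctx->local_n_points * (size_t)k * sizeof(heap_node)) / 1e9;
    return projected_gb > 0.5 * (float_t)info.freeram / 1e9;
}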