diff --git a/src/adp/adp.c b/src/adp/adp.c index 72b2cf1b5d6f4885048d0ebf678992217592b0a5..d58e6945d428be6c56dffd66cf67d15254df8f04 100644 --- a/src/adp/adp.c +++ b/src/adp/adp.c @@ -2058,7 +2058,10 @@ void Heuristic3(global_context_t* ctx, clusters_t* cluster, float_t Z, int halo) int cidx = dp_info[i].cluster_idx; int halo_flag = dp_info[i].log_rho_c < max_border_den_array[cidx]; //int halo_flag = max_border_den_array[cidx] > dp_info[i].log_rho_c ; - dp_info[i].cluster_idx = halo_flag ? -1 : cidx; + + //changed_here + //dp_info[i].cluster_idx = halo_flag ? -1 : cidx; + dp_info[i].halo_flag = halo_flag; } } free(max_border_den_array); diff --git a/src/common/common.c b/src/common/common.c index 8078818f543079021008b3a9d71aa0e3d523b0ec..bfff6d730553e6c915ed85639d0144ba00c0d681 100644 --- a/src/common/common.c +++ b/src/common/common.c @@ -5,6 +5,7 @@ #define ARRAY_INCREMENT 100 #define FREE_NOT_NULL(x) if(x){free(x); x = NULL;} +#define PATH_LEN 500 void get_context(global_context_t* ctx) { @@ -26,6 +27,107 @@ void get_context(global_context_t* ctx) ctx -> __local_heap_buffers = NULL; } +void get_dataset_diagnostics(global_context_t* ctx, float_t* data) +{ + //print mean and variance per column of the dataset + float_t* mean = (float_t*)MY_MALLOC(ctx -> dims * sizeof(float_t)); + float_t* var = (float_t*)MY_MALLOC(ctx -> dims * sizeof(float_t)); + + for(int i = 0; i < ctx -> dims; ++i) + { + mean[i] = 0.; + var[i] = 0.; + } + + int jmax = ctx -> dims - (ctx -> dims % 4); + #pragma omp parallel + { + float_t* pvt_mean = (float_t*)MY_MALLOC(ctx -> dims * sizeof(float_t)); + for(int j = 0; j < ctx -> dims; ++j) pvt_mean[j] = 0.; + + #pragma omp for + for(int i = 0; i < ctx -> n_points; ++i) + { + int j = 0; + for(j = 0; j < jmax; j+=4) + { + pvt_mean[j ] += data[i * ctx -> dims + j ]; + pvt_mean[j + 1] += data[i * ctx -> dims + j + 1]; + pvt_mean[j + 2] += data[i * ctx -> dims + j + 2]; + pvt_mean[j + 3] += data[i * ctx -> dims + j + 3]; + } + + for(j = jmax; j < ctx -> dims; ++j) + { + pvt_mean[j] += data[i * ctx -> dims + j]; + } + } + + for(int j = 0; j < ctx -> dims; ++j) + { + #pragma omp atomic update + mean[j] += pvt_mean[j]; + } + + free(pvt_mean); + } + + for(int i = 0; i < ctx -> dims; ++i) + { + mean[i] = mean[i] / (float_t) ctx -> n_points; + } + + #pragma omp parallel + { + float_t* pvt_var = (float_t*)MY_MALLOC(ctx -> dims * sizeof(float_t)); + for(int j = 0; j < ctx -> dims; ++j) pvt_var[j] = 0.; + + #pragma omp for + for(int i = 0; i < ctx -> n_points; ++i) + { + int j = 0; + for(j = 0; j < jmax; j+=4) + { + float_t v0 = mean[j ] - data[i * ctx -> dims + j ]; + float_t v1 = mean[j + 1] - data[i * ctx -> dims + j + 1]; + float_t v2 = mean[j + 2] - data[i * ctx -> dims + j + 2]; + float_t v3 = mean[j + 3] - data[i * ctx -> dims + j + 3]; + + pvt_var[j ] += v0 * v0; + pvt_var[j + 1] += v1 * v1; + pvt_var[j + 2] += v2 * v2; + pvt_var[j + 3] += v3 * v3; + } + + for(j = jmax; j < ctx -> dims; ++j) + { + float_t v = mean[j] - data[i * ctx -> dims + j]; + pvt_var[j] += v * v; + } + } + + for(int j = 0; j < ctx -> dims; ++j) + { + #pragma omp atomic update + var[j] += pvt_var[j]; + } + + free(pvt_var); + } + + for(int i = 0; i < ctx -> dims; ++i) + { + var[i] = var[i] / ((float_t) ctx -> n_points - 1); + } + + for(int j = 0; j < ctx -> dims; ++j) + { + printf("dim%2d mean %.2e std %.2e\n", j, mean[j], sqrt(var[j])); + } + free(mean); + free(var); +} + void print_error_code(int err) { switch (err) @@ -225,7 +327,7 @@ static inline int get_unit_measure(size_t bytes) float_t* read_data_file(global_context_t *ctx, const char *fname, const idx_t ndims, const int file_in_float32) { - + printf("Reading %s\n",fname); FILE *f = fopen(fname, "r"); if (!f) { @@ -336,6 +438,24 @@ void ordered_buffer_to_file(global_context_t* ctx, void* buffer, size_t el_size, MPI_Barrier(ctx -> mpi_communicator); } +void distributed_buffer_to_file(global_context_t* ctx, void* buffer, size_t el_size, uint64_t n, const char* fname) +{ + char out_path_w_proc_name[PATH_LEN]; + MPI_DB_PRINT("[MASTER] writing to file %s.%s\n", fname, "[proc]"); + snprintf(out_path_w_proc_name, PATH_LEN, "%s.%d", fname, ctx -> mpi_rank); + FILE* file = fopen(out_path_w_proc_name,"w"); + if(!file) + { + printf("Cannot open file %s ! Aborting \n", fname); + } + else + { + fwrite(buffer, 1, el_size * n, file); + } + fclose(file); + +} + void big_ordered_buffer_to_file(global_context_t* ctx, void* buffer, size_t el_size, uint64_t n, const char* fname) { //MPI_Barrier(ctx -> mpi_communicator); @@ -367,7 +487,6 @@ void big_ordered_buffer_to_file(global_context_t* ctx, void* buffer, size_t el_s displs[0] = 0; for(int i = 0; i < ctx -> world_size; ++i) ppp[i] = el_size * ppp[i]; for(int i = 1; i < ctx -> world_size; ++i) displs[i] = displs[i - 1] + ppp[i - 1]; - for(int i = 0; i < ctx -> world_size; ++i) already_recv[i] = 0; } @@ -376,7 +495,7 @@ void big_ordered_buffer_to_file(global_context_t* ctx, void* buffer, size_t el_s //Gather on master // - uint64_t default_msg_len = 100000; //bytes + uint64_t default_msg_len = 100000000; //bytes if(I_AM_MASTER) { @@ -396,7 +515,14 @@ void big_ordered_buffer_to_file(global_context_t* ctx, void* buffer, size_t el_s MPI_Recv(tmp_data + displs[r] + already_recv[r], ppp[r], MPI_BYTE, r, r, ctx -> mpi_communicator, MPI_STATUS_IGNORE); already_recv[r] += count_recv; + + #ifdef PRINT_ORDERED_BUFFER + printf("[MASTER] recieved from %d %lu elements out of %lu\n", r, already_recv[r], ppp[r]); + #endif } + #ifdef PRINT_ORDERED_BUFFER + printf("-----------\n"); + #endif } } else @@ -480,7 +606,7 @@ void test_file_path(const char* fname) if(file) { fprintf(file, "This is only to test if I can open a file in the desidered path\n"); - fprintf(file, "Here willbe written the output of dadp\n"); + fprintf(file, "Here will be written the output of dadp\n"); fclose(file); } else @@ -490,3 +616,21 @@ void test_file_path(const char* fname) } } + +void test_distributed_file_path(global_context_t* ctx, const char* fname) +{ + char out_path_w_proc_name[PATH_LEN]; + snprintf(out_path_w_proc_name, PATH_LEN, "%s.%d", fname, ctx -> mpi_rank); + FILE* file = fopen(out_path_w_proc_name,"w"); + if(!file) + { + printf("Cannot open file %s ! Aborting \n", out_path_w_proc_name); + } + else + { + fprintf(file, "This is only to test if I can open a file in the desidered path\n"); + fprintf(file, "Here will be written the output of dadp\n"); + } + fclose(file); + +} diff --git a/src/common/common.h b/src/common/common.h index 6194e1fef84e03e879dc33c0f7c3db29df8cc23b..04da7290db6dd1f44a6d10830594245fd0ad68fa 100644 --- a/src/common/common.h +++ b/src/common/common.h @@ -19,6 +19,7 @@ //#define PRINT_NGBH_EXCHANGE_SCHEME //#define PRINT_H2_COMM_SCHEME //#define PRINT_H1_CLUSTER_ASSIGN_COMPLETION +//#define PRINT_ORDERED_BUFFER typedef struct datapoint_info_t { idx_t array_idx; @@ -30,6 +31,7 @@ typedef struct datapoint_info_t { idx_t kstar; int is_center; int cluster_idx; + int halo_flag; } datapoint_info_t; #define MAX(A,B) ((A) > (B) ? (A) : (B)) @@ -199,3 +201,7 @@ void ordered_buffer_to_file(global_context_t* ctx, void* buffer, size_t el_size, void test_file_path(const char* fname); void big_ordered_buffer_to_file(global_context_t* ctx, void* buffer, size_t el_size, uint64_t n, const char* fname); float_t* read_data_file(global_context_t *ctx, const char *fname, const idx_t ndims, const int file_in_float32); +void get_dataset_diagnostics(global_context_t* ctx, float_t* data); + +void test_distributed_file_path(global_context_t* ctx, const char* fname); +void distributed_buffer_to_file(global_context_t* ctx, void* buffer, size_t el_size, uint64_t n, const char* fname); diff --git a/src/main/main.c b/src/main/main.c index aa291a037c0c63e1b7e0458cf19cc54f2c2e5388..54757c62ae22fca808324f4deff8f60b748aa389 100644 --- a/src/main/main.c +++ b/src/main/main.c @@ -9,10 +9,12 @@ #ifdef AMONRA #pragma message "Hi, you are on amonra" #define OUT_CLUSTER_ASSIGN "/beegfs/ftomba/phd/results/final_assignment.npy" + #define OUT_HALO_FLAGS "/beegfs/ftomba/phd/results/halo_flags.npy" #define OUT_DATA "/beegfs/ftomba/phd/results/ordered_data.npy" #else - #define OUT_CLUSTER_ASSIGN "/leonardo_scratch/large/userexternal/ftomba00/final_assignment.npy" - #define OUT_DATA "/leonardo_scratch/large/userexternal/ftomba00/ordered_data.npy" + #define OUT_CLUSTER_ASSIGN "/leonardo_scratch/large/userexternal/ftomba00/out_dadp/final_assignment.npy" + #define OUT_HALO_FLAGS "/leonardo_scratch/large/userexternal/ftomba00/out_dadp/halo_flags.npy" + #define OUT_DATA "/leonardo_scratch/large/userexternal/ftomba00/out_dadp/ordered_data.npy" #endif // @@ -106,10 +108,22 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx) TIME_DEF double elapsed_time; - if(I_AM_MASTER) + float_t z = 3; + int halo = MY_TRUE; + float_t tol = 0.002; + int k = 300; + + if(I_AM_MASTER && ctx -> world_size <= 6) { test_file_path(OUT_DATA); test_file_path(OUT_CLUSTER_ASSIGN); + if(halo) test_file_path(OUT_HALO_FLAGS); + } + else + { + test_distributed_file_path(ctx, OUT_DATA); + test_distributed_file_path(ctx, OUT_CLUSTER_ASSIGN); + if(halo) test_distributed_file_path(ctx, OUT_HALO_FLAGS); } @@ -129,13 +143,16 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx) // 2B points // data = read_data_file(ctx,"../norm_data/very_huge_blobs.npy",MY_FALSE); // data = read_data_file(ctx,"../norm_data/hd_blobs.npy",5,MY_FALSE); + + //1B points + //data = read_data_file(ctx,"../norm_data/eds_box_acc_normalized",5,MY_FALSE); // 190M points // std_g2980844_091_0000 - data = read_data_file(ctx,"../norm_data/std_g2980844_091_0000",5,MY_TRUE); + // data = read_data_file(ctx,"../norm_data/std_g2980844_091_0000",5,MY_TRUE); /* 1M points ca.*/ - //data = read_data_file(ctx,"../norm_data/std_LR_091_0001",5,MY_TRUE); + data = read_data_file(ctx,"../norm_data/std_LR_091_0001",5,MY_TRUE); /* BOX */ // data = read_data_file(ctx,"../norm_data/std_Box_256_30_092_0000",MY_TRUE); @@ -155,6 +172,8 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx) // ctx->n_points = ctx->n_points / 2; // ctx->n_points = (ctx->n_points / 32) * ctx -> world_size; + get_dataset_diagnostics(ctx, data); + } /* communicate the total number of points*/ @@ -239,7 +258,6 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx) original_ps.lb_box = (float_t*)MY_MALLOC(ctx -> dims * sizeof(float_t)); original_ps.ub_box = (float_t*)MY_MALLOC(ctx -> dims * sizeof(float_t)); - float_t tol = 0.002; top_kdtree_t tree; TIME_START; @@ -256,7 +274,6 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx) TIME_START; kdtree_v2 local_tree; kdtree_v2_init( &local_tree, ctx -> local_data, ctx -> local_n_points, (unsigned int)ctx -> dims); - int k = 300; datapoint_info_t* dp_info = (datapoint_info_t*)MY_MALLOC(ctx -> local_n_points * sizeof(datapoint_info_t)); for(uint64_t i = 0; i < ctx -> local_n_points; ++i) @@ -272,6 +289,7 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx) dp_info[i].kstar = -1; dp_info[i].is_center = -1; dp_info[i].cluster_idx = -1; + dp_info[i].halo_flag = 0; } ctx -> local_datapoints = dp_info; @@ -301,7 +319,6 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx) TIME_START; - float_t z = 3; compute_density_kstarnn_rma_v2(ctx, id, MY_FALSE); compute_correction(ctx, z); @@ -321,19 +338,48 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx) TIME_START; - int halo = 1; Heuristic3(ctx, &clusters, z, halo); elapsed_time = TIME_STOP; LOG_WRITE("H3", elapsed_time) - /* write final assignment and data */ TIME_START; int* cl = (int*)MY_MALLOC(ctx -> local_n_points * sizeof(int)); for(int i = 0; i < ctx -> local_n_points; ++i) cl[i] = ctx -> local_datapoints[i].cluster_idx; - big_ordered_buffer_to_file(ctx, cl, sizeof(int), ctx -> local_n_points, OUT_CLUSTER_ASSIGN); - big_ordered_buffer_to_file(ctx, ctx -> local_data, sizeof(double), ctx -> local_n_points * ctx -> dims, OUT_DATA); - free(cl); + + if(ctx -> world_size <= 6) + { + big_ordered_buffer_to_file(ctx, cl, sizeof(int), ctx -> local_n_points, OUT_CLUSTER_ASSIGN); + big_ordered_buffer_to_file(ctx, ctx -> local_data, sizeof(double), ctx -> local_n_points * ctx -> dims, OUT_DATA); + + if(halo) + { + int* halo_flags = (int*)MY_MALLOC(ctx -> local_n_points * sizeof(int)); + for(int i = 0; i < ctx -> local_n_points; ++i) cl[i] = ctx -> local_datapoints[i].halo_flag; + big_ordered_buffer_to_file(ctx, halo_flags, sizeof(int), ctx -> local_n_points, OUT_HALO_FLAGS); + free(halo_flags); + } + + free(cl); + + } + else + { + distributed_buffer_to_file(ctx, cl, sizeof(int), ctx -> local_n_points, OUT_CLUSTER_ASSIGN); + distributed_buffer_to_file(ctx, ctx -> local_data, sizeof(double), ctx -> local_n_points * ctx -> dims, OUT_DATA); + + if(halo) + { + int* halo_flags = (int*)MY_MALLOC(ctx -> local_n_points * sizeof(int)); + for(int i = 0; i < ctx -> local_n_points; ++i) cl[i] = ctx -> local_datapoints[i].halo_flag; + distributed_buffer_to_file(ctx, halo_flags, sizeof(int), ctx -> local_n_points, OUT_HALO_FLAGS); + free(halo_flags); + } + + free(cl); + + } + elapsed_time = TIME_STOP; LOG_WRITE("Write results to file", elapsed_time); @@ -345,8 +391,6 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx) free(send_counts); free(displacements); //free(dp_info); - - original_ps.data = NULL; free_pointset(&original_ps); diff --git a/src/tree/tree.c b/src/tree/tree.c index 028ee87e932bfa7b8dc67a636797a350de1fb68f..676b9f216d387219ab092e6a3c160413fcb47039 100644 --- a/src/tree/tree.c +++ b/src/tree/tree.c @@ -447,6 +447,8 @@ void compute_pure_global_binning(global_context_t *ctx, pointset_t *ps, float_t bin_w = (ps-> ub_box[d] - ps->lb_box[d]) / (float_t)k_global; + /* THIS IS problematic when ps -> ub_box == ps -> lb_box */ + #pragma omp parallel for for (size_t i = 0; i < ps->n_points; ++i) { @@ -687,8 +689,8 @@ top_kdtree_node_t* top_tree_generate_node(global_context_t* ctx, top_kdtree_t* t ptr -> parent = NULL; ptr -> lb_node_box = (float_t*)MY_MALLOC(ctx -> dims * sizeof(float_t)); ptr -> ub_node_box = (float_t*)MY_MALLOC(ctx -> dims * sizeof(float_t)); - ptr -> owner = -1; - ptr -> split_dim = 0.; + ptr -> owner = -1; + ptr -> split_dim = 0; ++tree -> count; return ptr; @@ -810,20 +812,31 @@ void build_top_kdtree(global_context_t *ctx, pointset_t *og_pointset, top_kdtree /* insert node */ /* - MPI_DB_PRINT("Handling partition: \n\tcurrent_node %p, \n\tdim %d, \n\tn_points %d, \n\tstart_proc %d, \n\tn_procs %d, \n\tparent %p\n", + MPI_DB_PRINT( "[RANK %d] Handling partition:\n"\ + "\tcurrent_node %p,\n"\ + "\tdim %d,\n"\ + "\tn_points %lu,\n"\ + "\tstart_proc %d,\n"\ + "\tn_procs %d\n"\ + "\tparent %p\n"\ + "\tbase_ptr %p\n"\ + "\tlr %d\n", + ctx -> mpi_rank, current_node, current_partition.d, current_partition.n_points, current_partition.start_proc, current_partition.n_procs, - current_partition.parent); + current_partition.parent, + current_partition.base_ptr, + current_partition.lr); */ switch (current_partition.lr) { case TOP_TREE_LCH: if(current_partition.parent) { - current_node -> parent = current_partition.parent; + current_node -> parent = current_partition.parent; current_node -> parent -> lch = current_node; /* compute the box */ /* @@ -1079,6 +1092,9 @@ void exchange_points(global_context_t* ctx, top_kdtree_t* tree) /*exchange points */ + + /* this need to change */ + MPI_Alltoallv( ctx -> local_data, send_count, send_displs, MPI_MY_FLOAT, rcvbuffer, rcv_count, rcv_displs, MPI_MY_FLOAT, ctx -> mpi_communicator);