diff --git a/src/adp/adp.c b/src/adp/adp.c
index 80de11421f999e23abd26aa0fea0ca1fa4c518bc..72b2cf1b5d6f4885048d0ebf678992217592b0a5 100644
--- a/src/adp/adp.c
+++ b/src/adp/adp.c
@@ -52,7 +52,7 @@ float_t compute_ID_two_NN_ML(global_context_t* ctx, datapoint_info_t* dp_info, i
         clock_gettime(CLOCK_MONOTONIC, &start_tot);
     }
 
-    float_t log_mus = 0;
+    float_t log_mus = 0.;
     for(idx_t i = 0; i < n; ++i)
     {
         log_mus += 0.5 * log(dp_info[i].ngbh.data[2].value/dp_info[i].ngbh.data[1].value);
@@ -244,7 +244,7 @@ void compute_density_kstarnn_rma(global_context_t* ctx, const float_t d, int ver
 
     ordered_buffer_to_file(ctx, ks, sizeof(idx_t), ctx -> local_n_points, "bb/ks.npy");
     ordered_buffer_to_file(ctx, gs, sizeof(float_t), ctx -> local_n_points, "bb/g.npy");
-    ordered_data_to_file(ctx);
+    ordered_data_to_file(ctx, "bb/ordered_data.npy");
     free(den);
     free(ks);
 #endif
@@ -437,7 +437,7 @@ void compute_density_kstarnn_rma_v2(global_context_t* ctx, const float_t d, int
 
     ordered_buffer_to_file(ctx, ks, sizeof(idx_t), ctx -> local_n_points, "bb/ks.npy");
     ordered_buffer_to_file(ctx, gs, sizeof(float_t), ctx -> local_n_points, "bb/g.npy");
-    ordered_data_to_file(ctx);
+    ordered_data_to_file(ctx, "bb/ordered_data.npy");
     free(den);
     free(ks);
 #endif
@@ -822,7 +822,6 @@ clusters_t Heuristic1(global_context_t *ctx, int verbose)
         clock_gettime(CLOCK_MONOTONIC, &start);
     }
 
-
     /*
      * optimized version
      *
@@ -846,7 +845,6 @@ clusters_t Heuristic1(global_context_t *ctx, int verbose)
 
 
 
-
     /* check into internal nodes */
     /*
      * to remove
@@ -934,7 +932,9 @@ clusters_t Heuristic1(global_context_t *ctx, int verbose)
             {
                 case 1:
                     {
-                        lu_dynamic_array_pushBack(&removed_centers,i);
+                        //lu_dynamic_array_pushBack(&removed_centers,i);
+                        lu_dynamic_array_pushBack(&removed_centers,dp_info[i].array_idx);
+                        //here it sets is_center to 0
                         dp_info[i].is_center = 0;
                         for(idx_t c = 0; c < removed_centers.count - 1; ++c)
                         {
@@ -1011,6 +1011,8 @@ clusters_t Heuristic1(global_context_t *ctx, int verbose)
     {
         idx_t idx = actual_centers.data[i];
         dp_info[idx].cluster_idx += center_displs[ctx -> mpi_rank];
+
+        //this translates them to global indexing
         actual_centers.data[i] += ctx -> idx_start;
     }
 
@@ -1067,7 +1069,6 @@ clusters_t Heuristic1(global_context_t *ctx, int verbose)
         }
     }
 
-
    if(cluster == -1 && !wait_for_comms)
    {
        float_t gmax = -99999.;
@@ -1102,6 +1103,11 @@ clusters_t Heuristic1(global_context_t *ctx, int verbose)
        }
 
+       #ifdef PRINT_H1_CLUSTER_ASSIGN_COMPLETION
+       DB_PRINT("[RANK %d] proc points %d completed %d %lu\n", ctx -> mpi_rank, proc_points, completed, ctx -> local_n_points);
+       MPI_Barrier(ctx -> mpi_communicator);
+       #endif
+
        MPI_Allreduce(MPI_IN_PLACE, &completed, 1, MPI_INT, MPI_SUM, ctx -> mpi_communicator);
        completed = completed == ctx -> world_size ? 1 : 0;
@@ -1133,7 +1139,7 @@ clusters_t Heuristic1(global_context_t *ctx, int verbose)
 
    for(int i = 0; i < ctx -> local_n_points; ++i) ks[i] = ctx -> local_datapoints[i].cluster_idx;
    ordered_buffer_to_file(ctx, ks, sizeof(int), ctx -> local_n_points, "bb/cl.npy");
-   ordered_data_to_file(ctx);
+   ordered_data_to_file(ctx, "bb/ordered_data.npy");
    free(ks);
 #endif
 
@@ -2078,6 +2084,7 @@ void Heuristic3(global_context_t* ctx, clusters_t* cluster, float_t Z, int halo)
 
    for(int i = 0; i < ctx -> local_n_points; ++i) cl[i] = ctx -> local_datapoints[i].cluster_idx;
    ordered_buffer_to_file(ctx, cl, sizeof(int), ctx -> local_n_points, "bb/final_assignment.npy");
+   ordered_data_to_file(ctx, "bb/ordered_data.npy");
 
    free(cl);
 
diff --git a/src/adp/adp.h b/src/adp/adp.h
index e880754775b8950a5d815f0e259d63f9fc385e6e..605833b3da1f729c63d3181a5504d99efb5d0524 100644
--- a/src/adp/adp.h
+++ b/src/adp/adp.h
@@ -55,3 +55,4 @@ void clusters_allocate(clusters_t * c, int s);
 clusters_t Heuristic1(global_context_t *ctx, int verbose);
 void Heuristic2(global_context_t* ctx, clusters_t* cluster);
 void Heuristic3(global_context_t* ctx, clusters_t* cluster, float_t Z, int halo);
+void clusters_free(clusters_t * c);
diff --git a/src/common/common.c b/src/common/common.c
index 1dcfc06b8e0dc2d82af3adb8160e209d9c1950d4..0ecc276d148f1bad907f1f8d876569d0b0fb9fef 100644
--- a/src/common/common.c
+++ b/src/common/common.c
@@ -253,3 +253,158 @@ void ordered_buffer_to_file(global_context_t* ctx, void* buffer, size_t el_size,
     }
     MPI_Barrier(ctx -> mpi_communicator);
 }
+
+void big_ordered_buffer_to_file(global_context_t* ctx, void* buffer, size_t el_size, uint64_t n, const char* fname)
+{
+    //MPI_Barrier(ctx -> mpi_communicator);
+    MPI_DB_PRINT("[MASTER] writing to file %s\n", fname);
+    void* tmp_data;
+    idx_t already_sent = 0;
+    idx_t* ppp;
+    idx_t* displs;
+    idx_t* already_recv;
+
+    MPI_Barrier(ctx -> mpi_communicator);
+
+    uint64_t tot_n = 0;
+    MPI_Reduce(&n, &tot_n, 1, MPI_UINT64_T, MPI_SUM, 0, ctx -> mpi_communicator);
+
+    if(I_AM_MASTER)
+    {
+        tmp_data     = (void*)MY_MALLOC(el_size * tot_n);
+        ppp          = (idx_t*)MY_MALLOC(ctx -> world_size * sizeof(idx_t));
+        displs       = (idx_t*)MY_MALLOC(ctx -> world_size * sizeof(idx_t));
+        already_recv = (idx_t*)MY_MALLOC(ctx -> world_size * sizeof(idx_t));
+    }
+
+    MPI_Gather(&n, 1, MPI_UINT64_T, ppp, 1, MPI_UINT64_T, 0, ctx -> mpi_communicator);
+
+    if(I_AM_MASTER)
+    {
+        displs[0] = 0;
+        for(int i = 0; i < ctx -> world_size; ++i) ppp[i] = el_size * ppp[i];
+        for(int i = 1; i < ctx -> world_size; ++i) displs[i] = displs[i - 1] + ppp[i - 1];
+
+        for(int i = 0; i < ctx -> world_size; ++i) already_recv[i] = 0;
+    }
+
+    //Gather on master
+    uint64_t default_msg_len = 10000000; //bytes
+
+    if(I_AM_MASTER)
+    {
+        //receive from itself
+        memcpy(tmp_data, buffer, n * el_size);
+        for(int r = 1; r < ctx -> world_size; ++r)
+        {
+            while(already_recv[r] < ppp[r])
+            {
+                MPI_Status status;
+                MPI_Probe(r, MPI_ANY_TAG, ctx -> mpi_communicator, &status);
+
+                int count_recv;
+                MPI_Get_count(&status, MPI_BYTE, &count_recv);
+
+                MPI_Recv(tmp_data + displs[r] + already_recv[r], count_recv, MPI_BYTE, r, r, ctx -> mpi_communicator, MPI_STATUS_IGNORE);
+                already_recv[r] += count_recv;
+            }
+        }
+    }
+    else
+    {
+        while(already_sent < n * el_size)
+        {
+            int count_send = MIN(default_msg_len, n * el_size - already_sent);
+            MPI_Send(buffer + already_sent, count_send, MPI_BYTE, 0, ctx -> mpi_rank, ctx -> mpi_communicator);
+            already_sent += count_send;
+        }
+    }
+
+    if(I_AM_MASTER)
+    {
+        FILE* file = fopen(fname,"w");
+        if(!file)
+        {
+            printf("Cannot open file %s\n", fname);
+        }
+        else
+        {
+            fwrite(tmp_data, 1, el_size * tot_n, file);
+            fclose(file);
+        }
+        free(tmp_data);
+        free(ppp);
+        free(displs);
+        free(already_recv);
+    }
+    MPI_Barrier(ctx -> mpi_communicator);
+}
+
+void ordered_data_to_file(global_context_t* ctx, const char* fname)
+{
+    //MPI_Barrier(ctx -> mpi_communicator);
+    MPI_DB_PRINT("[MASTER] writing DATA to file\n");
+    float_t* tmp_data;
+    int* ppp;
+    int* displs;
+
+    MPI_Barrier(ctx -> mpi_communicator);
+    if(I_AM_MASTER)
+    {
+        tmp_data = (float_t*)MY_MALLOC(ctx -> dims * ctx -> n_points * sizeof(float_t));
+        ppp      = (int*)MY_MALLOC(ctx -> world_size * sizeof(int));
+        displs   = (int*)MY_MALLOC(ctx -> world_size * sizeof(int));
+    }
+
+    MPI_Gather(&(ctx -> local_n_points), 1, MPI_INT, ppp, 1, MPI_INT, 0, ctx -> mpi_communicator);
+
+    if(I_AM_MASTER)
+    {
+        displs[0] = 0;
+        for(int i = 0; i < ctx -> world_size; ++i) ppp[i] = ctx -> dims * ppp[i];
+        for(int i = 1; i < ctx -> world_size; ++i) displs[i] = displs[i - 1] + ppp[i - 1];
+    }
+
+    MPI_Gatherv(ctx -> local_data, ctx -> dims * ctx -> local_n_points,
+            MPI_MY_FLOAT, tmp_data, ppp, displs, MPI_MY_FLOAT, 0, ctx -> mpi_communicator);
+
+    if(I_AM_MASTER)
+    {
+        FILE* file = fopen(fname,"w");
+        if(file)
+        {
+            fwrite(tmp_data, sizeof(float_t), ctx -> dims * ctx -> n_points, file);
+            fclose(file);
+        }
+        else
+        {
+            printf("Cannot open file %s\n", fname);
+        }
+        free(tmp_data);
+        free(ppp);
+        free(displs);
+    }
+    MPI_Barrier(ctx -> mpi_communicator);
+}
+
+void test_file_path(const char* fname)
+{
+    FILE* file = fopen(fname,"w");
+    if(file)
+    {
+        fprintf(file, "This is only to test if I can open a file in the desired path\n");
+        fprintf(file, "Here will be written the output of dadp\n");
+        fclose(file);
+    }
+    else
+    {
+        printf("Cannot open file %s\n", fname);
+        exit(1);
+    }
+}
diff --git a/src/common/common.h b/src/common/common.h
index 3af3005dcf5603f94c67cdb2ab7a04a22a69448f..097ab97b583bcd7873ad1bcdd956508346399e9c 100644
--- a/src/common/common.h
+++ b/src/common/common.h
@@ -14,10 +14,11 @@
 //#define WRITE_CLUSTER_ASSIGN_H1
 //#define WRITE_BORDERS
 //#define WRITE_MERGING_TABLE
-#define WRITE_FINAL_ASSIGNMENT
+//#define WRITE_FINAL_ASSIGNMENT
 //#define PRINT_NGBH_EXCHANGE_SCHEME
-#define PRINT_H2_COMM_SCHEME
+//#define PRINT_H2_COMM_SCHEME
+//#define PRINT_H1_CLUSTER_ASSIGN_COMPLETION
 
 typedef struct datapoint_info_t {
     idx_t array_idx;
@@ -193,5 +194,7 @@ void lu_dynamic_array_reserve(lu_dynamic_array_t * a, idx_t n);
 void lu_dynamic_array_init(lu_dynamic_array_t * a);
 void print_error_code(int err);
 
-void ordered_data_to_file(global_context_t* ctx);
+void ordered_data_to_file(global_context_t* ctx, const char* fname);
 void ordered_buffer_to_file(global_context_t* ctx, void* buffer, size_t el_size, uint64_t n, const char* fname);
+void test_file_path(const char* fname);
+void big_ordered_buffer_to_file(global_context_t* ctx, void* buffer, size_t el_size, uint64_t n, const char* fname);
diff --git a/src/main/main.c b/src/main/main.c
index 26b611ddf194dde46d2c218571b397748b36281a..fa4f3f45fd9c3b2d603c30e20b7cddc6ea4dbd44 100644
--- a/src/main/main.c
+++ b/src/main/main.c
@@ -6,6 +6,15 @@
 
 #include "../adp/adp.h"
 
+#ifdef AMONRA
+    #pragma message "Hi, you are on amonra"
+    #define OUT_CLUSTER_ASSIGN "/beegfs/ftomba/phd/results/final_assignment.npy"
+    #define OUT_DATA "/beegfs/ftomba/phd/results/ordered_data.npy"
+#else
+    #define OUT_CLUSTER_ASSIGN "/leonardo_scratch/large/userexternal/ftomba00/final_assignment.npy"
+    #define OUT_DATA "/leonardo_scratch/large/userexternal/ftomba00/ordered_data.npy"
+#endif
+
 //
 
 #ifdef THREAD_FUNNELED
@@ -97,19 +106,33 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)
     TIME_DEF
     double elapsed_time;
 
+    if(I_AM_MASTER)
+    {
+        test_file_path(OUT_DATA);
+        test_file_path(OUT_CLUSTER_ASSIGN);
+    }
+
+
     if (ctx->mpi_rank == 0)
     {
         //data = read_data_file(ctx, "../norm_data/50_blobs_more_var.npy", MY_TRUE);
         //ctx->dims = 2;
-        //data = read_data_file(ctx, "../norm_data/50_blobs.npy", MY_TRUE);
+        //data = read_data_file(ctx, "../norm_data/blobs_small.npy", MY_FALSE);
 
         // std_g0163178_Me14_091_0000
+        // 100M points
+        // 2D
+        // std_g2980844_091_0000
+        //data = read_data_file(ctx,"../norm_data/huge_blobs.npy",MY_FALSE);
+
+        // 2B points
+        // data = read_data_file(ctx,"../norm_data/very_huge_blobs.npy",MY_FALSE);
+
+        // 190M points
         // std_g2980844_091_0000
-        data = read_data_file(ctx,"../norm_data/std_g2980844_091_0000",MY_TRUE);
+        // data = read_data_file(ctx,"../norm_data/std_g2980844_091_0000",MY_TRUE);
 
         /* 1M points ca.*/
-        // data = read_data_file(ctx,"../norm_data/std_LR_091_0001",MY_TRUE);
+        data = read_data_file(ctx,"../norm_data/std_LR_091_0001",MY_TRUE);
 
         /* BOX */
         // data = read_data_file(ctx,"../norm_data/std_Box_256_30_092_0000",MY_TRUE);
@@ -124,18 +147,15 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)
         //
         //34 M
         //data = read_data_file(ctx,"../norm_data/std_g1212639_091_0001",MY_TRUE);
-        ctx->dims = 5;
-
-        //ctx -> n_points = 5 * 100000;
+        ctx -> dims = 5;
+        // ctx->dims = 2;
         ctx->n_points = ctx->n_points / ctx->dims;
-        //ctx->n_points = (ctx->n_points * 5) / 10;
-        // ctx -> n_points = ctx -> world_size * 1000;
-        //ctx -> n_points = 10000000 * ctx -> world_size;
-        //generate_random_matrix(&data, ctx -> dims, ctx -> n_points, ctx);
-        //mpi_printf(ctx, "Read %lu points in %u dims\n", ctx->n_points, ctx->dims);
+
+        //for weak scalability
+        // ctx->n_points = ctx->n_points / 2;
+        // ctx->n_points = (ctx->n_points / 32) * ctx -> world_size;
+
     }
 
-    //MPI_DB_PRINT("[MASTER] Reading file and scattering\n");
     /* communicate the total number of points*/
     MPI_Bcast(&(ctx->dims), 1, MPI_UINT32_T, 0, ctx->mpi_communicator);
@@ -165,6 +185,8 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)
     MPI_Scatterv(data, send_counts, displacements, MPI_MY_FLOAT, pvt_data, send_counts[ctx->mpi_rank], MPI_MY_FLOAT, 0, ctx->mpi_communicator);
 
+    if (I_AM_MASTER) free(data);
+
     ctx->local_data = pvt_data;
 
     int k_local = 20;
@@ -201,7 +223,6 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)
     //int k = 30;
     datapoint_info_t* dp_info = (datapoint_info_t*)MY_MALLOC(ctx -> local_n_points * sizeof(datapoint_info_t));
 
-    /* initialize, to cope with valgrind */
     for(uint64_t i = 0; i < ctx -> local_n_points; ++i)
     {
         dp_info[i].ngbh.data = NULL;
@@ -216,6 +237,7 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)
         dp_info[i].is_center = -1;
         dp_info[i].cluster_idx = -1;
     }
+    ctx -> local_datapoints = dp_info;
 
     build_local_tree(ctx, &local_tree);
     elapsed_time = TIME_STOP;
@@ -243,9 +265,8 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)
 
     TIME_START;
-    float_t z = 2;
+    float_t z = 3;
 
-    ctx -> local_datapoints = dp_info;
     //compute_density_kstarnn_rma(ctx, id, MY_FALSE);
     compute_density_kstarnn_rma_v2(ctx, id, MY_FALSE);
     compute_correction(ctx, z);
@@ -265,31 +286,33 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)
 
     TIME_START;
-    int halo = 0;
+    int halo = 1;
     Heuristic3(ctx, &clusters, z, halo);
     elapsed_time = TIME_STOP;
     LOG_WRITE("H3", elapsed_time)
 
-
-    /* find density */
-    #if defined (WRITE_NGBH)
-        ordered_data_to_file(ctx);
-    #endif
-
-    /*
-    free(foreign_dp_info);
-    */
+    /* write final assignment and data */
+
+    TIME_START;
+    int* cl = (int*)MY_MALLOC(ctx -> local_n_points * sizeof(int));
+    for(int i = 0; i < ctx -> local_n_points; ++i) cl[i] = ctx -> local_datapoints[i].cluster_idx;
+    big_ordered_buffer_to_file(ctx, cl, sizeof(int), ctx -> local_n_points, OUT_CLUSTER_ASSIGN);
+    big_ordered_buffer_to_file(ctx, ctx -> local_data, sizeof(float_t), ctx -> local_n_points * ctx -> dims, OUT_DATA);
+    free(cl);
+    elapsed_time = TIME_STOP;
+    LOG_WRITE("Write results to file", elapsed_time);
 
     top_tree_free(ctx, &tree);
     kdtree_v2_free(&local_tree);
+    //clusters_free(&clusters);
 
     free(send_counts);
     free(displacements);
     //free(dp_info);
 
-    if (ctx->mpi_rank == 0) free(data);
+    original_ps.data = NULL;
     free_pointset(&original_ps);
     free(global_bin_counts_int);
diff --git a/src/tree/tree.c b/src/tree/tree.c
index 10d97f75d7712641f1ec5bff18703c048606ceef..bfa18d8fbbddb8790edee724d9506bc356ba612f 100644
--- a/src/tree/tree.c
+++ b/src/tree/tree.c
@@ -208,21 +208,6 @@ void compute_bounding_box(global_context_t *ctx) {
     MPI_Allreduce(MPI_IN_PLACE, lb, ctx->dims, MPI_MY_FLOAT, MPI_MIN, ctx->mpi_communicator);
     MPI_Allreduce(MPI_IN_PLACE, ub, ctx->dims, MPI_MY_FLOAT, MPI_MAX, ctx->mpi_communicator);
 
-    /*
-    DB_PRINT("[RANK %d]:", ctx -> mpi_rank);
-    for(size_t d = 0; d < ctx -> dims; ++d)
-    {
-        DB_PRINT("%lf ", ctx -> ub_box[d]);
-    }
-    DB_PRINT("\n");
-    */
-
-    /*
-    MPI_DB_PRINT("[BOUNDING BOX]: ");
-    for(size_t d = 0; d < ctx -> dims; ++d) MPI_DB_PRINT("d%d:[%lf, %lf] ",(int)d,
-            lb[d], ub[d]); MPI_DB_PRINT("\n");
-    */
-
 #undef local_data
 #undef lb
 #undef ub
@@ -1803,7 +1788,7 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre
         max_n_recv = MAX(max_n_recv, (idx_t)ngbh_to_recv[i]);
     }
 
-    MPI_DB_PRINT("Using default message lenght %lu\n", default_msg_len);
+    MPI_DB_PRINT("Using default message length %lu\n", default_msg_len);
 
     heap_node* __heap_batches_to_rcv = (heap_node*)MY_MALLOC((uint64_t)k * (uint64_t)max_n_recv * sizeof(heap_node));
     if( __heap_batches_to_rcv == NULL)
@@ -1980,46 +1965,6 @@ void build_local_tree(global_context_t* ctx, kdtree_v2* local_tree)
     local_tree -> root = build_tree_kdtree_v2(local_tree -> _nodes, local_tree -> n_nodes, ctx -> dims);
 }
 
-void ordered_data_to_file(global_context_t* ctx)
-{
-    //MPI_Barrier(ctx -> mpi_communicator);
-    MPI_DB_PRINT("[MASTER] writing DATA to file\n");
-    float_t* tmp_data;
-    int* ppp;
-    int* displs;
-
-    MPI_Barrier(ctx -> mpi_communicator);
-    if(I_AM_MASTER)
-    {
-        tmp_data = (float_t*)MY_MALLOC(ctx -> dims * ctx -> n_points * sizeof(float_t));
-        ppp = (int*)MY_MALLOC(ctx -> world_size * sizeof(int));
-        displs = (int*)MY_MALLOC(ctx -> world_size * sizeof(int));
-
-    }
-
-    MPI_Gather(&(ctx -> local_n_points), 1, MPI_INT, ppp, 1, MPI_INT, 0, ctx -> mpi_communicator);
-
-    if(I_AM_MASTER)
-    {
-        displs[0] = 0;
-        for(int i = 0; i < ctx -> world_size; ++i) ppp[i] = ctx -> dims * ppp[i];
-        for(int i = 1; i < ctx -> world_size; ++i) displs[i] = displs[i - 1] + ppp[i - 1];
-
-    }
-    MPI_Gatherv(ctx -> local_data, ctx -> dims * ctx -> local_n_points,
-            MPI_MY_FLOAT, tmp_data, ppp, displs, MPI_MY_FLOAT, 0, ctx -> mpi_communicator);
-
-    if(I_AM_MASTER)
-    {
-        FILE* file = fopen("bb/ordered_data.npy","w");
-        fwrite(tmp_data, sizeof(float_t), ctx -> dims * ctx -> n_points, file);
-        fclose(file);
-        free(tmp_data);
-        free(ppp);
-        free(displs);
-    }
-    MPI_Barrier(ctx -> mpi_communicator);
-}
 
 
 int foreign_owner(global_context_t* ctx, idx_t idx)
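
Usage note (illustrative, not part of the patch): the sketch below shows how the I/O helpers introduced in src/common/common.c are meant to be called, mirroring the call sequence this patch adds at the end of simulate_master_read_and_scatter() in src/main/main.c. The function name write_results_example, the include path, and the "results/..." file names are hypothetical placeholders; the helper signatures, MY_MALLOC, I_AM_MASTER and the global_context_t fields come from the patch itself.

/*
 * Minimal sketch, assuming the helpers behave as defined in common.c:
 * every rank enters the collective calls, the master gathers and writes.
 */
#include "../common/common.h"   /* path as seen from src/main (assumption) */

void write_results_example(global_context_t* ctx)
{
    /* fail early, on the master only, if the output location is not writable */
    if(I_AM_MASTER)
    {
        test_file_path("results/final_assignment.npy");
        test_file_path("results/ordered_data.npy");
    }

    /* every rank packs its local cluster labels ... */
    int* cl = (int*)MY_MALLOC(ctx -> local_n_points * sizeof(int));
    for(uint64_t i = 0; i < ctx -> local_n_points; ++i)
        cl[i] = ctx -> local_datapoints[i].cluster_idx;

    /* ... and the master receives them in chunks of at most ~10 MB and writes
     * one file; the chunking keeps every single MPI message count well below
     * INT_MAX, which a plain MPI_Gatherv of the full dataset cannot guarantee */
    big_ordered_buffer_to_file(ctx, cl, sizeof(int), ctx -> local_n_points,
                               "results/final_assignment.npy");
    big_ordered_buffer_to_file(ctx, ctx -> local_data, sizeof(float_t),
                               ctx -> local_n_points * ctx -> dims,
                               "results/ordered_data.npy");
    free(cl);

    /* for datasets that fit a single Gatherv, the simpler writer still works */
    //ordered_data_to_file(ctx, "results/ordered_data.npy");
}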