diff --git a/run_pleiadi b/run_pleiadi
index 2e1a0e102fefde234d3dbb36bab7ac0400e1ad50..a186623965a90ba944c4c9e09df97ab0a3cc5f59 100644
--- a/run_pleiadi
+++ b/run_pleiadi
@@ -1,6 +1,6 @@
 #!/bin/bash
-#SBATCH --nodes=4
+#SBATCH --nodes=1
 #SBATCH --ntasks-per-node=2
 #SBATCH --cpus-per-task=18
 #SBATCH --time=01:00:00
 
@@ -27,7 +27,7 @@ export OMP_PROC_BIND=close
 rm bb/*
 
 #time mpirun -n ${SLURM_NTASKS} --map-by ppr:1:node:PE=${SLURM_CPUS_PER_TASK} main
-time mpirun -n ${SLURM_NTASKS} --map-by ppr:1:socket:PE=${SLURM_CPUS_PER_TASK} main
+time mpirun -n ${SLURM_NTASKS} --mca orte_base_help_aggregate 0 --map-by ppr:1:socket:PE=${SLURM_CPUS_PER_TASK} main
 
 #time mpirun -n ${SLURM_NTASKS} main
 #time python3 check.py
diff --git a/src/common/common.c b/src/common/common.c
index 0f8858ba6d5e11f209b2d60f0961a8821be0a0fd..2a437caffa5896e455f85dc169729fe6717a7e40 100644
--- a/src/common/common.c
+++ b/src/common/common.c
@@ -23,6 +23,38 @@ void get_context(global_context_t* ctx)
     ctx -> halo_datapoints = NULL;
     ctx -> local_datapoints = NULL;
     ctx -> __recv_heap_buffers = NULL;
+    ctx -> __local_heap_buffers = NULL;
+}
+
+void print_error_code(int err)
+{
+    switch (err)
+    {
+        case MPI_SUCCESS:
+            DB_PRINT("MPI_SUCCESS\n");
+            break;
+        case MPI_ERR_ARG:
+            DB_PRINT("MPI_ERR_ARG\n");
+            break;
+        case MPI_ERR_COMM:
+            DB_PRINT("MPI_ERR_COMM\n");
+            break;
+        case MPI_ERR_DISP:
+            DB_PRINT("MPI_ERR_DISP\n");
+            break;
+        case MPI_ERR_INFO:
+            DB_PRINT("MPI_ERR_INFO\n");
+            break;
+        case MPI_ERR_SIZE:
+            DB_PRINT("MPI_ERR_SIZE\n");
+            break;
+        case MPI_ERR_OTHER:
+            DB_PRINT("MPI_ERR_OTHER\n");
+            break;
+        default:
+            break;
+
+    }
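+
+    /* codes not matched above (e.g. MPI_ERR_WIN) fall through silently; the
+       cases listed are the ones the one-sided MPI calls used elsewhere in
+       the code are most likely to hand back */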
 }
 
 void free_context(global_context_t* ctx)
@@ -31,11 +63,14 @@
     FREE_NOT_NULL(ctx -> local_data);
     FREE_NOT_NULL(ctx -> ub_box);
     FREE_NOT_NULL(ctx -> lb_box);
+    //FREE_NOT_NULL(ctx -> __local_heap_buffers);
+    if(ctx -> __local_heap_buffers) MPI_Free_mem(ctx -> __local_heap_buffers);
-    if(ctx -> local_datapoints)
-    {
-        for(int i = 0; i < ctx -> local_n_points; ++i) FREE_NOT_NULL(ctx -> local_datapoints[i].ngbh.data);
-    }
+
+    //if(ctx -> local_datapoints)
+    //{
+    //    for(int i = 0; i < ctx -> local_n_points; ++i) FREE_NOT_NULL(ctx -> local_datapoints[i].ngbh.data);
+    //}
 
     FREE_NOT_NULL(ctx -> local_datapoints);
 
     if(ctx -> halo_datapoints)
diff --git a/src/common/common.h b/src/common/common.h
index 1acad4694e65561c60126088db5286b4b760d846..1af32d9019427906a54731f18a4454717bd701aa 100644
--- a/src/common/common.h
+++ b/src/common/common.h
@@ -10,14 +10,14 @@
 typedef struct datapoint_info_t
 {
     idx_t array_idx;
-    heap ngbh;
-    float_t g;
-    float_t log_rho;
-    float_t log_rho_c;
-    float_t log_rho_err;
-    idx_t kstar;
-    int is_center;
-    int cluster_idx;
+    heap    ngbh;
+    float_t g;
+    float_t log_rho;
+    float_t log_rho_c;
+    float_t log_rho_err;
+    idx_t   kstar;
+    int     is_center;
+    int     cluster_idx;
 } datapoint_info_t;
 
 #define MAX(A,B) ((A) > (B) ? (A) : (B))
@@ -111,6 +111,7 @@ struct global_context_t
     int      world_size;
     int      mpi_rank;
     int      __processor_name_len;
+    idx_t    k; /* neighbours per heap, set in mpi_ngbh_search() */
     float_t* local_data;
     float_t* lb_box;
     float_t* ub_box;
@@ -130,6 +131,7 @@ struct global_context_t
     char processor_mame[MPI_MAX_PROCESSOR_NAME];
     MPI_Comm mpi_communicator;
     heap_node* __recv_heap_buffers;
+    heap_node* __local_heap_buffers;
 };
 
 struct pointset_t
@@ -165,5 +167,6 @@ void lu_dynamic_array_pushBack(lu_dynamic_array_t * a, idx_t p);
 void lu_dynamic_array_Reset(lu_dynamic_array_t * a);
 void lu_dynamic_array_reserve(lu_dynamic_array_t * a, idx_t n);
 void lu_dynamic_array_init(lu_dynamic_array_t * a);
+void print_error_code(int err);
 
diff --git a/src/tree/heap.h b/src/tree/heap.h
index 3ecd6e184edebfec0ceab72d3f815f3da328bd4c..513170efc9b5ca225934dc444de99e409f09d809 100644
--- a/src/tree/heap.h
+++ b/src/tree/heap.h
@@ -8,6 +8,8 @@
 #define T double
 #define DATA_DIMS 0
 
+#define HERE printf("%d reached line %d\n", ctx -> mpi_rank, __LINE__);
+
 #ifdef USE_FLOAT32
 #define FLOAT_TYPE float
 #else
diff --git a/src/tree/tree.c b/src/tree/tree.c
index 2dcb8eaa1096e72fce95cc25be5e4a6e726b455f..6b0f30549652d2d63cde1ca20f2ae0b2fa2e0c78 100644
--- a/src/tree/tree.c
+++ b/src/tree/tree.c
@@ -41,7 +41,6 @@
 #define MPI_MY_FLOAT MPI_DOUBLE
 #endif
 
-#define HERE printf("%d reached line %d\n", ctx -> mpi_rank, __LINE__);
 
 #define I_AM_MASTER ctx->mpi_rank == 0
 
@@ -1547,18 +1546,28 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre
     /* local search */
     /* print diagnostics */
     print_diagnositcs(ctx, k);
+    ctx -> k = (idx_t)k;
 
     TIME_DEF;
     double elapsed_time;
     TIME_START;
     MPI_Barrier(ctx -> mpi_communicator);
+
+    //ctx -> __local_heap_buffers = (heap_node*)malloc(ctx -> local_n_points * k * sizeof(heap_node));
+    /* one flat block of local_n_points * k heap_nodes, later exposed as an RMA window */
+    MPI_Alloc_mem(ctx -> local_n_points * k * sizeof(heap_node), MPI_INFO_NULL, &(ctx -> __local_heap_buffers));
+
     #pragma omp parallel for
     for(int p = 0; p < ctx -> local_n_points; ++p)
     {
         idx_t idx = local_tree -> _nodes[p].array_idx;
         /* actually we want to preserve the heap to then insert guesses from other nodes */
-        dp_info[idx].ngbh = knn_kdtree_v2_no_heapsort(local_tree -> _nodes[p].data, local_tree -> root, k);
+        dp_info[idx].ngbh.data  = ctx -> __local_heap_buffers + k * idx;
+        dp_info[idx].ngbh.count = 0;
+        dp_info[idx].ngbh.N     = k;
+
+        //dp_info[idx].ngbh = knn_kdtree_v2_no_heapsort(local_tree -> _nodes[p].data, local_tree -> root, k);
+        knn_sub_tree_search_kdtree_v2(local_tree -> _nodes[p].data, local_tree -> root, &(dp_info[idx].ngbh));
+
         convert_heap_idx_to_global(ctx, &(dp_info[idx].ngbh));
         dp_info[idx].cluster_idx = -1;
         dp_info[idx].is_center = 0;
@@ -2741,6 +2750,369 @@ void compute_density_kstarnn(global_context_t* ctx, const float_t d, int verbose
     return;
 
+}
+
+float_t get_j_ksel_dist(global_context_t* ctx, idx_t j, idx_t ksel, MPI_Win* exposed_ngbh)
+{
+    int owner = foreign_owner(ctx, j);
+    idx_t k = ctx -> k;
+    /* find out whether the datapoint is local or halo */
+    if(owner == ctx -> mpi_rank)
+    {
+        idx_t pos = j - ctx -> idx_start;
+        return ctx -> local_datapoints[pos].ngbh.data[ksel].value;
+    }
+    else
+    {
+        //RMA
+        heap_node el;
+        idx_t pos = j - ctx -> rank_idx_start[owner];
+        MPI_Status status;
+        MPI_Request request;
+        MPI_Rget(&el, sizeof(heap_node), MPI_CHAR, owner, (MPI_Aint)((pos * k + ksel) * sizeof(heap_node)), sizeof(heap_node), MPI_CHAR, *exposed_ngbh, &request);
+        MPI_Wait(&request,&status);
+        return el.value;
+    }
+}
+
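+/*
+ * every rank's neighbour heaps live in the single flat __local_heap_buffers
+ * array, exposed through an MPI window with a displacement unit of one byte:
+ * the ksel-th neighbour of global point j sits on rank owner at byte offset
+ * (pos * k + ksel) * sizeof(heap_node), where pos = j - rank_idx_start[owner].
+ * each remote lookup above is a blocking MPI_Rget/MPI_Wait pair, i.e. one
+ * network round trip per neighbour read.
+ */
+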
+void compute_density_kstarnn_rma(global_context_t* ctx, const float_t d, int verbose)
+{
+    /*
+     * Point density computation:
+     * args:
+     * - ctx     : global context (local datapoints, neighbour heaps, MPI state)
+     * - d       : intrinsic dimension of the dataset
+     * - verbose : print timings
+     */
+
+    MPI_Win  exposed_ngbh;
+    MPI_Info info;
+
+    MPI_Barrier(ctx -> mpi_communicator);
+    idx_t k = ctx -> local_datapoints[0].ngbh.N;
+    MPI_DB_PRINT("%lu %p\n", k, ctx -> __local_heap_buffers);
+
+    //int* test_buffer = (int*)malloc(k * sizeof(int));
+    //for(int i = 0; i < k; ++i) test_buffer[i] = (ctx -> mpi_rank + 1) * 1024;
+
+    MPI_Win_create( ctx -> __local_heap_buffers,
+                    ctx -> local_n_points * k * sizeof(heap_node),
+                    1, MPI_INFO_NULL,
+                    ctx -> mpi_communicator,
+                    &exposed_ngbh);
+
+    MPI_Win_fence(0, exposed_ngbh);
+
+    struct timespec start_tot, finish_tot;
+    double elapsed_tot;
+
+    datapoint_info_t* local_datapoints = ctx -> local_datapoints;
+
+    //DB_PRINT("rank %d pos %lu own %d %lu %lu %lu\n", ctx -> mpi_rank, pos, owner, k, ksel, (pos * k + ksel) * sizeof(heap_node));
+    if(verbose)
+    {
+        printf("Density and k* estimation:\n");
+        clock_gettime(CLOCK_MONOTONIC, &start_tot);
+    }
+
+    idx_t kMAX = ctx -> local_datapoints[0].ngbh.N - 1;
+
+    float_t omega = 0.;
+    if(sizeof(float_t) == sizeof(float)){ omega = powf(PI_F,d/2)/tgammaf(d/2.0f + 1.0f);}
+    else{omega = pow(M_PI,d/2.)/tgamma(d/2.0 + 1.0);}
+
+    #pragma omp parallel for
+    for(idx_t i = 0; i < ctx -> local_n_points; ++i)
+    {
+        idx_t j = 4;
+        idx_t k;
+        float_t dL  = 0.;
+        float_t vvi = 0.;
+        float_t vvj = 0.;
+        float_t vp  = 0.;
+        while(j < kMAX && dL < DTHR)
+        {
+            idx_t ksel = j - 1;
+            vvi = omega * pow(local_datapoints[i].ngbh.data[ksel].value,d/2.);
+
+            idx_t jj = local_datapoints[i].ngbh.data[j].array_idx;
+
+            /*
+             * note: jj can be a halo point, so its distance may have to be
+             * fetched from the rank that owns it
+             */
+            float_t dist_jj = get_j_ksel_dist(ctx, jj, ksel, &exposed_ngbh);
+            vvj = omega * pow(dist_jj,d/2.);
+
+            vp = (vvi + vvj)*(vvi + vvj);
+            dL = -2.0 * ksel * log(4.*vvi*vvj/vp);
+            j = j + 1;
+        }
+        if(j == kMAX)
+        {
+            k = j - 1;
+            vvi = omega * pow(ctx -> local_datapoints[i].ngbh.data[k].value,d/2.);
+        }
+        else
+        {
+            k = j - 2;
+        }
+        local_datapoints[i].kstar = k;
+        local_datapoints[i].log_rho = log((float_t)(k)/vvi/((float_t)(ctx -> n_points)));
+        //dp_info[i].log_rho = log((float_t)(k)) - log(vvi) -log((float_t)(points));
+        local_datapoints[i].log_rho_err = 1.0/sqrt((float_t)k); //(float_t)(-Q_rsqrt((float)k));
+        local_datapoints[i].g = local_datapoints[i].log_rho - local_datapoints[i].log_rho_err;
+    }
+
+    if(verbose)
+    {
+        clock_gettime(CLOCK_MONOTONIC, &finish_tot);
+        elapsed_tot  = (finish_tot.tv_sec - start_tot.tv_sec);
+        elapsed_tot += (finish_tot.tv_nsec - start_tot.tv_nsec) / 1000000000.0;
+        printf("\tTotal time: %.3lfs\n\n", elapsed_tot);
+    }
+
+    MPI_Win_fence(0, exposed_ngbh);
+    MPI_Barrier(ctx -> mpi_communicator);
+    MPI_Win_free(&exposed_ngbh);
+
+    #if defined(WRITE_DENSITY)
+    /* densities */
+    float_t* den = (float_t*)malloc(ctx -> local_n_points * sizeof(float_t));
+    idx_t*   ks  = (idx_t*)malloc(ctx -> local_n_points * sizeof(idx_t));
+    for(int i = 0; i < ctx -> local_n_points; ++i) den[i] = ctx -> local_datapoints[i].log_rho;
+    for(int i = 0; i < ctx -> local_n_points; ++i) ks[i]  = ctx -> local_datapoints[i].kstar;
+
+    ordered_buffer_to_file(ctx, den, sizeof(float_t), ctx -> local_n_points, "bb/ordered_density.npy");
+    ordered_buffer_to_file(ctx, ks, sizeof(idx_t), ctx -> local_n_points, "bb/ks.npy");
+    ordered_data_to_file(ctx);
+    free(den);
+    free(ks);
+    #endif
+    return;
+
+}
+
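+/*
+ * v2 below defers communication instead of blocking on it: a remote lookup
+ * clears flags[i], posts a nonblocking MPI_Get into tmp_heap_nodes[i] and
+ * returns a dummy value; the caller then parks point i (completed[i] = 0),
+ * finishes the sweep, completes all outstanding gets with one MPI_Win_fence,
+ * and on the next pass resumes the parked points from last_j[i], picking up
+ * the cached value through the flags[i] == 0 branch.
+ */
+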
+float_t get_j_ksel_dist_v2(global_context_t* ctx, idx_t i, idx_t j, idx_t ksel, int* flags, heap_node* tmp_heap_nodes, MPI_Win* exposed_ngbh)
+{
+    if(flags[i])
+    {
+        int owner = foreign_owner(ctx, j);
+        idx_t k = ctx -> k;
+        /* find out whether the datapoint is local or halo */
+        if(owner == ctx -> mpi_rank)
+        {
+            idx_t pos = j - ctx -> idx_start;
+            return ctx -> local_datapoints[pos].ngbh.data[ksel].value;
+        }
+        else
+        {
+            //RMA
+            flags[i] = 0;
+            idx_t pos = j - ctx -> rank_idx_start[owner];
+            MPI_Get(tmp_heap_nodes + i, sizeof(heap_node), MPI_CHAR, owner, (MPI_Aint)((pos * k + ksel) * sizeof(heap_node)), sizeof(heap_node), MPI_CHAR, *exposed_ngbh);
+            //if(ctx -> mpi_rank == 0) DB_PRINT("rvcd %lu %lf\n", el.array_idx, el.value);
+            return 0;
+        }
+    }
+    else
+    {
+        flags[i] = 1;
+        return tmp_heap_nodes[i].value;
+    }
+}
+
+void compute_density_kstarnn_rma_v2(global_context_t* ctx, const float_t d, int verbose)
+{
+    /*
+     * Point density computation:
+     * args:
+     * - ctx     : global context (local datapoints, neighbour heaps, MPI state)
+     * - d       : intrinsic dimension of the dataset
+     * - verbose : print timings
+     */
+
+    MPI_Win  exposed_ngbh;
+    MPI_Info info;
+
+    MPI_Barrier(ctx -> mpi_communicator);
+    idx_t k = ctx -> local_datapoints[0].ngbh.N;
+    MPI_DB_PRINT("%lu %p\n", k, ctx -> __local_heap_buffers);
+
+    MPI_Win_create( ctx -> __local_heap_buffers,
+                    ctx -> local_n_points * k * sizeof(heap_node),
+                    1, MPI_INFO_NULL,
+                    ctx -> mpi_communicator,
+                    &exposed_ngbh);
+
+    MPI_Win_fence(0, exposed_ngbh);
+
+    struct timespec start_tot, finish_tot;
+    double elapsed_tot;
+
+    datapoint_info_t* local_datapoints = ctx -> local_datapoints;
+
+    if(verbose)
+    {
+        printf("Density and k* estimation:\n");
+        clock_gettime(CLOCK_MONOTONIC, &start_tot);
+    }
+
+    idx_t kMAX = ctx -> local_datapoints[0].ngbh.N - 1;
+
+    float_t omega = 0.;
+    if(sizeof(float_t) == sizeof(float)){ omega = powf(PI_F,d/2)/tgammaf(d/2.0f + 1.0f);}
+    else{omega = pow(M_PI,d/2.)/tgamma(d/2.0 + 1.0);}
+
+    /*
+     * Iterative: wait after each pass for communications to finish
+     */
+
+    int finished = 0;
+    int fin_num  = 0;
+
+    int* flags     = (int*)malloc(ctx -> local_n_points * sizeof(int));
+    int* last_j    = (int*)malloc(ctx -> local_n_points * sizeof(int));
+    int* completed = (int*)malloc(ctx -> local_n_points * sizeof(int));
+    heap_node* tmp_heap_nodes = (heap_node*)malloc(ctx -> local_n_points * sizeof(heap_node));
+
+    for(int i = 0; i < ctx -> local_n_points; ++i)
+    {
+        flags[i]     = 1;
+        last_j[i]    = 4;
+        completed[i] = 0;
+    }
+
+    while(!finished)
+    {
+        finished = 1;
+        #pragma omp parallel for
+        for(idx_t i = 0; i < ctx -> local_n_points; ++i)
+        {
+            if(!completed[i])
+            {
+                idx_t j = last_j[i];
+                idx_t k;
+                float_t dL  = 0.;
+                float_t vvi = 0.;
+                float_t vvj = 0.;
+                float_t vp  = 0.;
+                int dl_DTHR_flag = 1;
+                while(j < kMAX && dl_DTHR_flag)
+                {
+                    idx_t ksel = j - 1;
+                    vvi = omega * pow(local_datapoints[i].ngbh.data[ksel].value,d/2.);
+
+                    idx_t jj = local_datapoints[i].ngbh.data[j].array_idx;
+
+                    /*
+                     * note: jj can be a halo point, so its distance may have
+                     * to be fetched from the rank that owns it
+                     */
+                    float_t dist_jj = get_j_ksel_dist_v2(ctx, i, jj, ksel, flags, tmp_heap_nodes, &exposed_ngbh);
+                    if(flags[i])
+                    {
+                        //the ngbh node was available: use it; otherwise the
+                        //get is still in flight and we must wait for comms
+                        vvj = omega * pow(dist_jj,d/2.);
+
+                        vp = (vvi + vvj)*(vvi + vvj);
+                        dL = -2.0 * ksel * log(4.*vvi*vvj/vp);
+
+                        dl_DTHR_flag = dL < DTHR;
+                        j = j + 1;
+                        last_j[i] = j;
+                    }
+                    else
+                    {
+                        break;
+                    }
+                }
+                if(j == kMAX || !dl_DTHR_flag)
+                {
+                    if(j == kMAX)
+                    {
+                        k = j - 1;
+                        vvi = omega * pow(ctx -> local_datapoints[i].ngbh.data[k].value,d/2.);
+                    }
+                    else
+                    {
+                        k = j - 2;
+                    }
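+
+                    /* the scan for point i is over: either the likelihood
+                       ratio test tripped (k = j - 2) or the heap ran out of
+                       neighbours (k = kMAX - 1); store k* and the volume
+                       based density estimate, as in compute_density_kstarnn() */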
+                    local_datapoints[i].kstar = k;
+                    local_datapoints[i].log_rho = log((float_t)(k)/vvi/((float_t)(ctx -> n_points)));
+                    //dp_info[i].log_rho = log((float_t)(k)) - log(vvi) -log((float_t)(points));
+                    local_datapoints[i].log_rho_err = 1.0/sqrt((float_t)k); //(float_t)(-Q_rsqrt((float)k));
+                    local_datapoints[i].g = local_datapoints[i].log_rho - local_datapoints[i].log_rho_err;
+
+                    completed[i] = 1;
+                    #pragma omp atomic update
+                    finished = finished & 1;
+
+                    #pragma omp atomic update
+                    fin_num++;
+                }
+                else
+                {
+                    completed[i] = 0;
+                    #pragma omp atomic update
+                    finished = finished & 0;
+                }
+            }
+
+        }
+
+        //DB_PRINT("Rank %d fin %d/%lu\n", ctx -> mpi_rank, fin_num, ctx -> local_n_points);
+        //fence to complete all outstanding gets before the next sweep
+        MPI_Win_fence(0, exposed_ngbh);
+        MPI_Allreduce(MPI_IN_PLACE, &finished, 1, MPI_INT, MPI_LAND, ctx -> mpi_communicator);
+    }
+
+    if(verbose)
+    {
+        clock_gettime(CLOCK_MONOTONIC, &finish_tot);
+        elapsed_tot  = (finish_tot.tv_sec - start_tot.tv_sec);
+        elapsed_tot += (finish_tot.tv_nsec - start_tot.tv_nsec) / 1000000000.0;
+        printf("\tTotal time: %.3lfs\n\n", elapsed_tot);
+    }
+
+    free(flags);
+    free(tmp_heap_nodes);
+    free(completed);
+    free(last_j);
+
+    MPI_Win_fence(0, exposed_ngbh);
+    MPI_Barrier(ctx -> mpi_communicator);
+    MPI_Win_free(&exposed_ngbh);
+
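+    /*
+     * optional diagnostics: with -DWRITE_DENSITY the per-point log-density
+     * and k* are dumped in global point order under bb/, the directory the
+     * batch script wipes before each run
+     */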
MPI_DB_PRINT("ID %lf \n",id); + + //TIME_START; + //datapoint_info_t** foreign_dp_info = (datapoint_info_t**)malloc(ctx -> world_size * sizeof(datapoint_info_t*)); + //find_foreign_nodes(ctx, dp_info, foreign_dp_info); + //elapsed_time = TIME_STOP; + //LOG_WRITE("Finding points to request the ngbh", elapsed_time) + TIME_START; - compute_density_kstarnn(ctx, id, MY_FALSE); + + ctx -> local_datapoints = dp_info; + //compute_density_kstarnn_rma_v2(ctx, id, MY_FALSE); + compute_density_kstarnn_rma(ctx, id, MY_FALSE); + //compute_density_kstarnn(ctx, id, MY_FALSE); compute_correction(ctx, 2); elapsed_time = TIME_STOP; LOG_WRITE("Density estimate", elapsed_time) - TIME_START; - Heuristic1(ctx, MY_FALSE); - elapsed_time = TIME_STOP; - LOG_WRITE("H1", elapsed_time) + //TIME_START; + //Heuristic1(ctx, MY_FALSE); + //elapsed_time = TIME_STOP; + //LOG_WRITE("H1", elapsed_time) /* find density */ @@ -3780,20 +4157,6 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx) #endif /* - for(int i = 0; i < ctx -> local_n_points; ++i) - { - free(dp_info[i].ngbh.data); - } - - for(int i = 0; i < ctx -> world_size; ++i) - { - for(int j = 0; j < ctx -> n_halo_points_recv[i]; ++j) - { - free(foreign_dp_info[i][j].ngbh.data); - } - free(foreign_dp_info[i]); - } - free(foreign_dp_info); */ @@ -3810,5 +4173,4 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx) original_ps.data = NULL; free_pointset(&original_ps); free(global_bin_counts_int); - }