diff --git a/Makefile b/Makefile
index 9c34660ebdf14daa9a557904972344b9d18bc740..ece6194590362c0c3ea03ea565b3d7053adb30a5 100644
--- a/Makefile
+++ b/Makefile
@@ -1,5 +1,5 @@
 CC=mpicc
-CFLAGS=-O3 -g -fopenmp
+CFLAGS=-O3 -march=native -flto -funroll-loops -fopenmp
 LDFLAGS=-lm
 
 all: main
diff --git a/src/adp/adp.c b/src/adp/adp.c
index 1f1c229d2d3ae419544d667dec52edc03b364909..0e1666f505a1aa6495fb3c0de2e2d05963d26173 100644
--- a/src/adp/adp.c
+++ b/src/adp/adp.c
@@ -733,7 +733,7 @@ clusters_t Heuristic1(global_context_t *ctx)
         }
     }
 
-    /*
+    /**
      * optimized version
      *
      * Generate a mask that keeps track of the point has been eliminating the
@@ -741,7 +741,7 @@ clusters_t Heuristic1(global_context_t *ctx)
      * ends, center, removed centers, and max_rho arrays are populated
      *
      * optimized v2 use a queue of center removal and then exchange them
-     */
+     **/
 
     heap_node* to_remove_mask = (heap_node*)MY_MALLOC(n*sizeof(heap_node));
     for(idx_t p = 0; p < n; ++p)
@@ -779,10 +779,118 @@ clusters_t Heuristic1(global_context_t *ctx)
 
 
     TIME_START;
+
+//#define EXP_CENTER_PRUNING
+#if defined(EXP_CENTER_PRUNING)
+    int all_have_finished = 0;
+    int i_have_finished = 0;
+
+    datapoint_info_t* tmp_datapoints = (datapoint_info_t*)MY_MALLOC(ctx -> local_n_points * sizeof(datapoint_info_t));
+
+    // use a strategy similar to the one employed in density estimation
+    // vertical loop, first on k then on n
+    // request points
+    // then process the points
+
+
+    MPI_Win_fence(MPI_MODE_NOPUT, win_datapoints);
+    //MPI_Win_lock_all(0, win_datapoints);
+    MPI_DB_PRINT("Using vertical loop on centers pruning\n");
+    int kmax = ctx -> k;
+    for(idx_t j = 1; j < kmax; ++j)
+    {
+        i_have_finished = 1;
+
+        // request points
+        #pragma omp parallel for schedule(dynamic)
+        for(idx_t p = 0; p < n; ++p)
+        {
+
+            datapoint_info_t i_point = *(dp_info_ptrs[p]);
+            if(j < i_point.kstar + 1)
+            {
+                #pragma omp atomic write
+                i_have_finished = 0;
+
+                idx_t jidx = i_point.ngbh[j].array_idx;
+                int owner = foreign_owner(ctx, jidx);
+                /* find if datapoint is halo or not */
+                idx_t jpos = jidx - ctx -> rank_idx_start[owner];
+                if(owner == ctx -> mpi_rank)
+                {
+                    tmp_datapoints[p] = ctx -> local_datapoints[jpos];
+                }
+                else
+                {
+                    MPI_Get(tmp_datapoints + p, sizeof(datapoint_info_t), MPI_BYTE, owner,
+                            jpos * sizeof(datapoint_info_t), sizeof(datapoint_info_t), MPI_BYTE, win_datapoints);
+                }
+            }
+
+        }
+
+        //MPI_Win_flush_all(win_datapoints);
+        MPI_Win_fence(MPI_MODE_NOPUT ^ MPI_MODE_NOSTORE, win_datapoints);
+        // wait for all comms to happen
+
+        #pragma omp parallel for schedule(dynamic)
+        for(idx_t p = 0; p < n; ++p)
+        {
+            datapoint_info_t i_point = *(dp_info_ptrs[p]);
+            if(j < i_point.kstar + 1)
+            {
+                datapoint_info_t j_point = tmp_datapoints[p];
+                idx_t jidx = j_point.array_idx;
+
+                if(j_point.is_center && i_point.g > j_point.g)
+                {
+                    // set the lock at position i
+                    // actually is the p-th point
+                    int owner = foreign_owner(ctx, jidx);
+                    // if local, process it
+                    if(owner == ctx -> mpi_rank)
+                    {
+                        // acquire the lock
+                        idx_t jpos = jidx - ctx -> idx_start;
+                        omp_set_lock(lock_array + jpos);
+                        if(i_point.g > to_remove_mask[jpos].value)
+                        {
+                            to_remove_mask[jpos].array_idx = i_point.array_idx;
+                            to_remove_mask[jpos].value = i_point.g;
+                        }
+                        omp_unset_lock(lock_array + jpos);
+                    }
+                    // otherwise enqueue for sending
+                    else
+                    {
+                        center_removal_t element = {.rank = owner, .source_id = i_point.array_idx,
+                                                    .target_id = j_point.array_idx, .source_density = i_point.g};
+                        enqueue_center_removal(removal_queues + p, element);
+                    }
+                }
+
+            }
+
+        }
+
+        //MPI_Allreduce(&i_have_finished, &all_have_finished, 1, MPI_INT, MPI_LAND, ctx -> mpi_communicator);
+        //if(all_have_finished) break;
+        if(i_have_finished) break;
+    }
+
+    //MPI_Win_unlock_all(win_datapoints);
+    MPI_Win_fence(0, win_datapoints);
+    free(tmp_datapoints);
+
+#else
     MPI_Win_fence(MPI_MODE_NOPUT, win_datapoints);
 
+    idx_t n_foreign_req = 0;
+    idx_t n_local_req = 0;
+    float_t elapsed_proc = 0.;
+
     #if !defined(THREAD_FUNNELED)
-    #pragma omp parallel for schedule(dynamic)
+    #pragma omp parallel for schedule(dynamic) reduction(+:elapsed_proc, n_local_req, n_foreign_req)
     #endif
     for(idx_t p = 0; p < n; ++p)
     {
@@ -791,19 +899,31 @@ clusters_t Heuristic1(global_context_t *ctx)
         for(idx_t j = 1; j < i_point.kstar + 1; ++j)
         {
             idx_t jidx = i_point.ngbh[j].array_idx;
-
+            struct timespec __start, __end;
+            clock_gettime(CLOCK_MONOTONIC,&__start);
             datapoint_info_t j_point = find_possibly_halo_datapoint_rma(ctx, jidx, win_datapoints);
+            clock_gettime(CLOCK_MONOTONIC,&__end);
+            elapsed_proc += (double)(__end.tv_sec - __start.tv_sec) + (__end.tv_nsec - __start.tv_nsec)/1e9;
+
+            int owner = foreign_owner(ctx, jidx);
+            if(owner == ctx -> mpi_rank)
+            {
+                n_local_req++;
+            }
+            else
+            {
+                n_foreign_req++;
+            }
 
             if(j_point.is_center && i_point.g > j_point.g)
             {
                 // set the lock at position i
                 // actually is the p-th point
-                int owner = foreign_owner(ctx, jidx);
-                //if local process it
-                idx_t jpos = jidx - ctx -> idx_start;
+                // if local, process it
                 if(owner == ctx -> mpi_rank)
                 {
-                    //acquire the lock
+                    // acquire the lock
+                    idx_t jpos = jidx - ctx -> idx_start;
                     omp_set_lock(lock_array + jpos);
                     if(i_point.g > to_remove_mask[jpos].value)
                     {
@@ -812,7 +932,7 @@ clusters_t Heuristic1(global_context_t *ctx)
                     }
                     omp_unset_lock(lock_array + jpos);
                 }
-                //otherwise enqueue for sending
+                // otherwise enqueue for sending
                 else
                 {
                     center_removal_t element = {.rank = owner, .source_id = i_point.array_idx,
@@ -822,9 +942,14 @@ clusters_t Heuristic1(global_context_t *ctx)
             }
         }
     }
 
-    MPI_Win_fence(MPI_MODE_NOPUT, win_datapoints);
+    MPI_Barrier(ctx -> mpi_communicator);
+    DB_PRINT("Rank %d: foreign point requests %lu out of %lu -> fraction %.2lf, time %.2lfs\n",
+            ctx -> mpi_rank, n_foreign_req, n_local_req + n_foreign_req, (float)n_foreign_req/(float)(n_local_req + n_foreign_req), elapsed_proc);
+    MPI_Barrier(ctx -> mpi_communicator);
+#endif
+
     //assemble arrays into a single buffer
 
     elapsed_time = TIME_STOP;
 
diff --git a/src/common/common.c b/src/common/common.c
index 71f57f58f39e30ee4148e09479826868a77b5c00..cccd653bf2cbf85e9a0dffd1af798294fa9ace4b 100644
--- a/src/common/common.c
+++ b/src/common/common.c
@@ -21,6 +21,8 @@ void get_context(global_context_t* ctx)
     ctx -> __local_heap_buffers = NULL;
     ctx -> input_data_in_float32 = -1;
     ctx -> dims = 0;
+    ctx -> k = 300;
+    ctx -> z = 3;
 }
 
 void get_dataset_diagnostics(global_context_t* ctx, float_t* data)
diff --git a/src/common/common.h b/src/common/common.h
index d9bdc7e799df5bcc60a1a8488d7a05002e168beb..a8322dc1295142b88e0f0ace432a867d3ab0de1d 100644
--- a/src/common/common.h
+++ b/src/common/common.h
@@ -149,6 +149,7 @@ typedef struct global_context_t
     int mpi_rank;            //rank of the processor
     uint32_t dims;           //number of dimensions of the dataset
     idx_t k;                 //number of neighbors
+    float_t z;               //z parameter, used by compute_correction and Heuristic3
     float_t* local_data;     //pointer to the dataset stored into the node
     float_t* lb_box;         //bounding box of the dataset
     float_t* ub_box;         //bounding box of the dataset
diff --git a/src/main/main.c b/src/main/main.c
index 21cde332d3cffff181dc06847e0e91c5419a0eb7..4d91f77aa431ebf5f85654a7a791f9ca778f0449 100644
--- a/src/main/main.c
+++ b/src/main/main.c
@@ -44,6 +44,8 @@ struct option long_options[] =
     {"in-dims" , required_argument, 0, 'd'},
     {"out-data", optional_argument, 0, 'o'},
     {"out-assignment", optional_argument, 0, 'a'},
+    {"kngbh", optional_argument, 0, 'k'},
+    {"z", optional_argument, 0, 'z'},
    {"help", optional_argument, 0, 'h'},
     {0, 0, 0, 0}
 };
@@ -59,7 +61,9 @@ const char* help = "Distributed Advanced Density Peak\n"\
 "                      mpi tasks and datapoints are ordered default is `out_data` \n"\
 " -a --out-assignment  (optional) output path for the cluster assignment output ranges [0 ... Nc - 1]\n"\
 "                      for core points halo points have indices [-Nc ... -1] conversion\n"\
-"                      of idx for an halo point is cluster_idx = -halo_idx - 1, default is `out_assignment`\n";
+"                      of idx for an halo point is cluster_idx = -halo_idx - 1, default is `out_assignment`\n"\
+" -k --kngbh           (optional) number of nearest neighbors to compute, default is 300\n"\
+" -z --z               (optional) value of the z parameter used for density correction and cluster merging, default is 3\n";
 
 void parse_args(global_context_t* ctx, int argc, char** argv)
 {
@@ -70,7 +74,7 @@ void parse_args(global_context_t* ctx, int argc, char** argv)
     snprintf(ctx -> output_assignment_file, DEFAULT_STR_LEN, "%s", OUT_CLUSTER_ASSIGN);
     snprintf(ctx -> output_data_file, DEFAULT_STR_LEN, "%s", OUT_DATA);
 
-    while((opt = getopt_long(argc, argv, "i:t:d:o:a:h", long_options, NULL)) != -1)
+    while((opt = getopt_long(argc, argv, "i:t:d:o:a:k:z:h", long_options, NULL)) != -1)
     {
         switch(opt)
         {
@@ -103,10 +107,17 @@ void parse_args(global_context_t* ctx, int argc, char** argv)
             case 'a':
                 strncpy(ctx -> output_assignment_file, optarg, DEFAULT_STR_LEN);
                 break;
+            case 'k':
+                ctx -> k = atoi(optarg);
+                break;
+            case 'z':
+                ctx -> z = atof(optarg);
+                break;
             case 'h':
                 mpi_printf(ctx, "%s\n", help);
                 MPI_Finalize();
                 exit(0);
+
             default:
                 mpi_printf(ctx, "%s\n", help);
                 MPI_Finalize();
@@ -121,6 +132,30 @@ void parse_args(global_context_t* ctx, int argc, char** argv)
     if(err){MPI_Finalize(); exit(1);};
 }
 
+void print_hello(global_context_t* ctx)
+{
+    char * hello = ""
+"     _           _        \n"
+"  __| | __ _  __| |_ __   \n"
+" / _` |/ _` |/ _` | '_ \\  \n"
+"| (_| | (_| | (_| | |_) | \n"
+" \\__,_|\\__,_|\\__,_| .__/  \n"
+"                   |_|    \n"
+"   Distributed Advanced Density Peak";
+
+    mpi_printf(ctx, "%s\n\n", hello);
+    mpi_printf(ctx, "Configs: \n");
+    mpi_printf(ctx, "Data file -> %s\n", ctx -> input_data_file);
+    mpi_printf(ctx, "Input Type -> float%d\n", ctx -> input_data_in_float32 ? 32 : 64);
+    mpi_printf(ctx, "Dimensions -> %d\n", ctx -> dims);
+    mpi_printf(ctx, "Output data file -> %s\n", ctx -> output_data_file);
+    mpi_printf(ctx, "Output assignment file -> %s\n", ctx -> output_assignment_file);
+    mpi_printf(ctx, "k -> %lu\n", ctx -> k);
+    mpi_printf(ctx, "Z -> %lf\n", ctx -> z);
+    mpi_printf(ctx, "\nRUNNING!\n");
+
+}
+
 int main(int argc, char** argv) {
     #if defined (_OPENMP)
     int mpi_provided_thread_level;
@@ -177,15 +212,19 @@ int main(int argc, char** argv) {
      */
 
     int d = 5;
+    int k = 300;
     float_t* data;
 
     //parse command line
     parse_args(&ctx, argc, argv);
+    print_hello(&ctx);
 
 
     /*
     * Generate a random matrix of lenght of some kind
     */
+
+
     if(ctx.mpi_rank == 0)
     {
         simulate_master_read_and_scatter(5, 1000000, &ctx);
@@ -211,10 +250,8 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)
     TIME_DEF
     double elapsed_time;
 
-    float_t z = 3;
     int halo = MY_FALSE;
     float_t tol = 0.002;
-    int k = 300;
 
     if(I_AM_MASTER && ctx -> world_size <= 6)
     {
@@ -356,7 +393,7 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)
 
     MPI_DB_PRINT("----- Performing ngbh search -----\n");
     MPI_Barrier(ctx -> mpi_communicator);
-    mpi_ngbh_search(ctx, dp_info, &tree, &local_tree, ctx -> local_data, k);
+    mpi_ngbh_search(ctx, dp_info, &tree, &local_tree, ctx -> local_data, ctx -> k);
     MPI_Barrier(ctx -> mpi_communicator);
 
     elapsed_time = TIME_STOP;
@@ -376,7 +413,7 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)
 
     compute_density_kstarnn_rma_v2(ctx, id, MY_FALSE);
-    compute_correction(ctx, z);
+    compute_correction(ctx, ctx -> z);
 
     elapsed_time = TIME_STOP;
     LOG_WRITE("Density estimate", elapsed_time)
@@ -393,7 +430,7 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)
 
     TIME_START;
-    Heuristic3(ctx, &clusters, z, halo);
+    Heuristic3(ctx, &clusters, ctx -> z, halo);
     elapsed_time = TIME_STOP;
     LOG_WRITE("H3", elapsed_time)
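Reviewer note, not part of the patch: the experimental EXP_CENTER_PRUNING branch above replaces per-point RMA lookups with a "vertical" loop that, for each neighbour rank j, first issues all MPI_Get requests inside a fence-delimited epoch and only reads the fetched buffer after the closing fence. The snippet below is a minimal, self-contained sketch of that fetch-then-process pattern with simplified types (plain double instead of datapoint_info_t); buffer names and sizes are invented for the example and it is not the project's API.

/*
 * Fetch-then-process with MPI_Win_fence: request remote entries, close the
 * epoch, then consume the local copy. Compile with mpicc, run with 2+ ranks.
 */
#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>

int main(int argc, char** argv)
{
    MPI_Init(&argc, &argv);

    int rank, size;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    const int local_n = 4;        /* entries owned by each rank                */
    double* local = NULL;         /* window memory exposed to the other ranks  */
    MPI_Win win;
    MPI_Win_allocate(local_n * sizeof(double), sizeof(double),
                     MPI_INFO_NULL, MPI_COMM_WORLD, &local, &win);
    for(int i = 0; i < local_n; ++i) local[i] = rank * 100.0 + i;

    double* fetched = (double*)malloc(local_n * sizeof(double));

    /* open the access epoch; only gets will be issued until the next fence */
    MPI_Win_fence(MPI_MODE_NOPUT, win);

    /* request phase: read every entry owned by the next rank (wraps around) */
    int target = (rank + 1) % size;
    for(int i = 0; i < local_n; ++i)
        MPI_Get(fetched + i, 1, MPI_DOUBLE, target, i, 1, MPI_DOUBLE, win);

    /* the closing fence guarantees that every MPI_Get has completed */
    MPI_Win_fence(MPI_MODE_NOPUT | MPI_MODE_NOSTORE, win);

    /* process phase: the fetched buffer is now safe to read */
    double sum = 0.0;
    for(int i = 0; i < local_n; ++i) sum += fetched[i];
    printf("rank %d fetched %d entries from rank %d, sum = %.1f\n",
           rank, local_n, target, sum);

    MPI_Win_fence(0, win);
    free(fetched);
    MPI_Win_free(&win);
    MPI_Finalize();
    return 0;
}

The same structure appears in each j iteration of the patch: one parallel loop that enqueues MPI_Get calls into tmp_datapoints, a fence, and a second parallel loop that consumes tmp_datapoints.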
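Reviewer note, not part of the patch: the fallback branch now times each call to find_possibly_halo_datapoint_rma and counts local versus foreign lookups, combining the per-thread values through the reduction(+:...) clause added to the OpenMP pragma. Below is a stripped-down sketch of that instrumentation pattern, with a dummy do_work() standing in for the RMA lookup and all names invented for the example.

/*
 * Per-thread timing and counters folded together by an OpenMP reduction.
 * Build with: cc -O2 -fopenmp timing_sketch.c
 */
#include <stdio.h>
#include <time.h>

static double do_work(long i)                 /* stand-in for the real lookup */
{
    double x = 0.0;
    for(long k = 0; k < 1000; ++k) x += (double)(i + k) * 1e-9;
    return x;
}

int main(void)
{
    const long n = 100000;
    double elapsed = 0.0;        /* time spent inside do_work, summed over threads */
    long n_local = 0;            /* lookups that stayed local (even indices here)  */
    long n_foreign = 0;          /* lookups that would hit another rank            */
    double sink = 0.0;           /* keeps the work from being optimized away       */

    #pragma omp parallel for schedule(dynamic) reduction(+:elapsed, n_local, n_foreign, sink)
    for(long i = 0; i < n; ++i)
    {
        struct timespec t0, t1;
        clock_gettime(CLOCK_MONOTONIC, &t0);
        sink += do_work(i);
        clock_gettime(CLOCK_MONOTONIC, &t1);
        elapsed += (double)(t1.tv_sec - t0.tv_sec) + (t1.tv_nsec - t0.tv_nsec) / 1e9;

        if(i % 2 == 0) n_local++;             /* toy stand-in for foreign_owner() == mpi_rank */
        else           n_foreign++;
    }

    printf("foreign %ld out of %ld -> fraction %.2f, summed time %.2fs (sink %.3f)\n",
           n_foreign, n_local + n_foreign,
           (double)n_foreign / (double)(n_local + n_foreign), elapsed, sink);
    return 0;
}

Because the reduction sums the time measured by every thread, the reported figure can exceed wall-clock time; the same holds for the elapsed_proc value printed by the patch, which is worth keeping in mind when reading the debug output.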