diff --git a/.gitignore b/.gitignore index 731d51d7abb05bbadf63ae11e68902757675116e..5ec99ea20e0f859eadccedb2afa54de1021614b4 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ main sync.sh +leo_sync.sh bb *.ipynb scalability_results diff --git a/run_pleiadi b/run_pleiadi index a186623965a90ba944c4c9e09df97ab0a3cc5f59..34cb27eaa5e6cf9c70f93f46435c73d0aa0a359b 100644 --- a/run_pleiadi +++ b/run_pleiadi @@ -1,6 +1,6 @@ #!/bin/bash -#SBATCH --nodes=1 +#SBATCH --nodes=2 #SBATCH --ntasks-per-node=2 #SBATCH --cpus-per-task=18 #SBATCH --time=01:00:00 @@ -11,8 +11,6 @@ #SBATCH --error=err_pleiadi #SBATCH --mem=230G - - cd $SLURM_SUBMIT_DIR module restore dev_pleiadi source /u/ftomba/my_envs/dadac-dev/bin/activate diff --git a/src/main/main.c b/src/main/main.c index 710657b88657e5aa634e62d6e10ac20bc31435cf..fa68fc0d501ec181463dc5157b30b2764b7a60ba 100644 --- a/src/main/main.c +++ b/src/main/main.c @@ -4,15 +4,32 @@ #include "../common/common.h" #include "../tree/tree.h" +#define THREAD_LEVEL MPI_THREAD_FUNNELED + int main(int argc, char** argv) { #if defined (_OPENMP) int mpi_provided_thread_level; - MPI_Init_thread( &argc, &argv, MPI_THREAD_FUNNELED, &mpi_provided_thread_level); - if ( mpi_provided_thread_level < MPI_THREAD_FUNNELED ) + MPI_Init_thread( &argc, &argv, THREAD_LEVEL, &mpi_provided_thread_level); + if ( mpi_provided_thread_level < THREAD_LEVEL ) { - printf("a problem arise when asking for MPI_THREAD_FUNNELED level\n"); - MPI_Finalize(); - exit( 1 ); + switch(THREAD_LEVEL) + { + case MPI_THREAD_FUNNELED: + printf("a problem arise when asking for MPI_THREAD_FUNNELED level\n"); + MPI_Finalize(); + exit( 1 ); + break; + case MPI_THREAD_SERIALIZED: + printf("a problem arise when asking for MPI_THREAD_SERIALIZED level\n"); + MPI_Finalize(); + exit( 1 ); + break; + case MPI_THREAD_MULTIPLE: + printf("a problem arise when asking for MPI_THREAD_MULTIPLE level\n"); + MPI_Finalize(); + exit( 1 ); + break; + } } #else MPI_Init(NULL, NULL); diff --git a/src/tree/tree.c b/src/tree/tree.c index 6b0f30549652d2d63cde1ca20f2ae0b2fa2e0c78..a9064252f1348a2f70693851bbbeaaebf49aa1e1 100644 --- a/src/tree/tree.c +++ b/src/tree/tree.c @@ -1809,7 +1809,7 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre TIME_START; MPI_Datatype MPI_my_heap; - MPI_Type_contiguous(k * sizeof(heap_node), MPI_CHAR, &MPI_my_heap); + MPI_Type_contiguous(k * sizeof(heap_node), MPI_BYTE, &MPI_my_heap); MPI_Barrier(ctx -> mpi_communicator); MPI_Type_commit(&MPI_my_heap); @@ -2023,7 +2023,7 @@ void build_local_tree(global_context_t* ctx, kdtree_v2* local_tree) void ordered_data_to_file(global_context_t* ctx) { //MPI_Barrier(ctx -> mpi_communicator); - MPI_DB_PRINT("[MASTER] writing to file\n"); + MPI_DB_PRINT("[MASTER] writing DATA to file\n"); float_t* tmp_data; int* ppp; int* displs; @@ -2064,7 +2064,7 @@ void ordered_data_to_file(global_context_t* ctx) void ordered_buffer_to_file(global_context_t* ctx, void* buffer, size_t el_size, uint64_t n, const char* fname) { //MPI_Barrier(ctx -> mpi_communicator); - MPI_DB_PRINT("[MASTER] writing to file\n"); + MPI_DB_PRINT("[MASTER] writing to file %s\n", fname); void* tmp_data; int* ppp; int* displs; @@ -2094,11 +2094,15 @@ void ordered_buffer_to_file(global_context_t* ctx, void* buffer, size_t el_size, } MPI_Gatherv(buffer, (int)(el_size * n), - MPI_CHAR, tmp_data, ppp, displs, MPI_CHAR, 0, ctx -> mpi_communicator); + MPI_BYTE, tmp_data, ppp, displs, MPI_BYTE, 0, ctx -> mpi_communicator); if(I_AM_MASTER) { FILE* file = fopen(fname,"w"); + if(!file) + 
{ + printf("Cannot open file %s ! Aborting \n", fname); + } fwrite(tmp_data, 1, el_size * tot_n, file); fclose(file); free(tmp_data); @@ -2333,7 +2337,7 @@ void find_foreign_nodes(global_context_t* ctx, datapoint_info_t* dp, datapoint_i int req_idx = 0; MPI_Datatype MPI_my_heap; - MPI_Type_contiguous(k * sizeof(heap_node), MPI_CHAR, &MPI_my_heap); + MPI_Type_contiguous(k * sizeof(heap_node), MPI_BYTE, &MPI_my_heap); MPI_Barrier(ctx -> mpi_communicator); MPI_Type_commit(&MPI_my_heap); @@ -2648,6 +2652,29 @@ datapoint_info_t find_possibly_halo_datapoint(global_context_t* ctx, idx_t idx) } } +datapoint_info_t find_possibly_halo_datapoint_rma(global_context_t* ctx, idx_t idx, MPI_Win win_datapoints) +{ + int owner = foreign_owner(ctx, idx); + /* find if datapoint is halo or not */ + if(owner == ctx -> mpi_rank) + { + idx_t i = idx - ctx -> idx_start; + return ctx -> local_datapoints[i]; + } + else + { + idx_t i = idx - ctx -> rank_idx_start[owner]; + datapoint_info_t tmp_dp; + MPI_Request request; + MPI_Status status; + MPI_Rget(&tmp_dp, sizeof(datapoint_info_t), MPI_BYTE, owner, + i * sizeof(datapoint_info_t), sizeof(datapoint_info_t), MPI_BYTE, win_datapoints, &request); + MPI_Wait(&request, MPI_STATUS_IGNORE); + + return tmp_dp; + } +} + void compute_density_kstarnn(global_context_t* ctx, const float_t d, int verbose){ /* @@ -2737,22 +2764,29 @@ void compute_density_kstarnn(global_context_t* ctx, const float_t d, int verbose #if defined(WRITE_DENSITY) /* densities */ float_t* den = (float_t*)malloc(ctx -> local_n_points * sizeof(float_t)); + float_t* gs = (float_t*)malloc(ctx -> local_n_points * sizeof(float_t)); idx_t* ks = (idx_t*)malloc(ctx -> local_n_points * sizeof(idx_t)); + for(int i = 0; i < ctx -> local_n_points; ++i) den[i] = ctx -> local_datapoints[i].log_rho; for(int i = 0; i < ctx -> local_n_points; ++i) ks[i] = ctx -> local_datapoints[i].kstar; + for(int i = 0; i < ctx -> local_n_points; ++i) gs[i] = ctx -> local_datapoints[i].g; ordered_buffer_to_file(ctx, den, sizeof(float_t), ctx -> local_n_points, "bb/ordered_density.npy"); ordered_buffer_to_file(ctx, ks, sizeof(idx_t), ctx -> local_n_points, "bb/ks.npy"); + ordered_buffer_to_file(ctx, gs, sizeof(float_t), ctx -> local_n_points, "bb/g.npy"); + ordered_data_to_file(ctx); + free(den); free(ks); + free(gs); #endif return; } -float_t get_j_ksel_dist(global_context_t* ctx, idx_t j, idx_t ksel, MPI_Win* exposed_ngbh) +float_t get_j_ksel_dist(global_context_t* ctx, idx_t j, idx_t ksel, MPI_Win exposed_ngbh) { int owner = foreign_owner(ctx, j); idx_t k = ctx -> k; @@ -2764,13 +2798,11 @@ float_t get_j_ksel_dist(global_context_t* ctx, idx_t j, idx_t ksel, MPI_Win* exp } else { - //RMA heap_node el; idx_t pos = j - ctx -> rank_idx_start[owner]; - MPI_Status status; MPI_Request request; - MPI_Rget(&el, sizeof(heap_node), MPI_CHAR, owner, (MPI_Aint)((pos * k + ksel) * sizeof(heap_node)), sizeof(heap_node), MPI_CHAR, *exposed_ngbh, &request); - MPI_Wait(&request,&status); + int err = MPI_Rget(&el, sizeof(heap_node), MPI_BYTE, owner, (MPI_Aint)((pos * k + ksel) * sizeof(heap_node)), sizeof(heap_node), MPI_BYTE, exposed_ngbh, &request); + MPI_Wait(&request,MPI_STATUS_IGNORE); return el.value; } } @@ -2786,31 +2818,17 @@ void compute_density_kstarnn_rma(global_context_t* ctx, const float_t d, int ver */ - MPI_Win exposed_ngbh; MPI_Info info; MPI_Barrier(ctx -> mpi_communicator); idx_t k = ctx -> local_datapoints[0].ngbh.N; - MPI_DB_PRINT("%lu %p\n", k, ctx -> __local_heap_buffers); - - //int* test_buffer = (int*)malloc(k * sizeof(int)); - 
//for(int i = 0; i < k; ++i) test_buffer[i] = (ctx -> mpi_rank + 1) * 1024; - - MPI_Win_create( ctx -> __local_heap_buffers, - ctx -> local_n_points * k * sizeof(heap_node), - 1, MPI_INFO_NULL, - ctx -> mpi_communicator, - &exposed_ngbh); - - MPI_Win_fence(0, exposed_ngbh); struct timespec start_tot, finish_tot; double elapsed_tot; datapoint_info_t* local_datapoints = ctx -> local_datapoints; - //DB_PRINT("rank %d pos %lu own %d %lu %lu %lu\n", ctx -> mpi_rank, pos, owner, k, ksel, (pos * k + ksel) * sizeof(heap_node)); if(verbose) { printf("Density and k* estimation:\n"); @@ -2823,7 +2841,41 @@ void compute_density_kstarnn_rma(global_context_t* ctx, const float_t d, int ver if(sizeof(float_t) == sizeof(float)){ omega = powf(PI_F,d/2)/tgammaf(d/2.0f + 1.0f);} else{omega = pow(M_PI,d/2.)/tgamma(d/2.0 + 1.0);} - #pragma omp parallel for + + MPI_Win exposed_ngbh; + MPI_Win_create( ctx -> __local_heap_buffers, + ctx -> local_n_points * k * sizeof(heap_node), + 1, MPI_INFO_NULL, + ctx -> mpi_communicator, + &exposed_ngbh); + + MPI_Win_fence(0, exposed_ngbh); + MPI_Win_lock_all(0, exposed_ngbh); + + //1 642146 87 + //idx_t j = 642146; + //idx_t ksel = 87; + //int owner = foreign_owner(ctx, j); + //heap_node el; + //idx_t pos = j - ctx -> rank_idx_start[owner]; + //MPI_Status status; + //MPI_Win_fence(0, exposed_ngbh); + + ////if(owner != ctx -> mpi_rank) + //for(owner = 0; owner < 2; ++owner) + //{ + // HERE + // MPI_Request request; + // DB_PRINT("-------%p %d rr j %lu k %lu pos %lu s %lu %lu \n", &el, ctx -> mpi_rank, j, ksel, pos, sizeof(heap_node), (pos * k + ksel) * sizeof(heap_node)); + // int err = MPI_Rget(&el, sizeof(heap_node), MPI_BYTE, owner, (pos * k + ksel) * sizeof(heap_node), sizeof(heap_node), MPI_BYTE, exposed_ngbh, &request); + // MPI_Wait(&request, MPI_STATUS_IGNORE); + // print_error_code(err); + // DB_PRINT("nnn %lu %lf\n", el.array_idx, el.value); + //} + + MPI_Barrier(ctx -> mpi_communicator); + + //#pragma omp parallel for for(idx_t i = 0; i < ctx -> local_n_points; ++i) { @@ -2844,7 +2896,7 @@ void compute_density_kstarnn_rma(global_context_t* ctx, const float_t d, int ver * note jj can be an halo point * need to search maybe for it in foreign nodes * */ - float_t dist_jj = get_j_ksel_dist(ctx, jj, ksel, &exposed_ngbh); + float_t dist_jj = get_j_ksel_dist(ctx, jj, ksel, exposed_ngbh); vvj = omega * pow(dist_jj,d/2.); vp = (vvi + vvj)*(vvi + vvj); @@ -2874,20 +2926,25 @@ void compute_density_kstarnn_rma(global_context_t* ctx, const float_t d, int ver elapsed_tot += (finish_tot.tv_nsec - start_tot.tv_nsec) / 1000000000.0; printf("\tTotal time: %.3lfs\n\n", elapsed_tot); } - + + MPI_Win_unlock_all(exposed_ngbh); MPI_Win_fence(0, exposed_ngbh); - MPI_Barrier(ctx -> mpi_communicator); MPI_Win_free(&exposed_ngbh); #if defined(WRITE_DENSITY) /* densities */ float_t* den = (float_t*)malloc(ctx -> local_n_points * sizeof(float_t)); + float_t* gs = (float_t*)malloc(ctx -> local_n_points * sizeof(float_t)); idx_t* ks = (idx_t*)malloc(ctx -> local_n_points * sizeof(idx_t)); + for(int i = 0; i < ctx -> local_n_points; ++i) den[i] = ctx -> local_datapoints[i].log_rho; for(int i = 0; i < ctx -> local_n_points; ++i) ks[i] = ctx -> local_datapoints[i].kstar; + for(int i = 0; i < ctx -> local_n_points; ++i) gs[i] = ctx -> local_datapoints[i].g; ordered_buffer_to_file(ctx, den, sizeof(float_t), ctx -> local_n_points, "bb/ordered_density.npy"); ordered_buffer_to_file(ctx, ks, sizeof(idx_t), ctx -> local_n_points, "bb/ks.npy"); + ordered_buffer_to_file(ctx, gs, sizeof(float_t), ctx -> 
local_n_points, "bb/g.npy"); + ordered_data_to_file(ctx); free(den); free(ks); @@ -2914,7 +2971,7 @@ float_t get_j_ksel_dist_v2(global_context_t* ctx, idx_t i, idx_t j, idx_t ksel, //RMA flags[i] = 0; idx_t pos = j - ctx -> rank_idx_start[owner]; - MPI_Get(tmp_heap_nodes + i, sizeof(heap_node), MPI_CHAR, owner, (MPI_Aint)((pos * k + ksel) * sizeof(heap_node)), sizeof(heap_node), MPI_CHAR, *exposed_ngbh); + MPI_Get(tmp_heap_nodes + i, sizeof(heap_node), MPI_BYTE, owner, (MPI_Aint)((pos * k + ksel) * sizeof(heap_node)), sizeof(heap_node), MPI_BYTE, *exposed_ngbh); //if(ctx -> mpi_rank == 0) DB_PRINT("rvcd %lu %lf\n", el.array_idx, el.value); return 0; } @@ -3342,6 +3399,437 @@ void compute_correction(global_context_t* ctx, float_t Z) //printf("%lf\n",min_log_rho); } +clusters_t Heuristic1_rma(global_context_t *ctx, int verbose) +{ + /* + * Heurisitc 1, from paper of Errico, Facco, Laio & Rodriguez + * ( https://doi.org/10.1016/j.ins.2021.01.010 ) + * + * args: + */ + datapoint_info_t* dp_info = ctx -> local_datapoints; + idx_t n = ctx -> local_n_points; + + struct timespec start_tot, finish_tot; + double elapsed_tot; + + if(verbose) + { + printf("H1: Preliminary cluster assignment\n"); + clock_gettime(CLOCK_MONOTONIC, &start_tot); + } + + TIME_DEF; + + + //idx_t ncenters = 0; + //idx_t putativeCenters = n; + lu_dynamic_array_t all_centers, removed_centers, actual_centers, max_rho; + lu_dynamic_array_allocate(&all_centers); + lu_dynamic_array_allocate(&removed_centers); + lu_dynamic_array_allocate(&actual_centers); + lu_dynamic_array_allocate(&max_rho); + + datapoint_info_t** dp_info_ptrs = (datapoint_info_t**)malloc(n*sizeof(datapoint_info_t*)); + + struct timespec start, finish; + double elapsed; + + + if(verbose) + { + clock_gettime(CLOCK_MONOTONIC, &start); + } + + /* proceed */ + + MPI_Win win_datapoints; + MPI_Win_create(ctx -> local_datapoints, ctx -> local_n_points * sizeof(datapoint_info_t), + 1, MPI_INFO_NULL, ctx -> mpi_communicator, &win_datapoints); + MPI_Win_fence(0, win_datapoints); + MPI_Win_lock_all(0, win_datapoints); + + + for(idx_t i = 0; i < n; ++i) + { + /* + * Find the centers of the clusters as the points of higher density in their neighborhoods + * A point is tagged as a putative center if it is the point of higer density of its neighborhood + */ + + dp_info_ptrs[i] = dp_info + i; + idx_t maxk = dp_info[i].kstar + 1; + float_t gi = dp_info[i].g; + dp_info[i].is_center = 1; + dp_info[i].cluster_idx = -1; + //printf("%lf\n",p -> g); + heap i_ngbh = dp_info[i].ngbh; + for(idx_t k = 1; k < maxk; ++k) + { + idx_t ngbh_index = i_ngbh.data[k].array_idx; + datapoint_info_t dj = find_possibly_halo_datapoint_rma(ctx, ngbh_index, win_datapoints); + float_t gj = dj.g; + if(gj > gi){ + dp_info[i].is_center = 0; + break; + } + } + if(dp_info[i].is_center) + { + //lu_dynamic_array_pushBack(&all_centers, dp_info[i].array_idx); + lu_dynamic_array_pushBack(&all_centers, i); + } + } + + if(verbose) + { + clock_gettime(CLOCK_MONOTONIC, &finish); + elapsed = (finish.tv_sec - start.tv_sec); + elapsed += (finish.tv_nsec - start.tv_nsec) / 1000000000.0; + printf("\tFinding putative centers: %.3lfs\n",elapsed); + clock_gettime(CLOCK_MONOTONIC, &start); + } + + + /* + * optimized version + * + * Generate a mask that keeps track of the point has been eliminating the + * point considered. 
Each thread updates this mask, then after the procedure + * ends, center, removed centers, and max_rho arrays are populated + */ + + heap_node* to_remove_mask = (heap_node*)malloc(n*sizeof(heap_node)); + for(idx_t p = 0; p < n; ++p) + { + to_remove_mask[p].array_idx = MY_SIZE_MAX; + to_remove_mask[p].value = 9999999; + } + qsort(dp_info_ptrs, n, sizeof(datapoint_info_t*), cmpPP); + + + MPI_Win win_to_remove_mask; + MPI_Win_create(to_remove_mask, n * sizeof(heap_node), 1, MPI_INFO_NULL, ctx -> mpi_communicator, &win_to_remove_mask); + MPI_Win_fence(0, win_to_remove_mask); + + + + /* check into internal nodes */ + + /* + * to remove + * and to reimplement it using rma + * for dp in datapoints: + * for nghb in neighbors: + * if ngbh its_mine + * no_problems, do it as always + * else: + * ngbh is an external node + * do rma things, + * in particular, acquire a lock on the window, read and write things + * then close + */ + + //#pragma omp parallel for + for(idx_t p = 0; p < n; ++p) + { + datapoint_info_t i_point = *(dp_info_ptrs[p]); + + for(idx_t j = 1; j < i_point.kstar + 1; ++j) + { + idx_t jidx = i_point.ngbh.data[j].array_idx; + + datapoint_info_t j_point = find_possibly_halo_datapoint_rma(ctx, jidx, win_datapoints); + + if(j_point.is_center && i_point.g > j_point.g) + { + //#pragma omp critical + { + int owner = foreign_owner(ctx, jidx); + idx_t jpos = jidx - ctx -> rank_idx_start[owner]; + + MPI_Win_lock(MPI_LOCK_EXCLUSIVE, owner, 0, win_to_remove_mask); + heap_node mask_element; + MPI_Request request; + MPI_Rget( &mask_element, sizeof(heap_node), MPI_BYTE, + owner, jpos * sizeof(heap_node) , sizeof(heap_node), MPI_BYTE, win_to_remove_mask, &request); + MPI_Wait(&request, MPI_STATUS_IGNORE); + + int flag = mask_element.array_idx == MY_SIZE_MAX; + if(flag || i_point.g > mask_element.value ) + { + heap_node tmp_mask_element = {.array_idx = i_point.array_idx, .value = i_point.g}; + MPI_Request request; + MPI_Rput(&tmp_mask_element, sizeof(heap_node), MPI_BYTE, owner, + jpos*sizeof(heap_node), sizeof(heap_node), MPI_BYTE, win_to_remove_mask, &request); + MPI_Wait(&request, MPI_STATUS_IGNORE); + + } + + MPI_Win_unlock(owner, win_to_remove_mask); + } + } + } + } + + MPI_Win_fence(0, win_to_remove_mask); + + /* populate the usual arrays */ + for(idx_t p = 0; p < all_centers.count; ++p) + { + idx_t i = all_centers.data[p]; + int e = 0; + //float_t gi = dp_info[i].g; + idx_t mr = to_remove_mask[i].array_idx; + if(mr != MY_SIZE_MAX) + { + //if(dp_info[mr].g > gi) e = 1; + e = 1; + } + switch (e) + { + case 1: + { + lu_dynamic_array_pushBack(&removed_centers,i); + dp_info[i].is_center = 0; + for(idx_t c = 0; c < removed_centers.count - 1; ++c) + { + if(mr == removed_centers.data[c]) + { + mr = max_rho.data[c]; + } + } + lu_dynamic_array_pushBack(&max_rho,mr); + + } + break; + + case 0: + { + lu_dynamic_array_pushBack(&actual_centers,i); + dp_info[i].cluster_idx = actual_centers.count - 1; + } + break; + + default: + break; + } + } + free(to_remove_mask); + + if(verbose) + { + clock_gettime(CLOCK_MONOTONIC, &finish); + elapsed = (finish.tv_sec - start.tv_sec); + elapsed += (finish.tv_nsec - start.tv_nsec) / 1000000000.0; + printf("\tFinding actual centers: %.3lfs\n",elapsed); + clock_gettime(CLOCK_MONOTONIC, &start); + } + + int n_centers = (int)actual_centers.count; + int tot_centers; + MPI_Allreduce(&n_centers, &tot_centers, 1, MPI_INT, MPI_SUM, ctx -> mpi_communicator); + // MPI_Allreduce(const void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm) + /* + 
DB_PRINT("rank %d got %d heheh\n", ctx -> mpi_rank, (int)actual_centers.count); + DB_PRINT("rank %d tc %d rc %d\n", ctx -> mpi_rank, (int)all_centers.count, (int)removed_centers.count); + */ + MPI_DB_PRINT("%d bbb\n", tot_centers); + + + /* bring on master all centers + * order them in ascending order of density, + * then re-scatter them around to get unique cluster labels */ + + + center_t* private_centers_buffer = (center_t*)malloc(actual_centers.count * sizeof(center_t)); + + center_t* global_centers_buffer = (center_t*)malloc(tot_centers * sizeof(center_t)); + + for(int i = 0; i < actual_centers.count; ++i) + { + idx_t idx = actual_centers.data[i] ; + private_centers_buffer[i].density = dp_info[idx].g; + private_centers_buffer[i].idx = dp_info[idx].array_idx; + } + MPI_Datatype mpi_center_type; + + MPI_Type_contiguous(sizeof(center_t), MPI_BYTE, &mpi_center_type); + MPI_Type_commit(&mpi_center_type); + + int* center_counts = (int*)malloc(ctx -> world_size * sizeof(int)); + int* center_displs = (int*)malloc(ctx -> world_size * sizeof(int)); + + int cc = (int)actual_centers.count; + MPI_Allgather(&cc, 1, MPI_INT, center_counts, 1, MPI_INT, ctx -> mpi_communicator); + + center_displs[0] = 0; + for(int i = 1; i < ctx -> world_size; ++i) center_displs[i] = center_displs[i - 1] + center_counts[i - 1]; + + /* alternative to check if something breaks */ + for(int i = 0; i < actual_centers.count; ++i) + { + idx_t idx = actual_centers.data[i]; + dp_info[idx].cluster_idx += center_displs[ctx -> mpi_rank]; + + } + + free(center_counts); + free(center_displs); + + /* + * Sort all the dp_info based on g and then perform the cluster assignment + * in asceding order + */ + + /* + * Sort all the dp_info based on g and then perform the cluster assignment + * in asceding order + */ + + int completed = 0; + + while(!completed) + { + completed = 1; + + + int proc_points = 0; + /* retrieve assignment + * if some point remain uncompleted then get assignment from external points */ + + for(idx_t i = 0; i < n; ++i) + { + int wait_for_comms = 0; + datapoint_info_t* p = dp_info_ptrs[i]; + if(!(p -> is_center) && p -> cluster_idx < 0) + { + int cluster = -1; + idx_t k = 0; + idx_t p_idx; + idx_t max_k = p -> ngbh.N; + /*assign each particle at the same cluster as the nearest particle of higher density*/ + datapoint_info_t p_retrieved; + while( k < max_k - 1 && cluster == -1) + { + + + ++k; + p_idx = p -> ngbh.data[k].array_idx; + p_retrieved = find_possibly_halo_datapoint_rma(ctx, p_idx, win_datapoints); + + int flag = p_retrieved.g > p -> g; + + if(p_retrieved.g > p -> g) + { + cluster = p_retrieved.cluster_idx; + wait_for_comms = 1; + break; + } + //if(p -> array_idx == 1587636) printf("%lu k %d p_idx %lu %lf %lf\n", k, cluster, p_idx, p_retrieved.g, p -> g ); + } + + + if(cluster == -1 && !wait_for_comms) + { + float_t gmax = -99999.; + idx_t gm_index = SIZE_MAX; + for(idx_t k = 0; k < max_k; ++k) + { + idx_t ngbh_index = p -> ngbh.data[k].array_idx; + for(idx_t m = 0; m < removed_centers.count; ++m) + { + idx_t max_rho_idx = max_rho.data[m]; + datapoint_info_t dp_max_rho = find_possibly_halo_datapoint_rma(ctx, max_rho_idx, win_datapoints); + //datapoint_info_t dp_max_rho = dp_info[max_rho_idx]; + float_t gcand = dp_max_rho.g; + if(ngbh_index == removed_centers.data[m] && gcand > gmax) + { + //printf("%lu -- %lu\n", ele, m); + gmax = gcand; + gm_index = max_rho.data[m]; + } + } + } + if(gm_index != SIZE_MAX) + { + datapoint_info_t dp_gm = find_possibly_halo_datapoint_rma(ctx, gm_index, win_datapoints); + 
//datapoint_info_t dp_gm = dp_info[gm_index]; + cluster = dp_gm.cluster_idx; + } + + } + p -> cluster_idx = cluster; + + } + completed = completed && p -> cluster_idx != -1; + proc_points += p -> cluster_idx != -1 ? 1 : 0; + + } + + //DB_PRINT("rank %d proc %d\n", ctx -> mpi_rank, proc_points); + MPI_Allreduce(MPI_IN_PLACE, &completed, 1, MPI_INT, MPI_SUM, ctx -> mpi_communicator); + completed = completed == ctx -> world_size ? 1 : 0; + + } + + MPI_Win_unlock_all(win_datapoints); + MPI_Win_fence(0, win_datapoints); + MPI_Win_free(&win_datapoints); + + if(verbose) + { + clock_gettime(CLOCK_MONOTONIC, &finish); + elapsed = (finish.tv_sec - start.tv_sec); + elapsed += (finish.tv_nsec - start.tv_nsec) / 1000000000.0; + printf("\tTentative clustering: %.3lfs\n",elapsed); + clock_gettime(CLOCK_MONOTONIC, &start); + } + + free(dp_info_ptrs); + free(max_rho.data); + free(removed_centers.data); + free(all_centers.data); + + #if defined(WRITE_CLUSTER_ASSIGN_H1) + /* densities */ + int* ks = (int*)malloc(ctx -> local_n_points * sizeof(int)); + for(int i = 0; i < ctx -> local_n_points; ++i) ks[i] = ctx -> local_datapoints[i].cluster_idx; + + ordered_buffer_to_file(ctx, ks, sizeof(int), ctx -> local_n_points, "bb/cl.npy"); + ordered_data_to_file(ctx); + free(ks); + #endif + + + clusters_t c_all; + c_all.centers = actual_centers; + + + if(verbose) + { + clock_gettime(CLOCK_MONOTONIC, &finish); + elapsed = (finish.tv_sec - start.tv_sec); + elapsed += (finish.tv_nsec - start.tv_nsec) / 1000000000.0; + printf("\tFinalizing clustering: %.3lfs\n",elapsed); + printf("\n"); + } + + clock_gettime(CLOCK_MONOTONIC, &finish_tot); + elapsed_tot = (finish_tot.tv_sec - start_tot.tv_sec); + elapsed_tot += (finish_tot.tv_nsec - start_tot.tv_nsec) / 1000000000.0; + + + if(verbose) + { + printf("\tFound %lu clusters\n",(uint64_t)actual_centers.count); + printf("\tTotal time: %.3lfs\n\n", elapsed_tot); + } + + c_all.n = n; + return c_all; +} + clusters_t Heuristic1(global_context_t *ctx, int verbose) { /* @@ -3657,8 +4145,6 @@ clusters_t Heuristic1(global_context_t *ctx, int verbose) DB_PRINT("rank %d got %d heheh\n", ctx -> mpi_rank, (int)actual_centers.count); DB_PRINT("rank %d tc %d rc %d\n", ctx -> mpi_rank, (int)all_centers.count, (int)removed_centers.count); */ - MPI_DB_PRINT("%d bbb\n", tot_centers); - /* bring on master all centers * order them in ascending order of density, @@ -3679,7 +4165,7 @@ clusters_t Heuristic1(global_context_t *ctx, int verbose) HERE - MPI_Type_contiguous(sizeof(center_t), MPI_CHAR, &mpi_center_type); + MPI_Type_contiguous(sizeof(center_t), MPI_BYTE, &mpi_center_type); MPI_Type_commit(&mpi_center_type); int* center_counts = (int*)malloc(ctx -> world_size * sizeof(int)); @@ -4004,7 +4490,7 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx) // data = read_data_file(ctx,"../norm_data/std_g1212639_091_0001",MY_TRUE); ctx->dims = 5; - //ctx -> n_points = 5*2000; + // ctx -> n_points = 5 * 2000; ctx->n_points = ctx->n_points / ctx->dims; //ctx->n_points = (ctx->n_points * 5) / 10; // ctx -> n_points = ctx -> world_size * 1000; @@ -4023,8 +4509,6 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx) int *send_counts = (int *)malloc(ctx->world_size * sizeof(int)); int *displacements = (int *)malloc(ctx->world_size * sizeof(int)); - HERE - displacements[0] = 0; send_counts[0] = ctx->n_points / ctx->world_size; send_counts[0] += (ctx->n_points % ctx->world_size) > 0 ? 
1 : 0; @@ -4117,9 +4601,6 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx) //float_t id = id_estimate(ctx, dp_info, ctx -> local_n_points, 0.9, MY_FALSE); float_t id = compute_ID_two_NN_ML(ctx, dp_info, ctx -> local_n_points, MY_FALSE); elapsed_time = TIME_STOP; - //id = 3.920865231328582; - //id = 4.008350298212649; - //id = 4.; LOG_WRITE("ID estimate", elapsed_time) MPI_DB_PRINT("ID %lf \n",id); @@ -4141,10 +4622,10 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx) elapsed_time = TIME_STOP; LOG_WRITE("Density estimate", elapsed_time) - //TIME_START; - //Heuristic1(ctx, MY_FALSE); - //elapsed_time = TIME_STOP; - //LOG_WRITE("H1", elapsed_time) + TIME_START; + Heuristic1_rma(ctx, MY_FALSE); + elapsed_time = TIME_STOP; + LOG_WRITE("H1", elapsed_time) /* find density */
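
Notes on the patch:

In ordered_buffer_to_file, the new fopen() check prints a message but does not stop, so fwrite()/fclose() still run on a NULL FILE* when the open fails. One possible shape for the guard, sketched as a hypothetical helper (the abort-on-failure policy is an assumption, not something the patch prescribes):

    #include <stdio.h>
    #include <stdlib.h>
    #include <mpi.h>

    /* Hypothetical helper: write a gathered buffer to fname, aborting the run
     * instead of falling through to fwrite() with a NULL FILE*. */
    static void write_or_abort(MPI_Comm comm, const char* fname,
                               const void* buf, size_t el_size, size_t tot_n)
    {
        FILE* file = fopen(fname, "w");
        if (!file)
        {
            printf("Cannot open file %s ! Aborting \n", fname);
            MPI_Abort(comm, 1);   /* assumption: aborting the whole run is acceptable here */
        }
        fwrite(buf, 1, el_size * tot_n, file);
        fclose(file);
    }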
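The switch from MPI_CHAR to MPI_BYTE for heap_node, datapoint_info_t and center_t transfers is the right call: MPI_BYTE is never subject to representation conversion, while MPI_CHAR denotes a character type. The new helpers (find_possibly_halo_datapoint_rma, get_j_ksel_dist) all rely on the same pattern: the caller exposes its local array through a window, opens a passive-target epoch with MPI_Win_lock_all, and single remote elements are fetched with MPI_Rget + MPI_Wait. A minimal self-contained sketch of that pattern, using a plain double payload instead of the project's heap_node (the neighbour-rank read and element index are arbitrary example choices):

    #include <mpi.h>
    #include <stdio.h>
    #include <stdlib.h>

    /* Minimal passive-target RMA read, mirroring the pattern used by
     * find_possibly_halo_datapoint_rma / get_j_ksel_dist: every rank exposes a
     * local array through a window, locks all targets once, and pulls single
     * elements from other ranks with MPI_Rget + MPI_Wait. */
    int main(int argc, char** argv)
    {
        MPI_Init(&argc, &argv);
        int rank, size;
        MPI_Comm_rank(MPI_COMM_WORLD, &rank);
        MPI_Comm_size(MPI_COMM_WORLD, &size);

        const int n_local = 4;
        double* local = malloc(n_local * sizeof(double));
        for (int i = 0; i < n_local; ++i) local[i] = rank * 100.0 + i;

        MPI_Win win;
        MPI_Win_create(local, n_local * sizeof(double), 1 /* byte displacements */,
                       MPI_INFO_NULL, MPI_COMM_WORLD, &win);

        MPI_Win_lock_all(0, win);                  /* open one passive epoch   */

        int    owner = (rank + 1) % size;          /* read from the next rank  */
        int    idx   = 2;                          /* element to fetch         */
        double remote;
        MPI_Request req;
        MPI_Rget(&remote, sizeof(double), MPI_BYTE, owner,
                 (MPI_Aint)(idx * sizeof(double)), sizeof(double), MPI_BYTE,
                 win, &req);
        MPI_Wait(&req, MPI_STATUS_IGNORE);         /* data is now valid locally */

        MPI_Win_unlock_all(win);                   /* close the epoch          */
        printf("rank %d read %.1f from rank %d\n", rank, remote, owner);

        MPI_Win_free(&win);
        free(local);
        MPI_Finalize();
        return 0;
    }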