Commit 85c70c3c authored by lykos98

added working implementation of density via kstar and H1

parent 55582ae9
main
sync.sh
+leo_sync.sh
bb
*.ipynb
scalability_results
#!/bin/bash
-#SBATCH --nodes=1
+#SBATCH --nodes=2
#SBATCH --ntasks-per-node=2
#SBATCH --cpus-per-task=18
#SBATCH --time=01:00:00
@@ -11,8 +11,6 @@
#SBATCH --error=err_pleiadi
#SBATCH --mem=230G
cd $SLURM_SUBMIT_DIR
module restore dev_pleiadi
source /u/ftomba/my_envs/dadac-dev/bin/activate
...
@@ -4,15 +4,32 @@
#include "../common/common.h"
#include "../tree/tree.h"
+#define THREAD_LEVEL MPI_THREAD_FUNNELED

int main(int argc, char** argv) {
    #if defined (_OPENMP)
        int mpi_provided_thread_level;
-       MPI_Init_thread( &argc, &argv, MPI_THREAD_FUNNELED, &mpi_provided_thread_level);
-       if ( mpi_provided_thread_level < MPI_THREAD_FUNNELED )
+       MPI_Init_thread( &argc, &argv, THREAD_LEVEL, &mpi_provided_thread_level);
+       if ( mpi_provided_thread_level < THREAD_LEVEL )
        {
+           switch(THREAD_LEVEL)
+           {
+               case MPI_THREAD_FUNNELED:
                    printf("a problem arise when asking for MPI_THREAD_FUNNELED level\n");
                    MPI_Finalize();
                    exit( 1 );
+                   break;
+               case MPI_THREAD_SERIALIZED:
+                   printf("a problem arise when asking for MPI_THREAD_SERIALIZED level\n");
+                   MPI_Finalize();
+                   exit( 1 );
+                   break;
+               case MPI_THREAD_MULTIPLE:
+                   printf("a problem arise when asking for MPI_THREAD_MULTIPLE level\n");
+                   MPI_Finalize();
+                   exit( 1 );
+                   break;
+           }
        }
    #else
        MPI_Init(NULL, NULL);
...
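An aside, not part of the commit: the MPI standard orders the thread-support levels MPI_THREAD_SINGLE < MPI_THREAD_FUNNELED < MPI_THREAD_SERIALIZED < MPI_THREAD_MULTIPLE and they are plain integer constants, so the provided-level check can also be written without a per-level switch. A minimal sketch assuming only standard MPI:

    #include <mpi.h>
    #include <stdio.h>
    #include <stdlib.h>

    #define THREAD_LEVEL MPI_THREAD_FUNNELED

    int main(int argc, char** argv)
    {
        int provided;
        MPI_Init_thread(&argc, &argv, THREAD_LEVEL, &provided);
        if (provided < THREAD_LEVEL)
        {
            /* the levels are ordinary ints, so one message covers any requested level */
            fprintf(stderr, "requested MPI thread level %d, runtime provided only %d\n",
                    THREAD_LEVEL, provided);
            MPI_Finalize();
            exit(1);
        }
        /* ... rest of the program ... */
        MPI_Finalize();
        return 0;
    }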
@@ -1809,7 +1809,7 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre
    TIME_START;
    MPI_Datatype MPI_my_heap;
-   MPI_Type_contiguous(k * sizeof(heap_node), MPI_CHAR, &MPI_my_heap);
+   MPI_Type_contiguous(k * sizeof(heap_node), MPI_BYTE, &MPI_my_heap);
    MPI_Barrier(ctx -> mpi_communicator);
    MPI_Type_commit(&MPI_my_heap);
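A note on the MPI_CHAR to MPI_BYTE switch that recurs throughout this commit: MPI_BYTE is the uninterpreted raw-byte type, whereas MPI_CHAR is a character type that may undergo representation conversion between heterogeneous hosts, so buffers described by their size in bytes are better typed as MPI_BYTE. A hedged sketch of the pattern used here; heap_node, k and ctx are the project's own names, while heap_buffer, dest and TAG are placeholders:

    /* one k-long neighbour heap described as a single contiguous blob of bytes */
    MPI_Datatype MPI_my_heap;
    MPI_Type_contiguous(k * sizeof(heap_node), MPI_BYTE, &MPI_my_heap);
    MPI_Type_commit(&MPI_my_heap);

    /* a whole heap then travels as one element of the committed type */
    MPI_Send(heap_buffer, 1, MPI_my_heap, dest, TAG, ctx -> mpi_communicator);

    MPI_Type_free(&MPI_my_heap);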
@@ -2023,7 +2023,7 @@ void build_local_tree(global_context_t* ctx, kdtree_v2* local_tree)
void ordered_data_to_file(global_context_t* ctx)
{
    //MPI_Barrier(ctx -> mpi_communicator);
-   MPI_DB_PRINT("[MASTER] writing to file\n");
+   MPI_DB_PRINT("[MASTER] writing DATA to file\n");
    float_t* tmp_data;
    int* ppp;
    int* displs;
@@ -2064,7 +2064,7 @@ void ordered_data_to_file(global_context_t* ctx)
void ordered_buffer_to_file(global_context_t* ctx, void* buffer, size_t el_size, uint64_t n, const char* fname)
{
    //MPI_Barrier(ctx -> mpi_communicator);
-   MPI_DB_PRINT("[MASTER] writing to file\n");
+   MPI_DB_PRINT("[MASTER] writing to file %s\n", fname);
    void* tmp_data;
    int* ppp;
    int* displs;
@@ -2094,11 +2094,15 @@ void ordered_buffer_to_file(global_context_t* ctx, void* buffer, size_t el_size,
    }
    MPI_Gatherv(buffer, (int)(el_size * n),
-               MPI_CHAR, tmp_data, ppp, displs, MPI_CHAR, 0, ctx -> mpi_communicator);
+               MPI_BYTE, tmp_data, ppp, displs, MPI_BYTE, 0, ctx -> mpi_communicator);
    if(I_AM_MASTER)
    {
        FILE* file = fopen(fname,"w");
+       if(!file)
+       {
+           printf("Cannot open file %s ! Aborting \n", fname);
+       }
        fwrite(tmp_data, 1, el_size * tot_n, file);
        fclose(file);
        free(tmp_data);
@@ -2333,7 +2337,7 @@ void find_foreign_nodes(global_context_t* ctx, datapoint_info_t* dp, datapoint_i
    int req_idx = 0;
    MPI_Datatype MPI_my_heap;
-   MPI_Type_contiguous(k * sizeof(heap_node), MPI_CHAR, &MPI_my_heap);
+   MPI_Type_contiguous(k * sizeof(heap_node), MPI_BYTE, &MPI_my_heap);
    MPI_Barrier(ctx -> mpi_communicator);
    MPI_Type_commit(&MPI_my_heap);
@@ -2648,6 +2652,29 @@ datapoint_info_t find_possibly_halo_datapoint(global_context_t* ctx, idx_t idx)
    }
}
datapoint_info_t find_possibly_halo_datapoint_rma(global_context_t* ctx, idx_t idx, MPI_Win win_datapoints)
{
int owner = foreign_owner(ctx, idx);
/* find if datapoint is halo or not */
if(owner == ctx -> mpi_rank)
{
idx_t i = idx - ctx -> idx_start;
return ctx -> local_datapoints[i];
}
else
{
idx_t i = idx - ctx -> rank_idx_start[owner];
datapoint_info_t tmp_dp;
MPI_Request request;
MPI_Status status;
MPI_Rget(&tmp_dp, sizeof(datapoint_info_t), MPI_BYTE, owner,
i * sizeof(datapoint_info_t), sizeof(datapoint_info_t), MPI_BYTE, win_datapoints, &request);
MPI_Wait(&request, MPI_STATUS_IGNORE);
return tmp_dp;
}
}
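The new helper above issues MPI_Rget, so it assumes an access epoch is already open on win_datapoints. The surrounding pattern, as used later in Heuristic1_rma, looks roughly like this sketch (names taken from the code below, error handling omitted):

    MPI_Win win_datapoints;
    MPI_Win_create(ctx -> local_datapoints,
                   ctx -> local_n_points * sizeof(datapoint_info_t),
                   1, MPI_INFO_NULL, ctx -> mpi_communicator, &win_datapoints);
    MPI_Win_lock_all(0, win_datapoints);   /* passive-target access epoch */

    datapoint_info_t dj = find_possibly_halo_datapoint_rma(ctx, ngbh_index, win_datapoints);
    /* ... use dj.g, dj.cluster_idx, ... */

    MPI_Win_unlock_all(win_datapoints);
    MPI_Win_free(&win_datapoints);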
void compute_density_kstarnn(global_context_t* ctx, const float_t d, int verbose){
    /*
@@ -2737,22 +2764,29 @@ void compute_density_kstarnn(global_context_t* ctx, const float_t d, int verbose
    #if defined(WRITE_DENSITY)
        /* densities */
        float_t* den = (float_t*)malloc(ctx -> local_n_points * sizeof(float_t));
+       float_t* gs = (float_t*)malloc(ctx -> local_n_points * sizeof(float_t));
        idx_t* ks = (idx_t*)malloc(ctx -> local_n_points * sizeof(idx_t));
        for(int i = 0; i < ctx -> local_n_points; ++i) den[i] = ctx -> local_datapoints[i].log_rho;
        for(int i = 0; i < ctx -> local_n_points; ++i) ks[i] = ctx -> local_datapoints[i].kstar;
+       for(int i = 0; i < ctx -> local_n_points; ++i) gs[i] = ctx -> local_datapoints[i].g;
        ordered_buffer_to_file(ctx, den, sizeof(float_t), ctx -> local_n_points, "bb/ordered_density.npy");
        ordered_buffer_to_file(ctx, ks, sizeof(idx_t), ctx -> local_n_points, "bb/ks.npy");
+       ordered_buffer_to_file(ctx, gs, sizeof(float_t), ctx -> local_n_points, "bb/g.npy");
        ordered_data_to_file(ctx);
        free(den);
        free(ks);
+       free(gs);
    #endif
    return;
}
-float_t get_j_ksel_dist(global_context_t* ctx, idx_t j, idx_t ksel, MPI_Win* exposed_ngbh)
+float_t get_j_ksel_dist(global_context_t* ctx, idx_t j, idx_t ksel, MPI_Win exposed_ngbh)
{
    int owner = foreign_owner(ctx, j);
    idx_t k = ctx -> k;
@@ -2764,13 +2798,11 @@ float_t get_j_ksel_dist(global_context_t* ctx, idx_t j, idx_t ksel, MPI_Win* exp
    }
    else
    {
-       //RMA
        heap_node el;
        idx_t pos = j - ctx -> rank_idx_start[owner];
-       MPI_Status status;
        MPI_Request request;
-       MPI_Rget(&el, sizeof(heap_node), MPI_CHAR, owner, (MPI_Aint)((pos * k + ksel) * sizeof(heap_node)), sizeof(heap_node), MPI_CHAR, *exposed_ngbh, &request);
-       MPI_Wait(&request,&status);
+       int err = MPI_Rget(&el, sizeof(heap_node), MPI_BYTE, owner, (MPI_Aint)((pos * k + ksel) * sizeof(heap_node)), sizeof(heap_node), MPI_BYTE, exposed_ngbh, &request);
+       MPI_Wait(&request,MPI_STATUS_IGNORE);
        return el.value;
    }
}
@@ -2786,31 +2818,17 @@ void compute_density_kstarnn_rma(global_context_t* ctx, const float_t d, int ver
    */
-   MPI_Win exposed_ngbh;
    MPI_Info info;
    MPI_Barrier(ctx -> mpi_communicator);
    idx_t k = ctx -> local_datapoints[0].ngbh.N;
-   MPI_DB_PRINT("%lu %p\n", k, ctx -> __local_heap_buffers);
-   //int* test_buffer = (int*)malloc(k * sizeof(int));
-   //for(int i = 0; i < k; ++i) test_buffer[i] = (ctx -> mpi_rank + 1) * 1024;
-   MPI_Win_create( ctx -> __local_heap_buffers,
-                   ctx -> local_n_points * k * sizeof(heap_node),
-                   1, MPI_INFO_NULL,
-                   ctx -> mpi_communicator,
-                   &exposed_ngbh);
-   MPI_Win_fence(0, exposed_ngbh);
    struct timespec start_tot, finish_tot;
    double elapsed_tot;
    datapoint_info_t* local_datapoints = ctx -> local_datapoints;
-   //DB_PRINT("rank %d pos %lu own %d %lu %lu %lu\n", ctx -> mpi_rank, pos, owner, k, ksel, (pos * k + ksel) * sizeof(heap_node));
    if(verbose)
    {
        printf("Density and k* estimation:\n");
@@ -2823,7 +2841,41 @@ void compute_density_kstarnn_rma(global_context_t* ctx, const float_t d, int ver
    if(sizeof(float_t) == sizeof(float)){ omega = powf(PI_F,d/2)/tgammaf(d/2.0f + 1.0f);}
    else{omega = pow(M_PI,d/2.)/tgamma(d/2.0 + 1.0);}
-   #pragma omp parallel for
+   MPI_Win exposed_ngbh;
+   MPI_Win_create( ctx -> __local_heap_buffers,
+                   ctx -> local_n_points * k * sizeof(heap_node),
+                   1, MPI_INFO_NULL,
+                   ctx -> mpi_communicator,
+                   &exposed_ngbh);
+   MPI_Win_fence(0, exposed_ngbh);
+   MPI_Win_lock_all(0, exposed_ngbh);
+   //1 642146 87
+   //idx_t j = 642146;
+   //idx_t ksel = 87;
+   //int owner = foreign_owner(ctx, j);
+   //heap_node el;
+   //idx_t pos = j - ctx -> rank_idx_start[owner];
+   //MPI_Status status;
+   //MPI_Win_fence(0, exposed_ngbh);
+   ////if(owner != ctx -> mpi_rank)
+   //for(owner = 0; owner < 2; ++owner)
+   //{
+   //   HERE
+   //   MPI_Request request;
+   //   DB_PRINT("-------%p %d rr j %lu k %lu pos %lu s %lu %lu \n", &el, ctx -> mpi_rank, j, ksel, pos, sizeof(heap_node), (pos * k + ksel) * sizeof(heap_node));
+   //   int err = MPI_Rget(&el, sizeof(heap_node), MPI_BYTE, owner, (pos * k + ksel) * sizeof(heap_node), sizeof(heap_node), MPI_BYTE, exposed_ngbh, &request);
+   //   MPI_Wait(&request, MPI_STATUS_IGNORE);
+   //   print_error_code(err);
+   //   DB_PRINT("nnn %lu %lf\n", el.array_idx, el.value);
+   //}
+   MPI_Barrier(ctx -> mpi_communicator);
+   //#pragma omp parallel for
    for(idx_t i = 0; i < ctx -> local_n_points; ++i)
    {
@@ -2844,7 +2896,7 @@ void compute_density_kstarnn_rma(global_context_t* ctx, const float_t d, int ver
         * note jj can be an halo point
         * need to search maybe for it in foreign nodes
         * */
-       float_t dist_jj = get_j_ksel_dist(ctx, jj, ksel, &exposed_ngbh);
+       float_t dist_jj = get_j_ksel_dist(ctx, jj, ksel, exposed_ngbh);
        vvj = omega * pow(dist_jj,d/2.);
        vp = (vvi + vvj)*(vvi + vvj);
@@ -2875,19 +2927,24 @@ void compute_density_kstarnn_rma(global_context_t* ctx, const float_t d, int ver
        printf("\tTotal time: %.3lfs\n\n", elapsed_tot);
    }
+   MPI_Win_unlock_all(exposed_ngbh);
    MPI_Win_fence(0, exposed_ngbh);
+   MPI_Barrier(ctx -> mpi_communicator);
    MPI_Win_free(&exposed_ngbh);
    #if defined(WRITE_DENSITY)
        /* densities */
        float_t* den = (float_t*)malloc(ctx -> local_n_points * sizeof(float_t));
+       float_t* gs = (float_t*)malloc(ctx -> local_n_points * sizeof(float_t));
        idx_t* ks = (idx_t*)malloc(ctx -> local_n_points * sizeof(idx_t));
        for(int i = 0; i < ctx -> local_n_points; ++i) den[i] = ctx -> local_datapoints[i].log_rho;
        for(int i = 0; i < ctx -> local_n_points; ++i) ks[i] = ctx -> local_datapoints[i].kstar;
+       for(int i = 0; i < ctx -> local_n_points; ++i) gs[i] = ctx -> local_datapoints[i].g;
        ordered_buffer_to_file(ctx, den, sizeof(float_t), ctx -> local_n_points, "bb/ordered_density.npy");
        ordered_buffer_to_file(ctx, ks, sizeof(idx_t), ctx -> local_n_points, "bb/ks.npy");
+       ordered_buffer_to_file(ctx, gs, sizeof(float_t), ctx -> local_n_points, "bb/g.npy");
        ordered_data_to_file(ctx);
        free(den);
        free(ks);
@@ -2914,7 +2971,7 @@ float_t get_j_ksel_dist_v2(global_context_t* ctx, idx_t i, idx_t j, idx_t ksel,
        //RMA
        flags[i] = 0;
        idx_t pos = j - ctx -> rank_idx_start[owner];
-       MPI_Get(tmp_heap_nodes + i, sizeof(heap_node), MPI_CHAR, owner, (MPI_Aint)((pos * k + ksel) * sizeof(heap_node)), sizeof(heap_node), MPI_CHAR, *exposed_ngbh);
+       MPI_Get(tmp_heap_nodes + i, sizeof(heap_node), MPI_BYTE, owner, (MPI_Aint)((pos * k + ksel) * sizeof(heap_node)), sizeof(heap_node), MPI_BYTE, *exposed_ngbh);
        //if(ctx -> mpi_rank == 0) DB_PRINT("rvcd %lu %lf\n", el.array_idx, el.value);
        return 0;
    }
@@ -3342,6 +3399,437 @@ void compute_correction(global_context_t* ctx, float_t Z)
    //printf("%lf\n",min_log_rho);
}
clusters_t Heuristic1_rma(global_context_t *ctx, int verbose)
{
/*
 * Heuristic 1, from the paper of Errico, Facco, Laio & Rodriguez
* ( https://doi.org/10.1016/j.ins.2021.01.010 )
*
* args:
*/
datapoint_info_t* dp_info = ctx -> local_datapoints;
idx_t n = ctx -> local_n_points;
struct timespec start_tot, finish_tot;
double elapsed_tot;
if(verbose)
{
printf("H1: Preliminary cluster assignment\n");
clock_gettime(CLOCK_MONOTONIC, &start_tot);
}
TIME_DEF;
//idx_t ncenters = 0;
//idx_t putativeCenters = n;
lu_dynamic_array_t all_centers, removed_centers, actual_centers, max_rho;
lu_dynamic_array_allocate(&all_centers);
lu_dynamic_array_allocate(&removed_centers);
lu_dynamic_array_allocate(&actual_centers);
lu_dynamic_array_allocate(&max_rho);
datapoint_info_t** dp_info_ptrs = (datapoint_info_t**)malloc(n*sizeof(datapoint_info_t*));
struct timespec start, finish;
double elapsed;
if(verbose)
{
clock_gettime(CLOCK_MONOTONIC, &start);
}
/* proceed */
MPI_Win win_datapoints;
MPI_Win_create(ctx -> local_datapoints, ctx -> local_n_points * sizeof(datapoint_info_t),
1, MPI_INFO_NULL, ctx -> mpi_communicator, &win_datapoints);
MPI_Win_fence(0, win_datapoints);
MPI_Win_lock_all(0, win_datapoints);
for(idx_t i = 0; i < n; ++i)
{
/*
* Find the centers of the clusters as the points of higher density in their neighborhoods
 * A point is tagged as a putative center if it is the highest-density point of its neighborhood
*/
dp_info_ptrs[i] = dp_info + i;
idx_t maxk = dp_info[i].kstar + 1;
float_t gi = dp_info[i].g;
dp_info[i].is_center = 1;
dp_info[i].cluster_idx = -1;
//printf("%lf\n",p -> g);
heap i_ngbh = dp_info[i].ngbh;
for(idx_t k = 1; k < maxk; ++k)
{
idx_t ngbh_index = i_ngbh.data[k].array_idx;
datapoint_info_t dj = find_possibly_halo_datapoint_rma(ctx, ngbh_index, win_datapoints);
float_t gj = dj.g;
if(gj > gi){
dp_info[i].is_center = 0;
break;
}
}
if(dp_info[i].is_center)
{
//lu_dynamic_array_pushBack(&all_centers, dp_info[i].array_idx);
lu_dynamic_array_pushBack(&all_centers, i);
}
}
if(verbose)
{
clock_gettime(CLOCK_MONOTONIC, &finish);
elapsed = (finish.tv_sec - start.tv_sec);
elapsed += (finish.tv_nsec - start.tv_nsec) / 1000000000.0;
printf("\tFinding putative centers: %.3lfs\n",elapsed);
clock_gettime(CLOCK_MONOTONIC, &start);
}
/*
* optimized version
*
 * Generate a mask that keeps track of which point eliminated the point under
 * consideration. Each thread updates this mask; after the procedure
 * ends, the centers, removed_centers and max_rho arrays are populated
*/
heap_node* to_remove_mask = (heap_node*)malloc(n*sizeof(heap_node));
for(idx_t p = 0; p < n; ++p)
{
to_remove_mask[p].array_idx = MY_SIZE_MAX;
to_remove_mask[p].value = 9999999;
}
qsort(dp_info_ptrs, n, sizeof(datapoint_info_t*), cmpPP);
MPI_Win win_to_remove_mask;
MPI_Win_create(to_remove_mask, n * sizeof(heap_node), 1, MPI_INFO_NULL, ctx -> mpi_communicator, &win_to_remove_mask);
MPI_Win_fence(0, win_to_remove_mask);
/* check into internal nodes */
/*
* to remove
* and to reimplement it using rma
* for dp in datapoints:
* for nghb in neighbors:
 *      if ngbh is mine:
 *          no problem, do it as always
* else:
* ngbh is an external node
* do rma things,
* in particular, acquire a lock on the window, read and write things
* then close
*/
//#pragma omp parallel for
for(idx_t p = 0; p < n; ++p)
{
datapoint_info_t i_point = *(dp_info_ptrs[p]);
for(idx_t j = 1; j < i_point.kstar + 1; ++j)
{
idx_t jidx = i_point.ngbh.data[j].array_idx;
datapoint_info_t j_point = find_possibly_halo_datapoint_rma(ctx, jidx, win_datapoints);
if(j_point.is_center && i_point.g > j_point.g)
{
//#pragma omp critical
{
int owner = foreign_owner(ctx, jidx);
idx_t jpos = jidx - ctx -> rank_idx_start[owner];
MPI_Win_lock(MPI_LOCK_EXCLUSIVE, owner, 0, win_to_remove_mask);
heap_node mask_element;
MPI_Request request;
MPI_Rget( &mask_element, sizeof(heap_node), MPI_BYTE,
owner, jpos * sizeof(heap_node) , sizeof(heap_node), MPI_BYTE, win_to_remove_mask, &request);
MPI_Wait(&request, MPI_STATUS_IGNORE);
int flag = mask_element.array_idx == MY_SIZE_MAX;
if(flag || i_point.g > mask_element.value )
{
heap_node tmp_mask_element = {.array_idx = i_point.array_idx, .value = i_point.g};
MPI_Request request;
MPI_Rput(&tmp_mask_element, sizeof(heap_node), MPI_BYTE, owner,
jpos*sizeof(heap_node), sizeof(heap_node), MPI_BYTE, win_to_remove_mask, &request);
MPI_Wait(&request, MPI_STATUS_IGNORE);
}
MPI_Win_unlock(owner, win_to_remove_mask);
}
}
}
}
MPI_Win_fence(0, win_to_remove_mask);
/* populate the usual arrays */
for(idx_t p = 0; p < all_centers.count; ++p)
{
idx_t i = all_centers.data[p];
int e = 0;
//float_t gi = dp_info[i].g;
idx_t mr = to_remove_mask[i].array_idx;
if(mr != MY_SIZE_MAX)
{
//if(dp_info[mr].g > gi) e = 1;
e = 1;
}
switch (e)
{
case 1:
{
lu_dynamic_array_pushBack(&removed_centers,i);
dp_info[i].is_center = 0;
for(idx_t c = 0; c < removed_centers.count - 1; ++c)
{
if(mr == removed_centers.data[c])
{
mr = max_rho.data[c];
}
}
lu_dynamic_array_pushBack(&max_rho,mr);
}
break;
case 0:
{
lu_dynamic_array_pushBack(&actual_centers,i);
dp_info[i].cluster_idx = actual_centers.count - 1;
}
break;
default:
break;
}
}
free(to_remove_mask);
if(verbose)
{
clock_gettime(CLOCK_MONOTONIC, &finish);
elapsed = (finish.tv_sec - start.tv_sec);
elapsed += (finish.tv_nsec - start.tv_nsec) / 1000000000.0;
printf("\tFinding actual centers: %.3lfs\n",elapsed);
clock_gettime(CLOCK_MONOTONIC, &start);
}
int n_centers = (int)actual_centers.count;
int tot_centers;
MPI_Allreduce(&n_centers, &tot_centers, 1, MPI_INT, MPI_SUM, ctx -> mpi_communicator);
// MPI_Allreduce(const void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
/*
DB_PRINT("rank %d got %d heheh\n", ctx -> mpi_rank, (int)actual_centers.count);
DB_PRINT("rank %d tc %d rc %d\n", ctx -> mpi_rank, (int)all_centers.count, (int)removed_centers.count);
*/
MPI_DB_PRINT("%d bbb\n", tot_centers);
/* bring on master all centers
* order them in ascending order of density,
* then re-scatter them around to get unique cluster labels */
center_t* private_centers_buffer = (center_t*)malloc(actual_centers.count * sizeof(center_t));
center_t* global_centers_buffer = (center_t*)malloc(tot_centers * sizeof(center_t));
for(int i = 0; i < actual_centers.count; ++i)
{
idx_t idx = actual_centers.data[i] ;
private_centers_buffer[i].density = dp_info[idx].g;
private_centers_buffer[i].idx = dp_info[idx].array_idx;
}
MPI_Datatype mpi_center_type;
MPI_Type_contiguous(sizeof(center_t), MPI_BYTE, &mpi_center_type);
MPI_Type_commit(&mpi_center_type);
int* center_counts = (int*)malloc(ctx -> world_size * sizeof(int));
int* center_displs = (int*)malloc(ctx -> world_size * sizeof(int));
int cc = (int)actual_centers.count;
MPI_Allgather(&cc, 1, MPI_INT, center_counts, 1, MPI_INT, ctx -> mpi_communicator);
center_displs[0] = 0;
for(int i = 1; i < ctx -> world_size; ++i) center_displs[i] = center_displs[i - 1] + center_counts[i - 1];
/* alternative to check if something breaks */
for(int i = 0; i < actual_centers.count; ++i)
{
idx_t idx = actual_centers.data[i];
dp_info[idx].cluster_idx += center_displs[ctx -> mpi_rank];
}
free(center_counts);
free(center_displs);
/*
 * Sort all the dp_info based on g and then perform the cluster assignment
 * in ascending order
 */
int completed = 0;
while(!completed)
{
completed = 1;
int proc_points = 0;
/* retrieve assignment
 * if some points remain unassigned, get the assignment from external points */
for(idx_t i = 0; i < n; ++i)
{
int wait_for_comms = 0;
datapoint_info_t* p = dp_info_ptrs[i];
if(!(p -> is_center) && p -> cluster_idx < 0)
{
int cluster = -1;
idx_t k = 0;
idx_t p_idx;
idx_t max_k = p -> ngbh.N;
/* assign each particle to the same cluster as the nearest particle of higher density */
datapoint_info_t p_retrieved;
while( k < max_k - 1 && cluster == -1)
{
++k;
p_idx = p -> ngbh.data[k].array_idx;
p_retrieved = find_possibly_halo_datapoint_rma(ctx, p_idx, win_datapoints);
int flag = p_retrieved.g > p -> g;
if(p_retrieved.g > p -> g)
{
cluster = p_retrieved.cluster_idx;
wait_for_comms = 1;
break;
}
//if(p -> array_idx == 1587636) printf("%lu k %d p_idx %lu %lf %lf\n", k, cluster, p_idx, p_retrieved.g, p -> g );
}
if(cluster == -1 && !wait_for_comms)
{
float_t gmax = -99999.;
idx_t gm_index = SIZE_MAX;
for(idx_t k = 0; k < max_k; ++k)
{
idx_t ngbh_index = p -> ngbh.data[k].array_idx;
for(idx_t m = 0; m < removed_centers.count; ++m)
{
idx_t max_rho_idx = max_rho.data[m];
datapoint_info_t dp_max_rho = find_possibly_halo_datapoint_rma(ctx, max_rho_idx, win_datapoints);
//datapoint_info_t dp_max_rho = dp_info[max_rho_idx];
float_t gcand = dp_max_rho.g;
if(ngbh_index == removed_centers.data[m] && gcand > gmax)
{
//printf("%lu -- %lu\n", ele, m);
gmax = gcand;
gm_index = max_rho.data[m];
}
}
}
if(gm_index != SIZE_MAX)
{
datapoint_info_t dp_gm = find_possibly_halo_datapoint_rma(ctx, gm_index, win_datapoints);
//datapoint_info_t dp_gm = dp_info[gm_index];
cluster = dp_gm.cluster_idx;
}
}
p -> cluster_idx = cluster;
}
completed = completed && p -> cluster_idx != -1;
proc_points += p -> cluster_idx != -1 ? 1 : 0;
}
//DB_PRINT("rank %d proc %d\n", ctx -> mpi_rank, proc_points);
MPI_Allreduce(MPI_IN_PLACE, &completed, 1, MPI_INT, MPI_SUM, ctx -> mpi_communicator);
completed = completed == ctx -> world_size ? 1 : 0;
}
MPI_Win_unlock_all(win_datapoints);
MPI_Win_fence(0, win_datapoints);
MPI_Win_free(&win_datapoints);
if(verbose)
{
clock_gettime(CLOCK_MONOTONIC, &finish);
elapsed = (finish.tv_sec - start.tv_sec);
elapsed += (finish.tv_nsec - start.tv_nsec) / 1000000000.0;
printf("\tTentative clustering: %.3lfs\n",elapsed);
clock_gettime(CLOCK_MONOTONIC, &start);
}
free(dp_info_ptrs);
free(max_rho.data);
free(removed_centers.data);
free(all_centers.data);
#if defined(WRITE_CLUSTER_ASSIGN_H1)
/* densities */
int* ks = (int*)malloc(ctx -> local_n_points * sizeof(int));
for(int i = 0; i < ctx -> local_n_points; ++i) ks[i] = ctx -> local_datapoints[i].cluster_idx;
ordered_buffer_to_file(ctx, ks, sizeof(int), ctx -> local_n_points, "bb/cl.npy");
ordered_data_to_file(ctx);
free(ks);
#endif
clusters_t c_all;
c_all.centers = actual_centers;
if(verbose)
{
clock_gettime(CLOCK_MONOTONIC, &finish);
elapsed = (finish.tv_sec - start.tv_sec);
elapsed += (finish.tv_nsec - start.tv_nsec) / 1000000000.0;
printf("\tFinalizing clustering: %.3lfs\n",elapsed);
printf("\n");
}
clock_gettime(CLOCK_MONOTONIC, &finish_tot);
elapsed_tot = (finish_tot.tv_sec - start_tot.tv_sec);
elapsed_tot += (finish_tot.tv_nsec - start_tot.tv_nsec) / 1000000000.0;
if(verbose)
{
printf("\tFound %lu clusters\n",(uint64_t)actual_centers.count);
printf("\tTotal time: %.3lfs\n\n", elapsed_tot);
}
c_all.n = n;
return c_all;
}
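One detail of the function above worth isolating: the update of win_to_remove_mask emulates an atomic "keep the larger g" with an exclusive lock around a get/compare/put sequence (the commit uses MPI_Rget/MPI_Rput plus MPI_Wait; MPI_Compare_and_swap or MPI_Fetch_and_op would be the standard atomic alternatives). A blocking sketch of the same idea, with jidx and i_point borrowed from the code above:

    int owner = foreign_owner(ctx, jidx);
    idx_t jpos = jidx - ctx -> rank_idx_start[owner];

    MPI_Win_lock(MPI_LOCK_EXCLUSIVE, owner, 0, win_to_remove_mask);

    heap_node current;
    MPI_Get(&current, sizeof(heap_node), MPI_BYTE, owner,
            jpos * sizeof(heap_node), sizeof(heap_node), MPI_BYTE, win_to_remove_mask);
    MPI_Win_flush(owner, win_to_remove_mask);   /* complete the read before comparing */

    if (current.array_idx == MY_SIZE_MAX || i_point.g > current.value)
    {
        heap_node update = { .value = i_point.g, .array_idx = i_point.array_idx };
        MPI_Put(&update, sizeof(heap_node), MPI_BYTE, owner,
                jpos * sizeof(heap_node), sizeof(heap_node), MPI_BYTE, win_to_remove_mask);
    }

    MPI_Win_unlock(owner, win_to_remove_mask);  /* completes the put and releases the lock */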
clusters_t Heuristic1(global_context_t *ctx, int verbose)
{
    /*
@@ -3657,8 +4145,6 @@ clusters_t Heuristic1(global_context_t *ctx, int verbose)
    DB_PRINT("rank %d got %d heheh\n", ctx -> mpi_rank, (int)actual_centers.count);
    DB_PRINT("rank %d tc %d rc %d\n", ctx -> mpi_rank, (int)all_centers.count, (int)removed_centers.count);
    */
-   MPI_DB_PRINT("%d bbb\n", tot_centers);
    /* bring on master all centers
     * order them in ascending order of density,
@@ -3679,7 +4165,7 @@ clusters_t Heuristic1(global_context_t *ctx, int verbose)
    HERE
-   MPI_Type_contiguous(sizeof(center_t), MPI_CHAR, &mpi_center_type);
+   MPI_Type_contiguous(sizeof(center_t), MPI_BYTE, &mpi_center_type);
    MPI_Type_commit(&mpi_center_type);
    int* center_counts = (int*)malloc(ctx -> world_size * sizeof(int));
@@ -4023,8 +4509,6 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)
    int *send_counts = (int *)malloc(ctx->world_size * sizeof(int));
    int *displacements = (int *)malloc(ctx->world_size * sizeof(int));
-   HERE
    displacements[0] = 0;
    send_counts[0] = ctx->n_points / ctx->world_size;
    send_counts[0] += (ctx->n_points % ctx->world_size) > 0 ? 1 : 0;
@@ -4117,9 +4601,6 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)
    //float_t id = id_estimate(ctx, dp_info, ctx -> local_n_points, 0.9, MY_FALSE);
    float_t id = compute_ID_two_NN_ML(ctx, dp_info, ctx -> local_n_points, MY_FALSE);
    elapsed_time = TIME_STOP;
-   //id = 3.920865231328582;
-   //id = 4.008350298212649;
-   //id = 4.;
    LOG_WRITE("ID estimate", elapsed_time)
    MPI_DB_PRINT("ID %lf \n",id);
@@ -4141,10 +4622,10 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)
    elapsed_time = TIME_STOP;
    LOG_WRITE("Density estimate", elapsed_time)
-   //TIME_START;
-   //Heuristic1(ctx, MY_FALSE);
-   //elapsed_time = TIME_STOP;
-   //LOG_WRITE("H1", elapsed_time)
+   TIME_START;
+   Heuristic1_rma(ctx, MY_FALSE);
+   elapsed_time = TIME_STOP;
+   LOG_WRITE("H1", elapsed_time)
    /* find density */
...