diff --git a/CMakeLists.txt b/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..333234e8130ed7a52b83cd7cd71ffe48d9f950e3 --- /dev/null +++ b/CMakeLists.txt @@ -0,0 +1,15 @@ +cmake_minimum_required(VERSION 2.4) +project(dadp) + +set(CMAKE_C_COMPILER "mpicc") +add_compile_options("-O3") +add_compile_options("-g") +add_compile_options("-fopenmp") +link_libraries("-lm") +link_libraries("-fopenmp") + +include_directories(${PROJECT_SOURCE_DIR}/include) + +add_executable(main src/main/main.c src/tree/tree.c src/common/common.c src/tree/kdtreeV2.c src/tree/heap.c) + + diff --git a/Makefile b/Makefile index 33f97029e99f95a9cfa4f9e1afaf6283fbcc6516..8df32476d9ac91b9171f382b9ea62f1c606e8b33 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ CC=mpicc -CFLAGS=-O3 -g -LDFLAGS=-lm -fopenmp +CFLAGS=-O3 -g -fopenmp +LDFLAGS=-lm all: main diff --git a/run_pleiadi b/run_pleiadi index 34cb27eaa5e6cf9c70f93f46435c73d0aa0a359b..618905d56515723e1cff57ee949f03e6fe318405 100644 --- a/run_pleiadi +++ b/run_pleiadi @@ -1,6 +1,6 @@ #!/bin/bash -#SBATCH --nodes=2 +#SBATCH --nodes=1 #SBATCH --ntasks-per-node=2 #SBATCH --cpus-per-task=18 #SBATCH --time=01:00:00 @@ -26,6 +26,7 @@ rm bb/* #time mpirun -n ${SLURM_NTASKS} --map-by ppr:1:node:PE=${SLURM_CPUS_PER_TASK} main time mpirun -n ${SLURM_NTASKS} --mca orte_base_help_aggregate 0 --map-by ppr:1:socket:PE=${SLURM_CPUS_PER_TASK} main +#time mpirun -n ${SLURM_NTASKS} --mca orte_base_help_aggregate 0 --map-by core main #time mpirun -n ${SLURM_NTASKS} main #time python3 check.py diff --git a/run_pleiadi_no_openmp b/run_pleiadi_no_openmp new file mode 100644 index 0000000000000000000000000000000000000000..d6a09146422fddcac708eeb5604843321639070f --- /dev/null +++ b/run_pleiadi_no_openmp @@ -0,0 +1,28 @@ +#!/bin/bash + +#SBATCH --nodes=8 +#SBATCH --ntasks-per-node=36 +#SBATCH --cpus-per-task=1 +#SBATCH --time=01:00:00 +#SBATCH --job-name=dADP-test +#SBATCH --account=ulearn +#SBATCH --partition=pleiadi +#SBATCH --output=out_pleiadi +#SBATCH --error=err_pleiadi +#SBATCH --mem=230G + +cd $SLURM_SUBMIT_DIR +module restore dev_pleiadi +source /u/ftomba/my_envs/dadac-dev/bin/activate +make clean +make + +#export PSM2_MQ_SENDREQS_MAX=268435456 +#export PSM2_MQ_RECVREQS_MAX=268435456 + +rm bb/* + +time mpirun -n ${SLURM_NTASKS} --mca orte_base_help_aggregate 0 --map-by core main + +#time python3 check.py + diff --git a/src/common/common.h b/src/common/common.h index 1af32d9019427906a54731f18a4454717bd701aa..3bd19562a400f2111b25cc10ec8a08c872d6c6dd 100644 --- a/src/common/common.h +++ b/src/common/common.h @@ -35,6 +35,9 @@ typedef struct datapoint_info_t { #define CHECK_ALLOCATION(x) if(!x){printf("[!!!] %d rank encountered failed allocation at line %s ", ctx -> mpi_rank, __LINE__ ); exit(1);}; +#define CHECK_ALLOCATION_NO_CTX(x) if(!x){printf("[!!!] Failed allocation at line %d ", __LINE__ ); exit(1);} +#define MY_MALLOC(n) ({void* p = calloc(n,1); CHECK_ALLOCATION_NO_CTX(p); p; }) + #define DB_PRINT(...) printf(__VA_ARGS__) #ifdef NDEBUG #undef DB_PRINT(...) 
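/*
 * A minimal usage sketch (not taken from the repository) of the MY_MALLOC /
 * CHECK_ALLOCATION_NO_CTX pair added to src/common/common.h above. The macro
 * is calloc-based, so unlike the plain malloc calls it replaces it also
 * zero-initializes the memory, and the ({ ... }) statement expression assumes
 * GCC/Clang. `toy_buffer` is an illustrative name only.
 */
#include <stdio.h>
#include <stdlib.h>

#define CHECK_ALLOCATION_NO_CTX(x) if(!x){printf("[!!!] Failed allocation at line %d ", __LINE__ ); exit(1);}
#define MY_MALLOC(n) ({void* p = calloc(n,1); CHECK_ALLOCATION_NO_CTX(p); p; })

int main(void)
{
    /* zero-initialized allocation that aborts on failure, so call sites
     * can drop their per-allocation NULL checks */
    double* toy_buffer = (double*)MY_MALLOC(1000 * sizeof(double));
    printf("first element: %f\n", toy_buffer[0]); /* prints 0.000000 */
    free(toy_buffer);
    return 0;
}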
@@ -64,6 +67,7 @@ typedef struct datapoint_info_t { (clock_gettime(CLOCK_MONOTONIC,&__end), \ (double)(__end.tv_sec - __start.tv_sec) + (__end.tv_nsec - __start.tv_nsec)/1e9) #define LOG_WRITE(sec_name,time) { \ + MPI_Barrier(ctx -> mpi_communicator); \ if(time > 0) \ { \ double max, min, avg; \ diff --git a/src/main/main.c b/src/main/main.c index fa68fc0d501ec181463dc5157b30b2764b7a60ba..0a698424c28fea2b0e5f13228544e09d1fe885e2 100644 --- a/src/main/main.c +++ b/src/main/main.c @@ -4,7 +4,13 @@ #include "../common/common.h" #include "../tree/tree.h" -#define THREAD_LEVEL MPI_THREAD_FUNNELED +// + +#ifdef THREAD_FUNNELED + #define THREAD_LEVEL MPI_THREAD_FUNNELED +#else + #define THREAD_LEVEL MPI_THREAD_MULTIPLE +#endif int main(int argc, char** argv) { #if defined (_OPENMP) @@ -49,7 +55,13 @@ int main(int argc, char** argv) { #else mpi_printf(&ctx,"Running pure MPI code\n"); #endif - + + #if defined (THREAD_FUNNELED) + mpi_printf(&ctx,"/!\\ Code built with MPI_THREAD_FUNNELED level\n"); + #else + mpi_printf(&ctx,"/!\\ Code built with MPI_THREAD_MULTIPLE level\n"); + #endif + /* * Mock reading some files, one for each processor */ diff --git a/src/tree/tree.c b/src/tree/tree.c index a9064252f1348a2f70693851bbbeaaebf49aa1e1..10807c1f0f6b5f54a5bd82af837a5e1ca160e13c 100644 --- a/src/tree/tree.c +++ b/src/tree/tree.c @@ -22,7 +22,8 @@ //#define WRITE_NGBH //#define WRITE_TOP_NODES //#define WRITE_DENSITY -//#define WRITE_CLUSTER_ASSIGN_H1 +#define WRITE_CLUSTER_ASSIGN_H1 +#define WRITE_BORDERS /* * Maximum bytes to send with a single mpi send/recv, used @@ -81,11 +82,11 @@ float_t *read_data_file(global_context_t *ctx, const char *fname, n = n / (InputFloatSize); - float_t *data = (float_t *)malloc(n * sizeof(float_t)); + float_t *data = (float_t *)MY_MALLOC(n * sizeof(float_t)); if (file_in_float32) { - float *df = (float *)malloc(n * sizeof(float)); + float *df = (float *)MY_MALLOC(n * sizeof(float)); size_t fff = fread(df, sizeof(float), n, f); mpi_printf(ctx, "Read %luB\n", fff); fclose(f); @@ -96,7 +97,7 @@ } else { - double *df = (double *)malloc(n * sizeof(double)); + double *df = (double *)MY_MALLOC(n * sizeof(double)); size_t fff = fread(df, sizeof(double), n, f); mpi_printf(ctx, "Read %luB\n", fff); fclose(f); @@ -186,8 +187,8 @@ int compare_data_element_sort(const void *a, const void *b) { } void compute_bounding_box(global_context_t *ctx) { - ctx->lb_box = (float_t *)malloc(ctx->dims * sizeof(float_t)); - ctx->ub_box = (float_t *)malloc(ctx->dims * sizeof(float_t)); + ctx->lb_box = (float_t *)MY_MALLOC(ctx->dims * sizeof(float_t)); + ctx->ub_box = (float_t *)MY_MALLOC(ctx->dims * sizeof(float_t)); for (size_t d = 0; d < ctx->dims; ++d) { ctx->lb_box[d] = 99999999.; @@ -280,7 +281,7 @@ void compute_medians_and_check(global_context_t *ctx, float_t *data) { * a median */ - float_t *medians_rcv = (float_t *)malloc(ctx->dims * ctx->world_size * sizeof(float_t)); + float_t *medians_rcv = (float_t *)MY_MALLOC(ctx->dims * ctx->world_size * sizeof(float_t)); /* * MPI_Allgather( const void *sendbuf, @@ -471,7 +472,7 @@ void global_binning_check(global_context_t *ctx, float_t *data, int d, int k) if (I_AM_MASTER) { - int *counts = (int *)malloc(k * sizeof(int)); + int *counts = (int *)MY_MALLOC(k * sizeof(int)); for (int bin_idx = 0; bin_idx < k; ++bin_idx) counts[bin_idx] = 0; float_t box_lb = ctx->lb_box[d]; @@ -506,7 +507,7 @@ void compute_pure_global_binning(global_context_t *ctx, pointset_t *ps, int d) { /* compute
binning of data along dimension d */ - uint64_t *local_bin_count = (uint64_t *)malloc(k_global * sizeof(uint64_t)); + uint64_t *local_bin_count = (uint64_t *)MY_MALLOC(k_global * sizeof(uint64_t)); for (size_t k = 0; k < k_global; ++k) { local_bin_count[k] = 0; @@ -583,7 +584,7 @@ guess_t refine_pure_binning(global_context_t *ctx, pointset_t *ps, float_t bin_lb = ps->lb_box[d] + (bin_w * (best_guess.bin_idx)); float_t bin_ub = ps->lb_box[d] + (bin_w * (best_guess.bin_idx + 1)); - uint64_t *tmp_global_bins = (uint64_t *)malloc(sizeof(uint64_t) * k_global); + uint64_t *tmp_global_bins = (uint64_t *)MY_MALLOC(sizeof(uint64_t) * k_global); for (int i = 0; i < k_global; ++i) tmp_global_bins[i] = global_bin_count[i]; /* @@ -632,8 +633,8 @@ guess_t refine_pure_binning(global_context_t *ctx, pointset_t *ps, tmp_ps.n_points = end_idx - start_idx; tmp_ps.data = ps->data + start_idx * ps->dims; tmp_ps.dims = ps->dims; - tmp_ps.lb_box = (float_t*)malloc(ctx -> dims * sizeof(float_t)); - tmp_ps.ub_box = (float_t*)malloc(ctx -> dims * sizeof(float_t)); + tmp_ps.lb_box = (float_t*)MY_MALLOC(ctx -> dims * sizeof(float_t)); + tmp_ps.ub_box = (float_t*)MY_MALLOC(ctx -> dims * sizeof(float_t)); compute_bounding_box_pointset(ctx, &tmp_ps); @@ -678,7 +679,7 @@ void init_queue(partition_queue_t *pq) { pq->count = 0; pq->_capacity = 1000; - pq->data = (partition_t *)malloc(pq->_capacity * sizeof(partition_t)); + pq->data = (partition_t *)MY_MALLOC(pq->_capacity * sizeof(partition_t)); } void free_queue(partition_queue_t *pq) { free(pq->data); } @@ -695,7 +696,7 @@ guess_t compute_median_pure_binning(global_context_t *ctx, pointset_t *ps, float int best_bin_idx; float_t ep; - uint64_t *global_bin_counts_int = (uint64_t *)malloc(n_bins * sizeof(uint64_t)); + uint64_t *global_bin_counts_int = (uint64_t *)MY_MALLOC(n_bins * sizeof(uint64_t)); compute_bounding_box_pointset(ctx, ps); compute_pure_global_binning(ctx, ps, global_bin_counts_int, n_bins, selected_dim); @@ -722,7 +723,7 @@ void top_tree_init(global_context_t *ctx, top_kdtree_t *tree) int tree_nodes = (1 << (l + 1)) - 1; //int tree_nodes = compute_n_nodes(ctx -> world_size); //MPI_DB_PRINT("Tree nodes %d %d %d %d\n", ctx -> world_size,l, tree_nodes, compute_n_nodes(ctx -> world_size)); - tree->_nodes = (top_kdtree_node_t*)malloc(tree_nodes * sizeof(top_kdtree_node_t)); + tree->_nodes = (top_kdtree_node_t*)MY_MALLOC(tree_nodes * sizeof(top_kdtree_node_t)); for(int i = 0; i < tree_nodes; ++i) { tree -> _nodes[i].lch = NULL; @@ -759,8 +760,8 @@ top_kdtree_node_t* top_tree_generate_node(global_context_t* ctx, top_kdtree_t* t ptr -> lch = NULL; ptr -> rch = NULL; ptr -> parent = NULL; - ptr -> lb_node_box = (float_t*)malloc(ctx -> dims * sizeof(float_t)); - ptr -> ub_node_box = (float_t*)malloc(ctx -> dims * sizeof(float_t)); + ptr -> lb_node_box = (float_t*)MY_MALLOC(ctx -> dims * sizeof(float_t)); + ptr -> ub_node_box = (float_t*)MY_MALLOC(ctx -> dims * sizeof(float_t)); ptr -> owner = -1; ptr -> split_dim = 0.; ++tree -> count; @@ -865,8 +866,8 @@ void build_top_kdtree(global_context_t *ctx, pointset_t *og_pointset, top_kdtree enqueue_partition(&queue, current_partition); pointset_t current_pointset; - current_pointset.lb_box = (float_t*)malloc(ctx -> dims * sizeof(float_t)); - current_pointset.ub_box = (float_t*)malloc(ctx -> dims * sizeof(float_t)); + current_pointset.lb_box = (float_t*)MY_MALLOC(ctx -> dims * sizeof(float_t)); + current_pointset.ub_box = (float_t*)MY_MALLOC(ctx -> dims * sizeof(float_t)); while (queue.count) { @@ -1083,9 +1084,9 @@ int 
partition_data_around_key(int* key, float_t *val, int vec_len, int ref_key , void exchange_points(global_context_t* ctx, top_kdtree_t* tree) { - int* points_per_proc = (int*)malloc(ctx -> world_size * sizeof(int)); - int* points_owners = (int*)malloc(ctx -> dims * ctx -> local_n_points * sizeof(float_t)); - int* partition_offset = (int*)malloc(ctx -> world_size * sizeof(int)); + int* points_per_proc = (int*)MY_MALLOC(ctx -> world_size * sizeof(int)); + int* points_owners = (int*)MY_MALLOC(ctx -> dims * ctx -> local_n_points * sizeof(float_t)); + int* partition_offset = (int*)MY_MALLOC(ctx -> world_size * sizeof(int)); /* compute owner */ #pragma omp parallel for @@ -1116,9 +1117,9 @@ void exchange_points(global_context_t* ctx, top_kdtree_t* tree) points_per_proc[i] = points_per_proc[i] - points_per_proc[i - 1]; } - int* rcv_count = (int*)malloc(ctx -> world_size * sizeof(int)); - int* rcv_displs = (int*)malloc(ctx -> world_size * sizeof(int)); - int* send_displs = (int*)malloc(ctx -> world_size * sizeof(int)); + int* rcv_count = (int*)MY_MALLOC(ctx -> world_size * sizeof(int)); + int* rcv_displs = (int*)MY_MALLOC(ctx -> world_size * sizeof(int)); + int* send_displs = (int*)MY_MALLOC(ctx -> world_size * sizeof(int)); int* send_count = points_per_proc; float_t* rcvbuffer = NULL; @@ -1129,7 +1130,6 @@ void exchange_points(global_context_t* ctx, top_kdtree_t* tree) MPI_Barrier(ctx -> mpi_communicator); - /* TODO: change it to an all to all*/ MPI_Alltoall(send_count, 1, MPI_INT, rcv_count, 1, MPI_INT, ctx -> mpi_communicator); rcv_displs[0] = 0; send_displs[0] = 0; @@ -1150,7 +1150,7 @@ void exchange_points(global_context_t* ctx, top_kdtree_t* tree) tot_count += rcv_count[i]; } - rcvbuffer = (float_t*)malloc(tot_count * sizeof(float_t)); + rcvbuffer = (float_t*)MY_MALLOC(tot_count * sizeof(float_t)); /*exchange points */ @@ -1159,7 +1159,7 @@ void exchange_points(global_context_t* ctx, top_kdtree_t* tree) ctx -> mpi_communicator); ctx -> local_n_points = tot_count / ctx -> dims; - int* ppp = (int*)malloc(ctx -> world_size * sizeof(int)); + int* ppp = (int*)MY_MALLOC(ctx -> world_size * sizeof(int)); MPI_Allgather(&(ctx -> local_n_points), 1, MPI_INT, ppp, 1, MPI_INT, ctx -> mpi_communicator); ctx -> idx_start = 0; @@ -1553,7 +1553,7 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre TIME_START; MPI_Barrier(ctx -> mpi_communicator); - //ctx -> __local_heap_buffers = (heap_node*)malloc(ctx -> local_n_points * k * sizeof(heap_node)); + //ctx -> __local_heap_buffers = (heap_node*)MY_MALLOC(ctx -> local_n_points * k * sizeof(heap_node)); MPI_Alloc_mem(ctx -> local_n_points * k * sizeof(heap_node), MPI_INFO_NULL, &(ctx -> __local_heap_buffers)); #pragma omp parallel for @@ -1579,18 +1579,18 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre TIME_START; /* find if a points needs a refine on the global tree */ - float_t** data_to_send_per_proc = (float_t**)malloc(ctx -> world_size * sizeof(float_t*)); - int** local_idx_of_the_point = (int**)malloc(ctx -> world_size * sizeof(int*)); - int* point_to_snd_count = (int*)malloc(ctx -> world_size * sizeof(int)); - int* point_to_snd_capacity = (int*)malloc(ctx -> world_size * sizeof(int)); + float_t** data_to_send_per_proc = (float_t**)MY_MALLOC(ctx -> world_size * sizeof(float_t*)); + int** local_idx_of_the_point = (int**)MY_MALLOC(ctx -> world_size * sizeof(int*)); + int* point_to_snd_count = (int*)MY_MALLOC(ctx -> world_size * sizeof(int)); + int* point_to_snd_capacity = (int*)MY_MALLOC(ctx 
-> world_size * sizeof(int)); for(int i = 0; i < ctx -> world_size; ++i) { /* allocate it afterwards */ /* OLD VERSION - data_to_send_per_proc[i] = (float_t*)malloc(100 * (ctx -> dims) * sizeof(float_t)); - local_idx_of_the_point[i] = (int*)malloc(100 * sizeof(int)); + data_to_send_per_proc[i] = (float_t*)MY_MALLOC(100 * (ctx -> dims) * sizeof(float_t)); + local_idx_of_the_point[i] = (int*)MY_MALLOC(100 * sizeof(int)); point_to_snd_capacity[i] = 100; */ @@ -1631,8 +1631,8 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre for(int i = 0; i < ctx -> world_size; ++i) { int np = point_to_snd_capacity[i]; - data_to_send_per_proc[i] = (float_t*)malloc(np * (ctx -> dims) * sizeof(float_t)); - local_idx_of_the_point[i] = (int*)malloc(np * sizeof(int)); + data_to_send_per_proc[i] = (float_t*)MY_MALLOC(np * (ctx -> dims) * sizeof(float_t)); + local_idx_of_the_point[i] = (int*)MY_MALLOC(np * sizeof(int)); } @@ -1651,15 +1651,15 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre LOG_WRITE("Finding points to refine", elapsed_time); TIME_START; - int* point_to_rcv_count = (int*)malloc(ctx -> world_size * sizeof(int)); + int* point_to_rcv_count = (int*)MY_MALLOC(ctx -> world_size * sizeof(int)); /* exchange points to work on*/ MPI_Alltoall(point_to_snd_count, 1, MPI_INT, point_to_rcv_count, 1, MPI_INT, ctx -> mpi_communicator); - int* rcv_count = (int*)malloc(ctx -> world_size * sizeof(int)); - int* snd_count = (int*)malloc(ctx -> world_size * sizeof(int)); - int* rcv_displ = (int*)malloc(ctx -> world_size * sizeof(int)); - int* snd_displ = (int*)malloc(ctx -> world_size * sizeof(int)); + int* rcv_count = (int*)MY_MALLOC(ctx -> world_size * sizeof(int)); + int* snd_count = (int*)MY_MALLOC(ctx -> world_size * sizeof(int)); + int* rcv_displ = (int*)MY_MALLOC(ctx -> world_size * sizeof(int)); + int* snd_displ = (int*)MY_MALLOC(ctx -> world_size * sizeof(int)); /*compute counts and displs*/ rcv_displ[0] = 0; @@ -1686,8 +1686,8 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre snd_displ[i] = snd_displ[i - 1] + snd_count[i - 1]; } - float_t* __rcv_points = (float_t*)malloc(tot_points_rcv * (ctx -> dims) * sizeof(float_t)); - float_t* __snd_points = (float_t*)malloc(tot_points_snd * (ctx -> dims) * sizeof(float_t)); + float_t* __rcv_points = (float_t*)MY_MALLOC(tot_points_rcv * (ctx -> dims) * sizeof(float_t)); + float_t* __snd_points = (float_t*)MY_MALLOC(tot_points_snd * (ctx -> dims) * sizeof(float_t)); @@ -1701,7 +1701,7 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre MPI_Alltoallv(__snd_points, snd_count, snd_displ, MPI_MY_FLOAT, __rcv_points, rcv_count, rcv_displ, MPI_MY_FLOAT, ctx -> mpi_communicator); - float_t** rcv_work_batches = (float_t**)malloc(ctx -> world_size * sizeof(float_t*)); + float_t** rcv_work_batches = (float_t**)MY_MALLOC(ctx -> world_size * sizeof(float_t*)); for(int i = 0; i < ctx -> world_size; ++i) { //rcv_work_batches[i] = NULL; @@ -1716,8 +1716,8 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre int work_batch_stride = ctx -> dims; /* Note that I then have to recieve an equal number of heaps as the one I sent out before */ - heap_node* __heap_batches_to_snd = (heap_node*)malloc((uint64_t)k * (uint64_t)tot_points_rcv * sizeof(heap_node)); - heap_node* __heap_batches_to_rcv = (heap_node*)malloc((uint64_t)k * (uint64_t)tot_points_snd * sizeof(heap_node)); + heap_node* __heap_batches_to_snd = 
(heap_node*)MY_MALLOC((uint64_t)k * (uint64_t)tot_points_rcv * sizeof(heap_node)); + heap_node* __heap_batches_to_rcv = (heap_node*)MY_MALLOC((uint64_t)k * (uint64_t)tot_points_snd * sizeof(heap_node)); if( __heap_batches_to_rcv == NULL) @@ -1751,7 +1751,7 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre } - heap_node** heap_batches_per_node = (heap_node**)malloc(ctx -> world_size * sizeof(heap_node*)); + heap_node** heap_batches_per_node = (heap_node**)MY_MALLOC(ctx -> world_size * sizeof(heap_node*)); for(int p = 0; p < ctx -> world_size; ++p) { heap_batches_per_node[p] = __heap_batches_to_snd + (uint64_t)rcv_displ[p] * (uint64_t)k; @@ -1771,7 +1771,7 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre if(point_to_rcv_count[p] > 0 && p != ctx -> mpi_rank) //if(count_rcv_work_batches[p] > 0) { - //heap_batches_per_node[p] = (heap_node*)malloc(k * point_to_rcv_count[p] * sizeof(heap_node)); + //heap_batches_per_node[p] = (heap_node*)MY_MALLOC(k * point_to_rcv_count[p] * sizeof(heap_node)); #pragma omp parallel for for(int batch = 0; batch < point_to_rcv_count[p]; ++batch) { @@ -1787,6 +1787,8 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre } } } + + HERE /* sendout results */ @@ -1813,7 +1815,7 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre MPI_Barrier(ctx -> mpi_communicator); MPI_Type_commit(&MPI_my_heap); - heap_node** rcv_heap_batches = (heap_node**)malloc(ctx -> world_size * sizeof(heap_node*)); + heap_node** rcv_heap_batches = (heap_node**)MY_MALLOC(ctx -> world_size * sizeof(heap_node*)); for(int i = 0; i < ctx -> world_size; ++i) { rcv_heap_batches[i] = __heap_batches_to_rcv + snd_displ[i] * k; @@ -1828,8 +1830,8 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre MPI_Barrier(ctx -> mpi_communicator); int default_msg_len = MAX_MSG_SIZE / (k * sizeof(heap_node)); - int* already_sent_points = (int*)malloc(ctx -> world_size * sizeof(int)); - int* already_rcvd_points = (int*)malloc(ctx -> world_size * sizeof(int)); + int* already_sent_points = (int*)MY_MALLOC(ctx -> world_size * sizeof(int)); + int* already_rcvd_points = (int*)MY_MALLOC(ctx -> world_size * sizeof(int)); /* allocate a request array to keep track of all requests going out*/ MPI_Request* req_array; @@ -1839,7 +1841,7 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre req_num += ngbh_to_send[i] > 0 ? 
ngbh_to_send[i]/default_msg_len + 1 : 0; } - req_array = (MPI_Request*)malloc(req_num * sizeof(MPI_Request)); + req_array = (MPI_Request*)MY_MALLOC(req_num * sizeof(MPI_Request)); for(int i = 0; i < ctx -> world_size; ++i) { @@ -1989,7 +1991,7 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre void test_the_idea(global_context_t* ctx) { - int* num = (int*)malloc(ctx -> world_size * sizeof(int)); + int* num = (int*)MY_MALLOC(ctx -> world_size * sizeof(int)); for(int i = 0; i < ctx -> world_size; ++i) num[i] = i; for(int i = 0; i < ctx -> world_size; ++i) @@ -2031,9 +2033,9 @@ void ordered_data_to_file(global_context_t* ctx) MPI_Barrier(ctx -> mpi_communicator); if(I_AM_MASTER) { - tmp_data = (float_t*)malloc(ctx -> dims * ctx -> n_points * sizeof(float_t)); - ppp = (int*)malloc(ctx -> world_size * sizeof(int)); - displs = (int*)malloc(ctx -> world_size * sizeof(int)); + tmp_data = (float_t*)MY_MALLOC(ctx -> dims * ctx -> n_points * sizeof(float_t)); + ppp = (int*)MY_MALLOC(ctx -> world_size * sizeof(int)); + displs = (int*)MY_MALLOC(ctx -> world_size * sizeof(int)); } @@ -2076,9 +2078,9 @@ void ordered_buffer_to_file(global_context_t* ctx, void* buffer, size_t el_size, if(I_AM_MASTER) { - tmp_data = (void*)malloc(el_size * tot_n ); - ppp = (int*)malloc(ctx -> world_size * sizeof(int)); - displs = (int*)malloc(ctx -> world_size * sizeof(int)); + tmp_data = (void*)MY_MALLOC(el_size * tot_n ); + ppp = (int*)MY_MALLOC(ctx -> world_size * sizeof(int)); + displs = (int*)MY_MALLOC(ctx -> world_size * sizeof(int)); } @@ -2153,9 +2155,9 @@ void find_foreign_nodes(global_context_t* ctx, datapoint_info_t* dp, datapoint_i { int k = dp[0].ngbh.count; - idx_t** array_indexes_to_request = (idx_t**)malloc(ctx -> world_size * sizeof(idx_t*)); - int* count_to_request = (int*)malloc(ctx -> world_size * sizeof(int)); - int* capacities = (int*)malloc(ctx -> world_size * sizeof(int)); + idx_t** array_indexes_to_request = (idx_t**)MY_MALLOC(ctx -> world_size * sizeof(idx_t*)); + int* count_to_request = (int*)MY_MALLOC(ctx -> world_size * sizeof(int)); + int* capacities = (int*)MY_MALLOC(ctx -> world_size * sizeof(int)); for(int i = 0; i < ctx -> world_size; ++i) @@ -2187,7 +2189,7 @@ void find_foreign_nodes(global_context_t* ctx, datapoint_info_t* dp, datapoint_i for(int i = 0; i < ctx -> world_size; ++i) { - array_indexes_to_request[i] = (idx_t*)malloc(capacities[i] * sizeof(idx_t)); + array_indexes_to_request[i] = (idx_t*)MY_MALLOC(capacities[i] * sizeof(idx_t)); } /* append them */ @@ -2209,7 +2211,7 @@ void find_foreign_nodes(global_context_t* ctx, datapoint_info_t* dp, datapoint_i } /* prune them */ - int* unique_count = (int*)malloc(ctx -> world_size * sizeof(int)); + int* unique_count = (int*)MY_MALLOC(ctx -> world_size * sizeof(int)); /* if(I_AM_MASTER) @@ -2254,7 +2256,7 @@ void find_foreign_nodes(global_context_t* ctx, datapoint_info_t* dp, datapoint_i /* alias for unique counts */ int* n_heap_to_recv = unique_count; - int* n_heap_to_send = (int*)malloc(ctx -> world_size * sizeof(int)); + int* n_heap_to_send = (int*)MY_MALLOC(ctx -> world_size * sizeof(int)); /* exchange how many to recv how many to send */ @@ -2274,8 +2276,8 @@ void find_foreign_nodes(global_context_t* ctx, datapoint_info_t* dp, datapoint_i tot_count_send += n_heap_to_send[i]; tot_count_recv += n_heap_to_recv[i]; } - idx_t* idx_buffer_to_send = (idx_t*)malloc(tot_count_send * sizeof(idx_t)); - idx_t* idx_buffer_to_recv = (idx_t*)malloc(tot_count_recv * sizeof(idx_t)); + idx_t* idx_buffer_to_send = 
(idx_t*)MY_MALLOC(tot_count_send * sizeof(idx_t)); + idx_t* idx_buffer_to_recv = (idx_t*)MY_MALLOC(tot_count_recv * sizeof(idx_t)); /* copy indexes on send buffer */ for(int i = 0; i < ctx -> world_size; ++i) @@ -2287,8 +2289,8 @@ void find_foreign_nodes(global_context_t* ctx, datapoint_info_t* dp, datapoint_i /* allocate foreign dp */ - heap_node* heap_buffer_to_send = (heap_node*)malloc(tot_count_send * k * sizeof(heap_node)); - heap_node* heap_buffer_to_recv = (heap_node*)malloc(tot_count_recv * k * sizeof(heap_node)); + heap_node* heap_buffer_to_send = (heap_node*)MY_MALLOC(tot_count_send * k * sizeof(heap_node)); + heap_node* heap_buffer_to_recv = (heap_node*)MY_MALLOC(tot_count_recv * k * sizeof(heap_node)); if(!heap_buffer_to_send) { @@ -2315,8 +2317,8 @@ void find_foreign_nodes(global_context_t* ctx, datapoint_info_t* dp, datapoint_i MPI_Barrier(ctx -> mpi_communicator); int default_msg_len = MAX_MSG_SIZE / (k * sizeof(heap_node)); - int* already_sent_points = (int*)malloc(ctx -> world_size * sizeof(int)); - int* already_rcvd_points = (int*)malloc(ctx -> world_size * sizeof(int)); + int* already_sent_points = (int*)MY_MALLOC(ctx -> world_size * sizeof(int)); + int* already_rcvd_points = (int*)MY_MALLOC(ctx -> world_size * sizeof(int)); /* allocate a request array to keep track of all requests going out*/ MPI_Request* req_array; @@ -2326,7 +2328,7 @@ void find_foreign_nodes(global_context_t* ctx, datapoint_info_t* dp, datapoint_i req_num += n_heap_to_send[i] > 0 ? n_heap_to_send[i]/default_msg_len + 1 : 0; } - req_array = (MPI_Request*)malloc(req_num * sizeof(MPI_Request)); + req_array = (MPI_Request*)MY_MALLOC(req_num * sizeof(MPI_Request)); for(int i = 0; i < ctx -> world_size; ++i) { @@ -2430,12 +2432,12 @@ void find_foreign_nodes(global_context_t* ctx, datapoint_info_t* dp, datapoint_i ctx -> n_halo_points_recv = n_heap_to_recv; ctx -> n_halo_points_send = n_heap_to_send; - ctx -> idx_halo_points_send = (idx_t**)malloc(ctx -> world_size * sizeof(idx_t*)); + ctx -> idx_halo_points_send = (idx_t**)MY_MALLOC(ctx -> world_size * sizeof(idx_t*)); for(int i = 0; i < ctx -> world_size; ++i) ctx -> idx_halo_points_send[i] = NULL; for(int i = 0; i < ctx -> world_size; ++i) { - ctx -> idx_halo_points_send[i] = (idx_t*)malloc( n_heap_to_send[i] * sizeof(idx_t)); + ctx -> idx_halo_points_send[i] = (idx_t*)MY_MALLOC( n_heap_to_send[i] * sizeof(idx_t)); memcpy(ctx -> idx_halo_points_send[i], idx_buffer_to_send + sdispls[i], n_heap_to_send[i] * sizeof(idx_t) ); } @@ -2552,8 +2554,8 @@ float_t id_estimate(global_context_t* ctx, datapoint_info_t* dp_info, idx_t n, f } //float_t fraction = 0.7; - float_t* r = (float_t*)malloc(n*sizeof(float_t)); - float_t* Pemp = (float_t*)malloc(n*sizeof(float_t)); + float_t* r = (float_t*)MY_MALLOC(n*sizeof(float_t)); + float_t* Pemp = (float_t*)MY_MALLOC(n*sizeof(float_t)); for(idx_t i = 0; i < n; ++i) { @@ -2667,6 +2669,31 @@ datapoint_info_t find_possibly_halo_datapoint_rma(global_context_t* ctx, idx_t i datapoint_info_t tmp_dp; MPI_Request request; MPI_Status status; + + MPI_Rget(&tmp_dp, sizeof(datapoint_info_t), MPI_BYTE, owner, + i * sizeof(datapoint_info_t), sizeof(datapoint_info_t), MPI_BYTE, win_datapoints, &request); + MPI_Wait(&request, MPI_STATUS_IGNORE); + + return tmp_dp; + } +} + +datapoint_info_t find_possibly_halo_datapoint_rma_db(global_context_t* ctx, idx_t idx, MPI_Win win_datapoints) +{ + int owner = foreign_owner(ctx, idx); + /* find if datapoint is halo or not */ + if(owner == ctx -> mpi_rank) + { + idx_t i = idx - ctx -> idx_start; + 
return ctx -> local_datapoints[i]; + } + else + { + idx_t i = idx - ctx -> rank_idx_start[owner]; + datapoint_info_t tmp_dp; + MPI_Request request; + MPI_Status status; + MPI_Rget(&tmp_dp, sizeof(datapoint_info_t), MPI_BYTE, owner, i * sizeof(datapoint_info_t), sizeof(datapoint_info_t), MPI_BYTE, win_datapoints, &request); MPI_Wait(&request, MPI_STATUS_IGNORE); @@ -2763,9 +2790,9 @@ void compute_density_kstarnn(global_context_t* ctx, const float_t d, int verbose #if defined(WRITE_DENSITY) /* densities */ - float_t* den = (float_t*)malloc(ctx -> local_n_points * sizeof(float_t)); - float_t* gs = (float_t*)malloc(ctx -> local_n_points * sizeof(float_t)); - idx_t* ks = (idx_t*)malloc(ctx -> local_n_points * sizeof(idx_t)); + float_t* den = (float_t*)MY_MALLOC(ctx -> local_n_points * sizeof(float_t)); + float_t* gs = (float_t*)MY_MALLOC(ctx -> local_n_points * sizeof(float_t)); + idx_t* ks = (idx_t*)MY_MALLOC(ctx -> local_n_points * sizeof(idx_t)); for(int i = 0; i < ctx -> local_n_points; ++i) den[i] = ctx -> local_datapoints[i].log_rho; for(int i = 0; i < ctx -> local_n_points; ++i) ks[i] = ctx -> local_datapoints[i].kstar; @@ -2807,6 +2834,27 @@ float_t get_j_ksel_dist(global_context_t* ctx, idx_t j, idx_t ksel, MPI_Win expo } } +idx_t get_j_ksel_idx(global_context_t* ctx, idx_t j, idx_t ksel, MPI_Win exposed_ngbh) +{ + int owner = foreign_owner(ctx, j); + idx_t k = ctx -> k; + /* find if datapoint is halo or not */ + if(owner == ctx -> mpi_rank) + { + idx_t pos = j - ctx -> idx_start; + return ctx -> local_datapoints[pos].ngbh.data[ksel].array_idx; + } + else + { + heap_node el; + idx_t pos = j - ctx -> rank_idx_start[owner]; + MPI_Request request; + int err = MPI_Rget(&el, sizeof(heap_node), MPI_BYTE, owner, (MPI_Aint)((pos * k + ksel) * sizeof(heap_node)), sizeof(heap_node), MPI_BYTE, exposed_ngbh, &request); + MPI_Wait(&request,MPI_STATUS_IGNORE); + return el.array_idx; + } +} + void compute_density_kstarnn_rma(global_context_t* ctx, const float_t d, int verbose){ /* @@ -2852,30 +2900,17 @@ void compute_density_kstarnn_rma(global_context_t* ctx, const float_t d, int ver MPI_Win_fence(0, exposed_ngbh); MPI_Win_lock_all(0, exposed_ngbh); - //1 642146 87 - //idx_t j = 642146; - //idx_t ksel = 87; - //int owner = foreign_owner(ctx, j); - //heap_node el; - //idx_t pos = j - ctx -> rank_idx_start[owner]; - //MPI_Status status; - //MPI_Win_fence(0, exposed_ngbh); - - ////if(owner != ctx -> mpi_rank) - //for(owner = 0; owner < 2; ++owner) - //{ - // HERE - // MPI_Request request; - // DB_PRINT("-------%p %d rr j %lu k %lu pos %lu s %lu %lu \n", &el, ctx -> mpi_rank, j, ksel, pos, sizeof(heap_node), (pos * k + ksel) * sizeof(heap_node)); - // int err = MPI_Rget(&el, sizeof(heap_node), MPI_BYTE, owner, (pos * k + ksel) * sizeof(heap_node), sizeof(heap_node), MPI_BYTE, exposed_ngbh, &request); - // MPI_Wait(&request, MPI_STATUS_IGNORE); - // print_error_code(err); - // DB_PRINT("nnn %lu %lf\n", el.array_idx, el.value); - //} - MPI_Barrier(ctx -> mpi_communicator); - //#pragma omp parallel for + #pragma omp parallel for + for(idx_t i = 0; i < ctx -> local_n_points; ++i) + { + for(idx_t k = 0; k <= kMAX; ++k) + { + local_datapoints[i].ngbh.data[k].value = omega * pow(local_datapoints[i].ngbh.data[k].value, d/2.); + } + } + for(idx_t i = 0; i < ctx -> local_n_points; ++i) { @@ -2888,7 +2923,8 @@ void compute_density_kstarnn_rma(global_context_t* ctx, const float_t d, int ver while(j < kMAX && dL < DTHR) { idx_t ksel = j - 1; - vvi = omega * pow(local_datapoints[i].ngbh.data[ksel].value,d/2.); + //vvi 
= omega * pow(local_datapoints[i].ngbh.data[ksel].value,d/2.); + vvi = local_datapoints[i].ngbh.data[ksel].value; idx_t jj = local_datapoints[i].ngbh.data[j].array_idx; @@ -2896,8 +2932,9 @@ void compute_density_kstarnn_rma(global_context_t* ctx, const float_t d, int ver * note jj can be an halo point * need to search maybe for it in foreign nodes * */ - float_t dist_jj = get_j_ksel_dist(ctx, jj, ksel, exposed_ngbh); - vvj = omega * pow(dist_jj,d/2.); + //float_t dist_jj = get_j_ksel_dist(ctx, jj, ksel, exposed_ngbh); + //vvj = omega * pow(dist_jj,d/2.); + vvj = get_j_ksel_dist(ctx, jj, ksel, exposed_ngbh); vp = (vvi + vvj)*(vvi + vvj); dL = -2.0 * ksel * log(4.*vvi*vvj/vp); @@ -2906,7 +2943,8 @@ void compute_density_kstarnn_rma(global_context_t* ctx, const float_t d, int ver if(j == kMAX) { k = j - 1; - vvi = omega * pow(ctx -> local_datapoints[i].ngbh.data[k].value,d/2.); + //vvi = omega * pow(ctx -> local_datapoints[i].ngbh.data[k].value,d/2.); + vvi = ctx -> local_datapoints[i].ngbh.data[k].value; } else { @@ -2933,9 +2971,9 @@ void compute_density_kstarnn_rma(global_context_t* ctx, const float_t d, int ver #if defined(WRITE_DENSITY) /* densities */ - float_t* den = (float_t*)malloc(ctx -> local_n_points * sizeof(float_t)); - float_t* gs = (float_t*)malloc(ctx -> local_n_points * sizeof(float_t)); - idx_t* ks = (idx_t*)malloc(ctx -> local_n_points * sizeof(idx_t)); + float_t* den = (float_t*)MY_MALLOC(ctx -> local_n_points * sizeof(float_t)); + float_t* gs = (float_t*)MY_MALLOC(ctx -> local_n_points * sizeof(float_t)); + idx_t* ks = (idx_t*)MY_MALLOC(ctx -> local_n_points * sizeof(idx_t)); for(int i = 0; i < ctx -> local_n_points; ++i) den[i] = ctx -> local_datapoints[i].log_rho; for(int i = 0; i < ctx -> local_n_points; ++i) ks[i] = ctx -> local_datapoints[i].kstar; @@ -2954,35 +2992,6 @@ void compute_density_kstarnn_rma(global_context_t* ctx, const float_t d, int ver } -float_t get_j_ksel_dist_v2(global_context_t* ctx, idx_t i, idx_t j, idx_t ksel, int* flags, heap_node* tmp_heap_nodes, MPI_Win* exposed_ngbh) -{ - if(flags[i]) - { - int owner = foreign_owner(ctx, j); - idx_t k = ctx -> k; - /* find if datapoint is halo or not */ - if(owner == ctx -> mpi_rank) - { - idx_t pos = j - ctx -> idx_start; - return ctx -> local_datapoints[pos].ngbh.data[ksel].value; - } - else - { - //RMA - flags[i] = 0; - idx_t pos = j - ctx -> rank_idx_start[owner]; - MPI_Get(tmp_heap_nodes + i, sizeof(heap_node), MPI_BYTE, owner, (MPI_Aint)((pos * k + ksel) * sizeof(heap_node)), sizeof(heap_node), MPI_BYTE, *exposed_ngbh); - //if(ctx -> mpi_rank == 0) DB_PRINT("rvcd %lu %lf\n", el.array_idx, el.value); - return 0; - } - } - else - { - flags[i] = 1; - return tmp_heap_nodes[i].value; - } -} - void compute_density_kstarnn_rma_v2(global_context_t* ctx, const float_t d, int verbose){ /* @@ -2994,22 +3003,11 @@ void compute_density_kstarnn_rma_v2(global_context_t* ctx, const float_t d, int */ - MPI_Win exposed_ngbh; MPI_Info info; MPI_Barrier(ctx -> mpi_communicator); idx_t k = ctx -> local_datapoints[0].ngbh.N; - MPI_DB_PRINT("%lu %p\n", k, ctx -> __local_heap_buffers); - - - MPI_Win_create( ctx -> __local_heap_buffers, - ctx -> local_n_points * k * sizeof(heap_node), - 1, MPI_INFO_NULL, - ctx -> mpi_communicator, - &exposed_ngbh); - - MPI_Win_fence(0, exposed_ngbh); struct timespec start_tot, finish_tot; double elapsed_tot; @@ -3029,47 +3027,49 @@ void compute_density_kstarnn_rma_v2(global_context_t* ctx, const float_t d, int else{omega = pow(M_PI,d/2.)/tgamma(d/2.0 + 1.0);} - /* - * Iterative, wait after 
each pass for communications to finish - * - * */ + MPI_Win exposed_ngbh; + MPI_Win_create( ctx -> __local_heap_buffers, + ctx -> local_n_points * k * sizeof(heap_node), + 1, MPI_INFO_NULL, + ctx -> mpi_communicator, + &exposed_ngbh); - int finished = 0; - int fin_num = 0; + MPI_Win_fence(MPI_MODE_NOPUT, exposed_ngbh); - int* flags = (int*)malloc(ctx -> local_n_points * sizeof(int)); - int* last_j = (int*)malloc(ctx -> local_n_points * sizeof(int)); - int* completed = (int*)malloc(ctx -> local_n_points * sizeof(int)); - heap_node* tmp_heap_nodes = (heap_node*)malloc(ctx -> local_n_points * sizeof(heap_node)); + MPI_Barrier(ctx -> mpi_communicator); - for(int i = 0; i < ctx -> local_n_points; ++i) + #pragma omp parallel for + for(idx_t i = 0; i < ctx -> local_n_points; ++i) { - flags[i] = 1; - last_j[i] = 4; - completed[i] = 0; + for(idx_t k = 0; k <= kMAX; ++k) + { + local_datapoints[i].ngbh.data[k].value = omega * pow(local_datapoints[i].ngbh.data[k].value, d/2.); + } + //initialize kstar at 0 + local_datapoints[i].kstar = 0; } + int i_have_finished = 0; + int all_have_finished = 0; + heap_node* scratch_heap_nodes = (heap_node*)MY_MALLOC(ctx -> local_n_points * sizeof(heap_node)); - while(!finished) - { - finished = 1; - #pragma omp parallel for - for(idx_t i = 0; i < ctx -> local_n_points; ++i) + for(idx_t j = 4; j < kMAX - 1; ++j) { - if(!completed[i]) + i_have_finished = 1; + //request data + idx_t ksel = j - 1; + +#if defined(THREAD_FUNNELED) +#else + #pragma omp parallel for +#endif + + for(idx_t i = 0; i < ctx -> local_n_points; ++i) { - idx_t j = last_j[i]; - idx_t k; - float_t dL = 0.; - float_t vvi = 0.; - float_t vvj = 0.; - float_t vp = 0.; - int dl_DTHR_flag = 1; - while(j < kMAX && dl_DTHR_flag) + if(ctx -> local_datapoints[i].kstar == 0) { - idx_t ksel = j - 1; - vvi = omega * pow(local_datapoints[i].ngbh.data[ksel].value,d/2.); + //vvi = omega * pow(local_datapoints[i].ngbh.data[ksel].value,d/2.); idx_t jj = local_datapoints[i].ngbh.data[j].array_idx; @@ -3077,92 +3077,108 @@ void compute_density_kstarnn_rma_v2(global_context_t* ctx, const float_t d, int * note jj can be an halo point * need to search maybe for it in foreign nodes * */ - float_t dist_jj = get_j_ksel_dist_v2(ctx, i, jj, ksel, flags, tmp_heap_nodes, &exposed_ngbh); - if(flags[i]) - { - //if the ngbh node is available compute it else - //you need to wait for comms - vvj = omega * pow(dist_jj,d/2.); - vp = (vvi + vvj)*(vvi + vvj); - dL = -2.0 * ksel * log(4.*vvi*vvj/vp); + int owner = foreign_owner(ctx, jj); + idx_t pos = jj - ctx -> rank_idx_start[owner]; - dl_DTHR_flag = dL < DTHR; - j = j + 1; - last_j[i] = j; + if(owner == ctx -> mpi_rank) + { + scratch_heap_nodes[i] = ctx -> local_datapoints[pos].ngbh.data[ksel]; } else { - break; + MPI_Get(scratch_heap_nodes + i, + sizeof(heap_node), + MPI_BYTE, + owner, + (MPI_Aint)((pos * (ctx -> k) + ksel) * sizeof(heap_node)), + sizeof(heap_node), + MPI_BYTE, + exposed_ngbh); } } - if(j == kMAX || !dl_DTHR_flag) + + } + + MPI_Win_fence(MPI_MODE_NOPUT,exposed_ngbh); + //process data +#if defined(THREAD_FUNNELED) +#else + #pragma omp parallel for +#endif + for(idx_t i = 0; i < ctx -> local_n_points; ++i) + { + if(ctx -> local_datapoints[i].kstar == 0) { + float_t vvi, vvj, vp, dL; + vvi = local_datapoints[i].ngbh.data[ksel].value; + vvj = scratch_heap_nodes[i].value; + vp = (vvi + vvj)*(vvi + vvj); + dL = -2.0 * ksel * log(4.*vvi*vvj/vp); - if(j == kMAX) + if(dL > DTHR) { - k = j - 1; - vvi = omega * pow(ctx -> local_datapoints[i].ngbh.data[k].value,d/2.); + idx_t k = j 
- 1; + local_datapoints[i].kstar = k; + local_datapoints[i].log_rho = log((float_t)(k)/vvi/((float_t)(ctx -> n_points))); + local_datapoints[i].log_rho_err = 1.0/sqrt((float_t)k); //(float_t)(-Q_rsqrt((float)k)); + local_datapoints[i].g = local_datapoints[i].log_rho - local_datapoints[i].log_rho_err; } else { - k = j - 2; + #pragma omp atomic write + i_have_finished = 0; } - local_datapoints[i].kstar = k; - local_datapoints[i].log_rho = log((float_t)(k)/vvi/((float_t)(ctx -> n_points))); - //dp_info[i].log_rho = log((float_t)(k)) - log(vvi) -log((float_t)(points)); - local_datapoints[i].log_rho_err = 1.0/sqrt((float_t)k); //(float_t)(-Q_rsqrt((float)k)); - local_datapoints[i].g = local_datapoints[i].log_rho - local_datapoints[i].log_rho_err; - - completed[i] = 1; - #pragma omp atomic update - finished = finished & 1; - - #pragma omp atomic update - fin_num++; - } - else - { - completed[i] = 0; - #pragma omp atomic update - finished = finished & 0; } + } + + MPI_Allreduce(&i_have_finished, &all_have_finished, 1, MPI_INT, MPI_LAND, ctx -> mpi_communicator); + + if(all_have_finished) break; } - //DB_PRINT("Rank %d fin %d/%lu\n", ctx -> mpi_rank, fin_num, ctx -> local_n_points); - //call the fence to get out all results - MPI_Win_fence(0, exposed_ngbh); - MPI_Allreduce(MPI_IN_PLACE, &finished, 1, MPI_INT, MPI_LAND, ctx -> mpi_communicator); - } - if(verbose) - { - clock_gettime(CLOCK_MONOTONIC, &finish_tot); - elapsed_tot = (finish_tot.tv_sec - start_tot.tv_sec); - elapsed_tot += (finish_tot.tv_nsec - start_tot.tv_nsec) / 1000000000.0; - printf("\tTotal time: %.3lfs\n\n", elapsed_tot); - } +#if defined(THREAD_FUNNELED) +#else + #pragma omp parallel for +#endif + for(idx_t i = 0; i < ctx -> local_n_points; ++i) + { + if(ctx -> local_datapoints[i].kstar == 0) + { + idx_t k = kMAX - 1; + + float_t vvi = ctx -> local_datapoints[i].ngbh.data[k].value; + + local_datapoints[i].kstar = k; + local_datapoints[i].log_rho = log((float_t)(k)/vvi/((float_t)(ctx -> n_points))); + local_datapoints[i].log_rho_err = 1.0/sqrt((float_t)k); //(float_t)(-Q_rsqrt((float)k)); + local_datapoints[i].g = local_datapoints[i].log_rho - local_datapoints[i].log_rho_err; + } + } + - free(flags); - free(tmp_heap_nodes); - free(completed); - free(last_j); - MPI_Win_fence(0, exposed_ngbh); - MPI_Barrier(ctx -> mpi_communicator); MPI_Win_free(&exposed_ngbh); + free(scratch_heap_nodes); + #if defined(WRITE_DENSITY) /* densities */ - float_t* den = (float_t*)malloc(ctx -> local_n_points * sizeof(float_t)); - idx_t* ks = (idx_t*)malloc(ctx -> local_n_points * sizeof(idx_t)); + float_t* den = (float_t*)MY_MALLOC(ctx -> local_n_points * sizeof(float_t)); + float_t* gs = (float_t*)MY_MALLOC(ctx -> local_n_points * sizeof(float_t)); + idx_t* ks = (idx_t*)MY_MALLOC(ctx -> local_n_points * sizeof(idx_t)); + for(int i = 0; i < ctx -> local_n_points; ++i) den[i] = ctx -> local_datapoints[i].log_rho; for(int i = 0; i < ctx -> local_n_points; ++i) ks[i] = ctx -> local_datapoints[i].kstar; + for(int i = 0; i < ctx -> local_n_points; ++i) gs[i] = ctx -> local_datapoints[i].g; ordered_buffer_to_file(ctx, den, sizeof(float_t), ctx -> local_n_points, "bb/ordered_density.npy"); ordered_buffer_to_file(ctx, ks, sizeof(idx_t), ctx -> local_n_points, "bb/ks.npy"); + ordered_buffer_to_file(ctx, gs, sizeof(float_t), ctx -> local_n_points, "bb/g.npy"); + ordered_data_to_file(ctx); free(den); free(ks); @@ -3172,6 +3188,35 @@ void compute_density_kstarnn_rma_v2(global_context_t* ctx, const float_t d, int } +float_t get_j_ksel_dist_v2(global_context_t* ctx, 
idx_t i, idx_t j, idx_t ksel, int* flags, heap_node* tmp_heap_nodes, MPI_Win* exposed_ngbh) +{ + if(flags[i]) + { + int owner = foreign_owner(ctx, j); + idx_t k = ctx -> k; + /* find if datapoint is halo or not */ + if(owner == ctx -> mpi_rank) + { + idx_t pos = j - ctx -> idx_start; + return ctx -> local_datapoints[pos].ngbh.data[ksel].value; + } + else + { + //RMA + flags[i] = 0; + idx_t pos = j - ctx -> rank_idx_start[owner]; + MPI_Get(tmp_heap_nodes + i, sizeof(heap_node), MPI_BYTE, owner, (MPI_Aint)((pos * k + ksel) * sizeof(heap_node)), sizeof(heap_node), MPI_BYTE, *exposed_ngbh); + //if(ctx -> mpi_rank == 0) DB_PRINT("rvcd %lu %lf\n", el.array_idx, el.value); + return 0; + } + } + else + { + flags[i] = 1; + return tmp_heap_nodes[i].value; + } +} + void clusters_allocate(clusters_t * c, int s) { /* @@ -3184,17 +3229,18 @@ void clusters_allocate(clusters_t * c, int s) } idx_t nclus = c -> centers.count; + if(s) { //printf("Using sparse implementation\n"); c -> use_sparse_borders = 1; - c -> sparse_borders = (adj_list_t*)malloc(nclus*sizeof(adj_list_t)); + c -> sparse_borders = (adj_list_t*)MY_MALLOC(nclus*sizeof(adj_list_t)); for(idx_t i = 0; i < nclus; ++i) { c -> sparse_borders[i].count = 0; c -> sparse_borders[i].size = PREALLOC_BORDERS; - c -> sparse_borders[i].data = (sparse_border_t*)malloc(PREALLOC_BORDERS*sizeof(sparse_border_t)); + c -> sparse_borders[i].data = (sparse_border_t*)MY_MALLOC(PREALLOC_BORDERS*sizeof(sparse_border_t)); } } @@ -3202,8 +3248,8 @@ void clusters_allocate(clusters_t * c, int s) { //printf("Using dense implementation\n"); c -> use_sparse_borders = 0; - c -> __borders_data = (border_t*)malloc(nclus*nclus*sizeof(border_t)); - c -> borders = (border_t**)malloc(nclus*sizeof(border_t*)); + c -> __borders_data = (border_t*)MY_MALLOC(nclus*nclus*sizeof(border_t)); + c -> borders = (border_t**)MY_MALLOC(nclus*sizeof(border_t*)); #pragma omp parallel for @@ -3430,7 +3476,7 @@ clusters_t Heuristic1_rma(global_context_t *ctx, int verbose) lu_dynamic_array_allocate(&actual_centers); lu_dynamic_array_allocate(&max_rho); - datapoint_info_t** dp_info_ptrs = (datapoint_info_t**)malloc(n*sizeof(datapoint_info_t*)); + datapoint_info_t** dp_info_ptrs = (datapoint_info_t**)MY_MALLOC(n*sizeof(datapoint_info_t*)); struct timespec start, finish; double elapsed; @@ -3449,6 +3495,10 @@ clusters_t Heuristic1_rma(global_context_t *ctx, int verbose) MPI_Win_fence(0, win_datapoints); MPI_Win_lock_all(0, win_datapoints); +#if defined(THREAD_FUNNELED) +#else + #pragma omp parallel for +#endif for(idx_t i = 0; i < n; ++i) { @@ -3499,7 +3549,7 @@ clusters_t Heuristic1_rma(global_context_t *ctx, int verbose) * ends, center, removed centers, and max_rho arrays are populated */ - heap_node* to_remove_mask = (heap_node*)malloc(n*sizeof(heap_node)); + heap_node* to_remove_mask = (heap_node*)MY_MALLOC(n*sizeof(heap_node)); for(idx_t p = 0; p < n; ++p) { to_remove_mask[p].array_idx = MY_SIZE_MAX; @@ -3530,7 +3580,10 @@ clusters_t Heuristic1_rma(global_context_t *ctx, int verbose) * then close */ - //#pragma omp parallel for +#if defined(THREAD_FUNNELED) +#else + #pragma omp parallel for +#endif for(idx_t p = 0; p < n; ++p) { datapoint_info_t i_point = *(dp_info_ptrs[p]); @@ -3642,9 +3695,9 @@ clusters_t Heuristic1_rma(global_context_t *ctx, int verbose) * then re-scatter them around to get unique cluster labels */ - center_t* private_centers_buffer = (center_t*)malloc(actual_centers.count * sizeof(center_t)); + center_t* private_centers_buffer = (center_t*)MY_MALLOC(actual_centers.count * 
sizeof(center_t)); - center_t* global_centers_buffer = (center_t*)malloc(tot_centers * sizeof(center_t)); + center_t* global_centers_buffer = (center_t*)MY_MALLOC(tot_centers * sizeof(center_t)); for(int i = 0; i < actual_centers.count; ++i) { @@ -3657,8 +3710,8 @@ clusters_t Heuristic1_rma(global_context_t *ctx, int verbose) MPI_Type_contiguous(sizeof(center_t), MPI_BYTE, &mpi_center_type); MPI_Type_commit(&mpi_center_type); - int* center_counts = (int*)malloc(ctx -> world_size * sizeof(int)); - int* center_displs = (int*)malloc(ctx -> world_size * sizeof(int)); + int* center_counts = (int*)MY_MALLOC(ctx -> world_size * sizeof(int)); + int* center_displs = (int*)MY_MALLOC(ctx -> world_size * sizeof(int)); int cc = (int)actual_centers.count; MPI_Allgather(&cc, 1, MPI_INT, center_counts, 1, MPI_INT, ctx -> mpi_communicator); @@ -3674,6 +3727,10 @@ clusters_t Heuristic1_rma(global_context_t *ctx, int verbose) } + idx_t* all_center_idx = (idx_t*)MY_MALLOC(tot_centers * sizeof(idx_t)); + + MPI_Allgatherv(actual_centers.data, actual_centers.count, MPI_UINT64_T, all_center_idx, center_counts, center_displs, MPI_UINT64_T, ctx -> mpi_communicator); + free(center_counts); free(center_displs); @@ -3793,7 +3850,7 @@ clusters_t Heuristic1_rma(global_context_t *ctx, int verbose) #if defined(WRITE_CLUSTER_ASSIGN_H1) /* densities */ - int* ks = (int*)malloc(ctx -> local_n_points * sizeof(int)); + int* ks = (int*)MY_MALLOC(ctx -> local_n_points * sizeof(int)); for(int i = 0; i < ctx -> local_n_points; ++i) ks[i] = ctx -> local_datapoints[i].cluster_idx; ordered_buffer_to_file(ctx, ks, sizeof(int), ctx -> local_n_points, "bb/cl.npy"); @@ -3802,10 +3859,17 @@ clusters_t Heuristic1_rma(global_context_t *ctx, int verbose) #endif + free(actual_centers.data); + actual_centers.size = tot_centers; + actual_centers.count = tot_centers; + actual_centers.data = all_center_idx; + + clusters_t c_all; c_all.centers = actual_centers; + if(verbose) { clock_gettime(CLOCK_MONOTONIC, &finish); @@ -3861,7 +3925,7 @@ clusters_t Heuristic1(global_context_t *ctx, int verbose) lu_dynamic_array_allocate(&actual_centers); lu_dynamic_array_allocate(&max_rho); - datapoint_info_t** dp_info_ptrs = (datapoint_info_t**)malloc(n*sizeof(datapoint_info_t*)); + datapoint_info_t** dp_info_ptrs = (datapoint_info_t**)MY_MALLOC(n*sizeof(datapoint_info_t*)); struct timespec start, finish; double elapsed; @@ -3890,10 +3954,10 @@ clusters_t Heuristic1(global_context_t *ctx, int verbose) tot_count_recv += ctx -> n_halo_points_recv[i]; } - float_t* gs_send = (float_t*)malloc(tot_count_send * sizeof(float_t)); - float_t* gs_recv = (float_t*)malloc(tot_count_recv * sizeof(float_t)); - idx_t* ks_send = (idx_t*)malloc(tot_count_send * sizeof(idx_t)); - idx_t* ks_recv = (idx_t*)malloc(tot_count_recv * sizeof(idx_t)); + float_t* gs_send = (float_t*)MY_MALLOC(tot_count_send * sizeof(float_t)); + float_t* gs_recv = (float_t*)MY_MALLOC(tot_count_recv * sizeof(float_t)); + idx_t* ks_send = (idx_t*)MY_MALLOC(tot_count_send * sizeof(idx_t)); + idx_t* ks_recv = (idx_t*)MY_MALLOC(tot_count_recv * sizeof(idx_t)); /* pack it in gsend */ for(int i = 0; i < ctx -> world_size; ++i) @@ -3986,7 +4050,7 @@ clusters_t Heuristic1(global_context_t *ctx, int verbose) * ends, center, removed centers, and max_rho arrays are populated */ - idx_t* to_remove_mask = (idx_t*)malloc(n*sizeof(idx_t)); + idx_t* to_remove_mask = (idx_t*)MY_MALLOC(n*sizeof(idx_t)); for(idx_t p = 0; p < n; ++p) {to_remove_mask[p] = MY_SIZE_MAX;} qsort(dp_info_ptrs, n, sizeof(datapoint_info_t*), cmpPP); 
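/*
 * A minimal, self-contained sketch of the count/displacement pattern that the
 * Heuristic1_rma hunk above uses to gather every rank's cluster centers:
 * MPI_Allgather of the per-rank count, an exclusive prefix sum into
 * displacements, then MPI_Allgatherv of the center indices so every rank holds
 * the full, identically ordered list. Names such as `local_centers` are
 * illustrative, not taken from the repository.
 */
#include <mpi.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

int main(int argc, char** argv)
{
    MPI_Init(&argc, &argv);
    int rank, world_size;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &world_size);

    /* pretend each rank found (rank + 1) centers */
    int cc = rank + 1;
    uint64_t* local_centers = (uint64_t*)malloc(cc * sizeof(uint64_t));
    for(int i = 0; i < cc; ++i) local_centers[i] = (uint64_t)(rank * 100 + i);

    int* center_counts = (int*)malloc(world_size * sizeof(int));
    int* center_displs = (int*)malloc(world_size * sizeof(int));
    MPI_Allgather(&cc, 1, MPI_INT, center_counts, 1, MPI_INT, MPI_COMM_WORLD);

    int tot_centers = 0;
    for(int i = 0; i < world_size; ++i)
    {
        center_displs[i] = tot_centers;   /* exclusive prefix sum of counts */
        tot_centers += center_counts[i];
    }

    uint64_t* all_center_idx = (uint64_t*)malloc(tot_centers * sizeof(uint64_t));
    MPI_Allgatherv(local_centers, cc, MPI_UINT64_T,
                   all_center_idx, center_counts, center_displs, MPI_UINT64_T,
                   MPI_COMM_WORLD);

    if(rank == 0) printf("gathered %d centers in total\n", tot_centers);

    free(local_centers); free(center_counts); free(center_displs); free(all_center_idx);
    MPI_Finalize();
    return 0;
}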
@@ -4151,9 +4215,9 @@ clusters_t Heuristic1(global_context_t *ctx, int verbose) * then re-scatter them around to get unique cluster labels */ - center_t* private_centers_buffer = (center_t*)malloc(actual_centers.count * sizeof(center_t)); + center_t* private_centers_buffer = (center_t*)MY_MALLOC(actual_centers.count * sizeof(center_t)); - center_t* global_centers_buffer = (center_t*)malloc(tot_centers * sizeof(center_t)); + center_t* global_centers_buffer = (center_t*)MY_MALLOC(tot_centers * sizeof(center_t)); for(int i = 0; i < actual_centers.count; ++i) { @@ -4168,8 +4232,8 @@ clusters_t Heuristic1(global_context_t *ctx, int verbose) MPI_Type_contiguous(sizeof(center_t), MPI_BYTE, &mpi_center_type); MPI_Type_commit(&mpi_center_type); - int* center_counts = (int*)malloc(ctx -> world_size * sizeof(int)); - int* center_displs = (int*)malloc(ctx -> world_size * sizeof(int)); + int* center_counts = (int*)MY_MALLOC(ctx -> world_size * sizeof(int)); + int* center_displs = (int*)MY_MALLOC(ctx -> world_size * sizeof(int)); int cc = (int)actual_centers.count; MPI_Allgather(&cc, 1, MPI_INT, center_counts, 1, MPI_INT, ctx -> mpi_communicator); @@ -4185,13 +4249,9 @@ clusters_t Heuristic1(global_context_t *ctx, int verbose) } - //MPI_Gatherv(const void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, const int *recvcounts, const int *displs, MPI_Datatype recvtype, int root, MPI_Comm comm) - // - /* MPI_Gatherv(private_centers_buffer, (int)actual_centers.count, mpi_center_type, global_centers_buffer, center_counts, center_displs , mpi_center_type, 0, ctx -> mpi_communicator); - HERE if(I_AM_MASTER) { @@ -4200,7 +4260,6 @@ clusters_t Heuristic1(global_context_t *ctx, int verbose) for(int i = 0; i < tot_centers; ++i) global_centers_buffer[i].cluster_idx = i; } - HERE MPI_Barrier(ctx -> mpi_communicator); MPI_Bcast(global_centers_buffer, tot_centers, mpi_center_type, 0, ctx -> mpi_communicator); @@ -4218,7 +4277,7 @@ clusters_t Heuristic1(global_context_t *ctx, int verbose) free(private_centers_buffer); free(global_centers_buffer); - */ + free(center_counts); free(center_displs); @@ -4236,8 +4295,8 @@ clusters_t Heuristic1(global_context_t *ctx, int verbose) int completed = 0; - int* cl_send = (int*)malloc(tot_count_send * sizeof(int)); - int* cl_recv = (int*)malloc(tot_count_recv * sizeof(int)); + int* cl_send = (int*)MY_MALLOC(tot_count_send * sizeof(int)); + int* cl_recv = (int*)MY_MALLOC(tot_count_recv * sizeof(int)); @@ -4286,34 +4345,6 @@ clusters_t Heuristic1(global_context_t *ctx, int verbose) while( k < max_k - 1 && cluster == -1) { - /* - ++k; - p_idx = p -> ngbh.data[k].array_idx; - datapoint_info_t p_retrieved = find_possibly_halo_datapoint(ctx, p_idx); - - int flag = (p_retrieved.g > p -> g ) - (p_retrieved.g < p -> g); - int bf = 0; - - switch (flag) - { - case 1: - cluster = p_retrieved.cluster_idx; - wait_for_comms = 1; - bf = 1; - break; - case 0: - cluster = p_retrieved.cluster_idx; - wait_for_comms = cluster == -1 ? 
0 : 1; - bf = 0; - break; - case -1: - break; - default: - break; - } - if(bf) break; - */ - ++k; p_idx = p -> ngbh.data[k].array_idx; @@ -4329,20 +4360,6 @@ clusters_t Heuristic1(global_context_t *ctx, int verbose) } } - /* - if(p -> array_idx == 858931) - { - for(int k = 0; k < max_k ; ++k) - { - p_idx = p -> ngbh.data[k].array_idx; - datapoint_info_t p_retrieved = find_possibly_halo_datapoint(ctx, p_idx); - DB_PRINT("%lu p %d %lf %lf %d %d\n", p_retrieved.array_idx, p_retrieved.cluster_idx, p_retrieved.g, - p -> ngbh.data[k].value, - p_retrieved.g < p -> g, - p_retrieved.g >= p -> g); - } - } - */ if(cluster == -1 && !wait_for_comms) { @@ -4418,7 +4435,7 @@ clusters_t Heuristic1(global_context_t *ctx, int verbose) #if defined(WRITE_CLUSTER_ASSIGN_H1) /* densities */ - int* ks = (int*)malloc(ctx -> local_n_points * sizeof(int)); + int* ks = (int*)MY_MALLOC(ctx -> local_n_points * sizeof(int)); for(int i = 0; i < ctx -> local_n_points; ++i) ks[i] = ctx -> local_datapoints[i].cluster_idx; ordered_buffer_to_file(ctx, ks, sizeof(int), ctx -> local_n_points, "bb/cl.npy"); @@ -4455,6 +4472,259 @@ clusters_t Heuristic1(global_context_t *ctx, int verbose) return c_all; } +void Heuristic2_rma(global_context_t* ctx, clusters_t* cluster) +{ + /* + * Port the current implementation to this using rma + * then perform the matrix reduction + * + * Each one computes its borders, then the borders are shared, and the matrix is + * reduced to a single huge matrix of borders + * + */ + + int verbose = 0; + + datapoint_info_t* dp_info = ctx -> local_datapoints; + + MPI_Win dp_info_win, ngbh_win; + MPI_Win_create(ctx -> local_datapoints, ctx -> local_n_points * sizeof(datapoint_info_t), 1, MPI_INFO_NULL, ctx -> mpi_communicator, &dp_info_win); + MPI_Win_create(ctx -> __local_heap_buffers, ctx -> local_n_points * ctx -> k * sizeof(heap_node), 1, MPI_INFO_NULL, ctx -> mpi_communicator, &ngbh_win); + + MPI_Win_fence(0, dp_info_win); + MPI_Win_fence(0, ngbh_win); + + MPI_Win_lock_all(0, dp_info_win); + MPI_Win_lock_all(0, ngbh_win); + MPI_Barrier(ctx -> mpi_communicator); + + + + #define borders cluster->borders + + struct timespec start_tot, finish_tot; + double elapsed_tot; + idx_t n = ctx -> local_n_points; + + if(verbose) + { + printf("H2: Finding border points\n"); + clock_gettime(CLOCK_MONOTONIC, &start_tot); + } + + idx_t nclus = cluster->centers.count; + idx_t max_k = dp_info[0].ngbh.N; + + for(idx_t i = 0; i < n; ++i) + { + idx_t pp = NOBORDER; + /*loop over n neighbors*/ + int c = dp_info[i].cluster_idx; + if(!dp_info[i].is_center) + { + for(idx_t k = 1; k < dp_info[i].kstar + 1; ++k) + { + /*index of the kth ngbh of n*/ + idx_t j = dp_info[i].ngbh.data[k].array_idx; + pp = NOBORDER; + /*Loop over kn neigbhours to find if n is the nearest*/ + /*if cluster of the particle in nbhg is c then check is neighborhood*/ + + //DB_PRINT("my rank %d l %lu %lu %d\n", ctx -> mpi_rank, k, j, foreign_owner(ctx, j)); + datapoint_info_t j_dp = find_possibly_halo_datapoint_rma(ctx, j, dp_info_win); + if(j_dp.cluster_idx != c) + { + pp = j; + break; + } + + } + } + + if(pp != NOBORDER) + { + datapoint_info_t pp_dp = find_possibly_halo_datapoint_rma(ctx, pp, dp_info_win); + for(idx_t k = 1; k < max_k; ++k) + { + //TODO: can optimize it can retrieve the whole ngbh in one shot + idx_t pp_ngbh_idx = get_j_ksel_idx(ctx, pp_dp.array_idx, k, ngbh_win); + if(pp_ngbh_idx == dp_info[i].array_idx) + { + break; + } + + datapoint_info_t pp_ngbh_dp = find_possibly_halo_datapoint_rma(ctx, pp_ngbh_idx, dp_info_win); +
if(pp_ngbh_dp.cluster_idx == c) + { + pp = NOBORDER; + break; + } + } + } + + /*if it is the maximum one add it to the cluster*/ + if(pp != NOBORDER) + { + datapoint_info_t j_dp = find_possibly_halo_datapoint_rma(ctx, pp, dp_info_win); + int ppc = j_dp.cluster_idx; + //insert one and symmetric one + sparse_border_t b = {.i = c, .j = ppc, .idx = ctx -> local_datapoints[i].array_idx, .density = ctx -> local_datapoints[i].g, .error = ctx -> local_datapoints[i].log_rho_err}; + sparse_border_insert(cluster, b); + ////get symmetric border + sparse_border_t bsym = {.i = ppc, .j = c, .idx = ctx -> local_datapoints[i].array_idx, .density = ctx -> local_datapoints[i].g, .error = ctx -> local_datapoints[i].log_rho_err}; + sparse_border_insert(cluster, bsym); + } + + + } + + + for(idx_t c = 0; c < nclus; ++c) + { + for(idx_t el = 0; el < cluster -> sparse_borders[c].count; ++el) + { + //fix border density, write log rho c + idx_t idx = cluster -> sparse_borders[c].data[el].idx; + datapoint_info_t idx_dp = find_possibly_halo_datapoint_rma(ctx, idx, dp_info_win); + cluster -> sparse_borders[c].data[el].density = idx_dp.log_rho_c; + } + } + + + if(verbose) + { + clock_gettime(CLOCK_MONOTONIC, &finish_tot); + elapsed_tot = (finish_tot.tv_sec - start_tot.tv_sec); + elapsed_tot += (finish_tot.tv_nsec - start_tot.tv_nsec) / 1000000000.0; + printf("\tTotal time: %.3lfs\n\n", elapsed_tot); + } + + MPI_Barrier(ctx -> mpi_communicator); + MPI_Win_unlock_all(ngbh_win); + MPI_Win_unlock_all(dp_info_win); + + + /* Border reduce */ + + idx_t num_border_el = 0; + for(idx_t i = 0; i < nclus; ++i) + { + num_border_el += cluster -> sparse_borders[i].count; + } + + MPI_DB_PRINT("Master found %lu border elements\n", num_border_el); + + int i_have_sent = 0; + int level = 1; + int ranks = ctx -> world_size; + + #define SEND 1 + #define RECV 0 + #define DO_NOTHING -1 + + MPI_Barrier(ctx -> mpi_communicator); + + while(ranks > 1) + { + int dp = ranks % 2; + ranks = ranks / 2 + dp; + int send_rcv = ctx -> mpi_rank >= ranks; + + if(dp && ctx -> mpi_rank == ranks - 1) send_rcv = DO_NOTHING; + + switch (send_rcv) + { + case SEND: + if(!i_have_sent) + { + idx_t num_border_el = 0; + for(idx_t i = 0; i < nclus; ++i) + { + num_border_el += cluster -> sparse_borders[i].count; + } + sparse_border_t* borders_to_send = (sparse_border_t*)MY_MALLOC(num_border_el * sizeof(sparse_border_t)); + + idx_t count = 0; + for(idx_t i = 0; i < nclus; ++i) + { + for(idx_t j = 0; j < cluster -> sparse_borders[i].count; ++j) + { + borders_to_send[count] = cluster -> sparse_borders[i].data[j]; + count++; + } + } + + int rank_to_send = ctx -> mpi_rank - ranks; + DB_PRINT("-- Rank %d sending to %d\n", ctx -> mpi_rank, rank_to_send); + + MPI_Send(&num_border_el, 1, MPI_UINT64_T, rank_to_send, rank_to_send, ctx -> mpi_communicator); + + MPI_Send(borders_to_send, num_border_el * sizeof(sparse_border_t), MPI_BYTE , rank_to_send, rank_to_send, ctx -> mpi_communicator); + + i_have_sent = 1; + free(borders_to_send); + } + break; + case RECV: + { + DB_PRINT("** Rank %d recieving\n", ctx -> mpi_rank); + idx_t num_borders_recv = 0; + MPI_Recv(&num_borders_recv, 1, MPI_UINT64_T, MPI_ANY_SOURCE, ctx -> mpi_rank, ctx -> mpi_communicator, MPI_STATUS_IGNORE); + + sparse_border_t* borders_to_recv = (sparse_border_t*)MY_MALLOC(num_borders_recv * sizeof(sparse_border_t)); + + MPI_Recv(borders_to_recv, num_borders_recv * sizeof(sparse_border_t), MPI_BYTE , MPI_ANY_SOURCE, ctx -> mpi_rank, ctx -> mpi_communicator, MPI_STATUS_IGNORE); + + for(int i = 0; i < num_borders_recv; 
++i) + { + sparse_border_insert(cluster, borders_to_recv[i]); + } + free(borders_to_recv); + } + break; + default: + DB_PRINT(".. Rank %d doing nothing\n", ctx -> mpi_rank); + break; + } + MPI_Barrier(ctx -> mpi_communicator); + MPI_DB_PRINT("-----------------\n"); + + } + + num_border_el = 0; + for(idx_t i = 0; i < nclus; ++i) + { + num_border_el += cluster -> sparse_borders[i].count; + } + + MPI_DB_PRINT("Master final %lu border elements\n", num_border_el); + + #undef SEND + #undef RECV + + #ifdef WRITE_BORDERS + if(I_AM_MASTER) + { + FILE* f = fopen("bb/borders.csv", "w"); + for(idx_t i = 0; i < nclus; ++i) + { + for(int j = 0; j < cluster -> sparse_borders[i].count; ++j) + { + sparse_border_t b = cluster -> sparse_borders[i].data[j]; + fprintf(f, "%lu,%lu,%lu,%lf\n", b.i, b.j, b.idx, b.density); + } + } + fclose(f); + } + + + #endif + + return; + #undef borders + + } + void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx) { float_t *data; @@ -4490,8 +4760,8 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx) // data = read_data_file(ctx,"../norm_data/std_g1212639_091_0001",MY_TRUE); ctx->dims = 5; - // ctx -> n_points = 5 * 2000; - ctx->n_points = ctx->n_points / ctx->dims; + ctx -> n_points = 5 * 100000; + //ctx->n_points = ctx->n_points / ctx->dims; //ctx->n_points = (ctx->n_points * 5) / 10; // ctx -> n_points = ctx -> world_size * 1000; @@ -4506,8 +4776,8 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx) MPI_Bcast(&(ctx->n_points), 1, MPI_UINT64_T, 0, ctx->mpi_communicator); /* compute the number of elements to recieve for each processor */ - int *send_counts = (int *)malloc(ctx->world_size * sizeof(int)); - int *displacements = (int *)malloc(ctx->world_size * sizeof(int)); + int *send_counts = (int *)MY_MALLOC(ctx->world_size * sizeof(int)); + int *displacements = (int *)MY_MALLOC(ctx->world_size * sizeof(int)); displacements[0] = 0; send_counts[0] = ctx->n_points / ctx->world_size; @@ -4525,7 +4795,7 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx) ctx->local_n_points = send_counts[ctx->mpi_rank] / ctx->dims; - float_t *pvt_data = (float_t *)malloc(send_counts[ctx->mpi_rank] * sizeof(float_t)); + float_t *pvt_data = (float_t *)MY_MALLOC(send_counts[ctx->mpi_rank] * sizeof(float_t)); MPI_Scatterv(data, send_counts, displacements, MPI_MY_FLOAT, pvt_data, send_counts[ctx->mpi_rank], MPI_MY_FLOAT, 0, ctx->mpi_communicator); @@ -4534,14 +4804,14 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx) int k_local = 20; int k_global = 20; - uint64_t *global_bin_counts_int = (uint64_t *)malloc(k_global * sizeof(uint64_t)); + uint64_t *global_bin_counts_int = (uint64_t *)MY_MALLOC(k_global * sizeof(uint64_t)); pointset_t original_ps; original_ps.data = ctx->local_data; original_ps.dims = ctx->dims; original_ps.n_points = ctx->local_n_points; - original_ps.lb_box = (float_t*)malloc(ctx -> dims * sizeof(float_t)); - original_ps.ub_box = (float_t*)malloc(ctx -> dims * sizeof(float_t)); + original_ps.lb_box = (float_t*)MY_MALLOC(ctx -> dims * sizeof(float_t)); + original_ps.ub_box = (float_t*)MY_MALLOC(ctx -> dims * sizeof(float_t)); float_t tol = 0.002; @@ -4564,7 +4834,7 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx) int k = 300; //int k = 30; - datapoint_info_t* dp_info = (datapoint_info_t*)malloc(ctx -> local_n_points * sizeof(datapoint_info_t)); + datapoint_info_t* dp_info = 
(datapoint_info_t*)MY_MALLOC(ctx -> local_n_points * sizeof(datapoint_info_t)); /* initialize, to cope with valgrind */ for(uint64_t i = 0; i < ctx -> local_n_points; ++i) { @@ -4607,7 +4877,7 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx) //TIME_START; - //datapoint_info_t** foreign_dp_info = (datapoint_info_t**)malloc(ctx -> world_size * sizeof(datapoint_info_t*)); + //datapoint_info_t** foreign_dp_info = (datapoint_info_t**)MY_MALLOC(ctx -> world_size * sizeof(datapoint_info_t*)); //find_foreign_nodes(ctx, dp_info, foreign_dp_info); //elapsed_time = TIME_STOP; //LOG_WRITE("Finding points to request the ngbh", elapsed_time) @@ -4615,17 +4885,23 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx) TIME_START; ctx -> local_datapoints = dp_info; - //compute_density_kstarnn_rma_v2(ctx, id, MY_FALSE); - compute_density_kstarnn_rma(ctx, id, MY_FALSE); + //compute_density_kstarnn_rma(ctx, id, MY_FALSE); + compute_density_kstarnn_rma_v2(ctx, id, MY_FALSE); //compute_density_kstarnn(ctx, id, MY_FALSE); compute_correction(ctx, 2); elapsed_time = TIME_STOP; LOG_WRITE("Density estimate", elapsed_time) TIME_START; - Heuristic1_rma(ctx, MY_FALSE); + clusters_t clusters = Heuristic1_rma(ctx, MY_FALSE); elapsed_time = TIME_STOP; LOG_WRITE("H1", elapsed_time) + + TIME_START; + clusters_allocate(&clusters, 1); + Heuristic2_rma(ctx, &clusters); + elapsed_time = TIME_STOP; + LOG_WRITE("H2", elapsed_time) /* find density */
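/*
 * A stripped-down sketch of the fence-synchronized batched-get pattern that
 * compute_density_kstarnn_rma_v2 and Heuristic2_rma rely on above: expose the
 * local data in an MPI window, open an epoch, queue one MPI_Get per remote
 * element needed in this pass, close the epoch with another MPI_Win_fence,
 * and only then read the fetched values. The payload here is a plain double
 * instead of heap_node, and names such as `remote_val` are illustrative only.
 */
#include <mpi.h>
#include <stdio.h>

int main(int argc, char** argv)
{
    MPI_Init(&argc, &argv);
    int rank, world_size;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &world_size);

    enum { N_LOCAL = 4 };
    double local_vals[N_LOCAL];
    for(int i = 0; i < N_LOCAL; ++i) local_vals[i] = rank * 10.0 + i;

    MPI_Win win;
    MPI_Win_create(local_vals, N_LOCAL * sizeof(double), sizeof(double),
                   MPI_INFO_NULL, MPI_COMM_WORLD, &win);

    /* open an access epoch; MPI_MODE_NOPUT mirrors the read-only use above */
    MPI_Win_fence(MPI_MODE_NOPUT, win);

    /* batch the gets: fetch element 0 of the next rank's window */
    double remote_val = 0.0;
    int target = (rank + 1) % world_size;
    MPI_Get(&remote_val, 1, MPI_DOUBLE, target,
            0 /* displacement, in units of the window's disp_unit */,
            1, MPI_DOUBLE, win);

    /* the closing fence completes all outstanding gets in the epoch */
    MPI_Win_fence(MPI_MODE_NOPUT, win);

    printf("rank %d read %.1f from rank %d\n", rank, remote_val, target);

    MPI_Win_free(&win);
    MPI_Finalize();
    return 0;
}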