diff --git a/.clangd b/.clangd new file mode 100644 index 0000000000000000000000000000000000000000..64bc6680fe6ad2faaeb7d38717a363b47cc6bf8e --- /dev/null +++ b/.clangd @@ -0,0 +1,2 @@ +CompileFlags: + Add: -I/usr/lib/x86_64-linux-gnu/openmpi/include diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000000000000000000000000000000000000..a2931121c074ec76fddab7a5c4610173f9547495 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +main +sync.sh diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/Makefile b/Makefile new file mode 100644 index 0000000000000000000000000000000000000000..1814ba911e8644467bb8f7d761021c4d41d00749 --- /dev/null +++ b/Makefile @@ -0,0 +1,10 @@ +CC=mpicc +CFLAGS=-O3 -march=native +LDFLAGS=-lm + +all: main + +obj=src/main/main.c src/tree/tree.c src/common/common.c + +main: ${obj} + ${CC} ${CFLAGS} ${LDFLAGS} ${obj} -o $@ diff --git a/README b/README new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/added b/added new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/src/common/common.c b/src/common/common.c new file mode 100644 index 0000000000000000000000000000000000000000..429b7a31cf65e32c5d2548b353795f8e19e18aaa --- /dev/null +++ b/src/common/common.c @@ -0,0 +1,92 @@ +#include "common.h" +#include "mpi.h" +#include <time.h> + +void get_context(global_context_t* ctx) +{ + MPI_Comm_size(ctx -> mpi_communicator, &(ctx -> world_size)); + MPI_Get_processor_name(ctx -> processor_mame, &(ctx -> __processor_name_len)); + MPI_Comm_rank(ctx -> mpi_communicator, &(ctx -> mpi_rank)); + ctx -> local_data = NULL; + ctx -> lb_box = NULL; + ctx -> ub_box = NULL; +} + +void free_context(global_context_t* ctx) +{ + if(ctx -> local_data) + { + free(ctx -> local_data); + ctx -> local_data = NULL; + } + + if(ctx -> ub_box) + { + free(ctx -> ub_box); + ctx -> ub_box = NULL; + } + + if(ctx -> lb_box) + { + free(ctx -> lb_box); + ctx -> lb_box = NULL; + } + +} + +void free_pointset(pointset_t* ps) +{ + if(ps -> data) + { + free(ps -> data); + ps -> data = NULL; + } + + if(ps -> ub_box) + { + free(ps -> ub_box); + ps -> ub_box = NULL; + } + + if(ps -> lb_box) + { + free(ps -> lb_box); + ps -> lb_box = NULL; + } +} + + +void mpi_printf(global_context_t* ctx, const char *fmt, ...) 
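+/* printf-style helper: emits output only on rank 0 of the communicator,
+   all other ranks return immediately */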
+{ + if(ctx -> mpi_rank == 0) + { + va_list l; + va_start(l, fmt); +// printf("[MASTER]: "); + vprintf(fmt, l); + // myflush(stdout); + va_end(l); + } +} + +void generate_random_matrix( + float_t** data, + int dimensions, + size_t nmin, + size_t nmax, + global_context_t* ctx) +{ + /* seed the random number generator */ + srand((unsigned)time(NULL) + ctx -> mpi_rank * ctx -> world_size + ctx -> __processor_name_len); + + size_t n = rand() % (nmax - nmin) + nmin; + float_t* local_data = (float_t*)malloc(dimensions*n*sizeof(float_t)); + for(size_t i = 0; i < dimensions*n; ++i) local_data[i] = (float_t)rand()/(float_t)RAND_MAX; + *data = local_data; + + ctx -> dims = dimensions; + ctx -> local_n_points = n; + + return; +} + diff --git a/src/common/common.h b/src/common/common.h new file mode 100644 index 0000000000000000000000000000000000000000..8baac026f9953f239776403dfa24802184721c0a --- /dev/null +++ b/src/common/common.h @@ -0,0 +1,84 @@ +#pragma once +#include <math.h> +#include <stdlib.h> +#include <stdio.h> +#include <mpi.h> +#include <stdint.h> + +#ifdef USE_FLOAT32 + #define float_t float +#else + #define float_t double +#endif + +#define MY_TRUE 1 +#define MY_FALSE 0 + +#define DB_PRINT(...) printf(__VA_ARGS__) +#ifdef NDEBUG + #undef DB_PRINT(...) + #define DB_PRINT(...) +#endif + +#define MPI_DB_PRINT(...) mpi_printf(ctx,__VA_ARGS__) +#ifdef NDEBUG + #undef MPI_DB_PRINT(...) + #define MPI_DB_PRINT(...) +#endif + +#define MPI_PRINT(...) mpi_printf(ctx,__VA_ARGS__) + +#define MAX(A,B) ((A) > (B) ? (A) : (B)) +#define MIN(A,B) ((A) < (B) ? (A) : (B)) + +/* + * from Spriengel code Gadget4 + */ + +#if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 202000L +/* C2x does not require the second parameter for va_start. */ +#define va_start(ap, ...) __builtin_va_start(ap, 0) +#else +/* Versions before C2x do require the second parameter. */ +#define va_start(ap, param) __builtin_va_start(ap, param) +#endif +#define va_end(ap) __builtin_va_end(ap) +#define va_arg(ap, type) __builtin_va_arg(ap, type) + +struct global_context_t +{ + size_t n_points; + MPI_Comm mpi_communicator; + int world_size; + char processor_mame[MPI_MAX_PROCESSOR_NAME]; + int __processor_name_len; + int mpi_rank; + size_t local_n_points; + uint32_t dims; + float_t* local_data; + float_t* lb_box; + float_t* ub_box; +}; + +struct pointset_t +{ + size_t n_points; + size_t __capacity; + uint32_t dims; + float_t* data; + float_t* lb_box; + float_t* ub_box; +}; + +typedef struct pointset_t pointset_t; +typedef struct global_context_t global_context_t; + +void mpi_printf(global_context_t*, const char *fmt, ...); +void get_context(global_context_t*); +void print_global_context(global_context_t* ); +void free_context(global_context_t* ); +void free_pointset(pointset_t* ); + +void generate_random_matrix(float_t** ,int ,size_t ,size_t ,global_context_t*); + + diff --git a/src/tree/tree.c b/src/tree/tree.c new file mode 100644 index 0000000000000000000000000000000000000000..92687b4b18770530e8e1596c60ff7a9ad5c2df83 --- /dev/null +++ b/src/tree/tree.c @@ -0,0 +1,1720 @@ +/* + * Implementation of a distributed memory kd-tree + * The idea is to have a top level domain decomposition with a shallow shared + * top level tree between computational nodes. 
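+ * (e.g. with 4 ranks the top tree is split twice and each of its 4 leaves
+ * is assigned to one rank, which keeps that leaf's points locally)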
+ * + * Then each domain has a different set of points to work on separately + * the top tree serves as a map to know later on in which processor ask for + * neighbors + */ +#include "tree.h" +#include "mpi.h" +#include <math.h> +#include <stdint.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +#ifdef USE_FLOAT32 +#define MPI_MY_FLOAT MPI_FLOAT +#else +#define MPI_MY_FLOAT MPI_DOUBLE +#endif + +#define I_AM_MASTER ctx->mpi_rank == 0 + +#define TOP_TREE_RCH 1 +#define TOP_TREE_LCH 0 + +float_t *read_data_file(global_context_t *ctx, const char *fname, + const int file_in_float32) { + + FILE *f = fopen(fname, "r"); + if (!f) { + printf("Nope\n"); + exit(1); + } + fseek(f, 0, SEEK_END); + size_t n = ftell(f); + rewind(f); + + int InputFloatSize = file_in_float32 ? 4 : 8; + + n = n / (InputFloatSize); + + float_t *data = (float_t *)malloc(n * sizeof(float_t)); + + if (file_in_float32) { + float *df = (float *)malloc(n * sizeof(float)); + size_t fff = fread(df, sizeof(float), n, f); + mpi_printf(ctx, "Read %luB\n", fff); + fclose(f); + + for (uint64_t i = 0; i < n; ++i) + data[i] = (float_t)(df[i]); + + free(df); + } else { + double *df = (double *)malloc(n * sizeof(double)); + size_t fff = fread(df, sizeof(double), n, f); + mpi_printf(ctx, "Read %luB\n", fff); + fclose(f); + + for (uint64_t i = 0; i < n; ++i) + data[i] = (float_t)(df[i]); + + free(df); + } + ctx->n_points = n; + return data; +} + +/* quickselect for an element along a dimension */ + +void swap_data_element(float_t *a, float_t *b, int vec_len) { + float_t tmp; + for (int i = 0; i < vec_len; ++i) { + tmp = a[i]; + a[i] = b[i]; + b[i] = tmp; + } +} + +int compare_data_element(float_t *a, float_t *b, int compare_dim) { + return -((a[compare_dim] - b[compare_dim]) > 0.) + + ((a[compare_dim] - b[compare_dim]) < 0.); +} + +int partition_data_element(float_t *array, int vec_len, int compare_dim, + int left, int right, int pivot_index) { + int store_index = left; + int i; + /* Move pivot to end */ + swap_data_element(array + pivot_index * vec_len, array + right * vec_len, + vec_len); + for (i = left; i < right; ++i) { + // if(compare_data_element(array + i*vec_len, array + pivot_index*vec_len, + // compare_dim ) >= 0){ + if (array[i * vec_len + compare_dim] < + array[right * vec_len + compare_dim]) { + swap_data_element(array + store_index * vec_len, array + i * vec_len, + vec_len); + store_index += 1; + } + } + /* Move pivot to its final place */ + swap_data_element(array + (store_index)*vec_len, array + right * vec_len, + vec_len); + + return store_index; +} + +int qselect_data_element(float_t *array, int vec_len, int compare_dim, int left, + int right, int n) { + int pivot_index; + if (left == right) { + return left; + } + pivot_index = left; // + (rand() % (right-left + 1)); /* random int left <= x + // <= right */ + pivot_index = partition_data_element(array, vec_len, compare_dim, left, right, + pivot_index); + /* The pivot is in its final sorted position */ + if (n == pivot_index) { + return pivot_index; + } else if (n < pivot_index) { + return qselect_data_element(array, vec_len, compare_dim, left, + pivot_index - 1, n); + } else { + return qselect_data_element(array, vec_len, compare_dim, pivot_index + 1, + right, n); + } +} + +int quickselect_data_element(float_t *array, int vec_len, int array_size, + int compare_dim, int k) { + return qselect_data_element(array, vec_len, compare_dim, 0, array_size - 1, + k - 1); +} + +int CMP_DIM; +int compare_data_element_sort(const void *a, const void *b) { + float_t aa = 
*((float_t *)a + CMP_DIM); + float_t bb = *((float_t *)b + CMP_DIM); + return ((aa - bb) > 0.) - ((aa - bb) < 0.); +} + +void compute_bounding_box(global_context_t *ctx) { + ctx->lb_box = (float_t *)malloc(ctx->dims * sizeof(float_t)); + ctx->ub_box = (float_t *)malloc(ctx->dims * sizeof(float_t)); + + for (size_t d = 0; d < ctx->dims; ++d) { + ctx->lb_box[d] = 99999999.; + ctx->ub_box[d] = -99999999.; + } + +#define local_data ctx->local_data +#define lb ctx->lb_box +#define ub ctx->ub_box + + /* compute minimum and maximum for each dimensions, store them in local bb */ + /* each processor on its own */ + for (size_t i = 0; i < ctx->local_n_points; ++i) { + for (size_t d = 0; d < ctx->dims; ++d) { + lb[d] = MIN(local_data[i * ctx->dims + d], lb[d]); + ub[d] = MAX(local_data[i * ctx->dims + d], ub[d]); + } + } + + /* Reduce to obtain bb */ + /* + MPI_Allreduce( const void *sendbuf, + void *recvbuf, + int count, + MPI_Datatype datatype, + MPI_Op op, + MPI_Comm comm) + */ + + /*get the bounding box */ + + MPI_Allreduce(MPI_IN_PLACE, lb, ctx->dims, MPI_MY_FLOAT, MPI_MIN, + ctx->mpi_communicator); + MPI_Allreduce(MPI_IN_PLACE, ub, ctx->dims, MPI_MY_FLOAT, MPI_MAX, + ctx->mpi_communicator); + + /* + DB_PRINT("[RANK %d]:", ctx -> mpi_rank); + for(size_t d = 0; d < ctx -> dims; ++d) + { + DB_PRINT("%lf ", ctx -> ub_box[d]); + } + DB_PRINT("\n"); + */ + + /* + MPI_DB_PRINT("[BOUNDING BOX]: "); + for(size_t d = 0; d < ctx -> dims; ++d) MPI_DB_PRINT("d%d:[%lf, %lf] ",(int)d, + lb[d], ub[d]); MPI_DB_PRINT("\n"); + */ + +#undef local_data +#undef lb +#undef ub +} + +/* i want a queue to enqueue the partitions to deal with */ +void enqueue_partition(partition_queue_t *queue, partition_t p) +{ + if (queue->count == queue->_capacity) { + queue->_capacity = queue->_capacity * 1.10; + queue->data = realloc(queue->data, queue->_capacity); + } + /* insert point */ + memmove(queue->data + 1, queue->data, queue->count * sizeof(partition_t)); + queue->data[0] = p; + queue->count++; +} + +partition_t dequeue_partition(partition_queue_t *queue) { + + return queue->data[--(queue->count)]; +} + +void compute_medians_and_check(global_context_t *ctx, float_t *data) { + float_t prop = 0.5; + int k = (int)(ctx->local_n_points * prop); + int d = 1; + + /*quick select on a particular dimension */ + CMP_DIM = d; + int kk = (k - 1) * ctx->dims; + + int count = 0; + // idx = idx - 1; + // + int aaa = quickselect_data_element(ctx->local_data, (int)(ctx->dims), + (int)(ctx->local_n_points), d, k); + /* + * sanity check + * check if the median found in each node is + * a median + */ + + float_t *medians_rcv = + (float_t *)malloc(ctx->dims * ctx->world_size * sizeof(float_t)); + + /* + * MPI_Allgather( const void *sendbuf, + * int sendcount, + * MPI_Datatype sendtype, + * void *recvbuf, + * int recvcount, + * MPI_Datatype recvtype, + * MPI_Comm comm) + */ + + /* Exchange medians */ + + MPI_Allgather(ctx->local_data + kk, ctx->dims, MPI_MY_FLOAT, medians_rcv, + ctx->dims, MPI_MY_FLOAT, ctx->mpi_communicator); + + /* sort medians on each node */ + + CMP_DIM = d; + qsort(medians_rcv, ctx->world_size, ctx->dims * sizeof(float_t), + compare_data_element_sort); + + /* + * Evaluate goodness of the median on master which has whole dataset + */ + + if (ctx->mpi_rank == 0) { + int count = 0; + int idx = (int)(prop * (ctx->world_size)); + // idx = idx - 1; + for (int i = 0; i < ctx->n_points; ++i) { + count += data[i * ctx->dims + d] <= medians_rcv[idx * ctx->dims + d]; + } + mpi_printf(ctx, + "Choosing %lf percentile on dimension %d: 
empirical prop %lf\n", + prop, d, (float_t)count / (float_t)(ctx->n_points)); + } + free(medians_rcv); +} + +void check_pc(global_context_t *ctx, float_t *best_guess, float_t *data, int d, + float_t prop) { + if (ctx->mpi_rank == 0) { + int count = 0; + int idx = (int)(prop * (ctx->world_size)); + // idx = idx - 1; + for (int i = 0; i < ctx->n_points; ++i) { + count += data[i * ctx->dims + d] <= best_guess[d]; + } + mpi_printf(ctx, + "Choosing %lf percentile on dimension %d: empirical prop %lf\n", + prop, d, (float_t)count / (float_t)(ctx->n_points)); + } +} + +void check_pc_pointset(global_context_t *ctx, pointset_t *ps, + float_t *best_guess, int d, float_t prop) { + float_t *tmp_data; + int *rcv_count; + int tmp_local_count = (int)ps->n_points; + int *displs; + int point_count = 0; + /* + * ONLY FOR TEST PURPOSES + * gather on master all data + * perform the count on master + */ + if (I_AM_MASTER) { + rcv_count = (int *)malloc(ctx->world_size * sizeof(int)); + displs = (int *)malloc(ctx->world_size * sizeof(int)); + MPI_Gather(&tmp_local_count, 1, MPI_INT, rcv_count, 1, MPI_INT, 0, + ctx->mpi_communicator); + + int tot_count = 0; + for (int p = 0; p < ctx->world_size; ++p) { + point_count += rcv_count[p]; + displs[p] = tot_count; + tot_count += rcv_count[p] * ps->dims; + rcv_count[p] = rcv_count[p] * ps->dims; + } + + tmp_data = (float_t *)malloc(tot_count * sizeof(float_t)); + MPI_Gatherv(ps->data, ps->n_points * ps->dims, MPI_MY_FLOAT, tmp_data, + rcv_count, displs, MPI_MY_FLOAT, 0, ctx->mpi_communicator); + } else { + MPI_Gather(&(ps->n_points), 1, MPI_INT, rcv_count, 1, MPI_INT, 0, + ctx->mpi_communicator); + MPI_Gatherv(ps->data, ps->n_points * ps->dims, MPI_MY_FLOAT, tmp_data, + rcv_count, displs, MPI_MY_FLOAT, 0, ctx->mpi_communicator); + } + if (I_AM_MASTER) { + MPI_DB_PRINT("[MASTER] Gathered %d points from procs\n", point_count); + int count = 0; + int idx = (int)(prop * (ctx->world_size)); + // idx = idx - 1; + for (int i = 0; i < point_count; ++i) { + count += tmp_data[i * ps->dims + d] <= best_guess[d]; + } + + mpi_printf(ctx, "[PS TEST]: "); + mpi_printf(ctx, + "Choosing %lf percentile on dimension %d: empirical prop %lf\n", + prop, d, (float_t)count / (float_t)(point_count)); + free(rcv_count); + free(tmp_data); + free(displs); + } +} + +float_t check_pc_pointset_parallel(global_context_t *ctx, pointset_t *ps, + float_t *best_guess, int d, float_t prop) { + /* + * ONLY FOR TEST PURPOSES + * gather on master all data + * perform the count on master + */ + int pvt_count = 0; + for (int i = 0; i < ps->n_points; ++i) { + pvt_count += ps->data[i * ps->dims + d] <= best_guess[d]; + } + + int pvt_n_and_tot[2] = {pvt_count, ps->n_points}; + int tot_count[2]; + MPI_Allreduce(pvt_n_and_tot, tot_count, 2, MPI_INT, MPI_SUM, + ctx->mpi_communicator); + + float_t ep = (float_t)tot_count[0] / (float_t)(tot_count[1]); + /* + mpi_printf(ctx,"[PS TEST PARALLEL]: "); + mpi_printf(ctx,"Condsidering %d points, searching for %lf percentile on + dimension %d: empirical measure %lf\n",tot_count[1],prop, d, ep); + */ + return ep; +} + +void compute_bounding_box_pointset(global_context_t *ctx, pointset_t *ps) { + ps->lb_box = (float_t *)malloc(ps->dims * sizeof(float_t)); + ps->ub_box = (float_t *)malloc(ps->dims * sizeof(float_t)); + + for (size_t d = 0; d < ps->dims; ++d) { + ps->lb_box[d] = 99999999.; + ps->ub_box[d] = -99999999.; + } + +#define local_data ps->data +#define lb ps->lb_box +#define ub ps->ub_box + + /* compute minimum and maximum for each dimensions, store them in local bb */ + 
/* each processor on its own */ + for (size_t i = 0; i < ps->n_points; ++i) { + for (size_t d = 0; d < ps->dims; ++d) { + lb[d] = MIN(local_data[i * ps->dims + d], lb[d]); + ub[d] = MAX(local_data[i * ps->dims + d], ub[d]); + } + } + + /* Reduce to obtain bb */ + /* + MPI_Allreduce( const void *sendbuf, + void *recvbuf, + int count, + MPI_Datatype datatype, + MPI_Op op, + MPI_Comm comm) + */ + + /*get the bounding box */ + + MPI_Allreduce(MPI_IN_PLACE, lb, ps->dims, MPI_MY_FLOAT, MPI_MIN, + ctx->mpi_communicator); + MPI_Allreduce(MPI_IN_PLACE, ub, ps->dims, MPI_MY_FLOAT, MPI_MAX, + ctx->mpi_communicator); + + /* + DB_PRINT("[RANK %d]:", ctx -> mpi_rank); + for(size_t d = 0; d < ctx -> dims; ++d) + { + DB_PRINT("%lf ", ctx -> ub_box[d]); + } + DB_PRINT("\n"); + */ + + /* + MPI_DB_PRINT("[PS BOUNDING BOX]: "); + for(size_t d = 0; d < ps -> dims; ++d) MPI_DB_PRINT("d%d:[%lf, %lf] ",(int)d, + lb[d], ub[d]); MPI_DB_PRINT("\n"); + */ + +#undef local_data +#undef lb +#undef ub +} + +void compute_adaptive_binning_pointset(global_context_t *ctx, pointset_t *ps, + int k_local, int k_global, int d, + float_t *bin_bounds, + float_t *global_bin_counts) { + float_t nn = (float_t)nn; + // int use_qselect = (float_t)k < 2.*log2(nn)/nn + 1.; + + /* TODO: unset this */ + int use_qselect = 1; + int stride = ps->n_points / k_local; + if (use_qselect) { + // quickselect_data_element(ctx -> local_data , ctx -> dims, ctx -> + // local_n_points, 1, 5); + int idx = 0; + while (idx < ps->n_points) { + int offset = idx * ps->dims; + // MPI_DB_PRINT("Startig from %d w stride %d -- %d \n", idx, stride, ctx + // -> local_n_points - idx); + if (ps->n_points - idx > stride) + quickselect_data_element(ps->data + offset, ps->dims, + ps->n_points - idx, d, stride); + idx = idx + stride; + } + } else { + qsort(ps->data, ps->n_points, ps->dims * sizeof(float_t), + compare_data_element_sort); + } + + /* + * Now what is more convenient? 
We have to also + * keep track of who owns the most "median thing" + * so who owns the lb and ub of each interval + * + * MY approach would be to + * - compute the local binning + * - found the most median bin + * - ask each processor to find the point nearest to that bin + */ + + int leftover = ps->n_points - stride * k_local + stride; + MPI_DB_PRINT("%lu %d %d leftover %d\n", ps->n_points, k_local, + stride * k_local, leftover); + + /* fill the first one and the last one with the bb values */ + bin_bounds[0] = 9999999; + bin_bounds[k_local] = -99999999; + for (size_t i = 0; i < ps->n_points; ++i) { + bin_bounds[0] = MIN(ps->data[i * ps->dims + d], bin_bounds[0]); + bin_bounds[k_local] = MAX(ps->data[i * ps->dims + d], bin_bounds[k_local]); + } + // bin_bounds[0] = ctx -> lb_box[d]; + // bin_bounds[k_local] = ctx -> ub_box[d]; + + for (int i = 1; i < k_local; ++i) { + /* retrieve the index of the datapoint on the bound */ + int idx = (stride * i) - 1; + /* write the value on the bins */ + bin_bounds[i] = ps->data[idx * ps->dims + d]; + } + + /* alloc and initialize global bins */ + for (int i = 0; i < k_global; ++i) + global_bin_counts[i] = 0; + + /* retrieve strides for each processor */ + int *all_proc_strides = (int *)malloc(ctx->world_size * sizeof(int)); + MPI_Allgather(&stride, 1, MPI_INT, all_proc_strides, 1, MPI_INT, + ctx->mpi_communicator); + + /* retrieve leftovers for each proc */ + int *all_proc_leftovers = (int *)malloc(ctx->world_size * sizeof(int)); + MPI_Allgather(&leftover, 1, MPI_INT, all_proc_leftovers, 1, MPI_INT, + ctx->mpi_communicator); + + for (int i = 0; i < ctx->world_size; ++i) { + // MPI_DB_PRINT("[MASTER] recieved from proc %d stride %d leftover %d\n", i, + // all_proc_strides[i], all_proc_leftovers[i]); + } + + float_t *all_proc_bounds = + (float_t *)malloc(ctx->world_size * (k_local + 1) * sizeof(float_t)); + MPI_Allgather(bin_bounds, k_local + 1, MPI_MY_FLOAT, all_proc_bounds, + k_local + 1, MPI_MY_FLOAT, ctx->mpi_communicator); + /* exchange bounds */ + + /* + printf("[RANK %d] bounds: [", ctx -> mpi_rank); + for(int i = 0; i < k_local + 1; ++i) printf("%lf ", bin_bounds[i]); + printf("]\n"); + */ + + /* + MPI_Barrier(ctx -> mpi_communicator); + for(int i = 0; i < ctx -> world_size; ++i) + { + MPI_DB_PRINT("["); + for(int b = 0; b < k_local + 1; ++b) MPI_DB_PRINT("%lf ", + all_proc_bounds[i*(k_local + 1) + b]); MPI_DB_PRINT("]\n"); + } + */ + + /* + * now let the dance begin + * fill the global count array + */ + + /* compute intersection between local bin and global bin */ + float_t box_lb = ps->lb_box[d]; + float_t box_ub = ps->ub_box[d]; + float_t box_width = box_ub - box_lb; + float_t global_bin_width = box_width / (float_t)k_global; + + for (int i = 0; i < ctx->world_size; ++i) + for (int local_bin_idx = 0; local_bin_idx < k_local; ++local_bin_idx) { + float_t local_bin_lb = + all_proc_bounds[(i) * (k_local + 1) + local_bin_idx]; + float_t local_bin_ub = + all_proc_bounds[(i) * (k_local + 1) + local_bin_idx + 1]; + float_t local_bin_width = local_bin_ub - local_bin_lb; + + int local_bin_count = local_bin_idx == k_local - 1 ? 
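+          /* the last local bin absorbs the leftover points,
+             every other local bin holds exactly `stride` points */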
all_proc_leftovers[i] + : all_proc_strides[i]; + // MPI_DB_PRINT("%d \n", local_bin_count); + + if (local_bin_count > 0) { + float_t local_bin_density = + (float_t)(local_bin_count) / local_bin_width; + // MPI_DB_PRINT("%lf\n", local_bin_width ); + for (int global_bin_idx = 0; global_bin_idx < k_global; + ++global_bin_idx) { + float_t global_bin_lb = box_lb + global_bin_width * global_bin_idx; + float_t global_bin_ub = + box_lb + global_bin_width * (global_bin_idx + 1); + + float_t intersenction_lb = MAX(global_bin_lb, local_bin_lb); + float_t intersenction_ub = MIN(global_bin_ub, local_bin_ub); + float_t intersection_width = intersenction_ub - intersenction_lb; + + if (intersection_width > 0) { + float_t intersection_count = intersection_width * local_bin_density; + global_bin_counts[global_bin_idx] += intersection_count; + } + } + } + } + + /*float_t cc = 0;*/ + + /* + MPI_DB_PRINT("Global bins: ["); + for(int global_bin_idx = 0; global_bin_idx < k_global; ++global_bin_idx) + { + MPI_DB_PRINT("%.2lf ", global_bin_counts[global_bin_idx]); + cc += global_bin_counts[global_bin_idx]; + } + MPI_DB_PRINT("] tot %lf\n", cc); + */ + + /* + * + */ + + free(all_proc_strides); + free(all_proc_leftovers); + free(all_proc_bounds); +} + +void compute_binning(global_context_t *ctx, int k_local, int k_global, int d, + float_t *bin_bounds, float_t *global_bin_counts) { + float_t nn = (float_t)nn; + // int use_qselect = (float_t)k < 2.*log2(nn)/nn + 1.; + + /* TODO: unset this */ + int use_qselect = 1; + int stride = ctx->local_n_points / k_local; + if (use_qselect) { + // quickselect_data_element(ctx -> local_data , ctx -> dims, ctx -> + // local_n_points, 1, 5); + int idx = 0; + while (idx < ctx->local_n_points) { + int offset = idx * ctx->dims; + // MPI_DB_PRINT("Startig from %d w stride %d -- %d \n", idx, stride, ctx + // -> local_n_points - idx); + if (ctx->local_n_points - idx > stride) + quickselect_data_element(ctx->local_data + offset, ctx->dims, + ctx->local_n_points - idx, d, stride); + idx = idx + stride; + } + } else { + qsort(ctx->local_data, ctx->local_n_points, ctx->dims * sizeof(float_t), + compare_data_element_sort); + } + + /* + * Now what is more convenient? 
We have to also + * keep track of who owns the most "median thing" + * so who owns the lb and ub of each interval + * + * MY approach would be to + * - compute the local binning + * - found the most median bin + * - ask each processor to find the point nearest to that bin + */ + + int leftover = ctx->local_n_points - stride * k_local + stride; + MPI_DB_PRINT("%lu %d %d leftover %d\n", ctx->local_n_points, k_local, + stride * k_local, leftover); + + /* fill the first one and the last one with the bb values */ + bin_bounds[0] = 9999999; + bin_bounds[k_local] = -99999999; + for (size_t i = 0; i < ctx->local_n_points; ++i) { + bin_bounds[0] = MIN(ctx->local_data[i * ctx->dims + d], bin_bounds[0]); + bin_bounds[k_local] = + MAX(ctx->local_data[i * ctx->dims + d], bin_bounds[k_local]); + } + // bin_bounds[0] = ctx -> lb_box[d]; + // bin_bounds[k_local] = ctx -> ub_box[d]; + + for (int i = 1; i < k_local; ++i) { + /* retrieve the index of the datapoint on the bound */ + int idx = (stride * i) - 1; + /* write the value on the bins */ + bin_bounds[i] = ctx->local_data[idx * ctx->dims + d]; + } + + /* alloc and initialize global bins */ + for (int i = 0; i < k_global; ++i) + global_bin_counts[i] = 0; + + /* retrieve strides for each processor */ + int *all_proc_strides = (int *)malloc(ctx->world_size * sizeof(int)); + MPI_Allgather(&stride, 1, MPI_INT, all_proc_strides, 1, MPI_INT, + ctx->mpi_communicator); + + /* retrieve leftovers for each proc */ + int *all_proc_leftovers = (int *)malloc(ctx->world_size * sizeof(int)); + MPI_Allgather(&leftover, 1, MPI_INT, all_proc_leftovers, 1, MPI_INT, + ctx->mpi_communicator); + + for (int i = 0; i < ctx->world_size; ++i) { + // MPI_DB_PRINT("[MASTER] recieved from proc %d stride %d leftover %d\n", i, + // all_proc_strides[i], all_proc_leftovers[i]); + } + + float_t *all_proc_bounds = + (float_t *)malloc(ctx->world_size * (k_local + 1) * sizeof(float_t)); + MPI_Allgather(bin_bounds, k_local + 1, MPI_MY_FLOAT, all_proc_bounds, + k_local + 1, MPI_MY_FLOAT, ctx->mpi_communicator); + /* exchange bounds */ + + /* + printf("[RANK %d] bounds: [", ctx -> mpi_rank); + for(int i = 0; i < k_local + 1; ++i) printf("%lf ", bin_bounds[i]); + printf("]\n"); + */ + + /* + MPI_Barrier(ctx -> mpi_communicator); + for(int i = 0; i < ctx -> world_size; ++i) + { + MPI_DB_PRINT("["); + for(int b = 0; b < k_local + 1; ++b) MPI_DB_PRINT("%lf ", + all_proc_bounds[i*(k_local + 1) + b]); MPI_DB_PRINT("]\n"); + } + */ + + float_t bin_width = ctx->ub_box[d] - ctx->lb_box[d]; + + /* compute intersection between local bin and global bin */ + float_t box_lb = ctx->lb_box[d]; + float_t box_ub = ctx->ub_box[d]; + float_t box_width = box_ub - box_lb; + float_t global_bin_width = box_width / (float_t)k_global; + + for (int i = 0; i < ctx->world_size; ++i) + for (int local_bin_idx = 0; local_bin_idx < k_local; ++local_bin_idx) { + float_t local_bin_lb = + all_proc_bounds[(i) * (k_local + 1) + local_bin_idx]; + float_t local_bin_ub = + all_proc_bounds[(i) * (k_local + 1) + local_bin_idx + 1]; + float_t local_bin_width = local_bin_ub - local_bin_lb; + + int local_bin_count = local_bin_idx == k_local - 1 ? 
all_proc_leftovers[i] + : all_proc_strides[i]; + // MPI_DB_PRINT("%d \n", local_bin_count); + + if (local_bin_count > 0) { + float_t local_bin_density = + (float_t)(local_bin_count) / local_bin_width; + // MPI_DB_PRINT("%lf\n", local_bin_width ); + for (int global_bin_idx = 0; global_bin_idx < k_global; + ++global_bin_idx) { + float_t global_bin_lb = box_lb + global_bin_width * global_bin_idx; + float_t global_bin_ub = + box_lb + global_bin_width * (global_bin_idx + 1); + + float_t intersenction_lb = MAX(global_bin_lb, local_bin_lb); + float_t intersenction_ub = MIN(global_bin_ub, local_bin_ub); + float_t intersection_width = intersenction_ub - intersenction_lb; + + if (intersection_width > 0) { + float_t intersection_count = intersection_width * local_bin_density; + global_bin_counts[global_bin_idx] += intersection_count; + } + } + } + } + + float_t cc = 0; + + /* + MPI_DB_PRINT("Global bins: ["); + for(int global_bin_idx = 0; global_bin_idx < k_global; ++global_bin_idx) + { + MPI_DB_PRINT("%.2lf ", global_bin_counts[global_bin_idx]); + cc += global_bin_counts[global_bin_idx]; + } + MPI_DB_PRINT("] tot %lf\n", cc); + */ + + free(all_proc_strides); + free(all_proc_leftovers); + free(all_proc_bounds); +} + +int retrieve_guess_adaptive(global_context_t *ctx, pointset_t *ps, + float_t *global_bin_counts, float_t *best_guess, + int k_global, int d, float_t pc) { + float_t total_count = 0.; + for (int i = 0; i < k_global; ++i) + total_count += global_bin_counts[i]; + + float_t cumulative_count = 0; + int idx = 0; + while ((cumulative_count + global_bin_counts[idx]) / total_count < pc) { + cumulative_count += global_bin_counts[idx]; + idx++; + } + /* find best spot in the bin */ + float_t box_lb = ps->lb_box[d]; + float_t box_ub = ps->ub_box[d]; + float_t box_width = box_ub - box_lb; + float_t global_bin_width = box_width / (float_t)k_global; + + float_t x0 = box_lb + (global_bin_width * (idx)); + float_t x1 = box_lb + (global_bin_width * (idx + 1)); + + float_t y0 = (cumulative_count) / total_count; + float_t y1 = (cumulative_count + global_bin_counts[idx]) / total_count; + + float_t x_guess = (pc - y0) / (y1 - y0) * (x1 - x0) + x0; + + MPI_DB_PRINT("[MASTER] best guess @ %lf is %lf on bin %d on dimension %d --- " + "x0 %lf x1 %lf y0 %lf y1 %lf\n", + pc, x_guess, idx, d, x0, x1, y0, y1); + + /* find nearest point btw guess */ + + int best_idx = 0; + float_t best_val = 999999; + for (int i = 0; i < ps->n_points; ++i) { + float_t dist = fabs(x_guess - ps->data[i * ps->dims + d]); + int c = dist < best_val; + best_val = c ? dist : best_val; + best_idx = c ? i : best_idx; + } + + mpi_double_int best = {.val = best_val, .key = ctx->mpi_rank}; + MPI_Allreduce(MPI_IN_PLACE, &best, 1, MPI_DOUBLE_INT, MPI_MINLOC, + ctx->mpi_communicator); + // printf("%lf %d\n", best.val, best.key); + + /* broadcast best guess */ + if (ctx->mpi_rank == best.key) { + memcpy(best_guess, ps->data + best_idx * ps->dims, + ps->dims * sizeof(float_t)); + } + MPI_Bcast(best_guess, ps->dims, MPI_MY_FLOAT, best.key, + ctx->mpi_communicator); + + /* recieved ? 
*/ + MPI_DB_PRINT("[MASTER] ["); + for (int i = 0; i < ps->dims; ++i) + MPI_DB_PRINT("%lf ", best_guess[i]); + MPI_DB_PRINT("]\n"); + + /* + printf("[rank %d] [", ctx -> mpi_rank); + for(int i = 0; i < ctx -> dims; ++i) printf("%lf ", best_guess[i]); + printf("]\n"); + */ + return idx; +} + +int retrieve_guess_pure(global_context_t *ctx, pointset_t *ps, + uint64_t *global_bin_counts, float_t *best_guess, + int k_global, int d, float_t pc) { + + /* + * retrieving the best median guess from pure binning + */ + + float_t total_count = 0.; + for (int i = 0; i < k_global; ++i) + total_count += (float_t)global_bin_counts[i]; + + /* + MPI_DB_PRINT("[ "); + for(int i = 0; i < k_global; ++i) + { + MPI_DB_PRINT("%lu %lf --- ", global_bin_counts[i], + (float_t)global_bin_counts[i]/(float_t)total_count); + } + MPI_DB_PRINT("\n"); + */ + + float_t cumulative_count = 0; + int idx = 0; + while ((cumulative_count + (float_t)global_bin_counts[idx]) / total_count < + pc) { + cumulative_count += (float_t)global_bin_counts[idx]; + idx++; + } + /* find best spot in the bin */ + float_t box_lb = ps->lb_box[d]; + float_t box_ub = ps->ub_box[d]; + float_t box_width = box_ub - box_lb; + float_t global_bin_width = box_width / (float_t)k_global; + + float_t x0 = box_lb + (global_bin_width * (idx)); + float_t x1 = box_lb + (global_bin_width * (idx + 1)); + + float_t y0 = (cumulative_count) / total_count; + float_t y1 = (cumulative_count + global_bin_counts[idx]) / total_count; + + float_t x_guess = (pc - y0) / (y1 - y0) * (x1 - x0) + x0; + + /* + MPI_DB_PRINT("[MASTER] best guess @ %lf is %lf on bin %d on dimension %d --- + x0 %lf x1 %lf y0 %lf y1 %lf\n",pc, x_guess,idx, d, x0, x1, y0, y1); + */ + + /* find nearest point btw guess */ + + int best_idx = 0; + float_t best_val = 999999; + for (int i = 0; i < ps->n_points; ++i) { + float_t dist = fabs(x_guess - ps->data[i * ps->dims + d]); + int c = dist < best_val; + best_val = c ? dist : best_val; + best_idx = c ? i : best_idx; + } + + mpi_double_int best = {.val = best_val, .key = ctx->mpi_rank}; + MPI_Allreduce(MPI_IN_PLACE, &best, 1, MPI_DOUBLE_INT, MPI_MINLOC, + ctx->mpi_communicator); + // printf("%lf %d\n", best.val, best.key); + + /* broadcast best guess */ + if (ctx->mpi_rank == best.key) { + memcpy(best_guess, ps->data + best_idx * ps->dims, + ps->dims * sizeof(float_t)); + } + MPI_Bcast(best_guess, ps->dims, MPI_MY_FLOAT, best.key, + ctx->mpi_communicator); + + /* recieved ? 
*/ + /* + MPI_DB_PRINT("[MASTER] ["); + for(int i = 0; i < ps -> dims; ++i) MPI_DB_PRINT("%lf ", best_guess[i]); + MPI_DB_PRINT("]\n"); + */ + + /* + printf("[rank %d] [", ctx -> mpi_rank); + for(int i = 0; i < ctx -> dims; ++i) printf("%lf ", best_guess[i]); + printf("]\n"); + */ + return idx; +} + +void retrieve_pc(global_context_t *ctx, float_t *global_bin_counts, + float_t *best_guess, int k_global, int d, float_t pc) { + float_t total_count = 0.; + for (int i = 0; i < k_global; ++i) + total_count += global_bin_counts[i]; + + float_t cumulative_count = 0; + int idx = 0; + while ((cumulative_count + global_bin_counts[idx]) / total_count < pc) { + cumulative_count += global_bin_counts[idx]; + idx++; + } + /* find best spot in the bin */ + float_t box_lb = ctx->lb_box[d]; + float_t box_ub = ctx->ub_box[d]; + float_t box_width = box_ub - box_lb; + float_t global_bin_width = box_width / (float_t)k_global; + + float_t x0 = box_lb + (global_bin_width * (idx)); + float_t x1 = box_lb + (global_bin_width * (idx + 1)); + + float_t y0 = (cumulative_count) / total_count; + float_t y1 = (cumulative_count + global_bin_counts[idx]) / total_count; + + float_t x_guess = (pc - y0) / (y1 - y0) * (x1 - x0) + x0; + + MPI_DB_PRINT("[MASTER] best guess @ %lf is %lf on bin %d on dimension %d --> " + "x0 %lf x1 %lf y0 %lf y1 %lf\n", + pc, x_guess, idx, d, x0, x1, y0, y1); + + /* find nearest point btw guess */ + + int best_idx = 0; + float_t best_val = 999999; + for (int i = 0; i < ctx->local_n_points; ++i) { + float_t dist = fabs(x_guess - ctx->local_data[i * ctx->dims + d]); + int c = dist < best_val; + best_val = c ? dist : best_val; + best_idx = c ? i : best_idx; + } + + mpi_double_int best = {.val = best_val, .key = ctx->mpi_rank}; + MPI_Allreduce(MPI_IN_PLACE, &best, 1, MPI_DOUBLE_INT, MPI_MINLOC, + ctx->mpi_communicator); + // printf("%lf %d\n", best.val, best.key); + + /* broadcast best guess */ + if (ctx->mpi_rank == best.key) { + memcpy(best_guess, ctx->local_data + best_idx * ctx->dims, + ctx->dims * sizeof(float_t)); + } + MPI_Bcast(best_guess, ctx->dims, MPI_MY_FLOAT, best.key, + ctx->mpi_communicator); + + /* recieved ? 
*/ + MPI_DB_PRINT("[MASTER] ["); + for (int i = 0; i < ctx->dims; ++i) + MPI_DB_PRINT("%lf ", best_guess[i]); + MPI_DB_PRINT("]\n"); + + /* + printf("[rank %d] [", ctx -> mpi_rank); + for(int i = 0; i < ctx -> dims; ++i) printf("%lf ", best_guess[i]); + printf("]\n"); + */ +} + +void global_binning_check(global_context_t *ctx, float_t *data, int d, int k) { + /* + * sanity check + * find if global bins are somehow similar to acutal binning on master + */ + + if (I_AM_MASTER) { + int *counts = (int *)malloc(k * sizeof(int)); + for (int bin_idx = 0; bin_idx < k; ++bin_idx) + counts[bin_idx] = 0; + + float_t box_lb = ctx->lb_box[d]; + float_t box_ub = ctx->ub_box[d]; + float_t box_width = box_ub - box_lb; + float_t bin_width = box_width / (float_t)k; + + for (int i = 0; i < ctx->n_points; ++i) { + int bin_idx = (int)((data[i * ctx->dims + d] - box_lb) / bin_width); + if (bin_idx < k) + counts[bin_idx]++; + // counts[bin_idx]++; + } + int cc = 0; + + /* + MPI_DB_PRINT("Actual bins: ["); + for(int bin_idx = 0; bin_idx < k; ++bin_idx) + { + MPI_DB_PRINT("%d ", counts[bin_idx]); + cc += counts[bin_idx]; + } + MPI_DB_PRINT("] tot %d\n",cc); + */ + + free(counts); + } +} + +void compute_pure_global_binning(global_context_t *ctx, pointset_t *ps, + uint64_t *global_bin_counts, int k_global, + int d) { + /* compute binning of data along dimension d */ + uint64_t *local_bin_count = (uint64_t *)malloc(k_global * sizeof(uint64_t)); + for (size_t k = 0; k < k_global; ++k) { + local_bin_count[k] = 0; + global_bin_counts[k] = 0; + } + + float_t bin_w = (ps->ub_box[d] - ps->lb_box[d]) / k_global; + + for (size_t i = 0; i < ps->n_points; ++i) { + float_t p = ps->data[i * ps->dims + d]; + int bin_idx = (int)((p - ps->lb_box[d]) / bin_w); + local_bin_count[bin_idx]++; + } + + MPI_Allreduce(local_bin_count, global_bin_counts, k_global, MPI_UNSIGNED_LONG, + MPI_SUM, ctx->mpi_communicator); +} + +int partition_data_around_value(float_t *array, int vec_len, int compare_dim, + int left, int right, float_t pivot_value) { + /* + * returns the number of elements less than the pivot + */ + int store_index = left; + int i; + /* Move pivot to end */ + for (i = left; i < right; ++i) { + // if(compare_data_element(array + i*vec_len, array + pivot_index*vec_len, + // compare_dim ) >= 0){ + if (array[i * vec_len + compare_dim] < pivot_value) { + swap_data_element(array + store_index * vec_len, array + i * vec_len, + vec_len); + store_index += 1; + } + } + /* Move pivot to its final place */ + // swap_data_element(array + (store_index)*vec_len , array + right*vec_len, + // vec_len); + + return store_index; // maybe, update, it works :) +} + +float_t refine_pure_binning(global_context_t *ctx, pointset_t *ps, + float_t *best_guess, uint64_t *global_bin_count, + int best_idx, int k_global, int d, + float_t f, float_t ep, float_t tolerance) { + /* refine process from obtained binning */ + if (fabs(ep - f) < tolerance) { + /* + MPI_DB_PRINT("[MASTER] No need to refine, finishing\n"); + */ + return ep; + } + float_t total_count = 0; + float_t starting_cumulative = 0; + + for (int i = 0; i < best_idx; ++i) + starting_cumulative += global_bin_count[i]; + for (int i = 0; i < k_global; ++i) + total_count += global_bin_count[i]; + + float_t bin_w = (ps->ub_box[d] - ps->lb_box[d]) / k_global; + float_t bin_lb = ps->lb_box[d] + (bin_w * (best_idx)); + float_t bin_ub = ps->lb_box[d] + (bin_w * (best_idx + 1)); + + uint64_t *tmp_global_bins = (uint64_t *)malloc(sizeof(uint64_t) * k_global); + for (int i = 0; i < k_global; ++i) + 
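+    /* keep a private copy of the global histogram; the while loop below then
+       re-bins only the bin bracketing the target quantile until the empirical
+       fraction is within `tolerance` of `f` */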
tmp_global_bins[i] = global_bin_count[i]; + + int tmp_best_idx = best_idx; + + /* + MPI_DB_PRINT("STARTING REFINE global bins: "); + for(int i = 0; i < k_global; ++i) + { + MPI_DB_PRINT("%lf ", global_bin_count[i]); + } + MPI_DB_PRINT("\n"); + */ + while (fabs(ep - f) > tolerance) { + /* compute the target */ + float_t ff, b0, b1; + ff = -1; + b0 = starting_cumulative; + b1 = tmp_global_bins[tmp_best_idx]; + ff = (f * total_count - b0) / ((float_t)tmp_global_bins[tmp_best_idx]); + + /* + * generate a partset of points in the bin considered + * each one has to partition its dataset according to the + * fact that points on dimension d has to be in the bin + * + * then make into in place alg for now, copy data in another pointer + * will be done in place + * */ + + /* + MPI_DB_PRINT("---- ---- ----\n"); + MPI_DB_PRINT("[MASTER] Refining on bin %d lb %lf ub %lf starting c %lf + %lf\n", tmp_best_idx, bin_lb, bin_ub, starting_cumulative/total_count, + (tmp_global_bins[tmp_best_idx] + starting_cumulative)/total_count); + */ + + for (int i = 0; i < k_global; ++i) + tmp_global_bins[i] = 0; + + pointset_t tmp_ps; + + int end_idx = partition_data_around_value(ps->data, (int)ps->dims, d, 0, + (int)ps->n_points, bin_ub); + int start_idx = partition_data_around_value(ps->data, (int)ps->dims, d, 0, + end_idx, bin_lb); + + /* + tmp_ps.__capacity = 1000; + tmp_ps.n_points = 0; + tmp_ps.dims = ps->dims; + tmp_ps.data = (float_t*)malloc(tmp_ps.__capacity * tmp_ps.dims * + sizeof(float_t)); tmp_ps.lb_box = NULL; tmp_ps.ub_box = + NULL; + + for(int i = 0; i < ps -> n_points; ++i) + { + float_t p = ps -> data[i * ps -> dims + d]; + if( p > bin_lb && p < bin_ub) + { + if(tmp_ps.n_points == tmp_ps.__capacity) + { + tmp_ps.__capacity = tmp_ps.__capacity*1.10; + tmp_ps.data = (float_t*)realloc(tmp_ps.data, + tmp_ps.__capacity * tmp_ps.dims * sizeof(float_t)); + } + int idx_src = i * ps -> dims; + int idx_dst = tmp_ps.n_points * tmp_ps.dims; + memcpy(tmp_ps.data + idx_dst, ps -> data + idx_src, + tmp_ps.dims * sizeof(float_t)); tmp_ps.n_points++; + } + } + */ + + tmp_ps.n_points = end_idx - start_idx; + tmp_ps.data = ps->data + start_idx * ps->dims; + tmp_ps.dims = ps->dims; + tmp_ps.lb_box = NULL; + tmp_ps.ub_box = NULL; + + compute_bounding_box_pointset(ctx, &tmp_ps); + + /* + MPI_DB_PRINT("[MASTER] searching for %lf of the bin considered\n",ff); + */ + + // DB_PRINT("%lu\n",tmp_ps.n_points ); + MPI_Barrier(ctx->mpi_communicator); + compute_pure_global_binning(ctx, &tmp_ps, tmp_global_bins, k_global, d); + + /* sum to global bins */ + // for(int i = 0; i < k_global; ++i) tmp_global_bins[i] += + // starting_cumulative; + + tmp_best_idx = retrieve_guess_pure(ctx, &tmp_ps, tmp_global_bins, + best_guess, k_global, d, ff); + + ep = check_pc_pointset_parallel(ctx, ps, best_guess, d, f); + // ep = check_pc_pointset_parallel(ctx, &tmp_ps, best_guess, d, f); + + bin_w = (tmp_ps.ub_box[d] - tmp_ps.lb_box[d]) / k_global; + bin_lb = tmp_ps.lb_box[d] + (bin_w * (tmp_best_idx)); + bin_ub = tmp_ps.lb_box[d] + (bin_w * (tmp_best_idx + 1)); + + for (int i = 0; i < tmp_best_idx; ++i) + starting_cumulative += tmp_global_bins[i]; + + // free(tmp_ps.data); + free(tmp_ps.lb_box); + free(tmp_ps.ub_box); + } + + /* + MPI_DB_PRINT("SUCCESS!!! 
\n"); + */ + + free(tmp_global_bins); + + return ep; +} + +float_t refine_on_bin(global_context_t *ctx, pointset_t *ps, + float_t *best_guess, float_t *global_bin_count, + int best_idx, int k_global, int k_local, int d, float_t f, + float_t tolerance) { + float_t total_count = 0; + float_t starting_cumulative = 0; + for (int i = 0; i < best_idx; ++i) + starting_cumulative += global_bin_count[i]; + float_t ep = 100; + for (int i = 0; i < k_global; ++i) + total_count += global_bin_count[i]; + + float_t bin_w = (ps->ub_box[d] - ps->lb_box[d]) / k_global; + float_t bin_lb = ps->lb_box[d] + (bin_w * (best_idx)); + float_t bin_ub = ps->lb_box[d] + (bin_w * (best_idx + 1)); + + float_t *tmp_global_bins = (float_t *)malloc(sizeof(float_t) * k_global); + float_t *tmp_bin_bounds = (float_t *)malloc((k_local + 1) * sizeof(float_t)); + for (int i = 0; i < k_global; ++i) + tmp_global_bins[i] = global_bin_count[i]; + + int tmp_best_idx = best_idx; + + /* + MPI_DB_PRINT("STARTING REFINE global bins: "); + for(int i = 0; i < k_global; ++i) + { + MPI_DB_PRINT("%lf ", global_bin_count[i]); + } + MPI_DB_PRINT("\n"); + */ + while (fabs(ep - f) > 0.002) { + /* compute the target */ + float_t ff, b0, b1; + ff = -1; + while (ff < 0) { + starting_cumulative = 0; + for (int i = 0; i < tmp_best_idx; ++i) + starting_cumulative += tmp_global_bins[i]; + b0 = starting_cumulative; + b1 = tmp_global_bins[tmp_best_idx]; + ff = (f * total_count - b0) / ((float_t)tmp_global_bins[tmp_best_idx]); + } + + /* + * generate a partset of points in the bin considered + * each one has to partition its dataset according to the + * fact that points on dimension d has to be in the bin + * + * then make into in place alg + * for now dirty and inefficient + * */ + MPI_DB_PRINT("---- ---- ----\n"); + MPI_DB_PRINT( + "[MASTER] Refining on bin %d lb %lf ub %lf starting c %lf %lf\n", + tmp_best_idx, bin_lb, bin_ub, starting_cumulative / total_count, + (tmp_global_bins[tmp_best_idx] + starting_cumulative) / total_count); + + for (int i = 0; i < k_global; ++i) + tmp_global_bins[i] = 0; + + pointset_t tmp_ps; + tmp_ps.__capacity = 1000; + tmp_ps.n_points = 0; + tmp_ps.dims = ps->dims; + tmp_ps.data = + (float_t *)malloc(tmp_ps.__capacity * tmp_ps.dims * sizeof(float_t)); + tmp_ps.lb_box = NULL; + tmp_ps.ub_box = NULL; + + for (int i = 0; i < ps->n_points; ++i) { + float_t p = ps->data[i * ps->dims + d]; + if (p > bin_lb && p < bin_ub) { + if (tmp_ps.n_points == tmp_ps.__capacity) { + tmp_ps.__capacity = tmp_ps.__capacity * 1.10; + tmp_ps.data = (float_t *)realloc( + tmp_ps.data, tmp_ps.__capacity * tmp_ps.dims * sizeof(float_t)); + } + int idx_src = i * ps->dims; + int idx_dst = tmp_ps.n_points * tmp_ps.dims; + memcpy(tmp_ps.data + idx_dst, ps->data + idx_src, + tmp_ps.dims * sizeof(float_t)); + tmp_ps.n_points++; + } + } + + compute_bounding_box_pointset(ctx, &tmp_ps); + + MPI_DB_PRINT("%lf ff\n", ff); + + // DB_PRINT("%lu\n",tmp_ps.n_points ); + MPI_Barrier(ctx->mpi_communicator); + compute_adaptive_binning_pointset(ctx, &tmp_ps, k_local, k_global, d, + tmp_bin_bounds, tmp_global_bins); + + /* sum to global bins */ + // for(int i = 0; i < k_global; ++i) tmp_global_bins[i] += + // starting_cumulative; + + tmp_best_idx = retrieve_guess_adaptive(ctx, &tmp_ps, tmp_global_bins, + best_guess, k_global, d, ff); + + ep = check_pc_pointset_parallel(ctx, ps, best_guess, d, f); + ep = check_pc_pointset_parallel(ctx, &tmp_ps, best_guess, d, f); + + bin_w = (tmp_ps.ub_box[d] - tmp_ps.lb_box[d]) / k_global; + bin_lb = tmp_ps.lb_box[d] + (bin_w * 
(tmp_best_idx)); + bin_ub = tmp_ps.lb_box[d] + (bin_w * (tmp_best_idx + 1)); + + free(tmp_ps.data); + free(tmp_ps.lb_box); + free(tmp_ps.ub_box); + } + + MPI_DB_PRINT("SUCCESS!!! --- "); + + free(tmp_bin_bounds); + free(tmp_global_bins); + + return ep; +} + +void init_queue(partition_queue_t *pq) { + pq->count = 0; + pq->_capacity = 1000; + pq->data = (partition_t *)malloc(pq->_capacity * sizeof(partition_t)); +} + +void free_queue(partition_queue_t *pq) { free(pq->data); } + +void get_pointset_from_partition(pointset_t* ps, partition_t* part) +{ + ps -> lb_box = NULL; + ps -> ub_box = NULL; + ps -> n_points = part -> n_points; + ps -> data = part -> base_ptr; + ps -> n_points = part -> n_points; +} + +void compute_median_pure_binning( + global_context_t* ctx, + pointset_t* ps, + float_t* guess, + float_t fraction, + int selected_dim, + int n_bins, + float_t tolerance) +{ + + int best_bin_idx; + float_t ep; + + uint64_t* global_bin_counts_int = (uint64_t*)malloc(n_bins * sizeof(uint64_t)); + + for(int i = 0; i < ctx -> dims; ++i) guess[i] = 0.; + + compute_bounding_box_pointset(ctx, ps); + compute_pure_global_binning(ctx, ps, global_bin_counts_int, + n_bins, selected_dim); + best_bin_idx = retrieve_guess_pure( + ctx, ps, global_bin_counts_int, guess, n_bins, selected_dim, fraction); + // check_pc_pointset(ctx, ps, best_guess, d, f); + ep = check_pc_pointset_parallel(ctx, ps, guess, selected_dim, fraction); + ep = refine_pure_binning(ctx, ps, guess, + global_bin_counts_int, best_bin_idx, n_bins, + selected_dim, fraction, ep, tolerance); + free(global_bin_counts_int); + +} + +void top_tree_insert( + global_context_t* ctx, + top_kdtree_t* tree, + float_t* guess, + int selected_dim, + int is_leaf, + size_t n_points) +{ + if(tree -> count == tree->_capacity) + { + int new_cap = tree->_capacity * 1.1; + tree -> medians = realloc(tree -> medians, new_cap * tree -> dims * sizeof(float_t)); + tree -> _nodes = realloc(tree -> _nodes, new_cap * sizeof(top_kdtree_node_t)); + tree -> _capacity = new_cap; + } + + /* copy guess into medians */ + memcpy(tree -> medians + (tree -> count * ctx -> dims) , guess, ctx -> dims * sizeof(float_t)); + + /* retrieve the next free node */ + top_kdtree_node_t* node = &(tree->_nodes[tree -> count]); + + node -> data = tree -> medians + (tree -> count * tree -> dims); + node -> split_dim = selected_dim; + node -> owner = -1; + node -> lch = NULL; + node -> rch = NULL; + node -> parent = NULL; + node -> is_leaf = is_leaf; + node -> n_points = n_points; + + if(tree -> count == 0) + { + tree -> root = node; + } + else + { + top_kdtree_node_t* current_node = tree -> root; + top_kdtree_node_t* parent = NULL; + int side = -1; + while(current_node) + { + int split_dim = current_node -> split_dim; + parent = current_node -> parent; + /* tree traversal */ + int side = node -> data[split_dim] - current_node -> data[split_dim]; + switch (side) + { + case TOP_TREE_LCH: + parent = current_node; + current_node = current_node -> lch; + break; + case TOP_TREE_RCH: + parent = current_node; + current_node = current_node -> lch; + break; + default: + break; + } + + } + /* found the right place */ + switch (side) + { + case TOP_TREE_LCH: + parent -> lch = node; + node -> parent = parent; + break; + case TOP_TREE_RCH: + parent -> rch = node; + node -> parent = parent; + break; + default: + break; + } + } + + + +} + +void top_tree_init(global_context_t* ctx, top_kdtree_t* tree) +{ + tree -> dims = ctx -> dims; + tree -> count = 0; + tree -> _capacity = 100; + tree -> _nodes = 
(top_kdtree_node_t*)malloc(tree -> _capacity * sizeof(top_kdtree_node_t)); + tree -> medians = (float_t*)malloc(tree -> _capacity * tree -> dims * sizeof(float_t)); + return; +} + +void top_tree_free(global_context_t* ctx, top_kdtree_t *tree) +{ + free(tree -> _nodes ); + free(tree -> medians); + return; +} + + + +void build_top_kdtree(global_context_t *ctx, pointset_t *og_pointset, top_kdtree_t* tree, int n_bins, float_t tolerance) +{ + size_t tot_n_points = 0; + MPI_Allreduce(&(og_pointset->n_points), &tot_n_points, 1, MPI_UINT64_T, + MPI_SUM, ctx->mpi_communicator); + + MPI_DB_PRINT("[MASTER] Top tree builder invoked\n"); + MPI_DB_PRINT("[MASTER] Trying to build top tree on %lu with %d processors\n", + tot_n_points, ctx->world_size); + + size_t current_partition_n_points = tot_n_points; + size_t expected_points_per_node = tot_n_points / ctx->world_size; + + /* enqueue the two partitions */ + + compute_bounding_box_pointset(ctx, og_pointset); + + partition_queue_t queue; + init_queue(&queue); + + int selected_dim = 0; + partition_t current_partition = {.d = selected_dim, + .base_ptr = og_pointset -> data, + .n_points = og_pointset -> n_points, + .n_procs = ctx -> world_size}; + + enqueue_partition(&queue, current_partition); + pointset_t current_pointset; + float_t* median_guess = (float_t*)malloc(ctx -> dims * sizeof(float_t)); + + while (queue.count) { + current_partition = dequeue_partition(&queue); + get_pointset_from_partition(¤t_pointset, ¤t_partition); + current_pointset.dims = ctx -> dims; + /* handle partition */ + compute_bounding_box_pointset(ctx,¤t_pointset); + float_t fraction = (current_partition.n_procs/2)/(float_t)current_partition.procs; + compute_median_pure_binning(ctx, ¤t_pointset, median_guess, fraction, selected_dim, n_bins, tolerance); + int pv = partition_data_around_value(current_pointset.data, ctx -> dims, selected_dim, 0, current_pointset.n_points, median_guess[selected_dim]); + /* insert node */ + + /* + * + * if points in the pointset is less than exp point per node then create a leaf + * actually better to have a margin of x percent around ppn + * + */ + + if( current_pointset.n_points < expected_points_per_node ) + { + top_tree_insert(ctx, tree, median_guess, selected_dim, MY_TRUE, current_pointset.n_points); + } + else + { + top_tree_insert(ctx, tree, median_guess, selected_dim, MY_FALSE, current_pointset.n_points); + } + + size_t points_left = current_partition.n_points * fraction; + size_t points_right = current_partition.n_points - points_left; + + int procs_left = current_partition.n_procs*fraction; + int procs_right = current_partition.n_procs - procs_left; + + int next_dimension = (++selected_dim) % (ctx -> dims); + partition_t left_partition = { + .n_points = points_left, + .n_procs = procs_left, + . 
+ + }; + + /* get left and right pointset */ + + + + } + + free(median_guess); + free_queue(&queue); +} + +void simulate_master_read_and_scatter(int dims, size_t n, + global_context_t *ctx) +{ + float_t *data; + /* generate random points */ + + /* + if(ctx -> mpi_rank == 0) + { + data = (float_t*)malloc(dims*n*sizeof(float_t)); + //for(size_t i = 0; i < dims * n; ++i) data[i] = + (float_t)rand()/(float_t)(RAND_MAX); + //for(size_t i = 0; i < n; ++i) + // for(size_t j = 0; j < dims; ++j) + // data[i*dims + j] = (float_t)i; + + for(size_t i = 0; i < dims * n; ++i) data[i] = (float_t)i; + ctx -> dims = dims; + ctx -> n_points = n; + } + */ + + /* read from files */ + + if (ctx->mpi_rank == 0) { + data = read_data_file(ctx, "../norm_data/std_LR_091_0001", MY_TRUE); + // std_g0163178_Me14_091_0000 + // data = + // read_data_file(ctx,"../norm_data/std_g0163178_Me14_091_0000",MY_TRUE); + // ctx -> n_points = 48*5*2000; + ctx->dims = 5; + ctx->n_points = ctx->n_points / ctx->dims; + mpi_printf(ctx, "Read %lu points in %u dims\n", ctx->n_points, ctx->dims); + } + + /* communicate the total number of points*/ + MPI_Bcast(&(ctx->dims), 1, MPI_UINT32_T, 0, ctx->mpi_communicator); + MPI_Bcast(&(ctx->n_points), 1, MPI_UINT64_T, 0, ctx->mpi_communicator); + + /* compute the number of elements to recieve for each processor */ + int *send_counts = (int *)malloc(ctx->world_size * sizeof(int)); + int *displacements = (int *)malloc(ctx->world_size * sizeof(int)); + + displacements[0] = 0; + send_counts[0] = ctx->n_points / ctx->world_size; + send_counts[0] += (ctx->n_points % ctx->world_size) > 0 ? 1 : 0; + send_counts[0] = send_counts[0] * ctx->dims; + + for (int p = 1; p < ctx->world_size; ++p) { + send_counts[p] = (ctx->n_points / ctx->world_size); + send_counts[p] += (ctx->n_points % ctx->world_size) > p ? 
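+    /* ranks whose index is below the remainder receive one extra point */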
1 : 0; + send_counts[p] = send_counts[p] * ctx->dims; + displacements[p] = displacements[p - 1] + send_counts[p]; + } + + ctx->local_n_points = send_counts[ctx->mpi_rank] / ctx->dims; + + /* +* MPI_Scatterv(const void *sendbuf, const int *sendcounts, const int *displs, + MPI_Datatype sendtype, void +*recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm) + */ + + float_t *pvt_data = + (float_t *)malloc(send_counts[ctx->mpi_rank] * sizeof(float_t)); + + MPI_Scatterv(data, send_counts, displacements, MPI_MY_FLOAT, pvt_data, + send_counts[ctx->mpi_rank], MPI_MY_FLOAT, 0, + ctx->mpi_communicator); + + ctx->local_data = pvt_data; + + int k_local = 20; + int k_global = 20; + float_t *global_bin_counts = (float_t *)malloc(k_global * sizeof(float_t)); + uint64_t *global_bin_counts_int = + (uint64_t *)malloc(k_global * sizeof(uint64_t)); + + float_t *best_guess = (float_t *)malloc(ctx->dims * sizeof(float_t)); + float_t *bin_bounds = (float_t *)malloc((k_local + 1) * sizeof(float_t)); + + pointset_t original_ps; + original_ps.data = ctx->local_data; + original_ps.dims = ctx->dims; + original_ps.n_points = ctx->local_n_points; + original_ps.lb_box = NULL; + original_ps.ub_box = NULL; + + float_t incr = 0.05; + float_t tol = 0.001; + for (int d = 0; d < ctx->dims; ++d) { + for (float_t f = incr; f < 1; f += incr) { + + int best_bin_idx; + float_t ep; + + /* + compute_bounding_box_pointset(ctx,&original_ps); + compute_adaptive_binning_pointset(ctx,&original_ps,k_local,k_global, d, + bin_bounds, global_bin_counts); best_bin_idx = + retrieve_guess_adaptive(ctx, &original_ps,global_bin_counts, best_guess, + k_global, d, f); + //check_pc_pointset(ctx, &ps, best_guess, d, f); + ep = check_pc_pointset_parallel(ctx, &original_ps, best_guess, d, f); + */ + + /** + if(fabs(ep - f) > 0.02) + { + + refine_on_bin(ctx, &original_ps, best_guess, global_bin_counts, + best_bin_idx, k_global, k_local, d, f, 0.2); + } + */ + // MPI_DB_PRINT("--------------------------------------\n\n"); + + compute_bounding_box_pointset(ctx, &original_ps); + compute_pure_global_binning(ctx, &original_ps, global_bin_counts_int, + k_global, d); + best_bin_idx = retrieve_guess_pure( + ctx, &original_ps, global_bin_counts_int, best_guess, k_global, d, f); + // check_pc_pointset(ctx, &ps, best_guess, d, f); + ep = check_pc_pointset_parallel(ctx, &original_ps, best_guess, d, f); + ep = refine_pure_binning(ctx, &original_ps, best_guess, + global_bin_counts_int, best_bin_idx, k_global, + d, f, ep, tol); + MPI_DB_PRINT("[MASTER] dimension %d searching for %lf found %lf\n", d, f, + ep); + } + MPI_DB_PRINT("--------------------------------------\n\n"); + } + + // compute_bounding_box(ctx); + // global_binning_check(ctx, data,d, k_global); + // retrieve_pc(ctx, global_bin_counts, best_guess, k_global, d, f); + // check_pc(ctx, best_guess, data, d, f); + + // compute_medians_and_check(ctx,data); + + free(send_counts); + free(displacements); + free(global_bin_counts); + // free(pvt_data); + free(bin_bounds); + free(best_guess); + if (ctx->mpi_rank == 0) + free(data); + + original_ps.data = NULL; + free_pointset(&original_ps); + // free(pvt_data); +} diff --git a/src/tree/tree.h b/src/tree/tree.h new file mode 100644 index 0000000000000000000000000000000000000000..d537771330f1b581c4249d5646d360751ed596ad --- /dev/null +++ b/src/tree/tree.h @@ -0,0 +1,59 @@ +#pragma once +#include <stdlib.h> +#include <stdio.h> +#include <memory.h> +#include <math.h> +#include "../common/common.h" + +typedef struct mpi_double_int{ + float_t val; + int 
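+    /* rank that owns the value; acts as the index in MPI_MINLOC reductions */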
key; +} mpi_double_int; + + +typedef struct partition_t +{ + int d; + int n_procs; + size_t n_points; + float_t* base_ptr; +} partition_t; + +typedef struct partition_queue_t +{ + int count; + int _capacity; + struct partition_t* data; +} partition_queue_t; + + +typedef struct top_kdtree_node_t +{ + float_t* data; + //float_t* node_box_lb; //Needed? + //float_t* node_box_ub; //Needed? + int owner; + int split_dim; + int is_leaf; + size_t n_points; + struct top_kdtree_node_t* lch; + struct top_kdtree_node_t* rch; + struct top_kdtree_node_t* parent; +} top_kdtree_node_t; + +typedef struct top_kdtree_t +{ + int dims; + size_t count; + size_t _capacity; + float_t* medians; + struct top_kdtree_node_t* _nodes; + struct top_kdtree_node_t* root; + +} top_kdtree_t; + + +void simulate_master_read_and_scatter(int, size_t, global_context_t* ); + + + diff --git a/src/utils/utils.h b/src/utils/utils.h new file mode 100644 index 0000000000000000000000000000000000000000..6f70f09beec2219624baeca92e2cd7deaa104fb4 --- /dev/null +++ b/src/utils/utils.h @@ -0,0 +1 @@ +#pragma once diff --git a/var.py b/var.py new file mode 100644 index 0000000000000000000000000000000000000000..708cc54871c1a4c9d1ce62bd903ec4960a193898 --- /dev/null +++ b/var.py @@ -0,0 +1,6 @@ +import numpy as np + +d = np.fromfile("../norm_data/std_LR_091_0000", dtype=np.float32) +print(d.shape) +d = d.reshape((d.shape[0]//5,5)) +print(np.cov(d.T))
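
Note: src/main/main.c is compiled by the Makefile but is not part of this patch. A minimal driver, sketched below, would only need to initialize MPI, fill the global context and call the read-and-scatter routine; the include paths and the (5, 100000) arguments are assumptions for illustration, and the routine currently overrides them by reading its own input file on rank 0.

/* hypothetical src/main/main.c -- a sketch, not the actual file from this patch */
#include <mpi.h>
#include "../common/common.h"
#include "../tree/tree.h"

int main(int argc, char **argv)
{
    global_context_t ctx;

    MPI_Init(&argc, &argv);
    ctx.mpi_communicator = MPI_COMM_WORLD;
    get_context(&ctx);                 /* rank, world size, processor name */

    /* dims/n are placeholders: the routine reads its input file on rank 0
       and scatters the points across all ranks */
    simulate_master_read_and_scatter(5, 100000, &ctx);

    free_context(&ctx);
    MPI_Finalize();
    return 0;
}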