/* * Implementation of a distributed memory kd-tree * The idea is to have a top level domain decomposition with a shallow shared * top level tree between computational nodes. * * Then each domain has a different set of points to work on separately * the top tree serves as a map to know later on in which processor ask for * neighbors */ #include "tree.h" #include "mpi.h" #include #include #include #include #include #ifdef USE_FLOAT32 #define MPI_MY_FLOAT MPI_FLOAT #else #define MPI_MY_FLOAT MPI_DOUBLE #endif #define I_AM_MASTER ctx->mpi_rank == 0 #define TOP_TREE_RCH 1 #define TOP_TREE_LCH 0 #define NO_CHILD -1 float_t *read_data_file(global_context_t *ctx, const char *fname, const int file_in_float32) { FILE *f = fopen(fname, "r"); if (!f) { printf("Nope\n"); exit(1); } fseek(f, 0, SEEK_END); size_t n = ftell(f); rewind(f); int InputFloatSize = file_in_float32 ? 4 : 8; n = n / (InputFloatSize); float_t *data = (float_t *)malloc(n * sizeof(float_t)); if (file_in_float32) { float *df = (float *)malloc(n * sizeof(float)); size_t fff = fread(df, sizeof(float), n, f); mpi_printf(ctx, "Read %luB\n", fff); fclose(f); for (uint64_t i = 0; i < n; ++i) data[i] = (float_t)(df[i]); free(df); } else { double *df = (double *)malloc(n * sizeof(double)); size_t fff = fread(df, sizeof(double), n, f); mpi_printf(ctx, "Read %luB\n", fff); fclose(f); for (uint64_t i = 0; i < n; ++i) data[i] = (float_t)(df[i]); free(df); } ctx->n_points = n; return data; } /* quickselect for an element along a dimension */ void swap_data_element(float_t *a, float_t *b, int vec_len) { float_t tmp; for (int i = 0; i < vec_len; ++i) { tmp = a[i]; a[i] = b[i]; b[i] = tmp; } } int compare_data_element(float_t *a, float_t *b, int compare_dim) { return -((a[compare_dim] - b[compare_dim]) > 0.) + ((a[compare_dim] - b[compare_dim]) < 0.); } int partition_data_element(float_t *array, int vec_len, int compare_dim, int left, int right, int pivot_index) { int store_index = left; int i; /* Move pivot to end */ swap_data_element(array + pivot_index * vec_len, array + right * vec_len, vec_len); for (i = left; i < right; ++i) { // if(compare_data_element(array + i*vec_len, array + pivot_index*vec_len, // compare_dim ) >= 0){ if (array[i * vec_len + compare_dim] < array[right * vec_len + compare_dim]) { swap_data_element(array + store_index * vec_len, array + i * vec_len, vec_len); store_index += 1; } } /* Move pivot to its final place */ swap_data_element(array + (store_index)*vec_len, array + right * vec_len, vec_len); return store_index; } int qselect_data_element(float_t *array, int vec_len, int compare_dim, int left, int right, int n) { int pivot_index; if (left == right) { return left; } pivot_index = left; // + (rand() % (right-left + 1)); /* random int left <= x <= right */ pivot_index = partition_data_element(array, vec_len, compare_dim, left, right, pivot_index); /* The pivot is in its final sorted position */ if (n == pivot_index) { return pivot_index; } else if (n < pivot_index) { return qselect_data_element(array, vec_len, compare_dim, left, pivot_index - 1, n); } else { return qselect_data_element(array, vec_len, compare_dim, pivot_index + 1, right, n); } } int quickselect_data_element(float_t *array, int vec_len, int array_size, int compare_dim, int k) { return qselect_data_element(array, vec_len, compare_dim, 0, array_size - 1, k - 1); } int CMP_DIM; int compare_data_element_sort(const void *a, const void *b) { float_t aa = *((float_t *)a + CMP_DIM); float_t bb = *((float_t *)b + CMP_DIM); return ((aa - bb) > 0.) - ((aa - bb) < 0.); } void compute_bounding_box(global_context_t *ctx) { ctx->lb_box = (float_t *)malloc(ctx->dims * sizeof(float_t)); ctx->ub_box = (float_t *)malloc(ctx->dims * sizeof(float_t)); for (size_t d = 0; d < ctx->dims; ++d) { ctx->lb_box[d] = 99999999.; ctx->ub_box[d] = -99999999.; } #define local_data ctx->local_data #define lb ctx->lb_box #define ub ctx->ub_box /* compute minimum and maximum for each dimensions, store them in local bb */ /* each processor on its own */ for (size_t i = 0; i < ctx->local_n_points; ++i) { for (size_t d = 0; d < ctx->dims; ++d) { lb[d] = MIN(local_data[i * ctx->dims + d], lb[d]); ub[d] = MAX(local_data[i * ctx->dims + d], ub[d]); } } /* Reduce to obtain bb */ /* MPI_Allreduce( const void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm) */ /*get the bounding box */ MPI_Allreduce(MPI_IN_PLACE, lb, ctx->dims, MPI_MY_FLOAT, MPI_MIN, ctx->mpi_communicator); MPI_Allreduce(MPI_IN_PLACE, ub, ctx->dims, MPI_MY_FLOAT, MPI_MAX, ctx->mpi_communicator); /* DB_PRINT("[RANK %d]:", ctx -> mpi_rank); for(size_t d = 0; d < ctx -> dims; ++d) { DB_PRINT("%lf ", ctx -> ub_box[d]); } DB_PRINT("\n"); */ /* MPI_DB_PRINT("[BOUNDING BOX]: "); for(size_t d = 0; d < ctx -> dims; ++d) MPI_DB_PRINT("d%d:[%lf, %lf] ",(int)d, lb[d], ub[d]); MPI_DB_PRINT("\n"); */ #undef local_data #undef lb #undef ub } /* i want a queue to enqueue the partitions to deal with */ void enqueue_partition(partition_queue_t *queue, partition_t p) { if (queue->count == queue->_capacity) { queue->_capacity = queue->_capacity * 1.10; queue->data = realloc(queue->data, queue->_capacity); } /* insert point */ memmove(queue->data + 1, queue->data, queue->count * sizeof(partition_t)); queue->data[0] = p; queue->count++; } partition_t dequeue_partition(partition_queue_t *queue) { return queue->data[--(queue->count)]; } void compute_medians_and_check(global_context_t *ctx, float_t *data) { float_t prop = 0.5; int k = (int)(ctx->local_n_points * prop); int d = 1; /*quick select on a particular dimension */ CMP_DIM = d; int kk = (k - 1) * ctx->dims; int count = 0; // idx = idx - 1; // int aaa = quickselect_data_element(ctx->local_data, (int)(ctx->dims), (int)(ctx->local_n_points), d, k); /* * sanity check * check if the median found in each node is * a median */ float_t *medians_rcv = (float_t *)malloc(ctx->dims * ctx->world_size * sizeof(float_t)); /* * MPI_Allgather( const void *sendbuf, * int sendcount, * MPI_Datatype sendtype, * void *recvbuf, * int recvcount, * MPI_Datatype recvtype, * MPI_Comm comm) */ /* Exchange medians */ MPI_Allgather(ctx->local_data + kk, ctx->dims, MPI_MY_FLOAT, medians_rcv, ctx->dims, MPI_MY_FLOAT, ctx->mpi_communicator); /* sort medians on each node */ CMP_DIM = d; qsort(medians_rcv, ctx->world_size, ctx->dims * sizeof(float_t), compare_data_element_sort); /* * Evaluate goodness of the median on master which has whole dataset */ if (ctx->mpi_rank == 0) { int count = 0; int idx = (int)(prop * (ctx->world_size)); // idx = idx - 1; for (int i = 0; i < ctx->n_points; ++i) { count += data[i * ctx->dims + d] <= medians_rcv[idx * ctx->dims + d]; } mpi_printf(ctx, "Choosing %lf percentile on dimension %d: empirical prop %lf\n", prop, d, (float_t)count / (float_t)(ctx->n_points)); } free(medians_rcv); } float_t check_pc_pointset_parallel(global_context_t *ctx, pointset_t *ps, guess_t g, int d, float_t prop) { /* * ONLY FOR TEST PURPOSES * gather on master all data * perform the count on master */ int pvt_count = 0; for (int i = 0; i < ps->n_points; ++i) { pvt_count += ps->data[i * ps->dims + d] <= g.x_guess; } int pvt_n_and_tot[2] = {pvt_count, ps->n_points}; int tot_count[2]; MPI_Allreduce(pvt_n_and_tot, tot_count, 2, MPI_INT, MPI_SUM, ctx->mpi_communicator); float_t ep = (float_t)tot_count[0] / (float_t)(tot_count[1]); /* mpi_printf(ctx,"[PS TEST PARALLEL]: "); mpi_printf(ctx,"Condsidering %d points, searching for %lf percentile on dimension %d: empirical measure %lf\n",tot_count[1],prop, d, ep); */ return ep; } void compute_bounding_box_pointset(global_context_t *ctx, pointset_t *ps) { for (size_t d = 0; d < ps->dims; ++d) { ps->lb_box[d] = 99999999.; ps->ub_box[d] = -99999999.; } #define local_data ps->data #define lb ps->lb_box #define ub ps->ub_box /* compute minimum and maximum for each dimensions, store them in local bb */ /* each processor on its own */ for (size_t i = 0; i < ps->n_points; ++i) { for (size_t d = 0; d < ps->dims; ++d) { lb[d] = MIN(local_data[i * ps->dims + d], lb[d]); ub[d] = MAX(local_data[i * ps->dims + d], ub[d]); } } /* Reduce to obtain bb */ /* MPI_Allreduce( const void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm) */ /*get the bounding box */ MPI_Allreduce(MPI_IN_PLACE, lb, ps->dims, MPI_MY_FLOAT, MPI_MIN, ctx->mpi_communicator); MPI_Allreduce(MPI_IN_PLACE, ub, ps->dims, MPI_MY_FLOAT, MPI_MAX, ctx->mpi_communicator); /* DB_PRINT("[RANK %d]:", ctx -> mpi_rank); for(size_t d = 0; d < ctx -> dims; ++d) { DB_PRINT("%lf ", ctx -> ub_box[d]); } DB_PRINT("\n"); */ /* MPI_DB_PRINT("[PS BOUNDING BOX]: "); for(size_t d = 0; d < ps -> dims; ++d) MPI_DB_PRINT("d%d:[%lf, %lf] ",(int)d, lb[d], ub[d]); MPI_DB_PRINT("\n"); */ #undef local_data #undef lb #undef ub } guess_t retrieve_guess_pure(global_context_t *ctx, pointset_t *ps, uint64_t *global_bin_counts, int k_global, int d, float_t pc) { /* * retrieving the best median guess from pure binning */ float_t total_count = 0.; for (int i = 0; i < k_global; ++i) total_count += (float_t)global_bin_counts[i]; /* MPI_DB_PRINT("[ "); for(int i = 0; i < k_global; ++i) { MPI_DB_PRINT("%lu %lf --- ", global_bin_counts[i], (float_t)global_bin_counts[i]/(float_t)total_count); } MPI_DB_PRINT("\n"); */ float_t cumulative_count = 0; int idx = 0; while ((cumulative_count + (float_t)global_bin_counts[idx]) / total_count < pc) { cumulative_count += (float_t)global_bin_counts[idx]; idx++; } /* find best spot in the bin */ float_t box_lb = ps->lb_box[d]; float_t box_ub = ps->ub_box[d]; float_t box_width = box_ub - box_lb; float_t global_bin_width = box_width / (float_t)k_global; float_t x0 = box_lb + (global_bin_width * (idx)); float_t x1 = box_lb + (global_bin_width * (idx + 1)); float_t y0 = (cumulative_count) / total_count; float_t y1 = (cumulative_count + global_bin_counts[idx]) / total_count; float_t x_guess = (pc - y0) / (y1 - y0) * (x1 - x0) + x0; /* MPI_DB_PRINT("[MASTER] best guess @ %lf is %lf on bin %d on dimension %d --- x0 %lf x1 %lf y0 %lf y1 %lf\n",pc, x_guess,idx, d, x0, x1, y0, y1); */ guess_t g = {.bin_idx = idx, .x_guess = x_guess}; return g; } void global_binning_check(global_context_t *ctx, float_t *data, int d, int k) { /* * sanity check * find if global bins are somehow similar to acutal binning on master */ if (I_AM_MASTER) { int *counts = (int *)malloc(k * sizeof(int)); for (int bin_idx = 0; bin_idx < k; ++bin_idx) counts[bin_idx] = 0; float_t box_lb = ctx->lb_box[d]; float_t box_ub = ctx->ub_box[d]; float_t box_width = box_ub - box_lb; float_t bin_width = box_width / (float_t)k; for (int i = 0; i < ctx->n_points; ++i) { int bin_idx = (int)((data[i * ctx->dims + d] - box_lb) / bin_width); if (bin_idx < k) counts[bin_idx]++; // counts[bin_idx]++ } int cc = 0; /* MPI_DB_PRINT("Actual bins: ["); for(int bin_idx = 0; bin_idx < k; ++bin_idx) { MPI_DB_PRINT("%d ", counts[bin_idx]); cc += counts[bin_idx]; } MPI_DB_PRINT("] tot %d\n",cc); */ free(counts); } } void compute_pure_global_binning(global_context_t *ctx, pointset_t *ps, uint64_t *global_bin_counts, int k_global, int d) { /* compute binning of data along dimension d */ uint64_t *local_bin_count = (uint64_t *)malloc(k_global * sizeof(uint64_t)); //MPI_DB_PRINT("%p %p %p %p %p\n", local_bin_count, global_bin_counts, ps -> data, ps -> lb_box, ps -> ub_box); //DB_PRINT("rank %d npoints %lu %p %p %p %p %p\n",ctx -> mpi_rank, ps -> n_points, local_bin_count, global_bin_counts, ps -> data, ps -> lb_box, ps -> ub_box); for (size_t k = 0; k < k_global; ++k) { local_bin_count[k] = 0; global_bin_counts[k] = 0; } /* MPI_DB_PRINT("[PS BOUNDING BOX %d]: ", ctx -> mpi_rank); for(size_t d = 0; d < ps -> dims; ++d) MPI_DB_PRINT("d%d:[%lf, %lf] ",(int)d, ps -> lb_box[d], ps -> ub_box[d]); MPI_DB_PRINT("\n"); MPI_DB_PRINT("\n"); */ float_t bin_w = (ps-> ub_box[d] - ps->lb_box[d]) / (float_t)k_global; for (size_t i = 0; i < ps->n_points; ++i) { float_t p = ps->data[i * ps->dims + d]; int bin_idx = (int)((p - ps->lb_box[d]) / bin_w); /* if(bin_idx < 0) { DB_PRINT("rank %d qua %lf %lf %d %lf\n",ctx -> mpi_rank, (p - ps->lb_box[d]), (p - ps->lb_box[d]) / bin_w, bin_idx, bin_w); DB_PRINT("[PS BOUNDING BOX %d i have %d]: ", ctx -> mpi_rank,d); for(size_t d = 0; d < ps -> dims; ++d) DB_PRINT("d%d:[%lf, %lf] ",(int)d, ps -> lb_box[d], ps -> ub_box[d]); MPI_DB_PRINT("\n"); DB_PRINT("\n"); } */ local_bin_count[bin_idx]++; } MPI_Allreduce(local_bin_count, global_bin_counts, k_global, MPI_UNSIGNED_LONG, MPI_SUM, ctx->mpi_communicator); free(local_bin_count); } int partition_data_around_value(float_t *array, int vec_len, int compare_dim, int left, int right, float_t pivot_value) { /* * returns the number of elements less than the pivot */ int store_index = left; int i; /* Move pivot to end */ for (i = left; i < right; ++i) { // if(compare_data_element(array + i*vec_len, array + pivot_index*vec_len, compare_dim ) >= 0){ if (array[i * vec_len + compare_dim] < pivot_value) { swap_data_element(array + store_index * vec_len, array + i * vec_len, vec_len); store_index += 1; } } /* Move pivot to its final place */ // swap_data_element(array + (store_index)*vec_len , array + right*vec_len, // vec_len); return store_index; // maybe, update, it works :) } guess_t refine_pure_binning(global_context_t *ctx, pointset_t *ps, guess_t best_guess, uint64_t *global_bin_count, int k_global, int d, float_t f, float_t tolerance) { /* refine process from obtained binning */ if (fabs(best_guess.ep - f) < tolerance) { /* MPI_DB_PRINT("[MASTER] No need to refine, finishing\n"); */ return best_guess; } float_t total_count = 0; float_t starting_cumulative = 0; for (int i = 0; i < best_guess.bin_idx; ++i) starting_cumulative += global_bin_count[i]; for (int i = 0; i < k_global; ++i) total_count += global_bin_count[i]; float_t bin_w = (ps->ub_box[d] - ps->lb_box[d]) / k_global; float_t bin_lb = ps->lb_box[d] + (bin_w * (best_guess.bin_idx)); float_t bin_ub = ps->lb_box[d] + (bin_w * (best_guess.bin_idx + 1)); uint64_t *tmp_global_bins = (uint64_t *)malloc(sizeof(uint64_t) * k_global); for (int i = 0; i < k_global; ++i) tmp_global_bins[i] = global_bin_count[i]; /* MPI_DB_PRINT("STARTING REFINE global bins: "); for(int i = 0; i < k_global; ++i) { MPI_DB_PRINT("%lf ", global_bin_count[i]); } MPI_DB_PRINT("\n"); */ guess_t g; while (fabs(best_guess.ep - f) > tolerance) { /* compute the target */ float_t ff, b0, b1; ff = -1; b0 = starting_cumulative; b1 = tmp_global_bins[best_guess.bin_idx]; ff = (f * total_count - b0) / ((float_t)tmp_global_bins[best_guess.bin_idx]); /* * generate a partset of points in the bin considered * each one has to partition its dataset according to the * fact that points on dimension d has to be in the bin * * then make into in place alg for now, copy data in another pointer * will be done in place * */ /* MPI_DB_PRINT("---- ---- ----\n"); MPI_DB_PRINT("[MASTER] Refining on bin %d lb %lf ub %lf starting c %lf %lf\n", best_guess.bin_idx, bin_lb, bin_ub, starting_cumulative/total_count, (tmp_global_bins[best_guess.bin_idx] + starting_cumulative)/total_count); */ for (int i = 0; i < k_global; ++i) tmp_global_bins[i] = 0; pointset_t tmp_ps; int end_idx = partition_data_around_value(ps->data, (int)ps->dims, d, 0, (int)ps->n_points, bin_ub); int start_idx = partition_data_around_value(ps->data, (int)ps->dims, d, 0,end_idx, bin_lb); tmp_ps.n_points = end_idx - start_idx; tmp_ps.data = ps->data + start_idx * ps->dims; tmp_ps.dims = ps->dims; tmp_ps.lb_box = (float_t*)malloc(ctx -> dims * sizeof(float_t)); tmp_ps.ub_box = (float_t*)malloc(ctx -> dims * sizeof(float_t)); compute_bounding_box_pointset(ctx, &tmp_ps); /* MPI_DB_PRINT("[MASTER] searching for %lf of the bin considered\n",ff); */ // DB_PRINT("%lu\n",tmp_ps.n_points ); MPI_Barrier(ctx->mpi_communicator); compute_pure_global_binning(ctx, &tmp_ps, tmp_global_bins, k_global, d); /* sum to global bins */ // for(int i = 0; i < k_global; ++i) tmp_global_bins[i] += // starting_cumulative; best_guess = retrieve_guess_pure(ctx, &tmp_ps, tmp_global_bins, k_global, d, ff); best_guess.ep = check_pc_pointset_parallel(ctx, ps, best_guess, d, f); // ep = check_pc_pointset_parallel(ctx, &tmp_ps, best_guess, d, f); bin_w = (tmp_ps.ub_box[d] - tmp_ps.lb_box[d]) / k_global; bin_lb = tmp_ps.lb_box[d] + (bin_w * (best_guess.bin_idx)); bin_ub = tmp_ps.lb_box[d] + (bin_w * (best_guess.bin_idx + 1)); for (int i = 0; i < best_guess.bin_idx; ++i) starting_cumulative += tmp_global_bins[i]; // free(tmp_ps.data); free(tmp_ps.lb_box); free(tmp_ps.ub_box); } /* MPI_DB_PRINT("SUCCESS!!! \n"); */ free(tmp_global_bins); return best_guess; } void init_queue(partition_queue_t *pq) { pq->count = 0; pq->_capacity = 1000; pq->data = (partition_t *)malloc(pq->_capacity * sizeof(partition_t)); } void free_queue(partition_queue_t *pq) { free(pq->data); } void get_pointset_from_partition(pointset_t *ps, partition_t *part) { ps->n_points = part->n_points; ps->data = part->base_ptr; ps->n_points = part->n_points; } guess_t compute_median_pure_binning(global_context_t *ctx, pointset_t *ps, float_t fraction, int selected_dim, int n_bins, float_t tolerance) { int best_bin_idx; float_t ep; uint64_t *global_bin_counts_int = (uint64_t *)malloc(n_bins * sizeof(uint64_t)); compute_bounding_box_pointset(ctx, ps); compute_pure_global_binning(ctx, ps, global_bin_counts_int, n_bins, selected_dim); guess_t g = retrieve_guess_pure(ctx, ps, global_bin_counts_int, n_bins, selected_dim, fraction); // check_pc_pointset(ctx, ps, best_guess, d, f); g.ep = check_pc_pointset_parallel(ctx, ps, g, selected_dim, fraction); g = refine_pure_binning(ctx, ps, g, global_bin_counts_int, n_bins, selected_dim, fraction, tolerance); free(global_bin_counts_int); return g; } void top_tree_init(global_context_t *ctx, top_kdtree_t *tree) { tree->dims = ctx->dims; tree->count = 0; tree->_capacity = 100; tree->_nodes = (top_kdtree_node_t *)malloc(tree->_capacity * sizeof(top_kdtree_node_t)); return; } void top_tree_free(global_context_t *ctx, top_kdtree_t *tree) { for(int i = 0; i < tree -> count; ++i) { if(tree -> _nodes[i].lb_node_box) free(tree -> _nodes[i].lb_node_box); if(tree -> _nodes[i].ub_node_box) free(tree -> _nodes[i].ub_node_box); } free(tree->_nodes); return; } top_kdtree_node_t* top_tree_generate_node(global_context_t* ctx, top_kdtree_t* tree) { if (tree->count == tree->_capacity) { int new_cap = tree->_capacity * 1.1; tree->_nodes = realloc(tree->_nodes, new_cap * sizeof(top_kdtree_node_t)); tree->_capacity = new_cap; } top_kdtree_node_t* ptr = tree -> _nodes + tree -> count; ptr -> lb_node_box = (float_t*)malloc(ctx -> dims * sizeof(float_t)); ptr -> ub_node_box = (float_t*)malloc(ctx -> dims * sizeof(float_t)); ptr -> owner = -1; ++tree -> count; return ptr; } void tree_print(global_context_t* ctx, top_kdtree_node_t* root) { MPI_DB_PRINT("Node %p: \n\tsplit_dim %d \n\tdata %lf", root, root -> split_dim, root -> data); MPI_DB_PRINT("\n\tparent %p", root -> parent); MPI_DB_PRINT("\n\towner %d", root -> owner); MPI_DB_PRINT("\n\tbox"); for(size_t d = 0; d < ctx -> dims; ++d) MPI_DB_PRINT("\n\t d%d:[%lf, %lf]",(int)d, root -> lb_node_box[d], root -> ub_node_box[d]); MPI_DB_PRINT("\n\n"); if(root -> lch) tree_print(ctx, root -> lch); if(root -> rch) tree_print(ctx, root -> rch); } void build_top_kdtree(global_context_t *ctx, pointset_t *og_pointset, top_kdtree_t *tree, int n_bins, float_t tolerance) { size_t tot_n_points = 0; MPI_Allreduce(&(og_pointset->n_points), &tot_n_points, 1, MPI_UINT64_T, MPI_SUM, ctx->mpi_communicator); MPI_DB_PRINT("[MASTER] Top tree builder invoked\n"); MPI_DB_PRINT("[MASTER] Trying to build top tree on %lu with %d processors\n", tot_n_points, ctx->world_size); size_t current_partition_n_points = tot_n_points; size_t expected_points_per_node = tot_n_points / ctx->world_size; /* enqueue the two partitions */ compute_bounding_box_pointset(ctx, og_pointset); partition_queue_t queue; init_queue(&queue); int selected_dim = 0; partition_t current_partition = { .d = selected_dim, .base_ptr = og_pointset->data, .n_points = og_pointset->n_points, .n_procs = ctx->world_size, .parent = NULL, .lr = NO_CHILD }; enqueue_partition(&queue, current_partition); pointset_t current_pointset; current_pointset.lb_box = (float_t*)malloc(ctx -> dims * sizeof(float_t)); current_pointset.ub_box = (float_t*)malloc(ctx -> dims * sizeof(float_t)); while (queue.count) { /*dequeue the partition to process */ current_partition = dequeue_partition(&queue); /* generate e pointset for that partition */ get_pointset_from_partition(¤t_pointset, ¤t_partition); current_pointset.dims = ctx->dims; /*generate a tree node */ top_kdtree_node_t* current_node = current_node = top_tree_generate_node(ctx, tree); /* insert node */ /* MPI_DB_PRINT("Handling partition: \n\tcurrent_node %p, \n\tdim %d, \n\tn_points %d, \n\tstart_proc %d, \n\tn_procs %d, \n\tparent %p\n", current_node, current_partition.d, current_partition.n_points, current_partition.start_proc, current_partition.n_procs, current_partition.parent); */ switch (current_partition.lr) { case TOP_TREE_LCH: if(current_partition.parent) { current_node -> parent = current_partition.parent; current_node -> parent -> lch = current_node; /* compute the box */ /* * left child has lb equal to parent * ub equal to parent except for the dim of splitting */ int parent_split_dim = current_node -> parent -> split_dim; float_t parent_hp = current_node -> parent -> data; memcpy(current_node -> lb_node_box, current_node -> parent -> lb_node_box, ctx -> dims * sizeof(float_t)); memcpy(current_node -> ub_node_box, current_node -> parent -> ub_node_box, ctx -> dims * sizeof(float_t)); current_node -> ub_node_box[parent_split_dim] = parent_hp; } break; case TOP_TREE_RCH: if(current_partition.parent) { current_node -> parent = current_partition.parent; current_node -> parent -> rch = current_node; int parent_split_dim = current_node -> parent -> split_dim; float_t parent_hp = current_node -> parent -> data; /* * right child has ub equal to parent * lb equal to parent except for the dim of splitting */ memcpy(current_node -> lb_node_box, current_node -> parent -> lb_node_box, ctx -> dims * sizeof(float_t)); memcpy(current_node -> ub_node_box, current_node -> parent -> ub_node_box, ctx -> dims * sizeof(float_t)); current_node -> lb_node_box[parent_split_dim] = parent_hp; } break; default: { tree -> root = current_node; memcpy(current_node -> lb_node_box, og_pointset -> lb_box, ctx -> dims * sizeof(float_t)); memcpy(current_node -> ub_node_box, og_pointset -> ub_box, ctx -> dims * sizeof(float_t)); } break; } current_node -> split_dim = current_partition.d; current_node -> parent = current_partition.parent; current_node -> lch = NULL; current_node -> rch = NULL; /* handle partition */ if(current_partition.n_procs > 1) { float_t fraction = (current_partition.n_procs / 2) / (float_t)current_partition.n_procs; guess_t g = compute_median_pure_binning(ctx, ¤t_pointset, fraction, current_partition.d, n_bins, tolerance); int pv = partition_data_around_value(current_pointset.data, ctx->dims, current_partition.d, 0, current_pointset.n_points, g.x_guess); current_node -> data = g.x_guess; size_t points_left = (size_t)pv; size_t points_right = current_partition.n_points - points_left; int procs_left = current_partition.n_procs * fraction; int procs_right = current_partition.n_procs - procs_left; /* MPI_DB_PRINT("Chosing as guess: %lf, seareching for %lf, obtained %lf\n", g.x_guess, fraction, g.ep); MPI_DB_PRINT("-------------------\n\n"); */ int next_dimension = (++selected_dim) % (ctx->dims); partition_t left_partition = { .n_points = points_left, .n_procs = procs_left, .start_proc = current_partition.start_proc, .parent = current_node, .lr = TOP_TREE_LCH, .base_ptr = current_pointset.data, .d = next_dimension, }; partition_t right_partition = { .n_points = points_right, .n_procs = procs_right, .start_proc = current_partition.start_proc + procs_left, .parent = current_node, .lr = TOP_TREE_RCH, .base_ptr = current_pointset.data + pv * current_pointset.dims, .d = next_dimension }; enqueue_partition(&queue, left_partition); enqueue_partition(&queue, right_partition); MPI_Barrier(ctx -> mpi_communicator); } else { current_node -> owner = current_partition.start_proc; } } MPI_DB_PRINT("Root is %p\n", tree -> root); tree_print(ctx, tree -> root); free_queue(&queue); } void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx) { float_t *data; /* generate random points */ /* if(ctx -> mpi_rank == 0) { data = (float_t*)malloc(dims*n*sizeof(float_t)); //for(size_t i = 0; i < dims * n; ++i) data[i] = (float_t)rand()/(float_t)(RAND_MAX); //for(size_t i = 0; i < n; ++i) // for(size_t j = 0; j < dims; ++j) // data[i*dims + j] = (float_t)i; for(size_t i = 0; i < dims * n; ++i) data[i] = (float_t)i; ctx -> dims = dims; ctx -> n_points = n; } */ /* read from files */ if (ctx->mpi_rank == 0) { data = read_data_file(ctx, "../norm_data/std_LR_091_0000", MY_TRUE); // std_g0163178_Me14_091_0000 // data = // read_data_file(ctx,"../norm_data/std_g0163178_Me14_091_0000",MY_TRUE); // ctx -> n_points = 48*5*2000; ctx->dims = 5; ctx->n_points = ctx->n_points / ctx->dims; mpi_printf(ctx, "Read %lu points in %u dims\n", ctx->n_points, ctx->dims); } /* communicate the total number of points*/ MPI_Bcast(&(ctx->dims), 1, MPI_UINT32_T, 0, ctx->mpi_communicator); MPI_Bcast(&(ctx->n_points), 1, MPI_UINT64_T, 0, ctx->mpi_communicator); /* compute the number of elements to recieve for each processor */ int *send_counts = (int *)malloc(ctx->world_size * sizeof(int)); int *displacements = (int *)malloc(ctx->world_size * sizeof(int)); displacements[0] = 0; send_counts[0] = ctx->n_points / ctx->world_size; send_counts[0] += (ctx->n_points % ctx->world_size) > 0 ? 1 : 0; send_counts[0] = send_counts[0] * ctx->dims; for (int p = 1; p < ctx->world_size; ++p) { send_counts[p] = (ctx->n_points / ctx->world_size); send_counts[p] += (ctx->n_points % ctx->world_size) > p ? 1 : 0; send_counts[p] = send_counts[p] * ctx->dims; displacements[p] = displacements[p - 1] + send_counts[p]; } ctx->local_n_points = send_counts[ctx->mpi_rank] / ctx->dims; /* * MPI_Scatterv(const void *sendbuf, const int *sendcounts, const int *displs, MPI_Datatype sendtype, void *recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm) */ float_t *pvt_data = (float_t *)malloc(send_counts[ctx->mpi_rank] * sizeof(float_t)); MPI_Scatterv(data, send_counts, displacements, MPI_MY_FLOAT, pvt_data, send_counts[ctx->mpi_rank], MPI_MY_FLOAT, 0, ctx->mpi_communicator); ctx->local_data = pvt_data; int k_local = 20; int k_global = 20; uint64_t *global_bin_counts_int = (uint64_t *)malloc(k_global * sizeof(uint64_t)); pointset_t original_ps; original_ps.data = ctx->local_data; original_ps.dims = ctx->dims; original_ps.n_points = ctx->local_n_points; original_ps.lb_box = (float_t*)malloc(ctx -> dims * sizeof(float_t)); original_ps.ub_box = (float_t*)malloc(ctx -> dims * sizeof(float_t)); float_t incr = 0.05; float_t tol = 0.001; /* for (int d = 0; d < ctx->dims; ++d) { for (float_t f = incr; f < 1; f += incr) { int best_bin_idx; float_t ep; compute_bounding_box_pointset(ctx, &original_ps); compute_pure_global_binning(ctx, &original_ps, global_bin_counts_int, k_global, d); guess_t g = retrieve_guess_pure( ctx, &original_ps, global_bin_counts_int, k_global, d, f); // check_pc_pointset(ctx, &ps, best_guess, d, f); g.ep = check_pc_pointset_parallel(ctx, &original_ps, g, d, f); g = refine_pure_binning(ctx, &original_ps, g, global_bin_counts_int, k_global, d, f, tol); MPI_DB_PRINT("[MASTER] dimension %d searching for %lf found %lf\n", d, f, g.ep); } MPI_DB_PRINT("--------------------------------------\n\n"); } */ top_kdtree_t tree; top_tree_init(ctx, &tree); build_top_kdtree(ctx, &original_ps, &tree, k_global, tol); top_tree_free(ctx, &tree); free(send_counts); free(displacements); // free(pvt_data); if (ctx->mpi_rank == 0) free(data); original_ps.data = NULL; free_pointset(&original_ps); // free(pvt_data); }