diff --git a/src/common/common.c b/src/common/common.c index 0ecc276d148f1bad907f1f8d876569d0b0fb9fef..8078818f543079021008b3a9d71aa0e3d523b0ec 100644 --- a/src/common/common.c +++ b/src/common/common.c @@ -202,6 +202,88 @@ void lu_dynamic_array_init(lu_dynamic_array_t * a) a -> size = 0; } +const char* __units[3] = {"MB", "GB", "TB"}; +const double __multiplier[3] = {1e6, 1e9, 1e12}; + +static inline int get_unit_measure(size_t bytes) +{ + if((double)bytes < (1e9)) + { + return 0; + } + else if ((double)bytes < (1e12)) { + return 1; + } + else + { + return 2; + } + + +} + +float_t* read_data_file(global_context_t *ctx, const char *fname, const idx_t ndims, + const int file_in_float32) +{ + + FILE *f = fopen(fname, "r"); + if (!f) + { + printf("Nope\n"); + exit(1); + } + fseek(f, 0, SEEK_END); + size_t n = ftell(f); + rewind(f); + + int InputFloatSize = file_in_float32 ? 4 : 8; + + n = n / (InputFloatSize); + + float_t *data = (float_t *)MY_MALLOC(n * sizeof(float_t)); + + if (file_in_float32) + { + float *df = (float *)MY_MALLOC(n * sizeof(float)); + size_t fff = fread(df, sizeof(float), n, f); + + int measure = get_unit_measure(fff * sizeof(float)); + double file_len_converted = (double)(fff * sizeof(float))/__multiplier[measure]; + + mpi_printf(ctx, "Read %.2lf%s\n", file_len_converted, __units[measure]); + + ctx -> dims = ndims; + ctx -> n_points = n / ctx -> dims; + + mpi_printf(ctx, "Got ndims %lu npoints %lu\n", ctx -> dims, ctx -> n_points); + fclose(f); + + for (uint64_t i = 0; i < n; ++i) data[i] = (float_t)(df[i]); + + free(df); + } + else + { + double *df = (double *)MY_MALLOC(n * sizeof(double)); + size_t fff = fread(df, sizeof(double), n, f); + + int measure = get_unit_measure(fff * sizeof(double)); + double file_len_converted = (double)(fff * sizeof(double))/__multiplier[measure]; + mpi_printf(ctx, "Read %.2lf%s\n", file_len_converted, __units[measure]); + + ctx -> dims = ndims; + ctx -> n_points = n / ctx -> dims; + + mpi_printf(ctx, "Got ndims %lu npoints %lu\n", ctx -> dims, ctx -> n_points); + fclose(f); + + for (uint64_t i = 0; i < n; ++i) data[i] = (float_t)(df[i]); + + free(df); + } + return data; +} + void ordered_buffer_to_file(global_context_t* ctx, void* buffer, size_t el_size, uint64_t n, const char* fname) { //MPI_Barrier(ctx -> mpi_communicator); @@ -294,7 +376,7 @@ void big_ordered_buffer_to_file(global_context_t* ctx, void* buffer, size_t el_s //Gather on master // - uint64_t default_msg_len = 10000000; //bytes + uint64_t default_msg_len = 100000; //bytes if(I_AM_MASTER) { diff --git a/src/common/common.h b/src/common/common.h index 097ab97b583bcd7873ad1bcdd956508346399e9c..6194e1fef84e03e879dc33c0f7c3db29df8cc23b 100644 --- a/src/common/common.h +++ b/src/common/common.h @@ -54,9 +54,9 @@ typedef struct datapoint_info_t { #define MY_TRUE 1 #define MY_FALSE 0 -#define CHECK_ALLOCATION(x) if(!x){printf("[!!!] %d rank encountered failed allocation at line %s ", ctx -> mpi_rank, __LINE__ ); exit(1);}; +#define CHECK_ALLOCATION(x) if(!x){printf("[!!!] %d rank encountered failed allocation at line %s \n", ctx -> mpi_rank, __LINE__ ); exit(1);}; -#define CHECK_ALLOCATION_NO_CTX(x) if(!x){printf("[!!!] Failed allocation at line %d ", __LINE__ ); exit(1);} +#define CHECK_ALLOCATION_NO_CTX(x) if(!x){printf("[!!!] Failed allocation at line %d \n", __LINE__ ); exit(1);} #define MY_MALLOC(n) ({void* p = calloc(n,1); CHECK_ALLOCATION_NO_CTX(p); p; }) #define DB_PRINT(...) printf(__VA_ARGS__) @@ -198,3 +198,4 @@ void ordered_data_to_file(global_context_t* ctx, const char* fname); void ordered_buffer_to_file(global_context_t* ctx, void* buffer, size_t el_size, uint64_t n, const char* fname); void test_file_path(const char* fname); void big_ordered_buffer_to_file(global_context_t* ctx, void* buffer, size_t el_size, uint64_t n, const char* fname); +float_t* read_data_file(global_context_t *ctx, const char *fname, const idx_t ndims, const int file_in_float32); diff --git a/src/main/main.c b/src/main/main.c index fa4f3f45fd9c3b2d603c30e20b7cddc6ea4dbd44..aa291a037c0c63e1b7e0458cf19cc54f2c2e5388 100644 --- a/src/main/main.c +++ b/src/main/main.c @@ -113,11 +113,13 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx) } + TIME_START; if (ctx->mpi_rank == 0) { //data = read_data_file(ctx, "../norm_data/50_blobs_more_var.npy", MY_TRUE); //ctx->dims = 2; //data = read_data_file(ctx, "../norm_data/blobs_small.npy", MY_FALSE); + //data = read_data_file(ctx, "../norm_data/blobs_small.npy", MY_FALSE); // std_g0163178_Me14_091_0000 // 100M points @@ -126,13 +128,14 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx) //data = read_data_file(ctx,"../norm_data/huge_blobs.npy",MY_FALSE); // 2B points // data = read_data_file(ctx,"../norm_data/very_huge_blobs.npy",MY_FALSE); + // data = read_data_file(ctx,"../norm_data/hd_blobs.npy",5,MY_FALSE); // 190M points // std_g2980844_091_0000 - // data = read_data_file(ctx,"../norm_data/std_g2980844_091_0000",MY_TRUE); + data = read_data_file(ctx,"../norm_data/std_g2980844_091_0000",5,MY_TRUE); /* 1M points ca.*/ - data = read_data_file(ctx,"../norm_data/std_LR_091_0001",MY_TRUE); + //data = read_data_file(ctx,"../norm_data/std_LR_091_0001",5,MY_TRUE); /* BOX */ // data = read_data_file(ctx,"../norm_data/std_Box_256_30_092_0000",MY_TRUE); @@ -142,15 +145,12 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx) // data = read_data_file(ctx,"../norm_data/std_g0144846_Me14_091_0001",MY_TRUE); //88M - //data = read_data_file(ctx,"../norm_data/std_g5503149_091_0000",MY_TRUE); + // data = read_data_file(ctx,"../norm_data/std_g5503149_091_0000",MY_TRUE); // //34 M //data = read_data_file(ctx,"../norm_data/std_g1212639_091_0001",MY_TRUE); - ctx -> dims = 5; - // ctx->dims = 2; - ctx->n_points = ctx->n_points / ctx->dims; - + //for weak scalability // ctx->n_points = ctx->n_points / 2; // ctx->n_points = (ctx->n_points / 32) * ctx -> world_size; @@ -162,8 +162,8 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx) MPI_Bcast(&(ctx->n_points), 1, MPI_UINT64_T, 0, ctx->mpi_communicator); /* compute the number of elements to recieve for each processor */ - int *send_counts = (int *)MY_MALLOC(ctx->world_size * sizeof(int)); - int *displacements = (int *)MY_MALLOC(ctx->world_size * sizeof(int)); + idx_t *send_counts = (idx_t *)MY_MALLOC(ctx->world_size * sizeof(idx_t)); + idx_t *displacements = (idx_t *)MY_MALLOC(ctx->world_size * sizeof(idx_t)); displacements[0] = 0; send_counts[0] = ctx->n_points / ctx->world_size; @@ -183,7 +183,45 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx) float_t *pvt_data = (float_t *)MY_MALLOC(send_counts[ctx->mpi_rank] * sizeof(float_t)); - MPI_Scatterv(data, send_counts, displacements, MPI_MY_FLOAT, pvt_data, send_counts[ctx->mpi_rank], MPI_MY_FLOAT, 0, ctx->mpi_communicator); + uint64_t default_msg_len = 10000000; //bytes + + if(I_AM_MASTER) + { + memcpy(pvt_data, data, ctx -> dims * ctx -> local_n_points * sizeof(float_t)); + int already_sent_points = 0; + for(int i = 1; i < ctx -> world_size; ++i) + { + already_sent_points = 0; + while(already_sent_points < send_counts[i]) + { + int count_send = MIN(default_msg_len, send_counts[i] - already_sent_points); + MPI_Send(data + displacements[i] + already_sent_points, count_send, MPI_MY_FLOAT, i, ctx -> mpi_rank, ctx -> mpi_communicator); + already_sent_points += count_send; + //DB_PRINT("[RANK 0] has sent to rank %d %d elements out of %lu\n",i, already_sent_points, send_counts[i]); + } + //DB_PRINT("------------------------------------------------\n"); + } + } + else + { + int already_recvd_points = 0; + while(already_recvd_points < send_counts[ctx -> mpi_rank]) + { + MPI_Status status; + MPI_Probe(0, MPI_ANY_TAG, ctx -> mpi_communicator, &status); + + MPI_Request request; + int count_recv; + int source = status.MPI_SOURCE; + MPI_Get_count(&status, MPI_MY_FLOAT, &count_recv); + + MPI_Recv(pvt_data + already_recvd_points, count_recv, MPI_MY_FLOAT, source, MPI_ANY_TAG, ctx -> mpi_communicator, MPI_STATUS_IGNORE); + already_recvd_points += count_recv; + } + } + + elapsed_time = TIME_STOP; + LOG_WRITE("Importing file ad scattering", elapsed_time); if (I_AM_MASTER) free(data); @@ -214,13 +252,11 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx) exchange_points(ctx, &tree); elapsed_time = TIME_STOP; LOG_WRITE("Top kdtree build and domain decomposition", elapsed_time); - //test_the_idea(ctx); TIME_START; kdtree_v2 local_tree; kdtree_v2_init( &local_tree, ctx -> local_data, ctx -> local_n_points, (unsigned int)ctx -> dims); int k = 300; - //int k = 30; datapoint_info_t* dp_info = (datapoint_info_t*)MY_MALLOC(ctx -> local_n_points * sizeof(datapoint_info_t)); for(uint64_t i = 0; i < ctx -> local_n_points; ++i) @@ -267,7 +303,6 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx) float_t z = 3; - //compute_density_kstarnn_rma(ctx, id, MY_FALSE); compute_density_kstarnn_rma_v2(ctx, id, MY_FALSE); compute_correction(ctx, z); elapsed_time = TIME_STOP; diff --git a/src/tree/tree.c b/src/tree/tree.c index bfa18d8fbbddb8790edee724d9506bc356ba612f..028ee87e932bfa7b8dc67a636797a350de1fb68f 100644 --- a/src/tree/tree.c +++ b/src/tree/tree.c @@ -49,51 +49,6 @@ int cmp_float_t(const void* a, const void* b) -float_t* read_data_file(global_context_t *ctx, const char *fname, - const int file_in_float32) -{ - - FILE *f = fopen(fname, "r"); - if (!f) - { - printf("Nope\n"); - exit(1); - } - fseek(f, 0, SEEK_END); - size_t n = ftell(f); - rewind(f); - - int InputFloatSize = file_in_float32 ? 4 : 8; - - n = n / (InputFloatSize); - - float_t *data = (float_t *)MY_MALLOC(n * sizeof(float_t)); - - if (file_in_float32) - { - float *df = (float *)MY_MALLOC(n * sizeof(float)); - size_t fff = fread(df, sizeof(float), n, f); - mpi_printf(ctx, "Read %luB\n", fff); - fclose(f); - - for (uint64_t i = 0; i < n; ++i) data[i] = (float_t)(df[i]); - - free(df); - } - else - { - double *df = (double *)MY_MALLOC(n * sizeof(double)); - size_t fff = fread(df, sizeof(double), n, f); - mpi_printf(ctx, "Read %luB\n", fff); - fclose(f); - - for (uint64_t i = 0; i < n; ++i) data[i] = (float_t)(df[i]); - - free(df); - } - ctx->n_points = n; - return data; -} /* quickselect for an element along a dimension */ @@ -827,10 +782,10 @@ void build_top_kdtree(global_context_t *ctx, pointset_t *og_pointset, top_kdtree int selected_dim = 0; partition_t current_partition = { .d = selected_dim, - .base_ptr = og_pointset->data, - .n_points = og_pointset->n_points, - .n_procs = ctx->world_size, - .parent = NULL, + .base_ptr = og_pointset->data, + .n_points = og_pointset->n_points, + .n_procs = ctx->world_size, + .parent = NULL, .lr = NO_CHILD }; @@ -946,22 +901,22 @@ void build_top_kdtree(global_context_t *ctx, pointset_t *og_pointset, top_kdtree int next_dimension = (++selected_dim) % (ctx->dims); partition_t left_partition = { .n_points = points_left, - .n_procs = procs_left, - .start_proc = current_partition.start_proc, - .parent = current_node, - .lr = TOP_TREE_LCH, + .n_procs = procs_left, + .start_proc = current_partition.start_proc, + .parent = current_node, + .lr = TOP_TREE_LCH, .base_ptr = current_pointset.data, - .d = next_dimension, + .d = next_dimension, }; partition_t right_partition = { .n_points = points_right, - .n_procs = procs_right, - .start_proc = current_partition.start_proc + procs_left, - .parent = current_node, - .lr = TOP_TREE_RCH, + .n_procs = procs_right, + .start_proc = current_partition.start_proc + procs_left, + .parent = current_node, + .lr = TOP_TREE_RCH, .base_ptr = current_pointset.data + pv * current_pointset.dims, - .d = next_dimension + .d = next_dimension }; enqueue_partition(&queue, left_partition); diff --git a/src/tree/tree.h b/src/tree/tree.h index 1ee44f847f926b3713114ba76165f2aab8795a6d..1d84445fee96d3440f49346fe7e29e62bb480512 100644 --- a/src/tree/tree.h +++ b/src/tree/tree.h @@ -90,7 +90,6 @@ typedef struct top_kdtree_t void simulate_master_read_and_scatter(int, size_t, global_context_t* ); -float_t* read_data_file(global_context_t *ctx, const char *fname, const int file_in_float32); void top_tree_init(global_context_t *ctx, top_kdtree_t *tree); void build_top_kdtree(global_context_t *ctx, pointset_t *og_pointset, top_kdtree_t *tree, int n_bins, float_t tolerance); void exchange_points(global_context_t* ctx, top_kdtree_t* tree);