Commit 5f5bd321 authored by lykos98

added prototype of logging, still working on details and formatting, added array of requests in MPI_Isends in exchange of ngbh
parent 786f1e20
Source diff could not be displayed: it is too large.
......@@ -5,9 +5,11 @@
#include <mpi.h>
#include <stdint.h>
#include <time.h>
//#include <stdarg.h>
#define MAX(A,B) ((A) > (B) ? (A) : (B))
#define MIN(A,B) ((A) < (B) ? (A) : (B))
#ifdef USE_FLOAT32
#define float_t float
#else
......@@ -35,22 +37,31 @@
#define TIME_DEF
#define TIME_START
#define TIME_STOP
#define LOG_WRITE
#else
#define TIME_DEF struct timespec __start, __end;
#define TIME_START { \
MPI_Barrier(ctx -> mpi_communicator); \
clock_gettime(CLOCK_MONOTONIC,&__start); \
}
#define TIME_STOP { \
MPI_Barrier(ctx -> mpi_communicator); \
clock_gettime(CLOCK_MONOTONIC,&__end); \
MPI_DB_PRINT("[TIME] elapsed time %.2lfs\n", (double)(__end.tv_sec - __start.tv_sec) \
+ (__end.tv_nsec - __start.tv_nsec)/1e9); \
#define TIME_STOP \
(clock_gettime(CLOCK_MONOTONIC,&__end), \
(double)(__end.tv_sec - __start.tv_sec) + (__end.tv_nsec - __start.tv_nsec)/1e9)
#define LOG_WRITE(sec_name,time) { \
if(time > 0) \
{ \
double max, min, avg; \
MPI_Reduce(&time, &avg, 1, MPI_DOUBLE, MPI_SUM, 0, ctx -> mpi_communicator); \
MPI_Reduce(&time, &min, 1, MPI_DOUBLE, MPI_MIN, 0, ctx -> mpi_communicator); \
MPI_Reduce(&time, &max, 1, MPI_DOUBLE, MPI_MAX, 0, ctx -> mpi_communicator); \
MPI_DB_PRINT("%s -> [avg: %.2lfs, min: %.2lfs, max: %.2lfs]\n", sec_name, avg/((double)ctx -> world_size), min, max); \
} \
else \
{ \
MPI_DB_PRINT("%s\n", sec_name);\
}\
}
#endif
#define MAX(A,B) ((A) > (B) ? (A) : (B))
#define MIN(A,B) ((A) < (B) ? (A) : (B))
#endif
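For reference, a minimal standalone sketch of the pattern the TIME_* / LOG_WRITE macros above implement, assuming an MPI build: time a section with clock_gettime, then reduce min/avg/max across ranks and print on rank 0. The report_section helper and the output format are illustrative stand-ins, not part of the committed header.

#include <mpi.h>
#include <stdio.h>
#include <time.h>

/* reduce a per-rank elapsed time to avg/min/max and print it on rank 0 */
static void report_section(const char* name, double local_time, MPI_Comm comm)
{
    int rank, size;
    double sum, min, max;
    MPI_Comm_rank(comm, &rank);
    MPI_Comm_size(comm, &size);
    MPI_Reduce(&local_time, &sum, 1, MPI_DOUBLE, MPI_SUM, 0, comm);
    MPI_Reduce(&local_time, &min, 1, MPI_DOUBLE, MPI_MIN, 0, comm);
    MPI_Reduce(&local_time, &max, 1, MPI_DOUBLE, MPI_MAX, 0, comm);
    if(rank == 0)
        printf("%s -> [avg: %.2lfs, min: %.2lfs, max: %.2lfs]\n",
               name, sum / (double)size, min, max);
}

int main(int argc, char** argv)
{
    MPI_Init(&argc, &argv);
    struct timespec start, end;
    clock_gettime(CLOCK_MONOTONIC, &start);
    /* ... timed section goes here ... */
    clock_gettime(CLOCK_MONOTONIC, &end);
    double elapsed = (double)(end.tv_sec - start.tv_sec)
                   + (end.tv_nsec - start.tv_nsec) / 1e9;
    report_section("Example section", elapsed, MPI_COMM_WORLD);
    MPI_Finalize();
    return 0;
}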
/*
* from Springel's Gadget4 code
......@@ -66,6 +77,18 @@
#define va_end(ap) __builtin_va_end(ap)
#define va_arg(ap, type) __builtin_va_arg(ap, type)
#if defined(NDEBUG)
FILE* __log_file;
#define LOG_START __log_file = fopen("","w");
#define LOG
#define LOG_END
#else
#define LOG_START
#define LOG
#define LOG_END
#endif
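The LOG_START / LOG / LOG_END macros above are still a prototype (note the empty file name in the fopen call). Purely as an assumption about where this could go, the sketch below opens one log file per rank and appends timestamped lines; the file-name pattern and helper names are not part of the committed code.

#include <mpi.h>
#include <stdio.h>
#include <time.h>

/* open a per-rank log file; the naming scheme is an assumption */
static FILE* open_rank_log(MPI_Comm comm)
{
    int rank;
    MPI_Comm_rank(comm, &rank);
    char name[64];
    snprintf(name, sizeof(name), "rank_%04d.log", rank);
    return fopen(name, "w");
}

/* append a timestamped line and flush so logs survive a crash */
static void log_line(FILE* f, const char* msg)
{
    if(!f) return;
    struct timespec ts;
    clock_gettime(CLOCK_MONOTONIC, &ts);
    fprintf(f, "[%ld.%09ld] %s\n", (long)ts.tv_sec, ts.tv_nsec, msg);
    fflush(f);
}

int main(int argc, char** argv)
{
    MPI_Init(&argc, &argv);
    FILE* log_file = open_rank_log(MPI_COMM_WORLD);
    log_line(log_file, "logging prototype: rank log opened");
    if(log_file) fclose(log_file);
    MPI_Finalize();
    return 0;
}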
struct global_context_t
{
size_t n_points;
......
......@@ -22,6 +22,17 @@
//#define WRITE_NGBH
//#define WRITE_TOP_NODES
/*
* Maximum bytes to send with a single mpi send/recv, used
* while communicating results of ngbh search
*/
/* Maximum allowed is 4GB */
//#define MAX_MSG_SIZE 4294967296
/* Use slices of 10 MB */
#define MAX_MSG_SIZE 10000000
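To make the effect of MAX_MSG_SIZE concrete: later in mpi_ngbh_search the number of points per message is computed as MAX_MSG_SIZE / (k * sizeof(heap_node)). The toy calculation below assumes sizeof(heap_node) is 12 bytes and k = 300, neither of which is confirmed by this diff.

#include <stdio.h>
#include <stdint.h>

int main(void)
{
    const uint64_t max_msg_size = 10000000;  /* bytes allowed per message */
    const uint64_t k            = 300;       /* neighbours kept per point */
    const uint64_t node_size    = 12;        /* assumed sizeof(heap_node) */
    uint64_t points_per_msg = max_msg_size / (k * node_size);
    /* with these assumptions each message carries ~2777 points worth of heaps */
    printf("points per message: %llu\n", (unsigned long long)points_per_msg);
    return 0;
}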
#ifdef USE_FLOAT32
#define MPI_MY_FLOAT MPI_FLOAT
#else
......@@ -811,8 +822,10 @@ void build_top_kdtree(global_context_t *ctx, pointset_t *og_pointset, top_kdtree
size_t tot_n_points = 0;
MPI_Allreduce(&(og_pointset->n_points), &tot_n_points, 1, MPI_UINT64_T, MPI_SUM, ctx->mpi_communicator);
/*
MPI_DB_PRINT("[MASTER] Top tree builder invoked\n");
MPI_DB_PRINT("[MASTER] Trying to build top tree on %lu with %d processors\n", tot_n_points, ctx->world_size);
*/
MPI_DB_PRINT("Trying to build top tree on %lu with %d processors\n", tot_n_points, ctx->world_size);
size_t current_partition_n_points = tot_n_points;
size_t expected_points_per_node = tot_n_points / ctx->world_size;
......@@ -1053,7 +1066,6 @@ int partition_data_around_key(int* key, float_t *val, int vec_len, int ref_key ,
void exchange_points(global_context_t* ctx, top_kdtree_t* tree)
{
MPI_DB_PRINT("[MASTER] Domain decomposition\n");
int* points_per_proc = (int*)malloc(ctx -> world_size * sizeof(int));
int* points_owners = (int*)malloc(ctx -> dims * ctx -> local_n_points * sizeof(float_t));
int* partition_offset = (int*)malloc(ctx -> world_size * sizeof(int));
......@@ -1335,11 +1347,10 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre
/* print diagnostics */
print_diagnositcs(ctx, k);
TIME_DEF;
double elapsed_time;
TIME_DEF
MPI_DB_PRINT("[MASTER] Local ngbh search ");
TIME_START
TIME_START;
MPI_Barrier(ctx -> mpi_communicator);
#pragma omp parallel for
for(int p = 0; p < ctx -> local_n_points; ++p)
......@@ -1352,13 +1363,11 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre
dp_info[idx].is_center = 0;
dp_info[idx].array_idx = idx;
}
MPI_DB_PRINT("---> Done\n");
elapsed_time = TIME_STOP;
LOG_WRITE("Local neighborhood search", elapsed_time);
TIME_STOP
MPI_DB_PRINT("[MASTER] Finding point to send to other procs\n");
TIME_START
TIME_START;
/* find if a point needs a refinement on the global tree */
float_t** data_to_send_per_proc = (float_t**)malloc(ctx -> world_size * sizeof(float_t*));
int** local_idx_of_the_point = (int**)malloc(ctx -> world_size * sizeof(int*));
......@@ -1377,7 +1386,6 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre
/* for each point walk the tree and find which procs its data must be sent to */
/* actually compute the intersection of the ngbh radius of each point with each node box */
/* the tree walk for each point can be optimized drastically */
#pragma omp parallel for
for(int i = 0; i < ctx -> local_n_points; ++i)
{
......@@ -1393,13 +1401,12 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre
point_to_snd_count, point_to_snd_capacity);
}
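The tree walk itself is outside this hunk; the snippet below is only a generic sketch of the geometric test the comments above describe, i.e. whether the ball around a point (with radius equal to its current k-th neighbour distance) overlaps a node's bounding box. The box_t type and the function name are illustrative, not the project's own structures.

#include <stdio.h>

typedef struct { const double* lb; const double* ub; int dims; } box_t;

/* squared-distance test: does the ball of squared radius r2 around p touch the box? */
static int ball_intersects_box(const double* p, double r2, const box_t* box)
{
    double d2 = 0.0;
    for(int d = 0; d < box->dims; ++d)
    {
        if(p[d] < box->lb[d])      { double t = box->lb[d] - p[d]; d2 += t * t; }
        else if(p[d] > box->ub[d]) { double t = p[d] - box->ub[d]; d2 += t * t; }
        if(d2 > r2) return 0;      /* already farther than the radius, early exit */
    }
    return 1;
}

int main(void)
{
    double lb[2] = {0.0, 0.0}, ub[2] = {1.0, 1.0};
    box_t unit_square = { lb, ub, 2 };
    double p[2] = {1.5, 0.5};
    /* a ball of radius 0.6 around (1.5, 0.5) overlaps the unit square */
    printf("%d\n", ball_intersects_box(p, 0.6 * 0.6, &unit_square));
    return 0;
}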
TIME_STOP
MPI_DB_PRINT("[MASTER] Sending points to send to other procs\n");
elapsed_time = TIME_STOP;
LOG_WRITE("Finding points to refine", elapsed_time);
TIME_START
TIME_START;
int* point_to_rcv_count = (int*)malloc(ctx -> world_size * sizeof(int));
/* exchange points to work on*/
MPI_Alltoall(point_to_snd_count, 1, MPI_INT, point_to_rcv_count, 1, MPI_INT, ctx -> mpi_communicator);
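For context, the step above is the first half of the usual count-then-data exchange: MPI_Alltoall of per-destination counts, prefix sums to build displacements, then MPI_Alltoallv for the payload. The self-contained sketch below uses illustrative buffer names; the committed code builds its own displacement arrays further down in this function.

#include <mpi.h>
#include <stdlib.h>

int main(int argc, char** argv)
{
    MPI_Init(&argc, &argv);
    int rank, size;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    /* how many ints this rank sends to every other rank (illustrative) */
    int* snd_count = (int*)malloc(size * sizeof(int));
    int* rcv_count = (int*)malloc(size * sizeof(int));
    for(int i = 0; i < size; ++i) snd_count[i] = rank + 1;

    /* step 1: everybody learns how much it will receive from everybody else */
    MPI_Alltoall(snd_count, 1, MPI_INT, rcv_count, 1, MPI_INT, MPI_COMM_WORLD);

    /* step 2: prefix sums give the displacements for the variable exchange */
    int* snd_displ = (int*)calloc(size, sizeof(int));
    int* rcv_displ = (int*)calloc(size, sizeof(int));
    for(int i = 1; i < size; ++i)
    {
        snd_displ[i] = snd_displ[i - 1] + snd_count[i - 1];
        rcv_displ[i] = rcv_displ[i - 1] + rcv_count[i - 1];
    }
    int tot_snd = snd_displ[size - 1] + snd_count[size - 1];
    int tot_rcv = rcv_displ[size - 1] + rcv_count[size - 1];

    int* snd_buf = (int*)malloc(tot_snd * sizeof(int));
    int* rcv_buf = (int*)malloc(tot_rcv * sizeof(int));
    for(int i = 0; i < tot_snd; ++i) snd_buf[i] = rank;

    /* step 3: move the actual data */
    MPI_Alltoallv(snd_buf, snd_count, snd_displ, MPI_INT,
                  rcv_buf, rcv_count, rcv_displ, MPI_INT, MPI_COMM_WORLD);

    free(snd_count); free(rcv_count); free(snd_displ); free(rcv_displ);
    free(snd_buf); free(rcv_buf);
    MPI_Finalize();
    return 0;
}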
......@@ -1462,6 +1469,7 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre
//int work_batch_stride = 1 + ctx -> dims;
int work_batch_stride = ctx -> dims;
/* Note that I then have to receive an equal number of heaps as the ones I sent out before */
heap_node* __heap_batches_to_snd = (heap_node*)malloc((uint64_t)k * (uint64_t)tot_points_rcv * sizeof(heap_node));
heap_node* __heap_batches_to_rcv = (heap_node*)malloc((uint64_t)k * (uint64_t)tot_points_snd * sizeof(heap_node));
......@@ -1469,18 +1477,17 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre
if( __heap_batches_to_rcv == NULL)
{
DB_PRINT("Rank %d failed to allocate rcv_heaps %luB required\n",ctx -> mpi_rank, (uint64_t)k * (uint64_t)tot_points_rcv * sizeof(heap_node));
exit(5);
exit(1);
}
if( __heap_batches_to_snd == NULL)
{
DB_PRINT("Rank %d failed to allocate snd_heaps %luB required\n",ctx -> mpi_rank, (uint64_t)k * (uint64_t)tot_points_snd * sizeof(heap_node));
exit(5);
exit(1);
}
MPI_Barrier(ctx -> mpi_communicator);
rcv_displ[0] = 0;
snd_displ[0] = 0;
rcv_count[0] = point_to_rcv_count[0];
......@@ -1506,11 +1513,13 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre
/* compute everything */
MPI_Barrier(ctx -> mpi_communicator);
TIME_STOP
elapsed_time = TIME_STOP;
LOG_WRITE("Exchanging points", elapsed_time);
MPI_DB_PRINT("[MASTER] Working on recieved points\n");
TIME_START;
TIME_START
/* ngbh search on received points */
for(int p = 0; p < ctx -> world_size; ++p)
{
if(point_to_rcv_count[p] > 0 && p != ctx -> mpi_rank)
......@@ -1533,19 +1542,25 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre
}
}
/* sendout */
/* send out results */
/*
* counts are inverted since I have to receive as many batches as the points
* I originally sent
* dummy pointers to clarify the counts in this part;
* they act as aliases for the rcv and snd counts
*/
MPI_Barrier(ctx -> mpi_communicator);
TIME_STOP
MPI_DB_PRINT("[MASTER] Sending out results\n");
TIME_START
int* ngbh_to_send = point_to_rcv_count;
int* ngbh_to_recv = point_to_snd_count;
/*
* counts are inverted since I have to receive as many batches as the points
* I originally sent
*/
elapsed_time = TIME_STOP;
LOG_WRITE("Ngbh search for foreing points", elapsed_time);
TIME_START;
MPI_Datatype MPI_my_heap;
MPI_Type_contiguous(k * sizeof(heap_node), MPI_CHAR, &MPI_my_heap);
......@@ -1565,12 +1580,20 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre
* ------------------------------------- */
MPI_Barrier(ctx -> mpi_communicator);
uint32_t n_points_threshold = 4294967296 / (k * sizeof(heap_node));
int default_msg_len = n_points_threshold / 2;
int default_msg_len = MAX_MSG_SIZE / (k * sizeof(heap_node));
int* already_sent_points = (int*)malloc(ctx -> world_size * sizeof(int));
int* already_rcvd_points = (int*)malloc(ctx -> world_size * sizeof(int));
/* allocate a request array to keep track of all requests going out*/
MPI_Request* req_array;
int req_num = 0;
for(int i = 0; i < ctx -> world_size; ++i)
{
req_num += ngbh_to_send[i] > 0 ? ngbh_to_send[i]/default_msg_len + 1 : 0;
}
req_array = (MPI_Request*)malloc(req_num * sizeof(MPI_Request));
for(int i = 0; i < ctx -> world_size; ++i)
{
......@@ -1578,18 +1601,22 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre
already_rcvd_points[i] = 0;
}
int req_idx = 0;
for(int i = 0; i < ctx -> world_size; ++i)
{
int count = 0;
if(point_to_rcv_count[i] > 0)
if(ngbh_to_send[i] > 0)
{
while(already_sent_points[i] < point_to_rcv_count[i])
while(already_sent_points[i] < ngbh_to_send[i])
{
MPI_Request request;
count = MIN(default_msg_len, point_to_rcv_count[i] - already_sent_points[i] );
count = MIN(default_msg_len, ngbh_to_send[i] - already_sent_points[i] );
MPI_Isend( heap_batches_per_node[i] + k * already_sent_points[i], count,
MPI_my_heap, i, 0, ctx -> mpi_communicator, &request);
already_sent_points[i] += count;
req_array[req_idx] = request;
++req_idx;
}
}
}
......@@ -1597,6 +1624,7 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre
MPI_Barrier(ctx -> mpi_communicator);
MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, ctx -> mpi_communicator, &flag, &status);
//DB_PRINT("%d %p %p\n",ctx -> mpi_rank, &flag, &status);
//HERE
while(flag)
{
MPI_Request request;
......@@ -1614,26 +1642,31 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre
}
MPI_Barrier(ctx -> mpi_communicator);
MPI_Testall(req_num, req_array, &flag, MPI_STATUSES_IGNORE);
if(flag == 0)
{
DB_PRINT("[!!!] Rank %d has unfinished communications\n", ctx -> mpi_rank);
exit(1);
}
free(req_array);
free(already_sent_points);
free(already_rcvd_points);
TIME_STOP
/*
* this can actually be slimmer: we could send only the points that may be inserted,
* which requires an all-to-all and then specifying which points to insert
*/
elapsed_time = TIME_STOP;
LOG_WRITE("Sending results to other proc", elapsed_time);
/* merge old with new heaps */
MPI_DB_PRINT("[MASTER] Merging resutls\n");
MPI_Barrier(ctx -> mpi_communicator);
TIME_START
TIME_START;
for(int i = 0; i < ctx -> world_size; ++i)
{
#pragma omp parallel for
for(int b = 0; b < point_to_snd_count[i]; ++b)
for(int b = 0; b < ngbh_to_recv[i]; ++b)
{
int idx = local_idx_of_the_point[i][b];
/* retrieve the heap */
......@@ -1657,6 +1690,9 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre
heap_sort(&(dp_info[i].ngbh));
}
elapsed_time = TIME_STOP;
LOG_WRITE("Merging results", elapsed_time);
#if defined(WRITE_NGBH)
MPI_DB_PRINT("Writing ngbh to files\n");
char ngbh_out[80];
......@@ -1677,12 +1713,11 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre
#endif
MPI_Barrier(ctx -> mpi_communicator);
TIME_STOP
TIME_START
for(int i = 0; i < ctx -> world_size; ++i)
{
if(data_to_send_per_proc[i]) free(data_to_send_per_proc[i]);
......@@ -1711,7 +1746,7 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre
free(__heap_batches_to_snd);
free(__rcv_points);
free(__snd_points);
TIME_STOP
}
void test_the_idea(global_context_t* ctx)
......@@ -1744,7 +1779,6 @@ void test_the_idea(global_context_t* ctx)
void build_local_tree(global_context_t* ctx, kdtree_v2* local_tree)
{
MPI_DB_PRINT("[MASTER] Building local trees\n");
local_tree -> root = build_tree_kdtree_v2(local_tree -> _nodes, local_tree -> n_nodes, ctx -> dims);
}
......@@ -1795,9 +1829,8 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)
{
float_t *data;
TIME_DEF
double elapsed_time;
TIME_START
MPI_DB_PRINT("[MASTER] Reading file and scattering\n");
if (ctx->mpi_rank == 0)
{
//data = read_data_file(ctx, "../norm_data/50_blobs_more_var.npy", MY_TRUE);
......@@ -1828,8 +1861,12 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)
ctx->n_points = ctx->n_points / ctx->dims;
ctx->n_points = (ctx->n_points * 10) / 10;
// ctx -> n_points = ctx -> world_size * 1000;
mpi_printf(ctx, "Read %lu points in %u dims\n", ctx->n_points, ctx->dims);
//ctx -> n_points = 10000000 * ctx -> world_size;
//generate_random_matrix(&data, ctx -> dims, ctx -> n_points, ctx);
//mpi_printf(ctx, "Read %lu points in %u dims\n", ctx->n_points, ctx->dims);
}
//MPI_DB_PRINT("[MASTER] Reading file and scattering\n");
/* communicate the total number of points*/
MPI_Bcast(&(ctx->dims), 1, MPI_UINT32_T, 0, ctx->mpi_communicator);
......@@ -1874,20 +1911,21 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)
original_ps.ub_box = (float_t*)malloc(ctx -> dims * sizeof(float_t));
float_t tol = 0.002;
TIME_STOP
top_kdtree_t tree;
TIME_START
TIME_START;
top_tree_init(ctx, &tree);
TIME_STOP
elapsed_time = TIME_STOP;
LOG_WRITE("Initializing global kdtree", elapsed_time);
TIME_START
TIME_START;
build_top_kdtree(ctx, &original_ps, &tree, k_global, tol);
exchange_points(ctx, &tree);
MPI_Barrier(ctx -> mpi_communicator);
TIME_STOP
elapsed_time = TIME_STOP;
LOG_WRITE("Top kdtree build and domain decomposition", elapsed_time);
//test_the_idea(ctx);
TIME_START;
kdtree_v2 local_tree;
kdtree_v2_init( &local_tree, ctx -> local_data, ctx -> local_n_points, (unsigned int)ctx -> dims);
int k = 300;
......@@ -1910,19 +1948,19 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)
dp_info[i].cluster_idx = -1;
}
TIME_START
build_local_tree(ctx, &local_tree);
TIME_STOP
elapsed_time = TIME_STOP;
LOG_WRITE("Local trees init and build", elapsed_time);
TIME_START
MPI_DB_PRINT("[MASTER] Performing ngbh search\n");
TIME_START;
MPI_DB_PRINT("----- Performing ngbh search -----\n");
MPI_Barrier(ctx -> mpi_communicator);
mpi_ngbh_search(ctx, dp_info, &tree, &local_tree, ctx -> local_data, k);
MPI_Barrier(ctx -> mpi_communicator);
MPI_DB_PRINT("Tot time ---->");
TIME_STOP
elapsed_time = TIME_STOP;
LOG_WRITE("Total time for all knn search", elapsed_time)
#if defined (WRITE_NGBH)
......