Commit c036a473 authored by lykos98

added all-to-all in query exchange, working on retrieval of the heaps via all-to-all

parent da2f017a
Source diff could not be displayed: it is too large.
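The diff below replaces the earlier MPI_Isend / MPI_Iprobe / MPI_Recv exchange of query points with two collectives: an MPI_Alltoall that trades per-rank point counts, followed by a single MPI_Alltoallv on a packed buffer of (1 + dims) floats per point (max_dist plus the coordinates). The following is a minimal, self-contained sketch of that pattern; it uses plain double / MPI_DOUBLE and a hypothetical helper exchange_queries() in place of the repository's float_t, MPI_MY_FLOAT and global_context_t, so it illustrates the approach rather than reproducing the committed code.

#include <mpi.h>
#include <stdlib.h>
#include <string.h>

/* Exchange variable-sized batches of query records between all ranks.
 * One record is (1 + dims) doubles: the current max_dist followed by the
 * point coordinates. Returns a malloc'd buffer holding all received records;
 * the batch coming from rank i starts at rcv_buf + rcv_displ[i] and contains
 * point_to_rcv_count[i] records. */
double* exchange_queries(MPI_Comm comm, int dims,
                         double** snd_points, int* point_to_snd_count,
                         int* point_to_rcv_count, int* rcv_displ)
{
    int world_size;
    MPI_Comm_size(comm, &world_size);

    /* 1. tell every rank how many query points it will receive from us */
    MPI_Alltoall(point_to_snd_count, 1, MPI_INT,
                 point_to_rcv_count, 1, MPI_INT, comm);

    /* 2. element counts and displacements (in doubles, not records) */
    int* snd_count = (int*)malloc(world_size * sizeof(int));
    int* rcv_count = (int*)malloc(world_size * sizeof(int));
    int* snd_displ = (int*)malloc(world_size * sizeof(int));
    int  tot_snd = 0, tot_rcv = 0;
    for(int i = 0; i < world_size; ++i)
    {
        snd_count[i] = point_to_snd_count[i] * (1 + dims);
        rcv_count[i] = point_to_rcv_count[i] * (1 + dims);
        snd_displ[i] = (i == 0) ? 0 : snd_displ[i - 1] + snd_count[i - 1];
        rcv_displ[i] = (i == 0) ? 0 : rcv_displ[i - 1] + rcv_count[i - 1];
        tot_snd += snd_count[i];
        tot_rcv += rcv_count[i];
    }

    /* 3. pack the per-destination buffers contiguously, then one collective */
    double* snd_buf = (double*)malloc(tot_snd * sizeof(double));
    double* rcv_buf = (double*)malloc(tot_rcv * sizeof(double));
    for(int i = 0; i < world_size; ++i)
        if(snd_count[i] > 0)
            memcpy(snd_buf + snd_displ[i], snd_points[i],
                   snd_count[i] * sizeof(double));

    MPI_Alltoallv(snd_buf, snd_count, snd_displ, MPI_DOUBLE,
                  rcv_buf, rcv_count, rcv_displ, MPI_DOUBLE, comm);

    free(snd_buf); free(snd_count); free(rcv_count); free(snd_displ);
    return rcv_buf;
}

Compared with the probe loop it replaces, the counts are known up front, so the receive buffer can be sized exactly and no barriers or tag matching are needed.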
@@ -1116,6 +1116,8 @@ void exchange_points(global_context_t* ctx, top_kdtree_t* tree)
     rcvbuffer = (float_t*)malloc(tot_count * sizeof(float_t));
 
+    /*exchange points */
+
     MPI_Alltoallv( ctx -> local_data, send_count, send_displs, MPI_MY_FLOAT,
                    rcvbuffer, rcv_count, rcv_displs, MPI_MY_FLOAT,
                    ctx -> mpi_communicator);
@@ -1158,6 +1160,7 @@ void exchange_points(global_context_t* ctx, top_kdtree_t* tree)
     free(partition_offset);
     free(rcv_count);
     free(rcv_displs);
+    free(send_displs);
 }
 
 static inline size_t local_to_global_idx(global_context_t* ctx, size_t local_idx)
@@ -1317,15 +1320,15 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre
     /* find if a points needs a refine on the global tree */
     float_t** data_to_send_per_proc = (float_t**)malloc(ctx -> world_size * sizeof(float_t*));
     int** local_idx_of_the_point = (int**)malloc(ctx -> world_size * sizeof(int*));
-    int* point_to_send_count = (int*)malloc(ctx -> world_size * sizeof(int));
-    int* point_to_send_capacity = (int*)malloc(ctx -> world_size * sizeof(int));
+    int* point_to_snd_count = (int*)malloc(ctx -> world_size * sizeof(int));
+    int* point_to_snd_capacity = (int*)malloc(ctx -> world_size * sizeof(int));
 
     for(int i = 0; i < ctx -> world_size; ++i)
     {
         data_to_send_per_proc[i] = (float_t*)malloc(100 * (1 + ctx -> dims) * sizeof(float_t));
         local_idx_of_the_point[i] = (int*)malloc(100 * sizeof(int));
-        point_to_send_capacity[i] = 100;
-        point_to_send_count[i] = 0;
+        point_to_snd_capacity[i] = 100;
+        point_to_snd_count[i] = 0;
     }
 
     /* for each point walk the tree and find to which proc send data */
@@ -1344,73 +1347,99 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre
         tree_walk(ctx, top_tree -> root, i, max_dist,
                   point, data_to_send_per_proc, local_idx_of_the_point,
-                  point_to_send_count, point_to_send_capacity);
+                  point_to_snd_count, point_to_snd_capacity);
     }
 
-    MPI_Barrier(ctx -> mpi_communicator);
-    DB_PRINT("Rank %d wants to send:\t",ctx -> mpi_rank);
-    for(int i = 0; i < ctx -> world_size; ++i)
-    {
-        DB_PRINT("%d:%d\t", i, point_to_send_count[i]);
-    }
-    DB_PRINT("\n");
-    MPI_Barrier(ctx -> mpi_communicator);
+    int* point_to_rcv_count = (int*)malloc(ctx -> world_size * sizeof(int));
 
     /* exchange points to work on*/
-    int* count_rcv_work_batches = (int*)malloc(ctx -> world_size * sizeof(int));
-    float_t** rcv_work_batches = (float_t**)malloc(ctx -> world_size * sizeof(float_t*));
-    for(int i = 0; i < ctx -> world_size; ++i)
-    {
-        count_rcv_work_batches[i] = 0;
-        rcv_work_batches[i] = NULL;
-    }
-
-    for(int i = 0; i < ctx -> world_size; ++i)
-    {
-        if(point_to_send_count[i] > 0)
-        {
-            MPI_Request request;
-            //MPI_Isend(const void *buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm, MPI_Request *request)
-            MPI_Isend( data_to_send_per_proc[i],
-                       point_to_send_count[i]*(1 + ctx -> dims) , /* 1 per max_dist + point*/
-                       MPI_MY_FLOAT, i, 0, ctx -> mpi_communicator, &request);
-        }
-    }
-
-    MPI_Status status;
-    int flag;
-    int cc = 0;
-    MPI_Barrier(ctx -> mpi_communicator);
-    MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_SOURCE, ctx -> mpi_communicator, &flag, &status);
-    while(flag)
-    {
-        cc++;
-        MPI_Request request;
-        int count;
-        MPI_Get_count(&status, MPI_MY_FLOAT, &count);
-        rcv_work_batches[status.MPI_SOURCE] = (float_t*)malloc(count * sizeof(float_t));
-        //MPI_Recv(void *buf, int count, MPI_Datatype datatype, int source, int tag, MPI_Comm comm, MPI_Status *status)
-        MPI_Recv(rcv_work_batches[status.MPI_SOURCE], count, MPI_MY_FLOAT, status.MPI_SOURCE, MPI_ANY_TAG, ctx -> mpi_communicator, &status);
-        count_rcv_work_batches[status.MPI_SOURCE] = count / (1 + ctx -> dims);
-        MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_SOURCE, ctx -> mpi_communicator, &flag, &status);
-    }
-    MPI_DB_PRINT("\n");
-    MPI_Barrier(ctx -> mpi_communicator);
-    //if(ctx -> mpi_rank == 1)
-    {
-        DB_PRINT("Rank %d has rcvd:\t",ctx -> mpi_rank);
-        for(int i = 0; i < ctx -> world_size; ++i)
-        {
-            DB_PRINT("%d:%d\t", i,count_rcv_work_batches[i]);
-        }
-        DB_PRINT("\n");
-    }
-    MPI_Barrier(ctx -> mpi_communicator);
+    MPI_Alltoall(point_to_snd_count, 1, MPI_INT, point_to_rcv_count, 1, MPI_INT, ctx -> mpi_communicator);
+
+    int* rcv_count = (int*)malloc(ctx -> world_size * sizeof(int));
+    int* snd_count = (int*)malloc(ctx -> world_size * sizeof(int));
+    int* rcv_displ = (int*)malloc(ctx -> world_size * sizeof(int));
+    int* snd_displ = (int*)malloc(ctx -> world_size * sizeof(int));
+
+    /*compute counts and displs*/
+    rcv_displ[0] = 0;
+    snd_displ[0] = 0;
+    rcv_count[0] = point_to_rcv_count[0] * (1 + ctx -> dims);
+    snd_count[0] = point_to_snd_count[0] * (1 + ctx -> dims);
+
+    int tot_points_rcv = point_to_rcv_count[0];
+    int tot_points_snd = point_to_snd_count[0];
+    int tot_count = rcv_count[0];
+
+    for(int i = 1; i < ctx -> world_size; ++i)
+    {
+        rcv_count[i] = point_to_rcv_count[i] * (1 + ctx -> dims);
+        snd_count[i] = point_to_snd_count[i] * (1 + ctx -> dims);
+
+        tot_count += rcv_count[i];
+        tot_points_rcv += point_to_rcv_count[i];
+        tot_points_snd += point_to_snd_count[i];
+
+        rcv_displ[i] = rcv_displ[i - 1] + rcv_count[i - 1];
+        snd_displ[i] = snd_displ[i - 1] + snd_count[i - 1];
+    }
+
+    float_t* __rcv_points = (float_t*)malloc(tot_points_rcv * (1 + ctx -> dims) * sizeof(float_t));
+    float_t* __snd_points = (float_t*)malloc(tot_points_snd * (1 + ctx -> dims) * sizeof(float_t));
+
+    /* copy data to send in contiguous memory */
+    for(int i = 0; i < ctx -> world_size; ++i)
+    {
+        memcpy(__snd_points + snd_displ[i], data_to_send_per_proc[i], snd_count[i] * sizeof(float_t));
+    }
+
+    MPI_Alltoallv(__snd_points, snd_count, snd_displ, MPI_MY_FLOAT,
+                  __rcv_points, rcv_count, rcv_displ, MPI_MY_FLOAT, ctx -> mpi_communicator);
+    HERE;
+
+    int* count_rcv_work_batches = (int*)malloc(ctx -> world_size * sizeof(int));
+    float_t** rcv_work_batches = (float_t**)malloc(ctx -> world_size * sizeof(float_t*));
+
+    for(int i = 0; i < ctx -> world_size; ++i)
+    {
+        count_rcv_work_batches[i] = point_to_rcv_count[i];
+        //rcv_work_batches[i] = NULL;
+        rcv_work_batches[i] = __rcv_points + rcv_displ[i];
+    }
+
+    // for(int i = 0; i < ctx -> world_size; ++i)
+    // {
+    //     if(point_to_snd_count[i] > 0)
+    //     {
+    //         MPI_Request request;
+    //         //MPI_Isend(const void *buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm, MPI_Request *request)
+    //         MPI_Isend( data_to_send_per_proc[i],
+    //                    point_to_snd_count[i]*(1 + ctx -> dims) , /* 1 per max_dist + point*/
+    //                    MPI_MY_FLOAT, i, 0, ctx -> mpi_communicator, &request);
+    //     }
+    // }
+    //
+    //
+
+    MPI_Status status;
+    int flag;
+
+    // int cc = 0;
+    // MPI_Barrier(ctx -> mpi_communicator);
+    // MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, ctx -> mpi_communicator, &flag, &status);
+    // while(flag)
+    // {
+    //     cc++;
+    //     MPI_Request request;
+    //     int count;
+    //     MPI_Get_count(&status, MPI_MY_FLOAT, &count);
+    //     rcv_work_batches[status.MPI_SOURCE] = (float_t*)malloc(count * sizeof(float_t));
+    //     //MPI_Recv(void *buf, int count, MPI_Datatype datatype, int source, int tag, MPI_Comm comm, MPI_Status *status)
+    //
+    //     MPI_Recv(rcv_work_batches[status.MPI_SOURCE], count, MPI_MY_FLOAT, status.MPI_SOURCE, MPI_ANY_TAG, ctx -> mpi_communicator, &status);
+    //     count_rcv_work_batches[status.MPI_SOURCE] = count / (1 + ctx -> dims);
+    //     MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_SOURCE, ctx -> mpi_communicator, &flag, &status);
+    //
+    // }
 
     /* prepare heap batches */
@@ -1421,10 +1450,11 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre
     for(int p = 0; p < ctx -> world_size; ++p)
     {
-        if(count_rcv_work_batches[p] > 0)
+        if(point_to_rcv_count[p] > 0)
+        //if(count_rcv_work_batches[p] > 0)
         {
             heap_batches_per_node[p] = (heap_node*)malloc(k * count_rcv_work_batches[p] * sizeof(heap_node));
-            for(int batch = 0; batch < count_rcv_work_batches[p]; ++batch)
+            for(int batch = 0; batch < point_to_rcv_count[p]; ++batch)
             {
                 heap H;
                 H.count = 0;
@@ -1478,7 +1508,7 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre
     for(int i = 0; i < ctx -> world_size; ++i)
     {
-        for(int b = 0; b < point_to_send_count[i]; ++b)
+        for(int b = 0; b < point_to_snd_count[i]; ++b)
         {
             int idx = local_idx_of_the_point[i][b];
             //MPI_DB_PRINT("%d %d %d\n",i,b,idx);
@@ -1523,7 +1553,7 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre
         if(heap_batches_per_node[i]) free(heap_batches_per_node[i]);
         if(data_to_send_per_proc[i]) free(data_to_send_per_proc[i]);
         if(local_idx_of_the_point[i]) free(local_idx_of_the_point[i]);
-        if(rcv_work_batches[i]) free(rcv_work_batches[i]);
+        //if(rcv_work_batches[i]) free(rcv_work_batches[i]);
         if(rcv_heap_batches[i]) free(rcv_heap_batches[i]);
     }
@@ -1531,9 +1561,14 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre
     free(rcv_heap_batches);
     free(rcv_work_batches);
     free(count_rcv_work_batches);
-    free(point_to_send_count);
-    free(point_to_send_capacity);
+    free(point_to_rcv_count);
+    free(point_to_snd_count);
+    free(point_to_snd_capacity);
+    free(rcv_count);
+    free(snd_count);
+    free(rcv_displ);
+    free(snd_displ);
 }
 
 void test_the_idea(global_context_t* ctx)
...
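The commit message notes that returning the filled heaps via all-to-all is still in progress; this diff only prepares the per-rank heap batches. One plausible way to finish that step, sketched below, is a second MPI_Alltoallv with the send and receive roles of the query exchange swapped: each rank ships back k heap nodes per query point it received. The function name return_heaps and the assumption that heap_node is a plain struct holding a distance and an index are illustrative only, not taken from the repository.

#include <mpi.h>
#include <stdlib.h>

/* Hypothetical heap_node layout, assumed for this sketch only. */
typedef struct { double value; int array_idx; } heap_node;

/* Send each computed k-neighbour heap back to the rank that owns the query
 * point, reversing the counts used for the query exchange. */
void return_heaps(MPI_Comm comm, int k,
                  int* point_to_snd_count,   /* queries we sent to rank i        */
                  int* point_to_rcv_count,   /* queries we received from rank i  */
                  heap_node* computed_heaps, /* k nodes per received query, grouped by source rank */
                  heap_node* returned_heaps) /* k nodes per query we originally sent, grouped by worker rank */
{
    int world_size;
    MPI_Comm_size(comm, &world_size);

    int* snd_count = (int*)malloc(world_size * sizeof(int));
    int* rcv_count = (int*)malloc(world_size * sizeof(int));
    int* snd_displ = (int*)malloc(world_size * sizeof(int));
    int* rcv_displ = (int*)malloc(world_size * sizeof(int));

    /* counts are in bytes: k heap nodes per query point, with the send and
     * receive roles swapped relative to the query exchange */
    for(int i = 0; i < world_size; ++i)
    {
        snd_count[i] = point_to_rcv_count[i] * k * (int)sizeof(heap_node);
        rcv_count[i] = point_to_snd_count[i] * k * (int)sizeof(heap_node);
        snd_displ[i] = (i == 0) ? 0 : snd_displ[i - 1] + snd_count[i - 1];
        rcv_displ[i] = (i == 0) ? 0 : rcv_displ[i - 1] + rcv_count[i - 1];
    }

    MPI_Alltoallv(computed_heaps, snd_count, snd_displ, MPI_BYTE,
                  returned_heaps, rcv_count, rcv_displ, MPI_BYTE, comm);

    free(snd_count); free(rcv_count); free(snd_displ); free(rcv_displ);
}

Shipping the nodes as MPI_BYTE keeps the sketch short and works on homogeneous clusters; a committed MPI datatype for heap_node would be the more portable choice.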