Commit 26218377 authored by lykos98

Working tree

parent 7e299285
@@ -18,8 +18,8 @@
#include <string.h>
#include <omp.h>
//#define WRITE_NGBH
//#define WRITE_TOP_NODES
#define WRITE_NGBH
#define WRITE_TOP_NODES
#ifdef USE_FLOAT32
#define MPI_MY_FLOAT MPI_FLOAT
@@ -1380,6 +1380,7 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre
float_t* __rcv_points = (float_t*)malloc(tot_points_rcv * (1 + ctx -> dims) * sizeof(float_t));
float_t* __snd_points = (float_t*)malloc(tot_points_snd * (1 + ctx -> dims) * sizeof(float_t));
/* copy data to send in contiguous memory */
for(int i = 0; i < ctx -> world_size; ++i)
{
@@ -1406,10 +1407,6 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre
heap_node* __heap_batches_to_snd = (heap_node*)malloc(k * tot_points_rcv * sizeof(heap_node));
heap_node* __heap_batches_to_rcv = (heap_node*)malloc(k * tot_points_snd * sizeof(heap_node));
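/* Presumably __heap_batches_to_snd holds the k-entry heaps computed locally for
 * the tot_points_rcv points received from other ranks (results to ship back),
 * while __heap_batches_to_rcv collects the heaps returned for the tot_points_snd
 * points this rank sent out. */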
/*
* need sizes in bytes
*/
rcv_displ[0] = 0;
snd_displ[0] = 0;
/*
@@ -1471,14 +1468,24 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre
* Have originally sent
*/
MPI_DB_PRINT("[MASTER] Sending out results\n");
for(int i = 0; i < ctx -> world_size; ++i)
{
if(i == ctx -> mpi_rank)
{
DB_PRINT("[RANK %d]\t", ctx -> mpi_rank);
for(int i = 0; i < ctx -> world_size; ++i)
//DB_PRINT("%d\t",point_to_rcv_count[i]);
DB_PRINT("%d\t",point_to_rcv_count[i]);
DB_PRINT("\n");
}
}
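/* The derived datatype below packs one point's whole k-entry heap
 * (k * sizeof(heap_node) bytes) into a single MPI element, so that message
 * counts can be given in points rather than bytes; keeping the counts small
 * and below the 32-bit limit seems to be the motivation here. */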
MPI_Datatype MPI_my_heap;
MPI_Type_contiguous(k * sizeof(heap_node), MPI_CHAR, &MPI_my_heap);
MPI_Barrier(ctx -> mpi_communicator);
MPI_Type_commit(&MPI_my_heap);
/*
//MPI_Alltoallv(__heap_batches_to_snd, rcv_count, rcv_displ, MPI_CHAR,
// __heap_batches_to_rcv, snd_count, snd_displ, MPI_CHAR, ctx -> mpi_communicator );
@@ -1487,8 +1494,9 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre
MPI_Alltoallv(__heap_batches_to_snd, rcv_count, rcv_displ, MPI_my_heap,
__heap_batches_to_rcv, snd_count, snd_displ, MPI_my_heap, ctx -> mpi_communicator );
*/
MPI_Type_free(&MPI_my_heap);
//MPI_Type_free(&MPI_my_heap);
heap_node** rcv_heap_batches = (heap_node**)malloc(ctx -> world_size * sizeof(heap_node*));
for(int i = 0; i < ctx -> world_size; ++i)
{
@@ -1497,21 +1505,71 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre
rcv_heap_batches[i] = __heap_batches_to_rcv + snd_displ[i] * k;
}
/*
* DIFFERENT APPROACH
*/
/*
* send out heaps
* and rcv counterparts
uint32_t n_points_threshold = 4294967296 / (k * sizeof(heap_node));
int default_msg_len = n_points_threshold / 2;
int* already_sent_points = (int*)malloc(ctx -> world_size * sizeof(int));
int* already_rcvd_points = (int*)malloc(ctx -> world_size * sizeof(int));
for(int i = 0; i < ctx -> world_size; ++i)
{
if(i == ctx -> mpi_rank)
{
for(int j = 0; j < ctx -> world_size; ++j)
{
int count = 0;
while(count < point_to_snd_count[i])
{
int count;
MPI_Status status;
MPI_Probe(j, MPI_ANY_TAG, ctx -> mpi_communicator, &status);
MPI_Get_count(&status, MPI_my_heap, &count);
MPI_Recv(rcv_heap_batches[j] + k * already_rcvd_points[j],
count, MPI_my_heap, j, MPI_ANY_TAG, ctx -> mpi_communicator, &status);
already_rcvd_points[j] += count;
//MPI_Recv(rcv_heap_batches[j], point_to_snd_count[j], MPI_my_heap, j, MPI_ANY_TAG, ctx -> mpi_communicator, &status);
}
}
}
else
{
int count = 0;
if(point_to_rcv_count[i] > 0)
{
while(already_sent_points[i] < point_to_rcv_count[i])
{
MPI_Request request;
//MPI_Isend(const void *buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm, MPI_Request *request)
count = MIN(default_msg_len, point_to_rcv_count[i] - already_sent_points[i] );
MPI_Send(heap_batches_per_node[i] + k * already_sent_points[i], count, MPI_my_heap, i, 0, ctx -> mpi_communicator);
already_sent_points[i] += count;
}
}
}
}
*/
/* -------------------------------------
* ALTERNATIVE TO ALL TO ALL FOR BIG MSG
* HERE IT BREAKS
* ------------------------------------- */
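/* A minimal sketch of the chunked point-to-point pattern used below, assuming
 * MPI_my_heap wraps one k-entry heap as committed above; the helper names
 * (chunked_heap_send / chunked_heap_recv) are illustrative only and not part
 * of this code base.
 *
 * static void chunked_heap_send(heap_node* buf, int n_points, int chunk_len,
 *                               MPI_Datatype heap_type, int k, int dest, MPI_Comm comm)
 * {
 *     int sent = 0;
 *     while(sent < n_points)
 *     {
 *         int count = MIN(chunk_len, n_points - sent);
 *         MPI_Send(buf + (size_t)k * sent, count, heap_type, dest, 0, comm);
 *         sent += count;
 *     }
 * }
 *
 * static void chunked_heap_recv(heap_node* buf, int n_points,
 *                               MPI_Datatype heap_type, int k, int src, MPI_Comm comm)
 * {
 *     int rcvd = 0;
 *     while(rcvd < n_points)
 *     {
 *         MPI_Status status;
 *         int count;
 *         MPI_Probe(src, MPI_ANY_TAG, comm, &status);
 *         MPI_Get_count(&status, heap_type, &count);
 *         MPI_Recv(buf + (size_t)k * rcvd, count, heap_type, src, MPI_ANY_TAG, comm, &status);
 *         rcvd += count;
 *     }
 * }
 */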
/*
MPI_Barrier(ctx -> mpi_communicator);
/*
heap_node** rcv_heap_batches = (heap_node**)malloc(ctx -> world_size * sizeof(heap_node*));
for(int i = 0; i < ctx -> world_size; ++i)
{
@@ -1519,6 +1577,7 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre
//rcv_heap_batches[i] = __heap_batches_to_rcv + snd_displ[i] / sizeof(heap_node);
rcv_heap_batches[i] = __heap_batches_to_rcv + snd_displ[i] * k;
}
*/
uint32_t n_points_threshold = 4294967296 / (k * sizeof(heap_node));
int default_msg_len = n_points_threshold / 2;
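/* For illustration only: assuming k = 300 and sizeof(heap_node) = 16 bytes,
 * n_points_threshold = 2^32 / 4800 ~ 894784 points, so default_msg_len ~ 447392
 * and a single message never carries more than ~2 GiB of heap_node data. */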
@@ -1526,17 +1585,6 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre
int* already_sent_points = (int*)malloc(ctx -> world_size * sizeof(int));
int* already_rcvd_points = (int*)malloc(ctx -> world_size * sizeof(int));
for(int i = 0; i < ctx -> world_size; ++i)
{
if(i == ctx -> mpi_rank)
{
DB_PRINT("[RANK %d]\t", ctx -> mpi_rank);
for(int i = 0; i < ctx -> world_size; ++i)
//DB_PRINT("%d\t",point_to_rcv_count[i]);
DB_PRINT("%d\t",point_to_rcv_count[i]/default_msg_len);
DB_PRINT("\n");
}
}
for(int i = 0; i < ctx -> world_size; ++i)
{
@@ -1549,7 +1597,7 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre
int count = 0;
if(point_to_rcv_count[i] > 0)
{
while(count < point_to_rcv_count[i])
while(already_sent_points[i] < point_to_rcv_count[i])
{
MPI_Request request;
//MPI_Isend(const void *buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm, MPI_Request *request)
@@ -1561,8 +1609,6 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre
}
}
MPI_Barrier(ctx -> mpi_communicator);
MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, ctx -> mpi_communicator, &flag, &status);
//DB_PRINT("%d %p %p\n",ctx -> mpi_rank, &flag, &status);
@@ -1584,10 +1630,6 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre
free(already_sent_points);
free(already_rcvd_points);
*/
/* merge old with new heaps */
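/* Presumably each k-entry batch coming back from a remote rank is folded into
 * the owning point's local heap here, so the final k nearest neighbours also
 * account for candidates found on other ranks. */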
@@ -1777,12 +1819,18 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)
//ctx->dims = 2;
//data = read_data_file(ctx, "../norm_data/50_blobs.npy", MY_TRUE);
// std_g0163178_Me14_091_0000
data = read_data_file(ctx,"../norm_data/std_LR_091_0001",MY_TRUE);
/* 10^6 points ca.*/
//data = read_data_file(ctx,"../norm_data/std_LR_091_0001",MY_TRUE);
/* 10^7 ~ 8M points */
//data = read_data_file(ctx,"../norm_data/std_g0163178_Me14_091_0001",MY_TRUE);
data = read_data_file(ctx,"../norm_data/std_g5503149_091_0001",MY_TRUE);
ctx->dims = 5;
// ctx -> n_points = 48*5*2000;
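// read_data_file appears to report the raw scalar count, so dividing by dims
// gives the number of points; the further division by 2 seemingly keeps only
// half of the dataset (assumption based on the surrounding lines).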
ctx->n_points = ctx->n_points / ctx->dims;
ctx->n_points = ctx->n_points / 2;
//ctx -> n_points = ctx -> world_size * 1000;
mpi_printf(ctx, "Read %lu points in %u dims\n", ctx->n_points, ctx->dims);
}