Skip to content
Snippets Groups Projects
Commit 1f365d60 authored by lykos98's avatar lykos98
Browse files

fixed bug on big messages

parent 6ff072da
No related branches found
No related tags found
No related merge requests found
......@@ -284,7 +284,7 @@ void knn_sub_tree_search_kdtree_v2(FLOAT_TYPE* point, kdnode_v2* root, heap * H)
for(size_t i = 0; i < root -> node_list.count; ++i)
{
kdnode_v2* n = root -> node_list.data[i];
__builtin_prefetch(root -> node_list.data + i + 1, 0, 3);
//__builtin_prefetch(root -> node_list.data + i + 1, 0, 3);
FLOAT_TYPE distance = eud_kdtree_v2(point, n -> data);
insert_max_heap(H, distance,n -> array_idx);
}
......
......@@ -1194,17 +1194,22 @@ void tree_walk(
int owner = root -> owner;
int idx = point_to_send_count[owner];
int capacity = point_to_send_capacity[owner];
int len = 1 + ctx -> dims;
//int len = 1 + ctx -> dims;
int len = ctx -> dims;
if(idx == capacity)
{
data_to_send_per_proc[owner] = realloc(data_to_send_per_proc[owner], (capacity * 1.1) * (1 + ctx -> dims) * sizeof(float_t));
//data_to_send_per_proc[owner] = realloc(data_to_send_per_proc[owner], (capacity * 1.1) * (1 + ctx -> dims) * sizeof(float_t));
data_to_send_per_proc[owner] = realloc(data_to_send_per_proc[owner], (capacity * 1.1) * (ctx -> dims) * sizeof(float_t));
local_idx_of_the_point[owner] = realloc(local_idx_of_the_point[owner], (capacity * 1.1) * sizeof(int));
point_to_send_capacity[owner] = capacity * 1.1;
}
float_t* base = data_to_send_per_proc[owner] + (len * idx);
/*
base[0] = max_dist;
memcpy(base + 1, point, ctx -> dims * sizeof(float_t));
*/
memcpy(base, point, ctx -> dims * sizeof(float_t));
local_idx_of_the_point[owner][idx] = point_idx;
point_to_send_count[owner]++;
......@@ -1356,7 +1361,8 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre
for(int i = 0; i < ctx -> world_size; ++i)
{
data_to_send_per_proc[i] = (float_t*)malloc(100 * (1 + ctx -> dims) * sizeof(float_t));
//data_to_send_per_proc[i] = (float_t*)malloc(100 * (1 + ctx -> dims) * sizeof(float_t));
data_to_send_per_proc[i] = (float_t*)malloc(100 * (ctx -> dims) * sizeof(float_t));
local_idx_of_the_point[i] = (int*)malloc(100 * sizeof(int));
point_to_snd_capacity[i] = 100;
point_to_snd_count[i] = 0;
......@@ -1399,8 +1405,14 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre
/*compute counts and displs*/
rcv_displ[0] = 0;
snd_displ[0] = 0;
/*
rcv_count[0] = point_to_rcv_count[0] * (1 + ctx -> dims);
snd_count[0] = point_to_snd_count[0] * (1 + ctx -> dims);
*/
rcv_count[0] = point_to_rcv_count[0] * (ctx -> dims);
snd_count[0] = point_to_snd_count[0] * (ctx -> dims);
int tot_points_rcv = point_to_rcv_count[0];
int tot_points_snd = point_to_snd_count[0];
......@@ -1408,8 +1420,12 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre
for(int i = 1; i < ctx -> world_size; ++i)
{
/*
rcv_count[i] = point_to_rcv_count[i] * (1 + ctx -> dims);
snd_count[i] = point_to_snd_count[i] * (1 + ctx -> dims);
*/
rcv_count[i] = point_to_rcv_count[i] * (ctx -> dims);
snd_count[i] = point_to_snd_count[i] * (ctx -> dims);
tot_count += rcv_count[i];
tot_points_rcv += point_to_rcv_count[i];
......@@ -1419,8 +1435,12 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre
snd_displ[i] = snd_displ[i - 1] + snd_count[i - 1];
}
/*
float_t* __rcv_points = (float_t*)malloc(tot_points_rcv * (1 + ctx -> dims) * sizeof(float_t));
float_t* __snd_points = (float_t*)malloc(tot_points_snd * (1 + ctx -> dims) * sizeof(float_t));
*/
float_t* __rcv_points = (float_t*)malloc(tot_points_rcv * (ctx -> dims) * sizeof(float_t));
float_t* __snd_points = (float_t*)malloc(tot_points_snd * (ctx -> dims) * sizeof(float_t));
......@@ -1445,11 +1465,13 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre
int flag;
/* prepare heap batches */
int work_batch_stride = 1 + ctx -> dims;
//int work_batch_stride = 1 + ctx -> dims;
int work_batch_stride = ctx -> dims;
heap_node* __heap_batches_to_snd = (heap_node*)malloc((uint64_t)k * (uint64_t)tot_points_rcv * sizeof(heap_node));
heap_node* __heap_batches_to_rcv = (heap_node*)malloc((uint64_t)k * (uint64_t)tot_points_snd * sizeof(heap_node));
/*
for(int i = 0; i < ctx -> world_size; ++i)
{
......@@ -1463,14 +1485,17 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre
}
*/
if( __heap_batches_to_rcv == NULL)
{
DB_PRINT("Rank %d failed to allocate rcv_heaps %luB required\n",ctx -> mpi_rank, (uint64_t)k * (uint64_t)tot_points_rcv * sizeof(heap_node));
exit(5);
}
if( __heap_batches_to_snd == NULL)
{
DB_PRINT("Rank %d failed to allocate snd_heaps %luB required\n",ctx -> mpi_rank, (uint64_t)k * (uint64_t)tot_points_snd * sizeof(heap_node));
exit(5);
}
MPI_Barrier(ctx -> mpi_communicator);
......@@ -1492,10 +1517,11 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre
snd_displ[i] = snd_displ[i - 1] + snd_count[i - 1];
}
heap_node** heap_batches_per_node = (heap_node**)malloc(ctx -> world_size * sizeof(heap_node*));
for(int p = 0; p < ctx -> world_size; ++p)
{
heap_batches_per_node[p] = __heap_batches_to_snd + rcv_displ[p] * k;
heap_batches_per_node[p] = __heap_batches_to_snd + (uint64_t)rcv_displ[p] * (uint64_t)k;
}
TIME_STOP
......@@ -1503,10 +1529,24 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre
MPI_DB_PRINT("[MASTER] Working on recieved points\n");
MPI_Barrier(ctx -> mpi_communicator);
/*
for(int i = 0; i < ctx -> world_size; ++i)
{
if(i == ctx -> mpi_rank)
{
DB_PRINT("[RANK %d]\t", ctx -> mpi_rank);
for(int i = 0; i < ctx -> world_size; ++i)
//DB_PRINT("%d\t",point_to_rcv_count[i]);
DB_PRINT("%d\t",point_to_rcv_count[i]);
DB_PRINT("\n");
}
}
*/
TIME_START
for(int p = 0; p < ctx -> world_size; ++p)
{
if(point_to_rcv_count[p] > 0)
if(point_to_rcv_count[p] > 0 && p != ctx -> mpi_rank)
//if(count_rcv_work_batches[p] > 0)
{
//heap_batches_per_node[p] = (heap_node*)malloc(k * point_to_rcv_count[p] * sizeof(heap_node));
......@@ -1516,8 +1556,10 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre
heap H;
H.count = 0;
H.N = k;
H.data = heap_batches_per_node[p] + k * batch;
float_t* point = rcv_work_batches[p] + batch * work_batch_stride + 1;
H.data = heap_batches_per_node[p] + (uint64_t)k * (uint64_t)batch;
init_heap(&H);
//float_t* point = rcv_work_batches[p] + batch * work_batch_stride + 1;
float_t* point = rcv_work_batches[p] + (uint64_t)batch * (uint64_t)work_batch_stride;
knn_sub_tree_search_kdtree_v2(point, local_tree -> root, &H);
convert_heap_idx_to_global(ctx, &H);
}
......@@ -1529,24 +1571,13 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre
* counts are inverted since I have to recieve as many batches as points I
* Have originally sended
*/
MPI_Barrier(ctx -> mpi_communicator);
TIME_STOP
MPI_DB_PRINT("[MASTER] Sending out results\n");
MPI_Barrier(ctx -> mpi_communicator);
TIME_START
/*
for(int i = 0; i < ctx -> world_size; ++i)
{
if(i == ctx -> mpi_rank)
{
DB_PRINT("[RANK %d]\t", ctx -> mpi_rank);
for(int i = 0; i < ctx -> world_size; ++i)
//DB_PRINT("%d\t",point_to_rcv_count[i]);
DB_PRINT("%d\t",point_to_rcv_count[i]);
DB_PRINT("\n");
}
}
*/
MPI_Datatype MPI_my_heap;
......@@ -1565,6 +1596,7 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre
*/
//MPI_Type_free(&MPI_my_heap);
heap_node** rcv_heap_batches = (heap_node**)malloc(ctx -> world_size * sizeof(heap_node*));
for(int i = 0; i < ctx -> world_size; ++i)
{
......@@ -1633,20 +1665,7 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre
* HERE IT BREAKS
* ------------------------------------- */
MPI_Barrier(ctx -> mpi_communicator);
/*
heap_node** rcv_heap_batches = (heap_node**)malloc(ctx -> world_size * sizeof(heap_node*));
for(int i = 0; i < ctx -> world_size; ++i)
{
//rcv_heap_batches[i] = NULL;
//rcv_heap_batches[i] = __heap_batches_to_rcv + snd_displ[i] / sizeof(heap_node);
rcv_heap_batches[i] = __heap_batches_to_rcv + snd_displ[i] * k;
}
*/
uint32_t n_points_threshold = 4294967296 / (k * sizeof(heap_node));
int default_msg_len = n_points_threshold / 2;
......@@ -1704,6 +1723,7 @@ void mpi_ngbh_search(global_context_t* ctx, datapoint_info_t* dp_info, top_kdtre
/* merge old with new heaps */
MPI_DB_PRINT("[MASTER] Merging resutls\n");
MPI_Barrier(ctx -> mpi_communicator);
TIME_START
for(int i = 0; i < ctx -> world_size; ++i)
......@@ -1891,13 +1911,17 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)
//ctx->dims = 2;
//data = read_data_file(ctx, "../norm_data/50_blobs.npy", MY_TRUE);
// std_g0163178_Me14_091_0000
//
// 190M points
// std_g2980844_091_0000
data = read_data_file(ctx,"../norm_data/std_g2980844_091_0000",MY_TRUE);
/* 10^6 points ca.*/
//data = read_data_file(ctx,"../norm_data/std_LR_091_0001",MY_TRUE);
/* 10^7 ~ 8M points */
data = read_data_file(ctx,"../norm_data/std_g0144846_Me14_091_0001",MY_TRUE);
//data = read_data_file(ctx,"../norm_data/std_g0144846_Me14_091_0001",MY_TRUE);
//88M BREAKS
//data = read_data_file(ctx,"../norm_data/std_g5503149_091_0000",MY_TRUE);
......@@ -1985,10 +2009,13 @@ void simulate_master_read_and_scatter(int dims, size_t n, global_context_t *ctx)
kdtree_v2 local_tree;
kdtree_v2_init( &local_tree, ctx -> local_data, ctx -> local_n_points, (unsigned int)ctx -> dims);
int k = 400;
int k = 300;
datapoint_info_t* dp_info = (datapoint_info_t*)malloc(ctx -> local_n_points * sizeof(datapoint_info_t));
TIME_START
build_local_tree(ctx, &local_tree);
TIME_STOP
mpi_ngbh_search(ctx, dp_info, &tree, &local_tree, ctx -> local_data, k);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment