From 43b6d769d2e2a2251211f94a564529334b107ef6 Mon Sep 17 00:00:00 2001
From: lykos98 <francy273998@gmail.com>
Date: Thu, 24 Apr 2025 16:31:34 +0200
Subject: [PATCH] added experimental h1 optimization

---
 run_leo       |  17 +++-
 src/adp/adp.c | 275 +++++++++++++++++++++++++++++++++-----------------
 src/adp/adp.h |  15 +++
 3 files changed, 213 insertions(+), 94 deletions(-)

diff --git a/run_leo b/run_leo
index 1f74f83..db5382a 100644
--- a/run_leo
+++ b/run_leo
@@ -1,12 +1,12 @@
 #!/bin/bash
 
-#SBATCH --nodes=6
+#SBATCH --nodes=1
 #SBATCH --ntasks-per-node=2
 #SBATCH --cpus-per-task=56
 #SBATCH --time=04:00:00
 #SBATCH --job-name=dadp_test
 #SBATCH --partition=dcgp_usr_prod 
-#SBATCH --account=IscrC_dadp
+#SBATCH --account=EUHPC_D18_045
 #SBATCH --output=out_leo
 #SBATCH --error=err_leo
 #SBATCH --mem=480G
@@ -14,8 +14,10 @@
 
 
 cd $SLURM_SUBMIT_DIR
-module restore my_gcc
-#module restore my_intel
+
+module load gcc
+module load openmpi
+
 make clean
 make
 ulimit -s unlimited
@@ -36,5 +38,12 @@ IN_DATA=/leonardo_work/IscrC_dadp
 #10^6 points 
 time mpirun -n ${SLURM_NTASKS} --map-by ppr:1:socket:PE=${SLURM_CPUS_PER_TASK}  ./main -t f32 -i ${IN_DATA}/norm_data/std_LR_091_0001 -d 5 -a ${OUT_ASSIGNMENT} -o ${OUT_DATA}
 
+#34 * 10^6 points
+#time mpirun -n ${SLURM_NTASKS} --map-by ppr:1:socket:PE=${SLURM_CPUS_PER_TASK}  ./main -t f32 -i ${IN_DATA}/norm_data/std_g1212639_091_0001 -d 5 -a ${OUT_ASSIGNMENT} -o ${OUT_DATA}
+
+#88 * 10^6 points 
+#time mpirun -n ${SLURM_NTASKS} --map-by ppr:1:socket:PE=${SLURM_CPUS_PER_TASK}  ./main -t f32 -i ${IN_DATA}/norm_data/std_g5503149_091_0000 -d 5 -a ${OUT_ASSIGNMENT} -o ${OUT_DATA}
+
+
 #200 * 10^6 points
 #time mpirun -n ${SLURM_NTASKS} --map-by ppr:1:socket:PE=${SLURM_CPUS_PER_TASK}  ./main -t f32 -i ${IN_DATA}/norm_data/std_g2980844_091_0000 -d 5 -a ${OUT_ASSIGNMENT} -o ${OUT_DATA}
diff --git a/src/adp/adp.c b/src/adp/adp.c
index e796b8d..700c43c 100644
--- a/src/adp/adp.c
+++ b/src/adp/adp.c
@@ -118,8 +118,8 @@ idx_t get_j_ksel_idx(global_context_t* ctx, idx_t j, idx_t ksel, MPI_Win exposed
 }
 
 
-void compute_density_kstarnn_rma_v2(global_context_t* ctx, const float_t d, int verbose){
-
+void compute_density_kstarnn_rma_v2(global_context_t* ctx, const float_t d, int verbose)
+{
     /*
      * Point density computation:                       
      * args:                                            
@@ -648,12 +648,32 @@ lock_t h1_lock_free(global_context_t* ctx, MPI_Win lock_window, int owner, idx_t
     
 }
 
+void enqueue_center_removal(center_removal_queue_t* queue, 
+                            center_removal_t element)
+{
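+    // grow the backing array in fixed-size chunks when the queue is full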
+    int default_incr_size = 100;
+    if(queue -> count >= queue -> size)
+    {
+        queue -> size += default_incr_size;
+        queue -> data = realloc(queue -> data, queue -> size * sizeof(center_removal_t));
+    }
+    queue -> data[queue -> count] = element;
+    queue -> count++;
+}
+
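+// orders removals by ascending target_id; the (a > b) - (a < b) idiom avoids the
+// overflow/truncation a plain subtraction of idx_t values could cause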
+int compare_removal_by_target(const void* a, const void* b)
+{
+    idx_t arg1 = (*(const center_removal_t *)a).target_id;
+    idx_t arg2 = (*(const center_removal_t *)b).target_id;
+    return (arg1 > arg2) - (arg1 < arg2);
+}
+
 clusters_t Heuristic1(global_context_t *ctx)
 {
-    /*
+    /**
      * Heurisitc 1, from paper of Errico, Facco, Laio & Rodriguez 
      * ( https://doi.org/10.1016/j.ins.2021.01.010 )              
-     */
+     **/
 
     datapoint_info_t* dp_info = ctx -> local_datapoints;
     idx_t n = ctx -> local_n_points; 
@@ -721,36 +741,38 @@ clusters_t Heuristic1(global_context_t *ctx)
 	 * Generate a mask that keeps track of the point has been eliminating the 
 	 * point considered. Each thread updates this mask, then after the procedure
 	 * ends, center, removed centers, and max_rho arrays are populated
+     *
+     * the optimized v2 collects center removals in per-point queues, then exchanges them across ranks
 	 */
 		
-    lock_t*    lock_array     = (lock_t*)MY_MALLOC(n * sizeof(lock_t));
 	heap_node* to_remove_mask = (heap_node*)MY_MALLOC(n*sizeof(heap_node));
 
     for(idx_t p = 0; p < n; ++p) 
     {
         to_remove_mask[p].array_idx = MY_SIZE_MAX;
         to_remove_mask[p].value = 9999999;
-        lock_array[p] = LOCK_FREE;
     }
     qsort(dp_info_ptrs, n, sizeof(datapoint_info_t*), cmpPP);
 
+    /**
+     * workflow:
+     * - find the centers that have to be eliminated
+     * - compact the per-point queues into a single array
+     * - send/receive the removals across ranks
+     **/
 
-    MPI_Win win_to_remove_mask;
-    MPI_Win_create(to_remove_mask, n * sizeof(heap_node), 1, MPI_INFO_NULL, ctx -> mpi_communicator, &win_to_remove_mask);
-    MPI_Win_fence(0, win_to_remove_mask);
-
-    MPI_Win win_locks;
-    MPI_Win_create(lock_array, n * sizeof(lock_t), sizeof(lock_t), MPI_INFO_NULL, ctx -> mpi_communicator, &win_locks);
-    MPI_Win_fence(0, win_locks);
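+    // one queue per local point collects removals owned by remote ranks;
+    // one OpenMP lock per local point guards its to_remove_mask entry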
+    center_removal_queue_t* removal_queues = (center_removal_queue_t*)MY_MALLOC(n * sizeof(center_removal_queue_t));
+    omp_lock_t* lock_array = (omp_lock_t*)MY_MALLOC(n * sizeof(omp_lock_t));
 
-#ifdef EXP_H1
-    MPI_Win_lock_all(0, win_to_remove_mask);
-    MPI_Win_lock_all(0, win_locks);
-#endif
-
-#ifdef EXP_H1
-    printf("Using experimental h1\n");
-#endif
+    // initialize all queues and their locks
+    
+    for(idx_t i = 0; i < n; ++i)
+    {
+        removal_queues[i].count  = 0;
+        removal_queues[i].size   = 0;
+        removal_queues[i].data   = NULL;
+        omp_init_lock(lock_array + i);
+    }
 
 #if !defined(THREAD_FUNNELED)
     #pragma omp parallel for schedule(dynamic)
@@ -767,82 +789,140 @@ clusters_t Heuristic1(global_context_t *ctx)
 
             if(j_point.is_center && i_point.g > j_point.g)
             {
-                /*
-                 *
-                 * TODO: Implement it without this but using private locks
-                 * use an array of locks, and compare and swap to actually gain control of the thing
-                 *
-                 * */
-
-#ifdef EXP_H1
-                #pragma omp critical (h1_exp)
+                // find the rank that owns the neighbor j
+                int owner = foreign_owner(ctx, jidx);
+                // if j is local, update the mask directly
+                if(owner == ctx -> mpi_rank)
                 {
-                    int owner = foreign_owner(ctx, jidx);
-                    idx_t jpos = jidx - ctx -> rank_idx_start[owner];
+                    idx_t jpos = jidx - ctx -> idx_start;
+                    // an entry is unset while array_idx == MY_SIZE_MAX; re-check under
+                    // the lock so a lower g cannot overwrite a higher one
+                    if(to_remove_mask[jpos].array_idx == MY_SIZE_MAX || i_point.g > to_remove_mask[jpos].value)
+                    {
+                        omp_set_lock(lock_array + jpos);
+                        if(to_remove_mask[jpos].array_idx == MY_SIZE_MAX || i_point.g > to_remove_mask[jpos].value)
+                        {
+                            to_remove_mask[jpos].array_idx = i_point.array_idx;
+                            to_remove_mask[jpos].value     = i_point.g;
+                        }
+                        omp_unset_lock(lock_array + jpos);
+                    }
+                    
+                }
+                // otherwise enqueue the removal for sending to the owner rank
+                else
+                {
+                    center_removal_t element = {.rank = owner, .source_id = i_point.array_idx, 
+                        .target_id = j_point.array_idx, .source_density = i_point.g};
+                    enqueue_center_removal(removal_queues + p, element);
+                }
+            }
+        }
+    }
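+    // at this point local removals are already applied to to_remove_mask, and each
+    // per-point queue holds the removals that must be shipped to other ranks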
 
-                    lock_t state = LOCK_FREE;
+    // assemble the per-point queues into a single send buffer
+    
+    idx_t tot_removal = 0;
+    for(idx_t p = 0; p < n; ++p)
+    {
+        tot_removal += removal_queues[p].count;
+    }
+    
+    idx_t buffer_idx = 0;
+    center_removal_t* removal_buffer = (center_removal_t*)MY_MALLOC(tot_removal * sizeof(center_removal_t));
 
-                    state = h1_lock_acquire(ctx, win_locks, owner, jpos, state);
+    for(idx_t p = 0; p < n; ++p)
+    {
+        if(removal_queues[p].count > 0)        
+        {
+            memcpy(removal_buffer + buffer_idx, removal_queues[p].data, removal_queues[p].count * sizeof(center_removal_t));
+            buffer_idx += removal_queues[p].count;
+        }
+    }
 
-                    heap_node mask_element;
-                    MPI_Request request;
+    // sort by target index: ranks own contiguous index ranges, so this also groups entries by destination rank
+    
+    qsort(removal_buffer, tot_removal, sizeof(center_removal_t), compare_removal_by_target);
 
-                    MPI_Rget(&mask_element, sizeof(heap_node), MPI_BYTE, 
-                            owner, jpos * sizeof(heap_node), sizeof(heap_node), MPI_BYTE, win_to_remove_mask, &request);
-                    MPI_Wait(&request, MPI_STATUS_IGNORE);
+    // prepare counts and displacements for the all-to-all exchange
+    
+    int* recv_counts = (int*)MY_MALLOC(ctx -> world_size * sizeof(int));
+    int* send_counts = (int*)MY_MALLOC(ctx -> world_size * sizeof(int));
 
-                    int flag = mask_element.array_idx == MY_SIZE_MAX;							
-                    if(flag || i_point.g > mask_element.value )
-                    {
-                        heap_node tmp_mask_element = {.array_idx = i_point.array_idx, .value = i_point.g};
-                        MPI_Request request;
-                        MPI_Rput(&tmp_mask_element, sizeof(heap_node), MPI_BYTE, owner, 
-                                jpos*sizeof(heap_node), sizeof(heap_node), MPI_BYTE, win_to_remove_mask, &request);
-                        MPI_Wait(&request, MPI_STATUS_IGNORE);
+    int* recv_displs = (int*)MY_MALLOC(ctx -> world_size * sizeof(int));
+    int* send_displs = (int*)MY_MALLOC(ctx -> world_size * sizeof(int));
 
-                    }
+    // zero out all counts and displacements
 
-                    state = h1_lock_free(ctx, win_locks, owner, jpos, state);
-                }
-#else
-                #pragma omp critical (centers_elimination)                 
-                {
-                    int owner = foreign_owner(ctx, jidx);
-                    idx_t jpos = jidx - ctx -> rank_idx_start[owner];
-
-                    MPI_Win_lock(MPI_LOCK_EXCLUSIVE, owner, 0, win_to_remove_mask);
-                    heap_node mask_element;
-                    MPI_Request request;
-                    MPI_Rget(&mask_element, sizeof(heap_node), MPI_BYTE, 
-                             owner, jpos * sizeof(heap_node), sizeof(heap_node), MPI_BYTE, win_to_remove_mask, &request);
-                    MPI_Wait(&request, MPI_STATUS_IGNORE);
-
-                    int flag = mask_element.array_idx == MY_SIZE_MAX;							
-                    if(flag || i_point.g > mask_element.value )
-                    {
-                        heap_node tmp_mask_element = {.array_idx = i_point.array_idx, .value = i_point.g};
-                        MPI_Request request;
-                        MPI_Rput(&tmp_mask_element, sizeof(heap_node), MPI_BYTE, owner, 
-                                jpos*sizeof(heap_node), sizeof(heap_node), MPI_BYTE, win_to_remove_mask, &request);
-                        MPI_Wait(&request, MPI_STATUS_IGNORE);
+    for(int i = 0; i < ctx -> world_size; ++i)
+    {
+        recv_counts[i] = 0;
+        send_counts[i] = 0;
+        recv_displs[i] = 0;
+        send_displs[i] = 0;
+    }
 
-                    }
+    for(idx_t i = 0; i < tot_removal; ++i)
+    {
+        int rank_idx = removal_buffer[i].rank;
+        send_counts[rank_idx]++;
+    }
 
-                    MPI_Win_unlock(owner, win_to_remove_mask);
-                }
-#endif
-            }
-        }
+    // all-to-all to exchange the counts
+
+    MPI_Alltoall(send_counts, 1, MPI_INT, 
+                 recv_counts, 1, MPI_INT, ctx -> mpi_communicator);
+
+    // compute displacements
+
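+    // exclusive prefix sums, e.g. send_counts = {2, 0, 3} gives send_displs = {0, 2, 2}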
+    for(int i = 1; i < ctx -> world_size; ++i)
+    {
+        recv_displs[i] = recv_displs[i - 1] + recv_counts[i - 1];
+        send_displs[i] = send_displs[i - 1] + send_counts[i - 1];
     }
 
-#ifdef EXP_H1
-    MPI_Win_unlock_all(win_to_remove_mask);
-    MPI_Win_unlock_all(win_locks);
-#endif
+    idx_t tot_recv_counts = 0;
+
+    // count how many elements will be received in total
+
+    for(int i = 0; i < ctx -> world_size; ++i) tot_recv_counts += recv_counts[i];
+    if(ctx -> mpi_rank == 0){
+        for(int i = 0; i < ctx -> world_size; ++i){
+            DB_PRINT("%d mpi rank recv_count %d from %d\n", ctx -> mpi_rank, recv_counts[i], i);
+            DB_PRINT("%d mpi rank send_count %d to %d\n", ctx -> mpi_rank, send_counts[i], i);
+        } 
+    }
+    DB_PRINT("rank %d: %lu recv counts\n", ctx -> mpi_rank, tot_recv_counts);
+
+    // convert counts and displacements to bytes, since the exchange uses MPI_BYTE
+
+    for(int i = 0; i < ctx -> world_size; ++i)
+    {
+        recv_counts[i] = recv_counts[i] * sizeof(center_removal_t);
+        send_counts[i] = send_counts[i] * sizeof(center_removal_t);
+        recv_displs[i] = recv_displs[i] * sizeof(center_removal_t);
+        send_displs[i] = send_displs[i] * sizeof(center_removal_t);
+    }
+
+    // allocate buffer to receive center eliminations
+
+    center_removal_t* recv_removals = (center_removal_t*)MY_MALLOC(tot_recv_counts * sizeof(center_removal_t));
+
+    // all to all
+    MPI_Alltoallv(removal_buffer, send_counts, send_displs, MPI_BYTE, 
+                   recv_removals, recv_counts, recv_displs, MPI_BYTE, ctx -> mpi_communicator);
+
+    // merge into the mask
     
-    MPI_Win_fence(0, win_to_remove_mask);
-    MPI_Win_fence(0, win_locks);
-    MPI_Barrier(ctx -> mpi_communicator);
+    #pragma omp parallel for
+    for(idx_t i = 0; i < tot_recv_counts; ++i)
+    {
+        idx_t el_pos = recv_removals[i].target_id - ctx -> idx_start;
+        // same rule as the local path: overwrite unset entries or lower densities,
+        // re-checking under the lock to stay consistent with concurrent updates
+        if(to_remove_mask[el_pos].array_idx == MY_SIZE_MAX || recv_removals[i].source_density > to_remove_mask[el_pos].value)
+        {
+            omp_set_lock(lock_array + el_pos);
+            if(to_remove_mask[el_pos].array_idx == MY_SIZE_MAX || recv_removals[i].source_density > to_remove_mask[el_pos].value)
+            {
+                to_remove_mask[el_pos].array_idx = recv_removals[i].source_id;
+                to_remove_mask[el_pos].value     = recv_removals[i].source_density;
+            }
+            omp_unset_lock(lock_array + el_pos);
+        }
+    }
+
 
 	/* populate the usual arrays */
     for(idx_t p = 0; p < all_centers.count; ++p)
@@ -888,11 +968,24 @@ clusters_t Heuristic1(global_context_t *ctx)
         }
     }
 
-    MPI_Win_free(&win_to_remove_mask);
 	free(to_remove_mask);
-    
-    MPI_Win_free(&win_locks);
-	free(lock_array);
+
+    for(idx_t i = 0; i < n; ++i)
+    {
+        // free the per-point queue storage allocated in enqueue_center_removal
+        free(removal_queues[i].data);
+        removal_queues[i].data   = NULL;
+        omp_destroy_lock(lock_array + i);
+    }
+
+    free(removal_queues);
+    free(removal_buffer);
+    free(send_counts);
+    free(send_displs);
+    free(recv_counts);
+    free(recv_displs);
+    free(lock_array);
+    free(recv_removals);
 
     int n_centers = (int)actual_centers.count;
     int tot_centers;
@@ -900,9 +993,11 @@ clusters_t Heuristic1(global_context_t *ctx)
 
     MPI_DB_PRINT("Found %d temporary centers\n", tot_centers);
 
-    /* bring on master all centers 
+    /** 
+     * bring on master all centers 
      * order them in ascending order of density, 
-     * then re-scatter them around to get unique cluster labels */ 
+     * then re-scatter them around to get unique cluster labels 
+     **/ 
 
     center_t* private_centers_buffer = (center_t*)MY_MALLOC(actual_centers.count * sizeof(center_t));
     center_t* global_centers_buffer  = (center_t*)MY_MALLOC(tot_centers * sizeof(center_t));
diff --git a/src/adp/adp.h b/src/adp/adp.h
index 4db27b2..e06f0bf 100644
--- a/src/adp/adp.h
+++ b/src/adp/adp.h
@@ -45,6 +45,21 @@ typedef struct merge_t
     float_t density;
 } merge_t;
 
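+/* a single center-elimination record: point source_id (with density source_density)
+ * eliminates center target_id, which is owned by MPI rank `rank` */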
+typedef struct center_removal_t
+{
+    int     rank;
+    idx_t   source_id;
+    idx_t   target_id;
+    float_t source_density;
+} center_removal_t;
+
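+/* growable per-point buffer of center_removal_t records destined to remote ranks */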
+typedef struct center_removal_queue_t
+{
+    center_removal_t* data;
+    idx_t count;
+    idx_t size;
+} center_removal_queue_t;
+
 
 
 void compute_density_kstarnn_rma(global_context_t* ctx, const float_t d, int verbose);
-- 
GitLab