From 3752471cf6d098c9127d6a0f99319025d5f491f3 Mon Sep 17 00:00:00 2001 From: Giovanni La Mura <giovanni.lamura@inaf.it> Date: Sun, 12 May 2024 21:34:08 +0200 Subject: [PATCH] Rewind ASCII files after getting their size and skip EOF character transmission --- src/cluster/cluster.cpp | 80 ++++++++++++++++++++++------------------- 1 file changed, 43 insertions(+), 37 deletions(-) diff --git a/src/cluster/cluster.cpp b/src/cluster/cluster.cpp index a24d9503..7193c54e 100644 --- a/src/cluster/cluster.cpp +++ b/src/cluster/cluster.cpp @@ -13,7 +13,7 @@ limitations under the License. */ -/*! \file cluster.cpp +/*! \file cluster.cp * * \brief Implementation of the calculation for a cluster of spheres. */ @@ -82,6 +82,7 @@ int cluster_jxi488_cycle(int jxi488, ScattererConfiguration *sconf, GeometryConf void cluster(const string& config_file, const string& data_file, const string& output_path, const mixMPI *mpidata) { chrono::time_point<chrono::high_resolution_clock> t_start = chrono::high_resolution_clock::now(); chrono::duration<double> elapsed; + string message; string timing_name = output_path + "/c_timing_mpi"+ to_string(mpidata->rank) +".log"; FILE *timing_file = fopen(timing_name.c_str(), "w"); Logger *time_logger = new Logger(LOG_DEBG, timing_file); @@ -271,39 +272,34 @@ void cluster(const string& config_file, const string& data_file, const string& o #pragma omp barrier { // thread 0 already wrote on global files, skip it and take care of appending the others - for (int ri = 0; ri < ompnumthreads; ri++) { - // still, we need to remove all c_OCLU_RANK_0 files + for (int ri = 1; ri < ompnumthreads; ri++) { string partial_file_name = output_path + "/c_OCLU_" + to_string(mpidata->rank) + "_" + to_string(ri); - if (ri == 0) { - remove(partial_file_name.c_str()); - } else { - string message = "Copying ASCII output in MPI process " + to_string(mpidata->rank) + " of thread " + to_string(ri) + " of " + to_string(ompnumthreads - 1) + "... "; - logger->log(message, LOG_DEBG); - FILE *partial_output = fopen(partial_file_name.c_str(), "r"); - int c = fgetc(partial_output); - while (c != EOF) { - fputc(c, output); - c = fgetc(partial_output); - } - fclose(partial_output); - remove(partial_file_name.c_str()); - logger->log("done.\n", LOG_DEBG); - partial_file_name = output_path + "/c_TPPOAN_" + to_string(mpidata->rank) + "_" + to_string(ri); - message = "Copying binary output in MPI process " + to_string(mpidata->rank) + " of thread " + to_string(ri) + " of " + to_string(ompnumthreads - 1) + "... "; - logger->log(message, LOG_DEBG); - fstream partial_tppoan; - partial_tppoan.open(partial_file_name.c_str(), ios::in | ios::binary); - partial_tppoan.seekg(0, ios::end); - long buffer_size = partial_tppoan.tellg(); - char *binary_buffer = new char[buffer_size]; - partial_tppoan.seekg(0, ios::beg); - partial_tppoan.read(binary_buffer, buffer_size); - tppoan.write(binary_buffer, buffer_size); - partial_tppoan.close(); - delete[] binary_buffer; - remove(partial_file_name.c_str()); - logger->log("done.\n", LOG_DEBG); + string message = "Copying ASCII output in MPI process " + to_string(mpidata->rank) + " of thread " + to_string(ri) + " of " + to_string(ompnumthreads - 1) + "... "; + logger->log(message, LOG_DEBG); + FILE *partial_output = fopen(partial_file_name.c_str(), "r"); + int c = fgetc(partial_output); + while (c != EOF) { + fputc(c, output); + c = fgetc(partial_output); } + fclose(partial_output); + remove(partial_file_name.c_str()); + logger->log("done.\n", LOG_DEBG); + partial_file_name = output_path + "/c_TPPOAN_" + to_string(mpidata->rank) + "_" + to_string(ri); + message = "Copying binary output in MPI process " + to_string(mpidata->rank) + " of thread " + to_string(ri) + " of " + to_string(ompnumthreads - 1) + "... "; + logger->log(message, LOG_DEBG); + fstream partial_tppoan; + partial_tppoan.open(partial_file_name.c_str(), ios::in | ios::binary); + partial_tppoan.seekg(0, ios::end); + long buffer_size = partial_tppoan.tellg(); + char *binary_buffer = new char[buffer_size]; + partial_tppoan.seekg(0, ios::beg); + partial_tppoan.read(binary_buffer, buffer_size); + tppoan.write(binary_buffer, buffer_size); + partial_tppoan.close(); + delete[] binary_buffer; + remove(partial_file_name.c_str()); + logger->log("done.\n", LOG_DEBG); } } #endif @@ -311,12 +307,13 @@ void cluster(const string& config_file, const string& data_file, const string& o #ifdef MPI_VERSION if (mpidata->mpirunning) { // only go through this if MPI has been actually used - for (int rr=1; rr < mpidata->nprocs; rr++) { + for (int rr=1; rr<mpidata->nprocs; rr++) { // get the data from process rr // how many openmp threads did process rr use? int remotethreads; MPI_Recv(&remotethreads, 1, MPI_INT, rr, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE); - for (int ri = 0; ri < remotethreads; ri++) { + for (int ri=0; ri<remotethreads; ri++) { + // first get the ASCII local file char *chunk_buffer; int chunk_buffer_size = -1; MPI_Recv(&chunk_buffer_size, 1, MPI_INT, rr, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE); @@ -327,7 +324,8 @@ void cluster(const string& config_file, const string& data_file, const string& o delete[] chunk_buffer; MPI_Recv(&chunk_buffer_size, 1, MPI_INT, rr, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE); } - + fputc(10, output); + // now get the binary local file long buffer_size = 0; // get the size of the buffer @@ -446,11 +444,18 @@ void cluster(const string& config_file, const string& data_file, const string& o partial_output.open(partial_file_name.c_str(), ios::in | ios::binary); partial_output.seekg(0, ios::end); const long partial_output_size = partial_output.tellg(); + partial_output.close(); + partial_output.open(partial_file_name.c_str(), ios::in | ios::binary); int chunk_buffer_size = 25165824; // Length of char array with 24Mb size char *chunk_buffer = new char[chunk_buffer_size](); int full_chunks = (int)(partial_output_size / chunk_buffer_size); for (int fi = 0; fi < full_chunks; fi++) { partial_output.read(chunk_buffer, chunk_buffer_size); + // If EOF is reached, do not send EOF character. + long ptr_position = partial_output.tellg(); + if (ptr_position == partial_output_size) { + chunk_buffer_size--; + } // Send the size of the buffer that is being transmitted (Node-0 does not know whether it is full or not) MPI_Send(&chunk_buffer_size, 1, MPI_INT, 0, 1, MPI_COMM_WORLD); // Actually send the file contents to Node-0 @@ -459,7 +464,7 @@ void cluster(const string& config_file, const string& data_file, const string& o long ptr_position = partial_output.tellg(); if (ptr_position < partial_output_size) { // Send the last partial buffer - chunk_buffer_size = partial_output_size - ptr_position; + chunk_buffer_size = partial_output_size - ptr_position - 1; partial_output.read(chunk_buffer, chunk_buffer_size); // Send the size of the buffer that is being transmitted (Node-0 does not know whether it is full or not) MPI_Send(&chunk_buffer_size, 1, MPI_INT, 0, 1, MPI_COMM_WORLD); @@ -472,7 +477,8 @@ void cluster(const string& config_file, const string& data_file, const string& o partial_output.close(); delete[] chunk_buffer; remove(partial_file_name.c_str()); - + logger->log("done.\n", LOG_DEBG); + partial_file_name = output_path + "/c_TPPOAN_" + to_string(mpidata->rank) + "_" + to_string(ri); message = "Copying binary output in MPI process " + to_string(mpidata->rank) + " of thread " + to_string(ri) + " of " + to_string(ompnumthreads - 1) + "... "; logger->log(message, LOG_DEBG); -- GitLab