Skip to content
Snippets Groups Projects
Commit 3752471c authored by Giovanni La Mura's avatar Giovanni La Mura
Browse files

Rewind ASCII files after getting their size and skip EOF character transmission

parent 287269a5
No related branches found
No related tags found
No related merge requests found
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
limitations under the License. limitations under the License.
*/ */
/*! \file cluster.cpp /*! \file cluster.cp
* *
* \brief Implementation of the calculation for a cluster of spheres. * \brief Implementation of the calculation for a cluster of spheres.
*/ */
...@@ -82,6 +82,7 @@ int cluster_jxi488_cycle(int jxi488, ScattererConfiguration *sconf, GeometryConf ...@@ -82,6 +82,7 @@ int cluster_jxi488_cycle(int jxi488, ScattererConfiguration *sconf, GeometryConf
void cluster(const string& config_file, const string& data_file, const string& output_path, const mixMPI *mpidata) { void cluster(const string& config_file, const string& data_file, const string& output_path, const mixMPI *mpidata) {
chrono::time_point<chrono::high_resolution_clock> t_start = chrono::high_resolution_clock::now(); chrono::time_point<chrono::high_resolution_clock> t_start = chrono::high_resolution_clock::now();
chrono::duration<double> elapsed; chrono::duration<double> elapsed;
string message;
string timing_name = output_path + "/c_timing_mpi"+ to_string(mpidata->rank) +".log"; string timing_name = output_path + "/c_timing_mpi"+ to_string(mpidata->rank) +".log";
FILE *timing_file = fopen(timing_name.c_str(), "w"); FILE *timing_file = fopen(timing_name.c_str(), "w");
Logger *time_logger = new Logger(LOG_DEBG, timing_file); Logger *time_logger = new Logger(LOG_DEBG, timing_file);
...@@ -271,39 +272,34 @@ void cluster(const string& config_file, const string& data_file, const string& o ...@@ -271,39 +272,34 @@ void cluster(const string& config_file, const string& data_file, const string& o
#pragma omp barrier #pragma omp barrier
{ {
// thread 0 already wrote on global files, skip it and take care of appending the others // thread 0 already wrote on global files, skip it and take care of appending the others
for (int ri = 0; ri < ompnumthreads; ri++) { for (int ri = 1; ri < ompnumthreads; ri++) {
// still, we need to remove all c_OCLU_RANK_0 files
string partial_file_name = output_path + "/c_OCLU_" + to_string(mpidata->rank) + "_" + to_string(ri); string partial_file_name = output_path + "/c_OCLU_" + to_string(mpidata->rank) + "_" + to_string(ri);
if (ri == 0) { string message = "Copying ASCII output in MPI process " + to_string(mpidata->rank) + " of thread " + to_string(ri) + " of " + to_string(ompnumthreads - 1) + "... ";
remove(partial_file_name.c_str()); logger->log(message, LOG_DEBG);
} else { FILE *partial_output = fopen(partial_file_name.c_str(), "r");
string message = "Copying ASCII output in MPI process " + to_string(mpidata->rank) + " of thread " + to_string(ri) + " of " + to_string(ompnumthreads - 1) + "... "; int c = fgetc(partial_output);
logger->log(message, LOG_DEBG); while (c != EOF) {
FILE *partial_output = fopen(partial_file_name.c_str(), "r"); fputc(c, output);
int c = fgetc(partial_output); c = fgetc(partial_output);
while (c != EOF) {
fputc(c, output);
c = fgetc(partial_output);
}
fclose(partial_output);
remove(partial_file_name.c_str());
logger->log("done.\n", LOG_DEBG);
partial_file_name = output_path + "/c_TPPOAN_" + to_string(mpidata->rank) + "_" + to_string(ri);
message = "Copying binary output in MPI process " + to_string(mpidata->rank) + " of thread " + to_string(ri) + " of " + to_string(ompnumthreads - 1) + "... ";
logger->log(message, LOG_DEBG);
fstream partial_tppoan;
partial_tppoan.open(partial_file_name.c_str(), ios::in | ios::binary);
partial_tppoan.seekg(0, ios::end);
long buffer_size = partial_tppoan.tellg();
char *binary_buffer = new char[buffer_size];
partial_tppoan.seekg(0, ios::beg);
partial_tppoan.read(binary_buffer, buffer_size);
tppoan.write(binary_buffer, buffer_size);
partial_tppoan.close();
delete[] binary_buffer;
remove(partial_file_name.c_str());
logger->log("done.\n", LOG_DEBG);
} }
fclose(partial_output);
remove(partial_file_name.c_str());
logger->log("done.\n", LOG_DEBG);
partial_file_name = output_path + "/c_TPPOAN_" + to_string(mpidata->rank) + "_" + to_string(ri);
message = "Copying binary output in MPI process " + to_string(mpidata->rank) + " of thread " + to_string(ri) + " of " + to_string(ompnumthreads - 1) + "... ";
logger->log(message, LOG_DEBG);
fstream partial_tppoan;
partial_tppoan.open(partial_file_name.c_str(), ios::in | ios::binary);
partial_tppoan.seekg(0, ios::end);
long buffer_size = partial_tppoan.tellg();
char *binary_buffer = new char[buffer_size];
partial_tppoan.seekg(0, ios::beg);
partial_tppoan.read(binary_buffer, buffer_size);
tppoan.write(binary_buffer, buffer_size);
partial_tppoan.close();
delete[] binary_buffer;
remove(partial_file_name.c_str());
logger->log("done.\n", LOG_DEBG);
} }
} }
#endif #endif
...@@ -311,12 +307,13 @@ void cluster(const string& config_file, const string& data_file, const string& o ...@@ -311,12 +307,13 @@ void cluster(const string& config_file, const string& data_file, const string& o
#ifdef MPI_VERSION #ifdef MPI_VERSION
if (mpidata->mpirunning) { if (mpidata->mpirunning) {
// only go through this if MPI has been actually used // only go through this if MPI has been actually used
for (int rr=1; rr < mpidata->nprocs; rr++) { for (int rr=1; rr<mpidata->nprocs; rr++) {
// get the data from process rr // get the data from process rr
// how many openmp threads did process rr use? // how many openmp threads did process rr use?
int remotethreads; int remotethreads;
MPI_Recv(&remotethreads, 1, MPI_INT, rr, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE); MPI_Recv(&remotethreads, 1, MPI_INT, rr, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
for (int ri = 0; ri < remotethreads; ri++) { for (int ri=0; ri<remotethreads; ri++) {
// first get the ASCII local file
char *chunk_buffer; char *chunk_buffer;
int chunk_buffer_size = -1; int chunk_buffer_size = -1;
MPI_Recv(&chunk_buffer_size, 1, MPI_INT, rr, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE); MPI_Recv(&chunk_buffer_size, 1, MPI_INT, rr, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
...@@ -327,7 +324,8 @@ void cluster(const string& config_file, const string& data_file, const string& o ...@@ -327,7 +324,8 @@ void cluster(const string& config_file, const string& data_file, const string& o
delete[] chunk_buffer; delete[] chunk_buffer;
MPI_Recv(&chunk_buffer_size, 1, MPI_INT, rr, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE); MPI_Recv(&chunk_buffer_size, 1, MPI_INT, rr, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
} }
fputc(10, output);
// now get the binary local file // now get the binary local file
long buffer_size = 0; long buffer_size = 0;
// get the size of the buffer // get the size of the buffer
...@@ -446,11 +444,18 @@ void cluster(const string& config_file, const string& data_file, const string& o ...@@ -446,11 +444,18 @@ void cluster(const string& config_file, const string& data_file, const string& o
partial_output.open(partial_file_name.c_str(), ios::in | ios::binary); partial_output.open(partial_file_name.c_str(), ios::in | ios::binary);
partial_output.seekg(0, ios::end); partial_output.seekg(0, ios::end);
const long partial_output_size = partial_output.tellg(); const long partial_output_size = partial_output.tellg();
partial_output.close();
partial_output.open(partial_file_name.c_str(), ios::in | ios::binary);
int chunk_buffer_size = 25165824; // Length of char array with 24Mb size int chunk_buffer_size = 25165824; // Length of char array with 24Mb size
char *chunk_buffer = new char[chunk_buffer_size](); char *chunk_buffer = new char[chunk_buffer_size]();
int full_chunks = (int)(partial_output_size / chunk_buffer_size); int full_chunks = (int)(partial_output_size / chunk_buffer_size);
for (int fi = 0; fi < full_chunks; fi++) { for (int fi = 0; fi < full_chunks; fi++) {
partial_output.read(chunk_buffer, chunk_buffer_size); partial_output.read(chunk_buffer, chunk_buffer_size);
// If EOF is reached, do not send EOF character.
long ptr_position = partial_output.tellg();
if (ptr_position == partial_output_size) {
chunk_buffer_size--;
}
// Send the size of the buffer that is being transmitted (Node-0 does not know whether it is full or not) // Send the size of the buffer that is being transmitted (Node-0 does not know whether it is full or not)
MPI_Send(&chunk_buffer_size, 1, MPI_INT, 0, 1, MPI_COMM_WORLD); MPI_Send(&chunk_buffer_size, 1, MPI_INT, 0, 1, MPI_COMM_WORLD);
// Actually send the file contents to Node-0 // Actually send the file contents to Node-0
...@@ -459,7 +464,7 @@ void cluster(const string& config_file, const string& data_file, const string& o ...@@ -459,7 +464,7 @@ void cluster(const string& config_file, const string& data_file, const string& o
long ptr_position = partial_output.tellg(); long ptr_position = partial_output.tellg();
if (ptr_position < partial_output_size) { if (ptr_position < partial_output_size) {
// Send the last partial buffer // Send the last partial buffer
chunk_buffer_size = partial_output_size - ptr_position; chunk_buffer_size = partial_output_size - ptr_position - 1;
partial_output.read(chunk_buffer, chunk_buffer_size); partial_output.read(chunk_buffer, chunk_buffer_size);
// Send the size of the buffer that is being transmitted (Node-0 does not know whether it is full or not) // Send the size of the buffer that is being transmitted (Node-0 does not know whether it is full or not)
MPI_Send(&chunk_buffer_size, 1, MPI_INT, 0, 1, MPI_COMM_WORLD); MPI_Send(&chunk_buffer_size, 1, MPI_INT, 0, 1, MPI_COMM_WORLD);
...@@ -472,7 +477,8 @@ void cluster(const string& config_file, const string& data_file, const string& o ...@@ -472,7 +477,8 @@ void cluster(const string& config_file, const string& data_file, const string& o
partial_output.close(); partial_output.close();
delete[] chunk_buffer; delete[] chunk_buffer;
remove(partial_file_name.c_str()); remove(partial_file_name.c_str());
logger->log("done.\n", LOG_DEBG);
partial_file_name = output_path + "/c_TPPOAN_" + to_string(mpidata->rank) + "_" + to_string(ri); partial_file_name = output_path + "/c_TPPOAN_" + to_string(mpidata->rank) + "_" + to_string(ri);
message = "Copying binary output in MPI process " + to_string(mpidata->rank) + " of thread " + to_string(ri) + " of " + to_string(ompnumthreads - 1) + "... "; message = "Copying binary output in MPI process " + to_string(mpidata->rank) + " of thread " + to_string(ri) + " of " + to_string(ompnumthreads - 1) + "... ";
logger->log(message, LOG_DEBG); logger->log(message, LOG_DEBG);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment