Commit c93c88ec authored by Giovanni La Mura

Add MPI_Barrier() calls

parent d64883e8
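The net effect of the change is to serialize the per-process file appends: instead of every rank calling mpisend() as soon as its threads are merged, each rank now takes its turn inside a loop over ranks, and an MPI_Barrier() closes every iteration, so rank 0 appends the contributions to c_OCLU and c_TPPOAN in rank order. The following is a minimal self-contained sketch of that pattern, assuming plain integer payloads in place of the repository's VirtualAsciiFile/VirtualBinaryFile transfers; the names payload and received are illustrative only.

    #include <mpi.h>
    #include <cstdio>

    int main(int argc, char **argv) {
      MPI_Init(&argc, &argv);
      int rank, nprocs;
      MPI_Comm_rank(MPI_COMM_WORLD, &rank);
      MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
      int payload = 100 + rank; // stand-in for the merged thread-local output
      for (int rr = 1; rr < nprocs; rr++) {
        if (rank == 0) {
          // rank 0 receives and "appends" rank rr's contribution
          int received;
          MPI_Recv(&received, 1, MPI_INT, rr, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
          std::printf("appended %d from rank %d\n", received, rr);
        } else if (rank == rr) {
          // only the rank whose turn it is sends its data
          MPI_Send(&payload, 1, MPI_INT, 0, 0, MPI_COMM_WORLD);
        }
        // every rank joins the barrier, so iteration rr completes everywhere
        // before rank rr+1 starts sending (mirrors the commit's added calls)
        MPI_Barrier(MPI_COMM_WORLD);
      }
      MPI_Finalize();
      return 0;
    }

Built with, e.g., mpic++ and run under mpirun -n 4, the receives complete strictly in rank order; the barrier costs some idle time, but it keeps the shared output files append-ordered and, presumably, limits how much unsolicited traffic rank 0 has to absorb at once.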
@@ -386,11 +386,6 @@ void cluster(const string& config_file, const string& data_file, const string& o
 // only threads different from 0 have to free local copies of variables and close local files
 if (myompthread != 0) {
 delete cid_2;
-//fclose(output_2);
-// p_output_2->write_to_disk(output_path + "/c_OCLU_" + to_string(mpidata->rank) + "_" + to_string(myompthread));
-// delete p_output_2;
-// tppoanp_2->close();
-// delete tppoanp_2;
 }
 #pragma omp barrier
 {
@@ -418,43 +413,6 @@ void cluster(const string& config_file, const string& data_file, const string& o
 vtppoanarray[0]->append_to_disk(output_path + "/c_TPPOAN");
 delete vtppoanarray[0];
 delete[] vtppoanarray;
-// for (int ri = 1; ri < ompnumthreads; ri++) {
-// string partial_file_name = output_path + "/c_OCLU_" + to_string(mpidata->rank) + "_" + to_string(ri);
-// string message = "Copying ASCII output in MPI process " + to_string(mpidata->rank) + " of thread " + to_string(ri) + " of " + to_string(ompnumthreads - 1) + "... ";
-// logger->log(message, LOG_DEBG);
-// FILE *partial_output = fopen(partial_file_name.c_str(), "r");
-// // have a look at the getline() or fgets() C functions to read one line at a time, instead of char by char, it would simplify things
-// char virtual_line[256];
-// int index = 0;
-// int c = fgetc(partial_output);
-// while (c != EOF) {
-// virtual_line[index++] = c;
-// if (c == '\n') {
-// virtual_line[index] = '\0';
-// p_output->append_line(virtual_line);
-// index = 0;
-// }
-// c = fgetc(partial_output);
-// }
-// fclose(partial_output);
-// remove(partial_file_name.c_str());
-// logger->log("done.\n", LOG_DEBG);
-// string partial_file_name = output_path + "/c_TPPOAN_" + to_string(mpidata->rank) + "_" + to_string(ri);
-// string message = "Copying binary output in MPI process " + to_string(mpidata->rank) + " of thread " + to_string(ri) + " of " + to_string(ompnumthreads - 1) + "... ";
-// logger->log(message, LOG_DEBG);
-// fstream partial_tppoan;
-// partial_tppoan.open(partial_file_name.c_str(), ios::in | ios::binary);
-// partial_tppoan.seekg(0, ios::end);
-// long buffer_size = partial_tppoan.tellg();
-// char *binary_buffer = new char[buffer_size];
-// partial_tppoan.seekg(0, ios::beg);
-// partial_tppoan.read(binary_buffer, buffer_size);
-// // tppoan.write(binary_buffer, buffer_size);
-// partial_tppoan.close();
-// delete[] binary_buffer;
-// remove(partial_file_name.c_str());
-// logger->log("done.\n", LOG_DEBG);
-// }
 }
 // here go the code to append the files written in MPI processes > 0 to the ones on MPI process 0
 #ifdef MPI_VERSION
@@ -466,48 +424,9 @@ void cluster(const string& config_file, const string& data_file, const string& o
 p_output->append_to_disk(output_path + "/c_OCLU");
 delete p_output;
 VirtualBinaryFile *vtppoanp = new VirtualBinaryFile(mpidata, rr);
-vtppoanp->append_to_disk(output_path + "/c_TPPOAN_bis");
+vtppoanp->append_to_disk(output_path + "/c_TPPOAN");
 delete vtppoanp;
-// // how many openmp threads did process rr use?
-// int remotethreads;
-// MPI_Recv(&remotethreads, 1, MPI_INT, rr, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
-// for (int ri=0; ri<remotethreads; ri++) {
-// // first get the ASCII local file
-// char *chunk_buffer;
-// int chunk_buffer_size = -1;
-// MPI_Recv(&chunk_buffer_size, 1, MPI_INT, rr, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
-// while (chunk_buffer_size != 0) {
-// char *chunk_buffer = new char[chunk_buffer_size];
-// MPI_Recv(chunk_buffer, chunk_buffer_size, MPI_CHAR, rr, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
-// // fputs(chunk_buffer, output);
-// int index = 0, last_char = 0;
-// char c;
-// while (last_char < chunk_buffer_size) {
-// c = chunk_buffer[last_char++];
-// virtual_line[index++] = c;
-// if (c == '\n') {
-// virtual_line[index] = '\0';
-// index = 0;
-// p_output->append_line(virtual_line);
-// }
-// }
-// delete[] chunk_buffer;
-// MPI_Recv(&chunk_buffer_size, 1, MPI_INT, rr, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
-// }
-// // if (ri<remotethreads-1) fprintf(output, "\n");
-// // now get the binary local file
-// long buffer_size = 0;
-// // get the size of the buffer
-// MPI_Recv(&buffer_size, 1, MPI_LONG, rr, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
-// // allocate the bufer
-// char *binary_buffer = new char[buffer_size];
-// // actually receive the buffer
-// MPI_Recv(binary_buffer, buffer_size, MPI_CHAR, rr, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
-// // we can write it to disk
-// // tppoan.write(binary_buffer, buffer_size);
-// delete[] binary_buffer;
-// }
+int test = MPI_Barrier(MPI_COMM_WORLD);
 }
 }
@@ -515,14 +434,6 @@ void cluster(const string& config_file, const string& data_file, const string& o
 #ifdef USE_NVTX
 nvtxRangePop();
 #endif
-// tppoanp->close();
-// delete tppoanp;
-// } else { // In case TPPOAN could not be opened. Should never happen.
-// logger->err("\nERROR: failed to open TPPOAN file.\n");
-// }
-// fclose(output);
-// p_output->write_to_disk(output_path + "/c_OCLU");
-// delete p_output;
 // Clean memory
 delete cid;
 delete p_scattering_angles;
@@ -550,11 +461,6 @@ void cluster(const string& config_file, const string& data_file, const string& o
 ClusterIterationData *cid = new ClusterIterationData(mpidata);
 ScatteringAngles *p_scattering_angles = new ScatteringAngles(mpidata);
 // open separate files for other MPI processes
-// File *output = fopen((output_path + "/c_OCLU_mpi"+ to_string(mpidata->rank)).c_str(), "w");
-// fstream *tppoanp = new fstream;
-// fstream &tppoan = *tppoanp;
-// string tppoan_name = output_path + "/c_TPPOAN_mpi"+ to_string(mpidata->rank);
-// tppoan.open(tppoan_name.c_str(), ios::out | ios::binary);
 // Create this variable and initialise it with a default here, so that it is defined anyway, with or without OpenMP support enabled
 int ompnumthreads = 1;
 VirtualAsciiFile **p_outarray = NULL;
@@ -575,27 +481,19 @@ void cluster(const string& config_file, const string& data_file, const string& o
 }
 // To test parallelism, I will now start feeding this function with "clean" copies of the parameters, so that they will not be changed by previous iterations, and each one will behave as the first one. Define all (empty) variables here, so they have the correct scope, then they get different definitions depending on thread number
 ClusterIterationData *cid_2 = NULL;
-//FILE *output_2 = NULL;
 VirtualAsciiFile *p_output_2 = NULL;
 VirtualBinaryFile *vtppoanp_2 = NULL;
-// fstream *tppoanp_2 = NULL;
 // for threads other than the 0, create distinct copies of all relevant data, while for thread 0 just define new references / pointers to the original ones
 if (myompthread == 0) {
 cid_2 = cid;
-// output_2 = output;
-// tppoanp_2 = tppoanp;
 } else {
 // this is not thread 0, so do create fresh copies of all local variables
 cid_2 = new ClusterIterationData(*cid);
 }
-// output_2 = fopen((output_path + "/c_OCLU_" + to_string(mpidata->rank) + "_" + to_string(myompthread)).c_str(), "w");
 p_output_2 = new VirtualAsciiFile();
 p_outarray[myompthread] = p_output_2;
 vtppoanp_2 = new VirtualBinaryFile();
 vtppoanarray[myompthread] = vtppoanp_2;
-// tppoanp_2 = new fstream;
-// tppoanp_2->open((output_path + "/c_TPPOAN_" + to_string(mpidata->rank) + "_" + to_string(myompthread)).c_str(), ios::out | ios::binary);
-// fstream &tppoan_2 = *tppoanp_2;
 // make sure all threads align here: I don't want the following loop to accidentally start for thread 0, possibly modifying some variables before they are copied by all other threads
 #pragma omp barrier
 if (myompthread==0) logger->log("Syncing OpenMP threads and starting the loop on wavelengths\n");
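Note: the "clean copies" comment in the context above describes a common OpenMP idiom that the hunk leaves unchanged: thread 0 keeps aliasing the master data, every other thread deep-copies it, and a barrier prevents thread 0 from entering the loop (and mutating shared state) before all copies exist. A minimal sketch of that idiom, with a hypothetical IterationData struct standing in for ClusterIterationData:

    #include <omp.h>
    #include <cstdio>
    #include <vector>

    // Illustrative stand-in for ClusterIterationData (for this sketch only).
    struct IterationData {
      std::vector<double> params{1.0, 2.0, 3.0};
    };

    int main() {
      IterationData *shared = new IterationData();
      #pragma omp parallel
      {
        int tid = omp_get_thread_num();
        // thread 0 keeps a pointer to the master data; other threads deep-copy it
        IterationData *local = (tid == 0) ? shared : new IterationData(*shared);
        // barrier: nobody proceeds (and possibly mutates the master copy)
        // until every thread has taken its private copy
        #pragma omp barrier
        local->params[0] += tid; // each thread now works on its own data
        std::printf("thread %d sees params[0] = %f\n", tid, local->params[0]);
        if (tid != 0) delete local; // non-zero threads own their copies
      }
      delete shared;
      return 0;
    }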
@@ -610,11 +508,6 @@ void cluster(const string& config_file, const string& data_file, const string& o
 if (myompthread != 0) {
 delete cid_2;
 }
-// fclose(output_2);
-// p_output_2->write_to_disk(output_path + "/c_OCLU_" + to_string(mpidata->rank) + "_" + to_string(myompthread));
-// delete p_output_2;
-// tppoanp_2->close();
-// delete tppoanp_2;
 #pragma omp barrier
 {
 string message = "INFO: Closing thread-local output files of thread " + to_string(myompthread) + " and syncing threads.\n";
@@ -629,81 +522,17 @@ void cluster(const string& config_file, const string& data_file, const string& o
 vtppoanarray[0]->append(*(vtppoanarray[ti]));
 delete vtppoanarray[ti];
 }
-p_outarray[0]->mpisend(mpidata);
-delete p_outarray[0];
-delete[] p_outarray;
-vtppoanarray[0]->mpisend(mpidata);
-delete vtppoanarray[0];
-delete[] vtppoanarray;
-// // tell MPI process 0 how many threads we have on this process (not necessarily the same across all processes)
-// MPI_Send(&ompnumthreads, 1, MPI_INT, 0, 1, MPI_COMM_WORLD);
-// // reopen local files, send them all to MPI process 0
-// for (int ri = 0; ri < ompnumthreads; ri++) {
-// string partial_file_name = output_path + "/c_OCLU_" + to_string(mpidata->rank) + "_" + to_string(ri);
-// string message = "Copying ASCII output in MPI process " + to_string(mpidata->rank) + " of thread " + to_string(ri) + " of " + to_string(ompnumthreads - 1) + "... ";
-// logger->log(message, LOG_DEBG);
-// fstream partial_output;
-// partial_output.open(partial_file_name.c_str(), ios::in);
-// partial_output.seekg(0, ios::end);
-// const long partial_output_size = partial_output.tellg();
-// partial_output.close();
-// partial_output.open(partial_file_name.c_str(), ios::in);
-// int chunk_buffer_size = 25165824; // Length of char array with 24Mb size
-// char *chunk_buffer = new char[chunk_buffer_size]();
-// int full_chunks = (int)(partial_output_size / chunk_buffer_size);
-// for (int fi = 0; fi < full_chunks; fi++) {
-// partial_output.read(chunk_buffer, chunk_buffer_size);
-// // If EOF is reached, do not send EOF character.
-// long ptr_position = partial_output.tellg();
-// if (ptr_position == partial_output_size) {
-// chunk_buffer[chunk_buffer_size - 1] = '\0';
-// }
-// // Send the size of the buffer that is being transmitted (Node-0 does not know whether it is full or not)
-// MPI_Send(&chunk_buffer_size, 1, MPI_INT, 0, 1, MPI_COMM_WORLD);
-// // Actually send the file contents to Node-0
-// MPI_Send(chunk_buffer, chunk_buffer_size, MPI_CHAR, 0, 0, MPI_COMM_WORLD);
-// }
-// long ptr_position = partial_output.tellg();
-// if (ptr_position < partial_output_size) {
-// // Send the last partial buffer
-// chunk_buffer_size = partial_output_size - ptr_position;
-// delete[] chunk_buffer;
-// chunk_buffer = new char[chunk_buffer_size];
-// partial_output.read(chunk_buffer, chunk_buffer_size);
-// // chunk_buffer[chunk_buffer_size - 1] = '\0';
-// // Send the size of the buffer that is being transmitted (Node-0 does not know whether it is full or not)
-// MPI_Send(&chunk_buffer_size, 1, MPI_INT, 0, 1, MPI_COMM_WORLD);
-// // Actually send the file contents to Node-0
-// MPI_Send(chunk_buffer, chunk_buffer_size, MPI_CHAR, 0, 0, MPI_COMM_WORLD);
-// }
-// // Send a size 0 flag to inform Node-0 that the transmission is complete
-// chunk_buffer_size = 0;
-// MPI_Send(&chunk_buffer_size, 1, MPI_INT, 0, 1, MPI_COMM_WORLD);
-// partial_output.close();
-// delete[] chunk_buffer;
-// remove(partial_file_name.c_str());
-// logger->log("done.\n", LOG_DEBG);
-// string partial_file_name = output_path + "/c_TPPOAN_" + to_string(mpidata->rank) + "_" + to_string(ri);
-// string message = "Copying binary output in MPI process " + to_string(mpidata->rank) + " of thread " + to_string(ri) + " of " + to_string(ompnumthreads - 1) + "... ";
-// logger->log(message, LOG_DEBG);
-// fstream partial_tppoan;
-// partial_tppoan.open(partial_file_name.c_str(), ios::in | ios::binary);
-// partial_tppoan.seekg(0, ios::end);
-// long buffer_size = partial_tppoan.tellg();
-// char *binary_buffer = new char[buffer_size];
-// partial_tppoan.seekg(0, ios::beg);
-// partial_tppoan.read(binary_buffer, buffer_size);
-// // tell MPI process 0 how large is the buffer
-// MPI_Send(&buffer_size, 1, MPI_LONG, 0, 1, MPI_COMM_WORLD);
-// // actually send the buffer
-// MPI_Send(binary_buffer, buffer_size, MPI_CHAR, 0, 0, MPI_COMM_WORLD);
-// // // tppoan.write(binary_buffer, buffer_size);
-// partial_tppoan.close();
-// delete[] binary_buffer;
-// remove(partial_file_name.c_str());
-// logger->log("done.\n", LOG_DEBG);
-// }
+for (int rr = 1; rr < mpidata->nprocs; rr++) {
+if (rr == mpidata->rank) {
+p_outarray[0]->mpisend(mpidata);
+delete p_outarray[0];
+delete[] p_outarray;
+vtppoanarray[0]->mpisend(mpidata);
+delete vtppoanarray[0];
+delete[] vtppoanarray;
+}
+int test = MPI_Barrier(MPI_COMM_WORLD);
+}
 }
 // Clean memory
 delete cid;
...