Commit 4b8ee8a1 authored by Mulas, Giacomo

Assemble ASCII output on MPI proc 0, thread 0 then write only from there

parent 38bfc070
@@ -4,14 +4,14 @@
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/*! \file cluster.cpp
 *
@@ -163,6 +163,8 @@ void cluster(const string& config_file, const string& data_file, const string& o
double wp = sconf->wp;
//FILE *output = fopen((output_path + "/c_OCLU").c_str(), "w");
VirtualAsciiFile *p_output = new VirtualAsciiFile();
// for the time being, this is ok. When we can, add some logic around the sprintf() calls that checks whether a longer buffer would be needed, and expands it in that case
// in any case, replace all sprintf() calls with snprintf(), to avoid ever writing more than the available buffer size
char virtual_line[256];
ClusterIterationData *cid = new ClusterIterationData(gconf, sconf, mpidata);
const int ndi = cid->c4->nsph * cid->c4->nlim;
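(As a side note on the buffer comment above: the following is a minimal sketch, not part of this commit, of how an snprintf()-based formatter can detect truncation and grow the buffer; the helper name and format string are made up for illustration.)

// Illustrative only, not part of this commit: snprintf() reports the length the
// output would need, so the caller can grow the buffer instead of overflowing it.
#include <cstdio>
#include <string>
#include <vector>

// Hypothetical helper: formats one output line into a std::string of the right size.
std::string formatted_line(int index, double value) {
  std::vector<char> buf(256);
  int needed = std::snprintf(buf.data(), buf.size(), "%5d%20.12e\n", index, value);
  if (needed >= (int) buf.size()) {   // the 256-char buffer would have been too small
    buf.resize(needed + 1);           // grow to the exact required size
    std::snprintf(buf.data(), buf.size(), "%5d%20.12e\n", index, value);
  }
  return std::string(buf.data());
}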
@@ -306,6 +308,7 @@ void cluster(const string& config_file, const string& data_file, const string& o
#endif
// Create this variable and initialise it with a default here, so that it is defined anyway, with or without OpenMP support enabled
int ompnumthreads = 1;
VirtualAsciiFile **p_outarray = NULL;
#ifdef USE_NVTX
nvtxRangePush("Parallel loop");
@@ -317,7 +320,10 @@ void cluster(const string& config_file, const string& data_file, const string& o
#ifdef _OPENMP
// If OpenMP is enabled, give actual values to myompthread and ompnumthreads, and open thread-local output files
myompthread = omp_get_thread_num();
if (myompthread == 0) {
ompnumthreads = omp_get_num_threads();
p_outarray = new VirtualAsciiFile*[ompnumthreads];
}
#endif
// To test parallelism, I will now start feeding this function with "clean" copies of the parameters, so that they will not be changed by previous iterations, and each one will behave as the first one. Define all (empty) variables here, so they have the correct scope, then they get different definitions depending on thread number
ClusterIterationData *cid_2 = NULL;
@@ -338,6 +344,7 @@ void cluster(const string& config_file, const string& data_file, const string& o
tppoanp_2 = new fstream;
tppoanp_2->open((output_path + "/c_TPPOAN_" + to_string(mpidata->rank) + "_" + to_string(myompthread)).c_str(), ios::out | ios::binary);
}
p_outarray[myompthread] = p_output_2;
fstream &tppoan_2 = *tppoanp_2;
// make sure all threads align here: I don't want the following loop to accidentally start for thread 0, possibly modifying some variables before they are copied by all other threads
#pragma omp barrier
@@ -355,8 +362,8 @@ void cluster(const string& config_file, const string& data_file, const string& o
if (myompthread != 0) {
delete cid_2;
//fclose(output_2);
// p_output_2->write_to_disk(output_path + "/c_OCLU_" + to_string(mpidata->rank) + "_" + to_string(myompthread));
// delete p_output_2;
tppoanp_2->close();
delete tppoanp_2;
}
@@ -375,28 +382,36 @@ void cluster(const string& config_file, const string& data_file, const string& o
#pragma omp barrier
{
// thread 0 already wrote on global files, skip it and take care of appending the others
for (int ti=1; ti<ompnumthreads; ti++) {
p_outarray[0]->append(*(p_outarray[ti]));
delete p_outarray[ti];
}
p_outarray[0]->write_to_disk(output_path + "/c_OCLU");
delete p_outarray[0];
delete[] p_outarray;
for (int ri = 1; ri < ompnumthreads; ri++) {
// string partial_file_name = output_path + "/c_OCLU_" + to_string(mpidata->rank) + "_" + to_string(ri);
// string message = "Copying ASCII output in MPI process " + to_string(mpidata->rank) + " of thread " + to_string(ri) + " of " + to_string(ompnumthreads - 1) + "... ";
// logger->log(message, LOG_DEBG);
// FILE *partial_output = fopen(partial_file_name.c_str(), "r");
// // have a look at the getline() or fgets() C functions to read one line at a time, instead of char by char, it would simplify things
// char virtual_line[256];
// int index = 0;
// int c = fgetc(partial_output);
// while (c != EOF) {
// virtual_line[index++] = c;
// if (c == '\n') {
// virtual_line[index] = '\0';
// p_output->append_line(virtual_line);
// index = 0;
// }
// c = fgetc(partial_output);
// }
// fclose(partial_output);
// remove(partial_file_name.c_str());
// logger->log("done.\n", LOG_DEBG);
string partial_file_name = output_path + "/c_TPPOAN_" + to_string(mpidata->rank) + "_" + to_string(ri);
string message = "Copying binary output in MPI process " + to_string(mpidata->rank) + " of thread " + to_string(ri) + " of " + to_string(ompnumthreads - 1) + "... ";
logger->log(message, LOG_DEBG);
fstream partial_tppoan;
partial_tppoan.open(partial_file_name.c_str(), ios::in | ios::binary);
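(Regarding the getline()/fgets() suggestion in the commented-out block above, here is a minimal sketch, not part of this commit, of a line-at-a-time reader; the helper name is made up, and lines longer than the 256-char buffer would simply be appended in pieces.)

// Illustrative only, not part of this commit: read the partial file one line at a
// time with fgets() instead of the character-by-character fgetc() loop.
#include <cstdio>
#include "../include/file_io.h"

// Hypothetical helper: appends every line of the named file to a VirtualAsciiFile.
static void append_file_lines(VirtualAsciiFile *vf, const char *file_name) {
  char virtual_line[256];
  FILE *partial_output = fopen(file_name, "r");
  if (partial_output == NULL) return;
  while (fgets(virtual_line, sizeof(virtual_line), partial_output) != NULL) {
    vf->append_line(virtual_line);   // fgets() keeps the trailing '\n', as append_line() expects
  }
  fclose(partial_output);
}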
@@ -419,33 +434,36 @@ void cluster(const string& config_file, const string& data_file, const string& o
// only go through this if MPI has been actually used
for (int rr=1; rr<mpidata->nprocs; rr++) {
// get the data from process rr
VirtualAsciiFile *p_output = new VirtualAsciiFile(mpidata, rr);
p_output->append_to_disk(output_path + "/c_OCLU");
delete p_output;
// // how many openmp threads did process rr use?
int remotethreads;
MPI_Recv(&remotethreads, 1, MPI_INT, rr, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
for (int ri=0; ri<remotethreads; ri++) {
// // first get the ASCII local file
// char *chunk_buffer;
// int chunk_buffer_size = -1;
// MPI_Recv(&chunk_buffer_size, 1, MPI_INT, rr, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
// while (chunk_buffer_size != 0) {
// char *chunk_buffer = new char[chunk_buffer_size];
// MPI_Recv(chunk_buffer, chunk_buffer_size, MPI_CHAR, rr, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
// // fputs(chunk_buffer, output);
// int index = 0, last_char = 0;
// char c;
// while (last_char < chunk_buffer_size) {
// c = chunk_buffer[last_char++];
// virtual_line[index++] = c;
// if (c == '\n') {
// virtual_line[index] = '\0';
// index = 0;
// p_output->append_line(virtual_line);
// }
// }
// delete[] chunk_buffer;
// MPI_Recv(&chunk_buffer_size, 1, MPI_INT, rr, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
// }
// // if (ri<remotethreads-1) fprintf(output, "\n");
// now get the binary local file
long buffer_size = 0;
@@ -461,6 +479,7 @@ void cluster(const string& config_file, const string& data_file, const string& o
}
}
}
#endif
#ifdef USE_NVTX
nvtxRangePop();
@@ -471,8 +490,8 @@ void cluster(const string& config_file, const string& data_file, const string& o
logger->err("\nERROR: failed to open TPPOAN file.\n");
}
// fclose(output);
// p_output->write_to_disk(output_path + "/c_OCLU");
// delete p_output;
// Clean memory
delete cid;
delete p_scattering_angles;
@@ -507,6 +526,7 @@ void cluster(const string& config_file, const string& data_file, const string& o
// tppoan.open(tppoan_name.c_str(), ios::out | ios::binary);
// Create this variable and initialise it with a default here, so that it is defined anyway, with or without OpenMP support enabled
int ompnumthreads = 1;
VirtualAsciiFile **p_outarray = NULL;
#pragma omp parallel
{
@@ -515,7 +535,10 @@ void cluster(const string& config_file, const string& data_file, const string& o
#ifdef _OPENMP
// If OpenMP is enabled, give actual values to myompthread and ompnumthreads, and open thread-local output files
myompthread = omp_get_thread_num();
if (myompthread == 0) {
ompnumthreads = omp_get_num_threads();
p_outarray = new VirtualAsciiFile*[ompnumthreads];
}
#endif
// To test parallelism, I will now start feeding this function with "clean" copies of the parameters, so that they will not be changed by previous iterations, and each one will behave as the first one. Define all (empty) variables here, so they have the correct scope, then they get different definitions depending on thread number
ClusterIterationData *cid_2 = NULL;
@@ -533,6 +556,7 @@ void cluster(const string& config_file, const string& data_file, const string& o
}
// output_2 = fopen((output_path + "/c_OCLU_" + to_string(mpidata->rank) + "_" + to_string(myompthread)).c_str(), "w");
p_output_2 = new VirtualAsciiFile();
p_outarray[myompthread] = p_output_2;
tppoanp_2 = new fstream;
tppoanp_2->open((output_path + "/c_TPPOAN_" + to_string(mpidata->rank) + "_" + to_string(myompthread)).c_str(), ios::out | ios::binary);
fstream &tppoan_2 = *tppoanp_2;
@@ -551,8 +575,8 @@ void cluster(const string& config_file, const string& data_file, const string& o
delete cid_2;
}
// fclose(output_2);
// p_output_2->write_to_disk(output_path + "/c_OCLU_" + to_string(mpidata->rank) + "_" + to_string(myompthread));
// delete p_output_2;
tppoanp_2->close();
delete tppoanp_2;
#pragma omp barrier
@@ -563,57 +587,64 @@ void cluster(const string& config_file, const string& data_file, const string& o
} // closes pragma omp parallel
#pragma omp barrier
{
for (int ti=1; ti<ompnumthreads; ti++) {
p_outarray[0]->append(*(p_outarray[ti]));
delete p_outarray[ti];
}
p_outarray[0]->mpisend(mpidata);
delete p_outarray[0];
delete[] p_outarray;
// // tell MPI process 0 how many threads we have on this process (not necessarily the same across all processes)
MPI_Send(&ompnumthreads, 1, MPI_INT, 0, 1, MPI_COMM_WORLD);
// // reopen local files, send them all to MPI process 0
for (int ri = 0; ri < ompnumthreads; ri++) {
// string partial_file_name = output_path + "/c_OCLU_" + to_string(mpidata->rank) + "_" + to_string(ri);
// string message = "Copying ASCII output in MPI process " + to_string(mpidata->rank) + " of thread " + to_string(ri) + " of " + to_string(ompnumthreads - 1) + "... ";
// logger->log(message, LOG_DEBG);
// fstream partial_output;
// partial_output.open(partial_file_name.c_str(), ios::in);
// partial_output.seekg(0, ios::end);
// const long partial_output_size = partial_output.tellg();
// partial_output.close();
// partial_output.open(partial_file_name.c_str(), ios::in);
// int chunk_buffer_size = 25165824; // Length of char array with 24Mb size
// char *chunk_buffer = new char[chunk_buffer_size]();
// int full_chunks = (int)(partial_output_size / chunk_buffer_size);
// for (int fi = 0; fi < full_chunks; fi++) {
// partial_output.read(chunk_buffer, chunk_buffer_size);
// // If EOF is reached, do not send EOF character.
// long ptr_position = partial_output.tellg();
// if (ptr_position == partial_output_size) {
// chunk_buffer[chunk_buffer_size - 1] = '\0';
// }
// // Send the size of the buffer that is being transmitted (Node-0 does not know whether it is full or not)
// MPI_Send(&chunk_buffer_size, 1, MPI_INT, 0, 1, MPI_COMM_WORLD);
// // Actually send the file contents to Node-0
// MPI_Send(chunk_buffer, chunk_buffer_size, MPI_CHAR, 0, 0, MPI_COMM_WORLD);
// }
// long ptr_position = partial_output.tellg();
// if (ptr_position < partial_output_size) {
// // Send the last partial buffer
// chunk_buffer_size = partial_output_size - ptr_position;
// delete[] chunk_buffer;
// chunk_buffer = new char[chunk_buffer_size];
// partial_output.read(chunk_buffer, chunk_buffer_size);
// // chunk_buffer[chunk_buffer_size - 1] = '\0';
// // Send the size of the buffer that is being transmitted (Node-0 does not know whether it is full or not)
// MPI_Send(&chunk_buffer_size, 1, MPI_INT, 0, 1, MPI_COMM_WORLD);
// // Actually send the file contents to Node-0
// MPI_Send(chunk_buffer, chunk_buffer_size, MPI_CHAR, 0, 0, MPI_COMM_WORLD);
// }
// // Send a size 0 flag to inform Node-0 that the transmission is complete
// chunk_buffer_size = 0;
// MPI_Send(&chunk_buffer_size, 1, MPI_INT, 0, 1, MPI_COMM_WORLD);
// partial_output.close();
// delete[] chunk_buffer;
// remove(partial_file_name.c_str());
// logger->log("done.\n", LOG_DEBG);
string partial_file_name = output_path + "/c_TPPOAN_" + to_string(mpidata->rank) + "_" + to_string(ri);
string message = "Copying binary output in MPI process " + to_string(mpidata->rank) + " of thread " + to_string(ri) + " of " + to_string(ompnumthreads - 1) + "... ";
logger->log(message, LOG_DEBG);
fstream partial_tppoan;
partial_tppoan.open(partial_file_name.c_str(), ios::in | ios::binary);
...
@@ -7,6 +7,8 @@
#include <vector>
class mixMPI;
/*! \class FileSchema
 *
 * \brief File content descriptor.
@@ -166,12 +168,12 @@ class HDFFile {
class VirtualAsciiFile {
protected:
//! \brief The number of lines.
// int32_t _num_lines;
//! \brief A vector of strings representing the file lines.
std::vector<std::string> *_file_lines;
public:
// const int32_t &num_lines = _num_lines;
/*! \brief VirtualAsciiFile instance constructor.
 *
 * \param lines: `int32_t` Number of lines, if known in advance (optional, default is 0).
@@ -184,6 +186,13 @@
 */
VirtualAsciiFile(const VirtualAsciiFile& rhs);
/*! \brief VirtualAsciiFile instance constructor that copies all contents received via MPI_Send() calls from MPI process rr.
 *
 * \param mpidata: `mixMPI *` pointer to MPI data structure.
 * \param rr: `int` rank of the MPI process sending the data.
 */
VirtualAsciiFile(const mixMPI *mpidata, int rr);
/*! \brief VirtualAsciiFile instance destroyer.
 */
~VirtualAsciiFile();
@@ -192,7 +201,7 @@
 *
 * \param rhs: `const VirtualAsciiFile&` Reference to the VirtualAsciiFile to be appended.
 */
void append(VirtualAsciiFile& rhs);
/*! \brief Append a line at the end of the file.
 *
@@ -236,5 +245,153 @@
 * \return result: `int` A result code (0 if successful).
 */
int write_to_disk(const std::string& file_name);
/*! \brief Send the VirtualAsciiFile instance to MPI process 0 via MPI_Send() calls.
 *
 * \param mpidata: `mixMPI *` pointer to MPI data structure.
 */
void mpisend(const mixMPI *mpidata);
};
#endif
// /*! \class VirtualBinaryLine
// *
// * \brief Virtual representation of a binary file line.
// */
// class VirtualBinaryLine {
// protected:
// //! \brief The pointer to the piece of data to be written, cast to char *
// char *_data_pointer;
// //! \brief the size of the data block.
// size_t _data_size;
// /*! \brief VirtualBinaryLine instance constructor.
// *
// * \param mydata: `int, double, long, float, complex, or dcomplex`piece of data to put in the line.
// */
// VirtualBinaryLine(int mydata);
// VirtualBinaryLine(long mydata);
// VirtualBinaryLine(float mydata);
// VirtualBinaryLine(double mydata);
// VirtualBinaryLine(complex mydata);
// VirtualBinaryLine(dcomplex mydata);
// /*! \brief VirtualBinaryLine copy constructor.
// *
// * \param rhs: `const VirtualBinaryLine&` Reference to a VirtualBinaryLine instance.
// */
// VirtualBinaryLine(const VirtualBinaryLine& rhs);
// /*! \brief VirtualBinaryLine instance constructor copying all contents off MPISend() calls from MPI process rr.
// *
// * \param mpidata: `mixMPI *` pointer to MPI data structure.
// * \param rr: `int` rank of the MPI process sending the data.
// */
// VirtualBinaryLine(const mixMPI *mpidata, int rr);
// /*! \brief VirtualBinaryLine instance destroyer.
// */
// ~VirtualBinaryLine();
// /*! \brief Send VirtualBinaryLine instance to MPI process 0 via MPISend() calls.
// *
// * \param mpidata: `mixMPI *` pointer to MPI data structure.
// */
// void mpisend(const mixMPI *mpidata);
// };
// /*! \class VirtualBinaryFile
// *
// * \brief Virtual representation of a binary file.
// */
// class VirtualBinaryFile {
// protected:
// //! \brief The number of lines.
// // int32_t _num_lines;
// //! \brief A vector of strings representing the file lines.
// std::vector<VirtualBinaryLine> *_file_lines;
// public:
// // const int32_t &num_lines = _num_lines;
// /*! \brief VirtualBinaryFile instance constructor.
// *
// * \param lines: `int32_t` Number of lines, if known in advance (optional, default is 0).
// */
// VirtualBinaryFile(int32_t lines = 0);
// /*! \brief VirtualBinaryFile copy constructor.
// *
// * \param rhs: `const VirtualBinaryFile&` Reference to a VirtualBinaryFile instance.
// */
// VirtualBinaryFile(const VirtualBinaryFile& rhs);
// /*! \brief VirtualBinaryFile instance constructor copying all contents off MPISend() calls from MPI process rr.
// *
// * \param mpidata: `mixMPI *` pointer to MPI data structure.
// * \param rr: `int` rank of the MPI process sending the data.
// */
// VirtualBinaryFile(const mixMPI *mpidata, int rr);
// /*! \brief VirtualBinaryFile instance destroyer.
// */
// ~VirtualBinaryFile();
// /*! \brief Append another VirtualBinaryFile at the end of the current instance.
// *
// * \param rhs: `const VirtualBinaryFile&` Reference to the VirtualBinaryFile to be appended.
// */
// void append(VirtualBinaryFile& rhs);
// /*! \brief Append a line at the end of the file.
// *
// * \param line: `const string&` Reference to a string representing the line.
// */
// void append_line(const std::string& line);
// /*! \brief Append the contents of the VirtualBinaryFile to a physical file on disk.
// *
// * \param file_name: `const string&` Name of the file to append contents to.
// * \return result: `int` A result code (0 if successful).
// */
// int append_to_disk(const std::string& file_name);
// /*! \brief Insert another VirtualBinaryFile at a given position.
// *
// * This function inserts a target VirtualBinaryFile in the current one at the given
// * position. Optionally, a range of lines to be inserted can be specified, otherwise
// * the full content of the target file is inserted. This function DOES NOT increase
// * the size of the inner storage and it can only be used if the inner storage has
// * already been adjusted to contain the insertion target.
// *
// * \param position: `int32_t` The position at which the other file is inserted in this one.
// * \param rhs: `const VirtualBinaryFile&` The refence to the VirtualBinaryFile to be inserted.
// * \param start: `int32_t` The first line to be inserted (optional, default is 0).
// * \param end: `int32_t` The last line to be inserted (optional, default is 0 to read all).
// * \param line: `const string&` Reference to a string representing the line.
// * \return result: `int` A result code (0 if successful).
// */
// int insert(int32_t position, VirtualBinaryFile& rhs, int32_t start = 0, int32_t end = 0);
// /*! \brief Get the number of lines in the current instance.
// *
// * \return size: `int32_t` The number of lines in the VirtualBinaryFile instance.
// */
// int32_t number_of_lines() { return _file_lines->size(); }
// /*! \brief Write virtual file contents to a real file on disk.
// *
// * \param file_name: `const string&` Name of the file to append contents to.
// * \return result: `int` A result code (0 if successful).
// */
// int write_to_disk(const std::string& file_name);
// /*! \brief Send VirtualBinaryFile instance to MPI process 0 via MPISend() calls.
// *
// * \param mpidata: `mixMPI *` pointer to MPI data structure.
// */
// void mpisend(const mixMPI *mpidata);
// };
//#endif
@@ -20,6 +20,10 @@
#include "../include/file_io.h"
#endif
#ifdef USE_MPI
#include <mpi.h>
#endif
using namespace std;
/* >>> FileSchema class implementation <<< */
@@ -237,13 +241,34 @@ VirtualAsciiFile::VirtualAsciiFile(int32_t lines) {
}
VirtualAsciiFile::VirtualAsciiFile(const VirtualAsciiFile& rhs) {
// _num_lines = rhs._num_lines;
_file_lines = new vector<string>();
for (vector<string>::iterator it = rhs._file_lines->begin(); it != rhs._file_lines->end(); ++it) {
_file_lines->push_back(*it);
}
}
#ifdef MPI_VERSION
VirtualAsciiFile::VirtualAsciiFile(const mixMPI *mpidata, int rr) {
// receive _num_lines from MPI process rr
int32_t num_lines;
MPI_Recv(&num_lines, 1, MPI_INT32_T, rr, 10, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
int32_t mysize;
_file_lines = new vector<string>();
// loop over data to receive
for (int32_t zi=0; zi<num_lines; zi++) {
// receive the size of the string to receive
MPI_Recv(&mysize, 1, MPI_INT32_T, rr, 10, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
// allocate the buffer accordingly
char buffer[mysize+1];
// receive the char buffer
MPI_Recv(buffer, mysize+1, MPI_CHAR, rr, 10, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
// append to the vector
_file_lines->push_back(buffer);
}
}
#endif
VirtualAsciiFile::~VirtualAsciiFile() {
// is it necessary to pop them out one by one? isn't there the dedicated method of std::vector to clean the vector?
// besides, shouldn't this be done anyway by the destructor of std:vector?
@@ -253,7 +278,7 @@ VirtualAsciiFile::~VirtualAsciiFile() {
if (_file_lines != NULL) delete _file_lines;
}
void VirtualAsciiFile::append(VirtualAsciiFile& rhs) {
// concatenate the virtualasciifile pointed by rhs to the current one
// can't we use the dedicated method insert of std::vector to do the appending, instead of an explicit loop?
for (vector<string>::iterator it = rhs._file_lines->begin(); it != rhs._file_lines->end(); ++it) {
@@ -262,6 +287,7 @@ void VirtualAsciiFile::append(const VirtualAsciiFile& rhs) {
}
void VirtualAsciiFile::append_line(const string& line) {
// would it be worth reimplementing a sprintf-like method, so that we can give it all the arguments we would give to sprintf and get rid of the intermediate buffer completely?
// append a line of output to the virtualasciifile
_file_lines->push_back(line);
}
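(On the question in the comment above about a sprintf-like method: a minimal sketch, not part of this commit, of a printf-style append that formats straight into the line vector; the function name is made up for illustration.)

// Illustrative only, not part of this commit: a hypothetical printf-style append
// that measures the formatted length first, so no fixed intermediate buffer is needed.
#include <cstdarg>
#include <cstdio>
#include <string>
#include <vector>

void append_formatted(std::vector<std::string> &lines, const char *fmt, ...) {
  va_list args, args_copy;
  va_start(args, fmt);
  va_copy(args_copy, args);
  int needed = std::vsnprintf(NULL, 0, fmt, args);         // length the formatted line will need
  std::vector<char> buf(needed + 1);
  std::vsnprintf(buf.data(), buf.size(), fmt, args_copy);  // format into an exact-size buffer
  va_end(args_copy);
  va_end(args);
  lines.push_back(std::string(buf.data()));
}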
@@ -313,4 +339,67 @@ int VirtualAsciiFile::write_to_disk(const std::string& file_name) {
}
return result;
}
#ifdef MPI_VERSION
void VirtualAsciiFile::mpisend(const mixMPI *mpidata) {
// Send VirtualAsciiFile instance to MPI process 0 via MPISend() calls
// first send the size
int32_t num_lines = _file_lines->size();
MPI_Send(&num_lines, 1, MPI_INT32_T, 0, 10, MPI_COMM_WORLD);
// now loop over data to send
int32_t mysize;
for (vector<string>::iterator it = _file_lines->begin(); it != _file_lines->end(); ++it) {
// send the length of each string
mysize = (int32_t) it->size();
MPI_Send(&mysize, 1, MPI_INT32_T, 0, 10, MPI_COMM_WORLD);
// send the string itself
MPI_Send(it->c_str(), mysize+1, MPI_CHAR, 0, 10, MPI_COMM_WORLD);
}
}
#endif
/* >>> End of VirtualAsciiFile class implementation <<< */
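(Putting the new pieces together: a minimal sketch, not part of this commit, of how the receiving constructor and mpisend() above are meant to cooperate across MPI ranks; the function name is made up for illustration, and it assumes MPI support is compiled in.)

// Illustrative only, not part of this commit: every rank assembles its ASCII output
// in a VirtualAsciiFile; rank 0 writes its own copy, then receives one virtual file
// per remote rank (via the new constructor) and appends it to the same file on disk.
#include <string>
#include "../include/file_io.h"

void gather_ascii_output(const mixMPI *mpidata, const std::string &output_path,
                         VirtualAsciiFile *local_output) {
  if (mpidata->rank == 0) {
    local_output->write_to_disk(output_path + "/c_OCLU");
    for (int rr = 1; rr < mpidata->nprocs; rr++) {
      VirtualAsciiFile *remote = new VirtualAsciiFile(mpidata, rr); // blocks in MPI_Recv
      remote->append_to_disk(output_path + "/c_OCLU");
      delete remote;
    }
  } else {
    local_output->mpisend(mpidata); // paired with the receiving constructor on rank 0
  }
}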
// /* >>> VirtualBinaryLine class implementation <<< */
// VirtualBinaryLine::VirtualBinaryLine(int mydata) {
//   _data_size = sizeof(mydata);
//   int *buffer = (int *) malloc(_data_size);
//   *buffer = mydata;
//   _data_pointer = reinterpret_cast<char *>(buffer);
// }
// VirtualBinaryLine::VirtualBinaryLine(double mydata) {
//   _data_size = sizeof(mydata);
//   double *buffer = (double *) malloc(_data_size);
//   *buffer = mydata;
//   _data_pointer = reinterpret_cast<char *>(buffer);
// }
// VirtualBinaryLine::VirtualBinaryLine(float mydata) {
//   _data_size = sizeof(mydata);
//   float *buffer = (float *) malloc(_data_size);
//   *buffer = mydata;
//   _data_pointer = reinterpret_cast<char *>(buffer);
// }
// VirtualBinaryLine::VirtualBinaryLine(long mydata) {
//   _data_size = sizeof(mydata);
//   long *buffer = (long *) malloc(_data_size);
//   *buffer = mydata;
//   _data_pointer = reinterpret_cast<char *>(buffer);
// }
// VirtualBinaryLine::VirtualBinaryLine(dcomplex mydata) {
//   _data_size = sizeof(mydata);
//   dcomplex *buffer = (dcomplex *) malloc(_data_size);
//   *buffer = mydata;
//   _data_pointer = reinterpret_cast<char *>(buffer);
// }
// VirtualBinaryLine::VirtualBinaryLine(complex mydata) {
//   _data_size = sizeof(mydata);
//   complex *buffer = (complex *) malloc(_data_size);
//   *buffer = mydata;
//   _data_pointer = reinterpret_cast<char *>(buffer);
// }