Skip to content
Snippets Groups Projects
Commit b1b06a1b authored by Giovanni La Mura's avatar Giovanni La Mura
Browse files

Recombine ASCII output files in chunks of 24Mb

parent 1a4c8903
No related branches found
No related tags found
No related merge requests found
...@@ -69,7 +69,6 @@ int cluster_jxi488_cycle(int jxi488, ScattererConfiguration *sconf, GeometryConf ...@@ -69,7 +69,6 @@ int cluster_jxi488_cycle(int jxi488, ScattererConfiguration *sconf, GeometryConf
void cluster(const string& config_file, const string& data_file, const string& output_path, const mixMPI *mpidata) { void cluster(const string& config_file, const string& data_file, const string& output_path, const mixMPI *mpidata) {
chrono::time_point<chrono::high_resolution_clock> t_start = chrono::high_resolution_clock::now(); chrono::time_point<chrono::high_resolution_clock> t_start = chrono::high_resolution_clock::now();
chrono::duration<double> elapsed; chrono::duration<double> elapsed;
string message;
string timing_name = output_path + "/c_timing_mpi"+ to_string(mpidata->rank) +".log"; string timing_name = output_path + "/c_timing_mpi"+ to_string(mpidata->rank) +".log";
FILE *timing_file = fopen(timing_name.c_str(), "w"); FILE *timing_file = fopen(timing_name.c_str(), "w");
Logger *time_logger = new Logger(LOG_DEBG, timing_file); Logger *time_logger = new Logger(LOG_DEBG, timing_file);
...@@ -260,7 +259,6 @@ void cluster(const string& config_file, const string& data_file, const string& o ...@@ -260,7 +259,6 @@ void cluster(const string& config_file, const string& data_file, const string& o
{ {
// thread 0 already wrote on global files, skip it and take care of appending the others // thread 0 already wrote on global files, skip it and take care of appending the others
for (int ri = 1; ri < ompnumthreads; ri++) { for (int ri = 1; ri < ompnumthreads; ri++) {
// Giovanni, please add here in this loop code to reopen the temporary files, reread them and append them respectively to the global output and tppoan, before closing them
string partial_file_name = output_path + "/c_OCLU_" + to_string(mpidata->rank) + "_" + to_string(ri); string partial_file_name = output_path + "/c_OCLU_" + to_string(mpidata->rank) + "_" + to_string(ri);
string message = "Copying ASCII output in MPI process " + to_string(mpidata->rank) + " of thread " + to_string(ri) + " of " + to_string(ompnumthreads - 1) + "... "; string message = "Copying ASCII output in MPI process " + to_string(mpidata->rank) + " of thread " + to_string(ri) + " of " + to_string(ompnumthreads - 1) + "... ";
logger->log(message, LOG_DEBG); logger->log(message, LOG_DEBG);
...@@ -301,13 +299,17 @@ void cluster(const string& config_file, const string& data_file, const string& o ...@@ -301,13 +299,17 @@ void cluster(const string& config_file, const string& data_file, const string& o
int remotethreads; int remotethreads;
MPI_Recv(&remotethreads, 1, MPI_INT, rr, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE); MPI_Recv(&remotethreads, 1, MPI_INT, rr, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
for (int ri = 0; ri < remotethreads; ri++) { for (int ri = 0; ri < remotethreads; ri++) {
// first get the ASCII local file char *chunk_buffer;
int c = 0; int chunk_buffer_size = -1;
MPI_Recv(&c, 1, MPI_INT, rr, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE); MPI_Recv(&chunk_buffer_size, 1, MPI_INT, rr, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
while (c != EOF) { while (chunk_buffer_size != 0) {
fputc(c, output); char *chunk_buffer = new char[chunk_buffer_size];
MPI_Recv(&c, 1, MPI_INT, rr, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE); MPI_Recv(chunk_buffer, chunk_buffer_size, MPI_CHAR, rr, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
fputs(chunk_buffer, output);
delete[] chunk_buffer;
MPI_Recv(&chunk_buffer_size, 1, MPI_INT, rr, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
} }
// now get the binary local file // now get the binary local file
long buffer_size = 0; long buffer_size = 0;
// get the size of the buffer // get the size of the buffer
...@@ -419,19 +421,39 @@ void cluster(const string& config_file, const string& data_file, const string& o ...@@ -419,19 +421,39 @@ void cluster(const string& config_file, const string& data_file, const string& o
MPI_Send(&ompnumthreads, 1, MPI_INT, 0, 1, MPI_COMM_WORLD); MPI_Send(&ompnumthreads, 1, MPI_INT, 0, 1, MPI_COMM_WORLD);
// reopen local files, send them all to MPI process 0 // reopen local files, send them all to MPI process 0
for (int ri = 0; ri < ompnumthreads; ri++) { for (int ri = 0; ri < ompnumthreads; ri++) {
// Giovanni, please add here in this loop code to reopen the temporary files, reread them and append them respectively to the global output and tppoan, before closing them
string partial_file_name = output_path + "/c_OCLU_" + to_string(mpidata->rank) + "_" + to_string(ri); string partial_file_name = output_path + "/c_OCLU_" + to_string(mpidata->rank) + "_" + to_string(ri);
string message = "Copying ASCII output in MPI process " + to_string(mpidata->rank) + " of thread " + to_string(ri) + " of " + to_string(ompnumthreads - 1) + "... "; string message = "Copying ASCII output in MPI process " + to_string(mpidata->rank) + " of thread " + to_string(ri) + " of " + to_string(ompnumthreads - 1) + "... ";
logger->log(message, LOG_DEBG); logger->log(message, LOG_DEBG);
FILE *partial_output = fopen(partial_file_name.c_str(), "r"); fstream partial_output;
int c = 0; partial_output.open(partial_file_name.c_str(), ios::in | ios::binary);
while (c != EOF) { partial_output.seekg(0, ios::end);
c = fgetc(partial_output); const long partial_output_size = partial_output.tellg();
MPI_Send(&c, 1, MPI_INT, 0, 0, MPI_COMM_WORLD); int chunk_buffer_size = 25165824; // Length of char array with 24Mb size
} char *chunk_buffer = new char[chunk_buffer_size]();
fclose(partial_output); int full_chunks = (int)(partial_output_size / chunk_buffer_size);
remove(partial_file_name.c_str()); for (int fi = 0; fi < full_chunks; fi++) {
logger->log("done.\n", LOG_DEBG); partial_output.read(chunk_buffer, chunk_buffer_size);
// Send the size of the buffer that is being transmitted (Node-0 does not know whether it is full or not)
MPI_Send(&chunk_buffer_size, 1, MPI_INT, 0, 1, MPI_COMM_WORLD);
// Actually send the file contents to Node-0
MPI_Send(chunk_buffer, chunk_buffer_size, MPI_CHAR, 0, 0, MPI_COMM_WORLD);
}
long ptr_position = partial_output.tellg();
if (ptr_position < partial_output_size) {
// Send the last partial buffer
chunk_buffer_size = partial_output_size - ptr_position;
partial_output.read(chunk_buffer, chunk_buffer_size);
// Send the size of the buffer that is being transmitted (Node-0 does not know whether it is full or not)
MPI_Send(&chunk_buffer_size, 1, MPI_INT, 0, 1, MPI_COMM_WORLD);
// Actually send the file contents to Node-0
MPI_Send(chunk_buffer, chunk_buffer_size, MPI_CHAR, 0, 0, MPI_COMM_WORLD);
}
// Send a size 0 flag to inform Node-0 that the transmission is complete
chunk_buffer_size = 0;
MPI_Send(&chunk_buffer_size, 1, MPI_INT, 0, 1, MPI_COMM_WORLD);
partial_output.close();
delete[] chunk_buffer;
partial_file_name = output_path + "/c_TPPOAN_" + to_string(mpidata->rank) + "_" + to_string(ri); partial_file_name = output_path + "/c_TPPOAN_" + to_string(mpidata->rank) + "_" + to_string(ri);
message = "Copying binary output in MPI process " + to_string(mpidata->rank) + " of thread " + to_string(ri) + " of " + to_string(ompnumthreads - 1) + "... "; message = "Copying binary output in MPI process " + to_string(mpidata->rank) + " of thread " + to_string(ri) + " of " + to_string(ompnumthreads - 1) + "... ";
logger->log(message, LOG_DEBG); logger->log(message, LOG_DEBG);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment