// NOTE(review): what follows is a unified diff of src/cluster/cluster.cpp whose line breaks were collapsed.
// Only fragments of cluster() are visible in the hunks, so these notes cover the visible changes only.
//
// Visible changes:
//  * Removes the function-scope `string message;` declaration and two stale "Giovanni, please add here..." comments.
//  * Reformats the `rr` and `ri` loop headers in the MPI receive path (spacing only).
//  * Replaces the per-character transfer of each per-thread ASCII output file (fgetc + MPI_Send of one int,
//    ended by EOF) with a chunked protocol: the sender sends the chunk size as one MPI_INT on tag 1, then the
//    chunk bytes as MPI_CHAR on tag 0, and finally a size of 0 to mark the end of the file. The receiver
//    loops on the size message until it reads 0.
//  * The sender chunk size is 25165824 chars (24 * 1024 * 1024).
//
// Points to confirm before merging:
//  * Sender: the stream is positioned at the end with seekg(0, ios::end) to measure the file, and no seek back
//    to the beginning is visible before the read() calls - confirm the reads really start at offset 0.
//  * Receiver: each chunk is received into a buffer of exactly chunk_buffer_size chars and written with fputs(),
//    which expects a NUL-terminated string; no terminator is added in the visible code. A length-based write
//    such as fwrite(chunk_buffer, 1, chunk_buffer_size, output) looks safer - confirm.
//  * Receiver: the outer `char *chunk_buffer;` is shadowed by the declaration inside the while loop and is not
//    used in the visible code.
//  * Sender: the removed lines called remove(partial_file_name.c_str()) and logged "done.\n" after the ASCII
//    transfer; the added lines show neither - confirm the temporary file is still cleaned up elsewhere.
//  * Sender: `chunk_buffer_size = partial_output_size - ptr_position;` narrows long to int; presumably fine
//    because the remainder should be smaller than one chunk, but only if the preceding reads succeeded.
diff --git a/src/cluster/cluster.cpp b/src/cluster/cluster.cpp index 52869a5940e7f356fe7308486a8d20437635315c..bcc959da32dd4748ab05d4724409593799e4de29 100644 --- a/src/cluster/cluster.cpp +++ b/src/cluster/cluster.cpp @@ -69,7 +69,6 @@ int cluster_jxi488_cycle(int jxi488, ScattererConfiguration *sconf, GeometryConf void cluster(const string& config_file, const string& data_file, const string& output_path, const mixMPI *mpidata) { chrono::time_point<chrono::high_resolution_clock> t_start = chrono::high_resolution_clock::now(); chrono::duration<double> elapsed; - string message; string timing_name = output_path + "/c_timing_mpi"+ to_string(mpidata->rank) +".log"; FILE *timing_file = fopen(timing_name.c_str(), "w"); Logger *time_logger = new Logger(LOG_DEBG, timing_file); @@ -260,7 +259,6 @@ void cluster(const string& config_file, const string& data_file, const string& o { // thread 0 already wrote on global files, skip it and take care of appending the others for (int ri = 1; ri < ompnumthreads; ri++) { - // Giovanni, please add here in this loop code to reopen the temporary files, reread them and append them respectively to the global output and tppoan, before closing them string partial_file_name = output_path + "/c_OCLU_" + to_string(mpidata->rank) + "_" + to_string(ri); string message = "Copying ASCII output in MPI process " + to_string(mpidata->rank) + " of thread " + to_string(ri) + " of " + to_string(ompnumthreads - 1) + "... "; logger->log(message, LOG_DEBG); @@ -295,19 +293,23 @@ void cluster(const string& config_file, const string& data_file, const string& o #ifdef MPI_VERSION if (mpidata->mpirunning) { // only go through this if MPI has been actually used - for (int rr=1; rr<mpidata->nprocs; rr++) { + for (int rr=1; rr < mpidata->nprocs; rr++) { // get the data from process rr // how many openmp threads did process rr use? 
int remotethreads; MPI_Recv(&remotethreads, 1, MPI_INT, rr, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE); - for (int ri=0; ri<remotethreads; ri++) { - // first get the ASCII local file - int c = 0; - MPI_Recv(&c, 1, MPI_INT, rr, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE); - while (c != EOF) { - fputc(c, output); - MPI_Recv(&c, 1, MPI_INT, rr, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + for (int ri = 0; ri < remotethreads; ri++) { + char *chunk_buffer; + int chunk_buffer_size = -1; + MPI_Recv(&chunk_buffer_size, 1, MPI_INT, rr, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + while (chunk_buffer_size != 0) { + char *chunk_buffer = new char[chunk_buffer_size]; + MPI_Recv(chunk_buffer, chunk_buffer_size, MPI_CHAR, rr, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + fputs(chunk_buffer, output); + delete[] chunk_buffer; + MPI_Recv(&chunk_buffer_size, 1, MPI_INT, rr, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE); } + // now get the binary local file long buffer_size = 0; // get the size of the buffer @@ -419,19 +421,39 @@ void cluster(const string& config_file, const string& data_file, const string& o MPI_Send(&ompnumthreads, 1, MPI_INT, 0, 1, MPI_COMM_WORLD); // reopen local files, send them all to MPI process 0 for (int ri = 0; ri < ompnumthreads; ri++) { - // Giovanni, please add here in this loop code to reopen the temporary files, reread them and append them respectively to the global output and tppoan, before closing them string partial_file_name = output_path + "/c_OCLU_" + to_string(mpidata->rank) + "_" + to_string(ri); string message = "Copying ASCII output in MPI process " + to_string(mpidata->rank) + " of thread " + to_string(ri) + " of " + to_string(ompnumthreads - 1) + "... 
"; logger->log(message, LOG_DEBG); - FILE *partial_output = fopen(partial_file_name.c_str(), "r"); - int c = 0; - while (c != EOF) { - c = fgetc(partial_output); - MPI_Send(&c, 1, MPI_INT, 0, 0, MPI_COMM_WORLD); + fstream partial_output; + partial_output.open(partial_file_name.c_str(), ios::in | ios::binary); + partial_output.seekg(0, ios::end); + const long partial_output_size = partial_output.tellg(); + int chunk_buffer_size = 25165824; // Length of char array with 24Mb size + char *chunk_buffer = new char[chunk_buffer_size](); + int full_chunks = (int)(partial_output_size / chunk_buffer_size); + for (int fi = 0; fi < full_chunks; fi++) { + partial_output.read(chunk_buffer, chunk_buffer_size); + // Send the size of the buffer that is being transmitted (Node-0 does not know whether it is full or not) + MPI_Send(&chunk_buffer_size, 1, MPI_INT, 0, 1, MPI_COMM_WORLD); + // Actually send the file contents to Node-0 + MPI_Send(chunk_buffer, chunk_buffer_size, MPI_CHAR, 0, 0, MPI_COMM_WORLD); } - fclose(partial_output); - remove(partial_file_name.c_str()); - logger->log("done.\n", LOG_DEBG); + long ptr_position = partial_output.tellg(); + if (ptr_position < partial_output_size) { + // Send the last partial buffer + chunk_buffer_size = partial_output_size - ptr_position; + partial_output.read(chunk_buffer, chunk_buffer_size); + // Send the size of the buffer that is being transmitted (Node-0 does not know whether it is full or not) + MPI_Send(&chunk_buffer_size, 1, MPI_INT, 0, 1, MPI_COMM_WORLD); + // Actually send the file contents to Node-0 + MPI_Send(chunk_buffer, chunk_buffer_size, MPI_CHAR, 0, 0, MPI_COMM_WORLD); + } + // Send a size 0 flag to inform Node-0 that the transmission is complete + chunk_buffer_size = 0; + MPI_Send(&chunk_buffer_size, 1, MPI_INT, 0, 1, MPI_COMM_WORLD); + partial_output.close(); + delete[] chunk_buffer; + partial_file_name = output_path + "/c_TPPOAN_" + to_string(mpidata->rank) + "_" + to_string(ri); message = "Copying binary output in MPI 
process " + to_string(mpidata->rank) + " of thread " + to_string(ri) + " of " + to_string(ompnumthreads - 1) + "... "; logger->log(message, LOG_DEBG);