diff --git a/src/cluster/cluster.cpp b/src/cluster/cluster.cpp index 602f9ea238418b540593b9281f529a3a4d76dc71..faabf2163db9b6902b868877742d5115144ec6d3 100644 --- a/src/cluster/cluster.cpp +++ b/src/cluster/cluster.cpp @@ -1,4 +1,4 @@ -/* Copyright 2004 INAF - Osservatorio Astronomico di Cagliari +/* Copyright 2024 INAF - Osservatorio Astronomico di Cagliari Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -25,6 +25,11 @@ #ifdef _OPENMP #include <omp.h> #endif +#ifdef USE_MPI +#ifndef MPI_VERSION +#include <mpi.h> +#endif +#endif #ifndef INCLUDE_TYPES_H_ #include "../include/types.h" @@ -74,251 +79,438 @@ int cluster_jxi488_cycle(int jxi488, ScattererConfiguration *sconf, GeometryConf * \param data_file: `string` Name of the input data file. * \param output_path: `string` Directory to write the output files in. */ -void cluster(const string& config_file, const string& data_file, const string& output_path) { +void cluster(const string& config_file, const string& data_file, const string& output_path, const mixMPI *mpidata) { chrono::time_point<chrono::high_resolution_clock> t_start = chrono::high_resolution_clock::now(); chrono::duration<double> elapsed; string message; - string timing_name = output_path + "/c_timing.log"; + string timing_name = output_path + "/c_timing_mpi"+ to_string(mpidata->rank) +".log"; FILE *timing_file = fopen(timing_name.c_str(), "w"); Logger *time_logger = new Logger(LOG_DEBG, timing_file); Logger *logger = new Logger(LOG_INFO); - logger->log("INFO: making legacy configuration...", LOG_INFO); - ScattererConfiguration *sconf = NULL; - try { - sconf = ScattererConfiguration::from_dedfb(config_file); - } catch(const OpenConfigurationFileException &ex) { - logger->err("\nERROR: failed to open scatterer configuration file.\n"); - string message = "FILE: " + string(ex.what()) + "\n"; - logger->err(message); - exit(1); - } - sconf->write_formatted(output_path + "/c_OEDFB"); - sconf->write_binary(output_path + "/c_TEDF"); - sconf->write_binary(output_path + "/c_TEDF.hd5", "HDF5"); - GeometryConfiguration *gconf = NULL; - try { - gconf = GeometryConfiguration::from_legacy(data_file); - } catch (const OpenConfigurationFileException &ex) { - logger->err("\nERROR: failed to open geometry configuration file.\n"); - string message = "FILE: " + string(ex.what()) + "\n"; - logger->err(message); - if (sconf) delete sconf; - exit(1); - } - logger->log(" done.\n", LOG_INFO); - int s_nsph = sconf->number_of_spheres; - int nsph = gconf->number_of_spheres; - if (s_nsph == nsph) { - // Shortcuts to variables stored in configuration objects - ScatteringAngles *p_scattering_angles = new ScatteringAngles(gconf); - double wp = sconf->wp; - FILE *output = fopen((output_path + "/c_OCLU").c_str(), "w"); - ClusterIterationData *cid = new ClusterIterationData(gconf, sconf); - const int ndi = cid->c4->nsph * cid->c4->nlim; - np_int ndit = 2 * ndi; - logger->log("INFO: Size of matrices to invert: " + to_string((int64_t)ndit) + " x " + to_string((int64_t)ndit) +".\n"); - time_logger->log("INFO: Size of matrices to invert: " + to_string((int64_t)ndit) + " x " + to_string((int64_t)ndit) +".\n"); - fprintf(output, " READ(IR,*)NSPH,LI,LE,MXNDM,INPOL,NPNT,NPNTTS,IAVM,ISAM\n"); - fprintf(output, " %5d%5d%5d%5ld%5d%5d%5d%5d%5d\n", - nsph, cid->c4->li, cid->c4->le, gconf->mxndm, gconf->in_pol, gconf->npnt, - gconf->npntts, gconf->iavm, gconf->iavm - ); - fprintf(output, " READ(IR,*)RXX(I),RYY(I),RZZ(I)\n"); - for (int ri = 0; ri < nsph; ri++) fprintf(output, "%17.8lE%17.8lE%17.8lE\n", - gconf->get_sph_x(ri), gconf->get_sph_y(ri), gconf->get_sph_z(ri) - ); - fprintf(output, " READ(IR,*)TH,THSTP,THLST,THS,THSSTP,THSLST\n"); - fprintf( - output, " %10.3lE%10.3lE%10.3lE%10.3lE%10.3lE%10.3lE\n", - p_scattering_angles->th, p_scattering_angles->thstp, - p_scattering_angles->thlst, p_scattering_angles->ths, - p_scattering_angles->thsstp, p_scattering_angles->thslst - ); - fprintf(output, " READ(IR,*)PH,PHSTP,PHLST,PHS,PHSSTP,PHSLST\n"); - fprintf( - output, " %10.3lE%10.3lE%10.3lE%10.3lE%10.3lE%10.3lE\n", - p_scattering_angles->ph, p_scattering_angles->phstp, - p_scattering_angles->phlst, p_scattering_angles->phs, - p_scattering_angles->phsstp, p_scattering_angles->phslst - ); - fprintf(output, " READ(IR,*)JWTM\n"); - fprintf(output, " %5d\n", gconf->jwtm); - fprintf(output, " READ(ITIN)NSPHT\n"); - fprintf(output, " READ(ITIN)(IOG(I),I=1,NSPH)\n"); - fprintf(output, " READ(ITIN)EXDC,WP,XIP,IDFC,NXI\n"); - fprintf(output, " READ(ITIN)(XIV(I),I=1,NXI)\n"); - fprintf(output, " READ(ITIN)NSHL(I),ROS(I)\n"); - fprintf(output, " READ(ITIN)(RCF(I,NS),NS=1,NSH)\n"); - fprintf(output, " \n"); - str(sconf, cid->c1, cid->c1ao, cid->c3, cid->c4, cid->c6); - thdps(cid->c4->lm, cid->zpv); - double exdc = sconf->exdc; - double exri = sqrt(exdc); - fprintf(output, " REFR. INDEX OF EXTERNAL MEDIUM=%15.7lE\n", exri); - fstream *tppoanp = new fstream; - fstream &tppoan = *tppoanp; - string tppoan_name = output_path + "/c_TPPOAN"; - tppoan.open(tppoan_name.c_str(), ios::out | ios::binary); - if (tppoan.is_open()) { + // the following only happens on MPI process 0 + if (mpidata->rank == 0) { + logger->log("INFO: making legacy configuration...", LOG_INFO); + ScattererConfiguration *sconf = NULL; + try { + sconf = ScattererConfiguration::from_dedfb(config_file); + } catch(const OpenConfigurationFileException &ex) { + logger->err("\nERROR: failed to open scatterer configuration file.\n"); + string message = "FILE: " + string(ex.what()) + "\n"; + logger->err(message); + exit(1); + } + sconf->write_formatted(output_path + "/c_OEDFB"); + sconf->write_binary(output_path + "/c_TEDF"); + sconf->write_binary(output_path + "/c_TEDF.hd5", "HDF5"); + GeometryConfiguration *gconf = NULL; + try { + gconf = GeometryConfiguration::from_legacy(data_file); + } catch (const OpenConfigurationFileException &ex) { + logger->err("\nERROR: failed to open geometry configuration file.\n"); + string message = "FILE: " + string(ex.what()) + "\n"; + logger->err(message); + if (sconf) delete sconf; + exit(1); + } + logger->log(" done.\n", LOG_INFO); + int s_nsph = sconf->number_of_spheres; + int nsph = gconf->number_of_spheres; + if (s_nsph == nsph) { + // Shortcuts to variables stored in configuration objects + ScatteringAngles *p_scattering_angles = new ScatteringAngles(gconf); + double wp = sconf->wp; + FILE *output = fopen((output_path + "/c_OCLU").c_str(), "w"); + ClusterIterationData *cid = new ClusterIterationData(gconf, sconf, mpidata); + const int ndi = cid->c4->nsph * cid->c4->nlim; + np_int ndit = 2 * ndi; + logger->log("INFO: Size of matrices to invert: " + to_string((int64_t)ndit) + " x " + to_string((int64_t)ndit) +".\n"); + time_logger->log("INFO: Size of matrices to invert: " + to_string((int64_t)ndit) + " x " + to_string((int64_t)ndit) +".\n"); + fprintf(output, " READ(IR,*)NSPH,LI,LE,MXNDM,INPOL,NPNT,NPNTTS,IAVM,ISAM\n"); + fprintf(output, " %5d%5d%5d%5ld%5d%5d%5d%5d%5d\n", + nsph, cid->c4->li, cid->c4->le, gconf->mxndm, gconf->in_pol, gconf->npnt, + gconf->npntts, gconf->iavm, gconf->iavm + ); + fprintf(output, " READ(IR,*)RXX(I),RYY(I),RZZ(I)\n"); + for (int ri = 0; ri < nsph; ri++) fprintf(output, "%17.8lE%17.8lE%17.8lE\n", + gconf->get_sph_x(ri), gconf->get_sph_y(ri), gconf->get_sph_z(ri) + ); + fprintf(output, " READ(IR,*)TH,THSTP,THLST,THS,THSSTP,THSLST\n"); + fprintf( + output, " %10.3lE%10.3lE%10.3lE%10.3lE%10.3lE%10.3lE\n", + p_scattering_angles->th, p_scattering_angles->thstp, + p_scattering_angles->thlst, p_scattering_angles->ths, + p_scattering_angles->thsstp, p_scattering_angles->thslst + ); + fprintf(output, " READ(IR,*)PH,PHSTP,PHLST,PHS,PHSSTP,PHSLST\n"); + fprintf( + output, " %10.3lE%10.3lE%10.3lE%10.3lE%10.3lE%10.3lE\n", + p_scattering_angles->ph, p_scattering_angles->phstp, + p_scattering_angles->phlst, p_scattering_angles->phs, + p_scattering_angles->phsstp, p_scattering_angles->phslst + ); + fprintf(output, " READ(IR,*)JWTM\n"); + fprintf(output, " %5d\n", gconf->jwtm); + fprintf(output, " READ(ITIN)NSPHT\n"); + fprintf(output, " READ(ITIN)(IOG(I),I=1,NSPH)\n"); + fprintf(output, " READ(ITIN)EXDC,WP,XIP,IDFC,NXI\n"); + fprintf(output, " READ(ITIN)(XIV(I),I=1,NXI)\n"); + fprintf(output, " READ(ITIN)NSHL(I),ROS(I)\n"); + fprintf(output, " READ(ITIN)(RCF(I,NS),NS=1,NSH)\n"); + fprintf(output, " \n"); + str(sconf, cid->c1, cid->c1ao, cid->c3, cid->c4, cid->c6); + thdps(cid->c4->lm, cid->zpv); + double exdc = sconf->exdc; + double exri = sqrt(exdc); + fprintf(output, " REFR. INDEX OF EXTERNAL MEDIUM=%15.7lE\n", exri); + fstream *tppoanp = new fstream; + fstream &tppoan = *tppoanp; + string tppoan_name = output_path + "/c_TPPOAN"; + tppoan.open(tppoan_name.c_str(), ios::out | ios::binary); + if (tppoan.is_open()) { #ifdef USE_LAPACK - logger->log("INFO: using LAPACK calls.\n", LOG_INFO); -#elif defined USE_MAGMA - logger->log("INFO: using MAGMA calls.\n", LOG_INFO); - magma_init(); + logger->log("INFO: using LAPACK calls.\n", LOG_INFO); #else - logger->log("INFO: using fall-back lucin() calls.\n", LOG_INFO); + logger->log("INFO: using fall-back lucin() calls.\n", LOG_INFO); #endif - int iavm = gconf->iavm; - int isam = gconf->isam; - int inpol = gconf->in_pol; - int nxi = sconf->number_of_scales; - int nth = p_scattering_angles->nth; - int nths = p_scattering_angles->nths; - int nph = p_scattering_angles->nph; - int nphs = p_scattering_angles->nphs; - tppoan.write(reinterpret_cast<char *>(&iavm), sizeof(int)); - tppoan.write(reinterpret_cast<char *>(&isam), sizeof(int)); - tppoan.write(reinterpret_cast<char *>(&inpol), sizeof(int)); - tppoan.write(reinterpret_cast<char *>(&nxi), sizeof(int)); - tppoan.write(reinterpret_cast<char *>(&nth), sizeof(int)); - tppoan.write(reinterpret_cast<char *>(&nph), sizeof(int)); - tppoan.write(reinterpret_cast<char *>(&nths), sizeof(int)); - tppoan.write(reinterpret_cast<char *>(&nphs), sizeof(int)); - if (sconf->idfc < 0) { - cid->vk = cid->xip * cid->wn; - fprintf(output, " VK=%15.7lE, XI IS SCALE FACTOR FOR LENGTHS\n", cid->vk); - fprintf(output, " \n"); - } - // do the first iteration on jxi488 separately, since it seems to be different from the others - int jxi488 = 1; - chrono::time_point<chrono::high_resolution_clock> start_iter_1 = chrono::high_resolution_clock::now(); - int jer = cluster_jxi488_cycle(jxi488, sconf, gconf, p_scattering_angles, cid, output, output_path, tppoan); - chrono::time_point<chrono::high_resolution_clock> end_iter_1 = chrono::high_resolution_clock::now(); - elapsed = start_iter_1 - t_start; - message = "INFO: Calculation setup took " + to_string(elapsed.count()) + "s.\n"; - logger->log(message); - time_logger->log(message); - elapsed = end_iter_1 - start_iter_1; - message = "INFO: First iteration took " + to_string(elapsed.count()) + "s.\n"; - logger->log(message); - time_logger->log(message); + int iavm = gconf->iavm; + int isam = gconf->isam; + int inpol = gconf->in_pol; + int nxi = sconf->number_of_scales; + int nth = p_scattering_angles->nth; + int nths = p_scattering_angles->nths; + int nph = p_scattering_angles->nph; + int nphs = p_scattering_angles->nphs; + tppoan.write(reinterpret_cast<char *>(&iavm), sizeof(int)); + tppoan.write(reinterpret_cast<char *>(&isam), sizeof(int)); + tppoan.write(reinterpret_cast<char *>(&inpol), sizeof(int)); + tppoan.write(reinterpret_cast<char *>(&nxi), sizeof(int)); + tppoan.write(reinterpret_cast<char *>(&nth), sizeof(int)); + tppoan.write(reinterpret_cast<char *>(&nph), sizeof(int)); + tppoan.write(reinterpret_cast<char *>(&nths), sizeof(int)); + tppoan.write(reinterpret_cast<char *>(&nphs), sizeof(int)); + if (sconf->idfc < 0) { + cid->vk = cid->xip * cid->wn; + fprintf(output, " VK=%15.7lE, XI IS SCALE FACTOR FOR LENGTHS\n", cid->vk); + fprintf(output, " \n"); + } + // do the first iteration on jxi488 separately, since it seems to be different from the others + int jxi488 = 1; + chrono::time_point<chrono::high_resolution_clock> start_iter_1 = chrono::high_resolution_clock::now(); + int jer = cluster_jxi488_cycle(jxi488, sconf, gconf, p_scattering_angles, cid, output, output_path, tppoan); + chrono::time_point<chrono::high_resolution_clock> end_iter_1 = chrono::high_resolution_clock::now(); + elapsed = start_iter_1 - t_start; + string message = "INFO: Calculation setup took " + to_string(elapsed.count()) + "s.\n"; + logger->log(message); + time_logger->log(message); + elapsed = end_iter_1 - start_iter_1; + message = "INFO: First iteration took " + to_string(elapsed.count()) + "s.\n"; + logger->log(message); + time_logger->log(message); - // Create this variable and initialise it with a default here, so that it is defined anyway, with or without OpenMP support enabled - int ompnumthreads = 1; + // here go the calls that send data to be duplicated on other MPI processes from process 0 to others, using MPI broadcasts, but only if MPI is actually used +#ifdef MPI_VERSION + if (mpidata->mpirunning) { + gconf->mpibcast(mpidata); + sconf->mpibcast(mpidata); + cid->mpibcast(mpidata); + p_scattering_angles->mpibcast(mpidata); + } +#endif + // Create this variable and initialise it with a default here, so that it is defined anyway, with or without OpenMP support enabled + int ompnumthreads = 1; #pragma omp parallel - { - // Create and initialise this variable here, so that if OpenMP is enabled it is local to the thread, and if OpenMP is not enabled it has a well-defiled value anyway - int myompthread = 0; + { + // Create and initialise this variable here, so that if OpenMP is enabled it is local to the thread, and if OpenMP is not enabled it has a well-defiled value anyway + int myompthread = 0; #ifdef _OPENMP - // If OpenMP is enabled, give actual values to myompthread and ompnumthreads, and open thread-local output files - myompthread = omp_get_thread_num(); - if (myompthread == 0) ompnumthreads = omp_get_num_threads(); + // If OpenMP is enabled, give actual values to myompthread and ompnumthreads, and open thread-local output files + myompthread = omp_get_thread_num(); + if (myompthread == 0) ompnumthreads = omp_get_num_threads(); #endif - // To test parallelism, I will now start feeding this function with "clean" copies of the parameters, so that they will not be changed by previous iterations, and each one will behave as the first one. Define all (empty) variables here, so they have the correct scope, then they get different definitions depending on thread number - ClusterIterationData *cid_2 = NULL; - FILE *output_2 = NULL; - fstream *tppoanp_2 = NULL; - // for threads other than the 0, create distinct copies of all relevant data, while for thread 0 just define new references / pointers to the original ones - if (myompthread == 0) { - cid_2 = cid; - output_2 = output; - tppoanp_2 = tppoanp; - } else { - // this is not thread 0, so do create fresh copies of all local variables - cid_2 = new ClusterIterationData(*cid); - output_2 = fopen((output_path + "/c_OCLU_" + to_string(myompthread)).c_str(), "w"); - tppoanp_2 = new fstream; - tppoanp_2->open((output_path + "/c_TPPOAN_" + to_string(myompthread)).c_str(), ios::out | ios::binary); - } - fstream &tppoan_2 = *tppoanp_2; - // make sure all threads align here: I don't want the following loop to accidentally start for thread 0, possibly modifying some variables before they are copied by all other threads + // To test parallelism, I will now start feeding this function with "clean" copies of the parameters, so that they will not be changed by previous iterations, and each one will behave as the first one. Define all (empty) variables here, so they have the correct scope, then they get different definitions depending on thread number + ClusterIterationData *cid_2 = NULL; + FILE *output_2 = NULL; + fstream *tppoanp_2 = NULL; + // for threads other than the 0, create distinct copies of all relevant data, while for thread 0 just define new references / pointers to the original ones + if (myompthread == 0) { + cid_2 = cid; + output_2 = output; + tppoanp_2 = tppoanp; + } else { + // this is not thread 0, so do create fresh copies of all local variables + cid_2 = new ClusterIterationData(*cid); + output_2 = fopen((output_path + "/c_OCLU_" + to_string(mpidata->rank) + "_" + to_string(myompthread)).c_str(), "w"); + tppoanp_2 = new fstream; + tppoanp_2->open((output_path + "/c_TPPOAN_" + to_string(mpidata->rank) + "_" + to_string(myompthread)).c_str(), ios::out | ios::binary); + } + fstream &tppoan_2 = *tppoanp_2; + // make sure all threads align here: I don't want the following loop to accidentally start for thread 0, possibly modifying some variables before they are copied by all other threads #pragma omp barrier - if (myompthread==0) logger->log("Syncing OpenMP threads and starting the loop on wavelengths\n"); - // ok, now I can actually start the parallel calculations + if (myompthread==0) logger->log("Syncing OpenMP threads and starting the loop on wavelengths\n"); + // ok, now I can actually start the parallel calculations #pragma omp for - for (jxi488 = 2; jxi488 <= nxi; jxi488++) { - int jer = cluster_jxi488_cycle(jxi488, sconf, gconf, p_scattering_angles, cid_2, output_2, output_path, *tppoanp_2); - } + for (jxi488 = cid_2->firstxi; jxi488 <= cid_2->lastxi; jxi488++) { + int jer = cluster_jxi488_cycle(jxi488, sconf, gconf, p_scattering_angles, cid_2, output_2, output_path, *tppoanp_2); + } #pragma omp barrier - // only threads different from 0 have to free local copies of variables and close local files - if (myompthread != 0) { - delete cid_2; - fclose(output_2); - tppoanp_2->close(); - delete tppoanp_2; + // only threads different from 0 have to free local copies of variables and close local files + if (myompthread != 0) { + delete cid_2; + fclose(output_2); + tppoanp_2->close(); + delete tppoanp_2; + } +#pragma omp barrier + { + string message = "INFO: Closing thread-local output files of thread " + to_string(myompthread) + " and syncing threads.\n"; + logger->log(message); + } + } // closes pragma omp parallel +#ifdef _OPENMP +#pragma omp barrier + { + // thread 0 already wrote on global files, skip it and take care of appending the others + for (int ri = 1; ri < ompnumthreads; ri++) { + string partial_file_name = output_path + "/c_OCLU_" + to_string(mpidata->rank) + "_" + to_string(ri); + string message = "Copying ASCII output in MPI process " + to_string(mpidata->rank) + " of thread " + to_string(ri) + " of " + to_string(ompnumthreads - 1) + "... "; + logger->log(message, LOG_DEBG); + FILE *partial_output = fopen(partial_file_name.c_str(), "r"); + int c = fgetc(partial_output); + while (c != EOF) { + fputc(c, output); + c = fgetc(partial_output); + } + fclose(partial_output); + remove(partial_file_name.c_str()); + logger->log("done.\n", LOG_DEBG); + partial_file_name = output_path + "/c_TPPOAN_" + to_string(mpidata->rank) + "_" + to_string(ri); + message = "Copying binary output in MPI process " + to_string(mpidata->rank) + " of thread " + to_string(ri) + " of " + to_string(ompnumthreads - 1) + "... "; + logger->log(message, LOG_DEBG); + fstream partial_tppoan; + partial_tppoan.open(partial_file_name.c_str(), ios::in | ios::binary); + partial_tppoan.seekg(0, ios::end); + long buffer_size = partial_tppoan.tellg(); + char *binary_buffer = new char[buffer_size]; + partial_tppoan.seekg(0, ios::beg); + partial_tppoan.read(binary_buffer, buffer_size); + tppoan.write(binary_buffer, buffer_size); + partial_tppoan.close(); + delete[] binary_buffer; + remove(partial_file_name.c_str()); + logger->log("done.\n", LOG_DEBG); + } + } +#endif + // here go the code to append the files written in MPI processes > 0 to the ones on MPI process 0 +#ifdef MPI_VERSION + if (mpidata->mpirunning) { + // only go through this if MPI has been actually used + for (int rr=1; rr<mpidata->nprocs; rr++) { + // get the data from process rr + // how many openmp threads did process rr use? + int remotethreads; + MPI_Recv(&remotethreads, 1, MPI_INT, rr, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + for (int ri=0; ri<remotethreads; ri++) { + // first get the ASCII local file + char *chunk_buffer; + int chunk_buffer_size = -1; + MPI_Recv(&chunk_buffer_size, 1, MPI_INT, rr, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + while (chunk_buffer_size != 0) { + char *chunk_buffer = new char[chunk_buffer_size]; + MPI_Recv(chunk_buffer, chunk_buffer_size, MPI_CHAR, rr, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + fputs(chunk_buffer, output); + delete[] chunk_buffer; + MPI_Recv(&chunk_buffer_size, 1, MPI_INT, rr, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + } + fprintf(output, "\n"); + + // now get the binary local file + long buffer_size = 0; + // get the size of the buffer + MPI_Recv(&buffer_size, 1, MPI_LONG, rr, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + // allocate the bufer + char *binary_buffer = new char[buffer_size]; + // actually receive the buffer + MPI_Recv(binary_buffer, buffer_size, MPI_CHAR, rr, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + // we can write it to disk + tppoan.write(binary_buffer, buffer_size); + delete[] binary_buffer; + } + } } - } // closes pragma omp parallel +#endif + tppoanp->close(); + delete tppoanp; + } else { // In case TPPOAN could not be opened. Should never happen. + logger->err("\nERROR: failed to open TPPOAN file.\n"); + } + fclose(output); + // Clean memory + delete cid; + delete p_scattering_angles; + } else { // NSPH mismatch between geometry and scatterer configurations. + throw UnrecognizedConfigurationException( + "Inconsistent geometry and scatterer configurations." + ); + } + delete sconf; + delete gconf; + chrono::time_point<chrono::high_resolution_clock> t_end = chrono::high_resolution_clock::now(); + elapsed = t_end - t_start; + string message = "INFO: Calculation lasted " + to_string(elapsed.count()) + "s.\n"; + logger->log(message); + logger->log("Finished: output written to " + output_path + "/c_OCLU\n"); + time_logger->log(message); + } + +#ifdef MPI_VERSION + else { + // here go the code for MPI processes other than 0 + // copy gconf, sconf, cid and p_scattering_angles from MPI process 0 + GeometryConfiguration *gconf = new GeometryConfiguration(mpidata); + ScattererConfiguration *sconf = new ScattererConfiguration(mpidata); + ClusterIterationData *cid = new ClusterIterationData(mpidata); + ScatteringAngles *p_scattering_angles = new ScatteringAngles(mpidata); + // open separate files for other MPI processes + // File *output = fopen((output_path + "/c_OCLU_mpi"+ to_string(mpidata->rank)).c_str(), "w"); + // fstream *tppoanp = new fstream; + // fstream &tppoan = *tppoanp; + // string tppoan_name = output_path + "/c_TPPOAN_mpi"+ to_string(mpidata->rank); + // tppoan.open(tppoan_name.c_str(), ios::out | ios::binary); + // Create this variable and initialise it with a default here, so that it is defined anyway, with or without OpenMP support enabled + int ompnumthreads = 1; + +#pragma omp parallel + { + // Create and initialise this variable here, so that if OpenMP is enabled it is local to the thread, and if OpenMP is not enabled it has a well-defiled value anyway + int myompthread = 0; #ifdef _OPENMP + // If OpenMP is enabled, give actual values to myompthread and ompnumthreads, and open thread-local output files + myompthread = omp_get_thread_num(); + if (myompthread == 0) ompnumthreads = omp_get_num_threads(); +#endif + // To test parallelism, I will now start feeding this function with "clean" copies of the parameters, so that they will not be changed by previous iterations, and each one will behave as the first one. Define all (empty) variables here, so they have the correct scope, then they get different definitions depending on thread number + ClusterIterationData *cid_2 = NULL; + FILE *output_2 = NULL; + fstream *tppoanp_2 = NULL; + // for threads other than the 0, create distinct copies of all relevant data, while for thread 0 just define new references / pointers to the original ones + if (myompthread == 0) { + cid_2 = cid; + // output_2 = output; + // tppoanp_2 = tppoanp; + } else { + // this is not thread 0, so do create fresh copies of all local variables + cid_2 = new ClusterIterationData(*cid); + } + output_2 = fopen((output_path + "/c_OCLU_" + to_string(mpidata->rank) + "_" + to_string(myompthread)).c_str(), "w"); + tppoanp_2 = new fstream; + tppoanp_2->open((output_path + "/c_TPPOAN_" + to_string(mpidata->rank) + "_" + to_string(myompthread)).c_str(), ios::out | ios::binary); + fstream &tppoan_2 = *tppoanp_2; + // make sure all threads align here: I don't want the following loop to accidentally start for thread 0, possibly modifying some variables before they are copied by all other threads +#pragma omp barrier + if (myompthread==0) logger->log("Syncing OpenMP threads and starting the loop on wavelengths\n"); + // ok, now I can actually start the parallel calculations +#pragma omp for + for (int jxi488 = cid_2->firstxi; jxi488 <= cid_2->lastxi; jxi488++) { + int jer = cluster_jxi488_cycle(jxi488, sconf, gconf, p_scattering_angles, cid_2, output_2, output_path, *tppoanp_2); + } + +#pragma omp barrier + // only threads different from 0 have to free local copies of variables + if (myompthread != 0) { + delete cid_2; + } + fclose(output_2); + tppoanp_2->close(); + delete tppoanp_2; #pragma omp barrier { - message = "INFO: Thread-local output files closed and threads synchronized.\n"; + string message = "INFO: Closing thread-local output files of thread " + to_string(myompthread) + " and syncing threads.\n"; logger->log(message); - // thread 0 already wrote on global files, skip it and take care of appending the others - chrono::time_point<chrono::high_resolution_clock> t_output_start = chrono::high_resolution_clock::now(); - for (int ri = 1; ri < ompnumthreads; ri++) { - // Giovanni, please add here in this loop code to reopen the temporary files, reread them and append them respectively to the global output and tppoan, before closing them - string partial_file_name = output_path + "/c_OCLU_" + to_string(ri); - message = "Copying ASCII output of thread " + to_string(ri) + " of " + to_string(ompnumthreads - 1) + "... "; - logger->log(message, LOG_DEBG); - FILE *partial_output = fopen(partial_file_name.c_str(), "r"); - char c = fgetc(partial_output); - while (c != EOF) { - fputc(c, output); - c = fgetc(partial_output); + } + } // closes pragma omp parallel +#pragma omp barrier + { + // tell MPI process 0 how many threads we have on this process (not necessarily the same across all processes) + MPI_Send(&ompnumthreads, 1, MPI_INT, 0, 1, MPI_COMM_WORLD); + // reopen local files, send them all to MPI process 0 + for (int ri = 0; ri < ompnumthreads; ri++) { + string partial_file_name = output_path + "/c_OCLU_" + to_string(mpidata->rank) + "_" + to_string(ri); + string message = "Copying ASCII output in MPI process " + to_string(mpidata->rank) + " of thread " + to_string(ri) + " of " + to_string(ompnumthreads - 1) + "... "; + logger->log(message, LOG_DEBG); + fstream partial_output; + partial_output.open(partial_file_name.c_str(), ios::in | ios::binary); + partial_output.seekg(0, ios::end); + const long partial_output_size = partial_output.tellg(); + partial_output.close(); + partial_output.open(partial_file_name.c_str(), ios::in | ios::binary); + int chunk_buffer_size = 25165824; // Length of char array with 24Mb size + char *chunk_buffer = new char[chunk_buffer_size](); + int full_chunks = (int)(partial_output_size / chunk_buffer_size); + for (int fi = 0; fi < full_chunks; fi++) { + partial_output.read(chunk_buffer, chunk_buffer_size); + // If EOF is reached, do not send EOF character. + long ptr_position = partial_output.tellg(); + if (ptr_position == partial_output_size) { + chunk_buffer[chunk_buffer_size - 1] = '\0'; } - fclose(partial_output); - remove(partial_file_name.c_str()); - logger->log("done.\n", LOG_DEBG); - partial_file_name = output_path + "/c_TPPOAN_" + to_string(ri); - message = "Copying binary output of thread " + to_string(ri) + " of " + to_string(ompnumthreads - 1) + "... "; - logger->log(message, LOG_DEBG); - fstream partial_tppoan; - partial_tppoan.open(partial_file_name.c_str(), ios::in | ios::binary); - partial_tppoan.seekg(0, ios::end); - long buffer_size = partial_tppoan.tellg(); - char *binary_buffer = new char[buffer_size]; - partial_tppoan.seekg(0, ios::beg); - partial_tppoan.read(binary_buffer, buffer_size); - tppoan.write(binary_buffer, buffer_size); - partial_tppoan.close(); - delete[] binary_buffer; - remove(partial_file_name.c_str()); - logger->log("done.\n", LOG_DEBG); + // Send the size of the buffer that is being transmitted (Node-0 does not know whether it is full or not) + MPI_Send(&chunk_buffer_size, 1, MPI_INT, 0, 1, MPI_COMM_WORLD); + // Actually send the file contents to Node-0 + MPI_Send(chunk_buffer, chunk_buffer_size, MPI_CHAR, 0, 0, MPI_COMM_WORLD); } - chrono::time_point<chrono::high_resolution_clock> t_output_end = chrono::high_resolution_clock::now(); - elapsed = t_output_end - t_output_start; - message = "INFO: Recombining output files took " + to_string(elapsed.count()) + "s.\n"; - logger->log(message); - time_logger->log(message); + long ptr_position = partial_output.tellg(); + if (ptr_position < partial_output_size) { + // Send the last partial buffer + chunk_buffer_size = partial_output_size - ptr_position; + delete[] chunk_buffer; + chunk_buffer = new char[chunk_buffer_size]; + partial_output.read(chunk_buffer, chunk_buffer_size); + chunk_buffer[chunk_buffer_size - 1] = '\0'; + // Send the size of the buffer that is being transmitted (Node-0 does not know whether it is full or not) + MPI_Send(&chunk_buffer_size, 1, MPI_INT, 0, 1, MPI_COMM_WORLD); + // Actually send the file contents to Node-0 + MPI_Send(chunk_buffer, chunk_buffer_size, MPI_CHAR, 0, 0, MPI_COMM_WORLD); + } + // Send a size 0 flag to inform Node-0 that the transmission is complete + chunk_buffer_size = 0; + MPI_Send(&chunk_buffer_size, 1, MPI_INT, 0, 1, MPI_COMM_WORLD); + partial_output.close(); + delete[] chunk_buffer; + remove(partial_file_name.c_str()); + logger->log("done.\n", LOG_DEBG); + + partial_file_name = output_path + "/c_TPPOAN_" + to_string(mpidata->rank) + "_" + to_string(ri); + message = "Copying binary output in MPI process " + to_string(mpidata->rank) + " of thread " + to_string(ri) + " of " + to_string(ompnumthreads - 1) + "... "; + logger->log(message, LOG_DEBG); + fstream partial_tppoan; + partial_tppoan.open(partial_file_name.c_str(), ios::in | ios::binary); + partial_tppoan.seekg(0, ios::end); + long buffer_size = partial_tppoan.tellg(); + char *binary_buffer = new char[buffer_size]; + partial_tppoan.seekg(0, ios::beg); + partial_tppoan.read(binary_buffer, buffer_size); + // tell MPI process 0 how large is the buffer + MPI_Send(&buffer_size, 1, MPI_LONG, 0, 1, MPI_COMM_WORLD); + // actually send the buffer + MPI_Send(binary_buffer, buffer_size, MPI_CHAR, 0, 0, MPI_COMM_WORLD); + // tppoan.write(binary_buffer, buffer_size); + partial_tppoan.close(); + delete[] binary_buffer; + remove(partial_file_name.c_str()); + logger->log("done.\n", LOG_DEBG); } -#endif - tppoanp->close(); - delete tppoanp; -#ifdef USE_MAGMA - magma_finalize(); -#endif - } else { // In case TPPOAN could not be opened. Should never happen. - logger->err("\nERROR: failed to open TPPOAN file.\n"); - } // closes if (tppoan.is_open) ... else ... - fclose(output); + } // Clean memory delete cid; delete p_scattering_angles; - } else { // NSPH mismatch between geometry and scatterer configurations. - throw UnrecognizedConfigurationException( - "Inconsistent geometry and scatterer configurations." - ); + delete sconf; + delete gconf; + } - delete sconf; - delete gconf; - chrono::time_point<chrono::high_resolution_clock> t_end = chrono::high_resolution_clock::now(); - elapsed = t_end - t_start; - message = "INFO: Calculation lasted " + to_string(elapsed.count()) + "s.\n"; - logger->log(message); - time_logger->log(message); - logger->log("Finished: output written to " + output_path + "/c_OCLU\n"); +#endif fclose(timing_file); delete time_logger; delete logger; @@ -327,8 +519,8 @@ void cluster(const string& config_file, const string& data_file, const string& o int cluster_jxi488_cycle(int jxi488, ScattererConfiguration *sconf, GeometryConfiguration *gconf, ScatteringAngles *sa, ClusterIterationData *cid, FILE *output, const string& output_path, fstream& tppoan) { int nxi = sconf->number_of_scales; - Logger *logger = new Logger(LOG_INFO); string message = "INFO: running scale iteration " + to_string(jxi488) + " of " + to_string(nxi) + ".\n"; + Logger *logger = new Logger(LOG_INFO); logger->log(message); chrono::duration<double> elapsed; chrono::time_point<chrono::high_resolution_clock> interval_start, interval_end; @@ -369,7 +561,6 @@ int cluster_jxi488_cycle(int jxi488, ScattererConfiguration *sconf, GeometryConf hjv(exri, vkarg, jer, lcalc, cid->arg, cid->c1, cid->c1ao, cid->c4); if (jer != 0) { fprintf(output, " STOP IN HJV\n"); - delete logger; return jer; // break; // rewrite this to go to the end of the function, to free locally allocated variables and return jer } @@ -399,13 +590,11 @@ int cluster_jxi488_cycle(int jxi488, ScattererConfiguration *sconf, GeometryConf ); if (jer != 0) { fprintf(output, " STOP IN DME\n"); - delete logger; return jer; //break; } } if (jer != 0) { - delete logger; return jer; //break; } @@ -987,7 +1176,10 @@ int cluster_jxi488_cycle(int jxi488, ScattererConfiguration *sconf, GeometryConf elapsed = interval_end - interval_start; message = "INFO: angle loop for scale " + to_string(jxi488) + " took " + to_string(elapsed.count()) + "s.\n"; logger->log(message); + logger->log("INFO: finished scale iteration " + to_string(jxi488) + " of " + to_string(nxi) + ".\n"); + delete logger; + return jer; }