diff --git a/src/cluster/cluster.cpp b/src/cluster/cluster.cpp
index e6be934ab8786a1d9eda1362bc1a168b26fb45a9..a6d640b8c5d8c16290421e4d68bd46304659f84a 100644
--- a/src/cluster/cluster.cpp
+++ b/src/cluster/cluster.cpp
@@ -94,6 +94,7 @@ int cluster_jxi488_cycle(int jxi488, ScattererConfiguration *sconf, GeometryConf
  * \param data_file: `string` Name of the input data file.
  * \param output_path: `string` Directory to write the output files in.
  */
+
 void cluster(const string& config_file, const string& data_file, const string& output_path, const mixMPI *mpidata) {
   chrono::time_point<chrono::high_resolution_clock> t_start = chrono::high_resolution_clock::now();
   chrono::duration<double> elapsed;
@@ -102,6 +103,10 @@ void cluster(const string& config_file, const string& data_file, const string& o
   FILE *timing_file = fopen(timing_name.c_str(), "w");
   Logger *time_logger = new Logger(LOG_DEBG, timing_file);
   Logger *logger = new Logger(LOG_DEBG);
+
+  //===========
+  // Initialise MAGMA
+  //===========
 #ifdef USE_MAGMA
   int device_count;
   cudaGetDeviceCount(&device_count);
@@ -117,11 +122,18 @@ void cluster(const string& config_file, const string& data_file, const string& o
     return;
   }
 #endif
+  // end MAGMA initialisation
+
+  //===========================
   // the following only happens on MPI process 0
+  //===========================
   if (mpidata->rank == 0) {
 #ifdef USE_NVTX
     nvtxRangePush("Set up");
 #endif
+    //=======================
+    // Initialise sconf from configuration file
+    //=======================
     logger->log("INFO: making legacy configuration...", LOG_INFO);
     ScattererConfiguration *sconf = NULL;
     try {
@@ -138,6 +150,11 @@ void cluster(const string& config_file, const string& data_file, const string& o
     sconf->write_formatted(output_path + "/c_OEDFB");
     sconf->write_binary(output_path + "/c_TEDF");
     sconf->write_binary(output_path + "/c_TEDF.hd5", "HDF5");
+    // end sconf initialisation
+
+    //========================
+    // Initialise gconf from configuration files
+    //========================
     GeometryConfiguration *gconf = NULL;
     try {
       gconf = GeometryConfiguration::from_legacy(data_file);
@@ -152,25 +169,33 @@ void cluster(const string& config_file, const string& data_file, const string& o
       return;
     }
     logger->log(" done.\n", LOG_INFO);
+    // end gconf initialisation
+
 #ifdef USE_NVTX
     nvtxRangePop();
 #endif
     int s_nsph = sconf->number_of_spheres;
     int nsph = gconf->number_of_spheres;
+    // Sanity check on the consistency of the number of spheres; this should always be verified
     if (s_nsph == nsph) {
       // Shortcuts to variables stored in configuration objects
       ScatteringAngles *p_scattering_angles = new ScatteringAngles(gconf);
       double wp = sconf->wp;
-      //FILE *output = fopen((output_path + "/c_OCLU").c_str(), "w");
+      // Open empty virtual ascii file for output
       VirtualAsciiFile *p_output = new VirtualAsciiFile();
       // for the time being, this is ok. When we can, add some logic in the sprintf calls that checks if a longer buffer would be needed, and in case expand it
      // in any case, replace all sprintf() with snprintf(), to avoid in any case writing more than the available buffer size
      char virtual_line[256];
+     // Create and initialise pristine cid for MPI proc 0 and thread 0
      ClusterIterationData *cid = new ClusterIterationData(gconf, sconf, mpidata);
      const int ndi = cid->c4->nsph * cid->c4->nlim;
      np_int ndit = 2 * ndi;
      logger->log("INFO: Size of matrices to invert: " + to_string((int64_t)ndit) + " x " + to_string((int64_t)ndit) +".\n");
      time_logger->log("INFO: Size of matrices to invert: " + to_string((int64_t)ndit) + " x " + to_string((int64_t)ndit) +".\n");
+
+     //==========================
+     // Write a block of info to the ascii output file
+     //==========================
      sprintf(virtual_line, " READ(IR,*)NSPH,LI,LE,MXNDM,INPOL,NPNT,NPNTTS,IAVM,ISAM\n\0");
      p_output->append_line(virtual_line);
      sprintf(virtual_line, " %5d%5d%5d%5ld%5d%5d%5d%5d%5d\n\0",
@@ -228,220 +253,260 @@ void cluster(const string& config_file, const string& data_file, const string& o
      double exri = sqrt(exdc);
      sprintf(virtual_line, " REFR. INDEX OF EXTERNAL MEDIUM=%15.7lE\n\0", exri);
      p_output->append_line(virtual_line);
+
+     // Create empty virtual binary file
      VirtualBinaryFile *vtppoanp = new VirtualBinaryFile();
-       VirtualBinaryLine *vbinline = NULL;
-       // fstream *tppoanp = new fstream;
-       // fstream &tppoan = *tppoanp;
      string tppoan_name = output_path + "/c_TPPOAN";
-       // tppoan.open(tppoan_name.c_str(), ios::out | ios::binary);
-       // if (tppoan.is_open()) {
 #ifdef USE_MAGMA
-       logger->log("INFO: using MAGMA calls.\n", LOG_INFO);
+     logger->log("INFO: using MAGMA calls.\n", LOG_INFO);
 #elif defined USE_LAPACK
-       logger->log("INFO: using LAPACK calls.\n", LOG_INFO);
+     logger->log("INFO: using LAPACK calls.\n", LOG_INFO);
 #else
-       logger->log("INFO: using fall-back lucin() calls.\n", LOG_INFO);
+     logger->log("INFO: using fall-back lucin() calls.\n", LOG_INFO);
 #endif
-       int iavm = gconf->iavm;
-       int isam = gconf->isam;
-       int inpol = gconf->in_pol;
-       int nxi = sconf->number_of_scales;
-       int nth = p_scattering_angles->nth;
-       int nths = p_scattering_angles->nths;
-       int nph = p_scattering_angles->nph;
-       int nphs = p_scattering_angles->nphs;
-       // tppoan.write(reinterpret_cast<char *>(&iavm), sizeof(int));
-       vtppoanp->append_line(VirtualBinaryLine(iavm));
-       // tppoan.write(reinterpret_cast<char *>(&isam), sizeof(int));
-       vtppoanp->append_line(VirtualBinaryLine(isam));
-       // tppoan.write(reinterpret_cast<char *>(&inpol), sizeof(int));
-       vtppoanp->append_line(VirtualBinaryLine(inpol));
-       // tppoan.write(reinterpret_cast<char *>(&nxi), sizeof(int));
-       vtppoanp->append_line(VirtualBinaryLine(nxi));
-       // tppoan.write(reinterpret_cast<char *>(&nth), sizeof(int));
-       vtppoanp->append_line(VirtualBinaryLine(nth));
-       // tppoan.write(reinterpret_cast<char *>(&nph), sizeof(int));
-       vtppoanp->append_line(VirtualBinaryLine(nph));
-       // tppoan.write(reinterpret_cast<char *>(&nths), sizeof(int));
-       vtppoanp->append_line(VirtualBinaryLine(nths));
-       // tppoan.write(reinterpret_cast<char *>(&nphs), sizeof(int));
-       vtppoanp->append_line(VirtualBinaryLine(nphs));
-       if (sconf->idfc < 0) {
-         cid->vk = cid->xip * cid->wn;
-         sprintf(virtual_line, " VK=%15.7lE, XI IS SCALE FACTOR FOR LENGTHS\n\0", cid->vk);
-         p_output->append_line(virtual_line);
-         sprintf(virtual_line, " \n\0");
-         p_output->append_line(virtual_line);
-       }
-       // do the first iteration on jxi488 separately, since it seems to be different from
the others - int jxi488 = 1; - chrono::time_point<chrono::high_resolution_clock> start_iter_1 = chrono::high_resolution_clock::now(); + int iavm = gconf->iavm; + int isam = gconf->isam; + int inpol = gconf->in_pol; + int nxi = sconf->number_of_scales; + int nth = p_scattering_angles->nth; + int nths = p_scattering_angles->nths; + int nph = p_scattering_angles->nph; + int nphs = p_scattering_angles->nphs; + + //======================== + // write a block of info to virtual binary file + //======================== + vtppoanp->append_line(VirtualBinaryLine(iavm)); + vtppoanp->append_line(VirtualBinaryLine(isam)); + vtppoanp->append_line(VirtualBinaryLine(inpol)); + vtppoanp->append_line(VirtualBinaryLine(nxi)); + vtppoanp->append_line(VirtualBinaryLine(nth)); + vtppoanp->append_line(VirtualBinaryLine(nph)); + vtppoanp->append_line(VirtualBinaryLine(nths)); + vtppoanp->append_line(VirtualBinaryLine(nphs)); + if (sconf->idfc < 0) { + cid->vk = cid->xip * cid->wn; + sprintf(virtual_line, " VK=%15.7lE, XI IS SCALE FACTOR FOR LENGTHS\n\0", cid->vk); + p_output->append_line(virtual_line); + sprintf(virtual_line, " \n\0"); + p_output->append_line(virtual_line); + } + + // do the first iteration on jxi488 separately, since it seems to be different from the others + int jxi488 = 1; + chrono::time_point<chrono::high_resolution_clock> start_iter_1 = chrono::high_resolution_clock::now(); #ifdef USE_NVTX - nvtxRangePush("First iteration"); + nvtxRangePush("First iteration"); #endif - int jer = cluster_jxi488_cycle(jxi488, sconf, gconf, p_scattering_angles, cid, p_output, output_path, vtppoanp); + int jer = cluster_jxi488_cycle(jxi488, sconf, gconf, p_scattering_angles, cid, p_output, output_path, vtppoanp); #ifdef USE_NVTX - nvtxRangePop(); + nvtxRangePop(); #endif - chrono::time_point<chrono::high_resolution_clock> end_iter_1 = chrono::high_resolution_clock::now(); - elapsed = start_iter_1 - t_start; - string message = "INFO: Calculation setup took " + to_string(elapsed.count()) + "s.\n"; - logger->log(message); - time_logger->log(message); - elapsed = end_iter_1 - start_iter_1; - message = "INFO: First iteration took " + to_string(elapsed.count()) + "s.\n"; - logger->log(message); - time_logger->log(message); - if (jer != 0) { - // First loop failed. Halt the calculation. - // tppoan.close(); - fclose(timing_file); - //fclose(output); - delete p_output; - delete p_scattering_angles; - delete cid; - delete logger; - delete time_logger; - delete sconf; - delete gconf; - return; - } - // do the first outputs here, so that I open here the new files, afterwards I only append - p_output->write_to_disk(output_path + "/c_OCLU"); - // reallocate a new one (even if it would be more efficient to emty the existing one + chrono::time_point<chrono::high_resolution_clock> end_iter_1 = chrono::high_resolution_clock::now(); + elapsed = start_iter_1 - t_start; + string message = "INFO: Calculation setup took " + to_string(elapsed.count()) + "s.\n"; + logger->log(message); + time_logger->log(message); + elapsed = end_iter_1 - start_iter_1; + message = "INFO: First iteration took " + to_string(elapsed.count()) + "s.\n"; + logger->log(message); + time_logger->log(message); + if (jer != 0) { + // First loop failed. Halt the calculation. 
+ fclose(timing_file); delete p_output; - p_output = new VirtualAsciiFile(); - // now tppoan - vtppoanp->write_to_disk(output_path + "/c_TPPOAN"); - delete vtppoanp; - vtppoanp = new VirtualBinaryFile(); + delete p_scattering_angles; + delete cid; + delete logger; + delete time_logger; + delete sconf; + delete gconf; + return; + } - // here go the calls that send data to be duplicated on other MPI processes from process 0 to others, using MPI broadcasts, but only if MPI is actually used + //================================================== + // do the first outputs here, so that I open here the new files, afterwards I only append + //================================================== + p_output->write_to_disk(output_path + "/c_OCLU"); + delete p_output; + vtppoanp->write_to_disk(output_path + "/c_TPPOAN"); + delete vtppoanp; + + // here go the calls that send data to be duplicated on other MPI processes from process 0 to others, using MPI broadcasts, but only if MPI is actually used #ifdef MPI_VERSION - if (mpidata->mpirunning) { - gconf->mpibcast(mpidata); - sconf->mpibcast(mpidata); - cid->mpibcast(mpidata); - p_scattering_angles->mpibcast(mpidata); - } + if (mpidata->mpirunning) { + gconf->mpibcast(mpidata); + sconf->mpibcast(mpidata); + cid->mpibcast(mpidata); + p_scattering_angles->mpibcast(mpidata); + } #endif - // Create this variable and initialise it with a default here, so that it is defined anyway, with or without OpenMP support enabled - int ompnumthreads = 1; - VirtualAsciiFile **p_outarray = NULL; - VirtualBinaryFile **vtppoanarray = NULL; + // Create this variable and initialise it with a default here, so that it is defined anyway, with or without OpenMP support enabled + int ompnumthreads = 1; + // this is for MPI process 0 (or even if we are not using MPI at all) + int myjxi488startoffset = 0; + int myMPIstride = ompnumthreads; + int myMPIblock = ompnumthreads; + // Define here shared arrays of virtual ascii and binary files, so that thread 0 will be able to access the all later + VirtualAsciiFile **p_outarray = NULL; + VirtualBinaryFile **vtppoanarray = NULL; #ifdef USE_NVTX - nvtxRangePush("Parallel loop"); + nvtxRangePush("Parallel loop"); #endif + + //=========================================== + // open the OpenMP parallel context, so each thread can initialise its stuff + //=========================================== #pragma omp parallel - { - // Create and initialise this variable here, so that if OpenMP is enabled it is local to the thread, and if OpenMP is not enabled it has a well-defiled value anyway - int myompthread = 0; + { + // Create and initialise this variable here, so that if OpenMP is enabled it is local to the thread, and if OpenMP is not enabled it has a well-defiled value anyway + int myompthread = 0; + #ifdef _OPENMP - // If OpenMP is enabled, give actual values to myompthread and ompnumthreads, and open thread-local output files - myompthread = omp_get_thread_num(); - if (myompthread == 0) ompnumthreads = omp_get_num_threads(); + // If OpenMP is enabled, give actual values to myompthread and ompnumthreads, and open thread-local output files + myompthread = omp_get_thread_num(); + if (myompthread == 0) ompnumthreads = omp_get_num_threads(); #endif - if (myompthread == 0) { - p_outarray = new VirtualAsciiFile*[ompnumthreads]; - vtppoanarray = new VirtualBinaryFile*[ompnumthreads]; - } - // To test parallelism, I will now start feeding this function with "clean" copies of the parameters, so that they will not be changed by previous iterations, and 
each one will behave as the first one. Define all (empty) variables here, so they have the correct scope, then they get different definitions depending on thread number - ClusterIterationData *cid_2 = NULL; - //FILE *output_2 = NULL; - VirtualAsciiFile *p_output_2 = NULL; - VirtualBinaryFile *vtppoanp_2 = NULL; - // fstream *tppoanp_2 = NULL; - // for threads other than the 0, create distinct copies of all relevant data, while for thread 0 just define new references / pointers to the original ones - if (myompthread == 0) { - cid_2 = cid; - //output_2 = output; - p_output_2 = p_output; - // tppoanp_2 = tppoanp; - vtppoanp_2 = vtppoanp; - } else { - // this is not thread 0, so do create fresh copies of all local variables - cid_2 = new ClusterIterationData(*cid); - //output_2 = fopen((output_path + "/c_OCLU_" + to_string(mpidata->rank) + "_" + to_string(myompthread)).c_str(), "w"); - p_output_2 = new VirtualAsciiFile(); - vtppoanp_2 = new VirtualBinaryFile(); - // tppoanp_2 = new fstream; - // tppoanp_2->open((output_path + "/c_TPPOAN_" + to_string(mpidata->rank) + "_" + to_string(myompthread)).c_str(), ios::out | ios::binary); + + if (myompthread == 0) { + // Initialise some shared variables only on thread 0 + p_outarray = new VirtualAsciiFile*[ompnumthreads]; + vtppoanarray = new VirtualBinaryFile*[ompnumthreads]; + myMPIblock = ompnumthreads; + myMPIstride = myMPIblock; + } + +#ifdef MPI_VERSION + if (myompthread == 0) { + if (mpidata->mpirunning) { + // only go through this if MPI has been actually used + for (int rr=1; rr<mpidata->nprocs; rr++) { + // individually send their respective starting points to other MPI processes: they start immediately after the frequencies computed by previous processes so far + int remotejxi488startoffset = myMPIstride; + MPI_Send(&remotejxi488startoffset, 1, MPI_INT, rr, 3, MPI_COMM_WORLD); + int remoteMPIblock; + MPI_Recv(&remoteMPIblock, 1, MPI_INT, rr, 3, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + // update myMPIstride to include the ones due to MPI process rr + myMPIstride += remoteMPIblock; + } + // now I know the total myMPIstride, I can send it to all processes + MPI_Bcast(&myMPIstride, 1, MPI_INT, 0, MPI_COMM_WORLD); } + } +#endif + // add an omp barrier to make sure that the global variables defined by thread 0 are known to all threads below this +#pragma omp barrier + + // To test parallelism, I will now start feeding this function with "clean" copies of the parameters, so that they will not be changed by previous iterations, and each one will behave as the first one. 
Define all (empty) variables here, so they have the correct scope, then they get different definitions depending on thread number + ClusterIterationData *cid_2 = NULL; + VirtualAsciiFile *p_output_2 = NULL; + VirtualBinaryFile *vtppoanp_2 = NULL; + // for threads other than the 0, create distinct copies of all relevant data, while for thread 0 just define new references / pointers to the original ones + if (myompthread == 0) { + cid_2 = cid; + } else { + // this is not thread 0, so do create fresh copies of all local variables + cid_2 = new ClusterIterationData(*cid); + } + // make sure all threads align here: I don't want the following loop to accidentally start for thread 0, possibly modifying some variables before they are copied by all other threads + if (myompthread==0) { + logger->log("Syncing OpenMP threads and starting the loop on wavelengths\n"); + } +#pragma omp barrier + // ok, now I can actually start the parallel calculations + for (int ixi488=2; ixi488<=cid_2->number_of_scales; ixi488 +=myMPIstride) { + // the parallel loop over MPI processes covers a different set of indices for each thread +#pragma omp barrier + int myjxi488 = ixi488+myompthread; + // each thread opens new virtual files and stores their pointers in the shared array + p_output_2 = new VirtualAsciiFile(); + vtppoanp_2 = new VirtualBinaryFile(); + // each thread puts a copy of the pointers to its virtual files in the shared arrays p_outarray[myompthread] = p_output_2; vtppoanarray[myompthread] = vtppoanp_2; - // fstream &tppoan_2 = *tppoanp_2; - // make sure all threads align here: I don't want the following loop to accidentally start for thread 0, possibly modifying some variables before they are copied by all other threads #pragma omp barrier - if (myompthread==0) { - logger->log("Syncing OpenMP threads and starting the loop on wavelengths\n"); - } - // ok, now I can actually start the parallel calculations -#pragma omp for - for (jxi488 = cid_2->firstxi; jxi488 <= cid_2->lastxi; jxi488++) { - int jer = cluster_jxi488_cycle(jxi488, sconf, gconf, p_scattering_angles, cid_2, p_output_2, output_path, vtppoanp_2); + + // each MPI process handles a number of contiguous scales corresponding to its number of OMP threads at this omp level of parallelism + if (myjxi488<=cid_2->number_of_scales) { + int jer = cluster_jxi488_cycle(myjxi488, sconf, gconf, p_scattering_angles, cid_2, p_output_2, output_path, vtppoanp_2); } +#pragma omp barrier +#ifdef USE_NVTX + nvtxRangePush("Output concatenation"); +#endif #pragma omp barrier - // only threads different from 0 have to free local copies of variables and close local files - if (myompthread != 0) { - delete cid_2; + // threads different from 0 append their virtual files to the one of thread 0, and delete them + if (myompthread == 0) { + for (int ti=1; ti<ompnumthreads; ti++) { + p_outarray[0]->append(*(p_outarray[ti])); + delete p_outarray[ti]; + vtppoanarray[0]->append(*(vtppoanarray[ti])); + delete vtppoanarray[ti]; + } } #pragma omp barrier - { - string message = "INFO: Closing thread-local output files of thread " + to_string(myompthread) + " and syncing threads.\n"; - logger->log(message); + //============================================== + // Collect all virtual files on thread 0 of MPI process 0, and append them to disk + //============================================== + if (myompthread == 0) { + // thread 0 writes its virtual files, now including contributions from all threads, to disk, and deletes them + p_outarray[0]->append_to_disk(output_path + "/c_OCLU"); + 
delete p_outarray[0]; + vtppoanarray[0]->append_to_disk(output_path + "/c_TPPOAN"); + delete vtppoanarray[0]; + +#ifdef MPI_VERSION + if (mpidata->mpirunning) { + // only go through this if MPI has been actually used + for (int rr=1; rr<mpidata->nprocs; rr++) { + // get the data from process rr, creating a new virtual ascii file + VirtualAsciiFile *p_output = new VirtualAsciiFile(mpidata, rr); + // append to disk and delete virtual ascii file + p_output->append_to_disk(output_path + "/c_OCLU"); + delete p_output; + // get the data from process rr, creating a new virtual binary file + VirtualBinaryFile *vtppoanp = new VirtualBinaryFile(mpidata, rr); + // append to disk and delete virtual binary file + vtppoanp->append_to_disk(output_path + "/c_TPPOAN"); + delete vtppoanp; + int test = MPI_Barrier(MPI_COMM_WORLD); + } + } +#endif } - } // closes pragma omp parallel + // end block writing to disk #ifdef USE_NVTX - nvtxRangePop(); - - nvtxRangePush("Output concatenation"); + nvtxRangePop(); #endif #pragma omp barrier - { - // thread 0 already wrote on global files, skip it and take care of appending the others - for (int ti=1; ti<ompnumthreads; ti++) { - p_outarray[0]->append(*(p_outarray[ti])); - delete p_outarray[ti]; - vtppoanarray[0]->append(*(vtppoanarray[ti])); - delete vtppoanarray[ti]; - } - p_outarray[0]->append_to_disk(output_path + "/c_OCLU"); - delete p_outarray[0]; + + } // close strided loop running on MPI processes, ixi488 loop + // delete the shared arrays I used to make available to thread 0 the virtual files of other threads +#pragma omp barrier + if (myompthread == 0) { delete[] p_outarray; - vtppoanarray[0]->append_to_disk(output_path + "/c_TPPOAN"); - delete vtppoanarray[0]; delete[] vtppoanarray; } - // here go the code to append the files written in MPI processes > 0 to the ones on MPI process 0 -#ifdef MPI_VERSION - if (mpidata->mpirunning) { - // only go through this if MPI has been actually used - for (int rr=1; rr<mpidata->nprocs; rr++) { - // get the data from process rr - VirtualAsciiFile *p_output = new VirtualAsciiFile(mpidata, rr); - p_output->append_to_disk(output_path + "/c_OCLU"); - delete p_output; - VirtualBinaryFile *vtppoanp = new VirtualBinaryFile(mpidata, rr); - vtppoanp->append_to_disk(output_path + "/c_TPPOAN"); - delete vtppoanp; - int test = MPI_Barrier(MPI_COMM_WORLD); - } + { + string message = "INFO: Closing thread-local output files of thread " + to_string(myompthread) + " and syncing threads.\n"; + logger->log(message); } - -#endif #ifdef USE_NVTX nvtxRangePop(); #endif - // Clean memory - delete cid; + delete cid_2; + } delete p_scattering_angles; - } else { // NSPH mismatch between geometry and scatterer configurations. + } + + else { // NSPH mismatch between geometry and scatterer configurations. throw UnrecognizedConfigurationException( "Inconsistent geometry and scatterer configurations." 
); } + delete sconf; delete gconf; chrono::time_point<chrono::high_resolution_clock> t_end = chrono::high_resolution_clock::now(); @@ -450,8 +515,11 @@ void cluster(const string& config_file, const string& data_file, const string& o logger->log(message); logger->log("Finished: output written to " + output_path + "/c_OCLU\n"); time_logger->log(message); - } - + } // end instructions block of MPI process 0 + + //=============================== + // instruction block for MPI processes different from 0 + //=============================== #ifdef MPI_VERSION else { // here go the code for MPI processes other than 0 @@ -460,12 +528,15 @@ void cluster(const string& config_file, const string& data_file, const string& o ScattererConfiguration *sconf = new ScattererConfiguration(mpidata); ClusterIterationData *cid = new ClusterIterationData(mpidata); ScatteringAngles *p_scattering_angles = new ScatteringAngles(mpidata); - // open separate files for other MPI processes + // Create this variable and initialise it with a default here, so that it is defined anyway, with or without OpenMP support enabled int ompnumthreads = 1; VirtualAsciiFile **p_outarray = NULL; VirtualBinaryFile **vtppoanarray = NULL; - + int myjxi488startoffset; + int myMPIstride = ompnumthreads; + int myMPIblock = ompnumthreads; + #pragma omp parallel { // Create and initialise this variable here, so that if OpenMP is enabled it is local to the thread, and if OpenMP is not enabled it has a well-defiled value anyway @@ -476,13 +547,22 @@ void cluster(const string& config_file, const string& data_file, const string& o if (myompthread == 0) ompnumthreads = omp_get_num_threads(); #endif if (myompthread == 0) { + // receive the start parameter from MPI process 0 + MPI_Recv(&myjxi488startoffset, 1, MPI_INT, 0, 3, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + // send my number of omp threads to process 0 + MPI_Send(&ompnumthreads, 1, MPI_INT, 0, 3, MPI_COMM_WORLD); + // receive myMPIstride sent by MPI process 0 to all processes + MPI_Bcast(&myMPIstride, 1, MPI_INT, 0, MPI_COMM_WORLD); + // allocate virtualfiles for each thread p_outarray = new VirtualAsciiFile*[ompnumthreads]; vtppoanarray = new VirtualBinaryFile*[ompnumthreads]; } +#pragma omp barrier // To test parallelism, I will now start feeding this function with "clean" copies of the parameters, so that they will not be changed by previous iterations, and each one will behave as the first one. 
Define all (empty) variables here, so they have the correct scope, then they get different definitions depending on thread number ClusterIterationData *cid_2 = NULL; VirtualAsciiFile *p_output_2 = NULL; VirtualBinaryFile *vtppoanp_2 = NULL; + // PLACEHOLDER // for threads other than the 0, create distinct copies of all relevant data, while for thread 0 just define new references / pointers to the original ones if (myompthread == 0) { cid_2 = cid; @@ -490,65 +570,72 @@ void cluster(const string& config_file, const string& data_file, const string& o // this is not thread 0, so do create fresh copies of all local variables cid_2 = new ClusterIterationData(*cid); } - p_output_2 = new VirtualAsciiFile(); - p_outarray[myompthread] = p_output_2; - vtppoanp_2 = new VirtualBinaryFile(); - vtppoanarray[myompthread] = vtppoanp_2; // make sure all threads align here: I don't want the following loop to accidentally start for thread 0, possibly modifying some variables before they are copied by all other threads #pragma omp barrier - if (myompthread==0) logger->log("Syncing OpenMP threads and starting the loop on wavelengths\n"); // ok, now I can actually start the parallel calculations -#pragma omp for - for (int jxi488 = cid_2->firstxi; jxi488 <= cid_2->lastxi; jxi488++) { - int jer = cluster_jxi488_cycle(jxi488, sconf, gconf, p_scattering_angles, cid_2, p_output_2, output_path, vtppoanp_2); - } - + for (int ixi488=2; ixi488<=cid_2->number_of_scales; ixi488 +=myMPIstride) { + // the parallel loop over MPI processes covers a different set of indices for each thread #pragma omp barrier - // only threads different from 0 have to free local copies of variables - if (myompthread != 0) { - delete cid_2; - } + int myjxi488 = ixi488+myjxi488startoffset+myompthread; + // each thread opens new virtual files and stores their pointers in the shared array + p_output_2 = new VirtualAsciiFile(); + vtppoanp_2 = new VirtualBinaryFile(); + // each thread puts a copy of the pointers to its virtual files in the shared arrays + p_outarray[myompthread] = p_output_2; + vtppoanarray[myompthread] = vtppoanp_2; #pragma omp barrier - { - string message = "INFO: Closing thread-local output files of thread " + to_string(myompthread) + " and syncing threads.\n"; - logger->log(message); - } - } // closes pragma omp parallel + if (myompthread==0) logger->log("Syncing OpenMP threads and starting the loop on wavelengths\n"); + // ok, now I can actually start the parallel calculations + // each MPI process handles a number of contiguous scales corresponding to its number of OMP threads at this omp level of parallelism + if (myjxi488<=cid_2->number_of_scales) { + int jer = cluster_jxi488_cycle(myjxi488, sconf, gconf, p_scattering_angles, cid_2, p_output_2, output_path, vtppoanp_2); + } // close the OMP parallel for loop + #pragma omp barrier - { - for (int ti=1; ti<ompnumthreads; ti++) { - p_outarray[0]->append(*(p_outarray[ti])); - delete p_outarray[ti]; - vtppoanarray[0]->append(*(vtppoanarray[ti])); - delete vtppoanarray[ti]; - } - for (int rr = 1; rr < mpidata->nprocs; rr++) { - if (rr == mpidata->rank) { - p_outarray[0]->mpisend(mpidata); - delete p_outarray[0]; - delete[] p_outarray; - vtppoanarray[0]->mpisend(mpidata); - delete vtppoanarray[0]; - delete[] vtppoanarray; + // threads different from 0 append their virtual files to the one of thread 0, and delete them + if (myompthread == 0) { + for (int ti=1; ti<ompnumthreads; ti++) { + p_outarray[0]->append(*(p_outarray[ti])); + delete p_outarray[ti]; + 
vtppoanarray[0]->append(*(vtppoanarray[ti])); + delete vtppoanarray[ti]; + } + // thread 0 sends the collected virtualfiles to thread 0 of MPI process 0, then deletes them + for (int rr=1; rr<mpidata->nprocs; rr++) { + if (rr == mpidata->rank) { + p_outarray[0]->mpisend(mpidata); + delete p_outarray[0]; + vtppoanarray[0]->mpisend(mpidata); + delete vtppoanarray[0]; + } + int test = MPI_Barrier(MPI_COMM_WORLD); + } } - int test = MPI_Barrier(MPI_COMM_WORLD); + } // close strided loop running on MPI processes + + // Clean memory +#pragma omp barrier + if (myompthread == 0) { + delete[] p_outarray; + delete[] vtppoanarray; } - } - // Clean memory - delete cid; + delete cid_2; + + } // close pragma omp parallel delete p_scattering_angles; delete sconf; delete gconf; - - } #endif #ifdef USE_MAGMA - logger->log("INFO: Process " + to_string(mpidata->rank) + " finalizes MAGMA.\n"); - magma_finalize(); + logger->log("INFO: Process " + to_string(mpidata->rank) + " finalizes MAGMA.\n"); + magma_finalize(); +#endif + fclose(timing_file); + delete time_logger; + delete logger; +#ifdef MPI_VERSION + } #endif - fclose(timing_file); - delete time_logger; - delete logger; } int cluster_jxi488_cycle(int jxi488, ScattererConfiguration *sconf, GeometryConfiguration *gconf, ScatteringAngles *sa, ClusterIterationData *cid, VirtualAsciiFile *output, const string& output_path, VirtualBinaryFile *vtppoanp) @@ -987,11 +1074,9 @@ int cluster_jxi488_cycle(int jxi488, ScattererConfiguration *sconf, GeometryConf output->append_line(virtual_line); sprintf(virtual_line, " Fk=%15.7lE\n\0", fz); output->append_line(virtual_line); - if (ilr210 == 1) { - double alamb = 2.0 * 3.141592653589793 / cid->vk; - sprintf(virtual_line, "INSERTION: CSM_CLUSTER %15.7lE%15.7lE%15.7lE%15.7lE\n\0", alamb, scasm, abssm, extsm); - output->append_line(virtual_line); - } + double alamb = 2.0 * 3.141592653589793 / cid->vk; + sprintf(virtual_line, "INSERTION: CSM_CLUSTER %15.7lE%15.7lE%15.7lE%15.7lE\n\0", alamb, scasm, abssm, extsm); + output->append_line(virtual_line); } // ilr210 loop double rmbrif = (real(cid->c1ao->fsacm[0][0]) - real(cid->c1ao->fsacm[1][1])) / real(cid->c1ao->fsacm[0][0]); double rmdchr = (imag(cid->c1ao->fsacm[0][0]) - imag(cid->c1ao->fsacm[1][1])) / imag(cid->c1ao->fsacm[0][0]); @@ -1263,9 +1348,9 @@ int cluster_jxi488_cycle(int jxi488, ScattererConfiguration *sconf, GeometryConf output->append_line(virtual_line); double alamb = 2.0 * 3.141592653589793 / cid->vk; if (ilr290 == 1) { - sprintf(virtual_line, "INSERTION: CS1_CLUSTER %13.5lE%10.3lE%10.3lE%15.7lE%15.7lE%15.7lE\n\0", alamb, th, ths, scasec, abssec, extsec); + sprintf(virtual_line, "INSERTION: CS1_CLUSTER %9.2lf%9.2lf%9.2lf%9.2lf%9.2lf%13.5lf%13.5lf%13.5lf\n\0", alamb, th, ph, ths, phs, scasec, abssec, extsec); } else if (ilr290 == 2) { - sprintf(virtual_line, "INSERTION: CS2_CLUSTER %13.5lE%10.3lE%10.3lE%15.7lE%15.7lE%15.7lE\n\0", alamb, th, ths, scasec, abssec, extsec); + sprintf(virtual_line, "INSERTION: CS2_CLUSTER %9.2lf%9.2lf%9.2lf%9.2lf%9.2lf%13.5lf%13.5lf%13.5lf\n\0", alamb, th, ph, ths, phs, scasec, abssec, extsec); } output->append_line(virtual_line); bool goto190 = isam >= 0 && (jths > 1 || jphs > 1); diff --git a/src/libnptm/Commons.cpp b/src/libnptm/Commons.cpp index f6a1c46a93af496fda1eae00aeb3cbc492427a65..1410bbea6305e166786d05c0fb7b5f68df753812 100644 --- a/src/libnptm/Commons.cpp +++ b/src/libnptm/Commons.cpp @@ -1184,6 +1184,8 @@ ClusterIterationData::ClusterIterationData(const ClusterIterationData& rhs) { vk = rhs.vk; firstxi = rhs.firstxi; 
   lastxi = rhs.lastxi;
+  xiblock = rhs.xiblock;
+  number_of_scales = rhs.number_of_scales;
 }
 
 #ifdef MPI_VERSION
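The Commons.cpp hunk above completes the hand-written ClusterIterationData copy constructor with the two members the new strided loop relies on (xiblock and number_of_scales); without them, the per-thread copies created via new ClusterIterationData(*cid) would evaluate the loop bound ixi488 <= cid_2->number_of_scales on indeterminate values. A minimal, self-contained sketch of that pitfall, using a toy struct whose fields other than xiblock and number_of_scales are placeholders rather than the real class layout:

#include <iostream>

// Toy stand-in for ClusterIterationData, which is copied once per OpenMP
// thread via new ClusterIterationData(*cid).
struct Data {
  int firstxi, lastxi;
  int xiblock, number_of_scales;

  Data() : firstxi(1), lastxi(10), xiblock(5), number_of_scales(10) {}

  // Hand-written copy constructor: if xiblock / number_of_scales were left
  // out (as before the patch), every worker-thread copy would carry
  // uninitialised values into the wavelength loop.
  Data(const Data &rhs)
    : firstxi(rhs.firstxi), lastxi(rhs.lastxi),
      xiblock(rhs.xiblock), number_of_scales(rhs.number_of_scales) {}
};

int main() {
  Data original;
  Data copy(original);  // what each thread does, conceptually
  std::cout << "copied number_of_scales = " << copy.number_of_scales << "\n";
  return 0;
}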
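The core change in cluster.cpp replaces the per-process "#pragma omp for" over firstxi..lastxi with a global stride across all MPI ranks and their OpenMP threads: rank 0 sends each other rank its starting offset, receives that rank's thread count, accumulates the total stride, and broadcasts it; every thread then processes scale index ixi488 + offset + thread_id, stepping by the stride. The standalone sketch below reproduces that handshake with generic names (do_scale stands in for cluster_jxi488_cycle) and, for clarity, performs the exchange before entering the parallel region, whereas the patch performs it on thread 0 inside the region with barriers.

#include <mpi.h>
#include <omp.h>
#include <cstdio>

// Placeholder for the per-wavelength work (cluster_jxi488_cycle in the patch).
static void do_scale(int jxi) { std::printf("scale %d\n", jxi); }

int main(int argc, char **argv) {
  MPI_Init(&argc, &argv);
  int rank, nprocs;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &nprocs);

  const int number_of_scales = 20;  // stand-in for cid->number_of_scales
  int my_offset = 0;                // first lane owned by this rank
  int stride = 0;                   // total lanes over all ranks and threads
  int nthreads = omp_get_max_threads();

  if (rank == 0) {
    stride = nthreads;              // rank 0 owns lanes [0, nthreads)
    for (int rr = 1; rr < nprocs; rr++) {
      // tell rank rr where its lanes start, then learn how many lanes it adds
      MPI_Send(&stride, 1, MPI_INT, rr, 3, MPI_COMM_WORLD);
      int remote_threads;
      MPI_Recv(&remote_threads, 1, MPI_INT, rr, 3, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
      stride += remote_threads;
    }
  } else {
    MPI_Recv(&my_offset, 1, MPI_INT, 0, 3, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
    MPI_Send(&nthreads, 1, MPI_INT, 0, 3, MPI_COMM_WORLD);
  }
  // every rank needs the total stride before entering the loop
  MPI_Bcast(&stride, 1, MPI_INT, 0, MPI_COMM_WORLD);

#pragma omp parallel
  {
    int tid = omp_get_thread_num();
    // scale 1 is handled separately in cluster(); the strided loop starts at 2
    for (int ixi = 2; ixi <= number_of_scales; ixi += stride) {
      int my_jxi = ixi + my_offset + tid;
      if (my_jxi <= number_of_scales) do_scale(my_jxi);
    }
  }

  MPI_Finalize();
  return 0;
}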
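The comment kept near the top of cluster() asks for the sprintf() calls into the fixed 256-byte virtual_line buffer to be replaced by snprintf() with a length check (the explicit trailing "\0" in the current format strings is also redundant, since both functions NUL-terminate). A possible shape for that change is sketched below; the helper name is hypothetical, and the warning uses the plain single-argument logger->log() call already present in the file.

#include <cstddef>
#include <cstdio>

// Hypothetical helper: format into the caller's fixed buffer and report
// whether the whole line fit. snprintf() always NUL-terminates and returns
// the length the complete output would have needed, so any value >= buf_size
// signals truncation.
template <typename... Args>
bool format_virtual_line(char *buf, std::size_t buf_size, const char *fmt, Args... args) {
  int needed = std::snprintf(buf, buf_size, fmt, args...);
  return needed >= 0 && static_cast<std::size_t>(needed) < buf_size;
}

// Possible use, modelled on the existing pattern in cluster():
//   char virtual_line[256];
//   if (!format_virtual_line(virtual_line, sizeof(virtual_line),
//                            " REFR. INDEX OF EXTERNAL MEDIUM=%15.7lE\n", exri))
//     logger->log("WARNING: output line truncated; a larger buffer is needed.\n");
//   p_output->append_line(virtual_line);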