Commit 3632405a authored by Giovanni La Mura

Merge branch 'virtualize_output' into 'master'

Finalize transition to virtual output

See merge request giacomo.mulas/np_tmcode!39
Parents: 0a8a1bce 4c68446e
@@ -94,6 +94,7 @@ int cluster_jxi488_cycle(int jxi488, ScattererConfiguration *sconf, GeometryConf
 * \param data_file: `string` Name of the input data file.
 * \param output_path: `string` Directory to write the output files in.
 */
void cluster(const string& config_file, const string& data_file, const string& output_path, const mixMPI *mpidata) {
chrono::time_point<chrono::high_resolution_clock> t_start = chrono::high_resolution_clock::now();
chrono::duration<double> elapsed;
@@ -102,6 +103,10 @@ void cluster(const string& config_file, const string& data_file, const string& o
FILE *timing_file = fopen(timing_name.c_str(), "w");
Logger *time_logger = new Logger(LOG_DEBG, timing_file);
Logger *logger = new Logger(LOG_DEBG);
//===========
// Initialise MAGMA
//===========
#ifdef USE_MAGMA
int device_count;
cudaGetDeviceCount(&device_count);
@@ -117,11 +122,18 @@ void cluster(const string& config_file, const string& data_file, const string& o
return;
}
#endif
// end MAGMA initialisation
//===========================
// the following only happens on MPI process 0
//===========================
if (mpidata->rank == 0) {
#ifdef USE_NVTX
nvtxRangePush("Set up");
#endif
//=======================
// Initialise sconf from configuration file
//=======================
logger->log("INFO: making legacy configuration...", LOG_INFO); logger->log("INFO: making legacy configuration...", LOG_INFO);
ScattererConfiguration *sconf = NULL; ScattererConfiguration *sconf = NULL;
try { try {
...@@ -138,6 +150,11 @@ void cluster(const string& config_file, const string& data_file, const string& o ...@@ -138,6 +150,11 @@ void cluster(const string& config_file, const string& data_file, const string& o
sconf->write_formatted(output_path + "/c_OEDFB"); sconf->write_formatted(output_path + "/c_OEDFB");
sconf->write_binary(output_path + "/c_TEDF"); sconf->write_binary(output_path + "/c_TEDF");
sconf->write_binary(output_path + "/c_TEDF.hd5", "HDF5"); sconf->write_binary(output_path + "/c_TEDF.hd5", "HDF5");
// end sconf initialisation
//========================
// Initialise gconf from configuration files
//========================
GeometryConfiguration *gconf = NULL;
try {
gconf = GeometryConfiguration::from_legacy(data_file);
@@ -152,25 +169,33 @@ void cluster(const string& config_file, const string& data_file, const string& o
return;
}
logger->log(" done.\n", LOG_INFO);
//end gconf initialisation
#ifdef USE_NVTX
nvtxRangePop();
#endif
int s_nsph = sconf->number_of_spheres;
int nsph = gconf->number_of_spheres;
// Sanity check: the number of spheres should always be consistent between the two configurations
if (s_nsph == nsph) {
// Shortcuts to variables stored in configuration objects
ScatteringAngles *p_scattering_angles = new ScatteringAngles(gconf);
double wp = sconf->wp;
// Open empty virtual ascii file for output
VirtualAsciiFile *p_output = new VirtualAsciiFile();
// for the time being, this is ok. When we can, add some logic in the sprintf calls that checks if a longer buffer would be needed, and in case expand it
// in any case, replace all sprintf() with snprintf(), to avoid in any case writing more than the available buffer size
char virtual_line[256];
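// Editor's note, a minimal sketch and not part of this commit: the snprintf()
// replacement proposed in the comment above could look like the following,
// assuming virtual_line keeps its fixed 256-byte size (exri is one of the
// values formatted further below):
//
//   int needed = snprintf(virtual_line, sizeof(virtual_line),
//                         " REFR. INDEX OF EXTERNAL MEDIUM=%15.7lE\n", exri);
//   if (needed < 0 || needed >= (int)sizeof(virtual_line)) {
//     // formatting error or truncation: grow the buffer (or log a warning) and retry
//   }
//   p_output->append_line(virtual_line);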
// Create and initialise pristine cid for MPI proc 0 and thread 0
ClusterIterationData *cid = new ClusterIterationData(gconf, sconf, mpidata);
const int ndi = cid->c4->nsph * cid->c4->nlim;
np_int ndit = 2 * ndi;
logger->log("INFO: Size of matrices to invert: " + to_string((int64_t)ndit) + " x " + to_string((int64_t)ndit) +".\n");
time_logger->log("INFO: Size of matrices to invert: " + to_string((int64_t)ndit) + " x " + to_string((int64_t)ndit) +".\n");
//==========================
// Write a block of info to the ascii output file
//==========================
sprintf(virtual_line, " READ(IR,*)NSPH,LI,LE,MXNDM,INPOL,NPNT,NPNTTS,IAVM,ISAM\n\0"); sprintf(virtual_line, " READ(IR,*)NSPH,LI,LE,MXNDM,INPOL,NPNT,NPNTTS,IAVM,ISAM\n\0");
p_output->append_line(virtual_line); p_output->append_line(virtual_line);
sprintf(virtual_line, " %5d%5d%5d%5ld%5d%5d%5d%5d%5d\n\0", sprintf(virtual_line, " %5d%5d%5d%5ld%5d%5d%5d%5d%5d\n\0",
@@ -228,220 +253,260 @@ void cluster(const string& config_file, const string& data_file, const string& o
double exri = sqrt(exdc);
sprintf(virtual_line, " REFR. INDEX OF EXTERNAL MEDIUM=%15.7lE\n\0", exri);
p_output->append_line(virtual_line);
// Create empty virtual binary file
VirtualBinaryFile *vtppoanp = new VirtualBinaryFile();
VirtualBinaryLine *vbinline = NULL;
// fstream *tppoanp = new fstream;
// fstream &tppoan = *tppoanp;
string tppoan_name = output_path + "/c_TPPOAN";
// tppoan.open(tppoan_name.c_str(), ios::out | ios::binary);
// if (tppoan.is_open()) {
#ifdef USE_MAGMA
logger->log("INFO: using MAGMA calls.\n", LOG_INFO);
#elif defined USE_LAPACK
logger->log("INFO: using LAPACK calls.\n", LOG_INFO);
#else
logger->log("INFO: using fall-back lucin() calls.\n", LOG_INFO);
#endif
int iavm = gconf->iavm;
int isam = gconf->isam;
int inpol = gconf->in_pol;
int nxi = sconf->number_of_scales;
int nth = p_scattering_angles->nth;
int nths = p_scattering_angles->nths;
int nph = p_scattering_angles->nph;
int nphs = p_scattering_angles->nphs;
//========================
// write a block of info to virtual binary file
//========================
vtppoanp->append_line(VirtualBinaryLine(iavm));
vtppoanp->append_line(VirtualBinaryLine(isam));
vtppoanp->append_line(VirtualBinaryLine(inpol));
vtppoanp->append_line(VirtualBinaryLine(nxi));
vtppoanp->append_line(VirtualBinaryLine(nth));
vtppoanp->append_line(VirtualBinaryLine(nph));
vtppoanp->append_line(VirtualBinaryLine(nths));
vtppoanp->append_line(VirtualBinaryLine(nphs));
if (sconf->idfc < 0) {
cid->vk = cid->xip * cid->wn;
sprintf(virtual_line, " VK=%15.7lE, XI IS SCALE FACTOR FOR LENGTHS\n\0", cid->vk);
p_output->append_line(virtual_line);
sprintf(virtual_line, " \n\0");
p_output->append_line(virtual_line);
}
// do the first iteration on jxi488 separately, since it seems to be different from the others
int jxi488 = 1;
chrono::time_point<chrono::high_resolution_clock> start_iter_1 = chrono::high_resolution_clock::now();
#ifdef USE_NVTX
nvtxRangePush("First iteration");
#endif
int jer = cluster_jxi488_cycle(jxi488, sconf, gconf, p_scattering_angles, cid, p_output, output_path, vtppoanp);
#ifdef USE_NVTX
nvtxRangePop();
#endif
chrono::time_point<chrono::high_resolution_clock> end_iter_1 = chrono::high_resolution_clock::now();
elapsed = start_iter_1 - t_start;
string message = "INFO: Calculation setup took " + to_string(elapsed.count()) + "s.\n";
logger->log(message);
time_logger->log(message);
elapsed = end_iter_1 - start_iter_1;
message = "INFO: First iteration took " + to_string(elapsed.count()) + "s.\n";
logger->log(message);
time_logger->log(message);
if (jer != 0) {
// First loop failed. Halt the calculation.
fclose(timing_file);
delete p_output;
delete p_scattering_angles;
delete cid;
delete logger;
delete time_logger;
delete sconf;
delete gconf;
return;
}
//==================================================
// do the first outputs here, so that I open here the new files, afterwards I only append
//==================================================
p_output->write_to_disk(output_path + "/c_OCLU");
delete p_output;
vtppoanp->write_to_disk(output_path + "/c_TPPOAN");
delete vtppoanp;
// here go the calls that send data to be duplicated on other MPI processes from process 0 to others, using MPI broadcasts, but only if MPI is actually used
#ifdef MPI_VERSION
if (mpidata->mpirunning) {
gconf->mpibcast(mpidata);
sconf->mpibcast(mpidata);
cid->mpibcast(mpidata);
p_scattering_angles->mpibcast(mpidata);
}
#endif
// Create this variable and initialise it with a default here, so that it is defined anyway, with or without OpenMP support enabled
int ompnumthreads = 1;
// this is for MPI process 0 (or even if we are not using MPI at all)
int myjxi488startoffset = 0;
int myMPIstride = ompnumthreads;
int myMPIblock = ompnumthreads;
// Define here shared arrays of virtual ascii and binary files, so that thread 0 will be able to access them all later
VirtualAsciiFile **p_outarray = NULL;
VirtualBinaryFile **vtppoanarray = NULL;
#ifdef USE_NVTX
nvtxRangePush("Parallel loop");
#endif
//===========================================
// open the OpenMP parallel context, so each thread can initialise its stuff
//===========================================
#pragma omp parallel
{
// Create and initialise this variable here, so that if OpenMP is enabled it is local to the thread, and if OpenMP is not enabled it has a well-defined value anyway
int myompthread = 0;
#ifdef _OPENMP
// If OpenMP is enabled, give actual values to myompthread and ompnumthreads, and open thread-local output files
myompthread = omp_get_thread_num();
if (myompthread == 0) ompnumthreads = omp_get_num_threads();
#endif
if (myompthread == 0) {
// Initialise some shared variables only on thread 0
p_outarray = new VirtualAsciiFile*[ompnumthreads];
vtppoanarray = new VirtualBinaryFile*[ompnumthreads];
myMPIblock = ompnumthreads;
myMPIstride = myMPIblock;
}
#ifdef MPI_VERSION
if (myompthread == 0) {
if (mpidata->mpirunning) {
// only go through this if MPI has been actually used
for (int rr=1; rr<mpidata->nprocs; rr++) {
// individually send their respective starting points to other MPI processes: they start immediately after the frequencies computed by previous processes so far
int remotejxi488startoffset = myMPIstride;
MPI_Send(&remotejxi488startoffset, 1, MPI_INT, rr, 3, MPI_COMM_WORLD);
int remoteMPIblock;
MPI_Recv(&remoteMPIblock, 1, MPI_INT, rr, 3, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
// update myMPIstride to include the ones due to MPI process rr
myMPIstride += remoteMPIblock;
}
// now I know the total myMPIstride, I can send it to all processes
MPI_Bcast(&myMPIstride, 1, MPI_INT, 0, MPI_COMM_WORLD);
}
}
#endif
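// Editor's note (illustrative example, not from this commit): with three MPI
// processes running 4, 2 and 2 OpenMP threads respectively, process 0 keeps
// myjxi488startoffset = 0 and myMPIblock = 4; it sends start offset 4 to
// process 1 and, after receiving its block of 2, sends start offset 6 to
// process 2; the broadcast total is then myMPIstride = 8, so every process
// walks the list of scales in steps of 8.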
// add an omp barrier to make sure that the global variables defined by thread 0 are known to all threads below this
#pragma omp barrier
// To test parallelism, I will now start feeding this function with "clean" copies of the parameters, so that they will not be changed by previous iterations, and each one will behave as the first one. Define all (empty) variables here, so they have the correct scope, then they get different definitions depending on thread number
ClusterIterationData *cid_2 = NULL;
VirtualAsciiFile *p_output_2 = NULL;
VirtualBinaryFile *vtppoanp_2 = NULL;
// for threads other than the 0, create distinct copies of all relevant data, while for thread 0 just define new references / pointers to the original ones
if (myompthread == 0) {
cid_2 = cid;
} else {
// this is not thread 0, so do create fresh copies of all local variables
cid_2 = new ClusterIterationData(*cid);
}
// make sure all threads align here: I don't want the following loop to accidentally start for thread 0, possibly modifying some variables before they are copied by all other threads
if (myompthread==0) {
logger->log("Syncing OpenMP threads and starting the loop on wavelengths\n");
}
#pragma omp barrier
// ok, now I can actually start the parallel calculations
for (int ixi488=2; ixi488<=cid_2->number_of_scales; ixi488 +=myMPIstride) {
// the parallel loop over MPI processes covers a different set of indices for each thread
#pragma omp barrier
int myjxi488 = ixi488+myompthread;
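// Editor's note (illustrative, not from this commit): continuing the example
// above, on the first pass ixi488 = 2 and the four threads of process 0 take
// scales 2..5, while the other processes (see the matching branch below) take
// 6..7 and 8..9; the next pass starts at ixi488 = 2 + myMPIstride = 10.
// Scale 1 was already handled by the serial first iteration.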
// each thread opens new virtual files and stores their pointers in the shared array
p_output_2 = new VirtualAsciiFile();
vtppoanp_2 = new VirtualBinaryFile();
// each thread puts a copy of the pointers to its virtual files in the shared arrays
p_outarray[myompthread] = p_output_2;
vtppoanarray[myompthread] = vtppoanp_2;
#pragma omp barrier
// each MPI process handles a number of contiguous scales corresponding to its number of OMP threads at this omp level of parallelism
if (myjxi488<=cid_2->number_of_scales) {
int jer = cluster_jxi488_cycle(myjxi488, sconf, gconf, p_scattering_angles, cid_2, p_output_2, output_path, vtppoanp_2);
}
#pragma omp barrier
#ifdef USE_NVTX
nvtxRangePush("Output concatenation");
#endif
#pragma omp barrier
// threads different from 0 append their virtual files to the one of thread 0, and delete them
if (myompthread == 0) {
for (int ti=1; ti<ompnumthreads; ti++) {
p_outarray[0]->append(*(p_outarray[ti]));
delete p_outarray[ti];
vtppoanarray[0]->append(*(vtppoanarray[ti]));
delete vtppoanarray[ti];
}
}
#pragma omp barrier
//==============================================
// Collect all virtual files on thread 0 of MPI process 0, and append them to disk
//==============================================
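// Editor's note (sketch of the resulting layout, not from this commit): after
// this block, c_OCLU contains the first scale written before the parallel
// region, followed, for each strided pass, by the scales of threads 0..N-1 of
// process 0 and then by the blocks received from processes 1..P-1;
// c_TPPOAN is appended in the same order.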
if (myompthread == 0) {
// thread 0 writes its virtual files, now including contributions from all threads, to disk, and deletes them
p_outarray[0]->append_to_disk(output_path + "/c_OCLU");
delete p_outarray[0];
vtppoanarray[0]->append_to_disk(output_path + "/c_TPPOAN");
delete vtppoanarray[0];
#ifdef MPI_VERSION
if (mpidata->mpirunning) {
// only go through this if MPI has been actually used
for (int rr=1; rr<mpidata->nprocs; rr++) {
// get the data from process rr, creating a new virtual ascii file
VirtualAsciiFile *p_output = new VirtualAsciiFile(mpidata, rr);
// append to disk and delete virtual ascii file
p_output->append_to_disk(output_path + "/c_OCLU");
delete p_output;
// get the data from process rr, creating a new virtual binary file
VirtualBinaryFile *vtppoanp = new VirtualBinaryFile(mpidata, rr);
// append to disk and delete virtual binary file
vtppoanp->append_to_disk(output_path + "/c_TPPOAN");
delete vtppoanp;
int test = MPI_Barrier(MPI_COMM_WORLD);
}
}
#endif
}
// end block writing to disk
#ifdef USE_NVTX
nvtxRangePop();
#endif
#pragma omp barrier
} // close strided loop running on MPI processes, ixi488 loop
// delete the shared arrays I used to make available to thread 0 the virtual files of other threads
#pragma omp barrier
if (myompthread == 0) {
delete[] p_outarray;
delete[] vtppoanarray;
}
{
string message = "INFO: Closing thread-local output files of thread " + to_string(myompthread) + " and syncing threads.\n";
logger->log(message);
}
#ifdef USE_NVTX
nvtxRangePop();
#endif
delete cid_2;
}
delete p_scattering_angles;
}
else { // NSPH mismatch between geometry and scatterer configurations.
throw UnrecognizedConfigurationException(
"Inconsistent geometry and scatterer configurations."
);
}
delete sconf;
delete gconf;
chrono::time_point<chrono::high_resolution_clock> t_end = chrono::high_resolution_clock::now();
@@ -450,8 +515,11 @@ void cluster(const string& config_file, const string& data_file, const string& o
logger->log(message);
logger->log("Finished: output written to " + output_path + "/c_OCLU\n");
time_logger->log(message);
} // end instructions block of MPI process 0
//===============================
// instruction block for MPI processes different from 0
//===============================
#ifdef MPI_VERSION
else {
// here goes the code for MPI processes other than 0
@@ -460,12 +528,15 @@ void cluster(const string& config_file, const string& data_file, const string& o
ScattererConfiguration *sconf = new ScattererConfiguration(mpidata);
ClusterIterationData *cid = new ClusterIterationData(mpidata);
ScatteringAngles *p_scattering_angles = new ScatteringAngles(mpidata);
// open separate files for other MPI processes
// Create this variable and initialise it with a default here, so that it is defined anyway, with or without OpenMP support enabled
int ompnumthreads = 1;
VirtualAsciiFile **p_outarray = NULL;
VirtualBinaryFile **vtppoanarray = NULL;
int myjxi488startoffset;
int myMPIstride = ompnumthreads;
int myMPIblock = ompnumthreads;
#pragma omp parallel
{
// Create and initialise this variable here, so that if OpenMP is enabled it is local to the thread, and if OpenMP is not enabled it has a well-defined value anyway
@@ -476,13 +547,22 @@ void cluster(const string& config_file, const string& data_file, const string& o
if (myompthread == 0) ompnumthreads = omp_get_num_threads();
#endif
if (myompthread == 0) {
// receive the start parameter from MPI process 0
MPI_Recv(&myjxi488startoffset, 1, MPI_INT, 0, 3, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
// send my number of omp threads to process 0
MPI_Send(&ompnumthreads, 1, MPI_INT, 0, 3, MPI_COMM_WORLD);
// receive myMPIstride sent by MPI process 0 to all processes
MPI_Bcast(&myMPIstride, 1, MPI_INT, 0, MPI_COMM_WORLD);
// allocate virtualfiles for each thread
p_outarray = new VirtualAsciiFile*[ompnumthreads];
vtppoanarray = new VirtualBinaryFile*[ompnumthreads];
}
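// Editor's note (illustrative, not from this commit): this is the worker-side
// half of the handshake shown in the rank-0 branch above; e.g. a process with
// 2 OpenMP threads receives its start offset (say 4), reports ompnumthreads = 2,
// and later obtains the broadcast myMPIstride (say 8), so on the pass with
// ixi488 = 2 its two threads handle scales 6 and 7.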
#pragma omp barrier
// To test parallelism, I will now start feeding this function with "clean" copies of the parameters, so that they will not be changed by previous iterations, and each one will behave as the first one. Define all (empty) variables here, so they have the correct scope, then they get different definitions depending on thread number
ClusterIterationData *cid_2 = NULL;
VirtualAsciiFile *p_output_2 = NULL;
VirtualBinaryFile *vtppoanp_2 = NULL;
// PLACEHOLDER
// for threads other than the 0, create distinct copies of all relevant data, while for thread 0 just define new references / pointers to the original ones
if (myompthread == 0) {
cid_2 = cid;
@@ -490,65 +570,72 @@ void cluster(const string& config_file, const string& data_file, const string& o
// this is not thread 0, so do create fresh copies of all local variables
cid_2 = new ClusterIterationData(*cid);
}
// make sure all threads align here: I don't want the following loop to accidentally start for thread 0, possibly modifying some variables before they are copied by all other threads
#pragma omp barrier
// ok, now I can actually start the parallel calculations
for (int ixi488=2; ixi488<=cid_2->number_of_scales; ixi488 +=myMPIstride) {
// the parallel loop over MPI processes covers a different set of indices for each thread
#pragma omp barrier
int myjxi488 = ixi488+myjxi488startoffset+myompthread;
// each thread opens new virtual files and stores their pointers in the shared array
p_output_2 = new VirtualAsciiFile();
vtppoanp_2 = new VirtualBinaryFile();
// each thread puts a copy of the pointers to its virtual files in the shared arrays
p_outarray[myompthread] = p_output_2;
vtppoanarray[myompthread] = vtppoanp_2;
#pragma omp barrier
if (myompthread==0) logger->log("Syncing OpenMP threads and starting the loop on wavelengths\n");
// each MPI process handles a number of contiguous scales corresponding to its number of OMP threads at this omp level of parallelism
if (myjxi488<=cid_2->number_of_scales) {
int jer = cluster_jxi488_cycle(myjxi488, sconf, gconf, p_scattering_angles, cid_2, p_output_2, output_path, vtppoanp_2);
}
#pragma omp barrier
// threads different from 0 append their virtual files to the one of thread 0, and delete them
if (myompthread == 0) {
for (int ti=1; ti<ompnumthreads; ti++) {
p_outarray[0]->append(*(p_outarray[ti]));
delete p_outarray[ti];
vtppoanarray[0]->append(*(vtppoanarray[ti]));
delete vtppoanarray[ti];
}
// thread 0 sends the collected virtualfiles to thread 0 of MPI process 0, then deletes them
for (int rr=1; rr<mpidata->nprocs; rr++) {
if (rr == mpidata->rank) {
p_outarray[0]->mpisend(mpidata);
delete p_outarray[0];
vtppoanarray[0]->mpisend(mpidata);
delete vtppoanarray[0];
}
int test = MPI_Barrier(MPI_COMM_WORLD);
}
}
} // close strided loop running on MPI processes
// Clean memory
#pragma omp barrier
if (myompthread == 0) {
delete[] p_outarray;
delete[] vtppoanarray;
}
delete cid_2;
} // close pragma omp parallel
delete p_scattering_angles;
delete sconf;
delete gconf;
}
#endif
#ifdef USE_MAGMA
logger->log("INFO: Process " + to_string(mpidata->rank) + " finalizes MAGMA.\n");
magma_finalize();
#endif
fclose(timing_file);
delete time_logger;
delete logger;
}
int cluster_jxi488_cycle(int jxi488, ScattererConfiguration *sconf, GeometryConfiguration *gconf, ScatteringAngles *sa, ClusterIterationData *cid, VirtualAsciiFile *output, const string& output_path, VirtualBinaryFile *vtppoanp)
@@ -987,11 +1074,9 @@ int cluster_jxi488_cycle(int jxi488, ScattererConfiguration *sconf, GeometryConf
output->append_line(virtual_line);
sprintf(virtual_line, " Fk=%15.7lE\n\0", fz);
output->append_line(virtual_line);
double alamb = 2.0 * 3.141592653589793 / cid->vk;
sprintf(virtual_line, "INSERTION: CSM_CLUSTER %15.7lE%15.7lE%15.7lE%15.7lE\n\0", alamb, scasm, abssm, extsm);
output->append_line(virtual_line);
} // ilr210 loop
double rmbrif = (real(cid->c1ao->fsacm[0][0]) - real(cid->c1ao->fsacm[1][1])) / real(cid->c1ao->fsacm[0][0]);
double rmdchr = (imag(cid->c1ao->fsacm[0][0]) - imag(cid->c1ao->fsacm[1][1])) / imag(cid->c1ao->fsacm[0][0]);
@@ -1263,9 +1348,9 @@ int cluster_jxi488_cycle(int jxi488, ScattererConfiguration *sconf, GeometryConf
output->append_line(virtual_line);
double alamb = 2.0 * 3.141592653589793 / cid->vk;
if (ilr290 == 1) {
sprintf(virtual_line, "INSERTION: CS1_CLUSTER %13.5lE%10.3lE%10.3lE%15.7lE%15.7lE%15.7lE\n\0", alamb, th, ths, scasec, abssec, extsec); sprintf(virtual_line, "INSERTION: CS1_CLUSTER %9.2lf%9.2lf%9.2lf%9.2lf%9.2lf%13.5lf%13.5lf%13.5lf\n\0", alamb, th, ph, ths, phs, scasec, abssec, extsec);
} else if (ilr290 == 2) { } else if (ilr290 == 2) {
sprintf(virtual_line, "INSERTION: CS2_CLUSTER %13.5lE%10.3lE%10.3lE%15.7lE%15.7lE%15.7lE\n\0", alamb, th, ths, scasec, abssec, extsec); sprintf(virtual_line, "INSERTION: CS2_CLUSTER %9.2lf%9.2lf%9.2lf%9.2lf%9.2lf%13.5lf%13.5lf%13.5lf\n\0", alamb, th, ph, ths, phs, scasec, abssec, extsec);
} }
output->append_line(virtual_line); output->append_line(virtual_line);
bool goto190 = isam >= 0 && (jths > 1 || jphs > 1); bool goto190 = isam >= 0 && (jths > 1 || jphs > 1);
...
@@ -1184,6 +1184,8 @@ ClusterIterationData::ClusterIterationData(const ClusterIterationData& rhs) {
vk = rhs.vk;
firstxi = rhs.firstxi;
lastxi = rhs.lastxi;
xiblock = rhs.xiblock;
number_of_scales = rhs.number_of_scales;
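// Editor's note (not part of the commit): these two fields are read through the
// per-thread copies created with this copy constructor (e.g. the strided loop
// bound cid_2->number_of_scales above), so they must be copied here as well.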
}
#ifdef MPI_VERSION
...