diff --git a/Readme.md b/Readme.md index 31ba125b9a7e483e34d57ddb8039568f7c7fd4d8..2f7537eb8a6a27dba10ea3c9444ea75cbd6314b7 100644 --- a/Readme.md +++ b/Readme.md @@ -35,5 +35,34 @@ Since the memory toll due to the non-cache friendly pattern, we compare 3 differ **v2**. copying all the neighbours into a memory buffer so that the loop is then linear on consequent memory addresses (of course could hardly been done in a real code) -**v3**. evolving the v2 in a more realistic way, having a thread that fetches neighbours particles in a limited buffer while other threads process them +**v3 (TBD)**. evolving the v2 in a more realistic way, having a thread that fetches neighbours particles in a limited buffer while other threads process them +All the previous tests are also reshaped in a version "b" in which the resulting force is not reduced to a single value but is still a vector of 3 components (TBD) + +------ + +### USAGE EXAMPLE + +1. generate a list of $N$ particles using some seed for the rnd generation + `./vect 0 $N $SEED` + This will produce a file `particle.data` that contains the data of + + - the generated particles + + - the active particles + + - the neighbours list for every active particle + + Hence, loading data from this file all the runs with different vectorization types will process exactly the same data + +2. process the data selecting an implementation + `./vect [1..5] ` + argument 1 to 5 selects the 5 different types of vectorization. + The execution produces an output file named + `force$X.$V.out` + where `$X` is either 1 or 2, depending on whether you used `vect.1` or `vect.2`, and `$V` is the vectorization type that has been run. + Using the code `compare_outputs.c` it is possible to compare the results against the fiducial case obtained running he vectorization type `0`. + The usage is simply + `./compare_outputs $file1 $file2` + + diff --git a/headers/vector_pragmas.h b/headers/vector_pragmas.h index a59fbbb0f7fe91c84af428bb3efda5631938a1e1..dbdd438be609f8fb63765d1b674fc5dc84ae3fd8 100644 --- a/headers/vector_pragmas.h +++ b/headers/vector_pragmas.h @@ -66,7 +66,7 @@ * ×××××××××××××××××××× * × LOOPS - - LOOP_UNROLL generic directive for unrollinf + - LOOP_UNROLL generic directive for unrolling - LOOP_UNROLL_N(n) directive for unrolling n times * ×××××××××××××××××××× diff --git a/headers/vector_types.h b/headers/vector_types.h index 78bb0edba1f1ca8bca24f6b0d014ac1f8ecb9621..e9d9628b343c1f950e115e410c59cbb75ed9e31f 100644 --- a/headers/vector_types.h +++ b/headers/vector_types.h @@ -71,6 +71,12 @@ #define vitype __m512 #define vlltype vitype +#define DVSIZE 8 +#define FVSIZE 16 +#define IVSIZE 16 +#define LLVSIZE 8 +#define VALIGN 64 + #elif defined ( __AVX__ ) || defined ( __AVX2__ ) #warning "found AVX/AVX2" @@ -81,6 +87,12 @@ #define vitype __m256 #define vlltype vitype +#define DVSIZE 4 +#define FVSIZE 8 +#define IVSIZE 8 +#define LLVSIZE 4 +#define VALIGN 32 + #elif defined ( __SSE4__ ) || defined ( __SSE3__ ) #warning "found SSE >= 3" @@ -90,6 +102,12 @@ #define vitype __m128 #define vlltype vitype +#define DVSIZE 2 +#define FVSIZE 8 +#define IVSIZE 8 +#define LLVSIZE 2 +#define VALIGN 32 + #else #define VSIZE (sizeof(double)) @@ -128,13 +146,13 @@ typedef int ivector_t; #else - +/* #define DVSIZE (VSIZE / sizeof(double)) #define FVSIZE (VSIZE / sizeof(float)) #define IVSIZE (VSIZE / sizeof(int)) #define LLVSIZE (VSIZE / sizeof(int)) #define VALIGN (VSIZE) - +*/ #if defined(__GNUC__) && !defined(__clang__) @@ -143,12 +161,31 @@ typedef float fvector_t __attribute__((vector_size (VSIZE))); typedef int ivector_t __attribute__((vector_size (VSIZE))); typedef long long llvector_t __attribute__((vector_size (VSIZE))); +#define GLUE(A,B) A ## B +#define INIT_VECTOR(myS) GLUE(LIST,myS) + +#define _LIST2_ X(value), X(value) +#define _LIST4_ _LIST2_ , _LIST2_ +#define _LIST8_ _LIST4_ , _LIST4_ +#define LIST2 { _LIST2_ } +#define LIST4 { _LIST4_ } +#define LIST8 { _LIST8_ } + +/* NOTE: HOW to initialize GCC vectors witha single value + + #define X(v) $SOME_VALUE + double vecta __attribute__((vector_size(8*sizeof(double)))) = INIT_VECTOR(SIZE); + #undef X + */ + #elif defined(__clang__) || defined(__INTEL_LLVM_COMPILER) typedef double dvector_t __attribute__((ext_vector_type (DVSIZE))); typedef float fvector_t __attribute__((ext_vector_type (FVSIZE))); typedef int ivector_t __attribute__((ext_vector_type (IVSIZE))); typedef long long llvector_t __attribute__((ext_vector_type (LLVSIZE))); + +#define INIT_VECTOR( _VTYPE_, _VSIZE_, value ) (_VTYPE_)(value); #endif diff --git a/src/compare_outputs.c b/src/compare_outputs.c index 751be0ae40a10124fdb7b8c326c2fb10299d4f1a..732d968dc51056e1b28b57cf31c2e5d0b86145db 100644 --- a/src/compare_outputs.c +++ b/src/compare_outputs.c @@ -70,7 +70,7 @@ int main ( int argc, char **argv ) } if ( faults ) - printf ( " »»» %d particles differ more tha the tolerance (%g)\n", + printf ( " »»» %d particles differ more than the tolerance (%g)\n", faults, tolerance ); else printf(" :) OK !!\n"); diff --git a/src/vect.1.c b/src/vect.1.c index 6636f416851c24efb2c191772b886012a7fffbc1..95805b62a850176025fe9f822cb232a26469919f 100644 --- a/src/vect.1.c +++ b/src/vect.1.c @@ -144,11 +144,12 @@ double process_with_structure( const P_t * restrict P, // force += Ptarget.mass * data[3] / dist2; - /* + double check = P[target].mass * data[3] / dist2; - printf("T%d n %d->%d %g %g %g %g\n", - target, i, ngb_list[i], dx*dx, dy*dy, dz*dz, check ); - */ + #if defined(DEBUG) + printf("T%d n %d->%d %g %g %g %g %g\n", + target, i, ngb_list[i], force, dx*dx, dy*dy, dz*dz, check ); + #endif data[0] = predata[0]; data[1] = predata[1]; @@ -246,11 +247,14 @@ double process_with_vectors( const v4df * restrict V, const int target, // vforce[k] += mm /( ((v4df_u)dist2).p[0] + ((v4df_u)dist2).p[1] + ((v4df_u)dist2).p[2] ); - /* printf("T%d n %d->%d %g %g %g %g\n", */ - /* target, i+k, ngb_list[i+k], */ - /* ((v4df_u)dist2).p[0], ((v4df_u)dist2).p[1], ((v4df_u)dist2).p[2], */ - /* mm /( ((v4df_u)dist2).p[0] + ((v4df_u)dist2).p[1] + ((v4df_u)dist2).p[2]) ); */ - + + #if defined(DEBUG) + printf("T%d n %d->%d %g %g %g %g %g\n", + target, i+k, ngb_list[i+k], + force, ((v4df_u)dist2).p[0], ((v4df_u)dist2).p[1], ((v4df_u)dist2).p[2], + mm /( ((v4df_u)dist2).p[0] + ((v4df_u)dist2).p[1] + ((v4df_u)dist2).p[2]) ); + #endif + } // transfer the pre-loaded data for the @@ -265,7 +269,7 @@ double process_with_vectors( const v4df * restrict V, const int target, for ( int k = 0; k < BUNCH; k++ ) force += vforce[k]; - // process the reamining particles + // process the remaining particles // for ( int i = _Nngb_; i < Nngb; i++ ) { @@ -284,6 +288,13 @@ double process_with_vectors( const v4df * restrict V, const int target, // get the force from the i-th neighbour // force += mm /( ((v4df_u)dist2).p[0] + ((v4df_u)dist2).p[1] + ((v4df_u)dist2).p[2] ); + + #if defined(DEBUG) + printf("T%d n %d->%d %g %g %g %g %g\n", + target, _Nngb_+i, ngb_list[_Nngb_+i], + force, ((v4df_u)dist2).p[0], ((v4df_u)dist2).p[1], ((v4df_u)dist2).p[2], + mm /( ((v4df_u)dist2).p[0] + ((v4df_u)dist2).p[1] + ((v4df_u)dist2).p[2]) ); + #endif } return force; @@ -307,20 +318,22 @@ double process_with_vectors_intrinsics( const double * restrict V, { #define BUNCH 8 // see comments at the begin of routine // process_with_vectors() - + + __m256d *VV = __builtin_assume_aligned ((__m256*)V, VALIGN); long long bb = -1; // used for masking entries of a vector __m256d mask_pos = _mm256_set_pd(0, *(double*)&bb, *(double*)&bb, *(double*)&bb); // used to mask out the mass __m256d mask_mass = _mm256_set_pd(*(double*)&bb, 0, 0, 0); // used to mask out the position const __m256d register vtarget = _mm256_load_pd(&V[target*4]); // load the target data - + // this is the particle of which we + // want to calculate the force + __m256d Vbuffer[BUNCH] = {0}; + double Vforce [BUNCH] = {0}; - __m256d Vbuffer[BUNCH]; - __m256d Vforce = {0}; - __m256d *VV = __builtin_assume_aligned ((__m256*)V, VALIGN); - - // set-up the new iteration spaces + // we'll be treating the neighbours in bunches + // set-up the new iteration spaces, the remainder + // will be accounted at the end // int _Nngb_; { @@ -330,14 +343,15 @@ double process_with_vectors_intrinsics( const double * restrict V, } int _Nngb_last_ = _Nngb_ - BUNCH; - // pre-load the very first BUNCH of particles + // load the very first BUNCH of particles // for (int j = 0; j < BUNCH; j++ ) Vbuffer [j] = _mm256_load_pd((double*)&VV[ngb_list[j]]); LOOP_UNROLL_N(1) for ( int i = 0; i < _Nngb_; i+= BUNCH ) - { + { + // hide the memory latency by pre-loding // the next BUNCH // @@ -346,8 +360,11 @@ double process_with_vectors_intrinsics( const double * restrict V, { int next_bunch = i+BUNCH; for (int j = 0; j < BUNCH; j++ ) { + // prevBuffer will be moved into Vbuffer + // at the end of the i iteration + // int k = ngb_list[next_bunch+j]; - preVbuffer [j] = _mm256_load_pd((double*)&VV[k]); } + preVbuffer [j] = _mm256_load_pd((double*)&VV[k]); } } @@ -399,13 +416,23 @@ double process_with_vectors_intrinsics( const double * restrict V, // // get [ 0, 0, 0, m0*m1 ] mprod = _mm256_and_pd( mprod, mask_mass ); + + double mm = ((double*)&mprod)[3]; // // get [ 0, 0, 0, m0*m1 / (dx^2+dy^2+dz^2) ] mprod = _mm256_div_pd( mprod, vdist ); // get the uppermost element from the vector // --> here we should use the reshuffle+extraction instruction - (((double*)&Vforce)[k]) += (((double*)&mprod)[3]); + Vforce[k] += (((double*)&mprod)[3]); + + #if defined(DEBUG) + __m256d _vdist = vdist; + printf("T%d n %d->%d %g %g %g\n", + target, i+k, ngb_list[i+k], + Vforce[k], ((double*)&_vdist)[0], + (((double*)&mprod)[3]) ); + #endif } @@ -414,9 +441,11 @@ double process_with_vectors_intrinsics( const double * restrict V, } - double force = 0; // this will accumulate the total force on target from all the neigbhours - for ( int k = 0; k < 4; k++ ) - force += ((double*)&Vforce)[k]; + // accumulate the partial results + // + double force = 0; + for ( int k = 0; k < BUNCH; k++ ) + force += Vforce[k]; // process the remaining particles // @@ -431,6 +460,13 @@ double process_with_vectors_intrinsics( const double * restrict V, dist2 += (dd[j]-V[k+j])*(dd[j]-V[k+j]); force += (dd[3] * V[k+3]) / dist2; + + #if defined(DEBUG) + printf("T%d n %d->%d %g %g %g %g %g\n", + target, _Nngb_+i, ngb_list[_Nngb_+i], + force, dist2, (dd[0]-V[k])*(dd[0]-V[k]), (dd[1]-V[k+1])*(dd[1]-V[k+1]), (dd[2]-V[k+2])*(dd[2]-V[k+2]), + dd[3] / dist2 ); + #endif } return force; @@ -468,11 +504,20 @@ double process_with_arrays( const double * restrict pos_x, { int log2_BUNCH = 0; while ( (BUNCH >> ++log2_BUNCH) > 0 ); + // + // _Nngb_ is the largest multiple of BUNCH + // that is smaller then NNgb + // For some reasons, this way the compiler + // spots that the code for peeling the loop + // between 0 to _Nngb_ is not needed + // _Nngb_ = Nngb & ((unsigned)-1 << (log2_BUNCH-1)); } int _Nngb_last_ = _Nngb_ - BUNCH; - + + // preload the first BUNCH of neighbours + // LOOP_UNROLL_N(BUNCH) for ( int j = 0; j < BUNCH; j++ ) vx[j] = pos_x[ngb_list[j]]; @@ -493,6 +538,10 @@ double process_with_arrays( const double * restrict pos_x, for ( int i = 0; i < _Nngb_; i+=BUNCH ) { + // pre-load the nect BUNCH if neigbours + // prevx, prevy, prevz and prevm will be moved to + // vx,vy,vz and vm at the end if this iteration + // double prevx[BUNCH], prevy[BUNCH], prevz[BUNCH], prevm[BUNCH]; if ( i < _Nngb_last_ ) { @@ -507,7 +556,7 @@ double process_with_arrays( const double * restrict pos_x, prevm[j] = mass[ngb_list[next+j]]; } - // get distances along x, y and z for 4 neighbours + // get distances along x, y and z for BUNCH neighbours // double dx[BUNCH], dy[BUNCH], @@ -540,6 +589,10 @@ double process_with_arrays( const double * restrict pos_x, for ( int j = 0; j < BUNCH; j++ ) vforce[j] += mm[j] / dist2[j]; + // move the pre-loaded prevx, prevy, prevz, prevm + // values into the vx, vy,vz, vm values + // + for ( int j = 0; j < BUNCH; j++ ) vx[j] = prevx[j]; for ( int j = 0; j < BUNCH; j++ ) @@ -586,6 +639,11 @@ double process_with_arrays_intrinsics( const double * restrict pos_x, const int target, const int *ngb_list, const int Nngb ) { + // here I use specifically 256bits instructions + // the vector_types.h should be evoluted so to include + // a definition of ops that is size independent + // + #define BUNCH 4 pos_x = __builtin_assume_aligned( pos_x, VALIGN ); @@ -624,7 +682,7 @@ double process_with_arrays_intrinsics( const double * restrict pos_x, IVDEP LOOP_VECTORIZE - LOOP_UNROLL_N(1) + LOOP_UNROLL_N(1) // this is to force the compiler not to unroll for ( int i = 0; i < _Nngb_; i+=BUNCH ) { @@ -673,10 +731,9 @@ double process_with_arrays_intrinsics( const double * restrict pos_x, // for ( int i = _Nngb_; i < Nngb; i++ ) { - double dist2 = ( - (pos_x[target] - pos_x[ngb_list[i]])*(pos_x[target] - pos_x[ngb_list[i]]) + - (pos_y[target] - pos_y[ngb_list[i]])*(pos_y[target] - pos_y[ngb_list[i]]) + - (pos_z[target] - pos_z[ngb_list[i]])*(pos_z[target] - pos_z[ngb_list[i]]) ); + double dist2 = ( (pos_x[target] - pos_x[ngb_list[i]])*(pos_x[target] - pos_x[ngb_list[i]]) + + (pos_y[target] - pos_y[ngb_list[i]])*(pos_y[target] - pos_y[ngb_list[i]]) + + (pos_z[target] - pos_z[ngb_list[i]])*(pos_z[target] - pos_z[ngb_list[i]]) ); force += mass[target]*mass[ngb_list[i]] / dist2; } @@ -701,14 +758,13 @@ int main( int argc, char **argv ) long int seed = (argc > 3 ? atoi(*(argv+3)) : -1 ); // set the seed for repeatible runs int Nrepetitions= (argc > 4 ? atoi(*(argv+4)) : NREPETITIONS ); int dry_run = (argc > 5 ? atoi(*(argv+5)) : 0 ); // 1 to estimate floats for initialization - int from_file = ( case_to_run < 0 ); + int from_file = ( case_to_run > 0 ); - case_to_run = (case_to_run < 0 ? ~case_to_run + 1 : case_to_run); // make it positive + case_to_run = (case_to_run < 0 ? -case_to_run : case_to_run); // make it positive - if ( (case_to_run < 0) || (case_to_run > 5) ) { + if ( case_to_run > 5 ) { printf("unknown case %d\n", case_to_run ); return 0; } - P_t * restrict P; @@ -725,14 +781,33 @@ int main( int argc, char **argv ) if ( case_to_run == 0 ) + /* + * case = 0 + * amounts to generate the particles and their neighbours' indexes + * and to dump them on file; + * no processing involved in thise case + */ { outfile = fopen( "particle.data", "w" ); fwrite( &N, sizeof(int), 1, outfile ); fwrite( &Nrepetitions, sizeof(int), 1, outfile ); } - else + else + /* + * cases -4,..,-1,1,..,4 are about actual processing + */ { + if ( from_file ) { + + /* + * for cases -1, .., -4 we want to process the data written in + * the file "particle.data" (sorry for bein so rude, we can add + * an argument for filename) + * + * Here we open the file and read how many particles are therein + */ + size_t ret; int Nrep; infile = fopen( "particle.data", "r" ); @@ -755,19 +830,25 @@ int main( int argc, char **argv ) if ( dry_run ) printf(" >>> DRY RUN :: use to estimate ops outside the actual calculations\n" ); + /* + * allocate memory + * the allocation depends on the case to be run, + * since we use different data structures in + * different cases + */ if ( case_to_run > 0 ) { switch( case_to_run ) { case 1: { - // allocate array of big structures + // allocate one array of big structures // P = (P_t *)aligned_alloc(VALIGN, N * sizeof(P_t)); } break; case 2: case 3: { - // allocate array of small structures (vectors) + // allocate one array of small structures (vectors) // vpos = (v4df *)aligned_alloc(VALIGN, N * sizeof(v4df)); } break; @@ -784,11 +865,15 @@ int main( int argc, char **argv ) memset( (void*)force, 0, N*sizeof(double)); } + // initialize particle data // { if ( ! from_file ) { + // we are not reading the data from file, + // hence we need to initialize the random seed + // if ( seed == -1 ) srand48(time(NULL)); else @@ -798,14 +883,27 @@ int main( int argc, char **argv ) { double data[4] __attribute__((aligned(VALIGN))); if ( ! from_file ) - for ( int j = 0; j < 4; j++ ) - data[j] = drand48(); + { + // if we're jot reading data + // from file, we generate a random + // quadruplet of double + // + for ( int j = 0; j < 4; j++ ) + data[j] = drand48(); + } else { + // otherwise, we read it from file + // size_t ret = fread ( data, 4, sizeof(double), infile ); } - if ( case_to_run >= 0 ) + if ( case_to_run >= 0 ) + /* + * if we're not reading from file, + * we assigne the values in the quadruplet + * to the data structure that we are using + */ switch( case_to_run ) { case 0:{ @@ -834,6 +932,9 @@ int main( int argc, char **argv ) } if ( dry_run ) { + // + // we wanted to time just the set-up + // printf("dry run: going to clean up\n"); goto clean; } @@ -844,7 +945,7 @@ int main( int argc, char **argv ) * ------------------------------------------------------ */ // determine how many particles will be active, from - // a minimum of 1/100-ed to a maximum of 1/100+1/10 + // a minimum of 1/20-ed to a maximum of 1/20+1/10 // int Nactive; @@ -852,7 +953,7 @@ int main( int argc, char **argv ) if ( case_to_run >= 0 && !from_file ) { - Nactive = N/100 + (int)lrand48() % (N/10); // 11% of particle will be processed at max, 1% as minimum + Nactive = N/20 + (int)lrand48() % (N/10); // ~15% of particle will be processed at max, 1% as minimum if ( case_to_run == 0 ) { size_t ret = fwrite( &Nactive, sizeof(int), 1, outfile );} } @@ -860,7 +961,7 @@ int main( int argc, char **argv ) size_t ret = fread( &Nactive, sizeof(int), 1, infile ); } AvgNneighbours = ( NNEIGHBOURS > N*0.1 ? (int)(N*0.1) : NNEIGHBOURS ); - ngb_list = (int*)calloc( AvgNneighbours * 1.2, sizeof(int) ); + ngb_list = (int*)calloc( AvgNneighbours * 1.2, sizeof(int) ); PAPI_INIT; @@ -868,7 +969,7 @@ int main( int argc, char **argv ) double timing = 0; - // loop over active particles + // loop for Nactive particles // for ( int j = 0; j < Nactive; j++ ) { @@ -883,7 +984,7 @@ int main( int argc, char **argv ) if ( !from_file ) { - target = (int)lrand48() % N; // the index of the to-be-processed particle + target = (int)lrand48() % (N-Nrepetitions-1); // the index of the to-be-processed particle // determine how many will be the neighbours // ( same comment than before apply) @@ -902,9 +1003,10 @@ int main( int argc, char **argv ) } if ( case_to_run == 0 ) - { + {/* printf("iter %d target %d has %u neighbours\n", j, target, (unsigned)Nneighbours ); + */ size_t ret; ret = fwrite( &target, sizeof(int), 1, outfile ); ret = fwrite( &Nneighbours, sizeof(int), 1, outfile ); @@ -932,7 +1034,7 @@ int main( int argc, char **argv ) for ( int shot = 0; shot <= Nrepetitions; shot++ ) { // shot = = iteration is used as a warm-up for the neighbours walk - int mytarget = (target+shot)%N; + int mytarget = (target+shot)%N; if ( shot >= 1 ) now = CPU_TIME; @@ -1018,7 +1120,7 @@ int main( int argc, char **argv ) // char filename[100]; - sprintf( filename, "force.%d.out", case_to_run ); + sprintf( filename, "force1.%d.out", case_to_run ); FILE *output = fopen( filename, "w" ); if( output != NULL ) { fwrite( &N, sizeof(int), 1, output); diff --git a/src/vect.2.c b/src/vect.2.c index 06bbfa5d171c59d39f44337b4ac28a8cf7296a07..f4612da2836da376df7663ef9e14ade8e538b89c 100644 --- a/src/vect.2.c +++ b/src/vect.2.c @@ -31,9 +31,10 @@ #if defined(__STDC__) -# if (__STDC_VERSION__ >= 199901L) -# define _XOPEN_SOURCE 700 -# endif +#if (__STDC_VERSION__ <= 201710L) +#error "c2x standard is needed to compile this source" +#endif +#define _XOPEN_SOURCE 700 #endif #include <stdlib.h> @@ -106,9 +107,20 @@ double process_with_structure( const P_t * restrict P, void * restrict mem, double *timing) { + /* + * move all the neighbours into a local buffer + * here we hide the latency of sparse memory access + * and we try to enhance the vectorization by building + * the possibility of a loop over contiguous memory + * + */ P_t * restrict memP = __builtin_assume_aligned((P_t*)mem, VALIGN); for ( int i = 0; i < Nngb; i++ ) memP[i] = P[ ngb_list[i] ]; + + // let's return the timing of the pure calculation so that we + // can compare with v1 + // *timing = CPU_TIME; @@ -171,11 +183,12 @@ double process_with_vectors( const v4df * restrict V, alignas(VALIGN) const v4df mask_pos = {1,1,1,0}; // used to mask out the mass alignas(VALIGN) const v4df mask_mass = {0,0,0,1}; // used to mask out the position - #if defined(VECTORS_V2) - double vforce[BUNCH] = {0}; - #endif - double force = 0; // this will accumulate the final total force on target - // from all the neigbhours + #if defined(VECTORS_V2) + // explicit unrolling - not yet implemented + double vforce[BUNCH] = {0}; + #endif + double force = 0; // this will accumulate the final total force on target + // from all the neigbhours ASSUME_ALIGNED(mem, VALIGN); ASSUME_ALIGNED(V, VALIGN); @@ -183,7 +196,7 @@ double process_with_vectors( const v4df * restrict V, v4df *memV = ASSUME_ALIGNED((v4df*)mem, VALIGN); IVDEP - LOOP_UNROLL_N(4) + LOOP_UNROLL_N(BUNCH) VECTOR_ALIGNED for ( int i = 0; i < Nngb; i++ ) memV[i] = V[ ngb_list[i] ]; @@ -211,9 +224,13 @@ double process_with_vectors( const v4df * restrict V, // get the force from the i-th neighbour // force += mm /( ((v4df_u)dist2).p[0] + ((v4df_u)dist2).p[1] + ((v4df_u)dist2).p[2] ); + // + // NOTE: the accumulation here above on a single double is obvisouly a bottleneck + // because it imposes a strict constraint on compiler's optimization. + // I } - // process the reamining particles + // process the remaining particles // for ( int i = _Nngb_; i < Nngb; i++ ) { @@ -257,7 +274,7 @@ double process_with_vectors_intrinsics( const double * restrict V, { - double * restrict memV = ASSUME_ALIGNED((double*)mem, VALIGN); + double * restrict memV = ASSUME_ALIGNED ((double*)mem, VALIGN); __m256d * restrict memVV = ASSUME_ALIGNED ((__m256d*)mem, VALIGN); IVDEP @@ -361,9 +378,9 @@ double process_with_arrays( const double * restrict pos_x, { #define BUNCH DVSIZE - ASSUME_ALIGNED(pos_x, VALIGN); - ASSUME_ALIGNED(pos_y, VALIGN); - ASSUME_ALIGNED(pos_z, VALIGN); + ASSUME_ALIGNED( pos_x, VALIGN ); + ASSUME_ALIGNED( pos_y, VALIGN ); + ASSUME_ALIGNED( pos_z, VALIGN ); ASSUME_ALIGNED( mass, VALIGN ); ASSUME_ALIGNED( ngb_list, VALIGN ); @@ -383,7 +400,7 @@ double process_with_arrays( const double * restrict pos_x, _memm[i] = mass [k]; } - *timing = CPU_TIME; + double mytiming = CPU_TIME; const double x[BUNCH] = { [0 ... BUNCH-1] = pos_x[target]}; const double y[BUNCH] = { [0 ... BUNCH-1] = pos_y[target]}; @@ -450,7 +467,7 @@ double process_with_arrays( const double * restrict pos_x, force += mass[target]*mass[ngb_list[i]] / dist2; } - *timing = CPU_TIME - *timing; + *timing = CPU_TIME - mytiming; return force; #undef BUNCH } @@ -473,8 +490,6 @@ double process_with_arrays_intrinsics( const double * restrict pos_x, const int Nngb, double * timing) { - - *timing = CPU_TIME; ASSUME_ALIGNED(pos_x, VALIGN); ASSUME_ALIGNED(pos_y, VALIGN); @@ -509,32 +524,47 @@ double process_with_arrays_intrinsics( const double * restrict pos_x, dvector_t register vtargetm ATTRIBUTE_ALIGNED(VALIGN) = (dvector_t)(mass[target]); dvector_t one ATTRIBUTE_ALIGNED(VALIGN) = (dvector_t)(1.0); #else - dvector_t register vtargetx ATTRIBUTE_ALIGNED(VALIGN) = {pos_x[target]}; - dvector_t register vtargety ATTRIBUTE_ALIGNED(VALIGN) = {pos_y[target]}; - dvector_t register vtargetz ATTRIBUTE_ALIGNED(VALIGN) = {pos_z[target]}; - dvector_t register vtargetm ATTRIBUTE_ALIGNED(VALIGN) = {mass[target]}; - dvector_t one ATTRIBUTE_ALIGNED(VALIGN) = {1.0}; + #define X(v) (pos_x[target]) + dvector_t vtargetx ATTRIBUTE_ALIGNED(VALIGN) = INIT_VECTOR(DVSIZE); + #undef X + #define X(v) (pos_y[target]) + dvector_t vtargety ATTRIBUTE_ALIGNED(VALIGN) = INIT_VECTOR(DVSIZE); + #undef X + #define X(v) (pos_z[target]) + dvector_t vtargetz ATTRIBUTE_ALIGNED(VALIGN) = INIT_VECTOR(DVSIZE); + #undef X + #define X(v) (mass[target]) + dvector_t vtargetm ATTRIBUTE_ALIGNED(VALIGN) = INIT_VECTOR(DVSIZE); + #undef X + #define X(v) 1.0 + dvector_t one ATTRIBUTE_ALIGNED(VALIGN) = INIT_VECTOR(DVSIZE); + #undef X + #define X(v) 0.0 + dvector_t vforce ATTRIBUTE_ALIGNED(VALIGN) = INIT_VECTOR(DVSIZE); + #undef X #endif - dvector_t register vforce ATTRIBUTE_ALIGNED(VALIGN) = {0}; + double force = 0; // this will accumulate the total force on target from all the neigbhours int _Nngb_ = Nngb & (-DVSIZE); _Nngb_ = _Nngb_ / (DVSIZE); - + + double mytiming = CPU_TIME; + IVDEP LOOP_VECTORIZE for ( int i = 0; i < _Nngb_; i++ ) { - dvector_t register dist2x = (vtargetx - _vmemx[i]); - dvector_t register dist2y = (vtargety - _vmemy[i]); - dvector_t register dist2z = (vtargetz - _vmemz[i]); + dvector_t dist2x = (vtargetx - _vmemx[i]); + dvector_t dist2y = (vtargety - _vmemy[i]); + dvector_t dist2z = (vtargetz - _vmemz[i]); dist2x *= dist2x; dist2y *= dist2y; dist2z *= dist2z; - dvector_t register inv_dist = one / (dist2x+dist2y+dist2z); + dvector_t inv_dist = one / (dist2x+dist2y+dist2z); vforce += (vtargetm * _vmemm[i]) * inv_dist; } @@ -559,7 +589,7 @@ double process_with_arrays_intrinsics( const double * restrict pos_x, for ( int i = 0; i < 4; i++ ) force += _vforce_.v[i]; - *timing = CPU_TIME - *timing; + *timing = CPU_TIME - mytiming; return force; @@ -575,11 +605,11 @@ int main( int argc, char **argv ) long int seed = (argc > 3 ? atoi(*(argv+3)) : -1 ); // set the seed for repeatible runs int Nrepetitions= (argc > 4 ? atoi(*(argv+4)) : NREPETITIONS ); int dry_run = (argc > 5 ? atoi(*(argv+5)) : 0 ); // 1 to estimate floats for initialization - int from_file = ( case_to_run < 0 ); + int from_file = ( case_to_run > 0 ); - case_to_run = (case_to_run < 0 ? ~case_to_run + 1 : case_to_run); // make it positive + case_to_run = (case_to_run < 0 ? -case_to_run : case_to_run); // make it positive - if ( (case_to_run < 0) || (case_to_run > 5) ) { + if ( case_to_run > 5 ) { printf("unknown case %d\n", case_to_run ); return 0; } @@ -624,7 +654,7 @@ int main( int argc, char **argv ) printf(" > Running case %d -> \"%s\" with %u points and %d repetitions\n" - " > Vectors are %lu bytes long, %lu double long\n", + " > Vectors are %lu bytes long, %d double long\n", case_to_run, implementation_labels[case_to_run], N, Nrepetitions, VSIZE, DVSIZE); if ( dry_run ) @@ -774,7 +804,7 @@ int main( int argc, char **argv ) if ( !from_file ) { - target = (int)lrand48() % N; // the index of the to-be-processed particle + target = (int)lrand48() % (N-Nrepetitions); // the index of the to-be-processed particle // determine how many will be the neighbours // ( same comment than before apply) @@ -794,8 +824,10 @@ int main( int argc, char **argv ) if ( case_to_run == 0 ) { - printf("iter %d target %d has %u neighbours\n", j, - target, (unsigned)Nneighbours ); + /* + printf("iter %d target %d has %u neighbours\n", j, + target, (unsigned)Nneighbours ); + */ size_t ret; ret = fwrite( &target, sizeof(int), 1, outfile ); ret = fwrite( &Nneighbours, sizeof(int), 1, outfile ); @@ -915,7 +947,7 @@ int main( int argc, char **argv ) // char filename[100]; - sprintf( filename, "force.%d.out", case_to_run ); + sprintf( filename, "force2.%d.out", case_to_run ); FILE *output = fopen( filename, "w" ); if( output != NULL ) { fwrite( &N, sizeof(int), 1, output);