Skip to content
Snippets Groups Projects
Commit 7461a3fc authored by Claudio Gheller's avatar Claudio Gheller
Browse files

changes and add-ons to support SKA data challenge 3

parent 8234e1c5
No related branches found
No related tags found
No related merge requests found
CC = gcc
CXX = g++
MPICC = mpicc
MPIC++ = mpiCC
#FFTW_INCL= -I/homes/gheller/Pleiadi/fftw-3.3.10/local/include/
#FFTW_LIB= /homes/gheller/Pleiadi/fftw-3.3.10/local/lib/
FFTW_INCL= -I/homes/gheller/Pleiadi/fftw-3.3.10/local-mpich/include/
FFTW_LIB= /homes/gheller/Pleiadi/fftw-3.3.10/local-mpich/lib/
FITSIO_INCL= -I/iranet/arcsoft/cfitsio/cfitsio-4.1.0/include/
FITSIO_LIB= /iranet/arcsoft/cfitsio/cfitsio-4.1.0/lib/
NVCC = nvcc
NVFLAGS = -arch=sm_70 -Xcompiler -mno-float128 -std=c++11
NVLIB = -L/cineca/prod/opt/compilers/cuda/10.1/none/lib64/ -lcudart -lcuda
OMP= -fopenmp
CFLAGS += -I. $(FFTW_INCL) $(GSL_INCL) $(MPI_INCL) $(FITSIO_INCL)
OPTIMIZE = $(OMP) -O3 -mtune=native
......@@ -33,11 +33,13 @@ endif
# write the final image
OPT += -DWRITE_IMAGE
# perform w-stacking phase correction
OPT += -DPHASE_ON
#OPT += -DPHASE_ON
# Support CFITSIO !!! Remember to add the path to the CFITSIO library to LD_LIBRARY_PATH
OPT += -DFITSIO
#OPT += -DFITSIO
# Perform true parallel images writing
#OPT += -DPARALLELIO
# Normalize uvw in case it is not done in the binMS
OPT += -DNORMALIZE_UVW
ifeq (FITSIO,$(findstring FITSIO,$(OPT)))
LIBS += -L$(FITSIO_LIB) -lcfitsio
......
......@@ -8,7 +8,7 @@ import os
#outpath = '/data/gridding/data/shortgauss_t201806301100_SBH255.binMS/'
print(sys.argv[1])
outpath = "/data/gridding/data/Lofarbig/"+sys.argv[1]+".binMS/"
outpath = "/homes/gheller/lofar2/gheller/binMS/"+sys.argv[1]+".binMS/"
os.mkdir(outpath)
......@@ -38,7 +38,7 @@ num_threads = 1
# input MS
readtime0 = time.time()
#msfile = "/data/Lofar-data/results/L798046_SB244_uv.uncorr_130B27932t_146MHz.pre-cal.ms"
msfile = "/data/Lofar-Luca/results/"+sys.argv[1]+".ms/"
msfile = "/homes/gheller/lofar2/gheller/MS/"+sys.argv[1]+".ms/"
ms = pt.table(msfile, readonly=True, ack=False)
if rank == 0:
......
USE_MPI = 0
import numpy as np
import casacore.tables as pt
import time
import sys
import os
startfreq = 0
endfreq = 1
stepfreq = 5
#outpath = '/data/gridding/data/shortgauss_t201806301100_SBH255.binMS/'
print(sys.argv[1])
outpath = "/homes/gheller/lofar2/gheller/binMS/"+sys.argv[1]+str(startfreq)+"-"+str(endfreq)+"-"+str(stepfreq)+".binMS/"
os.mkdir(outpath)
ufile = 'ucoord.bin'
vfile = 'vcoord.bin'
wfile = 'wcoord.bin'
weights = 'weights.bin'
visrealfile = 'visibilities_real.bin'
visimgfile = 'visibilities_img.bin'
metafile = 'meta.txt'
offset = 0.0
if USE_MPI == 1:
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
print(rank,size)
else:
comm = 0
rank = 0
size = 1
num_threads = 1
# input MS
readtime0 = time.time()
num_points_tot = 0
num_vis_tot = 0
for ifreq in range(startfreq,endfreq,stepfreq):
freqlabel = format(ifreq,'04d')
msfile = "/homes/gheller/lofar2/gheller/MS/"+sys.argv[1]+freqlabel+".ms/"
ms = pt.table(msfile, readonly=True, ack=False)
if rank == 0:
print("Reading ", msfile)
print("Writing ", outpath)
# load data and metadata
with pt.table(msfile + '::SPECTRAL_WINDOW', ack=False) as freqtab:
freq = freqtab.getcol('REF_FREQUENCY')[0] / 1000000.0
freqpersample = np.mean(freqtab.getcol('RESOLUTION'))
timepersample = ms.getcell('INTERVAL',0)
print("Frequencies (MHz) : ",freq)
print("Time interval (sec) : ",timepersample)
with pt.taql("SELECT ANTENNA1,ANTENNA2,sqrt(sumsqr(UVW)),GCOUNT() FROM $ms GROUPBY ANTENNA1,ANTENNA2") as BL:
ants1, ants2 = BL.getcol('ANTENNA1'), BL.getcol('ANTENNA2')
Ntime = BL.getcol('Col_4')[0] # number of timesteps
Nbaselines = len(ants1)
print("Number of timesteps : ",Ntime)
print("Total obs time (hrs): ",timepersample*Ntime/3600)
print("Number of baselines : ",Nbaselines)
#sp = pt.table(msfile+'::LOFAR_ANTENNA_FIELD', readonly=True, ack=False, memorytable=True).getcol('POSITION')
ant1, ant2 = ms.getcol('ANTENNA1'), ms.getcol('ANTENNA2')
number_of_measures = Ntime * Nbaselines
#nm_pe_aux = int(number_of_measures / size)
#remaining_aux = number_of_measures % size
nm_pe = np.array(0)
nm_pe = int(number_of_measures / size)
remaining = np.array(0)
remaining = number_of_measures % size
print(nm_pe,remaining)
else:
nm_pe = None
remaining = None
if USE_MPI == 1:
nm_pe = comm.bcast(nm_pe, root=0)
remaining = comm.bcast(remaining, root=0)
# set the data domain for each MPI rank
startrow = rank*nm_pe
if rank == size-1:
nm_pe = nm_pe+remaining
print(rank,nm_pe,remaining)
nrow = nm_pe
# read data
uvw = ms.getcol('UVW',startrow,nrow)
vis = ms.getcol('DATA',startrow,nrow)
weight = ms.getcol('WEIGHT',startrow,nrow)
print("Freqs per channel : ",vis.shape[1])
print("Polarizations : ",vis.shape[2])
print("Number of observations : ",uvw.shape[0])
print("Data size (MB) : ",uvw.shape[0]*vis.shape[1]*vis.shape[2]*2*4/1024.0/1024.0)
ms.close()
# set parameters
num_points = uvw.shape[0]
num_points_tot = num_points_tot + num_points
num_vis = num_points * vis.shape[1] * vis.shape[2]
num_vis_tot = num_vis_tot + num_vis
num_w_planes = 1
# serialize arrays
vis_ser_real = vis.real.flatten()
vis_ser_img = vis.imag.flatten()
print("data types: uvw = ",uvw.dtype," vis = ",vis_ser_real.dtype)
#vis_ser = np.zeros(2*vis_ser_real.size)
#for i in range(vis_ser_real.size):
# vis_ser[2*i]=vis_ser_real[i]
# vis_ser[2*i+1]=vis_ser_img[i]
uu_ser = uvw[:,0].flatten()
vv_ser = uvw[:,1].flatten()
ww_ser = uvw[:,2].flatten()
weight_ser = weight.flatten()
print(np.amin(uu_ser),np.amax(uu_ser))
print(np.amin(vv_ser),np.amax(vv_ser))
print(np.amin(ww_ser),np.amax(ww_ser))
readtime1 = time.time()
outfile = outpath+ufile
newFile = open(outfile, "a+b")
binary_format = bytearray(uu_ser)
newFile.write(binary_format)
newFile.close()
outfile = outpath+vfile
newFile = open(outfile, "a+b")
binary_format = bytearray(vv_ser)
newFile.write(binary_format)
newFile.close()
outfile = outpath+wfile
newFile = open(outfile, "a+b")
binary_format = bytearray(ww_ser)
newFile.write(binary_format)
newFile.close()
outfile = outpath+weights
newFile = open(outfile, "a+b")
binary_format = bytearray(weight_ser)
newFile.write(binary_format)
newFile.close()
outfile = outpath+visrealfile
newFile = open(outfile, "a+b")
binary_format = bytearray(vis_ser_real)
newFile.write(binary_format)
newFile.close()
outfile = outpath+visimgfile
newFile = open(outfile, "a+b")
binary_format = bytearray(vis_ser_img)
newFile.write(binary_format)
newFile.close()
ming = 0.0
maxg = 1.0
minw = 0.0
maxw = 1.0
outfile = outpath+metafile
f = open(outfile, 'w')
f.writelines(str(num_points_tot)+"\n")
f.writelines(str(num_vis_tot)+"\n")
f.writelines(str(vis.shape[1])+"\n")
f.writelines(str(vis.shape[2])+"\n")
f.writelines(str(Ntime)+"\n")
f.writelines(str(timepersample)+"\n")
f.writelines(str(timepersample*Ntime/3600)+"\n")
f.writelines(str(Nbaselines)+"\n")
f.writelines(str(ming)+"\n")
f.writelines(str(maxg)+"\n")
f.writelines(str(minw)+"\n")
f.writelines(str(maxw)+"\n")
f.close()
......@@ -219,10 +219,7 @@ if(rank == 0){
// INPUT FILES (only the first ndatasets entries are used)
int ndatasets = 1;
strcpy(datapath_multi[0],"/m100_scratch/userexternal/ederubei/L798046_SB244_uv.uncorr_130B27932t_146MHz.pre-cal.binMS/");
//strcpy(datapath_multi[0],"/m100_scratch/userexternal/cgheller/gridding/newgauss4_t201806301100_SBL180.binMS/");
//strcpy(datapath_multi[0],"/m100_scratch/userexternal/cgheller/gridding/Lofar/L798046_SB244_uv.uncorr_130B27932t_146MHz.pre-cal.binMS/");
//strcpy(datapath_multi[1],"/m100_scratch/userexternal/cgheller/gridding/Lofar/L798046_SB244_uv.uncorr_130B27932t_134MHz.pre-cal.binMS/");
strcpy(datapath_multi[0],"/homes/gheller/lofar2/gheller/binMS/ZW2_IFRQ_0-1-5.binMS/");
strcpy(datapath,datapath_multi[0]);
// Read metadata
......@@ -319,6 +316,62 @@ if(rank == 0){
fread(ww,Nmeasures*sizeof(double),1,pFile);
fclose(pFile);
#ifdef NORMALIZE_UVW
double minu = 1e20;
double minv = 1e20;
double minw = 1e20;
double maxu = -1e20;
double maxv = -1e20;
double maxw = -1e20;
for (long inorm=0; inorm<Nmeasures; inorm++)
{
minu = MIN(minu,uu[inorm]);
minv = MIN(minv,vv[inorm]);
minw = MIN(minw,ww[inorm]);
maxu = MAX(maxu,uu[inorm]);
maxv = MAX(maxv,vv[inorm]);
maxw = MAX(maxw,ww[inorm]);
}
double minu_all;
double minv_all;
double minw_all;
double maxu_all;
double maxv_all;
double maxw_all;
#ifdef USE_MPI
MPI_Allreduce(&minu,&minu_all,1, MPI_FLOAT, MPI_MIN, MPI_COMM_WORLD);
MPI_Allreduce(&minv,&minv_all,1, MPI_FLOAT, MPI_MIN, MPI_COMM_WORLD);
MPI_Allreduce(&minw,&minw_all,1, MPI_FLOAT, MPI_MIN, MPI_COMM_WORLD);
MPI_Allreduce(&maxu,&maxu_all,1, MPI_FLOAT, MPI_MAX, MPI_COMM_WORLD);
MPI_Allreduce(&maxv,&maxv_all,1, MPI_FLOAT, MPI_MAX, MPI_COMM_WORLD);
MPI_Allreduce(&maxw,&maxw_all,1, MPI_FLOAT, MPI_MAX, MPI_COMM_WORLD);
#else
minu_all = minu;
minv_all = minv;
minw_all = minw;
maxu_all = maxu;
maxv_all = maxv;
maxw_all = maxw;
#endif
double offset = 0.0;
double ming = MIN(minu_all,minv_all);
double maxg = MAX(maxu_all,maxv_all);
minw = minw_all;
maxw = maxw_all;
ming = ming-offset*ming;
maxg = maxg+offset*maxg;
for (long inorm=0; inorm<Nmeasures; inorm++)
{
uu[inorm] = (uu[inorm]+maxg)/(2.0*maxg);
vv[inorm] = (vv[inorm]+maxg)/(2.0*maxg);
ww[inorm] = (ww[inorm]-minw)/(maxw-minw);
}
#endif
#ifdef USE_MPI
MPI_Barrier(MPI_COMM_WORLD);
#endif
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment