Skip to content
Snippets Groups Projects
Select Git revision
  • 50188b8e7220bc4c56d2fe3db47cbd5b9b4ef00e
  • master default protected
  • fix-issue-901
  • fix-issue-928
  • fix-issue-896-zmq-publish
  • fix-issue-885
  • fix-issue-921
  • fix-910
  • fix-issue-804
  • srt-bandQ-receiver
  • fix-issue-855
  • stable
  • srt-bandW-receiver
  • fix-issue-805
  • feature-med-c-band-srv
  • fix-issue-760
  • fix-issue-628
  • fix-issue-588
  • fix-issue-derotator-328
  • OffsetReview
  • DerotatorAndMinorServo
  • discos1.0.6h
  • discos1.0.6f
  • discos1.0.6e
  • discos1.0.6d
  • discos1.0.6c
  • discos1.0.6b
  • discos1.0.6a
  • discos1.0.6
  • discos1.0.5
  • discos1.0.4
  • discos1.0.3
  • discos1.0.2
  • discos1.0.1
  • discos1.0.0
  • discos1.0-rc02
  • discos1.0-rc01
  • escs-0.5
  • escs-0.4
  • nuraghe-0.6
  • noto-0.1
41 results

EngineThread.cpp

Blame
  • VlkbAmqp.java 15.39 KiB
    
    import java.util.logging.Logger;
    import java.util.logging.Level;
    import java.util.List;
    import java.util.ArrayList;
    import java.util.Arrays;
    
    import java.time.Instant;
    
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.io.InputStreamReader;
    import java.io.OutputStreamWriter;
    import java.io.PrintWriter;
    import java.io.BufferedReader;
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileNotFoundException;
    import java.nio.file.StandardOpenOption;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    
    import java.time.*;// Timestamp in cut-filename
    import java.io.ByteArrayOutputStream; // for SODA direct streaming doSubimgStream
    
    import vo.parameter.*;
    
    class VlkbAmqp implements Vlkb
    {
       static final Logger LOGGER = Logger.getLogger(VlkbAmqp.class.getName());
    
       private Settings    settings   = null;
       private Subsurvey[] subsurveys = null;
       private Resolver    resolver   = null;
    
       public VlkbAmqp()
       {
          LOGGER.fine("trace VlkbAmqp()");
          this.settings = Settings.getInstance();
          this.resolver = (settings.dbConn.isDbUriEmpty() ? new ResolverFromId(subsurveys)
                : new ResolverByObsCore(settings.dbConn, subsurveys));
       }
    
    
       public VlkbAmqp(Settings settings)
       {
          LOGGER.fine("trace VlkbAmqp(settings)");
          this.settings = settings;
          this.resolver = (settings.dbConn.isDbUriEmpty() ? new ResolverFromId(subsurveys)
                : new ResolverByObsCore(settings.dbConn, subsurveys));
       }
    
    
    
       public VlkbAmqp(Settings settings, Subsurvey[] subsurveys)
       {
          LOGGER.fine("trace VlkbAmqp(settings, subsurveys)");
          this.settings = settings;
          this.subsurveys = subsurveys;
          this.resolver = (settings.dbConn.isDbUriEmpty() ? new ResolverFromId(subsurveys)
                : new ResolverByObsCore(settings.dbConn, subsurveys));
       }
    
    
    	public NullValueCount doCountNullValues(String absPathname, int hdunum)
    		throws IOException, InterruptedException
    		{
    			LOGGER.fine("trace: not implemented; TB deprecated");
    			return new NullValueCount();
    		}
    
    
    	public CutResult doMerge(String[] idArr, Coord coord, boolean countNullValues)
    		throws FileNotFoundException, IOException
    		{
    			LOGGER.fine("trace");
    
    			return merge(idArr, coord, countNullValues);
    		}
    
    
    	///////////////////////////////////////////////////////////////////////////////////
    	public void doFile(String relPathname, int hdunum,
    			Pos pos, Band band, Time time, Pol pol, String dummyCutAbsPathname)
    		throws IOException, InterruptedException
    		{
    			;// only placehoder for compatibility with Vlkb.java interface,
    			 //Amqp support is deprecated
    		}
    
    	public void doFile(String relPathname, int hdunum,
    			String pixels, String dummyCutAbsPathname)
    		throws IOException, InterruptedException
    		{
    			;// only placehoder for compatibility with Vlkb.java interface,
    			 //Amqp support is deprecated
    		}
    
    
    	public CutResult doFileAmqp(String relPathname, int hdunum,
    			Pos pos, Band band, Time time, Pol pol,
    			boolean countNullValues, FitsCard[] extraCards, String dummyCutAbsPathname)
    		throws IOException, InterruptedException
    		{
    			LOGGER.fine("trace: " + pos.toString() );
    
    			CutResult cutResult = new CutResult();
    
    			LOGGER.finer("Using AMQP");
    
    			JsonEncoder jReq = new JsonEncoder();
    			jReq.add(relPathname, hdunum);
    			jReq.add(pos);
    			jReq.add(band);
    			jReq.add(time);
    			jReq.add(pol);
    
    			jReq.add(countNullValues);
    			jReq.add(extraCards);
    
    			String outJson = RpcOverAmqp.doRpc( settings.amqpConn, jReq.toString() );
    
    			cutResult = JsonDecoder.responseFromCutoutJson( outJson );
    
    			return cutResult;
    		}
    
    
    
    	private CutResult doFileById(String id, Pos pos, Band band, Time time, Pol pol,
    			boolean countNullValues, Subsurvey[] subsurveys)
    		throws IOException, InterruptedException
    		{
    			LOGGER.fine("trace");
    
    			FitsCard[] extraCards = null;
    
    			this.resolver.resolve(id);
    			String relPathname = this.resolver.relPathname();
    			int hdunum         = this.resolver.hdunum();
    			String subsurveyId = this.resolver.obsCollection();
    
    			if(subsurveyId != null)
    			{
    				extraCards = Subsurvey.subsurveysFindCards(subsurveys, subsurveyId);
    			}
    			else
    			{
    				LOGGER.finer("Resolver returns subsurveyId null: no extraCards loaded.");
    			}
    
    			final String DEFAULT_TIME_SYSTEM = "MJD_UTC"; // FIXME take from confif file
    
    			CutResult cutResult = doFileAmqp(relPathname, hdunum, pos, band, time, pol,
    					countNullValues, extraCards, null);
    
    			return cutResult;
    		}
    
    
    
    
    	///////////////////////////////////////////////////////////////////////////////////
    
    
    	public MCutResult doMCutout(String jdlJson, String workDir)
    		throws IOException
    		{
    			LOGGER.fine("trace");
    
    			MCutResult mCutResult;
    
    			LOGGER.finer("doMCutout over AMQP");
    			String updatedJsonString = JdlMCutout.resolveAndUpdateJsonRequest(jdlJson, settings, subsurveys);
    			LOGGER.finest("doMCutout over AMQP : " + updatedJsonString);
    			String outJson = RpcOverAmqp.doRpc(settings.amqpConn,  JdlMCutout.mcutoutToJson(updatedJsonString) );
    			mCutResult = JdlMCutout.responseFromMCutoutJson(outJson);
    
    			return mCutResult;
    		}
    
    
    	/* ================= MERGE =============================== */
    
    	private  String generateSubimgPathname(String relPathname, int hdunum)
    	{
    		String cutfitsname = "vlkb-cutout";
    
    		Instant instant = Instant.now() ;
    		String timestamp = instant.toString().replace(":","-").replace(".","_");
    
    		String tempPathname1 = relPathname.replaceAll("/","-");
    		String tempPathname2 = tempPathname1.replaceAll(" ","_");
    
    		if(hdunum == 1)
    		{
    			return cutfitsname + "_" + timestamp + "_" + tempPathname2;
    		}
    		else
    		{
    			String extnum = "EXT" + String.valueOf(hdunum-1);
    			return cutfitsname + "_" + timestamp + "_" + extnum + "_" + tempPathname2;
    		}
    	}
    
    
    
    	private CutResult cutout(
    			String publisherDid, Coord coord,
    			boolean countNullValues)
    	{
    		// ResolverByObsCore rsl = new ResolverByObsCore(settings.dbConn, subsurveys);
    		Resolver rsl = resolver;//new ResolverFromId();//settings.dbConn, subsurveys);
    		rsl.resolve(publisherDid);
    
    		FitsCard[] extraCards = null;
    		//Subsurvey.subsurveysFindCards(subsurveys, rsl.obsCollection());//rsl.subsurveyId);
    		String absSubimgPathname = settings.fitsPaths.cutouts() + "/"
    			+ generateSubimgPathname(rsl.relPathname(), rsl.hdunum());
    		LOGGER.finest("absSubimgPathname: " + absSubimgPathname);
    
    		LOGGER.finer("Using AMQP");
    
    		JsonEncoder jReq = new JsonEncoder();
    		jReq.add(rsl.relPathname(), rsl.hdunum());
    		jReq.add(coord.pos);
    		jReq.add(coord.band);
    		jReq.add(coord.time);
    		jReq.add(coord.pol);
    
    		jReq.add(countNullValues);
    		jReq.add(extraCards);
    
    		String inJson = jReq.toString();
    
    		return JsonDecoder.responseFromCutoutJson( RpcOverAmqp.doRpc(settings.amqpConn, inJson) );
    	}
    
    
    	protected CutResult merge(String[] pubdids, Coord coord, Boolean countNullValues)
    	{
    		LOGGER.fine("trace");
    
    		ArrayList<CutResult> allresults = new ArrayList<CutResult>();
    
    		// 1. Decode pubdid's from inputs.pubdid and cutout on each
    
    		CutResult[] allCutResults = do_cutouts(
    				pubdids, coord,
    				countNullValues);
    
    		allresults.addAll(Arrays.asList(allCutResults));
    
    
    		String[] allCutPathnames = selectCutPathnames(allCutResults);
    
    		if( allCutPathnames.length <= 0 ){
    			LOGGER.warning("No cutout created.");
    			return null;
    		}
    		if( allCutPathnames.length != pubdids.length ) {
    			LOGGER.warning("Number of cubes found and number of cutouts created do not match.");
    		}
    
    		try
    		{
    			// 2. regridding (closest neighbour interpolation) for files to be merged
    
    			Regrid grid = new Regrid();
    			int dim = grid.dimensions(allCutPathnames);
    			if( dim > 2 ) {
    				Boolean changed = grid.regrid_vel2(allCutPathnames);
    				if(changed){
    					//allresults.add("MSG Keywords CDELT3, CRVAL3 were adjusted for merge regridding.");
    					LOGGER.finer("MSG Keywords CDELT3, CRVAL3 were adjusted for merge regridding.");
    				}
    			}
    
    			// 3. Merge cut-files
    
    			//String[] strar_results = mergefiles_parallel(id, logFileName,  // logfileName
    			//String[] strar_results = mergefiles_split_execution(id, logFileName,  // logfileName
    			CutResult strar_results = mergefiles(
    					String.valueOf(dim),  // prefix: "2D" or "3D"
    					allCutPathnames);     // files to merge
    
    			allresults.addAll(Arrays.asList(strar_results));
    
    		}
    		catch(Exception e)
    		{
    			LOGGER.log(Level.SEVERE, "merge:",e);
    			//allresults.add(
    			//      "MSG System error. Report time, your IP-number, and the exact request-URL string to service administrator.");
    		}
    
    		CutResult[] dlkArr = allresults.toArray(new CutResult[allresults.size()]);
    		return dlkArr[0]; // FIXME should return only datalink for the merged file not all cutout files?
    	}
    
    
    	protected CutResult[] do_cutouts(
    			String[] publisherDids, Coord coord,
    			Boolean countNullValues)
    	{
    		ArrayList<CutResult> allresults = new ArrayList<CutResult>();
    		if(publisherDids.length <= 0)
    			return null; // no cube found
    
    		for(String publisherDid : publisherDids)
    		{
    			CutResult cutout_results_table = cutout(
    					publisherDid, coord,
    					countNullValues);
    
    			allresults.addAll(Arrays.asList(cutout_results_table));
    		}
    
    		return allresults.toArray(new CutResult[allresults.size()]);
    	}
    
    
    	protected CutResult mergefiles(
    			String prefix,          // IN prefix added after filename-start-word
    			String[] filestomerge)  // IN abs path with filenames to be merged
    	{
    		LOGGER.fine("trace");
    
    		String InJson = JsonEncoderMerge.mergefilesToJson( prefix, filestomerge);
    		String OutJson = RpcOverAmqp.doRpc(settings.amqpConn, InJson);
    		return JsonDecoder.responseFromCutoutJson( OutJson );
    	}
    
    
    
    	// BEGIN parallel
    
    	protected String[] mergefiles_parallel(
    			String jobId,        // IN any identifier to be guaranteed distinct
    			String logfilename,  // IN logfilename without path
    			String prefix,          // IN prefix added after filename-start-word
    			String[] filestomerge)  // IN abs path with filenames to be merged
    	{
    		LOGGER.fine("trace");
    
    		String[] responseCH = mergefiles_common_header(jobId, logfilename, prefix, filestomerge);
    		for(String sentence : responseCH) VlkbAmqp.LOGGER.finest("responseCmnHdr: " + sentence);
    		// check if response errored -> abort with 500: Internal Server Error & log details
    
    		int threadsCount = filestomerge.length;
    		Thread threadArr[] = new Thread[threadsCount];
    		Reproject reprojectArr[] = new Reproject[threadsCount];
    		int i;
    		for(i=0; i<threadsCount; i++)
    			//for(String file : filestomerge)
    		{
    			String file = filestomerge[i];
    			reprojectArr[i] = new Reproject(this, jobId, prefix, file);
    			threadArr[i] = new Thread(reprojectArr[i], "reproject: " + String.valueOf(i)); 
    
    			threadArr[i].start();
    		}
    
    		// wait until all threads finished
    
    		for(i=0; i<threadsCount; i++)
    			//for(Thread thread : threadArr)
    		{
    			try
    			{
    				threadArr[i].join();
    			}
    			catch (InterruptedException e)
    			{
    				e.printStackTrace();
    			}
    
    
    			for(String sentence : reprojectArr[i].response) VlkbAmqp.LOGGER.finest("response[" + String.valueOf(i) + "]: " + sentence);
    			if(!isResponseOk(reprojectArr[i].response))
    			{
    				;// FIXME response incorrect -> abort merge-job, free resources
    				 // if incorrect paarams -> respond HTTP.WRONG REQUEST
    				 // if other error       -> respond HTTP.INTRNAL ERRR & log
    			}
    		}
    
    		String[] response = mergefiles_add_reprojected(jobId, prefix);
    		// check if response errored -> abort with 500: Internal Server Error & log details
    
    		return response;
    	}
    
    	private boolean isResponseOk(String[] response)
    	{
    		// FIXME implement!
    		return true;
    	}
    
    
    
    
    
    	protected String[] mergefiles_split_execution(
    			String jobId,        // IN any identifier to be guaranteed distinct
    			String logfilename,  // IN logfilename without path
    			String prefix,          // IN prefix added after filename-start-word
    			String[] filestomerge)  // IN abs path with filenames to be merged
    	{
    		LOGGER.fine("trace");
    
    		String[] responseCH = mergefiles_common_header(jobId, logfilename, prefix, filestomerge);
    		// check if response errored -> abort with 500: Internal Server Error & log details
    
    		for(String file : filestomerge)// FIXME parallelize on threads & join
    		{
    			String[] response = mergefiles_reproject(jobId, prefix, file);
    			// check if response errored -> abort with: 500: Internal Server Error & log details
    		}
    
    		String[] response = mergefiles_add_reprojected(jobId, prefix);
    		// check if response errored -> abort with 500: Internal Server Error & log details
    
    		return response;
    	}
    
    	protected String[] mergefiles_common_header(
    			String jobId,     // IN jobId to distinguish parallel executed requests
    			String logfilename,     // IN logfilename without path
    			String prefix,          // IN prefix added after filename-start-word
    			String[] filestomerge)  // IN abs path with filenames to be merged
    	{
    		LOGGER.fine("trace");
    
    		String InJson = JsonEncoderMerge.mergefilesCommonHeaderToJson(jobId, prefix, filestomerge);
    		String OutJson = RpcOverAmqp.doRpc(settings.amqpConn, InJson);
    		String[] results = null; // FIXME JsonDecoder.responseFromJson(OutJson);
    
    		return results;
    	}
    
    
    	protected String[] mergefiles_reproject(
    			String jobId,     // IN jobId to distinguish parallel executed requests
    			String prefix,          // IN prefix added after filename-start-word
    			String fitsfilename)    // IN logfilename without path
    	{
    		LOGGER.fine("trace");
    
    		String InJson = JsonEncoderMerge.mergefilesReprojectToJson(jobId, prefix, fitsfilename);
    		String OutJson = RpcOverAmqp.doRpc(settings.amqpConn, InJson);
    		String[] results = null; // FIXME JsonDecoder.responseFromJson(OutJson);
    
    		return results;
    	}
    
    
    	protected String[] mergefiles_add_reprojected(
    			String jobId,     // IN jobId to distinguish parallel executed requests
    			String prefix)          // IN prefix added after filename-start-word
    	{
    		LOGGER.fine("trace");
    
    		String InJson = JsonEncoderMerge.mergefilesAddReprojectedToJson(jobId, prefix);
    		String OutJson = RpcOverAmqp.doRpc(settings.amqpConn, InJson);
    		String[] results = null; // FIXME JsonDecoder.responseFromJson(OutJson);
    
    		return results;
    	}
    
    	// END parallel
    
    
    
    
    
    	// returns selected data in list of strings:
    	// -- from cutout: the cutout filename (server local)
    	private String[] selectCutPathnames(CutResult[] results) {
    
    		LOGGER.fine("trace");
    
    		// return only data (without MSG's LOG's etc)
    		ArrayList<String> data = new ArrayList<String>();
    
    		// sanity check - move after doFunc call (here covered by exception)
    		// FIXME consider catch null-pointer-exception
    		if(results == null) {
    			LOGGER.finest("selectCutPathnames: results-table is null.");
    			return null;
    		}
    
    		for (CutResult res : results) {
    
    			/*/ protect substring() calls below;
    			// FIXME consider catch exception index-out-of-bounds
    			if(res.length() < 3) {
    			LOGGER.warning("Assert(Results.toXML): results msg shorter then 3 chars : " + res);
    			continue;
    			}
    
    			// decode msg type
    			switch(res.substring(0,3)){
    			case "URL": // from cutout: the cutout filename for download
    			String localfname = res.substring(4);//.replaceAll(FITScutpath, "");
    			String[] ssfn = localfname.split(":");
    			//String[] ssfn = localfname.substring(4).split(":");
    			LOGGER.finest("ssfn[0]: " + ssfn[0]);
    			LOGGER.finest("ssfn[1]: " + ssfn[1]);
    			data.add(ssfn[1]);
    			//data.add(localfname);
    			break;
    			case "NVS": // from cutout : NVS_nn:nn:nn
    			case "MSG":
    			case "LOG":
    			case "CUT": // from cutout: the file which was cut
    			case "HID": // from search
    							// no data, do nothing
    							break;
    							default:
    							LOGGER.severe("Assert(Results.toXML): results msg has unhandled msgtype code : " + res);
    							}*/
    			data.add(res.fileName);
    		}
    
    		return data.toArray(new String[data.size()]);
    	}
    
    
    }