import java.util.logging.Logger; import java.util.logging.Level; import java.util.List; import java.util.ArrayList; import java.util.Arrays; import java.time.Instant; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.io.InputStreamReader; import java.io.OutputStreamWriter; import java.io.PrintWriter; import java.io.BufferedReader; import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.nio.file.StandardOpenOption; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.time.*;// Timestamp in cut-filename import java.io.ByteArrayOutputStream; // for SODA direct streaming doSubimgStream import vo.parameter.*; class VlkbAmqp implements Vlkb { static final Logger LOGGER = Logger.getLogger(VlkbAmqp.class.getName()); private Settings settings = null; private Subsurvey[] subsurveys = null; private Resolver resolver = null; public VlkbAmqp() { LOGGER.fine("trace VlkbAmqp()"); this.settings = Settings.getInstance(); this.resolver = (settings.dbConn.isDbUriEmpty() ? new ResolverFromId(subsurveys) : new ResolverByObsCore(settings.dbConn, subsurveys)); } public VlkbAmqp(Settings settings) { LOGGER.fine("trace VlkbAmqp(settings)"); this.settings = settings; this.resolver = (settings.dbConn.isDbUriEmpty() ? new ResolverFromId(subsurveys) : new ResolverByObsCore(settings.dbConn, subsurveys)); } public VlkbAmqp(Settings settings, Subsurvey[] subsurveys) { LOGGER.fine("trace VlkbAmqp(settings, subsurveys)"); this.settings = settings; this.subsurveys = subsurveys; this.resolver = (settings.dbConn.isDbUriEmpty() ? new ResolverFromId(subsurveys) : new ResolverByObsCore(settings.dbConn, subsurveys)); } public CutResult doMerge(String[] idArr, Coord coord, boolean countNullValues) throws FileNotFoundException, IOException { LOGGER.fine("trace"); return merge(idArr, coord, countNullValues); } /////////////////////////////////////////////////////////////////////////////////// public CutResult doFile(String relPathname, int hdunum, Pos pos, Band band, Time time, Pol pol, String pixels, boolean countNullValues, FitsCard[] extraCards, String dummyCutAbsPathname) throws IOException, InterruptedException { LOGGER.fine("trace: " + pos.toString() ); CutResult cutResult = new CutResult(); LOGGER.finer("Using AMQP"); JsonEncoder jReq = new JsonEncoder(); jReq.add(relPathname, hdunum); jReq.add(pos); jReq.add(band); jReq.add(time); jReq.add(pol); // jReq.add(pixels), FIXME implement to supoort PIXLES in vlkb-legacy by AMQP jReq.add(countNullValues); jReq.add(extraCards); String outJson = RpcOverAmqp.doRpc( settings.amqpConn, jReq.toString() ); cutResult = JsonDecoder.responseFromCutoutJson( outJson ); return cutResult; } private CutResult doFileById(String id, Pos pos, Band band, Time time, Pol pol, String pixels, boolean countNullValues, Subsurvey[] subsurveys) throws IOException, InterruptedException { LOGGER.fine("trace"); FitsCard[] extraCards = null; this.resolver.resolve(id); String relPathname = this.resolver.relPathname(); int hdunum = this.resolver.hdunum(); String subsurveyId = this.resolver.obsCollection(); if(subsurveyId != null) { extraCards = Subsurvey.subsurveysFindCards(subsurveys, subsurveyId); } else { LOGGER.finer("Resolver returns subsurveyId null: no extraCards loaded."); } final String DEFAULT_TIME_SYSTEM = "MJD_UTC"; // FIXME take from confif file CutResult cutResult = doFile(relPathname, hdunum, pos, band, time, pol, pixels, countNullValues, extraCards, null); return cutResult; } /////////////////////////////////////////////////////////////////////////////////// public MCutResult doMCutout(String jdlJson, String workDir) throws IOException { LOGGER.fine("trace"); MCutResult mCutResult; LOGGER.finer("doMCutout over AMQP"); String updatedJsonString = JdlMCutout.resolveAndUpdateJsonRequest(jdlJson, settings, subsurveys); LOGGER.finest("doMCutout over AMQP : " + updatedJsonString); String outJson = RpcOverAmqp.doRpc(settings.amqpConn, JdlMCutout.mcutoutToJson(updatedJsonString) ); mCutResult = JdlMCutout.responseFromMCutoutJson(outJson); return mCutResult; } /* ================= MERGE =============================== */ private String generateSubimgPathname(String relPathname, int hdunum) { String cutfitsname = "vlkb-cutout"; Instant instant = Instant.now() ; String timestamp = instant.toString().replace(":","-").replace(".","_"); String tempPathname1 = relPathname.replaceAll("/","-"); String tempPathname2 = tempPathname1.replaceAll(" ","_"); if(hdunum == 1) { return cutfitsname + "_" + timestamp + "_" + tempPathname2; } else { String extnum = "EXT" + String.valueOf(hdunum-1); return cutfitsname + "_" + timestamp + "_" + extnum + "_" + tempPathname2; } } private CutResult cutout( String publisherDid, Coord coord, boolean countNullValues) { // ResolverByObsCore rsl = new ResolverByObsCore(settings.dbConn, subsurveys); Resolver rsl = resolver;//new ResolverFromId();//settings.dbConn, subsurveys); rsl.resolve(publisherDid); FitsCard[] extraCards = null; //Subsurvey.subsurveysFindCards(subsurveys, rsl.obsCollection());//rsl.subsurveyId); String absSubimgPathname = settings.fitsPaths.cutouts() + "/" + generateSubimgPathname(rsl.relPathname(), rsl.hdunum()); LOGGER.finest("absSubimgPathname: " + absSubimgPathname); LOGGER.finer("Using AMQP"); JsonEncoder jReq = new JsonEncoder(); jReq.add(rsl.relPathname(), rsl.hdunum()); jReq.add(coord.pos); jReq.add(coord.band); jReq.add(coord.time); jReq.add(coord.pol); // jReq.add(pixels), FIXME implement to supoort PIXLES in vlkb-legacy by AMQP jReq.add(countNullValues); jReq.add(extraCards); String inJson = jReq.toString(); return JsonDecoder.responseFromCutoutJson( RpcOverAmqp.doRpc(settings.amqpConn, inJson) ); } protected CutResult merge(String[] pubdids, Coord coord, Boolean countNullValues) { LOGGER.fine("trace"); ArrayList<CutResult> allresults = new ArrayList<CutResult>(); // 1. Decode pubdid's from inputs.pubdid and cutout on each CutResult[] allCutResults = do_cutouts( pubdids, coord, countNullValues); allresults.addAll(Arrays.asList(allCutResults)); String[] allCutPathnames = selectCutPathnames(allCutResults); if( allCutPathnames.length <= 0 ){ LOGGER.warning("No cutout created."); return null; } if( allCutPathnames.length != pubdids.length ) { LOGGER.warning("Number of cubes found and number of cutouts created do not match."); } try { // 2. regridding (closest neighbour interpolation) for files to be merged Regrid grid = new Regrid(); int dim = grid.dimensions(allCutPathnames); if( dim > 2 ) { Boolean changed = grid.regrid_vel2(allCutPathnames); if(changed){ //allresults.add("MSG Keywords CDELT3, CRVAL3 were adjusted for merge regridding."); LOGGER.finer("MSG Keywords CDELT3, CRVAL3 were adjusted for merge regridding."); } } // 3. Merge cut-files //String[] strar_results = mergefiles_parallel(id, logFileName, // logfileName //String[] strar_results = mergefiles_split_execution(id, logFileName, // logfileName CutResult strar_results = mergefiles( String.valueOf(dim), // prefix: "2D" or "3D" allCutPathnames); // files to merge allresults.addAll(Arrays.asList(strar_results)); } catch(Exception e) { LOGGER.log(Level.SEVERE, "merge:",e); //allresults.add( // "MSG System error. Report time, your IP-number, and the exact request-URL string to service administrator."); } CutResult[] dlkArr = allresults.toArray(new CutResult[allresults.size()]); return dlkArr[0]; // FIXME should return only datalink for the merged file not all cutout files? } protected CutResult[] do_cutouts( String[] publisherDids, Coord coord, Boolean countNullValues) { ArrayList<CutResult> allresults = new ArrayList<CutResult>(); if(publisherDids.length <= 0) return null; // no cube found for(String publisherDid : publisherDids) { CutResult cutout_results_table = cutout( publisherDid, coord, countNullValues); allresults.addAll(Arrays.asList(cutout_results_table)); } return allresults.toArray(new CutResult[allresults.size()]); } protected CutResult mergefiles( String prefix, // IN prefix added after filename-start-word String[] filestomerge) // IN abs path with filenames to be merged { LOGGER.fine("trace"); String InJson = JsonEncoderMerge.mergefilesToJson( prefix, filestomerge); String OutJson = RpcOverAmqp.doRpc(settings.amqpConn, InJson); return JsonDecoder.responseFromCutoutJson( OutJson ); } // BEGIN parallel protected String[] mergefiles_parallel( String jobId, // IN any identifier to be guaranteed distinct String logfilename, // IN logfilename without path String prefix, // IN prefix added after filename-start-word String[] filestomerge) // IN abs path with filenames to be merged { LOGGER.fine("trace"); String[] responseCH = mergefiles_common_header(jobId, logfilename, prefix, filestomerge); for(String sentence : responseCH) VlkbAmqp.LOGGER.finest("responseCmnHdr: " + sentence); // check if response errored -> abort with 500: Internal Server Error & log details int threadsCount = filestomerge.length; Thread threadArr[] = new Thread[threadsCount]; Reproject reprojectArr[] = new Reproject[threadsCount]; int i; for(i=0; i<threadsCount; i++) //for(String file : filestomerge) { String file = filestomerge[i]; reprojectArr[i] = new Reproject(this, jobId, prefix, file); threadArr[i] = new Thread(reprojectArr[i], "reproject: " + String.valueOf(i)); threadArr[i].start(); } // wait until all threads finished for(i=0; i<threadsCount; i++) //for(Thread thread : threadArr) { try { threadArr[i].join(); } catch (InterruptedException e) { e.printStackTrace(); } for(String sentence : reprojectArr[i].response) VlkbAmqp.LOGGER.finest("response[" + String.valueOf(i) + "]: " + sentence); if(!isResponseOk(reprojectArr[i].response)) { ;// FIXME response incorrect -> abort merge-job, free resources // if incorrect paarams -> respond HTTP.WRONG REQUEST // if other error -> respond HTTP.INTRNAL ERRR & log } } String[] response = mergefiles_add_reprojected(jobId, prefix); // check if response errored -> abort with 500: Internal Server Error & log details return response; } private boolean isResponseOk(String[] response) { // FIXME implement! return true; } protected String[] mergefiles_split_execution( String jobId, // IN any identifier to be guaranteed distinct String logfilename, // IN logfilename without path String prefix, // IN prefix added after filename-start-word String[] filestomerge) // IN abs path with filenames to be merged { LOGGER.fine("trace"); String[] responseCH = mergefiles_common_header(jobId, logfilename, prefix, filestomerge); // check if response errored -> abort with 500: Internal Server Error & log details for(String file : filestomerge)// FIXME parallelize on threads & join { String[] response = mergefiles_reproject(jobId, prefix, file); // check if response errored -> abort with: 500: Internal Server Error & log details } String[] response = mergefiles_add_reprojected(jobId, prefix); // check if response errored -> abort with 500: Internal Server Error & log details return response; } protected String[] mergefiles_common_header( String jobId, // IN jobId to distinguish parallel executed requests String logfilename, // IN logfilename without path String prefix, // IN prefix added after filename-start-word String[] filestomerge) // IN abs path with filenames to be merged { LOGGER.fine("trace"); String InJson = JsonEncoderMerge.mergefilesCommonHeaderToJson(jobId, prefix, filestomerge); String OutJson = RpcOverAmqp.doRpc(settings.amqpConn, InJson); String[] results = null; // FIXME JsonDecoder.responseFromJson(OutJson); return results; } protected String[] mergefiles_reproject( String jobId, // IN jobId to distinguish parallel executed requests String prefix, // IN prefix added after filename-start-word String fitsfilename) // IN logfilename without path { LOGGER.fine("trace"); String InJson = JsonEncoderMerge.mergefilesReprojectToJson(jobId, prefix, fitsfilename); String OutJson = RpcOverAmqp.doRpc(settings.amqpConn, InJson); String[] results = null; // FIXME JsonDecoder.responseFromJson(OutJson); return results; } protected String[] mergefiles_add_reprojected( String jobId, // IN jobId to distinguish parallel executed requests String prefix) // IN prefix added after filename-start-word { LOGGER.fine("trace"); String InJson = JsonEncoderMerge.mergefilesAddReprojectedToJson(jobId, prefix); String OutJson = RpcOverAmqp.doRpc(settings.amqpConn, InJson); String[] results = null; // FIXME JsonDecoder.responseFromJson(OutJson); return results; } // END parallel // returns selected data in list of strings: // -- from cutout: the cutout filename (server local) private String[] selectCutPathnames(CutResult[] results) { LOGGER.fine("trace"); // return only data (without MSG's LOG's etc) ArrayList<String> data = new ArrayList<String>(); // sanity check - move after doFunc call (here covered by exception) // FIXME consider catch null-pointer-exception if(results == null) { LOGGER.finest("selectCutPathnames: results-table is null."); return null; } for (CutResult res : results) { /*/ protect substring() calls below; // FIXME consider catch exception index-out-of-bounds if(res.length() < 3) { LOGGER.warning("Assert(Results.toXML): results msg shorter then 3 chars : " + res); continue; } // decode msg type switch(res.substring(0,3)){ case "URL": // from cutout: the cutout filename for download String localfname = res.substring(4);//.replaceAll(FITScutpath, ""); String[] ssfn = localfname.split(":"); //String[] ssfn = localfname.substring(4).split(":"); LOGGER.finest("ssfn[0]: " + ssfn[0]); LOGGER.finest("ssfn[1]: " + ssfn[1]); data.add(ssfn[1]); //data.add(localfname); break; case "NVS": // from cutout : NVS_nn:nn:nn case "MSG": case "LOG": case "CUT": // from cutout: the file which was cut case "HID": // from search // no data, do nothing break; default: LOGGER.severe("Assert(Results.toXML): results msg has unhandled msgtype code : " + res); }*/ data.add(res.fileName); } return data.toArray(new String[data.size()]); } }