// VlkbAmqp.java (15.39 KiB)
// Authored by Robert Butora
import java.util.logging.Logger;
import java.util.logging.Level;
import java.util.List;
import java.util.ArrayList;
import java.util.Arrays;
import java.time.Instant;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.nio.file.StandardOpenOption;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.*;// Timestamp in cut-filename
import java.io.ByteArrayOutputStream; // for SODA direct streaming doSubimgStream
import vo.parameter.*;
class VlkbAmqp implements Vlkb
{
// Class-wide logger, named after this class.
static final Logger LOGGER = Logger.getLogger(VlkbAmqp.class.getName());
// Configuration; this class reads dbConn, amqpConn and fitsPaths from it.
private Settings settings = null;
// Subsurvey list handed to the resolver and to JdlMCutout; stays null unless
// the (settings, subsurveys) constructor is used.
private Subsurvey[] subsurveys = null;
// Resolves an identifier to relPathname / hdunum / obsCollection; the concrete
// implementation is selected in the constructors.
private Resolver resolver = null;
/**
 * Creates an instance using the Settings obtained from {@code Settings.getInstance()}.
 * The subsurveys field is still null here, so the resolver is built with a
 * null subsurvey list.
 */
public VlkbAmqp()
{
   LOGGER.fine("trace VlkbAmqp()");
   this.settings = Settings.getInstance();

   // Without a database URI fall back to resolving directly from the identifier.
   if(settings.dbConn.isDbUriEmpty())
   {
      this.resolver = new ResolverFromId(subsurveys);
   }
   else
   {
      this.resolver = new ResolverByObsCore(settings.dbConn, subsurveys);
   }
}
/**
 * Creates an instance using the supplied settings.
 * The subsurveys field is still null here, so the resolver is built with a
 * null subsurvey list.
 *
 * @param settings configuration providing the database and AMQP connection data
 */
public VlkbAmqp(Settings settings)
{
   LOGGER.fine("trace VlkbAmqp(settings)");
   this.settings = settings;

   // Without a database URI fall back to resolving directly from the identifier.
   if(settings.dbConn.isDbUriEmpty())
   {
      this.resolver = new ResolverFromId(subsurveys);
   }
   else
   {
      this.resolver = new ResolverByObsCore(settings.dbConn, subsurveys);
   }
}
/**
 * Creates an instance using the supplied settings and subsurvey list.
 *
 * @param settings   configuration providing the database and AMQP connection data
 * @param subsurveys subsurvey list stored in this instance and passed to the resolver
 */
public VlkbAmqp(Settings settings, Subsurvey[] subsurveys)
{
   LOGGER.fine("trace VlkbAmqp(settings, subsurveys)");
   this.settings = settings;
   this.subsurveys = subsurveys;

   // Without a database URI fall back to resolving directly from the identifier.
   if(settings.dbConn.isDbUriEmpty())
   {
      this.resolver = new ResolverFromId(subsurveys);
   }
   else
   {
      this.resolver = new ResolverByObsCore(settings.dbConn, subsurveys);
   }
}
/**
 * Placeholder: null-value counting is not implemented in this class.
 * Neither argument is used.
 *
 * @param absPathname ignored
 * @param hdunum      ignored
 * @return a freshly constructed {@code NullValueCount}
 */
public NullValueCount doCountNullValues(String absPathname, int hdunum)
   throws IOException, InterruptedException
{
   LOGGER.fine("trace: not implemented; TB deprecated");
   final NullValueCount emptyCount = new NullValueCount();
   return emptyCount;
}
/**
 * Public entry point for merging; delegates to {@code merge}.
 *
 * @param idArr           identifiers of the datasets to cut and merge
 * @param coord           coordinates describing the region to cut
 * @param countNullValues forwarded to the cutout requests
 * @return whatever {@code merge} returns (may be null)
 */
public CutResult doMerge(String[] idArr, Coord coord, boolean countNullValues)
   throws FileNotFoundException, IOException
{
   LOGGER.fine("trace");
   final CutResult mergeOutcome = merge(idArr, coord, countNullValues);
   return mergeOutcome;
}
///////////////////////////////////////////////////////////////////////////////////
/**
 * Intentionally empty.
 * Only a placeholder for compatibility with the Vlkb.java interface;
 * AMQP support is deprecated. All arguments are ignored.
 */
public void doFile(String relPathname, int hdunum,
      Pos pos, Band band, Time time, Pol pol, String dummyCutAbsPathname)
   throws IOException, InterruptedException
{
   // no-op
}
/**
 * Intentionally empty (pixel-string variant).
 * Only a placeholder for compatibility with the Vlkb.java interface;
 * AMQP support is deprecated. All arguments are ignored.
 */
public void doFile(String relPathname, int hdunum,
      String pixels, String dummyCutAbsPathname)
   throws IOException, InterruptedException
{
   // no-op
}
/**
 * Requests a cutout over AMQP: encodes the arguments as JSON, sends the
 * request with {@code RpcOverAmqp.doRpc} and decodes the JSON reply.
 *
 * @param relPathname         pathname of the file to cut, added to the request with hdunum
 * @param hdunum              HDU number, added to the request with relPathname
 * @param pos                 position constraint added to the request
 * @param band                band constraint added to the request
 * @param time                time constraint added to the request
 * @param pol                 polarization constraint added to the request
 * @param countNullValues     flag added to the request
 * @param extraCards          extra FITS cards added to the request
 * @param dummyCutAbsPathname not used by this method
 * @return the result decoded from the reply
 */
public CutResult doFileAmqp(String relPathname, int hdunum,
      Pos pos, Band band, Time time, Pol pol,
      boolean countNullValues, FitsCard[] extraCards, String dummyCutAbsPathname)
   throws IOException, InterruptedException
{
   // String concatenation renders a null pos as "null", so the trace line
   // itself can no longer throw before any work has been done.
   LOGGER.fine("trace: " + pos);
   LOGGER.finer("Using AMQP");

   JsonEncoder jReq = new JsonEncoder();
   jReq.add(relPathname, hdunum);
   jReq.add(pos);
   jReq.add(band);
   jReq.add(time);
   jReq.add(pol);
   jReq.add(countNullValues);
   jReq.add(extraCards);

   String outJson = RpcOverAmqp.doRpc( settings.amqpConn, jReq.toString() );

   // The reply is decoded straight into the return value; no placeholder
   // CutResult is allocated beforehand.
   return JsonDecoder.responseFromCutoutJson( outJson );
}
/**
 * Resolves an identifier through the configured resolver and requests the
 * cutout over AMQP.
 *
 * @param id              identifier passed to the resolver
 * @param pos             position constraint forwarded to {@code doFileAmqp}
 * @param band            band constraint forwarded to {@code doFileAmqp}
 * @param time            time constraint forwarded to {@code doFileAmqp}
 * @param pol             polarization constraint forwarded to {@code doFileAmqp}
 * @param countNullValues forwarded to {@code doFileAmqp}
 * @param subsurveys      subsurvey list searched for extra cards
 *                        (note: shadows the field of the same name)
 * @return the result returned by {@code doFileAmqp}
 */
private CutResult doFileById(String id, Pos pos, Band band, Time time, Pol pol,
      boolean countNullValues, Subsurvey[] subsurveys)
   throws IOException, InterruptedException
{
   LOGGER.fine("trace");

   this.resolver.resolve(id);
   String relPathname = this.resolver.relPathname();
   int hdunum = this.resolver.hdunum();
   String subsurveyId = this.resolver.obsCollection();

   // Extra cards can only be looked up when the resolver reports a subsurvey.
   FitsCard[] extraCards = null;
   if(subsurveyId != null)
   {
      extraCards = Subsurvey.subsurveysFindCards(subsurveys, subsurveyId);
   }
   else
   {
      LOGGER.finer("Resolver returns subsurveyId null: no extraCards loaded.");
   }

   // FIXME a default time system ("MJD_UTC") was declared here but never used;
   // it should be taken from the config file once it is actually needed.

   return doFileAmqp(relPathname, hdunum, pos, band, time, pol,
         countNullValues, extraCards, null);
}
///////////////////////////////////////////////////////////////////////////////////
/**
 * Runs a multi-cutout request over AMQP.
 * The incoming JSON is first passed through
 * {@code JdlMCutout.resolveAndUpdateJsonRequest}, then converted with
 * {@code JdlMCutout.mcutoutToJson}, sent via RPC, and the reply is decoded.
 *
 * @param jdlJson request description in JSON
 * @param workDir not used by this method
 * @return the result decoded from the reply
 */
public MCutResult doMCutout(String jdlJson, String workDir)
   throws IOException
{
   LOGGER.fine("trace");
   LOGGER.finer("doMCutout over AMQP");

   final String resolvedRequest =
      JdlMCutout.resolveAndUpdateJsonRequest(jdlJson, settings, subsurveys);
   LOGGER.finest("doMCutout over AMQP : " + resolvedRequest);

   final String replyJson =
      RpcOverAmqp.doRpc(settings.amqpConn, JdlMCutout.mcutoutToJson(resolvedRequest) );

   return JdlMCutout.responseFromMCutoutJson(replyJson);
}
/* ================= MERGE =============================== */
/**
 * Builds a file name for a cutout:
 * {@code vlkb-cutout_<timestamp>_[EXT<hdunum-1>_]<flattened relPathname>}.
 * The timestamp is the current instant with ':' replaced by '-' and '.' by '_';
 * in relPathname every '/' becomes '-' and every space becomes '_'.
 * The EXT segment is omitted when hdunum is 1.
 *
 * @param relPathname pathname of the source file
 * @param hdunum      HDU number of the source
 * @return the generated file name (no directory part)
 */
private String generateSubimgPathname(String relPathname, int hdunum)
{
   final String timestamp = Instant.now().toString().replace(":","-").replace(".","_");
   final String flattenedName = relPathname.replaceAll("/","-").replaceAll(" ","_");

   StringBuilder name = new StringBuilder("vlkb-cutout");
   name.append("_").append(timestamp).append("_");
   if(hdunum != 1)
   {
      name.append("EXT").append(hdunum-1).append("_");
   }
   name.append(flattenedName);

   return name.toString();
}
/**
 * Resolves one publisher DID with the instance resolver and requests its
 * cutout over AMQP. No extra FITS cards are attached (extraCards is null).
 *
 * @param publisherDid    identifier passed to the resolver
 * @param coord           supplies pos, band, time and pol for the request
 * @param countNullValues flag added to the request
 * @return the result decoded from the RPC reply
 */
private CutResult cutout(
      String publisherDid, Coord coord,
      boolean countNullValues)
{
   final Resolver rsl = resolver;
   rsl.resolve(publisherDid);

   // Typed null: no subsurvey-specific cards are looked up in this path.
   final FitsCard[] extraCards = null;

   // The generated pathname is only logged here; it is not part of the request.
   final String absSubimgPathname =
      settings.fitsPaths.cutouts() + "/"
      + generateSubimgPathname(rsl.relPathname(), rsl.hdunum());
   LOGGER.finest("absSubimgPathname: " + absSubimgPathname);

   LOGGER.finer("Using AMQP");

   final JsonEncoder request = new JsonEncoder();
   request.add(rsl.relPathname(), rsl.hdunum());
   request.add(coord.pos);
   request.add(coord.band);
   request.add(coord.time);
   request.add(coord.pol);
   request.add(countNullValues);
   request.add(extraCards);

   final String replyJson = RpcOverAmqp.doRpc(settings.amqpConn, request.toString());
   return JsonDecoder.responseFromCutoutJson( replyJson );
}
/**
 * Cuts every requested dataset, regrids the cut files when they have more
 * than two dimensions, and asks the remote side to merge them.
 *
 * Failures during regridding or merging are logged and otherwise ignored.
 * In every case where at least one cutout exists, the value returned is the
 * result of the FIRST cutout; the result of the merge call is not returned
 * (see FIXME below).
 *
 * @param pubdids         identifiers of the datasets to cut and merge
 * @param coord           coordinates describing the region to cut
 * @param countNullValues forwarded to the cutout requests
 * @return the first cutout result, or null when no cutout was created
 *         (including a null or empty pubdids)
 */
protected CutResult merge(String[] pubdids, Coord coord, Boolean countNullValues)
{
   LOGGER.fine("trace");

   // Nothing requested: same outcome as "no cutout created" instead of an NPE.
   if( pubdids == null || pubdids.length == 0 ) {
      LOGGER.warning("No cutout created.");
      return null;
   }

   // 1. Decode pubdid's from inputs.pubdid and cutout on each
   CutResult[] allCutResults = do_cutouts(
         pubdids, coord,
         countNullValues);

   // do_cutouts may report "nothing found" as null; treat it like an empty result.
   if( allCutResults == null || allCutResults.length == 0 ) {
      LOGGER.warning("No cutout created.");
      return null;
   }

   String[] allCutPathnames = selectCutPathnames(allCutResults);

   if( allCutPathnames == null || allCutPathnames.length == 0 ) {
      LOGGER.warning("No cutout created.");
      return null;
   }

   if( allCutPathnames.length != pubdids.length ) {
      LOGGER.warning("Number of cubes found and number of cutouts created do not match.");
   }

   try
   {
      // 2. regridding (closest neighbour interpolation) for files to be merged
      Regrid grid = new Regrid();
      int dim = grid.dimensions(allCutPathnames);
      if( dim > 2 ) {
         Boolean changed = grid.regrid_vel2(allCutPathnames);
         if(changed){
            LOGGER.finer("MSG Keywords CDELT3, CRVAL3 were adjusted for merge regridding.");
         }
      }

      // 3. Merge cut-files. The prefix is the dimension count as a string.
      // The merge result is currently not used for the return value.
      mergefiles(
            String.valueOf(dim),
            allCutPathnames);
   }
   catch(Exception e)
   {
      // Best effort: a failed regrid/merge is logged, the first cutout is still returned.
      LOGGER.log(Level.SEVERE, "merge:",e);
   }

   // FIXME should return only datalink for the merged file not all cutout files?
   return allCutResults[0];
}
/**
 * Requests one cutout per publisher DID, in input order.
 *
 * @param publisherDids   identifiers of the datasets to cut
 * @param coord           coordinates describing the region to cut
 * @param countNullValues forwarded to each cutout request
 * @return one result per identifier; an empty array (never null) when
 *         publisherDids is null or empty
 */
protected CutResult[] do_cutouts(
      String[] publisherDids, Coord coord,
      Boolean countNullValues)
{
   // An empty array lets callers iterate or test length without a null check.
   if(publisherDids == null || publisherDids.length == 0)
   {
      return new CutResult[0]; // no cube found
   }

   CutResult[] results = new CutResult[publisherDids.length];
   for(int i = 0; i < publisherDids.length; i++)
   {
      results[i] = cutout(
            publisherDids[i], coord,
            countNullValues);
   }
   return results;
}
/**
 * Sends a merge request for the given files over AMQP and decodes the reply.
 *
 * @param prefix       prefix added after filename-start-word
 * @param filestomerge abs path with filenames to be merged
 * @return the result decoded from the reply
 */
protected CutResult mergefiles(
      String prefix,
      String[] filestomerge)
{
   LOGGER.fine("trace");

   final String requestJson = JsonEncoderMerge.mergefilesToJson( prefix, filestomerge);
   final String replyJson = RpcOverAmqp.doRpc(settings.amqpConn, requestJson);

   return JsonDecoder.responseFromCutoutJson( replyJson );
}
// BEGIN parallel
/**
 * Merge variant that reprojects every input file on its own thread.
 * Steps: send the common-header request, start one {@code Reproject} thread
 * per file, wait for all of them, then send the add-reprojected request.
 *
 * Responses that are null are skipped when logging. If the calling thread is
 * interrupted while waiting, the remaining threads are still joined and the
 * interrupt status is restored before returning.
 *
 * @param jobId        any identifier to be guaranteed distinct
 * @param logfilename  logfilename without path
 * @param prefix       prefix added after filename-start-word
 * @param filestomerge abs path with filenames to be merged
 * @return the response of the add-reprojected step
 */
protected String[] mergefiles_parallel(
      String jobId,
      String logfilename,
      String prefix,
      String[] filestomerge)
{
   LOGGER.fine("trace");

   String[] responseCH = mergefiles_common_header(jobId, logfilename, prefix, filestomerge);
   if(responseCH != null)
   {
      for(String sentence : responseCH) LOGGER.finest("responseCmnHdr: " + sentence);
   }
   // check if response errored -> abort with 500: Internal Server Error & log details

   int threadsCount = filestomerge.length;
   Thread[] threadArr = new Thread[threadsCount];
   Reproject[] reprojectArr = new Reproject[threadsCount];

   for(int i=0; i<threadsCount; i++)
   {
      reprojectArr[i] = new Reproject(this, jobId, prefix, filestomerge[i]);
      threadArr[i] = new Thread(reprojectArr[i], "reproject: " + String.valueOf(i));
      threadArr[i].start();
   }

   // wait until all threads finished
   boolean interrupted = false;
   for(int i=0; i<threadsCount; i++)
   {
      try
      {
         threadArr[i].join();
      }
      catch (InterruptedException e)
      {
         // Remember the interrupt; restoring the flag right here would make
         // every following join() fail immediately.
         interrupted = true;
         LOGGER.log(Level.WARNING, "Interrupted while waiting for reproject thread " + i, e);
      }

      String[] threadResponse = reprojectArr[i].response;
      if(threadResponse != null)
      {
         for(String sentence : threadResponse)
            LOGGER.finest("response[" + String.valueOf(i) + "]: " + sentence);
      }

      if(!isResponseOk(threadResponse))
      {
         // FIXME response incorrect -> abort merge-job, free resources
         // if incorrect params -> respond HTTP.WRONG REQUEST
         // if other error -> respond HTTP.INTERNAL ERROR & log
      }
   }

   try
   {
      // check if response errored -> abort with 500: Internal Server Error & log details
      return mergefiles_add_reprojected(jobId, prefix);
   }
   finally
   {
      if(interrupted)
      {
         Thread.currentThread().interrupt();
      }
   }
}
/**
 * Stub: response validation is not implemented yet.
 *
 * @param response response to inspect (currently ignored)
 * @return always true
 */
private boolean isResponseOk(String[] response)
{
   // FIXME implement!
   final boolean accepted = true;
   return accepted;
}
/**
 * Sequential merge variant: common-header request, then one reproject
 * request per file, then the add-reprojected request. The responses of the
 * first two steps are not inspected.
 *
 * @param jobId        any identifier to be guaranteed distinct
 * @param logfilename  logfilename without path
 * @param prefix       prefix added after filename-start-word
 * @param filestomerge abs path with filenames to be merged
 * @return the response of the add-reprojected step
 */
protected String[] mergefiles_split_execution(
      String jobId,
      String logfilename,
      String prefix,
      String[] filestomerge)
{
   LOGGER.fine("trace");

   mergefiles_common_header(jobId, logfilename, prefix, filestomerge);
   // check if response errored -> abort with 500: Internal Server Error & log details

   // FIXME parallelize on threads & join
   for(int idx = 0; idx < filestomerge.length; idx++)
   {
      mergefiles_reproject(jobId, prefix, filestomerge[idx]);
      // check if response errored -> abort with: 500: Internal Server Error & log details
   }

   // check if response errored -> abort with 500: Internal Server Error & log details
   return mergefiles_add_reprojected(jobId, prefix);
}
/**
 * Sends the common-header merge request over AMQP.
 * The reply is not decoded yet, so this method currently always returns null.
 *
 * @param jobId        jobId to distinguish parallel executed requests
 * @param logfilename  logfilename without path (not used here)
 * @param prefix       prefix added after filename-start-word
 * @param filestomerge abs path with filenames to be merged
 * @return null (reply decoding not implemented)
 */
protected String[] mergefiles_common_header(
      String jobId,
      String logfilename,
      String prefix,
      String[] filestomerge)
{
   LOGGER.fine("trace");

   final String requestJson =
      JsonEncoderMerge.mergefilesCommonHeaderToJson(jobId, prefix, filestomerge);
   final String replyJson = RpcOverAmqp.doRpc(settings.amqpConn, requestJson);

   // FIXME decode the reply: JsonDecoder.responseFromJson(replyJson)
   return null;
}
/**
 * Sends the reproject request for one file over AMQP.
 * The reply is not decoded yet, so this method currently always returns null.
 *
 * @param jobId        jobId to distinguish parallel executed requests
 * @param prefix       prefix added after filename-start-word
 * @param fitsfilename file to reproject
 * @return null (reply decoding not implemented)
 */
protected String[] mergefiles_reproject(
      String jobId,
      String prefix,
      String fitsfilename)
{
   LOGGER.fine("trace");

   final String requestJson =
      JsonEncoderMerge.mergefilesReprojectToJson(jobId, prefix, fitsfilename);
   final String replyJson = RpcOverAmqp.doRpc(settings.amqpConn, requestJson);

   // FIXME decode the reply: JsonDecoder.responseFromJson(replyJson)
   return null;
}
/**
 * Sends the add-reprojected request over AMQP.
 * The reply is not decoded yet, so this method currently always returns null.
 *
 * @param jobId  jobId to distinguish parallel executed requests
 * @param prefix prefix added after filename-start-word
 * @return null (reply decoding not implemented)
 */
protected String[] mergefiles_add_reprojected(
      String jobId,
      String prefix)
{
   LOGGER.fine("trace");

   final String requestJson =
      JsonEncoderMerge.mergefilesAddReprojectedToJson(jobId, prefix);
   final String replyJson = RpcOverAmqp.doRpc(settings.amqpConn, requestJson);

   // FIXME decode the reply: JsonDecoder.responseFromJson(replyJson)
   return null;
}
// END parallel
// returns selected data in list of strings:
// -- from cutout: the cutout filename (server local)
/**
 * Extracts the {@code fileName} of every cutout result, in input order.
 *
 * @param results cutout results; may be null
 * @return one file name per result, or null when results is null
 */
private String[] selectCutPathnames(CutResult[] results) {
   LOGGER.fine("trace");

   // FIXME consider catch null-pointer-exception
   if(results == null) {
      LOGGER.finest("selectCutPathnames: results-table is null.");
      return null;
   }

   // Only the data (file names) is collected; no messages or logs.
   final String[] pathnames = new String[results.length];
   for(int i = 0; i < results.length; i++) {
      pathnames[i] = results[i].fileName;
   }
   return pathnames;
}
}