Select Git revision
CameraStatistics.cpp
-
Steven Lambright authored
Updated our Cube class to be closer to the current coding standards. This change will not change how Isis behaves, any file formats, or any other output from Isis. Fixes #1356. git-svn-id: http://subversion.wr.usgs.gov/repos/prog/isis3/trunk@4941 41f8697f-d340-4b68-9986-7bafba869bb8
Steven Lambright authored: Updated our Cube class to be closer to the current coding standards. This change will not change how Isis behaves, any file formats, or any other output from Isis. Fixes #1356. git-svn-id: http://subversion.wr.usgs.gov/repos/prog/isis3/trunk@4941 41f8697f-d340-4b68-9986-7bafba869bb8
VlkbAmqp.java 15.39 KiB
import java.util.logging.Logger;
import java.util.logging.Level;
import java.util.List;
import java.util.ArrayList;
import java.util.Arrays;
import java.time.Instant;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.nio.file.StandardOpenOption;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.*;// Timestamp in cut-filename
import java.io.ByteArrayOutputStream; // for SODA direct streaming doSubimgStream
import vo.parameter.*;
class VlkbAmqp implements Vlkb
{
/** Class-wide logger; not private, and addressed in this file as {@code VlkbAmqp.LOGGER} too. */
static final Logger LOGGER = Logger.getLogger(VlkbAmqp.class.getName());

/** Configuration; each constructor takes it from its caller or from {@code Settings.getInstance()}. */
private Settings settings;

/** Subsurvey table; stays {@code null} unless the two-argument constructor supplies one. */
private Subsurvey[] subsurveys;

/** Resolver picked by every constructor depending on {@code settings.dbConn.isDbUriEmpty()}. */
private Resolver resolver;
/**
 * Creates an instance configured from {@code Settings.getInstance()}.
 *
 * The {@code subsurveys} field is not assigned by this constructor, so the
 * resolver is built with its current value ({@code null}).
 */
public VlkbAmqp()
{
    LOGGER.fine("trace VlkbAmqp()");

    this.settings = Settings.getInstance();

    // an empty DB URI selects the id-based resolver, otherwise the ObsCore-backed one
    if (this.settings.dbConn.isDbUriEmpty())
    {
        this.resolver = new ResolverFromId(this.subsurveys);
    }
    else
    {
        this.resolver = new ResolverByObsCore(this.settings.dbConn, this.subsurveys);
    }
}
/**
 * Creates an instance that uses the supplied settings.
 *
 * The {@code subsurveys} field is not assigned by this constructor, so the
 * resolver is built with its current value ({@code null}).
 *
 * @param settings configuration to use; its {@code dbConn} is dereferenced here
 */
public VlkbAmqp(Settings settings)
{
    LOGGER.fine("trace VlkbAmqp(settings)");

    this.settings = settings;

    // an empty DB URI selects the id-based resolver, otherwise the ObsCore-backed one
    if (settings.dbConn.isDbUriEmpty())
    {
        this.resolver = new ResolverFromId(this.subsurveys);
    }
    else
    {
        this.resolver = new ResolverByObsCore(settings.dbConn, this.subsurveys);
    }
}
/**
 * Creates an instance that uses the supplied settings and subsurvey table.
 *
 * @param settings   configuration to use; its {@code dbConn} is dereferenced here
 * @param subsurveys subsurvey table stored on the instance and handed to the resolver
 */
public VlkbAmqp(Settings settings, Subsurvey[] subsurveys)
{
    LOGGER.fine("trace VlkbAmqp(settings, subsurveys)");

    this.settings = settings;
    this.subsurveys = subsurveys;

    // an empty DB URI selects the id-based resolver, otherwise the ObsCore-backed one
    if (settings.dbConn.isDbUriEmpty())
    {
        this.resolver = new ResolverFromId(subsurveys);
    }
    else
    {
        this.resolver = new ResolverByObsCore(settings.dbConn, subsurveys);
    }
}
/**
 * Not implemented for the AMQP variant: logs a trace line and hands back a
 * freshly constructed {@code NullValueCount}.
 *
 * @param absPathname ignored
 * @param hdunum      ignored
 * @return a new default-constructed {@code NullValueCount}
 */
public NullValueCount doCountNullValues(String absPathname, int hdunum)
        throws IOException, InterruptedException
{
    LOGGER.fine("trace: not implemented; TB deprecated");

    final NullValueCount defaultCount = new NullValueCount();
    return defaultCount;
}
/**
 * Public entry point for merging; delegates unchanged to {@code merge}.
 *
 * @param idArr           identifiers forwarded to {@code merge}
 * @param coord           coordinates forwarded to {@code merge}
 * @param countNullValues flag forwarded to {@code merge}
 * @return whatever {@code merge} returns
 */
public CutResult doMerge(String[] idArr, Coord coord, boolean countNullValues)
        throws FileNotFoundException, IOException
{
    LOGGER.fine("trace");

    final CutResult mergeOutcome = merge(idArr, coord, countNullValues);
    return mergeOutcome;
}
///////////////////////////////////////////////////////////////////////////////////
/**
 * Intentionally does nothing.
 *
 * Only a placeholder for compatibility with the Vlkb.java interface;
 * Amqp support is deprecated.
 */
public void doFile(String relPathname, int hdunum,
        Pos pos, Band band, Time time, Pol pol, String dummyCutAbsPathname)
        throws IOException, InterruptedException
{
    // no-op: see the method comment
}
/**
 * Intentionally does nothing.
 *
 * Only a placeholder for compatibility with the Vlkb.java interface;
 * Amqp support is deprecated.
 */
public void doFile(String relPathname, int hdunum,
        String pixels, String dummyCutAbsPathname)
        throws IOException, InterruptedException
{
    // no-op: see the method comment
}
/**
 * Requests a cutout over AMQP and decodes the JSON reply.
 *
 * The request is assembled with {@code JsonEncoder} from the arguments in the
 * order: file/HDU, pos, band, time, pol, countNullValues, extraCards; it is
 * sent with {@code RpcOverAmqp.doRpc} using {@code settings.amqpConn}.
 *
 * @param relPathname         pathname of the file to cut, as given to the encoder
 * @param hdunum              HDU number, as given to the encoder
 * @param pos                 positional constraint; a {@code null} value no longer breaks the
 *                            trace log (whether the encoder accepts {@code null} is not
 *                            visible here)
 * @param band                band constraint
 * @param time                time constraint
 * @param pol                 polarization constraint
 * @param countNullValues     flag added to the request
 * @param extraCards          extra FITS cards added to the request, may be {@code null}
 * @param dummyCutAbsPathname unused
 * @return the result decoded by {@code JsonDecoder.responseFromCutoutJson}
 */
public CutResult doFileAmqp(String relPathname, int hdunum,
        Pos pos, Band band, Time time, Pol pol,
        boolean countNullValues, FitsCard[] extraCards, String dummyCutAbsPathname)
        throws IOException, InterruptedException
{
    // plain concatenation renders a null pos as "null" instead of throwing
    LOGGER.fine("trace: " + pos);
    LOGGER.finer("Using AMQP");

    JsonEncoder jReq = new JsonEncoder();
    jReq.add(relPathname, hdunum);
    jReq.add(pos);
    jReq.add(band);
    jReq.add(time);
    jReq.add(pol);
    jReq.add(countNullValues);
    jReq.add(extraCards);

    String outJson = RpcOverAmqp.doRpc(settings.amqpConn, jReq.toString());
    return JsonDecoder.responseFromCutoutJson(outJson);
}
/**
 * Resolves {@code id} to a file and HDU with the instance resolver, looks up
 * the extra FITS cards of the resolved collection, and runs
 * {@code doFileAmqp} on the result.
 *
 * @param id              identifier passed to the resolver
 * @param pos             forwarded to {@code doFileAmqp}
 * @param band            forwarded to {@code doFileAmqp}
 * @param time            forwarded to {@code doFileAmqp}
 * @param pol             forwarded to {@code doFileAmqp}
 * @param countNullValues forwarded to {@code doFileAmqp}
 * @param subsurveys      table searched for the extra cards (this parameter, not the field)
 * @return the result of {@code doFileAmqp}
 */
private CutResult doFileById(String id, Pos pos, Band band, Time time, Pol pol,
        boolean countNullValues, Subsurvey[] subsurveys)
        throws IOException, InterruptedException
{
    LOGGER.fine("trace");

    this.resolver.resolve(id);
    final String relPathname = this.resolver.relPathname();
    final int hdunum = this.resolver.hdunum();
    final String subsurveyId = this.resolver.obsCollection();

    final FitsCard[] extraCards;
    if (subsurveyId == null)
    {
        LOGGER.finer("Resolver returns subsurveyId null: no extraCards loaded.");
        extraCards = null;
    }
    else
    {
        extraCards = Subsurvey.subsurveysFindCards(subsurveys, subsurveyId);
    }

    // FIXME the default time system ("MJD_UTC") should be taken from the config file;
    // it is not used by this method yet
    return doFileAmqp(relPathname, hdunum, pos, band, time, pol,
            countNullValues, extraCards, null);
}
///////////////////////////////////////////////////////////////////////////////////
/**
 * Runs a multi-cutout request over AMQP.
 *
 * The incoming JSON is first rewritten by
 * {@code JdlMCutout.resolveAndUpdateJsonRequest}, then converted with
 * {@code JdlMCutout.mcutoutToJson}, sent through {@code RpcOverAmqp.doRpc},
 * and the reply is decoded with {@code JdlMCutout.responseFromMCutoutJson}.
 *
 * @param jdlJson request JSON
 * @param workDir unused
 * @return the decoded multi-cutout result
 */
public MCutResult doMCutout(String jdlJson, String workDir)
        throws IOException
{
    LOGGER.fine("trace");
    LOGGER.finer("doMCutout over AMQP");

    final String updatedJsonString =
            JdlMCutout.resolveAndUpdateJsonRequest(jdlJson, settings, subsurveys);
    LOGGER.finest("doMCutout over AMQP : " + updatedJsonString);

    final String outJson =
            RpcOverAmqp.doRpc(settings.amqpConn, JdlMCutout.mcutoutToJson(updatedJsonString));
    return JdlMCutout.responseFromMCutoutJson(outJson);
}
/* ================= MERGE =============================== */
/**
 * Builds a cutout file name of the form
 * {@code vlkb-cutout_<timestamp>[_EXT<hdunum-1>]_<flattened path>}.
 *
 * The timestamp is {@code Instant.now().toString()} with ':' replaced by '-'
 * and '.' by '_'. In the path every '/' becomes '-' and every space becomes
 * '_'. The {@code EXT} part is left out when {@code hdunum} is 1.
 *
 * @param relPathname pathname to flatten into the file name
 * @param hdunum      HDU number; values other than 1 add an {@code EXT<hdunum-1>} part
 * @return the generated file name (no directory part is added here)
 */
private String generateSubimgPathname(String relPathname, int hdunum)
{
    final String timestamp = Instant.now().toString().replace(":", "-").replace(".", "_");
    final String flattenedPath = relPathname.replace('/', '-').replace(' ', '_');

    final StringBuilder name = new StringBuilder("vlkb-cutout");
    name.append('_').append(timestamp);
    if (hdunum != 1)
    {
        name.append("_EXT").append(hdunum - 1);
    }
    name.append('_').append(flattenedPath);

    return name.toString();
}
/**
 * Resolves one publisher DID with the instance resolver and requests a
 * cutout for it over AMQP.
 *
 * No extra FITS cards are looked up on this path: the encoder is given a
 * {@code null} card array. The generated cutout pathname is only logged.
 *
 * @param publisherDid    identifier handed to the resolver
 * @param coord           source of the pos, band, time and pol request fields
 * @param countNullValues flag added to the request
 * @return the result decoded by {@code JsonDecoder.responseFromCutoutJson}
 */
private CutResult cutout(
        String publisherDid, Coord coord,
        boolean countNullValues)
{
    final Resolver rsl = resolver;
    rsl.resolve(publisherDid);

    final FitsCard[] extraCards = null;

    final String absSubimgPathname =
            settings.fitsPaths.cutouts() + "/" + generateSubimgPathname(rsl.relPathname(), rsl.hdunum());
    LOGGER.finest("absSubimgPathname: " + absSubimgPathname);

    LOGGER.finer("Using AMQP");

    final JsonEncoder request = new JsonEncoder();
    request.add(rsl.relPathname(), rsl.hdunum());
    request.add(coord.pos);
    request.add(coord.band);
    request.add(coord.time);
    request.add(coord.pol);
    request.add(countNullValues);
    request.add(extraCards);

    final String replyJson = RpcOverAmqp.doRpc(settings.amqpConn, request.toString());
    return JsonDecoder.responseFromCutoutJson(replyJson);
}
/**
 * Cuts every requested cube, regrids the cut files when they have more than
 * two dimensions, and asks the backend to merge them.
 *
 * Failures inside the regrid/merge step are logged at SEVERE and do not
 * propagate.
 *
 * @param pubdids         publisher DIDs of the cubes to cut and merge
 * @param coord           cutout coordinates applied to every cube
 * @param countNullValues flag forwarded to each cutout
 * @return {@code null} when no cutout was created (including a {@code null} or empty
 *         {@code pubdids}); otherwise the FIRST collected result, which is the cutout
 *         of the first cube, not the merged file (see the FIXME at the return)
 */
protected CutResult merge(String[] pubdids, Coord coord, Boolean countNullValues)
{
    LOGGER.fine("trace");

    if (pubdids == null || pubdids.length == 0)
    {
        LOGGER.warning("No cutout created.");
        return null;
    }

    // 1. Decode pubdid's from inputs.pubdid and cutout on each
    CutResult[] allCutResults = do_cutouts(pubdids, coord, countNullValues);
    if (allCutResults == null)
    {
        // do_cutouts reports "no cube found" as null; wrapping that in
        // Arrays.asList would throw a NullPointerException
        LOGGER.warning("No cutout created.");
        return null;
    }
    List<CutResult> allresults = new ArrayList<CutResult>(Arrays.asList(allCutResults));

    String[] allCutPathnames = selectCutPathnames(allCutResults);
    if (allCutPathnames == null || allCutPathnames.length <= 0)
    {
        LOGGER.warning("No cutout created.");
        return null;
    }
    if (allCutPathnames.length != pubdids.length)
    {
        LOGGER.warning("Number of cubes found and number of cutouts created do not match.");
    }

    try
    {
        // 2. regridding (closest neighbour interpolation) for files to be merged
        Regrid grid = new Regrid();
        int dim = grid.dimensions(allCutPathnames);
        if (dim > 2 && grid.regrid_vel2(allCutPathnames))
        {
            LOGGER.finer("MSG Keywords CDELT3, CRVAL3 were adjusted for merge regridding.");
        }

        // 3. Merge cut-files; the prefix passed on is the bare dimension count
        CutResult mergedResult = mergefiles(String.valueOf(dim), allCutPathnames);
        allresults.add(mergedResult);
    }
    catch (Exception e)
    {
        // best effort: the cutouts collected so far are still returned below
        LOGGER.log(Level.SEVERE, "merge:", e);
    }

    // FIXME should return only datalink for the merged file not all cutout files?
    return allresults.get(0);
}
/**
 * Runs {@code cutout} once per publisher DID, in input order.
 *
 * @param publisherDids   identifiers of the cubes to cut
 * @param coord           cutout coordinates applied to every cube
 * @param countNullValues flag forwarded to each cutout; it is unboxed on each call,
 *                        so it must not be {@code null} when there is anything to cut
 * @return one result per identifier, or {@code null} when {@code publisherDids} is
 *         {@code null} or empty (no cube found)
 */
protected CutResult[] do_cutouts(
        String[] publisherDids, Coord coord,
        Boolean countNullValues)
{
    if (publisherDids == null || publisherDids.length <= 0)
    {
        return null; // no cube found
    }

    CutResult[] allresults = new CutResult[publisherDids.length];
    for (int i = 0; i < publisherDids.length; i++)
    {
        allresults[i] = cutout(publisherDids[i], coord, countNullValues);
    }
    return allresults;
}
/**
 * Sends a merge request for the given files over AMQP and decodes the reply.
 *
 * @param prefix       prefix added after filename-start-word
 * @param filestomerge abs path with filenames to be merged
 * @return the result decoded by {@code JsonDecoder.responseFromCutoutJson}
 */
protected CutResult mergefiles(
        String prefix,
        String[] filestomerge)
{
    LOGGER.fine("trace");

    final String requestJson = JsonEncoderMerge.mergefilesToJson(prefix, filestomerge);
    final String replyJson = RpcOverAmqp.doRpc(settings.amqpConn, requestJson);

    return JsonDecoder.responseFromCutoutJson(replyJson);
}
// BEGIN parallel
/**
 * Merge variant that reprojects every input file on its own thread.
 *
 * Steps: request the common header, start one {@code Reproject} thread per
 * file, wait for all of them, then request the final add-reprojected step.
 * Responses are only logged; they are not validated yet (see the FIXME notes).
 *
 * If the calling thread is interrupted while waiting, the wait continues for
 * the remaining threads and the interrupt status is restored before this
 * method returns.
 *
 * @param jobId        any identifier to be guaranteed distinct
 * @param logfilename  logfilename without path
 * @param prefix       prefix added after filename-start-word
 * @param filestomerge abs path with filenames to be merged
 * @return the response of {@code mergefiles_add_reprojected}
 */
protected String[] mergefiles_parallel(
        String jobId,
        String logfilename,
        String prefix,
        String[] filestomerge)
{
    LOGGER.fine("trace");

    String[] responseCH = mergefiles_common_header(jobId, logfilename, prefix, filestomerge);
    // the response may be null, so it is logged through the null-tolerant helper
    logSentences("responseCmnHdr: ", responseCH);
    // check if response errored -> abort with 500: Internal Server Error & log details

    int threadsCount = filestomerge.length;
    Thread[] threadArr = new Thread[threadsCount];
    Reproject[] reprojectArr = new Reproject[threadsCount];

    for (int i = 0; i < threadsCount; i++)
    {
        reprojectArr[i] = new Reproject(this, jobId, prefix, filestomerge[i]);
        threadArr[i] = new Thread(reprojectArr[i], "reproject: " + String.valueOf(i));
        threadArr[i].start();
    }

    // wait until all threads finished
    boolean interrupted = false;
    for (int i = 0; i < threadsCount; i++)
    {
        try
        {
            threadArr[i].join();
        }
        catch (InterruptedException e)
        {
            // remember the interrupt but keep waiting for the remaining threads
            interrupted = true;
            LOGGER.log(Level.WARNING, "Interrupted while waiting for reproject thread " + i, e);
        }

        logSentences("response[" + String.valueOf(i) + "]: ", reprojectArr[i].response);

        if (!isResponseOk(reprojectArr[i].response))
        {
            // FIXME response incorrect -> abort merge-job, free resources
            // if incorrect params -> respond HTTP.WRONG REQUEST
            // if other error -> respond HTTP.INTERNAL ERROR & log
        }
    }

    try
    {
        // check if response errored -> abort with 500: Internal Server Error & log details
        return mergefiles_add_reprojected(jobId, prefix);
    }
    finally
    {
        if (interrupted)
        {
            Thread.currentThread().interrupt();
        }
    }
}

/** Logs each sentence at FINEST with the given label; tolerates a null array. */
private static void logSentences(String label, String[] sentences)
{
    if (sentences == null)
    {
        LOGGER.finest(label + "(no response)");
        return;
    }
    for (String sentence : sentences)
    {
        LOGGER.finest(label + sentence);
    }
}
/**
 * Placeholder response check: not implemented yet, accepts every response.
 *
 * @param response response sentences to validate; not inspected
 * @return always {@code true}
 */
private boolean isResponseOk(String[] response)
{
// FIXME implement!
return true;
}
/**
 * Sequential merge variant: requests the common header, reprojects the
 * files one after another, then requests the final add-reprojected step.
 * The intermediate responses are discarded.
 *
 * @param jobId        any identifier to be guaranteed distinct
 * @param logfilename  logfilename without path
 * @param prefix       prefix added after filename-start-word
 * @param filestomerge abs path with filenames to be merged
 * @return the response of {@code mergefiles_add_reprojected}
 */
protected String[] mergefiles_split_execution(
        String jobId,
        String logfilename,
        String prefix,
        String[] filestomerge)
{
    LOGGER.fine("trace");

    // check if response errored -> abort with 500: Internal Server Error & log details
    mergefiles_common_header(jobId, logfilename, prefix, filestomerge);

    // FIXME parallelize on threads & join
    for (int idx = 0; idx < filestomerge.length; idx++)
    {
        // check if response errored -> abort with: 500: Internal Server Error & log details
        mergefiles_reproject(jobId, prefix, filestomerge[idx]);
    }

    // check if response errored -> abort with 500: Internal Server Error & log details
    return mergefiles_add_reprojected(jobId, prefix);
}
/**
 * Sends the common-header merge request over AMQP.
 *
 * The reply is not decoded yet, so the method currently always returns
 * {@code null}.
 *
 * @param jobId        jobId to distinguish parallel executed requests
 * @param logfilename  logfilename without path (not part of the request)
 * @param prefix       prefix added after filename-start-word
 * @param filestomerge abs path with filenames to be merged
 * @return always {@code null}
 */
protected String[] mergefiles_common_header(
        String jobId,
        String logfilename,
        String prefix,
        String[] filestomerge)
{
    LOGGER.fine("trace");

    final String requestJson =
            JsonEncoderMerge.mergefilesCommonHeaderToJson(jobId, prefix, filestomerge);
    RpcOverAmqp.doRpc(settings.amqpConn, requestJson);

    // FIXME decode the reply: JsonDecoder.responseFromJson(OutJson)
    return null;
}
/**
 * Sends the reproject request for one file over AMQP.
 *
 * The reply is not decoded yet, so the method currently always returns
 * {@code null}.
 *
 * @param jobId        jobId to distinguish parallel executed requests
 * @param prefix       prefix added after filename-start-word
 * @param fitsfilename file name placed in the reproject request
 * @return always {@code null}
 */
protected String[] mergefiles_reproject(
        String jobId,
        String prefix,
        String fitsfilename)
{
    LOGGER.fine("trace");

    final String requestJson =
            JsonEncoderMerge.mergefilesReprojectToJson(jobId, prefix, fitsfilename);
    RpcOverAmqp.doRpc(settings.amqpConn, requestJson);

    // FIXME decode the reply: JsonDecoder.responseFromJson(OutJson)
    return null;
}
/**
 * Sends the add-reprojected request over AMQP.
 *
 * The reply is not decoded yet, so the method currently always returns
 * {@code null}.
 *
 * @param jobId  jobId to distinguish parallel executed requests
 * @param prefix prefix added after filename-start-word
 * @return always {@code null}
 */
protected String[] mergefiles_add_reprojected(
        String jobId,
        String prefix)
{
    LOGGER.fine("trace");

    final String requestJson =
            JsonEncoderMerge.mergefilesAddReprojectedToJson(jobId, prefix);
    RpcOverAmqp.doRpc(settings.amqpConn, requestJson);

    // FIXME decode the reply: JsonDecoder.responseFromJson(OutJson)
    return null;
}
// END parallel
// returns selected data in list of strings:
// -- from cutout: the cutout filename (server local)
/**
 * Collects the {@code fileName} of every cutout result, in input order.
 *
 * @param results cutout results; may be {@code null} and may contain {@code null} entries
 * @return the file names of the non-null results; an empty array (never {@code null})
 *         when {@code results} is {@code null} or holds no usable entry
 */
private String[] selectCutPathnames(CutResult[] results)
{
    LOGGER.fine("trace");

    if (results == null)
    {
        LOGGER.finest("selectCutPathnames: results-table is null.");
        // empty instead of null so that the caller's length check is safe
        return new String[0];
    }

    List<String> data = new ArrayList<String>(results.length);
    for (CutResult res : results)
    {
        if (res == null)
        {
            // nothing to read a file name from
            LOGGER.warning("selectCutPathnames: skipping null cutout result.");
            continue;
        }
        data.add(res.fileName);
    }
    return data.toArray(new String[data.size()]);
}
}