diff --git a/data-access/servlet/src/main/java/cutout/DatasetsImpl.java b/data-access/servlet/src/main/java/cutout/DatasetsAmqp.java similarity index 77% rename from data-access/servlet/src/main/java/cutout/DatasetsImpl.java rename to data-access/servlet/src/main/java/cutout/DatasetsAmqp.java index c3101afd3bc19b80614ebc8b488507a91b62902b..cd37504e0abb10fbdb35390e118e6ea2a9076bfa 100644 --- a/data-access/servlet/src/main/java/cutout/DatasetsImpl.java +++ b/data-access/servlet/src/main/java/cutout/DatasetsAmqp.java @@ -27,36 +27,32 @@ import java.io.ByteArrayOutputStream; // for SODA direct streaming doSubimgStrea import vo.parameter.*; -class DatasetsImpl implements Datasets +class DatasetsAmqp implements Datasets { - static final Logger LOGGER = Logger.getLogger("DatasetsImpl"); + static final Logger LOGGER = Logger.getLogger(DatasetsAmqp.class.getName()); private Settings settings = null; private Subsurvey[] subsurveys = null; - private Cutout cutout = null; - public DatasetsImpl() + public DatasetsAmqp() { - LOGGER.info("trace DatasetsImpl()"); + LOGGER.info("trace DatasetsAmqp()"); this.settings = Settings.getInstance(); - cutout = new CutoutImpl(settings, subsurveys); } - public DatasetsImpl(Settings settings) + public DatasetsAmqp(Settings settings) { - LOGGER.info("trace DatasetsImpl(settings)"); + LOGGER.info("trace DatasetsAmqp(settings)"); this.settings = settings; - this.cutout = new CutoutImpl(settings, subsurveys); } - public DatasetsImpl(Settings settings, Subsurvey[] subsurveys) + public DatasetsAmqp(Settings settings, Subsurvey[] subsurveys) { - LOGGER.info("trace DatasetsImpl(settings, subsurveys)"); + LOGGER.info("trace DatasetsAmqp(settings, subsurveys)"); this.settings = settings; this.subsurveys = subsurveys; - this.cutout = new CutoutImpl(settings, subsurveys); } @@ -79,114 +75,16 @@ class DatasetsImpl implements Datasets MCutResult mCutResult; - if(settings.amqpConn.isHostnameEmpty()) - { - LOGGER.info("doMCutout with CLI"); - CutArgs[] cutArgsArr = CutArgs.parseCutArgsArr(jdlJson); - MCutResult.Cut[] cutResultArr = doCutouts( cutArgsArr ); - mCutResult = doCompressCutFiles( cutResultArr ); - } - else - { - LOGGER.info("doMCutout over AMQP"); - String updatedJsonString = JdlMCutout.resolveAndUpdateJsonRequest(jdlJson, settings, subsurveys); - LOGGER.info("doMCutout over AMQP : " + updatedJsonString); - String outJson = doRpc( JdlMCutout.mcutoutToJson(updatedJsonString) ); - mCutResult = JdlMCutout.responseFromMCutoutJson(outJson); - } + LOGGER.info("doMCutout over AMQP"); + String updatedJsonString = JdlMCutout.resolveAndUpdateJsonRequest(jdlJson, settings, subsurveys); + LOGGER.info("doMCutout over AMQP : " + updatedJsonString); + String outJson = doRpc( JdlMCutout.mcutoutToJson(updatedJsonString) ); + mCutResult = JdlMCutout.responseFromMCutoutJson(outJson); return mCutResult; } - - /* ================= ALL ================================== */ - - - private MCutResult.Cut[] doCutouts(CutArgs[] cutArgsArr) - { - LOGGER.info("trace, count of cuts : " + String.valueOf( cutArgsArr.length ) ); - - List<MCutResult.Cut> cutResList = new ArrayList<MCutResult.Cut>(); - - int ix = 0; - for(CutArgs cutArgs: cutArgsArr) - { - MCutResult.Cut cut = doFileByIdWithErr(cutArgs.id, - cutArgs.pos, cutArgs.band, cutArgs.time, cutArgs.pol, cutArgs.pixels, - cutArgs.countNullValues, null);//cutArgs.extraCards); - - cut.index = ix++; - - LOGGER.info("cut" + String.valueOf(cut.index) + " : " + cut.content); - - cutResList.add(cut); - } - - return cutResList.toArray(new MCutResult.Cut[0]); - } - - - // FIXME implement similar for Merge: MCutResult = call-Montage-demosaic-sequence(cutResultArr) - private MCutResult doCompressCutFiles(MCutResult.Cut[] cutArr) - { - // FIXME do compression here - for(MCutResult.Cut cut : cutArr) - { - LOGGER.info("TBD compress cut-id"+ String.valueOf(cut.index) + " -> " + cut.content); - } - - MCutResult mCutResult = new MCutResult(); - mCutResult.cutResArr = cutArr; - mCutResult.fileName = "filename.tar.gz"; // FIXME do-zip-all-cuts(cutResultArr) - mCutResult.fileSize = 0; - - return mCutResult; - } - - - private MCutResult.Cut doFileByIdWithErr(String id, Pos pos, Band band, Time time, Pol pol, String pixels, - boolean countNullValues, Subsurvey[] subsurveys) - { - LOGGER.info("trace"); - - MCutResult mCutResult = new MCutResult(); - MCutResult.Cut cut = mCutResult.new Cut(/* FIXME eventually add here new Inputs(id, pos,..) */); - - try - { - CutResult cutResult = cutout.doFileById(id, - pos, band, time, pol, pixels, - countNullValues, subsurveys); - - cut.content = cutResult.fileName; - cut.contentType = MCutResult.Cut.ContentType.FILENAME; - } - catch(MultiValuedParamNotSupported ex) - { - cut.content = "MultiValuedParamNotSupported: " + ex.getMessage(); - cut.contentType = MCutResult.Cut.ContentType.BAD_REQUEST; - LOGGER.info(cut.content); - } - catch(IllegalArgumentException ex) - { - cut.content = "IllegalArgumentException: " + ex.getMessage(); - cut.contentType = MCutResult.Cut.ContentType.BAD_REQUEST; - LOGGER.info(cut.content); - } - catch(Exception ex) - { - cut.content = "Exception: " + ex.getMessage(); - cut.contentType = MCutResult.Cut.ContentType.SERVICE_ERROR; - LOGGER.info(cut.content); - ex.printStackTrace(); - } - - return cut; - } - - - private String doRpc(String InStr) { LOGGER.info("trace"); @@ -231,6 +129,7 @@ class DatasetsImpl implements Datasets } + /* ================= MERGE =============================== */ private String generateSubimgPathname(String relPathname, int hdunum) { @@ -254,7 +153,6 @@ class DatasetsImpl implements Datasets } - /* ================= MERGE =============================== */ private CutResult cutout( String publisherDid, Coord coord, @@ -397,7 +295,7 @@ class DatasetsImpl implements Datasets LOGGER.info("mergefiles_parallel()"); String[] responseCH = mergefiles_common_header(jobId, logfilename, prefix, filestomerge); - for(String sentence : responseCH) DatasetsImpl.LOGGER.info("responseCmnHdr: " + sentence); + for(String sentence : responseCH) DatasetsAmqp.LOGGER.info("responseCmnHdr: " + sentence); // check if response errored -> abort with 500: Internal Server Error & log details int threadsCount = filestomerge.length; @@ -429,7 +327,7 @@ class DatasetsImpl implements Datasets } - for(String sentence : reprojectArr[i].response) DatasetsImpl.LOGGER.info("response[" + String.valueOf(i) + "]: " + sentence); + for(String sentence : reprojectArr[i].response) DatasetsAmqp.LOGGER.info("response[" + String.valueOf(i) + "]: " + sentence); if(!isResponseOk(reprojectArr[i].response)) { ;// FIXME response incorrect -> abort merge-job, free resources diff --git a/data-access/servlet/src/main/java/cutout/DatasetsCli.java b/data-access/servlet/src/main/java/cutout/DatasetsCli.java new file mode 100644 index 0000000000000000000000000000000000000000..ffbe830e6b3ecfdfe067c34bdac4524d6e107b0a --- /dev/null +++ b/data-access/servlet/src/main/java/cutout/DatasetsCli.java @@ -0,0 +1,174 @@ + +import java.util.logging.Logger; +import java.util.logging.Level; +import java.util.List; +import java.util.ArrayList; +import java.util.Arrays; + +import java.time.Instant; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.nio.file.StandardOpenOption; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; + +import java.time.*;// Timestamp in cut-filename +import java.io.ByteArrayOutputStream; // for SODA direct streaming doSubimgStream + +import vo.parameter.*; + +class DatasetsCli implements Datasets +{ + static final Logger LOGGER = Logger.getLogger("DatasetsCli"); + + private Settings settings = null; + private Subsurvey[] subsurveys = null; + private Cutout cutout = null; + + public DatasetsCli() + { + LOGGER.info("trace DatasetsCli()"); + this.settings = Settings.getInstance(); + cutout = new CutoutImpl(settings, subsurveys); + } + + + public DatasetsCli(Settings settings) + { + LOGGER.info("trace DatasetsCli(settings)"); + this.settings = settings; + this.cutout = new CutoutImpl(settings, subsurveys); + } + + + public DatasetsCli(Settings settings, Subsurvey[] subsurveys) + { + LOGGER.info("trace DatasetsCli(settings, subsurveys)"); + this.settings = settings; + this.subsurveys = subsurveys; + this.cutout = new CutoutImpl(settings, subsurveys); + } + + + + + public CutResult doMerge(String[] idArr, Coord coord, boolean countNullValues) + throws FileNotFoundException, IOException + { + LOGGER.info("trace doMerge by CLI is NOT IMPLEMENTED (only by AMQP)"); + + return new CutResult(); + } + + + + public MCutResult doMCutout(String jdlJson) + throws IOException + { + LOGGER.info("trace"); + + MCutResult mCutResult; + + CutArgs[] cutArgsArr = CutArgs.parseCutArgsArr(jdlJson); + MCutResult.Cut[] cutResultArr = doCutouts( cutArgsArr ); + mCutResult = doCompressCutFiles( cutResultArr ); + + return mCutResult; + } + + + + private MCutResult.Cut[] doCutouts(CutArgs[] cutArgsArr) + { + LOGGER.info("trace, count of cuts : " + String.valueOf( cutArgsArr.length ) ); + + List<MCutResult.Cut> cutResList = new ArrayList<MCutResult.Cut>(); + + int ix = 0; + for(CutArgs cutArgs: cutArgsArr) + { + MCutResult.Cut cut = doFileByIdWithErr(cutArgs.id, + cutArgs.pos, cutArgs.band, cutArgs.time, cutArgs.pol, cutArgs.pixels, + cutArgs.countNullValues, null);//cutArgs.extraCards); + + cut.index = ix++; + + LOGGER.info("cut" + String.valueOf(cut.index) + " : " + cut.content); + + cutResList.add(cut); + } + + return cutResList.toArray(new MCutResult.Cut[0]); + } + + + // FIXME implement similar for Merge: MCutResult = call-Montage-demosaic-sequence(cutResultArr) + private MCutResult doCompressCutFiles(MCutResult.Cut[] cutArr) + { + // FIXME do compression here + for(MCutResult.Cut cut : cutArr) + { + LOGGER.info("TBD compress cut-id"+ String.valueOf(cut.index) + " -> " + cut.content); + } + + MCutResult mCutResult = new MCutResult(); + mCutResult.cutResArr = cutArr; + mCutResult.fileName = "filename.tar.gz"; // FIXME do-zip-all-cuts(cutResultArr) + mCutResult.fileSize = 0; + + return mCutResult; + } + + + private MCutResult.Cut doFileByIdWithErr(String id, Pos pos, Band band, Time time, Pol pol, String pixels, + boolean countNullValues, Subsurvey[] subsurveys) + { + LOGGER.info("trace"); + + MCutResult mCutResult = new MCutResult(); + MCutResult.Cut cut = mCutResult.new Cut(/* FIXME eventually add here new Inputs(id, pos,..) */); + + try + { + CutResult cutResult = cutout.doFileById(id, + pos, band, time, pol, pixels, + countNullValues, subsurveys); + + cut.content = cutResult.fileName; + cut.contentType = MCutResult.Cut.ContentType.FILENAME; + } + catch(MultiValuedParamNotSupported ex) + { + cut.content = "MultiValuedParamNotSupported: " + ex.getMessage(); + cut.contentType = MCutResult.Cut.ContentType.BAD_REQUEST; + LOGGER.info(cut.content); + } + catch(IllegalArgumentException ex) + { + cut.content = "IllegalArgumentException: " + ex.getMessage(); + cut.contentType = MCutResult.Cut.ContentType.BAD_REQUEST; + LOGGER.info(cut.content); + } + catch(Exception ex) + { + cut.content = "Exception: " + ex.getMessage(); + cut.contentType = MCutResult.Cut.ContentType.SERVICE_ERROR; + LOGGER.info(cut.content); + ex.printStackTrace(); + } + + return cut; + } + +} + diff --git a/data-access/servlet/src/main/java/cutout/Reproject.java b/data-access/servlet/src/main/java/cutout/Reproject.java index d2505d99559a4676648609330699189fb75b846d..03d757149922ff11a777c8255ebd58140170abb8 100644 --- a/data-access/servlet/src/main/java/cutout/Reproject.java +++ b/data-access/servlet/src/main/java/cutout/Reproject.java @@ -8,9 +8,9 @@ class Reproject implements Runnable String prefix; String fileName; String[] response; - DatasetsImpl datasets; + DatasetsAmqp datasets; - public Reproject(DatasetsImpl datasets, String id, String prefix, String fileName) + public Reproject(DatasetsAmqp datasets, String id, String prefix, String fileName) { this.datasets = datasets; this.id = id; @@ -23,9 +23,9 @@ class Reproject implements Runnable public void run() { String name = Thread.currentThread().getName(); - DatasetsImpl.LOGGER.info("Start of " + name); + DatasetsAmqp.LOGGER.info("Start of " + name); response = datasets.mergefiles_reproject(id, prefix, fileName); - DatasetsImpl.LOGGER.info("End of " + name); + DatasetsAmqp.LOGGER.info("End of " + name); } } diff --git a/data-access/servlet/src/main/java/webapi/ServletMCutout.java b/data-access/servlet/src/main/java/webapi/ServletMCutout.java index 9131f21e75030d90ec5aa485764a01dde4b93f0f..2cc1ea646a6120b5f792ce1cf85a687533731373 100644 --- a/data-access/servlet/src/main/java/webapi/ServletMCutout.java +++ b/data-access/servlet/src/main/java/webapi/ServletMCutout.java @@ -40,8 +40,7 @@ public class ServletMCutout extends javax.servlet.http.HttpServlet private static final Logger LOGGER = Logger.getLogger(ServletMCutout.class.getName()); private static final Settings settings = Settings.getInstance(); - protected Datasets datasets = new DatasetsImpl(settings); - + protected Datasets datasets = ( settings.amqpConn.isHostnameEmpty() ? new DatasetsCli(settings): new DatasetsAmqp(settings) ); public void init() throws ServletException diff --git a/data-access/servlet/src/main/java/webapi/ServletMerge.java b/data-access/servlet/src/main/java/webapi/ServletMerge.java index 2b34a4e59bc7f75d7b30ae8b2f0cfad3c28a086f..7b430faa553667a45a920f1d52507caf676793d5 100644 --- a/data-access/servlet/src/main/java/webapi/ServletMerge.java +++ b/data-access/servlet/src/main/java/webapi/ServletMerge.java @@ -47,8 +47,7 @@ public class ServletMerge extends javax.servlet.http.HttpServlet final String DEFAULT_SPEC_SYSTEM = settings.defaults.specSystem; - Datasets datasets = new DatasetsImpl(settings); - + protected Datasets datasets = ( settings.amqpConn.isHostnameEmpty() ? new DatasetsCli(settings): new DatasetsAmqp(settings) ); public void init() throws ServletException diff --git a/data-access/servlet/src/main/java/webapi/UWSMCutoutWork.java b/data-access/servlet/src/main/java/webapi/UWSMCutoutWork.java index 3c613df287139ef83512291b3facdb3dcc46dc4b..8a5de968068e5b5ad7b36fb4c274c1a78693375a 100644 --- a/data-access/servlet/src/main/java/webapi/UWSMCutoutWork.java +++ b/data-access/servlet/src/main/java/webapi/UWSMCutoutWork.java @@ -39,7 +39,7 @@ public class UWSMCutoutWork extends JobThread private Settings settings = UWSMCutout.settings; - protected Datasets datasets = new DatasetsImpl(settings); + protected Datasets datasets = ( settings.amqpConn.isHostnameEmpty() ? new DatasetsCli(settings): new DatasetsAmqp(settings) ); /* NOTE needed if cutouts dir served by vlkb-datasets */ private String webappRootRequestUrl = null; diff --git a/data-access/servlet/src/main/java/webapi/UWSMergeWork.java b/data-access/servlet/src/main/java/webapi/UWSMergeWork.java index 667d43695324850ffbc8f27e7420f9cec15d80eb..4883a6a9c7ef7578785dcd331bfc1591d4d1d12c 100644 --- a/data-access/servlet/src/main/java/webapi/UWSMergeWork.java +++ b/data-access/servlet/src/main/java/webapi/UWSMergeWork.java @@ -35,7 +35,7 @@ public class UWSMergeWork extends JobThread final String DEFAULT_SPEC_SYSTEM = settings.defaults.specSystem; final String DEFAULT_TIME_SYSTEM = "MJD_UTC"; - Datasets datasets = new DatasetsImpl(settings); + protected Datasets datasets = ( settings.amqpConn.isHostnameEmpty() ? new DatasetsCli(settings): new DatasetsAmqp(settings) ); /* NOTE needed if cutouts dir served by vlkb-datasets */ private String webappRootRequestUrl = null; diff --git a/data-access/servlet/src/main/java/webapi/UWSSodaWork.java b/data-access/servlet/src/main/java/webapi/UWSSodaWork.java index cc6c31a645426aef3c73a141b5ad83fa3eadef80..5195fa2eee8ae0db2951c3e8f785f9c0d73cd566 100644 --- a/data-access/servlet/src/main/java/webapi/UWSSodaWork.java +++ b/data-access/servlet/src/main/java/webapi/UWSSodaWork.java @@ -32,6 +32,7 @@ public class UWSSodaWork extends JobThread /* NOTE needed if cutouts dir served by vlkb-datasets */ private String webappRootRequestUrl = null; + protected Datasets datasets = ( settings.amqpConn.isHostnameEmpty() ? new DatasetsCli(settings): new DatasetsAmqp(settings) );$ public UWSSodaWork(UWSJob j) throws UWSException @@ -93,8 +94,6 @@ public class UWSSodaWork extends JobThread extraCards = Subsurvey.subsurveysFindCards(subsurveys, rsl.subsurveyId); } - Datasets datasets = new DatasetsImpl(settings); - final String DEFAULT_TIME_SYSTEM = "MJD_UTC"; // FIXME take from confif file if(pos != null) pos.setSystem(Pos.System.valueOf(DEFAULT_SKY_SYSTEM));