Skip to content
Snippets Groups Projects
Commit 5c6fcfc1 authored by Robert Butora's avatar Robert Butora
Browse files

moves doRpc to RpcOverAmqp (and renames Settings.AMQPConn -> Settings AmqpConn)

parent cb30b206
No related branches found
No related tags found
No related merge requests found
...@@ -379,7 +379,7 @@ class CutoutImpl implements Cutout ...@@ -379,7 +379,7 @@ class CutoutImpl implements Cutout
jReq.add(countNullValues); jReq.add(countNullValues);
jReq.add(extraCards); jReq.add(extraCards);
String outJson = doRpc( jReq.toString() ); String outJson = RpcOverAmqp.doRpc( settings.amqpConn, jReq.toString() );
cutResult = JsonDecoder.responseFromCutoutJson( outJson ); cutResult = JsonDecoder.responseFromCutoutJson( outJson );
} }
...@@ -445,49 +445,6 @@ class CutoutImpl implements Cutout ...@@ -445,49 +445,6 @@ class CutoutImpl implements Cutout
} }
private String doRpc(String InStr)
{
final String userName = "guest";
final String password = "guest";
// FIXME move these to Settings
RpcOverAmqp rpc = new RpcOverAmqp(
userName, password,
settings.amqpConn.hostName(),
settings.amqpConn.portNumber(),
settings.amqpConn.routingKey());
rpc.initConnectionAndReplyQueue();
String OutStr = null;
try
{
LOGGER.info("Sent request : " + InStr);
OutStr = rpc.callAndWaitReply(InStr);
LOGGER.info("Got response : " + OutStr);
}
catch (Exception e)
{
e.printStackTrace();
}
finally
{
try
{
rpc.close();
}
catch (Exception ignore)
{
LOGGER.info("ignoring exception on rpc.close():" + ignore.getMessage());
}
}
return OutStr;
}
private String generateSubimgPathname(String relPathname, int hdunum) private String generateSubimgPathname(String relPathname, int hdunum)
{ {
String cutfitsname = "vlkb-cutout"; String cutfitsname = "vlkb-cutout";
......
...@@ -78,57 +78,13 @@ class DatasetsAmqp implements Datasets ...@@ -78,57 +78,13 @@ class DatasetsAmqp implements Datasets
LOGGER.info("doMCutout over AMQP"); LOGGER.info("doMCutout over AMQP");
String updatedJsonString = JdlMCutout.resolveAndUpdateJsonRequest(jdlJson, settings, subsurveys); String updatedJsonString = JdlMCutout.resolveAndUpdateJsonRequest(jdlJson, settings, subsurveys);
LOGGER.info("doMCutout over AMQP : " + updatedJsonString); LOGGER.info("doMCutout over AMQP : " + updatedJsonString);
String outJson = doRpc( JdlMCutout.mcutoutToJson(updatedJsonString) ); String outJson = RpcOverAmqp.doRpc(settings.amqpConn, JdlMCutout.mcutoutToJson(updatedJsonString) );
mCutResult = JdlMCutout.responseFromMCutoutJson(outJson); mCutResult = JdlMCutout.responseFromMCutoutJson(outJson);
return mCutResult; return mCutResult;
} }
private String doRpc(String InStr)
{
LOGGER.info("trace");
final String userName = "guest";
final String password = "guest";
// FIXME move these to Settings
RpcOverAmqp rpc = new RpcOverAmqp(
userName, password,
settings.amqpConn.hostName(),
settings.amqpConn.portNumber(),
settings.amqpConn.routingKey());
rpc.initConnectionAndReplyQueue();
String OutStr = null;
try
{
LOGGER.info("Sent request : " + InStr);
OutStr = rpc.callAndWaitReply(InStr);
LOGGER.info("Got response : " + OutStr);
}
catch (Exception e)
{
e.printStackTrace();
}
finally
{
try
{
rpc.close();
}
catch (Exception ignore)
{
LOGGER.info("ignoring exception on rpc.close():" + ignore.getMessage());
}
}
return OutStr;
}
/* ================= MERGE =============================== */ /* ================= MERGE =============================== */
private String generateSubimgPathname(String relPathname, int hdunum) private String generateSubimgPathname(String relPathname, int hdunum)
...@@ -184,7 +140,7 @@ class DatasetsAmqp implements Datasets ...@@ -184,7 +140,7 @@ class DatasetsAmqp implements Datasets
String inJson = jReq.toString(); String inJson = jReq.toString();
return JsonDecoder.responseFromCutoutJson( doRpc(inJson) ); return JsonDecoder.responseFromCutoutJson( RpcOverAmqp.doRpc(settings.amqpConn, inJson) );
} }
...@@ -278,7 +234,7 @@ class DatasetsAmqp implements Datasets ...@@ -278,7 +234,7 @@ class DatasetsAmqp implements Datasets
LOGGER.info("trace"); LOGGER.info("trace");
String InJson = JsonEncoderMerge.mergefilesToJson( prefix, filestomerge); String InJson = JsonEncoderMerge.mergefilesToJson( prefix, filestomerge);
String OutJson = doRpc(InJson); String OutJson = RpcOverAmqp.doRpc(settings.amqpConn, InJson);
return JsonDecoder.responseFromCutoutJson( OutJson ); return JsonDecoder.responseFromCutoutJson( OutJson );
} }
...@@ -384,7 +340,7 @@ class DatasetsAmqp implements Datasets ...@@ -384,7 +340,7 @@ class DatasetsAmqp implements Datasets
LOGGER.info("trace"); LOGGER.info("trace");
String InJson = JsonEncoderMerge.mergefilesCommonHeaderToJson(jobId, prefix, filestomerge); String InJson = JsonEncoderMerge.mergefilesCommonHeaderToJson(jobId, prefix, filestomerge);
String OutJson = doRpc(InJson); String OutJson = RpcOverAmqp.doRpc(settings.amqpConn, InJson);
String[] results = null; // FIXME JsonDecoder.responseFromJson(OutJson); String[] results = null; // FIXME JsonDecoder.responseFromJson(OutJson);
return results; return results;
...@@ -399,7 +355,7 @@ class DatasetsAmqp implements Datasets ...@@ -399,7 +355,7 @@ class DatasetsAmqp implements Datasets
LOGGER.info("trace"); LOGGER.info("trace");
String InJson = JsonEncoderMerge.mergefilesReprojectToJson(jobId, prefix, fitsfilename); String InJson = JsonEncoderMerge.mergefilesReprojectToJson(jobId, prefix, fitsfilename);
String OutJson = doRpc(InJson); String OutJson = RpcOverAmqp.doRpc(settings.amqpConn, InJson);
String[] results = null; // FIXME JsonDecoder.responseFromJson(OutJson); String[] results = null; // FIXME JsonDecoder.responseFromJson(OutJson);
return results; return results;
...@@ -413,7 +369,7 @@ class DatasetsAmqp implements Datasets ...@@ -413,7 +369,7 @@ class DatasetsAmqp implements Datasets
LOGGER.info("trace"); LOGGER.info("trace");
String InJson = JsonEncoderMerge.mergefilesAddReprojectedToJson(jobId, prefix); String InJson = JsonEncoderMerge.mergefilesAddReprojectedToJson(jobId, prefix);
String OutJson = doRpc(InJson); String OutJson = RpcOverAmqp.doRpc(settings.amqpConn, InJson);
String[] results = null; // FIXME JsonDecoder.responseFromJson(OutJson); String[] results = null; // FIXME JsonDecoder.responseFromJson(OutJson);
return results; return results;
......
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
// of vlkb-requests from Exchange to the correct queue // of vlkb-requests from Exchange to the correct queue
import java.util.logging.Logger;
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection; import com.rabbitmq.client.Connection;
...@@ -24,6 +25,8 @@ import java.util.UUID; ...@@ -24,6 +25,8 @@ import java.util.UUID;
public class RpcOverAmqp public class RpcOverAmqp
{ {
private static final Logger LOGGER = Logger.getLogger("RpcOverAmqp");
private final boolean NO_ACK = true; private final boolean NO_ACK = true;
// affects message consume from queue: // affects message consume from queue:
// broker will remove msg right after delivery without waiting for confirmation // broker will remove msg right after delivery without waiting for confirmation
...@@ -43,6 +46,47 @@ public class RpcOverAmqp ...@@ -43,6 +46,47 @@ public class RpcOverAmqp
private int channelNumber; private int channelNumber;
public static String doRpc(Settings.AmqpConn amqpConn, String InStr)
{
final String userName = "guest";
final String password = "guest";
// FIXME move these to Settings
RpcOverAmqp rpc = new RpcOverAmqp(
userName, password,
amqpConn.hostName(),
amqpConn.portNumber(),
amqpConn.routingKey());
rpc.initConnectionAndReplyQueue();
String OutStr = null;
try
{
LOGGER.info("Sent request : " + InStr);
OutStr = rpc.callAndWaitReply(InStr);
LOGGER.info("Got response : " + OutStr);
}
catch (Exception e)
{
e.printStackTrace();
}
finally
{
try
{
rpc.close();
}
catch (Exception ignore)
{
LOGGER.info("ignoring exception on rpc.close():" + ignore.getMessage());
}
}
return OutStr;
}
RpcOverAmqp(String userName, String password, String hostName, int portNumber, String routingKey) RpcOverAmqp(String userName, String password, String hostName, int portNumber, String routingKey)
{ {
this.userName = userName; this.userName = userName;
......
...@@ -66,7 +66,7 @@ class Settings ...@@ -66,7 +66,7 @@ class Settings
} }
public static class AMQPConn public static class AmqpConn
{ {
private String hostName; private String hostName;
private int portNum; private int portNum;
...@@ -89,7 +89,7 @@ class Settings ...@@ -89,7 +89,7 @@ class Settings
public FITSPaths fitsPaths; public FITSPaths fitsPaths;
public DBConn dbConn; public DBConn dbConn;
public AMQPConn amqpConn; public AmqpConn amqpConn;
public DefaultParamValues defaults; public DefaultParamValues defaults;
...@@ -109,7 +109,7 @@ class Settings ...@@ -109,7 +109,7 @@ class Settings
FITSPaths fitsPaths = loadFITSPaths(properties); FITSPaths fitsPaths = loadFITSPaths(properties);
DBConn dbConn = loadDBConn(properties); DBConn dbConn = loadDBConn(properties);
AMQPConn amqpConn = loadAMQPConn(properties); AmqpConn amqpConn = loadAmqpConn(properties);
DefaultParamValues defaults = loadDefaults(properties); DefaultParamValues defaults = loadDefaults(properties);
return new Settings(dbConn, amqpConn, fitsPaths, defaults); return new Settings(dbConn, amqpConn, fitsPaths, defaults);
...@@ -128,7 +128,7 @@ class Settings ...@@ -128,7 +128,7 @@ class Settings
private Settings(DBConn dbConn, AMQPConn amqpConn, private Settings(DBConn dbConn, AmqpConn amqpConn,
FITSPaths fitsPaths, DefaultParamValues defaults) FITSPaths fitsPaths, DefaultParamValues defaults)
{ {
this.fitsPaths = fitsPaths; this.fitsPaths = fitsPaths;
...@@ -169,9 +169,9 @@ class Settings ...@@ -169,9 +169,9 @@ class Settings
return defaults; return defaults;
} }
private static AMQPConn loadAMQPConn(Properties properties) private static AmqpConn loadAmqpConn(Properties properties)
{ {
AMQPConn amqpconn = new AMQPConn(); AmqpConn amqpconn = new AmqpConn();
amqpconn.hostName = properties.getProperty("amqp_host_name", "").strip(); amqpconn.hostName = properties.getProperty("amqp_host_name", "").strip();
String strPortNum = properties.getProperty("amqp_port", "5672").strip(); String strPortNum = properties.getProperty("amqp_port", "5672").strip();
amqpconn.portNum = Integer.parseInt(strPortNum); amqpconn.portNum = Integer.parseInt(strPortNum);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment