Something went wrong on our end
Select Git revision
-
Giovanni La Mura authoredGiovanni La Mura authored
RpcOverAmqp.java 4.43 KiB
// For each vlkb-request:
// establish a "connection" to the RabbitMQ broker (host:port) and open an auto-numbered "channel" on it
// (the user name is currently hard-coded, see the FIXME in doRpc),
// then, using this connection:channel:
// * create a reply queue with a server-generated name
// * start a consumer on this queue
// * generate a message with properties: corrId & reply-queue
// * publish the message to the default exchange ("") with the routingKey from config
//   (publishing to the pre-defined "amq.direct" exchange is currently commented out in the code)
// * wait on the reply queue until a delivery with the same corrId arrives
//
// It is the admin's responsibility to configure the routingKey in the Java client (see Settings)
// to the same value as the queue name used when starting vlkbd, to ensure delivery
// of vlkb-requests from the exchange to the correct queue.
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import java.util.UUID;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
 * Blocking request/reply (RPC) client on top of AMQP.
 *
 * <p>One instance serves one broker connection: it opens a connection and a channel,
 * declares a server-named reply queue, publishes a request carrying a correlation id and
 * the reply-queue name, and then blocks until a delivery with the same correlation id
 * arrives on the reply queue.
 *
 * <p>Typical use is the static {@link #doRpc(Settings.AmqpConn, String)} helper, which
 * creates an instance, performs a single call and closes the connection again.
 *
 * <p>Instances keep mutable connection state without any synchronization, so an instance
 * should not be shared between threads.
 */
public class RpcOverAmqp
{
    private static final Logger LOGGER = Logger.getLogger("RpcOverAmqp");

    // Affects how messages are consumed from the reply queue:
    // the broker removes a message right after delivery without waiting for confirmation.
    // Improves performance at the expense of reliability.
    private static final boolean NO_ACK = true;

    private final String userName;
    private final String password;
    private final String hostName;
    private final int portNumber;
    private final String routingKey;

    // Connection state; all null/0 until the connection has been set up successfully.
    private Connection connection;
    private Channel channel;
    private String replyQueueName;
    private QueueingConsumer consumer;
    private int channelNumber;

    /**
     * Performs one complete RPC: connect, send {@code InStr}, wait for the reply, disconnect.
     *
     * <p>Failures (connection setup, publish, wait) are logged and not propagated.
     *
     * @param amqpConn broker host, port and routing key to use
     * @param InStr    request payload; sent UTF-8 encoded
     * @return the reply payload decoded as UTF-8, or {@code null} if the call failed
     */
    public static String doRpc(Settings.AmqpConn amqpConn, String InStr)
    {
        // FIXME move these to Settings
        final String userName = "guest";
        final String password = "guest";

        RpcOverAmqp rpc = new RpcOverAmqp(
                userName, password,
                amqpConn.hostName(),
                amqpConn.portNumber(),
                amqpConn.routingKey());

        String OutStr = null;
        try
        {
            // Use the throwing variant so that a setup failure is reported once, with its
            // real cause, instead of surfacing later as a secondary error.
            rpc.openConnectionAndReplyQueue();

            LOGGER.info("Sent request : " + InStr);
            OutStr = rpc.callAndWaitReply(InStr);
            LOGGER.info("Got response : " + OutStr);
        }
        catch (InterruptedException e)
        {
            // Restore the interrupt flag so the calling thread can still observe it.
            Thread.currentThread().interrupt();
            LOGGER.log(Level.WARNING, "Interrupted while waiting for RPC reply", e);
        }
        catch (Exception e)
        {
            LOGGER.log(Level.SEVERE, "RPC over AMQP failed", e);
        }
        finally
        {
            rpc.closeQuietly();
        }
        return OutStr;
    }

    /**
     * Stores the connection parameters; no network activity happens here.
     *
     * @param userName   broker user name
     * @param password   broker password
     * @param hostName   broker host
     * @param portNumber broker port
     * @param routingKey routing key used when publishing requests
     */
    RpcOverAmqp(String userName, String password, String hostName, int portNumber, String routingKey)
    {
        this.userName = userName;
        this.password = password;
        this.hostName = hostName;
        this.portNumber = portNumber;
        this.routingKey = routingKey;
    }

    /**
     * Opens the connection and channel, declares the reply queue and starts consuming from it.
     *
     * <p>This method does not throw: on failure the error is logged, anything that was
     * already opened is closed again, and the instance is left unconnected. A following
     * {@link #callAndWaitReply(String)} then fails with {@link IllegalStateException}.
     */
    public void initConnectionAndReplyQueue()
    {
        try
        {
            openConnectionAndReplyQueue();
        }
        catch (Exception e)
        {
            LOGGER.log(Level.SEVERE,
                    "Could not set up AMQP connection and reply queue on " + hostName + ":" + portNumber, e);
            // Do not leak a connection that was opened before a later step failed.
            closeQuietly();
        }
    }

    /**
     * Sends {@code message} and blocks until the matching reply arrives.
     *
     * <p>Deliveries whose correlation id is missing or different are skipped. There is no
     * timeout: the call returns only when a matching reply arrives or the wait fails with
     * an exception.
     *
     * @param message request payload; sent UTF-8 encoded
     * @return the reply payload decoded as UTF-8
     * @throws IllegalStateException if the connection has not been set up successfully
     * @throws Exception             if publishing or waiting for the delivery fails
     */
    public String callAndWaitReply(String message) throws Exception
    {
        if (channel == null || consumer == null)
        {
            throw new IllegalStateException(
                    "AMQP connection is not initialized; initConnectionAndReplyQueue() failed or was not called");
        }

        final String corrId = UUID.randomUUID().toString();

        BasicProperties props = new BasicProperties
                .Builder()
                .correlationId(corrId)
                .replyTo(replyQueueName)
                .build();

        // Send rpc params and where to reply (reply-queue & corrId).
        // Published to the default exchange (""); the routing key selects the target queue.
        channel.basicPublish("", routingKey, props, message.getBytes("UTF-8"));

        // Wait for reply messages and return the first one whose corrId matches.
        while (true)
        {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String receivedCorrId = delivery.getProperties().getCorrelationId();

            LOGGER.fine("CorrId sent[" + channelNumber + "]: " + corrId
                    + "\nCorrId recv: " + receivedCorrId
                    + "\nreplyQueueName: " + replyQueueName);

            // Compare from the known non-null side: a reply without a correlation id
            // is skipped instead of causing a NullPointerException.
            if (corrId.equals(receivedCorrId))
            {
                return new String(delivery.getBody(), "UTF-8");
            }
        }
    }

    /**
     * Closes the broker connection. Does nothing if no connection is open, so it is safe
     * to call after a failed setup or more than once.
     *
     * @throws Exception if closing the connection fails
     */
    public void close() throws Exception
    {
        if (connection == null)
        {
            return;
        }
        try
        {
            connection.close();
        }
        finally
        {
            // Forget the state even if close() failed so the instance is clearly unusable.
            connection = null;
            channel = null;
            consumer = null;
            replyQueueName = null;
        }
    }

    /** Does the actual connection setup and lets any failure propagate to the caller. */
    private void openConnectionAndReplyQueue() throws Exception
    {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(hostName);
        factory.setPort(portNumber);
        factory.setUsername(userName);
        factory.setPassword(password);

        connection = factory.newConnection();
        channel = connection.createChannel();
        channelNumber = channel.getChannelNumber();

        // No-argument queueDeclare(): the queue name is generated by the broker.
        replyQueueName = channel.queueDeclare().getQueue();

        consumer = new QueueingConsumer(channel);
        // Start a non-nolocal, non-exclusive consumer, with a server-generated consumerTag.
        channel.basicConsume(replyQueueName, NO_ACK, consumer);
    }

    /** Closes the connection, logging instead of propagating any failure. */
    private void closeQuietly()
    {
        try
        {
            close();
        }
        catch (Exception ignored)
        {
            // Best effort: nothing useful can be done if closing fails.
            LOGGER.info("ignoring exception on rpc.close():" + ignored.getMessage());
        }
    }
}