// At each vlkb-request: // establish "connection" to RabbitMQ-broker (host:port) on autogenerated "channel" as user ??? // then using this connection:channel do: // * create a reply queue with autogenerated name // * start a consumer on this queue // generate a message with properties: corrId & reply-queue // * publish the message to the pre-defined "amq.direct" exchange with routingKey from config // * start waiting on reply-queue for next delivery // // It is admins responsibility to configure routingKey in Java-client (see Settings) to the // same value as queuename used starting vlkbd to ensure delivery // of vlkb-requests from Exchange to the correct queue import java.util.logging.Logger; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.AMQP.BasicProperties; import java.util.UUID; public class RpcOverAmqp { private static final Logger LOGGER = Logger.getLogger("RpcOverAmqp"); private final boolean NO_ACK = true; // affects message consume from queue: // broker will remove msg right after delivery without waiting for confirmation // improves performance on expense of reliability: private String userName = "guest"; private String password = "guest"; private String hostName; private int portNumber; private String routingKey; private Connection connection; private Channel channel; private String replyQueueName; private QueueingConsumer consumer; private int channelNumber; public static String doRpc(Settings.AmqpConn amqpConn, String InStr) { final String userName = "guest"; final String password = "guest"; // FIXME move these to Settings RpcOverAmqp rpc = new RpcOverAmqp( userName, password, amqpConn.hostName(), amqpConn.portNumber(), amqpConn.routingKey()); rpc.initConnectionAndReplyQueue(); String OutStr = null; try { LOGGER.info("Sent request : " + InStr); OutStr = rpc.callAndWaitReply(InStr); LOGGER.info("Got response : " + OutStr); } catch (Exception e) { e.printStackTrace(); } finally { try { rpc.close(); } catch (Exception ignore) { LOGGER.info("ignoring exception on rpc.close():" + ignore.getMessage()); } } return OutStr; } RpcOverAmqp(String userName, String password, String hostName, int portNumber, String routingKey) { this.userName = userName; this.password = password; this.hostName = hostName; this.portNumber = portNumber; this.routingKey = routingKey; } public void initConnectionAndReplyQueue() { try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(hostName); factory.setPort(portNumber); factory.setUsername(userName); factory.setPassword(password); connection = factory.newConnection(); channel = connection.createChannel(); channelNumber = channel.getChannelNumber(); replyQueueName = channel.queueDeclare().getQueue(); consumer = new QueueingConsumer(channel); // Start a non-nolocal, non-exclusive consumer, with a server-generated consumerTag. channel.basicConsume(replyQueueName, NO_ACK, consumer); } catch(Exception e) { e.printStackTrace(); } } public String callAndWaitReply(String message) throws Exception { String response = null; String corrId = UUID.randomUUID().toString(); BasicProperties props = new BasicProperties .Builder() .correlationId(corrId) .replyTo(replyQueueName) .build(); // send rpc params and where to reply (reply-queue & corrId) channel.basicPublish("", routingKey, props, message.getBytes("UTF-8")); //channel.basicPublish("amq.direct", routingKey, props, message.getBytes("UTF-8")); // wait for reply msg and return if corrId matches while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); System.out.println("CorrId sent[" + channelNumber + "]: " + delivery.getProperties().getCorrelationId() + "\nCorrId recv: " + corrId + "\nreplyQueueName: " + replyQueueName); if (delivery.getProperties().getCorrelationId().equals(corrId)) { response = new String(delivery.getBody(),"UTF-8"); break; } } return response; } public void close() throws Exception { connection.close(); } }