// At each vlkb-request:
// establish a connection to the RabbitMQ broker (host:port) as user "guest" (currently hard-coded) and open a channel with an auto-assigned channel number
// then using this connection:channel do:
// * create a reply queue with autogenerated name
// * start a consumer on this queue
// generate a message with properties: corrId & reply-queue
// * publish the message to the default ("") exchange with routingKey from config
// * start waiting on reply-queue for next delivery
//
// It is the admin's responsibility to configure routingKey in the Java client (see Settings)
// to the same value as the queue name vlkbd was started with, to ensure delivery
// of vlkb-requests from the exchange to the correct queue


import java.util.UUID;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;


/**
 * Blocking request/reply (RPC) client on top of a RabbitMQ connection.
 *
 * <p>Typical use is the one-shot static helper {@link #doRpc}: it opens a
 * connection, declares a server-named reply queue, publishes the request to
 * the default exchange using the configured routing key, waits for the reply
 * carrying the matching correlation id, and closes the connection again.
 *
 * <p>The instance methods can also be used directly:
 * {@link #initConnectionAndReplyQueue()}, then one or more
 * {@link #callAndWaitReply(String)}, then {@link #close()}.
 */
public class RpcOverAmqp
{
   private static final Logger LOGGER = Logger.getLogger("RpcOverAmqp");

   // Affects message consumption from the reply queue:
   // the broker removes a message right after delivery without waiting for
   // confirmation. Improves performance at the expense of reliability.
   private static final boolean NO_ACK = true;

   // Connection parameters, fixed at construction time.
   private final String userName;
   private final String password;
   private final String hostName;
   private final int portNumber;
   private final String routingKey;

   // Connection state; all null until initConnectionAndReplyQueue() succeeds
   // and null again after close().
   private Connection connection;
   private Channel channel;
   private String replyQueueName;
   private QueueingConsumer consumer;

   // Channel number of the open channel; used only in diagnostic output.
   private int channelNumber;


   /**
    * Performs one complete RPC round trip: connect, send, wait for the reply,
    * disconnect.
    *
    * @param amqpConn broker host, port and routing key to use
    * @param InStr    request payload
    * @return the reply payload, or {@code null} if the call failed for any
    *         reason (the failure is logged, not thrown)
    */
   public static String doRpc(Settings.AmqpConn  amqpConn, String InStr)
   {
      final String userName = "guest";
      final String password = "guest";
      // FIXME move these to Settings

      RpcOverAmqp rpc = new RpcOverAmqp(
            userName, password,
            amqpConn.hostName(),
            amqpConn.portNumber(),
            amqpConn.routingKey());

      String OutStr = null;

      try
      {
         rpc.initConnectionAndReplyQueue();

         LOGGER.info("Sent request : " + InStr);
         OutStr = rpc.callAndWaitReply(InStr);
         LOGGER.info("Got response : " + OutStr);
      }
      catch (InterruptedException e)
      {
         // Restore the interrupt flag so callers further up can react to it.
         Thread.currentThread().interrupt();
         LOGGER.log(Level.WARNING, "interrupted while waiting for RPC reply", e);
      }
      catch (Exception e)
      {
         LOGGER.log(Level.SEVERE, "RPC call failed", e);
      }
      finally
      {
         rpc.closeQuietly();
      }

      return OutStr;
   }


   /**
    * Stores the connection parameters; no network activity happens here.
    *
    * @param userName   broker login
    * @param password   broker password
    * @param hostName   broker host
    * @param portNumber broker port
    * @param routingKey routing key the requests are published with
    */
   RpcOverAmqp(String userName, String password, String hostName, int portNumber, String routingKey)
   {
      this.userName = userName;
      this.password = password;
      this.hostName = hostName;
      this.portNumber = portNumber;
      this.routingKey = routingKey;
   }


   /**
    * Opens the connection and channel, declares a server-named reply queue
    * and starts consuming from it.
    *
    * <p>Failures are logged rather than thrown. On failure any partially
    * opened connection is closed and the instance is left uninitialised, so a
    * subsequent {@link #callAndWaitReply(String)} fails fast instead of
    * blocking on a queue that is not being consumed.
    */
   public void initConnectionAndReplyQueue()
   {
      try
      {
         ConnectionFactory factory = new ConnectionFactory();
         factory.setHost(hostName);
         factory.setPort(portNumber);
         factory.setUsername(userName);
         factory.setPassword(password);

         connection = factory.newConnection();
         channel = connection.createChannel();
         channelNumber = channel.getChannelNumber();

         replyQueueName = channel.queueDeclare().getQueue();
         QueueingConsumer replyConsumer = new QueueingConsumer(channel);

         // Start a non-nolocal, non-exclusive consumer, with a server-generated consumerTag.
         channel.basicConsume(replyQueueName, NO_ACK, replyConsumer);

         // Publish the consumer only once it is actually consuming.
         consumer = replyConsumer;
      }
      catch (Exception e)
      {
         LOGGER.log(Level.SEVERE,
               "failed to initialise AMQP connection to " + hostName + ":" + portNumber, e);
         closeQuietly();
      }
   }


   /**
    * Publishes {@code message} to the default exchange with the configured
    * routing key and blocks until a reply with the matching correlation id
    * arrives on the reply queue. Deliveries with a different (or missing)
    * correlation id are skipped.
    *
    * @param message request payload, sent UTF-8 encoded
    * @return the reply body decoded as UTF-8
    * @throws IllegalStateException if the connection has not been
    *         successfully initialised
    * @throws Exception on publish failure, connection shutdown or interruption
    *         while waiting
    */
   public String callAndWaitReply(String message) throws Exception
   {
      if (channel == null || consumer == null)
      {
         throw new IllegalStateException(
               "AMQP connection is not initialised (initConnectionAndReplyQueue() missing or failed)");
      }

      String corrId = UUID.randomUUID().toString();

      // Tell the server where to reply (reply queue) and how to tag the reply (corrId).
      BasicProperties props = new BasicProperties
         .Builder()
         .correlationId(corrId)
         .replyTo(replyQueueName)
         .build();

      channel.basicPublish("", routingKey, props, message.getBytes("UTF-8"));

      while (true)
      {
         QueueingConsumer.Delivery delivery = consumer.nextDelivery();
         String receivedCorrId = delivery.getProperties().getCorrelationId();

         LOGGER.fine("CorrId sent[" + channelNumber + "]: " + corrId
               + "\nCorrId recv: " + receivedCorrId
               + "\nreplyQueueName: " + replyQueueName);

         // Compare from the known non-null side: a delivery without a
         // correlation id must be skipped, not crash the wait loop.
         if (corrId.equals(receivedCorrId))
         {
            return new String(delivery.getBody(), "UTF-8");
         }
      }
   }


   /**
    * Closes the connection if one is open and resets the connection state.
    * Calling it on a never-opened or already-closed instance is a no-op.
    *
    * @throws Exception if closing the underlying connection fails
    */
   public void close() throws Exception
   {
      Connection toClose = connection;

      // Reset state first so the instance is consistent even if close() throws.
      connection = null;
      channel = null;
      consumer = null;
      replyQueueName = null;

      if (toClose != null)
      {
         toClose.close();
      }
   }


   // Best-effort close: a failure while closing is logged and otherwise ignored.
   private void closeQuietly()
   {
      try
      {
         close();
      }
      catch (Exception e)
      {
         LOGGER.info("ignoring exception on rpc.close():" + e.getMessage());
      }
   }

}