Skip to content
Snippets Groups Projects
Select Git revision
  • 287269a571817b7406e7d2dab01ae7b7fb6c996b
  • master default protected
  • parallel_trapping
  • offload_trapping
  • script_devel
  • unify_iterations
  • containers-m10
  • magma_refinement
  • release9
  • enable_svd
  • parallel_angles_gmu
  • containers-m8
  • parallel_angles
  • profile_omp_leonardo
  • test_nvidia_profiler
  • containers
  • shaditest
  • test1
  • main
  • 3-error-in-run-the-program
  • experiment
  • NP_TMcode-M10a.03
  • NP_TMcode-M10a.02
  • NP_TMcode-M10a.01
  • NP_TMcode-M10a.00
  • NP_TMcode-M9.01
  • NP_TMcode-M9.00
  • NP_TMcode-M8.03
  • NP_TMcode-M8.02
  • NP_TMcode-M8.01
  • NP_TMcode-M8.00
  • NP_TMcode-M7.00
  • v0.0
33 results

cfrfme.cpp

Blame
  • RpcOverAmqp.java 4.43 KiB
    
    // At each vlkb-request:
    // establish "connection" to RabbitMQ-broker (host:port) on autogenerated "channel" as user ???
    // then using this connection:channel do:
    // * create a reply queue with autogenerated name
    // * start a consumer on this queue
    // generate a message with properties: corrId & reply-queue
    // * publish the message to the pre-defined "amq.direct" exchange with routingKey from config
    // * start waiting on reply-queue for next delivery
    //
    // It is admins responsibility to configure routingKey in Java-client (see Settings) to the
    // same value as queuename used starting vlkbd to ensure  delivery
    // of vlkb-requests from Exchange to the correct queue
    
    
    import java.util.logging.Logger;
    
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.QueueingConsumer;
    import com.rabbitmq.client.AMQP.BasicProperties;
    import java.util.UUID;
    
    
    public class RpcOverAmqp
    {
     private static final Logger LOGGER = Logger.getLogger("RpcOverAmqp");
    
    	private final boolean NO_ACK = true;
    	// affects message consume from queue:
    	// broker will remove msg right after delivery without waiting for confirmation
    	// improves performance on expense of reliability:
    
    	private String userName = "guest";
    	private String password = "guest";
    	private String hostName;
    	private int portNumber;
    	private String routingKey;
    
    	private Connection connection;
    	private Channel channel;
    	private String replyQueueName;
    	private QueueingConsumer consumer;
    
     private int channelNumber;
    
    
     public static String doRpc(Settings.AmqpConn  amqpConn, String InStr)
       {
          final String userName = "guest";
          final String password = "guest";
          // FIXME move these to Settings
    
          RpcOverAmqp rpc = new RpcOverAmqp(
                userName, password,
                amqpConn.hostName(),
                amqpConn.portNumber(),
                amqpConn.routingKey());
    
          rpc.initConnectionAndReplyQueue();
    
          String OutStr = null;
    
          try
          {
             LOGGER.info("Sent request : " + InStr);
             OutStr = rpc.callAndWaitReply(InStr);
             LOGGER.info("Got response : " + OutStr);
          }
          catch  (Exception e)
          {
             e.printStackTrace();
          }
          finally
          {
             try
             {
                rpc.close();
             }
             catch (Exception ignore)
             {
                LOGGER.info("ignoring exception on rpc.close():" + ignore.getMessage());
             }
          }
    
          return OutStr;
       }
    
    	RpcOverAmqp(String userName, String password, String hostName, int portNumber, String routingKey)
    	{
    		this.userName = userName;
    		this.password = password;
    		this.hostName = hostName;
    		this.portNumber = portNumber;
    		this.routingKey = routingKey;
    	}
    
    
    
    	public void initConnectionAndReplyQueue()
    	{
    		try
    		{
    			ConnectionFactory factory = new ConnectionFactory();
    			factory.setHost(hostName);
    			factory.setPort(portNumber);
    
          factory.setUsername(userName);
          factory.setPassword(password);
    
          connection = factory.newConnection();
    			channel = connection.createChannel();
    
          channelNumber = channel.getChannelNumber();
    
    			replyQueueName = channel.queueDeclare().getQueue();
    			consumer = new QueueingConsumer(channel);
    
    			// Start a non-nolocal, non-exclusive consumer, with a server-generated consumerTag.
    			channel.basicConsume(replyQueueName, NO_ACK, consumer);
    		}
    		catch(Exception e)
    		{
    			e.printStackTrace();
    		}
    	}
    
    
    
    	public String callAndWaitReply(String message) throws Exception {
    		String response = null;
    		String corrId = UUID.randomUUID().toString();
    
    		BasicProperties props = new BasicProperties
    			.Builder()
    			.correlationId(corrId)
    			.replyTo(replyQueueName)
    			.build();
    
    		// send rpc params and where to reply (reply-queue & corrId)
    
    		channel.basicPublish("", routingKey, props, message.getBytes("UTF-8"));
    		//channel.basicPublish("amq.direct", routingKey, props, message.getBytes("UTF-8"));
    
    		// wait for reply msg and return if corrId matches
    
    		while (true)
    		{
    
    			QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    
    			System.out.println("CorrId sent[" + channelNumber + "]: "  + delivery.getProperties().getCorrelationId()
    					+ "\nCorrId recv: " + corrId
    					+ "\nreplyQueueName: " +  replyQueueName);
    
    			if (delivery.getProperties().getCorrelationId().equals(corrId))
    			{
    				response = new String(delivery.getBody(),"UTF-8");
    				break;
    			}
    		}
    
    		return response;
    	}
    
    
    
    	public void close() throws Exception
    	{
    		connection.close();
    	}
    
    }