Статьи

Клиент для ActiveMQ

Этот пост объясняет темы в Active MQ (Message Broker) с подпиской и публикацией. Для этого мы напишем два Java-клиента. Как мы это сделали для wso2 Message Broker

  • TopicSubscriber.java Подписаться на сообщения
  • TopicPublisher.java для публикации сообщений

Давайте начнем.

[1] Получить Active MQ с
http://activemq.apache.org/download.html

[1.1] Запустить Active MQ с \ bin \ activemq.bat.

Вы можете увидеть форму запущенного сервера
http: // localhost: 8161 / admin /

[2] Создайте Porject «Client» в IDE, который вы предпочли

[3] Добавьте activemq-all-5.7.0.jar в lib Dir в проекте (activemq-all-5.7.0.jar можно найти в корневой папке)

[ 4] Создал класс «TopicSubscriber.java», чтобы подписаться на сообщения

package simple;
 
import java.util.Properties;
 
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.naming.InitialContext;
import javax.naming.NamingException;
 
public class TopicSubscriber {
 
 private String topicName = "news.sport";
 private String initialContextFactory = ""
 +"org.apache.activemq.jndi.ActiveMQInitialContextFactory";
 private String connectionString = "tcp://"
 +"localhost:61616";
  
 private boolean messageReceived = false;
 
 public static void main(String[] args) {
  TopicSubscriber subscriber = new TopicSubscriber();
  subscriber.subscribeWithTopicLookup();
 }
 
 public void subscribeWithTopicLookup() {
 
  Properties properties = new Properties();
  TopicConnection topicConnection = null;
  properties.put("java.naming.factory.initial", initialContextFactory);
  properties.put("connectionfactory.QueueConnectionFactory",
    connectionString);
  properties.put("topic." + topicName, topicName);
  try {
   InitialContext ctx = new InitialContext(properties);
   TopicConnectionFactory topicConnectionFactory = (TopicConnectionFactory) ctx
     .lookup("QueueConnectionFactory");
   topicConnection = topicConnectionFactory.createTopicConnection();
   System.out
     .println("Create Topic Connection for Topic " + topicName);
 
   while (!messageReceived) {
    try {
     TopicSession topicSession = topicConnection
       .createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
 
     Topic topic = (Topic) ctx.lookup(topicName);
     // start the connection
     topicConnection.start();
 
     // create a topic subscriber
     javax.jms.TopicSubscriber topicSubscriber = topicSession
       .createSubscriber(topic);
 
     TestMessageListener messageListener = new TestMessageListener();
     topicSubscriber.setMessageListener(messageListener);
 
     Thread.sleep(5000);
     topicSubscriber.close();
     topicSession.close();
    } catch (JMSException e) {
     e.printStackTrace();
    } catch (NamingException e) {
     e.printStackTrace();
    } catch (InterruptedException e) {
     e.printStackTrace();
    }
   }
 
  } catch (NamingException e) {
   throw new RuntimeException("Error in initial context lookup", e);
  } catch (JMSException e) {
   throw new RuntimeException("Error in JMS operations", e);
  } finally {
   if (topicConnection != null) {
    try {
     topicConnection.close();
    } catch (JMSException e) {
     throw new RuntimeException(
       "Error in closing topic connection", e);
    }
   }
  }
 }
 
 public class TestMessageListener implements MessageListener {
  public void onMessage(Message message) {
   try {
    System.out.println("Got the Message : "
      + ((TextMessage) message).getText());
    messageReceived = true;
   } catch (JMSException e) {
    e.printStackTrace();
   }
  }
 }
 
}

[5] Создан класс «TopicPublisher.java» для публикации сообщений.

package simple;
 
import javax.jms.*;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;
 
public class TopicPublisher {
 private String topicName = "news.sport";
 
 private String initialContextFactory = "org.apache.activemq"
+".jndi.ActiveMQInitialContextFactory";
 private String connectionString = "tcp://localhost:61616";
  
 public static void main(String[] args) {
  TopicPublisher publisher = new TopicPublisher();
  publisher.publishWithTopicLookup();
 }
 
 public void publishWithTopicLookup() {
  Properties properties = new Properties();
  TopicConnection topicConnection = null;
  properties.put("java.naming.factory.initial", initialContextFactory);
  properties.put("connectionfactory.QueueConnectionFactory",
    connectionString);
  properties.put("topic." + topicName, topicName);
 
  try {
   // initialize
   // the required connection factories
   InitialContext ctx = new InitialContext(properties);
   TopicConnectionFactory topicConnectionFactory = (TopicConnectionFactory) ctx
     .lookup("QueueConnectionFactory");
   topicConnection = topicConnectionFactory.createTopicConnection();
 
   try {
    TopicSession topicSession = topicConnection.createTopicSession(
      false, Session.AUTO_ACKNOWLEDGE);
    // create or use the topic
    System.out.println("Use the Topic " + topicName);
    Topic topic = (Topic) ctx.lookup(topicName);
    javax.jms.TopicPublisher topicPublisher = topicSession
      .createPublisher(topic);
 
    String msg = "Hi, I am Test Message";
    TextMessage textMessage = topicSession.createTextMessage(msg);
 
      topicPublisher.publish(textMessage);
    System.out.println("Publishing message " +textMessage);
    topicPublisher.close();
    topicSession.close();
 
    Thread.sleep(20);
   } catch (InterruptedException e) {
    e.printStackTrace();
   }
 
  } catch (JMSException e) {
   throw new RuntimeException("Error in JMS operations", e);
  } catch (NamingException e) {
   throw new RuntimeException("Error in initial context lookup", e);
  }
 }
 
}

[6] Во — первых Run «TopicSubscriber.java» , а затем запустить «TopicPublisher.java»

Вот из пут и от

TopicSubscriber ::

Создать Тему для подключения Тема news.sport

получил сообщение: Привет, Я Тестовое сообщение

TopicPublisher ::

Использование в Новости темы.спорт
Публикация сообщения ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = ID: Madhuka-THINK-51683-1359787878456-1: 1: 1: 1: 1, originalDestination = null, originalTransactionId = null, providerId = null, назначение = тема: //news.sport, TransactionsId = null, срок действия = 0, отметка времени = 1359787878729, прибытие = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, постоянный = true, тип = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, сжатый = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, text = Hi, я тестовое сообщение}

 [Подробнее] Вот полное сообщение, которое мы отправили на TopicSubscriber. Мы можем получить это любым параметром выше.

Вот пример, чтобы получить TimeStamp и ID из сообщения JMS.

public class TestMessageListener implements MessageListener {
 public void onMessage(Message message) {
  try {
   System.out.println("Got the Message  TimeStamp: "
     +  message.getJMSTimestamp());
   System.out.println("Got the Message JMS ID : "
     +  message.getJMSMessageID());
   messageReceived = true;
  } catch (JMSException e) {
   e.printStackTrace();
  }
 }
} 

Теперь перейдите на сервер ActiveMQ по адресу
http: // localhost: 8161 / admin /  См. Эту тему и количество сообщений для этой темы. Теперь вы можете проверить больше в «Active MQ»