Этот пост объясняет темы в Active MQ (Message Broker) с подпиской и публикацией. Для этого мы напишем два Java-клиента. Как мы это сделали для wso2 Message Broker
- TopicSubscriber.java Подписаться на сообщения
- TopicPublisher.java для публикации сообщений
Давайте начнем.
[1] Получить Active MQ с
http://activemq.apache.org/download.html
[1.1] Запустить Active MQ с \ bin \ activemq.bat.
Вы можете увидеть форму запущенного сервера
http: // localhost: 8161 / admin /
[2] Создайте Porject «Client» в IDE, который вы предпочли
[3] Добавьте activemq-all-5.7.0.jar в lib Dir в проекте (activemq-all-5.7.0.jar можно найти в корневой папке)
[ 4] Создал класс «TopicSubscriber.java», чтобы подписаться на сообщения
package simple;
import java.util.Properties;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.naming.InitialContext;
import javax.naming.NamingException;
public class TopicSubscriber {
private String topicName = "news.sport";
private String initialContextFactory = ""
+"org.apache.activemq.jndi.ActiveMQInitialContextFactory";
private String connectionString = "tcp://"
+"localhost:61616";
private boolean messageReceived = false;
public static void main(String[] args) {
TopicSubscriber subscriber = new TopicSubscriber();
subscriber.subscribeWithTopicLookup();
}
public void subscribeWithTopicLookup() {
Properties properties = new Properties();
TopicConnection topicConnection = null;
properties.put("java.naming.factory.initial", initialContextFactory);
properties.put("connectionfactory.QueueConnectionFactory",
connectionString);
properties.put("topic." + topicName, topicName);
try {
InitialContext ctx = new InitialContext(properties);
TopicConnectionFactory topicConnectionFactory = (TopicConnectionFactory) ctx
.lookup("QueueConnectionFactory");
topicConnection = topicConnectionFactory.createTopicConnection();
System.out
.println("Create Topic Connection for Topic " + topicName);
while (!messageReceived) {
try {
TopicSession topicSession = topicConnection
.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = (Topic) ctx.lookup(topicName);
// start the connection
topicConnection.start();
// create a topic subscriber
javax.jms.TopicSubscriber topicSubscriber = topicSession
.createSubscriber(topic);
TestMessageListener messageListener = new TestMessageListener();
topicSubscriber.setMessageListener(messageListener);
Thread.sleep(5000);
topicSubscriber.close();
topicSession.close();
} catch (JMSException e) {
e.printStackTrace();
} catch (NamingException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} catch (NamingException e) {
throw new RuntimeException("Error in initial context lookup", e);
} catch (JMSException e) {
throw new RuntimeException("Error in JMS operations", e);
} finally {
if (topicConnection != null) {
try {
topicConnection.close();
} catch (JMSException e) {
throw new RuntimeException(
"Error in closing topic connection", e);
}
}
}
}
public class TestMessageListener implements MessageListener {
public void onMessage(Message message) {
try {
System.out.println("Got the Message : "
+ ((TextMessage) message).getText());
messageReceived = true;
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
[5] Создан класс «TopicPublisher.java» для публикации сообщений.
package simple;
import javax.jms.*;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;
public class TopicPublisher {
private String topicName = "news.sport";
private String initialContextFactory = "org.apache.activemq"
+".jndi.ActiveMQInitialContextFactory";
private String connectionString = "tcp://localhost:61616";
public static void main(String[] args) {
TopicPublisher publisher = new TopicPublisher();
publisher.publishWithTopicLookup();
}
public void publishWithTopicLookup() {
Properties properties = new Properties();
TopicConnection topicConnection = null;
properties.put("java.naming.factory.initial", initialContextFactory);
properties.put("connectionfactory.QueueConnectionFactory",
connectionString);
properties.put("topic." + topicName, topicName);
try {
// initialize
// the required connection factories
InitialContext ctx = new InitialContext(properties);
TopicConnectionFactory topicConnectionFactory = (TopicConnectionFactory) ctx
.lookup("QueueConnectionFactory");
topicConnection = topicConnectionFactory.createTopicConnection();
try {
TopicSession topicSession = topicConnection.createTopicSession(
false, Session.AUTO_ACKNOWLEDGE);
// create or use the topic
System.out.println("Use the Topic " + topicName);
Topic topic = (Topic) ctx.lookup(topicName);
javax.jms.TopicPublisher topicPublisher = topicSession
.createPublisher(topic);
String msg = "Hi, I am Test Message";
TextMessage textMessage = topicSession.createTextMessage(msg);
topicPublisher.publish(textMessage);
System.out.println("Publishing message " +textMessage);
topicPublisher.close();
topicSession.close();
Thread.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
} catch (JMSException e) {
throw new RuntimeException("Error in JMS operations", e);
} catch (NamingException e) {
throw new RuntimeException("Error in initial context lookup", e);
}
}
}
[6] Во — первых Run «TopicSubscriber.java» , а затем запустить «TopicPublisher.java»
Вот из пут и от
TopicSubscriber ::
Создать Тему для подключения Тема news.sport
получил сообщение: Привет, Я Тестовое сообщение
TopicPublisher ::
Использование в Новости темы.спорт
Публикация сообщения ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = ID: Madhuka-THINK-51683-1359787878456-1: 1: 1: 1: 1, originalDestination = null, originalTransactionId = null, providerId = null, назначение = тема: //news.sport, TransactionsId = null, срок действия = 0, отметка времени = 1359787878729, прибытие = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, постоянный = true, тип = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, сжатый = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, text = Hi, я тестовое сообщение}
[Подробнее] Вот полное сообщение, которое мы отправили на TopicSubscriber. Мы можем получить это любым параметром выше.
Вот пример, чтобы получить TimeStamp и ID из сообщения JMS.
public class TestMessageListener implements MessageListener {
public void onMessage(Message message) {
try {
System.out.println("Got the Message TimeStamp: "
+ message.getJMSTimestamp());
System.out.println("Got the Message JMS ID : "
+ message.getJMSMessageID());
messageReceived = true;
} catch (JMSException e) {
e.printStackTrace();
}
}
}
Теперь перейдите на сервер ActiveMQ по адресу
http: // localhost: 8161 / admin / См. Эту тему и количество сообщений для этой темы. Теперь вы можете проверить больше в «Active MQ»
