Этот пост объясняет темы в Active MQ (Message Broker) с подпиской и публикацией. Для этого мы напишем два Java-клиента. Как мы это сделали для wso2 Message Broker
- TopicSubscriber.java Подписаться на сообщения
- TopicPublisher.java для публикации сообщений
Давайте начнем.
[1] Получить Active MQ с
http://activemq.apache.org/download.html
[1.1] Запустить Active MQ с \ bin \ activemq.bat.
Вы можете увидеть форму запущенного сервера
http: // localhost: 8161 / admin /
[2] Создайте Porject «Client» в IDE, который вы предпочли
[3] Добавьте activemq-all-5.7.0.jar в lib Dir в проекте (activemq-all-5.7.0.jar можно найти в корневой папке)
[ 4] Создал класс «TopicSubscriber.java», чтобы подписаться на сообщения
package simple; import java.util.Properties; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; import javax.jms.TopicSession; import javax.naming.InitialContext; import javax.naming.NamingException; public class TopicSubscriber { private String topicName = "news.sport"; private String initialContextFactory = "" +"org.apache.activemq.jndi.ActiveMQInitialContextFactory"; private String connectionString = "tcp://" +"localhost:61616"; private boolean messageReceived = false; public static void main(String[] args) { TopicSubscriber subscriber = new TopicSubscriber(); subscriber.subscribeWithTopicLookup(); } public void subscribeWithTopicLookup() { Properties properties = new Properties(); TopicConnection topicConnection = null; properties.put("java.naming.factory.initial", initialContextFactory); properties.put("connectionfactory.QueueConnectionFactory", connectionString); properties.put("topic." + topicName, topicName); try { InitialContext ctx = new InitialContext(properties); TopicConnectionFactory topicConnectionFactory = (TopicConnectionFactory) ctx .lookup("QueueConnectionFactory"); topicConnection = topicConnectionFactory.createTopicConnection(); System.out .println("Create Topic Connection for Topic " + topicName); while (!messageReceived) { try { TopicSession topicSession = topicConnection .createTopicSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = (Topic) ctx.lookup(topicName); // start the connection topicConnection.start(); // create a topic subscriber javax.jms.TopicSubscriber topicSubscriber = topicSession .createSubscriber(topic); TestMessageListener messageListener = new TestMessageListener(); topicSubscriber.setMessageListener(messageListener); Thread.sleep(5000); topicSubscriber.close(); topicSession.close(); } catch (JMSException e) { e.printStackTrace(); } catch (NamingException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } } catch (NamingException e) { throw new RuntimeException("Error in initial context lookup", e); } catch (JMSException e) { throw new RuntimeException("Error in JMS operations", e); } finally { if (topicConnection != null) { try { topicConnection.close(); } catch (JMSException e) { throw new RuntimeException( "Error in closing topic connection", e); } } } } public class TestMessageListener implements MessageListener { public void onMessage(Message message) { try { System.out.println("Got the Message : " + ((TextMessage) message).getText()); messageReceived = true; } catch (JMSException e) { e.printStackTrace(); } } } }
[5] Создан класс «TopicPublisher.java» для публикации сообщений.
package simple; import javax.jms.*; import javax.naming.InitialContext; import javax.naming.NamingException; import java.util.Properties; public class TopicPublisher { private String topicName = "news.sport"; private String initialContextFactory = "org.apache.activemq" +".jndi.ActiveMQInitialContextFactory"; private String connectionString = "tcp://localhost:61616"; public static void main(String[] args) { TopicPublisher publisher = new TopicPublisher(); publisher.publishWithTopicLookup(); } public void publishWithTopicLookup() { Properties properties = new Properties(); TopicConnection topicConnection = null; properties.put("java.naming.factory.initial", initialContextFactory); properties.put("connectionfactory.QueueConnectionFactory", connectionString); properties.put("topic." + topicName, topicName); try { // initialize // the required connection factories InitialContext ctx = new InitialContext(properties); TopicConnectionFactory topicConnectionFactory = (TopicConnectionFactory) ctx .lookup("QueueConnectionFactory"); topicConnection = topicConnectionFactory.createTopicConnection(); try { TopicSession topicSession = topicConnection.createTopicSession( false, Session.AUTO_ACKNOWLEDGE); // create or use the topic System.out.println("Use the Topic " + topicName); Topic topic = (Topic) ctx.lookup(topicName); javax.jms.TopicPublisher topicPublisher = topicSession .createPublisher(topic); String msg = "Hi, I am Test Message"; TextMessage textMessage = topicSession.createTextMessage(msg); topicPublisher.publish(textMessage); System.out.println("Publishing message " +textMessage); topicPublisher.close(); topicSession.close(); Thread.sleep(20); } catch (InterruptedException e) { e.printStackTrace(); } } catch (JMSException e) { throw new RuntimeException("Error in JMS operations", e); } catch (NamingException e) { throw new RuntimeException("Error in initial context lookup", e); } } }
[6] Во — первых Run «TopicSubscriber.java» , а затем запустить «TopicPublisher.java»
Вот из пут и от
TopicSubscriber ::
Создать Тему для подключения Тема news.sport
получил сообщение: Привет, Я Тестовое сообщение
TopicPublisher ::
Использование в Новости темы.спорт
Публикация сообщения ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = ID: Madhuka-THINK-51683-1359787878456-1: 1: 1: 1: 1, originalDestination = null, originalTransactionId = null, providerId = null, назначение = тема: //news.sport, TransactionsId = null, срок действия = 0, отметка времени = 1359787878729, прибытие = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, постоянный = true, тип = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, сжатый = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, text = Hi, я тестовое сообщение}
[Подробнее] Вот полное сообщение, которое мы отправили на TopicSubscriber. Мы можем получить это любым параметром выше.
Вот пример, чтобы получить TimeStamp и ID из сообщения JMS.
public class TestMessageListener implements MessageListener { public void onMessage(Message message) { try { System.out.println("Got the Message TimeStamp: " + message.getJMSTimestamp()); System.out.println("Got the Message JMS ID : " + message.getJMSMessageID()); messageReceived = true; } catch (JMSException e) { e.printStackTrace(); } } }
Теперь перейдите на сервер ActiveMQ по адресу
http: // localhost: 8161 / admin / См. Эту тему и количество сообщений для этой темы. Теперь вы можете проверить больше в «Active MQ»