Этот пост объясняет темы в WSO2 Message Broker (MB) с подпиской и публикацией. Для этого варианта использования мы напишем два клиента Java.
- TopicSubscriber.java Подписаться на сообщения
- TopicPublisher.java для публикации сообщений
Давайте начнем.
[1] Получите WSO2 MB из
http://wso2.com/products/message-broker/.
[2] Создайте Porject «Клиент» в IDE, который вы предпочитаете.
[3] Добавьте ниже в lib Dir в проекте (Эти файлы могут быть находится в клиентской библиотеке в МБ)
- Анды-клиент-0.13.wso2v4.jar
- Джеронимо-jms_1.1_spec-1.1.0.wso2v1.jar
- log4j-1.2.17.jar
- org.wso2.carbon.event.client-4.0.0.jar
- org.wso2.carbon.event.client.stub-4.0.0.jar
- SLF4J-1.5.10.wso2v1.jar
[4] Создал класс «TopicSubscriber.java», чтобы подписаться на сообщения
package simple; import java.util.Properties; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; import javax.jms.TopicSession; import javax.naming.InitialContext; import javax.naming.NamingException; public class TopicSubscriber { private String topicName = "news.sport"; private String initialContextFactory = "org.wso2.andes.jndi." +"PropertiesFileInitialContextFactory"; private String connectionString = "amqp:" +"//admin:admin@clientID/carbon?brokerlist='tcp://localhost:5672'"; private boolean messageReceived = false; public static void main(String[] args) { TopicSubscriber subscriber = new TopicSubscriber(); subscriber.subscribeWithTopicLookup(); } public void subscribeWithTopicLookup() { Properties properties = new Properties(); TopicConnection topicConnection = null; properties.put("java.naming.factory.initial", initialContextFactory); properties.put("connectionfactory.QueueConnectionFactory", connectionString); properties.put("topic." + topicName, topicName); try { InitialContext ctx = new InitialContext(properties); TopicConnectionFactory topicConnectionFactory = (TopicConnectionFactory) ctx .lookup("QueueConnectionFactory"); topicConnection = topicConnectionFactory.createTopicConnection(); System.out .println("Create Topic Connection for Topic " + topicName); while (!messageReceived) { try { TopicSession topicSession = topicConnection .createTopicSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = (Topic) ctx.lookup(topicName); // start the connection topicConnection.start(); // create a topic subscriber javax.jms.TopicSubscriber topicSubscriber = topicSession .createSubscriber(topic); TestMessageListener messageListener = new TestMessageListener(); topicSubscriber.setMessageListener(messageListener); Thread.sleep(5000); topicSubscriber.close(); topicSession.close(); } catch (JMSException e) { e.printStackTrace(); } catch (NamingException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } } catch (NamingException e) { throw new RuntimeException("Error in initial context lookup", e); } catch (JMSException e) { throw new RuntimeException("Error in JMS operations", e); } finally { if (topicConnection != null) { try { topicConnection.close(); } catch (JMSException e) { throw new RuntimeException( "Error in closing topic connection", e); } } } } public class TestMessageListener implements MessageListener { public void onMessage(Message message) { try { System.out.println("Got the Message : " + ((TextMessage) message).getText()); messageReceived = true; } catch (JMSException e) { e.printStackTrace(); } } } }
[5] Создан класс «TopicPublisher.java» для публикации сообщений.
package simple; import javax.jms.*; import javax.naming.InitialContext; import javax.naming.NamingException; import java.util.Properties; public class TopicPublisher { private String topicName = "news.sport"; private String initialContextFactory = "org.wso2.andes.jndi." +"PropertiesFileInitialContextFactory"; private String connectionString = "amqp:" +"//admin:admin@clientID/carbon?brokerlist='tcp://localhost:5672'"; public static void main(String[] args) { TopicPublisher publisher = new TopicPublisher(); publisher.publishWithTopicLookup(); } public void publishWithTopicLookup() { Properties properties = new Properties(); TopicConnection topicConnection = null; properties.put("java.naming.factory.initial", initialContextFactory); properties.put("connectionfactory.QueueConnectionFactory", connectionString); properties.put("topic." + topicName, topicName); try { // initialize // the required connection factories InitialContext ctx = new InitialContext(properties); TopicConnectionFactory topicConnectionFactory = (TopicConnectionFactory) ctx .lookup("QueueConnectionFactory"); topicConnection = topicConnectionFactory.createTopicConnection(); try { TopicSession topicSession = topicConnection.createTopicSession( false, Session.AUTO_ACKNOWLEDGE); // create or use the topic System.out.println("Use the Topic " + topicName); Topic topic = (Topic) ctx.lookup(topicName); javax.jms.TopicPublisher topicPublisher = topicSession .createPublisher(topic); String msg = "Hi, I am Test Message"; TextMessage textMessage = topicSession.createTextMessage(msg); topicPublisher.publish(textMessage); System.out.println("Publishing message " +textMessage); topicPublisher.close(); topicSession.close(); Thread.sleep(20); } catch (InterruptedException e) { e.printStackTrace(); } } catch (JMSException e) { throw new RuntimeException("Error in JMS operations", e); } catch (NamingException e) { throw new RuntimeException("Error in initial context lookup", e); } } }
[6] Во — первых Run «TopicSubscriber.java» , а затем запустить «TopicPublisher.java»
Вот из пут и от
TopicSubscriber ::
Создать Тему для подключения Тема news.sport
получил сообщение: Привет, Я Тестовое сообщение
TopicPublisher ::
Использование в Тема news.sport Публикация сообщения
Тело: Привет, я тестовое сообщение.
Идентификатор корреляции JMS: нулевая
метка времени JMS: 1359720212306 Срок
действия JMS: 0
Приоритет JMS: 4
Режим доставки JMS: 2
Ответ JMS: null
JMS Доставлено: false
JMS Адресат: тема: //amq.topic/news.sport/?routingkey=’news.sport’&exclusive=’true’&autodelete=’true ‘
JMS Тип:
ноль
JMS MessageID: ID:d7915d2c-6ddc-3b8a-b1aa-7a63009c6cae
JMS Content-Type: text / plain Номер сообщения AMQ: -1
Свойства:
JMS_QPID_DESTTYPE = 2
[More] Вот полное сообщение, которое мы отправили TopicSubscriber. Мы можем получить это любым параметром выше.
Вот пример, чтобы получить TimeStamp и ID из сообщения JMS.
[6] Во — первых Run «TopicSubscriber.java» , а затем запустить «TopicPublisher.java»
Вот из пут и от
TopicSubscriber ::
Создать Тему для подключения Тема news.sport
получил сообщение: Привет, Я Тестовое сообщение
TopicPublisher ::
Использование в Тема news.sport Публикация сообщения
Тело: Привет, я тестовое сообщение
Идентификатор JMS-корреляции: null
JMS timestamp: 1359720212306 Срок
действия JMS: 0
Приоритет JMS: 4
Режим доставки JMS: 2
JMS-ответ на: null
JMS Доставлено: false JMS-адресат:
тема: //amq.topic/news.sport/? Routingkey = ‘news.sport’ & exclusive = ‘true’ & autodelete = ‘true’
JMS-тип: null
JMS MessageID: ID: d7915d2c-6ddc-3b8a-b1aa-7a63009c6cae
JMS Content-Type: text / plain Номер сообщения AMQ: -1
Свойства:
JMS_QPID_DESTTYPE = 2
[Больше] Вот полное сообщение, которое мы отправили TopicSubscriber. Мы можем получить это любым параметром выше.
Вот пример, чтобы получить TimeStamp и ID из сообщения JMS.
public class TestMessageListener implements MessageListener { public void onMessage(Message message) { try { System.out.println("Got the Message TimeStamp: " + message.getJMSTimestamp()); System.out.println("Got the Message JMS ID : " + message.getJMSMessageID()); messageReceived = true; } catch (JMSException e) { e.printStackTrace(); } } }