Статьи

Темы в WSO2 Message Broker

Этот пост объясняет темы в WSO2 Message Broker (MB) с подпиской и публикацией. Для этого варианта использования мы напишем два клиента Java.

  • TopicSubscriber.java Подписаться на сообщения
  • TopicPublisher.java для публикации сообщений

Давайте начнем.

[1] Получите WSO2 MB из
http://wso2.com/products/message-broker/.

[2] Создайте Porject «Клиент» в IDE, который вы предпочитаете.

[3] Добавьте ниже в lib Dir в проекте (Эти файлы могут быть находится в клиентской библиотеке в МБ)

  • Анды-клиент-0.13.wso2v4.jar
  • Джеронимо-jms_1.1_spec-1.1.0.wso2v1.jar
  • log4j-1.2.17.jar
  • org.wso2.carbon.event.client-4.0.0.jar
  • org.wso2.carbon.event.client.stub-4.0.0.jar
  • SLF4J-1.5.10.wso2v1.jar

[4] Создал класс «TopicSubscriber.java», чтобы подписаться на сообщения

package simple;
 
import java.util.Properties;
 
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.naming.InitialContext;
import javax.naming.NamingException;
 
public class TopicSubscriber {
 
 private String topicName = "news.sport";
 private String initialContextFactory = "org.wso2.andes.jndi."
+"PropertiesFileInitialContextFactory";
 private String connectionString = "amqp:"
+"//admin:admin@clientID/carbon?brokerlist='tcp://localhost:5672'";
 private boolean messageReceived = false;
 
 public static void main(String[] args) {
  TopicSubscriber subscriber = new TopicSubscriber();
  subscriber.subscribeWithTopicLookup();
 }
 
 public void subscribeWithTopicLookup() {
 
  Properties properties = new Properties();
  TopicConnection topicConnection = null;
  properties.put("java.naming.factory.initial", initialContextFactory);
  properties.put("connectionfactory.QueueConnectionFactory",
    connectionString);
  properties.put("topic." + topicName, topicName);
  try {
   InitialContext ctx = new InitialContext(properties);
   TopicConnectionFactory topicConnectionFactory = (TopicConnectionFactory) ctx
     .lookup("QueueConnectionFactory");
   topicConnection = topicConnectionFactory.createTopicConnection();
   System.out
     .println("Create Topic Connection for Topic " + topicName);
 
   while (!messageReceived) {
    try {
     TopicSession topicSession = topicConnection
       .createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
 
     Topic topic = (Topic) ctx.lookup(topicName);
     // start the connection
     topicConnection.start();
 
     // create a topic subscriber
     javax.jms.TopicSubscriber topicSubscriber = topicSession
       .createSubscriber(topic);
 
     TestMessageListener messageListener = new TestMessageListener();
     topicSubscriber.setMessageListener(messageListener);
 
     Thread.sleep(5000);
     topicSubscriber.close();
     topicSession.close();
    } catch (JMSException e) {
     e.printStackTrace();
    } catch (NamingException e) {
     e.printStackTrace();
    } catch (InterruptedException e) {
     e.printStackTrace();
    }
   }
 
  } catch (NamingException e) {
   throw new RuntimeException("Error in initial context lookup", e);
  } catch (JMSException e) {
   throw new RuntimeException("Error in JMS operations", e);
  } finally {
   if (topicConnection != null) {
    try {
     topicConnection.close();
    } catch (JMSException e) {
     throw new RuntimeException(
       "Error in closing topic connection", e);
    }
   }
  }
 }
 
 public class TestMessageListener implements MessageListener {
  public void onMessage(Message message) {
   try {
    System.out.println("Got the Message : "
      + ((TextMessage) message).getText());
    messageReceived = true;
   } catch (JMSException e) {
    e.printStackTrace();
   }
  }
 }
 
}

[5] Создан класс «TopicPublisher.java» для публикации сообщений.

package simple;
 
import javax.jms.*;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;
 
public class TopicPublisher {
 private String topicName = "news.sport";
 private String initialContextFactory = "org.wso2.andes.jndi."
 +"PropertiesFileInitialContextFactory";
 private String connectionString = "amqp:"
 +"//admin:admin@clientID/carbon?brokerlist='tcp://localhost:5672'";
 
 public static void main(String[] args) {
  TopicPublisher publisher = new TopicPublisher();
  publisher.publishWithTopicLookup();
 }
 
 public void publishWithTopicLookup() {
  Properties properties = new Properties();
  TopicConnection topicConnection = null;
  properties.put("java.naming.factory.initial", initialContextFactory);
  properties.put("connectionfactory.QueueConnectionFactory",
    connectionString);
  properties.put("topic." + topicName, topicName);
 
  try {
   // initialize
   // the required connection factories
   InitialContext ctx = new InitialContext(properties);
   TopicConnectionFactory topicConnectionFactory = (TopicConnectionFactory) ctx
     .lookup("QueueConnectionFactory");
   topicConnection = topicConnectionFactory.createTopicConnection();
 
   try {
    TopicSession topicSession = topicConnection.createTopicSession(
      false, Session.AUTO_ACKNOWLEDGE);
    // create or use the topic
    System.out.println("Use the Topic " + topicName);
    Topic topic = (Topic) ctx.lookup(topicName);
    javax.jms.TopicPublisher topicPublisher = topicSession
      .createPublisher(topic);
 
    String msg = "Hi, I am Test Message";
    TextMessage textMessage = topicSession.createTextMessage(msg);
 
      topicPublisher.publish(textMessage);
    System.out.println("Publishing message " +textMessage);
    topicPublisher.close();
    topicSession.close();
 
    Thread.sleep(20);
   } catch (InterruptedException e) {
    e.printStackTrace();
   }
 
  } catch (JMSException e) {
   throw new RuntimeException("Error in JMS operations", e);
  } catch (NamingException e) {
   throw new RuntimeException("Error in initial context lookup", e);
  }
 }
 
}

[6] Во — первых Run «TopicSubscriber.java» , а затем запустить «TopicPublisher.java»

Вот из пут и от

TopicSubscriber ::

Создать Тему для подключения Тема news.sport

получил сообщение: Привет, Я Тестовое сообщение

TopicPublisher ::

Использование в Тема news.sport Публикация сообщения

Тело: Привет, я тестовое сообщение.

Идентификатор корреляции JMS: нулевая

метка времени JMS: 1359720212306 Срок

действия JMS: 0

Приоритет JMS: 4

Режим доставки JMS: 2

Ответ JMS: null

JMS Доставлено: false

JMS Адресат: тема: //amq.topic/news.sport/?routingkey=’news.sport’&exclusive=’true’&autodelete=’true ‘

JMS Тип:
ноль

JMS MessageID: ID:d7915d2c-6ddc-3b8a-b1aa-7a63009c6cae

JMS Content-Type: text / plain Номер сообщения AMQ: -1

Свойства:

 JMS_QPID_DESTTYPE = 2

 [More] Вот полное сообщение, которое мы отправили TopicSubscriber. Мы можем получить это любым параметром выше.

Вот пример, чтобы получить TimeStamp и ID из сообщения JMS.

[6] Во — первых Run «TopicSubscriber.java» , а затем запустить «TopicPublisher.java»

Вот из пут и от

TopicSubscriber ::

Создать Тему для подключения Тема news.sport

получил сообщение: Привет, Я Тестовое сообщение

TopicPublisher ::

Использование в Тема news.sport Публикация сообщения

Тело: Привет, я тестовое сообщение

Идентификатор JMS-корреляции: null

JMS timestamp: 1359720212306 Срок

действия JMS: 0

Приоритет JMS: 4

Режим доставки JMS: 2

JMS-ответ на: null

JMS Доставлено: false JMS-адресат:

тема: //amq.topic/news.sport/? Routingkey = ‘news.sport’ & exclusive = ‘true’ & autodelete = ‘true’

JMS-тип: null

JMS MessageID: ID: d7915d2c-6ddc-3b8a-b1aa-7a63009c6cae

JMS Content-Type: text / plain Номер сообщения AMQ: -1

Свойства:

 JMS_QPID_DESTTYPE = 2

 [Больше] Вот полное сообщение, которое мы отправили TopicSubscriber. Мы можем получить это любым параметром выше.

Вот пример, чтобы получить TimeStamp и ID из сообщения JMS.

public class TestMessageListener implements MessageListener {
 public void onMessage(Message message) {
  try {
   System.out.println("Got the Message  TimeStamp: "
     +  message.getJMSTimestamp());
   System.out.println("Got the Message JMS ID : "
     +  message.getJMSMessageID());
   messageReceived = true;
  } catch (JMSException e) {
   e.printStackTrace();
  }
 }
}