Этот пост объясняет темы в WSO2 Message Broker (MB) с подпиской и публикацией. Для этого варианта использования мы напишем два клиента Java.
- TopicSubscriber.java Подписаться на сообщения
- TopicPublisher.java для публикации сообщений
Давайте начнем.
[1] Получите WSO2 MB из
http://wso2.com/products/message-broker/.
[2] Создайте Porject «Клиент» в IDE, который вы предпочитаете.
[3] Добавьте ниже в lib Dir в проекте (Эти файлы могут быть находится в клиентской библиотеке в МБ)
- Анды-клиент-0.13.wso2v4.jar
- Джеронимо-jms_1.1_spec-1.1.0.wso2v1.jar
- log4j-1.2.17.jar
- org.wso2.carbon.event.client-4.0.0.jar
- org.wso2.carbon.event.client.stub-4.0.0.jar
- SLF4J-1.5.10.wso2v1.jar
[4] Создал класс «TopicSubscriber.java», чтобы подписаться на сообщения
package simple;
import java.util.Properties;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.naming.InitialContext;
import javax.naming.NamingException;
public class TopicSubscriber {
private String topicName = "news.sport";
private String initialContextFactory = "org.wso2.andes.jndi."
+"PropertiesFileInitialContextFactory";
private String connectionString = "amqp:"
+"//admin:admin@clientID/carbon?brokerlist='tcp://localhost:5672'";
private boolean messageReceived = false;
public static void main(String[] args) {
TopicSubscriber subscriber = new TopicSubscriber();
subscriber.subscribeWithTopicLookup();
}
public void subscribeWithTopicLookup() {
Properties properties = new Properties();
TopicConnection topicConnection = null;
properties.put("java.naming.factory.initial", initialContextFactory);
properties.put("connectionfactory.QueueConnectionFactory",
connectionString);
properties.put("topic." + topicName, topicName);
try {
InitialContext ctx = new InitialContext(properties);
TopicConnectionFactory topicConnectionFactory = (TopicConnectionFactory) ctx
.lookup("QueueConnectionFactory");
topicConnection = topicConnectionFactory.createTopicConnection();
System.out
.println("Create Topic Connection for Topic " + topicName);
while (!messageReceived) {
try {
TopicSession topicSession = topicConnection
.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = (Topic) ctx.lookup(topicName);
// start the connection
topicConnection.start();
// create a topic subscriber
javax.jms.TopicSubscriber topicSubscriber = topicSession
.createSubscriber(topic);
TestMessageListener messageListener = new TestMessageListener();
topicSubscriber.setMessageListener(messageListener);
Thread.sleep(5000);
topicSubscriber.close();
topicSession.close();
} catch (JMSException e) {
e.printStackTrace();
} catch (NamingException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} catch (NamingException e) {
throw new RuntimeException("Error in initial context lookup", e);
} catch (JMSException e) {
throw new RuntimeException("Error in JMS operations", e);
} finally {
if (topicConnection != null) {
try {
topicConnection.close();
} catch (JMSException e) {
throw new RuntimeException(
"Error in closing topic connection", e);
}
}
}
}
public class TestMessageListener implements MessageListener {
public void onMessage(Message message) {
try {
System.out.println("Got the Message : "
+ ((TextMessage) message).getText());
messageReceived = true;
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
[5] Создан класс «TopicPublisher.java» для публикации сообщений.
package simple;
import javax.jms.*;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;
public class TopicPublisher {
private String topicName = "news.sport";
private String initialContextFactory = "org.wso2.andes.jndi."
+"PropertiesFileInitialContextFactory";
private String connectionString = "amqp:"
+"//admin:admin@clientID/carbon?brokerlist='tcp://localhost:5672'";
public static void main(String[] args) {
TopicPublisher publisher = new TopicPublisher();
publisher.publishWithTopicLookup();
}
public void publishWithTopicLookup() {
Properties properties = new Properties();
TopicConnection topicConnection = null;
properties.put("java.naming.factory.initial", initialContextFactory);
properties.put("connectionfactory.QueueConnectionFactory",
connectionString);
properties.put("topic." + topicName, topicName);
try {
// initialize
// the required connection factories
InitialContext ctx = new InitialContext(properties);
TopicConnectionFactory topicConnectionFactory = (TopicConnectionFactory) ctx
.lookup("QueueConnectionFactory");
topicConnection = topicConnectionFactory.createTopicConnection();
try {
TopicSession topicSession = topicConnection.createTopicSession(
false, Session.AUTO_ACKNOWLEDGE);
// create or use the topic
System.out.println("Use the Topic " + topicName);
Topic topic = (Topic) ctx.lookup(topicName);
javax.jms.TopicPublisher topicPublisher = topicSession
.createPublisher(topic);
String msg = "Hi, I am Test Message";
TextMessage textMessage = topicSession.createTextMessage(msg);
topicPublisher.publish(textMessage);
System.out.println("Publishing message " +textMessage);
topicPublisher.close();
topicSession.close();
Thread.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
} catch (JMSException e) {
throw new RuntimeException("Error in JMS operations", e);
} catch (NamingException e) {
throw new RuntimeException("Error in initial context lookup", e);
}
}
}
[6] Во — первых Run «TopicSubscriber.java» , а затем запустить «TopicPublisher.java»
Вот из пут и от
TopicSubscriber ::
Создать Тему для подключения Тема news.sport
получил сообщение: Привет, Я Тестовое сообщение
TopicPublisher ::
Использование в Тема news.sport Публикация сообщения
Тело: Привет, я тестовое сообщение.
Идентификатор корреляции JMS: нулевая
метка времени JMS: 1359720212306 Срок
действия JMS: 0
Приоритет JMS: 4
Режим доставки JMS: 2
Ответ JMS: null
JMS Доставлено: false
JMS Адресат: тема: //amq.topic/news.sport/?routingkey=’news.sport’&exclusive=’true’&autodelete=’true ‘
JMS Тип:
ноль
JMS MessageID: ID:d7915d2c-6ddc-3b8a-b1aa-7a63009c6cae
JMS Content-Type: text / plain Номер сообщения AMQ: -1
Свойства:
JMS_QPID_DESTTYPE = 2
[More] Вот полное сообщение, которое мы отправили TopicSubscriber. Мы можем получить это любым параметром выше.
Вот пример, чтобы получить TimeStamp и ID из сообщения JMS.
[6] Во — первых Run «TopicSubscriber.java» , а затем запустить «TopicPublisher.java»
Вот из пут и от
TopicSubscriber ::
Создать Тему для подключения Тема news.sport
получил сообщение: Привет, Я Тестовое сообщение
TopicPublisher ::
Использование в Тема news.sport Публикация сообщения
Тело: Привет, я тестовое сообщение
Идентификатор JMS-корреляции: null
JMS timestamp: 1359720212306 Срок
действия JMS: 0
Приоритет JMS: 4
Режим доставки JMS: 2
JMS-ответ на: null
JMS Доставлено: false JMS-адресат:
тема: //amq.topic/news.sport/? Routingkey = ‘news.sport’ & exclusive = ‘true’ & autodelete = ‘true’
JMS-тип: null
JMS MessageID: ID: d7915d2c-6ddc-3b8a-b1aa-7a63009c6cae
JMS Content-Type: text / plain Номер сообщения AMQ: -1
Свойства:
JMS_QPID_DESTTYPE = 2
[Больше] Вот полное сообщение, которое мы отправили TopicSubscriber. Мы можем получить это любым параметром выше.
Вот пример, чтобы получить TimeStamp и ID из сообщения JMS.
public class TestMessageListener implements MessageListener {
public void onMessage(Message message) {
try {
System.out.println("Got the Message TimeStamp: "
+ message.getJMSTimestamp());
System.out.println("Got the Message JMS ID : "
+ message.getJMSMessageID());
messageReceived = true;
} catch (JMSException e) {
e.printStackTrace();
}
}
}