Возможно, вы уже сталкивались с ситуацией, когда вам необходимо иметь возможности обмена сообщениями в вашем приложении, пока ваше клиентское приложение работает в среде, которую вы не можете контролировать по конфигурации и ограничениям сети. Такая ситуация приводит к тому, что вы не можете использовать связь JMS через назначенные порты, такие как 7676 и так далее.
Вы можете просто убрать JMS и следовать другому протоколу или даже обычному HTTP-соединению, чтобы удовлетворить ваши требования к архитектуре и дизайну, но мы не можем легко отказаться от возможностей чистого JMS API, поскольку нам может потребоваться длительная подписка на проверенную и уже хорошо протестированную и устоявшуюся конструкцию. и архитектура. Различные поставщики реализации JMS предоставляют различные типы возможностей для удовлетворения такого требования. ActiveMQ предоставляет HTTP и HTTPS транспорт для JMS API и описал это здесь .
OpenMQ предоставляет различные транспортные протоколы и каналы доступа для доступа к функциям OpenMQ из разных программ. Один из каналов доступа, который включает HTTP, называется универсальной службой сообщений (UMS), которая предоставляет простой шаблон взаимодействия REST для размещения сообщений и передачи их с любого языка программирования и устройства, которые могут взаимодействовать с сетевым сервером. UMS имеет некоторые ограничения, которые просто понятны на основе RESTful-природы UMS. Для текущей версии OpenMQ он поддерживает только очереди сообщений в качестве пунктов назначения, поэтому функции публикации и подписки недоступны.
Другая возможность, которая включает HTTP, — это JMS через HTTP или просто JMS HTTP туннелирование. Реализация OpenMQ JMS поддерживает HTTP / s в качестве транспортного протокола, если мы правильно настроим брокер сообщений. Я сказал, что реализация JMS поддерживает HTTP / S в качестве транспортного протокола, что означает, что мы, как пользователь JMS API, почти не видим различий в том, как мы используем JMS API. Мы можем использовать публикацию-подписку, двухточечную подписку, длительную подписку, транзакции и все, что предусмотрено в JMS API.
Сначала давайте рассмотрим установку и настройку OpenMQ, а затем рассмотрим пример, чтобы увидеть, что меняется между использованием JMS API и JMS API через транспорт HTTP. Процесс установки выглядит следующим образом:
- Загрузите OpenMQ с https://mq.dev.java.net/ это должен быть zip- файл, который отличается для каждой платформы.
- Установите MQ, разархивировав вышеуказанный файл и запустив ./installer или installer.bat или около того.
- После завершения установки перейдите в папку install_folder / var / mq / instances / imqbroker / props и откройте config.properties в текстовом редакторе, таком как блокнот, emeditor или gedit.
- Добавьте следующую строку в конец вышеуказанного файла:
Проект OpenMQ предоставляет веб-приложение Java EE, которое взаимодействует с брокером OpenMQ с одной стороны и реализацией Sun JMS с другой стороны. Взаимодействие этого приложения с клиентом и MQ-брокером в значительной степени настраивается в различных аспектах, таких как номер порта брокера, входящий опрос клиента, адрес брокера и так далее.
imq.service.activelist=jms,admin,httpjms
Теперь давайте посмотрим, как мы можем публиковать некоторые сообщения, этот пример кода предполагает, что мы настроили посредник сообщений, и предполагает, что у нас есть следующие два файла JAR в пути к классам. Эти файлы JAR доступны в папке install_folder / mq / lib /
- imq.jar
- jms.jar
Теперь код издателя:
public class Publisher {
public void publish(String messageContent) {
try {
String addressList = "http://127.0.0.1:8080/imqhttp/tunnel";
com.sun.messaging.TopicConnectionFactory topicConnectionFactory = new com.sun.messaging.TopicConnectionFactory();
topicConnectionFactory.setProperty(com.sun.messaging.ConnectionConfiguration.imqAddressList, addressList);
javax.jms.Topic top;
javax.jms.Connection con = topicConnectionFactory.createTopicConnection("admin", "admin");
javax.jms.Session session = con.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
top = session.createTopic("DIRECT_TOPIC");
MessageProducer prod = session.createProducer(top);
Message textMessage = session.createTextMessage(messageContent);
prod.send(textMessage);
prod.close();
session.close();
con.close();
} catch (JMSException ex) {
ex.printStackTrace();
}
}
public static void main(String args[]) {
Publisher p = new Publisher();
for (int i = 1; i < 10; i++) {
p.publish("Sample Text Message Content: " + i);
}
}
}
Как вы можете видеть, единственное отличие — это URL-адрес соединения, который использует http вместо mq и указывает на адрес контейнера сервлета, а не на адрес прослушивания брокера.
Пример кода подписчика следует аналогичной схеме. Здесь я пишу пример постоянного подписчика, чтобы вы могли видеть, что мы можем использовать надежную подписку через HTTP. Но вы должны заметить, что HTTP-транспорт использует опрос и постоянно открытый канал связи, что может привести к некоторой перегрузке на сервере.
class SimpleListener implements MessageListener {
public void onMessage(Message msg) {
System.out.println("On Message Called");
if (msg instanceof TextMessage) {
try {
System.out.print(((TextMessage) msg).getText());
} catch (JMSException ex) {
ex.printStackTrace();
}
}
}
}
public class Subscriber {
/**
* @param args the command line arguments
*/
public void subscribe(String clientID, String susbscritpionID) {
try {
// TODO code application logic here
String addressList = "http://127.0.0.1:8080/imqhttp/tunnel";
com.sun.messaging.TopicConnectionFactory topicConnectionFactory = new com.sun.messaging.TopicConnectionFactory();
topicConnectionFactory.setProperty(com.sun.messaging.ConnectionConfiguration.imqAddressList, addressList);
javax.jms.Topic top;
javax.jms.Connection con = topicConnectionFactory.createTopicConnection("admin", "admin");
con.setClientID(clientID);
javax.jms.Session session = con.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
top = session.createTopic("DIRECT_TOPIC");
TopicSubscriber topicSubscriber = session.createDurableSubscriber(top, susbscritpionID);
topicSubscriber.setMessageListener(new SimpleListener());
con.start();
} catch (JMSException ex) {
ex.printStackTrace();
}
}
public static void main(String args[]) {
Subscriber sub = new Subscriber();
sub.subscribe("C19", "C1_011");
}
}
Теперь, как вы можете протестировать весь пример и контролировать MQ? это очень просто, используя предоставленные инструменты.
Выполните следующие шаги, чтобы протестировать общую систему долговременной подписки:
- Запустить подписчика
- Запустить другого подписчика с другим идентификатором клиента
- Запустите издателя один или два раза
- Убей второго подписчика
- Запустите издателя один раз
- Запустите подписчика, и вы увидите, что подписчик будет извлекать все сообщения, поступившие после его закрытия.
Обратите внимание, что у нас не может быть двух отдельных клиентов с одинаковым идентификатором клиента, поскольку брокер не сможет определить, какому клиенту он должен отправлять сообщения.
./imqcmd -b 127.0.0.1:7979 метрики dst -tt -n DIRECT_TOPIC Команда запросит имя пользователя и пароль, предоставит для них admin / admin Пример кода для этой записи можно найти здесь . Пример кода представляет собой проект NetBeans с исходным кодом издателя и подписчика. Полная документация OpenMQ доступна в его центре документации . Вы можете увидеть, как можно изменять разные номера портов или настраивать разные свойства взаимодействия через веб-приложение Broker и HTTP-туннелирования. |