Статьи

Учебник по интеграции с Spring 3 HornetQ 2.1

Используйте новую сверхвысокопроизводительную систему обмена сообщениями от JBoss до Spring Framework.

HornetQ — это проект с открытым исходным кодом для создания многопротокольной, встраиваемой, высокопроизводительной, кластерной, асинхронной системы обмена сообщениями. Он написан на Java и работает на любой платформе с Java 5 или более поздней версией. Высокопроизводительный журнал HornetQ, обеспечивающий высокую производительность, обеспечивает постоянную производительность обмена сообщениями со скоростью, обычно наблюдаемой для непостоянных сообщений. Непостоянная производительность обмена сообщениями также чрезвычайно высока. Среди других «сексуальных» функций, HornetQ предлагает репликацию сервера и автоматическое аварийное переключение клиента для устранения потерянных или дублированных сообщений в случае сбоя сервера, может быть настроен для кластерного использования, когда географически распределенные кластеры серверов HornetQ знают, как загружать сообщения балансировки, и предоставляет комплексные API управления для управления и мониторинга всех серверов HornetQ .

В этом уроке мы покажем вам, как использовать HornetQ через среду Spring . Чтобы сделать вещи более интересными, мы продолжим с того места, которое мы оставили в предыдущей статье об интеграции Spring GWT Hibernate JPA Infinispan . Мы собираемся использовать наш проект GWTSpringInfinispan и наделить его функциональностью обмена сообщениями! Конечно, вы можете следовать этой статье, чтобы интегрировать свой Spring- проект с HornetQ .

Мы будем использовать HornetQ версии 2.1.0. Финал, который вы можете скачать здесь . Нам также понадобится библиотека jboss-logging-spi. Будет использоваться JBoss Logging SPI версии 2.1.1. GA, которую вы можете скачать из репозитория JBoss Maven здесь

Чтобы правильно интегрировать Spring и HornetQ во время выполнения, мы должны предоставить все необходимые библиотеки для веб-приложения. Поэтому скопируйте файлы, перечисленные ниже в / war / WEB-INF / lib (скопируйте соответствующие файлы, если вы используете разные версии)

Из дистрибутива HornetQ

  • /lib/hornetq-bootstrap.jar
  • /lib/hornetq-core.jar
  • /lib/hornetq-jms.jar
  • /lib/hornetq-logging.jar
  • /lib/jnpserver.jar
  • /lib/netty.jar

Библиотека JBoss Logging SPI

  • JBoss-каротаж СПИ-2.1.1.GA.jar

Наконец, для правильной работы HornetQ во время выполнения, несколько файлов конфигурации должны быть доступны в classpath веб-приложения. Как уже упоминалось во вводном разделе этого руководства, мы можем создавать кластеры серверов HornetQ для балансировки нагрузки и обмена сообщениями с высокой доступностью, или мы можем использовать HornetQ в некластерной среде. В любом случае требуется другая конфигурация. Дистрибутив HornetQ содержит все разновидности файлов конфигурации в каталоге / config. Мы будем использовать кластерную конфигурацию jboss-as-5, чтобы использовать все возможности платформы обмена сообщениями. Скопируйте следующие файлы из каталога / config / jboss-as-5 / cluster в ваш пакет application / resources:

  • hornetq-configuration.xml — это основной файл конфигурации HornetQ
  • hornetq-jms.xml — файл конфигурации службы JMS на стороне сервера

Если вы не развертываете на сервере приложений JBoss , отредактируйте файл hornetq-configuration.xml и замените «$ {jboss.server.data.dir}» на «$ {data.dir: ../ data}»

Скопируйте следующий файл из каталога / config / stand-alone / cluster в ваш пакет application / resources:

  • hornetq-users.xml — файл учетных данных пользователя для менеджера безопасности HornetQ

Прежде чем мы перейдем к фактическим примерам интеграции и примерам реализации клиента, давайте рассмотрим несколько полезных деталей об архитектуре сервера HornetQ и файлах конфигурации, упомянутых выше.

Сервер HornetQ не говорит на JMS и фактически ничего не знает о JMS , это сервер обмена сообщениями, не зависящий от протокола, предназначенный для использования с несколькими различными протоколами. Клиенты HornetQ , потенциально на разных физических машинах, взаимодействуют с сервером HornetQ . HornetQ в настоящее время предоставляет два API для обмена сообщениями на стороне клиента:

  • Базовый клиент API. Это простой интуитивно понятный Java API, который позволяет использовать полный набор функций обмена сообщениями без каких-либо сложностей JMS.
  • API клиента JMS . Стандартный JMS API доступен на стороне клиента

Семантика JMS реализуется тонким фасадным слоем JMS на стороне клиента. Когда пользователь использует API-интерфейс JMS на стороне клиента, все взаимодействия JMS преобразуются в операции на основном клиентском API-интерфейсе HornetQ, а затем передаются по проводному каналу с использованием проводного формата HornetQ . Сервер всегда имеет дело только с основными взаимодействиями API.

Стандартная конфигурация автономного сервера обмена сообщениями включает в себя основной сервер обмена сообщениями, службу JMS и службу JNDI .

Роль службы JMS заключается в развертывании и привязке к JNDI любых экземпляров JMS Queue, Topic и ConnectionFactory из любых файлов конфигурации hornetq-jms.xml на стороне сервера. Он также предоставляет простой API управления для создания и уничтожения экземпляров Queues, Topics и ConnectionFactory, к которым можно получить доступ через JMX или соединение. Это отдельный сервис для главного сервера HornetQ , поскольку главный сервер не зависит от JMS . Если вы не хотите развертывать какие-либо экземпляры JMS Queue, Topic или ConnectionFactory через конфигурацию XML на стороне сервера и не требует API управления JMS на стороне сервера, вы можете отключить эту службу.

Сервер JNDI также включен, поскольку JNDI является распространенным требованием при использовании JMS для поиска экземпляров Queues, Topics и ConnectionFactory. Если вам не требуется JNDI, эта служба также может быть отключена. HornetQ позволяет программно создавать JMS и основные объекты непосредственно на стороне клиента, а не искать их из JNDI , поэтому сервер JNDI не всегда является требованием.

HornetQ поставляется с базовой реализацией диспетчера безопасности, которая получает учетные данные пользователя
из файла hornetq-users.xml. Этот файл содержит информацию о пользователе, пароле и роли.

Мы собираемся использовать JMS- сервис HornetQ и выполнять клиентский код JMS в той же JVM, что и сервер имен, поэтому мы должны создать файл «jndi.properties» и поместить его в наш пакет application / resources вместе с остальными файлами конфигурации HornetQ, описанными выше. , Содержимое файла «jndi.properties» должно быть следующим:

1
java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory

Прежде чем продолжить, мы должны позаботиться о зависимостях для нашего проекта Eclipse . Следующие jar-файлы должны быть включены в путь сборки Java проекта:

  • jms.jar

Теперь давайте интегрируем Spring с HornetQ . Найдите файл applicationContext.xml в папке / war / WEB-INF и добавьте следующие компоненты:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
<bean name="namingServerImpl" class="org.jnp.server.NamingBeanImpl" init-method="start" destroy-method="stop" />
 
<bean name="namingServer" class="org.jnp.server.Main" init-method="start" destroy-method="stop">
 <property name="namingInfo" ref="namingServerImpl" />
 <property name="port" value="1099" />
 <property name="bindAddress" value="localhost" />
 <property name="rmiPort" value="1098" />
 <property name="rmiBindAddress" value="localhost" />
</bean>
 
<bean name="mbeanServer" class="java.lang.management.ManagementFactory" factory-method="getPlatformMBeanServer" />
 
<bean name="fileConfiguration" class="org.hornetq.core.config.impl.FileConfiguration" init-method="start" destroy-method="stop" />
 
<bean name="hornetQSecurityManagerImpl" class="org.hornetq.spi.core.security.HornetQSecurityManagerImpl" />
 
<!-- The core server -->
<bean name="hornetQServerImpl" class="org.hornetq.core.server.impl.HornetQServerImpl">
 <constructor-arg ref="fileConfiguration" />
 <constructor-arg ref="mbeanServer" />
 <constructor-arg ref="hornetQSecurityManagerImpl" />
</bean>
 
<!-- The JMS server -->
<bean name="jmsServerManagerImpl" class="org.hornetq.jms.server.impl.JMSServerManagerImpl" init-method="start" destroy-method="stop" depends-on="namingServer">
 <constructor-arg ref="hornetQServerImpl" />
</bean>

Если вы намереваетесь настроить Spring и HornetQ в автономной среде, вышеупомянутой конфигурации должно быть достаточно. В нашем случае, когда мы разворачиваем веб-приложение на Apache — Tomcat , необходимо внести незначительные изменения.

Apache — Tomcat предоставляет службу JNDI для всех развернутых веб-приложений для настройки атрибутов и ресурсов среды. Кроме того, контекст именования, доступный во время выполнения, доступен только для чтения, поскольку управление средой и ресурсами осуществляется с использованием файлов дескрипторов развертывания, таких как web.xml и context.xml. Кроме того, при запуске Apache — Tomcat инициализирует свою среду JNDI, используя системные свойства. В результате клиенты «в ВМ», которые используют класс InitialContext JNDI (без предоставления параметров среды конструктора) для выполнения операций именования, всегда получают интерфейс контекста реализации JNDI Apache — Tomcat .

Чтобы сервер JNDI HornetQ мог сосуществовать со службой имен Apache — Tomcat и службой JMS HornetQ, чтобы иметь возможность привязывать экземпляры очередей, тем и ConnectionFactory к JNDI , мы должны выполнить следующие действия:

  • Отключите Apache — сервис именования Tomcat для нашего веб-приложения
  • Сконфигурируйте сервер JNDI HornetQ, чтобы он не использовал существующую службу JNDI, если она доступна, но всегда создавал новую

Чтобы отключить Apache — сервис именования Tomcat для нашего веб-приложения, мы должны выполнить следующие действия:

  • Создайте папку META-INF в папке / war нашего проекта
  • Создайте файл context.xml, содержащий следующую контекстную директиву:
1
<Context override="true" useNaming="false" />

Чтобы сервер JNDI HornetQ не использовал существующую службу JNDI, если она доступна, мы должны добавить следующее свойство в bean-компонент Spring «namingServerImpl»:

1
<property name="useGlobalService" value="false" />

Чтобы использовать службу обмена сообщениями HornetQ через Spring, мы можем либо создать фабрику соединений, либо искать из JNDI . Фабрика соединений и пример «JmsTemplate» представлены ниже:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
<bean name="connectionFactory" class="org.hornetq.jms.client.HornetQConnectionFactory" >
 <constructor-arg>
  <bean class="org.hornetq.api.core.TransportConfiguration">
   <constructor-arg value="org.hornetq.integration.transports.netty.NettyConnectorFactory" />
   <constructor-arg>
    <map key-type="java.lang.String" value-type="java.lang.Object">
     <entry key="port" value="5445"></entry>
    </map>
   </constructor-arg>
  </bean>
 </constructor-arg>
</bean>
 
<bean name="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
 <property name="connectionFactory" ref="connectionFactory"></property>
</bean>

Поиск JNDI для примера фабрики соединений показан ниже:

1
2
3
4
5
<bean id="inVMConnectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean" depends-on="jmsServerManagerImpl">
 <property name="jndiName">
  <value>java:/ConnectionFactory</value>
 </property>
</bean>

Мы будем использовать метод поиска JNDI для получения фабрики соединений, поэтому добавьте указанную выше конфигурацию в файл applicationContext.xml.

Вот и вся конфигурация, которую мы должны сделать, давайте продолжим реализовывать гипотетический бизнес-кейс, используя нашу недавно интегрированную службу обмена сообщениями. Наше веб-приложение предоставляет функции для добавления, обновления и извлечения данных «сотрудника». Предположим, что мы хотим получать уведомления каждый раз, когда выполняется добавление или изменение данных «сотрудника». Для простоты уведомление будет записано в журнал на консоли Apache — Tomcat . Мы собираемся внедрить производителя JMS для отправки сообщения в очередь «Уведомления» каждый раз, когда пользователь выполняет обновление данных «сотрудника». Кроме того, должен быть реализован потребитель JMS для обработки сообщений очереди «Уведомления» и входа в консоль.

Чтобы создать очередь «Уведомления» и связать ее с JNDI под именем «/ queue / Notifications», добавьте следующее в файл hornetq-jms.xml:

1
2
3
<queue name="Notifications">
 <entry name="/queue/Notifications"/>
</queue>

Чтобы иметь возможность использовать вновь созданную очередь «Уведомления» через компоненты Spring , добавьте следующую директиву поиска JNDI в файл applicationContext.xml:

1
2
3
4
5
<bean id="notificationsQueue" class="org.springframework.jndi.JndiObjectFactoryBean" depends-on="jmsServerManagerImpl">
 <property name="jndiName">
  <value>/queue/Notifications</value>
 </property>
</bean>

Поскольку как JMS- производитель, так и потребитель являются компонентами на стороне сервера, они должны быть помещены в подпакет / server нашего приложения. Мы решили создать их в подпакете / server / utils, потому что они по своей природе являются служебными классами. Примеры классов JMS производителей и потребителей приведены ниже:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package com.javacodegeeks.gwtspring.server.utils;
 
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
 
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
 
@Service("notificationsProducer")
public class NotificationsProducer {
 
 @Autowired
 Queue notificationsQueue;
 
 @Autowired
 ConnectionFactory inVMConnectionFactory;
 
 private Connection notificationsQueueConnection;
 private Session notificationsQueueSession;
 private MessageProducer notificationsQueueProducer;
 
 
 @PostConstruct
 public void init() throws Exception {
  notificationsQueueConnection = inVMConnectionFactory.createConnection();
  notificationsQueueSession = notificationsQueueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  notificationsQueueProducer = notificationsQueueSession.createProducer(notificationsQueue);
  notificationsQueueProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
 }
 
 @PreDestroy
 public void destroy() throws Exception {
  if(notificationsQueueConnection != null)
   notificationsQueueConnection.close();
 }
 
 public void sendNotification(final String message) throws Exception {
 
  TextMessage textMessage = notificationsQueueSession.createTextMessage(message);
  notificationsQueueProducer.send(textMessage);
 
 }
 
}

И потребитель,

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package com.javacodegeeks.gwtspring.server.utils;
 
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
 
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
 
@Service("notificationsConsumer")
public class NotificationsConsumer implements MessageListener {
 
 @Autowired
 Queue notificationsQueue;
 
 @Autowired
 ConnectionFactory inVMConnectionFactory;
 
 private Connection notificationsQueueConnection;
 
 @PostConstruct
 public void init() throws Exception {
  notificationsQueueConnection = inVMConnectionFactory.createConnection();
  Session notificationsQueueSession = notificationsQueueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  MessageConsumer notificationsQueueConsumer = notificationsQueueSession.createConsumer(notificationsQueue);
  notificationsQueueConsumer.setMessageListener(this);
  notificationsQueueConnection.start();
 }
 
 @PreDestroy
 public void destroy() throws Exception {
  if(notificationsQueueConnection != null)
   notificationsQueueConnection.close();
 }
 
 @Override
 public void onMessage(Message message) {
  if (message instanceof TextMessage) {
   try {
    String text = ((TextMessage) message).getText();
    System.out.println("The Notification Message is : \n" + text);
   } catch (JMSException ex) {
     throw new RuntimeException(ex);
   }
  } else {
    throw new IllegalArgumentException("Message must be of type TextMessage");
  }
 }
 
}

Чтобы завершить наш пример бизнес-кейса, мы должны изменить бин Spring «employeeService», чтобы использовать служебный бин «notificationsProducer» для отправки уведомительных сообщений каждый раз, когда пользователь запрашивает сохранение или обновление данных «employee». Мы используем аннотацию «@Autowire», чтобы подключить «messagesProducer» внутри «employeeService» и вызывать операцию «sendNotification» из «tificationProducer », чтобы отправлять уведомление каждый раз, когда запрашивается операция saveOrUpdateEmployee» «employeeService» «. Полный код показан ниже:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
package com.javacodegeeks.gwtspring.server.services;
 
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
 
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
 
import com.javacodegeeks.gwtspring.server.dao.EmployeeDAO;
import com.javacodegeeks.gwtspring.server.utils.NotificationsProducer;
import com.javacodegeeks.gwtspring.shared.dto.EmployeeDTO;
import com.javacodegeeks.gwtspring.shared.services.EmployeeService;
 
@Service("employeeService")
public class EmployeeServiceImpl implements EmployeeService {
 
 @Autowired
 private EmployeeDAO employeeDAO;
 
 @Autowired
 NotificationsProducer notificationsProducer;
 
 @PostConstruct
 public void init() throws Exception {
 }
 
 @PreDestroy
 public void destroy() {
 }
 
 @Transactional(propagation=Propagation.SUPPORTS, rollbackFor=Exception.class)
 public EmployeeDTO findEmployee(long employeeId) {
 
  return employeeDAO.findById(employeeId);
 
 }
 
 @Transactional(propagation=Propagation.REQUIRED, rollbackFor=Exception.class)
 public void saveEmployee(long employeeId, String name, String surname, String jobDescription) throws Exception {
 
  EmployeeDTO employeeDTO = employeeDAO.findById(employeeId);
 
  if(employeeDTO == null) {
   employeeDTO = new EmployeeDTO(employeeId, name,surname, jobDescription);
   employeeDAO.persist(employeeDTO);
  }
 
 }
 
 @Transactional(propagation=Propagation.REQUIRED, rollbackFor=Exception.class)
 public void updateEmployee(long employeeId, String name, String surname, String jobDescription) throws Exception {
 
  EmployeeDTO employeeDTO = employeeDAO.findById(employeeId);
 
  if(employeeDTO != null) {
   employeeDTO.setEmployeeName(name);
   employeeDTO.setEmployeeSurname(surname);
   employeeDTO.setJob(jobDescription);
  }
 
 }
 
 @Transactional(propagation=Propagation.REQUIRED, rollbackFor=Exception.class)
 public void deleteEmployee(long employeeId) throws Exception {
 
  EmployeeDTO employeeDTO = employeeDAO.findById(employeeId);
 
  if(employeeDTO != null)
   employeeDAO.remove(employeeDTO);
 
 }
 
 @Transactional(propagation=Propagation.REQUIRED, rollbackFor=Exception.class)
 public void saveOrUpdateEmployee(long employeeId, String name, String surname, String jobDescription) throws Exception {
 
  EmployeeDTO employeeDTO = new EmployeeDTO(employeeId, name,surname, jobDescription);
 
  employeeDAO.merge(employeeDTO);
 
  notificationsProducer.sendNotification("Save Or Update Employee with values : \nID : " + employeeId + "\nName : " + name + "\nSurname : " + surname + "\nJob description : " + jobDescription);
 
 }
 
}

Это оно! Для развертывания веб-приложения просто скопируйте папку / war в папку «webapps» Apache — Tomcat . Вы можете изменить имя папки war на любое другое, желательно переименовать его после имени проекта, например, GWTSpringInfinispanHornetQ

Перед запуском приложения не забудьте создать схему базы данных, здесь «javacodegeeks».

Для запуска приложения укажите ваш браузер по следующему адресу

HTTP: // локальный: 8080 / GWTSpringInfinispanHornetQ /

Если все прошло хорошо, вы должны увидеть свою главную веб-страницу. Должны отображаться два текстовых поля, за которыми следует кнопка. В первом текстовом поле вы можете сохранить или обновить сотрудника в базе данных. Введите в качестве входных данных идентификатор, имя, фамилию и описание задания, разделенные пробелом. При нажатии на кнопку «SaveOrUpdate» предоставленная информация будет сохранена в базе данных. Для существующих записей «сотрудник» (с тем же идентификатором) будет выполнено обновление. В обоих случаях журнал регистрации должен быть записан. Формат журнала должен быть следующим:

Уведомление:
Сохранить или обновить сотрудника значениями:
ID: ххх
Имя: ххх
Фамилия: ххх
Описание работы: ххх

Где «ххх» должно быть информацией «сотрудника», которую вы предоставили. Пожалуйста, смотрите файлы журнала (catalina.out). Второе текстовое поле используется для извлечения существующих записей «сотрудника». Введите идентификатор сотрудника и нажмите кнопку «Получить». Если «сотрудник» существует, вы должны увидеть идентификатор «сотрудника», имя, фамилию и описание должности.

Вы можете скачать проект отсюда (необходимые сторонние библиотеки, как описано в начале и предыдущие статьи не включены)

Повеселись!

Джастин

Статьи по Теме :