Эта статья является частью нашего курса Академии под названием Spring Integration для EAI .
В этом курсе вы познакомитесь с шаблонами интеграции корпоративных приложений и с тем, как Spring Integration обращается к ним. Далее вы углубитесь в основы Spring Integration, такие как каналы, преобразователи и адаптеры. Проверьте это здесь !
Содержание
1. Введение
Этот учебник посвящен объяснению того, как мы можем интегрировать наше приложение со средствами Spring Integration и JMS. Для этого я сначала покажу вам, как установить Active MQ, который будет нашим брокером в этом уроке. В следующих разделах будут показаны примеры отправки и получения сообщений JMS с использованием адаптеров каналов JMS Spring Integration. Следуя этим примерам, мы увидим некоторые способы настройки этих вызовов путем настройки преобразования сообщений и разрешения получателя.
В последней части этого руководства кратко показано, как использовать Spring Integration с протоколом AMQP . Он пройдет установку RabbitMQ и в итоге получит базовый пример обмена сообщениями.
Этот урок состоит из следующих разделов:
- Вступление
- Подготовка окружающей среды
- Адаптеры JMS: прием
- JMS-адаптеры: отправка
- Использование шлюзов
- Преобразование сообщений
- Поддерживаемые JMS каналы сообщений
- Динамическое разрешение пункта назначения
- Интеграция AMQP
2. Подготовка окружающей среды
Если вы хотите отправлять сообщения через JMS, вам сначала понадобится брокер. Примеры, включенные в это руководство, выполняются через Active MQ, брокер обмена сообщениями с открытым исходным кодом. В этом разделе я помогу вам установить сервер и реализовать простое приложение Spring, которое проверит, что все настроено правильно. Объяснение основано на системе Windows. Если у вас уже установлен сервер, просто пропустите этот раздел.
Первым шагом является загрузка сервера Apache MQ с Apache.org . После загрузки просто распакуйте его в папку по вашему выбору.
Для запуска сервера вам просто нужно выполнить файл activemq, который находится в папке apache-activemq-5.9.0 \ bin .
Хорошо, сервер работает. Теперь нам просто нужно реализовать приложение. Мы собираемся создать производителя, потребителя, файл конфигурации Spring и тест.
Производитель
Вы можете использовать любой класс Java вместо моего объекта TicketOrder .
|
01
02
03
04
05
06
07
08
09
10
11
12
13
|
public class JmsProducer { @Autowired @Qualifier("jmsTemplate") private JmsTemplate jmsTemplate; public void convertAndSendMessage(TicketOrder order) { jmsTemplate.convertAndSend(order); } public void convertAndSendMessage(String destination, TicketOrder order) { jmsTemplate.convertAndSend(destination, order); }} |
Потребитель
|
1
2
3
4
5
6
7
8
|
public class SyncConsumer { @Autowired private JmsTemplate jmsTemplate; public TicketOrder receive() { return (TicketOrder) jmsTemplate.receiveAndConvert("test.sync.queue"); }} |
Конфигурационный файл Spring
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
|
<bean id="consumer" class="xpadro.spring.integration.consumer.SyncConsumer"/><bean id="producer" class="xpadro.spring.integration.producer.JmsProducer"/><!-- Infrastructure --><bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://localhost:61616" /></bean><bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <property name="targetConnectionFactory" ref="connectionFactory"/></bean><bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="cachingConnectionFactory"/> <property name="defaultDestination" ref="syncTestQueue"/></bean><!-- Destinations --><bean id="syncTestQueue" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg value="test.sync.queue"/></bean> |
Тест
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
@ContextConfiguration(locations = {"/xpadro/spring/integration/test/jms-config.xml"})@RunWith(SpringJUnit4ClassRunner.class)public class TestJmsConfig { @Autowired private JmsProducer producer; @Autowired private SyncConsumer consumer; @Test public void testReceiving() throws InterruptedException, RemoteException { TicketOrder order = new TicketOrder(1, 5, new Date()); //Sends the message to the jmsTemplate's default destination producer.convertAndSendMessage(order); Thread.sleep(2000); TicketOrder receivedOrder = consumer.receive(); assertNotNull(receivedOrder); assertEquals(1, receivedOrder.getFilmId()); assertEquals(5, receivedOrder.getQuantity()); }} |
Если тест пройден, у вас все правильно настроено. Теперь мы можем перейти к следующему разделу.
3. JMS-адаптеры: прием
Spring Integration предоставляет несколько адаптеров и шлюзов для получения сообщений из очереди или темы JMS. Эти адаптеры кратко обсуждаются ниже:
- Адаптер входящего канала . Он внутренне использует шаблон JmsTemplate для активного получения сообщений из очереди или темы JMS.
- Канальный управляемый сообщениями адаптер : он внутренне использует контейнер Spring MessageListener для пассивного получения сообщений.
3.1. Адаптеры входящего канала: активный прием
В этом разделе объясняется, как использовать первый адаптер, описанный в предыдущем разделе.
Адаптер входящего канала JMS активно опрашивает очередь, чтобы извлечь из нее сообщения. Так как он использует опросчик, вам придется определить его в конфигурационном файле Spring. Как только адаптер извлечет сообщение, оно будет отправлено в систему обмена сообщениями через указанный канал сообщений. Затем мы можем обработать сообщение с использованием конечных точек, таких как преобразователи, фильтры и т. Д. Или отправить его активатору службы.
В этом примере извлекается сообщение о заказе билета из очереди JMS и отправляется его активатору службы, который обработает его и подтвердит заказ. Заказ подтверждается отправкой его в какое-то хранилище, в котором есть простой список со всеми зарегистрированными заказами.
Мы используем того же производителя, что и в разделе «2 подготовки окружающей среды»:
|
01
02
03
04
05
06
07
08
09
10
|
<bean id="producer" class="xpadro.spring.integration.producer.JmsProducer"/><!-- Infrastructure --><!-- Connection factory and jmsTemplate configuration --><!-- as seen in the second section --><!-- Destinations --><bean id="toIntQueue" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg value="int.sync.queue"/></bean> |
Тест будет использовать производителя для отправки сообщения в «toIntQueue». Теперь мы собираемся настроить конфигурацию Spring Integration:
интеграция-jms.xml
|
1
2
3
4
5
6
7
8
9
|
<context:component-scan base-package="xpadro.spring.integration"/><int-jms:inbound-channel-adapter id="jmsAdapter" destination="toIntQueue" channel="jmsChannel"/><int:channel id="jmsChannel"/><int:service-activator method="processOrder" input-channel="jmsChannel" ref="ticketProcessor"/><int:poller id="poller" default="true" fixed-delay="1000"/> |
Адаптер входящего канала JMS будет использовать определенный опросчик для извлечения сообщений из «toIntQueue». Для адаптера необходимо настроить опросник, иначе он выдаст исключение времени выполнения. В этом случае мы определили поллер по умолчанию. Это означает, что любая конечная точка, которая нуждается в опросчике, будет использовать эту. Если вы не настроите опрашивающий по умолчанию, вам нужно будет определить определенный опрашивающий для каждой конечной точки, которая активно получает сообщения.
Потребитель
Активатор службы — это просто bean-компонент (автоматически определяется при сканировании компонента):
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
@Component("ticketProcessor")public class TicketProcessor { private static final Logger logger = LoggerFactory.getLogger(TicketProcessor.class); private static final String ERROR_INVALID_ID = "Order ID is invalid"; @Autowired private OrderRepository repository; public void processOrder(TicketOrder order) { logger.info("Processing order {}", order.getFilmId()); if (isInvalidOrder(order)) { logger.info("Error while processing order [{}]", ERROR_INVALID_ID); throw new InvalidOrderException(ERROR_INVALID_ID); } float amount = 5.95f * order.getQuantity(); TicketConfirmation confirmation = new TicketConfirmation("123", order.getFilmId(), order.getOrderDate(), order.getQuantity(), amount); repository.confirmOrder(confirmation); } private boolean isInvalidOrder(TicketOrder order) { if (order.getFilmId() == -1) { return true; } return false; }} |
В предыдущем фрагменте кода метод TicketOrder получает объект TicketOrder и непосредственно обрабатывает его. Однако вместо этого вы можете определить Message <?> Или Message <TicketOrder>, чтобы получить сообщение. Таким образом, вы будете иметь доступ как к полезной нагрузке сообщения, так и к его заголовкам.
Также обратите внимание, что метод возвращает void. Нам не нужно ничего возвращать, поскольку поток сообщений заканчивается здесь. При необходимости вы также можете определить канал ответа для сервисного адаптера и вернуть подтверждение. Кроме того, мы бы затем подписали конечную точку или шлюз на этот канал ответа, чтобы отправить подтверждение в другую очередь JMS, отправить его веб-службе или сохранить, например, в базе данных.
Наконец, давайте посмотрим на тест, чтобы увидеть, как он все выполняется:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
@ContextConfiguration(locations = {"/xpadro/spring/integration/test/jms-config.xml", "/xpadro/spring/integration/test/int-jms-config.xml"})@RunWith(SpringJUnit4ClassRunner.class)public class TestIntegrationJmsConfig { @Autowired private JmsProducer producer; @Autowired private OrderRepository repository; @Test public void testSendToIntegration() throws InterruptedException, RemoteException { TicketOrder order = new TicketOrder(1, 5, new Date()); //Sends the message to the jmsTemplate's default destination producer.convertAndSendMessage("int.sync.queue", order); Thread.sleep(4000); assertEquals(1, repository.getConfirmations().size()); assertNotNull(repository.getConfirmations().get(0)); TicketConfirmation conf = repository.getConfirmations().get(0); assertEquals("123", conf.getId()); }} |
Я установил Thread.sleep в четыре секунды, чтобы дождаться отправки сообщения. Мы могли бы использовать цикл while, чтобы проверить, получено ли сообщение до истечения времени ожидания.
3.2. Адаптеры входящего канала: пассивный прием
Во второй части раздела приема JMS используется адаптер канала, управляемый сообщениями. Таким образом, как только сообщение будет отправлено в очередь, оно будет доставлено адаптеру без необходимости использования программ опроса. Именно по каналу сообщений мы доставляем сообщение своим подписчикам.
Пример очень похож на тот, что мы видели в предыдущем разделе. Я просто покажу, какие изменения внесены в конфигурацию.
Единственное, что я изменил в предыдущем примере, — это конфигурация пружинной интеграции:
|
1
2
3
4
5
6
7
|
<context:component-scan base-package="xpadro.spring.integration"/><int-jms:message-driven-channel-adapter id="jmsAdapter" destination="toIntQueue" channel="jmsChannel" /><int:channel id="jmsChannel"/><int:service-activator method="processOrder" input-channel="jmsChannel" ref="ticketProcessor"/> |
Я удалил модуль опроса и изменил входящий адаптер JMS для адаптера канала, управляемого сообщениями. Это оно; адаптер будет пассивно получать сообщения и доставлять их в jmsChannel .
Учтите, что адаптеру приемника сообщений требуется хотя бы одна из следующих комбинаций:
- Контейнер слушателя сообщений.
- Фабрика связи и пункт назначения.
В нашем примере мы использовали второй вариант. Место назначения указывается в конфигурации адаптера, а фабрика соединений определяется в файле jms-config, который также импортируется тестом.
4. JMS-адаптеры: отправка
В предыдущем разделе мы видели, как получать сообщения, отправленные в очередь JMS внешней системой. В этом разделе показаны адаптеры исходящих каналов, которые позволяют отправлять сообщения JMS за пределы нашей системы.
В отличие от входящих адаптеров, существует только один тип исходящих адаптеров. Этот адаптер использует внутренний JmsTemplate для отправки сообщения, и для настройки этого адаптера вам нужно будет указать хотя бы одно из следующего:
- Шаблон Jms.
- Фабрика связи и пункт назначения.
Как и во входящем примере, мы используем второй вариант для отправки сообщения в очередь JMS. Конфигурация выглядит следующим образом:
Для этого примера мы собираемся создать новую очередь в конфигурации jms (jms-config.xml). Именно здесь наше приложение Spring Integration отправит сообщение:
|
1
2
3
|
<bean id="toJmsQueue" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg value="to.jms.queue"/></bean> |
Хорошо, теперь мы настраиваем конфигурацию интеграции с исходящим адаптером JMS:
|
1
2
3
4
5
6
7
8
|
<context:component-scan base-package="xpadro.spring.integration"/><int:gateway default-request-channel="requestChannel" service-interface="xpadro.spring.integration.service.TicketService"/> <int:channel id="requestChannel"/><int-jms:outbound-channel-adapter id="jmsAdapter" channel="requestChannel" destination="toJmsQueue"/> |
Мы используем шлюз в качестве входа в нашу систему обмена сообщениями. Тест будет использовать этот интерфейс для отправки нового объекта TicketOrder . Шлюз получит сообщение и поместит его в канал requestChannel . Поскольку это прямой канал , он будет отправлен на адаптер исходящего канала JMS.
Адаптер получает сообщение интеграции Spring. Затем он может отправить сообщение двумя способами:
- Преобразуйте сообщение в сообщение JMS. Это делается путем установки атрибута адаптера «extract-payload» в значение true, которое является значением по умолчанию. Это вариант, который мы использовали в примере.
- Отправьте сообщение как есть, сообщение интеграции Spring. Вы можете сделать это, установив для атрибута «extract-payload» значение false.
Это решение зависит от того, какая система ожидает вашего сообщения. Если другое приложение является приложением Spring Integration, вы можете использовать второй подход. В противном случае используйте значение по умолчанию. В нашем примере с другой стороны есть простое Spring JMS-приложение. Таким образом, мы должны выбрать первый вариант.
Продолжая наш пример, теперь мы посмотрим на тест, который использует интерфейс шлюза для отправки сообщения и пользовательский потребитель для его получения. В этом тесте потребитель будет играть роль приложения JMS, которое использует jmsTemplate для извлечения его из очереди JMS:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
|
@ContextConfiguration(locations = {"/xpadro/spring/integration/test/jms-config.xml", "/xpadro/spring/integration/test/int-jms-out-config.xml"})@RunWith(SpringJUnit4ClassRunner.class)public class TestIntegrationJmsOutboundConfig { @Autowired private SyncConsumer consumer; @Autowired private TicketService service; @Test public void testSendToJms() throws InterruptedException, RemoteException { TicketOrder order = new TicketOrder(1, 5, new Date()); service.sendOrder(order); TicketOrder receivedOrder = consumer.receive("to.jms.queue"); assertNotNull(receivedOrder); assertEquals(1, receivedOrder.getFilmId()); assertEquals(5, receivedOrder.getQuantity()); }} |
5. Использование шлюзов
Помимо адаптеров каналов Spring Integration предоставляет входящие и исходящие шлюзы. Как вы помните из предыдущих руководств, шлюзы обеспечивают двунаправленную связь с внешними системами, то есть операции отправки и получения или получения и ответа. В этом случае он разрешает операции запроса или повторной попытки.
В этом разделе мы рассмотрим пример использования исходящего шлюза JMS. Шлюз отправит сообщение JMS в очередь и будет ожидать ответа. Если ответ не отправляется обратно, шлюз выдаст исключение MessageTimeoutException .
Конфигурация Spring Integration
|
01
02
03
04
05
06
07
08
09
10
11
12
13
|
<context:component-scan base-package="xpadro.spring.integration"/><int:gateway id="inGateway" default-request-channel="requestChannel" service-interface="xpadro.spring.integration.service.TicketService"/> <int:channel id="requestChannel"/><int-jms:outbound-gateway id="outGateway" request-destination="toAsyncJmsQueue" request-channel="requestChannel" reply-channel="jmsReplyChannel"/><int:channel id="jmsReplyChannel"/><int:service-activator method="registerOrderConfirmation" input-channel="jmsReplyChannel" ref="ticketProcessor"/> |
Поток выглядит следующим образом:
- TicketOrder, включенный в Spring Integration Message, попадет в систему обмена сообщениями через шлюз inGateway.
- Шлюз поместит сообщение в канал requestChannel.
- Канал отправляет сообщение своей подписанной конечной точке, исходящему шлюзу JMS.
- Исходящий шлюз JMS извлекает полезную нагрузку из сообщения и переносит его в сообщение JMS.
- Шлюз отправляет сообщение и ожидает ответа.
- Когда приходит ответ, в форме TicketConfirmation, заключенного в сообщение JMS, шлюз получает полезную нагрузку и упаковывает ее в сообщение Spring Integration.
- Сообщение отправляется на канал «jmsReplyChannel», где активатор службы (TicketProcessor) обработает его и зарегистрирует в нашем OrderRepository.
Процессор заказа довольно прост. Он получает TicketConfirmation и добавляет его в хранилище билетов:
|
1
2
3
4
5
6
7
8
9
|
@Component("ticketProcessor")public class TicketProcessor { @Autowired private OrderRepository repository; public void registerOrderConfirmation(TicketConfirmation confirmation) { repository.confirmOrder(confirmation); }} |
Тест
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
|
@RunWith(SpringJUnit4ClassRunner.class)public class TestIntegrationJmsOutGatewayConfig { @Autowired private OrderRepository repository; @Autowired private TicketService service; @Test public void testSendToJms() throws InterruptedException, RemoteException { TicketOrder order = new TicketOrder(1, 5, new Date()); service.sendOrder(order); Thread.sleep(4000); assertEquals(1, repository.getConfirmations().size()); assertNotNull(repository.getConfirmations().get(0)); TicketConfirmation conf = repository.getConfirmations().get(0); assertEquals("321", conf.getId()); }} |
Внешняя система
Чтобы полностью понять пример, я покажу вам, что происходит, когда сообщение доставляется в очередь JMS.
Прослушивая очередь, куда сообщение было отправлено Spring Integration, есть прослушиватель asyncConsumer :
|
1
2
3
4
5
6
7
8
|
<bean id="toAsyncJmsQueue" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg value="to.async.jms.queue"/></bean><!-- Listeners --><jms:listener-container connection-factory="connectionFactory"> <jms:listener destination="to.async.jms.queue" ref="asyncConsumer"/></jms:listener-container> |
Слушатель получает сообщение, создает новое сообщение с подтверждением заявки и отвечает. Обратите внимание, что мы должны установить идентификатор корреляции ответного сообщения с тем же значением, что и сообщение запроса. Это позволит клиенту узнать, на какое сообщение мы отвечаем. Кроме того, мы устанавливаем пункт назначения для канала ответа, настроенного в сообщении запроса.
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
@Component("asyncConsumer")public class AsyncConsumer implements MessageListener { @Autowired private JmsTemplate template; @Override public void onMessage(Message order) { final Message msgOrder = order; TicketOrder orderObject; try { orderObject = (TicketOrder) ((ObjectMessage) order).getObject(); } catch (JMSException e) { throw JmsUtils.convertJmsAccessException(e); } float amount = 5.95f * orderObject.getQuantity(); TicketConfirmation confirmation = new TicketConfirmation("321", orderObject.getFilmId(), orderObject.getOrderDate(), orderObject.getQuantity(), amount); try { template.convertAndSend(msgOrder.getJMSReplyTo(), confirmation, new MessagePostProcessor() { public Message postProcessMessage(Message message) throws JMSException { message.setJMSCorrelationID(msgOrder.getJMSCorrelationID()); return message; } }); } catch (JmsException | JMSException e) { throw JmsUtils.convertJmsAccessException((JMSException) e); } }} |
6. Преобразование сообщений
И адаптеры каналов сообщений, и шлюзы используют конвертер сообщений для преобразования входящих сообщений в типы Java или наоборот. Конвертер должен реализовывать интерфейс MessageConverter:
|
1
2
3
4
5
6
7
|
public interface MessageConverter { <P> Message<P> toMessage(Object object); <P> Object fromMessage(Message<P> message);} |
Spring Integration поставляется с двумя реализациями интерфейса MessageConverter :
MapMessageConverter
Его метод fromMessage создает новый HashMap с двумя ключами:
- полезная нагрузка: значение — полезная нагрузка сообщения (
message.getPayload). - headers: значение — это еще одна HashMap со всеми заголовками из исходного сообщения.
Метод «toMessage» ожидает экземпляр Map с той же структурой (ключи полезной нагрузки и заголовки) и создает сообщение Spring Integration.
SimpleMessageConverter
Это конвертер по умолчанию, используемый адаптерами и шлюзами. Вы можете видеть из исходного кода, что он конвертирует из / в объект:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
|
public Message<?> toMessage(Object object) throws Exception { if (object == null) { return null; } if (object instanceof Message<?>) { return (Message<?>) object; } return MessageBuilder.withPayload(object).build();}public Object fromMessage(Message<?> message) throws Exception { return (message != null) ? message.getPayload() : null;} |
В любом случае, если вам нужна собственная реализация, вы можете указать свой собственный конвертер в конфигурации канала или шлюза. Например, используя шлюз:
|
1
2
3
|
<int-jms:outbound-gateway id="outGateway" request-destination="toAsyncJmsQueue" request-channel="requestChannel" reply-channel="jmsReplyChannel" message-converter="myConverter"/> |
Просто помните, что ваш конвертер должен реализовывать MessageConverter:
|
1
2
|
@Component("myConverter")public class MyConverter implements MessageConverter { |
7. Каналы сообщений, поддерживаемые JMS
Канальные адаптеры и шлюзы используются для связи с внешними системами. Каналы сообщений с поддержкой JMS используются для отправки и получения сообщений JMS между потребителями и производителями, которые находятся в одном приложении. Хотя в этой ситуации мы все еще можем использовать канальные адаптеры, гораздо проще использовать канал JMS. Отличие от канала сообщений интеграции заключается в том, что канал JMS будет использовать JMS-брокер для отправки сообщения. Это означает, что сообщение не будет просто сохранено в канале в памяти. Вместо этого он будет отправлен поставщику JMS, что позволит также использовать транзакции. Если вы используете транзакции, это будет работать следующим образом:
- Производитель, отправляющий сообщение в канал, поддерживаемый JMS, не запишет его, если откат транзакции.
- Потребитель, подписанный на канал, поддерживаемый JMS, не удалит из него сообщение, если транзакция откатывается.
Для этой функции в Spring Integration предусмотрены оба канала: точка-точка и каналы публикации / подписки. Они настроены ниже:
прямой канал точка-точка
|
1
|
<int-jms:channel id="jmsChannel" queue="myQueue"/> |
канал публикации / подписки
|
1
|
<int-jms:publish-subscribe-channel id="jmsChannel" topic="myTopic"/> |
В следующем примере мы видим простое приложение с двумя конечными точками, связывающимися друг с другом с использованием канала с поддержкой JMS.
конфигурация
Сообщения, отправленные в систему обмена сообщениями (объекты TicketOrder ), поступают к активатору службы, обработчику билетов. Затем этот процессор отправляет заказ ( sendJMS ) в сообщение с поддержкой JMS. Подписанный на этот канал, есть тот же процессор, который будет получать сообщение ( receiveJms ), обрабатывать его, создавая TicketConfirmation и регистрируя его в хранилище билетов:
|
01
02
03
04
05
06
07
08
09
10
11
12
|
<context:component-scan base-package="xpadro.spring.integration"/><int:gateway default-request-channel="requestChannel" service-interface="xpadro.spring.integration.service.TicketService"/> <int:channel id="requestChannel"/><int:service-activator method="sendJms" input-channel="requestChannel" output-channel="jmsChannel" ref="ticketJmsProcessor"/><int-jms:channel id="jmsChannel" queue="syncTestQueue"/><int:service-activator method="receiveJms" input-channel="jmsChannel" ref="ticketJmsProcessor"/> |
Процессор
Реализует оба метода: sendJms и receiveJms :
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
|
@Component("ticketJmsProcessor")public class TicketJmsProcessor { private static final Logger logger = LoggerFactory.getLogger(TicketJmsProcessor.class); @Autowired private OrderRepository repository; public TicketOrder sendJms(TicketOrder order) { logger.info("Sending order {}", order.getFilmId()); return order; } public void receiveJms(TicketOrder order) { logger.info("Processing order {}", order.getFilmId()); float amount = 5.95f * order.getQuantity(); TicketConfirmation confirmation = new TicketConfirmation("123", order.getFilmId(), order.getOrderDate(), order.getQuantity(), amount); repository.confirmOrder(confirmation); }} |
Тест
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
@ContextConfiguration(locations = {"/xpadro/spring/integration/test/jms-config.xml", "/xpadro/spring/integration/test/int-jms-jms-config.xml"})@RunWith(SpringJUnit4ClassRunner.class)public class TestIntegrationJmsToJmsConfig { @Autowired private OrderRepository repository; @Autowired private TicketService service; @Test public void testSendToJms() throws InterruptedException, RemoteException { TicketOrder order = new TicketOrder(1, 5, new Date()); service.sendOrder(order); Thread.sleep(4000); assertEquals(1, repository.getConfirmations().size()); assertNotNull(repository.getConfirmations().get(0)); TicketConfirmation conf = repository.getConfirmations().get(0); assertEquals("123", conf.getId()); }} |
Каналы с поддержкой JMS предлагают различные возможности, такие как настройка имени очереди вместо ссылки на очередь или использование резолвера назначения:
|
1
2
|
<int-jms:channel id="jmsChannel" queue-name="myQueue" destination-resolver="myDestinationResolver"/> |
8. Динамическое разрешение пункта назначения
Определитель адресатов — это класс, который позволяет нам преобразовывать имена адресатов в JMS-адреса. Любой конечный распознаватель должен реализовывать следующий интерфейс:
|
1
2
3
4
|
public interface DestinationResolver { Destination resolveDestinationName(Session session, String destinationName, boolean pubSubDomain) throws JMSException;} |
Определители получателей могут быть указаны на адаптерах каналов JMS, шлюзах JMS и каналах с поддержкой JMS. Если вы не сконфигурируете целевой преобразователь явно, Spring будет использовать реализацию по умолчанию, то есть DynamicDestinationResolver . Этот преобразователь объясняется ниже как другие реализации, предоставляемые Spring:
- DynamicDestinationResolver : Разрешает имена получателей в качестве динамических назначений, используя стандартные методы JMS Session.createTopic и Session.createQueue.
- BeanFactoryDestinationResolver : он будет искать в контексте Spring бин с именем, таким как имя получателя, и ожидает, что он будет иметь тип javax.jms.Destination . Если он не может его найти, он выдаст исключение DestinationResolutionException .
- JndiDestinationResolver : предполагается, что имя получателя является местоположением JNDI.
Если мы не хотим использовать динамический распознаватель по умолчанию, мы можем реализовать собственный распознаватель и настроить его в нужной конечной точке. Например, следующий канал с поддержкой JMS использует другую реализацию:
|
1
2
|
<int-jms:channel id="jmsChannel" queue-name="myQueue" destination-resolver="myDestinationResolver"/> |
9. Интеграция AMQP
9.1. Установка
Для установки и запуска сервера RabbitMQ вам просто нужно выполнить шаги, описанные ниже. Если у вас уже установлен сервер, просто пропустите этот раздел.
- Первым шагом является установка erlang, необходимого для сервера RabbitMQ. Перейдите по следующему URL-адресу, загрузите версию своей системы и установите ее:
|
1
|
> rabbitmq-plugins enable rabbitmq_management |
|
1
|
> rabbitmq-server.bat |
Хорошо, теперь мы проверим, что RabbitMQ правильно установлен. Перейдите на http: // localhost: 15672 и войдите, используя «guest» в качестве имени пользователя и пароля. Если вы используете версию до 3.0, то порт будет 55672.
Если вы видите веб-интерфейс, то все готово.
9.2. Демо приложение
Чтобы использовать AMQP с Spring Integration, нам нужно добавить следующие зависимости в наш файл pom.xml:
Spring AMQP (для кролика MQ)
|
1
2
3
4
5
|
<dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>1.3.1.RELEASE</version></dependency> |
Конечные точки AMQP Spring Integration
|
1
2
3
4
5
|
<dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-amqp</artifactId> <version>3.0.2.RELEASE</version></dependency> |
Теперь мы собираемся создать новый файл конфигурации amqp-config.xml, который будет содержать конфигурацию rabbitMQ (например, jms-config для JMS, которую мы использовали ранее в этом руководстве).
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
|
xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"> <rabbit:connection-factory id="connectionFactory" /> <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" /> <rabbit:admin connection-factory="connectionFactory" /> <rabbit:queue name="rabbit.queue" /> <rabbit:direct-exchange name="rabbit.exchange"> <rabbit:bindings> <rabbit:binding queue="rabbit.queue" key="rabbit.key.binding" /> </rabbit:bindings> </rabbit:direct-exchange></beans> |
Следующий файл — это файл интеграции Spring, который содержит каналы и адаптеры каналов:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd http://www.springframework.org/schema/integration/amqp http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd"> <context:component-scan base-package="xpadro.spring.integration.amqp"/> <int:gateway default-request-channel="requestChannel" service-interface="xpadro.spring.integration.amqp.service.AMQPService"/> <int:channel id="requestChannel"/> <int-amqp:outbound-channel-adapter channel="requestChannel" amqp-template="amqpTemplate" exchange-name="rabbit.exchange" routing-key="rabbit.key.binding"/> <int-amqp:inbound-channel-adapter channel="responseChannel" queue-names="rabbit.queue" connection-factory="connectionFactory" /> <int:channel id="responseChannel"/> <int:service-activator ref="amqpProcessor" method="process" input-channel="responseChannel"/> </beans> |
Поток выглядит следующим образом:
- Тестовое приложение отправляет сообщение, которое будет простой строкой, на шлюз.
- От шлюза он достигнет адаптера исходящего канала через канал «requestChannel».
- Адаптер исходящего канала отправляет сообщение в очередь «rabbit.queue».
- Подписавшись на эту очередь «rabbit.queue», мы настроили адаптер входящего канала. Он будет получать сообщения, отправленные в очередь.
- Сообщение отправляется активатору услуги по каналу responseChannel.
- Сервисный активатор просто печатает сообщение.
Шлюз, который служит точкой входа в систему обмена сообщениями, содержит единственный метод:
|
1
2
3
4
|
public interface AMQPService { @Gateway public void sendMessage(String message);} |
Активатор службы amqpProcessor очень прост; он получает сообщение и печатает свою полезную нагрузку:
|
1
2
3
4
5
6
7
|
@Component("amqpProcessor")public class AmqpProcessor { public void process(Message<String> msg) { System.out.println("Message received: "+msg.getPayload()); }} |
Чтобы закончить с примером, вот приложение, которое инициирует поток, вызывая сервис, обернутый шлюзом:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
|
@ContextConfiguration(locations = {"/xpadro/spring/integration/test/amqp-config.xml", "/xpadro/spring/integration/test/int-amqp-config.xml"})@RunWith(SpringJUnit4ClassRunner.class)public class TestIntegrationAMQPConfig { @Autowired private AMQPService service; @Test public void testSendToJms() throws InterruptedException, RemoteException { String msg = "hello"; service.sendMessage(msg); Thread.sleep(2000); }} |
