Эта статья является частью нашего курса Академии под названием Spring Integration для EAI .
В этом курсе вы познакомитесь с шаблонами интеграции корпоративных приложений и с тем, как Spring Integration обращается к ним. Далее вы углубитесь в основы Spring Integration, такие как каналы, преобразователи и адаптеры. Проверьте это здесь !
Содержание
1. Введение
Этот учебник посвящен объяснению того, как мы можем интегрировать наше приложение со средствами Spring Integration и JMS. Для этого я сначала покажу вам, как установить Active MQ, который будет нашим брокером в этом уроке. В следующих разделах будут показаны примеры отправки и получения сообщений JMS с использованием адаптеров каналов JMS Spring Integration. Следуя этим примерам, мы увидим некоторые способы настройки этих вызовов путем настройки преобразования сообщений и разрешения получателя.
В последней части этого руководства кратко показано, как использовать Spring Integration с протоколом AMQP . Он пройдет установку RabbitMQ и в итоге получит базовый пример обмена сообщениями.
Этот урок состоит из следующих разделов:
- Вступление
- Подготовка окружающей среды
- Адаптеры JMS: прием
- JMS-адаптеры: отправка
- Использование шлюзов
- Преобразование сообщений
- Поддерживаемые JMS каналы сообщений
- Динамическое разрешение пункта назначения
- Интеграция AMQP
2. Подготовка окружающей среды
Если вы хотите отправлять сообщения через JMS, вам сначала понадобится брокер. Примеры, включенные в это руководство, выполняются через Active MQ, брокер обмена сообщениями с открытым исходным кодом. В этом разделе я помогу вам установить сервер и реализовать простое приложение Spring, которое проверит, что все настроено правильно. Объяснение основано на системе Windows. Если у вас уже установлен сервер, просто пропустите этот раздел.
Первым шагом является загрузка сервера Apache MQ с Apache.org . После загрузки просто распакуйте его в папку по вашему выбору.
Для запуска сервера вам просто нужно выполнить файл activemq, который находится в папке apache-activemq-5.9.0 \ bin .
Хорошо, сервер работает. Теперь нам просто нужно реализовать приложение. Мы собираемся создать производителя, потребителя, файл конфигурации Spring и тест.
Производитель
Вы можете использовать любой класс Java вместо моего объекта TicketOrder
.
01
02
03
04
05
06
07
08
09
10
11
12
13
|
public class JmsProducer { @Autowired @Qualifier ( "jmsTemplate" ) private JmsTemplate jmsTemplate; public void convertAndSendMessage(TicketOrder order) { jmsTemplate.convertAndSend(order); } public void convertAndSendMessage(String destination, TicketOrder order) { jmsTemplate.convertAndSend(destination, order); } } |
Потребитель
1
2
3
4
5
6
7
8
|
public class SyncConsumer { @Autowired private JmsTemplate jmsTemplate; public TicketOrder receive() { return (TicketOrder) jmsTemplate.receiveAndConvert( "test.sync.queue" ); } } |
Конфигурационный файл Spring
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
|
< bean id = "consumer" class = "xpadro.spring.integration.consumer.SyncConsumer" /> < bean id = "producer" class = "xpadro.spring.integration.producer.JmsProducer" /> <!-- Infrastructure --> < bean id = "connectionFactory" class = "org.apache.activemq.ActiveMQConnectionFactory" > < property name = "brokerURL" value = "tcp://localhost:61616" /> </ bean > < bean id = "cachingConnectionFactory" class = "org.springframework.jms.connection.CachingConnectionFactory" > < property name = "targetConnectionFactory" ref = "connectionFactory" /> </ bean > < bean id = "jmsTemplate" class = "org.springframework.jms.core.JmsTemplate" > < property name = "connectionFactory" ref = "cachingConnectionFactory" /> < property name = "defaultDestination" ref = "syncTestQueue" /> </ bean > <!-- Destinations --> < bean id = "syncTestQueue" class = "org.apache.activemq.command.ActiveMQQueue" > < constructor-arg value = "test.sync.queue" /> </ bean > |
Тест
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
@ContextConfiguration (locations = { "/xpadro/spring/integration/test/jms-config.xml" }) @RunWith (SpringJUnit4ClassRunner. class ) public class TestJmsConfig { @Autowired private JmsProducer producer; @Autowired private SyncConsumer consumer; @Test public void testReceiving() throws InterruptedException, RemoteException { TicketOrder order = new TicketOrder( 1 , 5 , new Date()); //Sends the message to the jmsTemplate's default destination producer.convertAndSendMessage(order); Thread.sleep( 2000 ); TicketOrder receivedOrder = consumer.receive(); assertNotNull(receivedOrder); assertEquals( 1 , receivedOrder.getFilmId()); assertEquals( 5 , receivedOrder.getQuantity()); } } |
Если тест пройден, у вас все правильно настроено. Теперь мы можем перейти к следующему разделу.
3. JMS-адаптеры: прием
Spring Integration предоставляет несколько адаптеров и шлюзов для получения сообщений из очереди или темы JMS. Эти адаптеры кратко обсуждаются ниже:
- Адаптер входящего канала . Он внутренне использует шаблон JmsTemplate для активного получения сообщений из очереди или темы JMS.
- Канальный управляемый сообщениями адаптер : он внутренне использует контейнер Spring MessageListener для пассивного получения сообщений.
3.1. Адаптеры входящего канала: активный прием
В этом разделе объясняется, как использовать первый адаптер, описанный в предыдущем разделе.
Адаптер входящего канала JMS активно опрашивает очередь, чтобы извлечь из нее сообщения. Так как он использует опросчик, вам придется определить его в конфигурационном файле Spring. Как только адаптер извлечет сообщение, оно будет отправлено в систему обмена сообщениями через указанный канал сообщений. Затем мы можем обработать сообщение с использованием конечных точек, таких как преобразователи, фильтры и т. Д. Или отправить его активатору службы.
В этом примере извлекается сообщение о заказе билета из очереди JMS и отправляется его активатору службы, который обработает его и подтвердит заказ. Заказ подтверждается отправкой его в какое-то хранилище, в котором есть простой список со всеми зарегистрированными заказами.
Мы используем того же производителя, что и в разделе «2 подготовки окружающей среды»:
01
02
03
04
05
06
07
08
09
10
|
< bean id = "producer" class = "xpadro.spring.integration.producer.JmsProducer" /> <!-- Infrastructure --> <!-- Connection factory and jmsTemplate configuration --> <!-- as seen in the second section --> <!-- Destinations --> < bean id = "toIntQueue" class = "org.apache.activemq.command.ActiveMQQueue" > < constructor-arg value = "int.sync.queue" /> </ bean > |
Тест будет использовать производителя для отправки сообщения в «toIntQueue». Теперь мы собираемся настроить конфигурацию Spring Integration:
интеграция-jms.xml
1
2
3
4
5
6
7
8
9
|
< context:component-scan base-package = "xpadro.spring.integration" /> < int-jms:inbound-channel-adapter id = "jmsAdapter" destination = "toIntQueue" channel = "jmsChannel" /> < int:channel id = "jmsChannel" /> < int:service-activator method = "processOrder" input-channel = "jmsChannel" ref = "ticketProcessor" /> < int:poller id = "poller" default = "true" fixed-delay = "1000" /> |
Адаптер входящего канала JMS будет использовать определенный опросчик для извлечения сообщений из «toIntQueue». Для адаптера необходимо настроить опросник, иначе он выдаст исключение времени выполнения. В этом случае мы определили поллер по умолчанию. Это означает, что любая конечная точка, которая нуждается в опросчике, будет использовать эту. Если вы не настроите опрашивающий по умолчанию, вам нужно будет определить определенный опрашивающий для каждой конечной точки, которая активно получает сообщения.
Потребитель
Активатор службы — это просто bean-компонент (автоматически определяется при сканировании компонента):
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
@Component ( "ticketProcessor" ) public class TicketProcessor { private static final Logger logger = LoggerFactory.getLogger(TicketProcessor. class ); private static final String ERROR_INVALID_ID = "Order ID is invalid" ; @Autowired private OrderRepository repository; public void processOrder(TicketOrder order) { logger.info( "Processing order {}" , order.getFilmId()); if (isInvalidOrder(order)) { logger.info( "Error while processing order [{}]" , ERROR_INVALID_ID); throw new InvalidOrderException(ERROR_INVALID_ID); } float amount = 5 .95f * order.getQuantity(); TicketConfirmation confirmation = new TicketConfirmation( "123" , order.getFilmId(), order.getOrderDate(), order.getQuantity(), amount); repository.confirmOrder(confirmation); } private boolean isInvalidOrder(TicketOrder order) { if (order.getFilmId() == - 1 ) { return true ; } return false ; } } |
В предыдущем фрагменте кода метод TicketOrder
получает объект TicketOrder
и непосредственно обрабатывает его. Однако вместо этого вы можете определить Message <?> Или Message <TicketOrder>, чтобы получить сообщение. Таким образом, вы будете иметь доступ как к полезной нагрузке сообщения, так и к его заголовкам.
Также обратите внимание, что метод возвращает void. Нам не нужно ничего возвращать, поскольку поток сообщений заканчивается здесь. При необходимости вы также можете определить канал ответа для сервисного адаптера и вернуть подтверждение. Кроме того, мы бы затем подписали конечную точку или шлюз на этот канал ответа, чтобы отправить подтверждение в другую очередь JMS, отправить его веб-службе или сохранить, например, в базе данных.
Наконец, давайте посмотрим на тест, чтобы увидеть, как он все выполняется:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
@ContextConfiguration (locations = { "/xpadro/spring/integration/test/jms-config.xml" , "/xpadro/spring/integration/test/int-jms-config.xml" }) @RunWith (SpringJUnit4ClassRunner. class ) public class TestIntegrationJmsConfig { @Autowired private JmsProducer producer; @Autowired private OrderRepository repository; @Test public void testSendToIntegration() throws InterruptedException, RemoteException { TicketOrder order = new TicketOrder( 1 , 5 , new Date()); //Sends the message to the jmsTemplate's default destination producer.convertAndSendMessage( "int.sync.queue" , order); Thread.sleep( 4000 ); assertEquals( 1 , repository.getConfirmations().size()); assertNotNull(repository.getConfirmations().get( 0 )); TicketConfirmation conf = repository.getConfirmations().get( 0 ); assertEquals( "123" , conf.getId()); } } |
Я установил Thread.sleep
в четыре секунды, чтобы дождаться отправки сообщения. Мы могли бы использовать цикл while, чтобы проверить, получено ли сообщение до истечения времени ожидания.
3.2. Адаптеры входящего канала: пассивный прием
Во второй части раздела приема JMS используется адаптер канала, управляемый сообщениями. Таким образом, как только сообщение будет отправлено в очередь, оно будет доставлено адаптеру без необходимости использования программ опроса. Именно по каналу сообщений мы доставляем сообщение своим подписчикам.
Пример очень похож на тот, что мы видели в предыдущем разделе. Я просто покажу, какие изменения внесены в конфигурацию.
Единственное, что я изменил в предыдущем примере, — это конфигурация пружинной интеграции:
1
2
3
4
5
6
7
|
< context:component-scan base-package = "xpadro.spring.integration" /> < int-jms:message-driven-channel-adapter id = "jmsAdapter" destination = "toIntQueue" channel = "jmsChannel" /> < int:channel id = "jmsChannel" /> < int:service-activator method = "processOrder" input-channel = "jmsChannel" ref = "ticketProcessor" /> |
Я удалил модуль опроса и изменил входящий адаптер JMS для адаптера канала, управляемого сообщениями. Это оно; адаптер будет пассивно получать сообщения и доставлять их в jmsChannel
.
Учтите, что адаптеру приемника сообщений требуется хотя бы одна из следующих комбинаций:
- Контейнер слушателя сообщений.
- Фабрика связи и пункт назначения.
В нашем примере мы использовали второй вариант. Место назначения указывается в конфигурации адаптера, а фабрика соединений определяется в файле jms-config, который также импортируется тестом.
4. JMS-адаптеры: отправка
В предыдущем разделе мы видели, как получать сообщения, отправленные в очередь JMS внешней системой. В этом разделе показаны адаптеры исходящих каналов, которые позволяют отправлять сообщения JMS за пределы нашей системы.
В отличие от входящих адаптеров, существует только один тип исходящих адаптеров. Этот адаптер использует внутренний JmsTemplate
для отправки сообщения, и для настройки этого адаптера вам нужно будет указать хотя бы одно из следующего:
- Шаблон Jms.
- Фабрика связи и пункт назначения.
Как и во входящем примере, мы используем второй вариант для отправки сообщения в очередь JMS. Конфигурация выглядит следующим образом:
Для этого примера мы собираемся создать новую очередь в конфигурации jms (jms-config.xml). Именно здесь наше приложение Spring Integration отправит сообщение:
1
2
3
|
< bean id = "toJmsQueue" class = "org.apache.activemq.command.ActiveMQQueue" > < constructor-arg value = "to.jms.queue" /> </ bean > |
Хорошо, теперь мы настраиваем конфигурацию интеграции с исходящим адаптером JMS:
1
2
3
4
5
6
7
8
|
< context:component-scan base-package = "xpadro.spring.integration" /> < int:gateway default-request-channel = "requestChannel" service-interface = "xpadro.spring.integration.service.TicketService" /> < int:channel id = "requestChannel" /> < int-jms:outbound-channel-adapter id = "jmsAdapter" channel = "requestChannel" destination = "toJmsQueue" /> |
Мы используем шлюз в качестве входа в нашу систему обмена сообщениями. Тест будет использовать этот интерфейс для отправки нового объекта TicketOrder
. Шлюз получит сообщение и поместит его в канал requestChannel
. Поскольку это прямой канал , он будет отправлен на адаптер исходящего канала JMS.
Адаптер получает сообщение интеграции Spring. Затем он может отправить сообщение двумя способами:
- Преобразуйте сообщение в сообщение JMS. Это делается путем установки атрибута адаптера «extract-payload» в значение true, которое является значением по умолчанию. Это вариант, который мы использовали в примере.
- Отправьте сообщение как есть, сообщение интеграции Spring. Вы можете сделать это, установив для атрибута «extract-payload» значение false.
Это решение зависит от того, какая система ожидает вашего сообщения. Если другое приложение является приложением Spring Integration, вы можете использовать второй подход. В противном случае используйте значение по умолчанию. В нашем примере с другой стороны есть простое Spring JMS-приложение. Таким образом, мы должны выбрать первый вариант.
Продолжая наш пример, теперь мы посмотрим на тест, который использует интерфейс шлюза для отправки сообщения и пользовательский потребитель для его получения. В этом тесте потребитель будет играть роль приложения JMS, которое использует jmsTemplate
для извлечения его из очереди JMS:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
|
@ContextConfiguration (locations = { "/xpadro/spring/integration/test/jms-config.xml" , "/xpadro/spring/integration/test/int-jms-out-config.xml" }) @RunWith (SpringJUnit4ClassRunner. class ) public class TestIntegrationJmsOutboundConfig { @Autowired private SyncConsumer consumer; @Autowired private TicketService service; @Test public void testSendToJms() throws InterruptedException, RemoteException { TicketOrder order = new TicketOrder( 1 , 5 , new Date()); service.sendOrder(order); TicketOrder receivedOrder = consumer.receive( "to.jms.queue" ); assertNotNull(receivedOrder); assertEquals( 1 , receivedOrder.getFilmId()); assertEquals( 5 , receivedOrder.getQuantity()); } } |
5. Использование шлюзов
Помимо адаптеров каналов Spring Integration предоставляет входящие и исходящие шлюзы. Как вы помните из предыдущих руководств, шлюзы обеспечивают двунаправленную связь с внешними системами, то есть операции отправки и получения или получения и ответа. В этом случае он разрешает операции запроса или повторной попытки.
В этом разделе мы рассмотрим пример использования исходящего шлюза JMS. Шлюз отправит сообщение JMS в очередь и будет ожидать ответа. Если ответ не отправляется обратно, шлюз выдаст исключение MessageTimeoutException .
Конфигурация Spring Integration
01
02
03
04
05
06
07
08
09
10
11
12
13
|
< context:component-scan base-package = "xpadro.spring.integration" /> < int:gateway id = "inGateway" default-request-channel = "requestChannel" service-interface = "xpadro.spring.integration.service.TicketService" /> < int:channel id = "requestChannel" /> < int-jms:outbound-gateway id = "outGateway" request-destination = "toAsyncJmsQueue" request-channel = "requestChannel" reply-channel = "jmsReplyChannel" /> < int:channel id = "jmsReplyChannel" /> < int:service-activator method = "registerOrderConfirmation" input-channel = "jmsReplyChannel" ref = "ticketProcessor" /> |
Поток выглядит следующим образом:
- TicketOrder, включенный в Spring Integration Message, попадет в систему обмена сообщениями через шлюз inGateway.
- Шлюз поместит сообщение в канал requestChannel.
- Канал отправляет сообщение своей подписанной конечной точке, исходящему шлюзу JMS.
- Исходящий шлюз JMS извлекает полезную нагрузку из сообщения и переносит его в сообщение JMS.
- Шлюз отправляет сообщение и ожидает ответа.
- Когда приходит ответ, в форме TicketConfirmation, заключенного в сообщение JMS, шлюз получает полезную нагрузку и упаковывает ее в сообщение Spring Integration.
- Сообщение отправляется на канал «jmsReplyChannel», где активатор службы (TicketProcessor) обработает его и зарегистрирует в нашем OrderRepository.
Процессор заказа довольно прост. Он получает TicketConfirmation и добавляет его в хранилище билетов:
1
2
3
4
5
6
7
8
9
|
@Component ( "ticketProcessor" ) public class TicketProcessor { @Autowired private OrderRepository repository; public void registerOrderConfirmation(TicketConfirmation confirmation) { repository.confirmOrder(confirmation); } } |
Тест
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
|
@RunWith (SpringJUnit4ClassRunner. class ) public class TestIntegrationJmsOutGatewayConfig { @Autowired private OrderRepository repository; @Autowired private TicketService service; @Test public void testSendToJms() throws InterruptedException, RemoteException { TicketOrder order = new TicketOrder( 1 , 5 , new Date()); service.sendOrder(order); Thread.sleep( 4000 ); assertEquals( 1 , repository.getConfirmations().size()); assertNotNull(repository.getConfirmations().get( 0 )); TicketConfirmation conf = repository.getConfirmations().get( 0 ); assertEquals( "321" , conf.getId()); } } |
Внешняя система
Чтобы полностью понять пример, я покажу вам, что происходит, когда сообщение доставляется в очередь JMS.
Прослушивая очередь, куда сообщение было отправлено Spring Integration, есть прослушиватель asyncConsumer
:
1
2
3
4
5
6
7
8
|
< bean id = "toAsyncJmsQueue" class = "org.apache.activemq.command.ActiveMQQueue" > < constructor-arg value = "to.async.jms.queue" /> </ bean > <!-- Listeners --> < jms:listener-container connection-factory = "connectionFactory" > < jms:listener destination = "to.async.jms.queue" ref = "asyncConsumer" /> </ jms:listener-container > |
Слушатель получает сообщение, создает новое сообщение с подтверждением заявки и отвечает. Обратите внимание, что мы должны установить идентификатор корреляции ответного сообщения с тем же значением, что и сообщение запроса. Это позволит клиенту узнать, на какое сообщение мы отвечаем. Кроме того, мы устанавливаем пункт назначения для канала ответа, настроенного в сообщении запроса.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
@Component ( "asyncConsumer" ) public class AsyncConsumer implements MessageListener { @Autowired private JmsTemplate template; @Override public void onMessage(Message order) { final Message msgOrder = order; TicketOrder orderObject; try { orderObject = (TicketOrder) ((ObjectMessage) order).getObject(); } catch (JMSException e) { throw JmsUtils.convertJmsAccessException(e); } float amount = 5 .95f * orderObject.getQuantity(); TicketConfirmation confirmation = new TicketConfirmation( "321" , orderObject.getFilmId(), orderObject.getOrderDate(), orderObject.getQuantity(), amount); try { template.convertAndSend(msgOrder.getJMSReplyTo(), confirmation, new MessagePostProcessor() { public Message postProcessMessage(Message message) throws JMSException { message.setJMSCorrelationID(msgOrder.getJMSCorrelationID()); return message; } }); } catch (JmsException | JMSException e) { throw JmsUtils.convertJmsAccessException((JMSException) e); } } } |
6. Преобразование сообщений
И адаптеры каналов сообщений, и шлюзы используют конвертер сообщений для преобразования входящих сообщений в типы Java или наоборот. Конвертер должен реализовывать интерфейс MessageConverter:
1
2
3
4
5
6
7
|
public interface MessageConverter { <P> Message<P> toMessage(Object object); <P> Object fromMessage(Message<P> message); } |
Spring Integration поставляется с двумя реализациями интерфейса MessageConverter
:
MapMessageConverter
Его метод fromMessage
создает новый HashMap с двумя ключами:
- полезная нагрузка: значение — полезная нагрузка сообщения (
message.getPayload
). - headers: значение — это еще одна HashMap со всеми заголовками из исходного сообщения.
Метод «toMessage» ожидает экземпляр Map с той же структурой (ключи полезной нагрузки и заголовки) и создает сообщение Spring Integration.
SimpleMessageConverter
Это конвертер по умолчанию, используемый адаптерами и шлюзами. Вы можете видеть из исходного кода, что он конвертирует из / в объект:
01
02
03
04
05
06
07
08
09
10
11
12
13
|
public Message<?> toMessage(Object object) throws Exception { if (object == null ) { return null ; } if (object instanceof Message<?>) { return (Message<?>) object; } return MessageBuilder.withPayload(object).build(); } public Object fromMessage(Message<?> message) throws Exception { return (message != null ) ? message.getPayload() : null ; } |
В любом случае, если вам нужна собственная реализация, вы можете указать свой собственный конвертер в конфигурации канала или шлюза. Например, используя шлюз:
1
2
3
|
< int-jms:outbound-gateway id = "outGateway" request-destination = "toAsyncJmsQueue" request-channel = "requestChannel" reply-channel = "jmsReplyChannel" message-converter = "myConverter" /> |
Просто помните, что ваш конвертер должен реализовывать MessageConverter:
1
2
|
@Component ( "myConverter" ) public class MyConverter implements MessageConverter { |
7. Каналы сообщений, поддерживаемые JMS
Канальные адаптеры и шлюзы используются для связи с внешними системами. Каналы сообщений с поддержкой JMS используются для отправки и получения сообщений JMS между потребителями и производителями, которые находятся в одном приложении. Хотя в этой ситуации мы все еще можем использовать канальные адаптеры, гораздо проще использовать канал JMS. Отличие от канала сообщений интеграции заключается в том, что канал JMS будет использовать JMS-брокер для отправки сообщения. Это означает, что сообщение не будет просто сохранено в канале в памяти. Вместо этого он будет отправлен поставщику JMS, что позволит также использовать транзакции. Если вы используете транзакции, это будет работать следующим образом:
- Производитель, отправляющий сообщение в канал, поддерживаемый JMS, не запишет его, если откат транзакции.
- Потребитель, подписанный на канал, поддерживаемый JMS, не удалит из него сообщение, если транзакция откатывается.
Для этой функции в Spring Integration предусмотрены оба канала: точка-точка и каналы публикации / подписки. Они настроены ниже:
прямой канал точка-точка
1
|
< int-jms:channel id = "jmsChannel" queue = "myQueue" /> |
канал публикации / подписки
1
|
< int-jms:publish-subscribe-channel id = "jmsChannel" topic = "myTopic" /> |
В следующем примере мы видим простое приложение с двумя конечными точками, связывающимися друг с другом с использованием канала с поддержкой JMS.
конфигурация
Сообщения, отправленные в систему обмена сообщениями (объекты TicketOrder
), поступают к активатору службы, обработчику билетов. Затем этот процессор отправляет заказ ( sendJMS
) в сообщение с поддержкой JMS. Подписанный на этот канал, есть тот же процессор, который будет получать сообщение ( receiveJms
), обрабатывать его, создавая TicketConfirmation
и регистрируя его в хранилище билетов:
01
02
03
04
05
06
07
08
09
10
11
12
|
< context:component-scan base-package = "xpadro.spring.integration" /> < int:gateway default-request-channel = "requestChannel" service-interface = "xpadro.spring.integration.service.TicketService" /> < int:channel id = "requestChannel" /> < int:service-activator method = "sendJms" input-channel = "requestChannel" output-channel = "jmsChannel" ref = "ticketJmsProcessor" /> < int-jms:channel id = "jmsChannel" queue = "syncTestQueue" /> < int:service-activator method = "receiveJms" input-channel = "jmsChannel" ref = "ticketJmsProcessor" /> |
Процессор
Реализует оба метода: sendJms
и receiveJms
:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
|
@Component ( "ticketJmsProcessor" ) public class TicketJmsProcessor { private static final Logger logger = LoggerFactory.getLogger(TicketJmsProcessor. class ); @Autowired private OrderRepository repository; public TicketOrder sendJms(TicketOrder order) { logger.info( "Sending order {}" , order.getFilmId()); return order; } public void receiveJms(TicketOrder order) { logger.info( "Processing order {}" , order.getFilmId()); float amount = 5 .95f * order.getQuantity(); TicketConfirmation confirmation = new TicketConfirmation( "123" , order.getFilmId(), order.getOrderDate(), order.getQuantity(), amount); repository.confirmOrder(confirmation); } } |
Тест
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
@ContextConfiguration (locations = { "/xpadro/spring/integration/test/jms-config.xml" , "/xpadro/spring/integration/test/int-jms-jms-config.xml" }) @RunWith (SpringJUnit4ClassRunner. class ) public class TestIntegrationJmsToJmsConfig { @Autowired private OrderRepository repository; @Autowired private TicketService service; @Test public void testSendToJms() throws InterruptedException, RemoteException { TicketOrder order = new TicketOrder( 1 , 5 , new Date()); service.sendOrder(order); Thread.sleep( 4000 ); assertEquals( 1 , repository.getConfirmations().size()); assertNotNull(repository.getConfirmations().get( 0 )); TicketConfirmation conf = repository.getConfirmations().get( 0 ); assertEquals( "123" , conf.getId()); } } |
Каналы с поддержкой JMS предлагают различные возможности, такие как настройка имени очереди вместо ссылки на очередь или использование резолвера назначения:
1
2
|
< int-jms:channel id = "jmsChannel" queue-name = "myQueue" destination-resolver = "myDestinationResolver" /> |
8. Динамическое разрешение пункта назначения
Определитель адресатов — это класс, который позволяет нам преобразовывать имена адресатов в JMS-адреса. Любой конечный распознаватель должен реализовывать следующий интерфейс:
1
2
3
4
|
public interface DestinationResolver { Destination resolveDestinationName(Session session, String destinationName, boolean pubSubDomain) throws JMSException; } |
Определители получателей могут быть указаны на адаптерах каналов JMS, шлюзах JMS и каналах с поддержкой JMS. Если вы не сконфигурируете целевой преобразователь явно, Spring будет использовать реализацию по умолчанию, то есть DynamicDestinationResolver . Этот преобразователь объясняется ниже как другие реализации, предоставляемые Spring:
- DynamicDestinationResolver : Разрешает имена получателей в качестве динамических назначений, используя стандартные методы JMS Session.createTopic и Session.createQueue.
- BeanFactoryDestinationResolver : он будет искать в контексте Spring бин с именем, таким как имя получателя, и ожидает, что он будет иметь тип javax.jms.Destination . Если он не может его найти, он выдаст исключение DestinationResolutionException .
- JndiDestinationResolver : предполагается, что имя получателя является местоположением JNDI.
Если мы не хотим использовать динамический распознаватель по умолчанию, мы можем реализовать собственный распознаватель и настроить его в нужной конечной точке. Например, следующий канал с поддержкой JMS использует другую реализацию:
1
2
|
< int-jms:channel id = "jmsChannel" queue-name = "myQueue" destination-resolver = "myDestinationResolver" /> |
9. Интеграция AMQP
9.1. Установка
Для установки и запуска сервера RabbitMQ вам просто нужно выполнить шаги, описанные ниже. Если у вас уже установлен сервер, просто пропустите этот раздел.
- Первым шагом является установка erlang, необходимого для сервера RabbitMQ. Перейдите по следующему URL-адресу, загрузите версию своей системы и установите ее:
1
|
> rabbitmq-plugins enable rabbitmq_management |
1
|
> rabbitmq-server.bat |
Хорошо, теперь мы проверим, что RabbitMQ правильно установлен. Перейдите на http: // localhost: 15672 и войдите, используя «guest» в качестве имени пользователя и пароля. Если вы используете версию до 3.0, то порт будет 55672.
Если вы видите веб-интерфейс, то все готово.
9.2. Демо приложение
Чтобы использовать AMQP с Spring Integration, нам нужно добавить следующие зависимости в наш файл pom.xml:
Spring AMQP (для кролика MQ)
1
2
3
4
5
|
< dependency > < groupId >org.springframework.amqp</ groupId > < artifactId >spring-rabbit</ artifactId > < version >1.3.1.RELEASE</ version > </ dependency > |
Конечные точки AMQP Spring Integration
1
2
3
4
5
|
< dependency > < groupId >org.springframework.integration</ groupId > < artifactId >spring-integration-amqp</ artifactId > < version >3.0.2.RELEASE</ version > </ dependency > |
Теперь мы собираемся создать новый файл конфигурации amqp-config.xml, который будет содержать конфигурацию rabbitMQ (например, jms-config для JMS, которую мы использовали ранее в этом руководстве).
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
|
xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"> < rabbit:connection-factory id = "connectionFactory" /> < rabbit:template id = "amqpTemplate" connection-factory = "connectionFactory" /> < rabbit:admin connection-factory = "connectionFactory" /> < rabbit:queue name = "rabbit.queue" /> < rabbit:direct-exchange name = "rabbit.exchange" > < rabbit:bindings > < rabbit:binding queue = "rabbit.queue" key = "rabbit.key.binding" /> </ rabbit:bindings > </ rabbit:direct-exchange > </ beans > |
Следующий файл — это файл интеграции Spring, который содержит каналы и адаптеры каналов:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd http://www.springframework.org/schema/integration/amqp http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd"> < context:component-scan base-package = "xpadro.spring.integration.amqp" /> < int:gateway default-request-channel = "requestChannel" service-interface = "xpadro.spring.integration.amqp.service.AMQPService" /> < int:channel id = "requestChannel" /> < int-amqp:outbound-channel-adapter channel = "requestChannel" amqp-template = "amqpTemplate" exchange-name = "rabbit.exchange" routing-key = "rabbit.key.binding" /> < int-amqp:inbound-channel-adapter channel = "responseChannel" queue-names = "rabbit.queue" connection-factory = "connectionFactory" /> < int:channel id = "responseChannel" /> < int:service-activator ref = "amqpProcessor" method = "process" input-channel = "responseChannel" /> </ beans > |
Поток выглядит следующим образом:
- Тестовое приложение отправляет сообщение, которое будет простой строкой, на шлюз.
- От шлюза он достигнет адаптера исходящего канала через канал «requestChannel».
- Адаптер исходящего канала отправляет сообщение в очередь «rabbit.queue».
- Подписавшись на эту очередь «rabbit.queue», мы настроили адаптер входящего канала. Он будет получать сообщения, отправленные в очередь.
- Сообщение отправляется активатору услуги по каналу responseChannel.
- Сервисный активатор просто печатает сообщение.
Шлюз, который служит точкой входа в систему обмена сообщениями, содержит единственный метод:
1
2
3
4
|
public interface AMQPService { @Gateway public void sendMessage(String message); } |
Активатор службы amqpProcessor очень прост; он получает сообщение и печатает свою полезную нагрузку:
1
2
3
4
5
6
7
|
@Component ( "amqpProcessor" ) public class AmqpProcessor { public void process(Message<String> msg) { System.out.println( "Message received: " +msg.getPayload()); } } |
Чтобы закончить с примером, вот приложение, которое инициирует поток, вызывая сервис, обернутый шлюзом:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
|
@ContextConfiguration (locations = { "/xpadro/spring/integration/test/amqp-config.xml" , "/xpadro/spring/integration/test/int-amqp-config.xml" }) @RunWith (SpringJUnit4ClassRunner. class ) public class TestIntegrationAMQPConfig { @Autowired private AMQPService service; @Test public void testSendToJms() throws InterruptedException, RemoteException { String msg = "hello" ; service.sendMessage(msg); Thread.sleep( 2000 ); } } |