В моем предыдущем посте я писал об очень простом сценарии интеграции между двумя системами — одна генерирует рабочий блок, а другая — этот рабочий блок, и как Spring Integration делает такую интеграцию очень простой.
Здесь я продемонстрирую, как этот сценарий интеграции может быть еще более упрощен с помощью Spring Cloud Stream.
У меня есть пример кода, доступный здесь — правильные maven-зависимости для Spring Cloud Stream доступны в pom.xml .
Режиссер
Итак, снова начнем с производителя, ответственного за создание рабочих единиц. Все, что нужно сделать с помощью кода, чтобы отправлять сообщения в RabbitMQ, это иметь конфигурацию Java по следующим направлениям:
1
2
3
4
|
@Configuration @EnableBinding (WorkUnitsSource. class ) @IntegrationComponentScan public class IntegrationConfiguration {} |
Это выглядит обманчиво простым, но многое делает под прикрытием, из того, что я могу понять и почерпнуть из документации, вот что вызывает эта конфигурация:
1. Создаются каналы сообщений Spring Integration на основе классов, связанных с аннотацией @EnableBinding. Приведенный выше класс WorkUnitsSource является определением пользовательского канала с именем worksChannel и выглядит следующим образом:
01
02
03
04
05
06
07
08
09
10
11
|
import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel; public interface WorkUnitsSource { String CHANNEL_NAME = "worksChannel" ; @Output MessageChannel worksChannel(); } |
2. Исходя из того, какая реализация «связующего» доступна во время выполнения (скажем, RabbitMQ, Kaffka, Redis, Gemfire), канал на предыдущем шаге будет связан с соответствующими структурами в системе — так, например, я хочу, чтобы мой « WorksChannel », чтобы в свою очередь отправлять сообщения в RabbitMQ, Spring Cloud Stream позаботится об автоматическом создании темы обмена в RabbitMQ.
Я хотел еще кое-что изменить с точки зрения того, как данные отправляются в RabbitMQ — в частности, я хотел, чтобы мои доменные объекты были сериализованы в json перед отправкой, и я хочу указать имя обмена RabbitMQ, на который отправляется полезная нагрузка, это управляется определенными конфигурациями, которые можно подключить к каналу следующим образом, используя файл yaml:
1
2
3
4
5
6
7
8
|
spring: cloud: stream: bindings: worksChannel: destination: work.exchange contentType: application/json group: testgroup |
И последняя деталь — способ взаимодействия остальной части приложения с Spring Cloud Stream. Это можно сделать непосредственно в Spring Integration путем определения шлюза сообщений:
01
02
03
04
05
06
07
08
09
10
|
import org.springframework.integration.annotation.Gateway; import org.springframework.integration.annotation.MessagingGateway; import works.service.domain.WorkUnit; @MessagingGateway public interface WorkUnitGateway { @Gateway (requestChannel = WorkUnitsSource.CHANNEL_NAME) void generate(WorkUnit workUnit); } |
По сути, Spring Cloud Stream теперь подключает весь поток интеграции Spring, создает соответствующие структуры в RabbitMQ.
потребитель
По аналогии с источником, сначала я хочу определить канал с именем worksChannel, который будет обрабатывать входящее сообщение от RabbitMQ:
1
2
3
4
5
6
7
8
9
|
import org.springframework.cloud.stream.annotation.Input; import org.springframework.messaging.SubscribableChannel; public interface WorkUnitsSink { String CHANNEL_NAME = "worksChannel" ; @Input SubscribableChannel worksChannel(); } |
и пусть Spring Cloud Stream создаст каналы и привязки RabbitMQ на основе этого определения:
1
2
3
4
5
6
|
import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.context.annotation.Configuration; @Configuration @EnableBinding (WorkUnitsSink. class ) public class IntegrationConfiguration {} |
Для обработки сообщений Spring Cloud Stream предоставляет прослушиватель, который можно создать следующим образом:
1
2
3
4
5
6
7
8
9
|
@Service public class WorkHandler { private static final Logger LOGGER = LoggerFactory.getLogger(WorkHandler. class ); @StreamListener (WorkUnitsSink.CHANNEL_NAME) public void process(WorkUnit workUnit) { LOGGER.info( "Handling work unit - id: {}, definition: {}" , workUnit.getId(), workUnit.getDefinition()); } } |
И, наконец, конфигурация, которая соединяет этот канал с инфраструктурой RabbitMQ, выраженная в файле yaml:
1
2
3
4
5
6
7
|
spring: cloud: stream: bindings: worksChannel: destination: work.exchange group: testgroup |
Теперь, если производитель и любое количество потребителей были запущены, сообщение, отправленное через производителя, будет отправлено в обмен темой Rabbit MQ в виде json, получено потребителем, десериализовано в объект и передано в рабочий процессор.
Значительное количество вспомогательного элемента, участвующего в создании инфраструктуры RabbitMQ, теперь обрабатывается исключительно конвенциями библиотек Spring Cloud Stream. Несмотря на то, что Spring Cloud Stream пытается предоставить представление о простой интеграции Spring, полезно иметь базовые знания по интеграции Spring для эффективного использования Spring Cloud Stream.
Пример, описанный здесь, доступен в моем репозитории github.
Ссылка: | Интеграция с RabbitMQ с использованием Spring Cloud Stream от нашего партнера по JCG Бижу Кунджуммена из блога all and sundry. |